diff --git a/p2p/peer.go b/p2p/peer.go
index a9839ddfe..235d4d3ae 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -214,97 +214,283 @@ type PeerUpdate struct {
 	Status PeerStatus
 }
 
-// peerStore manages information about peers. It is currently a bare-bones
-// in-memory store of peer addresses, and will be fleshed out later.
+// peerManager manages peer information, using a peerStore for underlying
+// storage. Its primary purpose is to determine which peers to connect to next,
+// to make sure a peer has only a single active connection (either inbound or
+// outbound), and to avoid dialing the same peer in parallel goroutines.
 //
-// The main function of peerStore is currently to dispense peers to connect to
-// (via peerStore.Dispense), giving the caller exclusive "ownership" of that
-// peer until the peer is returned (via peerStore.Return). This is used to
-// schedule and synchronize peer dialing and accepting in the Router, e.g.
-// making sure we only have a single connection (in either direction) to peers.
-type peerStore struct {
-	mtx     sync.Mutex
-	peers   map[NodeID]*peerInfo
-	claimed map[NodeID]bool
+// For an outbound connection, the flow is as follows:
+// - DialNext: returns a peer address to dial, marking the peer as dialing.
+// - DialFailed: reports a dial failure, unmarking the peer as dialing.
+// - Dialed: successfully dialed, unmarking as dialing and marking as connected
+//   (or erroring if already connected).
+// - Ready: routing is up, broadcasts a PeerStatusUp peer update to subscribers.
+// - Disconnected: peer disconnects, unmarking as connected and broadcasting a
+//   PeerStatusDown peer update.
+//
+// For an inbound connection, the flow is as follows:
+// - Accepted: successfully accepted connection, marking as connected (or
+//   erroring if already connected).
+// - Ready: routing is up, broadcasts a PeerStatusUp peer update to subscribers.
+// - Disconnected: peer disconnects, unmarking as connected and broadcasting a
+//   PeerStatusDown peer update.
+//
+// We track dialing and connected states independently. This allows us to accept
+// an inbound connection from a peer while the router is also dialing an
+// outbound connection to that same peer, which will cause the dialer to
+// eventually error (when attempting to mark the peer as connected). This also
+// avoids race conditions where multiple goroutines may end up dialing a peer if
+// an incoming connection was briefly accepted and disconnected while we were
+// also dialing.
+type peerManager struct {
+	mtx           sync.Mutex
+	store         *peerStore
+	dialing       map[NodeID]bool
+	connected     map[NodeID]bool
+	subscriptions map[*PeerUpdatesCh]*PeerUpdatesCh // keyed by struct identity (address)
+}
+
+// newPeerManager creates a new peer manager.
+func newPeerManager(store *peerStore) *peerManager {
+	return &peerManager{
+		store:         store,
+		dialing:       map[NodeID]bool{},
+		connected:     map[NodeID]bool{},
+		subscriptions: map[*PeerUpdatesCh]*PeerUpdatesCh{},
+	}
+}
+
+// Add adds a peer to the manager, given as an address. If the peer already
+// exists, the address is added to it.
+func (m *peerManager) Add(address PeerAddress) error {
+	if err := address.Validate(); err != nil {
+		return err
+	}
+	peerID := address.NodeID()
+
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	peer, err := m.store.Get(peerID)
+	if err != nil {
+		return err
+	}
+	if peer == nil {
+		peer = newPeerInfo(peerID)
+	}
+	if peer.AddAddress(address) {
+		return m.store.Set(peer)
+	}
+	return nil
 }
 
-// newPeerStore creates a new peer store.
-func newPeerStore() *peerStore {
-	return &peerStore{
-		peers:   map[NodeID]*peerInfo{},
-		claimed: map[NodeID]bool{},
+// Subscribe subscribes to peer updates. The caller must consume the peer
+// updates in a timely fashion and close the subscription when done, since
+// delivery is guaranteed and will block peer connection/disconnection
+// otherwise.
+func (m *peerManager) Subscribe() *PeerUpdatesCh {
+	// FIXME: We may want to use a size 1 buffer here. When the router
+	// broadcasts a peer update it has to loop over all of the
+	// subscriptions, and we want to avoid blocking and waiting for a
+	// context switch before continuing to the next subscription. This also
+	// prevents tail latencies from compounding across updates. We also want
+	// to make sure the subscribers are reasonably in sync, so it should be
+	// kept at 1. However, this should be benchmarked first.
+	peerUpdates := NewPeerUpdates(make(chan PeerUpdate))
+	m.mtx.Lock()
+	m.subscriptions[peerUpdates] = peerUpdates
+	m.mtx.Unlock()
+
+	go func() {
+		<-peerUpdates.Done()
+		m.mtx.Lock()
+		delete(m.subscriptions, peerUpdates)
+		m.mtx.Unlock()
+	}()
+	return peerUpdates
+}
+
+// broadcast broadcasts a peer update to all subscriptions. The caller must
+// already hold the mutex lock. This means the mutex is held for the duration
+// of the broadcast, which is intentional: it makes sure all subscriptions
+// receive updates in the same order.
+//
+// FIXME: Consider using more fine-grained mutexes here, and/or a channel to
+// enforce ordering of updates.
+func (m *peerManager) broadcast(peerUpdate PeerUpdate) {
+	for _, sub := range m.subscriptions {
+		select {
+		case sub.updatesCh <- peerUpdate:
+		case <-sub.doneCh:
+		}
 	}
 }
 
-// Add adds a peer to the store, given as an address.
-func (s *peerStore) Add(address PeerAddress) error {
-	if err := address.Validate(); err != nil {
+// DialNext finds an appropriate peer address to dial and marks it as dialing.
+// The peer will not be returned again until Dialed() or DialFailed() is called
+// for the peer and it is no longer connected.
+//
+// Returns an empty ID if no appropriate peers are available.
+func (m *peerManager) DialNext() (NodeID, PeerAddress, error) {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	peers, err := m.store.List()
+	if err != nil {
+		return "", PeerAddress{}, err
+	}
+	for _, peer := range peers {
+		switch {
+		case len(peer.Addresses) == 0:
+		case m.dialing[peer.ID]:
+		case m.connected[peer.ID]:
+		default:
+			// FIXME: We currently only dial the first address, but we should
+			// track connection statistics for each address and return the most
+			// appropriate one.
+			m.dialing[peer.ID] = true
+			return peer.ID, peer.Addresses[0], nil
+		}
+	}
+	return "", PeerAddress{}, nil
+}
+
+// DialFailed reports a failed dial attempt. This will make the peer available
+// for dialing again when appropriate.
+func (m *peerManager) DialFailed(peerID NodeID, address PeerAddress) error {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	delete(m.dialing, peerID)
+	// FIXME: We need to track address quality statistics and exponential backoff.
+	return nil
+}
+
+// Dialed marks a peer as successfully dialed. Any further incoming connections
+// will be rejected, and once disconnected the peer may be dialed again.
+func (m *peerManager) Dialed(peerID NodeID, address PeerAddress) error {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
+	peer, err := m.store.Get(peerID)
+	if err != nil {
 		return err
+	} else if peer == nil {
+		return fmt.Errorf("unknown peer %q", peerID)
 	}
-	peerID := address.NodeID()
 
-	s.mtx.Lock()
-	defer s.mtx.Unlock()
+	if m.connected[peerID] {
+		return fmt.Errorf("peer %q is already connected", peerID)
+	}
+	delete(m.dialing, peerID)
+	m.connected[peerID] = true
+	return nil
+}
 
-	peer, ok := s.peers[peerID]
-	if !ok {
-		peer = newStorePeer(peerID)
-		s.peers[peerID] = peer
-	} else if s.claimed[peerID] {
-		// FIXME: We need to handle modifications of claimed peers somehow.
-		return fmt.Errorf("peer %q is claimed", peerID)
+// Accepted marks an incoming peer connection as successfully accepted. If the
+// peer is already connected this will return an error.
+//
+// NOTE: We can't take an address here, since e.g. TCP uses a different port
+// number for outbound traffic than inbound traffic, so the peer's endpoint
+// wouldn't necessarily be an appropriate address to dial.
+func (m *peerManager) Accepted(peerID NodeID) error {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	peer, err := m.store.Get(peerID)
+	if err != nil {
+		return err
+	} else if peer == nil {
+		peer = newPeerInfo(peerID)
+		if err = m.store.Set(peer); err != nil {
+			return err
+		}
+	}
+	if m.connected[peerID] {
+		return fmt.Errorf("peer %q is already connected", peerID)
 	}
-	peer.AddAddress(address)
+	m.connected[peerID] = true
 	return nil
 }
 
-// Claim claims a peer. The caller has exclusive ownership of the peer, and must
-// return it by calling Return(). Returns nil if the peer could not be claimed.
-// If the peer is not known to the store, it is registered and claimed.
-func (s *peerStore) Claim(id NodeID) *peerInfo {
-	s.mtx.Lock()
-	defer s.mtx.Unlock()
-	if s.claimed[id] {
-		return nil
+// Ready marks a peer as ready, broadcasting status updates to subscribers. The
+// peer must already be marked as connected. This is separate from Dialed() and
+// Accepted() to allow the router to set up its internal queues before reactors
+// start sending messages (holding the Router.peerMtx mutex while calling
+// Accepted or Dialed would halt all message routing while peers are set up,
+// which is too expensive, and would also cause difficulties in tests where we
+// may want to consume peer updates and send messages sequentially).
+//
+// FIXME: This possibly indicates an architectural problem. Should the peerManager
+// handle actual network connections to/from peers as well? Or should all of this
+// be done by the router?
+func (m *peerManager) Ready(peerID NodeID) {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	if m.connected[peerID] {
+		m.broadcast(PeerUpdate{
+			PeerID: peerID,
+			Status: PeerStatusUp,
+		})
+	}
+}
+
+// Disconnected unmarks a peer as connected, allowing new connections to be
+// established.
+func (m *peerManager) Disconnected(peerID NodeID) error {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	delete(m.connected, peerID)
+	m.broadcast(PeerUpdate{
+		PeerID: peerID,
+		Status: PeerStatusDown,
+	})
+	return nil
+}
+
+// peerStore stores information about peers. It is currently a bare-bones
+// in-memory store, and will be fleshed out later.
+//
+// peerStore is not thread-safe, since it assumes it is only used by peerManager,
+// which handles concurrency control. This allows multiple operations to be
+// executed atomically, since the peerManager will hold a mutex while executing.
+type peerStore struct {
+	peers map[NodeID]peerInfo
+}
+
+// newPeerStore creates a new peer store.
+func newPeerStore() *peerStore {
+	return &peerStore{
+		peers: map[NodeID]peerInfo{},
 	}
+}
+
+// Get fetches a peer, returning nil if not found.
+func (s *peerStore) Get(id NodeID) (*peerInfo, error) {
 	peer, ok := s.peers[id]
 	if !ok {
-		peer = newStorePeer(id)
-		s.peers[id] = peer
+		return nil, nil
 	}
-	s.claimed[id] = true
-	return peer
+	return &peer, nil
 }
 
-// Dispense finds an appropriate peer to contact and claims it. The caller has
-// exclusive ownership of the peer, and must return it by calling Return(). The
-// peer will not be dispensed again until returned.
-//
-// Returns nil if no appropriate peers are available.
-func (s *peerStore) Dispense() *peerInfo {
-	s.mtx.Lock()
-	defer s.mtx.Unlock()
-	for key, peer := range s.peers {
-		switch {
-		case len(peer.Addresses) == 0:
-		case s.claimed[key]:
-		default:
-			s.claimed[key] = true
-			return peer
-		}
+// Set stores peer data.
+func (s *peerStore) Set(peer *peerInfo) error {
+	if peer == nil {
+		return errors.New("peer cannot be nil")
 	}
+	s.peers[peer.ID] = *peer
 	return nil
 }
 
-// Return returns a claimed peer, making it available for other
-// callers to claim.
-func (s *peerStore) Return(id NodeID) {
-	s.mtx.Lock()
-	defer s.mtx.Unlock()
-	delete(s.claimed, id)
+// List retrieves all peers.
+func (s *peerStore) List() ([]*peerInfo, error) {
+	peers := []*peerInfo{}
+	for _, peer := range s.peers {
+		peer := peer
+		peers = append(peers, &peer)
+	}
+	return peers, nil
 }
 
-// peerInfo is a peer stored in the peerStore.
+// peerInfo contains peer information stored in a peerStore.
 //
 // FIXME: This should be renamed peer or something else once the old peer is
 // removed.
@@ -313,8 +499,8 @@ type peerInfo struct {
 	Addresses []PeerAddress
 }
 
-// newStorePeer creates a new storePeer.
-func newStorePeer(id NodeID) *peerInfo {
+// newPeerInfo creates a new peerInfo.
+func newPeerInfo(id NodeID) *peerInfo {
 	return &peerInfo{
 		ID:        id,
 		Addresses: []PeerAddress{},
@@ -322,16 +508,17 @@ func newStorePeer(id NodeID) *peerInfo {
 }
 
 // AddAddress adds an address to a peer, unless it already exists. It does not
-// validate the address.
-func (p *peerInfo) AddAddress(address PeerAddress) {
+// validate the address. Returns true if the address was new.
+func (p *peerInfo) AddAddress(address PeerAddress) bool {
 	// We just do a linear search for now.
 	addressString := address.String()
 	for _, a := range p.Addresses {
 		if a.String() == addressString {
-			return
+			return false
 		}
 	}
 	p.Addresses = append(p.Addresses, address)
+	return true
 }
 
 // ============================================================================
diff --git a/p2p/router.go b/p2p/router.go
index ec63447f2..f33ff1c65 100644
--- a/p2p/router.go
+++ b/p2p/router.go
@@ -2,7 +2,6 @@ package p2p
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"io"
 	"sync"
@@ -73,9 +72,9 @@ import (
 // forever on a channel that has no consumer.
 type Router struct {
 	*service.BaseService
-	logger     log.Logger
-	transports map[Protocol]Transport
-	store      *peerStore
+	logger      log.Logger
+	transports  map[Protocol]Transport
+	peerManager *peerManager
 
 	// FIXME: Consider using sync.Map.
 	peerMtx    sync.RWMutex
@@ -88,10 +87,6 @@ type Router struct {
 	channelQueues   map[ChannelID]queue
 	channelMessages map[ChannelID]proto.Message
 
-	peerUpdatesCh   chan PeerUpdate
-	peerUpdatesMtx  sync.RWMutex
-	peerUpdatesSubs map[*PeerUpdatesCh]*PeerUpdatesCh // keyed by struct identity (address)
-
 	// stopCh is used to signal router shutdown, by closing the channel.
 	stopCh chan struct{}
 }
@@ -105,18 +100,16 @@ func NewRouter(logger log.Logger, transports map[Protocol]Transport, peers []Pee
 	router := &Router{
 		logger:          logger,
 		transports:      transports,
-		store:           newPeerStore(),
+		peerManager:     newPeerManager(newPeerStore()),
 		stopCh:          make(chan struct{}),
 		channelQueues:   map[ChannelID]queue{},
 		channelMessages: map[ChannelID]proto.Message{},
 		peerQueues:      map[NodeID]queue{},
-		peerUpdatesCh:   make(chan PeerUpdate),
-		peerUpdatesSubs: map[*PeerUpdatesCh]*PeerUpdatesCh{},
 	}
 	router.BaseService = service.NewBaseService(logger, "router", router)
 
 	for _, address := range peers {
-		if err := router.store.Add(address); err != nil {
+		if err := router.peerManager.Add(address); err != nil {
 			logger.Error("failed to add peer", "address", address, "err", err)
 		}
 	}
@@ -259,26 +252,31 @@ func (r *Router) acceptPeers(transport Transport) {
 			continue
 		}
 
-		peerID := conn.NodeInfo().NodeID
-		if r.store.Claim(peerID) == nil {
-			r.logger.Error("already connected to peer, rejecting connection", "peer", peerID)
-			_ = conn.Close()
-			continue
-		}
+		go func() {
+			defer func() {
+				_ = conn.Close()
+			}()
+
+			peerID := conn.NodeInfo().NodeID
+			if err := r.peerManager.Accepted(peerID); err != nil {
+				r.logger.Error("failed to accept connection", "peer", peerID, "err", err)
+				return
+			}
 
-		queue := newFIFOQueue()
-		r.peerMtx.Lock()
-		r.peerQueues[peerID] = queue
-		r.peerMtx.Unlock()
+			queue := newFIFOQueue()
+			r.peerMtx.Lock()
+			r.peerQueues[peerID] = queue
+			r.peerMtx.Unlock()
+			r.peerManager.Ready(peerID)
 
-		go func() {
 			defer func() {
 				r.peerMtx.Lock()
 				delete(r.peerQueues, peerID)
 				r.peerMtx.Unlock()
 				queue.close()
-				_ = conn.Close()
-				r.store.Return(peerID)
+				if err := r.peerManager.Disconnected(peerID); err != nil {
+					r.logger.Error("failed to disconnect peer", "peer", peerID, "err", err)
+				}
 			}()
 
 			r.routePeer(peerID, conn, queue)
@@ -295,8 +293,11 @@ func (r *Router) dialPeers() {
 		default:
 		}
 
-		peer := r.store.Dispense()
-		if peer == nil {
+		peerID, address, err := r.peerManager.DialNext()
+		if err != nil {
+			r.logger.Error("failed to find next peer to dial", "err", err)
+			return
+		} else if peerID == "" {
 			r.logger.Debug("no eligible peers, sleeping")
 			select {
 			case <-time.After(time.Second):
@@ -307,63 +308,75 @@ func (r *Router) dialPeers() {
 		}
 
 		go func() {
-			defer r.store.Return(peer.ID)
-			conn, err := r.dialPeer(peer)
+			conn, err := r.dialPeer(address)
 			if err != nil {
-				r.logger.Error("failed to dial peer, will retry", "peer", peer.ID)
+				r.logger.Error("failed to dial peer, will retry", "peer", peerID, "err", err)
+				if err = r.peerManager.DialFailed(peerID, address); err != nil {
+					r.logger.Error("failed to report dial failure", "peer", peerID, "err", err)
+				}
 				return
 			}
 			defer conn.Close()
 
+			if err = r.peerManager.Dialed(peerID, address); err != nil {
+				r.logger.Error("failed to mark peer as dialed", "peer", peerID, "err", err)
+				return
+			}
+
 			queue := newFIFOQueue()
-			defer queue.close()
 			r.peerMtx.Lock()
-			r.peerQueues[peer.ID] = queue
+			r.peerQueues[peerID] = queue
 			r.peerMtx.Unlock()
+			r.peerManager.Ready(peerID)
 
 			defer func() {
 				r.peerMtx.Lock()
-				delete(r.peerQueues, peer.ID)
+				delete(r.peerQueues, peerID)
 				r.peerMtx.Unlock()
+				queue.close()
+				if err := r.peerManager.Disconnected(peerID); err != nil {
+					r.logger.Error("failed to disconnect peer", "peer", peerID, "err", err)
+				}
 			}()
 
-			r.routePeer(peer.ID, conn, queue)
+			r.routePeer(peerID, conn, queue)
 		}()
 	}
 }
 
 // dialPeer attempts to connect to a peer.
-func (r *Router) dialPeer(peer *peerInfo) (Connection, error) {
+func (r *Router) dialPeer(address PeerAddress) (Connection, error) {
 	ctx := context.Background()
 
-	for _, address := range peer.Addresses {
-		resolveCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
-		defer cancel()
-		r.logger.Info("resolving peer address", "peer", peer.ID, "address", address)
-		endpoints, err := address.Resolve(resolveCtx)
-		if err != nil {
-			r.logger.Error("failed to resolve address", "address", address, "err", err)
+	resolveCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+	defer cancel()
+
+	r.logger.Info("resolving peer address", "address", address)
+
+	endpoints, err := address.Resolve(resolveCtx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to resolve address %q: %w", address, err)
+	}
+
+	for _, endpoint := range endpoints {
+		t, ok := r.transports[endpoint.Protocol]
+		if !ok {
+			r.logger.Error("no transport found for protocol", "protocol", endpoint.Protocol)
 			continue
 		}
-		for _, endpoint := range endpoints {
-			t, ok := r.transports[endpoint.Protocol]
-			if !ok {
-				r.logger.Error("no transport found for protocol", "protocol", endpoint.Protocol)
-				continue
-			}
-			dialCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
-			defer cancel()
-			conn, err := t.Dial(dialCtx, endpoint)
-			if err != nil {
-				r.logger.Error("failed to dial endpoint", "endpoint", endpoint)
-			} else {
-				r.logger.Info("connected to peer", "peer", peer.ID, "endpoint", endpoint)
-				return conn, nil
-			}
+
+		dialCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+		defer cancel()
+
+		conn, err := t.Dial(dialCtx, endpoint)
+		if err != nil {
+			r.logger.Error("failed to dial endpoint", "endpoint", endpoint, "err", err)
+		} else {
+			r.logger.Info("connected to peer", "peer", address.NodeID(), "endpoint", endpoint)
+			return conn, nil
 		}
 	}
 
-	return nil, errors.New("failed to connect to peer")
+	return nil, fmt.Errorf("failed to connect to peer via %q", address)
 }
 
 // routePeer routes inbound messages from a peer to channels, and also sends
@@ -372,18 +385,7 @@ func (r *Router) dialPeer(peer *peerInfo) (Connection, error) {
 // sendPeer() goroutines. It blocks until the peer is done, e.g. when the
 // connection or queue is closed.
 func (r *Router) routePeer(peerID NodeID, conn Connection, sendQueue queue) {
-	// FIXME: Peer updates should probably be handled by the peer store.
-	r.peerUpdatesCh <- PeerUpdate{
-		PeerID: peerID,
-		Status: PeerStatusUp,
-	}
-	defer func() {
-		r.peerUpdatesCh <- PeerUpdate{
-			PeerID: peerID,
-			Status: PeerStatusDown,
-		}
-	}()
-
+	r.logger.Info("routing peer", "peer", peerID)
 	resultsCh := make(chan error, 2)
 	go func() {
 		resultsCh <- r.receivePeer(peerID, conn)
@@ -479,64 +481,19 @@ func (r *Router) sendPeer(peerID NodeID, conn Connection, queue queue) error {
 }
 
 // SubscribePeerUpdates creates a new peer updates subscription. The caller must
-// consume the peer updates in a timely fashion, since delivery is guaranteed and
-// will block peer connection/disconnection otherwise.
-func (r *Router) SubscribePeerUpdates() (*PeerUpdatesCh, error) {
-	// FIXME: We may want to use a size 1 buffer here. When the router
-	// broadcasts a peer update it has to loop over all of the
-	// subscriptions, and we want to avoid blocking and waiting for a
-	// context switch before continuing to the next subscription. This also
-	// prevents tail latencies from compounding across updates. We also want
-	// to make sure the subscribers are reasonably in sync, so it should be
-	// kept at 1. However, this should be benchmarked first.
-	peerUpdates := NewPeerUpdates(make(chan PeerUpdate))
-	r.peerUpdatesMtx.Lock()
-	r.peerUpdatesSubs[peerUpdates] = peerUpdates
-	r.peerUpdatesMtx.Unlock()
-
-	go func() {
-		select {
-		case <-peerUpdates.Done():
-			r.peerUpdatesMtx.Lock()
-			delete(r.peerUpdatesSubs, peerUpdates)
-			r.peerUpdatesMtx.Unlock()
-		case <-r.stopCh:
-		}
-	}()
-	return peerUpdates, nil
-}
-
-// broadcastPeerUpdates broadcasts peer updates received from the router
-// to all subscriptions.
-func (r *Router) broadcastPeerUpdates() {
-	for {
-		select {
-		case peerUpdate := <-r.peerUpdatesCh:
-			subs := []*PeerUpdatesCh{}
-			r.peerUpdatesMtx.RLock()
-			for _, sub := range r.peerUpdatesSubs {
-				subs = append(subs, sub)
-			}
-			r.peerUpdatesMtx.RUnlock()
-
-			for _, sub := range subs {
-				select {
-				case sub.updatesCh <- peerUpdate:
-				case <-sub.doneCh:
-				case <-r.stopCh:
-					return
-				}
-			}
-
-		case <-r.stopCh:
-			return
-		}
-	}
+// consume the peer updates in a timely fashion and close the subscription when
+// done, since delivery is guaranteed and will block peer
+// connection/disconnection otherwise.
+//
+// FIXME: Consider having callers just use peerManager.Subscribe() directly, if
+// we export peerManager and make it an injected dependency (which we probably
+// should).
+func (r *Router) SubscribePeerUpdates() *PeerUpdatesCh {
+	return r.peerManager.Subscribe()
 }
 
 // OnStart implements service.Service.
 func (r *Router) OnStart() error {
-	go r.broadcastPeerUpdates()
 	go r.dialPeers()
 	for _, transport := range r.transports {
 		go r.acceptPeers(transport)
diff --git a/p2p/router_test.go b/p2p/router_test.go
index 042200909..393cbd407 100644
--- a/p2p/router_test.go
+++ b/p2p/router_test.go
@@ -60,8 +60,7 @@ func TestRouter(t *testing.T) {
 	}, peers)
 	channel, err := router.OpenChannel(chID, &TestMessage{})
 	require.NoError(t, err)
-	peerUpdates, err := router.SubscribePeerUpdates()
-	require.NoError(t, err)
+	peerUpdates := router.SubscribePeerUpdates()
 
 	err = router.Start()
 	require.NoError(t, err)
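Reviewer sketch (not part of the patch): the snippet below walks the outbound flow from the peerManager doc comment in the order the router is expected to drive it. dialOne and the dial callback are hypothetical names introduced for illustration; everything else is the API added above, and the sketch assumes it sits in package p2p.

// dialOne drives one outbound connection through the peerManager state
// machine: DialNext -> Dialed (or DialFailed) -> Ready -> Disconnected.
// dial stands in for the router's transport dialing.
func dialOne(m *peerManager, dial func(PeerAddress) error) error {
	peerID, address, err := m.DialNext() // marks the peer as dialing
	if err != nil {
		return err
	}
	if peerID == "" {
		return nil // no eligible peers right now; a real caller would sleep and retry
	}
	if err := dial(address); err != nil {
		// Dial failed: unmark as dialing so DialNext can offer the peer again.
		return m.DialFailed(peerID, address)
	}
	// Claim the peer's single connection slot; this errors if an inbound
	// connection for the same peer won the race via Accepted.
	if err := m.Dialed(peerID, address); err != nil {
		return err
	}
	m.Ready(peerID) // broadcasts PeerStatusUp once the router's queues are set up
	// ... route messages until the connection closes ...
	return m.Disconnected(peerID) // frees the slot and broadcasts PeerStatusDown
}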
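The inbound side is shorter, and shows how the separate dialing/connected maps resolve a simultaneous dial: Accepted claims the peer's single connected slot first, so a concurrent Dialed for the same peer errors and the outbound goroutine backs off, exactly as the doc comment describes. acceptOne is again a hypothetical name for illustration.

// acceptOne drives one inbound connection: Accepted -> Ready -> Disconnected.
func acceptOne(m *peerManager, peerID NodeID) error {
	// Accepted errors if the peer is already connected (in either direction),
	// in which case the caller should reject and close this connection.
	if err := m.Accepted(peerID); err != nil {
		return err
	}
	m.Ready(peerID)
	// ... route messages until the connection closes ...
	return m.Disconnected(peerID)
}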
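Finally, because broadcast() holds the manager mutex and blocks until every subscription has received each update (or been closed), subscribers must drain their channel promptly or they stall connection and disconnection for the whole node. A minimal consumer sketch, reading the unexported channels shown in the diff and therefore assuming package p2p; consumeUpdates is a hypothetical name:

// consumeUpdates drains a peer updates subscription until it is closed.
func consumeUpdates(m *peerManager, handle func(PeerUpdate)) {
	sub := m.Subscribe()
	for {
		select {
		case update := <-sub.updatesCh:
			handle(update)
		case <-sub.doneCh:
			return // the subscription was closed
		}
	}
}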