From 96215a06ed6fba31e105d88441bd63102b7aef13 Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Mon, 18 Jan 2021 19:56:13 +0100
Subject: [PATCH] p2p: add prototype peer lifecycle manager (#5882)

This adds a prototype peer lifecycle manager, `peerManager`, which stores peer
data in an internal `peerStore`. The overall idea here is to have methods for
peer lifecycle events which exchange a very narrow subset of peer data, and to
keep all of the peer metadata (i.e. the `peerInfo` struct) internal, to
decouple this from the router and simplify concurrency control. See the
`peerManager` GoDoc for more information.

The router is still responsible for actually dialing and accepting peer
connections, and for routing messages across them, but the peer manager is
responsible for determining which peers to dial next, preventing multiple
connections being established for the same peer (e.g. both inbound and
outbound), and making sure we don't dial the same peer several times in
parallel. Later it will also track retries and exponential backoff, as well
as peer and address quality. It also assumes responsibility for peer update
subscriptions.

It's a bit unclear to me whether we want the peer manager to take on the
responsibility of actually dialing and accepting connections as well, or if
it should only track peer state for the router while the router is
responsible for all transport concerns. Let's revisit this later.
---
 p2p/peer.go        | 327 ++++++++++++++++++++++++++++++++++----------
 p2p/router.go      | 205 +++++++++++------------------
 p2p/router_test.go |   3 +-
 3 files changed, 339 insertions(+), 196 deletions(-)

diff --git a/p2p/peer.go b/p2p/peer.go
index a9839ddfe..235d4d3ae 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -214,97 +214,283 @@ type PeerUpdate struct {
 	Status PeerStatus
 }
 
-// peerStore manages information about peers. It is currently a bare-bones
-// in-memory store of peer addresses, and will be fleshed out later.
+// peerManager manages peer information, using a peerStore for underlying
+// storage. Its primary purpose is to determine which peers to connect to next,
+// make sure a peer only has a single active connection (either inbound or outbound),
+// and to avoid dialing the same peer in parallel goroutines.
 //
-// The main function of peerStore is currently to dispense peers to connect to
-// (via peerStore.Dispense), giving the caller exclusive "ownership" of that
-// peer until the peer is returned (via peerStore.Return). This is used to
-// schedule and synchronize peer dialing and accepting in the Router, e.g.
-// making sure we only have a single connection (in either direction) to peers.
-type peerStore struct {
-	mtx     sync.Mutex
-	peers   map[NodeID]*peerInfo
-	claimed map[NodeID]bool
+// For an outbound connection, the flow is as follows:
+// - DialNext: returns a peer address to dial, marking the peer as dialing.
+// - DialFailed: reports a dial failure, unmarking the peer as dialing.
+// - Dialed: successfully dialed, unmarking as dialing and marking as connected
+//   (or erroring if already connected).
+// - Ready: routing is up, broadcasts a PeerStatusUp peer update to subscribers.
+// - Disconnected: peer disconnects, unmarking as connected and broadcasts a
+//   PeerStatusDown peer update.
+//
+// For an inbound connection, the flow is as follows:
+// - Accepted: successfully accepted connection, marking as connected (or erroring
+//   if already connected).
+// - Ready: routing is up, broadcasts a PeerStatusUp peer update to subscribers.
+// - Disconnected: peer disconnects, unmarking as connected and broadcasts a
+//   PeerStatusDown peer update.
+//
+// We track dialing and connected states independently. This allows us to accept
+// an inbound connection from a peer while the router is also dialing an
+// outbound connection to that same peer, which will cause the dialer to
+// eventually error (when attempting to mark the peer as connected). This also
+// avoids race conditions where multiple goroutines may end up dialing a peer if
+// an incoming connection was briefly accepted and disconnected while we were
+// also dialing.
+type peerManager struct {
+	mtx           sync.Mutex
+	store         *peerStore
+	dialing       map[NodeID]bool
+	connected     map[NodeID]bool
+	subscriptions map[*PeerUpdatesCh]*PeerUpdatesCh // keyed by struct identity (address)
+}
+
+// newPeerManager creates a new peer manager.
+func newPeerManager(store *peerStore) *peerManager {
+	return &peerManager{
+		store:         store,
+		dialing:       map[NodeID]bool{},
+		connected:     map[NodeID]bool{},
+		subscriptions: map[*PeerUpdatesCh]*PeerUpdatesCh{},
+	}
+}
+
+// Add adds a peer to the manager, given as an address. If the peer already
+// exists, the address is added to it.
+func (m *peerManager) Add(address PeerAddress) error {
+	if err := address.Validate(); err != nil {
+		return err
+	}
+	peerID := address.NodeID()
+
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	peer, err := m.store.Get(peerID)
+	if err != nil {
+		return err
+	}
+	if peer == nil {
+		peer = newPeerInfo(peerID)
+	}
+	if peer.AddAddress(address) {
+		return m.store.Set(peer)
+	}
+	return nil
 }
 
-// newPeerStore creates a new peer store.
-func newPeerStore() *peerStore {
-	return &peerStore{
-		peers:   map[NodeID]*peerInfo{},
-		claimed: map[NodeID]bool{},
+// Subscribe subscribes to peer updates. The caller must consume the peer
+// updates in a timely fashion and close the subscription when done, since
+// delivery is guaranteed and will block peer connection/disconnection
+// otherwise.
+func (m *peerManager) Subscribe() *PeerUpdatesCh {
+	// FIXME: We may want to use a size 1 buffer here. When the router
+	// broadcasts a peer update it has to loop over all of the
+	// subscriptions, and we want to avoid blocking and waiting for a
+	// context switch before continuing to the next subscription. This also
+	// prevents tail latencies from compounding across updates. We also want
+	// to make sure the subscribers are reasonably in sync, so it should be
+	// kept at 1. However, this should be benchmarked first.
+	peerUpdates := NewPeerUpdates(make(chan PeerUpdate))
+	m.mtx.Lock()
+	m.subscriptions[peerUpdates] = peerUpdates
+	m.mtx.Unlock()
+
+	go func() {
+		<-peerUpdates.Done()
+		m.mtx.Lock()
+		delete(m.subscriptions, peerUpdates)
+		m.mtx.Unlock()
+	}()
+	return peerUpdates
+}
+
+// broadcast broadcasts a peer update to all subscriptions. The caller must
+// already hold the mutex lock. This means the mutex is held for the duration
+// of the broadcast, which is what we want: it makes sure all subscriptions
+// receive all updates in the same order.
+//
+// FIXME: Consider using more fine-grained mutexes here, and/or a channel to
+// enforce ordering of updates.
+func (m *peerManager) broadcast(peerUpdate PeerUpdate) {
+	for _, sub := range m.subscriptions {
+		select {
+		case sub.updatesCh <- peerUpdate:
+		case <-sub.doneCh:
+		}
 	}
 }
 
-// Add adds a peer to the store, given as an address.
-func (s *peerStore) Add(address PeerAddress) error {
-	if err := address.Validate(); err != nil {
+// DialNext finds an appropriate peer address to dial, and marks it as dialing.
+// The peer will not be returned again until Dialed() or DialFailed() is called
+// for the peer and it is no longer connected.
+//
+// Returns an empty ID if no appropriate peers are available.
+func (m *peerManager) DialNext() (NodeID, PeerAddress, error) {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	peers, err := m.store.List()
+	if err != nil {
+		return "", PeerAddress{}, err
+	}
+	for _, peer := range peers {
+		switch {
+		case len(peer.Addresses) == 0:
+		case m.dialing[peer.ID]:
+		case m.connected[peer.ID]:
+		default:
+			// FIXME: We currently only dial the first address, but we should
+			// track connection statistics for each address and return the most
+			// appropriate one.
+			m.dialing[peer.ID] = true
+			return peer.ID, peer.Addresses[0], nil
+		}
+	}
+	return "", PeerAddress{}, nil
+}
+
+// DialFailed reports a failed dial attempt. This will make the peer available
+// for dialing again when appropriate.
+func (m *peerManager) DialFailed(peerID NodeID, address PeerAddress) error {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	delete(m.dialing, peerID)
+	// FIXME: We need to track address quality statistics and exponential backoff.
+	return nil
+}
+
+// Dialed marks a peer as successfully dialed. Any further incoming connections
+// will be rejected, and once disconnected the peer may be dialed again.
+func (m *peerManager) Dialed(peerID NodeID, address PeerAddress) error {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
+	peer, err := m.store.Get(peerID)
+	if err != nil {
 		return err
+	} else if peer == nil {
+		return fmt.Errorf("unknown peer %q", peerID)
 	}
-	peerID := address.NodeID()
-	s.mtx.Lock()
-	defer s.mtx.Unlock()
+	if m.connected[peerID] {
+		return fmt.Errorf("peer %v is already connected", peerID)
+	}
+	delete(m.dialing, peerID)
+	m.connected[peerID] = true
+	return nil
+}
 
-	peer, ok := s.peers[peerID]
-	if !ok {
-		peer = newStorePeer(peerID)
-		s.peers[peerID] = peer
-	} else if s.claimed[peerID] {
-		// FIXME: We need to handle modifications of claimed peers somehow.
-		return fmt.Errorf("peer %q is claimed", peerID)
+// Accepted marks an incoming peer connection successfully accepted. If the peer
+// is already connected this will return an error.
+//
+// NOTE: We can't take an address here, since e.g. TCP uses a different port
+// number for outbound traffic than inbound traffic, so the peer's endpoint
+// wouldn't necessarily be an appropriate address to dial.
+func (m *peerManager) Accepted(peerID NodeID) error {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	peer, err := m.store.Get(peerID)
+	if err != nil {
+		return err
+	} else if peer == nil {
+		peer = newPeerInfo(peerID)
+		if err = m.store.Set(peer); err != nil {
+			return err
+		}
+	}
+	if m.connected[peerID] {
+		return fmt.Errorf("peer %q is already connected", peerID)
 	}
-	peer.AddAddress(address)
+	m.connected[peerID] = true
 	return nil
 }
 
-// Claim claims a peer. The caller has exclusive ownership of the peer, and must
-// return it by calling Return(). Returns nil if the peer could not be claimed.
-// If the peer is not known to the store, it is registered and claimed.
-func (s *peerStore) Claim(id NodeID) *peerInfo {
-	s.mtx.Lock()
-	defer s.mtx.Unlock()
-	if s.claimed[id] {
-		return nil
+// Ready marks a peer as ready, broadcasting status updates to subscribers. The
+// peer must already be marked as connected. This is separate from Dialed() and
+// Accepted() to allow the router to set up its internal queues before reactors
+// start sending messages (holding the Router.peerMtx mutex while calling
+// Accepted or Dialed will halt all message routing while peers are set up, which
+// is too expensive and also causes difficulties in tests where we may want to
+// consume peer updates and send messages sequentially).
+//
+// FIXME: This possibly indicates an architectural problem. Should the peerManager
+// handle actual network connections to/from peers as well? Or should all of this
+// be done by the router?
+func (m *peerManager) Ready(peerID NodeID) {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	connected := m.connected[peerID]
+	if connected {
+		m.broadcast(PeerUpdate{
+			PeerID: peerID,
+			Status: PeerStatusUp,
+		})
+	}
+}
+
+// Disconnected unmarks a peer as connected, allowing new connections to be
+// established.
+func (m *peerManager) Disconnected(peerID NodeID) error {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	delete(m.connected, peerID)
+	m.broadcast(PeerUpdate{
+		PeerID: peerID,
+		Status: PeerStatusDown,
+	})
+	return nil
+}
+
+// peerStore stores information about peers. It is currently a bare-bones
+// in-memory store, and will be fleshed out later.
+//
+// peerStore is not thread-safe, since it assumes it is only used by peerManager
+// which handles concurrency control. This allows multiple operations to be
+// executed atomically, since the peerManager will hold a mutex while executing.
+type peerStore struct {
+	peers map[NodeID]peerInfo
+}
+
+// newPeerStore creates a new peer store.
+func newPeerStore() *peerStore {
+	return &peerStore{
+		peers: map[NodeID]peerInfo{},
 	}
+}
+
+// Get fetches a peer, returning nil if not found.
+func (s *peerStore) Get(id NodeID) (*peerInfo, error) {
 	peer, ok := s.peers[id]
 	if !ok {
-		peer = newStorePeer(id)
-		s.peers[id] = peer
+		return nil, nil
 	}
-	s.claimed[id] = true
-	return peer
+	return &peer, nil
 }
 
-// Dispense finds an appropriate peer to contact and claims it. The caller has
-// exclusive ownership of the peer, and must return it by calling Return(). The
-// peer will not be dispensed again until returned.
-//
-// Returns nil if no appropriate peers are available.
-func (s *peerStore) Dispense() *peerInfo {
-	s.mtx.Lock()
-	defer s.mtx.Unlock()
-	for key, peer := range s.peers {
-		switch {
-		case len(peer.Addresses) == 0:
-		case s.claimed[key]:
-		default:
-			s.claimed[key] = true
-			return peer
-		}
+// Set stores peer data.
+func (s *peerStore) Set(peer *peerInfo) error {
+	if peer == nil {
+		return errors.New("peer cannot be nil")
 	}
+	s.peers[peer.ID] = *peer
 	return nil
 }
 
-// Return returns a claimed peer, making it available for other
-// callers to claim.
-func (s *peerStore) Return(id NodeID) {
-	s.mtx.Lock()
-	defer s.mtx.Unlock()
-	delete(s.claimed, id)
+// List retrieves all peers.
+func (s *peerStore) List() ([]*peerInfo, error) {
+	peers := []*peerInfo{}
+	for _, peer := range s.peers {
+		peer := peer
+		peers = append(peers, &peer)
+	}
+	return peers, nil
 }
 
-// peerInfo is a peer stored in the peerStore.
+// peerInfo contains peer information stored in a peerStore.
 //
 // FIXME: This should be renamed peer or something else once the old peer is
 // removed.
@@ -313,8 +499,8 @@ type peerInfo struct {
 	Addresses []PeerAddress
 }
 
-// newStorePeer creates a new storePeer.
-func newStorePeer(id NodeID) *peerInfo {
+// newPeerInfo creates a new peerInfo.
+func newPeerInfo(id NodeID) *peerInfo {
 	return &peerInfo{
 		ID:        id,
 		Addresses: []PeerAddress{},
@@ -322,16 +508,17 @@ func newStorePeer(id NodeID) *peerInfo {
 }
 
 // AddAddress adds an address to a peer, unless it already exists. It does not
-// validate the address.
-func (p *peerInfo) AddAddress(address PeerAddress) {
+// validate the address. Returns true if the address was new.
+func (p *peerInfo) AddAddress(address PeerAddress) bool {
 	// We just do a linear search for now.
 	addressString := address.String()
 	for _, a := range p.Addresses {
 		if a.String() == addressString {
-			return
+			return false
 		}
 	}
 	p.Addresses = append(p.Addresses, address)
+	return true
 }
 
 // ============================================================================
diff --git a/p2p/router.go b/p2p/router.go
index ec63447f2..f33ff1c65 100644
--- a/p2p/router.go
+++ b/p2p/router.go
@@ -2,7 +2,6 @@ package p2p
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"io"
 	"sync"
@@ -73,9 +72,9 @@ import (
 // forever on a channel that has no consumer.
 type Router struct {
 	*service.BaseService
-	logger     log.Logger
-	transports map[Protocol]Transport
-	store      *peerStore
+	logger      log.Logger
+	transports  map[Protocol]Transport
+	peerManager *peerManager
 
 	// FIXME: Consider using sync.Map.
 	peerMtx    sync.RWMutex
@@ -88,10 +87,6 @@ type Router struct {
 	channelQueues   map[ChannelID]queue
 	channelMessages map[ChannelID]proto.Message
 
-	peerUpdatesCh   chan PeerUpdate
-	peerUpdatesMtx  sync.RWMutex
-	peerUpdatesSubs map[*PeerUpdatesCh]*PeerUpdatesCh // keyed by struct identity (address)
-
 	// stopCh is used to signal router shutdown, by closing the channel.
 	stopCh chan struct{}
 }
@@ -105,18 +100,16 @@ func NewRouter(logger log.Logger, transports map[Protocol]Transport, peers []Pee
 	router := &Router{
 		logger:          logger,
 		transports:      transports,
-		store:           newPeerStore(),
+		peerManager:     newPeerManager(newPeerStore()),
 		stopCh:          make(chan struct{}),
 		channelQueues:   map[ChannelID]queue{},
 		channelMessages: map[ChannelID]proto.Message{},
 		peerQueues:      map[NodeID]queue{},
-		peerUpdatesCh:   make(chan PeerUpdate),
-		peerUpdatesSubs: map[*PeerUpdatesCh]*PeerUpdatesCh{},
 	}
 	router.BaseService = service.NewBaseService(logger, "router", router)
 
 	for _, address := range peers {
-		if err := router.store.Add(address); err != nil {
+		if err := router.peerManager.Add(address); err != nil {
 			logger.Error("failed to add peer", "address", address, "err", err)
 		}
 	}
@@ -259,26 +252,31 @@ func (r *Router) acceptPeers(transport Transport) {
 			continue
 		}
 
-		peerID := conn.NodeInfo().NodeID
-		if r.store.Claim(peerID) == nil {
-			r.logger.Error("already connected to peer, rejecting connection", "peer", peerID)
-			_ = conn.Close()
-			continue
-		}
+		go func() {
+			defer func() {
+				_ = conn.Close()
+			}()
+
+			peerID := conn.NodeInfo().NodeID
+			if err := r.peerManager.Accepted(peerID); err != nil {
+				r.logger.Error("failed to accept connection", "peer", peerID, "err", err)
+				return
+			}
 
-		queue := newFIFOQueue()
-		r.peerMtx.Lock()
-		r.peerQueues[peerID] = queue
-		r.peerMtx.Unlock()
+			queue := newFIFOQueue()
+			r.peerMtx.Lock()
+			r.peerQueues[peerID] = queue
+			r.peerMtx.Unlock()
+			r.peerManager.Ready(peerID)
 
-		go func() {
 			defer func() {
 				r.peerMtx.Lock()
 				delete(r.peerQueues, peerID)
 				r.peerMtx.Unlock()
 				queue.close()
-				_ = conn.Close()
-				r.store.Return(peerID)
+				if err := r.peerManager.Disconnected(peerID); err != nil {
+					r.logger.Error("failed to disconnect peer", "peer", peerID, "err", err)
+				}
 			}()
 
 			r.routePeer(peerID, conn, queue)
@@ -295,8 +293,11 @@ func (r *Router) dialPeers() {
 		default:
 		}
 
-		peer := r.store.Dispense()
-		if peer == nil {
+		peerID, address, err := r.peerManager.DialNext()
+		if err != nil {
+			r.logger.Error("failed to find next peer to dial", "err", err)
+			return
+		} else if peerID == "" {
 			r.logger.Debug("no eligible peers, sleeping")
 			select {
 			case <-time.After(time.Second):
@@ -307,63 +308,75 @@ func (r *Router) dialPeers() {
 		}
 
 		go func() {
-			defer r.store.Return(peer.ID)
-			conn, err := r.dialPeer(peer)
+			conn, err := r.dialPeer(address)
 			if err != nil {
-				r.logger.Error("failed to dial peer, will retry", "peer", peer.ID)
+				r.logger.Error("failed to dial peer, will retry", "peer", peerID)
+				if err = r.peerManager.DialFailed(peerID, address); err != nil {
+					r.logger.Error("failed to report dial failure", "peer", peerID, "err", err)
+				}
 				return
 			}
 			defer conn.Close()
 
+			if err = r.peerManager.Dialed(peerID, address); err != nil {
+				r.logger.Error("failed to dial peer", "peer", peerID, "err", err)
+				return
+			}
+
 			queue := newFIFOQueue()
-			defer queue.close()
 			r.peerMtx.Lock()
-			r.peerQueues[peer.ID] = queue
+			r.peerQueues[peerID] = queue
 			r.peerMtx.Unlock()
+			r.peerManager.Ready(peerID)
 
 			defer func() {
 				r.peerMtx.Lock()
-				delete(r.peerQueues, peer.ID)
+				delete(r.peerQueues, peerID)
 				r.peerMtx.Unlock()
+				queue.close()
+				if err := r.peerManager.Disconnected(peerID); err != nil {
+					r.logger.Error("failed to disconnect peer", "peer", peerID, "err", err)
+				}
 			}()
 
-			r.routePeer(peer.ID, conn, queue)
+			r.routePeer(peerID, conn, queue)
 		}()
 	}
 }
 
 // dialPeer attempts to connect to a peer.
-func (r *Router) dialPeer(peer *peerInfo) (Connection, error) {
+func (r *Router) dialPeer(address PeerAddress) (Connection, error) {
 	ctx := context.Background()
 
-	for _, address := range peer.Addresses {
-		resolveCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
-		defer cancel()
-		r.logger.Info("resolving peer address", "peer", peer.ID, "address", address)
-		endpoints, err := address.Resolve(resolveCtx)
-		if err != nil {
-			r.logger.Error("failed to resolve address", "address", address, "err", err)
+	resolveCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+	defer cancel()
+
+	r.logger.Info("resolving peer address", "address", address)
+
+	endpoints, err := address.Resolve(resolveCtx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to resolve address %q: %w", address, err)
+	}
+
+	for _, endpoint := range endpoints {
+		t, ok := r.transports[endpoint.Protocol]
+		if !ok {
+			r.logger.Error("no transport found for protocol", "protocol", endpoint.Protocol)
 			continue
 		}
 
-		for _, endpoint := range endpoints {
-			t, ok := r.transports[endpoint.Protocol]
-			if !ok {
-				r.logger.Error("no transport found for protocol", "protocol", endpoint.Protocol)
-				continue
-			}
-			dialCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
-			defer cancel()
-			conn, err := t.Dial(dialCtx, endpoint)
-			if err != nil {
-				r.logger.Error("failed to dial endpoint", "endpoint", endpoint)
-			} else {
-				r.logger.Info("connected to peer", "peer", peer.ID, "endpoint", endpoint)
-				return conn, nil
-			}
+		dialCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+		defer cancel()
+
+		conn, err := t.Dial(dialCtx, endpoint)
+		if err != nil {
+			r.logger.Error("failed to dial endpoint", "endpoint", endpoint)
+		} else {
+			r.logger.Info("connected to peer", "peer", address.NodeID(), "endpoint", endpoint)
+			return conn, nil
 		}
 	}
 
-	return nil, errors.New("failed to connect to peer")
+	return nil, fmt.Errorf("failed to connect to peer via %q", address)
 }
 
 // routePeer routes inbound messages from a peer to channels, and also sends
@@ -372,18 +385,7 @@ func (r *Router) dialPeer(peer *peerInfo) (Connection, error) {
 // sendPeer() goroutines. It blocks until the peer is done, e.g. when the
 // connection or queue is closed.
 func (r *Router) routePeer(peerID NodeID, conn Connection, sendQueue queue) {
-	// FIXME: Peer updates should probably be handled by the peer store.
-	r.peerUpdatesCh <- PeerUpdate{
-		PeerID: peerID,
-		Status: PeerStatusUp,
-	}
-	defer func() {
-		r.peerUpdatesCh <- PeerUpdate{
-			PeerID: peerID,
-			Status: PeerStatusDown,
-		}
-	}()
-
+	r.logger.Info("routing peer", "peer", peerID)
 	resultsCh := make(chan error, 2)
 	go func() {
 		resultsCh <- r.receivePeer(peerID, conn)
@@ -479,64 +481,19 @@ func (r *Router) sendPeer(peerID NodeID, conn Connection, queue queue) error {
 }
 
 // SubscribePeerUpdates creates a new peer updates subscription. The caller must
-// consume the peer updates in a timely fashion, since delivery is guaranteed and
-// will block peer connection/disconnection otherwise.
-func (r *Router) SubscribePeerUpdates() (*PeerUpdatesCh, error) {
-	// FIXME: We may want to use a size 1 buffer here. When the router
-	// broadcasts a peer update it has to loop over all of the
-	// subscriptions, and we want to avoid blocking and waiting for a
-	// context switch before continuing to the next subscription. This also
-	// prevents tail latencies from compounding across updates. We also want
-	// to make sure the subscribers are reasonably in sync, so it should be
-	// kept at 1. However, this should be benchmarked first.
-	peerUpdates := NewPeerUpdates(make(chan PeerUpdate))
-	r.peerUpdatesMtx.Lock()
-	r.peerUpdatesSubs[peerUpdates] = peerUpdates
-	r.peerUpdatesMtx.Unlock()
-
-	go func() {
-		select {
-		case <-peerUpdates.Done():
-			r.peerUpdatesMtx.Lock()
-			delete(r.peerUpdatesSubs, peerUpdates)
-			r.peerUpdatesMtx.Unlock()
-		case <-r.stopCh:
-		}
-	}()
-	return peerUpdates, nil
-}
-
-// broadcastPeerUpdates broadcasts peer updates received from the router
-// to all subscriptions.
-func (r *Router) broadcastPeerUpdates() {
-	for {
-		select {
-		case peerUpdate := <-r.peerUpdatesCh:
-			subs := []*PeerUpdatesCh{}
-			r.peerUpdatesMtx.RLock()
-			for _, sub := range r.peerUpdatesSubs {
-				subs = append(subs, sub)
-			}
-			r.peerUpdatesMtx.RUnlock()
-
-			for _, sub := range subs {
-				select {
-				case sub.updatesCh <- peerUpdate:
-				case <-sub.doneCh:
-				case <-r.stopCh:
-					return
-				}
-			}
-
-		case <-r.stopCh:
-			return
-		}
-	}
+// consume the peer updates in a timely fashion and close the subscription when
+// done, since delivery is guaranteed and will block peer
+// connection/disconnection otherwise.
+//
+// FIXME: Consider having callers just use peerManager.Subscribe() directly, if
+// we export peerManager and make it an injected dependency (which we probably
+// should).
+func (r *Router) SubscribePeerUpdates() *PeerUpdatesCh {
+	return r.peerManager.Subscribe()
 }
 
 // OnStart implements service.Service.
 func (r *Router) OnStart() error {
-	go r.broadcastPeerUpdates()
 	go r.dialPeers()
 	for _, transport := range r.transports {
 		go r.acceptPeers(transport)
diff --git a/p2p/router_test.go b/p2p/router_test.go
index 042200909..393cbd407 100644
--- a/p2p/router_test.go
+++ b/p2p/router_test.go
@@ -60,8 +60,7 @@ func TestRouter(t *testing.T) {
 	}, peers)
 	channel, err := router.OpenChannel(chID, &TestMessage{})
 	require.NoError(t, err)
-	peerUpdates, err := router.SubscribePeerUpdates()
-	require.NoError(t, err)
+	peerUpdates := router.SubscribePeerUpdates()
 
 	err = router.Start()
 	require.NoError(t, err)
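Usage sketch (illustration only; not part of the patch). The function below is a minimal, hypothetical example of how a caller inside the p2p package might drive the outbound flow described in the peerManager GoDoc. The dial parameter stands in for the router's transport dialing, and queue setup and message routing are omitted:

// dialOne is a hypothetical sketch, not part of this change: it walks one
// outbound connection through the peerManager lifecycle (DialNext -> Dialed or
// DialFailed -> Ready -> Disconnected). The dial argument is an assumed
// stand-in for transport dialing.
func dialOne(pm *peerManager, dial func(PeerAddress) (Connection, error)) error {
	peerID, address, err := pm.DialNext()
	if err != nil {
		return err
	}
	if peerID == "" {
		return nil // no eligible peers right now
	}

	conn, err := dial(address)
	if err != nil {
		// Unmark the peer as dialing so it becomes eligible again later.
		return pm.DialFailed(peerID, address)
	}
	defer conn.Close()

	// Claim the single connected slot; this errors if an inbound connection
	// from the same peer won the race.
	if err := pm.Dialed(peerID, address); err != nil {
		return err
	}
	defer func() { _ = pm.Disconnected(peerID) }()

	// Ready is only called after the caller has set up its queues, so that
	// subscribers seeing PeerStatusUp can start sending messages immediately.
	pm.Ready(peerID)

	// ... route messages until the connection or queue is closed ...
	return nil
}

The inbound flow is analogous: Accepted takes the place of DialNext and Dialed, followed by the same Ready and Disconnected calls.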