From 670e9b427b92c2be2b36905188587d5b4abf4781 Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Thu, 21 Jan 2021 19:07:54 +0100
Subject: [PATCH] p2p: improve PeerManager prototype (#5936)

This improves the prototype peer manager by:

* Exporting `PeerManager`, making it accessible by e.g. reactors.
* Replacing `Router.SubscribePeerUpdates()` with `PeerManager.Subscribe()`.
* Tracking address/peer connection statistics, and retrying dial failures
  with exponential backoff.
* Prioritizing peers, with persistent peers configuration.
* Limiting simultaneous connections.
* Evicting peers and upgrading to higher-priority peers.
* Tracking peer heights, as a workaround for legacy shared peer state APIs.

This is getting to a point where we need to determine precise semantics and
implement tests, so we should figure out whether it's a reasonable abstraction
that we want to use. The main questions are around the API model (i.e.
synchronous method calls with the router polling the manager, vs. an
event-driven model using channels, vs. the peer manager calling methods on the
router to connect/disconnect peers), and who should have the responsibility of
managing actual connections (currently the router, while the manager only
tracks peer state).
---
 p2p/peer.go        | 496 ++++++++++++++++++++++++++++++++++++---------
 p2p/router.go      |  71 ++++---
 p2p/router_test.go |  25 ++-
 3 files changed, 468 insertions(+), 124 deletions(-)

diff --git a/p2p/peer.go b/p2p/peer.go
index 235d4d3ae..8cace63c5 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -5,9 +5,12 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"math"
+	"math/rand"
 	"net"
 	"net/url"
 	"runtime/debug"
+	"sort"
 	"strconv"
 	"sync"
 	"time"
@@ -134,15 +137,6 @@ const (
 	PeerStatusBanned = PeerStatus("banned") // Peer which is banned for misbehavior.
 )
 
-// PeerPriority specifies peer priorities.
-type PeerPriority int
-
-const (
-	PeerPriorityNormal PeerPriority = iota + 1
-	PeerPriorityValidator
-	PeerPriorityPersistent
-)
-
 // PeerError is a peer error reported by a reactor via the Error channel. The
 // severity may cause the peer to be disconnected or banned depending on policy.
 type PeerError struct {
@@ -214,10 +208,20 @@ type PeerUpdate struct {
 	Status PeerStatus
 }
 
-// peerManager manages peer information, using a peerStore for underlying
-// storage. Its primary purpose is to determine which peers to connect to next,
-// make sure a peer only has a single active connection (either inbound or outbound),
-// and to avoid dialing the same peer in parallel goroutines.
+// PeerScore is a numeric score assigned to a peer (higher is better).
+type PeerScore uint16
+
+const (
+	// PeerScorePersistent is added for persistent peers.
+	PeerScorePersistent PeerScore = 100
+)
+
+// PeerManager manages peer lifecycle information, using a peerStore for
+// underlying storage. Its primary purpose is to determine which peers to
+// connect to next, make sure a peer only has a single active connection (either
+// inbound or outbound), and evict peers to make room for higher-scored peers.
+// It does not manage actual connections (this is handled by the Router),
+// only the peer lifecycle state.
 //
 // For an outbound connection, the flow is as follows:
 // - DialNext: returns a peer address to dial, marking the peer as dialing.
@@ -235,6 +239,12 @@ type PeerUpdate struct {
 // - Disconnected: peer disconnects, unmarking as connected and broadcasts a
 //   PeerStatusDown peer update.
 //
+// If we need to evict a peer, typically because we have connected to additional
+// higher-scored peers and need to shed lower-scored ones, the flow is as follows:
+// - EvictNext: returns a peer ID to evict, marking the peer as evicting.
+// - Disconnected: peer was disconnected, unmarking as connected and evicting,
+//   and broadcasts a PeerStatusDown peer update.
+//
 // We track dialing and connected states independently. This allows us to accept
 // an inbound connection from a peer while the router is also dialing an
 // outbound connection to that same peer, which will cause the dialer to
@@ -242,52 +252,114 @@ type PeerUpdate struct {
 // avoids race conditions where multiple goroutines may end up dialing a peer if
 // an incoming connection was briefly accepted and disconnected while we were
 // also dialing.
-type peerManager struct {
+type PeerManager struct {
+	options PeerManagerOptions
+
 	mtx           sync.Mutex
 	store         *peerStore
 	dialing       map[NodeID]bool
 	connected     map[NodeID]bool
+	evicting      map[NodeID]bool
 	subscriptions map[*PeerUpdatesCh]*PeerUpdatesCh // keyed by struct identity (address)
 }
 
-// newPeerManager creates a new peer manager.
-func newPeerManager(store *peerStore) *peerManager {
-	return &peerManager{
-		store:         store,
+// PeerManagerOptions specifies options for a PeerManager.
+type PeerManagerOptions struct {
+	// PersistentPeers are peers that we want to maintain persistent connections
+	// to. These will be scored higher than other peers, and if
+	// MaxConnectedUpgrade is non-zero any lower-scored peers will be evicted if
+	// necessary to make room for these.
+	PersistentPeers []NodeID
+
+	// MaxConnected is the maximum number of connected peers (inbound and
+	// outbound). 0 means no limit.
+	MaxConnected uint16
+
+	// MaxConnectedUpgrade is the maximum number of additional connections to
+	// use for probing any better-scored peers to upgrade to when all connection
+	// slots are full. 0 disables peer upgrading.
+	//
+	// For example, if we are already connected to MaxConnected peers, but we
+	// know or learn about better-scored peers (e.g. configured persistent
+	// peers) that we are not connected to, then we can probe these peers by
+	// using up to MaxConnectedUpgrade connections, and once connected evict the
+	// lowest-scored connected peers. This also works for inbound connections,
+	// i.e. if a higher-scored peer attempts to connect to us, we can accept
+	// the connection and evict a lower-scored peer.
+	MaxConnectedUpgrade uint16
+
+	// MinRetryTime is the minimum time to wait between retries. Retry times
+	// double for each retry, up to MaxRetryTime. 0 disables retries.
+	MinRetryTime time.Duration
+
+	// MaxRetryTime is the maximum time to wait between retries. 0 means
+	// no maximum, in which case the retry time will keep doubling.
+	MaxRetryTime time.Duration
+
+	// MaxRetryTimePersistent is the maximum time to wait between retries for
+	// peers listed in PersistentPeers. 0 uses MaxRetryTime instead.
+	MaxRetryTimePersistent time.Duration
+
+	// RetryTimeJitter is the upper bound of a random interval added to
+	// retry times, to avoid thundering herds. 0 disables jitter.
+	RetryTimeJitter time.Duration
+}
+
+// isPersistent is a convenience function that checks if the given peer ID
+// is contained in PersistentPeers. It just uses a linear search, since
+// PersistentPeers is expected to be small.
+func (o PeerManagerOptions) isPersistent(id NodeID) bool {
+	for _, p := range o.PersistentPeers {
+		if id == p {
+			return true
+		}
+	}
+	return false
+}
+
+// NewPeerManager creates a new peer manager.
+func NewPeerManager(options PeerManagerOptions) *PeerManager {
+	return &PeerManager{
+		options: options,
+		// FIXME: Once the store persists data, we need to update existing
+		// peers in the store with any new information, e.g. changes to
+		// PersistentPeers configuration.
+		store:         newPeerStore(),
 		dialing:       map[NodeID]bool{},
 		connected:     map[NodeID]bool{},
+		evicting:      map[NodeID]bool{},
 		subscriptions: map[*PeerUpdatesCh]*PeerUpdatesCh{},
 	}
 }
 
 // Add adds a peer to the manager, given as an address. If the peer already
 // exists, the address is added to it.
-func (m *peerManager) Add(address PeerAddress) error {
+func (m *PeerManager) Add(address PeerAddress) error {
 	if err := address.Validate(); err != nil {
 		return err
 	}
-	peerID := address.NodeID()
-
 	m.mtx.Lock()
 	defer m.mtx.Unlock()
-	peer, err := m.store.Get(peerID)
+
+	peer, err := m.store.Get(address.NodeID())
 	if err != nil {
 		return err
 	}
 	if peer == nil {
-		peer = newPeerInfo(peerID)
-	}
-	if peer.AddAddress(address) {
-		return m.store.Set(peer)
+		peer = &peerInfo{
+			ID:         address.NodeID(),
+			Persistent: m.options.isPersistent(address.NodeID()),
+		}
 	}
-	return nil
+	peer.AddAddress(address)
+	return m.store.Set(peer)
 }
 
 // Subscribe subscribes to peer updates. The caller must consume the peer
 // updates in a timely fashion and close the subscription when done, since
 // delivery is guaranteed and will block peer connection/disconnection
 // otherwise.
-func (m *peerManager) Subscribe() *PeerUpdatesCh {
+func (m *PeerManager) Subscribe() *PeerUpdatesCh {
 	// FIXME: We may want to use a size 1 buffer here. When the router
 	// broadcasts a peer update it has to loop over all of the
 	// subscriptions, and we want to avoid blocking and waiting for a
@@ -316,7 +388,7 @@ func (m *peerManager) Subscribe() *PeerUpdatesCh {
 //
 // FIXME: Consider using more fine-grained mutexes here, and/or a channel to
 // enforce ordering of updates.
-func (m *peerManager) broadcast(peerUpdate PeerUpdate) {
+func (m *PeerManager) broadcast(peerUpdate PeerUpdate) {
 	for _, sub := range m.subscriptions {
 		select {
 		case sub.updatesCh <- peerUpdate:
@@ -327,102 +399,213 @@
 
 // DialNext finds an appropriate peer address to dial, and marks it as dialing.
 // The peer will not be returned again until Dialed() or DialFailed() is called
-// for the peer and it is no longer connected.
+// for the peer and it is no longer connected. Returns an empty ID if no
+// appropriate peers are available, or if all connection slots are full.
 //
-// Returns an empty ID if no appropriate peers are available.
-func (m *peerManager) DialNext() (NodeID, PeerAddress, error) {
+// We allow dialing MaxConnected+MaxConnectedUpgrade peers. Including
+// MaxConnectedUpgrade allows us to dial additional peers beyond MaxConnected if
+// they have a higher score than the lowest-scored connected peer. If we are
+// successful in dialing, and thus have more than MaxConnected connected peers,
+// the lowest-scored connected peer will be evicted via EvictNext().
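+//
+// For example (illustrative numbers only): with MaxConnected=2 and
+// MaxConnectedUpgrade=1, if we are connected to two peers scoring 0 and learn
+// about a persistent peer (score 100), DialNext will hand out its address even
+// though the regular slots are full; once the dial succeeds we have three
+// connected peers, and EvictNext() will evict one of the score-0 peers.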
+func (m *PeerManager) DialNext() (NodeID, PeerAddress, error) {
 	m.mtx.Lock()
 	defer m.mtx.Unlock()
-	peers, err := m.store.List()
+
+	if m.options.MaxConnected > 0 &&
+		len(m.connected)+len(m.dialing) >= int(m.options.MaxConnected)+int(m.options.MaxConnectedUpgrade) {
+		return "", PeerAddress{}, nil
+	}
+
+	ranked, err := m.store.Ranked()
 	if err != nil {
 		return "", PeerAddress{}, err
 	}
-	for _, peer := range peers {
-		switch {
-		case len(peer.Addresses) == 0:
-		case m.dialing[peer.ID]:
-		case m.connected[peer.ID]:
-		default:
-			// FIXME: We currently only dial the first address, but we should
-			// track connection statistics for each address and return the most
-			// appropriate one.
+	for _, peer := range ranked {
+		if m.dialing[peer.ID] || m.connected[peer.ID] {
+			continue
+		}
+
+		for _, addressInfo := range peer.AddressInfo {
+			if time.Since(addressInfo.LastDialFailure) < m.retryDelay(peer, addressInfo.DialFailures) {
+				continue
+			}
+
+			// At this point we have an eligible address to dial. If we're full
+			// but have peer upgrade capacity (as checked above), we need to
+			// make sure there exists an evictable peer of a lower score that we
+			// can replace. If so, we can go ahead and dial this peer, and
+			// EvictNext() will evict a lower-scored one later.
+			//
+			// If we don't find one, there is no point in trying additional
+			// peers, since they will all have the same or lower score than this
+			// peer (since they're ordered by score via peerStore.Ranked).
+			//
+			// FIXME: There is a race condition here where, if there exists a
+			// single lower-scored peer, we may end up dialing multiple
+			// higher-scored new peers that all expect the same lower-scored
+			// peer to be evicted, causing us to take on too many peers. We may
+			// need to reserve the eviction for this specific peer such that
+			// others can't claim it.
+			if m.options.MaxConnected > 0 &&
+				len(m.connected) >= int(m.options.MaxConnected) &&
+				!m.peerIsUpgrade(peer, ranked) {
+				return "", PeerAddress{}, nil
+			}
+
 			m.dialing[peer.ID] = true
-			return peer.ID, peer.Addresses[0], nil
+			return peer.ID, addressInfo.Address, nil
 		}
 	}
 	return "", PeerAddress{}, nil
 }
 
+// retryDelay calculates a dial retry delay using exponential backoff, based on
+// retry settings in PeerManagerOptions. If MinRetryTime is 0, this returns
+// MaxInt64 (i.e. an infinite retry delay, effectively disabling retries).
+func (m *PeerManager) retryDelay(peer *peerInfo, failures uint32) time.Duration {
+	if failures == 0 {
+		return 0
+	}
+	if m.options.MinRetryTime == 0 {
+		return time.Duration(math.MaxInt64)
+	}
+	maxDelay := m.options.MaxRetryTime
+	if peer.Persistent && m.options.MaxRetryTimePersistent > 0 {
+		maxDelay = m.options.MaxRetryTimePersistent
+	}
+
+	// The first retry waits MinRetryTime, doubling on each further failure.
+	delay := m.options.MinRetryTime * time.Duration(math.Pow(2, float64(failures-1)))
+	if maxDelay > 0 && delay > maxDelay {
+		delay = maxDelay
+	}
+	if m.options.RetryTimeJitter > 0 { // guard: rand.Int63n panics when given 0
+		// FIXME: This should use a PeerManager-scoped RNG.
+		delay += time.Duration(rand.Int63n(int64(m.options.RetryTimeJitter))) // nolint:gosec
+	}
+	return delay
+}
+
 // DialFailed reports a failed dial attempt. This will make the peer available
 // for dialing again when appropriate.
-func (m *peerManager) DialFailed(peerID NodeID, address PeerAddress) error {
+//
+// FIXME: This should probably delete or mark bad addresses/peers after some time.
+func (m *PeerManager) DialFailed(peerID NodeID, address PeerAddress) error {
 	m.mtx.Lock()
 	defer m.mtx.Unlock()
+
 	delete(m.dialing, peerID)
-	// FIXME: We need to track address quality statistics and exponential backoff.
+
+	peer, err := m.store.Get(peerID)
+	if err != nil || peer == nil { // Peer may have been removed while dialing, ignore.
+		return err
+	}
+	if addressInfo := peer.LookupAddressInfo(address); addressInfo != nil {
+		addressInfo.LastDialFailure = time.Now().UTC()
+		addressInfo.DialFailures++
+		return m.store.Set(peer)
+	}
 	return nil
 }
 
 // Dialed marks a peer as successfully dialed. Any further incoming connections
 // will be rejected, and once disconnected the peer may be dialed again.
-func (m *peerManager) Dialed(peerID NodeID, address PeerAddress) error {
+func (m *PeerManager) Dialed(peerID NodeID, address PeerAddress) error {
 	m.mtx.Lock()
 	defer m.mtx.Unlock()
+
 	delete(m.dialing, peerID)
+
+	if m.connected[peerID] {
+		return fmt.Errorf("peer %v is already connected", peerID)
+	}
+	if m.options.MaxConnected > 0 &&
+		len(m.connected) >= int(m.options.MaxConnected)+int(m.options.MaxConnectedUpgrade) {
+		return fmt.Errorf("already connected to maximum number of peers")
+	}
+
 	peer, err := m.store.Get(peerID)
 	if err != nil {
 		return err
 	} else if peer == nil {
-		return fmt.Errorf("unknown peer %q", peerID)
+		return fmt.Errorf("peer %q was removed while dialing", peerID)
 	}
+	m.connected[peerID] = true
 
-	if m.connected[peerID] {
-		return fmt.Errorf("peer %v is already connected", peerID)
+	now := time.Now().UTC()
+	peer.LastConnected = now
+	if addressInfo := peer.LookupAddressInfo(address); addressInfo != nil {
+		addressInfo.DialFailures = 0
+		addressInfo.LastDialSuccess = now
 	}
-	delete(m.dialing, peerID)
-	m.connected[peerID] = true
-	return nil
+	return m.store.Set(peer)
 }
 
 // Accepted marks an incoming peer connection successfully accepted. If the peer
-// is already connected this will return an error.
+// is already connected or we don't allow additional connections then this will
+// return an error.
+//
+// If MaxConnectedUpgrade is non-zero, the accepted peer scores higher than the
+// lowest-scored of our connected peers, and the total number of connections
+// does not exceed MaxConnected + MaxConnectedUpgrade, then we accept the
+// connection and rely on EvictNext() to evict a lower-scored peer.
 //
 // NOTE: We can't take an address here, since e.g. TCP uses a different port
 // number for outbound traffic than inbound traffic, so the peer's endpoint
 // wouldn't necessarily be an appropriate address to dial.
-func (m *peerManager) Accepted(peerID NodeID) error {
+func (m *PeerManager) Accepted(peerID NodeID) error {
 	m.mtx.Lock()
 	defer m.mtx.Unlock()
+
+	if m.connected[peerID] {
+		return fmt.Errorf("peer %q is already connected", peerID)
+	}
+	if m.options.MaxConnected > 0 &&
+		len(m.connected) >= int(m.options.MaxConnected)+int(m.options.MaxConnectedUpgrade) {
+		return fmt.Errorf("already connected to maximum number of peers")
+	}
+
 	peer, err := m.store.Get(peerID)
 	if err != nil {
 		return err
-	} else if peer == nil {
-		peer = newPeerInfo(peerID)
-		if err = m.store.Set(peer); err != nil {
-			return err
+	}
+	if peer == nil {
+		peer = &peerInfo{
+			ID:         peerID,
+			Persistent: m.options.isPersistent(peerID),
 		}
 	}
-	if m.connected[peerID] {
-		return fmt.Errorf("peer %q is already connected", peerID)
+
+	// If we're already full (i.e. at MaxConnected), but we allow upgrades (and we
+	// know from the check above that we have upgrade capacity), then we can look
+	// for a lower-scored evictable peer, and if found we can accept this connection
+	// anyway and let EvictNext() evict the lower-scored peer for us.
+	//
+	// FIXME: There is a race condition here where, if there exists a single
+	// lower-scored peer, we may end up accepting multiple higher-scored new
+	// peers that all expect the same lower-scored peer to be evicted, causing
+	// us to take on too many peers. We may need to reserve the eviction for
+	// this specific peer such that others can't claim it.
+	if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) {
+		ranked, err := m.store.Ranked()
+		if err != nil {
+			return err
+		}
+		if !m.peerIsUpgrade(peer, ranked) {
+			return fmt.Errorf("already connected to maximum number of peers")
+		}
+	}
+
 	m.connected[peerID] = true
-	return nil
+	peer.LastConnected = time.Now().UTC()
+	return m.store.Set(peer)
 }
 
 // Ready marks a peer as ready, broadcasting status updates to subscribers. The
 // peer must already be marked as connected. This is separate from Dialed() and
 // Accepted() to allow the router to set up its internal queues before reactors
-// start sending messages (holding the Router.peerMtx mutex while calling
-// Accepted or Dialed will halt all message routing while peers are set up, which
-// is too expensive and also causes difficulties in tests where we may want to
-// consume peer updates and send messages sequentially).
-//
-// FIXME: This possibly indicates an architectural problem. Should the peerManager
-// handle actual network connections to/from peers as well? Or should all of this
-// be done by the router?
-func (m *peerManager) Ready(peerID NodeID) {
+// start sending messages.
+func (m *PeerManager) Ready(peerID NodeID) {
 	m.mtx.Lock()
 	defer m.mtx.Unlock()
+
 	connected := m.connected[peerID]
 	if connected {
 		m.broadcast(PeerUpdate{
@@ -434,10 +617,12 @@ func (m *peerManager) Ready(peerID NodeID) {
 
 // Disconnected unmarks a peer as connected, allowing new connections to be
 // established.
-func (m *peerManager) Disconnected(peerID NodeID) error {
+func (m *PeerManager) Disconnected(peerID NodeID) error {
 	m.mtx.Lock()
 	defer m.mtx.Unlock()
+
 	delete(m.connected, peerID)
+	delete(m.evicting, peerID)
 	m.broadcast(PeerUpdate{
 		PeerID: peerID,
 		Status: PeerStatusDown,
@@ -445,12 +630,100 @@ func (m *peerManager) Disconnected(peerID NodeID) error {
 	})
 	return nil
 }
 
+// EvictNext returns the next peer to evict (i.e. disconnect), or an empty ID if
+// no peers should be evicted. The evicted peer will be the lowest-scored peer
+// that is currently connected and not already being evicted.
+func (m *PeerManager) EvictNext() (NodeID, error) {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
+	if m.options.MaxConnected == 0 ||
+		len(m.connected)-len(m.evicting) <= int(m.options.MaxConnected) {
+		return "", nil
+	}
+
+	ranked, err := m.store.Ranked()
+	if err != nil {
+		return "", err
+	}
+	for i := len(ranked) - 1; i >= 0; i-- {
+		peer := ranked[i]
+		if m.connected[peer.ID] && !m.evicting[peer.ID] {
+			m.evicting[peer.ID] = true
+			return peer.ID, nil
+		}
+	}
+	return "", nil
+}
+
+// peerIsUpgrade checks whether connecting to a given peer would be an
+// upgrade, i.e. whether there exists a connected peer with a lower score
+// that is not already scheduled for eviction, and that could thus be
+// evicted to make room for this peer if we're full.
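+//
+// For example (illustration only): if our lowest-scored connected peer scores
+// 0 and is not being evicted, a candidate scoring 100 is an upgrade, while
+// another candidate scoring 0 is not, since the bottom-up scan stops at the
+// first candidate whose score is equal or higher.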
+func (m *PeerManager) peerIsUpgrade(peer *peerInfo, ranked []*peerInfo) bool {
+	for i := len(ranked) - 1; i >= 0; i-- {
+		candidate := ranked[i]
+		if candidate.Score() >= peer.Score() {
+			return false
+		}
+		if m.connected[candidate.ID] && !m.evicting[candidate.ID] {
+			return true
+		}
+	}
+	return false
+}
+
+// GetHeight returns a peer's height, as reported via SetHeight. If the peer
+// or height is unknown, this returns 0.
+//
+// FIXME: This is a temporary workaround for the peer state stored via the
+// legacy Peer.Set() and Peer.Get() APIs, used to share height state between the
+// consensus and mempool reactors. These dependencies should be removed from the
+// reactors, which should instead query this information independently via new
+// P2P protocol additions.
+func (m *PeerManager) GetHeight(peerID NodeID) (int64, error) {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
+	peer, err := m.store.Get(peerID)
+	if err != nil || peer == nil {
+		return 0, err
+	}
+	return peer.Height, nil
+}
+
+// SetHeight stores a peer's height, making it available via GetHeight. If the
+// peer is unknown, it is created.
+//
+// FIXME: This is a temporary workaround for the peer state stored via the
+// legacy Peer.Set() and Peer.Get() APIs, used to share height state between the
+// consensus and mempool reactors. These dependencies should be removed from the
+// reactors, which should instead query this information independently via new
+// P2P protocol additions.
+func (m *PeerManager) SetHeight(peerID NodeID, height int64) error {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
+	peer, err := m.store.Get(peerID)
+	if err != nil {
+		return err
+	}
+	if peer == nil {
+		peer = &peerInfo{
+			ID:         peerID,
+			Persistent: m.options.isPersistent(peerID),
+		}
+	}
+	peer.Height = height
+	return m.store.Set(peer)
+}
+
 // peerStore stores information about peers. It is currently a bare-bones
 // in-memory store, and will be fleshed out later.
 //
-// peerStore is not thread-safe, since it assumes it is only used by peerManager
-// which handles concurrency control. This allows multiple operations to be
-// executed atomically, since the peerManager will hold a mutex while executing.
+// peerStore is not thread-safe, since it assumes it is only used by PeerManager
+// which handles concurrency control. This allows the manager to execute multiple
+// operations atomically while it holds the mutex.
 type peerStore struct {
 	peers map[NodeID]peerInfo
 }
@@ -490,35 +763,76 @@ func (s *peerStore) List() ([]*peerInfo, error) {
 	return peers, nil
 }
 
-// peerInfo contains peer information stored in a peerStore.
+// Ranked returns a list of peers ordered by score (better peers first).
+// Peers with equal scores are returned in an arbitrary order.
 //
-// FIXME: This should be renamed peer or something else once the old peer is
-// removed.
-type peerInfo struct {
-	ID        NodeID
-	Addresses []PeerAddress
+// This is used to determine which peers to connect to and which peers to evict
+// in order to make room for better peers.
+//
+// FIXME: For now, we simply generate the list on every call, but this can get
+// expensive since it's called fairly frequently. We may want to either cache
+// this, or store peers in a data structure that maintains order (e.g. a heap or
+// ordered map).
+func (s *peerStore) Ranked() ([]*peerInfo, error) {
+	peers, err := s.List()
+	if err != nil {
+		return nil, err
+	}
+	sort.Slice(peers, func(i, j int) bool {
+		// FIXME: If necessary, consider precomputing scores before sorting,
+		// to reduce the number of Score() calls.
+		return peers[i].Score() > peers[j].Score()
+	})
+	return peers, nil
 }
 
-// newPeerInfo creates a new peerInfo.
-func newPeerInfo(id NodeID) *peerInfo {
-	return &peerInfo{
-		ID:        id,
-		Addresses: []PeerAddress{},
-	}
+// peerInfo contains peer information stored in a peerStore.
+type peerInfo struct {
+	ID            NodeID
+	AddressInfo   []*addressInfo
+	Persistent    bool
+	Height        int64
+	LastConnected time.Time
 }
 
 // AddAddress adds an address to a peer, unless it already exists. It does not
 // validate the address. Returns true if the address was new.
 func (p *peerInfo) AddAddress(address PeerAddress) bool {
+	if p.LookupAddressInfo(address) != nil {
+		return false
+	}
+	p.AddressInfo = append(p.AddressInfo, &addressInfo{Address: address})
+	return true
+}
+
+// LookupAddressInfo returns address info for an address, or nil if unknown.
+func (p *peerInfo) LookupAddressInfo(address PeerAddress) *addressInfo {
 	// We just do a linear search for now.
 	addressString := address.String()
-	for _, a := range p.Addresses {
-		if a.String() == addressString {
-			return false
+	for _, info := range p.AddressInfo {
+		if info.Address.String() == addressString {
+			return info
 		}
 	}
-	p.Addresses = append(p.Addresses, address)
-	return true
+	return nil
+}
+
+// Score calculates a score for the peer. Higher-scored peers will be
+// preferred over lower-scored ones.
+func (p *peerInfo) Score() PeerScore {
+	var score PeerScore
+	if p.Persistent {
+		score += PeerScorePersistent
+	}
+	return score
+}
+
+// addressInfo contains information and statistics about an address.
+type addressInfo struct {
+	Address         PeerAddress
+	LastDialSuccess time.Time
+	LastDialFailure time.Time
+	DialFailures    uint32 // since last successful dial
 }
 
 // ============================================================================
diff --git a/p2p/router.go b/p2p/router.go
index f33ff1c65..ae744a7f9 100644
--- a/p2p/router.go
+++ b/p2p/router.go
@@ -23,15 +23,18 @@ import (
 // either the channel is closed by the caller or the router is stopped, at which
 // point the input message queue is closed and removed.
 //
-// On startup, the router spawns off two primary goroutines that maintain
+// On startup, the router spawns off three primary goroutines that maintain
 // connections to peers and run for the lifetime of the router:
 //
-// Router.dialPeers(): in a loop, asks the peerStore to dispense an
-// eligible peer to connect to, and attempts to resolve and dial each
-// address until successful.
+// Router.dialPeers(): in a loop, asks the PeerManager for the next peer
+// address to contact, resolves it into endpoints, and attempts to dial
+// each one.
 //
 // Router.acceptPeers(): in a loop, waits for the next inbound connection
-// from a peer, and attempts to claim it in the peerStore.
+// from a peer, and checks with the PeerManager whether it should be accepted.
+//
+// Router.evictPeers(): in a loop, asks the PeerManager for any connected
+// peers to evict, and disconnects them.
 //
 // Once either an inbound or outbound connection has been made, an outbound
 // message queue is registered in Router.peerQueues and a goroutine is spawned
@@ -74,7 +77,7 @@ type Router struct {
 	*service.BaseService
 	logger      log.Logger
 	transports  map[Protocol]Transport
-	peerManager *peerManager
+	peerManager *PeerManager
 
 	// FIXME: Consider using sync.Map.
 	peerMtx sync.RWMutex
@@ -96,24 +99,17 @@ type Router struct {
 // FIXME: providing protocol/transport maps is cumbersome in tests, we should
 // consider adding Protocols() to the Transport interface instead and register
 // protocol/transport mappings automatically on a first-come basis.
-func NewRouter(logger log.Logger, transports map[Protocol]Transport, peers []PeerAddress) *Router {
+func NewRouter(logger log.Logger, peerManager *PeerManager, transports map[Protocol]Transport) *Router {
 	router := &Router{
 		logger:          logger,
 		transports:      transports,
-		peerManager:     newPeerManager(newPeerStore()),
+		peerManager:     peerManager,
 		stopCh:          make(chan struct{}),
 		channelQueues:   map[ChannelID]queue{},
 		channelMessages: map[ChannelID]proto.Message{},
 		peerQueues:      map[NodeID]queue{},
 	}
 	router.BaseService = service.NewBaseService(logger, "router", router)
-
-	for _, address := range peers {
-		if err := router.peerManager.Add(address); err != nil {
-			logger.Error("failed to add peer", "address", address, "err", err)
-		}
-	}
-
 	return router
 }
@@ -241,6 +237,9 @@ func (r *Router) acceptPeers(transport Transport) {
 		default:
 		}
 
+		// FIXME: We may need transports to enforce some sort of rate limiting
+		// here (e.g. by IP address), or alternatively have PeerManager.Accepted()
+		// do it for us.
 		conn, err := transport.Accept(context.Background())
 		switch err {
 		case nil:
@@ -480,16 +479,37 @@ func (r *Router) sendPeer(peerID NodeID, conn Connection, queue queue) error {
 	}
 }
 
-// SubscribePeerUpdates creates a new peer updates subscription. The caller must
-// consume the peer updates in a timely fashion and close the subscription when
-// done, since delivery is guaranteed and will block peer
-// connection/disconnection otherwise.
-//
-// FIXME: Consider having callers just use peerManager.Subscribe() directly, if
-// we export peerManager and make it an injected dependency (which we probably
-// should).
-func (r *Router) SubscribePeerUpdates() *PeerUpdatesCh {
-	return r.peerManager.Subscribe()
+// evictPeers evicts connected peers as requested by the peer manager.
+func (r *Router) evictPeers() {
+	for {
+		select {
+		case <-r.stopCh:
+			return
+		default:
+		}
+
+		peerID, err := r.peerManager.EvictNext()
+		if err != nil {
+			r.logger.Error("failed to find next peer to evict", "err", err)
+			return
+		} else if peerID == "" {
+			r.logger.Debug("no evictable peers, sleeping")
+			select {
+			case <-time.After(time.Second):
+				continue
+			case <-r.stopCh:
+				return
+			}
+		}
+
+		r.logger.Info("evicting peer", "peer", peerID)
+		r.peerMtx.RLock()
+		queue, ok := r.peerQueues[peerID]
+		r.peerMtx.RUnlock()
+		if ok {
+			queue.close()
+		}
+	}
 }
 
 // OnStart implements service.Service.
@@ -498,6 +518,7 @@ func (r *Router) OnStart() error {
 	for _, transport := range r.transports {
 		go r.acceptPeers(transport)
 	}
+	go r.evictPeers()
 	return nil
 }
 
diff --git a/p2p/router_test.go b/p2p/router_test.go
index 393cbd407..1df66a87f 100644
--- a/p2p/router_test.go
+++ b/p2p/router_test.go
@@ -39,9 +39,13 @@ func TestRouter(t *testing.T) {
 	peers := []p2p.PeerAddress{}
 	for i := 0; i < 3; i++ {
 		peerTransport := network.GenerateTransport()
-		peerRouter := p2p.NewRouter(logger.With("peerID", i), map[p2p.Protocol]p2p.Transport{
-			p2p.MemoryProtocol: peerTransport,
-		}, nil)
+		peerRouter := p2p.NewRouter(
+			logger.With("peerID", i),
+			p2p.NewPeerManager(p2p.PeerManagerOptions{}),
+			map[p2p.Protocol]p2p.Transport{
+				p2p.MemoryProtocol: peerTransport,
+			},
+		)
 		peers = append(peers, peerTransport.Endpoints()[0].PeerAddress())
 
 		channel, err := peerRouter.OpenChannel(chID, &TestMessage{})
@@ -55,18 +59,23 @@ func TestRouter(t *testing.T) {
 	}
 
 	// Start the main router and connect it to the peers above.
-	router := p2p.NewRouter(logger, map[p2p.Protocol]p2p.Transport{
+	peerManager := p2p.NewPeerManager(p2p.PeerManagerOptions{})
+	for _, address := range peers {
+		err := peerManager.Add(address)
+		require.NoError(t, err)
+	}
+	peerUpdates := peerManager.Subscribe()
+	defer peerUpdates.Close()
+
+	router := p2p.NewRouter(logger, peerManager, map[p2p.Protocol]p2p.Transport{
 		p2p.MemoryProtocol: transport,
-	}, peers)
+	})
 	channel, err := router.OpenChannel(chID, &TestMessage{})
 	require.NoError(t, err)
 
-	peerUpdates := router.SubscribePeerUpdates()
-
 	err = router.Start()
 	require.NoError(t, err)
 	defer func() {
 		channel.Close()
-		peerUpdates.Close()
 		require.NoError(t, router.Stop())
 	}()
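
Usage sketch (illustrative, not part of the patch): a minimal example of wiring
the exported PeerManager into the Router under this API. The transport, logger,
and bootstrap addresses are assumed to be provided by the caller (the test
above uses a memory transport), the option values are arbitrary, and the
setupP2P helper and import paths are hypothetical.

package node

import (
	"time"

	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/p2p"
)

// setupP2P wires a PeerManager and Router together using the API from this
// patch. The Transport and bootstrap addresses are assumed to come from
// configuration elsewhere.
func setupP2P(
	logger log.Logger,
	transport p2p.Transport,
	bootstrap []p2p.PeerAddress,
) (*p2p.Router, *p2p.PeerManager, error) {
	peerManager := p2p.NewPeerManager(p2p.PeerManagerOptions{
		MaxConnected:        16,          // hard cap on connected peers
		MaxConnectedUpgrade: 4,           // extra dial slots for better-scored peers
		MinRetryTime:        time.Second, // first retry delay; doubles per failure
		MaxRetryTime:        time.Hour,   // backoff cap (1s, 2s, 4s, ... up to 1h)
		RetryTimeJitter:     3 * time.Second,
	})
	for _, address := range bootstrap {
		if err := peerManager.Add(address); err != nil {
			return nil, nil, err
		}
	}
	router := p2p.NewRouter(logger, peerManager, map[p2p.Protocol]p2p.Transport{
		p2p.MemoryProtocol: transport, // one Transport registered per Protocol
	})
	return router, peerManager, nil
}

Reactors would then call peerManager.Subscribe() (replacing the removed
Router.SubscribePeerUpdates()), consume the updates promptly, and Close() the
subscription when done, since delivery is guaranteed and otherwise blocks peer
connection and disconnection.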