From 81daaacae9f5d7b81cf4fca99f11857b25d205b0 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 25 Jan 2021 18:51:14 +0100 Subject: [PATCH] p2p: simplify PeerManager upgrade logic (#5962) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up from #5947, branched off of #5954. This simplifies the upgrade logic by adding explicit eviction requests, which can also be useful for other use-cases (e.g. if we need to ban a peer that's misbehaving). Changes: * Add `evict` map which queues up peers to explicitly evict. * `upgrading` now only tracks peers that we're upgrading via dialing (`DialNext` → `Dialed`/`DialFailed`). * `Dialed` will unmark `upgrading`, and queue `evict` if still beyond capacity. * `Accepted` will pick a random lower-scored peer to upgrade to, if appropriate, and doesn't care about `upgrading` (the dial will fail later, since it's already connected). * `EvictNext` will return a peer scheduled in `evict` if any, otherwise if beyond capacity just evict the lowest-scored peer. This limits all of the `upgrading` logic to `DialNext`, `Dialed`, and `DialFailed`, making it much simpler, and it should generally do the right thing in all cases I can think of. --- p2p/peer.go | 157 ++++++++++++++++++++++------------------------------ 1 file changed, 67 insertions(+), 90 deletions(-) diff --git a/p2p/peer.go b/p2p/peer.go index b357fc718..c40de68bf 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -296,28 +296,26 @@ const ( // - Disconnected: peer disconnects, unmarking as connected and broadcasts a // PeerStatusDown peer update. // -// If we are connected to too many peers (more than MaxConnections), typically -// because we have upgraded to higher-scored peers and need to shed lower-scored -// ones, the flow is as follows: -// - EvictNext: returns a peer ID to evict, marking peer as evicting. 
-// - Disconnected: peer was disconnected, unmarking as connected and evicting, -// and broadcasts a PeerStatusDown peer update. +// When evicting peers, either because peers are explicitly scheduled for +// eviction or we are connected to too many peers, the flow is as follows: +// - EvictNext: if marked evict and connected, unmark evict and mark evicting. +// If beyond MaxConnected, pick lowest-scored peer and mark evicting. +// - Disconnected: unmark connected, evicting, evict, and broadcast a +// PeerStatusDown peer update. // // If all connection slots are full (at MaxConnections), we can use up to // MaxConnectionsUpgrade additional connections to probe any higher-scored // unconnected peers, and if we reach them (or they reach us) we allow the -// connection and evict lower-scored peers. We mark the lower-scored peer as +// connection and evict a lower-scored peer. We mark the lower-scored peer as // upgrading[from]=to to make sure no other higher-scored peers can claim the // same one for an upgrade. The flow is as follows: -// - Accepted: if upgrade is possible, mark upgrading[from]=to and connected. +// - Accepted: if upgrade is possible, mark connected and add lower-scored to evict. // - DialNext: if upgrade is possible, mark upgrading[from]=to and dialing. // - DialFailed: unmark upgrading[from]=to and dialing. -// - Dialed: unmark dialing, mark as connected. -// - EvictNext: unmark upgrading[from]=to, then if over MaxConnections -// either the upgraded peer or an even lower-scored one (if found) -// is marked as evicting and returned. -// - Disconnected: unmark connected and evicting, also upgrading[from]=to -// both from and to (in case either disconnected before eviction). +// - Dialed: unmark upgrading[from]=to and dialing, mark as connected, add +// lower-scored to evict. +// - EvictNext: pick peer from evict, mark as evicting. +// - Disconnected: unmark connected, upgrading[from]=to, evict, evicting. 
type PeerManager struct { options PeerManagerOptions wakeDialCh chan struct{} // wakes up DialNext() on relevant peer changes @@ -328,8 +326,9 @@ type PeerManager struct { mtx sync.Mutex store *peerStore dialing map[NodeID]bool // peers being dialed (DialNext -> Dialed/DialFail) + upgrading map[NodeID]NodeID // peers claimed for upgrade (DialNext -> Dialed/DialFail) connected map[NodeID]bool // connected peers (Dialed/Accepted -> Disconnected) - upgrading map[NodeID]NodeID // peers claimed for upgrading (key is lower-scored peer) + evict map[NodeID]bool // peers scheduled for eviction (Connected -> EvictNext) evicting map[NodeID]bool // peers being evicted (EvictNext -> Disconnected) subscriptions map[*PeerUpdatesCh]*PeerUpdatesCh // keyed by struct identity (address) } @@ -402,8 +401,9 @@ func NewPeerManager(peerDB dbm.DB, options PeerManagerOptions) (*PeerManager, er store: store, dialing: map[NodeID]bool{}, - connected: map[NodeID]bool{}, upgrading: map[NodeID]NodeID{}, + connected: map[NodeID]bool{}, + evict: map[NodeID]bool{}, evicting: map[NodeID]bool{}, subscriptions: map[*PeerUpdatesCh]*PeerUpdatesCh{}, } @@ -577,8 +577,7 @@ func (m *PeerManager) TryDialNext() (NodeID, PeerAddress, error) { // We allow dialing MaxConnected+MaxConnectedUpgrade peers. Including // MaxConnectedUpgrade allows us to probe additional peers that have a - // higher score than a connected peer, and if successful evict the - // lower-scored peer via EvictNext(). + // higher score than any other peers, and if successful evict it. if m.options.MaxConnected > 0 && len(m.connected)+len(m.dialing) >= int(m.options.MaxConnected)+int(m.options.MaxConnectedUpgrade) { return "", PeerAddress{}, nil @@ -594,21 +593,19 @@ func (m *PeerManager) TryDialNext() (NodeID, PeerAddress, error) { continue } - // At this point we have an eligible address to dial. 
If we're full - // but have peer upgrade capacity (as checked above), we need to - // make sure there exists an evictable peer of a lower score that we - // can replace. If so, we mark the lower-scored peer as upgrading so - // noone else can claim it, and EvictNext() will evict it later. + // We now have an eligible address to dial. If we're full but have + // upgrade capacity (as checked above), we find a lower-scored peer + // we can replace and mark it as upgrading so no one else claims it. // // If we don't find one, there is no point in trying additional // peers, since they will all have the same or lower score than this // peer (since they're ordered by score via peerStore.Ranked). if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) { - upgradePeer := m.findUpgradeCandidate(peer.ID, peer.Score()) - if upgradePeer == "" { + upgradeFromPeer := m.findUpgradeCandidate(peer.ID, peer.Score()) + if upgradeFromPeer == "" { return "", PeerAddress{}, nil } - m.upgrading[upgradePeer] = peer.ID + m.upgrading[upgradeFromPeer] = peer.ID } m.dialing[peer.ID] = true @@ -676,8 +673,7 @@ func (m *PeerManager) DialFailed(peerID NodeID, address PeerAddress) error { delete(m.dialing, peerID) for from, to := range m.upgrading { if to == peerID { - // Unmark failed upgrade attempt. - delete(m.upgrading, from) + delete(m.upgrading, from) // Unmark failed upgrade attempt. } } @@ -725,6 +721,16 @@ func (m *PeerManager) Dialed(peerID NodeID, address PeerAddress) error { delete(m.dialing, peerID) + var upgradeFromPeer NodeID + for from, to := range m.upgrading { + if to == peerID { + delete(m.upgrading, from) + upgradeFromPeer = from + // Don't break, just in case this peer was marked as upgrading for + // multiple lower-scored peers (shouldn't really happen). 
+ } + } + if m.connected[peerID] { return fmt.Errorf("peer %v is already connected", peerID) } @@ -737,7 +743,6 @@ func (m *PeerManager) Dialed(peerID NodeID, address PeerAddress) error { if !ok { return fmt.Errorf("peer %q was removed while dialing", peerID) } - now := time.Now().UTC() peer.LastConnected = now if addressInfo, ok := peer.AddressInfo[address.String()]; ok { @@ -749,6 +754,17 @@ func (m *PeerManager) Dialed(peerID NodeID, address PeerAddress) error { return err } + if upgradeFromPeer != "" && m.options.MaxConnected > 0 && + len(m.connected) >= int(m.options.MaxConnected) { + // Look for an even lower-scored peer that may have appeared + // since we started the upgrade. + if p, ok := m.store.Get(upgradeFromPeer); ok { + if u := m.findUpgradeCandidate(p.ID, p.Score()); u != "" { + upgradeFromPeer = u + } + } + m.evict[upgradeFromPeer] = true + } m.connected[peerID] = true m.wakeEvict() @@ -759,10 +775,9 @@ func (m *PeerManager) Dialed(peerID NodeID, address PeerAddress) error { // is already connected or we don't allow additional connections then this will // return an error. // -// If MaxConnectedUpgrade is non-zero, the accepted peer is better-scored than any -// other connected peer, and the number of connections does not exceed -// MaxConnected + MaxConnectedUpgrade then we accept the connection and rely on -// EvictNext() to evict lower-scored peers. +// If full but MaxConnectedUpgrade is non-zero and the incoming peer is +// better-scored than any existing peers, then we accept it and evict a +// lower-scored peer. // // NOTE: We can't take an address here, since e.g. TCP uses a different port // number for outbound traffic than inbound traffic, so the peer's endpoint @@ -784,16 +799,15 @@ func (m *PeerManager) Accepted(peerID NodeID) error { peer = m.makePeerInfo(peerID) } - // If we're already full (i.e. 
at MaxConnected), but we allow upgrades (and - // we know from the check above that we have upgrade capacity), then we can - // look for any lower-scored evictable peer, and if found we can accept this - // connection anyway and let EvictNext() evict a lower-scored peer for us. + // If all connection slots are full, but we allow upgrades (and we checked + // above that we have upgrade capacity), then we can look for a lower-scored + // peer to replace and if found accept the connection anyway and evict it. + var upgradeFromPeer NodeID if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) { - upgradePeer := m.findUpgradeCandidate(peer.ID, peer.Score()) - if upgradePeer == "" { + upgradeFromPeer = m.findUpgradeCandidate(peer.ID, peer.Score()) + if upgradeFromPeer == "" { return fmt.Errorf("already connected to maximum number of peers") } - m.upgrading[upgradePeer] = peerID } peer.LastConnected = time.Now().UTC() @@ -802,6 +816,9 @@ } m.connected[peerID] = true + if upgradeFromPeer != "" { + m.evict[upgradeFromPeer] = true + } m.wakeEvict() return nil } @@ -828,17 +845,9 @@ func (m *PeerManager) Disconnected(peerID NodeID) error { m.mtx.Lock() defer m.mtx.Unlock() - // After upgrading to a peer, it's possible for that peer to disconnect - // before EvictNext() gets around to evicting the lower-scored peer. To - // avoid stale upgrade markers, we remove it here. - for from, to := range m.upgrading { - if to == peerID { - delete(m.upgrading, from) - } - } - delete(m.connected, peerID) delete(m.upgrading, peerID) + delete(m.evict, peerID) delete(m.evicting, peerID) m.broadcast(PeerUpdate{ PeerID: peerID, @@ -871,18 +880,13 @@ func (m *PeerManager) TryEvictNext() (NodeID, error) { m.mtx.Lock() defer m.mtx.Unlock() - // We first prune the upgrade list. 
All connection slots were full when the - // upgrades began, but we may have disconnected other peers in the meanwhile - // and thus don't have to evict the upgraded peers after all. - for from, to := range m.upgrading { - // Stop pruning when the upgrade slots are only for connections - // exceeding MaxConnected. - if m.options.MaxConnected == 0 || - len(m.upgrading) <= len(m.connected)-len(m.evicting)-int(m.options.MaxConnected) { - break - } - if m.connected[to] { - delete(m.upgrading, from) + // If any connected peers are explicitly scheduled for eviction, we return a + // random one. + for peerID := range m.evict { + delete(m.evict, peerID) + if m.connected[peerID] && !m.evicting[peerID] { + m.evicting[peerID] = true + return peerID, nil } } @@ -892,26 +896,7 @@ func (m *PeerManager) TryEvictNext() (NodeID, error) { return "", nil } - // Look for any upgraded peers that we can evict. - for from, to := range m.upgrading { - if m.connected[to] { - delete(m.upgrading, from) - // We may have connected to even lower-scored peers that we can - // evict since we started upgrading this one, in which case we can - // evict one of those. - if fromPeer, ok := m.store.Get(from); !ok { - continue - } else if evictPeer := m.findUpgradeCandidate(fromPeer.ID, fromPeer.Score()); evictPeer != "" { - m.evicting[evictPeer] = true - return evictPeer, nil - } else { - m.evicting[from] = true - return from, nil - } - } - } - - // If we didn't find any upgraded peers to evict, we just pick a low-ranked one. + // If we're above capacity, just pick the lowest-ranked peer to evict. ranked := m.store.Ranked() for i := len(ranked) - 1; i >= 0; i-- { peer := ranked[i] @@ -928,15 +913,6 @@ func (m *PeerManager) TryEvictNext() (NodeID, error) { // to make room for the given peer. Returns an empty ID if none is found. // The caller must hold the mutex lock. func (m *PeerManager) findUpgradeCandidate(id NodeID, score PeerScore) NodeID { - // Check for any existing upgrade claims to this peer. 
It is important that - // we return this, since we can get an inbound connection from a peer that - // we're concurrently trying to dial for an upgrade, and we want the inbound - // connection to be accepted in this case. - for from, to := range m.upgrading { - if to == id { - return from - } - } ranked := m.store.Ranked() for i := len(ranked) - 1; i >= 0; i-- { candidate := ranked[i] @@ -944,6 +920,7 @@ func (m *PeerManager) findUpgradeCandidate(id NodeID, score PeerScore) NodeID { case candidate.Score() >= score: return "" // no further peers can be scored lower, due to sorting case !m.connected[candidate.ID]: + case m.evict[candidate.ID]: case m.evicting[candidate.ID]: case m.upgrading[candidate.ID] != "": default: