@ -296,28 +296,26 @@ const (
// - Disconnected: peer disconnects, unmarking as connected and broadcasts a
// PeerStatusDown peer update.
//
// If we are connected to too many peers (more than MaxConnected), typically
// because we have upgraded to higher-scored peers and need to shed lower-scored
// ones, the flow is as follows:
// - EvictNext: returns a peer ID to evict, marking peer as evicting.
// - Disconnected: peer was disconnected, unmarking as connected and evicting,
// and broadcasts a PeerStatusDown peer update.
// When evicting peers, either because peers are explicitly scheduled for
// eviction or we are connected to too many peers, the flow is as follows:
// - EvictNext: if marked evict and connected, unmark evict and mark evicting.
// If beyond MaxConnected, pick lowest-scored peer and mark evicting.
// - Disconnected: unmark connected, evicting, evict, and broadcast a
// PeerStatusDown peer update.
//
// If all connection slots are full (at MaxConnected), we can use up to
// MaxConnectedUpgrade additional connections to probe any higher-scored
// unconnected peers, and if we reach them (or they reach us) we allow the
// connection and evict lower-scored peers. We mark the lower-scored peer as
// connection and evict a lower-scored peer. We mark the lower-scored peer as
// upgrading[from]=to to make sure no other higher-scored peers can claim the
// same one for an upgrade. The flow is as follows:
// - Accepted: if upgrade is possible, mark upgrading[from]=to and connected.
// - Accepted: if upgrade is possible, mark connected and add lower-scored to evict.
// - DialNext: if upgrade is possible, mark upgrading[from]=to and dialing.
// - DialFailed: unmark upgrading[from]=to and dialing.
// - Dialed: unmark dialing, mark as connected.
// - EvictNext: unmark upgrading[from]=to, then if over MaxConnected
// either the upgraded peer or an even lower-scored one (if found)
// is marked as evicting and returned.
// - Disconnected: unmark connected and evicting, also upgrading[from]=to
// both from and to (in case either disconnected before eviction).
// - Dialed: unmark upgrading[from]=to and dialing, mark as connected, add
// lower-scored to evict.
// - EvictNext: pick peer from evict, mark as evicting.
// - Disconnected: unmark connected, upgrading[from]=to, evict, evicting.
type PeerManager struct {
options PeerManagerOptions
wakeDialCh chan struct { } // wakes up DialNext() on relevant peer changes
@ -328,8 +326,9 @@ type PeerManager struct {
// mtx is locked by the manager's methods (e.g. Disconnected, TryEvictNext)
// before they read or mutate the state maps below.
mtx sync . Mutex
store * peerStore
dialing map [ NodeID ] bool // peers being dialed (DialNext -> Dialed/DialFailed)
upgrading map [ NodeID ] NodeID // peers claimed for upgrade (DialNext -> Dialed/DialFailed)
connected map [ NodeID ] bool // connected peers (Dialed/Accepted -> Disconnected)
upgrading map [ NodeID ] NodeID // peers claimed for upgrading (key is lower-scored peer, value is the higher-scored peer replacing it)
evict map [ NodeID ] bool // peers scheduled for eviction (Dialed/Accepted -> EvictNext)
evicting map [ NodeID ] bool // peers being evicted (EvictNext -> Disconnected)
subscriptions map [ * PeerUpdatesCh ] * PeerUpdatesCh // keyed by struct identity (address)
}
@ -402,8 +401,9 @@ func NewPeerManager(peerDB dbm.DB, options PeerManagerOptions) (*PeerManager, er
store : store ,
dialing : map [ NodeID ] bool { } ,
connected : map [ NodeID ] bool { } ,
upgrading : map [ NodeID ] NodeID { } ,
connected : map [ NodeID ] bool { } ,
evict : map [ NodeID ] bool { } ,
evicting : map [ NodeID ] bool { } ,
subscriptions : map [ * PeerUpdatesCh ] * PeerUpdatesCh { } ,
}
@ -577,8 +577,7 @@ func (m *PeerManager) TryDialNext() (NodeID, PeerAddress, error) {
// We allow dialing MaxConnected+MaxConnectedUpgrade peers. Including
// MaxConnectedUpgrade allows us to probe additional peers that have a
// higher score than a connected peer, and if successful evict the
// lower-scored peer via EvictNext().
// higher score than a connected peer, and if successful evict that peer.
if m . options . MaxConnected > 0 &&
len ( m . connected ) + len ( m . dialing ) >= int ( m . options . MaxConnected ) + int ( m . options . MaxConnectedUpgrade ) {
return "" , PeerAddress { } , nil
@ -594,21 +593,19 @@ func (m *PeerManager) TryDialNext() (NodeID, PeerAddress, error) {
continue
}
// At this point we have an eligible address to dial. If we're full
// but have peer upgrade capacity (as checked above), we need to
// make sure there exists an evictable peer of a lower score that we
// can replace. If so, we mark the lower-scored peer as upgrading so
// no one else can claim it, and EvictNext() will evict it later.
// We now have an eligible address to dial. If we're full but have
// upgrade capacity (as checked above), we find a lower-scored peer
// we can replace and mark it as upgrading so no one else claims it.
//
// If we don't find one, there is no point in trying additional
// peers, since they will all have the same or lower score than this
// peer (since they're ordered by score via peerStore.Ranked).
if m . options . MaxConnected > 0 && len ( m . connected ) >= int ( m . options . MaxConnected ) {
upgradePeer := m . findUpgradeCandidate ( peer . ID , peer . Score ( ) )
if upgradePeer == "" {
upgradeFrom Peer := m . findUpgradeCandidate ( peer . ID , peer . Score ( ) )
if upgradeFrom Peer == "" {
return "" , PeerAddress { } , nil
}
m . upgrading [ upgradePeer ] = peer . ID
m . upgrading [ upgradeFrom Peer ] = peer . ID
}
m . dialing [ peer . ID ] = true
@ -676,8 +673,7 @@ func (m *PeerManager) DialFailed(peerID NodeID, address PeerAddress) error {
delete ( m . dialing , peerID )
for from , to := range m . upgrading {
if to == peerID {
// Unmark failed upgrade attempt.
delete ( m . upgrading , from )
delete ( m . upgrading , from ) // Unmark failed upgrade attempt.
}
}
@ -725,6 +721,16 @@ func (m *PeerManager) Dialed(peerID NodeID, address PeerAddress) error {
delete ( m . dialing , peerID )
var upgradeFromPeer NodeID
for from , to := range m . upgrading {
if to == peerID {
delete ( m . upgrading , from )
upgradeFromPeer = from
// Don't break, just in case this peer was marked as upgrading for
// multiple lower-scored peers (shouldn't really happen).
}
}
if m . connected [ peerID ] {
return fmt . Errorf ( "peer %v is already connected" , peerID )
}
@ -737,7 +743,6 @@ func (m *PeerManager) Dialed(peerID NodeID, address PeerAddress) error {
if ! ok {
return fmt . Errorf ( "peer %q was removed while dialing" , peerID )
}
now := time . Now ( ) . UTC ( )
peer . LastConnected = now
if addressInfo , ok := peer . AddressInfo [ address . String ( ) ] ; ok {
@ -749,6 +754,17 @@ func (m *PeerManager) Dialed(peerID NodeID, address PeerAddress) error {
return err
}
if upgradeFromPeer != "" && m . options . MaxConnected > 0 &&
len ( m . connected ) >= int ( m . options . MaxConnected ) {
// Look for an even lower-scored peer that may have appeared
// since we started the upgrade.
if p , ok := m . store . Get ( upgradeFromPeer ) ; ok {
if u := m . findUpgradeCandidate ( p . ID , p . Score ( ) ) ; u != "" {
upgradeFromPeer = u
}
}
m . evict [ upgradeFromPeer ] = true
}
m . connected [ peerID ] = true
m . wakeEvict ( )
@ -759,10 +775,9 @@ func (m *PeerManager) Dialed(peerID NodeID, address PeerAddress) error {
// is already connected or we don't allow additional connections then this will
// return an error.
//
// If MaxConnectedUpgrade is non-zero, the accepted peer is better-scored than any
// other connected peer, and the number of connections does not exceed
// MaxConnected + MaxConnectedUpgrade then we accept the connection and rely on
// EvictNext() to evict lower-scored peers.
// If full but MaxConnectedUpgrade is non-zero and the incoming peer is
// better-scored than at least one connected peer, then we accept it and
// evict a lower-scored peer.
//
// NOTE: We can't take an address here, since e.g. TCP uses a different port
// number for outbound traffic than inbound traffic, so the peer's endpoint
@ -784,16 +799,15 @@ func (m *PeerManager) Accepted(peerID NodeID) error {
peer = m . makePeerInfo ( peerID )
}
// If we're already full (i.e. at MaxConnected), but we allow upgrades (and
// we know from the check above that we have upgrade capacity), then we can
// look for any lower-scored evictable peer, and if found we can accept this
// connection anyway and let EvictNext() evict a lower-scored peer for us.
// If all connection slots are full, but we allow upgrades (and we checked
// above that we have upgrade capacity), then we can look for a lower-scored
// peer to replace and if found accept the connection anyway and evict it.
var upgradeFromPeer NodeID
if m . options . MaxConnected > 0 && len ( m . connected ) >= int ( m . options . MaxConnected ) {
upgradePeer : = m . findUpgradeCandidate ( peer . ID , peer . Score ( ) )
if upgradePeer == "" {
upgradeFromPeer = m . findUpgradeCandidate ( peer . ID , peer . Score ( ) )
if upgradeFrom Peer == "" {
return fmt . Errorf ( "already connected to maximum number of peers" )
}
m . upgrading [ upgradePeer ] = peerID
}
peer . LastConnected = time . Now ( ) . UTC ( )
@ -802,6 +816,9 @@ func (m *PeerManager) Accepted(peerID NodeID) error {
}
m . connected [ peerID ] = true
if upgradeFromPeer != "" {
m . evict [ upgradeFromPeer ] = true
}
m . wakeEvict ( )
return nil
}
@ -828,17 +845,9 @@ func (m *PeerManager) Disconnected(peerID NodeID) error {
m . mtx . Lock ( )
defer m . mtx . Unlock ( )
// After upgrading to a peer, it's possible for that peer to disconnect
// before EvictNext() gets around to evicting the lower-scored peer. To
// avoid stale upgrade markers, we remove it here.
for from , to := range m . upgrading {
if to == peerID {
delete ( m . upgrading , from )
}
}
delete ( m . connected , peerID )
delete ( m . upgrading , peerID )
delete ( m . evict , peerID )
delete ( m . evicting , peerID )
m . broadcast ( PeerUpdate {
PeerID : peerID ,
@ -871,18 +880,13 @@ func (m *PeerManager) TryEvictNext() (NodeID, error) {
m . mtx . Lock ( )
defer m . mtx . Unlock ( )
// We first prune the upgrade list. All connection slots were full when the
// upgrades began, but we may have disconnected other peers in the meanwhile
// and thus don't have to evict the upgraded peers after all.
for from , to := range m . upgrading {
// Stop pruning when the upgrade slots are only for connections
// exceeding MaxConnected.
if m . options . MaxConnected == 0 ||
len ( m . upgrading ) <= len ( m . connected ) - len ( m . evicting ) - int ( m . options . MaxConnected ) {
break
}
if m . connected [ to ] {
delete ( m . upgrading , from )
// If any connected peers are explicitly scheduled for eviction, we return a
// random one.
for peerID := range m . evict {
delete ( m . evict , peerID )
if m . connected [ peerID ] && ! m . evicting [ peerID ] {
m . evicting [ peerID ] = true
return peerID , nil
}
}
@ -892,26 +896,7 @@ func (m *PeerManager) TryEvictNext() (NodeID, error) {
return "" , nil
}
// Look for any upgraded peers that we can evict.
for from , to := range m . upgrading {
if m . connected [ to ] {
delete ( m . upgrading , from )
// We may have connected to even lower-scored peers that we can
// evict since we started upgrading this one, in which case we can
// evict one of those.
if fromPeer , ok := m . store . Get ( from ) ; ! ok {
continue
} else if evictPeer := m . findUpgradeCandidate ( fromPeer . ID , fromPeer . Score ( ) ) ; evictPeer != "" {
m . evicting [ evictPeer ] = true
return evictPeer , nil
} else {
m . evicting [ from ] = true
return from , nil
}
}
}
// If we didn't find any upgraded peers to evict, we just pick a low-ranked one.
// If we're above capacity, just pick the lowest-ranked peer to evict.
ranked := m . store . Ranked ( )
for i := len ( ranked ) - 1 ; i >= 0 ; i -- {
peer := ranked [ i ]
@ -928,15 +913,6 @@ func (m *PeerManager) TryEvictNext() (NodeID, error) {
// to make room for the given peer. Returns an empty ID if none is found.
// The caller must hold the mutex lock.
func ( m * PeerManager ) findUpgradeCandidate ( id NodeID , score PeerScore ) NodeID {
// Check for any existing upgrade claims to this peer. It is important that
// we return this, since we can get an inbound connection from a peer that
// we're concurrently trying to dial for an upgrade, and we want the inbound
// connection to be accepted in this case.
for from , to := range m . upgrading {
if to == id {
return from
}
}
ranked := m . store . Ranked ( )
for i := len ( ranked ) - 1 ; i >= 0 ; i -- {
candidate := ranked [ i ]
@ -944,6 +920,7 @@ func (m *PeerManager) findUpgradeCandidate(id NodeID, score PeerScore) NodeID {
case candidate . Score ( ) >= score :
return "" // no further peers can be scored lower, due to sorting
case ! m . connected [ candidate . ID ] :
case m . evict [ candidate . ID ] :
case m . evicting [ candidate . ID ] :
case m . upgrading [ candidate . ID ] != "" :
default :