@@ -223,6 +223,14 @@ const (
// It does not manage actual connections (this is handled by the Router),
// only the peer lifecycle state.
//
// We track dialing and connected states independently. This allows us to accept
// an inbound connection from a peer while the router is also dialing an
// outbound connection to that same peer, which will cause the dialer to
// eventually error when attempting to mark the peer as connected. This also
// avoids race conditions where multiple goroutines may end up dialing a peer if
// an incoming connection was briefly accepted and disconnected while we were
// also dialing.
//
// For an outbound connection, the flow is as follows:
// - DialNext: returns a peer address to dial, marking the peer as dialing.
// - DialFailed: reports a dial failure, unmarking the peer as dialing.
@@ -239,27 +247,41 @@ const (
// - Disconnected: peer disconnects, unmarking it as connected and
//   broadcasting a PeerStatusDown peer update.
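//
// A minimal sketch of how a caller (e.g. the Router) might drive these
// transitions; this is hypothetical caller code, not the actual Router
// implementation (transport.Dial is an assumed helper):
//
//	for {
//		peerID, address, err := peerManager.DialNext(ctx)
//		if err != nil {
//			return err
//		}
//		conn, err := transport.Dial(address)
//		if err != nil {
//			_ = peerManager.DialFailed(peerID, address)
//			continue
//		}
//		if err := peerManager.Dialed(peerID, address); err != nil {
//			conn.Close()
//			continue
//		}
//		peerManager.Ready(peerID)
//	}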
//
// If we are connected to too many peers (more than MaxConnected), typically
// because we have upgraded to higher-scored peers and need to shed lower-scored
// ones, the flow is as follows:
// - EvictNext: returns a peer ID to evict, marking the peer as evicting.
// - Disconnected: peer was disconnected, unmarking it as connected and
//   evicting, and broadcasting a PeerStatusDown peer update.
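//
// A corresponding eviction loop might look like this (again a hypothetical
// caller sketch; router.Disconnect is an assumed helper that tears down the
// connection and eventually calls Disconnected):
//
//	for {
//		peerID, err := peerManager.EvictNext(ctx)
//		if err != nil {
//			return err
//		}
//		router.Disconnect(peerID)
//	}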
//
// If all connection slots are full (at MaxConnected), we can use up to
// MaxConnectedUpgrade additional connections to probe any higher-scored
// unconnected peers, and if we reach them (or they reach us) we allow the
// connection and evict a lower-scored peer. We mark the lower-scored peer as
// upgrading[from]=to to make sure no other higher-scored peer can claim the
// same one for an upgrade. The flow is as follows:
// - Accepted: if upgrade is possible, mark upgrading[from]=to and connected.
// - DialNext: if upgrade is possible, mark upgrading[from]=to and dialing.
// - DialFailed: unmark upgrading[from]=to and dialing.
// - Dialed: unmark dialing, mark as connected.
// - EvictNext: unmark upgrading[from]=to, then if over MaxConnected
//   either the upgraded peer or an even lower-scored one (if found)
//   is marked as evicting and returned.
// - Disconnected: unmark connected and evicting, and also unmark
//   upgrading[from]=to for both the from and to peers (in case either
//   disconnected before the eviction).
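//
// For example (illustrative values): with MaxConnected=2 and
// MaxConnectedUpgrade=1, suppose we are connected to peers A (score 1) and
// B (score 2) and discover peer C (score 3). DialNext returns C and marks
// upgrading[A]=C, since A is the lowest-scored evictable peer. Once Dialed(C)
// succeeds we have three connections, so EvictNext clears the claim, marks A
// as evicting, and returns A, bringing us back down to MaxConnected.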
type PeerManager struct {
	options PeerManagerOptions

	wakeDialCh  chan struct{} // wakes up DialNext() on relevant peer changes
	wakeEvictCh chan struct{} // wakes up EvictNext() on relevant peer changes
	closeCh     chan struct{} // signal channel for Close()
	closeOnce   sync.Once

	mtx           sync.Mutex
	store         *peerStore
	dialing       map[NodeID]bool   // peers being dialed (DialNext -> Dialed/DialFail)
	connected     map[NodeID]bool   // connected peers (Dialed/Accepted -> Disconnected)
	upgrading     map[NodeID]NodeID // peers claimed for upgrading (key is lower-scored peer)
	evicting      map[NodeID]bool   // peers being evicted (EvictNext -> Disconnected)
	subscriptions map[*PeerUpdatesCh]*PeerUpdatesCh // keyed by struct identity (address)
}
@@ -321,17 +343,37 @@ func (o PeerManagerOptions) isPersistent(id NodeID) bool {
func NewPeerManager(options PeerManagerOptions) *PeerManager {
	return &PeerManager{
		options: options,
		closeCh: make(chan struct{}),

		// We use a buffer of size 1 for these trigger channels, with
		// non-blocking sends. This ensures that if e.g. wakeDial() is called
		// multiple times before the initial trigger is picked up we only
		// process the trigger once.
		//
		// FIXME: This should maybe be a libs/sync type.
		wakeDialCh:  make(chan struct{}, 1),
		wakeEvictCh: make(chan struct{}, 1),

		// FIXME: Once the store persists data, we need to update existing
		// peers in the store with any new information, e.g. changes to
		// PersistentPeers configuration.
		store:         newPeerStore(),
		dialing:       map[NodeID]bool{},
		connected:     map[NodeID]bool{},
		upgrading:     map[NodeID]NodeID{},
		evicting:      map[NodeID]bool{},
		subscriptions: map[*PeerUpdatesCh]*PeerUpdatesCh{},
	}
}
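
// A hypothetical construction, using option fields referenced elsewhere in
// this file (the values are illustrative only):
//
//	m := NewPeerManager(PeerManagerOptions{
//		MaxConnected:        10,
//		MaxConnectedUpgrade: 2,
//		MinRetryTime:        time.Second,
//	})
//	defer m.Close()
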
// Close closes the peer manager, releasing resources allocated with it
// (specifically any running goroutines).
func (m *PeerManager) Close() {
	m.closeOnce.Do(func() {
		close(m.closeCh)
	})
}

// Add adds a peer to the manager, given as an address. If the peer already
// exists, the address is added to it.
func (m *PeerManager) Add(address PeerAddress) error {
@@ -352,7 +394,12 @@ func (m *PeerManager) Add(address PeerAddress) error {
		}
	}
	peer.AddAddress(address)
	err = m.store.Set(peer)
	if err != nil {
		return err
	}
	m.wakeDial()
	return nil
}
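
// A hypothetical usage; ParsePeerAddress and the address format are assumed
// here for illustration and are not defined in this diff:
//
//	address, err := ParsePeerAddress("mconn://nodeid@10.0.0.1:26657")
//	if err != nil {
//		return err
//	}
//	if err := peerManager.Add(address); err != nil {
//		return err
//	}
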
// Subscribe subscribes to peer updates. The caller must consume the peer
@@ -398,19 +445,33 @@ func (m *PeerManager) broadcast(peerUpdate PeerUpdate) {
}
// DialNext finds an appropriate peer address to dial, and marks it as dialing.
// If no peer is found, or all connection slots are full, it blocks until one
// becomes available. The caller must call Dialed() or DialFailed() for the
// returned peer. The context can be used to cancel the call.
func (m *PeerManager) DialNext(ctx context.Context) (NodeID, PeerAddress, error) {
	for {
		id, address, err := m.TryDialNext()
		if err != nil || id != "" {
			return id, address, err
		}
		select {
		case <-m.wakeDialCh:
		case <-ctx.Done():
			return "", PeerAddress{}, ctx.Err()
		}
	}
}

// TryDialNext is equivalent to DialNext(), but immediately returns an empty
// peer ID if no peers or connection slots are available.
func (m *PeerManager) TryDialNext() (NodeID, PeerAddress, error) {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	// We allow dialing MaxConnected+MaxConnectedUpgrade peers. Including
	// MaxConnectedUpgrade allows us to probe additional peers that have a
	// higher score than a connected peer, and if successful evict the
	// lower-scored peer via EvictNext().
	if m.options.MaxConnected > 0 &&
		len(m.connected)+len(m.dialing) >= int(m.options.MaxConnected)+int(m.options.MaxConnectedUpgrade) {
		return "", PeerAddress{}, nil
@@ -433,23 +494,18 @@ func (m *PeerManager) DialNext() (NodeID, PeerAddress, error) {
		// At this point we have an eligible address to dial. If we're full
		// but have peer upgrade capacity (as checked above), we need to
		// make sure there exists an evictable peer of a lower score that we
		// can replace. If so, we mark the lower-scored peer as upgrading so
		// no one else can claim it, and EvictNext() will evict it later.
		//
		// If we don't find one, there is no point in trying additional
		// peers, since they will all have the same or lower score than this
		// peer (since they're ordered by score via peerStore.Ranked).
		if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) {
			upgradePeer := m.findUpgradeCandidate(peer, ranked)
			if upgradePeer == "" {
				return "", PeerAddress{}, nil
			}
			m.upgrading[upgradePeer] = peer.ID
		}
		m.dialing[peer.ID] = true
@@ -459,6 +515,29 @@ func (m *PeerManager) DialNext() (NodeID, PeerAddress, error) {
return "" , PeerAddress { } , nil
}
// wakeDial is used to notify DialNext about changes that *may* cause new
// peers to become eligible for dialing, such as peer disconnections and
// retry timeouts.
func (m *PeerManager) wakeDial() {
	// The channel has a 1-size buffer. A non-blocking send ensures
	// we only queue up at most 1 trigger between each DialNext().
	select {
	case m.wakeDialCh <- struct{}{}:
	default:
	}
}

// wakeEvict is used to notify EvictNext about changes that *may* cause
// peers to become eligible for eviction, such as peer upgrades.
func (m *PeerManager) wakeEvict() {
	// The channel has a 1-size buffer. A non-blocking send ensures
	// we only queue up at most 1 trigger between each EvictNext().
	select {
	case m.wakeEvictCh <- struct{}{}:
	default:
	}
}
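
// The wake pattern above can be exercised in isolation. A minimal sketch,
// for illustration only:
//
//	ch := make(chan struct{}, 1)
//	wake := func() {
//		select {
//		case ch <- struct{}{}: // queue a single pending trigger
//		default: // a trigger is already pending; coalesce with it
//		}
//	}
//	wake()
//	wake() // coalesced with the first call
//	<-ch   // consumes the one pending trigger; the next receive would block
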
// retryDelay calculates a dial retry delay using exponential backoff, based on
// retry settings in PeerManagerOptions. If MinRetryTime is 0, this returns
// MaxInt64 (i.e. an infinite retry delay, effectively disabling retries).
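//
// The function body is elided from this diff. One plausible shape for such a
// backoff (an assumption for illustration; MaxRetryTime is a hypothetical
// option, and overflow guarding is omitted):
//
//	if failures == 0 {
//		return 0
//	}
//	if m.options.MinRetryTime == 0 {
//		return time.Duration(math.MaxInt64)
//	}
//	delay := m.options.MinRetryTime * time.Duration(int64(1)<<(failures-1))
//	if m.options.MaxRetryTime > 0 && delay > m.options.MaxRetryTime {
//		delay = m.options.MaxRetryTime
//	}
//	return delay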
@@ -492,16 +571,49 @@ func (m *PeerManager) DialFailed(peerID NodeID, address PeerAddress) error {
	defer m.mtx.Unlock()

	delete(m.dialing, peerID)
	for from, to := range m.upgrading {
		if to == peerID {
			// Unmark failed upgrade attempt.
			delete(m.upgrading, from)
		}
	}

	peer, err := m.store.Get(peerID)
	if err != nil || peer == nil { // Peer may have been removed while dialing, ignore.
		return err
	}
	addressInfo := peer.LookupAddressInfo(address)
	if addressInfo == nil {
		return nil // Assume the address has been removed, ignore.
	}
	addressInfo.LastDialFailure = time.Now().UTC()
	addressInfo.DialFailures++
	if err = m.store.Set(peer); err != nil {
		return err
	}

	// We spawn a goroutine that notifies DialNext() again when the retry
	// timeout has elapsed, so that we can consider dialing it again.
	//
	// FIXME: We need to calculate the retry delay outside of the goroutine,
	// since the arguments are currently pointers to structs shared in the
	// peerStore. The peerStore should probably return struct copies instead,
	// to avoid these sorts of issues.
	if retryDelay := m.retryDelay(peer, addressInfo.DialFailures); retryDelay != time.Duration(math.MaxInt64) {
		go func() {
			// Use an explicit timer with deferred cleanup instead of
			// time.After(), to avoid leaking goroutines on PeerManager.Close().
			timer := time.NewTimer(retryDelay)
			defer timer.Stop()
			select {
			case <-timer.C:
				m.wakeDial()
			case <-m.closeCh:
			}
		}()
	}

	m.wakeDial()
	return nil
}
@@ -527,7 +639,6 @@ func (m *PeerManager) Dialed(peerID NodeID, address PeerAddress) error {
	} else if peer == nil {
		return fmt.Errorf("peer %q was removed while dialing", peerID)
	}

	now := time.Now().UTC()
	peer.LastConnected = now
@@ -535,7 +646,14 @@ func (m *PeerManager) Dialed(peerID NodeID, address PeerAddress) error {
		addressInfo.DialFailures = 0
		addressInfo.LastDialSuccess = now
	}
	if err = m.store.Set(peer); err != nil {
		return err
	}

	m.connected[peerID] = true
	m.wakeEvict()
	return nil
}

// Accepted marks an incoming peer connection successfully accepted. If the peer
@@ -573,29 +691,30 @@ func (m *PeerManager) Accepted(peerID NodeID) error {
		}
	}
	// If we're already full (i.e. at MaxConnected), but we allow upgrades (and
	// we know from the check above that we have upgrade capacity), then we can
	// look for any lower-scored evictable peer, and if found we can accept this
	// connection anyway and let EvictNext() evict a lower-scored peer for us.
	if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) {
		ranked, err := m.store.Ranked()
		if err != nil {
			return err
		}
		upgradePeer := m.findUpgradeCandidate(peer, ranked)
		if upgradePeer == "" {
			return fmt.Errorf("already connected to maximum number of peers")
		}
		m.upgrading[upgradePeer] = peerID
	}

	peer.LastConnected = time.Now().UTC()
	if err = m.store.Set(peer); err != nil {
		return err
	}

	m.connected[peerID] = true
	m.wakeEvict()
	return nil
}

// Ready marks a peer as ready, broadcasting status updates to subscribers. The
@@ -606,8 +725,7 @@ func (m *PeerManager) Ready(peerID NodeID) {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	if m.connected[peerID] {
		m.broadcast(PeerUpdate{
			PeerID: peerID,
			Status: PeerStatusUp,
@@ -621,22 +739,65 @@ func (m *PeerManager) Disconnected(peerID NodeID) error {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	// After upgrading to a peer, it's possible for that peer to disconnect
	// before EvictNext() gets around to evicting the lower-scored peer. To
	// avoid stale upgrade markers, we remove them here.
	for from, to := range m.upgrading {
		if to == peerID {
			delete(m.upgrading, from)
		}
	}

	delete(m.connected, peerID)
	delete(m.upgrading, peerID)
	delete(m.evicting, peerID)
	m.broadcast(PeerUpdate{
		PeerID: peerID,
		Status: PeerStatusDown,
	})
	m.wakeDial()
	return nil
}

// EvictNext returns the next peer to evict (i.e. disconnect). If no evictable
// peers are found, the call will block until one becomes available or the
// context is cancelled.
func (m *PeerManager) EvictNext(ctx context.Context) (NodeID, error) {
	for {
		id, err := m.TryEvictNext()
		if err != nil || id != "" {
			return id, err
		}
		select {
		case <-m.wakeEvictCh:
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
}

// TryEvictNext is equivalent to EvictNext, but immediately returns an empty
// node ID if no evictable peers are found.
func (m *PeerManager) TryEvictNext() (NodeID, error) {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	// We first prune the upgrade list. All connection slots were full when the
	// upgrades began, but we may have disconnected other peers in the meantime,
	// and thus may no longer have to evict anyone for those upgrades after all.
	for from, to := range m.upgrading {
		// Stop pruning once the remaining upgrade claims only account for
		// connections exceeding MaxConnected.
		if m.options.MaxConnected == 0 ||
			len(m.upgrading) <= len(m.connected)-len(m.evicting)-int(m.options.MaxConnected) {
			break
		}
		if m.connected[to] {
			delete(m.upgrading, from)
		}
	}
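
	// For example (illustrative values): with MaxConnected=2, three connected
	// peers, none evicting, and two claims in upgrading, only one connection
	// exceeds the limit (3-0-2=1), so one completed upgrade claim is pruned
	// here and the other is left for the eviction logic below.
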
	// If we're below capacity, we don't need to evict anything.
	if m.options.MaxConnected == 0 ||
		len(m.connected)-len(m.evicting) <= int(m.options.MaxConnected) {
		return "", nil
@@ -646,6 +807,30 @@ func (m *PeerManager) EvictNext() (NodeID, error) {
	if err != nil {
		return "", err
	}

	// Look for any upgraded peers that we can evict.
	for from, to := range m.upgrading {
		if m.connected[to] {
			delete(m.upgrading, from)
			// We may have connected to even lower-scored peers since we began
			// upgrading over this one, in which case we should evict one of
			// those instead.
			fromPeer, err := m.store.Get(from)
			if err != nil {
				return "", err
			} else if fromPeer == nil {
				continue
			} else if evictPeer := m.findUpgradeCandidate(fromPeer, ranked); evictPeer != "" {
				m.evicting[evictPeer] = true
				return evictPeer, nil
			} else {
				m.evicting[from] = true
				return from, nil
			}
		}
	}

	// If we didn't find any upgrade-related peers to evict, we just pick the
	// lowest-ranked connected peer that isn't already being evicted.
	for i := len(ranked) - 1; i >= 0; i-- {
		peer := ranked[i]
		if m.connected[peer.ID] && !m.evicting[peer.ID] {
@@ -653,24 +838,36 @@ func (m *PeerManager) EvictNext() (NodeID, error) {
			return peer.ID, nil
		}
	}

	return "", nil
}
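
// For example (illustrative values): with MaxConnected=2, suppose we were
// connected to peers A (score 1) and B (score 2) and began upgrading to
// peer C (score 3), marking upgrading[A]=C. While dialing C, B disconnected
// and peer D (score 0) was accepted into the free slot. Once C connects we
// are over capacity: TryEvictNext clears the A=C claim, but
// findUpgradeCandidate for A now returns D, so D is evicted instead of A,
// leaving the two highest-scored peers (C and A) connected.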
// findUpgradeCandidate looks for a lower-scored peer that we could evict
// to make room for the given peer. Returns an empty ID if none is found.
// The caller must hold the mutex lock.
func (m *PeerManager) findUpgradeCandidate(peer *peerInfo, ranked []*peerInfo) NodeID {
	// Check for any existing upgrade claims to this peer. It is important that
	// we return the claiming peer here, since we can get an inbound connection
	// from a peer we're concurrently dialing for an upgrade, and we want the
	// inbound connection to be accepted in that case.
	for from, to := range m.upgrading {
		if to == peer.ID {
			return from
		}
	}
	for i := len(ranked) - 1; i >= 0; i-- {
		candidate := ranked[i]
		switch {
		case candidate.Score() >= peer.Score():
			return "" // no further peers can be scored lower, due to sorting
		case !m.connected[candidate.ID]:
		case m.evicting[candidate.ID]:
		case m.upgrading[candidate.ID] != "":
		default:
			return candidate.ID
		}
	}
	return ""
}
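
// For example: if we are dialing peer C as an upgrade over peer A
// (upgrading[A]=C) and C happens to dial us first, Accepted(C) calls
// findUpgradeCandidate, which returns A due to the existing claim, so the
// inbound connection is admitted instead of being rejected as over capacity.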
// GetHeight returns a peer's height, as reported via SetHeight. If the peer