@ -5,9 +5,12 @@ import (
"errors"
"fmt"
"io"
"math"
"math/rand"
"net"
"net/url"
"runtime/debug"
"sort"
"strconv"
"sync"
"time"
@ -134,15 +137,6 @@ const (
PeerStatusBanned = PeerStatus ( "banned" ) // Peer which is banned for misbehavior.
)
// PeerPriority specifies peer priorities.
//
// The constants below are assigned consecutive values starting at 1 (iota + 1),
// so the zero value of PeerPriority does not correspond to any named priority.
type PeerPriority int
const (
// PeerPriorityNormal has the value 1.
PeerPriorityNormal PeerPriority = iota + 1
// PeerPriorityValidator has the value 2.
PeerPriorityValidator
// PeerPriorityPersistent has the value 3.
// NOTE(review): presumably a larger value means a higher priority - confirm
// against the code that compares priorities.
PeerPriorityPersistent
)
// PeerError is a peer error reported by a reactor via the Error channel. The
// severity may cause the peer to be disconnected or banned depending on policy.
type PeerError struct {
@ -214,10 +208,20 @@ type PeerUpdate struct {
Status PeerStatus
}
// peerManager manages peer information, using a peerStore for underlying
// storage. Its primary purpose is to determine which peers to connect to next,
// make sure a peer only has a single active connection (either inbound or outbound),
// and to avoid dialing the same peer in parallel goroutines.
// PeerScore is a numeric score assigned to a peer (higher is better). Scores
// are computed by peerInfo.Score and are used to rank peers for dialing and
// eviction.
type PeerScore uint16
const (
// PeerScorePersistent is added for persistent peers, i.e. peers whose
// peerInfo.Persistent flag is set.
PeerScorePersistent PeerScore = 100
)
// PeerManager manages peer lifecycle information, using a peerStore for
// underlying storage. Its primary purpose is to determine which peers to
// connect to next, make sure a peer only has a single active connection (either
// inbound or outbound), and evict peers to make room for higher-scored peers.
// It does not manage actual connections (this is handled by the Router),
// only the peer lifecycle state.
//
// For an outbound connection, the flow is as follows:
// - DialNext: returns a peer address to dial, marking the peer as dialing.
@ -235,6 +239,12 @@ type PeerUpdate struct {
// - Disconnected: peer disconnects, unmarking as connected and broadcasts a
// PeerStatusDown peer update.
//
// If we need to evict a peer, typically because we have connected to additional
// higher-scored peers and need to shed lower-scored ones, the flow is as follows:
// - EvictNext: returns a peer ID to evict, marking peer as evicting.
// - Disconnected: peer was disconnected, unmarking as connected and evicting,
// and broadcasts a PeerStatusDown peer update.
//
// We track dialing and connected states independently. This allows us to accept
// an inbound connection from a peer while the router is also dialing an
// outbound connection to that same peer, which will cause the dialer to
@ -242,52 +252,114 @@ type PeerUpdate struct {
// avoids race conditions where multiple goroutines may end up dialing a peer if
// an incoming connection was briefly accepted and disconnected while we were
// also dialing.
type peerManager struct {
type PeerManager struct {
options PeerManagerOptions
mtx sync . Mutex
store * peerStore
dialing map [ NodeID ] bool
connected map [ NodeID ] bool
evicting map [ NodeID ] bool
subscriptions map [ * PeerUpdatesCh ] * PeerUpdatesCh // keyed by struct identity (address)
}
// newPeerManager creates a new peer manager.
func newPeerManager ( store * peerStore ) * peerManager {
return & peerManager {
store : store ,
// PeerManagerOptions specifies options for a PeerManager.
type PeerManagerOptions struct {
// PersistentPeers are peers that we want to maintain persistent connections
// to. These will be scored higher than other peers, and if
// MaxConnectedUpgrade is non-zero any lower-scored peers will be evicted if
// necessary to make room for these.
PersistentPeers [ ] NodeID
// MaxConnected is the maximum number of connected peers (inbound and
// outbound). 0 means no limit.
MaxConnected uint16
// MaxConnectedUpgrade is the maximum number of additional connections to
// use for probing any better-scored peers to upgrade to when all connection
// slots are full. 0 disables peer upgrading.
//
// For example, if we are already connected to MaxConnected peers, but we
// know or learn about better-scored peers (e.g. configured persistent
// peers) that we are not connected to, then we can probe these peers by
// using up to MaxConnectedUpgrade connections, and once connected evict the
// lowest-scored connected peers. This also works for inbound connections,
// i.e. if a higher-scored peer attempts to connect to us, we can accept
// the connection and evict a lower-scored peer.
MaxConnectedUpgrade uint16
// MinRetryTime is the minimum time to wait between retries. Retry times
// double for each retry, up to MaxRetryTime. 0 disables retries.
MinRetryTime time . Duration
// MaxRetryTime is the maximum time to wait between retries. 0 means
// no maximum, in which case the retry time will keep doubling.
MaxRetryTime time . Duration
// MaxRetryTimePersistent is the maximum time to wait between retries for
// peers listed in PersistentPeers. 0 uses MaxRetryTime instead.
MaxRetryTimePersistent time . Duration
// RetryTimeJitter is the upper bound of a random interval added to
// retry times, to avoid thundering herds. 0 disables jitter.
RetryTimeJitter time . Duration
}
// isPersistent reports whether id is one of the configured PersistentPeers.
// The list is scanned linearly, which is fine because it is expected to hold
// only a handful of entries.
func (o PeerManagerOptions) isPersistent(id NodeID) bool {
	for i := range o.PersistentPeers {
		if o.PersistentPeers[i] == id {
			return true
		}
	}
	return false
}
// NewPeerManager builds a PeerManager configured with the given options. The
// manager starts with a fresh peer store and with no peers marked as dialing,
// connected or evicting, and without any subscriptions.
func NewPeerManager(options PeerManagerOptions) *PeerManager {
	manager := &PeerManager{options: options}

	// FIXME: Once the store persists data, we need to update existing
	// peers in the store with any new information, e.g. changes to
	// PersistentPeers configuration.
	manager.store = newPeerStore()

	manager.dialing = make(map[NodeID]bool)
	manager.connected = make(map[NodeID]bool)
	manager.evicting = make(map[NodeID]bool)
	manager.subscriptions = make(map[*PeerUpdatesCh]*PeerUpdatesCh)
	return manager
}
// Add adds a peer to the manager, given as an address. If the peer already
// exists, the address is added to it.
func ( m * peerManager ) Add ( address PeerAddress ) error {
func ( m * P eerManager) Add ( address PeerAddress ) error {
if err := address . Validate ( ) ; err != nil {
return err
}
peerID := address . NodeID ( )
m . mtx . Lock ( )
defer m . mtx . Unlock ( )
peer , err := m . store . Get ( peerID )
peer , err := m . store . Get ( address . NodeID ( ) )
if err != nil {
return err
}
if peer == nil {
peer = newPeerInfo ( peerID )
}
if peer . AddAddress ( address ) {
return m . store . Set ( peer )
peer = & peerInfo {
ID : address . NodeID ( ) ,
Persistent : m . options . isPersistent ( address . NodeID ( ) ) ,
}
}
return nil
peer . AddAddress ( address )
return m . store . Set ( peer )
}
// Subscribe subscribes to peer updates. The caller must consume the peer
// updates in a timely fashion and close the subscription when done, since
// delivery is guaranteed and will block peer connection/disconnection
// otherwise.
func ( m * peerManager ) Subscribe ( ) * PeerUpdatesCh {
func ( m * P eerManager) Subscribe ( ) * PeerUpdatesCh {
// FIXME: We may want to use a size 1 buffer here. When the router
// broadcasts a peer update it has to loop over all of the
// subscriptions, and we want to avoid blocking and waiting for a
@ -316,7 +388,7 @@ func (m *peerManager) Subscribe() *PeerUpdatesCh {
//
// FIXME: Consider using more fine-grained mutexes here, and/or a channel to
// enforce ordering of updates.
func ( m * p eerManager) broadcast ( peerUpdate PeerUpdate ) {
func ( m * P eerManager) broadcast ( peerUpdate PeerUpdate ) {
for _ , sub := range m . subscriptions {
select {
case sub . updatesCh <- peerUpdate :
@ -327,102 +399,213 @@ func (m *peerManager) broadcast(peerUpdate PeerUpdate) {
// DialNext finds an appropriate peer address to dial, and marks it as dialing.
// The peer will not be returned again until Dialed() or DialFailed() is called
// for the peer and it is no longer connected.
// for the peer and it is no longer connected. Returns an empty ID if no
// appropriate peers are available, or if all connection slots are full.
//
// Returns an empty ID if no appropriate peers are available.
func ( m * peerManager ) DialNext ( ) ( NodeID , PeerAddress , error ) {
// We allow dialing MaxConnected+MaxConnectedUpgrade peers. Including
// MaxConnectedUpgrade allows us to dial additional peers beyond MaxConnected if
// they have a higher score than any other connected or dialing peer. If we are
// successful in dialing, and thus have more than MaxConnected connected peers,
// the lower-scored peer will be evicted via EvictNext().
func ( m * PeerManager ) DialNext ( ) ( NodeID , PeerAddress , error ) {
m . mtx . Lock ( )
defer m . mtx . Unlock ( )
peers , err := m . store . List ( )
if m . options . MaxConnected > 0 &&
len ( m . connected ) + len ( m . dialing ) >= int ( m . options . MaxConnected ) + int ( m . options . MaxConnectedUpgrade ) {
return "" , PeerAddress { } , nil
}
ranked , err := m . store . Ranked ( )
if err != nil {
return "" , PeerAddress { } , err
}
for _ , peer := range peers {
switch {
case len ( peer . Addresses ) == 0 :
case m . dialing [ peer . ID ] :
case m . connected [ peer . ID ] :
default :
// FIXME: We currently only dial the first address, but we should
// track connection statistics for each address and return the most
// appropriate one.
for _ , peer := range ranked {
if m . dialing [ peer . ID ] || m . connected [ peer . ID ] {
continue
}
for _ , addressInfo := range peer . AddressInfo {
if time . Since ( addressInfo . LastDialFailure ) < m . retryDelay ( peer , addressInfo . DialFailures ) {
continue
}
// At this point we have an eligible address to dial. If we're full
// but have peer upgrade capacity (as checked above), we need to
// make sure there exists an evictable peer of a lower score that we
// can replace. If so, we can go ahead and dial this peer, and
// EvictNext() will evict a lower-scored one later.
//
// If we don't find one, there is no point in trying additional
// peers, since they will all have the same or lower score than this
// peer (since they're ordered by score via peerStore.Ranked).
//
// FIXME: There is a race condition here where, if there exists a
// single lower-scored peer, we may end up dialing multiple
// higher-scored new peers that all expect the same lower-scored
// peer to be evicted, causing us to take on too many peers. We may
// need to reserve the eviction for this specific peer such that
// others can't claim it.
if m . options . MaxConnected > 0 &&
len ( m . connected ) >= int ( m . options . MaxConnected ) &&
! m . peerIsUpgrade ( peer , ranked ) {
return "" , PeerAddress { } , nil
}
m . dialing [ peer . ID ] = true
return peer . ID , peer . Addresses [ 0 ] , nil
return peer . ID , addressInfo . Address , nil
}
}
return "" , PeerAddress { } , nil
}
// retryDelay calculates a dial retry delay using exponential backoff, based on
// retry settings in PeerManagerOptions. If MinRetryTime is 0, this returns
// MaxInt64 (i.e. an infinite retry delay, effectively disabling retries).
//
// With no previous failures the delay is 0. Otherwise the delay is
// MinRetryTime * 2^failures, clamped to MaxRetryTime (or to
// MaxRetryTimePersistent for persistent peers when that is non-zero), plus a
// random jitter in [0, RetryTimeJitter). The computation saturates at MaxInt64
// instead of overflowing.
func (m *PeerManager) retryDelay(peer *peerInfo, failures uint32) time.Duration {
	const maxDuration = time.Duration(math.MaxInt64)

	if failures == 0 {
		return 0
	}
	if m.options.MinRetryTime == 0 {
		return maxDuration
	}
	maxDelay := m.options.MaxRetryTime
	if peer.Persistent && m.options.MaxRetryTimePersistent > 0 {
		maxDelay = m.options.MaxRetryTimePersistent
	}

	// Doubling `failures` times is a left shift. Only shift when the result
	// is known to fit, since an overflowed (negative) delay would slip past
	// the maxDelay clamp below and make the peer immediately dialable.
	delay := maxDuration
	if failures < 63 && m.options.MinRetryTime <= maxDuration>>failures {
		delay = m.options.MinRetryTime << failures
	}
	if maxDelay > 0 && delay > maxDelay {
		delay = maxDelay
	}

	// rand.Int63n panics for n <= 0, so only draw jitter when it is enabled.
	// FIXME: This should use a PeerManager-scoped RNG.
	if m.options.RetryTimeJitter > 0 {
		jitter := time.Duration(rand.Int63n(int64(m.options.RetryTimeJitter))) // nolint:gosec
		if delay > maxDuration-jitter {
			delay = maxDuration
		} else {
			delay += jitter
		}
	}
	return delay
}
// DialFailed reports a failed dial attempt. This will make the peer available
// for dialing again when appropriate.
func ( m * peerManager ) DialFailed ( peerID NodeID , address PeerAddress ) error {
//
// FIXME: This should probably delete or mark bad addresses/peers after some time.
func ( m * PeerManager ) DialFailed ( peerID NodeID , address PeerAddress ) error {
m . mtx . Lock ( )
defer m . mtx . Unlock ( )
delete ( m . dialing , peerID )
// FIXME: We need to track address quality statistics and exponential backoff.
peer , err := m . store . Get ( peerID )
if err != nil || peer == nil { // Peer may have been removed while dialing, ignore.
return err
}
if addressInfo := peer . LookupAddressInfo ( address ) ; addressInfo != nil {
addressInfo . LastDialFailure = time . Now ( ) . UTC ( )
addressInfo . DialFailures ++
return m . store . Set ( peer )
}
return nil
}
// Dialed marks a peer as successfully dialed. Any further incoming connections
// will be rejected, and once disconnected the peer may be dialed again.
func ( m * peerManager ) Dialed ( peerID NodeID , address PeerAddress ) error {
func ( m * P eerManager) Dialed ( peerID NodeID , address PeerAddress ) error {
m . mtx . Lock ( )
defer m . mtx . Unlock ( )
delete ( m . dialing , peerID )
if m . connected [ peerID ] {
return fmt . Errorf ( "peer %v is already connected" , peerID )
}
if m . options . MaxConnected > 0 &&
len ( m . connected ) >= int ( m . options . MaxConnected ) + int ( m . options . MaxConnectedUpgrade ) {
return fmt . Errorf ( "already connected to maximum number of peers" )
}
peer , err := m . store . Get ( peerID )
if err != nil {
return err
} else if peer == nil {
return fmt . Errorf ( "unknown peer %q" , peerID )
return fmt . Errorf ( "peer %q was removed while dialing " , peerID )
}
m . connected [ peerID ] = true
if m . connected [ peerID ] {
return fmt . Errorf ( "peer %v is already connected" , peerID )
now := time . Now ( ) . UTC ( )
peer . LastConnected = now
if addressInfo := peer . LookupAddressInfo ( address ) ; addressInfo != nil {
addressInfo . DialFailures = 0
addressInfo . LastDialSuccess = now
}
delete ( m . dialing , peerID )
m . connected [ peerID ] = true
return nil
return m . store . Set ( peer )
}
// Accepted marks an incoming peer connection successfully accepted. If the peer
// is already connected this will return an error.
// is already connected or we don't allow additional connections then this will
// return an error.
//
// If MaxConnectedUpgrade is non-zero, the accepted peer is better-scored than any
// other connected peer, and the number of connections does not exceed
// MaxConnected + MaxConnectedUpgrade then we accept the connection and rely on
// EvictNext() to evict lower-scored peers.
//
// NOTE: We can't take an address here, since e.g. TCP uses a different port
// number for outbound traffic than inbound traffic, so the peer's endpoint
// wouldn't necessarily be an appropriate address to dial.
func ( m * peerManager ) Accepted ( peerID NodeID ) error {
func ( m * P eerManager) Accepted ( peerID NodeID ) error {
m . mtx . Lock ( )
defer m . mtx . Unlock ( )
if m . connected [ peerID ] {
return fmt . Errorf ( "peer %q is already connected" , peerID )
}
if m . options . MaxConnected > 0 &&
len ( m . connected ) >= int ( m . options . MaxConnected ) + int ( m . options . MaxConnectedUpgrade ) {
return fmt . Errorf ( "already connected to maximum number of peers" )
}
peer , err := m . store . Get ( peerID )
if err != nil {
return err
} else if peer == nil {
peer = newPeerInfo ( peerID )
if err = m . store . Set ( peer ) ; err != nil {
return err
}
if peer == nil {
peer = & peerInfo {
ID : peerID ,
Persistent : m . options . isPersistent ( peerID ) ,
}
}
if m . connected [ peerID ] {
return fmt . Errorf ( "peer %q is already connected" , peerID )
// If we're already full (i.e. at MaxConnected), but we allow upgrades (and we
// know from the check above that we have upgrade capacity), then we can look
// for a lower-scored evictable peer, and if found we can accept this connection
// anyway and let EvictNext() evict the lower-scored peer for us.
//
// FIXME: There is a race condition here where, if there exists a single
// lower-scored peer, we may end up accepting multiple higher-scored new
// peers that all expect the same lower-scored peer to be evicted, causing
// us to take on too many peers. We may need to reserve the eviction for
// this specific peer such that others can't claim it.
if m . options . MaxConnected > 0 && len ( m . connected ) >= int ( m . options . MaxConnected ) {
ranked , err := m . store . Ranked ( )
if err != nil {
return err
}
if ! m . peerIsUpgrade ( peer , ranked ) {
return fmt . Errorf ( "already connected to maximum number of peers" )
}
}
m . connected [ peerID ] = true
return nil
peer . LastConnected = time . Now ( ) . UTC ( )
return m . store . Set ( peer )
}
// Ready marks a peer as ready, broadcasting status updates to subscribers. The
// peer must already be marked as connected. This is separate from Dialed() and
// Accepted() to allow the router to set up its internal queues before reactors
// start sending messages (holding the Router.peerMtx mutex while calling
// Accepted or Dialed will halt all message routing while peers are set up, which
// is too expensive and also causes difficulties in tests where we may want to
// consume peer updates and send messages sequentially).
//
// FIXME: This possibly indicates an architectural problem. Should the peerManager
// handle actual network connections to/from peers as well? Or should all of this
// be done by the router?
func ( m * peerManager ) Ready ( peerID NodeID ) {
// start sending messages.
func ( m * PeerManager ) Ready ( peerID NodeID ) {
m . mtx . Lock ( )
defer m . mtx . Unlock ( )
connected := m . connected [ peerID ]
if connected {
m . broadcast ( PeerUpdate {
@ -434,10 +617,12 @@ func (m *peerManager) Ready(peerID NodeID) {
// Disconnected unmarks a peer as connected, allowing new connections to be
// established.
func ( m * p eerManager) Disconnected ( peerID NodeID ) error {
func ( m * P eerManager) Disconnected ( peerID NodeID ) error {
m . mtx . Lock ( )
defer m . mtx . Unlock ( )
delete ( m . connected , peerID )
delete ( m . evicting , peerID )
m . broadcast ( PeerUpdate {
PeerID : peerID ,
Status : PeerStatusDown ,
@ -445,12 +630,100 @@ func (m *peerManager) Disconnected(peerID NodeID) error {
return nil
}
// EvictNext picks the next peer that should be disconnected and marks it as
// evicting. It returns an empty ID when nothing needs to be evicted, i.e. when
// MaxConnected is 0 (no limit) or when the number of connected peers not
// already being evicted does not exceed MaxConnected. Otherwise the
// lowest-ranked peer that is connected and not yet evicting is chosen.
func (m *PeerManager) EvictNext() (NodeID, error) {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	limit := int(m.options.MaxConnected)
	if limit == 0 {
		return "", nil
	}
	if remaining := len(m.connected) - len(m.evicting); remaining <= limit {
		return "", nil
	}

	ranked, err := m.store.Ranked()
	if err != nil {
		return "", err
	}

	// Ranked() lists the best peers first, so walk it from the back.
	for n := len(ranked); n > 0; n-- {
		candidate := ranked[n-1]
		if !m.connected[candidate.ID] || m.evicting[candidate.ID] {
			continue
		}
		m.evicting[candidate.ID] = true
		return candidate.ID, nil
	}
	return "", nil
}
// peerIsUpgrade reports whether connecting to peer would be an upgrade: there
// must be a strictly lower-scored peer that is connected and not already
// scheduled for eviction, so that it can be evicted to make room when we are
// full. ranked must be ordered best-first, as returned by peerStore.Ranked.
func (m *PeerManager) peerIsUpgrade(peer *peerInfo, ranked []*peerInfo) bool {
	// Scan from the worst peer upwards and stop at the first peer that scores
	// at least as high as the given one, since everything before it in the
	// slice scores at least as high too.
	for n := len(ranked); n > 0; n-- {
		other := ranked[n-1]
		switch {
		case other.Score() >= peer.Score():
			return false
		case m.connected[other.ID] && !m.evicting[other.ID]:
			return true
		}
	}
	return false
}
// GetHeight looks up the height last stored for a peer via SetHeight. It
// returns 0 when the peer is not in the store or no height has been stored for
// it, and returns the store's error if the lookup fails.
//
// FIXME: This is a temporary workaround for the peer state stored via the
// legacy Peer.Set() and Peer.Get() APIs, used to share height state between the
// consensus and mempool reactors. These dependencies should be removed from the
// reactors, and instead query this information independently via new P2P
// protocol additions.
func (m *PeerManager) GetHeight(peerID NodeID) (int64, error) {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	peer, err := m.store.Get(peerID)
	switch {
	case err != nil:
		return 0, err
	case peer == nil:
		return 0, nil
	}
	return peer.Height, nil
}
// SetHeight records a peer's height so that GetHeight can return it later. A
// peer that is not yet in the store is created first, with its Persistent flag
// taken from the PersistentPeers option.
//
// FIXME: This is a temporary workaround for the peer state stored via the
// legacy Peer.Set() and Peer.Get() APIs, used to share height state between the
// consensus and mempool reactors. These dependencies should be removed from the
// reactors, and instead query this information independently via new P2P
// protocol additions.
func (m *PeerManager) SetHeight(peerID NodeID, height int64) error {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	entry, err := m.store.Get(peerID)
	switch {
	case err != nil:
		return err
	case entry == nil:
		entry = new(peerInfo)
		entry.ID = peerID
		entry.Persistent = m.options.isPersistent(peerID)
	}

	entry.Height = height
	return m.store.Set(entry)
}
// peerStore stores information about peers. It is currently a bare-bones
// in-memory store, and will be fleshed out later.
//
// peerStore is not thread-safe, since it assumes it is only used by PeerManager
// which handles concurrency control. This allows the manager to execute multiple
// operations atomically while it holds the mutex.
type peerStore struct {
// peers holds the stored peer information by value, keyed by node ID.
peers map [ NodeID ] peerInfo
}
@ -490,35 +763,76 @@ func (s *peerStore) List() ([]*peerInfo, error) {
return peers , nil
}
// peerInfo contains peer information stored in a peerStore.
// Ranked returns a list of peers ordered by score (better peers first).
// Peers with equal scores are returned in an arbitrary order.
//
// FIXME: This should be renamed peer or something else once the old peer is
// removed.
type peerInfo struct {
ID NodeID
Addresses [ ] PeerAddress
// This is used to determine which peers to connect to and which peers to evict
// in order to make room for better peers.
//
// FIXME: For now, we simply generate the list on every call, but this can get
// expensive since it's called fairly frequently. We may want to either cache
// this, or store peers in a data structure that maintains order (e.g. a heap or
// ordered map).
func ( s * peerStore ) Ranked ( ) ( [ ] * peerInfo , error ) {
peers , err := s . List ( )
if err != nil {
return nil , err
}
sort . Slice ( peers , func ( i , j int ) bool {
// FIXME: If necessary, consider precomputing scores before sorting,
// to reduce the number of Score() calls.
return peers [ i ] . Score ( ) > peers [ j ] . Score ( )
} )
return peers , nil
}
// newPeerInfo creates a new peerInfo.
func newPeerInfo ( id NodeID ) * peerInfo {
return & peerInfo {
ID : id ,
Addresses : [ ] PeerAddress { } ,
}
// peerInfo contains peer information stored in a peerStore.
type peerInfo struct {
// ID is the peer's node ID.
ID NodeID
// AddressInfo holds one entry per known address of the peer; entries are
// appended by AddAddress and found via LookupAddressInfo.
AddressInfo [ ] * addressInfo
// Persistent marks the peer as persistent; Score() adds
// PeerScorePersistent for such peers.
Persistent bool
// Height is the peer's height as stored via PeerManager.SetHeight.
Height int64
// LastConnected is the UTC time at which the peer was last marked connected
// (by PeerManager.Dialed or PeerManager.Accepted).
LastConnected time . Time
}
// AddAddress records a new address for the peer and reports whether it was
// added. An address that is already known (as determined by
// LookupAddressInfo) is left untouched and false is returned. The address is
// not validated here.
func (p *peerInfo) AddAddress(address PeerAddress) bool {
	known := p.LookupAddressInfo(address) != nil
	if !known {
		p.AddressInfo = append(p.AddressInfo, &addressInfo{Address: address})
	}
	return !known
}
// LookupAddressInfo returns address info for an address, or nil if unknown.
func ( p * peerInfo ) LookupAddressInfo ( address PeerAddress ) * addressInfo {
// We just do a linear search for now.
addressString := address . String ( )
for _ , a := range p . Addresses {
if a . String ( ) == addressString {
return false
for _ , info := range p . AddressInfo {
if info . Address . String ( ) == addressString {
return info
}
}
p . Addresses = append ( p . Addresses , address )
return true
return nil
}
// Score computes the peer's score, where a higher score means a more
// preferred peer. Currently only persistence contributes: persistent peers
// score PeerScorePersistent and all other peers score zero.
func (p *peerInfo) Score() PeerScore {
	if !p.Persistent {
		return 0
	}
	return PeerScorePersistent
}
// addressInfo contains information and statistics about an address.
type addressInfo struct {
// Address is the peer address these statistics belong to.
Address PeerAddress
// LastDialSuccess is the UTC time of the last successful dial, as recorded
// by PeerManager.Dialed.
LastDialSuccess time . Time
// LastDialFailure is the UTC time of the last failed dial, as recorded by
// PeerManager.DialFailed. DialNext compares it against the retry delay.
LastDialFailure time . Time
// DialFailures is incremented by PeerManager.DialFailed and reset to 0 by
// PeerManager.Dialed.
DialFailures uint32 // since last successful dial
}
// ============================================================================