@ -5,6 +5,7 @@ import (
"fmt"
"math/rand"
"reflect"
"sort"
"time"
"github.com/pkg/errors"
@ -16,10 +17,22 @@ const (
// PexChannel is a channel for PEX messages
PexChannel = byte ( 0x00 )
// period to ensure peers connected
defaultEnsurePeersPeriod = 30 * time . Second
minNumOutboundPeers = 10
maxPexMessageSize = 1048576 // 1MB
maxPexMessageSize = 1048576 // 1MB
// ensure we have enough peers
defaultEnsurePeersPeriod = 30 * time . Second
defaultMinNumOutboundPeers = 10
// Seed/Crawler constants
// TODO:
// We want seeds to only advertise good peers.
// Peers are marked by external mechanisms.
// We need a config value that can be set to be
// on the order of how long it would take before a good
// peer is marked good.
defaultSeedDisconnectWaitPeriod = 2 * time . Minute // disconnect after this
defaultCrawlPeerInterval = 2 * time . Minute // dont redial for this. TODO: back-off
defaultCrawlPeersPeriod = 30 * time . Second // check some peers every this
)
// PEXReactor handles PEX (peer exchange) and ensures that an
@ -45,8 +58,11 @@ type PEXReactor struct {
// PEXReactorConfig holds reactor specific configuration data.
type PEXReactorConfig struct {
// Seeds is a list of addresses reactor may use if it can't connect to peers
// in the addrbook.
// Seed/Crawler mode
SeedMode bool
// Seeds is a list of addresses reactor may use
// if it can't connect to peers in the addrbook.
Seeds [ ] string
}
@ -78,7 +94,13 @@ func (r *PEXReactor) OnStart() error {
return err
}
go r . ensurePeersRoutine ( )
// Check if this node should run
// in seed/crawler mode
if r . config . SeedMode {
go r . crawlPeersRoutine ( )
} else {
go r . ensurePeersRoutine ( )
}
return nil
}
@ -107,7 +129,7 @@ func (r *PEXReactor) AddPeer(p Peer) {
// either via DialPeersAsync or r.Receive.
// Ask it for more peers if we need.
if r . book . NeedMoreAddrs ( ) {
r . RequestPEX ( p )
r . RequestAddrs ( p )
}
} else {
// For inbound peers, the peer is its own source,
@ -137,15 +159,24 @@ func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) {
switch msg := msg . ( type ) {
case * pexRequestMessage :
// We received a request for peers from src.
// Check we're not receiving too many requests
if err := r . receiveRequest ( src ) ; err != nil {
r . Switch . StopPeerForError ( src , err )
return
}
r . SendAddrs ( src , r . book . GetSelection ( ) )
// Seeds disconnect after sending a batch of addrs
if r . config . SeedMode {
// TODO: should we be more selective ?
r . SendAddrs ( src , r . book . GetSelection ( ) )
r . Switch . StopPeerGracefully ( src )
} else {
r . SendAddrs ( src , r . book . GetSelection ( ) )
}
case * pexAddrsMessage :
// We received some peer addresses from src.
if err := r . ReceivePEX ( msg . Addrs , src ) ; err != nil {
// If we asked for addresses, add them to the book
if err := r . ReceiveAddrs ( msg . Addrs , src ) ; err != nil {
r . Switch . StopPeerForError ( src , err )
return
}
@ -180,9 +211,9 @@ func (r *PEXReactor) receiveRequest(src Peer) error {
return nil
}
// RequestPEX asks peer for more addresses if we do not already
// RequestAddrs asks peer for more addresses if we do not already
// have a request out for this peer.
func ( r * PEXReactor ) RequestPEX ( p Peer ) {
func ( r * PEXReactor ) RequestAddrs ( p Peer ) {
id := string ( p . ID ( ) )
if r . requestsSent . Has ( id ) {
return
@ -191,10 +222,10 @@ func (r *PEXReactor) RequestPEX(p Peer) {
p . Send ( PexChannel , struct { PexMessage } { & pexRequestMessage { } } )
}
// ReceivePEX adds the given addrs to the addrbook if theres an open
// ReceiveAddrs adds the given addrs to the addrbook if theres an open
// request for this peer and deletes the open request.
// If there's no open request for the src peer, it returns an error.
func ( r * PEXReactor ) ReceivePEX ( addrs [ ] * NetAddress , src Peer ) error {
func ( r * PEXReactor ) ReceiveAddrs ( addrs [ ] * NetAddress , src Peer ) error {
id := string ( src . ID ( ) )
if ! r . requestsSent . Has ( id ) {
@ -247,19 +278,12 @@ func (r *PEXReactor) ensurePeersRoutine() {
// ensurePeers ensures that sufficient peers are connected. (once)
//
// Old bucket / New bucket are arbitrary categories to denote whether an
// address is vetted or not, and this needs to be determined over time via a
// heuristic that we haven't perfected yet, or, perhaps is manually edited by
// the node operator. It should not be used to compute what addresses are
// already connected or not.
//
// TODO Basically, we need to work harder on our good-peer/bad-peer marking.
// What we're currently doing in terms of marking good/bad peers is just a
// placeholder. It should not be the case that an address becomes old/vetted
// upon a single successful connection.
func ( r * PEXReactor ) ensurePeers ( ) {
numOutPeers , numInPeers , numDialing := r . Switch . NumPeers ( )
numToDial := m inNumOutboundPeers - ( numOutPeers + numDialing )
numToDial := defaultMinNumOutboundPeers - ( numOutPeers + numDialing )
r . Logger . Info ( "Ensure peers" , "numOutPeers" , numOutPeers , "numDialing" , numDialing , "numToDial" , numToDial )
if numToDial <= 0 {
return
@ -308,14 +332,14 @@ func (r *PEXReactor) ensurePeers() {
if peersCount > 0 {
peer := peers [ rand . Int ( ) % peersCount ] // nolint: gas
r . Logger . Info ( "We need more addresses. Sending pexRequest to random peer" , "peer" , peer )
r . RequestPEX ( peer )
r . RequestAddrs ( peer )
}
}
// If we are not connected to nor dialing anybody, fallback to dialing a seed.
if numOutPeers + numInPeers + numDialing + len ( toDial ) == 0 {
r . Logger . Info ( "No addresses to dial nor connected peers. Falling back to seeds" )
r . dialSeed ( )
r . dialSeeds ( )
}
}
@ -335,7 +359,7 @@ func (r *PEXReactor) checkSeeds() error {
}
// randomly dial seeds until we connect to one or exhaust them
func ( r * PEXReactor ) dialSeed ( ) {
func ( r * PEXReactor ) dialSeeds ( ) {
lSeeds := len ( r . config . Seeds )
if lSeeds == 0 {
return
@ -357,6 +381,116 @@ func (r *PEXReactor) dialSeed() {
r . Switch . Logger . Error ( "Couldn't connect to any seeds" )
}
//----------------------------------------------------------
// Explores the network searching for more peers. (continuous)
// Seed/Crawler Mode causes this node to quickly disconnect
// from peers, except other seed nodes.
func ( r * PEXReactor ) crawlPeersRoutine ( ) {
// Do an initial crawl
r . crawlPeers ( )
// Fire periodically
ticker := time . NewTicker ( defaultCrawlPeersPeriod )
for {
select {
case <- ticker . C :
r . attemptDisconnects ( )
r . crawlPeers ( )
case <- r . Quit :
return
}
}
}
// crawlPeerInfo handles temporary data needed for the
// network crawling performed during seed/crawler mode.
type crawlPeerInfo struct {
// The listening address of a potential peer we learned about
Addr * NetAddress
// The last time we attempt to reach this address
LastAttempt time . Time
// The last time we successfully reached this address
LastSuccess time . Time
}
// oldestFirst implements sort.Interface for []crawlPeerInfo
// based on the LastAttempt field.
type oldestFirst [ ] crawlPeerInfo
func ( of oldestFirst ) Len ( ) int { return len ( of ) }
func ( of oldestFirst ) Swap ( i , j int ) { of [ i ] , of [ j ] = of [ j ] , of [ i ] }
func ( of oldestFirst ) Less ( i , j int ) bool { return of [ i ] . LastAttempt . Before ( of [ j ] . LastAttempt ) }
// getPeersToCrawl returns addresses of potential peers that we wish to validate.
// NOTE: The status information is ordered as described above.
func ( r * PEXReactor ) getPeersToCrawl ( ) [ ] crawlPeerInfo {
var of oldestFirst
// TODO: be more selective
addrs := r . book . ListOfKnownAddresses ( )
for _ , addr := range addrs {
if len ( addr . ID ( ) ) == 0 {
continue // dont use peers without id
}
of = append ( of , crawlPeerInfo {
Addr : addr . Addr ,
LastAttempt : addr . LastAttempt ,
LastSuccess : addr . LastSuccess ,
} )
}
sort . Sort ( of )
return of
}
// crawlPeers will crawl the network looking for new peer addresses. (once)
func ( r * PEXReactor ) crawlPeers ( ) {
peerInfos := r . getPeersToCrawl ( )
now := time . Now ( )
// Use addresses we know of to reach additional peers
for _ , pi := range peerInfos {
// Do not attempt to connect with peers we recently dialed
if now . Sub ( pi . LastAttempt ) < defaultCrawlPeerInterval {
continue
}
// Otherwise, attempt to connect with the known address
_ , err := r . Switch . DialPeerWithAddress ( pi . Addr , false )
if err != nil {
r . book . MarkAttempt ( pi . Addr )
continue
}
}
// Crawl the connected peers asking for more addresses
for _ , pi := range peerInfos {
// We will wait a minimum period of time before crawling peers again
if now . Sub ( pi . LastAttempt ) >= defaultCrawlPeerInterval {
peer := r . Switch . Peers ( ) . Get ( pi . Addr . ID )
if peer != nil {
r . RequestAddrs ( peer )
}
}
}
}
// attemptDisconnects checks if we've been with each peer long enough to disconnect
func ( r * PEXReactor ) attemptDisconnects ( ) {
for _ , peer := range r . Switch . Peers ( ) . List ( ) {
status := peer . Status ( )
if status . Duration < defaultSeedDisconnectWaitPeriod {
continue
}
if peer . IsPersistent ( ) {
continue
}
r . Switch . StopPeerGracefully ( peer )
}
}
//-----------------------------------------------------------------------------
// Messages