@ -3,7 +3,6 @@ package pex
import (
"fmt"
"reflect"
"sort"
"sync"
"time"
@ -35,16 +34,11 @@ const (
// Seed/Crawler constants
// We want seeds to only advertise good peers. Therefore they should wait at
// least as long as we expect it to take for a peer to become good before
// disconnecting.
// see consensus/reactor.go: blocksToContributeToBecomeGoodPeer
// 10000 blocks assuming 1s blocks ~ 2.7 hours.
defaultSeedDisconnectWaitPeriod = 3 * time . Hour
// minTimeBetweenCrawls is a minimum time between attempts to crawl a peer.
minTimeBetweenCrawls = 2 * time . Minute
defaultCrawlPeerInterval = 2 * time . Minute // don't redial for this. TODO: back-off. what for?
defaultCrawlPeersPeriod = 30 * time . Second // check some peers every this
// check some peers every this
crawlPeerPeriod = 30 * time . Second
maxAttemptsToDial = 16 // ~ 35h in total (last attempt - 18h)
@ -77,6 +71,9 @@ type PEXReactor struct {
seedAddrs [ ] * p2p . NetAddress
attemptsToDial sync . Map // address (string) -> {number of attempts (int), last time dialed (time.Time)}
// seed/crawled mode fields
crawlPeerInfos map [ p2p . ID ] crawlPeerInfo
}
func ( r * PEXReactor ) minReceiveRequestInterval ( ) time . Duration {
@ -90,6 +87,11 @@ type PEXReactorConfig struct {
// Seed/Crawler mode
SeedMode bool
// We want seeds to only advertise good peers. Therefore they should wait at
// least as long as we expect it to take for a peer to become good before
// disconnecting.
SeedDisconnectWaitPeriod time . Duration
// Seeds is a list of addresses reactor may use
// if it can't connect to peers in the addrbook.
Seeds [ ] string
@ -108,6 +110,7 @@ func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor {
ensurePeersPeriod : defaultEnsurePeersPeriod ,
requestsSent : cmn . NewCMap ( ) ,
lastReceivedRequests : cmn . NewCMap ( ) ,
crawlPeerInfos : make ( map [ p2p . ID ] crawlPeerInfo ) ,
}
r . BaseReactor = * p2p . NewBaseReactor ( "PEXReactor" , r )
return r
@ -167,12 +170,18 @@ func (r *PEXReactor) AddPeer(p Peer) {
}
} else {
// inbound peer is its own source
addr := p . SocketAddr ( )
addr , err := p . NodeInfo ( ) . NetAddress ( )
if err != nil {
r . Logger . Error ( "Failed to get peer NetAddress" , "err" , err , "peer" , p )
return
}
// Make it explicit that addr and src are the same for an inbound peer.
src := addr
// add to book. dont RequestAddrs right away because
// we don't trust inbound as much - let ensurePeersRoutine handle it.
err := r . book . AddAddress ( addr , src )
err = r . book . AddAddress ( addr , src )
r . logErrAddrBook ( err )
}
}
@ -309,7 +318,10 @@ func (r *PEXReactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error {
}
r . requestsSent . Delete ( id )
srcAddr := src . SocketAddr ( )
srcAddr , err := src . NodeInfo ( ) . NetAddress ( )
if err != nil {
return err
}
for _ , netAddr := range addrs {
// Validate netAddr. Disconnect from a peer if it sends us invalid data.
if netAddr == nil {
@ -363,9 +375,9 @@ func (r *PEXReactor) ensurePeersRoutine() {
)
// Randomize first round of communication to avoid thundering herd.
// If no potential p eers are present directly start connecting so we guarantee
// swift s etup with the help of configured seeds.
if r . hasPotentialPeers ( ) {
// If no peers are present directly start connecting so we guarantee swift
// setup with the help of configured seeds.
if r . nodeHasSomePeersOrDialingAny ( ) {
time . Sleep ( time . Duration ( jitter ) )
}
@ -493,23 +505,26 @@ func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) {
err := r . Switch . DialPeerWithAddress ( addr , false )
if err != nil {
if _ , ok := err . ( p2p . ErrCurrentlyDialingOrExistingAddress ) ; ok {
return
}
r . Logger . Error ( "Dialing failed" , "addr" , addr , "err" , err , "attempts" , attempts )
// TODO: detect more "bad peer" scenarios
markAddrInBookBasedOnErr ( addr , r . book , err )
if _ , ok := err . ( p2p . ErrSwitchAuthenticationFailure ) ; ok {
r . book . MarkBad ( addr )
r . attemptsToDial . Delete ( addr . DialString ( ) )
} else {
r . book . MarkAttempt ( addr )
// FIXME: if the addr is going to be removed from the addrbook (hard to
// tell at this point), we need to Delete it from attemptsToDial, not
// record another attempt.
// record attempt
r . attemptsToDial . Store ( addr . DialString ( ) , _attemptsToDial { attempts + 1 , time . Now ( ) } )
}
} else {
// cleanup any history
r . attemptsToDial . Delete ( addr . DialString ( ) )
return
}
// cleanup any history
r . attemptsToDial . Delete ( addr . DialString ( ) )
}
// checkSeeds checks that addresses are well formed.
@ -568,101 +583,92 @@ func (r *PEXReactor) AttemptsToDial(addr *p2p.NetAddress) int {
// from peers, except other seed nodes.
func ( r * PEXReactor ) crawlPeersRoutine ( ) {
// Do an initial crawl
r . crawlPeers ( )
r . crawlPeers ( r . book . GetSelection ( ) )
// Fire periodically
ticker := time . NewTicker ( defaultCrawlPeers Period)
ticker := time . NewTicker ( crawlPeer Period)
for {
select {
case <- ticker . C :
r . attemptDisconnects ( )
r . crawlPeers ( )
r . crawlPeers ( r . book . GetSelection ( ) )
r . cleanupCrawlPeerInfos ( )
case <- r . Quit ( ) :
return
}
}
}
// hasPotentialPeers indicates if there is a potential peer to connect to, by
// consulting the Switch as well as the AddrBook .
func ( r * PEXReactor ) hasPotentialPeers ( ) bool {
// nodeHasSomePeersOrDialingAny returns true if the node is connected to some
// peers or dialing them currently .
func ( r * PEXReactor ) nodeHasSomePeersOrDialingAny ( ) bool {
out , in , dial := r . Switch . NumPeers ( )
return out + in + dial > 0 && len ( r . book . ListOfKnownAddresses ( ) ) > 0
return out + in + dial > 0
}
// crawlPeerInfo handles temporary data needed for the
// network crawling performed during seed/crawler mode.
// crawlPeerInfo handles temporary data needed for the network crawling
// performed during seed/crawler mode.
type crawlPeerInfo struct {
// The listening address of a potential peer we learned about
Addr * p2p . NetAddress
// The last time we attempt to reach this address
LastAttempt time . Time
// The last time we successfully reached this address
LastSuccess time . Time
Addr * p2p . NetAddress ` json:"addr" `
// The last time we crawled the peer or attempted to do so.
LastCrawled time . Time ` json:"last_crawled" `
}
// oldestFirst implements sort.Interface for []crawlPeerInfo
// based on the LastAttempt field.
type oldestFirst [ ] crawlPeerInfo
func ( of oldestFirst ) Len ( ) int { return len ( of ) }
func ( of oldestFirst ) Swap ( i , j int ) { of [ i ] , of [ j ] = of [ j ] , of [ i ] }
func ( of oldestFirst ) Less ( i , j int ) bool { return of [ i ] . LastAttempt . Before ( of [ j ] . LastAttempt ) }
// crawlPeers will crawl the network looking for new peer addresses.
func ( r * PEXReactor ) crawlPeers ( addrs [ ] * p2p . NetAddress ) {
now := time . Now ( )
// getPeersToCrawl returns addresses of potential peers that we wish to validate.
// NOTE: The status information is ordered as described above.
func ( r * PEXReactor ) getPeersToCrawl ( ) [ ] crawlPeerInfo {
// TODO: be more selective
addrs := r . book . ListOfKnownAddresses ( )
of := make ( oldestFirst , 0 , len ( addrs ) )
for _ , addr := range addrs {
if len ( addr . ID ( ) ) == 0 {
continue // dont use peers without id
}
of = append ( of , crawlPeerInfo {
Addr : addr . Addr ,
LastAttempt : addr . LastAttempt ,
LastSuccess : addr . LastSuccess ,
} )
}
sort . Sort ( of )
return of
}
// crawlPeers will crawl the network looking for new peer addresses. (once)
func ( r * PEXReactor ) crawlPeers ( ) {
peerInfos := r . getPeersToCrawl ( )
peerInfo , ok := r . crawlPeerInfos [ addr . ID ]
now := time . Now ( )
// Use addresses we know of to reach additional peers
for _ , pi := range peerInfos {
// Do not attempt to connect with peers we recently dialed
if now . Sub ( pi . LastAttempt ) < defaultCrawlPeerInterval {
// Do not attempt to connect with peers we recently crawled.
if ok && now . Sub ( peerInfo . LastCrawled ) < minTimeBetweenCrawls {
continue
}
// Otherwise, attempt to connect with the known address
err := r . Switch . DialPeerWithAddress ( pi . Addr , false )
// Record crawling attempt.
r . crawlPeerInfos [ addr . ID ] = crawlPeerInfo {
Addr : addr ,
LastCrawled : now ,
}
err := r . Switch . DialPeerWithAddress ( addr , false )
if err != nil {
r . book . MarkAttempt ( pi . Addr )
if _ , ok := err . ( p2p . ErrCurrentlyDialingOrExistingAddress ) ; ok {
continue
}
r . Logger . Error ( "Dialing failed" , "addr" , addr , "err" , err )
markAddrInBookBasedOnErr ( addr , r . book , err )
continue
}
// Ask for more addresses
peer := r . Switch . Peers ( ) . Get ( pi . Addr . ID )
peer := r . Switch . Peers ( ) . Get ( a ddr. ID )
if peer != nil {
r . RequestAddrs ( peer )
}
}
}
func ( r * PEXReactor ) cleanupCrawlPeerInfos ( ) {
for id , info := range r . crawlPeerInfos {
// If we did not crawl a peer for 24 hours, it means the peer was removed
// from the addrbook => remove
//
// 10000 addresses / maxGetSelection = 40 cycles to get all addresses in
// the ideal case,
// 40 * crawlPeerPeriod ~ 20 minutes
if time . Since ( info . LastCrawled ) > 24 * time . Hour {
delete ( r . crawlPeerInfos , id )
}
}
}
// attemptDisconnects checks if we've been with each peer long enough to disconnect
func ( r * PEXReactor ) attemptDisconnects ( ) {
for _ , peer := range r . Switch . Peers ( ) . List ( ) {
if peer . Status ( ) . Duration < defaultSeedDisconnectWaitPeriod {
if peer . Status ( ) . Duration < r . config . SeedDisconnectWaitPeriod {
continue
}
if peer . IsPersistent ( ) {
@ -672,6 +678,16 @@ func (r *PEXReactor) attemptDisconnects() {
}
}
func markAddrInBookBasedOnErr ( addr * p2p . NetAddress , book AddrBook , err error ) {
// TODO: detect more "bad peer" scenarios
switch err . ( type ) {
case p2p . ErrSwitchAuthenticationFailure :
book . MarkBad ( addr )
default :
book . MarkAttempt ( addr )
}
}
//-----------------------------------------------------------------------------
// Messages