@ -10,7 +10,6 @@ import (
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/conn"
"github.com/tendermint/tendermint/libs/log"
tmmath "github.com/tendermint/tendermint/libs/math"
"github.com/tendermint/tendermint/libs/service"
protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
"github.com/tendermint/tendermint/types"
@ -42,7 +41,7 @@ const (
minReceiveRequestInterval = 100 * time . Millisecond
// the maximum amount of addresses that can be included in a response
maxAddresses uint16 = 100
maxAddresses = 100
// How long to wait when there are no peers available before trying again
noAvailablePeersWaitPeriod = 1 * time . Second
@ -100,15 +99,8 @@ type Reactor struct {
// minReceiveRequestInterval).
lastReceivedRequests map [ types . NodeID ] time . Time
// keep track of how many new peers to existing peers we have received to
// extrapolate the size of the network
newPeers uint32
totalPeers uint32
// discoveryRatio is the inverse ratio of new peers to old peers squared.
// This is multiplied by the minimum duration to calculate how long to wait
// between each request.
discoveryRatio float32
// the total number of unique peers added
totalPeers int
}
// NewReactor returns a reference to a new reactor.
@ -156,16 +148,6 @@ func (r *Reactor) OnStop() {}
// processPexCh implements a blocking event loop where we listen for p2p
// Envelope messages from the pexCh.
func ( r * Reactor ) processPexCh ( ctx context . Context ) {
timer := time . NewTimer ( 0 )
defer timer . Stop ( )
r . mtx . Lock ( )
var (
duration = r . calculateNextRequestTime ( )
err error
)
r . mtx . Unlock ( )
incoming := make ( chan * p2p . Envelope )
go func ( ) {
defer close ( incoming )
@ -179,36 +161,51 @@ func (r *Reactor) processPexCh(ctx context.Context) {
}
} ( )
// Initially, we will request peers quickly to bootstrap. This duration
// will be adjusted upward as knowledge of the network grows.
var nextPeerRequest = minReceiveRequestInterval
timer := time . NewTimer ( 0 )
defer timer . Stop ( )
for {
timer . Reset ( duration )
timer . Reset ( nextPeerRequest )
select {
case <- ctx . Done ( ) :
return
// outbound requests for new peers
case <- timer . C :
duration , err = r . sendRequestForPeers ( ctx )
if err != nil {
// Send a request for more peer addresses.
if err := r . sendRequestForPeers ( ctx ) ; err != nil {
return
// TODO(creachadair): Do we really want to stop processing the PEX
// channel just because of an error here?
}
// inbound requests for new peers or responses to requests sent by this
// reactor
// Note we do not update the poll timer upon making a request, only
// when we receive an update that updates our priors.
case envelope , ok := <- incoming :
if ! ok {
return
return // channel closed
}
duration , err = r . handleMessage ( ctx , r . pexCh . ID , envelope )
// A request from another peer, or a response to one of our requests.
dur , err := r . handleMessage ( ctx , r . pexCh . ID , envelope )
if err != nil {
r . logger . Error ( "failed to process message" , "ch_id" , r . pexCh . ID , "envelope" , envelope , "err" , err )
r . logger . Error ( "failed to process message" ,
"ch_id" , r . pexCh . ID , "envelope" , envelope , "err" , err )
if serr := r . pexCh . SendError ( ctx , p2p . PeerError {
NodeID : envelope . From ,
Err : err ,
} ) ; serr != nil {
return
}
} else if dur != 0 {
// We got a useful result; update the poll timer.
nextPeerRequest = dur
}
}
}
}
@ -228,19 +225,20 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
}
// handlePexMessage handles envelopes sent from peers on the PexChannel.
// If an update was received, a new polling interval is returned; otherwise the
// duration is 0.
func ( r * Reactor ) handlePexMessage ( ctx context . Context , envelope * p2p . Envelope ) ( time . Duration , error ) {
logger := r . logger . With ( "peer" , envelope . From )
switch msg := envelope . Message . ( type ) {
case * protop2p . PexRequest :
// check if the peer hasn't sent a prior request too close to this one
// in time
// Verify that this peer hasn't sent us another request too recently.
if err := r . markPeerRequest ( envelope . From ) ; err != nil {
return time . Minute , err
return 0 , err
}
// request peers from the peer manager and parse the NodeAddresses into
// URL strings
// Fetch peers from the peer manager, convert NodeAddresses into URL
// strings, and send them back to the caller.
nodeAddresses := r . peerManager . Advertise ( envelope . From , maxAddresses )
pexAddresses := make ( [ ] protop2p . PexAddress , len ( nodeAddresses ) )
for idx , addr := range nodeAddresses {
@ -248,28 +246,24 @@ func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope)
URL : addr . String ( ) ,
}
}
if err := r . pexCh . Send ( ctx , p2p . Envelope {
return 0 , r . pexCh . Send ( ctx , p2p . Envelope {
To : envelope . From ,
Message : & protop2p . PexResponse { Addresses : pexAddresses } ,
} ) ; err != nil {
return 0 , err
}
} )
return time . Second , nil
case * protop2p . PexResponse :
// check if the response matches a request that was made to that peer
// Verify that this response corresponds to one of our pending requests.
if err := r . markPeerResponse ( envelope . From ) ; err != nil {
return time . Minute , err
return 0 , err
}
// check the size of the response
if len ( msg . Addresses ) > int ( maxAddresses ) {
return 10 * time . Minute , fmt . Errorf ( "peer sent too many addresses (max: %d, got: %d)" ,
maxAddresses ,
len ( msg . Addresses ) ,
)
// Verify that the response does not exceed the safety limit.
if len ( msg . Addresses ) > maxAddresses {
return 0 , fmt . Errorf ( "peer sent too many addresses (%d > maxiumum %d)" ,
len ( msg . Addresses ) , maxAddresses )
}
var numAdded int
for _ , pexAddress := range msg . Addresses {
peerAddress , err := p2p . ParseNodeAddress ( pexAddress . URL )
if err != nil {
@ -278,24 +272,24 @@ func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope)
added , err := r . peerManager . Add ( peerAddress )
if err != nil {
logger . Error ( "failed to add PEX address" , "address" , peerAddress , "err" , err )
continue
}
if added {
r . newPeers ++
numAdded ++
logger . Debug ( "added PEX address" , "address" , peerAddress )
}
r . totalPeers ++
}
return 10 * time . Minute , nil
return r . calculateNextRequestTime ( numAdded ) , nil
default :
return time . Second , fmt . Errorf ( "received unknown message: %T" , msg )
return 0 , fmt . Errorf ( "received unknown message: %T" , msg )
}
}
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
func ( r * Reactor ) handleMessage ( ctx context . Context , chID p2p . ChannelID , envelope * p2p . Envelope ) ( duration time . Duration , err error ) {
// handleMessage handles an Envelope sent from a peer on the specified Channel.
// This method will convert a panic in message handling as an error.
func ( r * Reactor ) handleMessage ( ctx context . Context , chID p2p . ChannelID , envelope * p2p . Envelope ) ( _ time . Duration , err error ) {
defer func ( ) {
if e := recover ( ) ; e != nil {
err = fmt . Errorf ( "panic in processing message: %v" , e )
@ -309,14 +303,10 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
r . logger . Debug ( "received PEX message" , "peer" , envelope . From )
switch chID {
case p2p . ChannelID ( PexChannel ) :
duration , err = r . handlePexMessage ( ctx , envelope )
default :
err = fmt . Errorf ( "unknown channel ID (%d) for envelope (%v)" , chID , envelope )
if chID == p2p . ChannelID ( PexChannel ) {
return r . handlePexMessage ( ctx , envelope )
}
return
return 0 , fmt . Errorf ( "unknown channel ID (%d) for envelope (%v)" , chID , envelope )
}
// processPeerUpdate processes a PeerUpdate. For added peers, PeerStatusUp, we
@ -338,95 +328,87 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
}
}
// sendRequestForPeers pops the first peerID off the list and sends the
// peer a request for more peer addresses. The function then moves the
// peer into the requestsSent bucket and calculates when the next request
// time should be
func ( r * Reactor ) sendRequestForPeers ( ctx context . Context ) ( time . Duration , error ) {
// sendRequestForPeers chooses a peer from the set of available peers and sends
// that peer a request for more peer addresses. The chosen peer is moved into
// the requestsSent bucket so that we will not attempt to contact them again
// until they've replied or updated.
func ( r * Reactor ) sendRequestForPeers ( ctx context . Context ) error {
r . mtx . Lock ( )
defer r . mtx . Unlock ( )
if len ( r . availablePeers ) == 0 {
// no peers are available
r . logger . Debug ( "no available peers to send request to, waiting... " )
return noAvailablePeersWaitPeriod , nil
r . logger . Debug ( "no available peers to send a PEX request to (retrying) " )
return nil
}
var peerID types . NodeID
// use range to get a random peer.
// Select an arbitrary peer from the available set.
var peerID types . NodeID
for peerID = range r . availablePeers {
break
}
// send out the pex request
if err := r . pexCh . Send ( ctx , p2p . Envelope {
To : peerID ,
Message : & protop2p . PexRequest { } ,
} ) ; err != nil {
return 0 , err
return err
}
// remove the peer from the abvailable peers list and mark it in the requestsSent map
// Move the peer from available to pending.
delete ( r . availablePeers , peerID )
r . requestsSent [ peerID ] = struct { } { }
dur := r . calculateNextRequestTime ( )
r . logger . Debug ( "peer request sent" , "next_request_time" , dur )
return dur , nil
return nil
}
// calculateNextRequestTime implements something of a proportional controller
// to estimate how often the reactor should be requesting new peer addresses.
// The dependent variable in this calculation is the ratio of new peers to
// all peers that the reactor receives. The interval is thus calculated as the
// inverse squared. In the beginning, all peers should be new peers.
// We expect this ratio to be near 1 and thus the interval to be as short
// as possible. As the node becomes more familiar with the network the ratio of
// new nodes will plummet to a very small number, meaning the interval expands
// to its upper bound.
// calculateNextRequestTime selects how long we should wait before attempting
// to send out another request for peer addresses.
//
// This implements a simplified proportional control mechanism to poll more
// often when our knowledge of the network is incomplete, and less often as our
// knowledge grows. To estimate our knowledge of the network, we use the
// fraction of "new" peers (addresses we have not previously seen) to the total
// so far observed. When we first join the network, this fraction will be close
// to 1, meaning most new peers are "new" to us, and as we discover more peers,
// the fraction will go toward zero.
//
// CONTRACT: The caller must hold r.mtx exclusively when calling this method.
func ( r * Reactor ) calculateNextRequestTime ( ) time . Duration {
// check if the peer store is full. If so then there is no need
// to send peer requests too often
// The minimum interval will be minReceiveRequestInterval to ensure we will not
// request from any peer more often than we would allow them to do from us.
func ( r * Reactor ) calculateNextRequestTime ( added int ) time . Duration {
r . mtx . Lock ( )
defer r . mtx . Unlock ( )
r . totalPeers += added
// If the peer store is nearly full, wait the maximum interval.
if ratio := r . peerManager . PeerRatio ( ) ; ratio >= 0.95 {
r . logger . Debug ( "peer manager near full ratio, sleeping..." ,
r . logger . Debug ( "Peer manager is nearly full " ,
"sleep_period" , fullCapacityInterval , "ratio" , ratio )
return fullCapacityInterval
}
// baseTime represents the shortest interval that we can send peer requests
// in. For example if we have 10 peers and we can't send a message to the
// same peer every 500ms, then we can send a request every 50ms. In practice
// we use a safety margin of 2, ergo 100ms
peers := tmmath . MinInt ( len ( r . availablePeers ) , 50 )
baseTime := minReceiveRequestInterval
if peers > 0 {
baseTime = minReceiveRequestInterval * 2 / time . Duration ( peers )
// If there are no available peers to query, poll less aggressively.
if len ( r . availablePeers ) == 0 {
r . logger . Debug ( "No available peers to send a PEX request" ,
"sleep_period" , noAvailablePeersWaitPeriod )
return noAvailablePeersWaitPeriod
}
if r . totalPeers > 0 || r . discoveryRatio == 0 {
// find the ratio of new peers. NOTE: We add 1 to both sides to avoid
// divide by zero problems
ratio := float32 ( r . totalPeers + 1 ) / float32 ( r . newPeers + 1 )
// square the ratio in order to get non linear time intervals
// NOTE: The longest possible interval for a network with 100 or more peers
// where a node is connected to 50 of them is 2 minutes.
r . discoveryRatio = ratio * ratio
r . newPeers = 0
r . totalPeers = 0
}
// NOTE: As ratio is always >= 1, discovery ratio is >= 1. Therefore we don't need to worry
// about the next request time being less than the minimum time
return baseTime * time . Duration ( r . discoveryRatio )
// Reaching here, there are available peers to query and the peer store
// still has space. Estimate our knowledge of the network from the latest
// update and choose a new interval.
base := float64 ( minReceiveRequestInterval ) / float64 ( len ( r . availablePeers ) )
multiplier := float64 ( r . totalPeers + 1 ) / float64 ( added + 1 ) // +1 to avert zero division
return time . Duration ( base * multiplier * multiplier ) + minReceiveRequestInterval
}
func ( r * Reactor ) markPeerRequest ( peer types . NodeID ) error {
r . mtx . Lock ( )
defer r . mtx . Unlock ( )
if lastRequestTime , ok := r . lastReceivedRequests [ peer ] ; ok {
if time . Now ( ) . Before ( lastRequestTime . Add ( minReceiveRequestInterval ) ) {
return fmt . Errorf ( "peer sent a request too close after a prior one. Minimum interval: %v " ,
minReceiveRequestInterval )
if d := time . Since ( lastRequestTime ) ; d < minReceiveRequestInterval {
return fmt . Errorf ( "peer %v sent PEX request too soon (%v < minimum %v)" ,
peer , d , minReceiveRequestInterval )
}
}
r . lastReceivedRequests [ peer ] = time . Now ( )