@ -10,7 +10,6 @@ import (
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/conn"
"github.com/tendermint/tendermint/libs/log"
tmmath "github.com/tendermint/tendermint/libs/math"
"github.com/tendermint/tendermint/libs/service"
protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
"github.com/tendermint/tendermint/types"
@ -95,17 +94,10 @@ type ReactorV2 struct {
lastReceivedRequests map [ types . NodeID ] time . Time
// the time when another request will be sent
nextRequestTime time . Time
nextRequestInterval time . Duration
// keep track of how many new peers to existing peers we have received to
// extrapolate the size of the network
newPeers uint32
totalPeers uint32
// discoveryRatio is the inverse ratio of new peers to old peers squared.
// This is multiplied by the minimum duration to calculate how long to wait
// between each request.
discoveryRatio float32
// the total number of unique peers added
totalPeers int
}
// NewReactor returns a reference to a new reactor.
@ -159,6 +151,7 @@ func (r *ReactorV2) OnStop() {
func ( r * ReactorV2 ) processPexCh ( ) {
defer r . pexCh . Close ( )
r . nextRequestInterval = minReceiveRequestInterval
for {
select {
case <- r . closeCh :
@ -235,6 +228,7 @@ func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error {
)
}
var numAdded int
for _ , pexAddress := range msg . Addresses {
// no protocol is prefixed so we assume the default (mconn)
peerAddress , err := p2p . ParseNodeAddress (
@ -247,11 +241,11 @@ func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error {
logger . Error ( "failed to add PEX address" , "address" , peerAddress , "err" , err )
}
if added {
r . newPeers ++
numAdded ++
logger . Debug ( "added PEX address" , "address" , peerAddress )
}
r . totalPeers ++
}
r . calculateNextRequestTime ( numAdded )
// V2 PEX MESSAGES
case * protop2p . PexRequestV2 :
@ -289,6 +283,7 @@ func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error {
)
}
var numAdded int
for _ , pexAddress := range msg . Addresses {
peerAddress , err := p2p . ParseNodeAddress ( pexAddress . URL )
if err != nil {
@ -299,11 +294,11 @@ func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error {
logger . Error ( "failed to add V2 PEX address" , "address" , peerAddress , "err" , err )
}
if added {
r . newPeers ++
numAdded ++
logger . Debug ( "added V2 PEX address" , "address" , peerAddress )
}
r . totalPeers ++
}
r . calculateNextRequestTime ( numAdded )
default :
return fmt . Errorf ( "received unknown message: %T" , msg )
@ -409,7 +404,7 @@ func (r *ReactorV2) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
}
// waitUntilNextRequest returns a channel that fires after the reactor's
// current request interval has elapsed, signaling that it is time to send
// the next PEX request. The interval itself is chosen by
// calculateNextRequestTime.
func (r *ReactorV2) waitUntilNextRequest() <-chan time.Time {
	return time.After(r.nextRequestInterval)
}
// sendRequestForPeers pops the first peerID off the list and sends the
@ -421,14 +416,12 @@ func (r *ReactorV2) sendRequestForPeers() {
defer r . mtx . Unlock ( )
if len ( r . availablePeers ) == 0 {
// no peers are available
r . Logger . Debug ( "no available peers to send request to, waiting..." )
r . nextRequestTime = time . Now ( ) . Add ( noAvailablePeersWaitPeriod )
r . Logger . Debug ( "no available peers to send a PEX request to (retrying)" )
return
}
var peerID types . NodeID
// use range to get a random peer.
// Select an arbitrary peer from the available set.
var peerID types . NodeID
for peerID = range r . availablePeers {
break
}
@ -449,55 +442,49 @@ func (r *ReactorV2) sendRequestForPeers() {
// remove the peer from the available peers list and mark it in the requestsSent map
delete ( r . availablePeers , peerID )
r . requestsSent [ peerID ] = struct { } { }
r . calculateNextRequestTime ( )
r . Logger . Debug ( "peer request sent" , "next_request_time" , r . nextRequestTime )
}
// calculateNextRequestTime implements something of a proportional controller
// to estimate how often the reactor should be requesting new peer addresses.
// The dependent variable in this calculation is the ratio of new peers to
// all peers that the reactor receives. The interval is thus calculated as the
// inverse squared. In the beginning, all peers should be new peers.
// We expect this ratio to be near 1 and thus the interval to be as short
// as possible. As the node becomes more familiar with the network the ratio of
// new nodes will plummet to a very small number, meaning the interval expands
// to its upper bound.
// CONTRACT: Must use a write lock as nextRequestTime is updated
func ( r * ReactorV2 ) calculateNextRequestTime ( ) {
// check if the peer store is full. If so then there is no need
// to send peer requests too often
// calculateNextRequestTime selects how long we should wait before attempting
// to send out another request for peer addresses.
//
// This implements a simplified proportional control mechanism to poll more
// often when our knowledge of the network is incomplete, and less often as our
// knowledge grows. To estimate our knowledge of the network, we use the
// fraction of "new" peers (addresses we have not previously seen) to the total
// so far observed. When we first join the network, this fraction will be close
// to 1, meaning most new peers are "new" to us, and as we discover more peers,
// the fraction will go toward zero.
//
// The minimum interval will be minReceiveRequestInterval to ensure we will not
// request from any peer more often than we would allow them to do from us.
func ( r * ReactorV2 ) calculateNextRequestTime ( added int ) {
r . mtx . Lock ( )
defer r . mtx . Unlock ( )
r . totalPeers += added
// If the peer store is nearly full, wait the maximum interval.
if ratio := r . peerManager . PeerRatio ( ) ; ratio >= 0.95 {
r . Logger . Debug ( "peer manager near full ratio, sleeping..." ,
r . Logger . Debug ( "Peer manager is nearly full " ,
"sleep_period" , fullCapacityInterval , "ratio" , ratio )
r . nextRequestTime = time . Now ( ) . Add ( fullCapacityInterval )
r . nextRequestInterval = fullCapacityInterval
return
}
// baseTime represents the shortest interval that we can send peer requests
// in. For example if we have 10 peers and we can't send a message to the
// same peer every 500ms, then we can send a request every 50ms. In practice
// we use a safety margin of 2, ergo 100ms
peers := tmmath . MinInt ( len ( r . availablePeers ) , 50 )
baseTime := minReceiveRequestInterval
if peers > 0 {
baseTime = minReceiveRequestInterval * 2 / time . Duration ( peers )
// If there are no available peers to query, poll less aggressively.
if len ( r . availablePeers ) == 0 {
r . Logger . Debug ( "No available peers to send a PEX request" ,
"sleep_period" , noAvailablePeersWaitPeriod )
r . nextRequestInterval = noAvailablePeersWaitPeriod
return
}
if r . totalPeers > 0 || r . discoveryRatio == 0 {
// find the ratio of new peers. NOTE: We add 1 to both sides to avoid
// divide by zero problems
ratio := float32 ( r . totalPeers + 1 ) / float32 ( r . newPeers + 1 )
// square the ratio in order to get non linear time intervals
// NOTE: The longest possible interval for a network with 100 or more peers
// where a node is connected to 50 of them is 2 minutes.
r . discoveryRatio = ratio * ratio
r . newPeers = 0
r . totalPeers = 0
}
// NOTE: As ratio is always >= 1, discovery ratio is >= 1. Therefore we don't need to worry
// about the next request time being less than the minimum time
r . nextRequestTime = time . Now ( ) . Add ( baseTime * time . Duration ( r . discoveryRatio ) )
// Reaching here, there are available peers to query and the peer store
// still has space. Estimate our knowledge of the network from the latest
// update and choose a new interval.
base := float64 ( minReceiveRequestInterval ) / float64 ( len ( r . availablePeers ) )
multiplier := float64 ( r . totalPeers + 1 ) / float64 ( added + 1 ) // +1 to avert zero division
r . nextRequestInterval = time . Duration ( base * multiplier * multiplier ) + minReceiveRequestInterval
}
func ( r * ReactorV2 ) markPeerRequest ( peer types . NodeID ) error {