diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index 8cff2f95b..570832384 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -10,7 +10,6 @@ import ( "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p/conn" "github.com/tendermint/tendermint/libs/log" - tmmath "github.com/tendermint/tendermint/libs/math" "github.com/tendermint/tendermint/libs/service" protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p" "github.com/tendermint/tendermint/types" @@ -95,17 +94,10 @@ type ReactorV2 struct { lastReceivedRequests map[types.NodeID]time.Time // the time when another request will be sent - nextRequestTime time.Time + nextRequestInterval time.Duration - // keep track of how many new peers to existing peers we have received to - // extrapolate the size of the network - newPeers uint32 - totalPeers uint32 - - // discoveryRatio is the inverse ratio of new peers to old peers squared. - // This is multiplied by the minimum duration to calculate how long to wait - // between each request. - discoveryRatio float32 + // the total number of unique peers added + totalPeers int } // NewReactor returns a reference to a new reactor. @@ -159,6 +151,7 @@ func (r *ReactorV2) OnStop() { func (r *ReactorV2) processPexCh() { defer r.pexCh.Close() + r.nextRequestInterval = minReceiveRequestInterval for { select { case <-r.closeCh: @@ -235,6 +228,7 @@ func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error { ) } + var numAdded int for _, pexAddress := range msg.Addresses { // no protocol is prefixed so we assume the default (mconn) peerAddress, err := p2p.ParseNodeAddress( @@ -247,11 +241,11 @@ func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error { logger.Error("failed to add PEX address", "address", peerAddress, "err", err) } if added { - r.newPeers++ + numAdded++ logger.Debug("added PEX address", "address", peerAddress) } - r.totalPeers++ } + r.calculateNextRequestTime(numAdded) // V2 PEX MESSAGES case *protop2p.PexRequestV2: @@ -289,6 +283,7 @@ func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error { ) } + var numAdded int for _, pexAddress := range msg.Addresses { peerAddress, err := p2p.ParseNodeAddress(pexAddress.URL) if err != nil { @@ -299,11 +294,11 @@ func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error { logger.Error("failed to add V2 PEX address", "address", peerAddress, "err", err) } if added { - r.newPeers++ + numAdded++ logger.Debug("added V2 PEX address", "address", peerAddress) } - r.totalPeers++ } + r.calculateNextRequestTime(numAdded) default: return fmt.Errorf("received unknown message: %T", msg) @@ -409,7 +404,7 @@ func (r *ReactorV2) processPeerUpdate(peerUpdate p2p.PeerUpdate) { } func (r *ReactorV2) waitUntilNextRequest() <-chan time.Time { - return time.After(time.Until(r.nextRequestTime)) + return time.After(r.nextRequestInterval) } // sendRequestForPeers pops the first peerID off the list and sends the @@ -421,14 +416,12 @@ func (r *ReactorV2) sendRequestForPeers() { defer r.mtx.Unlock() if len(r.availablePeers) == 0 { // no peers are available - r.Logger.Debug("no available peers to send request to, waiting...") - r.nextRequestTime = time.Now().Add(noAvailablePeersWaitPeriod) - + r.Logger.Debug("no available peers to send a PEX request to (retrying)") return } - var peerID types.NodeID - // use range to get a random peer. + // Select an arbitrary peer from the available set. + var peerID types.NodeID for peerID = range r.availablePeers { break } @@ -449,55 +442,49 @@ func (r *ReactorV2) sendRequestForPeers() { // remove the peer from the abvailable peers list and mark it in the requestsSent map delete(r.availablePeers, peerID) r.requestsSent[peerID] = struct{}{} - - r.calculateNextRequestTime() - r.Logger.Debug("peer request sent", "next_request_time", r.nextRequestTime) } -// calculateNextRequestTime implements something of a proportional controller -// to estimate how often the reactor should be requesting new peer addresses. -// The dependent variable in this calculation is the ratio of new peers to -// all peers that the reactor receives. The interval is thus calculated as the -// inverse squared. In the beginning, all peers should be new peers. -// We expect this ratio to be near 1 and thus the interval to be as short -// as possible. As the node becomes more familiar with the network the ratio of -// new nodes will plummet to a very small number, meaning the interval expands -// to its upper bound. -// CONTRACT: Must use a write lock as nextRequestTime is updated -func (r *ReactorV2) calculateNextRequestTime() { - // check if the peer store is full. If so then there is no need - // to send peer requests too often +// calculateNextRequestTime selects how long we should wait before attempting +// to send out another request for peer addresses. +// +// This implements a simplified proportional control mechanism to poll more +// often when our knowledge of the network is incomplete, and less often as our +// knowledge grows. To estimate our knowledge of the network, we use the +// fraction of "new" peers (addresses we have not previously seen) to the total +// so far observed. When we first join the network, this fraction will be close +// to 1, meaning most new peers are "new" to us, and as we discover more peers, +// the fraction will go toward zero. +// +// The minimum interval will be minReceiveRequestInterval to ensure we will not +// request from any peer more often than we would allow them to do from us. +func (r *ReactorV2) calculateNextRequestTime(added int) { + r.mtx.Lock() + defer r.mtx.Unlock() + + r.totalPeers += added + + // If the peer store is nearly full, wait the maximum interval. if ratio := r.peerManager.PeerRatio(); ratio >= 0.95 { - r.Logger.Debug("peer manager near full ratio, sleeping...", + r.Logger.Debug("Peer manager is nearly full", "sleep_period", fullCapacityInterval, "ratio", ratio) - r.nextRequestTime = time.Now().Add(fullCapacityInterval) + r.nextRequestInterval = fullCapacityInterval return } - // baseTime represents the shortest interval that we can send peer requests - // in. For example if we have 10 peers and we can't send a message to the - // same peer every 500ms, then we can send a request every 50ms. In practice - // we use a safety margin of 2, ergo 100ms - peers := tmmath.MinInt(len(r.availablePeers), 50) - baseTime := minReceiveRequestInterval - if peers > 0 { - baseTime = minReceiveRequestInterval * 2 / time.Duration(peers) + // If there are no available peers to query, poll less aggressively. + if len(r.availablePeers) == 0 { + r.Logger.Debug("No available peers to send a PEX request", + "sleep_period", noAvailablePeersWaitPeriod) + r.nextRequestInterval = noAvailablePeersWaitPeriod + return } - if r.totalPeers > 0 || r.discoveryRatio == 0 { - // find the ratio of new peers. NOTE: We add 1 to both sides to avoid - // divide by zero problems - ratio := float32(r.totalPeers+1) / float32(r.newPeers+1) - // square the ratio in order to get non linear time intervals - // NOTE: The longest possible interval for a network with 100 or more peers - // where a node is connected to 50 of them is 2 minutes. - r.discoveryRatio = ratio * ratio - r.newPeers = 0 - r.totalPeers = 0 - } - // NOTE: As ratio is always >= 1, discovery ratio is >= 1. Therefore we don't need to worry - // about the next request time being less than the minimum time - r.nextRequestTime = time.Now().Add(baseTime * time.Duration(r.discoveryRatio)) + // Reaching here, there are available peers to query and the peer store + // still has space. Estimate our knowledge of the network from the latest + // update and choose a new interval. + base := float64(minReceiveRequestInterval) / float64(len(r.availablePeers)) + multiplier := float64(r.totalPeers+1) / float64(added+1) // +1 to avert zero division + r.nextRequestInterval = time.Duration(base*multiplier*multiplier) + minReceiveRequestInterval } func (r *ReactorV2) markPeerRequest(peer types.NodeID) error { diff --git a/internal/p2p/pex/reactor_test.go b/internal/p2p/pex/reactor_test.go index cb1cf117d..d00260587 100644 --- a/internal/p2p/pex/reactor_test.go +++ b/internal/p2p/pex/reactor_test.go @@ -91,7 +91,7 @@ func TestReactorSendsRequestsTooOften(t *testing.T) { peerErr := <-r.pexErrCh require.Error(t, peerErr.Err) require.Empty(t, r.pexOutCh) - require.Contains(t, peerErr.Err.Error(), "peer sent a request too close after a prior one") + require.Contains(t, peerErr.Err.Error(), "sent PEX request too soon") require.Equal(t, badNode, peerErr.NodeID) }