From 153cd6d298786ccc87db6b38ad573988061b5c55 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Wed, 9 Mar 2022 10:22:00 -0800 Subject: [PATCH] p2p: update polling interval calculation for PEX requests The PEX reactor has a simple feedback control mechanism to decide how often to poll peers for peer address updates. The idea is to poll more frequently when knowledge of the network is less, and decrease frequency as knowledge grows. This change solves two problems: 1. It is possible in some cases we may poll a peer "too often" and get dropped by that peer for spamming. 2. The first successful peer update with any content resets the polling timer to a very long time (10m), meaning if we are unlucky in getting an incomplete reply while the network is small, we may not try again for a very long time. This may contribute to difficulties bootstrapping sync. The main change here is to only update the interval when new information is added to the system, and not (as before) whenever a request is sent out to a peer. The rate computation is essentially the same as before, although the code has been a bit simplified, and I consolidated some of the error handling so that we don't have to check in multiple places for the same conditions. Related changes: - Improve error diagnostics for too-soon and overflow conditions. - Clean up state handling in the poll interval computation. - Pin the minimum interval avert a chance of PEX spamming a peer. --- internal/p2p/pex/reactor.go | 218 ++++++++++++++----------------- internal/p2p/pex/reactor_test.go | 4 +- 2 files changed, 102 insertions(+), 120 deletions(-) diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index 0c256a4f3..964d85bb9 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -10,7 +10,6 @@ import ( "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p/conn" "github.com/tendermint/tendermint/libs/log" - tmmath "github.com/tendermint/tendermint/libs/math" "github.com/tendermint/tendermint/libs/service" protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p" "github.com/tendermint/tendermint/types" @@ -42,7 +41,7 @@ const ( minReceiveRequestInterval = 100 * time.Millisecond // the maximum amount of addresses that can be included in a response - maxAddresses uint16 = 100 + maxAddresses = 100 // How long to wait when there are no peers available before trying again noAvailablePeersWaitPeriod = 1 * time.Second @@ -100,15 +99,8 @@ type Reactor struct { // minReceiveRequestInterval). lastReceivedRequests map[types.NodeID]time.Time - // keep track of how many new peers to existing peers we have received to - // extrapolate the size of the network - newPeers uint32 - totalPeers uint32 - - // discoveryRatio is the inverse ratio of new peers to old peers squared. - // This is multiplied by the minimum duration to calculate how long to wait - // between each request. - discoveryRatio float32 + // the total number of unique peers added + totalPeers int } // NewReactor returns a reference to a new reactor. @@ -156,16 +148,6 @@ func (r *Reactor) OnStop() {} // processPexCh implements a blocking event loop where we listen for p2p // Envelope messages from the pexCh. func (r *Reactor) processPexCh(ctx context.Context) { - timer := time.NewTimer(0) - defer timer.Stop() - - r.mtx.Lock() - var ( - duration = r.calculateNextRequestTime() - err error - ) - r.mtx.Unlock() - incoming := make(chan *p2p.Envelope) go func() { defer close(incoming) @@ -179,36 +161,51 @@ func (r *Reactor) processPexCh(ctx context.Context) { } }() + // Initially, we will request peers quickly to bootstrap. This duration + // will be adjusted upward as knowledge of the network grows. + var nextPeerRequest = minReceiveRequestInterval + + timer := time.NewTimer(0) + defer timer.Stop() + for { - timer.Reset(duration) + timer.Reset(nextPeerRequest) select { case <-ctx.Done(): return - // outbound requests for new peers case <-timer.C: - duration, err = r.sendRequestForPeers(ctx) - if err != nil { + // Send a request for more peer addresses. + if err := r.sendRequestForPeers(ctx); err != nil { return + // TODO(creachadair): Do we really want to stop processing the PEX + // channel just because of an error here? } - // inbound requests for new peers or responses to requests sent by this - // reactor + + // Note we do not update the poll timer upon making a request, only + // when we receive an update that updates our priors. + case envelope, ok := <-incoming: if !ok { - return + return // channel closed } - duration, err = r.handleMessage(ctx, r.pexCh.ID, envelope) + + // A request from another peers, or a response to one of our requests. + dur, err := r.handleMessage(ctx, r.pexCh.ID, envelope) if err != nil { - r.logger.Error("failed to process message", "ch_id", r.pexCh.ID, "envelope", envelope, "err", err) + r.logger.Error("failed to process message", + "ch_id", r.pexCh.ID, "envelope", envelope, "err", err) if serr := r.pexCh.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, }); serr != nil { return } + } else if dur != 0 { + // We got a useful result; update the poll timer. + nextPeerRequest = dur } - } } } @@ -228,19 +225,20 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) { } // handlePexMessage handles envelopes sent from peers on the PexChannel. +// If an update was received, a new polling interval is returned; otherwise the +// duration is 0. func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope) (time.Duration, error) { logger := r.logger.With("peer", envelope.From) switch msg := envelope.Message.(type) { case *protop2p.PexRequest: - // check if the peer hasn't sent a prior request too close to this one - // in time + // Verify that this peer hasn't sent us another request too recently. if err := r.markPeerRequest(envelope.From); err != nil { - return time.Minute, err + return 0, err } - // request peers from the peer manager and parse the NodeAddresses into - // URL strings + // Fetch peers from the peer manager, convert NodeAddresses into URL + // strings, and send them back to the caller. nodeAddresses := r.peerManager.Advertise(envelope.From, maxAddresses) pexAddresses := make([]protop2p.PexAddress, len(nodeAddresses)) for idx, addr := range nodeAddresses { @@ -248,28 +246,24 @@ func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope) URL: addr.String(), } } - if err := r.pexCh.Send(ctx, p2p.Envelope{ + return 0, r.pexCh.Send(ctx, p2p.Envelope{ To: envelope.From, Message: &protop2p.PexResponse{Addresses: pexAddresses}, - }); err != nil { - return 0, err - } + }) - return time.Second, nil case *protop2p.PexResponse: - // check if the response matches a request that was made to that peer + // Verify that this response corresponds to one of our pending requests. if err := r.markPeerResponse(envelope.From); err != nil { - return time.Minute, err + return 0, err } - // check the size of the response - if len(msg.Addresses) > int(maxAddresses) { - return 10 * time.Minute, fmt.Errorf("peer sent too many addresses (max: %d, got: %d)", - maxAddresses, - len(msg.Addresses), - ) + // Verify that the response does not exceed the safety limit. + if len(msg.Addresses) > maxAddresses { + return 0, fmt.Errorf("Peer sent too many addresses (%d > maxiumum %d)", + len(msg.Addresses), maxAddresses) } + var numAdded int for _, pexAddress := range msg.Addresses { peerAddress, err := p2p.ParseNodeAddress(pexAddress.URL) if err != nil { @@ -278,24 +272,24 @@ func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope) added, err := r.peerManager.Add(peerAddress) if err != nil { logger.Error("failed to add PEX address", "address", peerAddress, "err", err) + continue } if added { - r.newPeers++ + numAdded++ logger.Debug("added PEX address", "address", peerAddress) } - r.totalPeers++ } - return 10 * time.Minute, nil + return r.calculateNextRequestTime(numAdded), nil + default: - return time.Second, fmt.Errorf("received unknown message: %T", msg) + return 0, fmt.Errorf("received unknown message: %T", msg) } } -// handleMessage handles an Envelope sent from a peer on a specific p2p Channel. -// It will handle errors and any possible panics gracefully. A caller can handle -// any error returned by sending a PeerError on the respective channel. -func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (duration time.Duration, err error) { +// handleMessage handles an Envelope sent from a peer on the specified Channel. +// This method will convert a panic in message handling as an error. +func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (_ time.Duration, err error) { defer func() { if e := recover(); e != nil { err = fmt.Errorf("panic in processing message: %v", e) @@ -309,14 +303,10 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop r.logger.Debug("received PEX message", "peer", envelope.From) - switch chID { - case p2p.ChannelID(PexChannel): - duration, err = r.handlePexMessage(ctx, envelope) - default: - err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope) + if chID == p2p.ChannelID(PexChannel) { + return r.handlePexMessage(ctx, envelope) } - - return + return 0, fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope) } // processPeerUpdate processes a PeerUpdate. For added peers, PeerStatusUp, we @@ -338,95 +328,87 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { } } -// sendRequestForPeers pops the first peerID off the list and sends the -// peer a request for more peer addresses. The function then moves the -// peer into the requestsSent bucket and calculates when the next request -// time should be -func (r *Reactor) sendRequestForPeers(ctx context.Context) (time.Duration, error) { +// sendRequestForPeers chooses a peer from the set of available peers and sends +// that peer a request for more peer addresses. The chosen peer is moved into +// the requestsSent bucket so that we will not attempt to contact them again +// until they've replied or updated. +func (r *Reactor) sendRequestForPeers(ctx context.Context) error { r.mtx.Lock() defer r.mtx.Unlock() if len(r.availablePeers) == 0 { // no peers are available - r.logger.Debug("no available peers to send request to, waiting...") - return noAvailablePeersWaitPeriod, nil + r.logger.Debug("no available peers to send a PEX request to (retrying)") + return nil } - var peerID types.NodeID - // use range to get a random peer. + // Select an arbitrary peer from the available set. + var peerID types.NodeID for peerID = range r.availablePeers { break } - // send out the pex request if err := r.pexCh.Send(ctx, p2p.Envelope{ To: peerID, Message: &protop2p.PexRequest{}, }); err != nil { - return 0, err + return err } - // remove the peer from the abvailable peers list and mark it in the requestsSent map + // Move the peer from available to pending. delete(r.availablePeers, peerID) r.requestsSent[peerID] = struct{}{} - dur := r.calculateNextRequestTime() - r.logger.Debug("peer request sent", "next_request_time", dur) - return dur, nil + return nil } -// calculateNextRequestTime implements something of a proportional controller -// to estimate how often the reactor should be requesting new peer addresses. -// The dependent variable in this calculation is the ratio of new peers to -// all peers that the reactor receives. The interval is thus calculated as the -// inverse squared. In the beginning, all peers should be new peers. -// We expect this ratio to be near 1 and thus the interval to be as short -// as possible. As the node becomes more familiar with the network the ratio of -// new nodes will plummet to a very small number, meaning the interval expands -// to its upper bound. +// calculateNextRequestTime selects how long we should wait before attempting +// to send out another request for peer addresses. +// +// This implements a simplified proportional control mechanism to poll more +// often when our knowledge of the network is incomplete, and less often as our +// knowledge grows. To estimate our knowledge of the network, we use the +// fraction of "new" peers (addresses we have not previously seen) to the total +// so far observed. When we first join the network, this fraction will be close +// to 1, meaning most new peers are "new" to us, and as we discover more peers, +// the fraction will go toward zero. // -// CONTRACT: The caller must hold r.mtx exclusively when calling this method. -func (r *Reactor) calculateNextRequestTime() time.Duration { - // check if the peer store is full. If so then there is no need - // to send peer requests too often +// The minimum interval will be minReceiveRequestInterval to ensure we will not +// request from any peer more often than we would allow them to do from us. +func (r *Reactor) calculateNextRequestTime(added int) time.Duration { + r.mtx.Lock() + defer r.mtx.Unlock() + + r.totalPeers += added + + // If the peer store is nearly full, wait the maximum interval. if ratio := r.peerManager.PeerRatio(); ratio >= 0.95 { - r.logger.Debug("peer manager near full ratio, sleeping...", + r.logger.Debug("Peer manager is nearly full", "sleep_period", fullCapacityInterval, "ratio", ratio) return fullCapacityInterval } - // baseTime represents the shortest interval that we can send peer requests - // in. For example if we have 10 peers and we can't send a message to the - // same peer every 500ms, then we can send a request every 50ms. In practice - // we use a safety margin of 2, ergo 100ms - peers := tmmath.MinInt(len(r.availablePeers), 50) - baseTime := minReceiveRequestInterval - if peers > 0 { - baseTime = minReceiveRequestInterval * 2 / time.Duration(peers) + // If there are no available peers to query, poll less aggressively. + if len(r.availablePeers) == 0 { + r.logger.Debug("No available peers to send a PEX request", + "sleep_period", noAvailablePeersWaitPeriod) + return noAvailablePeersWaitPeriod } - if r.totalPeers > 0 || r.discoveryRatio == 0 { - // find the ratio of new peers. NOTE: We add 1 to both sides to avoid - // divide by zero problems - ratio := float32(r.totalPeers+1) / float32(r.newPeers+1) - // square the ratio in order to get non linear time intervals - // NOTE: The longest possible interval for a network with 100 or more peers - // where a node is connected to 50 of them is 2 minutes. - r.discoveryRatio = ratio * ratio - r.newPeers = 0 - r.totalPeers = 0 - } - // NOTE: As ratio is always >= 1, discovery ratio is >= 1. Therefore we don't need to worry - // about the next request time being less than the minimum time - return baseTime * time.Duration(r.discoveryRatio) + // Reaching here, there are available peers to query and the peer store + // still has space. Estimate our knowledge of the network from the latest + // update and choose a new interval. + base := float64(minReceiveRequestInterval) / float64(len(r.availablePeers)) + multiplier := float64(r.totalPeers+1) / float64(added+1) // +1 to avert zero division + return time.Duration(base*multiplier*multiplier) + minReceiveRequestInterval } func (r *Reactor) markPeerRequest(peer types.NodeID) error { r.mtx.Lock() defer r.mtx.Unlock() if lastRequestTime, ok := r.lastReceivedRequests[peer]; ok { - if time.Now().Before(lastRequestTime.Add(minReceiveRequestInterval)) { - return fmt.Errorf("peer sent a request too close after a prior one. Minimum interval: %v", - minReceiveRequestInterval) + if d := time.Since(lastRequestTime); d < minReceiveRequestInterval { + return fmt.Errorf("peer %v sent PEX request too soon (%v < minimum %v)", + peer, d, minReceiveRequestInterval) } } r.lastReceivedRequests[peer] = time.Now() diff --git a/internal/p2p/pex/reactor_test.go b/internal/p2p/pex/reactor_test.go index 4319cad20..c7a0f5934 100644 --- a/internal/p2p/pex/reactor_test.go +++ b/internal/p2p/pex/reactor_test.go @@ -96,7 +96,7 @@ func TestReactorSendsRequestsTooOften(t *testing.T) { peerErr := <-r.pexErrCh require.Error(t, peerErr.Err) require.Empty(t, r.pexOutCh) - require.Contains(t, peerErr.Err.Error(), "peer sent a request too close after a prior one") + require.Contains(t, peerErr.Err.Error(), "sent PEX request too soon") require.Equal(t, badNode, peerErr.NodeID) } @@ -189,7 +189,7 @@ func TestReactorErrorsOnReceivingTooManyPeers(t *testing.T) { peerErr := <-r.pexErrCh require.Error(t, peerErr.Err) require.Empty(t, r.pexOutCh) - require.Contains(t, peerErr.Err.Error(), "peer sent too many addresses") + require.Contains(t, peerErr.Err.Error(), "Peer sent too many addresses") require.Equal(t, peer.NodeID, peerErr.NodeID) }