diff --git a/go.mod b/go.mod
index ec8a27270..80db11417 100644
--- a/go.mod
+++ b/go.mod
@@ -26,7 +26,7 @@ require (
 	github.com/rs/cors v1.8.2
 	github.com/rs/zerolog v1.26.1
 	github.com/snikch/goodman v0.0.0-20171125024755-10e37e294daa
-	github.com/spf13/cobra v1.3.0
+	github.com/spf13/cobra v1.4.0
 	github.com/spf13/viper v1.10.1
 	github.com/stretchr/testify v1.7.0
 	github.com/tendermint/tm-db v0.6.6
diff --git a/go.sum b/go.sum
index 64f63a14b..5c1ebecc9 100644
--- a/go.sum
+++ b/go.sum
@@ -940,8 +940,9 @@ github.com/spf13/cast v1.4.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkU
 github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
 github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU=
 github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
-github.com/spf13/cobra v1.3.0 h1:R7cSvGu+Vv+qX0gW5R/85dx2kmmJT5z5NM8ifdYjdn0=
 github.com/spf13/cobra v1.3.0/go.mod h1:BrRVncBjOJa/eUcVVm9CE+oC6as8k+VYr4NY7WCi9V4=
+github.com/spf13/cobra v1.4.0 h1:y+wJpx64xcgO1V+RcnwW0LEHxTKRi2ZDPSBjWnrg88Q=
+github.com/spf13/cobra v1.4.0/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB84g=
 github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
 github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk=
 github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go
index 0c256a4f3..3804552d9 100644
--- a/internal/p2p/pex/reactor.go
+++ b/internal/p2p/pex/reactor.go
@@ -10,7 +10,6 @@ import (
 	"github.com/tendermint/tendermint/internal/p2p"
 	"github.com/tendermint/tendermint/internal/p2p/conn"
 	"github.com/tendermint/tendermint/libs/log"
-	tmmath "github.com/tendermint/tendermint/libs/math"
 	"github.com/tendermint/tendermint/libs/service"
 	protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
 	"github.com/tendermint/tendermint/types"
@@ -42,7 +41,7 @@ const (
 	minReceiveRequestInterval = 100 * time.Millisecond
 
 	// the maximum amount of addresses that can be included in a response
-	maxAddresses uint16 = 100
+	maxAddresses = 100
 
 	// How long to wait when there are no peers available before trying again
 	noAvailablePeersWaitPeriod = 1 * time.Second
@@ -100,15 +99,8 @@ type Reactor struct {
 	// minReceiveRequestInterval).
 	lastReceivedRequests map[types.NodeID]time.Time
 
-	// keep track of how many new peers to existing peers we have received to
-	// extrapolate the size of the network
-	newPeers   uint32
-	totalPeers uint32
-
-	// discoveryRatio is the inverse ratio of new peers to old peers squared.
-	// This is multiplied by the minimum duration to calculate how long to wait
-	// between each request.
-	discoveryRatio float32
+	// the total number of unique peers added
+	totalPeers int
 }
 
 // NewReactor returns a reference to a new reactor.
@@ -156,16 +148,6 @@ func (r *Reactor) OnStop() {}
 // processPexCh implements a blocking event loop where we listen for p2p
 // Envelope messages from the pexCh.
 func (r *Reactor) processPexCh(ctx context.Context) {
-	timer := time.NewTimer(0)
-	defer timer.Stop()
-
-	r.mtx.Lock()
-	var (
-		duration = r.calculateNextRequestTime()
-		err      error
-	)
-	r.mtx.Unlock()
-
 	incoming := make(chan *p2p.Envelope)
 	go func() {
 		defer close(incoming)
@@ -179,36 +161,51 @@ func (r *Reactor) processPexCh(ctx context.Context) {
 		}
 	}()
 
+	// Initially, we will request peers quickly to bootstrap. This duration
+	// will be adjusted upward as knowledge of the network grows.
+	var nextPeerRequest = minReceiveRequestInterval
+
+	timer := time.NewTimer(0)
+	defer timer.Stop()
+
 	for {
-		timer.Reset(duration)
+		timer.Reset(nextPeerRequest)
 
 		select {
 		case <-ctx.Done():
 			return
 
-		// outbound requests for new peers
 		case <-timer.C:
-			duration, err = r.sendRequestForPeers(ctx)
-			if err != nil {
+			// Send a request for more peer addresses.
+			if err := r.sendRequestForPeers(ctx); err != nil {
 				return
+				// TODO(creachadair): Do we really want to stop processing the PEX
+				// channel just because of an error here?
 			}
-		// inbound requests for new peers or responses to requests sent by this
-		// reactor
+
+			// Note we do not update the poll timer upon making a request, only
+			// when we receive an update that updates our priors.
+
 		case envelope, ok := <-incoming:
 			if !ok {
-				return
+				return // channel closed
 			}
-			duration, err = r.handleMessage(ctx, r.pexCh.ID, envelope)
+
+			// A request from another peer, or a response to one of our requests.
+			dur, err := r.handleMessage(ctx, r.pexCh.ID, envelope)
 			if err != nil {
-				r.logger.Error("failed to process message", "ch_id", r.pexCh.ID, "envelope", envelope, "err", err)
+				r.logger.Error("failed to process message",
+					"ch_id", r.pexCh.ID, "envelope", envelope, "err", err)
 				if serr := r.pexCh.SendError(ctx, p2p.PeerError{
 					NodeID: envelope.From,
 					Err:    err,
 				}); serr != nil {
 					return
 				}
+			} else if dur != 0 {
+				// We got a useful result; update the poll timer.
+				nextPeerRequest = dur
 			}
-
 		}
 	}
 }
@@ -228,19 +225,20 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
 	}
 }
 // handlePexMessage handles envelopes sent from peers on the PexChannel.
+// If an update was received, a new polling interval is returned; otherwise the
+// duration is 0.
 func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope) (time.Duration, error) {
 	logger := r.logger.With("peer", envelope.From)
 
 	switch msg := envelope.Message.(type) {
 	case *protop2p.PexRequest:
-		// check if the peer hasn't sent a prior request too close to this one
-		// in time
+		// Verify that this peer hasn't sent us another request too recently.
 		if err := r.markPeerRequest(envelope.From); err != nil {
-			return time.Minute, err
+			return 0, err
 		}
 
-		// request peers from the peer manager and parse the NodeAddresses into
-		// URL strings
+		// Fetch peers from the peer manager, convert NodeAddresses into URL
+		// strings, and send them back to the caller.
 		nodeAddresses := r.peerManager.Advertise(envelope.From, maxAddresses)
 		pexAddresses := make([]protop2p.PexAddress, len(nodeAddresses))
 		for idx, addr := range nodeAddresses {
@@ -248,28 +246,24 @@ func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope)
 				URL: addr.String(),
 			}
 		}
-		if err := r.pexCh.Send(ctx, p2p.Envelope{
+		return 0, r.pexCh.Send(ctx, p2p.Envelope{
 			To:      envelope.From,
 			Message: &protop2p.PexResponse{Addresses: pexAddresses},
-		}); err != nil {
-			return 0, err
-		}
+		})
 
-		return time.Second, nil
 	case *protop2p.PexResponse:
-		// check if the response matches a request that was made to that peer
+		// Verify that this response corresponds to one of our pending requests.
 		if err := r.markPeerResponse(envelope.From); err != nil {
-			return time.Minute, err
+			return 0, err
 		}
 
-		// check the size of the response
-		if len(msg.Addresses) > int(maxAddresses) {
-			return 10 * time.Minute, fmt.Errorf("peer sent too many addresses (max: %d, got: %d)",
-				maxAddresses,
-				len(msg.Addresses),
-			)
+		// Verify that the response does not exceed the safety limit.
+		if len(msg.Addresses) > maxAddresses {
+			return 0, fmt.Errorf("peer sent too many addresses (%d > maximum %d)",
+				len(msg.Addresses), maxAddresses)
 		}
 
+		var numAdded int
 		for _, pexAddress := range msg.Addresses {
 			peerAddress, err := p2p.ParseNodeAddress(pexAddress.URL)
 			if err != nil {
@@ -278,24 +272,24 @@ func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope)
 			added, err := r.peerManager.Add(peerAddress)
 			if err != nil {
 				logger.Error("failed to add PEX address", "address", peerAddress, "err", err)
+				continue
 			}
 			if added {
-				r.newPeers++
+				numAdded++
 				logger.Debug("added PEX address", "address", peerAddress)
 			}
-			r.totalPeers++
 		}
 
-		return 10 * time.Minute, nil
+		return r.calculateNextRequestTime(numAdded), nil
+
 	default:
-		return time.Second, fmt.Errorf("received unknown message: %T", msg)
+		return 0, fmt.Errorf("received unknown message: %T", msg)
 	}
 }
 
-// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
-// It will handle errors and any possible panics gracefully. A caller can handle
-// any error returned by sending a PeerError on the respective channel.
-func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (duration time.Duration, err error) {
+// handleMessage handles an Envelope sent from a peer on the specified Channel.
+// This method will convert a panic in message handling as an error.
+func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (_ time.Duration, err error) {
 	defer func() {
 		if e := recover(); e != nil {
 			err = fmt.Errorf("panic in processing message: %v", e)
@@ -309,14 +303,10 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
 
 	r.logger.Debug("received PEX message", "peer", envelope.From)
 
-	switch chID {
-	case p2p.ChannelID(PexChannel):
-		duration, err = r.handlePexMessage(ctx, envelope)
-	default:
-		err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
+	if chID == p2p.ChannelID(PexChannel) {
+		return r.handlePexMessage(ctx, envelope)
 	}
-
-	return
+	return 0, fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
 }
 
 // processPeerUpdate processes a PeerUpdate. For added peers, PeerStatusUp, we
@@ -338,95 +328,87 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
 	}
 }
 
-// sendRequestForPeers pops the first peerID off the list and sends the
-// peer a request for more peer addresses. The function then moves the
-// peer into the requestsSent bucket and calculates when the next request
-// time should be
-func (r *Reactor) sendRequestForPeers(ctx context.Context) (time.Duration, error) {
+// sendRequestForPeers chooses a peer from the set of available peers and sends
+// that peer a request for more peer addresses. The chosen peer is moved into
+// the requestsSent bucket so that we will not attempt to contact them again
+// until they've replied or updated.
+func (r *Reactor) sendRequestForPeers(ctx context.Context) error {
 	r.mtx.Lock()
 	defer r.mtx.Unlock()
 
 	if len(r.availablePeers) == 0 {
 		// no peers are available
-		r.logger.Debug("no available peers to send request to, waiting...")
-		return noAvailablePeersWaitPeriod, nil
+		r.logger.Debug("no available peers to send a PEX request to (retrying)")
+		return nil
 	}
 
-	var peerID types.NodeID
-	// use range to get a random peer.
+	// Select an arbitrary peer from the available set.
+	var peerID types.NodeID
 	for peerID = range r.availablePeers {
 		break
 	}
 
-	// send out the pex request
 	if err := r.pexCh.Send(ctx, p2p.Envelope{
 		To:      peerID,
 		Message: &protop2p.PexRequest{},
 	}); err != nil {
-		return 0, err
+		return err
 	}
 
-	// remove the peer from the abvailable peers list and mark it in the requestsSent map
+	// Move the peer from available to pending.
 	delete(r.availablePeers, peerID)
 	r.requestsSent[peerID] = struct{}{}
 
-	dur := r.calculateNextRequestTime()
-	r.logger.Debug("peer request sent", "next_request_time", dur)
-	return dur, nil
+	return nil
 }
 
-// calculateNextRequestTime implements something of a proportional controller
-// to estimate how often the reactor should be requesting new peer addresses.
-// The dependent variable in this calculation is the ratio of new peers to
-// all peers that the reactor receives. The interval is thus calculated as the
-// inverse squared. In the beginning, all peers should be new peers.
-// We expect this ratio to be near 1 and thus the interval to be as short
-// as possible. As the node becomes more familiar with the network the ratio of
-// new nodes will plummet to a very small number, meaning the interval expands
-// to its upper bound.
+// calculateNextRequestTime selects how long we should wait before attempting
+// to send out another request for peer addresses.
+//
+// This implements a simplified proportional control mechanism to poll more
+// often when our knowledge of the network is incomplete, and less often as our
+// knowledge grows. To estimate our knowledge of the network, we use the
+// fraction of "new" peers (addresses we have not previously seen) to the total
+// so far observed. When we first join the network, this fraction will be close
+// to 1, meaning most new peers are "new" to us, and as we discover more peers,
+// the fraction will go toward zero.
 //
-// CONTRACT: The caller must hold r.mtx exclusively when calling this method.
-func (r *Reactor) calculateNextRequestTime() time.Duration {
-	// check if the peer store is full. If so then there is no need
-	// to send peer requests too often
+// The minimum interval will be minReceiveRequestInterval to ensure we will not
+// request from any peer more often than we would allow them to do from us.
+func (r *Reactor) calculateNextRequestTime(added int) time.Duration {
+	r.mtx.Lock()
+	defer r.mtx.Unlock()
+
+	r.totalPeers += added
+
+	// If the peer store is nearly full, wait the maximum interval.
 	if ratio := r.peerManager.PeerRatio(); ratio >= 0.95 {
-		r.logger.Debug("peer manager near full ratio, sleeping...",
+		r.logger.Debug("Peer manager is nearly full",
 			"sleep_period", fullCapacityInterval, "ratio", ratio)
 		return fullCapacityInterval
 	}
 
-	// baseTime represents the shortest interval that we can send peer requests
-	// in. For example if we have 10 peers and we can't send a message to the
-	// same peer every 500ms, then we can send a request every 50ms. In practice
-	// we use a safety margin of 2, ergo 100ms
-	peers := tmmath.MinInt(len(r.availablePeers), 50)
-	baseTime := minReceiveRequestInterval
-	if peers > 0 {
-		baseTime = minReceiveRequestInterval * 2 / time.Duration(peers)
+	// If there are no available peers to query, poll less aggressively.
+	if len(r.availablePeers) == 0 {
+		r.logger.Debug("No available peers to send a PEX request",
+			"sleep_period", noAvailablePeersWaitPeriod)
+		return noAvailablePeersWaitPeriod
 	}
 
-	if r.totalPeers > 0 || r.discoveryRatio == 0 {
-		// find the ratio of new peers. NOTE: We add 1 to both sides to avoid
-		// divide by zero problems
-		ratio := float32(r.totalPeers+1) / float32(r.newPeers+1)
-		// square the ratio in order to get non linear time intervals
-		// NOTE: The longest possible interval for a network with 100 or more peers
-		// where a node is connected to 50 of them is 2 minutes.
-		r.discoveryRatio = ratio * ratio
-		r.newPeers = 0
-		r.totalPeers = 0
-	}
-	// NOTE: As ratio is always >= 1, discovery ratio is >= 1. Therefore we don't need to worry
-	// about the next request time being less than the minimum time
-	return baseTime * time.Duration(r.discoveryRatio)
+	// Reaching here, there are available peers to query and the peer store
+	// still has space. Estimate our knowledge of the network from the latest
+	// update and choose a new interval.
+	base := float64(minReceiveRequestInterval) / float64(len(r.availablePeers))
+	multiplier := float64(r.totalPeers+1) / float64(added+1) // +1 to avert zero division
+	return time.Duration(base*multiplier*multiplier) + minReceiveRequestInterval
 }
 
 func (r *Reactor) markPeerRequest(peer types.NodeID) error {
 	r.mtx.Lock()
 	defer r.mtx.Unlock()
 	if lastRequestTime, ok := r.lastReceivedRequests[peer]; ok {
-		if time.Now().Before(lastRequestTime.Add(minReceiveRequestInterval)) {
-			return fmt.Errorf("peer sent a request too close after a prior one. Minimum interval: %v",
-				minReceiveRequestInterval)
+		if d := time.Since(lastRequestTime); d < minReceiveRequestInterval {
+			return fmt.Errorf("peer %v sent PEX request too soon (%v < minimum %v)",
+				peer, d, minReceiveRequestInterval)
 		}
 	}
 	r.lastReceivedRequests[peer] = time.Now()
diff --git a/internal/p2p/pex/reactor_test.go b/internal/p2p/pex/reactor_test.go
index 4319cad20..356d7f435 100644
--- a/internal/p2p/pex/reactor_test.go
+++ b/internal/p2p/pex/reactor_test.go
@@ -96,7 +96,7 @@ func TestReactorSendsRequestsTooOften(t *testing.T) {
 	peerErr := <-r.pexErrCh
 	require.Error(t, peerErr.Err)
 	require.Empty(t, r.pexOutCh)
-	require.Contains(t, peerErr.Err.Error(), "peer sent a request too close after a prior one")
+	require.Contains(t, peerErr.Err.Error(), "sent PEX request too soon")
 	require.Equal(t, badNode, peerErr.NodeID)
 }
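
Not part of the patch, just an illustration of the new polling math: the rewritten calculateNextRequestTime polls quickly while most addresses in a PEX response are new to us and backs off quadratically as discovery slows. The standalone sketch below reproduces that arithmetic with the 100ms minReceiveRequestInterval from the const block; the helper name nextRequestInterval and the sample inputs are made up for the example.

package main

import (
	"fmt"
	"time"
)

// Mirrors the constant in internal/p2p/pex/reactor.go.
const minReceiveRequestInterval = 100 * time.Millisecond

// nextRequestInterval reproduces the interval arithmetic from the patched
// calculateNextRequestTime. totalPeers is the running total after the latest
// response has been folded in (r.totalPeers += added in the patch), and
// availablePeers must be > 0; the reactor returns noAvailablePeersWaitPeriod
// before reaching this computation otherwise.
func nextRequestInterval(availablePeers, totalPeers, added int) time.Duration {
	base := float64(minReceiveRequestInterval) / float64(availablePeers)
	multiplier := float64(totalPeers+1) / float64(added+1) // +1 to avert zero division
	return time.Duration(base*multiplier*multiplier) + minReceiveRequestInterval
}

func main() {
	// Early in discovery most addresses are new, so the interval stays short.
	fmt.Println(nextRequestInterval(10, 20, 19)) // ~111ms
	// Once few addresses in a response are new, the interval grows toward minutes.
	fmt.Println(nextRequestInterval(10, 500, 2)) // ~4m39s
}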