From e9928f6186771e3628620246dd48a850052ed4ea Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Wed, 26 May 2021 14:39:04 -0400 Subject: [PATCH] p2p: limit rate of dialing new peers (#6485) --- light/provider/http/http.go | 7 ++ p2p/p2ptest/network.go | 7 +- p2p/pex/reactor.go | 2 - p2p/pex/reactor_test.go | 18 ++--- p2p/router.go | 155 +++++++++++++++++++++++++++--------- p2p/router_test.go | 27 +++++-- 6 files changed, 160 insertions(+), 56 deletions(-) diff --git a/light/provider/http/http.go b/light/provider/http/http.go index a5e2f02d1..5147087e6 100644 --- a/light/provider/http/http.go +++ b/light/provider/http/http.go @@ -2,6 +2,7 @@ package http import ( "context" + "errors" "fmt" "math/rand" "net/url" @@ -122,6 +123,12 @@ func (p *http) LightBlock(ctx context.Context, height int64) (*types.LightBlock, } } + if sh.Header == nil { + return nil, provider.ErrBadLightBlock{ + Reason: errors.New("header is nil unexpectedly"), + } + } + vs, err := p.validatorSet(ctx, &sh.Height) if err != nil { return nil, err diff --git a/p2p/p2ptest/network.go b/p2p/p2ptest/network.go index ecaa1e592..c8e16770f 100644 --- a/p2p/p2ptest/network.go +++ b/p2p/p2ptest/network.go @@ -1,6 +1,7 @@ package p2ptest import ( + "context" "math/rand" "testing" "time" @@ -97,7 +98,7 @@ func (n *Network) Start(t *testing.T) { NodeID: targetNode.NodeID, Status: p2p.PeerStatusUp, }, peerUpdate) - case <-time.After(time.Second): + case <-time.After(3 * time.Second): require.Fail(t, "timed out waiting for peer", "%v dialing %v", sourceNode.NodeID, targetNode.NodeID) } @@ -108,7 +109,7 @@ func (n *Network) Start(t *testing.T) { NodeID: sourceNode.NodeID, Status: p2p.PeerStatusUp, }, peerUpdate) - case <-time.After(time.Second): + case <-time.After(3 * time.Second): require.Fail(t, "timed out waiting for peer", "%v accepting %v", targetNode.NodeID, sourceNode.NodeID) } @@ -252,7 +253,7 @@ func (n *Network) MakeNode(t *testing.T, opts NodeOptions) *Node { privKey, peerManager, 
[]p2p.Transport{transport}, - p2p.RouterOptions{}, + p2p.RouterOptions{DialSleep: func(_ context.Context) {}}, ) require.NoError(t, err) require.NoError(t, router.Start()) diff --git a/p2p/pex/reactor.go b/p2p/pex/reactor.go index edf684e79..698a9d311 100644 --- a/p2p/pex/reactor.go +++ b/p2p/pex/reactor.go @@ -439,8 +439,6 @@ func (r *ReactorV2) sendRequestForPeers() { } // remove the peer from the abvailable peers list and mark it in the requestsSent map - // WAT(tychoish): do we actually want to do this? doesn't this - // just make churn? delete(r.availablePeers, peerID) r.requestsSent[peerID] = struct{}{} diff --git a/p2p/pex/reactor_test.go b/p2p/pex/reactor_test.go index a9e5a8f2e..3ee0a587f 100644 --- a/p2p/pex/reactor_test.go +++ b/p2p/pex/reactor_test.go @@ -49,7 +49,7 @@ func TestReactorBasic(t *testing.T) { func TestReactorConnectFullNetwork(t *testing.T) { testNet := setupNetwork(t, testOptions{ - TotalNodes: 8, + TotalNodes: 4, }) // make every node be only connected with one other node (it actually ends up @@ -174,9 +174,9 @@ func TestReactorErrorsOnReceivingTooManyPeers(t *testing.T) { func TestReactorSmallPeerStoreInALargeNetwork(t *testing.T) { testNet := setupNetwork(t, testOptions{ - TotalNodes: 16, - MaxPeers: 8, - MaxConnected: 6, + TotalNodes: 8, + MaxPeers: 4, + MaxConnected: 3, BufferSize: 8, }) testNet.connectN(t, 1) @@ -193,9 +193,9 @@ func TestReactorSmallPeerStoreInALargeNetwork(t *testing.T) { func TestReactorLargePeerStoreInASmallNetwork(t *testing.T) { testNet := setupNetwork(t, testOptions{ - TotalNodes: 5, - MaxPeers: 50, - MaxConnected: 50, + TotalNodes: 3, + MaxPeers: 25, + MaxConnected: 25, BufferSize: 5, }) testNet.connectN(t, 1) @@ -740,7 +740,7 @@ func (r *reactorTestSuite) connectPeers(t *testing.T, sourceNode, targetNode int Status: p2p.PeerStatusUp, }, peerUpdate) r.logger.Debug("target connected with source") - case <-time.After(time.Second): + case <-time.After(2 * time.Second): require.Fail(t, "timed out waiting for 
peer", "%v accepting %v", targetNode, sourceNode) } @@ -752,7 +752,7 @@ func (r *reactorTestSuite) connectPeers(t *testing.T, sourceNode, targetNode int Status: p2p.PeerStatusUp, }, peerUpdate) r.logger.Debug("source connected with target") - case <-time.After(time.Second): + case <-time.After(2 * time.Second): require.Fail(t, "timed out waiting for peer", "%v dialing %v", sourceNode, targetNode) } diff --git a/p2p/router.go b/p2p/router.go index e9ecda1eb..94498d46c 100644 --- a/p2p/router.go +++ b/p2p/router.go @@ -5,7 +5,9 @@ import ( "errors" "fmt" "io" + "math/rand" "net" + "runtime" "sync" "time" @@ -155,6 +157,17 @@ type RouterOptions struct { // IP address to filter before the handshake. Functions should // return an error to reject the peer. FilterPeerByID func(context.Context, NodeID) error + + // DialSleep controls the amount of time that the router + // sleeps between dialing peers. If not set, a default value + // is used that sleeps for a (random) amount of time up to 3 + // seconds between submitting each peer to be dialed. + DialSleep func(context.Context) + + // NumConcurrentDials controls how many parallel goroutines + // are used to dial peers. This defaults to the value of + // runtime.NumCPU.
+ NumConcurrentDials func() int } const ( @@ -169,7 +182,7 @@ func (o *RouterOptions) Validate() error { case "": o.QueueType = queueTypeFifo case queueTypeFifo, queueTypeWDRR, queueTypePriority: - // pass + // pass: queue type is supported default: return fmt.Errorf("queue type %q is not supported", o.QueueType) } @@ -480,6 +493,14 @@ func (r *Router) routeChannel( } } +func (r *Router) numConccurentDials() int { + if r.options.NumConcurrentDials == nil { + return runtime.NumCPU() + } + + return r.options.NumConcurrentDials() +} + func (r *Router) filterPeersIP(ctx context.Context, ip net.IP, port uint16) error { if r.options.FilterPeerByIP == nil { return nil @@ -496,6 +517,23 @@ func (r *Router) filterPeersID(ctx context.Context, id NodeID) error { return r.options.FilterPeerByID(ctx, id) } +func (r *Router) dialSleep(ctx context.Context) { + if r.options.DialSleep == nil { + // nolint:gosec // G404: Use of weak random number generator + timer := time.NewTimer(time.Duration(rand.Int63n(dialRandomizerIntervalMilliseconds)) * time.Millisecond) + defer timer.Stop() + + select { + case <-ctx.Done(): + case <-timer.C: + } + + return + } + + r.options.DialSleep(ctx) +} + // acceptPeers accepts inbound connections from peers on the given transport, // and spawns goroutines that route messages to/from them. func (r *Router) acceptPeers(transport Transport) { @@ -585,55 +623,98 @@ func (r *Router) openConnection(ctx context.Context, conn Connection) { func (r *Router) dialPeers() { r.logger.Debug("starting dial routine") ctx := r.stopCtx() + + addresses := make(chan NodeAddress) + wg := &sync.WaitGroup{} + + // Start a limited number of goroutines to dial peers in + // parallel. The goal is to avoid starting an unbounded number + // of goroutines thereby spamming the network, but also being + // able to add peers at a reasonable pace, though the number + // is somewhat arbitrary. The action is further throttled by a + // sleep after sending to the addresses channel.
+ for i := 0; i < r.numConccurentDials(); i++ { + wg.Add(1) + go func() { + defer wg.Done() + + for { + select { + case <-ctx.Done(): + return + case address := <-addresses: + r.connectPeer(ctx, address) + } + } + }() + } + +LOOP: for { address, err := r.peerManager.DialNext(ctx) switch { case errors.Is(err, context.Canceled): r.logger.Debug("stopping dial routine") - return + break LOOP case err != nil: r.logger.Error("failed to find next peer to dial", "err", err) - return + break LOOP } - // Spawn off a goroutine to actually dial the peer, so that we can - // dial multiple peers in parallel. - go func() { - conn, err := r.dialPeer(ctx, address) - switch { - case errors.Is(err, context.Canceled): - return - case err != nil: - r.logger.Error("failed to dial peer", "peer", address, "err", err) - if err = r.peerManager.DialFailed(address); err != nil { - r.logger.Error("failed to report dial failure", "peer", address, "err", err) - } - return - } - defer conn.Close() + select { + case addresses <- address: + // this jitters the frequency that we call + // DialNext and prevents us from attempting to + // create connections too quickly. 
- peerID := address.NodeID - _, _, err = r.handshakePeer(ctx, conn, peerID) - switch { - case errors.Is(err, context.Canceled): - return - case err != nil: - r.logger.Error("failed to handshake with peer", "peer", address, "err", err) - if err = r.peerManager.DialFailed(address); err != nil { - r.logger.Error("failed to report dial failure", "peer", address, "err", err) - } - return - } + r.dialSleep(ctx) + continue + case <-ctx.Done(): + close(addresses) + break LOOP + } + } - if err := r.runWithPeerMutex(func() error { return r.peerManager.Dialed(address) }); err != nil { - r.logger.Error("failed to accept connection", - "op", "outgoing/dialing", "peer", address.NodeID, "err", err) - return - } + wg.Wait() +} - r.routePeer(peerID, conn) - }() +func (r *Router) connectPeer(ctx context.Context, address NodeAddress) { + conn, err := r.dialPeer(ctx, address) + switch { + case errors.Is(err, context.Canceled): + return + case err != nil: + r.logger.Error("failed to dial peer", "peer", address, "err", err) + if err = r.peerManager.DialFailed(address); err != nil { + r.logger.Error("failed to report dial failure", "peer", address, "err", err) + } + return } + + _, _, err = r.handshakePeer(ctx, conn, address.NodeID) + switch { + case errors.Is(err, context.Canceled): + conn.Close() + return + case err != nil: + r.logger.Error("failed to handshake with peer", "peer", address, "err", err) + if err = r.peerManager.DialFailed(address); err != nil { + r.logger.Error("failed to report dial failure", "peer", address, "err", err) + } + conn.Close() + return + } + + if err := r.runWithPeerMutex(func() error { return r.peerManager.Dialed(address) }); err != nil { + r.logger.Error("failed to dial peer", + "op", "outgoing/dialing", "peer", address.NodeID, "err", err) + conn.Close() + return + + } + + // routePeer (also) calls connection close + go r.routePeer(address.NodeID, conn) } func (r *Router) getOrMakeQueue(peerID NodeID) queue { diff --git a/p2p/router_test.go 
b/p2p/router_test.go index 2e0d9bbb3..a1aaab3d1 100644 --- a/p2p/router_test.go +++ b/p2p/router_test.go @@ -1,9 +1,11 @@ package p2p_test import ( + "context" "errors" "fmt" "io" + "runtime" "strings" "sync" "testing" @@ -670,16 +672,31 @@ func TestRouter_DialPeers_Parallel(t *testing.T) { selfKey, peerManager, []p2p.Transport{mockTransport}, - p2p.RouterOptions{}, + p2p.RouterOptions{ + DialSleep: func(_ context.Context) {}, + NumConcurrentDials: func() int { + ncpu := runtime.NumCPU() + if ncpu <= 3 { + return 3 + } + return ncpu + }, + }, ) + require.NoError(t, err) require.NoError(t, router.Start()) - require.Eventually(t, func() bool { - return len(dialCh) == 3 - }, time.Second, 10*time.Millisecond) + require.Eventually(t, + func() bool { + return len(dialCh) == 3 + }, + 5*time.Second, + 100*time.Millisecond, + "reached %d rather than 3", len(dialCh)) + close(closeCh) - time.Sleep(100 * time.Millisecond) + time.Sleep(500 * time.Millisecond) require.NoError(t, router.Stop()) mockTransport.AssertExpectations(t)