Browse Source

p2p: limit rate of dialing new peers (#6485)

pull/6496/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
e9928f6186
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 160 additions and 56 deletions
  1. +7
    -0
      light/provider/http/http.go
  2. +4
    -3
      p2p/p2ptest/network.go
  3. +0
    -2
      p2p/pex/reactor.go
  4. +9
    -9
      p2p/pex/reactor_test.go
  5. +118
    -37
      p2p/router.go
  6. +22
    -5
      p2p/router_test.go

+ 7
- 0
light/provider/http/http.go View File

@ -2,6 +2,7 @@ package http
import (
"context"
"errors"
"fmt"
"math/rand"
"net/url"
@ -122,6 +123,12 @@ func (p *http) LightBlock(ctx context.Context, height int64) (*types.LightBlock,
}
}
if sh.Header == nil {
return nil, provider.ErrBadLightBlock{
Reason: errors.New("header is nil unexpectedly"),
}
}
vs, err := p.validatorSet(ctx, &sh.Height)
if err != nil {
return nil, err


+ 4
- 3
p2p/p2ptest/network.go View File

@ -1,6 +1,7 @@
package p2ptest
import (
"context"
"math/rand"
"testing"
"time"
@ -97,7 +98,7 @@ func (n *Network) Start(t *testing.T) {
NodeID: targetNode.NodeID,
Status: p2p.PeerStatusUp,
}, peerUpdate)
case <-time.After(time.Second):
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for peer", "%v dialing %v",
sourceNode.NodeID, targetNode.NodeID)
}
@ -108,7 +109,7 @@ func (n *Network) Start(t *testing.T) {
NodeID: sourceNode.NodeID,
Status: p2p.PeerStatusUp,
}, peerUpdate)
case <-time.After(time.Second):
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for peer", "%v accepting %v",
targetNode.NodeID, sourceNode.NodeID)
}
@ -252,7 +253,7 @@ func (n *Network) MakeNode(t *testing.T, opts NodeOptions) *Node {
privKey,
peerManager,
[]p2p.Transport{transport},
p2p.RouterOptions{},
p2p.RouterOptions{DialSleep: func(_ context.Context) {}},
)
require.NoError(t, err)
require.NoError(t, router.Start())


+ 0
- 2
p2p/pex/reactor.go View File

@ -439,8 +439,6 @@ func (r *ReactorV2) sendRequestForPeers() {
}
// remove the peer from the abvailable peers list and mark it in the requestsSent map
// WAT(tychoish): do we actually want to do this? doesn't this
// just make churn?
delete(r.availablePeers, peerID)
r.requestsSent[peerID] = struct{}{}


+ 9
- 9
p2p/pex/reactor_test.go View File

@ -49,7 +49,7 @@ func TestReactorBasic(t *testing.T) {
func TestReactorConnectFullNetwork(t *testing.T) {
testNet := setupNetwork(t, testOptions{
TotalNodes: 8,
TotalNodes: 4,
})
// make every node be only connected with one other node (it actually ends up
@ -174,9 +174,9 @@ func TestReactorErrorsOnReceivingTooManyPeers(t *testing.T) {
func TestReactorSmallPeerStoreInALargeNetwork(t *testing.T) {
testNet := setupNetwork(t, testOptions{
TotalNodes: 16,
MaxPeers: 8,
MaxConnected: 6,
TotalNodes: 8,
MaxPeers: 4,
MaxConnected: 3,
BufferSize: 8,
})
testNet.connectN(t, 1)
@ -193,9 +193,9 @@ func TestReactorSmallPeerStoreInALargeNetwork(t *testing.T) {
func TestReactorLargePeerStoreInASmallNetwork(t *testing.T) {
testNet := setupNetwork(t, testOptions{
TotalNodes: 5,
MaxPeers: 50,
MaxConnected: 50,
TotalNodes: 3,
MaxPeers: 25,
MaxConnected: 25,
BufferSize: 5,
})
testNet.connectN(t, 1)
@ -740,7 +740,7 @@ func (r *reactorTestSuite) connectPeers(t *testing.T, sourceNode, targetNode int
Status: p2p.PeerStatusUp,
}, peerUpdate)
r.logger.Debug("target connected with source")
case <-time.After(time.Second):
case <-time.After(2 * time.Second):
require.Fail(t, "timed out waiting for peer", "%v accepting %v",
targetNode, sourceNode)
}
@ -752,7 +752,7 @@ func (r *reactorTestSuite) connectPeers(t *testing.T, sourceNode, targetNode int
Status: p2p.PeerStatusUp,
}, peerUpdate)
r.logger.Debug("source connected with target")
case <-time.After(time.Second):
case <-time.After(2 * time.Second):
require.Fail(t, "timed out waiting for peer", "%v dialing %v",
sourceNode, targetNode)
}


+ 118
- 37
p2p/router.go View File

@ -5,7 +5,9 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"net"
"runtime"
"sync"
"time"
@ -155,6 +157,17 @@ type RouterOptions struct {
// IP address to filter before the handshake. Functions should
// return an error to reject the peer.
FilterPeerByID func(context.Context, NodeID) error
// DialSleep controls the amount of time that the router
// sleeps between dialing peers. If not set, a default value
// is used that sleeps for a (random) amount of time up to 3
// seconds between submitting each peer to be dialed.
DialSleep func(context.Context)
// NumConcrruentDials controls how many parallel go routines
// are used to dial peers. This defaults to the value of
// runtime.NumCPU.
NumConcurrentDials func() int
}
const (
@ -169,7 +182,7 @@ func (o *RouterOptions) Validate() error {
case "":
o.QueueType = queueTypeFifo
case queueTypeFifo, queueTypeWDRR, queueTypePriority:
// pass
// passI me
default:
return fmt.Errorf("queue type %q is not supported", o.QueueType)
}
@ -480,6 +493,14 @@ func (r *Router) routeChannel(
}
}
func (r *Router) numConccurentDials() int {
if r.options.NumConcurrentDials == nil {
return runtime.NumCPU()
}
return r.options.NumConcurrentDials()
}
func (r *Router) filterPeersIP(ctx context.Context, ip net.IP, port uint16) error {
if r.options.FilterPeerByIP == nil {
return nil
@ -496,6 +517,23 @@ func (r *Router) filterPeersID(ctx context.Context, id NodeID) error {
return r.options.FilterPeerByID(ctx, id)
}
func (r *Router) dialSleep(ctx context.Context) {
if r.options.DialSleep == nil {
// nolint:gosec // G404: Use of weak random number generator
timer := time.NewTimer(time.Duration(rand.Int63n(dialRandomizerIntervalMilliseconds)) * time.Millisecond)
defer timer.Stop()
select {
case <-ctx.Done():
case <-timer.C:
}
return
}
r.options.DialSleep(ctx)
}
// acceptPeers accepts inbound connections from peers on the given transport,
// and spawns goroutines that route messages to/from them.
func (r *Router) acceptPeers(transport Transport) {
@ -585,55 +623,98 @@ func (r *Router) openConnection(ctx context.Context, conn Connection) {
func (r *Router) dialPeers() {
r.logger.Debug("starting dial routine")
ctx := r.stopCtx()
addresses := make(chan NodeAddress)
wg := &sync.WaitGroup{}
// Start a limited number of goroutines to dial peers in
// parallel. the goal is to avoid starting an unbounded number
// of goroutines thereby spamming the network, but also being
// able to add peers at a reasonable pace, though the number
// is somewhat arbitrary. The action is further throttled by a
// sleep after sending to the addresses channel.
for i := 0; i < r.numConccurentDials(); i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case address := <-addresses:
r.connectPeer(ctx, address)
}
}
}()
}
LOOP:
for {
address, err := r.peerManager.DialNext(ctx)
switch {
case errors.Is(err, context.Canceled):
r.logger.Debug("stopping dial routine")
return
break LOOP
case err != nil:
r.logger.Error("failed to find next peer to dial", "err", err)
return
break LOOP
}
// Spawn off a goroutine to actually dial the peer, so that we can
// dial multiple peers in parallel.
go func() {
conn, err := r.dialPeer(ctx, address)
switch {
case errors.Is(err, context.Canceled):
return
case err != nil:
r.logger.Error("failed to dial peer", "peer", address, "err", err)
if err = r.peerManager.DialFailed(address); err != nil {
r.logger.Error("failed to report dial failure", "peer", address, "err", err)
}
return
}
defer conn.Close()
select {
case addresses <- address:
// this jitters the frequency that we call
// DialNext and prevents us from attempting to
// create connections too quickly.
peerID := address.NodeID
_, _, err = r.handshakePeer(ctx, conn, peerID)
switch {
case errors.Is(err, context.Canceled):
return
case err != nil:
r.logger.Error("failed to handshake with peer", "peer", address, "err", err)
if err = r.peerManager.DialFailed(address); err != nil {
r.logger.Error("failed to report dial failure", "peer", address, "err", err)
}
return
}
r.dialSleep(ctx)
continue
case <-ctx.Done():
close(addresses)
break LOOP
}
}
if err := r.runWithPeerMutex(func() error { return r.peerManager.Dialed(address) }); err != nil {
r.logger.Error("failed to accept connection",
"op", "outgoing/dialing", "peer", address.NodeID, "err", err)
return
}
wg.Wait()
}
r.routePeer(peerID, conn)
}()
func (r *Router) connectPeer(ctx context.Context, address NodeAddress) {
conn, err := r.dialPeer(ctx, address)
switch {
case errors.Is(err, context.Canceled):
return
case err != nil:
r.logger.Error("failed to dial peer", "peer", address, "err", err)
if err = r.peerManager.DialFailed(address); err != nil {
r.logger.Error("failed to report dial failure", "peer", address, "err", err)
}
return
}
_, _, err = r.handshakePeer(ctx, conn, address.NodeID)
switch {
case errors.Is(err, context.Canceled):
conn.Close()
return
case err != nil:
r.logger.Error("failed to handshake with peer", "peer", address, "err", err)
if err = r.peerManager.DialFailed(address); err != nil {
r.logger.Error("failed to report dial failure", "peer", address, "err", err)
}
conn.Close()
return
}
if err := r.runWithPeerMutex(func() error { return r.peerManager.Dialed(address) }); err != nil {
r.logger.Error("failed to dial peer",
"op", "outgoing/dialing", "peer", address.NodeID, "err", err)
conn.Close()
return
}
// routePeer (also) calls connection close
go r.routePeer(address.NodeID, conn)
}
func (r *Router) getOrMakeQueue(peerID NodeID) queue {


+ 22
- 5
p2p/router_test.go View File

@ -1,9 +1,11 @@
package p2p_test
import (
"context"
"errors"
"fmt"
"io"
"runtime"
"strings"
"sync"
"testing"
@ -670,16 +672,31 @@ func TestRouter_DialPeers_Parallel(t *testing.T) {
selfKey,
peerManager,
[]p2p.Transport{mockTransport},
p2p.RouterOptions{},
p2p.RouterOptions{
DialSleep: func(_ context.Context) {},
NumConcurrentDials: func() int {
ncpu := runtime.NumCPU()
if ncpu <= 3 {
return 3
}
return ncpu
},
},
)
require.NoError(t, err)
require.NoError(t, router.Start())
require.Eventually(t, func() bool {
return len(dialCh) == 3
}, time.Second, 10*time.Millisecond)
require.Eventually(t,
func() bool {
return len(dialCh) == 3
},
5*time.Second,
100*time.Millisecond,
"reached %d rather than 3", len(dialCh))
close(closeCh)
time.Sleep(100 * time.Millisecond)
time.Sleep(500 * time.Millisecond)
require.NoError(t, router.Stop())
mockTransport.AssertExpectations(t)


Loading…
Cancel
Save