
Merge remote-tracking branch 'origin/master' into node-initalization-avoid-panic

pull/8091/head
tycho garen 3 years ago
commit f11a0b4c47
4 changed files with 104 additions and 121 deletions
  1. go.mod (+1, -1)
  2. go.sum (+2, -1)
  3. internal/p2p/pex/reactor.go (+100, -118)
  4. internal/p2p/pex/reactor_test.go (+1, -1)

go.mod (+1, -1)

@@ -26,7 +26,7 @@ require (
     github.com/rs/cors v1.8.2
     github.com/rs/zerolog v1.26.1
     github.com/snikch/goodman v0.0.0-20171125024755-10e37e294daa
-    github.com/spf13/cobra v1.3.0
+    github.com/spf13/cobra v1.4.0
     github.com/spf13/viper v1.10.1
     github.com/stretchr/testify v1.7.0
     github.com/tendermint/tm-db v0.6.6


go.sum (+2, -1)

@@ -940,8 +940,9 @@ github.com/spf13/cast v1.4.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkU
 github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
 github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU=
 github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
-github.com/spf13/cobra v1.3.0 h1:R7cSvGu+Vv+qX0gW5R/85dx2kmmJT5z5NM8ifdYjdn0=
 github.com/spf13/cobra v1.3.0/go.mod h1:BrRVncBjOJa/eUcVVm9CE+oC6as8k+VYr4NY7WCi9V4=
+github.com/spf13/cobra v1.4.0 h1:y+wJpx64xcgO1V+RcnwW0LEHxTKRi2ZDPSBjWnrg88Q=
+github.com/spf13/cobra v1.4.0/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB84g=
 github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
 github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk=
 github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=


internal/p2p/pex/reactor.go (+100, -118)

@@ -10,7 +10,6 @@ import (
     "github.com/tendermint/tendermint/internal/p2p"
     "github.com/tendermint/tendermint/internal/p2p/conn"
     "github.com/tendermint/tendermint/libs/log"
-    tmmath "github.com/tendermint/tendermint/libs/math"
     "github.com/tendermint/tendermint/libs/service"
     protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
     "github.com/tendermint/tendermint/types"
@@ -42,7 +41,7 @@ const (
     minReceiveRequestInterval = 100 * time.Millisecond
     // the maximum amount of addresses that can be included in a response
-    maxAddresses uint16 = 100
+    maxAddresses = 100
     // How long to wait when there are no peers available before trying again
     noAvailablePeersWaitPeriod = 1 * time.Second
@@ -100,15 +99,8 @@ type Reactor struct {
     // minReceiveRequestInterval).
     lastReceivedRequests map[types.NodeID]time.Time
-    // keep track of how many new peers to existing peers we have received to
-    // extrapolate the size of the network
-    newPeers   uint32
-    totalPeers uint32
-    // discoveryRatio is the inverse ratio of new peers to old peers squared.
-    // This is multiplied by the minimum duration to calculate how long to wait
-    // between each request.
-    discoveryRatio float32
+    // the total number of unique peers added
+    totalPeers int
 }

 // NewReactor returns a reference to a new reactor.
@@ -156,16 +148,6 @@ func (r *Reactor) OnStop() {}
 // processPexCh implements a blocking event loop where we listen for p2p
 // Envelope messages from the pexCh.
 func (r *Reactor) processPexCh(ctx context.Context) {
-    timer := time.NewTimer(0)
-    defer timer.Stop()
-    r.mtx.Lock()
-    var (
-        duration = r.calculateNextRequestTime()
-        err      error
-    )
-    r.mtx.Unlock()
     incoming := make(chan *p2p.Envelope)
     go func() {
         defer close(incoming)
@@ -179,36 +161,51 @@ func (r *Reactor) processPexCh(ctx context.Context) {
         }
     }()
+    // Initially, we will request peers quickly to bootstrap. This duration
+    // will be adjusted upward as knowledge of the network grows.
+    var nextPeerRequest = minReceiveRequestInterval
+    timer := time.NewTimer(0)
+    defer timer.Stop()
     for {
-        timer.Reset(duration)
+        timer.Reset(nextPeerRequest)
         select {
         case <-ctx.Done():
             return
-        // outbound requests for new peers
         case <-timer.C:
-            duration, err = r.sendRequestForPeers(ctx)
-            if err != nil {
+            // Send a request for more peer addresses.
+            if err := r.sendRequestForPeers(ctx); err != nil {
                 return
+                // TODO(creachadair): Do we really want to stop processing the PEX
+                // channel just because of an error here?
             }
-        // inbound requests for new peers or responses to requests sent by this
-        // reactor
+            // Note we do not update the poll timer upon making a request, only
+            // when we receive an update that updates our priors.
         case envelope, ok := <-incoming:
             if !ok {
-                return
+                return // channel closed
             }
-            duration, err = r.handleMessage(ctx, r.pexCh.ID, envelope)
+            // A request from another peer, or a response to one of our requests.
+            dur, err := r.handleMessage(ctx, r.pexCh.ID, envelope)
             if err != nil {
-                r.logger.Error("failed to process message", "ch_id", r.pexCh.ID, "envelope", envelope, "err", err)
+                r.logger.Error("failed to process message",
+                    "ch_id", r.pexCh.ID, "envelope", envelope, "err", err)
                 if serr := r.pexCh.SendError(ctx, p2p.PeerError{
                     NodeID: envelope.From,
                     Err:    err,
                 }); serr != nil {
                     return
                 }
+            } else if dur != 0 {
+                // We got a useful result; update the poll timer.
+                nextPeerRequest = dur
             }
         }
     }
 }
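Side note on the loop above: the new version drives everything from a single timer that is re-armed at the top of each iteration, and the poll interval changes only when a handled message reports a new one. Below is a minimal, self-contained sketch of that pattern; the package name, the replies channel, and the concrete intervals are illustrative placeholders, not part of the reactor.

package pexsketch

import (
    "context"
    "time"
)

// pollLoop mirrors the shape of processPexCh: one resettable timer, a context
// for shutdown, and an interval that is only updated when a reply carries new
// scheduling information (a non-zero duration).
func pollLoop(ctx context.Context, replies <-chan int, sendRequest func() error) {
    interval := 100 * time.Millisecond // start fast, as with minReceiveRequestInterval

    timer := time.NewTimer(0)
    defer timer.Stop()

    for {
        timer.Reset(interval)

        select {
        case <-ctx.Done():
            return
        case <-timer.C:
            if err := sendRequest(); err != nil {
                return
            }
        case n, ok := <-replies:
            if !ok {
                return // channel closed
            }
            if d := nextInterval(n); d != 0 {
                interval = d // only a useful reply moves the poll interval
            }
        }
    }
}

// nextInterval stands in for handleMessage: it maps a reply (here, a count of
// new peers) to the next poll interval, or 0 if nothing should change.
func nextInterval(newPeers int) time.Duration {
    if newPeers > 0 {
        return 500 * time.Millisecond
    }
    return 2 * time.Second
}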
@@ -228,19 +225,20 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
 }

 // handlePexMessage handles envelopes sent from peers on the PexChannel.
+// If an update was received, a new polling interval is returned; otherwise the
+// duration is 0.
 func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope) (time.Duration, error) {
     logger := r.logger.With("peer", envelope.From)
     switch msg := envelope.Message.(type) {
     case *protop2p.PexRequest:
-        // check if the peer hasn't sent a prior request too close to this one
-        // in time
+        // Verify that this peer hasn't sent us another request too recently.
         if err := r.markPeerRequest(envelope.From); err != nil {
-            return time.Minute, err
+            return 0, err
         }
-        // request peers from the peer manager and parse the NodeAddresses into
-        // URL strings
+        // Fetch peers from the peer manager, convert NodeAddresses into URL
+        // strings, and send them back to the caller.
         nodeAddresses := r.peerManager.Advertise(envelope.From, maxAddresses)
         pexAddresses := make([]protop2p.PexAddress, len(nodeAddresses))
         for idx, addr := range nodeAddresses {
@@ -248,28 +246,24 @@ func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope)
                 URL: addr.String(),
             }
         }
-        if err := r.pexCh.Send(ctx, p2p.Envelope{
+        return 0, r.pexCh.Send(ctx, p2p.Envelope{
             To:      envelope.From,
             Message: &protop2p.PexResponse{Addresses: pexAddresses},
-        }); err != nil {
-            return 0, err
-        }
-        return time.Second, nil
+        })
     case *protop2p.PexResponse:
-        // check if the response matches a request that was made to that peer
+        // Verify that this response corresponds to one of our pending requests.
         if err := r.markPeerResponse(envelope.From); err != nil {
-            return time.Minute, err
+            return 0, err
         }
-        // check the size of the response
-        if len(msg.Addresses) > int(maxAddresses) {
-            return 10 * time.Minute, fmt.Errorf("peer sent too many addresses (max: %d, got: %d)",
-                maxAddresses,
-                len(msg.Addresses),
-            )
+        // Verify that the response does not exceed the safety limit.
+        if len(msg.Addresses) > maxAddresses {
+            return 0, fmt.Errorf("peer sent too many addresses (%d > maximum %d)",
+                len(msg.Addresses), maxAddresses)
         }
+        var numAdded int
         for _, pexAddress := range msg.Addresses {
             peerAddress, err := p2p.ParseNodeAddress(pexAddress.URL)
             if err != nil {
@@ -278,24 +272,24 @@ func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope)
             added, err := r.peerManager.Add(peerAddress)
             if err != nil {
                 logger.Error("failed to add PEX address", "address", peerAddress, "err", err)
+                continue
             }
             if added {
-                r.newPeers++
+                numAdded++
                 logger.Debug("added PEX address", "address", peerAddress)
             }
-            r.totalPeers++
         }
-        return 10 * time.Minute, nil
+        return r.calculateNextRequestTime(numAdded), nil
     default:
-        return time.Second, fmt.Errorf("received unknown message: %T", msg)
+        return 0, fmt.Errorf("received unknown message: %T", msg)
     }
 }

-// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
-// It will handle errors and any possible panics gracefully. A caller can handle
-// any error returned by sending a PeerError on the respective channel.
-func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (duration time.Duration, err error) {
+// handleMessage handles an Envelope sent from a peer on the specified Channel.
+// This method will convert a panic in message handling into an error.
+func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (_ time.Duration, err error) {
     defer func() {
         if e := recover(); e != nil {
             err = fmt.Errorf("panic in processing message: %v", e)
@@ -309,14 +303,10 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
     r.logger.Debug("received PEX message", "peer", envelope.From)
-    switch chID {
-    case p2p.ChannelID(PexChannel):
-        duration, err = r.handlePexMessage(ctx, envelope)
-    default:
-        err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
+    if chID == p2p.ChannelID(PexChannel) {
+        return r.handlePexMessage(ctx, envelope)
     }
-    return
+    return 0, fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
 }

 // processPeerUpdate processes a PeerUpdate. For added peers, PeerStatusUp, we
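The rewritten handleMessage keeps the existing recover guard but now returns the handler's result directly instead of assigning to named results in a switch. A small, stand-alone sketch of the recover-to-error idiom it relies on, with a hypothetical handler callback rather than the reactor's real signature:

package pexsketch

import "fmt"

// safeHandle converts a panic raised by the wrapped handler into an ordinary
// error on the named return value, so a malformed message cannot crash the
// calling goroutine.
func safeHandle(handle func() error) (err error) {
    defer func() {
        if e := recover(); e != nil {
            err = fmt.Errorf("panic in processing message: %v", e)
        }
    }()
    return handle()
}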
@@ -338,95 +328,87 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
     }
 }

-// sendRequestForPeers pops the first peerID off the list and sends the
-// peer a request for more peer addresses. The function then moves the
-// peer into the requestsSent bucket and calculates when the next request
-// time should be
-func (r *Reactor) sendRequestForPeers(ctx context.Context) (time.Duration, error) {
+// sendRequestForPeers chooses a peer from the set of available peers and sends
+// that peer a request for more peer addresses. The chosen peer is moved into
+// the requestsSent bucket so that we will not attempt to contact them again
+// until they've replied or updated.
+func (r *Reactor) sendRequestForPeers(ctx context.Context) error {
     r.mtx.Lock()
     defer r.mtx.Unlock()
     if len(r.availablePeers) == 0 {
         // no peers are available
-        r.logger.Debug("no available peers to send request to, waiting...")
-        return noAvailablePeersWaitPeriod, nil
+        r.logger.Debug("no available peers to send a PEX request to (retrying)")
+        return nil
     }
-    var peerID types.NodeID
-    // use range to get a random peer.
+    // Select an arbitrary peer from the available set.
+    var peerID types.NodeID
     for peerID = range r.availablePeers {
         break
     }
-    // send out the pex request
     if err := r.pexCh.Send(ctx, p2p.Envelope{
         To:      peerID,
         Message: &protop2p.PexRequest{},
     }); err != nil {
-        return 0, err
+        return err
     }
-    // remove the peer from the abvailable peers list and mark it in the requestsSent map
+    // Move the peer from available to pending.
     delete(r.availablePeers, peerID)
     r.requestsSent[peerID] = struct{}{}
-    dur := r.calculateNextRequestTime()
-    r.logger.Debug("peer request sent", "next_request_time", dur)
-    return dur, nil
+    return nil
 }

-// calculateNextRequestTime implements something of a proportional controller
-// to estimate how often the reactor should be requesting new peer addresses.
-// The dependent variable in this calculation is the ratio of new peers to
-// all peers that the reactor receives. The interval is thus calculated as the
-// inverse squared. In the beginning, all peers should be new peers.
-// We expect this ratio to be near 1 and thus the interval to be as short
-// as possible. As the node becomes more familiar with the network the ratio of
-// new nodes will plummet to a very small number, meaning the interval expands
-// to its upper bound.
+// calculateNextRequestTime selects how long we should wait before attempting
+// to send out another request for peer addresses.
+//
+// This implements a simplified proportional control mechanism to poll more
+// often when our knowledge of the network is incomplete, and less often as our
+// knowledge grows. To estimate our knowledge of the network, we use the
+// fraction of "new" peers (addresses we have not previously seen) to the total
+// so far observed. When we first join the network, this fraction will be close
+// to 1, meaning most new peers are "new" to us, and as we discover more peers,
+// the fraction will go toward zero.
 //
-// CONTRACT: The caller must hold r.mtx exclusively when calling this method.
-func (r *Reactor) calculateNextRequestTime() time.Duration {
-    // check if the peer store is full. If so then there is no need
-    // to send peer requests too often
+// The minimum interval will be minReceiveRequestInterval to ensure we will not
+// request from any peer more often than we would allow them to do from us.
+func (r *Reactor) calculateNextRequestTime(added int) time.Duration {
+    r.mtx.Lock()
+    defer r.mtx.Unlock()
+    r.totalPeers += added
+    // If the peer store is nearly full, wait the maximum interval.
     if ratio := r.peerManager.PeerRatio(); ratio >= 0.95 {
-        r.logger.Debug("peer manager near full ratio, sleeping...",
+        r.logger.Debug("Peer manager is nearly full",
             "sleep_period", fullCapacityInterval, "ratio", ratio)
         return fullCapacityInterval
     }
-    // baseTime represents the shortest interval that we can send peer requests
-    // in. For example if we have 10 peers and we can't send a message to the
-    // same peer every 500ms, then we can send a request every 50ms. In practice
-    // we use a safety margin of 2, ergo 100ms
-    peers := tmmath.MinInt(len(r.availablePeers), 50)
-    baseTime := minReceiveRequestInterval
-    if peers > 0 {
-        baseTime = minReceiveRequestInterval * 2 / time.Duration(peers)
+    // If there are no available peers to query, poll less aggressively.
+    if len(r.availablePeers) == 0 {
+        r.logger.Debug("No available peers to send a PEX request",
+            "sleep_period", noAvailablePeersWaitPeriod)
+        return noAvailablePeersWaitPeriod
     }
-    if r.totalPeers > 0 || r.discoveryRatio == 0 {
-        // find the ratio of new peers. NOTE: We add 1 to both sides to avoid
-        // divide by zero problems
-        ratio := float32(r.totalPeers+1) / float32(r.newPeers+1)
-        // square the ratio in order to get non linear time intervals
-        // NOTE: The longest possible interval for a network with 100 or more peers
-        // where a node is connected to 50 of them is 2 minutes.
-        r.discoveryRatio = ratio * ratio
-        r.newPeers = 0
-        r.totalPeers = 0
-    }
-    // NOTE: As ratio is always >= 1, discovery ratio is >= 1. Therefore we don't need to worry
-    // about the next request time being less than the minimum time
-    return baseTime * time.Duration(r.discoveryRatio)
+    // Reaching here, there are available peers to query and the peer store
+    // still has space. Estimate our knowledge of the network from the latest
+    // update and choose a new interval.
+    base := float64(minReceiveRequestInterval) / float64(len(r.availablePeers))
+    multiplier := float64(r.totalPeers+1) / float64(added+1) // +1 to avert zero division
+    return time.Duration(base*multiplier*multiplier) + minReceiveRequestInterval
 }

 func (r *Reactor) markPeerRequest(peer types.NodeID) error {
     r.mtx.Lock()
     defer r.mtx.Unlock()
     if lastRequestTime, ok := r.lastReceivedRequests[peer]; ok {
-        if time.Now().Before(lastRequestTime.Add(minReceiveRequestInterval)) {
-            return fmt.Errorf("peer sent a request too close after a prior one. Minimum interval: %v",
-                minReceiveRequestInterval)
+        if d := time.Since(lastRequestTime); d < minReceiveRequestInterval {
+            return fmt.Errorf("peer %v sent PEX request too soon (%v < minimum %v)",
+                peer, d, minReceiveRequestInterval)
         }
     }
     r.lastReceivedRequests[peer] = time.Now()
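One of the smaller changes above leans on a Go idiom worth calling out: sendRequestForPeers picks a peer with "for peerID = range r.availablePeers { break }", which works because map iteration order is unspecified (and deliberately varied), so any key may come first. A stand-alone sketch with a placeholder key type:

package pexsketch

// pickAny returns an arbitrary element of a non-empty set by taking the first
// key the range loop yields; Go does not specify map iteration order, so
// repeated calls spread the choice across the set.
func pickAny(set map[string]struct{}) (string, bool) {
    for k := range set {
        return k, true
    }
    return "", false
}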

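On the new interval calculation: calculateNextRequestTime now waits base*multiplier^2 + minReceiveRequestInterval, where base is the minimum interval divided by the number of available peers and multiplier is (totalPeers+1)/(added+1), with totalPeers the running count of unique peers (the diff adds the newest batch to it before computing). The sketch below reproduces just that arithmetic outside the reactor so the growth is easy to see; the constant matches the diff, while the example inputs further down are made up.

package pexsketch

import "time"

const minReceiveRequestInterval = 100 * time.Millisecond

// nextRequestWait reproduces the arithmetic of calculateNextRequestTime: the
// more peers we already know relative to how many were new in the latest
// response, the longer we wait before polling again. The +1 terms avoid
// division by zero, and the trailing term keeps the result at or above the
// per-peer rate limit. totalPeers is assumed to already include the latest
// additions, as in the reactor.
func nextRequestWait(availablePeers, totalPeers, added int) time.Duration {
    base := float64(minReceiveRequestInterval) / float64(availablePeers)
    multiplier := float64(totalPeers+1) / float64(added+1)
    return time.Duration(base*multiplier*multiplier) + minReceiveRequestInterval
}

For example, with 10 available peers and 50 unique peers observed so far, a response that added 4 new addresses gives roughly 1.1s until the next request, while a response that added none pushes the wait out to about 26s.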

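Finally, markPeerRequest now measures the gap with time.Since rather than composing time.Now().Before(lastRequestTime.Add(...)), which reads more directly and lets the error message report the offending interval. A generic sketch of the same per-peer rate limit, with placeholder types rather than the reactor's fields:

package pexsketch

import (
    "fmt"
    "time"
)

// requestLimiter rejects a request from a peer that arrives within minInterval
// of that peer's previous request, mirroring markPeerRequest.
type requestLimiter struct {
    minInterval time.Duration
    last        map[string]time.Time
}

func (l *requestLimiter) mark(peer string) error {
    if prev, ok := l.last[peer]; ok {
        if d := time.Since(prev); d < l.minInterval {
            return fmt.Errorf("peer %v sent request too soon (%v < minimum %v)", peer, d, l.minInterval)
        }
    }
    if l.last == nil {
        l.last = make(map[string]time.Time)
    }
    l.last[peer] = time.Now()
    return nil
}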
internal/p2p/pex/reactor_test.go (+1, -1)

@@ -96,7 +96,7 @@ func TestReactorSendsRequestsTooOften(t *testing.T) {
     peerErr := <-r.pexErrCh
     require.Error(t, peerErr.Err)
     require.Empty(t, r.pexOutCh)
-    require.Contains(t, peerErr.Err.Error(), "peer sent a request too close after a prior one")
+    require.Contains(t, peerErr.Err.Error(), "sent PEX request too soon")
     require.Equal(t, badNode, peerErr.NodeID)
 }

