
p2p: tighten up and test PeerManager (#6034)

This tightens up the `PeerManager` and related code, adds a ton of tests, and fixes a bunch of inconsistencies and bugs.
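Concretely, as the diffs below show, the reactor-facing types get simpler: `PeerUpdatesCh` becomes `PeerUpdates`, the `PeerID` field on updates and errors becomes `NodeID`, `PeerError` loses its `Severity` field, and the statuses reactors handle collapse to `PeerStatusUp`/`PeerStatusDown`. A minimal, self-contained sketch of the resulting shapes, using local stand-in types rather than the real `p2p` package:

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the updated p2p types; the field names match the diffs below.
type NodeID string

type PeerStatus string

const (
	PeerStatusUp   PeerStatus = "up"   // replaces PeerStatusNew/PeerStatusUp
	PeerStatusDown PeerStatus = "down" // replaces PeerStatusDown/Removed/Banned
)

// PeerUpdate identifies the peer by NodeID (previously PeerID).
type PeerUpdate struct {
	NodeID NodeID
	Status PeerStatus
}

// PeerError no longer carries a Severity; reporting one simply asks the
// router to evict the peer.
type PeerError struct {
	NodeID NodeID
	Err    error
}

func main() {
	updates := make(chan PeerUpdate, 1)
	errs := make(chan PeerError, 1)

	updates <- PeerUpdate{NodeID: "ab01", Status: PeerStatusUp}
	u := <-updates
	fmt.Println("peer update:", u.NodeID, u.Status)

	errs <- PeerError{NodeID: u.NodeID, Err: errors.New("failed to process message")}
	e := <-errs
	fmt.Println("peer error:", e.NodeID, e.Err)
}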
Erik Grinaker, 3 years ago (committed by GitHub)
commit 2aad26e2f1
21 changed files with 3066 additions and 1281 deletions
  1. blockchain/v0/reactor.go (+16, -20)
  2. blockchain/v0/reactor_test.go (+8, -8)
  3. evidence/reactor.go (+10, -11)
  4. evidence/reactor_test.go (+11, -11)
  5. libs/sync/waker.go (+30, -0)
  6. libs/sync/waker_test.go (+47, -0)
  7. mempool/reactor.go (+12, -13)
  8. mempool/reactor_test.go (+7, -7)
  9. p2p/address.go (+1, -2)
  10. p2p/address_test.go (+1, -1)
  11. p2p/channel.go (+15, -0)
  12. p2p/peer.go (+0, -1157)
  13. p2p/peermanager.go (+1275, -0)
  14. p2p/peermanager_test.go (+1580, -0)
  15. p2p/pex/reactor.go (+6, -7)
  16. p2p/router.go (+19, -13)
  17. p2p/router_test.go (+6, -7)
  18. p2p/shim.go (+6, -6)
  19. p2p/shim_test.go (+2, -2)
  20. statesync/reactor.go (+11, -13)
  21. statesync/reactor_test.go (+3, -3)

blockchain/v0/reactor.go (+16, -20)

@ -84,7 +84,7 @@ type Reactor struct {
fastSync bool
blockchainCh *p2p.Channel
peerUpdates *p2p.PeerUpdatesCh
peerUpdates *p2p.PeerUpdates
closeCh chan struct{}
requestsCh <-chan BlockRequest
@ -104,7 +104,7 @@ func NewReactor(
store *store.BlockStore,
consReactor consensusReactor,
blockchainCh *p2p.Channel,
peerUpdates *p2p.PeerUpdatesCh,
peerUpdates *p2p.PeerUpdates,
fastSync bool,
) (*Reactor, error) {
if state.LastBlockHeight != store.Height() {
@ -288,9 +288,8 @@ func (r *Reactor) processBlockchainCh() {
if err := r.handleMessage(r.blockchainCh.ID(), envelope); err != nil {
r.Logger.Error("failed to process message", "ch_id", r.blockchainCh.ID(), "envelope", envelope, "err", err)
r.blockchainCh.Error() <- p2p.PeerError{
PeerID: envelope.From,
Err: err,
Severity: p2p.PeerErrorSeverityLow,
NodeID: envelope.From,
Err: err,
}
}
@ -303,26 +302,26 @@ func (r *Reactor) processBlockchainCh() {
// processPeerUpdate processes a PeerUpdate.
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
r.Logger.Debug("received peer update", "peer", peerUpdate.PeerID, "status", peerUpdate.Status)
r.Logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
// XXX: Pool#RedoRequest can sometimes give us an empty peer.
if len(peerUpdate.PeerID) == 0 {
if len(peerUpdate.NodeID) == 0 {
return
}
switch peerUpdate.Status {
case p2p.PeerStatusNew, p2p.PeerStatusUp:
case p2p.PeerStatusUp:
// send a status update the newly added peer
r.blockchainCh.Out() <- p2p.Envelope{
To: peerUpdate.PeerID,
To: peerUpdate.NodeID,
Message: &bcproto.StatusResponse{
Base: r.store.Base(),
Height: r.store.Height(),
},
}
case p2p.PeerStatusDown, p2p.PeerStatusRemoved, p2p.PeerStatusBanned:
r.pool.RemovePeer(peerUpdate.PeerID)
case p2p.PeerStatusDown:
r.pool.RemovePeer(peerUpdate.NodeID)
}
}
@ -384,9 +383,8 @@ func (r *Reactor) requestRoutine() {
case pErr := <-r.errorsCh:
r.blockchainCh.Error() <- p2p.PeerError{
PeerID: pErr.peerID,
Err: pErr.err,
Severity: p2p.PeerErrorSeverityLow,
NodeID: pErr.peerID,
Err: pErr.err,
}
case <-statusUpdateTicker.C:
@ -525,17 +523,15 @@ FOR_LOOP:
// to clean up the rest.
peerID := r.pool.RedoRequest(first.Height)
r.blockchainCh.Error() <- p2p.PeerError{
PeerID: peerID,
Err: err,
Severity: p2p.PeerErrorSeverityLow,
NodeID: peerID,
Err: err,
}
peerID2 := r.pool.RedoRequest(second.Height)
if peerID2 != peerID {
r.blockchainCh.Error() <- p2p.PeerError{
PeerID: peerID2,
Err: err,
Severity: p2p.PeerErrorSeverityLow,
NodeID: peerID2,
Err: err,
}
}


blockchain/v0/reactor_test.go (+8, -8)

@ -36,7 +36,7 @@ type reactorTestSuite struct {
blockchainPeerErrCh chan p2p.PeerError
peerUpdatesCh chan p2p.PeerUpdate
peerUpdates *p2p.PeerUpdatesCh
peerUpdates *p2p.PeerUpdates
}
func setup(
@ -200,8 +200,8 @@ func simulateRouter(primary *reactorTestSuite, suites []*reactorTestSuite, dropC
primary.reactor.Logger.Debug("dropped peer error", "err", pErr.Err)
} else {
primary.peerUpdatesCh <- p2p.PeerUpdate{
PeerID: pErr.PeerID,
Status: p2p.PeerStatusRemoved,
NodeID: pErr.NodeID,
Status: p2p.PeerStatusDown,
}
}
}
@ -229,7 +229,7 @@ func TestReactor_AbruptDisconnect(t *testing.T) {
if s.peerID != ss.peerID {
s.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: ss.peerID,
NodeID: ss.peerID,
}
}
}
@ -251,7 +251,7 @@ func TestReactor_AbruptDisconnect(t *testing.T) {
// deadlocks or race conditions within the context of poolRoutine.
testSuites[1].peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusDown,
PeerID: testSuites[0].peerID,
NodeID: testSuites[0].peerID,
}
}
@ -276,7 +276,7 @@ func TestReactor_NoBlockResponse(t *testing.T) {
if s.peerID != ss.peerID {
s.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: ss.peerID,
NodeID: ss.peerID,
}
}
}
@ -341,7 +341,7 @@ func TestReactor_BadBlockStopsPeer(t *testing.T) {
if s.peerID != ss.peerID {
s.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: ss.peerID,
NodeID: ss.peerID,
}
}
}
@ -388,7 +388,7 @@ func TestReactor_BadBlockStopsPeer(t *testing.T) {
for _, s := range testSuites[:len(testSuites)-1] {
newSuite.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: s.peerID,
NodeID: s.peerID,
}
}


evidence/reactor.go (+10, -11)

@ -55,7 +55,7 @@ type Reactor struct {
evpool *Pool
eventBus *types.EventBus
evidenceCh *p2p.Channel
peerUpdates *p2p.PeerUpdatesCh
peerUpdates *p2p.PeerUpdates
closeCh chan struct{}
peerWG sync.WaitGroup
@ -70,7 +70,7 @@ type Reactor struct {
func NewReactor(
logger log.Logger,
evidenceCh *p2p.Channel,
peerUpdates *p2p.PeerUpdatesCh,
peerUpdates *p2p.PeerUpdates,
evpool *Pool,
) *Reactor {
r := &Reactor{
@ -196,9 +196,8 @@ func (r *Reactor) processEvidenceCh() {
if err := r.handleMessage(r.evidenceCh.ID(), envelope); err != nil {
r.Logger.Error("failed to process message", "ch_id", r.evidenceCh.ID(), "envelope", envelope, "err", err)
r.evidenceCh.Error() <- p2p.PeerError{
PeerID: envelope.From,
Err: err,
Severity: p2p.PeerErrorSeverityLow,
NodeID: envelope.From,
Err: err,
}
}
@ -221,7 +220,7 @@ func (r *Reactor) processEvidenceCh() {
//
// REF: https://github.com/tendermint/tendermint/issues/4727
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
r.Logger.Debug("received peer update", "peer", peerUpdate.PeerID, "status", peerUpdate.Status)
r.Logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
r.mtx.Lock()
defer r.mtx.Unlock()
@ -240,21 +239,21 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
// a new done channel so we can explicitly close the goroutine if the peer
// is later removed, we increment the waitgroup so the reactor can stop
// safely, and finally start the goroutine to broadcast evidence to that peer.
_, ok := r.peerRoutines[peerUpdate.PeerID]
_, ok := r.peerRoutines[peerUpdate.NodeID]
if !ok {
closer := tmsync.NewCloser()
r.peerRoutines[peerUpdate.PeerID] = closer
r.peerRoutines[peerUpdate.NodeID] = closer
r.peerWG.Add(1)
go r.broadcastEvidenceLoop(peerUpdate.PeerID, closer)
go r.broadcastEvidenceLoop(peerUpdate.NodeID, closer)
}
case p2p.PeerStatusDown, p2p.PeerStatusRemoved, p2p.PeerStatusBanned:
case p2p.PeerStatusDown:
// Check if we've started an evidence broadcasting goroutine for this peer.
// If we have, we signal to terminate the goroutine via the channel's closure.
// This will internally decrement the peer waitgroup and remove the peer
// from the map of peer evidence broadcasting goroutines.
closer, ok := r.peerRoutines[peerUpdate.PeerID]
closer, ok := r.peerRoutines[peerUpdate.NodeID]
if ok {
closer.Close()
}


evidence/reactor_test.go (+11, -11)

@ -42,7 +42,7 @@ type reactorTestSuite struct {
evidencePeerErrCh chan p2p.PeerError
peerUpdatesCh chan p2p.PeerUpdate
peerUpdates *p2p.PeerUpdatesCh
peerUpdates *p2p.PeerUpdates
}
func setup(t *testing.T, logger log.Logger, pool *evidence.Pool, chBuf uint) *reactorTestSuite {
@ -224,18 +224,18 @@ func TestReactorMultiDisconnect(t *testing.T) {
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: secondary.peerID,
NodeID: secondary.peerID,
}
// Ensure "disconnecting" the secondary peer from the primary more than once
// is handled gracefully.
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusDown,
PeerID: secondary.peerID,
NodeID: secondary.peerID,
}
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusDown,
PeerID: secondary.peerID,
NodeID: secondary.peerID,
}
}
@ -276,7 +276,7 @@ func TestReactorBroadcastEvidence(t *testing.T) {
for _, suite := range secondaries {
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: suite.peerID,
NodeID: suite.peerID,
}
}
@ -327,7 +327,7 @@ func TestReactorBroadcastEvidence_Lagging(t *testing.T) {
for _, suite := range secondaries {
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: suite.peerID,
NodeID: suite.peerID,
}
}
@ -378,7 +378,7 @@ func TestReactorBroadcastEvidence_Pending(t *testing.T) {
// add the secondary reactor as a peer to the primary reactor
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: secondary.peerID,
NodeID: secondary.peerID,
}
// The secondary reactor should have received all the evidence ignoring the
@ -438,7 +438,7 @@ func TestReactorBroadcastEvidence_Committed(t *testing.T) {
// add the secondary reactor as a peer to the primary reactor
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: secondary.peerID,
NodeID: secondary.peerID,
}
// The secondary reactor should have received all the evidence ignoring the
@ -487,7 +487,7 @@ func TestReactorBroadcastEvidence_FullyConnected(t *testing.T) {
if suiteI.peerID != suiteJ.peerID {
suiteI.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: suiteJ.peerID,
NodeID: suiteJ.peerID,
}
}
}
@ -530,7 +530,7 @@ func TestReactorBroadcastEvidence_RemovePeer(t *testing.T) {
// add the secondary reactor as a peer to the primary reactor
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: secondary.peerID,
NodeID: secondary.peerID,
}
// have the secondary reactor receive only half the evidence
@ -539,7 +539,7 @@ func TestReactorBroadcastEvidence_RemovePeer(t *testing.T) {
// disconnect the peer
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusDown,
PeerID: secondary.peerID,
NodeID: secondary.peerID,
}
// Ensure the secondary only received half of the evidence before being


libs/sync/waker.go (+30, -0)

@ -0,0 +1,30 @@
package sync
// Waker is used to wake up a sleeper when some event occurs. It debounces
// multiple wakeup calls occurring between each sleep, and wakeups are
// non-blocking to avoid having to coordinate goroutines.
type Waker struct {
wakeCh chan struct{}
}
// NewWaker creates a new Waker.
func NewWaker() *Waker {
return &Waker{
wakeCh: make(chan struct{}, 1), // buffer used for debouncing
}
}
// Sleep returns a channel that blocks until Wake() is called.
func (w *Waker) Sleep() <-chan struct{} {
return w.wakeCh
}
// Wake wakes up the sleeper.
func (w *Waker) Wake() {
// A non-blocking send with a size 1 buffer ensures that we never block, and
// that we queue up at most a single wakeup call between each Sleep().
select {
case w.wakeCh <- struct{}{}:
default:
}
}
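
A rough usage sketch follows (the import path is the package added above; the goroutine and timeout scaffolding are illustrative). A consumer blocks on Sleep() until some producer has called Wake(), and a burst of Wake() calls between sleeps produces a single wakeup:

package main

import (
	"fmt"
	"time"

	tmsync "github.com/tendermint/tendermint/libs/sync"
)

func main() {
	waker := tmsync.NewWaker()
	done := make(chan struct{})

	go func() {
		defer close(done)
		<-waker.Sleep() // blocks until at least one Wake() has happened
		fmt.Println("woke up")
	}()

	// Several wakeups in a row are debounced into a single wakeup.
	waker.Wake()
	waker.Wake()
	waker.Wake()

	select {
	case <-done:
	case <-time.After(time.Second):
		fmt.Println("timed out waiting for wakeup")
	}
}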

libs/sync/waker_test.go (+47, -0)

@ -0,0 +1,47 @@
package sync_test
import (
"testing"
"github.com/stretchr/testify/require"
tmsync "github.com/tendermint/tendermint/libs/sync"
)
func TestWaker(t *testing.T) {
// A new waker should block when sleeping.
waker := tmsync.NewWaker()
select {
case <-waker.Sleep():
require.Fail(t, "unexpected wakeup")
default:
}
// Wakeups should not block, and should cause the next sleeper to awaken.
waker.Wake()
select {
case <-waker.Sleep():
default:
require.Fail(t, "expected wakeup, but sleeping instead")
}
// Multiple wakeups should only wake a single sleeper.
waker.Wake()
waker.Wake()
waker.Wake()
select {
case <-waker.Sleep():
default:
require.Fail(t, "expected wakeup, but sleeping instead")
}
select {
case <-waker.Sleep():
require.Fail(t, "unexpected wakeup")
default:
}
}

mempool/reactor.go (+12, -13)

@ -58,7 +58,7 @@ type Reactor struct {
peerMgr PeerManager
mempoolCh *p2p.Channel
peerUpdates *p2p.PeerUpdatesCh
peerUpdates *p2p.PeerUpdates
closeCh chan struct{}
// peerWG is used to coordinate graceful termination of all peer broadcasting
@ -76,7 +76,7 @@ func NewReactor(
peerMgr PeerManager,
mempool *CListMempool,
mempoolCh *p2p.Channel,
peerUpdates *p2p.PeerUpdatesCh,
peerUpdates *p2p.PeerUpdates,
) *Reactor {
r := &Reactor{
@ -225,9 +225,8 @@ func (r *Reactor) processMempoolCh() {
if err := r.handleMessage(r.mempoolCh.ID(), envelope); err != nil {
r.Logger.Error("failed to process message", "ch_id", r.mempoolCh.ID(), "envelope", envelope, "err", err)
r.mempoolCh.Error() <- p2p.PeerError{
PeerID: envelope.From,
Err: err,
Severity: p2p.PeerErrorSeverityLow,
NodeID: envelope.From,
Err: err,
}
}
@ -244,7 +243,7 @@ func (r *Reactor) processMempoolCh() {
// removed peers, we remove the peer from the mempool peer ID set and signal to
// stop the tx broadcasting goroutine.
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
r.Logger.Debug("received peer update", "peer", peerUpdate.PeerID, "status", peerUpdate.Status)
r.Logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
r.mtx.Lock()
defer r.mtx.Unlock()
@ -264,28 +263,28 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
// a new done channel so we can explicitly close the goroutine if the peer
// is later removed, we increment the waitgroup so the reactor can stop
// safely, and finally start the goroutine to broadcast txs to that peer.
_, ok := r.peerRoutines[peerUpdate.PeerID]
_, ok := r.peerRoutines[peerUpdate.NodeID]
if !ok {
closer := tmsync.NewCloser()
r.peerRoutines[peerUpdate.PeerID] = closer
r.peerRoutines[peerUpdate.NodeID] = closer
r.peerWG.Add(1)
r.ids.ReserveForPeer(peerUpdate.PeerID)
r.ids.ReserveForPeer(peerUpdate.NodeID)
// start a broadcast routine ensuring all txs are forwarded to the peer
go r.broadcastTxRoutine(peerUpdate.PeerID, closer)
go r.broadcastTxRoutine(peerUpdate.NodeID, closer)
}
}
case p2p.PeerStatusDown, p2p.PeerStatusRemoved, p2p.PeerStatusBanned:
r.ids.Reclaim(peerUpdate.PeerID)
case p2p.PeerStatusDown:
r.ids.Reclaim(peerUpdate.NodeID)
// Check if we've started a tx broadcasting goroutine for this peer.
// If we have, we signal to terminate the goroutine via the channel's closure.
// This will internally decrement the peer waitgroup and remove the peer
// from the map of peer tx broadcasting goroutines.
closer, ok := r.peerRoutines[peerUpdate.PeerID]
closer, ok := r.peerRoutines[peerUpdate.NodeID]
if ok {
closer.Close()
}


mempool/reactor_test.go (+7, -7)

@ -33,7 +33,7 @@ type reactorTestSuite struct {
mempoolPeerErrCh chan p2p.PeerError
peerUpdatesCh chan p2p.PeerUpdate
peerUpdates *p2p.PeerUpdatesCh
peerUpdates *p2p.PeerUpdates
}
func setup(t *testing.T, cfg *cfg.MempoolConfig, logger log.Logger, chBuf uint) *reactorTestSuite {
@ -189,7 +189,7 @@ func TestReactorBroadcastTxs(t *testing.T) {
for _, suite := range secondaries {
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: suite.peerID,
NodeID: suite.peerID,
}
}
@ -295,7 +295,7 @@ func TestReactorNoBroadcastToSender(t *testing.T) {
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: secondary.peerID,
NodeID: secondary.peerID,
}
time.Sleep(100 * time.Millisecond)
@ -360,7 +360,7 @@ func TestReactor_MaxTxBytes(t *testing.T) {
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: secondary.peerID,
NodeID: secondary.peerID,
}
// Wait till all secondary suites (reactor) received all mempool txs from the
@ -406,7 +406,7 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) {
for i := 0; i < maxActiveIDs+1; i++ {
reactor.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: peerID,
NodeID: peerID,
}
reactor.mempoolOutCh <- p2p.Envelope{
To: peerID,
@ -466,12 +466,12 @@ func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) {
// connect peer
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: secondary.peerID,
NodeID: secondary.peerID,
}
// disconnect peer
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusDown,
PeerID: secondary.peerID,
NodeID: secondary.peerID,
}
}

p2p/address.go (+1, -2)

@ -106,10 +106,9 @@ func ParseNodeAddress(urlString string) (NodeAddress, error) {
Protocol: Protocol(strings.ToLower(url.Scheme)),
}
// Opaque URLs are expected to contain only a node ID, also used as path.
// Opaque URLs are expected to contain only a node ID.
if url.Opaque != "" {
address.NodeID = NodeID(url.Opaque)
address.Path = url.Opaque
return address, address.Validate()
}


p2p/address_test.go (+1, -1)

@ -158,7 +158,7 @@ func TestParseNodeAddress(t *testing.T) {
},
{
"memory:" + user,
p2p.NodeAddress{Protocol: "memory", NodeID: id, Path: user},
p2p.NodeAddress{Protocol: "memory", NodeID: id},
true,
},


p2p/channel.go (+15, -0)

@ -27,6 +27,21 @@ func (e Envelope) Strip() Envelope {
return e
}
// PeerError is a peer error reported via the Error channel.
//
// FIXME: This currently just disconnects the peer, which is too simplistic.
// For example, some errors should be logged, some should cause disconnects,
// and some should ban the peer.
//
// FIXME: This should probably be replaced by a more general PeerBehavior
// concept that can mark good and bad behavior and contributes to peer scoring.
// It should possibly also allow reactors to request explicit actions, e.g.
// disconnection or banning, in addition to doing this based on aggregates.
type PeerError struct {
NodeID NodeID
Err error
}
// Channel is a bidirectional channel for Protobuf message exchange with peers.
// A Channel is safe for concurrent use by multiple goroutines.
type Channel struct {

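The fragment below condenses the reporting pattern that the reactor diffs above and below all follow; the reactor receiver, channel name, and handleMessage belong to whichever reactor is reporting, and only the PeerError fields and the router-side handling come from this change.

// Reactor side: a message that fails to process is reported as a PeerError.
if err := r.handleMessage(r.someCh.ID(), envelope); err != nil {
	r.someCh.Error() <- p2p.PeerError{
		NodeID: envelope.From,
		Err:    err,
	}
}

// Router side (see p2p/router.go below): the error is handed to the peer
// manager, which evicts the peer; subscribers then see a PeerStatusDown update:
//
//	r.peerManager.Errored(peerError.NodeID, peerError.Err)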

p2p/peer.go (+0, -1157): file diff suppressed because it is too large


p2p/peermanager.go (+1275, -0): file diff suppressed because it is too large


p2p/peermanager_test.go (+1580, -0): file diff suppressed because it is too large


p2p/pex/reactor.go (+6, -7)

@ -30,7 +30,7 @@ type ReactorV2 struct {
peerManager *p2p.PeerManager
pexCh *p2p.Channel
peerUpdates *p2p.PeerUpdatesCh
peerUpdates *p2p.PeerUpdates
closeCh chan struct{}
}
@ -39,7 +39,7 @@ func NewReactorV2(
logger log.Logger,
peerManager *p2p.PeerManager,
pexCh *p2p.Channel,
peerUpdates *p2p.PeerUpdatesCh,
peerUpdates *p2p.PeerUpdates,
) *ReactorV2 {
r := &ReactorV2{
peerManager: peerManager,
@ -181,9 +181,8 @@ func (r *ReactorV2) processPexCh() {
if err := r.handleMessage(r.pexCh.ID(), envelope); err != nil {
r.Logger.Error("failed to process message", "ch_id", r.pexCh.ID(), "envelope", envelope, "err", err)
r.pexCh.Error() <- p2p.PeerError{
PeerID: envelope.From,
Err: err,
Severity: p2p.PeerErrorSeverityLow,
NodeID: envelope.From,
Err: err,
}
}
@ -197,11 +196,11 @@ func (r *ReactorV2) processPexCh() {
// processPeerUpdate processes a PeerUpdate. For added peers, PeerStatusUp, we
// send a request for addresses.
func (r *ReactorV2) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
r.Logger.Debug("received peer update", "peer", peerUpdate.PeerID, "status", peerUpdate.Status)
r.Logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
if peerUpdate.Status == p2p.PeerStatusUp {
r.pexCh.Out() <- p2p.Envelope{
To: peerUpdate.PeerID,
To: peerUpdate.NodeID,
Message: &protop2p.PexRequest{},
}
}


p2p/router.go (+19, -13)

@ -255,13 +255,10 @@ func (r *Router) routeChannel(channel *Channel) {
if !ok {
return
}
// FIXME: We just disconnect the peer for now
r.logger.Error("peer error, disconnecting", "peer", peerError.PeerID, "err", peerError.Err)
r.peerMtx.RLock()
peerQueue, ok := r.peerQueues[peerError.PeerID]
r.peerMtx.RUnlock()
if ok {
peerQueue.close()
// FIXME: We just evict the peer for now.
r.logger.Error("peer error, evicting", "peer", peerError.NodeID, "err", peerError.Err)
if err := r.peerManager.Errored(peerError.NodeID, peerError.Err); err != nil {
r.logger.Error("failed to report peer error", "peer", peerError.NodeID, "err", err)
}
case <-channel.Done():
@ -338,7 +335,6 @@ func (r *Router) acceptPeers(transport Transport) {
r.peerMtx.Lock()
r.peerQueues[peerInfo.NodeID] = queue
r.peerMtx.Unlock()
r.peerManager.Ready(peerInfo.NodeID)
defer func() {
r.peerMtx.Lock()
@ -350,6 +346,11 @@ func (r *Router) acceptPeers(transport Transport) {
}
}()
if err := r.peerManager.Ready(peerInfo.NodeID); err != nil {
r.logger.Error("failed to mark peer as ready", "peer", peerInfo.NodeID, "err", err)
return
}
r.routePeer(peerInfo.NodeID, conn, queue)
}()
}
@ -359,7 +360,7 @@ func (r *Router) acceptPeers(transport Transport) {
func (r *Router) dialPeers() {
ctx := r.stopCtx()
for {
peerID, address, err := r.peerManager.DialNext(ctx)
address, err := r.peerManager.DialNext(ctx)
switch err {
case nil:
case context.Canceled:
@ -371,12 +372,13 @@ func (r *Router) dialPeers() {
}
go func() {
peerID := address.NodeID
conn, err := r.dialPeer(ctx, address)
if errors.Is(err, context.Canceled) {
return
} else if err != nil {
r.logger.Error("failed to dial peer", "peer", peerID, "err", err)
if err = r.peerManager.DialFailed(peerID, address); err != nil {
if err = r.peerManager.DialFailed(address); err != nil {
r.logger.Error("failed to report dial failure", "peer", peerID, "err", err)
}
return
@ -388,13 +390,13 @@ func (r *Router) dialPeers() {
return
} else if err != nil {
r.logger.Error("failed to handshake with peer", "peer", peerID, "err", err)
if err = r.peerManager.DialFailed(peerID, address); err != nil {
if err = r.peerManager.DialFailed(address); err != nil {
r.logger.Error("failed to report dial failure", "peer", peerID, "err", err)
}
return
}
if err = r.peerManager.Dialed(peerID, address); err != nil {
if err = r.peerManager.Dialed(address); err != nil {
r.logger.Error("failed to dial peer", "peer", peerID, "err", err)
return
}
@ -403,7 +405,6 @@ func (r *Router) dialPeers() {
r.peerMtx.Lock()
r.peerQueues[peerID] = queue
r.peerMtx.Unlock()
r.peerManager.Ready(peerID)
defer func() {
r.peerMtx.Lock()
@ -415,6 +416,11 @@ func (r *Router) dialPeers() {
}
}()
if err := r.peerManager.Ready(peerID); err != nil {
r.logger.Error("failed to mark peer as ready", "peer", peerID, "err", err)
return
}
r.routePeer(peerID, conn, queue)
}()
}
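
Taken together, the dialing path now drives a single PeerManager lifecycle. A condensed sketch of the dialPeers flow wired up above, with logging, the handshake, and the peer-queue plumbing elided; only the DialNext, DialFailed, Dialed, and Ready calls are taken from the diff:

for {
	// Blocks until the peer manager wants a peer dialed, or ctx is done.
	address, err := r.peerManager.DialNext(ctx)
	if err != nil {
		return
	}
	go func() {
		peerID := address.NodeID
		conn, err := r.dialPeer(ctx, address)
		if err != nil {
			_ = r.peerManager.DialFailed(address) // report the failure to the manager
			return
		}
		// ...handshake and peer-queue registration elided (queue comes from here)...
		if err := r.peerManager.Dialed(address); err != nil {
			return
		}
		if err := r.peerManager.Ready(peerID); err != nil {
			return
		}
		// Ready() marks the peer as connected; subscribers then see PeerStatusUp.
		r.routePeer(peerID, conn, queue)
	}()
}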


p2p/router_test.go (+6, -7)

@ -115,9 +115,9 @@ func TestRouter(t *testing.T) {
// Wait for peers to come online, and ping them as they do.
for i := 0; i < len(peers); i++ {
peerUpdate := <-peerUpdates.Updates()
peerID := peerUpdate.PeerID
peerID := peerUpdate.NodeID
require.Equal(t, p2p.PeerUpdate{
PeerID: peerID,
NodeID: peerID,
Status: p2p.PeerStatusUp,
}, peerUpdate)
@ -140,13 +140,12 @@ func TestRouter(t *testing.T) {
// We then submit an error for a peer, and watch it get disconnected.
channel.Error() <- p2p.PeerError{
PeerID: peers[0].NodeID,
Err: errors.New("test error"),
Severity: p2p.PeerErrorSeverityCritical,
NodeID: peers[0].NodeID,
Err: errors.New("test error"),
}
peerUpdate := <-peerUpdates.Updates()
require.Equal(t, p2p.PeerUpdate{
PeerID: peers[0].NodeID,
NodeID: peers[0].NodeID,
Status: p2p.PeerStatusDown,
}, peerUpdate)
@ -154,7 +153,7 @@ func TestRouter(t *testing.T) {
// for that to happen.
peerUpdate = <-peerUpdates.Updates()
require.Equal(t, p2p.PeerUpdate{
PeerID: peers[0].NodeID,
NodeID: peers[0].NodeID,
Status: p2p.PeerStatusUp,
}, peerUpdate)
}

p2p/shim.go (+6, -6)

@ -29,7 +29,7 @@ type (
BaseReactor
Name string
PeerUpdates *PeerUpdatesCh
PeerUpdates *PeerUpdates
Channels map[ChannelID]*ChannelShim
}
@ -162,10 +162,10 @@ func (rs *ReactorShim) handlePeerErrors() {
for _, cs := range rs.Channels {
go func(cs *ChannelShim) {
for pErr := range cs.Channel.errCh {
if pErr.PeerID != "" {
peer := rs.Switch.peers.Get(pErr.PeerID)
if pErr.NodeID != "" {
peer := rs.Switch.peers.Get(pErr.NodeID)
if peer == nil {
rs.Logger.Error("failed to handle peer error; failed to find peer", "peer", pErr.PeerID)
rs.Logger.Error("failed to handle peer error; failed to find peer", "peer", pErr.NodeID)
continue
}
@ -225,7 +225,7 @@ func (rs *ReactorShim) GetChannels() []*ChannelDescriptor {
// handle adding a peer.
func (rs *ReactorShim) AddPeer(peer Peer) {
select {
case rs.PeerUpdates.updatesCh <- PeerUpdate{PeerID: peer.ID(), Status: PeerStatusUp}:
case rs.PeerUpdates.updatesCh <- PeerUpdate{NodeID: peer.ID(), Status: PeerStatusUp}:
rs.Logger.Debug("sent peer update", "reactor", rs.Name, "peer", peer.ID(), "status", PeerStatusUp)
case <-rs.PeerUpdates.Done():
@ -244,7 +244,7 @@ func (rs *ReactorShim) AddPeer(peer Peer) {
// handle removing a peer.
func (rs *ReactorShim) RemovePeer(peer Peer, reason interface{}) {
select {
case rs.PeerUpdates.updatesCh <- PeerUpdate{PeerID: peer.ID(), Status: PeerStatusDown}:
case rs.PeerUpdates.updatesCh <- PeerUpdate{NodeID: peer.ID(), Status: PeerStatusDown}:
rs.Logger.Debug(
"sent peer update",
"reactor", rs.Name,


p2p/shim_test.go (+2, -2)

@ -123,7 +123,7 @@ func TestReactorShim_AddPeer(t *testing.T) {
rts.shim.AddPeer(peerA)
wg.Wait()
require.Equal(t, peerIDA, peerUpdate.PeerID)
require.Equal(t, peerIDA, peerUpdate.NodeID)
require.Equal(t, p2p.PeerStatusUp, peerUpdate.Status)
}
@ -143,7 +143,7 @@ func TestReactorShim_RemovePeer(t *testing.T) {
rts.shim.RemovePeer(peerA, "test reason")
wg.Wait()
require.Equal(t, peerIDA, peerUpdate.PeerID)
require.Equal(t, peerIDA, peerUpdate.NodeID)
require.Equal(t, p2p.PeerStatusDown, peerUpdate.Status)
}


statesync/reactor.go (+11, -13)

@ -78,7 +78,7 @@ type Reactor struct {
tempDir string
snapshotCh *p2p.Channel
chunkCh *p2p.Channel
peerUpdates *p2p.PeerUpdatesCh
peerUpdates *p2p.PeerUpdates
closeCh chan struct{}
// This will only be set when a state sync is in progress. It is used to feed
@ -96,7 +96,7 @@ func NewReactor(
conn proxy.AppConnSnapshot,
connQuery proxy.AppConnQuery,
snapshotCh, chunkCh *p2p.Channel,
peerUpdates *p2p.PeerUpdatesCh,
peerUpdates *p2p.PeerUpdates,
tempDir string,
) *Reactor {
r := &Reactor{
@ -347,9 +347,8 @@ func (r *Reactor) processSnapshotCh() {
if err := r.handleMessage(r.snapshotCh.ID(), envelope); err != nil {
r.Logger.Error("failed to process message", "ch_id", r.snapshotCh.ID(), "envelope", envelope, "err", err)
r.snapshotCh.Error() <- p2p.PeerError{
PeerID: envelope.From,
Err: err,
Severity: p2p.PeerErrorSeverityLow,
NodeID: envelope.From,
Err: err,
}
}
@ -374,9 +373,8 @@ func (r *Reactor) processChunkCh() {
if err := r.handleMessage(r.chunkCh.ID(), envelope); err != nil {
r.Logger.Error("failed to process message", "ch_id", r.chunkCh.ID(), "envelope", envelope, "err", err)
r.chunkCh.Error() <- p2p.PeerError{
PeerID: envelope.From,
Err: err,
Severity: p2p.PeerErrorSeverityLow,
NodeID: envelope.From,
Err: err,
}
}
@ -390,18 +388,18 @@ func (r *Reactor) processChunkCh() {
// processPeerUpdate processes a PeerUpdate, returning an error upon failing to
// handle the PeerUpdate or if a panic is recovered.
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
r.Logger.Debug("received peer update", "peer", peerUpdate.PeerID, "status", peerUpdate.Status)
r.Logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
r.mtx.RLock()
defer r.mtx.RUnlock()
if r.syncer != nil {
switch peerUpdate.Status {
case p2p.PeerStatusNew, p2p.PeerStatusUp:
r.syncer.AddPeer(peerUpdate.PeerID)
case p2p.PeerStatusUp:
r.syncer.AddPeer(peerUpdate.NodeID)
case p2p.PeerStatusDown, p2p.PeerStatusRemoved, p2p.PeerStatusBanned:
r.syncer.RemovePeer(peerUpdate.PeerID)
case p2p.PeerStatusDown:
r.syncer.RemovePeer(peerUpdate.NodeID)
}
}
}


statesync/reactor_test.go (+3, -3)

@ -33,7 +33,7 @@ type reactorTestSuite struct {
chunkOutCh chan p2p.Envelope
chunkPeerErrCh chan p2p.PeerError
peerUpdates *p2p.PeerUpdatesCh
peerUpdates *p2p.PeerUpdates
}
func setup(
@ -127,7 +127,7 @@ func TestReactor_ChunkRequest_InvalidRequest(t *testing.T) {
require.Error(t, response.Err)
require.Empty(t, rts.chunkOutCh)
require.Contains(t, response.Err.Error(), "received unknown message")
require.Equal(t, p2p.NodeID("aa"), response.PeerID)
require.Equal(t, p2p.NodeID("aa"), response.NodeID)
}
func TestReactor_ChunkRequest(t *testing.T) {
@ -198,7 +198,7 @@ func TestReactor_SnapshotsRequest_InvalidRequest(t *testing.T) {
require.Error(t, response.Err)
require.Empty(t, rts.snapshotOutCh)
require.Contains(t, response.Err.Error(), "received unknown message")
require.Equal(t, p2p.NodeID("aa"), response.PeerID)
require.Equal(t, p2p.NodeID("aa"), response.NodeID)
}
func TestReactor_SnapshotsRequest(t *testing.T) {

