From cb88bd39414fe37add21e8d9019b85ebb7cbd40f Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Wed, 8 Dec 2021 09:05:01 -0500 Subject: [PATCH] p2p: migrate to use new interface for channel errors (#7403) * p2p: migrate to use new interface for channel errors * Update internal/p2p/p2ptest/require.go Co-authored-by: M. J. Fromberger * rename * feedback Co-authored-by: M. J. Fromberger --- internal/blocksync/reactor.go | 16 ++++++++++++---- internal/consensus/reactor.go | 16 ++++++++++++---- internal/evidence/reactor.go | 4 +++- internal/mempool/reactor.go | 4 +++- internal/p2p/channel.go | 6 +++--- internal/p2p/channel_test.go | 2 +- internal/p2p/p2ptest/require.go | 16 ++++++++++------ internal/p2p/pex/reactor.go | 4 +++- internal/p2p/router_test.go | 6 +++--- internal/statesync/reactor.go | 18 +++++++++++------- 10 files changed, 61 insertions(+), 31 deletions(-) diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index 6a5620f4e..2f93a3cf3 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -292,9 +292,11 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context) { case envelope := <-r.blockSyncCh.In: if err := r.handleMessage(r.blockSyncCh.ID, envelope); err != nil { r.logger.Error("failed to process message", "ch_id", r.blockSyncCh.ID, "envelope", envelope, "err", err) - r.blockSyncCh.Error <- p2p.PeerError{ + if serr := r.blockSyncCh.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, + }); serr != nil { + return } } case envelope := <-r.blockSyncOutBridgeCh: @@ -381,9 +383,11 @@ func (r *Reactor) requestRoutine(ctx context.Context) { Message: &bcproto.BlockRequest{Height: request.Height}, } case pErr := <-r.errorsCh: - r.blockSyncCh.Error <- p2p.PeerError{ + if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{ NodeID: pErr.peerID, Err: pErr.err, + }); err != nil { + return } case <-statusUpdateTicker.C: r.poolWG.Add(1) @@ -523,16 +527,20 @@ FOR_LOOP: // NOTE: We've already removed the peer's request, but we still need // to clean up the rest. peerID := r.pool.RedoRequest(first.Height) - r.blockSyncCh.Error <- p2p.PeerError{ + if serr := r.blockSyncCh.SendError(ctx, p2p.PeerError{ NodeID: peerID, Err: err, + }); serr != nil { + break FOR_LOOP } peerID2 := r.pool.RedoRequest(second.Height) if peerID2 != peerID { - r.blockSyncCh.Error <- p2p.PeerError{ + if serr := r.blockSyncCh.SendError(ctx, p2p.PeerError{ NodeID: peerID2, Err: err, + }); serr != nil { + break FOR_LOOP } } diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 8f5cdd0b1..5e2a6b535 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -1322,9 +1322,11 @@ func (r *Reactor) processStateCh(ctx context.Context) { case envelope := <-r.stateCh.In: if err := r.handleMessage(r.stateCh.ID, envelope); err != nil { r.logger.Error("failed to process message", "ch_id", r.stateCh.ID, "envelope", envelope, "err", err) - r.stateCh.Error <- p2p.PeerError{ + if serr := r.stateCh.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, + }); serr != nil { + return } } } @@ -1345,9 +1347,11 @@ func (r *Reactor) processDataCh(ctx context.Context) { case envelope := <-r.dataCh.In: if err := r.handleMessage(r.dataCh.ID, envelope); err != nil { r.logger.Error("failed to process message", "ch_id", r.dataCh.ID, "envelope", envelope, "err", err) - r.dataCh.Error <- p2p.PeerError{ + if serr := r.dataCh.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, + }); serr != nil { + return } } } @@ -1368,9 +1372,11 @@ func (r *Reactor) processVoteCh(ctx context.Context) { case envelope := <-r.voteCh.In: if err := r.handleMessage(r.voteCh.ID, envelope); err != nil { r.logger.Error("failed to process message", "ch_id", r.voteCh.ID, "envelope", envelope, "err", err) - r.voteCh.Error <- p2p.PeerError{ + if serr := r.voteCh.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, + }); serr != nil { + return } } } @@ -1391,9 +1397,11 @@ func (r *Reactor) processVoteSetBitsCh(ctx context.Context) { case envelope := <-r.voteSetBitsCh.In: if err := r.handleMessage(r.voteSetBitsCh.ID, envelope); err != nil { r.logger.Error("failed to process message", "ch_id", r.voteSetBitsCh.ID, "envelope", envelope, "err", err) - r.voteSetBitsCh.Error <- p2p.PeerError{ + if serr := r.voteSetBitsCh.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, + }); serr != nil { + return } } } diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go index 31e927ba7..29712581c 100644 --- a/internal/evidence/reactor.go +++ b/internal/evidence/reactor.go @@ -182,9 +182,11 @@ func (r *Reactor) processEvidenceCh(ctx context.Context) { case envelope := <-r.evidenceCh.In: if err := r.handleMessage(r.evidenceCh.ID, envelope); err != nil { r.logger.Error("failed to process message", "ch_id", r.evidenceCh.ID, "envelope", envelope, "err", err) - r.evidenceCh.Error <- p2p.PeerError{ + if serr := r.evidenceCh.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, + }); serr != nil { + return } } } diff --git a/internal/mempool/reactor.go b/internal/mempool/reactor.go index 43215f5f8..19d857614 100644 --- a/internal/mempool/reactor.go +++ b/internal/mempool/reactor.go @@ -205,9 +205,11 @@ func (r *Reactor) processMempoolCh(ctx context.Context) { case envelope := <-r.mempoolCh.In: if err := r.handleMessage(ctx, r.mempoolCh.ID, envelope); err != nil { r.logger.Error("failed to process message", "ch_id", r.mempoolCh.ID, "envelope", envelope, "err", err) - r.mempoolCh.Error <- p2p.PeerError{ + if serr := r.mempoolCh.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, + }); serr != nil { + return } } case <-ctx.Done(): diff --git a/internal/p2p/channel.go b/internal/p2p/channel.go index 2143589fd..9296ca15e 100644 --- a/internal/p2p/channel.go +++ b/internal/p2p/channel.go @@ -64,7 +64,7 @@ type Channel struct { ID ChannelID In <-chan Envelope // inbound messages (peers to reactors) Out chan<- Envelope // outbound messages (reactors to peers) - Error chan<- PeerError // peer error reporting + errCh chan<- PeerError // peer error reporting messageType proto.Message // the channel's message type, used for unmarshaling } @@ -83,7 +83,7 @@ func NewChannel( messageType: messageType, In: inCh, Out: outCh, - Error: errCh, + errCh: errCh, } } @@ -104,7 +104,7 @@ func (ch *Channel) SendError(ctx context.Context, pe PeerError) error { select { case <-ctx.Done(): return ctx.Err() - case ch.Error <- pe: + case ch.errCh <- pe: return nil } } diff --git a/internal/p2p/channel_test.go b/internal/p2p/channel_test.go index 0e2d7ea7c..4b2ce5937 100644 --- a/internal/p2p/channel_test.go +++ b/internal/p2p/channel_test.go @@ -25,7 +25,7 @@ func testChannel(size int) (*channelInternal, *Channel) { ch := &Channel{ In: in.In, Out: in.Out, - Error: in.Error, + errCh: in.Error, } return in, ch } diff --git a/internal/p2p/p2ptest/require.go b/internal/p2p/p2ptest/require.go index ce44a724c..b55d6a51f 100644 --- a/internal/p2p/p2ptest/require.go +++ b/internal/p2p/p2ptest/require.go @@ -2,6 +2,7 @@ package p2ptest import ( "context" + "errors" "testing" "time" @@ -100,13 +101,16 @@ func RequireNoUpdates(ctx context.Context, t *testing.T, peerUpdates *p2p.PeerUp } // RequireError requires that the given peer error is submitted for a peer. -func RequireError(t *testing.T, channel *p2p.Channel, peerError p2p.PeerError) { - timer := time.NewTimer(time.Second) // not time.After due to goroutine leaks - defer timer.Stop() - select { - case channel.Error <- peerError: - case <-timer.C: +func RequireError(ctx context.Context, t *testing.T, channel *p2p.Channel, peerError p2p.PeerError) { + tctx, tcancel := context.WithTimeout(ctx, time.Second) + defer tcancel() + + err := channel.SendError(tctx, peerError) + switch { + case errors.Is(err, context.DeadlineExceeded): require.Fail(t, "timed out reporting error", "%v on %v", peerError, channel.ID) + default: + require.NoError(t, err, "unexpected error") } } diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index 6970c6fef..24aeec05f 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -172,9 +172,11 @@ func (r *Reactor) processPexCh(ctx context.Context) { case envelope := <-r.pexCh.In: if err := r.handleMessage(r.pexCh.ID, envelope); err != nil { r.logger.Error("failed to process message", "ch_id", r.pexCh.ID, "envelope", envelope, "err", err) - r.pexCh.Error <- p2p.PeerError{ + if serr := r.pexCh.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, + }); serr != nil { + return } } } diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go index 1a59a0239..2974c1e88 100644 --- a/internal/p2p/router_test.go +++ b/internal/p2p/router_test.go @@ -87,10 +87,10 @@ func TestRouter_Network(t *testing.T) { // We then submit an error for a peer, and watch it get disconnected and // then reconnected as the router retries it. peerUpdates := local.MakePeerUpdatesNoRequireEmpty(ctx, t) - channel.Error <- p2p.PeerError{ + require.NoError(t, channel.SendError(ctx, p2p.PeerError{ NodeID: peers[0].NodeID, Err: errors.New("boom"), - } + })) p2ptest.RequireUpdates(t, peerUpdates, []p2p.PeerUpdate{ {NodeID: peers[0].NodeID, Status: p2p.PeerStatusDown}, {NodeID: peers[0].NodeID, Status: p2p.PeerStatusUp}, @@ -345,7 +345,7 @@ func TestRouter_Channel_Error(t *testing.T) { // Erroring b should cause it to be disconnected. It will reconnect shortly after. sub := network.Nodes[aID].MakePeerUpdates(ctx, t) - p2ptest.RequireError(t, a, p2p.PeerError{NodeID: bID, Err: errors.New("boom")}) + p2ptest.RequireError(ctx, t, a, p2p.PeerError{NodeID: bID, Err: errors.New("boom")}) p2ptest.RequireUpdates(t, sub, []p2p.PeerUpdate{ {NodeID: bID, Status: p2p.PeerStatusDown}, {NodeID: bID, Status: p2p.PeerStatusUp}, diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 8cac68891..61e3dec08 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -410,9 +410,11 @@ func (r *Reactor) backfill( r.logger.Info("backfill: fetched light block failed validate basic, removing peer...", "err", err, "height", height) queue.retry(height) - r.blockCh.Error <- p2p.PeerError{ + if serr := r.blockCh.SendError(ctx, p2p.PeerError{ NodeID: peer, Err: fmt.Errorf("received invalid light block: %w", err), + }); serr != nil { + return } continue } @@ -445,25 +447,25 @@ func (r *Reactor) backfill( if w, g := trustedBlockID.Hash, resp.block.Hash(); !bytes.Equal(w, g) { r.logger.Info("received invalid light block. header hash doesn't match trusted LastBlockID", "trustedHash", w, "receivedHash", g, "height", resp.block.Height) - r.blockCh.Error <- p2p.PeerError{ + if err := r.blockCh.SendError(ctx, p2p.PeerError{ NodeID: resp.peer, Err: fmt.Errorf("received invalid light block. Expected hash %v, got: %v", w, g), + }); err != nil { + return nil } queue.retry(resp.block.Height) continue } // save the signed headers - err := r.blockStore.SaveSignedHeader(resp.block.SignedHeader, trustedBlockID) - if err != nil { + if err := r.blockStore.SaveSignedHeader(resp.block.SignedHeader, trustedBlockID); err != nil { return err } // check if there has been a change in the validator set if lastValidatorSet != nil && !bytes.Equal(resp.block.Header.ValidatorsHash, resp.block.Header.NextValidatorsHash) { // save all the heights that the last validator set was the same - err = r.stateStore.SaveValidatorSets(resp.block.Height+1, lastChangeHeight, lastValidatorSet) - if err != nil { + if err := r.stateStore.SaveValidatorSets(resp.block.Height+1, lastChangeHeight, lastValidatorSet); err != nil { return err } @@ -810,9 +812,11 @@ func (r *Reactor) processCh(ctx context.Context, ch *p2p.Channel, chName string) "channel", chName, "ch_id", ch.ID, "envelope", envelope) - ch.Error <- p2p.PeerError{ + if serr := ch.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, + }); serr != nil { + return } } }