Browse Source

p2p: migrate to use new interface for channel errors (#7403)

* p2p: migrate to use new interface for channel errors

* Update internal/p2p/p2ptest/require.go

Co-authored-by: M. J. Fromberger <michael.j.fromberger@gmail.com>

* rename

* feedback

Co-authored-by: M. J. Fromberger <michael.j.fromberger@gmail.com>
pull/7410/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
cb88bd3941
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 61 additions and 31 deletions
  1. +12
    -4
      internal/blocksync/reactor.go
  2. +12
    -4
      internal/consensus/reactor.go
  3. +3
    -1
      internal/evidence/reactor.go
  4. +3
    -1
      internal/mempool/reactor.go
  5. +3
    -3
      internal/p2p/channel.go
  6. +1
    -1
      internal/p2p/channel_test.go
  7. +10
    -6
      internal/p2p/p2ptest/require.go
  8. +3
    -1
      internal/p2p/pex/reactor.go
  9. +3
    -3
      internal/p2p/router_test.go
  10. +11
    -7
      internal/statesync/reactor.go

+ 12
- 4
internal/blocksync/reactor.go View File

@ -292,9 +292,11 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context) {
case envelope := <-r.blockSyncCh.In:
if err := r.handleMessage(r.blockSyncCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.blockSyncCh.ID, "envelope", envelope, "err", err)
r.blockSyncCh.Error <- p2p.PeerError{
if serr := r.blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
}
}
case envelope := <-r.blockSyncOutBridgeCh:
@ -381,9 +383,11 @@ func (r *Reactor) requestRoutine(ctx context.Context) {
Message: &bcproto.BlockRequest{Height: request.Height},
}
case pErr := <-r.errorsCh:
r.blockSyncCh.Error <- p2p.PeerError{
if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: pErr.peerID,
Err: pErr.err,
}); err != nil {
return
}
case <-statusUpdateTicker.C:
r.poolWG.Add(1)
@ -523,16 +527,20 @@ FOR_LOOP:
// NOTE: We've already removed the peer's request, but we still need
// to clean up the rest.
peerID := r.pool.RedoRequest(first.Height)
r.blockSyncCh.Error <- p2p.PeerError{
if serr := r.blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: peerID,
Err: err,
}); serr != nil {
break FOR_LOOP
}
peerID2 := r.pool.RedoRequest(second.Height)
if peerID2 != peerID {
r.blockSyncCh.Error <- p2p.PeerError{
if serr := r.blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: peerID2,
Err: err,
}); serr != nil {
break FOR_LOOP
}
}


+ 12
- 4
internal/consensus/reactor.go View File

@ -1322,9 +1322,11 @@ func (r *Reactor) processStateCh(ctx context.Context) {
case envelope := <-r.stateCh.In:
if err := r.handleMessage(r.stateCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.stateCh.ID, "envelope", envelope, "err", err)
r.stateCh.Error <- p2p.PeerError{
if serr := r.stateCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
}
}
}
@ -1345,9 +1347,11 @@ func (r *Reactor) processDataCh(ctx context.Context) {
case envelope := <-r.dataCh.In:
if err := r.handleMessage(r.dataCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.dataCh.ID, "envelope", envelope, "err", err)
r.dataCh.Error <- p2p.PeerError{
if serr := r.dataCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
}
}
}
@ -1368,9 +1372,11 @@ func (r *Reactor) processVoteCh(ctx context.Context) {
case envelope := <-r.voteCh.In:
if err := r.handleMessage(r.voteCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.voteCh.ID, "envelope", envelope, "err", err)
r.voteCh.Error <- p2p.PeerError{
if serr := r.voteCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
}
}
}
@ -1391,9 +1397,11 @@ func (r *Reactor) processVoteSetBitsCh(ctx context.Context) {
case envelope := <-r.voteSetBitsCh.In:
if err := r.handleMessage(r.voteSetBitsCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.voteSetBitsCh.ID, "envelope", envelope, "err", err)
r.voteSetBitsCh.Error <- p2p.PeerError{
if serr := r.voteSetBitsCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
}
}
}


+ 3
- 1
internal/evidence/reactor.go View File

@ -182,9 +182,11 @@ func (r *Reactor) processEvidenceCh(ctx context.Context) {
case envelope := <-r.evidenceCh.In:
if err := r.handleMessage(r.evidenceCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.evidenceCh.ID, "envelope", envelope, "err", err)
r.evidenceCh.Error <- p2p.PeerError{
if serr := r.evidenceCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
}
}
}


+ 3
- 1
internal/mempool/reactor.go View File

@ -205,9 +205,11 @@ func (r *Reactor) processMempoolCh(ctx context.Context) {
case envelope := <-r.mempoolCh.In:
if err := r.handleMessage(ctx, r.mempoolCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.mempoolCh.ID, "envelope", envelope, "err", err)
r.mempoolCh.Error <- p2p.PeerError{
if serr := r.mempoolCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
}
}
case <-ctx.Done():


+ 3
- 3
internal/p2p/channel.go View File

@ -64,7 +64,7 @@ type Channel struct {
ID ChannelID
In <-chan Envelope // inbound messages (peers to reactors)
Out chan<- Envelope // outbound messages (reactors to peers)
Error chan<- PeerError // peer error reporting
errCh chan<- PeerError // peer error reporting
messageType proto.Message // the channel's message type, used for unmarshaling
}
@ -83,7 +83,7 @@ func NewChannel(
messageType: messageType,
In: inCh,
Out: outCh,
Error: errCh,
errCh: errCh,
}
}
@ -104,7 +104,7 @@ func (ch *Channel) SendError(ctx context.Context, pe PeerError) error {
select {
case <-ctx.Done():
return ctx.Err()
case ch.Error <- pe:
case ch.errCh <- pe:
return nil
}
}


+ 1
- 1
internal/p2p/channel_test.go View File

@ -25,7 +25,7 @@ func testChannel(size int) (*channelInternal, *Channel) {
ch := &Channel{
In: in.In,
Out: in.Out,
Error: in.Error,
errCh: in.Error,
}
return in, ch
}


+ 10
- 6
internal/p2p/p2ptest/require.go View File

@ -2,6 +2,7 @@ package p2ptest
import (
"context"
"errors"
"testing"
"time"
@ -100,13 +101,16 @@ func RequireNoUpdates(ctx context.Context, t *testing.T, peerUpdates *p2p.PeerUp
}
// RequireError requires that the given peer error is submitted for a peer.
func RequireError(t *testing.T, channel *p2p.Channel, peerError p2p.PeerError) {
timer := time.NewTimer(time.Second) // not time.After due to goroutine leaks
defer timer.Stop()
select {
case channel.Error <- peerError:
case <-timer.C:
func RequireError(ctx context.Context, t *testing.T, channel *p2p.Channel, peerError p2p.PeerError) {
tctx, tcancel := context.WithTimeout(ctx, time.Second)
defer tcancel()
err := channel.SendError(tctx, peerError)
switch {
case errors.Is(err, context.DeadlineExceeded):
require.Fail(t, "timed out reporting error", "%v on %v", peerError, channel.ID)
default:
require.NoError(t, err, "unexpected error")
}
}


+ 3
- 1
internal/p2p/pex/reactor.go View File

@ -172,9 +172,11 @@ func (r *Reactor) processPexCh(ctx context.Context) {
case envelope := <-r.pexCh.In:
if err := r.handleMessage(r.pexCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.pexCh.ID, "envelope", envelope, "err", err)
r.pexCh.Error <- p2p.PeerError{
if serr := r.pexCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
}
}
}


+ 3
- 3
internal/p2p/router_test.go View File

@ -87,10 +87,10 @@ func TestRouter_Network(t *testing.T) {
// We then submit an error for a peer, and watch it get disconnected and
// then reconnected as the router retries it.
peerUpdates := local.MakePeerUpdatesNoRequireEmpty(ctx, t)
channel.Error <- p2p.PeerError{
require.NoError(t, channel.SendError(ctx, p2p.PeerError{
NodeID: peers[0].NodeID,
Err: errors.New("boom"),
}
}))
p2ptest.RequireUpdates(t, peerUpdates, []p2p.PeerUpdate{
{NodeID: peers[0].NodeID, Status: p2p.PeerStatusDown},
{NodeID: peers[0].NodeID, Status: p2p.PeerStatusUp},
@ -345,7 +345,7 @@ func TestRouter_Channel_Error(t *testing.T) {
// Erroring b should cause it to be disconnected. It will reconnect shortly after.
sub := network.Nodes[aID].MakePeerUpdates(ctx, t)
p2ptest.RequireError(t, a, p2p.PeerError{NodeID: bID, Err: errors.New("boom")})
p2ptest.RequireError(ctx, t, a, p2p.PeerError{NodeID: bID, Err: errors.New("boom")})
p2ptest.RequireUpdates(t, sub, []p2p.PeerUpdate{
{NodeID: bID, Status: p2p.PeerStatusDown},
{NodeID: bID, Status: p2p.PeerStatusUp},


+ 11
- 7
internal/statesync/reactor.go View File

@ -410,9 +410,11 @@ func (r *Reactor) backfill(
r.logger.Info("backfill: fetched light block failed validate basic, removing peer...",
"err", err, "height", height)
queue.retry(height)
r.blockCh.Error <- p2p.PeerError{
if serr := r.blockCh.SendError(ctx, p2p.PeerError{
NodeID: peer,
Err: fmt.Errorf("received invalid light block: %w", err),
}); serr != nil {
return
}
continue
}
@ -445,25 +447,25 @@ func (r *Reactor) backfill(
if w, g := trustedBlockID.Hash, resp.block.Hash(); !bytes.Equal(w, g) {
r.logger.Info("received invalid light block. header hash doesn't match trusted LastBlockID",
"trustedHash", w, "receivedHash", g, "height", resp.block.Height)
r.blockCh.Error <- p2p.PeerError{
if err := r.blockCh.SendError(ctx, p2p.PeerError{
NodeID: resp.peer,
Err: fmt.Errorf("received invalid light block. Expected hash %v, got: %v", w, g),
}); err != nil {
return nil
}
queue.retry(resp.block.Height)
continue
}
// save the signed headers
err := r.blockStore.SaveSignedHeader(resp.block.SignedHeader, trustedBlockID)
if err != nil {
if err := r.blockStore.SaveSignedHeader(resp.block.SignedHeader, trustedBlockID); err != nil {
return err
}
// check if there has been a change in the validator set
if lastValidatorSet != nil && !bytes.Equal(resp.block.Header.ValidatorsHash, resp.block.Header.NextValidatorsHash) {
// save all the heights that the last validator set was the same
err = r.stateStore.SaveValidatorSets(resp.block.Height+1, lastChangeHeight, lastValidatorSet)
if err != nil {
if err := r.stateStore.SaveValidatorSets(resp.block.Height+1, lastChangeHeight, lastValidatorSet); err != nil {
return err
}
@ -810,9 +812,11 @@ func (r *Reactor) processCh(ctx context.Context, ch *p2p.Channel, chName string)
"channel", chName,
"ch_id", ch.ID,
"envelope", envelope)
ch.Error <- p2p.PeerError{
if serr := ch.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
}
}
}


Loading…
Cancel
Save