|
@ -2,7 +2,6 @@ package consensus |
|
|
|
|
|
|
|
|
import ( |
|
|
import ( |
|
|
"context" |
|
|
"context" |
|
|
"errors" |
|
|
|
|
|
"fmt" |
|
|
"fmt" |
|
|
"runtime/debug" |
|
|
"runtime/debug" |
|
|
"sync" |
|
|
"sync" |
|
@ -1332,10 +1331,6 @@ func (r *Reactor) processStateCh(ctx context.Context) { |
|
|
for iter.Next(ctx) { |
|
|
for iter.Next(ctx) { |
|
|
envelope := iter.Envelope() |
|
|
envelope := iter.Envelope() |
|
|
if err := r.handleMessage(ctx, r.stateCh.ID, envelope); err != nil { |
|
|
if err := r.handleMessage(ctx, r.stateCh.ID, envelope); err != nil { |
|
|
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { |
|
|
|
|
|
return |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
r.logger.Error("failed to process message", "ch_id", r.stateCh.ID, "envelope", envelope, "err", err) |
|
|
r.logger.Error("failed to process message", "ch_id", r.stateCh.ID, "envelope", envelope, "err", err) |
|
|
if serr := r.stateCh.SendError(ctx, p2p.PeerError{ |
|
|
if serr := r.stateCh.SendError(ctx, p2p.PeerError{ |
|
|
NodeID: envelope.From, |
|
|
NodeID: envelope.From, |
|
@ -1357,10 +1352,6 @@ func (r *Reactor) processDataCh(ctx context.Context) { |
|
|
for iter.Next(ctx) { |
|
|
for iter.Next(ctx) { |
|
|
envelope := iter.Envelope() |
|
|
envelope := iter.Envelope() |
|
|
if err := r.handleMessage(ctx, r.dataCh.ID, envelope); err != nil { |
|
|
if err := r.handleMessage(ctx, r.dataCh.ID, envelope); err != nil { |
|
|
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { |
|
|
|
|
|
return |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
r.logger.Error("failed to process message", "ch_id", r.dataCh.ID, "envelope", envelope, "err", err) |
|
|
r.logger.Error("failed to process message", "ch_id", r.dataCh.ID, "envelope", envelope, "err", err) |
|
|
if serr := r.dataCh.SendError(ctx, p2p.PeerError{ |
|
|
if serr := r.dataCh.SendError(ctx, p2p.PeerError{ |
|
|
NodeID: envelope.From, |
|
|
NodeID: envelope.From, |
|
@ -1382,10 +1373,6 @@ func (r *Reactor) processVoteCh(ctx context.Context) { |
|
|
for iter.Next(ctx) { |
|
|
for iter.Next(ctx) { |
|
|
envelope := iter.Envelope() |
|
|
envelope := iter.Envelope() |
|
|
if err := r.handleMessage(ctx, r.voteCh.ID, envelope); err != nil { |
|
|
if err := r.handleMessage(ctx, r.voteCh.ID, envelope); err != nil { |
|
|
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { |
|
|
|
|
|
return |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
r.logger.Error("failed to process message", "ch_id", r.voteCh.ID, "envelope", envelope, "err", err) |
|
|
r.logger.Error("failed to process message", "ch_id", r.voteCh.ID, "envelope", envelope, "err", err) |
|
|
if serr := r.voteCh.SendError(ctx, p2p.PeerError{ |
|
|
if serr := r.voteCh.SendError(ctx, p2p.PeerError{ |
|
|
NodeID: envelope.From, |
|
|
NodeID: envelope.From, |
|
|