diff --git a/consensus/reactor.go b/consensus/reactor.go index 8318f2bb0..b57d85573 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -64,6 +64,11 @@ func (conR *ConsensusReactor) OnStart() error { return err } + err = conR.startPeerErrorsRoutine() + if err != nil { + return err + } + if !conR.FastSync() { err := conR.conS.Start() if err != nil { @@ -334,6 +339,39 @@ func (conR *ConsensusReactor) FastSync() bool { //-------------------------------------- +// startPeerErrorsRoutine spawns a new gororoutine listening for errors from +// consensus/state or other consensus modules. +func (conR *ConsensusReactor) startPeerErrorsRoutine() error { + const subscriber = "consensus-reactor" + ctx := context.Background() + + errorsCh := make(chan interface{}) + err := conR.eventBus.Subscribe(ctx, subscriber, types.QueryForEvent(peerErrorEvent), errorsCh) + if err != nil { + return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, peerErrorEvent) + } + + go func() { + for { + select { + case data, ok := <-errorsCh: + if ok { + pErr := data.(types.TMEventData).Unwrap().(peerError) + peer := conR.Switch.Peers().Get(pErr.peerID) + if peer != nil { + conR.Switch.StopPeerForError(peer, pErr.err) + } + } + case <-conR.Quit(): + conR.eventBus.UnsubscribeAll(ctx, subscriber) + return + } + } + }() + + return nil +} + // startBroadcastRoutine subscribes for new round steps, votes and proposal // heartbeats using the event bus and starts a go routine to broadcasts events // to peers upon receiving them. diff --git a/consensus/state.go b/consensus/state.go index 30bd56f10..958574dd0 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -63,6 +63,19 @@ func (ti *timeoutInfo) String() string { return fmt.Sprintf("%v ; %d/%d %v", ti.Duration, ti.Height, ti.Round, ti.Step) } +type peerError struct { + err error + peerID p2p.ID +} + +func (e peerError) Error() string { + return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error()) +} + +const ( + peerErrorEvent = "cs.PeerError" +) + // ConsensusState handles execution of the consensus algorithm. // It processes votes and proposals, and upon reaching agreement, // commits blocks to the chain and executes them against the application. @@ -582,7 +595,8 @@ func (cs *ConsensusState) handleMsg(mi msgInfo) { // if the vote gives us a 2/3-any or 2/3-one, we transition err := cs.tryAddVote(msg.Vote, peerID) if err == ErrAddingVote { - // TODO: punish peer + // punish peer + cs.eventBus.Publish(peerErrorEvent, types.TMEventData{peerError{err, peerID}}) } // NOTE: the vote is broadcast to peers by the reactor listening