|
|
@ -64,6 +64,11 @@ func (conR *ConsensusReactor) OnStart() error { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
err = conR.startPeerErrorsRoutine() |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
if !conR.FastSync() { |
|
|
|
err := conR.conS.Start() |
|
|
|
if err != nil { |
|
|
@ -334,6 +339,39 @@ func (conR *ConsensusReactor) FastSync() bool { |
|
|
|
|
|
|
|
//--------------------------------------
|
|
|
|
|
|
|
|
// startPeerErrorsRoutine spawns a new gororoutine listening for errors from
|
|
|
|
// consensus/state or other consensus modules.
|
|
|
|
func (conR *ConsensusReactor) startPeerErrorsRoutine() error { |
|
|
|
const subscriber = "consensus-reactor" |
|
|
|
ctx := context.Background() |
|
|
|
|
|
|
|
errorsCh := make(chan interface{}) |
|
|
|
err := conR.eventBus.Subscribe(ctx, subscriber, types.QueryForEvent(peerErrorEvent), errorsCh) |
|
|
|
if err != nil { |
|
|
|
return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, peerErrorEvent) |
|
|
|
} |
|
|
|
|
|
|
|
go func() { |
|
|
|
for { |
|
|
|
select { |
|
|
|
case data, ok := <-errorsCh: |
|
|
|
if ok { |
|
|
|
pErr := data.(types.TMEventData).Unwrap().(peerError) |
|
|
|
peer := conR.Switch.Peers().Get(pErr.peerID) |
|
|
|
if peer != nil { |
|
|
|
conR.Switch.StopPeerForError(peer, pErr.err) |
|
|
|
} |
|
|
|
} |
|
|
|
case <-conR.Quit(): |
|
|
|
conR.eventBus.UnsubscribeAll(ctx, subscriber) |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
}() |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// startBroadcastRoutine subscribes for new round steps, votes and proposal
|
|
|
|
// heartbeats using the event bus and starts a go routine to broadcasts events
|
|
|
|
// to peers upon receiving them.
|
|
|
|