diff --git a/consensus/reactor.go b/consensus/reactor.go index 19b0c0fe2..2034ad344 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -43,7 +43,8 @@ type ConsensusReactor struct { eventBus *types.EventBus } -// NewConsensusReactor returns a new ConsensusReactor with the given consensusState. +// NewConsensusReactor returns a new ConsensusReactor with the given +// consensusState. func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *ConsensusReactor { conR := &ConsensusReactor{ conS: consensusState, @@ -53,27 +54,31 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *Consens return conR } -// OnStart implements BaseService. +// OnStart implements BaseService by subscribing to events, which later will be +// broadcasted to other peers and starting state if we're not in fast sync. func (conR *ConsensusReactor) OnStart() error { conR.Logger.Info("ConsensusReactor ", "fastSync", conR.FastSync()) if err := conR.BaseReactor.OnStart(); err != nil { return err } + conR.subscribeToBroadcastEvents() + if !conR.FastSync() { err := conR.conS.Start() if err != nil { return err } - conR.subscribeToBroadcastEvents() } return nil } -// OnStop implements BaseService +// OnStop implements BaseService by unsubscribing from events and stopping +// state. func (conR *ConsensusReactor) OnStop() { conR.BaseReactor.OnStop() + conR.unsubscribeFromBroadcastEvents() conR.conS.Stop() } @@ -99,7 +104,6 @@ func (conR *ConsensusReactor) SwitchToConsensus(state sm.State, blocksSynced int conR.Logger.Error("Error starting conS", "err", err) return } - conR.subscribeToBroadcastEvents() } // GetChannels implements Reactor @@ -347,11 +351,6 @@ func (conR *ConsensusReactor) FastSync() bool { // proposal heartbeats using internal pubsub defined on state to broadcast // them to peers upon receiving. func (conR *ConsensusReactor) subscribeToBroadcastEvents() { - // assert consensus state is running - if !conR.conS.IsRunning() { - panic("consensus state must be running at this point") - } - const subscriber = "consensus-reactor" conR.conS.evsw.AddListenerForEvent(subscriber, types.EventNewRoundStep, func(data tmevents.EventData) { @@ -369,6 +368,11 @@ func (conR *ConsensusReactor) subscribeToBroadcastEvents() { }) } +func (conR *ConsensusReactor) unsubscribeFromBroadcastEvents() { + const subscriber = "consensus-reactor" + conR.conS.evsw.RemoveListener(subscriber) +} + func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(hb *types.Heartbeat) { conR.Logger.Debug("Broadcasting proposal heartbeat message", "height", hb.Height, "round", hb.Round, "sequence", hb.Sequence) diff --git a/libs/events/events.go b/libs/events/events.go index f1b2a754e..075f9b42b 100644 --- a/libs/events/events.go +++ b/libs/events/events.go @@ -44,25 +44,19 @@ type eventSwitch struct { } func NewEventSwitch() EventSwitch { - evsw := &eventSwitch{} + evsw := &eventSwitch{ + eventCells: make(map[string]*eventCell), + listeners: make(map[string]*eventListener), + } evsw.BaseService = *cmn.NewBaseService(nil, "EventSwitch", evsw) return evsw } func (evsw *eventSwitch) OnStart() error { - evsw.BaseService.OnStart() - evsw.eventCells = make(map[string]*eventCell) - evsw.listeners = make(map[string]*eventListener) return nil } -func (evsw *eventSwitch) OnStop() { - evsw.mtx.Lock() - defer evsw.mtx.Unlock() - evsw.BaseService.OnStop() - evsw.eventCells = nil - evsw.listeners = nil -} +func (evsw *eventSwitch) OnStop() {} func (evsw *eventSwitch) AddListenerForEvent(listenerID, event string, cb EventCallback) { // Get/Create eventCell and listener