Browse Source

subscribe before state emits NewRoundStep

I had to alter events package for that. Hope that's fine.
Refs #847
pull/1574/head
Anton Kaliaev 7 years ago
parent
commit
c6f612bfc3
No known key found for this signature in database GPG Key ID: 7B6881D965918214
2 changed files with 19 additions and 21 deletions
  1. +14
    -10
      consensus/reactor.go
  2. +5
    -11
      libs/events/events.go

+ 14
- 10
consensus/reactor.go View File

@ -43,7 +43,8 @@ type ConsensusReactor struct {
eventBus *types.EventBus
}
// NewConsensusReactor returns a new ConsensusReactor with the given consensusState.
// NewConsensusReactor returns a new ConsensusReactor with the given
// consensusState.
func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *ConsensusReactor {
conR := &ConsensusReactor{
conS: consensusState,
@ -53,27 +54,31 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *Consens
return conR
}
// OnStart implements BaseService.
// OnStart implements BaseService by subscribing to events, which later will be
// broadcasted to other peers and starting state if we're not in fast sync.
func (conR *ConsensusReactor) OnStart() error {
conR.Logger.Info("ConsensusReactor ", "fastSync", conR.FastSync())
if err := conR.BaseReactor.OnStart(); err != nil {
return err
}
conR.subscribeToBroadcastEvents()
if !conR.FastSync() {
err := conR.conS.Start()
if err != nil {
return err
}
conR.subscribeToBroadcastEvents()
}
return nil
}
// OnStop implements BaseService
// OnStop implements BaseService by unsubscribing from events and stopping
// state.
func (conR *ConsensusReactor) OnStop() {
conR.BaseReactor.OnStop()
conR.unsubscribeFromBroadcastEvents()
conR.conS.Stop()
}
@ -99,7 +104,6 @@ func (conR *ConsensusReactor) SwitchToConsensus(state sm.State, blocksSynced int
conR.Logger.Error("Error starting conS", "err", err)
return
}
conR.subscribeToBroadcastEvents()
}
// GetChannels implements Reactor
@ -347,11 +351,6 @@ func (conR *ConsensusReactor) FastSync() bool {
// proposal heartbeats using internal pubsub defined on state to broadcast
// them to peers upon receiving.
func (conR *ConsensusReactor) subscribeToBroadcastEvents() {
// assert consensus state is running
if !conR.conS.IsRunning() {
panic("consensus state must be running at this point")
}
const subscriber = "consensus-reactor"
conR.conS.evsw.AddListenerForEvent(subscriber, types.EventNewRoundStep,
func(data tmevents.EventData) {
@ -369,6 +368,11 @@ func (conR *ConsensusReactor) subscribeToBroadcastEvents() {
})
}
func (conR *ConsensusReactor) unsubscribeFromBroadcastEvents() {
const subscriber = "consensus-reactor"
conR.conS.evsw.RemoveListener(subscriber)
}
func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(hb *types.Heartbeat) {
conR.Logger.Debug("Broadcasting proposal heartbeat message",
"height", hb.Height, "round", hb.Round, "sequence", hb.Sequence)


+ 5
- 11
libs/events/events.go View File

@ -44,25 +44,19 @@ type eventSwitch struct {
}
func NewEventSwitch() EventSwitch {
evsw := &eventSwitch{}
evsw := &eventSwitch{
eventCells: make(map[string]*eventCell),
listeners: make(map[string]*eventListener),
}
evsw.BaseService = *cmn.NewBaseService(nil, "EventSwitch", evsw)
return evsw
}
func (evsw *eventSwitch) OnStart() error {
evsw.BaseService.OnStart()
evsw.eventCells = make(map[string]*eventCell)
evsw.listeners = make(map[string]*eventListener)
return nil
}
func (evsw *eventSwitch) OnStop() {
evsw.mtx.Lock()
defer evsw.mtx.Unlock()
evsw.BaseService.OnStop()
evsw.eventCells = nil
evsw.listeners = nil
}
func (evsw *eventSwitch) OnStop() {}
func (evsw *eventSwitch) AddListenerForEvent(listenerID, event string, cb EventCallback) {
// Get/Create eventCell and listener


Loading…
Cancel
Save