diff --git a/Gopkg.lock b/Gopkg.lock index 8280148c9..df971a47b 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -278,6 +278,7 @@ "clist", "common", "db", + "events", "flowrate", "log", "merkle", @@ -384,6 +385,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "52a0dcbebdf8714612444914cfce59a3af8c47c4453a2d43c4ccc5ff1a91d8ea" + inputs-digest = "a88c20b6e36b3529d6fdcffc3603d9eb193fc3809de8afbba07bad990539b256" solver-name = "gps-cdcl" solver-version = 1 diff --git a/consensus/reactor.go b/consensus/reactor.go index 5e7295b55..f171aa8d9 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -10,6 +10,7 @@ import ( amino "github.com/tendermint/go-amino" cmn "github.com/tendermint/tmlibs/common" + tmevents "github.com/tendermint/tmlibs/events" "github.com/tendermint/tmlibs/log" cstypes "github.com/tendermint/tendermint/consensus/types" @@ -48,12 +49,6 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *Consens conS: consensusState, fastSync: fastSync, } - // XXX: modifing state to send us new round steps, votes and proposal heartbeats - consensusState.reactorChs = &reactorChs{ - newRoundSteps: make(chan *cstypes.RoundState), - votes: make(chan *types.Vote), - proposalHeartbeats: make(chan *types.Heartbeat), - } conR.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", conR) return conR } @@ -65,16 +60,12 @@ func (conR *ConsensusReactor) OnStart() error { return err } - err := conR.startBroadcastRoutine() - if err != nil { - return err - } - if !conR.FastSync() { err := conR.conS.Start() if err != nil { return err } + conR.subscribeToBroadcastEvents() } return nil @@ -106,7 +97,9 @@ func (conR *ConsensusReactor) SwitchToConsensus(state sm.State, blocksSynced int err := conR.conS.Start() if err != nil { conR.Logger.Error("Error starting conS", "err", err) + return } + conR.subscribeToBroadcastEvents() } // GetChannels implements Reactor @@ -350,28 +343,30 @@ func (conR *ConsensusReactor) FastSync() bool { //-------------------------------------- -// startBroadcastRoutine subscribes for new round steps, votes and -// proposal heartbeats using the channels created for precisely this -// purpose in consensus state and starts a goroutine to broadcasts -// events to peers upon receiving them. -func (conR *ConsensusReactor) startBroadcastRoutine() error { - go func() { - rchs := conR.conS.reactorChs.(*reactorChs) - for { - select { - case rs := <-rchs.newRoundSteps: - conR.broadcastNewRoundStepMessages(rs) - case vote := <-rchs.votes: - conR.broadcastHasVoteMessage(vote) - case heartbeat := <-rchs.proposalHeartbeats: - conR.broadcastProposalHeartbeatMessage(heartbeat) - case <-conR.Quit(): - return - } - } - }() +// subscribeToBroadcastEvents subscribes for new round steps, votes and +// proposal heartbeats using internal pubsub defined on state to broadcast +// them to peers upon receiving. +func (conR *ConsensusReactor) subscribeToBroadcastEvents() { + // assert consensus state is running + if !conR.conS.IsRunning() { + panic("consensus state must be running at this point") + } - return nil + const subscriber = "consensus-reactor" + conR.conS.evsw.AddListenerForEvent(subscriber, types.EventNewRoundStep, + func(data tmevents.EventData) { + conR.broadcastNewRoundStepMessages(data.(*cstypes.RoundState)) + }) + + conR.conS.evsw.AddListenerForEvent(subscriber, types.EventVote, + func(data tmevents.EventData) { + conR.broadcastHasVoteMessage(data.(*types.Vote)) + }) + + conR.conS.evsw.AddListenerForEvent(subscriber, types.EventProposalHeartbeat, + func(data tmevents.EventData) { + conR.broadcastProposalHeartbeatMessage(data.(*types.Heartbeat)) + }) } func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(hb *types.Heartbeat) { diff --git a/consensus/state.go b/consensus/state.go index 5668ea82d..0c6f7b487 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -11,6 +11,7 @@ import ( fail "github.com/ebuchman/fail-test" cmn "github.com/tendermint/tmlibs/common" + tmevents "github.com/tendermint/tmlibs/events" "github.com/tendermint/tmlibs/log" cfg "github.com/tendermint/tendermint/config" @@ -111,39 +112,11 @@ type ConsensusState struct { // closed when we finish shutting down done chan struct{} - // synchronous pubsub between consensus state and reactor - // only set when there is a reactor - reactorChs reactorChsI + // synchronous pubsub between consensus state and reactor. + // state only emits EventNewRoundStep, EventVote and EventProposalHeartbeat + evsw tmevents.EventSwitch } -type reactorChsI interface { - NewRoundStep(*cstypes.RoundState) - Vote(*types.Vote) - ProposalHeartbeat(*types.Heartbeat) -} - -// A list of channels to send new round steps, votes and proposal heartbeats to. -type reactorChs struct { - newRoundSteps chan (*cstypes.RoundState) - votes chan (*types.Vote) - proposalHeartbeats chan (*types.Heartbeat) -} - -var _ reactorChsI = (*reactorChs)(nil) - -// BLOCKING -func (rchs *reactorChs) NewRoundStep(rs *cstypes.RoundState) { rchs.newRoundSteps <- rs } -func (rchs *reactorChs) Vote(vote *types.Vote) { rchs.votes <- vote } -func (rchs *reactorChs) ProposalHeartbeat(hb *types.Heartbeat) { rchs.proposalHeartbeats <- hb } - -type nilReactorChs struct{} - -var _ reactorChsI = nilReactorChs{} - -func (nilReactorChs) NewRoundStep(rs *cstypes.RoundState) {} -func (nilReactorChs) Vote(vote *types.Vote) {} -func (nilReactorChs) ProposalHeartbeat(hb *types.Heartbeat) {} - // NewConsensusState returns a new ConsensusState. func NewConsensusState(config *cfg.ConsensusConfig, state sm.State, blockExec *sm.BlockExecutor, blockStore types.BlockStore, mempool types.Mempool, evpool types.EvidencePool) *ConsensusState { cs := &ConsensusState{ @@ -158,7 +131,7 @@ func NewConsensusState(config *cfg.ConsensusConfig, state sm.State, blockExec *s doWALCatchup: true, wal: nilWAL{}, evpool: evpool, - reactorChs: nilReactorChs{}, + evsw: tmevents.NewEventSwitch(), } // set function defaults (may be overwritten before calling Start) cs.decideProposal = cs.defaultDecideProposal @@ -260,6 +233,10 @@ func (cs *ConsensusState) LoadCommit(height int64) *types.Commit { // OnStart implements cmn.Service. // It loads the latest state via the WAL, and starts the timeout and receive routines. func (cs *ConsensusState) OnStart() error { + if err := cs.evsw.Start(); err != nil { + return err + } + // we may set the WAL in testing before calling Start, // so only OpenWAL if its still the nilWAL if _, ok := cs.wal.(nilWAL); ok { @@ -277,8 +254,7 @@ func (cs *ConsensusState) OnStart() error { // NOTE: we will get a build up of garbage go routines // firing on the tockChan until the receiveRoutine is started // to deal with them (by that point, at most one will be valid) - err := cs.timeoutTicker.Start() - if err != nil { + if err := cs.timeoutTicker.Start(); err != nil { return err } @@ -317,6 +293,8 @@ func (cs *ConsensusState) startRoutines(maxSteps int) { func (cs *ConsensusState) OnStop() { cs.BaseService.OnStop() + cs.evsw.Stop() + cs.timeoutTicker.Stop() // Make BaseService.Wait() wait until cs.wal.Wait() @@ -542,7 +520,7 @@ func (cs *ConsensusState) newStep() { // newStep is called by updateToStep in NewConsensusState before the eventBus is set! if cs.eventBus != nil { cs.eventBus.PublishEventNewRoundStep(rs) - cs.reactorChs.NewRoundStep(&cs.RoundState) + cs.evsw.FireEvent(types.EventNewRoundStep, &cs.RoundState) } } @@ -786,7 +764,7 @@ func (cs *ConsensusState) proposalHeartbeat(height int64, round int) { } cs.privValidator.SignHeartbeat(chainID, heartbeat) cs.eventBus.PublishEventProposalHeartbeat(types.EventDataProposalHeartbeat{heartbeat}) - cs.reactorChs.ProposalHeartbeat(heartbeat) + cs.evsw.FireEvent(types.EventProposalHeartbeat, heartbeat) counter++ time.Sleep(proposalHeartbeatIntervalSeconds * time.Second) } @@ -1453,7 +1431,7 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool, cs.Logger.Info(cmn.Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort())) cs.eventBus.PublishEventVote(types.EventDataVote{vote}) - cs.reactorChs.Vote(vote) + cs.evsw.FireEvent(types.EventVote, vote) // if we can skip timeoutCommit and have all the votes now, if cs.config.SkipTimeoutCommit && cs.LastCommit.HasAll() { @@ -1481,7 +1459,7 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool, } cs.eventBus.PublishEventVote(types.EventDataVote{vote}) - cs.reactorChs.Vote(vote) + cs.evsw.FireEvent(types.EventVote, vote) switch vote.Type { case types.VoteTypePrevote: