Browse Source

eventbus: plumb contexts (#7337)

* eventbus: plumb contexts

* fix lint
pull/7346/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
4af2dbd03b
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 276 additions and 233 deletions
  1. +1
    -1
      internal/blocksync/reactor.go
  2. +1
    -1
      internal/blocksync/reactor_test.go
  3. +1
    -1
      internal/consensus/byzantine_test.go
  4. +10
    -2
      internal/consensus/common_test.go
  5. +2
    -2
      internal/consensus/reactor.go
  6. +1
    -1
      internal/consensus/reactor_test.go
  7. +20
    -14
      internal/consensus/replay.go
  8. +7
    -7
      internal/consensus/replay_file.go
  9. +12
    -9
      internal/consensus/replay_test.go
  10. +95
    -81
      internal/consensus/state.go
  11. +11
    -11
      internal/consensus/state_test.go
  12. +1
    -1
      internal/consensus/wal_generator.go
  13. +40
    -47
      internal/eventbus/event_bus.go
  14. +22
    -22
      internal/eventbus/event_bus_test.go
  15. +13
    -8
      internal/state/execution.go
  16. +5
    -5
      internal/state/execution_test.go
  17. +15
    -5
      internal/state/helpers_test.go
  18. +3
    -3
      internal/state/indexer/indexer_service_test.go
  19. +3
    -1
      internal/state/validation_test.go
  20. +4
    -4
      node/node.go
  21. +2
    -1
      node/setup.go
  22. +7
    -6
      types/events.go

+ 1
- 1
internal/blocksync/reactor.go View File

@ -580,7 +580,7 @@ FOR_LOOP:
// TODO: Same thing for app - but we would need a way to get the hash // TODO: Same thing for app - but we would need a way to get the hash
// without persisting the state. // without persisting the state.
state, err = r.blockExec.ApplyBlock(state, firstID, first)
state, err = r.blockExec.ApplyBlock(ctx, state, firstID, first)
if err != nil { if err != nil {
// TODO: This is bad, are we zombie? // TODO: This is bad, are we zombie?
panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))


+ 1
- 1
internal/blocksync/reactor_test.go View File

@ -158,7 +158,7 @@ func (rts *reactorTestSuite) addNode(
thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes) thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()} blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()}
state, err = blockExec.ApplyBlock(state, blockID, thisBlock)
state, err = blockExec.ApplyBlock(ctx, state, blockID, thisBlock)
require.NoError(t, err) require.NoError(t, err)
blockStore.SaveBlock(thisBlock, thisParts, lastCommit) blockStore.SaveBlock(thisBlock, thisParts, lastCommit)


+ 1
- 1
internal/consensus/byzantine_test.go View File

@ -90,7 +90,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// Make State // Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore) blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(logger, thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs := NewState(ctx, logger, thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
// set private validator // set private validator
pv := privVals[i] pv := privVals[i]
cs.SetPrivValidator(pv) cs.SetPrivValidator(pv)


+ 10
- 2
internal/consensus/common_test.go View File

@ -224,7 +224,7 @@ func (vss ValidatorStubsByPower) Swap(i, j int) {
// Functions for transitioning the consensus state // Functions for transitioning the consensus state
func startTestRound(ctx context.Context, cs *State, height int64, round int32) { func startTestRound(ctx context.Context, cs *State, height int64, round int32) {
cs.enterNewRound(height, round)
cs.enterNewRound(ctx, height, round)
cs.startRoutines(ctx, 0) cs.startRoutines(ctx, 0)
} }
@ -467,7 +467,15 @@ func newStateWithConfigAndBlockStore(
} }
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyAppConnCon, mempool, evpool, blockStore) blockExec := sm.NewBlockExecutor(stateStore, logger, proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(logger.With("module", "consensus"), thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs := NewState(ctx,
logger.With("module", "consensus"),
thisConfig.Consensus,
state,
blockExec,
blockStore,
mempool,
evpool,
)
cs.SetPrivValidator(pv) cs.SetPrivValidator(pv)
eventBus := eventbus.NewDefault(logger.With("module", "events")) eventBus := eventbus.NewDefault(logger.With("module", "events"))


+ 2
- 2
internal/consensus/reactor.go View File

@ -275,7 +275,7 @@ func (r *Reactor) SwitchToConsensus(ctx context.Context, state sm.State, skipWAL
// NOTE: The line below causes broadcastNewRoundStepRoutine() to broadcast a // NOTE: The line below causes broadcastNewRoundStepRoutine() to broadcast a
// NewRoundStepMessage. // NewRoundStepMessage.
r.state.updateToState(state)
r.state.updateToState(ctx, state)
r.mtx.Lock() r.mtx.Lock()
r.waitSync = false r.waitSync = false
@ -299,7 +299,7 @@ conR:
} }
d := types.EventDataBlockSyncStatus{Complete: true, Height: state.LastBlockHeight} d := types.EventDataBlockSyncStatus{Complete: true, Height: state.LastBlockHeight}
if err := r.eventBus.PublishEventBlockSyncStatus(d); err != nil {
if err := r.eventBus.PublishEventBlockSyncStatus(ctx, d); err != nil {
r.Logger.Error("failed to emit the blocksync complete event", "err", err) r.Logger.Error("failed to emit the blocksync complete event", "err", err)
} }
} }


+ 1
- 1
internal/consensus/reactor_test.go View File

@ -421,7 +421,7 @@ func TestReactorWithEvidence(t *testing.T) {
evpool2 := sm.EmptyEvidencePool{} evpool2 := sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore) blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(logger.With("validator", i, "module", "consensus"),
cs := NewState(ctx, logger.With("validator", i, "module", "consensus"),
thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2) thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2)
cs.SetPrivValidator(pv) cs.SetPrivValidator(pv)


+ 20
- 14
internal/consensus/replay.go View File

@ -38,7 +38,7 @@ var crc32c = crc32.MakeTable(crc32.Castagnoli)
// Unmarshal and apply a single message to the consensus state as if it were // Unmarshal and apply a single message to the consensus state as if it were
// received in receiveRoutine. Lines that start with "#" are ignored. // received in receiveRoutine. Lines that start with "#" are ignored.
// NOTE: receiveRoutine should not be running. // NOTE: receiveRoutine should not be running.
func (cs *State) readReplayMessage(msg *TimedWALMessage, newStepSub eventbus.Subscription) error {
func (cs *State) readReplayMessage(ctx context.Context, msg *TimedWALMessage, newStepSub eventbus.Subscription) error {
// Skip meta messages which exist for demarcating boundaries. // Skip meta messages which exist for demarcating boundaries.
if _, ok := msg.Msg.(EndHeightMessage); ok { if _, ok := msg.Msg.(EndHeightMessage); ok {
return nil return nil
@ -81,10 +81,10 @@ func (cs *State) readReplayMessage(msg *TimedWALMessage, newStepSub eventbus.Sub
"blockID", v.BlockID, "peer", peerID) "blockID", v.BlockID, "peer", peerID)
} }
cs.handleMsg(m)
cs.handleMsg(ctx, m)
case timeoutInfo: case timeoutInfo:
cs.Logger.Info("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) cs.Logger.Info("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
cs.handleTimeout(m, cs.RoundState)
cs.handleTimeout(ctx, m, cs.RoundState)
default: default:
return fmt.Errorf("replay: Unknown TimedWALMessage type: %v", reflect.TypeOf(msg.Msg)) return fmt.Errorf("replay: Unknown TimedWALMessage type: %v", reflect.TypeOf(msg.Msg))
} }
@ -93,7 +93,7 @@ func (cs *State) readReplayMessage(msg *TimedWALMessage, newStepSub eventbus.Sub
// Replay only those messages since the last block. `timeoutRoutine` should // Replay only those messages since the last block. `timeoutRoutine` should
// run concurrently to read off tickChan. // run concurrently to read off tickChan.
func (cs *State) catchupReplay(csHeight int64) error {
func (cs *State) catchupReplay(ctx context.Context, csHeight int64) error {
// Set replayMode to true so we don't log signing errors. // Set replayMode to true so we don't log signing errors.
cs.replayMode = true cs.replayMode = true
@ -160,7 +160,7 @@ LOOP:
// NOTE: since the priv key is set when the msgs are received // NOTE: since the priv key is set when the msgs are received
// it will attempt to eg double sign but we can just ignore it // it will attempt to eg double sign but we can just ignore it
// since the votes will be replayed and we'll get to the next step // since the votes will be replayed and we'll get to the next step
if err := cs.readReplayMessage(msg, nil); err != nil {
if err := cs.readReplayMessage(ctx, msg, nil); err != nil {
return err return err
} }
} }
@ -390,7 +390,7 @@ func (h *Handshaker) ReplayBlocks(
// Either the app is asking for replay, or we're all synced up. // Either the app is asking for replay, or we're all synced up.
if appBlockHeight < storeBlockHeight { if appBlockHeight < storeBlockHeight {
// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store) // the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false)
return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, false)
} else if appBlockHeight == storeBlockHeight { } else if appBlockHeight == storeBlockHeight {
// We're good! // We're good!
@ -405,7 +405,7 @@ func (h *Handshaker) ReplayBlocks(
case appBlockHeight < stateBlockHeight: case appBlockHeight < stateBlockHeight:
// the app is further behind than it should be, so replay blocks // the app is further behind than it should be, so replay blocks
// but leave the last block to go through the WAL // but leave the last block to go through the WAL
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true)
return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, true)
case appBlockHeight == stateBlockHeight: case appBlockHeight == stateBlockHeight:
// We haven't run Commit (both the state and app are one block behind), // We haven't run Commit (both the state and app are one block behind),
@ -413,7 +413,7 @@ func (h *Handshaker) ReplayBlocks(
// NOTE: We could instead use the cs.WAL on cs.Start, // NOTE: We could instead use the cs.WAL on cs.Start,
// but we'd have to allow the WAL to replay a block that wrote it's #ENDHEIGHT // but we'd have to allow the WAL to replay a block that wrote it's #ENDHEIGHT
h.logger.Info("Replay last block using real app") h.logger.Info("Replay last block using real app")
state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus())
state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp.Consensus())
return state.AppHash, err return state.AppHash, err
case appBlockHeight == storeBlockHeight: case appBlockHeight == storeBlockHeight:
@ -424,7 +424,7 @@ func (h *Handshaker) ReplayBlocks(
} }
mockApp := newMockProxyApp(ctx, h.logger, appHash, abciResponses) mockApp := newMockProxyApp(ctx, h.logger, appHash, abciResponses)
h.logger.Info("Replay last block using mock app") h.logger.Info("Replay last block using mock app")
state, err = h.replayBlock(state, storeBlockHeight, mockApp)
state, err = h.replayBlock(ctx, state, storeBlockHeight, mockApp)
return state.AppHash, err return state.AppHash, err
} }
@ -435,6 +435,7 @@ func (h *Handshaker) ReplayBlocks(
} }
func (h *Handshaker) replayBlocks( func (h *Handshaker) replayBlocks(
ctx context.Context,
state sm.State, state sm.State,
proxyApp proxy.AppConns, proxyApp proxy.AppConns,
appBlockHeight, appBlockHeight,
@ -474,13 +475,13 @@ func (h *Handshaker) replayBlocks(
blockExec := sm.NewBlockExecutor( blockExec := sm.NewBlockExecutor(
h.stateStore, h.logger, proxyApp.Consensus(), emptyMempool{}, sm.EmptyEvidencePool{}, h.store) h.stateStore, h.logger, proxyApp.Consensus(), emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
blockExec.SetEventBus(h.eventBus) blockExec.SetEventBus(h.eventBus)
appHash, err = sm.ExecCommitBlock(
appHash, err = sm.ExecCommitBlock(ctx,
blockExec, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state) blockExec, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} else { } else {
appHash, err = sm.ExecCommitBlock(
appHash, err = sm.ExecCommitBlock(ctx,
nil, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state) nil, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
if err != nil { if err != nil {
return nil, err return nil, err
@ -492,7 +493,7 @@ func (h *Handshaker) replayBlocks(
if mutateState { if mutateState {
// sync the final block // sync the final block
state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus())
state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp.Consensus())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -504,7 +505,12 @@ func (h *Handshaker) replayBlocks(
} }
// ApplyBlock on the proxyApp with the last block. // ApplyBlock on the proxyApp with the last block.
func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.AppConnConsensus) (sm.State, error) {
func (h *Handshaker) replayBlock(
ctx context.Context,
state sm.State,
height int64,
proxyApp proxy.AppConnConsensus,
) (sm.State, error) {
block := h.store.LoadBlock(height) block := h.store.LoadBlock(height)
meta := h.store.LoadBlockMeta(height) meta := h.store.LoadBlockMeta(height)
@ -514,7 +520,7 @@ func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.Ap
blockExec.SetEventBus(h.eventBus) blockExec.SetEventBus(h.eventBus)
var err error var err error
state, err = blockExec.ApplyBlock(state, meta.BlockID, block)
state, err = blockExec.ApplyBlock(ctx, state, meta.BlockID, block)
if err != nil { if err != nil {
return sm.State{}, err return sm.State{}, err
} }


+ 7
- 7
internal/consensus/replay_file.go View File

@ -104,7 +104,7 @@ func (cs *State) ReplayFile(ctx context.Context, file string, console bool) erro
return err return err
} }
if err := pb.cs.readReplayMessage(msg, newStepSub); err != nil {
if err := pb.cs.readReplayMessage(ctx, msg, newStepSub); err != nil {
return err return err
} }
@ -141,13 +141,13 @@ func newPlayback(fileName string, fp *os.File, cs *State, genState sm.State) *pl
} }
// go back count steps by resetting the state and running (pb.count - count) steps // go back count steps by resetting the state and running (pb.count - count) steps
func (pb *playback) replayReset(count int, newStepSub eventbus.Subscription) error {
func (pb *playback) replayReset(ctx context.Context, count int, newStepSub eventbus.Subscription) error {
if err := pb.cs.Stop(); err != nil { if err := pb.cs.Stop(); err != nil {
return err return err
} }
pb.cs.Wait() pb.cs.Wait()
newCS := NewState(pb.cs.Logger, pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec,
newCS := NewState(ctx, pb.cs.Logger, pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec,
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool) pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool)
newCS.SetEventBus(pb.cs.eventBus) newCS.SetEventBus(pb.cs.eventBus)
newCS.startForReplay() newCS.startForReplay()
@ -173,7 +173,7 @@ func (pb *playback) replayReset(count int, newStepSub eventbus.Subscription) err
} else if err != nil { } else if err != nil {
return err return err
} }
if err := pb.cs.readReplayMessage(msg, newStepSub); err != nil {
if err := pb.cs.readReplayMessage(ctx, msg, newStepSub); err != nil {
return err return err
} }
pb.count++ pb.count++
@ -254,7 +254,7 @@ func (pb *playback) replayConsoleLoop() (int, error) {
}() }()
if len(tokens) == 1 { if len(tokens) == 1 {
if err := pb.replayReset(1, newStepSub); err != nil {
if err := pb.replayReset(ctx, 1, newStepSub); err != nil {
pb.cs.Logger.Error("Replay reset error", "err", err) pb.cs.Logger.Error("Replay reset error", "err", err)
} }
} else { } else {
@ -263,7 +263,7 @@ func (pb *playback) replayConsoleLoop() (int, error) {
fmt.Println("back takes an integer argument") fmt.Println("back takes an integer argument")
} else if i > pb.count { } else if i > pb.count {
fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count) fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count)
} else if err := pb.replayReset(i, newStepSub); err != nil {
} else if err := pb.replayReset(ctx, i, newStepSub); err != nil {
pb.cs.Logger.Error("Replay reset error", "err", err) pb.cs.Logger.Error("Replay reset error", "err", err)
} }
} }
@ -359,7 +359,7 @@ func newConsensusStateForReplay(
mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{} mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(), mempool, evpool, blockStore) blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(), mempool, evpool, blockStore)
consensusState := NewState(logger, csConfig, state.Copy(), blockExec,
consensusState := NewState(ctx, logger, csConfig, state.Copy(), blockExec,
blockStore, mempool, evpool) blockStore, mempool, evpool)
consensusState.SetEventBus(eventBus) consensusState.SetEventBus(eventBus)


+ 12
- 9
internal/consensus/replay_test.go View File

@ -855,18 +855,21 @@ func testHandshakeReplay(
} }
} }
func applyBlock(stateStore sm.Store,
func applyBlock(
ctx context.Context,
stateStore sm.Store,
mempool mempool.Mempool, mempool mempool.Mempool,
evpool sm.EvidencePool, evpool sm.EvidencePool,
st sm.State, st sm.State,
blk *types.Block, blk *types.Block,
proxyApp proxy.AppConns, proxyApp proxy.AppConns,
blockStore *mockBlockStore) sm.State {
blockStore *mockBlockStore,
) sm.State {
testPartSize := types.BlockPartSizeBytes testPartSize := types.BlockPartSizeBytes
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore) blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
blkID := types.BlockID{Hash: blk.Hash(), PartSetHeader: blk.MakePartSet(testPartSize).Header()} blkID := types.BlockID{Hash: blk.Hash(), PartSetHeader: blk.MakePartSet(testPartSize).Header()}
newState, err := blockExec.ApplyBlock(st, blkID, blk)
newState, err := blockExec.ApplyBlock(ctx, st, blkID, blk)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -904,18 +907,18 @@ func buildAppStateFromChain(
case 0: case 0:
for i := 0; i < nBlocks; i++ { for i := 0; i < nBlocks; i++ {
block := chain[i] block := chain[i]
state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp, blockStore)
state = applyBlock(ctx, stateStore, mempool, evpool, state, block, proxyApp, blockStore)
} }
case 1, 2, 3: case 1, 2, 3:
for i := 0; i < nBlocks-1; i++ { for i := 0; i < nBlocks-1; i++ {
block := chain[i] block := chain[i]
state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp, blockStore)
state = applyBlock(ctx, stateStore, mempool, evpool, state, block, proxyApp, blockStore)
} }
if mode == 2 || mode == 3 { if mode == 2 || mode == 3 {
// update the kvstore height and apphash // update the kvstore height and apphash
// as if we ran commit but not // as if we ran commit but not
state = applyBlock(stateStore, mempool, evpool, state, chain[nBlocks-1], proxyApp, blockStore)
state = applyBlock(ctx, stateStore, mempool, evpool, state, chain[nBlocks-1], proxyApp, blockStore)
} }
default: default:
panic(fmt.Sprintf("unknown mode %v", mode)) panic(fmt.Sprintf("unknown mode %v", mode))
@ -961,19 +964,19 @@ func buildTMStateFromChain(
case 0: case 0:
// sync right up // sync right up
for _, block := range chain { for _, block := range chain {
state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp, blockStore)
state = applyBlock(ctx, stateStore, mempool, evpool, state, block, proxyApp, blockStore)
} }
case 1, 2, 3: case 1, 2, 3:
// sync up to the penultimate as if we stored the block. // sync up to the penultimate as if we stored the block.
// whether we commit or not depends on the appHash // whether we commit or not depends on the appHash
for _, block := range chain[:len(chain)-1] { for _, block := range chain[:len(chain)-1] {
state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp, blockStore)
state = applyBlock(ctx, stateStore, mempool, evpool, state, block, proxyApp, blockStore)
} }
// apply the final block to a state copy so we can // apply the final block to a state copy so we can
// get the right next appHash but keep the state back // get the right next appHash but keep the state back
applyBlock(stateStore, mempool, evpool, state, chain[len(chain)-1], proxyApp, blockStore)
applyBlock(ctx, stateStore, mempool, evpool, state, chain[len(chain)-1], proxyApp, blockStore)
default: default:
panic(fmt.Sprintf("unknown mode %v", mode)) panic(fmt.Sprintf("unknown mode %v", mode))
} }


+ 95
- 81
internal/consensus/state.go View File

@ -153,6 +153,7 @@ type StateOption func(*State)
// NewState returns a new State. // NewState returns a new State.
func NewState( func NewState(
ctx context.Context,
logger log.Logger, logger log.Logger,
cfg *config.ConsensusConfig, cfg *config.ConsensusConfig,
state sm.State, state sm.State,
@ -190,7 +191,7 @@ func NewState(
cs.reconstructLastCommit(state) cs.reconstructLastCommit(state)
} }
cs.updateToState(state)
cs.updateToState(ctx, state)
// NOTE: we do not call scheduleRound0 yet, we do that upon Start() // NOTE: we do not call scheduleRound0 yet, we do that upon Start()
@ -345,7 +346,7 @@ func (cs *State) OnStart(ctx context.Context) error {
LOOP: LOOP:
for { for {
err := cs.catchupReplay(cs.Height)
err := cs.catchupReplay(ctx, cs.Height)
switch { switch {
case err == nil: case err == nil:
break LOOP break LOOP
@ -409,7 +410,7 @@ func (cs *State) OnStart(ctx context.Context) error {
} }
// now start the receiveRoutine // now start the receiveRoutine
go cs.receiveRoutine(0)
go cs.receiveRoutine(ctx, 0)
// schedule the first round! // schedule the first round!
// use GetRoundState so we don't race the receiveRoutine for access // use GetRoundState so we don't race the receiveRoutine for access
@ -427,7 +428,7 @@ func (cs *State) startRoutines(ctx context.Context, maxSteps int) {
return return
} }
go cs.receiveRoutine(maxSteps)
go cs.receiveRoutine(ctx, maxSteps)
} }
// loadWalFile loads WAL data from file. It overwrites cs.wal. // loadWalFile loads WAL data from file. It overwrites cs.wal.
@ -625,7 +626,7 @@ func (cs *State) reconstructLastCommit(state sm.State) {
// Updates State and increments height to match that of state. // Updates State and increments height to match that of state.
// The round becomes 0 and cs.Step becomes cstypes.RoundStepNewHeight. // The round becomes 0 and cs.Step becomes cstypes.RoundStepNewHeight.
func (cs *State) updateToState(state sm.State) {
func (cs *State) updateToState(ctx context.Context, state sm.State) {
if cs.CommitRound > -1 && 0 < cs.Height && cs.Height != state.LastBlockHeight { if cs.CommitRound > -1 && 0 < cs.Height && cs.Height != state.LastBlockHeight {
panic(fmt.Sprintf( panic(fmt.Sprintf(
"updateToState() expected state height of %v but found %v", "updateToState() expected state height of %v but found %v",
@ -660,7 +661,7 @@ func (cs *State) updateToState(state sm.State) {
"new_height", state.LastBlockHeight+1, "new_height", state.LastBlockHeight+1,
"old_height", cs.state.LastBlockHeight+1, "old_height", cs.state.LastBlockHeight+1,
) )
cs.newStep()
cs.newStep(ctx)
return return
} }
} }
@ -729,10 +730,10 @@ func (cs *State) updateToState(state sm.State) {
cs.state = state cs.state = state
// Finally, broadcast RoundState // Finally, broadcast RoundState
cs.newStep()
cs.newStep(ctx)
} }
func (cs *State) newStep() {
func (cs *State) newStep(ctx context.Context) {
rs := cs.RoundStateEvent() rs := cs.RoundStateEvent()
if err := cs.wal.Write(rs); err != nil { if err := cs.wal.Write(rs); err != nil {
cs.Logger.Error("failed writing to WAL", "err", err) cs.Logger.Error("failed writing to WAL", "err", err)
@ -742,7 +743,7 @@ func (cs *State) newStep() {
// newStep is called by updateToState in NewState before the eventBus is set! // newStep is called by updateToState in NewState before the eventBus is set!
if cs.eventBus != nil { if cs.eventBus != nil {
if err := cs.eventBus.PublishEventNewRoundStep(rs); err != nil {
if err := cs.eventBus.PublishEventNewRoundStep(ctx, rs); err != nil {
cs.Logger.Error("failed publishing new round step", "err", err) cs.Logger.Error("failed publishing new round step", "err", err)
} }
@ -758,7 +759,7 @@ func (cs *State) newStep() {
// It keeps the RoundState and is the only thing that updates it. // It keeps the RoundState and is the only thing that updates it.
// Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities. // Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities.
// State must be locked before any internal state is updated. // State must be locked before any internal state is updated.
func (cs *State) receiveRoutine(maxSteps int) {
func (cs *State) receiveRoutine(ctx context.Context, maxSteps int) {
onExit := func(cs *State) { onExit := func(cs *State) {
// NOTE: the internalMsgQueue may have signed messages from our // NOTE: the internalMsgQueue may have signed messages from our
// priv_val that haven't hit the WAL, but its ok because // priv_val that haven't hit the WAL, but its ok because
@ -804,7 +805,7 @@ func (cs *State) receiveRoutine(maxSteps int) {
select { select {
case <-cs.txNotifier.TxsAvailable(): case <-cs.txNotifier.TxsAvailable():
cs.handleTxsAvailable()
cs.handleTxsAvailable(ctx)
case mi = <-cs.peerMsgQueue: case mi = <-cs.peerMsgQueue:
if err := cs.wal.Write(mi); err != nil { if err := cs.wal.Write(mi); err != nil {
@ -813,7 +814,7 @@ func (cs *State) receiveRoutine(maxSteps int) {
// handles proposals, block parts, votes // handles proposals, block parts, votes
// may generate internal events (votes, complete proposals, 2/3 majorities) // may generate internal events (votes, complete proposals, 2/3 majorities)
cs.handleMsg(mi)
cs.handleMsg(ctx, mi)
case mi = <-cs.internalMsgQueue: case mi = <-cs.internalMsgQueue:
err := cs.wal.WriteSync(mi) // NOTE: fsync err := cs.wal.WriteSync(mi) // NOTE: fsync
@ -833,7 +834,7 @@ func (cs *State) receiveRoutine(maxSteps int) {
} }
// handles proposals, block parts, votes // handles proposals, block parts, votes
cs.handleMsg(mi)
cs.handleMsg(ctx, mi)
case ti := <-cs.timeoutTicker.Chan(): // tockChan: case ti := <-cs.timeoutTicker.Chan(): // tockChan:
if err := cs.wal.Write(ti); err != nil { if err := cs.wal.Write(ti); err != nil {
@ -842,17 +843,18 @@ func (cs *State) receiveRoutine(maxSteps int) {
// if the timeout is relevant to the rs // if the timeout is relevant to the rs
// go to the next step // go to the next step
cs.handleTimeout(ti, rs)
cs.handleTimeout(ctx, ti, rs)
case <-cs.Quit(): case <-cs.Quit():
onExit(cs) onExit(cs)
return return
} }
// TODO should we handle context cancels here?
} }
} }
// state transitions on complete-proposal, 2/3-any, 2/3-one // state transitions on complete-proposal, 2/3-any, 2/3-one
func (cs *State) handleMsg(mi msgInfo) {
func (cs *State) handleMsg(ctx context.Context, mi msgInfo) {
cs.mtx.Lock() cs.mtx.Lock()
defer cs.mtx.Unlock() defer cs.mtx.Unlock()
@ -871,7 +873,7 @@ func (cs *State) handleMsg(mi msgInfo) {
case *BlockPartMessage: case *BlockPartMessage:
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
added, err = cs.addProposalBlockPart(msg, peerID)
added, err = cs.addProposalBlockPart(ctx, msg, peerID)
if added { if added {
cs.statsMsgQueue <- mi cs.statsMsgQueue <- mi
} }
@ -889,7 +891,7 @@ func (cs *State) handleMsg(mi msgInfo) {
case *VoteMessage: case *VoteMessage:
// attempt to add the vote and dupeout the validator if its a duplicate signature // attempt to add the vote and dupeout the validator if its a duplicate signature
// if the vote gives us a 2/3-any or 2/3-one, we transition // if the vote gives us a 2/3-any or 2/3-one, we transition
added, err = cs.tryAddVote(msg.Vote, peerID)
added, err = cs.tryAddVote(ctx, msg.Vote, peerID)
if added { if added {
cs.statsMsgQueue <- mi cs.statsMsgQueue <- mi
} }
@ -926,7 +928,11 @@ func (cs *State) handleMsg(mi msgInfo) {
} }
} }
func (cs *State) handleTimeout(ti timeoutInfo, rs cstypes.RoundState) {
func (cs *State) handleTimeout(
ctx context.Context,
ti timeoutInfo,
rs cstypes.RoundState,
) {
cs.Logger.Debug("received tock", "timeout", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) cs.Logger.Debug("received tock", "timeout", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
// timeouts must be for current height, round, step // timeouts must be for current height, round, step
@ -943,32 +949,32 @@ func (cs *State) handleTimeout(ti timeoutInfo, rs cstypes.RoundState) {
case cstypes.RoundStepNewHeight: case cstypes.RoundStepNewHeight:
// NewRound event fired from enterNewRound. // NewRound event fired from enterNewRound.
// XXX: should we fire timeout here (for timeout commit)? // XXX: should we fire timeout here (for timeout commit)?
cs.enterNewRound(ti.Height, 0)
cs.enterNewRound(ctx, ti.Height, 0)
case cstypes.RoundStepNewRound: case cstypes.RoundStepNewRound:
cs.enterPropose(ti.Height, 0)
cs.enterPropose(ctx, ti.Height, 0)
case cstypes.RoundStepPropose: case cstypes.RoundStepPropose:
if err := cs.eventBus.PublishEventTimeoutPropose(cs.RoundStateEvent()); err != nil {
if err := cs.eventBus.PublishEventTimeoutPropose(ctx, cs.RoundStateEvent()); err != nil {
cs.Logger.Error("failed publishing timeout propose", "err", err) cs.Logger.Error("failed publishing timeout propose", "err", err)
} }
cs.enterPrevote(ti.Height, ti.Round)
cs.enterPrevote(ctx, ti.Height, ti.Round)
case cstypes.RoundStepPrevoteWait: case cstypes.RoundStepPrevoteWait:
if err := cs.eventBus.PublishEventTimeoutWait(cs.RoundStateEvent()); err != nil {
if err := cs.eventBus.PublishEventTimeoutWait(ctx, cs.RoundStateEvent()); err != nil {
cs.Logger.Error("failed publishing timeout wait", "err", err) cs.Logger.Error("failed publishing timeout wait", "err", err)
} }
cs.enterPrecommit(ti.Height, ti.Round)
cs.enterPrecommit(ctx, ti.Height, ti.Round)
case cstypes.RoundStepPrecommitWait: case cstypes.RoundStepPrecommitWait:
if err := cs.eventBus.PublishEventTimeoutWait(cs.RoundStateEvent()); err != nil {
if err := cs.eventBus.PublishEventTimeoutWait(ctx, cs.RoundStateEvent()); err != nil {
cs.Logger.Error("failed publishing timeout wait", "err", err) cs.Logger.Error("failed publishing timeout wait", "err", err)
} }
cs.enterPrecommit(ti.Height, ti.Round)
cs.enterNewRound(ti.Height, ti.Round+1)
cs.enterPrecommit(ctx, ti.Height, ti.Round)
cs.enterNewRound(ctx, ti.Height, ti.Round+1)
default: default:
panic(fmt.Sprintf("invalid timeout step: %v", ti.Step)) panic(fmt.Sprintf("invalid timeout step: %v", ti.Step))
@ -976,7 +982,7 @@ func (cs *State) handleTimeout(ti timeoutInfo, rs cstypes.RoundState) {
} }
func (cs *State) handleTxsAvailable() {
func (cs *State) handleTxsAvailable(ctx context.Context) {
cs.mtx.Lock() cs.mtx.Lock()
defer cs.mtx.Unlock() defer cs.mtx.Unlock()
@ -997,7 +1003,7 @@ func (cs *State) handleTxsAvailable() {
cs.scheduleTimeout(timeoutCommit, cs.Height, 0, cstypes.RoundStepNewRound) cs.scheduleTimeout(timeoutCommit, cs.Height, 0, cstypes.RoundStepNewRound)
case cstypes.RoundStepNewRound: // after timeoutCommit case cstypes.RoundStepNewRound: // after timeoutCommit
cs.enterPropose(cs.Height, 0)
cs.enterPropose(ctx, cs.Height, 0)
} }
} }
@ -1011,7 +1017,7 @@ func (cs *State) handleTxsAvailable() {
// Enter: +2/3 precommits for nil at (height,round-1) // Enter: +2/3 precommits for nil at (height,round-1)
// Enter: +2/3 prevotes any or +2/3 precommits for block or any from (height, round) // Enter: +2/3 prevotes any or +2/3 precommits for block or any from (height, round)
// NOTE: cs.StartTime was already set for height. // NOTE: cs.StartTime was already set for height.
func (cs *State) enterNewRound(height int64, round int32) {
func (cs *State) enterNewRound(ctx context.Context, height int64, round int32) {
logger := cs.Logger.With("height", height, "round", round) logger := cs.Logger.With("height", height, "round", round)
if cs.Height != height || round < cs.Round || (cs.Round == round && cs.Step != cstypes.RoundStepNewHeight) { if cs.Height != height || round < cs.Round || (cs.Round == round && cs.Step != cstypes.RoundStepNewHeight) {
@ -1054,7 +1060,7 @@ func (cs *State) enterNewRound(height int64, round int32) {
cs.Votes.SetRound(tmmath.SafeAddInt32(round, 1)) // also track next round (round+1) to allow round-skipping cs.Votes.SetRound(tmmath.SafeAddInt32(round, 1)) // also track next round (round+1) to allow round-skipping
cs.TriggeredTimeoutPrecommit = false cs.TriggeredTimeoutPrecommit = false
if err := cs.eventBus.PublishEventNewRound(cs.NewRoundEvent()); err != nil {
if err := cs.eventBus.PublishEventNewRound(ctx, cs.NewRoundEvent()); err != nil {
cs.Logger.Error("failed publishing new round", "err", err) cs.Logger.Error("failed publishing new round", "err", err)
} }
@ -1070,7 +1076,7 @@ func (cs *State) enterNewRound(height int64, round int32) {
cstypes.RoundStepNewRound) cstypes.RoundStepNewRound)
} }
} else { } else {
cs.enterPropose(height, round)
cs.enterPropose(ctx, height, round)
} }
} }
@ -1093,7 +1099,7 @@ func (cs *State) needProofBlock(height int64) bool {
// Enter (CreateEmptyBlocks, CreateEmptyBlocksInterval > 0 ): // Enter (CreateEmptyBlocks, CreateEmptyBlocksInterval > 0 ):
// after enterNewRound(height,round), after timeout of CreateEmptyBlocksInterval // after enterNewRound(height,round), after timeout of CreateEmptyBlocksInterval
// Enter (!CreateEmptyBlocks) : after enterNewRound(height,round), once txs are in the mempool // Enter (!CreateEmptyBlocks) : after enterNewRound(height,round), once txs are in the mempool
func (cs *State) enterPropose(height int64, round int32) {
func (cs *State) enterPropose(ctx context.Context, height int64, round int32) {
logger := cs.Logger.With("height", height, "round", round) logger := cs.Logger.With("height", height, "round", round)
if cs.Height != height || round < cs.Round || (cs.Round == round && cstypes.RoundStepPropose <= cs.Step) { if cs.Height != height || round < cs.Round || (cs.Round == round && cstypes.RoundStepPropose <= cs.Step) {
@ -1109,13 +1115,13 @@ func (cs *State) enterPropose(height int64, round int32) {
defer func() { defer func() {
// Done enterPropose: // Done enterPropose:
cs.updateRoundStep(round, cstypes.RoundStepPropose) cs.updateRoundStep(round, cstypes.RoundStepPropose)
cs.newStep()
cs.newStep(ctx)
// If we have the whole proposal + POL, then goto Prevote now. // If we have the whole proposal + POL, then goto Prevote now.
// else, we'll enterPrevote when the rest of the proposal is received (in AddProposalBlockPart), // else, we'll enterPrevote when the rest of the proposal is received (in AddProposalBlockPart),
// or else after timeoutPropose // or else after timeoutPropose
if cs.isProposalComplete() { if cs.isProposalComplete() {
cs.enterPrevote(height, cs.Round)
cs.enterPrevote(ctx, height, cs.Round)
} }
}() }()
@ -1271,7 +1277,7 @@ func (cs *State) createProposalBlock() (block *types.Block, blockParts *types.Pa
// Enter: proposal block and POL is ready. // Enter: proposal block and POL is ready.
// Prevote for LockedBlock if we're locked, or ProposalBlock if valid. // Prevote for LockedBlock if we're locked, or ProposalBlock if valid.
// Otherwise vote nil. // Otherwise vote nil.
func (cs *State) enterPrevote(height int64, round int32) {
func (cs *State) enterPrevote(ctx context.Context, height int64, round int32) {
logger := cs.Logger.With("height", height, "round", round) logger := cs.Logger.With("height", height, "round", round)
if cs.Height != height || round < cs.Round || (cs.Round == round && cstypes.RoundStepPrevote <= cs.Step) { if cs.Height != height || round < cs.Round || (cs.Round == round && cstypes.RoundStepPrevote <= cs.Step) {
@ -1285,7 +1291,7 @@ func (cs *State) enterPrevote(height int64, round int32) {
defer func() { defer func() {
// Done enterPrevote: // Done enterPrevote:
cs.updateRoundStep(round, cstypes.RoundStepPrevote) cs.updateRoundStep(round, cstypes.RoundStepPrevote)
cs.newStep()
cs.newStep(ctx)
}() }()
logger.Debug("entering prevote step", "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step)) logger.Debug("entering prevote step", "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step))
@ -1331,7 +1337,7 @@ func (cs *State) defaultDoPrevote(height int64, round int32) {
} }
// Enter: any +2/3 prevotes at next round. // Enter: any +2/3 prevotes at next round.
func (cs *State) enterPrevoteWait(height int64, round int32) {
func (cs *State) enterPrevoteWait(ctx context.Context, height int64, round int32) {
logger := cs.Logger.With("height", height, "round", round) logger := cs.Logger.With("height", height, "round", round)
if cs.Height != height || round < cs.Round || (cs.Round == round && cstypes.RoundStepPrevoteWait <= cs.Step) { if cs.Height != height || round < cs.Round || (cs.Round == round && cstypes.RoundStepPrevoteWait <= cs.Step) {
@ -1354,7 +1360,7 @@ func (cs *State) enterPrevoteWait(height int64, round int32) {
defer func() { defer func() {
// Done enterPrevoteWait: // Done enterPrevoteWait:
cs.updateRoundStep(round, cstypes.RoundStepPrevoteWait) cs.updateRoundStep(round, cstypes.RoundStepPrevoteWait)
cs.newStep()
cs.newStep(ctx)
}() }()
// Wait for some more prevotes; enterPrecommit // Wait for some more prevotes; enterPrecommit
@ -1367,7 +1373,7 @@ func (cs *State) enterPrevoteWait(height int64, round int32) {
// Lock & precommit the ProposalBlock if we have enough prevotes for it (a POL in this round) // Lock & precommit the ProposalBlock if we have enough prevotes for it (a POL in this round)
// else, unlock an existing lock and precommit nil if +2/3 of prevotes were nil, // else, unlock an existing lock and precommit nil if +2/3 of prevotes were nil,
// else, precommit nil otherwise. // else, precommit nil otherwise.
func (cs *State) enterPrecommit(height int64, round int32) {
func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32) {
logger := cs.Logger.With("height", height, "round", round) logger := cs.Logger.With("height", height, "round", round)
if cs.Height != height || round < cs.Round || (cs.Round == round && cstypes.RoundStepPrecommit <= cs.Step) { if cs.Height != height || round < cs.Round || (cs.Round == round && cstypes.RoundStepPrecommit <= cs.Step) {
@ -1383,7 +1389,7 @@ func (cs *State) enterPrecommit(height int64, round int32) {
defer func() { defer func() {
// Done enterPrecommit: // Done enterPrecommit:
cs.updateRoundStep(round, cstypes.RoundStepPrecommit) cs.updateRoundStep(round, cstypes.RoundStepPrecommit)
cs.newStep()
cs.newStep(ctx)
}() }()
// check for a polka // check for a polka
@ -1402,7 +1408,7 @@ func (cs *State) enterPrecommit(height int64, round int32) {
} }
// At this point +2/3 prevoted for a particular block or nil. // At this point +2/3 prevoted for a particular block or nil.
if err := cs.eventBus.PublishEventPolka(cs.RoundStateEvent()); err != nil {
if err := cs.eventBus.PublishEventPolka(ctx, cs.RoundStateEvent()); err != nil {
logger.Error("failed publishing polka", "err", err) logger.Error("failed publishing polka", "err", err)
} }
@ -1422,7 +1428,7 @@ func (cs *State) enterPrecommit(height int64, round int32) {
cs.LockedBlock = nil cs.LockedBlock = nil
cs.LockedBlockParts = nil cs.LockedBlockParts = nil
if err := cs.eventBus.PublishEventUnlock(cs.RoundStateEvent()); err != nil {
if err := cs.eventBus.PublishEventUnlock(ctx, cs.RoundStateEvent()); err != nil {
logger.Error("failed publishing event unlock", "err", err) logger.Error("failed publishing event unlock", "err", err)
} }
} }
@ -1438,7 +1444,7 @@ func (cs *State) enterPrecommit(height int64, round int32) {
logger.Debug("precommit step; +2/3 prevoted locked block; relocking") logger.Debug("precommit step; +2/3 prevoted locked block; relocking")
cs.LockedRound = round cs.LockedRound = round
if err := cs.eventBus.PublishEventRelock(cs.RoundStateEvent()); err != nil {
if err := cs.eventBus.PublishEventRelock(ctx, cs.RoundStateEvent()); err != nil {
logger.Error("failed publishing event relock", "err", err) logger.Error("failed publishing event relock", "err", err)
} }
@ -1459,7 +1465,7 @@ func (cs *State) enterPrecommit(height int64, round int32) {
cs.LockedBlock = cs.ProposalBlock cs.LockedBlock = cs.ProposalBlock
cs.LockedBlockParts = cs.ProposalBlockParts cs.LockedBlockParts = cs.ProposalBlockParts
if err := cs.eventBus.PublishEventLock(cs.RoundStateEvent()); err != nil {
if err := cs.eventBus.PublishEventLock(ctx, cs.RoundStateEvent()); err != nil {
logger.Error("failed publishing event lock", "err", err) logger.Error("failed publishing event lock", "err", err)
} }
@ -1481,7 +1487,7 @@ func (cs *State) enterPrecommit(height int64, round int32) {
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader) cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader)
} }
if err := cs.eventBus.PublishEventUnlock(cs.RoundStateEvent()); err != nil {
if err := cs.eventBus.PublishEventUnlock(ctx, cs.RoundStateEvent()); err != nil {
logger.Error("failed publishing event unlock", "err", err) logger.Error("failed publishing event unlock", "err", err)
} }
@ -1489,7 +1495,7 @@ func (cs *State) enterPrecommit(height int64, round int32) {
} }
// Enter: any +2/3 precommits for next round. // Enter: any +2/3 precommits for next round.
func (cs *State) enterPrecommitWait(height int64, round int32) {
func (cs *State) enterPrecommitWait(ctx context.Context, height int64, round int32) {
logger := cs.Logger.With("height", height, "round", round) logger := cs.Logger.With("height", height, "round", round)
if cs.Height != height || round < cs.Round || (cs.Round == round && cs.TriggeredTimeoutPrecommit) { if cs.Height != height || round < cs.Round || (cs.Round == round && cs.TriggeredTimeoutPrecommit) {
@ -1513,7 +1519,7 @@ func (cs *State) enterPrecommitWait(height int64, round int32) {
defer func() { defer func() {
// Done enterPrecommitWait: // Done enterPrecommitWait:
cs.TriggeredTimeoutPrecommit = true cs.TriggeredTimeoutPrecommit = true
cs.newStep()
cs.newStep(ctx)
}() }()
// wait for some more precommits; enterNewRound // wait for some more precommits; enterNewRound
@ -1521,7 +1527,7 @@ func (cs *State) enterPrecommitWait(height int64, round int32) {
} }
// Enter: +2/3 precommits for block // Enter: +2/3 precommits for block
func (cs *State) enterCommit(height int64, commitRound int32) {
func (cs *State) enterCommit(ctx context.Context, height int64, commitRound int32) {
logger := cs.Logger.With("height", height, "commit_round", commitRound) logger := cs.Logger.With("height", height, "commit_round", commitRound)
if cs.Height != height || cstypes.RoundStepCommit <= cs.Step { if cs.Height != height || cstypes.RoundStepCommit <= cs.Step {
@ -1540,10 +1546,10 @@ func (cs *State) enterCommit(height int64, commitRound int32) {
cs.updateRoundStep(cs.Round, cstypes.RoundStepCommit) cs.updateRoundStep(cs.Round, cstypes.RoundStepCommit)
cs.CommitRound = commitRound cs.CommitRound = commitRound
cs.CommitTime = tmtime.Now() cs.CommitTime = tmtime.Now()
cs.newStep()
cs.newStep(ctx)
// Maybe finalize immediately. // Maybe finalize immediately.
cs.tryFinalizeCommit(height)
cs.tryFinalizeCommit(ctx, height)
}() }()
blockID, ok := cs.Votes.Precommits(commitRound).TwoThirdsMajority() blockID, ok := cs.Votes.Precommits(commitRound).TwoThirdsMajority()
@ -1574,7 +1580,7 @@ func (cs *State) enterCommit(height int64, commitRound int32) {
cs.ProposalBlock = nil cs.ProposalBlock = nil
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader) cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader)
if err := cs.eventBus.PublishEventValidBlock(cs.RoundStateEvent()); err != nil {
if err := cs.eventBus.PublishEventValidBlock(ctx, cs.RoundStateEvent()); err != nil {
logger.Error("failed publishing valid block", "err", err) logger.Error("failed publishing valid block", "err", err)
} }
@ -1584,7 +1590,7 @@ func (cs *State) enterCommit(height int64, commitRound int32) {
} }
// If we have the block AND +2/3 commits for it, finalize. // If we have the block AND +2/3 commits for it, finalize.
func (cs *State) tryFinalizeCommit(height int64) {
func (cs *State) tryFinalizeCommit(ctx context.Context, height int64) {
logger := cs.Logger.With("height", height) logger := cs.Logger.With("height", height)
if cs.Height != height { if cs.Height != height {
@ -1608,11 +1614,11 @@ func (cs *State) tryFinalizeCommit(height int64) {
return return
} }
cs.finalizeCommit(height)
cs.finalizeCommit(ctx, height)
} }
// Increment height and goto cstypes.RoundStepNewHeight // Increment height and goto cstypes.RoundStepNewHeight
func (cs *State) finalizeCommit(height int64) {
func (cs *State) finalizeCommit(ctx context.Context, height int64) {
logger := cs.Logger.With("height", height) logger := cs.Logger.With("height", height)
if cs.Height != height || cs.Step != cstypes.RoundStepCommit { if cs.Height != height || cs.Step != cstypes.RoundStepCommit {
@ -1692,7 +1698,7 @@ func (cs *State) finalizeCommit(height int64) {
// Execute and commit the block, update and save the state, and update the mempool. // Execute and commit the block, update and save the state, and update the mempool.
// NOTE The block.AppHash wont reflect these txs until the next block. // NOTE The block.AppHash wont reflect these txs until the next block.
stateCopy, err := cs.blockExec.ApplyBlock(
stateCopy, err := cs.blockExec.ApplyBlock(ctx,
stateCopy, stateCopy,
types.BlockID{ types.BlockID{
Hash: block.Hash(), Hash: block.Hash(),
@ -1711,7 +1717,7 @@ func (cs *State) finalizeCommit(height int64) {
cs.RecordMetrics(height, block) cs.RecordMetrics(height, block)
// NewHeightStep! // NewHeightStep!
cs.updateToState(stateCopy)
cs.updateToState(ctx, stateCopy)
fail.Fail() // XXX fail.Fail() // XXX
@ -1864,7 +1870,11 @@ func (cs *State) defaultSetProposal(proposal *types.Proposal) error {
// NOTE: block is not necessarily valid. // NOTE: block is not necessarily valid.
// Asynchronously triggers either enterPrevote (before we timeout of propose) or tryFinalizeCommit, // Asynchronously triggers either enterPrevote (before we timeout of propose) or tryFinalizeCommit,
// once we have the full block. // once we have the full block.
func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID types.NodeID) (added bool, err error) {
func (cs *State) addProposalBlockPart(
ctx context.Context,
msg *BlockPartMessage,
peerID types.NodeID,
) (added bool, err error) {
height, round, part := msg.Height, msg.Round, msg.Part height, round, part := msg.Height, msg.Round, msg.Part
// Blocks might be reused, so round mismatch is OK // Blocks might be reused, so round mismatch is OK
@ -1918,7 +1928,7 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID types.NodeID
// NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal // NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal
cs.Logger.Info("received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash()) cs.Logger.Info("received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash())
if err := cs.eventBus.PublishEventCompleteProposal(cs.CompleteProposalEvent()); err != nil {
if err := cs.eventBus.PublishEventCompleteProposal(ctx, cs.CompleteProposalEvent()); err != nil {
cs.Logger.Error("failed publishing event complete proposal", "err", err) cs.Logger.Error("failed publishing event complete proposal", "err", err)
} }
@ -1946,13 +1956,13 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID types.NodeID
if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() { if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() {
// Move onto the next step // Move onto the next step
cs.enterPrevote(height, cs.Round)
cs.enterPrevote(ctx, height, cs.Round)
if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added
cs.enterPrecommit(height, cs.Round)
cs.enterPrecommit(ctx, height, cs.Round)
} }
} else if cs.Step == cstypes.RoundStepCommit { } else if cs.Step == cstypes.RoundStepCommit {
// If we're waiting on the proposal block... // If we're waiting on the proposal block...
cs.tryFinalizeCommit(height)
cs.tryFinalizeCommit(ctx, height)
} }
return added, nil return added, nil
@ -1962,8 +1972,8 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID types.NodeID
} }
// Attempt to add the vote. if its a duplicate signature, dupeout the validator // Attempt to add the vote. if its a duplicate signature, dupeout the validator
func (cs *State) tryAddVote(vote *types.Vote, peerID types.NodeID) (bool, error) {
added, err := cs.addVote(vote, peerID)
func (cs *State) tryAddVote(ctx context.Context, vote *types.Vote, peerID types.NodeID) (bool, error) {
added, err := cs.addVote(ctx, vote, peerID)
if err != nil { if err != nil {
// If the vote height is off, we'll just ignore it, // If the vote height is off, we'll just ignore it,
// But if it's a conflicting sig, add it to the cs.evpool. // But if it's a conflicting sig, add it to the cs.evpool.
@ -2010,7 +2020,11 @@ func (cs *State) tryAddVote(vote *types.Vote, peerID types.NodeID) (bool, error)
return added, nil return added, nil
} }
func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err error) {
func (cs *State) addVote(
ctx context.Context,
vote *types.Vote,
peerID types.NodeID,
) (added bool, err error) {
cs.Logger.Debug( cs.Logger.Debug(
"adding vote", "adding vote",
"vote_height", vote.Height, "vote_height", vote.Height,
@ -2034,7 +2048,7 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err
} }
cs.Logger.Debug("added vote to last precommits", "last_commit", cs.LastCommit.StringShort()) cs.Logger.Debug("added vote to last precommits", "last_commit", cs.LastCommit.StringShort())
if err := cs.eventBus.PublishEventVote(types.EventDataVote{Vote: vote}); err != nil {
if err := cs.eventBus.PublishEventVote(ctx, types.EventDataVote{Vote: vote}); err != nil {
return added, err return added, err
} }
@ -2044,7 +2058,7 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err
if cs.config.SkipTimeoutCommit && cs.LastCommit.HasAll() { if cs.config.SkipTimeoutCommit && cs.LastCommit.HasAll() {
// go straight to new round (skip timeout commit) // go straight to new round (skip timeout commit)
// cs.scheduleTimeout(time.Duration(0), cs.Height, 0, cstypes.RoundStepNewHeight) // cs.scheduleTimeout(time.Duration(0), cs.Height, 0, cstypes.RoundStepNewHeight)
cs.enterNewRound(cs.Height, 0)
cs.enterNewRound(ctx, cs.Height, 0)
} }
return return
@ -2064,7 +2078,7 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err
return return
} }
if err := cs.eventBus.PublishEventVote(types.EventDataVote{Vote: vote}); err != nil {
if err := cs.eventBus.PublishEventVote(ctx, types.EventDataVote{Vote: vote}); err != nil {
return added, err return added, err
} }
cs.evsw.FireEvent(types.EventVoteValue, vote) cs.evsw.FireEvent(types.EventVoteValue, vote)
@ -2093,7 +2107,7 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err
cs.LockedBlock = nil cs.LockedBlock = nil
cs.LockedBlockParts = nil cs.LockedBlockParts = nil
if err := cs.eventBus.PublishEventUnlock(cs.RoundStateEvent()); err != nil {
if err := cs.eventBus.PublishEventUnlock(ctx, cs.RoundStateEvent()); err != nil {
return added, err return added, err
} }
} }
@ -2122,7 +2136,7 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err
} }
cs.evsw.FireEvent(types.EventValidBlockValue, &cs.RoundState) cs.evsw.FireEvent(types.EventValidBlockValue, &cs.RoundState)
if err := cs.eventBus.PublishEventValidBlock(cs.RoundStateEvent()); err != nil {
if err := cs.eventBus.PublishEventValidBlock(ctx, cs.RoundStateEvent()); err != nil {
return added, err return added, err
} }
} }
@ -2132,20 +2146,20 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err
switch { switch {
case cs.Round < vote.Round && prevotes.HasTwoThirdsAny(): case cs.Round < vote.Round && prevotes.HasTwoThirdsAny():
// Round-skip if there is any 2/3+ of votes ahead of us // Round-skip if there is any 2/3+ of votes ahead of us
cs.enterNewRound(height, vote.Round)
cs.enterNewRound(ctx, height, vote.Round)
case cs.Round == vote.Round && cstypes.RoundStepPrevote <= cs.Step: // current round case cs.Round == vote.Round && cstypes.RoundStepPrevote <= cs.Step: // current round
blockID, ok := prevotes.TwoThirdsMajority() blockID, ok := prevotes.TwoThirdsMajority()
if ok && (cs.isProposalComplete() || len(blockID.Hash) == 0) { if ok && (cs.isProposalComplete() || len(blockID.Hash) == 0) {
cs.enterPrecommit(height, vote.Round)
cs.enterPrecommit(ctx, height, vote.Round)
} else if prevotes.HasTwoThirdsAny() { } else if prevotes.HasTwoThirdsAny() {
cs.enterPrevoteWait(height, vote.Round)
cs.enterPrevoteWait(ctx, height, vote.Round)
} }
case cs.Proposal != nil && 0 <= cs.Proposal.POLRound && cs.Proposal.POLRound == vote.Round: case cs.Proposal != nil && 0 <= cs.Proposal.POLRound && cs.Proposal.POLRound == vote.Round:
// If the proposal is now complete, enter prevote of cs.Round. // If the proposal is now complete, enter prevote of cs.Round.
if cs.isProposalComplete() { if cs.isProposalComplete() {
cs.enterPrevote(height, cs.Round)
cs.enterPrevote(ctx, height, cs.Round)
} }
} }
@ -2161,20 +2175,20 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err
blockID, ok := precommits.TwoThirdsMajority() blockID, ok := precommits.TwoThirdsMajority()
if ok { if ok {
// Executed as TwoThirdsMajority could be from a higher round // Executed as TwoThirdsMajority could be from a higher round
cs.enterNewRound(height, vote.Round)
cs.enterPrecommit(height, vote.Round)
cs.enterNewRound(ctx, height, vote.Round)
cs.enterPrecommit(ctx, height, vote.Round)
if len(blockID.Hash) != 0 { if len(blockID.Hash) != 0 {
cs.enterCommit(height, vote.Round)
cs.enterCommit(ctx, height, vote.Round)
if cs.config.SkipTimeoutCommit && precommits.HasAll() { if cs.config.SkipTimeoutCommit && precommits.HasAll() {
cs.enterNewRound(cs.Height, 0)
cs.enterNewRound(ctx, cs.Height, 0)
} }
} else { } else {
cs.enterPrecommitWait(height, vote.Round)
cs.enterPrecommitWait(ctx, height, vote.Round)
} }
} else if cs.Round <= vote.Round && precommits.HasTwoThirdsAny() { } else if cs.Round <= vote.Round && precommits.HasTwoThirdsAny() {
cs.enterNewRound(height, vote.Round)
cs.enterPrecommitWait(height, vote.Round)
cs.enterNewRound(ctx, height, vote.Round)
cs.enterPrecommitWait(ctx, height, vote.Round)
} }
default: default:


+ 11
- 11
internal/consensus/state_test.go View File

@ -191,7 +191,7 @@ func TestStateEnterProposeYesPrivValidator(t *testing.T) {
timeoutCh := subscribe(ctx, t, cs.eventBus, types.EventQueryTimeoutPropose) timeoutCh := subscribe(ctx, t, cs.eventBus, types.EventQueryTimeoutPropose)
proposalCh := subscribe(ctx, t, cs.eventBus, types.EventQueryCompleteProposal) proposalCh := subscribe(ctx, t, cs.eventBus, types.EventQueryCompleteProposal)
cs.enterNewRound(height, round)
cs.enterNewRound(ctx, height, round)
cs.startRoutines(ctx, 3) cs.startRoutines(ctx, 3)
ensureNewProposal(proposalCh, height, round) ensureNewProposal(proposalCh, height, round)
@ -399,7 +399,7 @@ func TestStateFullRoundNil(t *testing.T) {
voteCh := subscribe(ctx, t, cs.eventBus, types.EventQueryVote) voteCh := subscribe(ctx, t, cs.eventBus, types.EventQueryVote)
cs.enterPrevote(height, round)
cs.enterPrevote(ctx, height, round)
cs.startRoutines(ctx, 4) cs.startRoutines(ctx, 4)
ensurePrevote(voteCh, height, round) // prevote ensurePrevote(voteCh, height, round) // prevote
@ -479,7 +479,7 @@ func TestStateLockNoPOL(t *testing.T) {
*/ */
// start round and wait for prevote // start round and wait for prevote
cs1.enterNewRound(height, round)
cs1.enterNewRound(ctx, height, round)
cs1.startRoutines(ctx, 0) cs1.startRoutines(ctx, 0)
ensureNewRound(newRoundCh, height, round) ensureNewRound(newRoundCh, height, round)
@ -1986,26 +1986,26 @@ func TestStateOutputsBlockPartsStats(t *testing.T) {
} }
cs.ProposalBlockParts = types.NewPartSetFromHeader(parts.Header()) cs.ProposalBlockParts = types.NewPartSetFromHeader(parts.Header())
cs.handleMsg(msgInfo{msg, peerID})
cs.handleMsg(ctx, msgInfo{msg, peerID})
statsMessage := <-cs.statsMsgQueue statsMessage := <-cs.statsMsgQueue
require.Equal(t, msg, statsMessage.Msg, "") require.Equal(t, msg, statsMessage.Msg, "")
require.Equal(t, peerID, statsMessage.PeerID, "") require.Equal(t, peerID, statsMessage.PeerID, "")
// sending the same part from different peer // sending the same part from different peer
cs.handleMsg(msgInfo{msg, "peer2"})
cs.handleMsg(ctx, msgInfo{msg, "peer2"})
// sending the part with the same height, but different round // sending the part with the same height, but different round
msg.Round = 1 msg.Round = 1
cs.handleMsg(msgInfo{msg, peerID})
cs.handleMsg(ctx, msgInfo{msg, peerID})
// sending the part from the smaller height // sending the part from the smaller height
msg.Height = 0 msg.Height = 0
cs.handleMsg(msgInfo{msg, peerID})
cs.handleMsg(ctx, msgInfo{msg, peerID})
// sending the part from the bigger height // sending the part from the bigger height
msg.Height = 3 msg.Height = 3
cs.handleMsg(msgInfo{msg, peerID})
cs.handleMsg(ctx, msgInfo{msg, peerID})
select { select {
case <-cs.statsMsgQueue: case <-cs.statsMsgQueue:
@ -2031,20 +2031,20 @@ func TestStateOutputVoteStats(t *testing.T) {
vote := signVote(ctx, vss[1], config, tmproto.PrecommitType, randBytes, types.PartSetHeader{}) vote := signVote(ctx, vss[1], config, tmproto.PrecommitType, randBytes, types.PartSetHeader{})
voteMessage := &VoteMessage{vote} voteMessage := &VoteMessage{vote}
cs.handleMsg(msgInfo{voteMessage, peerID})
cs.handleMsg(ctx, msgInfo{voteMessage, peerID})
statsMessage := <-cs.statsMsgQueue statsMessage := <-cs.statsMsgQueue
require.Equal(t, voteMessage, statsMessage.Msg, "") require.Equal(t, voteMessage, statsMessage.Msg, "")
require.Equal(t, peerID, statsMessage.PeerID, "") require.Equal(t, peerID, statsMessage.PeerID, "")
// sending the same part from different peer // sending the same part from different peer
cs.handleMsg(msgInfo{&VoteMessage{vote}, "peer2"})
cs.handleMsg(ctx, msgInfo{&VoteMessage{vote}, "peer2"})
// sending the vote for the bigger height // sending the vote for the bigger height
incrementHeight(vss[1]) incrementHeight(vss[1])
vote = signVote(ctx, vss[1], config, tmproto.PrecommitType, randBytes, types.PartSetHeader{}) vote = signVote(ctx, vss[1], config, tmproto.PrecommitType, randBytes, types.PartSetHeader{})
cs.handleMsg(msgInfo{&VoteMessage{vote}, peerID})
cs.handleMsg(ctx, msgInfo{&VoteMessage{vote}, peerID})
select { select {
case <-cs.statsMsgQueue: case <-cs.statsMsgQueue:


+ 1
- 1
internal/consensus/wal_generator.go View File

@ -80,7 +80,7 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, wr io.Writer, numBloc
mempool := emptyMempool{} mempool := emptyMempool{}
evpool := sm.EmptyEvidencePool{} evpool := sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore) blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
consensusState := NewState(logger, cfg.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
consensusState := NewState(ctx, logger, cfg.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
consensusState.SetEventBus(eventBus) consensusState.SetEventBus(eventBus)
if privValidator != nil && privValidator != (*privval.FilePV)(nil) { if privValidator != nil && privValidator != (*privval.FilePV)(nil) {
consensusState.SetPrivValidator(privValidator) consensusState.SetPrivValidator(privValidator)


+ 40
- 47
internal/eventbus/event_bus.go View File

@ -82,10 +82,7 @@ func (b *EventBus) Observe(ctx context.Context, observe func(tmpubsub.Message) e
return b.pubsub.Observe(ctx, observe, queries...) return b.pubsub.Observe(ctx, observe, queries...)
} }
func (b *EventBus) Publish(eventValue string, eventData types.TMEventData) error {
// no explicit deadline for publishing events
ctx := context.Background()
func (b *EventBus) Publish(ctx context.Context, eventValue string, eventData types.TMEventData) error {
tokens := strings.Split(types.EventTypeKey, ".") tokens := strings.Split(types.EventTypeKey, ".")
event := abci.Event{ event := abci.Event{
Type: tokens[0], Type: tokens[0],
@ -100,9 +97,7 @@ func (b *EventBus) Publish(eventValue string, eventData types.TMEventData) error
return b.pubsub.PublishWithEvents(ctx, eventData, []abci.Event{event}) return b.pubsub.PublishWithEvents(ctx, eventData, []abci.Event{event})
} }
func (b *EventBus) PublishEventNewBlock(data types.EventDataNewBlock) error {
// no explicit deadline for publishing events
ctx := context.Background()
func (b *EventBus) PublishEventNewBlock(ctx context.Context, data types.EventDataNewBlock) error {
events := append(data.ResultBeginBlock.Events, data.ResultEndBlock.Events...) events := append(data.ResultBeginBlock.Events, data.ResultEndBlock.Events...)
// add Tendermint-reserved new block event // add Tendermint-reserved new block event
@ -111,9 +106,9 @@ func (b *EventBus) PublishEventNewBlock(data types.EventDataNewBlock) error {
return b.pubsub.PublishWithEvents(ctx, data, events) return b.pubsub.PublishWithEvents(ctx, data, events)
} }
func (b *EventBus) PublishEventNewBlockHeader(data types.EventDataNewBlockHeader) error {
func (b *EventBus) PublishEventNewBlockHeader(ctx context.Context, data types.EventDataNewBlockHeader) error {
// no explicit deadline for publishing events // no explicit deadline for publishing events
ctx := context.Background()
events := append(data.ResultBeginBlock.Events, data.ResultEndBlock.Events...) events := append(data.ResultBeginBlock.Events, data.ResultEndBlock.Events...)
// add Tendermint-reserved new block header event // add Tendermint-reserved new block header event
@ -122,32 +117,30 @@ func (b *EventBus) PublishEventNewBlockHeader(data types.EventDataNewBlockHeader
return b.pubsub.PublishWithEvents(ctx, data, events) return b.pubsub.PublishWithEvents(ctx, data, events)
} }
func (b *EventBus) PublishEventNewEvidence(evidence types.EventDataNewEvidence) error {
return b.Publish(types.EventNewEvidenceValue, evidence)
func (b *EventBus) PublishEventNewEvidence(ctx context.Context, evidence types.EventDataNewEvidence) error {
return b.Publish(ctx, types.EventNewEvidenceValue, evidence)
} }
func (b *EventBus) PublishEventVote(data types.EventDataVote) error {
return b.Publish(types.EventVoteValue, data)
func (b *EventBus) PublishEventVote(ctx context.Context, data types.EventDataVote) error {
return b.Publish(ctx, types.EventVoteValue, data)
} }
func (b *EventBus) PublishEventValidBlock(data types.EventDataRoundState) error {
return b.Publish(types.EventValidBlockValue, data)
func (b *EventBus) PublishEventValidBlock(ctx context.Context, data types.EventDataRoundState) error {
return b.Publish(ctx, types.EventValidBlockValue, data)
} }
func (b *EventBus) PublishEventBlockSyncStatus(data types.EventDataBlockSyncStatus) error {
return b.Publish(types.EventBlockSyncStatusValue, data)
func (b *EventBus) PublishEventBlockSyncStatus(ctx context.Context, data types.EventDataBlockSyncStatus) error {
return b.Publish(ctx, types.EventBlockSyncStatusValue, data)
} }
func (b *EventBus) PublishEventStateSyncStatus(data types.EventDataStateSyncStatus) error {
return b.Publish(types.EventStateSyncStatusValue, data)
func (b *EventBus) PublishEventStateSyncStatus(ctx context.Context, data types.EventDataStateSyncStatus) error {
return b.Publish(ctx, types.EventStateSyncStatusValue, data)
} }
// PublishEventTx publishes tx event with events from Result. Note it will add // PublishEventTx publishes tx event with events from Result. Note it will add
// predefined keys (EventTypeKey, TxHashKey). Existing events with the same keys // predefined keys (EventTypeKey, TxHashKey). Existing events with the same keys
// will be overwritten. // will be overwritten.
func (b *EventBus) PublishEventTx(data types.EventDataTx) error {
// no explicit deadline for publishing events
ctx := context.Background()
func (b *EventBus) PublishEventTx(ctx context.Context, data types.EventDataTx) error {
events := data.Result.Events events := data.Result.Events
// add Tendermint-reserved events // add Tendermint-reserved events
@ -178,44 +171,44 @@ func (b *EventBus) PublishEventTx(data types.EventDataTx) error {
return b.pubsub.PublishWithEvents(ctx, data, events) return b.pubsub.PublishWithEvents(ctx, data, events)
} }
func (b *EventBus) PublishEventNewRoundStep(data types.EventDataRoundState) error {
return b.Publish(types.EventNewRoundStepValue, data)
func (b *EventBus) PublishEventNewRoundStep(ctx context.Context, data types.EventDataRoundState) error {
return b.Publish(ctx, types.EventNewRoundStepValue, data)
} }
func (b *EventBus) PublishEventTimeoutPropose(data types.EventDataRoundState) error {
return b.Publish(types.EventTimeoutProposeValue, data)
func (b *EventBus) PublishEventTimeoutPropose(ctx context.Context, data types.EventDataRoundState) error {
return b.Publish(ctx, types.EventTimeoutProposeValue, data)
} }
func (b *EventBus) PublishEventTimeoutWait(data types.EventDataRoundState) error {
return b.Publish(types.EventTimeoutWaitValue, data)
func (b *EventBus) PublishEventTimeoutWait(ctx context.Context, data types.EventDataRoundState) error {
return b.Publish(ctx, types.EventTimeoutWaitValue, data)
} }
func (b *EventBus) PublishEventNewRound(data types.EventDataNewRound) error {
return b.Publish(types.EventNewRoundValue, data)
func (b *EventBus) PublishEventNewRound(ctx context.Context, data types.EventDataNewRound) error {
return b.Publish(ctx, types.EventNewRoundValue, data)
} }
func (b *EventBus) PublishEventCompleteProposal(data types.EventDataCompleteProposal) error {
return b.Publish(types.EventCompleteProposalValue, data)
func (b *EventBus) PublishEventCompleteProposal(ctx context.Context, data types.EventDataCompleteProposal) error {
return b.Publish(ctx, types.EventCompleteProposalValue, data)
} }
func (b *EventBus) PublishEventPolka(data types.EventDataRoundState) error {
return b.Publish(types.EventPolkaValue, data)
func (b *EventBus) PublishEventPolka(ctx context.Context, data types.EventDataRoundState) error {
return b.Publish(ctx, types.EventPolkaValue, data)
} }
func (b *EventBus) PublishEventUnlock(data types.EventDataRoundState) error {
return b.Publish(types.EventUnlockValue, data)
func (b *EventBus) PublishEventUnlock(ctx context.Context, data types.EventDataRoundState) error {
return b.Publish(ctx, types.EventUnlockValue, data)
} }
func (b *EventBus) PublishEventRelock(data types.EventDataRoundState) error {
return b.Publish(types.EventRelockValue, data)
func (b *EventBus) PublishEventRelock(ctx context.Context, data types.EventDataRoundState) error {
return b.Publish(ctx, types.EventRelockValue, data)
} }
func (b *EventBus) PublishEventLock(data types.EventDataRoundState) error {
return b.Publish(types.EventLockValue, data)
func (b *EventBus) PublishEventLock(ctx context.Context, data types.EventDataRoundState) error {
return b.Publish(ctx, types.EventLockValue, data)
} }
func (b *EventBus) PublishEventValidatorSetUpdates(data types.EventDataValidatorSetUpdates) error {
return b.Publish(types.EventValidatorSetUpdatesValue, data)
func (b *EventBus) PublishEventValidatorSetUpdates(ctx context.Context, data types.EventDataValidatorSetUpdates) error {
return b.Publish(ctx, types.EventValidatorSetUpdatesValue, data)
} }
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
@ -223,22 +216,22 @@ func (b *EventBus) PublishEventValidatorSetUpdates(data types.EventDataValidator
// NopEventBus implements a types.BlockEventPublisher that discards all events. // NopEventBus implements a types.BlockEventPublisher that discards all events.
type NopEventBus struct{} type NopEventBus struct{}
func (NopEventBus) PublishEventNewBlock(types.EventDataNewBlock) error {
func (NopEventBus) PublishEventNewBlock(context.Context, types.EventDataNewBlock) error {
return nil return nil
} }
func (NopEventBus) PublishEventNewBlockHeader(types.EventDataNewBlockHeader) error {
func (NopEventBus) PublishEventNewBlockHeader(context.Context, types.EventDataNewBlockHeader) error {
return nil return nil
} }
func (NopEventBus) PublishEventNewEvidence(types.EventDataNewEvidence) error {
func (NopEventBus) PublishEventNewEvidence(context.Context, types.EventDataNewEvidence) error {
return nil return nil
} }
func (NopEventBus) PublishEventTx(types.EventDataTx) error {
func (NopEventBus) PublishEventTx(context.Context, types.EventDataTx) error {
return nil return nil
} }
func (NopEventBus) PublishEventValidatorSetUpdates(types.EventDataValidatorSetUpdates) error {
func (NopEventBus) PublishEventValidatorSetUpdates(context.Context, types.EventDataValidatorSetUpdates) error {
return nil return nil
} }

+ 22
- 22
internal/eventbus/event_bus_test.go View File

@ -55,7 +55,7 @@ func TestEventBusPublishEventTx(t *testing.T) {
assert.Equal(t, result, edt.Result) assert.Equal(t, result, edt.Result)
}() }()
err = eventBus.PublishEventTx(types.EventDataTx{
err = eventBus.PublishEventTx(ctx, types.EventDataTx{
TxResult: abci.TxResult{ TxResult: abci.TxResult{
Height: 1, Height: 1,
Index: 0, Index: 0,
@ -113,7 +113,7 @@ func TestEventBusPublishEventNewBlock(t *testing.T) {
assert.Equal(t, resultEndBlock, edt.ResultEndBlock) assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
}() }()
err = eventBus.PublishEventNewBlock(types.EventDataNewBlock{
err = eventBus.PublishEventNewBlock(ctx, types.EventDataNewBlock{
Block: block, Block: block,
BlockID: blockID, BlockID: blockID,
ResultBeginBlock: resultBeginBlock, ResultBeginBlock: resultBeginBlock,
@ -225,7 +225,7 @@ func TestEventBusPublishEventTxDuplicateKeys(t *testing.T) {
} }
}() }()
assert.NoError(t, eventBus.PublishEventTx(types.EventDataTx{
assert.NoError(t, eventBus.PublishEventTx(ctx, types.EventDataTx{
TxResult: abci.TxResult{ TxResult: abci.TxResult{
Height: 1, Height: 1,
Index: 0, Index: 0,
@ -285,7 +285,7 @@ func TestEventBusPublishEventNewBlockHeader(t *testing.T) {
assert.Equal(t, resultEndBlock, edt.ResultEndBlock) assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
}() }()
err = eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
err = eventBus.PublishEventNewBlockHeader(ctx, types.EventDataNewBlockHeader{
Header: block.Header, Header: block.Header,
ResultBeginBlock: resultBeginBlock, ResultBeginBlock: resultBeginBlock,
ResultEndBlock: resultEndBlock, ResultEndBlock: resultEndBlock,
@ -327,7 +327,7 @@ func TestEventBusPublishEventNewEvidence(t *testing.T) {
assert.Equal(t, int64(4), edt.Height) assert.Equal(t, int64(4), edt.Height)
}() }()
err = eventBus.PublishEventNewEvidence(types.EventDataNewEvidence{
err = eventBus.PublishEventNewEvidence(ctx, types.EventDataNewEvidence{
Evidence: ev, Evidence: ev,
Height: 4, Height: 4,
}) })
@ -371,23 +371,23 @@ func TestEventBusPublish(t *testing.T) {
} }
}() }()
require.NoError(t, eventBus.Publish(types.EventNewBlockHeaderValue,
require.NoError(t, eventBus.Publish(ctx, types.EventNewBlockHeaderValue,
types.EventDataNewBlockHeader{})) types.EventDataNewBlockHeader{}))
require.NoError(t, eventBus.PublishEventNewBlock(types.EventDataNewBlock{}))
require.NoError(t, eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{}))
require.NoError(t, eventBus.PublishEventVote(types.EventDataVote{}))
require.NoError(t, eventBus.PublishEventNewRoundStep(types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventTimeoutPropose(types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventTimeoutWait(types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventNewRound(types.EventDataNewRound{}))
require.NoError(t, eventBus.PublishEventCompleteProposal(types.EventDataCompleteProposal{}))
require.NoError(t, eventBus.PublishEventPolka(types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventUnlock(types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventRelock(types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventLock(types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventValidatorSetUpdates(types.EventDataValidatorSetUpdates{}))
require.NoError(t, eventBus.PublishEventBlockSyncStatus(types.EventDataBlockSyncStatus{}))
require.NoError(t, eventBus.PublishEventStateSyncStatus(types.EventDataStateSyncStatus{}))
require.NoError(t, eventBus.PublishEventNewBlock(ctx, types.EventDataNewBlock{}))
require.NoError(t, eventBus.PublishEventNewBlockHeader(ctx, types.EventDataNewBlockHeader{}))
require.NoError(t, eventBus.PublishEventVote(ctx, types.EventDataVote{}))
require.NoError(t, eventBus.PublishEventNewRoundStep(ctx, types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventTimeoutPropose(ctx, types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventTimeoutWait(ctx, types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventNewRound(ctx, types.EventDataNewRound{}))
require.NoError(t, eventBus.PublishEventCompleteProposal(ctx, types.EventDataCompleteProposal{}))
require.NoError(t, eventBus.PublishEventPolka(ctx, types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventUnlock(ctx, types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventRelock(ctx, types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventLock(ctx, types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventValidatorSetUpdates(ctx, types.EventDataValidatorSetUpdates{}))
require.NoError(t, eventBus.PublishEventBlockSyncStatus(ctx, types.EventDataBlockSyncStatus{}))
require.NoError(t, eventBus.PublishEventStateSyncStatus(ctx, types.EventDataStateSyncStatus{}))
require.GreaterOrEqual(t, <-count, numEventsExpected) require.GreaterOrEqual(t, <-count, numEventsExpected)
} }
@ -473,7 +473,7 @@ func benchmarkEventBus(numClients int, randQueries bool, randEvents bool, b *tes
eventValue = randEventValue() eventValue = randEventValue()
} }
err := eventBus.Publish(eventValue, types.EventDataString("Gamora"))
err := eventBus.Publish(ctx, eventValue, types.EventDataString("Gamora"))
if err != nil { if err != nil {
b.Error(err) b.Error(err)
} }


+ 13
- 8
internal/state/execution.go View File

@ -150,7 +150,10 @@ func (blockExec *BlockExecutor) ValidateBlock(state State, block *types.Block) e
// from outside this package to process and commit an entire block. // from outside this package to process and commit an entire block.
// It takes a blockID to avoid recomputing the parts hash. // It takes a blockID to avoid recomputing the parts hash.
func (blockExec *BlockExecutor) ApplyBlock( func (blockExec *BlockExecutor) ApplyBlock(
state State, blockID types.BlockID, block *types.Block,
ctx context.Context,
state State,
blockID types.BlockID,
block *types.Block,
) (State, error) { ) (State, error) {
// validate the block if we haven't already // validate the block if we haven't already
@ -232,7 +235,7 @@ func (blockExec *BlockExecutor) ApplyBlock(
// Events are fired after everything else. // Events are fired after everything else.
// NOTE: if we crash between Commit and Save, events wont be fired during replay // NOTE: if we crash between Commit and Save, events wont be fired during replay
fireEvents(blockExec.logger, blockExec.eventBus, block, blockID, abciResponses, validatorUpdates)
fireEvents(ctx, blockExec.logger, blockExec.eventBus, block, blockID, abciResponses, validatorUpdates)
return state, nil return state, nil
} }
@ -508,6 +511,7 @@ func updateState(
// Fire TxEvent for every tx. // Fire TxEvent for every tx.
// NOTE: if Tendermint crashes before commit, some or all of these events may be published again. // NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
func fireEvents( func fireEvents(
ctx context.Context,
logger log.Logger, logger log.Logger,
eventBus types.BlockEventPublisher, eventBus types.BlockEventPublisher,
block *types.Block, block *types.Block,
@ -515,7 +519,7 @@ func fireEvents(
abciResponses *tmstate.ABCIResponses, abciResponses *tmstate.ABCIResponses,
validatorUpdates []*types.Validator, validatorUpdates []*types.Validator,
) { ) {
if err := eventBus.PublishEventNewBlock(types.EventDataNewBlock{
if err := eventBus.PublishEventNewBlock(ctx, types.EventDataNewBlock{
Block: block, Block: block,
BlockID: blockID, BlockID: blockID,
ResultBeginBlock: *abciResponses.BeginBlock, ResultBeginBlock: *abciResponses.BeginBlock,
@ -524,7 +528,7 @@ func fireEvents(
logger.Error("failed publishing new block", "err", err) logger.Error("failed publishing new block", "err", err)
} }
if err := eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
if err := eventBus.PublishEventNewBlockHeader(ctx, types.EventDataNewBlockHeader{
Header: block.Header, Header: block.Header,
NumTxs: int64(len(block.Txs)), NumTxs: int64(len(block.Txs)),
ResultBeginBlock: *abciResponses.BeginBlock, ResultBeginBlock: *abciResponses.BeginBlock,
@ -535,7 +539,7 @@ func fireEvents(
if len(block.Evidence.Evidence) != 0 { if len(block.Evidence.Evidence) != 0 {
for _, ev := range block.Evidence.Evidence { for _, ev := range block.Evidence.Evidence {
if err := eventBus.PublishEventNewEvidence(types.EventDataNewEvidence{
if err := eventBus.PublishEventNewEvidence(ctx, types.EventDataNewEvidence{
Evidence: ev, Evidence: ev,
Height: block.Height, Height: block.Height,
}); err != nil { }); err != nil {
@ -545,7 +549,7 @@ func fireEvents(
} }
for i, tx := range block.Data.Txs { for i, tx := range block.Data.Txs {
if err := eventBus.PublishEventTx(types.EventDataTx{TxResult: abci.TxResult{
if err := eventBus.PublishEventTx(ctx, types.EventDataTx{TxResult: abci.TxResult{
Height: block.Height, Height: block.Height,
Index: uint32(i), Index: uint32(i),
Tx: tx, Tx: tx,
@ -556,7 +560,7 @@ func fireEvents(
} }
if len(validatorUpdates) > 0 { if len(validatorUpdates) > 0 {
if err := eventBus.PublishEventValidatorSetUpdates(
if err := eventBus.PublishEventValidatorSetUpdates(ctx,
types.EventDataValidatorSetUpdates{ValidatorUpdates: validatorUpdates}); err != nil { types.EventDataValidatorSetUpdates{ValidatorUpdates: validatorUpdates}); err != nil {
logger.Error("failed publishing event", "err", err) logger.Error("failed publishing event", "err", err)
} }
@ -569,6 +573,7 @@ func fireEvents(
// ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state. // ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state.
// It returns the application root hash (result of abci.Commit). // It returns the application root hash (result of abci.Commit).
func ExecCommitBlock( func ExecCommitBlock(
ctx context.Context,
be *BlockExecutor, be *BlockExecutor,
appConnConsensus proxy.AppConnConsensus, appConnConsensus proxy.AppConnConsensus,
block *types.Block, block *types.Block,
@ -598,7 +603,7 @@ func ExecCommitBlock(
} }
blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: block.MakePartSet(types.BlockPartSizeBytes).Header()} blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: block.MakePartSet(types.BlockPartSizeBytes).Header()}
fireEvents(be.logger, be.eventBus, block, blockID, abciResponses, validatorUpdates)
fireEvents(ctx, be.logger, be.eventBus, block, blockID, abciResponses, validatorUpdates)
} }
// Commit block, get hash back // Commit block, get hash back


+ 5
- 5
internal/state/execution_test.go View File

@ -56,7 +56,7 @@ func TestApplyBlock(t *testing.T) {
block := sf.MakeBlock(state, 1, new(types.Commit)) block := sf.MakeBlock(state, 1, new(types.Commit))
blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: block.MakePartSet(testPartSize).Header()} blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: block.MakePartSet(testPartSize).Header()}
state, err = blockExec.ApplyBlock(state, blockID, block)
state, err = blockExec.ApplyBlock(ctx, state, blockID, block)
require.Nil(t, err) require.Nil(t, err)
// TODO check state and mempool // TODO check state and mempool
@ -111,7 +111,7 @@ func TestBeginBlockValidators(t *testing.T) {
// block for height 2 // block for height 2
block := sf.MakeBlock(state, 2, lastCommit) block := sf.MakeBlock(state, 2, lastCommit)
_, err = sm.ExecCommitBlock(nil, proxyApp.Consensus(), block, log.TestingLogger(), stateStore, 1, state)
_, err = sm.ExecCommitBlock(ctx, nil, proxyApp.Consensus(), block, log.TestingLogger(), stateStore, 1, state)
require.Nil(t, err, tc.desc) require.Nil(t, err, tc.desc)
// -> app receives a list of validators with a bool indicating if they signed // -> app receives a list of validators with a bool indicating if they signed
@ -219,7 +219,7 @@ func TestBeginBlockByzantineValidators(t *testing.T) {
block.Header.EvidenceHash = block.Evidence.Hash() block.Header.EvidenceHash = block.Evidence.Hash()
blockID = types.BlockID{Hash: block.Hash(), PartSetHeader: block.MakePartSet(testPartSize).Header()} blockID = types.BlockID{Hash: block.Hash(), PartSetHeader: block.MakePartSet(testPartSize).Header()}
_, err = blockExec.ApplyBlock(state, blockID, block)
_, err = blockExec.ApplyBlock(ctx, state, blockID, block)
require.Nil(t, err) require.Nil(t, err)
// TODO check state and mempool // TODO check state and mempool
@ -404,7 +404,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
{PubKey: pk, Power: 10}, {PubKey: pk, Power: 10},
} }
state, err = blockExec.ApplyBlock(state, blockID, block)
state, err = blockExec.ApplyBlock(ctx, state, blockID, block)
require.Nil(t, err) require.Nil(t, err)
// test new validator was added to NextValidators // test new validator was added to NextValidators
if assert.Equal(t, state.Validators.Size()+1, state.NextValidators.Size()) { if assert.Equal(t, state.Validators.Size()+1, state.NextValidators.Size()) {
@ -462,7 +462,7 @@ func TestEndBlockValidatorUpdatesResultingInEmptySet(t *testing.T) {
{PubKey: vp, Power: 0}, {PubKey: vp, Power: 0},
} }
assert.NotPanics(t, func() { state, err = blockExec.ApplyBlock(state, blockID, block) })
assert.NotPanics(t, func() { state, err = blockExec.ApplyBlock(ctx, state, blockID, block) })
assert.NotNil(t, err) assert.NotNil(t, err)
assert.NotEmpty(t, state.NextValidators.Validators) assert.NotEmpty(t, state.NextValidators.Validators)
} }


+ 15
- 5
internal/state/helpers_test.go View File

@ -2,6 +2,7 @@ package state_test
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"time" "time"
@ -36,15 +37,17 @@ func newTestApp() proxy.AppConns {
} }
func makeAndCommitGoodBlock( func makeAndCommitGoodBlock(
ctx context.Context,
state sm.State, state sm.State,
height int64, height int64,
lastCommit *types.Commit, lastCommit *types.Commit,
proposerAddr []byte, proposerAddr []byte,
blockExec *sm.BlockExecutor, blockExec *sm.BlockExecutor,
privVals map[string]types.PrivValidator, privVals map[string]types.PrivValidator,
evidence []types.Evidence) (sm.State, types.BlockID, *types.Commit, error) {
evidence []types.Evidence,
) (sm.State, types.BlockID, *types.Commit, error) {
// A good block passes // A good block passes
state, blockID, err := makeAndApplyGoodBlock(state, height, lastCommit, proposerAddr, blockExec, evidence)
state, blockID, err := makeAndApplyGoodBlock(ctx, state, height, lastCommit, proposerAddr, blockExec, evidence)
if err != nil { if err != nil {
return state, types.BlockID{}, nil, err return state, types.BlockID{}, nil, err
} }
@ -57,15 +60,22 @@ func makeAndCommitGoodBlock(
return state, blockID, commit, nil return state, blockID, commit, nil
} }
func makeAndApplyGoodBlock(state sm.State, height int64, lastCommit *types.Commit, proposerAddr []byte,
blockExec *sm.BlockExecutor, evidence []types.Evidence) (sm.State, types.BlockID, error) {
func makeAndApplyGoodBlock(
ctx context.Context,
state sm.State,
height int64,
lastCommit *types.Commit,
proposerAddr []byte,
blockExec *sm.BlockExecutor,
evidence []types.Evidence,
) (sm.State, types.BlockID, error) {
block, _ := state.MakeBlock(height, factory.MakeTenTxs(height), lastCommit, evidence, proposerAddr) block, _ := state.MakeBlock(height, factory.MakeTenTxs(height), lastCommit, evidence, proposerAddr)
if err := blockExec.ValidateBlock(state, block); err != nil { if err := blockExec.ValidateBlock(state, block); err != nil {
return state, types.BlockID{}, err return state, types.BlockID{}, err
} }
blockID := types.BlockID{Hash: block.Hash(), blockID := types.BlockID{Hash: block.Hash(),
PartSetHeader: types.PartSetHeader{Total: 3, Hash: tmrand.Bytes(32)}} PartSetHeader: types.PartSetHeader{Total: 3, Hash: tmrand.Bytes(32)}}
state, err := blockExec.ApplyBlock(state, blockID, block)
state, err := blockExec.ApplyBlock(ctx, state, blockID, block)
if err != nil { if err != nil {
return state, types.BlockID{}, err return state, types.BlockID{}, err
} }


+ 3
- 3
internal/state/indexer/indexer_service_test.go View File

@ -75,7 +75,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
t.Cleanup(service.Wait) t.Cleanup(service.Wait)
// publish block with txs // publish block with txs
err = eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
err = eventBus.PublishEventNewBlockHeader(ctx, types.EventDataNewBlockHeader{
Header: types.Header{Height: 1}, Header: types.Header{Height: 1},
NumTxs: int64(2), NumTxs: int64(2),
}) })
@ -86,7 +86,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
Tx: types.Tx("foo"), Tx: types.Tx("foo"),
Result: abci.ResponseDeliverTx{Code: 0}, Result: abci.ResponseDeliverTx{Code: 0},
} }
err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult1})
err = eventBus.PublishEventTx(ctx, types.EventDataTx{TxResult: *txResult1})
require.NoError(t, err) require.NoError(t, err)
txResult2 := &abci.TxResult{ txResult2 := &abci.TxResult{
Height: 1, Height: 1,
@ -94,7 +94,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
Tx: types.Tx("bar"), Tx: types.Tx("bar"),
Result: abci.ResponseDeliverTx{Code: 0}, Result: abci.ResponseDeliverTx{Code: 0},
} }
err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult2})
err = eventBus.PublishEventTx(ctx, types.EventDataTx{TxResult: *txResult2})
require.NoError(t, err) require.NoError(t, err)
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)


+ 3
- 1
internal/state/validation_test.go View File

@ -103,7 +103,7 @@ func TestValidateBlockHeader(t *testing.T) {
A good block passes A good block passes
*/ */
var err error var err error
state, _, lastCommit, err = makeAndCommitGoodBlock(
state, _, lastCommit, err = makeAndCommitGoodBlock(ctx,
state, height, lastCommit, state.Validators.GetProposer().Address, blockExec, privVals, nil) state, height, lastCommit, state.Validators.GetProposer().Address, blockExec, privVals, nil)
require.NoError(t, err, "height %d", height) require.NoError(t, err, "height %d", height)
} }
@ -186,6 +186,7 @@ func TestValidateBlockCommit(t *testing.T) {
var err error var err error
var blockID types.BlockID var blockID types.BlockID
state, blockID, lastCommit, err = makeAndCommitGoodBlock( state, blockID, lastCommit, err = makeAndCommitGoodBlock(
ctx,
state, state,
height, height,
lastCommit, lastCommit,
@ -310,6 +311,7 @@ func TestValidateBlockEvidence(t *testing.T) {
var err error var err error
state, _, lastCommit, err = makeAndCommitGoodBlock( state, _, lastCommit, err = makeAndCommitGoodBlock(
ctx,
state, state,
height, height,
lastCommit, lastCommit,


+ 4
- 4
node/node.go View File

@ -313,7 +313,7 @@ func makeNode(
sm.BlockExecutorWithMetrics(nodeMetrics.state), sm.BlockExecutorWithMetrics(nodeMetrics.state),
) )
csReactor, csState, err := createConsensusReactor(
csReactor, csState, err := createConsensusReactor(ctx,
cfg, state, blockExec, blockStore, mp, evPool, cfg, state, blockExec, blockStore, mp, evPool,
privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus, privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus,
peerManager, router, logger, peerManager, router, logger,
@ -599,7 +599,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
// At the beginning of the statesync start, we use the initialHeight as the event height // At the beginning of the statesync start, we use the initialHeight as the event height
// because of the statesync doesn't have the concreate state height before fetched the snapshot. // because of the statesync doesn't have the concreate state height before fetched the snapshot.
d := types.EventDataStateSyncStatus{Complete: false, Height: state.InitialHeight} d := types.EventDataStateSyncStatus{Complete: false, Height: state.InitialHeight}
if err := n.eventBus.PublishEventStateSyncStatus(d); err != nil {
if err := n.eventBus.PublishEventStateSyncStatus(ctx, d); err != nil {
n.eventBus.Logger.Error("failed to emit the statesync start event", "err", err) n.eventBus.Logger.Error("failed to emit the statesync start event", "err", err)
} }
@ -619,7 +619,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
n.consensusReactor.SetStateSyncingMetrics(0) n.consensusReactor.SetStateSyncingMetrics(0)
if err := n.eventBus.PublishEventStateSyncStatus(
if err := n.eventBus.PublishEventStateSyncStatus(ctx,
types.EventDataStateSyncStatus{ types.EventDataStateSyncStatus{
Complete: true, Complete: true,
Height: state.LastBlockHeight, Height: state.LastBlockHeight,
@ -638,7 +638,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
return return
} }
if err := n.eventBus.PublishEventBlockSyncStatus(
if err := n.eventBus.PublishEventBlockSyncStatus(ctx,
types.EventDataBlockSyncStatus{ types.EventDataBlockSyncStatus{
Complete: false, Complete: false,
Height: state.LastBlockHeight, Height: state.LastBlockHeight,


+ 2
- 1
node/setup.go View File

@ -302,6 +302,7 @@ func createBlockchainReactor(
} }
func createConsensusReactor( func createConsensusReactor(
ctx context.Context,
cfg *config.Config, cfg *config.Config,
state sm.State, state sm.State,
blockExec *sm.BlockExecutor, blockExec *sm.BlockExecutor,
@ -318,7 +319,7 @@ func createConsensusReactor(
) (*consensus.Reactor, *consensus.State, error) { ) (*consensus.Reactor, *consensus.State, error) {
logger = logger.With("module", "consensus") logger = logger.With("module", "consensus")
consensusState := consensus.NewState(
consensusState := consensus.NewState(ctx,
logger, logger,
cfg.Consensus, cfg.Consensus,
state.Copy(), state.Copy(),


+ 7
- 6
types/events.go View File

@ -1,6 +1,7 @@
package types package types
import ( import (
"context"
"fmt" "fmt"
"strings" "strings"
@ -241,13 +242,13 @@ func QueryForEvent(eventValue string) tmpubsub.Query {
// BlockEventPublisher publishes all block related events // BlockEventPublisher publishes all block related events
type BlockEventPublisher interface { type BlockEventPublisher interface {
PublishEventNewBlock(block EventDataNewBlock) error
PublishEventNewBlockHeader(header EventDataNewBlockHeader) error
PublishEventNewEvidence(evidence EventDataNewEvidence) error
PublishEventTx(EventDataTx) error
PublishEventValidatorSetUpdates(EventDataValidatorSetUpdates) error
PublishEventNewBlock(ctx context.Context, block EventDataNewBlock) error
PublishEventNewBlockHeader(ctx context.Context, header EventDataNewBlockHeader) error
PublishEventNewEvidence(ctx context.Context, evidence EventDataNewEvidence) error
PublishEventTx(context.Context, EventDataTx) error
PublishEventValidatorSetUpdates(context.Context, EventDataValidatorSetUpdates) error
} }
type TxEventPublisher interface { type TxEventPublisher interface {
PublishEventTx(EventDataTx) error
PublishEventTx(context.Context, EventDataTx) error
} }

Loading…
Cancel
Save