diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index a6845b719..ac5d45fb7 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -580,7 +580,7 @@ FOR_LOOP: // TODO: Same thing for app - but we would need a way to get the hash // without persisting the state. - state, err = r.blockExec.ApplyBlock(state, firstID, first) + state, err = r.blockExec.ApplyBlock(ctx, state, firstID, first) if err != nil { // TODO: This is bad, are we zombie? panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go index 5792d9e78..c5f76066a 100644 --- a/internal/blocksync/reactor_test.go +++ b/internal/blocksync/reactor_test.go @@ -158,7 +158,7 @@ func (rts *reactorTestSuite) addNode( thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes) blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()} - state, err = blockExec.ApplyBlock(state, blockID, thisBlock) + state, err = blockExec.ApplyBlock(ctx, state, blockID, thisBlock) require.NoError(t, err) blockStore.SaveBlock(thisBlock, thisParts, lastCommit) diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go index d9e5e46b8..4c9ccbb2e 100644 --- a/internal/consensus/byzantine_test.go +++ b/internal/consensus/byzantine_test.go @@ -90,7 +90,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { // Make State blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore) - cs := NewState(logger, thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) + cs := NewState(ctx, logger, thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) // set private validator pv := privVals[i] cs.SetPrivValidator(pv) diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go index bcbbe7c88..30729c038 100644 --- a/internal/consensus/common_test.go +++ b/internal/consensus/common_test.go @@ -224,7 +224,7 @@ func (vss ValidatorStubsByPower) Swap(i, j int) { // Functions for transitioning the consensus state func startTestRound(ctx context.Context, cs *State, height int64, round int32) { - cs.enterNewRound(height, round) + cs.enterNewRound(ctx, height, round) cs.startRoutines(ctx, 0) } @@ -467,7 +467,15 @@ func newStateWithConfigAndBlockStore( } blockExec := sm.NewBlockExecutor(stateStore, logger, proxyAppConnCon, mempool, evpool, blockStore) - cs := NewState(logger.With("module", "consensus"), thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) + cs := NewState(ctx, + logger.With("module", "consensus"), + thisConfig.Consensus, + state, + blockExec, + blockStore, + mempool, + evpool, + ) cs.SetPrivValidator(pv) eventBus := eventbus.NewDefault(logger.With("module", "events")) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 03865d13d..7e46444b9 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -275,7 +275,7 @@ func (r *Reactor) SwitchToConsensus(ctx context.Context, state sm.State, skipWAL // NOTE: The line below causes broadcastNewRoundStepRoutine() to broadcast a // NewRoundStepMessage. - r.state.updateToState(state) + r.state.updateToState(ctx, state) r.mtx.Lock() r.waitSync = false @@ -299,7 +299,7 @@ conR: } d := types.EventDataBlockSyncStatus{Complete: true, Height: state.LastBlockHeight} - if err := r.eventBus.PublishEventBlockSyncStatus(d); err != nil { + if err := r.eventBus.PublishEventBlockSyncStatus(ctx, d); err != nil { r.Logger.Error("failed to emit the blocksync complete event", "err", err) } } diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index de6465f23..9e7c498cd 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -421,7 +421,7 @@ func TestReactorWithEvidence(t *testing.T) { evpool2 := sm.EmptyEvidencePool{} blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore) - cs := NewState(logger.With("validator", i, "module", "consensus"), + cs := NewState(ctx, logger.With("validator", i, "module", "consensus"), thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2) cs.SetPrivValidator(pv) diff --git a/internal/consensus/replay.go b/internal/consensus/replay.go index f40389f2b..2408b03f1 100644 --- a/internal/consensus/replay.go +++ b/internal/consensus/replay.go @@ -38,7 +38,7 @@ var crc32c = crc32.MakeTable(crc32.Castagnoli) // Unmarshal and apply a single message to the consensus state as if it were // received in receiveRoutine. Lines that start with "#" are ignored. // NOTE: receiveRoutine should not be running. -func (cs *State) readReplayMessage(msg *TimedWALMessage, newStepSub eventbus.Subscription) error { +func (cs *State) readReplayMessage(ctx context.Context, msg *TimedWALMessage, newStepSub eventbus.Subscription) error { // Skip meta messages which exist for demarcating boundaries. if _, ok := msg.Msg.(EndHeightMessage); ok { return nil @@ -81,10 +81,10 @@ func (cs *State) readReplayMessage(msg *TimedWALMessage, newStepSub eventbus.Sub "blockID", v.BlockID, "peer", peerID) } - cs.handleMsg(m) + cs.handleMsg(ctx, m) case timeoutInfo: cs.Logger.Info("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) - cs.handleTimeout(m, cs.RoundState) + cs.handleTimeout(ctx, m, cs.RoundState) default: return fmt.Errorf("replay: Unknown TimedWALMessage type: %v", reflect.TypeOf(msg.Msg)) } @@ -93,7 +93,7 @@ func (cs *State) readReplayMessage(msg *TimedWALMessage, newStepSub eventbus.Sub // Replay only those messages since the last block. `timeoutRoutine` should // run concurrently to read off tickChan. -func (cs *State) catchupReplay(csHeight int64) error { +func (cs *State) catchupReplay(ctx context.Context, csHeight int64) error { // Set replayMode to true so we don't log signing errors. cs.replayMode = true @@ -160,7 +160,7 @@ LOOP: // NOTE: since the priv key is set when the msgs are received // it will attempt to eg double sign but we can just ignore it // since the votes will be replayed and we'll get to the next step - if err := cs.readReplayMessage(msg, nil); err != nil { + if err := cs.readReplayMessage(ctx, msg, nil); err != nil { return err } } @@ -390,7 +390,7 @@ func (h *Handshaker) ReplayBlocks( // Either the app is asking for replay, or we're all synced up. if appBlockHeight < storeBlockHeight { // the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store) - return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false) + return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, false) } else if appBlockHeight == storeBlockHeight { // We're good! @@ -405,7 +405,7 @@ func (h *Handshaker) ReplayBlocks( case appBlockHeight < stateBlockHeight: // the app is further behind than it should be, so replay blocks // but leave the last block to go through the WAL - return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true) + return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, true) case appBlockHeight == stateBlockHeight: // We haven't run Commit (both the state and app are one block behind), @@ -413,7 +413,7 @@ func (h *Handshaker) ReplayBlocks( // NOTE: We could instead use the cs.WAL on cs.Start, // but we'd have to allow the WAL to replay a block that wrote it's #ENDHEIGHT h.logger.Info("Replay last block using real app") - state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus()) + state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp.Consensus()) return state.AppHash, err case appBlockHeight == storeBlockHeight: @@ -424,7 +424,7 @@ func (h *Handshaker) ReplayBlocks( } mockApp := newMockProxyApp(ctx, h.logger, appHash, abciResponses) h.logger.Info("Replay last block using mock app") - state, err = h.replayBlock(state, storeBlockHeight, mockApp) + state, err = h.replayBlock(ctx, state, storeBlockHeight, mockApp) return state.AppHash, err } @@ -435,6 +435,7 @@ func (h *Handshaker) ReplayBlocks( } func (h *Handshaker) replayBlocks( + ctx context.Context, state sm.State, proxyApp proxy.AppConns, appBlockHeight, @@ -474,13 +475,13 @@ func (h *Handshaker) replayBlocks( blockExec := sm.NewBlockExecutor( h.stateStore, h.logger, proxyApp.Consensus(), emptyMempool{}, sm.EmptyEvidencePool{}, h.store) blockExec.SetEventBus(h.eventBus) - appHash, err = sm.ExecCommitBlock( + appHash, err = sm.ExecCommitBlock(ctx, blockExec, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state) if err != nil { return nil, err } } else { - appHash, err = sm.ExecCommitBlock( + appHash, err = sm.ExecCommitBlock(ctx, nil, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state) if err != nil { return nil, err @@ -492,7 +493,7 @@ func (h *Handshaker) replayBlocks( if mutateState { // sync the final block - state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus()) + state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp.Consensus()) if err != nil { return nil, err } @@ -504,7 +505,12 @@ func (h *Handshaker) replayBlocks( } // ApplyBlock on the proxyApp with the last block. -func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.AppConnConsensus) (sm.State, error) { +func (h *Handshaker) replayBlock( + ctx context.Context, + state sm.State, + height int64, + proxyApp proxy.AppConnConsensus, +) (sm.State, error) { block := h.store.LoadBlock(height) meta := h.store.LoadBlockMeta(height) @@ -514,7 +520,7 @@ func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.Ap blockExec.SetEventBus(h.eventBus) var err error - state, err = blockExec.ApplyBlock(state, meta.BlockID, block) + state, err = blockExec.ApplyBlock(ctx, state, meta.BlockID, block) if err != nil { return sm.State{}, err } diff --git a/internal/consensus/replay_file.go b/internal/consensus/replay_file.go index 1de0ffa0e..ef5e88730 100644 --- a/internal/consensus/replay_file.go +++ b/internal/consensus/replay_file.go @@ -104,7 +104,7 @@ func (cs *State) ReplayFile(ctx context.Context, file string, console bool) erro return err } - if err := pb.cs.readReplayMessage(msg, newStepSub); err != nil { + if err := pb.cs.readReplayMessage(ctx, msg, newStepSub); err != nil { return err } @@ -141,13 +141,13 @@ func newPlayback(fileName string, fp *os.File, cs *State, genState sm.State) *pl } // go back count steps by resetting the state and running (pb.count - count) steps -func (pb *playback) replayReset(count int, newStepSub eventbus.Subscription) error { +func (pb *playback) replayReset(ctx context.Context, count int, newStepSub eventbus.Subscription) error { if err := pb.cs.Stop(); err != nil { return err } pb.cs.Wait() - newCS := NewState(pb.cs.Logger, pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec, + newCS := NewState(ctx, pb.cs.Logger, pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec, pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool) newCS.SetEventBus(pb.cs.eventBus) newCS.startForReplay() @@ -173,7 +173,7 @@ func (pb *playback) replayReset(count int, newStepSub eventbus.Subscription) err } else if err != nil { return err } - if err := pb.cs.readReplayMessage(msg, newStepSub); err != nil { + if err := pb.cs.readReplayMessage(ctx, msg, newStepSub); err != nil { return err } pb.count++ @@ -254,7 +254,7 @@ func (pb *playback) replayConsoleLoop() (int, error) { }() if len(tokens) == 1 { - if err := pb.replayReset(1, newStepSub); err != nil { + if err := pb.replayReset(ctx, 1, newStepSub); err != nil { pb.cs.Logger.Error("Replay reset error", "err", err) } } else { @@ -263,7 +263,7 @@ func (pb *playback) replayConsoleLoop() (int, error) { fmt.Println("back takes an integer argument") } else if i > pb.count { fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count) - } else if err := pb.replayReset(i, newStepSub); err != nil { + } else if err := pb.replayReset(ctx, i, newStepSub); err != nil { pb.cs.Logger.Error("Replay reset error", "err", err) } } @@ -359,7 +359,7 @@ func newConsensusStateForReplay( mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{} blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(), mempool, evpool, blockStore) - consensusState := NewState(logger, csConfig, state.Copy(), blockExec, + consensusState := NewState(ctx, logger, csConfig, state.Copy(), blockExec, blockStore, mempool, evpool) consensusState.SetEventBus(eventBus) diff --git a/internal/consensus/replay_test.go b/internal/consensus/replay_test.go index bf1927742..6c33268a7 100644 --- a/internal/consensus/replay_test.go +++ b/internal/consensus/replay_test.go @@ -855,18 +855,21 @@ func testHandshakeReplay( } } -func applyBlock(stateStore sm.Store, +func applyBlock( + ctx context.Context, + stateStore sm.Store, mempool mempool.Mempool, evpool sm.EvidencePool, st sm.State, blk *types.Block, proxyApp proxy.AppConns, - blockStore *mockBlockStore) sm.State { + blockStore *mockBlockStore, +) sm.State { testPartSize := types.BlockPartSizeBytes blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore) blkID := types.BlockID{Hash: blk.Hash(), PartSetHeader: blk.MakePartSet(testPartSize).Header()} - newState, err := blockExec.ApplyBlock(st, blkID, blk) + newState, err := blockExec.ApplyBlock(ctx, st, blkID, blk) if err != nil { panic(err) } @@ -904,18 +907,18 @@ func buildAppStateFromChain( case 0: for i := 0; i < nBlocks; i++ { block := chain[i] - state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp, blockStore) + state = applyBlock(ctx, stateStore, mempool, evpool, state, block, proxyApp, blockStore) } case 1, 2, 3: for i := 0; i < nBlocks-1; i++ { block := chain[i] - state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp, blockStore) + state = applyBlock(ctx, stateStore, mempool, evpool, state, block, proxyApp, blockStore) } if mode == 2 || mode == 3 { // update the kvstore height and apphash // as if we ran commit but not - state = applyBlock(stateStore, mempool, evpool, state, chain[nBlocks-1], proxyApp, blockStore) + state = applyBlock(ctx, stateStore, mempool, evpool, state, chain[nBlocks-1], proxyApp, blockStore) } default: panic(fmt.Sprintf("unknown mode %v", mode)) @@ -961,19 +964,19 @@ func buildTMStateFromChain( case 0: // sync right up for _, block := range chain { - state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp, blockStore) + state = applyBlock(ctx, stateStore, mempool, evpool, state, block, proxyApp, blockStore) } case 1, 2, 3: // sync up to the penultimate as if we stored the block. // whether we commit or not depends on the appHash for _, block := range chain[:len(chain)-1] { - state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp, blockStore) + state = applyBlock(ctx, stateStore, mempool, evpool, state, block, proxyApp, blockStore) } // apply the final block to a state copy so we can // get the right next appHash but keep the state back - applyBlock(stateStore, mempool, evpool, state, chain[len(chain)-1], proxyApp, blockStore) + applyBlock(ctx, stateStore, mempool, evpool, state, chain[len(chain)-1], proxyApp, blockStore) default: panic(fmt.Sprintf("unknown mode %v", mode)) } diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 8cf1a7f9f..2e9f97503 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -153,6 +153,7 @@ type StateOption func(*State) // NewState returns a new State. func NewState( + ctx context.Context, logger log.Logger, cfg *config.ConsensusConfig, state sm.State, @@ -190,7 +191,7 @@ func NewState( cs.reconstructLastCommit(state) } - cs.updateToState(state) + cs.updateToState(ctx, state) // NOTE: we do not call scheduleRound0 yet, we do that upon Start() @@ -345,7 +346,7 @@ func (cs *State) OnStart(ctx context.Context) error { LOOP: for { - err := cs.catchupReplay(cs.Height) + err := cs.catchupReplay(ctx, cs.Height) switch { case err == nil: break LOOP @@ -409,7 +410,7 @@ func (cs *State) OnStart(ctx context.Context) error { } // now start the receiveRoutine - go cs.receiveRoutine(0) + go cs.receiveRoutine(ctx, 0) // schedule the first round! // use GetRoundState so we don't race the receiveRoutine for access @@ -427,7 +428,7 @@ func (cs *State) startRoutines(ctx context.Context, maxSteps int) { return } - go cs.receiveRoutine(maxSteps) + go cs.receiveRoutine(ctx, maxSteps) } // loadWalFile loads WAL data from file. It overwrites cs.wal. @@ -625,7 +626,7 @@ func (cs *State) reconstructLastCommit(state sm.State) { // Updates State and increments height to match that of state. // The round becomes 0 and cs.Step becomes cstypes.RoundStepNewHeight. -func (cs *State) updateToState(state sm.State) { +func (cs *State) updateToState(ctx context.Context, state sm.State) { if cs.CommitRound > -1 && 0 < cs.Height && cs.Height != state.LastBlockHeight { panic(fmt.Sprintf( "updateToState() expected state height of %v but found %v", @@ -660,7 +661,7 @@ func (cs *State) updateToState(state sm.State) { "new_height", state.LastBlockHeight+1, "old_height", cs.state.LastBlockHeight+1, ) - cs.newStep() + cs.newStep(ctx) return } } @@ -729,10 +730,10 @@ func (cs *State) updateToState(state sm.State) { cs.state = state // Finally, broadcast RoundState - cs.newStep() + cs.newStep(ctx) } -func (cs *State) newStep() { +func (cs *State) newStep(ctx context.Context) { rs := cs.RoundStateEvent() if err := cs.wal.Write(rs); err != nil { cs.Logger.Error("failed writing to WAL", "err", err) @@ -742,7 +743,7 @@ func (cs *State) newStep() { // newStep is called by updateToState in NewState before the eventBus is set! if cs.eventBus != nil { - if err := cs.eventBus.PublishEventNewRoundStep(rs); err != nil { + if err := cs.eventBus.PublishEventNewRoundStep(ctx, rs); err != nil { cs.Logger.Error("failed publishing new round step", "err", err) } @@ -758,7 +759,7 @@ func (cs *State) newStep() { // It keeps the RoundState and is the only thing that updates it. // Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities. // State must be locked before any internal state is updated. -func (cs *State) receiveRoutine(maxSteps int) { +func (cs *State) receiveRoutine(ctx context.Context, maxSteps int) { onExit := func(cs *State) { // NOTE: the internalMsgQueue may have signed messages from our // priv_val that haven't hit the WAL, but its ok because @@ -804,7 +805,7 @@ func (cs *State) receiveRoutine(maxSteps int) { select { case <-cs.txNotifier.TxsAvailable(): - cs.handleTxsAvailable() + cs.handleTxsAvailable(ctx) case mi = <-cs.peerMsgQueue: if err := cs.wal.Write(mi); err != nil { @@ -813,7 +814,7 @@ func (cs *State) receiveRoutine(maxSteps int) { // handles proposals, block parts, votes // may generate internal events (votes, complete proposals, 2/3 majorities) - cs.handleMsg(mi) + cs.handleMsg(ctx, mi) case mi = <-cs.internalMsgQueue: err := cs.wal.WriteSync(mi) // NOTE: fsync @@ -833,7 +834,7 @@ func (cs *State) receiveRoutine(maxSteps int) { } // handles proposals, block parts, votes - cs.handleMsg(mi) + cs.handleMsg(ctx, mi) case ti := <-cs.timeoutTicker.Chan(): // tockChan: if err := cs.wal.Write(ti); err != nil { @@ -842,17 +843,18 @@ func (cs *State) receiveRoutine(maxSteps int) { // if the timeout is relevant to the rs // go to the next step - cs.handleTimeout(ti, rs) + cs.handleTimeout(ctx, ti, rs) case <-cs.Quit(): onExit(cs) return } + // TODO should we handle context cancels here? } } // state transitions on complete-proposal, 2/3-any, 2/3-one -func (cs *State) handleMsg(mi msgInfo) { +func (cs *State) handleMsg(ctx context.Context, mi msgInfo) { cs.mtx.Lock() defer cs.mtx.Unlock() @@ -871,7 +873,7 @@ func (cs *State) handleMsg(mi msgInfo) { case *BlockPartMessage: // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit - added, err = cs.addProposalBlockPart(msg, peerID) + added, err = cs.addProposalBlockPart(ctx, msg, peerID) if added { cs.statsMsgQueue <- mi } @@ -889,7 +891,7 @@ func (cs *State) handleMsg(mi msgInfo) { case *VoteMessage: // attempt to add the vote and dupeout the validator if its a duplicate signature // if the vote gives us a 2/3-any or 2/3-one, we transition - added, err = cs.tryAddVote(msg.Vote, peerID) + added, err = cs.tryAddVote(ctx, msg.Vote, peerID) if added { cs.statsMsgQueue <- mi } @@ -926,7 +928,11 @@ func (cs *State) handleMsg(mi msgInfo) { } } -func (cs *State) handleTimeout(ti timeoutInfo, rs cstypes.RoundState) { +func (cs *State) handleTimeout( + ctx context.Context, + ti timeoutInfo, + rs cstypes.RoundState, +) { cs.Logger.Debug("received tock", "timeout", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) // timeouts must be for current height, round, step @@ -943,32 +949,32 @@ func (cs *State) handleTimeout(ti timeoutInfo, rs cstypes.RoundState) { case cstypes.RoundStepNewHeight: // NewRound event fired from enterNewRound. // XXX: should we fire timeout here (for timeout commit)? - cs.enterNewRound(ti.Height, 0) + cs.enterNewRound(ctx, ti.Height, 0) case cstypes.RoundStepNewRound: - cs.enterPropose(ti.Height, 0) + cs.enterPropose(ctx, ti.Height, 0) case cstypes.RoundStepPropose: - if err := cs.eventBus.PublishEventTimeoutPropose(cs.RoundStateEvent()); err != nil { + if err := cs.eventBus.PublishEventTimeoutPropose(ctx, cs.RoundStateEvent()); err != nil { cs.Logger.Error("failed publishing timeout propose", "err", err) } - cs.enterPrevote(ti.Height, ti.Round) + cs.enterPrevote(ctx, ti.Height, ti.Round) case cstypes.RoundStepPrevoteWait: - if err := cs.eventBus.PublishEventTimeoutWait(cs.RoundStateEvent()); err != nil { + if err := cs.eventBus.PublishEventTimeoutWait(ctx, cs.RoundStateEvent()); err != nil { cs.Logger.Error("failed publishing timeout wait", "err", err) } - cs.enterPrecommit(ti.Height, ti.Round) + cs.enterPrecommit(ctx, ti.Height, ti.Round) case cstypes.RoundStepPrecommitWait: - if err := cs.eventBus.PublishEventTimeoutWait(cs.RoundStateEvent()); err != nil { + if err := cs.eventBus.PublishEventTimeoutWait(ctx, cs.RoundStateEvent()); err != nil { cs.Logger.Error("failed publishing timeout wait", "err", err) } - cs.enterPrecommit(ti.Height, ti.Round) - cs.enterNewRound(ti.Height, ti.Round+1) + cs.enterPrecommit(ctx, ti.Height, ti.Round) + cs.enterNewRound(ctx, ti.Height, ti.Round+1) default: panic(fmt.Sprintf("invalid timeout step: %v", ti.Step)) @@ -976,7 +982,7 @@ func (cs *State) handleTimeout(ti timeoutInfo, rs cstypes.RoundState) { } -func (cs *State) handleTxsAvailable() { +func (cs *State) handleTxsAvailable(ctx context.Context) { cs.mtx.Lock() defer cs.mtx.Unlock() @@ -997,7 +1003,7 @@ func (cs *State) handleTxsAvailable() { cs.scheduleTimeout(timeoutCommit, cs.Height, 0, cstypes.RoundStepNewRound) case cstypes.RoundStepNewRound: // after timeoutCommit - cs.enterPropose(cs.Height, 0) + cs.enterPropose(ctx, cs.Height, 0) } } @@ -1011,7 +1017,7 @@ func (cs *State) handleTxsAvailable() { // Enter: +2/3 precommits for nil at (height,round-1) // Enter: +2/3 prevotes any or +2/3 precommits for block or any from (height, round) // NOTE: cs.StartTime was already set for height. -func (cs *State) enterNewRound(height int64, round int32) { +func (cs *State) enterNewRound(ctx context.Context, height int64, round int32) { logger := cs.Logger.With("height", height, "round", round) if cs.Height != height || round < cs.Round || (cs.Round == round && cs.Step != cstypes.RoundStepNewHeight) { @@ -1054,7 +1060,7 @@ func (cs *State) enterNewRound(height int64, round int32) { cs.Votes.SetRound(tmmath.SafeAddInt32(round, 1)) // also track next round (round+1) to allow round-skipping cs.TriggeredTimeoutPrecommit = false - if err := cs.eventBus.PublishEventNewRound(cs.NewRoundEvent()); err != nil { + if err := cs.eventBus.PublishEventNewRound(ctx, cs.NewRoundEvent()); err != nil { cs.Logger.Error("failed publishing new round", "err", err) } @@ -1070,7 +1076,7 @@ func (cs *State) enterNewRound(height int64, round int32) { cstypes.RoundStepNewRound) } } else { - cs.enterPropose(height, round) + cs.enterPropose(ctx, height, round) } } @@ -1093,7 +1099,7 @@ func (cs *State) needProofBlock(height int64) bool { // Enter (CreateEmptyBlocks, CreateEmptyBlocksInterval > 0 ): // after enterNewRound(height,round), after timeout of CreateEmptyBlocksInterval // Enter (!CreateEmptyBlocks) : after enterNewRound(height,round), once txs are in the mempool -func (cs *State) enterPropose(height int64, round int32) { +func (cs *State) enterPropose(ctx context.Context, height int64, round int32) { logger := cs.Logger.With("height", height, "round", round) if cs.Height != height || round < cs.Round || (cs.Round == round && cstypes.RoundStepPropose <= cs.Step) { @@ -1109,13 +1115,13 @@ func (cs *State) enterPropose(height int64, round int32) { defer func() { // Done enterPropose: cs.updateRoundStep(round, cstypes.RoundStepPropose) - cs.newStep() + cs.newStep(ctx) // If we have the whole proposal + POL, then goto Prevote now. // else, we'll enterPrevote when the rest of the proposal is received (in AddProposalBlockPart), // or else after timeoutPropose if cs.isProposalComplete() { - cs.enterPrevote(height, cs.Round) + cs.enterPrevote(ctx, height, cs.Round) } }() @@ -1271,7 +1277,7 @@ func (cs *State) createProposalBlock() (block *types.Block, blockParts *types.Pa // Enter: proposal block and POL is ready. // Prevote for LockedBlock if we're locked, or ProposalBlock if valid. // Otherwise vote nil. -func (cs *State) enterPrevote(height int64, round int32) { +func (cs *State) enterPrevote(ctx context.Context, height int64, round int32) { logger := cs.Logger.With("height", height, "round", round) if cs.Height != height || round < cs.Round || (cs.Round == round && cstypes.RoundStepPrevote <= cs.Step) { @@ -1285,7 +1291,7 @@ func (cs *State) enterPrevote(height int64, round int32) { defer func() { // Done enterPrevote: cs.updateRoundStep(round, cstypes.RoundStepPrevote) - cs.newStep() + cs.newStep(ctx) }() logger.Debug("entering prevote step", "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step)) @@ -1331,7 +1337,7 @@ func (cs *State) defaultDoPrevote(height int64, round int32) { } // Enter: any +2/3 prevotes at next round. -func (cs *State) enterPrevoteWait(height int64, round int32) { +func (cs *State) enterPrevoteWait(ctx context.Context, height int64, round int32) { logger := cs.Logger.With("height", height, "round", round) if cs.Height != height || round < cs.Round || (cs.Round == round && cstypes.RoundStepPrevoteWait <= cs.Step) { @@ -1354,7 +1360,7 @@ func (cs *State) enterPrevoteWait(height int64, round int32) { defer func() { // Done enterPrevoteWait: cs.updateRoundStep(round, cstypes.RoundStepPrevoteWait) - cs.newStep() + cs.newStep(ctx) }() // Wait for some more prevotes; enterPrecommit @@ -1367,7 +1373,7 @@ func (cs *State) enterPrevoteWait(height int64, round int32) { // Lock & precommit the ProposalBlock if we have enough prevotes for it (a POL in this round) // else, unlock an existing lock and precommit nil if +2/3 of prevotes were nil, // else, precommit nil otherwise. -func (cs *State) enterPrecommit(height int64, round int32) { +func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32) { logger := cs.Logger.With("height", height, "round", round) if cs.Height != height || round < cs.Round || (cs.Round == round && cstypes.RoundStepPrecommit <= cs.Step) { @@ -1383,7 +1389,7 @@ func (cs *State) enterPrecommit(height int64, round int32) { defer func() { // Done enterPrecommit: cs.updateRoundStep(round, cstypes.RoundStepPrecommit) - cs.newStep() + cs.newStep(ctx) }() // check for a polka @@ -1402,7 +1408,7 @@ func (cs *State) enterPrecommit(height int64, round int32) { } // At this point +2/3 prevoted for a particular block or nil. - if err := cs.eventBus.PublishEventPolka(cs.RoundStateEvent()); err != nil { + if err := cs.eventBus.PublishEventPolka(ctx, cs.RoundStateEvent()); err != nil { logger.Error("failed publishing polka", "err", err) } @@ -1422,7 +1428,7 @@ func (cs *State) enterPrecommit(height int64, round int32) { cs.LockedBlock = nil cs.LockedBlockParts = nil - if err := cs.eventBus.PublishEventUnlock(cs.RoundStateEvent()); err != nil { + if err := cs.eventBus.PublishEventUnlock(ctx, cs.RoundStateEvent()); err != nil { logger.Error("failed publishing event unlock", "err", err) } } @@ -1438,7 +1444,7 @@ func (cs *State) enterPrecommit(height int64, round int32) { logger.Debug("precommit step; +2/3 prevoted locked block; relocking") cs.LockedRound = round - if err := cs.eventBus.PublishEventRelock(cs.RoundStateEvent()); err != nil { + if err := cs.eventBus.PublishEventRelock(ctx, cs.RoundStateEvent()); err != nil { logger.Error("failed publishing event relock", "err", err) } @@ -1459,7 +1465,7 @@ func (cs *State) enterPrecommit(height int64, round int32) { cs.LockedBlock = cs.ProposalBlock cs.LockedBlockParts = cs.ProposalBlockParts - if err := cs.eventBus.PublishEventLock(cs.RoundStateEvent()); err != nil { + if err := cs.eventBus.PublishEventLock(ctx, cs.RoundStateEvent()); err != nil { logger.Error("failed publishing event lock", "err", err) } @@ -1481,7 +1487,7 @@ func (cs *State) enterPrecommit(height int64, round int32) { cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader) } - if err := cs.eventBus.PublishEventUnlock(cs.RoundStateEvent()); err != nil { + if err := cs.eventBus.PublishEventUnlock(ctx, cs.RoundStateEvent()); err != nil { logger.Error("failed publishing event unlock", "err", err) } @@ -1489,7 +1495,7 @@ func (cs *State) enterPrecommit(height int64, round int32) { } // Enter: any +2/3 precommits for next round. -func (cs *State) enterPrecommitWait(height int64, round int32) { +func (cs *State) enterPrecommitWait(ctx context.Context, height int64, round int32) { logger := cs.Logger.With("height", height, "round", round) if cs.Height != height || round < cs.Round || (cs.Round == round && cs.TriggeredTimeoutPrecommit) { @@ -1513,7 +1519,7 @@ func (cs *State) enterPrecommitWait(height int64, round int32) { defer func() { // Done enterPrecommitWait: cs.TriggeredTimeoutPrecommit = true - cs.newStep() + cs.newStep(ctx) }() // wait for some more precommits; enterNewRound @@ -1521,7 +1527,7 @@ func (cs *State) enterPrecommitWait(height int64, round int32) { } // Enter: +2/3 precommits for block -func (cs *State) enterCommit(height int64, commitRound int32) { +func (cs *State) enterCommit(ctx context.Context, height int64, commitRound int32) { logger := cs.Logger.With("height", height, "commit_round", commitRound) if cs.Height != height || cstypes.RoundStepCommit <= cs.Step { @@ -1540,10 +1546,10 @@ func (cs *State) enterCommit(height int64, commitRound int32) { cs.updateRoundStep(cs.Round, cstypes.RoundStepCommit) cs.CommitRound = commitRound cs.CommitTime = tmtime.Now() - cs.newStep() + cs.newStep(ctx) // Maybe finalize immediately. - cs.tryFinalizeCommit(height) + cs.tryFinalizeCommit(ctx, height) }() blockID, ok := cs.Votes.Precommits(commitRound).TwoThirdsMajority() @@ -1574,7 +1580,7 @@ func (cs *State) enterCommit(height int64, commitRound int32) { cs.ProposalBlock = nil cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader) - if err := cs.eventBus.PublishEventValidBlock(cs.RoundStateEvent()); err != nil { + if err := cs.eventBus.PublishEventValidBlock(ctx, cs.RoundStateEvent()); err != nil { logger.Error("failed publishing valid block", "err", err) } @@ -1584,7 +1590,7 @@ func (cs *State) enterCommit(height int64, commitRound int32) { } // If we have the block AND +2/3 commits for it, finalize. -func (cs *State) tryFinalizeCommit(height int64) { +func (cs *State) tryFinalizeCommit(ctx context.Context, height int64) { logger := cs.Logger.With("height", height) if cs.Height != height { @@ -1608,11 +1614,11 @@ func (cs *State) tryFinalizeCommit(height int64) { return } - cs.finalizeCommit(height) + cs.finalizeCommit(ctx, height) } // Increment height and goto cstypes.RoundStepNewHeight -func (cs *State) finalizeCommit(height int64) { +func (cs *State) finalizeCommit(ctx context.Context, height int64) { logger := cs.Logger.With("height", height) if cs.Height != height || cs.Step != cstypes.RoundStepCommit { @@ -1692,7 +1698,7 @@ func (cs *State) finalizeCommit(height int64) { // Execute and commit the block, update and save the state, and update the mempool. // NOTE The block.AppHash wont reflect these txs until the next block. - stateCopy, err := cs.blockExec.ApplyBlock( + stateCopy, err := cs.blockExec.ApplyBlock(ctx, stateCopy, types.BlockID{ Hash: block.Hash(), @@ -1711,7 +1717,7 @@ func (cs *State) finalizeCommit(height int64) { cs.RecordMetrics(height, block) // NewHeightStep! - cs.updateToState(stateCopy) + cs.updateToState(ctx, stateCopy) fail.Fail() // XXX @@ -1864,7 +1870,11 @@ func (cs *State) defaultSetProposal(proposal *types.Proposal) error { // NOTE: block is not necessarily valid. // Asynchronously triggers either enterPrevote (before we timeout of propose) or tryFinalizeCommit, // once we have the full block. -func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID types.NodeID) (added bool, err error) { +func (cs *State) addProposalBlockPart( + ctx context.Context, + msg *BlockPartMessage, + peerID types.NodeID, +) (added bool, err error) { height, round, part := msg.Height, msg.Round, msg.Part // Blocks might be reused, so round mismatch is OK @@ -1918,7 +1928,7 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID types.NodeID // NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal cs.Logger.Info("received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash()) - if err := cs.eventBus.PublishEventCompleteProposal(cs.CompleteProposalEvent()); err != nil { + if err := cs.eventBus.PublishEventCompleteProposal(ctx, cs.CompleteProposalEvent()); err != nil { cs.Logger.Error("failed publishing event complete proposal", "err", err) } @@ -1946,13 +1956,13 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID types.NodeID if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() { // Move onto the next step - cs.enterPrevote(height, cs.Round) + cs.enterPrevote(ctx, height, cs.Round) if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added - cs.enterPrecommit(height, cs.Round) + cs.enterPrecommit(ctx, height, cs.Round) } } else if cs.Step == cstypes.RoundStepCommit { // If we're waiting on the proposal block... - cs.tryFinalizeCommit(height) + cs.tryFinalizeCommit(ctx, height) } return added, nil @@ -1962,8 +1972,8 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID types.NodeID } // Attempt to add the vote. if its a duplicate signature, dupeout the validator -func (cs *State) tryAddVote(vote *types.Vote, peerID types.NodeID) (bool, error) { - added, err := cs.addVote(vote, peerID) +func (cs *State) tryAddVote(ctx context.Context, vote *types.Vote, peerID types.NodeID) (bool, error) { + added, err := cs.addVote(ctx, vote, peerID) if err != nil { // If the vote height is off, we'll just ignore it, // But if it's a conflicting sig, add it to the cs.evpool. @@ -2010,7 +2020,11 @@ func (cs *State) tryAddVote(vote *types.Vote, peerID types.NodeID) (bool, error) return added, nil } -func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err error) { +func (cs *State) addVote( + ctx context.Context, + vote *types.Vote, + peerID types.NodeID, +) (added bool, err error) { cs.Logger.Debug( "adding vote", "vote_height", vote.Height, @@ -2034,7 +2048,7 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err } cs.Logger.Debug("added vote to last precommits", "last_commit", cs.LastCommit.StringShort()) - if err := cs.eventBus.PublishEventVote(types.EventDataVote{Vote: vote}); err != nil { + if err := cs.eventBus.PublishEventVote(ctx, types.EventDataVote{Vote: vote}); err != nil { return added, err } @@ -2044,7 +2058,7 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err if cs.config.SkipTimeoutCommit && cs.LastCommit.HasAll() { // go straight to new round (skip timeout commit) // cs.scheduleTimeout(time.Duration(0), cs.Height, 0, cstypes.RoundStepNewHeight) - cs.enterNewRound(cs.Height, 0) + cs.enterNewRound(ctx, cs.Height, 0) } return @@ -2064,7 +2078,7 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err return } - if err := cs.eventBus.PublishEventVote(types.EventDataVote{Vote: vote}); err != nil { + if err := cs.eventBus.PublishEventVote(ctx, types.EventDataVote{Vote: vote}); err != nil { return added, err } cs.evsw.FireEvent(types.EventVoteValue, vote) @@ -2093,7 +2107,7 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err cs.LockedBlock = nil cs.LockedBlockParts = nil - if err := cs.eventBus.PublishEventUnlock(cs.RoundStateEvent()); err != nil { + if err := cs.eventBus.PublishEventUnlock(ctx, cs.RoundStateEvent()); err != nil { return added, err } } @@ -2122,7 +2136,7 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err } cs.evsw.FireEvent(types.EventValidBlockValue, &cs.RoundState) - if err := cs.eventBus.PublishEventValidBlock(cs.RoundStateEvent()); err != nil { + if err := cs.eventBus.PublishEventValidBlock(ctx, cs.RoundStateEvent()); err != nil { return added, err } } @@ -2132,20 +2146,20 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err switch { case cs.Round < vote.Round && prevotes.HasTwoThirdsAny(): // Round-skip if there is any 2/3+ of votes ahead of us - cs.enterNewRound(height, vote.Round) + cs.enterNewRound(ctx, height, vote.Round) case cs.Round == vote.Round && cstypes.RoundStepPrevote <= cs.Step: // current round blockID, ok := prevotes.TwoThirdsMajority() if ok && (cs.isProposalComplete() || len(blockID.Hash) == 0) { - cs.enterPrecommit(height, vote.Round) + cs.enterPrecommit(ctx, height, vote.Round) } else if prevotes.HasTwoThirdsAny() { - cs.enterPrevoteWait(height, vote.Round) + cs.enterPrevoteWait(ctx, height, vote.Round) } case cs.Proposal != nil && 0 <= cs.Proposal.POLRound && cs.Proposal.POLRound == vote.Round: // If the proposal is now complete, enter prevote of cs.Round. if cs.isProposalComplete() { - cs.enterPrevote(height, cs.Round) + cs.enterPrevote(ctx, height, cs.Round) } } @@ -2161,20 +2175,20 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err blockID, ok := precommits.TwoThirdsMajority() if ok { // Executed as TwoThirdsMajority could be from a higher round - cs.enterNewRound(height, vote.Round) - cs.enterPrecommit(height, vote.Round) + cs.enterNewRound(ctx, height, vote.Round) + cs.enterPrecommit(ctx, height, vote.Round) if len(blockID.Hash) != 0 { - cs.enterCommit(height, vote.Round) + cs.enterCommit(ctx, height, vote.Round) if cs.config.SkipTimeoutCommit && precommits.HasAll() { - cs.enterNewRound(cs.Height, 0) + cs.enterNewRound(ctx, cs.Height, 0) } } else { - cs.enterPrecommitWait(height, vote.Round) + cs.enterPrecommitWait(ctx, height, vote.Round) } } else if cs.Round <= vote.Round && precommits.HasTwoThirdsAny() { - cs.enterNewRound(height, vote.Round) - cs.enterPrecommitWait(height, vote.Round) + cs.enterNewRound(ctx, height, vote.Round) + cs.enterPrecommitWait(ctx, height, vote.Round) } default: diff --git a/internal/consensus/state_test.go b/internal/consensus/state_test.go index 9216577d5..5d09908aa 100644 --- a/internal/consensus/state_test.go +++ b/internal/consensus/state_test.go @@ -191,7 +191,7 @@ func TestStateEnterProposeYesPrivValidator(t *testing.T) { timeoutCh := subscribe(ctx, t, cs.eventBus, types.EventQueryTimeoutPropose) proposalCh := subscribe(ctx, t, cs.eventBus, types.EventQueryCompleteProposal) - cs.enterNewRound(height, round) + cs.enterNewRound(ctx, height, round) cs.startRoutines(ctx, 3) ensureNewProposal(proposalCh, height, round) @@ -399,7 +399,7 @@ func TestStateFullRoundNil(t *testing.T) { voteCh := subscribe(ctx, t, cs.eventBus, types.EventQueryVote) - cs.enterPrevote(height, round) + cs.enterPrevote(ctx, height, round) cs.startRoutines(ctx, 4) ensurePrevote(voteCh, height, round) // prevote @@ -479,7 +479,7 @@ func TestStateLockNoPOL(t *testing.T) { */ // start round and wait for prevote - cs1.enterNewRound(height, round) + cs1.enterNewRound(ctx, height, round) cs1.startRoutines(ctx, 0) ensureNewRound(newRoundCh, height, round) @@ -1986,26 +1986,26 @@ func TestStateOutputsBlockPartsStats(t *testing.T) { } cs.ProposalBlockParts = types.NewPartSetFromHeader(parts.Header()) - cs.handleMsg(msgInfo{msg, peerID}) + cs.handleMsg(ctx, msgInfo{msg, peerID}) statsMessage := <-cs.statsMsgQueue require.Equal(t, msg, statsMessage.Msg, "") require.Equal(t, peerID, statsMessage.PeerID, "") // sending the same part from different peer - cs.handleMsg(msgInfo{msg, "peer2"}) + cs.handleMsg(ctx, msgInfo{msg, "peer2"}) // sending the part with the same height, but different round msg.Round = 1 - cs.handleMsg(msgInfo{msg, peerID}) + cs.handleMsg(ctx, msgInfo{msg, peerID}) // sending the part from the smaller height msg.Height = 0 - cs.handleMsg(msgInfo{msg, peerID}) + cs.handleMsg(ctx, msgInfo{msg, peerID}) // sending the part from the bigger height msg.Height = 3 - cs.handleMsg(msgInfo{msg, peerID}) + cs.handleMsg(ctx, msgInfo{msg, peerID}) select { case <-cs.statsMsgQueue: @@ -2031,20 +2031,20 @@ func TestStateOutputVoteStats(t *testing.T) { vote := signVote(ctx, vss[1], config, tmproto.PrecommitType, randBytes, types.PartSetHeader{}) voteMessage := &VoteMessage{vote} - cs.handleMsg(msgInfo{voteMessage, peerID}) + cs.handleMsg(ctx, msgInfo{voteMessage, peerID}) statsMessage := <-cs.statsMsgQueue require.Equal(t, voteMessage, statsMessage.Msg, "") require.Equal(t, peerID, statsMessage.PeerID, "") // sending the same part from different peer - cs.handleMsg(msgInfo{&VoteMessage{vote}, "peer2"}) + cs.handleMsg(ctx, msgInfo{&VoteMessage{vote}, "peer2"}) // sending the vote for the bigger height incrementHeight(vss[1]) vote = signVote(ctx, vss[1], config, tmproto.PrecommitType, randBytes, types.PartSetHeader{}) - cs.handleMsg(msgInfo{&VoteMessage{vote}, peerID}) + cs.handleMsg(ctx, msgInfo{&VoteMessage{vote}, peerID}) select { case <-cs.statsMsgQueue: diff --git a/internal/consensus/wal_generator.go b/internal/consensus/wal_generator.go index 35a539d64..d1130defb 100644 --- a/internal/consensus/wal_generator.go +++ b/internal/consensus/wal_generator.go @@ -80,7 +80,7 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, wr io.Writer, numBloc mempool := emptyMempool{} evpool := sm.EmptyEvidencePool{} blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore) - consensusState := NewState(logger, cfg.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool) + consensusState := NewState(ctx, logger, cfg.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool) consensusState.SetEventBus(eventBus) if privValidator != nil && privValidator != (*privval.FilePV)(nil) { consensusState.SetPrivValidator(privValidator) diff --git a/internal/eventbus/event_bus.go b/internal/eventbus/event_bus.go index 4b28f6fcd..61473c713 100644 --- a/internal/eventbus/event_bus.go +++ b/internal/eventbus/event_bus.go @@ -82,10 +82,7 @@ func (b *EventBus) Observe(ctx context.Context, observe func(tmpubsub.Message) e return b.pubsub.Observe(ctx, observe, queries...) } -func (b *EventBus) Publish(eventValue string, eventData types.TMEventData) error { - // no explicit deadline for publishing events - ctx := context.Background() - +func (b *EventBus) Publish(ctx context.Context, eventValue string, eventData types.TMEventData) error { tokens := strings.Split(types.EventTypeKey, ".") event := abci.Event{ Type: tokens[0], @@ -100,9 +97,7 @@ func (b *EventBus) Publish(eventValue string, eventData types.TMEventData) error return b.pubsub.PublishWithEvents(ctx, eventData, []abci.Event{event}) } -func (b *EventBus) PublishEventNewBlock(data types.EventDataNewBlock) error { - // no explicit deadline for publishing events - ctx := context.Background() +func (b *EventBus) PublishEventNewBlock(ctx context.Context, data types.EventDataNewBlock) error { events := append(data.ResultBeginBlock.Events, data.ResultEndBlock.Events...) // add Tendermint-reserved new block event @@ -111,9 +106,9 @@ func (b *EventBus) PublishEventNewBlock(data types.EventDataNewBlock) error { return b.pubsub.PublishWithEvents(ctx, data, events) } -func (b *EventBus) PublishEventNewBlockHeader(data types.EventDataNewBlockHeader) error { +func (b *EventBus) PublishEventNewBlockHeader(ctx context.Context, data types.EventDataNewBlockHeader) error { // no explicit deadline for publishing events - ctx := context.Background() + events := append(data.ResultBeginBlock.Events, data.ResultEndBlock.Events...) // add Tendermint-reserved new block header event @@ -122,32 +117,30 @@ func (b *EventBus) PublishEventNewBlockHeader(data types.EventDataNewBlockHeader return b.pubsub.PublishWithEvents(ctx, data, events) } -func (b *EventBus) PublishEventNewEvidence(evidence types.EventDataNewEvidence) error { - return b.Publish(types.EventNewEvidenceValue, evidence) +func (b *EventBus) PublishEventNewEvidence(ctx context.Context, evidence types.EventDataNewEvidence) error { + return b.Publish(ctx, types.EventNewEvidenceValue, evidence) } -func (b *EventBus) PublishEventVote(data types.EventDataVote) error { - return b.Publish(types.EventVoteValue, data) +func (b *EventBus) PublishEventVote(ctx context.Context, data types.EventDataVote) error { + return b.Publish(ctx, types.EventVoteValue, data) } -func (b *EventBus) PublishEventValidBlock(data types.EventDataRoundState) error { - return b.Publish(types.EventValidBlockValue, data) +func (b *EventBus) PublishEventValidBlock(ctx context.Context, data types.EventDataRoundState) error { + return b.Publish(ctx, types.EventValidBlockValue, data) } -func (b *EventBus) PublishEventBlockSyncStatus(data types.EventDataBlockSyncStatus) error { - return b.Publish(types.EventBlockSyncStatusValue, data) +func (b *EventBus) PublishEventBlockSyncStatus(ctx context.Context, data types.EventDataBlockSyncStatus) error { + return b.Publish(ctx, types.EventBlockSyncStatusValue, data) } -func (b *EventBus) PublishEventStateSyncStatus(data types.EventDataStateSyncStatus) error { - return b.Publish(types.EventStateSyncStatusValue, data) +func (b *EventBus) PublishEventStateSyncStatus(ctx context.Context, data types.EventDataStateSyncStatus) error { + return b.Publish(ctx, types.EventStateSyncStatusValue, data) } // PublishEventTx publishes tx event with events from Result. Note it will add // predefined keys (EventTypeKey, TxHashKey). Existing events with the same keys // will be overwritten. -func (b *EventBus) PublishEventTx(data types.EventDataTx) error { - // no explicit deadline for publishing events - ctx := context.Background() +func (b *EventBus) PublishEventTx(ctx context.Context, data types.EventDataTx) error { events := data.Result.Events // add Tendermint-reserved events @@ -178,44 +171,44 @@ func (b *EventBus) PublishEventTx(data types.EventDataTx) error { return b.pubsub.PublishWithEvents(ctx, data, events) } -func (b *EventBus) PublishEventNewRoundStep(data types.EventDataRoundState) error { - return b.Publish(types.EventNewRoundStepValue, data) +func (b *EventBus) PublishEventNewRoundStep(ctx context.Context, data types.EventDataRoundState) error { + return b.Publish(ctx, types.EventNewRoundStepValue, data) } -func (b *EventBus) PublishEventTimeoutPropose(data types.EventDataRoundState) error { - return b.Publish(types.EventTimeoutProposeValue, data) +func (b *EventBus) PublishEventTimeoutPropose(ctx context.Context, data types.EventDataRoundState) error { + return b.Publish(ctx, types.EventTimeoutProposeValue, data) } -func (b *EventBus) PublishEventTimeoutWait(data types.EventDataRoundState) error { - return b.Publish(types.EventTimeoutWaitValue, data) +func (b *EventBus) PublishEventTimeoutWait(ctx context.Context, data types.EventDataRoundState) error { + return b.Publish(ctx, types.EventTimeoutWaitValue, data) } -func (b *EventBus) PublishEventNewRound(data types.EventDataNewRound) error { - return b.Publish(types.EventNewRoundValue, data) +func (b *EventBus) PublishEventNewRound(ctx context.Context, data types.EventDataNewRound) error { + return b.Publish(ctx, types.EventNewRoundValue, data) } -func (b *EventBus) PublishEventCompleteProposal(data types.EventDataCompleteProposal) error { - return b.Publish(types.EventCompleteProposalValue, data) +func (b *EventBus) PublishEventCompleteProposal(ctx context.Context, data types.EventDataCompleteProposal) error { + return b.Publish(ctx, types.EventCompleteProposalValue, data) } -func (b *EventBus) PublishEventPolka(data types.EventDataRoundState) error { - return b.Publish(types.EventPolkaValue, data) +func (b *EventBus) PublishEventPolka(ctx context.Context, data types.EventDataRoundState) error { + return b.Publish(ctx, types.EventPolkaValue, data) } -func (b *EventBus) PublishEventUnlock(data types.EventDataRoundState) error { - return b.Publish(types.EventUnlockValue, data) +func (b *EventBus) PublishEventUnlock(ctx context.Context, data types.EventDataRoundState) error { + return b.Publish(ctx, types.EventUnlockValue, data) } -func (b *EventBus) PublishEventRelock(data types.EventDataRoundState) error { - return b.Publish(types.EventRelockValue, data) +func (b *EventBus) PublishEventRelock(ctx context.Context, data types.EventDataRoundState) error { + return b.Publish(ctx, types.EventRelockValue, data) } -func (b *EventBus) PublishEventLock(data types.EventDataRoundState) error { - return b.Publish(types.EventLockValue, data) +func (b *EventBus) PublishEventLock(ctx context.Context, data types.EventDataRoundState) error { + return b.Publish(ctx, types.EventLockValue, data) } -func (b *EventBus) PublishEventValidatorSetUpdates(data types.EventDataValidatorSetUpdates) error { - return b.Publish(types.EventValidatorSetUpdatesValue, data) +func (b *EventBus) PublishEventValidatorSetUpdates(ctx context.Context, data types.EventDataValidatorSetUpdates) error { + return b.Publish(ctx, types.EventValidatorSetUpdatesValue, data) } //----------------------------------------------------------------------------- @@ -223,22 +216,22 @@ func (b *EventBus) PublishEventValidatorSetUpdates(data types.EventDataValidator // NopEventBus implements a types.BlockEventPublisher that discards all events. type NopEventBus struct{} -func (NopEventBus) PublishEventNewBlock(types.EventDataNewBlock) error { +func (NopEventBus) PublishEventNewBlock(context.Context, types.EventDataNewBlock) error { return nil } -func (NopEventBus) PublishEventNewBlockHeader(types.EventDataNewBlockHeader) error { +func (NopEventBus) PublishEventNewBlockHeader(context.Context, types.EventDataNewBlockHeader) error { return nil } -func (NopEventBus) PublishEventNewEvidence(types.EventDataNewEvidence) error { +func (NopEventBus) PublishEventNewEvidence(context.Context, types.EventDataNewEvidence) error { return nil } -func (NopEventBus) PublishEventTx(types.EventDataTx) error { +func (NopEventBus) PublishEventTx(context.Context, types.EventDataTx) error { return nil } -func (NopEventBus) PublishEventValidatorSetUpdates(types.EventDataValidatorSetUpdates) error { +func (NopEventBus) PublishEventValidatorSetUpdates(context.Context, types.EventDataValidatorSetUpdates) error { return nil } diff --git a/internal/eventbus/event_bus_test.go b/internal/eventbus/event_bus_test.go index c65c8cdd6..06f2bfa64 100644 --- a/internal/eventbus/event_bus_test.go +++ b/internal/eventbus/event_bus_test.go @@ -55,7 +55,7 @@ func TestEventBusPublishEventTx(t *testing.T) { assert.Equal(t, result, edt.Result) }() - err = eventBus.PublishEventTx(types.EventDataTx{ + err = eventBus.PublishEventTx(ctx, types.EventDataTx{ TxResult: abci.TxResult{ Height: 1, Index: 0, @@ -113,7 +113,7 @@ func TestEventBusPublishEventNewBlock(t *testing.T) { assert.Equal(t, resultEndBlock, edt.ResultEndBlock) }() - err = eventBus.PublishEventNewBlock(types.EventDataNewBlock{ + err = eventBus.PublishEventNewBlock(ctx, types.EventDataNewBlock{ Block: block, BlockID: blockID, ResultBeginBlock: resultBeginBlock, @@ -225,7 +225,7 @@ func TestEventBusPublishEventTxDuplicateKeys(t *testing.T) { } }() - assert.NoError(t, eventBus.PublishEventTx(types.EventDataTx{ + assert.NoError(t, eventBus.PublishEventTx(ctx, types.EventDataTx{ TxResult: abci.TxResult{ Height: 1, Index: 0, @@ -285,7 +285,7 @@ func TestEventBusPublishEventNewBlockHeader(t *testing.T) { assert.Equal(t, resultEndBlock, edt.ResultEndBlock) }() - err = eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{ + err = eventBus.PublishEventNewBlockHeader(ctx, types.EventDataNewBlockHeader{ Header: block.Header, ResultBeginBlock: resultBeginBlock, ResultEndBlock: resultEndBlock, @@ -327,7 +327,7 @@ func TestEventBusPublishEventNewEvidence(t *testing.T) { assert.Equal(t, int64(4), edt.Height) }() - err = eventBus.PublishEventNewEvidence(types.EventDataNewEvidence{ + err = eventBus.PublishEventNewEvidence(ctx, types.EventDataNewEvidence{ Evidence: ev, Height: 4, }) @@ -371,23 +371,23 @@ func TestEventBusPublish(t *testing.T) { } }() - require.NoError(t, eventBus.Publish(types.EventNewBlockHeaderValue, + require.NoError(t, eventBus.Publish(ctx, types.EventNewBlockHeaderValue, types.EventDataNewBlockHeader{})) - require.NoError(t, eventBus.PublishEventNewBlock(types.EventDataNewBlock{})) - require.NoError(t, eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{})) - require.NoError(t, eventBus.PublishEventVote(types.EventDataVote{})) - require.NoError(t, eventBus.PublishEventNewRoundStep(types.EventDataRoundState{})) - require.NoError(t, eventBus.PublishEventTimeoutPropose(types.EventDataRoundState{})) - require.NoError(t, eventBus.PublishEventTimeoutWait(types.EventDataRoundState{})) - require.NoError(t, eventBus.PublishEventNewRound(types.EventDataNewRound{})) - require.NoError(t, eventBus.PublishEventCompleteProposal(types.EventDataCompleteProposal{})) - require.NoError(t, eventBus.PublishEventPolka(types.EventDataRoundState{})) - require.NoError(t, eventBus.PublishEventUnlock(types.EventDataRoundState{})) - require.NoError(t, eventBus.PublishEventRelock(types.EventDataRoundState{})) - require.NoError(t, eventBus.PublishEventLock(types.EventDataRoundState{})) - require.NoError(t, eventBus.PublishEventValidatorSetUpdates(types.EventDataValidatorSetUpdates{})) - require.NoError(t, eventBus.PublishEventBlockSyncStatus(types.EventDataBlockSyncStatus{})) - require.NoError(t, eventBus.PublishEventStateSyncStatus(types.EventDataStateSyncStatus{})) + require.NoError(t, eventBus.PublishEventNewBlock(ctx, types.EventDataNewBlock{})) + require.NoError(t, eventBus.PublishEventNewBlockHeader(ctx, types.EventDataNewBlockHeader{})) + require.NoError(t, eventBus.PublishEventVote(ctx, types.EventDataVote{})) + require.NoError(t, eventBus.PublishEventNewRoundStep(ctx, types.EventDataRoundState{})) + require.NoError(t, eventBus.PublishEventTimeoutPropose(ctx, types.EventDataRoundState{})) + require.NoError(t, eventBus.PublishEventTimeoutWait(ctx, types.EventDataRoundState{})) + require.NoError(t, eventBus.PublishEventNewRound(ctx, types.EventDataNewRound{})) + require.NoError(t, eventBus.PublishEventCompleteProposal(ctx, types.EventDataCompleteProposal{})) + require.NoError(t, eventBus.PublishEventPolka(ctx, types.EventDataRoundState{})) + require.NoError(t, eventBus.PublishEventUnlock(ctx, types.EventDataRoundState{})) + require.NoError(t, eventBus.PublishEventRelock(ctx, types.EventDataRoundState{})) + require.NoError(t, eventBus.PublishEventLock(ctx, types.EventDataRoundState{})) + require.NoError(t, eventBus.PublishEventValidatorSetUpdates(ctx, types.EventDataValidatorSetUpdates{})) + require.NoError(t, eventBus.PublishEventBlockSyncStatus(ctx, types.EventDataBlockSyncStatus{})) + require.NoError(t, eventBus.PublishEventStateSyncStatus(ctx, types.EventDataStateSyncStatus{})) require.GreaterOrEqual(t, <-count, numEventsExpected) } @@ -473,7 +473,7 @@ func benchmarkEventBus(numClients int, randQueries bool, randEvents bool, b *tes eventValue = randEventValue() } - err := eventBus.Publish(eventValue, types.EventDataString("Gamora")) + err := eventBus.Publish(ctx, eventValue, types.EventDataString("Gamora")) if err != nil { b.Error(err) } diff --git a/internal/state/execution.go b/internal/state/execution.go index e4a1ba6c3..85e96b017 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -150,7 +150,10 @@ func (blockExec *BlockExecutor) ValidateBlock(state State, block *types.Block) e // from outside this package to process and commit an entire block. // It takes a blockID to avoid recomputing the parts hash. func (blockExec *BlockExecutor) ApplyBlock( - state State, blockID types.BlockID, block *types.Block, + ctx context.Context, + state State, + blockID types.BlockID, + block *types.Block, ) (State, error) { // validate the block if we haven't already @@ -232,7 +235,7 @@ func (blockExec *BlockExecutor) ApplyBlock( // Events are fired after everything else. // NOTE: if we crash between Commit and Save, events wont be fired during replay - fireEvents(blockExec.logger, blockExec.eventBus, block, blockID, abciResponses, validatorUpdates) + fireEvents(ctx, blockExec.logger, blockExec.eventBus, block, blockID, abciResponses, validatorUpdates) return state, nil } @@ -508,6 +511,7 @@ func updateState( // Fire TxEvent for every tx. // NOTE: if Tendermint crashes before commit, some or all of these events may be published again. func fireEvents( + ctx context.Context, logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, @@ -515,7 +519,7 @@ func fireEvents( abciResponses *tmstate.ABCIResponses, validatorUpdates []*types.Validator, ) { - if err := eventBus.PublishEventNewBlock(types.EventDataNewBlock{ + if err := eventBus.PublishEventNewBlock(ctx, types.EventDataNewBlock{ Block: block, BlockID: blockID, ResultBeginBlock: *abciResponses.BeginBlock, @@ -524,7 +528,7 @@ func fireEvents( logger.Error("failed publishing new block", "err", err) } - if err := eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{ + if err := eventBus.PublishEventNewBlockHeader(ctx, types.EventDataNewBlockHeader{ Header: block.Header, NumTxs: int64(len(block.Txs)), ResultBeginBlock: *abciResponses.BeginBlock, @@ -535,7 +539,7 @@ func fireEvents( if len(block.Evidence.Evidence) != 0 { for _, ev := range block.Evidence.Evidence { - if err := eventBus.PublishEventNewEvidence(types.EventDataNewEvidence{ + if err := eventBus.PublishEventNewEvidence(ctx, types.EventDataNewEvidence{ Evidence: ev, Height: block.Height, }); err != nil { @@ -545,7 +549,7 @@ func fireEvents( } for i, tx := range block.Data.Txs { - if err := eventBus.PublishEventTx(types.EventDataTx{TxResult: abci.TxResult{ + if err := eventBus.PublishEventTx(ctx, types.EventDataTx{TxResult: abci.TxResult{ Height: block.Height, Index: uint32(i), Tx: tx, @@ -556,7 +560,7 @@ func fireEvents( } if len(validatorUpdates) > 0 { - if err := eventBus.PublishEventValidatorSetUpdates( + if err := eventBus.PublishEventValidatorSetUpdates(ctx, types.EventDataValidatorSetUpdates{ValidatorUpdates: validatorUpdates}); err != nil { logger.Error("failed publishing event", "err", err) } @@ -569,6 +573,7 @@ func fireEvents( // ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state. // It returns the application root hash (result of abci.Commit). func ExecCommitBlock( + ctx context.Context, be *BlockExecutor, appConnConsensus proxy.AppConnConsensus, block *types.Block, @@ -598,7 +603,7 @@ func ExecCommitBlock( } blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: block.MakePartSet(types.BlockPartSizeBytes).Header()} - fireEvents(be.logger, be.eventBus, block, blockID, abciResponses, validatorUpdates) + fireEvents(ctx, be.logger, be.eventBus, block, blockID, abciResponses, validatorUpdates) } // Commit block, get hash back diff --git a/internal/state/execution_test.go b/internal/state/execution_test.go index 5da5adbb5..4f68bd016 100644 --- a/internal/state/execution_test.go +++ b/internal/state/execution_test.go @@ -56,7 +56,7 @@ func TestApplyBlock(t *testing.T) { block := sf.MakeBlock(state, 1, new(types.Commit)) blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: block.MakePartSet(testPartSize).Header()} - state, err = blockExec.ApplyBlock(state, blockID, block) + state, err = blockExec.ApplyBlock(ctx, state, blockID, block) require.Nil(t, err) // TODO check state and mempool @@ -111,7 +111,7 @@ func TestBeginBlockValidators(t *testing.T) { // block for height 2 block := sf.MakeBlock(state, 2, lastCommit) - _, err = sm.ExecCommitBlock(nil, proxyApp.Consensus(), block, log.TestingLogger(), stateStore, 1, state) + _, err = sm.ExecCommitBlock(ctx, nil, proxyApp.Consensus(), block, log.TestingLogger(), stateStore, 1, state) require.Nil(t, err, tc.desc) // -> app receives a list of validators with a bool indicating if they signed @@ -219,7 +219,7 @@ func TestBeginBlockByzantineValidators(t *testing.T) { block.Header.EvidenceHash = block.Evidence.Hash() blockID = types.BlockID{Hash: block.Hash(), PartSetHeader: block.MakePartSet(testPartSize).Header()} - _, err = blockExec.ApplyBlock(state, blockID, block) + _, err = blockExec.ApplyBlock(ctx, state, blockID, block) require.Nil(t, err) // TODO check state and mempool @@ -404,7 +404,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) { {PubKey: pk, Power: 10}, } - state, err = blockExec.ApplyBlock(state, blockID, block) + state, err = blockExec.ApplyBlock(ctx, state, blockID, block) require.Nil(t, err) // test new validator was added to NextValidators if assert.Equal(t, state.Validators.Size()+1, state.NextValidators.Size()) { @@ -462,7 +462,7 @@ func TestEndBlockValidatorUpdatesResultingInEmptySet(t *testing.T) { {PubKey: vp, Power: 0}, } - assert.NotPanics(t, func() { state, err = blockExec.ApplyBlock(state, blockID, block) }) + assert.NotPanics(t, func() { state, err = blockExec.ApplyBlock(ctx, state, blockID, block) }) assert.NotNil(t, err) assert.NotEmpty(t, state.NextValidators.Validators) } diff --git a/internal/state/helpers_test.go b/internal/state/helpers_test.go index 821c0757e..20d931f62 100644 --- a/internal/state/helpers_test.go +++ b/internal/state/helpers_test.go @@ -2,6 +2,7 @@ package state_test import ( "bytes" + "context" "fmt" "time" @@ -36,15 +37,17 @@ func newTestApp() proxy.AppConns { } func makeAndCommitGoodBlock( + ctx context.Context, state sm.State, height int64, lastCommit *types.Commit, proposerAddr []byte, blockExec *sm.BlockExecutor, privVals map[string]types.PrivValidator, - evidence []types.Evidence) (sm.State, types.BlockID, *types.Commit, error) { + evidence []types.Evidence, +) (sm.State, types.BlockID, *types.Commit, error) { // A good block passes - state, blockID, err := makeAndApplyGoodBlock(state, height, lastCommit, proposerAddr, blockExec, evidence) + state, blockID, err := makeAndApplyGoodBlock(ctx, state, height, lastCommit, proposerAddr, blockExec, evidence) if err != nil { return state, types.BlockID{}, nil, err } @@ -57,15 +60,22 @@ func makeAndCommitGoodBlock( return state, blockID, commit, nil } -func makeAndApplyGoodBlock(state sm.State, height int64, lastCommit *types.Commit, proposerAddr []byte, - blockExec *sm.BlockExecutor, evidence []types.Evidence) (sm.State, types.BlockID, error) { +func makeAndApplyGoodBlock( + ctx context.Context, + state sm.State, + height int64, + lastCommit *types.Commit, + proposerAddr []byte, + blockExec *sm.BlockExecutor, + evidence []types.Evidence, +) (sm.State, types.BlockID, error) { block, _ := state.MakeBlock(height, factory.MakeTenTxs(height), lastCommit, evidence, proposerAddr) if err := blockExec.ValidateBlock(state, block); err != nil { return state, types.BlockID{}, err } blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: types.PartSetHeader{Total: 3, Hash: tmrand.Bytes(32)}} - state, err := blockExec.ApplyBlock(state, blockID, block) + state, err := blockExec.ApplyBlock(ctx, state, blockID, block) if err != nil { return state, types.BlockID{}, err } diff --git a/internal/state/indexer/indexer_service_test.go b/internal/state/indexer/indexer_service_test.go index 879cf8820..d24744aa9 100644 --- a/internal/state/indexer/indexer_service_test.go +++ b/internal/state/indexer/indexer_service_test.go @@ -75,7 +75,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) { t.Cleanup(service.Wait) // publish block with txs - err = eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{ + err = eventBus.PublishEventNewBlockHeader(ctx, types.EventDataNewBlockHeader{ Header: types.Header{Height: 1}, NumTxs: int64(2), }) @@ -86,7 +86,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) { Tx: types.Tx("foo"), Result: abci.ResponseDeliverTx{Code: 0}, } - err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult1}) + err = eventBus.PublishEventTx(ctx, types.EventDataTx{TxResult: *txResult1}) require.NoError(t, err) txResult2 := &abci.TxResult{ Height: 1, @@ -94,7 +94,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) { Tx: types.Tx("bar"), Result: abci.ResponseDeliverTx{Code: 0}, } - err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult2}) + err = eventBus.PublishEventTx(ctx, types.EventDataTx{TxResult: *txResult2}) require.NoError(t, err) time.Sleep(100 * time.Millisecond) diff --git a/internal/state/validation_test.go b/internal/state/validation_test.go index 65c0648d4..19b830709 100644 --- a/internal/state/validation_test.go +++ b/internal/state/validation_test.go @@ -103,7 +103,7 @@ func TestValidateBlockHeader(t *testing.T) { A good block passes */ var err error - state, _, lastCommit, err = makeAndCommitGoodBlock( + state, _, lastCommit, err = makeAndCommitGoodBlock(ctx, state, height, lastCommit, state.Validators.GetProposer().Address, blockExec, privVals, nil) require.NoError(t, err, "height %d", height) } @@ -186,6 +186,7 @@ func TestValidateBlockCommit(t *testing.T) { var err error var blockID types.BlockID state, blockID, lastCommit, err = makeAndCommitGoodBlock( + ctx, state, height, lastCommit, @@ -310,6 +311,7 @@ func TestValidateBlockEvidence(t *testing.T) { var err error state, _, lastCommit, err = makeAndCommitGoodBlock( + ctx, state, height, lastCommit, diff --git a/node/node.go b/node/node.go index 7d3b56b47..86ae173e5 100644 --- a/node/node.go +++ b/node/node.go @@ -313,7 +313,7 @@ func makeNode( sm.BlockExecutorWithMetrics(nodeMetrics.state), ) - csReactor, csState, err := createConsensusReactor( + csReactor, csState, err := createConsensusReactor(ctx, cfg, state, blockExec, blockStore, mp, evPool, privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus, peerManager, router, logger, @@ -599,7 +599,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { // At the beginning of the statesync start, we use the initialHeight as the event height // because of the statesync doesn't have the concreate state height before fetched the snapshot. d := types.EventDataStateSyncStatus{Complete: false, Height: state.InitialHeight} - if err := n.eventBus.PublishEventStateSyncStatus(d); err != nil { + if err := n.eventBus.PublishEventStateSyncStatus(ctx, d); err != nil { n.eventBus.Logger.Error("failed to emit the statesync start event", "err", err) } @@ -619,7 +619,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { n.consensusReactor.SetStateSyncingMetrics(0) - if err := n.eventBus.PublishEventStateSyncStatus( + if err := n.eventBus.PublishEventStateSyncStatus(ctx, types.EventDataStateSyncStatus{ Complete: true, Height: state.LastBlockHeight, @@ -638,7 +638,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { return } - if err := n.eventBus.PublishEventBlockSyncStatus( + if err := n.eventBus.PublishEventBlockSyncStatus(ctx, types.EventDataBlockSyncStatus{ Complete: false, Height: state.LastBlockHeight, diff --git a/node/setup.go b/node/setup.go index 6ca991484..55ca592e0 100644 --- a/node/setup.go +++ b/node/setup.go @@ -302,6 +302,7 @@ func createBlockchainReactor( } func createConsensusReactor( + ctx context.Context, cfg *config.Config, state sm.State, blockExec *sm.BlockExecutor, @@ -318,7 +319,7 @@ func createConsensusReactor( ) (*consensus.Reactor, *consensus.State, error) { logger = logger.With("module", "consensus") - consensusState := consensus.NewState( + consensusState := consensus.NewState(ctx, logger, cfg.Consensus, state.Copy(), diff --git a/types/events.go b/types/events.go index 7bb183fb7..86935ba25 100644 --- a/types/events.go +++ b/types/events.go @@ -1,6 +1,7 @@ package types import ( + "context" "fmt" "strings" @@ -241,13 +242,13 @@ func QueryForEvent(eventValue string) tmpubsub.Query { // BlockEventPublisher publishes all block related events type BlockEventPublisher interface { - PublishEventNewBlock(block EventDataNewBlock) error - PublishEventNewBlockHeader(header EventDataNewBlockHeader) error - PublishEventNewEvidence(evidence EventDataNewEvidence) error - PublishEventTx(EventDataTx) error - PublishEventValidatorSetUpdates(EventDataValidatorSetUpdates) error + PublishEventNewBlock(ctx context.Context, block EventDataNewBlock) error + PublishEventNewBlockHeader(ctx context.Context, header EventDataNewBlockHeader) error + PublishEventNewEvidence(ctx context.Context, evidence EventDataNewEvidence) error + PublishEventTx(context.Context, EventDataTx) error + PublishEventValidatorSetUpdates(context.Context, EventDataValidatorSetUpdates) error } type TxEventPublisher interface { - PublishEventTx(EventDataTx) error + PublishEventTx(context.Context, EventDataTx) error }