From c35d6d6e2c4d8d4afee2db32a1963928e9c172b7 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Tue, 8 Mar 2022 08:04:59 -0500 Subject: [PATCH] node: pass eventbus at construction time (#8084) * node: pass eventbus at construction time * remove cruft --- internal/blocksync/reactor_test.go | 5 ++++ internal/consensus/byzantine_test.go | 5 ++-- internal/consensus/common_test.go | 10 ++++---- internal/consensus/reactor.go | 8 ++---- internal/consensus/reactor_test.go | 14 +++++----- internal/consensus/replay.go | 11 +++----- internal/consensus/replay_file.go | 8 +++--- internal/consensus/replay_test.go | 38 +++++++++++++++++++--------- internal/consensus/state.go | 8 ++---- internal/consensus/wal_generator.go | 5 ++-- internal/eventbus/event_bus.go | 25 ------------------ internal/state/execution.go | 9 ++----- internal/state/execution_test.go | 38 ++++++++++++++++++---------- internal/state/validation_test.go | 13 ++++++++++ node/node.go | 1 + node/node_test.go | 11 ++++++++ node/setup.go | 5 ++-- 17 files changed, 111 insertions(+), 103 deletions(-) diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go index d73a522fa..8d3399d52 100644 --- a/internal/blocksync/reactor_test.go +++ b/internal/blocksync/reactor_test.go @@ -14,6 +14,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/internal/consensus" + "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/mempool/mock" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p/p2ptest" @@ -121,6 +122,9 @@ func (rts *reactorTestSuite) addNode( require.NoError(t, err) require.NoError(t, stateStore.Save(state)) + eventbus := eventbus.NewDefault(logger) + require.NoError(t, eventbus.Start(ctx)) + blockExec := sm.NewBlockExecutor( stateStore, log.TestingLogger(), @@ -128,6 +132,7 @@ func (rts *reactorTestSuite) addNode( mock.Mempool{}, sm.EmptyEvidencePool{}, blockStore, + eventbus, ) for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ { diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go index 029561d51..4032ddacd 100644 --- a/internal/consensus/byzantine_test.go +++ b/internal/consensus/byzantine_test.go @@ -95,14 +95,13 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { evpool := evidence.NewPool(logger.With("module", "evidence"), evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus) // Make State - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore) - cs, err := NewState(ctx, logger, thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool) + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore, eventBus) + cs, err := NewState(ctx, logger, thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool, eventBus) require.NoError(t, err) // set private validator pv := privVals[i] cs.SetPrivValidator(ctx, pv) - cs.SetEventBus(eventBus) cs.SetTimeoutTicker(tickerFunc()) states[i] = cs diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go index 4a6b96b2d..053e464a9 100644 --- a/internal/consensus/common_test.go +++ b/internal/consensus/common_test.go @@ -482,7 +482,10 @@ func newStateWithConfigAndBlockStore( stateStore := sm.NewStore(stateDB) require.NoError(t, stateStore.Save(state)) - blockExec := sm.NewBlockExecutor(stateStore, logger, proxyAppConnCon, mempool, evpool, blockStore) + eventBus := eventbus.NewDefault(logger.With("module", "events")) + require.NoError(t, eventBus.Start(ctx)) + + blockExec := sm.NewBlockExecutor(stateStore, logger, proxyAppConnCon, mempool, evpool, blockStore, eventBus) cs, err := NewState(ctx, logger.With("module", "consensus"), thisConfig.Consensus, @@ -491,6 +494,7 @@ func newStateWithConfigAndBlockStore( blockStore, mempool, evpool, + eventBus, ) if err != nil { t.Fatal(err) @@ -498,10 +502,6 @@ func newStateWithConfigAndBlockStore( cs.SetPrivValidator(ctx, pv) - eventBus := eventbus.NewDefault(logger.With("module", "events")) - require.NoError(t, eventBus.Start(ctx)) - - cs.SetEventBus(eventBus) return cs } diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index eb038d9f5..efb3f2d04 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -138,6 +138,7 @@ func NewReactor( cs *State, channelCreator p2p.ChannelCreator, peerUpdates *p2p.PeerUpdates, + eventBus *eventbus.EventBus, waitSync bool, metrics *Metrics, ) (*Reactor, error) { @@ -166,6 +167,7 @@ func NewReactor( state: cs, waitSync: waitSync, peers: make(map[types.NodeID]*PeerState), + eventBus: eventBus, Metrics: metrics, stateCh: stateCh, dataCh: dataCh, @@ -226,12 +228,6 @@ func (r *Reactor) OnStop() { } } -// SetEventBus sets the reactor's event bus. -func (r *Reactor) SetEventBus(b *eventbus.EventBus) { - r.eventBus = b - r.state.SetEventBus(b) -} - // WaitSync returns whether the consensus reactor is waiting for state/block sync. func (r *Reactor) WaitSync() bool { r.mtx.RLock() diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index ea9238a22..1fe395d69 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -110,13 +110,12 @@ func setup( state, chCreator(nodeID), node.MakePeerUpdates(ctx, t), + state.eventBus, true, NopMetrics(), ) require.NoError(t, err) - reactor.SetEventBus(state.eventBus) - blocksSub, err := state.eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{ ClientID: testSubscriber, Query: types.EventQueryNewBlock, @@ -504,17 +503,16 @@ func TestReactorWithEvidence(t *testing.T) { evpool2 := sm.EmptyEvidencePool{} - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore) + eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events")) + require.NoError(t, eventBus.Start(ctx)) + + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore, eventBus) cs, err := NewState(ctx, logger.With("validator", i, "module", "consensus"), - thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool2) + thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool2, eventBus) require.NoError(t, err) cs.SetPrivValidator(ctx, pv) - eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events")) - require.NoError(t, eventBus.Start(ctx)) - cs.SetEventBus(eventBus) - cs.SetTimeoutTicker(tickerFunc()) states[i] = cs diff --git a/internal/consensus/replay.go b/internal/consensus/replay.go index 4034b2ddf..5d097df21 100644 --- a/internal/consensus/replay.go +++ b/internal/consensus/replay.go @@ -205,7 +205,7 @@ type Handshaker struct { stateStore sm.Store initialState sm.State store sm.BlockStore - eventBus types.BlockEventPublisher + eventBus *eventbus.EventBus genDoc *types.GenesisDoc logger log.Logger @@ -217,7 +217,7 @@ func NewHandshaker( stateStore sm.Store, state sm.State, store sm.BlockStore, - eventBus types.BlockEventPublisher, + eventBus *eventbus.EventBus, genDoc *types.GenesisDoc, ) *Handshaker { @@ -484,9 +484,7 @@ func (h *Handshaker) replayBlocks( if i == finalBlock && !mutateState { // We emit events for the index services at the final block due to the sync issue when // the node shutdown during the block committing status. - blockExec := sm.NewBlockExecutor( - h.stateStore, h.logger, appClient, emptyMempool{}, sm.EmptyEvidencePool{}, h.store) - blockExec.SetEventBus(h.eventBus) + blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, appClient, emptyMempool{}, sm.EmptyEvidencePool{}, h.store, h.eventBus) appHash, err = sm.ExecCommitBlock(ctx, blockExec, appClient, block, h.logger, h.stateStore, h.genDoc.InitialHeight, state) if err != nil { @@ -528,8 +526,7 @@ func (h *Handshaker) replayBlock( // Use stubs for both mempool and evidence pool since no transactions nor // evidence are needed here - block already exists. - blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, appClient, emptyMempool{}, sm.EmptyEvidencePool{}, h.store) - blockExec.SetEventBus(h.eventBus) + blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, appClient, emptyMempool{}, sm.EmptyEvidencePool{}, h.store, h.eventBus) var err error state, err = blockExec.ApplyBlock(ctx, state, meta.BlockID, block) diff --git a/internal/consensus/replay_file.go b/internal/consensus/replay_file.go index 3ab470d36..492d1d1ee 100644 --- a/internal/consensus/replay_file.go +++ b/internal/consensus/replay_file.go @@ -146,11 +146,10 @@ func (pb *playback) replayReset(ctx context.Context, count int, newStepSub event pb.cs.Wait() newCS, err := NewState(ctx, pb.cs.logger, pb.cs.config, pb.stateStore, pb.cs.blockExec, - pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool) + pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool, pb.cs.eventBus) if err != nil { return err } - newCS.SetEventBus(pb.cs.eventBus) newCS.startForReplay() if err := pb.fp.Close(); err != nil { @@ -349,13 +348,12 @@ func newConsensusStateForReplay( } mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{} - blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp, mempool, evpool, blockStore) + blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp, mempool, evpool, blockStore, eventBus) consensusState, err := NewState(ctx, logger, csConfig, stateStore, blockExec, - blockStore, mempool, evpool) + blockStore, mempool, evpool, eventBus) if err != nil { return nil, err } - consensusState.SetEventBus(eventBus) return consensusState, nil } diff --git a/internal/consensus/replay_test.go b/internal/consensus/replay_test.go index 2bda63d74..2f6a531da 100644 --- a/internal/consensus/replay_test.go +++ b/internal/consensus/replay_test.go @@ -748,6 +748,9 @@ func testHandshakeReplay( filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_a_r%d", nBlocks, mode, rand.Int()))) t.Cleanup(func() { require.NoError(t, kvstoreApp.Close()) }) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) + clientCreator2 := abciclient.NewLocalClient(logger, kvstoreApp) if nBlocks > 0 { // run nBlocks against a new client to build up the app state. @@ -757,7 +760,7 @@ func testHandshakeReplay( stateStore := sm.NewStore(stateDB1) err := stateStore.Save(genesisState) require.NoError(t, err) - buildAppStateFromChain(ctx, t, proxyApp, stateStore, sim.Mempool, sim.Evpool, genesisState, chain, nBlocks, mode, store) + buildAppStateFromChain(ctx, t, proxyApp, stateStore, sim.Mempool, sim.Evpool, genesisState, chain, eventBus, nBlocks, mode, store) } // Prune block store if requested @@ -772,7 +775,7 @@ func testHandshakeReplay( // now start the app using the handshake - it should sync genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile()) require.NoError(t, err) - handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc) + handshaker := NewHandshaker(logger, stateStore, state, store, eventBus, genDoc) proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics()) require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections") require.True(t, proxyApp.IsRunning()) @@ -822,9 +825,10 @@ func applyBlock( blk *types.Block, appClient abciclient.Client, blockStore *mockBlockStore, + eventBus *eventbus.EventBus, ) sm.State { testPartSize := types.BlockPartSizeBytes - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), appClient, mempool, evpool, blockStore) + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), appClient, mempool, evpool, blockStore, eventBus) bps, err := blk.MakePartSet(testPartSize) require.NoError(t, err) @@ -843,6 +847,7 @@ func buildAppStateFromChain( evpool sm.EvidencePool, state sm.State, chain []*types.Block, + eventBus *eventbus.EventBus, nBlocks int, mode uint, blockStore *mockBlockStore, @@ -864,18 +869,18 @@ func buildAppStateFromChain( case 0: for i := 0; i < nBlocks; i++ { block := chain[i] - state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, appClient, blockStore) + state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, appClient, blockStore, eventBus) } case 1, 2, 3: for i := 0; i < nBlocks-1; i++ { block := chain[i] - state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, appClient, blockStore) + state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, appClient, blockStore, eventBus) } if mode == 2 || mode == 3 { // update the kvstore height and apphash // as if we ran commit but not - state = applyBlock(ctx, t, stateStore, mempool, evpool, state, chain[nBlocks-1], appClient, blockStore) + state = applyBlock(ctx, t, stateStore, mempool, evpool, state, chain[nBlocks-1], appClient, blockStore, eventBus) } default: require.Fail(t, "unknown mode %v", mode) @@ -917,23 +922,26 @@ func buildTMStateFromChain( require.NoError(t, stateStore.Save(state)) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) + switch mode { case 0: // sync right up for _, block := range chain { - state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, proxyApp, blockStore) + state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, proxyApp, blockStore, eventBus) } case 1, 2, 3: // sync up to the penultimate as if we stored the block. // whether we commit or not depends on the appHash for _, block := range chain[:len(chain)-1] { - state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, proxyApp, blockStore) + state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, proxyApp, blockStore, eventBus) } // apply the final block to a state copy so we can // get the right next appHash but keep the state back - applyBlock(ctx, t, stateStore, mempool, evpool, state, chain[len(chain)-1], proxyApp, blockStore) + applyBlock(ctx, t, stateStore, mempool, evpool, state, chain[len(chain)-1], proxyApp, blockStore, eventBus) default: require.Fail(t, "unknown mode %v", mode) } @@ -970,6 +978,9 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { logger := log.TestingLogger() + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) + // 2. Tendermint must panic if app returns wrong hash for the first block // - RANDOM HASH // - 0x02 @@ -983,7 +994,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { t.Cleanup(func() { cancel(); proxyApp.Wait() }) assert.Panics(t, func() { - h := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc) + h := NewHandshaker(logger, stateStore, state, store, eventBus, genDoc) if err = h.Handshake(ctx, proxyApp); err != nil { t.Log(err) } @@ -1003,7 +1014,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { t.Cleanup(func() { cancel(); proxyApp.Wait() }) assert.Panics(t, func() { - h := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc) + h := NewHandshaker(logger, stateStore, state, store, eventBus, genDoc) if err = h.Handshake(ctx, proxyApp); err != nil { t.Log(err) } @@ -1235,6 +1246,9 @@ func TestHandshakeUpdatesValidators(t *testing.T) { app := &initChainApp{vals: types.TM2PB.ValidatorUpdates(vals)} client := abciclient.NewLocalClient(logger, app) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) + cfg, err := ResetConfig(t.TempDir(), "handshake_test_") require.NoError(t, err) t.Cleanup(func() { _ = os.RemoveAll(cfg.RootDir) }) @@ -1252,7 +1266,7 @@ func TestHandshakeUpdatesValidators(t *testing.T) { genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile()) require.NoError(t, err) - handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc) + handshaker := NewHandshaker(logger, stateStore, state, store, eventBus, genDoc) proxyApp := proxy.New(client, logger, proxy.NopMetrics()) require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections") diff --git a/internal/consensus/state.go b/internal/consensus/state.go index ff6ffef3d..413c4ba56 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -197,9 +197,11 @@ func NewState( blockStore sm.BlockStore, txNotifier txNotifier, evpool evidencePool, + eventBus *eventbus.EventBus, options ...StateOption, ) (*State, error) { cs := &State{ + eventBus: eventBus, logger: logger, config: cfg, blockExec: blockExec, @@ -260,12 +262,6 @@ func (cs *State) updateStateFromStore(ctx context.Context) error { return nil } -// SetEventBus sets event bus. -func (cs *State) SetEventBus(b *eventbus.EventBus) { - cs.eventBus = b - cs.blockExec.SetEventBus(b) -} - // StateMetrics sets the metrics. func StateMetrics(metrics *Metrics) StateOption { return func(cs *State) { cs.metrics = metrics } diff --git a/internal/consensus/wal_generator.go b/internal/consensus/wal_generator.go index 9608365c1..e99c054c0 100644 --- a/internal/consensus/wal_generator.go +++ b/internal/consensus/wal_generator.go @@ -82,13 +82,12 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr mempool := emptyMempool{} evpool := sm.EmptyEvidencePool{} - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, mempool, evpool, blockStore) - consensusState, err := NewState(ctx, logger, cfg.Consensus, stateStore, blockExec, blockStore, mempool, evpool) + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, mempool, evpool, blockStore, eventBus) + consensusState, err := NewState(ctx, logger, cfg.Consensus, stateStore, blockExec, blockStore, mempool, evpool, eventBus) if err != nil { t.Fatal(err) } - consensusState.SetEventBus(eventBus) if privValidator != nil && privValidator != (*privval.FilePV)(nil) { consensusState.SetPrivValidator(ctx, privValidator) } diff --git a/internal/eventbus/event_bus.go b/internal/eventbus/event_bus.go index 1d2d510e3..5a64c5f1c 100644 --- a/internal/eventbus/event_bus.go +++ b/internal/eventbus/event_bus.go @@ -194,28 +194,3 @@ func (b *EventBus) PublishEventValidatorSetUpdates(ctx context.Context, data typ func (b *EventBus) PublishEventEvidenceValidated(ctx context.Context, evidence types.EventDataEvidenceValidated) error { return b.Publish(ctx, types.EventEvidenceValidatedValue, evidence) } - -//----------------------------------------------------------------------------- - -// NopEventBus implements a types.BlockEventPublisher that discards all events. -type NopEventBus struct{} - -func (NopEventBus) PublishEventNewBlock(context.Context, types.EventDataNewBlock) error { - return nil -} - -func (NopEventBus) PublishEventNewBlockHeader(context.Context, types.EventDataNewBlockHeader) error { - return nil -} - -func (NopEventBus) PublishEventNewEvidence(context.Context, types.EventDataNewEvidence) error { - return nil -} - -func (NopEventBus) PublishEventTx(context.Context, types.EventDataTx) error { - return nil -} - -func (NopEventBus) PublishEventValidatorSetUpdates(context.Context, types.EventDataValidatorSetUpdates) error { - return nil -} diff --git a/internal/state/execution.go b/internal/state/execution.go index f7c9e02d1..ef242b567 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -64,12 +64,13 @@ func NewBlockExecutor( pool mempool.Mempool, evpool EvidencePool, blockStore BlockStore, + eventBus *eventbus.EventBus, options ...BlockExecutorOption, ) *BlockExecutor { res := &BlockExecutor{ + eventBus: eventBus, store: stateStore, appClient: appClient, - eventBus: eventbus.NopEventBus{}, mempool: pool, evpool: evpool, logger: logger, @@ -89,12 +90,6 @@ func (blockExec *BlockExecutor) Store() Store { return blockExec.store } -// SetEventBus - sets the event bus for publishing block related events. -// If not called, it defaults to types.NopEventBus. -func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) { - blockExec.eventBus = eventBus -} - // CreateProposalBlock calls state.MakeBlock with evidence from the evpool // and txs from the mempool. The max bytes must be big enough to fit the commit. // Up to 1/10th of the block space is allcoated for maximum sized evidence. diff --git a/internal/state/execution_test.go b/internal/state/execution_test.go index 8dd384b61..451a9e2bb 100644 --- a/internal/state/execution_test.go +++ b/internal/state/execution_test.go @@ -45,14 +45,15 @@ func TestApplyBlock(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := proxyApp.Start(ctx) - require.NoError(t, err) + require.NoError(t, proxyApp.Start(ctx)) + + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) state, stateDB, _ := makeState(t, 1, 1) stateStore := sm.NewStore(stateDB) blockStore := store.NewBlockStore(dbm.NewMemDB()) - blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp, - mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore) + blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp, mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore, eventBus) block, err := sf.MakeBlock(state, 1, new(types.Commit)) require.NoError(t, err) @@ -103,7 +104,10 @@ func TestFinalizeBlockDecidedLastCommit(t *testing.T) { evpool.On("Update", ctx, mock.Anything, mock.Anything).Return() evpool.On("CheckEvidence", ctx, mock.Anything).Return(nil) - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), appClient, mmock.Mempool{}, evpool, blockStore) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) + + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), appClient, mmock.Mempool{}, evpool, blockStore, eventBus) state, _, lastCommit := makeAndCommitGoodBlock(ctx, t, state, 1, new(types.Commit), state.NextValidators.Validators[0].Address, blockExec, privVals, nil) for idx, isAbsent := range tc.absentCommitSigs { @@ -212,10 +216,13 @@ func TestFinalizeBlockByzantineValidators(t *testing.T) { evpool.On("Update", ctx, mock.AnythingOfType("state.State"), mock.AnythingOfType("types.EvidenceList")).Return() evpool.On("CheckEvidence", ctx, mock.AnythingOfType("types.EvidenceList")).Return(nil) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) + blockStore := store.NewBlockStore(dbm.NewMemDB()) blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, - mmock.Mempool{}, evpool, blockStore) + mmock.Mempool{}, evpool, blockStore, eventBus) block, err := sf.MakeBlock(state, 1, new(types.Commit)) require.NoError(t, err) @@ -250,6 +257,9 @@ func TestProcessProposal(t *testing.T) { stateStore := sm.NewStore(stateDB) blockStore := store.NewBlockStore(dbm.NewMemDB()) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) + blockExec := sm.NewBlockExecutor( stateStore, logger, @@ -257,6 +267,7 @@ func TestProcessProposal(t *testing.T) { mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore, + eventBus, ) block0, err := sf.MakeBlock(state, height-1, new(types.Commit)) @@ -453,6 +464,9 @@ func TestFinalizeBlockValidatorUpdates(t *testing.T) { stateStore := sm.NewStore(stateDB) blockStore := store.NewBlockStore(dbm.NewMemDB()) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) + blockExec := sm.NewBlockExecutor( stateStore, logger, @@ -460,15 +474,9 @@ func TestFinalizeBlockValidatorUpdates(t *testing.T) { mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore, + eventBus, ) - eventBus := eventbus.NewDefault(logger) - err = eventBus.Start(ctx) - require.NoError(t, err) - defer eventBus.Stop() - - blockExec.SetEventBus(eventBus) - updatesSub, err := eventBus.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: "TestFinalizeBlockValidatorUpdates", Query: types.EventQueryValidatorSetUpdates, @@ -524,6 +532,9 @@ func TestFinalizeBlockValidatorUpdatesResultingInEmptySet(t *testing.T) { err := proxyApp.Start(ctx) require.NoError(t, err) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) + state, stateDB, _ := makeState(t, 1, 1) stateStore := sm.NewStore(stateDB) blockStore := store.NewBlockStore(dbm.NewMemDB()) @@ -534,6 +545,7 @@ func TestFinalizeBlockValidatorUpdatesResultingInEmptySet(t *testing.T) { mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore, + eventBus, ) block, err := sf.MakeBlock(state, 1, new(types.Commit)) diff --git a/internal/state/validation_test.go b/internal/state/validation_test.go index d24b5098a..74a41fdf9 100644 --- a/internal/state/validation_test.go +++ b/internal/state/validation_test.go @@ -14,6 +14,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/crypto/tmhash" + "github.com/tendermint/tendermint/internal/eventbus" memmock "github.com/tendermint/tendermint/internal/mempool/mock" "github.com/tendermint/tendermint/internal/proxy" sm "github.com/tendermint/tendermint/internal/state" @@ -36,6 +37,9 @@ func TestValidateBlockHeader(t *testing.T) { proxyApp := proxy.New(abciclient.NewLocalClient(logger, &testApp{}), logger, proxy.NopMetrics()) require.NoError(t, proxyApp.Start(ctx)) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) + state, stateDB, privVals := makeState(t, 3, 1) stateStore := sm.NewStore(stateDB) blockStore := store.NewBlockStore(dbm.NewMemDB()) @@ -46,6 +50,7 @@ func TestValidateBlockHeader(t *testing.T) { memmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore, + eventBus, ) lastCommit := types.NewCommit(0, 0, types.BlockID{}, nil) @@ -125,6 +130,9 @@ func TestValidateBlockCommit(t *testing.T) { proxyApp := proxy.New(abciclient.NewLocalClient(logger, &testApp{}), logger, proxy.NopMetrics()) require.NoError(t, proxyApp.Start(ctx)) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) + state, stateDB, privVals := makeState(t, 1, 1) stateStore := sm.NewStore(stateDB) blockStore := store.NewBlockStore(dbm.NewMemDB()) @@ -135,6 +143,7 @@ func TestValidateBlockCommit(t *testing.T) { memmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore, + eventBus, ) lastCommit := types.NewCommit(0, 0, types.BlockID{}, nil) wrongSigsCommit := types.NewCommit(1, 0, types.BlockID{}, nil) @@ -263,6 +272,9 @@ func TestValidateBlockEvidence(t *testing.T) { evpool.On("ABCIEvidence", mock.AnythingOfType("int64"), mock.AnythingOfType("[]types.Evidence")).Return( []abci.Evidence{}) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) + state.ConsensusParams.Evidence.MaxBytes = 1000 blockExec := sm.NewBlockExecutor( stateStore, @@ -271,6 +283,7 @@ func TestValidateBlockEvidence(t *testing.T) { memmock.Mempool{}, evpool, blockStore, + eventBus, ) lastCommit := types.NewCommit(0, 0, types.BlockID{}, nil) diff --git a/node/node.go b/node/node.go index dab62de09..4bb72f4f6 100644 --- a/node/node.go +++ b/node/node.go @@ -289,6 +289,7 @@ func makeNode( mp, evPool, blockStore, + eventBus, sm.BlockExecutorWithMetrics(nodeMetrics.state), ) diff --git a/node/node_test.go b/node/node_test.go index 3f8bf9141..f5b978418 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -323,6 +323,8 @@ func TestCreateProposalBlock(t *testing.T) { assert.NoError(t, err) } + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) blockExec := sm.NewBlockExecutor( stateStore, logger, @@ -330,6 +332,7 @@ func TestCreateProposalBlock(t *testing.T) { mp, evidencePool, blockStore, + eventBus, ) commit := types.NewCommit(height-1, 0, types.BlockID{}, nil) @@ -398,6 +401,9 @@ func TestMaxTxsProposalBlockSize(t *testing.T) { err = mp.CheckTx(ctx, tx, nil, mempool.TxInfo{}) assert.NoError(t, err) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) + blockExec := sm.NewBlockExecutor( stateStore, logger, @@ -405,6 +411,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) { mp, sm.EmptyEvidencePool{}, blockStore, + eventBus, ) commit := types.NewCommit(height-1, 0, types.BlockID{}, nil) @@ -469,6 +476,9 @@ func TestMaxProposalBlockSize(t *testing.T) { assert.NoError(t, err) } + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) + blockExec := sm.NewBlockExecutor( stateStore, logger, @@ -476,6 +486,7 @@ func TestMaxProposalBlockSize(t *testing.T) { mp, sm.EmptyEvidencePool{}, blockStore, + eventBus, ) blockID := types.BlockID{ diff --git a/node/setup.go b/node/setup.go index 3b7fcf239..c3e229fbf 100644 --- a/node/setup.go +++ b/node/setup.go @@ -270,6 +270,7 @@ func createConsensusReactor( blockStore, mp, evidencePool, + eventBus, consensus.StateMetrics(csMetrics), ) if err != nil { @@ -286,6 +287,7 @@ func createConsensusReactor( consensusState, router.OpenChannel, peerManager.Subscribe(ctx), + eventBus, waitSync, csMetrics, ) @@ -293,9 +295,6 @@ func createConsensusReactor( return nil, nil, err } - // Services which will be publishing and/or subscribing for messages (events) - // consensusReactor will set it on consensusState and blockExecutor. - reactor.SetEventBus(eventBus) return reactor, consensusState, nil }