From 746b5f015ae40eddecfb5fd7702126b6250a7dff Mon Sep 17 00:00:00 2001 From: tycho garen Date: Mon, 7 Mar 2022 09:31:26 -0500 Subject: [PATCH] evidence: manage and initialize state objects more clearly in the pool --- internal/evidence/pool.go | 68 ++++++++++---------- internal/evidence/pool_test.go | 101 ++++++++++++++++-------------- internal/evidence/reactor_test.go | 9 +-- internal/evidence/verify_test.go | 60 ++++++++++-------- node/node.go | 12 +++- node/node_test.go | 3 +- node/setup.go | 7 +-- 7 files changed, 138 insertions(+), 122 deletions(-) diff --git a/internal/evidence/pool.go b/internal/evidence/pool.go index f4afb1f8c..261fe2108 100644 --- a/internal/evidence/pool.go +++ b/internal/evidence/pool.go @@ -36,14 +36,14 @@ type Pool struct { evidenceList *clist.CList // concurrent linked-list of evidence evidenceSize uint32 // amount of pending evidence - // needed to load validators to verify evidence - stateDB sm.Store // needed to load headers and commits to verify evidence blockStore BlockStore + stateDB sm.Store mtx sync.Mutex // latest state - state sm.State + state sm.State + isStarted bool // evidence from consensus is buffered to this slice, awaiting until the next height // before being flushed to the pool. This prevents broadcasting and proposing of // evidence before the height with which the evidence happened is finished. @@ -60,46 +60,19 @@ type Pool struct { Metrics *Metrics } -func (evpool *Pool) SetEventBus(e *eventbus.EventBus) { - evpool.eventBus = e -} - // NewPool creates an evidence pool. If using an existing evidence store, // it will add all pending evidence to the concurrent list. -func NewPool(logger log.Logger, evidenceDB dbm.DB, stateDB sm.Store, blockStore BlockStore, metrics *Metrics) (*Pool, error) { - state, err := stateDB.Load() - if err != nil { - return nil, fmt.Errorf("failed to load state: %w", err) - } - - pool := &Pool{ - stateDB: stateDB, +func NewPool(logger log.Logger, evidenceDB dbm.DB, stateStore sm.Store, blockStore BlockStore, metrics *Metrics, eventBus *eventbus.EventBus) *Pool { + return &Pool{ blockStore: blockStore, - state: state, + stateDB: stateStore, logger: logger, evidenceStore: evidenceDB, evidenceList: clist.New(), consensusBuffer: make([]duplicateVoteSet, 0), Metrics: metrics, + eventBus: eventBus, } - - // If pending evidence already in db, in event of prior failure, then check - // for expiration, update the size and load it back to the evidenceList. - pool.pruningHeight, pool.pruningTime = pool.removeExpiredPendingEvidence() - evList, _, err := pool.listEvidence(prefixPending, -1) - if err != nil { - return nil, err - } - - atomic.StoreUint32(&pool.evidenceSize, uint32(len(evList))) - pool.Metrics.NumEvidence.Set(float64(pool.evidenceSize)) - - for _, ev := range evList { - pool.evidenceList.PushBack(ev) - } - pool.eventBus = nil - - return pool, nil } // PendingEvidence is used primarily as part of block proposal and returns up to @@ -277,6 +250,31 @@ func (evpool *Pool) State() sm.State { return evpool.state } +func (evpool *Pool) Start(state sm.State) error { + if evpool.isStarted { + return errors.New("pooli is already running") + } + + evpool.state = state + + // If pending evidence already in db, in event of prior failure, then check + // for expiration, update the size and load it back to the evidenceList. + evpool.pruningHeight, evpool.pruningTime = evpool.removeExpiredPendingEvidence() + evList, _, err := evpool.listEvidence(prefixPending, -1) + if err != nil { + return err + } + + atomic.StoreUint32(&evpool.evidenceSize, uint32(len(evList))) + evpool.Metrics.NumEvidence.Set(float64(evpool.evidenceSize)) + + for _, ev := range evList { + evpool.evidenceList.PushBack(ev) + } + + return nil +} + func (evpool *Pool) Close() error { return evpool.evidenceStore.Close() } @@ -449,6 +447,7 @@ func (evpool *Pool) listEvidence(prefixKey int64, maxBytes int64) ([]types.Evide } func (evpool *Pool) removeExpiredPendingEvidence() (int64, time.Time) { + batch := evpool.evidenceStore.NewBatch() defer batch.Close() @@ -473,7 +472,6 @@ func (evpool *Pool) removeExpiredPendingEvidence() (int64, time.Time) { // remove evidence from the clist evpool.removeEvidenceFromList(blockEvidenceMap) - // update the evidence size atomic.AddUint32(&evpool.evidenceSize, ^uint32(len(blockEvidenceMap)-1)) diff --git a/internal/evidence/pool_test.go b/internal/evidence/pool_test.go index 51f785221..a2c89b5dc 100644 --- a/internal/evidence/pool_test.go +++ b/internal/evidence/pool_test.go @@ -34,6 +34,18 @@ var ( defaultEvidenceMaxBytes int64 = 1000 ) +func bootstrapPool(t *testing.T, pool *evidence.Pool, store sm.Store) { + t.Helper() + state, err := store.Load() + if err != nil { + t.Fatalf("cannot load state: %v", err) + } + if err := pool.Start(state); err != nil { + t.Fatalf("cannot start state pool: %v", err) + } + +} + func TestEvidencePoolBasic(t *testing.T) { var ( height = int64(1) @@ -51,9 +63,13 @@ func TestEvidencePoolBasic(t *testing.T) { stateStore.On("LoadValidators", mock.AnythingOfType("int64")).Return(valSet, nil) stateStore.On("Load").Return(createState(height+1, valSet), nil) - pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) - require.NoError(t, setupEventBus(ctx, pool)) + logger := log.NewNopLogger() + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) + + pool := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus) + bootstrapPool(t, pool, stateStore) + // evidence not seen yet: evs, size := pool.PendingEvidence(defaultEvidenceMaxBytes) require.Equal(t, 0, len(evs)) @@ -115,10 +131,12 @@ func TestAddExpiredEvidence(t *testing.T) { return &types.BlockMeta{Header: types.Header{Time: expiredEvidenceTime}} }) - pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + logger := log.NewNopLogger() + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) - require.NoError(t, setupEventBus(ctx, pool)) + pool := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus) + bootstrapPool(t, pool, stateStore) testCases := []struct { evHeight int64 @@ -159,9 +177,7 @@ func TestReportConflictingVotes(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, pv := defaultTestPool(ctx, t, height) - - require.NoError(t, setupEventBus(ctx, pool)) + pool, pv, _ := defaultTestPool(ctx, t, height) val := types.NewValidator(pv.PrivKey.PubKey(), 10) @@ -201,9 +217,7 @@ func TestEvidencePoolUpdate(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, val := defaultTestPool(ctx, t, height) - - require.NoError(t, setupEventBus(ctx, pool)) + pool, val, _ := defaultTestPool(ctx, t, height) state := pool.State() @@ -273,9 +287,7 @@ func TestVerifyPendingEvidencePasses(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, val := defaultTestPool(ctx, t, height) - - require.NoError(t, setupEventBus(ctx, pool)) + pool, val, _ := defaultTestPool(ctx, t, height) ev, err := types.NewMockDuplicateVoteEvidenceWithValidator( ctx, @@ -295,9 +307,7 @@ func TestVerifyDuplicatedEvidenceFails(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, val := defaultTestPool(ctx, t, height) - - require.NoError(t, setupEventBus(ctx, pool)) + pool, val, _ := defaultTestPool(ctx, t, height) ev, err := types.NewMockDuplicateVoteEvidenceWithValidator( ctx, @@ -321,7 +331,7 @@ func TestEventOnEvidenceValidated(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, val := defaultTestPool(ctx, t, height) + pool, val, eventBus := defaultTestPool(ctx, t, height) ev, err := types.NewMockDuplicateVoteEvidenceWithValidator( ctx, @@ -332,11 +342,6 @@ func TestEventOnEvidenceValidated(t *testing.T) { ) require.NoError(t, err) - eventBus := eventbus.NewDefault(log.TestingLogger()) - require.NoError(t, eventBus.Start(ctx)) - - pool.SetEventBus(eventBus) - const query = `tm.event='EvidenceValidated'` evSub, err := eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{ ClientID: "test", @@ -348,6 +353,9 @@ func TestEventOnEvidenceValidated(t *testing.T) { go func() { defer close(done) msg, err := evSub.Next(ctx) + if ctx.Err() != nil { + return + } assert.NoError(t, err) edt := msg.Data().(types.EventDataEvidenceValidated) @@ -394,14 +402,15 @@ func TestLightClientAttackEvidenceLifecycle(t *testing.T) { blockStore.On("LoadBlockCommit", height).Return(trusted.Commit) blockStore.On("LoadBlockCommit", commonHeight).Return(common.Commit) - pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + logger := log.NewNopLogger() + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) - require.NoError(t, setupEventBus(ctx, pool)) + pool := evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus) hash := ev.Hash() - err = pool.AddEvidence(ctx, ev) + err := pool.AddEvidence(ctx, ev) require.NoError(t, err) err = pool.AddEvidence(ctx, ev) require.NoError(t, err) @@ -449,11 +458,13 @@ func TestRecoverPendingEvidence(t *testing.T) { blockStore, err := initializeBlockStore(dbm.NewMemDB(), state, valAddress) require.NoError(t, err) - // create previous pool and populate it - pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + logger := log.NewNopLogger() + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) - require.NoError(t, setupEventBus(ctx, pool)) + // create previous pool and populate it + pool := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus) + bootstrapPool(t, pool, stateStore) goodEvidence, err := types.NewMockDuplicateVoteEvidenceWithValidator( ctx, @@ -495,9 +506,8 @@ func TestRecoverPendingEvidence(t *testing.T) { }, }, nil) - newPool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, newStateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) - + newPool := evidence.NewPool(logger, evidenceDB, newStateStore, blockStore, evidence.NopMetrics(), nil) + bootstrapPool(t, newPool, newStateStore) evList, _ := newPool.PendingEvidence(defaultEvidenceMaxBytes) require.Equal(t, 1, len(evList)) @@ -590,7 +600,7 @@ func makeCommit(height int64, valAddr []byte) *types.Commit { return types.NewCommit(height, 0, types.BlockID{}, commitSigs) } -func defaultTestPool(ctx context.Context, t *testing.T, height int64) (*evidence.Pool, types.MockPV) { +func defaultTestPool(ctx context.Context, t *testing.T, height int64) (*evidence.Pool, types.MockPV, *eventbus.EventBus) { t.Helper() val := types.NewMockPV() valAddress := val.PrivKey.PubKey().Address() @@ -601,10 +611,14 @@ func defaultTestPool(ctx context.Context, t *testing.T, height int64) (*evidence blockStore, err := initializeBlockStore(dbm.NewMemDB(), state, valAddress) require.NoError(t, err) - pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err, "test evidence pool could not be created") + logger := log.NewNopLogger() + + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) - return pool, val + pool := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus) + bootstrapPool(t, pool, stateStore) + return pool, val, eventBus } func createState(height int64, valSet *types.ValidatorSet) sm.State { @@ -616,12 +630,3 @@ func createState(height int64, valSet *types.ValidatorSet) sm.State { ConsensusParams: *types.DefaultConsensusParams(), } } - -func setupEventBus(ctx context.Context, evpool *evidence.Pool) error { - eventBus := eventbus.NewDefault(log.TestingLogger()) - if err := eventBus.Start(ctx); err != nil { - return err - } - evpool.SetEventBus(eventBus) - return nil -} diff --git a/internal/evidence/reactor_test.go b/internal/evidence/reactor_test.go index d0863acc1..09c5cbfa8 100644 --- a/internal/evidence/reactor_test.go +++ b/internal/evidence/reactor_test.go @@ -82,13 +82,14 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store, chBuf uint } return nil }) - rts.pools[nodeID], err = evidence.NewPool(logger, evidenceDB, stateStores[idx], blockStore, evidence.NopMetrics()) - - require.NoError(t, err) eventBus := eventbus.NewDefault(logger) err = eventBus.Start(ctx) require.NoError(t, err) - rts.pools[nodeID].SetEventBus(eventBus) + + rts.pools[nodeID] = evidence.NewPool(logger, evidenceDB, stateStores[idx], blockStore, evidence.NopMetrics(), eventBus) + bootstrapPool(t, rts.pools[nodeID], stateStores[idx]) + + require.NoError(t, err) rts.peerChans[nodeID] = make(chan p2p.PeerUpdate) rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1) diff --git a/internal/evidence/verify_test.go b/internal/evidence/verify_test.go index 607c8fd50..3928f69ab 100644 --- a/internal/evidence/verify_test.go +++ b/internal/evidence/verify_test.go @@ -12,6 +12,7 @@ import ( "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/tmhash" + "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/evidence" "github.com/tendermint/tendermint/internal/evidence/mocks" sm "github.com/tendermint/tendermint/internal/state" @@ -76,6 +77,7 @@ func TestVerify_LunaticAttackAgainstState(t *testing.T) { ) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + logger := log.NewNopLogger() attackTime := defaultEvidenceTime.Add(1 * time.Hour) // create valid lunatic evidence @@ -96,8 +98,7 @@ func TestVerify_LunaticAttackAgainstState(t *testing.T) { blockStore.On("LoadBlockMeta", height).Return(&types.BlockMeta{Header: *trusted.Header}) blockStore.On("LoadBlockCommit", commonHeight).Return(common.Commit) blockStore.On("LoadBlockCommit", height).Return(trusted.Commit) - pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + pool := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), nil) evList := types.EvidenceList{ev} // check that the evidence pool correctly verifies the evidence @@ -111,32 +112,29 @@ func TestVerify_LunaticAttackAgainstState(t *testing.T) { // if we submit evidence only against a single byzantine validator when we see there are more validators then this // should return an error ev.ByzantineValidators = ev.ByzantineValidators[:1] - t.Log(evList) assert.Error(t, pool.CheckEvidence(ctx, evList)) // restore original byz vals ev.ByzantineValidators = ev.GetByzantineValidators(common.ValidatorSet, trusted.SignedHeader) // duplicate evidence should be rejected evList = types.EvidenceList{ev, ev} - pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + pool = evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), nil) assert.Error(t, pool.CheckEvidence(ctx, evList)) // If evidence is submitted with an altered timestamp it should return an error - ev.Timestamp = defaultEvidenceTime.Add(1 * time.Minute) - pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) - require.NoError(t, setupEventBus(ctx, pool)) + ev.Timestamp = defaultEvidenceTime.Add(1 * time.Minute) + pool = evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus) - err = pool.AddEvidence(ctx, ev) + err := pool.AddEvidence(ctx, ev) assert.Error(t, err) ev.Timestamp = defaultEvidenceTime // Evidence submitted with a different validator power should fail ev.TotalVotingPower = 1 - pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + pool = evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), nil) err = pool.AddEvidence(ctx, ev) assert.Error(t, err) ev.TotalVotingPower = common.ValidatorSet.TotalVotingPower() @@ -154,6 +152,9 @@ func TestVerify_ForwardLunaticAttack(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + + logger := log.NewNopLogger() + // create a forward lunatic attack ev, trusted, common := makeLunaticEvidence(ctx, t, attackHeight, commonHeight, totalVals, byzVals, totalVals-byzVals, defaultEvidenceTime, attackTime) @@ -179,10 +180,11 @@ func TestVerify_ForwardLunaticAttack(t *testing.T) { blockStore.On("LoadBlockCommit", commonHeight).Return(common.Commit) blockStore.On("LoadBlockCommit", nodeHeight).Return(trusted.Commit) blockStore.On("Height").Return(nodeHeight) - pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) - require.NoError(t, setupEventBus(ctx, pool)) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) + + pool := evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus) // check that the evidence pool correctly verifies the evidence assert.NoError(t, pool.CheckEvidence(ctx, types.EvidenceList{ev})) @@ -199,8 +201,7 @@ func TestVerify_ForwardLunaticAttack(t *testing.T) { oldBlockStore.On("Height").Return(nodeHeight) require.Equal(t, defaultEvidenceTime, oldBlockStore.LoadBlockMeta(nodeHeight).Header.Time) - pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, oldBlockStore, evidence.NopMetrics()) - require.NoError(t, err) + pool = evidence.NewPool(logger, dbm.NewMemDB(), stateStore, oldBlockStore, evidence.NopMetrics(), nil) assert.Error(t, pool.CheckEvidence(ctx, types.EvidenceList{ev})) } @@ -208,6 +209,8 @@ func TestVerifyLightClientAttack_Equivocation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + logger := log.NewNopLogger() + conflictingVals, conflictingPrivVals := factory.ValidatorSet(ctx, t, 5, 10) conflictingHeader := factory.MakeHeader(t, &types.Header{ @@ -289,10 +292,10 @@ func TestVerifyLightClientAttack_Equivocation(t *testing.T) { blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: *trustedHeader}) blockStore.On("LoadBlockCommit", int64(10)).Return(trustedCommit) - pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) - require.NoError(t, setupEventBus(ctx, pool)) + pool := evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus) evList := types.EvidenceList{ev} err = pool.CheckEvidence(ctx, evList) @@ -305,6 +308,9 @@ func TestVerifyLightClientAttack_Equivocation(t *testing.T) { func TestVerifyLightClientAttack_Amnesia(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + + logger := log.NewNopLogger() + var height int64 = 10 conflictingVals, conflictingPrivVals := factory.ValidatorSet(ctx, t, 5, 10) @@ -378,10 +384,10 @@ func TestVerifyLightClientAttack_Amnesia(t *testing.T) { blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: *trustedHeader}) blockStore.On("LoadBlockCommit", int64(10)).Return(trustedCommit) - pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) - require.NoError(t, setupEventBus(ctx, pool)) + pool := evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus) evList := types.EvidenceList{ev} err = pool.CheckEvidence(ctx, evList) @@ -401,6 +407,7 @@ func TestVerifyDuplicateVoteEvidence(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + logger := log.NewNopLogger() val := types.NewMockPV() val2 := types.NewMockPV() valSet := types.NewValidatorSet([]*types.Validator{val.ExtractIntoValidator(ctx, 1)}) @@ -478,10 +485,11 @@ func TestVerifyDuplicateVoteEvidence(t *testing.T) { blockStore := &mocks.BlockStore{} blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: types.Header{Time: defaultEvidenceTime}}) - pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) - require.NoError(t, setupEventBus(ctx, pool)) + pool := evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus) + bootstrapPool(t, pool, stateStore) evList := types.EvidenceList{goodEv} err = pool.CheckEvidence(ctx, evList) diff --git a/node/node.go b/node/node.go index e57b7c4db..dab62de09 100644 --- a/node/node.go +++ b/node/node.go @@ -62,7 +62,8 @@ type nodeImpl struct { // services eventSinks []indexer.EventSink stateStore sm.Store - blockStore *store.BlockStore // store the blockchain to disk + blockStore *store.BlockStore // store the blockchain to disk + evPool *evidence.Pool stateSync bool // whether the node should state sync on startup stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots @@ -388,6 +389,7 @@ func makeNode( blockStore: blockStore, stateSyncReactor: stateSyncReactor, stateSync: stateSync, + evPool: evPool, shutdownOps: makeCloser(closers), @@ -462,6 +464,14 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { } } + state, err := n.stateStore.Load() + if err != nil { + return err + } + if err := n.evPool.Start(state); err != nil { + return err + } + n.rpcEnv.NodeInfo = n.nodeInfo // Start the RPC server before the P2P server // so we can eg. receive txs for the first block diff --git a/node/node_test.go b/node/node_test.go index 690da1a4d..3f8bf9141 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -297,8 +297,7 @@ func TestCreateProposalBlock(t *testing.T) { // Make EvidencePool evidenceDB := dbm.NewMemDB() blockStore := store.NewBlockStore(dbm.NewMemDB()) - evidencePool, err := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + evidencePool := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics(), nil) // fill the evidence pool with more evidence // than can fit in a block diff --git a/node/setup.go b/node/setup.go index 7a473fae9..3b7fcf239 100644 --- a/node/setup.go +++ b/node/setup.go @@ -228,12 +228,7 @@ func createEvidenceReactor( logger = logger.With("module", "evidence") - evidencePool, err := evidence.NewPool(logger, evidenceDB, store, blockStore, metrics) - if err != nil { - return nil, nil, fmt.Errorf("creating evidence pool: %w", err) - } - - evidencePool.SetEventBus(eventBus) + evidencePool := evidence.NewPool(logger, evidenceDB, store, blockStore, metrics, eventBus) evidenceReactor, err := evidence.NewReactor( ctx,