diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go index f0df502f9..029561d51 100644 --- a/internal/consensus/byzantine_test.go +++ b/internal/consensus/byzantine_test.go @@ -87,10 +87,12 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { mempool.EnableTxsAvailable() } + eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events")) + require.NoError(t, eventBus.Start(ctx)) + // Make a full instance of the evidence pool evidenceDB := dbm.NewMemDB() - evpool, err := evidence.NewPool(logger.With("module", "evidence"), evidenceDB, stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + evpool := evidence.NewPool(logger.With("module", "evidence"), evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus) // Make State blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore) @@ -100,11 +102,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { pv := privVals[i] cs.SetPrivValidator(ctx, pv) - eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events")) - err = eventBus.Start(ctx) - require.NoError(t, err) cs.SetEventBus(eventBus) - evpool.SetEventBus(eventBus) cs.SetTimeoutTicker(tickerFunc()) states[i] = cs @@ -327,42 +325,42 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) { // blocksSubs := make([]types.Subscription, n) // reactors := make([]p2p.Reactor, n) // for i := 0; i < n; i++ { - // // enable txs so we can create different proposals - // assertMempool(states[i].txNotifier).EnableTxsAvailable() + // // enable txs so we can create different proposals + // assertMempool(states[i].txNotifier).EnableTxsAvailable() - // eventBus := states[i].eventBus - // eventBus.SetLogger(logger.With("module", "events", "validator", i)) + // eventBus := states[i].eventBus + // eventBus.SetLogger(logger.With("module", "events", "validator", i)) - // var err error - // blocksSubs[i], err = eventBus.Subscribe(ctx, testSubscriber, types.EventQueryNewBlock) - // require.NoError(t, err) + // var err error + // blocksSubs[i], err = eventBus.Subscribe(ctx, testSubscriber, types.EventQueryNewBlock) + // require.NoError(t, err) - // conR := NewReactor(states[i], true) // so we don't start the consensus states - // conR.SetLogger(logger.With("validator", i)) - // conR.SetEventBus(eventBus) + // conR := NewReactor(states[i], true) // so we don't start the consensus states + // conR.SetLogger(logger.With("validator", i)) + // conR.SetEventBus(eventBus) - // var conRI p2p.Reactor = conR + // var conRI p2p.Reactor = conR - // // make first val byzantine - // if i == 0 { - // conRI = NewByzantineReactor(conR) - // } + // // make first val byzantine + // if i == 0 { + // conRI = NewByzantineReactor(conR) + // } - // reactors[i] = conRI - // err = states[i].blockExec.Store().Save(states[i].state) // for save height 1's validators info - // require.NoError(t, err) + // reactors[i] = conRI + // err = states[i].blockExec.Store().Save(states[i].state) // for save height 1's validators info + // require.NoError(t, err) // } // switches := p2p.MakeConnectedSwitches(config.P2P, N, func(i int, sw *p2p.Switch) *p2p.Switch { - // sw.SetLogger(p2pLogger.With("validator", i)) - // sw.AddReactor("CONSENSUS", reactors[i]) - // return sw + // sw.SetLogger(p2pLogger.With("validator", i)) + // sw.AddReactor("CONSENSUS", reactors[i]) + // return sw // }, func(sws []*p2p.Switch, i, j int) { - // // the network starts partitioned with globally active adversary - // if i != 0 { - // return - // } - // p2p.Connect2Switches(sws, i, j) + // // the network starts partitioned with globally active adversary + // if i != 0 { + // return + // } + // p2p.Connect2Switches(sws, i, j) // }) // // make first val byzantine @@ -370,26 +368,26 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) { // // do any safety checks. // states[0].privValidator.(types.MockPV).DisableChecks() // states[0].decideProposal = func(j int32) func(int64, int32) { - // return func(height int64, round int32) { - // byzantineDecideProposalFunc(t, height, round, states[j], switches[j]) - // } + // return func(height int64, round int32) { + // byzantineDecideProposalFunc(t, height, round, states[j], switches[j]) + // } // }(int32(0)) // // We are setting the prevote function to do nothing because the prevoting // // and precommitting are done alongside the proposal. // states[0].doPrevote = func(height int64, round int32) {} // defer func() { - // for _, sw := range switches { - // err := sw.Stop() - // require.NoError(t, err) - // } + // for _, sw := range switches { + // err := sw.Stop() + // require.NoError(t, err) + // } // }() // // start the non-byz state machines. // // note these must be started before the byz // for i := 1; i < n; i++ { - // cr := reactors[i].(*Reactor) - // cr.SwitchToConsensus(cr.conS.GetState(), false) + // cr := reactors[i].(*Reactor) + // cr.SwitchToConsensus(cr.conS.GetState(), false) // } // // start the byzantine state machine @@ -421,146 +419,146 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) { // // (one of them already has) // wg := new(sync.WaitGroup) // for i := 1; i < N-1; i++ { - // wg.Add(1) - // go func(j int) { - // <-blocksSubs[j].Out() - // wg.Done() - // }(i) + // wg.Add(1) + // go func(j int) { + // <-blocksSubs[j].Out() + // wg.Done() + // }(i) // } // done := make(chan struct{}) // go func() { - // wg.Wait() - // close(done) + // wg.Wait() + // close(done) // }() // tick := time.NewTicker(time.Second * 10) // select { // case <-done: // case <-tick.C: - // for i, reactor := range reactors { - // t.Log(fmt.Sprintf("Consensus Reactor %v", i)) - // t.Log(fmt.Sprintf("%v", reactor)) - // } - // t.Fatalf("Timed out waiting for all validators to commit first block") + // for i, reactor := range reactors { + // t.Log(fmt.Sprintf("Consensus Reactor %v", i)) + // t.Log(fmt.Sprintf("%v", reactor)) + // } + // t.Fatalf("Timed out waiting for all validators to commit first block") // } } // func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *State, sw *p2p.Switch) { -// // byzantine user should create two proposals and try to split the vote. -// // Avoid sending on internalMsgQueue and running consensus state. - -// // Create a new proposal block from state/txs from the mempool. -// block1, blockParts1 := cs.createProposalBlock() -// polRound, propBlockID := cs.ValidRound, types.BlockID{Hash: block1.Hash(), PartSetHeader: blockParts1.Header()} -// proposal1 := types.NewProposal(height, round, polRound, propBlockID) -// p1 := proposal1.ToProto() -// if err := cs.privValidator.SignProposal(cs.state.ChainID, p1); err != nil { -// t.Error(err) -// } - -// proposal1.Signature = p1.Signature - -// // some new transactions come in (this ensures that the proposals are different) -// deliverTxsRange(cs, 0, 1) - -// // Create a new proposal block from state/txs from the mempool. -// block2, blockParts2 := cs.createProposalBlock() -// polRound, propBlockID = cs.ValidRound, types.BlockID{Hash: block2.Hash(), PartSetHeader: blockParts2.Header()} -// proposal2 := types.NewProposal(height, round, polRound, propBlockID) -// p2 := proposal2.ToProto() -// if err := cs.privValidator.SignProposal(cs.state.ChainID, p2); err != nil { -// t.Error(err) -// } - -// proposal2.Signature = p2.Signature - -// block1Hash := block1.Hash() -// block2Hash := block2.Hash() - -// // broadcast conflicting proposals/block parts to peers -// peers := sw.Peers().List() -// t.Logf("Byzantine: broadcasting conflicting proposals to %d peers", len(peers)) -// for i, peer := range peers { -// if i < len(peers)/2 { -// go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1) -// } else { -// go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2) -// } -// } +// // byzantine user should create two proposals and try to split the vote. +// // Avoid sending on internalMsgQueue and running consensus state. + +// // Create a new proposal block from state/txs from the mempool. +// block1, blockParts1 := cs.createProposalBlock() +// polRound, propBlockID := cs.ValidRound, types.BlockID{Hash: block1.Hash(), PartSetHeader: blockParts1.Header()} +// proposal1 := types.NewProposal(height, round, polRound, propBlockID) +// p1 := proposal1.ToProto() +// if err := cs.privValidator.SignProposal(cs.state.ChainID, p1); err != nil { +// t.Error(err) +// } + +// proposal1.Signature = p1.Signature + +// // some new transactions come in (this ensures that the proposals are different) +// deliverTxsRange(cs, 0, 1) + +// // Create a new proposal block from state/txs from the mempool. +// block2, blockParts2 := cs.createProposalBlock() +// polRound, propBlockID = cs.ValidRound, types.BlockID{Hash: block2.Hash(), PartSetHeader: blockParts2.Header()} +// proposal2 := types.NewProposal(height, round, polRound, propBlockID) +// p2 := proposal2.ToProto() +// if err := cs.privValidator.SignProposal(cs.state.ChainID, p2); err != nil { +// t.Error(err) +// } + +// proposal2.Signature = p2.Signature + +// block1Hash := block1.Hash() +// block2Hash := block2.Hash() + +// // broadcast conflicting proposals/block parts to peers +// peers := sw.Peers().List() +// t.Logf("Byzantine: broadcasting conflicting proposals to %d peers", len(peers)) +// for i, peer := range peers { +// if i < len(peers)/2 { +// go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1) +// } else { +// go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2) +// } +// } // } // func sendProposalAndParts( -// height int64, -// round int32, -// cs *State, -// peer p2p.Peer, -// proposal *types.Proposal, -// blockHash []byte, -// parts *types.PartSet, +// height int64, +// round int32, +// cs *State, +// peer p2p.Peer, +// proposal *types.Proposal, +// blockHash []byte, +// parts *types.PartSet, // ) { -// // proposal -// msg := &ProposalMessage{Proposal: proposal} -// peer.Send(DataChannel, MustEncode(msg)) - -// // parts -// for i := 0; i < int(parts.Total()); i++ { -// part := parts.GetPart(i) -// msg := &BlockPartMessage{ -// Height: height, // This tells peer that this part applies to us. -// Round: round, // This tells peer that this part applies to us. -// Part: part, -// } -// peer.Send(DataChannel, MustEncode(msg)) -// } - -// // votes -// cs.mtx.Lock() -// prevote, _ := cs.signVote(tmproto.PrevoteType, blockHash, parts.Header()) -// precommit, _ := cs.signVote(tmproto.PrecommitType, blockHash, parts.Header()) -// cs.mtx.Unlock() - -// peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote})) -// peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit})) +// // proposal +// msg := &ProposalMessage{Proposal: proposal} +// peer.Send(DataChannel, MustEncode(msg)) + +// // parts +// for i := 0; i < int(parts.Total()); i++ { +// part := parts.GetPart(i) +// msg := &BlockPartMessage{ +// Height: height, // This tells peer that this part applies to us. +// Round: round, // This tells peer that this part applies to us. +// Part: part, +// } +// peer.Send(DataChannel, MustEncode(msg)) +// } + +// // votes +// cs.mtx.Lock() +// prevote, _ := cs.signVote(tmproto.PrevoteType, blockHash, parts.Header()) +// precommit, _ := cs.signVote(tmproto.PrecommitType, blockHash, parts.Header()) +// cs.mtx.Unlock() + +// peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote})) +// peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit})) // } // type ByzantineReactor struct { -// service.Service -// reactor *Reactor +// service.Service +// reactor *Reactor // } // func NewByzantineReactor(conR *Reactor) *ByzantineReactor { -// return &ByzantineReactor{ -// Service: conR, -// reactor: conR, -// } +// return &ByzantineReactor{ +// Service: conR, +// reactor: conR, +// } // } // func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) } // func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() } // func (br *ByzantineReactor) AddPeer(peer p2p.Peer) { -// if !br.reactor.IsRunning() { -// return -// } - -// // Create peerState for peer -// peerState := NewPeerState(peer).SetLogger(br.reactor.logger) -// peer.Set(types.PeerStateKey, peerState) - -// // Send our state to peer. -// // If we're syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). -// if !br.reactor.waitSync { -// br.reactor.sendNewRoundStepMessage(peer) -// } +// if !br.reactor.IsRunning() { +// return +// } + +// // Create peerState for peer +// peerState := NewPeerState(peer).SetLogger(br.reactor.logger) +// peer.Set(types.PeerStateKey, peerState) + +// // Send our state to peer. +// // If we're syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). +// if !br.reactor.waitSync { +// br.reactor.sendNewRoundStepMessage(peer) +// } // } // func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) { -// br.reactor.RemovePeer(peer, reason) +// br.reactor.RemovePeer(peer, reason) // } // func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { -// br.reactor.Receive(chID, peer, msgBytes) +// br.reactor.Receive(chID, peer, msgBytes) // } // func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer } diff --git a/internal/evidence/pool.go b/internal/evidence/pool.go index f4afb1f8c..d2d998c91 100644 --- a/internal/evidence/pool.go +++ b/internal/evidence/pool.go @@ -36,14 +36,14 @@ type Pool struct { evidenceList *clist.CList // concurrent linked-list of evidence evidenceSize uint32 // amount of pending evidence - // needed to load validators to verify evidence - stateDB sm.Store // needed to load headers and commits to verify evidence blockStore BlockStore + stateDB sm.Store mtx sync.Mutex // latest state - state sm.State + state sm.State + isStarted bool // evidence from consensus is buffered to this slice, awaiting until the next height // before being flushed to the pool. This prevents broadcasting and proposing of // evidence before the height with which the evidence happened is finished. @@ -60,46 +60,19 @@ type Pool struct { Metrics *Metrics } -func (evpool *Pool) SetEventBus(e *eventbus.EventBus) { - evpool.eventBus = e -} - // NewPool creates an evidence pool. If using an existing evidence store, // it will add all pending evidence to the concurrent list. -func NewPool(logger log.Logger, evidenceDB dbm.DB, stateDB sm.Store, blockStore BlockStore, metrics *Metrics) (*Pool, error) { - state, err := stateDB.Load() - if err != nil { - return nil, fmt.Errorf("failed to load state: %w", err) - } - - pool := &Pool{ - stateDB: stateDB, +func NewPool(logger log.Logger, evidenceDB dbm.DB, stateStore sm.Store, blockStore BlockStore, metrics *Metrics, eventBus *eventbus.EventBus) *Pool { + return &Pool{ blockStore: blockStore, - state: state, + stateDB: stateStore, logger: logger, evidenceStore: evidenceDB, evidenceList: clist.New(), consensusBuffer: make([]duplicateVoteSet, 0), Metrics: metrics, + eventBus: eventBus, } - - // If pending evidence already in db, in event of prior failure, then check - // for expiration, update the size and load it back to the evidenceList. - pool.pruningHeight, pool.pruningTime = pool.removeExpiredPendingEvidence() - evList, _, err := pool.listEvidence(prefixPending, -1) - if err != nil { - return nil, err - } - - atomic.StoreUint32(&pool.evidenceSize, uint32(len(evList))) - pool.Metrics.NumEvidence.Set(float64(pool.evidenceSize)) - - for _, ev := range evList { - pool.evidenceList.PushBack(ev) - } - pool.eventBus = nil - - return pool, nil } // PendingEvidence is used primarily as part of block proposal and returns up to @@ -277,6 +250,31 @@ func (evpool *Pool) State() sm.State { return evpool.state } +func (evpool *Pool) Start(state sm.State) error { + if evpool.isStarted { + return errors.New("pool is already running") + } + + evpool.state = state + + // If pending evidence already in db, in event of prior failure, then check + // for expiration, update the size and load it back to the evidenceList. + evpool.pruningHeight, evpool.pruningTime = evpool.removeExpiredPendingEvidence() + evList, _, err := evpool.listEvidence(prefixPending, -1) + if err != nil { + return err + } + + atomic.StoreUint32(&evpool.evidenceSize, uint32(len(evList))) + evpool.Metrics.NumEvidence.Set(float64(evpool.evidenceSize)) + + for _, ev := range evList { + evpool.evidenceList.PushBack(ev) + } + + return nil +} + func (evpool *Pool) Close() error { return evpool.evidenceStore.Close() } @@ -449,6 +447,7 @@ func (evpool *Pool) listEvidence(prefixKey int64, maxBytes int64) ([]types.Evide } func (evpool *Pool) removeExpiredPendingEvidence() (int64, time.Time) { + batch := evpool.evidenceStore.NewBatch() defer batch.Close() @@ -473,7 +472,6 @@ func (evpool *Pool) removeExpiredPendingEvidence() (int64, time.Time) { // remove evidence from the clist evpool.removeEvidenceFromList(blockEvidenceMap) - // update the evidence size atomic.AddUint32(&evpool.evidenceSize, ^uint32(len(blockEvidenceMap)-1)) diff --git a/internal/evidence/pool_test.go b/internal/evidence/pool_test.go index 51f785221..b3f5df9f1 100644 --- a/internal/evidence/pool_test.go +++ b/internal/evidence/pool_test.go @@ -34,6 +34,18 @@ var ( defaultEvidenceMaxBytes int64 = 1000 ) +func startPool(t *testing.T, pool *evidence.Pool, store sm.Store) { + t.Helper() + state, err := store.Load() + if err != nil { + t.Fatalf("cannot load state: %v", err) + } + if err := pool.Start(state); err != nil { + t.Fatalf("cannot start state pool: %v", err) + } + +} + func TestEvidencePoolBasic(t *testing.T) { var ( height = int64(1) @@ -51,9 +63,13 @@ func TestEvidencePoolBasic(t *testing.T) { stateStore.On("LoadValidators", mock.AnythingOfType("int64")).Return(valSet, nil) stateStore.On("Load").Return(createState(height+1, valSet), nil) - pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) - require.NoError(t, setupEventBus(ctx, pool)) + logger := log.NewNopLogger() + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) + + pool := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus) + startPool(t, pool, stateStore) + // evidence not seen yet: evs, size := pool.PendingEvidence(defaultEvidenceMaxBytes) require.Equal(t, 0, len(evs)) @@ -115,10 +131,12 @@ func TestAddExpiredEvidence(t *testing.T) { return &types.BlockMeta{Header: types.Header{Time: expiredEvidenceTime}} }) - pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + logger := log.NewNopLogger() + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) - require.NoError(t, setupEventBus(ctx, pool)) + pool := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus) + startPool(t, pool, stateStore) testCases := []struct { evHeight int64 @@ -159,9 +177,7 @@ func TestReportConflictingVotes(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, pv := defaultTestPool(ctx, t, height) - - require.NoError(t, setupEventBus(ctx, pool)) + pool, pv, _ := defaultTestPool(ctx, t, height) val := types.NewValidator(pv.PrivKey.PubKey(), 10) @@ -201,9 +217,7 @@ func TestEvidencePoolUpdate(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, val := defaultTestPool(ctx, t, height) - - require.NoError(t, setupEventBus(ctx, pool)) + pool, val, _ := defaultTestPool(ctx, t, height) state := pool.State() @@ -273,9 +287,7 @@ func TestVerifyPendingEvidencePasses(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, val := defaultTestPool(ctx, t, height) - - require.NoError(t, setupEventBus(ctx, pool)) + pool, val, _ := defaultTestPool(ctx, t, height) ev, err := types.NewMockDuplicateVoteEvidenceWithValidator( ctx, @@ -295,9 +307,7 @@ func TestVerifyDuplicatedEvidenceFails(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, val := defaultTestPool(ctx, t, height) - - require.NoError(t, setupEventBus(ctx, pool)) + pool, val, _ := defaultTestPool(ctx, t, height) ev, err := types.NewMockDuplicateVoteEvidenceWithValidator( ctx, @@ -321,7 +331,7 @@ func TestEventOnEvidenceValidated(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, val := defaultTestPool(ctx, t, height) + pool, val, eventBus := defaultTestPool(ctx, t, height) ev, err := types.NewMockDuplicateVoteEvidenceWithValidator( ctx, @@ -332,11 +342,6 @@ func TestEventOnEvidenceValidated(t *testing.T) { ) require.NoError(t, err) - eventBus := eventbus.NewDefault(log.TestingLogger()) - require.NoError(t, eventBus.Start(ctx)) - - pool.SetEventBus(eventBus) - const query = `tm.event='EvidenceValidated'` evSub, err := eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{ ClientID: "test", @@ -348,6 +353,9 @@ func TestEventOnEvidenceValidated(t *testing.T) { go func() { defer close(done) msg, err := evSub.Next(ctx) + if ctx.Err() != nil { + return + } assert.NoError(t, err) edt := msg.Data().(types.EventDataEvidenceValidated) @@ -394,14 +402,15 @@ func TestLightClientAttackEvidenceLifecycle(t *testing.T) { blockStore.On("LoadBlockCommit", height).Return(trusted.Commit) blockStore.On("LoadBlockCommit", commonHeight).Return(common.Commit) - pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + logger := log.NewNopLogger() + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) - require.NoError(t, setupEventBus(ctx, pool)) + pool := evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus) hash := ev.Hash() - err = pool.AddEvidence(ctx, ev) + err := pool.AddEvidence(ctx, ev) require.NoError(t, err) err = pool.AddEvidence(ctx, ev) require.NoError(t, err) @@ -449,11 +458,13 @@ func TestRecoverPendingEvidence(t *testing.T) { blockStore, err := initializeBlockStore(dbm.NewMemDB(), state, valAddress) require.NoError(t, err) - // create previous pool and populate it - pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + logger := log.NewNopLogger() + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) - require.NoError(t, setupEventBus(ctx, pool)) + // create previous pool and populate it + pool := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus) + startPool(t, pool, stateStore) goodEvidence, err := types.NewMockDuplicateVoteEvidenceWithValidator( ctx, @@ -495,9 +506,8 @@ func TestRecoverPendingEvidence(t *testing.T) { }, }, nil) - newPool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, newStateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) - + newPool := evidence.NewPool(logger, evidenceDB, newStateStore, blockStore, evidence.NopMetrics(), nil) + startPool(t, newPool, newStateStore) evList, _ := newPool.PendingEvidence(defaultEvidenceMaxBytes) require.Equal(t, 1, len(evList)) @@ -590,7 +600,7 @@ func makeCommit(height int64, valAddr []byte) *types.Commit { return types.NewCommit(height, 0, types.BlockID{}, commitSigs) } -func defaultTestPool(ctx context.Context, t *testing.T, height int64) (*evidence.Pool, types.MockPV) { +func defaultTestPool(ctx context.Context, t *testing.T, height int64) (*evidence.Pool, types.MockPV, *eventbus.EventBus) { t.Helper() val := types.NewMockPV() valAddress := val.PrivKey.PubKey().Address() @@ -601,10 +611,14 @@ func defaultTestPool(ctx context.Context, t *testing.T, height int64) (*evidence blockStore, err := initializeBlockStore(dbm.NewMemDB(), state, valAddress) require.NoError(t, err) - pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err, "test evidence pool could not be created") + logger := log.NewNopLogger() + + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) - return pool, val + pool := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus) + startPool(t, pool, stateStore) + return pool, val, eventBus } func createState(height int64, valSet *types.ValidatorSet) sm.State { @@ -616,12 +630,3 @@ func createState(height int64, valSet *types.ValidatorSet) sm.State { ConsensusParams: *types.DefaultConsensusParams(), } } - -func setupEventBus(ctx context.Context, evpool *evidence.Pool) error { - eventBus := eventbus.NewDefault(log.TestingLogger()) - if err := eventBus.Start(ctx); err != nil { - return err - } - evpool.SetEventBus(eventBus) - return nil -} diff --git a/internal/evidence/reactor_test.go b/internal/evidence/reactor_test.go index d0863acc1..664fb7b4e 100644 --- a/internal/evidence/reactor_test.go +++ b/internal/evidence/reactor_test.go @@ -82,13 +82,14 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store, chBuf uint } return nil }) - rts.pools[nodeID], err = evidence.NewPool(logger, evidenceDB, stateStores[idx], blockStore, evidence.NopMetrics()) - - require.NoError(t, err) eventBus := eventbus.NewDefault(logger) err = eventBus.Start(ctx) require.NoError(t, err) - rts.pools[nodeID].SetEventBus(eventBus) + + rts.pools[nodeID] = evidence.NewPool(logger, evidenceDB, stateStores[idx], blockStore, evidence.NopMetrics(), eventBus) + startPool(t, rts.pools[nodeID], stateStores[idx]) + + require.NoError(t, err) rts.peerChans[nodeID] = make(chan p2p.PeerUpdate) rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1) diff --git a/internal/evidence/verify_test.go b/internal/evidence/verify_test.go index 607c8fd50..675c5795a 100644 --- a/internal/evidence/verify_test.go +++ b/internal/evidence/verify_test.go @@ -12,6 +12,7 @@ import ( "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/tmhash" + "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/evidence" "github.com/tendermint/tendermint/internal/evidence/mocks" sm "github.com/tendermint/tendermint/internal/state" @@ -76,6 +77,7 @@ func TestVerify_LunaticAttackAgainstState(t *testing.T) { ) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + logger := log.NewNopLogger() attackTime := defaultEvidenceTime.Add(1 * time.Hour) // create valid lunatic evidence @@ -96,8 +98,7 @@ func TestVerify_LunaticAttackAgainstState(t *testing.T) { blockStore.On("LoadBlockMeta", height).Return(&types.BlockMeta{Header: *trusted.Header}) blockStore.On("LoadBlockCommit", commonHeight).Return(common.Commit) blockStore.On("LoadBlockCommit", height).Return(trusted.Commit) - pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + pool := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), nil) evList := types.EvidenceList{ev} // check that the evidence pool correctly verifies the evidence @@ -111,32 +112,29 @@ func TestVerify_LunaticAttackAgainstState(t *testing.T) { // if we submit evidence only against a single byzantine validator when we see there are more validators then this // should return an error ev.ByzantineValidators = ev.ByzantineValidators[:1] - t.Log(evList) assert.Error(t, pool.CheckEvidence(ctx, evList)) // restore original byz vals ev.ByzantineValidators = ev.GetByzantineValidators(common.ValidatorSet, trusted.SignedHeader) // duplicate evidence should be rejected evList = types.EvidenceList{ev, ev} - pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + pool = evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), nil) assert.Error(t, pool.CheckEvidence(ctx, evList)) // If evidence is submitted with an altered timestamp it should return an error - ev.Timestamp = defaultEvidenceTime.Add(1 * time.Minute) - pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) - require.NoError(t, setupEventBus(ctx, pool)) + ev.Timestamp = defaultEvidenceTime.Add(1 * time.Minute) + pool = evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus) - err = pool.AddEvidence(ctx, ev) + err := pool.AddEvidence(ctx, ev) assert.Error(t, err) ev.Timestamp = defaultEvidenceTime // Evidence submitted with a different validator power should fail ev.TotalVotingPower = 1 - pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + pool = evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), nil) err = pool.AddEvidence(ctx, ev) assert.Error(t, err) ev.TotalVotingPower = common.ValidatorSet.TotalVotingPower() @@ -154,6 +152,9 @@ func TestVerify_ForwardLunaticAttack(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + + logger := log.NewNopLogger() + // create a forward lunatic attack ev, trusted, common := makeLunaticEvidence(ctx, t, attackHeight, commonHeight, totalVals, byzVals, totalVals-byzVals, defaultEvidenceTime, attackTime) @@ -179,10 +180,11 @@ func TestVerify_ForwardLunaticAttack(t *testing.T) { blockStore.On("LoadBlockCommit", commonHeight).Return(common.Commit) blockStore.On("LoadBlockCommit", nodeHeight).Return(trusted.Commit) blockStore.On("Height").Return(nodeHeight) - pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) - require.NoError(t, setupEventBus(ctx, pool)) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) + + pool := evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus) // check that the evidence pool correctly verifies the evidence assert.NoError(t, pool.CheckEvidence(ctx, types.EvidenceList{ev})) @@ -199,8 +201,7 @@ func TestVerify_ForwardLunaticAttack(t *testing.T) { oldBlockStore.On("Height").Return(nodeHeight) require.Equal(t, defaultEvidenceTime, oldBlockStore.LoadBlockMeta(nodeHeight).Header.Time) - pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, oldBlockStore, evidence.NopMetrics()) - require.NoError(t, err) + pool = evidence.NewPool(logger, dbm.NewMemDB(), stateStore, oldBlockStore, evidence.NopMetrics(), nil) assert.Error(t, pool.CheckEvidence(ctx, types.EvidenceList{ev})) } @@ -208,6 +209,8 @@ func TestVerifyLightClientAttack_Equivocation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + logger := log.NewNopLogger() + conflictingVals, conflictingPrivVals := factory.ValidatorSet(ctx, t, 5, 10) conflictingHeader := factory.MakeHeader(t, &types.Header{ @@ -289,10 +292,10 @@ func TestVerifyLightClientAttack_Equivocation(t *testing.T) { blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: *trustedHeader}) blockStore.On("LoadBlockCommit", int64(10)).Return(trustedCommit) - pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) - require.NoError(t, setupEventBus(ctx, pool)) + pool := evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus) evList := types.EvidenceList{ev} err = pool.CheckEvidence(ctx, evList) @@ -305,6 +308,9 @@ func TestVerifyLightClientAttack_Equivocation(t *testing.T) { func TestVerifyLightClientAttack_Amnesia(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + + logger := log.NewNopLogger() + var height int64 = 10 conflictingVals, conflictingPrivVals := factory.ValidatorSet(ctx, t, 5, 10) @@ -378,10 +384,10 @@ func TestVerifyLightClientAttack_Amnesia(t *testing.T) { blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: *trustedHeader}) blockStore.On("LoadBlockCommit", int64(10)).Return(trustedCommit) - pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) - require.NoError(t, setupEventBus(ctx, pool)) + pool := evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus) evList := types.EvidenceList{ev} err = pool.CheckEvidence(ctx, evList) @@ -401,6 +407,7 @@ func TestVerifyDuplicateVoteEvidence(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + logger := log.NewNopLogger() val := types.NewMockPV() val2 := types.NewMockPV() valSet := types.NewValidatorSet([]*types.Validator{val.ExtractIntoValidator(ctx, 1)}) @@ -478,10 +485,11 @@ func TestVerifyDuplicateVoteEvidence(t *testing.T) { blockStore := &mocks.BlockStore{} blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: types.Header{Time: defaultEvidenceTime}}) - pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) - require.NoError(t, setupEventBus(ctx, pool)) + pool := evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus) + startPool(t, pool, stateStore) evList := types.EvidenceList{goodEv} err = pool.CheckEvidence(ctx, evList) diff --git a/node/node.go b/node/node.go index e57b7c4db..dab62de09 100644 --- a/node/node.go +++ b/node/node.go @@ -62,7 +62,8 @@ type nodeImpl struct { // services eventSinks []indexer.EventSink stateStore sm.Store - blockStore *store.BlockStore // store the blockchain to disk + blockStore *store.BlockStore // store the blockchain to disk + evPool *evidence.Pool stateSync bool // whether the node should state sync on startup stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots @@ -388,6 +389,7 @@ func makeNode( blockStore: blockStore, stateSyncReactor: stateSyncReactor, stateSync: stateSync, + evPool: evPool, shutdownOps: makeCloser(closers), @@ -462,6 +464,14 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { } } + state, err := n.stateStore.Load() + if err != nil { + return err + } + if err := n.evPool.Start(state); err != nil { + return err + } + n.rpcEnv.NodeInfo = n.nodeInfo // Start the RPC server before the P2P server // so we can eg. receive txs for the first block diff --git a/node/node_test.go b/node/node_test.go index 690da1a4d..3f8bf9141 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -297,8 +297,7 @@ func TestCreateProposalBlock(t *testing.T) { // Make EvidencePool evidenceDB := dbm.NewMemDB() blockStore := store.NewBlockStore(dbm.NewMemDB()) - evidencePool, err := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics()) - require.NoError(t, err) + evidencePool := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics(), nil) // fill the evidence pool with more evidence // than can fit in a block diff --git a/node/setup.go b/node/setup.go index 7a473fae9..3b7fcf239 100644 --- a/node/setup.go +++ b/node/setup.go @@ -228,12 +228,7 @@ func createEvidenceReactor( logger = logger.With("module", "evidence") - evidencePool, err := evidence.NewPool(logger, evidenceDB, store, blockStore, metrics) - if err != nil { - return nil, nil, fmt.Errorf("creating evidence pool: %w", err) - } - - evidencePool.SetEventBus(eventBus) + evidencePool := evidence.NewPool(logger, evidenceDB, store, blockStore, metrics, eventBus) evidenceReactor, err := evidence.NewReactor( ctx,