
evidence: manage and initialize state objects more clearly in the pool (#8080)

pull/8085/head
Sam Kleinman committed 2 years ago (via GitHub)
commit 01266881b8
8 changed files with 281 additions and 267 deletions

1. internal/consensus/byzantine_test.go (+143 -145)
2. internal/evidence/pool.go (+33 -35)
3. internal/evidence/pool_test.go (+53 -48)
4. internal/evidence/reactor_test.go (+5 -4)
5. internal/evidence/verify_test.go (+34 -26)
6. node/node.go (+11 -1)
7. node/node_test.go (+1 -2)
8. node/setup.go (+1 -6)

internal/consensus/byzantine_test.go (+143 -145)

@@ -87,10 +87,12 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
mempool.EnableTxsAvailable()
}
+ eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events"))
+ require.NoError(t, eventBus.Start(ctx))
// Make a full instance of the evidence pool
evidenceDB := dbm.NewMemDB()
- evpool, err := evidence.NewPool(logger.With("module", "evidence"), evidenceDB, stateStore, blockStore, evidence.NopMetrics())
- require.NoError(t, err)
+ evpool := evidence.NewPool(logger.With("module", "evidence"), evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus)
// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
@@ -100,11 +102,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
pv := privVals[i]
cs.SetPrivValidator(ctx, pv)
- eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events"))
- err = eventBus.Start(ctx)
- require.NoError(t, err)
cs.SetEventBus(eventBus)
- evpool.SetEventBus(eventBus)
cs.SetTimeoutTicker(tickerFunc())
states[i] = cs
@@ -327,42 +325,42 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
// blocksSubs := make([]types.Subscription, n)
// reactors := make([]p2p.Reactor, n)
// for i := 0; i < n; i++ {
// // enable txs so we can create different proposals
// assertMempool(states[i].txNotifier).EnableTxsAvailable()
// eventBus := states[i].eventBus
// eventBus.SetLogger(logger.With("module", "events", "validator", i))
// var err error
// blocksSubs[i], err = eventBus.Subscribe(ctx, testSubscriber, types.EventQueryNewBlock)
// require.NoError(t, err)
// conR := NewReactor(states[i], true) // so we don't start the consensus states
// conR.SetLogger(logger.With("validator", i))
// conR.SetEventBus(eventBus)
// var conRI p2p.Reactor = conR
// // make first val byzantine
// if i == 0 {
// conRI = NewByzantineReactor(conR)
// }
// reactors[i] = conRI
// err = states[i].blockExec.Store().Save(states[i].state) // for save height 1's validators info
// require.NoError(t, err)
// }
// switches := p2p.MakeConnectedSwitches(config.P2P, N, func(i int, sw *p2p.Switch) *p2p.Switch {
// sw.SetLogger(p2pLogger.With("validator", i))
// sw.AddReactor("CONSENSUS", reactors[i])
// return sw
// }, func(sws []*p2p.Switch, i, j int) {
// // the network starts partitioned with globally active adversary
// if i != 0 {
// return
// }
// p2p.Connect2Switches(sws, i, j)
// })
// // make first val byzantine
@@ -370,26 +368,26 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
// // do any safety checks.
// states[0].privValidator.(types.MockPV).DisableChecks()
// states[0].decideProposal = func(j int32) func(int64, int32) {
// return func(height int64, round int32) {
// byzantineDecideProposalFunc(t, height, round, states[j], switches[j])
// }
// }(int32(0))
// // We are setting the prevote function to do nothing because the prevoting
// // and precommitting are done alongside the proposal.
// states[0].doPrevote = func(height int64, round int32) {}
// defer func() {
// for _, sw := range switches {
// err := sw.Stop()
// require.NoError(t, err)
// }
// }()
// // start the non-byz state machines.
// // note these must be started before the byz
// for i := 1; i < n; i++ {
// cr := reactors[i].(*Reactor)
// cr.SwitchToConsensus(cr.conS.GetState(), false)
// }
// // start the byzantine state machine
@@ -421,146 +419,146 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
// // (one of them already has)
// wg := new(sync.WaitGroup)
// for i := 1; i < N-1; i++ {
// wg.Add(1)
// go func(j int) {
// <-blocksSubs[j].Out()
// wg.Done()
// }(i)
// }
// done := make(chan struct{})
// go func() {
// wg.Wait()
// close(done)
// }()
// tick := time.NewTicker(time.Second * 10)
// select {
// case <-done:
// case <-tick.C:
// for i, reactor := range reactors {
// t.Log(fmt.Sprintf("Consensus Reactor %v", i))
// t.Log(fmt.Sprintf("%v", reactor))
// }
// t.Fatalf("Timed out waiting for all validators to commit first block")
// }
}
// func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *State, sw *p2p.Switch) {
// // byzantine user should create two proposals and try to split the vote.
// // Avoid sending on internalMsgQueue and running consensus state.
// // Create a new proposal block from state/txs from the mempool.
// block1, blockParts1 := cs.createProposalBlock()
// polRound, propBlockID := cs.ValidRound, types.BlockID{Hash: block1.Hash(), PartSetHeader: blockParts1.Header()}
// proposal1 := types.NewProposal(height, round, polRound, propBlockID)
// p1 := proposal1.ToProto()
// if err := cs.privValidator.SignProposal(cs.state.ChainID, p1); err != nil {
// t.Error(err)
// }
// proposal1.Signature = p1.Signature
// // some new transactions come in (this ensures that the proposals are different)
// deliverTxsRange(cs, 0, 1)
// // Create a new proposal block from state/txs from the mempool.
// block2, blockParts2 := cs.createProposalBlock()
// polRound, propBlockID = cs.ValidRound, types.BlockID{Hash: block2.Hash(), PartSetHeader: blockParts2.Header()}
// proposal2 := types.NewProposal(height, round, polRound, propBlockID)
// p2 := proposal2.ToProto()
// if err := cs.privValidator.SignProposal(cs.state.ChainID, p2); err != nil {
// t.Error(err)
// }
// proposal2.Signature = p2.Signature
// block1Hash := block1.Hash()
// block2Hash := block2.Hash()
// // broadcast conflicting proposals/block parts to peers
// peers := sw.Peers().List()
// t.Logf("Byzantine: broadcasting conflicting proposals to %d peers", len(peers))
// for i, peer := range peers {
// if i < len(peers)/2 {
// go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1)
// } else {
// go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2)
// }
// }
// }
// func sendProposalAndParts(
// height int64,
// round int32,
// cs *State,
// peer p2p.Peer,
// proposal *types.Proposal,
// blockHash []byte,
// parts *types.PartSet,
// ) {
// // proposal
// msg := &ProposalMessage{Proposal: proposal}
// peer.Send(DataChannel, MustEncode(msg))
// // parts
// for i := 0; i < int(parts.Total()); i++ {
// part := parts.GetPart(i)
// msg := &BlockPartMessage{
// Height: height, // This tells peer that this part applies to us.
// Round: round, // This tells peer that this part applies to us.
// Part: part,
// }
// peer.Send(DataChannel, MustEncode(msg))
// }
// // votes
// cs.mtx.Lock()
// prevote, _ := cs.signVote(tmproto.PrevoteType, blockHash, parts.Header())
// precommit, _ := cs.signVote(tmproto.PrecommitType, blockHash, parts.Header())
// cs.mtx.Unlock()
// peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote}))
// peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
// }
// type ByzantineReactor struct {
// service.Service
// reactor *Reactor
// }
// func NewByzantineReactor(conR *Reactor) *ByzantineReactor {
// return &ByzantineReactor{
// Service: conR,
// reactor: conR,
// }
// }
// func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) }
// func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() }
// func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
// if !br.reactor.IsRunning() {
// return
// }
// // Create peerState for peer
// peerState := NewPeerState(peer).SetLogger(br.reactor.logger)
// peer.Set(types.PeerStateKey, peerState)
// // Send our state to peer.
// // If we're syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
// if !br.reactor.waitSync {
// br.reactor.sendNewRoundStepMessage(peer)
// }
// }
// func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// br.reactor.RemovePeer(peer, reason)
// }
// func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
// br.reactor.Receive(chID, peer, msgBytes)
// }
// func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }

internal/evidence/pool.go (+33 -35)

@@ -36,14 +36,14 @@ type Pool struct {
evidenceList *clist.CList // concurrent linked-list of evidence
evidenceSize uint32 // amount of pending evidence
- // needed to load validators to verify evidence
- stateDB sm.Store
// needed to load headers and commits to verify evidence
blockStore BlockStore
+ stateDB sm.Store
mtx sync.Mutex
// latest state
state sm.State
+ isStarted bool
// evidence from consensus is buffered to this slice, awaiting until the next height
// before being flushed to the pool. This prevents broadcasting and proposing of
// evidence before the height with which the evidence happened is finished.
@@ -60,46 +60,19 @@ type Pool struct {
Metrics *Metrics
}
- func (evpool *Pool) SetEventBus(e *eventbus.EventBus) {
- evpool.eventBus = e
- }
// NewPool creates an evidence pool. If using an existing evidence store,
// it will add all pending evidence to the concurrent list.
- func NewPool(logger log.Logger, evidenceDB dbm.DB, stateDB sm.Store, blockStore BlockStore, metrics *Metrics) (*Pool, error) {
- state, err := stateDB.Load()
- if err != nil {
- return nil, fmt.Errorf("failed to load state: %w", err)
- }
- pool := &Pool{
- stateDB: stateDB,
+ func NewPool(logger log.Logger, evidenceDB dbm.DB, stateStore sm.Store, blockStore BlockStore, metrics *Metrics, eventBus *eventbus.EventBus) *Pool {
+ return &Pool{
blockStore: blockStore,
- state: state,
+ stateDB: stateStore,
logger: logger,
evidenceStore: evidenceDB,
evidenceList: clist.New(),
consensusBuffer: make([]duplicateVoteSet, 0),
Metrics: metrics,
+ eventBus: eventBus,
}
- // If pending evidence already in db, in event of prior failure, then check
- // for expiration, update the size and load it back to the evidenceList.
- pool.pruningHeight, pool.pruningTime = pool.removeExpiredPendingEvidence()
- evList, _, err := pool.listEvidence(prefixPending, -1)
- if err != nil {
- return nil, err
- }
- atomic.StoreUint32(&pool.evidenceSize, uint32(len(evList)))
- pool.Metrics.NumEvidence.Set(float64(pool.evidenceSize))
- for _, ev := range evList {
- pool.evidenceList.PushBack(ev)
- }
- pool.eventBus = nil
- return pool, nil
}
// PendingEvidence is used primarily as part of block proposal and returns up to
@@ -277,6 +250,31 @@ func (evpool *Pool) State() sm.State {
return evpool.state
}
+ func (evpool *Pool) Start(state sm.State) error {
+ if evpool.isStarted {
+ return errors.New("pool is already running")
+ }
+ evpool.state = state
+ // If pending evidence already in db, in event of prior failure, then check
+ // for expiration, update the size and load it back to the evidenceList.
+ evpool.pruningHeight, evpool.pruningTime = evpool.removeExpiredPendingEvidence()
+ evList, _, err := evpool.listEvidence(prefixPending, -1)
+ if err != nil {
+ return err
+ }
+ atomic.StoreUint32(&evpool.evidenceSize, uint32(len(evList)))
+ evpool.Metrics.NumEvidence.Set(float64(evpool.evidenceSize))
+ for _, ev := range evList {
+ evpool.evidenceList.PushBack(ev)
+ }
+ return nil
+ }
func (evpool *Pool) Close() error {
return evpool.evidenceStore.Close()
}
@@ -449,6 +447,7 @@ func (evpool *Pool) listEvidence(prefixKey int64, maxBytes int64) ([]types.Evide
}
func (evpool *Pool) removeExpiredPendingEvidence() (int64, time.Time) {
batch := evpool.evidenceStore.NewBatch()
defer batch.Close()
@@ -473,7 +472,6 @@ func (evpool *Pool) removeExpiredPendingEvidence() (int64, time.Time) {
// remove evidence from the clist
evpool.removeEvidenceFromList(blockEvidenceMap)
// update the evidence size
atomic.AddUint32(&evpool.evidenceSize, ^uint32(len(blockEvidenceMap)-1))


internal/evidence/pool_test.go (+53 -48)

@@ -34,6 +34,18 @@ var (
defaultEvidenceMaxBytes int64 = 1000
)
+ func startPool(t *testing.T, pool *evidence.Pool, store sm.Store) {
+ t.Helper()
+ state, err := store.Load()
+ if err != nil {
+ t.Fatalf("cannot load state: %v", err)
+ }
+ if err := pool.Start(state); err != nil {
+ t.Fatalf("cannot start state pool: %v", err)
+ }
+ }
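Note the pattern the rest of the change follows: tests that exercise the pool's startup or recovery behavior call startPool(t, pool, stateStore) right after NewPool, while several verify_test.go cases construct a pool (sometimes with a nil event bus) and call CheckEvidence or AddEvidence directly without Start.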
func TestEvidencePoolBasic(t *testing.T) {
var (
height = int64(1)
@@ -51,9 +63,13 @@ func TestEvidencePoolBasic(t *testing.T) {
stateStore.On("LoadValidators", mock.AnythingOfType("int64")).Return(valSet, nil)
stateStore.On("Load").Return(createState(height+1, valSet), nil)
- pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics())
- require.NoError(t, err)
- require.NoError(t, setupEventBus(ctx, pool))
+ logger := log.NewNopLogger()
+ eventBus := eventbus.NewDefault(logger)
+ require.NoError(t, eventBus.Start(ctx))
+ pool := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus)
+ startPool(t, pool, stateStore)
// evidence not seen yet:
evs, size := pool.PendingEvidence(defaultEvidenceMaxBytes)
require.Equal(t, 0, len(evs))
@@ -115,10 +131,12 @@ func TestAddExpiredEvidence(t *testing.T) {
return &types.BlockMeta{Header: types.Header{Time: expiredEvidenceTime}}
})
- pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics())
- require.NoError(t, err)
+ logger := log.NewNopLogger()
+ eventBus := eventbus.NewDefault(logger)
+ require.NoError(t, eventBus.Start(ctx))
- require.NoError(t, setupEventBus(ctx, pool))
+ pool := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus)
+ startPool(t, pool, stateStore)
testCases := []struct {
evHeight int64
@@ -159,9 +177,7 @@ func TestReportConflictingVotes(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- pool, pv := defaultTestPool(ctx, t, height)
- require.NoError(t, setupEventBus(ctx, pool))
+ pool, pv, _ := defaultTestPool(ctx, t, height)
val := types.NewValidator(pv.PrivKey.PubKey(), 10)
@@ -201,9 +217,7 @@ func TestEvidencePoolUpdate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- pool, val := defaultTestPool(ctx, t, height)
- require.NoError(t, setupEventBus(ctx, pool))
+ pool, val, _ := defaultTestPool(ctx, t, height)
state := pool.State()
@@ -273,9 +287,7 @@ func TestVerifyPendingEvidencePasses(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- pool, val := defaultTestPool(ctx, t, height)
- require.NoError(t, setupEventBus(ctx, pool))
+ pool, val, _ := defaultTestPool(ctx, t, height)
ev, err := types.NewMockDuplicateVoteEvidenceWithValidator(
ctx,
@@ -295,9 +307,7 @@ func TestVerifyDuplicatedEvidenceFails(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- pool, val := defaultTestPool(ctx, t, height)
- require.NoError(t, setupEventBus(ctx, pool))
+ pool, val, _ := defaultTestPool(ctx, t, height)
ev, err := types.NewMockDuplicateVoteEvidenceWithValidator(
ctx,
@@ -321,7 +331,7 @@ func TestEventOnEvidenceValidated(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- pool, val := defaultTestPool(ctx, t, height)
+ pool, val, eventBus := defaultTestPool(ctx, t, height)
ev, err := types.NewMockDuplicateVoteEvidenceWithValidator(
ctx,
@@ -332,11 +342,6 @@ func TestEventOnEvidenceValidated(t *testing.T) {
)
require.NoError(t, err)
- eventBus := eventbus.NewDefault(log.TestingLogger())
- require.NoError(t, eventBus.Start(ctx))
- pool.SetEventBus(eventBus)
const query = `tm.event='EvidenceValidated'`
evSub, err := eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{
ClientID: "test",
@@ -348,6 +353,9 @@ func TestEventOnEvidenceValidated(t *testing.T) {
go func() {
defer close(done)
msg, err := evSub.Next(ctx)
+ if ctx.Err() != nil {
+ return
+ }
assert.NoError(t, err)
edt := msg.Data().(types.EventDataEvidenceValidated)
@@ -394,14 +402,15 @@ func TestLightClientAttackEvidenceLifecycle(t *testing.T) {
blockStore.On("LoadBlockCommit", height).Return(trusted.Commit)
blockStore.On("LoadBlockCommit", commonHeight).Return(common.Commit)
- pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
- require.NoError(t, err)
+ logger := log.NewNopLogger()
+ eventBus := eventbus.NewDefault(logger)
+ require.NoError(t, eventBus.Start(ctx))
- require.NoError(t, setupEventBus(ctx, pool))
+ pool := evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus)
hash := ev.Hash()
- err = pool.AddEvidence(ctx, ev)
+ err := pool.AddEvidence(ctx, ev)
require.NoError(t, err)
err = pool.AddEvidence(ctx, ev)
require.NoError(t, err)
@@ -449,11 +458,13 @@ func TestRecoverPendingEvidence(t *testing.T) {
blockStore, err := initializeBlockStore(dbm.NewMemDB(), state, valAddress)
require.NoError(t, err)
- // create previous pool and populate it
- pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics())
- require.NoError(t, err)
+ logger := log.NewNopLogger()
+ eventBus := eventbus.NewDefault(logger)
+ require.NoError(t, eventBus.Start(ctx))
- require.NoError(t, setupEventBus(ctx, pool))
+ // create previous pool and populate it
+ pool := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus)
+ startPool(t, pool, stateStore)
goodEvidence, err := types.NewMockDuplicateVoteEvidenceWithValidator(
ctx,
@@ -495,9 +506,8 @@ func TestRecoverPendingEvidence(t *testing.T) {
},
}, nil)
- newPool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, newStateStore, blockStore, evidence.NopMetrics())
- require.NoError(t, err)
+ newPool := evidence.NewPool(logger, evidenceDB, newStateStore, blockStore, evidence.NopMetrics(), nil)
+ startPool(t, newPool, newStateStore)
evList, _ := newPool.PendingEvidence(defaultEvidenceMaxBytes)
require.Equal(t, 1, len(evList))
@@ -590,7 +600,7 @@ func makeCommit(height int64, valAddr []byte) *types.Commit {
return types.NewCommit(height, 0, types.BlockID{}, commitSigs)
}
- func defaultTestPool(ctx context.Context, t *testing.T, height int64) (*evidence.Pool, types.MockPV) {
+ func defaultTestPool(ctx context.Context, t *testing.T, height int64) (*evidence.Pool, types.MockPV, *eventbus.EventBus) {
t.Helper()
val := types.NewMockPV()
valAddress := val.PrivKey.PubKey().Address()
@@ -601,10 +611,14 @@ func defaultTestPool(ctx context.Context, t *testing.T, height int64) (*evidence
blockStore, err := initializeBlockStore(dbm.NewMemDB(), state, valAddress)
require.NoError(t, err)
- pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics())
- require.NoError(t, err, "test evidence pool could not be created")
+ logger := log.NewNopLogger()
+ eventBus := eventbus.NewDefault(logger)
+ require.NoError(t, eventBus.Start(ctx))
- return pool, val
+ pool := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus)
+ startPool(t, pool, stateStore)
+ return pool, val, eventBus
}
func createState(height int64, valSet *types.ValidatorSet) sm.State {
@@ -616,12 +630,3 @@ func createState(height int64, valSet *types.ValidatorSet) sm.State {
ConsensusParams: *types.DefaultConsensusParams(),
}
}
- func setupEventBus(ctx context.Context, evpool *evidence.Pool) error {
- eventBus := eventbus.NewDefault(log.TestingLogger())
- if err := eventBus.Start(ctx); err != nil {
- return err
- }
- evpool.SetEventBus(eventBus)
- return nil
- }

internal/evidence/reactor_test.go (+5 -4)

@@ -82,13 +82,14 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store, chBuf uint
}
return nil
})
- rts.pools[nodeID], err = evidence.NewPool(logger, evidenceDB, stateStores[idx], blockStore, evidence.NopMetrics())
- require.NoError(t, err)
eventBus := eventbus.NewDefault(logger)
err = eventBus.Start(ctx)
require.NoError(t, err)
- rts.pools[nodeID].SetEventBus(eventBus)
+ rts.pools[nodeID] = evidence.NewPool(logger, evidenceDB, stateStores[idx], blockStore, evidence.NopMetrics(), eventBus)
+ startPool(t, rts.pools[nodeID], stateStores[idx])
+ require.NoError(t, err)
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)


internal/evidence/verify_test.go (+34 -26)

@@ -12,6 +12,7 @@ import (
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/tmhash"
+ "github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/evidence"
"github.com/tendermint/tendermint/internal/evidence/mocks"
sm "github.com/tendermint/tendermint/internal/state"
@@ -76,6 +77,7 @@ func TestVerify_LunaticAttackAgainstState(t *testing.T) {
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
+ logger := log.NewNopLogger()
attackTime := defaultEvidenceTime.Add(1 * time.Hour)
// create valid lunatic evidence
@@ -96,8 +98,7 @@ func TestVerify_LunaticAttackAgainstState(t *testing.T) {
blockStore.On("LoadBlockMeta", height).Return(&types.BlockMeta{Header: *trusted.Header})
blockStore.On("LoadBlockCommit", commonHeight).Return(common.Commit)
blockStore.On("LoadBlockCommit", height).Return(trusted.Commit)
- pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
- require.NoError(t, err)
+ pool := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), nil)
evList := types.EvidenceList{ev}
// check that the evidence pool correctly verifies the evidence
@@ -111,32 +112,29 @@ func TestVerify_LunaticAttackAgainstState(t *testing.T) {
// if we submit evidence only against a single byzantine validator when we see there are more validators then this
// should return an error
ev.ByzantineValidators = ev.ByzantineValidators[:1]
t.Log(evList)
assert.Error(t, pool.CheckEvidence(ctx, evList))
// restore original byz vals
ev.ByzantineValidators = ev.GetByzantineValidators(common.ValidatorSet, trusted.SignedHeader)
// duplicate evidence should be rejected
evList = types.EvidenceList{ev, ev}
- pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
- require.NoError(t, err)
+ pool = evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), nil)
assert.Error(t, pool.CheckEvidence(ctx, evList))
// If evidence is submitted with an altered timestamp it should return an error
- ev.Timestamp = defaultEvidenceTime.Add(1 * time.Minute)
- pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
- require.NoError(t, err)
+ eventBus := eventbus.NewDefault(logger)
+ require.NoError(t, eventBus.Start(ctx))
- require.NoError(t, setupEventBus(ctx, pool))
+ ev.Timestamp = defaultEvidenceTime.Add(1 * time.Minute)
+ pool = evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus)
- err = pool.AddEvidence(ctx, ev)
+ err := pool.AddEvidence(ctx, ev)
assert.Error(t, err)
ev.Timestamp = defaultEvidenceTime
// Evidence submitted with a different validator power should fail
ev.TotalVotingPower = 1
- pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
- require.NoError(t, err)
+ pool = evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), nil)
err = pool.AddEvidence(ctx, ev)
assert.Error(t, err)
ev.TotalVotingPower = common.ValidatorSet.TotalVotingPower()
@@ -154,6 +152,9 @@ func TestVerify_ForwardLunaticAttack(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
+ logger := log.NewNopLogger()
// create a forward lunatic attack
ev, trusted, common := makeLunaticEvidence(ctx,
t, attackHeight, commonHeight, totalVals, byzVals, totalVals-byzVals, defaultEvidenceTime, attackTime)
@@ -179,10 +180,11 @@ func TestVerify_ForwardLunaticAttack(t *testing.T) {
blockStore.On("LoadBlockCommit", commonHeight).Return(common.Commit)
blockStore.On("LoadBlockCommit", nodeHeight).Return(trusted.Commit)
blockStore.On("Height").Return(nodeHeight)
- pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
- require.NoError(t, err)
- require.NoError(t, setupEventBus(ctx, pool))
+ eventBus := eventbus.NewDefault(logger)
+ require.NoError(t, eventBus.Start(ctx))
+ pool := evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus)
// check that the evidence pool correctly verifies the evidence
assert.NoError(t, pool.CheckEvidence(ctx, types.EvidenceList{ev}))
@@ -199,8 +201,7 @@ func TestVerify_ForwardLunaticAttack(t *testing.T) {
oldBlockStore.On("Height").Return(nodeHeight)
require.Equal(t, defaultEvidenceTime, oldBlockStore.LoadBlockMeta(nodeHeight).Header.Time)
- pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, oldBlockStore, evidence.NopMetrics())
- require.NoError(t, err)
+ pool = evidence.NewPool(logger, dbm.NewMemDB(), stateStore, oldBlockStore, evidence.NopMetrics(), nil)
assert.Error(t, pool.CheckEvidence(ctx, types.EvidenceList{ev}))
}
@@ -208,6 +209,8 @@ func TestVerifyLightClientAttack_Equivocation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
+ logger := log.NewNopLogger()
conflictingVals, conflictingPrivVals := factory.ValidatorSet(ctx, t, 5, 10)
conflictingHeader := factory.MakeHeader(t, &types.Header{
@@ -289,10 +292,10 @@ func TestVerifyLightClientAttack_Equivocation(t *testing.T) {
blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: *trustedHeader})
blockStore.On("LoadBlockCommit", int64(10)).Return(trustedCommit)
- pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
- require.NoError(t, err)
+ eventBus := eventbus.NewDefault(logger)
+ require.NoError(t, eventBus.Start(ctx))
- require.NoError(t, setupEventBus(ctx, pool))
+ pool := evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus)
evList := types.EvidenceList{ev}
err = pool.CheckEvidence(ctx, evList)
@@ -305,6 +308,9 @@
func TestVerifyLightClientAttack_Amnesia(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
+ logger := log.NewNopLogger()
var height int64 = 10
conflictingVals, conflictingPrivVals := factory.ValidatorSet(ctx, t, 5, 10)
@@ -378,10 +384,10 @@ func TestVerifyLightClientAttack_Amnesia(t *testing.T) {
blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: *trustedHeader})
blockStore.On("LoadBlockCommit", int64(10)).Return(trustedCommit)
- pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
- require.NoError(t, err)
+ eventBus := eventbus.NewDefault(logger)
+ require.NoError(t, eventBus.Start(ctx))
- require.NoError(t, setupEventBus(ctx, pool))
+ pool := evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus)
evList := types.EvidenceList{ev}
err = pool.CheckEvidence(ctx, evList)
@@ -401,6 +407,7 @@ func TestVerifyDuplicateVoteEvidence(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
+ logger := log.NewNopLogger()
val := types.NewMockPV()
val2 := types.NewMockPV()
valSet := types.NewValidatorSet([]*types.Validator{val.ExtractIntoValidator(ctx, 1)})
@@ -478,10 +485,11 @@ func TestVerifyDuplicateVoteEvidence(t *testing.T) {
blockStore := &mocks.BlockStore{}
blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: types.Header{Time: defaultEvidenceTime}})
- pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
- require.NoError(t, err)
+ eventBus := eventbus.NewDefault(logger)
+ require.NoError(t, eventBus.Start(ctx))
- require.NoError(t, setupEventBus(ctx, pool))
+ pool := evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus)
+ startPool(t, pool, stateStore)
evList := types.EvidenceList{goodEv}
err = pool.CheckEvidence(ctx, evList)


node/node.go (+11 -1)

@@ -62,7 +62,8 @@ type nodeImpl struct {
// services
eventSinks []indexer.EventSink
stateStore sm.Store
blockStore *store.BlockStore // store the blockchain to disk
+ evPool *evidence.Pool
stateSync bool // whether the node should state sync on startup
stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots
@@ -388,6 +389,7 @@ func makeNode(
blockStore: blockStore,
stateSyncReactor: stateSyncReactor,
stateSync: stateSync,
+ evPool: evPool,
shutdownOps: makeCloser(closers),
@@ -462,6 +464,14 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
}
}
+ state, err := n.stateStore.Load()
+ if err != nil {
+ return err
+ }
+ if err := n.evPool.Start(state); err != nil {
+ return err
+ }
n.rpcEnv.NodeInfo = n.nodeInfo
// Start the RPC server before the P2P server
// so we can eg. receive txs for the first block
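With this wiring, a node that cannot load its state or recover pending evidence fails fast here in OnStart, before the RPC and P2P servers come up, rather than deep inside pool construction during node setup.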


node/node_test.go (+1 -2)

@@ -297,8 +297,7 @@ func TestCreateProposalBlock(t *testing.T) {
// Make EvidencePool
evidenceDB := dbm.NewMemDB()
blockStore := store.NewBlockStore(dbm.NewMemDB())
- evidencePool, err := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics())
- require.NoError(t, err)
+ evidencePool := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics(), nil)
// fill the evidence pool with more evidence
// than can fit in a block


node/setup.go (+1 -6)

@@ -228,12 +228,7 @@ func createEvidenceReactor(
logger = logger.With("module", "evidence")
- evidencePool, err := evidence.NewPool(logger, evidenceDB, store, blockStore, metrics)
- if err != nil {
- return nil, nil, fmt.Errorf("creating evidence pool: %w", err)
- }
- evidencePool.SetEventBus(eventBus)
+ evidencePool := evidence.NewPool(logger, evidenceDB, store, blockStore, metrics, eventBus)
evidenceReactor, err := evidence.NewReactor(
ctx,

