
Merge branch 'master' into thane/fix-protogen

pull/7975/head
Thane Thomson 3 years ago
committed by GitHub
commit d10d0e4bf7
15 changed files with 322 additions and 291 deletions
  1. docs/tendermint-core/consensus/proposer-based-timestamps.md (+4, -3)
  2. internal/consensus/byzantine_test.go (+143, -145)
  3. internal/consensus/mempool_test.go (+1, -1)
  4. internal/evidence/pool.go (+33, -35)
  5. internal/evidence/pool_test.go (+53, -48)
  6. internal/evidence/reactor_test.go (+5, -4)
  7. internal/evidence/verify_test.go (+34, -26)
  8. internal/libs/queue/queue_test.go (+1, -1)
  9. internal/p2p/conn/secret_connection_test.go (+8, -8)
  10. internal/pubsub/query/syntax/syntax_test.go (+1, -1)
  11. internal/statesync/reactor_test.go (+25, -9)
  12. node/node.go (+11, -1)
  13. node/node_test.go (+1, -2)
  14. node/setup.go (+1, -6)
  15. types/vote_set_test.go (+1, -1)

docs/tendermint-core/consensus/proposer-based-timestamps.md (+4, -3)

@@ -1,4 +1,6 @@
--- order: 3 ---
---
order: 3
---
# PBTS
@@ -6,7 +8,7 @@
algorithm added to Tendermint in the v0.36 release. It outlines the core
functionality as well as the parameters and constraints of the this algorithm.
## Algorithm Overview
The PBTS algorithm defines a way for a Tendermint blockchain to create block
timestamps that are within a reasonable bound of the clocks of the validators on
@@ -91,4 +93,3 @@ proposed by their peers to be valid either.
* [The PBTS specification](https://github.com/tendermint/tendermint/blob/master/spec/consensus/proposer-based-timestamp/README.md)
contains all of the details of the algorithm.
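The overview above boils down to a bound check: the proposer stamps the block with its local clock, and other validators only accept the proposal if that timestamp falls inside a window around the time they observed it. A loose sketch of that kind of check follows; the parameter names precision and msgDelay are taken from the linked PBTS spec, not from this commit, and the function is an illustration rather than the Tendermint implementation.

package main

import (
	"fmt"
	"time"
)

// timely sketches the PBTS acceptance rule: the proposal's timestamp must sit
// within a bounded window around the moment the validator received the
// proposal. precision and msgDelay mirror the spec's synchrony parameters;
// this is illustrative only.
func timely(proposalTime, receiveTime time.Time, precision, msgDelay time.Duration) bool {
	lower := proposalTime.Add(-precision)
	upper := proposalTime.Add(precision + msgDelay)
	return !receiveTime.Before(lower) && !receiveTime.After(upper)
}

func main() {
	now := time.Now()
	fmt.Println(timely(now, now.Add(200*time.Millisecond), 500*time.Millisecond, 2*time.Second)) // true
	fmt.Println(timely(now, now.Add(5*time.Second), 500*time.Millisecond, 2*time.Second))        // false
}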

internal/consensus/byzantine_test.go (+143, -145)

@@ -87,10 +87,12 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
mempool.EnableTxsAvailable()
}
eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events"))
require.NoError(t, eventBus.Start(ctx))
// Make a full instance of the evidence pool
evidenceDB := dbm.NewMemDB()
evpool, err := evidence.NewPool(logger.With("module", "evidence"), evidenceDB, stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
evpool := evidence.NewPool(logger.With("module", "evidence"), evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus)
// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
@@ -100,11 +102,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
pv := privVals[i]
cs.SetPrivValidator(ctx, pv)
eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events"))
err = eventBus.Start(ctx)
require.NoError(t, err)
cs.SetEventBus(eventBus)
evpool.SetEventBus(eventBus)
cs.SetTimeoutTicker(tickerFunc())
states[i] = cs
@@ -327,42 +325,42 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
// blocksSubs := make([]types.Subscription, n)
// reactors := make([]p2p.Reactor, n)
// for i := 0; i < n; i++ {
// // enable txs so we can create different proposals
// assertMempool(states[i].txNotifier).EnableTxsAvailable()
// eventBus := states[i].eventBus
// eventBus.SetLogger(logger.With("module", "events", "validator", i))
// var err error
// blocksSubs[i], err = eventBus.Subscribe(ctx, testSubscriber, types.EventQueryNewBlock)
// require.NoError(t, err)
// conR := NewReactor(states[i], true) // so we don't start the consensus states
// conR.SetLogger(logger.With("validator", i))
// conR.SetEventBus(eventBus)
// var conRI p2p.Reactor = conR
// // make first val byzantine
// if i == 0 {
// conRI = NewByzantineReactor(conR)
// }
// reactors[i] = conRI
// err = states[i].blockExec.Store().Save(states[i].state) // for save height 1's validators info
// require.NoError(t, err)
// }
// switches := p2p.MakeConnectedSwitches(config.P2P, N, func(i int, sw *p2p.Switch) *p2p.Switch {
// sw.SetLogger(p2pLogger.With("validator", i))
// sw.AddReactor("CONSENSUS", reactors[i])
// return sw
// }, func(sws []*p2p.Switch, i, j int) {
// // the network starts partitioned with globally active adversary
// if i != 0 {
// return
// }
// p2p.Connect2Switches(sws, i, j)
// })
// // make first val byzantine
@@ -370,26 +368,26 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
// // do any safety checks.
// states[0].privValidator.(types.MockPV).DisableChecks()
// states[0].decideProposal = func(j int32) func(int64, int32) {
// return func(height int64, round int32) {
// byzantineDecideProposalFunc(t, height, round, states[j], switches[j])
// }
// }(int32(0))
// // We are setting the prevote function to do nothing because the prevoting
// // and precommitting are done alongside the proposal.
// states[0].doPrevote = func(height int64, round int32) {}
// defer func() {
// for _, sw := range switches {
// err := sw.Stop()
// require.NoError(t, err)
// }
// }()
// // start the non-byz state machines.
// // note these must be started before the byz
// for i := 1; i < n; i++ {
// cr := reactors[i].(*Reactor)
// cr.SwitchToConsensus(cr.conS.GetState(), false)
// }
// // start the byzantine state machine
@@ -421,146 +419,146 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
// // (one of them already has)
// wg := new(sync.WaitGroup)
// for i := 1; i < N-1; i++ {
// wg.Add(1)
// go func(j int) {
// <-blocksSubs[j].Out()
// wg.Done()
// }(i)
// }
// done := make(chan struct{})
// go func() {
// wg.Wait()
// close(done)
// }()
// tick := time.NewTicker(time.Second * 10)
// select {
// case <-done:
// case <-tick.C:
// for i, reactor := range reactors {
// t.Log(fmt.Sprintf("Consensus Reactor %v", i))
// t.Log(fmt.Sprintf("%v", reactor))
// }
// t.Fatalf("Timed out waiting for all validators to commit first block")
// }
}
// func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *State, sw *p2p.Switch) {
// // byzantine user should create two proposals and try to split the vote.
// // Avoid sending on internalMsgQueue and running consensus state.
// // Create a new proposal block from state/txs from the mempool.
// block1, blockParts1 := cs.createProposalBlock()
// polRound, propBlockID := cs.ValidRound, types.BlockID{Hash: block1.Hash(), PartSetHeader: blockParts1.Header()}
// proposal1 := types.NewProposal(height, round, polRound, propBlockID)
// p1 := proposal1.ToProto()
// if err := cs.privValidator.SignProposal(cs.state.ChainID, p1); err != nil {
// t.Error(err)
// }
// proposal1.Signature = p1.Signature
// // some new transactions come in (this ensures that the proposals are different)
// deliverTxsRange(cs, 0, 1)
// // Create a new proposal block from state/txs from the mempool.
// block2, blockParts2 := cs.createProposalBlock()
// polRound, propBlockID = cs.ValidRound, types.BlockID{Hash: block2.Hash(), PartSetHeader: blockParts2.Header()}
// proposal2 := types.NewProposal(height, round, polRound, propBlockID)
// p2 := proposal2.ToProto()
// if err := cs.privValidator.SignProposal(cs.state.ChainID, p2); err != nil {
// t.Error(err)
// }
// proposal2.Signature = p2.Signature
// block1Hash := block1.Hash()
// block2Hash := block2.Hash()
// // broadcast conflicting proposals/block parts to peers
// peers := sw.Peers().List()
// t.Logf("Byzantine: broadcasting conflicting proposals to %d peers", len(peers))
// for i, peer := range peers {
// if i < len(peers)/2 {
// go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1)
// } else {
// go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2)
// }
// }
// }
// func sendProposalAndParts(
// height int64,
// round int32,
// cs *State,
// peer p2p.Peer,
// proposal *types.Proposal,
// blockHash []byte,
// parts *types.PartSet,
// ) {
// // proposal
// msg := &ProposalMessage{Proposal: proposal}
// peer.Send(DataChannel, MustEncode(msg))
// // parts
// for i := 0; i < int(parts.Total()); i++ {
// part := parts.GetPart(i)
// msg := &BlockPartMessage{
// Height: height, // This tells peer that this part applies to us.
// Round: round, // This tells peer that this part applies to us.
// Part: part,
// }
// peer.Send(DataChannel, MustEncode(msg))
// }
// // votes
// cs.mtx.Lock()
// prevote, _ := cs.signVote(tmproto.PrevoteType, blockHash, parts.Header())
// precommit, _ := cs.signVote(tmproto.PrecommitType, blockHash, parts.Header())
// cs.mtx.Unlock()
// peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote}))
// peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
// }
// type ByzantineReactor struct {
// service.Service
// reactor *Reactor
// }
// func NewByzantineReactor(conR *Reactor) *ByzantineReactor {
// return &ByzantineReactor{
// Service: conR,
// reactor: conR,
// }
// }
// func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) }
// func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() }
// func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
// if !br.reactor.IsRunning() {
// return
// }
// // Create peerState for peer
// peerState := NewPeerState(peer).SetLogger(br.reactor.logger)
// peer.Set(types.PeerStateKey, peerState)
// // Send our state to peer.
// // If we're syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
// if !br.reactor.waitSync {
// br.reactor.sendNewRoundStepMessage(peer)
// }
// }
// func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// br.reactor.RemovePeer(peer, reason)
// }
// func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
// br.reactor.Receive(chID, peer, msgBytes)
// }
// func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }

internal/consensus/mempool_test.go (+1, -1)

@@ -212,7 +212,7 @@ func TestMempoolRmBadTx(t *testing.T) {
checkTxRespCh <- struct{}{}
}, mempool.TxInfo{})
if err != nil {
t.Errorf("error after CheckTx: %w", err)
t.Errorf("error after CheckTx: %v", err)
return
}
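This one-character change (repeated below in queue_test.go, secret_connection_test.go, syntax_test.go, and vote_set_test.go) is about formatting verbs: %w is only meaningful to fmt.Errorf, where it wraps the error operand, while t.Errorf formats via ordinary Sprintf-style rules that treat %w as a bad verb, so %v is the correct choice there. A minimal standalone illustration of the difference:

package main

import (
	"errors"
	"fmt"
)

func main() {
	err := errors.New("bad tx")

	// fmt.Errorf understands %w and produces an error that wraps err.
	wrapped := fmt.Errorf("error after CheckTx: %w", err)
	fmt.Println(errors.Is(wrapped, err)) // true

	// Plain formatting has no wrapping concept; %w comes out as a bad-verb
	// marker, while %v simply prints the error text. t.Errorf behaves like
	// the Sprintf case.
	fmt.Println(fmt.Sprintf("error after CheckTx: %w", err))
	fmt.Println(fmt.Sprintf("error after CheckTx: %v", err))
}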


internal/evidence/pool.go (+33, -35)

@@ -36,14 +36,14 @@ type Pool struct {
evidenceList *clist.CList // concurrent linked-list of evidence
evidenceSize uint32 // amount of pending evidence
// needed to load validators to verify evidence
stateDB sm.Store
// needed to load headers and commits to verify evidence
blockStore BlockStore
stateDB sm.Store
mtx sync.Mutex
// latest state
state sm.State
isStarted bool
// evidence from consensus is buffered to this slice, awaiting until the next height
// before being flushed to the pool. This prevents broadcasting and proposing of
// evidence before the height with which the evidence happened is finished.
@@ -60,46 +60,19 @@ type Pool struct {
Metrics *Metrics
}
func (evpool *Pool) SetEventBus(e *eventbus.EventBus) {
evpool.eventBus = e
}
// NewPool creates an evidence pool. If using an existing evidence store,
// it will add all pending evidence to the concurrent list.
func NewPool(logger log.Logger, evidenceDB dbm.DB, stateDB sm.Store, blockStore BlockStore, metrics *Metrics) (*Pool, error) {
state, err := stateDB.Load()
if err != nil {
return nil, fmt.Errorf("failed to load state: %w", err)
}
pool := &Pool{
stateDB: stateDB,
func NewPool(logger log.Logger, evidenceDB dbm.DB, stateStore sm.Store, blockStore BlockStore, metrics *Metrics, eventBus *eventbus.EventBus) *Pool {
return &Pool{
blockStore: blockStore,
state: state,
stateDB: stateStore,
logger: logger,
evidenceStore: evidenceDB,
evidenceList: clist.New(),
consensusBuffer: make([]duplicateVoteSet, 0),
Metrics: metrics,
eventBus: eventBus,
}
// If pending evidence already in db, in event of prior failure, then check
// for expiration, update the size and load it back to the evidenceList.
pool.pruningHeight, pool.pruningTime = pool.removeExpiredPendingEvidence()
evList, _, err := pool.listEvidence(prefixPending, -1)
if err != nil {
return nil, err
}
atomic.StoreUint32(&pool.evidenceSize, uint32(len(evList)))
pool.Metrics.NumEvidence.Set(float64(pool.evidenceSize))
for _, ev := range evList {
pool.evidenceList.PushBack(ev)
}
pool.eventBus = nil
return pool, nil
}
// PendingEvidence is used primarily as part of block proposal and returns up to
@@ -277,6 +250,31 @@ func (evpool *Pool) State() sm.State {
return evpool.state
}
func (evpool *Pool) Start(state sm.State) error {
if evpool.isStarted {
return errors.New("pool is already running")
}
evpool.state = state
// If pending evidence already in db, in event of prior failure, then check
// for expiration, update the size and load it back to the evidenceList.
evpool.pruningHeight, evpool.pruningTime = evpool.removeExpiredPendingEvidence()
evList, _, err := evpool.listEvidence(prefixPending, -1)
if err != nil {
return err
}
atomic.StoreUint32(&evpool.evidenceSize, uint32(len(evList)))
evpool.Metrics.NumEvidence.Set(float64(evpool.evidenceSize))
for _, ev := range evList {
evpool.evidenceList.PushBack(ev)
}
return nil
}
func (evpool *Pool) Close() error {
return evpool.evidenceStore.Close()
}
@@ -449,6 +447,7 @@ func (evpool *Pool) listEvidence(prefixKey int64, maxBytes int64) ([]types.Evide
}
func (evpool *Pool) removeExpiredPendingEvidence() (int64, time.Time) {
batch := evpool.evidenceStore.NewBatch()
defer batch.Close()
@@ -473,7 +472,6 @@ func (evpool *Pool) removeExpiredPendingEvidence() (int64, time.Time) {
// remove evidence from the clist
evpool.removeEvidenceFromList(blockEvidenceMap)
// update the evidence size
atomic.AddUint32(&evpool.evidenceSize, ^uint32(len(blockEvidenceMap)-1))
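The net effect of this file's changes is a constructor/start split: NewPool now takes the event bus up front, no longer reads the state store, and cannot fail, while the recovery work (pruning expired evidence, reloading pending evidence, priming the metrics) moves into the new Start method. A sketch of the resulting call pattern, pieced together from the signatures in the hunks above; the wrapper function startEvidencePool is illustrative and not part of the commit, and the parameter types come from the packages touched by this diff.

// startEvidencePool is a hypothetical helper showing the new two-step flow.
func startEvidencePool(ctx context.Context, logger log.Logger, evidenceDB dbm.DB, stateStore sm.Store, blockStore evidence.BlockStore) (*evidence.Pool, error) {
	eventBus := eventbus.NewDefault(logger)
	if err := eventBus.Start(ctx); err != nil {
		return nil, err
	}

	// NewPool is now infallible: it only wires dependencies together.
	pool := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus)

	// The work NewPool used to do at construction time now runs in Start,
	// once the caller has loaded the latest state.
	state, err := stateStore.Load()
	if err != nil {
		return nil, err
	}
	if err := pool.Start(state); err != nil {
		return nil, err
	}
	return pool, nil
}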


internal/evidence/pool_test.go (+53, -48)

@@ -34,6 +34,18 @@ var (
defaultEvidenceMaxBytes int64 = 1000
)
func startPool(t *testing.T, pool *evidence.Pool, store sm.Store) {
t.Helper()
state, err := store.Load()
if err != nil {
t.Fatalf("cannot load state: %v", err)
}
if err := pool.Start(state); err != nil {
t.Fatalf("cannot start state pool: %v", err)
}
}
func TestEvidencePoolBasic(t *testing.T) {
var (
height = int64(1)
@@ -51,9 +63,13 @@ func TestEvidencePoolBasic(t *testing.T) {
stateStore.On("LoadValidators", mock.AnythingOfType("int64")).Return(valSet, nil)
stateStore.On("Load").Return(createState(height+1, valSet), nil)
pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
require.NoError(t, setupEventBus(ctx, pool))
logger := log.NewNopLogger()
eventBus := eventbus.NewDefault(logger)
require.NoError(t, eventBus.Start(ctx))
pool := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus)
startPool(t, pool, stateStore)
// evidence not seen yet:
evs, size := pool.PendingEvidence(defaultEvidenceMaxBytes)
require.Equal(t, 0, len(evs))
@@ -115,10 +131,12 @@ func TestAddExpiredEvidence(t *testing.T) {
return &types.BlockMeta{Header: types.Header{Time: expiredEvidenceTime}}
})
pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
logger := log.NewNopLogger()
eventBus := eventbus.NewDefault(logger)
require.NoError(t, eventBus.Start(ctx))
require.NoError(t, setupEventBus(ctx, pool))
pool := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus)
startPool(t, pool, stateStore)
testCases := []struct {
evHeight int64
@@ -159,9 +177,7 @@ func TestReportConflictingVotes(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pool, pv := defaultTestPool(ctx, t, height)
require.NoError(t, setupEventBus(ctx, pool))
pool, pv, _ := defaultTestPool(ctx, t, height)
val := types.NewValidator(pv.PrivKey.PubKey(), 10)
@@ -201,9 +217,7 @@ func TestEvidencePoolUpdate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pool, val := defaultTestPool(ctx, t, height)
require.NoError(t, setupEventBus(ctx, pool))
pool, val, _ := defaultTestPool(ctx, t, height)
state := pool.State()
@@ -273,9 +287,7 @@ func TestVerifyPendingEvidencePasses(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pool, val := defaultTestPool(ctx, t, height)
require.NoError(t, setupEventBus(ctx, pool))
pool, val, _ := defaultTestPool(ctx, t, height)
ev, err := types.NewMockDuplicateVoteEvidenceWithValidator(
ctx,
@@ -295,9 +307,7 @@ func TestVerifyDuplicatedEvidenceFails(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pool, val := defaultTestPool(ctx, t, height)
require.NoError(t, setupEventBus(ctx, pool))
pool, val, _ := defaultTestPool(ctx, t, height)
ev, err := types.NewMockDuplicateVoteEvidenceWithValidator(
ctx,
@@ -321,7 +331,7 @@ func TestEventOnEvidenceValidated(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pool, val := defaultTestPool(ctx, t, height)
pool, val, eventBus := defaultTestPool(ctx, t, height)
ev, err := types.NewMockDuplicateVoteEvidenceWithValidator(
ctx,
@@ -332,11 +342,6 @@ func TestEventOnEvidenceValidated(t *testing.T) {
)
require.NoError(t, err)
eventBus := eventbus.NewDefault(log.TestingLogger())
require.NoError(t, eventBus.Start(ctx))
pool.SetEventBus(eventBus)
const query = `tm.event='EvidenceValidated'`
evSub, err := eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{
ClientID: "test",
@@ -348,6 +353,9 @@ func TestEventOnEvidenceValidated(t *testing.T) {
go func() {
defer close(done)
msg, err := evSub.Next(ctx)
if ctx.Err() != nil {
return
}
assert.NoError(t, err)
edt := msg.Data().(types.EventDataEvidenceValidated)
@@ -394,14 +402,15 @@ func TestLightClientAttackEvidenceLifecycle(t *testing.T) {
blockStore.On("LoadBlockCommit", height).Return(trusted.Commit)
blockStore.On("LoadBlockCommit", commonHeight).Return(common.Commit)
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
logger := log.NewNopLogger()
eventBus := eventbus.NewDefault(logger)
require.NoError(t, eventBus.Start(ctx))
require.NoError(t, setupEventBus(ctx, pool))
pool := evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus)
hash := ev.Hash()
err = pool.AddEvidence(ctx, ev)
err := pool.AddEvidence(ctx, ev)
require.NoError(t, err)
err = pool.AddEvidence(ctx, ev)
require.NoError(t, err)
@@ -449,11 +458,13 @@ func TestRecoverPendingEvidence(t *testing.T) {
blockStore, err := initializeBlockStore(dbm.NewMemDB(), state, valAddress)
require.NoError(t, err)
// create previous pool and populate it
pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
logger := log.NewNopLogger()
eventBus := eventbus.NewDefault(logger)
require.NoError(t, eventBus.Start(ctx))
require.NoError(t, setupEventBus(ctx, pool))
// create previous pool and populate it
pool := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus)
startPool(t, pool, stateStore)
goodEvidence, err := types.NewMockDuplicateVoteEvidenceWithValidator(
ctx,
@@ -495,9 +506,8 @@ func TestRecoverPendingEvidence(t *testing.T) {
},
}, nil)
newPool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, newStateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
newPool := evidence.NewPool(logger, evidenceDB, newStateStore, blockStore, evidence.NopMetrics(), nil)
startPool(t, newPool, newStateStore)
evList, _ := newPool.PendingEvidence(defaultEvidenceMaxBytes)
require.Equal(t, 1, len(evList))
@@ -590,7 +600,7 @@ func makeCommit(height int64, valAddr []byte) *types.Commit {
return types.NewCommit(height, 0, types.BlockID{}, commitSigs)
}
func defaultTestPool(ctx context.Context, t *testing.T, height int64) (*evidence.Pool, types.MockPV) {
func defaultTestPool(ctx context.Context, t *testing.T, height int64) (*evidence.Pool, types.MockPV, *eventbus.EventBus) {
t.Helper()
val := types.NewMockPV()
valAddress := val.PrivKey.PubKey().Address()
@@ -601,10 +611,14 @@ func defaultTestPool(ctx context.Context, t *testing.T, height int64) (*evidence
blockStore, err := initializeBlockStore(dbm.NewMemDB(), state, valAddress)
require.NoError(t, err)
pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err, "test evidence pool could not be created")
logger := log.NewNopLogger()
eventBus := eventbus.NewDefault(logger)
require.NoError(t, eventBus.Start(ctx))
return pool, val
pool := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus)
startPool(t, pool, stateStore)
return pool, val, eventBus
}
func createState(height int64, valSet *types.ValidatorSet) sm.State {
@@ -616,12 +630,3 @@ func createState(height int64, valSet *types.ValidatorSet) sm.State {
ConsensusParams: *types.DefaultConsensusParams(),
}
}
func setupEventBus(ctx context.Context, evpool *evidence.Pool) error {
eventBus := eventbus.NewDefault(log.TestingLogger())
if err := eventBus.Start(ctx); err != nil {
return err
}
evpool.SetEventBus(eventBus)
return nil
}

internal/evidence/reactor_test.go (+5, -4)

@@ -82,13 +82,14 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store, chBuf uint
}
return nil
})
rts.pools[nodeID], err = evidence.NewPool(logger, evidenceDB, stateStores[idx], blockStore, evidence.NopMetrics())
require.NoError(t, err)
eventBus := eventbus.NewDefault(logger)
err = eventBus.Start(ctx)
require.NoError(t, err)
rts.pools[nodeID].SetEventBus(eventBus)
rts.pools[nodeID] = evidence.NewPool(logger, evidenceDB, stateStores[idx], blockStore, evidence.NopMetrics(), eventBus)
startPool(t, rts.pools[nodeID], stateStores[idx])
require.NoError(t, err)
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)


internal/evidence/verify_test.go (+34, -26)

@@ -12,6 +12,7 @@ import (
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/evidence"
"github.com/tendermint/tendermint/internal/evidence/mocks"
sm "github.com/tendermint/tendermint/internal/state"
@@ -76,6 +77,7 @@ func TestVerify_LunaticAttackAgainstState(t *testing.T) {
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewNopLogger()
attackTime := defaultEvidenceTime.Add(1 * time.Hour)
// create valid lunatic evidence
@@ -96,8 +98,7 @@ func TestVerify_LunaticAttackAgainstState(t *testing.T) {
blockStore.On("LoadBlockMeta", height).Return(&types.BlockMeta{Header: *trusted.Header})
blockStore.On("LoadBlockCommit", commonHeight).Return(common.Commit)
blockStore.On("LoadBlockCommit", height).Return(trusted.Commit)
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
pool := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), nil)
evList := types.EvidenceList{ev}
// check that the evidence pool correctly verifies the evidence
@@ -111,32 +112,29 @@
// if we submit evidence only against a single byzantine validator when we see there are more validators then this
// should return an error
ev.ByzantineValidators = ev.ByzantineValidators[:1]
t.Log(evList)
assert.Error(t, pool.CheckEvidence(ctx, evList))
// restore original byz vals
ev.ByzantineValidators = ev.GetByzantineValidators(common.ValidatorSet, trusted.SignedHeader)
// duplicate evidence should be rejected
evList = types.EvidenceList{ev, ev}
pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
pool = evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), nil)
assert.Error(t, pool.CheckEvidence(ctx, evList))
// If evidence is submitted with an altered timestamp it should return an error
ev.Timestamp = defaultEvidenceTime.Add(1 * time.Minute)
pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
eventBus := eventbus.NewDefault(logger)
require.NoError(t, eventBus.Start(ctx))
require.NoError(t, setupEventBus(ctx, pool))
ev.Timestamp = defaultEvidenceTime.Add(1 * time.Minute)
pool = evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus)
err = pool.AddEvidence(ctx, ev)
err := pool.AddEvidence(ctx, ev)
assert.Error(t, err)
ev.Timestamp = defaultEvidenceTime
// Evidence submitted with a different validator power should fail
ev.TotalVotingPower = 1
pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
pool = evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), nil)
err = pool.AddEvidence(ctx, ev)
assert.Error(t, err)
ev.TotalVotingPower = common.ValidatorSet.TotalVotingPower()
@@ -154,6 +152,9 @@ func TestVerify_ForwardLunaticAttack(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewNopLogger()
// create a forward lunatic attack
ev, trusted, common := makeLunaticEvidence(ctx,
t, attackHeight, commonHeight, totalVals, byzVals, totalVals-byzVals, defaultEvidenceTime, attackTime)
@@ -179,10 +180,11 @@ func TestVerify_ForwardLunaticAttack(t *testing.T) {
blockStore.On("LoadBlockCommit", commonHeight).Return(common.Commit)
blockStore.On("LoadBlockCommit", nodeHeight).Return(trusted.Commit)
blockStore.On("Height").Return(nodeHeight)
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
require.NoError(t, setupEventBus(ctx, pool))
eventBus := eventbus.NewDefault(logger)
require.NoError(t, eventBus.Start(ctx))
pool := evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus)
// check that the evidence pool correctly verifies the evidence
assert.NoError(t, pool.CheckEvidence(ctx, types.EvidenceList{ev}))
@@ -199,8 +201,7 @@ func TestVerify_ForwardLunaticAttack(t *testing.T) {
oldBlockStore.On("Height").Return(nodeHeight)
require.Equal(t, defaultEvidenceTime, oldBlockStore.LoadBlockMeta(nodeHeight).Header.Time)
pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, oldBlockStore, evidence.NopMetrics())
require.NoError(t, err)
pool = evidence.NewPool(logger, dbm.NewMemDB(), stateStore, oldBlockStore, evidence.NopMetrics(), nil)
assert.Error(t, pool.CheckEvidence(ctx, types.EvidenceList{ev}))
}
@@ -208,6 +209,8 @@ func TestVerifyLightClientAttack_Equivocation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewNopLogger()
conflictingVals, conflictingPrivVals := factory.ValidatorSet(ctx, t, 5, 10)
conflictingHeader := factory.MakeHeader(t, &types.Header{
@@ -289,10 +292,10 @@ func TestVerifyLightClientAttack_Equivocation(t *testing.T) {
blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: *trustedHeader})
blockStore.On("LoadBlockCommit", int64(10)).Return(trustedCommit)
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
eventBus := eventbus.NewDefault(logger)
require.NoError(t, eventBus.Start(ctx))
require.NoError(t, setupEventBus(ctx, pool))
pool := evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus)
evList := types.EvidenceList{ev}
err = pool.CheckEvidence(ctx, evList)
@@ -305,6 +308,9 @@ func TestVerifyLightClientAttack_Equivocation(t *testing.T) {
func TestVerifyLightClientAttack_Amnesia(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewNopLogger()
var height int64 = 10
conflictingVals, conflictingPrivVals := factory.ValidatorSet(ctx, t, 5, 10)
@@ -378,10 +384,10 @@ func TestVerifyLightClientAttack_Amnesia(t *testing.T) {
blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: *trustedHeader})
blockStore.On("LoadBlockCommit", int64(10)).Return(trustedCommit)
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
eventBus := eventbus.NewDefault(logger)
require.NoError(t, eventBus.Start(ctx))
require.NoError(t, setupEventBus(ctx, pool))
pool := evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus)
evList := types.EvidenceList{ev}
err = pool.CheckEvidence(ctx, evList)
@@ -401,6 +407,7 @@ func TestVerifyDuplicateVoteEvidence(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewNopLogger()
val := types.NewMockPV()
val2 := types.NewMockPV()
valSet := types.NewValidatorSet([]*types.Validator{val.ExtractIntoValidator(ctx, 1)})
@@ -478,10 +485,11 @@ func TestVerifyDuplicateVoteEvidence(t *testing.T) {
blockStore := &mocks.BlockStore{}
blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: types.Header{Time: defaultEvidenceTime}})
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
eventBus := eventbus.NewDefault(logger)
require.NoError(t, eventBus.Start(ctx))
require.NoError(t, setupEventBus(ctx, pool))
pool := evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), eventBus)
startPool(t, pool, stateStore)
evList := types.EvidenceList{goodEv}
err = pool.CheckEvidence(ctx, evList)


internal/libs/queue/queue_test.go (+1, -1)

@@ -167,7 +167,7 @@ func TestWait(t *testing.T) {
defer close(done)
got, err := q.Wait(ctx)
if err != nil {
t.Errorf("Wait: unexpected error: %w", err)
t.Errorf("Wait: unexpected error: %v", err)
} else if got != input {
t.Errorf("Wait: got %q, want %q", got, input)
}


internal/p2p/conn/secret_connection_test.go (+8, -8)

@@ -126,7 +126,7 @@ func TestSecretConnectionReadWrite(t *testing.T) {
nodePrvKey := ed25519.GenPrivKey()
nodeSecretConn, err := MakeSecretConnection(nodeConn, nodePrvKey)
if err != nil {
t.Errorf("failed to establish SecretConnection for node: %w", err)
t.Errorf("failed to establish SecretConnection for node: %v", err)
return nil, true, err
}
// In parallel, handle some reads and writes.
@@ -136,7 +136,7 @@ func TestSecretConnectionReadWrite(t *testing.T) {
for _, nodeWrite := range nodeWrites {
n, err := nodeSecretConn.Write([]byte(nodeWrite))
if err != nil {
t.Errorf("failed to write to nodeSecretConn: %w", err)
t.Errorf("failed to write to nodeSecretConn: %v", err)
return nil, true, err
}
if n != len(nodeWrite) {
@@ -163,7 +163,7 @@ func TestSecretConnectionReadWrite(t *testing.T) {
}
return nil, false, nil
} else if err != nil {
t.Errorf("failed to read from nodeSecretConn: %w", err)
t.Errorf("failed to read from nodeSecretConn: %v", err)
return nil, true, err
}
*nodeReads = append(*nodeReads, string(readBuffer[:n]))
@@ -288,7 +288,7 @@ func writeLots(t *testing.T, wg *sync.WaitGroup, conn io.Writer, txt string, n i
for i := 0; i < n; i++ {
_, err := conn.Write([]byte(txt))
if err != nil {
t.Errorf("failed to write to fooSecConn: %w", err)
t.Errorf("failed to write to fooSecConn: %v", err)
return
}
}
@@ -343,7 +343,7 @@ func makeSecretConnPair(tb testing.TB) (fooSecConn, barSecConn *SecretConnection
func(_ int) (val interface{}, abort bool, err error) {
fooSecConn, err = MakeSecretConnection(fooConn, fooPrvKey)
if err != nil {
tb.Errorf("failed to establish SecretConnection for foo: %w", err)
tb.Errorf("failed to establish SecretConnection for foo: %v", err)
return nil, true, err
}
remotePubBytes := fooSecConn.RemotePubKey()
@@ -358,7 +358,7 @@ func makeSecretConnPair(tb testing.TB) (fooSecConn, barSecConn *SecretConnection
func(_ int) (val interface{}, abort bool, err error) {
barSecConn, err = MakeSecretConnection(barConn, barPrvKey)
if barSecConn == nil {
tb.Errorf("failed to establish SecretConnection for bar: %w", err)
tb.Errorf("failed to establish SecretConnection for bar: %v", err)
return nil, true, err
}
remotePubBytes := barSecConn.RemotePubKey()
@@ -405,7 +405,7 @@ func BenchmarkWriteSecretConnection(b *testing.B) {
if err == io.EOF {
return
} else if err != nil {
b.Errorf("failed to read from barSecConn: %w", err)
b.Errorf("failed to read from barSecConn: %v", err)
return
}
}
@@ -416,7 +416,7 @@ func BenchmarkWriteSecretConnection(b *testing.B) {
idx := mrand.Intn(len(fooWriteBytes))
_, err := fooSecConn.Write(fooWriteBytes[idx])
if err != nil {
b.Errorf("failed to write to fooSecConn: %w", err)
b.Errorf("failed to write to fooSecConn: %v", err)
return
}
}


internal/pubsub/query/syntax/syntax_test.go (+1, -1)

@@ -55,7 +55,7 @@ func TestScanner(t *testing.T) {
got = append(got, s.Token())
}
if err := s.Err(); err != io.EOF {
t.Errorf("Next: unexpected error: %w", err)
t.Errorf("Next: unexpected error: %v", err)
}
if !reflect.DeepEqual(got, test.want) {


internal/statesync/reactor_test.go (+25, -9)

@@ -731,11 +731,15 @@ func handleLightBlockRequests(
if requests%10 >= failureRate {
lb, err := chain[int64(msg.Height)].ToProto()
require.NoError(t, err)
sending <- p2p.Envelope{
select {
case sending <- p2p.Envelope{
From: envelope.To,
Message: &ssproto.LightBlockResponse{
LightBlock: lb,
},
}:
case <-ctx.Done():
return
}
} else {
switch errorCount % 3 {
@@ -744,18 +748,26 @@ func handleLightBlockRequests(
_, _, lb := mockLB(ctx, t, int64(msg.Height), factory.DefaultTestTime, factory.MakeBlockID(), vals, pv)
differntLB, err := lb.ToProto()
require.NoError(t, err)
sending <- p2p.Envelope{
select {
case sending <- p2p.Envelope{
From: envelope.To,
Message: &ssproto.LightBlockResponse{
LightBlock: differntLB,
},
}:
case <-ctx.Done():
return
}
case 1: // send nil block i.e. pretend we don't have it
sending <- p2p.Envelope{
select {
case sending <- p2p.Envelope{
From: envelope.To,
Message: &ssproto.LightBlockResponse{
LightBlock: nil,
},
}:
case <-ctx.Done():
return
}
case 2: // don't do anything
}
@@ -783,19 +795,23 @@ func handleConsensusParamsRequest(
case <-ctx.Done():
return
case envelope := <-receiving:
if ctx.Err() != nil {
msg, ok := envelope.Message.(*ssproto.ParamsRequest)
if !ok {
t.Errorf("message was %T which is not a params request", envelope.Message)
return
}
t.Log("received consensus params request")
msg, ok := envelope.Message.(*ssproto.ParamsRequest)
require.True(t, ok)
sending <- p2p.Envelope{
select {
case sending <- p2p.Envelope{
From: envelope.To,
Message: &ssproto.ParamsResponse{
Height: msg.Height,
ConsensusParams: paramsProto,
},
}:
case <-ctx.Done():
return
case <-closeCh:
return
}
case <-closeCh:
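The statesync test changes above all follow one pattern: a bare channel send is wrapped in a select that also watches ctx.Done() (and closeCh where one exists), so a mock handler can no longer block forever on a send once the test shuts down. A small self-contained sketch of that pattern; the names send and out and the string payload are illustrative, not taken from the diff.

package main

import (
	"context"
	"fmt"
	"time"
)

// send delivers v on out unless the context is cancelled first, instead of
// blocking indefinitely on a channel nobody is reading anymore.
func send(ctx context.Context, out chan<- string, v string) bool {
	select {
	case out <- v:
		return true
	case <-ctx.Done():
		return false
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	out := make(chan string) // unbuffered and never drained: a bare send would hang
	fmt.Println(send(ctx, out, "light block response")) // false once the timeout fires
}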


node/node.go (+11, -1)

@@ -62,7 +62,8 @@ type nodeImpl struct {
// services
eventSinks []indexer.EventSink
stateStore sm.Store
blockStore *store.BlockStore // store the blockchain to disk
evPool *evidence.Pool
stateSync bool // whether the node should state sync on startup
stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots
@@ -388,6 +389,7 @@ func makeNode(
blockStore: blockStore,
stateSyncReactor: stateSyncReactor,
stateSync: stateSync,
evPool: evPool,
shutdownOps: makeCloser(closers),
@@ -462,6 +464,14 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
}
}
state, err := n.stateStore.Load()
if err != nil {
return err
}
if err := n.evPool.Start(state); err != nil {
return err
}
n.rpcEnv.NodeInfo = n.nodeInfo
// Start the RPC server before the P2P server
// so we can eg. receive txs for the first block


node/node_test.go (+1, -2)

@@ -297,8 +297,7 @@ func TestCreateProposalBlock(t *testing.T) {
// Make EvidencePool
evidenceDB := dbm.NewMemDB()
blockStore := store.NewBlockStore(dbm.NewMemDB())
evidencePool, err := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
evidencePool := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics(), nil)
// fill the evidence pool with more evidence
// than can fit in a block


node/setup.go (+1, -6)

@@ -228,12 +228,7 @@ func createEvidenceReactor(
logger = logger.With("module", "evidence")
evidencePool, err := evidence.NewPool(logger, evidenceDB, store, blockStore, metrics)
if err != nil {
return nil, nil, fmt.Errorf("creating evidence pool: %w", err)
}
evidencePool.SetEventBus(eventBus)
evidencePool := evidence.NewPool(logger, evidenceDB, store, blockStore, metrics, eventBus)
evidenceReactor, err := evidence.NewReactor(
ctx,


types/vote_set_test.go (+1, -1)

@@ -493,7 +493,7 @@ func TestVoteSet_MakeCommit(t *testing.T) {
// Ensure that Commit is good.
if err := commit.ValidateBasic(); err != nil {
t.Errorf("error in Commit.ValidateBasic(): %w", err)
t.Errorf("error in Commit.ValidateBasic(): %v", err)
}
}

