From e80541a2515f081a5053a03d65594fc77821ed9c Mon Sep 17 00:00:00 2001 From: Jasmina Malicevic Date: Mon, 21 Feb 2022 14:45:56 +0100 Subject: [PATCH] types/events+evidence: emit events + metrics on evidence validation (#7802) * event: Added Events after evidence validation; evidence: refactored AddEvidence Added context and Metrics as parameter for the pool constructor * evidence: pushed event firing into evidence pool and added metrics to represent the size of the evpool * state: fixed parameters of evpool mock functions * evidence: added test to confirm events are generated * Removed obsolete EvidenceEventPublisher interface * evidence: pool removed error on missing eventbus --- CHANGELOG_PENDING.md | 1 + docs/nodes/metrics.md | 1 + internal/consensus/byzantine_test.go | 3 +- internal/consensus/reactor_test.go | 5 +- internal/consensus/state.go | 6 +- internal/eventbus/event_bus.go | 4 + internal/eventbus/event_bus_test.go | 42 ++++++++ internal/evidence/metrics.go | 47 +++++++++ internal/evidence/pool.go | 54 +++++++--- internal/evidence/pool_test.go | 143 +++++++++++++++++++++----- internal/evidence/reactor.go | 15 +-- internal/evidence/reactor_test.go | 24 +++-- internal/evidence/verify.go | 7 +- internal/evidence/verify_test.go | 55 ++++++---- internal/rpc/core/evidence.go | 2 +- internal/state/execution.go | 8 +- internal/state/execution_test.go | 4 +- internal/state/helpers_test.go | 2 +- internal/state/mocks/evidence_pool.go | 29 +++--- internal/state/services.go | 16 +-- internal/state/validation_test.go | 14 +-- node/node.go | 6 +- node/node_test.go | 4 +- node/setup.go | 7 +- types/events.go | 15 +++ 25 files changed, 392 insertions(+), 122 deletions(-) create mode 100644 internal/evidence/metrics.go diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index dbac9f44b..c898a944b 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -63,6 +63,7 @@ Special thanks to external contributors on this release: - [internal/protoio] \#7325 Optimized `MarshalDelimited` by inlining the common case and using a `sync.Pool` in the worst case. (@odeke-em) - [consensus] \#6969 remove logic to 'unlock' a locked block. - [evidence] \#7700 Evidence messages contain single Evidence instead of EvidenceList (@jmalicevic) +- [evidence] \#7802 Evidence pool emits events when evidence is validated and updates a metric when the number of evidence in the evidence pool changes. (@jmalicevic) - [pubsub] \#7319 Performance improvements for the event query API (@creachadair) - [node] \#7521 Define concrete type for seed node implementation (@spacech1mp) - [rpc] \#7612 paginate mempool /unconfirmed_txs rpc endpoint (@spacech1mp) diff --git a/docs/nodes/metrics.md b/docs/nodes/metrics.md index 6589e044a..1b2e9f007 100644 --- a/docs/nodes/metrics.md +++ b/docs/nodes/metrics.md @@ -40,6 +40,7 @@ The following metrics are available: | consensus_fast_syncing | gauge | | either 0 (not fast syncing) or 1 (syncing) | | consensus_state_syncing | gauge | | either 0 (not state syncing) or 1 (syncing) | | consensus_block_size_bytes | Gauge | | Block size in bytes | +| evidence_pool_num_evidence | Gauge | | Number of evidence in the evidence pool | p2p_peers | Gauge | | Number of peers node's connected to | | p2p_peer_receive_bytes_total | counter | peer_id, chID | number of bytes per channel received from a given peer | | p2p_peer_send_bytes_total | counter | peer_id, chID | number of bytes per channel sent to a given peer | diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go index 24650a158..7a5260a64 100644 --- a/internal/consensus/byzantine_test.go +++ b/internal/consensus/byzantine_test.go @@ -90,7 +90,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { // Make a full instance of the evidence pool evidenceDB := dbm.NewMemDB() - evpool, err := evidence.NewPool(logger.With("module", "evidence"), evidenceDB, stateStore, blockStore) + evpool, err := evidence.NewPool(logger.With("module", "evidence"), evidenceDB, stateStore, blockStore, evidence.NopMetrics()) require.NoError(t, err) // Make State @@ -104,6 +104,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { err = eventBus.Start(ctx) require.NoError(t, err) cs.SetEventBus(eventBus) + evpool.SetEventBus(eventBus) cs.SetTimeoutTicker(tickerFunc()) diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index 377d943bf..cd6313650 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -427,14 +427,15 @@ func TestReactorWithEvidence(t *testing.T) { ev, err := types.NewMockDuplicateVoteEvidenceWithValidator(ctx, 1, defaultTestTime, privVals[vIdx], cfg.ChainID()) require.NoError(t, err) evpool := &statemocks.EvidencePool{} - evpool.On("CheckEvidence", mock.AnythingOfType("types.EvidenceList")).Return(nil) + evpool.On("CheckEvidence", ctx, mock.AnythingOfType("types.EvidenceList")).Return(nil) evpool.On("PendingEvidence", mock.AnythingOfType("int64")).Return([]types.Evidence{ ev}, int64(len(ev.Bytes()))) - evpool.On("Update", mock.AnythingOfType("state.State"), mock.AnythingOfType("types.EvidenceList")).Return() + evpool.On("Update", ctx, mock.AnythingOfType("state.State"), mock.AnythingOfType("types.EvidenceList")).Return() evpool2 := sm.EmptyEvidencePool{} blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore) + cs := NewState(ctx, logger.With("validator", i, "module", "consensus"), thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2) cs.SetPrivValidator(ctx, pv) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index bfd588a44..12b023a1f 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -1432,7 +1432,7 @@ func (cs *State) defaultDoPrevote(ctx context.Context, height int64, round int32 } // Validate proposal block, from Tendermint's perspective - err := cs.blockExec.ValidateBlock(cs.state, cs.ProposalBlock) + err := cs.blockExec.ValidateBlock(ctx, cs.state, cs.ProposalBlock) if err != nil { // ProposalBlock is invalid, prevote nil. logger.Error("prevote step: consensus deems this block invalid; prevoting nil", @@ -1651,7 +1651,7 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32) logger.Debug("precommit step: +2/3 prevoted proposal block; locking", "hash", blockID.Hash) // Validate the block. - if err := cs.blockExec.ValidateBlock(cs.state, cs.ProposalBlock); err != nil { + if err := cs.blockExec.ValidateBlock(ctx, cs.state, cs.ProposalBlock); err != nil { panic(fmt.Sprintf("precommit step: +2/3 prevoted for an invalid block %v; relocking", err)) } @@ -1831,7 +1831,7 @@ func (cs *State) finalizeCommit(ctx context.Context, height int64) { panic("cannot finalize commit; proposal block does not hash to commit hash") } - if err := cs.blockExec.ValidateBlock(cs.state, block); err != nil { + if err := cs.blockExec.ValidateBlock(ctx, cs.state, block); err != nil { panic(fmt.Errorf("+2/3 committed an invalid block: %w", err)) } diff --git a/internal/eventbus/event_bus.go b/internal/eventbus/event_bus.go index 5f9ddb39f..2a7c032b3 100644 --- a/internal/eventbus/event_bus.go +++ b/internal/eventbus/event_bus.go @@ -198,6 +198,10 @@ func (b *EventBus) PublishEventValidatorSetUpdates(ctx context.Context, data typ return b.Publish(ctx, types.EventValidatorSetUpdatesValue, data) } +func (b *EventBus) PublishEventEvidenceValidated(ctx context.Context, evidence types.EventDataEvidenceValidated) error { + return b.Publish(ctx, types.EventEvidenceValidatedValue, evidence) +} + //----------------------------------------------------------------------------- // NopEventBus implements a types.BlockEventPublisher that discards all events. diff --git a/internal/eventbus/event_bus_test.go b/internal/eventbus/event_bus_test.go index 6e8c4e288..bc816aaca 100644 --- a/internal/eventbus/event_bus_test.go +++ b/internal/eventbus/event_bus_test.go @@ -293,6 +293,48 @@ func TestEventBusPublishEventNewBlockHeader(t *testing.T) { } } +func TestEventBusPublishEventEvidenceValidated(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + eventBus := eventbus.NewDefault(log.TestingLogger()) + err := eventBus.Start(ctx) + require.NoError(t, err) + + ev, err := types.NewMockDuplicateVoteEvidence(ctx, 1, time.Now(), "test-chain-id") + require.NoError(t, err) + + const query = `tm.event='EvidenceValidated'` + evSub, err := eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{ + ClientID: "test", + Query: tmquery.MustCompile(query), + }) + + require.NoError(t, err) + done := make(chan struct{}) + go func() { + defer close(done) + msg, err := evSub.Next(ctx) + assert.NoError(t, err) + + edt := msg.Data().(types.EventDataEvidenceValidated) + assert.Equal(t, ev, edt.Evidence) + assert.Equal(t, int64(1), edt.Height) + }() + + err = eventBus.PublishEventEvidenceValidated(ctx, types.EventDataEvidenceValidated{ + Evidence: ev, + Height: int64(1), + }) + assert.NoError(t, err) + + select { + case <-done: + case <-time.After(1 * time.Second): + t.Fatal("did not receive a block header after 1 sec.") + } + +} func TestEventBusPublishEventNewEvidence(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/internal/evidence/metrics.go b/internal/evidence/metrics.go new file mode 100644 index 000000000..59efc23f9 --- /dev/null +++ b/internal/evidence/metrics.go @@ -0,0 +1,47 @@ +package evidence + +import ( + "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/metrics/discard" + "github.com/go-kit/kit/metrics/prometheus" + stdprometheus "github.com/prometheus/client_golang/prometheus" +) + +const ( + // MetricsSubsystem is a subsystem shared by all metrics exposed by this + // package. + MetricsSubsystem = "evidence_pool" +) + +// Metrics contains metrics exposed by this package. +// see MetricsProvider for descriptions. +type Metrics struct { + // Number of evidence in the evidence pool + NumEvidence metrics.Gauge +} + +// PrometheusMetrics returns Metrics build using Prometheus client library. +// Optionally, labels can be provided along with their values ("foo", +// "fooValue"). +func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { + labels := []string{} + for i := 0; i < len(labelsAndValues); i += 2 { + labels = append(labels, labelsAndValues[i]) + } + return &Metrics{ + + NumEvidence: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "num_evidence", + Help: "Number of pending evidence in evidence pool.", + }, labels).With(labelsAndValues...), + } +} + +// NopMetrics returns no-op Metrics. +func NopMetrics() *Metrics { + return &Metrics{ + NumEvidence: discard.NewGauge(), + } +} diff --git a/internal/evidence/pool.go b/internal/evidence/pool.go index a82c714ff..f4afb1f8c 100644 --- a/internal/evidence/pool.go +++ b/internal/evidence/pool.go @@ -2,6 +2,7 @@ package evidence import ( "bytes" + "context" "errors" "fmt" "sync" @@ -13,6 +14,7 @@ import ( "github.com/google/orderedcode" dbm "github.com/tendermint/tm-db" + "github.com/tendermint/tendermint/internal/eventbus" clist "github.com/tendermint/tendermint/internal/libs/clist" sm "github.com/tendermint/tendermint/internal/state" "github.com/tendermint/tendermint/libs/log" @@ -49,11 +51,22 @@ type Pool struct { pruningHeight int64 pruningTime time.Time + + // Eventbus to emit events when evidence is validated + // Not part of the constructor, use SetEventBus to set it + // The eventBus must be started in order for event publishing not to block + eventBus *eventbus.EventBus + + Metrics *Metrics +} + +func (evpool *Pool) SetEventBus(e *eventbus.EventBus) { + evpool.eventBus = e } // NewPool creates an evidence pool. If using an existing evidence store, // it will add all pending evidence to the concurrent list. -func NewPool(logger log.Logger, evidenceDB dbm.DB, stateDB sm.Store, blockStore BlockStore) (*Pool, error) { +func NewPool(logger log.Logger, evidenceDB dbm.DB, stateDB sm.Store, blockStore BlockStore, metrics *Metrics) (*Pool, error) { state, err := stateDB.Load() if err != nil { return nil, fmt.Errorf("failed to load state: %w", err) @@ -67,6 +80,7 @@ func NewPool(logger log.Logger, evidenceDB dbm.DB, stateDB sm.Store, blockStore evidenceStore: evidenceDB, evidenceList: clist.New(), consensusBuffer: make([]duplicateVoteSet, 0), + Metrics: metrics, } // If pending evidence already in db, in event of prior failure, then check @@ -78,10 +92,12 @@ func NewPool(logger log.Logger, evidenceDB dbm.DB, stateDB sm.Store, blockStore } atomic.StoreUint32(&pool.evidenceSize, uint32(len(evList))) + pool.Metrics.NumEvidence.Set(float64(pool.evidenceSize)) for _, ev := range evList { pool.evidenceList.PushBack(ev) } + pool.eventBus = nil return pool, nil } @@ -108,7 +124,7 @@ func (evpool *Pool) PendingEvidence(maxBytes int64) ([]types.Evidence, int64) { // 2. Update the pool's state which contains evidence params relating to expiry. // 3. Moves pending evidence that has now been committed into the committed pool. // 4. Removes any expired evidence based on both height and time. -func (evpool *Pool) Update(state sm.State, ev types.EvidenceList) { +func (evpool *Pool) Update(ctx context.Context, state sm.State, ev types.EvidenceList) { // sanity check if state.LastBlockHeight <= evpool.state.LastBlockHeight { panic(fmt.Sprintf( @@ -126,7 +142,7 @@ func (evpool *Pool) Update(state sm.State, ev types.EvidenceList) { // flush conflicting vote pairs from the buffer, producing DuplicateVoteEvidence and // adding it to the pool - evpool.processConsensusBuffer(state) + evpool.processConsensusBuffer(ctx, state) // update state evpool.updateState(state) @@ -142,7 +158,7 @@ func (evpool *Pool) Update(state sm.State, ev types.EvidenceList) { } // AddEvidence checks the evidence is valid and adds it to the pool. -func (evpool *Pool) AddEvidence(ev types.Evidence) error { +func (evpool *Pool) AddEvidence(ctx context.Context, ev types.Evidence) error { evpool.logger.Debug("attempting to add evidence", "evidence", ev) // We have already verified this piece of evidence - no need to do it again @@ -160,12 +176,12 @@ func (evpool *Pool) AddEvidence(ev types.Evidence) error { } // 1) Verify against state. - if err := evpool.verify(ev); err != nil { + if err := evpool.verify(ctx, ev); err != nil { return err } // 2) Save to store. - if err := evpool.addPendingEvidence(ev); err != nil { + if err := evpool.addPendingEvidence(ctx, ev); err != nil { return fmt.Errorf("failed to add evidence to pending list: %w", err) } @@ -198,7 +214,7 @@ func (evpool *Pool) ReportConflictingVotes(voteA, voteB *types.Vote) { // If it has already verified the evidence then it jumps to the next one. It ensures that no // evidence has already been committed or is being proposed twice. It also adds any // evidence that it doesn't currently have so that it can quickly form ABCI Evidence later. -func (evpool *Pool) CheckEvidence(evList types.EvidenceList) error { +func (evpool *Pool) CheckEvidence(ctx context.Context, evList types.EvidenceList) error { hashes := make([][]byte, len(evList)) for idx, ev := range evList { @@ -212,12 +228,12 @@ func (evpool *Pool) CheckEvidence(evList types.EvidenceList) error { return &types.ErrInvalidEvidence{Evidence: ev, Reason: errors.New("evidence was already committed")} } - err := evpool.verify(ev) + err := evpool.verify(ctx, ev) if err != nil { return err } - if err := evpool.addPendingEvidence(ev); err != nil { + if err := evpool.addPendingEvidence(ctx, ev); err != nil { // Something went wrong with adding the evidence but we already know it is valid // hence we log an error and continue evpool.logger.Error("failed to add evidence to pending list", "err", err, "evidence", ev) @@ -297,7 +313,7 @@ func (evpool *Pool) isPending(evidence types.Evidence) bool { return ok } -func (evpool *Pool) addPendingEvidence(ev types.Evidence) error { +func (evpool *Pool) addPendingEvidence(ctx context.Context, ev types.Evidence) error { evpb, err := types.EvidenceToProto(ev) if err != nil { return fmt.Errorf("failed to convert to proto: %w", err) @@ -316,7 +332,18 @@ func (evpool *Pool) addPendingEvidence(ev types.Evidence) error { } atomic.AddUint32(&evpool.evidenceSize, 1) - return nil + evpool.Metrics.NumEvidence.Set(float64(evpool.evidenceSize)) + + // This should normally never be true + if evpool.eventBus == nil { + evpool.logger.Debug("event bus is not configured") + return nil + + } + return evpool.eventBus.PublishEventEvidenceValidated(ctx, types.EventDataEvidenceValidated{ + Evidence: ev, + Height: ev.Height(), + }) } // markEvidenceAsCommitted processes all the evidence in the block, marking it as @@ -368,6 +395,7 @@ func (evpool *Pool) markEvidenceAsCommitted(evidence types.EvidenceList, height // update the evidence size atomic.AddUint32(&evpool.evidenceSize, ^uint32(len(blockEvidenceMap)-1)) + evpool.Metrics.NumEvidence.Set(float64(evpool.evidenceSize)) } // listEvidence retrieves lists evidence from oldest to newest within maxBytes. @@ -513,7 +541,7 @@ func (evpool *Pool) updateState(state sm.State) { // into DuplicateVoteEvidence. It sets the evidence timestamp to the block height // from the most recently committed block. // Evidence is then added to the pool so as to be ready to be broadcasted and proposed. -func (evpool *Pool) processConsensusBuffer(state sm.State) { +func (evpool *Pool) processConsensusBuffer(ctx context.Context, state sm.State) { evpool.mtx.Lock() defer evpool.mtx.Unlock() for _, voteSet := range evpool.consensusBuffer { @@ -578,7 +606,7 @@ func (evpool *Pool) processConsensusBuffer(state sm.State) { continue } - if err := evpool.addPendingEvidence(dve); err != nil { + if err := evpool.addPendingEvidence(ctx, dve); err != nil { evpool.logger.Error("failed to flush evidence from consensus buffer to pending list: %w", err) continue } diff --git a/internal/evidence/pool_test.go b/internal/evidence/pool_test.go index 9f0c99435..51f785221 100644 --- a/internal/evidence/pool_test.go +++ b/internal/evidence/pool_test.go @@ -11,6 +11,7 @@ import ( dbm "github.com/tendermint/tm-db" + "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/evidence" "github.com/tendermint/tendermint/internal/evidence/mocks" sm "github.com/tendermint/tendermint/internal/state" @@ -21,6 +22,9 @@ import ( "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/version" + + tmpubsub "github.com/tendermint/tendermint/internal/pubsub" + tmquery "github.com/tendermint/tendermint/internal/pubsub/query" ) const evidenceChainID = "test_chain" @@ -40,7 +44,6 @@ func TestEvidencePoolBasic(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - valSet, privVals := factory.ValidatorSet(ctx, t, 1, 10) blockStore.On("LoadBlockMeta", mock.AnythingOfType("int64")).Return( &types.BlockMeta{Header: types.Header{Time: defaultEvidenceTime}}, @@ -48,9 +51,9 @@ func TestEvidencePoolBasic(t *testing.T) { stateStore.On("LoadValidators", mock.AnythingOfType("int64")).Return(valSet, nil) stateStore.On("Load").Return(createState(height+1, valSet), nil) - pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore) + pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics()) require.NoError(t, err) - + require.NoError(t, setupEventBus(ctx, pool)) // evidence not seen yet: evs, size := pool.PendingEvidence(defaultEvidenceMaxBytes) require.Equal(t, 0, len(evs)) @@ -66,7 +69,8 @@ func TestEvidencePoolBasic(t *testing.T) { }() // evidence seen but not yet committed: - require.NoError(t, pool.AddEvidence(ev)) + err = pool.AddEvidence(ctx, ev) + require.NoError(t, err) select { case <-evAdded: @@ -83,7 +87,8 @@ func TestEvidencePoolBasic(t *testing.T) { require.Equal(t, evidenceBytes, size) // check that the size of the single evidence in bytes is correct // shouldn't be able to add evidence twice - require.NoError(t, pool.AddEvidence(ev)) + err = pool.AddEvidence(ctx, ev) + require.NoError(t, err) evs, _ = pool.PendingEvidence(defaultEvidenceMaxBytes) require.Equal(t, 1, len(evs)) } @@ -110,9 +115,11 @@ func TestAddExpiredEvidence(t *testing.T) { return &types.BlockMeta{Header: types.Header{Time: expiredEvidenceTime}} }) - pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore) + pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics()) require.NoError(t, err) + require.NoError(t, setupEventBus(ctx, pool)) + testCases := []struct { evHeight int64 evTime time.Time @@ -136,7 +143,7 @@ func TestAddExpiredEvidence(t *testing.T) { ev, err := types.NewMockDuplicateVoteEvidenceWithValidator(ctx, tc.evHeight, tc.evTime, val, evidenceChainID) require.NoError(t, err) - err = pool.AddEvidence(ev) + err = pool.AddEvidence(ctx, ev) if tc.expErr { require.Error(t, err) } else { @@ -153,6 +160,9 @@ func TestReportConflictingVotes(t *testing.T) { defer cancel() pool, pv := defaultTestPool(ctx, t, height) + + require.NoError(t, setupEventBus(ctx, pool)) + val := types.NewValidator(pv.PrivKey.PubKey(), 10) ev, err := types.NewMockDuplicateVoteEvidenceWithValidator(ctx, height+1, defaultEvidenceTime, pv, evidenceChainID) @@ -176,7 +186,7 @@ func TestReportConflictingVotes(t *testing.T) { state.LastBlockHeight++ state.LastBlockTime = ev.Time() state.LastValidators = types.NewValidatorSet([]*types.Validator{val}) - pool.Update(state, []types.Evidence{}) + pool.Update(ctx, state, []types.Evidence{}) // should be able to retrieve evidence from pool evList, _ = pool.PendingEvidence(defaultEvidenceMaxBytes) @@ -192,6 +202,9 @@ func TestEvidencePoolUpdate(t *testing.T) { defer cancel() pool, val := defaultTestPool(ctx, t, height) + + require.NoError(t, setupEventBus(ctx, pool)) + state := pool.State() // create two lots of old evidence that we expect to be pruned when we update @@ -211,8 +224,8 @@ func TestEvidencePoolUpdate(t *testing.T) { ) require.NoError(t, err) - require.NoError(t, pool.AddEvidence(prunedEv)) - require.NoError(t, pool.AddEvidence(notPrunedEv)) + require.NoError(t, pool.AddEvidence(ctx, prunedEv)) + require.NoError(t, pool.AddEvidence(ctx, notPrunedEv)) ev, err := types.NewMockDuplicateVoteEvidenceWithValidator( ctx, @@ -234,21 +247,21 @@ func TestEvidencePoolUpdate(t *testing.T) { require.Equal(t, uint32(2), pool.Size()) - require.NoError(t, pool.CheckEvidence(types.EvidenceList{ev})) + require.NoError(t, pool.CheckEvidence(ctx, types.EvidenceList{ev})) evList, _ = pool.PendingEvidence(3 * defaultEvidenceMaxBytes) require.Equal(t, 3, len(evList)) require.Equal(t, uint32(3), pool.Size()) - pool.Update(state, block.Evidence) + pool.Update(ctx, state, block.Evidence) // a) Update marks evidence as committed so pending evidence should be empty evList, _ = pool.PendingEvidence(defaultEvidenceMaxBytes) require.Equal(t, []types.Evidence{notPrunedEv}, evList) // b) If we try to check this evidence again it should fail because it has already been committed - err = pool.CheckEvidence(types.EvidenceList{ev}) + err = pool.CheckEvidence(ctx, types.EvidenceList{ev}) if assert.Error(t, err) { assert.Equal(t, "evidence was already committed", err.(*types.ErrInvalidEvidence).Reason.Error()) } @@ -261,6 +274,9 @@ func TestVerifyPendingEvidencePasses(t *testing.T) { defer cancel() pool, val := defaultTestPool(ctx, t, height) + + require.NoError(t, setupEventBus(ctx, pool)) + ev, err := types.NewMockDuplicateVoteEvidenceWithValidator( ctx, height, @@ -269,8 +285,8 @@ func TestVerifyPendingEvidencePasses(t *testing.T) { evidenceChainID, ) require.NoError(t, err) - require.NoError(t, pool.AddEvidence(ev)) - require.NoError(t, pool.CheckEvidence(types.EvidenceList{ev})) + require.NoError(t, pool.AddEvidence(ctx, ev)) + require.NoError(t, pool.CheckEvidence(ctx, types.EvidenceList{ev})) } func TestVerifyDuplicatedEvidenceFails(t *testing.T) { @@ -281,6 +297,8 @@ func TestVerifyDuplicatedEvidenceFails(t *testing.T) { pool, val := defaultTestPool(ctx, t, height) + require.NoError(t, setupEventBus(ctx, pool)) + ev, err := types.NewMockDuplicateVoteEvidenceWithValidator( ctx, height, @@ -290,12 +308,62 @@ func TestVerifyDuplicatedEvidenceFails(t *testing.T) { ) require.NoError(t, err) - err = pool.CheckEvidence(types.EvidenceList{ev, ev}) + err = pool.CheckEvidence(ctx, types.EvidenceList{ev, ev}) if assert.Error(t, err) { assert.Equal(t, "duplicate evidence", err.(*types.ErrInvalidEvidence).Reason.Error()) } } +// Check that we generate events when evidence is added into the evidence pool +func TestEventOnEvidenceValidated(t *testing.T) { + const height = 1 + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + pool, val := defaultTestPool(ctx, t, height) + + ev, err := types.NewMockDuplicateVoteEvidenceWithValidator( + ctx, + height, + defaultEvidenceTime.Add(1*time.Minute), + val, + evidenceChainID, + ) + require.NoError(t, err) + + eventBus := eventbus.NewDefault(log.TestingLogger()) + require.NoError(t, eventBus.Start(ctx)) + + pool.SetEventBus(eventBus) + + const query = `tm.event='EvidenceValidated'` + evSub, err := eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{ + ClientID: "test", + Query: tmquery.MustCompile(query), + }) + require.NoError(t, err) + + done := make(chan struct{}) + go func() { + defer close(done) + msg, err := evSub.Next(ctx) + assert.NoError(t, err) + + edt := msg.Data().(types.EventDataEvidenceValidated) + assert.Equal(t, ev, edt.Evidence) + }() + err = pool.AddEvidence(ctx, ev) + require.NoError(t, err) + + select { + case <-done: + case <-time.After(1 * time.Second): + t.Fatal("did not receive a block header after 1 sec.") + } + +} + // check that valid light client evidence is correctly validated and stored in // evidence pool func TestLightClientAttackEvidenceLifecycle(t *testing.T) { @@ -326,32 +394,38 @@ func TestLightClientAttackEvidenceLifecycle(t *testing.T) { blockStore.On("LoadBlockCommit", height).Return(trusted.Commit) blockStore.On("LoadBlockCommit", commonHeight).Return(common.Commit) - pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore) + pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) require.NoError(t, err) + require.NoError(t, setupEventBus(ctx, pool)) + hash := ev.Hash() - require.NoError(t, pool.AddEvidence(ev)) - require.NoError(t, pool.AddEvidence(ev)) + err = pool.AddEvidence(ctx, ev) + require.NoError(t, err) + err = pool.AddEvidence(ctx, ev) + require.NoError(t, err) pendingEv, _ := pool.PendingEvidence(state.ConsensusParams.Evidence.MaxBytes) require.Equal(t, 1, len(pendingEv)) require.Equal(t, ev, pendingEv[0]) - require.NoError(t, pool.CheckEvidence(pendingEv)) + require.NoError(t, pool.CheckEvidence(ctx, pendingEv)) require.Equal(t, ev, pendingEv[0]) state.LastBlockHeight++ state.LastBlockTime = state.LastBlockTime.Add(1 * time.Minute) - pool.Update(state, pendingEv) + pool.Update(ctx, state, pendingEv) require.Equal(t, hash, pendingEv[0].Hash()) remaindingEv, _ := pool.PendingEvidence(state.ConsensusParams.Evidence.MaxBytes) require.Empty(t, remaindingEv) // evidence is already committed so it shouldn't pass - require.Error(t, pool.CheckEvidence(types.EvidenceList{ev})) - require.NoError(t, pool.AddEvidence(ev)) + require.Error(t, pool.CheckEvidence(ctx, types.EvidenceList{ev})) + + err = pool.AddEvidence(ctx, ev) + require.NoError(t, err) remaindingEv, _ = pool.PendingEvidence(state.ConsensusParams.Evidence.MaxBytes) require.Empty(t, remaindingEv) @@ -376,9 +450,11 @@ func TestRecoverPendingEvidence(t *testing.T) { require.NoError(t, err) // create previous pool and populate it - pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore) + pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics()) require.NoError(t, err) + require.NoError(t, setupEventBus(ctx, pool)) + goodEvidence, err := types.NewMockDuplicateVoteEvidenceWithValidator( ctx, height, @@ -396,8 +472,10 @@ func TestRecoverPendingEvidence(t *testing.T) { ) require.NoError(t, err) - require.NoError(t, pool.AddEvidence(goodEvidence)) - require.NoError(t, pool.AddEvidence(expiredEvidence)) + err = pool.AddEvidence(ctx, goodEvidence) + require.NoError(t, err) + err = pool.AddEvidence(ctx, expiredEvidence) + require.NoError(t, err) // now recover from the previous pool at a different time newStateStore := &smmocks.Store{} @@ -417,7 +495,7 @@ func TestRecoverPendingEvidence(t *testing.T) { }, }, nil) - newPool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, newStateStore, blockStore) + newPool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, newStateStore, blockStore, evidence.NopMetrics()) require.NoError(t, err) evList, _ := newPool.PendingEvidence(defaultEvidenceMaxBytes) @@ -523,7 +601,7 @@ func defaultTestPool(ctx context.Context, t *testing.T, height int64) (*evidence blockStore, err := initializeBlockStore(dbm.NewMemDB(), state, valAddress) require.NoError(t, err) - pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore) + pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics()) require.NoError(t, err, "test evidence pool could not be created") return pool, val @@ -538,3 +616,12 @@ func createState(height int64, valSet *types.ValidatorSet) sm.State { ConsensusParams: *types.DefaultConsensusParams(), } } + +func setupEventBus(ctx context.Context, evpool *evidence.Pool) error { + eventBus := eventbus.NewDefault(log.TestingLogger()) + if err := eventBus.Start(ctx); err != nil { + return err + } + evpool.SetEventBus(eventBus) + return nil +} diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go index 62479874f..1011f732f 100644 --- a/internal/evidence/reactor.go +++ b/internal/evidence/reactor.go @@ -50,7 +50,8 @@ type Reactor struct { evidenceCh *p2p.Channel peerUpdates *p2p.PeerUpdates - mtx sync.Mutex + mtx sync.Mutex + peerRoutines map[types.NodeID]context.CancelFunc } @@ -78,6 +79,7 @@ func NewReactor( } r.BaseService = *service.NewBaseService(logger, "Evidence", r) + return r, err } @@ -103,7 +105,7 @@ func (r *Reactor) OnStop() { // It returns an error only if the Envelope.Message is unknown for this channel // or if the given evidence is invalid. This should never be called outside of // handleMessage. -func (r *Reactor) handleEvidenceMessage(envelope *p2p.Envelope) error { +func (r *Reactor) handleEvidenceMessage(ctx context.Context, envelope *p2p.Envelope) error { logger := r.logger.With("peer", envelope.From) switch msg := envelope.Message.(type) { @@ -115,12 +117,13 @@ func (r *Reactor) handleEvidenceMessage(envelope *p2p.Envelope) error { logger.Error("failed to convert evidence", "err", err) return err } - if err := r.evpool.AddEvidence(ev); err != nil { + if err := r.evpool.AddEvidence(ctx, ev); err != nil { // If we're given invalid evidence by the peer, notify the router that // we should remove this peer by returning an error. if _, ok := err.(*types.ErrInvalidEvidence); ok { return err } + } default: @@ -133,7 +136,7 @@ func (r *Reactor) handleEvidenceMessage(envelope *p2p.Envelope) error { // handleMessage handles an Envelope sent from a peer on a specific p2p Channel. // It will handle errors and any possible panics gracefully. A caller can handle // any error returned by sending a PeerError on the respective channel. -func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope *p2p.Envelope) (err error) { +func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) { defer func() { if e := recover(); e != nil { err = fmt.Errorf("panic in processing message: %v", e) @@ -149,7 +152,7 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope *p2p.Envelope) (err switch chID { case EvidenceChannel: - err = r.handleEvidenceMessage(envelope) + err = r.handleEvidenceMessage(ctx, envelope) default: err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope) @@ -164,7 +167,7 @@ func (r *Reactor) processEvidenceCh(ctx context.Context) { iter := r.evidenceCh.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() - if err := r.handleMessage(r.evidenceCh.ID, envelope); err != nil { + if err := r.handleMessage(ctx, r.evidenceCh.ID, envelope); err != nil { r.logger.Error("failed to process message", "ch_id", r.evidenceCh.ID, "envelope", envelope, "err", err) if serr := r.evidenceCh.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, diff --git a/internal/evidence/reactor_test.go b/internal/evidence/reactor_test.go index 7d9c812c6..d0863acc1 100644 --- a/internal/evidence/reactor_test.go +++ b/internal/evidence/reactor_test.go @@ -17,6 +17,7 @@ import ( "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/tmhash" + "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/evidence" "github.com/tendermint/tendermint/internal/evidence/mocks" "github.com/tendermint/tendermint/internal/p2p" @@ -69,6 +70,7 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store, chBuf uint idx := 0 evidenceTime := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC) + for nodeID := range rts.network.Nodes { logger := rts.logger.With("validator", idx) evidenceDB := dbm.NewMemDB() @@ -80,9 +82,13 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store, chBuf uint } return nil }) - rts.pools[nodeID], err = evidence.NewPool(logger, evidenceDB, stateStores[idx], blockStore) + rts.pools[nodeID], err = evidence.NewPool(logger, evidenceDB, stateStores[idx], blockStore, evidence.NopMetrics()) require.NoError(t, err) + eventBus := eventbus.NewDefault(logger) + err = eventBus.Start(ctx) + require.NoError(t, err) + rts.pools[nodeID].SetEventBus(eventBus) rts.peerChans[nodeID] = make(chan p2p.PeerUpdate) rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1) @@ -219,7 +225,8 @@ func createEvidenceList( evidenceChainID, ) require.NoError(t, err) - require.NoError(t, pool.AddEvidence(ev), + err = pool.AddEvidence(ctx, ev) + require.NoError(t, err, "adding evidence it#%d of %d to pool with height %d", i, numEvidence, pool.State().LastBlockHeight) evList[i] = ev @@ -284,12 +291,14 @@ func TestReactorBroadcastEvidence(t *testing.T) { } rts := setup(ctx, t, stateDBs, 0) + rts.start(ctx, t) // Create a series of fixtures where each suite contains a reactor and // evidence pool. In addition, we mark a primary suite and the rest are // secondaries where each secondary is added as a peer via a PeerUpdate to the // primary. As a result, the primary will gossip all evidence to each secondary. + primary := rts.network.RandomNode() secondaries := make([]*p2ptest.Node, 0, len(rts.network.NodeIDs())-1) secondaryIDs := make([]types.NodeID, 0, cap(secondaries)) @@ -320,6 +329,7 @@ func TestReactorBroadcastEvidence(t *testing.T) { for _, pool := range rts.pools { require.Equal(t, numEvidence, int(pool.Size())) } + } // TestReactorSelectiveBroadcast tests a context where we have two reactors @@ -381,7 +391,8 @@ func TestReactorBroadcastEvidence_Pending(t *testing.T) { // Manually add half the evidence to the secondary which will mark them as // pending. for i := 0; i < numEvidence/2; i++ { - require.NoError(t, rts.pools[secondary.NodeID].AddEvidence(evList[i])) + err := rts.pools[secondary.NodeID].AddEvidence(ctx, evList[i]) + require.NoError(t, err) } // the secondary should have half the evidence as pending @@ -423,7 +434,8 @@ func TestReactorBroadcastEvidence_Committed(t *testing.T) { // Manually add half the evidence to the secondary which will mark them as // pending. for i := 0; i < numEvidence/2; i++ { - require.NoError(t, rts.pools[secondary.NodeID].AddEvidence(evList[i])) + err := rts.pools[secondary.NodeID].AddEvidence(ctx, evList[i]) + require.NoError(t, err) } // the secondary should have half the evidence as pending @@ -434,7 +446,7 @@ func TestReactorBroadcastEvidence_Committed(t *testing.T) { // update the secondary's pool such that all pending evidence is committed state.LastBlockHeight++ - rts.pools[secondary.NodeID].Update(state, evList[:numEvidence/2]) + rts.pools[secondary.NodeID].Update(ctx, state, evList[:numEvidence/2]) // the secondary should have half the evidence as committed require.Equal(t, 0, int(rts.pools[secondary.NodeID].Size())) @@ -496,7 +508,7 @@ func TestReactorBroadcastEvidence_FullyConnected(t *testing.T) { // commit state so we do not continue to repeat gossiping the same evidence state := pool.State() state.LastBlockHeight++ - pool.Update(state, evList) + pool.Update(ctx, state, evList) } } diff --git a/internal/evidence/verify.go b/internal/evidence/verify.go index 99c8e28e8..d5e1a8faf 100644 --- a/internal/evidence/verify.go +++ b/internal/evidence/verify.go @@ -2,6 +2,7 @@ package evidence import ( "bytes" + "context" "errors" "fmt" "time" @@ -21,7 +22,7 @@ import ( // set for. In these cases, we do not return a ErrInvalidEvidence as not to have // the sending peer disconnect. All other errors are treated as invalid evidence // (i.e. ErrInvalidEvidence). -func (evpool *Pool) verify(evidence types.Evidence) error { +func (evpool *Pool) verify(ctx context.Context, evidence types.Evidence) error { var ( state = evpool.State() height = state.LastBlockHeight @@ -74,7 +75,7 @@ func (evpool *Pool) verify(evidence types.Evidence) error { if err := ev.ValidateABCI(val, valSet, evTime); err != nil { ev.GenerateABCI(val, valSet, evTime) - if addErr := evpool.addPendingEvidence(ev); addErr != nil { + if addErr := evpool.addPendingEvidence(ctx, ev); addErr != nil { evpool.logger.Error("adding pending duplicate vote evidence failed", "err", addErr) } return err @@ -134,7 +135,7 @@ func (evpool *Pool) verify(evidence types.Evidence) error { // evidence and return an error if err := ev.ValidateABCI(commonVals, trustedHeader, evTime); err != nil { ev.GenerateABCI(commonVals, trustedHeader, evTime) - if addErr := evpool.addPendingEvidence(ev); addErr != nil { + if addErr := evpool.addPendingEvidence(ctx, ev); addErr != nil { evpool.logger.Error("adding pending light client attack evidence failed", "err", addErr) } return err diff --git a/internal/evidence/verify_test.go b/internal/evidence/verify_test.go index bcc008aee..607c8fd50 100644 --- a/internal/evidence/verify_test.go +++ b/internal/evidence/verify_test.go @@ -96,12 +96,12 @@ func TestVerify_LunaticAttackAgainstState(t *testing.T) { blockStore.On("LoadBlockMeta", height).Return(&types.BlockMeta{Header: *trusted.Header}) blockStore.On("LoadBlockCommit", commonHeight).Return(common.Commit) blockStore.On("LoadBlockCommit", height).Return(trusted.Commit) - pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore) + pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) require.NoError(t, err) evList := types.EvidenceList{ev} // check that the evidence pool correctly verifies the evidence - assert.NoError(t, pool.CheckEvidence(evList)) + assert.NoError(t, pool.CheckEvidence(ctx, evList)) // as it was not originally in the pending bucket, it should now have been added pendingEvs, _ := pool.PendingEvidence(state.ConsensusParams.Evidence.MaxBytes) @@ -112,28 +112,33 @@ func TestVerify_LunaticAttackAgainstState(t *testing.T) { // should return an error ev.ByzantineValidators = ev.ByzantineValidators[:1] t.Log(evList) - assert.Error(t, pool.CheckEvidence(evList)) + assert.Error(t, pool.CheckEvidence(ctx, evList)) // restore original byz vals ev.ByzantineValidators = ev.GetByzantineValidators(common.ValidatorSet, trusted.SignedHeader) // duplicate evidence should be rejected evList = types.EvidenceList{ev, ev} - pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore) + pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) require.NoError(t, err) - assert.Error(t, pool.CheckEvidence(evList)) + assert.Error(t, pool.CheckEvidence(ctx, evList)) // If evidence is submitted with an altered timestamp it should return an error ev.Timestamp = defaultEvidenceTime.Add(1 * time.Minute) - pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore) + pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) require.NoError(t, err) - assert.Error(t, pool.AddEvidence(ev)) + + require.NoError(t, setupEventBus(ctx, pool)) + + err = pool.AddEvidence(ctx, ev) + assert.Error(t, err) ev.Timestamp = defaultEvidenceTime // Evidence submitted with a different validator power should fail ev.TotalVotingPower = 1 - pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore) + pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) require.NoError(t, err) - assert.Error(t, pool.AddEvidence(ev)) + err = pool.AddEvidence(ctx, ev) + assert.Error(t, err) ev.TotalVotingPower = common.ValidatorSet.TotalVotingPower() } @@ -174,11 +179,13 @@ func TestVerify_ForwardLunaticAttack(t *testing.T) { blockStore.On("LoadBlockCommit", commonHeight).Return(common.Commit) blockStore.On("LoadBlockCommit", nodeHeight).Return(trusted.Commit) blockStore.On("Height").Return(nodeHeight) - pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore) + pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) require.NoError(t, err) + require.NoError(t, setupEventBus(ctx, pool)) + // check that the evidence pool correctly verifies the evidence - assert.NoError(t, pool.CheckEvidence(types.EvidenceList{ev})) + assert.NoError(t, pool.CheckEvidence(ctx, types.EvidenceList{ev})) // now we use a time which isn't able to contradict the FLA - thus we can't verify the evidence oldBlockStore := &mocks.BlockStore{} @@ -192,9 +199,9 @@ func TestVerify_ForwardLunaticAttack(t *testing.T) { oldBlockStore.On("Height").Return(nodeHeight) require.Equal(t, defaultEvidenceTime, oldBlockStore.LoadBlockMeta(nodeHeight).Header.Time) - pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, oldBlockStore) + pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, oldBlockStore, evidence.NopMetrics()) require.NoError(t, err) - assert.Error(t, pool.CheckEvidence(types.EvidenceList{ev})) + assert.Error(t, pool.CheckEvidence(ctx, types.EvidenceList{ev})) } func TestVerifyLightClientAttack_Equivocation(t *testing.T) { @@ -282,11 +289,13 @@ func TestVerifyLightClientAttack_Equivocation(t *testing.T) { blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: *trustedHeader}) blockStore.On("LoadBlockCommit", int64(10)).Return(trustedCommit) - pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore) + pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) require.NoError(t, err) + require.NoError(t, setupEventBus(ctx, pool)) + evList := types.EvidenceList{ev} - err = pool.CheckEvidence(evList) + err = pool.CheckEvidence(ctx, evList) assert.NoError(t, err) pendingEvs, _ := pool.PendingEvidence(state.ConsensusParams.Evidence.MaxBytes) @@ -369,11 +378,13 @@ func TestVerifyLightClientAttack_Amnesia(t *testing.T) { blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: *trustedHeader}) blockStore.On("LoadBlockCommit", int64(10)).Return(trustedCommit) - pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore) + pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) require.NoError(t, err) + require.NoError(t, setupEventBus(ctx, pool)) + evList := types.EvidenceList{ev} - err = pool.CheckEvidence(evList) + err = pool.CheckEvidence(ctx, evList) assert.NoError(t, err) pendingEvs, _ := pool.PendingEvidence(state.ConsensusParams.Evidence.MaxBytes) @@ -467,21 +478,23 @@ func TestVerifyDuplicateVoteEvidence(t *testing.T) { blockStore := &mocks.BlockStore{} blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: types.Header{Time: defaultEvidenceTime}}) - pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore) + pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics()) require.NoError(t, err) + require.NoError(t, setupEventBus(ctx, pool)) + evList := types.EvidenceList{goodEv} - err = pool.CheckEvidence(evList) + err = pool.CheckEvidence(ctx, evList) assert.NoError(t, err) // evidence with a different validator power should fail evList = types.EvidenceList{badEv} - err = pool.CheckEvidence(evList) + err = pool.CheckEvidence(ctx, evList) assert.Error(t, err) // evidence with a different timestamp should fail evList = types.EvidenceList{badTimeEv} - err = pool.CheckEvidence(evList) + err = pool.CheckEvidence(ctx, evList) assert.Error(t, err) } diff --git a/internal/rpc/core/evidence.go b/internal/rpc/core/evidence.go index e97024b4c..c7e2bea8a 100644 --- a/internal/rpc/core/evidence.go +++ b/internal/rpc/core/evidence.go @@ -19,7 +19,7 @@ func (env *Environment) BroadcastEvidence( if err := ev.Value.ValidateBasic(); err != nil { return nil, fmt.Errorf("evidence.ValidateBasic failed: %w", err) } - if err := env.EvidencePool.AddEvidence(ev.Value); err != nil { + if err := env.EvidencePool.AddEvidence(ctx, ev.Value); err != nil { return nil, fmt.Errorf("failed to add evidence: %w", err) } return &coretypes.ResultBroadcastEvidence{Hash: ev.Value.Hash()}, nil diff --git a/internal/state/execution.go b/internal/state/execution.go index 688a5470b..edda788d2 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -174,7 +174,7 @@ func (blockExec *BlockExecutor) ProcessProposal( // If the block is invalid, it returns an error. // Validation does not mutate state, but does require historical information from the stateDB, // ie. to verify evidence from a validator at an old height. -func (blockExec *BlockExecutor) ValidateBlock(state State, block *types.Block) error { +func (blockExec *BlockExecutor) ValidateBlock(ctx context.Context, state State, block *types.Block) error { hash := block.Hash() if _, ok := blockExec.cache[hash.String()]; ok { return nil @@ -185,7 +185,7 @@ func (blockExec *BlockExecutor) ValidateBlock(state State, block *types.Block) e return err } - err = blockExec.evpool.CheckEvidence(block.Evidence) + err = blockExec.evpool.CheckEvidence(ctx, block.Evidence) if err != nil { return err } @@ -208,7 +208,7 @@ func (blockExec *BlockExecutor) ApplyBlock( ) (State, error) { // validate the block if we haven't already - if err := blockExec.ValidateBlock(state, block); err != nil { + if err := blockExec.ValidateBlock(ctx, state, block); err != nil { return state, ErrInvalidBlock(err) } @@ -255,7 +255,7 @@ func (blockExec *BlockExecutor) ApplyBlock( } // Update evpool with the latest state. - blockExec.evpool.Update(state, block.Evidence) + blockExec.evpool.Update(ctx, state, block.Evidence) // Update the app hash and save the state. state.AppHash = appHash diff --git a/internal/state/execution_test.go b/internal/state/execution_test.go index f3088dbac..56351eba6 100644 --- a/internal/state/execution_test.go +++ b/internal/state/execution_test.go @@ -216,8 +216,8 @@ func TestBeginBlockByzantineValidators(t *testing.T) { evpool := &mocks.EvidencePool{} evpool.On("PendingEvidence", mock.AnythingOfType("int64")).Return(ev, int64(100)) - evpool.On("Update", mock.AnythingOfType("state.State"), mock.AnythingOfType("types.EvidenceList")).Return() - evpool.On("CheckEvidence", mock.AnythingOfType("types.EvidenceList")).Return(nil) + evpool.On("Update", ctx, mock.AnythingOfType("state.State"), mock.AnythingOfType("types.EvidenceList")).Return() + evpool.On("CheckEvidence", ctx, mock.AnythingOfType("types.EvidenceList")).Return(nil) blockStore := store.NewBlockStore(dbm.NewMemDB()) diff --git a/internal/state/helpers_test.go b/internal/state/helpers_test.go index ed69b275b..ca384ffb9 100644 --- a/internal/state/helpers_test.go +++ b/internal/state/helpers_test.go @@ -75,7 +75,7 @@ func makeAndApplyGoodBlock( block, _, err := state.MakeBlock(height, factory.MakeTenTxs(height), lastCommit, evidence, proposerAddr) require.NoError(t, err) - require.NoError(t, blockExec.ValidateBlock(state, block)) + require.NoError(t, blockExec.ValidateBlock(ctx, state, block)) blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: types.PartSetHeader{Total: 3, Hash: tmrand.Bytes(32)}} state, err = blockExec.ApplyBlock(ctx, state, blockID, block) diff --git a/internal/state/mocks/evidence_pool.go b/internal/state/mocks/evidence_pool.go index 8bf4a9b64..04e8be7bc 100644 --- a/internal/state/mocks/evidence_pool.go +++ b/internal/state/mocks/evidence_pool.go @@ -3,8 +3,11 @@ package mocks import ( + context "context" + mock "github.com/stretchr/testify/mock" state "github.com/tendermint/tendermint/internal/state" + types "github.com/tendermint/tendermint/types" ) @@ -13,13 +16,13 @@ type EvidencePool struct { mock.Mock } -// AddEvidence provides a mock function with given fields: _a0 -func (_m *EvidencePool) AddEvidence(_a0 types.Evidence) error { - ret := _m.Called(_a0) +// AddEvidence provides a mock function with given fields: _a0, _a1 +func (_m *EvidencePool) AddEvidence(_a0 context.Context, _a1 types.Evidence) error { + ret := _m.Called(_a0, _a1) var r0 error - if rf, ok := ret.Get(0).(func(types.Evidence) error); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(context.Context, types.Evidence) error); ok { + r0 = rf(_a0, _a1) } else { r0 = ret.Error(0) } @@ -27,13 +30,13 @@ func (_m *EvidencePool) AddEvidence(_a0 types.Evidence) error { return r0 } -// CheckEvidence provides a mock function with given fields: _a0 -func (_m *EvidencePool) CheckEvidence(_a0 types.EvidenceList) error { - ret := _m.Called(_a0) +// CheckEvidence provides a mock function with given fields: _a0, _a1 +func (_m *EvidencePool) CheckEvidence(_a0 context.Context, _a1 types.EvidenceList) error { + ret := _m.Called(_a0, _a1) var r0 error - if rf, ok := ret.Get(0).(func(types.EvidenceList) error); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(context.Context, types.EvidenceList) error); ok { + r0 = rf(_a0, _a1) } else { r0 = ret.Error(0) } @@ -64,7 +67,7 @@ func (_m *EvidencePool) PendingEvidence(maxBytes int64) ([]types.Evidence, int64 return r0, r1 } -// Update provides a mock function with given fields: _a0, _a1 -func (_m *EvidencePool) Update(_a0 state.State, _a1 types.EvidenceList) { - _m.Called(_a0, _a1) +// Update provides a mock function with given fields: _a0, _a1, _a2 +func (_m *EvidencePool) Update(_a0 context.Context, _a1 state.State, _a2 types.EvidenceList) { + _m.Called(_a0, _a1, _a2) } diff --git a/internal/state/services.go b/internal/state/services.go index 2c9d312fb..5d04d2c82 100644 --- a/internal/state/services.go +++ b/internal/state/services.go @@ -1,6 +1,8 @@ package state import ( + "context" + "github.com/tendermint/tendermint/types" ) @@ -44,9 +46,9 @@ type BlockStore interface { // EvidencePool defines the EvidencePool interface used by State. type EvidencePool interface { PendingEvidence(maxBytes int64) (ev []types.Evidence, size int64) - AddEvidence(types.Evidence) error - Update(State, types.EvidenceList) - CheckEvidence(types.EvidenceList) error + AddEvidence(context.Context, types.Evidence) error + Update(context.Context, State, types.EvidenceList) + CheckEvidence(context.Context, types.EvidenceList) error } // EmptyEvidencePool is an empty implementation of EvidencePool, useful for testing. It also complies @@ -56,7 +58,9 @@ type EmptyEvidencePool struct{} func (EmptyEvidencePool) PendingEvidence(maxBytes int64) (ev []types.Evidence, size int64) { return nil, 0 } -func (EmptyEvidencePool) AddEvidence(types.Evidence) error { return nil } -func (EmptyEvidencePool) Update(State, types.EvidenceList) {} -func (EmptyEvidencePool) CheckEvidence(evList types.EvidenceList) error { return nil } +func (EmptyEvidencePool) AddEvidence(context.Context, types.Evidence) error { return nil } +func (EmptyEvidencePool) Update(context.Context, State, types.EvidenceList) {} +func (EmptyEvidencePool) CheckEvidence(ctx context.Context, evList types.EvidenceList) error { + return nil +} func (EmptyEvidencePool) ReportConflictingVotes(voteA, voteB *types.Vote) {} diff --git a/internal/state/validation_test.go b/internal/state/validation_test.go index 3fb723a27..4d78fde74 100644 --- a/internal/state/validation_test.go +++ b/internal/state/validation_test.go @@ -94,7 +94,7 @@ func TestValidateBlockHeader(t *testing.T) { block, err := statefactory.MakeBlock(state, height, lastCommit) require.NoError(t, err) tc.malleateBlock(block) - err = blockExec.ValidateBlock(state, block) + err = blockExec.ValidateBlock(ctx, state, block) t.Logf("%s: %v", tc.name, err) require.Error(t, err, tc.name) } @@ -110,7 +110,7 @@ func TestValidateBlockHeader(t *testing.T) { block, err := statefactory.MakeBlock(state, nextHeight, lastCommit) require.NoError(t, err) state.InitialHeight = nextHeight + 1 - err = blockExec.ValidateBlock(state, block) + err = blockExec.ValidateBlock(ctx, state, block) require.Error(t, err, "expected an error when state is ahead of block") assert.Contains(t, err.Error(), "lower than initial height") } @@ -164,7 +164,7 @@ func TestValidateBlockCommit(t *testing.T) { ) block, err := statefactory.MakeBlock(state, height, wrongHeightCommit) require.NoError(t, err) - err = blockExec.ValidateBlock(state, block) + err = blockExec.ValidateBlock(ctx, state, block) _, isErrInvalidCommitHeight := err.(types.ErrInvalidCommitHeight) require.True(t, isErrInvalidCommitHeight, "expected ErrInvalidCommitHeight at height %d but got: %v", height, err) @@ -173,7 +173,7 @@ func TestValidateBlockCommit(t *testing.T) { */ block, err = statefactory.MakeBlock(state, height, wrongSigsCommit) require.NoError(t, err) - err = blockExec.ValidateBlock(state, block) + err = blockExec.ValidateBlock(ctx, state, block) _, isErrInvalidCommitSignatures := err.(types.ErrInvalidCommitSignatures) require.True(t, isErrInvalidCommitSignatures, "expected ErrInvalidCommitSignatures at height %d, but got: %v", @@ -254,8 +254,8 @@ func TestValidateBlockEvidence(t *testing.T) { defaultEvidenceTime := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC) evpool := &mocks.EvidencePool{} - evpool.On("CheckEvidence", mock.AnythingOfType("types.EvidenceList")).Return(nil) - evpool.On("Update", mock.AnythingOfType("state.State"), mock.AnythingOfType("types.EvidenceList")).Return() + evpool.On("CheckEvidence", ctx, mock.AnythingOfType("types.EvidenceList")).Return(nil) + evpool.On("Update", ctx, mock.AnythingOfType("state.State"), mock.AnythingOfType("types.EvidenceList")).Return() evpool.On("ABCIEvidence", mock.AnythingOfType("int64"), mock.AnythingOfType("[]types.Evidence")).Return( []abci.Evidence{}) @@ -290,7 +290,7 @@ func TestValidateBlockEvidence(t *testing.T) { block, _, err := state.MakeBlock(height, testfactory.MakeTenTxs(height), lastCommit, evidence, proposerAddr) require.NoError(t, err) - err = blockExec.ValidateBlock(state, block) + err = blockExec.ValidateBlock(ctx, state, block) if assert.Error(t, err) { _, ok := err.(*types.ErrEvidenceOverflow) require.True(t, ok, "expected error to be of type ErrEvidenceOverflow at height %d but got %v", height, err) diff --git a/node/node.go b/node/node.go index 275363c58..4c8534874 100644 --- a/node/node.go +++ b/node/node.go @@ -20,6 +20,7 @@ import ( "github.com/tendermint/tendermint/internal/blocksync" "github.com/tendermint/tendermint/internal/consensus" "github.com/tendermint/tendermint/internal/eventbus" + "github.com/tendermint/tendermint/internal/evidence" "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p/pex" @@ -264,7 +265,7 @@ func makeNode( } evReactor, evPool, err := createEvidenceReactor(ctx, - cfg, dbProvider, stateDB, blockStore, peerManager, router, logger, + cfg, dbProvider, stateDB, blockStore, peerManager, router, logger, nodeMetrics.evidence, eventBus, ) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) @@ -689,6 +690,7 @@ type nodeMetrics struct { proxy *proxy.Metrics state *sm.Metrics statesync *statesync.Metrics + evidence *evidence.Metrics } // metricsProvider returns consensus, p2p, mempool, state, statesync Metrics. @@ -707,6 +709,7 @@ func defaultMetricsProvider(cfg *config.InstrumentationConfig) metricsProvider { proxy: proxy.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), state: sm.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), statesync: statesync.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), + evidence: evidence.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), } } return &nodeMetrics{ @@ -717,6 +720,7 @@ func defaultMetricsProvider(cfg *config.InstrumentationConfig) metricsProvider { proxy: proxy.NopMetrics(), state: sm.NopMetrics(), statesync: statesync.NopMetrics(), + evidence: evidence.NopMetrics(), } } } diff --git a/node/node_test.go b/node/node_test.go index e42bf29d8..41cb1b6a9 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -298,7 +298,7 @@ func TestCreateProposalBlock(t *testing.T) { // Make EvidencePool evidenceDB := dbm.NewMemDB() blockStore := store.NewBlockStore(dbm.NewMemDB()) - evidencePool, err := evidence.NewPool(logger, evidenceDB, stateStore, blockStore) + evidencePool, err := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics()) require.NoError(t, err) // fill the evidence pool with more evidence @@ -357,7 +357,7 @@ func TestCreateProposalBlock(t *testing.T) { } assert.EqualValues(t, partSetFromHeader.ByteSize(), partSet.ByteSize()) - err = blockExec.ValidateBlock(state, block) + err = blockExec.ValidateBlock(ctx, state, block) assert.NoError(t, err) } diff --git a/node/setup.go b/node/setup.go index f88c4b9fc..9afd7911c 100644 --- a/node/setup.go +++ b/node/setup.go @@ -219,6 +219,8 @@ func createEvidenceReactor( peerManager *p2p.PeerManager, router *p2p.Router, logger log.Logger, + metrics *evidence.Metrics, + eventBus *eventbus.EventBus, ) (*evidence.Reactor, *evidence.Pool, error) { evidenceDB, err := dbProvider(&config.DBContext{ID: "evidence", Config: cfg}) if err != nil { @@ -227,11 +229,13 @@ func createEvidenceReactor( logger = logger.With("module", "evidence") - evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore) + evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore, metrics) if err != nil { return nil, nil, fmt.Errorf("creating evidence pool: %w", err) } + evidencePool.SetEventBus(eventBus) + evidenceReactor, err := evidence.NewReactor( ctx, logger, @@ -295,7 +299,6 @@ func createConsensusReactor( // Services which will be publishing and/or subscribing for messages (events) // consensusReactor will set it on consensusState and blockExecutor. reactor.SetEventBus(eventBus) - return reactor, consensusState, nil } diff --git a/types/events.go b/types/events.go index 3a3f64fd0..4ddfd0ba7 100644 --- a/types/events.go +++ b/types/events.go @@ -40,6 +40,10 @@ const ( EventTimeoutWaitValue = "TimeoutWait" EventValidBlockValue = "ValidBlock" EventVoteValue = "Vote" + + // Events emitted by the evidence reactor when evidence is validated + // and before it is committed + EventEvidenceValidatedValue = "EvidenceValidated" ) // Pre-populated ABCI Tendermint-reserved events @@ -104,6 +108,7 @@ func init() { jsontypes.MustRegister(EventDataTx{}) jsontypes.MustRegister(EventDataValidatorSetUpdates{}) jsontypes.MustRegister(EventDataVote{}) + jsontypes.MustRegister(EventDataEvidenceValidated{}) jsontypes.MustRegister(EventDataString("")) } @@ -223,6 +228,15 @@ type EventDataStateSyncStatus struct { // TypeTag implements the required method of jsontypes.Tagged. func (EventDataStateSyncStatus) TypeTag() string { return "tendermint/event/StateSyncStatus" } +type EventDataEvidenceValidated struct { + Evidence Evidence `json:"evidence"` + + Height int64 `json:"height,string"` +} + +// TypeTag implements the required method of jsontypes.Tagged. +func (EventDataEvidenceValidated) TypeTag() string { return "tendermint/event/EvidenceValidated" } + // PUBSUB const ( @@ -261,6 +275,7 @@ var ( EventQueryVote = QueryForEvent(EventVoteValue) EventQueryBlockSyncStatus = QueryForEvent(EventBlockSyncStatusValue) EventQueryStateSyncStatus = QueryForEvent(EventStateSyncStatusValue) + EventQueryEvidenceValidated = QueryForEvent(EventEvidenceValidatedValue) ) func EventQueryTxFor(tx Tx) *tmquery.Query {