Browse Source

types/events+evidence: emit events + metrics on evidence validation (#7802)

* event: Added Events after evidence validation; evidence: refactored AddEvidence

Added context and Metrics as parameter for the pool constructor

* evidence: pushed event firing into evidence pool and added metrics to represent the size of the evpool

* state: fixed parameters of evpool mock functions

* evidence: added test to confirm events are generated

* Removed obsolete EvidenceEventPublisher interface

* evidence: pool removed error on missing eventbus
pull/7874/head
Jasmina Malicevic 2 years ago
committed by GitHub
parent
commit
e80541a251
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 392 additions and 122 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +1
    -0
      docs/nodes/metrics.md
  3. +2
    -1
      internal/consensus/byzantine_test.go
  4. +3
    -2
      internal/consensus/reactor_test.go
  5. +3
    -3
      internal/consensus/state.go
  6. +4
    -0
      internal/eventbus/event_bus.go
  7. +42
    -0
      internal/eventbus/event_bus_test.go
  8. +47
    -0
      internal/evidence/metrics.go
  9. +41
    -13
      internal/evidence/pool.go
  10. +115
    -28
      internal/evidence/pool_test.go
  11. +9
    -6
      internal/evidence/reactor.go
  12. +18
    -6
      internal/evidence/reactor_test.go
  13. +4
    -3
      internal/evidence/verify.go
  14. +34
    -21
      internal/evidence/verify_test.go
  15. +1
    -1
      internal/rpc/core/evidence.go
  16. +4
    -4
      internal/state/execution.go
  17. +2
    -2
      internal/state/execution_test.go
  18. +1
    -1
      internal/state/helpers_test.go
  19. +16
    -13
      internal/state/mocks/evidence_pool.go
  20. +10
    -6
      internal/state/services.go
  21. +7
    -7
      internal/state/validation_test.go
  22. +5
    -1
      node/node.go
  23. +2
    -2
      node/node_test.go
  24. +5
    -2
      node/setup.go
  25. +15
    -0
      types/events.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -63,6 +63,7 @@ Special thanks to external contributors on this release:
- [internal/protoio] \#7325 Optimized `MarshalDelimited` by inlining the common case and using a `sync.Pool` in the worst case. (@odeke-em)
- [consensus] \#6969 remove logic to 'unlock' a locked block.
- [evidence] \#7700 Evidence messages contain single Evidence instead of EvidenceList (@jmalicevic)
- [evidence] \#7802 Evidence pool emits events when evidence is validated and updates a metric when the number of evidence in the evidence pool changes. (@jmalicevic)
- [pubsub] \#7319 Performance improvements for the event query API (@creachadair)
- [node] \#7521 Define concrete type for seed node implementation (@spacech1mp)
- [rpc] \#7612 paginate mempool /unconfirmed_txs rpc endpoint (@spacech1mp)


+ 1
- 0
docs/nodes/metrics.md View File

@ -40,6 +40,7 @@ The following metrics are available:
| consensus_fast_syncing | gauge | | either 0 (not fast syncing) or 1 (syncing) |
| consensus_state_syncing | gauge | | either 0 (not state syncing) or 1 (syncing) |
| consensus_block_size_bytes | Gauge | | Block size in bytes |
| evidence_pool_num_evidence | Gauge | | Number of evidence in the evidence pool
| p2p_peers | Gauge | | Number of peers node's connected to |
| p2p_peer_receive_bytes_total | counter | peer_id, chID | number of bytes per channel received from a given peer |
| p2p_peer_send_bytes_total | counter | peer_id, chID | number of bytes per channel sent to a given peer |


+ 2
- 1
internal/consensus/byzantine_test.go View File

@ -90,7 +90,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// Make a full instance of the evidence pool
evidenceDB := dbm.NewMemDB()
evpool, err := evidence.NewPool(logger.With("module", "evidence"), evidenceDB, stateStore, blockStore)
evpool, err := evidence.NewPool(logger.With("module", "evidence"), evidenceDB, stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
// Make State
@ -104,6 +104,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
err = eventBus.Start(ctx)
require.NoError(t, err)
cs.SetEventBus(eventBus)
evpool.SetEventBus(eventBus)
cs.SetTimeoutTicker(tickerFunc())


+ 3
- 2
internal/consensus/reactor_test.go View File

@ -427,14 +427,15 @@ func TestReactorWithEvidence(t *testing.T) {
ev, err := types.NewMockDuplicateVoteEvidenceWithValidator(ctx, 1, defaultTestTime, privVals[vIdx], cfg.ChainID())
require.NoError(t, err)
evpool := &statemocks.EvidencePool{}
evpool.On("CheckEvidence", mock.AnythingOfType("types.EvidenceList")).Return(nil)
evpool.On("CheckEvidence", ctx, mock.AnythingOfType("types.EvidenceList")).Return(nil)
evpool.On("PendingEvidence", mock.AnythingOfType("int64")).Return([]types.Evidence{
ev}, int64(len(ev.Bytes())))
evpool.On("Update", mock.AnythingOfType("state.State"), mock.AnythingOfType("types.EvidenceList")).Return()
evpool.On("Update", ctx, mock.AnythingOfType("state.State"), mock.AnythingOfType("types.EvidenceList")).Return()
evpool2 := sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(ctx, logger.With("validator", i, "module", "consensus"),
thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2)
cs.SetPrivValidator(ctx, pv)


+ 3
- 3
internal/consensus/state.go View File

@ -1432,7 +1432,7 @@ func (cs *State) defaultDoPrevote(ctx context.Context, height int64, round int32
}
// Validate proposal block, from Tendermint's perspective
err := cs.blockExec.ValidateBlock(cs.state, cs.ProposalBlock)
err := cs.blockExec.ValidateBlock(ctx, cs.state, cs.ProposalBlock)
if err != nil {
// ProposalBlock is invalid, prevote nil.
logger.Error("prevote step: consensus deems this block invalid; prevoting nil",
@ -1651,7 +1651,7 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32)
logger.Debug("precommit step: +2/3 prevoted proposal block; locking", "hash", blockID.Hash)
// Validate the block.
if err := cs.blockExec.ValidateBlock(cs.state, cs.ProposalBlock); err != nil {
if err := cs.blockExec.ValidateBlock(ctx, cs.state, cs.ProposalBlock); err != nil {
panic(fmt.Sprintf("precommit step: +2/3 prevoted for an invalid block %v; relocking", err))
}
@ -1831,7 +1831,7 @@ func (cs *State) finalizeCommit(ctx context.Context, height int64) {
panic("cannot finalize commit; proposal block does not hash to commit hash")
}
if err := cs.blockExec.ValidateBlock(cs.state, block); err != nil {
if err := cs.blockExec.ValidateBlock(ctx, cs.state, block); err != nil {
panic(fmt.Errorf("+2/3 committed an invalid block: %w", err))
}


+ 4
- 0
internal/eventbus/event_bus.go View File

@ -198,6 +198,10 @@ func (b *EventBus) PublishEventValidatorSetUpdates(ctx context.Context, data typ
return b.Publish(ctx, types.EventValidatorSetUpdatesValue, data)
}
func (b *EventBus) PublishEventEvidenceValidated(ctx context.Context, evidence types.EventDataEvidenceValidated) error {
return b.Publish(ctx, types.EventEvidenceValidatedValue, evidence)
}
//-----------------------------------------------------------------------------
// NopEventBus implements a types.BlockEventPublisher that discards all events.


+ 42
- 0
internal/eventbus/event_bus_test.go View File

@ -293,6 +293,48 @@ func TestEventBusPublishEventNewBlockHeader(t *testing.T) {
}
}
func TestEventBusPublishEventEvidenceValidated(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eventBus := eventbus.NewDefault(log.TestingLogger())
err := eventBus.Start(ctx)
require.NoError(t, err)
ev, err := types.NewMockDuplicateVoteEvidence(ctx, 1, time.Now(), "test-chain-id")
require.NoError(t, err)
const query = `tm.event='EvidenceValidated'`
evSub, err := eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{
ClientID: "test",
Query: tmquery.MustCompile(query),
})
require.NoError(t, err)
done := make(chan struct{})
go func() {
defer close(done)
msg, err := evSub.Next(ctx)
assert.NoError(t, err)
edt := msg.Data().(types.EventDataEvidenceValidated)
assert.Equal(t, ev, edt.Evidence)
assert.Equal(t, int64(1), edt.Height)
}()
err = eventBus.PublishEventEvidenceValidated(ctx, types.EventDataEvidenceValidated{
Evidence: ev,
Height: int64(1),
})
assert.NoError(t, err)
select {
case <-done:
case <-time.After(1 * time.Second):
t.Fatal("did not receive a block header after 1 sec.")
}
}
func TestEventBusPublishEventNewEvidence(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()


+ 47
- 0
internal/evidence/metrics.go View File

@ -0,0 +1,47 @@
package evidence
import (
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/discard"
"github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
const (
// MetricsSubsystem is a subsystem shared by all metrics exposed by this
// package.
MetricsSubsystem = "evidence_pool"
)
// Metrics contains metrics exposed by this package.
// see MetricsProvider for descriptions.
type Metrics struct {
// Number of evidence in the evidence pool
NumEvidence metrics.Gauge
}
// PrometheusMetrics returns Metrics build using Prometheus client library.
// Optionally, labels can be provided along with their values ("foo",
// "fooValue").
func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
labels := []string{}
for i := 0; i < len(labelsAndValues); i += 2 {
labels = append(labels, labelsAndValues[i])
}
return &Metrics{
NumEvidence: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "num_evidence",
Help: "Number of pending evidence in evidence pool.",
}, labels).With(labelsAndValues...),
}
}
// NopMetrics returns no-op Metrics.
func NopMetrics() *Metrics {
return &Metrics{
NumEvidence: discard.NewGauge(),
}
}

+ 41
- 13
internal/evidence/pool.go View File

@ -2,6 +2,7 @@ package evidence
import (
"bytes"
"context"
"errors"
"fmt"
"sync"
@ -13,6 +14,7 @@ import (
"github.com/google/orderedcode"
dbm "github.com/tendermint/tm-db"
"github.com/tendermint/tendermint/internal/eventbus"
clist "github.com/tendermint/tendermint/internal/libs/clist"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/libs/log"
@ -49,11 +51,22 @@ type Pool struct {
pruningHeight int64
pruningTime time.Time
// Eventbus to emit events when evidence is validated
// Not part of the constructor, use SetEventBus to set it
// The eventBus must be started in order for event publishing not to block
eventBus *eventbus.EventBus
Metrics *Metrics
}
func (evpool *Pool) SetEventBus(e *eventbus.EventBus) {
evpool.eventBus = e
}
// NewPool creates an evidence pool. If using an existing evidence store,
// it will add all pending evidence to the concurrent list.
func NewPool(logger log.Logger, evidenceDB dbm.DB, stateDB sm.Store, blockStore BlockStore) (*Pool, error) {
func NewPool(logger log.Logger, evidenceDB dbm.DB, stateDB sm.Store, blockStore BlockStore, metrics *Metrics) (*Pool, error) {
state, err := stateDB.Load()
if err != nil {
return nil, fmt.Errorf("failed to load state: %w", err)
@ -67,6 +80,7 @@ func NewPool(logger log.Logger, evidenceDB dbm.DB, stateDB sm.Store, blockStore
evidenceStore: evidenceDB,
evidenceList: clist.New(),
consensusBuffer: make([]duplicateVoteSet, 0),
Metrics: metrics,
}
// If pending evidence already in db, in event of prior failure, then check
@ -78,10 +92,12 @@ func NewPool(logger log.Logger, evidenceDB dbm.DB, stateDB sm.Store, blockStore
}
atomic.StoreUint32(&pool.evidenceSize, uint32(len(evList)))
pool.Metrics.NumEvidence.Set(float64(pool.evidenceSize))
for _, ev := range evList {
pool.evidenceList.PushBack(ev)
}
pool.eventBus = nil
return pool, nil
}
@ -108,7 +124,7 @@ func (evpool *Pool) PendingEvidence(maxBytes int64) ([]types.Evidence, int64) {
// 2. Update the pool's state which contains evidence params relating to expiry.
// 3. Moves pending evidence that has now been committed into the committed pool.
// 4. Removes any expired evidence based on both height and time.
func (evpool *Pool) Update(state sm.State, ev types.EvidenceList) {
func (evpool *Pool) Update(ctx context.Context, state sm.State, ev types.EvidenceList) {
// sanity check
if state.LastBlockHeight <= evpool.state.LastBlockHeight {
panic(fmt.Sprintf(
@ -126,7 +142,7 @@ func (evpool *Pool) Update(state sm.State, ev types.EvidenceList) {
// flush conflicting vote pairs from the buffer, producing DuplicateVoteEvidence and
// adding it to the pool
evpool.processConsensusBuffer(state)
evpool.processConsensusBuffer(ctx, state)
// update state
evpool.updateState(state)
@ -142,7 +158,7 @@ func (evpool *Pool) Update(state sm.State, ev types.EvidenceList) {
}
// AddEvidence checks the evidence is valid and adds it to the pool.
func (evpool *Pool) AddEvidence(ev types.Evidence) error {
func (evpool *Pool) AddEvidence(ctx context.Context, ev types.Evidence) error {
evpool.logger.Debug("attempting to add evidence", "evidence", ev)
// We have already verified this piece of evidence - no need to do it again
@ -160,12 +176,12 @@ func (evpool *Pool) AddEvidence(ev types.Evidence) error {
}
// 1) Verify against state.
if err := evpool.verify(ev); err != nil {
if err := evpool.verify(ctx, ev); err != nil {
return err
}
// 2) Save to store.
if err := evpool.addPendingEvidence(ev); err != nil {
if err := evpool.addPendingEvidence(ctx, ev); err != nil {
return fmt.Errorf("failed to add evidence to pending list: %w", err)
}
@ -198,7 +214,7 @@ func (evpool *Pool) ReportConflictingVotes(voteA, voteB *types.Vote) {
// If it has already verified the evidence then it jumps to the next one. It ensures that no
// evidence has already been committed or is being proposed twice. It also adds any
// evidence that it doesn't currently have so that it can quickly form ABCI Evidence later.
func (evpool *Pool) CheckEvidence(evList types.EvidenceList) error {
func (evpool *Pool) CheckEvidence(ctx context.Context, evList types.EvidenceList) error {
hashes := make([][]byte, len(evList))
for idx, ev := range evList {
@ -212,12 +228,12 @@ func (evpool *Pool) CheckEvidence(evList types.EvidenceList) error {
return &types.ErrInvalidEvidence{Evidence: ev, Reason: errors.New("evidence was already committed")}
}
err := evpool.verify(ev)
err := evpool.verify(ctx, ev)
if err != nil {
return err
}
if err := evpool.addPendingEvidence(ev); err != nil {
if err := evpool.addPendingEvidence(ctx, ev); err != nil {
// Something went wrong with adding the evidence but we already know it is valid
// hence we log an error and continue
evpool.logger.Error("failed to add evidence to pending list", "err", err, "evidence", ev)
@ -297,7 +313,7 @@ func (evpool *Pool) isPending(evidence types.Evidence) bool {
return ok
}
func (evpool *Pool) addPendingEvidence(ev types.Evidence) error {
func (evpool *Pool) addPendingEvidence(ctx context.Context, ev types.Evidence) error {
evpb, err := types.EvidenceToProto(ev)
if err != nil {
return fmt.Errorf("failed to convert to proto: %w", err)
@ -316,7 +332,18 @@ func (evpool *Pool) addPendingEvidence(ev types.Evidence) error {
}
atomic.AddUint32(&evpool.evidenceSize, 1)
return nil
evpool.Metrics.NumEvidence.Set(float64(evpool.evidenceSize))
// This should normally never be true
if evpool.eventBus == nil {
evpool.logger.Debug("event bus is not configured")
return nil
}
return evpool.eventBus.PublishEventEvidenceValidated(ctx, types.EventDataEvidenceValidated{
Evidence: ev,
Height: ev.Height(),
})
}
// markEvidenceAsCommitted processes all the evidence in the block, marking it as
@ -368,6 +395,7 @@ func (evpool *Pool) markEvidenceAsCommitted(evidence types.EvidenceList, height
// update the evidence size
atomic.AddUint32(&evpool.evidenceSize, ^uint32(len(blockEvidenceMap)-1))
evpool.Metrics.NumEvidence.Set(float64(evpool.evidenceSize))
}
// listEvidence retrieves lists evidence from oldest to newest within maxBytes.
@ -513,7 +541,7 @@ func (evpool *Pool) updateState(state sm.State) {
// into DuplicateVoteEvidence. It sets the evidence timestamp to the block height
// from the most recently committed block.
// Evidence is then added to the pool so as to be ready to be broadcasted and proposed.
func (evpool *Pool) processConsensusBuffer(state sm.State) {
func (evpool *Pool) processConsensusBuffer(ctx context.Context, state sm.State) {
evpool.mtx.Lock()
defer evpool.mtx.Unlock()
for _, voteSet := range evpool.consensusBuffer {
@ -578,7 +606,7 @@ func (evpool *Pool) processConsensusBuffer(state sm.State) {
continue
}
if err := evpool.addPendingEvidence(dve); err != nil {
if err := evpool.addPendingEvidence(ctx, dve); err != nil {
evpool.logger.Error("failed to flush evidence from consensus buffer to pending list: %w", err)
continue
}


+ 115
- 28
internal/evidence/pool_test.go View File

@ -11,6 +11,7 @@ import (
dbm "github.com/tendermint/tm-db"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/evidence"
"github.com/tendermint/tendermint/internal/evidence/mocks"
sm "github.com/tendermint/tendermint/internal/state"
@ -21,6 +22,9 @@ import (
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/version"
tmpubsub "github.com/tendermint/tendermint/internal/pubsub"
tmquery "github.com/tendermint/tendermint/internal/pubsub/query"
)
const evidenceChainID = "test_chain"
@ -40,7 +44,6 @@ func TestEvidencePoolBasic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
valSet, privVals := factory.ValidatorSet(ctx, t, 1, 10)
blockStore.On("LoadBlockMeta", mock.AnythingOfType("int64")).Return(
&types.BlockMeta{Header: types.Header{Time: defaultEvidenceTime}},
@ -48,9 +51,9 @@ func TestEvidencePoolBasic(t *testing.T) {
stateStore.On("LoadValidators", mock.AnythingOfType("int64")).Return(valSet, nil)
stateStore.On("Load").Return(createState(height+1, valSet), nil)
pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore)
pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
require.NoError(t, setupEventBus(ctx, pool))
// evidence not seen yet:
evs, size := pool.PendingEvidence(defaultEvidenceMaxBytes)
require.Equal(t, 0, len(evs))
@ -66,7 +69,8 @@ func TestEvidencePoolBasic(t *testing.T) {
}()
// evidence seen but not yet committed:
require.NoError(t, pool.AddEvidence(ev))
err = pool.AddEvidence(ctx, ev)
require.NoError(t, err)
select {
case <-evAdded:
@ -83,7 +87,8 @@ func TestEvidencePoolBasic(t *testing.T) {
require.Equal(t, evidenceBytes, size) // check that the size of the single evidence in bytes is correct
// shouldn't be able to add evidence twice
require.NoError(t, pool.AddEvidence(ev))
err = pool.AddEvidence(ctx, ev)
require.NoError(t, err)
evs, _ = pool.PendingEvidence(defaultEvidenceMaxBytes)
require.Equal(t, 1, len(evs))
}
@ -110,9 +115,11 @@ func TestAddExpiredEvidence(t *testing.T) {
return &types.BlockMeta{Header: types.Header{Time: expiredEvidenceTime}}
})
pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore)
pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
require.NoError(t, setupEventBus(ctx, pool))
testCases := []struct {
evHeight int64
evTime time.Time
@ -136,7 +143,7 @@ func TestAddExpiredEvidence(t *testing.T) {
ev, err := types.NewMockDuplicateVoteEvidenceWithValidator(ctx, tc.evHeight, tc.evTime, val, evidenceChainID)
require.NoError(t, err)
err = pool.AddEvidence(ev)
err = pool.AddEvidence(ctx, ev)
if tc.expErr {
require.Error(t, err)
} else {
@ -153,6 +160,9 @@ func TestReportConflictingVotes(t *testing.T) {
defer cancel()
pool, pv := defaultTestPool(ctx, t, height)
require.NoError(t, setupEventBus(ctx, pool))
val := types.NewValidator(pv.PrivKey.PubKey(), 10)
ev, err := types.NewMockDuplicateVoteEvidenceWithValidator(ctx, height+1, defaultEvidenceTime, pv, evidenceChainID)
@ -176,7 +186,7 @@ func TestReportConflictingVotes(t *testing.T) {
state.LastBlockHeight++
state.LastBlockTime = ev.Time()
state.LastValidators = types.NewValidatorSet([]*types.Validator{val})
pool.Update(state, []types.Evidence{})
pool.Update(ctx, state, []types.Evidence{})
// should be able to retrieve evidence from pool
evList, _ = pool.PendingEvidence(defaultEvidenceMaxBytes)
@ -192,6 +202,9 @@ func TestEvidencePoolUpdate(t *testing.T) {
defer cancel()
pool, val := defaultTestPool(ctx, t, height)
require.NoError(t, setupEventBus(ctx, pool))
state := pool.State()
// create two lots of old evidence that we expect to be pruned when we update
@ -211,8 +224,8 @@ func TestEvidencePoolUpdate(t *testing.T) {
)
require.NoError(t, err)
require.NoError(t, pool.AddEvidence(prunedEv))
require.NoError(t, pool.AddEvidence(notPrunedEv))
require.NoError(t, pool.AddEvidence(ctx, prunedEv))
require.NoError(t, pool.AddEvidence(ctx, notPrunedEv))
ev, err := types.NewMockDuplicateVoteEvidenceWithValidator(
ctx,
@ -234,21 +247,21 @@ func TestEvidencePoolUpdate(t *testing.T) {
require.Equal(t, uint32(2), pool.Size())
require.NoError(t, pool.CheckEvidence(types.EvidenceList{ev}))
require.NoError(t, pool.CheckEvidence(ctx, types.EvidenceList{ev}))
evList, _ = pool.PendingEvidence(3 * defaultEvidenceMaxBytes)
require.Equal(t, 3, len(evList))
require.Equal(t, uint32(3), pool.Size())
pool.Update(state, block.Evidence)
pool.Update(ctx, state, block.Evidence)
// a) Update marks evidence as committed so pending evidence should be empty
evList, _ = pool.PendingEvidence(defaultEvidenceMaxBytes)
require.Equal(t, []types.Evidence{notPrunedEv}, evList)
// b) If we try to check this evidence again it should fail because it has already been committed
err = pool.CheckEvidence(types.EvidenceList{ev})
err = pool.CheckEvidence(ctx, types.EvidenceList{ev})
if assert.Error(t, err) {
assert.Equal(t, "evidence was already committed", err.(*types.ErrInvalidEvidence).Reason.Error())
}
@ -261,6 +274,9 @@ func TestVerifyPendingEvidencePasses(t *testing.T) {
defer cancel()
pool, val := defaultTestPool(ctx, t, height)
require.NoError(t, setupEventBus(ctx, pool))
ev, err := types.NewMockDuplicateVoteEvidenceWithValidator(
ctx,
height,
@ -269,8 +285,8 @@ func TestVerifyPendingEvidencePasses(t *testing.T) {
evidenceChainID,
)
require.NoError(t, err)
require.NoError(t, pool.AddEvidence(ev))
require.NoError(t, pool.CheckEvidence(types.EvidenceList{ev}))
require.NoError(t, pool.AddEvidence(ctx, ev))
require.NoError(t, pool.CheckEvidence(ctx, types.EvidenceList{ev}))
}
func TestVerifyDuplicatedEvidenceFails(t *testing.T) {
@ -281,6 +297,8 @@ func TestVerifyDuplicatedEvidenceFails(t *testing.T) {
pool, val := defaultTestPool(ctx, t, height)
require.NoError(t, setupEventBus(ctx, pool))
ev, err := types.NewMockDuplicateVoteEvidenceWithValidator(
ctx,
height,
@ -290,12 +308,62 @@ func TestVerifyDuplicatedEvidenceFails(t *testing.T) {
)
require.NoError(t, err)
err = pool.CheckEvidence(types.EvidenceList{ev, ev})
err = pool.CheckEvidence(ctx, types.EvidenceList{ev, ev})
if assert.Error(t, err) {
assert.Equal(t, "duplicate evidence", err.(*types.ErrInvalidEvidence).Reason.Error())
}
}
// Check that we generate events when evidence is added into the evidence pool
func TestEventOnEvidenceValidated(t *testing.T) {
const height = 1
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pool, val := defaultTestPool(ctx, t, height)
ev, err := types.NewMockDuplicateVoteEvidenceWithValidator(
ctx,
height,
defaultEvidenceTime.Add(1*time.Minute),
val,
evidenceChainID,
)
require.NoError(t, err)
eventBus := eventbus.NewDefault(log.TestingLogger())
require.NoError(t, eventBus.Start(ctx))
pool.SetEventBus(eventBus)
const query = `tm.event='EvidenceValidated'`
evSub, err := eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{
ClientID: "test",
Query: tmquery.MustCompile(query),
})
require.NoError(t, err)
done := make(chan struct{})
go func() {
defer close(done)
msg, err := evSub.Next(ctx)
assert.NoError(t, err)
edt := msg.Data().(types.EventDataEvidenceValidated)
assert.Equal(t, ev, edt.Evidence)
}()
err = pool.AddEvidence(ctx, ev)
require.NoError(t, err)
select {
case <-done:
case <-time.After(1 * time.Second):
t.Fatal("did not receive a block header after 1 sec.")
}
}
// check that valid light client evidence is correctly validated and stored in
// evidence pool
func TestLightClientAttackEvidenceLifecycle(t *testing.T) {
@ -326,32 +394,38 @@ func TestLightClientAttackEvidenceLifecycle(t *testing.T) {
blockStore.On("LoadBlockCommit", height).Return(trusted.Commit)
blockStore.On("LoadBlockCommit", commonHeight).Return(common.Commit)
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore)
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
require.NoError(t, setupEventBus(ctx, pool))
hash := ev.Hash()
require.NoError(t, pool.AddEvidence(ev))
require.NoError(t, pool.AddEvidence(ev))
err = pool.AddEvidence(ctx, ev)
require.NoError(t, err)
err = pool.AddEvidence(ctx, ev)
require.NoError(t, err)
pendingEv, _ := pool.PendingEvidence(state.ConsensusParams.Evidence.MaxBytes)
require.Equal(t, 1, len(pendingEv))
require.Equal(t, ev, pendingEv[0])
require.NoError(t, pool.CheckEvidence(pendingEv))
require.NoError(t, pool.CheckEvidence(ctx, pendingEv))
require.Equal(t, ev, pendingEv[0])
state.LastBlockHeight++
state.LastBlockTime = state.LastBlockTime.Add(1 * time.Minute)
pool.Update(state, pendingEv)
pool.Update(ctx, state, pendingEv)
require.Equal(t, hash, pendingEv[0].Hash())
remaindingEv, _ := pool.PendingEvidence(state.ConsensusParams.Evidence.MaxBytes)
require.Empty(t, remaindingEv)
// evidence is already committed so it shouldn't pass
require.Error(t, pool.CheckEvidence(types.EvidenceList{ev}))
require.NoError(t, pool.AddEvidence(ev))
require.Error(t, pool.CheckEvidence(ctx, types.EvidenceList{ev}))
err = pool.AddEvidence(ctx, ev)
require.NoError(t, err)
remaindingEv, _ = pool.PendingEvidence(state.ConsensusParams.Evidence.MaxBytes)
require.Empty(t, remaindingEv)
@ -376,9 +450,11 @@ func TestRecoverPendingEvidence(t *testing.T) {
require.NoError(t, err)
// create previous pool and populate it
pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore)
pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
require.NoError(t, setupEventBus(ctx, pool))
goodEvidence, err := types.NewMockDuplicateVoteEvidenceWithValidator(
ctx,
height,
@ -396,8 +472,10 @@ func TestRecoverPendingEvidence(t *testing.T) {
)
require.NoError(t, err)
require.NoError(t, pool.AddEvidence(goodEvidence))
require.NoError(t, pool.AddEvidence(expiredEvidence))
err = pool.AddEvidence(ctx, goodEvidence)
require.NoError(t, err)
err = pool.AddEvidence(ctx, expiredEvidence)
require.NoError(t, err)
// now recover from the previous pool at a different time
newStateStore := &smmocks.Store{}
@ -417,7 +495,7 @@ func TestRecoverPendingEvidence(t *testing.T) {
},
}, nil)
newPool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, newStateStore, blockStore)
newPool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, newStateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
evList, _ := newPool.PendingEvidence(defaultEvidenceMaxBytes)
@ -523,7 +601,7 @@ func defaultTestPool(ctx context.Context, t *testing.T, height int64) (*evidence
blockStore, err := initializeBlockStore(dbm.NewMemDB(), state, valAddress)
require.NoError(t, err)
pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore)
pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err, "test evidence pool could not be created")
return pool, val
@ -538,3 +616,12 @@ func createState(height int64, valSet *types.ValidatorSet) sm.State {
ConsensusParams: *types.DefaultConsensusParams(),
}
}
func setupEventBus(ctx context.Context, evpool *evidence.Pool) error {
eventBus := eventbus.NewDefault(log.TestingLogger())
if err := eventBus.Start(ctx); err != nil {
return err
}
evpool.SetEventBus(eventBus)
return nil
}

+ 9
- 6
internal/evidence/reactor.go View File

@ -50,7 +50,8 @@ type Reactor struct {
evidenceCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
mtx sync.Mutex
mtx sync.Mutex
peerRoutines map[types.NodeID]context.CancelFunc
}
@ -78,6 +79,7 @@ func NewReactor(
}
r.BaseService = *service.NewBaseService(logger, "Evidence", r)
return r, err
}
@ -103,7 +105,7 @@ func (r *Reactor) OnStop() {
// It returns an error only if the Envelope.Message is unknown for this channel
// or if the given evidence is invalid. This should never be called outside of
// handleMessage.
func (r *Reactor) handleEvidenceMessage(envelope *p2p.Envelope) error {
func (r *Reactor) handleEvidenceMessage(ctx context.Context, envelope *p2p.Envelope) error {
logger := r.logger.With("peer", envelope.From)
switch msg := envelope.Message.(type) {
@ -115,12 +117,13 @@ func (r *Reactor) handleEvidenceMessage(envelope *p2p.Envelope) error {
logger.Error("failed to convert evidence", "err", err)
return err
}
if err := r.evpool.AddEvidence(ev); err != nil {
if err := r.evpool.AddEvidence(ctx, ev); err != nil {
// If we're given invalid evidence by the peer, notify the router that
// we should remove this peer by returning an error.
if _, ok := err.(*types.ErrInvalidEvidence); ok {
return err
}
}
default:
@ -133,7 +136,7 @@ func (r *Reactor) handleEvidenceMessage(envelope *p2p.Envelope) error {
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope *p2p.Envelope) (err error) {
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e)
@ -149,7 +152,7 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope *p2p.Envelope) (err
switch chID {
case EvidenceChannel:
err = r.handleEvidenceMessage(envelope)
err = r.handleEvidenceMessage(ctx, envelope)
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
@ -164,7 +167,7 @@ func (r *Reactor) processEvidenceCh(ctx context.Context) {
iter := r.evidenceCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(r.evidenceCh.ID, envelope); err != nil {
if err := r.handleMessage(ctx, r.evidenceCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.evidenceCh.ID, "envelope", envelope, "err", err)
if serr := r.evidenceCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,


+ 18
- 6
internal/evidence/reactor_test.go View File

@ -17,6 +17,7 @@ import (
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/evidence"
"github.com/tendermint/tendermint/internal/evidence/mocks"
"github.com/tendermint/tendermint/internal/p2p"
@ -69,6 +70,7 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store, chBuf uint
idx := 0
evidenceTime := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)
for nodeID := range rts.network.Nodes {
logger := rts.logger.With("validator", idx)
evidenceDB := dbm.NewMemDB()
@ -80,9 +82,13 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store, chBuf uint
}
return nil
})
rts.pools[nodeID], err = evidence.NewPool(logger, evidenceDB, stateStores[idx], blockStore)
rts.pools[nodeID], err = evidence.NewPool(logger, evidenceDB, stateStores[idx], blockStore, evidence.NopMetrics())
require.NoError(t, err)
eventBus := eventbus.NewDefault(logger)
err = eventBus.Start(ctx)
require.NoError(t, err)
rts.pools[nodeID].SetEventBus(eventBus)
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
@ -219,7 +225,8 @@ func createEvidenceList(
evidenceChainID,
)
require.NoError(t, err)
require.NoError(t, pool.AddEvidence(ev),
err = pool.AddEvidence(ctx, ev)
require.NoError(t, err,
"adding evidence it#%d of %d to pool with height %d",
i, numEvidence, pool.State().LastBlockHeight)
evList[i] = ev
@ -284,12 +291,14 @@ func TestReactorBroadcastEvidence(t *testing.T) {
}
rts := setup(ctx, t, stateDBs, 0)
rts.start(ctx, t)
// Create a series of fixtures where each suite contains a reactor and
// evidence pool. In addition, we mark a primary suite and the rest are
// secondaries where each secondary is added as a peer via a PeerUpdate to the
// primary. As a result, the primary will gossip all evidence to each secondary.
primary := rts.network.RandomNode()
secondaries := make([]*p2ptest.Node, 0, len(rts.network.NodeIDs())-1)
secondaryIDs := make([]types.NodeID, 0, cap(secondaries))
@ -320,6 +329,7 @@ func TestReactorBroadcastEvidence(t *testing.T) {
for _, pool := range rts.pools {
require.Equal(t, numEvidence, int(pool.Size()))
}
}
// TestReactorSelectiveBroadcast tests a context where we have two reactors
@ -381,7 +391,8 @@ func TestReactorBroadcastEvidence_Pending(t *testing.T) {
// Manually add half the evidence to the secondary which will mark them as
// pending.
for i := 0; i < numEvidence/2; i++ {
require.NoError(t, rts.pools[secondary.NodeID].AddEvidence(evList[i]))
err := rts.pools[secondary.NodeID].AddEvidence(ctx, evList[i])
require.NoError(t, err)
}
// the secondary should have half the evidence as pending
@ -423,7 +434,8 @@ func TestReactorBroadcastEvidence_Committed(t *testing.T) {
// Manually add half the evidence to the secondary which will mark them as
// pending.
for i := 0; i < numEvidence/2; i++ {
require.NoError(t, rts.pools[secondary.NodeID].AddEvidence(evList[i]))
err := rts.pools[secondary.NodeID].AddEvidence(ctx, evList[i])
require.NoError(t, err)
}
// the secondary should have half the evidence as pending
@ -434,7 +446,7 @@ func TestReactorBroadcastEvidence_Committed(t *testing.T) {
// update the secondary's pool such that all pending evidence is committed
state.LastBlockHeight++
rts.pools[secondary.NodeID].Update(state, evList[:numEvidence/2])
rts.pools[secondary.NodeID].Update(ctx, state, evList[:numEvidence/2])
// the secondary should have half the evidence as committed
require.Equal(t, 0, int(rts.pools[secondary.NodeID].Size()))
@ -496,7 +508,7 @@ func TestReactorBroadcastEvidence_FullyConnected(t *testing.T) {
// commit state so we do not continue to repeat gossiping the same evidence
state := pool.State()
state.LastBlockHeight++
pool.Update(state, evList)
pool.Update(ctx, state, evList)
}
}


+ 4
- 3
internal/evidence/verify.go View File

@ -2,6 +2,7 @@ package evidence
import (
"bytes"
"context"
"errors"
"fmt"
"time"
@ -21,7 +22,7 @@ import (
// set for. In these cases, we do not return a ErrInvalidEvidence as not to have
// the sending peer disconnect. All other errors are treated as invalid evidence
// (i.e. ErrInvalidEvidence).
func (evpool *Pool) verify(evidence types.Evidence) error {
func (evpool *Pool) verify(ctx context.Context, evidence types.Evidence) error {
var (
state = evpool.State()
height = state.LastBlockHeight
@ -74,7 +75,7 @@ func (evpool *Pool) verify(evidence types.Evidence) error {
if err := ev.ValidateABCI(val, valSet, evTime); err != nil {
ev.GenerateABCI(val, valSet, evTime)
if addErr := evpool.addPendingEvidence(ev); addErr != nil {
if addErr := evpool.addPendingEvidence(ctx, ev); addErr != nil {
evpool.logger.Error("adding pending duplicate vote evidence failed", "err", addErr)
}
return err
@ -134,7 +135,7 @@ func (evpool *Pool) verify(evidence types.Evidence) error {
// evidence and return an error
if err := ev.ValidateABCI(commonVals, trustedHeader, evTime); err != nil {
ev.GenerateABCI(commonVals, trustedHeader, evTime)
if addErr := evpool.addPendingEvidence(ev); addErr != nil {
if addErr := evpool.addPendingEvidence(ctx, ev); addErr != nil {
evpool.logger.Error("adding pending light client attack evidence failed", "err", addErr)
}
return err


+ 34
- 21
internal/evidence/verify_test.go View File

@ -96,12 +96,12 @@ func TestVerify_LunaticAttackAgainstState(t *testing.T) {
blockStore.On("LoadBlockMeta", height).Return(&types.BlockMeta{Header: *trusted.Header})
blockStore.On("LoadBlockCommit", commonHeight).Return(common.Commit)
blockStore.On("LoadBlockCommit", height).Return(trusted.Commit)
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore)
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
evList := types.EvidenceList{ev}
// check that the evidence pool correctly verifies the evidence
assert.NoError(t, pool.CheckEvidence(evList))
assert.NoError(t, pool.CheckEvidence(ctx, evList))
// as it was not originally in the pending bucket, it should now have been added
pendingEvs, _ := pool.PendingEvidence(state.ConsensusParams.Evidence.MaxBytes)
@ -112,28 +112,33 @@ func TestVerify_LunaticAttackAgainstState(t *testing.T) {
// should return an error
ev.ByzantineValidators = ev.ByzantineValidators[:1]
t.Log(evList)
assert.Error(t, pool.CheckEvidence(evList))
assert.Error(t, pool.CheckEvidence(ctx, evList))
// restore original byz vals
ev.ByzantineValidators = ev.GetByzantineValidators(common.ValidatorSet, trusted.SignedHeader)
// duplicate evidence should be rejected
evList = types.EvidenceList{ev, ev}
pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore)
pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
assert.Error(t, pool.CheckEvidence(evList))
assert.Error(t, pool.CheckEvidence(ctx, evList))
// If evidence is submitted with an altered timestamp it should return an error
ev.Timestamp = defaultEvidenceTime.Add(1 * time.Minute)
pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore)
pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
assert.Error(t, pool.AddEvidence(ev))
require.NoError(t, setupEventBus(ctx, pool))
err = pool.AddEvidence(ctx, ev)
assert.Error(t, err)
ev.Timestamp = defaultEvidenceTime
// Evidence submitted with a different validator power should fail
ev.TotalVotingPower = 1
pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore)
pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
assert.Error(t, pool.AddEvidence(ev))
err = pool.AddEvidence(ctx, ev)
assert.Error(t, err)
ev.TotalVotingPower = common.ValidatorSet.TotalVotingPower()
}
@ -174,11 +179,13 @@ func TestVerify_ForwardLunaticAttack(t *testing.T) {
blockStore.On("LoadBlockCommit", commonHeight).Return(common.Commit)
blockStore.On("LoadBlockCommit", nodeHeight).Return(trusted.Commit)
blockStore.On("Height").Return(nodeHeight)
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore)
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
require.NoError(t, setupEventBus(ctx, pool))
// check that the evidence pool correctly verifies the evidence
assert.NoError(t, pool.CheckEvidence(types.EvidenceList{ev}))
assert.NoError(t, pool.CheckEvidence(ctx, types.EvidenceList{ev}))
// now we use a time which isn't able to contradict the FLA - thus we can't verify the evidence
oldBlockStore := &mocks.BlockStore{}
@ -192,9 +199,9 @@ func TestVerify_ForwardLunaticAttack(t *testing.T) {
oldBlockStore.On("Height").Return(nodeHeight)
require.Equal(t, defaultEvidenceTime, oldBlockStore.LoadBlockMeta(nodeHeight).Header.Time)
pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, oldBlockStore)
pool, err = evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, oldBlockStore, evidence.NopMetrics())
require.NoError(t, err)
assert.Error(t, pool.CheckEvidence(types.EvidenceList{ev}))
assert.Error(t, pool.CheckEvidence(ctx, types.EvidenceList{ev}))
}
func TestVerifyLightClientAttack_Equivocation(t *testing.T) {
@ -282,11 +289,13 @@ func TestVerifyLightClientAttack_Equivocation(t *testing.T) {
blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: *trustedHeader})
blockStore.On("LoadBlockCommit", int64(10)).Return(trustedCommit)
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore)
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
require.NoError(t, setupEventBus(ctx, pool))
evList := types.EvidenceList{ev}
err = pool.CheckEvidence(evList)
err = pool.CheckEvidence(ctx, evList)
assert.NoError(t, err)
pendingEvs, _ := pool.PendingEvidence(state.ConsensusParams.Evidence.MaxBytes)
@ -369,11 +378,13 @@ func TestVerifyLightClientAttack_Amnesia(t *testing.T) {
blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: *trustedHeader})
blockStore.On("LoadBlockCommit", int64(10)).Return(trustedCommit)
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore)
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
require.NoError(t, setupEventBus(ctx, pool))
evList := types.EvidenceList{ev}
err = pool.CheckEvidence(evList)
err = pool.CheckEvidence(ctx, evList)
assert.NoError(t, err)
pendingEvs, _ := pool.PendingEvidence(state.ConsensusParams.Evidence.MaxBytes)
@ -467,21 +478,23 @@ func TestVerifyDuplicateVoteEvidence(t *testing.T) {
blockStore := &mocks.BlockStore{}
blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: types.Header{Time: defaultEvidenceTime}})
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore)
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
require.NoError(t, setupEventBus(ctx, pool))
evList := types.EvidenceList{goodEv}
err = pool.CheckEvidence(evList)
err = pool.CheckEvidence(ctx, evList)
assert.NoError(t, err)
// evidence with a different validator power should fail
evList = types.EvidenceList{badEv}
err = pool.CheckEvidence(evList)
err = pool.CheckEvidence(ctx, evList)
assert.Error(t, err)
// evidence with a different timestamp should fail
evList = types.EvidenceList{badTimeEv}
err = pool.CheckEvidence(evList)
err = pool.CheckEvidence(ctx, evList)
assert.Error(t, err)
}


+ 1
- 1
internal/rpc/core/evidence.go View File

@ -19,7 +19,7 @@ func (env *Environment) BroadcastEvidence(
if err := ev.Value.ValidateBasic(); err != nil {
return nil, fmt.Errorf("evidence.ValidateBasic failed: %w", err)
}
if err := env.EvidencePool.AddEvidence(ev.Value); err != nil {
if err := env.EvidencePool.AddEvidence(ctx, ev.Value); err != nil {
return nil, fmt.Errorf("failed to add evidence: %w", err)
}
return &coretypes.ResultBroadcastEvidence{Hash: ev.Value.Hash()}, nil


+ 4
- 4
internal/state/execution.go View File

@ -174,7 +174,7 @@ func (blockExec *BlockExecutor) ProcessProposal(
// If the block is invalid, it returns an error.
// Validation does not mutate state, but does require historical information from the stateDB,
// ie. to verify evidence from a validator at an old height.
func (blockExec *BlockExecutor) ValidateBlock(state State, block *types.Block) error {
func (blockExec *BlockExecutor) ValidateBlock(ctx context.Context, state State, block *types.Block) error {
hash := block.Hash()
if _, ok := blockExec.cache[hash.String()]; ok {
return nil
@ -185,7 +185,7 @@ func (blockExec *BlockExecutor) ValidateBlock(state State, block *types.Block) e
return err
}
err = blockExec.evpool.CheckEvidence(block.Evidence)
err = blockExec.evpool.CheckEvidence(ctx, block.Evidence)
if err != nil {
return err
}
@ -208,7 +208,7 @@ func (blockExec *BlockExecutor) ApplyBlock(
) (State, error) {
// validate the block if we haven't already
if err := blockExec.ValidateBlock(state, block); err != nil {
if err := blockExec.ValidateBlock(ctx, state, block); err != nil {
return state, ErrInvalidBlock(err)
}
@ -255,7 +255,7 @@ func (blockExec *BlockExecutor) ApplyBlock(
}
// Update evpool with the latest state.
blockExec.evpool.Update(state, block.Evidence)
blockExec.evpool.Update(ctx, state, block.Evidence)
// Update the app hash and save the state.
state.AppHash = appHash


+ 2
- 2
internal/state/execution_test.go View File

@ -216,8 +216,8 @@ func TestBeginBlockByzantineValidators(t *testing.T) {
evpool := &mocks.EvidencePool{}
evpool.On("PendingEvidence", mock.AnythingOfType("int64")).Return(ev, int64(100))
evpool.On("Update", mock.AnythingOfType("state.State"), mock.AnythingOfType("types.EvidenceList")).Return()
evpool.On("CheckEvidence", mock.AnythingOfType("types.EvidenceList")).Return(nil)
evpool.On("Update", ctx, mock.AnythingOfType("state.State"), mock.AnythingOfType("types.EvidenceList")).Return()
evpool.On("CheckEvidence", ctx, mock.AnythingOfType("types.EvidenceList")).Return(nil)
blockStore := store.NewBlockStore(dbm.NewMemDB())


+ 1
- 1
internal/state/helpers_test.go View File

@ -75,7 +75,7 @@ func makeAndApplyGoodBlock(
block, _, err := state.MakeBlock(height, factory.MakeTenTxs(height), lastCommit, evidence, proposerAddr)
require.NoError(t, err)
require.NoError(t, blockExec.ValidateBlock(state, block))
require.NoError(t, blockExec.ValidateBlock(ctx, state, block))
blockID := types.BlockID{Hash: block.Hash(),
PartSetHeader: types.PartSetHeader{Total: 3, Hash: tmrand.Bytes(32)}}
state, err = blockExec.ApplyBlock(ctx, state, blockID, block)


+ 16
- 13
internal/state/mocks/evidence_pool.go View File

@ -3,8 +3,11 @@
package mocks
import (
context "context"
mock "github.com/stretchr/testify/mock"
state "github.com/tendermint/tendermint/internal/state"
types "github.com/tendermint/tendermint/types"
)
@ -13,13 +16,13 @@ type EvidencePool struct {
mock.Mock
}
// AddEvidence provides a mock function with given fields: _a0
func (_m *EvidencePool) AddEvidence(_a0 types.Evidence) error {
ret := _m.Called(_a0)
// AddEvidence provides a mock function with given fields: _a0, _a1
func (_m *EvidencePool) AddEvidence(_a0 context.Context, _a1 types.Evidence) error {
ret := _m.Called(_a0, _a1)
var r0 error
if rf, ok := ret.Get(0).(func(types.Evidence) error); ok {
r0 = rf(_a0)
if rf, ok := ret.Get(0).(func(context.Context, types.Evidence) error); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Error(0)
}
@ -27,13 +30,13 @@ func (_m *EvidencePool) AddEvidence(_a0 types.Evidence) error {
return r0
}
// CheckEvidence provides a mock function with given fields: _a0
func (_m *EvidencePool) CheckEvidence(_a0 types.EvidenceList) error {
ret := _m.Called(_a0)
// CheckEvidence provides a mock function with given fields: _a0, _a1
func (_m *EvidencePool) CheckEvidence(_a0 context.Context, _a1 types.EvidenceList) error {
ret := _m.Called(_a0, _a1)
var r0 error
if rf, ok := ret.Get(0).(func(types.EvidenceList) error); ok {
r0 = rf(_a0)
if rf, ok := ret.Get(0).(func(context.Context, types.EvidenceList) error); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Error(0)
}
@ -64,7 +67,7 @@ func (_m *EvidencePool) PendingEvidence(maxBytes int64) ([]types.Evidence, int64
return r0, r1
}
// Update provides a mock function with given fields: _a0, _a1
func (_m *EvidencePool) Update(_a0 state.State, _a1 types.EvidenceList) {
_m.Called(_a0, _a1)
// Update provides a mock function with given fields: _a0, _a1, _a2
func (_m *EvidencePool) Update(_a0 context.Context, _a1 state.State, _a2 types.EvidenceList) {
_m.Called(_a0, _a1, _a2)
}

+ 10
- 6
internal/state/services.go View File

@ -1,6 +1,8 @@
package state
import (
"context"
"github.com/tendermint/tendermint/types"
)
@ -44,9 +46,9 @@ type BlockStore interface {
// EvidencePool defines the EvidencePool interface used by State.
type EvidencePool interface {
PendingEvidence(maxBytes int64) (ev []types.Evidence, size int64)
AddEvidence(types.Evidence) error
Update(State, types.EvidenceList)
CheckEvidence(types.EvidenceList) error
AddEvidence(context.Context, types.Evidence) error
Update(context.Context, State, types.EvidenceList)
CheckEvidence(context.Context, types.EvidenceList) error
}
// EmptyEvidencePool is an empty implementation of EvidencePool, useful for testing. It also complies
@ -56,7 +58,9 @@ type EmptyEvidencePool struct{}
func (EmptyEvidencePool) PendingEvidence(maxBytes int64) (ev []types.Evidence, size int64) {
return nil, 0
}
func (EmptyEvidencePool) AddEvidence(types.Evidence) error { return nil }
func (EmptyEvidencePool) Update(State, types.EvidenceList) {}
func (EmptyEvidencePool) CheckEvidence(evList types.EvidenceList) error { return nil }
func (EmptyEvidencePool) AddEvidence(context.Context, types.Evidence) error { return nil }
func (EmptyEvidencePool) Update(context.Context, State, types.EvidenceList) {}
func (EmptyEvidencePool) CheckEvidence(ctx context.Context, evList types.EvidenceList) error {
return nil
}
func (EmptyEvidencePool) ReportConflictingVotes(voteA, voteB *types.Vote) {}

+ 7
- 7
internal/state/validation_test.go View File

@ -94,7 +94,7 @@ func TestValidateBlockHeader(t *testing.T) {
block, err := statefactory.MakeBlock(state, height, lastCommit)
require.NoError(t, err)
tc.malleateBlock(block)
err = blockExec.ValidateBlock(state, block)
err = blockExec.ValidateBlock(ctx, state, block)
t.Logf("%s: %v", tc.name, err)
require.Error(t, err, tc.name)
}
@ -110,7 +110,7 @@ func TestValidateBlockHeader(t *testing.T) {
block, err := statefactory.MakeBlock(state, nextHeight, lastCommit)
require.NoError(t, err)
state.InitialHeight = nextHeight + 1
err = blockExec.ValidateBlock(state, block)
err = blockExec.ValidateBlock(ctx, state, block)
require.Error(t, err, "expected an error when state is ahead of block")
assert.Contains(t, err.Error(), "lower than initial height")
}
@ -164,7 +164,7 @@ func TestValidateBlockCommit(t *testing.T) {
)
block, err := statefactory.MakeBlock(state, height, wrongHeightCommit)
require.NoError(t, err)
err = blockExec.ValidateBlock(state, block)
err = blockExec.ValidateBlock(ctx, state, block)
_, isErrInvalidCommitHeight := err.(types.ErrInvalidCommitHeight)
require.True(t, isErrInvalidCommitHeight, "expected ErrInvalidCommitHeight at height %d but got: %v", height, err)
@ -173,7 +173,7 @@ func TestValidateBlockCommit(t *testing.T) {
*/
block, err = statefactory.MakeBlock(state, height, wrongSigsCommit)
require.NoError(t, err)
err = blockExec.ValidateBlock(state, block)
err = blockExec.ValidateBlock(ctx, state, block)
_, isErrInvalidCommitSignatures := err.(types.ErrInvalidCommitSignatures)
require.True(t, isErrInvalidCommitSignatures,
"expected ErrInvalidCommitSignatures at height %d, but got: %v",
@ -254,8 +254,8 @@ func TestValidateBlockEvidence(t *testing.T) {
defaultEvidenceTime := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)
evpool := &mocks.EvidencePool{}
evpool.On("CheckEvidence", mock.AnythingOfType("types.EvidenceList")).Return(nil)
evpool.On("Update", mock.AnythingOfType("state.State"), mock.AnythingOfType("types.EvidenceList")).Return()
evpool.On("CheckEvidence", ctx, mock.AnythingOfType("types.EvidenceList")).Return(nil)
evpool.On("Update", ctx, mock.AnythingOfType("state.State"), mock.AnythingOfType("types.EvidenceList")).Return()
evpool.On("ABCIEvidence", mock.AnythingOfType("int64"), mock.AnythingOfType("[]types.Evidence")).Return(
[]abci.Evidence{})
@ -290,7 +290,7 @@ func TestValidateBlockEvidence(t *testing.T) {
block, _, err := state.MakeBlock(height, testfactory.MakeTenTxs(height), lastCommit, evidence, proposerAddr)
require.NoError(t, err)
err = blockExec.ValidateBlock(state, block)
err = blockExec.ValidateBlock(ctx, state, block)
if assert.Error(t, err) {
_, ok := err.(*types.ErrEvidenceOverflow)
require.True(t, ok, "expected error to be of type ErrEvidenceOverflow at height %d but got %v", height, err)


+ 5
- 1
node/node.go View File

@ -20,6 +20,7 @@ import (
"github.com/tendermint/tendermint/internal/blocksync"
"github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/evidence"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/pex"
@ -264,7 +265,7 @@ func makeNode(
}
evReactor, evPool, err := createEvidenceReactor(ctx,
cfg, dbProvider, stateDB, blockStore, peerManager, router, logger,
cfg, dbProvider, stateDB, blockStore, peerManager, router, logger, nodeMetrics.evidence, eventBus,
)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
@ -689,6 +690,7 @@ type nodeMetrics struct {
proxy *proxy.Metrics
state *sm.Metrics
statesync *statesync.Metrics
evidence *evidence.Metrics
}
// metricsProvider returns consensus, p2p, mempool, state, statesync Metrics.
@ -707,6 +709,7 @@ func defaultMetricsProvider(cfg *config.InstrumentationConfig) metricsProvider {
proxy: proxy.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
state: sm.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
statesync: statesync.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
evidence: evidence.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
}
}
return &nodeMetrics{
@ -717,6 +720,7 @@ func defaultMetricsProvider(cfg *config.InstrumentationConfig) metricsProvider {
proxy: proxy.NopMetrics(),
state: sm.NopMetrics(),
statesync: statesync.NopMetrics(),
evidence: evidence.NopMetrics(),
}
}
}


+ 2
- 2
node/node_test.go View File

@ -298,7 +298,7 @@ func TestCreateProposalBlock(t *testing.T) {
// Make EvidencePool
evidenceDB := dbm.NewMemDB()
blockStore := store.NewBlockStore(dbm.NewMemDB())
evidencePool, err := evidence.NewPool(logger, evidenceDB, stateStore, blockStore)
evidencePool, err := evidence.NewPool(logger, evidenceDB, stateStore, blockStore, evidence.NopMetrics())
require.NoError(t, err)
// fill the evidence pool with more evidence
@ -357,7 +357,7 @@ func TestCreateProposalBlock(t *testing.T) {
}
assert.EqualValues(t, partSetFromHeader.ByteSize(), partSet.ByteSize())
err = blockExec.ValidateBlock(state, block)
err = blockExec.ValidateBlock(ctx, state, block)
assert.NoError(t, err)
}


+ 5
- 2
node/setup.go View File

@ -219,6 +219,8 @@ func createEvidenceReactor(
peerManager *p2p.PeerManager,
router *p2p.Router,
logger log.Logger,
metrics *evidence.Metrics,
eventBus *eventbus.EventBus,
) (*evidence.Reactor, *evidence.Pool, error) {
evidenceDB, err := dbProvider(&config.DBContext{ID: "evidence", Config: cfg})
if err != nil {
@ -227,11 +229,13 @@ func createEvidenceReactor(
logger = logger.With("module", "evidence")
evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore)
evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore, metrics)
if err != nil {
return nil, nil, fmt.Errorf("creating evidence pool: %w", err)
}
evidencePool.SetEventBus(eventBus)
evidenceReactor, err := evidence.NewReactor(
ctx,
logger,
@ -295,7 +299,6 @@ func createConsensusReactor(
// Services which will be publishing and/or subscribing for messages (events)
// consensusReactor will set it on consensusState and blockExecutor.
reactor.SetEventBus(eventBus)
return reactor, consensusState, nil
}


+ 15
- 0
types/events.go View File

@ -40,6 +40,10 @@ const (
EventTimeoutWaitValue = "TimeoutWait"
EventValidBlockValue = "ValidBlock"
EventVoteValue = "Vote"
// Events emitted by the evidence reactor when evidence is validated
// and before it is committed
EventEvidenceValidatedValue = "EvidenceValidated"
)
// Pre-populated ABCI Tendermint-reserved events
@ -104,6 +108,7 @@ func init() {
jsontypes.MustRegister(EventDataTx{})
jsontypes.MustRegister(EventDataValidatorSetUpdates{})
jsontypes.MustRegister(EventDataVote{})
jsontypes.MustRegister(EventDataEvidenceValidated{})
jsontypes.MustRegister(EventDataString(""))
}
@ -223,6 +228,15 @@ type EventDataStateSyncStatus struct {
// TypeTag implements the required method of jsontypes.Tagged.
func (EventDataStateSyncStatus) TypeTag() string { return "tendermint/event/StateSyncStatus" }
type EventDataEvidenceValidated struct {
Evidence Evidence `json:"evidence"`
Height int64 `json:"height,string"`
}
// TypeTag implements the required method of jsontypes.Tagged.
func (EventDataEvidenceValidated) TypeTag() string { return "tendermint/event/EvidenceValidated" }
// PUBSUB
const (
@ -261,6 +275,7 @@ var (
EventQueryVote = QueryForEvent(EventVoteValue)
EventQueryBlockSyncStatus = QueryForEvent(EventBlockSyncStatusValue)
EventQueryStateSyncStatus = QueryForEvent(EventStateSyncStatusValue)
EventQueryEvidenceValidated = QueryForEvent(EventEvidenceValidatedValue)
)
func EventQueryTxFor(tx Tx) *tmquery.Query {


Loading…
Cancel
Save