
evidence: p2p refactor (#5747)

Aleksandr Bezobchuk, 4 years ago (committed by GitHub)
commit e986602649 (pull/5878/head)
18 changed files with 1290 additions and 725 deletions
  1. consensus/byzantine_test.go (+1, -2)
  2. evidence/pool.go (+96, -70)
  3. evidence/pool_test.go (+122, -96)
  4. evidence/reactor.go (+313, -182)
  5. evidence/reactor_test.go (+508, -253)
  6. evidence/verify.go (+84, -27)
  7. evidence/verify_test.go (+4, -7)
  8. node/node.go (+54, -26)
  9. node/node_test.go (+1, -2)
  10. p2p/key.go (+23, -7)
  11. p2p/key_test.go (+9, -13)
  12. p2p/netaddress.go (+8, -4)
  13. p2p/peer.go (+2, -2)
  14. p2p/shim.go (+4, -2)
  15. p2p/shim_test.go (+2, -1)
  16. statesync/reactor.go (+5, -4)
  17. statesync/reactor_test.go (+1, -1)
  18. test/maverick/node/node.go (+53, -26)

consensus/byzantine_test.go (+1, -2)

@ -72,9 +72,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// Make a full instance of the evidence pool
evidenceDB := dbm.NewMemDB()
evpool, err := evidence.NewPool(evidenceDB, stateStore, blockStore)
evpool, err := evidence.NewPool(logger.With("module", "evidence"), evidenceDB, stateStore, blockStore)
require.NoError(t, err)
evpool.SetLogger(logger.With("module", "evidence"))
// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)


evidence/pool.go (+96, -70)

@ -50,30 +50,31 @@ type Pool struct {
// NewPool creates an evidence pool. If using an existing evidence store,
// it will add all pending evidence to the concurrent list.
func NewPool(evidenceDB dbm.DB, stateDB sm.Store, blockStore BlockStore) (*Pool, error) {
func NewPool(logger log.Logger, evidenceDB dbm.DB, stateDB sm.Store, blockStore BlockStore) (*Pool, error) {
state, err := stateDB.Load()
if err != nil {
return nil, fmt.Errorf("cannot load state: %w", err)
return nil, fmt.Errorf("failed to load state: %w", err)
}
pool := &Pool{
stateDB: stateDB,
blockStore: blockStore,
state: state,
logger: log.NewNopLogger(),
logger: logger,
evidenceStore: evidenceDB,
evidenceList: clist.New(),
}
// if pending evidence already in db, in event of prior failure, then check for expiration,
// update the size and load it back to the evidenceList
// If pending evidence already in db, in event of prior failure, then check
// for expiration, update the size and load it back to the evidenceList.
pool.pruningHeight, pool.pruningTime = pool.removeExpiredPendingEvidence()
evList, _, err := pool.listEvidence(prefixPending, -1)
if err != nil {
return nil, err
}
atomic.StoreUint32(&pool.evidenceSize, uint32(len(evList)))
for _, ev := range evList {
pool.evidenceList.PushBack(ev)
}
@ -81,37 +82,44 @@ func NewPool(evidenceDB dbm.DB, stateDB sm.Store, blockStore BlockStore) (*Pool,
return pool, nil
}
// PendingEvidence is used primarily as part of block proposal and returns up to maxNum of uncommitted evidence.
// PendingEvidence is used primarily as part of block proposal and returns up to
// maxNum of uncommitted evidence.
func (evpool *Pool) PendingEvidence(maxBytes int64) ([]types.Evidence, int64) {
if evpool.Size() == 0 {
return []types.Evidence{}, 0
}
evidence, size, err := evpool.listEvidence(prefixPending, maxBytes)
if err != nil {
evpool.logger.Error("Unable to retrieve pending evidence", "err", err)
evpool.logger.Error("failed to retrieve pending evidence", "err", err)
}
return evidence, size
}
// Update pulls the latest state to be used for expiration and evidence params and then prunes all expired evidence
// Update pulls the latest state to be used for expiration and evidence params
// and then prunes all expired evidence.
func (evpool *Pool) Update(state sm.State, ev types.EvidenceList) {
// sanity check
if state.LastBlockHeight <= evpool.state.LastBlockHeight {
panic(fmt.Sprintf(
"Failed EvidencePool.Update new state height is less than or equal to previous state height: %d <= %d",
"failed EvidencePool.Update new state height is less than or equal to previous state height: %d <= %d",
state.LastBlockHeight,
evpool.state.LastBlockHeight,
))
}
evpool.logger.Info("Updating evidence pool", "last_block_height", state.LastBlockHeight,
"last_block_time", state.LastBlockTime)
// update the state
evpool.updateState(state)
evpool.logger.Info(
"updating evidence pool",
"last_block_height", state.LastBlockHeight,
"last_block_time", state.LastBlockTime,
)
evpool.updateState(state)
evpool.markEvidenceAsCommitted(ev)
// prune pending evidence when it has expired. This also updates when the next evidence will expire
// Prune pending evidence when it has expired. This also updates when the next
// evidence will expire.
if evpool.Size() > 0 && state.LastBlockHeight > evpool.pruningHeight &&
state.LastBlockTime.After(evpool.pruningTime) {
evpool.pruningHeight, evpool.pruningTime = evpool.removeExpiredPendingEvidence()
@ -120,59 +128,56 @@ func (evpool *Pool) Update(state sm.State, ev types.EvidenceList) {
// AddEvidence checks the evidence is valid and adds it to the pool.
func (evpool *Pool) AddEvidence(ev types.Evidence) error {
evpool.logger.Debug("Attempting to add evidence", "ev", ev)
evpool.logger.Debug("attempting to add evidence", "evidence", ev)
// We have already verified this piece of evidence - no need to do it again
if evpool.isPending(ev) {
evpool.logger.Info("Evidence already pending, ignoring this one", "ev", ev)
evpool.logger.Info("evidence already pending; ignoring", "evidence", ev)
return nil
}
// check that the evidence isn't already committed
if evpool.isCommitted(ev) {
// this can happen if the peer that sent us the evidence is behind so we shouldn't
// punish the peer.
evpool.logger.Debug("Evidence was already committed, ignoring this one", "ev", ev)
// This can happen if the peer that sent us the evidence is behind so we
// shouldn't punish the peer.
evpool.logger.Debug("evidence was already committed; ignoring", "evidence", ev)
return nil
}
// 1) Verify against state.
err := evpool.verify(ev)
if err != nil {
return types.NewErrInvalidEvidence(ev, err)
if err := evpool.verify(ev); err != nil {
return err
}
// 2) Save to store.
if err := evpool.addPendingEvidence(ev); err != nil {
return fmt.Errorf("can't add evidence to pending list: %w", err)
return fmt.Errorf("failed to add evidence to pending list: %w", err)
}
// 3) Add evidence to clist.
evpool.evidenceList.PushBack(ev)
evpool.logger.Info("Verified new evidence of byzantine behavior", "evidence", ev)
evpool.logger.Info("verified new evidence of byzantine behavior", "evidence", ev)
return nil
}
// AddEvidenceFromConsensus should be exposed only to the consensus reactor so it can add evidence
// to the pool directly without the need for verification.
// AddEvidenceFromConsensus should be exposed only to the consensus reactor so
// it can add evidence to the pool directly without the need for verification.
func (evpool *Pool) AddEvidenceFromConsensus(ev types.Evidence) error {
// we already have this evidence, log this but don't return an error.
if evpool.isPending(ev) {
evpool.logger.Info("Evidence already pending, ignoring this one", "ev", ev)
evpool.logger.Info("evidence already pending; ignoring", "evidence", ev)
return nil
}
if err := evpool.addPendingEvidence(ev); err != nil {
return fmt.Errorf("can't add evidence to pending list: %w", err)
return fmt.Errorf("failed to add evidence to pending list: %w", err)
}
// add evidence to be gossiped with peers
evpool.evidenceList.PushBack(ev)
evpool.logger.Info("Verified new evidence of byzantine behavior", "evidence", ev)
evpool.logger.Info("verified new evidence of byzantine behavior", "evidence", ev)
return nil
}
@ -200,10 +205,10 @@ func (evpool *Pool) CheckEvidence(evList types.EvidenceList) error {
if err := evpool.addPendingEvidence(ev); err != nil {
// Something went wrong with adding the evidence but we already know it is valid
// hence we log an error and continue
evpool.logger.Error("Can't add evidence to pending list", "err", err, "ev", ev)
evpool.logger.Error("failed to add evidence to pending list", "err", err, "evidence", ev)
}
evpool.logger.Info("Verified new evidence of byzantine behavior", "evidence", ev)
evpool.logger.Info("verified new evidence of byzantine behavior", "evidence", ev)
}
// check for duplicate evidence. We cache hashes so we don't have to work them out again.
@ -223,16 +228,12 @@ func (evpool *Pool) EvidenceFront() *clist.CElement {
return evpool.evidenceList.Front()
}
// EvidenceWaitChan is a channel that closes once the first evidence in the list is there. i.e Front is not nil
// EvidenceWaitChan is a channel that closes once the first evidence in the list
// is there, i.e., Front is not nil.
func (evpool *Pool) EvidenceWaitChan() <-chan struct{} {
return evpool.evidenceList.WaitChan()
}
// SetLogger sets the Logger.
func (evpool *Pool) SetLogger(l log.Logger) {
evpool.logger = l
}
// Size returns the number of evidence in the pool.
func (evpool *Pool) Size() uint32 {
return atomic.LoadUint32(&evpool.evidenceSize)
@ -245,10 +246,9 @@ func (evpool *Pool) State() sm.State {
return evpool.state
}
//--------------------------------------------------------------------------
// fastCheck leverages the fact that the evidence pool may have already verified the evidence to see if it can
// quickly conclude that the evidence is already valid.
// fastCheck leverages the fact that the evidence pool may have already verified
// the evidence to see if it can quickly conclude that the evidence is already
// valid.
func (evpool *Pool) fastCheck(ev types.Evidence) bool {
if lcae, ok := ev.(*types.LightClientAttackEvidence); ok {
key := keyPending(ev)
@ -256,25 +256,35 @@ func (evpool *Pool) fastCheck(ev types.Evidence) bool {
if evBytes == nil { // the evidence is not in the nodes pending list
return false
}
if err != nil {
evpool.logger.Error("Failed to load light client attack evidence", "err", err, "key(height/hash)", key)
evpool.logger.Error("failed to load light client attack evidence", "err", err, "key(height/hash)", key)
return false
}
var trustedPb tmproto.LightClientAttackEvidence
err = trustedPb.Unmarshal(evBytes)
if err != nil {
evpool.logger.Error("Failed to convert light client attack evidence from bytes",
"err", err, "key(height/hash)", key)
if err = trustedPb.Unmarshal(evBytes); err != nil {
evpool.logger.Error(
"failed to convert light client attack evidence from bytes",
"key(height/hash)", key,
"err", err,
)
return false
}
trustedEv, err := types.LightClientAttackEvidenceFromProto(&trustedPb)
if err != nil {
evpool.logger.Error("Failed to convert light client attack evidence from protobuf",
"err", err, "key(height/hash)", key)
evpool.logger.Error(
"failed to convert light client attack evidence from protobuf",
"key(height/hash)", key,
"err", err,
)
return false
}
// ensure that all the byzantine validators that the evidence pool has match the byzantine validators
// in this evidence
// Ensure that all the byzantine validators that the evidence pool has match
// the byzantine validators in this evidence.
if trustedEv.ByzantineValidators == nil && lcae.ByzantineValidators != nil {
return false
}
@ -303,7 +313,8 @@ func (evpool *Pool) fastCheck(ev types.Evidence) bool {
return true
}
// for all other evidence the evidence pool just checks if it is already in the pending db
// For all other evidence the evidence pool just checks if it is already in
// the pending db.
return evpool.isPending(ev)
}
@ -324,7 +335,7 @@ func (evpool *Pool) isCommitted(evidence types.Evidence) bool {
key := keyCommitted(evidence)
ok, err := evpool.evidenceStore.Has(key)
if err != nil {
evpool.logger.Error("Unable to find committed evidence", "err", err)
evpool.logger.Error("failed to find committed evidence", "err", err)
}
return ok
}
@ -334,7 +345,7 @@ func (evpool *Pool) isPending(evidence types.Evidence) bool {
key := keyPending(evidence)
ok, err := evpool.evidenceStore.Has(key)
if err != nil {
evpool.logger.Error("Unable to find pending evidence", "err", err)
evpool.logger.Error("failed to find pending evidence", "err", err)
}
return ok
}
@ -342,20 +353,21 @@ func (evpool *Pool) isPending(evidence types.Evidence) bool {
func (evpool *Pool) addPendingEvidence(ev types.Evidence) error {
evpb, err := types.EvidenceToProto(ev)
if err != nil {
return fmt.Errorf("unable to convert to proto, err: %w", err)
return fmt.Errorf("failed to convert to proto: %w", err)
}
evBytes, err := evpb.Marshal()
if err != nil {
return fmt.Errorf("unable to marshal evidence: %w", err)
return fmt.Errorf("failed to marshal evidence: %w", err)
}
key := keyPending(ev)
err = evpool.evidenceStore.Set(key, evBytes)
if err != nil {
return fmt.Errorf("can't persist evidence: %w", err)
return fmt.Errorf("failed to persist evidence: %w", err)
}
atomic.AddUint32(&evpool.evidenceSize, 1)
return nil
}
@ -363,10 +375,10 @@ func (evpool *Pool) addPendingEvidence(ev types.Evidence) error {
func (evpool *Pool) removePendingEvidence(evidence types.Evidence) {
key := keyPending(evidence)
if err := evpool.evidenceStore.Delete(key); err != nil {
evpool.logger.Error("Unable to delete pending evidence", "err", err)
evpool.logger.Error("failed to delete pending evidence", "err", err)
} else {
atomic.AddUint32(&evpool.evidenceSize, ^uint32(0))
evpool.logger.Info("Deleted pending evidence", "evidence", evidence)
evpool.logger.Info("deleted pending evidence", "evidence", evidence)
}
}
@ -387,13 +399,15 @@ func (evpool *Pool) markEvidenceAsCommitted(evidence types.EvidenceList) {
h := gogotypes.Int64Value{Value: ev.Height()}
evBytes, err := proto.Marshal(&h)
if err != nil {
evpool.logger.Error("failed to marshal committed evidence", "err", err, "key(height/hash)", key)
evpool.logger.Error("failed to marshal committed evidence", "key(height/hash)", key, "err", err)
continue
}
if err := evpool.evidenceStore.Set(key, evBytes); err != nil {
evpool.logger.Error("Unable to save committed evidence", "err", err, "key(height/hash)", key)
evpool.logger.Error("failed to save committed evidence", "key(height/hash)", key, "err", err)
}
evpool.logger.Info("marked evidence as committed", "evidence", ev)
}
// remove committed evidence from the clist
@ -416,16 +430,19 @@ func (evpool *Pool) listEvidence(prefixKey int64, maxBytes int64) ([]types.Evide
if err != nil {
return nil, totalSize, fmt.Errorf("database error: %v", err)
}
defer iter.Close()
for ; iter.Valid(); iter.Next() {
var evpb tmproto.Evidence
err := evpb.Unmarshal(iter.Value())
if err != nil {
if err := evpb.Unmarshal(iter.Value()); err != nil {
return evidence, totalSize, err
}
evList.Evidence = append(evList.Evidence, evpb)
evSize = int64(evList.Size())
if maxBytes != -1 && evSize > maxBytes {
if err := iter.Error(); err != nil {
return evidence, totalSize, err
@ -445,39 +462,48 @@ func (evpool *Pool) listEvidence(prefixKey int64, maxBytes int64) ([]types.Evide
if err := iter.Error(); err != nil {
return evidence, totalSize, err
}
return evidence, totalSize, nil
}
func (evpool *Pool) removeExpiredPendingEvidence() (int64, time.Time) {
iter, err := dbm.IteratePrefix(evpool.evidenceStore, prefixToBytes(prefixPending))
if err != nil {
evpool.logger.Error("Unable to iterate over pending evidence", "err", err)
evpool.logger.Error("failed to iterate over pending evidence", "err", err)
return evpool.State().LastBlockHeight, evpool.State().LastBlockTime
}
defer iter.Close()
blockEvidenceMap := make(map[string]struct{})
for ; iter.Valid(); iter.Next() {
ev, err := bytesToEv(iter.Value())
if err != nil {
evpool.logger.Error("Error in transition evidence from protobuf", "err", err)
evpool.logger.Error("failed to transition evidence from protobuf", "err", err)
continue
}
if !evpool.isExpired(ev.Height(), ev.Time()) {
if len(blockEvidenceMap) != 0 {
evpool.removeEvidenceFromList(blockEvidenceMap)
}
// return the height and time with which this evidence will have expired so we know when to prune next
// Return the height and time with which this evidence will have expired
// so we know when to prune next.
return ev.Height() + evpool.State().ConsensusParams.Evidence.MaxAgeNumBlocks + 1,
ev.Time().Add(evpool.State().ConsensusParams.Evidence.MaxAgeDuration).Add(time.Second)
}
evpool.removePendingEvidence(ev)
blockEvidenceMap[evMapKey(ev)] = struct{}{}
}
// We either have no pending evidence or all evidence has expired
// we either have no pending evidence or all evidence has expired
if len(blockEvidenceMap) != 0 {
evpool.removeEvidenceFromList(blockEvidenceMap)
}
return evpool.State().LastBlockHeight, evpool.State().LastBlockTime
}

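The main API change in evidence/pool.go is that NewPool now takes the logger as its first argument and the separate SetLogger method is removed. Below is a minimal sketch of how a caller might construct a pool under the new signature; it is not part of the commit, and the helper name newEvidencePool is illustrative (the state store and block store are assumed to come from the caller's setup).

package main

import (
	"fmt"
	"os"

	dbm "github.com/tendermint/tm-db"

	"github.com/tendermint/tendermint/evidence"
	"github.com/tendermint/tendermint/libs/log"
	sm "github.com/tendermint/tendermint/state"
)

// newEvidencePool is a hypothetical helper showing the post-refactor wiring:
// the logger is passed directly to evidence.NewPool instead of being set
// afterwards via SetLogger (which this commit removes).
func newEvidencePool(stateStore sm.Store, blockStore evidence.BlockStore) (*evidence.Pool, error) {
	logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout)).With("module", "evidence")

	pool, err := evidence.NewPool(logger, dbm.NewMemDB(), stateStore, blockStore)
	if err != nil {
		return nil, fmt.Errorf("failed to create evidence pool: %w", err)
	}

	return pool, nil
}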

evidence/pool_test.go (+122, -96)

@ -1,7 +1,6 @@
package evidence_test
import (
"os"
"testing"
"time"
@ -23,12 +22,6 @@ import (
"github.com/tendermint/tendermint/version"
)
func TestMain(m *testing.M) {
code := m.Run()
os.Exit(code)
}
const evidenceChainID = "test_chain"
var (
@ -52,14 +45,13 @@ func TestEvidencePoolBasic(t *testing.T) {
stateStore.On("LoadValidators", mock.AnythingOfType("int64")).Return(valSet, nil)
stateStore.On("Load").Return(createState(height+1, valSet), nil)
pool, err := evidence.NewPool(evidenceDB, stateStore, blockStore)
pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore)
require.NoError(t, err)
pool.SetLogger(log.TestingLogger())
// evidence not seen yet:
evs, size := pool.PendingEvidence(defaultEvidenceMaxBytes)
assert.Equal(t, 0, len(evs))
assert.Zero(t, size)
require.Equal(t, 0, len(evs))
require.Zero(t, size)
ev := types.NewMockDuplicateVoteEvidenceWithValidator(height, defaultEvidenceTime, privVals[0], evidenceChainID)
@ -71,7 +63,7 @@ func TestEvidencePoolBasic(t *testing.T) {
}()
// evidence seen but not yet committed:
assert.NoError(t, pool.AddEvidence(ev))
require.NoError(t, pool.AddEvidence(ev))
select {
case <-evAdded:
@ -80,18 +72,17 @@ func TestEvidencePoolBasic(t *testing.T) {
}
next := pool.EvidenceFront()
assert.Equal(t, ev, next.Value.(types.Evidence))
require.Equal(t, ev, next.Value.(types.Evidence))
const evidenceBytes int64 = 372
evs, size = pool.PendingEvidence(evidenceBytes)
assert.Equal(t, 1, len(evs))
assert.Equal(t, evidenceBytes, size) // check that the size of the single evidence in bytes is correct
require.Equal(t, 1, len(evs))
require.Equal(t, evidenceBytes, size) // check that the size of the single evidence in bytes is correct
// shouldn't be able to add evidence twice
assert.NoError(t, pool.AddEvidence(ev))
require.NoError(t, pool.AddEvidence(ev))
evs, _ = pool.PendingEvidence(defaultEvidenceMaxBytes)
assert.Equal(t, 1, len(evs))
require.Equal(t, 1, len(evs))
}
// Tests inbound evidence for the right time and height
@ -99,7 +90,7 @@ func TestAddExpiredEvidence(t *testing.T) {
var (
val = types.NewMockPV()
height = int64(30)
stateStore = initializeValidatorState(val, height)
stateStore = initializeValidatorState(t, val, height)
evidenceDB = dbm.NewMemDB()
blockStore = &mocks.BlockStore{}
expiredEvidenceTime = time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)
@ -113,7 +104,7 @@ func TestAddExpiredEvidence(t *testing.T) {
return &types.BlockMeta{Header: types.Header{Time: expiredEvidenceTime}}
})
pool, err := evidence.NewPool(evidenceDB, stateStore, blockStore)
pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore)
require.NoError(t, err)
testCases := []struct {
@ -132,13 +123,14 @@ func TestAddExpiredEvidence(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.evDescription, func(t *testing.T) {
ev := types.NewMockDuplicateVoteEvidenceWithValidator(tc.evHeight, tc.evTime, val, evidenceChainID)
err := pool.AddEvidence(ev)
if tc.expErr {
assert.Error(t, err)
require.Error(t, err)
} else {
assert.NoError(t, err)
require.NoError(t, err)
}
})
}
@ -146,48 +138,59 @@ func TestAddExpiredEvidence(t *testing.T) {
func TestAddEvidenceFromConsensus(t *testing.T) {
var height int64 = 10
pool, val := defaultTestPool(height)
pool, val := defaultTestPool(t, height)
ev := types.NewMockDuplicateVoteEvidenceWithValidator(height, defaultEvidenceTime, val, evidenceChainID)
err := pool.AddEvidenceFromConsensus(ev)
assert.NoError(t, err)
require.NoError(t, pool.AddEvidenceFromConsensus(ev))
next := pool.EvidenceFront()
assert.Equal(t, ev, next.Value.(types.Evidence))
require.Equal(t, ev, next.Value.(types.Evidence))
// shouldn't be able to submit the same evidence twice
err = pool.AddEvidenceFromConsensus(ev)
assert.NoError(t, err)
require.NoError(t, pool.AddEvidenceFromConsensus(ev))
evs, _ := pool.PendingEvidence(defaultEvidenceMaxBytes)
assert.Equal(t, 1, len(evs))
require.Equal(t, 1, len(evs))
}
func TestEvidencePoolUpdate(t *testing.T) {
height := int64(21)
pool, val := defaultTestPool(height)
pool, val := defaultTestPool(t, height)
state := pool.State()
// create new block (no need to save it to blockStore)
prunedEv := types.NewMockDuplicateVoteEvidenceWithValidator(1, defaultEvidenceTime.Add(1*time.Minute),
val, evidenceChainID)
err := pool.AddEvidence(prunedEv)
require.NoError(t, err)
ev := types.NewMockDuplicateVoteEvidenceWithValidator(height, defaultEvidenceTime.Add(21*time.Minute),
val, evidenceChainID)
prunedEv := types.NewMockDuplicateVoteEvidenceWithValidator(
1,
defaultEvidenceTime.Add(1*time.Minute),
val,
evidenceChainID,
)
require.NoError(t, pool.AddEvidence(prunedEv))
ev := types.NewMockDuplicateVoteEvidenceWithValidator(
height,
defaultEvidenceTime.Add(21*time.Minute),
val,
evidenceChainID,
)
lastCommit := makeCommit(height, val.PrivKey.PubKey().Address())
block := types.MakeBlock(height+1, []types.Tx{}, lastCommit, []types.Evidence{ev})
// update state (partially)
state.LastBlockHeight = height + 1
state.LastBlockTime = defaultEvidenceTime.Add(22 * time.Minute)
err = pool.CheckEvidence(types.EvidenceList{ev})
require.NoError(t, err)
require.NoError(t, pool.CheckEvidence(types.EvidenceList{ev}))
pool.Update(state, block.Evidence.Evidence)
// a) Update marks evidence as committed so pending evidence should be empty
evList, evSize := pool.PendingEvidence(defaultEvidenceMaxBytes)
assert.Empty(t, evList)
assert.Zero(t, evSize)
require.Empty(t, evList)
require.Zero(t, evSize)
// b) If we try to check this evidence again it should fail because it has already been committed
err = pool.CheckEvidence(types.EvidenceList{ev})
err := pool.CheckEvidence(types.EvidenceList{ev})
if assert.Error(t, err) {
assert.Equal(t, "evidence was already committed", err.(*types.ErrInvalidEvidence).Reason.Error())
}
@ -195,21 +198,29 @@ func TestEvidencePoolUpdate(t *testing.T) {
func TestVerifyPendingEvidencePasses(t *testing.T) {
var height int64 = 1
pool, val := defaultTestPool(height)
ev := types.NewMockDuplicateVoteEvidenceWithValidator(height, defaultEvidenceTime.Add(1*time.Minute),
val, evidenceChainID)
err := pool.AddEvidence(ev)
require.NoError(t, err)
err = pool.CheckEvidence(types.EvidenceList{ev})
assert.NoError(t, err)
pool, val := defaultTestPool(t, height)
ev := types.NewMockDuplicateVoteEvidenceWithValidator(
height,
defaultEvidenceTime.Add(1*time.Minute),
val,
evidenceChainID,
)
require.NoError(t, pool.AddEvidence(ev))
require.NoError(t, pool.CheckEvidence(types.EvidenceList{ev}))
}
func TestVerifyDuplicatedEvidenceFails(t *testing.T) {
var height int64 = 1
pool, val := defaultTestPool(height)
ev := types.NewMockDuplicateVoteEvidenceWithValidator(height, defaultEvidenceTime.Add(1*time.Minute),
val, evidenceChainID)
pool, val := defaultTestPool(t, height)
ev := types.NewMockDuplicateVoteEvidenceWithValidator(
height,
defaultEvidenceTime.Add(1*time.Minute),
val,
evidenceChainID,
)
err := pool.CheckEvidence(types.EvidenceList{ev, ev})
if assert.Error(t, err) {
assert.Equal(t, "duplicate evidence", err.(*types.ErrInvalidEvidence).Reason.Error())
@ -224,6 +235,7 @@ func TestCheckEvidenceWithLightClientAttack(t *testing.T) {
validatorPower int64 = 10
height int64 = 10
)
conflictingVals, conflictingPrivVals := types.RandValidatorSet(nValidators, validatorPower)
trustedHeader := makeHeaderRandom(height)
trustedHeader.Time = defaultEvidenceTime
@ -237,12 +249,14 @@ func TestCheckEvidenceWithLightClientAttack(t *testing.T) {
trustedHeader.AppHash = conflictingHeader.AppHash
trustedHeader.LastResultsHash = conflictingHeader.LastResultsHash
// for simplicity we are simulating a duplicate vote attack where all the validators in the
// conflictingVals set voted twice
// For simplicity we are simulating a duplicate vote attack where all the
// validators in the conflictingVals set voted twice.
blockID := makeBlockID(conflictingHeader.Hash(), 1000, []byte("partshash"))
voteSet := types.NewVoteSet(evidenceChainID, height, 1, tmproto.SignedMsgType(2), conflictingVals)
commit, err := types.MakeCommit(blockID, height, 1, voteSet, conflictingPrivVals, defaultEvidenceTime)
require.NoError(t, err)
ev := &types.LightClientAttackEvidence{
ConflictingBlock: &types.LightBlock{
SignedHeader: &types.SignedHeader{
@ -259,8 +273,14 @@ func TestCheckEvidenceWithLightClientAttack(t *testing.T) {
trustedBlockID := makeBlockID(trustedHeader.Hash(), 1000, []byte("partshash"))
trustedVoteSet := types.NewVoteSet(evidenceChainID, height, 1, tmproto.SignedMsgType(2), conflictingVals)
trustedCommit, err := types.MakeCommit(trustedBlockID, height, 1, trustedVoteSet, conflictingPrivVals,
defaultEvidenceTime)
trustedCommit, err := types.MakeCommit(
trustedBlockID,
height,
1,
trustedVoteSet,
conflictingPrivVals,
defaultEvidenceTime,
)
require.NoError(t, err)
state := sm.State{
@ -268,28 +288,25 @@ func TestCheckEvidenceWithLightClientAttack(t *testing.T) {
LastBlockHeight: 11,
ConsensusParams: *types.DefaultConsensusParams(),
}
stateStore := &smmocks.Store{}
stateStore.On("LoadValidators", height).Return(conflictingVals, nil)
stateStore.On("Load").Return(state, nil)
blockStore := &mocks.BlockStore{}
blockStore.On("LoadBlockMeta", height).Return(&types.BlockMeta{Header: *trustedHeader})
blockStore.On("LoadBlockCommit", height).Return(trustedCommit)
pool, err := evidence.NewPool(dbm.NewMemDB(), stateStore, blockStore)
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore)
require.NoError(t, err)
pool.SetLogger(log.TestingLogger())
err = pool.AddEvidence(ev)
assert.NoError(t, err)
require.NoError(t, pool.AddEvidence(ev))
require.NoError(t, pool.CheckEvidence(types.EvidenceList{ev}))
err = pool.CheckEvidence(types.EvidenceList{ev})
assert.NoError(t, err)
// take away the last signature -> there are less validators then what we have detected,
// hence this should fail
// Take away the last signature -> there are fewer validators than what we have detected,
// hence this should fail.
commit.Signatures = append(commit.Signatures[:nValidators-1], types.NewCommitSigAbsent())
err = pool.CheckEvidence(types.EvidenceList{ev})
assert.Error(t, err)
require.Error(t, pool.CheckEvidence(types.EvidenceList{ev}))
}
// Tests that restarting the evidence pool after a potential failure will recover the
@ -299,23 +316,33 @@ func TestRecoverPendingEvidence(t *testing.T) {
val := types.NewMockPV()
valAddress := val.PrivKey.PubKey().Address()
evidenceDB := dbm.NewMemDB()
stateStore := initializeValidatorState(val, height)
stateStore := initializeValidatorState(t, val, height)
state, err := stateStore.Load()
require.NoError(t, err)
blockStore := initializeBlockStore(dbm.NewMemDB(), state, valAddress)
// create previous pool and populate it
pool, err := evidence.NewPool(evidenceDB, stateStore, blockStore)
require.NoError(t, err)
pool.SetLogger(log.TestingLogger())
goodEvidence := types.NewMockDuplicateVoteEvidenceWithValidator(height,
defaultEvidenceTime.Add(10*time.Minute), val, evidenceChainID)
expiredEvidence := types.NewMockDuplicateVoteEvidenceWithValidator(int64(1),
defaultEvidenceTime.Add(1*time.Minute), val, evidenceChainID)
err = pool.AddEvidence(goodEvidence)
require.NoError(t, err)
err = pool.AddEvidence(expiredEvidence)
pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore)
require.NoError(t, err)
goodEvidence := types.NewMockDuplicateVoteEvidenceWithValidator(
height,
defaultEvidenceTime.Add(10*time.Minute),
val,
evidenceChainID,
)
expiredEvidence := types.NewMockDuplicateVoteEvidenceWithValidator(
int64(1),
defaultEvidenceTime.Add(1*time.Minute),
val,
evidenceChainID,
)
require.NoError(t, pool.AddEvidence(goodEvidence))
require.NoError(t, pool.AddEvidence(expiredEvidence))
// now recover from the previous pool at a different time
newStateStore := &smmocks.Store{}
newStateStore.On("Load").Return(sm.State{
@ -333,16 +360,18 @@ func TestRecoverPendingEvidence(t *testing.T) {
},
},
}, nil)
newPool, err := evidence.NewPool(evidenceDB, newStateStore, blockStore)
assert.NoError(t, err)
newPool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, newStateStore, blockStore)
require.NoError(t, err)
evList, _ := newPool.PendingEvidence(defaultEvidenceMaxBytes)
assert.Equal(t, 1, len(evList))
next := newPool.EvidenceFront()
assert.Equal(t, goodEvidence, next.Value.(types.Evidence))
require.Equal(t, 1, len(evList))
next := newPool.EvidenceFront()
require.Equal(t, goodEvidence, next.Value.(types.Evidence))
}
func initializeStateFromValidatorSet(valSet *types.ValidatorSet, height int64) sm.Store {
func initializeStateFromValidatorSet(t *testing.T, valSet *types.ValidatorSet, height int64) sm.Store {
stateDB := dbm.NewMemDB()
stateStore := sm.NewStore(stateDB)
state := sm.State{
@ -370,16 +399,13 @@ func initializeStateFromValidatorSet(valSet *types.ValidatorSet, height int64) s
// save all states up to height
for i := int64(0); i <= height; i++ {
state.LastBlockHeight = i
if err := stateStore.Save(state); err != nil {
panic(err)
}
require.NoError(t, stateStore.Save(state))
}
return stateStore
}
func initializeValidatorState(privVal types.PrivValidator, height int64) sm.Store {
func initializeValidatorState(t *testing.T, privVal types.PrivValidator, height int64) sm.Store {
pubKey, _ := privVal.GetPubKey()
validator := &types.Validator{Address: pubKey.Address(), VotingPower: 10, PubKey: pubKey}
@ -389,7 +415,7 @@ func initializeValidatorState(privVal types.PrivValidator, height int64) sm.Stor
Proposer: validator,
}
return initializeStateFromValidatorSet(valSet, height)
return initializeStateFromValidatorSet(t, valSet, height)
}
// initializeBlockStore creates a block storage and populates it w/ a dummy
@ -420,21 +446,21 @@ func makeCommit(height int64, valAddr []byte) *types.Commit {
Timestamp: defaultEvidenceTime,
Signature: []byte("Signature"),
}}
return types.NewCommit(height, 0, types.BlockID{}, commitSigs)
}
func defaultTestPool(height int64) (*evidence.Pool, types.MockPV) {
func defaultTestPool(t *testing.T, height int64) (*evidence.Pool, types.MockPV) {
val := types.NewMockPV()
valAddress := val.PrivKey.PubKey().Address()
evidenceDB := dbm.NewMemDB()
stateStore := initializeValidatorState(val, height)
stateStore := initializeValidatorState(t, val, height)
state, _ := stateStore.Load()
blockStore := initializeBlockStore(dbm.NewMemDB(), state, valAddress)
pool, err := evidence.NewPool(evidenceDB, stateStore, blockStore)
if err != nil {
panic("test evidence pool could not be created")
}
pool.SetLogger(log.TestingLogger())
pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore)
require.NoError(t, err, "test evidence pool could not be created")
return pool, val
}

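A pattern worth noting from the test changes: the helpers (initializeValidatorState, defaultTestPool, and friends) now take the *testing.T so setup failures surface through require rather than panics, and assertions were switched from assert to require. A hypothetical test using the updated helpers might look like the sketch below; TestPoolHelperUsage is not part of the commit and relies on the package-local helpers shown in this diff.

package evidence_test

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/tendermint/tendermint/types"
)

// TestPoolHelperUsage is an illustrative sketch only; it uses the
// package-local helpers defaultTestPool, defaultEvidenceTime, and
// evidenceChainID shown elsewhere in this diff.
func TestPoolHelperUsage(t *testing.T) {
	var height int64 = 10

	// The helper now takes t and fails the test via require on setup errors.
	pool, val := defaultTestPool(t, height)

	ev := types.NewMockDuplicateVoteEvidenceWithValidator(height, defaultEvidenceTime, val, evidenceChainID)
	require.NoError(t, pool.AddEvidence(ev))
	require.Equal(t, uint32(1), pool.Size())
}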

evidence/reactor.go (+313, -182)

@ -2,17 +2,42 @@ package evidence
import (
"fmt"
"sync"
"time"
clist "github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
tmsync "github.com/tendermint/tendermint/libs/sync"
"github.com/tendermint/tendermint/p2p"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
)
var (
_ service.Service = (*Reactor)(nil)
// ChannelShims contains a map of ChannelDescriptorShim objects, where each
// object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding
// p2p proto.Message the new p2p Channel is responsible for handling.
//
// TODO: Remove once p2p refactor is complete.
// ref: https://github.com/tendermint/tendermint/issues/5670
ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
EvidenceChannel: {
MsgType: new(tmproto.EvidenceList),
Descriptor: &p2p.ChannelDescriptor{
ID: byte(EvidenceChannel),
Priority: 6,
RecvMessageCapacity: maxMsgSize,
},
},
}
)
const (
EvidenceChannel = byte(0x38)
EvidenceChannel = p2p.ChannelID(0x38)
maxMsgSize = 1048576 // 1MB TODO make it configurable
@ -21,233 +46,339 @@ const (
// Most evidence should be committed in the very next block that is why we wait
// just over the block production rate before sending evidence again.
broadcastEvidenceIntervalS = 10
// If a message fails wait this much before sending it again
peerRetryMessageIntervalMS = 100
)
type closer struct {
closeOnce sync.Once
doneCh chan struct{}
}
func newCloser() *closer {
return &closer{doneCh: make(chan struct{})}
}
func (c *closer) close() {
c.closeOnce.Do(func() {
close(c.doneCh)
})
}
// Reactor handles evpool evidence broadcasting amongst peers.
type Reactor struct {
p2p.BaseReactor
evpool *Pool
eventBus *types.EventBus
service.BaseService
evpool *Pool
eventBus *types.EventBus
evidenceCh *p2p.Channel
peerUpdates *p2p.PeerUpdatesCh
closeCh chan struct{}
peerWG sync.WaitGroup
mtx tmsync.Mutex
peerRoutines map[p2p.NodeID]*closer
}
// NewReactor returns a new Reactor with the given config and evpool.
func NewReactor(evpool *Pool) *Reactor {
evR := &Reactor{
evpool: evpool,
// NewReactor returns a reference to a new evidence reactor, which implements the
// service.Service interface. It accepts a p2p Channel dedicated for handling
// envelopes with EvidenceList messages.
func NewReactor(
logger log.Logger,
evidenceCh *p2p.Channel,
peerUpdates *p2p.PeerUpdatesCh,
evpool *Pool,
) *Reactor {
r := &Reactor{
evpool: evpool,
evidenceCh: evidenceCh,
peerUpdates: peerUpdates,
closeCh: make(chan struct{}),
peerRoutines: make(map[p2p.NodeID]*closer),
}
evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR)
return evR
r.BaseService = *service.NewBaseService(logger, "Evidence", r)
return r
}
// SetLogger sets the Logger on the reactor and the underlying Evidence.
func (evR *Reactor) SetLogger(l log.Logger) {
evR.Logger = l
evR.evpool.SetLogger(l)
// SetEventBus implements events.Eventable.
func (r *Reactor) SetEventBus(b *types.EventBus) {
r.eventBus = b
}
// GetChannels implements Reactor.
// It returns the list of channels for this reactor.
func (evR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
{
ID: EvidenceChannel,
Priority: 6,
RecvMessageCapacity: maxMsgSize,
},
// OnStart starts separate go routines for each p2p Channel and listens for
// envelopes on each. In addition, it also listens for peer updates and handles
// messages on that p2p channel accordingly. The caller must be sure to execute
// OnStop to ensure the outbound p2p Channels are closed. No error is returned.
func (r *Reactor) OnStart() error {
go r.processEvidenceCh()
go r.processPeerUpdates()
return nil
}
// OnStop stops the reactor by signaling to all spawned goroutines to exit and
// blocking until they all exit.
func (r *Reactor) OnStop() {
r.mtx.Lock()
for _, c := range r.peerRoutines {
c.close()
}
r.mtx.Unlock()
// Wait for all spawned peer evidence broadcasting goroutines to gracefully
// exit.
r.peerWG.Wait()
// Close closeCh to signal to all spawned goroutines to gracefully exit. All
// p2p Channels should execute Close().
close(r.closeCh)
// Wait for all p2p Channels to be closed before returning. This ensures we
// can easily reason about synchronization of all p2p Channels and ensure no
// panics will occur.
<-r.evidenceCh.Done()
<-r.peerUpdates.Done()
}
// AddPeer implements Reactor.
func (evR *Reactor) AddPeer(peer p2p.Peer) {
go evR.broadcastEvidenceRoutine(peer)
// handleEvidenceMessage handles envelopes sent from peers on the EvidenceChannel.
// It returns an error only if the Envelope.Message is unknown for this channel
// or if the given evidence is invalid. This should never be called outside of
// handleMessage.
func (r *Reactor) handleEvidenceMessage(envelope p2p.Envelope) error {
logger := r.Logger.With("peer", envelope.From)
switch msg := envelope.Message.(type) {
case *tmproto.EvidenceList:
logger.Debug("received evidence list", "num_evidence", len(msg.Evidence))
// TODO: Refactor the Evidence type to not contain a list since we only ever
// send and receive one piece of evidence at a time. Or potentially consider
// batching evidence.
//
// see: https://github.com/tendermint/tendermint/issues/4729
for i := 0; i < len(msg.Evidence); i++ {
ev, err := types.EvidenceFromProto(&msg.Evidence[i])
if err != nil {
logger.Error("failed to convert evidence", "err", err)
continue
}
if err := r.evpool.AddEvidence(ev); err != nil {
// If we're given invalid evidence by the peer, notify the router that
// we should remove this peer by returning an error.
if _, ok := err.(*types.ErrInvalidEvidence); ok {
return err
}
}
}
default:
return fmt.Errorf("received unknown message: %T", msg)
}
return nil
}
// Receive implements Reactor.
// It adds any received evidence to the evpool.
// XXX: do not call any methods that can block or incur heavy processing.
// https://github.com/tendermint/tendermint/issues/2888
func (evR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
evis, err := decodeMsg(msgBytes)
if err != nil {
evR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
evR.Switch.StopPeerForError(src, err)
return
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e)
}
}()
switch chID {
case EvidenceChannel:
err = r.handleEvidenceMessage(envelope)
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
}
for _, ev := range evis {
err := evR.evpool.AddEvidence(ev)
switch err.(type) {
case *types.ErrInvalidEvidence:
evR.Logger.Error(err.Error())
// punish peer
evR.Switch.StopPeerForError(src, err)
return err
}
// processEvidenceCh implements a blocking event loop where we listen for p2p
// Envelope messages from the evidenceCh.
func (r *Reactor) processEvidenceCh() {
defer r.evidenceCh.Close()
for {
select {
case envelope := <-r.evidenceCh.In():
if err := r.handleMessage(r.evidenceCh.ID(), envelope); err != nil {
r.Logger.Error("failed to process message", "ch_id", r.evidenceCh.ID(), "envelope", envelope, "err", err)
r.evidenceCh.Error() <- p2p.PeerError{
PeerID: envelope.From,
Err: err,
Severity: p2p.PeerErrorSeverityLow,
}
}
case <-r.closeCh:
r.Logger.Debug("stopped listening on evidence channel; closing...")
return
case nil:
default:
// continue to the next piece of evidence
evR.Logger.Error("Evidence has not been added", "evidence", evis, "err", err)
}
}
}
// SetEventBus implements events.Eventable.
func (evR *Reactor) SetEventBus(b *types.EventBus) {
evR.eventBus = b
// processPeerUpdate processes a PeerUpdate, returning an error upon failing to
// handle the PeerUpdate or if a panic is recovered. For new or live peers it
// will check if an evidence broadcasting goroutine needs to be started. For
// down or removed peers, it will check if an evidence broadcasting goroutine
// exists and signal that it should exit.
//
// FIXME: The peer may be behind in which case it would simply ignore the
// evidence and treat it as invalid. This would cause the peer to disconnect.
// The peer may also receive the same piece of evidence multiple times if it
// connects/disconnects frequently from the broadcasting peer(s).
//
// REF: https://github.com/tendermint/tendermint/issues/4727
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
r.Logger.Debug("received peer update", "peer", peerUpdate.PeerID, "status", peerUpdate.Status)
r.mtx.Lock()
defer r.mtx.Unlock()
switch peerUpdate.Status {
case p2p.PeerStatusUp:
// Do not allow starting new evidence broadcast loops after reactor shutdown
// has been initiated. This can happen after we've manually closed all
// peer broadcast loops and closed r.closeCh, but the router still sends
// in-flight peer updates.
if !r.IsRunning() {
return
}
// Check if we've already started a goroutine for this peer, if not we create
// a new done channel so we can explicitly close the goroutine if the peer
// is later removed, we increment the waitgroup so the reactor can stop
// safely, and finally start the goroutine to broadcast evidence to that peer.
_, ok := r.peerRoutines[peerUpdate.PeerID]
if !ok {
closer := newCloser()
r.peerRoutines[peerUpdate.PeerID] = closer
r.peerWG.Add(1)
go r.broadcastEvidenceLoop(peerUpdate.PeerID, closer)
}
case p2p.PeerStatusDown, p2p.PeerStatusRemoved, p2p.PeerStatusBanned:
// Check if we've started an evidence broadcasting goroutine for this peer.
// If we have, we signal to terminate the goroutine via the channel's closure.
// This will internally decrement the peer waitgroup and remove the peer
// from the map of peer evidence broadcasting goroutines.
closer, ok := r.peerRoutines[peerUpdate.PeerID]
if ok {
closer.close()
}
}
}
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates() {
defer r.peerUpdates.Close()
for {
select {
case peerUpdate := <-r.peerUpdates.Updates():
r.processPeerUpdate(peerUpdate)
case <-r.closeCh:
r.Logger.Debug("stopped listening on peer updates channel; closing...")
return
}
}
}
// Modeled after the mempool routine.
// - Evidence accumulates in a clist.
// - Each peer has a routine that iterates through the clist,
// sending available evidence to the peer.
// - If we're waiting for new evidence and the list is not empty,
// start iterating from the beginning again.
func (evR *Reactor) broadcastEvidenceRoutine(peer p2p.Peer) {
// broadcastEvidenceLoop starts a blocking process that continuously reads pieces
// of evidence off of a linked-list and sends the evidence in a p2p Envelope to
// the given peer by ID. This should be invoked in a goroutine per unique peer
// ID via an appropriate PeerUpdate. The goroutine can be signaled to gracefully
// exit by either explicitly closing the provided doneCh or by the reactor
// signaling to stop.
//
// TODO: This should be refactored so that we do not blindly gossip evidence
// that the peer has already received or may not be ready for.
//
// REF: https://github.com/tendermint/tendermint/issues/4727
func (r *Reactor) broadcastEvidenceLoop(peerID p2p.NodeID, closer *closer) {
var next *clist.CElement
defer func() {
r.mtx.Lock()
delete(r.peerRoutines, peerID)
r.mtx.Unlock()
r.peerWG.Done()
if e := recover(); e != nil {
r.Logger.Error("recovering from broadcasting evidence loop", "err", e)
}
}()
for {
// This happens because the CElement we were looking at got garbage
// collected (removed). That is, .NextWait() returned nil. Go ahead and
// start from the beginning.
// collected (removed). That is, .NextWaitChan() returned nil. So we can go
// ahead and start from the beginning.
if next == nil {
select {
case <-evR.evpool.EvidenceWaitChan(): // Wait until evidence is available
if next = evR.evpool.EvidenceFront(); next == nil {
case <-r.evpool.EvidenceWaitChan(): // wait until next evidence is available
if next = r.evpool.EvidenceFront(); next == nil {
continue
}
case <-peer.Quit():
case <-closer.doneCh:
// The peer is marked for removal via a PeerUpdate as the doneCh was
// explicitly closed to signal we should exit.
return
case <-evR.Quit():
case <-r.closeCh:
// The reactor has signaled that we are stopped and thus we should
// implicitly exit this peer's goroutine.
return
}
}
ev := next.Value.(types.Evidence)
evis := evR.prepareEvidenceMessage(peer, ev)
if len(evis) > 0 {
msgBytes, err := encodeMsg(evis)
if err != nil {
panic(err)
}
evR.Logger.Debug("Gossiping evidence to peer", "ev", ev, "peer", peer.ID())
success := peer.Send(EvidenceChannel, msgBytes)
if !success {
time.Sleep(peerRetryMessageIntervalMS * time.Millisecond)
continue
}
evProto, err := types.EvidenceToProto(ev)
if err != nil {
panic(fmt.Errorf("failed to convert evidence: %w", err))
}
// Send the evidence to the corresponding peer. Note, the peer may be behind
// and thus would not be able to process the evidence correctly. Also, the
// peer may receive this piece of evidence multiple times if it added and
// removed frequently from the broadcasting peer.
r.evidenceCh.Out() <- p2p.Envelope{
To: peerID,
Message: &tmproto.EvidenceList{
Evidence: []tmproto.Evidence{*evProto},
},
}
r.Logger.Debug("gossiped evidence to peer", "evidence", ev, "peer", peerID)
afterCh := time.After(time.Second * broadcastEvidenceIntervalS)
select {
case <-afterCh:
// start from the beginning every tick.
// TODO: only do this if we're at the end of the list!
case <-time.After(time.Second * broadcastEvidenceIntervalS):
// start from the beginning after broadcastEvidenceIntervalS seconds
next = nil
case <-next.NextWaitChan():
// see the start of the for loop for nil check
next = next.Next()
case <-peer.Quit():
return
case <-evR.Quit():
return
}
}
}
// Returns the message to send to the peer, or nil if the evidence is invalid for the peer.
// If message is nil, we should sleep and try again.
func (evR Reactor) prepareEvidenceMessage(
peer p2p.Peer,
ev types.Evidence,
) (evis []types.Evidence) {
// make sure the peer is up to date
evHeight := ev.Height()
peerState, ok := peer.Get(types.PeerStateKey).(PeerState)
if !ok {
// Peer does not have a state yet. We set it in the consensus reactor, but
// when we add peer in Switch, the order we call reactors#AddPeer is
// different every time due to us using a map. Sometimes other reactors
// will be initialized before the consensus reactor. We should wait a few
// milliseconds and retry.
return nil
}
// NOTE: We only send evidence to peers where
// peerHeight - maxAge < evidenceHeight < peerHeight
var (
peerHeight = peerState.GetHeight()
params = evR.evpool.State().ConsensusParams.Evidence
ageNumBlocks = peerHeight - evHeight
)
if peerHeight <= evHeight { // peer is behind. sleep while he catches up
return nil
} else if ageNumBlocks > params.MaxAgeNumBlocks { // evidence is too old relative to the peer, skip
// NOTE: if evidence is too old for an honest peer, then we're behind and
// either it already got committed or it never will!
evR.Logger.Info("Not sending peer old evidence",
"peerHeight", peerHeight,
"evHeight", evHeight,
"maxAgeNumBlocks", params.MaxAgeNumBlocks,
"lastBlockTime", evR.evpool.State().LastBlockTime,
"maxAgeDuration", params.MaxAgeDuration,
"peer", peer,
)
return nil
}
// send evidence
return []types.Evidence{ev}
}
// PeerState describes the state of a peer.
type PeerState interface {
GetHeight() int64
}
// encodemsg takes a array of evidence
// returns the byte encoding of the List Message
func encodeMsg(evis []types.Evidence) ([]byte, error) {
evi := make([]tmproto.Evidence, len(evis))
for i := 0; i < len(evis); i++ {
ev, err := types.EvidenceToProto(evis[i])
if err != nil {
return nil, err
}
evi[i] = *ev
}
epl := tmproto.EvidenceList{
Evidence: evi,
}
return epl.Marshal()
}
// decodemsg takes an array of bytes
// returns an array of evidence
func decodeMsg(bz []byte) (evis []types.Evidence, err error) {
lm := tmproto.EvidenceList{}
if err := lm.Unmarshal(bz); err != nil {
return nil, err
}
evis = make([]types.Evidence, len(lm.Evidence))
for i := 0; i < len(lm.Evidence); i++ {
ev, err := types.EvidenceFromProto(&lm.Evidence[i])
if err != nil {
return nil, err
}
evis[i] = ev
}
case <-closer.doneCh:
// The peer is marked for removal via a PeerUpdate as the doneCh was
// explicitly closed to signal we should exit.
return
for i, ev := range evis {
if err := ev.ValidateBasic(); err != nil {
return nil, fmt.Errorf("invalid evidence (#%d): %v", i, err)
case <-r.closeCh:
// The reactor has signaled that we are stopped and thus we should
// implicitly exit this peer's goroutine.
return
}
}
return evis, nil
}

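For orientation, here is a rough wiring sketch for the refactored reactor, assumed rather than taken from the node changes in this commit: a p2p Channel for EvidenceChannel, a PeerUpdatesCh for peer lifecycle events, and the pool are handed to NewReactor, which now implements service.Service. The channel construction mirrors what the reactor tests below do by hand; in a real node the Channel would come from the p2p router/shim.

package main

import (
	"github.com/tendermint/tendermint/evidence"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/p2p"
	tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
)

// newEvidenceReactor is a hypothetical wiring sketch based only on the
// constructors visible in this diff; real nodes obtain the Channel from the
// p2p router/shim rather than building raw channels by hand.
func newEvidenceReactor(logger log.Logger, pool *evidence.Pool) *evidence.Reactor {
	// Inbound/outbound envelopes and peer errors for the evidence channel (ID 0x38).
	inCh := make(chan p2p.Envelope, 32)
	outCh := make(chan p2p.Envelope, 32)
	errCh := make(chan p2p.PeerError, 32)

	evidenceCh := p2p.NewChannel(
		evidence.EvidenceChannel,
		new(tmproto.EvidenceList), // message type handled on this channel
		inCh,
		outCh,
		errCh,
	)

	// Peer lifecycle notifications (PeerStatusUp, PeerStatusDown, ...) that
	// drive the per-peer broadcastEvidenceLoop goroutines.
	peerUpdates := p2p.NewPeerUpdates(make(chan p2p.PeerUpdate))

	// Start() runs processEvidenceCh and processPeerUpdates; Stop() signals
	// the per-peer goroutines and closes the channels.
	return evidence.NewReactor(logger, evidenceCh, peerUpdates, pool)
}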
evidence/reactor_test.go (+508, -253)

@ -3,18 +3,16 @@ package evidence_test
import (
"encoding/hex"
"fmt"
"math/rand"
"sync"
"testing"
"time"
"github.com/go-kit/kit/log/term"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/evidence"
@ -28,310 +26,565 @@ import (
var (
numEvidence = 10
timeout = 120 * time.Second // ridiculously high because CircleCI is slow
rng = rand.New(rand.NewSource(time.Now().UnixNano()))
)
// We have N evidence reactors connected to one another. The first reactor
// receives a number of evidence at varying heights. We test that all
// other reactors receive the evidence and add it to their own respective
// evidence pools.
type reactorTestSuite struct {
reactor *evidence.Reactor
pool *evidence.Pool
peerID p2p.NodeID
evidenceChannel *p2p.Channel
evidenceInCh chan p2p.Envelope
evidenceOutCh chan p2p.Envelope
evidencePeerErrCh chan p2p.PeerError
peerUpdatesCh chan p2p.PeerUpdate
peerUpdates *p2p.PeerUpdatesCh
}
func setup(t *testing.T, logger log.Logger, pool *evidence.Pool, chBuf uint) *reactorTestSuite {
t.Helper()
pID := make([]byte, 16)
_, err := rng.Read(pID)
require.NoError(t, err)
peerUpdatesCh := make(chan p2p.PeerUpdate)
rts := &reactorTestSuite{
pool: pool,
evidenceInCh: make(chan p2p.Envelope, chBuf),
evidenceOutCh: make(chan p2p.Envelope, chBuf),
evidencePeerErrCh: make(chan p2p.PeerError, chBuf),
peerUpdatesCh: peerUpdatesCh,
peerUpdates: p2p.NewPeerUpdates(peerUpdatesCh),
peerID: p2p.NodeID(fmt.Sprintf("%x", pID)),
}
rts.evidenceChannel = p2p.NewChannel(
evidence.EvidenceChannel,
new(tmproto.EvidenceList),
rts.evidenceInCh,
rts.evidenceOutCh,
rts.evidencePeerErrCh,
)
rts.reactor = evidence.NewReactor(
logger,
rts.evidenceChannel,
rts.peerUpdates,
pool,
)
require.NoError(t, rts.reactor.Start())
require.True(t, rts.reactor.IsRunning())
t.Cleanup(func() {
require.NoError(t, rts.reactor.Stop())
require.False(t, rts.reactor.IsRunning())
})
return rts
}
func createTestSuites(t *testing.T, stateStores []sm.Store, chBuf uint) []*reactorTestSuite {
t.Helper()
numSStores := len(stateStores)
testSuites := make([]*reactorTestSuite, numSStores)
evidenceTime := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)
for i := 0; i < numSStores; i++ {
logger := log.TestingLogger().With("validator", i)
evidenceDB := dbm.NewMemDB()
blockStore := &mocks.BlockStore{}
blockStore.On("LoadBlockMeta", mock.AnythingOfType("int64")).Return(
&types.BlockMeta{Header: types.Header{Time: evidenceTime}},
)
pool, err := evidence.NewPool(logger, evidenceDB, stateStores[i], blockStore)
require.NoError(t, err)
testSuites[i] = setup(t, logger, pool, chBuf)
}
return testSuites
}
func waitForEvidence(t *testing.T, evList types.EvidenceList, suites ...*reactorTestSuite) {
t.Helper()
wg := new(sync.WaitGroup)
for _, suite := range suites {
wg.Add(1)
go func(s *reactorTestSuite) {
var localEvList []types.Evidence
currentPoolSize := 0
for currentPoolSize != len(evList) {
// each evidence should not be more than 500 bytes
localEvList, _ = s.pool.PendingEvidence(int64(len(evList) * 500))
currentPoolSize = len(localEvList)
}
// put the reaped evidence in a map so we can quickly check we got everything
evMap := make(map[string]types.Evidence)
for _, e := range localEvList {
evMap[string(e.Hash())] = e
}
for i, expectedEv := range evList {
gotEv := evMap[string(expectedEv.Hash())]
require.Equalf(
t,
expectedEv,
gotEv,
"evidence at index %d in pool does not match; got: %v, expected: %v", i, gotEv, expectedEv,
)
}
wg.Done()
}(suite)
}
// wait for the evidence in all evidence pools
wg.Wait()
}
func createEvidenceList(
t *testing.T,
pool *evidence.Pool,
val types.PrivValidator,
numEvidence int,
) types.EvidenceList {
evList := make([]types.Evidence, numEvidence)
for i := 0; i < numEvidence; i++ {
ev := types.NewMockDuplicateVoteEvidenceWithValidator(
int64(i+1),
time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC),
val,
evidenceChainID,
)
require.NoError(t, pool.AddEvidence(ev))
evList[i] = ev
}
return evList
}
// simulateRouter will increment the provided WaitGroup and execute a simulated
// router where, for each outbound p2p Envelope from the primary reactor, we
// proxy (send) the Envelope to the relevant peer reactor. Done is invoked on the
// WaitGroup when numOut Envelopes are sent (i.e. read from the outbound channel).
func simulateRouter(wg *sync.WaitGroup, primary *reactorTestSuite, suites []*reactorTestSuite, numOut int) {
wg.Add(1)
// create a mapping for efficient suite lookup by peer ID
suitesByPeerID := make(map[p2p.NodeID]*reactorTestSuite)
for _, suite := range suites {
suitesByPeerID[suite.peerID] = suite
}
// Simulate a router by listening for all outbound envelopes and proxying the
// envelope to the respective peer (suite).
go func() {
for i := 0; i < numOut; i++ {
envelope := <-primary.evidenceOutCh
other := suitesByPeerID[envelope.To]
other.evidenceInCh <- p2p.Envelope{
From: primary.peerID,
To: envelope.To,
Message: envelope.Message,
}
}
wg.Done()
}()
}
func TestReactorMultiDisconnect(t *testing.T) {
val := types.NewMockPV()
height := int64(numEvidence) + 10
stateDB1 := initializeValidatorState(t, val, height)
stateDB2 := initializeValidatorState(t, val, height)
testSuites := createTestSuites(t, []sm.Store{stateDB1, stateDB2}, 20)
primary := testSuites[0]
secondary := testSuites[1]
_ = createEvidenceList(t, primary.pool, val, numEvidence)
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: secondary.peerID,
}
// Ensure "disconnecting" the secondary peer from the primary more than once
// is handled gracefully.
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusDown,
PeerID: secondary.peerID,
}
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusDown,
PeerID: secondary.peerID,
}
}
// TestReactorBroadcastEvidence creates an environment of multiple peers that
// are all at the same height. One peer, designated as a primary, gossips all
// evidence to the remaining peers.
func TestReactorBroadcastEvidence(t *testing.T) {
config := cfg.TestConfig()
N := 7
numPeers := 7
// create statedb for everyone
stateDBs := make([]sm.Store, N)
// create a stateDB for all test suites (nodes)
stateDBs := make([]sm.Store, numPeers)
val := types.NewMockPV()
// we need validators saved for heights at least as high as we have evidence for
// We need all validators saved for heights at least as high as we have
// evidence for.
height := int64(numEvidence) + 10
for i := 0; i < N; i++ {
stateDBs[i] = initializeValidatorState(val, height)
for i := 0; i < numPeers; i++ {
stateDBs[i] = initializeValidatorState(t, val, height)
}
// make reactors from statedb
reactors, pools := makeAndConnectReactorsAndPools(config, stateDBs)
// Create a series of test suites where each suite contains a reactor and
// evidence pool. In addition, we mark a primary suite and the rest are
// secondaries where each secondary is added as a peer via a PeerUpdate to the
// primary. As a result, the primary will gossip all evidence to each secondary.
testSuites := createTestSuites(t, stateDBs, 0)
primary := testSuites[0]
secondaries := testSuites[1:]
// set the peer height on each reactor
for _, r := range reactors {
for _, peer := range r.Switch.Peers().List() {
ps := peerState{height}
peer.Set(types.PeerStateKey, ps)
// Simulate a router by listening for all outbound envelopes and proxying the
// envelopes to the respective peer (suite).
wg := new(sync.WaitGroup)
simulateRouter(wg, primary, testSuites, numEvidence*len(secondaries))
evList := createEvidenceList(t, primary.pool, val, numEvidence)
// Add each secondary suite (node) as a peer to the primary suite (node). This
// will cause the primary to gossip all evidence to the secondaries.
for _, suite := range secondaries {
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: suite.peerID,
}
}
// send a bunch of valid evidence to the first reactor's evpool
// and wait for them all to be received in the others
evList := sendEvidence(t, pools[0], val, numEvidence)
waitForEvidence(t, evList, pools)
}
// Wait until all secondary suites (reactors) have received all evidence from the
// primary suite (node).
waitForEvidence(t, evList, secondaries...)
// We have two evidence reactors connected to one another but are at different heights.
// Reactor 1 which is ahead receives a number of evidence. It should only send the evidence
// that is below the height of the peer to that peer.
func TestReactorSelectiveBroadcast(t *testing.T) {
config := cfg.TestConfig()
for _, suite := range testSuites {
require.Equal(t, numEvidence, int(suite.pool.Size()))
}
wg.Wait()
// ensure all channels are drained
for _, suite := range testSuites {
require.Empty(t, suite.evidenceOutCh)
}
}
// TestReactorBroadcastEvidence_Lagging tests a context where we have two
// reactors connected to one another but at different heights. Reactor 1,
// which is ahead, receives a list of evidence.
func TestReactorBroadcastEvidence_Lagging(t *testing.T) {
val := types.NewMockPV()
height1 := int64(numEvidence) + 10
height2 := int64(numEvidence) / 2
// DB1 is ahead of DB2
stateDB1 := initializeValidatorState(val, height1)
stateDB2 := initializeValidatorState(val, height2)
// stateDB1 is ahead of stateDB2, where stateDB1 has all heights (1-10) and
// stateDB2 only has heights 1-7.
stateDB1 := initializeValidatorState(t, val, height1)
stateDB2 := initializeValidatorState(t, val, height2)
// make reactors from statedb
reactors, pools := makeAndConnectReactorsAndPools(config, []sm.Store{stateDB1, stateDB2})
testSuites := createTestSuites(t, []sm.Store{stateDB1, stateDB2}, 0)
primary := testSuites[0]
secondaries := testSuites[1:]
// set the peer height on each reactor
for _, r := range reactors {
for _, peer := range r.Switch.Peers().List() {
ps := peerState{height1}
peer.Set(types.PeerStateKey, ps)
// Simulate a router by listening for all outbound envelopes and proxying the
// envelope to the respective peer (suite).
wg := new(sync.WaitGroup)
simulateRouter(wg, primary, testSuites, numEvidence*len(secondaries))
// Send a list of valid evidence to the first reactor's, the one that is ahead,
// evidence pool.
evList := createEvidenceList(t, primary.pool, val, numEvidence)
// Add each secondary suite (node) as a peer to the primary suite (node). This
// will cause the primary to gossip all evidence to the secondaries.
for _, suite := range secondaries {
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: suite.peerID,
}
}
// update the first reactor peer's height to be very small
peer := reactors[0].Switch.Peers().List()[0]
ps := peerState{height2}
peer.Set(types.PeerStateKey, ps)
// only ones less than the peers height should make it through
waitForEvidence(t, evList[:height2+2], secondaries...)
// send a bunch of valid evidence to the first reactor's evpool
evList := sendEvidence(t, pools[0], val, numEvidence)
require.Equal(t, numEvidence, int(primary.pool.Size()))
require.Equal(t, int(height2+2), int(secondaries[0].pool.Size()))
// only ones less than the peers height should make it through
waitForEvidence(t, evList[:numEvidence/2-1], []*evidence.Pool{pools[1]})
// The primary will continue to send the remaining evidence to the secondaries
// so we wait until it has sent all the envelopes.
wg.Wait()
// peers should still be connected
peers := reactors[1].Switch.Peers().List()
assert.Equal(t, 1, len(peers))
// ensure all channels are drained
for _, suite := range testSuites {
require.Empty(t, suite.evidenceOutCh)
}
}
// This test aims to ensure that reactors don't send evidence that they have committed or that are
// not ready for the peer, through three scenarios.
// First, committed evidence to a newly connected peer
// Second, evidence to a peer that is behind
// Third, evidence that was pending and became committed just before the peer caught up
func TestReactorsGossipNoCommittedEvidence(t *testing.T) {
config := cfg.TestConfig()
func TestReactorBroadcastEvidence_Pending(t *testing.T) {
val := types.NewMockPV()
var height int64 = 10
height := int64(10)
// DB1 is ahead of DB2
stateDB1 := initializeValidatorState(val, height-1)
stateDB2 := initializeValidatorState(val, height-2)
state, err := stateDB1.Load()
require.NoError(t, err)
state.LastBlockHeight++
stateDB1 := initializeValidatorState(t, val, height)
stateDB2 := initializeValidatorState(t, val, height)
// make reactors from statedb
reactors, pools := makeAndConnectReactorsAndPools(config, []sm.Store{stateDB1, stateDB2})
testSuites := createTestSuites(t, []sm.Store{stateDB1, stateDB2}, 0)
primary := testSuites[0]
secondary := testSuites[1]
evList := sendEvidence(t, pools[0], val, 2)
pools[0].Update(state, evList)
require.EqualValues(t, uint32(0), pools[0].Size())
// Simulate a router by listening for all outbound envelopes and proxying the
// envelopes to the respective peer (suite).
wg := new(sync.WaitGroup)
simulateRouter(wg, primary, testSuites, numEvidence)
time.Sleep(100 * time.Millisecond)
// add all evidence to the primary reactor
evList := createEvidenceList(t, primary.pool, val, numEvidence)
peer := reactors[0].Switch.Peers().List()[0]
ps := peerState{height - 2}
peer.Set(types.PeerStateKey, ps)
// Manually add half the evidence to the secondary which will mark them as
// pending.
for i := 0; i < numEvidence/2; i++ {
require.NoError(t, secondary.pool.AddEvidence(evList[i]))
}
peer = reactors[1].Switch.Peers().List()[0]
ps = peerState{height}
peer.Set(types.PeerStateKey, ps)
// the secondary should have half the evidence as pending
require.Equal(t, uint32(numEvidence/2), secondary.pool.Size())
// wait to see that no evidence comes through
time.Sleep(300 * time.Millisecond)
// add the secondary reactor as a peer to the primary reactor
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: secondary.peerID,
}
// the second pool should not have received any evidence because it has already been committed
assert.Equal(t, uint32(0), pools[1].Size(), "second reactor should not have received evidence")
// The secondary reactor should have received all the evidence ignoring the
// already pending evidence.
waitForEvidence(t, evList, secondary)
// the first reactor receives three more evidence
evList = make([]types.Evidence, 3)
for i := 0; i < 3; i++ {
ev := types.NewMockDuplicateVoteEvidenceWithValidator(height-3+int64(i),
time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC), val, state.ChainID)
err := pools[0].AddEvidence(ev)
require.NoError(t, err)
evList[i] = ev
for _, suite := range testSuites {
require.Equal(t, numEvidence, int(suite.pool.Size()))
}
wg.Wait()
// ensure all channels are drained
for _, suite := range testSuites {
require.Empty(t, suite.evidenceOutCh)
}
}
func TestReactorBroadcastEvidence_Committed(t *testing.T) {
val := types.NewMockPV()
height := int64(10)
stateDB1 := initializeValidatorState(t, val, height)
stateDB2 := initializeValidatorState(t, val, height)
testSuites := createTestSuites(t, []sm.Store{stateDB1, stateDB2}, 0)
primary := testSuites[0]
secondary := testSuites[1]
// add all evidence to the primary reactor
evList := createEvidenceList(t, primary.pool, val, numEvidence)
// Manually add half the evidence to the secondary which will mark them as
// pending.
for i := 0; i < numEvidence/2; i++ {
require.NoError(t, secondary.pool.AddEvidence(evList[i]))
}
// wait to see that only one evidence is sent
time.Sleep(300 * time.Millisecond)
// the secondary should have half the evidence as pending
require.Equal(t, uint32(numEvidence/2), secondary.pool.Size())
// the second pool should only have received the first evidence because it is behind
peerEv, _ := pools[1].PendingEvidence(10000)
assert.EqualValues(t, []types.Evidence{evList[0]}, peerEv)
state, err := stateDB2.Load()
require.NoError(t, err)
// the last evidence is committed and the second reactor catches up in state to the first
// reactor. We therefore expect that the second reactor only receives one more evidence, the
// one that is still pending and not the evidence that has already been committed.
// update the secondary's pool such that all pending evidence is committed
state.LastBlockHeight++
pools[0].Update(state, []types.Evidence{evList[2]})
// the first reactor should have the two remaining pending evidence
require.EqualValues(t, uint32(2), pools[0].Size())
secondary.pool.Update(state, evList[:numEvidence/2])
// now update the state of the second reactor
pools[1].Update(state, types.EvidenceList{})
peer = reactors[0].Switch.Peers().List()[0]
ps = peerState{height}
peer.Set(types.PeerStateKey, ps)
// the secondary should have half the evidence as committed
require.Equal(t, uint32(0), secondary.pool.Size())
// wait to see that only two evidence is sent
time.Sleep(300 * time.Millisecond)
// Simulate a router by listening for all outbound envelopes and proxying the
// envelopes to the respective peer (suite).
wg := new(sync.WaitGroup)
simulateRouter(wg, primary, testSuites, numEvidence)
peerEv, _ = pools[1].PendingEvidence(1000)
assert.EqualValues(t, []types.Evidence{evList[0], evList[1]}, peerEv)
}
// add the secondary reactor as a peer to the primary reactor
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: secondary.peerID,
}
// evidenceLogger is a TestingLogger which uses a different
// color for each validator ("validator" key must exist).
func evidenceLogger() log.Logger {
return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor {
for i := 0; i < len(keyvals)-1; i += 2 {
if keyvals[i] == "validator" {
return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))}
}
}
return term.FgBgColor{}
})
// The secondary reactor should have received all the evidence ignoring the
// already committed evidence.
waitForEvidence(t, evList[numEvidence/2:], secondary)
require.Equal(t, numEvidence, int(primary.pool.Size()))
require.Equal(t, numEvidence/2, int(secondary.pool.Size()))
wg.Wait()
// ensure all channels are drained
for _, suite := range testSuites {
require.Empty(t, suite.evidenceOutCh)
}
}
// connect N evidence reactors through N switches
func makeAndConnectReactorsAndPools(config *cfg.Config, stateStores []sm.Store) ([]*evidence.Reactor,
[]*evidence.Pool) {
N := len(stateStores)
func TestReactorBroadcastEvidence_FullyConnected(t *testing.T) {
numPeers := 7
reactors := make([]*evidence.Reactor, N)
pools := make([]*evidence.Pool, N)
logger := evidenceLogger()
evidenceTime := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)
// create a stateDB for all test suites (nodes)
stateDBs := make([]sm.Store, numPeers)
val := types.NewMockPV()
for i := 0; i < N; i++ {
evidenceDB := dbm.NewMemDB()
blockStore := &mocks.BlockStore{}
blockStore.On("LoadBlockMeta", mock.AnythingOfType("int64")).Return(
&types.BlockMeta{Header: types.Header{Time: evidenceTime}},
)
pool, err := evidence.NewPool(evidenceDB, stateStores[i], blockStore)
if err != nil {
panic(err)
// We need all validators saved for heights at least as high as we have
// evidence for.
height := int64(numEvidence) + 10
for i := 0; i < numPeers; i++ {
stateDBs[i] = initializeValidatorState(t, val, height)
}
testSuites := createTestSuites(t, stateDBs, 0)
// Simulate a router by listening for all outbound envelopes and proxying the
// envelopes to the respective peer (suite).
wg := new(sync.WaitGroup)
for _, suite := range testSuites {
simulateRouter(wg, suite, testSuites, numEvidence*(len(testSuites)-1))
}
evList := createEvidenceList(t, testSuites[0].pool, val, numEvidence)
// every suite (reactor) connects to every other suite (reactor)
for _, suiteI := range testSuites {
for _, suiteJ := range testSuites {
if suiteI.peerID != suiteJ.peerID {
suiteI.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: suiteJ.peerID,
}
}
}
pools[i] = pool
reactors[i] = evidence.NewReactor(pool)
reactors[i].SetLogger(logger.With("validator", i))
}
p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("EVIDENCE", reactors[i])
return s
// wait till all suites (reactors) received all evidence from other suites (reactors)
waitForEvidence(t, evList, testSuites...)
for _, suite := range testSuites {
require.Equal(t, numEvidence, int(suite.pool.Size()))
}, p2p.Connect2Switches)
// commit state so we do not continue to repeat gossiping the same evidence
state := suite.pool.State()
state.LastBlockHeight++
suite.pool.Update(state, evList)
}
return reactors, pools
wg.Wait()
}
// wait for all evidence on all reactors
func waitForEvidence(t *testing.T, evs types.EvidenceList, pools []*evidence.Pool) {
// wait for the evidence in all evpools
func TestReactorBroadcastEvidence_RemovePeer(t *testing.T) {
val := types.NewMockPV()
height := int64(10)
stateDB1 := initializeValidatorState(t, val, height)
stateDB2 := initializeValidatorState(t, val, height)
testSuites := createTestSuites(t, []sm.Store{stateDB1, stateDB2}, uint(numEvidence))
primary := testSuites[0]
secondary := testSuites[1]
// Simulate a router by listening for all outbound envelopes and proxying the
// envelopes to the respective peer (suite).
wg := new(sync.WaitGroup)
for i := 0; i < len(pools); i++ {
wg.Add(1)
go _waitForEvidence(t, wg, evs, i, pools)
simulateRouter(wg, primary, testSuites, numEvidence/2)
// add all evidence to the primary reactor
evList := createEvidenceList(t, primary.pool, val, numEvidence)
// add the secondary reactor as a peer to the primary reactor
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: secondary.peerID,
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
// have the secondary reactor receive only half the evidence
waitForEvidence(t, evList[:numEvidence/2], secondary)
timer := time.After(timeout)
select {
case <-timer:
t.Fatal("Timed out waiting for evidence")
case <-done:
// disconnect the peer
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusDown,
PeerID: secondary.peerID,
}
}
// wait for all evidence on a single evpool
func _waitForEvidence(
t *testing.T,
wg *sync.WaitGroup,
evs types.EvidenceList,
poolIdx int,
pools []*evidence.Pool,
) {
evpool := pools[poolIdx]
var evList []types.Evidence
currentPoolSize := 0
for currentPoolSize != len(evs) {
evList, _ = evpool.PendingEvidence(int64(len(evs) * 500)) // each evidence should not be more than 500 bytes
currentPoolSize = len(evList)
time.Sleep(time.Millisecond * 100)
}
// put the reaped evidence in a map so we can quickly check we got everything
evMap := make(map[string]types.Evidence)
for _, e := range evList {
evMap[string(e.Hash())] = e
}
for i, expectedEv := range evs {
gotEv := evMap[string(expectedEv.Hash())]
assert.Equal(t, expectedEv, gotEv,
fmt.Sprintf("evidence at index %d on pool %d don't match: %v vs %v",
i, poolIdx, expectedEv, gotEv))
}
wg.Done()
}
// Ensure the secondary only received half of the evidence before being
// disconnected.
require.Equal(t, numEvidence/2, int(secondary.pool.Size()))
func sendEvidence(t *testing.T, evpool *evidence.Pool, val types.PrivValidator, n int) types.EvidenceList {
evList := make([]types.Evidence, n)
for i := 0; i < n; i++ {
ev := types.NewMockDuplicateVoteEvidenceWithValidator(int64(i+1),
time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC), val, evidenceChainID)
err := evpool.AddEvidence(ev)
require.NoError(t, err)
evList[i] = ev
wg.Wait()
// The primary reactor should still be attempting to send the remaining half.
//
// NOTE: The channel is buffered (size numEvidence) to ensure the primary
// reactor will send all envelopes at once before receiving the signal to stop
// gossiping.
for i := 0; i < numEvidence/2; i++ {
<-primary.evidenceOutCh
}
return evList
}
type peerState struct {
height int64
// ensure all channels are drained
for _, suite := range testSuites {
require.Empty(t, suite.evidenceOutCh)
}
}
func (ps peerState) GetHeight() int64 {
return ps.height
}
// nolint:lll
func TestEvidenceListSerialization(t *testing.T) {
exampleVote := func(msgType byte) *types.Vote {
var stamp, err = time.Parse(types.TimeFormat, "2017-12-25T03:00:01.234Z")
require.NoError(t, err)
func exampleVote(t byte) *types.Vote {
var stamp, err = time.Parse(types.TimeFormat, "2017-12-25T03:00:01.234Z")
if err != nil {
panic(err)
}
return &types.Vote{
Type: tmproto.SignedMsgType(t),
Height: 3,
Round: 2,
Timestamp: stamp,
BlockID: types.BlockID{
Hash: tmhash.Sum([]byte("blockID_hash")),
PartSetHeader: types.PartSetHeader{
Total: 1000000,
Hash: tmhash.Sum([]byte("blockID_part_set_header_hash")),
return &types.Vote{
Type: tmproto.SignedMsgType(msgType),
Height: 3,
Round: 2,
Timestamp: stamp,
BlockID: types.BlockID{
Hash: tmhash.Sum([]byte("blockID_hash")),
PartSetHeader: types.PartSetHeader{
Total: 1000000,
Hash: tmhash.Sum([]byte("blockID_part_set_header_hash")),
},
},
},
ValidatorAddress: crypto.AddressHash([]byte("validator_address")),
ValidatorIndex: 56789,
ValidatorAddress: crypto.AddressHash([]byte("validator_address")),
ValidatorIndex: 56789,
}
}
}
// nolint:lll //ignore line length for tests
func TestEvidenceVectors(t *testing.T) {
val := &types.Validator{
Address: crypto.AddressHash([]byte("validator_address")),
@ -347,33 +600,35 @@ func TestEvidenceVectors(t *testing.T) {
valSet,
)
testCases := []struct {
testName string
testCases := map[string]struct {
evidenceList []types.Evidence
expBytes string
}{
{"DuplicateVoteEvidence", []types.Evidence{dupl}, "0a85020a82020a79080210031802224a0a208b01023386c371778ecb6368573e539afc3cc860ec3a2f614e54fe5652f4fc80122608c0843d122072db3d959635dff1bb567bedaa70573392c5159666a3f8caf11e413aac52207a2a0b08b1d381d20510809dca6f32146af1f4111082efb388211bc72c55bcd61e9ac3d538d5bb031279080110031802224a0a208b01023386c371778ecb6368573e539afc3cc860ec3a2f614e54fe5652f4fc80122608c0843d122072db3d959635dff1bb567bedaa70573392c5159666a3f8caf11e413aac52207a2a0b08b1d381d20510809dca6f32146af1f4111082efb388211bc72c55bcd61e9ac3d538d5bb03180a200a2a060880dbaae105"},
"DuplicateVoteEvidence": {
[]types.Evidence{dupl},
"0a85020a82020a79080210031802224a0a208b01023386c371778ecb6368573e539afc3cc860ec3a2f614e54fe5652f4fc80122608c0843d122072db3d959635dff1bb567bedaa70573392c5159666a3f8caf11e413aac52207a2a0b08b1d381d20510809dca6f32146af1f4111082efb388211bc72c55bcd61e9ac3d538d5bb031279080110031802224a0a208b01023386c371778ecb6368573e539afc3cc860ec3a2f614e54fe5652f4fc80122608c0843d122072db3d959635dff1bb567bedaa70573392c5159666a3f8caf11e413aac52207a2a0b08b1d381d20510809dca6f32146af1f4111082efb388211bc72c55bcd61e9ac3d538d5bb03180a200a2a060880dbaae105",
},
}
for _, tc := range testCases {
for name, tc := range testCases {
tc := tc
evi := make([]tmproto.Evidence, len(tc.evidenceList))
for i := 0; i < len(tc.evidenceList); i++ {
ev, err := types.EvidenceToProto(tc.evidenceList[i])
require.NoError(t, err, tc.testName)
evi[i] = *ev
}
epl := tmproto.EvidenceList{
Evidence: evi,
}
t.Run(name, func(t *testing.T) {
protoEv := make([]tmproto.Evidence, len(tc.evidenceList))
for i := 0; i < len(tc.evidenceList); i++ {
ev, err := types.EvidenceToProto(tc.evidenceList[i])
require.NoError(t, err)
protoEv[i] = *ev
}
bz, err := epl.Marshal()
require.NoError(t, err, tc.testName)
epl := tmproto.EvidenceList{
Evidence: protoEv,
}
require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName)
bz, err := epl.Marshal()
require.NoError(t, err)
require.Equal(t, tc.expBytes, hex.EncodeToString(bz))
})
}
}
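The rewritten vectors test switches from a slice of cases to a named map driven through t.Run subtests. For reference, a standalone sketch of that table-driven pattern with a hypothetical add function:

package example

import "testing"

func add(a, b int) int { return a + b }

func TestAdd(t *testing.T) {
	testCases := map[string]struct {
		a, b, want int
	}{
		"zero values":      {0, 0, 0},
		"positive numbers": {2, 3, 5},
		"negative numbers": {-2, -3, -5},
	}

	for name, tc := range testCases {
		tc := tc // capture the range variable for the subtest closure
		t.Run(name, func(t *testing.T) {
			if got := add(tc.a, tc.b); got != tc.want {
				t.Errorf("add(%d, %d) = %d, want %d", tc.a, tc.b, got, tc.want)
			}
		})
	}
}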

+ 84
- 27
evidence/verify.go View File

@ -17,6 +17,11 @@ import (
// - it is from a key who was a validator at the given height
// - it is internally consistent with state
// - it was properly signed by the alleged equivocator and meets the individual evidence verification requirements
//
// NOTE: Evidence may be provided for which we do not have the block or
// validator set. In these cases, we do not return an ErrInvalidEvidence so as
// not to have the sending peer disconnect. All other errors are treated as invalid evidence
// (i.e. ErrInvalidEvidence).
func (evpool *Pool) verify(evidence types.Evidence) error {
var (
state = evpool.State()
@ -25,26 +30,42 @@ func (evpool *Pool) verify(evidence types.Evidence) error {
ageNumBlocks = height - evidence.Height()
)
// verify the time of the evidence
// ensure we have the block for the evidence height
//
// NOTE: It is currently possible for a peer to send us evidence we're not
// able to process because we're too far behind (e.g. syncing), so we DO NOT
// return an invalid evidence error because we do not want the peer to
// disconnect or signal an error in this particular case.
blockMeta := evpool.blockStore.LoadBlockMeta(evidence.Height())
if blockMeta == nil {
return fmt.Errorf("don't have header #%d", evidence.Height())
return fmt.Errorf("failed to verify evidence; missing block for height %d", evidence.Height())
}
// verify the time of the evidence
evTime := blockMeta.Header.Time
if evidence.Time() != evTime {
return fmt.Errorf("evidence has a different time to the block it is associated with (%v != %v)",
evidence.Time(), evTime)
return types.NewErrInvalidEvidence(
evidence,
fmt.Errorf(
"evidence has a different time to the block it is associated with (%v != %v)",
evidence.Time(), evTime,
),
)
}
ageDuration := state.LastBlockTime.Sub(evTime)
// check that the evidence hasn't expired
if ageDuration > evidenceParams.MaxAgeDuration && ageNumBlocks > evidenceParams.MaxAgeNumBlocks {
return fmt.Errorf(
"evidence from height %d (created at: %v) is too old; min height is %d and evidence can not be older than %v",
evidence.Height(),
evTime,
height-evidenceParams.MaxAgeNumBlocks,
state.LastBlockTime.Add(evidenceParams.MaxAgeDuration),
return types.NewErrInvalidEvidence(
evidence,
fmt.Errorf(
"evidence from height %d (created at: %v) is too old; min height is %d and evidence can not be older than %v",
evidence.Height(),
evTime,
height-evidenceParams.MaxAgeNumBlocks,
state.LastBlockTime.Add(evidenceParams.MaxAgeDuration),
),
)
}
@ -55,18 +76,26 @@ func (evpool *Pool) verify(evidence types.Evidence) error {
if err != nil {
return err
}
return VerifyDuplicateVote(ev, state.ChainID, valSet)
if err := VerifyDuplicateVote(ev, state.ChainID, valSet); err != nil {
return types.NewErrInvalidEvidence(evidence, err)
}
return nil
case *types.LightClientAttackEvidence:
commonHeader, err := getSignedHeader(evpool.blockStore, evidence.Height())
if err != nil {
return err
}
commonVals, err := evpool.stateDB.LoadValidators(evidence.Height())
if err != nil {
return err
}
trustedHeader := commonHeader
// in the case of lunatic the trusted header is different to the common header
if evidence.Height() != ev.ConflictingBlock.Height {
trustedHeader, err = getSignedHeader(evpool.blockStore, ev.ConflictingBlock.Height)
@ -75,23 +104,40 @@ func (evpool *Pool) verify(evidence types.Evidence) error {
}
}
err = VerifyLightClientAttack(ev, commonHeader, trustedHeader, commonVals, state.LastBlockTime,
state.ConsensusParams.Evidence.MaxAgeDuration)
err = VerifyLightClientAttack(
ev,
commonHeader,
trustedHeader,
commonVals,
state.LastBlockTime,
state.ConsensusParams.Evidence.MaxAgeDuration,
)
if err != nil {
return err
return types.NewErrInvalidEvidence(evidence, err)
}
// find out what type of attack this was and thus extract the malicious validators. Note in the case of an
// Amnesia attack we don't have any malicious validators.
// Find out what type of attack this was and thus extract the malicious
// validators. Note, in the case of an Amnesia attack we don't have any
// malicious validators.
validators := ev.GetByzantineValidators(commonVals, trustedHeader)
// ensure this matches the validators that are listed in the evidence. They should be ordered based on power.
// Ensure this matches the validators that are listed in the evidence. They
// should be ordered based on power.
if validators == nil && ev.ByzantineValidators != nil {
return fmt.Errorf("expected nil validators from an amnesia light client attack but got %d",
len(ev.ByzantineValidators))
return types.NewErrInvalidEvidence(
evidence,
fmt.Errorf(
"expected nil validators from an amnesia light client attack but got %d",
len(ev.ByzantineValidators),
),
)
}
if exp, got := len(validators), len(ev.ByzantineValidators); exp != got {
return fmt.Errorf("expected %d byzantine validators from evidence but got %d",
exp, got)
return types.NewErrInvalidEvidence(
evidence,
fmt.Errorf("expected %d byzantine validators from evidence but got %d", exp, got),
)
}
// ensure that both validator arrays are in the same order
@ -99,20 +145,31 @@ func (evpool *Pool) verify(evidence types.Evidence) error {
for idx, val := range validators {
if !bytes.Equal(ev.ByzantineValidators[idx].Address, val.Address) {
return fmt.Errorf("evidence contained a different byzantine validator address to the one we were expecting."+
"Expected %v, got %v", val.Address, ev.ByzantineValidators[idx].Address)
return types.NewErrInvalidEvidence(
evidence,
fmt.Errorf(
"evidence contained an unexpected byzantine validator address; expected: %v, got: %v",
val.Address, ev.ByzantineValidators[idx].Address,
),
)
}
if ev.ByzantineValidators[idx].VotingPower != val.VotingPower {
return fmt.Errorf("evidence contained a byzantine validator with a different power to the one we were expecting."+
"Expected %d, got %d", val.VotingPower, ev.ByzantineValidators[idx].VotingPower)
return types.NewErrInvalidEvidence(
evidence,
fmt.Errorf(
"evidence contained unexpected byzantine validator power; expected %d, got %d",
val.VotingPower, ev.ByzantineValidators[idx].VotingPower,
),
)
}
}
return nil
default:
return fmt.Errorf("unrecognized evidence type: %T", evidence)
return types.NewErrInvalidEvidence(evidence, fmt.Errorf("unrecognized evidence type: %T", evidence))
}
}
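The verify path above separates two failure classes: data we simply do not have yet (plain errors, so the peer is not penalized) and provably bad evidence (wrapped so the caller can penalize the peer). A minimal sketch of that classification pattern, using a hypothetical error type instead of the real types.ErrInvalidEvidence:

package main

import (
	"errors"
	"fmt"
)

// invalidEvidenceError marks evidence as provably bad, as opposed to
// evidence we simply cannot check yet (e.g. a missing block).
type invalidEvidenceError struct{ reason error }

func (e invalidEvidenceError) Error() string { return "invalid evidence: " + e.reason.Error() }

// verify returns a plain error when data is missing and a wrapped
// invalidEvidenceError when the evidence itself is bad.
func verify(haveBlock, timesMatch bool) error {
	if !haveBlock {
		return errors.New("missing block for evidence height") // not the peer's fault
	}
	if !timesMatch {
		return invalidEvidenceError{reason: errors.New("evidence time does not match block time")}
	}
	return nil
}

func main() {
	err := verify(true, false)

	var invalid invalidEvidenceError
	if errors.As(err, &invalid) {
		fmt.Println("penalize peer:", err)
	} else if err != nil {
		fmt.Println("ignore for now:", err)
	}
}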
// VerifyLightClientAttack verifies LightClientAttackEvidence against the state of the full node. This involves


+ 4
- 7
evidence/verify_test.go View File

@ -103,9 +103,8 @@ func TestVerifyLightClientAttack_Lunatic(t *testing.T) {
blockStore.On("LoadBlockCommit", int64(4)).Return(commit)
blockStore.On("LoadBlockCommit", int64(10)).Return(trustedCommit)
pool, err := evidence.NewPool(dbm.NewMemDB(), stateStore, blockStore)
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore)
require.NoError(t, err)
pool.SetLogger(log.TestingLogger())
evList := types.EvidenceList{ev}
err = pool.CheckEvidence(evList)
@ -201,9 +200,8 @@ func TestVerifyLightClientAttack_Equivocation(t *testing.T) {
blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: *trustedHeader})
blockStore.On("LoadBlockCommit", int64(10)).Return(trustedCommit)
pool, err := evidence.NewPool(dbm.NewMemDB(), stateStore, blockStore)
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore)
require.NoError(t, err)
pool.SetLogger(log.TestingLogger())
evList := types.EvidenceList{ev}
err = pool.CheckEvidence(evList)
@ -276,9 +274,8 @@ func TestVerifyLightClientAttack_Amnesia(t *testing.T) {
blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: *trustedHeader})
blockStore.On("LoadBlockCommit", int64(10)).Return(trustedCommit)
pool, err := evidence.NewPool(dbm.NewMemDB(), stateStore, blockStore)
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore)
require.NoError(t, err)
pool.SetLogger(log.TestingLogger())
evList := types.EvidenceList{ev}
err = pool.CheckEvidence(evList)
@ -369,7 +366,7 @@ func TestVerifyDuplicateVoteEvidence(t *testing.T) {
blockStore := &mocks.BlockStore{}
blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: types.Header{Time: defaultEvidenceTime}})
pool, err := evidence.NewPool(dbm.NewMemDB(), stateStore, blockStore)
pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore)
require.NoError(t, err)
evList := types.EvidenceList{goodEv}


+ 54
- 26
node/node.go View File

@ -201,9 +201,10 @@ type Node struct {
consensusState *cs.State // latest consensus state
consensusReactor *cs.Reactor // for participating in the consensus
pexReactor *pex.Reactor // for exchanging peer addresses
evidencePool *evidence.Pool // tracking evidence
proxyApp proxy.AppConns // connection to the application
rpcListeners []net.Listener // rpc servers
evidenceReactor *evidence.Reactor
evidencePool *evidence.Pool // tracking evidence
proxyApp proxy.AppConns // connection to the application
rpcListeners []net.Listener // rpc servers
txIndexer txindex.TxIndexer
indexerService *txindex.IndexerService
prometheusSrv *http.Server
@ -338,21 +339,34 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns,
return mempoolReactor, mempool
}
func createEvidenceReactor(config *cfg.Config, dbProvider DBProvider,
stateDB dbm.DB, blockStore *store.BlockStore, logger log.Logger) (*evidence.Reactor, *evidence.Pool, error) {
func createEvidenceReactor(
config *cfg.Config,
dbProvider DBProvider,
stateDB dbm.DB,
blockStore *store.BlockStore,
logger log.Logger,
) (*p2p.ReactorShim, *evidence.Reactor, *evidence.Pool, error) {
evidenceDB, err := dbProvider(&DBContext{"evidence", config})
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
evidenceLogger := logger.With("module", "evidence")
evidencePool, err := evidence.NewPool(evidenceDB, sm.NewStore(stateDB), blockStore)
logger = logger.With("module", "evidence")
evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
evidenceReactor := evidence.NewReactor(evidencePool)
evidenceReactor.SetLogger(evidenceLogger)
return evidenceReactor, evidencePool, nil
evidenceReactorShim := p2p.NewReactorShim(logger, "EvidenceShim", evidence.ChannelShims)
evidenceReactor := evidence.NewReactor(
logger,
evidenceReactorShim.GetChannel(evidence.EvidenceChannel),
evidenceReactorShim.PeerUpdates,
evidencePool,
)
return evidenceReactorShim, evidenceReactor, evidencePool, nil
}
func createBlockchainReactor(config *cfg.Config,
@ -485,7 +499,7 @@ func createSwitch(config *cfg.Config,
bcReactor p2p.Reactor,
stateSyncReactor *p2p.ReactorShim,
consensusReactor *cs.Reactor,
evidenceReactor *evidence.Reactor,
evidenceReactor *p2p.ReactorShim,
nodeInfo p2p.NodeInfo,
nodeKey p2p.NodeKey,
p2pLogger log.Logger) *p2p.Switch {
@ -708,8 +722,7 @@ func NewNode(config *cfg.Config,
// Make MempoolReactor
mempoolReactor, mempool := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger)
// Make Evidence Reactor
evidenceReactor, evidencePool, err := createEvidenceReactor(config, dbProvider, stateDB, blockStore, logger)
evReactorShim, evReactor, evPool, err := createEvidenceReactor(config, dbProvider, stateDB, blockStore, logger)
if err != nil {
return nil, err
}
@ -720,7 +733,7 @@ func NewNode(config *cfg.Config,
logger.With("module", "state"),
proxyApp.Consensus(),
mempool,
evidencePool,
evPool,
sm.BlockExecutorWithMetrics(smMetrics),
)
@ -737,8 +750,9 @@ func NewNode(config *cfg.Config,
} else if fastSync {
csMetrics.FastSyncing.Set(1)
}
consensusReactor, consensusState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evidencePool,
config, state, blockExec, blockStore, mempool, evPool,
privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger,
)
@ -746,8 +760,7 @@ func NewNode(config *cfg.Config,
// FIXME The way we do phased startups (e.g. replay -> fast sync -> consensus) is very messy,
// we should clean this whole thing up. See:
// https://github.com/tendermint/tendermint/issues/4644
stateSyncReactorShim := p2p.NewReactorShim("StateSyncShim", statesync.ChannelShims)
stateSyncReactorShim.SetLogger(logger.With("module", "statesync"))
stateSyncReactorShim := p2p.NewReactorShim(logger.With("module", "statesync"), "StateSyncShim", statesync.ChannelShims)
stateSyncReactor := statesync.NewReactor(
stateSyncReactorShim.Logger,
@ -769,7 +782,7 @@ func NewNode(config *cfg.Config,
transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp)
sw := createSwitch(
config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
stateSyncReactorShim, consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger,
stateSyncReactorShim, consensusReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger,
)
err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
@ -833,7 +846,8 @@ func NewNode(config *cfg.Config,
stateSync: stateSync,
stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state
pexReactor: pexReactor,
evidencePool: evidencePool,
evidenceReactor: evReactor,
evidencePool: evPool,
proxyApp: proxyApp,
txIndexer: txIndexer,
indexerService: indexerService,
@ -905,6 +919,11 @@ func (n *Node) OnStart() error {
return err
}
// Start the real evidence reactor separately since the switch uses the shim.
if err := n.evidenceReactor.Start(); err != nil {
return err
}
// Always connect to persistent peers
err = n.sw.DialPeersAsync(splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "))
if err != nil {
@ -948,7 +967,12 @@ func (n *Node) OnStop() {
// Stop the real state sync reactor separately since the switch uses the shim.
if err := n.stateSyncReactor.Stop(); err != nil {
n.Logger.Error("failed to stop state sync service", "err", err)
n.Logger.Error("failed to stop the state sync reactor", "err", err)
}
// Stop the real evidence reactor separately since the switch uses the shim.
if err := n.evidenceReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the evidence reactor", "err", err)
}
// stop mempool WAL
@ -1270,10 +1294,14 @@ func makeNodeInfo(
Version: version.TMCoreSemVer,
Channels: []byte{
bcChannel,
cs.StateChannel, cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel,
cs.StateChannel,
cs.DataChannel,
cs.VoteChannel,
cs.VoteSetBitsChannel,
mempl.MempoolChannel,
evidence.EvidenceChannel,
byte(statesync.SnapshotChannel), byte(statesync.ChunkChannel),
byte(evidence.EvidenceChannel),
byte(statesync.SnapshotChannel),
byte(statesync.ChunkChannel),
},
Moniker: config.Moniker,
Other: p2p.NodeInfoOther{


+ 1
- 2
node/node_test.go View File

@ -257,9 +257,8 @@ func TestCreateProposalBlock(t *testing.T) {
// Make EvidencePool
evidenceDB := dbm.NewMemDB()
blockStore := store.NewBlockStore(dbm.NewMemDB())
evidencePool, err := evidence.NewPool(evidenceDB, stateStore, blockStore)
evidencePool, err := evidence.NewPool(logger, evidenceDB, stateStore, blockStore)
require.NoError(t, err)
evidencePool.SetLogger(logger)
// fill the evidence pool with more evidence
// than can fit in a block


+ 23
- 7
p2p/key.go View File

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"strings"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/ed25519"
@ -12,15 +13,22 @@ import (
tmos "github.com/tendermint/tendermint/libs/os"
)
// NodeID is a hex-encoded crypto.Address.
// FIXME: We should either ensure this is always lowercased, or add an Equal()
// for comparison that decodes to the binary byte slice first.
type NodeID string
// NodeIDByteLength is the length of a crypto.Address. Currently only 20.
// FIXME: support other length addresses?
const NodeIDByteLength = crypto.AddressSize
// NodeID is a hex-encoded crypto.Address.
type NodeID string
// NewNodeID returns a lowercased (normalized) NodeID.
func NewNodeID(nodeID string) (NodeID, error) {
if _, err := NodeID(nodeID).Bytes(); err != nil {
return NodeID(""), err
}
return NodeID(strings.ToLower(nodeID)), nil
}
// NodeIDFromPubKey returns the node ID corresponding to the given PubKey. It's
// the hex-encoding of the pubKey.Address().
func NodeIDFromPubKey(pubKey crypto.PubKey) NodeID {
@ -39,15 +47,23 @@ func (id NodeID) Bytes() ([]byte, error) {
// Validate validates the NodeID.
func (id NodeID) Validate() error {
if len(id) == 0 {
return errors.New("no ID")
return errors.New("empty node ID")
}
bz, err := id.Bytes()
if err != nil {
return err
}
if len(bz) != NodeIDByteLength {
return fmt.Errorf("invalid ID length - got %d, expected %d", len(bz), NodeIDByteLength)
return fmt.Errorf("invalid node ID length; got %d, expected %d", len(bz), NodeIDByteLength)
}
idStr := string(id)
if strings.ToLower(idStr) != idStr {
return fmt.Errorf("invalid node ID; must be lowercased")
}
return nil
}
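NewNodeID and Validate together normalize IDs to lowercase hex of a fixed byte length. A self-contained sketch of the same checks, assuming a 20-byte address size and standalone functions rather than the real p2p package:

package main

import (
	"encoding/hex"
	"fmt"
	"strings"
)

const nodeIDByteLength = 20 // assumed address size

// newNodeID lowercases the input and validates it as hex of the right length.
func newNodeID(id string) (string, error) {
	id = strings.ToLower(id)

	bz, err := hex.DecodeString(id)
	if err != nil {
		return "", fmt.Errorf("node ID is not valid hex: %w", err)
	}
	if len(bz) != nodeIDByteLength {
		return "", fmt.Errorf("invalid node ID length; got %d, expected %d", len(bz), nodeIDByteLength)
	}

	return id, nil
}

func main() {
	id, err := newNodeID("DEADBEEF" + strings.Repeat("00", 16))
	fmt.Println(id, err) // lowercased 40-char hex ID, <nil>
}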


+ 9
- 13
p2p/key_test.go View File

@ -5,7 +5,6 @@ import (
"path/filepath"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
tmrand "github.com/tendermint/tendermint/libs/rand"
@ -15,35 +14,32 @@ func TestLoadOrGenNodeKey(t *testing.T) {
filePath := filepath.Join(os.TempDir(), tmrand.Str(12)+"_peer_id.json")
nodeKey, err := LoadOrGenNodeKey(filePath)
assert.Nil(t, err)
require.Nil(t, err)
nodeKey2, err := LoadOrGenNodeKey(filePath)
assert.Nil(t, err)
assert.Equal(t, nodeKey, nodeKey2)
require.Nil(t, err)
require.Equal(t, nodeKey, nodeKey2)
}
func TestLoadNodeKey(t *testing.T) {
filePath := filepath.Join(os.TempDir(), tmrand.Str(12)+"_peer_id.json")
_, err := LoadNodeKey(filePath)
assert.True(t, os.IsNotExist(err))
require.True(t, os.IsNotExist(err))
_, err = LoadOrGenNodeKey(filePath)
require.NoError(t, err)
nodeKey, err := LoadNodeKey(filePath)
assert.NoError(t, err)
assert.NotNil(t, nodeKey)
require.NoError(t, err)
require.NotNil(t, nodeKey)
}
func TestNodeKeySaveAs(t *testing.T) {
filePath := filepath.Join(os.TempDir(), tmrand.Str(12)+"_peer_id.json")
assert.NoFileExists(t, filePath)
require.NoFileExists(t, filePath)
nodeKey := GenNodeKey()
err := nodeKey.SaveAs(filePath)
assert.NoError(t, err)
assert.FileExists(t, filePath)
require.NoError(t, nodeKey.SaveAs(filePath))
require.FileExists(t, filePath)
}

+ 8
- 4
p2p/netaddress.go View File

@ -73,12 +73,16 @@ func NewNetAddressString(addr string) (*NetAddress, error) {
return nil, ErrNetAddressNoID{addr}
}
// get ID
if err := NodeID(spl[0]).Validate(); err != nil {
id, err := NewNodeID(spl[0])
if err != nil {
return nil, ErrNetAddressInvalid{addrWithoutProtocol, err}
}
if err := id.Validate(); err != nil {
return nil, ErrNetAddressInvalid{addrWithoutProtocol, err}
}
var id NodeID
id, addrWithoutProtocol = NodeID(spl[0]), spl[1]
addrWithoutProtocol = spl[1]
// get host and port
host, portStr, err := net.SplitHostPort(addrWithoutProtocol)


+ 2
- 2
p2p/peer.go View File

@ -68,9 +68,9 @@ type PeerUpdatesCh struct {
}
// NewPeerUpdates returns a reference to a new PeerUpdatesCh.
func NewPeerUpdates() *PeerUpdatesCh {
func NewPeerUpdates(updatesCh chan PeerUpdate) *PeerUpdatesCh {
return &PeerUpdatesCh{
updatesCh: make(chan PeerUpdate),
updatesCh: updatesCh,
doneCh: make(chan struct{}),
}
}
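Accepting the channel as a constructor argument lets callers, and tests in particular, decide on buffering. A tiny sketch of that injection pattern with hypothetical types:

package main

import "fmt"

type peerUpdate struct {
	PeerID string
	Status string
}

// peerUpdates wraps an injected updates channel so callers control its buffering.
type peerUpdates struct {
	updatesCh chan peerUpdate
	doneCh    chan struct{}
}

func newPeerUpdates(updatesCh chan peerUpdate) *peerUpdates {
	return &peerUpdates{
		updatesCh: updatesCh,
		doneCh:    make(chan struct{}),
	}
}

func main() {
	// A buffered channel lets a test enqueue updates without a reader running.
	ch := make(chan peerUpdate, 2)
	pu := newPeerUpdates(ch)

	pu.updatesCh <- peerUpdate{PeerID: "a", Status: "up"}
	pu.updatesCh <- peerUpdate{PeerID: "a", Status: "down"}
	fmt.Println(len(pu.updatesCh)) // 2
}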


+ 4
- 2
p2p/shim.go View File

@ -5,6 +5,7 @@ import (
"sort"
"github.com/gogo/protobuf/proto"
"github.com/tendermint/tendermint/libs/log"
)
// ============================================================================
@ -50,7 +51,7 @@ type (
}
)
func NewReactorShim(name string, descriptors map[ChannelID]*ChannelDescriptorShim) *ReactorShim {
func NewReactorShim(logger log.Logger, name string, descriptors map[ChannelID]*ChannelDescriptorShim) *ReactorShim {
channels := make(map[ChannelID]*ChannelShim)
for _, cds := range descriptors {
@ -60,11 +61,12 @@ func NewReactorShim(name string, descriptors map[ChannelID]*ChannelDescriptorShi
rs := &ReactorShim{
Name: name,
PeerUpdates: NewPeerUpdates(),
PeerUpdates: NewPeerUpdates(make(chan PeerUpdate)),
Channels: channels,
}
rs.BaseReactor = *NewBaseReactor(name, rs)
rs.SetLogger(logger)
return rs
}


+ 2
- 1
p2p/shim_test.go View File

@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
p2pmocks "github.com/tendermint/tendermint/p2p/mocks"
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
@ -50,7 +51,7 @@ func setup(t *testing.T, peers []p2p.Peer) *reactorShimTestSuite {
t.Helper()
rts := &reactorShimTestSuite{
shim: p2p.NewReactorShim("TestShim", testChannelShims),
shim: p2p.NewReactorShim(log.TestingLogger(), "TestShim", testChannelShims),
}
rts.sw = p2p.MakeSwitch(p2pCfg, 1, "testing", "123.123.123", func(_ int, sw *p2p.Switch) *p2p.Switch {


+ 5
- 4
statesync/reactor.go View File

@ -320,7 +320,6 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e)
r.Logger.Error("recovering from processing message panic", "err", err)
}
}()
@ -341,7 +340,7 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
// processSnapshotCh initiates a blocking process where we listen for and handle
// envelopes on the SnapshotChannel. Any error encountered during message
// execution will result in a PeerError being sent on the SnapshotChannel. When
// the reactor is stopped, we will catch the singal and close the p2p Channel
// the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
func (r *Reactor) processSnapshotCh() {
defer r.snapshotCh.Close()
@ -350,6 +349,7 @@ func (r *Reactor) processSnapshotCh() {
select {
case envelope := <-r.snapshotCh.In():
if err := r.handleMessage(r.snapshotCh.ID(), envelope); err != nil {
r.Logger.Error("failed to process envelope", "ch_id", r.snapshotCh.ID(), "envelope", envelope, "err", err)
r.snapshotCh.Error() <- p2p.PeerError{
PeerID: envelope.From,
Err: err,
@ -367,7 +367,7 @@ func (r *Reactor) processSnapshotCh() {
// processChunkCh initiates a blocking process where we listen for and handle
// envelopes on the ChunkChannel. Any error encountered during message
// execution will result in a PeerError being sent on the ChunkChannel. When
// the reactor is stopped, we will catch the singal and close the p2p Channel
// the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
func (r *Reactor) processChunkCh() {
defer r.chunkCh.Close()
@ -376,6 +376,7 @@ func (r *Reactor) processChunkCh() {
select {
case envelope := <-r.chunkCh.In():
if err := r.handleMessage(r.chunkCh.ID(), envelope); err != nil {
r.Logger.Error("failed to process envelope", "ch_id", r.chunkCh.ID(), "envelope", envelope, "err", err)
r.chunkCh.Error() <- p2p.PeerError{
PeerID: envelope.From,
Err: err,
@ -410,7 +411,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
}
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the singal and
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates() {
defer r.peerUpdates.Close()
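Each of the process*Ch loops follows the same shape: drain the inbound channel, report failures as peer errors, and exit once a done signal fires. A generic sketch of that loop with hypothetical channel and message types:

package main

import "fmt"

type envelope struct {
	From    string
	Message string
}

type peerError struct {
	PeerID string
	Err    error
}

// processCh handles envelopes until doneCh is closed, reporting failures on errCh.
func processCh(in <-chan envelope, errCh chan<- peerError, doneCh <-chan struct{},
	handle func(envelope) error) {
	for {
		select {
		case env := <-in:
			if err := handle(env); err != nil {
				errCh <- peerError{PeerID: env.From, Err: err}
			}
		case <-doneCh:
			return
		}
	}
}

func main() {
	in := make(chan envelope, 1)
	errCh := make(chan peerError, 1)
	doneCh := make(chan struct{})

	in <- envelope{From: "peer1", Message: "bad"}
	go processCh(in, errCh, doneCh, func(e envelope) error {
		return fmt.Errorf("cannot handle %q", e.Message)
	})

	fmt.Println(<-errCh) // peer error for peer1
	close(doneCh)
}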


+ 1
- 1
statesync/reactor_test.go View File

@ -62,7 +62,7 @@ func setup(
chunkInCh: make(chan p2p.Envelope, chBuf),
chunkOutCh: make(chan p2p.Envelope, chBuf),
chunkPeerErrCh: make(chan p2p.PeerError, chBuf),
peerUpdates: p2p.NewPeerUpdates(),
peerUpdates: p2p.NewPeerUpdates(make(chan p2p.PeerUpdate)),
conn: conn,
connQuery: connQuery,
stateProvider: stateProvider,


+ 53
- 26
test/maverick/node/node.go View File

@ -243,9 +243,10 @@ type Node struct {
consensusState *cs.State // latest consensus state
consensusReactor *cs.Reactor // for participating in the consensus
pexReactor *pex.Reactor // for exchanging peer addresses
evidencePool *evidence.Pool // tracking evidence
proxyApp proxy.AppConns // connection to the application
rpcListeners []net.Listener // rpc servers
evidenceReactor *evidence.Reactor
evidencePool *evidence.Pool // tracking evidence
proxyApp proxy.AppConns // connection to the application
rpcListeners []net.Listener // rpc servers
txIndexer txindex.TxIndexer
indexerService *txindex.IndexerService
prometheusSrv *http.Server
@ -380,21 +381,34 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns,
return mempoolReactor, mempool
}
func createEvidenceReactor(config *cfg.Config, dbProvider DBProvider,
stateDB dbm.DB, blockStore *store.BlockStore, logger log.Logger) (*evidence.Reactor, *evidence.Pool, error) {
func createEvidenceReactor(
config *cfg.Config,
dbProvider DBProvider,
stateDB dbm.DB,
blockStore *store.BlockStore,
logger log.Logger,
) (*p2p.ReactorShim, *evidence.Reactor, *evidence.Pool, error) {
evidenceDB, err := dbProvider(&DBContext{"evidence", config})
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
evidenceLogger := logger.With("module", "evidence")
evidencePool, err := evidence.NewPool(evidenceDB, sm.NewStore(stateDB), blockStore)
logger = logger.With("module", "evidence")
evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
evidenceReactor := evidence.NewReactor(evidencePool)
evidenceReactor.SetLogger(evidenceLogger)
return evidenceReactor, evidencePool, nil
evidenceReactorShim := p2p.NewReactorShim(logger, "EvidenceShim", evidence.ChannelShims)
evidenceReactor := evidence.NewReactor(
logger,
evidenceReactorShim.GetChannel(evidence.EvidenceChannel),
evidenceReactorShim.PeerUpdates,
evidencePool,
)
return evidenceReactorShim, evidenceReactor, evidencePool, nil
}
func createBlockchainReactor(config *cfg.Config,
@ -529,7 +543,7 @@ func createSwitch(config *cfg.Config,
bcReactor p2p.Reactor,
stateSyncReactor *p2p.ReactorShim,
consensusReactor *cs.Reactor,
evidenceReactor *evidence.Reactor,
evidenceReactor *p2p.ReactorShim,
nodeInfo p2p.NodeInfo,
nodeKey p2p.NodeKey,
p2pLogger log.Logger) *p2p.Switch {
@ -751,8 +765,7 @@ func NewNode(config *cfg.Config,
// Make MempoolReactor
mempoolReactor, mempool := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger)
// Make Evidence Reactor
evidenceReactor, evidencePool, err := createEvidenceReactor(config, dbProvider, stateDB, blockStore, logger)
evReactorShim, evReactor, evPool, err := createEvidenceReactor(config, dbProvider, stateDB, blockStore, logger)
if err != nil {
return nil, err
}
@ -763,7 +776,7 @@ func NewNode(config *cfg.Config,
logger.With("module", "state"),
proxyApp.Consensus(),
mempool,
evidencePool,
evPool,
sm.BlockExecutorWithMetrics(smMetrics),
)
@ -783,15 +796,14 @@ func NewNode(config *cfg.Config,
logger.Info("Setting up maverick consensus reactor", "Misbehaviors", misbehaviors)
consensusReactor, consensusState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evidencePool,
config, state, blockExec, blockStore, mempool, evPool,
privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger, misbehaviors)
// Set up state sync reactor, and schedule a sync if requested.
// FIXME The way we do phased startups (e.g. replay -> fast sync -> consensus) is very messy,
// we should clean this whole thing up. See:
// https://github.com/tendermint/tendermint/issues/4644
stateSyncReactorShim := p2p.NewReactorShim("StateSyncShim", statesync.ChannelShims)
stateSyncReactorShim.SetLogger(logger.With("module", "statesync"))
stateSyncReactorShim := p2p.NewReactorShim(logger.With("module", "statesync"), "StateSyncShim", statesync.ChannelShims)
stateSyncReactor := statesync.NewReactor(
stateSyncReactorShim.Logger,
@ -813,7 +825,7 @@ func NewNode(config *cfg.Config,
transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp)
sw := createSwitch(
config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
stateSyncReactorShim, consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger,
stateSyncReactorShim, consensusReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger,
)
err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
@ -877,7 +889,8 @@ func NewNode(config *cfg.Config,
stateSync: stateSync,
stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state
pexReactor: pexReactor,
evidencePool: evidencePool,
evidenceReactor: evReactor,
evidencePool: evPool,
proxyApp: proxyApp,
txIndexer: txIndexer,
indexerService: indexerService,
@ -949,6 +962,11 @@ func (n *Node) OnStart() error {
return err
}
// Start the real evidence reactor separately since the switch uses the shim.
if err := n.evidenceReactor.Start(); err != nil {
return err
}
// Always connect to persistent peers
err = n.sw.DialPeersAsync(splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "))
if err != nil {
@ -992,7 +1010,12 @@ func (n *Node) OnStop() {
// Stop the real state sync reactor separately since the switch uses the shim.
if err := n.stateSyncReactor.Stop(); err != nil {
n.Logger.Error("failed to stop state sync service", "err", err)
n.Logger.Error("failed to stop the state sync reactor", "err", err)
}
// Stop the real evidence reactor separately since the switch uses the shim.
if err := n.evidenceReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the evidence reactor", "err", err)
}
// stop mempool WAL
@ -1312,10 +1335,14 @@ func makeNodeInfo(
Version: version.TMCoreSemVer,
Channels: []byte{
bcChannel,
cs.StateChannel, cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel,
cs.StateChannel,
cs.DataChannel,
cs.VoteChannel,
cs.VoteSetBitsChannel,
mempl.MempoolChannel,
evidence.EvidenceChannel,
byte(statesync.SnapshotChannel), byte(statesync.ChunkChannel),
byte(evidence.EvidenceChannel),
byte(statesync.SnapshotChannel),
byte(statesync.ChunkChannel),
},
Moniker: config.Moniker,
Other: p2p.NodeInfoOther{

