diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 2ebf89d65..6ffce4b98 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -72,9 +72,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { // Make a full instance of the evidence pool evidenceDB := dbm.NewMemDB() - evpool, err := evidence.NewPool(evidenceDB, stateStore, blockStore) + evpool, err := evidence.NewPool(logger.With("module", "evidence"), evidenceDB, stateStore, blockStore) require.NoError(t, err) - evpool.SetLogger(logger.With("module", "evidence")) // Make State blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool) diff --git a/evidence/pool.go b/evidence/pool.go index 15404114e..cf425d988 100644 --- a/evidence/pool.go +++ b/evidence/pool.go @@ -50,30 +50,31 @@ type Pool struct { // NewPool creates an evidence pool. If using an existing evidence store, // it will add all pending evidence to the concurrent list. -func NewPool(evidenceDB dbm.DB, stateDB sm.Store, blockStore BlockStore) (*Pool, error) { - +func NewPool(logger log.Logger, evidenceDB dbm.DB, stateDB sm.Store, blockStore BlockStore) (*Pool, error) { state, err := stateDB.Load() if err != nil { - return nil, fmt.Errorf("cannot load state: %w", err) + return nil, fmt.Errorf("failed to load state: %w", err) } pool := &Pool{ stateDB: stateDB, blockStore: blockStore, state: state, - logger: log.NewNopLogger(), + logger: logger, evidenceStore: evidenceDB, evidenceList: clist.New(), } - // if pending evidence already in db, in event of prior failure, then check for expiration, - // update the size and load it back to the evidenceList + // If pending evidence is already in the db (in the event of a prior failure), + // check for expiration, update the size and load it back into the evidenceList. pool.pruningHeight, pool.pruningTime = pool.removeExpiredPendingEvidence() evList, _, err := pool.listEvidence(prefixPending, -1) if err != nil { return nil, err } + atomic.StoreUint32(&pool.evidenceSize, uint32(len(evList))) + for _, ev := range evList { pool.evidenceList.PushBack(ev) } @@ -81,37 +82,44 @@ func NewPool(evidenceDB dbm.DB, stateDB sm.Store, blockStore BlockStore) (*Pool, return pool, nil } -// PendingEvidence is used primarily as part of block proposal and returns up to maxNum of uncommitted evidence. +// PendingEvidence is used primarily as part of block proposal and returns up to +// maxBytes of uncommitted evidence. func (evpool *Pool) PendingEvidence(maxBytes int64) ([]types.Evidence, int64) { if evpool.Size() == 0 { return []types.Evidence{}, 0 } + evidence, size, err := evpool.listEvidence(prefixPending, maxBytes) if err != nil { - evpool.logger.Error("Unable to retrieve pending evidence", "err", err) + evpool.logger.Error("failed to retrieve pending evidence", "err", err) } + return evidence, size } -// Update pulls the latest state to be used for expiration and evidence params and then prunes all expired evidence +// Update pulls the latest state to be used for expiration and evidence params +// and then prunes all expired evidence.
func (evpool *Pool) Update(state sm.State, ev types.EvidenceList) { // sanity check if state.LastBlockHeight <= evpool.state.LastBlockHeight { panic(fmt.Sprintf( - "Failed EvidencePool.Update new state height is less than or equal to previous state height: %d <= %d", + "failed EvidencePool.Update: new state height %d is less than or equal to previous state height %d", state.LastBlockHeight, evpool.state.LastBlockHeight, )) } - evpool.logger.Info("Updating evidence pool", "last_block_height", state.LastBlockHeight, - "last_block_time", state.LastBlockTime) - // update the state - evpool.updateState(state) + evpool.logger.Info( + "updating evidence pool", + "last_block_height", state.LastBlockHeight, + "last_block_time", state.LastBlockTime, + ) + evpool.updateState(state) evpool.markEvidenceAsCommitted(ev) - // prune pending evidence when it has expired. This also updates when the next evidence will expire + // Prune pending evidence when it has expired. This also updates when the next + // evidence will expire. if evpool.Size() > 0 && state.LastBlockHeight > evpool.pruningHeight && state.LastBlockTime.After(evpool.pruningTime) { evpool.pruningHeight, evpool.pruningTime = evpool.removeExpiredPendingEvidence() @@ -120,59 +128,56 @@ func (evpool *Pool) Update(state sm.State, ev types.EvidenceList) { // AddEvidence checks the evidence is valid and adds it to the pool. func (evpool *Pool) AddEvidence(ev types.Evidence) error { - evpool.logger.Debug("Attempting to add evidence", "ev", ev) + evpool.logger.Debug("attempting to add evidence", "evidence", ev) // We have already verified this piece of evidence - no need to do it again if evpool.isPending(ev) { - evpool.logger.Info("Evidence already pending, ignoring this one", "ev", ev) + evpool.logger.Info("evidence already pending; ignoring", "evidence", ev) return nil } // check that the evidence isn't already committed if evpool.isCommitted(ev) { - // this can happen if the peer that sent us the evidence is behind so we shouldn't - // punish the peer. - evpool.logger.Debug("Evidence was already committed, ignoring this one", "ev", ev) + // This can happen if the peer that sent us the evidence is behind so we + // shouldn't punish the peer. + evpool.logger.Debug("evidence was already committed; ignoring", "evidence", ev) return nil } // 1) Verify against state. - err := evpool.verify(ev) - if err != nil { - return types.NewErrInvalidEvidence(ev, err) + if err := evpool.verify(ev); err != nil { + return err } // 2) Save to store. if err := evpool.addPendingEvidence(ev); err != nil { - return fmt.Errorf("can't add evidence to pending list: %w", err) + return fmt.Errorf("failed to add evidence to pending list: %w", err) } // 3) Add evidence to clist. evpool.evidenceList.PushBack(ev) - evpool.logger.Info("Verified new evidence of byzantine behavior", "evidence", ev) - + evpool.logger.Info("verified new evidence of byzantine behavior", "evidence", ev) return nil } -// AddEvidenceFromConsensus should be exposed only to the consensus reactor so it can add evidence -// to the pool directly without the need for verification. +// AddEvidenceFromConsensus should be exposed only to the consensus reactor so +// it can add evidence to the pool directly without the need for verification. func (evpool *Pool) AddEvidenceFromConsensus(ev types.Evidence) error { - // we already have this evidence, log this but don't return an error.
if evpool.isPending(ev) { - evpool.logger.Info("Evidence already pending, ignoring this one", "ev", ev) + evpool.logger.Info("evidence already pending; ignoring", "evidence", ev) return nil } if err := evpool.addPendingEvidence(ev); err != nil { - return fmt.Errorf("can't add evidence to pending list: %w", err) + return fmt.Errorf("failed to add evidence to pending list: %w", err) } + // add evidence to be gossiped with peers evpool.evidenceList.PushBack(ev) - evpool.logger.Info("Verified new evidence of byzantine behavior", "evidence", ev) - + evpool.logger.Info("verified new evidence of byzantine behavior", "evidence", ev) return nil } @@ -200,10 +205,10 @@ func (evpool *Pool) CheckEvidence(evList types.EvidenceList) error { if err := evpool.addPendingEvidence(ev); err != nil { // Something went wrong with adding the evidence but we already know it is valid // hence we log an error and continue - evpool.logger.Error("Can't add evidence to pending list", "err", err, "ev", ev) + evpool.logger.Error("failed to add evidence to pending list", "err", err, "evidence", ev) } - evpool.logger.Info("Verified new evidence of byzantine behavior", "evidence", ev) + evpool.logger.Info("verified new evidence of byzantine behavior", "evidence", ev) } // check for duplicate evidence. We cache hashes so we don't have to work them out again. @@ -223,16 +228,12 @@ func (evpool *Pool) EvidenceFront() *clist.CElement { return evpool.evidenceList.Front() } -// EvidenceWaitChan is a channel that closes once the first evidence in the list is there. i.e Front is not nil +// EvidenceWaitChan is a channel that closes once the first evidence in the list +// is there, i.e., Front is not nil. func (evpool *Pool) EvidenceWaitChan() <-chan struct{} { return evpool.evidenceList.WaitChan() } -// SetLogger sets the Logger. -func (evpool *Pool) SetLogger(l log.Logger) { - evpool.logger = l -} - // Size returns the number of evidence in the pool. func (evpool *Pool) Size() uint32 { return atomic.LoadUint32(&evpool.evidenceSize) } @@ -245,10 +246,9 @@ func (evpool *Pool) State() sm.State { return evpool.state } -//-------------------------------------------------------------------------- - -// fastCheck leverages the fact that the evidence pool may have already verified the evidence to see if it can -// quickly conclude that the evidence is already valid. +// fastCheck leverages the fact that the evidence pool may have already verified +// the evidence to see if it can quickly conclude that the evidence is already +// valid.
func (evpool *Pool) fastCheck(ev types.Evidence) bool { if lcae, ok := ev.(*types.LightClientAttackEvidence); ok { key := keyPending(ev) @@ -256,25 +256,35 @@ func (evpool *Pool) fastCheck(ev types.Evidence) bool { if evBytes == nil { // the evidence is not in the nodes pending list return false } + if err != nil { - evpool.logger.Error("Failed to load light client attack evidence", "err", err, "key(height/hash)", key) + evpool.logger.Error("failed to load light client attack evidence", "err", err, "key(height/hash)", key) return false } + var trustedPb tmproto.LightClientAttackEvidence - err = trustedPb.Unmarshal(evBytes) - if err != nil { - evpool.logger.Error("Failed to convert light client attack evidence from bytes", - "err", err, "key(height/hash)", key) + + if err = trustedPb.Unmarshal(evBytes); err != nil { + evpool.logger.Error( + "failed to convert light client attack evidence from bytes", + "key(height/hash)", key, + "err", err, + ) return false } + trustedEv, err := types.LightClientAttackEvidenceFromProto(&trustedPb) if err != nil { - evpool.logger.Error("Failed to convert light client attack evidence from protobuf", - "err", err, "key(height/hash)", key) + evpool.logger.Error( + "failed to convert light client attack evidence from protobuf", + "key(height/hash)", key, + "err", err, + ) return false } - // ensure that all the byzantine validators that the evidence pool has match the byzantine validators - // in this evidence + + // Ensure that all the byzantine validators that the evidence pool has match + // the byzantine validators in this evidence. if trustedEv.ByzantineValidators == nil && lcae.ByzantineValidators != nil { return false } @@ -303,7 +313,8 @@ func (evpool *Pool) fastCheck(ev types.Evidence) bool { return true } - // for all other evidence the evidence pool just checks if it is already in the pending db + // For all other evidence the evidence pool just checks if it is already in + // the pending db. 
return evpool.isPending(ev) } @@ -324,7 +335,7 @@ func (evpool *Pool) isCommitted(evidence types.Evidence) bool { key := keyCommitted(evidence) ok, err := evpool.evidenceStore.Has(key) if err != nil { - evpool.logger.Error("Unable to find committed evidence", "err", err) + evpool.logger.Error("failed to find committed evidence", "err", err) } return ok } @@ -334,7 +345,7 @@ func (evpool *Pool) isPending(evidence types.Evidence) bool { key := keyPending(evidence) ok, err := evpool.evidenceStore.Has(key) if err != nil { - evpool.logger.Error("Unable to find pending evidence", "err", err) + evpool.logger.Error("failed to find pending evidence", "err", err) } return ok } @@ -342,20 +353,21 @@ func (evpool *Pool) isPending(evidence types.Evidence) bool { func (evpool *Pool) addPendingEvidence(ev types.Evidence) error { evpb, err := types.EvidenceToProto(ev) if err != nil { - return fmt.Errorf("unable to convert to proto, err: %w", err) + return fmt.Errorf("failed to convert to proto: %w", err) } evBytes, err := evpb.Marshal() if err != nil { - return fmt.Errorf("unable to marshal evidence: %w", err) + return fmt.Errorf("failed to marshal evidence: %w", err) } key := keyPending(ev) err = evpool.evidenceStore.Set(key, evBytes) if err != nil { - return fmt.Errorf("can't persist evidence: %w", err) + return fmt.Errorf("failed to persist evidence: %w", err) } + atomic.AddUint32(&evpool.evidenceSize, 1) return nil } @@ -363,10 +375,10 @@ func (evpool *Pool) addPendingEvidence(ev types.Evidence) error { func (evpool *Pool) removePendingEvidence(evidence types.Evidence) { key := keyPending(evidence) if err := evpool.evidenceStore.Delete(key); err != nil { - evpool.logger.Error("Unable to delete pending evidence", "err", err) + evpool.logger.Error("failed to delete pending evidence", "err", err) } else { atomic.AddUint32(&evpool.evidenceSize, ^uint32(0)) - evpool.logger.Info("Deleted pending evidence", "evidence", evidence) + evpool.logger.Info("deleted pending evidence", "evidence", evidence) } } @@ -387,13 +399,15 @@ func (evpool *Pool) markEvidenceAsCommitted(evidence types.EvidenceList) { h := gogotypes.Int64Value{Value: ev.Height()} evBytes, err := proto.Marshal(&h) if err != nil { - evpool.logger.Error("failed to marshal committed evidence", "err", err, "key(height/hash)", key) + evpool.logger.Error("failed to marshal committed evidence", "key(height/hash)", key, "err", err) continue } if err := evpool.evidenceStore.Set(key, evBytes); err != nil { - evpool.logger.Error("Unable to save committed evidence", "err", err, "key(height/hash)", key) + evpool.logger.Error("failed to save committed evidence", "key(height/hash)", key, "err", err) } + + evpool.logger.Info("marked evidence as committed", "evidence", ev) } // remove committed evidence from the clist @@ -416,16 +430,19 @@ func (evpool *Pool) listEvidence(prefixKey int64, maxBytes int64) ([]types.Evide if err != nil { return nil, totalSize, fmt.Errorf("database error: %v", err) } + defer iter.Close() for ; iter.Valid(); iter.Next() { var evpb tmproto.Evidence - err := evpb.Unmarshal(iter.Value()) - if err != nil { + + if err := evpb.Unmarshal(iter.Value()); err != nil { return evidence, totalSize, err } + evList.Evidence = append(evList.Evidence, evpb) evSize = int64(evList.Size()) + if maxBytes != -1 && evSize > maxBytes { if err := iter.Error(); err != nil { return evidence, totalSize, err @@ -445,39 +462,48 @@ func (evpool *Pool) listEvidence(prefixKey int64, maxBytes int64) ([]types.Evide if err := iter.Error(); err != nil { return evidence, 
totalSize, err } + return evidence, totalSize, nil } func (evpool *Pool) removeExpiredPendingEvidence() (int64, time.Time) { iter, err := dbm.IteratePrefix(evpool.evidenceStore, prefixToBytes(prefixPending)) if err != nil { - evpool.logger.Error("Unable to iterate over pending evidence", "err", err) + evpool.logger.Error("failed to iterate over pending evidence", "err", err) return evpool.State().LastBlockHeight, evpool.State().LastBlockTime } + defer iter.Close() + blockEvidenceMap := make(map[string]struct{}) + for ; iter.Valid(); iter.Next() { ev, err := bytesToEv(iter.Value()) if err != nil { - evpool.logger.Error("Error in transition evidence from protobuf", "err", err) + evpool.logger.Error("failed to transition evidence from protobuf", "err", err) continue } + if !evpool.isExpired(ev.Height(), ev.Time()) { if len(blockEvidenceMap) != 0 { evpool.removeEvidenceFromList(blockEvidenceMap) } - // return the height and time with which this evidence will have expired so we know when to prune next + // Return the height and time with which this evidence will have expired + // so we know when to prune next. return ev.Height() + evpool.State().ConsensusParams.Evidence.MaxAgeNumBlocks + 1, ev.Time().Add(evpool.State().ConsensusParams.Evidence.MaxAgeDuration).Add(time.Second) } + evpool.removePendingEvidence(ev) blockEvidenceMap[evMapKey(ev)] = struct{}{} } - // We either have no pending evidence or all evidence has expired + + // we either have no pending evidence or all evidence has expired if len(blockEvidenceMap) != 0 { evpool.removeEvidenceFromList(blockEvidenceMap) } + return evpool.State().LastBlockHeight, evpool.State().LastBlockTime } diff --git a/evidence/pool_test.go b/evidence/pool_test.go index 046f4efc5..f9b540ac0 100644 --- a/evidence/pool_test.go +++ b/evidence/pool_test.go @@ -1,7 +1,6 @@ package evidence_test import ( - "os" "testing" "time" @@ -23,12 +22,6 @@ import ( "github.com/tendermint/tendermint/version" ) -func TestMain(m *testing.M) { - - code := m.Run() - os.Exit(code) -} - const evidenceChainID = "test_chain" var ( @@ -52,14 +45,13 @@ func TestEvidencePoolBasic(t *testing.T) { stateStore.On("LoadValidators", mock.AnythingOfType("int64")).Return(valSet, nil) stateStore.On("Load").Return(createState(height+1, valSet), nil) - pool, err := evidence.NewPool(evidenceDB, stateStore, blockStore) + pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore) require.NoError(t, err) - pool.SetLogger(log.TestingLogger()) // evidence not seen yet: evs, size := pool.PendingEvidence(defaultEvidenceMaxBytes) - assert.Equal(t, 0, len(evs)) - assert.Zero(t, size) + require.Equal(t, 0, len(evs)) + require.Zero(t, size) ev := types.NewMockDuplicateVoteEvidenceWithValidator(height, defaultEvidenceTime, privVals[0], evidenceChainID) @@ -71,7 +63,7 @@ func TestEvidencePoolBasic(t *testing.T) { }() // evidence seen but not yet committed: - assert.NoError(t, pool.AddEvidence(ev)) + require.NoError(t, pool.AddEvidence(ev)) select { case <-evAdded: @@ -80,18 +72,17 @@ func TestEvidencePoolBasic(t *testing.T) { } next := pool.EvidenceFront() - assert.Equal(t, ev, next.Value.(types.Evidence)) + require.Equal(t, ev, next.Value.(types.Evidence)) const evidenceBytes int64 = 372 evs, size = pool.PendingEvidence(evidenceBytes) - assert.Equal(t, 1, len(evs)) - assert.Equal(t, evidenceBytes, size) // check that the size of the single evidence in bytes is correct + require.Equal(t, 1, len(evs)) + require.Equal(t, evidenceBytes, size) // check that the size of the single 
evidence in bytes is correct // shouldn't be able to add evidence twice - assert.NoError(t, pool.AddEvidence(ev)) + require.NoError(t, pool.AddEvidence(ev)) evs, _ = pool.PendingEvidence(defaultEvidenceMaxBytes) - assert.Equal(t, 1, len(evs)) - + require.Equal(t, 1, len(evs)) } // Tests inbound evidence for the right time and height @@ -99,7 +90,7 @@ func TestAddExpiredEvidence(t *testing.T) { var ( val = types.NewMockPV() height = int64(30) - stateStore = initializeValidatorState(val, height) + stateStore = initializeValidatorState(t, val, height) evidenceDB = dbm.NewMemDB() blockStore = &mocks.BlockStore{} expiredEvidenceTime = time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC) @@ -113,7 +104,7 @@ func TestAddExpiredEvidence(t *testing.T) { return &types.BlockMeta{Header: types.Header{Time: expiredEvidenceTime}} }) - pool, err := evidence.NewPool(evidenceDB, stateStore, blockStore) + pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore) require.NoError(t, err) testCases := []struct { @@ -132,13 +123,14 @@ func TestAddExpiredEvidence(t *testing.T) { for _, tc := range testCases { tc := tc + t.Run(tc.evDescription, func(t *testing.T) { ev := types.NewMockDuplicateVoteEvidenceWithValidator(tc.evHeight, tc.evTime, val, evidenceChainID) err := pool.AddEvidence(ev) if tc.expErr { - assert.Error(t, err) + require.Error(t, err) } else { - assert.NoError(t, err) + require.NoError(t, err) } }) } @@ -146,48 +138,59 @@ func TestAddExpiredEvidence(t *testing.T) { func TestAddEvidenceFromConsensus(t *testing.T) { var height int64 = 10 - pool, val := defaultTestPool(height) + + pool, val := defaultTestPool(t, height) ev := types.NewMockDuplicateVoteEvidenceWithValidator(height, defaultEvidenceTime, val, evidenceChainID) - err := pool.AddEvidenceFromConsensus(ev) - assert.NoError(t, err) + + require.NoError(t, pool.AddEvidenceFromConsensus(ev)) next := pool.EvidenceFront() - assert.Equal(t, ev, next.Value.(types.Evidence)) + require.Equal(t, ev, next.Value.(types.Evidence)) // shouldn't be able to submit the same evidence twice - err = pool.AddEvidenceFromConsensus(ev) - assert.NoError(t, err) + require.NoError(t, pool.AddEvidenceFromConsensus(ev)) evs, _ := pool.PendingEvidence(defaultEvidenceMaxBytes) - assert.Equal(t, 1, len(evs)) + require.Equal(t, 1, len(evs)) } func TestEvidencePoolUpdate(t *testing.T) { height := int64(21) - pool, val := defaultTestPool(height) + pool, val := defaultTestPool(t, height) state := pool.State() // create new block (no need to save it to blockStore) - prunedEv := types.NewMockDuplicateVoteEvidenceWithValidator(1, defaultEvidenceTime.Add(1*time.Minute), - val, evidenceChainID) - err := pool.AddEvidence(prunedEv) - require.NoError(t, err) - ev := types.NewMockDuplicateVoteEvidenceWithValidator(height, defaultEvidenceTime.Add(21*time.Minute), - val, evidenceChainID) + prunedEv := types.NewMockDuplicateVoteEvidenceWithValidator( + 1, + defaultEvidenceTime.Add(1*time.Minute), + val, + evidenceChainID, + ) + + require.NoError(t, pool.AddEvidence(prunedEv)) + + ev := types.NewMockDuplicateVoteEvidenceWithValidator( + height, + defaultEvidenceTime.Add(21*time.Minute), + val, + evidenceChainID, + ) lastCommit := makeCommit(height, val.PrivKey.PubKey().Address()) block := types.MakeBlock(height+1, []types.Tx{}, lastCommit, []types.Evidence{ev}) + // update state (partially) state.LastBlockHeight = height + 1 state.LastBlockTime = defaultEvidenceTime.Add(22 * time.Minute) - err = pool.CheckEvidence(types.EvidenceList{ev}) - require.NoError(t, err) + + 
require.NoError(t, pool.CheckEvidence(types.EvidenceList{ev})) pool.Update(state, block.Evidence.Evidence) + // a) Update marks evidence as committed so pending evidence should be empty evList, evSize := pool.PendingEvidence(defaultEvidenceMaxBytes) - assert.Empty(t, evList) - assert.Zero(t, evSize) + require.Empty(t, evList) + require.Zero(t, evSize) // b) If we try to check this evidence again it should fail because it has already been committed - err = pool.CheckEvidence(types.EvidenceList{ev}) + err := pool.CheckEvidence(types.EvidenceList{ev}) if assert.Error(t, err) { assert.Equal(t, "evidence was already committed", err.(*types.ErrInvalidEvidence).Reason.Error()) } @@ -195,21 +198,29 @@ func TestEvidencePoolUpdate(t *testing.T) { func TestVerifyPendingEvidencePasses(t *testing.T) { var height int64 = 1 - pool, val := defaultTestPool(height) - ev := types.NewMockDuplicateVoteEvidenceWithValidator(height, defaultEvidenceTime.Add(1*time.Minute), - val, evidenceChainID) - err := pool.AddEvidence(ev) - require.NoError(t, err) - err = pool.CheckEvidence(types.EvidenceList{ev}) - assert.NoError(t, err) + pool, val := defaultTestPool(t, height) + ev := types.NewMockDuplicateVoteEvidenceWithValidator( + height, + defaultEvidenceTime.Add(1*time.Minute), + val, + evidenceChainID, + ) + + require.NoError(t, pool.AddEvidence(ev)) + require.NoError(t, pool.CheckEvidence(types.EvidenceList{ev})) } func TestVerifyDuplicatedEvidenceFails(t *testing.T) { var height int64 = 1 - pool, val := defaultTestPool(height) - ev := types.NewMockDuplicateVoteEvidenceWithValidator(height, defaultEvidenceTime.Add(1*time.Minute), - val, evidenceChainID) + pool, val := defaultTestPool(t, height) + ev := types.NewMockDuplicateVoteEvidenceWithValidator( + height, + defaultEvidenceTime.Add(1*time.Minute), + val, + evidenceChainID, + ) + err := pool.CheckEvidence(types.EvidenceList{ev, ev}) if assert.Error(t, err) { assert.Equal(t, "duplicate evidence", err.(*types.ErrInvalidEvidence).Reason.Error()) @@ -224,6 +235,7 @@ func TestCheckEvidenceWithLightClientAttack(t *testing.T) { validatorPower int64 = 10 height int64 = 10 ) + conflictingVals, conflictingPrivVals := types.RandValidatorSet(nValidators, validatorPower) trustedHeader := makeHeaderRandom(height) trustedHeader.Time = defaultEvidenceTime @@ -237,12 +249,14 @@ func TestCheckEvidenceWithLightClientAttack(t *testing.T) { trustedHeader.AppHash = conflictingHeader.AppHash trustedHeader.LastResultsHash = conflictingHeader.LastResultsHash - // for simplicity we are simulating a duplicate vote attack where all the validators in the - // conflictingVals set voted twice + // For simplicity we are simulating a duplicate vote attack where all the + // validators in the conflictingVals set voted twice. 
blockID := makeBlockID(conflictingHeader.Hash(), 1000, []byte("partshash")) voteSet := types.NewVoteSet(evidenceChainID, height, 1, tmproto.SignedMsgType(2), conflictingVals) + commit, err := types.MakeCommit(blockID, height, 1, voteSet, conflictingPrivVals, defaultEvidenceTime) require.NoError(t, err) + ev := &types.LightClientAttackEvidence{ ConflictingBlock: &types.LightBlock{ SignedHeader: &types.SignedHeader{ @@ -259,8 +273,14 @@ func TestCheckEvidenceWithLightClientAttack(t *testing.T) { trustedBlockID := makeBlockID(trustedHeader.Hash(), 1000, []byte("partshash")) trustedVoteSet := types.NewVoteSet(evidenceChainID, height, 1, tmproto.SignedMsgType(2), conflictingVals) - trustedCommit, err := types.MakeCommit(trustedBlockID, height, 1, trustedVoteSet, conflictingPrivVals, - defaultEvidenceTime) + trustedCommit, err := types.MakeCommit( + trustedBlockID, + height, + 1, + trustedVoteSet, + conflictingPrivVals, + defaultEvidenceTime, + ) require.NoError(t, err) state := sm.State{ @@ -268,28 +288,25 @@ func TestCheckEvidenceWithLightClientAttack(t *testing.T) { LastBlockHeight: 11, ConsensusParams: *types.DefaultConsensusParams(), } + stateStore := &smmocks.Store{} stateStore.On("LoadValidators", height).Return(conflictingVals, nil) stateStore.On("Load").Return(state, nil) + blockStore := &mocks.BlockStore{} blockStore.On("LoadBlockMeta", height).Return(&types.BlockMeta{Header: *trustedHeader}) blockStore.On("LoadBlockCommit", height).Return(trustedCommit) - pool, err := evidence.NewPool(dbm.NewMemDB(), stateStore, blockStore) + pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore) require.NoError(t, err) - pool.SetLogger(log.TestingLogger()) - err = pool.AddEvidence(ev) - assert.NoError(t, err) + require.NoError(t, pool.AddEvidence(ev)) + require.NoError(t, pool.CheckEvidence(types.EvidenceList{ev})) - err = pool.CheckEvidence(types.EvidenceList{ev}) - assert.NoError(t, err) - - // take away the last signature -> there are less validators then what we have detected, - // hence this should fail + // Take away the last signature -> there are fewer validators than what we have detected, + // hence this should fail.
commit.Signatures = append(commit.Signatures[:nValidators-1], types.NewCommitSigAbsent()) - err = pool.CheckEvidence(types.EvidenceList{ev}) - assert.Error(t, err) + require.Error(t, pool.CheckEvidence(types.EvidenceList{ev})) } // Tests that restarting the evidence pool after a potential failure will recover the @@ -299,23 +316,33 @@ func TestRecoverPendingEvidence(t *testing.T) { val := types.NewMockPV() valAddress := val.PrivKey.PubKey().Address() evidenceDB := dbm.NewMemDB() - stateStore := initializeValidatorState(val, height) + stateStore := initializeValidatorState(t, val, height) + state, err := stateStore.Load() require.NoError(t, err) + blockStore := initializeBlockStore(dbm.NewMemDB(), state, valAddress) + // create previous pool and populate it - pool, err := evidence.NewPool(evidenceDB, stateStore, blockStore) - require.NoError(t, err) - pool.SetLogger(log.TestingLogger()) - goodEvidence := types.NewMockDuplicateVoteEvidenceWithValidator(height, - defaultEvidenceTime.Add(10*time.Minute), val, evidenceChainID) - expiredEvidence := types.NewMockDuplicateVoteEvidenceWithValidator(int64(1), - defaultEvidenceTime.Add(1*time.Minute), val, evidenceChainID) - err = pool.AddEvidence(goodEvidence) - require.NoError(t, err) - err = pool.AddEvidence(expiredEvidence) + pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore) require.NoError(t, err) + goodEvidence := types.NewMockDuplicateVoteEvidenceWithValidator( + height, + defaultEvidenceTime.Add(10*time.Minute), + val, + evidenceChainID, + ) + expiredEvidence := types.NewMockDuplicateVoteEvidenceWithValidator( + int64(1), + defaultEvidenceTime.Add(1*time.Minute), + val, + evidenceChainID, + ) + + require.NoError(t, pool.AddEvidence(goodEvidence)) + require.NoError(t, pool.AddEvidence(expiredEvidence)) + // now recover from the previous pool at a different time newStateStore := &smmocks.Store{} newStateStore.On("Load").Return(sm.State{ @@ -333,16 +360,18 @@ func TestRecoverPendingEvidence(t *testing.T) { }, }, }, nil) - newPool, err := evidence.NewPool(evidenceDB, newStateStore, blockStore) - assert.NoError(t, err) + + newPool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, newStateStore, blockStore) + require.NoError(t, err) + evList, _ := newPool.PendingEvidence(defaultEvidenceMaxBytes) - assert.Equal(t, 1, len(evList)) - next := newPool.EvidenceFront() - assert.Equal(t, goodEvidence, next.Value.(types.Evidence)) + require.Equal(t, 1, len(evList)) + next := newPool.EvidenceFront() + require.Equal(t, goodEvidence, next.Value.(types.Evidence)) } -func initializeStateFromValidatorSet(valSet *types.ValidatorSet, height int64) sm.Store { +func initializeStateFromValidatorSet(t *testing.T, valSet *types.ValidatorSet, height int64) sm.Store { stateDB := dbm.NewMemDB() stateStore := sm.NewStore(stateDB) state := sm.State{ @@ -370,16 +399,13 @@ func initializeStateFromValidatorSet(valSet *types.ValidatorSet, height int64) s // save all states up to height for i := int64(0); i <= height; i++ { state.LastBlockHeight = i - if err := stateStore.Save(state); err != nil { - panic(err) - } + require.NoError(t, stateStore.Save(state)) } return stateStore } -func initializeValidatorState(privVal types.PrivValidator, height int64) sm.Store { - +func initializeValidatorState(t *testing.T, privVal types.PrivValidator, height int64) sm.Store { pubKey, _ := privVal.GetPubKey() validator := &types.Validator{Address: pubKey.Address(), VotingPower: 10, PubKey: pubKey} @@ -389,7 +415,7 @@ func 
initializeValidatorState(privVal types.PrivValidator, height int64) sm.Stor Proposer: validator, } - return initializeStateFromValidatorSet(valSet, height) + return initializeStateFromValidatorSet(t, valSet, height) } // initializeBlockStore creates a block storage and populates it w/ a dummy @@ -420,21 +446,21 @@ func makeCommit(height int64, valAddr []byte) *types.Commit { Timestamp: defaultEvidenceTime, Signature: []byte("Signature"), }} + return types.NewCommit(height, 0, types.BlockID{}, commitSigs) } -func defaultTestPool(height int64) (*evidence.Pool, types.MockPV) { +func defaultTestPool(t *testing.T, height int64) (*evidence.Pool, types.MockPV) { val := types.NewMockPV() valAddress := val.PrivKey.PubKey().Address() evidenceDB := dbm.NewMemDB() - stateStore := initializeValidatorState(val, height) + stateStore := initializeValidatorState(t, val, height) state, _ := stateStore.Load() blockStore := initializeBlockStore(dbm.NewMemDB(), state, valAddress) - pool, err := evidence.NewPool(evidenceDB, stateStore, blockStore) - if err != nil { - panic("test evidence pool could not be created") - } - pool.SetLogger(log.TestingLogger()) + + pool, err := evidence.NewPool(log.TestingLogger(), evidenceDB, stateStore, blockStore) + require.NoError(t, err, "test evidence pool could not be created") + return pool, val } diff --git a/evidence/reactor.go b/evidence/reactor.go index dad07bf3f..ef1230c57 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -2,17 +2,42 @@ package evidence import ( "fmt" + "sync" "time" clist "github.com/tendermint/tendermint/libs/clist" "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/service" + tmsync "github.com/tendermint/tendermint/libs/sync" "github.com/tendermint/tendermint/p2p" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" "github.com/tendermint/tendermint/types" ) +var ( + _ service.Service = (*Reactor)(nil) + + // ChannelShims contains a map of ChannelDescriptorShim objects, where each + // object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding + // p2p proto.Message the new p2p Channel is responsible for handling. + // + // + // TODO: Remove once p2p refactor is complete. + // ref: https://github.com/tendermint/tendermint/issues/5670 + ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ + EvidenceChannel: { + MsgType: new(tmproto.EvidenceList), + Descriptor: &p2p.ChannelDescriptor{ + ID: byte(EvidenceChannel), + Priority: 6, + RecvMessageCapacity: maxMsgSize, + }, + }, + } +) + const ( - EvidenceChannel = byte(0x38) + EvidenceChannel = p2p.ChannelID(0x38) maxMsgSize = 1048576 // 1MB TODO make it configurable @@ -21,233 +46,339 @@ const ( // Most evidence should be committed in the very next block that is why we wait // just over the block production rate before sending evidence again. broadcastEvidenceIntervalS = 10 - // If a message fails wait this much before sending it again - peerRetryMessageIntervalMS = 100 ) +type closer struct { + closeOnce sync.Once + doneCh chan struct{} +} + +func newCloser() *closer { + return &closer{doneCh: make(chan struct{})} +} + +func (c *closer) close() { + c.closeOnce.Do(func() { + close(c.doneCh) + }) +} + // Reactor handles evpool evidence broadcasting amongst peers. 
type Reactor struct { - p2p.BaseReactor - evpool *Pool - eventBus *types.EventBus + service.BaseService + + evpool *Pool + eventBus *types.EventBus + evidenceCh *p2p.Channel + peerUpdates *p2p.PeerUpdatesCh + closeCh chan struct{} + + peerWG sync.WaitGroup + + mtx tmsync.Mutex + peerRoutines map[p2p.NodeID]*closer } -// NewReactor returns a new Reactor with the given config and evpool. -func NewReactor(evpool *Pool) *Reactor { - evR := &Reactor{ - evpool: evpool, +// NewReactor returns a reference to a new evidence reactor, which implements the +// service.Service interface. It accepts a p2p Channel dedicated for handling +// envelopes with EvidenceList messages. +func NewReactor( + logger log.Logger, + evidenceCh *p2p.Channel, + peerUpdates *p2p.PeerUpdatesCh, + evpool *Pool, +) *Reactor { + r := &Reactor{ + evpool: evpool, + evidenceCh: evidenceCh, + peerUpdates: peerUpdates, + closeCh: make(chan struct{}), + peerRoutines: make(map[p2p.NodeID]*closer), } - evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR) - return evR + + r.BaseService = *service.NewBaseService(logger, "Evidence", r) + return r } -// SetLogger sets the Logger on the reactor and the underlying Evidence. -func (evR *Reactor) SetLogger(l log.Logger) { - evR.Logger = l - evR.evpool.SetLogger(l) +// SetEventBus implements events.Eventable. +func (r *Reactor) SetEventBus(b *types.EventBus) { + r.eventBus = b } -// GetChannels implements Reactor. -// It returns the list of channels for this reactor. -func (evR *Reactor) GetChannels() []*p2p.ChannelDescriptor { - return []*p2p.ChannelDescriptor{ - { - ID: EvidenceChannel, - Priority: 6, - RecvMessageCapacity: maxMsgSize, - }, +// OnStart starts separate goroutines for each p2p Channel and listens for +// envelopes on each. In addition, it also listens for peer updates and handles +// messages on that p2p channel accordingly. The caller must be sure to execute +// OnStop to ensure the outbound p2p Channels are closed. No error is returned. +func (r *Reactor) OnStart() error { + go r.processEvidenceCh() + go r.processPeerUpdates() + + return nil +} + +// OnStop stops the reactor by signaling to all spawned goroutines to exit and +// blocking until they all exit. +func (r *Reactor) OnStop() { + r.mtx.Lock() + for _, c := range r.peerRoutines { + c.close() } + r.mtx.Unlock() + + // Wait for all spawned peer evidence broadcasting goroutines to gracefully + // exit. + r.peerWG.Wait() + + // Close closeCh to signal to all spawned goroutines to gracefully exit. All + // p2p Channels should execute Close(). + close(r.closeCh) + + // Wait for all p2p Channels to be closed before returning. This ensures we + // can easily reason about synchronization of all p2p Channels and ensure no + // panics will occur. + <-r.evidenceCh.Done() + <-r.peerUpdates.Done() } -// AddPeer implements Reactor. -func (evR *Reactor) AddPeer(peer p2p.Peer) { - go evR.broadcastEvidenceRoutine(peer) +// handleEvidenceMessage handles envelopes sent from peers on the EvidenceChannel. +// It returns an error only if the Envelope.Message is unknown for this channel +// or if the given evidence is invalid. This should never be called outside of +// handleMessage.
+func (r *Reactor) handleEvidenceMessage(envelope p2p.Envelope) error { + logger := r.Logger.With("peer", envelope.From) + + switch msg := envelope.Message.(type) { + case *tmproto.EvidenceList: + logger.Debug("received evidence list", "num_evidence", len(msg.Evidence)) + + // TODO: Refactor the Evidence type to not contain a list since we only ever + // send and receive one piece of evidence at a time. Or potentially consider + // batching evidence. + // + // see: https://github.com/tendermint/tendermint/issues/4729 + for i := 0; i < len(msg.Evidence); i++ { + ev, err := types.EvidenceFromProto(&msg.Evidence[i]) + if err != nil { + logger.Error("failed to convert evidence", "err", err) + continue + } + + if err := r.evpool.AddEvidence(ev); err != nil { + // If we're given invalid evidence by the peer, notify the router that + // we should remove this peer by returning an error. + if _, ok := err.(*types.ErrInvalidEvidence); ok { + return err + } + } + } + + default: + return fmt.Errorf("received unknown message: %T", msg) + } + + return nil } -// Receive implements Reactor. -// It adds any received evidence to the evpool. -// XXX: do not call any methods that can block or incur heavy processing. -// https://github.com/tendermint/tendermint/issues/2888 -func (evR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { - evis, err := decodeMsg(msgBytes) - if err != nil { - evR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err) - evR.Switch.StopPeerForError(src, err) - return +// handleMessage handles an Envelope sent from a peer on a specific p2p Channel. +// It will handle errors and any possible panics gracefully. A caller can handle +// any error returned by sending a PeerError on the respective channel. +func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) { + defer func() { + if e := recover(); e != nil { + err = fmt.Errorf("panic in processing message: %v", e) + } + }() + + switch chID { + case EvidenceChannel: + err = r.handleEvidenceMessage(envelope) + + default: + err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope) } - for _, ev := range evis { - err := evR.evpool.AddEvidence(ev) - switch err.(type) { - case *types.ErrInvalidEvidence: - evR.Logger.Error(err.Error()) - // punish peer - evR.Switch.StopPeerForError(src, err) + return err +} + +// processEvidenceCh implements a blocking event loop where we listen for p2p +// Envelope messages from the evidenceCh. +func (r *Reactor) processEvidenceCh() { + defer r.evidenceCh.Close() + + for { + select { + case envelope := <-r.evidenceCh.In(): + if err := r.handleMessage(r.evidenceCh.ID(), envelope); err != nil { + r.Logger.Error("failed to process message", "ch_id", r.evidenceCh.ID(), "envelope", envelope, "err", err) + r.evidenceCh.Error() <- p2p.PeerError{ + PeerID: envelope.From, + Err: err, + Severity: p2p.PeerErrorSeverityLow, + } + } + + case <-r.closeCh: + r.Logger.Debug("stopped listening on evidence channel; closing...") return - case nil: - default: - // continue to the next piece of evidence - evR.Logger.Error("Evidence has not been added", "evidence", evis, "err", err) } } } -// SetEventBus implements events.Eventable. -func (evR *Reactor) SetEventBus(b *types.EventBus) { - evR.eventBus = b +// processPeerUpdate processes a PeerUpdate. For new or live peers it will +// check if an evidence broadcasting goroutine needs to be started.
For +// down or removed peers, it will check if an evidence broadcasting goroutine +// exists and signal that it should exit. +// +// FIXME: The peer may be behind in which case it would simply ignore the +// evidence and treat it as invalid. This would cause the peer to disconnect. +// The peer may also receive the same piece of evidence multiple times if it +// connects/disconnects frequently from the broadcasting peer(s). +// +// REF: https://github.com/tendermint/tendermint/issues/4727 +func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { + r.Logger.Debug("received peer update", "peer", peerUpdate.PeerID, "status", peerUpdate.Status) + + r.mtx.Lock() + defer r.mtx.Unlock() + + switch peerUpdate.Status { + case p2p.PeerStatusUp: + // Do not allow starting new evidence broadcast loops after reactor shutdown + // has been initiated. This can happen after we've manually closed all + // peer broadcast loops and closed r.closeCh, but the router still sends + // in-flight peer updates. + if !r.IsRunning() { + return + } + + // Check if we've already started a goroutine for this peer, if not we create + // a new done channel so we can explicitly close the goroutine if the peer + // is later removed, we increment the waitgroup so the reactor can stop + // safely, and finally start the goroutine to broadcast evidence to that peer. + _, ok := r.peerRoutines[peerUpdate.PeerID] + if !ok { + closer := newCloser() + + r.peerRoutines[peerUpdate.PeerID] = closer + r.peerWG.Add(1) + go r.broadcastEvidenceLoop(peerUpdate.PeerID, closer) + } + + case p2p.PeerStatusDown, p2p.PeerStatusRemoved, p2p.PeerStatusBanned: + // Check if we've started an evidence broadcasting goroutine for this peer. + // If we have, we signal to terminate the goroutine via the channel's closure. + // This will internally decrement the peer waitgroup and remove the peer + // from the map of peer evidence broadcasting goroutines. + closer, ok := r.peerRoutines[peerUpdate.PeerID] + if ok { + closer.close() + } + } +} + +// processPeerUpdates initiates a blocking process where we listen for and handle +// PeerUpdate messages. When the reactor is stopped, we will catch the signal and +// close the p2p PeerUpdatesCh gracefully. +func (r *Reactor) processPeerUpdates() { + defer r.peerUpdates.Close() + + for { + select { + case peerUpdate := <-r.peerUpdates.Updates(): + r.processPeerUpdate(peerUpdate) + + case <-r.closeCh: + r.Logger.Debug("stopped listening on peer updates channel; closing...") + return + } + } } -// Modeled after the mempool routine. -// - Evidence accumulates in a clist. -// - Each peer has a routine that iterates through the clist, -// sending available evidence to the peer. -// - If we're waiting for new evidence and the list is not empty, -// start iterating from the beginning again. -func (evR *Reactor) broadcastEvidenceRoutine(peer p2p.Peer) { +// broadcastEvidenceLoop starts a blocking process that continuously reads pieces +// of evidence off of a linked-list and sends the evidence in a p2p Envelope to +// the given peer by ID. This should be invoked in a goroutine per unique peer +// ID via an appropriate PeerUpdate. The goroutine can be signaled to gracefully +// exit by either explicitly closing the provided doneCh or by the reactor +// signaling to stop. +// +// TODO: This should be refactored so that we do not blindly gossip evidence +// that the peer has already received or may not be ready for. 
+// +// REF: https://github.com/tendermint/tendermint/issues/4727 +func (r *Reactor) broadcastEvidenceLoop(peerID p2p.NodeID, closer *closer) { var next *clist.CElement + + defer func() { + r.mtx.Lock() + delete(r.peerRoutines, peerID) + r.mtx.Unlock() + + r.peerWG.Done() + + if e := recover(); e != nil { + r.Logger.Error("recovering from broadcasting evidence loop", "err", e) + } + }() + for { // This happens because the CElement we were looking at got garbage - // collected (removed). That is, .NextWait() returned nil. Go ahead and - // start from the beginning. + // collected (removed). That is, .NextWaitChan() returned nil. So we can go + // ahead and start from the beginning. if next == nil { select { - case <-evR.evpool.EvidenceWaitChan(): // Wait until evidence is available - if next = evR.evpool.EvidenceFront(); next == nil { + case <-r.evpool.EvidenceWaitChan(): // wait until next evidence is available + if next = r.evpool.EvidenceFront(); next == nil { continue } - case <-peer.Quit(): + + case <-closer.doneCh: + // The peer is marked for removal via a PeerUpdate as the doneCh was + // explicitly closed to signal we should exit. return - case <-evR.Quit(): + + case <-r.closeCh: + // The reactor has signaled that we are stopped and thus we should + // implicitly exit this peer's goroutine. return } } ev := next.Value.(types.Evidence) - evis := evR.prepareEvidenceMessage(peer, ev) - if len(evis) > 0 { - msgBytes, err := encodeMsg(evis) - if err != nil { - panic(err) - } - evR.Logger.Debug("Gossiping evidence to peer", "ev", ev, "peer", peer.ID()) - success := peer.Send(EvidenceChannel, msgBytes) - if !success { - time.Sleep(peerRetryMessageIntervalMS * time.Millisecond) - continue - } + evProto, err := types.EvidenceToProto(ev) + if err != nil { + panic(fmt.Errorf("failed to convert evidence: %w", err)) + } + + // Send the evidence to the corresponding peer. Note, the peer may be behind + // and thus would not be able to process the evidence correctly. Also, the + // peer may receive this piece of evidence multiple times if it added and + // removed frequently from the broadcasting peer. + r.evidenceCh.Out() <- p2p.Envelope{ + To: peerID, + Message: &tmproto.EvidenceList{ + Evidence: []tmproto.Evidence{*evProto}, + }, } + r.Logger.Debug("gossiped evidence to peer", "evidence", ev, "peer", peerID) - afterCh := time.After(time.Second * broadcastEvidenceIntervalS) select { - case <-afterCh: - // start from the beginning every tick. - // TODO: only do this if we're at the end of the list! + case <-time.After(time.Second * broadcastEvidenceIntervalS): + // start from the beginning after broadcastEvidenceIntervalS seconds next = nil + case <-next.NextWaitChan(): - // see the start of the for loop for nil check next = next.Next() - case <-peer.Quit(): - return - case <-evR.Quit(): - return - } - } -} - -// Returns the message to send to the peer, or nil if the evidence is invalid for the peer. -// If message is nil, we should sleep and try again. -func (evR Reactor) prepareEvidenceMessage( - peer p2p.Peer, - ev types.Evidence, -) (evis []types.Evidence) { - - // make sure the peer is up to date - evHeight := ev.Height() - peerState, ok := peer.Get(types.PeerStateKey).(PeerState) - if !ok { - // Peer does not have a state yet. We set it in the consensus reactor, but - // when we add peer in Switch, the order we call reactors#AddPeer is - // different every time due to us using a map. Sometimes other reactors - // will be initialized before the consensus reactor. 
We should wait a few - // milliseconds and retry. - return nil - } - - // NOTE: We only send evidence to peers where - // peerHeight - maxAge < evidenceHeight < peerHeight - var ( - peerHeight = peerState.GetHeight() - params = evR.evpool.State().ConsensusParams.Evidence - ageNumBlocks = peerHeight - evHeight - ) - - if peerHeight <= evHeight { // peer is behind. sleep while he catches up - return nil - } else if ageNumBlocks > params.MaxAgeNumBlocks { // evidence is too old relative to the peer, skip - - // NOTE: if evidence is too old for an honest peer, then we're behind and - // either it already got committed or it never will! - evR.Logger.Info("Not sending peer old evidence", - "peerHeight", peerHeight, - "evHeight", evHeight, - "maxAgeNumBlocks", params.MaxAgeNumBlocks, - "lastBlockTime", evR.evpool.State().LastBlockTime, - "maxAgeDuration", params.MaxAgeDuration, - "peer", peer, - ) - - return nil - } - - // send evidence - return []types.Evidence{ev} -} -// PeerState describes the state of a peer. -type PeerState interface { - GetHeight() int64 -} - -// encodemsg takes a array of evidence -// returns the byte encoding of the List Message -func encodeMsg(evis []types.Evidence) ([]byte, error) { - evi := make([]tmproto.Evidence, len(evis)) - for i := 0; i < len(evis); i++ { - ev, err := types.EvidenceToProto(evis[i]) - if err != nil { - return nil, err - } - evi[i] = *ev - } - epl := tmproto.EvidenceList{ - Evidence: evi, - } - - return epl.Marshal() -} - -// decodemsg takes an array of bytes -// returns an array of evidence -func decodeMsg(bz []byte) (evis []types.Evidence, err error) { - lm := tmproto.EvidenceList{} - if err := lm.Unmarshal(bz); err != nil { - return nil, err - } - - evis = make([]types.Evidence, len(lm.Evidence)) - for i := 0; i < len(lm.Evidence); i++ { - ev, err := types.EvidenceFromProto(&lm.Evidence[i]) - if err != nil { - return nil, err - } - evis[i] = ev - } + case <-closer.doneCh: + // The peer is marked for removal via a PeerUpdate as the doneCh was + // explicitly closed to signal we should exit. + return - for i, ev := range evis { - if err := ev.ValidateBasic(); err != nil { - return nil, fmt.Errorf("invalid evidence (#%d): %v", i, err) + case <-r.closeCh: + // The reactor has signaled that we are stopped and thus we should + // implicitly exit this peer's goroutine. + return } } - - return evis, nil } diff --git a/evidence/reactor_test.go b/evidence/reactor_test.go index 170b45348..0958b260f 100644 --- a/evidence/reactor_test.go +++ b/evidence/reactor_test.go @@ -3,18 +3,16 @@ package evidence_test import ( "encoding/hex" "fmt" + "math/rand" "sync" "testing" "time" - "github.com/go-kit/kit/log/term" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" dbm "github.com/tendermint/tm-db" - cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/tmhash" "github.com/tendermint/tendermint/evidence" @@ -28,310 +26,565 @@ import ( var ( numEvidence = 10 - timeout = 120 * time.Second // ridiculously high because CircleCI is slow + + rng = rand.New(rand.NewSource(time.Now().UnixNano())) ) -// We have N evidence reactors connected to one another. The first reactor -// receives a number of evidence at varying heights. We test that all -// other reactors receive the evidence and add it to their own respective -// evidence pools. 
+type reactorTestSuite struct { + reactor *evidence.Reactor + pool *evidence.Pool + + peerID p2p.NodeID + + evidenceChannel *p2p.Channel + evidenceInCh chan p2p.Envelope + evidenceOutCh chan p2p.Envelope + evidencePeerErrCh chan p2p.PeerError + + peerUpdatesCh chan p2p.PeerUpdate + peerUpdates *p2p.PeerUpdatesCh +} + +func setup(t *testing.T, logger log.Logger, pool *evidence.Pool, chBuf uint) *reactorTestSuite { + t.Helper() + + pID := make([]byte, 16) + _, err := rng.Read(pID) + require.NoError(t, err) + + peerUpdatesCh := make(chan p2p.PeerUpdate) + + rts := &reactorTestSuite{ + pool: pool, + evidenceInCh: make(chan p2p.Envelope, chBuf), + evidenceOutCh: make(chan p2p.Envelope, chBuf), + evidencePeerErrCh: make(chan p2p.PeerError, chBuf), + peerUpdatesCh: peerUpdatesCh, + peerUpdates: p2p.NewPeerUpdates(peerUpdatesCh), + peerID: p2p.NodeID(fmt.Sprintf("%x", pID)), + } + + rts.evidenceChannel = p2p.NewChannel( + evidence.EvidenceChannel, + new(tmproto.EvidenceList), + rts.evidenceInCh, + rts.evidenceOutCh, + rts.evidencePeerErrCh, + ) + + rts.reactor = evidence.NewReactor( + logger, + rts.evidenceChannel, + rts.peerUpdates, + pool, + ) + + require.NoError(t, rts.reactor.Start()) + require.True(t, rts.reactor.IsRunning()) + + t.Cleanup(func() { + require.NoError(t, rts.reactor.Stop()) + require.False(t, rts.reactor.IsRunning()) + }) + + return rts +} + +func createTestSuites(t *testing.T, stateStores []sm.Store, chBuf uint) []*reactorTestSuite { + t.Helper() + + numSStores := len(stateStores) + testSuites := make([]*reactorTestSuite, numSStores) + evidenceTime := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC) + + for i := 0; i < numSStores; i++ { + logger := log.TestingLogger().With("validator", i) + evidenceDB := dbm.NewMemDB() + blockStore := &mocks.BlockStore{} + blockStore.On("LoadBlockMeta", mock.AnythingOfType("int64")).Return( + &types.BlockMeta{Header: types.Header{Time: evidenceTime}}, + ) + + pool, err := evidence.NewPool(logger, evidenceDB, stateStores[i], blockStore) + require.NoError(t, err) + + testSuites[i] = setup(t, logger, pool, chBuf) + } + + return testSuites +} + +func waitForEvidence(t *testing.T, evList types.EvidenceList, suites ...*reactorTestSuite) { + t.Helper() + + wg := new(sync.WaitGroup) + + for _, suite := range suites { + wg.Add(1) + + go func(s *reactorTestSuite) { + var localEvList []types.Evidence + + currentPoolSize := 0 + for currentPoolSize != len(evList) { + // each evidence should not be more than 500 bytes + localEvList, _ = s.pool.PendingEvidence(int64(len(evList) * 500)) + currentPoolSize = len(localEvList) + } + + // put the reaped evidence in a map so we can quickly check we got everything + evMap := make(map[string]types.Evidence) + for _, e := range localEvList { + evMap[string(e.Hash())] = e + } + + for i, expectedEv := range evList { + gotEv := evMap[string(expectedEv.Hash())] + require.Equalf( + t, + expectedEv, + gotEv, + "evidence at index %d in pool does not match; got: %v, expected: %v", i, gotEv, expectedEv, + ) + } + + wg.Done() + }(suite) + } + + // wait for the evidence in all evidence pools + wg.Wait() +} + +func createEvidenceList( + t *testing.T, + pool *evidence.Pool, + val types.PrivValidator, + numEvidence int, +) types.EvidenceList { + evList := make([]types.Evidence, numEvidence) + for i := 0; i < numEvidence; i++ { + ev := types.NewMockDuplicateVoteEvidenceWithValidator( + int64(i+1), + time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC), + val, + evidenceChainID, + ) + + require.NoError(t, pool.AddEvidence(ev)) + evList[i] = ev + } 
+ + return evList +} + +// simulateRouter will increment the provided WaitGroup and execute a simulated +// router where, for each outbound p2p Envelope from the primary reactor, we +// proxy (send) the Envelope to the relevant peer reactor. Done is invoked on the +// WaitGroup when numOut Envelopes are sent (i.e. read from the outbound channel). +func simulateRouter(wg *sync.WaitGroup, primary *reactorTestSuite, suites []*reactorTestSuite, numOut int) { + wg.Add(1) + + // create a mapping for efficient suite lookup by peer ID + suitesByPeerID := make(map[p2p.NodeID]*reactorTestSuite) + for _, suite := range suites { + suitesByPeerID[suite.peerID] = suite + } + + // Simulate a router by listening for all outbound envelopes and proxying the + // envelope to the respective peer (suite). + go func() { + for i := 0; i < numOut; i++ { + envelope := <-primary.evidenceOutCh + other := suitesByPeerID[envelope.To] + + other.evidenceInCh <- p2p.Envelope{ + From: primary.peerID, + To: envelope.To, + Message: envelope.Message, + } + } + + wg.Done() + }() +} + +func TestReactorMultiDisconnect(t *testing.T) { + val := types.NewMockPV() + height := int64(numEvidence) + 10 + + stateDB1 := initializeValidatorState(t, val, height) + stateDB2 := initializeValidatorState(t, val, height) + + testSuites := createTestSuites(t, []sm.Store{stateDB1, stateDB2}, 20) + primary := testSuites[0] + secondary := testSuites[1] + + _ = createEvidenceList(t, primary.pool, val, numEvidence) + + primary.peerUpdatesCh <- p2p.PeerUpdate{ + Status: p2p.PeerStatusUp, + PeerID: secondary.peerID, + } + + // Ensure "disconnecting" the secondary peer from the primary more than once + // is handled gracefully. + primary.peerUpdatesCh <- p2p.PeerUpdate{ + Status: p2p.PeerStatusDown, + PeerID: secondary.peerID, + } + primary.peerUpdatesCh <- p2p.PeerUpdate{ + Status: p2p.PeerStatusDown, + PeerID: secondary.peerID, + } +} + +// TestReactorBroadcastEvidence creates an environment of multiple peers that +// are all at the same height. One peer, designated as a primary, gossips all +// evidence to the remaining peers. func TestReactorBroadcastEvidence(t *testing.T) { - config := cfg.TestConfig() - N := 7 + numPeers := 7 - // create statedb for everyone - stateDBs := make([]sm.Store, N) + // create a stateDB for all test suites (nodes) + stateDBs := make([]sm.Store, numPeers) val := types.NewMockPV() - // we need validators saved for heights at least as high as we have evidence for + + // We need all validators saved for heights at least as high as we have + // evidence for. height := int64(numEvidence) + 10 - for i := 0; i < N; i++ { - stateDBs[i] = initializeValidatorState(val, height) + for i := 0; i < numPeers; i++ { + stateDBs[i] = initializeValidatorState(t, val, height) } - // make reactors from statedb - reactors, pools := makeAndConnectReactorsAndPools(config, stateDBs) + // Create a series of test suites where each suite contains a reactor and + // evidence pool. In addition, we mark a primary suite and the rest are + // secondaries where each secondary is added as a peer via a PeerUpdate to the + // primary. As a result, the primary will gossip all evidence to each secondary.
+
+// TestReactorBroadcastEvidence creates an environment of multiple peers that
+// are all at the same height. One peer, designated as a primary, gossips all
+// evidence to the remaining peers.
 func TestReactorBroadcastEvidence(t *testing.T) {
-    config := cfg.TestConfig()
-    N := 7
+    numPeers := 7
 
-    // create statedb for everyone
-    stateDBs := make([]sm.Store, N)
+    // create a stateDB for all test suites (nodes)
+    stateDBs := make([]sm.Store, numPeers)
     val := types.NewMockPV()
-    // we need validators saved for heights at least as high as we have evidence for
+
+    // We need all validators saved for heights at least as high as we have
+    // evidence for.
     height := int64(numEvidence) + 10
-    for i := 0; i < N; i++ {
-        stateDBs[i] = initializeValidatorState(val, height)
+    for i := 0; i < numPeers; i++ {
+        stateDBs[i] = initializeValidatorState(t, val, height)
     }
 
-    // make reactors from statedb
-    reactors, pools := makeAndConnectReactorsAndPools(config, stateDBs)
+    // Create a series of test suites where each suite contains a reactor and
+    // evidence pool. In addition, we mark a primary suite and the rest are
+    // secondaries where each secondary is added as a peer via a PeerUpdate to the
+    // primary. As a result, the primary will gossip all evidence to each secondary.
+    testSuites := createTestSuites(t, stateDBs, 0)
+    primary := testSuites[0]
+    secondaries := testSuites[1:]
 
-    // set the peer height on each reactor
-    for _, r := range reactors {
-        for _, peer := range r.Switch.Peers().List() {
-            ps := peerState{height}
-            peer.Set(types.PeerStateKey, ps)
+    // Simulate a router by listening for all outbound envelopes and proxying the
+    // envelopes to the respective peer (suite).
+    wg := new(sync.WaitGroup)
+    simulateRouter(wg, primary, testSuites, numEvidence*len(secondaries))
+
+    evList := createEvidenceList(t, primary.pool, val, numEvidence)
+
+    // Add each secondary suite (node) as a peer to the primary suite (node). This
+    // will cause the primary to gossip all evidence to the secondaries.
+    for _, suite := range secondaries {
+        primary.peerUpdatesCh <- p2p.PeerUpdate{
+            Status: p2p.PeerStatusUp,
+            PeerID: suite.peerID,
         }
     }
 
-    // send a bunch of valid evidence to the first reactor's evpool
-    // and wait for them all to be received in the others
-    evList := sendEvidence(t, pools[0], val, numEvidence)
-    waitForEvidence(t, evList, pools)
-}
+    // Wait until all secondary suites (nodes) have received all evidence from
+    // the primary suite (node).
+    waitForEvidence(t, evList, secondaries...)
 
-// We have two evidence reactors connected to one another but are at different heights.
-// Reactor 1 which is ahead receives a number of evidence. It should only send the evidence
-// that is below the height of the peer to that peer.
-func TestReactorSelectiveBroadcast(t *testing.T) {
-    config := cfg.TestConfig()
+    for _, suite := range testSuites {
+        require.Equal(t, numEvidence, int(suite.pool.Size()))
+    }
+
+    wg.Wait()
+
+    // ensure all channels are drained
+    for _, suite := range testSuites {
+        require.Empty(t, suite.evidenceOutCh)
+    }
+}
+
+// TestReactorBroadcastEvidence_Lagging tests a context where we have two
+// reactors connected to one another but at different heights. Reactor 1,
+// which is ahead, receives a list of evidence.
+func TestReactorBroadcastEvidence_Lagging(t *testing.T) {
     val := types.NewMockPV()
     height1 := int64(numEvidence) + 10
     height2 := int64(numEvidence) / 2
 
-    // DB1 is ahead of DB2
-    stateDB1 := initializeValidatorState(val, height1)
-    stateDB2 := initializeValidatorState(val, height2)
+    // stateDB1 is ahead of stateDB2, where stateDB1 has all heights (1-10) and
+    // stateDB2 only has heights 1-7.
+    stateDB1 := initializeValidatorState(t, val, height1)
+    stateDB2 := initializeValidatorState(t, val, height2)
 
-    // make reactors from statedb
-    reactors, pools := makeAndConnectReactorsAndPools(config, []sm.Store{stateDB1, stateDB2})
+    testSuites := createTestSuites(t, []sm.Store{stateDB1, stateDB2}, 0)
+    primary := testSuites[0]
+    secondaries := testSuites[1:]
 
-    // set the peer height on each reactor
-    for _, r := range reactors {
-        for _, peer := range r.Switch.Peers().List() {
-            ps := peerState{height1}
-            peer.Set(types.PeerStateKey, ps)
+    // Simulate a router by listening for all outbound envelopes and proxying the
+    // envelope to the respective peer (suite).
+    wg := new(sync.WaitGroup)
+    simulateRouter(wg, primary, testSuites, numEvidence*len(secondaries))
+
+    // Send a list of valid evidence to the evidence pool of the first reactor,
+    // the one that is ahead.
+    evList := createEvidenceList(t, primary.pool, val, numEvidence)
+
+    // Add each secondary suite (node) as a peer to the primary suite (node). This
+    // will cause the primary to gossip all evidence to the secondaries.
+    for _, suite := range secondaries {
+        primary.peerUpdatesCh <- p2p.PeerUpdate{
+            Status: p2p.PeerStatusUp,
+            PeerID: suite.peerID,
         }
     }
 
-    // update the first reactor peer's height to be very small
-    peer := reactors[0].Switch.Peers().List()[0]
-    ps := peerState{height2}
-    peer.Set(types.PeerStateKey, ps)
+    // only evidence at heights below the peer's height should make it through
+    waitForEvidence(t, evList[:height2+2], secondaries...)
 
-    // send a bunch of valid evidence to the first reactor's evpool
-    evList := sendEvidence(t, pools[0], val, numEvidence)
+    require.Equal(t, numEvidence, int(primary.pool.Size()))
+    require.Equal(t, int(height2+2), int(secondaries[0].pool.Size()))
 
-    // only ones less than the peers height should make it through
-    waitForEvidence(t, evList[:numEvidence/2-1], []*evidence.Pool{pools[1]})
+    // The primary will continue to send the remaining evidence to the
+    // secondaries, so we wait until it has sent all the envelopes.
+    wg.Wait()
 
-    // peers should still be connected
-    peers := reactors[1].Switch.Peers().List()
-    assert.Equal(t, 1, len(peers))
+    // ensure all channels are drained
+    for _, suite := range testSuites {
+        require.Empty(t, suite.evidenceOutCh)
+    }
 }
 
-// This tests aims to ensure that reactors don't send evidence that they have committed or that are
-// not ready for the peer through three scenarios.
-// First, committed evidence to a newly connected peer
-// Second, evidence to a peer that is behind
-// Third, evidence that was pending and became committed just before the peer caught up
-func TestReactorsGossipNoCommittedEvidence(t *testing.T) {
-    config := cfg.TestConfig()
-
+func TestReactorBroadcastEvidence_Pending(t *testing.T) {
     val := types.NewMockPV()
-    var height int64 = 10
+    height := int64(10)
 
-    // DB1 is ahead of DB2
-    stateDB1 := initializeValidatorState(val, height-1)
-    stateDB2 := initializeValidatorState(val, height-2)
-    state, err := stateDB1.Load()
-    require.NoError(t, err)
-    state.LastBlockHeight++
+    stateDB1 := initializeValidatorState(t, val, height)
+    stateDB2 := initializeValidatorState(t, val, height)
 
-    // make reactors from statedb
-    reactors, pools := makeAndConnectReactorsAndPools(config, []sm.Store{stateDB1, stateDB2})
+    testSuites := createTestSuites(t, []sm.Store{stateDB1, stateDB2}, 0)
+    primary := testSuites[0]
+    secondary := testSuites[1]
 
-    evList := sendEvidence(t, pools[0], val, 2)
-    pools[0].Update(state, evList)
-    require.EqualValues(t, uint32(0), pools[0].Size())
+    // Simulate a router by listening for all outbound envelopes and proxying the
+    // envelopes to the respective peer (suite).
+    wg := new(sync.WaitGroup)
+    simulateRouter(wg, primary, testSuites, numEvidence)
 
-    time.Sleep(100 * time.Millisecond)
+    // add all evidence to the primary reactor
+    evList := createEvidenceList(t, primary.pool, val, numEvidence)
 
-    peer := reactors[0].Switch.Peers().List()[0]
-    ps := peerState{height - 2}
-    peer.Set(types.PeerStateKey, ps)
+    // Manually add half the evidence to the secondary which will mark them as
+    // pending.
+    for i := 0; i < numEvidence/2; i++ {
+        require.NoError(t, secondary.pool.AddEvidence(evList[i]))
+    }
 
-    peer = reactors[1].Switch.Peers().List()[0]
-    ps = peerState{height}
-    peer.Set(types.PeerStateKey, ps)
+    // the secondary should have half the evidence as pending
+    require.Equal(t, uint32(numEvidence/2), secondary.pool.Size())
 
-    // wait to see that no evidence comes through
-    time.Sleep(300 * time.Millisecond)
+    // add the secondary reactor as a peer to the primary reactor
+    primary.peerUpdatesCh <- p2p.PeerUpdate{
+        Status: p2p.PeerStatusUp,
+        PeerID: secondary.peerID,
+    }
 
-    // the second pool should not have received any evidence because it has already been committed
-    assert.Equal(t, uint32(0), pools[1].Size(), "second reactor should not have received evidence")
+    // The secondary reactor should have received all the evidence, ignoring the
+    // already pending evidence.
+    waitForEvidence(t, evList, secondary)
 
-    // the first reactor receives three more evidence
-    evList = make([]types.Evidence, 3)
-    for i := 0; i < 3; i++ {
-        ev := types.NewMockDuplicateVoteEvidenceWithValidator(height-3+int64(i),
-            time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC), val, state.ChainID)
-        err := pools[0].AddEvidence(ev)
-        require.NoError(t, err)
-        evList[i] = ev
+    for _, suite := range testSuites {
+        require.Equal(t, numEvidence, int(suite.pool.Size()))
+    }
+
+    wg.Wait()
+
+    // ensure all channels are drained
+    for _, suite := range testSuites {
+        require.Empty(t, suite.evidenceOutCh)
+    }
+}
+
+func TestReactorBroadcastEvidence_Committed(t *testing.T) {
+    val := types.NewMockPV()
+    height := int64(10)
+
+    stateDB1 := initializeValidatorState(t, val, height)
+    stateDB2 := initializeValidatorState(t, val, height)
+
+    testSuites := createTestSuites(t, []sm.Store{stateDB1, stateDB2}, 0)
+    primary := testSuites[0]
+    secondary := testSuites[1]
+
+    // add all evidence to the primary reactor
+    evList := createEvidenceList(t, primary.pool, val, numEvidence)
+
+    // Manually add half the evidence to the secondary which will mark them as
+    // pending.
+    for i := 0; i < numEvidence/2; i++ {
+        require.NoError(t, secondary.pool.AddEvidence(evList[i]))
     }
 
-    // wait to see that only one evidence is sent
-    time.Sleep(300 * time.Millisecond)
+    // the secondary should have half the evidence as pending
+    require.Equal(t, uint32(numEvidence/2), secondary.pool.Size())
 
-    // the second pool should only have received the first evidence because it is behind
-    peerEv, _ := pools[1].PendingEvidence(10000)
-    assert.EqualValues(t, []types.Evidence{evList[0]}, peerEv)
+    state, err := stateDB2.Load()
+    require.NoError(t, err)
 
-    // the last evidence is committed and the second reactor catches up in state to the first
-    // reactor. We therefore expect that the second reactor only receives one more evidence, the
-    // one that is still pending and not the evidence that has already been committed.
+    // update the secondary's pool such that all pending evidence is committed
     state.LastBlockHeight++
-    pools[0].Update(state, []types.Evidence{evList[2]})
 
-    // the first reactor should have the two remaining pending evidence
-    require.EqualValues(t, uint32(2), pools[0].Size())
+    secondary.pool.Update(state, evList[:numEvidence/2])
 
-    // now update the state of the second reactor
-    pools[1].Update(state, types.EvidenceList{})
-    peer = reactors[0].Switch.Peers().List()[0]
-    ps = peerState{height}
-    peer.Set(types.PeerStateKey, ps)
+    // all of the secondary's pending evidence should now be committed, so its
+    // pool is empty
+    require.Equal(t, uint32(0), secondary.pool.Size())
 
-    // wait to see that only two evidence is sent
-    time.Sleep(300 * time.Millisecond)
+    // Simulate a router by listening for all outbound envelopes and proxying the
+    // envelopes to the respective peer (suite).
+    wg := new(sync.WaitGroup)
+    simulateRouter(wg, primary, testSuites, numEvidence)
 
-    peerEv, _ = pools[1].PendingEvidence(1000)
-    assert.EqualValues(t, []types.Evidence{evList[0], evList[1]}, peerEv)
-}
+    // add the secondary reactor as a peer to the primary reactor
+    primary.peerUpdatesCh <- p2p.PeerUpdate{
+        Status: p2p.PeerStatusUp,
+        PeerID: secondary.peerID,
+    }
 
-// evidenceLogger is a TestingLogger which uses a different
-// color for each validator ("validator" key must exist).
-func evidenceLogger() log.Logger {
-    return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor {
-        for i := 0; i < len(keyvals)-1; i += 2 {
-            if keyvals[i] == "validator" {
-                return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))}
-            }
-        }
-        return term.FgBgColor{}
-    })
+    // The secondary reactor should have received all the evidence, ignoring the
+    // already committed evidence.
+    waitForEvidence(t, evList[numEvidence/2:], secondary)
+
+    require.Equal(t, numEvidence, int(primary.pool.Size()))
+    require.Equal(t, numEvidence/2, int(secondary.pool.Size()))
+
+    wg.Wait()
+
+    // ensure all channels are drained
+    for _, suite := range testSuites {
+        require.Empty(t, suite.evidenceOutCh)
+    }
 }
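The assertions above lean on the Update semantics from evidence/pool.go: evidence passed in the list is marked committed and pruned from the pending set, so Size drops and re-gossiped copies are ignored. In isolation, the interaction is (a sketch, assuming a pool whose pending set is evList):

    state := pool.State()
    state.LastBlockHeight++ // Update requires a strictly higher height

    // Commit the first half of the pending list; Size() shrinks accordingly
    // and AddEvidence for those pieces becomes a no-op.
    pool.Update(state, evList[:len(evList)/2])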
 
-// connect N evidence reactors through N switches
-func makeAndConnectReactorsAndPools(config *cfg.Config, stateStores []sm.Store) ([]*evidence.Reactor,
-    []*evidence.Pool) {
-    N := len(stateStores)
+func TestReactorBroadcastEvidence_FullyConnected(t *testing.T) {
+    numPeers := 7
 
-    reactors := make([]*evidence.Reactor, N)
-    pools := make([]*evidence.Pool, N)
-    logger := evidenceLogger()
-    evidenceTime := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)
+    // create a stateDB for all test suites (nodes)
+    stateDBs := make([]sm.Store, numPeers)
+    val := types.NewMockPV()
 
-    for i := 0; i < N; i++ {
-        evidenceDB := dbm.NewMemDB()
-        blockStore := &mocks.BlockStore{}
-        blockStore.On("LoadBlockMeta", mock.AnythingOfType("int64")).Return(
-            &types.BlockMeta{Header: types.Header{Time: evidenceTime}},
-        )
-        pool, err := evidence.NewPool(evidenceDB, stateStores[i], blockStore)
-        if err != nil {
-            panic(err)
+    // We need all validators saved for heights at least as high as we have
+    // evidence for.
+    height := int64(numEvidence) + 10
+    for i := 0; i < numPeers; i++ {
+        stateDBs[i] = initializeValidatorState(t, val, height)
+    }
+
+    testSuites := createTestSuites(t, stateDBs, 0)
+
+    // Simulate a router by listening for all outbound envelopes and proxying the
+    // envelopes to the respective peer (suite).
+    wg := new(sync.WaitGroup)
+    for _, suite := range testSuites {
+        simulateRouter(wg, suite, testSuites, numEvidence*(len(testSuites)-1))
+    }
+
+    evList := createEvidenceList(t, testSuites[0].pool, val, numEvidence)
+
+    // every suite (reactor) connects to every other suite (reactor)
+    for _, suiteI := range testSuites {
+        for _, suiteJ := range testSuites {
+            if suiteI.peerID != suiteJ.peerID {
+                suiteI.peerUpdatesCh <- p2p.PeerUpdate{
+                    Status: p2p.PeerStatusUp,
+                    PeerID: suiteJ.peerID,
+                }
+            }
         }
-        pools[i] = pool
-        reactors[i] = evidence.NewReactor(pool)
-        reactors[i].SetLogger(logger.With("validator", i))
     }
 
-    p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
-        s.AddReactor("EVIDENCE", reactors[i])
-        return s
+    // wait until all suites (reactors) have received all evidence from the other suites
+    waitForEvidence(t, evList, testSuites...)
+
+    for _, suite := range testSuites {
+        require.Equal(t, numEvidence, int(suite.pool.Size()))
 
-    }, p2p.Connect2Switches)
+        // commit state so we do not continue to repeat gossiping the same evidence
+        state := suite.pool.State()
+        state.LastBlockHeight++
+        suite.pool.Update(state, evList)
+    }
 
-    return reactors, pools
+    wg.Wait()
 }
 
-// wait for all evidence on all reactors
-func waitForEvidence(t *testing.T, evs types.EvidenceList, pools []*evidence.Pool) {
-    // wait for the evidence in all evpools
+func TestReactorBroadcastEvidence_RemovePeer(t *testing.T) {
+    val := types.NewMockPV()
+    height := int64(10)
+
+    stateDB1 := initializeValidatorState(t, val, height)
+    stateDB2 := initializeValidatorState(t, val, height)
+
+    testSuites := createTestSuites(t, []sm.Store{stateDB1, stateDB2}, uint(numEvidence))
+    primary := testSuites[0]
+    secondary := testSuites[1]
+
+    // Simulate a router by listening for all outbound envelopes and proxying the
+    // envelopes to the respective peer (suite).
     wg := new(sync.WaitGroup)
-    for i := 0; i < len(pools); i++ {
-        wg.Add(1)
-        go _waitForEvidence(t, wg, evs, i, pools)
+    simulateRouter(wg, primary, testSuites, numEvidence/2)
+
+    // add all evidence to the primary reactor
+    evList := createEvidenceList(t, primary.pool, val, numEvidence)
+
+    // add the secondary reactor as a peer to the primary reactor
+    primary.peerUpdatesCh <- p2p.PeerUpdate{
+        Status: p2p.PeerStatusUp,
+        PeerID: secondary.peerID,
     }
 
-    done := make(chan struct{})
-    go func() {
-        wg.Wait()
-        close(done)
-    }()
+    // have the secondary reactor receive only half the evidence
+    waitForEvidence(t, evList[:numEvidence/2], secondary)
 
-    timer := time.After(timeout)
-    select {
-    case <-timer:
-        t.Fatal("Timed out waiting for evidence")
-    case <-done:
+    // disconnect the peer
+    primary.peerUpdatesCh <- p2p.PeerUpdate{
+        Status: p2p.PeerStatusDown,
+        PeerID: secondary.peerID,
     }
-}
 
-// wait for all evidence on a single evpool
-func _waitForEvidence(
-    t *testing.T,
-    wg *sync.WaitGroup,
-    evs types.EvidenceList,
-    poolIdx int,
-    pools []*evidence.Pool,
-) {
-    evpool := pools[poolIdx]
-    var evList []types.Evidence
-    currentPoolSize := 0
-    for currentPoolSize != len(evs) {
-        evList, _ = evpool.PendingEvidence(int64(len(evs) * 500)) // each evidence should not be more than 500 bytes
-        currentPoolSize = len(evList)
-        time.Sleep(time.Millisecond * 100)
-    }
-
-    // put the reaped evidence in a map so we can quickly check we got everything
-    evMap := make(map[string]types.Evidence)
-    for _, e := range evList {
-        evMap[string(e.Hash())] = e
-    }
-    for i, expectedEv := range evs {
-        gotEv := evMap[string(expectedEv.Hash())]
-        assert.Equal(t, expectedEv, gotEv,
-            fmt.Sprintf("evidence at index %d on pool %d don't match: %v vs %v",
-                i, poolIdx, expectedEv, gotEv))
-    }
-
-    wg.Done()
-}
+    // Ensure the secondary only received half of the evidence before being
+    // disconnected.
+    require.Equal(t, numEvidence/2, int(secondary.pool.Size()))
 
-func sendEvidence(t *testing.T, evpool *evidence.Pool, val types.PrivValidator, n int) types.EvidenceList {
-    evList := make([]types.Evidence, n)
-    for i := 0; i < n; i++ {
-        ev := types.NewMockDuplicateVoteEvidenceWithValidator(int64(i+1),
-            time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC), val, evidenceChainID)
-        err := evpool.AddEvidence(ev)
-        require.NoError(t, err)
-        evList[i] = ev
+    wg.Wait()
+
+    // The primary reactor should still be attempting to send the remaining half.
+    //
+    // NOTE: The channel is buffered (size numEvidence) so as to ensure the primary
+    // reactor will send all envelopes at once before receiving the signal to stop
+    // gossiping.
+    for i := 0; i < numEvidence/2; i++ {
+        <-primary.evidenceOutCh
     }
-    return evList
-}
 
-type peerState struct {
-    height int64
+    // ensure all channels are drained
+    for _, suite := range testSuites {
+        require.Empty(t, suite.evidenceOutCh)
+    }
 }
 
-func (ps peerState) GetHeight() int64 {
-    return ps.height
-}
+// nolint:lll
+func TestEvidenceListSerialization(t *testing.T) {
+    exampleVote := func(msgType byte) *types.Vote {
+        var stamp, err = time.Parse(types.TimeFormat, "2017-12-25T03:00:01.234Z")
+        require.NoError(t, err)
 
-func exampleVote(t byte) *types.Vote {
-    var stamp, err = time.Parse(types.TimeFormat, "2017-12-25T03:00:01.234Z")
-    if err != nil {
-        panic(err)
-    }
-
-    return &types.Vote{
-        Type:      tmproto.SignedMsgType(t),
-        Height:    3,
-        Round:     2,
-        Timestamp: stamp,
-        BlockID: types.BlockID{
-            Hash: tmhash.Sum([]byte("blockID_hash")),
-            PartSetHeader: types.PartSetHeader{
-                Total: 1000000,
-                Hash:  tmhash.Sum([]byte("blockID_part_set_header_hash")),
+        return &types.Vote{
+            Type:      tmproto.SignedMsgType(msgType),
+            Height:    3,
+            Round:     2,
+            Timestamp: stamp,
+            BlockID: types.BlockID{
+                Hash: tmhash.Sum([]byte("blockID_hash")),
+                PartSetHeader: types.PartSetHeader{
+                    Total: 1000000,
+                    Hash:  tmhash.Sum([]byte("blockID_part_set_header_hash")),
+                },
             },
-        },
-        ValidatorAddress: crypto.AddressHash([]byte("validator_address")),
-        ValidatorIndex:   56789,
+            ValidatorAddress: crypto.AddressHash([]byte("validator_address")),
+            ValidatorIndex:   56789,
+        }
     }
-}
-
-// nolint:lll //ignore line length for tests
-func TestEvidenceVectors(t *testing.T) {
 
     val := &types.Validator{
         Address: crypto.AddressHash([]byte("validator_address")),
@@ -347,33 +600,35 @@ func TestEvidenceVectors(t *testing.T) {
         valSet,
     )
 
-    testCases := []struct {
-        testName     string
+    testCases := map[string]struct {
         evidenceList []types.Evidence
         expBytes     string
     }{
-        {"DuplicateVoteEvidence", []types.Evidence{dupl}, "0a85020a82020a79080210031802224a0a208b01023386c371778ecb6368573e539afc3cc860ec3a2f614e54fe5652f4fc80122608c0843d122072db3d959635dff1bb567bedaa70573392c5159666a3f8caf11e413aac52207a2a0b08b1d381d20510809dca6f32146af1f4111082efb388211bc72c55bcd61e9ac3d538d5bb031279080110031802224a0a208b01023386c371778ecb6368573e539afc3cc860ec3a2f614e54fe5652f4fc80122608c0843d122072db3d959635dff1bb567bedaa70573392c5159666a3f8caf11e413aac52207a2a0b08b1d381d20510809dca6f32146af1f4111082efb388211bc72c55bcd61e9ac3d538d5bb03180a200a2a060880dbaae105"},
+        "DuplicateVoteEvidence": {
+            []types.Evidence{dupl},
+            "0a85020a82020a79080210031802224a0a208b01023386c371778ecb6368573e539afc3cc860ec3a2f614e54fe5652f4fc80122608c0843d122072db3d959635dff1bb567bedaa70573392c5159666a3f8caf11e413aac52207a2a0b08b1d381d20510809dca6f32146af1f4111082efb388211bc72c55bcd61e9ac3d538d5bb031279080110031802224a0a208b01023386c371778ecb6368573e539afc3cc860ec3a2f614e54fe5652f4fc80122608c0843d122072db3d959635dff1bb567bedaa70573392c5159666a3f8caf11e413aac52207a2a0b08b1d381d20510809dca6f32146af1f4111082efb388211bc72c55bcd61e9ac3d538d5bb03180a200a2a060880dbaae105",
+        },
     }
 
-    for _, tc := range testCases {
+    for name, tc := range testCases {
         tc := tc
 
-        evi := make([]tmproto.Evidence, len(tc.evidenceList))
-        for i := 0; i < len(tc.evidenceList); i++ {
-            ev, err := types.EvidenceToProto(tc.evidenceList[i])
-            require.NoError(t, err, tc.testName)
-            evi[i] = *ev
-        }
-
-        epl := tmproto.EvidenceList{
-            Evidence: evi,
-        }
+        t.Run(name, func(t *testing.T) {
+            protoEv := make([]tmproto.Evidence, len(tc.evidenceList))
+            for i := 0; i < len(tc.evidenceList); i++ {
+                ev, err := types.EvidenceToProto(tc.evidenceList[i])
+                require.NoError(t, err)
+                protoEv[i] = *ev
+            }
 
-        bz, err := epl.Marshal()
-        require.NoError(t, err, tc.testName)
+            epl := tmproto.EvidenceList{
+                Evidence: protoEv,
+            }
 
-        require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName)
+            bz, err := epl.Marshal()
+            require.NoError(t, err)
+            require.Equal(t, tc.expBytes, hex.EncodeToString(bz))
+        })
     }
-
 }
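A vector like the one above can also be checked by round-tripping it; Unmarshal is the gogoproto-generated counterpart of the Marshal call in the test (a sketch, inside the same subtest):

    bz, err := hex.DecodeString(tc.expBytes)
    require.NoError(t, err)

    var epl tmproto.EvidenceList
    require.NoError(t, epl.Unmarshal(bz))
    require.Len(t, epl.Evidence, 1) // one DuplicateVoteEvidence in this vector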
diff --git a/evidence/verify.go b/evidence/verify.go index 0721ade9a..dabd248c8 100644
--- a/evidence/verify.go
+++ b/evidence/verify.go
@@ -17,6 +17,11 @@ import (
 // - it is from a key who was a validator at the given height
 // - it is internally consistent with state
 // - it was properly signed by the alleged equivocator and meets the individual evidence verification requirements
+//
+// NOTE: Evidence may be provided that we do not have the block or validator
+// set for. In these cases, we do not return an ErrInvalidEvidence so as not to
+// have the sending peer disconnect. All other errors are treated as invalid
+// evidence (i.e. ErrInvalidEvidence).
 func (evpool *Pool) verify(evidence types.Evidence) error {
     var (
         state = evpool.State()
@@ -25,26 +30,42 @@ func (evpool *Pool) verify(evidence types.Evidence) error {
         ageNumBlocks   = height - evidence.Height()
     )
 
-    // verify the time of the evidence
+    // ensure we have the block for the evidence height
+    //
+    // NOTE: It is currently possible for a peer to send us evidence we're not
+    // able to process because we're too far behind (e.g. syncing), so we DO NOT
+    // return an invalid evidence error because we do not want the peer to
+    // disconnect or signal an error in this particular case.
     blockMeta := evpool.blockStore.LoadBlockMeta(evidence.Height())
     if blockMeta == nil {
-        return fmt.Errorf("don't have header #%d", evidence.Height())
+        return fmt.Errorf("failed to verify evidence; missing block for height %d", evidence.Height())
     }
+
+    // verify the time of the evidence
     evTime := blockMeta.Header.Time
     if evidence.Time() != evTime {
-        return fmt.Errorf("evidence has a different time to the block it is associated with (%v != %v)",
-            evidence.Time(), evTime)
+        return types.NewErrInvalidEvidence(
+            evidence,
+            fmt.Errorf(
+                "evidence has a different time to the block it is associated with (%v != %v)",
+                evidence.Time(), evTime,
+            ),
+        )
     }
+
     ageDuration := state.LastBlockTime.Sub(evTime)
 
     // check that the evidence hasn't expired
     if ageDuration > evidenceParams.MaxAgeDuration && ageNumBlocks > evidenceParams.MaxAgeNumBlocks {
-        return fmt.Errorf(
-            "evidence from height %d (created at: %v) is too old; min height is %d and evidence can not be older than %v",
-            evidence.Height(),
-            evTime,
-            height-evidenceParams.MaxAgeNumBlocks,
-            state.LastBlockTime.Add(evidenceParams.MaxAgeDuration),
+        return types.NewErrInvalidEvidence(
+            evidence,
+            fmt.Errorf(
+                "evidence from height %d (created at: %v) is too old; min height is %d and evidence cannot be older than %v",
+                evidence.Height(),
+                evTime,
+                height-evidenceParams.MaxAgeNumBlocks,
+                state.LastBlockTime.Add(evidenceParams.MaxAgeDuration),
+            ),
         )
     }
 
@@ -55,18 +76,26 @@ func (evpool *Pool) verify(evidence types.Evidence) error {
         if err != nil {
             return err
         }
-        return VerifyDuplicateVote(ev, state.ChainID, valSet)
+
+        if err := VerifyDuplicateVote(ev, state.ChainID, valSet); err != nil {
+            return types.NewErrInvalidEvidence(evidence, err)
+        }
+
+        return nil
 
     case *types.LightClientAttackEvidence:
         commonHeader, err := getSignedHeader(evpool.blockStore, evidence.Height())
         if err != nil {
             return err
         }
+
        commonVals, err := evpool.stateDB.LoadValidators(evidence.Height())
         if err != nil {
             return err
         }
+
         trustedHeader := commonHeader
+
         // in the case of lunatic the trusted header is different to the common header
         if evidence.Height() != ev.ConflictingBlock.Height {
             trustedHeader, err = getSignedHeader(evpool.blockStore, ev.ConflictingBlock.Height)
@@ -75,23 +104,40 @@ func (evpool *Pool) verify(evidence types.Evidence) error {
             }
         }
 
-        err = VerifyLightClientAttack(ev, commonHeader, trustedHeader, commonVals, state.LastBlockTime,
-            state.ConsensusParams.Evidence.MaxAgeDuration)
+        err = VerifyLightClientAttack(
+            ev,
+            commonHeader,
+            trustedHeader,
+            commonVals,
+            state.LastBlockTime,
+            state.ConsensusParams.Evidence.MaxAgeDuration,
+        )
         if err != nil {
-            return err
+            return types.NewErrInvalidEvidence(evidence, err)
         }
-        // find out what type of attack this was and thus extract the malicious validators. Note in the case of an
-        // Amnesia attack we don't have any malicious validators.
+
+        // Find out what type of attack this was and thus extract the malicious
+        // validators. Note, in the case of an Amnesia attack we don't have any
+        // malicious validators.
         validators := ev.GetByzantineValidators(commonVals, trustedHeader)
-        // ensure this matches the validators that are listed in the evidence. They should be ordered based on power.
+
+        // Ensure this matches the validators that are listed in the evidence. They
+        // should be ordered based on power.
         if validators == nil && ev.ByzantineValidators != nil {
-            return fmt.Errorf("expected nil validators from an amnesia light client attack but got %d",
-                len(ev.ByzantineValidators))
+            return types.NewErrInvalidEvidence(
+                evidence,
+                fmt.Errorf(
+                    "expected nil validators from an amnesia light client attack but got %d",
+                    len(ev.ByzantineValidators),
+                ),
+            )
         }
 
         if exp, got := len(validators), len(ev.ByzantineValidators); exp != got {
-            return fmt.Errorf("expected %d byzantine validators from evidence but got %d",
-                exp, got)
+            return types.NewErrInvalidEvidence(
+                evidence,
+                fmt.Errorf("expected %d byzantine validators from evidence but got %d", exp, got),
+            )
         }
 
         // ensure that both validator arrays are in the same order
@@ -99,20 +145,31 @@ func (evpool *Pool) verify(evidence types.Evidence) error {
 
         for idx, val := range validators {
             if !bytes.Equal(ev.ByzantineValidators[idx].Address, val.Address) {
-                return fmt.Errorf("evidence contained a different byzantine validator address to the one we were expecting."+
-                    "Expected %v, got %v", val.Address, ev.ByzantineValidators[idx].Address)
+                return types.NewErrInvalidEvidence(
+                    evidence,
+                    fmt.Errorf(
+                        "evidence contained an unexpected byzantine validator address; expected: %v, got: %v",
+                        val.Address, ev.ByzantineValidators[idx].Address,
+                    ),
+                )
             }
+
             if ev.ByzantineValidators[idx].VotingPower != val.VotingPower {
-                return fmt.Errorf("evidence contained a byzantine validator with a different power to the one we were expecting."+
-                    "Expected %d, got %d", val.VotingPower, ev.ByzantineValidators[idx].VotingPower)
+                return types.NewErrInvalidEvidence(
+                    evidence,
+                    fmt.Errorf(
+                        "evidence contained unexpected byzantine validator power; expected %d, got %d",
+                        val.VotingPower, ev.ByzantineValidators[idx].VotingPower,
+                    ),
+                )
             }
         }
 
         return nil
+
     default:
-        return fmt.Errorf("unrecognized evidence type: %T", evidence)
+        return types.NewErrInvalidEvidence(evidence, fmt.Errorf("unrecognized evidence type: %T", evidence))
     }
-
 }
 
 // VerifyLightClientAttack verifies LightClientAttackEvidence against the state of the full node. This involves
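The net effect is that callers can now distinguish definitively invalid evidence from evidence we simply cannot check yet. A caller-side sketch, assuming types.ErrInvalidEvidence is the concrete (pointer) error type behind NewErrInvalidEvidence:

    if err := evpool.AddEvidence(ev); err != nil {
        var invalidErr *types.ErrInvalidEvidence // assumption: pointer error type
        if errors.As(err, &invalidErr) {
            // Definitively invalid: safe to report a PeerError for the sender.
        } else {
            // e.g. missing block for the evidence height: do not punish the peer.
        }
    }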
diff --git a/evidence/verify_test.go b/evidence/verify_test.go index 0e72582b2..cdad27d4e 100644
--- a/evidence/verify_test.go
+++ b/evidence/verify_test.go
@@ -103,9 +103,8 @@ func TestVerifyLightClientAttack_Lunatic(t *testing.T) {
     blockStore.On("LoadBlockCommit", int64(4)).Return(commit)
     blockStore.On("LoadBlockCommit", int64(10)).Return(trustedCommit)
 
-    pool, err := evidence.NewPool(dbm.NewMemDB(), stateStore, blockStore)
+    pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore)
     require.NoError(t, err)
-    pool.SetLogger(log.TestingLogger())
 
     evList := types.EvidenceList{ev}
     err = pool.CheckEvidence(evList)
@@ -201,9 +200,8 @@ func TestVerifyLightClientAttack_Equivocation(t *testing.T) {
     blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: *trustedHeader})
     blockStore.On("LoadBlockCommit", int64(10)).Return(trustedCommit)
 
-    pool, err := evidence.NewPool(dbm.NewMemDB(), stateStore, blockStore)
+    pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore)
     require.NoError(t, err)
-    pool.SetLogger(log.TestingLogger())
 
     evList := types.EvidenceList{ev}
     err = pool.CheckEvidence(evList)
@@ -276,9 +274,8 @@ func TestVerifyLightClientAttack_Amnesia(t *testing.T) {
     blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: *trustedHeader})
     blockStore.On("LoadBlockCommit", int64(10)).Return(trustedCommit)
 
-    pool, err := evidence.NewPool(dbm.NewMemDB(), stateStore, blockStore)
+    pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore)
     require.NoError(t, err)
-    pool.SetLogger(log.TestingLogger())
 
     evList := types.EvidenceList{ev}
     err = pool.CheckEvidence(evList)
@@ -369,7 +366,7 @@ func TestVerifyDuplicateVoteEvidence(t *testing.T) {
     blockStore := &mocks.BlockStore{}
     blockStore.On("LoadBlockMeta", int64(10)).Return(&types.BlockMeta{Header: types.Header{Time: defaultEvidenceTime}})
 
-    pool, err := evidence.NewPool(dbm.NewMemDB(), stateStore, blockStore)
+    pool, err := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore)
     require.NoError(t, err)
 
     evList := types.EvidenceList{goodEv}
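Every call site above now follows the same constructor shape; for reference, a standalone sketch (stateStore and blockStore as mocked in the tests):

    pool, err := evidence.NewPool(
        log.TestingLogger().With("module", "evidence"),
        dbm.NewMemDB(),
        stateStore, // sm.Store
        blockStore, // evidence.BlockStore
    )
    require.NoError(t, err) // the logger is now set at construction; SetLogger is gone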
diff --git a/node/node.go b/node/node.go index 9d06325bb..b79aa1d6c 100644
--- a/node/node.go
+++ b/node/node.go
@@ -201,9 +201,10 @@ type Node struct {
     consensusState   *cs.State      // latest consensus state
     consensusReactor *cs.Reactor    // for participating in the consensus
     pexReactor       *pex.Reactor   // for exchanging peer addresses
-    evidencePool     *evidence.Pool // tracking evidence
-    proxyApp         proxy.AppConns // connection to the application
-    rpcListeners     []net.Listener // rpc servers
+    evidenceReactor  *evidence.Reactor
+    evidencePool     *evidence.Pool // tracking evidence
+    proxyApp         proxy.AppConns // connection to the application
+    rpcListeners     []net.Listener // rpc servers
     txIndexer        txindex.TxIndexer
     indexerService   *txindex.IndexerService
     prometheusSrv    *http.Server
@@ -338,21 +339,34 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns,
     return mempoolReactor, mempool
 }
 
-func createEvidenceReactor(config *cfg.Config, dbProvider DBProvider,
-    stateDB dbm.DB, blockStore *store.BlockStore, logger log.Logger) (*evidence.Reactor, *evidence.Pool, error) {
-
+func createEvidenceReactor(
+    config *cfg.Config,
+    dbProvider DBProvider,
+    stateDB dbm.DB,
+    blockStore *store.BlockStore,
+    logger log.Logger,
+) (*p2p.ReactorShim, *evidence.Reactor, *evidence.Pool, error) {
     evidenceDB, err := dbProvider(&DBContext{"evidence", config})
     if err != nil {
-        return nil, nil, err
+        return nil, nil, nil, err
     }
-    evidenceLogger := logger.With("module", "evidence")
-    evidencePool, err := evidence.NewPool(evidenceDB, sm.NewStore(stateDB), blockStore)
+
+    logger = logger.With("module", "evidence")
+
+    evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore)
     if err != nil {
-        return nil, nil, err
+        return nil, nil, nil, err
     }
-    evidenceReactor := evidence.NewReactor(evidencePool)
-    evidenceReactor.SetLogger(evidenceLogger)
-    return evidenceReactor, evidencePool, nil
+
+    evidenceReactorShim := p2p.NewReactorShim(logger, "EvidenceShim", evidence.ChannelShims)
+    evidenceReactor := evidence.NewReactor(
+        logger,
+        evidenceReactorShim.GetChannel(evidence.EvidenceChannel),
+        evidenceReactorShim.PeerUpdates,
+        evidencePool,
+    )
+
+    return evidenceReactorShim, evidenceReactor, evidencePool, nil
 }
 
 func createBlockchainReactor(config *cfg.Config,
@@ -485,7 +499,7 @@ func createSwitch(config *cfg.Config,
     bcReactor p2p.Reactor,
     stateSyncReactor *p2p.ReactorShim,
     consensusReactor *cs.Reactor,
-    evidenceReactor *evidence.Reactor,
+    evidenceReactor *p2p.ReactorShim,
     nodeInfo p2p.NodeInfo,
     nodeKey p2p.NodeKey,
     p2pLogger log.Logger) *p2p.Switch {
@@ -708,8 +722,7 @@ func NewNode(config *cfg.Config,
     // Make MempoolReactor
     mempoolReactor, mempool := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger)
 
-    // Make Evidence Reactor
-    evidenceReactor, evidencePool, err := createEvidenceReactor(config, dbProvider, stateDB, blockStore, logger)
+    evReactorShim, evReactor, evPool, err := createEvidenceReactor(config, dbProvider, stateDB, blockStore, logger)
     if err != nil {
         return nil, err
     }
@@ -720,7 +733,7 @@ func NewNode(config *cfg.Config,
         logger.With("module", "state"),
         proxyApp.Consensus(),
         mempool,
-        evidencePool,
+        evPool,
         sm.BlockExecutorWithMetrics(smMetrics),
     )
 
@@ -737,8 +750,9 @@ func NewNode(config *cfg.Config,
     } else if fastSync {
         csMetrics.FastSyncing.Set(1)
     }
+
     consensusReactor, consensusState := createConsensusReactor(
-        config, state, blockExec, blockStore, mempool, evidencePool,
+        config, state, blockExec, blockStore, mempool, evPool,
         privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger,
     )
 
@@ -746,8 +760,7 @@ func NewNode(config *cfg.Config,
     // FIXME The way we do phased startups (e.g. replay -> fast sync -> consensus) is very messy,
     // we should clean this whole thing up. See:
     // https://github.com/tendermint/tendermint/issues/4644
-    stateSyncReactorShim := p2p.NewReactorShim("StateSyncShim", statesync.ChannelShims)
-    stateSyncReactorShim.SetLogger(logger.With("module", "statesync"))
+    stateSyncReactorShim := p2p.NewReactorShim(logger.With("module", "statesync"), "StateSyncShim", statesync.ChannelShims)
 
     stateSyncReactor := statesync.NewReactor(
         stateSyncReactorShim.Logger,
@@ -769,7 +782,7 @@ func NewNode(config *cfg.Config,
     transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp)
     sw := createSwitch(
         config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
-        stateSyncReactorShim, consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger,
+        stateSyncReactorShim, consensusReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger,
     )
 
     err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
@@ -833,7 +846,8 @@ func NewNode(config *cfg.Config,
         stateSync:        stateSync,
         stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state
         pexReactor:       pexReactor,
-        evidencePool:     evidencePool,
+        evidenceReactor:  evReactor,
+        evidencePool:     evPool,
         proxyApp:         proxyApp,
         txIndexer:        txIndexer,
         indexerService:   indexerService,
@@ -905,6 +919,11 @@ func (n *Node) OnStart() error {
         return err
     }
 
+    // Start the real evidence reactor separately since the switch uses the shim.
+    if err := n.evidenceReactor.Start(); err != nil {
+        return err
+    }
+
     // Always connect to persistent peers
     err = n.sw.DialPeersAsync(splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "))
     if err != nil {
@@ -948,7 +967,12 @@ func (n *Node) OnStop() {
 
     // Stop the real state sync reactor separately since the switch uses the shim.
     if err := n.stateSyncReactor.Stop(); err != nil {
-        n.Logger.Error("failed to stop state sync service", "err", err)
+        n.Logger.Error("failed to stop the state sync reactor", "err", err)
+    }
+
+    // Stop the real evidence reactor separately since the switch uses the shim.
+    if err := n.evidenceReactor.Stop(); err != nil {
+        n.Logger.Error("failed to stop the evidence reactor", "err", err)
     }
 
     // stop mempool WAL
@@ -1270,10 +1294,14 @@ func makeNodeInfo(
         Version: version.TMCoreSemVer,
         Channels: []byte{
             bcChannel,
-            cs.StateChannel, cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel,
+            cs.StateChannel,
+            cs.DataChannel,
+            cs.VoteChannel,
+            cs.VoteSetBitsChannel,
             mempl.MempoolChannel,
-            evidence.EvidenceChannel,
-            byte(statesync.SnapshotChannel), byte(statesync.ChunkChannel),
+            byte(evidence.EvidenceChannel),
+            byte(statesync.SnapshotChannel),
+            byte(statesync.ChunkChannel),
         },
         Moniker: config.Moniker,
         Other: p2p.NodeInfoOther{
diff --git a/node/node_test.go b/node/node_test.go index 1ab345ae7..b6fdc26e9 100644
--- a/node/node_test.go
+++ b/node/node_test.go
@@ -257,9 +257,8 @@ func TestCreateProposalBlock(t *testing.T) {
     // Make EvidencePool
     evidenceDB := dbm.NewMemDB()
     blockStore := store.NewBlockStore(dbm.NewMemDB())
-    evidencePool, err := evidence.NewPool(evidenceDB, stateStore, blockStore)
+    evidencePool, err := evidence.NewPool(logger, evidenceDB, stateStore, blockStore)
     require.NoError(t, err)
-    evidencePool.SetLogger(logger)
 
     // fill the evidence pool with more evidence
     // than can fit in a block
diff --git a/p2p/key.go b/p2p/key.go index ff750ccc4..b6a53d94e 100644
--- a/p2p/key.go
+++ b/p2p/key.go
@@ -5,6 +5,7 @@ import (
     "errors"
     "fmt"
     "io/ioutil"
+    "strings"
 
     "github.com/tendermint/tendermint/crypto"
     "github.com/tendermint/tendermint/crypto/ed25519"
@@ -12,15 +13,22 @@ import (
     tmos "github.com/tendermint/tendermint/libs/os"
 )
 
-// NodeID is a hex-encoded crypto.Address.
-// FIXME: We should either ensure this is always lowercased, or add an Equal()
-// for comparison that decodes to the binary byte slice first.
-type NodeID string
-
 // NodeIDByteLength is the length of a crypto.Address. Currently only 20.
 // FIXME: support other length addresses?
 const NodeIDByteLength = crypto.AddressSize
 
+// NodeID is a hex-encoded crypto.Address.
+type NodeID string
+
+// NewNodeID returns a lowercased (normalized) NodeID.
+func NewNodeID(nodeID string) (NodeID, error) {
+    if _, err := NodeID(nodeID).Bytes(); err != nil {
+        return NodeID(""), err
+    }
+
+    return NodeID(strings.ToLower(nodeID)), nil
+}
+
 // NodeIDFromPubKey returns the node ID corresponding to the given PubKey. It's
 // the hex-encoding of the pubKey.Address().
 func NodeIDFromPubKey(pubKey crypto.PubKey) NodeID {
@@ -39,15 +47,23 @@ func (id NodeID) Bytes() ([]byte, error) {
 
 // Validate validates the NodeID.
 func (id NodeID) Validate() error {
     if len(id) == 0 {
-        return errors.New("no ID")
+        return errors.New("empty node ID")
     }
+
     bz, err := id.Bytes()
     if err != nil {
         return err
     }
+
     if len(bz) != NodeIDByteLength {
-        return fmt.Errorf("invalid ID length - got %d, expected %d", len(bz), NodeIDByteLength)
+        return fmt.Errorf("invalid node ID length; got %d, expected %d", len(bz), NodeIDByteLength)
     }
+
+    idStr := string(id)
+    if strings.ToLower(idStr) != idStr {
+        return fmt.Errorf("invalid node ID; must be lowercased")
+    }
+
     return nil
 }
diff --git a/p2p/key_test.go b/p2p/key_test.go index 5a79cb540..33cece6d5 100644
--- a/p2p/key_test.go
+++ b/p2p/key_test.go
@@ -5,7 +5,6 @@ import (
     "path/filepath"
     "testing"
 
-    "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
 
     tmrand "github.com/tendermint/tendermint/libs/rand"
@@ -15,35 +14,32 @@ func TestLoadOrGenNodeKey(t *testing.T) {
     filePath := filepath.Join(os.TempDir(), tmrand.Str(12)+"_peer_id.json")
 
     nodeKey, err := LoadOrGenNodeKey(filePath)
-    assert.Nil(t, err)
+    require.Nil(t, err)
 
     nodeKey2, err := LoadOrGenNodeKey(filePath)
-    assert.Nil(t, err)
-
-    assert.Equal(t, nodeKey, nodeKey2)
+    require.Nil(t, err)
+    require.Equal(t, nodeKey, nodeKey2)
 }
 
 func TestLoadNodeKey(t *testing.T) {
     filePath := filepath.Join(os.TempDir(), tmrand.Str(12)+"_peer_id.json")
 
     _, err := LoadNodeKey(filePath)
-    assert.True(t, os.IsNotExist(err))
+    require.True(t, os.IsNotExist(err))
 
     _, err = LoadOrGenNodeKey(filePath)
     require.NoError(t, err)
 
     nodeKey, err := LoadNodeKey(filePath)
-    assert.NoError(t, err)
-    assert.NotNil(t, nodeKey)
+    require.NoError(t, err)
+    require.NotNil(t, nodeKey)
 }
 
 func TestNodeKeySaveAs(t *testing.T) {
     filePath := filepath.Join(os.TempDir(), tmrand.Str(12)+"_peer_id.json")
-
-    assert.NoFileExists(t, filePath)
+    require.NoFileExists(t, filePath)
 
     nodeKey := GenNodeKey()
-    err := nodeKey.SaveAs(filePath)
-    assert.NoError(t, err)
-    assert.FileExists(t, filePath)
+    require.NoError(t, nodeKey.SaveAs(filePath))
+    require.FileExists(t, filePath)
 }
diff --git a/p2p/netaddress.go b/p2p/netaddress.go index b7c860ec3..3b6ec6e0d 100644
--- a/p2p/netaddress.go
+++ b/p2p/netaddress.go
@@ -73,12 +73,16 @@ func NewNetAddressString(addr string) (*NetAddress, error) {
         return nil, ErrNetAddressNoID{addr}
     }
 
-    // get ID
-    if err := NodeID(spl[0]).Validate(); err != nil {
+    id, err := NewNodeID(spl[0])
+    if err != nil {
+        return nil, ErrNetAddressInvalid{addrWithoutProtocol, err}
+    }
+
+    if err := id.Validate(); err != nil {
         return nil, ErrNetAddressInvalid{addrWithoutProtocol, err}
     }
-    var id NodeID
-    id, addrWithoutProtocol = NodeID(spl[0]), spl[1]
+
+    addrWithoutProtocol = spl[1]
 
     // get host and port
     host, portStr, err := net.SplitHostPort(addrWithoutProtocol)
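Taken together, NewNodeID and Validate enforce a canonical lowercase form. A small sketch of the behaviour introduced above (the address is an arbitrary 20-byte hex string):

    // Mixed-case input is accepted and normalized by the constructor.
    id, err := p2p.NewNodeID("1401D2FDB57E81DEDFBAB19E8BD6856B0B694196")
    // err == nil, id == "1401d2fdb57e81dedfbab19e8bd6856b0b694196"

    // Validate, by contrast, rejects anything that is not already lowercase.
    err = p2p.NodeID("1401D2FDB57E81DEDFBAB19E8BD6856B0B694196").Validate()
    // err: "invalid node ID; must be lowercased"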
diff --git a/p2p/peer.go b/p2p/peer.go index a7632d03a..3d9772622 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -68,9 +68,9 @@ type PeerUpdatesCh struct {
 }
 
 // NewPeerUpdates returns a reference to a new PeerUpdatesCh.
-func NewPeerUpdates() *PeerUpdatesCh {
+func NewPeerUpdates(updatesCh chan PeerUpdate) *PeerUpdatesCh {
     return &PeerUpdatesCh{
-        updatesCh: make(chan PeerUpdate),
+        updatesCh: updatesCh,
         doneCh:    make(chan struct{}),
     }
 }
diff --git a/p2p/shim.go b/p2p/shim.go index 398c8eb2e..eb44b40f1 100644
--- a/p2p/shim.go
+++ b/p2p/shim.go
@@ -5,6 +5,7 @@ import (
     "sort"
 
     "github.com/gogo/protobuf/proto"
+    "github.com/tendermint/tendermint/libs/log"
 )
 
 // ============================================================================
@@ -50,7 +51,7 @@ type (
     }
 )
 
-func NewReactorShim(name string, descriptors map[ChannelID]*ChannelDescriptorShim) *ReactorShim {
+func NewReactorShim(logger log.Logger, name string, descriptors map[ChannelID]*ChannelDescriptorShim) *ReactorShim {
     channels := make(map[ChannelID]*ChannelShim)
 
     for _, cds := range descriptors {
@@ -60,11 +61,12 @@ func NewReactorShim(name string, descriptors map[ChannelID]*ChannelDescriptorShi
 
     rs := &ReactorShim{
         Name:        name,
-        PeerUpdates: NewPeerUpdates(),
+        PeerUpdates: NewPeerUpdates(make(chan PeerUpdate)),
         Channels:    channels,
     }
 
     rs.BaseReactor = *NewBaseReactor(name, rs)
+    rs.SetLogger(logger)
 
     return rs
 }
diff --git a/p2p/shim_test.go b/p2p/shim_test.go index 2b9f9fbc4..f5b84a490 100644
--- a/p2p/shim_test.go
+++ b/p2p/shim_test.go
@@ -8,6 +8,7 @@ import (
     "github.com/stretchr/testify/mock"
     "github.com/stretchr/testify/require"
     "github.com/tendermint/tendermint/config"
+    "github.com/tendermint/tendermint/libs/log"
     "github.com/tendermint/tendermint/p2p"
     p2pmocks "github.com/tendermint/tendermint/p2p/mocks"
     ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
@@ -50,7 +51,7 @@ func setup(t *testing.T, peers []p2p.Peer) *reactorShimTestSuite {
     t.Helper()
 
     rts := &reactorShimTestSuite{
-        shim: p2p.NewReactorShim("TestShim", testChannelShims),
+        shim: p2p.NewReactorShim(log.TestingLogger(), "TestShim", testChannelShims),
     }
 
     rts.sw = p2p.MakeSwitch(p2pCfg, 1, "testing", "123.123.123", func(_ int, sw *p2p.Switch) *p2p.Switch {
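Injecting the update channel is what lets the test suites above drive peer lifecycle events directly; a sketch of the pattern:

    updatesCh := make(chan p2p.PeerUpdate)
    peerUpdates := p2p.NewPeerUpdates(updatesCh)
    _ = peerUpdates // handed to the component under test, e.g. evidence.NewReactor

    // Feed the consumer an event; the send blocks until it is picked up.
    updatesCh <- p2p.PeerUpdate{Status: p2p.PeerStatusUp, PeerID: peerID}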
diff --git a/statesync/reactor.go b/statesync/reactor.go index 23ce26608..ca0be92a3 100644
--- a/statesync/reactor.go
+++ b/statesync/reactor.go
@@ -320,7 +320,6 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
     defer func() {
         if e := recover(); e != nil {
             err = fmt.Errorf("panic in processing message: %v", e)
-            r.Logger.Error("recovering from processing message panic", "err", err)
         }
     }()
 
@@ -341,7 +340,7 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
 // processSnapshotCh initiates a blocking process where we listen for and handle
 // envelopes on the SnapshotChannel. Any error encountered during message
 // execution will result in a PeerError being sent on the SnapshotChannel. When
-// the reactor is stopped, we will catch the singal and close the p2p Channel
+// the reactor is stopped, we will catch the signal and close the p2p Channel
 // gracefully.
 func (r *Reactor) processSnapshotCh() {
     defer r.snapshotCh.Close()
@@ -350,6 +349,7 @@ func (r *Reactor) processSnapshotCh() {
         select {
         case envelope := <-r.snapshotCh.In():
             if err := r.handleMessage(r.snapshotCh.ID(), envelope); err != nil {
+                r.Logger.Error("failed to process envelope", "ch_id", r.snapshotCh.ID(), "envelope", envelope, "err", err)
                 r.snapshotCh.Error() <- p2p.PeerError{
                     PeerID: envelope.From,
                     Err:    err,
@@ -367,7 +367,7 @@ func (r *Reactor) processSnapshotCh() {
 // processChunkCh initiates a blocking process where we listen for and handle
 // envelopes on the ChunkChannel. Any error encountered during message
 // execution will result in a PeerError being sent on the ChunkChannel. When
-// the reactor is stopped, we will catch the singal and close the p2p Channel
+// the reactor is stopped, we will catch the signal and close the p2p Channel
 // gracefully.
 func (r *Reactor) processChunkCh() {
     defer r.chunkCh.Close()
@@ -376,6 +376,7 @@ func (r *Reactor) processChunkCh() {
         select {
         case envelope := <-r.chunkCh.In():
             if err := r.handleMessage(r.chunkCh.ID(), envelope); err != nil {
+                r.Logger.Error("failed to process envelope", "ch_id", r.chunkCh.ID(), "envelope", envelope, "err", err)
                 r.chunkCh.Error() <- p2p.PeerError{
                     PeerID: envelope.From,
                     Err:    err,
@@ -410,7 +411,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
 }
 
 // processPeerUpdates initiates a blocking process where we listen for and handle
-// PeerUpdate messages. When the reactor is stopped, we will catch the singal and
+// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
 // close the p2p PeerUpdatesCh gracefully.
 func (r *Reactor) processPeerUpdates() {
     defer r.peerUpdates.Close()
diff --git a/statesync/reactor_test.go b/statesync/reactor_test.go index 824d2432a..6dfad5edb 100644
--- a/statesync/reactor_test.go
+++ b/statesync/reactor_test.go
@@ -62,7 +62,7 @@ func setup(
         chunkInCh:      make(chan p2p.Envelope, chBuf),
         chunkOutCh:     make(chan p2p.Envelope, chBuf),
         chunkPeerErrCh: make(chan p2p.PeerError, chBuf),
-        peerUpdates:    p2p.NewPeerUpdates(),
+        peerUpdates:    p2p.NewPeerUpdates(make(chan p2p.PeerUpdate)),
         conn:           conn,
         connQuery:      connQuery,
         stateProvider:  stateProvider,
diff --git a/test/maverick/node/node.go b/test/maverick/node/node.go index 62644a210..dd3b684af 100644
--- a/test/maverick/node/node.go
+++ b/test/maverick/node/node.go
@@ -243,9 +243,10 @@ type Node struct {
     consensusState   *cs.State      // latest consensus state
     consensusReactor *cs.Reactor    // for participating in the consensus
     pexReactor       *pex.Reactor   // for exchanging peer addresses
-    evidencePool     *evidence.Pool // tracking evidence
-    proxyApp         proxy.AppConns // connection to the application
-    rpcListeners     []net.Listener // rpc servers
+    evidenceReactor  *evidence.Reactor
+    evidencePool     *evidence.Pool // tracking evidence
+    proxyApp         proxy.AppConns // connection to the application
+    rpcListeners     []net.Listener // rpc servers
     txIndexer        txindex.TxIndexer
     indexerService   *txindex.IndexerService
     prometheusSrv    *http.Server
@@ -380,21 +381,34 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns,
     return mempoolReactor, mempool
 }
 
-func createEvidenceReactor(config *cfg.Config, dbProvider DBProvider,
-    stateDB dbm.DB, blockStore *store.BlockStore, logger log.Logger) (*evidence.Reactor, *evidence.Pool, error) {
-
+func createEvidenceReactor(
+    config *cfg.Config,
+    dbProvider DBProvider,
+    stateDB dbm.DB,
+    blockStore *store.BlockStore,
+    logger log.Logger,
+) (*p2p.ReactorShim, *evidence.Reactor, *evidence.Pool, error) {
     evidenceDB, err := dbProvider(&DBContext{"evidence", config})
     if err != nil {
-        return nil, nil, err
+        return nil, nil, nil, err
     }
-    evidenceLogger := logger.With("module", "evidence")
-    evidencePool, err := evidence.NewPool(evidenceDB, sm.NewStore(stateDB), blockStore)
+
+    logger = logger.With("module", "evidence")
+
+    evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore)
     if err != nil {
-        return nil, nil, err
+        return nil, nil, nil, err
     }
-    evidenceReactor := evidence.NewReactor(evidencePool)
-    evidenceReactor.SetLogger(evidenceLogger)
-    return evidenceReactor, evidencePool, nil
+
+    evidenceReactorShim := p2p.NewReactorShim(logger, "EvidenceShim", evidence.ChannelShims)
+    evidenceReactor := evidence.NewReactor(
+        logger,
+        evidenceReactorShim.GetChannel(evidence.EvidenceChannel),
+        evidenceReactorShim.PeerUpdates,
+        evidencePool,
+    )
+
+    return evidenceReactorShim, evidenceReactor, evidencePool, nil
 }
 
 func createBlockchainReactor(config *cfg.Config,
@@ -529,7 +543,7 @@ func createSwitch(config *cfg.Config,
     bcReactor p2p.Reactor,
     stateSyncReactor *p2p.ReactorShim,
     consensusReactor *cs.Reactor,
-    evidenceReactor *evidence.Reactor,
+    evidenceReactor *p2p.ReactorShim,
    nodeInfo p2p.NodeInfo,
     nodeKey p2p.NodeKey,
     p2pLogger log.Logger) *p2p.Switch {
@@ -751,8 +765,7 @@ func NewNode(config *cfg.Config,
     // Make MempoolReactor
     mempoolReactor, mempool := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger)
 
-    // Make Evidence Reactor
-    evidenceReactor, evidencePool, err := createEvidenceReactor(config, dbProvider, stateDB, blockStore, logger)
+    evReactorShim, evReactor, evPool, err := createEvidenceReactor(config, dbProvider, stateDB, blockStore, logger)
     if err != nil {
         return nil, err
     }
@@ -763,7 +776,7 @@ func NewNode(config *cfg.Config,
         logger.With("module", "state"),
         proxyApp.Consensus(),
         mempool,
-        evidencePool,
+        evPool,
         sm.BlockExecutorWithMetrics(smMetrics),
     )
 
@@ -783,15 +796,14 @@ func NewNode(config *cfg.Config,
     logger.Info("Setting up maverick consensus reactor", "Misbehaviors", misbehaviors)
 
     consensusReactor, consensusState := createConsensusReactor(
-        config, state, blockExec, blockStore, mempool, evidencePool,
+        config, state, blockExec, blockStore, mempool, evPool,
         privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger, misbehaviors)
 
     // Set up state sync reactor, and schedule a sync if requested.
     // FIXME The way we do phased startups (e.g. replay -> fast sync -> consensus) is very messy,
     // we should clean this whole thing up. See:
     // https://github.com/tendermint/tendermint/issues/4644
-    stateSyncReactorShim := p2p.NewReactorShim("StateSyncShim", statesync.ChannelShims)
-    stateSyncReactorShim.SetLogger(logger.With("module", "statesync"))
+    stateSyncReactorShim := p2p.NewReactorShim(logger.With("module", "statesync"), "StateSyncShim", statesync.ChannelShims)
 
     stateSyncReactor := statesync.NewReactor(
         stateSyncReactorShim.Logger,
@@ -813,7 +825,7 @@ func NewNode(config *cfg.Config,
     transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp)
     sw := createSwitch(
         config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
-        stateSyncReactorShim, consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger,
+        stateSyncReactorShim, consensusReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger,
     )
 
     err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
@@ -877,7 +889,8 @@ func NewNode(config *cfg.Config,
         stateSync:        stateSync,
         stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state
         pexReactor:       pexReactor,
-        evidencePool:     evidencePool,
+        evidenceReactor:  evReactor,
+        evidencePool:     evPool,
         proxyApp:         proxyApp,
         txIndexer:        txIndexer,
         indexerService:   indexerService,
@@ -949,6 +962,11 @@ func (n *Node) OnStart() error {
         return err
     }
 
+    // Start the real evidence reactor separately since the switch uses the shim.
+    if err := n.evidenceReactor.Start(); err != nil {
+        return err
+    }
+
     // Always connect to persistent peers
     err = n.sw.DialPeersAsync(splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "))
     if err != nil {
@@ -992,7 +1010,12 @@ func (n *Node) OnStop() {
 
     // Stop the real state sync reactor separately since the switch uses the shim.
     if err := n.stateSyncReactor.Stop(); err != nil {
-        n.Logger.Error("failed to stop state sync service", "err", err)
+        n.Logger.Error("failed to stop the state sync reactor", "err", err)
+    }
+
+    // Stop the real evidence reactor separately since the switch uses the shim.
+    if err := n.evidenceReactor.Stop(); err != nil {
+        n.Logger.Error("failed to stop the evidence reactor", "err", err)
     }
 
     // stop mempool WAL
@@ -1312,10 +1335,14 @@ func makeNodeInfo(
         Version: version.TMCoreSemVer,
         Channels: []byte{
             bcChannel,
-            cs.StateChannel, cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel,
+            cs.StateChannel,
+            cs.DataChannel,
+            cs.VoteChannel,
+            cs.VoteSetBitsChannel,
             mempl.MempoolChannel,
-            evidence.EvidenceChannel,
-            byte(statesync.SnapshotChannel), byte(statesync.ChunkChannel),
+            byte(evidence.EvidenceChannel),
+            byte(statesync.SnapshotChannel),
+            byte(statesync.ChunkChannel),
         },
         Moniker: config.Moniker,
         Other: p2p.NodeInfoOther{