diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 6ffce4b98..b16a98c6a 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -153,6 +153,72 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { } } + // introducing a lazy proposer means that the time of the block committed is different to the + // timestamp that the other nodes have. This tests to ensure that the evidence that finally gets + // proposed will have a valid timestamp + lazyProposer := css[1] + + lazyProposer.decideProposal = func(height int64, round int32) { + lazyProposer.Logger.Info("Lazy Proposer proposing condensed commit") + if lazyProposer.privValidator == nil { + panic("entered createProposalBlock with privValidator being nil") + } + + var commit *types.Commit + switch { + case lazyProposer.Height == lazyProposer.state.InitialHeight: + // We're creating a proposal for the first block. + // The commit is empty, but not nil. + commit = types.NewCommit(0, 0, types.BlockID{}, nil) + case lazyProposer.LastCommit.HasTwoThirdsMajority(): + // Make the commit from LastCommit + commit = lazyProposer.LastCommit.MakeCommit() + default: // This shouldn't happen. + lazyProposer.Logger.Error("enterPropose: Cannot propose anything: No commit for the previous block") + return + } + + // omit the last signature in the commit + commit.Signatures[len(commit.Signatures)-1] = types.NewCommitSigAbsent() + + if lazyProposer.privValidatorPubKey == nil { + // If this node is a validator & proposer in the current round, it will + // miss the opportunity to create a block. + lazyProposer.Logger.Error(fmt.Sprintf("enterPropose: %v", errPubKeyIsNotSet)) + return + } + proposerAddr := lazyProposer.privValidatorPubKey.Address() + + block, blockParts := lazyProposer.blockExec.CreateProposalBlock( + lazyProposer.Height, lazyProposer.state, commit, proposerAddr, + ) + + // Flush the WAL. Otherwise, we may not recompute the same proposal to sign, + // and the privValidator will refuse to sign anything. + if err := lazyProposer.wal.FlushAndSync(); err != nil { + lazyProposer.Logger.Error("Error flushing to disk") + } + + // Make proposal + propBlockID := types.BlockID{Hash: block.Hash(), PartSetHeader: blockParts.Header()} + proposal := types.NewProposal(height, round, lazyProposer.ValidRound, propBlockID) + p := proposal.ToProto() + if err := lazyProposer.privValidator.SignProposal(lazyProposer.state.ChainID, p); err == nil { + proposal.Signature = p.Signature + + // send proposal and block parts on internal msg queue + lazyProposer.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""}) + for i := 0; i < int(blockParts.Total()); i++ { + part := blockParts.GetPart(i) + lazyProposer.sendInternalMessage(msgInfo{&BlockPartMessage{lazyProposer.Height, lazyProposer.Round, part}, ""}) + } + lazyProposer.Logger.Info("Signed proposal", "height", height, "round", round, "proposal", proposal) + lazyProposer.Logger.Debug(fmt.Sprintf("Signed proposal block: %v", block)) + } else if !lazyProposer.replayMode { + lazyProposer.Logger.Error("enterPropose: Error signing proposal", "height", height, "round", round, "err", err) + } + } + // start the consensus reactors for i := 0; i < nValidators; i++ { s := reactors[i].conS.GetState() diff --git a/consensus/state.go b/consensus/state.go index d57bbf42c..f582d230d 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -73,9 +73,8 @@ type txNotifier interface { // interface to the evidence pool type evidencePool interface { - // Adds consensus based evidence to the evidence pool. This function differs to - // AddEvidence by bypassing verification and adding it immediately to the pool - AddEvidenceFromConsensus(types.Evidence) error + // reports conflicting votes to the evidence pool to be processed into evidence + ReportConflictingVotes(voteA, voteB *types.Vote) } // State handles execution of the consensus algorithm. @@ -1866,21 +1865,12 @@ func (cs *State) tryAddVote(vote *types.Vote, peerID p2p.NodeID) (bool, error) { vote.Type) return added, err } - var timestamp time.Time - if voteErr.VoteA.Height == cs.state.InitialHeight { - timestamp = cs.state.LastBlockTime // genesis time - } else { - timestamp = sm.MedianTime(cs.LastCommit.MakeCommit(), cs.LastValidators) - } - // form duplicate vote evidence from the conflicting votes and send it across to the - // evidence pool - ev := types.NewDuplicateVoteEvidence(voteErr.VoteA, voteErr.VoteB, timestamp, cs.Validators) - evidenceErr := cs.evpool.AddEvidenceFromConsensus(ev) - if evidenceErr != nil { - cs.Logger.Error("Failed to add evidence to the evidence pool", "err", evidenceErr) - } else { - cs.Logger.Debug("Added evidence to the evidence pool", "ev", ev) - } + // report conflicting votes to the evidence pool + cs.evpool.ReportConflictingVotes(voteErr.VoteA, voteErr.VoteB) + cs.Logger.Info("Found and sent conflicting votes to the evidence pool", + "VoteA", voteErr.VoteA, + "VoteB", voteErr.VoteB, + ) return added, err } else if err == types.ErrVoteNonDeterministicSignature { cs.Logger.Debug("Vote has non-deterministic signature", "err", err) diff --git a/evidence/pool.go b/evidence/pool.go index 6cb47069f..34c267c0c 100644 --- a/evidence/pool.go +++ b/evidence/pool.go @@ -43,10 +43,10 @@ type Pool struct { mtx sync.Mutex // latest state state sm.State - // evidence from consensus if buffered to this slice, awaiting until the next height + // evidence from consensus is buffered to this slice, awaiting until the next height // before being flushed to the pool. This prevents broadcasting and proposing of // evidence before the height with which the evidence happened is finished. - consensusBuffer []types.Evidence + consensusBuffer []duplicateVoteSet pruningHeight int64 pruningTime time.Time @@ -67,7 +67,7 @@ func NewPool(logger log.Logger, evidenceDB dbm.DB, stateDB sm.Store, blockStore logger: logger, evidenceStore: evidenceDB, evidenceList: clist.New(), - consensusBuffer: make([]types.Evidence, 0), + consensusBuffer: make([]duplicateVoteSet, 0), } // If pending evidence already in db, in event of prior failure, then check @@ -102,8 +102,13 @@ func (evpool *Pool) PendingEvidence(maxBytes int64) ([]types.Evidence, int64) { return evidence, size } -// Update pulls the latest state to be used for expiration and evidence params -// and then prunes all expired evidence. +// Update takes both the new state and the evidence committed at that height and performs +// the following operations: +// 1. Take any conflicting votes from consensus and use the state's LastBlockTime to form +// DuplicateVoteEvidence and add it to the pool. +// 2. Update the pool's state which contains evidence params relating to expiry. +// 3. Moves pending evidence that has now been committed into the committed pool. +// 4. Removes any expired evidence based on both height and time. func (evpool *Pool) Update(state sm.State, ev types.EvidenceList) { // sanity check if state.LastBlockHeight <= evpool.state.LastBlockHeight { @@ -114,18 +119,17 @@ func (evpool *Pool) Update(state sm.State, ev types.EvidenceList) { )) } - evpool.logger.Info( + evpool.logger.Debug( "updating evidence pool", "last_block_height", state.LastBlockHeight, "last_block_time", state.LastBlockTime, ) - evpool.mtx.Lock() - // flush awaiting evidence from consensus into pool - evpool.flushConsensusBuffer() + // flush conflicting vote pairs from the buffer, producing DuplicateVoteEvidence and + // adding it to the pool + evpool.processConsensusBuffer(state) // update state - evpool.state = state - evpool.mtx.Unlock() + evpool.updateState(state) // move committed evidence out from the pending pool and into the committed pool evpool.markEvidenceAsCommitted(ev) @@ -144,7 +148,7 @@ func (evpool *Pool) AddEvidence(ev types.Evidence) error { // We have already verified this piece of evidence - no need to do it again if evpool.isPending(ev) { - evpool.logger.Info("evidence already pending; ignoring", "evidence", ev) + evpool.logger.Debug("evidence already pending; ignoring", "evidence", ev) return nil } @@ -173,24 +177,22 @@ func (evpool *Pool) AddEvidence(ev types.Evidence) error { return nil } -// AddEvidenceFromConsensus should be exposed only to the consensus reactor so -// it can add evidence to the pool directly without the need for verification. -func (evpool *Pool) AddEvidenceFromConsensus(ev types.Evidence) error { - // we already have this evidence, log this but don't return an error. - if evpool.isPending(ev) { - evpool.logger.Info("evidence already pending; ignoring", "evidence", ev) - return nil - } - - // add evidence to a buffer which will pass the evidence to the pool at the following height. - // This avoids the issue of some nodes verifying and proposing evidence at a height where the - // block hasn't been committed on cause others to potentially fail. +// ReportConflictingVotes takes two conflicting votes and forms duplicate vote evidence, +// adding it eventually to the evidence pool. +// +// Duplicate vote attacks happen before the block is committed and the timestamp is +// finalized, thus the evidence pool holds these votes in a buffer, forming the +// evidence from them once consensus at that height has been reached and `Update()` with +// the new state called. +// +// Votes are not verified. +func (evpool *Pool) ReportConflictingVotes(voteA, voteB *types.Vote) { evpool.mtx.Lock() defer evpool.mtx.Unlock() - evpool.consensusBuffer = append(evpool.consensusBuffer, ev) - evpool.logger.Info("received new evidence of byzantine behavior from consensus", "evidence", ev) - - return nil + evpool.consensusBuffer = append(evpool.consensusBuffer, duplicateVoteSet{ + VoteA: voteA, + VoteB: voteB, + }) } // CheckEvidence takes an array of evidence from a block and verifies all the evidence there. @@ -211,7 +213,7 @@ func (evpool *Pool) CheckEvidence(evList types.EvidenceList) error { err := evpool.verify(ev) if err != nil { - return &types.ErrInvalidEvidence{Evidence: ev, Reason: err} + return err } if err := evpool.addPendingEvidence(ev); err != nil { @@ -390,7 +392,7 @@ func (evpool *Pool) removePendingEvidence(evidence types.Evidence) { evpool.logger.Error("failed to delete pending evidence", "err", err) } else { atomic.AddUint32(&evpool.evidenceSize, ^uint32(0)) - evpool.logger.Info("deleted pending evidence", "evidence", evidence) + evpool.logger.Debug("deleted pending evidence", "evidence", evidence) } } @@ -419,7 +421,7 @@ func (evpool *Pool) markEvidenceAsCommitted(evidence types.EvidenceList) { evpool.logger.Error("failed to save committed evidence", "key(height/hash)", key, "err", err) } - evpool.logger.Info("marked evidence as committed", "evidence", ev) + evpool.logger.Debug("marked evidence as committed", "evidence", ev) } // remove committed evidence from the clist @@ -532,19 +534,90 @@ func (evpool *Pool) removeEvidenceFromList( } } -// flushConsensusBuffer moves the evidence produced from consensus into the evidence pool -// and list so that it can be broadcasted and proposed -func (evpool *Pool) flushConsensusBuffer() { - for _, ev := range evpool.consensusBuffer { - if err := evpool.addPendingEvidence(ev); err != nil { +func (evpool *Pool) updateState(state sm.State) { + evpool.mtx.Lock() + defer evpool.mtx.Unlock() + evpool.state = state +} + +// processConsensusBuffer converts all the duplicate votes witnessed from consensus +// into DuplicateVoteEvidence. It sets the evidence timestamp to the block height +// from the most recently committed block. +// Evidence is then added to the pool so as to be ready to be broadcasted and proposed. +func (evpool *Pool) processConsensusBuffer(state sm.State) { + evpool.mtx.Lock() + defer evpool.mtx.Unlock() + for _, voteSet := range evpool.consensusBuffer { + + // Check the height of the conflicting votes and fetch the corresponding time and validator set + // to produce the valid evidence + var dve *types.DuplicateVoteEvidence + switch { + case voteSet.VoteA.Height == state.LastBlockHeight: + dve = types.NewDuplicateVoteEvidence( + voteSet.VoteA, + voteSet.VoteB, + state.LastBlockTime, + state.LastValidators, + ) + + case voteSet.VoteA.Height < state.LastBlockHeight: + valSet, err := evpool.stateDB.LoadValidators(voteSet.VoteA.Height) + if err != nil { + evpool.logger.Error("failed to load validator set for conflicting votes", + "height", voteSet.VoteA.Height, "err", err) + continue + } + blockMeta := evpool.blockStore.LoadBlockMeta(voteSet.VoteA.Height) + if blockMeta == nil { + evpool.logger.Error("failed to load block time for conflicting votes", "height", voteSet.VoteA.Height) + continue + } + dve = types.NewDuplicateVoteEvidence( + voteSet.VoteA, + voteSet.VoteB, + blockMeta.Header.Time, + valSet, + ) + + default: + // evidence pool shouldn't expect to get votes from consensus of a height that is above the current + // state. If this error is seen then perhaps consider keeping the votes in the buffer and retry + // in following heights + evpool.logger.Error("inbound duplicate votes from consensus are of a greater height than current state", + "duplicate vote height", voteSet.VoteA.Height, + "state.LastBlockHeight", state.LastBlockHeight) + continue + } + + // check if we already have this evidence + if evpool.isPending(dve) { + evpool.logger.Debug("evidence already pending; ignoring", "evidence", dve) + continue + } + + // check that the evidence is not already committed on chain + if evpool.isCommitted(dve) { + evpool.logger.Debug("evidence already committed; ignoring", "evidence", dve) + continue + } + + if err := evpool.addPendingEvidence(dve); err != nil { evpool.logger.Error("failed to flush evidence from consensus buffer to pending list: %w", err) continue } - evpool.evidenceList.PushBack(ev) + evpool.evidenceList.PushBack(dve) + + evpool.logger.Info("verified new evidence of byzantine behavior", "evidence", dve) } // reset consensus buffer - evpool.consensusBuffer = make([]types.Evidence, 0) + evpool.consensusBuffer = make([]duplicateVoteSet, 0) +} + +type duplicateVoteSet struct { + VoteA *types.Vote + VoteB *types.Vote } func bytesToEv(evBytes []byte) (types.Evidence, error) { diff --git a/evidence/pool_test.go b/evidence/pool_test.go index a246219d5..b34e77c00 100644 --- a/evidence/pool_test.go +++ b/evidence/pool_test.go @@ -136,13 +136,17 @@ func TestAddExpiredEvidence(t *testing.T) { } } -func TestAddEvidenceFromConsensus(t *testing.T) { +func TestReportConflictingVotes(t *testing.T) { var height int64 = 10 - pool, val := defaultTestPool(t, height) - ev := types.NewMockDuplicateVoteEvidenceWithValidator(height, defaultEvidenceTime, val, evidenceChainID) + pool, pv := defaultTestPool(t, height) + val := types.NewValidator(pv.PrivKey.PubKey(), 10) + ev := types.NewMockDuplicateVoteEvidenceWithValidator(height+1, defaultEvidenceTime, pv, evidenceChainID) + + pool.ReportConflictingVotes(ev.VoteA, ev.VoteB) - require.NoError(t, pool.AddEvidenceFromConsensus(ev)) + // shouldn't be able to submit the same evidence twice + pool.ReportConflictingVotes(ev.VoteA, ev.VoteB) // evidence from consensus should not be added immediately but reside in the consensus buffer evList, evSize := pool.PendingEvidence(defaultEvidenceMaxBytes) @@ -155,19 +159,13 @@ func TestAddEvidenceFromConsensus(t *testing.T) { // move to next height and update state and evidence pool state := pool.State() state.LastBlockHeight++ + state.LastBlockTime = ev.Time() + state.LastValidators = types.NewValidatorSet([]*types.Validator{val}) pool.Update(state, []types.Evidence{}) // should be able to retrieve evidence from pool evList, _ = pool.PendingEvidence(defaultEvidenceMaxBytes) require.Equal(t, []types.Evidence{ev}, evList) - - // shouldn't be able to submit the same evidence twice - require.NoError(t, pool.AddEvidenceFromConsensus(ev)) - state = pool.State() - state.LastBlockHeight++ - pool.Update(state, []types.Evidence{}) - evList2, _ := pool.PendingEvidence(defaultEvidenceMaxBytes) - require.Equal(t, evList, evList2) } func TestEvidencePoolUpdate(t *testing.T) { diff --git a/node/node_test.go b/node/node_test.go index b6fdc26e9..6ec7517e2 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -266,8 +266,7 @@ func TestCreateProposalBlock(t *testing.T) { for currentBytes <= maxEvidenceBytes { ev := types.NewMockDuplicateVoteEvidenceWithValidator(height, time.Now(), privVals[0], "test-chain") currentBytes += int64(len(ev.Bytes())) - err := evidencePool.AddEvidenceFromConsensus(ev) - require.NoError(t, err) + evidencePool.ReportConflictingVotes(ev.VoteA, ev.VoteB) } evList, size := evidencePool.PendingEvidence(state.ConsensusParams.Evidence.MaxBytes) diff --git a/state/services.go b/state/services.go index 5213f8fdd..a46863904 100644 --- a/state/services.go +++ b/state/services.go @@ -53,9 +53,7 @@ type EmptyEvidencePool struct{} func (EmptyEvidencePool) PendingEvidence(maxBytes int64) (ev []types.Evidence, size int64) { return nil, 0 } -func (EmptyEvidencePool) AddEvidence(types.Evidence) error { return nil } -func (EmptyEvidencePool) Update(State, types.EvidenceList) {} -func (EmptyEvidencePool) CheckEvidence(evList types.EvidenceList) error { return nil } -func (EmptyEvidencePool) AddEvidenceFromConsensus(evidence types.Evidence) error { - return nil -} +func (EmptyEvidencePool) AddEvidence(types.Evidence) error { return nil } +func (EmptyEvidencePool) Update(State, types.EvidenceList) {} +func (EmptyEvidencePool) CheckEvidence(evList types.EvidenceList) error { return nil } +func (EmptyEvidencePool) ReportConflictingVotes(voteA, voteB *types.Vote) {} diff --git a/test/maverick/consensus/state.go b/test/maverick/consensus/state.go index 852850425..76060e10b 100644 --- a/test/maverick/consensus/state.go +++ b/test/maverick/consensus/state.go @@ -465,9 +465,8 @@ type txNotifier interface { // interface to the evidence pool type evidencePool interface { - // Adds consensus based evidence to the evidence pool where time is the time - // of the block where the offense occurred and the validator set is the current one. - AddEvidenceFromConsensus(evidence types.Evidence) error + // reports conflicting votes to the evidence pool to be processed into evidence + ReportConflictingVotes(voteA, voteB *types.Vote) } //---------------------------------------- @@ -1768,17 +1767,7 @@ func (cs *State) tryAddVote(vote *types.Vote, peerID p2p.NodeID) (bool, error) { vote.Type) return added, err } - var timestamp time.Time - if voteErr.VoteA.Height == cs.state.InitialHeight { - timestamp = cs.state.LastBlockTime // genesis time - } else { - timestamp = sm.MedianTime(cs.LastCommit.MakeCommit(), cs.LastValidators) - } - ev := types.NewDuplicateVoteEvidence(voteErr.VoteA, voteErr.VoteB, timestamp, cs.Validators) - evidenceErr := cs.evpool.AddEvidenceFromConsensus(ev) - if evidenceErr != nil { - cs.Logger.Error("Failed to add evidence to the evidence pool", "err", evidenceErr) - } + cs.evpool.ReportConflictingVotes(voteErr.VoteA, voteErr.VoteB) return added, err } else if err == types.ErrVoteNonDeterministicSignature { cs.Logger.Debug("Vote has non-deterministic signature", "err", err)