From f1c53c7358f9d414f34295e98fd08de90fc9aa00 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 24 May 2018 17:49:37 -0400 Subject: [PATCH 01/13] evidence: dont send evidence to unsynced peers * only send evidence to peers that are synced enough to validate it all * closes #1624 --- evidence/reactor.go | 36 +++++++++++++++++++++++++++++---- evidence/reactor_test.go | 43 ++++++++++++++++++++++++++++++++++++++++ evidence/store.go | 2 +- state/execution.go | 2 +- 4 files changed, 77 insertions(+), 6 deletions(-) diff --git a/evidence/reactor.go b/evidence/reactor.go index a6aa66b1f..de18f6b8e 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -8,6 +8,7 @@ import ( "github.com/tendermint/go-amino" "github.com/tendermint/tmlibs/log" + cstypes "github.com/tendermint/tendermint/consensus/types" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" ) @@ -118,21 +119,48 @@ func (evR *EvidenceReactor) broadcastRoutine() { case evidence := <-evR.evpool.EvidenceChan(): // broadcast some new evidence msg := &EvidenceListMessage{[]types.Evidence{evidence}} - evR.Switch.Broadcast(EvidenceChannel, cdc.MustMarshalBinaryBare(msg)) + evR.broadcastEvidenceListMsg(msg) - // TODO: Broadcast runs asynchronously, so this should wait on the successChan - // in another routine before marking to be proper. + // TODO: the broadcast here is just doing TrySend. + // We should make sure the send succeeds before marking broadcasted. evR.evpool.evidenceStore.MarkEvidenceAsBroadcasted(evidence) case <-ticker.C: // broadcast all pending evidence msg := &EvidenceListMessage{evR.evpool.PendingEvidence()} - evR.Switch.Broadcast(EvidenceChannel, cdc.MustMarshalBinaryBare(msg)) + evR.broadcastEvidenceListMsg(msg) case <-evR.Quit(): return } } } +func (evR *EvidenceReactor) broadcastEvidenceListMsg(msg *EvidenceListMessage) { + // NOTE: we dont send evidence to peers higher than their height, + // because they can't validate it (don't have validators from the height). + // So, for now, only send the `msg` to peers synced to the highest height in the list. + // TODO: send each peer all the evidence below its current height - + // might require a routine per peer, like the mempool. + + var maxHeight int64 + for _, ev := range msg.Evidence { + if ev.Height() > maxHeight { + maxHeight = ev.Height() + } + } + + for _, peer := range evR.Switch.Peers().List() { + ps := peer.Get(types.PeerStateKey).(PeerState) + rs := ps.GetRoundState() + if rs.Height >= maxHeight { + peer.TrySend(EvidenceChannel, cdc.MustMarshalBinaryBare(msg)) + } + } +} + +type PeerState interface { + GetRoundState() *cstypes.PeerRoundState +} + //----------------------------------------------------------------------------- // Messages diff --git a/evidence/reactor_test.go b/evidence/reactor_test.go index 11c63929b..6b4b24a07 100644 --- a/evidence/reactor_test.go +++ b/evidence/reactor_test.go @@ -14,6 +14,7 @@ import ( "github.com/tendermint/tmlibs/log" cfg "github.com/tendermint/tendermint/config" + cstypes "github.com/tendermint/tendermint/consensus/types" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" ) @@ -130,8 +131,50 @@ func TestReactorBroadcastEvidence(t *testing.T) { // make reactors from statedb reactors := makeAndConnectEvidenceReactors(config, stateDBs) + // set the peer height on each reactor + for _, r := range reactors { + for _, peer := range r.Switch.Peers().List() { + ps := peerState{height} + peer.Set(types.PeerStateKey, ps) + } + } + // send a bunch of valid evidence to the first reactor's evpool // and wait for them all to be received in the others evList := sendEvidence(t, reactors[0].evpool, valAddr, NUM_EVIDENCE) waitForEvidence(t, evList, reactors) } + +type peerState struct { + height int64 +} + +func (ps peerState) GetRoundState() *cstypes.PeerRoundState { + return &cstypes.PeerRoundState{ + Height: ps.height, + } +} + +func TestReactorSelectiveBroadcast(t *testing.T) { + config := cfg.TestConfig() + + valAddr := []byte("myval") + height1 := int64(NUM_EVIDENCE) + 10 + height2 := int64(NUM_EVIDENCE) / 2 + + // DB1 is ahead of DB2 + stateDB1 := initializeValidatorState(valAddr, height1) + stateDB2 := initializeValidatorState(valAddr, height2) + + // make reactors from statedb + reactors := makeAndConnectEvidenceReactors(config, []dbm.DB{stateDB1, stateDB2}) + peer := reactors[0].Switch.Peers().List()[0] + ps := peerState{height2} + peer.Set(types.PeerStateKey, ps) + + // send a bunch of valid evidence to the first reactor's evpool + evList := sendEvidence(t, reactors[0].evpool, valAddr, NUM_EVIDENCE) + + // only ones less than the peers height should make it through + waitForEvidence(t, evList[:NUM_EVIDENCE/2], reactors[1:2]) +} diff --git a/evidence/store.go b/evidence/store.go index 081715e3c..abfc5e1fe 100644 --- a/evidence/store.go +++ b/evidence/store.go @@ -164,7 +164,7 @@ func (store *EvidenceStore) MarkEvidenceAsBroadcasted(evidence types.Evidence) { store.db.Delete(key) } -// MarkEvidenceAsPending removes evidence from pending and outqueue and sets the state to committed. +// MarkEvidenceAsCommitted removes evidence from pending and outqueue and sets the state to committed. func (store *EvidenceStore) MarkEvidenceAsCommitted(evidence types.Evidence) { // if its committed, its been broadcast store.MarkEvidenceAsBroadcasted(evidence) diff --git a/state/execution.go b/state/execution.go index 3fe35e2fa..81d157f1d 100644 --- a/state/execution.go +++ b/state/execution.go @@ -106,7 +106,7 @@ func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block fail.Fail() // XXX - // Update evpool now that state is saved + // Update evpool now that state is saved. // TODO: handle the crash/recover scenario // ie. (may need to call Update for last block) blockExec.evpool.Update(block) From 53937a8129d8b4a9a18d2bd4ad6ea1b7de0c9fc1 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 24 May 2018 17:56:57 -0400 Subject: [PATCH 02/13] changelog --- CHANGELOG.md | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 786b5ef19..b9735a44f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,6 @@ ## 0.19.9 -*TBD* ## 0.19.8 @@ -38,7 +37,7 @@ FEATURES IMPROVEMENTS: -- [consensus] consensus reactor now receives events from a separate event bus, +- [consensus] Consensus reactor now receives events from a separate synchronous event bus, which is not dependant on external RPC load - [consensus/wal] do not look for height in older files if we've seen height - 1 - [docs] Various cleanup and link fixes @@ -51,6 +50,12 @@ BUG FIXES - [blockchain] Fix fast-sync deadlock during high peer turnover +BUG FIX: + +- [evidence] Dont send peers evidence from heights they haven't synced to yet +- [p2p] Refuse connections to more than one peer with the same IP +- [docs] Various fixes + ## 0.19.5 *May 20th, 2018* From 19d95b5410fa698e39e27c8647da5a30f7c6e0e0 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 4 Jun 2018 13:21:05 -0700 Subject: [PATCH 03/13] evidence: check peerstate exists; dont send old evidence --- evidence/reactor.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/evidence/reactor.go b/evidence/reactor.go index de18f6b8e..625663df2 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -138,7 +138,7 @@ func (evR *EvidenceReactor) broadcastEvidenceListMsg(msg *EvidenceListMessage) { // NOTE: we dont send evidence to peers higher than their height, // because they can't validate it (don't have validators from the height). // So, for now, only send the `msg` to peers synced to the highest height in the list. - // TODO: send each peer all the evidence below its current height - + // TODO: send each peer all the evidence below its current height within maxAge - // might require a routine per peer, like the mempool. var maxHeight int64 @@ -149,9 +149,17 @@ func (evR *EvidenceReactor) broadcastEvidenceListMsg(msg *EvidenceListMessage) { } for _, peer := range evR.Switch.Peers().List() { - ps := peer.Get(types.PeerStateKey).(PeerState) + ps, ok := peer.Get(types.PeerStateKey).(PeerState) + if !ok { + evR.Logger.Info("Found peer without PeerState", "peer", peer) + continue + } + + // only send to peer if maxHeight < peerHeight < maxHeight + maxAge + maxAge := evR.evpool.State().ConsensusParams.EvidenceParams.MaxAge rs := ps.GetRoundState() - if rs.Height >= maxHeight { + if rs.Height >= maxHeight && + rs.Height < maxAge+maxHeight { peer.TrySend(EvidenceChannel, cdc.MustMarshalBinaryBare(msg)) } } From dd62f06994399e3386c411cc50af224755e68e47 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 4 Jun 2018 13:31:57 -0700 Subject: [PATCH 04/13] state: s -> state --- state/execution.go | 46 ++++++++++++++++----------------- state/state.go | 62 ++++++++++++++++++++++----------------------- state/store.go | 14 +++++----- state/validation.go | 52 ++++++++++++++++++------------------- 4 files changed, 87 insertions(+), 87 deletions(-) diff --git a/state/execution.go b/state/execution.go index 81d157f1d..e5904ecfc 100644 --- a/state/execution.go +++ b/state/execution.go @@ -59,8 +59,8 @@ func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) // If the block is invalid, it returns an error. // Validation does not mutate state, but does require historical information from the stateDB, // ie. to verify evidence from a validator at an old height. -func (blockExec *BlockExecutor) ValidateBlock(s State, block *types.Block) error { - return validateBlock(blockExec.db, s, block) +func (blockExec *BlockExecutor) ValidateBlock(state State, block *types.Block) error { + return validateBlock(blockExec.db, state, block) } // ApplyBlock validates the block against the state, executes it against the app, @@ -68,15 +68,15 @@ func (blockExec *BlockExecutor) ValidateBlock(s State, block *types.Block) error // It's the only function that needs to be called // from outside this package to process and commit an entire block. // It takes a blockID to avoid recomputing the parts hash. -func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block *types.Block) (State, error) { +func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, block *types.Block) (State, error) { - if err := blockExec.ValidateBlock(s, block); err != nil { - return s, ErrInvalidBlock(err) + if err := blockExec.ValidateBlock(state, block); err != nil { + return state, ErrInvalidBlock(err) } abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block) if err != nil { - return s, ErrProxyAppConn(err) + return state, ErrProxyAppConn(err) } fail.Fail() // XXX @@ -87,22 +87,22 @@ func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block fail.Fail() // XXX // update the state with the block and responses - s, err = updateState(s, blockID, block.Header, abciResponses) + state, err = updateState(state, blockID, block.Header, abciResponses) if err != nil { - return s, fmt.Errorf("Commit failed for application: %v", err) + return state, fmt.Errorf("Commit failed for application: %v", err) } // lock mempool, commit state, update mempoool appHash, err := blockExec.Commit(block) if err != nil { - return s, fmt.Errorf("Commit failed for application: %v", err) + return state, fmt.Errorf("Commit failed for application: %v", err) } fail.Fail() // XXX // update the app hash and save the state - s.AppHash = appHash - SaveState(blockExec.db, s) + state.AppHash = appHash + SaveState(blockExec.db, state) fail.Fail() // XXX @@ -115,7 +115,7 @@ func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block // NOTE: if we crash between Commit and Save, events wont be fired during replay fireEvents(blockExec.logger, blockExec.eventBus, block, abciResponses) - return s, nil + return state, nil } // Commit locks the mempool, runs the ABCI Commit message, and updates the mempool. @@ -283,20 +283,20 @@ func updateValidators(currentSet *types.ValidatorSet, updates []abci.Validator) } // updateState returns a new State updated according to the header and responses. -func updateState(s State, blockID types.BlockID, header *types.Header, +func updateState(state State, blockID types.BlockID, header *types.Header, abciResponses *ABCIResponses) (State, error) { // copy the valset so we can apply changes from EndBlock // and update s.LastValidators and s.Validators - prevValSet := s.Validators.Copy() + prevValSet := state.Validators.Copy() nextValSet := prevValSet.Copy() // update the validator set with the latest abciResponses - lastHeightValsChanged := s.LastHeightValidatorsChanged + lastHeightValsChanged := state.LastHeightValidatorsChanged if len(abciResponses.EndBlock.ValidatorUpdates) > 0 { err := updateValidators(nextValSet, abciResponses.EndBlock.ValidatorUpdates) if err != nil { - return s, fmt.Errorf("Error changing validator set: %v", err) + return state, fmt.Errorf("Error changing validator set: %v", err) } // change results from this height but only applies to the next height lastHeightValsChanged = header.Height + 1 @@ -306,14 +306,14 @@ func updateState(s State, blockID types.BlockID, header *types.Header, nextValSet.IncrementAccum(1) // update the params with the latest abciResponses - nextParams := s.ConsensusParams - lastHeightParamsChanged := s.LastHeightConsensusParamsChanged + nextParams := state.ConsensusParams + lastHeightParamsChanged := state.LastHeightConsensusParamsChanged if abciResponses.EndBlock.ConsensusParamUpdates != nil { // NOTE: must not mutate s.ConsensusParams - nextParams = s.ConsensusParams.Update(abciResponses.EndBlock.ConsensusParamUpdates) + nextParams = state.ConsensusParams.Update(abciResponses.EndBlock.ConsensusParamUpdates) err := nextParams.Validate() if err != nil { - return s, fmt.Errorf("Error updating consensus params: %v", err) + return state, fmt.Errorf("Error updating consensus params: %v", err) } // change results from this height but only applies to the next height lastHeightParamsChanged = header.Height + 1 @@ -322,13 +322,13 @@ func updateState(s State, blockID types.BlockID, header *types.Header, // NOTE: the AppHash has not been populated. // It will be filled on state.Save. return State{ - ChainID: s.ChainID, + ChainID: state.ChainID, LastBlockHeight: header.Height, - LastBlockTotalTx: s.LastBlockTotalTx + header.NumTxs, + LastBlockTotalTx: state.LastBlockTotalTx + header.NumTxs, LastBlockID: blockID, LastBlockTime: header.Time, Validators: nextValSet, - LastValidators: s.Validators.Copy(), + LastValidators: state.Validators.Copy(), LastHeightValidatorsChanged: lastHeightValsChanged, ConsensusParams: nextParams, LastHeightConsensusParamsChanged: lastHeightParamsChanged, diff --git a/state/state.go b/state/state.go index aa6e04b66..3bc08dae3 100644 --- a/state/state.go +++ b/state/state.go @@ -55,67 +55,67 @@ type State struct { } // Copy makes a copy of the State for mutating. -func (s State) Copy() State { +func (state State) Copy() State { return State{ - ChainID: s.ChainID, + ChainID: state.ChainID, - LastBlockHeight: s.LastBlockHeight, - LastBlockTotalTx: s.LastBlockTotalTx, - LastBlockID: s.LastBlockID, - LastBlockTime: s.LastBlockTime, + LastBlockHeight: state.LastBlockHeight, + LastBlockTotalTx: state.LastBlockTotalTx, + LastBlockID: state.LastBlockID, + LastBlockTime: state.LastBlockTime, - Validators: s.Validators.Copy(), - LastValidators: s.LastValidators.Copy(), - LastHeightValidatorsChanged: s.LastHeightValidatorsChanged, + Validators: state.Validators.Copy(), + LastValidators: state.LastValidators.Copy(), + LastHeightValidatorsChanged: state.LastHeightValidatorsChanged, - ConsensusParams: s.ConsensusParams, - LastHeightConsensusParamsChanged: s.LastHeightConsensusParamsChanged, + ConsensusParams: state.ConsensusParams, + LastHeightConsensusParamsChanged: state.LastHeightConsensusParamsChanged, - AppHash: s.AppHash, + AppHash: state.AppHash, - LastResultsHash: s.LastResultsHash, + LastResultsHash: state.LastResultsHash, } } // Equals returns true if the States are identical. -func (s State) Equals(s2 State) bool { - sbz, s2bz := s.Bytes(), s2.Bytes() +func (state State) Equals(state2 State) bool { + sbz, s2bz := state.Bytes(), state2.Bytes() return bytes.Equal(sbz, s2bz) } // Bytes serializes the State using go-amino. -func (s State) Bytes() []byte { - return cdc.MustMarshalBinaryBare(s) +func (state State) Bytes() []byte { + return cdc.MustMarshalBinaryBare(state) } // IsEmpty returns true if the State is equal to the empty State. -func (s State) IsEmpty() bool { - return s.Validators == nil // XXX can't compare to Empty +func (state State) IsEmpty() bool { + return state.Validators == nil // XXX can't compare to Empty } // GetValidators returns the last and current validator sets. -func (s State) GetValidators() (last *types.ValidatorSet, current *types.ValidatorSet) { - return s.LastValidators, s.Validators +func (state State) GetValidators() (last *types.ValidatorSet, current *types.ValidatorSet) { + return state.LastValidators, state.Validators } //------------------------------------------------------------------------ // Create a block from the latest state // MakeBlock builds a block with the given txs and commit from the current state. -func (s State) MakeBlock(height int64, txs []types.Tx, commit *types.Commit) (*types.Block, *types.PartSet) { +func (state State) MakeBlock(height int64, txs []types.Tx, commit *types.Commit) (*types.Block, *types.PartSet) { // build base block block := types.MakeBlock(height, txs, commit) // fill header with state data - block.ChainID = s.ChainID - block.TotalTxs = s.LastBlockTotalTx + block.NumTxs - block.LastBlockID = s.LastBlockID - block.ValidatorsHash = s.Validators.Hash() - block.AppHash = s.AppHash - block.ConsensusHash = s.ConsensusParams.Hash() - block.LastResultsHash = s.LastResultsHash - - return block, block.MakePartSet(s.ConsensusParams.BlockGossip.BlockPartSizeBytes) + block.ChainID = state.ChainID + block.TotalTxs = state.LastBlockTotalTx + block.NumTxs + block.LastBlockID = state.LastBlockID + block.ValidatorsHash = state.Validators.Hash() + block.AppHash = state.AppHash + block.ConsensusHash = state.ConsensusParams.Hash() + block.LastResultsHash = state.LastResultsHash + + return block, block.MakePartSet(state.ConsensusParams.BlockGossip.BlockPartSizeBytes) } //------------------------------------------------------------------------ diff --git a/state/store.go b/state/store.go index ee0619d34..60acf9e1e 100644 --- a/state/store.go +++ b/state/store.go @@ -80,15 +80,15 @@ func loadState(db dbm.DB, key []byte) (state State) { } // SaveState persists the State, the ValidatorsInfo, and the ConsensusParamsInfo to the database. -func SaveState(db dbm.DB, s State) { - saveState(db, s, stateKey) +func SaveState(db dbm.DB, state State) { + saveState(db, state, stateKey) } -func saveState(db dbm.DB, s State, key []byte) { - nextHeight := s.LastBlockHeight + 1 - saveValidatorsInfo(db, nextHeight, s.LastHeightValidatorsChanged, s.Validators) - saveConsensusParamsInfo(db, nextHeight, s.LastHeightConsensusParamsChanged, s.ConsensusParams) - db.SetSync(stateKey, s.Bytes()) +func saveState(db dbm.DB, state State, key []byte) { + nextHeight := state.LastBlockHeight + 1 + saveValidatorsInfo(db, nextHeight, state.LastHeightValidatorsChanged, state.Validators) + saveConsensusParamsInfo(db, nextHeight, state.LastHeightConsensusParamsChanged, state.ConsensusParams) + db.SetSync(stateKey, state.Bytes()) } //------------------------------------------------------------------------ diff --git a/state/validation.go b/state/validation.go index fb3e8d13d..6f4f383d2 100644 --- a/state/validation.go +++ b/state/validation.go @@ -12,18 +12,18 @@ import ( //----------------------------------------------------- // Validate block -func validateBlock(stateDB dbm.DB, s State, b *types.Block) error { +func validateBlock(stateDB dbm.DB, state State, b *types.Block) error { // validate internal consistency if err := b.ValidateBasic(); err != nil { return err } // validate basic info - if b.ChainID != s.ChainID { - return fmt.Errorf("Wrong Block.Header.ChainID. Expected %v, got %v", s.ChainID, b.ChainID) + if b.ChainID != state.ChainID { + return fmt.Errorf("Wrong Block.Header.ChainID. Expected %v, got %v", state.ChainID, b.ChainID) } - if b.Height != s.LastBlockHeight+1 { - return fmt.Errorf("Wrong Block.Header.Height. Expected %v, got %v", s.LastBlockHeight+1, b.Height) + if b.Height != state.LastBlockHeight+1 { + return fmt.Errorf("Wrong Block.Header.Height. Expected %v, got %v", state.LastBlockHeight+1, b.Height) } /* TODO: Determine bounds for Time See blockchain/reactor "stopSyncingDurationMinutes" @@ -34,26 +34,26 @@ func validateBlock(stateDB dbm.DB, s State, b *types.Block) error { */ // validate prev block info - if !b.LastBlockID.Equals(s.LastBlockID) { - return fmt.Errorf("Wrong Block.Header.LastBlockID. Expected %v, got %v", s.LastBlockID, b.LastBlockID) + if !b.LastBlockID.Equals(state.LastBlockID) { + return fmt.Errorf("Wrong Block.Header.LastBlockID. Expected %v, got %v", state.LastBlockID, b.LastBlockID) } newTxs := int64(len(b.Data.Txs)) - if b.TotalTxs != s.LastBlockTotalTx+newTxs { - return fmt.Errorf("Wrong Block.Header.TotalTxs. Expected %v, got %v", s.LastBlockTotalTx+newTxs, b.TotalTxs) + if b.TotalTxs != state.LastBlockTotalTx+newTxs { + return fmt.Errorf("Wrong Block.Header.TotalTxs. Expected %v, got %v", state.LastBlockTotalTx+newTxs, b.TotalTxs) } // validate app info - if !bytes.Equal(b.AppHash, s.AppHash) { - return fmt.Errorf("Wrong Block.Header.AppHash. Expected %X, got %v", s.AppHash, b.AppHash) + if !bytes.Equal(b.AppHash, state.AppHash) { + return fmt.Errorf("Wrong Block.Header.AppHash. Expected %X, got %v", state.AppHash, b.AppHash) } - if !bytes.Equal(b.ConsensusHash, s.ConsensusParams.Hash()) { - return fmt.Errorf("Wrong Block.Header.ConsensusHash. Expected %X, got %v", s.ConsensusParams.Hash(), b.ConsensusHash) + if !bytes.Equal(b.ConsensusHash, state.ConsensusParams.Hash()) { + return fmt.Errorf("Wrong Block.Header.ConsensusHash. Expected %X, got %v", state.ConsensusParams.Hash(), b.ConsensusHash) } - if !bytes.Equal(b.LastResultsHash, s.LastResultsHash) { - return fmt.Errorf("Wrong Block.Header.LastResultsHash. Expected %X, got %v", s.LastResultsHash, b.LastResultsHash) + if !bytes.Equal(b.LastResultsHash, state.LastResultsHash) { + return fmt.Errorf("Wrong Block.Header.LastResultsHash. Expected %X, got %v", state.LastResultsHash, b.LastResultsHash) } - if !bytes.Equal(b.ValidatorsHash, s.Validators.Hash()) { - return fmt.Errorf("Wrong Block.Header.ValidatorsHash. Expected %X, got %v", s.Validators.Hash(), b.ValidatorsHash) + if !bytes.Equal(b.ValidatorsHash, state.Validators.Hash()) { + return fmt.Errorf("Wrong Block.Header.ValidatorsHash. Expected %X, got %v", state.Validators.Hash(), b.ValidatorsHash) } // Validate block LastCommit. @@ -62,19 +62,19 @@ func validateBlock(stateDB dbm.DB, s State, b *types.Block) error { return errors.New("Block at height 1 (first block) should have no LastCommit precommits") } } else { - if len(b.LastCommit.Precommits) != s.LastValidators.Size() { + if len(b.LastCommit.Precommits) != state.LastValidators.Size() { return fmt.Errorf("Invalid block commit size. Expected %v, got %v", - s.LastValidators.Size(), len(b.LastCommit.Precommits)) + state.LastValidators.Size(), len(b.LastCommit.Precommits)) } - err := s.LastValidators.VerifyCommit( - s.ChainID, s.LastBlockID, b.Height-1, b.LastCommit) + err := state.LastValidators.VerifyCommit( + state.ChainID, state.LastBlockID, b.Height-1, b.LastCommit) if err != nil { return err } } for _, ev := range b.Evidence.Evidence { - if err := VerifyEvidence(stateDB, s, ev); err != nil { + if err := VerifyEvidence(stateDB, state, ev); err != nil { return types.NewEvidenceInvalidErr(ev, err) } } @@ -87,17 +87,17 @@ func validateBlock(stateDB dbm.DB, s State, b *types.Block) error { // VerifyEvidence verifies the evidence fully by checking it is internally // consistent and sufficiently recent. -func VerifyEvidence(stateDB dbm.DB, s State, evidence types.Evidence) error { - height := s.LastBlockHeight +func VerifyEvidence(stateDB dbm.DB, state State, evidence types.Evidence) error { + height := state.LastBlockHeight evidenceAge := height - evidence.Height() - maxAge := s.ConsensusParams.EvidenceParams.MaxAge + maxAge := state.ConsensusParams.EvidenceParams.MaxAge if evidenceAge > maxAge { return fmt.Errorf("Evidence from height %d is too old. Min height is %d", evidence.Height(), height-maxAge) } - if err := evidence.Verify(s.ChainID); err != nil { + if err := evidence.Verify(state.ChainID); err != nil { return err } From edb851280ad8fe5c7b67e568865702f94e0ae832 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 4 Jun 2018 13:33:22 -0700 Subject: [PATCH 05/13] state: b -> block --- state/validation.go | 52 ++++++++++++++++++++++----------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/state/validation.go b/state/validation.go index 6f4f383d2..0726b61ea 100644 --- a/state/validation.go +++ b/state/validation.go @@ -12,68 +12,68 @@ import ( //----------------------------------------------------- // Validate block -func validateBlock(stateDB dbm.DB, state State, b *types.Block) error { +func validateBlock(stateDB dbm.DB, state State, block *types.Block) error { // validate internal consistency - if err := b.ValidateBasic(); err != nil { + if err := block.ValidateBasic(); err != nil { return err } // validate basic info - if b.ChainID != state.ChainID { - return fmt.Errorf("Wrong Block.Header.ChainID. Expected %v, got %v", state.ChainID, b.ChainID) + if block.ChainID != state.ChainID { + return fmt.Errorf("Wrong Block.Header.ChainID. Expected %v, got %v", state.ChainID, block.ChainID) } - if b.Height != state.LastBlockHeight+1 { - return fmt.Errorf("Wrong Block.Header.Height. Expected %v, got %v", state.LastBlockHeight+1, b.Height) + if block.Height != state.LastBlockHeight+1 { + return fmt.Errorf("Wrong Block.Header.Height. Expected %v, got %v", state.LastBlockHeight+1, block.Height) } /* TODO: Determine bounds for Time See blockchain/reactor "stopSyncingDurationMinutes" - if !b.Time.After(lastBlockTime) { + if !block.Time.After(lastBlockTime) { return errors.New("Invalid Block.Header.Time") } */ // validate prev block info - if !b.LastBlockID.Equals(state.LastBlockID) { - return fmt.Errorf("Wrong Block.Header.LastBlockID. Expected %v, got %v", state.LastBlockID, b.LastBlockID) + if !block.LastBlockID.Equals(state.LastBlockID) { + return fmt.Errorf("Wrong Block.Header.LastBlockID. Expected %v, got %v", state.LastBlockID, block.LastBlockID) } - newTxs := int64(len(b.Data.Txs)) - if b.TotalTxs != state.LastBlockTotalTx+newTxs { - return fmt.Errorf("Wrong Block.Header.TotalTxs. Expected %v, got %v", state.LastBlockTotalTx+newTxs, b.TotalTxs) + newTxs := int64(len(block.Data.Txs)) + if block.TotalTxs != state.LastBlockTotalTx+newTxs { + return fmt.Errorf("Wrong Block.Header.TotalTxs. Expected %v, got %v", state.LastBlockTotalTx+newTxs, block.TotalTxs) } // validate app info - if !bytes.Equal(b.AppHash, state.AppHash) { - return fmt.Errorf("Wrong Block.Header.AppHash. Expected %X, got %v", state.AppHash, b.AppHash) + if !bytes.Equal(block.AppHash, state.AppHash) { + return fmt.Errorf("Wrong Block.Header.AppHash. Expected %X, got %v", state.AppHash, block.AppHash) } - if !bytes.Equal(b.ConsensusHash, state.ConsensusParams.Hash()) { - return fmt.Errorf("Wrong Block.Header.ConsensusHash. Expected %X, got %v", state.ConsensusParams.Hash(), b.ConsensusHash) + if !bytes.Equal(block.ConsensusHash, state.ConsensusParams.Hash()) { + return fmt.Errorf("Wrong Block.Header.ConsensusHash. Expected %X, got %v", state.ConsensusParams.Hash(), block.ConsensusHash) } - if !bytes.Equal(b.LastResultsHash, state.LastResultsHash) { - return fmt.Errorf("Wrong Block.Header.LastResultsHash. Expected %X, got %v", state.LastResultsHash, b.LastResultsHash) + if !bytes.Equal(block.LastResultsHash, state.LastResultsHash) { + return fmt.Errorf("Wrong Block.Header.LastResultsHash. Expected %X, got %v", state.LastResultsHash, block.LastResultsHash) } - if !bytes.Equal(b.ValidatorsHash, state.Validators.Hash()) { - return fmt.Errorf("Wrong Block.Header.ValidatorsHash. Expected %X, got %v", state.Validators.Hash(), b.ValidatorsHash) + if !bytes.Equal(block.ValidatorsHash, state.Validators.Hash()) { + return fmt.Errorf("Wrong Block.Header.ValidatorsHash. Expected %X, got %v", state.Validators.Hash(), block.ValidatorsHash) } // Validate block LastCommit. - if b.Height == 1 { - if len(b.LastCommit.Precommits) != 0 { + if block.Height == 1 { + if len(block.LastCommit.Precommits) != 0 { return errors.New("Block at height 1 (first block) should have no LastCommit precommits") } } else { - if len(b.LastCommit.Precommits) != state.LastValidators.Size() { + if len(block.LastCommit.Precommits) != state.LastValidators.Size() { return fmt.Errorf("Invalid block commit size. Expected %v, got %v", - state.LastValidators.Size(), len(b.LastCommit.Precommits)) + state.LastValidators.Size(), len(block.LastCommit.Precommits)) } err := state.LastValidators.VerifyCommit( - state.ChainID, state.LastBlockID, b.Height-1, b.LastCommit) + state.ChainID, state.LastBlockID, block.Height-1, block.LastCommit) if err != nil { return err } } - for _, ev := range b.Evidence.Evidence { + for _, ev := range block.Evidence.Evidence { if err := VerifyEvidence(stateDB, state, ev); err != nil { return types.NewEvidenceInvalidErr(ev, err) } From 3d33226e80dd5363fb5cc72e08d316e0381b8c7f Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 4 Jun 2018 13:46:34 -0700 Subject: [PATCH 06/13] move types/services.go to state pkg. pass State to evpool.Update --- blockchain/reactor_test.go | 2 +- consensus/common_test.go | 2 +- consensus/replay.go | 6 ++-- consensus/replay_file.go | 2 +- consensus/replay_test.go | 4 +-- consensus/state.go | 8 ++--- consensus/wal_generator.go | 4 +-- evidence/pool.go | 6 ++-- rpc/core/pipe.go | 12 +++---- state/execution.go | 14 ++++---- state/execution_test.go | 2 +- {types => state}/services.go | 63 ++++++++++++++++-------------------- 12 files changed, 58 insertions(+), 67 deletions(-) rename {types => state}/services.go (57%) diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index 49913c10e..c7f7e9afd 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -36,7 +36,7 @@ func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainRe fastSync := true var nilApp proxy.AppConnConsensus blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), nilApp, - types.MockMempool{}, types.MockEvidencePool{}) + sm.MockMempool{}, sm.MockEvidencePool{}) bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) bcReactor.SetLogger(logger.With("module", "blockchain")) diff --git a/consensus/common_test.go b/consensus/common_test.go index 94ab70de5..f50e57699 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -262,7 +262,7 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.S } // mock the evidence pool - evpool := types.MockEvidencePool{} + evpool := sm.MockEvidencePool{} // Make ConsensusState stateDB := dbm.NewMemDB() diff --git a/consensus/replay.go b/consensus/replay.go index 265ab5388..13d665f7a 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -196,7 +196,7 @@ func makeHeightSearchFunc(height int64) auto.SearchFunc { type Handshaker struct { stateDB dbm.DB initialState sm.State - store types.BlockStore + store sm.BlockStore appState json.RawMessage logger log.Logger @@ -204,7 +204,7 @@ type Handshaker struct { } func NewHandshaker(stateDB dbm.DB, state sm.State, - store types.BlockStore, appState json.RawMessage) *Handshaker { + store sm.BlockStore, appState json.RawMessage) *Handshaker { return &Handshaker{ stateDB: stateDB, @@ -390,7 +390,7 @@ func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.Ap block := h.store.LoadBlock(height) meta := h.store.LoadBlockMeta(height) - blockExec := sm.NewBlockExecutor(h.stateDB, h.logger, proxyApp, types.MockMempool{}, types.MockEvidencePool{}) + blockExec := sm.NewBlockExecutor(h.stateDB, h.logger, proxyApp, sm.MockMempool{}, sm.MockEvidencePool{}) var err error state, err = blockExec.ApplyBlock(state, meta.BlockID, block) diff --git a/consensus/replay_file.go b/consensus/replay_file.go index 58f022274..4f8343469 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -310,7 +310,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo cmn.Exit(cmn.Fmt("Failed to start event bus: %v", err)) } - mempool, evpool := types.MockMempool{}, types.MockEvidencePool{} + mempool, evpool := sm.MockMempool{}, sm.MockEvidencePool{} blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) consensusState := NewConsensusState(csConfig, state.Copy(), blockExec, diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 766a6e52e..e0f8a4b9a 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -263,8 +263,8 @@ const ( ) var ( - mempool = types.MockMempool{} - evpool = types.MockEvidencePool{} + mempool = sm.MockMempool{} + evpool = sm.MockEvidencePool{} ) //--------------------------------------- diff --git a/consensus/state.go b/consensus/state.go index 3b713e2ec..d46ec5830 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -76,9 +76,9 @@ type ConsensusState struct { // services for creating and executing blocks // TODO: encapsulate all of this in one "BlockManager" blockExec *sm.BlockExecutor - blockStore types.BlockStore - mempool types.Mempool - evpool types.EvidencePool + blockStore sm.BlockStore + mempool sm.Mempool + evpool sm.EvidencePool // internal state mtx sync.Mutex @@ -118,7 +118,7 @@ type ConsensusState struct { } // NewConsensusState returns a new ConsensusState. -func NewConsensusState(config *cfg.ConsensusConfig, state sm.State, blockExec *sm.BlockExecutor, blockStore types.BlockStore, mempool types.Mempool, evpool types.EvidencePool) *ConsensusState { +func NewConsensusState(config *cfg.ConsensusConfig, state sm.State, blockExec *sm.BlockExecutor, blockStore sm.BlockStore, mempool sm.Mempool, evpool sm.EvidencePool) *ConsensusState { cs := &ConsensusState{ config: config, blockExec: blockExec, diff --git a/consensus/wal_generator.go b/consensus/wal_generator.go index 5de808868..38bed4ac1 100644 --- a/consensus/wal_generator.go +++ b/consensus/wal_generator.go @@ -65,8 +65,8 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) { return nil, errors.Wrap(err, "failed to start event bus") } defer eventBus.Stop() - mempool := types.MockMempool{} - evpool := types.MockEvidencePool{} + mempool := sm.MockMempool{} + evpool := sm.MockEvidencePool{} blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) consensusState := NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool) consensusState.SetLogger(logger) diff --git a/evidence/pool.go b/evidence/pool.go index 07c351343..820fead47 100644 --- a/evidence/pool.go +++ b/evidence/pool.go @@ -68,13 +68,13 @@ func (evpool *EvidencePool) State() sm.State { } // Update loads the latest -func (evpool *EvidencePool) Update(block *types.Block) { +func (evpool *EvidencePool) Update(block *types.Block, state sm.State) { evpool.mtx.Lock() defer evpool.mtx.Unlock() - state := sm.LoadState(evpool.stateDB) + // sanity check if state.LastBlockHeight != block.Height { - panic(fmt.Sprintf("EvidencePool.Update: loaded state with height %d when block.Height=%d", state.LastBlockHeight, block.Height)) + panic(fmt.Sprintf("Failed EvidencePool.Update sanity check: got state.Height=%d with block.Height=%d", state.LastBlockHeight, block.Height)) } evpool.state = state diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index e4bb5a294..d6b02f032 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -51,9 +51,9 @@ var ( // interfaces defined in types and above stateDB dbm.DB - blockStore types.BlockStore - mempool types.Mempool - evidencePool types.EvidencePool + blockStore sm.BlockStore + mempool sm.Mempool + evidencePool sm.EvidencePool consensusState Consensus p2pSwitch P2P @@ -72,15 +72,15 @@ func SetStateDB(db dbm.DB) { stateDB = db } -func SetBlockStore(bs types.BlockStore) { +func SetBlockStore(bs sm.BlockStore) { blockStore = bs } -func SetMempool(mem types.Mempool) { +func SetMempool(mem sm.Mempool) { mempool = mem } -func SetEvidencePool(evpool types.EvidencePool) { +func SetEvidencePool(evpool sm.EvidencePool) { evidencePool = evpool } diff --git a/state/execution.go b/state/execution.go index e5904ecfc..cf27e3958 100644 --- a/state/execution.go +++ b/state/execution.go @@ -29,8 +29,8 @@ type BlockExecutor struct { eventBus types.BlockEventPublisher // update these with block results after commit - mempool types.Mempool - evpool types.EvidencePool + mempool Mempool + evpool EvidencePool logger log.Logger } @@ -38,7 +38,7 @@ type BlockExecutor struct { // NewBlockExecutor returns a new BlockExecutor with a NopEventBus. // Call SetEventBus to provide one. func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus, - mempool types.Mempool, evpool types.EvidencePool) *BlockExecutor { + mempool Mempool, evpool EvidencePool) *BlockExecutor { return &BlockExecutor{ db: db, proxyApp: proxyApp, @@ -98,6 +98,9 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b return state, fmt.Errorf("Commit failed for application: %v", err) } + // Update evpool with the block and state. + blockExec.evpool.Update(block, state) + fail.Fail() // XXX // update the app hash and save the state @@ -106,11 +109,6 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b fail.Fail() // XXX - // Update evpool now that state is saved. - // TODO: handle the crash/recover scenario - // ie. (may need to call Update for last block) - blockExec.evpool.Update(block) - // events are fired after everything else // NOTE: if we crash between Commit and Save, events wont be fired during replay fireEvents(blockExec.logger, blockExec.eventBus, block, abciResponses) diff --git a/state/execution_test.go b/state/execution_test.go index 09c40b5a0..78966340c 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -34,7 +34,7 @@ func TestApplyBlock(t *testing.T) { state, stateDB := state(), dbm.NewMemDB() blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), - types.MockMempool{}, types.MockEvidencePool{}) + sm.MockMempool{}, sm.MockEvidencePool{}) block := makeBlock(state, 1) blockID := types.BlockID{block.Hash(), block.MakePartSet(testPartSize).Header()} diff --git a/types/services.go b/state/services.go similarity index 57% rename from types/services.go rename to state/services.go index 6b2be8a5c..bef286b20 100644 --- a/types/services.go +++ b/state/services.go @@ -1,11 +1,10 @@ -package types +package state import ( abci "github.com/tendermint/abci/types" + "github.com/tendermint/tendermint/types" ) -// NOTE/XXX: all type definitions in this file are considered UNSTABLE - //------------------------------------------------------ // blockchain services types // NOTE: Interfaces used by RPC must be thread safe! @@ -17,15 +16,14 @@ import ( // Mempool defines the mempool interface as used by the ConsensusState. // Updates to the mempool need to be synchronized with committing a block // so apps can reset their transient state on Commit -// UNSTABLE type Mempool interface { Lock() Unlock() Size() int - CheckTx(Tx, func(*abci.Response)) error - Reap(int) Txs - Update(height int64, txs Txs) error + CheckTx(types.Tx, func(*abci.Response)) error + Reap(int) types.Txs + Update(height int64, txs types.Txs) error Flush() FlushAppConn() error @@ -34,60 +32,55 @@ type Mempool interface { } // MockMempool is an empty implementation of a Mempool, useful for testing. -// UNSTABLE type MockMempool struct { } -func (m MockMempool) Lock() {} -func (m MockMempool) Unlock() {} -func (m MockMempool) Size() int { return 0 } -func (m MockMempool) CheckTx(tx Tx, cb func(*abci.Response)) error { return nil } -func (m MockMempool) Reap(n int) Txs { return Txs{} } -func (m MockMempool) Update(height int64, txs Txs) error { return nil } -func (m MockMempool) Flush() {} -func (m MockMempool) FlushAppConn() error { return nil } -func (m MockMempool) TxsAvailable() <-chan int64 { return make(chan int64) } -func (m MockMempool) EnableTxsAvailable() {} +func (m MockMempool) Lock() {} +func (m MockMempool) Unlock() {} +func (m MockMempool) Size() int { return 0 } +func (m MockMempool) CheckTx(tx types.Tx, cb func(*abci.Response)) error { return nil } +func (m MockMempool) Reap(n int) types.Txs { return types.Txs{} } +func (m MockMempool) Update(height int64, txs types.Txs) error { return nil } +func (m MockMempool) Flush() {} +func (m MockMempool) FlushAppConn() error { return nil } +func (m MockMempool) TxsAvailable() <-chan int64 { return make(chan int64) } +func (m MockMempool) EnableTxsAvailable() {} //------------------------------------------------------ // blockstore // BlockStoreRPC is the block store interface used by the RPC. -// UNSTABLE type BlockStoreRPC interface { Height() int64 - LoadBlockMeta(height int64) *BlockMeta - LoadBlock(height int64) *Block - LoadBlockPart(height int64, index int) *Part + LoadBlockMeta(height int64) *types.BlockMeta + LoadBlock(height int64) *types.Block + LoadBlockPart(height int64, index int) *types.Part - LoadBlockCommit(height int64) *Commit - LoadSeenCommit(height int64) *Commit + LoadBlockCommit(height int64) *types.Commit + LoadSeenCommit(height int64) *types.Commit } // BlockStore defines the BlockStore interface used by the ConsensusState. -// UNSTABLE type BlockStore interface { BlockStoreRPC - SaveBlock(block *Block, blockParts *PartSet, seenCommit *Commit) + SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) } -//------------------------------------------------------ +//----------------------------------------------------------------------------------------------------- // evidence pool // EvidencePool defines the EvidencePool interface used by the ConsensusState. -// UNSTABLE type EvidencePool interface { - PendingEvidence() []Evidence - AddEvidence(Evidence) error - Update(*Block) + PendingEvidence() []types.Evidence + AddEvidence(types.Evidence) error + Update(*types.Block, State) } // MockMempool is an empty implementation of a Mempool, useful for testing. -// UNSTABLE type MockEvidencePool struct { } -func (m MockEvidencePool) PendingEvidence() []Evidence { return nil } -func (m MockEvidencePool) AddEvidence(Evidence) error { return nil } -func (m MockEvidencePool) Update(*Block) {} +func (m MockEvidencePool) PendingEvidence() []types.Evidence { return nil } +func (m MockEvidencePool) AddEvidence(types.Evidence) error { return nil } +func (m MockEvidencePool) Update(*types.Block, State) {} From 97c5533c35f984284731049f3871d57d6e7fbb8b Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 4 Jun 2018 13:59:28 -0700 Subject: [PATCH 07/13] update some comments --- evidence/pool.go | 8 +++++--- evidence/store.go | 4 ---- state/execution.go | 2 +- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/evidence/pool.go b/evidence/pool.go index 820fead47..fab29be60 100644 --- a/evidence/pool.go +++ b/evidence/pool.go @@ -69,16 +69,18 @@ func (evpool *EvidencePool) State() sm.State { // Update loads the latest func (evpool *EvidencePool) Update(block *types.Block, state sm.State) { - evpool.mtx.Lock() - defer evpool.mtx.Unlock() // sanity check if state.LastBlockHeight != block.Height { panic(fmt.Sprintf("Failed EvidencePool.Update sanity check: got state.Height=%d with block.Height=%d", state.LastBlockHeight, block.Height)) } + + // update the state + evpool.mtx.Lock() evpool.state = state + evpool.mtx.Unlock() - // NOTE: shouldn't need the mutex + // remove evidence from pending and mark committed evpool.MarkEvidenceAsCommitted(block.Evidence.Evidence) } diff --git a/evidence/store.go b/evidence/store.go index abfc5e1fe..6af5d75d8 100644 --- a/evidence/store.go +++ b/evidence/store.go @@ -17,10 +17,6 @@ Impl: - First commit atomically in outqueue, pending, lookup. - Once broadcast, remove from outqueue. No need to sync - Once committed, atomically remove from pending and update lookup. - - TODO: If we crash after committed but before removing/updating, - we'll be stuck broadcasting evidence we never know we committed. - so either share the state db and atomically MarkCommitted - with ApplyBlock, or check all outqueue/pending on Start to see if its committed Schema for indexing evidence (note you need both height and hash to find a piece of evidence): diff --git a/state/execution.go b/state/execution.go index cf27e3958..770911597 100644 --- a/state/execution.go +++ b/state/execution.go @@ -92,7 +92,7 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b return state, fmt.Errorf("Commit failed for application: %v", err) } - // lock mempool, commit state, update mempoool + // lock mempool, commit app state, update mempoool appHash, err := blockExec.Commit(block) if err != nil { return state, fmt.Errorf("Commit failed for application: %v", err) From 2007c660911e345d4c0a609a344d9412567434ba Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 4 Jun 2018 14:13:39 -0700 Subject: [PATCH 08/13] fix test --- state/execution_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/state/execution_test.go b/state/execution_test.go index 78966340c..b6c7f9a6c 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -10,11 +10,12 @@ import ( "github.com/tendermint/abci/example/kvstore" abci "github.com/tendermint/abci/types" crypto "github.com/tendermint/go-crypto" - "github.com/tendermint/tendermint/proxy" - "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" dbm "github.com/tendermint/tmlibs/db" "github.com/tendermint/tmlibs/log" + + "github.com/tendermint/tendermint/proxy" + "github.com/tendermint/tendermint/types" ) var ( @@ -34,7 +35,7 @@ func TestApplyBlock(t *testing.T) { state, stateDB := state(), dbm.NewMemDB() blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), - sm.MockMempool{}, sm.MockEvidencePool{}) + MockMempool{}, MockEvidencePool{}) block := makeBlock(state, 1) blockID := types.BlockID{block.Hash(), block.MakePartSet(testPartSize).Header()} From 932381effa4f6b3a280f5948f6576aa20b8d43a0 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 4 Jun 2018 17:38:44 -0700 Subject: [PATCH 09/13] evidence: give each peer a go-routine --- evidence/pool.go | 54 ++++++++++++----- evidence/pool_test.go | 20 +++---- evidence/reactor.go | 121 +++++++++++++++++++++------------------ evidence/reactor_test.go | 7 +-- 4 files changed, 117 insertions(+), 85 deletions(-) diff --git a/evidence/pool.go b/evidence/pool.go index fab29be60..b24537776 100644 --- a/evidence/pool.go +++ b/evidence/pool.go @@ -4,6 +4,7 @@ import ( "fmt" "sync" + clist "github.com/tendermint/tmlibs/clist" dbm "github.com/tendermint/tmlibs/db" "github.com/tendermint/tmlibs/log" @@ -17,6 +18,7 @@ type EvidencePool struct { logger log.Logger evidenceStore *EvidenceStore + evidenceList *clist.CList // concurrent linked-list of evidence // needed to load validators to verify evidence stateDB dbm.DB @@ -24,9 +26,6 @@ type EvidencePool struct { // latest state mtx sync.Mutex state sm.State - - // never close - evidenceChan chan types.Evidence } func NewEvidencePool(stateDB dbm.DB, evidenceStore *EvidenceStore) *EvidencePool { @@ -35,21 +34,24 @@ func NewEvidencePool(stateDB dbm.DB, evidenceStore *EvidenceStore) *EvidencePool state: sm.LoadState(stateDB), logger: log.NewNopLogger(), evidenceStore: evidenceStore, - evidenceChan: make(chan types.Evidence), + evidenceList: clist.New(), } return evpool } +func (evpool *EvidencePool) EvidenceFront() *clist.CElement { + return evpool.evidenceList.Front() +} + +func (evpool *EvidencePool) EvidenceWaitChan() <-chan struct{} { + return evpool.evidenceList.WaitChan() +} + // SetLogger sets the Logger. func (evpool *EvidencePool) SetLogger(l log.Logger) { evpool.logger = l } -// EvidenceChan returns an unbuffered channel on which new evidence can be received. -func (evpool *EvidencePool) EvidenceChan() <-chan types.Evidence { - return evpool.evidenceChan -} - // PriorityEvidence returns the priority evidence. func (evpool *EvidencePool) PriorityEvidence() []types.Evidence { return evpool.evidenceStore.PriorityEvidence() @@ -81,11 +83,10 @@ func (evpool *EvidencePool) Update(block *types.Block, state sm.State) { evpool.mtx.Unlock() // remove evidence from pending and mark committed - evpool.MarkEvidenceAsCommitted(block.Evidence.Evidence) + evpool.MarkEvidenceAsCommitted(block.Height, block.Evidence.Evidence) } // AddEvidence checks the evidence is valid and adds it to the pool. -// Blocks on the EvidenceChan. func (evpool *EvidencePool) AddEvidence(evidence types.Evidence) (err error) { // TODO: check if we already have evidence for this @@ -109,14 +110,39 @@ func (evpool *EvidencePool) AddEvidence(evidence types.Evidence) (err error) { evpool.logger.Info("Verified new evidence of byzantine behaviour", "evidence", evidence) - // never closes. always safe to send on - evpool.evidenceChan <- evidence + // add evidence to clist + evpool.evidenceList.PushBack(evidence) + return nil } // MarkEvidenceAsCommitted marks all the evidence as committed. -func (evpool *EvidencePool) MarkEvidenceAsCommitted(evidence []types.Evidence) { +func (evpool *EvidencePool) MarkEvidenceAsCommitted(height int64, evidence []types.Evidence) { + blockEvidenceMap := make(map[string]struct{}) for _, ev := range evidence { evpool.evidenceStore.MarkEvidenceAsCommitted(ev) + blockEvidenceMap[ev.String()] = struct{}{} + } + + maxAge := evpool.State().ConsensusParams.EvidenceParams.MaxAge + + // remove committed evidence from the clist + evpool.filterEvidence(height, maxAge, blockEvidenceMap) + +} + +func (evpool *EvidencePool) filterEvidence(height, maxAge int64, blockEvidenceMap map[string]struct{}) { + for e := evpool.evidenceList.Front(); e != nil; e = e.Next() { + ev := e.Value.(types.Evidence) + + // Remove the evidence if it's already in a block + // or if it's now too old. + if _, ok := blockEvidenceMap[ev.String()]; ok || + ev.Height() < height-maxAge { + + // remove from clist + evpool.evidenceList.Remove(e) + e.DetachPrev() + } } } diff --git a/evidence/pool_test.go b/evidence/pool_test.go index 97a29a278..019076234 100644 --- a/evidence/pool_test.go +++ b/evidence/pool_test.go @@ -45,7 +45,6 @@ func initializeValidatorState(valAddr []byte, height int64) dbm.DB { } func TestEvidencePool(t *testing.T) { - assert := assert.New(t) valAddr := []byte("val1") height := int64(5) @@ -56,26 +55,25 @@ func TestEvidencePool(t *testing.T) { goodEvidence := types.NewMockGoodEvidence(height, 0, valAddr) badEvidence := types.MockBadEvidence{goodEvidence} + // bad evidence err := pool.AddEvidence(badEvidence) - assert.NotNil(err) + assert.NotNil(t, err) var wg sync.WaitGroup wg.Add(1) go func() { - <-pool.EvidenceChan() + <-pool.EvidenceWaitChan() wg.Done() }() err = pool.AddEvidence(goodEvidence) - assert.Nil(err) + assert.Nil(t, err) wg.Wait() - // if we send it again it wont fire on the chan + assert.Equal(t, 1, pool.evidenceList.Len()) + + // if we send it again, it shouldnt change the size err = pool.AddEvidence(goodEvidence) - assert.Nil(err) - select { - case <-pool.EvidenceChan(): - t.Fatal("unexpected read on EvidenceChan") - default: - } + assert.Nil(t, err) + assert.Equal(t, 1, pool.evidenceList.Len()) } diff --git a/evidence/reactor.go b/evidence/reactor.go index 625663df2..602edc0c9 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -6,9 +6,9 @@ import ( "time" "github.com/tendermint/go-amino" + clist "github.com/tendermint/tmlibs/clist" "github.com/tendermint/tmlibs/log" - cstypes "github.com/tendermint/tendermint/consensus/types" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" ) @@ -16,8 +16,10 @@ import ( const ( EvidenceChannel = byte(0x38) - maxMsgSize = 1048576 // 1MB TODO make it configurable - broadcastEvidenceIntervalS = 60 // broadcast uncommitted evidence this often + maxMsgSize = 1048576 // 1MB TODO make it configurable + + broadcastEvidenceIntervalS = 60 // broadcast uncommitted evidence this often + peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount ) // EvidenceReactor handles evpool evidence broadcasting amongst peers. @@ -47,7 +49,6 @@ func (evR *EvidenceReactor) OnStart() error { if err := evR.BaseReactor.OnStart(); err != nil { return err } - go evR.broadcastRoutine() return nil } @@ -64,14 +65,7 @@ func (evR *EvidenceReactor) GetChannels() []*p2p.ChannelDescriptor { // AddPeer implements Reactor. func (evR *EvidenceReactor) AddPeer(peer p2p.Peer) { - // send the peer our high-priority evidence. - // the rest will be sent by the broadcastRoutine - evidences := evR.evpool.PriorityEvidence() - msg := &EvidenceListMessage{evidences} - success := peer.Send(EvidenceChannel, cdc.MustMarshalBinaryBare(msg)) - if !success { - // TODO: remove peer ? - } + go evR.broadcastEvidenceRoutine(peer) } // RemovePeer implements Reactor. @@ -110,63 +104,80 @@ func (evR *EvidenceReactor) SetEventBus(b *types.EventBus) { evR.eventBus = b } -// Broadcast new evidence to all peers. -// Broadcasts must be non-blocking so routine is always available to read off EvidenceChan. -func (evR *EvidenceReactor) broadcastRoutine() { - ticker := time.NewTicker(time.Second * broadcastEvidenceIntervalS) +// Modeled after the mempool routine. +// - Evidence accumulates in a clist. +// - Each peer has a routien that iterates through the clist, +// sending available evidence to the peer. +// - If we're waiting for new evidence and the list is not empty, +// start iterating from the beginning again. +func (evR *EvidenceReactor) broadcastEvidenceRoutine(peer p2p.Peer) { + var next *clist.CElement for { - select { - case evidence := <-evR.evpool.EvidenceChan(): - // broadcast some new evidence - msg := &EvidenceListMessage{[]types.Evidence{evidence}} - evR.broadcastEvidenceListMsg(msg) - - // TODO: the broadcast here is just doing TrySend. - // We should make sure the send succeeds before marking broadcasted. - evR.evpool.evidenceStore.MarkEvidenceAsBroadcasted(evidence) - case <-ticker.C: - // broadcast all pending evidence - msg := &EvidenceListMessage{evR.evpool.PendingEvidence()} - evR.broadcastEvidenceListMsg(msg) - case <-evR.Quit(): - return - } - } -} - -func (evR *EvidenceReactor) broadcastEvidenceListMsg(msg *EvidenceListMessage) { - // NOTE: we dont send evidence to peers higher than their height, - // because they can't validate it (don't have validators from the height). - // So, for now, only send the `msg` to peers synced to the highest height in the list. - // TODO: send each peer all the evidence below its current height within maxAge - - // might require a routine per peer, like the mempool. - - var maxHeight int64 - for _, ev := range msg.Evidence { - if ev.Height() > maxHeight { - maxHeight = ev.Height() + // This happens because the CElement we were looking at got garbage + // collected (removed). That is, .NextWait() returned nil. Go ahead and + // start from the beginning. + if next == nil { + select { + case <-evR.evpool.EvidenceWaitChan(): // Wait until evidence is available + if next = evR.evpool.EvidenceFront(); next == nil { + continue + } + case <-peer.Quit(): + return + case <-evR.Quit(): + return + } } - } - for _, peer := range evR.Switch.Peers().List() { - ps, ok := peer.Get(types.PeerStateKey).(PeerState) + ev := next.Value.(types.Evidence) + // make sure the peer is up to date + height := ev.Height() + peerState, ok := peer.Get(types.PeerStateKey).(PeerState) if !ok { evR.Logger.Info("Found peer without PeerState", "peer", peer) + time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) continue + } - // only send to peer if maxHeight < peerHeight < maxHeight + maxAge + // NOTE: We only send evidence to peers where + // peerHeight - maxAge < evidenceHeight < peerHeight maxAge := evR.evpool.State().ConsensusParams.EvidenceParams.MaxAge - rs := ps.GetRoundState() - if rs.Height >= maxHeight && - rs.Height < maxAge+maxHeight { - peer.TrySend(EvidenceChannel, cdc.MustMarshalBinaryBare(msg)) + peerHeight := peerState.GetHeight() + if peerHeight < height || + peerHeight > height+maxAge { + time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) + continue + } + + // send evidence + msg := &EvidenceListMessage{[]types.Evidence{ev}} + success := peer.Send(EvidenceChannel, cdc.MustMarshalBinaryBare(msg)) + if !success { + time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) + continue + } + + afterCh := time.After(time.Second * broadcastEvidenceIntervalS) + select { + case <-afterCh: + // start from the beginning every tick. + // TODO: only do this if we're at the end of the list! + next = nil + case <-next.NextWaitChan(): + // see the start of the for loop for nil check + next = next.Next() + case <-peer.Quit(): + return + case <-evR.Quit(): + return } } } +// PeerState describes the state of a peer. type PeerState interface { - GetRoundState() *cstypes.PeerRoundState + GetHeight() int64 } //----------------------------------------------------------------------------- diff --git a/evidence/reactor_test.go b/evidence/reactor_test.go index 6b4b24a07..c7034c321 100644 --- a/evidence/reactor_test.go +++ b/evidence/reactor_test.go @@ -14,7 +14,6 @@ import ( "github.com/tendermint/tmlibs/log" cfg "github.com/tendermint/tendermint/config" - cstypes "github.com/tendermint/tendermint/consensus/types" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" ) @@ -149,10 +148,8 @@ type peerState struct { height int64 } -func (ps peerState) GetRoundState() *cstypes.PeerRoundState { - return &cstypes.PeerRoundState{ - Height: ps.height, - } +func (ps peerState) GetHeight() int64 { + return ps.height } func TestReactorSelectiveBroadcast(t *testing.T) { From 566024b64f348a2f559e04577bc541d6586826c8 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 4 Jun 2018 21:50:29 -0700 Subject: [PATCH 10/13] use Hash as map key --- evidence/pool.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/evidence/pool.go b/evidence/pool.go index b24537776..4bad355f7 100644 --- a/evidence/pool.go +++ b/evidence/pool.go @@ -116,28 +116,28 @@ func (evpool *EvidencePool) AddEvidence(evidence types.Evidence) (err error) { return nil } -// MarkEvidenceAsCommitted marks all the evidence as committed. +// MarkEvidenceAsCommitted marks all the evidence as committed and removes it from the queue. func (evpool *EvidencePool) MarkEvidenceAsCommitted(height int64, evidence []types.Evidence) { + // make a map of committed evidence to remove from the clist blockEvidenceMap := make(map[string]struct{}) for _, ev := range evidence { evpool.evidenceStore.MarkEvidenceAsCommitted(ev) - blockEvidenceMap[ev.String()] = struct{}{} + blockEvidenceMap[evMapKey(ev)] = struct{}{} } - maxAge := evpool.State().ConsensusParams.EvidenceParams.MaxAge - // remove committed evidence from the clist - evpool.filterEvidence(height, maxAge, blockEvidenceMap) + maxAge := evpool.State().ConsensusParams.EvidenceParams.MaxAge + evpool.removeEvidence(height, maxAge, blockEvidenceMap) } -func (evpool *EvidencePool) filterEvidence(height, maxAge int64, blockEvidenceMap map[string]struct{}) { +func (evpool *EvidencePool) removeEvidence(height, maxAge int64, blockEvidenceMap map[string]struct{}) { for e := evpool.evidenceList.Front(); e != nil; e = e.Next() { ev := e.Value.(types.Evidence) // Remove the evidence if it's already in a block // or if it's now too old. - if _, ok := blockEvidenceMap[ev.String()]; ok || + if _, ok := blockEvidenceMap[evMapKey(ev)]; ok || ev.Height() < height-maxAge { // remove from clist @@ -146,3 +146,7 @@ func (evpool *EvidencePool) filterEvidence(height, maxAge int64, blockEvidenceMa } } } + +func evMapKey(ev types.Evidence) string { + return string(ev.Hash()) +} From 1b2e34738a4f84d2e554fedf38a7e011267ce37e Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 4 Jun 2018 21:50:42 -0700 Subject: [PATCH 11/13] checkSendEvidenceMessage --- evidence/reactor.go | 59 ++++++++++++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/evidence/reactor.go b/evidence/reactor.go index 602edc0c9..ef04a5efe 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -130,30 +130,13 @@ func (evR *EvidenceReactor) broadcastEvidenceRoutine(peer p2p.Peer) { } ev := next.Value.(types.Evidence) - // make sure the peer is up to date - height := ev.Height() - peerState, ok := peer.Get(types.PeerStateKey).(PeerState) - if !ok { - evR.Logger.Info("Found peer without PeerState", "peer", peer) - time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) - continue - - } - - // NOTE: We only send evidence to peers where - // peerHeight - maxAge < evidenceHeight < peerHeight - maxAge := evR.evpool.State().ConsensusParams.EvidenceParams.MaxAge - peerHeight := peerState.GetHeight() - if peerHeight < height || - peerHeight > height+maxAge { - time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) - continue + msg, retry := evR.checkSendEvidenceMessage(peer, ev) + if msg != nil { + success := peer.Send(EvidenceChannel, cdc.MustMarshalBinaryBare(msg)) + retry = !success } - // send evidence - msg := &EvidenceListMessage{[]types.Evidence{ev}} - success := peer.Send(EvidenceChannel, cdc.MustMarshalBinaryBare(msg)) - if !success { + if retry { time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) continue } @@ -175,6 +158,38 @@ func (evR *EvidenceReactor) broadcastEvidenceRoutine(peer p2p.Peer) { } } +// Returns the message to send the peer, or nil if the evidence is invalid for the peer. +// If message is nil, return true if we should sleep and try again. +func (evR EvidenceReactor) checkSendEvidenceMessage(peer p2p.Peer, ev types.Evidence) (msg EvidenceMessage, retry bool) { + + // make sure the peer is up to date + evHeight := ev.Height() + peerState, ok := peer.Get(types.PeerStateKey).(PeerState) + if !ok { + evR.Logger.Info("Found peer without PeerState", "peer", peer) + return nil, true + } + + // NOTE: We only send evidence to peers where + // peerHeight - maxAge < evidenceHeight < peerHeight + maxAge := evR.evpool.State().ConsensusParams.EvidenceParams.MaxAge + peerHeight := peerState.GetHeight() + if peerHeight < evHeight { + // peer is behind. sleep while he catches up + return nil, true + } else if peerHeight > evHeight+maxAge { + // evidence is too old, skip + // NOTE: if evidence is too old for an honest peer, + // then we're behind and either it already got committed or it never will! + evR.Logger.Info("Not sending peer old evidence", "peerHeight", peerHeight, "evHeight", evHeight, "maxAge", maxAge, "peer", peer) + return nil, false + } + + // send evidence + msg = &EvidenceListMessage{[]types.Evidence{ev}} + return msg, false +} + // PeerState describes the state of a peer. type PeerState interface { GetHeight() int64 From 55bae62d71d9194c248de0654a63802186b4c0a5 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 5 Jun 2018 16:54:58 -0700 Subject: [PATCH 12/13] fix test --- evidence/reactor_test.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/evidence/reactor_test.go b/evidence/reactor_test.go index c7034c321..2f1c34e6e 100644 --- a/evidence/reactor_test.go +++ b/evidence/reactor_test.go @@ -84,7 +84,7 @@ func _waitForEvidence(t *testing.T, wg *sync.WaitGroup, evs types.EvidenceList, } reapedEv := evpool.PendingEvidence() - // put the reaped evidence is a map so we can quickly check we got everything + // put the reaped evidence in a map so we can quickly check we got everything evMap := make(map[string]types.Evidence) for _, e := range reapedEv { evMap[string(e.Hash())] = e @@ -95,6 +95,7 @@ func _waitForEvidence(t *testing.T, wg *sync.WaitGroup, evs types.EvidenceList, fmt.Sprintf("evidence at index %d on reactor %d don't match: %v vs %v", i, reactorIdx, expectedEv, gotEv)) } + wg.Done() } @@ -110,7 +111,7 @@ func sendEvidence(t *testing.T, evpool *EvidencePool, valAddr []byte, n int) typ } var ( - NUM_EVIDENCE = 1 + NUM_EVIDENCE = 10 TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow ) @@ -174,4 +175,8 @@ func TestReactorSelectiveBroadcast(t *testing.T) { // only ones less than the peers height should make it through waitForEvidence(t, evList[:NUM_EVIDENCE/2], reactors[1:2]) + + // peers should still be connected + peers := reactors[1].Switch.Peers().List() + assert.Equal(t, 1, len(peers)) } From 41e847ec97fa33f9d0fb2103eb1f4c6f99f43da0 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 5 Jun 2018 17:02:05 -0700 Subject: [PATCH 13/13] linter --- evidence/reactor.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/evidence/reactor.go b/evidence/reactor.go index ef04a5efe..5159572e3 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -46,10 +46,7 @@ func (evR *EvidenceReactor) SetLogger(l log.Logger) { // OnStart implements cmn.Service func (evR *EvidenceReactor) OnStart() error { - if err := evR.BaseReactor.OnStart(); err != nil { - return err - } - return nil + return evR.BaseReactor.OnStart() } // GetChannels implements Reactor.