From 8ec1839f5d15ea65bce77fda2790935841a5b3cf Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 11 Sep 2016 13:16:23 -0400 Subject: [PATCH] save block b4 apply; track stale apphash --- consensus/state.go | 33 +++---- mempool/mempool.go | 3 +- state/execution.go | 213 +++++++++++++++++++++++++-------------------- state/state.go | 23 +++-- 4 files changed, 157 insertions(+), 115 deletions(-) diff --git a/consensus/state.go b/consensus/state.go index e7a06efde..4e06dc0db 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1255,33 +1255,34 @@ func (cs *ConsensusState) finalizeCommit(height int) { "height", block.Height, "hash", block.Hash(), "root", block.AppHash) log.Info(Fmt("%v", block)) - // Fire off event for new block. - // TODO: Handle app failure. See #177 - types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block}) - types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header}) + // Save to blockStore. + if cs.blockStore.Height() < block.Height { + precommits := cs.Votes.Precommits(cs.CommitRound) + seenCommit := precommits.MakeCommit() + log.Notice("save block", "height", block.Height) + cs.blockStore.SaveBlock(block, blockParts, seenCommit) + } else { + log.Warn("Why are we finalizeCommitting a block height we already have?", "height", block.Height) + } // Create a copy of the state for staging + // and an event cache for txs stateCopy := cs.state.Copy() // event cache for txs eventCache := types.NewEventCache(cs.evsw) - // Execute and commit the block - // NOTE: All calls to the proxyAppConn should come here + // Execute and commit the block, and update the mempool. + // All calls to the proxyAppConn should come here. + // NOTE: the block.AppHash wont reflect these txs until the next block stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool) - // txs committed, bad ones removed from mepool; fire events - // NOTE: the block.AppHash wont reflect these txs until the next block + // Fire off event for new block. + // TODO: Handle app failure. See #177 + types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block}) + types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header}) eventCache.Flush() - // Save to blockStore. - if cs.blockStore.Height() < block.Height { - precommits := cs.Votes.Precommits(cs.CommitRound) - seenCommit := precommits.MakeCommit() - log.Notice("save block", "height", block.Height) - cs.blockStore.SaveBlock(block, blockParts, seenCommit) - } - // Save the state. log.Notice("save state", "height", stateCopy.LastBlockHeight, "hash", stateCopy.AppHash) stateCopy.Save() diff --git a/mempool/mempool.go b/mempool/mempool.go index a5426991e..80841b171 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -282,8 +282,7 @@ func (mem *Mempool) collectTxs(maxTxs int) []types.Tx { // NOTE: this should be called *after* block is committed by consensus. // NOTE: unsafe; Lock/Unlock must be managed by caller func (mem *Mempool) Update(height int, txs []types.Tx) { - // mem.proxyMtx.Lock() - // defer mem.proxyMtx.Unlock() + // TODO: check err ? mem.proxyAppConn.FlushSync() // To flush async resCb calls e.g. from CheckTx // First, create a lookup map of txns in new txs. diff --git a/state/execution.go b/state/execution.go index afee2753c..4d570f0b5 100644 --- a/state/execution.go +++ b/state/execution.go @@ -2,7 +2,6 @@ package state import ( "errors" - "fmt" . "github.com/tendermint/go-common" "github.com/tendermint/tendermint/proxy" @@ -10,10 +9,13 @@ import ( tmsp "github.com/tendermint/tmsp/types" ) -// Validate block -func (s *State) ValidateBlock(block *types.Block) error { - return s.validateBlock(block) -} +//-------------------------------------------------- +// Execute the block + +type ( + ErrInvalidBlock error + ErrProxyAppConn error +) // Execute the block to mutate State. // Validates block and then executes Data.Txs in the block. @@ -22,7 +24,7 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC // Validate the block. err := s.validateBlock(block) if err != nil { - return err + return ErrInvalidBlock(err) } // Update the validator set @@ -37,7 +39,7 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC if err != nil { // There was some error in proxyApp // TODO Report error and wait for proxyApp to be available. - return err + return ErrProxyAppConn(err) } // All good! @@ -45,6 +47,10 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC nextValSet.IncrementAccum(1) s.SetBlockAndValidators(block.Header, blockPartsHeader, valSet, nextValSet) + // save state with updated height/blockhash/validators + // but stale apphash, in case we fail between Commit and Save + s.Save() + return nil } @@ -113,33 +119,6 @@ func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn prox return nil } -func (s *State) validateBlock(block *types.Block) error { - // Basic block validation. - err := block.ValidateBasic(s.ChainID, s.LastBlockHeight, s.LastBlockID, s.LastBlockTime, s.AppHash) - if err != nil { - return err - } - - // Validate block LastCommit. - if block.Height == 1 { - if len(block.LastCommit.Precommits) != 0 { - return errors.New("Block at height 1 (first block) should have no LastCommit precommits") - } - } else { - if len(block.LastCommit.Precommits) != s.LastValidators.Size() { - return fmt.Errorf("Invalid block commit size. Expected %v, got %v", - s.LastValidators.Size(), len(block.LastCommit.Precommits)) - } - err := s.LastValidators.VerifyCommit( - s.ChainID, s.LastBlockID, block.Height-1, block.LastCommit) - if err != nil { - return err - } - } - - return nil -} - // Updates the LastCommitHeight of the validators in valSet, in place. // Assumes that lastValSet matches the valset of block.LastCommit // CONTRACT: lastValSet is not mutated. @@ -168,18 +147,62 @@ func updateValidatorsWithBlock(lastValSet *types.ValidatorSet, valSet *types.Val } -//----------------------------------------------------------------------------- +//----------------------------------------------------- +// Validate block -type InvalidTxError struct { - Tx types.Tx - Code tmsp.CodeType +func (s *State) ValidateBlock(block *types.Block) error { + return s.validateBlock(block) } -func (txErr InvalidTxError) Error() string { - return Fmt("Invalid tx: [%v] code: [%v]", txErr.Tx, txErr.Code) +func (s *State) validateBlock(block *types.Block) error { + // Basic block validation. + err := block.ValidateBasic(s.ChainID, s.LastBlockHeight, s.LastBlockID, s.LastBlockTime, s.AppHash) + if err != nil { + return err + } + + // Validate block LastCommit. + if block.Height == 1 { + if len(block.LastCommit.Precommits) != 0 { + return errors.New("Block at height 1 (first block) should have no LastCommit precommits") + } + } else { + if len(block.LastCommit.Precommits) != s.LastValidators.Size() { + return errors.New(Fmt("Invalid block commit size. Expected %v, got %v", + s.LastValidators.Size(), len(block.LastCommit.Precommits))) + } + err := s.LastValidators.VerifyCommit( + s.ChainID, s.LastBlockID, block.Height-1, block.LastCommit) + if err != nil { + return err + } + } + + return nil } //----------------------------------------------------------------------------- +// ApplyBlock executes the block, then commits and updates the mempool atomically + +// Execute and commit block against app, save block and state +func (s *State) ApplyBlock(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, + block *types.Block, partsHeader types.PartSetHeader, mempool Mempool) error { + + // Run the block on the State: + // + update validator sets + // + run txs on the proxyAppConn + err := s.ExecBlock(eventCache, proxyAppConn, block, partsHeader) + if err != nil { + return errors.New(Fmt("Exec failed for application: %v", err)) + } + + // lock mempool, commit state, update mempoool + err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool) + if err != nil { + return errors.New(Fmt("Commit failed for application: %v", err)) + } + return nil +} // mempool must be locked during commit and update // because state is typically reset on Commit and old txs must be replayed @@ -188,9 +211,6 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl mempool.Lock() defer mempool.Unlock() - // flush out any CheckTx that have already started - // cs.proxyAppConn.FlushSync() // ?! XXX - // Commit block, get hash back res := proxyAppConn.CommitSync() if res.IsErr() { @@ -210,25 +230,40 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl return nil } -// Execute and commit block against app, save block and state -func (s *State) ApplyBlock(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, - block *types.Block, partsHeader types.PartSetHeader, mempool Mempool) { +// Updates to the mempool need to be synchronized with committing a block +// so apps can reset their transient state on Commit +type Mempool interface { + Lock() + Unlock() + Update(height int, txs []types.Tx) +} - // Run the block on the State: - // + update validator sets - // + run txs on the proxyAppConn - err := s.ExecBlock(eventCache, proxyAppConn, block, partsHeader) - if err != nil { - // TODO: handle this gracefully. - PanicQ(Fmt("Exec failed for application: %v", err)) - } +type mockMempool struct { +} - // lock mempool, commit state, update mempoool - err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool) - if err != nil { - // TODO: handle this gracefully. - PanicQ(Fmt("Commit failed for application: %v", err)) - } +func (m mockMempool) Lock() {} +func (m mockMempool) Unlock() {} +func (m mockMempool) Update(height int, txs []types.Tx) {} + +//---------------------------------------------------------------- +// Replay blocks to sync app to latest state of core + +type ErrAppBlockHeightTooHigh struct { + coreHeight int + appHeight int +} + +func (e ErrAppBlockHeightTooHigh) Error() string { + return Fmt("App block height (%d) is higher than core (%d)", e.appHeight, e.coreHeight) +} + +type ErrStateMismatch struct { + got *State + expected *State +} + +func (e ErrStateMismatch) Error() string { + return Fmt("State after replay does not match saved state. Got ----\n%v\nExpected ----\n%v\n", e.got, e.expected) } // Replay all blocks after blockHeight and ensure the result matches the current state. @@ -241,34 +276,45 @@ func (s *State) ReplayBlocks(appHash []byte, header *types.Header, partsHeader t // it should save all eg. valset changes before calling Commit. // then, if tm state is behind app state, the only thing missing can be app hash - // fresh state to work on + // get a fresh state and reset to the apps latest stateCopy := s.Copy() - - // reset to this height (do nothing if its 0) - var blockHeight int if header != nil { - blockHeight = header.Height // TODO: put validators in iavl tree so we can set the state with an older validator set lastVals, nextVals := stateCopy.GetValidators() stateCopy.SetBlockAndValidators(header, partsHeader, lastVals, nextVals) + stateCopy.Stale = false stateCopy.AppHash = appHash } - // run the transactions - var eventCache events.Fireable // nil - - // replay all blocks starting with blockHeight+1 - for i := blockHeight + 1; i <= blockStore.Height(); i++ { - blockMeta := blockStore.LoadBlockMeta(i) - block := blockStore.LoadBlock(i) - panicOnNilBlock(i, blockStore.Height(), block, blockMeta) // XXX + appBlockHeight := stateCopy.LastBlockHeight + coreBlockHeight := blockStore.Height() + if coreBlockHeight < appBlockHeight { + return ErrAppBlockHeightTooHigh{coreBlockHeight, appBlockHeight} + + } else if coreBlockHeight == appBlockHeight { + // if we crashed between Commit and SaveState, + // the state's app hash is stale + if s.Stale { + s.Stale = false + s.AppHash = appHash + } - stateCopy.ApplyBlock(eventCache, appConnConsensus, block, blockMeta.PartsHeader, mockMempool{}) + } else { + // the app is behind. + // replay all blocks starting with appBlockHeight+1 + for i := appBlockHeight + 1; i <= coreBlockHeight; i++ { + blockMeta := blockStore.LoadBlockMeta(i) + block := blockStore.LoadBlock(i) + panicOnNilBlock(i, coreBlockHeight, block, blockMeta) // XXX + + var eventCache events.Fireable // nil + stateCopy.ApplyBlock(eventCache, appConnConsensus, block, blockMeta.PartsHeader, mockMempool{}) + } } // The computed state and the previously set state should be identical if !s.Equals(stateCopy) { - return fmt.Errorf("State after replay does not match saved state. Got ----\n%v\nExpected ----\n%v\n", stateCopy, s) + return ErrStateMismatch{stateCopy, s} } return nil } @@ -284,20 +330,3 @@ BlockMeta: %v } } - -//------------------------------------------------ -// Updates to the mempool need to be synchronized with committing a block -// so apps can reset their transient state on Commit - -type Mempool interface { - Lock() - Unlock() - Update(height int, txs []types.Tx) -} - -type mockMempool struct { -} - -func (m mockMempool) Lock() {} -func (m mockMempool) Unlock() {} -func (m mockMempool) Update(height int, txs []types.Tx) {} diff --git a/state/state.go b/state/state.go index 289c7a4d8..e1c6f88a5 100644 --- a/state/state.go +++ b/state/state.go @@ -21,16 +21,25 @@ var ( // NOTE: not goroutine-safe. type State struct { - mtx sync.Mutex - db dbm.DB - GenesisDoc *types.GenesisDoc - ChainID string + // mtx for writing to db + mtx sync.Mutex + db dbm.DB + + // should not change + GenesisDoc *types.GenesisDoc + ChainID string + + // updated at end of ExecBlock LastBlockHeight int // Genesis state has this set to 0. So, Block(H=0) does not exist. LastBlockID types.BlockID LastBlockTime time.Time Validators *types.ValidatorSet LastValidators *types.ValidatorSet - AppHash []byte + + // AppHash is updated after Commit; + // it's stale after ExecBlock and before Commit + Stale bool + AppHash []byte } func LoadState(db dbm.DB) *State { @@ -60,6 +69,7 @@ func (s *State) Copy() *State { LastBlockTime: s.LastBlockTime, Validators: s.Validators.Copy(), LastValidators: s.LastValidators.Copy(), + Stale: s.Stale, // but really state shouldnt be copied while its stale AppHash: s.AppHash, } } @@ -84,12 +94,15 @@ func (s *State) Bytes() []byte { } // Mutate state variables to match block and validators +// Since we don't have the AppHash yet, it becomes stale func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, prevValSet, nextValSet *types.ValidatorSet) { s.LastBlockHeight = header.Height s.LastBlockID = types.BlockID{block.Hash(), blockPartsHeader} s.LastBlockTime = header.Time s.Validators = nextValSet s.LastValidators = prevValSet + + s.Stale = true } func (s *State) GetValidators() (*types.ValidatorSet, *types.ValidatorSet) {