From bc678596729376465f56ac0a339a6799eef38e0a Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 20 Feb 2017 16:24:35 -0500 Subject: [PATCH] make ReplayBlocks logic exhaustive --- consensus/replay_test.go | 27 +++++--- consensus/state.go | 11 ++-- state/execution.go | 139 ++++++++++++++++++++++++++------------- 3 files changed, 118 insertions(+), 59 deletions(-) diff --git a/consensus/replay_test.go b/consensus/replay_test.go index a9123a8e4..e8aa9e903 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -15,7 +15,7 @@ import ( "github.com/tendermint/tendermint/config/tendermint_test" "github.com/tendermint/abci/example/dummy" - . "github.com/tendermint/go-common" + cmn "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" "github.com/tendermint/go-crypto" dbm "github.com/tendermint/go-db" @@ -37,7 +37,7 @@ func init() { // the `Handshake Tests` are for failures in applying the block. // With the help of the WAL, we can recover from it all! -var data_dir = path.Join(GoPath, "src/github.com/tendermint/tendermint/consensus", "test_data") +var data_dir = path.Join(cmn.GoPath, "src/github.com/tendermint/tendermint/consensus", "test_data") //------------------------------------------------------------------------------------------ // WAL Tests @@ -68,7 +68,7 @@ type testCase struct { func newTestCase(name string, stepChanges []int) *testCase { if len(stepChanges) != 3 { - panic(Fmt("a full wal has 3 step changes! Got array %v", stepChanges)) + panic(cmn.Fmt("a full wal has 3 step changes! Got array %v", stepChanges)) } return &testCase{ name: name, @@ -103,15 +103,15 @@ func readWAL(p string) string { func writeWAL(walMsgs string) string { tempDir := os.TempDir() - walDir := path.Join(tempDir, "/wal"+RandStr(12)) + walDir := path.Join(tempDir, "/wal"+cmn.RandStr(12)) walFile := path.Join(walDir, "wal") // Create WAL directory - err := EnsureDir(walDir, 0700) + err := cmn.EnsureDir(walDir, 0700) if err != nil { panic(err) } // Write the needed WAL to file - err = WriteFile(walFile, []byte(walMsgs), 0600) + err = cmn.WriteFile(walFile, []byte(walMsgs), 0600) if err != nil { panic(err) } @@ -123,7 +123,7 @@ func waitForBlock(newBlockCh chan interface{}, thisCase *testCase, i int) { select { case <-newBlockCh: case <-after: - panic(Fmt("Timed out waiting for new block for case '%s' line %d", thisCase.name, i)) + panic(cmn.Fmt("Timed out waiting for new block for case '%s' line %d", thisCase.name, i)) } } @@ -156,7 +156,7 @@ func toPV(pv PrivValidator) *types.PrivValidator { func setupReplayTest(thisCase *testCase, nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) { fmt.Println("-------------------------------------") - log.Notice(Fmt("Starting replay test %v (of %d lines of WAL). Crash after = %v", thisCase.name, nLines, crashAfter)) + log.Notice(cmn.Fmt("Starting replay test %v (of %d lines of WAL). Crash after = %v", thisCase.name, nLines, crashAfter)) lineStep := nLines if crashAfter { @@ -285,8 +285,15 @@ func TestHandshakeReplayNone(t *testing.T) { // Make some blocks. Start a fresh app and apply nBlocks blocks. Then restart the app and sync it up with the remaining blocks func testHandshakeReplay(t *testing.T, nBlocks int) { config := tendermint_test.ResetConfig("proxy_test_") - walFile := path.Join(data_dir, "many_blocks.cswal") + + // copy the many_blocks file + walBody, err := cmn.ReadFile(path.Join(data_dir, "many_blocks.cswal")) + if err != nil { + t.Fatal(err) + } + walFile := writeWAL(string(walBody)) config.Set("cs_wal_file", walFile) + privVal := types.LoadPrivValidator(config.GetString("priv_validator_file")) testPartSize = config.GetInt("block_part_size") @@ -371,7 +378,7 @@ func makeBlockchainFromWAL(wal *WAL) ([]*types.Block, []*types.Commit, error) { return nil, nil, err } if !found { - return nil, nil, errors.New(Fmt("WAL does not contain height %d.", 1)) + return nil, nil, errors.New(cmn.Fmt("WAL does not contain height %d.", 1)) } defer gr.Close() diff --git a/consensus/state.go b/consensus/state.go index 1c864c6d1..c077c412c 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -253,7 +253,8 @@ type ConsensusState struct { done chan struct{} } -func ReplayLastBlock(config cfg.Config, state *sm.State, proxyApp proxy.AppConnConsensus, blockStore sm.BlockStore) { +// Replay the last block through the consensus and return the AppHash after commit. +func ReplayLastBlock(config cfg.Config, state *sm.State, proxyApp proxy.AppConnConsensus, blockStore sm.BlockStore) ([]byte, error) { mempool := sm.MockMempool{} cs := NewConsensusState(config, state, proxyApp, blockStore, mempool) @@ -265,8 +266,10 @@ func ReplayLastBlock(config cfg.Config, state *sm.State, proxyApp proxy.AppConnC // run through the WAL, commit new block, stop cs.Start() - <-newBlockCh + <-newBlockCh // TODO: use a timeout and return err? cs.Stop() + + return cs.state.AppHash, nil } func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore sm.BlockStore, mempool sm.Mempool) *ConsensusState { @@ -1235,7 +1238,7 @@ func (cs *ConsensusState) finalizeCommit(height int) { cs.blockStore.SaveBlock(block, blockParts, seenCommit) } else { // Happens during replay if we already saved the block but didn't commit - log.Notice("Calling finalizeCommit on already stored block", "height", block.Height) + log.Info("Calling finalizeCommit on already stored block", "height", block.Height) } fail.Fail() // XXX @@ -1250,7 +1253,7 @@ func (cs *ConsensusState) finalizeCommit(height int) { // NOTE: the block.AppHash wont reflect these txs until the next block err := stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool) if err != nil { - log.Warn("Error on ApplyBlock. Did the application crash? Please restart tendermint", "error", err) + log.Error("Error on ApplyBlock. Did the application crash? Please restart tendermint", "error", err) return } diff --git a/state/execution.go b/state/execution.go index 71c2dd759..7677a672a 100644 --- a/state/execution.go +++ b/state/execution.go @@ -260,6 +260,7 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl log.Debug("Commit.Log: " + res.Log) } + log.Info("Committed state", "hash", res.Data) // Set the state's new AppHash s.AppHash = res.Data @@ -327,7 +328,8 @@ type BlockStore interface { LoadSeenCommit(height int) *types.Commit } -type blockReplayFunc func(cfg.Config, *State, proxy.AppConnConsensus, BlockStore) +// returns the apphash from Commit +type blockReplayFunc func(cfg.Config, *State, proxy.AppConnConsensus, BlockStore) ([]byte, error) type Handshaker struct { config cfg.Config @@ -362,73 +364,120 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { // TODO: check version // replay blocks up to the latest in the blockstore - err = h.ReplayBlocks(appHash, blockHeight, proxyApp) + appHash, err = h.ReplayBlocks(appHash, blockHeight, proxyApp) if err != nil { return errors.New(Fmt("Error on replay: %v", err)) } + // NOTE: the h.state may now be behind the cs state + // TODO: (on restart) replay mempool return nil } // Replay all blocks after blockHeight and ensure the result matches the current state. -func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp proxy.AppConns) error { +// Returns the final AppHash or an error +func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp proxy.AppConns) ([]byte, error) { storeBlockHeight := h.store.Height() stateBlockHeight := h.state.LastBlockHeight log.Notice("ABCI Replay Blocks", "appHeight", appBlockHeight, "storeHeight", storeBlockHeight, "stateHeight", stateBlockHeight) + // First handle edge cases and constraints on the storeBlockHeight if storeBlockHeight == 0 { - return nil + return nil, nil + } else if storeBlockHeight < appBlockHeight { - // if the app is ahead, there's nothing we can do - return ErrAppBlockHeightTooHigh{storeBlockHeight, appBlockHeight} - - } else if storeBlockHeight == appBlockHeight && storeBlockHeight == stateBlockHeight { - // all good! - return nil - - } else if storeBlockHeight == appBlockHeight && storeBlockHeight == stateBlockHeight+1 { - // We already ran Commit, but didn't save the state, so run through consensus with mock app - mockApp := newMockProxyApp(appHash) - log.Info("Replay last block using mock app") - h.replayLastBlock(h.config, h.state, mockApp, h.store) - - } else if storeBlockHeight == appBlockHeight+1 && storeBlockHeight == stateBlockHeight+1 { - // We crashed after saving the block - // but before Commit (both the state and app are behind), - // so run through consensus with the real app - log.Info("Replay last block using real app") - h.replayLastBlock(h.config, h.state, proxyApp.Consensus(), h.store) + // the app should never be ahead of the store (but this is under app's control) + return nil, ErrAppBlockHeightTooHigh{storeBlockHeight, appBlockHeight} - } else { - // store is more than one ahead, - // so app wants to replay many blocks. - // replay all blocks from appBlockHeight+1 to storeBlockHeight-1. - // Replay the final block through consensus - - var appHash []byte - var err error - for i := appBlockHeight + 1; i <= storeBlockHeight; i++ { - log.Info("Applying block", "height", i) - h.nBlocks += 1 - block := h.store.LoadBlock(i) - appHash, err = applyBlock(proxyApp.Consensus(), block) - if err != nil { - return err - } + } else if storeBlockHeight < stateBlockHeight { + // the state should never be ahead of the store (this is under tendermint's control) + PanicSanity(Fmt("StateBlockHeight (%d) > StoreBlockHeight (%d)", stateBlockHeight, storeBlockHeight)) + + } else if storeBlockHeight > stateBlockHeight+1 { + // store should be at most one ahead of the state (this is under tendermint's control) + PanicSanity(Fmt("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1)) + } + + // Now either store is equal to state, or one ahead. + // For each, consider all cases of where the app could be, given app <= store + if storeBlockHeight == stateBlockHeight { + // Tendermint ran Commit and saved the state. + // Either the app is asking for replay, or we're all synced up. + if appBlockHeight < storeBlockHeight { + // the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store) + return h.replayBlocks(proxyApp, appBlockHeight, storeBlockHeight, false) + + } else if appBlockHeight == storeBlockHeight { + // we're good! + return appHash, nil + } + + } else if storeBlockHeight == stateBlockHeight+1 { + // We saved the block in the store but haven't updated the state, + // so we'll need to replay a block using the WAL. + if appBlockHeight < stateBlockHeight { + // the app is further behind than it should be, so replay blocks + // but leave the last block to go through the WAL + return h.replayBlocks(proxyApp, appBlockHeight, storeBlockHeight, true) + + } else if appBlockHeight == stateBlockHeight { + // We haven't run Commit (both the state and app are one block behind), + // so run through consensus with the real app + log.Info("Replay last block using real app") + return h.replayLastBlock(h.config, h.state, proxyApp.Consensus(), h.store) + + } else if appBlockHeight == storeBlockHeight { + // We ran Commit, but didn't save the state, so run through consensus with mock app + mockApp := newMockProxyApp(appHash) + log.Info("Replay last block using mock app") + return h.replayLastBlock(h.config, h.state, mockApp, h.store) } - // TODO: should we be playing the final block through the consensus, instead of using applyBlock? - // h.replayLastBlock(h.config, h.state, proxyApp.Consensus(), h.store) + } + + PanicSanity("Should never happen") + return nil, nil +} - if !bytes.Equal(h.state.AppHash, appHash) { - return errors.New(Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash)) +func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int, useReplayFunc bool) ([]byte, error) { + // App is further behind than it should be, so we need to replay blocks. + // We replay all blocks from appBlockHeight+1 to storeBlockHeight-1, + // and let the final block be replayed through ReplayBlocks. + // Note that we don't have an old version of the state, + // so we by-pass state validation using applyBlock here. + + var appHash []byte + var err error + finalBlock := storeBlockHeight + if useReplayFunc { + finalBlock -= 1 + } + for i := appBlockHeight + 1; i <= finalBlock; i++ { + log.Info("Applying block", "height", i) + h.nBlocks += 1 + block := h.store.LoadBlock(i) + appHash, err = applyBlock(proxyApp.Consensus(), block) + if err != nil { + return nil, err } - return nil } - return nil + + if useReplayFunc { + // sync the final block + appHash, err = h.ReplayBlocks(appHash, finalBlock, proxyApp) + if err != nil { + return appHash, err + } + } + + if !bytes.Equal(h.state.AppHash, appHash) { + return nil, errors.New(Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash)) + } + + return appHash, nil } //--------------------------------------------------------------------------------