diff --git a/consensus/replay_file.go b/consensus/replay_file.go index 1c2e9cb05..5d6747859 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -248,7 +248,7 @@ func newConsensusStateForReplay(config cfg.Config) *ConsensusState { state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) // Create proxyAppConn connection (consensus, mempool, query) - proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), sm.NewHandshaker(config, state, blockStore)) + proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), sm.NewHandshaker(config, state, blockStore, ReplayLastBlock)) _, err := proxyApp.Start() if err != nil { Exit(Fmt("Error starting proxy app conns: %v", err)) diff --git a/consensus/state.go b/consensus/state.go index 78e6c0eaa..c872233d2 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -15,7 +15,6 @@ import ( cfg "github.com/tendermint/go-config" "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" - mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" @@ -227,7 +226,7 @@ type ConsensusState struct { config cfg.Config proxyAppConn proxy.AppConnConsensus blockStore *bc.BlockStore - mempool *mempl.Mempool + mempool sm.Mempool privValidator PrivValidator // for signing votes @@ -255,7 +254,20 @@ type ConsensusState struct { done chan struct{} } -func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState { +func ReplayLastBlock(config cfg.Config, state *sm.State, proxyApp proxy.AppConnConsensus, blockStore sm.BlockStore) { + mempool := sm.MockMempool{} + cs := NewConsensusState(config, state, proxyApp, blockStore.(*bc.BlockStore), mempool) + + evsw := types.NewEventSwitch() + cs.SetEventSwitch(evsw) + newBlockCh := subscribeToEvent(evsw, "consensus-replay", types.EventStringNewBlock(), 0) + + cs.Start() + <-newBlockCh + cs.Stop() +} + +func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore *bc.BlockStore, mempool sm.Mempool) *ConsensusState { cs := &ConsensusState{ config: config, proxyAppConn: proxyAppConn, diff --git a/mempool/mempool.go b/mempool/mempool.go index 6fe4d7049..e960f520f 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -7,13 +7,13 @@ import ( "sync/atomic" "time" + abci "github.com/tendermint/abci/types" auto "github.com/tendermint/go-autofile" "github.com/tendermint/go-clist" . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" - abci "github.com/tendermint/abci/types" ) /* @@ -249,7 +249,7 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) { // Get the valid transactions remaining // If maxTxs is -1, there is no cap on returned transactions. -func (mem *Mempool) Reap(maxTxs int) []types.Tx { +func (mem *Mempool) Reap(maxTxs int) types.Txs { mem.proxyMtx.Lock() defer mem.proxyMtx.Unlock() @@ -263,7 +263,7 @@ func (mem *Mempool) Reap(maxTxs int) []types.Tx { } // maxTxs: -1 means uncapped, 0 means none -func (mem *Mempool) collectTxs(maxTxs int) []types.Tx { +func (mem *Mempool) collectTxs(maxTxs int) types.Txs { if maxTxs == 0 { return []types.Tx{} } else if maxTxs < 0 { @@ -281,7 +281,7 @@ func (mem *Mempool) collectTxs(maxTxs int) []types.Tx { // Mempool will discard these txs. // NOTE: this should be called *after* block is committed by consensus. // NOTE: unsafe; Lock/Unlock must be managed by caller -func (mem *Mempool) Update(height int, txs []types.Tx) { +func (mem *Mempool) Update(height int, txs types.Txs) { // TODO: check err ? mem.proxyAppConn.FlushSync() // To flush async resCb calls e.g. from CheckTx diff --git a/node/node.go b/node/node.go index d9eeb1f42..8ccd47c06 100644 --- a/node/node.go +++ b/node/node.go @@ -64,7 +64,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato state := sm.GetState(config, stateDB) // Create the proxyApp, which manages connections (consensus, mempool, query) - proxyApp := proxy.NewAppConns(config, clientCreator, sm.NewHandshaker(config, state, blockStore)) + proxyApp := proxy.NewAppConns(config, clientCreator, sm.NewHandshaker(config, state, blockStore, consensus.ReplayLastBlock)) if _, err := proxyApp.Start(); err != nil { cmn.Exit(cmn.Fmt("Error starting proxy app connections: %v", err)) } diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 16bd8f20f..fb06c3ff1 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -31,7 +31,7 @@ type Consensus interface { type Mempool interface { Size() int CheckTx(types.Tx, func(*abci.Response)) error - Reap(int) []types.Tx + Reap(int) types.Txs Flush() } diff --git a/state/execution.go b/state/execution.go index 3dbf5e28c..e4b7cff48 100644 --- a/state/execution.go +++ b/state/execution.go @@ -1,7 +1,6 @@ package state import ( - "bytes" "errors" "github.com/ebuchman/fail-test" @@ -278,15 +277,20 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl type Mempool interface { Lock() Unlock() - Update(height int, txs []types.Tx) + + CheckTx(types.Tx, func(*abci.Response)) error + Reap(int) types.Txs + Update(height int, txs types.Txs) } type MockMempool struct { } -func (m MockMempool) Lock() {} -func (m MockMempool) Unlock() {} -func (m MockMempool) Update(height int, txs []types.Tx) {} +func (m MockMempool) Lock() {} +func (m MockMempool) Unlock() {} +func (m MockMempool) CheckTx(tx types.Tx, cb func(*abci.Response)) error { return nil } +func (m MockMempool) Reap(n int) types.Txs { return types.Txs{} } +func (m MockMempool) Update(height int, txs types.Txs) {} //---------------------------------------------------------------- // Handshake with app to sync to latest state of core by replaying blocks @@ -298,16 +302,19 @@ type BlockStore interface { LoadBlockMeta(height int) *types.BlockMeta } +type blockReplayFunc func(cfg.Config, *State, proxy.AppConnConsensus, BlockStore) + type Handshaker struct { - config cfg.Config - state *State - store BlockStore + config cfg.Config + state *State + store BlockStore + replayLastBlock blockReplayFunc nBlocks int // number of blocks applied to the state } -func NewHandshaker(config cfg.Config, state *State, store BlockStore) *Handshaker { - return &Handshaker{config, state, store, 0} +func NewHandshaker(config cfg.Config, state *State, store BlockStore, f blockReplayFunc) *Handshaker { + return &Handshaker{config, state, store, f, 0} } // TODO: retry the handshake/replay if it fails ? @@ -326,7 +333,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { // TODO: check version // replay blocks up to the latest in the blockstore - err = h.ReplayBlocks(appHash, blockHeight, proxyApp.Consensus()) + err = h.ReplayBlocks(appHash, blockHeight, proxyApp) if err != nil { return errors.New(Fmt("Error on replay: %v", err)) } @@ -340,7 +347,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { } // Replay all blocks after blockHeight and ensure the result matches the current state. -func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, appConnConsensus proxy.AppConnConsensus) error { +func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp proxy.AppConns) error { storeBlockHeight := h.store.Height() stateBlockHeight := h.state.LastBlockHeight @@ -353,85 +360,71 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, appConnCon return ErrAppBlockHeightTooHigh{storeBlockHeight, appBlockHeight} } else if storeBlockHeight == appBlockHeight { - // We ran Commit, but if we crashed before state.Save(), - // load the intermediate state and update the state.AppHash. - // NOTE: If ABCI allowed rollbacks, we could just replay the - // block even though it's been committed - stateAppHash := h.state.AppHash - lastBlockAppHash := h.store.LoadBlock(storeBlockHeight).AppHash - - if bytes.Equal(stateAppHash, appHash) { - // we're all synced up - log.Debug("ABCI RelpayBlocks: Already synced") - } else if bytes.Equal(stateAppHash, lastBlockAppHash) { - // we crashed after commit and before saving state, - // so load the intermediate state and update the hash - h.state.LoadIntermediate() - h.state.AppHash = appHash - log.Debug("ABCI RelpayBlocks: Loaded intermediate state and updated state.AppHash") - } else { - PanicSanity(Fmt("Unexpected state.AppHash: state.AppHash %X; app.AppHash %X, lastBlock.AppHash %X", stateAppHash, appHash, lastBlockAppHash)) + // We already ran Commit, so run through consensus with mock app + mockApp := newMockProxyApp(appHash) - } - return nil + h.replayLastBlock(h.config, h.state, mockApp, h.store) - } else if storeBlockHeight == appBlockHeight+1 && - storeBlockHeight == stateBlockHeight+1 { + } else if storeBlockHeight == appBlockHeight+1 { // We crashed after saving the block // but before Commit (both the state and app are behind), - // so just replay the block - - // check that the lastBlock.AppHash matches the state apphash - block := h.store.LoadBlock(storeBlockHeight) - if !bytes.Equal(block.Header.AppHash, appHash) { - return ErrLastStateMismatch{storeBlockHeight, block.Header.AppHash, appHash} - } - - blockMeta := h.store.LoadBlockMeta(storeBlockHeight) - - h.nBlocks += 1 - var eventCache types.Fireable // nil + // so run through consensus with the real app + h.replayLastBlock(h.config, h.state, proxyApp.Consensus(), h.store) - // replay the latest block - return h.state.ApplyBlock(eventCache, appConnConsensus, block, blockMeta.BlockID.PartsHeader, MockMempool{}) - } else if storeBlockHeight != stateBlockHeight { - // unless we failed before committing or saving state (previous 2 case), - // the store and state should be at the same height! - PanicSanity(Fmt("Expected storeHeight (%d) and stateHeight (%d) to match.", storeBlockHeight, stateBlockHeight)) } else { // store is more than one ahead, // so app wants to replay many blocks - - // replay all blocks starting with appBlockHeight+1 - var eventCache types.Fireable // nil - - // TODO: use stateBlockHeight instead and let the consensus state - // do the replay - - var appHash []byte - for i := appBlockHeight + 1; i <= storeBlockHeight; i++ { - h.nBlocks += 1 - block := h.store.LoadBlock(i) - _, err := execBlockOnProxyApp(eventCache, appConnConsensus, block) - if err != nil { - log.Warn("Error executing block on proxy app", "height", i, "err", err) - return err + /* + + // replay all blocks starting with appBlockHeight+1 + var eventCache types.Fireable // nil + + // TODO: use stateBlockHeight instead and let the consensus state + // do the replay + + var appHash []byte + for i := appBlockHeight + 1; i <= storeBlockHeight; i++ { + h.nBlocks += 1 + block := h.store.LoadBlock(i) + _, err := execBlockOnProxyApp(eventCache, appConnConsensus, block) + if err != nil { + log.Warn("Error executing block on proxy app", "height", i, "err", err) + return err + } + // Commit block, get hash back + res := appConnConsensus.CommitSync() + if res.IsErr() { + log.Warn("Error in proxyAppConn.CommitSync", "error", res) + return res + } + if res.Log != "" { + log.Info("Commit.Log: " + res.Log) + } + appHash = res.Data } - // Commit block, get hash back - res := appConnConsensus.CommitSync() - if res.IsErr() { - log.Warn("Error in proxyAppConn.CommitSync", "error", res) - return res + if !bytes.Equal(h.state.AppHash, appHash) { + return errors.New(Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash)) } - if res.Log != "" { - log.Info("Commit.Log: " + res.Log) - } - appHash = res.Data - } - if !bytes.Equal(h.state.AppHash, appHash) { - return errors.New(Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash)) - } + */ return nil } return nil } + +//-------------------------------------------------------------------------------- + +func newMockProxyApp(appHash []byte) proxy.AppConnConsensus { + clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{appHash: appHash}) + cli, _ := clientCreator.NewABCIClient() + return proxy.NewAppConnConsensus(cli) +} + +type mockProxyApp struct { + abci.BaseApplication + + appHash []byte +} + +func (mock *mockProxyApp) Commit() abci.Result { + return abci.NewResultOK(mock.appHash, "") +}