diff --git a/consensus/replay.go b/consensus/replay.go index 4ed5573ee..53f876084 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -101,6 +101,8 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error { defer func() { cs.replayMode = false }() // Ensure that ENDHEIGHT for this height doesn't exist + // NOTE: This is just a sanity check. As far as we know things work fine without it, + // and Handshake could reuse ConsensusState if it weren't for this check (since we can crash after writing ENDHEIGHT). gr, found, err := cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight)) if found { return errors.New(Fmt("WAL should not contain height %d.", csHeight)) @@ -273,15 +275,18 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp p } else if appBlockHeight == stateBlockHeight { // We haven't run Commit (both the state and app are one block behind), - // so run through consensus with the real app + // so ApplyBlock with the real app. + // NOTE: We could instead use the cs.WAL on cs.Start, + // but we'd have to allow the WAL to replay a block that wrote it's ENDHEIGHT log.Info("Replay last block using real app") - return h.replayLastBlock(proxyApp.Consensus()) + return h.replayLastBlock(storeBlockHeight, proxyApp.Consensus()) } else if appBlockHeight == storeBlockHeight { - // We ran Commit, but didn't save the state, so run through consensus with mock app - mockApp := newMockProxyApp(appHash) + // We ran Commit, but didn't save the state, so ApplyBlock with mock app + abciResponses := h.state.LoadABCIResponses() + mockApp := newMockProxyApp(appHash, abciResponses) log.Info("Replay last block using mock app") - return h.replayLastBlock(mockApp) + return h.replayLastBlock(storeBlockHeight, mockApp) } } @@ -323,26 +328,21 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store return appHash, h.checkAppHash(appHash) } -// Replay the last block through the consensus and return the AppHash from after Commit. -func (h *Handshaker) replayLastBlock(proxyApp proxy.AppConnConsensus) ([]byte, error) { +// ApplyBlock on the proxyApp with the last block. +func (h *Handshaker) replayLastBlock(height int, proxyApp proxy.AppConnConsensus) ([]byte, error) { mempool := types.MockMempool{} - cs := NewConsensusState(h.config, h.state, proxyApp, h.store, mempool) - evsw := types.NewEventSwitch() - evsw.Start() - defer evsw.Stop() - cs.SetEventSwitch(evsw) + var eventCache types.Fireable // nil + block := h.store.LoadBlock(height) + meta := h.store.LoadBlockMeta(height) - log.Notice("Attempting to replay last block", "height", h.store.Height()) - // run through the WAL, commit new block, stop - if _, err := cs.Start(); err != nil { + if err := h.state.ApplyBlock(eventCache, proxyApp, block, meta.BlockID.PartsHeader, mempool); err != nil { return nil, err } - cs.Stop() h.nBlocks += 1 - return cs.state.AppHash, nil + return h.state.AppHash, nil } func (h *Handshaker) checkAppHash(appHash []byte) error { @@ -354,9 +354,14 @@ func (h *Handshaker) checkAppHash(appHash []byte) error { } //-------------------------------------------------------------------------------- - -func newMockProxyApp(appHash []byte) proxy.AppConnConsensus { - clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{appHash: appHash}) +// mockProxyApp uses ABCIResponses to give the right results +// Useful because we don't want to call Commit() twice for the same block on the real app. + +func newMockProxyApp(appHash []byte, abciResponses *sm.ABCIResponses) proxy.AppConnConsensus { + clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{ + appHash: appHash, + abciResponses: abciResponses, + }) cli, _ := clientCreator.NewABCIClient() return proxy.NewAppConnConsensus(cli) } @@ -364,7 +369,24 @@ func newMockProxyApp(appHash []byte) proxy.AppConnConsensus { type mockProxyApp struct { abci.BaseApplication - appHash []byte + appHash []byte + txCount int + abciResponses *sm.ABCIResponses +} + +func (mock *mockProxyApp) DeliverTx(tx []byte) abci.Result { + r := mock.abciResponses.DeliverTx[mock.txCount] + mock.txCount += 1 + return abci.Result{ + r.Code, + r.Data, + r.Log, + } +} + +func (mock *mockProxyApp) EndBlock(height uint64) abci.ResponseEndBlock { + mock.txCount = 0 + return mock.abciResponses.EndBlock } func (mock *mockProxyApp) Commit() abci.Result { diff --git a/consensus/state.go b/consensus/state.go index 9c652c95a..f1423e2f2 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1202,12 +1202,6 @@ func (cs *ConsensusState) finalizeCommit(height int) { fail.Fail() // XXX - if cs.wal != nil { - cs.wal.writeEndHeight(height) - } - - fail.Fail() // XXX - // Save to blockStore. if cs.blockStore.Height() < block.Height { // NOTE: the seenCommit is local justification to commit this block, @@ -1222,13 +1216,22 @@ func (cs *ConsensusState) finalizeCommit(height int) { fail.Fail() // XXX + // Finish writing to the WAL for this height. + // NOTE: ConsensusState should not be started again + // until we successfully call ApplyBlock (eg. in Handshake after restart) + if cs.wal != nil { + cs.wal.writeEndHeight(height) + } + + fail.Fail() // XXX + // Create a copy of the state for staging // and an event cache for txs stateCopy := cs.state.Copy() eventCache := types.NewEventCache(cs.evsw) // Execute and commit the block, update and save the state, and update the mempool. - // All calls to the proxyAppConn should come here. + // All calls to the proxyAppConn come here. // NOTE: the block.AppHash wont reflect these txs until the next block err := stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool) if err != nil { @@ -1238,7 +1241,12 @@ func (cs *ConsensusState) finalizeCommit(height int) { fail.Fail() // XXX - // Fire off event for new block. + // Fire event for new block. + // NOTE: If we fail before firing, these events will never fire + // + // Some options (for which they may fire more than once. I guess that's fine): + // * Fire before persisting state, in ApplyBlock + // * Fire on start up if we haven't written any new WAL msgs types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block}) types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header}) eventCache.Flush() diff --git a/state/execution.go b/state/execution.go index fb0eac02e..d978c1956 100644 --- a/state/execution.go +++ b/state/execution.go @@ -223,6 +223,9 @@ func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConn fail.Fail() // XXX + // index txs. This could run in the background + s.indexTxs(abciResponses) + // save the results before we commit s.SaveABCIResponses(abciResponses) @@ -278,6 +281,17 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl return nil } +func (s *State) indexTxs(abciResponses *ABCIResponses) { + // save the tx results using the TxIndexer + // NOTE: these may be overwriting, but the values should be the same. + batch := txindexer.NewBatch() + for i, d := range abciResponses.DeliverTx { + tx := abciResponses.txs[i] + batch.Index(tx.Hash(), types.TxResult{uint64(abciResponses.Height), uint32(i), *d}) + } + s.TxIndexer.Batch(batch) +} + // Apply and commit a block, but without all the state validation. // Returns the application root hash (result of abci.Commit) // TODO handle abciResponses diff --git a/state/state.go b/state/state.go index 604f0892a..eb7159e24 100644 --- a/state/state.go +++ b/state/state.go @@ -64,7 +64,7 @@ func loadState(db dbm.DB, key []byte) *State { wire.ReadBinaryPtr(&s, r, 0, n, err) if *err != nil { // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED - Exit(Fmt("Data has been corrupted or its spec has changed: %v\n", *err)) + Exit(Fmt("LoadState: Data has been corrupted or its spec has changed: %v\n", *err)) } // TODO: ensure that buf is completely read. } @@ -95,18 +95,8 @@ func (s *State) Save() { // Sets the ABCIResponses in the state and writes them to disk // in case we crash after app.Commit and before s.Save() func (s *State) SaveABCIResponses(abciResponses *ABCIResponses) { - // save the validators to the db s.db.SetSync(abciResponsesKey, abciResponses.Bytes()) - - // save the tx results using the TxIndexer - // NOTE: these may be overwriting, but the values should be the same. - batch := txindexer.NewBatch() - for i, d := range abciResponses.DeliverTx { - tx := abciResponses.txs[i] - batch.Index(tx.Hash(), types.TxResult{uint64(abciResponses.height), uint32(i), *d}) - } - s.TxIndexer.Batch(batch) } func (s *State) LoadABCIResponses() *ABCIResponses { @@ -115,10 +105,10 @@ func (s *State) LoadABCIResponses() *ABCIResponses { buf := s.db.Get(abciResponsesKey) if len(buf) != 0 { r, n, err := bytes.NewReader(buf), new(int), new(error) - wire.ReadBinaryPtr(&abciResponses.EndBlock.Diffs, r, 0, n, err) + wire.ReadBinaryPtr(abciResponses, r, 0, n, err) if *err != nil { // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED - Exit(Fmt("Data has been corrupted or its spec has changed: %v\n", *err)) + Exit(Fmt("LoadABCIResponses: Data has been corrupted or its spec has changed: %v\n", *err)) } // TODO: ensure that buf is completely read. } @@ -191,25 +181,26 @@ func GetState(config cfg.Config, stateDB dbm.DB) *State { // ABCIResponses holds intermediate state during block processing type ABCIResponses struct { - height int - txs types.Txs // for reference later + Height int - DeliverTx []*abci.ResponseDeliverTx // results of the txs, populated in the proxyCb - EndBlock abci.ResponseEndBlock // changes to the validator set + DeliverTx []*abci.ResponseDeliverTx + EndBlock abci.ResponseEndBlock + + txs types.Txs // for reference later } func NewABCIResponses(block *types.Block) *ABCIResponses { return &ABCIResponses{ - height: block.Height, - txs: block.Data.Txs, + Height: block.Height, DeliverTx: make([]*abci.ResponseDeliverTx, block.NumTxs), + txs: block.Data.Txs, } } -// Serialize the list of validators +// Serialize the ABCIResponse func (a *ABCIResponses) Bytes() []byte { buf, n, err := new(bytes.Buffer), new(int), new(error) - wire.WriteBinary(a.EndBlock, buf, n, err) + wire.WriteBinary(*a, buf, n, err) if *err != nil { PanicCrisis(*err) } diff --git a/state/state_test.go b/state/state_test.go index a534cb695..dca83e801 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -1,8 +1,12 @@ package state import ( + "fmt" "testing" + "github.com/stretchr/testify/assert" + abci "github.com/tendermint/abci/types" + "github.com/tendermint/go-crypto" dbm "github.com/tendermint/go-db" "github.com/tendermint/tendermint/config/tendermint_test" ) @@ -40,3 +44,30 @@ func TestStateSaveLoad(t *testing.T) { t.Fatal("expected state and its copy to be identical. got %v\n expected %v\n", loadedState, state) } } + +func TestABCIResponsesSaveLoad(t *testing.T) { + assert := assert.New(t) + + config := tendermint_test.ResetConfig("state_") + stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir")) + state := GetState(config, stateDB) + + state.LastBlockHeight += 1 + + // build mock responses + block := makeBlock(2, state) + abciResponses := NewABCIResponses(block) + abciResponses.DeliverTx[0] = &abci.ResponseDeliverTx{Data: []byte("foo")} + abciResponses.DeliverTx[1] = &abci.ResponseDeliverTx{Data: []byte("bar"), Log: "ok"} + abciResponses.EndBlock = abci.ResponseEndBlock{Diffs: []*abci.Validator{ + { + PubKey: crypto.GenPrivKeyEd25519().PubKey().Bytes(), + Power: 10, + }, + }} + abciResponses.txs = nil + + state.SaveABCIResponses(abciResponses) + abciResponses2 := state.LoadABCIResponses() + assert.Equal(abciResponses, abciResponses2, fmt.Sprintf("ABCIResponses don't match: Got %v, Expected %v", abciResponses2, abciResponses)) +}