From cb279bf662cc64af57b4758404ae0373e4fb18ae Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 14 Apr 2017 15:33:19 -0400 Subject: [PATCH 01/14] state: ABCIResponses, s.Save() in ApplyBlock --- blockchain/reactor.go | 1 - consensus/state.go | 6 +-- state/execution.go | 85 ++++++++++++++++++++--------------------- state/state.go | 89 +++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 127 insertions(+), 54 deletions(-) diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 90258825e..f88bccc3d 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -251,7 +251,6 @@ FOR_LOOP: // TODO This is bad, are we zombie? cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) } - bcR.state.Save() } } continue FOR_LOOP diff --git a/consensus/state.go b/consensus/state.go index d69435814..fdbf43099 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1221,7 +1221,7 @@ func (cs *ConsensusState) finalizeCommit(height int) { stateCopy := cs.state.Copy() eventCache := types.NewEventCache(cs.evsw) - // Execute and commit the block, and update the mempool. + // Execute and commit the block, update and save the state, and update the mempool. // All calls to the proxyAppConn should come here. // NOTE: the block.AppHash wont reflect these txs until the next block err := stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool) @@ -1233,14 +1233,10 @@ func (cs *ConsensusState) finalizeCommit(height int) { fail.Fail() // XXX // Fire off event for new block. - // TODO: Handle app failure. See #177 types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block}) types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header}) eventCache.Flush() - // Save the state. - stateCopy.Save() - fail.Fail() // XXX // NewHeightStep! diff --git a/state/execution.go b/state/execution.go index 25d5dcd3a..943a64f7f 100644 --- a/state/execution.go +++ b/state/execution.go @@ -13,59 +13,39 @@ import ( "github.com/tendermint/tendermint/types" ) -// ExecBlock executes the block to mutate State. +//-------------------------------------------------- +// Execute the block + +// ExecBlock executes the block, but does NOT mutate State. // + validates the block // + executes block.Txs on the proxyAppConn -// + updates validator sets -// + returns block.Txs results -func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) ([]*types.TxResult, error) { +func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) { // Validate the block. if err := s.validateBlock(block); err != nil { return nil, ErrInvalidBlock(err) } - // compute bitarray of validators that signed - signed := commitBitArrayFromBlock(block) - _ = signed // TODO send on begin block - - // copy the valset - valSet := s.Validators.Copy() - nextValSet := valSet.Copy() - // Execute the block txs - txResults, changedValidators, err := execBlockOnProxyApp(eventCache, proxyAppConn, block) + abciResponses, err := execBlockOnProxyApp(eventCache, proxyAppConn, block) if err != nil { // There was some error in proxyApp // TODO Report error and wait for proxyApp to be available. return nil, ErrProxyAppConn(err) } - // update the validator set - err = updateValidators(nextValSet, changedValidators) - if err != nil { - log.Warn("Error changing validator set", "error", err) - // TODO: err or carry on? - } - - // All good! - // Update validator accums and set state variables - nextValSet.IncrementAccum(1) - s.SetBlockAndValidators(block.Header, blockPartsHeader, valSet, nextValSet) - - fail.Fail() // XXX - - return txResults, nil + return abciResponses, nil } // Executes block's transactions on proxyAppConn. // Returns a list of transaction results and updates to the validator set // TODO: Generate a bitmap or otherwise store tx validity in state. -func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) ([]*types.TxResult, []*abci.Validator, error) { +func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) { var validTxs, invalidTxs = 0, 0 - txResults := make([]*types.TxResult, len(block.Txs)) txIndex := 0 + abciResponses := NewABCIResponses(block) + // Execute transactions and get hash proxyCb := func(req *abci.Request, res *abci.Response) { switch r := res.Value.(type) { @@ -84,12 +64,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo txError = txResult.Code.String() } - txResults[txIndex] = &types.TxResult{ - Height: uint64(block.Height), - Index: uint32(txIndex), - Tx: req.GetDeliverTx().Tx, - Result: *txResult, - } + abciResponses.TxResults[txIndex] = &types.TxResult{uint64(block.Height), uint32(txIndex), *txResult} txIndex++ // NOTE: if we count we can access the tx from the block instead of @@ -111,7 +86,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo err := proxyAppConn.BeginBlockSync(block.Hash(), types.TM2PB.Header(block.Header)) if err != nil { log.Warn("Error in proxyAppConn.BeginBlock", "error", err) - return nil, nil, err + return nil, err } fail.Fail() // XXX @@ -121,7 +96,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo fail.FailRand(len(block.Txs)) // XXX proxyAppConn.DeliverTxAsync(tx) if err := proxyAppConn.Error(); err != nil { - return nil, nil, err + return nil, err } } @@ -131,7 +106,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo respEndBlock, err := proxyAppConn.EndBlockSync(uint64(block.Height)) if err != nil { log.Warn("Error in proxyAppConn.EndBlock", "error", err) - return nil, nil, err + return nil, err } fail.Fail() // XXX @@ -140,7 +115,9 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo if len(respEndBlock.Diffs) > 0 { log.Info("Update to validator set", "updates", abci.ValidatorsString(respEndBlock.Diffs)) } - return txResults, respEndBlock.Diffs, nil + abciResponses.Validators = respEndBlock.Diffs + + return abciResponses, nil } func updateValidators(validators *types.ValidatorSet, changedValidators []*abci.Validator) error { @@ -230,16 +207,30 @@ func (s *State) validateBlock(block *types.Block) error { return nil } -// ApplyBlock executes the block, then commits and updates the mempool -// atomically, optionally indexing transaction results. +//----------------------------------------------------------------------------- +// ApplyBlock executes the block, updates state w/ ABCI responses, +// then commits and updates the mempool atomically, then saves state. +// Transaction results are optionally indexed. + +// Execute and commit block against app, save block and state func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, partsHeader types.PartSetHeader, mempool types.Mempool) error { - txResults, err := s.ExecBlock(eventCache, proxyAppConn, block, partsHeader) + abciResponses, err := s.ExecBlock(eventCache, proxyAppConn, block) if err != nil { return fmt.Errorf("Exec failed for application: %v", err) } + fail.Fail() // XXX + + // save the results before we commit + s.SaveABCIResponses(abciResponses) + + fail.Fail() // XXX + + // now update the block and validators + s.SetBlockAndValidators(block.Header, partsHeader) + // lock mempool, commit state, update mempoool err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool) if err != nil { @@ -252,6 +243,11 @@ func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConn } s.TxIndexer.AddBatch(batch) + fail.Fail() // XXX + + // save the state + s.Save() + return nil } @@ -284,9 +280,10 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl // Apply and commit a block, but without all the state validation. // Returns the application root hash (result of abci.Commit) +// TODO handle abciResponses func ApplyBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block) ([]byte, error) { var eventCache types.Fireable // nil - _, _, err := execBlockOnProxyApp(eventCache, appConnConsensus, block) + _, err := execBlockOnProxyApp(eventCache, appConnConsensus, block) if err != nil { log.Warn("Error executing block on proxy app", "height", block.Height, "err", err) return nil, err diff --git a/state/state.go b/state/state.go index 14b324d5f..1b90d3406 100644 --- a/state/state.go +++ b/state/state.go @@ -6,6 +6,7 @@ import ( "sync" "time" + abci "github.com/tendermint/abci/types" . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" dbm "github.com/tendermint/go-db" @@ -16,7 +17,8 @@ import ( ) var ( - stateKey = []byte("stateKey") + stateKey = []byte("stateKey") + abciResponsesKey = []byte("abciResponsesKey") ) //----------------------------------------------------------------------------- @@ -31,7 +33,7 @@ type State struct { GenesisDoc *types.GenesisDoc ChainID string - // updated at end of ExecBlock + // updated at end of SetBlockAndValidators LastBlockHeight int // Genesis state has this set to 0. So, Block(H=0) does not exist. LastBlockID types.BlockID LastBlockTime time.Time @@ -42,6 +44,10 @@ type State struct { AppHash []byte TxIndexer txindex.TxIndexer `json:"-"` // Transaction indexer. + + // Intermediate results from processing + // Persisted separately from the state + abciResponses *ABCIResponses } func LoadState(db dbm.DB) *State { @@ -62,6 +68,8 @@ func loadState(db dbm.DB, key []byte) *State { } // TODO: ensure that buf is completely read. } + + s.LoadABCIResponses() return s } @@ -76,7 +84,8 @@ func (s *State) Copy() *State { Validators: s.Validators.Copy(), LastValidators: s.LastValidators.Copy(), AppHash: s.AppHash, - TxIndexer: s.TxIndexer, // pointer here, not value + abciResponses: s.abciResponses, // pointer here, not value + TxIndexer: s.TxIndexer, // pointer here, not value } } @@ -86,6 +95,37 @@ func (s *State) Save() { s.db.SetSync(stateKey, s.Bytes()) } +// Sets the ABCIResponses in the state and writes them to disk +func (s *State) SaveABCIResponses(abciResponses *ABCIResponses) { + s.abciResponses = abciResponses + + // save the validators to the db + s.db.SetSync(abciResponsesKey, s.abciResponses.Bytes()) + + // save the tx results using the TxIndexer + batch := txindexer.NewBatch() + for i, r := range s.abciResponses.TxResults { + tx := s.abciResponses.Txs[i] + batch.Index(tx.Hash(), *r) + } + s.TxIndexer.Batch(batch) +} + +func (s *State) LoadABCIResponses() { + s.abciResponses = new(ABCIResponses) + + buf := s.db.Get(abciResponsesKey) + if len(buf) != 0 { + r, n, err := bytes.NewReader(buf), new(int), new(error) + wire.ReadBinaryPtr(&s.abciResponses.Validators, r, 0, n, err) + if *err != nil { + // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED + Exit(Fmt("Data has been corrupted or its spec has changed: %v\n", *err)) + } + // TODO: ensure that buf is completely read. + } +} + func (s *State) Equals(s2 *State) bool { return bytes.Equal(s.Bytes(), s2.Bytes()) } @@ -101,7 +141,21 @@ func (s *State) Bytes() []byte { // Mutate state variables to match block and validators // after running EndBlock -func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, prevValSet, nextValSet *types.ValidatorSet) { +func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader) { + + // copy the valset + prevValSet := s.Validators.Copy() + nextValSet := prevValSet.Copy() + + // update the validator set + err := updateValidators(nextValSet, s.abciResponses.Validators) + if err != nil { + log.Warn("Error changing validator set", "error", err) + // TODO: err or carry on? + } + // Update validator accums and set state variables + nextValSet.IncrementAccum(1) + s.setBlockAndValidators(header.Height, types.BlockID{header.Hash(), blockPartsHeader}, header.Time, prevValSet, nextValSet) @@ -134,6 +188,33 @@ func GetState(config cfg.Config, stateDB dbm.DB) *State { return state } +//-------------------------------------------------- +// ABCIResponses holds intermediate state during block processing + +type ABCIResponses struct { + Validators []*abci.Validator // changes to the validator set + + Txs types.Txs // for reference later + TxResults []*types.TxResult // results of the txs, populated in the proxyCb +} + +func NewABCIResponses(block *types.Block) *ABCIResponses { + return &ABCIResponses{ + Txs: block.Data.Txs, + TxResults: make([]*types.TxResult, block.NumTxs), + } +} + +// Serialize the list of validators +func (a *ABCIResponses) Bytes() []byte { + buf, n, err := new(bytes.Buffer), new(int), new(error) + wire.WriteBinary(a.Validators, buf, n, err) + if *err != nil { + PanicCrisis(*err) + } + return buf.Bytes() +} + //----------------------------------------------------------------------------- // Genesis From 54b26869d51c6cce9df44aa34ff46e8cf5965459 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 14 Apr 2017 16:27:22 -0400 Subject: [PATCH 02/14] consensus/wal: #HEIGHT -> #ENDHEIGHT --- consensus/replay.go | 14 +++++++------- consensus/replay_test.go | 2 +- consensus/state.go | 6 ++++++ consensus/test_data/build.sh | 8 ++++---- consensus/test_data/empty_block.cswal | 2 +- consensus/test_data/many_blocks.cswal | 12 ++++++------ consensus/test_data/small_block1.cswal | 2 +- consensus/test_data/small_block2.cswal | 2 +- consensus/wal.go | 12 +++--------- 9 files changed, 30 insertions(+), 30 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index 731e7d216..d532828f8 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -100,19 +100,19 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error { cs.replayMode = true defer func() { cs.replayMode = false }() - // Ensure that height+1 doesn't exist - gr, found, err := cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight+1)) + // Ensure that ENDHEIGHT for this height doesn't exist + gr, found, err := cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight)) if found { - return errors.New(Fmt("WAL should not contain height %d.", csHeight+1)) + return errors.New(Fmt("WAL should not contain height %d.", csHeight)) } if gr != nil { gr.Close() } - // Search for height marker - gr, found, err = cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight)) + // Search for last height marker + gr, found, err = cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight-1)) if err == io.EOF { - log.Warn("Replay: wal.group.Search returned EOF", "height", csHeight) + log.Warn("Replay: wal.group.Search returned EOF", "height", csHeight-1) return nil } else if err != nil { return err @@ -147,7 +147,7 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error { //-------------------------------------------------------------------------------- // Parses marker lines of the form: -// #HEIGHT: 12345 +// #ENDHEIGHT: 12345 func makeHeightSearchFunc(height int) auto.SearchFunc { return func(line string) (int, error) { line = strings.TrimRight(line, "\n") diff --git a/consensus/replay_test.go b/consensus/replay_test.go index c70b60fa0..43204ab72 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -443,7 +443,7 @@ func buildTMStateFromChain(config cfg.Config, state *sm.State, chain []*types.Bl func makeBlockchainFromWAL(wal *WAL) ([]*types.Block, []*types.Commit, error) { // Search for height marker - gr, found, err := wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(1)) + gr, found, err := wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(0)) if err != nil { return nil, nil, err } diff --git a/consensus/state.go b/consensus/state.go index fdbf43099..9c652c95a 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1202,6 +1202,12 @@ func (cs *ConsensusState) finalizeCommit(height int) { fail.Fail() // XXX + if cs.wal != nil { + cs.wal.writeEndHeight(height) + } + + fail.Fail() // XXX + // Save to blockStore. if cs.blockStore.Height() < block.Height { // NOTE: the seenCommit is local justification to commit this block, diff --git a/consensus/test_data/build.sh b/consensus/test_data/build.sh index ea0c9604a..2759c0e38 100644 --- a/consensus/test_data/build.sh +++ b/consensus/test_data/build.sh @@ -27,7 +27,7 @@ killall tendermint # /q would print up to and including the match, then quit. # /Q doesn't include the match. # http://unix.stackexchange.com/questions/11305/grep-show-all-the-file-up-to-the-match -sed '/HEIGHT: 2/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/empty_block.cswal +sed '/ENDHEIGHT: 1/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/empty_block.cswal reset } @@ -41,7 +41,7 @@ sleep 7 killall tendermint kill -9 $PID -sed '/HEIGHT: 7/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/many_blocks.cswal +sed '/ENDHEIGHT: 6/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/many_blocks.cswal reset } @@ -56,7 +56,7 @@ sleep 10 killall tendermint kill -9 $PID -sed '/HEIGHT: 2/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/small_block1.cswal +sed '/ENDHEIGHT: 1/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/small_block1.cswal reset } @@ -73,7 +73,7 @@ sleep 5 killall tendermint kill -9 $PID -sed '/HEIGHT: 2/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/small_block2.cswal +sed '/ENDHEIGHT: 1/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/small_block2.cswal reset } diff --git a/consensus/test_data/empty_block.cswal b/consensus/test_data/empty_block.cswal index a3a3585ce..aa5b232c9 100644 --- a/consensus/test_data/empty_block.cswal +++ b/consensus/test_data/empty_block.cswal @@ -1,4 +1,4 @@ -#HEIGHT: 1 +#ENDHEIGHT: 0 {"time":"2016-12-18T05:05:33.502Z","msg":[3,{"duration":974084551,"height":1,"round":0,"step":1}]} {"time":"2016-12-18T05:05:33.505Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} {"time":"2016-12-18T05:05:33.505Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"71D2DA2336A9F84C22A28FF6C67F35F3478FC0AF"},"pol_round":-1,"pol_block_id":{"hash":"","parts":{"total":0,"hash":""}},"signature":[1,"62C0F2BCCB491399EEDAF8E85837ADDD4E25BAB7A84BFC4F0E88594531FBC6D4755DEC7E6427F04AD7EB8BB89502762AB4380C7BBA93A4C297E6180EC78E3504"]}}],"peer_key":""}]} diff --git a/consensus/test_data/many_blocks.cswal b/consensus/test_data/many_blocks.cswal index 9ef06c32c..fd103cb1e 100644 --- a/consensus/test_data/many_blocks.cswal +++ b/consensus/test_data/many_blocks.cswal @@ -1,4 +1,4 @@ -#HEIGHT: 1 +#ENDHEIGHT: 0 {"time":"2017-02-17T23:54:19.013Z","msg":[3,{"duration":969121813,"height":1,"round":0,"step":1}]} {"time":"2017-02-17T23:54:19.014Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} {"time":"2017-02-17T23:54:19.014Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"2E32C8D500E936D27A47FCE3FF4BE7C1AFB3FAE1"},"pol_round":-1,"pol_block_id":{"hash":"","parts":{"total":0,"hash":""}},"signature":[1,"105A5A834E9AE2FA2191CAB5CB20D63594BA7859BD3EB92F055C8A35476D71F0D89F9FD5B0FF030D021533C71A81BF6E8F026BF4A37FC637CF38CA35291A9D00"]}}],"peer_key":""}]} @@ -8,7 +8,7 @@ {"time":"2017-02-17T23:54:19.016Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrecommit"}]} {"time":"2017-02-17T23:54:19.016Z","msg":[2,{"msg":[20,{"Vote":{"validator_address":"D028C9981F7A87F3093672BF0D5B0E2A1B3ED456","validator_index":0,"height":1,"round":0,"type":2,"block_id":{"hash":"3F32EE37F9EA674A2173CAD651836A8EE628B5C7","parts":{"total":1,"hash":"2E32C8D500E936D27A47FCE3FF4BE7C1AFB3FAE1"}},"signature":[1,"2B1070A5AB9305612A3AE74A8036D82B5E49E0DBBFBC7D723DB985CC8A8E72A52FF8E34D85273FEB8B901945CA541FA5142C3C4D43A04E9205ACECF53FD19B01"]}}],"peer_key":""}]} {"time":"2017-02-17T23:54:19.017Z","msg":[1,{"height":1,"round":0,"step":"RoundStepCommit"}]} -#HEIGHT: 2 +#ENDHEIGHT: 1 {"time":"2017-02-17T23:54:19.019Z","msg":[1,{"height":2,"round":0,"step":"RoundStepNewHeight"}]} {"time":"2017-02-17T23:54:20.017Z","msg":[3,{"duration":998073370,"height":2,"round":0,"step":1}]} {"time":"2017-02-17T23:54:20.018Z","msg":[1,{"height":2,"round":0,"step":"RoundStepPropose"}]} @@ -19,7 +19,7 @@ {"time":"2017-02-17T23:54:20.021Z","msg":[1,{"height":2,"round":0,"step":"RoundStepPrecommit"}]} {"time":"2017-02-17T23:54:20.021Z","msg":[2,{"msg":[20,{"Vote":{"validator_address":"D028C9981F7A87F3093672BF0D5B0E2A1B3ED456","validator_index":0,"height":2,"round":0,"type":2,"block_id":{"hash":"32310D174A99844713693C9815D2CA660364E028","parts":{"total":1,"hash":"D008E9014CDDEA8EC95E1E99E21333241BD52DFC"}},"signature":[1,"AA9F03D0707752301D7CBFCF4F0BCDBD666A46C1CAED3910BD64A3C5C2874AAF328172646C951C5E2FD962359C382A3CBBA2C73EC9B533668C6386995B83EC08"]}}],"peer_key":""}]} {"time":"2017-02-17T23:54:20.022Z","msg":[1,{"height":2,"round":0,"step":"RoundStepCommit"}]} -#HEIGHT: 3 +#ENDHEIGHT: 2 {"time":"2017-02-17T23:54:20.025Z","msg":[1,{"height":3,"round":0,"step":"RoundStepNewHeight"}]} {"time":"2017-02-17T23:54:21.022Z","msg":[3,{"duration":997103974,"height":3,"round":0,"step":1}]} {"time":"2017-02-17T23:54:21.024Z","msg":[1,{"height":3,"round":0,"step":"RoundStepPropose"}]} @@ -30,7 +30,7 @@ {"time":"2017-02-17T23:54:21.028Z","msg":[1,{"height":3,"round":0,"step":"RoundStepPrecommit"}]} {"time":"2017-02-17T23:54:21.028Z","msg":[2,{"msg":[20,{"Vote":{"validator_address":"D028C9981F7A87F3093672BF0D5B0E2A1B3ED456","validator_index":0,"height":3,"round":0,"type":2,"block_id":{"hash":"37AF6866DA8C3167CFC280FAE47B6ED441B00D5B","parts":{"total":1,"hash":"2E5DE5777A5AD899CD2531304F42A470509DE989"}},"signature":[1,"C900519E305EC03392E7D197D5FAB535DB240C9C0BA5375A1679C75BAAA07C7410C0EF43CF97D98F2C08A1D739667D5ACFF6233A1FAE75D3DA275AEA422EFD0F"]}}],"peer_key":""}]} {"time":"2017-02-17T23:54:21.028Z","msg":[1,{"height":3,"round":0,"step":"RoundStepCommit"}]} -#HEIGHT: 4 +#ENDHEIGHT: 3 {"time":"2017-02-17T23:54:21.032Z","msg":[1,{"height":4,"round":0,"step":"RoundStepNewHeight"}]} {"time":"2017-02-17T23:54:22.028Z","msg":[3,{"duration":996302067,"height":4,"round":0,"step":1}]} {"time":"2017-02-17T23:54:22.030Z","msg":[1,{"height":4,"round":0,"step":"RoundStepPropose"}]} @@ -41,7 +41,7 @@ {"time":"2017-02-17T23:54:22.033Z","msg":[1,{"height":4,"round":0,"step":"RoundStepPrecommit"}]} {"time":"2017-02-17T23:54:22.033Z","msg":[2,{"msg":[20,{"Vote":{"validator_address":"D028C9981F7A87F3093672BF0D5B0E2A1B3ED456","validator_index":0,"height":4,"round":0,"type":2,"block_id":{"hash":"04715E223BF4327FFA9B0D5AD849B74A099D5DEC","parts":{"total":1,"hash":"24CEBCBEB833F56D47AD14354071B3B7A243068A"}},"signature":[1,"F544743F17479A61F94B0F68C63D254BD60493D78E818D48A5859133619AEE5E92C47CAD89C654DF64E0911C3152091E047555D5F14655D95B9681AE9B336505"]}}],"peer_key":""}]} {"time":"2017-02-17T23:54:22.034Z","msg":[1,{"height":4,"round":0,"step":"RoundStepCommit"}]} -#HEIGHT: 5 +#ENDHEIGHT: 4 {"time":"2017-02-17T23:54:22.036Z","msg":[1,{"height":5,"round":0,"step":"RoundStepNewHeight"}]} {"time":"2017-02-17T23:54:23.034Z","msg":[3,{"duration":997096276,"height":5,"round":0,"step":1}]} {"time":"2017-02-17T23:54:23.035Z","msg":[1,{"height":5,"round":0,"step":"RoundStepPropose"}]} @@ -52,7 +52,7 @@ {"time":"2017-02-17T23:54:23.038Z","msg":[1,{"height":5,"round":0,"step":"RoundStepPrecommit"}]} {"time":"2017-02-17T23:54:23.038Z","msg":[2,{"msg":[20,{"Vote":{"validator_address":"D028C9981F7A87F3093672BF0D5B0E2A1B3ED456","validator_index":0,"height":5,"round":0,"type":2,"block_id":{"hash":"FDC6D837995BEBBBFCBF3E7D7CF44F8FDA448543","parts":{"total":1,"hash":"A52BAA9C2E52E633A1605F4B930205613E3E7A2F"}},"signature":[1,"DF51D23D5D2C57598F67791D953A6C2D9FC5865A3048ADA4469B37500D2996B95732E0DC6F99EAEAEA12B4818CE355C7B701D16857D2AC767D740C2E30E9260C"]}}],"peer_key":""}]} {"time":"2017-02-17T23:54:23.038Z","msg":[1,{"height":5,"round":0,"step":"RoundStepCommit"}]} -#HEIGHT: 6 +#ENDHEIGHT: 5 {"time":"2017-02-17T23:54:23.041Z","msg":[1,{"height":6,"round":0,"step":"RoundStepNewHeight"}]} {"time":"2017-02-17T23:54:24.038Z","msg":[3,{"duration":997341910,"height":6,"round":0,"step":1}]} {"time":"2017-02-17T23:54:24.040Z","msg":[1,{"height":6,"round":0,"step":"RoundStepPropose"}]} diff --git a/consensus/test_data/small_block1.cswal b/consensus/test_data/small_block1.cswal index 90103dff3..d4eff73f1 100644 --- a/consensus/test_data/small_block1.cswal +++ b/consensus/test_data/small_block1.cswal @@ -1,4 +1,4 @@ -#HEIGHT: 1 +#ENDHEIGHT: 0 {"time":"2016-12-18T05:05:38.593Z","msg":[3,{"duration":970717663,"height":1,"round":0,"step":1}]} {"time":"2016-12-18T05:05:38.595Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} {"time":"2016-12-18T05:05:38.595Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"A434EC796DF1CECC01296E953839C4675863A4E5"},"pol_round":-1,"pol_block_id":{"hash":"","parts":{"total":0,"hash":""}},"signature":[1,"39563C3C7EDD9855B2971457A5DABF05CFDAF52805658847EB1F05115B8341344A77761CC85E670AF1B679DA9FC0905231957174699FE8326DBE7706209BDD0B"]}}],"peer_key":""}]} diff --git a/consensus/test_data/small_block2.cswal b/consensus/test_data/small_block2.cswal index 1be6c4c93..b5d1d282b 100644 --- a/consensus/test_data/small_block2.cswal +++ b/consensus/test_data/small_block2.cswal @@ -1,4 +1,4 @@ -#HEIGHT: 1 +#ENDHEIGHT: 0 {"time":"2016-12-18T05:05:43.641Z","msg":[3,{"duration":969409681,"height":1,"round":0,"step":1}]} {"time":"2016-12-18T05:05:43.643Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} {"time":"2016-12-18T05:05:43.643Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":5,"hash":"C916905C3C444501DDDAA1BF52E959B7531E762E"},"pol_round":-1,"pol_block_id":{"hash":"","parts":{"total":0,"hash":""}},"signature":[1,"F1A8E9928889C68FD393F3983B5362AECA4A95AA13FE3C78569B2515EC046893CB718071CAF54F3F1507DCD851B37CD5557EA17BB5471D2DC6FB5AC5FBB72E02"]}}],"peer_key":""}]} diff --git a/consensus/wal.go b/consensus/wal.go index 6d8eb3819..a89eff5e4 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -59,7 +59,7 @@ func (wal *WAL) OnStart() error { if err != nil { return err } else if size == 0 { - wal.writeHeight(1) + wal.writeEndHeight(0) } _, err = wal.group.Start() return err @@ -83,12 +83,6 @@ func (wal *WAL) Save(wmsg WALMessage) { } } } - // Write #HEIGHT: XYZ if new height - if edrs, ok := wmsg.(types.EventDataRoundState); ok { - if edrs.Step == RoundStepNewHeight.String() { - wal.writeHeight(edrs.Height) - } - } // Write the wal message var wmsgBytes = wire.JSONBytes(TimedWALMessage{time.Now(), wmsg}) err := wal.group.WriteLine(string(wmsgBytes)) @@ -101,8 +95,8 @@ func (wal *WAL) Save(wmsg WALMessage) { } } -func (wal *WAL) writeHeight(height int) { - wal.group.WriteLine(Fmt("#HEIGHT: %v", height)) +func (wal *WAL) writeEndHeight(height int) { + wal.group.WriteLine(Fmt("#ENDHEIGHT: %v", height)) // TODO: only flush when necessary if err := wal.group.Flush(); err != nil { From ed03cb5c17c2cc58068c7ca7fb6e3e2a21381c42 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 14 Apr 2017 16:40:54 -0400 Subject: [PATCH 03/14] consensus/replay: remove timeout --- consensus/replay.go | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index d532828f8..4ed5573ee 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -332,23 +332,13 @@ func (h *Handshaker) replayLastBlock(proxyApp proxy.AppConnConsensus) ([]byte, e evsw.Start() defer evsw.Stop() cs.SetEventSwitch(evsw) - newBlockCh := subscribeToEvent(evsw, "consensus-replay", types.EventStringNewBlock(), 1) + log.Notice("Attempting to replay last block", "height", h.store.Height()) // run through the WAL, commit new block, stop if _, err := cs.Start(); err != nil { return nil, err } - defer cs.Stop() - - timeout := h.config.GetInt("timeout_handshake") - timer := time.NewTimer(time.Duration(timeout) * time.Millisecond) - log.Notice("Attempting to replay last block", "height", h.store.Height(), "timeout", timeout) - - select { - case <-newBlockCh: - case <-timer.C: - return nil, ErrReplayLastBlockTimeout - } + cs.Stop() h.nBlocks += 1 From 3a973b80ac0ca266c56f4eb2bb2d11185cf1a1db Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 14 Apr 2017 17:20:03 -0400 Subject: [PATCH 04/14] update glide --- glide.lock | 18 +++++------------- state/state.go | 2 +- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/glide.lock b/glide.lock index 63b3875d0..f0ede402e 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ hash: d9724aa287c40d1b3856b6565f09235d809c8b2f7c6537c04f597137c0d6cd26 -updated: 2017-04-11T15:24:16.619608243-04:00 +updated: 2017-04-14T17:17:31.933202871-04:00 imports: - name: github.com/btcsuite/btcd version: b8df516b4b267acf2de46be593a9d948d1d2c420 @@ -85,7 +85,7 @@ imports: - name: github.com/tendermint/go-clist version: 3baa390bbaf7634251c42ad69a8682e7e3990552 - name: github.com/tendermint/go-common - version: 6af2364fa91ef2f3afc8ba0db33b66d9d3ae006c + version: 714fdaee3bb3f8670e721a75c5ddda8787b256dd subpackages: - test - name: github.com/tendermint/go-config @@ -111,13 +111,13 @@ imports: subpackages: - upnp - name: github.com/tendermint/go-rpc - version: 9d18cbe74e66f875afa36d2fa3be280e4a2dc9e6 + version: 4671c44b2d124f7f6f6243dbfbf4ae2bf42ee809 subpackages: - client - server - types - name: github.com/tendermint/go-wire - version: 09dae074245a8042aa689d084af774e6ad6a90bb + version: 50889e2b4a9ba65b67be86a486f25853d514b937 - name: github.com/tendermint/log15 version: ae0f3d6450da9eac7074b439c8e1c3cabf0d5ce6 subpackages: @@ -166,12 +166,4 @@ imports: - stats - tap - transport -testImports: -- name: github.com/davecgh/go-spew - version: 6d212800a42e8ab5c146b8ace3490ee17e5225f9 - subpackages: - - spew -- name: github.com/pmezard/go-difflib - version: d8ed2627bdf02c080bf22230dbb337003b7aba2d - subpackages: - - difflib +testImports: [] diff --git a/state/state.go b/state/state.go index 1b90d3406..8a27104cd 100644 --- a/state/state.go +++ b/state/state.go @@ -147,7 +147,7 @@ func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader typ prevValSet := s.Validators.Copy() nextValSet := prevValSet.Copy() - // update the validator set + // update the validator set with the latest abciResponses err := updateValidators(nextValSet, s.abciResponses.Validators) if err != nil { log.Warn("Error changing validator set", "error", err) From 1684ec163fad41a9d97512a6e08082e966cd6567 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 14 Apr 2017 20:21:50 -0400 Subject: [PATCH 05/14] ABCIResponses not needed as field in state --- state/execution.go | 14 +++++++------- state/state.go | 41 +++++++++++++++++++++-------------------- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/state/execution.go b/state/execution.go index 943a64f7f..fb0eac02e 100644 --- a/state/execution.go +++ b/state/execution.go @@ -43,7 +43,6 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo var validTxs, invalidTxs = 0, 0 txIndex := 0 - abciResponses := NewABCIResponses(block) // Execute transactions and get hash @@ -64,7 +63,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo txError = txResult.Code.String() } - abciResponses.TxResults[txIndex] = &types.TxResult{uint64(block.Height), uint32(txIndex), *txResult} + abciResponses.DeliverTx[txIndex] = txResult txIndex++ // NOTE: if we count we can access the tx from the block instead of @@ -103,7 +102,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo fail.Fail() // XXX // End block - respEndBlock, err := proxyAppConn.EndBlockSync(uint64(block.Height)) + abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(uint64(block.Height)) if err != nil { log.Warn("Error in proxyAppConn.EndBlock", "error", err) return nil, err @@ -111,11 +110,12 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo fail.Fail() // XXX + valDiff := abciResponses.EndBlock.Diffs + log.Info("Executed block", "height", block.Height, "valid txs", validTxs, "invalid txs", invalidTxs) - if len(respEndBlock.Diffs) > 0 { - log.Info("Update to validator set", "updates", abci.ValidatorsString(respEndBlock.Diffs)) + if len(valDiff) > 0 { + log.Info("Update to validator set", "updates", abci.ValidatorsString(valDiff)) } - abciResponses.Validators = respEndBlock.Diffs return abciResponses, nil } @@ -229,7 +229,7 @@ func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConn fail.Fail() // XXX // now update the block and validators - s.SetBlockAndValidators(block.Header, partsHeader) + s.SetBlockAndValidators(block.Header, partsHeader, abciResponses) // lock mempool, commit state, update mempoool err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool) diff --git a/state/state.go b/state/state.go index 8a27104cd..604f0892a 100644 --- a/state/state.go +++ b/state/state.go @@ -68,8 +68,6 @@ func loadState(db dbm.DB, key []byte) *State { } // TODO: ensure that buf is completely read. } - - s.LoadABCIResponses() return s } @@ -84,8 +82,7 @@ func (s *State) Copy() *State { Validators: s.Validators.Copy(), LastValidators: s.LastValidators.Copy(), AppHash: s.AppHash, - abciResponses: s.abciResponses, // pointer here, not value - TxIndexer: s.TxIndexer, // pointer here, not value + TxIndexer: s.TxIndexer, // pointer here, not value } } @@ -96,34 +93,36 @@ func (s *State) Save() { } // Sets the ABCIResponses in the state and writes them to disk +// in case we crash after app.Commit and before s.Save() func (s *State) SaveABCIResponses(abciResponses *ABCIResponses) { - s.abciResponses = abciResponses // save the validators to the db - s.db.SetSync(abciResponsesKey, s.abciResponses.Bytes()) + s.db.SetSync(abciResponsesKey, abciResponses.Bytes()) // save the tx results using the TxIndexer + // NOTE: these may be overwriting, but the values should be the same. batch := txindexer.NewBatch() - for i, r := range s.abciResponses.TxResults { - tx := s.abciResponses.Txs[i] - batch.Index(tx.Hash(), *r) + for i, d := range abciResponses.DeliverTx { + tx := abciResponses.txs[i] + batch.Index(tx.Hash(), types.TxResult{uint64(abciResponses.height), uint32(i), *d}) } s.TxIndexer.Batch(batch) } -func (s *State) LoadABCIResponses() { - s.abciResponses = new(ABCIResponses) +func (s *State) LoadABCIResponses() *ABCIResponses { + abciResponses := new(ABCIResponses) buf := s.db.Get(abciResponsesKey) if len(buf) != 0 { r, n, err := bytes.NewReader(buf), new(int), new(error) - wire.ReadBinaryPtr(&s.abciResponses.Validators, r, 0, n, err) + wire.ReadBinaryPtr(&abciResponses.EndBlock.Diffs, r, 0, n, err) if *err != nil { // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED Exit(Fmt("Data has been corrupted or its spec has changed: %v\n", *err)) } // TODO: ensure that buf is completely read. } + return abciResponses } func (s *State) Equals(s2 *State) bool { @@ -141,14 +140,14 @@ func (s *State) Bytes() []byte { // Mutate state variables to match block and validators // after running EndBlock -func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader) { +func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, abciResponses *ABCIResponses) { // copy the valset prevValSet := s.Validators.Copy() nextValSet := prevValSet.Copy() // update the validator set with the latest abciResponses - err := updateValidators(nextValSet, s.abciResponses.Validators) + err := updateValidators(nextValSet, abciResponses.EndBlock.Diffs) if err != nil { log.Warn("Error changing validator set", "error", err) // TODO: err or carry on? @@ -192,23 +191,25 @@ func GetState(config cfg.Config, stateDB dbm.DB) *State { // ABCIResponses holds intermediate state during block processing type ABCIResponses struct { - Validators []*abci.Validator // changes to the validator set + height int + txs types.Txs // for reference later - Txs types.Txs // for reference later - TxResults []*types.TxResult // results of the txs, populated in the proxyCb + DeliverTx []*abci.ResponseDeliverTx // results of the txs, populated in the proxyCb + EndBlock abci.ResponseEndBlock // changes to the validator set } func NewABCIResponses(block *types.Block) *ABCIResponses { return &ABCIResponses{ - Txs: block.Data.Txs, - TxResults: make([]*types.TxResult, block.NumTxs), + height: block.Height, + txs: block.Data.Txs, + DeliverTx: make([]*abci.ResponseDeliverTx, block.NumTxs), } } // Serialize the list of validators func (a *ABCIResponses) Bytes() []byte { buf, n, err := new(bytes.Buffer), new(int), new(error) - wire.WriteBinary(a.Validators, buf, n, err) + wire.WriteBinary(a.EndBlock, buf, n, err) if *err != nil { PanicCrisis(*err) } From 5109746b1cd597ed7f2b2d9a1c6818bd45989bab Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 14 Apr 2017 20:30:15 -0400 Subject: [PATCH 06/14] Handshake uses ApplyBlock, no ConsensuState --- consensus/replay.go | 64 ++++++++++++++++++++++++++++++--------------- consensus/state.go | 24 +++++++++++------ state/execution.go | 14 ++++++++++ state/state.go | 33 +++++++++-------------- state/state_test.go | 31 ++++++++++++++++++++++ 5 files changed, 116 insertions(+), 50 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index 4ed5573ee..53f876084 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -101,6 +101,8 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error { defer func() { cs.replayMode = false }() // Ensure that ENDHEIGHT for this height doesn't exist + // NOTE: This is just a sanity check. As far as we know things work fine without it, + // and Handshake could reuse ConsensusState if it weren't for this check (since we can crash after writing ENDHEIGHT). gr, found, err := cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight)) if found { return errors.New(Fmt("WAL should not contain height %d.", csHeight)) @@ -273,15 +275,18 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp p } else if appBlockHeight == stateBlockHeight { // We haven't run Commit (both the state and app are one block behind), - // so run through consensus with the real app + // so ApplyBlock with the real app. + // NOTE: We could instead use the cs.WAL on cs.Start, + // but we'd have to allow the WAL to replay a block that wrote it's ENDHEIGHT log.Info("Replay last block using real app") - return h.replayLastBlock(proxyApp.Consensus()) + return h.replayLastBlock(storeBlockHeight, proxyApp.Consensus()) } else if appBlockHeight == storeBlockHeight { - // We ran Commit, but didn't save the state, so run through consensus with mock app - mockApp := newMockProxyApp(appHash) + // We ran Commit, but didn't save the state, so ApplyBlock with mock app + abciResponses := h.state.LoadABCIResponses() + mockApp := newMockProxyApp(appHash, abciResponses) log.Info("Replay last block using mock app") - return h.replayLastBlock(mockApp) + return h.replayLastBlock(storeBlockHeight, mockApp) } } @@ -323,26 +328,21 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store return appHash, h.checkAppHash(appHash) } -// Replay the last block through the consensus and return the AppHash from after Commit. -func (h *Handshaker) replayLastBlock(proxyApp proxy.AppConnConsensus) ([]byte, error) { +// ApplyBlock on the proxyApp with the last block. +func (h *Handshaker) replayLastBlock(height int, proxyApp proxy.AppConnConsensus) ([]byte, error) { mempool := types.MockMempool{} - cs := NewConsensusState(h.config, h.state, proxyApp, h.store, mempool) - evsw := types.NewEventSwitch() - evsw.Start() - defer evsw.Stop() - cs.SetEventSwitch(evsw) + var eventCache types.Fireable // nil + block := h.store.LoadBlock(height) + meta := h.store.LoadBlockMeta(height) - log.Notice("Attempting to replay last block", "height", h.store.Height()) - // run through the WAL, commit new block, stop - if _, err := cs.Start(); err != nil { + if err := h.state.ApplyBlock(eventCache, proxyApp, block, meta.BlockID.PartsHeader, mempool); err != nil { return nil, err } - cs.Stop() h.nBlocks += 1 - return cs.state.AppHash, nil + return h.state.AppHash, nil } func (h *Handshaker) checkAppHash(appHash []byte) error { @@ -354,9 +354,14 @@ func (h *Handshaker) checkAppHash(appHash []byte) error { } //-------------------------------------------------------------------------------- - -func newMockProxyApp(appHash []byte) proxy.AppConnConsensus { - clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{appHash: appHash}) +// mockProxyApp uses ABCIResponses to give the right results +// Useful because we don't want to call Commit() twice for the same block on the real app. + +func newMockProxyApp(appHash []byte, abciResponses *sm.ABCIResponses) proxy.AppConnConsensus { + clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{ + appHash: appHash, + abciResponses: abciResponses, + }) cli, _ := clientCreator.NewABCIClient() return proxy.NewAppConnConsensus(cli) } @@ -364,7 +369,24 @@ func newMockProxyApp(appHash []byte) proxy.AppConnConsensus { type mockProxyApp struct { abci.BaseApplication - appHash []byte + appHash []byte + txCount int + abciResponses *sm.ABCIResponses +} + +func (mock *mockProxyApp) DeliverTx(tx []byte) abci.Result { + r := mock.abciResponses.DeliverTx[mock.txCount] + mock.txCount += 1 + return abci.Result{ + r.Code, + r.Data, + r.Log, + } +} + +func (mock *mockProxyApp) EndBlock(height uint64) abci.ResponseEndBlock { + mock.txCount = 0 + return mock.abciResponses.EndBlock } func (mock *mockProxyApp) Commit() abci.Result { diff --git a/consensus/state.go b/consensus/state.go index 9c652c95a..f1423e2f2 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1202,12 +1202,6 @@ func (cs *ConsensusState) finalizeCommit(height int) { fail.Fail() // XXX - if cs.wal != nil { - cs.wal.writeEndHeight(height) - } - - fail.Fail() // XXX - // Save to blockStore. if cs.blockStore.Height() < block.Height { // NOTE: the seenCommit is local justification to commit this block, @@ -1222,13 +1216,22 @@ func (cs *ConsensusState) finalizeCommit(height int) { fail.Fail() // XXX + // Finish writing to the WAL for this height. + // NOTE: ConsensusState should not be started again + // until we successfully call ApplyBlock (eg. in Handshake after restart) + if cs.wal != nil { + cs.wal.writeEndHeight(height) + } + + fail.Fail() // XXX + // Create a copy of the state for staging // and an event cache for txs stateCopy := cs.state.Copy() eventCache := types.NewEventCache(cs.evsw) // Execute and commit the block, update and save the state, and update the mempool. - // All calls to the proxyAppConn should come here. + // All calls to the proxyAppConn come here. // NOTE: the block.AppHash wont reflect these txs until the next block err := stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool) if err != nil { @@ -1238,7 +1241,12 @@ func (cs *ConsensusState) finalizeCommit(height int) { fail.Fail() // XXX - // Fire off event for new block. + // Fire event for new block. + // NOTE: If we fail before firing, these events will never fire + // + // Some options (for which they may fire more than once. I guess that's fine): + // * Fire before persisting state, in ApplyBlock + // * Fire on start up if we haven't written any new WAL msgs types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block}) types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header}) eventCache.Flush() diff --git a/state/execution.go b/state/execution.go index fb0eac02e..d978c1956 100644 --- a/state/execution.go +++ b/state/execution.go @@ -223,6 +223,9 @@ func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConn fail.Fail() // XXX + // index txs. This could run in the background + s.indexTxs(abciResponses) + // save the results before we commit s.SaveABCIResponses(abciResponses) @@ -278,6 +281,17 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl return nil } +func (s *State) indexTxs(abciResponses *ABCIResponses) { + // save the tx results using the TxIndexer + // NOTE: these may be overwriting, but the values should be the same. + batch := txindexer.NewBatch() + for i, d := range abciResponses.DeliverTx { + tx := abciResponses.txs[i] + batch.Index(tx.Hash(), types.TxResult{uint64(abciResponses.Height), uint32(i), *d}) + } + s.TxIndexer.Batch(batch) +} + // Apply and commit a block, but without all the state validation. // Returns the application root hash (result of abci.Commit) // TODO handle abciResponses diff --git a/state/state.go b/state/state.go index 604f0892a..eb7159e24 100644 --- a/state/state.go +++ b/state/state.go @@ -64,7 +64,7 @@ func loadState(db dbm.DB, key []byte) *State { wire.ReadBinaryPtr(&s, r, 0, n, err) if *err != nil { // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED - Exit(Fmt("Data has been corrupted or its spec has changed: %v\n", *err)) + Exit(Fmt("LoadState: Data has been corrupted or its spec has changed: %v\n", *err)) } // TODO: ensure that buf is completely read. } @@ -95,18 +95,8 @@ func (s *State) Save() { // Sets the ABCIResponses in the state and writes them to disk // in case we crash after app.Commit and before s.Save() func (s *State) SaveABCIResponses(abciResponses *ABCIResponses) { - // save the validators to the db s.db.SetSync(abciResponsesKey, abciResponses.Bytes()) - - // save the tx results using the TxIndexer - // NOTE: these may be overwriting, but the values should be the same. - batch := txindexer.NewBatch() - for i, d := range abciResponses.DeliverTx { - tx := abciResponses.txs[i] - batch.Index(tx.Hash(), types.TxResult{uint64(abciResponses.height), uint32(i), *d}) - } - s.TxIndexer.Batch(batch) } func (s *State) LoadABCIResponses() *ABCIResponses { @@ -115,10 +105,10 @@ func (s *State) LoadABCIResponses() *ABCIResponses { buf := s.db.Get(abciResponsesKey) if len(buf) != 0 { r, n, err := bytes.NewReader(buf), new(int), new(error) - wire.ReadBinaryPtr(&abciResponses.EndBlock.Diffs, r, 0, n, err) + wire.ReadBinaryPtr(abciResponses, r, 0, n, err) if *err != nil { // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED - Exit(Fmt("Data has been corrupted or its spec has changed: %v\n", *err)) + Exit(Fmt("LoadABCIResponses: Data has been corrupted or its spec has changed: %v\n", *err)) } // TODO: ensure that buf is completely read. } @@ -191,25 +181,26 @@ func GetState(config cfg.Config, stateDB dbm.DB) *State { // ABCIResponses holds intermediate state during block processing type ABCIResponses struct { - height int - txs types.Txs // for reference later + Height int - DeliverTx []*abci.ResponseDeliverTx // results of the txs, populated in the proxyCb - EndBlock abci.ResponseEndBlock // changes to the validator set + DeliverTx []*abci.ResponseDeliverTx + EndBlock abci.ResponseEndBlock + + txs types.Txs // for reference later } func NewABCIResponses(block *types.Block) *ABCIResponses { return &ABCIResponses{ - height: block.Height, - txs: block.Data.Txs, + Height: block.Height, DeliverTx: make([]*abci.ResponseDeliverTx, block.NumTxs), + txs: block.Data.Txs, } } -// Serialize the list of validators +// Serialize the ABCIResponse func (a *ABCIResponses) Bytes() []byte { buf, n, err := new(bytes.Buffer), new(int), new(error) - wire.WriteBinary(a.EndBlock, buf, n, err) + wire.WriteBinary(*a, buf, n, err) if *err != nil { PanicCrisis(*err) } diff --git a/state/state_test.go b/state/state_test.go index a534cb695..dca83e801 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -1,8 +1,12 @@ package state import ( + "fmt" "testing" + "github.com/stretchr/testify/assert" + abci "github.com/tendermint/abci/types" + "github.com/tendermint/go-crypto" dbm "github.com/tendermint/go-db" "github.com/tendermint/tendermint/config/tendermint_test" ) @@ -40,3 +44,30 @@ func TestStateSaveLoad(t *testing.T) { t.Fatal("expected state and its copy to be identical. got %v\n expected %v\n", loadedState, state) } } + +func TestABCIResponsesSaveLoad(t *testing.T) { + assert := assert.New(t) + + config := tendermint_test.ResetConfig("state_") + stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir")) + state := GetState(config, stateDB) + + state.LastBlockHeight += 1 + + // build mock responses + block := makeBlock(2, state) + abciResponses := NewABCIResponses(block) + abciResponses.DeliverTx[0] = &abci.ResponseDeliverTx{Data: []byte("foo")} + abciResponses.DeliverTx[1] = &abci.ResponseDeliverTx{Data: []byte("bar"), Log: "ok"} + abciResponses.EndBlock = abci.ResponseEndBlock{Diffs: []*abci.Validator{ + { + PubKey: crypto.GenPrivKeyEd25519().PubKey().Bytes(), + Power: 10, + }, + }} + abciResponses.txs = nil + + state.SaveABCIResponses(abciResponses) + abciResponses2 := state.LoadABCIResponses() + assert.Equal(abciResponses, abciResponses2, fmt.Sprintf("ABCIResponses don't match: Got %v, Expected %v", abciResponses2, abciResponses)) +} From 935f70a346b2c76db8d4806d99be6660bea544a2 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 15 Apr 2017 01:33:30 -0400 Subject: [PATCH 07/14] comments and cleanup --- consensus/replay.go | 28 ++++++++++++++-------------- consensus/state.go | 13 ++++++++++--- state/execution.go | 10 +--------- state/state.go | 2 +- 4 files changed, 26 insertions(+), 27 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index 53f876084..0adb333c7 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -105,7 +105,7 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error { // and Handshake could reuse ConsensusState if it weren't for this check (since we can crash after writing ENDHEIGHT). gr, found, err := cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight)) if found { - return errors.New(Fmt("WAL should not contain height %d.", csHeight)) + return errors.New(Fmt("WAL should not contain #ENDHEIGHT %d.", csHeight)) } if gr != nil { gr.Close() @@ -114,13 +114,13 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error { // Search for last height marker gr, found, err = cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight-1)) if err == io.EOF { - log.Warn("Replay: wal.group.Search returned EOF", "height", csHeight-1) + log.Warn("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", csHeight-1) return nil } else if err != nil { return err } if !found { - return errors.New(Fmt("WAL does not contain height %d.", csHeight)) + return errors.New(Fmt("Cannot replay height %d. WAL does not contain #ENDHEIGHT for %d.", csHeight, csHeight-1)) } defer gr.Close() @@ -275,18 +275,18 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp p } else if appBlockHeight == stateBlockHeight { // We haven't run Commit (both the state and app are one block behind), - // so ApplyBlock with the real app. + // so replayBlock with the real app. // NOTE: We could instead use the cs.WAL on cs.Start, // but we'd have to allow the WAL to replay a block that wrote it's ENDHEIGHT log.Info("Replay last block using real app") - return h.replayLastBlock(storeBlockHeight, proxyApp.Consensus()) + return h.replayBlock(storeBlockHeight, proxyApp.Consensus()) } else if appBlockHeight == storeBlockHeight { - // We ran Commit, but didn't save the state, so ApplyBlock with mock app + // We ran Commit, but didn't save the state, so replayBlock with mock app abciResponses := h.state.LoadABCIResponses() mockApp := newMockProxyApp(appHash, abciResponses) log.Info("Replay last block using mock app") - return h.replayLastBlock(storeBlockHeight, mockApp) + return h.replayBlock(storeBlockHeight, mockApp) } } @@ -295,18 +295,18 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp p return nil, nil } -func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int, useReplayFunc bool) ([]byte, error) { +func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int, mutateState bool) ([]byte, error) { // App is further behind than it should be, so we need to replay blocks. // We replay all blocks from appBlockHeight+1. - // If useReplayFunc == true, stop short of the last block - // so it can be replayed using the WAL in ReplayBlocks. // Note that we don't have an old version of the state, - // so we by-pass state validation using sm.ApplyBlock. + // so we by-pass state validation/mutation using sm.ApplyBlock. + // If mutateState == true, stop short of the last block + // so it can be replayed with a real state.ApplyBlock var appHash []byte var err error finalBlock := storeBlockHeight - if useReplayFunc { + if mutateState { finalBlock -= 1 } for i := appBlockHeight + 1; i <= finalBlock; i++ { @@ -320,7 +320,7 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store h.nBlocks += 1 } - if useReplayFunc { + if mutateState { // sync the final block return h.ReplayBlocks(appHash, finalBlock, proxyApp) } @@ -329,7 +329,7 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store } // ApplyBlock on the proxyApp with the last block. -func (h *Handshaker) replayLastBlock(height int, proxyApp proxy.AppConnConsensus) ([]byte, error) { +func (h *Handshaker) replayBlock(height int, proxyApp proxy.AppConnConsensus) ([]byte, error) { mempool := types.MockMempool{} var eventCache types.Fireable // nil diff --git a/consensus/state.go b/consensus/state.go index f1423e2f2..6ff97ddc8 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1217,8 +1217,12 @@ func (cs *ConsensusState) finalizeCommit(height int) { fail.Fail() // XXX // Finish writing to the WAL for this height. - // NOTE: ConsensusState should not be started again - // until we successfully call ApplyBlock (eg. in Handshake after restart) + // NOTE: If we fail before writing this, we'll never write it, + // and just recover by running ApplyBlock in the Handshake. + // If we moved it before persisting the block, we'd have to allow + // WAL replay for blocks with an #ENDHEIGHT + // As is, ConsensusState should not be started again + // until we successfully call ApplyBlock (ie. here or in Handshake after restart) if cs.wal != nil { cs.wal.writeEndHeight(height) } @@ -1244,9 +1248,10 @@ func (cs *ConsensusState) finalizeCommit(height int) { // Fire event for new block. // NOTE: If we fail before firing, these events will never fire // - // Some options (for which they may fire more than once. I guess that's fine): + // TODO: Either // * Fire before persisting state, in ApplyBlock // * Fire on start up if we haven't written any new WAL msgs + // Both options mean we may fire more than once. Is that fine ? types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block}) types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header}) eventCache.Flush() @@ -1256,6 +1261,8 @@ func (cs *ConsensusState) finalizeCommit(height int) { // NewHeightStep! cs.updateToState(stateCopy) + fail.Fail() // XXX + // cs.StartTime is already set. // Schedule Round0 to start soon. cs.scheduleRound0(&cs.RoundState) diff --git a/state/execution.go b/state/execution.go index d978c1956..959231eed 100644 --- a/state/execution.go +++ b/state/execution.go @@ -88,19 +88,14 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo return nil, err } - fail.Fail() // XXX - // Run txs of block for _, tx := range block.Txs { - fail.FailRand(len(block.Txs)) // XXX proxyAppConn.DeliverTxAsync(tx) if err := proxyAppConn.Error(); err != nil { return nil, err } } - fail.Fail() // XXX - // End block abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(uint64(block.Height)) if err != nil { @@ -108,8 +103,6 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo return nil, err } - fail.Fail() // XXX - valDiff := abciResponses.EndBlock.Diffs log.Info("Executed block", "height", block.Height, "valid txs", validTxs, "invalid txs", invalidTxs) @@ -292,9 +285,8 @@ func (s *State) indexTxs(abciResponses *ABCIResponses) { s.TxIndexer.Batch(batch) } -// Apply and commit a block, but without all the state validation. +// Apply and commit a block on the proxyApp without validating or mutating the state // Returns the application root hash (result of abci.Commit) -// TODO handle abciResponses func ApplyBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block) ([]byte, error) { var eventCache types.Fireable // nil _, err := execBlockOnProxyApp(eventCache, appConnConsensus, block) diff --git a/state/state.go b/state/state.go index eb7159e24..7d0f24558 100644 --- a/state/state.go +++ b/state/state.go @@ -186,7 +186,7 @@ type ABCIResponses struct { DeliverTx []*abci.ResponseDeliverTx EndBlock abci.ResponseEndBlock - txs types.Txs // for reference later + txs types.Txs // reference for indexing results by hash } func NewABCIResponses(block *types.Block) *ABCIResponses { From f9d00967444e4ca4c716dec171e71ae8c892f935 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 15 Apr 2017 02:02:33 -0400 Subject: [PATCH 08/14] support #HEIGHT based WAL --- consensus/replay.go | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index 0adb333c7..f43c5ecad 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -115,12 +115,31 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error { gr, found, err = cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight-1)) if err == io.EOF { log.Warn("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", csHeight-1) - return nil + // if we upgraded from 0.9 to 0.9.1, we may have #HEIGHT instead + // TODO (0.10.0): remove this + gr, found, err = cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight)) + if err == io.EOF { + log.Warn("Replay: wal.group.Search returned EOF", "#HEIGHT", csHeight) + return nil + } else if err != nil { + return err + } } else if err != nil { return err } if !found { - return errors.New(Fmt("Cannot replay height %d. WAL does not contain #ENDHEIGHT for %d.", csHeight, csHeight-1)) + // if we upgraded from 0.9 to 0.9.1, we may have #HEIGHT instead + // TODO (0.10.0): remove this + gr, found, err = cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight)) + if err == io.EOF { + log.Warn("Replay: wal.group.Search returned EOF", "#HEIGHT", csHeight) + return nil + } else if err != nil { + return err + } + + // TODO (0.10.0): uncomment + // return errors.New(Fmt("Cannot replay height %d. WAL does not contain #ENDHEIGHT for %d.", csHeight, csHeight-1)) } defer gr.Close() From cd9e9e9f455328db4399ae191e147bdb5f79f885 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Mon, 17 Apr 2017 15:24:44 -0700 Subject: [PATCH 09/14] s/ExecBlock/ValExecBlock/g; s/sm.ApplyBlock/sm.ExecCommitBlock/g --- consensus/replay.go | 4 ++-- state/execution.go | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index f43c5ecad..441b27018 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -318,7 +318,7 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store // App is further behind than it should be, so we need to replay blocks. // We replay all blocks from appBlockHeight+1. // Note that we don't have an old version of the state, - // so we by-pass state validation/mutation using sm.ApplyBlock. + // so we by-pass state validation/mutation using sm.ExecCommitBlock. // If mutateState == true, stop short of the last block // so it can be replayed with a real state.ApplyBlock @@ -331,7 +331,7 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store for i := appBlockHeight + 1; i <= finalBlock; i++ { log.Info("Applying block", "height", i) block := h.store.LoadBlock(i) - appHash, err = sm.ApplyBlock(proxyApp.Consensus(), block) + appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block) if err != nil { return nil, err } diff --git a/state/execution.go b/state/execution.go index 959231eed..4874fdc70 100644 --- a/state/execution.go +++ b/state/execution.go @@ -16,10 +16,10 @@ import ( //-------------------------------------------------- // Execute the block -// ExecBlock executes the block, but does NOT mutate State. +// ValExecBlock executes the block, but does NOT mutate State. // + validates the block // + executes block.Txs on the proxyAppConn -func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) { +func (s *State) ValExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) { // Validate the block. if err := s.validateBlock(block); err != nil { return nil, ErrInvalidBlock(err) @@ -201,15 +201,15 @@ func (s *State) validateBlock(block *types.Block) error { } //----------------------------------------------------------------------------- -// ApplyBlock executes the block, updates state w/ ABCI responses, +// ApplyBlock validates & executes the block, updates state w/ ABCI responses, // then commits and updates the mempool atomically, then saves state. // Transaction results are optionally indexed. -// Execute and commit block against app, save block and state +// Validate, execute, and commit block against app, save block and state func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, partsHeader types.PartSetHeader, mempool types.Mempool) error { - abciResponses, err := s.ExecBlock(eventCache, proxyAppConn, block) + abciResponses, err := s.ValExecBlock(eventCache, proxyAppConn, block) if err != nil { return fmt.Errorf("Exec failed for application: %v", err) } @@ -285,9 +285,9 @@ func (s *State) indexTxs(abciResponses *ABCIResponses) { s.TxIndexer.Batch(batch) } -// Apply and commit a block on the proxyApp without validating or mutating the state +// Exec and commit a block on the proxyApp without validating or mutating the state // Returns the application root hash (result of abci.Commit) -func ApplyBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block) ([]byte, error) { +func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block) ([]byte, error) { var eventCache types.Fireable // nil _, err := execBlockOnProxyApp(eventCache, appConnConsensus, block) if err != nil { From 2ba3656ffd50213894eb189557069fd477f85652 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 17 Apr 2017 21:10:38 -0400 Subject: [PATCH 10/14] wal: gr.Close() --- consensus/replay.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index 441b27018..0ebbb735f 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -104,12 +104,12 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error { // NOTE: This is just a sanity check. As far as we know things work fine without it, // and Handshake could reuse ConsensusState if it weren't for this check (since we can crash after writing ENDHEIGHT). gr, found, err := cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight)) - if found { - return errors.New(Fmt("WAL should not contain #ENDHEIGHT %d.", csHeight)) - } if gr != nil { gr.Close() } + if found { + return errors.New(Fmt("WAL should not contain #ENDHEIGHT %d.", csHeight)) + } // Search for last height marker gr, found, err = cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight-1)) @@ -128,6 +128,7 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error { return err } if !found { + gr.Close() // if we upgraded from 0.9 to 0.9.1, we may have #HEIGHT instead // TODO (0.10.0): remove this gr, found, err = cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight)) From cb2ed5bb7c293c5dfba9b88325caf1f38dc0c46d Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 17 Apr 2017 21:14:35 -0400 Subject: [PATCH 11/14] fixes from review --- consensus/replay.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index 0ebbb735f..ab13d86f8 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -320,8 +320,7 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store // We replay all blocks from appBlockHeight+1. // Note that we don't have an old version of the state, // so we by-pass state validation/mutation using sm.ExecCommitBlock. - // If mutateState == true, stop short of the last block - // so it can be replayed with a real state.ApplyBlock + // If mutateState == true, the final block is replayed with h.replayBlock() var appHash []byte var err error @@ -342,7 +341,7 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store if mutateState { // sync the final block - return h.ReplayBlocks(appHash, finalBlock, proxyApp) + return h.replayBlock(storeBlockHeight, proxyApp.Consensus()) } return appHash, h.checkAppHash(appHash) From cf4074cc807d77f4042dff940d6710c68e427247 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Mon, 17 Apr 2017 19:40:40 -0700 Subject: [PATCH 12/14] defer gr.Close() fixes --- consensus/replay.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index ab13d86f8..bd0975f4d 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -126,9 +126,10 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error { } } else if err != nil { return err + } else { + defer gr.Close() } if !found { - gr.Close() // if we upgraded from 0.9 to 0.9.1, we may have #HEIGHT instead // TODO (0.10.0): remove this gr, found, err = cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight)) @@ -137,12 +138,13 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error { return nil } else if err != nil { return err + } else { + defer gr.Close() } // TODO (0.10.0): uncomment // return errors.New(Fmt("Cannot replay height %d. WAL does not contain #ENDHEIGHT for %d.", csHeight, csHeight-1)) } - defer gr.Close() log.Notice("Catchup by replaying consensus messages", "height", csHeight) From 29a893b193f6e94a0b10c169d110d0ae48ddcd27 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 18 Apr 2017 10:44:13 -0400 Subject: [PATCH 13/14] update comment --- state/state.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/state/state.go b/state/state.go index 7d0f24558..086b0e710 100644 --- a/state/state.go +++ b/state/state.go @@ -132,7 +132,8 @@ func (s *State) Bytes() []byte { // after running EndBlock func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, abciResponses *ABCIResponses) { - // copy the valset + // copy the valset so we can apply changes from EndBlock + // and update s.LastValidators and s.Validators prevValSet := s.Validators.Copy() nextValSet := prevValSet.Copy() From 52d03d0071c737e1048ea4bd8ef12f4cd490211c Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 18 Apr 2017 21:35:00 -0400 Subject: [PATCH 14/14] post rebase fixes --- state/execution.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/state/execution.go b/state/execution.go index 4874fdc70..0b1aff699 100644 --- a/state/execution.go +++ b/state/execution.go @@ -233,12 +233,6 @@ func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConn return fmt.Errorf("Commit failed for application: %v", err) } - batch := txindex.NewBatch(block.NumTxs) - for _, r := range txResults { - batch.Add(*r) - } - s.TxIndexer.AddBatch(batch) - fail.Fail() // XXX // save the state @@ -277,12 +271,17 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl func (s *State) indexTxs(abciResponses *ABCIResponses) { // save the tx results using the TxIndexer // NOTE: these may be overwriting, but the values should be the same. - batch := txindexer.NewBatch() + batch := txindex.NewBatch(len(abciResponses.DeliverTx)) for i, d := range abciResponses.DeliverTx { tx := abciResponses.txs[i] - batch.Index(tx.Hash(), types.TxResult{uint64(abciResponses.Height), uint32(i), *d}) + batch.Add(types.TxResult{ + Height: uint64(abciResponses.Height), + Index: uint32(i), + Tx: tx, + Result: *d, + }) } - s.TxIndexer.Batch(batch) + s.TxIndexer.AddBatch(batch) } // Exec and commit a block on the proxyApp without validating or mutating the state