From d3ae920bd0c40be9d88f06ac5ef26d5b0711e624 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 25 Aug 2016 00:18:03 -0400 Subject: [PATCH] state: ApplyBlock --- consensus/race.test | 1 + consensus/state.go | 44 +------------- node/node.go | 13 +--- proxy/multi_app_conn.go | 13 ++-- proxy/multi_app_conn_test.go | 22 +++++++ state/execution.go | 112 +++++++++++++++++++++++++++-------- state/state.go | 12 ++++ 7 files changed, 133 insertions(+), 84 deletions(-) create mode 100644 consensus/race.test create mode 100644 proxy/multi_app_conn_test.go diff --git a/consensus/race.test b/consensus/race.test new file mode 100644 index 000000000..46231439d --- /dev/null +++ b/consensus/race.test @@ -0,0 +1 @@ +ok github.com/tendermint/tendermint/consensus 5.928s diff --git a/consensus/state.go b/consensus/state.go index e5580f192..d26a3ed25 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1265,21 +1265,9 @@ func (cs *ConsensusState) finalizeCommit(height int) { // event cache for txs eventCache := types.NewEventCache(cs.evsw) - // Run the block on the State: - // + update validator sets - // + run txs on the proxyAppConn - err := stateCopy.ExecBlock(eventCache, cs.proxyAppConn, block, blockParts.Header()) - if err != nil { - // TODO: handle this gracefully. - PanicQ(Fmt("Exec failed for application: %v", err)) - } - - // lock mempool, commit state, update mempoool - err = cs.commitStateUpdateMempool(stateCopy, block) - if err != nil { - // TODO: handle this gracefully. - PanicQ(Fmt("Commit failed for application: %v", err)) - } + // Execute and commit the block + // NOTE: All calls to the proxyAppConn should come here + stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool) // txs committed, bad ones removed from mepool; fire events // NOTE: the block.AppHash wont reflect these txs until the next block @@ -1309,32 +1297,6 @@ func (cs *ConsensusState) finalizeCommit(height int) { return } -// mempool must be locked during commit and update -// because state is typically reset on Commit and old txs must be replayed -// against committed state before new txs are run in the mempool, lest they be invalid -func (cs *ConsensusState) commitStateUpdateMempool(s *sm.State, block *types.Block) error { - cs.mempool.Lock() - defer cs.mempool.Unlock() - - // Commit block, get hash back - res := cs.proxyAppConn.CommitSync() - if res.IsErr() { - log.Warn("Error in proxyAppConn.CommitSync", "error", res) - return res - } - if res.Log != "" { - log.Debug("Commit.Log: " + res.Log) - } - - // Set the state's new AppHash - s.AppHash = res.Data - - // Update mempool. - cs.mempool.Update(block.Height, block.Txs) - - return nil -} - //----------------------------------------------------------------------------- func (cs *ConsensusState) defaultSetProposal(proposal *types.Proposal) error { diff --git a/node/node.go b/node/node.go index b34d0e037..8ca6662a6 100644 --- a/node/node.go +++ b/node/node.go @@ -60,7 +60,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir")) // Get State - state := getState(config, stateDB) + state := sm.GetState(config, stateDB) // Create the proxyApp, which manages connections (consensus, mempool, query) proxyApp := proxy.NewAppConns(config, clientCreator, state, blockStore) @@ -295,17 +295,6 @@ func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd255 return nodeInfo } -// Load the most recent state from "state" db, -// or create a new one (and save) from genesis. -func getState(config cfg.Config, stateDB dbm.DB) *sm.State { - state := sm.LoadState(stateDB) - if state == nil { - state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) - state.Save() - } - return state -} - //------------------------------------------------------------------------------ // Users wishing to: diff --git a/proxy/multi_app_conn.go b/proxy/multi_app_conn.go index cfd827853..fc8d1df2b 100644 --- a/proxy/multi_app_conn.go +++ b/proxy/multi_app_conn.go @@ -28,6 +28,9 @@ func NewAppConns(config cfg.Config, clientCreator ClientCreator, state State, bl return NewMultiAppConn(config, clientCreator, state, blockStore) } +//----------------------------- +// multiAppConn implements AppConns + // a multiAppConn is made of a few appConns (mempool, consensus, query) // and manages their underlying tmsp clients, including the handshake // which ensures the app and tendermint are synced. @@ -103,8 +106,9 @@ func (app *multiAppConn) OnStart() error { } // TODO: retry the handshake once if it fails the first time +// ... let Info take an argument determining its behaviour func (app *multiAppConn) Handshake() error { - // handshake is done on the query conn + // handshake is done via info request on the query conn res, tmspInfo, blockInfo, configInfo := app.queryConn.InfoSync() if res.IsErr() { return fmt.Errorf("Error calling Info. Code: %v; Data: %X; Log: %s", res.Code, res.Data, res.Log) @@ -127,12 +131,12 @@ func (app *multiAppConn) Handshake() error { _ = tmspInfo } - // of the last block (nil if we starting from 0) + // last block (nil if we starting from 0) var header *types.Header var partsHeader types.PartSetHeader - // check block - // if the blockHeight == 0, we will replay everything + // replay all blocks after blockHeight + // if blockHeight == 0, we will replay everything if blockHeight != 0 { blockMeta := app.blockStore.LoadBlockMeta(blockHeight) if blockMeta == nil { @@ -176,7 +180,6 @@ func NewTMSPClient(addr, transport string) (tmspcli.Client, error) { var client tmspcli.Client // use local app (for testing) - // TODO: local proxy app conn switch addr { case "nilapp": app := nilapp.NewNilApplication() diff --git a/proxy/multi_app_conn_test.go b/proxy/multi_app_conn_test.go new file mode 100644 index 000000000..3ff2520f6 --- /dev/null +++ b/proxy/multi_app_conn_test.go @@ -0,0 +1,22 @@ +package proxy + +import ( + "testing" + "time" + + "github.com/tendermint/go-p2p" + "github.com/tendermint/tendermint/config/tendermint_test" + "github.com/tendermint/tendermint/node" + "github.com/tendermint/tendermint/types" +) + +func TestPersistence(t *testing.T) { + + // create persistent dummy app + // set state on dummy app + // proxy handshake + + config := tendermint_test.ResetConfig("proxy_test_") + multiApp := NewMultiAppConn(config, state, blockStore) + +} diff --git a/state/execution.go b/state/execution.go index f5dcadae9..f37ce9b42 100644 --- a/state/execution.go +++ b/state/execution.go @@ -181,7 +181,58 @@ func (txErr InvalidTxError) Error() string { //----------------------------------------------------------------------------- -// Replay all blocks after blockHeight and ensure the result matches the current state +// mempool must be locked during commit and update +// because state is typically reset on Commit and old txs must be replayed +// against committed state before new txs are run in the mempool, lest they be invalid +func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, block *types.Block, mempool Mempool) error { + mempool.Lock() + defer mempool.Unlock() + + // flush out any CheckTx that have already started + // cs.proxyAppConn.FlushSync() // ?! XXX + + // Commit block, get hash back + res := proxyAppConn.CommitSync() + if res.IsErr() { + log.Warn("Error in proxyAppConn.CommitSync", "error", res) + return res + } + if res.Log != "" { + log.Debug("Commit.Log: " + res.Log) + } + + // Set the state's new AppHash + s.AppHash = res.Data + + // Update mempool. + mempool.Update(block.Height, block.Txs) + + return nil +} + +// Execute and commit block against app, save block and state +func (s *State) ApplyBlock(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, + block *types.Block, partsHeader types.PartSetHeader, mempool Mempool) { + + // Run the block on the State: + // + update validator sets + // + run txs on the proxyAppConn + err := s.ExecBlock(eventCache, proxyAppConn, block, partsHeader) + if err != nil { + // TODO: handle this gracefully. + PanicQ(Fmt("Exec failed for application: %v", err)) + } + + // lock mempool, commit state, update mempoool + err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool) + if err != nil { + // TODO: handle this gracefully. + PanicQ(Fmt("Commit failed for application: %v", err)) + } +} + +// Replay all blocks after blockHeight and ensure the result matches the current state. +// XXX: blockStore must guarantee to have blocks for height <= blockStore.Height() func (s *State) ReplayBlocks(header *types.Header, partsHeader types.PartSetHeader, appConnConsensus proxy.AppConnConsensus, blockStore proxy.BlockStore) error { @@ -197,36 +248,16 @@ func (s *State) ReplayBlocks(header *types.Header, partsHeader types.PartSetHead stateCopy.SetBlockAndValidators(header, partsHeader, lastVals, nextVals) } + // run the transactions + var eventCache events.Fireable // nil + // replay all blocks starting with blockHeight+1 for i := blockHeight + 1; i <= blockStore.Height(); i++ { blockMeta := blockStore.LoadBlockMeta(i) - if blockMeta == nil { - PanicSanity(Fmt("Nil blockMeta at height %d when blockStore height is %d", i, blockStore.Height())) - } - block := blockStore.LoadBlock(i) - if block == nil { - PanicSanity(Fmt("Nil block at height %d when blockStore height is %d", i, blockStore.Height())) - } - - // run the transactions - var eventCache events.Fireable // nil - err := stateCopy.ExecBlock(eventCache, appConnConsensus, block, blockMeta.PartsHeader) - if err != nil { - return fmt.Errorf("Error on ExecBlock: %v", err) - } + panicOnNilBlock(i, blockStore.Height(), block, blockMeta) // XXX - // commit the block (app should save the state) - res := appConnConsensus.CommitSync() - if res.IsErr() { - return fmt.Errorf("Error on Commit: %v", res) - } - if res.Log != "" { - log.Debug("Commit.Log: " + res.Log) - } - - // update the state hash - stateCopy.AppHash = res.Data + stateCopy.ApplyBlock(eventCache, appConnConsensus, block, blockMeta.PartsHeader, mockMempool{}) } // The computed state and the previously set state should be identical @@ -235,3 +266,32 @@ func (s *State) ReplayBlocks(header *types.Header, partsHeader types.PartSetHead } return nil } + +func panicOnNilBlock(height, bsHeight int, block *types.Block, blockMeta *types.BlockMeta) { + if block == nil || blockMeta == nil { + // Sanity? + PanicCrisis(Fmt(` +block/blockMeta is nil for height <= blockStore.Height() (%d <= %d). +Block: %v, +BlockMeta: %v +`, height, bsHeight, block, blockMeta)) + + } +} + +//------------------------------------------------ +// Updates to the mempool need to be synchronized with committing a block +// so apps can reset their transient state on Commit + +type Mempool interface { + Lock() + Unlock() + Update(height int, txs []types.Tx) +} + +type mockMempool struct { +} + +func (m mockMempool) Lock() {} +func (m mockMempool) Unlock() {} +func (m mockMempool) Update(height int, txs []types.Tx) {} diff --git a/state/state.go b/state/state.go index 699f612e2..289c7a4d8 100644 --- a/state/state.go +++ b/state/state.go @@ -7,6 +7,7 @@ import ( "time" . "github.com/tendermint/go-common" + cfg "github.com/tendermint/go-config" dbm "github.com/tendermint/go-db" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" @@ -95,6 +96,17 @@ func (s *State) GetValidators() (*types.ValidatorSet, *types.ValidatorSet) { return s.LastValidators, s.Validators } +// Load the most recent state from "state" db, +// or create a new one (and save) from genesis. +func GetState(config cfg.Config, stateDB dbm.DB) *State { + state := LoadState(stateDB) + if state == nil { + state = MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) + state.Save() + } + return state +} + //----------------------------------------------------------------------------- // Genesis