
state: ApplyBlock

Branch: pull/265/head
Ethan Buchman committed 8 years ago
commit d3ae920bd0
7 changed files with 133 additions and 84 deletions

  1. consensus/race.test           +1  -0
  2. consensus/state.go            +3  -41
  3. node/node.go                  +1  -12
  4. proxy/multi_app_conn.go       +8  -5
  5. proxy/multi_app_conn_test.go  +22 -0
  6. state/execution.go            +86 -26
  7. state/state.go                +12 -0

consensus/race.test (+1 -0)

@@ -0,0 +1 @@
ok github.com/tendermint/tendermint/consensus 5.928s

consensus/state.go (+3 -41)

@@ -1265,21 +1265,9 @@ func (cs *ConsensusState) finalizeCommit(height int) {
// event cache for txs
eventCache := types.NewEventCache(cs.evsw)
// Run the block on the State:
// + update validator sets
// + run txs on the proxyAppConn
err := stateCopy.ExecBlock(eventCache, cs.proxyAppConn, block, blockParts.Header())
if err != nil {
// TODO: handle this gracefully.
PanicQ(Fmt("Exec failed for application: %v", err))
}
// lock mempool, commit state, update mempoool
err = cs.commitStateUpdateMempool(stateCopy, block)
if err != nil {
// TODO: handle this gracefully.
PanicQ(Fmt("Commit failed for application: %v", err))
}
// Execute and commit the block
// NOTE: All calls to the proxyAppConn should come here
stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool)
// txs committed, bad ones removed from mepool; fire events
// NOTE: the block.AppHash wont reflect these txs until the next block
@@ -1309,32 +1297,6 @@ func (cs *ConsensusState) finalizeCommit(height int) {
return
}
// mempool must be locked during commit and update
// because state is typically reset on Commit and old txs must be replayed
// against committed state before new txs are run in the mempool, lest they be invalid
func (cs *ConsensusState) commitStateUpdateMempool(s *sm.State, block *types.Block) error {
cs.mempool.Lock()
defer cs.mempool.Unlock()
// Commit block, get hash back
res := cs.proxyAppConn.CommitSync()
if res.IsErr() {
log.Warn("Error in proxyAppConn.CommitSync", "error", res)
return res
}
if res.Log != "" {
log.Debug("Commit.Log: " + res.Log)
}
// Set the state's new AppHash
s.AppHash = res.Data
// Update mempool.
cs.mempool.Update(block.Height, block.Txs)
return nil
}
//-----------------------------------------------------------------------------
func (cs *ConsensusState) defaultSetProposal(proposal *types.Proposal) error {
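
For orientation, here is a minimal, self-contained sketch of the sequence that ApplyBlock now owns (see the state/execution.go hunk further down). The demoBlock, demoAppConn, nopMempool, and demoState names are stand-ins invented for this sketch, not tendermint types; it only illustrates the ordering: execute the block, then commit the app and update the mempool while the mempool is locked.

    package main

    import "fmt"

    // Stand-ins for the real block, proxy app connection, mempool and state
    // types; these are illustrative only, not the tendermint APIs.
    type demoBlock struct {
        Height int
        Txs    [][]byte
    }

    type demoAppConn struct{ appHash []byte }

    func (a *demoAppConn) ExecBlock(b *demoBlock) error { return nil } // run txs against the app
    func (a *demoAppConn) CommitSync() []byte           { return a.appHash }

    type nopMempool struct{}

    func (nopMempool) Lock()                           {}
    func (nopMempool) Unlock()                         {}
    func (nopMempool) Update(height int, txs [][]byte) {}

    type demoState struct{ AppHash []byte }

    // applyBlock mirrors the order of operations in state.ApplyBlock:
    // execute the block, then, with the mempool locked, commit the app and
    // update the mempool so committed txs are not re-proposed.
    func (s *demoState) applyBlock(app *demoAppConn, b *demoBlock, mp nopMempool) error {
        if err := app.ExecBlock(b); err != nil {
            return fmt.Errorf("exec failed: %v", err)
        }
        mp.Lock()
        defer mp.Unlock()
        s.AppHash = app.CommitSync() // the new AppHash only shows up in the next block
        mp.Update(b.Height, b.Txs)
        return nil
    }

    func main() {
        s := &demoState{}
        app := &demoAppConn{appHash: []byte{0xAB}}
        if err := s.applyBlock(app, &demoBlock{Height: 1}, nopMempool{}); err != nil {
            panic(err)
        }
        fmt.Printf("AppHash after commit: %X\n", s.AppHash)
    }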


node/node.go (+1 -12)

@@ -60,7 +60,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato
stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir"))
// Get State
state := getState(config, stateDB)
state := sm.GetState(config, stateDB)
// Create the proxyApp, which manages connections (consensus, mempool, query)
proxyApp := proxy.NewAppConns(config, clientCreator, state, blockStore)
@@ -295,17 +295,6 @@ func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd255
return nodeInfo
}
// Load the most recent state from "state" db,
// or create a new one (and save) from genesis.
func getState(config cfg.Config, stateDB dbm.DB) *sm.State {
state := sm.LoadState(stateDB)
if state == nil {
state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
state.Save()
}
return state
}
//------------------------------------------------------------------------------
// Users wishing to:
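
The getState helper deleted here moves into the state package as sm.GetState (see the state/state.go hunk at the end of this diff). As a rough illustration of its load-or-create behaviour, here is a sketch with hypothetical stand-ins (fakeDB, fakeState) rather than the real dbm.DB and sm.State:

    package main

    import "fmt"

    // fakeDB and fakeState are hypothetical stand-ins for dbm.DB and sm.State;
    // only the load-or-create behaviour of GetState is shown here.
    type fakeDB map[string][]byte

    type fakeState struct{ ChainID string }

    func loadState(db fakeDB) *fakeState {
        raw, ok := db["state"]
        if !ok {
            return nil
        }
        return &fakeState{ChainID: string(raw)}
    }

    func makeGenesisState() *fakeState { return &fakeState{ChainID: "test-chain"} }

    // getState mirrors the helper that moved into the state package: load the
    // most recent saved state, or build one from genesis and save it.
    func getState(db fakeDB) *fakeState {
        s := loadState(db)
        if s == nil {
            s = makeGenesisState()
            db["state"] = []byte(s.ChainID) // save
        }
        return s
    }

    func main() {
        db := fakeDB{}
        fmt.Println(getState(db).ChainID) // first run: built from genesis and saved
        fmt.Println(getState(db).ChainID) // later runs: loaded from the db
    }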


proxy/multi_app_conn.go (+8 -5)

@@ -28,6 +28,9 @@ func NewAppConns(config cfg.Config, clientCreator ClientCreator, state State, bl
return NewMultiAppConn(config, clientCreator, state, blockStore)
}
//-----------------------------
// multiAppConn implements AppConns
// a multiAppConn is made of a few appConns (mempool, consensus, query)
// and manages their underlying tmsp clients, including the handshake
// which ensures the app and tendermint are synced.
@@ -103,8 +106,9 @@ func (app *multiAppConn) OnStart() error {
}
// TODO: retry the handshake once if it fails the first time
// ... let Info take an argument determining its behaviour
func (app *multiAppConn) Handshake() error {
// handshake is done on the query conn
// handshake is done via info request on the query conn
res, tmspInfo, blockInfo, configInfo := app.queryConn.InfoSync()
if res.IsErr() {
return fmt.Errorf("Error calling Info. Code: %v; Data: %X; Log: %s", res.Code, res.Data, res.Log)
@@ -127,12 +131,12 @@ func (app *multiAppConn) Handshake() error {
_ = tmspInfo
}
// of the last block (nil if we starting from 0)
// last block (nil if we starting from 0)
var header *types.Header
var partsHeader types.PartSetHeader
// check block
// if the blockHeight == 0, we will replay everything
// replay all blocks after blockHeight
// if blockHeight == 0, we will replay everything
if blockHeight != 0 {
blockMeta := app.blockStore.LoadBlockMeta(blockHeight)
if blockMeta == nil {
@@ -176,7 +180,6 @@ func NewTMSPClient(addr, transport string) (tmspcli.Client, error) {
var client tmspcli.Client
// use local app (for testing)
// TODO: local proxy app conn
switch addr {
case "nilapp":
app := nilapp.NewNilApplication()
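
As a small illustration of the height comparison the handshake performs, the sketch below uses plain ints in place of the Info response and the block store (blocksToReplay is a hypothetical helper, not part of the proxy package): every stored block above the app's last height is replayed, and an app at height 0 replays the whole chain.

    package main

    import "fmt"

    // blocksToReplay is a hypothetical helper showing the decision made during
    // the handshake: replay every stored height the app has not yet seen.
    func blocksToReplay(appBlockHeight, storeHeight int) []int {
        var heights []int
        for h := appBlockHeight + 1; h <= storeHeight; h++ {
            heights = append(heights, h)
        }
        return heights
    }

    func main() {
        fmt.Println(blocksToReplay(0, 3)) // fresh app: [1 2 3], i.e. replay everything
        fmt.Println(blocksToReplay(2, 3)) // app one block behind: [3]
        fmt.Println(blocksToReplay(3, 3)) // app in sync: [], nothing to replay
    }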


proxy/multi_app_conn_test.go (+22 -0)

@@ -0,0 +1,22 @@
package proxy
import (
"testing"
"time"
"github.com/tendermint/go-p2p"
"github.com/tendermint/tendermint/config/tendermint_test"
"github.com/tendermint/tendermint/node"
"github.com/tendermint/tendermint/types"
)
func TestPersistence(t *testing.T) {
// create persistent dummy app
// set state on dummy app
// proxy handshake
config := tendermint_test.ResetConfig("proxy_test_")
multiApp := NewMultiAppConn(config, state, blockStore)
}
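
The new test file is only a skeleton so far. As a hedged sketch of what it is aiming at, the following uses in-memory stand-ins (fakeApp, replay) instead of the real persistent dummy app and proxy handshake: an app that keeps its committed state across a restart should end up back at the stored chain height once the missing blocks are replayed.

    package proxy

    import "testing"

    // fakeApp stands in for a persistent TMSP app: committed survives a "restart".
    type fakeApp struct {
        committed [][]byte
    }

    func (a *fakeApp) deliver(tx []byte) { a.committed = append(a.committed, tx) }
    func (a *fakeApp) height() int       { return len(a.committed) }

    // replay brings a restarted app back up to the stored chain height,
    // as the proxy handshake does with real blocks.
    func replay(app *fakeApp, chain [][]byte) {
        for i := app.height(); i < len(chain); i++ {
            app.deliver(chain[i])
        }
    }

    func TestPersistenceSketch(t *testing.T) {
        chain := [][]byte{[]byte("tx1"), []byte("tx2"), []byte("tx3")}

        // the app committed only the first block before "crashing"
        app := &fakeApp{}
        app.deliver(chain[0])

        // "restart" plus handshake: the missing blocks get replayed
        replay(app, chain)

        if app.height() != len(chain) {
            t.Fatalf("expected height %d after replay, got %d", len(chain), app.height())
        }
    }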

state/execution.go (+86 -26)

@@ -181,7 +181,58 @@ func (txErr InvalidTxError) Error() string {
//-----------------------------------------------------------------------------
// Replay all blocks after blockHeight and ensure the result matches the current state
// mempool must be locked during commit and update
// because state is typically reset on Commit and old txs must be replayed
// against committed state before new txs are run in the mempool, lest they be invalid
func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, block *types.Block, mempool Mempool) error {
mempool.Lock()
defer mempool.Unlock()
// flush out any CheckTx that have already started
// cs.proxyAppConn.FlushSync() // ?! XXX
// Commit block, get hash back
res := proxyAppConn.CommitSync()
if res.IsErr() {
log.Warn("Error in proxyAppConn.CommitSync", "error", res)
return res
}
if res.Log != "" {
log.Debug("Commit.Log: " + res.Log)
}
// Set the state's new AppHash
s.AppHash = res.Data
// Update mempool.
mempool.Update(block.Height, block.Txs)
return nil
}
// Execute and commit block against app, save block and state
func (s *State) ApplyBlock(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus,
block *types.Block, partsHeader types.PartSetHeader, mempool Mempool) {
// Run the block on the State:
// + update validator sets
// + run txs on the proxyAppConn
err := s.ExecBlock(eventCache, proxyAppConn, block, partsHeader)
if err != nil {
// TODO: handle this gracefully.
PanicQ(Fmt("Exec failed for application: %v", err))
}
// lock mempool, commit state, update mempoool
err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool)
if err != nil {
// TODO: handle this gracefully.
PanicQ(Fmt("Commit failed for application: %v", err))
}
}
// Replay all blocks after blockHeight and ensure the result matches the current state.
// XXX: blockStore must guarantee to have blocks for height <= blockStore.Height()
func (s *State) ReplayBlocks(header *types.Header, partsHeader types.PartSetHeader,
appConnConsensus proxy.AppConnConsensus, blockStore proxy.BlockStore) error {
@@ -197,36 +248,16 @@ func (s *State) ReplayBlocks(header *types.Header, partsHeader types.PartSetHead
stateCopy.SetBlockAndValidators(header, partsHeader, lastVals, nextVals)
}
// run the transactions
var eventCache events.Fireable // nil
// replay all blocks starting with blockHeight+1
for i := blockHeight + 1; i <= blockStore.Height(); i++ {
blockMeta := blockStore.LoadBlockMeta(i)
if blockMeta == nil {
PanicSanity(Fmt("Nil blockMeta at height %d when blockStore height is %d", i, blockStore.Height()))
}
block := blockStore.LoadBlock(i)
if block == nil {
PanicSanity(Fmt("Nil block at height %d when blockStore height is %d", i, blockStore.Height()))
}
// run the transactions
var eventCache events.Fireable // nil
err := stateCopy.ExecBlock(eventCache, appConnConsensus, block, blockMeta.PartsHeader)
if err != nil {
return fmt.Errorf("Error on ExecBlock: %v", err)
}
panicOnNilBlock(i, blockStore.Height(), block, blockMeta) // XXX
// commit the block (app should save the state)
res := appConnConsensus.CommitSync()
if res.IsErr() {
return fmt.Errorf("Error on Commit: %v", res)
}
if res.Log != "" {
log.Debug("Commit.Log: " + res.Log)
}
// update the state hash
stateCopy.AppHash = res.Data
stateCopy.ApplyBlock(eventCache, appConnConsensus, block, blockMeta.PartsHeader, mockMempool{})
}
// The computed state and the previously set state should be identical
@@ -235,3 +266,32 @@ func (s *State) ReplayBlocks(header *types.Header, partsHeader types.PartSetHead
}
return nil
}
func panicOnNilBlock(height, bsHeight int, block *types.Block, blockMeta *types.BlockMeta) {
if block == nil || blockMeta == nil {
// Sanity?
PanicCrisis(Fmt(`
block/blockMeta is nil for height <= blockStore.Height() (%d <= %d).
Block: %v,
BlockMeta: %v
`, height, bsHeight, block, blockMeta))
}
}
//------------------------------------------------
// Updates to the mempool need to be synchronized with committing a block
// so apps can reset their transient state on Commit
type Mempool interface {
Lock()
Unlock()
Update(height int, txs []types.Tx)
}
type mockMempool struct {
}
func (m mockMempool) Lock() {}
func (m mockMempool) Unlock() {}
func (m mockMempool) Update(height int, txs []types.Tx) {}
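
To make the contract of this Mempool interface concrete, below is a hypothetical in-memory implementation (inMemMempool is invented for this sketch, not the real tendermint mempool). Lock and Unlock are held by the caller around Commit so no CheckTx runs against state the app is about to reset, and Update drops the just-committed txs so they are not proposed again; the replay path above can use mockMempool precisely because there is no live mempool to synchronize during replay.

    package main

    import (
        "fmt"
        "sync"
    )

    type tx []byte // stand-in for types.Tx

    // inMemMempool is a hypothetical Mempool implementation, for illustration only.
    type inMemMempool struct {
        mtx  sync.Mutex
        pool []tx
    }

    // Lock/Unlock are held by the caller around Commit, so no CheckTx can run
    // against state the app is about to reset.
    func (m *inMemMempool) Lock()   { m.mtx.Lock() }
    func (m *inMemMempool) Unlock() { m.mtx.Unlock() }

    // Update removes txs committed in the block at the given height; the caller
    // already holds the lock.
    func (m *inMemMempool) Update(height int, committed []tx) {
        seen := make(map[string]bool, len(committed))
        for _, t := range committed {
            seen[string(t)] = true
        }
        kept := m.pool[:0]
        for _, t := range m.pool {
            if !seen[string(t)] {
                kept = append(kept, t)
            }
        }
        m.pool = kept
        fmt.Printf("height %d: txs left in mempool: %d\n", height, len(m.pool))
    }

    func main() {
        m := &inMemMempool{pool: []tx{tx("a"), tx("b"), tx("c")}}
        m.Lock()
        // ... proxyAppConn.CommitSync() would run here ...
        m.Update(1, []tx{tx("a"), tx("c")})
        m.Unlock()
    }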

state/state.go (+12 -0)

@@ -7,6 +7,7 @@ import (
"time"
. "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
dbm "github.com/tendermint/go-db"
"github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/types"
@@ -95,6 +96,17 @@ func (s *State) GetValidators() (*types.ValidatorSet, *types.ValidatorSet) {
return s.LastValidators, s.Validators
}
// Load the most recent state from "state" db,
// or create a new one (and save) from genesis.
func GetState(config cfg.Config, stateDB dbm.DB) *State {
state := LoadState(stateDB)
if state == nil {
state = MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
state.Save()
}
return state
}
//-----------------------------------------------------------------------------
// Genesis

