Browse Source

state: BlockExecutor

pull/1015/head
Ethan Buchman 7 years ago
parent
commit
9e6d088757
3 changed files with 104 additions and 74 deletions
  1. +3
    -3
      state/db.go
  2. +90
    -64
      state/execution.go
  3. +11
    -7
      state/state.go

+ 3
- 3
state/db.go View File

@ -18,11 +18,11 @@ func GetState(stateDB dbm.DB, genesisFile string) (*State, error) {
state := LoadState(stateDB)
if state == nil {
var err error
state, err = MakeGenesisStateFromFile(stateDB, genesisFile)
state, err = MakeGenesisStateFromFile(genesisFile)
if err != nil {
return nil, err
}
state.Save()
state.Save(stateDB, state.AppHash)
}
return state, nil
@ -39,7 +39,7 @@ func loadState(db dbm.DB, key []byte) *State {
return nil
}
s := &State{db: db}
s := new(State)
r, n, err := bytes.NewReader(buf), new(int), new(error)
wire.ReadBinaryPtr(&s, r, 0, n, err)
if *err != nil {


+ 90
- 64
state/execution.go View File

@ -10,23 +10,22 @@ import (
crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log"
)
//--------------------------------------------------
// Execute the block
// ValExecBlock executes the block, but does NOT mutate State.
// ValExecBlock executes the block and returns the responses. It does NOT mutate State.
// + validates the block
// + executes block.Txs on the proxyAppConn
func (s *State) ValExecBlock(txEventPublisher types.TxEventPublisher, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) {
// Validate the block.
func (blockExec *BlockExecutor) ValExecBlock(s State, block *types.Block) (*ABCIResponses, error) {
if err := s.validateBlock(block); err != nil {
return nil, ErrInvalidBlock(err)
}
// Execute the block txs
abciResponses, err := execBlockOnProxyApp(txEventPublisher, proxyAppConn, block, s.logger, s.LastValidators)
abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block)
if err != nil {
// There was some error in proxyApp
// TODO Report error and wait for proxyApp to be available.
@ -38,8 +37,7 @@ func (s *State) ValExecBlock(txEventPublisher types.TxEventPublisher, proxyAppCo
// Executes block's transactions on proxyAppConn.
// Returns a list of transaction results and updates to the validator set
// TODO: Generate a bitmap or otherwise store tx validity in state.
func execBlockOnProxyApp(txEventPublisher types.TxEventPublisher, proxyAppConn proxy.AppConnConsensus, block *types.Block, logger log.Logger, lastValidators *types.ValidatorSet) (*ABCIResponses, error) {
func execBlockOnProxyApp(logger log.Logger, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) {
var validTxs, invalidTxs = 0, 0
txIndex := 0
@ -59,17 +57,6 @@ func execBlockOnProxyApp(txEventPublisher types.TxEventPublisher, proxyAppConn p
logger.Debug("Invalid tx", "code", txRes.Code, "log", txRes.Log)
invalidTxs++
}
// NOTE: if we count we can access the tx from the block instead of
// pulling it from the req
tx := types.Tx(req.GetDeliverTx().Tx)
txEventPublisher.PublishEventTx(types.EventDataTx{types.TxResult{
Height: block.Height,
Index: uint32(txIndex),
Tx: tx,
Result: *txRes,
}})
abciResponses.DeliverTx[txIndex] = txRes
txIndex++
}
@ -296,41 +283,52 @@ func (s State) validateBlock(b *types.Block) error {
}
for _, ev := range b.Evidence.Evidence {
if _, err := VerifyEvidence(s, ev); err != nil {
if err := VerifyEvidence(s, ev); err != nil {
return types.NewEvidenceInvalidErr(ev, err)
}
/* // Needs a db ...
valset, err := LoadValidators(s.db, ev.Height())
if err != nil {
// XXX/TODO: what do we do if we can't load the valset?
// eg. if we have pruned the state or height is too high?
return err
}
if err := VerifyEvidenceValidator(valSet, ev); err != nil {
return types.NewEvidenceInvalidErr(ev, err)
}
*/
}
return nil
}
// XXX: What's cheaper (ie. what should be checked first):
// evidence internal validity (ie. sig checks) or validator existed (fetch historical val set from db)
// VerifyEvidence verifies the evidence fully by checking it is internally
// consistent and corresponds to an existing or previous validator.
// It returns the priority of this evidence, or an error.
// NOTE: return error may be ErrNoValSetForHeight, in which case the validator set
// for the evidence height could not be loaded.
func (s State) VerifyEvidence(evidence types.Evidence) (priority int64, err error) {
// consistent and sufficiently recent.
func VerifyEvidence(s State, evidence types.Evidence) error {
height := s.LastBlockHeight
evidenceAge := height - evidence.Height()
maxAge := s.ConsensusParams.EvidenceParams.MaxAge
if evidenceAge > maxAge {
return priority, fmt.Errorf("Evidence from height %d is too old. Min height is %d",
return fmt.Errorf("Evidence from height %d is too old. Min height is %d",
evidence.Height(), height-maxAge)
}
if err := evidence.Verify(s.ChainID); err != nil {
return priority, err
return err
}
return nil
}
// VerifyEvidenceValidator returns the voting power of the validator at the height of the evidence.
// It returns an error if the validator did not exist or does not match that loaded from the historical validator set.
func VerifyEvidenceValidator(valset *types.ValidatorSet, evidence types.Evidence) (priority int64, err error) {
// The address must have been an active validator at the height
ev := evidence
height, addr, idx := ev.Height(), ev.Address(), ev.Index()
valset, err := LoadValidators(s.db, height)
if err != nil {
// XXX/TODO: what do we do if we can't load the valset?
// eg. if we have pruned the state or height is too high?
return priority, err
}
valIdx, val := valset.GetByAddress(addr)
if val == nil {
return priority, fmt.Errorf("Address %X was not a validator at height %d", addr, height)
@ -348,6 +346,9 @@ func (s State) VerifyEvidence(evidence types.Evidence) (priority int64, err erro
// BlockExecutor provides the context and accessories for properly executing a block.
type BlockExecutor struct {
db dbm.DB
logger log.Logger
txEventPublisher types.TxEventPublisher
proxyApp proxy.AppConnConsensus
@ -355,81 +356,106 @@ type BlockExecutor struct {
evpool types.EvidencePool
}
func NewBlockExecutor(db dbm.DB, logger log.Logger, txEventer types.TxEventPublisher, proxyApp proxy.AppConnConsensus,
mempool types.Mempool, evpool types.EvidencePool) *BlockExecutor {
return &BlockExecutor{
db,
logger,
txEventer,
proxyApp,
mempool,
evpool,
}
}
// ApplyBlock validates the block against the state, executes it against the app,
// commits it, and saves the block and state. It's the only function that needs to be called
// from outside this package to process and commit an entire block.
func (s *State) ApplyBlock(txEventPublisher types.TxEventPublisher, proxyAppConn proxy.AppConnConsensus,
block *types.Block, partsHeader types.PartSetHeader,
mempool types.Mempool, evpool types.EvidencePool) error {
// It takes a blockID to avoid recomputing the parts hash.
func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block *types.Block) (State, error) {
abciResponses, err := s.ValExecBlock(txEventPublisher, proxyAppConn, block)
abciResponses, err := blockExec.ValExecBlock(s, block)
if err != nil {
return fmt.Errorf("Exec failed for application: %v", err)
}
return s, fmt.Errorf("Exec failed for application: %v", err)
}
// TODO: Fire events
/*
tx := types.Tx(req.GetDeliverTx().Tx)
txEventPublisher.PublishEventTx(types.EventDataTx{types.TxResult{
Height: block.Height,
Index: uint32(txIndex),
Tx: tx,
Result: *txRes,
}})
*/
fail.Fail() // XXX
// save the results before we commit
SaveABCIResponses(s.db, block.Height, abciResponses)
SaveABCIResponses(blockExec.db, block.Height, abciResponses)
fail.Fail() // XXX
// now update the block and validators
err = s.SetBlockAndValidators(block.Header, partsHeader, abciResponses)
// update the state with the block and responses
s, err = s.NextState(blockID, block.Header, abciResponses)
if err != nil {
return fmt.Errorf("Commit failed for application: %v", err)
return s, fmt.Errorf("Commit failed for application: %v", err)
}
// lock mempool, commit state, update mempoool
err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool)
appHash, err := blockExec.Commit(block)
if err != nil {
return fmt.Errorf("Commit failed for application: %v", err)
return s, fmt.Errorf("Commit failed for application: %v", err)
}
fail.Fail() // XXX
evpool.MarkEvidenceAsCommitted(block.Evidence.Evidence)
// save the state and the validators
s.Save()
s.Save(blockExec.db, appHash)
return nil
return s, nil
}
// CommitStateUpdateMempool locks the mempool, runs the ABCI Commit message, and updates the mempool.
// Commit locks the mempool, runs the ABCI Commit message, and updates the mempool.
// It returns the result of calling abci.Commit (the AppHash), and an error.
// The Mempool must be locked during commit and update because state is typically reset on Commit and old txs must be replayed
// against committed state before new txs are run in the mempool, lest they be invalid.
func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, block *types.Block, mempool types.Mempool) error {
mempool.Lock()
defer mempool.Unlock()
func (blockExec *BlockExecutor) Commit(block *types.Block) ([]byte, error) {
blockExec.mempool.Lock()
defer blockExec.mempool.Unlock()
// Commit block, get hash back
res, err := proxyAppConn.CommitSync()
res, err := blockExec.proxyApp.CommitSync()
if err != nil {
s.logger.Error("Client error during proxyAppConn.CommitSync", "err", err)
return err
blockExec.logger.Error("Client error during proxyAppConn.CommitSync", "err", err)
return nil, err
}
if res.IsErr() {
s.logger.Error("Error in proxyAppConn.CommitSync", "err", res)
return res
blockExec.logger.Error("Error in proxyAppConn.CommitSync", "err", res)
return nil, res
}
if res.Log != "" {
s.logger.Debug("Commit.Log: " + res.Log)
blockExec.logger.Debug("Commit.Log: " + res.Log)
}
s.logger.Info("Committed state", "height", block.Height, "txs", block.NumTxs, "hash", res.Data)
blockExec.logger.Info("Committed state", "height", block.Height, "txs", block.NumTxs, "hash", res.Data)
// Set the state's new AppHash
s.AppHash = res.Data
// Update evpool
blockExec.evpool.MarkEvidenceAsCommitted(block.Evidence.Evidence)
// Update mempool.
return mempool.Update(block.Height, block.Txs)
if err := blockExec.mempool.Update(block.Height, block.Txs); err != nil {
return nil, err
}
return res.Data, nil
}
// ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state.
// It returns the application root hash (result of abci.Commit).
func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block, logger log.Logger, lastValidators *types.ValidatorSet) ([]byte, error) {
_, err := execBlockOnProxyApp(types.NopEventBus{}, appConnConsensus, block, logger, lastValidators)
func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block, logger log.Logger) ([]byte, error) {
_, err := execBlockOnProxyApp(logger, appConnConsensus, block)
if err != nil {
logger.Error("Error executing block on proxy app", "height", block.Height, "err", err)
return nil, err


+ 11
- 7
state/state.go View File

@ -73,7 +73,7 @@ type State struct {
// Copy makes a copy of the State for mutating.
func (s State) Copy() State {
return &State{
return State{
ChainID: s.ChainID,
LastBlockHeight: s.LastBlockHeight,
@ -95,7 +95,9 @@ func (s State) Copy() State {
}
// Save persists the State, the ValidatorsInfo, and the ConsensusParamsInfo to the database.
func (s State) Save(db dbm.DB) {
// It sets the given appHash on the state before persisting.
func (s State) Save(db dbm.DB, appHash []byte) {
s.AppHash = appHash
nextHeight := s.LastBlockHeight + 1
saveValidatorsInfo(db, nextHeight, s.LastHeightValidatorsChanged, s.Validators)
saveConsensusParamsInfo(db, nextHeight, s.LastHeightConsensusParamsChanged, s.ConsensusParams)
@ -113,7 +115,7 @@ func (s State) Bytes() []byte {
}
// NextState returns a new State updated according to the header and responses.
func (s State) NextState(header *types.Header, blockPartsHeader types.PartSetHeader,
func (s State) NextState(blockID types.BlockID, header *types.Header,
abciResponses *ABCIResponses) (State, error) {
// copy the valset so we can apply changes from EndBlock
@ -126,7 +128,7 @@ func (s State) NextState(header *types.Header, blockPartsHeader types.PartSetHea
if len(abciResponses.EndBlock.ValidatorUpdates) > 0 {
err := updateValidators(nextValSet, abciResponses.EndBlock.ValidatorUpdates)
if err != nil {
return fmt.Errorf("Error changing validator set: %v", err)
return s, fmt.Errorf("Error changing validator set: %v", err)
}
// change results from this height but only applies to the next height
lastHeightValsChanged = header.Height + 1
@ -143,17 +145,19 @@ func (s State) NextState(header *types.Header, blockPartsHeader types.PartSetHea
nextParams = s.ConsensusParams.Update(abciResponses.EndBlock.ConsensusParamUpdates)
err := nextParams.Validate()
if err != nil {
return fmt.Errorf("Error updating consensus params: %v", err)
return s, fmt.Errorf("Error updating consensus params: %v", err)
}
// change results from this height but only applies to the next height
lastHeightParamsChanged = header.Height + 1
}
// NOTE: the AppHash has not been populated.
// It will be filled on state.Save.
return State{
ChainID: s.ChainID,
LastBlockHeight: header.Height,
LastBlockTotalTx: s.LastBlockTotalTx + header.NumTxs,
LastBlockID: types.BlockID{header.Hash(), blockPartsHeader},
LastBlockID: blockID,
LastBlockTime: header.Time,
Validators: nextValSet,
LastValidators: s.Validators.Copy(),
@ -162,7 +166,7 @@ func (s State) NextState(header *types.Header, blockPartsHeader types.PartSetHea
LastHeightConsensusParamsChanged: lastHeightParamsChanged,
LastResultsHash: abciResponses.ResultsHash(),
AppHash: nil,
}
}, nil
}
// GetValidators returns the last and current validator sets.


Loading…
Cancel
Save