You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

488 lines
16 KiB

7 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
7 years ago
7 years ago
8 years ago
8 years ago
  1. package state
  2. import (
  3. "fmt"
  4. "time"
  5. abci "github.com/tendermint/tendermint/abci/types"
  6. dbm "github.com/tendermint/tendermint/libs/db"
  7. "github.com/tendermint/tendermint/libs/fail"
  8. "github.com/tendermint/tendermint/libs/log"
  9. "github.com/tendermint/tendermint/proxy"
  10. "github.com/tendermint/tendermint/types"
  11. )
  12. //-----------------------------------------------------------------------------
  13. // BlockExecutor handles block execution and state updates.
  14. // It exposes ApplyBlock(), which validates & executes the block, updates state w/ ABCI responses,
  15. // then commits and updates the mempool atomically, then saves state.
  16. // BlockExecutor provides the context and accessories for properly executing a block.
  17. type BlockExecutor struct {
  18. // save state, validators, consensus params, abci responses here
  19. db dbm.DB
  20. // execute the app against this
  21. proxyApp proxy.AppConnConsensus
  22. // events
  23. eventBus types.BlockEventPublisher
  24. // manage the mempool lock during commit
  25. // and update both with block results after commit.
  26. mempool Mempool
  27. evpool EvidencePool
  28. logger log.Logger
  29. metrics *Metrics
  30. }
  31. type BlockExecutorOption func(executor *BlockExecutor)
  32. func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption {
  33. return func(blockExec *BlockExecutor) {
  34. blockExec.metrics = metrics
  35. }
  36. }
  37. // NewBlockExecutor returns a new BlockExecutor with a NopEventBus.
  38. // Call SetEventBus to provide one.
  39. func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus, mempool Mempool, evpool EvidencePool, options ...BlockExecutorOption) *BlockExecutor {
  40. res := &BlockExecutor{
  41. db: db,
  42. proxyApp: proxyApp,
  43. eventBus: types.NopEventBus{},
  44. mempool: mempool,
  45. evpool: evpool,
  46. logger: logger,
  47. metrics: NopMetrics(),
  48. }
  49. for _, option := range options {
  50. option(res)
  51. }
  52. return res
  53. }
  54. // SetEventBus - sets the event bus for publishing block related events.
  55. // If not called, it defaults to types.NopEventBus.
  56. func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) {
  57. blockExec.eventBus = eventBus
  58. }
  59. // CreateProposalBlock calls state.MakeBlock with evidence from the evpool
  60. // and txs from the mempool. The max bytes must be big enough to fit the commit.
  61. // Up to 1/10th of the block space is allcoated for maximum sized evidence.
  62. // The rest is given to txs, up to the max gas.
  63. func (blockExec *BlockExecutor) CreateProposalBlock(
  64. height int64,
  65. state State, commit *types.Commit,
  66. proposerAddr []byte,
  67. ) (*types.Block, *types.PartSet) {
  68. maxBytes := state.ConsensusParams.BlockSize.MaxBytes
  69. maxGas := state.ConsensusParams.BlockSize.MaxGas
  70. // Fetch a limited amount of valid evidence
  71. maxNumEvidence, _ := types.MaxEvidencePerBlock(maxBytes)
  72. evidence := blockExec.evpool.PendingEvidence(maxNumEvidence)
  73. // Fetch a limited amount of valid txs
  74. maxDataBytes := types.MaxDataBytes(maxBytes, state.Validators.Size(), len(evidence))
  75. txs := blockExec.mempool.ReapMaxBytesMaxGas(maxDataBytes, maxGas)
  76. return state.MakeBlock(height, txs, commit, evidence, proposerAddr)
  77. }
  78. // ValidateBlock validates the given block against the given state.
  79. // If the block is invalid, it returns an error.
  80. // Validation does not mutate state, but does require historical information from the stateDB,
  81. // ie. to verify evidence from a validator at an old height.
  82. func (blockExec *BlockExecutor) ValidateBlock(state State, block *types.Block) error {
  83. return validateBlock(blockExec.db, state, block)
  84. }
  85. // ApplyBlock validates the block against the state, executes it against the app,
  86. // fires the relevant events, commits the app, and saves the new state and responses.
  87. // It's the only function that needs to be called
  88. // from outside this package to process and commit an entire block.
  89. // It takes a blockID to avoid recomputing the parts hash.
  90. func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, block *types.Block) (State, error) {
  91. if err := blockExec.ValidateBlock(state, block); err != nil {
  92. return state, ErrInvalidBlock(err)
  93. }
  94. startTime := time.Now().UnixNano()
  95. abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block, state.LastValidators, blockExec.db)
  96. endTime := time.Now().UnixNano()
  97. blockExec.metrics.BlockProcessingTime.Observe(float64(endTime-startTime) / 1000000)
  98. if err != nil {
  99. return state, ErrProxyAppConn(err)
  100. }
  101. fail.Fail() // XXX
  102. // Save the results before we commit.
  103. saveABCIResponses(blockExec.db, block.Height, abciResponses)
  104. fail.Fail() // XXX
  105. // validate the validator updates and convert to tendermint types
  106. abciValUpdates := abciResponses.EndBlock.ValidatorUpdates
  107. err = validateValidatorUpdates(abciValUpdates, state.ConsensusParams.Validator)
  108. if err != nil {
  109. return state, fmt.Errorf("Error in validator updates: %v", err)
  110. }
  111. validatorUpdates, err := types.PB2TM.ValidatorUpdates(abciValUpdates)
  112. if err != nil {
  113. return state, err
  114. }
  115. if len(validatorUpdates) > 0 {
  116. blockExec.logger.Info("Updates to validators", "updates", types.ValidatorListString(validatorUpdates))
  117. }
  118. // Update the state with the block and responses.
  119. state, err = updateState(state, blockID, &block.Header, abciResponses, validatorUpdates)
  120. if err != nil {
  121. return state, fmt.Errorf("Commit failed for application: %v", err)
  122. }
  123. // Lock mempool, commit app state, update mempoool.
  124. appHash, err := blockExec.Commit(state, block)
  125. if err != nil {
  126. return state, fmt.Errorf("Commit failed for application: %v", err)
  127. }
  128. // Update evpool with the block and state.
  129. blockExec.evpool.Update(block, state)
  130. fail.Fail() // XXX
  131. // Update the app hash and save the state.
  132. state.AppHash = appHash
  133. SaveState(blockExec.db, state)
  134. fail.Fail() // XXX
  135. // Events are fired after everything else.
  136. // NOTE: if we crash between Commit and Save, events wont be fired during replay
  137. fireEvents(blockExec.logger, blockExec.eventBus, block, abciResponses, validatorUpdates)
  138. return state, nil
  139. }
  140. // Commit locks the mempool, runs the ABCI Commit message, and updates the
  141. // mempool.
  142. // It returns the result of calling abci.Commit (the AppHash), and an error.
  143. // The Mempool must be locked during commit and update because state is
  144. // typically reset on Commit and old txs must be replayed against committed
  145. // state before new txs are run in the mempool, lest they be invalid.
  146. func (blockExec *BlockExecutor) Commit(
  147. state State,
  148. block *types.Block,
  149. ) ([]byte, error) {
  150. blockExec.mempool.Lock()
  151. defer blockExec.mempool.Unlock()
  152. // while mempool is Locked, flush to ensure all async requests have completed
  153. // in the ABCI app before Commit.
  154. err := blockExec.mempool.FlushAppConn()
  155. if err != nil {
  156. blockExec.logger.Error("Client error during mempool.FlushAppConn", "err", err)
  157. return nil, err
  158. }
  159. // Commit block, get hash back
  160. res, err := blockExec.proxyApp.CommitSync()
  161. if err != nil {
  162. blockExec.logger.Error(
  163. "Client error during proxyAppConn.CommitSync",
  164. "err", err,
  165. )
  166. return nil, err
  167. }
  168. // ResponseCommit has no error code - just data
  169. blockExec.logger.Info(
  170. "Committed state",
  171. "height", block.Height,
  172. "txs", block.NumTxs,
  173. "appHash", fmt.Sprintf("%X", res.Data),
  174. )
  175. // Update mempool.
  176. err = blockExec.mempool.Update(
  177. block.Height,
  178. block.Txs,
  179. TxPreCheck(state),
  180. TxPostCheck(state),
  181. )
  182. return res.Data, err
  183. }
  184. //---------------------------------------------------------
  185. // Helper functions for executing blocks and updating state
  186. // Executes block's transactions on proxyAppConn.
  187. // Returns a list of transaction results and updates to the validator set
  188. func execBlockOnProxyApp(
  189. logger log.Logger,
  190. proxyAppConn proxy.AppConnConsensus,
  191. block *types.Block,
  192. lastValSet *types.ValidatorSet,
  193. stateDB dbm.DB,
  194. ) (*ABCIResponses, error) {
  195. var validTxs, invalidTxs = 0, 0
  196. txIndex := 0
  197. abciResponses := NewABCIResponses(block)
  198. // Execute transactions and get hash.
  199. proxyCb := func(req *abci.Request, res *abci.Response) {
  200. switch r := res.Value.(type) {
  201. case *abci.Response_DeliverTx:
  202. // TODO: make use of res.Log
  203. // TODO: make use of this info
  204. // Blocks may include invalid txs.
  205. txRes := r.DeliverTx
  206. if txRes.Code == abci.CodeTypeOK {
  207. validTxs++
  208. } else {
  209. logger.Debug("Invalid tx", "code", txRes.Code, "log", txRes.Log)
  210. invalidTxs++
  211. }
  212. abciResponses.DeliverTx[txIndex] = txRes
  213. txIndex++
  214. }
  215. }
  216. proxyAppConn.SetResponseCallback(proxyCb)
  217. commitInfo, byzVals := getBeginBlockValidatorInfo(block, lastValSet, stateDB)
  218. // Begin block
  219. var err error
  220. abciResponses.BeginBlock, err = proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
  221. Hash: block.Hash(),
  222. Header: types.TM2PB.Header(&block.Header),
  223. LastCommitInfo: commitInfo,
  224. ByzantineValidators: byzVals,
  225. })
  226. if err != nil {
  227. logger.Error("Error in proxyAppConn.BeginBlock", "err", err)
  228. return nil, err
  229. }
  230. // Run txs of block.
  231. for _, tx := range block.Txs {
  232. proxyAppConn.DeliverTxAsync(tx)
  233. if err := proxyAppConn.Error(); err != nil {
  234. return nil, err
  235. }
  236. }
  237. // End block.
  238. abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(abci.RequestEndBlock{Height: block.Height})
  239. if err != nil {
  240. logger.Error("Error in proxyAppConn.EndBlock", "err", err)
  241. return nil, err
  242. }
  243. logger.Info("Executed block", "height", block.Height, "validTxs", validTxs, "invalidTxs", invalidTxs)
  244. return abciResponses, nil
  245. }
  246. func getBeginBlockValidatorInfo(block *types.Block, lastValSet *types.ValidatorSet, stateDB dbm.DB) (abci.LastCommitInfo, []abci.Evidence) {
  247. // Sanity check that commit length matches validator set size -
  248. // only applies after first block
  249. if block.Height > 1 {
  250. precommitLen := len(block.LastCommit.Precommits)
  251. valSetLen := len(lastValSet.Validators)
  252. if precommitLen != valSetLen {
  253. // sanity check
  254. panic(fmt.Sprintf("precommit length (%d) doesn't match valset length (%d) at height %d\n\n%v\n\n%v",
  255. precommitLen, valSetLen, block.Height, block.LastCommit.Precommits, lastValSet.Validators))
  256. }
  257. }
  258. // Collect the vote info (list of validators and whether or not they signed).
  259. voteInfos := make([]abci.VoteInfo, len(lastValSet.Validators))
  260. for i, val := range lastValSet.Validators {
  261. var vote *types.CommitSig
  262. if i < len(block.LastCommit.Precommits) {
  263. vote = block.LastCommit.Precommits[i]
  264. }
  265. voteInfo := abci.VoteInfo{
  266. Validator: types.TM2PB.Validator(val),
  267. SignedLastBlock: vote != nil,
  268. }
  269. voteInfos[i] = voteInfo
  270. }
  271. commitInfo := abci.LastCommitInfo{
  272. Round: int32(block.LastCommit.Round()),
  273. Votes: voteInfos,
  274. }
  275. byzVals := make([]abci.Evidence, len(block.Evidence.Evidence))
  276. for i, ev := range block.Evidence.Evidence {
  277. // We need the validator set. We already did this in validateBlock.
  278. // TODO: Should we instead cache the valset in the evidence itself and add
  279. // `SetValidatorSet()` and `ToABCI` methods ?
  280. valset, err := LoadValidators(stateDB, ev.Height())
  281. if err != nil {
  282. panic(err) // shouldn't happen
  283. }
  284. byzVals[i] = types.TM2PB.Evidence(ev, valset, block.Time)
  285. }
  286. return commitInfo, byzVals
  287. }
  288. func validateValidatorUpdates(abciUpdates []abci.ValidatorUpdate,
  289. params types.ValidatorParams) error {
  290. for _, valUpdate := range abciUpdates {
  291. if valUpdate.GetPower() < 0 {
  292. return fmt.Errorf("Voting power can't be negative %v", valUpdate)
  293. } else if valUpdate.GetPower() == 0 {
  294. // continue, since this is deleting the validator, and thus there is no
  295. // pubkey to check
  296. continue
  297. }
  298. // Check if validator's pubkey matches an ABCI type in the consensus params
  299. thisKeyType := valUpdate.PubKey.Type
  300. if !params.IsValidPubkeyType(thisKeyType) {
  301. return fmt.Errorf("Validator %v is using pubkey %s, which is unsupported for consensus",
  302. valUpdate, thisKeyType)
  303. }
  304. }
  305. return nil
  306. }
  307. // updateState returns a new State updated according to the header and responses.
  308. func updateState(
  309. state State,
  310. blockID types.BlockID,
  311. header *types.Header,
  312. abciResponses *ABCIResponses,
  313. validatorUpdates []*types.Validator,
  314. ) (State, error) {
  315. // Copy the valset so we can apply changes from EndBlock
  316. // and update s.LastValidators and s.Validators.
  317. nValSet := state.NextValidators.Copy()
  318. // Update the validator set with the latest abciResponses.
  319. lastHeightValsChanged := state.LastHeightValidatorsChanged
  320. if len(validatorUpdates) > 0 {
  321. err := nValSet.UpdateWithChangeSet(validatorUpdates)
  322. if err != nil {
  323. return state, fmt.Errorf("Error changing validator set: %v", err)
  324. }
  325. // Change results from this height but only applies to the next next height.
  326. lastHeightValsChanged = header.Height + 1 + 1
  327. }
  328. // Update validator proposer priority and set state variables.
  329. nValSet.IncrementProposerPriority(1)
  330. // Update the params with the latest abciResponses.
  331. nextParams := state.ConsensusParams
  332. lastHeightParamsChanged := state.LastHeightConsensusParamsChanged
  333. if abciResponses.EndBlock.ConsensusParamUpdates != nil {
  334. // NOTE: must not mutate s.ConsensusParams
  335. nextParams = state.ConsensusParams.Update(abciResponses.EndBlock.ConsensusParamUpdates)
  336. err := nextParams.Validate()
  337. if err != nil {
  338. return state, fmt.Errorf("Error updating consensus params: %v", err)
  339. }
  340. // Change results from this height but only applies to the next height.
  341. lastHeightParamsChanged = header.Height + 1
  342. }
  343. // TODO: allow app to upgrade version
  344. nextVersion := state.Version
  345. // NOTE: the AppHash has not been populated.
  346. // It will be filled on state.Save.
  347. return State{
  348. Version: nextVersion,
  349. ChainID: state.ChainID,
  350. LastBlockHeight: header.Height,
  351. LastBlockTotalTx: state.LastBlockTotalTx + header.NumTxs,
  352. LastBlockID: blockID,
  353. LastBlockTime: header.Time,
  354. NextValidators: nValSet,
  355. Validators: state.NextValidators.Copy(),
  356. LastValidators: state.Validators.Copy(),
  357. LastHeightValidatorsChanged: lastHeightValsChanged,
  358. ConsensusParams: nextParams,
  359. LastHeightConsensusParamsChanged: lastHeightParamsChanged,
  360. LastResultsHash: abciResponses.ResultsHash(),
  361. AppHash: nil,
  362. }, nil
  363. }
  364. // Fire NewBlock, NewBlockHeader.
  365. // Fire TxEvent for every tx.
  366. // NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
  367. func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses, validatorUpdates []*types.Validator) {
  368. eventBus.PublishEventNewBlock(types.EventDataNewBlock{
  369. Block: block,
  370. ResultBeginBlock: *abciResponses.BeginBlock,
  371. ResultEndBlock: *abciResponses.EndBlock,
  372. })
  373. eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
  374. Header: block.Header,
  375. ResultBeginBlock: *abciResponses.BeginBlock,
  376. ResultEndBlock: *abciResponses.EndBlock,
  377. })
  378. for i, tx := range block.Data.Txs {
  379. eventBus.PublishEventTx(types.EventDataTx{types.TxResult{
  380. Height: block.Height,
  381. Index: uint32(i),
  382. Tx: tx,
  383. Result: *(abciResponses.DeliverTx[i]),
  384. }})
  385. }
  386. if len(validatorUpdates) > 0 {
  387. eventBus.PublishEventValidatorSetUpdates(
  388. types.EventDataValidatorSetUpdates{ValidatorUpdates: validatorUpdates})
  389. }
  390. }
  391. //----------------------------------------------------------------------------------------------------
  392. // Execute block without state. TODO: eliminate
  393. // ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state.
  394. // It returns the application root hash (result of abci.Commit).
  395. func ExecCommitBlock(
  396. appConnConsensus proxy.AppConnConsensus,
  397. block *types.Block,
  398. logger log.Logger,
  399. lastValSet *types.ValidatorSet,
  400. stateDB dbm.DB,
  401. ) ([]byte, error) {
  402. _, err := execBlockOnProxyApp(logger, appConnConsensus, block, lastValSet, stateDB)
  403. if err != nil {
  404. logger.Error("Error executing block on proxy app", "height", block.Height, "err", err)
  405. return nil, err
  406. }
  407. // Commit block, get hash back
  408. res, err := appConnConsensus.CommitSync()
  409. if err != nil {
  410. logger.Error("Client error during proxyAppConn.CommitSync", "err", res)
  411. return nil, err
  412. }
  413. // ResponseCommit has no error or log, just data
  414. return res.Data, nil
  415. }