You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

468 lines
15 KiB

7 years ago
8 years ago
8 years ago
8 years ago
6 years ago
6 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
6 years ago
6 years ago
6 years ago
6 years ago
8 years ago
8 years ago
  1. package state
  2. import (
  3. "fmt"
  4. "time"
  5. abci "github.com/tendermint/tendermint/abci/types"
  6. dbm "github.com/tendermint/tendermint/libs/db"
  7. "github.com/tendermint/tendermint/libs/fail"
  8. "github.com/tendermint/tendermint/libs/log"
  9. "github.com/tendermint/tendermint/proxy"
  10. "github.com/tendermint/tendermint/types"
  11. )
  12. //-----------------------------------------------------------------------------
  13. // BlockExecutor handles block execution and state updates.
  14. // It exposes ApplyBlock(), which validates & executes the block, updates state w/ ABCI responses,
  15. // then commits and updates the mempool atomically, then saves state.
  16. // BlockExecutor provides the context and accessories for properly executing a block.
  17. type BlockExecutor struct {
  18. // save state, validators, consensus params, abci responses here
  19. db dbm.DB
  20. // execute the app against this
  21. proxyApp proxy.AppConnConsensus
  22. // events
  23. eventBus types.BlockEventPublisher
  24. // update these with block results after commit
  25. mempool Mempool
  26. evpool EvidencePool
  27. logger log.Logger
  28. metrics *Metrics
  29. }
  30. type BlockExecutorOption func(executor *BlockExecutor)
  31. func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption {
  32. return func(blockExec *BlockExecutor) {
  33. blockExec.metrics = metrics
  34. }
  35. }
  36. // NewBlockExecutor returns a new BlockExecutor with a NopEventBus.
  37. // Call SetEventBus to provide one.
  38. func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus,
  39. mempool Mempool, evpool EvidencePool, options ...BlockExecutorOption) *BlockExecutor {
  40. res := &BlockExecutor{
  41. db: db,
  42. proxyApp: proxyApp,
  43. eventBus: types.NopEventBus{},
  44. mempool: mempool,
  45. evpool: evpool,
  46. logger: logger,
  47. metrics: NopMetrics(),
  48. }
  49. for _, option := range options {
  50. option(res)
  51. }
  52. return res
  53. }
  54. // SetEventBus - sets the event bus for publishing block related events.
  55. // If not called, it defaults to types.NopEventBus.
  56. func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) {
  57. blockExec.eventBus = eventBus
  58. }
  59. // ValidateBlock validates the given block against the given state.
  60. // If the block is invalid, it returns an error.
  61. // Validation does not mutate state, but does require historical information from the stateDB,
  62. // ie. to verify evidence from a validator at an old height.
  63. func (blockExec *BlockExecutor) ValidateBlock(state State, block *types.Block) error {
  64. return validateBlock(blockExec.db, state, block)
  65. }
  66. // ApplyBlock validates the block against the state, executes it against the app,
  67. // fires the relevant events, commits the app, and saves the new state and responses.
  68. // It's the only function that needs to be called
  69. // from outside this package to process and commit an entire block.
  70. // It takes a blockID to avoid recomputing the parts hash.
  71. func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, block *types.Block) (State, error) {
  72. if err := blockExec.ValidateBlock(state, block); err != nil {
  73. return state, ErrInvalidBlock(err)
  74. }
  75. startTime := time.Now().UnixNano()
  76. abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block, state.LastValidators, blockExec.db)
  77. endTime := time.Now().UnixNano()
  78. blockExec.metrics.BlockProcessingTime.Observe(float64(endTime-startTime) / 1000000)
  79. if err != nil {
  80. return state, ErrProxyAppConn(err)
  81. }
  82. fail.Fail() // XXX
  83. // Save the results before we commit.
  84. saveABCIResponses(blockExec.db, block.Height, abciResponses)
  85. fail.Fail() // XXX
  86. // Update the state with the block and responses.
  87. state, err = updateState(state, blockID, &block.Header, abciResponses)
  88. if err != nil {
  89. return state, fmt.Errorf("Commit failed for application: %v", err)
  90. }
  91. // Lock mempool, commit app state, update mempoool.
  92. appHash, err := blockExec.Commit(state, block)
  93. if err != nil {
  94. return state, fmt.Errorf("Commit failed for application: %v", err)
  95. }
  96. // Update evpool with the block and state.
  97. blockExec.evpool.Update(block, state)
  98. fail.Fail() // XXX
  99. // Update the app hash and save the state.
  100. state.AppHash = appHash
  101. SaveState(blockExec.db, state)
  102. fail.Fail() // XXX
  103. // Events are fired after everything else.
  104. // NOTE: if we crash between Commit and Save, events wont be fired during replay
  105. fireEvents(blockExec.logger, blockExec.eventBus, block, abciResponses)
  106. return state, nil
  107. }
  108. // Commit locks the mempool, runs the ABCI Commit message, and updates the
  109. // mempool.
  110. // It returns the result of calling abci.Commit (the AppHash), and an error.
  111. // The Mempool must be locked during commit and update because state is
  112. // typically reset on Commit and old txs must be replayed against committed
  113. // state before new txs are run in the mempool, lest they be invalid.
  114. func (blockExec *BlockExecutor) Commit(
  115. state State,
  116. block *types.Block,
  117. ) ([]byte, error) {
  118. blockExec.mempool.Lock()
  119. defer blockExec.mempool.Unlock()
  120. // while mempool is Locked, flush to ensure all async requests have completed
  121. // in the ABCI app before Commit.
  122. err := blockExec.mempool.FlushAppConn()
  123. if err != nil {
  124. blockExec.logger.Error("Client error during mempool.FlushAppConn", "err", err)
  125. return nil, err
  126. }
  127. // Commit block, get hash back
  128. res, err := blockExec.proxyApp.CommitSync()
  129. if err != nil {
  130. blockExec.logger.Error(
  131. "Client error during proxyAppConn.CommitSync",
  132. "err", err,
  133. )
  134. return nil, err
  135. }
  136. // ResponseCommit has no error code - just data
  137. blockExec.logger.Info(
  138. "Committed state",
  139. "height", block.Height,
  140. "txs", block.NumTxs,
  141. "appHash", fmt.Sprintf("%X", res.Data),
  142. )
  143. // Update mempool.
  144. err = blockExec.mempool.Update(
  145. block.Height,
  146. block.Txs,
  147. TxPreCheck(state),
  148. TxPostCheck(state),
  149. )
  150. return res.Data, err
  151. }
  152. //---------------------------------------------------------
  153. // Helper functions for executing blocks and updating state
  154. // Executes block's transactions on proxyAppConn.
  155. // Returns a list of transaction results and updates to the validator set
  156. func execBlockOnProxyApp(
  157. logger log.Logger,
  158. proxyAppConn proxy.AppConnConsensus,
  159. block *types.Block,
  160. lastValSet *types.ValidatorSet,
  161. stateDB dbm.DB,
  162. ) (*ABCIResponses, error) {
  163. var validTxs, invalidTxs = 0, 0
  164. txIndex := 0
  165. abciResponses := NewABCIResponses(block)
  166. // Execute transactions and get hash.
  167. proxyCb := func(req *abci.Request, res *abci.Response) {
  168. switch r := res.Value.(type) {
  169. case *abci.Response_DeliverTx:
  170. // TODO: make use of res.Log
  171. // TODO: make use of this info
  172. // Blocks may include invalid txs.
  173. txRes := r.DeliverTx
  174. if txRes.Code == abci.CodeTypeOK {
  175. validTxs++
  176. } else {
  177. logger.Debug("Invalid tx", "code", txRes.Code, "log", txRes.Log)
  178. invalidTxs++
  179. }
  180. abciResponses.DeliverTx[txIndex] = txRes
  181. txIndex++
  182. }
  183. }
  184. proxyAppConn.SetResponseCallback(proxyCb)
  185. commitInfo, byzVals := getBeginBlockValidatorInfo(block, lastValSet, stateDB)
  186. // Begin block.
  187. _, err := proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
  188. Hash: block.Hash(),
  189. Header: types.TM2PB.Header(&block.Header),
  190. LastCommitInfo: commitInfo,
  191. ByzantineValidators: byzVals,
  192. })
  193. if err != nil {
  194. logger.Error("Error in proxyAppConn.BeginBlock", "err", err)
  195. return nil, err
  196. }
  197. // Run txs of block.
  198. for _, tx := range block.Txs {
  199. proxyAppConn.DeliverTxAsync(tx)
  200. if err := proxyAppConn.Error(); err != nil {
  201. return nil, err
  202. }
  203. }
  204. // End block.
  205. abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(abci.RequestEndBlock{Height: block.Height})
  206. if err != nil {
  207. logger.Error("Error in proxyAppConn.EndBlock", "err", err)
  208. return nil, err
  209. }
  210. logger.Info("Executed block", "height", block.Height, "validTxs", validTxs, "invalidTxs", invalidTxs)
  211. valUpdates := abciResponses.EndBlock.ValidatorUpdates
  212. if len(valUpdates) > 0 {
  213. // TODO: cleanup the formatting
  214. logger.Info("Updates to validators", "updates", valUpdates)
  215. }
  216. return abciResponses, nil
  217. }
  218. func getBeginBlockValidatorInfo(block *types.Block, lastValSet *types.ValidatorSet, stateDB dbm.DB) (abci.LastCommitInfo, []abci.Evidence) {
  219. // Sanity check that commit length matches validator set size -
  220. // only applies after first block
  221. if block.Height > 1 {
  222. precommitLen := len(block.LastCommit.Precommits)
  223. valSetLen := len(lastValSet.Validators)
  224. if precommitLen != valSetLen {
  225. // sanity check
  226. panic(fmt.Sprintf("precommit length (%d) doesn't match valset length (%d) at height %d\n\n%v\n\n%v",
  227. precommitLen, valSetLen, block.Height, block.LastCommit.Precommits, lastValSet.Validators))
  228. }
  229. }
  230. // Collect the vote info (list of validators and whether or not they signed).
  231. voteInfos := make([]abci.VoteInfo, len(lastValSet.Validators))
  232. for i, val := range lastValSet.Validators {
  233. var vote *types.Vote
  234. if i < len(block.LastCommit.Precommits) {
  235. vote = block.LastCommit.Precommits[i]
  236. }
  237. voteInfo := abci.VoteInfo{
  238. Validator: types.TM2PB.Validator(val),
  239. SignedLastBlock: vote != nil,
  240. }
  241. voteInfos[i] = voteInfo
  242. }
  243. commitInfo := abci.LastCommitInfo{
  244. Round: int32(block.LastCommit.Round()),
  245. Votes: voteInfos,
  246. }
  247. byzVals := make([]abci.Evidence, len(block.Evidence.Evidence))
  248. for i, ev := range block.Evidence.Evidence {
  249. // We need the validator set. We already did this in validateBlock.
  250. // TODO: Should we instead cache the valset in the evidence itself and add
  251. // `SetValidatorSet()` and `ToABCI` methods ?
  252. valset, err := LoadValidators(stateDB, ev.Height())
  253. if err != nil {
  254. panic(err) // shouldn't happen
  255. }
  256. byzVals[i] = types.TM2PB.Evidence(ev, valset, block.Time)
  257. }
  258. return commitInfo, byzVals
  259. }
  260. // If more or equal than 1/3 of total voting power changed in one block, then
  261. // a light client could never prove the transition externally. See
  262. // ./lite/doc.go for details on how a light client tracks validators.
  263. func updateValidators(currentSet *types.ValidatorSet, abciUpdates []abci.ValidatorUpdate) error {
  264. updates, err := types.PB2TM.ValidatorUpdates(abciUpdates)
  265. if err != nil {
  266. return err
  267. }
  268. // these are tendermint types now
  269. for _, valUpdate := range updates {
  270. if valUpdate.VotingPower < 0 {
  271. return fmt.Errorf("Voting power can't be negative %v", valUpdate)
  272. }
  273. address := valUpdate.Address
  274. _, val := currentSet.GetByAddress(address)
  275. if valUpdate.VotingPower == 0 {
  276. // remove val
  277. _, removed := currentSet.Remove(address)
  278. if !removed {
  279. return fmt.Errorf("Failed to remove validator %X", address)
  280. }
  281. } else if val == nil {
  282. // add val
  283. added := currentSet.Add(valUpdate)
  284. if !added {
  285. return fmt.Errorf("Failed to add new validator %v", valUpdate)
  286. }
  287. } else {
  288. // update val
  289. updated := currentSet.Update(valUpdate)
  290. if !updated {
  291. return fmt.Errorf("Failed to update validator %X to %v", address, valUpdate)
  292. }
  293. }
  294. }
  295. return nil
  296. }
  297. // updateState returns a new State updated according to the header and responses.
  298. func updateState(
  299. state State,
  300. blockID types.BlockID,
  301. header *types.Header,
  302. abciResponses *ABCIResponses,
  303. ) (State, error) {
  304. // Copy the valset so we can apply changes from EndBlock
  305. // and update s.LastValidators and s.Validators.
  306. nValSet := state.NextValidators.Copy()
  307. // Update the validator set with the latest abciResponses.
  308. lastHeightValsChanged := state.LastHeightValidatorsChanged
  309. if len(abciResponses.EndBlock.ValidatorUpdates) > 0 {
  310. err := updateValidators(nValSet, abciResponses.EndBlock.ValidatorUpdates)
  311. if err != nil {
  312. return state, fmt.Errorf("Error changing validator set: %v", err)
  313. }
  314. // Change results from this height but only applies to the next next height.
  315. lastHeightValsChanged = header.Height + 1 + 1
  316. }
  317. // Update validator accums and set state variables.
  318. nValSet.IncrementAccum(1)
  319. // Update the params with the latest abciResponses.
  320. nextParams := state.ConsensusParams
  321. lastHeightParamsChanged := state.LastHeightConsensusParamsChanged
  322. if abciResponses.EndBlock.ConsensusParamUpdates != nil {
  323. // NOTE: must not mutate s.ConsensusParams
  324. nextParams = state.ConsensusParams.Update(abciResponses.EndBlock.ConsensusParamUpdates)
  325. err := nextParams.Validate()
  326. if err != nil {
  327. return state, fmt.Errorf("Error updating consensus params: %v", err)
  328. }
  329. // Change results from this height but only applies to the next height.
  330. lastHeightParamsChanged = header.Height + 1
  331. }
  332. // TODO: allow app to upgrade version
  333. nextVersion := state.Version
  334. // NOTE: the AppHash has not been populated.
  335. // It will be filled on state.Save.
  336. return State{
  337. Version: nextVersion,
  338. ChainID: state.ChainID,
  339. LastBlockHeight: header.Height,
  340. LastBlockTotalTx: state.LastBlockTotalTx + header.NumTxs,
  341. LastBlockID: blockID,
  342. LastBlockTime: header.Time,
  343. NextValidators: nValSet,
  344. Validators: state.NextValidators.Copy(),
  345. LastValidators: state.Validators.Copy(),
  346. LastHeightValidatorsChanged: lastHeightValsChanged,
  347. ConsensusParams: nextParams,
  348. LastHeightConsensusParamsChanged: lastHeightParamsChanged,
  349. LastResultsHash: abciResponses.ResultsHash(),
  350. AppHash: nil,
  351. }, nil
  352. }
  353. // Fire NewBlock, NewBlockHeader.
  354. // Fire TxEvent for every tx.
  355. // NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
  356. func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses) {
  357. eventBus.PublishEventNewBlock(types.EventDataNewBlock{block})
  358. eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header})
  359. for i, tx := range block.Data.Txs {
  360. eventBus.PublishEventTx(types.EventDataTx{types.TxResult{
  361. Height: block.Height,
  362. Index: uint32(i),
  363. Tx: tx,
  364. Result: *(abciResponses.DeliverTx[i]),
  365. }})
  366. }
  367. abciValUpdates := abciResponses.EndBlock.ValidatorUpdates
  368. if len(abciValUpdates) > 0 {
  369. // if there were an error, we would've stopped in updateValidators
  370. updates, _ := types.PB2TM.ValidatorUpdates(abciValUpdates)
  371. eventBus.PublishEventValidatorSetUpdates(
  372. types.EventDataValidatorSetUpdates{ValidatorUpdates: updates})
  373. }
  374. }
  375. //----------------------------------------------------------------------------------------------------
  376. // Execute block without state. TODO: eliminate
  377. // ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state.
  378. // It returns the application root hash (result of abci.Commit).
  379. func ExecCommitBlock(
  380. appConnConsensus proxy.AppConnConsensus,
  381. block *types.Block,
  382. logger log.Logger,
  383. lastValSet *types.ValidatorSet,
  384. stateDB dbm.DB,
  385. ) ([]byte, error) {
  386. _, err := execBlockOnProxyApp(logger, appConnConsensus, block, lastValSet, stateDB)
  387. if err != nil {
  388. logger.Error("Error executing block on proxy app", "height", block.Height, "err", err)
  389. return nil, err
  390. }
  391. // Commit block, get hash back
  392. res, err := appConnConsensus.CommitSync()
  393. if err != nil {
  394. logger.Error("Client error during proxyAppConn.CommitSync", "err", res)
  395. return nil, err
  396. }
  397. // ResponseCommit has no error or log, just data
  398. return res.Data, nil
  399. }