You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

476 lines
15 KiB

7 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
7 years ago
7 years ago
6 years ago
6 years ago
8 years ago
8 years ago
  1. package state
  2. import (
  3. "fmt"
  4. "strings"
  5. "time"
  6. abci "github.com/tendermint/tendermint/abci/types"
  7. dbm "github.com/tendermint/tendermint/libs/db"
  8. "github.com/tendermint/tendermint/libs/fail"
  9. "github.com/tendermint/tendermint/libs/log"
  10. "github.com/tendermint/tendermint/proxy"
  11. "github.com/tendermint/tendermint/types"
  12. )
  13. //-----------------------------------------------------------------------------
  14. // BlockExecutor handles block execution and state updates.
  15. // It exposes ApplyBlock(), which validates & executes the block, updates state w/ ABCI responses,
  16. // then commits and updates the mempool atomically, then saves state.
  17. // BlockExecutor provides the context and accessories for properly executing a block.
  18. type BlockExecutor struct {
  19. // save state, validators, consensus params, abci responses here
  20. db dbm.DB
  21. // execute the app against this
  22. proxyApp proxy.AppConnConsensus
  23. // events
  24. eventBus types.BlockEventPublisher
  25. // update these with block results after commit
  26. mempool Mempool
  27. evpool EvidencePool
  28. logger log.Logger
  29. metrics *Metrics
  30. }
  31. type BlockExecutorOption func(executor *BlockExecutor)
  32. func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption {
  33. return func(blockExec *BlockExecutor) {
  34. blockExec.metrics = metrics
  35. }
  36. }
  37. // NewBlockExecutor returns a new BlockExecutor with a NopEventBus.
  38. // Call SetEventBus to provide one.
  39. func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus,
  40. mempool Mempool, evpool EvidencePool, options ...BlockExecutorOption) *BlockExecutor {
  41. res := &BlockExecutor{
  42. db: db,
  43. proxyApp: proxyApp,
  44. eventBus: types.NopEventBus{},
  45. mempool: mempool,
  46. evpool: evpool,
  47. logger: logger,
  48. metrics: NopMetrics(),
  49. }
  50. for _, option := range options {
  51. option(res)
  52. }
  53. return res
  54. }
  55. // SetEventBus - sets the event bus for publishing block related events.
  56. // If not called, it defaults to types.NopEventBus.
  57. func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) {
  58. blockExec.eventBus = eventBus
  59. }
  60. // ValidateBlock validates the given block against the given state.
  61. // If the block is invalid, it returns an error.
  62. // Validation does not mutate state, but does require historical information from the stateDB,
  63. // ie. to verify evidence from a validator at an old height.
  64. func (blockExec *BlockExecutor) ValidateBlock(state State, block *types.Block) error {
  65. return validateBlock(blockExec.db, state, block)
  66. }
  67. // ApplyBlock validates the block against the state, executes it against the app,
  68. // fires the relevant events, commits the app, and saves the new state and responses.
  69. // It's the only function that needs to be called
  70. // from outside this package to process and commit an entire block.
  71. // It takes a blockID to avoid recomputing the parts hash.
  72. func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, block *types.Block) (State, error) {
  73. if err := blockExec.ValidateBlock(state, block); err != nil {
  74. return state, ErrInvalidBlock(err)
  75. }
  76. startTime := time.Now().UnixNano()
  77. abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block, state.LastValidators, blockExec.db)
  78. endTime := time.Now().UnixNano()
  79. blockExec.metrics.BlockProcessingTime.Observe(float64(endTime-startTime) / 1000000)
  80. if err != nil {
  81. return state, ErrProxyAppConn(err)
  82. }
  83. fail.Fail() // XXX
  84. // Save the results before we commit.
  85. saveABCIResponses(blockExec.db, block.Height, abciResponses)
  86. fail.Fail() // XXX
  87. // Update the state with the block and responses.
  88. state, err = updateState(blockExec.logger, state, blockID, &block.Header, abciResponses)
  89. if err != nil {
  90. return state, fmt.Errorf("Commit failed for application: %v", err)
  91. }
  92. // Lock mempool, commit app state, update mempoool.
  93. appHash, err := blockExec.Commit(state, block)
  94. if err != nil {
  95. return state, fmt.Errorf("Commit failed for application: %v", err)
  96. }
  97. // Update evpool with the block and state.
  98. blockExec.evpool.Update(block, state)
  99. fail.Fail() // XXX
  100. // Update the app hash and save the state.
  101. state.AppHash = appHash
  102. SaveState(blockExec.db, state)
  103. fail.Fail() // XXX
  104. // Events are fired after everything else.
  105. // NOTE: if we crash between Commit and Save, events wont be fired during replay
  106. fireEvents(blockExec.logger, blockExec.eventBus, block, abciResponses)
  107. return state, nil
  108. }
  109. // Commit locks the mempool, runs the ABCI Commit message, and updates the
  110. // mempool.
  111. // It returns the result of calling abci.Commit (the AppHash), and an error.
  112. // The Mempool must be locked during commit and update because state is
  113. // typically reset on Commit and old txs must be replayed against committed
  114. // state before new txs are run in the mempool, lest they be invalid.
  115. func (blockExec *BlockExecutor) Commit(
  116. state State,
  117. block *types.Block,
  118. ) ([]byte, error) {
  119. blockExec.mempool.Lock()
  120. defer blockExec.mempool.Unlock()
  121. // while mempool is Locked, flush to ensure all async requests have completed
  122. // in the ABCI app before Commit.
  123. err := blockExec.mempool.FlushAppConn()
  124. if err != nil {
  125. blockExec.logger.Error("Client error during mempool.FlushAppConn", "err", err)
  126. return nil, err
  127. }
  128. // Commit block, get hash back
  129. res, err := blockExec.proxyApp.CommitSync()
  130. if err != nil {
  131. blockExec.logger.Error(
  132. "Client error during proxyAppConn.CommitSync",
  133. "err", err,
  134. )
  135. return nil, err
  136. }
  137. // ResponseCommit has no error code - just data
  138. blockExec.logger.Info(
  139. "Committed state",
  140. "height", block.Height,
  141. "txs", block.NumTxs,
  142. "appHash", fmt.Sprintf("%X", res.Data),
  143. )
  144. // Update mempool.
  145. err = blockExec.mempool.Update(
  146. block.Height,
  147. block.Txs,
  148. TxPreCheck(state),
  149. TxPostCheck(state),
  150. )
  151. return res.Data, err
  152. }
  153. //---------------------------------------------------------
  154. // Helper functions for executing blocks and updating state
  155. // Executes block's transactions on proxyAppConn.
  156. // Returns a list of transaction results and updates to the validator set
  157. func execBlockOnProxyApp(
  158. logger log.Logger,
  159. proxyAppConn proxy.AppConnConsensus,
  160. block *types.Block,
  161. lastValSet *types.ValidatorSet,
  162. stateDB dbm.DB,
  163. ) (*ABCIResponses, error) {
  164. var validTxs, invalidTxs = 0, 0
  165. txIndex := 0
  166. abciResponses := NewABCIResponses(block)
  167. // Execute transactions and get hash.
  168. proxyCb := func(req *abci.Request, res *abci.Response) {
  169. switch r := res.Value.(type) {
  170. case *abci.Response_DeliverTx:
  171. // TODO: make use of res.Log
  172. // TODO: make use of this info
  173. // Blocks may include invalid txs.
  174. txRes := r.DeliverTx
  175. if txRes.Code == abci.CodeTypeOK {
  176. validTxs++
  177. } else {
  178. logger.Debug("Invalid tx", "code", txRes.Code, "log", txRes.Log)
  179. invalidTxs++
  180. }
  181. abciResponses.DeliverTx[txIndex] = txRes
  182. txIndex++
  183. }
  184. }
  185. proxyAppConn.SetResponseCallback(proxyCb)
  186. commitInfo, byzVals := getBeginBlockValidatorInfo(block, lastValSet, stateDB)
  187. // Begin block.
  188. _, err := proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
  189. Hash: block.Hash(),
  190. Header: types.TM2PB.Header(&block.Header),
  191. LastCommitInfo: commitInfo,
  192. ByzantineValidators: byzVals,
  193. })
  194. if err != nil {
  195. logger.Error("Error in proxyAppConn.BeginBlock", "err", err)
  196. return nil, err
  197. }
  198. // Run txs of block.
  199. for _, tx := range block.Txs {
  200. proxyAppConn.DeliverTxAsync(tx)
  201. if err := proxyAppConn.Error(); err != nil {
  202. return nil, err
  203. }
  204. }
  205. // End block.
  206. abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(abci.RequestEndBlock{Height: block.Height})
  207. if err != nil {
  208. logger.Error("Error in proxyAppConn.EndBlock", "err", err)
  209. return nil, err
  210. }
  211. logger.Info("Executed block", "height", block.Height, "validTxs", validTxs, "invalidTxs", invalidTxs)
  212. return abciResponses, nil
  213. }
  214. func getBeginBlockValidatorInfo(block *types.Block, lastValSet *types.ValidatorSet, stateDB dbm.DB) (abci.LastCommitInfo, []abci.Evidence) {
  215. // Sanity check that commit length matches validator set size -
  216. // only applies after first block
  217. if block.Height > 1 {
  218. precommitLen := len(block.LastCommit.Precommits)
  219. valSetLen := len(lastValSet.Validators)
  220. if precommitLen != valSetLen {
  221. // sanity check
  222. panic(fmt.Sprintf("precommit length (%d) doesn't match valset length (%d) at height %d\n\n%v\n\n%v",
  223. precommitLen, valSetLen, block.Height, block.LastCommit.Precommits, lastValSet.Validators))
  224. }
  225. }
  226. // Collect the vote info (list of validators and whether or not they signed).
  227. voteInfos := make([]abci.VoteInfo, len(lastValSet.Validators))
  228. for i, val := range lastValSet.Validators {
  229. var vote *types.Vote
  230. if i < len(block.LastCommit.Precommits) {
  231. vote = block.LastCommit.Precommits[i]
  232. }
  233. voteInfo := abci.VoteInfo{
  234. Validator: types.TM2PB.Validator(val),
  235. SignedLastBlock: vote != nil,
  236. }
  237. voteInfos[i] = voteInfo
  238. }
  239. commitInfo := abci.LastCommitInfo{
  240. Round: int32(block.LastCommit.Round()),
  241. Votes: voteInfos,
  242. }
  243. byzVals := make([]abci.Evidence, len(block.Evidence.Evidence))
  244. for i, ev := range block.Evidence.Evidence {
  245. // We need the validator set. We already did this in validateBlock.
  246. // TODO: Should we instead cache the valset in the evidence itself and add
  247. // `SetValidatorSet()` and `ToABCI` methods ?
  248. valset, err := LoadValidators(stateDB, ev.Height())
  249. if err != nil {
  250. panic(err) // shouldn't happen
  251. }
  252. byzVals[i] = types.TM2PB.Evidence(ev, valset, block.Time)
  253. }
  254. return commitInfo, byzVals
  255. }
  256. // If more or equal than 1/3 of total voting power changed in one block, then
  257. // a light client could never prove the transition externally. See
  258. // ./lite/doc.go for details on how a light client tracks validators.
  259. func updateValidators(currentSet *types.ValidatorSet, abciUpdates []abci.ValidatorUpdate) ([]*types.Validator, error) {
  260. updates, err := types.PB2TM.ValidatorUpdates(abciUpdates)
  261. if err != nil {
  262. return nil, err
  263. }
  264. // these are tendermint types now
  265. for _, valUpdate := range updates {
  266. if valUpdate.VotingPower < 0 {
  267. return nil, fmt.Errorf("Voting power can't be negative %v", valUpdate)
  268. }
  269. address := valUpdate.Address
  270. _, val := currentSet.GetByAddress(address)
  271. if valUpdate.VotingPower == 0 {
  272. // remove val
  273. _, removed := currentSet.Remove(address)
  274. if !removed {
  275. return nil, fmt.Errorf("Failed to remove validator %X", address)
  276. }
  277. } else if val == nil {
  278. // add val
  279. added := currentSet.Add(valUpdate)
  280. if !added {
  281. return nil, fmt.Errorf("Failed to add new validator %v", valUpdate)
  282. }
  283. } else {
  284. // update val
  285. updated := currentSet.Update(valUpdate)
  286. if !updated {
  287. return nil, fmt.Errorf("Failed to update validator %X to %v", address, valUpdate)
  288. }
  289. }
  290. }
  291. return updates, nil
  292. }
  293. // updateState returns a new State updated according to the header and responses.
  294. func updateState(
  295. logger log.Logger,
  296. state State,
  297. blockID types.BlockID,
  298. header *types.Header,
  299. abciResponses *ABCIResponses,
  300. ) (State, error) {
  301. // Copy the valset so we can apply changes from EndBlock
  302. // and update s.LastValidators and s.Validators.
  303. nValSet := state.NextValidators.Copy()
  304. // Update the validator set with the latest abciResponses.
  305. lastHeightValsChanged := state.LastHeightValidatorsChanged
  306. if len(abciResponses.EndBlock.ValidatorUpdates) > 0 {
  307. validatorUpdates, err := updateValidators(nValSet, abciResponses.EndBlock.ValidatorUpdates)
  308. if err != nil {
  309. return state, fmt.Errorf("Error changing validator set: %v", err)
  310. }
  311. // Change results from this height but only applies to the next next height.
  312. lastHeightValsChanged = header.Height + 1 + 1
  313. logger.Info("Updates to validators", "updates", makeValidatorUpdatesLogString(validatorUpdates))
  314. }
  315. // Update validator accums and set state variables.
  316. nValSet.IncrementAccum(1)
  317. // Update the params with the latest abciResponses.
  318. nextParams := state.ConsensusParams
  319. lastHeightParamsChanged := state.LastHeightConsensusParamsChanged
  320. if abciResponses.EndBlock.ConsensusParamUpdates != nil {
  321. // NOTE: must not mutate s.ConsensusParams
  322. nextParams = state.ConsensusParams.Update(abciResponses.EndBlock.ConsensusParamUpdates)
  323. err := nextParams.Validate()
  324. if err != nil {
  325. return state, fmt.Errorf("Error updating consensus params: %v", err)
  326. }
  327. // Change results from this height but only applies to the next height.
  328. lastHeightParamsChanged = header.Height + 1
  329. }
  330. // TODO: allow app to upgrade version
  331. nextVersion := state.Version
  332. // NOTE: the AppHash has not been populated.
  333. // It will be filled on state.Save.
  334. return State{
  335. Version: nextVersion,
  336. ChainID: state.ChainID,
  337. LastBlockHeight: header.Height,
  338. LastBlockTotalTx: state.LastBlockTotalTx + header.NumTxs,
  339. LastBlockID: blockID,
  340. LastBlockTime: header.Time,
  341. NextValidators: nValSet,
  342. Validators: state.NextValidators.Copy(),
  343. LastValidators: state.Validators.Copy(),
  344. LastHeightValidatorsChanged: lastHeightValsChanged,
  345. ConsensusParams: nextParams,
  346. LastHeightConsensusParamsChanged: lastHeightParamsChanged,
  347. LastResultsHash: abciResponses.ResultsHash(),
  348. AppHash: nil,
  349. }, nil
  350. }
  351. // Fire NewBlock, NewBlockHeader.
  352. // Fire TxEvent for every tx.
  353. // NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
  354. func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses) {
  355. eventBus.PublishEventNewBlock(types.EventDataNewBlock{block})
  356. eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header})
  357. for i, tx := range block.Data.Txs {
  358. eventBus.PublishEventTx(types.EventDataTx{types.TxResult{
  359. Height: block.Height,
  360. Index: uint32(i),
  361. Tx: tx,
  362. Result: *(abciResponses.DeliverTx[i]),
  363. }})
  364. }
  365. abciValUpdates := abciResponses.EndBlock.ValidatorUpdates
  366. if len(abciValUpdates) > 0 {
  367. // if there were an error, we would've stopped in updateValidators
  368. updates, _ := types.PB2TM.ValidatorUpdates(abciValUpdates)
  369. eventBus.PublishEventValidatorSetUpdates(
  370. types.EventDataValidatorSetUpdates{ValidatorUpdates: updates})
  371. }
  372. }
  373. //----------------------------------------------------------------------------------------------------
  374. // Execute block without state. TODO: eliminate
  375. // ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state.
  376. // It returns the application root hash (result of abci.Commit).
  377. func ExecCommitBlock(
  378. appConnConsensus proxy.AppConnConsensus,
  379. block *types.Block,
  380. logger log.Logger,
  381. lastValSet *types.ValidatorSet,
  382. stateDB dbm.DB,
  383. ) ([]byte, error) {
  384. _, err := execBlockOnProxyApp(logger, appConnConsensus, block, lastValSet, stateDB)
  385. if err != nil {
  386. logger.Error("Error executing block on proxy app", "height", block.Height, "err", err)
  387. return nil, err
  388. }
  389. // Commit block, get hash back
  390. res, err := appConnConsensus.CommitSync()
  391. if err != nil {
  392. logger.Error("Client error during proxyAppConn.CommitSync", "err", res)
  393. return nil, err
  394. }
  395. // ResponseCommit has no error or log, just data
  396. return res.Data, nil
  397. }
  398. // Make pretty string for validatorUpdates logging
  399. func makeValidatorUpdatesLogString(vals []*types.Validator) string {
  400. chunks := make([]string, len(vals))
  401. for i, val := range vals {
  402. chunks[i] = fmt.Sprintf("%s:%d", val.Address, val.VotingPower)
  403. }
  404. return strings.Join(chunks, ",")
  405. }