You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

511 lines
16 KiB

7 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
7 years ago
7 years ago
8 years ago
8 years ago
  1. package state
  2. import (
  3. "fmt"
  4. "strings"
  5. "time"
  6. abci "github.com/tendermint/tendermint/abci/types"
  7. dbm "github.com/tendermint/tendermint/libs/db"
  8. "github.com/tendermint/tendermint/libs/fail"
  9. "github.com/tendermint/tendermint/libs/log"
  10. "github.com/tendermint/tendermint/proxy"
  11. "github.com/tendermint/tendermint/types"
  12. )
  13. //-----------------------------------------------------------------------------
  14. // BlockExecutor handles block execution and state updates.
  15. // It exposes ApplyBlock(), which validates & executes the block, updates state w/ ABCI responses,
  16. // then commits and updates the mempool atomically, then saves state.
  17. // BlockExecutor provides the context and accessories for properly executing a block.
  18. type BlockExecutor struct {
  19. // save state, validators, consensus params, abci responses here
  20. db dbm.DB
  21. // execute the app against this
  22. proxyApp proxy.AppConnConsensus
  23. // events
  24. eventBus types.BlockEventPublisher
  25. // update these with block results after commit
  26. mempool Mempool
  27. evpool EvidencePool
  28. logger log.Logger
  29. metrics *Metrics
  30. }
  31. type BlockExecutorOption func(executor *BlockExecutor)
  32. func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption {
  33. return func(blockExec *BlockExecutor) {
  34. blockExec.metrics = metrics
  35. }
  36. }
  37. // NewBlockExecutor returns a new BlockExecutor with a NopEventBus.
  38. // Call SetEventBus to provide one.
  39. func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus,
  40. mempool Mempool, evpool EvidencePool, options ...BlockExecutorOption) *BlockExecutor {
  41. res := &BlockExecutor{
  42. db: db,
  43. proxyApp: proxyApp,
  44. eventBus: types.NopEventBus{},
  45. mempool: mempool,
  46. evpool: evpool,
  47. logger: logger,
  48. metrics: NopMetrics(),
  49. }
  50. for _, option := range options {
  51. option(res)
  52. }
  53. return res
  54. }
  55. // SetEventBus - sets the event bus for publishing block related events.
  56. // If not called, it defaults to types.NopEventBus.
  57. func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) {
  58. blockExec.eventBus = eventBus
  59. }
  60. // ValidateBlock validates the given block against the given state.
  61. // If the block is invalid, it returns an error.
  62. // Validation does not mutate state, but does require historical information from the stateDB,
  63. // ie. to verify evidence from a validator at an old height.
  64. func (blockExec *BlockExecutor) ValidateBlock(state State, block *types.Block) error {
  65. return validateBlock(blockExec.db, state, block)
  66. }
  67. // ApplyBlock validates the block against the state, executes it against the app,
  68. // fires the relevant events, commits the app, and saves the new state and responses.
  69. // It's the only function that needs to be called
  70. // from outside this package to process and commit an entire block.
  71. // It takes a blockID to avoid recomputing the parts hash.
  72. func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, block *types.Block) (State, error) {
  73. if err := blockExec.ValidateBlock(state, block); err != nil {
  74. return state, ErrInvalidBlock(err)
  75. }
  76. startTime := time.Now().UnixNano()
  77. abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block, state.LastValidators, blockExec.db)
  78. endTime := time.Now().UnixNano()
  79. blockExec.metrics.BlockProcessingTime.Observe(float64(endTime-startTime) / 1000000)
  80. if err != nil {
  81. return state, ErrProxyAppConn(err)
  82. }
  83. fail.Fail() // XXX
  84. // Save the results before we commit.
  85. saveABCIResponses(blockExec.db, block.Height, abciResponses)
  86. fail.Fail() // XXX
  87. // validate the validator updates and convert to tendermint types
  88. abciValUpdates := abciResponses.EndBlock.ValidatorUpdates
  89. err = validateValidatorUpdates(abciValUpdates, state.ConsensusParams.Validator)
  90. if err != nil {
  91. return state, fmt.Errorf("Error in validator updates: %v", err)
  92. }
  93. validatorUpdates, err := types.PB2TM.ValidatorUpdates(abciValUpdates)
  94. if err != nil {
  95. return state, err
  96. }
  97. if len(validatorUpdates) > 0 {
  98. blockExec.logger.Info("Updates to validators", "updates", makeValidatorUpdatesLogString(validatorUpdates))
  99. }
  100. // Update the state with the block and responses.
  101. state, err = updateState(state, blockID, &block.Header, abciResponses, validatorUpdates)
  102. if err != nil {
  103. return state, fmt.Errorf("Commit failed for application: %v", err)
  104. }
  105. // Lock mempool, commit app state, update mempoool.
  106. appHash, err := blockExec.Commit(state, block)
  107. if err != nil {
  108. return state, fmt.Errorf("Commit failed for application: %v", err)
  109. }
  110. // Update evpool with the block and state.
  111. blockExec.evpool.Update(block, state)
  112. fail.Fail() // XXX
  113. // Update the app hash and save the state.
  114. state.AppHash = appHash
  115. SaveState(blockExec.db, state)
  116. fail.Fail() // XXX
  117. // Events are fired after everything else.
  118. // NOTE: if we crash between Commit and Save, events wont be fired during replay
  119. fireEvents(blockExec.logger, blockExec.eventBus, block, abciResponses, validatorUpdates)
  120. return state, nil
  121. }
  122. // Commit locks the mempool, runs the ABCI Commit message, and updates the
  123. // mempool.
  124. // It returns the result of calling abci.Commit (the AppHash), and an error.
  125. // The Mempool must be locked during commit and update because state is
  126. // typically reset on Commit and old txs must be replayed against committed
  127. // state before new txs are run in the mempool, lest they be invalid.
  128. func (blockExec *BlockExecutor) Commit(
  129. state State,
  130. block *types.Block,
  131. ) ([]byte, error) {
  132. blockExec.mempool.Lock()
  133. defer blockExec.mempool.Unlock()
  134. // while mempool is Locked, flush to ensure all async requests have completed
  135. // in the ABCI app before Commit.
  136. err := blockExec.mempool.FlushAppConn()
  137. if err != nil {
  138. blockExec.logger.Error("Client error during mempool.FlushAppConn", "err", err)
  139. return nil, err
  140. }
  141. // Commit block, get hash back
  142. res, err := blockExec.proxyApp.CommitSync()
  143. if err != nil {
  144. blockExec.logger.Error(
  145. "Client error during proxyAppConn.CommitSync",
  146. "err", err,
  147. )
  148. return nil, err
  149. }
  150. // ResponseCommit has no error code - just data
  151. blockExec.logger.Info(
  152. "Committed state",
  153. "height", block.Height,
  154. "txs", block.NumTxs,
  155. "appHash", fmt.Sprintf("%X", res.Data),
  156. )
  157. // Update mempool.
  158. err = blockExec.mempool.Update(
  159. block.Height,
  160. block.Txs,
  161. TxPreCheck(state),
  162. TxPostCheck(state),
  163. )
  164. return res.Data, err
  165. }
  166. //---------------------------------------------------------
  167. // Helper functions for executing blocks and updating state
  168. // Executes block's transactions on proxyAppConn.
  169. // Returns a list of transaction results and updates to the validator set
  170. func execBlockOnProxyApp(
  171. logger log.Logger,
  172. proxyAppConn proxy.AppConnConsensus,
  173. block *types.Block,
  174. lastValSet *types.ValidatorSet,
  175. stateDB dbm.DB,
  176. ) (*ABCIResponses, error) {
  177. var validTxs, invalidTxs = 0, 0
  178. txIndex := 0
  179. abciResponses := NewABCIResponses(block)
  180. // Execute transactions and get hash.
  181. proxyCb := func(req *abci.Request, res *abci.Response) {
  182. switch r := res.Value.(type) {
  183. case *abci.Response_DeliverTx:
  184. // TODO: make use of res.Log
  185. // TODO: make use of this info
  186. // Blocks may include invalid txs.
  187. txRes := r.DeliverTx
  188. if txRes.Code == abci.CodeTypeOK {
  189. validTxs++
  190. } else {
  191. logger.Debug("Invalid tx", "code", txRes.Code, "log", txRes.Log)
  192. invalidTxs++
  193. }
  194. abciResponses.DeliverTx[txIndex] = txRes
  195. txIndex++
  196. }
  197. }
  198. proxyAppConn.SetResponseCallback(proxyCb)
  199. commitInfo, byzVals := getBeginBlockValidatorInfo(block, lastValSet, stateDB)
  200. // Begin block
  201. var err error
  202. abciResponses.BeginBlock, err = proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
  203. Hash: block.Hash(),
  204. Header: types.TM2PB.Header(&block.Header),
  205. LastCommitInfo: commitInfo,
  206. ByzantineValidators: byzVals,
  207. })
  208. if err != nil {
  209. logger.Error("Error in proxyAppConn.BeginBlock", "err", err)
  210. return nil, err
  211. }
  212. // Run txs of block.
  213. for _, tx := range block.Txs {
  214. proxyAppConn.DeliverTxAsync(tx)
  215. if err := proxyAppConn.Error(); err != nil {
  216. return nil, err
  217. }
  218. }
  219. // End block.
  220. abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(abci.RequestEndBlock{Height: block.Height})
  221. if err != nil {
  222. logger.Error("Error in proxyAppConn.EndBlock", "err", err)
  223. return nil, err
  224. }
  225. logger.Info("Executed block", "height", block.Height, "validTxs", validTxs, "invalidTxs", invalidTxs)
  226. return abciResponses, nil
  227. }
  228. func getBeginBlockValidatorInfo(block *types.Block, lastValSet *types.ValidatorSet, stateDB dbm.DB) (abci.LastCommitInfo, []abci.Evidence) {
  229. // Sanity check that commit length matches validator set size -
  230. // only applies after first block
  231. if block.Height > 1 {
  232. precommitLen := len(block.LastCommit.Precommits)
  233. valSetLen := len(lastValSet.Validators)
  234. if precommitLen != valSetLen {
  235. // sanity check
  236. panic(fmt.Sprintf("precommit length (%d) doesn't match valset length (%d) at height %d\n\n%v\n\n%v",
  237. precommitLen, valSetLen, block.Height, block.LastCommit.Precommits, lastValSet.Validators))
  238. }
  239. }
  240. // Collect the vote info (list of validators and whether or not they signed).
  241. voteInfos := make([]abci.VoteInfo, len(lastValSet.Validators))
  242. for i, val := range lastValSet.Validators {
  243. var vote *types.Vote
  244. if i < len(block.LastCommit.Precommits) {
  245. vote = block.LastCommit.Precommits[i]
  246. }
  247. voteInfo := abci.VoteInfo{
  248. Validator: types.TM2PB.Validator(val),
  249. SignedLastBlock: vote != nil,
  250. }
  251. voteInfos[i] = voteInfo
  252. }
  253. commitInfo := abci.LastCommitInfo{
  254. Round: int32(block.LastCommit.Round()),
  255. Votes: voteInfos,
  256. }
  257. byzVals := make([]abci.Evidence, len(block.Evidence.Evidence))
  258. for i, ev := range block.Evidence.Evidence {
  259. // We need the validator set. We already did this in validateBlock.
  260. // TODO: Should we instead cache the valset in the evidence itself and add
  261. // `SetValidatorSet()` and `ToABCI` methods ?
  262. valset, err := LoadValidators(stateDB, ev.Height())
  263. if err != nil {
  264. panic(err) // shouldn't happen
  265. }
  266. byzVals[i] = types.TM2PB.Evidence(ev, valset, block.Time)
  267. }
  268. return commitInfo, byzVals
  269. }
  270. func validateValidatorUpdates(abciUpdates []abci.ValidatorUpdate,
  271. params types.ValidatorParams) error {
  272. for _, valUpdate := range abciUpdates {
  273. if valUpdate.GetPower() < 0 {
  274. return fmt.Errorf("Voting power can't be negative %v", valUpdate)
  275. } else if valUpdate.GetPower() == 0 {
  276. // continue, since this is deleting the validator, and thus there is no
  277. // pubkey to check
  278. continue
  279. }
  280. // Check if validator's pubkey matches an ABCI type in the consensus params
  281. thisKeyType := valUpdate.PubKey.Type
  282. if !params.IsValidPubkeyType(thisKeyType) {
  283. return fmt.Errorf("Validator %v is using pubkey %s, which is unsupported for consensus",
  284. valUpdate, thisKeyType)
  285. }
  286. }
  287. return nil
  288. }
  289. // If more or equal than 1/3 of total voting power changed in one block, then
  290. // a light client could never prove the transition externally. See
  291. // ./lite/doc.go for details on how a light client tracks validators.
  292. func updateValidators(currentSet *types.ValidatorSet, updates []*types.Validator) error {
  293. for _, valUpdate := range updates {
  294. // should already have been checked
  295. if valUpdate.VotingPower < 0 {
  296. return fmt.Errorf("Voting power can't be negative %v", valUpdate)
  297. }
  298. address := valUpdate.Address
  299. _, val := currentSet.GetByAddress(address)
  300. // valUpdate.VotingPower is ensured to be non-negative in validation method
  301. if valUpdate.VotingPower == 0 {
  302. // remove val
  303. _, removed := currentSet.Remove(address)
  304. if !removed {
  305. return fmt.Errorf("Failed to remove validator %X", address)
  306. }
  307. } else if val == nil {
  308. // add val
  309. added := currentSet.Add(valUpdate)
  310. if !added {
  311. return fmt.Errorf("Failed to add new validator %v", valUpdate)
  312. }
  313. } else {
  314. // update val
  315. updated := currentSet.Update(valUpdate)
  316. if !updated {
  317. return fmt.Errorf("Failed to update validator %X to %v", address, valUpdate)
  318. }
  319. }
  320. }
  321. return nil
  322. }
  323. // updateState returns a new State updated according to the header and responses.
  324. func updateState(
  325. state State,
  326. blockID types.BlockID,
  327. header *types.Header,
  328. abciResponses *ABCIResponses,
  329. validatorUpdates []*types.Validator,
  330. ) (State, error) {
  331. // Copy the valset so we can apply changes from EndBlock
  332. // and update s.LastValidators and s.Validators.
  333. nValSet := state.NextValidators.Copy()
  334. // Update the validator set with the latest abciResponses.
  335. lastHeightValsChanged := state.LastHeightValidatorsChanged
  336. if len(validatorUpdates) > 0 {
  337. err := updateValidators(nValSet, validatorUpdates)
  338. if err != nil {
  339. return state, fmt.Errorf("Error changing validator set: %v", err)
  340. }
  341. // Change results from this height but only applies to the next next height.
  342. lastHeightValsChanged = header.Height + 1 + 1
  343. }
  344. // Update validator accums and set state variables.
  345. nValSet.IncrementAccum(1)
  346. // Update the params with the latest abciResponses.
  347. nextParams := state.ConsensusParams
  348. lastHeightParamsChanged := state.LastHeightConsensusParamsChanged
  349. if abciResponses.EndBlock.ConsensusParamUpdates != nil {
  350. // NOTE: must not mutate s.ConsensusParams
  351. nextParams = state.ConsensusParams.Update(abciResponses.EndBlock.ConsensusParamUpdates)
  352. err := nextParams.Validate()
  353. if err != nil {
  354. return state, fmt.Errorf("Error updating consensus params: %v", err)
  355. }
  356. // Change results from this height but only applies to the next height.
  357. lastHeightParamsChanged = header.Height + 1
  358. }
  359. // TODO: allow app to upgrade version
  360. nextVersion := state.Version
  361. // NOTE: the AppHash has not been populated.
  362. // It will be filled on state.Save.
  363. return State{
  364. Version: nextVersion,
  365. ChainID: state.ChainID,
  366. LastBlockHeight: header.Height,
  367. LastBlockTotalTx: state.LastBlockTotalTx + header.NumTxs,
  368. LastBlockID: blockID,
  369. LastBlockTime: header.Time,
  370. NextValidators: nValSet,
  371. Validators: state.NextValidators.Copy(),
  372. LastValidators: state.Validators.Copy(),
  373. LastHeightValidatorsChanged: lastHeightValsChanged,
  374. ConsensusParams: nextParams,
  375. LastHeightConsensusParamsChanged: lastHeightParamsChanged,
  376. LastResultsHash: abciResponses.ResultsHash(),
  377. AppHash: nil,
  378. }, nil
  379. }
  380. // Fire NewBlock, NewBlockHeader.
  381. // Fire TxEvent for every tx.
  382. // NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
  383. func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses, validatorUpdates []*types.Validator) {
  384. eventBus.PublishEventNewBlock(types.EventDataNewBlock{
  385. Block: block,
  386. ResultBeginBlock: *abciResponses.BeginBlock,
  387. ResultEndBlock: *abciResponses.EndBlock,
  388. })
  389. eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
  390. Header: block.Header,
  391. ResultBeginBlock: *abciResponses.BeginBlock,
  392. ResultEndBlock: *abciResponses.EndBlock,
  393. })
  394. for i, tx := range block.Data.Txs {
  395. eventBus.PublishEventTx(types.EventDataTx{types.TxResult{
  396. Height: block.Height,
  397. Index: uint32(i),
  398. Tx: tx,
  399. Result: *(abciResponses.DeliverTx[i]),
  400. }})
  401. }
  402. if len(validatorUpdates) > 0 {
  403. eventBus.PublishEventValidatorSetUpdates(
  404. types.EventDataValidatorSetUpdates{ValidatorUpdates: validatorUpdates})
  405. }
  406. }
  407. //----------------------------------------------------------------------------------------------------
  408. // Execute block without state. TODO: eliminate
  409. // ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state.
  410. // It returns the application root hash (result of abci.Commit).
  411. func ExecCommitBlock(
  412. appConnConsensus proxy.AppConnConsensus,
  413. block *types.Block,
  414. logger log.Logger,
  415. lastValSet *types.ValidatorSet,
  416. stateDB dbm.DB,
  417. ) ([]byte, error) {
  418. _, err := execBlockOnProxyApp(logger, appConnConsensus, block, lastValSet, stateDB)
  419. if err != nil {
  420. logger.Error("Error executing block on proxy app", "height", block.Height, "err", err)
  421. return nil, err
  422. }
  423. // Commit block, get hash back
  424. res, err := appConnConsensus.CommitSync()
  425. if err != nil {
  426. logger.Error("Client error during proxyAppConn.CommitSync", "err", res)
  427. return nil, err
  428. }
  429. // ResponseCommit has no error or log, just data
  430. return res.Data, nil
  431. }
  432. // Make pretty string for validatorUpdates logging
  433. func makeValidatorUpdatesLogString(vals []*types.Validator) string {
  434. chunks := make([]string, len(vals))
  435. for i, val := range vals {
  436. chunks[i] = fmt.Sprintf("%s:%d", val.Address, val.VotingPower)
  437. }
  438. return strings.Join(chunks, ",")
  439. }