You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

565 lines
19 KiB

7 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
7 years ago
7 years ago
8 years ago
8 years ago
  1. package state
  2. import (
  3. "fmt"
  4. "strings"
  5. "time"
  6. abci "github.com/tendermint/tendermint/abci/types"
  7. dbm "github.com/tendermint/tendermint/libs/db"
  8. "github.com/tendermint/tendermint/libs/fail"
  9. "github.com/tendermint/tendermint/libs/log"
  10. "github.com/tendermint/tendermint/proxy"
  11. "github.com/tendermint/tendermint/types"
  12. )
  13. //-----------------------------------------------------------------------------
  14. // BlockExecutor handles block execution and state updates.
  15. // It exposes ApplyBlock(), which validates & executes the block, updates state w/ ABCI responses,
  16. // then commits and updates the mempool atomically, then saves state.
  17. // BlockExecutor provides the context and accessories for properly executing a block.
  18. type BlockExecutor struct {
  19. // save state, validators, consensus params, abci responses here
  20. db dbm.DB
  21. // execute the app against this
  22. proxyApp proxy.AppConnConsensus
  23. // events
  24. eventBus types.BlockEventPublisher
  25. // manage the mempool lock during commit
  26. // and update both with block results after commit.
  27. mempool Mempool
  28. evpool EvidencePool
  29. logger log.Logger
  30. metrics *Metrics
  31. }
  32. type BlockExecutorOption func(executor *BlockExecutor)
  33. func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption {
  34. return func(blockExec *BlockExecutor) {
  35. blockExec.metrics = metrics
  36. }
  37. }
  38. // NewBlockExecutor returns a new BlockExecutor with a NopEventBus.
  39. // Call SetEventBus to provide one.
  40. func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus,
  41. mempool Mempool, evpool EvidencePool, options ...BlockExecutorOption) *BlockExecutor {
  42. res := &BlockExecutor{
  43. db: db,
  44. proxyApp: proxyApp,
  45. eventBus: types.NopEventBus{},
  46. mempool: mempool,
  47. evpool: evpool,
  48. logger: logger,
  49. metrics: NopMetrics(),
  50. }
  51. for _, option := range options {
  52. option(res)
  53. }
  54. return res
  55. }
  56. // SetEventBus - sets the event bus for publishing block related events.
  57. // If not called, it defaults to types.NopEventBus.
  58. func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) {
  59. blockExec.eventBus = eventBus
  60. }
  61. // CreateProposalBlock calls state.MakeBlock with evidence from the evpool
  62. // and txs from the mempool. The max bytes must be big enough to fit the commit.
  63. // Up to 1/10th of the block space is allcoated for maximum sized evidence.
  64. // The rest is given to txs, up to the max gas.
  65. func (blockExec *BlockExecutor) CreateProposalBlock(
  66. height int64,
  67. state State, commit *types.Commit,
  68. proposerAddr []byte,
  69. ) (*types.Block, *types.PartSet) {
  70. maxBytes := state.ConsensusParams.BlockSize.MaxBytes
  71. maxGas := state.ConsensusParams.BlockSize.MaxGas
  72. // Fetch a limited amount of valid evidence
  73. maxNumEvidence, _ := types.MaxEvidencePerBlock(maxBytes)
  74. evidence := blockExec.evpool.PendingEvidence(maxNumEvidence)
  75. // Fetch a limited amount of valid txs
  76. maxDataBytes := types.MaxDataBytes(maxBytes, state.Validators.Size(), len(evidence))
  77. txs := blockExec.mempool.ReapMaxBytesMaxGas(maxDataBytes, maxGas)
  78. return state.MakeBlock(height, txs, commit, evidence, proposerAddr)
  79. }
  80. // ValidateBlock validates the given block against the given state.
  81. // If the block is invalid, it returns an error.
  82. // Validation does not mutate state, but does require historical information from the stateDB,
  83. // ie. to verify evidence from a validator at an old height.
  84. func (blockExec *BlockExecutor) ValidateBlock(state State, block *types.Block) error {
  85. return validateBlock(blockExec.db, state, block)
  86. }
  87. // ApplyBlock validates the block against the state, executes it against the app,
  88. // fires the relevant events, commits the app, and saves the new state and responses.
  89. // It's the only function that needs to be called
  90. // from outside this package to process and commit an entire block.
  91. // It takes a blockID to avoid recomputing the parts hash.
  92. func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, block *types.Block) (State, error) {
  93. if err := blockExec.ValidateBlock(state, block); err != nil {
  94. return state, ErrInvalidBlock(err)
  95. }
  96. startTime := time.Now().UnixNano()
  97. abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block, state.LastValidators, blockExec.db)
  98. endTime := time.Now().UnixNano()
  99. blockExec.metrics.BlockProcessingTime.Observe(float64(endTime-startTime) / 1000000)
  100. if err != nil {
  101. return state, ErrProxyAppConn(err)
  102. }
  103. fail.Fail() // XXX
  104. // Save the results before we commit.
  105. saveABCIResponses(blockExec.db, block.Height, abciResponses)
  106. fail.Fail() // XXX
  107. // validate the validator updates and convert to tendermint types
  108. abciValUpdates := abciResponses.EndBlock.ValidatorUpdates
  109. err = validateValidatorUpdates(abciValUpdates, state.ConsensusParams.Validator)
  110. if err != nil {
  111. return state, fmt.Errorf("Error in validator updates: %v", err)
  112. }
  113. validatorUpdates, err := types.PB2TM.ValidatorUpdates(abciValUpdates)
  114. if err != nil {
  115. return state, err
  116. }
  117. if len(validatorUpdates) > 0 {
  118. blockExec.logger.Info("Updates to validators", "updates", makeValidatorUpdatesLogString(validatorUpdates))
  119. }
  120. // Update the state with the block and responses.
  121. state, err = updateState(state, blockID, &block.Header, abciResponses, validatorUpdates)
  122. if err != nil {
  123. return state, fmt.Errorf("Commit failed for application: %v", err)
  124. }
  125. // Lock mempool, commit app state, update mempoool.
  126. appHash, err := blockExec.Commit(state, block)
  127. if err != nil {
  128. return state, fmt.Errorf("Commit failed for application: %v", err)
  129. }
  130. // Update evpool with the block and state.
  131. blockExec.evpool.Update(block, state)
  132. fail.Fail() // XXX
  133. // Update the app hash and save the state.
  134. state.AppHash = appHash
  135. SaveState(blockExec.db, state)
  136. fail.Fail() // XXX
  137. // Events are fired after everything else.
  138. // NOTE: if we crash between Commit and Save, events wont be fired during replay
  139. fireEvents(blockExec.logger, blockExec.eventBus, block, abciResponses, validatorUpdates)
  140. return state, nil
  141. }
  142. // Commit locks the mempool, runs the ABCI Commit message, and updates the
  143. // mempool.
  144. // It returns the result of calling abci.Commit (the AppHash), and an error.
  145. // The Mempool must be locked during commit and update because state is
  146. // typically reset on Commit and old txs must be replayed against committed
  147. // state before new txs are run in the mempool, lest they be invalid.
  148. func (blockExec *BlockExecutor) Commit(
  149. state State,
  150. block *types.Block,
  151. ) ([]byte, error) {
  152. blockExec.mempool.Lock()
  153. defer blockExec.mempool.Unlock()
  154. // while mempool is Locked, flush to ensure all async requests have completed
  155. // in the ABCI app before Commit.
  156. err := blockExec.mempool.FlushAppConn()
  157. if err != nil {
  158. blockExec.logger.Error("Client error during mempool.FlushAppConn", "err", err)
  159. return nil, err
  160. }
  161. // Commit block, get hash back
  162. res, err := blockExec.proxyApp.CommitSync()
  163. if err != nil {
  164. blockExec.logger.Error(
  165. "Client error during proxyAppConn.CommitSync",
  166. "err", err,
  167. )
  168. return nil, err
  169. }
  170. // ResponseCommit has no error code - just data
  171. blockExec.logger.Info(
  172. "Committed state",
  173. "height", block.Height,
  174. "txs", block.NumTxs,
  175. "appHash", fmt.Sprintf("%X", res.Data),
  176. )
  177. // Update mempool.
  178. err = blockExec.mempool.Update(
  179. block.Height,
  180. block.Txs,
  181. TxPreCheck(state),
  182. TxPostCheck(state),
  183. )
  184. return res.Data, err
  185. }
  186. //---------------------------------------------------------
  187. // Helper functions for executing blocks and updating state
  188. // Executes block's transactions on proxyAppConn.
  189. // Returns a list of transaction results and updates to the validator set
  190. func execBlockOnProxyApp(
  191. logger log.Logger,
  192. proxyAppConn proxy.AppConnConsensus,
  193. block *types.Block,
  194. lastValSet *types.ValidatorSet,
  195. stateDB dbm.DB,
  196. ) (*ABCIResponses, error) {
  197. var validTxs, invalidTxs = 0, 0
  198. txIndex := 0
  199. abciResponses := NewABCIResponses(block)
  200. // Execute transactions and get hash.
  201. proxyCb := func(req *abci.Request, res *abci.Response) {
  202. switch r := res.Value.(type) {
  203. case *abci.Response_DeliverTx:
  204. // TODO: make use of res.Log
  205. // TODO: make use of this info
  206. // Blocks may include invalid txs.
  207. txRes := r.DeliverTx
  208. if txRes.Code == abci.CodeTypeOK {
  209. validTxs++
  210. } else {
  211. logger.Debug("Invalid tx", "code", txRes.Code, "log", txRes.Log)
  212. invalidTxs++
  213. }
  214. abciResponses.DeliverTx[txIndex] = txRes
  215. txIndex++
  216. }
  217. }
  218. proxyAppConn.SetResponseCallback(proxyCb)
  219. commitInfo, byzVals := getBeginBlockValidatorInfo(block, lastValSet, stateDB)
  220. // Begin block
  221. var err error
  222. abciResponses.BeginBlock, err = proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
  223. Hash: block.Hash(),
  224. Header: types.TM2PB.Header(&block.Header),
  225. LastCommitInfo: commitInfo,
  226. ByzantineValidators: byzVals,
  227. })
  228. if err != nil {
  229. logger.Error("Error in proxyAppConn.BeginBlock", "err", err)
  230. return nil, err
  231. }
  232. // Run txs of block.
  233. for _, tx := range block.Txs {
  234. proxyAppConn.DeliverTxAsync(tx)
  235. if err := proxyAppConn.Error(); err != nil {
  236. return nil, err
  237. }
  238. }
  239. // End block.
  240. abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(abci.RequestEndBlock{Height: block.Height})
  241. if err != nil {
  242. logger.Error("Error in proxyAppConn.EndBlock", "err", err)
  243. return nil, err
  244. }
  245. logger.Info("Executed block", "height", block.Height, "validTxs", validTxs, "invalidTxs", invalidTxs)
  246. return abciResponses, nil
  247. }
  248. func getBeginBlockValidatorInfo(block *types.Block, lastValSet *types.ValidatorSet, stateDB dbm.DB) (abci.LastCommitInfo, []abci.Evidence) {
  249. // Sanity check that commit length matches validator set size -
  250. // only applies after first block
  251. if block.Height > 1 {
  252. precommitLen := len(block.LastCommit.Precommits)
  253. valSetLen := len(lastValSet.Validators)
  254. if precommitLen != valSetLen {
  255. // sanity check
  256. panic(fmt.Sprintf("precommit length (%d) doesn't match valset length (%d) at height %d\n\n%v\n\n%v",
  257. precommitLen, valSetLen, block.Height, block.LastCommit.Precommits, lastValSet.Validators))
  258. }
  259. }
  260. // Collect the vote info (list of validators and whether or not they signed).
  261. voteInfos := make([]abci.VoteInfo, len(lastValSet.Validators))
  262. for i, val := range lastValSet.Validators {
  263. var vote *types.Vote
  264. if i < len(block.LastCommit.Precommits) {
  265. vote = block.LastCommit.Precommits[i]
  266. }
  267. voteInfo := abci.VoteInfo{
  268. Validator: types.TM2PB.Validator(val),
  269. SignedLastBlock: vote != nil,
  270. }
  271. voteInfos[i] = voteInfo
  272. }
  273. commitInfo := abci.LastCommitInfo{
  274. Round: int32(block.LastCommit.Round()),
  275. Votes: voteInfos,
  276. }
  277. byzVals := make([]abci.Evidence, len(block.Evidence.Evidence))
  278. for i, ev := range block.Evidence.Evidence {
  279. // We need the validator set. We already did this in validateBlock.
  280. // TODO: Should we instead cache the valset in the evidence itself and add
  281. // `SetValidatorSet()` and `ToABCI` methods ?
  282. valset, err := LoadValidators(stateDB, ev.Height())
  283. if err != nil {
  284. panic(err) // shouldn't happen
  285. }
  286. byzVals[i] = types.TM2PB.Evidence(ev, valset, block.Time)
  287. }
  288. return commitInfo, byzVals
  289. }
  290. func validateValidatorUpdates(abciUpdates []abci.ValidatorUpdate,
  291. params types.ValidatorParams) error {
  292. for _, valUpdate := range abciUpdates {
  293. if valUpdate.GetPower() < 0 {
  294. return fmt.Errorf("Voting power can't be negative %v", valUpdate)
  295. } else if valUpdate.GetPower() == 0 {
  296. // continue, since this is deleting the validator, and thus there is no
  297. // pubkey to check
  298. continue
  299. }
  300. // Check if validator's pubkey matches an ABCI type in the consensus params
  301. thisKeyType := valUpdate.PubKey.Type
  302. if !params.IsValidPubkeyType(thisKeyType) {
  303. return fmt.Errorf("Validator %v is using pubkey %s, which is unsupported for consensus",
  304. valUpdate, thisKeyType)
  305. }
  306. }
  307. return nil
  308. }
  309. // If more or equal than 1/3 of total voting power changed in one block, then
  310. // a light client could never prove the transition externally. See
  311. // ./lite/doc.go for details on how a light client tracks validators.
  312. func updateValidators(currentSet *types.ValidatorSet, updates []*types.Validator) error {
  313. for _, valUpdate := range updates {
  314. // should already have been checked
  315. if valUpdate.VotingPower < 0 {
  316. return fmt.Errorf("Voting power can't be negative %v", valUpdate)
  317. }
  318. address := valUpdate.Address
  319. _, val := currentSet.GetByAddress(address)
  320. // valUpdate.VotingPower is ensured to be non-negative in validation method
  321. if valUpdate.VotingPower == 0 { // remove val
  322. _, removed := currentSet.Remove(address)
  323. if !removed {
  324. return fmt.Errorf("Failed to remove validator %X", address)
  325. }
  326. } else if val == nil { // add val
  327. // make sure we do not exceed MaxTotalVotingPower by adding this validator:
  328. totalVotingPower := currentSet.TotalVotingPower()
  329. updatedVotingPower := valUpdate.VotingPower + totalVotingPower
  330. overflow := updatedVotingPower > types.MaxTotalVotingPower || updatedVotingPower < 0
  331. if overflow {
  332. return fmt.Errorf(
  333. "Failed to add new validator %v. Adding it would exceed max allowed total voting power %v",
  334. valUpdate,
  335. types.MaxTotalVotingPower)
  336. }
  337. // TODO: issue #1558 update spec according to the following:
  338. // Set ProposerPriority to -C*totalVotingPower (with C ~= 1.125) to make sure validators can't
  339. // unbond/rebond to reset their (potentially previously negative) ProposerPriority to zero.
  340. //
  341. // Contract: totalVotingPower < MaxTotalVotingPower to ensure ProposerPriority does
  342. // not exceed the bounds of int64.
  343. //
  344. // Compute ProposerPriority = -1.125*totalVotingPower == -(totalVotingPower + (totalVotingPower >> 3)).
  345. valUpdate.ProposerPriority = -(totalVotingPower + (totalVotingPower >> 3))
  346. added := currentSet.Add(valUpdate)
  347. if !added {
  348. return fmt.Errorf("Failed to add new validator %v", valUpdate)
  349. }
  350. } else { // update val
  351. // make sure we do not exceed MaxTotalVotingPower by updating this validator:
  352. totalVotingPower := currentSet.TotalVotingPower()
  353. curVotingPower := val.VotingPower
  354. updatedVotingPower := totalVotingPower - curVotingPower + valUpdate.VotingPower
  355. overflow := updatedVotingPower > types.MaxTotalVotingPower || updatedVotingPower < 0
  356. if overflow {
  357. return fmt.Errorf(
  358. "Failed to update existing validator %v. Updating it would exceed max allowed total voting power %v",
  359. valUpdate,
  360. types.MaxTotalVotingPower)
  361. }
  362. updated := currentSet.Update(valUpdate)
  363. if !updated {
  364. return fmt.Errorf("Failed to update validator %X to %v", address, valUpdate)
  365. }
  366. }
  367. }
  368. return nil
  369. }
  370. // updateState returns a new State updated according to the header and responses.
  371. func updateState(
  372. state State,
  373. blockID types.BlockID,
  374. header *types.Header,
  375. abciResponses *ABCIResponses,
  376. validatorUpdates []*types.Validator,
  377. ) (State, error) {
  378. // Copy the valset so we can apply changes from EndBlock
  379. // and update s.LastValidators and s.Validators.
  380. nValSet := state.NextValidators.Copy()
  381. // Update the validator set with the latest abciResponses.
  382. lastHeightValsChanged := state.LastHeightValidatorsChanged
  383. if len(validatorUpdates) > 0 {
  384. err := updateValidators(nValSet, validatorUpdates)
  385. if err != nil {
  386. return state, fmt.Errorf("Error changing validator set: %v", err)
  387. }
  388. // Change results from this height but only applies to the next next height.
  389. lastHeightValsChanged = header.Height + 1 + 1
  390. }
  391. // Update validator proposer priority and set state variables.
  392. nValSet.IncrementProposerPriority(1)
  393. // Update the params with the latest abciResponses.
  394. nextParams := state.ConsensusParams
  395. lastHeightParamsChanged := state.LastHeightConsensusParamsChanged
  396. if abciResponses.EndBlock.ConsensusParamUpdates != nil {
  397. // NOTE: must not mutate s.ConsensusParams
  398. nextParams = state.ConsensusParams.Update(abciResponses.EndBlock.ConsensusParamUpdates)
  399. err := nextParams.Validate()
  400. if err != nil {
  401. return state, fmt.Errorf("Error updating consensus params: %v", err)
  402. }
  403. // Change results from this height but only applies to the next height.
  404. lastHeightParamsChanged = header.Height + 1
  405. }
  406. // TODO: allow app to upgrade version
  407. nextVersion := state.Version
  408. // NOTE: the AppHash has not been populated.
  409. // It will be filled on state.Save.
  410. return State{
  411. Version: nextVersion,
  412. ChainID: state.ChainID,
  413. LastBlockHeight: header.Height,
  414. LastBlockTotalTx: state.LastBlockTotalTx + header.NumTxs,
  415. LastBlockID: blockID,
  416. LastBlockTime: header.Time,
  417. NextValidators: nValSet,
  418. Validators: state.NextValidators.Copy(),
  419. LastValidators: state.Validators.Copy(),
  420. LastHeightValidatorsChanged: lastHeightValsChanged,
  421. ConsensusParams: nextParams,
  422. LastHeightConsensusParamsChanged: lastHeightParamsChanged,
  423. LastResultsHash: abciResponses.ResultsHash(),
  424. AppHash: nil,
  425. }, nil
  426. }
  427. // Fire NewBlock, NewBlockHeader.
  428. // Fire TxEvent for every tx.
  429. // NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
  430. func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses, validatorUpdates []*types.Validator) {
  431. eventBus.PublishEventNewBlock(types.EventDataNewBlock{
  432. Block: block,
  433. ResultBeginBlock: *abciResponses.BeginBlock,
  434. ResultEndBlock: *abciResponses.EndBlock,
  435. })
  436. eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
  437. Header: block.Header,
  438. ResultBeginBlock: *abciResponses.BeginBlock,
  439. ResultEndBlock: *abciResponses.EndBlock,
  440. })
  441. for i, tx := range block.Data.Txs {
  442. eventBus.PublishEventTx(types.EventDataTx{types.TxResult{
  443. Height: block.Height,
  444. Index: uint32(i),
  445. Tx: tx,
  446. Result: *(abciResponses.DeliverTx[i]),
  447. }})
  448. }
  449. if len(validatorUpdates) > 0 {
  450. eventBus.PublishEventValidatorSetUpdates(
  451. types.EventDataValidatorSetUpdates{ValidatorUpdates: validatorUpdates})
  452. }
  453. }
  454. //----------------------------------------------------------------------------------------------------
  455. // Execute block without state. TODO: eliminate
  456. // ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state.
  457. // It returns the application root hash (result of abci.Commit).
  458. func ExecCommitBlock(
  459. appConnConsensus proxy.AppConnConsensus,
  460. block *types.Block,
  461. logger log.Logger,
  462. lastValSet *types.ValidatorSet,
  463. stateDB dbm.DB,
  464. ) ([]byte, error) {
  465. _, err := execBlockOnProxyApp(logger, appConnConsensus, block, lastValSet, stateDB)
  466. if err != nil {
  467. logger.Error("Error executing block on proxy app", "height", block.Height, "err", err)
  468. return nil, err
  469. }
  470. // Commit block, get hash back
  471. res, err := appConnConsensus.CommitSync()
  472. if err != nil {
  473. logger.Error("Client error during proxyAppConn.CommitSync", "err", res)
  474. return nil, err
  475. }
  476. // ResponseCommit has no error or log, just data
  477. return res.Data, nil
  478. }
  479. // Make pretty string for validatorUpdates logging
  480. func makeValidatorUpdatesLogString(vals []*types.Validator) string {
  481. chunks := make([]string, len(vals))
  482. for i, val := range vals {
  483. chunks[i] = fmt.Sprintf("%s:%d", val.Address, val.VotingPower)
  484. }
  485. return strings.Join(chunks, ",")
  486. }