You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

412 lines
14 KiB

7 years ago
6 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
6 years ago
6 years ago
8 years ago
6 years ago
8 years ago
  1. package state
  2. import (
  3. "fmt"
  4. fail "github.com/ebuchman/fail-test"
  5. abci "github.com/tendermint/tendermint/abci/types"
  6. dbm "github.com/tendermint/tendermint/libs/db"
  7. "github.com/tendermint/tendermint/libs/log"
  8. "github.com/tendermint/tendermint/proxy"
  9. "github.com/tendermint/tendermint/types"
  10. )
  11. //-----------------------------------------------------------------------------
  12. // BlockExecutor handles block execution and state updates.
  13. // It exposes ApplyBlock(), which validates & executes the block, updates state w/ ABCI responses,
  14. // then commits and updates the mempool atomically, then saves state.
  15. // BlockExecutor provides the context and accessories for properly executing a block.
  16. type BlockExecutor struct {
  17. // save state, validators, consensus params, abci responses here
  18. db dbm.DB
  19. // execute the app against this
  20. proxyApp proxy.AppConnConsensus
  21. // events
  22. eventBus types.BlockEventPublisher
  23. // update these with block results after commit
  24. mempool Mempool
  25. evpool EvidencePool
  26. logger log.Logger
  27. }
  28. // NewBlockExecutor returns a new BlockExecutor with a NopEventBus.
  29. // Call SetEventBus to provide one.
  30. func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus,
  31. mempool Mempool, evpool EvidencePool) *BlockExecutor {
  32. return &BlockExecutor{
  33. db: db,
  34. proxyApp: proxyApp,
  35. eventBus: types.NopEventBus{},
  36. mempool: mempool,
  37. evpool: evpool,
  38. logger: logger,
  39. }
  40. }
  41. // SetEventBus - sets the event bus for publishing block related events.
  42. // If not called, it defaults to types.NopEventBus.
  43. func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) {
  44. blockExec.eventBus = eventBus
  45. }
  46. // ValidateBlock validates the given block against the given state.
  47. // If the block is invalid, it returns an error.
  48. // Validation does not mutate state, but does require historical information from the stateDB,
  49. // ie. to verify evidence from a validator at an old height.
  50. func (blockExec *BlockExecutor) ValidateBlock(state State, block *types.Block) error {
  51. return validateBlock(blockExec.db, state, block)
  52. }
  53. // ApplyBlock validates the block against the state, executes it against the app,
  54. // fires the relevant events, commits the app, and saves the new state and responses.
  55. // It's the only function that needs to be called
  56. // from outside this package to process and commit an entire block.
  57. // It takes a blockID to avoid recomputing the parts hash.
  58. func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, block *types.Block) (State, error) {
  59. if err := blockExec.ValidateBlock(state, block); err != nil {
  60. return state, ErrInvalidBlock(err)
  61. }
  62. abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block, state.LastValidators, blockExec.db)
  63. if err != nil {
  64. return state, ErrProxyAppConn(err)
  65. }
  66. fail.Fail() // XXX
  67. // Save the results before we commit.
  68. saveABCIResponses(blockExec.db, block.Height, abciResponses)
  69. fail.Fail() // XXX
  70. // Update the state with the block and responses.
  71. state, err = updateState(state, blockID, &block.Header, abciResponses)
  72. if err != nil {
  73. return state, fmt.Errorf("Commit failed for application: %v", err)
  74. }
  75. // Lock mempool, commit app state, update mempoool.
  76. appHash, err := blockExec.Commit(block)
  77. if err != nil {
  78. return state, fmt.Errorf("Commit failed for application: %v", err)
  79. }
  80. // Update evpool with the block and state.
  81. blockExec.evpool.Update(block, state)
  82. fail.Fail() // XXX
  83. // Update the app hash and save the state.
  84. state.AppHash = appHash
  85. SaveState(blockExec.db, state)
  86. fail.Fail() // XXX
  87. // Events are fired after everything else.
  88. // NOTE: if we crash between Commit and Save, events wont be fired during replay
  89. fireEvents(blockExec.logger, blockExec.eventBus, block, abciResponses)
  90. return state, nil
  91. }
  92. // Commit locks the mempool, runs the ABCI Commit message, and updates the mempool.
  93. // It returns the result of calling abci.Commit (the AppHash), and an error.
  94. // The Mempool must be locked during commit and update because state is typically reset on Commit and old txs must be replayed
  95. // against committed state before new txs are run in the mempool, lest they be invalid.
  96. func (blockExec *BlockExecutor) Commit(block *types.Block) ([]byte, error) {
  97. blockExec.mempool.Lock()
  98. defer blockExec.mempool.Unlock()
  99. // while mempool is Locked, flush to ensure all async requests have completed
  100. // in the ABCI app before Commit.
  101. err := blockExec.mempool.FlushAppConn()
  102. if err != nil {
  103. blockExec.logger.Error("Client error during mempool.FlushAppConn", "err", err)
  104. return nil, err
  105. }
  106. // Commit block, get hash back
  107. res, err := blockExec.proxyApp.CommitSync()
  108. if err != nil {
  109. blockExec.logger.Error("Client error during proxyAppConn.CommitSync", "err", err)
  110. return nil, err
  111. }
  112. // ResponseCommit has no error code - just data
  113. blockExec.logger.Info("Committed state",
  114. "height", block.Height,
  115. "txs", block.NumTxs,
  116. "appHash", fmt.Sprintf("%X", res.Data))
  117. // Update mempool.
  118. if err := blockExec.mempool.Update(block.Height, block.Txs); err != nil {
  119. return nil, err
  120. }
  121. return res.Data, nil
  122. }
  123. //---------------------------------------------------------
  124. // Helper functions for executing blocks and updating state
  125. // Executes block's transactions on proxyAppConn.
  126. // Returns a list of transaction results and updates to the validator set
  127. func execBlockOnProxyApp(logger log.Logger, proxyAppConn proxy.AppConnConsensus,
  128. block *types.Block, lastValSet *types.ValidatorSet, stateDB dbm.DB) (*ABCIResponses, error) {
  129. var validTxs, invalidTxs = 0, 0
  130. txIndex := 0
  131. abciResponses := NewABCIResponses(block)
  132. // Execute transactions and get hash.
  133. proxyCb := func(req *abci.Request, res *abci.Response) {
  134. switch r := res.Value.(type) {
  135. case *abci.Response_DeliverTx:
  136. // TODO: make use of res.Log
  137. // TODO: make use of this info
  138. // Blocks may include invalid txs.
  139. txRes := r.DeliverTx
  140. if txRes.Code == abci.CodeTypeOK {
  141. validTxs++
  142. } else {
  143. logger.Debug("Invalid tx", "code", txRes.Code, "log", txRes.Log)
  144. invalidTxs++
  145. }
  146. abciResponses.DeliverTx[txIndex] = txRes
  147. txIndex++
  148. }
  149. }
  150. proxyAppConn.SetResponseCallback(proxyCb)
  151. signVals, byzVals := getBeginBlockValidatorInfo(block, lastValSet, stateDB)
  152. // Begin block.
  153. _, err := proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
  154. Hash: block.Hash(),
  155. Header: types.TM2PB.Header(&block.Header),
  156. LastCommitInfo: abci.LastCommitInfo{
  157. CommitRound: int32(block.LastCommit.Round()),
  158. Validators: signVals,
  159. },
  160. ByzantineValidators: byzVals,
  161. })
  162. if err != nil {
  163. logger.Error("Error in proxyAppConn.BeginBlock", "err", err)
  164. return nil, err
  165. }
  166. // Run txs of block.
  167. for _, tx := range block.Txs {
  168. proxyAppConn.DeliverTxAsync(tx)
  169. if err := proxyAppConn.Error(); err != nil {
  170. return nil, err
  171. }
  172. }
  173. // End block.
  174. abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(abci.RequestEndBlock{Height: block.Height})
  175. if err != nil {
  176. logger.Error("Error in proxyAppConn.EndBlock", "err", err)
  177. return nil, err
  178. }
  179. logger.Info("Executed block", "height", block.Height, "validTxs", validTxs, "invalidTxs", invalidTxs)
  180. valUpdates := abciResponses.EndBlock.ValidatorUpdates
  181. if len(valUpdates) > 0 {
  182. logger.Info("Updates to validators", "updates", abci.ValidatorsString(valUpdates))
  183. }
  184. return abciResponses, nil
  185. }
  186. func getBeginBlockValidatorInfo(block *types.Block, lastValSet *types.ValidatorSet, stateDB dbm.DB) ([]abci.SigningValidator, []abci.Evidence) {
  187. // Sanity check that commit length matches validator set size -
  188. // only applies after first block
  189. if block.Height > 1 {
  190. precommitLen := len(block.LastCommit.Precommits)
  191. valSetLen := len(lastValSet.Validators)
  192. if precommitLen != valSetLen {
  193. // sanity check
  194. panic(fmt.Sprintf("precommit length (%d) doesn't match valset length (%d) at height %d\n\n%v\n\n%v",
  195. precommitLen, valSetLen, block.Height, block.LastCommit.Precommits, lastValSet.Validators))
  196. }
  197. }
  198. // determine which validators did not sign last block.
  199. signVals := make([]abci.SigningValidator, len(lastValSet.Validators))
  200. for i, val := range lastValSet.Validators {
  201. var vote *types.Vote
  202. if i < len(block.LastCommit.Precommits) {
  203. vote = block.LastCommit.Precommits[i]
  204. }
  205. val := abci.SigningValidator{
  206. Validator: types.TM2PB.ValidatorWithoutPubKey(val),
  207. SignedLastBlock: vote != nil,
  208. }
  209. signVals[i] = val
  210. }
  211. byzVals := make([]abci.Evidence, len(block.Evidence.Evidence))
  212. for i, ev := range block.Evidence.Evidence {
  213. // We need the validator set. We already did this in validateBlock.
  214. // TODO: Should we instead cache the valset in the evidence itself and add
  215. // `SetValidatorSet()` and `ToABCI` methods ?
  216. valset, err := LoadValidators(stateDB, ev.Height())
  217. if err != nil {
  218. panic(err) // shouldn't happen
  219. }
  220. byzVals[i] = types.TM2PB.Evidence(ev, valset, block.Time)
  221. }
  222. return signVals, byzVals
  223. }
  224. // If more or equal than 1/3 of total voting power changed in one block, then
  225. // a light client could never prove the transition externally. See
  226. // ./lite/doc.go for details on how a light client tracks validators.
  227. func updateValidators(currentSet *types.ValidatorSet, abciUpdates []abci.Validator) error {
  228. updates, err := types.PB2TM.Validators(abciUpdates)
  229. if err != nil {
  230. return err
  231. }
  232. // these are tendermint types now
  233. for _, valUpdate := range updates {
  234. if valUpdate.VotingPower < 0 {
  235. return fmt.Errorf("Voting power can't be negative %v", valUpdate)
  236. }
  237. address := valUpdate.Address
  238. _, val := currentSet.GetByAddress(address)
  239. if valUpdate.VotingPower == 0 {
  240. // remove val
  241. _, removed := currentSet.Remove(address)
  242. if !removed {
  243. return fmt.Errorf("Failed to remove validator %X", address)
  244. }
  245. } else if val == nil {
  246. // add val
  247. added := currentSet.Add(valUpdate)
  248. if !added {
  249. return fmt.Errorf("Failed to add new validator %v", valUpdate)
  250. }
  251. } else {
  252. // update val
  253. updated := currentSet.Update(valUpdate)
  254. if !updated {
  255. return fmt.Errorf("Failed to update validator %X to %v", address, valUpdate)
  256. }
  257. }
  258. }
  259. return nil
  260. }
  261. // updateState returns a new State updated according to the header and responses.
  262. func updateState(state State, blockID types.BlockID, header *types.Header,
  263. abciResponses *ABCIResponses) (State, error) {
  264. // Copy the valset so we can apply changes from EndBlock
  265. // and update s.LastValidators and s.Validators.
  266. nValSet := state.NextValidators.Copy()
  267. // Update the validator set with the latest abciResponses.
  268. lastHeightValsChanged := state.LastHeightValidatorsChanged
  269. if len(abciResponses.EndBlock.ValidatorUpdates) > 0 {
  270. err := updateValidators(nValSet, abciResponses.EndBlock.ValidatorUpdates)
  271. if err != nil {
  272. return state, fmt.Errorf("Error changing validator set: %v", err)
  273. }
  274. // Change results from this height but only applies to the next next height.
  275. lastHeightValsChanged = header.Height + 1 + 1
  276. }
  277. // Update validator accums and set state variables.
  278. nValSet.IncrementAccum(1)
  279. // Update the params with the latest abciResponses.
  280. nextParams := state.ConsensusParams
  281. lastHeightParamsChanged := state.LastHeightConsensusParamsChanged
  282. if abciResponses.EndBlock.ConsensusParamUpdates != nil {
  283. // NOTE: must not mutate s.ConsensusParams
  284. nextParams = state.ConsensusParams.Update(abciResponses.EndBlock.ConsensusParamUpdates)
  285. err := nextParams.Validate()
  286. if err != nil {
  287. return state, fmt.Errorf("Error updating consensus params: %v", err)
  288. }
  289. // Change results from this height but only applies to the next height.
  290. lastHeightParamsChanged = header.Height + 1
  291. }
  292. // NOTE: the AppHash has not been populated.
  293. // It will be filled on state.Save.
  294. return State{
  295. ChainID: state.ChainID,
  296. LastBlockHeight: header.Height,
  297. LastBlockTotalTx: state.LastBlockTotalTx + header.NumTxs,
  298. LastBlockID: blockID,
  299. LastBlockTime: header.Time,
  300. NextValidators: nValSet,
  301. Validators: state.NextValidators.Copy(),
  302. LastValidators: state.Validators.Copy(),
  303. LastHeightValidatorsChanged: lastHeightValsChanged,
  304. ConsensusParams: nextParams,
  305. LastHeightConsensusParamsChanged: lastHeightParamsChanged,
  306. LastResultsHash: abciResponses.ResultsHash(),
  307. AppHash: nil,
  308. }, nil
  309. }
  310. // Fire NewBlock, NewBlockHeader.
  311. // Fire TxEvent for every tx.
  312. // NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
  313. func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses) {
  314. eventBus.PublishEventNewBlock(types.EventDataNewBlock{block})
  315. eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header})
  316. for i, tx := range block.Data.Txs {
  317. eventBus.PublishEventTx(types.EventDataTx{types.TxResult{
  318. Height: block.Height,
  319. Index: uint32(i),
  320. Tx: tx,
  321. Result: *(abciResponses.DeliverTx[i]),
  322. }})
  323. }
  324. if len(abciResponses.EndBlock.ValidatorUpdates) > 0 {
  325. // if there were an error, we would've stopped in updateValidators
  326. updates, _ := types.PB2TM.Validators(abciResponses.EndBlock.ValidatorUpdates)
  327. eventBus.PublishEventValidatorSetUpdates(
  328. types.EventDataValidatorSetUpdates{ValidatorUpdates: updates})
  329. }
  330. }
  331. //----------------------------------------------------------------------------------------------------
  332. // Execute block without state. TODO: eliminate
  333. // ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state.
  334. // It returns the application root hash (result of abci.Commit).
  335. func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block,
  336. logger log.Logger, lastValSet *types.ValidatorSet, stateDB dbm.DB) ([]byte, error) {
  337. _, err := execBlockOnProxyApp(logger, appConnConsensus, block, lastValSet, stateDB)
  338. if err != nil {
  339. logger.Error("Error executing block on proxy app", "height", block.Height, "err", err)
  340. return nil, err
  341. }
  342. // Commit block, get hash back
  343. res, err := appConnConsensus.CommitSync()
  344. if err != nil {
  345. logger.Error("Client error during proxyAppConn.CommitSync", "err", res)
  346. return nil, err
  347. }
  348. // ResponseCommit has no error or log, just data
  349. return res.Data, nil
  350. }