You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

382 lines
13 KiB

8 years ago
7 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
7 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package state
  2. import (
  3. "fmt"
  4. fail "github.com/ebuchman/fail-test"
  5. abci "github.com/tendermint/abci/types"
  6. crypto "github.com/tendermint/go-crypto"
  7. "github.com/tendermint/tendermint/proxy"
  8. "github.com/tendermint/tendermint/types"
  9. dbm "github.com/tendermint/tmlibs/db"
  10. "github.com/tendermint/tmlibs/log"
  11. )
  12. //-----------------------------------------------------------------------------
  13. // BlockExecutor handles block execution and state updates.
  14. // It exposes ApplyBlock(), which validates & executes the block, updates state w/ ABCI responses,
  15. // then commits and updates the mempool atomically, then saves state.
  16. // BlockExecutor provides the context and accessories for properly executing a block.
  17. type BlockExecutor struct {
  18. // save state, validators, consensus params, abci responses here
  19. db dbm.DB
  20. // execute the app against this
  21. proxyApp proxy.AppConnConsensus
  22. // events
  23. eventBus types.BlockEventPublisher
  24. // update these with block results after commit
  25. mempool types.Mempool
  26. evpool types.EvidencePool
  27. logger log.Logger
  28. }
  29. // NewBlockExecutor returns a new BlockExecutor with a NopEventBus.
  30. // Call SetEventBus to provide one.
  31. func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus,
  32. mempool types.Mempool, evpool types.EvidencePool) *BlockExecutor {
  33. return &BlockExecutor{
  34. db: db,
  35. proxyApp: proxyApp,
  36. eventBus: types.NopEventBus{},
  37. mempool: mempool,
  38. evpool: evpool,
  39. logger: logger,
  40. }
  41. }
  42. // SetEventBus - sets the event bus for publishing block related events.
  43. // If not called, it defaults to types.NopEventBus.
  44. func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) {
  45. blockExec.eventBus = eventBus
  46. }
  47. // ValidateBlock validates the given block against the given state.
  48. // If the block is invalid, it returns an error.
  49. // Validation does not mutate state, but does require historical information from the stateDB,
  50. // ie. to verify evidence from a validator at an old height.
  51. func (blockExec *BlockExecutor) ValidateBlock(s State, block *types.Block) error {
  52. return validateBlock(blockExec.db, s, block)
  53. }
  54. // ApplyBlock validates the block against the state, executes it against the app,
  55. // fires the relevant events, commits the app, and saves the new state and responses.
  56. // It's the only function that needs to be called
  57. // from outside this package to process and commit an entire block.
  58. // It takes a blockID to avoid recomputing the parts hash.
  59. func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block *types.Block) (State, error) {
  60. if err := blockExec.ValidateBlock(s, block); err != nil {
  61. return s, ErrInvalidBlock(err)
  62. }
  63. abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block)
  64. if err != nil {
  65. return s, ErrProxyAppConn(err)
  66. }
  67. fail.Fail() // XXX
  68. // save the results before we commit
  69. saveABCIResponses(blockExec.db, block.Height, abciResponses)
  70. fail.Fail() // XXX
  71. // update the state with the block and responses
  72. s, err = updateState(s, blockID, block.Header, abciResponses)
  73. if err != nil {
  74. return s, fmt.Errorf("Commit failed for application: %v", err)
  75. }
  76. // lock mempool, commit state, update mempoool
  77. appHash, err := blockExec.Commit(block)
  78. if err != nil {
  79. return s, fmt.Errorf("Commit failed for application: %v", err)
  80. }
  81. fail.Fail() // XXX
  82. // update the app hash and save the state
  83. s.AppHash = appHash
  84. SaveState(blockExec.db, s)
  85. fail.Fail() // XXX
  86. // Update evpool now that state is saved
  87. // TODO: handle the crash/recover scenario
  88. // ie. (may need to call Update for last block)
  89. blockExec.evpool.Update(block)
  90. // events are fired after everything else
  91. // NOTE: if we crash between Commit and Save, events wont be fired during replay
  92. fireEvents(blockExec.logger, blockExec.eventBus, block, abciResponses)
  93. return s, nil
  94. }
  95. // Commit locks the mempool, runs the ABCI Commit message, and updates the mempool.
  96. // It returns the result of calling abci.Commit (the AppHash), and an error.
  97. // The Mempool must be locked during commit and update because state is typically reset on Commit and old txs must be replayed
  98. // against committed state before new txs are run in the mempool, lest they be invalid.
  99. func (blockExec *BlockExecutor) Commit(block *types.Block) ([]byte, error) {
  100. blockExec.mempool.Lock()
  101. defer blockExec.mempool.Unlock()
  102. // while mempool is Locked, flush to ensure all async requests have completed
  103. // in the ABCI app before Commit.
  104. err := blockExec.mempool.FlushAppConn()
  105. if err != nil {
  106. blockExec.logger.Error("Client error during mempool.FlushAppConn", "err", err)
  107. return nil, err
  108. }
  109. // Commit block, get hash back
  110. res, err := blockExec.proxyApp.CommitSync()
  111. if err != nil {
  112. blockExec.logger.Error("Client error during proxyAppConn.CommitSync", "err", err)
  113. return nil, err
  114. }
  115. // ResponseCommit has no error code - just data
  116. blockExec.logger.Info("Committed state",
  117. "height", block.Height,
  118. "txs", block.NumTxs,
  119. "appHash", fmt.Sprintf("%X", res.Data))
  120. // Update mempool.
  121. if err := blockExec.mempool.Update(block.Height, block.Txs); err != nil {
  122. return nil, err
  123. }
  124. return res.Data, nil
  125. }
  126. //---------------------------------------------------------
  127. // Helper functions for executing blocks and updating state
  128. // Executes block's transactions on proxyAppConn.
  129. // Returns a list of transaction results and updates to the validator set
  130. func execBlockOnProxyApp(logger log.Logger, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) {
  131. var validTxs, invalidTxs = 0, 0
  132. txIndex := 0
  133. abciResponses := NewABCIResponses(block)
  134. // Execute transactions and get hash
  135. proxyCb := func(req *abci.Request, res *abci.Response) {
  136. switch r := res.Value.(type) {
  137. case *abci.Response_DeliverTx:
  138. // TODO: make use of res.Log
  139. // TODO: make use of this info
  140. // Blocks may include invalid txs.
  141. txRes := r.DeliverTx
  142. if txRes.Code == abci.CodeTypeOK {
  143. validTxs++
  144. } else {
  145. logger.Debug("Invalid tx", "code", txRes.Code, "log", txRes.Log)
  146. invalidTxs++
  147. }
  148. abciResponses.DeliverTx[txIndex] = txRes
  149. txIndex++
  150. }
  151. }
  152. proxyAppConn.SetResponseCallback(proxyCb)
  153. // determine which validators did not sign last block
  154. absentVals := make([]int32, 0)
  155. for valI, vote := range block.LastCommit.Precommits {
  156. if vote == nil {
  157. absentVals = append(absentVals, int32(valI))
  158. }
  159. }
  160. // TODO: determine which validators were byzantine
  161. byzantineVals := make([]abci.Evidence, len(block.Evidence.Evidence))
  162. for i, ev := range block.Evidence.Evidence {
  163. byzantineVals[i] = abci.Evidence{
  164. PubKey: ev.Address(), // XXX
  165. Height: ev.Height(),
  166. }
  167. }
  168. // Begin block
  169. _, err := proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
  170. Hash: block.Hash(),
  171. Header: types.TM2PB.Header(block.Header),
  172. AbsentValidators: absentVals,
  173. ByzantineValidators: byzantineVals,
  174. })
  175. if err != nil {
  176. logger.Error("Error in proxyAppConn.BeginBlock", "err", err)
  177. return nil, err
  178. }
  179. // Run txs of block
  180. for _, tx := range block.Txs {
  181. proxyAppConn.DeliverTxAsync(tx)
  182. if err := proxyAppConn.Error(); err != nil {
  183. return nil, err
  184. }
  185. }
  186. // End block
  187. abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(abci.RequestEndBlock{block.Height})
  188. if err != nil {
  189. logger.Error("Error in proxyAppConn.EndBlock", "err", err)
  190. return nil, err
  191. }
  192. logger.Info("Executed block", "height", block.Height, "validTxs", validTxs, "invalidTxs", invalidTxs)
  193. valUpdates := abciResponses.EndBlock.ValidatorUpdates
  194. if len(valUpdates) > 0 {
  195. logger.Info("Updates to validators", "updates", abci.ValidatorsString(valUpdates))
  196. }
  197. return abciResponses, nil
  198. }
  199. // If more or equal than 1/3 of total voting power changed in one block, then
  200. // a light client could never prove the transition externally. See
  201. // ./lite/doc.go for details on how a light client tracks validators.
  202. func updateValidators(currentSet *types.ValidatorSet, updates []abci.Validator) error {
  203. for _, v := range updates {
  204. pubkey, err := crypto.PubKeyFromBytes(v.PubKey) // NOTE: expects go-amino encoded pubkey
  205. if err != nil {
  206. return err
  207. }
  208. address := pubkey.Address()
  209. power := int64(v.Power)
  210. // mind the overflow from int64
  211. if power < 0 {
  212. return fmt.Errorf("Power (%d) overflows int64", v.Power)
  213. }
  214. _, val := currentSet.GetByAddress(address)
  215. if val == nil {
  216. // add val
  217. added := currentSet.Add(types.NewValidator(pubkey, power))
  218. if !added {
  219. return fmt.Errorf("Failed to add new validator %X with voting power %d", address, power)
  220. }
  221. } else if v.Power == 0 {
  222. // remove val
  223. _, removed := currentSet.Remove(address)
  224. if !removed {
  225. return fmt.Errorf("Failed to remove validator %X", address)
  226. }
  227. } else {
  228. // update val
  229. val.VotingPower = power
  230. updated := currentSet.Update(val)
  231. if !updated {
  232. return fmt.Errorf("Failed to update validator %X with voting power %d", address, power)
  233. }
  234. }
  235. }
  236. return nil
  237. }
  238. // updateState returns a new State updated according to the header and responses.
  239. func updateState(s State, blockID types.BlockID, header *types.Header,
  240. abciResponses *ABCIResponses) (State, error) {
  241. // copy the valset so we can apply changes from EndBlock
  242. // and update s.LastValidators and s.Validators
  243. prevValSet := s.Validators.Copy()
  244. nextValSet := prevValSet.Copy()
  245. // update the validator set with the latest abciResponses
  246. lastHeightValsChanged := s.LastHeightValidatorsChanged
  247. if len(abciResponses.EndBlock.ValidatorUpdates) > 0 {
  248. err := updateValidators(nextValSet, abciResponses.EndBlock.ValidatorUpdates)
  249. if err != nil {
  250. return s, fmt.Errorf("Error changing validator set: %v", err)
  251. }
  252. // change results from this height but only applies to the next height
  253. lastHeightValsChanged = header.Height + 1
  254. }
  255. // Update validator accums and set state variables
  256. nextValSet.IncrementAccum(1)
  257. // update the params with the latest abciResponses
  258. nextParams := s.ConsensusParams
  259. lastHeightParamsChanged := s.LastHeightConsensusParamsChanged
  260. if abciResponses.EndBlock.ConsensusParamUpdates != nil {
  261. // NOTE: must not mutate s.ConsensusParams
  262. nextParams = s.ConsensusParams.Update(abciResponses.EndBlock.ConsensusParamUpdates)
  263. err := nextParams.Validate()
  264. if err != nil {
  265. return s, fmt.Errorf("Error updating consensus params: %v", err)
  266. }
  267. // change results from this height but only applies to the next height
  268. lastHeightParamsChanged = header.Height + 1
  269. }
  270. // NOTE: the AppHash has not been populated.
  271. // It will be filled on state.Save.
  272. return State{
  273. ChainID: s.ChainID,
  274. LastBlockHeight: header.Height,
  275. LastBlockTotalTx: s.LastBlockTotalTx + header.NumTxs,
  276. LastBlockID: blockID,
  277. LastBlockTime: header.Time,
  278. Validators: nextValSet,
  279. LastValidators: s.Validators.Copy(),
  280. LastHeightValidatorsChanged: lastHeightValsChanged,
  281. ConsensusParams: nextParams,
  282. LastHeightConsensusParamsChanged: lastHeightParamsChanged,
  283. LastResultsHash: abciResponses.ResultsHash(),
  284. AppHash: nil,
  285. }, nil
  286. }
  287. // Fire NewBlock, NewBlockHeader.
  288. // Fire TxEvent for every tx.
  289. // NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
  290. func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses) {
  291. // NOTE: do we still need this buffer ?
  292. txEventBuffer := types.NewTxEventBuffer(eventBus, int(block.NumTxs))
  293. for i, tx := range block.Data.Txs {
  294. txEventBuffer.PublishEventTx(types.EventDataTx{types.TxResult{
  295. Height: block.Height,
  296. Index: uint32(i),
  297. Tx: tx,
  298. Result: *(abciResponses.DeliverTx[i]),
  299. }})
  300. }
  301. eventBus.PublishEventNewBlock(types.EventDataNewBlock{block})
  302. eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header})
  303. err := txEventBuffer.Flush()
  304. if err != nil {
  305. logger.Error("Failed to flush event buffer", "err", err)
  306. }
  307. }
  308. //----------------------------------------------------------------------------------------------------
  309. // Execute block without state. TODO: eliminate
  310. // ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state.
  311. // It returns the application root hash (result of abci.Commit).
  312. func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block, logger log.Logger) ([]byte, error) {
  313. _, err := execBlockOnProxyApp(logger, appConnConsensus, block)
  314. if err != nil {
  315. logger.Error("Error executing block on proxy app", "height", block.Height, "err", err)
  316. return nil, err
  317. }
  318. // Commit block, get hash back
  319. res, err := appConnConsensus.CommitSync()
  320. if err != nil {
  321. logger.Error("Client error during proxyAppConn.CommitSync", "err", res)
  322. return nil, err
  323. }
  324. // ResponseCommit has no error or log, just data
  325. return res.Data, nil
  326. }