You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

435 lines
14 KiB

8 years ago
7 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package state
  2. import (
  3. "errors"
  4. "fmt"
  5. fail "github.com/ebuchman/fail-test"
  6. abci "github.com/tendermint/abci/types"
  7. crypto "github.com/tendermint/go-crypto"
  8. "github.com/tendermint/tendermint/proxy"
  9. "github.com/tendermint/tendermint/types"
  10. dbm "github.com/tendermint/tmlibs/db"
  11. "github.com/tendermint/tmlibs/log"
  12. )
  13. //-----------------------------------------------------------------------------
  14. // BlockExecutor handles block execution and state updates.
  15. // It exposes ApplyBlock(), which validates & executes the block, updates state w/ ABCI responses,
  16. // then commits and updates the mempool atomically, then saves state.
  17. // BlockExecutor provides the context and accessories for properly executing a block.
  18. type BlockExecutor struct {
  19. // save state, validators, consensus params, abci responses here
  20. db dbm.DB
  21. // execute the app against this
  22. proxyApp proxy.AppConnConsensus
  23. // events
  24. eventBus types.BlockEventPublisher
  25. // update these with block results after commit
  26. mempool types.Mempool
  27. evpool types.EvidencePool
  28. logger log.Logger
  29. }
  30. // NewBlockExecutor returns a new BlockExecutor with a NopEventBus.
  31. // Call SetEventBus to provide one.
  32. func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus,
  33. mempool types.Mempool, evpool types.EvidencePool) *BlockExecutor {
  34. return &BlockExecutor{
  35. db: db,
  36. proxyApp: proxyApp,
  37. eventBus: types.NopEventBus{},
  38. mempool: mempool,
  39. evpool: evpool,
  40. logger: logger,
  41. }
  42. }
  43. // SetEventBus - sets the event bus for publishing block related events.
  44. // If not called, it defaults to types.NopEventBus.
  45. func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) {
  46. blockExec.eventBus = eventBus
  47. }
  48. // ValidateBlock validates the given block against the given state.
  49. // If the block is invalid, it returns an error.
  50. // Validation does not mutate state, but does require historical information from the stateDB,
  51. // ie. to verify evidence from a validator at an old height.
  52. func (blockExec *BlockExecutor) ValidateBlock(s State, block *types.Block) error {
  53. return validateBlock(blockExec.db, s, block)
  54. }
  55. // ApplyBlock validates the block against the state, executes it against the app,
  56. // fires the relevant events, commits the app, and saves the new state and responses.
  57. // It's the only function that needs to be called
  58. // from outside this package to process and commit an entire block.
  59. // It takes a blockID to avoid recomputing the parts hash.
  60. func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block *types.Block) (State, error) {
  61. if err := blockExec.ValidateBlock(s, block); err != nil {
  62. return s, ErrInvalidBlock(err)
  63. }
  64. abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block)
  65. if err != nil {
  66. return s, ErrProxyAppConn(err)
  67. }
  68. fail.Fail() // XXX
  69. // save the results before we commit
  70. saveABCIResponses(blockExec.db, block.Height, abciResponses)
  71. fail.Fail() // XXX
  72. // update the state with the block and responses
  73. s, err = updateState(s, blockID, block.Header, abciResponses)
  74. if err != nil {
  75. return s, fmt.Errorf("Commit failed for application: %v", err)
  76. }
  77. // lock mempool, commit state, update mempoool
  78. appHash, err := blockExec.Commit(block)
  79. if err != nil {
  80. return s, fmt.Errorf("Commit failed for application: %v", err)
  81. }
  82. fail.Fail() // XXX
  83. // update the app hash and save the state
  84. s.AppHash = appHash
  85. SaveState(blockExec.db, s)
  86. fail.Fail() // XXX
  87. // Update evpool now that state is saved
  88. // TODO: handle the crash/recover scenario
  89. // ie. (may need to call Update for last block)
  90. blockExec.evpool.Update(block)
  91. // events are fired after everything else
  92. // NOTE: if we crash between Commit and Save, events wont be fired during replay
  93. fireEvents(blockExec.logger, blockExec.eventBus, block, abciResponses)
  94. return s, nil
  95. }
  96. // Commit locks the mempool, runs the ABCI Commit message, and updates the mempool.
  97. // It returns the result of calling abci.Commit (the AppHash), and an error.
  98. // The Mempool must be locked during commit and update because state is typically reset on Commit and old txs must be replayed
  99. // against committed state before new txs are run in the mempool, lest they be invalid.
  100. func (blockExec *BlockExecutor) Commit(block *types.Block) ([]byte, error) {
  101. blockExec.mempool.Lock()
  102. defer blockExec.mempool.Unlock()
  103. // while mempool is Locked, flush to ensure all async requests have completed
  104. // in the ABCI app before Commit.
  105. err := blockExec.mempool.FlushAppConn()
  106. if err != nil {
  107. blockExec.logger.Error("Client error during mempool.FlushAppConn", "err", err)
  108. return nil, err
  109. }
  110. // Commit block, get hash back
  111. res, err := blockExec.proxyApp.CommitSync()
  112. if err != nil {
  113. blockExec.logger.Error("Client error during proxyAppConn.CommitSync", "err", err)
  114. return nil, err
  115. }
  116. if res.IsErr() {
  117. blockExec.logger.Error("Error in proxyAppConn.CommitSync", "err", res)
  118. return nil, res
  119. }
  120. if res.Log != "" {
  121. blockExec.logger.Debug("Commit.Log: " + res.Log)
  122. }
  123. blockExec.logger.Info("Committed state", "height", block.Height, "txs", block.NumTxs, "appHash", res.Data)
  124. // Update mempool.
  125. if err := blockExec.mempool.Update(block.Height, block.Txs); err != nil {
  126. return nil, err
  127. }
  128. return res.Data, nil
  129. }
  130. //---------------------------------------------------------
  131. // Helper functions for executing blocks and updating state
  132. // Executes block's transactions on proxyAppConn.
  133. // Returns a list of transaction results and updates to the validator set
  134. func execBlockOnProxyApp(logger log.Logger, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) {
  135. var validTxs, invalidTxs = 0, 0
  136. txIndex := 0
  137. abciResponses := NewABCIResponses(block)
  138. // Execute transactions and get hash
  139. proxyCb := func(req *abci.Request, res *abci.Response) {
  140. switch r := res.Value.(type) {
  141. case *abci.Response_DeliverTx:
  142. // TODO: make use of res.Log
  143. // TODO: make use of this info
  144. // Blocks may include invalid txs.
  145. txRes := r.DeliverTx
  146. if txRes.Code == abci.CodeTypeOK {
  147. validTxs++
  148. } else {
  149. logger.Debug("Invalid tx", "code", txRes.Code, "log", txRes.Log)
  150. invalidTxs++
  151. }
  152. abciResponses.DeliverTx[txIndex] = txRes
  153. txIndex++
  154. }
  155. }
  156. proxyAppConn.SetResponseCallback(proxyCb)
  157. // determine which validators did not sign last block
  158. absentVals := make([]int32, 0)
  159. for valI, vote := range block.LastCommit.Precommits {
  160. if vote == nil {
  161. absentVals = append(absentVals, int32(valI))
  162. }
  163. }
  164. byzantineVals := make([]*abci.Evidence, len(block.Evidence.Evidence))
  165. for i, ev := range block.Evidence.Evidence {
  166. byzantineVals[i] = &abci.Evidence{
  167. PubKey: ev.Address(), // XXX/TODO
  168. Height: ev.Height(),
  169. }
  170. }
  171. // Begin block
  172. _, err := proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
  173. Hash: block.Hash(),
  174. Header: types.TM2PB.Header(block.Header),
  175. AbsentValidators: absentVals,
  176. ByzantineValidators: byzantineVals,
  177. })
  178. if err != nil {
  179. logger.Error("Error in proxyAppConn.BeginBlock", "err", err)
  180. return nil, err
  181. }
  182. // Run txs of block
  183. for _, tx := range block.Txs {
  184. proxyAppConn.DeliverTxAsync(tx)
  185. if err := proxyAppConn.Error(); err != nil {
  186. return nil, err
  187. }
  188. }
  189. // End block
  190. abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(abci.RequestEndBlock{block.Height})
  191. if err != nil {
  192. logger.Error("Error in proxyAppConn.EndBlock", "err", err)
  193. return nil, err
  194. }
  195. logger.Info("Executed block", "height", block.Height, "validTxs", validTxs, "invalidTxs", invalidTxs)
  196. valUpdates := abciResponses.EndBlock.ValidatorUpdates
  197. if len(valUpdates) > 0 {
  198. logger.Info("Updates to validators", "updates", abci.ValidatorsString(valUpdates))
  199. }
  200. return abciResponses, nil
  201. }
  202. func updateValidators(currentSet *types.ValidatorSet, updates []*abci.Validator) error {
  203. // If more or equal than 1/3 of total voting power changed in one block, then
  204. // a light client could never prove the transition externally. See
  205. // ./lite/doc.go for details on how a light client tracks validators.
  206. vp23, err := changeInVotingPowerMoreOrEqualToOneThird(currentSet, updates)
  207. if err != nil {
  208. return err
  209. }
  210. if vp23 {
  211. return errors.New("the change in voting power must be strictly less than 1/3")
  212. }
  213. for _, v := range updates {
  214. pubkey, err := crypto.PubKeyFromBytes(v.PubKey) // NOTE: expects go-wire encoded pubkey
  215. if err != nil {
  216. return err
  217. }
  218. address := pubkey.Address()
  219. power := int64(v.Power)
  220. // mind the overflow from int64
  221. if power < 0 {
  222. return fmt.Errorf("Power (%d) overflows int64", v.Power)
  223. }
  224. _, val := currentSet.GetByAddress(address)
  225. if val == nil {
  226. // add val
  227. added := currentSet.Add(types.NewValidator(pubkey, power))
  228. if !added {
  229. return fmt.Errorf("Failed to add new validator %X with voting power %d", address, power)
  230. }
  231. } else if v.Power == 0 {
  232. // remove val
  233. _, removed := currentSet.Remove(address)
  234. if !removed {
  235. return fmt.Errorf("Failed to remove validator %X", address)
  236. }
  237. } else {
  238. // update val
  239. val.VotingPower = power
  240. updated := currentSet.Update(val)
  241. if !updated {
  242. return fmt.Errorf("Failed to update validator %X with voting power %d", address, power)
  243. }
  244. }
  245. }
  246. return nil
  247. }
  248. func changeInVotingPowerMoreOrEqualToOneThird(currentSet *types.ValidatorSet, updates []*abci.Validator) (bool, error) {
  249. threshold := currentSet.TotalVotingPower() * 1 / 3
  250. acc := int64(0)
  251. for _, v := range updates {
  252. pubkey, err := crypto.PubKeyFromBytes(v.PubKey) // NOTE: expects go-wire encoded pubkey
  253. if err != nil {
  254. return false, err
  255. }
  256. address := pubkey.Address()
  257. power := int64(v.Power)
  258. // mind the overflow from int64
  259. if power < 0 {
  260. return false, fmt.Errorf("Power (%d) overflows int64", v.Power)
  261. }
  262. _, val := currentSet.GetByAddress(address)
  263. if val == nil {
  264. acc += power
  265. } else {
  266. np := val.VotingPower - power
  267. if np < 0 {
  268. np = -np
  269. }
  270. acc += np
  271. }
  272. if acc >= threshold {
  273. return true, nil
  274. }
  275. }
  276. return false, nil
  277. }
  278. // updateState returns a new State updated according to the header and responses.
  279. func updateState(s State, blockID types.BlockID, header *types.Header,
  280. abciResponses *ABCIResponses) (State, error) {
  281. // copy the valset so we can apply changes from EndBlock
  282. // and update s.LastValidators and s.Validators
  283. prevValSet := s.Validators.Copy()
  284. nextValSet := prevValSet.Copy()
  285. // update the validator set with the latest abciResponses
  286. lastHeightValsChanged := s.LastHeightValidatorsChanged
  287. if len(abciResponses.EndBlock.ValidatorUpdates) > 0 {
  288. err := updateValidators(nextValSet, abciResponses.EndBlock.ValidatorUpdates)
  289. if err != nil {
  290. return s, fmt.Errorf("Error changing validator set: %v", err)
  291. }
  292. // change results from this height but only applies to the next height
  293. lastHeightValsChanged = header.Height + 1
  294. }
  295. // Update validator accums and set state variables
  296. nextValSet.IncrementAccum(1)
  297. // update the params with the latest abciResponses
  298. nextParams := s.ConsensusParams
  299. lastHeightParamsChanged := s.LastHeightConsensusParamsChanged
  300. if abciResponses.EndBlock.ConsensusParamUpdates != nil {
  301. // NOTE: must not mutate s.ConsensusParams
  302. nextParams = s.ConsensusParams.Update(abciResponses.EndBlock.ConsensusParamUpdates)
  303. err := nextParams.Validate()
  304. if err != nil {
  305. return s, fmt.Errorf("Error updating consensus params: %v", err)
  306. }
  307. // change results from this height but only applies to the next height
  308. lastHeightParamsChanged = header.Height + 1
  309. }
  310. // NOTE: the AppHash has not been populated.
  311. // It will be filled on state.Save.
  312. return State{
  313. ChainID: s.ChainID,
  314. LastBlockHeight: header.Height,
  315. LastBlockTotalTx: s.LastBlockTotalTx + header.NumTxs,
  316. LastBlockID: blockID,
  317. LastBlockTime: header.Time,
  318. Validators: nextValSet,
  319. LastValidators: s.Validators.Copy(),
  320. LastHeightValidatorsChanged: lastHeightValsChanged,
  321. ConsensusParams: nextParams,
  322. LastHeightConsensusParamsChanged: lastHeightParamsChanged,
  323. LastResultsHash: abciResponses.ResultsHash(),
  324. AppHash: nil,
  325. }, nil
  326. }
  327. // Fire NewBlock, NewBlockHeader.
  328. // Fire TxEvent for every tx.
  329. // NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
  330. func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses) {
  331. // NOTE: do we still need this buffer ?
  332. txEventBuffer := types.NewTxEventBuffer(eventBus, int(block.NumTxs))
  333. for i, tx := range block.Data.Txs {
  334. txEventBuffer.PublishEventTx(types.EventDataTx{types.TxResult{
  335. Height: block.Height,
  336. Index: uint32(i),
  337. Tx: tx,
  338. Result: *(abciResponses.DeliverTx[i]),
  339. }})
  340. }
  341. eventBus.PublishEventNewBlock(types.EventDataNewBlock{block})
  342. eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header})
  343. err := txEventBuffer.Flush()
  344. if err != nil {
  345. logger.Error("Failed to flush event buffer", "err", err)
  346. }
  347. }
  348. //----------------------------------------------------------------------------------------------------
  349. // Execute block without state. TODO: eliminate
  350. // ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state.
  351. // It returns the application root hash (result of abci.Commit).
  352. func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block, logger log.Logger) ([]byte, error) {
  353. _, err := execBlockOnProxyApp(logger, appConnConsensus, block)
  354. if err != nil {
  355. logger.Error("Error executing block on proxy app", "height", block.Height, "err", err)
  356. return nil, err
  357. }
  358. // Commit block, get hash back
  359. res, err := appConnConsensus.CommitSync()
  360. if err != nil {
  361. logger.Error("Client error during proxyAppConn.CommitSync", "err", res)
  362. return nil, err
  363. }
  364. if res.IsErr() {
  365. logger.Error("Error in proxyAppConn.CommitSync", "err", res)
  366. return nil, res
  367. }
  368. if res.Log != "" {
  369. logger.Info("Commit.Log: " + res.Log)
  370. }
  371. return res.Data, nil
  372. }