You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

301 lines
8.6 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package state
  2. import (
  3. "errors"
  4. "fmt"
  5. fail "github.com/ebuchman/fail-test"
  6. abci "github.com/tendermint/abci/types"
  7. . "github.com/tendermint/go-common"
  8. crypto "github.com/tendermint/go-crypto"
  9. "github.com/tendermint/tendermint/proxy"
  10. "github.com/tendermint/tendermint/state/txindex"
  11. "github.com/tendermint/tendermint/types"
  12. )
  13. //--------------------------------------------------
  14. // Execute the block
  15. // ExecBlock executes the block, but does NOT mutate State.
  16. // + validates the block
  17. // + executes block.Txs on the proxyAppConn
  18. func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) {
  19. // Validate the block.
  20. if err := s.validateBlock(block); err != nil {
  21. return nil, ErrInvalidBlock(err)
  22. }
  23. // Execute the block txs
  24. abciResponses, err := execBlockOnProxyApp(eventCache, proxyAppConn, block)
  25. if err != nil {
  26. // There was some error in proxyApp
  27. // TODO Report error and wait for proxyApp to be available.
  28. return nil, ErrProxyAppConn(err)
  29. }
  30. return abciResponses, nil
  31. }
  32. // Executes block's transactions on proxyAppConn.
  33. // Returns a list of transaction results and updates to the validator set
  34. // TODO: Generate a bitmap or otherwise store tx validity in state.
  35. func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) {
  36. var validTxs, invalidTxs = 0, 0
  37. txIndex := 0
  38. abciResponses := NewABCIResponses(block)
  39. // Execute transactions and get hash
  40. proxyCb := func(req *abci.Request, res *abci.Response) {
  41. switch r := res.Value.(type) {
  42. case *abci.Response_DeliverTx:
  43. // TODO: make use of res.Log
  44. // TODO: make use of this info
  45. // Blocks may include invalid txs.
  46. // reqDeliverTx := req.(abci.RequestDeliverTx)
  47. txError := ""
  48. txResult := r.DeliverTx
  49. if txResult.Code == abci.CodeType_OK {
  50. validTxs++
  51. } else {
  52. log.Debug("Invalid tx", "code", txResult.Code, "log", txResult.Log)
  53. invalidTxs++
  54. txError = txResult.Code.String()
  55. }
  56. abciResponses.TxResults[txIndex] = &types.TxResult{uint64(block.Height), uint32(txIndex), *txResult}
  57. txIndex++
  58. // NOTE: if we count we can access the tx from the block instead of
  59. // pulling it from the req
  60. event := types.EventDataTx{
  61. Height: block.Height,
  62. Tx: types.Tx(req.GetDeliverTx().Tx),
  63. Data: txResult.Data,
  64. Code: txResult.Code,
  65. Log: txResult.Log,
  66. Error: txError,
  67. }
  68. types.FireEventTx(eventCache, event)
  69. }
  70. }
  71. proxyAppConn.SetResponseCallback(proxyCb)
  72. // Begin block
  73. err := proxyAppConn.BeginBlockSync(block.Hash(), types.TM2PB.Header(block.Header))
  74. if err != nil {
  75. log.Warn("Error in proxyAppConn.BeginBlock", "error", err)
  76. return nil, err
  77. }
  78. fail.Fail() // XXX
  79. // Run txs of block
  80. for _, tx := range block.Txs {
  81. fail.FailRand(len(block.Txs)) // XXX
  82. proxyAppConn.DeliverTxAsync(tx)
  83. if err := proxyAppConn.Error(); err != nil {
  84. return nil, err
  85. }
  86. }
  87. fail.Fail() // XXX
  88. // End block
  89. respEndBlock, err := proxyAppConn.EndBlockSync(uint64(block.Height))
  90. if err != nil {
  91. log.Warn("Error in proxyAppConn.EndBlock", "error", err)
  92. return nil, err
  93. }
  94. fail.Fail() // XXX
  95. log.Info("Executed block", "height", block.Height, "valid txs", validTxs, "invalid txs", invalidTxs)
  96. if len(respEndBlock.Diffs) > 0 {
  97. log.Info("Update to validator set", "updates", abci.ValidatorsString(respEndBlock.Diffs))
  98. }
  99. abciResponses.Validators = respEndBlock.Diffs
  100. return abciResponses, nil
  101. }
  102. func updateValidators(validators *types.ValidatorSet, changedValidators []*abci.Validator) error {
  103. // TODO: prevent change of 1/3+ at once
  104. for _, v := range changedValidators {
  105. pubkey, err := crypto.PubKeyFromBytes(v.PubKey) // NOTE: expects go-wire encoded pubkey
  106. if err != nil {
  107. return err
  108. }
  109. address := pubkey.Address()
  110. power := int64(v.Power)
  111. // mind the overflow from uint64
  112. if power < 0 {
  113. return errors.New(Fmt("Power (%d) overflows int64", v.Power))
  114. }
  115. _, val := validators.GetByAddress(address)
  116. if val == nil {
  117. // add val
  118. added := validators.Add(types.NewValidator(pubkey, power))
  119. if !added {
  120. return errors.New(Fmt("Failed to add new validator %X with voting power %d", address, power))
  121. }
  122. } else if v.Power == 0 {
  123. // remove val
  124. _, removed := validators.Remove(address)
  125. if !removed {
  126. return errors.New(Fmt("Failed to remove validator %X)"))
  127. }
  128. } else {
  129. // update val
  130. val.VotingPower = power
  131. updated := validators.Update(val)
  132. if !updated {
  133. return errors.New(Fmt("Failed to update validator %X with voting power %d", address, power))
  134. }
  135. }
  136. }
  137. return nil
  138. }
  139. // return a bit array of validators that signed the last commit
  140. // NOTE: assumes commits have already been authenticated
  141. func commitBitArrayFromBlock(block *types.Block) *BitArray {
  142. signed := NewBitArray(len(block.LastCommit.Precommits))
  143. for i, precommit := range block.LastCommit.Precommits {
  144. if precommit != nil {
  145. signed.SetIndex(i, true) // val_.LastCommitHeight = block.Height - 1
  146. }
  147. }
  148. return signed
  149. }
  150. //-----------------------------------------------------
  151. // Validate block
  152. func (s *State) ValidateBlock(block *types.Block) error {
  153. return s.validateBlock(block)
  154. }
  155. func (s *State) validateBlock(block *types.Block) error {
  156. // Basic block validation.
  157. err := block.ValidateBasic(s.ChainID, s.LastBlockHeight, s.LastBlockID, s.LastBlockTime, s.AppHash)
  158. if err != nil {
  159. return err
  160. }
  161. // Validate block LastCommit.
  162. if block.Height == 1 {
  163. if len(block.LastCommit.Precommits) != 0 {
  164. return errors.New("Block at height 1 (first block) should have no LastCommit precommits")
  165. }
  166. } else {
  167. if len(block.LastCommit.Precommits) != s.LastValidators.Size() {
  168. return errors.New(Fmt("Invalid block commit size. Expected %v, got %v",
  169. s.LastValidators.Size(), len(block.LastCommit.Precommits)))
  170. }
  171. err := s.LastValidators.VerifyCommit(
  172. s.ChainID, s.LastBlockID, block.Height-1, block.LastCommit)
  173. if err != nil {
  174. return err
  175. }
  176. }
  177. return nil
  178. }
  179. //-----------------------------------------------------------------------------
  180. // ApplyBlock executes the block, updates state w/ ABCI responses,
  181. // then commits and updates the mempool atomically, then saves state.
  182. // Transaction results are optionally indexed.
  183. // Execute and commit block against app, save block and state
  184. func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus,
  185. block *types.Block, partsHeader types.PartSetHeader, mempool types.Mempool) error {
  186. abciResponses, err := s.ExecBlock(eventCache, proxyAppConn, block)
  187. if err != nil {
  188. return fmt.Errorf("Exec failed for application: %v", err)
  189. }
  190. fail.Fail() // XXX
  191. // save the results before we commit
  192. s.SaveABCIResponses(abciResponses)
  193. fail.Fail() // XXX
  194. // now update the block and validators
  195. s.SetBlockAndValidators(block.Header, partsHeader)
  196. // lock mempool, commit state, update mempoool
  197. err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool)
  198. if err != nil {
  199. return fmt.Errorf("Commit failed for application: %v", err)
  200. }
  201. batch := txindex.NewBatch(block.NumTxs)
  202. for _, r := range txResults {
  203. batch.Add(*r)
  204. }
  205. s.TxIndexer.AddBatch(batch)
  206. fail.Fail() // XXX
  207. // save the state
  208. s.Save()
  209. return nil
  210. }
  211. // mempool must be locked during commit and update
  212. // because state is typically reset on Commit and old txs must be replayed
  213. // against committed state before new txs are run in the mempool, lest they be invalid
  214. func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, block *types.Block, mempool types.Mempool) error {
  215. mempool.Lock()
  216. defer mempool.Unlock()
  217. // Commit block, get hash back
  218. res := proxyAppConn.CommitSync()
  219. if res.IsErr() {
  220. log.Warn("Error in proxyAppConn.CommitSync", "error", res)
  221. return res
  222. }
  223. if res.Log != "" {
  224. log.Debug("Commit.Log: " + res.Log)
  225. }
  226. log.Info("Committed state", "hash", res.Data)
  227. // Set the state's new AppHash
  228. s.AppHash = res.Data
  229. // Update mempool.
  230. mempool.Update(block.Height, block.Txs)
  231. return nil
  232. }
  233. // Apply and commit a block, but without all the state validation.
  234. // Returns the application root hash (result of abci.Commit)
  235. // TODO handle abciResponses
  236. func ApplyBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block) ([]byte, error) {
  237. var eventCache types.Fireable // nil
  238. _, err := execBlockOnProxyApp(eventCache, appConnConsensus, block)
  239. if err != nil {
  240. log.Warn("Error executing block on proxy app", "height", block.Height, "err", err)
  241. return nil, err
  242. }
  243. // Commit block, get hash back
  244. res := appConnConsensus.CommitSync()
  245. if res.IsErr() {
  246. log.Warn("Error in proxyAppConn.CommitSync", "error", res)
  247. return nil, res
  248. }
  249. if res.Log != "" {
  250. log.Info("Commit.Log: " + res.Log)
  251. }
  252. return res.Data, nil
  253. }