You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

109 lines
2.8 KiB

10 years ago
  1. /*
  2. Mempool receives new transactions and applies them to the latest committed state.
  3. If the transaction is acceptable, then it broadcasts the tx to peers.
  4. When this node happens to be the next proposer, it simply uses the recently
  5. modified state (and the associated transactions) to construct a proposal.
  6. */
  7. package mempool
  8. import (
  9. "sync"
  10. "github.com/tendermint/tendermint2/binary"
  11. sm "github.com/tendermint/tendermint2/state"
  12. "github.com/tendermint/tendermint2/types"
  13. )
  14. type Mempool struct {
  15. mtx sync.Mutex
  16. state *sm.State
  17. cache *sm.BlockCache
  18. txs []types.Tx
  19. }
  20. func NewMempool(state *sm.State) *Mempool {
  21. return &Mempool{
  22. state: state,
  23. cache: sm.NewBlockCache(state),
  24. }
  25. }
  26. func (mem *Mempool) GetState() *sm.State {
  27. return mem.state
  28. }
  29. func (mem *Mempool) GetCache() *sm.BlockCache {
  30. return mem.cache
  31. }
  32. // Apply tx to the state and remember it.
  33. func (mem *Mempool) AddTx(tx types.Tx) (err error) {
  34. mem.mtx.Lock()
  35. defer mem.mtx.Unlock()
  36. err = sm.ExecTx(mem.cache, tx, false)
  37. if err != nil {
  38. log.Debug("AddTx() error", "tx", tx, "error", err)
  39. return err
  40. } else {
  41. log.Debug("AddTx() success", "tx", tx)
  42. mem.txs = append(mem.txs, tx)
  43. return nil
  44. }
  45. }
  46. func (mem *Mempool) GetProposalTxs() []types.Tx {
  47. mem.mtx.Lock()
  48. defer mem.mtx.Unlock()
  49. log.Debug("GetProposalTxs:", "txs", mem.txs)
  50. return mem.txs
  51. }
  52. // "block" is the new block being committed.
  53. // "state" is the result of state.AppendBlock("block").
  54. // Txs that are present in "block" are discarded from mempool.
  55. // Txs that have become invalid in the new "state" are also discarded.
  56. func (mem *Mempool) ResetForBlockAndState(block *types.Block, state *sm.State) {
  57. mem.mtx.Lock()
  58. defer mem.mtx.Unlock()
  59. mem.state = state.Copy()
  60. mem.cache = sm.NewBlockCache(mem.state)
  61. // First, create a lookup map of txns in new block.
  62. blockTxsMap := make(map[string]struct{})
  63. for _, tx := range block.Data.Txs {
  64. txHash := binary.BinarySha256(tx)
  65. blockTxsMap[string(txHash)] = struct{}{}
  66. }
  67. // Next, filter all txs from mem.txs that are in blockTxsMap
  68. txs := []types.Tx{}
  69. for _, tx := range mem.txs {
  70. txHash := binary.BinarySha256(tx)
  71. if _, ok := blockTxsMap[string(txHash)]; ok {
  72. log.Debug("Filter out, already committed", "tx", tx, "txHash", txHash)
  73. continue
  74. } else {
  75. log.Debug("Filter in, still new", "tx", tx, "txHash", txHash)
  76. txs = append(txs, tx)
  77. }
  78. }
  79. // Next, filter all txs that aren't valid given new state.
  80. validTxs := []types.Tx{}
  81. for _, tx := range txs {
  82. err := sm.ExecTx(mem.cache, tx, false)
  83. if err == nil {
  84. log.Debug("Filter in, valid", "tx", tx)
  85. validTxs = append(validTxs, tx)
  86. } else {
  87. // tx is no longer valid.
  88. log.Debug("Filter out, no longer valid", "tx", tx, "error", err)
  89. }
  90. }
  91. // We're done!
  92. log.Debug("New txs", "txs", validTxs, "oldTxs", mem.txs)
  93. mem.txs = validTxs
  94. }