package mempool

import (
	"bytes"
	"sync"
	"sync/atomic"

	"github.com/tendermint/go-clist"
	. "github.com/tendermint/go-common"
	"github.com/tendermint/tendermint/proxy"
	"github.com/tendermint/tendermint/types"
	tmsp "github.com/tendermint/tmsp/types"
)
/*
The mempool pushes new txs onto the proxyAppCtx.
It gets a stream of (req, res) tuples from the proxy.
The mempool stores good txs in a concurrent linked-list.

Multiple concurrent goroutines can traverse this linked-list
safely by calling .NextWait() on each element.

So we have several goroutines:
1. Consensus calling Update() and Reap() synchronously
2. Many of the mempool reactor's peer routines calling AppendTx()
3. Many of the mempool reactor's peer routines traversing the txs linked-list
4. Another goroutine calling GarbageCollectTxs() periodically

To manage these goroutines, there are three methods of locking:
1. Mutations to the linked-list are protected by an internal mtx (CList is goroutine-safe)
2. Mutations to the linked-list elements are atomic
3. AppendTx() calls can be paused upon Update() and Reap(), protected by .proxyMtx

Garbage collection of old elements from mempool.txs is handled via
the DetachPrev() call, which makes old elements unreachable from
peer broadcastTxRoutine()s, so they can be garbage collected automatically.
*/
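// In this file, those three locking methods map to:
//   - mem.proxyMtx, held by AppendTx(), Reap(), and Update();
//   - mem.txs, a clist.CList whose internal mutex guards list mutations;
//   - mempoolTx.height, read and written via sync/atomic (see filterTxs() and Height()).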
type Mempool struct {
	proxyMtx    sync.Mutex
	proxyAppCtx proxy.AppContext
	txs         *clist.CList    // concurrent linked-list of good txs
	counter     int64           // simple incrementing counter
	height      int             // the last block Update()'d to
	expected    *clist.CElement // pointer to .txs for next response
}
func NewMempool(proxyAppCtx proxy.AppContext) *Mempool {
	mempool := &Mempool{
		proxyAppCtx: proxyAppCtx,
		txs:         clist.New(),
		counter:     0,
		height:      0,
		expected:    nil,
	}
	proxyAppCtx.SetResponseCallback(mempool.resCb)
	return mempool
}
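// Construction sketch (illustrative; the surrounding node wiring is assumed,
// not shown in this file): a caller creates a proxy.AppContext for its TMSP
// app and passes it here, e.g. mem := NewMempool(proxyAppCtx).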
// Return the first element of mem.txs for peer goroutines to call .NextWait() on.
// Blocks until txs has elements.
func (mem *Mempool) TxsFrontWait() *clist.CElement {
	return mem.txs.FrontWait()
}
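// Traversal sketch (illustrative; "send" is a hypothetical peer-send function,
// and re-fetching the front after an element is removed is elided):
//
//	for e := mem.TxsFrontWait(); e != nil; e = e.NextWait() {
//		memTx := e.Value.(*mempoolTx)
//		send(memTx.tx)
//	}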
// Try a new transaction in the mempool.
// Potentially blocks while Update() or Reap() holds .proxyMtx.
func (mem *Mempool) AppendTx(tx types.Tx) (err error) {
	mem.proxyMtx.Lock()
	defer mem.proxyMtx.Unlock()
	if err = mem.proxyAppCtx.Error(); err != nil {
		return err
	}
	mem.proxyAppCtx.AppendTxAsync(tx)
	return nil
}
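// Call-site sketch (illustrative; the receiving reactor/RPC handler is assumed,
// not defined in this file):
//
//	if err := mem.AppendTx(tx); err != nil {
//		// proxyAppCtx is in an error state; drop or log the tx
//	}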
// TMSP callback function
// CONTRACT: No other goroutines mutate mem.expected concurrently.
func (mem *Mempool) resCb(req tmsp.Request, res tmsp.Response) {
	switch res := res.(type) {
	case tmsp.ResponseAppendTx:
		reqAppendTx := req.(tmsp.RequestAppendTx)
		if mem.expected == nil { // Normal operation
			if res.RetCode == tmsp.RetCodeOK {
				mem.counter++
				memTx := &mempoolTx{
					counter: mem.counter,
					height:  int64(mem.height),
					tx:      reqAppendTx.TxBytes,
				}
				mem.txs.PushBack(memTx)
			} else {
				// ignore bad transaction
				// TODO: handle other retcodes
			}
		} else { // During Update()
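			// Update() rolled back the app state, set mem.expected to the
			// front of mem.txs, and is replaying the remaining good txs, so
			// each response here is matched against the next list element
			// in order.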
			// TODO Log sane warning if mem.expected is nil.
			memTx := mem.expected.Value.(*mempoolTx)
			if !bytes.Equal(reqAppendTx.TxBytes, memTx.tx) {
				PanicSanity("Unexpected tx response from proxy")
			}
			if res.RetCode == tmsp.RetCodeOK {
				// Good, nothing to do.
			} else {
				// TODO: handle other retcodes
				// Tx became invalidated due to newly committed block.
				// NOTE: Concurrent traversal of mem.txs via CElement.Next() still works.
				mem.txs.Remove(mem.expected)
				mem.expected.DetachPrev()
			}
			mem.expected = mem.expected.Next()
		}
	default:
		// ignore other messages
	}
}
// Get the valid transactions run so far, and the hash of
// the application state that results from those transactions.
func (mem *Mempool) Reap() ([]types.Tx, []byte, error) {
	mem.proxyMtx.Lock()
	defer mem.proxyMtx.Unlock()
	// First, get the hash of txs run so far.
	hash, err := mem.proxyAppCtx.GetHashSync()
	if err != nil {
		return nil, nil, err
	}
	// And collect all the transactions.
	txs := mem.collectTxs()
	return txs, hash, nil
}
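// Consensus-side sketch (illustrative; proposal construction is assumed, not
// part of this file): when building a proposal, consensus would call
//
//	txs, hash, err := mem.Reap()
//
// and include txs in the proposed block, pairing this with Update() below
// once the block is committed.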
func (mem *Mempool) collectTxs() []types.Tx {
	txs := make([]types.Tx, 0, mem.txs.Len())
	for e := mem.txs.Front(); e != nil; e = e.Next() {
		memTx := e.Value.(*mempoolTx)
		txs = append(txs, memTx.tx)
	}
	return txs
}
  128. // "block" is the new block that was committed.
  129. // Txs that are present in "block" are discarded from mempool.
  130. // NOTE: this should be called *after* block is committed by consensus.
  131. // CONTRACT: block is valid and next in sequence.
  132. func (mem *Mempool) Update(block *types.Block) error {
  133. mem.proxyMtx.Lock()
  134. defer mem.proxyMtx.Unlock()
  135. // Rollback mempool synchronously
  136. // TODO: test that proxyAppCtx's state matches the block's
  137. err := mem.proxyAppCtx.RollbackSync()
  138. if err != nil {
  139. return err
  140. }
  141. // First, create a lookup map of txns in new block.
  142. blockTxsMap := make(map[string]struct{})
  143. for _, tx := range block.Data.Txs {
  144. blockTxsMap[string(tx)] = struct{}{}
  145. }
  146. // Remove transactions that are already in block.
  147. // Return the remaining potentially good txs.
  148. goodTxs := mem.filterTxs(block.Height, blockTxsMap)
  149. // Set height and expected
  150. mem.height = block.Height
  151. mem.expected = mem.txs.Front()
  152. // Push good txs to proxyAppCtx
  153. // NOTE: resCb() may be called concurrently.
  154. for _, tx := range goodTxs {
  155. mem.proxyAppCtx.AppendTxAsync(tx)
  156. if err := mem.proxyAppCtx.Error(); err != nil {
  157. return err
  158. }
  159. }
  160. // NOTE: Even though we return immediately without e.g.
  161. // calling mem.proxyAppCtx.FlushSync(),
  162. // New mempool txs will still have to wait until
  163. // all goodTxs are re-processed.
  164. // So we could make synchronous calls here to proxyAppCtx.
  165. return nil
  166. }
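// Consensus-side sketch (illustrative; the commit step is assumed, not part of
// this file): after committing a block, consensus would call
//
//	if err := mem.Update(block); err != nil {
//		// handle the proxyAppCtx error
//	}
//
// which discards the block's txs from the mempool and replays the remainder.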
func (mem *Mempool) filterTxs(height int, blockTxsMap map[string]struct{}) []types.Tx {
	goodTxs := make([]types.Tx, 0, mem.txs.Len())
	for e := mem.txs.Front(); e != nil; e = e.Next() {
		memTx := e.Value.(*mempoolTx)
		if _, ok := blockTxsMap[string(memTx.tx)]; ok {
			// Remove the tx since already in block.
			mem.txs.Remove(e)
			e.DetachPrev()
			continue
		}
		// Good tx!
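		// The atomic store pairs with the atomic load in Height(): peer
		// goroutines may read a tx's height while Update() refreshes it here.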
		atomic.StoreInt64(&memTx.height, int64(height))
		goodTxs = append(goodTxs, memTx.tx)
	}
	return goodTxs
}
//--------------------------------------------------------------------------------

// A transaction that successfully ran
type mempoolTx struct {
	counter int64    // a simple incrementing counter
	height  int64    // height at which this tx was validated
	tx      types.Tx //
}
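// Height returns the height at which this tx was last validated.
// It uses an atomic load because filterTxs() may store a new height
// concurrently (see locking method 2 in the package comment).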
func (memTx *mempoolTx) Height() int {
	return int(atomic.LoadInt64(&memTx.height))
}