package mempool

import (
	"bytes"
	"container/list"
	"sync"
	"sync/atomic"

	"github.com/tendermint/go-clist"
	. "github.com/tendermint/go-common"
	"github.com/tendermint/tendermint/proxy"
	"github.com/tendermint/tendermint/types"
	tmsp "github.com/tendermint/tmsp/types"
)
/*
The mempool pushes new txs onto the proxyAppCtx.
It gets a stream of (req, res) tuples from the proxy.
The mempool stores good txs in a concurrent linked-list.

Multiple concurrent go-routines can traverse this linked-list
safely by calling .NextWait() on each element.

So we have several go-routines:
1. Consensus calling Update() and Reap() synchronously
2. Many mempool reactor's peer routines calling AppendTx()
3. Many mempool reactor's peer routines traversing the txs linked list
4. Another goroutine calling GarbageCollectTxs() periodically

To manage these goroutines, there are three methods of locking:
1. Mutations to the linked-list are protected by an internal mtx (CList is goroutine-safe)
2. Mutations to the linked-list elements are atomic
3. AppendTx() calls can be paused upon Update() and Reap(), protected by .proxyMtx

Garbage collection of old elements from mempool.txs is handled via
the DetachPrev() call, which makes old elements unreachable by peer
broadcastTxRoutine()s, so they get garbage collected automatically.
*/
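
// Illustrative sketch only (not part of this package): one way a mempool
// reactor's peer routine could walk mem.txs with the blocking clist API
// described above. The send callback stands in for the actual peer write.
//
//	func broadcastTxRoutine(mem *Mempool, send func(types.Tx)) {
//		var next *clist.CElement
//		for {
//			if next == nil {
//				// NextWait() returned nil because the element we held was
//				// removed (garbage collected); restart from the front.
//				next = mem.TxsFrontWait() // blocks until the list has an element
//			}
//			memTx := next.Value.(*mempoolTx)
//			send(memTx.tx)         // forward the tx to the peer
//			next = next.NextWait() // blocks until a following element exists
//		}
//	}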
const cacheSize = 100000

type Mempool struct {
	proxyMtx    sync.Mutex
	proxyAppCtx proxy.AppContext
	txs         *clist.CList    // concurrent linked-list of good txs
	counter     int64           // simple incrementing counter
	height      int             // the last block Update()'d to
	expected    *clist.CElement // pointer to .txs for next response

	// Keep a cache of already-seen txs.
	// This reduces the pressure on the proxyApp.
	cacheMap  map[string]struct{}
	cacheList *list.List
}

func NewMempool(proxyAppCtx proxy.AppContext) *Mempool {
	mempool := &Mempool{
		proxyAppCtx: proxyAppCtx,
		txs:         clist.New(),
		counter:     0,
		height:      0,
		expected:    nil,
		cacheMap:    make(map[string]struct{}, cacheSize),
		cacheList:   list.New(),
	}
	proxyAppCtx.SetResponseCallback(mempool.resCb)
	return mempool
}

// Return the first element of mem.txs for peer goroutines to call .NextWait() on.
// Blocks until txs has elements.
func (mem *Mempool) TxsFrontWait() *clist.CElement {
	return mem.txs.FrontWait()
}
// Try a new transaction in the mempool.
// May block while Update() or Reap() holds the mempool lock.
func (mem *Mempool) AppendTx(tx types.Tx) (err error) {
	mem.proxyMtx.Lock()
	defer mem.proxyMtx.Unlock()

	// CACHE
	if _, exists := mem.cacheMap[string(tx)]; exists {
		return nil
	}
	if mem.cacheList.Len() >= cacheSize {
		popped := mem.cacheList.Front()
		poppedTx := popped.Value.(types.Tx)
		delete(mem.cacheMap, string(poppedTx))
		mem.cacheList.Remove(popped)
	}
	mem.cacheMap[string(tx)] = struct{}{}
	mem.cacheList.PushBack(tx)
	// END CACHE

	if err = mem.proxyAppCtx.Error(); err != nil {
		return err
	}
	mem.proxyAppCtx.AppendTxAsync(tx)
	return nil
}
// TMSP callback function
// CONTRACT: No other goroutines mutate mem.expected concurrently.
func (mem *Mempool) resCb(req tmsp.Request, res tmsp.Response) {
	switch res := res.(type) {
	case tmsp.ResponseAppendTx:
		reqAppendTx := req.(tmsp.RequestAppendTx)
		if mem.expected == nil { // Normal operation
			if res.RetCode == tmsp.RetCodeOK {
				mem.counter++
				memTx := &mempoolTx{
					counter: mem.counter,
					height:  int64(mem.height),
					tx:      reqAppendTx.TxBytes,
				}
				mem.txs.PushBack(memTx)
			} else {
				// ignore bad transaction
				// TODO: handle other retcodes
			}
		} else { // During Update()
			// TODO: Log sane warning if mem.expected is nil.
			memTx := mem.expected.Value.(*mempoolTx)
			if !bytes.Equal(reqAppendTx.TxBytes, memTx.tx) {
				PanicSanity("Unexpected tx response from proxy")
			}
			if res.RetCode == tmsp.RetCodeOK {
				// Good, nothing to do.
			} else {
				// TODO: handle other retcodes
				// Tx became invalidated due to newly committed block.
				// NOTE: Concurrent traversal of mem.txs via CElement.Next() still works.
				mem.txs.Remove(mem.expected)
				mem.expected.DetachPrev()
			}
			mem.expected = mem.expected.Next()
		}
	default:
		// ignore other messages
	}
}
// Get the valid transactions run so far, and the hash of
// the application state that results from those transactions.
func (mem *Mempool) Reap() ([]types.Tx, []byte, error) {
	mem.proxyMtx.Lock()
	defer mem.proxyMtx.Unlock()

	// First, get the hash of txs run so far.
	hash, err := mem.proxyAppCtx.GetHashSync()
	if err != nil {
		return nil, nil, err
	}
	// And collect all the transactions.
	txs := mem.collectTxs()
	return txs, hash, nil
}

func (mem *Mempool) collectTxs() []types.Tx {
	txs := make([]types.Tx, 0, mem.txs.Len())
	for e := mem.txs.Front(); e != nil; e = e.Next() {
		memTx := e.Value.(*mempoolTx)
		txs = append(txs, memTx.tx)
	}
	return txs
}
  149. // "block" is the new block that was committed.
  150. // Txs that are present in "block" are discarded from mempool.
  151. // NOTE: this should be called *after* block is committed by consensus.
  152. // CONTRACT: block is valid and next in sequence.
  153. func (mem *Mempool) Update(block *types.Block) error {
  154. mem.proxyMtx.Lock()
  155. defer mem.proxyMtx.Unlock()
  156. // Rollback mempool synchronously
  157. // TODO: test that proxyAppCtx's state matches the block's
  158. err := mem.proxyAppCtx.RollbackSync()
  159. if err != nil {
  160. return err
  161. }
  162. // First, create a lookup map of txns in new block.
  163. blockTxsMap := make(map[string]struct{})
  164. for _, tx := range block.Data.Txs {
  165. blockTxsMap[string(tx)] = struct{}{}
  166. }
  167. // Remove transactions that are already in block.
  168. // Return the remaining potentially good txs.
  169. goodTxs := mem.filterTxs(block.Height, blockTxsMap)
  170. // Set height and expected
  171. mem.height = block.Height
  172. mem.expected = mem.txs.Front()
  173. // Push good txs to proxyAppCtx
  174. // NOTE: resCb() may be called concurrently.
  175. for _, tx := range goodTxs {
  176. mem.proxyAppCtx.AppendTxAsync(tx)
  177. if err := mem.proxyAppCtx.Error(); err != nil {
  178. return err
  179. }
  180. }
  181. // NOTE: Even though we return immediately without e.g.
  182. // calling mem.proxyAppCtx.FlushSync(),
  183. // New mempool txs will still have to wait until
  184. // all goodTxs are re-processed.
  185. // So we could make synchronous calls here to proxyAppCtx.
  186. return nil
  187. }
func (mem *Mempool) filterTxs(height int, blockTxsMap map[string]struct{}) []types.Tx {
	goodTxs := make([]types.Tx, 0, mem.txs.Len())
	for e := mem.txs.Front(); e != nil; e = e.Next() {
		memTx := e.Value.(*mempoolTx)
		if _, ok := blockTxsMap[string(memTx.tx)]; ok {
			// Remove the tx since it is already in the block.
			mem.txs.Remove(e)
			e.DetachPrev()
			continue
		}
		// Good tx!
		atomic.StoreInt64(&memTx.height, int64(height))
		goodTxs = append(goodTxs, memTx.tx)
	}
	return goodTxs
}
//--------------------------------------------------------------------------------

// A transaction that successfully ran
type mempoolTx struct {
	counter int64    // a simple incrementing counter
	height  int64    // height at which this tx was validated
	tx      types.Tx //
}

func (memTx *mempoolTx) Height() int {
	return int(atomic.LoadInt64(&memTx.height))
}
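
// Illustrative only: a sketch of the expected call flow across consensus and
// the mempool reactor, exercising NewMempool, AppendTx, Reap, and Update from
// this file. The function below is a demonstration and is not used by the package.
func exampleMempoolFlow(proxyAppCtx proxy.AppContext, tx types.Tx, block *types.Block) error {
	mem := NewMempool(proxyAppCtx)

	// Peer routines (or an RPC endpoint) submit txs; duplicates are dropped by the cache.
	if err := mem.AppendTx(tx); err != nil {
		return err
	}

	// Consensus reaps the good txs and the resulting app hash to build a proposal.
	txs, hash, err := mem.Reap()
	if err != nil {
		return err
	}
	_, _ = txs, hash // would go into the proposed block

	// After the block is committed, consensus calls Update(): txs included in
	// the block are removed and the remaining txs are re-run on the app.
	return mem.Update(block)
}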