package mempool

import (
	"bytes"
	"container/list"
	"crypto/sha256"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/pkg/errors"

	abci "github.com/tendermint/tendermint/abci/types"
	cfg "github.com/tendermint/tendermint/config"
	auto "github.com/tendermint/tendermint/libs/autofile"
	"github.com/tendermint/tendermint/libs/clist"
	cmn "github.com/tendermint/tendermint/libs/common"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/proxy"
	"github.com/tendermint/tendermint/types"
)

//--------------------------------------------------------------------------------

// CListMempool is an ordered in-memory pool for transactions before they are
// proposed in a consensus round. Transaction validity is checked using the
// CheckTx abci message before the transaction is added to the pool. The
// mempool uses a concurrent list structure for storing transactions that can
// be efficiently accessed by multiple concurrent readers.
type CListMempool struct {
	config *cfg.MempoolConfig

	proxyMtx     sync.Mutex
	proxyAppConn proxy.AppConnMempool
	txs          *clist.CList // concurrent linked-list of good txs
	preCheck     PreCheckFunc
	postCheck    PostCheckFunc

	// Track whether we're rechecking txs.
	// These are not protected by a mutex and are expected to be mutated
	// in serial (ie. by abci responses which are called in serial).
	recheckCursor *clist.CElement // next expected response
	recheckEnd    *clist.CElement // re-checking stops here

	// notify listeners (ie. consensus) when txs are available
	notifiedTxsAvailable bool
	txsAvailable         chan struct{} // fires once for each height, when the mempool is not empty

	// Map for quick access to txs to record sender in CheckTx.
	// txsMap: txKey -> CElement
	txsMap sync.Map

	// Atomic integers
	height     int64 // the last block Update()'d to
	rechecking int32 // for re-checking filtered txs on Update()
	txsBytes   int64 // total size of mempool, in bytes

	// Keep a cache of already-seen txs.
	// This reduces the pressure on the proxyApp.
	cache txCache

	// A log of mempool txs
	wal *auto.AutoFile

	logger log.Logger

	metrics *Metrics
}

var _ Mempool = &CListMempool{}

// Option sets an optional parameter on the mempool.
type Option func(*CListMempool)

// NewCListMempool returns a new mempool with the given configuration and connection to an application.
func NewCListMempool(
	config *cfg.MempoolConfig,
	proxyAppConn proxy.AppConnMempool,
	height int64,
	options ...Option,
) *CListMempool {
	mempool := &CListMempool{
		config:        config,
		proxyAppConn:  proxyAppConn,
		txs:           clist.New(),
		height:        height,
		rechecking:    0,
		recheckCursor: nil,
		recheckEnd:    nil,
		logger:        log.NewNopLogger(),
		metrics:       NopMetrics(),
	}
	if config.CacheSize > 0 {
		mempool.cache = newMapTxCache(config.CacheSize)
	} else {
		mempool.cache = nopTxCache{}
	}
	proxyAppConn.SetResponseCallback(mempool.globalCb)
	for _, option := range options {
		option(mempool)
	}
	return mempool
}

// EnableTxsAvailable initializes the TxsAvailable channel,
// ensuring it will trigger once every height when transactions are available.
// NOTE: not thread safe - should only be called once, on startup
func (mem *CListMempool) EnableTxsAvailable() {
	mem.txsAvailable = make(chan struct{}, 1)
}

// SetLogger sets the Logger.
func (mem *CListMempool) SetLogger(l log.Logger) {
	mem.logger = l
}

// WithPreCheck sets a filter for the mempool to reject a tx if f(tx) returns
// false. This is run before CheckTx.
func WithPreCheck(f PreCheckFunc) Option {
	return func(mem *CListMempool) { mem.preCheck = f }
}

// WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns
// false. This is run after CheckTx.
func WithPostCheck(f PostCheckFunc) Option {
	return func(mem *CListMempool) { mem.postCheck = f }
}

// WithMetrics sets the metrics.
func WithMetrics(metrics *Metrics) Option {
	return func(mem *CListMempool) { mem.metrics = metrics }
}
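
// Illustrative construction sketch (not part of the package API): it shows how the
// functional options above are combined. It assumes an existing proxy.AppConnMempool
// `appConnMem` and a hypothetical `myPreCheck` PreCheckFunc supplied by the caller.
//
//	memplCfg := cfg.DefaultMempoolConfig()
//	mempool := NewCListMempool(
//		memplCfg,
//		appConnMem, // mempool connection to the application (assumed to exist)
//		0,          // starting height
//		WithPreCheck(myPreCheck),
//		WithMetrics(NopMetrics()),
//	)
//	mempool.SetLogger(log.NewNopLogger())
//	mempool.EnableTxsAvailable() // only if a consumer will read TxsAvailable()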

// InitWAL creates a directory for the WAL file and opens the file itself.
//
// *panics* if it can't create the directory or open the file.
// *not thread safe*
func (mem *CListMempool) InitWAL() {
	walDir := mem.config.WalDir()
	err := cmn.EnsureDir(walDir, 0700)
	if err != nil {
		panic(errors.Wrap(err, "Error ensuring WAL dir"))
	}
	af, err := auto.OpenAutoFile(walDir + "/wal")
	if err != nil {
		panic(errors.Wrap(err, "Error opening WAL file"))
	}
	mem.wal = af
}

// CloseWAL closes and discards the underlying WAL file.
// Any further writes will not be relayed to disk.
func (mem *CListMempool) CloseWAL() {
	mem.proxyMtx.Lock()
	defer mem.proxyMtx.Unlock()

	if err := mem.wal.Close(); err != nil {
		mem.logger.Error("Error closing WAL", "err", err)
	}
	mem.wal = nil
}

// Lock locks the mempool. Consensus must be able to hold the lock to safely update.
func (mem *CListMempool) Lock() {
	mem.proxyMtx.Lock()
}

// Unlock unlocks the mempool.
func (mem *CListMempool) Unlock() {
	mem.proxyMtx.Unlock()
}

// Size returns the number of transactions in the mempool.
func (mem *CListMempool) Size() int {
	return mem.txs.Len()
}

// TxsBytes returns the total size of all txs in the mempool.
func (mem *CListMempool) TxsBytes() int64 {
	return atomic.LoadInt64(&mem.txsBytes)
}

// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are
// done. E.g. from CheckTx.
func (mem *CListMempool) FlushAppConn() error {
	return mem.proxyAppConn.FlushSync()
}

// Flush removes all transactions from the mempool and cache.
func (mem *CListMempool) Flush() {
	mem.proxyMtx.Lock()
	defer mem.proxyMtx.Unlock()

	mem.cache.Reset()

	for e := mem.txs.Front(); e != nil; e = e.Next() {
		mem.txs.Remove(e)
		e.DetachPrev()
	}

	mem.txsMap = sync.Map{}
	_ = atomic.SwapInt64(&mem.txsBytes, 0)
}

// TxsFront returns the first transaction in the ordered list for peer
// goroutines to call .NextWait() on.
func (mem *CListMempool) TxsFront() *clist.CElement {
	return mem.txs.Front()
}

// TxsWaitChan returns a channel to wait on transactions. It will be closed
// once the mempool is not empty (ie. the internal `mem.txs` has at least one
// element).
func (mem *CListMempool) TxsWaitChan() <-chan struct{} {
	return mem.txs.WaitChan()
}
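
// Sketch of how a gossip goroutine (e.g. a mempool reactor) might consume the list
// using TxsWaitChan/TxsFront together with clist's blocking NextWait. This is an
// assumption-laden illustration, not the reactor's actual code; `quit` is a
// hypothetical shutdown channel owned by the caller.
//
//	var next *clist.CElement
//	for {
//		if next == nil {
//			select {
//			case <-mem.TxsWaitChan(): // closed once the mempool is non-empty
//				if next = mem.TxsFront(); next == nil {
//					continue
//				}
//			case <-quit:
//				return
//			}
//		}
//		memTx := next.Value.(*mempoolTx)
//		// ... gossip memTx.tx to the peer ...
//		next = next.NextWait() // blocks for the next element; nil if this one was removed
//	}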

// CheckTx executes a new transaction against the application to determine its validity
// and whether it should be added to the mempool.
// It blocks if we're waiting on Update() or Reap().
// cb: A callback from the CheckTx command.
//     It gets called from another goroutine.
// CONTRACT: Either cb will get called, or err returned.
func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
	return mem.CheckTxWithInfo(tx, cb, TxInfo{PeerID: UnknownPeerID})
}
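
// Hedged usage sketch: an external caller (e.g. an RPC handler) can pass a callback
// and wait for the asynchronous CheckTx result. `mempool` is a *CListMempool held by
// the caller and `resCh` is a hypothetical channel; both are assumptions for the
// example, not part of this package.
//
//	resCh := make(chan *abci.Response, 1)
//	err := mempool.CheckTx(tx, func(res *abci.Response) {
//		resCh <- res // called from another goroutine, per the CONTRACT above
//	})
//	if err != nil {
//		return err // rejected up-front: full mempool, too large, precheck failure, cache hit, ...
//	}
//	res := <-resCh
//	if res.GetCheckTx().Code != abci.CodeTypeOK {
//		// the application rejected the tx; it was not added to the mempool
//	}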

// CheckTxWithInfo performs the same operation as CheckTx, but with extra metadata about the tx.
// Currently this metadata is the peer who sent it, used to prevent the tx
// from being gossiped back to them.
func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) (err error) {
	mem.proxyMtx.Lock()
	// use defer to unlock mutex because application (*local client*) might panic
	defer mem.proxyMtx.Unlock()

	var (
		memSize  = mem.Size()
		txsBytes = mem.TxsBytes()
	)
	if memSize >= mem.config.Size ||
		int64(len(tx))+txsBytes > mem.config.MaxTxsBytes {
		return ErrMempoolIsFull{
			memSize, mem.config.Size,
			txsBytes, mem.config.MaxTxsBytes}
	}

	// The size of the corresponding amino-encoded TxMessage
	// can't be larger than the maxMsgSize, otherwise we can't
	// relay it to peers.
	if len(tx) > maxTxSize {
		return ErrTxTooLarge
	}

	if mem.preCheck != nil {
		if err := mem.preCheck(tx); err != nil {
			return ErrPreCheck{err}
		}
	}

	// CACHE
	if !mem.cache.Push(tx) {
		// Record a new sender for a tx we've already seen.
		// Note it's possible a tx is still in the cache but no longer in the mempool
		// (eg. after committing a block, txs are removed from mempool but not cache),
		// so we only record the sender for txs still in the mempool.
		if e, ok := mem.txsMap.Load(txKey(tx)); ok {
			memTx := e.(*clist.CElement).Value.(*mempoolTx)
			if _, loaded := memTx.senders.LoadOrStore(txInfo.PeerID, true); loaded {
				// TODO: consider punishing peer for dups,
				// it's non-trivial since invalid txs can become valid,
				// but they can spam the same tx with little cost to them atm.
			}
		}

		return ErrTxInCache
	}
	// END CACHE

	// WAL
	if mem.wal != nil {
		// TODO: Notify administrators when WAL fails
		_, err := mem.wal.Write([]byte(tx))
		if err != nil {
			mem.logger.Error("Error writing to WAL", "err", err)
		}
		_, err = mem.wal.Write([]byte("\n"))
		if err != nil {
			mem.logger.Error("Error writing to WAL", "err", err)
		}
	}
	// END WAL

	// NOTE: proxyAppConn may error if tx buffer is full
	if err = mem.proxyAppConn.Error(); err != nil {
		return err
	}

	reqRes := mem.proxyAppConn.CheckTxAsync(tx)
	reqRes.SetCallback(mem.reqResCb(tx, txInfo.PeerID, cb))

	return nil
}

// Global callback that will be called after every ABCI response.
// Having a single global callback avoids needing to set a callback for each request.
// However, processing the checkTx response requires the peerID (so we can track which
// peers we received which txs from), and peerID is not included in the ABCI request,
// so we have to set request-specific callbacks that include this information.
// If we're not in the midst of a recheck, this function will just return,
// so the request-specific callback can do the work.
// When rechecking, we don't need the peerID, so the recheck callback happens here.
func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) {
	if mem.recheckCursor == nil {
		return
	}

	mem.metrics.RecheckTimes.Add(1)
	mem.resCbRecheck(req, res)

	// update metrics
	mem.metrics.Size.Set(float64(mem.Size()))
}

// Request-specific callback that should be set on individual reqRes objects
// to incorporate local information when processing the response.
// This allows us to track the peer that sent us this tx, so we can avoid sending it back to them.
// NOTE: alternatively, we could include this information in the ABCI request itself.
//
// External callers of CheckTx, like the RPC, can also pass an externalCb through here that is called
// when all other response processing is complete.
//
// Used in CheckTxWithInfo to record the PeerID who sent us the tx.
func (mem *CListMempool) reqResCb(tx []byte, peerID uint16, externalCb func(*abci.Response)) func(res *abci.Response) {
	return func(res *abci.Response) {
		if mem.recheckCursor != nil {
			// this should never happen
			panic("recheck cursor is not nil in reqResCb")
		}

		mem.resCbFirstTime(tx, peerID, res)

		// update metrics
		mem.metrics.Size.Set(float64(mem.Size()))

		// passed in by the caller of CheckTx, eg. the RPC
		if externalCb != nil {
			externalCb(res)
		}
	}
}

// Called from:
//  - resCbFirstTime (lock not held) if tx is valid
func (mem *CListMempool) addTx(memTx *mempoolTx) {
	e := mem.txs.PushBack(memTx)
	mem.txsMap.Store(txKey(memTx.tx), e)
	atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx)))
	mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx)))
}

// Called from:
//  - Update (lock held) if tx was committed
//  - resCbRecheck (lock not held) if tx was invalidated
func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromCache bool) {
	mem.txs.Remove(elem)
	elem.DetachPrev()
	mem.txsMap.Delete(txKey(tx))
	atomic.AddInt64(&mem.txsBytes, int64(-len(tx)))

	if removeFromCache {
		mem.cache.Remove(tx)
	}
}

// callback, which is called after the app checked the tx for the first time.
//
// The case where the app checks the tx for the second and subsequent times is
// handled by the resCbRecheck callback.
func (mem *CListMempool) resCbFirstTime(tx []byte, peerID uint16, res *abci.Response) {
	switch r := res.Value.(type) {
	case *abci.Response_CheckTx:
		var postCheckErr error
		if mem.postCheck != nil {
			postCheckErr = mem.postCheck(tx, r.CheckTx)
		}
		if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
			memTx := &mempoolTx{
				height:    mem.height,
				gasWanted: r.CheckTx.GasWanted,
				tx:        tx,
			}
			memTx.senders.Store(peerID, true)
			mem.addTx(memTx)
			mem.logger.Info("Added good transaction",
				"tx", txID(tx),
				"res", r,
				"height", memTx.height,
				"total", mem.Size(),
			)
			mem.notifyTxsAvailable()
		} else {
			// ignore bad transaction
			mem.logger.Info("Rejected bad transaction", "tx", txID(tx), "res", r, "err", postCheckErr)
			mem.metrics.FailedTxs.Add(1)
			// remove from cache (it might be good later)
			mem.cache.Remove(tx)
		}
	default:
		// ignore other messages
	}
}

// callback, which is called after the app rechecked the tx.
//
// The case where the app checks the tx for the first time is handled by the
// resCbFirstTime callback.
func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
	switch r := res.Value.(type) {
	case *abci.Response_CheckTx:
		tx := req.GetCheckTx().Tx
		memTx := mem.recheckCursor.Value.(*mempoolTx)
		if !bytes.Equal(tx, memTx.tx) {
			panic(fmt.Sprintf(
				"Unexpected tx response from proxy during recheck\nExpected %X, got %X",
				memTx.tx,
				tx))
		}
		var postCheckErr error
		if mem.postCheck != nil {
			postCheckErr = mem.postCheck(tx, r.CheckTx)
		}
		if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
			// Good, nothing to do.
		} else {
			// Tx became invalidated due to newly committed block.
			mem.logger.Info("Tx is no longer valid", "tx", txID(tx), "res", r, "err", postCheckErr)
			// NOTE: we remove tx from the cache because it might be good later
			mem.removeTx(tx, mem.recheckCursor, true)
		}
		if mem.recheckCursor == mem.recheckEnd {
			mem.recheckCursor = nil
		} else {
			mem.recheckCursor = mem.recheckCursor.Next()
		}
		if mem.recheckCursor == nil {
			// Done!
			atomic.StoreInt32(&mem.rechecking, 0)
			mem.logger.Info("Done rechecking txs")

			// in case the recheck removed all txs
			if mem.Size() > 0 {
				mem.notifyTxsAvailable()
			}
		}
	default:
		// ignore other messages
	}
}

// TxsAvailable returns a channel which fires once for every height,
// and only when transactions are available in the mempool.
// NOTE: the returned channel may be nil if EnableTxsAvailable was not called.
func (mem *CListMempool) TxsAvailable() <-chan struct{} {
	return mem.txsAvailable
}

func (mem *CListMempool) notifyTxsAvailable() {
	if mem.Size() == 0 {
		panic("notified txs available but mempool is empty!")
	}
	if mem.txsAvailable != nil && !mem.notifiedTxsAvailable {
		// channel cap is 1, so this will send once
		mem.notifiedTxsAvailable = true
		select {
		case mem.txsAvailable <- struct{}{}:
		default:
		}
	}
}

// ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes bytes total
// with the condition that the total gasWanted must be less than maxGas.
// If both maxes are negative, there is no cap on the size of all returned
// transactions (~ all available transactions).
func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
	mem.proxyMtx.Lock()
	defer mem.proxyMtx.Unlock()

	for atomic.LoadInt32(&mem.rechecking) > 0 {
		// TODO: Something better?
		time.Sleep(time.Millisecond * 10)
	}

	var totalBytes int64
	var totalGas int64
	// TODO: we will get a performance boost if we have a good estimate of avg
	// size per tx, and set the initial capacity based off of that.
	// txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), max/mem.avgTxSize))
	txs := make([]types.Tx, 0, mem.txs.Len())
	for e := mem.txs.Front(); e != nil; e = e.Next() {
		memTx := e.Value.(*mempoolTx)
		// Check total size requirement
		aminoOverhead := types.ComputeAminoOverhead(memTx.tx, 1)
		if maxBytes > -1 && totalBytes+int64(len(memTx.tx))+aminoOverhead > maxBytes {
			return txs
		}
		totalBytes += int64(len(memTx.tx)) + aminoOverhead
		// Check total gas requirement.
		// If maxGas is negative, skip this check.
		// Since newTotalGas < maxGas, which must be non-negative,
		// it follows that this won't overflow.
		newTotalGas := totalGas + memTx.gasWanted
		if maxGas > -1 && newTotalGas > maxGas {
			return txs
		}
		totalGas = newTotalGas
		txs = append(txs, memTx.tx)
	}
	return txs
}
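
// Illustrative call from a block proposer's point of view; the limits are assumed
// values chosen by the caller, not defaults of this package. Note that the Reap*
// methods take the mempool mutex themselves, so the caller does not hold Lock()
// around them.
//
//	txs := mem.ReapMaxBytesMaxGas(maxDataBytes, maxGas) // e.g. (1<<20, -1); -1 disables that cap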

// ReapMaxTxs reaps up to max transactions from the mempool.
// If max is negative, there is no cap on the size of all returned
// transactions (~ all available transactions).
func (mem *CListMempool) ReapMaxTxs(max int) types.Txs {
	mem.proxyMtx.Lock()
	defer mem.proxyMtx.Unlock()

	if max < 0 {
		max = mem.txs.Len()
	}

	for atomic.LoadInt32(&mem.rechecking) > 0 {
		// TODO: Something better?
		time.Sleep(time.Millisecond * 10)
	}

	txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), max))
	for e := mem.txs.Front(); e != nil && len(txs) < max; e = e.Next() {
		memTx := e.Value.(*mempoolTx)
		txs = append(txs, memTx.tx)
	}
	return txs
}

// Update informs the mempool that the given txs were committed and can be discarded.
// NOTE: this should be called *after* block is committed by consensus.
// NOTE: unsafe; Lock/Unlock must be managed by caller
func (mem *CListMempool) Update(
	height int64,
	txs types.Txs,
	preCheck PreCheckFunc,
	postCheck PostCheckFunc,
) error {
	// Set height
	mem.height = height
	mem.notifiedTxsAvailable = false

	if preCheck != nil {
		mem.preCheck = preCheck
	}
	if postCheck != nil {
		mem.postCheck = postCheck
	}

	// Add committed transactions to cache (if missing).
	for _, tx := range txs {
		_ = mem.cache.Push(tx)
	}

	// Remove committed transactions.
	txsLeft := mem.removeTxs(txs)

	// Either recheck non-committed txs to see if they became invalid
	// or just notify that there are some txs left.
	if len(txsLeft) > 0 {
		if mem.config.Recheck {
			mem.logger.Info("Recheck txs", "numtxs", len(txsLeft), "height", height)
			mem.recheckTxs(txsLeft)
			// At this point, mem.txs are being rechecked.
			// mem.recheckCursor re-scans mem.txs and possibly removes some txs.
			// Before mem.Reap(), we should wait for mem.recheckCursor to be nil.
		} else {
			mem.notifyTxsAvailable()
		}
	}

	// Update metrics
	mem.metrics.Size.Set(float64(mem.Size()))

	return nil
}
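
// Sketch of the commit-time sequence implied by the NOTEs above, with locking managed
// by the caller. The committed `block` and the app-derived preCheck/postCheck values
// are assumptions supplied by that caller.
//
//	mem.Lock()
//	defer mem.Unlock()
//	if err := mem.FlushAppConn(); err != nil { // drain in-flight CheckTx responses first
//		return err
//	}
//	return mem.Update(block.Height, block.Txs, preCheck, postCheck)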

func (mem *CListMempool) removeTxs(txs types.Txs) []types.Tx {
	// Build a map for faster lookups.
	txsMap := make(map[string]struct{}, len(txs))
	for _, tx := range txs {
		txsMap[string(tx)] = struct{}{}
	}

	txsLeft := make([]types.Tx, 0, mem.txs.Len())
	for e := mem.txs.Front(); e != nil; e = e.Next() {
		memTx := e.Value.(*mempoolTx)
		// Remove the tx if it's already in a block.
		if _, ok := txsMap[string(memTx.tx)]; ok {
			// NOTE: we don't remove committed txs from the cache.
			mem.removeTx(memTx.tx, e, false)

			continue
		}
		txsLeft = append(txsLeft, memTx.tx)
	}
	return txsLeft
}

// NOTE: pass in txs because mem.txs can mutate concurrently.
func (mem *CListMempool) recheckTxs(txs []types.Tx) {
	if len(txs) == 0 {
		return
	}
	atomic.StoreInt32(&mem.rechecking, 1)
	mem.recheckCursor = mem.txs.Front()
	mem.recheckEnd = mem.txs.Back()

	// Push txs to proxyAppConn
	// NOTE: globalCb may be called concurrently.
	for _, tx := range txs {
		mem.proxyAppConn.CheckTxAsync(tx)
	}
	mem.proxyAppConn.FlushAsync()
}

//--------------------------------------------------------------------------------

// mempoolTx is a transaction that successfully ran
type mempoolTx struct {
	height    int64 // height that this tx had been validated in
	gasWanted int64 // amount of gas this tx states it will require
	tx        types.Tx

	// ids of peers who've sent us this tx (as a map for quick lookups).
	// senders: PeerID -> bool
	senders sync.Map
}

// Height returns the height for this transaction
func (memTx *mempoolTx) Height() int64 {
	return atomic.LoadInt64(&memTx.height)
}

//--------------------------------------------------------------------------------

type txCache interface {
	Reset()
	Push(tx types.Tx) bool
	Remove(tx types.Tx)
}

// mapTxCache maintains an LRU cache of transactions. This only stores the hash
// of the tx, due to memory concerns.
type mapTxCache struct {
	mtx  sync.Mutex
	size int
	map_ map[[sha256.Size]byte]*list.Element
	list *list.List
}

var _ txCache = (*mapTxCache)(nil)

// newMapTxCache returns a new mapTxCache.
func newMapTxCache(cacheSize int) *mapTxCache {
	return &mapTxCache{
		size: cacheSize,
		map_: make(map[[sha256.Size]byte]*list.Element, cacheSize),
		list: list.New(),
	}
}

// Reset resets the cache to an empty state.
func (cache *mapTxCache) Reset() {
	cache.mtx.Lock()
	cache.map_ = make(map[[sha256.Size]byte]*list.Element, cache.size)
	cache.list.Init()
	cache.mtx.Unlock()
}

// Push adds the given tx to the cache and returns true. It returns
// false if tx is already in the cache.
func (cache *mapTxCache) Push(tx types.Tx) bool {
	cache.mtx.Lock()
	defer cache.mtx.Unlock()

	// Use the tx hash in the cache
	txHash := txKey(tx)
	if moved, exists := cache.map_[txHash]; exists {
		cache.list.MoveToBack(moved)
		return false
	}

	if cache.list.Len() >= cache.size {
		// Evict the least recently used entry. Check for nil before
		// dereferencing the element's value.
		if popped := cache.list.Front(); popped != nil {
			poppedTxHash := popped.Value.([sha256.Size]byte)
			delete(cache.map_, poppedTxHash)
			cache.list.Remove(popped)
		}
	}
	e := cache.list.PushBack(txHash)
	cache.map_[txHash] = e
	return true
}

// Remove removes the given tx from the cache.
func (cache *mapTxCache) Remove(tx types.Tx) {
	cache.mtx.Lock()
	txHash := txKey(tx)
	popped := cache.map_[txHash]
	delete(cache.map_, txHash)
	if popped != nil {
		cache.list.Remove(popped)
	}

	cache.mtx.Unlock()
}

type nopTxCache struct{}

var _ txCache = (*nopTxCache)(nil)

func (nopTxCache) Reset()             {}
func (nopTxCache) Push(types.Tx) bool { return true }
func (nopTxCache) Remove(types.Tx)    {}

//--------------------------------------------------------------------------------

// txKey is the fixed length array sha256 hash used as the key in maps.
func txKey(tx types.Tx) [sha256.Size]byte {
	return sha256.Sum256(tx)
}

// txID is the hex encoded hash of the bytes as a types.Tx.
func txID(tx []byte) string {
	return fmt.Sprintf("%X", types.Tx(tx).Hash())
}