You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

837 lines
26 KiB

  1. package v1
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "sync/atomic"
  7. "time"
  8. abci "github.com/tendermint/tendermint/abci/types"
  9. "github.com/tendermint/tendermint/config"
  10. "github.com/tendermint/tendermint/internal/libs/clist"
  11. tmsync "github.com/tendermint/tendermint/internal/libs/sync"
  12. "github.com/tendermint/tendermint/internal/mempool"
  13. "github.com/tendermint/tendermint/libs/log"
  14. tmmath "github.com/tendermint/tendermint/libs/math"
  15. pubmempool "github.com/tendermint/tendermint/pkg/mempool"
  16. "github.com/tendermint/tendermint/proxy"
  17. "github.com/tendermint/tendermint/types"
  18. )
// Compile-time assertion that *TxMempool satisfies the mempool.Mempool
// interface.
var _ mempool.Mempool = (*TxMempool)(nil)

// TxMempoolOption sets an optional parameter on the TxMempool. Options are
// applied, in order, by NewTxMempool after the mempool has been constructed.
type TxMempoolOption func(*TxMempool)
// TxMempool defines a prioritized mempool data structure used by the v1 mempool
// reactor. It keeps a thread-safe priority queue of transactions that is used
// when a block proposer constructs a block and a thread-safe linked-list that
// is used to gossip transactions to peers in a FIFO manner.
type TxMempool struct {
	logger       log.Logger
	metrics      *mempool.Metrics
	config       *config.MempoolConfig
	proxyAppConn proxy.AppConnMempool

	// txsAvailable fires once for each height when the mempool is not empty.
	// It stays nil until EnableTxsAvailable is called.
	txsAvailable chan struct{}
	// notifiedTxsAvailable records whether txsAvailable has already been
	// signalled for the current height; Update resets it to false.
	notifiedTxsAvailable bool

	// height defines the last block height processed during Update().
	height int64

	// sizeBytes defines the total size of the mempool (sum of all tx bytes).
	// It is read and modified through sync/atomic.
	sizeBytes int64

	// cache defines a fixed-size cache of already seen transactions as this
	// reduces pressure on the proxyApp.
	cache mempool.TxCache

	// txStore defines the main storage of valid transactions. Indexes are built
	// on top of this store.
	txStore *TxStore

	// gossipIndex defines the gossiping index of valid transactions via a
	// thread-safe linked-list. We also use the gossip index as a cursor for
	// rechecking transactions already in the mempool.
	gossipIndex *clist.CList

	// recheckCursor and recheckEnd are used as cursors based on the gossip index
	// to recheck transactions that are already in the mempool. Iteration is not
	// thread-safe and transactions may be mutated in serial order.
	//
	// XXX/TODO: It might be somewhat of a code smell to use the gossip index for
	// iterator and cursor management when rechecking transactions. If the gossip
	// index changes or is removed in a future refactor, this will have to be
	// refactored. Instead, we should consider just keeping a slice of a snapshot
	// of the mempool's current transactions during Update and an integer cursor
	// into that slice. This, however, requires additional O(n) space complexity.
	recheckCursor *clist.CElement // next expected response
	recheckEnd    *clist.CElement // re-checking stops here

	// priorityIndex defines the priority index of valid transactions via a
	// thread-safe priority queue.
	priorityIndex *TxPriorityQueue

	// heightIndex defines a height-based, in ascending order, transaction index.
	// i.e. older transactions are first.
	heightIndex *WrappedTxList

	// timestampIndex defines a timestamp-based, in ascending order, transaction
	// index. i.e. older transactions are first.
	timestampIndex *WrappedTxList

	// A read/write lock is used to safeguard updates, insertions and deletions
	// from the mempool. A read-lock is implicitly acquired when executing CheckTx,
	// however, a caller must explicitly grab a write-lock via Lock when updating
	// the mempool via Update().
	mtx tmsync.RWMutex

	// preCheck and postCheck are optional filters run before and after the
	// application's CheckTx respectively. They can be set through WithPreCheck
	// and WithPostCheck and are replaced by Update when it is given non-nil
	// functions.
	preCheck  mempool.PreCheckFunc
	postCheck mempool.PostCheckFunc
}
  77. func NewTxMempool(
  78. logger log.Logger,
  79. cfg *config.MempoolConfig,
  80. proxyAppConn proxy.AppConnMempool,
  81. height int64,
  82. options ...TxMempoolOption,
  83. ) *TxMempool {
  84. txmp := &TxMempool{
  85. logger: logger,
  86. config: cfg,
  87. proxyAppConn: proxyAppConn,
  88. height: height,
  89. cache: mempool.NopTxCache{},
  90. metrics: mempool.NopMetrics(),
  91. txStore: NewTxStore(),
  92. gossipIndex: clist.New(),
  93. priorityIndex: NewTxPriorityQueue(),
  94. heightIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
  95. return wtx1.height >= wtx2.height
  96. }),
  97. timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
  98. return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp)
  99. }),
  100. }
  101. if cfg.CacheSize > 0 {
  102. txmp.cache = mempool.NewLRUTxCache(cfg.CacheSize)
  103. }
  104. proxyAppConn.SetResponseCallback(txmp.defaultTxCallback)
  105. for _, opt := range options {
  106. opt(txmp)
  107. }
  108. return txmp
  109. }
  110. // WithPreCheck sets a filter for the mempool to reject a transaction if f(tx)
  111. // returns an error. This is executed before CheckTx. It only applies to the
  112. // first created block. After that, Update() overwrites the existing value.
  113. func WithPreCheck(f mempool.PreCheckFunc) TxMempoolOption {
  114. return func(txmp *TxMempool) { txmp.preCheck = f }
  115. }
  116. // WithPostCheck sets a filter for the mempool to reject a transaction if
  117. // f(tx, resp) returns an error. This is executed after CheckTx. It only applies
  118. // to the first created block. After that, Update overwrites the existing value.
  119. func WithPostCheck(f mempool.PostCheckFunc) TxMempoolOption {
  120. return func(txmp *TxMempool) { txmp.postCheck = f }
  121. }
  122. // WithMetrics sets the mempool's metrics collector.
  123. func WithMetrics(metrics *mempool.Metrics) TxMempoolOption {
  124. return func(txmp *TxMempool) { txmp.metrics = metrics }
  125. }
  126. // Lock obtains a write-lock on the mempool. A caller must be sure to explicitly
  127. // release the lock when finished.
  128. func (txmp *TxMempool) Lock() {
  129. txmp.mtx.Lock()
  130. }
  131. // Unlock releases a write-lock on the mempool.
  132. func (txmp *TxMempool) Unlock() {
  133. txmp.mtx.Unlock()
  134. }
  135. // Size returns the number of valid transactions in the mempool. It is
  136. // thread-safe.
  137. func (txmp *TxMempool) Size() int {
  138. return txmp.txStore.Size()
  139. }
  140. // SizeBytes return the total sum in bytes of all the valid transactions in the
  141. // mempool. It is thread-safe.
  142. func (txmp *TxMempool) SizeBytes() int64 {
  143. return atomic.LoadInt64(&txmp.sizeBytes)
  144. }
  145. // FlushAppConn executes FlushSync on the mempool's proxyAppConn.
  146. //
  147. // NOTE: The caller must obtain a write-lock via Lock() prior to execution.
  148. func (txmp *TxMempool) FlushAppConn() error {
  149. return txmp.proxyAppConn.FlushSync(context.Background())
  150. }
  151. // WaitForNextTx returns a blocking channel that will be closed when the next
  152. // valid transaction is available to gossip. It is thread-safe.
  153. func (txmp *TxMempool) WaitForNextTx() <-chan struct{} {
  154. return txmp.gossipIndex.WaitChan()
  155. }
  156. // NextGossipTx returns the next valid transaction to gossip. A caller must wait
  157. // for WaitForNextTx to signal a transaction is available to gossip first. It is
  158. // thread-safe.
  159. func (txmp *TxMempool) NextGossipTx() *WrappedTx {
  160. return txmp.gossipIndex.Front().Value.(*WrappedTx)
  161. }
  162. // EnableTxsAvailable enables the mempool to trigger events when transactions
  163. // are available on a block by block basis.
  164. func (txmp *TxMempool) EnableTxsAvailable() {
  165. txmp.mtx.Lock()
  166. defer txmp.mtx.Unlock()
  167. txmp.txsAvailable = make(chan struct{}, 1)
  168. }
  169. // TxsAvailable returns a channel which fires once for every height, and only
  170. // when transactions are available in the mempool. It is thread-safe.
  171. func (txmp *TxMempool) TxsAvailable() <-chan struct{} {
  172. return txmp.txsAvailable
  173. }
  174. // CheckTx executes the ABCI CheckTx method for a given transaction. It acquires
  175. // a read-lock attempts to execute the application's CheckTx ABCI method via
  176. // CheckTxAsync. We return an error if any of the following happen:
  177. //
  178. // - The CheckTxAsync execution fails.
  179. // - The transaction already exists in the cache and we've already received the
  180. // transaction from the peer. Otherwise, if it solely exists in the cache, we
  181. // return nil.
  182. // - The transaction size exceeds the maximum transaction size as defined by the
  183. // configuration provided to the mempool.
  184. // - The transaction fails Pre-Check (if it is defined).
  185. // - The proxyAppConn fails, e.g. the buffer is full.
  186. //
  187. // If the mempool is full, we still execute CheckTx and attempt to find a lower
  188. // priority transaction to evict. If such a transaction exists, we remove the
  189. // lower priority transaction and add the new one with higher priority.
  190. //
  191. // NOTE:
  192. // - The applications' CheckTx implementation may panic.
  193. // - The caller is not to explicitly require any locks for executing CheckTx.
  194. func (txmp *TxMempool) CheckTx(
  195. ctx context.Context,
  196. tx types.Tx,
  197. cb func(*abci.Response),
  198. txInfo mempool.TxInfo,
  199. ) error {
  200. txmp.mtx.RLock()
  201. defer txmp.mtx.RUnlock()
  202. txSize := len(tx)
  203. if txSize > txmp.config.MaxTxBytes {
  204. return pubmempool.ErrTxTooLarge{
  205. Max: txmp.config.MaxTxBytes,
  206. Actual: txSize,
  207. }
  208. }
  209. if txmp.preCheck != nil {
  210. if err := txmp.preCheck(tx); err != nil {
  211. return pubmempool.ErrPreCheck{
  212. Reason: err,
  213. }
  214. }
  215. }
  216. if err := txmp.proxyAppConn.Error(); err != nil {
  217. return err
  218. }
  219. txHash := mempool.TxKey(tx)
  220. // We add the transaction to the mempool's cache and if the transaction already
  221. // exists, i.e. false is returned, then we check if we've seen this transaction
  222. // from the same sender and error if we have. Otherwise, we return nil.
  223. if !txmp.cache.Push(tx) {
  224. wtx, ok := txmp.txStore.GetOrSetPeerByTxHash(txHash, txInfo.SenderID)
  225. if wtx != nil && ok {
  226. // We already have the transaction stored and the we've already seen this
  227. // transaction from txInfo.SenderID.
  228. return pubmempool.ErrTxInCache
  229. }
  230. txmp.logger.Debug("tx exists already in cache", "tx_hash", tx.Hash())
  231. return nil
  232. }
  233. if ctx == nil {
  234. ctx = context.Background()
  235. }
  236. reqRes, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{Tx: tx})
  237. if err != nil {
  238. txmp.cache.Remove(tx)
  239. return err
  240. }
  241. reqRes.SetCallback(func(res *abci.Response) {
  242. if txmp.recheckCursor != nil {
  243. panic("recheck cursor is non-nil in CheckTx callback")
  244. }
  245. wtx := &WrappedTx{
  246. tx: tx,
  247. hash: txHash,
  248. timestamp: time.Now().UTC(),
  249. height: txmp.height,
  250. }
  251. txmp.initTxCallback(wtx, res, txInfo)
  252. if cb != nil {
  253. cb(res)
  254. }
  255. })
  256. return nil
  257. }
  258. // Flush flushes out the mempool. It acquires a read-lock, fetches all the
  259. // transactions currently in the transaction store and removes each transaction
  260. // from the store and all indexes and finally resets the cache.
  261. //
  262. // NOTE:
  263. // - Flushing the mempool may leave the mempool in an inconsistent state.
  264. func (txmp *TxMempool) Flush() {
  265. txmp.mtx.RLock()
  266. defer txmp.mtx.RUnlock()
  267. txmp.heightIndex.Reset()
  268. txmp.timestampIndex.Reset()
  269. for _, wtx := range txmp.txStore.GetAllTxs() {
  270. txmp.removeTx(wtx, false)
  271. }
  272. atomic.SwapInt64(&txmp.sizeBytes, 0)
  273. txmp.cache.Reset()
  274. }
  275. // ReapMaxBytesMaxGas returns a list of transactions within the provided size
  276. // and gas constraints. Transaction are retrieved in priority order.
  277. //
  278. // NOTE:
  279. // - A read-lock is acquired.
  280. // - Transactions returned are not actually removed from the mempool transaction
  281. // store or indexes.
  282. func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
  283. txmp.mtx.RLock()
  284. defer txmp.mtx.RUnlock()
  285. var (
  286. totalGas int64
  287. totalSize int64
  288. )
  289. // wTxs contains a list of *WrappedTx retrieved from the priority queue that
  290. // need to be re-enqueued prior to returning.
  291. wTxs := make([]*WrappedTx, 0, txmp.priorityIndex.NumTxs())
  292. defer func() {
  293. for _, wtx := range wTxs {
  294. txmp.priorityIndex.PushTx(wtx)
  295. }
  296. }()
  297. txs := make([]types.Tx, 0, txmp.priorityIndex.NumTxs())
  298. for txmp.priorityIndex.NumTxs() > 0 {
  299. wtx := txmp.priorityIndex.PopTx()
  300. txs = append(txs, wtx.tx)
  301. wTxs = append(wTxs, wtx)
  302. size := types.ComputeProtoSizeForTxs([]types.Tx{wtx.tx})
  303. // Ensure we have capacity for the transaction with respect to the
  304. // transaction size.
  305. if maxBytes > -1 && totalSize+size > maxBytes {
  306. return txs[:len(txs)-1]
  307. }
  308. totalSize += size
  309. // ensure we have capacity for the transaction with respect to total gas
  310. gas := totalGas + wtx.gasWanted
  311. if maxGas > -1 && gas > maxGas {
  312. return txs[:len(txs)-1]
  313. }
  314. totalGas = gas
  315. }
  316. return txs
  317. }
  318. // ReapMaxTxs returns a list of transactions within the provided number of
  319. // transactions bound. Transaction are retrieved in priority order.
  320. //
  321. // NOTE:
  322. // - A read-lock is acquired.
  323. // - Transactions returned are not actually removed from the mempool transaction
  324. // store or indexes.
  325. func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs {
  326. txmp.mtx.RLock()
  327. defer txmp.mtx.RUnlock()
  328. numTxs := txmp.priorityIndex.NumTxs()
  329. if max < 0 {
  330. max = numTxs
  331. }
  332. cap := tmmath.MinInt(numTxs, max)
  333. // wTxs contains a list of *WrappedTx retrieved from the priority queue that
  334. // need to be re-enqueued prior to returning.
  335. wTxs := make([]*WrappedTx, 0, cap)
  336. defer func() {
  337. for _, wtx := range wTxs {
  338. txmp.priorityIndex.PushTx(wtx)
  339. }
  340. }()
  341. txs := make([]types.Tx, 0, cap)
  342. for txmp.priorityIndex.NumTxs() > 0 && len(txs) < max {
  343. wtx := txmp.priorityIndex.PopTx()
  344. txs = append(txs, wtx.tx)
  345. wTxs = append(wTxs, wtx)
  346. }
  347. return txs
  348. }
  349. // Update iterates over all the transactions provided by the caller, i.e. the
  350. // block producer, and removes them from the cache (if applicable) and removes
  351. // the transactions from the main transaction store and associated indexes.
  352. // Finally, if there are trainsactions remaining in the mempool, we initiate a
  353. // re-CheckTx for them (if applicable), otherwise, we notify the caller more
  354. // transactions are available.
  355. //
  356. // NOTE:
  357. // - The caller must explicitly acquire a write-lock via Lock().
  358. func (txmp *TxMempool) Update(
  359. blockHeight int64,
  360. blockTxs types.Txs,
  361. deliverTxResponses []*abci.ResponseDeliverTx,
  362. newPreFn mempool.PreCheckFunc,
  363. newPostFn mempool.PostCheckFunc,
  364. ) error {
  365. txmp.height = blockHeight
  366. txmp.notifiedTxsAvailable = false
  367. if newPreFn != nil {
  368. txmp.preCheck = newPreFn
  369. }
  370. if newPostFn != nil {
  371. txmp.postCheck = newPostFn
  372. }
  373. for i, tx := range blockTxs {
  374. if deliverTxResponses[i].Code == abci.CodeTypeOK {
  375. // add the valid committed transaction to the cache (if missing)
  376. _ = txmp.cache.Push(tx)
  377. } else if !txmp.config.KeepInvalidTxsInCache {
  378. // allow invalid transactions to be re-submitted
  379. txmp.cache.Remove(tx)
  380. }
  381. // remove the committed transaction from the transaction store and indexes
  382. if wtx := txmp.txStore.GetTxByHash(mempool.TxKey(tx)); wtx != nil {
  383. txmp.removeTx(wtx, false)
  384. }
  385. }
  386. txmp.purgeExpiredTxs(blockHeight)
  387. // If there any uncommitted transactions left in the mempool, we either
  388. // initiate re-CheckTx per remaining transaction or notify that remaining
  389. // transactions are left.
  390. if txmp.Size() > 0 {
  391. if txmp.config.Recheck {
  392. txmp.logger.Debug(
  393. "executing re-CheckTx for all remaining transactions",
  394. "num_txs", txmp.Size(),
  395. "height", blockHeight,
  396. )
  397. txmp.updateReCheckTxs()
  398. } else {
  399. txmp.notifyTxsAvailable()
  400. }
  401. }
  402. txmp.metrics.Size.Set(float64(txmp.Size()))
  403. return nil
  404. }
  405. // initTxCallback performs the initial, i.e. the first, callback after CheckTx
  406. // has been executed by the ABCI application. In other words, initTxCallback is
  407. // called after executing CheckTx when we see a unique transaction for the first
  408. // time. CheckTx can be called again for the same transaction at a later point
  409. // in time when re-checking, however, this callback will not be called.
  410. //
  411. // After the ABCI application executes CheckTx, initTxCallback is called with
  412. // the ABCI *Response object and TxInfo. If postCheck is defined on the mempool,
  413. // we execute that first. If there is no error from postCheck (if defined) and
  414. // the ABCI CheckTx response code is OK, we attempt to insert the transaction.
  415. //
  416. // When attempting to insert the transaction, we first check if there is
  417. // sufficient capacity. If there is sufficient capacity, the transaction is
  418. // inserted into the txStore and indexed across all indexes. Otherwise, if the
  419. // mempool is full, we attempt to find a lower priority transaction to evict in
  420. // place of the new incoming transaction. If no such transaction exists, the
  421. // new incoming transaction is rejected.
  422. //
  423. // If the new incoming transaction fails CheckTx or postCheck fails, we reject
  424. // the new incoming transaction.
  425. //
  426. // NOTE:
  427. // - An explicit lock is NOT required.
  428. func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo mempool.TxInfo) {
  429. checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
  430. if ok {
  431. var err error
  432. if txmp.postCheck != nil {
  433. err = txmp.postCheck(wtx.tx, checkTxRes.CheckTx)
  434. }
  435. if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil {
  436. sender := checkTxRes.CheckTx.Sender
  437. priority := checkTxRes.CheckTx.Priority
  438. if len(sender) > 0 {
  439. if wtx := txmp.txStore.GetTxBySender(sender); wtx != nil {
  440. txmp.logger.Error(
  441. "rejected incoming good transaction; tx already exists for sender",
  442. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  443. "sender", sender,
  444. )
  445. txmp.metrics.RejectedTxs.Add(1)
  446. return
  447. }
  448. }
  449. if err := txmp.canAddTx(wtx); err != nil {
  450. evictTxs := txmp.priorityIndex.GetEvictableTxs(
  451. priority,
  452. int64(wtx.Size()),
  453. txmp.SizeBytes(),
  454. txmp.config.MaxTxsBytes,
  455. )
  456. if len(evictTxs) == 0 {
  457. // No room for the new incoming transaction so we just remove it from
  458. // the cache.
  459. txmp.cache.Remove(wtx.tx)
  460. txmp.logger.Error(
  461. "rejected incoming good transaction; mempool full",
  462. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  463. "err", err.Error(),
  464. )
  465. txmp.metrics.RejectedTxs.Add(1)
  466. return
  467. }
  468. // evict an existing transaction(s)
  469. //
  470. // NOTE:
  471. // - The transaction, toEvict, can be removed while a concurrent
  472. // reCheckTx callback is being executed for the same transaction.
  473. for _, toEvict := range evictTxs {
  474. txmp.removeTx(toEvict, true)
  475. txmp.logger.Debug(
  476. "evicted existing good transaction; mempool full",
  477. "old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()),
  478. "old_priority", toEvict.priority,
  479. "new_tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  480. "new_priority", wtx.priority,
  481. )
  482. txmp.metrics.EvictedTxs.Add(1)
  483. }
  484. }
  485. wtx.gasWanted = checkTxRes.CheckTx.GasWanted
  486. wtx.priority = priority
  487. wtx.sender = sender
  488. wtx.peers = map[uint16]struct{}{
  489. txInfo.SenderID: {},
  490. }
  491. txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size()))
  492. txmp.metrics.Size.Set(float64(txmp.Size()))
  493. txmp.insertTx(wtx)
  494. txmp.logger.Debug(
  495. "inserted good transaction",
  496. "priority", wtx.priority,
  497. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  498. "height", txmp.height,
  499. "num_txs", txmp.Size(),
  500. )
  501. txmp.notifyTxsAvailable()
  502. } else {
  503. // ignore bad transactions
  504. txmp.logger.Info(
  505. "rejected bad transaction",
  506. "priority", wtx.priority,
  507. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  508. "peer_id", txInfo.SenderNodeID,
  509. "code", checkTxRes.CheckTx.Code,
  510. "post_check_err", err,
  511. )
  512. txmp.metrics.FailedTxs.Add(1)
  513. if !txmp.config.KeepInvalidTxsInCache {
  514. txmp.cache.Remove(wtx.tx)
  515. }
  516. }
  517. }
  518. }
  519. // defaultTxCallback performs the default CheckTx application callback. This is
  520. // NOT executed when a transaction is first seen/received. Instead, this callback
  521. // is executed during re-checking transactions (if enabled). A caller, i.e a
  522. // block proposer, acquires a mempool write-lock via Lock() and when executing
  523. // Update(), if the mempool is non-empty and Recheck is enabled, then all
  524. // remaining transactions will be rechecked via CheckTxAsync. The order in which
  525. // they are rechecked must be the same order in which this callback is called
  526. // per transaction.
  527. func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response) {
  528. if txmp.recheckCursor == nil {
  529. return
  530. }
  531. txmp.metrics.RecheckTimes.Add(1)
  532. checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
  533. if ok {
  534. tx := req.GetCheckTx().Tx
  535. wtx := txmp.recheckCursor.Value.(*WrappedTx)
  536. if !bytes.Equal(tx, wtx.tx) {
  537. panic(fmt.Sprintf("re-CheckTx transaction mismatch; got: %X, expected: %X", wtx.tx.Hash(), mempool.TxKey(tx)))
  538. }
  539. // Only evaluate transactions that have not been removed. This can happen
  540. // if an existing transaction is evicted during CheckTx and while this
  541. // callback is being executed for the same evicted transaction.
  542. if !txmp.txStore.IsTxRemoved(wtx.hash) {
  543. var err error
  544. if txmp.postCheck != nil {
  545. err = txmp.postCheck(tx, checkTxRes.CheckTx)
  546. }
  547. if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil {
  548. wtx.priority = checkTxRes.CheckTx.Priority
  549. } else {
  550. txmp.logger.Debug(
  551. "existing transaction no longer valid; failed re-CheckTx callback",
  552. "priority", wtx.priority,
  553. "tx", fmt.Sprintf("%X", mempool.TxHashFromBytes(wtx.tx)),
  554. "err", err,
  555. "code", checkTxRes.CheckTx.Code,
  556. )
  557. if wtx.gossipEl != txmp.recheckCursor {
  558. panic("corrupted reCheckTx cursor")
  559. }
  560. txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
  561. }
  562. }
  563. // move reCheckTx cursor to next element
  564. if txmp.recheckCursor == txmp.recheckEnd {
  565. txmp.recheckCursor = nil
  566. } else {
  567. txmp.recheckCursor = txmp.recheckCursor.Next()
  568. }
  569. if txmp.recheckCursor == nil {
  570. txmp.logger.Debug("finished rechecking transactions")
  571. if txmp.Size() > 0 {
  572. txmp.notifyTxsAvailable()
  573. }
  574. }
  575. txmp.metrics.Size.Set(float64(txmp.Size()))
  576. }
  577. }
  578. // updateReCheckTxs updates the recheck cursors by using the gossipIndex. For
  579. // each transaction, it executes CheckTxAsync. The global callback defined on
  580. // the proxyAppConn will be executed for each transaction after CheckTx is
  581. // executed.
  582. //
  583. // NOTE:
  584. // - The caller must have a write-lock when executing updateReCheckTxs.
  585. func (txmp *TxMempool) updateReCheckTxs() {
  586. if txmp.Size() == 0 {
  587. panic("attempted to update re-CheckTx txs when mempool is empty")
  588. }
  589. txmp.recheckCursor = txmp.gossipIndex.Front()
  590. txmp.recheckEnd = txmp.gossipIndex.Back()
  591. ctx := context.Background()
  592. for e := txmp.gossipIndex.Front(); e != nil; e = e.Next() {
  593. wtx := e.Value.(*WrappedTx)
  594. // Only execute CheckTx if the transaction is not marked as removed which
  595. // could happen if the transaction was evicted.
  596. if !txmp.txStore.IsTxRemoved(wtx.hash) {
  597. _, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{
  598. Tx: wtx.tx,
  599. Type: abci.CheckTxType_Recheck,
  600. })
  601. if err != nil {
  602. // no need in retrying since the tx will be rechecked after the next block
  603. txmp.logger.Error("failed to execute CheckTx during rechecking", "err", err)
  604. }
  605. }
  606. }
  607. if _, err := txmp.proxyAppConn.FlushAsync(ctx); err != nil {
  608. txmp.logger.Error("failed to flush transactions during rechecking", "err", err)
  609. }
  610. }
  611. // canAddTx returns an error if we cannot insert the provided *WrappedTx into
  612. // the mempool due to mempool configured constraints. Otherwise, nil is returned
  613. // and the transaction can be inserted into the mempool.
  614. func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error {
  615. var (
  616. numTxs = txmp.Size()
  617. sizeBytes = txmp.SizeBytes()
  618. )
  619. if numTxs >= txmp.config.Size || int64(wtx.Size())+sizeBytes > txmp.config.MaxTxsBytes {
  620. return pubmempool.ErrMempoolIsFull{
  621. NumTxs: numTxs,
  622. MaxTxs: txmp.config.Size,
  623. TxsBytes: sizeBytes,
  624. MaxTxsBytes: txmp.config.MaxTxsBytes,
  625. }
  626. }
  627. return nil
  628. }
  629. func (txmp *TxMempool) insertTx(wtx *WrappedTx) {
  630. txmp.txStore.SetTx(wtx)
  631. txmp.priorityIndex.PushTx(wtx)
  632. txmp.heightIndex.Insert(wtx)
  633. txmp.timestampIndex.Insert(wtx)
  634. // Insert the transaction into the gossip index and mark the reference to the
  635. // linked-list element, which will be needed at a later point when the
  636. // transaction is removed.
  637. gossipEl := txmp.gossipIndex.PushBack(wtx)
  638. wtx.gossipEl = gossipEl
  639. atomic.AddInt64(&txmp.sizeBytes, int64(wtx.Size()))
  640. }
  641. func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {
  642. if txmp.txStore.IsTxRemoved(wtx.hash) {
  643. return
  644. }
  645. txmp.txStore.RemoveTx(wtx)
  646. txmp.priorityIndex.RemoveTx(wtx)
  647. txmp.heightIndex.Remove(wtx)
  648. txmp.timestampIndex.Remove(wtx)
  649. // Remove the transaction from the gossip index and cleanup the linked-list
  650. // element so it can be garbage collected.
  651. txmp.gossipIndex.Remove(wtx.gossipEl)
  652. wtx.gossipEl.DetachPrev()
  653. atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size()))
  654. if removeFromCache {
  655. txmp.cache.Remove(wtx.tx)
  656. }
  657. }
  658. // purgeExpiredTxs removes all transactions that have exceeded their respective
  659. // height and/or time based TTLs from their respective indexes. Every expired
  660. // transaction will be removed from the mempool entirely, except for the cache.
  661. //
  662. // NOTE: purgeExpiredTxs must only be called during TxMempool#Update in which
  663. // the caller has a write-lock on the mempool and so we can safely iterate over
  664. // the height and time based indexes.
  665. func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
  666. now := time.Now()
  667. expiredTxs := make(map[[mempool.TxKeySize]byte]*WrappedTx)
  668. if txmp.config.TTLNumBlocks > 0 {
  669. purgeIdx := -1
  670. for i, wtx := range txmp.heightIndex.txs {
  671. if (blockHeight - wtx.height) > txmp.config.TTLNumBlocks {
  672. expiredTxs[mempool.TxKey(wtx.tx)] = wtx
  673. purgeIdx = i
  674. } else {
  675. // since the index is sorted, we know no other txs can be be purged
  676. break
  677. }
  678. }
  679. if purgeIdx >= 0 {
  680. txmp.heightIndex.txs = txmp.heightIndex.txs[purgeIdx+1:]
  681. }
  682. }
  683. if txmp.config.TTLDuration > 0 {
  684. purgeIdx := -1
  685. for i, wtx := range txmp.timestampIndex.txs {
  686. if now.Sub(wtx.timestamp) > txmp.config.TTLDuration {
  687. expiredTxs[mempool.TxKey(wtx.tx)] = wtx
  688. purgeIdx = i
  689. } else {
  690. // since the index is sorted, we know no other txs can be be purged
  691. break
  692. }
  693. }
  694. if purgeIdx >= 0 {
  695. txmp.timestampIndex.txs = txmp.timestampIndex.txs[purgeIdx+1:]
  696. }
  697. }
  698. for _, wtx := range expiredTxs {
  699. txmp.removeTx(wtx, false)
  700. }
  701. }
  702. func (txmp *TxMempool) notifyTxsAvailable() {
  703. if txmp.Size() == 0 {
  704. panic("attempt to notify txs available but mempool is empty!")
  705. }
  706. if txmp.txsAvailable != nil && !txmp.notifiedTxsAvailable {
  707. // channel cap is 1, so this will send once
  708. txmp.notifiedTxsAvailable = true
  709. select {
  710. case txmp.txsAvailable <- struct{}{}:
  711. default:
  712. }
  713. }
  714. }