You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

842 lines
25 KiB

  1. package v1
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "sync/atomic"
  7. "time"
  8. abci "github.com/tendermint/tendermint/abci/types"
  9. "github.com/tendermint/tendermint/config"
  10. "github.com/tendermint/tendermint/internal/libs/clist"
  11. tmsync "github.com/tendermint/tendermint/internal/libs/sync"
  12. "github.com/tendermint/tendermint/internal/mempool"
  13. "github.com/tendermint/tendermint/internal/proxy"
  14. "github.com/tendermint/tendermint/libs/log"
  15. tmmath "github.com/tendermint/tendermint/libs/math"
  16. "github.com/tendermint/tendermint/types"
  17. )
  // Compile-time assertion that *TxMempool satisfies the mempool.Mempool
  // interface.
  var _ mempool.Mempool = (*TxMempool)(nil)

  // TxMempoolOption sets an optional parameter on the TxMempool. NewTxMempool
  // applies every supplied option, in order, to the freshly constructed mempool.
  type TxMempoolOption func(*TxMempool)
  21. // TxMempool defines a prioritized mempool data structure used by the v1 mempool
  22. // reactor. It keeps a thread-safe priority queue of transactions that is used
  23. // when a block proposer constructs a block and a thread-safe linked-list that
  24. // is used to gossip transactions to peers in a FIFO manner.
  25. type TxMempool struct {
  26. logger log.Logger
  27. metrics *mempool.Metrics
  28. config *config.MempoolConfig
  29. proxyAppConn proxy.AppConnMempool
  30. // txsAvailable fires once for each height when the mempool is not empty
  31. txsAvailable chan struct{}
  32. notifiedTxsAvailable bool
  33. // height defines the last block height process during Update()
  34. height int64
  35. // sizeBytes defines the total size of the mempool (sum of all tx bytes)
  36. sizeBytes int64
  37. // cache defines a fixed-size cache of already seen transactions as this
  38. // reduces pressure on the proxyApp.
  39. cache mempool.TxCache
  40. // txStore defines the main storage of valid transactions. Indexes are built
  41. // on top of this store.
  42. txStore *TxStore
  43. // gossipIndex defines the gossiping index of valid transactions via a
  44. // thread-safe linked-list. We also use the gossip index as a cursor for
  45. // rechecking transactions already in the mempool.
  46. gossipIndex *clist.CList
  47. // recheckCursor and recheckEnd are used as cursors based on the gossip index
  48. // to recheck transactions that are already in the mempool. Iteration is not
  49. // thread-safe and transaction may be mutated in serial order.
  50. //
  51. // XXX/TODO: It might be somewhat of a codesmell to use the gossip index for
  52. // iterator and cursor management when rechecking transactions. If the gossip
  53. // index changes or is removed in a future refactor, this will have to be
  54. // refactored. Instead, we should consider just keeping a slice of a snapshot
  55. // of the mempool's current transactions during Update and an integer cursor
  56. // into that slice. This, however, requires additional O(n) space complexity.
  57. recheckCursor *clist.CElement // next expected response
  58. recheckEnd *clist.CElement // re-checking stops here
  59. // priorityIndex defines the priority index of valid transactions via a
  60. // thread-safe priority queue.
  61. priorityIndex *TxPriorityQueue
  62. // heightIndex defines a height-based, in ascending order, transaction index.
  63. // i.e. older transactions are first.
  64. heightIndex *WrappedTxList
  65. // timestampIndex defines a timestamp-based, in ascending order, transaction
  66. // index. i.e. older transactions are first.
  67. timestampIndex *WrappedTxList
  68. // A read/write lock is used to safe guard updates, insertions and deletions
  69. // from the mempool. A read-lock is implicitly acquired when executing CheckTx,
  70. // however, a caller must explicitly grab a write-lock via Lock when updating
  71. // the mempool via Update().
  72. mtx tmsync.RWMutex
  73. preCheck mempool.PreCheckFunc
  74. postCheck mempool.PostCheckFunc
  75. }
  76. func NewTxMempool(
  77. logger log.Logger,
  78. cfg *config.MempoolConfig,
  79. proxyAppConn proxy.AppConnMempool,
  80. height int64,
  81. options ...TxMempoolOption,
  82. ) *TxMempool {
  83. txmp := &TxMempool{
  84. logger: logger,
  85. config: cfg,
  86. proxyAppConn: proxyAppConn,
  87. height: height,
  88. cache: mempool.NopTxCache{},
  89. metrics: mempool.NopMetrics(),
  90. txStore: NewTxStore(),
  91. gossipIndex: clist.New(),
  92. priorityIndex: NewTxPriorityQueue(),
  93. heightIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
  94. return wtx1.height >= wtx2.height
  95. }),
  96. timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
  97. return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp)
  98. }),
  99. }
  100. if cfg.CacheSize > 0 {
  101. txmp.cache = mempool.NewLRUTxCache(cfg.CacheSize)
  102. }
  103. proxyAppConn.SetResponseCallback(txmp.defaultTxCallback)
  104. for _, opt := range options {
  105. opt(txmp)
  106. }
  107. return txmp
  108. }
  109. // WithPreCheck sets a filter for the mempool to reject a transaction if f(tx)
  110. // returns an error. This is executed before CheckTx. It only applies to the
  111. // first created block. After that, Update() overwrites the existing value.
  112. func WithPreCheck(f mempool.PreCheckFunc) TxMempoolOption {
  113. return func(txmp *TxMempool) { txmp.preCheck = f }
  114. }
  115. // WithPostCheck sets a filter for the mempool to reject a transaction if
  116. // f(tx, resp) returns an error. This is executed after CheckTx. It only applies
  117. // to the first created block. After that, Update overwrites the existing value.
  118. func WithPostCheck(f mempool.PostCheckFunc) TxMempoolOption {
  119. return func(txmp *TxMempool) { txmp.postCheck = f }
  120. }
  121. // WithMetrics sets the mempool's metrics collector.
  122. func WithMetrics(metrics *mempool.Metrics) TxMempoolOption {
  123. return func(txmp *TxMempool) { txmp.metrics = metrics }
  124. }
  125. // Lock obtains a write-lock on the mempool. A caller must be sure to explicitly
  126. // release the lock when finished.
  127. func (txmp *TxMempool) Lock() {
  128. txmp.mtx.Lock()
  129. }
  130. // Unlock releases a write-lock on the mempool.
  131. func (txmp *TxMempool) Unlock() {
  132. txmp.mtx.Unlock()
  133. }
  134. // Size returns the number of valid transactions in the mempool. It is
  135. // thread-safe.
  136. func (txmp *TxMempool) Size() int {
  137. return txmp.txStore.Size()
  138. }
  139. // SizeBytes return the total sum in bytes of all the valid transactions in the
  140. // mempool. It is thread-safe.
  141. func (txmp *TxMempool) SizeBytes() int64 {
  142. return atomic.LoadInt64(&txmp.sizeBytes)
  143. }
  144. // FlushAppConn executes FlushSync on the mempool's proxyAppConn.
  145. //
  146. // NOTE: The caller must obtain a write-lock via Lock() prior to execution.
  147. func (txmp *TxMempool) FlushAppConn() error {
  148. return txmp.proxyAppConn.FlushSync(context.Background())
  149. }
  150. // WaitForNextTx returns a blocking channel that will be closed when the next
  151. // valid transaction is available to gossip. It is thread-safe.
  152. func (txmp *TxMempool) WaitForNextTx() <-chan struct{} {
  153. return txmp.gossipIndex.WaitChan()
  154. }
  155. // NextGossipTx returns the next valid transaction to gossip. A caller must wait
  156. // for WaitForNextTx to signal a transaction is available to gossip first. It is
  157. // thread-safe.
  158. func (txmp *TxMempool) NextGossipTx() *clist.CElement {
  159. return txmp.gossipIndex.Front()
  160. }
  161. // EnableTxsAvailable enables the mempool to trigger events when transactions
  162. // are available on a block by block basis.
  163. func (txmp *TxMempool) EnableTxsAvailable() {
  164. txmp.mtx.Lock()
  165. defer txmp.mtx.Unlock()
  166. txmp.txsAvailable = make(chan struct{}, 1)
  167. }
  168. // TxsAvailable returns a channel which fires once for every height, and only
  169. // when transactions are available in the mempool. It is thread-safe.
  170. func (txmp *TxMempool) TxsAvailable() <-chan struct{} {
  171. return txmp.txsAvailable
  172. }
  173. // CheckTx executes the ABCI CheckTx method for a given transaction. It acquires
  174. // a read-lock attempts to execute the application's CheckTx ABCI method via
  175. // CheckTxAsync. We return an error if any of the following happen:
  176. //
  177. // - The CheckTxAsync execution fails.
  178. // - The transaction already exists in the cache and we've already received the
  179. // transaction from the peer. Otherwise, if it solely exists in the cache, we
  180. // return nil.
  181. // - The transaction size exceeds the maximum transaction size as defined by the
  182. // configuration provided to the mempool.
  183. // - The transaction fails Pre-Check (if it is defined).
  184. // - The proxyAppConn fails, e.g. the buffer is full.
  185. //
  186. // If the mempool is full, we still execute CheckTx and attempt to find a lower
  187. // priority transaction to evict. If such a transaction exists, we remove the
  188. // lower priority transaction and add the new one with higher priority.
  189. //
  190. // NOTE:
  191. // - The applications' CheckTx implementation may panic.
  192. // - The caller is not to explicitly require any locks for executing CheckTx.
  193. func (txmp *TxMempool) CheckTx(
  194. ctx context.Context,
  195. tx types.Tx,
  196. cb func(*abci.Response),
  197. txInfo mempool.TxInfo,
  198. ) error {
  199. txmp.mtx.RLock()
  200. defer txmp.mtx.RUnlock()
  201. txSize := len(tx)
  202. if txSize > txmp.config.MaxTxBytes {
  203. return types.ErrTxTooLarge{
  204. Max: txmp.config.MaxTxBytes,
  205. Actual: txSize,
  206. }
  207. }
  208. if txmp.preCheck != nil {
  209. if err := txmp.preCheck(tx); err != nil {
  210. return types.ErrPreCheck{
  211. Reason: err,
  212. }
  213. }
  214. }
  215. if err := txmp.proxyAppConn.Error(); err != nil {
  216. return err
  217. }
  218. txHash := mempool.TxKey(tx)
  219. // We add the transaction to the mempool's cache and if the transaction already
  220. // exists, i.e. false is returned, then we check if we've seen this transaction
  221. // from the same sender and error if we have. Otherwise, we return nil.
  222. if !txmp.cache.Push(tx) {
  223. wtx, ok := txmp.txStore.GetOrSetPeerByTxHash(txHash, txInfo.SenderID)
  224. if wtx != nil && ok {
  225. // We already have the transaction stored and the we've already seen this
  226. // transaction from txInfo.SenderID.
  227. return types.ErrTxInCache
  228. }
  229. txmp.logger.Debug("tx exists already in cache", "tx_hash", tx.Hash())
  230. return nil
  231. }
  232. if ctx == nil {
  233. ctx = context.Background()
  234. }
  235. reqRes, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{Tx: tx})
  236. if err != nil {
  237. txmp.cache.Remove(tx)
  238. return err
  239. }
  240. reqRes.SetCallback(func(res *abci.Response) {
  241. if txmp.recheckCursor != nil {
  242. panic("recheck cursor is non-nil in CheckTx callback")
  243. }
  244. wtx := &WrappedTx{
  245. tx: tx,
  246. hash: txHash,
  247. timestamp: time.Now().UTC(),
  248. height: txmp.height,
  249. }
  250. txmp.initTxCallback(wtx, res, txInfo)
  251. if cb != nil {
  252. cb(res)
  253. }
  254. })
  255. return nil
  256. }
  257. // Flush flushes out the mempool. It acquires a read-lock, fetches all the
  258. // transactions currently in the transaction store and removes each transaction
  259. // from the store and all indexes and finally resets the cache.
  260. //
  261. // NOTE:
  262. // - Flushing the mempool may leave the mempool in an inconsistent state.
  263. func (txmp *TxMempool) Flush() {
  264. txmp.mtx.RLock()
  265. defer txmp.mtx.RUnlock()
  266. txmp.heightIndex.Reset()
  267. txmp.timestampIndex.Reset()
  268. for _, wtx := range txmp.txStore.GetAllTxs() {
  269. txmp.removeTx(wtx, false)
  270. }
  271. atomic.SwapInt64(&txmp.sizeBytes, 0)
  272. txmp.cache.Reset()
  273. }
  274. // ReapMaxBytesMaxGas returns a list of transactions within the provided size
  275. // and gas constraints. Transaction are retrieved in priority order.
  276. //
  277. // NOTE:
  278. // - A read-lock is acquired.
  279. // - Transactions returned are not actually removed from the mempool transaction
  280. // store or indexes.
  281. func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
  282. txmp.mtx.RLock()
  283. defer txmp.mtx.RUnlock()
  284. var (
  285. totalGas int64
  286. totalSize int64
  287. )
  288. // wTxs contains a list of *WrappedTx retrieved from the priority queue that
  289. // need to be re-enqueued prior to returning.
  290. wTxs := make([]*WrappedTx, 0, txmp.priorityIndex.NumTxs())
  291. defer func() {
  292. for _, wtx := range wTxs {
  293. txmp.priorityIndex.PushTx(wtx)
  294. }
  295. }()
  296. txs := make([]types.Tx, 0, txmp.priorityIndex.NumTxs())
  297. for txmp.priorityIndex.NumTxs() > 0 {
  298. wtx := txmp.priorityIndex.PopTx()
  299. txs = append(txs, wtx.tx)
  300. wTxs = append(wTxs, wtx)
  301. size := types.ComputeProtoSizeForTxs([]types.Tx{wtx.tx})
  302. // Ensure we have capacity for the transaction with respect to the
  303. // transaction size.
  304. if maxBytes > -1 && totalSize+size > maxBytes {
  305. return txs[:len(txs)-1]
  306. }
  307. totalSize += size
  308. // ensure we have capacity for the transaction with respect to total gas
  309. gas := totalGas + wtx.gasWanted
  310. if maxGas > -1 && gas > maxGas {
  311. return txs[:len(txs)-1]
  312. }
  313. totalGas = gas
  314. }
  315. return txs
  316. }
  317. // ReapMaxTxs returns a list of transactions within the provided number of
  318. // transactions bound. Transaction are retrieved in priority order.
  319. //
  320. // NOTE:
  321. // - A read-lock is acquired.
  322. // - Transactions returned are not actually removed from the mempool transaction
  323. // store or indexes.
  324. func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs {
  325. txmp.mtx.RLock()
  326. defer txmp.mtx.RUnlock()
  327. numTxs := txmp.priorityIndex.NumTxs()
  328. if max < 0 {
  329. max = numTxs
  330. }
  331. cap := tmmath.MinInt(numTxs, max)
  332. // wTxs contains a list of *WrappedTx retrieved from the priority queue that
  333. // need to be re-enqueued prior to returning.
  334. wTxs := make([]*WrappedTx, 0, cap)
  335. defer func() {
  336. for _, wtx := range wTxs {
  337. txmp.priorityIndex.PushTx(wtx)
  338. }
  339. }()
  340. txs := make([]types.Tx, 0, cap)
  341. for txmp.priorityIndex.NumTxs() > 0 && len(txs) < max {
  342. wtx := txmp.priorityIndex.PopTx()
  343. txs = append(txs, wtx.tx)
  344. wTxs = append(wTxs, wtx)
  345. }
  346. return txs
  347. }
  348. // Update iterates over all the transactions provided by the caller, i.e. the
  349. // block producer, and removes them from the cache (if applicable) and removes
  350. // the transactions from the main transaction store and associated indexes.
  351. // Finally, if there are trainsactions remaining in the mempool, we initiate a
  352. // re-CheckTx for them (if applicable), otherwise, we notify the caller more
  353. // transactions are available.
  354. //
  355. // NOTE:
  356. // - The caller must explicitly acquire a write-lock via Lock().
  357. func (txmp *TxMempool) Update(
  358. blockHeight int64,
  359. blockTxs types.Txs,
  360. deliverTxResponses []*abci.ResponseDeliverTx,
  361. newPreFn mempool.PreCheckFunc,
  362. newPostFn mempool.PostCheckFunc,
  363. ) error {
  364. txmp.height = blockHeight
  365. txmp.notifiedTxsAvailable = false
  366. if newPreFn != nil {
  367. txmp.preCheck = newPreFn
  368. }
  369. if newPostFn != nil {
  370. txmp.postCheck = newPostFn
  371. }
  372. for i, tx := range blockTxs {
  373. if deliverTxResponses[i].Code == abci.CodeTypeOK {
  374. // add the valid committed transaction to the cache (if missing)
  375. _ = txmp.cache.Push(tx)
  376. } else if !txmp.config.KeepInvalidTxsInCache {
  377. // allow invalid transactions to be re-submitted
  378. txmp.cache.Remove(tx)
  379. }
  380. // remove the committed transaction from the transaction store and indexes
  381. if wtx := txmp.txStore.GetTxByHash(mempool.TxKey(tx)); wtx != nil {
  382. txmp.removeTx(wtx, false)
  383. }
  384. }
  385. txmp.purgeExpiredTxs(blockHeight)
  386. // If there any uncommitted transactions left in the mempool, we either
  387. // initiate re-CheckTx per remaining transaction or notify that remaining
  388. // transactions are left.
  389. if txmp.Size() > 0 {
  390. if txmp.config.Recheck {
  391. txmp.logger.Debug(
  392. "executing re-CheckTx for all remaining transactions",
  393. "num_txs", txmp.Size(),
  394. "height", blockHeight,
  395. )
  396. txmp.updateReCheckTxs()
  397. } else {
  398. txmp.notifyTxsAvailable()
  399. }
  400. }
  401. txmp.metrics.Size.Set(float64(txmp.Size()))
  402. return nil
  403. }
  404. // initTxCallback performs the initial, i.e. the first, callback after CheckTx
  405. // has been executed by the ABCI application. In other words, initTxCallback is
  406. // called after executing CheckTx when we see a unique transaction for the first
  407. // time. CheckTx can be called again for the same transaction at a later point
  408. // in time when re-checking, however, this callback will not be called.
  409. //
  410. // After the ABCI application executes CheckTx, initTxCallback is called with
  411. // the ABCI *Response object and TxInfo. If postCheck is defined on the mempool,
  412. // we execute that first. If there is no error from postCheck (if defined) and
  413. // the ABCI CheckTx response code is OK, we attempt to insert the transaction.
  414. //
  415. // When attempting to insert the transaction, we first check if there is
  416. // sufficient capacity. If there is sufficient capacity, the transaction is
  417. // inserted into the txStore and indexed across all indexes. Otherwise, if the
  418. // mempool is full, we attempt to find a lower priority transaction to evict in
  419. // place of the new incoming transaction. If no such transaction exists, the
  420. // new incoming transaction is rejected.
  421. //
  422. // If the new incoming transaction fails CheckTx or postCheck fails, we reject
  423. // the new incoming transaction.
  424. //
  425. // NOTE:
  426. // - An explicit lock is NOT required.
  427. func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo mempool.TxInfo) {
  428. checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
  429. if !ok {
  430. return
  431. }
  432. var err error
  433. if txmp.postCheck != nil {
  434. err = txmp.postCheck(wtx.tx, checkTxRes.CheckTx)
  435. }
  436. if err != nil || checkTxRes.CheckTx.Code != abci.CodeTypeOK {
  437. // ignore bad transactions
  438. txmp.logger.Info(
  439. "rejected bad transaction",
  440. "priority", wtx.priority,
  441. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  442. "peer_id", txInfo.SenderNodeID,
  443. "code", checkTxRes.CheckTx.Code,
  444. "post_check_err", err,
  445. )
  446. txmp.metrics.FailedTxs.Add(1)
  447. if !txmp.config.KeepInvalidTxsInCache {
  448. txmp.cache.Remove(wtx.tx)
  449. }
  450. if err != nil {
  451. checkTxRes.CheckTx.MempoolError = err.Error()
  452. }
  453. return
  454. }
  455. sender := checkTxRes.CheckTx.Sender
  456. priority := checkTxRes.CheckTx.Priority
  457. if len(sender) > 0 {
  458. if wtx := txmp.txStore.GetTxBySender(sender); wtx != nil {
  459. txmp.logger.Error(
  460. "rejected incoming good transaction; tx already exists for sender",
  461. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  462. "sender", sender,
  463. )
  464. txmp.metrics.RejectedTxs.Add(1)
  465. return
  466. }
  467. }
  468. if err := txmp.canAddTx(wtx); err != nil {
  469. evictTxs := txmp.priorityIndex.GetEvictableTxs(
  470. priority,
  471. int64(wtx.Size()),
  472. txmp.SizeBytes(),
  473. txmp.config.MaxTxsBytes,
  474. )
  475. if len(evictTxs) == 0 {
  476. // No room for the new incoming transaction so we just remove it from
  477. // the cache.
  478. txmp.cache.Remove(wtx.tx)
  479. txmp.logger.Error(
  480. "rejected incoming good transaction; mempool full",
  481. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  482. "err", err.Error(),
  483. )
  484. txmp.metrics.RejectedTxs.Add(1)
  485. return
  486. }
  487. // evict an existing transaction(s)
  488. //
  489. // NOTE:
  490. // - The transaction, toEvict, can be removed while a concurrent
  491. // reCheckTx callback is being executed for the same transaction.
  492. for _, toEvict := range evictTxs {
  493. txmp.removeTx(toEvict, true)
  494. txmp.logger.Debug(
  495. "evicted existing good transaction; mempool full",
  496. "old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()),
  497. "old_priority", toEvict.priority,
  498. "new_tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  499. "new_priority", wtx.priority,
  500. )
  501. txmp.metrics.EvictedTxs.Add(1)
  502. }
  503. }
  504. wtx.gasWanted = checkTxRes.CheckTx.GasWanted
  505. wtx.priority = priority
  506. wtx.sender = sender
  507. wtx.peers = map[uint16]struct{}{
  508. txInfo.SenderID: {},
  509. }
  510. txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size()))
  511. txmp.metrics.Size.Set(float64(txmp.Size()))
  512. txmp.insertTx(wtx)
  513. txmp.logger.Debug(
  514. "inserted good transaction",
  515. "priority", wtx.priority,
  516. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  517. "height", txmp.height,
  518. "num_txs", txmp.Size(),
  519. )
  520. txmp.notifyTxsAvailable()
  521. }
  522. // defaultTxCallback performs the default CheckTx application callback. This is
  523. // NOT executed when a transaction is first seen/received. Instead, this callback
  524. // is executed during re-checking transactions (if enabled). A caller, i.e a
  525. // block proposer, acquires a mempool write-lock via Lock() and when executing
  526. // Update(), if the mempool is non-empty and Recheck is enabled, then all
  527. // remaining transactions will be rechecked via CheckTxAsync. The order in which
  528. // they are rechecked must be the same order in which this callback is called
  529. // per transaction.
  530. func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response) {
  531. if txmp.recheckCursor == nil {
  532. return
  533. }
  534. txmp.metrics.RecheckTimes.Add(1)
  535. checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
  536. if ok {
  537. tx := req.GetCheckTx().Tx
  538. wtx := txmp.recheckCursor.Value.(*WrappedTx)
  539. if !bytes.Equal(tx, wtx.tx) {
  540. panic(fmt.Sprintf("re-CheckTx transaction mismatch; got: %X, expected: %X", wtx.tx.Hash(), mempool.TxKey(tx)))
  541. }
  542. // Only evaluate transactions that have not been removed. This can happen
  543. // if an existing transaction is evicted during CheckTx and while this
  544. // callback is being executed for the same evicted transaction.
  545. if !txmp.txStore.IsTxRemoved(wtx.hash) {
  546. var err error
  547. if txmp.postCheck != nil {
  548. err = txmp.postCheck(tx, checkTxRes.CheckTx)
  549. }
  550. if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil {
  551. wtx.priority = checkTxRes.CheckTx.Priority
  552. } else {
  553. txmp.logger.Debug(
  554. "existing transaction no longer valid; failed re-CheckTx callback",
  555. "priority", wtx.priority,
  556. "tx", fmt.Sprintf("%X", mempool.TxHashFromBytes(wtx.tx)),
  557. "err", err,
  558. "code", checkTxRes.CheckTx.Code,
  559. )
  560. if wtx.gossipEl != txmp.recheckCursor {
  561. panic("corrupted reCheckTx cursor")
  562. }
  563. txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
  564. }
  565. }
  566. // move reCheckTx cursor to next element
  567. if txmp.recheckCursor == txmp.recheckEnd {
  568. txmp.recheckCursor = nil
  569. } else {
  570. txmp.recheckCursor = txmp.recheckCursor.Next()
  571. }
  572. if txmp.recheckCursor == nil {
  573. txmp.logger.Debug("finished rechecking transactions")
  574. if txmp.Size() > 0 {
  575. txmp.notifyTxsAvailable()
  576. }
  577. }
  578. txmp.metrics.Size.Set(float64(txmp.Size()))
  579. }
  580. }
  581. // updateReCheckTxs updates the recheck cursors by using the gossipIndex. For
  582. // each transaction, it executes CheckTxAsync. The global callback defined on
  583. // the proxyAppConn will be executed for each transaction after CheckTx is
  584. // executed.
  585. //
  586. // NOTE:
  587. // - The caller must have a write-lock when executing updateReCheckTxs.
  588. func (txmp *TxMempool) updateReCheckTxs() {
  589. if txmp.Size() == 0 {
  590. panic("attempted to update re-CheckTx txs when mempool is empty")
  591. }
  592. txmp.recheckCursor = txmp.gossipIndex.Front()
  593. txmp.recheckEnd = txmp.gossipIndex.Back()
  594. ctx := context.Background()
  595. for e := txmp.gossipIndex.Front(); e != nil; e = e.Next() {
  596. wtx := e.Value.(*WrappedTx)
  597. // Only execute CheckTx if the transaction is not marked as removed which
  598. // could happen if the transaction was evicted.
  599. if !txmp.txStore.IsTxRemoved(wtx.hash) {
  600. _, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{
  601. Tx: wtx.tx,
  602. Type: abci.CheckTxType_Recheck,
  603. })
  604. if err != nil {
  605. // no need in retrying since the tx will be rechecked after the next block
  606. txmp.logger.Error("failed to execute CheckTx during rechecking", "err", err)
  607. }
  608. }
  609. }
  610. if _, err := txmp.proxyAppConn.FlushAsync(ctx); err != nil {
  611. txmp.logger.Error("failed to flush transactions during rechecking", "err", err)
  612. }
  613. }
  614. // canAddTx returns an error if we cannot insert the provided *WrappedTx into
  615. // the mempool due to mempool configured constraints. Otherwise, nil is returned
  616. // and the transaction can be inserted into the mempool.
  617. func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error {
  618. var (
  619. numTxs = txmp.Size()
  620. sizeBytes = txmp.SizeBytes()
  621. )
  622. if numTxs >= txmp.config.Size || int64(wtx.Size())+sizeBytes > txmp.config.MaxTxsBytes {
  623. return types.ErrMempoolIsFull{
  624. NumTxs: numTxs,
  625. MaxTxs: txmp.config.Size,
  626. TxsBytes: sizeBytes,
  627. MaxTxsBytes: txmp.config.MaxTxsBytes,
  628. }
  629. }
  630. return nil
  631. }
  632. func (txmp *TxMempool) insertTx(wtx *WrappedTx) {
  633. txmp.txStore.SetTx(wtx)
  634. txmp.priorityIndex.PushTx(wtx)
  635. txmp.heightIndex.Insert(wtx)
  636. txmp.timestampIndex.Insert(wtx)
  637. // Insert the transaction into the gossip index and mark the reference to the
  638. // linked-list element, which will be needed at a later point when the
  639. // transaction is removed.
  640. gossipEl := txmp.gossipIndex.PushBack(wtx)
  641. wtx.gossipEl = gossipEl
  642. atomic.AddInt64(&txmp.sizeBytes, int64(wtx.Size()))
  643. }
  644. func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {
  645. if txmp.txStore.IsTxRemoved(wtx.hash) {
  646. return
  647. }
  648. txmp.txStore.RemoveTx(wtx)
  649. txmp.priorityIndex.RemoveTx(wtx)
  650. txmp.heightIndex.Remove(wtx)
  651. txmp.timestampIndex.Remove(wtx)
  652. // Remove the transaction from the gossip index and cleanup the linked-list
  653. // element so it can be garbage collected.
  654. txmp.gossipIndex.Remove(wtx.gossipEl)
  655. wtx.gossipEl.DetachPrev()
  656. atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size()))
  657. if removeFromCache {
  658. txmp.cache.Remove(wtx.tx)
  659. }
  660. }
  661. // purgeExpiredTxs removes all transactions that have exceeded their respective
  662. // height and/or time based TTLs from their respective indexes. Every expired
  663. // transaction will be removed from the mempool entirely, except for the cache.
  664. //
  665. // NOTE: purgeExpiredTxs must only be called during TxMempool#Update in which
  666. // the caller has a write-lock on the mempool and so we can safely iterate over
  667. // the height and time based indexes.
  668. func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
  669. now := time.Now()
  670. expiredTxs := make(map[[mempool.TxKeySize]byte]*WrappedTx)
  671. if txmp.config.TTLNumBlocks > 0 {
  672. purgeIdx := -1
  673. for i, wtx := range txmp.heightIndex.txs {
  674. if (blockHeight - wtx.height) > txmp.config.TTLNumBlocks {
  675. expiredTxs[mempool.TxKey(wtx.tx)] = wtx
  676. purgeIdx = i
  677. } else {
  678. // since the index is sorted, we know no other txs can be be purged
  679. break
  680. }
  681. }
  682. if purgeIdx >= 0 {
  683. txmp.heightIndex.txs = txmp.heightIndex.txs[purgeIdx+1:]
  684. }
  685. }
  686. if txmp.config.TTLDuration > 0 {
  687. purgeIdx := -1
  688. for i, wtx := range txmp.timestampIndex.txs {
  689. if now.Sub(wtx.timestamp) > txmp.config.TTLDuration {
  690. expiredTxs[mempool.TxKey(wtx.tx)] = wtx
  691. purgeIdx = i
  692. } else {
  693. // since the index is sorted, we know no other txs can be be purged
  694. break
  695. }
  696. }
  697. if purgeIdx >= 0 {
  698. txmp.timestampIndex.txs = txmp.timestampIndex.txs[purgeIdx+1:]
  699. }
  700. }
  701. for _, wtx := range expiredTxs {
  702. txmp.removeTx(wtx, false)
  703. }
  704. }
  705. func (txmp *TxMempool) notifyTxsAvailable() {
  706. if txmp.Size() == 0 {
  707. panic("attempt to notify txs available but mempool is empty!")
  708. }
  709. if txmp.txsAvailable != nil && !txmp.notifiedTxsAvailable {
  710. // channel cap is 1, so this will send once
  711. txmp.notifiedTxsAvailable = true
  712. select {
  713. case txmp.txsAvailable <- struct{}{}:
  714. default:
  715. }
  716. }
  717. }