You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

769 lines
23 KiB

  1. package v1
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "sync/atomic"
  7. "time"
  8. abci "github.com/tendermint/tendermint/abci/types"
  9. "github.com/tendermint/tendermint/config"
  10. "github.com/tendermint/tendermint/internal/libs/clist"
  11. tmsync "github.com/tendermint/tendermint/internal/libs/sync"
  12. "github.com/tendermint/tendermint/internal/mempool"
  13. "github.com/tendermint/tendermint/libs/log"
  14. tmmath "github.com/tendermint/tendermint/libs/math"
  15. pubmempool "github.com/tendermint/tendermint/pkg/mempool"
  16. "github.com/tendermint/tendermint/proxy"
  17. "github.com/tendermint/tendermint/types"
  18. )
  // Compile-time assertion that *TxMempool satisfies the mempool.Mempool
  // interface.
  var _ mempool.Mempool = (*TxMempool)(nil)
  // TxMempoolOption sets an optional parameter on the TxMempool. Options are
  // functions that receive the mempool being constructed and mutate it in place.
  type TxMempoolOption func(*TxMempool)
  // TxMempool defines a prioritized mempool data structure used by the v1 mempool
  // reactor. It keeps a thread-safe priority queue of transactions that is used
  // when a block proposer constructs a block and a thread-safe linked-list that
  // is used to gossip transactions to peers in a FIFO manner.
  type TxMempool struct {
      // logger receives the mempool's debug, info and error messages.
      logger log.Logger
      // metrics collects mempool statistics (size, rejected, evicted, failed and
      // rechecked transactions).
      metrics *mempool.Metrics
      // config holds the mempool limits and switches (e.g. MaxTxBytes,
      // MaxTxsBytes, Size, Recheck, KeepInvalidTxsInCache).
      config *config.MempoolConfig
      // proxyAppConn is the application connection used to issue CheckTx
      // requests and flushes.
      proxyAppConn proxy.AppConnMempool

      // txsAvailable fires once for each height when the mempool is not empty.
      // It is nil until EnableTxsAvailable is called.
      txsAvailable chan struct{}
      // notifiedTxsAvailable records whether txsAvailable has already been
      // signalled for the current height; Update resets it to false.
      notifiedTxsAvailable bool

      // height defines the last block height processed during Update().
      height int64

      // sizeBytes defines the total size of the mempool (sum of all tx bytes).
      // It is read and modified through sync/atomic operations.
      sizeBytes int64

      // cache defines a fixed-size cache of already seen transactions as this
      // reduces pressure on the proxyApp.
      cache mempool.TxCache

      // txStore defines the main storage of valid transactions. Indexes are built
      // on top of this store.
      txStore *TxStore

      // gossipIndex defines the gossiping index of valid transactions via a
      // thread-safe linked-list. We also use the gossip index as a cursor for
      // rechecking transactions already in the mempool.
      gossipIndex *clist.CList

      // recheckCursor and recheckEnd are used as cursors based on the gossip index
      // to recheck transactions that are already in the mempool. Iteration is not
      // thread-safe and transactions may be mutated in serial order.
      //
      // XXX/TODO: It might be somewhat of a code smell to use the gossip index for
      // iterator and cursor management when rechecking transactions. If the gossip
      // index changes or is removed in a future refactor, this will have to be
      // refactored. Instead, we should consider just keeping a slice of a snapshot
      // of the mempool's current transactions during Update and an integer cursor
      // into that slice. This, however, requires additional O(n) space complexity.
      recheckCursor *clist.CElement // next expected response
      recheckEnd *clist.CElement // re-checking stops here

      // priorityIndex defines the priority index of valid transactions via a
      // thread-safe priority queue.
      priorityIndex *TxPriorityQueue

      // A read/write lock is used to safeguard updates, insertions and deletions
      // from the mempool. A read-lock is implicitly acquired when executing CheckTx,
      // however, a caller must explicitly grab a write-lock via Lock when updating
      // the mempool via Update().
      mtx tmsync.RWMutex

      // preCheck, when non-nil, is run on a transaction before CheckTx is sent to
      // the application; postCheck, when non-nil, is run on the CheckTx response.
      // Both may be replaced by Update.
      preCheck mempool.PreCheckFunc
      postCheck mempool.PostCheckFunc
  }
  71. func NewTxMempool(
  72. logger log.Logger,
  73. cfg *config.MempoolConfig,
  74. proxyAppConn proxy.AppConnMempool,
  75. height int64,
  76. options ...TxMempoolOption,
  77. ) *TxMempool {
  78. txmp := &TxMempool{
  79. logger: logger,
  80. config: cfg,
  81. proxyAppConn: proxyAppConn,
  82. height: height,
  83. cache: mempool.NopTxCache{},
  84. metrics: mempool.NopMetrics(),
  85. txStore: NewTxStore(),
  86. gossipIndex: clist.New(),
  87. priorityIndex: NewTxPriorityQueue(),
  88. }
  89. if cfg.CacheSize > 0 {
  90. txmp.cache = mempool.NewLRUTxCache(cfg.CacheSize)
  91. }
  92. proxyAppConn.SetResponseCallback(txmp.defaultTxCallback)
  93. for _, opt := range options {
  94. opt(txmp)
  95. }
  96. return txmp
  97. }
  98. // WithPreCheck sets a filter for the mempool to reject a transaction if f(tx)
  99. // returns an error. This is executed before CheckTx. It only applies to the
  100. // first created block. After that, Update() overwrites the existing value.
  101. func WithPreCheck(f mempool.PreCheckFunc) TxMempoolOption {
  102. return func(txmp *TxMempool) { txmp.preCheck = f }
  103. }
  104. // WithPostCheck sets a filter for the mempool to reject a transaction if
  105. // f(tx, resp) returns an error. This is executed after CheckTx. It only applies
  106. // to the first created block. After that, Update overwrites the existing value.
  107. func WithPostCheck(f mempool.PostCheckFunc) TxMempoolOption {
  108. return func(txmp *TxMempool) { txmp.postCheck = f }
  109. }
  110. // WithMetrics sets the mempool's metrics collector.
  111. func WithMetrics(metrics *mempool.Metrics) TxMempoolOption {
  112. return func(txmp *TxMempool) { txmp.metrics = metrics }
  113. }
  // Lock obtains a write-lock on the mempool's mutex. A caller must be sure to
  // explicitly release the lock via Unlock when finished.
  func (txmp *TxMempool) Lock() {
      txmp.mtx.Lock()
  }
  // Unlock releases the write-lock on the mempool's mutex that was previously
  // obtained via Lock.
  func (txmp *TxMempool) Unlock() {
      txmp.mtx.Unlock()
  }
  // Size returns the number of valid transactions in the mempool, as reported
  // by the transaction store. It is thread-safe.
  func (txmp *TxMempool) Size() int {
      return txmp.txStore.Size()
  }
  // SizeBytes returns the total sum in bytes of all the valid transactions in
  // the mempool. The counter is loaded atomically, so it is thread-safe.
  func (txmp *TxMempool) SizeBytes() int64 {
      return atomic.LoadInt64(&txmp.sizeBytes)
  }
  // FlushAppConn executes FlushSync on the mempool's proxyAppConn using a
  // background context and returns whatever error FlushSync reports.
  //
  // NOTE: The caller must obtain a write-lock via Lock() prior to execution.
  func (txmp *TxMempool) FlushAppConn() error {
      return txmp.proxyAppConn.FlushSync(context.Background())
  }
  // WaitForNextTx returns the gossip index's wait channel, a blocking channel
  // that will be closed when the next valid transaction is available to gossip.
  // It is thread-safe.
  func (txmp *TxMempool) WaitForNextTx() <-chan struct{} {
      return txmp.gossipIndex.WaitChan()
  }
  144. // NextGossipTx returns the next valid transaction to gossip. A caller must wait
  145. // for WaitForNextTx to signal a transaction is available to gossip first. It is
  146. // thread-safe.
  147. func (txmp *TxMempool) NextGossipTx() *WrappedTx {
  148. return txmp.gossipIndex.Front().Value.(*WrappedTx)
  149. }
  // EnableTxsAvailable enables the mempool to trigger events when transactions
  // are available on a block by block basis. It takes the mempool write-lock and
  // allocates the txsAvailable channel with a buffer of one; calling it again
  // replaces the channel with a fresh one.
  func (txmp *TxMempool) EnableTxsAvailable() {
      txmp.mtx.Lock()
      defer txmp.mtx.Unlock()
      txmp.txsAvailable = make(chan struct{}, 1)
  }
  // TxsAvailable returns a channel which fires once for every height, and only
  // when transactions are available in the mempool. The returned channel is nil
  // until EnableTxsAvailable has been called.
  //
  // NOTE(review): the field is read here without holding mtx, whereas
  // EnableTxsAvailable writes it under the write-lock; confirm callers enable
  // notifications before any concurrent use.
  func (txmp *TxMempool) TxsAvailable() <-chan struct{} {
      return txmp.txsAvailable
  }
  162. // CheckTx executes the ABCI CheckTx method for a given transaction. It acquires
  163. // a read-lock attempts to execute the application's CheckTx ABCI method via
  164. // CheckTxAsync. We return an error if any of the following happen:
  165. //
  166. // - The CheckTxAsync execution fails.
  167. // - The transaction already exists in the cache and we've already received the
  168. // transaction from the peer. Otherwise, if it solely exists in the cache, we
  169. // return nil.
  170. // - The transaction size exceeds the maximum transaction size as defined by the
  171. // configuration provided to the mempool.
  172. // - The transaction fails Pre-Check (if it is defined).
  173. // - The proxyAppConn fails, e.g. the buffer is full.
  174. //
  175. // If the mempool is full, we still execute CheckTx and attempt to find a lower
  176. // priority transaction to evict. If such a transaction exists, we remove the
  177. // lower priority transaction and add the new one with higher priority.
  178. //
  179. // NOTE:
  180. // - The applications' CheckTx implementation may panic.
  181. // - The caller is not to explicitly require any locks for executing CheckTx.
  182. func (txmp *TxMempool) CheckTx(
  183. ctx context.Context,
  184. tx types.Tx,
  185. cb func(*abci.Response),
  186. txInfo mempool.TxInfo,
  187. ) error {
  188. txmp.mtx.RLock()
  189. defer txmp.mtx.RUnlock()
  190. txSize := len(tx)
  191. if txSize > txmp.config.MaxTxBytes {
  192. return pubmempool.ErrTxTooLarge{
  193. Max: txmp.config.MaxTxBytes,
  194. Actual: txSize,
  195. }
  196. }
  197. if txmp.preCheck != nil {
  198. if err := txmp.preCheck(tx); err != nil {
  199. return pubmempool.ErrPreCheck{
  200. Reason: err,
  201. }
  202. }
  203. }
  204. if err := txmp.proxyAppConn.Error(); err != nil {
  205. return err
  206. }
  207. txHash := mempool.TxKey(tx)
  208. // We add the transaction to the mempool's cache and if the transaction already
  209. // exists, i.e. false is returned, then we check if we've seen this transaction
  210. // from the same sender and error if we have. Otherwise, we return nil.
  211. if !txmp.cache.Push(tx) {
  212. wtx, ok := txmp.txStore.GetOrSetPeerByTxHash(txHash, txInfo.SenderID)
  213. if wtx != nil && ok {
  214. // We already have the transaction stored and the we've already seen this
  215. // transaction from txInfo.SenderID.
  216. return pubmempool.ErrTxInCache
  217. }
  218. txmp.logger.Debug("tx exists already in cache", "tx_hash", tx.Hash())
  219. return nil
  220. }
  221. if ctx == nil {
  222. ctx = context.Background()
  223. }
  224. reqRes, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{Tx: tx})
  225. if err != nil {
  226. txmp.cache.Remove(tx)
  227. return err
  228. }
  229. reqRes.SetCallback(func(res *abci.Response) {
  230. if txmp.recheckCursor != nil {
  231. panic("recheck cursor is non-nil in CheckTx callback")
  232. }
  233. wtx := &WrappedTx{
  234. tx: tx,
  235. hash: txHash,
  236. timestamp: time.Now().UTC(),
  237. }
  238. txmp.initTxCallback(wtx, res, txInfo)
  239. if cb != nil {
  240. cb(res)
  241. }
  242. })
  243. return nil
  244. }
  245. // Flush flushes out the mempool. It acquires a read-lock, fetches all the
  246. // transactions currently in the transaction store and removes each transaction
  247. // from the store and all indexes and finally resets the cache.
  248. //
  249. // NOTE:
  250. // - Flushing the mempool may leave the mempool in an inconsistent state.
  251. func (txmp *TxMempool) Flush() {
  252. txmp.mtx.RLock()
  253. defer txmp.mtx.RUnlock()
  254. for _, wtx := range txmp.txStore.GetAllTxs() {
  255. if !txmp.txStore.IsTxRemoved(wtx.hash) {
  256. txmp.txStore.RemoveTx(wtx)
  257. txmp.priorityIndex.RemoveTx(wtx)
  258. txmp.gossipIndex.Remove(wtx.gossipEl)
  259. wtx.gossipEl.DetachPrev()
  260. }
  261. }
  262. atomic.SwapInt64(&txmp.sizeBytes, 0)
  263. txmp.cache.Reset()
  264. }
  265. // ReapMaxBytesMaxGas returns a list of transactions within the provided size
  266. // and gas constraints. Transaction are retrieved in priority order.
  267. //
  268. // NOTE:
  269. // - A read-lock is acquired.
  270. // - Transactions returned are not actually removed from the mempool transaction
  271. // store or indexes.
  272. func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
  273. txmp.mtx.RLock()
  274. defer txmp.mtx.RUnlock()
  275. var (
  276. totalGas int64
  277. totalSize int64
  278. )
  279. // wTxs contains a list of *WrappedTx retrieved from the priority queue that
  280. // need to be re-enqueued prior to returning.
  281. wTxs := make([]*WrappedTx, 0, txmp.priorityIndex.NumTxs())
  282. defer func() {
  283. for _, wtx := range wTxs {
  284. txmp.priorityIndex.PushTx(wtx)
  285. }
  286. }()
  287. txs := make([]types.Tx, 0, txmp.priorityIndex.NumTxs())
  288. for txmp.priorityIndex.NumTxs() > 0 {
  289. wtx := txmp.priorityIndex.PopTx()
  290. txs = append(txs, wtx.tx)
  291. wTxs = append(wTxs, wtx)
  292. size := types.ComputeProtoSizeForTxs([]types.Tx{wtx.tx})
  293. // Ensure we have capacity for the transaction with respect to the
  294. // transaction size.
  295. if maxBytes > -1 && totalSize+size > maxBytes {
  296. return txs[:len(txs)-1]
  297. }
  298. totalSize += size
  299. // ensure we have capacity for the transaction with respect to total gas
  300. gas := totalGas + wtx.gasWanted
  301. if maxGas > -1 && gas > maxGas {
  302. return txs[:len(txs)-1]
  303. }
  304. totalGas = gas
  305. }
  306. return txs
  307. }
  308. // ReapMaxTxs returns a list of transactions within the provided number of
  309. // transactions bound. Transaction are retrieved in priority order.
  310. //
  311. // NOTE:
  312. // - A read-lock is acquired.
  313. // - Transactions returned are not actually removed from the mempool transaction
  314. // store or indexes.
  315. func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs {
  316. txmp.mtx.RLock()
  317. defer txmp.mtx.RUnlock()
  318. numTxs := txmp.priorityIndex.NumTxs()
  319. if max < 0 {
  320. max = numTxs
  321. }
  322. cap := tmmath.MinInt(numTxs, max)
  323. // wTxs contains a list of *WrappedTx retrieved from the priority queue that
  324. // need to be re-enqueued prior to returning.
  325. wTxs := make([]*WrappedTx, 0, cap)
  326. defer func() {
  327. for _, wtx := range wTxs {
  328. txmp.priorityIndex.PushTx(wtx)
  329. }
  330. }()
  331. txs := make([]types.Tx, 0, cap)
  332. for txmp.priorityIndex.NumTxs() > 0 && len(txs) < max {
  333. wtx := txmp.priorityIndex.PopTx()
  334. txs = append(txs, wtx.tx)
  335. wTxs = append(wTxs, wtx)
  336. }
  337. return txs
  338. }
  339. // Update iterates over all the transactions provided by the caller, i.e. the
  340. // block producer, and removes them from the cache (if applicable) and removes
  341. // the transactions from the main transaction store and associated indexes.
  342. // Finally, if there are trainsactions remaining in the mempool, we initiate a
  343. // re-CheckTx for them (if applicable), otherwise, we notify the caller more
  344. // transactions are available.
  345. //
  346. // NOTE:
  347. // - The caller must explicitly acquire a write-lock via Lock().
  348. func (txmp *TxMempool) Update(
  349. blockHeight int64,
  350. blockTxs types.Txs,
  351. deliverTxResponses []*abci.ResponseDeliverTx,
  352. newPreFn mempool.PreCheckFunc,
  353. newPostFn mempool.PostCheckFunc,
  354. ) error {
  355. txmp.height = blockHeight
  356. txmp.notifiedTxsAvailable = false
  357. if newPreFn != nil {
  358. txmp.preCheck = newPreFn
  359. }
  360. if newPostFn != nil {
  361. txmp.postCheck = newPostFn
  362. }
  363. for i, tx := range blockTxs {
  364. if deliverTxResponses[i].Code == abci.CodeTypeOK {
  365. // add the valid committed transaction to the cache (if missing)
  366. _ = txmp.cache.Push(tx)
  367. } else if !txmp.config.KeepInvalidTxsInCache {
  368. // allow invalid transactions to be re-submitted
  369. txmp.cache.Remove(tx)
  370. }
  371. // remove the committed transaction from the transaction store and indexes
  372. if wtx := txmp.txStore.GetTxByHash(mempool.TxKey(tx)); wtx != nil {
  373. txmp.removeTx(wtx, false)
  374. }
  375. }
  376. // If there any uncommitted transactions left in the mempool, we either
  377. // initiate re-CheckTx per remaining transaction or notify that remaining
  378. // transactions are left.
  379. if txmp.Size() > 0 {
  380. if txmp.config.Recheck {
  381. txmp.logger.Debug(
  382. "executing re-CheckTx for all remaining transactions",
  383. "num_txs", txmp.Size(),
  384. "height", blockHeight,
  385. )
  386. txmp.updateReCheckTxs()
  387. } else {
  388. txmp.notifyTxsAvailable()
  389. }
  390. }
  391. txmp.metrics.Size.Set(float64(txmp.Size()))
  392. return nil
  393. }
  394. // initTxCallback performs the initial, i.e. the first, callback after CheckTx
  395. // has been executed by the ABCI application. In other words, initTxCallback is
  396. // called after executing CheckTx when we see a unique transaction for the first
  397. // time. CheckTx can be called again for the same transaction at a later point
  398. // in time when re-checking, however, this callback will not be called.
  399. //
  400. // After the ABCI application executes CheckTx, initTxCallback is called with
  401. // the ABCI *Response object and TxInfo. If postCheck is defined on the mempool,
  402. // we execute that first. If there is no error from postCheck (if defined) and
  403. // the ABCI CheckTx response code is OK, we attempt to insert the transaction.
  404. //
  405. // When attempting to insert the transaction, we first check if there is
  406. // sufficient capacity. If there is sufficient capacity, the transaction is
  407. // inserted into the txStore and indexed across all indexes. Otherwise, if the
  408. // mempool is full, we attempt to find a lower priority transaction to evict in
  409. // place of the new incoming transaction. If no such transaction exists, the
  410. // new incoming transaction is rejected.
  411. //
  412. // If the new incoming transaction fails CheckTx or postCheck fails, we reject
  413. // the new incoming transaction.
  414. //
  415. // NOTE:
  416. // - An explicit lock is NOT required.
  417. func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo mempool.TxInfo) {
  418. checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
  419. if ok {
  420. var err error
  421. if txmp.postCheck != nil {
  422. err = txmp.postCheck(wtx.tx, checkTxRes.CheckTx)
  423. }
  424. if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil {
  425. sender := checkTxRes.CheckTx.Sender
  426. priority := checkTxRes.CheckTx.Priority
  427. if len(sender) > 0 {
  428. if wtx := txmp.txStore.GetTxBySender(sender); wtx != nil {
  429. txmp.logger.Error(
  430. "rejected incoming good transaction; tx already exists for sender",
  431. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  432. "sender", sender,
  433. )
  434. txmp.metrics.RejectedTxs.Add(1)
  435. return
  436. }
  437. }
  438. if err := txmp.canAddTx(wtx); err != nil {
  439. evictTxs := txmp.priorityIndex.GetEvictableTxs(
  440. priority,
  441. int64(wtx.Size()),
  442. txmp.SizeBytes(),
  443. txmp.config.MaxTxsBytes,
  444. )
  445. if len(evictTxs) == 0 {
  446. // No room for the new incoming transaction so we just remove it from
  447. // the cache.
  448. txmp.cache.Remove(wtx.tx)
  449. txmp.logger.Error(
  450. "rejected incoming good transaction; mempool full",
  451. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  452. "err", err.Error(),
  453. )
  454. txmp.metrics.RejectedTxs.Add(1)
  455. return
  456. }
  457. // evict an existing transaction(s)
  458. //
  459. // NOTE:
  460. // - The transaction, toEvict, can be removed while a concurrent
  461. // reCheckTx callback is being executed for the same transaction.
  462. for _, toEvict := range evictTxs {
  463. txmp.removeTx(toEvict, true)
  464. txmp.logger.Debug(
  465. "evicted existing good transaction; mempool full",
  466. "old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()),
  467. "old_priority", toEvict.priority,
  468. "new_tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  469. "new_priority", wtx.priority,
  470. )
  471. txmp.metrics.EvictedTxs.Add(1)
  472. }
  473. }
  474. wtx.gasWanted = checkTxRes.CheckTx.GasWanted
  475. wtx.height = txmp.height
  476. wtx.priority = priority
  477. wtx.sender = sender
  478. wtx.peers = map[uint16]struct{}{
  479. txInfo.SenderID: {},
  480. }
  481. txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size()))
  482. txmp.metrics.Size.Set(float64(txmp.Size()))
  483. txmp.insertTx(wtx)
  484. txmp.logger.Debug(
  485. "inserted good transaction",
  486. "priority", wtx.priority,
  487. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  488. "height", txmp.height,
  489. "num_txs", txmp.Size(),
  490. )
  491. txmp.notifyTxsAvailable()
  492. } else {
  493. // ignore bad transactions
  494. txmp.logger.Info(
  495. "rejected bad transaction",
  496. "priority", wtx.priority,
  497. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  498. "peer_id", txInfo.SenderNodeID,
  499. "code", checkTxRes.CheckTx.Code,
  500. "post_check_err", err,
  501. )
  502. txmp.metrics.FailedTxs.Add(1)
  503. if !txmp.config.KeepInvalidTxsInCache {
  504. txmp.cache.Remove(wtx.tx)
  505. }
  506. }
  507. }
  508. }
  509. // defaultTxCallback performs the default CheckTx application callback. This is
  510. // NOT executed when a transaction is first seen/received. Instead, this callback
  511. // is executed during re-checking transactions (if enabled). A caller, i.e a
  512. // block proposer, acquires a mempool write-lock via Lock() and when executing
  513. // Update(), if the mempool is non-empty and Recheck is enabled, then all
  514. // remaining transactions will be rechecked via CheckTxAsync. The order in which
  515. // they are rechecked must be the same order in which this callback is called
  516. // per transaction.
  517. func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response) {
  518. if txmp.recheckCursor == nil {
  519. return
  520. }
  521. txmp.metrics.RecheckTimes.Add(1)
  522. checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
  523. if ok {
  524. tx := req.GetCheckTx().Tx
  525. wtx := txmp.recheckCursor.Value.(*WrappedTx)
  526. if !bytes.Equal(tx, wtx.tx) {
  527. panic(fmt.Sprintf("re-CheckTx transaction mismatch; got: %X, expected: %X", wtx.tx.Hash(), mempool.TxKey(tx)))
  528. }
  529. // Only evaluate transactions that have not been removed. This can happen
  530. // if an existing transaction is evicted during CheckTx and while this
  531. // callback is being executed for the same evicted transaction.
  532. if !txmp.txStore.IsTxRemoved(wtx.hash) {
  533. var err error
  534. if txmp.postCheck != nil {
  535. err = txmp.postCheck(tx, checkTxRes.CheckTx)
  536. }
  537. if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil {
  538. wtx.priority = checkTxRes.CheckTx.Priority
  539. } else {
  540. txmp.logger.Debug(
  541. "existing transaction no longer valid; failed re-CheckTx callback",
  542. "priority", wtx.priority,
  543. "tx", fmt.Sprintf("%X", mempool.TxHashFromBytes(wtx.tx)),
  544. "err", err,
  545. "code", checkTxRes.CheckTx.Code,
  546. )
  547. if wtx.gossipEl != txmp.recheckCursor {
  548. panic("corrupted reCheckTx cursor")
  549. }
  550. txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
  551. }
  552. }
  553. // move reCheckTx cursor to next element
  554. if txmp.recheckCursor == txmp.recheckEnd {
  555. txmp.recheckCursor = nil
  556. } else {
  557. txmp.recheckCursor = txmp.recheckCursor.Next()
  558. }
  559. if txmp.recheckCursor == nil {
  560. txmp.logger.Debug("finished rechecking transactions")
  561. if txmp.Size() > 0 {
  562. txmp.notifyTxsAvailable()
  563. }
  564. }
  565. txmp.metrics.Size.Set(float64(txmp.Size()))
  566. }
  567. }
  568. // updateReCheckTxs updates the recheck cursors by using the gossipIndex. For
  569. // each transaction, it executes CheckTxAsync. The global callback defined on
  570. // the proxyAppConn will be executed for each transaction after CheckTx is
  571. // executed.
  572. //
  573. // NOTE:
  574. // - The caller must have a write-lock when executing updateReCheckTxs.
  575. func (txmp *TxMempool) updateReCheckTxs() {
  576. if txmp.Size() == 0 {
  577. panic("attempted to update re-CheckTx txs when mempool is empty")
  578. }
  579. txmp.recheckCursor = txmp.gossipIndex.Front()
  580. txmp.recheckEnd = txmp.gossipIndex.Back()
  581. ctx := context.Background()
  582. for e := txmp.gossipIndex.Front(); e != nil; e = e.Next() {
  583. wtx := e.Value.(*WrappedTx)
  584. // Only execute CheckTx if the transaction is not marked as removed which
  585. // could happen if the transaction was evicted.
  586. if !txmp.txStore.IsTxRemoved(wtx.hash) {
  587. _, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{
  588. Tx: wtx.tx,
  589. Type: abci.CheckTxType_Recheck,
  590. })
  591. if err != nil {
  592. // no need in retrying since the tx will be rechecked after the next block
  593. txmp.logger.Error("failed to execute CheckTx during rechecking", "err", err)
  594. }
  595. }
  596. }
  597. if _, err := txmp.proxyAppConn.FlushAsync(ctx); err != nil {
  598. txmp.logger.Error("failed to flush transactions during rechecking", "err", err)
  599. }
  600. }
  601. // canAddTx returns an error if we cannot insert the provided *WrappedTx into
  602. // the mempool due to mempool configured constraints. Otherwise, nil is returned
  603. // and the transaction can be inserted into the mempool.
  604. func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error {
  605. var (
  606. numTxs = txmp.Size()
  607. sizeBytes = txmp.SizeBytes()
  608. )
  609. if numTxs >= txmp.config.Size || int64(wtx.Size())+sizeBytes > txmp.config.MaxTxsBytes {
  610. return pubmempool.ErrMempoolIsFull{
  611. NumTxs: numTxs,
  612. MaxTxs: txmp.config.Size,
  613. TxsBytes: sizeBytes,
  614. MaxTxsBytes: txmp.config.MaxTxsBytes,
  615. }
  616. }
  617. return nil
  618. }
  619. func (txmp *TxMempool) insertTx(wtx *WrappedTx) {
  620. txmp.txStore.SetTx(wtx)
  621. txmp.priorityIndex.PushTx(wtx)
  622. // Insert the transaction into the gossip index and mark the reference to the
  623. // linked-list element, which will be needed at a later point when the
  624. // transaction is removed.
  625. gossipEl := txmp.gossipIndex.PushBack(wtx)
  626. wtx.gossipEl = gossipEl
  627. atomic.AddInt64(&txmp.sizeBytes, int64(wtx.Size()))
  628. }
  629. func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {
  630. if txmp.txStore.IsTxRemoved(wtx.hash) {
  631. return
  632. }
  633. txmp.txStore.RemoveTx(wtx)
  634. txmp.priorityIndex.RemoveTx(wtx)
  635. // Remove the transaction from the gossip index and cleanup the linked-list
  636. // element so it can be garbage collected.
  637. txmp.gossipIndex.Remove(wtx.gossipEl)
  638. wtx.gossipEl.DetachPrev()
  639. atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size()))
  640. if removeFromCache {
  641. txmp.cache.Remove(wtx.tx)
  642. }
  643. }
  644. func (txmp *TxMempool) notifyTxsAvailable() {
  645. if txmp.Size() == 0 {
  646. panic("attempt to notify txs available but mempool is empty!")
  647. }
  648. if txmp.txsAvailable != nil && !txmp.notifiedTxsAvailable {
  649. // channel cap is 1, so this will send once
  650. txmp.notifiedTxsAvailable = true
  651. select {
  652. case txmp.txsAvailable <- struct{}{}:
  653. default:
  654. }
  655. }
  656. }