You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

865 lines
26 KiB

8 years ago
mempool no gossip back (#2778) Closes #1798 This is done by making every mempool tx maintain a list of peers who its received the tx from. Instead of using the 20byte peer ID, it instead uses a local map from peerID to uint16 counter, so every peer adds 2 bytes. (Word aligned to probably make it 8 bytes) This also required resetting the callback function on every CheckTx. This likely has performance ramifications for instruction caching. The actual setting operation isn't costly with the removal of defers in this PR. * Make the mempool not gossip txs back to peers its received it from * Fix adversarial memleak * Don't break interface * Update changelog * Forgot to add a mtx * forgot a mutex * Update mempool/reactor.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Update mempool/mempool.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Use unknown peer ID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * fix compilation * use next wait chan logic when skipping * Minor fixes * Add TxInfo * Add reverse map * Make activeID's auto-reserve 0 * 0 -> UnknownPeerID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Switch to making the normal case set a callback on the reqres object The recheck case is still done via the global callback, and stats are also set via global callback * fix merge conflict * Addres comments * Add cache tests * add cache tests * minor fixes * update metrics in reqResCb and reformat code * goimport -w mempool/reactor.go * mempool: update memTx senders I had to introduce txsMap for quick mempoolTx lookups. * change senders type from []uint16 to sync.Map Fixes DATA RACE: ``` Read at 0x00c0013fcd3a by goroutine 183: github.com/tendermint/tendermint/mempool.(*MempoolReactor).broadcastTxRoutine() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:195 +0x3c7 Previous write at 0x00c0013fcd3a by D[2019-02-27|10:10:49.058] Read PacketMsg switch=3 peer=35bc1e3558c182927b31987eeff3feb3d58a0fc5@127.0.0.1 :46552 conn=MConn{pipe} packet="PacketMsg{30:2B06579D0A143EB78F3D3299DE8213A51D4E11FB05ACE4D6A14F T:1}" goroutine 190: github.com/tendermint/tendermint/mempool.(*Mempool).CheckTxWithInfo() /go/src/github.com/tendermint/tendermint/mempool/mempool.go:387 +0xdc1 github.com/tendermint/tendermint/mempool.(*MempoolReactor).Receive() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:134 +0xb04 github.com/tendermint/tendermint/p2p.createMConnection.func1() /go/src/github.com/tendermint/tendermint/p2p/peer.go:374 +0x25b github.com/tendermint/tendermint/p2p/conn.(*MConnection).recvRoutine() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:599 +0xcce Goroutine 183 (running) created at: D[2019-02-27|10:10:49.058] Send switch=2 peer=1efafad5443abeea4b7a8155218e4369525d987e@127.0.0.1:46193 channel=48 conn=MConn{pipe} m sgBytes=2B06579D0A146194480ADAE00C2836ED7125FEE65C1D9DD51049 github.com/tendermint/tendermint/mempool.(*MempoolReactor).AddPeer() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:105 +0x1b1 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:683 +0x13b github.com/tendermint/tendermint/p2p.(*Switch).addPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:650 +0x585 github.com/tendermint/tendermint/p2p.(*Switch).addPeerWithConnection() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:145 +0x939 github.com/tendermint/tendermint/p2p.Connect2Switches.func2() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:109 +0x50 I[2019-02-27|10:10:49.058] Added good transaction validator=0 tx=43B4D1F0F03460BD262835C4AA560DB860CFBBE85BD02386D83DAC38C67B3AD7 res="&{CheckTx:gas_w anted:1 }" height=0 total=375 Goroutine 190 (running) created at: github.com/tendermint/tendermint/p2p/conn.(*MConnection).OnStart() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:210 +0x313 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).OnStart() /go/src/github.com/tendermint/tendermint/p2p/peer.go:179 +0x56 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).Start() <autogenerated>:1 +0x43 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() ``` * explain the choice of a map DS for senders * extract ids pool/mapper to a separate struct * fix literal copies lock value from senders: sync.Map contains sync.Mutex * use sync.Map#LoadOrStore instead of Load * fixes after Ismail's review * rename resCbNormal to resCbFirstTime
6 years ago
9 years ago
  1. package mempool
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "reflect"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. abci "github.com/tendermint/tendermint/abci/types"
  12. "github.com/tendermint/tendermint/config"
  13. "github.com/tendermint/tendermint/internal/libs/clist"
  14. "github.com/tendermint/tendermint/internal/proxy"
  15. "github.com/tendermint/tendermint/libs/log"
  16. tmmath "github.com/tendermint/tendermint/libs/math"
  17. "github.com/tendermint/tendermint/types"
  18. )
  19. var _ Mempool = (*TxMempool)(nil)
  20. // TxMempoolOption sets an optional parameter on the TxMempool.
  21. type TxMempoolOption func(*TxMempool)
  22. // TxMempool defines a prioritized mempool data structure used by the v1 mempool
  23. // reactor. It keeps a thread-safe priority queue of transactions that is used
  24. // when a block proposer constructs a block and a thread-safe linked-list that
  25. // is used to gossip transactions to peers in a FIFO manner.
  26. type TxMempool struct {
  27. logger log.Logger
  28. metrics *Metrics
  29. config *config.MempoolConfig
  30. proxyAppConn proxy.AppConnMempool
  31. // txsAvailable fires once for each height when the mempool is not empty
  32. txsAvailable chan struct{}
  33. notifiedTxsAvailable bool
  34. // height defines the last block height process during Update()
  35. height int64
  36. // sizeBytes defines the total size of the mempool (sum of all tx bytes)
  37. sizeBytes int64
  38. // cache defines a fixed-size cache of already seen transactions as this
  39. // reduces pressure on the proxyApp.
  40. cache TxCache
  41. // txStore defines the main storage of valid transactions. Indexes are built
  42. // on top of this store.
  43. txStore *TxStore
  44. // gossipIndex defines the gossiping index of valid transactions via a
  45. // thread-safe linked-list. We also use the gossip index as a cursor for
  46. // rechecking transactions already in the mempool.
  47. gossipIndex *clist.CList
  48. // recheckCursor and recheckEnd are used as cursors based on the gossip index
  49. // to recheck transactions that are already in the mempool. Iteration is not
  50. // thread-safe and transaction may be mutated in serial order.
  51. //
  52. // XXX/TODO: It might be somewhat of a codesmell to use the gossip index for
  53. // iterator and cursor management when rechecking transactions. If the gossip
  54. // index changes or is removed in a future refactor, this will have to be
  55. // refactored. Instead, we should consider just keeping a slice of a snapshot
  56. // of the mempool's current transactions during Update and an integer cursor
  57. // into that slice. This, however, requires additional O(n) space complexity.
  58. recheckCursor *clist.CElement // next expected response
  59. recheckEnd *clist.CElement // re-checking stops here
  60. // priorityIndex defines the priority index of valid transactions via a
  61. // thread-safe priority queue.
  62. priorityIndex *TxPriorityQueue
  63. // heightIndex defines a height-based, in ascending order, transaction index.
  64. // i.e. older transactions are first.
  65. heightIndex *WrappedTxList
  66. // timestampIndex defines a timestamp-based, in ascending order, transaction
  67. // index. i.e. older transactions are first.
  68. timestampIndex *WrappedTxList
  69. // A read/write lock is used to safe guard updates, insertions and deletions
  70. // from the mempool. A read-lock is implicitly acquired when executing CheckTx,
  71. // however, a caller must explicitly grab a write-lock via Lock when updating
  72. // the mempool via Update().
  73. mtx sync.RWMutex
  74. preCheck PreCheckFunc
  75. postCheck PostCheckFunc
  76. }
  77. func NewTxMempool(
  78. logger log.Logger,
  79. cfg *config.MempoolConfig,
  80. proxyAppConn proxy.AppConnMempool,
  81. height int64,
  82. options ...TxMempoolOption,
  83. ) *TxMempool {
  84. txmp := &TxMempool{
  85. logger: logger,
  86. config: cfg,
  87. proxyAppConn: proxyAppConn,
  88. height: height,
  89. cache: NopTxCache{},
  90. metrics: NopMetrics(),
  91. txStore: NewTxStore(),
  92. gossipIndex: clist.New(),
  93. priorityIndex: NewTxPriorityQueue(),
  94. heightIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
  95. return wtx1.height >= wtx2.height
  96. }),
  97. timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
  98. return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp)
  99. }),
  100. }
  101. if cfg.CacheSize > 0 {
  102. txmp.cache = NewLRUTxCache(cfg.CacheSize)
  103. }
  104. proxyAppConn.SetResponseCallback(txmp.defaultTxCallback)
  105. for _, opt := range options {
  106. opt(txmp)
  107. }
  108. return txmp
  109. }
  110. // WithPreCheck sets a filter for the mempool to reject a transaction if f(tx)
  111. // returns an error. This is executed before CheckTx. It only applies to the
  112. // first created block. After that, Update() overwrites the existing value.
  113. func WithPreCheck(f PreCheckFunc) TxMempoolOption {
  114. return func(txmp *TxMempool) { txmp.preCheck = f }
  115. }
  116. // WithPostCheck sets a filter for the mempool to reject a transaction if
  117. // f(tx, resp) returns an error. This is executed after CheckTx. It only applies
  118. // to the first created block. After that, Update overwrites the existing value.
  119. func WithPostCheck(f PostCheckFunc) TxMempoolOption {
  120. return func(txmp *TxMempool) { txmp.postCheck = f }
  121. }
  122. // WithMetrics sets the mempool's metrics collector.
  123. func WithMetrics(metrics *Metrics) TxMempoolOption {
  124. return func(txmp *TxMempool) { txmp.metrics = metrics }
  125. }
  126. // Lock obtains a write-lock on the mempool. A caller must be sure to explicitly
  127. // release the lock when finished.
  128. func (txmp *TxMempool) Lock() {
  129. txmp.mtx.Lock()
  130. }
  131. // Unlock releases a write-lock on the mempool.
  132. func (txmp *TxMempool) Unlock() {
  133. txmp.mtx.Unlock()
  134. }
  135. // Size returns the number of valid transactions in the mempool. It is
  136. // thread-safe.
  137. func (txmp *TxMempool) Size() int {
  138. return txmp.txStore.Size()
  139. }
  140. // SizeBytes return the total sum in bytes of all the valid transactions in the
  141. // mempool. It is thread-safe.
  142. func (txmp *TxMempool) SizeBytes() int64 {
  143. return atomic.LoadInt64(&txmp.sizeBytes)
  144. }
  145. // FlushAppConn executes FlushSync on the mempool's proxyAppConn.
  146. //
  147. // NOTE: The caller must obtain a write-lock prior to execution.
  148. func (txmp *TxMempool) FlushAppConn(ctx context.Context) error {
  149. return txmp.proxyAppConn.FlushSync(ctx)
  150. }
  151. // WaitForNextTx returns a blocking channel that will be closed when the next
  152. // valid transaction is available to gossip. It is thread-safe.
  153. func (txmp *TxMempool) WaitForNextTx() <-chan struct{} {
  154. return txmp.gossipIndex.WaitChan()
  155. }
  156. // NextGossipTx returns the next valid transaction to gossip. A caller must wait
  157. // for WaitForNextTx to signal a transaction is available to gossip first. It is
  158. // thread-safe.
  159. func (txmp *TxMempool) NextGossipTx() *clist.CElement {
  160. return txmp.gossipIndex.Front()
  161. }
  162. // EnableTxsAvailable enables the mempool to trigger events when transactions
  163. // are available on a block by block basis.
  164. func (txmp *TxMempool) EnableTxsAvailable() {
  165. txmp.mtx.Lock()
  166. defer txmp.mtx.Unlock()
  167. txmp.txsAvailable = make(chan struct{}, 1)
  168. }
  169. // TxsAvailable returns a channel which fires once for every height, and only
  170. // when transactions are available in the mempool. It is thread-safe.
  171. func (txmp *TxMempool) TxsAvailable() <-chan struct{} {
  172. return txmp.txsAvailable
  173. }
  174. // CheckTx executes the ABCI CheckTx method for a given transaction. It acquires
  175. // a read-lock attempts to execute the application's CheckTx ABCI method via
  176. // CheckTxAsync. We return an error if any of the following happen:
  177. //
  178. // - The CheckTxAsync execution fails.
  179. // - The transaction already exists in the cache and we've already received the
  180. // transaction from the peer. Otherwise, if it solely exists in the cache, we
  181. // return nil.
  182. // - The transaction size exceeds the maximum transaction size as defined by the
  183. // configuration provided to the mempool.
  184. // - The transaction fails Pre-Check (if it is defined).
  185. // - The proxyAppConn fails, e.g. the buffer is full.
  186. //
  187. // If the mempool is full, we still execute CheckTx and attempt to find a lower
  188. // priority transaction to evict. If such a transaction exists, we remove the
  189. // lower priority transaction and add the new one with higher priority.
  190. //
  191. // NOTE:
  192. // - The applications' CheckTx implementation may panic.
  193. // - The caller is not to explicitly require any locks for executing CheckTx.
  194. func (txmp *TxMempool) CheckTx(
  195. ctx context.Context,
  196. tx types.Tx,
  197. cb func(*abci.Response),
  198. txInfo TxInfo,
  199. ) error {
  200. txmp.mtx.RLock()
  201. defer txmp.mtx.RUnlock()
  202. if txSize := len(tx); txSize > txmp.config.MaxTxBytes {
  203. return types.ErrTxTooLarge{
  204. Max: txmp.config.MaxTxBytes,
  205. Actual: txSize,
  206. }
  207. }
  208. if txmp.preCheck != nil {
  209. if err := txmp.preCheck(tx); err != nil {
  210. return types.ErrPreCheck{Reason: err}
  211. }
  212. }
  213. if err := txmp.proxyAppConn.Error(); err != nil {
  214. return err
  215. }
  216. txHash := tx.Key()
  217. // We add the transaction to the mempool's cache and if the transaction already
  218. // exists, i.e. false is returned, then we check if we've seen this transaction
  219. // from the same sender and error if we have. Otherwise, we return nil.
  220. if !txmp.cache.Push(tx) {
  221. wtx, ok := txmp.txStore.GetOrSetPeerByTxHash(txHash, txInfo.SenderID)
  222. if wtx != nil && ok {
  223. // We already have the transaction stored and the we've already seen this
  224. // transaction from txInfo.SenderID.
  225. return types.ErrTxInCache
  226. }
  227. txmp.logger.Debug("tx exists already in cache", "tx_hash", tx.Hash())
  228. return nil
  229. }
  230. reqRes, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{Tx: tx})
  231. if err != nil {
  232. txmp.cache.Remove(tx)
  233. return err
  234. }
  235. reqRes.SetCallback(func(res *abci.Response) {
  236. if txmp.recheckCursor != nil {
  237. panic("recheck cursor is non-nil in CheckTx callback")
  238. }
  239. wtx := &WrappedTx{
  240. tx: tx,
  241. hash: txHash,
  242. timestamp: time.Now().UTC(),
  243. height: txmp.height,
  244. }
  245. txmp.initTxCallback(wtx, res, txInfo)
  246. if cb != nil {
  247. cb(res)
  248. }
  249. })
  250. return nil
  251. }
  252. func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error {
  253. txmp.Lock()
  254. defer txmp.Unlock()
  255. // remove the committed transaction from the transaction store and indexes
  256. if wtx := txmp.txStore.GetTxByHash(txKey); wtx != nil {
  257. txmp.removeTx(wtx, false)
  258. return nil
  259. }
  260. return errors.New("transaction not found")
  261. }
  262. // Flush empties the mempool. It acquires a read-lock, fetches all the
  263. // transactions currently in the transaction store and removes each transaction
  264. // from the store and all indexes and finally resets the cache.
  265. //
  266. // NOTE:
  267. // - Flushing the mempool may leave the mempool in an inconsistent state.
  268. func (txmp *TxMempool) Flush() {
  269. txmp.mtx.RLock()
  270. defer txmp.mtx.RUnlock()
  271. txmp.heightIndex.Reset()
  272. txmp.timestampIndex.Reset()
  273. for _, wtx := range txmp.txStore.GetAllTxs() {
  274. txmp.removeTx(wtx, false)
  275. }
  276. atomic.SwapInt64(&txmp.sizeBytes, 0)
  277. txmp.cache.Reset()
  278. }
  279. // ReapMaxBytesMaxGas returns a list of transactions within the provided size
  280. // and gas constraints. Transaction are retrieved in priority order.
  281. //
  282. // NOTE:
  283. // - Transactions returned are not removed from the mempool transaction
  284. // store or indexes.
  285. func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
  286. txmp.mtx.RLock()
  287. defer txmp.mtx.RUnlock()
  288. var (
  289. totalGas int64
  290. totalSize int64
  291. )
  292. // wTxs contains a list of *WrappedTx retrieved from the priority queue that
  293. // need to be re-enqueued prior to returning.
  294. wTxs := make([]*WrappedTx, 0, txmp.priorityIndex.NumTxs())
  295. defer func() {
  296. for _, wtx := range wTxs {
  297. txmp.priorityIndex.PushTx(wtx)
  298. }
  299. }()
  300. txs := make([]types.Tx, 0, txmp.priorityIndex.NumTxs())
  301. for txmp.priorityIndex.NumTxs() > 0 {
  302. wtx := txmp.priorityIndex.PopTx()
  303. txs = append(txs, wtx.tx)
  304. wTxs = append(wTxs, wtx)
  305. size := types.ComputeProtoSizeForTxs([]types.Tx{wtx.tx})
  306. // Ensure we have capacity for the transaction with respect to the
  307. // transaction size.
  308. if maxBytes > -1 && totalSize+size > maxBytes {
  309. return txs[:len(txs)-1]
  310. }
  311. totalSize += size
  312. // ensure we have capacity for the transaction with respect to total gas
  313. gas := totalGas + wtx.gasWanted
  314. if maxGas > -1 && gas > maxGas {
  315. return txs[:len(txs)-1]
  316. }
  317. totalGas = gas
  318. }
  319. return txs
  320. }
  321. // ReapMaxTxs returns a list of transactions within the provided number of
  322. // transactions bound. Transaction are retrieved in priority order.
  323. //
  324. // NOTE:
  325. // - Transactions returned are not removed from the mempool transaction
  326. // store or indexes.
  327. func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs {
  328. txmp.mtx.RLock()
  329. defer txmp.mtx.RUnlock()
  330. numTxs := txmp.priorityIndex.NumTxs()
  331. if max < 0 {
  332. max = numTxs
  333. }
  334. cap := tmmath.MinInt(numTxs, max)
  335. // wTxs contains a list of *WrappedTx retrieved from the priority queue that
  336. // need to be re-enqueued prior to returning.
  337. wTxs := make([]*WrappedTx, 0, cap)
  338. txs := make([]types.Tx, 0, cap)
  339. for txmp.priorityIndex.NumTxs() > 0 && len(txs) < max {
  340. wtx := txmp.priorityIndex.PopTx()
  341. txs = append(txs, wtx.tx)
  342. wTxs = append(wTxs, wtx)
  343. }
  344. for _, wtx := range wTxs {
  345. txmp.priorityIndex.PushTx(wtx)
  346. }
  347. return txs
  348. }
  349. // Update iterates over all the transactions provided by the block producer,
  350. // removes them from the cache (if applicable), and removes
  351. // the transactions from the main transaction store and associated indexes.
  352. // If there are transactions remaining in the mempool, we initiate a
  353. // re-CheckTx for them (if applicable), otherwise, we notify the caller more
  354. // transactions are available.
  355. //
  356. // NOTE:
  357. // - The caller must explicitly acquire a write-lock.
  358. func (txmp *TxMempool) Update(
  359. ctx context.Context,
  360. blockHeight int64,
  361. blockTxs types.Txs,
  362. deliverTxResponses []*abci.ResponseDeliverTx,
  363. newPreFn PreCheckFunc,
  364. newPostFn PostCheckFunc,
  365. ) error {
  366. txmp.height = blockHeight
  367. txmp.notifiedTxsAvailable = false
  368. if newPreFn != nil {
  369. txmp.preCheck = newPreFn
  370. }
  371. if newPostFn != nil {
  372. txmp.postCheck = newPostFn
  373. }
  374. for i, tx := range blockTxs {
  375. if deliverTxResponses[i].Code == abci.CodeTypeOK {
  376. // add the valid committed transaction to the cache (if missing)
  377. _ = txmp.cache.Push(tx)
  378. } else if !txmp.config.KeepInvalidTxsInCache {
  379. // allow invalid transactions to be re-submitted
  380. txmp.cache.Remove(tx)
  381. }
  382. // remove the committed transaction from the transaction store and indexes
  383. if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil {
  384. txmp.removeTx(wtx, false)
  385. }
  386. }
  387. txmp.purgeExpiredTxs(blockHeight)
  388. // If there any uncommitted transactions left in the mempool, we either
  389. // initiate re-CheckTx per remaining transaction or notify that remaining
  390. // transactions are left.
  391. if txmp.Size() > 0 {
  392. if txmp.config.Recheck {
  393. txmp.logger.Debug(
  394. "executing re-CheckTx for all remaining transactions",
  395. "num_txs", txmp.Size(),
  396. "height", blockHeight,
  397. )
  398. txmp.updateReCheckTxs(ctx)
  399. } else {
  400. txmp.notifyTxsAvailable()
  401. }
  402. }
  403. txmp.metrics.Size.Set(float64(txmp.Size()))
  404. return nil
  405. }
  406. // initTxCallback is the callback invoked for a new unique transaction after CheckTx
  407. // has been executed by the ABCI application for the first time on that transaction.
  408. // CheckTx can be called again for the same transaction later when re-checking;
  409. // however, this callback will not be called.
  410. //
  411. // initTxCallback runs after the ABCI application executes CheckTx.
  412. // It runs the postCheck hook if one is defined on the mempool.
  413. // If the CheckTx response response code is not OK, or if the postCheck hook
  414. // reports an error, the transaction is rejected. Otherwise, we attempt to insert
  415. // the transaction into the mempool.
  416. //
  417. // When inserting a transaction, we first check if there is sufficient capacity.
  418. // If there is, the transaction is added to the txStore and all indexes.
  419. // Otherwise, if the mempool is full, we attempt to find a lower priority transaction
  420. // to evict in place of the new incoming transaction. If no such transaction exists,
  421. // the new incoming transaction is rejected.
  422. //
  423. // NOTE:
  424. // - An explicit lock is NOT required.
  425. func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo TxInfo) {
  426. checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
  427. if !ok {
  428. return
  429. }
  430. var err error
  431. if txmp.postCheck != nil {
  432. err = txmp.postCheck(wtx.tx, checkTxRes.CheckTx)
  433. }
  434. if err != nil || checkTxRes.CheckTx.Code != abci.CodeTypeOK {
  435. // ignore bad transactions
  436. txmp.logger.Info(
  437. "rejected bad transaction",
  438. "priority", wtx.priority,
  439. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  440. "peer_id", txInfo.SenderNodeID,
  441. "code", checkTxRes.CheckTx.Code,
  442. "post_check_err", err,
  443. )
  444. txmp.metrics.FailedTxs.Add(1)
  445. if !txmp.config.KeepInvalidTxsInCache {
  446. txmp.cache.Remove(wtx.tx)
  447. }
  448. if err != nil {
  449. checkTxRes.CheckTx.MempoolError = err.Error()
  450. }
  451. return
  452. }
  453. sender := checkTxRes.CheckTx.Sender
  454. priority := checkTxRes.CheckTx.Priority
  455. if len(sender) > 0 {
  456. if wtx := txmp.txStore.GetTxBySender(sender); wtx != nil {
  457. txmp.logger.Error(
  458. "rejected incoming good transaction; tx already exists for sender",
  459. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  460. "sender", sender,
  461. )
  462. txmp.metrics.RejectedTxs.Add(1)
  463. return
  464. }
  465. }
  466. if err := txmp.canAddTx(wtx); err != nil {
  467. evictTxs := txmp.priorityIndex.GetEvictableTxs(
  468. priority,
  469. int64(wtx.Size()),
  470. txmp.SizeBytes(),
  471. txmp.config.MaxTxsBytes,
  472. )
  473. if len(evictTxs) == 0 {
  474. // No room for the new incoming transaction so we just remove it from
  475. // the cache.
  476. txmp.cache.Remove(wtx.tx)
  477. txmp.logger.Error(
  478. "rejected incoming good transaction; mempool full",
  479. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  480. "err", err.Error(),
  481. )
  482. txmp.metrics.RejectedTxs.Add(1)
  483. return
  484. }
  485. // evict an existing transaction(s)
  486. //
  487. // NOTE:
  488. // - The transaction, toEvict, can be removed while a concurrent
  489. // reCheckTx callback is being executed for the same transaction.
  490. for _, toEvict := range evictTxs {
  491. txmp.removeTx(toEvict, true)
  492. txmp.logger.Debug(
  493. "evicted existing good transaction; mempool full",
  494. "old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()),
  495. "old_priority", toEvict.priority,
  496. "new_tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  497. "new_priority", wtx.priority,
  498. )
  499. txmp.metrics.EvictedTxs.Add(1)
  500. }
  501. }
  502. wtx.gasWanted = checkTxRes.CheckTx.GasWanted
  503. wtx.priority = priority
  504. wtx.sender = sender
  505. wtx.peers = map[uint16]struct{}{
  506. txInfo.SenderID: {},
  507. }
  508. txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size()))
  509. txmp.metrics.Size.Set(float64(txmp.Size()))
  510. txmp.insertTx(wtx)
  511. txmp.logger.Debug(
  512. "inserted good transaction",
  513. "priority", wtx.priority,
  514. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  515. "height", txmp.height,
  516. "num_txs", txmp.Size(),
  517. )
  518. txmp.notifyTxsAvailable()
  519. }
  520. // defaultTxCallback is the CheckTx application callback used when a transaction
  521. // is being re-checked (if re-checking is enabled). The caller must hold a mempool
  522. // write-lock (via Lock()) and when executing Update(), if the mempool is non-empty
  523. // and Recheck is enabled, then all remaining transactions will be rechecked via
  524. // CheckTxAsync. The order transactions are rechecked must be the same as the
  525. // order in which this callback is called.
  526. func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response) {
  527. if txmp.recheckCursor == nil {
  528. return
  529. }
  530. txmp.metrics.RecheckTimes.Add(1)
  531. checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
  532. if !ok {
  533. txmp.logger.Error("received incorrect type in mempool callback",
  534. "expected", reflect.TypeOf(&abci.Response_CheckTx{}).Name(),
  535. "got", reflect.TypeOf(res.Value).Name(),
  536. )
  537. return
  538. }
  539. tx := req.GetCheckTx().Tx
  540. wtx := txmp.recheckCursor.Value.(*WrappedTx)
  541. // Search through the remaining list of tx to recheck for a transaction that matches
  542. // the one we received from the ABCI application.
  543. for {
  544. if bytes.Equal(tx, wtx.tx) {
  545. // We've found a tx in the recheck list that matches the tx that we
  546. // received from the ABCI application.
  547. // Break, and use this transaction for further checks.
  548. break
  549. }
  550. txmp.logger.Error(
  551. "re-CheckTx transaction mismatch",
  552. "got", wtx.tx.Hash(),
  553. "expected", types.Tx(tx).Key(),
  554. )
  555. if txmp.recheckCursor == txmp.recheckEnd {
  556. // we reached the end of the recheckTx list without finding a tx
  557. // matching the one we received from the ABCI application.
  558. // Return without processing any tx.
  559. txmp.recheckCursor = nil
  560. return
  561. }
  562. txmp.recheckCursor = txmp.recheckCursor.Next()
  563. wtx = txmp.recheckCursor.Value.(*WrappedTx)
  564. }
  565. // Only evaluate transactions that have not been removed. This can happen
  566. // if an existing transaction is evicted during CheckTx and while this
  567. // callback is being executed for the same evicted transaction.
  568. if !txmp.txStore.IsTxRemoved(wtx.hash) {
  569. var err error
  570. if txmp.postCheck != nil {
  571. err = txmp.postCheck(tx, checkTxRes.CheckTx)
  572. }
  573. if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil {
  574. wtx.priority = checkTxRes.CheckTx.Priority
  575. } else {
  576. txmp.logger.Debug(
  577. "existing transaction no longer valid; failed re-CheckTx callback",
  578. "priority", wtx.priority,
  579. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  580. "err", err,
  581. "code", checkTxRes.CheckTx.Code,
  582. )
  583. if wtx.gossipEl != txmp.recheckCursor {
  584. panic("corrupted reCheckTx cursor")
  585. }
  586. txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
  587. }
  588. }
  589. // move reCheckTx cursor to next element
  590. if txmp.recheckCursor == txmp.recheckEnd {
  591. txmp.recheckCursor = nil
  592. } else {
  593. txmp.recheckCursor = txmp.recheckCursor.Next()
  594. }
  595. if txmp.recheckCursor == nil {
  596. txmp.logger.Debug("finished rechecking transactions")
  597. if txmp.Size() > 0 {
  598. txmp.notifyTxsAvailable()
  599. }
  600. }
  601. txmp.metrics.Size.Set(float64(txmp.Size()))
  602. }
  603. // updateReCheckTxs updates the recheck cursors using the gossipIndex. For
  604. // each transaction, it executes CheckTxAsync. The global callback defined on
  605. // the proxyAppConn will be executed for each transaction after CheckTx is
  606. // executed.
  607. //
  608. // NOTE:
  609. // - The caller must have a write-lock when executing updateReCheckTxs.
  610. func (txmp *TxMempool) updateReCheckTxs(ctx context.Context) {
  611. if txmp.Size() == 0 {
  612. panic("attempted to update re-CheckTx txs when mempool is empty")
  613. }
  614. txmp.recheckCursor = txmp.gossipIndex.Front()
  615. txmp.recheckEnd = txmp.gossipIndex.Back()
  616. for e := txmp.gossipIndex.Front(); e != nil; e = e.Next() {
  617. wtx := e.Value.(*WrappedTx)
  618. // Only execute CheckTx if the transaction is not marked as removed which
  619. // could happen if the transaction was evicted.
  620. if !txmp.txStore.IsTxRemoved(wtx.hash) {
  621. _, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{
  622. Tx: wtx.tx,
  623. Type: abci.CheckTxType_Recheck,
  624. })
  625. if err != nil {
  626. // no need in retrying since the tx will be rechecked after the next block
  627. txmp.logger.Error("failed to execute CheckTx during rechecking", "err", err)
  628. }
  629. }
  630. }
  631. if _, err := txmp.proxyAppConn.FlushAsync(ctx); err != nil {
  632. txmp.logger.Error("failed to flush transactions during rechecking", "err", err)
  633. }
  634. }
  635. // canAddTx returns an error if we cannot insert the provided *WrappedTx into
  636. // the mempool due to mempool configured constraints. If it returns nil,
  637. // the transaction can be inserted into the mempool.
  638. func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error {
  639. var (
  640. numTxs = txmp.Size()
  641. sizeBytes = txmp.SizeBytes()
  642. )
  643. if numTxs >= txmp.config.Size || int64(wtx.Size())+sizeBytes > txmp.config.MaxTxsBytes {
  644. return types.ErrMempoolIsFull{
  645. NumTxs: numTxs,
  646. MaxTxs: txmp.config.Size,
  647. TxsBytes: sizeBytes,
  648. MaxTxsBytes: txmp.config.MaxTxsBytes,
  649. }
  650. }
  651. return nil
  652. }
  653. func (txmp *TxMempool) insertTx(wtx *WrappedTx) {
  654. txmp.txStore.SetTx(wtx)
  655. txmp.priorityIndex.PushTx(wtx)
  656. txmp.heightIndex.Insert(wtx)
  657. txmp.timestampIndex.Insert(wtx)
  658. // Insert the transaction into the gossip index and mark the reference to the
  659. // linked-list element, which will be needed at a later point when the
  660. // transaction is removed.
  661. gossipEl := txmp.gossipIndex.PushBack(wtx)
  662. wtx.gossipEl = gossipEl
  663. atomic.AddInt64(&txmp.sizeBytes, int64(wtx.Size()))
  664. }
  665. func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {
  666. if txmp.txStore.IsTxRemoved(wtx.hash) {
  667. return
  668. }
  669. txmp.txStore.RemoveTx(wtx)
  670. txmp.priorityIndex.RemoveTx(wtx)
  671. txmp.heightIndex.Remove(wtx)
  672. txmp.timestampIndex.Remove(wtx)
  673. // Remove the transaction from the gossip index and cleanup the linked-list
  674. // element so it can be garbage collected.
  675. txmp.gossipIndex.Remove(wtx.gossipEl)
  676. wtx.gossipEl.DetachPrev()
  677. atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size()))
  678. if removeFromCache {
  679. txmp.cache.Remove(wtx.tx)
  680. }
  681. }
  682. // purgeExpiredTxs removes all transactions that have exceeded their respective
  683. // height- and/or time-based TTLs from their respective indexes. Every expired
  684. // transaction will be removed from the mempool, but preserved in the cache.
  685. //
  686. // NOTE: purgeExpiredTxs must only be called during TxMempool#Update in which
  687. // the caller has a write-lock on the mempool and so we can safely iterate over
  688. // the height and time based indexes.
  689. func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
  690. now := time.Now()
  691. expiredTxs := make(map[types.TxKey]*WrappedTx)
  692. if txmp.config.TTLNumBlocks > 0 {
  693. purgeIdx := -1
  694. for i, wtx := range txmp.heightIndex.txs {
  695. if (blockHeight - wtx.height) > txmp.config.TTLNumBlocks {
  696. expiredTxs[wtx.tx.Key()] = wtx
  697. purgeIdx = i
  698. } else {
  699. // since the index is sorted, we know no other txs can be be purged
  700. break
  701. }
  702. }
  703. if purgeIdx >= 0 {
  704. txmp.heightIndex.txs = txmp.heightIndex.txs[purgeIdx+1:]
  705. }
  706. }
  707. if txmp.config.TTLDuration > 0 {
  708. purgeIdx := -1
  709. for i, wtx := range txmp.timestampIndex.txs {
  710. if now.Sub(wtx.timestamp) > txmp.config.TTLDuration {
  711. expiredTxs[wtx.tx.Key()] = wtx
  712. purgeIdx = i
  713. } else {
  714. // since the index is sorted, we know no other txs can be be purged
  715. break
  716. }
  717. }
  718. if purgeIdx >= 0 {
  719. txmp.timestampIndex.txs = txmp.timestampIndex.txs[purgeIdx+1:]
  720. }
  721. }
  722. for _, wtx := range expiredTxs {
  723. txmp.removeTx(wtx, false)
  724. }
  725. }
  726. func (txmp *TxMempool) notifyTxsAvailable() {
  727. if txmp.Size() == 0 {
  728. panic("attempt to notify txs available but mempool is empty!")
  729. }
  730. if txmp.txsAvailable != nil && !txmp.notifiedTxsAvailable {
  731. // channel cap is 1, so this will send once
  732. txmp.notifiedTxsAvailable = true
  733. select {
  734. case txmp.txsAvailable <- struct{}{}:
  735. default:
  736. }
  737. }
  738. }