You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

858 lines
25 KiB

8 years ago
mempool no gossip back (#2778) Closes #1798 This is done by making every mempool tx maintain a list of peers who its received the tx from. Instead of using the 20byte peer ID, it instead uses a local map from peerID to uint16 counter, so every peer adds 2 bytes. (Word aligned to probably make it 8 bytes) This also required resetting the callback function on every CheckTx. This likely has performance ramifications for instruction caching. The actual setting operation isn't costly with the removal of defers in this PR. * Make the mempool not gossip txs back to peers its received it from * Fix adversarial memleak * Don't break interface * Update changelog * Forgot to add a mtx * forgot a mutex * Update mempool/reactor.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Update mempool/mempool.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Use unknown peer ID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * fix compilation * use next wait chan logic when skipping * Minor fixes * Add TxInfo * Add reverse map * Make activeID's auto-reserve 0 * 0 -> UnknownPeerID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Switch to making the normal case set a callback on the reqres object The recheck case is still done via the global callback, and stats are also set via global callback * fix merge conflict * Addres comments * Add cache tests * add cache tests * minor fixes * update metrics in reqResCb and reformat code * goimport -w mempool/reactor.go * mempool: update memTx senders I had to introduce txsMap for quick mempoolTx lookups. * change senders type from []uint16 to sync.Map Fixes DATA RACE: ``` Read at 0x00c0013fcd3a by goroutine 183: github.com/tendermint/tendermint/mempool.(*MempoolReactor).broadcastTxRoutine() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:195 +0x3c7 Previous write at 0x00c0013fcd3a by D[2019-02-27|10:10:49.058] Read PacketMsg switch=3 peer=35bc1e3558c182927b31987eeff3feb3d58a0fc5@127.0.0.1 :46552 conn=MConn{pipe} packet="PacketMsg{30:2B06579D0A143EB78F3D3299DE8213A51D4E11FB05ACE4D6A14F T:1}" goroutine 190: github.com/tendermint/tendermint/mempool.(*Mempool).CheckTxWithInfo() /go/src/github.com/tendermint/tendermint/mempool/mempool.go:387 +0xdc1 github.com/tendermint/tendermint/mempool.(*MempoolReactor).Receive() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:134 +0xb04 github.com/tendermint/tendermint/p2p.createMConnection.func1() /go/src/github.com/tendermint/tendermint/p2p/peer.go:374 +0x25b github.com/tendermint/tendermint/p2p/conn.(*MConnection).recvRoutine() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:599 +0xcce Goroutine 183 (running) created at: D[2019-02-27|10:10:49.058] Send switch=2 peer=1efafad5443abeea4b7a8155218e4369525d987e@127.0.0.1:46193 channel=48 conn=MConn{pipe} m sgBytes=2B06579D0A146194480ADAE00C2836ED7125FEE65C1D9DD51049 github.com/tendermint/tendermint/mempool.(*MempoolReactor).AddPeer() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:105 +0x1b1 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:683 +0x13b github.com/tendermint/tendermint/p2p.(*Switch).addPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:650 +0x585 github.com/tendermint/tendermint/p2p.(*Switch).addPeerWithConnection() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:145 +0x939 github.com/tendermint/tendermint/p2p.Connect2Switches.func2() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:109 +0x50 I[2019-02-27|10:10:49.058] Added good transaction validator=0 tx=43B4D1F0F03460BD262835C4AA560DB860CFBBE85BD02386D83DAC38C67B3AD7 res="&{CheckTx:gas_w anted:1 }" height=0 total=375 Goroutine 190 (running) created at: github.com/tendermint/tendermint/p2p/conn.(*MConnection).OnStart() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:210 +0x313 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).OnStart() /go/src/github.com/tendermint/tendermint/p2p/peer.go:179 +0x56 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).Start() <autogenerated>:1 +0x43 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() ``` * explain the choice of a map DS for senders * extract ids pool/mapper to a separate struct * fix literal copies lock value from senders: sync.Map contains sync.Mutex * use sync.Map#LoadOrStore instead of Load * fixes after Ismail's review * rename resCbNormal to resCbFirstTime
6 years ago
9 years ago
  1. package mempool
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "reflect"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. abci "github.com/tendermint/tendermint/abci/types"
  12. "github.com/tendermint/tendermint/config"
  13. "github.com/tendermint/tendermint/internal/libs/clist"
  14. "github.com/tendermint/tendermint/internal/proxy"
  15. "github.com/tendermint/tendermint/libs/log"
  16. tmmath "github.com/tendermint/tendermint/libs/math"
  17. "github.com/tendermint/tendermint/types"
  18. )
  19. var _ Mempool = (*TxMempool)(nil)
  20. // TxMempoolOption sets an optional parameter on the TxMempool.
  21. type TxMempoolOption func(*TxMempool)
  22. // TxMempool defines a prioritized mempool data structure used by the v1 mempool
  23. // reactor. It keeps a thread-safe priority queue of transactions that is used
  24. // when a block proposer constructs a block and a thread-safe linked-list that
  25. // is used to gossip transactions to peers in a FIFO manner.
  26. type TxMempool struct {
  27. logger log.Logger
  28. metrics *Metrics
  29. config *config.MempoolConfig
  30. proxyAppConn proxy.AppConnMempool
  31. // txsAvailable fires once for each height when the mempool is not empty
  32. txsAvailable chan struct{}
  33. notifiedTxsAvailable bool
  34. // height defines the last block height process during Update()
  35. height int64
  36. // sizeBytes defines the total size of the mempool (sum of all tx bytes)
  37. sizeBytes int64
  38. // cache defines a fixed-size cache of already seen transactions as this
  39. // reduces pressure on the proxyApp.
  40. cache TxCache
  41. // txStore defines the main storage of valid transactions. Indexes are built
  42. // on top of this store.
  43. txStore *TxStore
  44. // gossipIndex defines the gossiping index of valid transactions via a
  45. // thread-safe linked-list. We also use the gossip index as a cursor for
  46. // rechecking transactions already in the mempool.
  47. gossipIndex *clist.CList
  48. // recheckCursor and recheckEnd are used as cursors based on the gossip index
  49. // to recheck transactions that are already in the mempool. Iteration is not
  50. // thread-safe and transaction may be mutated in serial order.
  51. //
  52. // XXX/TODO: It might be somewhat of a codesmell to use the gossip index for
  53. // iterator and cursor management when rechecking transactions. If the gossip
  54. // index changes or is removed in a future refactor, this will have to be
  55. // refactored. Instead, we should consider just keeping a slice of a snapshot
  56. // of the mempool's current transactions during Update and an integer cursor
  57. // into that slice. This, however, requires additional O(n) space complexity.
  58. recheckCursor *clist.CElement // next expected response
  59. recheckEnd *clist.CElement // re-checking stops here
  60. // priorityIndex defines the priority index of valid transactions via a
  61. // thread-safe priority queue.
  62. priorityIndex *TxPriorityQueue
  63. // heightIndex defines a height-based, in ascending order, transaction index.
  64. // i.e. older transactions are first.
  65. heightIndex *WrappedTxList
  66. // timestampIndex defines a timestamp-based, in ascending order, transaction
  67. // index. i.e. older transactions are first.
  68. timestampIndex *WrappedTxList
  69. // A read/write lock is used to safe guard updates, insertions and deletions
  70. // from the mempool. A read-lock is implicitly acquired when executing CheckTx,
  71. // however, a caller must explicitly grab a write-lock via Lock when updating
  72. // the mempool via Update().
  73. mtx sync.RWMutex
  74. preCheck PreCheckFunc
  75. postCheck PostCheckFunc
  76. }
  77. func NewTxMempool(
  78. logger log.Logger,
  79. cfg *config.MempoolConfig,
  80. proxyAppConn proxy.AppConnMempool,
  81. height int64,
  82. options ...TxMempoolOption,
  83. ) *TxMempool {
  84. txmp := &TxMempool{
  85. logger: logger,
  86. config: cfg,
  87. proxyAppConn: proxyAppConn,
  88. height: height,
  89. cache: NopTxCache{},
  90. metrics: NopMetrics(),
  91. txStore: NewTxStore(),
  92. gossipIndex: clist.New(),
  93. priorityIndex: NewTxPriorityQueue(),
  94. heightIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
  95. return wtx1.height >= wtx2.height
  96. }),
  97. timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
  98. return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp)
  99. }),
  100. }
  101. if cfg.CacheSize > 0 {
  102. txmp.cache = NewLRUTxCache(cfg.CacheSize)
  103. }
  104. proxyAppConn.SetResponseCallback(txmp.defaultTxCallback)
  105. for _, opt := range options {
  106. opt(txmp)
  107. }
  108. return txmp
  109. }
  110. // WithPreCheck sets a filter for the mempool to reject a transaction if f(tx)
  111. // returns an error. This is executed before CheckTx. It only applies to the
  112. // first created block. After that, Update() overwrites the existing value.
  113. func WithPreCheck(f PreCheckFunc) TxMempoolOption {
  114. return func(txmp *TxMempool) { txmp.preCheck = f }
  115. }
  116. // WithPostCheck sets a filter for the mempool to reject a transaction if
  117. // f(tx, resp) returns an error. This is executed after CheckTx. It only applies
  118. // to the first created block. After that, Update overwrites the existing value.
  119. func WithPostCheck(f PostCheckFunc) TxMempoolOption {
  120. return func(txmp *TxMempool) { txmp.postCheck = f }
  121. }
  122. // WithMetrics sets the mempool's metrics collector.
  123. func WithMetrics(metrics *Metrics) TxMempoolOption {
  124. return func(txmp *TxMempool) { txmp.metrics = metrics }
  125. }
  126. // Lock obtains a write-lock on the mempool. A caller must be sure to explicitly
  127. // release the lock when finished.
  128. func (txmp *TxMempool) Lock() {
  129. txmp.mtx.Lock()
  130. }
  131. // Unlock releases a write-lock on the mempool.
  132. func (txmp *TxMempool) Unlock() {
  133. txmp.mtx.Unlock()
  134. }
  135. // Size returns the number of valid transactions in the mempool. It is
  136. // thread-safe.
  137. func (txmp *TxMempool) Size() int {
  138. return txmp.txStore.Size()
  139. }
  140. // SizeBytes return the total sum in bytes of all the valid transactions in the
  141. // mempool. It is thread-safe.
  142. func (txmp *TxMempool) SizeBytes() int64 {
  143. return atomic.LoadInt64(&txmp.sizeBytes)
  144. }
  145. // FlushAppConn executes FlushSync on the mempool's proxyAppConn.
  146. //
  147. // NOTE: The caller must obtain a write-lock prior to execution.
  148. func (txmp *TxMempool) FlushAppConn(ctx context.Context) error {
  149. return txmp.proxyAppConn.Flush(ctx)
  150. }
  151. // WaitForNextTx returns a blocking channel that will be closed when the next
  152. // valid transaction is available to gossip. It is thread-safe.
  153. func (txmp *TxMempool) WaitForNextTx() <-chan struct{} {
  154. return txmp.gossipIndex.WaitChan()
  155. }
  156. // NextGossipTx returns the next valid transaction to gossip. A caller must wait
  157. // for WaitForNextTx to signal a transaction is available to gossip first. It is
  158. // thread-safe.
  159. func (txmp *TxMempool) NextGossipTx() *clist.CElement {
  160. return txmp.gossipIndex.Front()
  161. }
  162. // EnableTxsAvailable enables the mempool to trigger events when transactions
  163. // are available on a block by block basis.
  164. func (txmp *TxMempool) EnableTxsAvailable() {
  165. txmp.mtx.Lock()
  166. defer txmp.mtx.Unlock()
  167. txmp.txsAvailable = make(chan struct{}, 1)
  168. }
  169. // TxsAvailable returns a channel which fires once for every height, and only
  170. // when transactions are available in the mempool. It is thread-safe.
  171. func (txmp *TxMempool) TxsAvailable() <-chan struct{} {
  172. return txmp.txsAvailable
  173. }
  174. // CheckTx executes the ABCI CheckTx method for a given transaction. It acquires
  175. // a read-lock attempts to execute the application's CheckTx ABCI method via
  176. // CheckTxAsync. We return an error if any of the following happen:
  177. //
  178. // - The CheckTxAsync execution fails.
  179. // - The transaction already exists in the cache and we've already received the
  180. // transaction from the peer. Otherwise, if it solely exists in the cache, we
  181. // return nil.
  182. // - The transaction size exceeds the maximum transaction size as defined by the
  183. // configuration provided to the mempool.
  184. // - The transaction fails Pre-Check (if it is defined).
  185. // - The proxyAppConn fails, e.g. the buffer is full.
  186. //
  187. // If the mempool is full, we still execute CheckTx and attempt to find a lower
  188. // priority transaction to evict. If such a transaction exists, we remove the
  189. // lower priority transaction and add the new one with higher priority.
  190. //
  191. // NOTE:
  192. // - The applications' CheckTx implementation may panic.
  193. // - The caller is not to explicitly require any locks for executing CheckTx.
  194. func (txmp *TxMempool) CheckTx(
  195. ctx context.Context,
  196. tx types.Tx,
  197. cb func(*abci.Response),
  198. txInfo TxInfo,
  199. ) error {
  200. txmp.mtx.RLock()
  201. defer txmp.mtx.RUnlock()
  202. if txSize := len(tx); txSize > txmp.config.MaxTxBytes {
  203. return types.ErrTxTooLarge{
  204. Max: txmp.config.MaxTxBytes,
  205. Actual: txSize,
  206. }
  207. }
  208. if txmp.preCheck != nil {
  209. if err := txmp.preCheck(tx); err != nil {
  210. return types.ErrPreCheck{Reason: err}
  211. }
  212. }
  213. if err := txmp.proxyAppConn.Error(); err != nil {
  214. return err
  215. }
  216. txHash := tx.Key()
  217. // We add the transaction to the mempool's cache and if the
  218. // transaction is already present in the cache, i.e. false is returned, then we
  219. // check if we've seen this transaction and error if we have.
  220. if !txmp.cache.Push(tx) {
  221. txmp.txStore.GetOrSetPeerByTxHash(txHash, txInfo.SenderID)
  222. return types.ErrTxInCache
  223. }
  224. reqRes, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{Tx: tx})
  225. if err != nil {
  226. txmp.cache.Remove(tx)
  227. return err
  228. }
  229. reqRes.SetCallback(func(res *abci.Response) {
  230. if txmp.recheckCursor != nil {
  231. panic("recheck cursor is non-nil in CheckTx callback")
  232. }
  233. wtx := &WrappedTx{
  234. tx: tx,
  235. hash: txHash,
  236. timestamp: time.Now().UTC(),
  237. height: txmp.height,
  238. }
  239. txmp.initTxCallback(wtx, res, txInfo)
  240. if cb != nil {
  241. cb(res)
  242. }
  243. })
  244. return nil
  245. }
  246. func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error {
  247. txmp.Lock()
  248. defer txmp.Unlock()
  249. // remove the committed transaction from the transaction store and indexes
  250. if wtx := txmp.txStore.GetTxByHash(txKey); wtx != nil {
  251. txmp.removeTx(wtx, false)
  252. return nil
  253. }
  254. return errors.New("transaction not found")
  255. }
  256. // Flush empties the mempool. It acquires a read-lock, fetches all the
  257. // transactions currently in the transaction store and removes each transaction
  258. // from the store and all indexes and finally resets the cache.
  259. //
  260. // NOTE:
  261. // - Flushing the mempool may leave the mempool in an inconsistent state.
  262. func (txmp *TxMempool) Flush() {
  263. txmp.mtx.RLock()
  264. defer txmp.mtx.RUnlock()
  265. txmp.heightIndex.Reset()
  266. txmp.timestampIndex.Reset()
  267. for _, wtx := range txmp.txStore.GetAllTxs() {
  268. txmp.removeTx(wtx, false)
  269. }
  270. atomic.SwapInt64(&txmp.sizeBytes, 0)
  271. txmp.cache.Reset()
  272. }
  273. // ReapMaxBytesMaxGas returns a list of transactions within the provided size
  274. // and gas constraints. Transaction are retrieved in priority order.
  275. //
  276. // NOTE:
  277. // - Transactions returned are not removed from the mempool transaction
  278. // store or indexes.
  279. func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
  280. txmp.mtx.RLock()
  281. defer txmp.mtx.RUnlock()
  282. var (
  283. totalGas int64
  284. totalSize int64
  285. )
  286. // wTxs contains a list of *WrappedTx retrieved from the priority queue that
  287. // need to be re-enqueued prior to returning.
  288. wTxs := make([]*WrappedTx, 0, txmp.priorityIndex.NumTxs())
  289. defer func() {
  290. for _, wtx := range wTxs {
  291. txmp.priorityIndex.PushTx(wtx)
  292. }
  293. }()
  294. txs := make([]types.Tx, 0, txmp.priorityIndex.NumTxs())
  295. for txmp.priorityIndex.NumTxs() > 0 {
  296. wtx := txmp.priorityIndex.PopTx()
  297. txs = append(txs, wtx.tx)
  298. wTxs = append(wTxs, wtx)
  299. size := types.ComputeProtoSizeForTxs([]types.Tx{wtx.tx})
  300. // Ensure we have capacity for the transaction with respect to the
  301. // transaction size.
  302. if maxBytes > -1 && totalSize+size > maxBytes {
  303. return txs[:len(txs)-1]
  304. }
  305. totalSize += size
  306. // ensure we have capacity for the transaction with respect to total gas
  307. gas := totalGas + wtx.gasWanted
  308. if maxGas > -1 && gas > maxGas {
  309. return txs[:len(txs)-1]
  310. }
  311. totalGas = gas
  312. }
  313. return txs
  314. }
  315. // ReapMaxTxs returns a list of transactions within the provided number of
  316. // transactions bound. Transaction are retrieved in priority order.
  317. //
  318. // NOTE:
  319. // - Transactions returned are not removed from the mempool transaction
  320. // store or indexes.
  321. func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs {
  322. txmp.mtx.RLock()
  323. defer txmp.mtx.RUnlock()
  324. numTxs := txmp.priorityIndex.NumTxs()
  325. if max < 0 {
  326. max = numTxs
  327. }
  328. cap := tmmath.MinInt(numTxs, max)
  329. // wTxs contains a list of *WrappedTx retrieved from the priority queue that
  330. // need to be re-enqueued prior to returning.
  331. wTxs := make([]*WrappedTx, 0, cap)
  332. txs := make([]types.Tx, 0, cap)
  333. for txmp.priorityIndex.NumTxs() > 0 && len(txs) < max {
  334. wtx := txmp.priorityIndex.PopTx()
  335. txs = append(txs, wtx.tx)
  336. wTxs = append(wTxs, wtx)
  337. }
  338. for _, wtx := range wTxs {
  339. txmp.priorityIndex.PushTx(wtx)
  340. }
  341. return txs
  342. }
  343. // Update iterates over all the transactions provided by the block producer,
  344. // removes them from the cache (if applicable), and removes
  345. // the transactions from the main transaction store and associated indexes.
  346. // If there are transactions remaining in the mempool, we initiate a
  347. // re-CheckTx for them (if applicable), otherwise, we notify the caller more
  348. // transactions are available.
  349. //
  350. // NOTE:
  351. // - The caller must explicitly acquire a write-lock.
  352. func (txmp *TxMempool) Update(
  353. ctx context.Context,
  354. blockHeight int64,
  355. blockTxs types.Txs,
  356. deliverTxResponses []*abci.ResponseDeliverTx,
  357. newPreFn PreCheckFunc,
  358. newPostFn PostCheckFunc,
  359. ) error {
  360. txmp.height = blockHeight
  361. txmp.notifiedTxsAvailable = false
  362. if newPreFn != nil {
  363. txmp.preCheck = newPreFn
  364. }
  365. if newPostFn != nil {
  366. txmp.postCheck = newPostFn
  367. }
  368. for i, tx := range blockTxs {
  369. if deliverTxResponses[i].Code == abci.CodeTypeOK {
  370. // add the valid committed transaction to the cache (if missing)
  371. _ = txmp.cache.Push(tx)
  372. } else if !txmp.config.KeepInvalidTxsInCache {
  373. // allow invalid transactions to be re-submitted
  374. txmp.cache.Remove(tx)
  375. }
  376. // remove the committed transaction from the transaction store and indexes
  377. if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil {
  378. txmp.removeTx(wtx, false)
  379. }
  380. }
  381. txmp.purgeExpiredTxs(blockHeight)
  382. // If there any uncommitted transactions left in the mempool, we either
  383. // initiate re-CheckTx per remaining transaction or notify that remaining
  384. // transactions are left.
  385. if txmp.Size() > 0 {
  386. if txmp.config.Recheck {
  387. txmp.logger.Debug(
  388. "executing re-CheckTx for all remaining transactions",
  389. "num_txs", txmp.Size(),
  390. "height", blockHeight,
  391. )
  392. txmp.updateReCheckTxs(ctx)
  393. } else {
  394. txmp.notifyTxsAvailable()
  395. }
  396. }
  397. txmp.metrics.Size.Set(float64(txmp.Size()))
  398. return nil
  399. }
  400. // initTxCallback is the callback invoked for a new unique transaction after CheckTx
  401. // has been executed by the ABCI application for the first time on that transaction.
  402. // CheckTx can be called again for the same transaction later when re-checking;
  403. // however, this callback will not be called.
  404. //
  405. // initTxCallback runs after the ABCI application executes CheckTx.
  406. // It runs the postCheck hook if one is defined on the mempool.
  407. // If the CheckTx response response code is not OK, or if the postCheck hook
  408. // reports an error, the transaction is rejected. Otherwise, we attempt to insert
  409. // the transaction into the mempool.
  410. //
  411. // When inserting a transaction, we first check if there is sufficient capacity.
  412. // If there is, the transaction is added to the txStore and all indexes.
  413. // Otherwise, if the mempool is full, we attempt to find a lower priority transaction
  414. // to evict in place of the new incoming transaction. If no such transaction exists,
  415. // the new incoming transaction is rejected.
  416. //
  417. // NOTE:
  418. // - An explicit lock is NOT required.
  419. func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo TxInfo) {
  420. checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
  421. if !ok {
  422. return
  423. }
  424. var err error
  425. if txmp.postCheck != nil {
  426. err = txmp.postCheck(wtx.tx, checkTxRes.CheckTx)
  427. }
  428. if err != nil || checkTxRes.CheckTx.Code != abci.CodeTypeOK {
  429. // ignore bad transactions
  430. txmp.logger.Info(
  431. "rejected bad transaction",
  432. "priority", wtx.priority,
  433. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  434. "peer_id", txInfo.SenderNodeID,
  435. "code", checkTxRes.CheckTx.Code,
  436. "post_check_err", err,
  437. )
  438. txmp.metrics.FailedTxs.Add(1)
  439. if !txmp.config.KeepInvalidTxsInCache {
  440. txmp.cache.Remove(wtx.tx)
  441. }
  442. if err != nil {
  443. checkTxRes.CheckTx.MempoolError = err.Error()
  444. }
  445. return
  446. }
  447. sender := checkTxRes.CheckTx.Sender
  448. priority := checkTxRes.CheckTx.Priority
  449. if len(sender) > 0 {
  450. if wtx := txmp.txStore.GetTxBySender(sender); wtx != nil {
  451. txmp.logger.Error(
  452. "rejected incoming good transaction; tx already exists for sender",
  453. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  454. "sender", sender,
  455. )
  456. txmp.metrics.RejectedTxs.Add(1)
  457. return
  458. }
  459. }
  460. if err := txmp.canAddTx(wtx); err != nil {
  461. evictTxs := txmp.priorityIndex.GetEvictableTxs(
  462. priority,
  463. int64(wtx.Size()),
  464. txmp.SizeBytes(),
  465. txmp.config.MaxTxsBytes,
  466. )
  467. if len(evictTxs) == 0 {
  468. // No room for the new incoming transaction so we just remove it from
  469. // the cache.
  470. txmp.cache.Remove(wtx.tx)
  471. txmp.logger.Error(
  472. "rejected incoming good transaction; mempool full",
  473. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  474. "err", err.Error(),
  475. )
  476. txmp.metrics.RejectedTxs.Add(1)
  477. return
  478. }
  479. // evict an existing transaction(s)
  480. //
  481. // NOTE:
  482. // - The transaction, toEvict, can be removed while a concurrent
  483. // reCheckTx callback is being executed for the same transaction.
  484. for _, toEvict := range evictTxs {
  485. txmp.removeTx(toEvict, true)
  486. txmp.logger.Debug(
  487. "evicted existing good transaction; mempool full",
  488. "old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()),
  489. "old_priority", toEvict.priority,
  490. "new_tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  491. "new_priority", wtx.priority,
  492. )
  493. txmp.metrics.EvictedTxs.Add(1)
  494. }
  495. }
  496. wtx.gasWanted = checkTxRes.CheckTx.GasWanted
  497. wtx.priority = priority
  498. wtx.sender = sender
  499. wtx.peers = map[uint16]struct{}{
  500. txInfo.SenderID: {},
  501. }
  502. txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size()))
  503. txmp.metrics.Size.Set(float64(txmp.Size()))
  504. txmp.insertTx(wtx)
  505. txmp.logger.Debug(
  506. "inserted good transaction",
  507. "priority", wtx.priority,
  508. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  509. "height", txmp.height,
  510. "num_txs", txmp.Size(),
  511. )
  512. txmp.notifyTxsAvailable()
  513. }
  514. // defaultTxCallback is the CheckTx application callback used when a transaction
  515. // is being re-checked (if re-checking is enabled). The caller must hold a mempool
  516. // write-lock (via Lock()) and when executing Update(), if the mempool is non-empty
  517. // and Recheck is enabled, then all remaining transactions will be rechecked via
  518. // CheckTxAsync. The order transactions are rechecked must be the same as the
  519. // order in which this callback is called.
  520. func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response) {
  521. if txmp.recheckCursor == nil {
  522. return
  523. }
  524. txmp.metrics.RecheckTimes.Add(1)
  525. checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
  526. if !ok {
  527. txmp.logger.Error("received incorrect type in mempool callback",
  528. "expected", reflect.TypeOf(&abci.Response_CheckTx{}).Name(),
  529. "got", reflect.TypeOf(res.Value).Name(),
  530. )
  531. return
  532. }
  533. tx := req.GetCheckTx().Tx
  534. wtx := txmp.recheckCursor.Value.(*WrappedTx)
  535. // Search through the remaining list of tx to recheck for a transaction that matches
  536. // the one we received from the ABCI application.
  537. for {
  538. if bytes.Equal(tx, wtx.tx) {
  539. // We've found a tx in the recheck list that matches the tx that we
  540. // received from the ABCI application.
  541. // Break, and use this transaction for further checks.
  542. break
  543. }
  544. txmp.logger.Error(
  545. "re-CheckTx transaction mismatch",
  546. "got", wtx.tx.Hash(),
  547. "expected", types.Tx(tx).Key(),
  548. )
  549. if txmp.recheckCursor == txmp.recheckEnd {
  550. // we reached the end of the recheckTx list without finding a tx
  551. // matching the one we received from the ABCI application.
  552. // Return without processing any tx.
  553. txmp.recheckCursor = nil
  554. return
  555. }
  556. txmp.recheckCursor = txmp.recheckCursor.Next()
  557. wtx = txmp.recheckCursor.Value.(*WrappedTx)
  558. }
  559. // Only evaluate transactions that have not been removed. This can happen
  560. // if an existing transaction is evicted during CheckTx and while this
  561. // callback is being executed for the same evicted transaction.
  562. if !txmp.txStore.IsTxRemoved(wtx.hash) {
  563. var err error
  564. if txmp.postCheck != nil {
  565. err = txmp.postCheck(tx, checkTxRes.CheckTx)
  566. }
  567. if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil {
  568. wtx.priority = checkTxRes.CheckTx.Priority
  569. } else {
  570. txmp.logger.Debug(
  571. "existing transaction no longer valid; failed re-CheckTx callback",
  572. "priority", wtx.priority,
  573. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  574. "err", err,
  575. "code", checkTxRes.CheckTx.Code,
  576. )
  577. if wtx.gossipEl != txmp.recheckCursor {
  578. panic("corrupted reCheckTx cursor")
  579. }
  580. txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
  581. }
  582. }
  583. // move reCheckTx cursor to next element
  584. if txmp.recheckCursor == txmp.recheckEnd {
  585. txmp.recheckCursor = nil
  586. } else {
  587. txmp.recheckCursor = txmp.recheckCursor.Next()
  588. }
  589. if txmp.recheckCursor == nil {
  590. txmp.logger.Debug("finished rechecking transactions")
  591. if txmp.Size() > 0 {
  592. txmp.notifyTxsAvailable()
  593. }
  594. }
  595. txmp.metrics.Size.Set(float64(txmp.Size()))
  596. }
  597. // updateReCheckTxs updates the recheck cursors using the gossipIndex. For
  598. // each transaction, it executes CheckTxAsync. The global callback defined on
  599. // the proxyAppConn will be executed for each transaction after CheckTx is
  600. // executed.
  601. //
  602. // NOTE:
  603. // - The caller must have a write-lock when executing updateReCheckTxs.
  604. func (txmp *TxMempool) updateReCheckTxs(ctx context.Context) {
  605. if txmp.Size() == 0 {
  606. panic("attempted to update re-CheckTx txs when mempool is empty")
  607. }
  608. txmp.recheckCursor = txmp.gossipIndex.Front()
  609. txmp.recheckEnd = txmp.gossipIndex.Back()
  610. for e := txmp.gossipIndex.Front(); e != nil; e = e.Next() {
  611. wtx := e.Value.(*WrappedTx)
  612. // Only execute CheckTx if the transaction is not marked as removed which
  613. // could happen if the transaction was evicted.
  614. if !txmp.txStore.IsTxRemoved(wtx.hash) {
  615. _, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{
  616. Tx: wtx.tx,
  617. Type: abci.CheckTxType_Recheck,
  618. })
  619. if err != nil {
  620. // no need in retrying since the tx will be rechecked after the next block
  621. txmp.logger.Error("failed to execute CheckTx during rechecking", "err", err)
  622. }
  623. }
  624. }
  625. if _, err := txmp.proxyAppConn.FlushAsync(ctx); err != nil {
  626. txmp.logger.Error("failed to flush transactions during rechecking", "err", err)
  627. }
  628. }
  629. // canAddTx returns an error if we cannot insert the provided *WrappedTx into
  630. // the mempool due to mempool configured constraints. If it returns nil,
  631. // the transaction can be inserted into the mempool.
  632. func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error {
  633. var (
  634. numTxs = txmp.Size()
  635. sizeBytes = txmp.SizeBytes()
  636. )
  637. if numTxs >= txmp.config.Size || int64(wtx.Size())+sizeBytes > txmp.config.MaxTxsBytes {
  638. return types.ErrMempoolIsFull{
  639. NumTxs: numTxs,
  640. MaxTxs: txmp.config.Size,
  641. TxsBytes: sizeBytes,
  642. MaxTxsBytes: txmp.config.MaxTxsBytes,
  643. }
  644. }
  645. return nil
  646. }
  647. func (txmp *TxMempool) insertTx(wtx *WrappedTx) {
  648. txmp.txStore.SetTx(wtx)
  649. txmp.priorityIndex.PushTx(wtx)
  650. txmp.heightIndex.Insert(wtx)
  651. txmp.timestampIndex.Insert(wtx)
  652. // Insert the transaction into the gossip index and mark the reference to the
  653. // linked-list element, which will be needed at a later point when the
  654. // transaction is removed.
  655. gossipEl := txmp.gossipIndex.PushBack(wtx)
  656. wtx.gossipEl = gossipEl
  657. atomic.AddInt64(&txmp.sizeBytes, int64(wtx.Size()))
  658. }
  659. func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {
  660. if txmp.txStore.IsTxRemoved(wtx.hash) {
  661. return
  662. }
  663. txmp.txStore.RemoveTx(wtx)
  664. txmp.priorityIndex.RemoveTx(wtx)
  665. txmp.heightIndex.Remove(wtx)
  666. txmp.timestampIndex.Remove(wtx)
  667. // Remove the transaction from the gossip index and cleanup the linked-list
  668. // element so it can be garbage collected.
  669. txmp.gossipIndex.Remove(wtx.gossipEl)
  670. wtx.gossipEl.DetachPrev()
  671. atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size()))
  672. if removeFromCache {
  673. txmp.cache.Remove(wtx.tx)
  674. }
  675. }
  676. // purgeExpiredTxs removes all transactions that have exceeded their respective
  677. // height- and/or time-based TTLs from their respective indexes. Every expired
  678. // transaction will be removed from the mempool, but preserved in the cache.
  679. //
  680. // NOTE: purgeExpiredTxs must only be called during TxMempool#Update in which
  681. // the caller has a write-lock on the mempool and so we can safely iterate over
  682. // the height and time based indexes.
  683. func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
  684. now := time.Now()
  685. expiredTxs := make(map[types.TxKey]*WrappedTx)
  686. if txmp.config.TTLNumBlocks > 0 {
  687. purgeIdx := -1
  688. for i, wtx := range txmp.heightIndex.txs {
  689. if (blockHeight - wtx.height) > txmp.config.TTLNumBlocks {
  690. expiredTxs[wtx.tx.Key()] = wtx
  691. purgeIdx = i
  692. } else {
  693. // since the index is sorted, we know no other txs can be be purged
  694. break
  695. }
  696. }
  697. if purgeIdx >= 0 {
  698. txmp.heightIndex.txs = txmp.heightIndex.txs[purgeIdx+1:]
  699. }
  700. }
  701. if txmp.config.TTLDuration > 0 {
  702. purgeIdx := -1
  703. for i, wtx := range txmp.timestampIndex.txs {
  704. if now.Sub(wtx.timestamp) > txmp.config.TTLDuration {
  705. expiredTxs[wtx.tx.Key()] = wtx
  706. purgeIdx = i
  707. } else {
  708. // since the index is sorted, we know no other txs can be be purged
  709. break
  710. }
  711. }
  712. if purgeIdx >= 0 {
  713. txmp.timestampIndex.txs = txmp.timestampIndex.txs[purgeIdx+1:]
  714. }
  715. }
  716. for _, wtx := range expiredTxs {
  717. txmp.removeTx(wtx, false)
  718. }
  719. }
  720. func (txmp *TxMempool) notifyTxsAvailable() {
  721. if txmp.Size() == 0 {
  722. panic("attempt to notify txs available but mempool is empty!")
  723. }
  724. if txmp.txsAvailable != nil && !txmp.notifiedTxsAvailable {
  725. // channel cap is 1, so this will send once
  726. txmp.notifiedTxsAvailable = true
  727. select {
  728. case txmp.txsAvailable <- struct{}{}:
  729. default:
  730. }
  731. }
  732. }