You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

844 lines
25 KiB

7 years ago
mempool no gossip back (#2778) Closes #1798 This is done by making every mempool tx maintain a list of peers who its received the tx from. Instead of using the 20byte peer ID, it instead uses a local map from peerID to uint16 counter, so every peer adds 2 bytes. (Word aligned to probably make it 8 bytes) This also required resetting the callback function on every CheckTx. This likely has performance ramifications for instruction caching. The actual setting operation isn't costly with the removal of defers in this PR. * Make the mempool not gossip txs back to peers its received it from * Fix adversarial memleak * Don't break interface * Update changelog * Forgot to add a mtx * forgot a mutex * Update mempool/reactor.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Update mempool/mempool.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Use unknown peer ID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * fix compilation * use next wait chan logic when skipping * Minor fixes * Add TxInfo * Add reverse map * Make activeID's auto-reserve 0 * 0 -> UnknownPeerID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Switch to making the normal case set a callback on the reqres object The recheck case is still done via the global callback, and stats are also set via global callback * fix merge conflict * Addres comments * Add cache tests * add cache tests * minor fixes * update metrics in reqResCb and reformat code * goimport -w mempool/reactor.go * mempool: update memTx senders I had to introduce txsMap for quick mempoolTx lookups. * change senders type from []uint16 to sync.Map Fixes DATA RACE: ``` Read at 0x00c0013fcd3a by goroutine 183: github.com/tendermint/tendermint/mempool.(*MempoolReactor).broadcastTxRoutine() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:195 +0x3c7 Previous write at 0x00c0013fcd3a by D[2019-02-27|10:10:49.058] Read PacketMsg switch=3 peer=35bc1e3558c182927b31987eeff3feb3d58a0fc5@127.0.0.1 :46552 conn=MConn{pipe} packet="PacketMsg{30:2B06579D0A143EB78F3D3299DE8213A51D4E11FB05ACE4D6A14F T:1}" goroutine 190: github.com/tendermint/tendermint/mempool.(*Mempool).CheckTxWithInfo() /go/src/github.com/tendermint/tendermint/mempool/mempool.go:387 +0xdc1 github.com/tendermint/tendermint/mempool.(*MempoolReactor).Receive() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:134 +0xb04 github.com/tendermint/tendermint/p2p.createMConnection.func1() /go/src/github.com/tendermint/tendermint/p2p/peer.go:374 +0x25b github.com/tendermint/tendermint/p2p/conn.(*MConnection).recvRoutine() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:599 +0xcce Goroutine 183 (running) created at: D[2019-02-27|10:10:49.058] Send switch=2 peer=1efafad5443abeea4b7a8155218e4369525d987e@127.0.0.1:46193 channel=48 conn=MConn{pipe} m sgBytes=2B06579D0A146194480ADAE00C2836ED7125FEE65C1D9DD51049 github.com/tendermint/tendermint/mempool.(*MempoolReactor).AddPeer() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:105 +0x1b1 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:683 +0x13b github.com/tendermint/tendermint/p2p.(*Switch).addPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:650 +0x585 github.com/tendermint/tendermint/p2p.(*Switch).addPeerWithConnection() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:145 +0x939 github.com/tendermint/tendermint/p2p.Connect2Switches.func2() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:109 +0x50 I[2019-02-27|10:10:49.058] Added good transaction validator=0 tx=43B4D1F0F03460BD262835C4AA560DB860CFBBE85BD02386D83DAC38C67B3AD7 res="&{CheckTx:gas_w anted:1 }" height=0 total=375 Goroutine 190 (running) created at: github.com/tendermint/tendermint/p2p/conn.(*MConnection).OnStart() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:210 +0x313 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).OnStart() /go/src/github.com/tendermint/tendermint/p2p/peer.go:179 +0x56 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).Start() <autogenerated>:1 +0x43 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() ``` * explain the choice of a map DS for senders * extract ids pool/mapper to a separate struct * fix literal copies lock value from senders: sync.Map contains sync.Mutex * use sync.Map#LoadOrStore instead of Load * fixes after Ismail's review * rename resCbNormal to resCbFirstTime
5 years ago
8 years ago
abci: Synchronize FinalizeBlock with the updated specification (#7983) This change set implements the most recent version of `FinalizeBlock`. # What does this change actually contain? * This change set is rather large but fear not! The majority of the files touched and changes are renaming `ResponseDeliverTx` to `ExecTxResult`. This should be a pretty inoffensive change since they're effectively the same type but with a different name. * The `execBlockOnProxyApp` was totally removed since it served as just a wrapper around the logic that is now mostly encapsulated within `FinalizeBlock` * The `updateState` helper function has been made a public method on `State`. It was being exposed as a shim through the testing infrastructure, so this seemed innocuous. * Tests already existed to ensure that the application received the `ByzantineValidators` and the `ValidatorUpdates`, but one was fixed up to ensure that `LastCommitInfo` was being sent across. * Tests were removed from the `psql` indexer that seemed to search for an event in the indexer that was not being created. # Questions for reviewers * We store this [ABCIResponses](https://github.com/tendermint/tendermint/blob/5721a13ab1f4479f9807f449f0bf5c536b9a05f2/proto/tendermint/state/types.pb.go#L37) type in the data base as the block results. This type has changed since v0.35 to contain the `FinalizeBlock` response. I'm wondering if we need to do any shimming to keep the old data retrieveable? * Similarly, this change is exposed via the RPC through [ResultBlockResults](https://github.com/tendermint/tendermint/blob/5721a13ab1f4479f9807f449f0bf5c536b9a05f2/rpc/coretypes/responses.go#L69) changing. Should we somehow shim or notify for this change? closes: #7658
2 years ago
abci: Synchronize FinalizeBlock with the updated specification (#7983) This change set implements the most recent version of `FinalizeBlock`. # What does this change actually contain? * This change set is rather large but fear not! The majority of the files touched and changes are renaming `ResponseDeliverTx` to `ExecTxResult`. This should be a pretty inoffensive change since they're effectively the same type but with a different name. * The `execBlockOnProxyApp` was totally removed since it served as just a wrapper around the logic that is now mostly encapsulated within `FinalizeBlock` * The `updateState` helper function has been made a public method on `State`. It was being exposed as a shim through the testing infrastructure, so this seemed innocuous. * Tests already existed to ensure that the application received the `ByzantineValidators` and the `ValidatorUpdates`, but one was fixed up to ensure that `LastCommitInfo` was being sent across. * Tests were removed from the `psql` indexer that seemed to search for an event in the indexer that was not being created. # Questions for reviewers * We store this [ABCIResponses](https://github.com/tendermint/tendermint/blob/5721a13ab1f4479f9807f449f0bf5c536b9a05f2/proto/tendermint/state/types.pb.go#L37) type in the data base as the block results. This type has changed since v0.35 to contain the `FinalizeBlock` response. I'm wondering if we need to do any shimming to keep the old data retrieveable? * Similarly, this change is exposed via the RPC through [ResultBlockResults](https://github.com/tendermint/tendermint/blob/5721a13ab1f4479f9807f449f0bf5c536b9a05f2/rpc/coretypes/responses.go#L69) changing. Should we somehow shim or notify for this change? closes: #7658
2 years ago
  1. package mempool
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. abci "github.com/tendermint/tendermint/abci/types"
  11. "github.com/tendermint/tendermint/config"
  12. "github.com/tendermint/tendermint/internal/libs/clist"
  13. "github.com/tendermint/tendermint/internal/proxy"
  14. "github.com/tendermint/tendermint/libs/log"
  15. tmmath "github.com/tendermint/tendermint/libs/math"
  16. "github.com/tendermint/tendermint/types"
  17. )
  18. var _ Mempool = (*TxMempool)(nil)
  19. // TxMempoolOption sets an optional parameter on the TxMempool.
  20. type TxMempoolOption func(*TxMempool)
  21. // TxMempool defines a prioritized mempool data structure used by the v1 mempool
  22. // reactor. It keeps a thread-safe priority queue of transactions that is used
  23. // when a block proposer constructs a block and a thread-safe linked-list that
  24. // is used to gossip transactions to peers in a FIFO manner.
  25. type TxMempool struct {
  26. logger log.Logger
  27. metrics *Metrics
  28. config *config.MempoolConfig
  29. proxyAppConn proxy.AppConnMempool
  30. // txsAvailable fires once for each height when the mempool is not empty
  31. txsAvailable chan struct{}
  32. notifiedTxsAvailable bool
  33. // height defines the last block height process during Update()
  34. height int64
  35. // sizeBytes defines the total size of the mempool (sum of all tx bytes)
  36. sizeBytes int64
  37. // cache defines a fixed-size cache of already seen transactions as this
  38. // reduces pressure on the proxyApp.
  39. cache TxCache
  40. // txStore defines the main storage of valid transactions. Indexes are built
  41. // on top of this store.
  42. txStore *TxStore
  43. // gossipIndex defines the gossiping index of valid transactions via a
  44. // thread-safe linked-list. We also use the gossip index as a cursor for
  45. // rechecking transactions already in the mempool.
  46. gossipIndex *clist.CList
  47. // recheckCursor and recheckEnd are used as cursors based on the gossip index
  48. // to recheck transactions that are already in the mempool. Iteration is not
  49. // thread-safe and transaction may be mutated in serial order.
  50. //
  51. // XXX/TODO: It might be somewhat of a codesmell to use the gossip index for
  52. // iterator and cursor management when rechecking transactions. If the gossip
  53. // index changes or is removed in a future refactor, this will have to be
  54. // refactored. Instead, we should consider just keeping a slice of a snapshot
  55. // of the mempool's current transactions during Update and an integer cursor
  56. // into that slice. This, however, requires additional O(n) space complexity.
  57. recheckCursor *clist.CElement // next expected response
  58. recheckEnd *clist.CElement // re-checking stops here
  59. // priorityIndex defines the priority index of valid transactions via a
  60. // thread-safe priority queue.
  61. priorityIndex *TxPriorityQueue
  62. // heightIndex defines a height-based, in ascending order, transaction index.
  63. // i.e. older transactions are first.
  64. heightIndex *WrappedTxList
  65. // timestampIndex defines a timestamp-based, in ascending order, transaction
  66. // index. i.e. older transactions are first.
  67. timestampIndex *WrappedTxList
  68. // A read/write lock is used to safe guard updates, insertions and deletions
  69. // from the mempool. A read-lock is implicitly acquired when executing CheckTx,
  70. // however, a caller must explicitly grab a write-lock via Lock when updating
  71. // the mempool via Update().
  72. mtx sync.RWMutex
  73. preCheck PreCheckFunc
  74. postCheck PostCheckFunc
  75. }
  76. func NewTxMempool(
  77. logger log.Logger,
  78. cfg *config.MempoolConfig,
  79. proxyAppConn proxy.AppConnMempool,
  80. options ...TxMempoolOption,
  81. ) *TxMempool {
  82. txmp := &TxMempool{
  83. logger: logger,
  84. config: cfg,
  85. proxyAppConn: proxyAppConn,
  86. height: -1,
  87. cache: NopTxCache{},
  88. metrics: NopMetrics(),
  89. txStore: NewTxStore(),
  90. gossipIndex: clist.New(),
  91. priorityIndex: NewTxPriorityQueue(),
  92. heightIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
  93. return wtx1.height >= wtx2.height
  94. }),
  95. timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
  96. return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp)
  97. }),
  98. }
  99. if cfg.CacheSize > 0 {
  100. txmp.cache = NewLRUTxCache(cfg.CacheSize)
  101. }
  102. for _, opt := range options {
  103. opt(txmp)
  104. }
  105. return txmp
  106. }
  107. // WithPreCheck sets a filter for the mempool to reject a transaction if f(tx)
  108. // returns an error. This is executed before CheckTx. It only applies to the
  109. // first created block. After that, Update() overwrites the existing value.
  110. func WithPreCheck(f PreCheckFunc) TxMempoolOption {
  111. return func(txmp *TxMempool) { txmp.preCheck = f }
  112. }
  113. // WithPostCheck sets a filter for the mempool to reject a transaction if
  114. // f(tx, resp) returns an error. This is executed after CheckTx. It only applies
  115. // to the first created block. After that, Update overwrites the existing value.
  116. func WithPostCheck(f PostCheckFunc) TxMempoolOption {
  117. return func(txmp *TxMempool) { txmp.postCheck = f }
  118. }
  119. // WithMetrics sets the mempool's metrics collector.
  120. func WithMetrics(metrics *Metrics) TxMempoolOption {
  121. return func(txmp *TxMempool) { txmp.metrics = metrics }
  122. }
  123. // Lock obtains a write-lock on the mempool. A caller must be sure to explicitly
  124. // release the lock when finished.
  125. func (txmp *TxMempool) Lock() {
  126. txmp.mtx.Lock()
  127. }
  128. // Unlock releases a write-lock on the mempool.
  129. func (txmp *TxMempool) Unlock() {
  130. txmp.mtx.Unlock()
  131. }
  132. // Size returns the number of valid transactions in the mempool. It is
  133. // thread-safe.
  134. func (txmp *TxMempool) Size() int {
  135. return txmp.txStore.Size()
  136. }
  137. // SizeBytes return the total sum in bytes of all the valid transactions in the
  138. // mempool. It is thread-safe.
  139. func (txmp *TxMempool) SizeBytes() int64 {
  140. return atomic.LoadInt64(&txmp.sizeBytes)
  141. }
  142. // FlushAppConn executes FlushSync on the mempool's proxyAppConn.
  143. //
  144. // NOTE: The caller must obtain a write-lock prior to execution.
  145. func (txmp *TxMempool) FlushAppConn(ctx context.Context) error {
  146. return txmp.proxyAppConn.Flush(ctx)
  147. }
  148. // WaitForNextTx returns a blocking channel that will be closed when the next
  149. // valid transaction is available to gossip. It is thread-safe.
  150. func (txmp *TxMempool) WaitForNextTx() <-chan struct{} {
  151. return txmp.gossipIndex.WaitChan()
  152. }
  153. // NextGossipTx returns the next valid transaction to gossip. A caller must wait
  154. // for WaitForNextTx to signal a transaction is available to gossip first. It is
  155. // thread-safe.
  156. func (txmp *TxMempool) NextGossipTx() *clist.CElement {
  157. return txmp.gossipIndex.Front()
  158. }
  159. // EnableTxsAvailable enables the mempool to trigger events when transactions
  160. // are available on a block by block basis.
  161. func (txmp *TxMempool) EnableTxsAvailable() {
  162. txmp.mtx.Lock()
  163. defer txmp.mtx.Unlock()
  164. txmp.txsAvailable = make(chan struct{}, 1)
  165. }
  166. // TxsAvailable returns a channel which fires once for every height, and only
  167. // when transactions are available in the mempool. It is thread-safe.
  168. func (txmp *TxMempool) TxsAvailable() <-chan struct{} {
  169. return txmp.txsAvailable
  170. }
  171. // CheckTx executes the ABCI CheckTx method for a given transaction.
  172. // It acquires a read-lock and attempts to execute the application's
  173. // CheckTx ABCI method synchronously. We return an error if any of
  174. // the following happen:
  175. //
  176. // - The CheckTx execution fails.
  177. // - The transaction already exists in the cache and we've already received the
  178. // transaction from the peer. Otherwise, if it solely exists in the cache, we
  179. // return nil.
  180. // - The transaction size exceeds the maximum transaction size as defined by the
  181. // configuration provided to the mempool.
  182. // - The transaction fails Pre-Check (if it is defined).
  183. // - The proxyAppConn fails, e.g. the buffer is full.
  184. //
  185. // If the mempool is full, we still execute CheckTx and attempt to find a lower
  186. // priority transaction to evict. If such a transaction exists, we remove the
  187. // lower priority transaction and add the new one with higher priority.
  188. //
  189. // NOTE:
  190. // - The applications' CheckTx implementation may panic.
  191. // - The caller is not to explicitly require any locks for executing CheckTx.
  192. func (txmp *TxMempool) CheckTx(
  193. ctx context.Context,
  194. tx types.Tx,
  195. cb func(*abci.ResponseCheckTx),
  196. txInfo TxInfo,
  197. ) error {
  198. txmp.mtx.RLock()
  199. defer txmp.mtx.RUnlock()
  200. if txSize := len(tx); txSize > txmp.config.MaxTxBytes {
  201. return types.ErrTxTooLarge{
  202. Max: txmp.config.MaxTxBytes,
  203. Actual: txSize,
  204. }
  205. }
  206. if txmp.preCheck != nil {
  207. if err := txmp.preCheck(tx); err != nil {
  208. return types.ErrPreCheck{Reason: err}
  209. }
  210. }
  211. if err := txmp.proxyAppConn.Error(); err != nil {
  212. return err
  213. }
  214. txHash := tx.Key()
  215. // We add the transaction to the mempool's cache and if the
  216. // transaction is already present in the cache, i.e. false is returned, then we
  217. // check if we've seen this transaction and error if we have.
  218. if !txmp.cache.Push(tx) {
  219. txmp.txStore.GetOrSetPeerByTxHash(txHash, txInfo.SenderID)
  220. return types.ErrTxInCache
  221. }
  222. res, err := txmp.proxyAppConn.CheckTx(ctx, abci.RequestCheckTx{Tx: tx})
  223. if err != nil {
  224. txmp.cache.Remove(tx)
  225. return err
  226. }
  227. if txmp.recheckCursor != nil {
  228. return errors.New("recheck cursor is non-nil")
  229. }
  230. wtx := &WrappedTx{
  231. tx: tx,
  232. hash: txHash,
  233. timestamp: time.Now().UTC(),
  234. height: txmp.height,
  235. }
  236. txmp.defaultTxCallback(tx, res)
  237. txmp.initTxCallback(wtx, res, txInfo)
  238. if cb != nil {
  239. cb(res)
  240. }
  241. return nil
  242. }
  243. func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error {
  244. txmp.Lock()
  245. defer txmp.Unlock()
  246. // remove the committed transaction from the transaction store and indexes
  247. if wtx := txmp.txStore.GetTxByHash(txKey); wtx != nil {
  248. txmp.removeTx(wtx, false)
  249. return nil
  250. }
  251. return errors.New("transaction not found")
  252. }
  253. // Flush empties the mempool. It acquires a read-lock, fetches all the
  254. // transactions currently in the transaction store and removes each transaction
  255. // from the store and all indexes and finally resets the cache.
  256. //
  257. // NOTE:
  258. // - Flushing the mempool may leave the mempool in an inconsistent state.
  259. func (txmp *TxMempool) Flush() {
  260. txmp.mtx.RLock()
  261. defer txmp.mtx.RUnlock()
  262. txmp.heightIndex.Reset()
  263. txmp.timestampIndex.Reset()
  264. for _, wtx := range txmp.txStore.GetAllTxs() {
  265. txmp.removeTx(wtx, false)
  266. }
  267. atomic.SwapInt64(&txmp.sizeBytes, 0)
  268. txmp.cache.Reset()
  269. }
  270. // ReapMaxBytesMaxGas returns a list of transactions within the provided size
  271. // and gas constraints. Transaction are retrieved in priority order.
  272. //
  273. // NOTE:
  274. // - Transactions returned are not removed from the mempool transaction
  275. // store or indexes.
  276. func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
  277. txmp.mtx.RLock()
  278. defer txmp.mtx.RUnlock()
  279. var (
  280. totalGas int64
  281. totalSize int64
  282. )
  283. // wTxs contains a list of *WrappedTx retrieved from the priority queue that
  284. // need to be re-enqueued prior to returning.
  285. wTxs := make([]*WrappedTx, 0, txmp.priorityIndex.NumTxs())
  286. defer func() {
  287. for _, wtx := range wTxs {
  288. txmp.priorityIndex.PushTx(wtx)
  289. }
  290. }()
  291. txs := make([]types.Tx, 0, txmp.priorityIndex.NumTxs())
  292. for txmp.priorityIndex.NumTxs() > 0 {
  293. wtx := txmp.priorityIndex.PopTx()
  294. txs = append(txs, wtx.tx)
  295. wTxs = append(wTxs, wtx)
  296. size := types.ComputeProtoSizeForTxs([]types.Tx{wtx.tx})
  297. // Ensure we have capacity for the transaction with respect to the
  298. // transaction size.
  299. if maxBytes > -1 && totalSize+size > maxBytes {
  300. return txs[:len(txs)-1]
  301. }
  302. totalSize += size
  303. // ensure we have capacity for the transaction with respect to total gas
  304. gas := totalGas + wtx.gasWanted
  305. if maxGas > -1 && gas > maxGas {
  306. return txs[:len(txs)-1]
  307. }
  308. totalGas = gas
  309. }
  310. return txs
  311. }
  312. // ReapMaxTxs returns a list of transactions within the provided number of
  313. // transactions bound. Transaction are retrieved in priority order.
  314. //
  315. // NOTE:
  316. // - Transactions returned are not removed from the mempool transaction
  317. // store or indexes.
  318. func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs {
  319. txmp.mtx.RLock()
  320. defer txmp.mtx.RUnlock()
  321. numTxs := txmp.priorityIndex.NumTxs()
  322. if max < 0 {
  323. max = numTxs
  324. }
  325. cap := tmmath.MinInt(numTxs, max)
  326. // wTxs contains a list of *WrappedTx retrieved from the priority queue that
  327. // need to be re-enqueued prior to returning.
  328. wTxs := make([]*WrappedTx, 0, cap)
  329. txs := make([]types.Tx, 0, cap)
  330. for txmp.priorityIndex.NumTxs() > 0 && len(txs) < max {
  331. wtx := txmp.priorityIndex.PopTx()
  332. txs = append(txs, wtx.tx)
  333. wTxs = append(wTxs, wtx)
  334. }
  335. for _, wtx := range wTxs {
  336. txmp.priorityIndex.PushTx(wtx)
  337. }
  338. return txs
  339. }
  340. // Update iterates over all the transactions provided by the block producer,
  341. // removes them from the cache (if applicable), and removes
  342. // the transactions from the main transaction store and associated indexes.
  343. // If there are transactions remaining in the mempool, we initiate a
  344. // re-CheckTx for them (if applicable), otherwise, we notify the caller more
  345. // transactions are available.
  346. //
  347. // NOTE:
  348. // - The caller must explicitly acquire a write-lock.
  349. func (txmp *TxMempool) Update(
  350. ctx context.Context,
  351. blockHeight int64,
  352. blockTxs types.Txs,
  353. execTxResult []*abci.ExecTxResult,
  354. newPreFn PreCheckFunc,
  355. newPostFn PostCheckFunc,
  356. ) error {
  357. txmp.height = blockHeight
  358. txmp.notifiedTxsAvailable = false
  359. if newPreFn != nil {
  360. txmp.preCheck = newPreFn
  361. }
  362. if newPostFn != nil {
  363. txmp.postCheck = newPostFn
  364. }
  365. for i, tx := range blockTxs {
  366. if execTxResult[i].Code == abci.CodeTypeOK {
  367. // add the valid committed transaction to the cache (if missing)
  368. _ = txmp.cache.Push(tx)
  369. } else if !txmp.config.KeepInvalidTxsInCache {
  370. // allow invalid transactions to be re-submitted
  371. txmp.cache.Remove(tx)
  372. }
  373. // remove the committed transaction from the transaction store and indexes
  374. if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil {
  375. txmp.removeTx(wtx, false)
  376. }
  377. }
  378. txmp.purgeExpiredTxs(blockHeight)
  379. // If there any uncommitted transactions left in the mempool, we either
  380. // initiate re-CheckTx per remaining transaction or notify that remaining
  381. // transactions are left.
  382. if txmp.Size() > 0 {
  383. if txmp.config.Recheck {
  384. txmp.logger.Debug(
  385. "executing re-CheckTx for all remaining transactions",
  386. "num_txs", txmp.Size(),
  387. "height", blockHeight,
  388. )
  389. txmp.updateReCheckTxs(ctx)
  390. } else {
  391. txmp.notifyTxsAvailable()
  392. }
  393. }
  394. txmp.metrics.Size.Set(float64(txmp.Size()))
  395. return nil
  396. }
  397. // initTxCallback is the callback invoked for a new unique transaction after CheckTx
  398. // has been executed by the ABCI application for the first time on that transaction.
  399. // CheckTx can be called again for the same transaction later when re-checking;
  400. // however, this callback will not be called.
  401. //
  402. // initTxCallback runs after the ABCI application executes CheckTx.
  403. // It runs the postCheck hook if one is defined on the mempool.
  404. // If the CheckTx response response code is not OK, or if the postCheck hook
  405. // reports an error, the transaction is rejected. Otherwise, we attempt to insert
  406. // the transaction into the mempool.
  407. //
  408. // When inserting a transaction, we first check if there is sufficient capacity.
  409. // If there is, the transaction is added to the txStore and all indexes.
  410. // Otherwise, if the mempool is full, we attempt to find a lower priority transaction
  411. // to evict in place of the new incoming transaction. If no such transaction exists,
  412. // the new incoming transaction is rejected.
  413. //
  414. // NOTE:
  415. // - An explicit lock is NOT required.
  416. func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.ResponseCheckTx, txInfo TxInfo) {
  417. var err error
  418. if txmp.postCheck != nil {
  419. err = txmp.postCheck(wtx.tx, res)
  420. }
  421. if err != nil || res.Code != abci.CodeTypeOK {
  422. // ignore bad transactions
  423. txmp.logger.Info(
  424. "rejected bad transaction",
  425. "priority", wtx.priority,
  426. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  427. "peer_id", txInfo.SenderNodeID,
  428. "code", res.Code,
  429. "post_check_err", err,
  430. )
  431. txmp.metrics.FailedTxs.Add(1)
  432. if !txmp.config.KeepInvalidTxsInCache {
  433. txmp.cache.Remove(wtx.tx)
  434. }
  435. if err != nil {
  436. res.MempoolError = err.Error()
  437. }
  438. return
  439. }
  440. sender := res.Sender
  441. priority := res.Priority
  442. if len(sender) > 0 {
  443. if wtx := txmp.txStore.GetTxBySender(sender); wtx != nil {
  444. txmp.logger.Error(
  445. "rejected incoming good transaction; tx already exists for sender",
  446. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  447. "sender", sender,
  448. )
  449. txmp.metrics.RejectedTxs.Add(1)
  450. return
  451. }
  452. }
  453. if err := txmp.canAddTx(wtx); err != nil {
  454. evictTxs := txmp.priorityIndex.GetEvictableTxs(
  455. priority,
  456. int64(wtx.Size()),
  457. txmp.SizeBytes(),
  458. txmp.config.MaxTxsBytes,
  459. )
  460. if len(evictTxs) == 0 {
  461. // No room for the new incoming transaction so we just remove it from
  462. // the cache.
  463. txmp.cache.Remove(wtx.tx)
  464. txmp.logger.Error(
  465. "rejected incoming good transaction; mempool full",
  466. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  467. "err", err.Error(),
  468. )
  469. txmp.metrics.RejectedTxs.Add(1)
  470. return
  471. }
  472. // evict an existing transaction(s)
  473. //
  474. // NOTE:
  475. // - The transaction, toEvict, can be removed while a concurrent
  476. // reCheckTx callback is being executed for the same transaction.
  477. for _, toEvict := range evictTxs {
  478. txmp.removeTx(toEvict, true)
  479. txmp.logger.Debug(
  480. "evicted existing good transaction; mempool full",
  481. "old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()),
  482. "old_priority", toEvict.priority,
  483. "new_tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  484. "new_priority", wtx.priority,
  485. )
  486. txmp.metrics.EvictedTxs.Add(1)
  487. }
  488. }
  489. wtx.gasWanted = res.GasWanted
  490. wtx.priority = priority
  491. wtx.sender = sender
  492. wtx.peers = map[uint16]struct{}{
  493. txInfo.SenderID: {},
  494. }
  495. txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size()))
  496. txmp.metrics.Size.Set(float64(txmp.Size()))
  497. txmp.insertTx(wtx)
  498. txmp.logger.Debug(
  499. "inserted good transaction",
  500. "priority", wtx.priority,
  501. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  502. "height", txmp.height,
  503. "num_txs", txmp.Size(),
  504. )
  505. txmp.notifyTxsAvailable()
  506. }
  507. // defaultTxCallback is the CheckTx application callback used when a
  508. // transaction is being re-checked (if re-checking is enabled). The
  509. // caller must hold a mempool write-lock (via Lock()) and when
  510. // executing Update(), if the mempool is non-empty and Recheck is
  511. // enabled, then all remaining transactions will be rechecked via
  512. // CheckTx. The order transactions are rechecked must be the same as
  513. // the order in which this callback is called.
  514. func (txmp *TxMempool) defaultTxCallback(tx types.Tx, res *abci.ResponseCheckTx) {
  515. if txmp.recheckCursor == nil {
  516. return
  517. }
  518. txmp.metrics.RecheckTimes.Add(1)
  519. wtx := txmp.recheckCursor.Value.(*WrappedTx)
  520. // Search through the remaining list of tx to recheck for a transaction that matches
  521. // the one we received from the ABCI application.
  522. for {
  523. if bytes.Equal(tx, wtx.tx) {
  524. // We've found a tx in the recheck list that matches the tx that we
  525. // received from the ABCI application.
  526. // Break, and use this transaction for further checks.
  527. break
  528. }
  529. txmp.logger.Error(
  530. "re-CheckTx transaction mismatch",
  531. "got", wtx.tx.Hash(),
  532. "expected", tx.Key(),
  533. )
  534. if txmp.recheckCursor == txmp.recheckEnd {
  535. // we reached the end of the recheckTx list without finding a tx
  536. // matching the one we received from the ABCI application.
  537. // Return without processing any tx.
  538. txmp.recheckCursor = nil
  539. return
  540. }
  541. txmp.recheckCursor = txmp.recheckCursor.Next()
  542. wtx = txmp.recheckCursor.Value.(*WrappedTx)
  543. }
  544. // Only evaluate transactions that have not been removed. This can happen
  545. // if an existing transaction is evicted during CheckTx and while this
  546. // callback is being executed for the same evicted transaction.
  547. if !txmp.txStore.IsTxRemoved(wtx.hash) {
  548. var err error
  549. if txmp.postCheck != nil {
  550. err = txmp.postCheck(tx, res)
  551. }
  552. if res.Code == abci.CodeTypeOK && err == nil {
  553. wtx.priority = res.Priority
  554. } else {
  555. txmp.logger.Debug(
  556. "existing transaction no longer valid; failed re-CheckTx callback",
  557. "priority", wtx.priority,
  558. "tx", fmt.Sprintf("%X", wtx.tx.Hash()),
  559. "err", err,
  560. "code", res.Code,
  561. )
  562. if wtx.gossipEl != txmp.recheckCursor {
  563. panic("corrupted reCheckTx cursor")
  564. }
  565. txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
  566. }
  567. }
  568. // move reCheckTx cursor to next element
  569. if txmp.recheckCursor == txmp.recheckEnd {
  570. txmp.recheckCursor = nil
  571. } else {
  572. txmp.recheckCursor = txmp.recheckCursor.Next()
  573. }
  574. if txmp.recheckCursor == nil {
  575. txmp.logger.Debug("finished rechecking transactions")
  576. if txmp.Size() > 0 {
  577. txmp.notifyTxsAvailable()
  578. }
  579. }
  580. txmp.metrics.Size.Set(float64(txmp.Size()))
  581. }
  582. // updateReCheckTxs updates the recheck cursors using the gossipIndex. For
  583. // each transaction, it executes CheckTx. The global callback defined on
  584. // the proxyAppConn will be executed for each transaction after CheckTx is
  585. // executed.
  586. //
  587. // NOTE:
  588. // - The caller must have a write-lock when executing updateReCheckTxs.
  589. func (txmp *TxMempool) updateReCheckTxs(ctx context.Context) {
  590. if txmp.Size() == 0 {
  591. panic("attempted to update re-CheckTx txs when mempool is empty")
  592. }
  593. txmp.recheckCursor = txmp.gossipIndex.Front()
  594. txmp.recheckEnd = txmp.gossipIndex.Back()
  595. for e := txmp.gossipIndex.Front(); e != nil; e = e.Next() {
  596. wtx := e.Value.(*WrappedTx)
  597. // Only execute CheckTx if the transaction is not marked as removed which
  598. // could happen if the transaction was evicted.
  599. if !txmp.txStore.IsTxRemoved(wtx.hash) {
  600. res, err := txmp.proxyAppConn.CheckTx(ctx, abci.RequestCheckTx{
  601. Tx: wtx.tx,
  602. Type: abci.CheckTxType_Recheck,
  603. })
  604. if err != nil {
  605. // no need in retrying since the tx will be rechecked after the next block
  606. txmp.logger.Error("failed to execute CheckTx during rechecking", "err", err)
  607. continue
  608. }
  609. txmp.defaultTxCallback(wtx.tx, res)
  610. }
  611. }
  612. if err := txmp.proxyAppConn.Flush(ctx); err != nil {
  613. txmp.logger.Error("failed to flush transactions during rechecking", "err", err)
  614. }
  615. }
  616. // canAddTx returns an error if we cannot insert the provided *WrappedTx into
  617. // the mempool due to mempool configured constraints. If it returns nil,
  618. // the transaction can be inserted into the mempool.
  619. func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error {
  620. var (
  621. numTxs = txmp.Size()
  622. sizeBytes = txmp.SizeBytes()
  623. )
  624. if numTxs >= txmp.config.Size || int64(wtx.Size())+sizeBytes > txmp.config.MaxTxsBytes {
  625. return types.ErrMempoolIsFull{
  626. NumTxs: numTxs,
  627. MaxTxs: txmp.config.Size,
  628. TxsBytes: sizeBytes,
  629. MaxTxsBytes: txmp.config.MaxTxsBytes,
  630. }
  631. }
  632. return nil
  633. }
  634. func (txmp *TxMempool) insertTx(wtx *WrappedTx) {
  635. txmp.txStore.SetTx(wtx)
  636. txmp.priorityIndex.PushTx(wtx)
  637. txmp.heightIndex.Insert(wtx)
  638. txmp.timestampIndex.Insert(wtx)
  639. // Insert the transaction into the gossip index and mark the reference to the
  640. // linked-list element, which will be needed at a later point when the
  641. // transaction is removed.
  642. gossipEl := txmp.gossipIndex.PushBack(wtx)
  643. wtx.gossipEl = gossipEl
  644. atomic.AddInt64(&txmp.sizeBytes, int64(wtx.Size()))
  645. }
  646. func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {
  647. if txmp.txStore.IsTxRemoved(wtx.hash) {
  648. return
  649. }
  650. txmp.txStore.RemoveTx(wtx)
  651. txmp.priorityIndex.RemoveTx(wtx)
  652. txmp.heightIndex.Remove(wtx)
  653. txmp.timestampIndex.Remove(wtx)
  654. // Remove the transaction from the gossip index and cleanup the linked-list
  655. // element so it can be garbage collected.
  656. txmp.gossipIndex.Remove(wtx.gossipEl)
  657. wtx.gossipEl.DetachPrev()
  658. atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size()))
  659. if removeFromCache {
  660. txmp.cache.Remove(wtx.tx)
  661. }
  662. }
  663. // purgeExpiredTxs removes all transactions that have exceeded their respective
  664. // height- and/or time-based TTLs from their respective indexes. Every expired
  665. // transaction will be removed from the mempool, but preserved in the cache.
  666. //
  667. // NOTE: purgeExpiredTxs must only be called during TxMempool#Update in which
  668. // the caller has a write-lock on the mempool and so we can safely iterate over
  669. // the height and time based indexes.
  670. func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
  671. now := time.Now()
  672. expiredTxs := make(map[types.TxKey]*WrappedTx)
  673. if txmp.config.TTLNumBlocks > 0 {
  674. purgeIdx := -1
  675. for i, wtx := range txmp.heightIndex.txs {
  676. if (blockHeight - wtx.height) > txmp.config.TTLNumBlocks {
  677. expiredTxs[wtx.tx.Key()] = wtx
  678. purgeIdx = i
  679. } else {
  680. // since the index is sorted, we know no other txs can be be purged
  681. break
  682. }
  683. }
  684. if purgeIdx >= 0 {
  685. txmp.heightIndex.txs = txmp.heightIndex.txs[purgeIdx+1:]
  686. }
  687. }
  688. if txmp.config.TTLDuration > 0 {
  689. purgeIdx := -1
  690. for i, wtx := range txmp.timestampIndex.txs {
  691. if now.Sub(wtx.timestamp) > txmp.config.TTLDuration {
  692. expiredTxs[wtx.tx.Key()] = wtx
  693. purgeIdx = i
  694. } else {
  695. // since the index is sorted, we know no other txs can be be purged
  696. break
  697. }
  698. }
  699. if purgeIdx >= 0 {
  700. txmp.timestampIndex.txs = txmp.timestampIndex.txs[purgeIdx+1:]
  701. }
  702. }
  703. for _, wtx := range expiredTxs {
  704. txmp.removeTx(wtx, false)
  705. }
  706. }
  707. func (txmp *TxMempool) notifyTxsAvailable() {
  708. if txmp.Size() == 0 {
  709. panic("attempt to notify txs available but mempool is empty!")
  710. }
  711. if txmp.txsAvailable != nil && !txmp.notifiedTxsAvailable {
  712. // channel cap is 1, so this will send once
  713. txmp.notifiedTxsAvailable = true
  714. select {
  715. case txmp.txsAvailable <- struct{}{}:
  716. default:
  717. }
  718. }
  719. }