diff --git a/mempool/mempool.go b/mempool/mempool.go index 0bdb47140..136f7abf8 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -534,12 +534,6 @@ func (mem *Mempool) Update( preCheck PreCheckFunc, postCheck PostCheckFunc, ) error { - // First, create a lookup map of txns in new txs. - txsMap := make(map[string]struct{}, len(txs)) - for _, tx := range txs { - txsMap[string(tx)] = struct{}{} - } - // Set height mem.height = height mem.notifiedTxsAvailable = false @@ -551,12 +545,13 @@ func (mem *Mempool) Update( mem.postCheck = postCheck } - // Remove transactions that are already in txs. - goodTxs := mem.filterTxs(txsMap) + // Remove committed transactions. + txsLeft := mem.removeTxs(txs) + // Recheck mempool txs if any txs were committed in the block - if mem.config.Recheck && len(goodTxs) > 0 { - mem.logger.Info("Recheck txs", "numtxs", len(goodTxs), "height", height) - mem.recheckTxs(goodTxs) + if mem.config.Recheck && len(txsLeft) > 0 { + mem.logger.Info("Recheck txs", "numtxs", len(txsLeft), "height", height) + mem.recheckTxs(txsLeft) // At this point, mem.txs are being rechecked. // mem.recheckCursor re-scans mem.txs and possibly removes some txs. // Before mem.Reap(), we should wait for mem.recheckCursor to be nil. @@ -568,12 +563,18 @@ func (mem *Mempool) Update( return nil } -func (mem *Mempool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx { - goodTxs := make([]types.Tx, 0, mem.txs.Len()) +func (mem *Mempool) removeTxs(txs types.Txs) []types.Tx { + // Build a map for faster lookups. + txsMap := make(map[string]struct{}, len(txs)) + for _, tx := range txs { + txsMap[string(tx)] = struct{}{} + } + + txsLeft := make([]types.Tx, 0, mem.txs.Len()) for e := mem.txs.Front(); e != nil; e = e.Next() { memTx := e.Value.(*mempoolTx) - // Remove the tx if it's alredy in a block. - if _, ok := blockTxsMap[string(memTx.tx)]; ok { + // Remove the tx if it's already in a block. + if _, ok := txsMap[string(memTx.tx)]; ok { // remove from clist mem.txs.Remove(e) e.DetachPrev() @@ -581,15 +582,14 @@ func (mem *Mempool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx { // NOTE: we don't remove committed txs from the cache. continue } - // Good tx! - goodTxs = append(goodTxs, memTx.tx) + txsLeft = append(txsLeft, memTx.tx) } - return goodTxs + return txsLeft } -// NOTE: pass in goodTxs because mem.txs can mutate concurrently. -func (mem *Mempool) recheckTxs(goodTxs []types.Tx) { - if len(goodTxs) == 0 { +// NOTE: pass in txs because mem.txs can mutate concurrently. +func (mem *Mempool) recheckTxs(txs []types.Tx) { + if len(txs) == 0 { return } atomic.StoreInt32(&mem.rechecking, 1) @@ -598,7 +598,7 @@ func (mem *Mempool) recheckTxs(goodTxs []types.Tx) { // Push txs to proxyAppConn // NOTE: resCb() may be called concurrently. - for _, tx := range goodTxs { + for _, tx := range txs { mem.proxyAppConn.CheckTxAsync(tx) } mem.proxyAppConn.FlushAsync()