@@ -534,12 +534,6 @@ func (mem *Mempool) Update(
 	preCheck PreCheckFunc,
 	postCheck PostCheckFunc,
 ) error {
-	// First, create a lookup map of txns in new txs.
-	txsMap := make(map[string]struct{}, len(txs))
-	for _, tx := range txs {
-		txsMap[string(tx)] = struct{}{}
-	}
-
 	// Set height
 	mem.height = height
 	mem.notifiedTxsAvailable = false
@@ -551,12 +545,13 @@ func (mem *Mempool) Update(
 		mem.postCheck = postCheck
 	}
 
-	// Remove transactions that are already in txs.
-	goodTxs := mem.filterTxs(txsMap)
+	// Remove committed transactions.
+	txsLeft := mem.removeTxs(txs)
 
 	// Recheck mempool txs if any txs were committed in the block
-	if mem.config.Recheck && len(goodTxs) > 0 {
-		mem.logger.Info("Recheck txs", "numtxs", len(goodTxs), "height", height)
-		mem.recheckTxs(goodTxs)
+	if mem.config.Recheck && len(txsLeft) > 0 {
+		mem.logger.Info("Recheck txs", "numtxs", len(txsLeft), "height", height)
+		mem.recheckTxs(txsLeft)
 		// At this point, mem.txs are being rechecked.
 		// mem.recheckCursor re-scans mem.txs and possibly removes some txs.
 		// Before mem.Reap(), we should wait for mem.recheckCursor to be nil.
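
With this change, Update no longer builds the lookup map itself: it hands the committed block's transactions straight to removeTxs and rechecks only whatever survives. A minimal sketch of that flow, using a toy pool type with string txs and made-up names (pool, update, removeTxs here are illustrations, not the real mempool types):

package main

import "fmt"

// pool is a toy stand-in for the mempool: pending holds the pool contents in order.
type pool struct {
	pending   []string
	recheckOn bool
}

// update mirrors the shape of Mempool.Update after this diff: drop the
// committed txs first, then recheck only what is left over.
func (p *pool) update(height int64, committed []string) {
	txsLeft := p.removeTxs(committed)
	if p.recheckOn && len(txsLeft) > 0 {
		fmt.Printf("recheck %d txs at height %d\n", len(txsLeft), height)
	}
}

// removeTxs drops every pending tx that was committed and returns the rest.
func (p *pool) removeTxs(committed []string) []string {
	inBlock := make(map[string]struct{}, len(committed))
	for _, tx := range committed {
		inBlock[tx] = struct{}{}
	}
	var left []string
	for _, tx := range p.pending {
		if _, ok := inBlock[tx]; !ok {
			left = append(left, tx)
		}
	}
	p.pending = left
	return left
}

func main() {
	p := &pool{pending: []string{"a", "b", "c"}, recheckOn: true}
	p.update(10, []string{"b"}) // prints: recheck 2 txs at height 10
}
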
@@ -568,12 +563,18 @@ func (mem *Mempool) Update(
 	return nil
 }
 
-func (mem *Mempool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx {
-	goodTxs := make([]types.Tx, 0, mem.txs.Len())
+func (mem *Mempool) removeTxs(txs types.Txs) []types.Tx {
+	// Build a map for faster lookups.
+	txsMap := make(map[string]struct{}, len(txs))
+	for _, tx := range txs {
+		txsMap[string(tx)] = struct{}{}
+	}
+
+	txsLeft := make([]types.Tx, 0, mem.txs.Len())
 	for e := mem.txs.Front(); e != nil; e = e.Next() {
 		memTx := e.Value.(*mempoolTx)
-		// Remove the tx if it's alredy in a block.
-		if _, ok := blockTxsMap[string(memTx.tx)]; ok {
+		// Remove the tx if it's already in a block.
+		if _, ok := txsMap[string(memTx.tx)]; ok {
 			// remove from clist
 			mem.txs.Remove(e)
 			e.DetachPrev()
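
The new removeTxs now owns the lookup-map construction that Update used to do: a map[string]struct{} gives O(1) "was this tx committed?" checks while the tx list is walked exactly once. A self-contained sketch of the same idea over the standard library's container/list, as an illustration only (the real code iterates a concurrent clist, hence the e.DetachPrev() call above):

package main

import (
	"container/list"
	"fmt"
)

// removeTxs deletes from l every element whose value appears in committed,
// and returns the values that remain, in list order.
func removeTxs(l *list.List, committed []string) []string {
	// Build a set for O(1) lookups instead of scanning committed per element.
	inBlock := make(map[string]struct{}, len(committed))
	for _, tx := range committed {
		inBlock[tx] = struct{}{}
	}

	var left []string
	for e := l.Front(); e != nil; {
		next := e.Next() // grab Next before Remove invalidates e
		tx := e.Value.(string)
		if _, ok := inBlock[tx]; ok {
			l.Remove(e)
		} else {
			left = append(left, tx)
		}
		e = next
	}
	return left
}

func main() {
	l := list.New()
	for _, tx := range []string{"a", "b", "c", "d"} {
		l.PushBack(tx)
	}
	fmt.Println(removeTxs(l, []string{"b", "d"})) // [a c]
}

One difference worth noting: with container/list the next element must be captured before Remove, whereas the clist loop in the diff keeps advancing with e.Next() after removal and detaches the removed element with e.DetachPrev().
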
@@ -581,15 +582,14 @@ func (mem *Mempool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx {
 			// NOTE: we don't remove committed txs from the cache.
 			continue
 		}
-		// Good tx!
-		goodTxs = append(goodTxs, memTx.tx)
+		txsLeft = append(txsLeft, memTx.tx)
 	}
-	return goodTxs
+	return txsLeft
 }
 
-// NOTE: pass in goodTxs because mem.txs can mutate concurrently.
-func (mem *Mempool) recheckTxs(goodTxs []types.Tx) {
-	if len(goodTxs) == 0 {
+// NOTE: pass in txs because mem.txs can mutate concurrently.
+func (mem *Mempool) recheckTxs(txs []types.Tx) {
+	if len(txs) == 0 {
 		return
 	}
 	atomic.StoreInt32(&mem.rechecking, 1)
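
The renamed NOTE keeps the important invariant: recheckTxs is handed a snapshot slice (txsLeft) rather than walking mem.txs directly, because the live list can be mutated underneath it while rechecking is in flight. A small sketch of that snapshot-then-iterate pattern, with made-up store/Snapshot/Remove names rather than the mempool's own types:

package main

import (
	"fmt"
	"sync"
)

type store struct {
	mtx sync.Mutex
	txs []string
}

// Snapshot copies the current contents under the lock so callers can
// iterate without holding mtx while the underlying slice keeps changing.
func (s *store) Snapshot() []string {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	return append([]string(nil), s.txs...)
}

func (s *store) Remove(tx string) {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	for i, t := range s.txs {
		if t == tx {
			s.txs = append(s.txs[:i], s.txs[i+1:]...)
			return
		}
	}
}

func main() {
	s := &store{txs: []string{"a", "b", "c"}}
	snap := s.Snapshot()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // concurrent mutation, as recheck responses might cause
		defer wg.Done()
		s.Remove("b")
	}()

	for _, tx := range snap { // safe: we iterate the copy, not s.txs
		fmt.Println("recheck", tx)
	}
	wg.Wait()
}
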
@@ -598,7 +598,7 @@ func (mem *Mempool) recheckTxs(goodTxs []types.Tx) {
 
 	// Push txs to proxyAppConn
 	// NOTE: resCb() may be called concurrently.
-	for _, tx := range goodTxs {
+	for _, tx := range txs {
 		mem.proxyAppConn.CheckTxAsync(tx)
 	}
 	mem.proxyAppConn.FlushAsync()
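
The recheck loop itself only changes the variable name: every remaining tx is queued with CheckTxAsync and the connection is flushed once at the end. A hedged sketch of that queue-everything-then-flush shape against a made-up AppConn interface (an illustration, not the real proxy package API):

package main

import "fmt"

// AppConn is a stand-in for an async app connection: requests are buffered
// by CheckTxAsync and only reported as sent once FlushAsync is called.
type AppConn interface {
	CheckTxAsync(tx []byte)
	FlushAsync()
}

type bufferedConn struct {
	queued [][]byte
}

func (c *bufferedConn) CheckTxAsync(tx []byte) {
	c.queued = append(c.queued, tx)
}

func (c *bufferedConn) FlushAsync() {
	fmt.Printf("flushing %d queued CheckTx requests\n", len(c.queued))
	c.queued = c.queued[:0]
}

// recheck mirrors the loop above: queue every tx, flush once at the end.
func recheck(conn AppConn, txs [][]byte) {
	for _, tx := range txs {
		conn.CheckTxAsync(tx)
	}
	conn.FlushAsync()
}

func main() {
	recheck(&bufferedConn{}, [][]byte{[]byte("a"), []byte("b")})
	// Output: flushing 2 queued CheckTx requests
}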