|
package mempool
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"reflect"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
abci "github.com/tendermint/tendermint/abci/types"
|
|
"github.com/tendermint/tendermint/config"
|
|
"github.com/tendermint/tendermint/internal/libs/clist"
|
|
"github.com/tendermint/tendermint/internal/proxy"
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
tmmath "github.com/tendermint/tendermint/libs/math"
|
|
"github.com/tendermint/tendermint/types"
|
|
)
|
|
|
|
var _ Mempool = (*TxMempool)(nil)
|
|
|
|
// TxMempoolOption sets an optional parameter on the TxMempool.
|
|
type TxMempoolOption func(*TxMempool)
|
|
|
|
// TxMempool defines a prioritized mempool data structure used by the v1 mempool
|
|
// reactor. It keeps a thread-safe priority queue of transactions that is used
|
|
// when a block proposer constructs a block and a thread-safe linked-list that
|
|
// is used to gossip transactions to peers in a FIFO manner.
|
|
type TxMempool struct {
|
|
logger log.Logger
|
|
metrics *Metrics
|
|
config *config.MempoolConfig
|
|
proxyAppConn proxy.AppConnMempool
|
|
|
|
// txsAvailable fires once for each height when the mempool is not empty
|
|
txsAvailable chan struct{}
|
|
notifiedTxsAvailable bool
|
|
|
|
// height defines the last block height process during Update()
|
|
height int64
|
|
|
|
// sizeBytes defines the total size of the mempool (sum of all tx bytes)
|
|
sizeBytes int64
|
|
|
|
// cache defines a fixed-size cache of already seen transactions as this
|
|
// reduces pressure on the proxyApp.
|
|
cache TxCache
|
|
|
|
// txStore defines the main storage of valid transactions. Indexes are built
|
|
// on top of this store.
|
|
txStore *TxStore
|
|
|
|
// gossipIndex defines the gossiping index of valid transactions via a
|
|
// thread-safe linked-list. We also use the gossip index as a cursor for
|
|
// rechecking transactions already in the mempool.
|
|
gossipIndex *clist.CList
|
|
|
|
// recheckCursor and recheckEnd are used as cursors based on the gossip index
|
|
// to recheck transactions that are already in the mempool. Iteration is not
|
|
// thread-safe and transaction may be mutated in serial order.
|
|
//
|
|
// XXX/TODO: It might be somewhat of a codesmell to use the gossip index for
|
|
// iterator and cursor management when rechecking transactions. If the gossip
|
|
// index changes or is removed in a future refactor, this will have to be
|
|
// refactored. Instead, we should consider just keeping a slice of a snapshot
|
|
// of the mempool's current transactions during Update and an integer cursor
|
|
// into that slice. This, however, requires additional O(n) space complexity.
|
|
recheckCursor *clist.CElement // next expected response
|
|
recheckEnd *clist.CElement // re-checking stops here
|
|
|
|
// priorityIndex defines the priority index of valid transactions via a
|
|
// thread-safe priority queue.
|
|
priorityIndex *TxPriorityQueue
|
|
|
|
// heightIndex defines a height-based, in ascending order, transaction index.
|
|
// i.e. older transactions are first.
|
|
heightIndex *WrappedTxList
|
|
|
|
// timestampIndex defines a timestamp-based, in ascending order, transaction
|
|
// index. i.e. older transactions are first.
|
|
timestampIndex *WrappedTxList
|
|
|
|
// A read/write lock is used to safe guard updates, insertions and deletions
|
|
// from the mempool. A read-lock is implicitly acquired when executing CheckTx,
|
|
// however, a caller must explicitly grab a write-lock via Lock when updating
|
|
// the mempool via Update().
|
|
mtx sync.RWMutex
|
|
preCheck PreCheckFunc
|
|
postCheck PostCheckFunc
|
|
}
|
|
|
|
func NewTxMempool(
|
|
logger log.Logger,
|
|
cfg *config.MempoolConfig,
|
|
proxyAppConn proxy.AppConnMempool,
|
|
height int64,
|
|
options ...TxMempoolOption,
|
|
) *TxMempool {
|
|
|
|
txmp := &TxMempool{
|
|
logger: logger,
|
|
config: cfg,
|
|
proxyAppConn: proxyAppConn,
|
|
height: height,
|
|
cache: NopTxCache{},
|
|
metrics: NopMetrics(),
|
|
txStore: NewTxStore(),
|
|
gossipIndex: clist.New(),
|
|
priorityIndex: NewTxPriorityQueue(),
|
|
heightIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
|
|
return wtx1.height >= wtx2.height
|
|
}),
|
|
timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
|
|
return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp)
|
|
}),
|
|
}
|
|
|
|
if cfg.CacheSize > 0 {
|
|
txmp.cache = NewLRUTxCache(cfg.CacheSize)
|
|
}
|
|
|
|
proxyAppConn.SetResponseCallback(txmp.defaultTxCallback)
|
|
|
|
for _, opt := range options {
|
|
opt(txmp)
|
|
}
|
|
|
|
return txmp
|
|
}
|
|
|
|
// WithPreCheck sets a filter for the mempool to reject a transaction if f(tx)
|
|
// returns an error. This is executed before CheckTx. It only applies to the
|
|
// first created block. After that, Update() overwrites the existing value.
|
|
func WithPreCheck(f PreCheckFunc) TxMempoolOption {
|
|
return func(txmp *TxMempool) { txmp.preCheck = f }
|
|
}
|
|
|
|
// WithPostCheck sets a filter for the mempool to reject a transaction if
|
|
// f(tx, resp) returns an error. This is executed after CheckTx. It only applies
|
|
// to the first created block. After that, Update overwrites the existing value.
|
|
func WithPostCheck(f PostCheckFunc) TxMempoolOption {
|
|
return func(txmp *TxMempool) { txmp.postCheck = f }
|
|
}
|
|
|
|
// WithMetrics sets the mempool's metrics collector.
|
|
func WithMetrics(metrics *Metrics) TxMempoolOption {
|
|
return func(txmp *TxMempool) { txmp.metrics = metrics }
|
|
}
|
|
|
|
// Lock obtains a write-lock on the mempool. A caller must be sure to explicitly
|
|
// release the lock when finished.
|
|
func (txmp *TxMempool) Lock() {
|
|
txmp.mtx.Lock()
|
|
}
|
|
|
|
// Unlock releases a write-lock on the mempool.
|
|
func (txmp *TxMempool) Unlock() {
|
|
txmp.mtx.Unlock()
|
|
}
|
|
|
|
// Size returns the number of valid transactions in the mempool. It is
|
|
// thread-safe.
|
|
func (txmp *TxMempool) Size() int {
|
|
return txmp.txStore.Size()
|
|
}
|
|
|
|
// SizeBytes return the total sum in bytes of all the valid transactions in the
|
|
// mempool. It is thread-safe.
|
|
func (txmp *TxMempool) SizeBytes() int64 {
|
|
return atomic.LoadInt64(&txmp.sizeBytes)
|
|
}
|
|
|
|
// FlushAppConn executes FlushSync on the mempool's proxyAppConn.
|
|
//
|
|
// NOTE: The caller must obtain a write-lock prior to execution.
|
|
func (txmp *TxMempool) FlushAppConn(ctx context.Context) error {
|
|
return txmp.proxyAppConn.Flush(ctx)
|
|
}
|
|
|
|
// WaitForNextTx returns a blocking channel that will be closed when the next
|
|
// valid transaction is available to gossip. It is thread-safe.
|
|
func (txmp *TxMempool) WaitForNextTx() <-chan struct{} {
|
|
return txmp.gossipIndex.WaitChan()
|
|
}
|
|
|
|
// NextGossipTx returns the next valid transaction to gossip. A caller must wait
|
|
// for WaitForNextTx to signal a transaction is available to gossip first. It is
|
|
// thread-safe.
|
|
func (txmp *TxMempool) NextGossipTx() *clist.CElement {
|
|
return txmp.gossipIndex.Front()
|
|
}
|
|
|
|
// EnableTxsAvailable enables the mempool to trigger events when transactions
|
|
// are available on a block by block basis.
|
|
func (txmp *TxMempool) EnableTxsAvailable() {
|
|
txmp.mtx.Lock()
|
|
defer txmp.mtx.Unlock()
|
|
|
|
txmp.txsAvailable = make(chan struct{}, 1)
|
|
}
|
|
|
|
// TxsAvailable returns a channel which fires once for every height, and only
|
|
// when transactions are available in the mempool. It is thread-safe.
|
|
func (txmp *TxMempool) TxsAvailable() <-chan struct{} {
|
|
return txmp.txsAvailable
|
|
}
|
|
|
|
// CheckTx executes the ABCI CheckTx method for a given transaction. It acquires
|
|
// a read-lock attempts to execute the application's CheckTx ABCI method via
|
|
// CheckTxAsync. We return an error if any of the following happen:
|
|
//
|
|
// - The CheckTxAsync execution fails.
|
|
// - The transaction already exists in the cache and we've already received the
|
|
// transaction from the peer. Otherwise, if it solely exists in the cache, we
|
|
// return nil.
|
|
// - The transaction size exceeds the maximum transaction size as defined by the
|
|
// configuration provided to the mempool.
|
|
// - The transaction fails Pre-Check (if it is defined).
|
|
// - The proxyAppConn fails, e.g. the buffer is full.
|
|
//
|
|
// If the mempool is full, we still execute CheckTx and attempt to find a lower
|
|
// priority transaction to evict. If such a transaction exists, we remove the
|
|
// lower priority transaction and add the new one with higher priority.
|
|
//
|
|
// NOTE:
|
|
// - The applications' CheckTx implementation may panic.
|
|
// - The caller is not to explicitly require any locks for executing CheckTx.
|
|
func (txmp *TxMempool) CheckTx(
|
|
ctx context.Context,
|
|
tx types.Tx,
|
|
cb func(*abci.Response),
|
|
txInfo TxInfo,
|
|
) error {
|
|
txmp.mtx.RLock()
|
|
defer txmp.mtx.RUnlock()
|
|
|
|
if txSize := len(tx); txSize > txmp.config.MaxTxBytes {
|
|
return types.ErrTxTooLarge{
|
|
Max: txmp.config.MaxTxBytes,
|
|
Actual: txSize,
|
|
}
|
|
}
|
|
|
|
if txmp.preCheck != nil {
|
|
if err := txmp.preCheck(tx); err != nil {
|
|
return types.ErrPreCheck{Reason: err}
|
|
}
|
|
}
|
|
|
|
if err := txmp.proxyAppConn.Error(); err != nil {
|
|
return err
|
|
}
|
|
|
|
txHash := tx.Key()
|
|
|
|
// We add the transaction to the mempool's cache and if the
|
|
// transaction is already present in the cache, i.e. false is returned, then we
|
|
// check if we've seen this transaction and error if we have.
|
|
if !txmp.cache.Push(tx) {
|
|
txmp.txStore.GetOrSetPeerByTxHash(txHash, txInfo.SenderID)
|
|
return types.ErrTxInCache
|
|
}
|
|
|
|
reqRes, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{Tx: tx})
|
|
if err != nil {
|
|
txmp.cache.Remove(tx)
|
|
return err
|
|
}
|
|
|
|
reqRes.SetCallback(func(res *abci.Response) {
|
|
if txmp.recheckCursor != nil {
|
|
panic("recheck cursor is non-nil in CheckTx callback")
|
|
}
|
|
|
|
wtx := &WrappedTx{
|
|
tx: tx,
|
|
hash: txHash,
|
|
timestamp: time.Now().UTC(),
|
|
height: txmp.height,
|
|
}
|
|
txmp.initTxCallback(wtx, res, txInfo)
|
|
|
|
if cb != nil {
|
|
cb(res)
|
|
}
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error {
|
|
txmp.Lock()
|
|
defer txmp.Unlock()
|
|
|
|
// remove the committed transaction from the transaction store and indexes
|
|
if wtx := txmp.txStore.GetTxByHash(txKey); wtx != nil {
|
|
txmp.removeTx(wtx, false)
|
|
return nil
|
|
}
|
|
|
|
return errors.New("transaction not found")
|
|
}
|
|
|
|
// Flush empties the mempool. It acquires a read-lock, fetches all the
|
|
// transactions currently in the transaction store and removes each transaction
|
|
// from the store and all indexes and finally resets the cache.
|
|
//
|
|
// NOTE:
|
|
// - Flushing the mempool may leave the mempool in an inconsistent state.
|
|
func (txmp *TxMempool) Flush() {
|
|
txmp.mtx.RLock()
|
|
defer txmp.mtx.RUnlock()
|
|
|
|
txmp.heightIndex.Reset()
|
|
txmp.timestampIndex.Reset()
|
|
|
|
for _, wtx := range txmp.txStore.GetAllTxs() {
|
|
txmp.removeTx(wtx, false)
|
|
}
|
|
|
|
atomic.SwapInt64(&txmp.sizeBytes, 0)
|
|
txmp.cache.Reset()
|
|
}
|
|
|
|
// ReapMaxBytesMaxGas returns a list of transactions within the provided size
|
|
// and gas constraints. Transaction are retrieved in priority order.
|
|
//
|
|
// NOTE:
|
|
// - Transactions returned are not removed from the mempool transaction
|
|
// store or indexes.
|
|
func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
|
|
txmp.mtx.RLock()
|
|
defer txmp.mtx.RUnlock()
|
|
|
|
var (
|
|
totalGas int64
|
|
totalSize int64
|
|
)
|
|
|
|
// wTxs contains a list of *WrappedTx retrieved from the priority queue that
|
|
// need to be re-enqueued prior to returning.
|
|
wTxs := make([]*WrappedTx, 0, txmp.priorityIndex.NumTxs())
|
|
defer func() {
|
|
for _, wtx := range wTxs {
|
|
txmp.priorityIndex.PushTx(wtx)
|
|
}
|
|
}()
|
|
|
|
txs := make([]types.Tx, 0, txmp.priorityIndex.NumTxs())
|
|
for txmp.priorityIndex.NumTxs() > 0 {
|
|
wtx := txmp.priorityIndex.PopTx()
|
|
txs = append(txs, wtx.tx)
|
|
wTxs = append(wTxs, wtx)
|
|
size := types.ComputeProtoSizeForTxs([]types.Tx{wtx.tx})
|
|
|
|
// Ensure we have capacity for the transaction with respect to the
|
|
// transaction size.
|
|
if maxBytes > -1 && totalSize+size > maxBytes {
|
|
return txs[:len(txs)-1]
|
|
}
|
|
|
|
totalSize += size
|
|
|
|
// ensure we have capacity for the transaction with respect to total gas
|
|
gas := totalGas + wtx.gasWanted
|
|
if maxGas > -1 && gas > maxGas {
|
|
return txs[:len(txs)-1]
|
|
}
|
|
|
|
totalGas = gas
|
|
}
|
|
|
|
return txs
|
|
}
|
|
|
|
// ReapMaxTxs returns a list of transactions within the provided number of
|
|
// transactions bound. Transaction are retrieved in priority order.
|
|
//
|
|
// NOTE:
|
|
// - Transactions returned are not removed from the mempool transaction
|
|
// store or indexes.
|
|
func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs {
|
|
txmp.mtx.RLock()
|
|
defer txmp.mtx.RUnlock()
|
|
|
|
numTxs := txmp.priorityIndex.NumTxs()
|
|
if max < 0 {
|
|
max = numTxs
|
|
}
|
|
|
|
cap := tmmath.MinInt(numTxs, max)
|
|
|
|
// wTxs contains a list of *WrappedTx retrieved from the priority queue that
|
|
// need to be re-enqueued prior to returning.
|
|
wTxs := make([]*WrappedTx, 0, cap)
|
|
txs := make([]types.Tx, 0, cap)
|
|
for txmp.priorityIndex.NumTxs() > 0 && len(txs) < max {
|
|
wtx := txmp.priorityIndex.PopTx()
|
|
txs = append(txs, wtx.tx)
|
|
wTxs = append(wTxs, wtx)
|
|
}
|
|
for _, wtx := range wTxs {
|
|
txmp.priorityIndex.PushTx(wtx)
|
|
}
|
|
return txs
|
|
}
|
|
|
|
// Update iterates over all the transactions provided by the block producer,
|
|
// removes them from the cache (if applicable), and removes
|
|
// the transactions from the main transaction store and associated indexes.
|
|
// If there are transactions remaining in the mempool, we initiate a
|
|
// re-CheckTx for them (if applicable), otherwise, we notify the caller more
|
|
// transactions are available.
|
|
//
|
|
// NOTE:
|
|
// - The caller must explicitly acquire a write-lock.
|
|
func (txmp *TxMempool) Update(
|
|
ctx context.Context,
|
|
blockHeight int64,
|
|
blockTxs types.Txs,
|
|
deliverTxResponses []*abci.ResponseDeliverTx,
|
|
newPreFn PreCheckFunc,
|
|
newPostFn PostCheckFunc,
|
|
) error {
|
|
|
|
txmp.height = blockHeight
|
|
txmp.notifiedTxsAvailable = false
|
|
|
|
if newPreFn != nil {
|
|
txmp.preCheck = newPreFn
|
|
}
|
|
if newPostFn != nil {
|
|
txmp.postCheck = newPostFn
|
|
}
|
|
|
|
for i, tx := range blockTxs {
|
|
if deliverTxResponses[i].Code == abci.CodeTypeOK {
|
|
// add the valid committed transaction to the cache (if missing)
|
|
_ = txmp.cache.Push(tx)
|
|
} else if !txmp.config.KeepInvalidTxsInCache {
|
|
// allow invalid transactions to be re-submitted
|
|
txmp.cache.Remove(tx)
|
|
}
|
|
|
|
// remove the committed transaction from the transaction store and indexes
|
|
if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil {
|
|
txmp.removeTx(wtx, false)
|
|
}
|
|
}
|
|
|
|
txmp.purgeExpiredTxs(blockHeight)
|
|
|
|
// If there any uncommitted transactions left in the mempool, we either
|
|
// initiate re-CheckTx per remaining transaction or notify that remaining
|
|
// transactions are left.
|
|
if txmp.Size() > 0 {
|
|
if txmp.config.Recheck {
|
|
txmp.logger.Debug(
|
|
"executing re-CheckTx for all remaining transactions",
|
|
"num_txs", txmp.Size(),
|
|
"height", blockHeight,
|
|
)
|
|
txmp.updateReCheckTxs(ctx)
|
|
} else {
|
|
txmp.notifyTxsAvailable()
|
|
}
|
|
}
|
|
|
|
txmp.metrics.Size.Set(float64(txmp.Size()))
|
|
return nil
|
|
}
|
|
|
|
// initTxCallback is the callback invoked for a new unique transaction after CheckTx
|
|
// has been executed by the ABCI application for the first time on that transaction.
|
|
// CheckTx can be called again for the same transaction later when re-checking;
|
|
// however, this callback will not be called.
|
|
//
|
|
// initTxCallback runs after the ABCI application executes CheckTx.
|
|
// It runs the postCheck hook if one is defined on the mempool.
|
|
// If the CheckTx response response code is not OK, or if the postCheck hook
|
|
// reports an error, the transaction is rejected. Otherwise, we attempt to insert
|
|
// the transaction into the mempool.
|
|
//
|
|
// When inserting a transaction, we first check if there is sufficient capacity.
|
|
// If there is, the transaction is added to the txStore and all indexes.
|
|
// Otherwise, if the mempool is full, we attempt to find a lower priority transaction
|
|
// to evict in place of the new incoming transaction. If no such transaction exists,
|
|
// the new incoming transaction is rejected.
|
|
//
|
|
// NOTE:
|
|
// - An explicit lock is NOT required.
|
|
func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo TxInfo) {
|
|
checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
var err error
|
|
if txmp.postCheck != nil {
|
|
err = txmp.postCheck(wtx.tx, checkTxRes.CheckTx)
|
|
}
|
|
|
|
if err != nil || checkTxRes.CheckTx.Code != abci.CodeTypeOK {
|
|
// ignore bad transactions
|
|
txmp.logger.Info(
|
|
"rejected bad transaction",
|
|
"priority", wtx.priority,
|
|
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
|
|
"peer_id", txInfo.SenderNodeID,
|
|
"code", checkTxRes.CheckTx.Code,
|
|
"post_check_err", err,
|
|
)
|
|
|
|
txmp.metrics.FailedTxs.Add(1)
|
|
|
|
if !txmp.config.KeepInvalidTxsInCache {
|
|
txmp.cache.Remove(wtx.tx)
|
|
}
|
|
if err != nil {
|
|
checkTxRes.CheckTx.MempoolError = err.Error()
|
|
}
|
|
return
|
|
}
|
|
|
|
sender := checkTxRes.CheckTx.Sender
|
|
priority := checkTxRes.CheckTx.Priority
|
|
|
|
if len(sender) > 0 {
|
|
if wtx := txmp.txStore.GetTxBySender(sender); wtx != nil {
|
|
txmp.logger.Error(
|
|
"rejected incoming good transaction; tx already exists for sender",
|
|
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
|
|
"sender", sender,
|
|
)
|
|
txmp.metrics.RejectedTxs.Add(1)
|
|
return
|
|
}
|
|
}
|
|
|
|
if err := txmp.canAddTx(wtx); err != nil {
|
|
evictTxs := txmp.priorityIndex.GetEvictableTxs(
|
|
priority,
|
|
int64(wtx.Size()),
|
|
txmp.SizeBytes(),
|
|
txmp.config.MaxTxsBytes,
|
|
)
|
|
if len(evictTxs) == 0 {
|
|
// No room for the new incoming transaction so we just remove it from
|
|
// the cache.
|
|
txmp.cache.Remove(wtx.tx)
|
|
txmp.logger.Error(
|
|
"rejected incoming good transaction; mempool full",
|
|
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
|
|
"err", err.Error(),
|
|
)
|
|
txmp.metrics.RejectedTxs.Add(1)
|
|
return
|
|
}
|
|
|
|
// evict an existing transaction(s)
|
|
//
|
|
// NOTE:
|
|
// - The transaction, toEvict, can be removed while a concurrent
|
|
// reCheckTx callback is being executed for the same transaction.
|
|
for _, toEvict := range evictTxs {
|
|
txmp.removeTx(toEvict, true)
|
|
txmp.logger.Debug(
|
|
"evicted existing good transaction; mempool full",
|
|
"old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()),
|
|
"old_priority", toEvict.priority,
|
|
"new_tx", fmt.Sprintf("%X", wtx.tx.Hash()),
|
|
"new_priority", wtx.priority,
|
|
)
|
|
txmp.metrics.EvictedTxs.Add(1)
|
|
}
|
|
}
|
|
|
|
wtx.gasWanted = checkTxRes.CheckTx.GasWanted
|
|
wtx.priority = priority
|
|
wtx.sender = sender
|
|
wtx.peers = map[uint16]struct{}{
|
|
txInfo.SenderID: {},
|
|
}
|
|
|
|
txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size()))
|
|
txmp.metrics.Size.Set(float64(txmp.Size()))
|
|
|
|
txmp.insertTx(wtx)
|
|
txmp.logger.Debug(
|
|
"inserted good transaction",
|
|
"priority", wtx.priority,
|
|
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
|
|
"height", txmp.height,
|
|
"num_txs", txmp.Size(),
|
|
)
|
|
txmp.notifyTxsAvailable()
|
|
}
|
|
|
|
// defaultTxCallback is the CheckTx application callback used when a transaction
|
|
// is being re-checked (if re-checking is enabled). The caller must hold a mempool
|
|
// write-lock (via Lock()) and when executing Update(), if the mempool is non-empty
|
|
// and Recheck is enabled, then all remaining transactions will be rechecked via
|
|
// CheckTxAsync. The order transactions are rechecked must be the same as the
|
|
// order in which this callback is called.
|
|
func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response) {
|
|
if txmp.recheckCursor == nil {
|
|
return
|
|
}
|
|
|
|
txmp.metrics.RecheckTimes.Add(1)
|
|
|
|
checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
|
|
if !ok {
|
|
txmp.logger.Error("received incorrect type in mempool callback",
|
|
"expected", reflect.TypeOf(&abci.Response_CheckTx{}).Name(),
|
|
"got", reflect.TypeOf(res.Value).Name(),
|
|
)
|
|
return
|
|
}
|
|
tx := req.GetCheckTx().Tx
|
|
wtx := txmp.recheckCursor.Value.(*WrappedTx)
|
|
|
|
// Search through the remaining list of tx to recheck for a transaction that matches
|
|
// the one we received from the ABCI application.
|
|
for {
|
|
if bytes.Equal(tx, wtx.tx) {
|
|
// We've found a tx in the recheck list that matches the tx that we
|
|
// received from the ABCI application.
|
|
// Break, and use this transaction for further checks.
|
|
break
|
|
}
|
|
|
|
txmp.logger.Error(
|
|
"re-CheckTx transaction mismatch",
|
|
"got", wtx.tx.Hash(),
|
|
"expected", types.Tx(tx).Key(),
|
|
)
|
|
|
|
if txmp.recheckCursor == txmp.recheckEnd {
|
|
// we reached the end of the recheckTx list without finding a tx
|
|
// matching the one we received from the ABCI application.
|
|
// Return without processing any tx.
|
|
txmp.recheckCursor = nil
|
|
return
|
|
}
|
|
|
|
txmp.recheckCursor = txmp.recheckCursor.Next()
|
|
wtx = txmp.recheckCursor.Value.(*WrappedTx)
|
|
}
|
|
|
|
// Only evaluate transactions that have not been removed. This can happen
|
|
// if an existing transaction is evicted during CheckTx and while this
|
|
// callback is being executed for the same evicted transaction.
|
|
if !txmp.txStore.IsTxRemoved(wtx.hash) {
|
|
var err error
|
|
if txmp.postCheck != nil {
|
|
err = txmp.postCheck(tx, checkTxRes.CheckTx)
|
|
}
|
|
|
|
if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil {
|
|
wtx.priority = checkTxRes.CheckTx.Priority
|
|
} else {
|
|
txmp.logger.Debug(
|
|
"existing transaction no longer valid; failed re-CheckTx callback",
|
|
"priority", wtx.priority,
|
|
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
|
|
"err", err,
|
|
"code", checkTxRes.CheckTx.Code,
|
|
)
|
|
|
|
if wtx.gossipEl != txmp.recheckCursor {
|
|
panic("corrupted reCheckTx cursor")
|
|
}
|
|
|
|
txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
|
|
}
|
|
}
|
|
|
|
// move reCheckTx cursor to next element
|
|
if txmp.recheckCursor == txmp.recheckEnd {
|
|
txmp.recheckCursor = nil
|
|
} else {
|
|
txmp.recheckCursor = txmp.recheckCursor.Next()
|
|
}
|
|
|
|
if txmp.recheckCursor == nil {
|
|
txmp.logger.Debug("finished rechecking transactions")
|
|
|
|
if txmp.Size() > 0 {
|
|
txmp.notifyTxsAvailable()
|
|
}
|
|
}
|
|
|
|
txmp.metrics.Size.Set(float64(txmp.Size()))
|
|
}
|
|
|
|
// updateReCheckTxs updates the recheck cursors using the gossipIndex. For
|
|
// each transaction, it executes CheckTxAsync. The global callback defined on
|
|
// the proxyAppConn will be executed for each transaction after CheckTx is
|
|
// executed.
|
|
//
|
|
// NOTE:
|
|
// - The caller must have a write-lock when executing updateReCheckTxs.
|
|
func (txmp *TxMempool) updateReCheckTxs(ctx context.Context) {
|
|
if txmp.Size() == 0 {
|
|
panic("attempted to update re-CheckTx txs when mempool is empty")
|
|
}
|
|
|
|
txmp.recheckCursor = txmp.gossipIndex.Front()
|
|
txmp.recheckEnd = txmp.gossipIndex.Back()
|
|
|
|
for e := txmp.gossipIndex.Front(); e != nil; e = e.Next() {
|
|
wtx := e.Value.(*WrappedTx)
|
|
|
|
// Only execute CheckTx if the transaction is not marked as removed which
|
|
// could happen if the transaction was evicted.
|
|
if !txmp.txStore.IsTxRemoved(wtx.hash) {
|
|
_, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{
|
|
Tx: wtx.tx,
|
|
Type: abci.CheckTxType_Recheck,
|
|
})
|
|
if err != nil {
|
|
// no need in retrying since the tx will be rechecked after the next block
|
|
txmp.logger.Error("failed to execute CheckTx during rechecking", "err", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
if _, err := txmp.proxyAppConn.FlushAsync(ctx); err != nil {
|
|
txmp.logger.Error("failed to flush transactions during rechecking", "err", err)
|
|
}
|
|
}
|
|
|
|
// canAddTx returns an error if we cannot insert the provided *WrappedTx into
|
|
// the mempool due to mempool configured constraints. If it returns nil,
|
|
// the transaction can be inserted into the mempool.
|
|
func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error {
|
|
var (
|
|
numTxs = txmp.Size()
|
|
sizeBytes = txmp.SizeBytes()
|
|
)
|
|
|
|
if numTxs >= txmp.config.Size || int64(wtx.Size())+sizeBytes > txmp.config.MaxTxsBytes {
|
|
return types.ErrMempoolIsFull{
|
|
NumTxs: numTxs,
|
|
MaxTxs: txmp.config.Size,
|
|
TxsBytes: sizeBytes,
|
|
MaxTxsBytes: txmp.config.MaxTxsBytes,
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (txmp *TxMempool) insertTx(wtx *WrappedTx) {
|
|
txmp.txStore.SetTx(wtx)
|
|
txmp.priorityIndex.PushTx(wtx)
|
|
txmp.heightIndex.Insert(wtx)
|
|
txmp.timestampIndex.Insert(wtx)
|
|
|
|
// Insert the transaction into the gossip index and mark the reference to the
|
|
// linked-list element, which will be needed at a later point when the
|
|
// transaction is removed.
|
|
gossipEl := txmp.gossipIndex.PushBack(wtx)
|
|
wtx.gossipEl = gossipEl
|
|
|
|
atomic.AddInt64(&txmp.sizeBytes, int64(wtx.Size()))
|
|
}
|
|
|
|
func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {
|
|
if txmp.txStore.IsTxRemoved(wtx.hash) {
|
|
return
|
|
}
|
|
|
|
txmp.txStore.RemoveTx(wtx)
|
|
txmp.priorityIndex.RemoveTx(wtx)
|
|
txmp.heightIndex.Remove(wtx)
|
|
txmp.timestampIndex.Remove(wtx)
|
|
|
|
// Remove the transaction from the gossip index and cleanup the linked-list
|
|
// element so it can be garbage collected.
|
|
txmp.gossipIndex.Remove(wtx.gossipEl)
|
|
wtx.gossipEl.DetachPrev()
|
|
|
|
atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size()))
|
|
|
|
if removeFromCache {
|
|
txmp.cache.Remove(wtx.tx)
|
|
}
|
|
}
|
|
|
|
// purgeExpiredTxs removes all transactions that have exceeded their respective
|
|
// height- and/or time-based TTLs from their respective indexes. Every expired
|
|
// transaction will be removed from the mempool, but preserved in the cache.
|
|
//
|
|
// NOTE: purgeExpiredTxs must only be called during TxMempool#Update in which
|
|
// the caller has a write-lock on the mempool and so we can safely iterate over
|
|
// the height and time based indexes.
|
|
func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
|
|
now := time.Now()
|
|
expiredTxs := make(map[types.TxKey]*WrappedTx)
|
|
|
|
if txmp.config.TTLNumBlocks > 0 {
|
|
purgeIdx := -1
|
|
for i, wtx := range txmp.heightIndex.txs {
|
|
if (blockHeight - wtx.height) > txmp.config.TTLNumBlocks {
|
|
expiredTxs[wtx.tx.Key()] = wtx
|
|
purgeIdx = i
|
|
} else {
|
|
// since the index is sorted, we know no other txs can be be purged
|
|
break
|
|
}
|
|
}
|
|
|
|
if purgeIdx >= 0 {
|
|
txmp.heightIndex.txs = txmp.heightIndex.txs[purgeIdx+1:]
|
|
}
|
|
}
|
|
|
|
if txmp.config.TTLDuration > 0 {
|
|
purgeIdx := -1
|
|
for i, wtx := range txmp.timestampIndex.txs {
|
|
if now.Sub(wtx.timestamp) > txmp.config.TTLDuration {
|
|
expiredTxs[wtx.tx.Key()] = wtx
|
|
purgeIdx = i
|
|
} else {
|
|
// since the index is sorted, we know no other txs can be be purged
|
|
break
|
|
}
|
|
}
|
|
|
|
if purgeIdx >= 0 {
|
|
txmp.timestampIndex.txs = txmp.timestampIndex.txs[purgeIdx+1:]
|
|
}
|
|
}
|
|
|
|
for _, wtx := range expiredTxs {
|
|
txmp.removeTx(wtx, false)
|
|
}
|
|
}
|
|
|
|
func (txmp *TxMempool) notifyTxsAvailable() {
|
|
if txmp.Size() == 0 {
|
|
panic("attempt to notify txs available but mempool is empty!")
|
|
}
|
|
|
|
if txmp.txsAvailable != nil && !txmp.notifiedTxsAvailable {
|
|
// channel cap is 1, so this will send once
|
|
txmp.notifiedTxsAvailable = true
|
|
|
|
select {
|
|
case txmp.txsAvailable <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
}
|