@ -1,111 +1,102 @@
package mempool
package mempool
import (
import (
"bytes"
"container/list"
"crypto/sha256"
"fmt"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
abci "github.com/tendermint/tendermint/abci/types"
abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config"
auto "github.com/tendermint/tendermint/libs/autofile"
"github.com/tendermint/tendermint/libs/clist"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/types"
)
)
// PreCheckFunc is an optional filter executed before CheckTx and rejects
// transaction if false is returned. An example would be to ensure that a
// transaction doesn't exceeded the block size.
type PreCheckFunc func ( types . Tx ) error
// Mempool defines the mempool interface.
//
// Updates to the mempool need to be synchronized with committing a block so
// apps can reset their transient state on Commit.
type Mempool interface {
// CheckTx executes a new transaction against the application to determine
// its validity and whether it should be added to the mempool.
CheckTx ( tx types . Tx , callback func ( * abci . Response ) ) error
// PostCheckFunc is an optional filter executed after CheckTx and rejects
// transaction if false is returned. An example would be to ensure a
// transaction doesn't require more gas than available for the block.
type PostCheckFunc func ( types . Tx , * abci . ResponseCheckTx ) error
// CheckTxWithInfo performs the same operation as CheckTx, but with extra
// meta data about the tx.
// Currently this metadata is the peer who sent it, used to prevent the tx
// from being gossiped back to them.
CheckTxWithInfo ( tx types . Tx , callback func ( * abci . Response ) , txInfo TxInfo ) error
// TxInfo are parameters that get passed when attempting to add a tx to the
// mempool.
type TxInfo struct {
// We don't use p2p.ID here because it's too big. The gain is to store max 2
// bytes with each tx to identify the sender rather than 20 bytes.
PeerID uint16
}
// ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes
// bytes total with the condition that the total gasWanted must be less than
// maxGas.
// If both maxes are negative, there is no cap on the size of all returned
// transactions (~ all available transactions).
ReapMaxBytesMaxGas ( maxBytes , maxGas int64 ) types . Txs
/ *
// ReapMaxTxs reaps up to max transactions from the mempool.
// If max is negative, there is no cap on the size of all returned
// transactions (~ all available transactions).
ReapMaxTxs ( max int ) types . Txs
The mempool pushes new txs onto the proxyAppConn .
It gets a stream of ( req , res ) tuples from the proxy .
The mempool stores good txs in a concurrent linked - list .
// Lock locks the mempool. The consensus must be able to hold lock to safely update.
Lock ( )
Multiple concurrent go - routines can traverse this linked - list
safely by calling . NextWait ( ) on each element .
// Unlock unlocks the mempool.
Unlock ( )
So we have several go - routines :
1. Consensus calling Update ( ) and Reap ( ) synchronously
2. Many mempool reactor ' s peer routines calling CheckTx ( )
3. Many mempool reactor ' s peer routines traversing the txs linked list
4. Another goroutine calling GarbageCollectTxs ( ) periodically
// Update informs the mempool that the given txs were committed and can be discarded.
// NOTE: this should be called *after* block is committed by consensus.
// NOTE: unsafe; Lock/Unlock must be managed by caller
Update ( blockHeight int64 , blockTxs types . Txs , newPreFn PreCheckFunc , newPostFn PostCheckFunc ) error
To manage these goroutines , there are three methods of locking .
1. Mutations to the linked - list is protected by an internal mtx ( CList is goroutine - safe )
2. Mutations to the linked - list elements are atomic
3. CheckTx ( ) calls can be paused upon Update ( ) and Reap ( ) , protected by . proxyMtx
// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are
// done. E.g. from CheckTx.
FlushAppConn ( ) error
Garbage collection of old elements from mempool . txs is handlde via
the DetachPrev ( ) call , which makes old elements not reachable by
peer broadcastTxRoutine ( ) automatically garbage collected .
// Flush removes all transactions from the mempool and cache
Flush ( )
TODO : Better handle abci client errors . ( make it automatically handle connection errors )
// TxsAvailable returns a channel which fires once for every height,
// and only when transactions are available in the mempool.
// NOTE: the returned channel may be nil if EnableTxsAvailable was not called.
TxsAvailable ( ) <- chan struct { }
* /
// EnableTxsAvailable initializes the TxsAvailable channel, ensuring it will
// trigger once every height when transactions are available.
EnableTxsAvailable ( )
var (
// ErrTxInCache is returned to the client if we saw tx earlier
ErrTxInCache = errors . New ( "Tx already exists in cache" )
// Size returns the number of transactions in the mempool.
Size ( ) int
// ErrTxTooLarge means the tx is too big to be sent in a message to other peers
ErrTxTooLarge = fmt . Errorf ( "Tx too large. Max size is %d" , maxTxSize )
)
// TxsBytes returns the total size of all txs in the mempool.
TxsBytes ( ) int64
// ErrMempoolIsFull means Tendermint & an application can't handle that much load
type ErrMempoolIsFull struct {
numTxs int
maxTxs int
// InitWAL creates a directory for the WAL file and opens a file itself.
InitWAL ( )
txsBytes int64
maxTxsBytes int64
// CloseWAL closes and discards the underlying WAL file.
// Any further writes will not be relayed to disk.
CloseWAL ( )
}
}
func ( e ErrMempoolIsFull ) Error ( ) string {
return fmt . Sprintf (
"Mempool is full: number of txs %d (max: %d), total txs bytes %d (max: %d)" ,
e . numTxs , e . maxTxs ,
e . txsBytes , e . maxTxsBytes )
}
//--------------------------------------------------------------------------------
// ErrPreCheck is returned when tx is too big
type ErrPreCheck struct {
Reason error
}
// PreCheckFunc is an optional filter executed before CheckTx and rejects
// transaction if false is returned. An example would be to ensure that a
// transaction doesn't exceeded the block size.
type PreCheckFunc func ( types . Tx ) error
func ( e ErrPreCheck ) Error ( ) string {
return e . Reason . Error ( )
}
// PostCheckFunc is an optional filter executed after CheckTx and rejects
// transaction if false is returned. An example would be to ensure a
// transaction doesn't require more gas than available for the block.
type PostCheckFunc func ( types . Tx , * abci . ResponseCheckTx ) error
// IsPreCheckError returns true if err is due to pre check failure.
func IsPreCheckError ( err error ) bool {
_ , ok := err . ( ErrPreCheck )
return ok
// TxInfo are parameters that get passed when attempting to add a tx to the
// mempool.
type TxInfo struct {
// We don't use p2p.ID here because it's too big. The gain is to store max 2
// bytes with each tx to identify the sender rather than 20 bytes.
SenderID uint16
}
}
//--------------------------------------------------------------------------------
// PreCheckAminoMaxBytes checks that the size of the transaction plus the amino
// PreCheckAminoMaxBytes checks that the size of the transaction plus the amino
// overhead is smaller or equal to the expected maxBytes.
// overhead is smaller or equal to the expected maxBytes.
func PreCheckAminoMaxBytes ( maxBytes int64 ) PreCheckFunc {
func PreCheckAminoMaxBytes ( maxBytes int64 ) PreCheckFunc {
@ -143,718 +134,3 @@ func PostCheckMaxGas(maxGas int64) PostCheckFunc {
return nil
return nil
}
}
}
}
// TxID is the hex encoded hash of the bytes as a types.Tx.
func TxID ( tx [ ] byte ) string {
return fmt . Sprintf ( "%X" , types . Tx ( tx ) . Hash ( ) )
}
// txKey is the fixed length array sha256 hash used as the key in maps.
func txKey ( tx types . Tx ) [ sha256 . Size ] byte {
return sha256 . Sum256 ( tx )
}
// Mempool is an ordered in-memory pool for transactions before they are proposed in a consensus
// round. Transaction validity is checked using the CheckTx abci message before the transaction is
// added to the pool. The Mempool uses a concurrent list structure for storing transactions that
// can be efficiently accessed by multiple concurrent readers.
type Mempool struct {
config * cfg . MempoolConfig
proxyMtx sync . Mutex
proxyAppConn proxy . AppConnMempool
txs * clist . CList // concurrent linked-list of good txs
preCheck PreCheckFunc
postCheck PostCheckFunc
// Track whether we're rechecking txs.
// These are not protected by a mutex and are expected to be mutated
// in serial (ie. by abci responses which are called in serial).
recheckCursor * clist . CElement // next expected response
recheckEnd * clist . CElement // re-checking stops here
// notify listeners (ie. consensus) when txs are available
notifiedTxsAvailable bool
txsAvailable chan struct { } // fires once for each height, when the mempool is not empty
// Map for quick access to txs to record sender in CheckTx.
// txsMap: txKey -> CElement
txsMap sync . Map
// Atomic integers
height int64 // the last block Update()'d to
rechecking int32 // for re-checking filtered txs on Update()
txsBytes int64 // total size of mempool, in bytes
// Keep a cache of already-seen txs.
// This reduces the pressure on the proxyApp.
cache txCache
// A log of mempool txs
wal * auto . AutoFile
logger log . Logger
metrics * Metrics
}
// MempoolOption sets an optional parameter on the Mempool.
type MempoolOption func ( * Mempool )
// NewMempool returns a new Mempool with the given configuration and connection to an application.
func NewMempool (
config * cfg . MempoolConfig ,
proxyAppConn proxy . AppConnMempool ,
height int64 ,
options ... MempoolOption ,
) * Mempool {
mempool := & Mempool {
config : config ,
proxyAppConn : proxyAppConn ,
txs : clist . New ( ) ,
height : height ,
rechecking : 0 ,
recheckCursor : nil ,
recheckEnd : nil ,
logger : log . NewNopLogger ( ) ,
metrics : NopMetrics ( ) ,
}
if config . CacheSize > 0 {
mempool . cache = newMapTxCache ( config . CacheSize )
} else {
mempool . cache = nopTxCache { }
}
proxyAppConn . SetResponseCallback ( mempool . globalCb )
for _ , option := range options {
option ( mempool )
}
return mempool
}
// EnableTxsAvailable initializes the TxsAvailable channel,
// ensuring it will trigger once every height when transactions are available.
// NOTE: not thread safe - should only be called once, on startup
func ( mem * Mempool ) EnableTxsAvailable ( ) {
mem . txsAvailable = make ( chan struct { } , 1 )
}
// SetLogger sets the Logger.
func ( mem * Mempool ) SetLogger ( l log . Logger ) {
mem . logger = l
}
// WithPreCheck sets a filter for the mempool to reject a tx if f(tx) returns
// false. This is ran before CheckTx.
func WithPreCheck ( f PreCheckFunc ) MempoolOption {
return func ( mem * Mempool ) { mem . preCheck = f }
}
// WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns
// false. This is ran after CheckTx.
func WithPostCheck ( f PostCheckFunc ) MempoolOption {
return func ( mem * Mempool ) { mem . postCheck = f }
}
// WithMetrics sets the metrics.
func WithMetrics ( metrics * Metrics ) MempoolOption {
return func ( mem * Mempool ) { mem . metrics = metrics }
}
// InitWAL creates a directory for the WAL file and opens a file itself.
//
// *panics* if can't create directory or open file.
// *not thread safe*
func ( mem * Mempool ) InitWAL ( ) {
walDir := mem . config . WalDir ( )
err := cmn . EnsureDir ( walDir , 0700 )
if err != nil {
panic ( errors . Wrap ( err , "Error ensuring Mempool WAL dir" ) )
}
af , err := auto . OpenAutoFile ( walDir + "/wal" )
if err != nil {
panic ( errors . Wrap ( err , "Error opening Mempool WAL file" ) )
}
mem . wal = af
}
// CloseWAL closes and discards the underlying WAL file.
// Any further writes will not be relayed to disk.
func ( mem * Mempool ) CloseWAL ( ) {
mem . proxyMtx . Lock ( )
defer mem . proxyMtx . Unlock ( )
if err := mem . wal . Close ( ) ; err != nil {
mem . logger . Error ( "Error closing WAL" , "err" , err )
}
mem . wal = nil
}
// Lock locks the mempool. The consensus must be able to hold lock to safely update.
func ( mem * Mempool ) Lock ( ) {
mem . proxyMtx . Lock ( )
}
// Unlock unlocks the mempool.
func ( mem * Mempool ) Unlock ( ) {
mem . proxyMtx . Unlock ( )
}
// Size returns the number of transactions in the mempool.
func ( mem * Mempool ) Size ( ) int {
return mem . txs . Len ( )
}
// TxsBytes returns the total size of all txs in the mempool.
func ( mem * Mempool ) TxsBytes ( ) int64 {
return atomic . LoadInt64 ( & mem . txsBytes )
}
// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are
// done. E.g. from CheckTx.
func ( mem * Mempool ) FlushAppConn ( ) error {
return mem . proxyAppConn . FlushSync ( )
}
// Flush removes all transactions from the mempool and cache
func ( mem * Mempool ) Flush ( ) {
mem . proxyMtx . Lock ( )
defer mem . proxyMtx . Unlock ( )
mem . cache . Reset ( )
for e := mem . txs . Front ( ) ; e != nil ; e = e . Next ( ) {
mem . txs . Remove ( e )
e . DetachPrev ( )
}
mem . txsMap = sync . Map { }
_ = atomic . SwapInt64 ( & mem . txsBytes , 0 )
}
// TxsFront returns the first transaction in the ordered list for peer
// goroutines to call .NextWait() on.
func ( mem * Mempool ) TxsFront ( ) * clist . CElement {
return mem . txs . Front ( )
}
// TxsWaitChan returns a channel to wait on transactions. It will be closed
// once the mempool is not empty (ie. the internal `mem.txs` has at least one
// element)
func ( mem * Mempool ) TxsWaitChan ( ) <- chan struct { } {
return mem . txs . WaitChan ( )
}
// CheckTx executes a new transaction against the application to determine its validity
// and whether it should be added to the mempool.
// It blocks if we're waiting on Update() or Reap().
// cb: A callback from the CheckTx command.
// It gets called from another goroutine.
// CONTRACT: Either cb will get called, or err returned.
func ( mem * Mempool ) CheckTx ( tx types . Tx , cb func ( * abci . Response ) ) ( err error ) {
return mem . CheckTxWithInfo ( tx , cb , TxInfo { PeerID : UnknownPeerID } )
}
// CheckTxWithInfo performs the same operation as CheckTx, but with extra meta data about the tx.
// Currently this metadata is the peer who sent it,
// used to prevent the tx from being gossiped back to them.
func ( mem * Mempool ) CheckTxWithInfo ( tx types . Tx , cb func ( * abci . Response ) , txInfo TxInfo ) ( err error ) {
mem . proxyMtx . Lock ( )
// use defer to unlock mutex because application (*local client*) might panic
defer mem . proxyMtx . Unlock ( )
var (
memSize = mem . Size ( )
txsBytes = mem . TxsBytes ( )
)
if memSize >= mem . config . Size ||
int64 ( len ( tx ) ) + txsBytes > mem . config . MaxTxsBytes {
return ErrMempoolIsFull {
memSize , mem . config . Size ,
txsBytes , mem . config . MaxTxsBytes }
}
// The size of the corresponding amino-encoded TxMessage
// can't be larger than the maxMsgSize, otherwise we can't
// relay it to peers.
if len ( tx ) > maxTxSize {
return ErrTxTooLarge
}
if mem . preCheck != nil {
if err := mem . preCheck ( tx ) ; err != nil {
return ErrPreCheck { err }
}
}
// CACHE
if ! mem . cache . Push ( tx ) {
// Record a new sender for a tx we've already seen.
// Note it's possible a tx is still in the cache but no longer in the mempool
// (eg. after committing a block, txs are removed from mempool but not cache),
// so we only record the sender for txs still in the mempool.
if e , ok := mem . txsMap . Load ( txKey ( tx ) ) ; ok {
memTx := e . ( * clist . CElement ) . Value . ( * mempoolTx )
if _ , loaded := memTx . senders . LoadOrStore ( txInfo . PeerID , true ) ; loaded {
// TODO: consider punishing peer for dups,
// its non-trivial since invalid txs can become valid,
// but they can spam the same tx with little cost to them atm.
}
}
return ErrTxInCache
}
// END CACHE
// WAL
if mem . wal != nil {
// TODO: Notify administrators when WAL fails
_ , err := mem . wal . Write ( [ ] byte ( tx ) )
if err != nil {
mem . logger . Error ( "Error writing to WAL" , "err" , err )
}
_ , err = mem . wal . Write ( [ ] byte ( "\n" ) )
if err != nil {
mem . logger . Error ( "Error writing to WAL" , "err" , err )
}
}
// END WAL
// NOTE: proxyAppConn may error if tx buffer is full
if err = mem . proxyAppConn . Error ( ) ; err != nil {
return err
}
reqRes := mem . proxyAppConn . CheckTxAsync ( tx )
reqRes . SetCallback ( mem . reqResCb ( tx , txInfo . PeerID , cb ) )
return nil
}
// Global callback that will be called after every ABCI response.
// Having a single global callback avoids needing to set a callback for each request.
// However, processing the checkTx response requires the peerID (so we can track which txs we heard from who),
// and peerID is not included in the ABCI request, so we have to set request-specific callbacks that
// include this information. If we're not in the midst of a recheck, this function will just return,
// so the request specific callback can do the work.
// When rechecking, we don't need the peerID, so the recheck callback happens here.
func ( mem * Mempool ) globalCb ( req * abci . Request , res * abci . Response ) {
if mem . recheckCursor == nil {
return
}
mem . metrics . RecheckTimes . Add ( 1 )
mem . resCbRecheck ( req , res )
// update metrics
mem . metrics . Size . Set ( float64 ( mem . Size ( ) ) )
}
// Request specific callback that should be set on individual reqRes objects
// to incorporate local information when processing the response.
// This allows us to track the peer that sent us this tx, so we can avoid sending it back to them.
// NOTE: alternatively, we could include this information in the ABCI request itself.
//
// External callers of CheckTx, like the RPC, can also pass an externalCb through here that is called
// when all other response processing is complete.
//
// Used in CheckTxWithInfo to record PeerID who sent us the tx.
func ( mem * Mempool ) reqResCb ( tx [ ] byte , peerID uint16 , externalCb func ( * abci . Response ) ) func ( res * abci . Response ) {
return func ( res * abci . Response ) {
if mem . recheckCursor != nil {
// this should never happen
panic ( "recheck cursor is not nil in reqResCb" )
}
mem . resCbFirstTime ( tx , peerID , res )
// update metrics
mem . metrics . Size . Set ( float64 ( mem . Size ( ) ) )
// passed in by the caller of CheckTx, eg. the RPC
if externalCb != nil {
externalCb ( res )
}
}
}
// Called from:
// - resCbFirstTime (lock not held) if tx is valid
func ( mem * Mempool ) addTx ( memTx * mempoolTx ) {
e := mem . txs . PushBack ( memTx )
mem . txsMap . Store ( txKey ( memTx . tx ) , e )
atomic . AddInt64 ( & mem . txsBytes , int64 ( len ( memTx . tx ) ) )
mem . metrics . TxSizeBytes . Observe ( float64 ( len ( memTx . tx ) ) )
}
// Called from:
// - Update (lock held) if tx was committed
// - resCbRecheck (lock not held) if tx was invalidated
func ( mem * Mempool ) removeTx ( tx types . Tx , elem * clist . CElement , removeFromCache bool ) {
mem . txs . Remove ( elem )
elem . DetachPrev ( )
mem . txsMap . Delete ( txKey ( tx ) )
atomic . AddInt64 ( & mem . txsBytes , int64 ( - len ( tx ) ) )
if removeFromCache {
mem . cache . Remove ( tx )
}
}
// callback, which is called after the app checked the tx for the first time.
//
// The case where the app checks the tx for the second and subsequent times is
// handled by the resCbRecheck callback.
func ( mem * Mempool ) resCbFirstTime ( tx [ ] byte , peerID uint16 , res * abci . Response ) {
switch r := res . Value . ( type ) {
case * abci . Response_CheckTx :
var postCheckErr error
if mem . postCheck != nil {
postCheckErr = mem . postCheck ( tx , r . CheckTx )
}
if ( r . CheckTx . Code == abci . CodeTypeOK ) && postCheckErr == nil {
memTx := & mempoolTx {
height : mem . height ,
gasWanted : r . CheckTx . GasWanted ,
tx : tx ,
}
memTx . senders . Store ( peerID , true )
mem . addTx ( memTx )
mem . logger . Info ( "Added good transaction" ,
"tx" , TxID ( tx ) ,
"res" , r ,
"height" , memTx . height ,
"total" , mem . Size ( ) ,
)
mem . notifyTxsAvailable ( )
} else {
// ignore bad transaction
mem . logger . Info ( "Rejected bad transaction" , "tx" , TxID ( tx ) , "res" , r , "err" , postCheckErr )
mem . metrics . FailedTxs . Add ( 1 )
// remove from cache (it might be good later)
mem . cache . Remove ( tx )
}
default :
// ignore other messages
}
}
// callback, which is called after the app rechecked the tx.
//
// The case where the app checks the tx for the first time is handled by the
// resCbFirstTime callback.
func ( mem * Mempool ) resCbRecheck ( req * abci . Request , res * abci . Response ) {
switch r := res . Value . ( type ) {
case * abci . Response_CheckTx :
tx := req . GetCheckTx ( ) . Tx
memTx := mem . recheckCursor . Value . ( * mempoolTx )
if ! bytes . Equal ( tx , memTx . tx ) {
panic ( fmt . Sprintf (
"Unexpected tx response from proxy during recheck\nExpected %X, got %X" ,
memTx . tx ,
tx ) )
}
var postCheckErr error
if mem . postCheck != nil {
postCheckErr = mem . postCheck ( tx , r . CheckTx )
}
if ( r . CheckTx . Code == abci . CodeTypeOK ) && postCheckErr == nil {
// Good, nothing to do.
} else {
// Tx became invalidated due to newly committed block.
mem . logger . Info ( "Tx is no longer valid" , "tx" , TxID ( tx ) , "res" , r , "err" , postCheckErr )
// NOTE: we remove tx from the cache because it might be good later
mem . removeTx ( tx , mem . recheckCursor , true )
}
if mem . recheckCursor == mem . recheckEnd {
mem . recheckCursor = nil
} else {
mem . recheckCursor = mem . recheckCursor . Next ( )
}
if mem . recheckCursor == nil {
// Done!
atomic . StoreInt32 ( & mem . rechecking , 0 )
mem . logger . Info ( "Done rechecking txs" )
// incase the recheck removed all txs
if mem . Size ( ) > 0 {
mem . notifyTxsAvailable ( )
}
}
default :
// ignore other messages
}
}
// TxsAvailable returns a channel which fires once for every height,
// and only when transactions are available in the mempool.
// NOTE: the returned channel may be nil if EnableTxsAvailable was not called.
func ( mem * Mempool ) TxsAvailable ( ) <- chan struct { } {
return mem . txsAvailable
}
func ( mem * Mempool ) notifyTxsAvailable ( ) {
if mem . Size ( ) == 0 {
panic ( "notified txs available but mempool is empty!" )
}
if mem . txsAvailable != nil && ! mem . notifiedTxsAvailable {
// channel cap is 1, so this will send once
mem . notifiedTxsAvailable = true
select {
case mem . txsAvailable <- struct { } { } :
default :
}
}
}
// ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes bytes total
// with the condition that the total gasWanted must be less than maxGas.
// If both maxes are negative, there is no cap on the size of all returned
// transactions (~ all available transactions).
func ( mem * Mempool ) ReapMaxBytesMaxGas ( maxBytes , maxGas int64 ) types . Txs {
mem . proxyMtx . Lock ( )
defer mem . proxyMtx . Unlock ( )
for atomic . LoadInt32 ( & mem . rechecking ) > 0 {
// TODO: Something better?
time . Sleep ( time . Millisecond * 10 )
}
var totalBytes int64
var totalGas int64
// TODO: we will get a performance boost if we have a good estimate of avg
// size per tx, and set the initial capacity based off of that.
// txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), max/mem.avgTxSize))
txs := make ( [ ] types . Tx , 0 , mem . txs . Len ( ) )
for e := mem . txs . Front ( ) ; e != nil ; e = e . Next ( ) {
memTx := e . Value . ( * mempoolTx )
// Check total size requirement
aminoOverhead := types . ComputeAminoOverhead ( memTx . tx , 1 )
if maxBytes > - 1 && totalBytes + int64 ( len ( memTx . tx ) ) + aminoOverhead > maxBytes {
return txs
}
totalBytes += int64 ( len ( memTx . tx ) ) + aminoOverhead
// Check total gas requirement.
// If maxGas is negative, skip this check.
// Since newTotalGas < masGas, which
// must be non-negative, it follows that this won't overflow.
newTotalGas := totalGas + memTx . gasWanted
if maxGas > - 1 && newTotalGas > maxGas {
return txs
}
totalGas = newTotalGas
txs = append ( txs , memTx . tx )
}
return txs
}
// ReapMaxTxs reaps up to max transactions from the mempool.
// If max is negative, there is no cap on the size of all returned
// transactions (~ all available transactions).
func ( mem * Mempool ) ReapMaxTxs ( max int ) types . Txs {
mem . proxyMtx . Lock ( )
defer mem . proxyMtx . Unlock ( )
if max < 0 {
max = mem . txs . Len ( )
}
for atomic . LoadInt32 ( & mem . rechecking ) > 0 {
// TODO: Something better?
time . Sleep ( time . Millisecond * 10 )
}
txs := make ( [ ] types . Tx , 0 , cmn . MinInt ( mem . txs . Len ( ) , max ) )
for e := mem . txs . Front ( ) ; e != nil && len ( txs ) <= max ; e = e . Next ( ) {
memTx := e . Value . ( * mempoolTx )
txs = append ( txs , memTx . tx )
}
return txs
}
// Update informs the mempool that the given txs were committed and can be discarded.
// NOTE: this should be called *after* block is committed by consensus.
// NOTE: unsafe; Lock/Unlock must be managed by caller
func ( mem * Mempool ) Update (
height int64 ,
txs types . Txs ,
preCheck PreCheckFunc ,
postCheck PostCheckFunc ,
) error {
// Set height
mem . height = height
mem . notifiedTxsAvailable = false
if preCheck != nil {
mem . preCheck = preCheck
}
if postCheck != nil {
mem . postCheck = postCheck
}
// Add committed transactions to cache (if missing).
for _ , tx := range txs {
_ = mem . cache . Push ( tx )
}
// Remove committed transactions.
txsLeft := mem . removeTxs ( txs )
// Either recheck non-committed txs to see if they became invalid
// or just notify there're some txs left.
if len ( txsLeft ) > 0 {
if mem . config . Recheck {
mem . logger . Info ( "Recheck txs" , "numtxs" , len ( txsLeft ) , "height" , height )
mem . recheckTxs ( txsLeft )
// At this point, mem.txs are being rechecked.
// mem.recheckCursor re-scans mem.txs and possibly removes some txs.
// Before mem.Reap(), we should wait for mem.recheckCursor to be nil.
} else {
mem . notifyTxsAvailable ( )
}
}
// Update metrics
mem . metrics . Size . Set ( float64 ( mem . Size ( ) ) )
return nil
}
func ( mem * Mempool ) removeTxs ( txs types . Txs ) [ ] types . Tx {
// Build a map for faster lookups.
txsMap := make ( map [ string ] struct { } , len ( txs ) )
for _ , tx := range txs {
txsMap [ string ( tx ) ] = struct { } { }
}
txsLeft := make ( [ ] types . Tx , 0 , mem . txs . Len ( ) )
for e := mem . txs . Front ( ) ; e != nil ; e = e . Next ( ) {
memTx := e . Value . ( * mempoolTx )
// Remove the tx if it's already in a block.
if _ , ok := txsMap [ string ( memTx . tx ) ] ; ok {
// NOTE: we don't remove committed txs from the cache.
mem . removeTx ( memTx . tx , e , false )
continue
}
txsLeft = append ( txsLeft , memTx . tx )
}
return txsLeft
}
// NOTE: pass in txs because mem.txs can mutate concurrently.
func ( mem * Mempool ) recheckTxs ( txs [ ] types . Tx ) {
if len ( txs ) == 0 {
return
}
atomic . StoreInt32 ( & mem . rechecking , 1 )
mem . recheckCursor = mem . txs . Front ( )
mem . recheckEnd = mem . txs . Back ( )
// Push txs to proxyAppConn
// NOTE: globalCb may be called concurrently.
for _ , tx := range txs {
mem . proxyAppConn . CheckTxAsync ( tx )
}
mem . proxyAppConn . FlushAsync ( )
}
//--------------------------------------------------------------------------------
// mempoolTx is a transaction that successfully ran
type mempoolTx struct {
height int64 // height that this tx had been validated in
gasWanted int64 // amount of gas this tx states it will require
tx types . Tx //
// ids of peers who've sent us this tx (as a map for quick lookups).
// senders: PeerID -> bool
senders sync . Map
}
// Height returns the height for this transaction
func ( memTx * mempoolTx ) Height ( ) int64 {
return atomic . LoadInt64 ( & memTx . height )
}
//--------------------------------------------------------------------------------
type txCache interface {
Reset ( )
Push ( tx types . Tx ) bool
Remove ( tx types . Tx )
}
// mapTxCache maintains a LRU cache of transactions. This only stores the hash
// of the tx, due to memory concerns.
type mapTxCache struct {
mtx sync . Mutex
size int
map_ map [ [ sha256 . Size ] byte ] * list . Element
list * list . List
}
var _ txCache = ( * mapTxCache ) ( nil )
// newMapTxCache returns a new mapTxCache.
func newMapTxCache ( cacheSize int ) * mapTxCache {
return & mapTxCache {
size : cacheSize ,
map_ : make ( map [ [ sha256 . Size ] byte ] * list . Element , cacheSize ) ,
list : list . New ( ) ,
}
}
// Reset resets the cache to an empty state.
func ( cache * mapTxCache ) Reset ( ) {
cache . mtx . Lock ( )
cache . map_ = make ( map [ [ sha256 . Size ] byte ] * list . Element , cache . size )
cache . list . Init ( )
cache . mtx . Unlock ( )
}
// Push adds the given tx to the cache and returns true. It returns
// false if tx is already in the cache.
func ( cache * mapTxCache ) Push ( tx types . Tx ) bool {
cache . mtx . Lock ( )
defer cache . mtx . Unlock ( )
// Use the tx hash in the cache
txHash := txKey ( tx )
if moved , exists := cache . map_ [ txHash ] ; exists {
cache . list . MoveToBack ( moved )
return false
}
if cache . list . Len ( ) >= cache . size {
popped := cache . list . Front ( )
poppedTxHash := popped . Value . ( [ sha256 . Size ] byte )
delete ( cache . map_ , poppedTxHash )
if popped != nil {
cache . list . Remove ( popped )
}
}
e := cache . list . PushBack ( txHash )
cache . map_ [ txHash ] = e
return true
}
// Remove removes the given tx from the cache.
func ( cache * mapTxCache ) Remove ( tx types . Tx ) {
cache . mtx . Lock ( )
txHash := txKey ( tx )
popped := cache . map_ [ txHash ]
delete ( cache . map_ , txHash )
if popped != nil {
cache . list . Remove ( popped )
}
cache . mtx . Unlock ( )
}
type nopTxCache struct { }
var _ txCache = ( * nopTxCache ) ( nil )
func ( nopTxCache ) Reset ( ) { }
func ( nopTxCache ) Push ( types . Tx ) bool { return true }
func ( nopTxCache ) Remove ( types . Tx ) { }