@ -63,13 +63,26 @@ var (
// ErrTxInCache is returned to the client if we saw tx earlier
// ErrTxInCache is returned to the client if we saw tx earlier
ErrTxInCache = errors . New ( "Tx already exists in cache" )
ErrTxInCache = errors . New ( "Tx already exists in cache" )
// ErrMempoolIsFull means Tendermint & an application can't handle that much load
ErrMempoolIsFull = errors . New ( "Mempool is full" )
// ErrTxTooLarge means the tx is too big to be sent in a message to other peers
// ErrTxTooLarge means the tx is too big to be sent in a message to other peers
ErrTxTooLarge = fmt . Errorf ( "Tx too large. Max size is %d" , maxTxSize )
ErrTxTooLarge = fmt . Errorf ( "Tx too large. Max size is %d" , maxTxSize )
)
)
// ErrMempoolIsFull means Tendermint & an application can't handle that much load
type ErrMempoolIsFull struct {
numTxs int
maxTxs int
txsBytes int64
maxTxsBytes int64
}
func ( e ErrMempoolIsFull ) Error ( ) string {
return fmt . Sprintf (
"Mempool is full: number of txs %d (max: %d), total txs bytes %d (max: %d)" ,
e . numTxs , e . maxTxs ,
e . txsBytes , e . maxTxsBytes )
}
// ErrPreCheck is returned when tx is too big
// ErrPreCheck is returned when tx is too big
type ErrPreCheck struct {
type ErrPreCheck struct {
Reason error
Reason error
@ -147,6 +160,9 @@ type Mempool struct {
preCheck PreCheckFunc
preCheck PreCheckFunc
postCheck PostCheckFunc
postCheck PostCheckFunc
// Atomic integers
txsBytes int64 // see TxsBytes
// Keep a cache of already-seen txs.
// Keep a cache of already-seen txs.
// This reduces the pressure on the proxyApp.
// This reduces the pressure on the proxyApp.
cache txCache
cache txCache
@ -265,8 +281,13 @@ func (mem *Mempool) Size() int {
return mem . txs . Len ( )
return mem . txs . Len ( )
}
}
// Flushes the mempool connection to ensure async resCb calls are done e.g.
// from CheckTx.
// TxsBytes returns the total size of all txs in the mempool.
func ( mem * Mempool ) TxsBytes ( ) int64 {
return atomic . LoadInt64 ( & mem . txsBytes )
}
// FlushAppConn flushes the mempool connection to ensure async resCb calls are
// done e.g. from CheckTx.
func ( mem * Mempool ) FlushAppConn ( ) error {
func ( mem * Mempool ) FlushAppConn ( ) error {
return mem . proxyAppConn . FlushSync ( )
return mem . proxyAppConn . FlushSync ( )
}
}
@ -282,6 +303,8 @@ func (mem *Mempool) Flush() {
mem . txs . Remove ( e )
mem . txs . Remove ( e )
e . DetachPrev ( )
e . DetachPrev ( )
}
}
_ = atomic . SwapInt64 ( & mem . txsBytes , 0 )
}
}
// TxsFront returns the first transaction in the ordered list for peer
// TxsFront returns the first transaction in the ordered list for peer
@ -308,8 +331,15 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
// use defer to unlock mutex because application (*local client*) might panic
// use defer to unlock mutex because application (*local client*) might panic
defer mem . proxyMtx . Unlock ( )
defer mem . proxyMtx . Unlock ( )
if mem . Size ( ) >= mem . config . Size {
return ErrMempoolIsFull
var (
memSize = mem . Size ( )
txsBytes = mem . TxsBytes ( )
)
if memSize >= mem . config . Size ||
int64 ( len ( tx ) ) + txsBytes > mem . config . MaxTxsBytes {
return ErrMempoolIsFull {
memSize , mem . config . Size ,
txsBytes , mem . config . MaxTxsBytes }
}
}
// The size of the corresponding amino-encoded TxMessage
// The size of the corresponding amino-encoded TxMessage
@ -383,6 +413,7 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
tx : tx ,
tx : tx ,
}
}
mem . txs . PushBack ( memTx )
mem . txs . PushBack ( memTx )
atomic . AddInt64 ( & mem . txsBytes , int64 ( len ( tx ) ) )
mem . logger . Info ( "Added good transaction" ,
mem . logger . Info ( "Added good transaction" ,
"tx" , TxID ( tx ) ,
"tx" , TxID ( tx ) ,
"res" , r ,
"res" , r ,
@ -424,6 +455,7 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) {
// Tx became invalidated due to newly committed block.
// Tx became invalidated due to newly committed block.
mem . logger . Info ( "Tx is no longer valid" , "tx" , TxID ( tx ) , "res" , r , "err" , postCheckErr )
mem . logger . Info ( "Tx is no longer valid" , "tx" , TxID ( tx ) , "res" , r , "err" , postCheckErr )
mem . txs . Remove ( mem . recheckCursor )
mem . txs . Remove ( mem . recheckCursor )
atomic . AddInt64 ( & mem . txsBytes , int64 ( - len ( tx ) ) )
mem . recheckCursor . DetachPrev ( )
mem . recheckCursor . DetachPrev ( )
// remove from cache (it might be good later)
// remove from cache (it might be good later)
@ -597,6 +629,7 @@ func (mem *Mempool) removeTxs(txs types.Txs) []types.Tx {
if _ , ok := txsMap [ string ( memTx . tx ) ] ; ok {
if _ , ok := txsMap [ string ( memTx . tx ) ] ; ok {
// remove from clist
// remove from clist
mem . txs . Remove ( e )
mem . txs . Remove ( e )
atomic . AddInt64 ( & mem . txsBytes , int64 ( - len ( memTx . tx ) ) )
e . DetachPrev ( )
e . DetachPrev ( )
// NOTE: we don't remove committed txs from the cache.
// NOTE: we don't remove committed txs from the cache.