|
@ -22,6 +22,16 @@ import ( |
|
|
"github.com/tendermint/tendermint/types" |
|
|
"github.com/tendermint/tendermint/types" |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
// PreCheckFunc is an optional filter executed before CheckTx and rejects
|
|
|
|
|
|
// transaction if false is returned. An example would be to ensure that a
|
|
|
|
|
|
// transaction doesn't exceeded the block size.
|
|
|
|
|
|
type PreCheckFunc func(types.Tx) bool |
|
|
|
|
|
|
|
|
|
|
|
// PostCheckFunc is an optional filter executed after CheckTx and rejects
|
|
|
|
|
|
// transaction if false is returned. An example would be to ensure a
|
|
|
|
|
|
// transaction doesn't require more gas than available for the block.
|
|
|
|
|
|
type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) bool |
|
|
|
|
|
|
|
|
/* |
|
|
/* |
|
|
|
|
|
|
|
|
The mempool pushes new txs onto the proxyAppConn. |
|
|
The mempool pushes new txs onto the proxyAppConn. |
|
@ -58,6 +68,27 @@ var ( |
|
|
ErrMempoolIsFull = errors.New("Mempool is full") |
|
|
ErrMempoolIsFull = errors.New("Mempool is full") |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
// PreCheckAminoMaxBytes checks that the size of the transaction plus the amino
|
|
|
|
|
|
// overhead is smaller or equal to the expected maxBytes.
|
|
|
|
|
|
func PreCheckAminoMaxBytes(maxBytes int64) PreCheckFunc { |
|
|
|
|
|
return func(tx types.Tx) bool { |
|
|
|
|
|
// We have to account for the amino overhead in the tx size as well
|
|
|
|
|
|
aminoOverhead := amino.UvarintSize(uint64(len(tx))) |
|
|
|
|
|
return int64(len(tx)+aminoOverhead) <= maxBytes |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// PostCheckMaxGas checks that the wanted gas is smaller or equal to the passed
|
|
|
|
|
|
// maxGas. Returns true if maxGas is -1.
|
|
|
|
|
|
func PostCheckMaxGas(maxGas int64) PostCheckFunc { |
|
|
|
|
|
return func(tx types.Tx, res *abci.ResponseCheckTx) bool { |
|
|
|
|
|
if maxGas == -1 { |
|
|
|
|
|
return true |
|
|
|
|
|
} |
|
|
|
|
|
return res.GasWanted <= maxGas |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// TxID is the hex encoded hash of the bytes as a types.Tx.
|
|
|
// TxID is the hex encoded hash of the bytes as a types.Tx.
|
|
|
func TxID(tx []byte) string { |
|
|
func TxID(tx []byte) string { |
|
|
return fmt.Sprintf("%X", types.Tx(tx).Hash()) |
|
|
return fmt.Sprintf("%X", types.Tx(tx).Hash()) |
|
@ -80,8 +111,8 @@ type Mempool struct { |
|
|
recheckEnd *clist.CElement // re-checking stops here
|
|
|
recheckEnd *clist.CElement // re-checking stops here
|
|
|
notifiedTxsAvailable bool |
|
|
notifiedTxsAvailable bool |
|
|
txsAvailable chan struct{} // fires once for each height, when the mempool is not empty
|
|
|
txsAvailable chan struct{} // fires once for each height, when the mempool is not empty
|
|
|
// Filter mempool to only accept txs for which filter(tx) returns true.
|
|
|
|
|
|
filter func(types.Tx) bool |
|
|
|
|
|
|
|
|
preCheck PreCheckFunc |
|
|
|
|
|
postCheck PostCheckFunc |
|
|
|
|
|
|
|
|
// Keep a cache of already-seen txs.
|
|
|
// Keep a cache of already-seen txs.
|
|
|
// This reduces the pressure on the proxyApp.
|
|
|
// This reduces the pressure on the proxyApp.
|
|
@ -141,10 +172,16 @@ func (mem *Mempool) SetLogger(l log.Logger) { |
|
|
mem.logger = l |
|
|
mem.logger = l |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// WithFilter sets a filter for mempool to only accept txs for which f(tx)
|
|
|
|
|
|
// returns true.
|
|
|
|
|
|
func WithFilter(f func(types.Tx) bool) MempoolOption { |
|
|
|
|
|
return func(mem *Mempool) { mem.filter = f } |
|
|
|
|
|
|
|
|
// WithPreCheck sets a filter for the mempool to reject a tx if f(tx) returns
|
|
|
|
|
|
// false. This is ran before CheckTx.
|
|
|
|
|
|
func WithPreCheck(f PreCheckFunc) MempoolOption { |
|
|
|
|
|
return func(mem *Mempool) { mem.preCheck = f } |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns
|
|
|
|
|
|
// false. This is ran after CheckTx.
|
|
|
|
|
|
func WithPostCheck(f PostCheckFunc) MempoolOption { |
|
|
|
|
|
return func(mem *Mempool) { mem.postCheck = f } |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// WithMetrics sets the metrics.
|
|
|
// WithMetrics sets the metrics.
|
|
@ -248,7 +285,7 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) { |
|
|
return ErrMempoolIsFull |
|
|
return ErrMempoolIsFull |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if mem.filter != nil && !mem.filter(tx) { |
|
|
|
|
|
|
|
|
if mem.preCheck != nil && !mem.preCheck(tx) { |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -298,7 +335,8 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) { |
|
|
switch r := res.Value.(type) { |
|
|
switch r := res.Value.(type) { |
|
|
case *abci.Response_CheckTx: |
|
|
case *abci.Response_CheckTx: |
|
|
tx := req.GetCheckTx().Tx |
|
|
tx := req.GetCheckTx().Tx |
|
|
if r.CheckTx.Code == abci.CodeTypeOK { |
|
|
|
|
|
|
|
|
if (r.CheckTx.Code == abci.CodeTypeOK) && |
|
|
|
|
|
mem.isPostCheckPass(tx, r.CheckTx) { |
|
|
mem.counter++ |
|
|
mem.counter++ |
|
|
memTx := &mempoolTx{ |
|
|
memTx := &mempoolTx{ |
|
|
counter: mem.counter, |
|
|
counter: mem.counter, |
|
@ -326,10 +364,15 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) { |
|
|
case *abci.Response_CheckTx: |
|
|
case *abci.Response_CheckTx: |
|
|
memTx := mem.recheckCursor.Value.(*mempoolTx) |
|
|
memTx := mem.recheckCursor.Value.(*mempoolTx) |
|
|
if !bytes.Equal(req.GetCheckTx().Tx, memTx.tx) { |
|
|
if !bytes.Equal(req.GetCheckTx().Tx, memTx.tx) { |
|
|
cmn.PanicSanity(fmt.Sprintf("Unexpected tx response from proxy during recheck\n"+ |
|
|
|
|
|
"Expected %X, got %X", r.CheckTx.Data, memTx.tx)) |
|
|
|
|
|
|
|
|
cmn.PanicSanity( |
|
|
|
|
|
fmt.Sprintf( |
|
|
|
|
|
"Unexpected tx response from proxy during recheck\nExpected %X, got %X", |
|
|
|
|
|
r.CheckTx.Data, |
|
|
|
|
|
memTx.tx, |
|
|
|
|
|
), |
|
|
|
|
|
) |
|
|
} |
|
|
} |
|
|
if r.CheckTx.Code == abci.CodeTypeOK { |
|
|
|
|
|
|
|
|
if (r.CheckTx.Code == abci.CodeTypeOK) && mem.isPostCheckPass(memTx.tx, r.CheckTx) { |
|
|
// Good, nothing to do.
|
|
|
// Good, nothing to do.
|
|
|
} else { |
|
|
} else { |
|
|
// Tx became invalidated due to newly committed block.
|
|
|
// Tx became invalidated due to newly committed block.
|
|
@ -444,7 +487,12 @@ func (mem *Mempool) ReapMaxTxs(max int) types.Txs { |
|
|
// Update informs the mempool that the given txs were committed and can be discarded.
|
|
|
// Update informs the mempool that the given txs were committed and can be discarded.
|
|
|
// NOTE: this should be called *after* block is committed by consensus.
|
|
|
// NOTE: this should be called *after* block is committed by consensus.
|
|
|
// NOTE: unsafe; Lock/Unlock must be managed by caller
|
|
|
// NOTE: unsafe; Lock/Unlock must be managed by caller
|
|
|
func (mem *Mempool) Update(height int64, txs types.Txs, filter func(types.Tx) bool) error { |
|
|
|
|
|
|
|
|
func (mem *Mempool) Update( |
|
|
|
|
|
height int64, |
|
|
|
|
|
txs types.Txs, |
|
|
|
|
|
preCheck PreCheckFunc, |
|
|
|
|
|
postCheck PostCheckFunc, |
|
|
|
|
|
) error { |
|
|
// First, create a lookup map of txns in new txs.
|
|
|
// First, create a lookup map of txns in new txs.
|
|
|
txsMap := make(map[string]struct{}, len(txs)) |
|
|
txsMap := make(map[string]struct{}, len(txs)) |
|
|
for _, tx := range txs { |
|
|
for _, tx := range txs { |
|
@ -455,8 +503,11 @@ func (mem *Mempool) Update(height int64, txs types.Txs, filter func(types.Tx) bo |
|
|
mem.height = height |
|
|
mem.height = height |
|
|
mem.notifiedTxsAvailable = false |
|
|
mem.notifiedTxsAvailable = false |
|
|
|
|
|
|
|
|
if filter != nil { |
|
|
|
|
|
mem.filter = filter |
|
|
|
|
|
|
|
|
if preCheck != nil { |
|
|
|
|
|
mem.preCheck = preCheck |
|
|
|
|
|
} |
|
|
|
|
|
if postCheck != nil { |
|
|
|
|
|
mem.postCheck = postCheck |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Remove transactions that are already in txs.
|
|
|
// Remove transactions that are already in txs.
|
|
@ -514,6 +565,10 @@ func (mem *Mempool) recheckTxs(goodTxs []types.Tx) { |
|
|
mem.proxyAppConn.FlushAsync() |
|
|
mem.proxyAppConn.FlushAsync() |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (mem *Mempool) isPostCheckPass(tx types.Tx, r *abci.ResponseCheckTx) bool { |
|
|
|
|
|
return mem.postCheck == nil || mem.postCheck(tx, r) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
//--------------------------------------------------------------------------------
|
|
|
//--------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
// mempoolTx is a transaction that successfully ran
|
|
|
// mempoolTx is a transaction that successfully ran
|
|
|