@@ -0,0 +1,107 @@
package mempool | |||
import ( | |||
"container/list" | |||
tmsync "github.com/tendermint/tendermint/internal/libs/sync" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
// TxCache defines an interface for raw transaction caching in a mempool.
// Currently, a TxCache does not allow direct reading or getting of transaction
// values. A TxCache is used primarily to push and remove transactions. Push
// returns a boolean telling the caller whether the transaction was newly added
// (true) or already present in the cache (false).
type TxCache interface { | |||
// Reset resets the cache to an empty state. | |||
Reset() | |||
// Push adds the given raw transaction to the cache and returns true if it was | |||
// newly added. Otherwise, it returns false. | |||
Push(tx types.Tx) bool | |||
// Remove removes the given raw transaction from the cache. | |||
Remove(tx types.Tx) | |||
} | |||
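// An illustrative usage sketch of the interface above (assuming an LRU cache
// of size 100; this snippet is not part of the change itself):
//
//	cache := NewLRUTxCache(100)
//	if cache.Push(tx) {
//		// first time we've seen tx; hand it to CheckTx
//	} else {
//		// duplicate; skip CheckTx (optionally record the extra sender)
//	}
//	cache.Remove(tx) // e.g. after a failed CheckTx so the tx may be re-submitted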
var _ TxCache = (*LRUTxCache)(nil) | |||
// LRUTxCache maintains a thread-safe LRU cache of raw transactions. The cache | |||
// only stores the hash of the raw transaction. | |||
type LRUTxCache struct { | |||
mtx tmsync.Mutex | |||
size int | |||
cacheMap map[[TxKeySize]byte]*list.Element | |||
list *list.List | |||
} | |||
func NewLRUTxCache(cacheSize int) *LRUTxCache { | |||
return &LRUTxCache{ | |||
size: cacheSize, | |||
cacheMap: make(map[[TxKeySize]byte]*list.Element, cacheSize), | |||
list: list.New(), | |||
} | |||
} | |||
// GetList returns the underlying linked-list that backs the LRU cache. Note, | |||
// this should be used for testing purposes only! | |||
func (c *LRUTxCache) GetList() *list.List { | |||
return c.list | |||
} | |||
func (c *LRUTxCache) Reset() { | |||
c.mtx.Lock() | |||
defer c.mtx.Unlock() | |||
c.cacheMap = make(map[[TxKeySize]byte]*list.Element, c.size) | |||
c.list.Init() | |||
} | |||
func (c *LRUTxCache) Push(tx types.Tx) bool { | |||
c.mtx.Lock() | |||
defer c.mtx.Unlock() | |||
key := TxKey(tx) | |||
moved, ok := c.cacheMap[key] | |||
if ok { | |||
c.list.MoveToBack(moved) | |||
return false | |||
} | |||
if c.list.Len() >= c.size { | |||
front := c.list.Front() | |||
if front != nil { | |||
frontKey := front.Value.([TxKeySize]byte) | |||
delete(c.cacheMap, frontKey) | |||
c.list.Remove(front) | |||
} | |||
} | |||
e := c.list.PushBack(key) | |||
c.cacheMap[key] = e | |||
return true | |||
} | |||
func (c *LRUTxCache) Remove(tx types.Tx) { | |||
c.mtx.Lock() | |||
defer c.mtx.Unlock() | |||
key := TxKey(tx) | |||
e := c.cacheMap[key] | |||
delete(c.cacheMap, key) | |||
if e != nil { | |||
c.list.Remove(e) | |||
} | |||
} | |||
// NopTxCache defines a no-op raw transaction cache. | |||
type NopTxCache struct{} | |||
var _ TxCache = (*NopTxCache)(nil) | |||
func (NopTxCache) Reset() {} | |||
func (NopTxCache) Push(types.Tx) bool { return true } | |||
func (NopTxCache) Remove(types.Tx) {} |
@@ -0,0 +1,41 @@
package mempool | |||
import ( | |||
"encoding/binary" | |||
"testing" | |||
) | |||
func BenchmarkCacheInsertTime(b *testing.B) { | |||
cache := NewLRUTxCache(b.N) | |||
txs := make([][]byte, b.N) | |||
for i := 0; i < b.N; i++ { | |||
txs[i] = make([]byte, 8) | |||
binary.BigEndian.PutUint64(txs[i], uint64(i)) | |||
} | |||
b.ResetTimer() | |||
for i := 0; i < b.N; i++ { | |||
cache.Push(txs[i]) | |||
} | |||
} | |||
// This benchmark is probably skewed: in practice transactions are removed in
// parallel, which adds mutex-locking overhead that is not captured here.
func BenchmarkCacheRemoveTime(b *testing.B) { | |||
cache := NewLRUTxCache(b.N) | |||
txs := make([][]byte, b.N) | |||
for i := 0; i < b.N; i++ { | |||
txs[i] = make([]byte, 8) | |||
binary.BigEndian.PutUint64(txs[i], uint64(i)) | |||
cache.Push(txs[i]) | |||
} | |||
b.ResetTimer() | |||
for i := 0; i < b.N; i++ { | |||
cache.Remove(txs[i]) | |||
} | |||
} |
@@ -0,0 +1,23 @@
package mempool | |||
import ( | |||
"testing" | |||
"github.com/stretchr/testify/require" | |||
"github.com/tendermint/tendermint/p2p" | |||
) | |||
func TestMempoolIDsBasic(t *testing.T) { | |||
ids := NewMempoolIDs() | |||
peerID, err := p2p.NewNodeID("0011223344556677889900112233445566778899") | |||
require.NoError(t, err) | |||
ids.ReserveForPeer(peerID) | |||
require.EqualValues(t, 1, ids.GetForPeer(peerID)) | |||
ids.Reclaim(peerID) | |||
ids.ReserveForPeer(peerID) | |||
require.EqualValues(t, 2, ids.GetForPeer(peerID)) | |||
ids.Reclaim(peerID) | |||
} |
@@ -0,0 +1,37 @@
package mempool | |||
import ( | |||
"context" | |||
"crypto/sha256" | |||
"github.com/tendermint/tendermint/p2p" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
// TxKeySize defines the size of the transaction's key used for indexing. | |||
const TxKeySize = sha256.Size | |||
// TxKey returns the fixed-length array key used to index a transaction.
func TxKey(tx types.Tx) [TxKeySize]byte { | |||
return sha256.Sum256(tx) | |||
} | |||
// TxHashFromBytes returns the hash of a transaction from raw bytes. | |||
func TxHashFromBytes(tx []byte) []byte { | |||
return types.Tx(tx).Hash() | |||
} | |||
// TxInfo are parameters that get passed when attempting to add a tx to the | |||
// mempool. | |||
type TxInfo struct { | |||
// SenderID is the internal peer ID used in the mempool to identify the | |||
// sender, storing two bytes with each transaction instead of 20 bytes for | |||
// the p2p.NodeID. | |||
SenderID uint16 | |||
// SenderNodeID is the actual p2p.NodeID of the sender. | |||
SenderNodeID p2p.NodeID | |||
// Context is the optional context to cancel CheckTx | |||
Context context.Context | |||
} |
@@ -0,0 +1,81 @@
package v0 | |||
import ( | |||
"crypto/sha256" | |||
"testing" | |||
"github.com/stretchr/testify/require" | |||
"github.com/tendermint/tendermint/abci/example/kvstore" | |||
abci "github.com/tendermint/tendermint/abci/types" | |||
"github.com/tendermint/tendermint/mempool" | |||
"github.com/tendermint/tendermint/proxy" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
func TestCacheAfterUpdate(t *testing.T) { | |||
app := kvstore.NewApplication() | |||
cc := proxy.NewLocalClientCreator(app) | |||
mp, cleanup := newMempoolWithApp(cc) | |||
defer cleanup() | |||
// reAddIndices & txsInCache can have elements > numTxsToCreate | |||
// also assumes max index is 255 for convenience | |||
// txs in cache also checks order of elements | |||
tests := []struct { | |||
numTxsToCreate int | |||
updateIndices []int | |||
reAddIndices []int | |||
txsInCache []int | |||
}{ | |||
{1, []int{}, []int{1}, []int{1, 0}}, // adding new txs works | |||
{2, []int{1}, []int{}, []int{1, 0}}, // update doesn't remove tx from cache | |||
{2, []int{2}, []int{}, []int{2, 1, 0}}, // update adds new tx to cache | |||
{2, []int{1}, []int{1}, []int{1, 0}}, // re-adding after update doesn't make dupe | |||
} | |||
for tcIndex, tc := range tests { | |||
for i := 0; i < tc.numTxsToCreate; i++ { | |||
tx := types.Tx{byte(i)} | |||
err := mp.CheckTx(tx, nil, mempool.TxInfo{}) | |||
require.NoError(t, err) | |||
} | |||
updateTxs := []types.Tx{} | |||
for _, v := range tc.updateIndices { | |||
tx := types.Tx{byte(v)} | |||
updateTxs = append(updateTxs, tx) | |||
} | |||
err := mp.Update(int64(tcIndex), updateTxs, abciResponses(len(updateTxs), abci.CodeTypeOK), nil, nil) | |||
require.NoError(t, err) | |||
for _, v := range tc.reAddIndices { | |||
tx := types.Tx{byte(v)} | |||
_ = mp.CheckTx(tx, nil, mempool.TxInfo{}) | |||
} | |||
cache := mp.cache.(*mempool.LRUTxCache) | |||
node := cache.GetList().Front() | |||
counter := 0 | |||
for node != nil { | |||
require.NotEqual(t, len(tc.txsInCache), counter, | |||
"cache larger than expected on testcase %d", tcIndex) | |||
nodeVal := node.Value.([sha256.Size]byte) | |||
expectedBz := sha256.Sum256([]byte{byte(tc.txsInCache[len(tc.txsInCache)-counter-1])}) | |||
// Reference for reading the errors: | |||
// >>> sha256('\x00').hexdigest() | |||
// '6e340b9cffb37a989ca544e6bb780a2c78901d3fb33738768511a30617afa01d' | |||
// >>> sha256('\x01').hexdigest() | |||
// '4bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a' | |||
// >>> sha256('\x02').hexdigest() | |||
// 'dbc1b4c900ffe48d575b5da5c638040125f65db0fe3e24494b76ea986457d986' | |||
require.Equal(t, expectedBz, nodeVal, "Equality failed on index %d, tc %d", counter, tcIndex) | |||
counter++ | |||
node = node.Next() | |||
} | |||
require.Equal(t, len(tc.txsInCache), counter, | |||
"cache smaller than expected on testcase %d", tcIndex) | |||
mp.Flush() | |||
} | |||
} |
@@ -0,0 +1,763 @@
package v1 | |||
import ( | |||
"bytes" | |||
"context" | |||
"fmt" | |||
"sync/atomic" | |||
"time" | |||
abci "github.com/tendermint/tendermint/abci/types" | |||
"github.com/tendermint/tendermint/config" | |||
"github.com/tendermint/tendermint/internal/libs/clist" | |||
tmsync "github.com/tendermint/tendermint/internal/libs/sync" | |||
"github.com/tendermint/tendermint/libs/log" | |||
tmmath "github.com/tendermint/tendermint/libs/math" | |||
"github.com/tendermint/tendermint/mempool" | |||
"github.com/tendermint/tendermint/proxy" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
var _ mempool.Mempool = (*TxMempool)(nil) | |||
// TxMempoolOption sets an optional parameter on the TxMempool. | |||
type TxMempoolOption func(*TxMempool) | |||
// TxMempool defines a prioritized mempool data structure used by the v1 mempool | |||
// reactor. It keeps a thread-safe priority queue of transactions that is used | |||
// when a block proposer constructs a block and a thread-safe linked-list that | |||
// is used to gossip transactions to peers in a FIFO manner. | |||
type TxMempool struct { | |||
logger log.Logger | |||
metrics *mempool.Metrics | |||
config *config.MempoolConfig | |||
proxyAppConn proxy.AppConnMempool | |||
// txsAvailable fires once for each height when the mempool is not empty | |||
txsAvailable chan struct{} | |||
notifiedTxsAvailable bool | |||
// height defines the last block height processed during Update()
height int64 | |||
// sizeBytes defines the total size of the mempool (sum of all tx bytes) | |||
sizeBytes int64 | |||
// cache defines a fixed-size cache of already seen transactions as this | |||
// reduces pressure on the proxyApp. | |||
cache mempool.TxCache | |||
// txStore defines the main storage of valid transactions. Indexes are built | |||
// on top of this store. | |||
txStore *TxStore | |||
// gossipIndex defines the gossiping index of valid transactions via a | |||
// thread-safe linked-list. We also use the gossip index as a cursor for | |||
// rechecking transactions already in the mempool. | |||
gossipIndex *clist.CList | |||
// recheckCursor and recheckEnd are used as cursors based on the gossip index
// to recheck transactions that are already in the mempool. Iteration is not
// thread-safe and transactions may only be mutated in serial order.
//
// XXX/TODO: It might be somewhat of a code smell to use the gossip index for
// iterator and cursor management when rechecking transactions. If the gossip
// index changes or is removed in a future refactor, this will have to be
// refactored. Instead, we should consider just keeping a slice of a snapshot
// of the mempool's current transactions during Update and an integer cursor
// into that slice. This, however, requires additional O(n) space complexity.
recheckCursor *clist.CElement // next expected response | |||
recheckEnd *clist.CElement // re-checking stops here | |||
// priorityIndex defines the priority index of valid transactions via a | |||
// thread-safe priority queue. | |||
priorityIndex *TxPriorityQueue | |||
// A read/write lock is used to safeguard updates, insertions and deletions
// from the mempool. A read-lock is implicitly acquired when executing CheckTx;
// however, a caller must explicitly grab a write-lock via Lock when updating
// the mempool via Update().
mtx tmsync.RWMutex | |||
preCheck mempool.PreCheckFunc | |||
postCheck mempool.PostCheckFunc | |||
} | |||
func NewTxMempool( | |||
logger log.Logger, | |||
cfg *config.MempoolConfig, | |||
proxyAppConn proxy.AppConnMempool, | |||
height int64, | |||
options ...TxMempoolOption, | |||
) *TxMempool { | |||
txmp := &TxMempool{ | |||
logger: logger, | |||
config: cfg, | |||
proxyAppConn: proxyAppConn, | |||
height: height, | |||
cache: mempool.NopTxCache{}, | |||
metrics: mempool.NopMetrics(), | |||
txStore: NewTxStore(), | |||
gossipIndex: clist.New(), | |||
priorityIndex: NewTxPriorityQueue(), | |||
} | |||
if cfg.CacheSize > 0 { | |||
txmp.cache = mempool.NewLRUTxCache(cfg.CacheSize) | |||
} | |||
proxyAppConn.SetResponseCallback(txmp.defaultTxCallback) | |||
for _, opt := range options { | |||
opt(txmp) | |||
} | |||
return txmp | |||
} | |||
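// A rough wiring sketch (the logger, config, proxy connection, and height
// values below are assumptions for illustration, not prescribed by this
// change):
//
//	txmp := NewTxMempool(
//		logger.With("module", "mempool"),
//		cfg.Mempool,
//		proxyApp.Mempool(),
//		state.LastBlockHeight,
//		WithMetrics(mempool.NopMetrics()),
//	)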
// WithPreCheck sets a filter for the mempool to reject a transaction if f(tx) | |||
// returns an error. This is executed before CheckTx. It only applies to the | |||
// first created block. After that, Update() overwrites the existing value. | |||
func WithPreCheck(f mempool.PreCheckFunc) TxMempoolOption { | |||
return func(txmp *TxMempool) { txmp.preCheck = f } | |||
} | |||
// WithPostCheck sets a filter for the mempool to reject a transaction if | |||
// f(tx, resp) returns an error. This is executed after CheckTx. It only applies | |||
// to the first created block. After that, Update overwrites the existing value. | |||
func WithPostCheck(f mempool.PostCheckFunc) TxMempoolOption { | |||
return func(txmp *TxMempool) { txmp.postCheck = f } | |||
} | |||
// WithMetrics sets the mempool's metrics collector. | |||
func WithMetrics(metrics *mempool.Metrics) TxMempoolOption { | |||
return func(txmp *TxMempool) { txmp.metrics = metrics } | |||
} | |||
// Lock obtains a write-lock on the mempool. A caller must be sure to explicitly | |||
// release the lock when finished. | |||
func (txmp *TxMempool) Lock() { | |||
txmp.mtx.Lock() | |||
} | |||
// Unlock releases a write-lock on the mempool. | |||
func (txmp *TxMempool) Unlock() { | |||
txmp.mtx.Unlock() | |||
} | |||
// Size returns the number of valid transactions in the mempool. It is | |||
// thread-safe. | |||
func (txmp *TxMempool) Size() int { | |||
return txmp.txStore.Size() | |||
} | |||
// SizeBytes returns the total size in bytes of all the valid transactions in the
// mempool. It is thread-safe.
func (txmp *TxMempool) SizeBytes() int64 { | |||
return atomic.LoadInt64(&txmp.sizeBytes) | |||
} | |||
// FlushAppConn executes FlushSync on the mempool's proxyAppConn. | |||
// | |||
// NOTE: The caller must obtain a write-lock via Lock() prior to execution. | |||
func (txmp *TxMempool) FlushAppConn() error { | |||
return txmp.proxyAppConn.FlushSync(context.Background()) | |||
} | |||
// WaitForNextTx returns a blocking channel that will be closed when the next | |||
// valid transaction is available to gossip. It is thread-safe. | |||
func (txmp *TxMempool) WaitForNextTx() <-chan struct{} { | |||
return txmp.gossipIndex.WaitChan() | |||
} | |||
// NextGossipTx returns the next valid transaction to gossip. A caller must wait | |||
// for WaitForNextTx to signal a transaction is available to gossip first. It is | |||
// thread-safe. | |||
func (txmp *TxMempool) NextGossipTx() *WrappedTx { | |||
return txmp.gossipIndex.Front().Value.(*WrappedTx) | |||
} | |||
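// Taken together, WaitForNextTx and NextGossipTx give a broadcast routine a
// simple wait-then-read loop. A minimal sketch (shutdown and peer plumbing
// omitted; this is not the reactor's actual implementation):
//
//	for {
//		select {
//		case <-txmp.WaitForNextTx():
//			wtx := txmp.NextGossipTx()
//			// send wtx.tx to the peer, then walk forward via the gossip index
//		case <-ctx.Done():
//			return
//		}
//	}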
// EnableTxsAvailable enables the mempool to trigger events when transactions | |||
// are available on a block by block basis. | |||
func (txmp *TxMempool) EnableTxsAvailable() { | |||
txmp.mtx.Lock() | |||
defer txmp.mtx.Unlock() | |||
txmp.txsAvailable = make(chan struct{}, 1) | |||
} | |||
// TxsAvailable returns a channel which fires once for every height, and only | |||
// when transactions are available in the mempool. It is thread-safe. | |||
func (txmp *TxMempool) TxsAvailable() <-chan struct{} { | |||
return txmp.txsAvailable | |||
} | |||
// CheckTx executes the ABCI CheckTx method for a given transaction. It acquires
// a read-lock and attempts to execute the application's CheckTx ABCI method via
// CheckTxAsync. We return an error if any of the following happen:
// | |||
// - The CheckTxAsync execution fails. | |||
// - The transaction already exists in the cache and we've already received the | |||
// transaction from the peer. Otherwise, if it solely exists in the cache, we | |||
// return nil. | |||
// - The transaction size exceeds the maximum transaction size as defined by the | |||
// configuration provided to the mempool. | |||
// - The transaction fails Pre-Check (if it is defined). | |||
// - The proxyAppConn fails, e.g. the buffer is full. | |||
// | |||
// If the mempool is full, we still execute CheckTx and attempt to find a lower | |||
// priority transaction to evict. If such a transaction exists, we remove the | |||
// lower priority transaction and add the new one with higher priority. | |||
// | |||
// NOTE:
// - The application's CheckTx implementation may panic.
// - The caller is not required to explicitly acquire any locks to execute CheckTx.
func (txmp *TxMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo mempool.TxInfo) error { | |||
txmp.mtx.RLock() | |||
defer txmp.mtx.RUnlock() | |||
txSize := len(tx) | |||
if txSize > txmp.config.MaxTxBytes { | |||
return mempool.ErrTxTooLarge{ | |||
Max: txmp.config.MaxTxBytes, | |||
Actual: txSize, | |||
} | |||
} | |||
if txmp.preCheck != nil { | |||
if err := txmp.preCheck(tx); err != nil { | |||
return mempool.ErrPreCheck{ | |||
Reason: err, | |||
} | |||
} | |||
} | |||
if err := txmp.proxyAppConn.Error(); err != nil { | |||
return err | |||
} | |||
txHash := mempool.TxKey(tx) | |||
// We add the transaction to the mempool's cache and if the transaction already | |||
// exists, i.e. false is returned, then we check if we've seen this transaction | |||
// from the same sender and error if we have. Otherwise, we return nil. | |||
if !txmp.cache.Push(tx) { | |||
wtx, ok := txmp.txStore.GetOrSetPeerByTxHash(txHash, txInfo.SenderID) | |||
if wtx != nil && ok { | |||
// We already have the transaction stored and we've already seen this
// transaction from txInfo.SenderID.
return mempool.ErrTxInCache | |||
} | |||
txmp.logger.Debug("tx exists already in cache", "tx_hash", tx.Hash()) | |||
return nil | |||
} | |||
ctx := txInfo.Context | |||
if ctx == nil { | |||
ctx = context.Background() | |||
} | |||
reqRes, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{Tx: tx}) | |||
if err != nil { | |||
txmp.cache.Remove(tx) | |||
return err | |||
} | |||
reqRes.SetCallback(func(res *abci.Response) { | |||
if txmp.recheckCursor != nil { | |||
panic("recheck cursor is non-nil in CheckTx callback") | |||
} | |||
wtx := &WrappedTx{ | |||
tx: tx, | |||
hash: txHash, | |||
timestamp: time.Now().UTC(), | |||
} | |||
txmp.initTxCallback(wtx, res, txInfo) | |||
if cb != nil { | |||
cb(res) | |||
} | |||
}) | |||
return nil | |||
} | |||
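// A hedged sketch of how a caller (e.g. an RPC handler) might submit a
// transaction and observe the CheckTx result via the callback (tx and the
// empty TxInfo are illustrative):
//
//	err := txmp.CheckTx(tx, func(res *abci.Response) {
//		if r, ok := res.Value.(*abci.Response_CheckTx); ok && r.CheckTx.Code == abci.CodeTypeOK {
//			// accepted by the application and inserted (or subject to eviction logic)
//		}
//	}, mempool.TxInfo{})
//	if err != nil {
//		// rejected before reaching the application: too large, failed pre-check,
//		// duplicate from the same sender, or a proxyAppConn error
//	}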
// Flush flushes out the mempool. It acquires a read-lock, fetches all the | |||
// transactions currently in the transaction store and removes each transaction | |||
// from the store and all indexes and finally resets the cache. | |||
// | |||
// NOTE: | |||
// - Flushing the mempool may leave the mempool in an inconsistent state. | |||
func (txmp *TxMempool) Flush() { | |||
txmp.mtx.RLock() | |||
defer txmp.mtx.RUnlock() | |||
for _, wtx := range txmp.txStore.GetAllTxs() { | |||
if !txmp.txStore.IsTxRemoved(wtx.hash) { | |||
txmp.txStore.RemoveTx(wtx) | |||
txmp.priorityIndex.RemoveTx(wtx) | |||
txmp.gossipIndex.Remove(wtx.gossipEl) | |||
wtx.gossipEl.DetachPrev() | |||
} | |||
} | |||
atomic.SwapInt64(&txmp.sizeBytes, 0) | |||
txmp.cache.Reset() | |||
} | |||
// ReapMaxBytesMaxGas returns a list of transactions within the provided size
// and gas constraints. Transactions are retrieved in priority order.
// | |||
// NOTE: | |||
// - A read-lock is acquired. | |||
// - Transactions returned are not actually removed from the mempool transaction | |||
// store or indexes. | |||
func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { | |||
txmp.mtx.RLock() | |||
defer txmp.mtx.RUnlock() | |||
var ( | |||
totalGas int64 | |||
totalSize int64 | |||
) | |||
// wTxs contains a list of *WrappedTx retrieved from the priority queue that | |||
// need to be re-enqueued prior to returning. | |||
wTxs := make([]*WrappedTx, 0, txmp.priorityIndex.NumTxs()) | |||
defer func() { | |||
for _, wtx := range wTxs { | |||
txmp.priorityIndex.PushTx(wtx) | |||
} | |||
}() | |||
txs := make([]types.Tx, 0, txmp.priorityIndex.NumTxs()) | |||
for txmp.priorityIndex.NumTxs() > 0 { | |||
wtx := txmp.priorityIndex.PopTx() | |||
txs = append(txs, wtx.tx) | |||
wTxs = append(wTxs, wtx) | |||
size := types.ComputeProtoSizeForTxs([]types.Tx{wtx.tx}) | |||
// Ensure we have capacity for the transaction with respect to the | |||
// transaction size. | |||
if maxBytes > -1 && totalSize+size > maxBytes { | |||
return txs[:len(txs)-1] | |||
} | |||
totalSize += size | |||
// ensure we have capacity for the transaction with respect to total gas | |||
gas := totalGas + wtx.gasWanted | |||
if maxGas > -1 && gas > maxGas { | |||
return txs[:len(txs)-1] | |||
} | |||
totalGas = gas | |||
} | |||
return txs | |||
} | |||
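// For illustration, a proposer with a 1MB byte budget and a 10M gas budget
// would reap as follows (budget values are examples, not defaults):
//
//	txs := txmp.ReapMaxBytesMaxGas(1000000, 10000000)
//	// txs is priority-ordered and fits both limits; nothing is removed from the mempool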
// ReapMaxTxs returns a list of transactions, up to the provided limit on the
// number of transactions. Transactions are retrieved in priority order.
// | |||
// NOTE: | |||
// - A read-lock is acquired. | |||
// - Transactions returned are not actually removed from the mempool transaction | |||
// store or indexes. | |||
func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs { | |||
txmp.mtx.RLock() | |||
defer txmp.mtx.RUnlock() | |||
numTxs := txmp.priorityIndex.NumTxs() | |||
if max < 0 { | |||
max = numTxs | |||
} | |||
cap := tmmath.MinInt(numTxs, max) | |||
// wTxs contains a list of *WrappedTx retrieved from the priority queue that | |||
// need to be re-enqueued prior to returning. | |||
wTxs := make([]*WrappedTx, 0, cap) | |||
defer func() { | |||
for _, wtx := range wTxs { | |||
txmp.priorityIndex.PushTx(wtx) | |||
} | |||
}() | |||
txs := make([]types.Tx, 0, cap) | |||
for txmp.priorityIndex.NumTxs() > 0 && len(txs) < max { | |||
wtx := txmp.priorityIndex.PopTx() | |||
txs = append(txs, wtx.tx) | |||
wTxs = append(wTxs, wtx) | |||
} | |||
return txs | |||
} | |||
// Update iterates over all the transactions provided by the caller, i.e. the
// block producer, and removes them from the cache (if applicable) and removes
// the transactions from the main transaction store and associated indexes.
// Finally, if there are transactions remaining in the mempool, we initiate a
// re-CheckTx for them (if applicable); otherwise, we notify the caller that more
// transactions are available.
// | |||
// NOTE: | |||
// - The caller must explicitly acquire a write-lock via Lock(). | |||
func (txmp *TxMempool) Update( | |||
blockHeight int64, | |||
blockTxs types.Txs, | |||
deliverTxResponses []*abci.ResponseDeliverTx, | |||
newPreFn mempool.PreCheckFunc, | |||
newPostFn mempool.PostCheckFunc, | |||
) error { | |||
txmp.height = blockHeight | |||
txmp.notifiedTxsAvailable = false | |||
if newPreFn != nil { | |||
txmp.preCheck = newPreFn | |||
} | |||
if newPostFn != nil { | |||
txmp.postCheck = newPostFn | |||
} | |||
for i, tx := range blockTxs { | |||
if deliverTxResponses[i].Code == abci.CodeTypeOK { | |||
// add the valid committed transaction to the cache (if missing) | |||
_ = txmp.cache.Push(tx) | |||
} else if !txmp.config.KeepInvalidTxsInCache { | |||
// allow invalid transactions to be re-submitted | |||
txmp.cache.Remove(tx) | |||
} | |||
// remove the committed transaction from the transaction store and indexes | |||
if wtx := txmp.txStore.GetTxByHash(mempool.TxKey(tx)); wtx != nil { | |||
txmp.removeTx(wtx, false) | |||
} | |||
} | |||
// If there are any uncommitted transactions left in the mempool, we either
// initiate re-CheckTx per remaining transaction or notify that remaining
// transactions are left.
if txmp.Size() > 0 { | |||
if txmp.config.Recheck { | |||
txmp.logger.Debug( | |||
"executing re-CheckTx for all remaining transactions", | |||
"num_txs", txmp.Size(), | |||
"height", blockHeight, | |||
) | |||
txmp.updateReCheckTxs() | |||
} else { | |||
txmp.notifyTxsAvailable() | |||
} | |||
} | |||
txmp.metrics.Size.Set(float64(txmp.Size())) | |||
return nil | |||
} | |||
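// The write-lock contract means a caller drives Update roughly as follows (a
// sketch of the expected call pattern; the tests in this change use the same
// Lock/Update/Unlock sequence):
//
//	txmp.Lock()
//	defer txmp.Unlock()
//	return txmp.Update(height, committedTxs, deliverTxResponses, nil, nil)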
// initTxCallback performs the initial, i.e. the first, callback after CheckTx | |||
// has been executed by the ABCI application. In other words, initTxCallback is | |||
// called after executing CheckTx when we see a unique transaction for the first | |||
// time. CheckTx can be called again for the same transaction at a later point | |||
// in time when re-checking, however, this callback will not be called. | |||
// | |||
// After the ABCI application executes CheckTx, initTxCallback is called with | |||
// the ABCI *Response object and TxInfo. If postCheck is defined on the mempool, | |||
// we execute that first. If there is no error from postCheck (if defined) and | |||
// the ABCI CheckTx response code is OK, we attempt to insert the transaction. | |||
// | |||
// When attempting to insert the transaction, we first check if there is | |||
// sufficient capacity. If there is sufficient capacity, the transaction is | |||
// inserted into the txStore and indexed across all indexes. Otherwise, if the | |||
// mempool is full, we attempt to find a lower priority transaction to evict in | |||
// place of the new incoming transaction. If no such transaction exists, the | |||
// new incoming transaction is rejected. | |||
// | |||
// If the new incoming transaction fails CheckTx or postCheck fails, we reject | |||
// the new incoming transaction. | |||
// | |||
// NOTE: | |||
// - An explicit lock is NOT required. | |||
func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo mempool.TxInfo) { | |||
checkTxRes, ok := res.Value.(*abci.Response_CheckTx) | |||
if ok { | |||
var err error | |||
if txmp.postCheck != nil { | |||
err = txmp.postCheck(wtx.tx, checkTxRes.CheckTx) | |||
} | |||
if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil { | |||
sender := checkTxRes.CheckTx.Sender | |||
priority := checkTxRes.CheckTx.Priority | |||
if len(sender) > 0 { | |||
if wtx := txmp.txStore.GetTxBySender(sender); wtx != nil { | |||
txmp.logger.Error( | |||
"rejected incoming good transaction; tx already exists for sender", | |||
"tx", fmt.Sprintf("%X", wtx.tx.Hash()), | |||
"sender", sender, | |||
) | |||
txmp.metrics.RejectedTxs.Add(1) | |||
return | |||
} | |||
} | |||
if err := txmp.canAddTx(wtx); err != nil { | |||
evictTxs := txmp.priorityIndex.GetEvictableTxs( | |||
priority, | |||
int64(wtx.Size()), | |||
txmp.SizeBytes(), | |||
txmp.config.MaxTxsBytes, | |||
) | |||
if len(evictTxs) == 0 { | |||
// No room for the new incoming transaction so we just remove it from | |||
// the cache. | |||
txmp.cache.Remove(wtx.tx) | |||
txmp.logger.Error( | |||
"rejected incoming good transaction; mempool full", | |||
"tx", fmt.Sprintf("%X", wtx.tx.Hash()), | |||
"err", err.Error(), | |||
) | |||
txmp.metrics.RejectedTxs.Add(1) | |||
return | |||
} | |||
// evict an existing transaction(s) | |||
// | |||
// NOTE: | |||
// - The transaction, toEvict, can be removed while a concurrent | |||
// reCheckTx callback is being executed for the same transaction. | |||
for _, toEvict := range evictTxs { | |||
txmp.removeTx(toEvict, true) | |||
txmp.logger.Debug( | |||
"evicted existing good transaction; mempool full", | |||
"old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()), | |||
"old_priority", toEvict.priority, | |||
"new_tx", fmt.Sprintf("%X", wtx.tx.Hash()), | |||
"new_priority", wtx.priority, | |||
) | |||
txmp.metrics.EvictedTxs.Add(1) | |||
} | |||
} | |||
wtx.gasWanted = checkTxRes.CheckTx.GasWanted | |||
wtx.height = txmp.height | |||
wtx.priority = priority | |||
wtx.sender = sender | |||
wtx.peers = map[uint16]struct{}{ | |||
txInfo.SenderID: {}, | |||
} | |||
txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size())) | |||
txmp.metrics.Size.Set(float64(txmp.Size())) | |||
txmp.insertTx(wtx) | |||
txmp.logger.Debug( | |||
"inserted good transaction", | |||
"priority", wtx.priority, | |||
"tx", fmt.Sprintf("%X", wtx.tx.Hash()), | |||
"height", txmp.height, | |||
"num_txs", txmp.Size(), | |||
) | |||
txmp.notifyTxsAvailable() | |||
} else { | |||
// ignore bad transactions | |||
txmp.logger.Info( | |||
"rejected bad transaction", | |||
"priority", wtx.priority, | |||
"tx", fmt.Sprintf("%X", wtx.tx.Hash()), | |||
"peer_id", txInfo.SenderNodeID, | |||
"code", checkTxRes.CheckTx.Code, | |||
"post_check_err", err, | |||
) | |||
txmp.metrics.FailedTxs.Add(1) | |||
if !txmp.config.KeepInvalidTxsInCache { | |||
txmp.cache.Remove(wtx.tx) | |||
} | |||
} | |||
} | |||
} | |||
// defaultTxCallback performs the default CheckTx application callback. This is | |||
// NOT executed when a transaction is first seen/received. Instead, this callback | |||
// is executed during re-checking transactions (if enabled). A caller, i.e. a
// block proposer, acquires a mempool write-lock via Lock() and when executing | |||
// Update(), if the mempool is non-empty and Recheck is enabled, then all | |||
// remaining transactions will be rechecked via CheckTxAsync. The order in which | |||
// they are rechecked must be the same order in which this callback is called | |||
// per transaction. | |||
func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response) { | |||
if txmp.recheckCursor == nil { | |||
return | |||
} | |||
txmp.metrics.RecheckTimes.Add(1) | |||
checkTxRes, ok := res.Value.(*abci.Response_CheckTx) | |||
if ok { | |||
tx := req.GetCheckTx().Tx | |||
wtx := txmp.recheckCursor.Value.(*WrappedTx) | |||
if !bytes.Equal(tx, wtx.tx) { | |||
panic(fmt.Sprintf("re-CheckTx transaction mismatch; got: %X, expected: %X", wtx.tx.Hash(), mempool.TxKey(tx))) | |||
} | |||
// Only evaluate transactions that have not been removed. This can happen
// if an existing transaction is evicted during CheckTx while this
// callback is being executed for the same evicted transaction.
if !txmp.txStore.IsTxRemoved(wtx.hash) { | |||
var err error | |||
if txmp.postCheck != nil { | |||
err = txmp.postCheck(tx, checkTxRes.CheckTx) | |||
} | |||
if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil { | |||
wtx.priority = checkTxRes.CheckTx.Priority | |||
} else { | |||
txmp.logger.Debug( | |||
"existing transaction no longer valid; failed re-CheckTx callback", | |||
"priority", wtx.priority, | |||
"tx", fmt.Sprintf("%X", mempool.TxHashFromBytes(wtx.tx)), | |||
"err", err, | |||
"code", checkTxRes.CheckTx.Code, | |||
) | |||
if wtx.gossipEl != txmp.recheckCursor { | |||
panic("corrupted reCheckTx cursor") | |||
} | |||
txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache) | |||
} | |||
} | |||
// move reCheckTx cursor to next element | |||
if txmp.recheckCursor == txmp.recheckEnd { | |||
txmp.recheckCursor = nil | |||
} else { | |||
txmp.recheckCursor = txmp.recheckCursor.Next() | |||
} | |||
if txmp.recheckCursor == nil { | |||
txmp.logger.Debug("finished rechecking transactions") | |||
if txmp.Size() > 0 { | |||
txmp.notifyTxsAvailable() | |||
} | |||
} | |||
txmp.metrics.Size.Set(float64(txmp.Size())) | |||
} | |||
} | |||
// updateReCheckTxs updates the recheck cursors by using the gossipIndex. For | |||
// each transaction, it executes CheckTxAsync. The global callback defined on | |||
// the proxyAppConn will be executed for each transaction after CheckTx is | |||
// executed. | |||
// | |||
// NOTE: | |||
// - The caller must have a write-lock when executing updateReCheckTxs. | |||
func (txmp *TxMempool) updateReCheckTxs() { | |||
if txmp.Size() == 0 { | |||
panic("attempted to update re-CheckTx txs when mempool is empty") | |||
} | |||
txmp.recheckCursor = txmp.gossipIndex.Front() | |||
txmp.recheckEnd = txmp.gossipIndex.Back() | |||
ctx := context.Background() | |||
for e := txmp.gossipIndex.Front(); e != nil; e = e.Next() { | |||
wtx := e.Value.(*WrappedTx) | |||
// Only execute CheckTx if the transaction is not marked as removed which | |||
// could happen if the transaction was evicted. | |||
if !txmp.txStore.IsTxRemoved(wtx.hash) { | |||
_, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{ | |||
Tx: wtx.tx, | |||
Type: abci.CheckTxType_Recheck, | |||
}) | |||
if err != nil { | |||
// no need to retry since the tx will be rechecked after the next block
txmp.logger.Error("failed to execute CheckTx during rechecking", "err", err) | |||
} | |||
} | |||
} | |||
if _, err := txmp.proxyAppConn.FlushAsync(ctx); err != nil { | |||
txmp.logger.Error("failed to flush transactions during rechecking", "err", err) | |||
} | |||
} | |||
// canAddTx returns an error if we cannot insert the provided *WrappedTx into | |||
// the mempool due to mempool configured constraints. Otherwise, nil is returned | |||
// and the transaction can be inserted into the mempool. | |||
func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error { | |||
var ( | |||
numTxs = txmp.Size() | |||
sizeBytes = txmp.SizeBytes() | |||
) | |||
if numTxs >= txmp.config.Size || int64(wtx.Size())+sizeBytes > txmp.config.MaxTxsBytes { | |||
return mempool.ErrMempoolIsFull{ | |||
NumTxs: numTxs, | |||
MaxTxs: txmp.config.Size, | |||
TxsBytes: sizeBytes, | |||
MaxTxsBytes: txmp.config.MaxTxsBytes, | |||
} | |||
} | |||
return nil | |||
} | |||
func (txmp *TxMempool) insertTx(wtx *WrappedTx) { | |||
txmp.txStore.SetTx(wtx) | |||
txmp.priorityIndex.PushTx(wtx) | |||
// Insert the transaction into the gossip index and mark the reference to the | |||
// linked-list element, which will be needed at a later point when the | |||
// transaction is removed. | |||
gossipEl := txmp.gossipIndex.PushBack(wtx) | |||
wtx.gossipEl = gossipEl | |||
atomic.AddInt64(&txmp.sizeBytes, int64(wtx.Size())) | |||
} | |||
func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) { | |||
if txmp.txStore.IsTxRemoved(wtx.hash) { | |||
return | |||
} | |||
txmp.txStore.RemoveTx(wtx) | |||
txmp.priorityIndex.RemoveTx(wtx) | |||
// Remove the transaction from the gossip index and clean up the linked-list
// element so it can be garbage collected.
txmp.gossipIndex.Remove(wtx.gossipEl) | |||
wtx.gossipEl.DetachPrev() | |||
atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size())) | |||
if removeFromCache { | |||
txmp.cache.Remove(wtx.tx) | |||
} | |||
} | |||
func (txmp *TxMempool) notifyTxsAvailable() { | |||
if txmp.Size() == 0 { | |||
panic("attempt to notify txs available but mempool is empty!") | |||
} | |||
if txmp.txsAvailable != nil && !txmp.notifiedTxsAvailable { | |||
// channel cap is 1, so this will send once | |||
txmp.notifiedTxsAvailable = true | |||
select { | |||
case txmp.txsAvailable <- struct{}{}: | |||
default: | |||
} | |||
} | |||
} |
@@ -0,0 +1,31 @@
package v1 | |||
import ( | |||
"fmt" | |||
"math/rand" | |||
"testing" | |||
"time" | |||
"github.com/stretchr/testify/require" | |||
"github.com/tendermint/tendermint/mempool" | |||
) | |||
func BenchmarkTxMempool_CheckTx(b *testing.B) { | |||
txmp := setup(b, 10000) | |||
rng := rand.New(rand.NewSource(time.Now().UnixNano())) | |||
b.ResetTimer() | |||
for n := 0; n < b.N; n++ { | |||
b.StopTimer() | |||
prefix := make([]byte, 20) | |||
_, err := rng.Read(prefix) | |||
require.NoError(b, err) | |||
priority := int64(rng.Intn(9999-1000) + 1000) | |||
tx := []byte(fmt.Sprintf("%X=%d", prefix, priority)) | |||
b.StartTimer() | |||
require.NoError(b, txmp.CheckTx(tx, nil, mempool.TxInfo{})) | |||
} | |||
} |
@@ -0,0 +1,432 @@
package v1 | |||
import ( | |||
"bytes" | |||
"fmt" | |||
"math/rand" | |||
"os" | |||
"sort" | |||
"strconv" | |||
"sync" | |||
"testing" | |||
"time" | |||
"github.com/stretchr/testify/require" | |||
"github.com/tendermint/tendermint/abci/example/code" | |||
"github.com/tendermint/tendermint/abci/example/kvstore" | |||
abci "github.com/tendermint/tendermint/abci/types" | |||
"github.com/tendermint/tendermint/config" | |||
"github.com/tendermint/tendermint/libs/log" | |||
"github.com/tendermint/tendermint/mempool" | |||
"github.com/tendermint/tendermint/proxy" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
// application extends the KV store application by overriding CheckTx to provide | |||
// transaction priority based on the value in the key/value pair. | |||
type application struct { | |||
*kvstore.Application | |||
} | |||
type testTx struct { | |||
tx types.Tx | |||
priority int64 | |||
} | |||
func (app *application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { | |||
var ( | |||
priority int64 | |||
sender string | |||
) | |||
// infer the priority from the raw transaction value (sender=key=value) | |||
parts := bytes.Split(req.Tx, []byte("=")) | |||
if len(parts) == 3 { | |||
v, err := strconv.ParseInt(string(parts[2]), 10, 64) | |||
if err != nil { | |||
return abci.ResponseCheckTx{ | |||
Priority: priority, | |||
Code: 100, | |||
GasWanted: 1, | |||
} | |||
} | |||
priority = v | |||
sender = string(parts[0]) | |||
} else { | |||
return abci.ResponseCheckTx{ | |||
Priority: priority, | |||
Code: 101, | |||
GasWanted: 1, | |||
} | |||
} | |||
return abci.ResponseCheckTx{ | |||
Priority: priority, | |||
Sender: sender, | |||
Code: code.CodeTypeOK, | |||
GasWanted: 1, | |||
} | |||
} | |||
func setup(t testing.TB, cacheSize int) *TxMempool { | |||
t.Helper() | |||
app := &application{kvstore.NewApplication()} | |||
cc := proxy.NewLocalClientCreator(app) | |||
cfg := config.ResetTestRoot(t.Name()) | |||
cfg.Mempool.CacheSize = cacheSize | |||
appConnMem, err := cc.NewABCIClient() | |||
require.NoError(t, err) | |||
require.NoError(t, appConnMem.Start()) | |||
t.Cleanup(func() { | |||
os.RemoveAll(cfg.RootDir) | |||
require.NoError(t, appConnMem.Stop()) | |||
}) | |||
return NewTxMempool(log.TestingLogger().With("test", t.Name()), cfg.Mempool, appConnMem, 0) | |||
} | |||
func checkTxs(t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx { | |||
txs := make([]testTx, numTxs) | |||
txInfo := mempool.TxInfo{SenderID: peerID} | |||
rng := rand.New(rand.NewSource(time.Now().UnixNano())) | |||
for i := 0; i < numTxs; i++ { | |||
prefix := make([]byte, 20) | |||
_, err := rng.Read(prefix) | |||
require.NoError(t, err) | |||
// sender := make([]byte, 10) | |||
// _, err = rng.Read(sender) | |||
// require.NoError(t, err) | |||
priority := int64(rng.Intn(9999-1000) + 1000) | |||
txs[i] = testTx{ | |||
tx: []byte(fmt.Sprintf("sender-%d=%X=%d", i, prefix, priority)), | |||
priority: priority, | |||
} | |||
require.NoError(t, txmp.CheckTx(txs[i].tx, nil, txInfo)) | |||
} | |||
return txs | |||
} | |||
func TestTxMempool_TxsAvailable(t *testing.T) { | |||
txmp := setup(t, 0) | |||
txmp.EnableTxsAvailable() | |||
ensureNoTxFire := func() { | |||
timer := time.NewTimer(500 * time.Millisecond) | |||
select { | |||
case <-txmp.TxsAvailable(): | |||
require.Fail(t, "unexpected transactions event") | |||
case <-timer.C: | |||
} | |||
} | |||
ensureTxFire := func() { | |||
timer := time.NewTimer(500 * time.Millisecond) | |||
select { | |||
case <-txmp.TxsAvailable(): | |||
case <-timer.C: | |||
require.Fail(t, "expected transactions event") | |||
} | |||
} | |||
// ensure no event as we have not executed any transactions yet | |||
ensureNoTxFire() | |||
// Execute CheckTx for some transactions and ensure TxsAvailable only fires | |||
// once. | |||
txs := checkTxs(t, txmp, 100, 0) | |||
ensureTxFire() | |||
ensureNoTxFire() | |||
rawTxs := make([]types.Tx, len(txs)) | |||
for i, tx := range txs { | |||
rawTxs[i] = tx.tx | |||
} | |||
responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50])) | |||
for i := 0; i < len(responses); i++ { | |||
responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} | |||
} | |||
// commit half the transactions and ensure we fire an event | |||
txmp.Lock() | |||
require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil)) | |||
txmp.Unlock() | |||
ensureTxFire() | |||
ensureNoTxFire() | |||
// Execute CheckTx for more transactions and ensure we do not fire another | |||
// event as we're still on the same height (1). | |||
_ = checkTxs(t, txmp, 100, 0) | |||
ensureNoTxFire() | |||
} | |||
func TestTxMempool_Size(t *testing.T) { | |||
txmp := setup(t, 0) | |||
txs := checkTxs(t, txmp, 100, 0) | |||
require.Equal(t, len(txs), txmp.Size()) | |||
require.Equal(t, int64(5490), txmp.SizeBytes()) | |||
rawTxs := make([]types.Tx, len(txs)) | |||
for i, tx := range txs { | |||
rawTxs[i] = tx.tx | |||
} | |||
responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50])) | |||
for i := 0; i < len(responses); i++ { | |||
responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} | |||
} | |||
txmp.Lock() | |||
require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil)) | |||
txmp.Unlock() | |||
require.Equal(t, len(rawTxs)/2, txmp.Size()) | |||
require.Equal(t, int64(2750), txmp.SizeBytes()) | |||
} | |||
func TestTxMempool_Flush(t *testing.T) { | |||
txmp := setup(t, 0) | |||
txs := checkTxs(t, txmp, 100, 0) | |||
require.Equal(t, len(txs), txmp.Size()) | |||
require.Equal(t, int64(5490), txmp.SizeBytes()) | |||
rawTxs := make([]types.Tx, len(txs)) | |||
for i, tx := range txs { | |||
rawTxs[i] = tx.tx | |||
} | |||
responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50])) | |||
for i := 0; i < len(responses); i++ { | |||
responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} | |||
} | |||
txmp.Lock() | |||
require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil)) | |||
txmp.Unlock() | |||
txmp.Flush() | |||
require.Zero(t, txmp.Size()) | |||
require.Equal(t, int64(0), txmp.SizeBytes()) | |||
} | |||
func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) { | |||
txmp := setup(t, 0) | |||
tTxs := checkTxs(t, txmp, 100, 0) // all txs request 1 gas unit | |||
require.Equal(t, len(tTxs), txmp.Size()) | |||
require.Equal(t, int64(5490), txmp.SizeBytes()) | |||
txMap := make(map[[mempool.TxKeySize]byte]testTx) | |||
priorities := make([]int64, len(tTxs)) | |||
for i, tTx := range tTxs { | |||
txMap[mempool.TxKey(tTx.tx)] = tTx | |||
priorities[i] = tTx.priority | |||
} | |||
sort.Slice(priorities, func(i, j int) bool { | |||
// sort by priority, i.e. decreasing order | |||
return priorities[i] > priorities[j] | |||
}) | |||
ensurePrioritized := func(reapedTxs types.Txs) { | |||
reapedPriorities := make([]int64, len(reapedTxs)) | |||
for i, rTx := range reapedTxs { | |||
reapedPriorities[i] = txMap[mempool.TxKey(rTx)].priority | |||
} | |||
require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities) | |||
} | |||
// reap by gas capacity only | |||
reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50) | |||
ensurePrioritized(reapedTxs) | |||
require.Equal(t, len(tTxs), txmp.Size()) | |||
require.Equal(t, int64(5490), txmp.SizeBytes()) | |||
require.Len(t, reapedTxs, 50) | |||
// reap by transaction bytes only | |||
reapedTxs = txmp.ReapMaxBytesMaxGas(1000, -1) | |||
ensurePrioritized(reapedTxs) | |||
require.Equal(t, len(tTxs), txmp.Size()) | |||
require.Equal(t, int64(5490), txmp.SizeBytes()) | |||
require.Len(t, reapedTxs, 17) | |||
// Reap by both transaction bytes and gas, where the size yields 31 reaped | |||
// transactions and the gas limit reaps 26 transactions. | |||
reapedTxs = txmp.ReapMaxBytesMaxGas(1500, 30) | |||
ensurePrioritized(reapedTxs) | |||
require.Equal(t, len(tTxs), txmp.Size()) | |||
require.Equal(t, int64(5490), txmp.SizeBytes()) | |||
require.Len(t, reapedTxs, 26) | |||
} | |||
func TestTxMempool_ReapMaxTxs(t *testing.T) { | |||
txmp := setup(t, 0) | |||
tTxs := checkTxs(t, txmp, 100, 0) | |||
require.Equal(t, len(tTxs), txmp.Size()) | |||
require.Equal(t, int64(5490), txmp.SizeBytes()) | |||
txMap := make(map[[mempool.TxKeySize]byte]testTx) | |||
priorities := make([]int64, len(tTxs)) | |||
for i, tTx := range tTxs { | |||
txMap[mempool.TxKey(tTx.tx)] = tTx | |||
priorities[i] = tTx.priority | |||
} | |||
sort.Slice(priorities, func(i, j int) bool { | |||
// sort by priority, i.e. decreasing order | |||
return priorities[i] > priorities[j] | |||
}) | |||
ensurePrioritized := func(reapedTxs types.Txs) { | |||
reapedPriorities := make([]int64, len(reapedTxs)) | |||
for i, rTx := range reapedTxs { | |||
reapedPriorities[i] = txMap[mempool.TxKey(rTx)].priority | |||
} | |||
require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities) | |||
} | |||
// reap all transactions | |||
reapedTxs := txmp.ReapMaxTxs(-1) | |||
ensurePrioritized(reapedTxs) | |||
require.Equal(t, len(tTxs), txmp.Size()) | |||
require.Equal(t, int64(5490), txmp.SizeBytes()) | |||
require.Len(t, reapedTxs, len(tTxs)) | |||
// reap a single transaction | |||
reapedTxs = txmp.ReapMaxTxs(1) | |||
ensurePrioritized(reapedTxs) | |||
require.Equal(t, len(tTxs), txmp.Size()) | |||
require.Equal(t, int64(5490), txmp.SizeBytes()) | |||
require.Len(t, reapedTxs, 1) | |||
// reap half of the transactions | |||
reapedTxs = txmp.ReapMaxTxs(len(tTxs) / 2) | |||
ensurePrioritized(reapedTxs) | |||
require.Equal(t, len(tTxs), txmp.Size()) | |||
require.Equal(t, int64(5490), txmp.SizeBytes()) | |||
require.Len(t, reapedTxs, len(tTxs)/2) | |||
} | |||
func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) { | |||
txmp := setup(t, 0) | |||
rng := rand.New(rand.NewSource(time.Now().UnixNano())) | |||
tx := make([]byte, txmp.config.MaxTxsBytes+1) | |||
_, err := rng.Read(tx) | |||
require.NoError(t, err) | |||
require.Error(t, txmp.CheckTx(tx, nil, mempool.TxInfo{SenderID: 0})) | |||
} | |||
func TestTxMempool_CheckTxSamePeer(t *testing.T) { | |||
txmp := setup(t, 100) | |||
peerID := uint16(1) | |||
rng := rand.New(rand.NewSource(time.Now().UnixNano())) | |||
prefix := make([]byte, 20) | |||
_, err := rng.Read(prefix) | |||
require.NoError(t, err) | |||
tx := []byte(fmt.Sprintf("sender-0=%X=%d", prefix, 50)) | |||
require.NoError(t, txmp.CheckTx(tx, nil, mempool.TxInfo{SenderID: peerID})) | |||
require.Error(t, txmp.CheckTx(tx, nil, mempool.TxInfo{SenderID: peerID})) | |||
} | |||
func TestTxMempool_CheckTxSameSender(t *testing.T) { | |||
txmp := setup(t, 100) | |||
peerID := uint16(1) | |||
rng := rand.New(rand.NewSource(time.Now().UnixNano())) | |||
prefix1 := make([]byte, 20) | |||
_, err := rng.Read(prefix1) | |||
require.NoError(t, err) | |||
prefix2 := make([]byte, 20) | |||
_, err = rng.Read(prefix2) | |||
require.NoError(t, err) | |||
tx1 := []byte(fmt.Sprintf("sender-0=%X=%d", prefix1, 50)) | |||
tx2 := []byte(fmt.Sprintf("sender-0=%X=%d", prefix2, 50)) | |||
require.NoError(t, txmp.CheckTx(tx1, nil, mempool.TxInfo{SenderID: peerID})) | |||
require.Equal(t, 1, txmp.Size()) | |||
require.NoError(t, txmp.CheckTx(tx2, nil, mempool.TxInfo{SenderID: peerID})) | |||
require.Equal(t, 1, txmp.Size()) | |||
} | |||
func TestTxMempool_ConcurrentTxs(t *testing.T) { | |||
txmp := setup(t, 100) | |||
rng := rand.New(rand.NewSource(time.Now().UnixNano())) | |||
checkTxDone := make(chan struct{}) | |||
var wg sync.WaitGroup | |||
wg.Add(1) | |||
go func() { | |||
for i := 0; i < 20; i++ { | |||
_ = checkTxs(t, txmp, 100, 0) | |||
dur := rng.Intn(1000-500) + 500 | |||
time.Sleep(time.Duration(dur) * time.Millisecond) | |||
} | |||
wg.Done() | |||
close(checkTxDone) | |||
}() | |||
wg.Add(1) | |||
go func() { | |||
ticker := time.NewTicker(time.Second) | |||
defer ticker.Stop() | |||
defer wg.Done() | |||
var height int64 = 1 | |||
for range ticker.C { | |||
reapedTxs := txmp.ReapMaxTxs(200) | |||
if len(reapedTxs) > 0 { | |||
responses := make([]*abci.ResponseDeliverTx, len(reapedTxs)) | |||
for i := 0; i < len(responses); i++ { | |||
var code uint32 | |||
if i%10 == 0 { | |||
code = 100 | |||
} else { | |||
code = abci.CodeTypeOK | |||
} | |||
responses[i] = &abci.ResponseDeliverTx{Code: code} | |||
} | |||
txmp.Lock() | |||
require.NoError(t, txmp.Update(height, reapedTxs, responses, nil, nil)) | |||
txmp.Unlock() | |||
height++ | |||
} else { | |||
// only return once we know we finished the CheckTx loop | |||
select { | |||
case <-checkTxDone: | |||
return | |||
default: | |||
} | |||
} | |||
} | |||
}() | |||
wg.Wait() | |||
require.Zero(t, txmp.Size()) | |||
require.Zero(t, txmp.SizeBytes()) | |||
} |
@@ -0,0 +1,159 @@
package v1 | |||
import ( | |||
"container/heap" | |||
"sort" | |||
tmsync "github.com/tendermint/tendermint/internal/libs/sync" | |||
) | |||
var _ heap.Interface = (*TxPriorityQueue)(nil) | |||
// TxPriorityQueue defines a thread-safe priority queue for valid transactions. | |||
type TxPriorityQueue struct { | |||
mtx tmsync.RWMutex | |||
txs []*WrappedTx | |||
} | |||
func NewTxPriorityQueue() *TxPriorityQueue { | |||
pq := &TxPriorityQueue{ | |||
txs: make([]*WrappedTx, 0), | |||
} | |||
heap.Init(pq) | |||
return pq | |||
} | |||
// GetEvictableTxs attempts to find and return a list of *WrappedTx that can be
// evicted to make room for another *WrappedTx with higher priority. If no such
// list of *WrappedTx exists, nil will be returned. The returned list of *WrappedTx
// indicates that these transactions can be removed because they are of lower
// priority and their total size frees enough room for the incoming transaction
// according to the mempool's configured limits.
func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int64) []*WrappedTx { | |||
pq.mtx.RLock() | |||
defer pq.mtx.RUnlock() | |||
txs := make([]*WrappedTx, len(pq.txs)) | |||
copy(txs, pq.txs) | |||
sort.Slice(txs, func(i, j int) bool { | |||
return txs[i].priority < txs[j].priority | |||
}) | |||
var ( | |||
toEvict []*WrappedTx | |||
i int | |||
) | |||
currSize := totalSize | |||
// Loop over all transactions in ascending priority order, evaluating only
// those of lower priority than the provided argument. We continue evaluating
// transactions until there is sufficient capacity for the new transaction
// (size) as defined by txSize.
for i < len(txs) && txs[i].priority < priority { | |||
toEvict = append(toEvict, txs[i]) | |||
currSize -= int64(txs[i].Size()) | |||
if currSize+txSize <= cap { | |||
return toEvict | |||
} | |||
i++ | |||
} | |||
return nil | |||
} | |||
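// A small worked example of the arithmetic above (illustrative numbers only):
// with cap=100, totalSize=95, an incoming tx of txSize=10 and priority=50, and
// queued txs of (priority=10, size=40), (priority=20, size=40), (priority=60,
// size=15), the first iteration evicts the priority-10 tx, leaving currSize=55;
// since 55+10 <= 100 the function returns just that one transaction.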
// NumTxs returns the number of transactions in the priority queue. It is | |||
// thread safe. | |||
func (pq *TxPriorityQueue) NumTxs() int { | |||
pq.mtx.RLock() | |||
defer pq.mtx.RUnlock() | |||
return len(pq.txs) | |||
} | |||
// RemoveTx removes a specific transaction from the priority queue. | |||
func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) { | |||
pq.mtx.Lock() | |||
defer pq.mtx.Unlock() | |||
if tx.heapIndex < len(pq.txs) { | |||
heap.Remove(pq, tx.heapIndex) | |||
} | |||
} | |||
// PushTx adds a valid transaction to the priority queue. It is thread safe. | |||
func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) { | |||
pq.mtx.Lock() | |||
defer pq.mtx.Unlock() | |||
heap.Push(pq, tx) | |||
} | |||
// PopTx removes the top priority transaction from the queue. It is thread safe. | |||
func (pq *TxPriorityQueue) PopTx() *WrappedTx { | |||
pq.mtx.Lock() | |||
defer pq.mtx.Unlock() | |||
x := heap.Pop(pq) | |||
if x != nil { | |||
return x.(*WrappedTx) | |||
} | |||
return nil | |||
} | |||
// Push implements the Heap interface. | |||
// | |||
// NOTE: A caller should never call Push. Use PushTx instead. | |||
func (pq *TxPriorityQueue) Push(x interface{}) { | |||
n := len(pq.txs) | |||
item := x.(*WrappedTx) | |||
item.heapIndex = n | |||
pq.txs = append(pq.txs, item) | |||
} | |||
// Pop implements the Heap interface. | |||
// | |||
// NOTE: A caller should never call Pop. Use PopTx instead. | |||
func (pq *TxPriorityQueue) Pop() interface{} { | |||
old := pq.txs | |||
n := len(old) | |||
item := old[n-1] | |||
old[n-1] = nil // avoid memory leak | |||
item.heapIndex = -1 // for safety | |||
pq.txs = old[0 : n-1] | |||
return item | |||
} | |||
// Len implements the Heap interface. | |||
// | |||
// NOTE: A caller should never call Len. Use NumTxs instead. | |||
func (pq *TxPriorityQueue) Len() int { | |||
return len(pq.txs) | |||
} | |||
// Less implements the Heap interface. It returns true if the transaction at | |||
// position i in the queue is of less priority than the transaction at position j. | |||
func (pq *TxPriorityQueue) Less(i, j int) bool { | |||
// If there exist two transactions with the same priority, consider the one
// that we saw the earliest as the higher priority transaction.
if pq.txs[i].priority == pq.txs[j].priority { | |||
return pq.txs[i].timestamp.Before(pq.txs[j].timestamp) | |||
} | |||
// We want Pop to give us the highest, not lowest, priority so we use greater | |||
// than here. | |||
return pq.txs[i].priority > pq.txs[j].priority | |||
} | |||
// Swap implements the Heap interface. It swaps two transactions in the queue. | |||
func (pq *TxPriorityQueue) Swap(i, j int) { | |||
pq.txs[i], pq.txs[j] = pq.txs[j], pq.txs[i] | |||
pq.txs[i].heapIndex = i | |||
pq.txs[j].heapIndex = j | |||
} |
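// A minimal sketch of the intended PushTx/PopTx usage (illustrative values;
// WrappedTx fields other than priority and timestamp are omitted):
//
//	pq := NewTxPriorityQueue()
//	pq.PushTx(&WrappedTx{priority: 5, timestamp: time.Now()})
//	pq.PushTx(&WrappedTx{priority: 9, timestamp: time.Now()})
//	wtx := pq.PopTx() // wtx.priority == 9: the highest priority pops first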
@@ -0,0 +1,176 @@
package v1 | |||
import ( | |||
"math/rand" | |||
"sort" | |||
"sync" | |||
"testing" | |||
"time" | |||
"github.com/stretchr/testify/require" | |||
) | |||
func TestTxPriorityQueue(t *testing.T) { | |||
pq := NewTxPriorityQueue() | |||
numTxs := 1000 | |||
priorities := make([]int, numTxs) | |||
var wg sync.WaitGroup | |||
for i := 1; i <= numTxs; i++ { | |||
priorities[i-1] = i | |||
wg.Add(1) | |||
go func(i int) { | |||
pq.PushTx(&WrappedTx{ | |||
priority: int64(i), | |||
timestamp: time.Now(), | |||
}) | |||
wg.Done() | |||
}(i) | |||
} | |||
sort.Sort(sort.Reverse(sort.IntSlice(priorities))) | |||
wg.Wait() | |||
require.Equal(t, numTxs, pq.NumTxs()) | |||
// Wait a second and push a tx with a duplicate priority | |||
time.Sleep(time.Second) | |||
now := time.Now() | |||
pq.PushTx(&WrappedTx{ | |||
priority: 1000, | |||
timestamp: now, | |||
}) | |||
require.Equal(t, 1001, pq.NumTxs()) | |||
tx := pq.PopTx() | |||
require.Equal(t, 1000, pq.NumTxs()) | |||
require.Equal(t, int64(1000), tx.priority) | |||
require.NotEqual(t, now, tx.timestamp) | |||
gotPriorities := make([]int, 0) | |||
for pq.NumTxs() > 0 { | |||
gotPriorities = append(gotPriorities, int(pq.PopTx().priority)) | |||
} | |||
require.Equal(t, priorities, gotPriorities) | |||
} | |||
func TestTxPriorityQueue_GetEvictableTxs(t *testing.T) { | |||
pq := NewTxPriorityQueue() | |||
rng := rand.New(rand.NewSource(time.Now().UnixNano())) | |||
values := make([]int, 1000) | |||
for i := 0; i < 1000; i++ { | |||
tx := make([]byte, 5) // each tx is 5 bytes | |||
_, err := rng.Read(tx) | |||
require.NoError(t, err) | |||
x := rng.Intn(100000) | |||
pq.PushTx(&WrappedTx{ | |||
tx: tx, | |||
priority: int64(x), | |||
}) | |||
values[i] = x | |||
} | |||
sort.Ints(values) | |||
max := values[len(values)-1] | |||
min := values[0] | |||
totalSize := int64(len(values) * 5) | |||
testCases := []struct { | |||
name string | |||
priority, txSize, totalSize, cap int64 | |||
expectedLen int | |||
}{ | |||
{ | |||
name: "larest priority; single tx", | |||
priority: int64(max + 1), | |||
txSize: 5, | |||
totalSize: totalSize, | |||
cap: totalSize, | |||
expectedLen: 1, | |||
}, | |||
{ | |||
name: "larest priority; multi tx", | |||
priority: int64(max + 1), | |||
txSize: 17, | |||
totalSize: totalSize, | |||
cap: totalSize, | |||
expectedLen: 4, | |||
}, | |||
{ | |||
name: "larest priority; out of capacity", | |||
priority: int64(max + 1), | |||
txSize: totalSize + 1, | |||
totalSize: totalSize, | |||
cap: totalSize, | |||
expectedLen: 0, | |||
}, | |||
{ | |||
name: "smallest priority; no tx", | |||
priority: int64(min - 1), | |||
txSize: 5, | |||
totalSize: totalSize, | |||
cap: totalSize, | |||
expectedLen: 0, | |||
}, | |||
{ | |||
name: "small priority; no tx", | |||
priority: int64(min), | |||
txSize: 5, | |||
totalSize: totalSize, | |||
cap: totalSize, | |||
expectedLen: 0, | |||
}, | |||
} | |||
for _, tc := range testCases { | |||
tc := tc | |||
t.Run(tc.name, func(t *testing.T) { | |||
evictTxs := pq.GetEvictableTxs(tc.priority, tc.txSize, tc.totalSize, tc.cap) | |||
require.Len(t, evictTxs, tc.expectedLen) | |||
}) | |||
} | |||
} | |||
func TestTxPriorityQueue_RemoveTx(t *testing.T) { | |||
pq := NewTxPriorityQueue() | |||
rng := rand.New(rand.NewSource(time.Now().UnixNano())) | |||
numTxs := 1000 | |||
values := make([]int, numTxs) | |||
for i := 0; i < numTxs; i++ { | |||
x := rng.Intn(100000) | |||
pq.PushTx(&WrappedTx{ | |||
priority: int64(x), | |||
}) | |||
values[i] = x | |||
} | |||
require.Equal(t, numTxs, pq.NumTxs()) | |||
sort.Ints(values) | |||
max := values[len(values)-1] | |||
wtx := pq.txs[pq.NumTxs()/2] | |||
pq.RemoveTx(wtx) | |||
require.Equal(t, numTxs-1, pq.NumTxs()) | |||
require.Equal(t, int64(max), pq.PopTx().priority) | |||
require.Equal(t, numTxs-2, pq.NumTxs()) | |||
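// Removing txs whose heapIndex is out of range should be a no-op and must not panic. | |||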
require.NotPanics(t, func() { | |||
pq.RemoveTx(&WrappedTx{heapIndex: numTxs}) | |||
pq.RemoveTx(&WrappedTx{heapIndex: numTxs + 1}) | |||
}) | |||
require.Equal(t, numTxs-2, pq.NumTxs()) | |||
} |
@ -0,0 +1,394 @@ | |||
package v1 | |||
import ( | |||
"errors" | |||
"fmt" | |||
"sync" | |||
"time" | |||
cfg "github.com/tendermint/tendermint/config" | |||
tmsync "github.com/tendermint/tendermint/internal/libs/sync" | |||
"github.com/tendermint/tendermint/libs/log" | |||
"github.com/tendermint/tendermint/libs/service" | |||
"github.com/tendermint/tendermint/mempool" | |||
"github.com/tendermint/tendermint/p2p" | |||
protomem "github.com/tendermint/tendermint/proto/tendermint/mempool" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
var ( | |||
_ service.Service = (*Reactor)(nil) | |||
_ p2p.Wrapper = (*protomem.Message)(nil) | |||
) | |||
// PeerManager defines the interface contract required for getting necessary | |||
// peer information. This should eventually be replaced with a message-oriented | |||
// approach utilizing the p2p stack. | |||
type PeerManager interface { | |||
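// GetHeight returns the latest known block height for the given peer. | |||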
GetHeight(p2p.NodeID) int64 | |||
} | |||
// Reactor implements a service that contains a mempool of txs that are broadcast | |||
// amongst peers. It maintains a map from peer ID to counter, to prevent gossiping | |||
// txs back to the peers from which they were received. | |||
type Reactor struct { | |||
service.BaseService | |||
config *cfg.MempoolConfig | |||
mempool *TxMempool | |||
ids *mempool.MempoolIDs | |||
// XXX: Currently, this is the only way to get information about a peer. Ideally, | |||
// we rely on message-oriented communication to get necessary peer data. | |||
// ref: https://github.com/tendermint/tendermint/issues/5670 | |||
peerMgr PeerManager | |||
mempoolCh *p2p.Channel | |||
peerUpdates *p2p.PeerUpdates | |||
closeCh chan struct{} | |||
// peerWG is used to coordinate graceful termination of all peer broadcasting | |||
// goroutines. | |||
peerWG sync.WaitGroup | |||
mtx tmsync.Mutex | |||
peerRoutines map[p2p.NodeID]*tmsync.Closer | |||
} | |||
// NewReactor returns a reference to a new reactor. | |||
func NewReactor( | |||
logger log.Logger, | |||
config *cfg.MempoolConfig, | |||
peerMgr PeerManager, | |||
txmp *TxMempool, | |||
mempoolCh *p2p.Channel, | |||
peerUpdates *p2p.PeerUpdates, | |||
) *Reactor { | |||
r := &Reactor{ | |||
config: config, | |||
peerMgr: peerMgr, | |||
mempool: txmp, | |||
ids: mempool.NewMempoolIDs(), | |||
mempoolCh: mempoolCh, | |||
peerUpdates: peerUpdates, | |||
closeCh: make(chan struct{}), | |||
peerRoutines: make(map[p2p.NodeID]*tmsync.Closer), | |||
} | |||
r.BaseService = *service.NewBaseService(logger, "Mempool", r) | |||
return r | |||
} | |||
// GetChannelShims returns a map of ChannelDescriptorShim objects, where each | |||
// object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding | |||
// p2p proto.Message the new p2p Channel is responsible for handling. | |||
// | |||
// TODO: Remove once p2p refactor is complete. | |||
// ref: https://github.com/tendermint/tendermint/issues/5670 | |||
func GetChannelShims(config *cfg.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDescriptorShim { | |||
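// Size the channel's receive capacity from the largest possible message: a | |||
// batch containing a single transaction of the maximum configured size. | |||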
largestTx := make([]byte, config.MaxTxBytes) | |||
batchMsg := protomem.Message{ | |||
Sum: &protomem.Message_Txs{ | |||
Txs: &protomem.Txs{Txs: [][]byte{largestTx}}, | |||
}, | |||
} | |||
return map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ | |||
mempool.MempoolChannel: { | |||
MsgType: new(protomem.Message), | |||
Descriptor: &p2p.ChannelDescriptor{ | |||
ID: byte(mempool.MempoolChannel), | |||
Priority: 5, | |||
RecvMessageCapacity: batchMsg.Size(), | |||
MaxSendBytes: 5000, | |||
}, | |||
}, | |||
} | |||
} | |||
// OnStart starts separate goroutines for each p2p Channel and listens for | |||
// envelopes on each. In addition, it also listens for peer updates and handles | |||
// messages on that p2p channel accordingly. The caller must be sure to execute | |||
// OnStop to ensure the outbound p2p Channels are closed. | |||
func (r *Reactor) OnStart() error { | |||
if !r.config.Broadcast { | |||
r.Logger.Info("tx broadcasting is disabled") | |||
} | |||
go r.processMempoolCh() | |||
go r.processPeerUpdates() | |||
return nil | |||
} | |||
// OnStop stops the reactor by signaling to all spawned goroutines to exit and | |||
// blocking until they all exit. | |||
func (r *Reactor) OnStop() { | |||
r.mtx.Lock() | |||
for _, c := range r.peerRoutines { | |||
c.Close() | |||
} | |||
r.mtx.Unlock() | |||
// wait for all spawned peer tx broadcasting goroutines to gracefully exit | |||
r.peerWG.Wait() | |||
// Close closeCh to signal to all spawned goroutines to gracefully exit. All | |||
// p2p Channels should execute Close(). | |||
close(r.closeCh) | |||
// Wait for all p2p Channels to be closed before returning. This ensures we | |||
// can easily reason about synchronization of all p2p Channels and ensure no | |||
// panics will occur. | |||
<-r.mempoolCh.Done() | |||
<-r.peerUpdates.Done() | |||
} | |||
// handleMempoolMessage handles envelopes sent from peers on the MempoolChannel. | |||
// For every tx in the message, we execute CheckTx. It returns an error if an | |||
// empty set of txs are sent in an envelope or if we receive an unexpected | |||
// message type. | |||
func (r *Reactor) handleMempoolMessage(envelope p2p.Envelope) error { | |||
logger := r.Logger.With("peer", envelope.From) | |||
switch msg := envelope.Message.(type) { | |||
case *protomem.Txs: | |||
protoTxs := msg.GetTxs() | |||
if len(protoTxs) == 0 { | |||
return errors.New("empty txs received from peer") | |||
} | |||
txInfo := mempool.TxInfo{SenderID: r.ids.GetForPeer(envelope.From)} | |||
if len(envelope.From) != 0 { | |||
txInfo.SenderNodeID = envelope.From | |||
} | |||
for _, tx := range protoTxs { | |||
if err := r.mempool.CheckTx(types.Tx(tx), nil, txInfo); err != nil { | |||
logger.Error("checktx failed for tx", "tx", fmt.Sprintf("%X", mempool.TxHashFromBytes(tx)), "err", err) | |||
} | |||
} | |||
default: | |||
return fmt.Errorf("received unknown message: %T", msg) | |||
} | |||
return nil | |||
} | |||
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel. | |||
// It will handle errors and any possible panics gracefully. A caller can handle | |||
// any error returned by sending a PeerError on the respective channel. | |||
func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) { | |||
defer func() { | |||
if e := recover(); e != nil { | |||
err = fmt.Errorf("panic in processing message: %v", e) | |||
} | |||
}() | |||
r.Logger.Debug("received message", "peer", envelope.From) | |||
switch chID { | |||
case mempool.MempoolChannel: | |||
err = r.handleMempoolMessage(envelope) | |||
default: | |||
err = fmt.Errorf("unknown channel ID (%d) for envelope (%T)", chID, envelope.Message) | |||
} | |||
return err | |||
} | |||
// processMempoolCh implements a blocking event loop where we listen for p2p | |||
// Envelope messages from the mempoolCh. | |||
func (r *Reactor) processMempoolCh() { | |||
defer r.mempoolCh.Close() | |||
for { | |||
select { | |||
case envelope := <-r.mempoolCh.In: | |||
if err := r.handleMessage(r.mempoolCh.ID, envelope); err != nil { | |||
r.Logger.Error("failed to process message", "ch_id", r.mempoolCh.ID, "envelope", envelope, "err", err) | |||
r.mempoolCh.Error <- p2p.PeerError{ | |||
NodeID: envelope.From, | |||
Err: err, | |||
} | |||
} | |||
case <-r.closeCh: | |||
r.Logger.Debug("stopped listening on mempool channel; closing...") | |||
return | |||
} | |||
} | |||
} | |||
// processPeerUpdate processes a PeerUpdate. For added peers, PeerStatusUp, we | |||
// check if the reactor is running and if we've already started a tx broadcasting | |||
// goroutine or not. If not, we start one for the newly added peer. For down or | |||
// removed peers, we remove the peer from the mempool peer ID set and signal to | |||
// stop the tx broadcasting goroutine. | |||
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { | |||
r.Logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) | |||
r.mtx.Lock() | |||
defer r.mtx.Unlock() | |||
switch peerUpdate.Status { | |||
case p2p.PeerStatusUp: | |||
// Do not allow starting new tx broadcast loops after reactor shutdown | |||
// has been initiated. This can happen after we've manually closed all | |||
// peer broadcast loops and closed r.closeCh, but the router still sends | |||
// in-flight peer updates. | |||
if !r.IsRunning() { | |||
return | |||
} | |||
if r.config.Broadcast { | |||
// Check if we've already started a goroutine for this peer. If not, create | |||
// a new closer so we can explicitly stop the goroutine if the peer is later | |||
// removed, increment the waitgroup so the reactor can shut down safely, and | |||
// finally start the goroutine that broadcasts txs to the peer. | |||
_, ok := r.peerRoutines[peerUpdate.NodeID] | |||
if !ok { | |||
closer := tmsync.NewCloser() | |||
r.peerRoutines[peerUpdate.NodeID] = closer | |||
r.peerWG.Add(1) | |||
r.ids.ReserveForPeer(peerUpdate.NodeID) | |||
// start a broadcast routine ensuring all txs are forwarded to the peer | |||
go r.broadcastTxRoutine(peerUpdate.NodeID, closer) | |||
} | |||
} | |||
case p2p.PeerStatusDown: | |||
r.ids.Reclaim(peerUpdate.NodeID) | |||
// Check if we've started a tx broadcasting goroutine for this peer. | |||
// If we have, we signal to terminate the goroutine via the channel's closure. | |||
// This will internally decrement the peer waitgroup and remove the peer | |||
// from the map of peer tx broadcasting goroutines. | |||
closer, ok := r.peerRoutines[peerUpdate.NodeID] | |||
if ok { | |||
closer.Close() | |||
} | |||
} | |||
} | |||
// processPeerUpdates initiates a blocking process where we listen for and handle | |||
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and | |||
// close the p2p PeerUpdatesCh gracefully. | |||
func (r *Reactor) processPeerUpdates() { | |||
defer r.peerUpdates.Close() | |||
for { | |||
select { | |||
case peerUpdate := <-r.peerUpdates.Updates(): | |||
r.processPeerUpdate(peerUpdate) | |||
case <-r.closeCh: | |||
r.Logger.Debug("stopped listening on peer updates channel; closing...") | |||
return | |||
} | |||
} | |||
} | |||
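// broadcastTxRoutine gossips mempool transactions to a single peer by walking the | |||
// mempool's gossip index. It exits when the peer is removed or the reactor is | |||
// stopped. | |||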
func (r *Reactor) broadcastTxRoutine(peerID p2p.NodeID, closer *tmsync.Closer) { | |||
peerMempoolID := r.ids.GetForPeer(peerID) | |||
var memTx *WrappedTx | |||
// remove the peer ID from the map of routines and mark the waitgroup as done | |||
defer func() { | |||
r.mtx.Lock() | |||
delete(r.peerRoutines, peerID) | |||
r.mtx.Unlock() | |||
r.peerWG.Done() | |||
if e := recover(); e != nil { | |||
r.Logger.Error("recovering from broadcasting mempool loop", "err", e) | |||
} | |||
}() | |||
for { | |||
if !r.IsRunning() { | |||
return | |||
} | |||
// If memTx is nil, the CElement we were looking at was removed (garbage | |||
// collected), i.e. NextWait() returned nil, so wait for the next tx and | |||
// start again from the beginning of the gossip index. | |||
if memTx == nil { | |||
select { | |||
case <-r.mempool.WaitForNextTx(): // wait until a tx is available | |||
if memTx = r.mempool.NextGossipTx(); memTx == nil { | |||
continue | |||
} | |||
case <-closer.Done(): | |||
// The peer is marked for removal via a PeerUpdate as the doneCh was | |||
// explicitly closed to signal we should exit. | |||
return | |||
case <-r.closeCh: | |||
// The reactor has signaled that we are stopped and thus we should | |||
// implicitly exit this peer's goroutine. | |||
return | |||
} | |||
} | |||
if r.peerMgr != nil { | |||
height := r.peerMgr.GetHeight(peerID) | |||
if height > 0 && height < memTx.height-1 { | |||
// allow for a lag of one block | |||
time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond) | |||
continue | |||
} | |||
} | |||
// NOTE: Transaction batching was disabled due to: | |||
// https://github.com/tendermint/tendermint/issues/5796 | |||
if ok := r.mempool.txStore.TxHasPeer(memTx.hash, peerMempoolID); !ok { | |||
// Send the mempool tx to the corresponding peer. Note, the peer may be | |||
// behind and thus would not be able to process the mempool tx correctly. | |||
r.mempoolCh.Out <- p2p.Envelope{ | |||
To: peerID, | |||
Message: &protomem.Txs{ | |||
Txs: [][]byte{memTx.tx}, | |||
}, | |||
} | |||
r.Logger.Debug( | |||
"gossiped tx to peer", | |||
"tx", fmt.Sprintf("%X", mempool.TxHashFromBytes(memTx.tx)), | |||
"peer", peerID, | |||
) | |||
} | |||
select { | |||
case <-memTx.gossipEl.NextWaitChan(): | |||
// If there is a next element in gossip index, we point memTx to that node's | |||
// value, otherwise we reset memTx to nil which will be checked at the | |||
// parent for loop. | |||
next := memTx.gossipEl.Next() | |||
if next != nil { | |||
memTx = next.Value.(*WrappedTx) | |||
} else { | |||
memTx = nil | |||
} | |||
case <-closer.Done(): | |||
// The peer is marked for removal via a PeerUpdate as the doneCh was | |||
// explicitly closed to signal we should exit. | |||
return | |||
case <-r.closeCh: | |||
// The reactor has signaled that we are stopped and thus we should | |||
// implicitly exit this peer's goroutine. | |||
return | |||
} | |||
} | |||
} |
@ -0,0 +1,200 @@ | |||
package v1 | |||
import ( | |||
"time" | |||
"github.com/tendermint/tendermint/internal/libs/clist" | |||
tmsync "github.com/tendermint/tendermint/internal/libs/sync" | |||
"github.com/tendermint/tendermint/mempool" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
// WrappedTx defines a wrapper around a raw transaction with additional metadata | |||
// that is used for indexing. | |||
type WrappedTx struct { | |||
// tx represents the raw binary transaction data | |||
tx types.Tx | |||
// hash defines the transaction hash and the primary key used in the mempool | |||
hash [mempool.TxKeySize]byte | |||
// height defines the height at which the transaction was validated | |||
height int64 | |||
// gasWanted defines the amount of gas the transaction sender requires | |||
gasWanted int64 | |||
// priority defines the transaction's priority as specified by the application | |||
// in the ResponseCheckTx response. | |||
priority int64 | |||
// sender defines the transaction's sender as specified by the application in | |||
// the ResponseCheckTx response. | |||
sender string | |||
// timestamp is the time at which the node first received the transaction from | |||
// a peer. It is used as a second dimension in prioritizing transactions when | |||
// two transactions have the same priority. | |||
timestamp time.Time | |||
// peers records a mapping of all peers that sent a given transaction | |||
peers map[uint16]struct{} | |||
// heapIndex defines the index of the item in the heap | |||
heapIndex int | |||
// gossipEl references the linked-list element in the gossip index | |||
gossipEl *clist.CElement | |||
// removed marks the transaction as removed from the mempool. This is set | |||
// during RemoveTx and is needed because an existing transaction in the | |||
// mempool can be evicted while a reCheckTx callback for it is still | |||
// executing. | |||
removed bool | |||
} | |||
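// Size returns the size of the raw transaction in bytes. | |||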
func (wtx *WrappedTx) Size() int { | |||
return len(wtx.tx) | |||
} | |||
// TxStore implements a thread-safe mapping of valid transactions. | |||
// | |||
// NOTE: | |||
// - Concurrent read-only access to a *WrappedTx object is OK. However, mutative | |||
// access is not allowed. Regardless, it is not expected for the mempool to | |||
// need mutative access. | |||
type TxStore struct { | |||
mtx tmsync.RWMutex | |||
hashTxs map[[mempool.TxKeySize]byte]*WrappedTx // primary index | |||
senderTxs map[string]*WrappedTx // sender is defined by the ABCI application | |||
} | |||
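// NewTxStore returns a new, empty TxStore with initialized indexes. | |||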
func NewTxStore() *TxStore { | |||
return &TxStore{ | |||
senderTxs: make(map[string]*WrappedTx), | |||
hashTxs: make(map[[mempool.TxKeySize]byte]*WrappedTx), | |||
} | |||
} | |||
// Size returns the total number of transactions in the store. | |||
func (txs *TxStore) Size() int { | |||
txs.mtx.RLock() | |||
defer txs.mtx.RUnlock() | |||
return len(txs.hashTxs) | |||
} | |||
// GetAllTxs returns all the transactions currently in the store. | |||
func (txs *TxStore) GetAllTxs() []*WrappedTx { | |||
txs.mtx.RLock() | |||
defer txs.mtx.RUnlock() | |||
wTxs := make([]*WrappedTx, len(txs.hashTxs)) | |||
i := 0 | |||
for _, wtx := range txs.hashTxs { | |||
wTxs[i] = wtx | |||
i++ | |||
} | |||
return wTxs | |||
} | |||
// GetTxBySender returns a *WrappedTx by the transaction's sender property | |||
// defined by the ABCI application. | |||
func (txs *TxStore) GetTxBySender(sender string) *WrappedTx { | |||
txs.mtx.RLock() | |||
defer txs.mtx.RUnlock() | |||
return txs.senderTxs[sender] | |||
} | |||
// GetTxByHash returns a *WrappedTx by the transaction's hash. | |||
func (txs *TxStore) GetTxByHash(hash [mempool.TxKeySize]byte) *WrappedTx { | |||
txs.mtx.RLock() | |||
defer txs.mtx.RUnlock() | |||
return txs.hashTxs[hash] | |||
} | |||
// IsTxRemoved returns true if a transaction by hash is marked as removed and | |||
// false otherwise. | |||
func (txs *TxStore) IsTxRemoved(hash [mempool.TxKeySize]byte) bool { | |||
txs.mtx.RLock() | |||
defer txs.mtx.RUnlock() | |||
wtx, ok := txs.hashTxs[hash] | |||
if ok { | |||
return wtx.removed | |||
} | |||
return false | |||
} | |||
// SetTx stores a *WrappedTx by its hash. If the transaction also contains a | |||
// non-empty sender, we additionally store the transaction by the sender as | |||
// defined by the ABCI application. | |||
func (txs *TxStore) SetTx(wtx *WrappedTx) { | |||
txs.mtx.Lock() | |||
defer txs.mtx.Unlock() | |||
if len(wtx.sender) > 0 { | |||
txs.senderTxs[wtx.sender] = wtx | |||
} | |||
txs.hashTxs[mempool.TxKey(wtx.tx)] = wtx | |||
} | |||
// RemoveTx removes a *WrappedTx from the transaction store. It deletes all | |||
// indexes of the transaction. | |||
func (txs *TxStore) RemoveTx(wtx *WrappedTx) { | |||
txs.mtx.Lock() | |||
defer txs.mtx.Unlock() | |||
if len(wtx.sender) > 0 { | |||
delete(txs.senderTxs, wtx.sender) | |||
} | |||
delete(txs.hashTxs, mempool.TxKey(wtx.tx)) | |||
wtx.removed = true | |||
} | |||
// TxHasPeer returns true if a transaction by hash has a given peer ID and false | |||
// otherwise. If the transaction does not exist, false is returned. | |||
func (txs *TxStore) TxHasPeer(hash [mempool.TxKeySize]byte, peerID uint16) bool { | |||
txs.mtx.RLock() | |||
defer txs.mtx.RUnlock() | |||
wtx := txs.hashTxs[hash] | |||
if wtx == nil { | |||
return false | |||
} | |||
_, ok := wtx.peers[peerID] | |||
return ok | |||
} | |||
// GetOrSetPeerByTxHash looks up a WrappedTx by transaction hash and adds the | |||
// given peerID to the WrappedTx's set of peers that sent us this transaction. | |||
// We return true if we've already recorded the given peer for this transaction | |||
// and false otherwise. If the transaction does not exist by hash, we return | |||
// (nil, false). | |||
func (txs *TxStore) GetOrSetPeerByTxHash(hash [mempool.TxKeySize]byte, peerID uint16) (*WrappedTx, bool) { | |||
txs.mtx.Lock() | |||
defer txs.mtx.Unlock() | |||
wtx := txs.hashTxs[hash] | |||
if wtx == nil { | |||
return nil, false | |||
} | |||
if wtx.peers == nil { | |||
wtx.peers = make(map[uint16]struct{}) | |||
} | |||
if _, ok := wtx.peers[peerID]; ok { | |||
return wtx, true | |||
} | |||
wtx.peers[peerID] = struct{}{} | |||
return wtx, false | |||
} |
@ -0,0 +1,134 @@ | |||
package v1 | |||
import ( | |||
"fmt" | |||
"testing" | |||
"time" | |||
"github.com/stretchr/testify/require" | |||
"github.com/tendermint/tendermint/mempool" | |||
) | |||
func TestTxStore_GetTxBySender(t *testing.T) { | |||
txs := NewTxStore() | |||
wtx := &WrappedTx{ | |||
tx: []byte("test_tx"), | |||
sender: "foo", | |||
priority: 1, | |||
timestamp: time.Now(), | |||
} | |||
res := txs.GetTxBySender(wtx.sender) | |||
require.Nil(t, res) | |||
txs.SetTx(wtx) | |||
res = txs.GetTxBySender(wtx.sender) | |||
require.NotNil(t, res) | |||
require.Equal(t, wtx, res) | |||
} | |||
func TestTxStore_GetTxByHash(t *testing.T) { | |||
txs := NewTxStore() | |||
wtx := &WrappedTx{ | |||
tx: []byte("test_tx"), | |||
sender: "foo", | |||
priority: 1, | |||
timestamp: time.Now(), | |||
} | |||
key := mempool.TxKey(wtx.tx) | |||
res := txs.GetTxByHash(key) | |||
require.Nil(t, res) | |||
txs.SetTx(wtx) | |||
res = txs.GetTxByHash(key) | |||
require.NotNil(t, res) | |||
require.Equal(t, wtx, res) | |||
} | |||
func TestTxStore_SetTx(t *testing.T) { | |||
txs := NewTxStore() | |||
wtx := &WrappedTx{ | |||
tx: []byte("test_tx"), | |||
priority: 1, | |||
timestamp: time.Now(), | |||
} | |||
key := mempool.TxKey(wtx.tx) | |||
txs.SetTx(wtx) | |||
res := txs.GetTxByHash(key) | |||
require.NotNil(t, res) | |||
require.Equal(t, wtx, res) | |||
wtx.sender = "foo" | |||
txs.SetTx(wtx) | |||
res = txs.GetTxByHash(key) | |||
require.NotNil(t, res) | |||
require.Equal(t, wtx, res) | |||
} | |||
func TestTxStore_GetOrSetPeerByTxHash(t *testing.T) { | |||
txs := NewTxStore() | |||
wtx := &WrappedTx{ | |||
tx: []byte("test_tx"), | |||
priority: 1, | |||
timestamp: time.Now(), | |||
} | |||
key := mempool.TxKey(wtx.tx) | |||
txs.SetTx(wtx) | |||
res, ok := txs.GetOrSetPeerByTxHash(mempool.TxKey([]byte("test_tx_2")), 15) | |||
require.Nil(t, res) | |||
require.False(t, ok) | |||
res, ok = txs.GetOrSetPeerByTxHash(key, 15) | |||
require.NotNil(t, res) | |||
require.False(t, ok) | |||
res, ok = txs.GetOrSetPeerByTxHash(key, 15) | |||
require.NotNil(t, res) | |||
require.True(t, ok) | |||
require.True(t, txs.TxHasPeer(key, 15)) | |||
require.False(t, txs.TxHasPeer(key, 16)) | |||
} | |||
func TestTxStore_RemoveTx(t *testing.T) { | |||
txs := NewTxStore() | |||
wtx := &WrappedTx{ | |||
tx: []byte("test_tx"), | |||
priority: 1, | |||
timestamp: time.Now(), | |||
} | |||
txs.SetTx(wtx) | |||
key := mempool.TxKey(wtx.tx) | |||
res := txs.GetTxByHash(key) | |||
require.NotNil(t, res) | |||
txs.RemoveTx(res) | |||
res = txs.GetTxByHash(key) | |||
require.Nil(t, res) | |||
} | |||
func TestTxStore_Size(t *testing.T) { | |||
txStore := NewTxStore() | |||
numTxs := 1000 | |||
for i := 0; i < numTxs; i++ { | |||
txStore.SetTx(&WrappedTx{ | |||
tx: []byte(fmt.Sprintf("test_tx_%d", i)), | |||
priority: int64(i), | |||
timestamp: time.Now(), | |||
}) | |||
} | |||
require.Equal(t, numTxs, txStore.Size()) | |||
} |