From 68ffe8bc64e4efc8b50ee9ad2b857cecd0ca6c8d Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Mon, 19 Jul 2021 15:54:44 -0400 Subject: [PATCH] mempool: add TTL configuration to mempool (#6715) --- config/config.go | 39 ++++++++++-- config/toml.go | 16 +++++ docs/nodes/configuration.md | 23 ++++++- internal/mempool/v1/mempool.go | 82 +++++++++++++++++++++--- internal/mempool/v1/mempool_test.go | 84 +++++++++++++++++++------ internal/mempool/v1/tx.go | 82 ++++++++++++++++++++++++ internal/mempool/v1/tx_test.go | 96 +++++++++++++++++++++++++++++ 7 files changed, 390 insertions(+), 32 deletions(-) diff --git a/config/config.go b/config/config.go index 750c74e45..99b82fb46 100644 --- a/config/config.go +++ b/config/config.go @@ -784,25 +784,47 @@ type MempoolConfig struct { RootDir string `mapstructure:"home"` Recheck bool `mapstructure:"recheck"` Broadcast bool `mapstructure:"broadcast"` + // Maximum number of transactions in the mempool Size int `mapstructure:"size"` + // Limit the total size of all txs in the mempool. // This only accounts for raw transactions (e.g. given 1MB transactions and // max-txs-bytes=5MB, mempool will only accept 5 transactions). MaxTxsBytes int64 `mapstructure:"max-txs-bytes"` + // Size of the cache (used to filter transactions we saw earlier) in transactions CacheSize int `mapstructure:"cache-size"` + // Do not remove invalid transactions from the cache (default: false) // Set to true if it's not possible for any invalid transaction to become // valid again in the future. KeepInvalidTxsInCache bool `mapstructure:"keep-invalid-txs-in-cache"` + // Maximum size of a single transaction // NOTE: the max size of a tx transmitted over the network is {max-tx-bytes}. MaxTxBytes int `mapstructure:"max-tx-bytes"` + // Maximum size of a batch of transactions to send to a peer // Including space needed by encoding (one varint per transaction). // XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796 MaxBatchBytes int `mapstructure:"max-batch-bytes"` + + // TTLDuration, if non-zero, defines the maximum amount of time a transaction + // can exist for in the mempool. + // + // Note, if TTLNumBlocks is also defined, a transaction will be removed if it + // has existed in the mempool at least TTLNumBlocks number of blocks or if it's + // insertion time into the mempool is beyond TTLDuration. + TTLDuration time.Duration `mapstructure:"ttl-duration"` + + // TTLNumBlocks, if non-zero, defines the maximum number of blocks a transaction + // can exist for in the mempool. + // + // Note, if TTLDuration is also defined, a transaction will be removed if it + // has existed in the mempool at least TTLNumBlocks number of blocks or if + // it's insertion time into the mempool is beyond TTLDuration. + TTLNumBlocks int64 `mapstructure:"ttl-num-blocks"` } // DefaultMempoolConfig returns a default configuration for the Tendermint mempool. @@ -813,10 +835,12 @@ func DefaultMempoolConfig() *MempoolConfig { Broadcast: true, // Each signature verification takes .5ms, Size reduced until we implement // ABCI Recheck - Size: 5000, - MaxTxsBytes: 1024 * 1024 * 1024, // 1GB - CacheSize: 10000, - MaxTxBytes: 1024 * 1024, // 1MB + Size: 5000, + MaxTxsBytes: 1024 * 1024 * 1024, // 1GB + CacheSize: 10000, + MaxTxBytes: 1024 * 1024, // 1MB + TTLDuration: 0 * time.Second, + TTLNumBlocks: 0, } } @@ -842,6 +866,13 @@ func (cfg *MempoolConfig) ValidateBasic() error { if cfg.MaxTxBytes < 0 { return errors.New("max-tx-bytes can't be negative") } + if cfg.TTLDuration < 0 { + return errors.New("ttl-duration can't be negative") + } + if cfg.TTLNumBlocks < 0 { + return errors.New("ttl-num-blocks can't be negative") + } + return nil } diff --git a/config/toml.go b/config/toml.go index 925e9d27c..aae716a58 100644 --- a/config/toml.go +++ b/config/toml.go @@ -399,6 +399,22 @@ max-tx-bytes = {{ .Mempool.MaxTxBytes }} # XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796 max-batch-bytes = {{ .Mempool.MaxBatchBytes }} +# ttl-duration, if non-zero, defines the maximum amount of time a transaction +# can exist for in the mempool. +# +# Note, if ttl-num-blocks is also defined, a transaction will be removed if it +# has existed in the mempool at least ttl-num-blocks number of blocks or if it's +# insertion time into the mempool is beyond ttl-duration. +ttl-duration = "{{ .Mempool.TTLDuration }}" + +# ttl-num-blocks, if non-zero, defines the maximum number of blocks a transaction +# can exist for in the mempool. +# +# Note, if ttl-duration is also defined, a transaction will be removed if it +# has existed in the mempool at least ttl-num-blocks number of blocks or if +# it's insertion time into the mempool is beyond ttl-duration. +ttl-num-blocks = {{ .Mempool.TTLNumBlocks }} + ####################################################### ### State Sync Configuration Options ### ####################################################### diff --git a/docs/nodes/configuration.md b/docs/nodes/configuration.md index 203863990..6e2665b26 100644 --- a/docs/nodes/configuration.md +++ b/docs/nodes/configuration.md @@ -275,9 +275,13 @@ dial-timeout = "3s" ####################################################### [mempool] +# Mempool version to use: +# 1) "v0" - The legacy non-prioritized mempool reactor. +# 2) "v1" (default) - The prioritized mempool reactor. +version = "v1" + recheck = true broadcast = true -wal-dir = "" # Maximum number of transactions in the mempool size = 5000 @@ -304,6 +308,22 @@ max-tx-bytes = 1048576 # XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796 max-batch-bytes = 0 +# ttl-duration, if non-zero, defines the maximum amount of time a transaction +# can exist for in the mempool. +# +# Note, if ttl-num-blocks is also defined, a transaction will be removed if it +# has existed in the mempool at least ttl-num-blocks number of blocks or if it's +# insertion time into the mempool is beyond ttl-duration. +ttl-duration = "0s" + +# ttl-num-blocks, if non-zero, defines the maximum number of blocks a transaction +# can exist for in the mempool. +# +# Note, if ttl-duration is also defined, a transaction will be removed if it +# has existed in the mempool at least ttl-num-blocks number of blocks or if +# it's insertion time into the mempool is beyond ttl-duration. +ttl-num-blocks = 0 + ####################################################### ### State Sync Configuration Options ### ####################################################### @@ -421,7 +441,6 @@ max-open-connections = 3 # Instrumentation namespace namespace = "tendermint" - ``` ## Empty blocks VS no empty blocks diff --git a/internal/mempool/v1/mempool.go b/internal/mempool/v1/mempool.go index 9b06ee4a8..534b7ff55 100644 --- a/internal/mempool/v1/mempool.go +++ b/internal/mempool/v1/mempool.go @@ -74,6 +74,14 @@ type TxMempool struct { // thread-safe priority queue. priorityIndex *TxPriorityQueue + // heightIndex defines a height-based, in ascending order, transaction index. + // i.e. older transactions are first. + heightIndex *WrappedTxList + + // timestampIndex defines a timestamp-based, in ascending order, transaction + // index. i.e. older transactions are first. + timestampIndex *WrappedTxList + // A read/write lock is used to safe guard updates, insertions and deletions // from the mempool. A read-lock is implicitly acquired when executing CheckTx, // however, a caller must explicitly grab a write-lock via Lock when updating @@ -101,6 +109,12 @@ func NewTxMempool( txStore: NewTxStore(), gossipIndex: clist.New(), priorityIndex: NewTxPriorityQueue(), + heightIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { + return wtx1.height >= wtx2.height + }), + timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { + return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp) + }), } if cfg.CacheSize > 0 { @@ -279,6 +293,7 @@ func (txmp *TxMempool) CheckTx( tx: tx, hash: txHash, timestamp: time.Now().UTC(), + height: txmp.height, } txmp.initTxCallback(wtx, res, txInfo) @@ -300,13 +315,11 @@ func (txmp *TxMempool) Flush() { txmp.mtx.RLock() defer txmp.mtx.RUnlock() + txmp.heightIndex.Reset() + txmp.timestampIndex.Reset() + for _, wtx := range txmp.txStore.GetAllTxs() { - if !txmp.txStore.IsTxRemoved(wtx.hash) { - txmp.txStore.RemoveTx(wtx) - txmp.priorityIndex.RemoveTx(wtx) - txmp.gossipIndex.Remove(wtx.gossipEl) - wtx.gossipEl.DetachPrev() - } + txmp.removeTx(wtx, false) } atomic.SwapInt64(&txmp.sizeBytes, 0) @@ -444,6 +457,8 @@ func (txmp *TxMempool) Update( } } + txmp.purgeExpiredTxs(blockHeight) + // If there any uncommitted transactions left in the mempool, we either // initiate re-CheckTx per remaining transaction or notify that remaining // transactions are left. @@ -550,7 +565,6 @@ func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo } wtx.gasWanted = checkTxRes.CheckTx.GasWanted - wtx.height = txmp.height wtx.priority = priority wtx.sender = sender wtx.peers = map[uint16]struct{}{ @@ -722,6 +736,8 @@ func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error { func (txmp *TxMempool) insertTx(wtx *WrappedTx) { txmp.txStore.SetTx(wtx) txmp.priorityIndex.PushTx(wtx) + txmp.heightIndex.Insert(wtx) + txmp.timestampIndex.Insert(wtx) // Insert the transaction into the gossip index and mark the reference to the // linked-list element, which will be needed at a later point when the @@ -739,6 +755,8 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) { txmp.txStore.RemoveTx(wtx) txmp.priorityIndex.RemoveTx(wtx) + txmp.heightIndex.Remove(wtx) + txmp.timestampIndex.Remove(wtx) // Remove the transaction from the gossip index and cleanup the linked-list // element so it can be garbage collected. @@ -752,6 +770,56 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) { } } +// purgeExpiredTxs removes all transactions that have exceeded their respective +// height and/or time based TTLs from their respective indexes. Every expired +// transaction will be removed from the mempool entirely, except for the cache. +// +// NOTE: purgeExpiredTxs must only be called during TxMempool#Update in which +// the caller has a write-lock on the mempool and so we can safely iterate over +// the height and time based indexes. +func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { + now := time.Now() + expiredTxs := make(map[[mempool.TxKeySize]byte]*WrappedTx) + + if txmp.config.TTLNumBlocks > 0 { + purgeIdx := -1 + for i, wtx := range txmp.heightIndex.txs { + if (blockHeight - wtx.height) > txmp.config.TTLNumBlocks { + expiredTxs[mempool.TxKey(wtx.tx)] = wtx + purgeIdx = i + } else { + // since the index is sorted, we know no other txs can be be purged + break + } + } + + if purgeIdx >= 0 { + txmp.heightIndex.txs = txmp.heightIndex.txs[purgeIdx+1:] + } + } + + if txmp.config.TTLDuration > 0 { + purgeIdx := -1 + for i, wtx := range txmp.timestampIndex.txs { + if now.Sub(wtx.timestamp) > txmp.config.TTLDuration { + expiredTxs[mempool.TxKey(wtx.tx)] = wtx + purgeIdx = i + } else { + // since the index is sorted, we know no other txs can be be purged + break + } + } + + if purgeIdx >= 0 { + txmp.timestampIndex.txs = txmp.timestampIndex.txs[purgeIdx+1:] + } + } + + for _, wtx := range expiredTxs { + txmp.removeTx(wtx, false) + } +} + func (txmp *TxMempool) notifyTxsAvailable() { if txmp.Size() == 0 { panic("attempt to notify txs available but mempool is empty!") diff --git a/internal/mempool/v1/mempool_test.go b/internal/mempool/v1/mempool_test.go index 6dbf312d4..eb8385d7e 100644 --- a/internal/mempool/v1/mempool_test.go +++ b/internal/mempool/v1/mempool_test.go @@ -102,14 +102,10 @@ func checkTxs(t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx _, err := rng.Read(prefix) require.NoError(t, err) - // sender := make([]byte, 10) - // _, err = rng.Read(sender) - // require.NoError(t, err) - priority := int64(rng.Intn(9999-1000) + 1000) txs[i] = testTx{ - tx: []byte(fmt.Sprintf("sender-%d=%X=%d", i, prefix, priority)), + tx: []byte(fmt.Sprintf("sender-%d-%d=%X=%d", i, peerID, prefix, priority)), priority: priority, } require.NoError(t, txmp.CheckTx(context.Background(), txs[i].tx, nil, txInfo)) @@ -176,7 +172,7 @@ func TestTxMempool_Size(t *testing.T) { txmp := setup(t, 0) txs := checkTxs(t, txmp, 100, 0) require.Equal(t, len(txs), txmp.Size()) - require.Equal(t, int64(5490), txmp.SizeBytes()) + require.Equal(t, int64(5690), txmp.SizeBytes()) rawTxs := make([]types.Tx, len(txs)) for i, tx := range txs { @@ -193,14 +189,14 @@ func TestTxMempool_Size(t *testing.T) { txmp.Unlock() require.Equal(t, len(rawTxs)/2, txmp.Size()) - require.Equal(t, int64(2750), txmp.SizeBytes()) + require.Equal(t, int64(2850), txmp.SizeBytes()) } func TestTxMempool_Flush(t *testing.T) { txmp := setup(t, 0) txs := checkTxs(t, txmp, 100, 0) require.Equal(t, len(txs), txmp.Size()) - require.Equal(t, int64(5490), txmp.SizeBytes()) + require.Equal(t, int64(5690), txmp.SizeBytes()) rawTxs := make([]types.Tx, len(txs)) for i, tx := range txs { @@ -225,7 +221,7 @@ func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) { txmp := setup(t, 0) tTxs := checkTxs(t, txmp, 100, 0) // all txs request 1 gas unit require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5490), txmp.SizeBytes()) + require.Equal(t, int64(5690), txmp.SizeBytes()) txMap := make(map[[mempool.TxKeySize]byte]testTx) priorities := make([]int64, len(tTxs)) @@ -252,30 +248,30 @@ func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) { reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50) ensurePrioritized(reapedTxs) require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5490), txmp.SizeBytes()) + require.Equal(t, int64(5690), txmp.SizeBytes()) require.Len(t, reapedTxs, 50) // reap by transaction bytes only reapedTxs = txmp.ReapMaxBytesMaxGas(1000, -1) ensurePrioritized(reapedTxs) require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5490), txmp.SizeBytes()) - require.Len(t, reapedTxs, 17) + require.Equal(t, int64(5690), txmp.SizeBytes()) + require.GreaterOrEqual(t, len(reapedTxs), 16) // Reap by both transaction bytes and gas, where the size yields 31 reaped - // transactions and the gas limit reaps 26 transactions. + // transactions and the gas limit reaps 25 transactions. reapedTxs = txmp.ReapMaxBytesMaxGas(1500, 30) ensurePrioritized(reapedTxs) require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5490), txmp.SizeBytes()) - require.Len(t, reapedTxs, 26) + require.Equal(t, int64(5690), txmp.SizeBytes()) + require.Len(t, reapedTxs, 25) } func TestTxMempool_ReapMaxTxs(t *testing.T) { txmp := setup(t, 0) tTxs := checkTxs(t, txmp, 100, 0) require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5490), txmp.SizeBytes()) + require.Equal(t, int64(5690), txmp.SizeBytes()) txMap := make(map[[mempool.TxKeySize]byte]testTx) priorities := make([]int64, len(tTxs)) @@ -302,21 +298,21 @@ func TestTxMempool_ReapMaxTxs(t *testing.T) { reapedTxs := txmp.ReapMaxTxs(-1) ensurePrioritized(reapedTxs) require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5490), txmp.SizeBytes()) + require.Equal(t, int64(5690), txmp.SizeBytes()) require.Len(t, reapedTxs, len(tTxs)) // reap a single transaction reapedTxs = txmp.ReapMaxTxs(1) ensurePrioritized(reapedTxs) require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5490), txmp.SizeBytes()) + require.Equal(t, int64(5690), txmp.SizeBytes()) require.Len(t, reapedTxs, 1) // reap half of the transactions reapedTxs = txmp.ReapMaxTxs(len(tTxs) / 2) ensurePrioritized(reapedTxs) require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, int64(5490), txmp.SizeBytes()) + require.Equal(t, int64(5690), txmp.SizeBytes()) require.Len(t, reapedTxs, len(tTxs)/2) } @@ -431,3 +427,53 @@ func TestTxMempool_ConcurrentTxs(t *testing.T) { require.Zero(t, txmp.Size()) require.Zero(t, txmp.SizeBytes()) } + +func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) { + txmp := setup(t, 500) + txmp.height = 100 + txmp.config.TTLNumBlocks = 10 + + tTxs := checkTxs(t, txmp, 100, 0) + require.Equal(t, len(tTxs), txmp.Size()) + require.Equal(t, 100, txmp.heightIndex.Size()) + + // reap 5 txs at the next height -- no txs should expire + reapedTxs := txmp.ReapMaxTxs(5) + responses := make([]*abci.ResponseDeliverTx, len(reapedTxs)) + for i := 0; i < len(responses); i++ { + responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} + } + + txmp.Lock() + require.NoError(t, txmp.Update(txmp.height+1, reapedTxs, responses, nil, nil)) + txmp.Unlock() + + require.Equal(t, 95, txmp.Size()) + require.Equal(t, 95, txmp.heightIndex.Size()) + + // check more txs at height 101 + _ = checkTxs(t, txmp, 50, 1) + require.Equal(t, 145, txmp.Size()) + require.Equal(t, 145, txmp.heightIndex.Size()) + + // Reap 5 txs at a height that would expire all the transactions from before + // the previous Update (height 100). + // + // NOTE: When we reap txs below, we do not know if we're picking txs from the + // initial CheckTx calls or from the second round of CheckTx calls. Thus, we + // cannot guarantee that all 95 txs are remaining that should be expired and + // removed. However, we do know that that at most 95 txs can be expired and + // removed. + reapedTxs = txmp.ReapMaxTxs(5) + responses = make([]*abci.ResponseDeliverTx, len(reapedTxs)) + for i := 0; i < len(responses); i++ { + responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} + } + + txmp.Lock() + require.NoError(t, txmp.Update(txmp.height+10, reapedTxs, responses, nil, nil)) + txmp.Unlock() + + require.GreaterOrEqual(t, txmp.Size(), 45) + require.GreaterOrEqual(t, txmp.heightIndex.Size(), 45) +} diff --git a/internal/mempool/v1/tx.go b/internal/mempool/v1/tx.go index 4f212a6c4..15173b91f 100644 --- a/internal/mempool/v1/tx.go +++ b/internal/mempool/v1/tx.go @@ -1,6 +1,7 @@ package v1 import ( + "sort" "time" "github.com/tendermint/tendermint/internal/libs/clist" @@ -198,3 +199,84 @@ func (txs *TxStore) GetOrSetPeerByTxHash(hash [mempool.TxKeySize]byte, peerID ui wtx.peers[peerID] = struct{}{} return wtx, false } + +// WrappedTxList implements a thread-safe list of *WrappedTx objects that can be +// used to build generic transaction indexes in the mempool. It accepts a +// comparator function, less(a, b *WrappedTx) bool, that compares two WrappedTx +// references which is used during Insert in order to determine sorted order. If +// less returns true, a <= b. +type WrappedTxList struct { + mtx tmsync.RWMutex + txs []*WrappedTx + less func(*WrappedTx, *WrappedTx) bool +} + +func NewWrappedTxList(less func(*WrappedTx, *WrappedTx) bool) *WrappedTxList { + return &WrappedTxList{ + txs: make([]*WrappedTx, 0), + less: less, + } +} + +// Size returns the number of WrappedTx objects in the list. +func (wtl *WrappedTxList) Size() int { + wtl.mtx.RLock() + defer wtl.mtx.RUnlock() + + return len(wtl.txs) +} + +// Reset resets the list of transactions to an empty list. +func (wtl *WrappedTxList) Reset() { + wtl.mtx.Lock() + defer wtl.mtx.Unlock() + + wtl.txs = make([]*WrappedTx, 0) +} + +// Insert inserts a WrappedTx reference into the sorted list based on the list's +// comparator function. +func (wtl *WrappedTxList) Insert(wtx *WrappedTx) { + wtl.mtx.Lock() + defer wtl.mtx.Unlock() + + i := sort.Search(len(wtl.txs), func(i int) bool { + return wtl.less(wtl.txs[i], wtx) + }) + + if i == len(wtl.txs) { + // insert at the end + wtl.txs = append(wtl.txs, wtx) + return + } + + // Make space for the inserted element by shifting values at the insertion + // index up one index. + // + // NOTE: The call to append does not allocate memory when cap(wtl.txs) > len(wtl.txs). + wtl.txs = append(wtl.txs[:i+1], wtl.txs[i:]...) + wtl.txs[i] = wtx +} + +// Remove attempts to remove a WrappedTx from the sorted list. +func (wtl *WrappedTxList) Remove(wtx *WrappedTx) { + wtl.mtx.Lock() + defer wtl.mtx.Unlock() + + i := sort.Search(len(wtl.txs), func(i int) bool { + return wtl.less(wtl.txs[i], wtx) + }) + + // Since the list is sorted, we evaluate all elements starting at i. Note, if + // the element does not exist, we may potentially evaluate the entire remainder + // of the list. However, a caller should not be expected to call Remove with a + // non-existing element. + for i < len(wtl.txs) { + if wtl.txs[i] == wtx { + wtl.txs = append(wtl.txs[:i], wtl.txs[i+1:]...) + return + } + + i++ + } +} diff --git a/internal/mempool/v1/tx_test.go b/internal/mempool/v1/tx_test.go index 546a8fa9c..c5d488669 100644 --- a/internal/mempool/v1/tx_test.go +++ b/internal/mempool/v1/tx_test.go @@ -2,6 +2,8 @@ package v1 import ( "fmt" + "math/rand" + "sort" "testing" "time" @@ -132,3 +134,97 @@ func TestTxStore_Size(t *testing.T) { require.Equal(t, numTxs, txStore.Size()) } + +func TestWrappedTxList_Reset(t *testing.T) { + list := NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { + return wtx1.height >= wtx2.height + }) + + require.Zero(t, list.Size()) + + for i := 0; i < 100; i++ { + list.Insert(&WrappedTx{height: int64(i)}) + } + + require.Equal(t, 100, list.Size()) + + list.Reset() + require.Zero(t, list.Size()) +} + +func TestWrappedTxList_Insert(t *testing.T) { + list := NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { + return wtx1.height >= wtx2.height + }) + + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + + var expected []int + for i := 0; i < 100; i++ { + height := rng.Int63n(10000) + expected = append(expected, int(height)) + list.Insert(&WrappedTx{height: height}) + + if i%10 == 0 { + list.Insert(&WrappedTx{height: height}) + expected = append(expected, int(height)) + } + } + + got := make([]int, list.Size()) + for i, wtx := range list.txs { + got[i] = int(wtx.height) + } + + sort.Ints(expected) + require.Equal(t, expected, got) +} + +func TestWrappedTxList_Remove(t *testing.T) { + list := NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { + return wtx1.height >= wtx2.height + }) + + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + + var txs []*WrappedTx + for i := 0; i < 100; i++ { + height := rng.Int63n(10000) + tx := &WrappedTx{height: height} + + txs = append(txs, tx) + list.Insert(tx) + + if i%10 == 0 { + tx = &WrappedTx{height: height} + list.Insert(tx) + txs = append(txs, tx) + } + } + + // remove a tx that does not exist + list.Remove(&WrappedTx{height: 20000}) + + // remove a tx that exists (by height) but not referenced + list.Remove(&WrappedTx{height: txs[0].height}) + + // remove a few existing txs + for i := 0; i < 25; i++ { + j := rng.Intn(len(txs)) + list.Remove(txs[j]) + txs = append(txs[:j], txs[j+1:]...) + } + + expected := make([]int, len(txs)) + for i, tx := range txs { + expected[i] = int(tx.height) + } + + got := make([]int, list.Size()) + for i, wtx := range list.txs { + got[i] = int(wtx.height) + } + + sort.Ints(expected) + require.Equal(t, expected, got) +}