|
|
- package v1
-
- import (
- "bytes"
- "context"
- "fmt"
- "math/rand"
- "os"
- "sort"
- "strconv"
- "sync"
- "testing"
- "time"
-
- "github.com/stretchr/testify/require"
- "github.com/tendermint/tendermint/abci/example/code"
- "github.com/tendermint/tendermint/abci/example/kvstore"
- abci "github.com/tendermint/tendermint/abci/types"
- "github.com/tendermint/tendermint/config"
- "github.com/tendermint/tendermint/internal/mempool"
- "github.com/tendermint/tendermint/libs/log"
- "github.com/tendermint/tendermint/proxy"
- "github.com/tendermint/tendermint/types"
- )
-
- // application extends the KV store application by overriding CheckTx to provide
- // transaction priority based on the value in the key/value pair.
- type application struct {
- *kvstore.Application
- }
-
- type testTx struct {
- tx types.Tx
- priority int64
- }
-
- func (app *application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
- var (
- priority int64
- sender string
- )
-
- // infer the priority from the raw transaction value (sender=key=value)
- parts := bytes.Split(req.Tx, []byte("="))
- if len(parts) == 3 {
- v, err := strconv.ParseInt(string(parts[2]), 10, 64)
- if err != nil {
- return abci.ResponseCheckTx{
- Priority: priority,
- Code: 100,
- GasWanted: 1,
- }
- }
-
- priority = v
- sender = string(parts[0])
- } else {
- return abci.ResponseCheckTx{
- Priority: priority,
- Code: 101,
- GasWanted: 1,
- }
- }
-
- return abci.ResponseCheckTx{
- Priority: priority,
- Sender: sender,
- Code: code.CodeTypeOK,
- GasWanted: 1,
- }
- }
-
- func setup(t testing.TB, cacheSize int) *TxMempool {
- t.Helper()
-
- app := &application{kvstore.NewApplication()}
- cc := proxy.NewLocalClientCreator(app)
-
- cfg := config.ResetTestRoot(t.Name())
- cfg.Mempool.CacheSize = cacheSize
-
- appConnMem, err := cc.NewABCIClient()
- require.NoError(t, err)
- require.NoError(t, appConnMem.Start())
-
- t.Cleanup(func() {
- os.RemoveAll(cfg.RootDir)
- require.NoError(t, appConnMem.Stop())
- })
-
- return NewTxMempool(log.TestingLogger().With("test", t.Name()), cfg.Mempool, appConnMem, 0)
- }
-
- func checkTxs(t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx {
- txs := make([]testTx, numTxs)
- txInfo := mempool.TxInfo{SenderID: peerID}
-
- rng := rand.New(rand.NewSource(time.Now().UnixNano()))
-
- for i := 0; i < numTxs; i++ {
- prefix := make([]byte, 20)
- _, err := rng.Read(prefix)
- require.NoError(t, err)
-
- // sender := make([]byte, 10)
- // _, err = rng.Read(sender)
- // require.NoError(t, err)
-
- priority := int64(rng.Intn(9999-1000) + 1000)
-
- txs[i] = testTx{
- tx: []byte(fmt.Sprintf("sender-%d=%X=%d", i, prefix, priority)),
- priority: priority,
- }
- require.NoError(t, txmp.CheckTx(context.Background(), txs[i].tx, nil, txInfo))
- }
-
- return txs
- }
-
- func TestTxMempool_TxsAvailable(t *testing.T) {
- txmp := setup(t, 0)
- txmp.EnableTxsAvailable()
-
- ensureNoTxFire := func() {
- timer := time.NewTimer(500 * time.Millisecond)
- select {
- case <-txmp.TxsAvailable():
- require.Fail(t, "unexpected transactions event")
- case <-timer.C:
- }
- }
-
- ensureTxFire := func() {
- timer := time.NewTimer(500 * time.Millisecond)
- select {
- case <-txmp.TxsAvailable():
- case <-timer.C:
- require.Fail(t, "expected transactions event")
- }
- }
-
- // ensure no event as we have not executed any transactions yet
- ensureNoTxFire()
-
- // Execute CheckTx for some transactions and ensure TxsAvailable only fires
- // once.
- txs := checkTxs(t, txmp, 100, 0)
- ensureTxFire()
- ensureNoTxFire()
-
- rawTxs := make([]types.Tx, len(txs))
- for i, tx := range txs {
- rawTxs[i] = tx.tx
- }
-
- responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50]))
- for i := 0; i < len(responses); i++ {
- responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
- }
-
- // commit half the transactions and ensure we fire an event
- txmp.Lock()
- require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil))
- txmp.Unlock()
- ensureTxFire()
- ensureNoTxFire()
-
- // Execute CheckTx for more transactions and ensure we do not fire another
- // event as we're still on the same height (1).
- _ = checkTxs(t, txmp, 100, 0)
- ensureNoTxFire()
- }
-
- func TestTxMempool_Size(t *testing.T) {
- txmp := setup(t, 0)
- txs := checkTxs(t, txmp, 100, 0)
- require.Equal(t, len(txs), txmp.Size())
- require.Equal(t, int64(5490), txmp.SizeBytes())
-
- rawTxs := make([]types.Tx, len(txs))
- for i, tx := range txs {
- rawTxs[i] = tx.tx
- }
-
- responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50]))
- for i := 0; i < len(responses); i++ {
- responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
- }
-
- txmp.Lock()
- require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil))
- txmp.Unlock()
-
- require.Equal(t, len(rawTxs)/2, txmp.Size())
- require.Equal(t, int64(2750), txmp.SizeBytes())
- }
-
- func TestTxMempool_Flush(t *testing.T) {
- txmp := setup(t, 0)
- txs := checkTxs(t, txmp, 100, 0)
- require.Equal(t, len(txs), txmp.Size())
- require.Equal(t, int64(5490), txmp.SizeBytes())
-
- rawTxs := make([]types.Tx, len(txs))
- for i, tx := range txs {
- rawTxs[i] = tx.tx
- }
-
- responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50]))
- for i := 0; i < len(responses); i++ {
- responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
- }
-
- txmp.Lock()
- require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil))
- txmp.Unlock()
-
- txmp.Flush()
- require.Zero(t, txmp.Size())
- require.Equal(t, int64(0), txmp.SizeBytes())
- }
-
- func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) {
- txmp := setup(t, 0)
- tTxs := checkTxs(t, txmp, 100, 0) // all txs request 1 gas unit
- require.Equal(t, len(tTxs), txmp.Size())
- require.Equal(t, int64(5490), txmp.SizeBytes())
-
- txMap := make(map[[mempool.TxKeySize]byte]testTx)
- priorities := make([]int64, len(tTxs))
- for i, tTx := range tTxs {
- txMap[mempool.TxKey(tTx.tx)] = tTx
- priorities[i] = tTx.priority
- }
-
- sort.Slice(priorities, func(i, j int) bool {
- // sort by priority, i.e. decreasing order
- return priorities[i] > priorities[j]
- })
-
- ensurePrioritized := func(reapedTxs types.Txs) {
- reapedPriorities := make([]int64, len(reapedTxs))
- for i, rTx := range reapedTxs {
- reapedPriorities[i] = txMap[mempool.TxKey(rTx)].priority
- }
-
- require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities)
- }
-
- // reap by gas capacity only
- reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50)
- ensurePrioritized(reapedTxs)
- require.Equal(t, len(tTxs), txmp.Size())
- require.Equal(t, int64(5490), txmp.SizeBytes())
- require.Len(t, reapedTxs, 50)
-
- // reap by transaction bytes only
- reapedTxs = txmp.ReapMaxBytesMaxGas(1000, -1)
- ensurePrioritized(reapedTxs)
- require.Equal(t, len(tTxs), txmp.Size())
- require.Equal(t, int64(5490), txmp.SizeBytes())
- require.Len(t, reapedTxs, 17)
-
- // Reap by both transaction bytes and gas, where the size yields 31 reaped
- // transactions and the gas limit reaps 26 transactions.
- reapedTxs = txmp.ReapMaxBytesMaxGas(1500, 30)
- ensurePrioritized(reapedTxs)
- require.Equal(t, len(tTxs), txmp.Size())
- require.Equal(t, int64(5490), txmp.SizeBytes())
- require.Len(t, reapedTxs, 26)
- }
-
- func TestTxMempool_ReapMaxTxs(t *testing.T) {
- txmp := setup(t, 0)
- tTxs := checkTxs(t, txmp, 100, 0)
- require.Equal(t, len(tTxs), txmp.Size())
- require.Equal(t, int64(5490), txmp.SizeBytes())
-
- txMap := make(map[[mempool.TxKeySize]byte]testTx)
- priorities := make([]int64, len(tTxs))
- for i, tTx := range tTxs {
- txMap[mempool.TxKey(tTx.tx)] = tTx
- priorities[i] = tTx.priority
- }
-
- sort.Slice(priorities, func(i, j int) bool {
- // sort by priority, i.e. decreasing order
- return priorities[i] > priorities[j]
- })
-
- ensurePrioritized := func(reapedTxs types.Txs) {
- reapedPriorities := make([]int64, len(reapedTxs))
- for i, rTx := range reapedTxs {
- reapedPriorities[i] = txMap[mempool.TxKey(rTx)].priority
- }
-
- require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities)
- }
-
- // reap all transactions
- reapedTxs := txmp.ReapMaxTxs(-1)
- ensurePrioritized(reapedTxs)
- require.Equal(t, len(tTxs), txmp.Size())
- require.Equal(t, int64(5490), txmp.SizeBytes())
- require.Len(t, reapedTxs, len(tTxs))
-
- // reap a single transaction
- reapedTxs = txmp.ReapMaxTxs(1)
- ensurePrioritized(reapedTxs)
- require.Equal(t, len(tTxs), txmp.Size())
- require.Equal(t, int64(5490), txmp.SizeBytes())
- require.Len(t, reapedTxs, 1)
-
- // reap half of the transactions
- reapedTxs = txmp.ReapMaxTxs(len(tTxs) / 2)
- ensurePrioritized(reapedTxs)
- require.Equal(t, len(tTxs), txmp.Size())
- require.Equal(t, int64(5490), txmp.SizeBytes())
- require.Len(t, reapedTxs, len(tTxs)/2)
- }
-
- func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) {
- txmp := setup(t, 0)
-
- rng := rand.New(rand.NewSource(time.Now().UnixNano()))
- tx := make([]byte, txmp.config.MaxTxsBytes+1)
- _, err := rng.Read(tx)
- require.NoError(t, err)
-
- require.Error(t, txmp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{SenderID: 0}))
- }
-
- func TestTxMempool_CheckTxSamePeer(t *testing.T) {
- txmp := setup(t, 100)
- peerID := uint16(1)
- rng := rand.New(rand.NewSource(time.Now().UnixNano()))
-
- prefix := make([]byte, 20)
- _, err := rng.Read(prefix)
- require.NoError(t, err)
-
- tx := []byte(fmt.Sprintf("sender-0=%X=%d", prefix, 50))
-
- require.NoError(t, txmp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{SenderID: peerID}))
- require.Error(t, txmp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{SenderID: peerID}))
- }
-
- func TestTxMempool_CheckTxSameSender(t *testing.T) {
- txmp := setup(t, 100)
- peerID := uint16(1)
- rng := rand.New(rand.NewSource(time.Now().UnixNano()))
-
- prefix1 := make([]byte, 20)
- _, err := rng.Read(prefix1)
- require.NoError(t, err)
-
- prefix2 := make([]byte, 20)
- _, err = rng.Read(prefix2)
- require.NoError(t, err)
-
- tx1 := []byte(fmt.Sprintf("sender-0=%X=%d", prefix1, 50))
- tx2 := []byte(fmt.Sprintf("sender-0=%X=%d", prefix2, 50))
-
- require.NoError(t, txmp.CheckTx(context.Background(), tx1, nil, mempool.TxInfo{SenderID: peerID}))
- require.Equal(t, 1, txmp.Size())
- require.NoError(t, txmp.CheckTx(context.Background(), tx2, nil, mempool.TxInfo{SenderID: peerID}))
- require.Equal(t, 1, txmp.Size())
- }
-
- func TestTxMempool_ConcurrentTxs(t *testing.T) {
- txmp := setup(t, 100)
- rng := rand.New(rand.NewSource(time.Now().UnixNano()))
- checkTxDone := make(chan struct{})
-
- var wg sync.WaitGroup
-
- wg.Add(1)
- go func() {
- for i := 0; i < 20; i++ {
- _ = checkTxs(t, txmp, 100, 0)
- dur := rng.Intn(1000-500) + 500
- time.Sleep(time.Duration(dur) * time.Millisecond)
- }
-
- wg.Done()
- close(checkTxDone)
- }()
-
- wg.Add(1)
- go func() {
- ticker := time.NewTicker(time.Second)
- defer ticker.Stop()
- defer wg.Done()
-
- var height int64 = 1
-
- for range ticker.C {
- reapedTxs := txmp.ReapMaxTxs(200)
- if len(reapedTxs) > 0 {
- responses := make([]*abci.ResponseDeliverTx, len(reapedTxs))
- for i := 0; i < len(responses); i++ {
- var code uint32
-
- if i%10 == 0 {
- code = 100
- } else {
- code = abci.CodeTypeOK
- }
-
- responses[i] = &abci.ResponseDeliverTx{Code: code}
- }
-
- txmp.Lock()
- require.NoError(t, txmp.Update(height, reapedTxs, responses, nil, nil))
- txmp.Unlock()
-
- height++
- } else {
- // only return once we know we finished the CheckTx loop
- select {
- case <-checkTxDone:
- return
- default:
- }
- }
- }
- }()
-
- wg.Wait()
- require.Zero(t, txmp.Size())
- require.Zero(t, txmp.SizeBytes())
- }
|