Browse Source

mempool: test harness should expose application

pull/8143/head
tycho garen 3 years ago
parent
commit
c95cb86743
3 changed files with 112 additions and 29 deletions
  1. +9
    -1
      internal/mempool/mempool_bench_test.go
  2. +78
    -19
      internal/mempool/mempool_test.go
  3. +25
    -9
      internal/mempool/reactor_test.go

+ 9
- 1
internal/mempool/mempool_bench_test.go View File

@ -8,15 +8,23 @@ import (
"time"
"github.com/stretchr/testify/require"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/example/kvstore"
"github.com/tendermint/tendermint/libs/log"
)
func BenchmarkTxMempool_CheckTx(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client := abciclient.NewLocalClient(log.NewNopLogger(), kvstore.NewApplication())
if err := client.Start(ctx); err != nil {
b.Fatal(err)
}
// setup the cache and the mempool number for hitting GetEvictableTxs during the
// benchmark. 5000 is the current default mempool size in the TM config.
txmp := setup(ctx, b, 10000)
txmp := setup(ctx, b, client, 10000)
txmp.config.Size = 5000
rng := rand.New(rand.NewSource(time.Now().UnixNano()))


+ 78
- 19
internal/mempool/mempool_test.go View File

@ -72,7 +72,7 @@ func (app *application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
}
}
func setup(ctx context.Context, t testing.TB, cacheSize int, options ...TxMempoolOption) *TxMempool {
func setup(ctx context.Context, t testing.TB, app abciclient.Client, cacheSize int, options ...TxMempoolOption) *TxMempool {
t.Helper()
var cancel context.CancelFunc
@ -80,22 +80,16 @@ func setup(ctx context.Context, t testing.TB, cacheSize int, options ...TxMempoo
logger := log.TestingLogger()
conn := abciclient.NewLocalClient(logger, &application{
kvstore.NewApplication(),
})
cfg, err := config.ResetTestRoot(t.TempDir(), strings.ReplaceAll(t.Name(), "/", "|"))
require.NoError(t, err)
cfg.Mempool.CacheSize = cacheSize
require.NoError(t, conn.Start(ctx))
t.Cleanup(func() {
os.RemoveAll(cfg.RootDir)
cancel()
conn.Wait()
})
return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, conn, options...)
return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, app, options...)
}
func checkTxs(ctx context.Context, t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx {
@ -137,7 +131,13 @@ func TestTxMempool_TxsAvailable(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txmp := setup(ctx, t, 0)
client := abciclient.NewLocalClient(log.NewNopLogger(), kvstore.NewApplication())
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 0)
txmp.EnableTxsAvailable()
ensureNoTxFire := func() {
@ -194,7 +194,13 @@ func TestTxMempool_Size(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txmp := setup(ctx, t, 0)
client := abciclient.NewLocalClient(log.NewNopLogger(), kvstore.NewApplication())
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 0)
txs := checkTxs(ctx, t, txmp, 100, 0)
require.Equal(t, len(txs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
@ -221,7 +227,13 @@ func TestTxMempool_Flush(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txmp := setup(ctx, t, 0)
client := abciclient.NewLocalClient(log.NewNopLogger(), kvstore.NewApplication())
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 0)
txs := checkTxs(ctx, t, txmp, 100, 0)
require.Equal(t, len(txs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
@ -249,7 +261,13 @@ func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txmp := setup(ctx, t, 0)
client := abciclient.NewLocalClient(log.NewNopLogger(), kvstore.NewApplication())
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 0)
tTxs := checkTxs(ctx, t, txmp, 100, 0) // all txs request 1 gas unit
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
@ -302,7 +320,13 @@ func TestTxMempool_ReapMaxTxs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txmp := setup(ctx, t, 0)
client := abciclient.NewLocalClient(log.NewNopLogger(), kvstore.NewApplication())
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 0)
tTxs := checkTxs(ctx, t, txmp, 100, 0)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
@ -354,7 +378,12 @@ func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txmp := setup(ctx, t, 0)
client := abciclient.NewLocalClient(log.NewNopLogger(), kvstore.NewApplication())
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 0)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
tx := make([]byte, txmp.config.MaxTxBytes+1)
@ -374,7 +403,13 @@ func TestTxMempool_CheckTxSamePeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txmp := setup(ctx, t, 100)
client := abciclient.NewLocalClient(log.NewNopLogger(), kvstore.NewApplication())
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 100)
peerID := uint16(1)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
@ -392,7 +427,13 @@ func TestTxMempool_CheckTxSameSender(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txmp := setup(ctx, t, 100)
client := abciclient.NewLocalClient(log.NewNopLogger(), kvstore.NewApplication())
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 100)
peerID := uint16(1)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
@ -417,7 +458,13 @@ func TestTxMempool_ConcurrentTxs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txmp := setup(ctx, t, 100)
client := abciclient.NewLocalClient(log.NewNopLogger(), kvstore.NewApplication())
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 100)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
checkTxDone := make(chan struct{})
@ -484,7 +531,13 @@ func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txmp := setup(ctx, t, 500)
client := abciclient.NewLocalClient(log.NewNopLogger(), kvstore.NewApplication())
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 500)
txmp.height = 100
txmp.config.TTLNumBlocks = 10
@ -556,10 +609,16 @@ func TestTxMempool_CheckTxPostCheckError(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
client := abciclient.NewLocalClient(log.NewNopLogger(), kvstore.NewApplication())
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
postCheckFn := func(_ types.Tx, _ *abci.ResponseCheckTx) error {
return testCase.err
}
txmp := setup(ctx, t, 0, WithPostCheck(postCheckFn))
txmp := setup(ctx, t, client, 0, WithPostCheck(postCheckFn))
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
tx := make([]byte, txmp.config.MaxTxBytes-1)
_, err := rng.Read(tx)


+ 25
- 9
internal/mempool/reactor_test.go View File

@ -13,6 +13,7 @@ import (
"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/require"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/example/kvstore"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
@ -39,7 +40,7 @@ type reactorTestSuite struct {
nodes []types.NodeID
}
func setupReactors(ctx context.Context, t *testing.T, numNodes int, chBuf uint) *reactorTestSuite {
func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNodes int, chBuf uint) *reactorTestSuite {
t.Helper()
cfg, err := config.ResetTestRoot(t.TempDir(), strings.ReplaceAll(t.Name(), "/", "|"))
@ -63,7 +64,11 @@ func setupReactors(ctx context.Context, t *testing.T, numNodes int, chBuf uint)
for nodeID := range rts.network.Nodes {
rts.kvstores[nodeID] = kvstore.NewApplication()
mempool := setup(ctx, t, 0)
client := abciclient.NewLocalClient(logger, rts.kvstores[nodeID])
require.NoError(t, client.Start(ctx))
t.Cleanup(client.Wait)
mempool := setup(ctx, t, client, 0)
rts.mempools[nodeID] = mempool
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate, chBuf)
@ -151,7 +156,9 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) {
defer cancel()
const numNodes = 2
rts := setupReactors(ctx, t, numNodes, 0)
logger := log.NewNopLogger()
rts := setupReactors(ctx, t, logger, numNodes, 0)
observePanic := func(r interface{}) {
t.Fatal("panic detected in reactor")
@ -194,7 +201,9 @@ func TestReactorBroadcastTxs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setupReactors(ctx, t, numNodes, uint(numTxs))
logger := log.NewNopLogger()
rts := setupReactors(ctx, t, logger, numNodes, uint(numTxs))
primary := rts.nodes[0]
secondaries := rts.nodes[1:]
@ -218,7 +227,8 @@ func TestReactorConcurrency(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setupReactors(ctx, t, numNodes, 0)
logger := log.NewNopLogger()
rts := setupReactors(ctx, t, logger, numNodes, 0)
primary := rts.nodes[0]
secondary := rts.nodes[1]
@ -276,7 +286,8 @@ func TestReactorNoBroadcastToSender(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setupReactors(ctx, t, numNodes, uint(numTxs))
logger := log.NewNopLogger()
rts := setupReactors(ctx, t, logger, numNodes, uint(numTxs))
primary := rts.nodes[0]
secondary := rts.nodes[1]
@ -300,7 +311,9 @@ func TestReactor_MaxTxBytes(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setupReactors(ctx, t, numNodes, 0)
logger := log.NewNopLogger()
rts := setupReactors(ctx, t, logger, numNodes, 0)
primary := rts.nodes[0]
secondary := rts.nodes[1]
@ -336,7 +349,8 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setupReactors(ctx, t, 1, MaxActiveIDs+1)
logger := log.NewNopLogger()
rts := setupReactors(ctx, t, logger, 1, MaxActiveIDs+1)
nodeID := rts.nodes[0]
@ -388,7 +402,9 @@ func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setupReactors(ctx, t, 2, 2)
logger := log.NewNopLogger()
rts := setupReactors(ctx, t, logger, 2, 2)
primary := rts.nodes[0]
secondary := rts.nodes[1]


Loading…
Cancel
Save