From 07b46d5a054365b576b8dbab9744c944a4d6e4f4 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Thu, 17 Mar 2022 13:30:13 -0400 Subject: [PATCH 1/2] blocksync: drop redundant shutdown mechanisms (#8136) --- internal/blocksync/pool.go | 16 ---------------- internal/blocksync/reactor.go | 2 -- 2 files changed, 18 deletions(-) diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go index 4c905c660..7f133a7a1 100644 --- a/internal/blocksync/pool.go +++ b/internal/blocksync/pool.go @@ -86,7 +86,6 @@ type BlockPool struct { requestsCh chan<- BlockRequest errorsCh chan<- peerError - exitedCh chan struct{} startHeight int64 lastHundredBlockTimeStamp time.Time @@ -109,7 +108,6 @@ func NewBlockPool( height: start, startHeight: start, numPending: 0, - exitedCh: make(chan struct{}), requestsCh: requestsCh, errorsCh: errorsCh, lastSyncRate: 0, @@ -125,11 +123,6 @@ func (pool *BlockPool) OnStart(ctx context.Context) error { pool.lastHundredBlockTimeStamp = pool.lastAdvance go pool.makeRequestersRoutine(ctx) - go func() { - defer close(pool.exitedCh) - pool.Wait() - }() - return nil } @@ -637,12 +630,6 @@ func (bpr *bpRequester) redo(peerID types.NodeID) { // Responsible for making more requests as necessary // Returns only when a block is found (e.g. AddBlock() is called) func (bpr *bpRequester) requestRoutine(ctx context.Context) { - bprPoolDone := make(chan struct{}) - go func() { - defer close(bprPoolDone) - bpr.pool.Wait() - }() - OUTER_LOOP: for { // Pick a peer to send request to. @@ -670,9 +657,6 @@ OUTER_LOOP: select { case <-ctx.Done(): return - case <-bpr.pool.exitedCh: - bpr.Stop() - return case peerID := <-bpr.redoCh: if peerID == bpr.peerID { bpr.reset() diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index cf1a10623..89c8c642b 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -445,8 +445,6 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) { select { case <-ctx.Done(): return - case <-r.pool.exitedCh: - return case <-switchToConsensusTicker.C: var ( height, numPending, lenRequesters = r.pool.GetStatus() From 1dd8807cc32e2ca5d59ec032fb708809119a6129 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Thu, 17 Mar 2022 13:45:27 -0400 Subject: [PATCH 2/2] mempool: test harness should expose application (#8143) This is minor, but I was trying to write a test and realized that the application reference in the harness isn't actually used, which is quite confusing. --- internal/mempool/mempool_bench_test.go | 10 ++- internal/mempool/mempool_test.go | 105 +++++++++++++++++++------ internal/mempool/reactor_test.go | 34 +++++--- 3 files changed, 113 insertions(+), 36 deletions(-) diff --git a/internal/mempool/mempool_bench_test.go b/internal/mempool/mempool_bench_test.go index 088af174a..843e42e87 100644 --- a/internal/mempool/mempool_bench_test.go +++ b/internal/mempool/mempool_bench_test.go @@ -8,15 +8,23 @@ import ( "time" "github.com/stretchr/testify/require" + abciclient "github.com/tendermint/tendermint/abci/client" + "github.com/tendermint/tendermint/abci/example/kvstore" + "github.com/tendermint/tendermint/libs/log" ) func BenchmarkTxMempool_CheckTx(b *testing.B) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + client := abciclient.NewLocalClient(log.NewNopLogger(), kvstore.NewApplication()) + if err := client.Start(ctx); err != nil { + b.Fatal(err) + } + // setup the cache and the mempool number for hitting GetEvictableTxs during the // benchmark. 
5000 is the current default mempool size in the TM config. - txmp := setup(ctx, b, 10000) + txmp := setup(ctx, b, client, 10000) txmp.config.Size = 5000 rng := rand.New(rand.NewSource(time.Now().UnixNano())) diff --git a/internal/mempool/mempool_test.go b/internal/mempool/mempool_test.go index e4f604cb1..165e4bd20 100644 --- a/internal/mempool/mempool_test.go +++ b/internal/mempool/mempool_test.go @@ -72,30 +72,18 @@ func (app *application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { } } -func setup(ctx context.Context, t testing.TB, cacheSize int, options ...TxMempoolOption) *TxMempool { +func setup(ctx context.Context, t testing.TB, app abciclient.Client, cacheSize int, options ...TxMempoolOption) *TxMempool { t.Helper() - var cancel context.CancelFunc - ctx, cancel = context.WithCancel(ctx) - logger := log.TestingLogger() - conn := abciclient.NewLocalClient(logger, &application{ - kvstore.NewApplication(), - }) - cfg, err := config.ResetTestRoot(t.TempDir(), strings.ReplaceAll(t.Name(), "/", "|")) require.NoError(t, err) cfg.Mempool.CacheSize = cacheSize - require.NoError(t, conn.Start(ctx)) - t.Cleanup(func() { - os.RemoveAll(cfg.RootDir) - cancel() - conn.Wait() - }) + t.Cleanup(func() { os.RemoveAll(cfg.RootDir) }) - return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, conn, options...) + return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, app, options...) } func checkTxs(ctx context.Context, t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx { @@ -137,7 +125,13 @@ func TestTxMempool_TxsAvailable(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - txmp := setup(ctx, t, 0) + client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()}) + if err := client.Start(ctx); err != nil { + t.Fatal(err) + } + t.Cleanup(client.Wait) + + txmp := setup(ctx, t, client, 0) txmp.EnableTxsAvailable() ensureNoTxFire := func() { @@ -194,7 +188,13 @@ func TestTxMempool_Size(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - txmp := setup(ctx, t, 0) + client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()}) + if err := client.Start(ctx); err != nil { + t.Fatal(err) + } + t.Cleanup(client.Wait) + + txmp := setup(ctx, t, client, 0) txs := checkTxs(ctx, t, txmp, 100, 0) require.Equal(t, len(txs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) @@ -221,7 +221,13 @@ func TestTxMempool_Flush(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - txmp := setup(ctx, t, 0) + client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()}) + if err := client.Start(ctx); err != nil { + t.Fatal(err) + } + t.Cleanup(client.Wait) + + txmp := setup(ctx, t, client, 0) txs := checkTxs(ctx, t, txmp, 100, 0) require.Equal(t, len(txs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) @@ -249,7 +255,13 @@ func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - txmp := setup(ctx, t, 0) + client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()}) + if err := client.Start(ctx); err != nil { + t.Fatal(err) + } + t.Cleanup(client.Wait) + + txmp := setup(ctx, t, client, 0) tTxs := checkTxs(ctx, t, txmp, 100, 0) // all txs request 1 gas unit require.Equal(t, len(tTxs), txmp.Size()) 
require.Equal(t, int64(5690), txmp.SizeBytes()) @@ -302,7 +314,13 @@ func TestTxMempool_ReapMaxTxs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - txmp := setup(ctx, t, 0) + client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()}) + if err := client.Start(ctx); err != nil { + t.Fatal(err) + } + t.Cleanup(client.Wait) + + txmp := setup(ctx, t, client, 0) tTxs := checkTxs(ctx, t, txmp, 100, 0) require.Equal(t, len(tTxs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) @@ -354,7 +372,12 @@ func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - txmp := setup(ctx, t, 0) + client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()}) + if err := client.Start(ctx); err != nil { + t.Fatal(err) + } + t.Cleanup(client.Wait) + txmp := setup(ctx, t, client, 0) rng := rand.New(rand.NewSource(time.Now().UnixNano())) tx := make([]byte, txmp.config.MaxTxBytes+1) @@ -374,7 +397,13 @@ func TestTxMempool_CheckTxSamePeer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - txmp := setup(ctx, t, 100) + client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()}) + if err := client.Start(ctx); err != nil { + t.Fatal(err) + } + t.Cleanup(client.Wait) + + txmp := setup(ctx, t, client, 100) peerID := uint16(1) rng := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -392,7 +421,13 @@ func TestTxMempool_CheckTxSameSender(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - txmp := setup(ctx, t, 100) + client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()}) + if err := client.Start(ctx); err != nil { + t.Fatal(err) + } + t.Cleanup(client.Wait) + + txmp := setup(ctx, t, client, 100) peerID := uint16(1) rng := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -417,7 +452,13 @@ func TestTxMempool_ConcurrentTxs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - txmp := setup(ctx, t, 100) + client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()}) + if err := client.Start(ctx); err != nil { + t.Fatal(err) + } + t.Cleanup(client.Wait) + + txmp := setup(ctx, t, client, 100) rng := rand.New(rand.NewSource(time.Now().UnixNano())) checkTxDone := make(chan struct{}) @@ -484,7 +525,13 @@ func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - txmp := setup(ctx, t, 500) + client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()}) + if err := client.Start(ctx); err != nil { + t.Fatal(err) + } + t.Cleanup(client.Wait) + + txmp := setup(ctx, t, client, 500) txmp.height = 100 txmp.config.TTLNumBlocks = 10 @@ -556,10 +603,16 @@ func TestTxMempool_CheckTxPostCheckError(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() + client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()}) + if err := client.Start(ctx); err != nil { + t.Fatal(err) + } + t.Cleanup(client.Wait) + postCheckFn := func(_ types.Tx, _ *abci.ResponseCheckTx) error { return testCase.err } - txmp := setup(ctx, t, 0, WithPostCheck(postCheckFn)) + txmp := setup(ctx, t, client, 0, 
WithPostCheck(postCheckFn)) rng := rand.New(rand.NewSource(time.Now().UnixNano())) tx := make([]byte, txmp.config.MaxTxBytes-1) _, err := rng.Read(tx) diff --git a/internal/mempool/reactor_test.go b/internal/mempool/reactor_test.go index 04e51ca8d..dab63af73 100644 --- a/internal/mempool/reactor_test.go +++ b/internal/mempool/reactor_test.go @@ -13,6 +13,7 @@ import ( "github.com/fortytw2/leaktest" "github.com/stretchr/testify/require" + abciclient "github.com/tendermint/tendermint/abci/client" "github.com/tendermint/tendermint/abci/example/kvstore" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" @@ -39,7 +40,7 @@ type reactorTestSuite struct { nodes []types.NodeID } -func setupReactors(ctx context.Context, t *testing.T, numNodes int, chBuf uint) *reactorTestSuite { +func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNodes int, chBuf uint) *reactorTestSuite { t.Helper() cfg, err := config.ResetTestRoot(t.TempDir(), strings.ReplaceAll(t.Name(), "/", "|")) @@ -63,7 +64,11 @@ func setupReactors(ctx context.Context, t *testing.T, numNodes int, chBuf uint) for nodeID := range rts.network.Nodes { rts.kvstores[nodeID] = kvstore.NewApplication() - mempool := setup(ctx, t, 0) + client := abciclient.NewLocalClient(logger, rts.kvstores[nodeID]) + require.NoError(t, client.Start(ctx)) + t.Cleanup(client.Wait) + + mempool := setup(ctx, t, client, 0) rts.mempools[nodeID] = mempool rts.peerChans[nodeID] = make(chan p2p.PeerUpdate, chBuf) @@ -151,7 +156,9 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) { defer cancel() const numNodes = 2 - rts := setupReactors(ctx, t, numNodes, 0) + + logger := log.NewNopLogger() + rts := setupReactors(ctx, t, logger, numNodes, 0) observePanic := func(r interface{}) { t.Fatal("panic detected in reactor") @@ -194,7 +201,9 @@ func TestReactorBroadcastTxs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rts := setupReactors(ctx, t, numNodes, uint(numTxs)) + logger := log.NewNopLogger() + + rts := setupReactors(ctx, t, logger, numNodes, uint(numTxs)) primary := rts.nodes[0] secondaries := rts.nodes[1:] @@ -218,7 +227,8 @@ func TestReactorConcurrency(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rts := setupReactors(ctx, t, numNodes, 0) + logger := log.NewNopLogger() + rts := setupReactors(ctx, t, logger, numNodes, 0) primary := rts.nodes[0] secondary := rts.nodes[1] @@ -276,7 +286,8 @@ func TestReactorNoBroadcastToSender(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rts := setupReactors(ctx, t, numNodes, uint(numTxs)) + logger := log.NewNopLogger() + rts := setupReactors(ctx, t, logger, numNodes, uint(numTxs)) primary := rts.nodes[0] secondary := rts.nodes[1] @@ -300,7 +311,9 @@ func TestReactor_MaxTxBytes(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rts := setupReactors(ctx, t, numNodes, 0) + logger := log.NewNopLogger() + + rts := setupReactors(ctx, t, logger, numNodes, 0) primary := rts.nodes[0] secondary := rts.nodes[1] @@ -336,7 +349,8 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rts := setupReactors(ctx, t, 1, MaxActiveIDs+1) + logger := log.NewNopLogger() + rts := setupReactors(ctx, t, logger, 1, MaxActiveIDs+1) nodeID := rts.nodes[0] @@ -388,7 +402,9 @@ func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { ctx, cancel := 
context.WithCancel(context.Background()) defer cancel() - rts := setupReactors(ctx, t, 2, 2) + logger := log.NewNopLogger() + + rts := setupReactors(ctx, t, logger, 2, 2) primary := rts.nodes[0] secondary := rts.nodes[1]
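
Usage note for the mempool harness change in the second patch (not part of the patch itself): callers of setup now own the ABCI client lifecycle — they construct a local client, start it, register Wait as a cleanup, and pass the client in. Tests that do not need to reach the client directly could fold that boilerplate into a small wrapper. The sketch below assumes the setup signature introduced above; the setupWithApp helper name is hypothetical and does not exist in this patch.

    // setupWithApp is a hypothetical convenience wrapper around setup: it
    // builds a local ABCI client for the given application, starts it,
    // registers Wait as a test cleanup, and returns the mempool together
    // with the client so tests can still drive the application directly.
    func setupWithApp(
        ctx context.Context,
        t testing.TB,
        app abci.Application,
        cacheSize int,
        options ...TxMempoolOption,
    ) (*TxMempool, abciclient.Client) {
        t.Helper()

        client := abciclient.NewLocalClient(log.NewNopLogger(), app)
        require.NoError(t, client.Start(ctx))
        t.Cleanup(client.Wait)

        return setup(ctx, t, client, cacheSize, options...), client
    }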