From a3881f0fb14109ba258aa1aace1b4d29bcbdb31e Mon Sep 17 00:00:00 2001
From: Sam Kleinman
Date: Wed, 2 Mar 2022 13:35:39 -0500
Subject: [PATCH] consensus: improve wal test cleanup (#8059)

I believe this gets rid of our temp-file-related test errors.
---
 internal/consensus/common_test.go   | 11 ++++++--
 internal/consensus/wal_generator.go | 40 ++++++++++++++++-------------
 internal/consensus/wal_test.go      | 19 +++++++-------
 internal/libs/autofile/group.go     | 13 ++++++++++
 4 files changed, 53 insertions(+), 30 deletions(-)
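A note before the diff: two patterns recur below. Sends on subscriber channels are raced against ctx.Done(), so a test goroutine can no longer block forever on a receiver that has gone away, and helper teardown moves into t.Fatal/t.Cleanup, so it runs deterministically before the test's temporary directories are deleted. A minimal, self-contained sketch of the first pattern (illustrative names only, not code from this patch):

package main

import (
	"context"
	"fmt"
	"time"
)

// send delivers msg unless ctx is canceled first, so a slow or absent
// receiver cannot block the sender forever.
func send(ctx context.Context, ch chan<- string, msg string) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case ch <- msg:
		return nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	ch := make(chan string) // unbuffered and never read: the send must be abandoned
	if err := send(ctx, ch, "vote"); err != nil {
		fmt.Println("send abandoned:", err) // prints "context deadline exceeded"
	}
}

With an unbuffered channel and no receiver, a bare `ch <- msg` would block forever; the select abandons the send when the context expires.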
diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go
index 10955fc8c..bd381ff7e 100644
--- a/internal/consensus/common_test.go
+++ b/internal/consensus/common_test.go
@@ -370,7 +370,11 @@ func subscribeToVoter(ctx context.Context, t *testing.T, cs *State, addr []byte)
 		vote := msg.Data().(types.EventDataVote)
 		// we only fire for our own votes
 		if bytes.Equal(addr, vote.Vote.ValidatorAddress) {
-			ch <- msg
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case ch <- msg:
+			}
 		}
 		return nil
 	}, types.EventQueryVote); err != nil {
@@ -401,7 +405,10 @@ func subscribeToVoterBuffered(ctx context.Context, t *testing.T, cs *State, addr
 			vote := msg.Data().(types.EventDataVote)
 			// we only fire for our own votes
 			if bytes.Equal(addr, vote.Vote.ValidatorAddress) {
-				ch <- msg
+				select {
+				case <-ctx.Done():
+				case ch <- msg:
+				}
 			}
 		}
 	}()
diff --git a/internal/consensus/wal_generator.go b/internal/consensus/wal_generator.go
index 57f6d6704..b10feb828 100644
--- a/internal/consensus/wal_generator.go
+++ b/internal/consensus/wal_generator.go
@@ -30,8 +30,10 @@ import (
 // stripped down version of node (proxy app, event bus, consensus state) with a
 // persistent kvstore application and special consensus wal instance
 // (byteBufferWAL) and waits until numBlocks are created.
-// If the node fails to produce given numBlocks, it returns an error.
-func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr io.Writer, numBlocks int) (err error) {
+// If the node fails to produce the given numBlocks, it fails the test.
+func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr io.Writer, numBlocks int) {
+	t.Helper()
+
 	cfg := getConfig(t)
 
 	app := kvstore.NewPersistentKVStoreApplication(logger, filepath.Join(cfg.DBDir(), "wal_generator"))
@@ -46,35 +48,37 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr
 	privValidatorStateFile := cfg.PrivValidator.StateFile()
 	privValidator, err := privval.LoadOrGenFilePV(privValidatorKeyFile, privValidatorStateFile)
 	if err != nil {
-		return err
+		t.Fatal(err)
 	}
 	genDoc, err := types.GenesisDocFromFile(cfg.GenesisFile())
 	if err != nil {
-		return fmt.Errorf("failed to read genesis file: %w", err)
+		t.Fatal(fmt.Errorf("failed to read genesis file: %w", err))
 	}
 	blockStoreDB := dbm.NewMemDB()
 	stateDB := blockStoreDB
 	stateStore := sm.NewStore(stateDB)
 	state, err := sm.MakeGenesisState(genDoc)
 	if err != nil {
-		return fmt.Errorf("failed to make genesis state: %w", err)
+		t.Fatal(fmt.Errorf("failed to make genesis state: %w", err))
 	}
 	state.Version.Consensus.App = kvstore.ProtocolVersion
 	if err = stateStore.Save(state); err != nil {
-		t.Error(err)
+		t.Fatal(err)
 	}
 
 	blockStore := store.NewBlockStore(blockStoreDB)
 
 	proxyApp := proxy.NewAppConns(abciclient.NewLocalCreator(app), logger.With("module", "proxy"), proxy.NopMetrics())
 	if err := proxyApp.Start(ctx); err != nil {
-		return fmt.Errorf("failed to start proxy app connections: %w", err)
+		t.Fatal(fmt.Errorf("failed to start proxy app connections: %w", err))
 	}
+	t.Cleanup(proxyApp.Wait)
 
 	eventBus := eventbus.NewDefault(logger.With("module", "events"))
 	if err := eventBus.Start(ctx); err != nil {
-		return fmt.Errorf("failed to start event bus: %w", err)
+		t.Fatal(fmt.Errorf("failed to start event bus: %w", err))
 	}
+	t.Cleanup(func() { eventBus.Stop(); eventBus.Wait() })
 
 	mempool := emptyMempool{}
 	evpool := sm.EmptyEvidencePool{}
@@ -91,22 +95,24 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr
 	wal := newByteBufferWAL(logger, NewWALEncoder(wr), int64(numBlocks), numBlocksWritten)
 	// see wal.go#103
 	if err := wal.Write(EndHeightMessage{0}); err != nil {
-		t.Error(err)
+		t.Fatal(err)
 	}
 
 	consensusState.wal = wal
 
 	if err := consensusState.Start(ctx); err != nil {
-		return fmt.Errorf("failed to start consensus state: %w", err)
+		t.Fatal(fmt.Errorf("failed to start consensus state: %w", err))
 	}
+	t.Cleanup(consensusState.Wait)
+
+	defer consensusState.Stop()
+	timer := time.NewTimer(time.Minute)
+	defer timer.Stop()
 
 	select {
 	case <-numBlocksWritten:
-		consensusState.Stop()
-		return nil
-	case <-time.After(1 * time.Minute):
-		consensusState.Stop()
-		return fmt.Errorf("waited too long for tendermint to produce %d blocks (grep logs for `wal_generator`)", numBlocks)
+	case <-timer.C:
+		t.Fatal(fmt.Errorf("waited too long for tendermint to produce %d blocks (grep logs for `wal_generator`)", numBlocks))
 	}
 }
 
@@ -115,9 +121,7 @@
 	var b bytes.Buffer
 	wr := bufio.NewWriter(&b)
 
-	if err := WALGenerateNBlocks(ctx, t, logger, wr, numBlocks); err != nil {
-		return []byte{}, err
-	}
+	WALGenerateNBlocks(ctx, t, logger, wr, numBlocks)
 
 	wr.Flush()
 	return b.Bytes(), nil
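The wal_generator.go changes above convert an error-returning function into a proper test helper: t.Helper() attributes failures to the caller, t.Fatal replaces the error returns, and shutdown is registered with t.Cleanup so it runs even if the helper fails partway through. Since t.Cleanup functions run in last-in-first-out order, the teardown sequence becomes deterministic. A small sketch of that ordering (hypothetical service type, not from this repository):

package example

import "testing"

type service struct{ name string }

func (s *service) stop(t *testing.T) { t.Logf("%s: stop", s.name) }
func (s *service) wait(t *testing.T) { t.Logf("%s: wait", s.name) }

// startService stands in for a helper like WALGenerateNBlocks: it fails
// the test directly instead of returning an error, and it registers its
// own teardown with t.Cleanup.
func startService(t *testing.T, name string) *service {
	t.Helper()
	s := &service{name: name}
	// Cleanups run LIFO: register wait before stop so that, at teardown,
	// stop executes first and wait only after it.
	t.Cleanup(func() { s.wait(t) })
	t.Cleanup(func() { s.stop(t) })
	return s
}

func TestCleanupOrder(t *testing.T) {
	startService(t, "proxyApp")
	startService(t, "eventBus")
	// After the test body, cleanups run as:
	// eventBus: stop, eventBus: wait, proxyApp: stop, proxyApp: wait.
}

Components started last are torn down first, which mirrors the proxyApp/eventBus/consensusState registrations in the hunks above.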
diff --git a/internal/consensus/wal_test.go b/internal/consensus/wal_test.go
index a2c76676c..169b7c327 100644
--- a/internal/consensus/wal_test.go
+++ b/internal/consensus/wal_test.go
@@ -3,6 +3,7 @@ package consensus
 import (
 	"bytes"
 	"context"
+	"os"
 	"path/filepath"
 	"testing"
 
@@ -41,13 +42,12 @@ func TestWALTruncate(t *testing.T) {
 	require.NoError(t, err)
 	err = wal.Start(ctx)
 	require.NoError(t, err)
-	t.Cleanup(wal.Wait)
+	t.Cleanup(func() { wal.Stop(); wal.Group().Stop(); wal.Group().Wait(); wal.Wait() })
 
 	// 60 block's size nearly 70K, greater than group's headBuf size(4096 * 10),
 	// when headBuf is full, truncate content will Flush to the file. at this
 	// time, RotateFile is called, truncate content exist in each file.
-	err = WALGenerateNBlocks(ctx, t, logger, wal.Group(), 60)
-	require.NoError(t, err)
+	WALGenerateNBlocks(ctx, t, logger, wal.Group(), 60)
 
 	// put the leakcheck here so it runs after other cleanup
 	// functions.
@@ -112,7 +112,7 @@ func TestWALWrite(t *testing.T) {
 	require.NoError(t, err)
 	err = wal.Start(ctx)
 	require.NoError(t, err)
-	t.Cleanup(wal.Wait)
+	t.Cleanup(func() { wal.Stop(); wal.Group().Stop(); wal.Group().Wait(); wal.Wait() })
 
 	// 1) Write returns an error if msg is too big
 	msg := &BlockPartMessage{
@@ -151,7 +151,6 @@ func TestWALSearchForEndHeight(t *testing.T) {
 
 	wal, err := NewWAL(ctx, logger, walFile)
 	require.NoError(t, err)
-	t.Cleanup(func() { wal.Stop(); wal.Wait() })
 
 	h := int64(3)
 	gr, found, err := wal.SearchForEndHeight(h, &WALSearchOptions{})
@@ -176,24 +175,24 @@ func TestWALPeriodicSync(t *testing.T) {
 	walDir := t.TempDir()
 	walFile := filepath.Join(walDir, "wal")
-	wal, err := NewWAL(ctx, log.TestingLogger(), walFile, autofile.GroupCheckDuration(1*time.Millisecond))
+	defer os.RemoveAll(walFile)
+	wal, err := NewWAL(ctx, log.TestingLogger(), walFile, autofile.GroupCheckDuration(250*time.Millisecond))
 	require.NoError(t, err)
 
 	wal.SetFlushInterval(walTestFlushInterval)
 
 	logger := log.NewNopLogger()
 
 	// Generate some data
-	err = WALGenerateNBlocks(ctx, t, logger, wal.Group(), 5)
-	require.NoError(t, err)
+	WALGenerateNBlocks(ctx, t, logger, wal.Group(), 5)
 
 	// We should have data in the buffer now
 	assert.NotZero(t, wal.Group().Buffered())
 
 	require.NoError(t, wal.Start(ctx))
-	t.Cleanup(func() { wal.Stop(); wal.Wait() })
+	t.Cleanup(func() { wal.Stop(); wal.Group().Stop(); wal.Group().Wait(); wal.Wait() })
 
-	time.Sleep(walTestFlushInterval + (10 * time.Millisecond))
+	time.Sleep(walTestFlushInterval + (20 * time.Millisecond))
 
 	// The data should have been flushed by the periodic sync
 	assert.Zero(t, wal.Group().Buffered())
 
diff --git a/internal/libs/autofile/group.go b/internal/libs/autofile/group.go
index 1b4418d59..bb2c41808 100644
--- a/internal/libs/autofile/group.go
+++ b/internal/libs/autofile/group.go
@@ -274,6 +274,10 @@ func (g *Group) checkTotalSizeLimit(ctx context.Context) {
 	g.mtx.Lock()
 	defer g.mtx.Unlock()
 
+	if err := ctx.Err(); err != nil {
+		return
+	}
+
 	if g.totalSizeLimit == 0 {
 		return
 	}
@@ -290,6 +294,11 @@ func (g *Group) checkTotalSizeLimit(ctx context.Context) {
 			g.logger.Error("Group's head may grow without bound", "head", g.Head.Path)
 			return
 		}
+
+		if ctx.Err() != nil {
+			return
+		}
+
 		pathToRemove := filePathForIndex(g.Head.Path, index, gInfo.MaxIndex)
 		fInfo, err := os.Stat(pathToRemove)
 		if err != nil {
@@ -314,6 +323,10 @@ func (g *Group) rotateFile(ctx context.Context) {
 	g.mtx.Lock()
 	defer g.mtx.Unlock()
 
+	if err := ctx.Err(); err != nil {
+		return
+	}
+
 	headPath := g.Head.Path
 
 	if err := g.headBuf.Flush(); err != nil {
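Finally, the autofile hunks re-check the context after acquiring the group mutex: cancellation can happen while a background goroutine is blocked on the lock, and the early return keeps a shutting-down group from rotating or deleting files under a directory the test harness is about to remove. A compact sketch of the guard (hypothetical type, not the tendermint Group API):

package example

import (
	"context"
	"os"
	"sync"
)

type group struct {
	mtx  sync.Mutex
	head string
}

// rotate re-checks the context after taking the lock: the caller may
// have canceled while we were blocked, in which case we must not touch
// the filesystem again.
func (g *group) rotate(ctx context.Context) error {
	g.mtx.Lock()
	defer g.mtx.Unlock()

	if err := ctx.Err(); err != nil {
		return err
	}

	f, err := os.Create(g.head + ".000")
	if err != nil {
		return err
	}
	return f.Close()
}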