From 82b65868ceb34ba625612882e0bcb9e17c8e0a10 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Fri, 14 Jan 2022 13:04:01 -0500 Subject: [PATCH] node+autofile: avoid leaks detected during WAL shutdown (#7599) --- internal/consensus/replay_test.go | 2 +- internal/consensus/state.go | 2 +- internal/consensus/wal.go | 4 +-- internal/consensus/wal_test.go | 14 ++++----- internal/libs/autofile/autofile.go | 19 ++++++++---- internal/libs/autofile/autofile_test.go | 11 +++++-- internal/libs/autofile/cmd/logjack.go | 2 +- internal/libs/autofile/group.go | 4 +-- internal/libs/autofile/group_test.go | 39 +++++++++++++++++++------ node/node_test.go | 23 ++++++++------- 10 files changed, 79 insertions(+), 41 deletions(-) diff --git a/internal/consensus/replay_test.go b/internal/consensus/replay_test.go index 9b3031597..19d86dac8 100644 --- a/internal/consensus/replay_test.go +++ b/internal/consensus/replay_test.go @@ -767,7 +767,7 @@ func testHandshakeReplay( privVal, err := privval.LoadFilePV(cfg.PrivValidator.KeyFile(), cfg.PrivValidator.StateFile()) require.NoError(t, err) - wal, err := NewWAL(logger, walFile) + wal, err := NewWAL(ctx, logger, walFile) require.NoError(t, err) err = wal.Start(ctx) require.NoError(t, err) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 49559f15d..9d12ce50a 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -488,7 +488,7 @@ func (cs *State) Wait() { // OpenWAL opens a file to log all consensus messages and timeouts for // deterministic accountability. func (cs *State) OpenWAL(ctx context.Context, walFile string) (WAL, error) { - wal, err := NewWAL(cs.logger.With("wal", walFile), walFile) + wal, err := NewWAL(ctx, cs.logger.With("wal", walFile), walFile) if err != nil { cs.logger.Error("failed to open WAL", "file", walFile, "err", err) return nil, err diff --git a/internal/consensus/wal.go b/internal/consensus/wal.go index 59086e712..36993e762 100644 --- a/internal/consensus/wal.go +++ b/internal/consensus/wal.go @@ -90,13 +90,13 @@ var _ WAL = &BaseWAL{} // NewWAL returns a new write-ahead logger based on `baseWAL`, which implements // WAL. It's flushed and synced to disk every 2s and once when stopped. -func NewWAL(logger log.Logger, walFile string, groupOptions ...func(*auto.Group)) (*BaseWAL, error) { +func NewWAL(ctx context.Context, logger log.Logger, walFile string, groupOptions ...func(*auto.Group)) (*BaseWAL, error) { err := tmos.EnsureDir(filepath.Dir(walFile), 0700) if err != nil { return nil, fmt.Errorf("failed to ensure WAL directory is in place: %w", err) } - group, err := auto.OpenGroup(logger, walFile, groupOptions...) + group, err := auto.OpenGroup(ctx, logger, walFile, groupOptions...) if err != nil { return nil, err } diff --git a/internal/consensus/wal_test.go b/internal/consensus/wal_test.go index b52c41b9f..f686fece6 100644 --- a/internal/consensus/wal_test.go +++ b/internal/consensus/wal_test.go @@ -33,7 +33,7 @@ func TestWALTruncate(t *testing.T) { // defaultHeadSizeLimit(10M) is hard to simulate. // this magic number 1 * time.Millisecond make RotateFile check frequently. // defaultGroupCheckDuration(5s) is hard to simulate. - wal, err := NewWAL(logger, walFile, + wal, err := NewWAL(ctx, logger, walFile, autofile.GroupHeadSizeLimit(4096), autofile.GroupCheckDuration(1*time.Millisecond), ) @@ -103,7 +103,7 @@ func TestWALWrite(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - wal, err := NewWAL(log.TestingLogger(), walFile) + wal, err := NewWAL(ctx, log.TestingLogger(), walFile) require.NoError(t, err) err = wal.Start(ctx) require.NoError(t, err) @@ -144,7 +144,7 @@ func TestWALSearchForEndHeight(t *testing.T) { } walFile := tempWALWithData(t, walBody) - wal, err := NewWAL(logger, walFile) + wal, err := NewWAL(ctx, logger, walFile) require.NoError(t, err) h := int64(3) @@ -163,13 +163,13 @@ func TestWALSearchForEndHeight(t *testing.T) { } func TestWALPeriodicSync(t *testing.T) { - walDir := t.TempDir() - walFile := filepath.Join(walDir, "wal") - wal, err := NewWAL(log.TestingLogger(), walFile, autofile.GroupCheckDuration(1*time.Millisecond)) - ctx, cancel := context.WithCancel(context.Background()) defer cancel() + walDir := t.TempDir() + walFile := filepath.Join(walDir, "wal") + wal, err := NewWAL(ctx, log.TestingLogger(), walFile, autofile.GroupCheckDuration(1*time.Millisecond)) + require.NoError(t, err) wal.SetFlushInterval(walTestFlushInterval) diff --git a/internal/libs/autofile/autofile.go b/internal/libs/autofile/autofile.go index 10cc04a28..0bc9a63a3 100644 --- a/internal/libs/autofile/autofile.go +++ b/internal/libs/autofile/autofile.go @@ -1,6 +1,7 @@ package autofile import ( + "context" "os" "os/signal" "path/filepath" @@ -57,7 +58,7 @@ type AutoFile struct { // OpenAutoFile creates an AutoFile in the path (with random ID). If there is // an error, it will be of type *PathError or *ErrPermissionsChanged (if file's // permissions got changed (should be 0600)). -func OpenAutoFile(path string) (*AutoFile, error) { +func OpenAutoFile(ctx context.Context, path string) (*AutoFile, error) { var err error path, err = filepath.Abs(path) if err != nil { @@ -78,12 +79,17 @@ func OpenAutoFile(path string) (*AutoFile, error) { af.hupc = make(chan os.Signal, 1) signal.Notify(af.hupc, syscall.SIGHUP) go func() { - for range af.hupc { - _ = af.closeFile() + for { + select { + case <-af.hupc: + _ = af.closeFile() + case <-ctx.Done(): + return + } } }() - go af.closeFileRoutine() + go af.closeFileRoutine(ctx) return af, nil } @@ -99,9 +105,12 @@ func (af *AutoFile) Close() error { return af.closeFile() } -func (af *AutoFile) closeFileRoutine() { +func (af *AutoFile) closeFileRoutine(ctx context.Context) { for { select { + case <-ctx.Done(): + _ = af.closeFile() + return case <-af.closeTicker.C: _ = af.closeFile() case <-af.closeTickerStopc: diff --git a/internal/libs/autofile/autofile_test.go b/internal/libs/autofile/autofile_test.go index 479a239cb..9864ed82a 100644 --- a/internal/libs/autofile/autofile_test.go +++ b/internal/libs/autofile/autofile_test.go @@ -1,6 +1,7 @@ package autofile import ( + "context" "os" "path/filepath" "syscall" @@ -12,6 +13,9 @@ import ( ) func TestSIGHUP(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + origDir, err := os.Getwd() require.NoError(t, err) t.Cleanup(func() { @@ -30,7 +34,7 @@ func TestSIGHUP(t *testing.T) { // Create an AutoFile in the temporary directory name := "sighup_test" - af, err := OpenAutoFile(name) + af, err := OpenAutoFile(ctx, name) require.NoError(t, err) require.True(t, filepath.IsAbs(af.Path)) @@ -104,13 +108,16 @@ func TestSIGHUP(t *testing.T) { // } func TestAutoFileSize(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // First, create an AutoFile writing to a tempfile dir f, err := os.CreateTemp("", "sighup_test") require.NoError(t, err) require.NoError(t, f.Close()) // Here is the actual AutoFile. - af, err := OpenAutoFile(f.Name()) + af, err := OpenAutoFile(ctx, f.Name()) require.NoError(t, err) // 1. Empty file diff --git a/internal/libs/autofile/cmd/logjack.go b/internal/libs/autofile/cmd/logjack.go index c246871dc..a9f6cf766 100644 --- a/internal/libs/autofile/cmd/logjack.go +++ b/internal/libs/autofile/cmd/logjack.go @@ -48,7 +48,7 @@ func main() { } // Open Group - group, err := auto.OpenGroup(log.NewNopLogger(), headPath, auto.GroupHeadSizeLimit(chopSize), auto.GroupTotalSizeLimit(limitSize)) + group, err := auto.OpenGroup(ctx, log.NewNopLogger(), headPath, auto.GroupHeadSizeLimit(chopSize), auto.GroupTotalSizeLimit(limitSize)) if err != nil { fmt.Printf("logjack couldn't create output file %v\n", headPath) os.Exit(1) diff --git a/internal/libs/autofile/group.go b/internal/libs/autofile/group.go index b8bbb78bd..0ffc2f04c 100644 --- a/internal/libs/autofile/group.go +++ b/internal/libs/autofile/group.go @@ -80,12 +80,12 @@ type Group struct { // OpenGroup creates a new Group with head at headPath. It returns an error if // it fails to open head file. -func OpenGroup(logger log.Logger, headPath string, groupOptions ...func(*Group)) (*Group, error) { +func OpenGroup(ctx context.Context, logger log.Logger, headPath string, groupOptions ...func(*Group)) (*Group, error) { dir, err := filepath.Abs(filepath.Dir(headPath)) if err != nil { return nil, err } - head, err := OpenAutoFile(headPath) + head, err := OpenAutoFile(ctx, headPath) if err != nil { return nil, err } diff --git a/internal/libs/autofile/group_test.go b/internal/libs/autofile/group_test.go index c4e068af9..328201780 100644 --- a/internal/libs/autofile/group_test.go +++ b/internal/libs/autofile/group_test.go @@ -1,6 +1,7 @@ package autofile import ( + "context" "io" "os" "path/filepath" @@ -14,14 +15,14 @@ import ( tmrand "github.com/tendermint/tendermint/libs/rand" ) -func createTestGroupWithHeadSizeLimit(t *testing.T, logger log.Logger, headSizeLimit int64) *Group { +func createTestGroupWithHeadSizeLimit(ctx context.Context, t *testing.T, logger log.Logger, headSizeLimit int64) *Group { testID := tmrand.Str(12) testDir := "_test_" + testID err := tmos.EnsureDir(testDir, 0700) require.NoError(t, err, "Error creating dir") headPath := testDir + "/myfile" - g, err := OpenGroup(logger, headPath, GroupHeadSizeLimit(headSizeLimit)) + g, err := OpenGroup(ctx, logger, headPath, GroupHeadSizeLimit(headSizeLimit)) require.NoError(t, err, "Error opening Group") require.NotEqual(t, nil, g, "Failed to create Group") @@ -43,9 +44,12 @@ func assertGroupInfo(t *testing.T, gInfo GroupInfo, minIndex, maxIndex int, tota } func TestCheckHeadSizeLimit(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + logger := log.TestingLogger() - g := createTestGroupWithHeadSizeLimit(t, logger, 1000*1000) + g := createTestGroupWithHeadSizeLimit(ctx, t, logger, 1000*1000) // At first, there are no files. assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 0, 0) @@ -114,7 +118,9 @@ func TestCheckHeadSizeLimit(t *testing.T) { func TestRotateFile(t *testing.T) { logger := log.TestingLogger() - g := createTestGroupWithHeadSizeLimit(t, logger, 0) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + g := createTestGroupWithHeadSizeLimit(ctx, t, logger, 0) // Create a different temporary directory and move into it, to make sure // relative paths are resolved at Group creation @@ -180,7 +186,10 @@ func TestRotateFile(t *testing.T) { func TestWrite(t *testing.T) { logger := log.TestingLogger() - g := createTestGroupWithHeadSizeLimit(t, logger, 0) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + g := createTestGroupWithHeadSizeLimit(ctx, t, logger, 0) written := []byte("Medusa") _, err := g.Write(written) @@ -205,7 +214,10 @@ func TestWrite(t *testing.T) { func TestGroupReaderRead(t *testing.T) { logger := log.TestingLogger() - g := createTestGroupWithHeadSizeLimit(t, logger, 0) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + g := createTestGroupWithHeadSizeLimit(ctx, t, logger, 0) professor := []byte("Professor Monster") _, err := g.Write(professor) @@ -240,7 +252,10 @@ func TestGroupReaderRead(t *testing.T) { func TestGroupReaderRead2(t *testing.T) { logger := log.TestingLogger() - g := createTestGroupWithHeadSizeLimit(t, logger, 0) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + g := createTestGroupWithHeadSizeLimit(ctx, t, logger, 0) professor := []byte("Professor Monster") _, err := g.Write(professor) @@ -276,7 +291,10 @@ func TestGroupReaderRead2(t *testing.T) { func TestMinIndex(t *testing.T) { logger := log.TestingLogger() - g := createTestGroupWithHeadSizeLimit(t, logger, 0) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + g := createTestGroupWithHeadSizeLimit(ctx, t, logger, 0) assert.Zero(t, g.MinIndex(), "MinIndex should be zero at the beginning") @@ -286,7 +304,10 @@ func TestMinIndex(t *testing.T) { func TestMaxIndex(t *testing.T) { logger := log.TestingLogger() - g := createTestGroupWithHeadSizeLimit(t, logger, 0) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + g := createTestGroupWithHeadSizeLimit(ctx, t, logger, 0) assert.Zero(t, g.MaxIndex(), "MaxIndex should be zero at the beginning") diff --git a/node/node_test.go b/node/node_test.go index bfc3aa907..c563906b3 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -47,7 +47,7 @@ func TestNodeStartStop(t *testing.T) { ctx, bcancel := context.WithCancel(context.Background()) defer bcancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() // create & start node ns, err := newDefaultNode(ctx, cfg, logger) require.NoError(t, err) @@ -98,6 +98,7 @@ func getTestNode(ctx context.Context, t *testing.T, conf *config.Config, logger ns.Wait() } }) + t.Cleanup(leaktest.CheckTimeout(t, time.Second)) return n } @@ -112,7 +113,7 @@ func TestNodeDelayedStart(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() // create & start node n := getTestNode(ctx, t, cfg, logger) @@ -132,7 +133,7 @@ func TestNodeSetAppVersion(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() // create node n := getTestNode(ctx, t, cfg, logger) @@ -156,7 +157,7 @@ func TestNodeSetPrivValTCP(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() cfg, err := config.ResetTestRoot("node_priv_val_tcp_test") require.NoError(t, err) @@ -200,7 +201,7 @@ func TestPrivValidatorListenAddrNoProtocol(t *testing.T) { defer os.RemoveAll(cfg.RootDir) cfg.PrivValidator.ListenAddr = addrNoPrefix - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() n, err := newDefaultNode(ctx, cfg, logger) @@ -224,7 +225,7 @@ func TestNodeSetPrivValIPC(t *testing.T) { defer os.RemoveAll(cfg.RootDir) cfg.PrivValidator.ListenAddr = "unix://" + tmpfile - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() dialer := privval.DialUnixFn(tmpfile) dialerEndpoint := privval.NewSignerDialerEndpoint(logger, dialer) @@ -270,7 +271,7 @@ func TestCreateProposalBlock(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(cfg.RootDir) - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() cc := abciclient.NewLocalCreator(kvstore.NewApplication()) proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics()) @@ -367,7 +368,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) { defer os.RemoveAll(cfg.RootDir) - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() cc := abciclient.NewLocalCreator(kvstore.NewApplication()) proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics()) @@ -433,7 +434,7 @@ func TestMaxProposalBlockSize(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(cfg.RootDir) - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() cc := abciclient.NewLocalCreator(kvstore.NewApplication()) proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics()) @@ -554,7 +555,7 @@ func TestNodeNewSeedNode(t *testing.T) { nodeKey, err := types.LoadOrGenNodeKey(cfg.NodeKeyFile()) require.NoError(t, err) - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() ns, err := makeSeedNode(ctx, cfg, @@ -588,7 +589,7 @@ func TestNodeSetEventSink(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() setupTest := func(t *testing.T, conf *config.Config) []indexer.EventSink { eventBus := eventbus.NewDefault(logger.With("module", "events"))