Browse Source

node+autofile: avoid leaks detected during WAL shutdown (#7599)

pull/7601/head
Sam Kleinman 2 years ago
committed by GitHub
parent
commit
82b65868ce
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 79 additions and 41 deletions
  1. +1
    -1
      internal/consensus/replay_test.go
  2. +1
    -1
      internal/consensus/state.go
  3. +2
    -2
      internal/consensus/wal.go
  4. +7
    -7
      internal/consensus/wal_test.go
  5. +14
    -5
      internal/libs/autofile/autofile.go
  6. +9
    -2
      internal/libs/autofile/autofile_test.go
  7. +1
    -1
      internal/libs/autofile/cmd/logjack.go
  8. +2
    -2
      internal/libs/autofile/group.go
  9. +30
    -9
      internal/libs/autofile/group_test.go
  10. +12
    -11
      node/node_test.go

+ 1
- 1
internal/consensus/replay_test.go View File

@ -767,7 +767,7 @@ func testHandshakeReplay(
privVal, err := privval.LoadFilePV(cfg.PrivValidator.KeyFile(), cfg.PrivValidator.StateFile())
require.NoError(t, err)
wal, err := NewWAL(logger, walFile)
wal, err := NewWAL(ctx, logger, walFile)
require.NoError(t, err)
err = wal.Start(ctx)
require.NoError(t, err)


+ 1
- 1
internal/consensus/state.go View File

@ -488,7 +488,7 @@ func (cs *State) Wait() {
// OpenWAL opens a file to log all consensus messages and timeouts for
// deterministic accountability.
func (cs *State) OpenWAL(ctx context.Context, walFile string) (WAL, error) {
wal, err := NewWAL(cs.logger.With("wal", walFile), walFile)
wal, err := NewWAL(ctx, cs.logger.With("wal", walFile), walFile)
if err != nil {
cs.logger.Error("failed to open WAL", "file", walFile, "err", err)
return nil, err


+ 2
- 2
internal/consensus/wal.go View File

@ -90,13 +90,13 @@ var _ WAL = &BaseWAL{}
// NewWAL returns a new write-ahead logger based on `baseWAL`, which implements
// WAL. It's flushed and synced to disk every 2s and once when stopped.
func NewWAL(logger log.Logger, walFile string, groupOptions ...func(*auto.Group)) (*BaseWAL, error) {
func NewWAL(ctx context.Context, logger log.Logger, walFile string, groupOptions ...func(*auto.Group)) (*BaseWAL, error) {
err := tmos.EnsureDir(filepath.Dir(walFile), 0700)
if err != nil {
return nil, fmt.Errorf("failed to ensure WAL directory is in place: %w", err)
}
group, err := auto.OpenGroup(logger, walFile, groupOptions...)
group, err := auto.OpenGroup(ctx, logger, walFile, groupOptions...)
if err != nil {
return nil, err
}


+ 7
- 7
internal/consensus/wal_test.go View File

@ -33,7 +33,7 @@ func TestWALTruncate(t *testing.T) {
// defaultHeadSizeLimit(10M) is hard to simulate.
// this magic number 1 * time.Millisecond make RotateFile check frequently.
// defaultGroupCheckDuration(5s) is hard to simulate.
wal, err := NewWAL(logger, walFile,
wal, err := NewWAL(ctx, logger, walFile,
autofile.GroupHeadSizeLimit(4096),
autofile.GroupCheckDuration(1*time.Millisecond),
)
@ -103,7 +103,7 @@ func TestWALWrite(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wal, err := NewWAL(log.TestingLogger(), walFile)
wal, err := NewWAL(ctx, log.TestingLogger(), walFile)
require.NoError(t, err)
err = wal.Start(ctx)
require.NoError(t, err)
@ -144,7 +144,7 @@ func TestWALSearchForEndHeight(t *testing.T) {
}
walFile := tempWALWithData(t, walBody)
wal, err := NewWAL(logger, walFile)
wal, err := NewWAL(ctx, logger, walFile)
require.NoError(t, err)
h := int64(3)
@ -163,13 +163,13 @@ func TestWALSearchForEndHeight(t *testing.T) {
}
func TestWALPeriodicSync(t *testing.T) {
walDir := t.TempDir()
walFile := filepath.Join(walDir, "wal")
wal, err := NewWAL(log.TestingLogger(), walFile, autofile.GroupCheckDuration(1*time.Millisecond))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
walDir := t.TempDir()
walFile := filepath.Join(walDir, "wal")
wal, err := NewWAL(ctx, log.TestingLogger(), walFile, autofile.GroupCheckDuration(1*time.Millisecond))
require.NoError(t, err)
wal.SetFlushInterval(walTestFlushInterval)


+ 14
- 5
internal/libs/autofile/autofile.go View File

@ -1,6 +1,7 @@
package autofile
import (
"context"
"os"
"os/signal"
"path/filepath"
@ -57,7 +58,7 @@ type AutoFile struct {
// OpenAutoFile creates an AutoFile in the path (with random ID). If there is
// an error, it will be of type *PathError or *ErrPermissionsChanged (if file's
// permissions got changed (should be 0600)).
func OpenAutoFile(path string) (*AutoFile, error) {
func OpenAutoFile(ctx context.Context, path string) (*AutoFile, error) {
var err error
path, err = filepath.Abs(path)
if err != nil {
@ -78,12 +79,17 @@ func OpenAutoFile(path string) (*AutoFile, error) {
af.hupc = make(chan os.Signal, 1)
signal.Notify(af.hupc, syscall.SIGHUP)
go func() {
for range af.hupc {
_ = af.closeFile()
for {
select {
case <-af.hupc:
_ = af.closeFile()
case <-ctx.Done():
return
}
}
}()
go af.closeFileRoutine()
go af.closeFileRoutine(ctx)
return af, nil
}
@ -99,9 +105,12 @@ func (af *AutoFile) Close() error {
return af.closeFile()
}
func (af *AutoFile) closeFileRoutine() {
func (af *AutoFile) closeFileRoutine(ctx context.Context) {
for {
select {
case <-ctx.Done():
_ = af.closeFile()
return
case <-af.closeTicker.C:
_ = af.closeFile()
case <-af.closeTickerStopc:


+ 9
- 2
internal/libs/autofile/autofile_test.go View File

@ -1,6 +1,7 @@
package autofile
import (
"context"
"os"
"path/filepath"
"syscall"
@ -12,6 +13,9 @@ import (
)
func TestSIGHUP(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
origDir, err := os.Getwd()
require.NoError(t, err)
t.Cleanup(func() {
@ -30,7 +34,7 @@ func TestSIGHUP(t *testing.T) {
// Create an AutoFile in the temporary directory
name := "sighup_test"
af, err := OpenAutoFile(name)
af, err := OpenAutoFile(ctx, name)
require.NoError(t, err)
require.True(t, filepath.IsAbs(af.Path))
@ -104,13 +108,16 @@ func TestSIGHUP(t *testing.T) {
// }
func TestAutoFileSize(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// First, create an AutoFile writing to a tempfile dir
f, err := os.CreateTemp("", "sighup_test")
require.NoError(t, err)
require.NoError(t, f.Close())
// Here is the actual AutoFile.
af, err := OpenAutoFile(f.Name())
af, err := OpenAutoFile(ctx, f.Name())
require.NoError(t, err)
// 1. Empty file


+ 1
- 1
internal/libs/autofile/cmd/logjack.go View File

@ -48,7 +48,7 @@ func main() {
}
// Open Group
group, err := auto.OpenGroup(log.NewNopLogger(), headPath, auto.GroupHeadSizeLimit(chopSize), auto.GroupTotalSizeLimit(limitSize))
group, err := auto.OpenGroup(ctx, log.NewNopLogger(), headPath, auto.GroupHeadSizeLimit(chopSize), auto.GroupTotalSizeLimit(limitSize))
if err != nil {
fmt.Printf("logjack couldn't create output file %v\n", headPath)
os.Exit(1)


+ 2
- 2
internal/libs/autofile/group.go View File

@ -80,12 +80,12 @@ type Group struct {
// OpenGroup creates a new Group with head at headPath. It returns an error if
// it fails to open head file.
func OpenGroup(logger log.Logger, headPath string, groupOptions ...func(*Group)) (*Group, error) {
func OpenGroup(ctx context.Context, logger log.Logger, headPath string, groupOptions ...func(*Group)) (*Group, error) {
dir, err := filepath.Abs(filepath.Dir(headPath))
if err != nil {
return nil, err
}
head, err := OpenAutoFile(headPath)
head, err := OpenAutoFile(ctx, headPath)
if err != nil {
return nil, err
}


+ 30
- 9
internal/libs/autofile/group_test.go View File

@ -1,6 +1,7 @@
package autofile
import (
"context"
"io"
"os"
"path/filepath"
@ -14,14 +15,14 @@ import (
tmrand "github.com/tendermint/tendermint/libs/rand"
)
func createTestGroupWithHeadSizeLimit(t *testing.T, logger log.Logger, headSizeLimit int64) *Group {
func createTestGroupWithHeadSizeLimit(ctx context.Context, t *testing.T, logger log.Logger, headSizeLimit int64) *Group {
testID := tmrand.Str(12)
testDir := "_test_" + testID
err := tmos.EnsureDir(testDir, 0700)
require.NoError(t, err, "Error creating dir")
headPath := testDir + "/myfile"
g, err := OpenGroup(logger, headPath, GroupHeadSizeLimit(headSizeLimit))
g, err := OpenGroup(ctx, logger, headPath, GroupHeadSizeLimit(headSizeLimit))
require.NoError(t, err, "Error opening Group")
require.NotEqual(t, nil, g, "Failed to create Group")
@ -43,9 +44,12 @@ func assertGroupInfo(t *testing.T, gInfo GroupInfo, minIndex, maxIndex int, tota
}
func TestCheckHeadSizeLimit(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.TestingLogger()
g := createTestGroupWithHeadSizeLimit(t, logger, 1000*1000)
g := createTestGroupWithHeadSizeLimit(ctx, t, logger, 1000*1000)
// At first, there are no files.
assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 0, 0)
@ -114,7 +118,9 @@ func TestCheckHeadSizeLimit(t *testing.T) {
func TestRotateFile(t *testing.T) {
logger := log.TestingLogger()
g := createTestGroupWithHeadSizeLimit(t, logger, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
g := createTestGroupWithHeadSizeLimit(ctx, t, logger, 0)
// Create a different temporary directory and move into it, to make sure
// relative paths are resolved at Group creation
@ -180,7 +186,10 @@ func TestRotateFile(t *testing.T) {
func TestWrite(t *testing.T) {
logger := log.TestingLogger()
g := createTestGroupWithHeadSizeLimit(t, logger, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
g := createTestGroupWithHeadSizeLimit(ctx, t, logger, 0)
written := []byte("Medusa")
_, err := g.Write(written)
@ -205,7 +214,10 @@ func TestWrite(t *testing.T) {
func TestGroupReaderRead(t *testing.T) {
logger := log.TestingLogger()
g := createTestGroupWithHeadSizeLimit(t, logger, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
g := createTestGroupWithHeadSizeLimit(ctx, t, logger, 0)
professor := []byte("Professor Monster")
_, err := g.Write(professor)
@ -240,7 +252,10 @@ func TestGroupReaderRead(t *testing.T) {
func TestGroupReaderRead2(t *testing.T) {
logger := log.TestingLogger()
g := createTestGroupWithHeadSizeLimit(t, logger, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
g := createTestGroupWithHeadSizeLimit(ctx, t, logger, 0)
professor := []byte("Professor Monster")
_, err := g.Write(professor)
@ -276,7 +291,10 @@ func TestGroupReaderRead2(t *testing.T) {
func TestMinIndex(t *testing.T) {
logger := log.TestingLogger()
g := createTestGroupWithHeadSizeLimit(t, logger, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
g := createTestGroupWithHeadSizeLimit(ctx, t, logger, 0)
assert.Zero(t, g.MinIndex(), "MinIndex should be zero at the beginning")
@ -286,7 +304,10 @@ func TestMinIndex(t *testing.T) {
func TestMaxIndex(t *testing.T) {
logger := log.TestingLogger()
g := createTestGroupWithHeadSizeLimit(t, logger, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
g := createTestGroupWithHeadSizeLimit(ctx, t, logger, 0)
assert.Zero(t, g.MaxIndex(), "MaxIndex should be zero at the beginning")


+ 12
- 11
node/node_test.go View File

@ -47,7 +47,7 @@ func TestNodeStartStop(t *testing.T) {
ctx, bcancel := context.WithCancel(context.Background())
defer bcancel()
logger := log.NewTestingLogger(t)
logger := log.NewNopLogger()
// create & start node
ns, err := newDefaultNode(ctx, cfg, logger)
require.NoError(t, err)
@ -98,6 +98,7 @@ func getTestNode(ctx context.Context, t *testing.T, conf *config.Config, logger
ns.Wait()
}
})
t.Cleanup(leaktest.CheckTimeout(t, time.Second))
return n
}
@ -112,7 +113,7 @@ func TestNodeDelayedStart(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewTestingLogger(t)
logger := log.NewNopLogger()
// create & start node
n := getTestNode(ctx, t, cfg, logger)
@ -132,7 +133,7 @@ func TestNodeSetAppVersion(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewTestingLogger(t)
logger := log.NewNopLogger()
// create node
n := getTestNode(ctx, t, cfg, logger)
@ -156,7 +157,7 @@ func TestNodeSetPrivValTCP(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewTestingLogger(t)
logger := log.NewNopLogger()
cfg, err := config.ResetTestRoot("node_priv_val_tcp_test")
require.NoError(t, err)
@ -200,7 +201,7 @@ func TestPrivValidatorListenAddrNoProtocol(t *testing.T) {
defer os.RemoveAll(cfg.RootDir)
cfg.PrivValidator.ListenAddr = addrNoPrefix
logger := log.NewTestingLogger(t)
logger := log.NewNopLogger()
n, err := newDefaultNode(ctx, cfg, logger)
@ -224,7 +225,7 @@ func TestNodeSetPrivValIPC(t *testing.T) {
defer os.RemoveAll(cfg.RootDir)
cfg.PrivValidator.ListenAddr = "unix://" + tmpfile
logger := log.NewTestingLogger(t)
logger := log.NewNopLogger()
dialer := privval.DialUnixFn(tmpfile)
dialerEndpoint := privval.NewSignerDialerEndpoint(logger, dialer)
@ -270,7 +271,7 @@ func TestCreateProposalBlock(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(cfg.RootDir)
logger := log.NewTestingLogger(t)
logger := log.NewNopLogger()
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
@ -367,7 +368,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
defer os.RemoveAll(cfg.RootDir)
logger := log.NewTestingLogger(t)
logger := log.NewNopLogger()
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
@ -433,7 +434,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(cfg.RootDir)
logger := log.NewTestingLogger(t)
logger := log.NewNopLogger()
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
@ -554,7 +555,7 @@ func TestNodeNewSeedNode(t *testing.T) {
nodeKey, err := types.LoadOrGenNodeKey(cfg.NodeKeyFile())
require.NoError(t, err)
logger := log.NewTestingLogger(t)
logger := log.NewNopLogger()
ns, err := makeSeedNode(ctx,
cfg,
@ -588,7 +589,7 @@ func TestNodeSetEventSink(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewTestingLogger(t)
logger := log.NewNopLogger()
setupTest := func(t *testing.T, conf *config.Config) []indexer.EventSink {
eventBus := eventbus.NewDefault(logger.With("module", "events"))


Loading…
Cancel
Save