diff --git a/internal/consensus/wal_test.go b/internal/consensus/wal_test.go
index 24b1d3dfc..8a39b31b5 100644
--- a/internal/consensus/wal_test.go
+++ b/internal/consensus/wal_test.go
@@ -9,6 +9,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/fortytw2/leaktest"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
@@ -50,6 +51,10 @@ func TestWALTruncate(t *testing.T) {
 	err = WALGenerateNBlocks(ctx, t, logger, wal.Group(), 60)
 	require.NoError(t, err)
 
+	// put the leakcheck here so it runs after other cleanup
+	// functions.
+	t.Cleanup(leaktest.CheckTimeout(t, 500*time.Millisecond))
+
 	time.Sleep(1 * time.Millisecond) // wait groupCheckDuration, make sure RotateFile run
 
 	if err := wal.FlushAndSync(); err != nil {
diff --git a/internal/libs/autofile/group.go b/internal/libs/autofile/group.go
index 4aa6f2cb5..1b4418d59 100644
--- a/internal/libs/autofile/group.go
+++ b/internal/libs/autofile/group.go
@@ -56,7 +56,6 @@ assuming that marker lines are written occasionally.
 type Group struct {
 	service.BaseService
 	logger log.Logger
-	ctx    context.Context
 
 	ID   string
 	Head *AutoFile // The head AutoFile to write to
@@ -93,7 +92,6 @@ func OpenGroup(ctx context.Context, logger log.Logger, headPath string, groupOpt
 
 	g := &Group{
 		logger:  logger,
-		ctx:     ctx,
 		ID:      "group:" + head.ID,
 		Head:    head,
 		headBuf: bufio.NewWriterSize(head, 4096*10),
@@ -250,14 +248,14 @@ func (g *Group) processTicks(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case <-g.ticker.C:
-			g.checkHeadSizeLimit()
-			g.checkTotalSizeLimit()
+			g.checkHeadSizeLimit(ctx)
+			g.checkTotalSizeLimit(ctx)
 		}
 	}
 }
 
 // NOTE: this function is called manually in tests.
-func (g *Group) checkHeadSizeLimit() {
+func (g *Group) checkHeadSizeLimit(ctx context.Context) {
 	limit := g.HeadSizeLimit()
 	if limit == 0 {
 		return
@@ -268,13 +266,15 @@ func (g *Group) checkHeadSizeLimit() {
 		return
 	}
 	if size >= limit {
-		g.RotateFile()
+		g.rotateFile(ctx)
 	}
 }
 
-func (g *Group) checkTotalSizeLimit() {
-	limit := g.TotalSizeLimit()
-	if limit == 0 {
+func (g *Group) checkTotalSizeLimit(ctx context.Context) {
+	g.mtx.Lock()
+	defer g.mtx.Unlock()
+
+	if g.totalSizeLimit == 0 {
 		return
 	}
 
@@ -282,7 +282,7 @@ func (g *Group) checkTotalSizeLimit() {
 	totalSize := gInfo.TotalSize
 	for i := 0; i < maxFilesToRemove; i++ {
 		index := gInfo.MinIndex + i
-		if totalSize < limit {
+		if totalSize < g.totalSizeLimit {
 			return
 		}
 		if index == gInfo.MaxIndex {
@@ -296,8 +296,12 @@ func (g *Group) checkTotalSizeLimit() {
 			g.logger.Error("Failed to fetch info for file", "file", pathToRemove)
 			continue
 		}
-		err = os.Remove(pathToRemove)
-		if err != nil {
+
+		if ctx.Err() != nil {
+			return
+		}
+
+		if err = os.Remove(pathToRemove); err != nil {
 			g.logger.Error("Failed to remove path", "path", pathToRemove)
 			return
 		}
@@ -305,8 +309,8 @@ func (g *Group) checkTotalSizeLimit() {
 	}
 }
 
-// RotateFile causes group to close the current head and assign it some index.
-func (g *Group) RotateFile() {
+// rotateFile causes group to close the current head and assign it some index.
+func (g *Group) rotateFile(ctx context.Context) {
 	g.mtx.Lock()
 	defer g.mtx.Unlock()
 
@@ -319,6 +323,10 @@ func (g *Group) RotateFile() {
 		panic(err)
 	}
 	err := g.Head.withLock(func() error {
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+
 		if err := g.Head.unsyncCloseFile(); err != nil {
 			return err
 		}
@@ -326,9 +334,13 @@ func (g *Group) RotateFile() {
 		indexPath := filePathForIndex(headPath, g.maxIndex, g.maxIndex+1)
 		return os.Rename(headPath, indexPath)
 	})
+	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+		return
+	}
 	if err != nil {
 		panic(err)
 	}
+
 	g.maxIndex++
 }
 
diff --git a/internal/libs/autofile/group_test.go b/internal/libs/autofile/group_test.go
index 328201780..f6b3eaab6 100644
--- a/internal/libs/autofile/group_test.go
+++ b/internal/libs/autofile/group_test.go
@@ -64,7 +64,7 @@ func TestCheckHeadSizeLimit(t *testing.T) {
 	assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 999000, 999000)
 
 	// Even calling checkHeadSizeLimit manually won't rotate it.
-	g.checkHeadSizeLimit()
+	g.checkHeadSizeLimit(ctx)
 	assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 999000, 999000)
 
 	// Write 1000 more bytes.
@@ -74,7 +74,7 @@ func TestCheckHeadSizeLimit(t *testing.T) {
 	require.NoError(t, err)
 
 	// Calling checkHeadSizeLimit this time rolls it.
-	g.checkHeadSizeLimit()
+	g.checkHeadSizeLimit(ctx)
 	assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 1000000, 0)
 
 	// Write 1000 more bytes.
@@ -84,7 +84,7 @@ func TestCheckHeadSizeLimit(t *testing.T) {
 	require.NoError(t, err)
 
 	// Calling checkHeadSizeLimit does nothing.
-	g.checkHeadSizeLimit()
+	g.checkHeadSizeLimit(ctx)
 	assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 1001000, 1000)
 
 	// Write 1000 bytes 999 times.
@@ -97,7 +97,7 @@ func TestCheckHeadSizeLimit(t *testing.T) {
 	assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 2000000, 1000000)
 
 	// Calling checkHeadSizeLimit rolls it again.
-	g.checkHeadSizeLimit()
+	g.checkHeadSizeLimit(ctx)
 	assertGroupInfo(t, g.ReadGroupInfo(), 0, 2, 2000000, 0)
 
 	// Write 1000 more bytes.
@@ -108,7 +108,7 @@ func TestCheckHeadSizeLimit(t *testing.T) {
 	assertGroupInfo(t, g.ReadGroupInfo(), 0, 2, 2001000, 1000)
 
 	// Calling checkHeadSizeLimit does nothing.
-	g.checkHeadSizeLimit()
+	g.checkHeadSizeLimit(ctx)
 	assertGroupInfo(t, g.ReadGroupInfo(), 0, 2, 2001000, 1000)
 
 	// Cleanup
@@ -150,7 +150,7 @@ func TestRotateFile(t *testing.T) {
 	require.NoError(t, err)
 	err = g.FlushAndSync()
 	require.NoError(t, err)
-	g.RotateFile()
+	g.rotateFile(ctx)
 	err = g.WriteLine("Line 4")
 	require.NoError(t, err)
 	err = g.WriteLine("Line 5")
@@ -224,7 +224,7 @@ func TestGroupReaderRead(t *testing.T) {
 	require.NoError(t, err)
 	err = g.FlushAndSync()
 	require.NoError(t, err)
-	g.RotateFile()
+	g.rotateFile(ctx)
 	frankenstein := []byte("Frankenstein's Monster")
 	_, err = g.Write(frankenstein)
 	require.NoError(t, err)
@@ -262,7 +262,7 @@ func TestGroupReaderRead2(t *testing.T) {
 	require.NoError(t, err)
 	err = g.FlushAndSync()
 	require.NoError(t, err)
-	g.RotateFile()
+	g.rotateFile(ctx)
 	frankenstein := []byte("Frankenstein's Monster")
 	frankensteinPart := []byte("Frankenstein")
 	_, err = g.Write(frankensteinPart) // note writing only a part
@@ -315,7 +315,7 @@ func TestMaxIndex(t *testing.T) {
 	require.NoError(t, err)
 	err = g.FlushAndSync()
 	require.NoError(t, err)
-	g.RotateFile()
+	g.rotateFile(ctx)
 	assert.Equal(t, 1, g.MaxIndex(), "MaxIndex should point to the last file")