Browse Source

autofile: avoid shutdown race (#7650)

pull/7666/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
78e4c7d379
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 40 additions and 23 deletions
  1. +5
    -0
      internal/consensus/wal_test.go
  2. +26
    -14
      internal/libs/autofile/group.go
  3. +9
    -9
      internal/libs/autofile/group_test.go

+ 5
- 0
internal/consensus/wal_test.go View File

@ -9,6 +9,7 @@ import (
"testing"
"time"
"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -50,6 +51,10 @@ func TestWALTruncate(t *testing.T) {
err = WALGenerateNBlocks(ctx, t, logger, wal.Group(), 60)
require.NoError(t, err)
// put the leakcheck here so it runs after other cleanup
// functions.
t.Cleanup(leaktest.CheckTimeout(t, 500*time.Millisecond))
time.Sleep(1 * time.Millisecond) // wait groupCheckDuration, make sure RotateFile run
if err := wal.FlushAndSync(); err != nil {


+ 26
- 14
internal/libs/autofile/group.go View File

@ -56,7 +56,6 @@ assuming that marker lines are written occasionally.
type Group struct {
service.BaseService
logger log.Logger
ctx context.Context
ID string
Head *AutoFile // The head AutoFile to write to
@ -93,7 +92,6 @@ func OpenGroup(ctx context.Context, logger log.Logger, headPath string, groupOpt
g := &Group{
logger: logger,
ctx: ctx,
ID: "group:" + head.ID,
Head: head,
headBuf: bufio.NewWriterSize(head, 4096*10),
@ -250,14 +248,14 @@ func (g *Group) processTicks(ctx context.Context) {
case <-ctx.Done():
return
case <-g.ticker.C:
g.checkHeadSizeLimit()
g.checkTotalSizeLimit()
g.checkHeadSizeLimit(ctx)
g.checkTotalSizeLimit(ctx)
}
}
}
// NOTE: this function is called manually in tests.
func (g *Group) checkHeadSizeLimit() {
func (g *Group) checkHeadSizeLimit(ctx context.Context) {
limit := g.HeadSizeLimit()
if limit == 0 {
return
@ -268,13 +266,15 @@ func (g *Group) checkHeadSizeLimit() {
return
}
if size >= limit {
g.RotateFile()
g.rotateFile(ctx)
}
}
func (g *Group) checkTotalSizeLimit() {
limit := g.TotalSizeLimit()
if limit == 0 {
func (g *Group) checkTotalSizeLimit(ctx context.Context) {
g.mtx.Lock()
defer g.mtx.Unlock()
if g.totalSizeLimit == 0 {
return
}
@ -282,7 +282,7 @@ func (g *Group) checkTotalSizeLimit() {
totalSize := gInfo.TotalSize
for i := 0; i < maxFilesToRemove; i++ {
index := gInfo.MinIndex + i
if totalSize < limit {
if totalSize < g.totalSizeLimit {
return
}
if index == gInfo.MaxIndex {
@ -296,8 +296,12 @@ func (g *Group) checkTotalSizeLimit() {
g.logger.Error("Failed to fetch info for file", "file", pathToRemove)
continue
}
err = os.Remove(pathToRemove)
if err != nil {
if ctx.Err() != nil {
return
}
if err = os.Remove(pathToRemove); err != nil {
g.logger.Error("Failed to remove path", "path", pathToRemove)
return
}
@ -305,8 +309,8 @@ func (g *Group) checkTotalSizeLimit() {
}
}
// RotateFile causes group to close the current head and assign it some index.
func (g *Group) RotateFile() {
// rotateFile causes group to close the current head and assign it some index.
func (g *Group) rotateFile(ctx context.Context) {
g.mtx.Lock()
defer g.mtx.Unlock()
@ -319,6 +323,10 @@ func (g *Group) RotateFile() {
panic(err)
}
err := g.Head.withLock(func() error {
if err := ctx.Err(); err != nil {
return err
}
if err := g.Head.unsyncCloseFile(); err != nil {
return err
}
@ -326,9 +334,13 @@ func (g *Group) RotateFile() {
indexPath := filePathForIndex(headPath, g.maxIndex, g.maxIndex+1)
return os.Rename(headPath, indexPath)
})
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
if err != nil {
panic(err)
}
g.maxIndex++
}


+ 9
- 9
internal/libs/autofile/group_test.go View File

@ -64,7 +64,7 @@ func TestCheckHeadSizeLimit(t *testing.T) {
assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 999000, 999000)
// Even calling checkHeadSizeLimit manually won't rotate it.
g.checkHeadSizeLimit()
g.checkHeadSizeLimit(ctx)
assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 999000, 999000)
// Write 1000 more bytes.
@ -74,7 +74,7 @@ func TestCheckHeadSizeLimit(t *testing.T) {
require.NoError(t, err)
// Calling checkHeadSizeLimit this time rolls it.
g.checkHeadSizeLimit()
g.checkHeadSizeLimit(ctx)
assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 1000000, 0)
// Write 1000 more bytes.
@ -84,7 +84,7 @@ func TestCheckHeadSizeLimit(t *testing.T) {
require.NoError(t, err)
// Calling checkHeadSizeLimit does nothing.
g.checkHeadSizeLimit()
g.checkHeadSizeLimit(ctx)
assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 1001000, 1000)
// Write 1000 bytes 999 times.
@ -97,7 +97,7 @@ func TestCheckHeadSizeLimit(t *testing.T) {
assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 2000000, 1000000)
// Calling checkHeadSizeLimit rolls it again.
g.checkHeadSizeLimit()
g.checkHeadSizeLimit(ctx)
assertGroupInfo(t, g.ReadGroupInfo(), 0, 2, 2000000, 0)
// Write 1000 more bytes.
@ -108,7 +108,7 @@ func TestCheckHeadSizeLimit(t *testing.T) {
assertGroupInfo(t, g.ReadGroupInfo(), 0, 2, 2001000, 1000)
// Calling checkHeadSizeLimit does nothing.
g.checkHeadSizeLimit()
g.checkHeadSizeLimit(ctx)
assertGroupInfo(t, g.ReadGroupInfo(), 0, 2, 2001000, 1000)
// Cleanup
@ -150,7 +150,7 @@ func TestRotateFile(t *testing.T) {
require.NoError(t, err)
err = g.FlushAndSync()
require.NoError(t, err)
g.RotateFile()
g.rotateFile(ctx)
err = g.WriteLine("Line 4")
require.NoError(t, err)
err = g.WriteLine("Line 5")
@ -224,7 +224,7 @@ func TestGroupReaderRead(t *testing.T) {
require.NoError(t, err)
err = g.FlushAndSync()
require.NoError(t, err)
g.RotateFile()
g.rotateFile(ctx)
frankenstein := []byte("Frankenstein's Monster")
_, err = g.Write(frankenstein)
require.NoError(t, err)
@ -262,7 +262,7 @@ func TestGroupReaderRead2(t *testing.T) {
require.NoError(t, err)
err = g.FlushAndSync()
require.NoError(t, err)
g.RotateFile()
g.rotateFile(ctx)
frankenstein := []byte("Frankenstein's Monster")
frankensteinPart := []byte("Frankenstein")
_, err = g.Write(frankensteinPart) // note writing only a part
@ -315,7 +315,7 @@ func TestMaxIndex(t *testing.T) {
require.NoError(t, err)
err = g.FlushAndSync()
require.NoError(t, err)
g.RotateFile()
g.rotateFile(ctx)
assert.Equal(t, 1, g.MaxIndex(), "MaxIndex should point to the last file")


Loading…
Cancel
Save