Browse Source

consensus: improve wal test cleanup

pull/8059/head
tycho garen 3 years ago
parent
commit
ded0d9a517
2 changed files with 25 additions and 6 deletions
  1. +12
    -6
      internal/consensus/wal_test.go
  2. +13
    -0
      internal/libs/autofile/group.go

+ 12
- 6
internal/consensus/wal_test.go View File

@ -3,6 +3,7 @@ package consensus
import ( import (
"bytes" "bytes"
"context" "context"
"os"
"path/filepath" "path/filepath"
"testing" "testing"
@ -41,7 +42,7 @@ func TestWALTruncate(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
err = wal.Start(ctx) err = wal.Start(ctx)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(wal.Wait)
t.Cleanup(func() { wal.Stop(); wal.Group().Wait(); wal.Wait() })
// 60 block's size nearly 70K, greater than group's headBuf size(4096 * 10), // 60 block's size nearly 70K, greater than group's headBuf size(4096 * 10),
// when headBuf is full, truncate content will Flush to the file. at this // when headBuf is full, truncate content will Flush to the file. at this
@ -112,7 +113,7 @@ func TestWALWrite(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
err = wal.Start(ctx) err = wal.Start(ctx)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(wal.Wait)
t.Cleanup(func() { wal.Stop(); wal.Group().Wait(); wal.Wait() })
// 1) Write returns an error if msg is too big // 1) Write returns an error if msg is too big
msg := &BlockPartMessage{ msg := &BlockPartMessage{
@ -151,7 +152,7 @@ func TestWALSearchForEndHeight(t *testing.T) {
wal, err := NewWAL(ctx, logger, walFile) wal, err := NewWAL(ctx, logger, walFile)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { wal.Stop(); wal.Wait() })
t.Cleanup(func() { wal.Stop(); wal.Group().Wait(); wal.Wait() })
h := int64(3) h := int64(3)
gr, found, err := wal.SearchForEndHeight(h, &WALSearchOptions{}) gr, found, err := wal.SearchForEndHeight(h, &WALSearchOptions{})
@ -176,8 +177,9 @@ func TestWALPeriodicSync(t *testing.T) {
walDir := t.TempDir() walDir := t.TempDir()
walFile := filepath.Join(walDir, "wal") walFile := filepath.Join(walDir, "wal")
wal, err := NewWAL(ctx, log.TestingLogger(), walFile, autofile.GroupCheckDuration(1*time.Millisecond))
defer os.RemoveAll(walFile)
wal, err := NewWAL(ctx, log.TestingLogger(), walFile, autofile.GroupCheckDuration(250*time.Millisecond))
require.NoError(t, err) require.NoError(t, err)
wal.SetFlushInterval(walTestFlushInterval) wal.SetFlushInterval(walTestFlushInterval)
@ -191,9 +193,13 @@ func TestWALPeriodicSync(t *testing.T) {
assert.NotZero(t, wal.Group().Buffered()) assert.NotZero(t, wal.Group().Buffered())
require.NoError(t, wal.Start(ctx)) require.NoError(t, wal.Start(ctx))
t.Cleanup(func() { wal.Stop(); wal.Wait() })
defer wal.Stop()
t.Cleanup(func() {
wal.Group().Wait()
wal.Wait()
})
time.Sleep(walTestFlushInterval + (10 * time.Millisecond))
time.Sleep(walTestFlushInterval + (20 * time.Millisecond))
// The data should have been flushed by the periodic sync // The data should have been flushed by the periodic sync
assert.Zero(t, wal.Group().Buffered()) assert.Zero(t, wal.Group().Buffered())


+ 13
- 0
internal/libs/autofile/group.go View File

@ -274,6 +274,10 @@ func (g *Group) checkTotalSizeLimit(ctx context.Context) {
g.mtx.Lock() g.mtx.Lock()
defer g.mtx.Unlock() defer g.mtx.Unlock()
if err := ctx.Err(); err != nil {
return
}
if g.totalSizeLimit == 0 { if g.totalSizeLimit == 0 {
return return
} }
@ -290,6 +294,11 @@ func (g *Group) checkTotalSizeLimit(ctx context.Context) {
g.logger.Error("Group's head may grow without bound", "head", g.Head.Path) g.logger.Error("Group's head may grow without bound", "head", g.Head.Path)
return return
} }
if ctx.Err() != nil {
return
}
pathToRemove := filePathForIndex(g.Head.Path, index, gInfo.MaxIndex) pathToRemove := filePathForIndex(g.Head.Path, index, gInfo.MaxIndex)
fInfo, err := os.Stat(pathToRemove) fInfo, err := os.Stat(pathToRemove)
if err != nil { if err != nil {
@ -314,6 +323,10 @@ func (g *Group) rotateFile(ctx context.Context) {
g.mtx.Lock() g.mtx.Lock()
defer g.mtx.Unlock() defer g.mtx.Unlock()
if err := ctx.Err(); err != nil {
return
}
headPath := g.Head.Path headPath := g.Head.Path
if err := g.headBuf.Flush(); err != nil { if err := g.headBuf.Flush(); err != nil {


Loading…
Cancel
Save