
consensus: improve wal test cleanup (#8059)

I believe that this gets rid of our temp-file related test errors.
pull/8064/head
Sam Kleinman, 3 years ago (committed by GitHub)
commit a3881f0fb1
4 changed files with 53 additions and 30 deletions
  1. internal/consensus/common_test.go     +9   -2
  2. internal/consensus/wal_generator.go   +22  -18
  3. internal/consensus/wal_test.go        +9   -10
  4. internal/libs/autofile/group.go       +13  -0

internal/consensus/common_test.go (+9, -2)

@@ -370,7 +370,11 @@ func subscribeToVoter(ctx context.Context, t *testing.T, cs *State, addr []byte)
 		vote := msg.Data().(types.EventDataVote)
 		// we only fire for our own votes
 		if bytes.Equal(addr, vote.Vote.ValidatorAddress) {
-			ch <- msg
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case ch <- msg:
+			}
 		}
 		return nil
 	}, types.EventQueryVote); err != nil {
@@ -401,7 +405,10 @@ func subscribeToVoterBuffered(ctx context.Context, t *testing.T, cs *State, addr
 			vote := msg.Data().(types.EventDataVote)
 			// we only fire for our own votes
 			if bytes.Equal(addr, vote.Vote.ValidatorAddress) {
-				ch <- msg
+				select {
+				case <-ctx.Done():
+				case ch <- msg:
+				}
 			}
 		}
 	}()

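The change above replaces an unconditional ch <- msg with a select, so the subscriber goroutine can never block forever on a channel nobody reads once the test's context is canceled. A minimal, self-contained sketch of that pattern, with illustrative names (send, ch, msg) that are not the test's actual identifiers:

package main

import (
	"context"
	"fmt"
	"time"
)

// send delivers msg unless ctx is canceled first, so the sending
// goroutine cannot hang on an abandoned, unbuffered channel.
func send(ctx context.Context, ch chan<- string, msg string) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // receiver is gone; give up
	case ch <- msg:
		return nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	ch := make(chan string)            // never read: a bare `ch <- msg` would block forever
	fmt.Println(send(ctx, ch, "vote")) // prints "context deadline exceeded"
}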

internal/consensus/wal_generator.go (+22, -18)

@@ -30,8 +30,10 @@ import (
 // stripped down version of node (proxy app, event bus, consensus state) with a
 // persistent kvstore application and special consensus wal instance
 // (byteBufferWAL) and waits until numBlocks are created.
-// If the node fails to produce given numBlocks, it returns an error.
-func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr io.Writer, numBlocks int) (err error) {
+// If the node fails to produce given numBlocks, it fails the test.
+func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr io.Writer, numBlocks int) {
+	t.Helper()
+
 	cfg := getConfig(t)
 	app := kvstore.NewPersistentKVStoreApplication(logger, filepath.Join(cfg.DBDir(), "wal_generator"))
@@ -46,35 +48,37 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr
 	privValidatorStateFile := cfg.PrivValidator.StateFile()
 	privValidator, err := privval.LoadOrGenFilePV(privValidatorKeyFile, privValidatorStateFile)
 	if err != nil {
-		return err
+		t.Fatal(err)
 	}
 	genDoc, err := types.GenesisDocFromFile(cfg.GenesisFile())
 	if err != nil {
-		return fmt.Errorf("failed to read genesis file: %w", err)
+		t.Fatal(fmt.Errorf("failed to read genesis file: %w", err))
 	}
 	blockStoreDB := dbm.NewMemDB()
 	stateDB := blockStoreDB
 	stateStore := sm.NewStore(stateDB)
 	state, err := sm.MakeGenesisState(genDoc)
 	if err != nil {
-		return fmt.Errorf("failed to make genesis state: %w", err)
+		t.Fatal(fmt.Errorf("failed to make genesis state: %w", err))
 	}
 	state.Version.Consensus.App = kvstore.ProtocolVersion
 	if err = stateStore.Save(state); err != nil {
-		t.Error(err)
+		t.Fatal(err)
 	}
 	blockStore := store.NewBlockStore(blockStoreDB)
 	proxyApp := proxy.NewAppConns(abciclient.NewLocalCreator(app), logger.With("module", "proxy"), proxy.NopMetrics())
 	if err := proxyApp.Start(ctx); err != nil {
-		return fmt.Errorf("failed to start proxy app connections: %w", err)
+		t.Fatal(fmt.Errorf("failed to start proxy app connections: %w", err))
 	}
+	t.Cleanup(proxyApp.Wait)
 
 	eventBus := eventbus.NewDefault(logger.With("module", "events"))
 	if err := eventBus.Start(ctx); err != nil {
-		return fmt.Errorf("failed to start event bus: %w", err)
+		t.Fatal(fmt.Errorf("failed to start event bus: %w", err))
 	}
+	t.Cleanup(func() { eventBus.Stop(); eventBus.Wait() })
 
 	mempool := emptyMempool{}
 	evpool := sm.EmptyEvidencePool{}
@@ -91,22 +95,24 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr
 	wal := newByteBufferWAL(logger, NewWALEncoder(wr), int64(numBlocks), numBlocksWritten)
 	// see wal.go#103
 	if err := wal.Write(EndHeightMessage{0}); err != nil {
-		t.Error(err)
+		t.Fatal(err)
 	}
 	consensusState.wal = wal
 	if err := consensusState.Start(ctx); err != nil {
-		return fmt.Errorf("failed to start consensus state: %w", err)
+		t.Fatal(fmt.Errorf("failed to start consensus state: %w", err))
 	}
+	t.Cleanup(consensusState.Wait)
+	defer consensusState.Stop()
+
+	timer := time.NewTimer(time.Minute)
+	defer timer.Stop()
 	select {
 	case <-numBlocksWritten:
-		consensusState.Stop()
-		return nil
-	case <-time.After(1 * time.Minute):
-		consensusState.Stop()
-		return fmt.Errorf("waited too long for tendermint to produce %d blocks (grep logs for `wal_generator`)", numBlocks)
+	case <-timer.C:
+		t.Fatal(fmt.Errorf("waited too long for tendermint to produce %d blocks (grep logs for `wal_generator`)", numBlocks))
 	}
 }
@@ -115,9 +121,7 @@ func WALWithNBlocks(ctx context.Context, t *testing.T, logger log.Logger, numBlo
 	var b bytes.Buffer
 	wr := bufio.NewWriter(&b)
-	if err := WALGenerateNBlocks(ctx, t, logger, wr, numBlocks); err != nil {
-		return []byte{}, err
-	}
+	WALGenerateNBlocks(ctx, t, logger, wr, numBlocks)
 	wr.Flush()
 	return b.Bytes(), nil

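The generator stops returning errors and instead fails the test directly: t.Helper() keeps failures attributed to the caller's line, t.Fatal replaces every return err, and t.Cleanup registers the Wait/Stop calls so the proxy app, event bus, and consensus state are drained even when the helper bails out early. It also swaps time.After for a time.NewTimer with a deferred Stop. A small, generic sketch of the same helper shape, assuming made-up Thing/startThing/waitOrFail names (not code from this repository):

package demo

import (
	"fmt"
	"testing"
	"time"
)

// Thing stands in for a service such as the proxy app or event bus
// (hypothetical type, only to show the helper shape).
type Thing struct{}

func (th *Thing) Start() error { return nil }
func (th *Thing) Stop()        {}
func (th *Thing) Wait()        {}

// startThing fails the test instead of returning an error, and registers
// cleanup so the service is stopped and waited on even after an early t.Fatal.
func startThing(t *testing.T) *Thing {
	t.Helper() // failures point at the caller, not this helper

	th := &Thing{}
	if err := th.Start(); err != nil {
		t.Fatal(fmt.Errorf("failed to start thing: %w", err))
	}
	t.Cleanup(func() { th.Stop(); th.Wait() })
	return th
}

// waitOrFail mirrors the timer change: time.NewTimer plus a deferred Stop
// releases the timer promptly, unlike a bare time.After inside a select.
func waitOrFail(t *testing.T, done <-chan struct{}) {
	t.Helper()

	timer := time.NewTimer(time.Minute)
	defer timer.Stop()
	select {
	case <-done:
	case <-timer.C:
		t.Fatal("waited too long")
	}
}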

internal/consensus/wal_test.go (+9, -10)

@@ -3,6 +3,7 @@ package consensus
 import (
 	"bytes"
 	"context"
+	"os"
 	"path/filepath"
 	"testing"
@@ -41,13 +42,12 @@ func TestWALTruncate(t *testing.T) {
 	require.NoError(t, err)
 	err = wal.Start(ctx)
 	require.NoError(t, err)
-	t.Cleanup(wal.Wait)
+	t.Cleanup(func() { wal.Stop(); wal.Group().Stop(); wal.Group().Wait(); wal.Wait() })
 
 	// 60 block's size nearly 70K, greater than group's headBuf size(4096 * 10),
 	// when headBuf is full, truncate content will Flush to the file. at this
 	// time, RotateFile is called, truncate content exist in each file.
-	err = WALGenerateNBlocks(ctx, t, logger, wal.Group(), 60)
-	require.NoError(t, err)
+	WALGenerateNBlocks(ctx, t, logger, wal.Group(), 60)
 
 	// put the leakcheck here so it runs after other cleanup
 	// functions.
@@ -112,7 +112,7 @@ func TestWALWrite(t *testing.T) {
 	require.NoError(t, err)
 	err = wal.Start(ctx)
 	require.NoError(t, err)
-	t.Cleanup(wal.Wait)
+	t.Cleanup(func() { wal.Stop(); wal.Group().Stop(); wal.Group().Wait(); wal.Wait() })
 
 	// 1) Write returns an error if msg is too big
 	msg := &BlockPartMessage{
@@ -151,7 +151,6 @@ func TestWALSearchForEndHeight(t *testing.T) {
 	wal, err := NewWAL(ctx, logger, walFile)
 	require.NoError(t, err)
-	t.Cleanup(func() { wal.Stop(); wal.Wait() })
 
 	h := int64(3)
 	gr, found, err := wal.SearchForEndHeight(h, &WALSearchOptions{})
@@ -176,24 +175,24 @@ func TestWALPeriodicSync(t *testing.T) {
 	walDir := t.TempDir()
 	walFile := filepath.Join(walDir, "wal")
-	wal, err := NewWAL(ctx, log.TestingLogger(), walFile, autofile.GroupCheckDuration(1*time.Millisecond))
+	defer os.RemoveAll(walFile)
+	wal, err := NewWAL(ctx, log.TestingLogger(), walFile, autofile.GroupCheckDuration(250*time.Millisecond))
 	require.NoError(t, err)
 	wal.SetFlushInterval(walTestFlushInterval)
 	logger := log.NewNopLogger()
 
 	// Generate some data
-	err = WALGenerateNBlocks(ctx, t, logger, wal.Group(), 5)
-	require.NoError(t, err)
+	WALGenerateNBlocks(ctx, t, logger, wal.Group(), 5)
 
 	// We should have data in the buffer now
 	assert.NotZero(t, wal.Group().Buffered())
 
 	require.NoError(t, wal.Start(ctx))
-	t.Cleanup(func() { wal.Stop(); wal.Wait() })
+	t.Cleanup(func() { wal.Stop(); wal.Group().Stop(); wal.Group().Wait(); wal.Wait() })
 
-	time.Sleep(walTestFlushInterval + (10 * time.Millisecond))
+	time.Sleep(walTestFlushInterval + (20 * time.Millisecond))
 
 	// The data should have been flushed by the periodic sync
 	assert.Zero(t, wal.Group().Buffered())

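The broader cleanup closure stops the WAL, then its autofile group, and waits on both, so all of their goroutines have finished before the test ends and t.TempDir's automatic removal runs; this is what addresses the temp-file errors mentioned in the commit message. It relies on the standard-library guarantee that t.Cleanup callbacks run in last-added, first-called order after the test body returns. A tiny sketch of that ordering guarantee only (illustrative test, not from this repository):

package demo

import (
	"fmt"
	"testing"
)

// TestCleanupOrder shows that t.Cleanup callbacks run LIFO after the
// test body returns: the last-registered teardown runs first.
func TestCleanupOrder(t *testing.T) {
	t.Cleanup(func() { fmt.Println("runs last (registered first)") })
	t.Cleanup(func() { fmt.Println("runs first (registered last)") })
	fmt.Println("test body")
	// Output order: test body, runs first..., runs last...
}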

internal/libs/autofile/group.go (+13, -0)

@@ -274,6 +274,10 @@ func (g *Group) checkTotalSizeLimit(ctx context.Context) {
 	g.mtx.Lock()
 	defer g.mtx.Unlock()
 
+	if err := ctx.Err(); err != nil {
+		return
+	}
+
 	if g.totalSizeLimit == 0 {
 		return
 	}
@@ -290,6 +294,11 @@ func (g *Group) checkTotalSizeLimit(ctx context.Context) {
 			g.logger.Error("Group's head may grow without bound", "head", g.Head.Path)
 			return
 		}
+
+		if ctx.Err() != nil {
+			return
+		}
+
 		pathToRemove := filePathForIndex(g.Head.Path, index, gInfo.MaxIndex)
 		fInfo, err := os.Stat(pathToRemove)
 		if err != nil {
@@ -314,6 +323,10 @@ func (g *Group) rotateFile(ctx context.Context) {
 	g.mtx.Lock()
 	defer g.mtx.Unlock()
 
+	if err := ctx.Err(); err != nil {
+		return
+	}
+
 	headPath := g.Head.Path
 	if err := g.headBuf.Flush(); err != nil {

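The autofile group's periodic tasks now check ctx.Err() right after taking the mutex (and again before removing files), so once the context is canceled a pending tick becomes a no-op instead of racing with external cleanup such as the test's temp-dir removal. A minimal sketch of that guard inside a periodic worker, with made-up worker/tick/run names (not this package's API):

package demo

import (
	"context"
	"sync"
	"time"
)

type worker struct {
	mtx sync.Mutex
}

// tick is a periodic maintenance task. Checking ctx.Err() immediately after
// acquiring the lock turns a tick that raced with cancellation into a no-op,
// so it never touches files that cleanup may already have removed.
func (w *worker) tick(ctx context.Context) {
	w.mtx.Lock()
	defer w.mtx.Unlock()

	if err := ctx.Err(); err != nil {
		return
	}
	// ... filesystem work would go here ...
}

// run drives tick on an interval until the context is canceled.
func (w *worker) run(ctx context.Context, every time.Duration) {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			w.tick(ctx)
		}
	}
}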
