diff --git a/consensus/replay_test.go b/consensus/replay_test.go
index 136e45b61..2146f21d1 100644
--- a/consensus/replay_test.go
+++ b/consensus/replay_test.go
@@ -83,7 +83,7 @@ func startNewConsensusStateAndWaitForBlock(t *testing.T, consensusReplayConfig *
 	require.NoError(t, err)
 	select {
 	case <-newBlockCh:
-	case <-time.After(60 * time.Second):
+	case <-time.After(120 * time.Second):
 		t.Fatalf("Timed out waiting for new block (see trace above)")
 	}
 }
@@ -128,8 +128,8 @@ func TestWALCrash(t *testing.T) {
 
 func crashWALandCheckLiveness(t *testing.T, consensusReplayConfig *cfg.Config,
 	initFn func(dbm.DB, *ConsensusState, context.Context), heightToStop int64) {
-	walPaniced := make(chan error)
-	crashingWal := &crashingWAL{panicCh: walPaniced, heightToStop: heightToStop}
+	walPanicked := make(chan error)
+	crashingWal := &crashingWAL{panicCh: walPanicked, heightToStop: heightToStop}
 
 	i := 1
 LOOP:
@@ -168,8 +168,8 @@ LOOP:
 		i++
 
 		select {
-		case err := <-walPaniced:
-			t.Logf("WAL paniced: %v", err)
+		case err := <-walPanicked:
+			t.Logf("WAL panicked: %v", err)
 
 			// make sure we can make blocks after a crash
 			startNewConsensusStateAndWaitForBlock(t, consensusReplayConfig, cs.Height, blockDB, stateDB)
@@ -190,14 +190,14 @@ LOOP:
 
 // crashingWAL is a WAL which crashes or rather simulates a crash during Save
 // (before and after). It remembers a message for which we last panicked
-// (lastPanicedForMsgIndex), so we don't panic for it in subsequent iterations.
+// (lastPanickedForMsgIndex), so we don't panic for it in subsequent iterations.
 type crashingWAL struct {
 	next         WAL
 	panicCh      chan error
 	heightToStop int64
 
-	msgIndex               int // current message index
-	lastPanicedForMsgIndex int // last message for which we panicked
+	msgIndex                int // current message index
+	lastPanickedForMsgIndex int // last message for which we panicked
 }
 
 // WALWriteError indicates a WAL crash.
@@ -232,8 +232,8 @@ func (w *crashingWAL) Write(m WALMessage) {
 		return
 	}
 
-	if w.msgIndex > w.lastPanicedForMsgIndex {
-		w.lastPanicedForMsgIndex = w.msgIndex
+	if w.msgIndex > w.lastPanickedForMsgIndex {
+		w.lastPanickedForMsgIndex = w.msgIndex
 		_, file, line, _ := runtime.Caller(1)
 		w.panicCh <- WALWriteError{fmt.Sprintf("failed to write %T to WAL (fileline: %s:%d)", m, file, line)}
 		runtime.Goexit()
@@ -255,6 +255,7 @@ func (w *crashingWAL) SearchForEndHeight(height int64, options *WALSearchOptions
 func (w *crashingWAL) Start() error { return w.next.Start() }
 func (w *crashingWAL) Stop() error  { return w.next.Stop() }
 func (w *crashingWAL) Wait()        { w.next.Wait() }
+func (w *crashingWAL) Flush() error { return w.Group().Flush() }
 
 //------------------------------------------------------------------------------------------
 // Handshake Tests
diff --git a/consensus/state.go b/consensus/state.go
index 08e1da267..8ae17ea6c 100644
--- a/consensus/state.go
+++ b/consensus/state.go
@@ -909,6 +909,9 @@ func (cs *ConsensusState) defaultDecideProposal(height int64, round int) {
 		}
 	}
 
+	// Flush the WAL. Otherwise, we may not recompute the same proposal to sign, and the privValidator will refuse to sign anything.
+	cs.wal.Flush()
+
 	// Make proposal
 	propBlockId := types.BlockID{Hash: block.Hash(), PartsHeader: blockParts.Header()}
 	proposal := types.NewProposal(height, round, cs.ValidRound, propBlockId)
@@ -1674,6 +1677,9 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool,
 }
 
 func (cs *ConsensusState) signVote(type_ types.SignedMsgType, hash []byte, header types.PartSetHeader) (*types.Vote, error) {
+	// Flush the WAL. Otherwise, we may not recompute the same vote to sign, and the privValidator will refuse to sign anything.
+	cs.wal.Flush()
+
 	addr := cs.privValidator.GetPubKey().Address()
 	valIndex, _ := cs.Validators.GetByAddress(addr)
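Context for the two cs.wal.Flush() calls above: on restart, a crashed node replays the WAL to rebuild its consensus state. If the messages leading up to a proposal or vote were lost in the WAL's in-memory buffer, replay can reconstruct a different message for the same height/round/step, and the privValidator's double-sign protection will then refuse to sign it, stalling the validator. A simplified sketch of that guard, with illustrative names (the real check lives in the privval package and also special-cases timestamp-only differences; uses the standard bytes and errors packages):

    // Illustrative-only sketch of a double-sign guard; not the actual privval code.
    func checkSameHRS(lastHeight int64, lastRound, lastStep int, lastSignBytes []byte,
        height int64, round, step int, signBytes []byte) error {
        if height == lastHeight && round == lastRound && step == lastStep &&
            !bytes.Equal(signBytes, lastSignBytes) {
            // Same height/round/step but a different payload: signing this
            // would be equivocation, so refuse.
            return errors.New("conflicting data at the same HRS: refusing to sign")
        }
        return nil
    }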
diff --git a/consensus/wal.go b/consensus/wal.go
index f8988691c..b69a99411 100644
--- a/consensus/wal.go
+++ b/consensus/wal.go
@@ -21,6 +21,9 @@ import (
 const (
 	// must be greater than types.BlockPartSizeBytes + a few bytes
 	maxMsgSizeBytes = 1024 * 1024 // 1MB
+
+	// how often the WAL should be sync'd during periodic sync'ing
+	walDefaultFlushInterval = 2 * time.Second
 )
 
 //--------------------------------------------------------
@@ -56,6 +59,7 @@ type WAL interface {
 	WriteSync(WALMessage)
 	Group() *auto.Group
 	SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error)
+	Flush() error
 
 	Start() error
 	Stop() error
@@ -72,8 +76,14 @@ type baseWAL struct {
 	group *auto.Group
 
 	enc *WALEncoder
+
+	flushTicker   *time.Ticker
+	flushInterval time.Duration
 }
 
+// NewWAL attempts to create a new write-ahead logger based on `baseWAL`, which
+// implements all of the required WAL functionality. This base WAL also flushes
+// data to disk every 2s.
 func NewWAL(walFile string, groupOptions ...func(*auto.Group)) (*baseWAL, error) {
 	err := cmn.EnsureDir(filepath.Dir(walFile), 0700)
 	if err != nil {
@@ -85,13 +95,19 @@ func NewWAL(walFile string, groupOptions ...func(*auto.Group)) (*baseWAL, error)
 		return nil, err
 	}
 	wal := &baseWAL{
-		group: group,
-		enc:   NewWALEncoder(group),
+		group:         group,
+		enc:           NewWALEncoder(group),
+		flushInterval: walDefaultFlushInterval,
 	}
 	wal.BaseService = *cmn.NewBaseService(nil, "baseWAL", wal)
 	return wal, nil
 }
 
+// SetFlushInterval allows us to override the periodic flush interval for the WAL.
+func (wal *baseWAL) SetFlushInterval(i time.Duration) {
+	wal.flushInterval = i
+}
+
 func (wal *baseWAL) Group() *auto.Group {
 	return wal.group
 }
@@ -109,14 +125,37 @@ func (wal *baseWAL) OnStart() error {
 		wal.WriteSync(EndHeightMessage{0})
 	}
 	err = wal.group.Start()
+	wal.flushTicker = time.NewTicker(wal.flushInterval)
+	go wal.processFlushTicks()
 	return err
 }
 
+// processFlushTicks allows us to periodically attempt to sync the WAL to disk.
+func (wal *baseWAL) processFlushTicks() {
+	for {
+		select {
+		case <-wal.flushTicker.C:
+			err := wal.Flush()
+			if err != nil {
+				wal.Logger.Error("Periodic WAL flush failed", "err", err)
+			}
+		case <-wal.Quit():
+			return
+		}
+	}
+}
+
+// Flush will attempt to flush the underlying group's data to disk.
+func (wal *baseWAL) Flush() error {
+	return wal.group.Flush()
+}
+
 // Stop the underlying autofile group.
 // Use Wait() to ensure it's finished shutting down
 // before cleaning up files.
 func (wal *baseWAL) OnStop() {
-	wal.group.Flush()
+	wal.flushTicker.Stop()
+	wal.Flush()
 	wal.group.Stop()
 	wal.group.Close()
 }
@@ -150,7 +189,7 @@ func (wal *baseWAL) WriteSync(msg WALMessage) {
 	}
 
 	wal.Write(msg)
-	if err := wal.group.Flush(); err != nil {
+	if err := wal.Flush(); err != nil {
 		panic(fmt.Sprintf("Error flushing consensus wal buf to file. Error: %v \n", err))
 	}
 }
@@ -343,3 +382,4 @@ func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *a
 func (nilWAL) Start() error { return nil }
 func (nilWAL) Stop() error  { return nil }
 func (nilWAL) Wait()        {}
+func (nilWAL) Flush() error { return nil }
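For callers, the new pieces compose like this — a sketch using only the functions added above, with an illustrative path and interval:

    // Sketch: construct a WAL that flushes every 500ms instead of the 2s default.
    func newFastFlushingWAL() (*baseWAL, error) {
        wal, err := NewWAL("/tmp/cs.wal/wal") // path is illustrative
        if err != nil {
            return nil, err
        }
        wal.SetFlushInterval(500 * time.Millisecond) // must precede Start: OnStart creates the ticker
        if err := wal.Start(); err != nil {          // Start spawns processFlushTicks
            return nil, err
        }
        return wal, nil
    }

The caller should eventually call wal.Stop() and wal.Wait(); OnStop stops the ticker and performs a final Flush before closing the group.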
diff --git a/consensus/wal_generator.go b/consensus/wal_generator.go
index 8ed041c6b..9375d5dcc 100644
--- a/consensus/wal_generator.go
+++ b/consensus/wal_generator.go
@@ -5,7 +5,6 @@ import (
 	"bytes"
 	"fmt"
 	"io"
-	"os"
 	"path/filepath"
 	"testing"
 	"time"
@@ -24,13 +23,12 @@ import (
 	"github.com/tendermint/tendermint/types"
 )
 
-// WALGenerateNBlocks generates a consensus WAL. It does this by spining up a
+// WALGenerateNBlocks generates a consensus WAL. It does this by spinning up a
 // stripped down version of node (proxy app, event bus, consensus state) with a
 // persistent kvstore application and special consensus wal instance
 // (byteBufferWAL) and waits until numBlocks are created. If the node fails to produce given numBlocks, it returns an error.
 func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
 	config := getConfig(t)
-	defer os.RemoveAll(config.RootDir)
 
 	app := kvstore.NewPersistentKVStoreApplication(filepath.Join(config.DBDir(), "wal_generator"))
@@ -204,3 +202,4 @@ func (w *byteBufferWAL) SearchForEndHeight(height int64, options *WALSearchOptio
 func (w *byteBufferWAL) Start() error { return nil }
 func (w *byteBufferWAL) Stop() error  { return nil }
 func (w *byteBufferWAL) Wait()        {}
+func (w *byteBufferWAL) Flush() error { return nil }
diff --git a/consensus/wal_test.go b/consensus/wal_test.go
index e1fb9ece0..a2c80be3e 100644
--- a/consensus/wal_test.go
+++ b/consensus/wal_test.go
@@ -3,7 +3,6 @@ package consensus
 import (
 	"bytes"
 	"crypto/rand"
-	"fmt"
 	"io/ioutil"
 	"os"
 	"path/filepath"
@@ -22,6 +21,10 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+const (
+	walTestFlushInterval = time.Duration(100) * time.Millisecond
+)
+
 func TestWALTruncate(t *testing.T) {
 	walDir, err := ioutil.TempDir("", "wal")
 	require.NoError(t, err)
@@ -57,9 +60,9 @@ func TestWALTruncate(t *testing.T) {
 
 	h := int64(50)
 	gr, found, err := wal.SearchForEndHeight(h, &WALSearchOptions{})
-	assert.NoError(t, err, fmt.Sprintf("expected not to err on height %d", h))
-	assert.True(t, found, fmt.Sprintf("expected to find end height for %d", h))
-	assert.NotNil(t, gr, "expected group not to be nil")
+	assert.NoError(t, err, "expected not to err on height %d", h)
+	assert.True(t, found, "expected to find end height for %d", h)
+	assert.NotNil(t, gr)
 	defer gr.Close()
 
 	dec := NewWALDecoder(gr)
@@ -67,7 +70,7 @@ func TestWALTruncate(t *testing.T) {
 	assert.NoError(t, err, "expected to decode a message")
 	rs, ok := msg.Msg.(tmtypes.EventDataRoundState)
 	assert.True(t, ok, "expected message of type EventDataRoundState")
-	assert.Equal(t, rs.Height, h+1, fmt.Sprintf("wrong height"))
+	assert.Equal(t, rs.Height, h+1, "wrong height")
 }
 
 func TestWALEncoderDecoder(t *testing.T) {
@@ -128,9 +131,9 @@ func TestWALSearchForEndHeight(t *testing.T) {
 
 	h := int64(3)
 	gr, found, err := wal.SearchForEndHeight(h, &WALSearchOptions{})
-	assert.NoError(t, err, fmt.Sprintf("expected not to err on height %d", h))
-	assert.True(t, found, fmt.Sprintf("expected to find end height for %d", h))
-	assert.NotNil(t, gr, "expected group not to be nil")
+	assert.NoError(t, err, "expected not to err on height %d", h)
+	assert.True(t, found, "expected to find end height for %d", h)
+	assert.NotNil(t, gr)
 	defer gr.Close()
 
 	dec := NewWALDecoder(gr)
@@ -138,7 +141,46 @@ func TestWALSearchForEndHeight(t *testing.T) {
 	assert.NoError(t, err, "expected to decode a message")
 	rs, ok := msg.Msg.(tmtypes.EventDataRoundState)
 	assert.True(t, ok, "expected message of type EventDataRoundState")
-	assert.Equal(t, rs.Height, h+1, fmt.Sprintf("wrong height"))
+	assert.Equal(t, rs.Height, h+1, "wrong height")
+}
+
+func TestWALPeriodicSync(t *testing.T) {
+	walDir, err := ioutil.TempDir("", "wal")
+	require.NoError(t, err)
+	defer os.RemoveAll(walDir)
+
+	walFile := filepath.Join(walDir, "wal")
+	wal, err := NewWAL(walFile, autofile.GroupCheckDuration(1*time.Millisecond))
+	require.NoError(t, err)
+
+	wal.SetFlushInterval(walTestFlushInterval)
+	wal.SetLogger(log.TestingLogger())
+
+	require.NoError(t, wal.Start())
+	defer func() {
+		wal.Stop()
+		wal.Wait()
+	}()
+
+	err = WALGenerateNBlocks(t, wal.Group(), 5)
+	require.NoError(t, err)
+
+	// We should have data in the buffer now
+	assert.NotZero(t, wal.Group().Buffered())
+
+	time.Sleep(walTestFlushInterval + (10 * time.Millisecond))
+
+	// The data should have been flushed by the periodic sync
+	assert.Zero(t, wal.Group().Buffered())
+
+	h := int64(4)
+	gr, found, err := wal.SearchForEndHeight(h, &WALSearchOptions{})
+	assert.NoError(t, err, "expected not to err on height %d", h)
+	assert.True(t, found, "expected to find end height for %d", h)
+	assert.NotNil(t, gr)
+	if gr != nil {
+		gr.Close()
+	}
 }
 
 /*
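A note on the fmt.Sprintf removals in the assertions above: testify's helpers take variadic msgAndArgs, and when more than one trailing argument is passed, the first is treated as a printf-style format string. So these two lines are equivalent, and the second lets the fmt import be dropped:

    assert.NoError(t, err, fmt.Sprintf("expected not to err on height %d", h))
    assert.NoError(t, err, "expected not to err on height %d", h)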
diff --git a/libs/autofile/group.go b/libs/autofile/group.go
index 7e9269461..cafbb6d7e 100644
--- a/libs/autofile/group.go
+++ b/libs/autofile/group.go
@@ -209,6 +209,13 @@ func (g *Group) WriteLine(line string) error {
 	return err
 }
 
+// Buffered returns the size of the currently buffered data.
+func (g *Group) Buffered() int {
+	g.mtx.Lock()
+	defer g.mtx.Unlock()
+	return g.headBuf.Buffered()
+}
+
 // Flush writes any buffered data to the underlying file and commits the
 // current content of the file to stable storage.
 func (g *Group) Flush() error {
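Finally, a sketch of how Buffered pairs with Flush at the Group level, mirroring what TestWALPeriodicSync asserts (assumes an already-started *Group; WriteLine is the existing method shown in the hunk above):

    // Sketch: verify that Flush drains the group's in-memory buffer.
    func flushAndCheck(g *autofile.Group) error {
        if err := g.WriteLine("some message"); err != nil {
            return err
        }
        before := g.Buffered() // > 0: bytes still in the head buffer, not on disk
        if err := g.Flush(); err != nil { // drains the buffer and syncs the file
            return err
        }
        after := g.Buffered() // 0 after a successful flush
        _, _ = before, after
        return nil
    }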