From ec9bff52346d9629f690147674326a41fcefd20f Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 25 Feb 2019 09:11:07 +0400 Subject: [PATCH] rename WAL#Flush to WAL#FlushAndSync (#3345) * rename WAL#Flush to WAL#FlushAndSync - rename auto#Flush to auto#FlushAndSync - cleanup WAL interface to not leak implementation details! * remove Group() * add WALReader interface and return it in SearchForEndHeight() - add interface assertions Refs #3337 * replace WALReader with io.ReadCloser --- consensus/replay_test.go | 9 ++++---- consensus/state.go | 4 ++-- consensus/wal.go | 41 ++++++++++++++++++++++------------- consensus/wal_generator.go | 9 +++----- consensus/wal_test.go | 13 ++++++----- libs/autofile/cmd/logjack.go | 2 +- libs/autofile/group.go | 18 +++++++++------- libs/autofile/group_test.go | 42 ++++++++++++++++++------------------ 8 files changed, 76 insertions(+), 62 deletions(-) diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 74a6e0fbe..86dca7657 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -19,7 +19,6 @@ import ( abci "github.com/tendermint/tendermint/abci/types" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto" - auto "github.com/tendermint/tendermint/libs/autofile" dbm "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/privval" @@ -201,6 +200,8 @@ type crashingWAL struct { lastPanickedForMsgIndex int // last message for which we panicked } +var _ WAL = &crashingWAL{} + // WALWriteError indicates a WAL crash. type WALWriteError struct { msg string @@ -248,15 +249,15 @@ func (w *crashingWAL) WriteSync(m WALMessage) { w.Write(m) } -func (w *crashingWAL) Group() *auto.Group { return w.next.Group() } -func (w *crashingWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) { +func (w *crashingWAL) FlushAndSync() error { return w.next.FlushAndSync() } + +func (w *crashingWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) { return w.next.SearchForEndHeight(height, options) } func (w *crashingWAL) Start() error { return w.next.Start() } func (w *crashingWAL) Stop() error { return w.next.Stop() } func (w *crashingWAL) Wait() { w.next.Wait() } -func (w *crashingWAL) Flush() error { return w.Group().Flush() } //------------------------------------------------------------------------------------------ // Handshake Tests diff --git a/consensus/state.go b/consensus/state.go index 8ae17ea6c..865cd553f 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -910,7 +910,7 @@ func (cs *ConsensusState) defaultDecideProposal(height int64, round int) { } // Flush the WAL. Otherwise, we may not recompute the same proposal to sign, and the privValidator will refuse to sign anything. - cs.wal.Flush() + cs.wal.FlushAndSync() // Make proposal propBlockId := types.BlockID{Hash: block.Hash(), PartsHeader: blockParts.Header()} @@ -1678,7 +1678,7 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool, func (cs *ConsensusState) signVote(type_ types.SignedMsgType, hash []byte, header types.PartSetHeader) (*types.Vote, error) { // Flush the WAL. Otherwise, we may not recompute the same vote to sign, and the privValidator will refuse to sign anything. - cs.wal.Flush() + cs.wal.FlushAndSync() addr := cs.privValidator.GetPubKey().Address() valIndex, _ := cs.Validators.GetByAddress(addr) diff --git a/consensus/wal.go b/consensus/wal.go index 2d544857d..c63c6b940 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -57,10 +57,11 @@ func RegisterWALMessages(cdc *amino.Codec) { type WAL interface { Write(WALMessage) WriteSync(WALMessage) - Group() *auto.Group - SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) - Flush() error + FlushAndSync() error + SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) + + // service methods Start() error Stop() error Wait() @@ -82,6 +83,8 @@ type baseWAL struct { flushInterval time.Duration } +var _ WAL = &baseWAL{} + // NewWAL returns a new write-ahead logger based on `baseWAL`, which implements // WAL. It's flushed and synced to disk every 2s and once when stopped. func NewWAL(walFile string, groupOptions ...func(*auto.Group)) (*baseWAL, error) { @@ -125,16 +128,19 @@ func (wal *baseWAL) OnStart() error { wal.WriteSync(EndHeightMessage{0}) } err = wal.group.Start() + if err != nil { + return err + } wal.flushTicker = time.NewTicker(wal.flushInterval) go wal.processFlushTicks() - return err + return nil } func (wal *baseWAL) processFlushTicks() { for { select { case <-wal.flushTicker.C: - if err := wal.Flush(); err != nil { + if err := wal.FlushAndSync(); err != nil { wal.Logger.Error("Periodic WAL flush failed", "err", err) } case <-wal.Quit(): @@ -143,9 +149,10 @@ func (wal *baseWAL) processFlushTicks() { } } -// Flush will attempt to flush and fsync the underlying group's data to disk. -func (wal *baseWAL) Flush() error { - return wal.group.Flush() +// FlushAndSync flushes and fsync's the underlying group's data to disk. +// See auto#FlushAndSync +func (wal *baseWAL) FlushAndSync() error { + return wal.group.FlushAndSync() } // Stop the underlying autofile group. @@ -153,7 +160,7 @@ func (wal *baseWAL) Flush() error { // before cleaning up files. func (wal *baseWAL) OnStop() { wal.flushTicker.Stop() - wal.Flush() + wal.FlushAndSync() wal.group.Stop() wal.group.Close() } @@ -187,7 +194,7 @@ func (wal *baseWAL) WriteSync(msg WALMessage) { } wal.Write(msg) - if err := wal.Flush(); err != nil { + if err := wal.FlushAndSync(); err != nil { panic(fmt.Sprintf("Error flushing consensus wal buf to file. Error: %v \n", err)) } } @@ -203,8 +210,11 @@ type WALSearchOptions struct { // Group reader will be nil if found equals false. // // CONTRACT: caller must close group reader. -func (wal *baseWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) { - var msg *TimedWALMessage +func (wal *baseWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) { + var ( + msg *TimedWALMessage + gr *auto.GroupReader + ) lastHeightFound := int64(-1) // NOTE: starting from the last file in the group because we're usually @@ -371,13 +381,14 @@ func (dec *WALDecoder) Decode() (*TimedWALMessage, error) { type nilWAL struct{} +var _ WAL = nilWAL{} + func (nilWAL) Write(m WALMessage) {} func (nilWAL) WriteSync(m WALMessage) {} -func (nilWAL) Group() *auto.Group { return nil } -func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) { +func (nilWAL) FlushAndSync() error { return nil } +func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) { return nil, false, nil } func (nilWAL) Start() error { return nil } func (nilWAL) Stop() error { return nil } func (nilWAL) Wait() {} -func (nilWAL) Flush() error { return nil } diff --git a/consensus/wal_generator.go b/consensus/wal_generator.go index 9375d5dcc..1a4cfb9ff 100644 --- a/consensus/wal_generator.go +++ b/consensus/wal_generator.go @@ -13,7 +13,6 @@ import ( "github.com/tendermint/tendermint/abci/example/kvstore" bc "github.com/tendermint/tendermint/blockchain" cfg "github.com/tendermint/tendermint/config" - auto "github.com/tendermint/tendermint/libs/autofile" cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/libs/log" @@ -192,14 +191,12 @@ func (w *byteBufferWAL) WriteSync(m WALMessage) { w.Write(m) } -func (w *byteBufferWAL) Group() *auto.Group { - panic("not implemented") -} -func (w *byteBufferWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) { +func (w *byteBufferWAL) FlushAndSync() error { return nil } + +func (w *byteBufferWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) { return nil, false, nil } func (w *byteBufferWAL) Start() error { return nil } func (w *byteBufferWAL) Stop() error { return nil } func (w *byteBufferWAL) Wait() {} -func (w *byteBufferWAL) Flush() error { return nil } diff --git a/consensus/wal_test.go b/consensus/wal_test.go index 0bb750567..5cb73fb7f 100644 --- a/consensus/wal_test.go +++ b/consensus/wal_test.go @@ -32,8 +32,10 @@ func TestWALTruncate(t *testing.T) { walFile := filepath.Join(walDir, "wal") - //this magic number 4K can truncate the content when RotateFile. defaultHeadSizeLimit(10M) is hard to simulate. - //this magic number 1 * time.Millisecond make RotateFile check frequently. defaultGroupCheckDuration(5s) is hard to simulate. + // this magic number 4K can truncate the content when RotateFile. + // defaultHeadSizeLimit(10M) is hard to simulate. + // this magic number 1 * time.Millisecond make RotateFile check frequently. + // defaultGroupCheckDuration(5s) is hard to simulate. wal, err := NewWAL(walFile, autofile.GroupHeadSizeLimit(4096), autofile.GroupCheckDuration(1*time.Millisecond), @@ -49,14 +51,15 @@ func TestWALTruncate(t *testing.T) { wal.Wait() }() - //60 block's size nearly 70K, greater than group's headBuf size(4096 * 10), when headBuf is full, truncate content will Flush to the file. - //at this time, RotateFile is called, truncate content exist in each file. + // 60 block's size nearly 70K, greater than group's headBuf size(4096 * 10), + // when headBuf is full, truncate content will Flush to the file. at this + // time, RotateFile is called, truncate content exist in each file. err = WALGenerateNBlocks(t, wal.Group(), 60) require.NoError(t, err) time.Sleep(1 * time.Millisecond) //wait groupCheckDuration, make sure RotateFile run - wal.Group().Flush() + wal.FlushAndSync() h := int64(50) gr, found, err := wal.SearchForEndHeight(h, &WALSearchOptions{}) diff --git a/libs/autofile/cmd/logjack.go b/libs/autofile/cmd/logjack.go index e1bd5167d..20002eff3 100644 --- a/libs/autofile/cmd/logjack.go +++ b/libs/autofile/cmd/logjack.go @@ -70,7 +70,7 @@ func main() { for { n, err := os.Stdin.Read(buf) group.Write(buf[:n]) - group.Flush() + group.FlushAndSync() if err != nil { group.Stop() if err == io.EOF { diff --git a/libs/autofile/group.go b/libs/autofile/group.go index cafbb6d7e..d1ea0de75 100644 --- a/libs/autofile/group.go +++ b/libs/autofile/group.go @@ -131,21 +131,23 @@ func GroupTotalSizeLimit(limit int64) func(*Group) { } } -// OnStart implements Service by starting the goroutine that checks file and -// group limits. +// OnStart implements cmn.Service by starting the goroutine that checks file +// and group limits. func (g *Group) OnStart() error { g.ticker = time.NewTicker(g.groupCheckDuration) go g.processTicks() return nil } -// OnStop implements Service by stopping the goroutine described above. +// OnStop implements cmn.Service by stopping the goroutine described above. // NOTE: g.Head must be closed separately using Close. func (g *Group) OnStop() { g.ticker.Stop() - g.Flush() // flush any uncommitted data + g.FlushAndSync() } +// Wait blocks until all internal goroutines are finished. Supposed to be +// called after Stop. func (g *Group) Wait() { // wait for processTicks routine to finish <-g.doneProcessTicks @@ -153,7 +155,7 @@ func (g *Group) Wait() { // Close closes the head file. The group must be stopped by this moment. func (g *Group) Close() { - g.Flush() // flush any uncommitted data + g.FlushAndSync() g.mtx.Lock() _ = g.Head.closeFile() @@ -216,9 +218,9 @@ func (g *Group) Buffered() int { return g.headBuf.Buffered() } -// Flush writes any buffered data to the underlying file and commits the -// current content of the file to stable storage. -func (g *Group) Flush() error { +// FlushAndSync writes any buffered data to the underlying file and commits the +// current content of the file to stable storage (fsync). +func (g *Group) FlushAndSync() error { g.mtx.Lock() defer g.mtx.Unlock() err := g.headBuf.Flush() diff --git a/libs/autofile/group_test.go b/libs/autofile/group_test.go index e173e4996..68870df87 100644 --- a/libs/autofile/group_test.go +++ b/libs/autofile/group_test.go @@ -55,7 +55,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { err := g.WriteLine(cmn.RandStr(999)) require.NoError(t, err, "Error appending to head") } - g.Flush() + g.FlushAndSync() assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 999000, 999000) // Even calling checkHeadSizeLimit manually won't rotate it. @@ -65,7 +65,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { // Write 1000 more bytes. err := g.WriteLine(cmn.RandStr(999)) require.NoError(t, err, "Error appending to head") - g.Flush() + g.FlushAndSync() // Calling checkHeadSizeLimit this time rolls it. g.checkHeadSizeLimit() @@ -74,7 +74,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { // Write 1000 more bytes. err = g.WriteLine(cmn.RandStr(999)) require.NoError(t, err, "Error appending to head") - g.Flush() + g.FlushAndSync() // Calling checkHeadSizeLimit does nothing. g.checkHeadSizeLimit() @@ -85,7 +85,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { err = g.WriteLine(cmn.RandStr(999)) require.NoError(t, err, "Error appending to head") } - g.Flush() + g.FlushAndSync() assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 2000000, 1000000) // Calling checkHeadSizeLimit rolls it again. @@ -95,7 +95,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { // Write 1000 more bytes. _, err = g.Head.Write([]byte(cmn.RandStr(999) + "\n")) require.NoError(t, err, "Error appending to head") - g.Flush() + g.FlushAndSync() assertGroupInfo(t, g.ReadGroupInfo(), 0, 2, 2001000, 1000) // Calling checkHeadSizeLimit does nothing. @@ -212,12 +212,12 @@ func TestRotateFile(t *testing.T) { g.WriteLine("Line 1") g.WriteLine("Line 2") g.WriteLine("Line 3") - g.Flush() + g.FlushAndSync() g.RotateFile() g.WriteLine("Line 4") g.WriteLine("Line 5") g.WriteLine("Line 6") - g.Flush() + g.FlushAndSync() // Read g.Head.Path+"000" body1, err := ioutil.ReadFile(g.Head.Path + ".000") @@ -244,13 +244,13 @@ func TestFindLast1(t *testing.T) { g.WriteLine("Line 2") g.WriteLine("# a") g.WriteLine("Line 3") - g.Flush() + g.FlushAndSync() g.RotateFile() g.WriteLine("Line 4") g.WriteLine("Line 5") g.WriteLine("Line 6") g.WriteLine("# b") - g.Flush() + g.FlushAndSync() match, found, err := g.FindLast("#") assert.NoError(t, err) @@ -267,14 +267,14 @@ func TestFindLast2(t *testing.T) { g.WriteLine("Line 1") g.WriteLine("Line 2") g.WriteLine("Line 3") - g.Flush() + g.FlushAndSync() g.RotateFile() g.WriteLine("# a") g.WriteLine("Line 4") g.WriteLine("Line 5") g.WriteLine("# b") g.WriteLine("Line 6") - g.Flush() + g.FlushAndSync() match, found, err := g.FindLast("#") assert.NoError(t, err) @@ -293,12 +293,12 @@ func TestFindLast3(t *testing.T) { g.WriteLine("Line 2") g.WriteLine("# b") g.WriteLine("Line 3") - g.Flush() + g.FlushAndSync() g.RotateFile() g.WriteLine("Line 4") g.WriteLine("Line 5") g.WriteLine("Line 6") - g.Flush() + g.FlushAndSync() match, found, err := g.FindLast("#") assert.NoError(t, err) @@ -315,12 +315,12 @@ func TestFindLast4(t *testing.T) { g.WriteLine("Line 1") g.WriteLine("Line 2") g.WriteLine("Line 3") - g.Flush() + g.FlushAndSync() g.RotateFile() g.WriteLine("Line 4") g.WriteLine("Line 5") g.WriteLine("Line 6") - g.Flush() + g.FlushAndSync() match, found, err := g.FindLast("#") assert.NoError(t, err) @@ -336,7 +336,7 @@ func TestWrite(t *testing.T) { written := []byte("Medusa") g.Write(written) - g.Flush() + g.FlushAndSync() read := make([]byte, len(written)) gr, err := g.NewReader(0) @@ -357,11 +357,11 @@ func TestGroupReaderRead(t *testing.T) { professor := []byte("Professor Monster") g.Write(professor) - g.Flush() + g.FlushAndSync() g.RotateFile() frankenstein := []byte("Frankenstein's Monster") g.Write(frankenstein) - g.Flush() + g.FlushAndSync() totalWrittenLength := len(professor) + len(frankenstein) read := make([]byte, totalWrittenLength) @@ -386,12 +386,12 @@ func TestGroupReaderRead2(t *testing.T) { professor := []byte("Professor Monster") g.Write(professor) - g.Flush() + g.FlushAndSync() g.RotateFile() frankenstein := []byte("Frankenstein's Monster") frankensteinPart := []byte("Frankenstein") g.Write(frankensteinPart) // note writing only a part - g.Flush() + g.FlushAndSync() totalLength := len(professor) + len(frankenstein) read := make([]byte, totalLength) @@ -427,7 +427,7 @@ func TestMaxIndex(t *testing.T) { assert.Zero(t, g.MaxIndex(), "MaxIndex should be zero at the beginning") g.WriteLine("Line 1") - g.Flush() + g.FlushAndSync() g.RotateFile() assert.Equal(t, 1, g.MaxIndex(), "MaxIndex should point to the last file")