From 110b07fb3fe7d9930f066ce2b46335f53673c11e Mon Sep 17 00:00:00 2001 From: goolAdapter <267310165@qq.com> Date: Tue, 25 Sep 2018 19:22:45 +0800 Subject: [PATCH] libs: Call Flush() before rename #2428 (#2439) * fix Group.RotateFile need call Flush() before rename. #2428 * fix some review issue. #2428 refactor Group's config: replace setting member with initial option * fix a handwriting mistake * fix a time window error between rename and write. * fix a syntax mistake. * change option name Get_ to With_ * fix review issue * fix review issue --- CHANGELOG_PENDING.md | 1 + consensus/wal.go | 4 +- consensus/wal_generator.go | 38 ++++++++----- consensus/wal_test.go | 50 +++++++++++++++++ libs/autofile/cmd/logjack.go | 5 +- libs/autofile/group.go | 105 ++++++++++++++++++++++------------- libs/autofile/group_test.go | 4 +- 7 files changed, 146 insertions(+), 61 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 3893cc4cf..26a31461f 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -18,4 +18,5 @@ IMPROVEMENTS: - [p2p] [\#2169](https://github.com/cosmos/cosmos-sdk/issues/2169) add additional metrics BUG FIXES: +- [autofile] \#2428 Group.RotateFile need call Flush() before rename (@goolAdapter) - [node] \#2434 Make node respond to signal interrupts while sleeping for genesis time diff --git a/consensus/wal.go b/consensus/wal.go index 10bef542b..6472c2573 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -73,13 +73,13 @@ type baseWAL struct { enc *WALEncoder } -func NewWAL(walFile string) (*baseWAL, error) { +func NewWAL(walFile string, groupOptions ...func(*auto.Group)) (*baseWAL, error) { err := cmn.EnsureDir(filepath.Dir(walFile), 0700) if err != nil { return nil, errors.Wrap(err, "failed to ensure WAL directory is in place") } - group, err := auto.OpenGroup(walFile) + group, err := auto.OpenGroup(walFile, groupOptions...) 
if err != nil { return nil, err } diff --git a/consensus/wal_generator.go b/consensus/wal_generator.go index cdb667edf..980a44892 100644 --- a/consensus/wal_generator.go +++ b/consensus/wal_generator.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "fmt" + "io" "os" "path/filepath" "strings" @@ -23,12 +24,11 @@ import ( "github.com/tendermint/tendermint/types" ) -// WALWithNBlocks generates a consensus WAL. It does this by spining up a +// WALGenerateNBlocks generates a consensus WAL. It does this by spinning up a // stripped down version of node (proxy app, event bus, consensus state) with a // persistent kvstore application and special consensus wal instance -// (byteBufferWAL) and waits until numBlocks are created. Then it returns a WAL -// content. If the node fails to produce given numBlocks, it returns an error. -func WALWithNBlocks(numBlocks int) (data []byte, err error) { +// (byteBufferWAL) and waits until numBlocks are created. The WAL content is written to wr. If the node fails to produce the given numBlocks, it returns an error. 
+func WALGenerateNBlocks(wr io.Writer, numBlocks int) (err error) { config := getConfig() app := kvstore.NewPersistentKVStoreApplication(filepath.Join(config.DBDir(), "wal_generator")) @@ -43,26 +43,26 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) { privValidator := privval.LoadOrGenFilePV(privValidatorFile) genDoc, err := types.GenesisDocFromFile(config.GenesisFile()) if err != nil { - return nil, errors.Wrap(err, "failed to read genesis file") + return errors.Wrap(err, "failed to read genesis file") } stateDB := db.NewMemDB() blockStoreDB := db.NewMemDB() state, err := sm.MakeGenesisState(genDoc) if err != nil { - return nil, errors.Wrap(err, "failed to make genesis state") + return errors.Wrap(err, "failed to make genesis state") } blockStore := bc.NewBlockStore(blockStoreDB) proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app)) proxyApp.SetLogger(logger.With("module", "proxy")) if err := proxyApp.Start(); err != nil { - return nil, errors.Wrap(err, "failed to start proxy app connections") + return errors.Wrap(err, "failed to start proxy app connections") } defer proxyApp.Stop() eventBus := types.NewEventBus() eventBus.SetLogger(logger.With("module", "events")) if err := eventBus.Start(); err != nil { - return nil, errors.Wrap(err, "failed to start event bus") + return errors.Wrap(err, "failed to start event bus") } defer eventBus.Stop() mempool := sm.MockMempool{} @@ -78,8 +78,6 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) { ///////////////////////////////////////////////////////////////////////////// // set consensus wal to buffered WAL, which will write all incoming msgs to buffer - var b bytes.Buffer - wr := bufio.NewWriter(&b) numBlocksWritten := make(chan struct{}) wal := newByteBufferWAL(logger, NewWALEncoder(wr), int64(numBlocks), numBlocksWritten) // see wal.go#103 @@ -87,20 +85,32 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) { consensusState.wal = wal if err := consensusState.Start(); err 
!= nil { - return nil, errors.Wrap(err, "failed to start consensus state") + return errors.Wrap(err, "failed to start consensus state") } select { case <-numBlocksWritten: consensusState.Stop() - wr.Flush() - return b.Bytes(), nil + return nil case <-time.After(1 * time.Minute): consensusState.Stop() - return []byte{}, fmt.Errorf("waited too long for tendermint to produce %d blocks (grep logs for `wal_generator`)", numBlocks) + return fmt.Errorf("waited too long for tendermint to produce %d blocks (grep logs for `wal_generator`)", numBlocks) } } +// WALWithNBlocks returns the content of a WAL containing numBlocks blocks. +func WALWithNBlocks(numBlocks int) (data []byte, err error) { + var b bytes.Buffer + wr := bufio.NewWriter(&b) + + if err := WALGenerateNBlocks(wr, numBlocks); err != nil { + return []byte{}, err + } + + wr.Flush() + return b.Bytes(), nil +} + // f**ing long, but unique for each test func makePathname() string { // get path diff --git a/consensus/wal_test.go b/consensus/wal_test.go index e5744c0a1..c45f6acee 100644 --- a/consensus/wal_test.go +++ b/consensus/wal_test.go @@ -4,11 +4,16 @@ import ( "bytes" "crypto/rand" "fmt" + "io/ioutil" + "os" + "path/filepath" + // "sync" "testing" "time" "github.com/tendermint/tendermint/consensus/types" + "github.com/tendermint/tendermint/libs/autofile" tmtypes "github.com/tendermint/tendermint/types" tmtime "github.com/tendermint/tendermint/types/time" @@ -16,6 +21,51 @@ import ( "github.com/stretchr/testify/require" ) +func TestWALTruncate(t *testing.T) { + walDir, err := ioutil.TempDir("", "wal") + if err != nil { + panic(fmt.Errorf("failed to create temp WAL file: %v", err)) + } + defer os.RemoveAll(walDir) + + walFile := filepath.Join(walDir, "wal") + + // A head size limit of 4K lets content get truncated when RotateFile runs; defaultHeadSizeLimit (10M) is hard to simulate. + // A check duration of 1 * time.Millisecond makes the RotateFile check run frequently; defaultGroupCheckDuration (5s) is hard to simulate. 
+ wal, err := NewWAL(walFile, autofile.GroupHeadSizeLimit(4096), autofile.GroupCheckDuration(1*time.Millisecond)) + if err != nil { + t.Fatal(err) + } + + wal.Start() + defer wal.Stop() + + // 60 blocks are nearly 70K in size, greater than the group's headBuf size (4096 * 10); when headBuf is full, truncated content is flushed to the file. + // If RotateFile is called at that time, truncated content exists in each file. + err = WALGenerateNBlocks(wal.Group(), 60) + if err != nil { + t.Fatal(err) + } + + time.Sleep(1 * time.Millisecond) // wait for groupCheckDuration to make sure RotateFile runs + + wal.Group().Flush() + + h := int64(50) + gr, found, err := wal.SearchForEndHeight(h, &WALSearchOptions{}) + assert.NoError(t, err, fmt.Sprintf("expected not to err on height %d", h)) + assert.True(t, found, fmt.Sprintf("expected to find end height for %d", h)) + assert.NotNil(t, gr, "expected group not to be nil") + defer gr.Close() + + dec := NewWALDecoder(gr) + msg, err := dec.Decode() + assert.NoError(t, err, "expected to decode a message") + rs, ok := msg.Msg.(tmtypes.EventDataRoundState) + assert.True(t, ok, "expected message of type EventDataRoundState") + assert.Equal(t, rs.Height, h+1, fmt.Sprintf("wrong height")) +} + func TestWALEncoderDecoder(t *testing.T) { now := tmtime.Now() msgs := []TimedWALMessage{ diff --git a/libs/autofile/cmd/logjack.go b/libs/autofile/cmd/logjack.go index 17b482bed..ead3f8305 100644 --- a/libs/autofile/cmd/logjack.go +++ b/libs/autofile/cmd/logjack.go @@ -39,13 +39,12 @@ func main() { } // Open Group - group, err := auto.OpenGroup(headPath) + group, err := auto.OpenGroup(headPath, auto.GroupHeadSizeLimit(chopSize), auto.GroupTotalSizeLimit(limitSize)) if err != nil { fmt.Printf("logjack couldn't create output file %v\n", headPath) os.Exit(1) } - group.SetHeadSizeLimit(chopSize) - group.SetTotalSizeLimit(limitSize) + err = group.Start() if err != nil { fmt.Printf("logjack couldn't start with file %v\n", headPath) diff --git a/libs/autofile/group.go 
b/libs/autofile/group.go index 286447cda..807f7e1ed 100644 --- a/libs/autofile/group.go +++ b/libs/autofile/group.go @@ -19,10 +19,10 @@ import ( ) const ( - groupCheckDuration = 5000 * time.Millisecond - defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB - defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB - maxFilesToRemove = 4 // needs to be greater than 1 + defaultGroupCheckDuration = 5000 * time.Millisecond + defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB + defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB + maxFilesToRemove = 4 // needs to be greater than 1 ) /* @@ -56,16 +56,17 @@ assuming that marker lines are written occasionally. type Group struct { cmn.BaseService - ID string - Head *AutoFile // The head AutoFile to write to - headBuf *bufio.Writer - Dir string // Directory that contains .Head - ticker *time.Ticker - mtx sync.Mutex - headSizeLimit int64 - totalSizeLimit int64 - minIndex int // Includes head - maxIndex int // Includes head, where Head will move to + ID string + Head *AutoFile // The head AutoFile to write to + headBuf *bufio.Writer + Dir string // Directory that contains .Head + ticker *time.Ticker + mtx sync.Mutex + headSizeLimit int64 + totalSizeLimit int64 + groupCheckDuration time.Duration + minIndex int // Includes head + maxIndex int // Includes head, where Head will move to // TODO: When we start deleting files, we need to start tracking GroupReaders // and their dependencies. @@ -73,7 +74,7 @@ type Group struct { // OpenGroup creates a new Group with head at headPath. It returns an error if // it fails to open head file. 
-func OpenGroup(headPath string) (g *Group, err error) { +func OpenGroup(headPath string, groupOptions ...func(*Group)) (g *Group, err error) { dir := path.Dir(headPath) head, err := OpenAutoFile(headPath) if err != nil { @@ -81,15 +82,21 @@ func OpenGroup(headPath string) (g *Group, err error) { } g = &Group{ - ID: "group:" + head.ID, - Head: head, - headBuf: bufio.NewWriterSize(head, 4096*10), - Dir: dir, - headSizeLimit: defaultHeadSizeLimit, - totalSizeLimit: defaultTotalSizeLimit, - minIndex: 0, - maxIndex: 0, + ID: "group:" + head.ID, + Head: head, + headBuf: bufio.NewWriterSize(head, 4096*10), + Dir: dir, + headSizeLimit: defaultHeadSizeLimit, + totalSizeLimit: defaultTotalSizeLimit, + groupCheckDuration: defaultGroupCheckDuration, + minIndex: 0, + maxIndex: 0, } + + for _, option := range groupOptions { + option(g) + } + g.BaseService = *cmn.NewBaseService(nil, "Group", g) gInfo := g.readGroupInfo() @@ -98,10 +105,31 @@ func OpenGroup(headPath string) (g *Group, err error) { return } +// GroupCheckDuration allows you to overwrite default groupCheckDuration. +func GroupCheckDuration(duration time.Duration) func(*Group) { + return func(g *Group) { + g.groupCheckDuration = duration + } +} + +// GroupHeadSizeLimit allows you to overwrite default head size limit - 10MB. +func GroupHeadSizeLimit(limit int64) func(*Group) { + return func(g *Group) { + g.headSizeLimit = limit + } +} + +// GroupTotalSizeLimit allows you to overwrite default total size limit of the group - 1GB. +func GroupTotalSizeLimit(limit int64) func(*Group) { + return func(g *Group) { + g.totalSizeLimit = limit + } +} + // OnStart implements Service by starting the goroutine that checks file and // group limits. 
func (g *Group) OnStart() error { - g.ticker = time.NewTicker(groupCheckDuration) + g.ticker = time.NewTicker(g.groupCheckDuration) go g.processTicks() return nil } @@ -122,13 +150,6 @@ func (g *Group) Close() { g.mtx.Unlock() } -// SetHeadSizeLimit allows you to overwrite default head size limit - 10MB. -func (g *Group) SetHeadSizeLimit(limit int64) { - g.mtx.Lock() - g.headSizeLimit = limit - g.mtx.Unlock() -} - // HeadSizeLimit returns the current head size limit. func (g *Group) HeadSizeLimit() int64 { g.mtx.Lock() @@ -136,14 +157,6 @@ func (g *Group) HeadSizeLimit() int64 { return g.headSizeLimit } -// SetTotalSizeLimit allows you to overwrite default total size limit of the -// group - 1GB. -func (g *Group) SetTotalSizeLimit(limit int64) { - g.mtx.Lock() - g.totalSizeLimit = limit - g.mtx.Unlock() -} - // TotalSizeLimit returns total size limit of the group. func (g *Group) TotalSizeLimit() int64 { g.mtx.Lock() @@ -266,6 +279,14 @@ func (g *Group) RotateFile() { headPath := g.Head.Path + if err := g.headBuf.Flush(); err != nil { + panic(err) // panic for consistency with the error handling below + } + + if err := g.Head.Sync(); err != nil { + panic(err) + } + if err := g.Head.closeFile(); err != nil { panic(err) } @@ -275,6 +296,12 @@ func (g *Group) RotateFile() { panic(err) } + // Make sure the head file exists: there is a time window between the rename and the next write + // in which NewReader(maxIndex) would fail with "open /tmp/wal058868562/wal: no such file or directory". + if err := g.Head.openFile(); err != nil { + panic(err) + } + g.maxIndex++ } diff --git a/libs/autofile/group_test.go b/libs/autofile/group_test.go index d87bdba82..e173e4996 100644 --- a/libs/autofile/group_test.go +++ b/libs/autofile/group_test.go @@ -23,12 +23,10 @@ func createTestGroupWithHeadSizeLimit(t *testing.T, headSizeLimit int64) *Group require.NoError(t, err, "Error creating dir") headPath := testDir + "/myfile" - g, err := OpenGroup(headPath) + g, err := OpenGroup(headPath, GroupHeadSizeLimit(headSizeLimit)) 
require.NoError(t, err, "Error opening Group") require.NotEqual(t, nil, g, "Failed to create Group") - g.SetHeadSizeLimit(headSizeLimit) - return g }