From a528af55d3c8354f676b4a5f718ab51d9b9fbb9f Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Mon, 21 Nov 2016 19:09:14 -0800 Subject: [PATCH] Group is a BaseService; TotalSizeLimit enforced; tests fixed --- group.go | 117 ++++++++++++++++++++++++++++++++++++++------------ group_test.go | 27 ++++++++---- 2 files changed, 108 insertions(+), 36 deletions(-) diff --git a/group.go b/group.go index 1c9842496..a2584dfc7 100644 --- a/group.go +++ b/group.go @@ -3,7 +3,9 @@ package autofile import ( "bufio" "errors" + "fmt" "io" + "log" "os" "path" "path/filepath" @@ -45,10 +47,14 @@ The Group can also be used to binary-search for some line, assuming that marker lines are written occasionally. */ -const groupCheckDuration = 1000 * time.Millisecond -const defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB +const groupCheckDuration = 5000 * time.Millisecond +const defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB +const defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB +const maxFilesToRemove = 4 // needs to be greater than 1 type Group struct { + BaseService + ID string Head *AutoFile // The head AutoFile to write to headBuf *bufio.Writer @@ -64,23 +70,43 @@ type Group struct { // and their dependencies. } -func OpenGroup(head *AutoFile) (g *Group, err error) { - dir := path.Dir(head.Path) +func OpenGroup(headPath string) (g *Group, err error) { + + dir := path.Dir(headPath) + head, err := OpenAutoFile(headPath) + if err != nil { + return nil, err + } g = &Group{ - ID: "group:" + head.ID, - Head: head, - headBuf: bufio.NewWriterSize(head, 4096*10), - Dir: dir, - ticker: time.NewTicker(groupCheckDuration), - headSizeLimit: defaultHeadSizeLimit, - minIndex: 0, - maxIndex: 0, + ID: "group:" + head.ID, + Head: head, + headBuf: bufio.NewWriterSize(head, 4096*10), + Dir: dir, + ticker: time.NewTicker(groupCheckDuration), + headSizeLimit: defaultHeadSizeLimit, + totalSizeLimit: defaultTotalSizeLimit, + minIndex: 0, + maxIndex: 0, } + g.BaseService = *NewBaseService(nil, "Group", g) + gInfo := g.readGroupInfo() g.minIndex = gInfo.MinIndex g.maxIndex = gInfo.MaxIndex + return +} + +func (g *Group) OnStart() error { + g.BaseService.OnStart() go g.processTicks() + return nil +} + +// NOTE: g.Head must be closed separately +func (g *Group) OnStop() { + g.BaseService.OnStop() + g.ticker.Stop() return } @@ -118,14 +144,16 @@ func (g *Group) MaxIndex() int { // NOTE: Writes are buffered so they don't write synchronously // TODO: Make it halt if space is unavailable func (g *Group) WriteLine(line string) error { + g.mtx.Lock() + defer g.mtx.Unlock() _, err := g.headBuf.Write([]byte(line + "\n")) return err } -// NOTE: g.Head must be closed separately -func (g *Group) Close() error { - g.ticker.Stop() - return nil +func (g *Group) Flush() error { + g.mtx.Lock() + defer g.mtx.Unlock() + return g.headBuf.Flush() } func (g *Group) processTicks() { @@ -146,25 +174,58 @@ func (g *Group) stopTicker() { // NOTE: this function is called manually in tests. func (g *Group) checkHeadSizeLimit() { + limit := g.HeadSizeLimit() + if limit == 0 { + return + } size, err := g.Head.Size() if err != nil { panic(err) } - if size >= g.HeadSizeLimit() { + if size >= limit { g.RotateFile() } } func (g *Group) checkTotalSizeLimit() { - // TODO enforce total size limit - // CHALLENGE + limit := g.TotalSizeLimit() + if limit == 0 { + return + } + + gInfo := g.readGroupInfo() + totalSize := gInfo.TotalSize + for i := 0; i < maxFilesToRemove; i++ { + fmt.Println(">>", gInfo, totalSize, i) + index := gInfo.MinIndex + i + if totalSize < limit { + return + } + if index == gInfo.MaxIndex { + // Special degenerate case, just do nothing. + log.Println("WARNING: Group's head " + g.Head.Path + "may grow without bound") + return + } + pathToRemove := filePathForIndex(g.Head.Path, gInfo.MinIndex, gInfo.MaxIndex) + fileInfo, err := os.Stat(pathToRemove) + if err != nil { + log.Println("WARNING: Failed to fetch info for file @" + pathToRemove) + continue + } + err = os.Remove(pathToRemove) + if err != nil { + log.Println(err) + return + } + totalSize -= fileInfo.Size() + } } func (g *Group) RotateFile() { g.mtx.Lock() defer g.mtx.Unlock() - dstPath := filePathForIndex(g.Head.Path, g.maxIndex) + dstPath := filePathForIndex(g.Head.Path, g.maxIndex, g.maxIndex+1) err := os.Rename(g.Head.Path, dstPath) if err != nil { panic(err) @@ -429,8 +490,12 @@ func (g *Group) readGroupInfo() GroupInfo { return GroupInfo{minIndex, maxIndex, totalSize, headSize} } -func filePathForIndex(headPath string, index int) string { - return fmt.Sprintf("%v.%03d", headPath, index) +func filePathForIndex(headPath string, index int, maxIndex int) string { + if index == maxIndex { + return headPath + } else { + return fmt.Sprintf("%v.%03d", headPath, index) + } } //-------------------------------------------------------------------------------- @@ -522,15 +587,11 @@ func (gr *GroupReader) openFile(index int) error { gr.Group.mtx.Lock() defer gr.Group.mtx.Unlock() - var curFilePath string - if index == gr.Group.maxIndex { - curFilePath = gr.Head.Path - } else if index > gr.Group.maxIndex { + if index > gr.Group.maxIndex { return io.EOF - } else { - curFilePath = filePathForIndex(gr.Head.Path, index) } + curFilePath := filePathForIndex(gr.Head.Path, index, gr.Group.maxIndex) curFile, err := os.Open(curFilePath) if err != nil { return err diff --git a/group_test.go b/group_test.go index 672bd4d90..1c2280e83 100644 --- a/group_test.go +++ b/group_test.go @@ -21,11 +21,7 @@ func createTestGroup(t *testing.T, headSizeLimit int64) *Group { t.Fatal("Error creating dir", err) } headPath := testDir + "/myfile" - autofile, err := OpenAutoFile(headPath) - if err != nil { - t.Fatal("Error opening AutoFile", headPath, err) - } - g, err := OpenGroup(autofile) + g, err := OpenGroup(headPath) if err != nil { t.Fatal("Error opening Group", err) } @@ -73,6 +69,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { t.Fatal("Error appending to head", err) } } + g.Flush() assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 999000, 999000) // Even calling checkHeadSizeLimit manually won't rotate it. @@ -84,6 +81,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { if err != nil { t.Fatal("Error appending to head", err) } + g.Flush() // Calling checkHeadSizeLimit this time rolls it. g.checkHeadSizeLimit() @@ -94,6 +92,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { if err != nil { t.Fatal("Error appending to head", err) } + g.Flush() // Calling checkHeadSizeLimit does nothing. g.checkHeadSizeLimit() @@ -106,6 +105,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { t.Fatal("Error appending to head", err) } } + g.Flush() assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 2000000, 1000000) // Calling checkHeadSizeLimit rolls it again. @@ -117,6 +117,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { if err != nil { t.Fatal("Error appending to head", err) } + g.Flush() assertGroupInfo(t, g.ReadGroupInfo(), 0, 2, 2001000, 1000) // Calling checkHeadSizeLimit does nothing. @@ -256,10 +257,12 @@ func TestRotateFile(t *testing.T) { g.WriteLine("Line 1") g.WriteLine("Line 2") g.WriteLine("Line 3") + g.Flush() g.RotateFile() g.WriteLine("Line 4") g.WriteLine("Line 5") g.WriteLine("Line 6") + g.Flush() // Read g.Head.Path+"000" body1, err := ioutil.ReadFile(g.Head.Path + ".000") @@ -290,11 +293,13 @@ func TestFindLast1(t *testing.T) { g.WriteLine("Line 2") g.WriteLine("# a") g.WriteLine("Line 3") + g.Flush() g.RotateFile() g.WriteLine("Line 4") g.WriteLine("Line 5") g.WriteLine("Line 6") g.WriteLine("# b") + g.Flush() match, found, err := g.FindLast("#") if err != nil { @@ -303,7 +308,7 @@ func TestFindLast1(t *testing.T) { if !found { t.Error("Expected found=True") } - if match != "# b\n" { + if match != "# b" { t.Errorf("Unexpected match: [%v]", match) } @@ -317,12 +322,14 @@ func TestFindLast2(t *testing.T) { g.WriteLine("Line 1") g.WriteLine("Line 2") g.WriteLine("Line 3") + g.Flush() g.RotateFile() g.WriteLine("# a") g.WriteLine("Line 4") g.WriteLine("Line 5") g.WriteLine("# b") g.WriteLine("Line 6") + g.Flush() match, found, err := g.FindLast("#") if err != nil { @@ -331,7 +338,7 @@ func TestFindLast2(t *testing.T) { if !found { t.Error("Expected found=True") } - if match != "# b\n" { + if match != "# b" { t.Errorf("Unexpected match: [%v]", match) } @@ -347,10 +354,12 @@ func TestFindLast3(t *testing.T) { g.WriteLine("Line 2") g.WriteLine("# b") g.WriteLine("Line 3") + g.Flush() g.RotateFile() g.WriteLine("Line 4") g.WriteLine("Line 5") g.WriteLine("Line 6") + g.Flush() match, found, err := g.FindLast("#") if err != nil { @@ -359,7 +368,7 @@ func TestFindLast3(t *testing.T) { if !found { t.Error("Expected found=True") } - if match != "# b\n" { + if match != "# b" { t.Errorf("Unexpected match: [%v]", match) } @@ -373,10 +382,12 @@ func TestFindLast4(t *testing.T) { g.WriteLine("Line 1") g.WriteLine("Line 2") g.WriteLine("Line 3") + g.Flush() g.RotateFile() g.WriteLine("Line 4") g.WriteLine("Line 5") g.WriteLine("Line 6") + g.Flush() match, found, err := g.FindLast("#") if err != nil {