Browse Source

Group is a BaseService; TotalSizeLimit enforced; tests fixed

pull/1842/head
Jae Kwon 8 years ago
parent
commit
a528af55d3
2 changed files with 108 additions and 36 deletions
  1. +89
    -28
      group.go
  2. +19
    -8
      group_test.go

+ 89
- 28
group.go View File

@ -3,7 +3,9 @@ package autofile
import (
"bufio"
"errors"
"fmt"
"io"
"log"
"os"
"path"
"path/filepath"
@ -45,10 +47,14 @@ The Group can also be used to binary-search for some line,
assuming that marker lines are written occasionally.
*/
const groupCheckDuration = 1000 * time.Millisecond
const defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB
const groupCheckDuration = 5000 * time.Millisecond
const defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB
const defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB
const maxFilesToRemove = 4 // needs to be greater than 1
type Group struct {
BaseService
ID string
Head *AutoFile // The head AutoFile to write to
headBuf *bufio.Writer
@ -64,23 +70,43 @@ type Group struct {
// and their dependencies.
}
func OpenGroup(head *AutoFile) (g *Group, err error) {
dir := path.Dir(head.Path)
func OpenGroup(headPath string) (g *Group, err error) {
dir := path.Dir(headPath)
head, err := OpenAutoFile(headPath)
if err != nil {
return nil, err
}
g = &Group{
ID: "group:" + head.ID,
Head: head,
headBuf: bufio.NewWriterSize(head, 4096*10),
Dir: dir,
ticker: time.NewTicker(groupCheckDuration),
headSizeLimit: defaultHeadSizeLimit,
minIndex: 0,
maxIndex: 0,
ID: "group:" + head.ID,
Head: head,
headBuf: bufio.NewWriterSize(head, 4096*10),
Dir: dir,
ticker: time.NewTicker(groupCheckDuration),
headSizeLimit: defaultHeadSizeLimit,
totalSizeLimit: defaultTotalSizeLimit,
minIndex: 0,
maxIndex: 0,
}
g.BaseService = *NewBaseService(nil, "Group", g)
gInfo := g.readGroupInfo()
g.minIndex = gInfo.MinIndex
g.maxIndex = gInfo.MaxIndex
return
}
func (g *Group) OnStart() error {
g.BaseService.OnStart()
go g.processTicks()
return nil
}
// NOTE: g.Head must be closed separately
func (g *Group) OnStop() {
g.BaseService.OnStop()
g.ticker.Stop()
return
}
@ -118,14 +144,16 @@ func (g *Group) MaxIndex() int {
// NOTE: Writes are buffered so they don't write synchronously
// TODO: Make it halt if space is unavailable
func (g *Group) WriteLine(line string) error {
g.mtx.Lock()
defer g.mtx.Unlock()
_, err := g.headBuf.Write([]byte(line + "\n"))
return err
}
// NOTE: g.Head must be closed separately
func (g *Group) Close() error {
g.ticker.Stop()
return nil
func (g *Group) Flush() error {
g.mtx.Lock()
defer g.mtx.Unlock()
return g.headBuf.Flush()
}
func (g *Group) processTicks() {
@ -146,25 +174,58 @@ func (g *Group) stopTicker() {
// NOTE: this function is called manually in tests.
func (g *Group) checkHeadSizeLimit() {
limit := g.HeadSizeLimit()
if limit == 0 {
return
}
size, err := g.Head.Size()
if err != nil {
panic(err)
}
if size >= g.HeadSizeLimit() {
if size >= limit {
g.RotateFile()
}
}
func (g *Group) checkTotalSizeLimit() {
// TODO enforce total size limit
// CHALLENGE
limit := g.TotalSizeLimit()
if limit == 0 {
return
}
gInfo := g.readGroupInfo()
totalSize := gInfo.TotalSize
for i := 0; i < maxFilesToRemove; i++ {
fmt.Println(">>", gInfo, totalSize, i)
index := gInfo.MinIndex + i
if totalSize < limit {
return
}
if index == gInfo.MaxIndex {
// Special degenerate case, just do nothing.
log.Println("WARNING: Group's head " + g.Head.Path + "may grow without bound")
return
}
pathToRemove := filePathForIndex(g.Head.Path, gInfo.MinIndex, gInfo.MaxIndex)
fileInfo, err := os.Stat(pathToRemove)
if err != nil {
log.Println("WARNING: Failed to fetch info for file @" + pathToRemove)
continue
}
err = os.Remove(pathToRemove)
if err != nil {
log.Println(err)
return
}
totalSize -= fileInfo.Size()
}
}
func (g *Group) RotateFile() {
g.mtx.Lock()
defer g.mtx.Unlock()
dstPath := filePathForIndex(g.Head.Path, g.maxIndex)
dstPath := filePathForIndex(g.Head.Path, g.maxIndex, g.maxIndex+1)
err := os.Rename(g.Head.Path, dstPath)
if err != nil {
panic(err)
@ -429,8 +490,12 @@ func (g *Group) readGroupInfo() GroupInfo {
return GroupInfo{minIndex, maxIndex, totalSize, headSize}
}
func filePathForIndex(headPath string, index int) string {
return fmt.Sprintf("%v.%03d", headPath, index)
func filePathForIndex(headPath string, index int, maxIndex int) string {
if index == maxIndex {
return headPath
} else {
return fmt.Sprintf("%v.%03d", headPath, index)
}
}
//--------------------------------------------------------------------------------
@ -522,15 +587,11 @@ func (gr *GroupReader) openFile(index int) error {
gr.Group.mtx.Lock()
defer gr.Group.mtx.Unlock()
var curFilePath string
if index == gr.Group.maxIndex {
curFilePath = gr.Head.Path
} else if index > gr.Group.maxIndex {
if index > gr.Group.maxIndex {
return io.EOF
} else {
curFilePath = filePathForIndex(gr.Head.Path, index)
}
curFilePath := filePathForIndex(gr.Head.Path, index, gr.Group.maxIndex)
curFile, err := os.Open(curFilePath)
if err != nil {
return err


+ 19
- 8
group_test.go View File

@ -21,11 +21,7 @@ func createTestGroup(t *testing.T, headSizeLimit int64) *Group {
t.Fatal("Error creating dir", err)
}
headPath := testDir + "/myfile"
autofile, err := OpenAutoFile(headPath)
if err != nil {
t.Fatal("Error opening AutoFile", headPath, err)
}
g, err := OpenGroup(autofile)
g, err := OpenGroup(headPath)
if err != nil {
t.Fatal("Error opening Group", err)
}
@ -73,6 +69,7 @@ func TestCheckHeadSizeLimit(t *testing.T) {
t.Fatal("Error appending to head", err)
}
}
g.Flush()
assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 999000, 999000)
// Even calling checkHeadSizeLimit manually won't rotate it.
@ -84,6 +81,7 @@ func TestCheckHeadSizeLimit(t *testing.T) {
if err != nil {
t.Fatal("Error appending to head", err)
}
g.Flush()
// Calling checkHeadSizeLimit this time rolls it.
g.checkHeadSizeLimit()
@ -94,6 +92,7 @@ func TestCheckHeadSizeLimit(t *testing.T) {
if err != nil {
t.Fatal("Error appending to head", err)
}
g.Flush()
// Calling checkHeadSizeLimit does nothing.
g.checkHeadSizeLimit()
@ -106,6 +105,7 @@ func TestCheckHeadSizeLimit(t *testing.T) {
t.Fatal("Error appending to head", err)
}
}
g.Flush()
assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 2000000, 1000000)
// Calling checkHeadSizeLimit rolls it again.
@ -117,6 +117,7 @@ func TestCheckHeadSizeLimit(t *testing.T) {
if err != nil {
t.Fatal("Error appending to head", err)
}
g.Flush()
assertGroupInfo(t, g.ReadGroupInfo(), 0, 2, 2001000, 1000)
// Calling checkHeadSizeLimit does nothing.
@ -256,10 +257,12 @@ func TestRotateFile(t *testing.T) {
g.WriteLine("Line 1")
g.WriteLine("Line 2")
g.WriteLine("Line 3")
g.Flush()
g.RotateFile()
g.WriteLine("Line 4")
g.WriteLine("Line 5")
g.WriteLine("Line 6")
g.Flush()
// Read g.Head.Path+"000"
body1, err := ioutil.ReadFile(g.Head.Path + ".000")
@ -290,11 +293,13 @@ func TestFindLast1(t *testing.T) {
g.WriteLine("Line 2")
g.WriteLine("# a")
g.WriteLine("Line 3")
g.Flush()
g.RotateFile()
g.WriteLine("Line 4")
g.WriteLine("Line 5")
g.WriteLine("Line 6")
g.WriteLine("# b")
g.Flush()
match, found, err := g.FindLast("#")
if err != nil {
@ -303,7 +308,7 @@ func TestFindLast1(t *testing.T) {
if !found {
t.Error("Expected found=True")
}
if match != "# b\n" {
if match != "# b" {
t.Errorf("Unexpected match: [%v]", match)
}
@ -317,12 +322,14 @@ func TestFindLast2(t *testing.T) {
g.WriteLine("Line 1")
g.WriteLine("Line 2")
g.WriteLine("Line 3")
g.Flush()
g.RotateFile()
g.WriteLine("# a")
g.WriteLine("Line 4")
g.WriteLine("Line 5")
g.WriteLine("# b")
g.WriteLine("Line 6")
g.Flush()
match, found, err := g.FindLast("#")
if err != nil {
@ -331,7 +338,7 @@ func TestFindLast2(t *testing.T) {
if !found {
t.Error("Expected found=True")
}
if match != "# b\n" {
if match != "# b" {
t.Errorf("Unexpected match: [%v]", match)
}
@ -347,10 +354,12 @@ func TestFindLast3(t *testing.T) {
g.WriteLine("Line 2")
g.WriteLine("# b")
g.WriteLine("Line 3")
g.Flush()
g.RotateFile()
g.WriteLine("Line 4")
g.WriteLine("Line 5")
g.WriteLine("Line 6")
g.Flush()
match, found, err := g.FindLast("#")
if err != nil {
@ -359,7 +368,7 @@ func TestFindLast3(t *testing.T) {
if !found {
t.Error("Expected found=True")
}
if match != "# b\n" {
if match != "# b" {
t.Errorf("Unexpected match: [%v]", match)
}
@ -373,10 +382,12 @@ func TestFindLast4(t *testing.T) {
g.WriteLine("Line 1")
g.WriteLine("Line 2")
g.WriteLine("Line 3")
g.Flush()
g.RotateFile()
g.WriteLine("Line 4")
g.WriteLine("Line 5")
g.WriteLine("Line 6")
g.Flush()
match, found, err := g.FindLast("#")
if err != nil {


Loading…
Cancel
Save