Browse Source

FindLast

pull/1842/head
Jae Kwon 8 years ago
parent
commit
1261fca160
2 changed files with 235 additions and 22 deletions
  1. +84
    -12
      group.go
  2. +151
    -10
      group_test.go

+ 84
- 12
group.go View File

@ -168,10 +168,16 @@ func (g *Group) RotateFile() {
g.maxIndex += 1 g.maxIndex += 1
} }
func (g *Group) NewReader(index int) *GroupReader {
// NOTE: if error, returns no GroupReader.
// CONTRACT: Caller must close the returned GroupReader
func (g *Group) NewReader(index int) (*GroupReader, error) {
r := newGroupReader(g) r := newGroupReader(g)
r.SetIndex(index)
return r
err := r.SetIndex(index)
if err != nil {
return nil, err
} else {
return r, nil
}
} }
// Returns -1 if line comes after, 0 if found, 1 if line comes before. // Returns -1 if line comes after, 0 if found, 1 if line comes before.
@ -181,7 +187,7 @@ type SearchFunc func(line string) (int, error)
// then returns a GroupReader to start streaming lines // then returns a GroupReader to start streaming lines
// Returns true if an exact match was found, otherwise returns // Returns true if an exact match was found, otherwise returns
// the next greater line that starts with prefix. // the next greater line that starts with prefix.
// CONTRACT: caller is responsible for closing GroupReader.
// CONTRACT: Caller must close the returned GroupReader
func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error) { func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error) {
g.mtx.Lock() g.mtx.Lock()
minIndex, maxIndex := g.minIndex, g.maxIndex minIndex, maxIndex := g.minIndex, g.maxIndex
@ -195,7 +201,10 @@ func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error
// Base case, when there's only 1 choice left. // Base case, when there's only 1 choice left.
if minIndex == maxIndex { if minIndex == maxIndex {
r := g.NewReader(maxIndex)
r, err := g.NewReader(maxIndex)
if err != nil {
return nil, false, err
}
match, err := scanUntil(r, prefix, cmp) match, err := scanUntil(r, prefix, cmp)
if err != nil { if err != nil {
r.Close() r.Close()
@ -207,8 +216,11 @@ func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error
// Read starting roughly at the middle file, // Read starting roughly at the middle file,
// until we find line that has prefix. // until we find line that has prefix.
r := g.NewReader(curIndex)
foundIndex, line, err := scanFirst(r, prefix)
r, err := g.NewReader(curIndex)
if err != nil {
return nil, false, err
}
foundIndex, line, err := scanNext(r, prefix)
r.Close() r.Close()
if err != nil { if err != nil {
return nil, false, err return nil, false, err
@ -224,7 +236,10 @@ func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error
minIndex = foundIndex minIndex = foundIndex
} else if val == 0 { } else if val == 0 {
// Stroke of luck, found the line // Stroke of luck, found the line
r := g.NewReader(foundIndex)
r, err := g.NewReader(foundIndex)
if err != nil {
return nil, false, err
}
match, err := scanUntil(r, prefix, cmp) match, err := scanUntil(r, prefix, cmp)
if !match { if !match {
panic("Expected match to be true") panic("Expected match to be true")
@ -244,7 +259,8 @@ func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error
} }
// Scans and returns the first line that starts with 'prefix' // Scans and returns the first line that starts with 'prefix'
func scanFirst(r *GroupReader, prefix string) (int, string, error) {
// Consumes line and returns it.
func scanNext(r *GroupReader, prefix string) (int, string, error) {
for { for {
line, err := r.ReadLine() line, err := r.ReadLine()
if err != nil { if err != nil {
@ -259,6 +275,7 @@ func scanFirst(r *GroupReader, prefix string) (int, string, error) {
} }
// Returns true iff an exact match was found. // Returns true iff an exact match was found.
// Pushes line, does not consume it.
func scanUntil(r *GroupReader, prefix string, cmp SearchFunc) (bool, error) { func scanUntil(r *GroupReader, prefix string, cmp SearchFunc) (bool, error) {
for { for {
line, err := r.ReadLine() line, err := r.ReadLine()
@ -284,6 +301,47 @@ func scanUntil(r *GroupReader, prefix string, cmp SearchFunc) (bool, error) {
} }
} }
// Searches for the last line in Group with prefix.
func (g *Group) FindLast(prefix string) (match string, found bool, err error) {
g.mtx.Lock()
minIndex, maxIndex := g.minIndex, g.maxIndex
g.mtx.Unlock()
r, err := g.NewReader(maxIndex)
if err != nil {
return "", false, err
}
defer r.Close()
// Open files from the back and read
GROUP_LOOP:
for i := maxIndex; i >= minIndex; i-- {
err := r.SetIndex(i)
if err != nil {
return "", false, err
}
// Scan each line and test whether line matches
for {
line, err := r.ReadLineInCurrent()
if err == io.EOF {
if found {
return match, found, nil
} else {
continue GROUP_LOOP
}
} else if err != nil {
return "", false, err
}
if strings.HasPrefix(line, prefix) {
match = line
found = true
}
}
}
return
}
type GroupInfo struct { type GroupInfo struct {
MinIndex int MinIndex int
MaxIndex int MaxIndex int
@ -399,6 +457,18 @@ func (gr *GroupReader) Close() error {
func (gr *GroupReader) ReadLine() (string, error) { func (gr *GroupReader) ReadLine() (string, error) {
gr.mtx.Lock() gr.mtx.Lock()
defer gr.mtx.Unlock() defer gr.mtx.Unlock()
return gr.readLineWithOptions(false)
}
func (gr *GroupReader) ReadLineInCurrent() (string, error) {
gr.mtx.Lock()
defer gr.mtx.Unlock()
return gr.readLineWithOptions(true)
}
// curFileOnly: if True, do not open new files,
// just return io.EOF if no new lines found.
func (gr *GroupReader) readLineWithOptions(curFileOnly bool) (string, error) {
// From PushLine // From PushLine
if gr.curLine != nil { if gr.curLine != nil {
@ -420,7 +490,9 @@ func (gr *GroupReader) ReadLine() (string, error) {
bytes, err := gr.curReader.ReadBytes('\n') bytes, err := gr.curReader.ReadBytes('\n')
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
return string(bytes), err
return "", err
} else if curFileOnly {
return "", err
} else { } else {
// Open the next file // Open the next file
err := gr.openFile(gr.curIndex + 1) err := gr.openFile(gr.curIndex + 1)
@ -483,8 +555,8 @@ func (gr *GroupReader) CurIndex() int {
return gr.curIndex return gr.curIndex
} }
func (gr *GroupReader) SetIndex(index int) {
func (gr *GroupReader) SetIndex(index int) error {
gr.mtx.Lock() gr.mtx.Lock()
defer gr.mtx.Unlock() defer gr.mtx.Unlock()
gr.openFile(index)
return gr.openFile(index)
} }

+ 151
- 10
group_test.go View File

@ -3,6 +3,7 @@ package autofile
import ( import (
"errors" "errors"
"io" "io"
"io/ioutil"
"os" "os"
"strconv" "strconv"
"strings" "strings"
@ -30,6 +31,10 @@ func createTestGroup(t *testing.T, headSizeLimit int64) *Group {
} }
g.SetHeadSizeLimit(headSizeLimit) g.SetHeadSizeLimit(headSizeLimit)
g.stopTicker() g.stopTicker()
if g == nil {
t.Fatal("Failed to create Group")
}
return g return g
} }
@ -57,16 +62,13 @@ func assertGroupInfo(t *testing.T, gInfo GroupInfo, minIndex, maxIndex int, tota
func TestCheckHeadSizeLimit(t *testing.T) { func TestCheckHeadSizeLimit(t *testing.T) {
g := createTestGroup(t, 1000*1000) g := createTestGroup(t, 1000*1000)
if g == nil {
t.Error("Failed to create Group")
}
// At first, there are no files. // At first, there are no files.
assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 0, 0) assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 0, 0)
// Write 1000 bytes 999 times. // Write 1000 bytes 999 times.
for i := 0; i < 999; i++ { for i := 0; i < 999; i++ {
_, err := g.Head.Write([]byte(RandStr(999) + "\n"))
err := g.WriteLine(RandStr(999))
if err != nil { if err != nil {
t.Fatal("Error appending to head", err) t.Fatal("Error appending to head", err)
} }
@ -78,7 +80,7 @@ func TestCheckHeadSizeLimit(t *testing.T) {
assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 999000, 999000) assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 999000, 999000)
// Write 1000 more bytes. // Write 1000 more bytes.
_, err := g.Head.Write([]byte(RandStr(999) + "\n"))
err := g.WriteLine(RandStr(999))
if err != nil { if err != nil {
t.Fatal("Error appending to head", err) t.Fatal("Error appending to head", err)
} }
@ -88,7 +90,7 @@ func TestCheckHeadSizeLimit(t *testing.T) {
assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 1000000, 0) assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 1000000, 0)
// Write 1000 more bytes. // Write 1000 more bytes.
_, err = g.Head.Write([]byte(RandStr(999) + "\n"))
err = g.WriteLine(RandStr(999))
if err != nil { if err != nil {
t.Fatal("Error appending to head", err) t.Fatal("Error appending to head", err)
} }
@ -99,7 +101,7 @@ func TestCheckHeadSizeLimit(t *testing.T) {
// Write 1000 bytes 999 times. // Write 1000 bytes 999 times.
for i := 0; i < 999; i++ { for i := 0; i < 999; i++ {
_, err := g.Head.Write([]byte(RandStr(999) + "\n"))
err := g.WriteLine(RandStr(999))
if err != nil { if err != nil {
t.Fatal("Error appending to head", err) t.Fatal("Error appending to head", err)
} }
@ -127,9 +129,6 @@ func TestCheckHeadSizeLimit(t *testing.T) {
func TestSearch(t *testing.T) { func TestSearch(t *testing.T) {
g := createTestGroup(t, 10*1000) g := createTestGroup(t, 10*1000)
if g == nil {
t.Error("Failed to create Group")
}
// Create some files in the group that have several INFO lines in them. // Create some files in the group that have several INFO lines in them.
// Try to put the INFO lines in various spots. // Try to put the INFO lines in various spots.
@ -251,3 +250,145 @@ func TestSearch(t *testing.T) {
// Cleanup // Cleanup
destroyTestGroup(t, g) destroyTestGroup(t, g)
} }
func TestRotateFile(t *testing.T) {
g := createTestGroup(t, 0)
g.WriteLine("Line 1")
g.WriteLine("Line 2")
g.WriteLine("Line 3")
g.RotateFile()
g.WriteLine("Line 4")
g.WriteLine("Line 5")
g.WriteLine("Line 6")
// Read g.Head.Path+"000"
body1, err := ioutil.ReadFile(g.Head.Path + ".000")
if err != nil {
t.Error("Failed to read first rolled file")
}
if string(body1) != "Line 1\nLine 2\nLine 3\n" {
t.Errorf("Got unexpected contents: [%v]", string(body1))
}
// Read g.Head.Path
body2, err := ioutil.ReadFile(g.Head.Path)
if err != nil {
t.Error("Failed to read first rolled file")
}
if string(body2) != "Line 4\nLine 5\nLine 6\n" {
t.Errorf("Got unexpected contents: [%v]", string(body2))
}
// Cleanup
destroyTestGroup(t, g)
}
func TestFindLast1(t *testing.T) {
g := createTestGroup(t, 0)
g.WriteLine("Line 1")
g.WriteLine("Line 2")
g.WriteLine("# a")
g.WriteLine("Line 3")
g.RotateFile()
g.WriteLine("Line 4")
g.WriteLine("Line 5")
g.WriteLine("Line 6")
g.WriteLine("# b")
match, found, err := g.FindLast("#")
if err != nil {
t.Error("Unexpected error", err)
}
if !found {
t.Error("Expected found=True")
}
if match != "# b\n" {
t.Errorf("Unexpected match: [%v]", match)
}
// Cleanup
destroyTestGroup(t, g)
}
func TestFindLast2(t *testing.T) {
g := createTestGroup(t, 0)
g.WriteLine("Line 1")
g.WriteLine("Line 2")
g.WriteLine("Line 3")
g.RotateFile()
g.WriteLine("# a")
g.WriteLine("Line 4")
g.WriteLine("Line 5")
g.WriteLine("# b")
g.WriteLine("Line 6")
match, found, err := g.FindLast("#")
if err != nil {
t.Error("Unexpected error", err)
}
if !found {
t.Error("Expected found=True")
}
if match != "# b\n" {
t.Errorf("Unexpected match: [%v]", match)
}
// Cleanup
destroyTestGroup(t, g)
}
func TestFindLast3(t *testing.T) {
g := createTestGroup(t, 0)
g.WriteLine("Line 1")
g.WriteLine("# a")
g.WriteLine("Line 2")
g.WriteLine("# b")
g.WriteLine("Line 3")
g.RotateFile()
g.WriteLine("Line 4")
g.WriteLine("Line 5")
g.WriteLine("Line 6")
match, found, err := g.FindLast("#")
if err != nil {
t.Error("Unexpected error", err)
}
if !found {
t.Error("Expected found=True")
}
if match != "# b\n" {
t.Errorf("Unexpected match: [%v]", match)
}
// Cleanup
destroyTestGroup(t, g)
}
func TestFindLast4(t *testing.T) {
g := createTestGroup(t, 0)
g.WriteLine("Line 1")
g.WriteLine("Line 2")
g.WriteLine("Line 3")
g.RotateFile()
g.WriteLine("Line 4")
g.WriteLine("Line 5")
g.WriteLine("Line 6")
match, found, err := g.FindLast("#")
if err != nil {
t.Error("Unexpected error", err)
}
if found {
t.Error("Expected found=False")
}
if match != "" {
t.Errorf("Unexpected match: [%v]", match)
}
// Cleanup
destroyTestGroup(t, g)
}

Loading…
Cancel
Save