From 498fb1134a550761f2cc0acef5cf9fbc76fb2562 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 17 Oct 2017 16:26:52 +0400 Subject: [PATCH 1/8] write docs for autofile/group --- autofile/group.go | 73 +++++++++++++++++++++++++++++------------------ 1 file changed, 46 insertions(+), 27 deletions(-) diff --git a/autofile/group.go b/autofile/group.go index eedb67b50..b66b5c692 100644 --- a/autofile/group.go +++ b/autofile/group.go @@ -18,6 +18,13 @@ import ( . "github.com/tendermint/tmlibs/common" ) +const ( + groupCheckDuration = 5000 * time.Millisecond + defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB + defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB + maxFilesToRemove = 4 // needs to be greater than 1 +) + /* You can open a Group to keep restrictions on an AutoFile, like the maximum size of each chunk, and/or the total amount of bytes @@ -25,33 +32,27 @@ stored in the group. The first file to be written in the Group.Dir is the head file. - Dir/ - - + Dir/ + - Once the Head file reaches the size limit, it will be rotated. - Dir/ - - .000 // First rolled file - - // New head path, starts empty. - // The implicit index is 001. + Dir/ + - .000 // First rolled file + - // New head path, starts empty. + // The implicit index is 001. As more files are written, the index numbers grow... - Dir/ - - .000 // First rolled file - - .001 // Second rolled file - - ... - - // New head path + Dir/ + - .000 // First rolled file + - .001 // Second rolled file + - ... + - // New head path The Group can also be used to binary-search for some line, assuming that marker lines are written occasionally. */ - -const groupCheckDuration = 5000 * time.Millisecond -const defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB -const defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB -const maxFilesToRemove = 4 // needs to be greater than 1 - type Group struct { BaseService @@ -109,37 +110,43 @@ func (g *Group) OnStop() { g.ticker.Stop() } +// SetHeadSizeLimit allows you to overwrite default head size limit - 10MB. func (g *Group) SetHeadSizeLimit(limit int64) { g.mtx.Lock() g.headSizeLimit = limit g.mtx.Unlock() } +// HeadSizeLimit returns the current head size limit. func (g *Group) HeadSizeLimit() int64 { g.mtx.Lock() defer g.mtx.Unlock() return g.headSizeLimit } +// SetTotalSizeLimit allows you to overwrite default total size limit of the +// group - 1GB. func (g *Group) SetTotalSizeLimit(limit int64) { g.mtx.Lock() g.totalSizeLimit = limit g.mtx.Unlock() } +// TotalSizeLimit returns total size limit of the group. func (g *Group) TotalSizeLimit() int64 { g.mtx.Lock() defer g.mtx.Unlock() return g.totalSizeLimit } +// MaxIndex returns index of the last file in the group. func (g *Group) MaxIndex() int { g.mtx.Lock() defer g.mtx.Unlock() return g.maxIndex } -// Auto appends "\n" +// WriteLine writes line into the current head of the group. It also appends "\n". // NOTE: Writes are buffered so they don't write synchronously // TODO: Make it halt if space is unavailable func (g *Group) WriteLine(line string) error { @@ -149,6 +156,8 @@ func (g *Group) WriteLine(line string) error { return err } +// Flush writes any buffered data to the underlying file and commits the +// current content of the file to stable storage. func (g *Group) Flush() error { g.mtx.Lock() defer g.mtx.Unlock() @@ -223,6 +232,8 @@ func (g *Group) checkTotalSizeLimit() { } } +// RotateFile causes group to close the current head and assign it some index. +// Note it does not create a new head. func (g *Group) RotateFile() { g.mtx.Lock() defer g.mtx.Unlock() @@ -241,8 +252,8 @@ func (g *Group) RotateFile() { g.maxIndex += 1 } -// NOTE: if error, returns no GroupReader. -// CONTRACT: Caller must close the returned GroupReader +// NewReader returns a new group reader. +// CONTRACT: Caller must close the returned GroupReader. func (g *Group) NewReader(index int) (*GroupReader, error) { r := newGroupReader(g) err := r.SetIndex(index) @@ -423,14 +434,15 @@ GROUP_LOOP: return } +// GroupInfo holds information about the group. type GroupInfo struct { - MinIndex int - MaxIndex int - TotalSize int64 - HeadSize int64 + MinIndex int // index of the first file in the group, including head + MaxIndex int // index of the last file in the group, including head + TotalSize int64 // total size of the group + HeadSize int64 // size of the head } -// Returns info after scanning all files in g.Head's dir +// Returns info after scanning all files in g.Head's dir. func (g *Group) ReadGroupInfo() GroupInfo { g.mtx.Lock() defer g.mtx.Unlock() @@ -505,6 +517,7 @@ func filePathForIndex(headPath string, index int, maxIndex int) string { //-------------------------------------------------------------------------------- +// GroupReader provides an interface for reading from a Group. type GroupReader struct { *Group mtx sync.Mutex @@ -524,6 +537,7 @@ func newGroupReader(g *Group) *GroupReader { } } +// Close closes the GroupReader by closing the cursor file. func (gr *GroupReader) Close() error { gr.mtx.Lock() defer gr.mtx.Unlock() @@ -540,7 +554,7 @@ func (gr *GroupReader) Close() error { } } -// Reads a line (without delimiter) +// ReadLine reads a line (without delimiter). // just return io.EOF if no new lines found. func (gr *GroupReader) ReadLine() (string, error) { gr.mtx.Lock() @@ -613,6 +627,9 @@ func (gr *GroupReader) openFile(index int) error { return nil } +// PushLine makes the given line the current one, so the next time somebody +// calls ReadLine, this line will be returned. +// panics if called twice without calling ReadLine. func (gr *GroupReader) PushLine(line string) { gr.mtx.Lock() defer gr.mtx.Unlock() @@ -624,13 +641,15 @@ func (gr *GroupReader) PushLine(line string) { } } -// Cursor's file index. +// CurIndex returns cursor's file index. func (gr *GroupReader) CurIndex() int { gr.mtx.Lock() defer gr.mtx.Unlock() return gr.curIndex } +// SetIndex sets the cursor's file index to index by opening a file at this +// position. func (gr *GroupReader) SetIndex(index int) error { gr.mtx.Lock() defer gr.mtx.Unlock() From 45095e83e790624980240e83628d6971cafa5495 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 17 Oct 2017 16:48:44 +0400 Subject: [PATCH 2/8] add Write method to autofile/Group --- autofile/group.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/autofile/group.go b/autofile/group.go index b66b5c692..947c295f8 100644 --- a/autofile/group.go +++ b/autofile/group.go @@ -146,6 +146,17 @@ func (g *Group) MaxIndex() int { return g.maxIndex } +// Write writes the contents of p into the current head of the group. It +// returns the number of bytes written. If nn < len(p), it also returns an +// error explaining why the write is short. +// NOTE: Writes are buffered so they don't write synchronously +// TODO: Make it halt if space is unavailable +func (g *Group) Write(p []byte) (nn int, err error) { + g.mtx.Lock() + defer g.mtx.Unlock() + return g.headBuf.Write(p) +} + // WriteLine writes line into the current head of the group. It also appends "\n". // NOTE: Writes are buffered so they don't write synchronously // TODO: Make it halt if space is unavailable From aace56018a5f70c09a2ab26b280c943a85aba5d7 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 20 Oct 2017 12:38:45 +0400 Subject: [PATCH 3/8] add Read method to GroupReader --- autofile/group.go | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/autofile/group.go b/autofile/group.go index 947c295f8..2c6aa6109 100644 --- a/autofile/group.go +++ b/autofile/group.go @@ -565,6 +565,42 @@ func (gr *GroupReader) Close() error { } } +// Read implements io.Reader, reading bytes from the current Reader +// incrementing index until enough bytes are read. +func (gr *GroupReader) Read(p []byte) (n int, err error) { + gr.mtx.Lock() + defer gr.mtx.Unlock() + + // Open file if not open yet + if gr.curReader == nil { + if err = gr.openFile(gr.curIndex); err != nil { + return 0, err + } + } + + // Iterate over files until enough bytes are read + lenP := len(p) + for { + nn, err := gr.curReader.Read(p[n:]) + n += nn + if err == io.EOF { + // Open the next file + if err1 := gr.openFile(gr.curIndex + 1); err1 != nil { + return n, err1 + } + if n >= lenP { + return n, nil + } else { + continue + } + } else if err != nil { + return n, err + } + } + + return n, err +} + // ReadLine reads a line (without delimiter). // just return io.EOF if no new lines found. func (gr *GroupReader) ReadLine() (string, error) { From 35e81018e9bd183be7121b6b900dff3d49e234d5 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 20 Oct 2017 13:09:11 +0400 Subject: [PATCH 4/8] add MinIndex method to Group --- autofile/group.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/autofile/group.go b/autofile/group.go index 2c6aa6109..d5797d087 100644 --- a/autofile/group.go +++ b/autofile/group.go @@ -146,6 +146,13 @@ func (g *Group) MaxIndex() int { return g.maxIndex } +// MinIndex returns index of the first file in the group. +func (g *Group) MinIndex() int { + g.mtx.Lock() + defer g.mtx.Unlock() + return g.minIndex +} + // Write writes the contents of p into the current head of the group. It // returns the number of bytes written. If nn < len(p), it also returns an // error explaining why the write is short. From c75ddd0fa3f669c1b391291a10361ddf8c5170bf Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 23 Oct 2017 13:02:02 +0400 Subject: [PATCH 5/8] return err if empty slice given --- autofile/group.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/autofile/group.go b/autofile/group.go index d5797d087..4b3cd6565 100644 --- a/autofile/group.go +++ b/autofile/group.go @@ -575,6 +575,11 @@ func (gr *GroupReader) Close() error { // Read implements io.Reader, reading bytes from the current Reader // incrementing index until enough bytes are read. func (gr *GroupReader) Read(p []byte) (n int, err error) { + lenP := len(p) + if lenP == 0 { + return 0, errors.New("given empty slice") + } + gr.mtx.Lock() defer gr.mtx.Unlock() @@ -586,7 +591,6 @@ func (gr *GroupReader) Read(p []byte) (n int, err error) { } // Iterate over files until enough bytes are read - lenP := len(p) for { nn, err := gr.curReader.Read(p[n:]) n += nn From 21b2c26fb1b26edf5846792890e01eaa8a472508 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 23 Oct 2017 13:02:14 +0400 Subject: [PATCH 6/8] GroupReader#Read: return io.EOF if file is empty --- autofile/group.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/autofile/group.go b/autofile/group.go index 4b3cd6565..6d70a3dbb 100644 --- a/autofile/group.go +++ b/autofile/group.go @@ -606,6 +606,8 @@ func (gr *GroupReader) Read(p []byte) (n int, err error) { } } else if err != nil { return n, err + } else if nn == 0 { // empty file + return n, err } } From 81591e288e87eba7735df53207f74a09ba5f289a Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 24 Oct 2017 23:19:53 +0400 Subject: [PATCH 7/8] fix metalinter warnings --- autofile/group.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/autofile/group.go b/autofile/group.go index 6d70a3dbb..bbf77d27e 100644 --- a/autofile/group.go +++ b/autofile/group.go @@ -591,8 +591,9 @@ func (gr *GroupReader) Read(p []byte) (n int, err error) { } // Iterate over files until enough bytes are read + var nn int for { - nn, err := gr.curReader.Read(p[n:]) + nn, err = gr.curReader.Read(p[n:]) n += nn if err == io.EOF { // Open the next file @@ -610,8 +611,6 @@ func (gr *GroupReader) Read(p []byte) (n int, err error) { return n, err } } - - return n, err } // ReadLine reads a line (without delimiter). From 103fee61921ee8bebd055bedd0815ddc71e03d90 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 24 Oct 2017 23:20:17 +0400 Subject: [PATCH 8/8] add tests for autofile group Write, reader#Read --- autofile/group_test.go | 91 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/autofile/group_test.go b/autofile/group_test.go index 0cfcef72f..398ea3ae9 100644 --- a/autofile/group_test.go +++ b/autofile/group_test.go @@ -1,6 +1,7 @@ package autofile import ( + "bytes" "errors" "io" "io/ioutil" @@ -400,3 +401,93 @@ func TestFindLast4(t *testing.T) { // Cleanup destroyTestGroup(t, g) } + +func TestWrite(t *testing.T) { + g := createTestGroup(t, 0) + + written := []byte("Medusa") + g.Write(written) + g.Flush() + + read := make([]byte, len(written)) + gr, err := g.NewReader(0) + if err != nil { + t.Fatalf("Failed to create reader: %v", err) + } + _, err = gr.Read(read) + if err != nil { + t.Fatalf("Failed to read data: %v", err) + } + + if !bytes.Equal(written, read) { + t.Errorf("%s, %s should be equal", string(written), string(read)) + } + + // Cleanup + destroyTestGroup(t, g) +} + +func TestGroupReaderRead(t *testing.T) { + g := createTestGroup(t, 0) + + professor := []byte("Professor Monster") + g.Write(professor) + g.Flush() + g.RotateFile() + frankenstein := []byte("Frankenstein's Monster") + g.Write(frankenstein) + g.Flush() + + totalWrittenLength := len(professor) + len(frankenstein) + read := make([]byte, totalWrittenLength) + gr, err := g.NewReader(0) + if err != nil { + t.Fatalf("Failed to create reader: %v", err) + } + n, err := gr.Read(read) + if err != nil { + t.Fatalf("Failed to read data: %v", err) + } + if n != totalWrittenLength { + t.Errorf("Failed to read enough bytes: wanted %d, but read %d", totalWrittenLength, n) + } + + professorPlusFrankenstein := professor + professorPlusFrankenstein = append(professorPlusFrankenstein, frankenstein...) + if !bytes.Equal(read, professorPlusFrankenstein) { + t.Errorf("%s, %s should be equal", string(professorPlusFrankenstein), string(read)) + } + + // Cleanup + destroyTestGroup(t, g) +} + +func TestMinIndex(t *testing.T) { + g := createTestGroup(t, 0) + + if g.MinIndex() != 0 { + t.Error("MinIndex should be zero at the beginning") + } + + // Cleanup + destroyTestGroup(t, g) +} + +func TestMaxIndex(t *testing.T) { + g := createTestGroup(t, 0) + + if g.MaxIndex() != 0 { + t.Error("MaxIndex should be zero at the beginning") + } + + g.WriteLine("Line 1") + g.Flush() + g.RotateFile() + + if g.MaxIndex() != 1 { + t.Error("MaxIndex should point to the last file") + } + + // Cleanup + destroyTestGroup(t, g) +}