Browse Source

write docs for autofile/group

pull/1842/head
Anton Kaliaev 7 years ago
parent
commit
498fb1134a
No known key found for this signature in database GPG Key ID: 7B6881D965918214
1 changed files with 46 additions and 27 deletions
  1. +46
    -27
      autofile/group.go

+ 46
- 27
autofile/group.go View File

@ -18,6 +18,13 @@ import (
. "github.com/tendermint/tmlibs/common" . "github.com/tendermint/tmlibs/common"
) )
const (
groupCheckDuration = 5000 * time.Millisecond
defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB
defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB
maxFilesToRemove = 4 // needs to be greater than 1
)
/* /*
You can open a Group to keep restrictions on an AutoFile, like You can open a Group to keep restrictions on an AutoFile, like
the maximum size of each chunk, and/or the total amount of bytes the maximum size of each chunk, and/or the total amount of bytes
@ -25,33 +32,27 @@ stored in the group.
The first file to be written in the Group.Dir is the head file. The first file to be written in the Group.Dir is the head file.
Dir/
- <HeadPath>
Dir/
- <HeadPath>
Once the Head file reaches the size limit, it will be rotated. Once the Head file reaches the size limit, it will be rotated.
Dir/
- <HeadPath>.000 // First rolled file
- <HeadPath> // New head path, starts empty.
// The implicit index is 001.
Dir/
- <HeadPath>.000 // First rolled file
- <HeadPath> // New head path, starts empty.
// The implicit index is 001.
As more files are written, the index numbers grow... As more files are written, the index numbers grow...
Dir/
- <HeadPath>.000 // First rolled file
- <HeadPath>.001 // Second rolled file
- ...
- <HeadPath> // New head path
Dir/
- <HeadPath>.000 // First rolled file
- <HeadPath>.001 // Second rolled file
- ...
- <HeadPath> // New head path
The Group can also be used to binary-search for some line, The Group can also be used to binary-search for some line,
assuming that marker lines are written occasionally. assuming that marker lines are written occasionally.
*/ */
const groupCheckDuration = 5000 * time.Millisecond
const defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB
const defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB
const maxFilesToRemove = 4 // needs to be greater than 1
type Group struct { type Group struct {
BaseService BaseService
@ -109,37 +110,43 @@ func (g *Group) OnStop() {
g.ticker.Stop() g.ticker.Stop()
} }
// SetHeadSizeLimit allows you to overwrite default head size limit - 10MB.
func (g *Group) SetHeadSizeLimit(limit int64) { func (g *Group) SetHeadSizeLimit(limit int64) {
g.mtx.Lock() g.mtx.Lock()
g.headSizeLimit = limit g.headSizeLimit = limit
g.mtx.Unlock() g.mtx.Unlock()
} }
// HeadSizeLimit returns the current head size limit.
func (g *Group) HeadSizeLimit() int64 { func (g *Group) HeadSizeLimit() int64 {
g.mtx.Lock() g.mtx.Lock()
defer g.mtx.Unlock() defer g.mtx.Unlock()
return g.headSizeLimit return g.headSizeLimit
} }
// SetTotalSizeLimit allows you to overwrite default total size limit of the
// group - 1GB.
func (g *Group) SetTotalSizeLimit(limit int64) { func (g *Group) SetTotalSizeLimit(limit int64) {
g.mtx.Lock() g.mtx.Lock()
g.totalSizeLimit = limit g.totalSizeLimit = limit
g.mtx.Unlock() g.mtx.Unlock()
} }
// TotalSizeLimit returns total size limit of the group.
func (g *Group) TotalSizeLimit() int64 { func (g *Group) TotalSizeLimit() int64 {
g.mtx.Lock() g.mtx.Lock()
defer g.mtx.Unlock() defer g.mtx.Unlock()
return g.totalSizeLimit return g.totalSizeLimit
} }
// MaxIndex returns index of the last file in the group.
func (g *Group) MaxIndex() int { func (g *Group) MaxIndex() int {
g.mtx.Lock() g.mtx.Lock()
defer g.mtx.Unlock() defer g.mtx.Unlock()
return g.maxIndex return g.maxIndex
} }
// Auto appends "\n"
// WriteLine writes line into the current head of the group. It also appends "\n".
// NOTE: Writes are buffered so they don't write synchronously // NOTE: Writes are buffered so they don't write synchronously
// TODO: Make it halt if space is unavailable // TODO: Make it halt if space is unavailable
func (g *Group) WriteLine(line string) error { func (g *Group) WriteLine(line string) error {
@ -149,6 +156,8 @@ func (g *Group) WriteLine(line string) error {
return err return err
} }
// Flush writes any buffered data to the underlying file and commits the
// current content of the file to stable storage.
func (g *Group) Flush() error { func (g *Group) Flush() error {
g.mtx.Lock() g.mtx.Lock()
defer g.mtx.Unlock() defer g.mtx.Unlock()
@ -223,6 +232,8 @@ func (g *Group) checkTotalSizeLimit() {
} }
} }
// RotateFile causes group to close the current head and assign it some index.
// Note it does not create a new head.
func (g *Group) RotateFile() { func (g *Group) RotateFile() {
g.mtx.Lock() g.mtx.Lock()
defer g.mtx.Unlock() defer g.mtx.Unlock()
@ -241,8 +252,8 @@ func (g *Group) RotateFile() {
g.maxIndex += 1 g.maxIndex += 1
} }
// NOTE: if error, returns no GroupReader.
// CONTRACT: Caller must close the returned GroupReader
// NewReader returns a new group reader.
// CONTRACT: Caller must close the returned GroupReader.
func (g *Group) NewReader(index int) (*GroupReader, error) { func (g *Group) NewReader(index int) (*GroupReader, error) {
r := newGroupReader(g) r := newGroupReader(g)
err := r.SetIndex(index) err := r.SetIndex(index)
@ -423,14 +434,15 @@ GROUP_LOOP:
return return
} }
// GroupInfo holds information about the group.
type GroupInfo struct { type GroupInfo struct {
MinIndex int
MaxIndex int
TotalSize int64
HeadSize int64
MinIndex int // index of the first file in the group, including head
MaxIndex int // index of the last file in the group, including head
TotalSize int64 // total size of the group
HeadSize int64 // size of the head
} }
// Returns info after scanning all files in g.Head's dir
// Returns info after scanning all files in g.Head's dir.
func (g *Group) ReadGroupInfo() GroupInfo { func (g *Group) ReadGroupInfo() GroupInfo {
g.mtx.Lock() g.mtx.Lock()
defer g.mtx.Unlock() defer g.mtx.Unlock()
@ -505,6 +517,7 @@ func filePathForIndex(headPath string, index int, maxIndex int) string {
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
// GroupReader provides an interface for reading from a Group.
type GroupReader struct { type GroupReader struct {
*Group *Group
mtx sync.Mutex mtx sync.Mutex
@ -524,6 +537,7 @@ func newGroupReader(g *Group) *GroupReader {
} }
} }
// Close closes the GroupReader by closing the cursor file.
func (gr *GroupReader) Close() error { func (gr *GroupReader) Close() error {
gr.mtx.Lock() gr.mtx.Lock()
defer gr.mtx.Unlock() defer gr.mtx.Unlock()
@ -540,7 +554,7 @@ func (gr *GroupReader) Close() error {
} }
} }
// Reads a line (without delimiter)
// ReadLine reads a line (without delimiter).
// just return io.EOF if no new lines found. // just return io.EOF if no new lines found.
func (gr *GroupReader) ReadLine() (string, error) { func (gr *GroupReader) ReadLine() (string, error) {
gr.mtx.Lock() gr.mtx.Lock()
@ -613,6 +627,9 @@ func (gr *GroupReader) openFile(index int) error {
return nil return nil
} }
// PushLine makes the given line the current one, so the next time somebody
// calls ReadLine, this line will be returned.
// panics if called twice without calling ReadLine.
func (gr *GroupReader) PushLine(line string) { func (gr *GroupReader) PushLine(line string) {
gr.mtx.Lock() gr.mtx.Lock()
defer gr.mtx.Unlock() defer gr.mtx.Unlock()
@ -624,13 +641,15 @@ func (gr *GroupReader) PushLine(line string) {
} }
} }
// Cursor's file index.
// CurIndex returns cursor's file index.
func (gr *GroupReader) CurIndex() int { func (gr *GroupReader) CurIndex() int {
gr.mtx.Lock() gr.mtx.Lock()
defer gr.mtx.Unlock() defer gr.mtx.Unlock()
return gr.curIndex return gr.curIndex
} }
// SetIndex sets the cursor's file index to index by opening a file at this
// position.
func (gr *GroupReader) SetIndex(index int) error { func (gr *GroupReader) SetIndex(index int) error {
gr.mtx.Lock() gr.mtx.Lock()
defer gr.mtx.Unlock() defer gr.mtx.Unlock()


Loading…
Cancel
Save