From 1859c4d5fe2a0cbb0071b010ef8f604bb397feca Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Wed, 26 Oct 2016 16:23:19 -0700 Subject: [PATCH] First commit --- .gitignore | 2 + autofile.go | 116 ++++++++++++++ autofile_test.go | 73 +++++++++ group.go | 396 ++++++++++++++++++++++++++++++++++++++++++++++ group_test.go | 110 +++++++++++++ sighup_watcher.go | 63 ++++++++ 6 files changed, 760 insertions(+) create mode 100644 .gitignore create mode 100644 autofile.go create mode 100644 autofile_test.go create mode 100644 group.go create mode 100644 group_test.go create mode 100644 sighup_watcher.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..381931381 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*.swp +*.swo diff --git a/autofile.go b/autofile.go new file mode 100644 index 000000000..ed9d549bf --- /dev/null +++ b/autofile.go @@ -0,0 +1,116 @@ +package autofile + +import ( + . "github.com/tendermint/go-common" + "os" + "sync" + "time" +) + +/* AutoFile usage + +// Create/Append to ./autofile_test +af, err := OpenAutoFile("autofile_test") +if err != nil { + panic(err) +} + +// Stream of writes. +// During this time, the file may be moved e.g. by logRotate. +for i := 0; i < 60; i++ { + af.Write([]byte(Fmt("LOOP(%v)", i))) + time.Sleep(time.Second) +} + +// Close the AutoFile +err = af.Close() +if err != nil { + panic(err) +} +*/ + +const autoFileOpenDuration = 1000 * time.Millisecond + +// Automatically closes and re-opens file for writing. +// This is useful for using a log file with the logrotate tool. +type AutoFile struct { + ID string + Path string + ticker *time.Ticker + mtx sync.Mutex + file *os.File +} + +func OpenAutoFile(path string) (af *AutoFile, err error) { + af = &AutoFile{ + ID: RandStr(12) + ":" + path, + Path: path, + ticker: time.NewTicker(autoFileOpenDuration), + } + if err = af.openFile(); err != nil { + return + } + go af.processTicks() + sighupWatchers.addAutoFile(af) + return +} + +func (af *AutoFile) Close() error { + af.ticker.Stop() + err := af.closeFile() + sighupWatchers.removeAutoFile(af) + return err +} + +func (af *AutoFile) processTicks() { + for { + _, ok := <-af.ticker.C + if !ok { + return // Done. + } + af.closeFile() + } +} + +func (af *AutoFile) closeFile() (err error) { + af.mtx.Lock() + defer af.mtx.Unlock() + + file := af.file + if file == nil { + return nil + } + af.file = nil + return file.Close() +} + +func (af *AutoFile) Write(b []byte) (n int, err error) { + af.mtx.Lock() + defer af.mtx.Unlock() + if af.file == nil { + if err = af.openFile(); err != nil { + return + } + } + return af.file.Write(b) +} + +func (af *AutoFile) openFile() error { + file, err := os.OpenFile(af.Path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600) + if err != nil { + return err + } + af.file = file + return nil +} + +func (af *AutoFile) Size() (int64, error) { + af.mtx.Lock() + defer af.mtx.Unlock() + stat, err := af.file.Stat() + if err != nil { + return -1, err + } + return stat.Size(), nil + +} diff --git a/autofile_test.go b/autofile_test.go new file mode 100644 index 000000000..243125ca6 --- /dev/null +++ b/autofile_test.go @@ -0,0 +1,73 @@ +package autofile + +import ( + . "github.com/tendermint/go-common" + "os" + "sync/atomic" + "syscall" + "testing" + "time" +) + +func TestSIGHUP(t *testing.T) { + + // First, create an AutoFile writing to a tempfile dir + file, name := Tempfile("sighup_test") + err := file.Close() + if err != nil { + t.Fatalf("Error creating tempfile: %v", err) + } + // Here is the actual AutoFile + af, err := OpenAutoFile(name) + if err != nil { + t.Fatalf("Error creating autofile: %v", err) + } + + // Write to the file. + _, err = af.Write([]byte("Line 1\n")) + if err != nil { + t.Fatalf("Error writing to autofile: %v", err) + } + _, err = af.Write([]byte("Line 2\n")) + if err != nil { + t.Fatalf("Error writing to autofile: %v", err) + } + + // Move the file over + err = os.Rename(name, name+"_old") + if err != nil { + t.Fatalf("Error moving autofile: %v", err) + } + + // Send SIGHUP to self. + oldSighupCounter := atomic.LoadInt32(&sighupCounter) + syscall.Kill(syscall.Getpid(), syscall.SIGHUP) + + // Wait a bit... signals are not handled synchronously. + for atomic.LoadInt32(&sighupCounter) == oldSighupCounter { + time.Sleep(time.Millisecond * 10) + } + + // Write more to the file. + _, err = af.Write([]byte("Line 3\n")) + if err != nil { + t.Fatalf("Error writing to autofile: %v", err) + } + _, err = af.Write([]byte("Line 4\n")) + if err != nil { + t.Fatalf("Error writing to autofile: %v", err) + } + err = af.Close() + if err != nil { + t.Fatalf("Error closing autofile") + } + + // Both files should exist + if body := MustReadFile(name + "_old"); string(body) != "Line 1\nLine 2\n" { + t.Errorf("Unexpected body %s", body) + } + if body := MustReadFile(name); string(body) != "Line 3\nLine 4\n" { + t.Errorf("Unexpected body %s", body) + } + +} diff --git a/group.go b/group.go new file mode 100644 index 000000000..c0d199e1d --- /dev/null +++ b/group.go @@ -0,0 +1,396 @@ +package autofile + +import ( + "bufio" + "fmt" + "io" + "os" + "path" + "path/filepath" + "regexp" + "strconv" + "strings" + "sync" + "time" +) + +/* +You can open a Group to keep restrictions on an AutoFile, like +the maximum size of each chunk, and/or the total amount of bytes +stored in the group. + +The Group can also be used to binary-search, and to read atomically +with respect to the Group's Head (the AutoFile being appended to) +*/ + +const groupCheckDuration = 1000 * time.Millisecond + +type Group struct { + ID string + Head *AutoFile // The head AutoFile to write to + Dir string // Directory that contains .Head + ticker *time.Ticker + mtx sync.Mutex + headSizeLimit int64 + totalSizeLimit int64 +} + +func OpenGroup(head *AutoFile) (g *Group, err error) { + dir := path.Dir(head.Path) + + g = &Group{ + ID: "group:" + head.ID, + Head: head, + Dir: dir, + ticker: time.NewTicker(groupCheckDuration), + } + go g.processTicks() + return +} + +func (g *Group) SetHeadSizeLimit(limit int64) { + g.mtx.Lock() + g.headSizeLimit = limit + g.mtx.Unlock() +} + +func (g *Group) HeadSizeLimit() int64 { + g.mtx.Lock() + defer g.mtx.Unlock() + return g.headSizeLimit +} + +func (g *Group) SetTotalSizeLimit(limit int64) { + g.mtx.Lock() + g.totalSizeLimit = limit + g.mtx.Unlock() +} + +func (g *Group) TotalSizeLimit() int64 { + g.mtx.Lock() + defer g.mtx.Unlock() + return g.totalSizeLimit +} + +func (g *Group) Close() error { + g.ticker.Stop() + return nil +} + +func (g *Group) processTicks() { + for { + _, ok := <-g.ticker.C + if !ok { + return // Done. + } + // TODO Check head size limit + // TODO check total size limit + } +} + +// NOTE: for testing +func (g *Group) stopTicker() { + g.ticker.Stop() +} + +// NOTE: this function is called manually in tests. +func (g *Group) checkHeadSizeLimit() { + size, err := g.Head.Size() + if err != nil { + panic(err) + } + if size >= g.HeadSizeLimit() { + g.RotateFile() + } +} + +func (g *Group) checkTotalSizeLimit() { + // TODO enforce total size limit +} + +func (g *Group) RotateFile() { + g.mtx.Lock() + defer g.mtx.Unlock() + + gInfo := g.readGroupInfo() + dstPath := filePathForIndex(g.Head.Path, gInfo.MaxIndex+1) + err := os.Rename(g.Head.Path, dstPath) + if err != nil { + panic(err) + } + err = g.Head.closeFile() + if err != nil { + panic(err) + } +} + +func (g *Group) NewReader(index int) *GroupReader { + r := newGroupReader(g) + r.SetIndex(index) + return r +} + +// Returns -1 if line comes after, 0 if found, 1 if line comes before. +type SearchFunc func(line string) (int, error) + +// Searches for the right file in Group, +// then returns a GroupReader to start streaming lines +// CONTRACT: caller is responsible for closing GroupReader. +func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, error) { + gInfo := g.ReadGroupInfo() + minIndex, maxIndex := gInfo.MinIndex, gInfo.MaxIndex + curIndex := (minIndex + maxIndex + 1) / 2 + + for { + + // Base case, when there's only 1 choice left. + if minIndex == maxIndex { + r := g.NewReader(maxIndex) + err := scanUntil(r, prefix, cmp) + if err != nil { + r.Close() + return nil, err + } else { + return r, err + } + } + + // Read starting roughly at the middle file, + // until we find line that has prefix. + r := g.NewReader(curIndex) + foundIndex, line, err := scanFirst(r, prefix) + r.Close() + if err != nil { + return nil, err + } + + // Compare this line to our search query. + val, err := cmp(line) + if err != nil { + return nil, err + } + if val < 0 { + // Line will come later + minIndex = foundIndex + } else if val == 0 { + // Stroke of luck, found the line + r := g.NewReader(foundIndex) + err := scanUntil(r, prefix, cmp) + if err != nil { + r.Close() + return nil, err + } else { + return r, err + } + } else { + // We passed it + maxIndex = curIndex - 1 + } + } + +} + +// Scans and returns the first line that starts with 'prefix' +func scanFirst(r *GroupReader, prefix string) (int, string, error) { + for { + line, err := r.ReadLine() + if err != nil { + return 0, "", err + } + if !strings.HasPrefix(line, prefix) { + continue + } + index := r.CurIndex() + return index, line, nil + } +} + +func scanUntil(r *GroupReader, prefix string, cmp SearchFunc) error { + for { + line, err := r.ReadLine() + if err != nil { + return err + } + if !strings.HasPrefix(line, prefix) { + continue + } + val, err := cmp(line) + if err != nil { + return err + } + if val < 0 { + continue + } else { + r.PushLine(line) + return nil + } + } +} + +type GroupInfo struct { + MinIndex int + MaxIndex int + TotalSize int64 + HeadSize int64 +} + +// Returns info after scanning all files in g.Head's dir +func (g *Group) ReadGroupInfo() GroupInfo { + g.mtx.Lock() + defer g.mtx.Unlock() + return g.readGroupInfo() +} + +// CONTRACT: caller should have called g.mtx.Lock +func (g *Group) readGroupInfo() GroupInfo { + groupDir := filepath.Dir(g.Head.Path) + headBase := filepath.Base(g.Head.Path) + var minIndex, maxIndex int = -1, -1 + var totalSize, headSize int64 = 0, 0 + + dir, err := os.Open(groupDir) + if err != nil { + panic(err) + } + fiz, err := dir.Readdir(0) + if err != nil { + panic(err) + } + + // For each file in the directory, filter by pattern + for _, fileInfo := range fiz { + if fileInfo.Name() == headBase { + fileSize := fileInfo.Size() + totalSize += fileSize + headSize = fileSize + continue + } else if strings.HasPrefix(fileInfo.Name(), headBase) { + fileSize := fileInfo.Size() + totalSize += fileSize + indexedFilePattern := regexp.MustCompile(`^.+\.([0-9]{3,})$`) + submatch := indexedFilePattern.FindSubmatch([]byte(fileInfo.Name())) + if len(submatch) != 0 { + // Matches + fileIndex, err := strconv.Atoi(string(submatch[1])) + if err != nil { + panic(err) + } + if maxIndex < fileIndex { + maxIndex = fileIndex + } + if minIndex == -1 || fileIndex < minIndex { + minIndex = fileIndex + } + } + } + } + + return GroupInfo{minIndex, maxIndex, totalSize, headSize} +} + +func filePathForIndex(headPath string, index int) string { + return fmt.Sprintf("%v.%03d", headPath, index) +} + +//-------------------------------------------------------------------------------- + +type GroupReader struct { + *Group + mtx sync.Mutex + curIndex int + curFile *os.File + curReader *bufio.Reader + curLine []byte +} + +func newGroupReader(g *Group) *GroupReader { + return &GroupReader{ + Group: g, + curIndex: -1, + curFile: nil, + curReader: nil, + curLine: nil, + } +} + +func (g *GroupReader) ReadLine() (string, error) { + g.mtx.Lock() + defer g.mtx.Unlock() + + // From PushLine + if g.curLine != nil { + line := string(g.curLine) + g.curLine = nil + return line, nil + } + + // Open file if not open yet + if g.curReader == nil { + err := g.openFile(0) + if err != nil { + return "", err + } + } + + // Iterate over files until line is found + for { + bytes, err := g.curReader.ReadBytes('\n') + if err != nil { + if err != io.EOF { + return string(bytes), err + } else { + // Open the next file + err := g.openFile(g.curIndex + 1) + if err != nil { + return "", err + } + } + } + } +} + +// CONTRACT: caller should hold g.mtx +func (g *GroupReader) openFile(index int) error { + + // Lock on Group to ensure that head doesn't move in the meanwhile. + g.Group.mtx.Lock() + defer g.Group.mtx.Unlock() + + curFilePath := filePathForIndex(g.Head.Path, index) + curFile, err := os.Open(curFilePath) + if err != nil { + return err + } + curReader := bufio.NewReader(curFile) + + // Update g.cur* + g.curIndex = index + g.curFile = curFile + g.curReader = curReader + g.curLine = nil + return nil +} + +func (g *GroupReader) PushLine(line string) { + g.mtx.Lock() + defer g.mtx.Unlock() + + if g.curLine == nil { + g.curLine = []byte(line) + } else { + panic("PushLine failed, already have line") + } +} + +// Cursor's file index. +func (g *GroupReader) CurIndex() int { + g.mtx.Lock() + defer g.mtx.Unlock() + return g.curIndex +} + +func (g *GroupReader) SetIndex(index int) { + g.mtx.Lock() + defer g.mtx.Unlock() + g.openFile(index) +} diff --git a/group_test.go b/group_test.go new file mode 100644 index 000000000..8c1b7b6a8 --- /dev/null +++ b/group_test.go @@ -0,0 +1,110 @@ +package autofile + +import ( + "testing" + + . "github.com/tendermint/go-common" +) + +func createTestGroup(t *testing.T, headPath string) *Group { + autofile, err := OpenAutoFile(headPath) + if err != nil { + t.Fatal("Error opening AutoFile", headPath, err) + } + g, err := OpenGroup(autofile) + if err != nil { + t.Fatal("Error opening Group", err) + } + return g +} + +func assertGroupInfo(t *testing.T, gInfo GroupInfo, minIndex, maxIndex int, totalSize, headSize int64) { + if gInfo.MinIndex != minIndex { + t.Errorf("GroupInfo MinIndex expected %v, got %v", minIndex, gInfo.MinIndex) + } + if gInfo.MaxIndex != maxIndex { + t.Errorf("GroupInfo MaxIndex expected %v, got %v", maxIndex, gInfo.MaxIndex) + } + if gInfo.TotalSize != totalSize { + t.Errorf("GroupInfo TotalSize expected %v, got %v", totalSize, gInfo.TotalSize) + } + if gInfo.HeadSize != headSize { + t.Errorf("GroupInfo HeadSize expected %v, got %v", headSize, gInfo.HeadSize) + } +} + +func TestCreateGroup(t *testing.T) { + testID := RandStr(12) + testDir := "_test_" + testID + err := EnsureDir(testDir, 0700) + if err != nil { + t.Fatal("Error creating dir", err) + } + + g := createTestGroup(t, testDir+"/myfile") + if g == nil { + t.Error("Failed to create Group") + } + g.SetHeadSizeLimit(1000 * 1000) + g.stopTicker() + + // At first, there are no files. + assertGroupInfo(t, g.ReadGroupInfo(), -1, -1, 0, 0) + + // Write 1000 bytes 999 times. + for i := 0; i < 999; i++ { + _, err := g.Head.Write([]byte(RandStr(999) + "\n")) + if err != nil { + t.Fatal("Error appending to head", err) + } + } + assertGroupInfo(t, g.ReadGroupInfo(), -1, -1, 999000, 999000) + + // Even calling checkHeadSizeLimit manually won't rotate it. + g.checkHeadSizeLimit() + assertGroupInfo(t, g.ReadGroupInfo(), -1, -1, 999000, 999000) + + // Write 1000 more bytes. + _, err = g.Head.Write([]byte(RandStr(999) + "\n")) + if err != nil { + t.Fatal("Error appending to head", err) + } + + // Calling checkHeadSizeLimit this time rolls it. + g.checkHeadSizeLimit() + assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 1000000, 0) + + // Write 1000 more bytes. + _, err = g.Head.Write([]byte(RandStr(999) + "\n")) + if err != nil { + t.Fatal("Error appending to head", err) + } + + // Calling checkHeadSizeLimit does nothing. + g.checkHeadSizeLimit() + assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 1001000, 1000) + + // Write 1000 bytes 999 times. + for i := 0; i < 999; i++ { + _, err := g.Head.Write([]byte(RandStr(999) + "\n")) + if err != nil { + t.Fatal("Error appending to head", err) + } + } + assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 2000000, 1000000) + + // Calling checkHeadSizeLimit rolls it again. + g.checkHeadSizeLimit() + assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 2000000, 0) + + // Write 1000 more bytes. + _, err = g.Head.Write([]byte(RandStr(999) + "\n")) + if err != nil { + t.Fatal("Error appending to head", err) + } + assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 2001000, 1000) + + // Calling checkHeadSizeLimit does nothing. + g.checkHeadSizeLimit() + assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 2001000, 1000) +} diff --git a/sighup_watcher.go b/sighup_watcher.go new file mode 100644 index 000000000..facc238d5 --- /dev/null +++ b/sighup_watcher.go @@ -0,0 +1,63 @@ +package autofile + +import ( + "os" + "os/signal" + "sync" + "sync/atomic" + "syscall" +) + +func init() { + initSighupWatcher() +} + +var sighupWatchers *SighupWatcher +var sighupCounter int32 // For testing + +func initSighupWatcher() { + sighupWatchers = newSighupWatcher() + + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGHUP) + + go func() { + for _ = range c { + sighupWatchers.closeAll() + atomic.AddInt32(&sighupCounter, 1) + } + }() +} + +// Watchces for SIGHUP events and notifies registered AutoFiles +type SighupWatcher struct { + mtx sync.Mutex + autoFiles map[string]*AutoFile +} + +func newSighupWatcher() *SighupWatcher { + return &SighupWatcher{ + autoFiles: make(map[string]*AutoFile, 10), + } +} + +func (w *SighupWatcher) addAutoFile(af *AutoFile) { + w.mtx.Lock() + w.autoFiles[af.ID] = af + w.mtx.Unlock() +} + +// If AutoFile isn't registered or was already removed, does nothing. +func (w *SighupWatcher) removeAutoFile(af *AutoFile) { + w.mtx.Lock() + delete(w.autoFiles, af.ID) + w.mtx.Unlock() +} + +func (w *SighupWatcher) closeAll() { + w.mtx.Lock() + for _, af := range w.autoFiles { + af.closeFile() + } + w.mtx.Unlock() +}