Browse Source

autofile: ensure files are not reopened after closing (#7628)

During file rotation and WAL shutdown, there was a race condition between users
of an autofile and its termination. To fix this, ensure operations on an
autofile are properly synchronized, and report errors when attempting to use an
autofile after it was closed.

Notably:

- Simplify the cancellation protocol between signal and Close.
- Exclude writers to an autofile during rotation.
- Add documentation about what is going on.

There is a lot more that could be improved here, but this addresses the more
obvious races that have been panicking unit tests.
pull/7629/head
M. J. Fromberger 3 years ago
committed by GitHub
parent
commit
a7eb95065d
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 73 additions and 51 deletions
  1. +61
    -40
      internal/libs/autofile/autofile.go
  2. +1
    -1
      internal/libs/autofile/autofile_test.go
  3. +11
    -10
      internal/libs/autofile/group.go

+ 61
- 40
internal/libs/autofile/autofile.go View File

@ -2,6 +2,8 @@ package autofile
import (
"context"
"errors"
"fmt"
"os"
"os/signal"
"path/filepath"
@ -39,6 +41,10 @@ const (
autoFilePerms = os.FileMode(0600)
)
// errAutoFileClosed is reported when operations attempt to use an autofile
// after it has been closed.
var errAutoFileClosed = errors.New("autofile is closed")
// AutoFile automatically closes and re-opens file for writing. The file is
// automatically setup to close itself every 1s and upon receiving SIGHUP.
//
@ -47,12 +53,12 @@ type AutoFile struct {
ID string
Path string
closeTicker *time.Ticker
closeTickerStopc chan struct{} // closed when closeTicker is stopped
hupc chan os.Signal
closeTicker *time.Ticker // signals periodic close
cancel func() // cancels the lifecycle context
mtx sync.Mutex
file *os.File
mtx sync.Mutex // guards the fields below
closed bool // true when the the autofile is no longer usable
file *os.File // the underlying file (may be nil)
}
// OpenAutoFile creates an AutoFile in the path (with random ID). If there is
@ -64,24 +70,28 @@ func OpenAutoFile(ctx context.Context, path string) (*AutoFile, error) {
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
af := &AutoFile{
ID: tmrand.Str(12) + ":" + path,
Path: path,
closeTicker: time.NewTicker(autoFileClosePeriod),
closeTickerStopc: make(chan struct{}),
ID: tmrand.Str(12) + ":" + path,
Path: path,
closeTicker: time.NewTicker(autoFileClosePeriod),
cancel: cancel,
}
if err := af.openFile(); err != nil {
af.Close()
return nil, err
}
// Close file on SIGHUP.
af.hupc = make(chan os.Signal, 1)
signal.Notify(af.hupc, syscall.SIGHUP)
// Set up a SIGHUP handler to forcibly flush and close the filehandle.
// This forces the next operation to re-open the underlying path.
hupc := make(chan os.Signal, 1)
signal.Notify(hupc, syscall.SIGHUP)
go func() {
defer close(hupc)
for {
select {
case <-af.hupc:
case <-hupc:
_ = af.closeFile()
case <-ctx.Done():
return
@ -94,42 +104,47 @@ func OpenAutoFile(ctx context.Context, path string) (*AutoFile, error) {
return af, nil
}
// Close shuts down the closing goroutine, SIGHUP handler and closes the
// AutoFile.
// Close shuts down the service goroutine and marks af as invalid. Operations
// on af after Close will report an error.
func (af *AutoFile) Close() error {
af.closeTicker.Stop()
close(af.closeTickerStopc)
if af.hupc != nil {
close(af.hupc)
}
return af.closeFile()
return af.withLock(func() error {
af.cancel() // signal the close service to stop
af.closed = true // mark the file as invalid
return af.unsyncCloseFile()
})
}
func (af *AutoFile) closeFileRoutine(ctx context.Context) {
for {
select {
case <-ctx.Done():
_ = af.closeFile()
_ = af.Close()
return
case <-af.closeTicker.C:
_ = af.closeFile()
case <-af.closeTickerStopc:
return
}
}
}
func (af *AutoFile) closeFile() (err error) {
af.mtx.Lock()
defer af.mtx.Unlock()
return af.withLock(af.unsyncCloseFile)
}
file := af.file
if file == nil {
return nil
// unsyncCloseFile closes the underlying filehandle if one is open, and reports
// any error it returns. The caller must hold af.mtx exclusively.
func (af *AutoFile) unsyncCloseFile() error {
if fp := af.file; fp != nil {
af.file = nil
return fp.Close()
}
return nil
}
af.file = nil
return file.Close()
// withLock runs f while holding af.mtx, and reports any error it returns.
func (af *AutoFile) withLock(f func() error) error {
af.mtx.Lock()
defer af.mtx.Unlock()
return f()
}
// Write writes len(b) bytes to the AutoFile. It returns the number of bytes
@ -139,6 +154,9 @@ func (af *AutoFile) closeFile() (err error) {
func (af *AutoFile) Write(b []byte) (n int, err error) {
af.mtx.Lock()
defer af.mtx.Unlock()
if af.closed {
return 0, fmt.Errorf("write: %w", errAutoFileClosed)
}
if af.file == nil {
if err = af.openFile(); err != nil {
@ -153,19 +171,19 @@ func (af *AutoFile) Write(b []byte) (n int, err error) {
// Sync commits the current contents of the file to stable storage. Typically,
// this means flushing the file system's in-memory copy of recently written
// data to disk.
// Opens AutoFile if needed.
func (af *AutoFile) Sync() error {
af.mtx.Lock()
defer af.mtx.Unlock()
if af.file == nil {
if err := af.openFile(); err != nil {
return err
return af.withLock(func() error {
if af.closed {
return fmt.Errorf("sync: %w", errAutoFileClosed)
} else if af.file == nil {
return nil // nothing to sync
}
}
return af.file.Sync()
return af.file.Sync()
})
}
// openFile unconditionally replaces af.file with a new filehandle on the path.
// The caller must hold af.mtx exclusively.
func (af *AutoFile) openFile() error {
file, err := os.OpenFile(af.Path, os.O_RDWR|os.O_CREATE|os.O_APPEND, autoFilePerms)
if err != nil {
@ -188,6 +206,9 @@ func (af *AutoFile) openFile() error {
func (af *AutoFile) Size() (int64, error) {
af.mtx.Lock()
defer af.mtx.Unlock()
if af.closed {
return 0, fmt.Errorf("size: %w", errAutoFileClosed)
}
if af.file == nil {
if err := af.openFile(); err != nil {


+ 1
- 1
internal/libs/autofile/autofile_test.go View File

@ -134,7 +134,7 @@ func TestAutoFileSize(t *testing.T) {
require.NoError(t, err)
// 3. Not existing file
require.NoError(t, af.Close())
require.NoError(t, af.closeFile())
require.NoError(t, os.Remove(f.Name()))
size, err = af.Size()
require.EqualValues(t, 0, size, "Expected a new file to be empty")


+ 11
- 10
internal/libs/autofile/group.go View File

@ -56,6 +56,7 @@ assuming that marker lines are written occasionally.
type Group struct {
service.BaseService
logger log.Logger
ctx context.Context
ID string
Head *AutoFile // The head AutoFile to write to
@ -92,6 +93,7 @@ func OpenGroup(ctx context.Context, logger log.Logger, headPath string, groupOpt
g := &Group{
logger: logger,
ctx: ctx,
ID: "group:" + head.ID,
Head: head,
headBuf: bufio.NewWriterSize(head, 4096*10),
@ -168,7 +170,7 @@ func (g *Group) Close() {
}
g.mtx.Lock()
_ = g.Head.closeFile()
_ = g.Head.Close()
g.mtx.Unlock()
}
@ -304,7 +306,6 @@ func (g *Group) checkTotalSizeLimit() {
}
// RotateFile causes group to close the current head and assign it some index.
// Note it does not create a new head.
func (g *Group) RotateFile() {
g.mtx.Lock()
defer g.mtx.Unlock()
@ -314,20 +315,20 @@ func (g *Group) RotateFile() {
if err := g.headBuf.Flush(); err != nil {
panic(err)
}
if err := g.Head.Sync(); err != nil {
panic(err)
}
err := g.Head.withLock(func() error {
if err := g.Head.unsyncCloseFile(); err != nil {
return err
}
if err := g.Head.closeFile(); err != nil {
panic(err)
}
indexPath := filePathForIndex(headPath, g.maxIndex, g.maxIndex+1)
if err := os.Rename(headPath, indexPath); err != nil {
indexPath := filePathForIndex(headPath, g.maxIndex, g.maxIndex+1)
return os.Rename(headPath, indexPath)
})
if err != nil {
panic(err)
}
g.maxIndex++
}


Loading…
Cancel
Save