|
|
@ -67,9 +67,10 @@ type WAL interface { |
|
|
|
} |
|
|
|
|
|
|
|
// Write ahead logger writes msgs to disk before they are processed.
|
|
|
|
// Can be used for crash-recovery and deterministic replay
|
|
|
|
// TODO: currently the wal is overwritten during replay catchup
|
|
|
|
// give it a mode so it's either reading or appending - must read to end to start appending again
|
|
|
|
// Can be used for crash-recovery and deterministic replay.
|
|
|
|
// TODO: currently the wal is overwritten during replay catchup, give it a mode
|
|
|
|
// so it's either reading or appending - must read to end to start appending
|
|
|
|
// again.
|
|
|
|
type baseWAL struct { |
|
|
|
cmn.BaseService |
|
|
|
|
|
|
@ -81,9 +82,8 @@ type baseWAL struct { |
|
|
|
flushInterval time.Duration |
|
|
|
} |
|
|
|
|
|
|
|
// NewWAL attempts to create a new write-ahead logger based on `baseWAL`, which
|
|
|
|
// implements all of the required WAL functionality. This base WAL also flushes
|
|
|
|
// data to disk every 2s.
|
|
|
|
// NewWAL returns a new write-ahead logger based on `baseWAL`, which implements
|
|
|
|
// WAL. It's flushed and synced to disk every 2s and once when stopped.
|
|
|
|
func NewWAL(walFile string, groupOptions ...func(*auto.Group)) (*baseWAL, error) { |
|
|
|
err := cmn.EnsureDir(filepath.Dir(walFile), 0700) |
|
|
|
if err != nil { |
|
|
@ -130,13 +130,11 @@ func (wal *baseWAL) OnStart() error { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
// processFlushTicks allows us to periodically attempt to sync the WAL to disk.
|
|
|
|
func (wal *baseWAL) processFlushTicks() { |
|
|
|
for { |
|
|
|
select { |
|
|
|
case <-wal.flushTicker.C: |
|
|
|
err := wal.Flush() |
|
|
|
if err != nil { |
|
|
|
if err := wal.Flush(); err != nil { |
|
|
|
wal.Logger.Error("Periodic WAL flush failed", "err", err) |
|
|
|
} |
|
|
|
case <-wal.Quit(): |
|
|
@ -145,7 +143,7 @@ func (wal *baseWAL) processFlushTicks() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Flush will attempt to flush the underlying group's data to disk.
|
|
|
|
// Flush will attempt to flush and fsync the underlying group's data to disk.
|
|
|
|
func (wal *baseWAL) Flush() error { |
|
|
|
return wal.group.Flush() |
|
|
|
} |
|
|
|