|
|
- package consensus
-
- import (
- "bufio"
- "os"
- "time"
-
- . "github.com/tendermint/go-common"
- "github.com/tendermint/go-wire"
- "github.com/tendermint/tendermint/types"
- )
-
- //--------------------------------------------------------
- // types and functions for savings consensus messages
-
- type ConsensusLogMessage struct {
- Time time.Time `json:"time"`
- Msg ConsensusLogMessageInterface `json:"msg"`
- }
-
- type ConsensusLogMessageInterface interface{}
-
- var _ = wire.RegisterInterface(
- struct{ ConsensusLogMessageInterface }{},
- wire.ConcreteType{types.EventDataRoundState{}, 0x01},
- wire.ConcreteType{msgInfo{}, 0x02},
- wire.ConcreteType{timeoutInfo{}, 0x03},
- )
-
- //--------------------------------------------------------
- // Simple write-ahead logger
-
- // Write ahead logger writes msgs to disk before they are processed.
- // Can be used for crash-recovery and deterministic replay
- // TODO: currently the wal is overwritten during replay catchup
- // give it a mode so it's either reading or appending - must read to end to start appending again
- type WAL struct {
- fp *os.File
- exists bool // if the file already existed (restarted process)
-
- done chan struct{}
-
- light bool // ignore block parts
- }
-
- func NewWAL(file string, light bool) (*WAL, error) {
- var walExists bool
- if _, err := os.Stat(file); err == nil {
- walExists = true
- }
- fp, err := os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
- if err != nil {
- return nil, err
- }
- return &WAL{
- fp: fp,
- exists: walExists,
- done: make(chan struct{}),
- light: light,
- }, nil
- }
-
- func (wal *WAL) Exists() bool {
- if wal == nil {
- log.Warn("consensus msg log is nil")
- return false
- }
- return wal.exists
- }
-
- // called in newStep and for each pass in receiveRoutine
- func (wal *WAL) Save(clm ConsensusLogMessageInterface) {
- if wal != nil {
- if wal.light {
- // in light mode we only write new steps, timeouts, and our own votes (no proposals, block parts)
- if mi, ok := clm.(msgInfo); ok {
- _ = mi
- if mi.PeerKey != "" {
- return
- }
- }
- }
- var n int
- var err error
- wire.WriteJSON(ConsensusLogMessage{time.Now(), clm}, wal.fp, &n, &err)
- wire.WriteTo([]byte("\n"), wal.fp, &n, &err) // one message per line
- if err != nil {
- PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, clm))
- }
- }
- }
-
- // Must not be called concurrently with a write.
- func (wal *WAL) Close() {
- if wal != nil {
- wal.fp.Close()
- }
- wal.done <- struct{}{}
- }
-
- func (wal *WAL) Wait() {
- <-wal.done
- }
-
- func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) {
- var current int64
- // start at the end
- current, err = wal.fp.Seek(0, 2)
- if err != nil {
- return
- }
-
- // backup until we find the the right line
- // current is how far we are from the beginning
- for {
- current -= 1
- if current < 0 {
- wal.fp.Seek(0, 0) // back to beginning
- return
- }
- // backup one and read a new byte
- if _, err = wal.fp.Seek(current, 0); err != nil {
- return
- }
- b := make([]byte, 1)
- if _, err = wal.fp.Read(b); err != nil {
- return
- }
- if b[0] == '\n' || len(b) == 0 {
- nLines += 1
- // read a full line
- reader := bufio.NewReader(wal.fp)
- lineBytes, _ := reader.ReadBytes('\n')
- if len(lineBytes) == 0 {
- continue
- }
-
- if found(lineBytes) {
- wal.fp.Seek(0, 1) // (?)
- wal.fp.Seek(current, 0)
- return
- }
- }
- }
- }
|