Browse Source

consensus: attempt to repair the WAL file on data corruption (#4682)

Closes: #4578

Co-authored-by: Anton Kaliaev <anton.kalyaev@gmail.com>
pull/4952/head
Alessio Treglia 5 years ago
committed by GitHub
parent
commit
c8483531d8
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 169 additions and 42 deletions
  1. +2
    -0
      CHANGELOG_PENDING.md
  2. +105
    -42
      consensus/state.go
  3. +25
    -0
      libs/os/os.go
  4. +37
    -0
      libs/os/os_test.go

+ 2
- 0
CHANGELOG_PENDING.md View File

@ -58,6 +58,8 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [evidence] [\#4839](https://github.com/tendermint/tendermint/pull/4839) Reject duplicate evidence from being proposed (@cmwaters) - [evidence] [\#4839](https://github.com/tendermint/tendermint/pull/4839) Reject duplicate evidence from being proposed (@cmwaters)
- [evidence] [\#4892](https://github.com/tendermint/tendermint/pull/4892) Remove redundant header from phantom validator evidence (@cmwaters) - [evidence] [\#4892](https://github.com/tendermint/tendermint/pull/4892) Remove redundant header from phantom validator evidence (@cmwaters)
- [types] [\#4905](https://github.com/tendermint/tendermint/pull/4905) Add ValidateBasic to validator and validator set (@cmwaters) - [types] [\#4905](https://github.com/tendermint/tendermint/pull/4905) Add ValidateBasic to validator and validator set (@cmwaters)
- [consensus] [\#4578](https://github.com/tendermint/tendermint/issues/4578) Attempt to repair the consensus WAL file (`data/cs.wal/wal`) automatically in case of corruption (@alessio)
The original WAL file will be backed up to `data/cs.wal/wal.CORRUPTED`.
- [lite2] [\#4935](https://github.com/tendermint/tendermint/pull/4935) Fetch and compare a new header with witnesses in parallel (@melekes) - [lite2] [\#4935](https://github.com/tendermint/tendermint/pull/4935) Fetch and compare a new header with witnesses in parallel (@melekes)
- [lite2] [\#4929](https://github.com/tendermint/tendermint/pull/4929) compare header w/ witnesses only when doing bisection (@melekes) - [lite2] [\#4929](https://github.com/tendermint/tendermint/pull/4929) compare header w/ witnesses only when doing bisection (@melekes)


+ 105
- 42
consensus/state.go View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"errors" "errors"
"fmt" "fmt"
"os"
"reflect" "reflect"
"runtime/debug" "runtime/debug"
"sync" "sync"
@ -276,23 +277,63 @@ func (cs *State) LoadCommit(height int64) *types.Commit {
return cs.blockStore.LoadBlockCommit(height) return cs.blockStore.LoadBlockCommit(height)
} }
// OnStart implements service.Service.
// It loads the latest state via the WAL, and starts the timeout and receive routines.
// OnStart loads the latest state via the WAL, and starts the timeout and
// receive routines.
func (cs *State) OnStart() error { func (cs *State) OnStart() error {
if err := cs.evsw.Start(); err != nil {
return err
}
// we may set the WAL in testing before calling Start,
// so only OpenWAL if its still the nilWAL
// We may set the WAL in testing before calling Start, so only OpenWAL if its
// still the nilWAL.
if _, ok := cs.wal.(nilWAL); ok { if _, ok := cs.wal.(nilWAL); ok {
walFile := cs.config.WalFile()
wal, err := cs.OpenWAL(walFile)
if err != nil {
cs.Logger.Error("Error loading State wal", "err", err.Error())
if err := cs.loadWalFile(); err != nil {
return err return err
} }
cs.wal = wal
}
// We may have lost some votes if the process crashed reload from consensus
// log to catchup.
if cs.doWALCatchup {
repairAttempted := false
LOOP:
for {
err := cs.catchupReplay(cs.Height)
switch {
case err == nil:
break LOOP
case !IsDataCorruptionError(err):
cs.Logger.Error("Error on catchup replay. Proceeding to start State anyway", "err", err)
break LOOP
case repairAttempted:
return err
}
cs.Logger.Info("WAL file is corrupted. Attempting repair", "err", err)
// 1) prep work
cs.wal.Stop()
repairAttempted = true
// 2) backup original WAL file
corruptedFile := fmt.Sprintf("%s.CORRUPTED", cs.config.WalFile())
if err := tmos.CopyFile(cs.config.WalFile(), corruptedFile); err != nil {
return err
}
cs.Logger.Info("Backed up WAL file", "src", cs.config.WalFile(), "dst", corruptedFile)
// 3) try to repair (WAL file will be overwritten!)
if err := repairWalFile(corruptedFile, cs.config.WalFile()); err != nil {
cs.Logger.Error("Repair failed", "err", err)
return err
}
cs.Logger.Info("Successful repair")
// reload WAL file
if err := cs.loadWalFile(); err != nil {
return err
}
}
}
if err := cs.evsw.Start(); err != nil {
return err
} }
// we need the timeoutRoutine for replay so // we need the timeoutRoutine for replay so
@ -304,33 +345,6 @@ func (cs *State) OnStart() error {
return err return err
} }
// we may have lost some votes if the process crashed
// reload from consensus log to catchup
if cs.doWALCatchup {
if err := cs.catchupReplay(cs.Height); err != nil {
// don't try to recover from data corruption error
if IsDataCorruptionError(err) {
cs.Logger.Error("Encountered corrupt WAL file", "err", err.Error())
cs.Logger.Error("Please repair the WAL file before restarting")
fmt.Println(`You can attempt to repair the WAL as follows:
----
WALFILE=~/.tendermint/data/cs.wal/wal
cp $WALFILE ${WALFILE}.bak # backup the file
go run scripts/wal2json/main.go $WALFILE > wal.json # this will panic, but can be ignored
rm $WALFILE # remove the corrupt file
go run scripts/json2wal/main.go wal.json $WALFILE # rebuild the file without corruption
----`)
return err
}
cs.Logger.Error("Error on catchup replay. Proceeding to start State anyway", "err", err.Error())
// NOTE: if we ever do return an error here,
// make sure to stop the timeoutTicker
}
}
// now start the receiveRoutine // now start the receiveRoutine
go cs.receiveRoutine(0) go cs.receiveRoutine(0)
@ -352,6 +366,17 @@ func (cs *State) startRoutines(maxSteps int) {
go cs.receiveRoutine(maxSteps) go cs.receiveRoutine(maxSteps)
} }
// loadWalFile loads WAL data from file. It overwrites cs.wal.
func (cs *State) loadWalFile() error {
wal, err := cs.OpenWAL(cs.config.WalFile())
if err != nil {
cs.Logger.Error("Error loading State wal", "err", err)
return err
}
cs.wal = wal
return nil
}
// OnStop implements service.Service. // OnStop implements service.Service.
func (cs *State) OnStop() { func (cs *State) OnStop() {
cs.evsw.Stop() cs.evsw.Stop()
@ -366,15 +391,17 @@ func (cs *State) Wait() {
<-cs.done <-cs.done
} }
// OpenWAL opens a file to log all consensus messages and timeouts for deterministic accountability
// OpenWAL opens a file to log all consensus messages and timeouts for
// deterministic accountability.
func (cs *State) OpenWAL(walFile string) (WAL, error) { func (cs *State) OpenWAL(walFile string) (WAL, error) {
wal, err := NewWAL(walFile) wal, err := NewWAL(walFile)
if err != nil { if err != nil {
cs.Logger.Error("Failed to open WAL for consensus state", "wal", walFile, "err", err)
cs.Logger.Error("Failed to open WAL", "file", walFile, "err", err)
return nil, err return nil, err
} }
wal.SetLogger(cs.Logger.With("wal", walFile)) wal.SetLogger(cs.Logger.With("wal", walFile))
if err := wal.Start(); err != nil { if err := wal.Start(); err != nil {
cs.Logger.Error("Failed to start WAL", "err", err)
return nil, err return nil, err
} }
return wal, nil return wal, nil
@ -2034,3 +2061,39 @@ func CompareHRS(h1 int64, r1 int, s1 cstypes.RoundStepType, h2 int64, r2 int, s2
} }
return 0 return 0
} }
// repairWalFile decodes messages from src (until the decoder errors) and
// writes them to dst.
func repairWalFile(src, dst string) error {
in, err := os.Open(src)
if err != nil {
return err
}
defer in.Close()
out, err := os.Open(dst)
if err != nil {
return err
}
defer out.Close()
var (
dec = NewWALDecoder(in)
enc = NewWALEncoder(out)
)
// best-case repair (until first error is encountered)
for {
msg, err := dec.Decode()
if err != nil {
break
}
err = enc.Encode(msg)
if err != nil {
return fmt.Errorf("failed to encode msg: %w", err)
}
}
return nil
}

+ 25
- 0
libs/os/os.go View File

@ -2,6 +2,7 @@ package os
import ( import (
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"os" "os"
"os/signal" "os/signal"
@ -80,3 +81,27 @@ func MustWriteFile(filePath string, contents []byte, mode os.FileMode) {
Exit(fmt.Sprintf("MustWriteFile failed: %v", err)) Exit(fmt.Sprintf("MustWriteFile failed: %v", err))
} }
} }
// CopyFile copies a file. It truncates the destination file if it exists.
func CopyFile(src, dst string) error {
info, err := os.Stat(src)
if err != nil {
return err
}
srcfile, err := os.Open(src)
if err != nil {
return err
}
defer srcfile.Close()
// create new file, truncate if exists and apply same permissions as the original one
dstfile, err := os.OpenFile(dst, os.O_RDWR|os.O_CREATE|os.O_TRUNC, info.Mode().Perm())
if err != nil {
return err
}
defer dstfile.Close()
_, err = io.Copy(dstfile, srcfile)
return err
}

+ 37
- 0
libs/os/os_test.go View File

@ -0,0 +1,37 @@
package os
import (
"bytes"
"fmt"
"io/ioutil"
"os"
"testing"
)
func TestCopyFile(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "example")
if err != nil {
t.Fatal(err)
}
defer os.Remove(tmpfile.Name())
content := []byte("hello world")
if _, err := tmpfile.Write(content); err != nil {
t.Fatal(err)
}
copyfile := fmt.Sprintf("%s.copy", tmpfile.Name())
if err := CopyFile(tmpfile.Name(), copyfile); err != nil {
t.Fatal(err)
}
if _, err := os.Stat(copyfile); os.IsNotExist(err) {
t.Fatal("copy should exist")
}
data, err := ioutil.ReadFile(copyfile)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(data, content) {
t.Fatalf("copy file content differs: expected %v, got %v", content, data)
}
os.Remove(copyfile)
}

Loading…
Cancel
Save