Browse Source

Merge pull request #672 from tendermint/573-wal-issues

Add checksum and length to CS WAL record
pull/568/head
Ethan Buchman 7 years ago
committed by GitHub
parent
commit
6af28ead87
16 changed files with 613 additions and 204 deletions
  1. +3
    -0
      .gitignore
  2. +18
    -23
      consensus/replay.go
  3. +25
    -9
      consensus/replay_file.go
  4. +71
    -66
      consensus/replay_test.go
  5. +1
    -1
      consensus/state.go
  6. +110
    -74
      consensus/test_data/build.sh
  7. BIN
      consensus/test_data/empty_block.cswal
  8. BIN
      consensus/test_data/many_blocks.cswal
  9. BIN
      consensus/test_data/small_block1.cswal
  10. BIN
      consensus/test_data/small_block2.cswal
  11. +188
    -13
      consensus/wal.go
  12. +62
    -0
      consensus/wal_test.go
  13. +15
    -18
      glide.lock
  14. +5
    -0
      glide.yaml
  15. +65
    -0
      scripts/cutWALUntil/main.go
  16. +50
    -0
      scripts/wal2json/main.go

+ 3
- 0
.gitignore View File

@ -17,3 +17,6 @@ test/logs
coverage.txt
docs/_build
docs/tools
scripts/wal2json/wal2json
scripts/cutWALUntil/cutWALUntil

+ 18
- 23
consensus/replay.go View File

@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
"hash/crc32"
"io"
"reflect"
"strconv"
@ -11,7 +12,6 @@ import (
"time"
abci "github.com/tendermint/abci/types"
wire "github.com/tendermint/go-wire"
auto "github.com/tendermint/tmlibs/autofile"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
@ -22,6 +22,8 @@ import (
"github.com/tendermint/tendermint/version"
)
var crc32c = crc32.MakeTable(crc32.Castagnoli)
// Functionality to replay blocks and messages on recovery from a crash.
// There are two general failure scenarios: failure during consensus, and failure while applying the block.
// The former is handled by the WAL, the latter by the proxyApp Handshake on restart,
@ -35,18 +37,11 @@ import (
// as if it were received in receiveRoutine
// Lines that start with "#" are ignored.
// NOTE: receiveRoutine should not be running
func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan interface{}) error {
// Skip over empty and meta lines
if len(msgBytes) == 0 || msgBytes[0] == '#' {
func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepCh chan interface{}) error {
// skip meta messages
if _, ok := msg.Msg.(EndHeightMessage); ok {
return nil
}
var err error
var msg TimedWALMessage
wire.ReadJSON(&msg, msgBytes, &err)
if err != nil {
fmt.Println("MsgBytes:", msgBytes, string(msgBytes))
return fmt.Errorf("Error reading json data: %v", err)
}
// for logging
switch m := msg.Msg.(type) {
@ -104,7 +99,7 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error {
// Ensure that ENDHEIGHT for this height doesn't exist
// NOTE: This is just a sanity check. As far as we know things work fine without it,
// and Handshake could reuse ConsensusState if it weren't for this check (since we can crash after writing ENDHEIGHT).
gr, found, err := cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight))
gr, found, err := cs.wal.SearchForEndHeight(uint64(csHeight))
if gr != nil {
gr.Close()
}
@ -113,33 +108,33 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error {
}
// Search for last height marker
gr, found, err = cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight-1))
gr, found, err = cs.wal.SearchForEndHeight(uint64(csHeight - 1))
if err == io.EOF {
cs.Logger.Error("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", csHeight-1)
} else if err != nil {
return err
} else {
defer gr.Close()
}
if !found {
return errors.New(cmn.Fmt("Cannot replay height %d. WAL does not contain #ENDHEIGHT for %d.", csHeight, csHeight-1))
}
defer gr.Close()
cs.Logger.Info("Catchup by replaying consensus messages", "height", csHeight)
var msg *TimedWALMessage
dec := WALDecoder{gr}
for {
line, err := gr.ReadLine()
if err != nil {
if err == io.EOF {
break
} else {
return err
}
msg, err = dec.Decode()
if err == io.EOF {
break
} else if err != nil {
return err
}
// NOTE: since the priv key is set when the msgs are received
// it will attempt to eg double sign but we can just ignore it
// since the votes will be replayed and we'll get to the next step
if err := cs.readReplayMessage([]byte(line), nil); err != nil {
if err := cs.readReplayMessage(msg, nil); err != nil {
return err
}
}


+ 25
- 9
consensus/replay_file.go View File

@ -4,6 +4,7 @@ import (
"bufio"
"errors"
"fmt"
"io"
"os"
"strconv"
"strings"
@ -53,12 +54,20 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error {
defer pb.fp.Close()
var nextN int // apply N msgs in a row
for pb.scanner.Scan() {
var msg *TimedWALMessage
for {
if nextN == 0 && console {
nextN = pb.replayConsoleLoop()
}
if err := pb.cs.readReplayMessage(pb.scanner.Bytes(), newStepCh); err != nil {
msg, err = pb.dec.Decode()
if err == io.EOF {
return nil
} else if err != nil {
return err
}
if err := pb.cs.readReplayMessage(msg, newStepCh); err != nil {
return err
}
@ -76,9 +85,9 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error {
type playback struct {
cs *ConsensusState
fp *os.File
scanner *bufio.Scanner
count int // how many lines/msgs into the file are we
fp *os.File
dec *WALDecoder
count int // how many lines/msgs into the file are we
// replays can be reset to beginning
fileName string // so we can close/reopen the file
@ -91,7 +100,7 @@ func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState *sm.
fp: fp,
fileName: fileName,
genesisState: genState,
scanner: bufio.NewScanner(fp),
dec: NewWALDecoder(fp),
}
}
@ -111,13 +120,20 @@ func (pb *playback) replayReset(count int, newStepCh chan interface{}) error {
return err
}
pb.fp = fp
pb.scanner = bufio.NewScanner(fp)
pb.dec = NewWALDecoder(fp)
count = pb.count - count
fmt.Printf("Reseting from %d to %d\n", pb.count, count)
pb.count = 0
pb.cs = newCS
for i := 0; pb.scanner.Scan() && i < count; i++ {
if err := pb.cs.readReplayMessage(pb.scanner.Bytes(), newStepCh); err != nil {
var msg *TimedWALMessage
for i := 0; i < count; i++ {
msg, err = pb.dec.Decode()
if err == io.EOF {
return nil
} else if err != nil {
return err
}
if err := pb.cs.readReplayMessage(msg, newStepCh); err != nil {
return err
}
pb.count += 1


+ 71
- 66
consensus/replay_test.go View File

@ -8,7 +8,6 @@ import (
"io/ioutil"
"os"
"path"
"strings"
"testing"
"time"
@ -58,14 +57,14 @@ var baseStepChanges = []int{3, 6, 8}
// test recovery from each line in each testCase
var testCases = []*testCase{
newTestCase("empty_block", baseStepChanges), // empty block (has 1 block part)
newTestCase("small_block1", baseStepChanges), // small block with txs in 1 block part
newTestCase("small_block2", []int{3, 9, 11}), // small block with txs across 6 smaller block parts
newTestCase("empty_block", baseStepChanges), // empty block (has 1 block part)
newTestCase("small_block1", baseStepChanges), // small block with txs in 1 block part
newTestCase("small_block2", []int{3, 12, 14}), // small block with txs across 6 smaller block parts
}
type testCase struct {
name string
log string //full cs wal
log []byte //full cs wal
stepMap map[int]int8 // map lines of log to privval step
proposeLine int
@ -100,29 +99,27 @@ func newMapFromChanges(changes []int) map[int]int8 {
return m
}
func readWAL(p string) string {
func readWAL(p string) []byte {
b, err := ioutil.ReadFile(p)
if err != nil {
panic(err)
}
return string(b)
return b
}
func writeWAL(walMsgs string) string {
tempDir := os.TempDir()
walDir := path.Join(tempDir, "/wal"+cmn.RandStr(12))
walFile := path.Join(walDir, "wal")
// Create WAL directory
err := cmn.EnsureDir(walDir, 0700)
func writeWAL(walMsgs []byte) string {
walFile, err := ioutil.TempFile("", "wal")
if err != nil {
panic(err)
panic(fmt.Errorf("failed to create temp WAL file: %v", err))
}
// Write the needed WAL to file
err = cmn.WriteFile(walFile, []byte(walMsgs), 0600)
_, err = walFile.Write(walMsgs)
if err != nil {
panic(err)
panic(fmt.Errorf("failed to write to temp WAL file: %v", err))
}
return walFile
if err := walFile.Close(); err != nil {
panic(fmt.Errorf("failed to close temp WAL file: %v", err))
}
return walFile.Name()
}
func waitForBlock(newBlockCh chan interface{}, thisCase *testCase, i int) {
@ -167,7 +164,7 @@ func toPV(pv types.PrivValidator) *types.PrivValidatorFS {
return pv.(*types.PrivValidatorFS)
}
func setupReplayTest(t *testing.T, thisCase *testCase, nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) {
func setupReplayTest(t *testing.T, thisCase *testCase, nLines int, crashAfter bool) (*ConsensusState, chan interface{}, []byte, string) {
t.Log("-------------------------------------")
t.Logf("Starting replay test %v (of %d lines of WAL). Crash after = %v", thisCase.name, nLines, crashAfter)
@ -176,11 +173,13 @@ func setupReplayTest(t *testing.T, thisCase *testCase, nLines int, crashAfter bo
lineStep -= 1
}
split := strings.Split(thisCase.log, "\n")
split := bytes.Split(thisCase.log, walSeparator)
lastMsg := split[nLines]
// we write those lines up to (not including) one with the signature
walFile := writeWAL(strings.Join(split[:nLines], "\n") + "\n")
b := bytes.Join(split[:nLines], walSeparator)
b = append(b, walSeparator...)
walFile := writeWAL(b)
cs := fixedConsensusStateDummy()
@ -195,14 +194,19 @@ func setupReplayTest(t *testing.T, thisCase *testCase, nLines int, crashAfter bo
return cs, newBlockCh, lastMsg, walFile
}
func readTimedWALMessage(t *testing.T, walMsg string) TimedWALMessage {
var err error
var msg TimedWALMessage
wire.ReadJSON(&msg, []byte(walMsg), &err)
func readTimedWALMessage(t *testing.T, rawMsg []byte) TimedWALMessage {
b := bytes.NewBuffer(rawMsg)
// because rawMsg does not contain a separator and WALDecoder#Decode expects it
_, err := b.Write(walSeparator)
if err != nil {
t.Fatal(err)
}
dec := NewWALDecoder(b)
msg, err := dec.Decode()
if err != nil {
t.Fatalf("Error reading json data: %v", err)
}
return msg
return *msg
}
//-----------------------------------------------
@ -211,10 +215,15 @@ func readTimedWALMessage(t *testing.T, walMsg string) TimedWALMessage {
func TestWALCrashAfterWrite(t *testing.T) {
for _, thisCase := range testCases {
split := strings.Split(thisCase.log, "\n")
for i := 0; i < len(split)-1; i++ {
cs, newBlockCh, _, walFile := setupReplayTest(t, thisCase, i+1, true)
runReplayTest(t, cs, walFile, newBlockCh, thisCase, i+1)
splitSize := bytes.Count(thisCase.log, walSeparator)
for i := 0; i < splitSize-1; i++ {
t.Run(fmt.Sprintf("%s:%d", thisCase.name, i), func(t *testing.T) {
cs, newBlockCh, _, walFile := setupReplayTest(t, thisCase, i+1, true)
cs.config.TimeoutPropose = 100
runReplayTest(t, cs, walFile, newBlockCh, thisCase, i+1)
// cleanup
os.Remove(walFile)
})
}
}
}
@ -226,14 +235,19 @@ func TestWALCrashAfterWrite(t *testing.T) {
func TestWALCrashBeforeWritePropose(t *testing.T) {
for _, thisCase := range testCases {
lineNum := thisCase.proposeLine
// setup replay test where last message is a proposal
cs, newBlockCh, proposalMsg, walFile := setupReplayTest(t, thisCase, lineNum, false)
msg := readTimedWALMessage(t, proposalMsg)
proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage)
// Set LastSig
toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal)
toPV(cs.privValidator).LastSignature = proposal.Proposal.Signature
runReplayTest(t, cs, walFile, newBlockCh, thisCase, lineNum)
t.Run(fmt.Sprintf("%s:%d", thisCase.name, lineNum), func(t *testing.T) {
// setup replay test where last message is a proposal
cs, newBlockCh, proposalMsg, walFile := setupReplayTest(t, thisCase, lineNum, false)
cs.config.TimeoutPropose = 100
msg := readTimedWALMessage(t, proposalMsg)
proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage)
// Set LastSig
toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal)
toPV(cs.privValidator).LastSignature = proposal.Proposal.Signature
runReplayTest(t, cs, walFile, newBlockCh, thisCase, lineNum)
// cleanup
os.Remove(walFile)
})
}
}
@ -315,7 +329,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
if err != nil {
t.Fatal(err)
}
walFile := writeWAL(string(walBody))
walFile := writeWAL(walBody)
config.Consensus.SetWalFile(walFile)
privVal := types.LoadPrivValidatorFS(config.PrivValidatorFile())
@ -465,7 +479,7 @@ func buildTMStateFromChain(config *cfg.Config, state *sm.State, chain []*types.B
func makeBlockchainFromWAL(wal *WAL) ([]*types.Block, []*types.Commit, error) {
// Search for height marker
gr, found, err := wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(0))
gr, found, err := wal.SearchForEndHeight(0)
if err != nil {
return nil, nil, err
}
@ -479,20 +493,17 @@ func makeBlockchainFromWAL(wal *WAL) ([]*types.Block, []*types.Commit, error) {
var blockParts *types.PartSet
var blocks []*types.Block
var commits []*types.Commit
for {
line, err := gr.ReadLine()
if err != nil {
if err == io.EOF {
break
} else {
return nil, nil, err
}
}
piece, err := readPieceFromWAL([]byte(line))
if err != nil {
dec := NewWALDecoder(gr)
for {
msg, err := dec.Decode()
if err == io.EOF {
break
} else if err != nil {
return nil, nil, err
}
piece := readPieceFromWAL(msg)
if piece == nil {
continue
}
@ -528,17 +539,10 @@ func makeBlockchainFromWAL(wal *WAL) ([]*types.Block, []*types.Commit, error) {
return blocks, commits, nil
}
func readPieceFromWAL(msgBytes []byte) (interface{}, error) {
// Skip over empty and meta lines
if len(msgBytes) == 0 || msgBytes[0] == '#' {
return nil, nil
}
var err error
var msg TimedWALMessage
wire.ReadJSON(&msg, msgBytes, &err)
if err != nil {
fmt.Println("MsgBytes:", msgBytes, string(msgBytes))
return nil, fmt.Errorf("Error reading json data: %v", err)
func readPieceFromWAL(msg *TimedWALMessage) interface{} {
// skip meta messages
if _, ok := msg.Msg.(EndHeightMessage); ok {
return nil
}
// for logging
@ -546,14 +550,15 @@ func readPieceFromWAL(msgBytes []byte) (interface{}, error) {
case msgInfo:
switch msg := m.Msg.(type) {
case *ProposalMessage:
return &msg.Proposal.BlockPartsHeader, nil
return &msg.Proposal.BlockPartsHeader
case *BlockPartMessage:
return msg.Part, nil
return msg.Part
case *VoteMessage:
return msg.Vote, nil
return msg.Vote
}
}
return nil, nil
return nil
}
// fresh state and mock store


+ 1
- 1
consensus/state.go View File

@ -1188,7 +1188,7 @@ func (cs *ConsensusState) finalizeCommit(height int) {
// As is, ConsensusState should not be started again
// until we successfully call ApplyBlock (ie. here or in Handshake after restart)
if cs.wal != nil {
cs.wal.writeEndHeight(height)
cs.wal.Save(EndHeightMessage{uint64(height)})
}
fail.Fail() // XXX


+ 110
- 74
consensus/test_data/build.sh View File

@ -1,112 +1,148 @@
#!/usr/bin/env bash
# XXX: removes tendermint dir
# Requires: killall command and jq JSON processor.
cd "$GOPATH/src/github.com/tendermint/tendermint" || exit 1
# Get the parent directory of where this script is.
SOURCE="${BASH_SOURCE[0]}"
while [ -h "$SOURCE" ] ; do SOURCE="$(readlink "$SOURCE")"; done
DIR="$( cd -P "$( dirname "$SOURCE" )/../.." && pwd )"
# Change into that dir because we expect that.
cd "$DIR" || exit 1
# Make sure we have a tendermint command.
if ! hash tendermint 2>/dev/null; then
make install
make install
fi
# Make sure we have a cutWALUntil binary.
cutWALUntil=./scripts/cutWALUntil/cutWALUntil
cutWALUntilDir=$(dirname $cutWALUntil)
if ! hash $cutWALUntil 2>/dev/null; then
cd "$cutWALUntilDir" && go build && cd - || exit 1
fi
TMHOME=$(mktemp -d)
export TMHOME="$TMHOME"
if [[ ! -d "$TMHOME" ]]; then
echo "Could not create temp directory"
exit 1
else
echo "TMHOME: ${TMHOME}"
fi
# specify a dir to copy
# TODO: eventually we should replace with `tendermint init --test`
DIR_TO_COPY=$HOME/.tendermint_test/consensus_state_test
if [ ! -d "$DIR_TO_COPY" ]; then
echo "$DIR_TO_COPY does not exist. Please run: go test ./consensus"
exit 1
fi
echo "==> Copying ${DIR_TO_COPY} to ${TMHOME} directory..."
cp -r "$DIR_TO_COPY"/* "$TMHOME"
TMHOME="$HOME/.tendermint"
#rm -rf "$TMHOME"
#cp -r "$DIR_TO_COPY" "$TMHOME"
#mv $TMHOME/config.toml $TMHOME/config.toml.bak
cp $TMHOME/genesis.json $TMHOME/genesis.json.bak
# preserve original genesis file because later it will be modified (see small_block2)
cp "$TMHOME/genesis.json" "$TMHOME/genesis.json.bak"
function reset(){
tendermint unsafe_reset_all
cp $TMHOME/genesis.json.bak $TMHOME/genesis.json
echo "==> Resetting tendermint..."
tendermint unsafe_reset_all
cp "$TMHOME/genesis.json.bak" "$TMHOME/genesis.json"
}
reset
# empty block
function empty_block(){
tendermint node --proxy_app=persistent_dummy &> /dev/null &
sleep 5
killall tendermint
echo "==> Starting tendermint..."
tendermint node --proxy_app=persistent_dummy &> /dev/null &
sleep 5
echo "==> Killing tendermint..."
killall tendermint
# /q would print up to and including the match, then quit.
# /Q doesn't include the match.
# http://unix.stackexchange.com/questions/11305/grep-show-all-the-file-up-to-the-match
# note on macbook we need `gnu-sed` for Q to work
sed '/ENDHEIGHT: 1/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/empty_block.cswal
echo "==> Copying WAL log..."
$cutWALUntil "$TMHOME/data/cs.wal/wal" 1 consensus/test_data/new_empty_block.cswal
mv consensus/test_data/new_empty_block.cswal consensus/test_data/empty_block.cswal
reset
reset
}
# many blocks
function many_blocks(){
bash scripts/txs/random.sh 1000 36657 &> /dev/null &
PID=$!
tendermint node --proxy_app=persistent_dummy &> /dev/null &
sleep 10
killall tendermint
kill -9 $PID
sed '/ENDHEIGHT: 6/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/many_blocks.cswal
reset
bash scripts/txs/random.sh 1000 36657 &> /dev/null &
PID=$!
echo "==> Starting tendermint..."
tendermint node --proxy_app=persistent_dummy &> /dev/null &
sleep 10
echo "==> Killing tendermint..."
kill -9 $PID
killall tendermint
echo "==> Copying WAL log..."
$cutWALUntil "$TMHOME/data/cs.wal/wal" 6 consensus/test_data/new_many_blocks.cswal
mv consensus/test_data/new_many_blocks.cswal consensus/test_data/many_blocks.cswal
reset
}
# small block 1
function small_block1(){
bash scripts/txs/random.sh 1000 36657 &> /dev/null &
PID=$!
tendermint node --proxy_app=persistent_dummy &> /dev/null &
sleep 10
killall tendermint
kill -9 $PID
sed '/ENDHEIGHT: 1/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/small_block1.cswal
reset
bash scripts/txs/random.sh 1000 36657 &> /dev/null &
PID=$!
echo "==> Starting tendermint..."
tendermint node --proxy_app=persistent_dummy &> /dev/null &
sleep 10
echo "==> Killing tendermint..."
kill -9 $PID
killall tendermint
echo "==> Copying WAL log..."
$cutWALUntil "$TMHOME/data/cs.wal/wal" 1 consensus/test_data/new_small_block1.cswal
mv consensus/test_data/new_small_block1.cswal consensus/test_data/small_block1.cswal
reset
}
# small block 2 (part size = 512)
# block part size = 512
function small_block2(){
cat ~/.tendermint/genesis.json | jq '. + {"consensus_params": {"block_size_params": {"max_bytes":1000000}, "block_gossip_params": {"block_part_size_bytes":512}}}' > genesis.json.new
mv genesis.json.new ~/.tendermint/genesis.json
bash scripts/txs/random.sh 1000 36657 &> /dev/null &
PID=$!
tendermint node --proxy_app=persistent_dummy &> /dev/null &
sleep 10
killall tendermint
kill -9 $PID
sed '/ENDHEIGHT: 1/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/small_block2.cswal
reset
cat "$TMHOME/genesis.json" | jq '. + {consensus_params: {block_size_params: {max_bytes: 22020096}, block_gossip_params: {block_part_size_bytes: 512}}}' > "$TMHOME/new_genesis.json"
mv "$TMHOME/new_genesis.json" "$TMHOME/genesis.json"
bash scripts/txs/random.sh 1000 36657 &> /dev/null &
PID=$!
echo "==> Starting tendermint..."
tendermint node --proxy_app=persistent_dummy &> /dev/null &
sleep 5
echo "==> Killing tendermint..."
kill -9 $PID
killall tendermint
echo "==> Copying WAL log..."
$cutWALUntil "$TMHOME/data/cs.wal/wal" 1 consensus/test_data/new_small_block2.cswal
mv consensus/test_data/new_small_block2.cswal consensus/test_data/small_block2.cswal
reset
}
case "$1" in
"small_block1")
small_block1
;;
"small_block2")
small_block2
;;
"empty_block")
empty_block
;;
"many_blocks")
many_blocks
;;
*)
small_block1
small_block2
empty_block
many_blocks
"small_block1")
small_block1
;;
"small_block2")
small_block2
;;
"empty_block")
empty_block
;;
"many_blocks")
many_blocks
;;
*)
small_block1
small_block2
empty_block
many_blocks
esac
echo "==> Cleaning up..."
rm -rf "$TMHOME"

BIN
consensus/test_data/empty_block.cswal View File


BIN
consensus/test_data/many_blocks.cswal View File


BIN
consensus/test_data/small_block1.cswal View File


BIN
consensus/test_data/small_block2.cswal View File


+ 188
- 13
consensus/wal.go View File

@ -1,6 +1,11 @@
package consensus
import (
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"time"
wire "github.com/tendermint/go-wire"
@ -12,11 +17,21 @@ import (
//--------------------------------------------------------
// types and functions for savings consensus messages
var (
walSeparator = []byte{55, 127, 6, 130} // 0x377f0682 - magic number
)
type TimedWALMessage struct {
Time time.Time `json:"time"`
Time time.Time `json:"time"` // for debugging purposes
Msg WALMessage `json:"msg"`
}
// EndHeightMessage marks the end of the given height inside WAL.
// @internal used by scripts/cutWALUntil util.
type EndHeightMessage struct {
Height uint64 `json:"height"`
}
type WALMessage interface{}
var _ = wire.RegisterInterface(
@ -24,6 +39,7 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{types.EventDataRoundState{}, 0x01},
wire.ConcreteType{msgInfo{}, 0x02},
wire.ConcreteType{timeoutInfo{}, 0x03},
wire.ConcreteType{EndHeightMessage{}, 0x04},
)
//--------------------------------------------------------
@ -38,6 +54,8 @@ type WAL struct {
group *auto.Group
light bool // ignore block parts
enc *WALEncoder
}
func NewWAL(walFile string, light bool) (*WAL, error) {
@ -48,6 +66,7 @@ func NewWAL(walFile string, light bool) (*WAL, error) {
wal := &WAL{
group: group,
light: light,
enc: NewWALEncoder(group),
}
wal.BaseService = *cmn.NewBaseService(nil, "WAL", wal)
return wal, nil
@ -58,7 +77,7 @@ func (wal *WAL) OnStart() error {
if err != nil {
return err
} else if size == 0 {
wal.writeEndHeight(0)
wal.Save(EndHeightMessage{0})
}
_, err = wal.group.Start()
return err
@ -70,35 +89,191 @@ func (wal *WAL) OnStop() {
}
// called in newStep and for each pass in receiveRoutine
func (wal *WAL) Save(wmsg WALMessage) {
func (wal *WAL) Save(msg WALMessage) {
if wal == nil {
return
}
if wal.light {
// in light mode we only write new steps, timeouts, and our own votes (no proposals, block parts)
if mi, ok := wmsg.(msgInfo); ok {
if mi, ok := msg.(msgInfo); ok {
if mi.PeerKey != "" {
return
}
}
}
// Write the wal message
var wmsgBytes = wire.JSONBytes(TimedWALMessage{time.Now(), wmsg})
err := wal.group.WriteLine(string(wmsgBytes))
if err != nil {
cmn.PanicQ(cmn.Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, wmsg))
if err := wal.enc.Encode(&TimedWALMessage{time.Now(), msg}); err != nil {
cmn.PanicQ(cmn.Fmt("Error writing msg to consensus wal: %v \n\nMessage: %v", err, msg))
}
// TODO: only flush when necessary
if err := wal.group.Flush(); err != nil {
cmn.PanicQ(cmn.Fmt("Error flushing consensus wal buf to file. Error: %v \n", err))
}
}
func (wal *WAL) writeEndHeight(height int) {
wal.group.WriteLine(cmn.Fmt("#ENDHEIGHT: %v", height))
// SearchForEndHeight searches for the EndHeightMessage with the height and
// returns an auto.GroupReader, whenever it was found or not and an error.
// Group reader will be nil if found equals false.
//
// CONTRACT: caller must close group reader.
func (wal *WAL) SearchForEndHeight(height uint64) (gr *auto.GroupReader, found bool, err error) {
var msg *TimedWALMessage
// TODO: only flush when necessary
if err := wal.group.Flush(); err != nil {
cmn.PanicQ(cmn.Fmt("Error flushing consensus wal buf to file. Error: %v \n", err))
// NOTE: starting from the last file in the group because we're usually
// searching for the last height. See replay.go
min, max := wal.group.MinIndex(), wal.group.MaxIndex()
wal.Logger.Debug("Searching for height", "height", height, "min", min, "max", max)
for index := max; index >= min; index-- {
gr, err = wal.group.NewReader(index)
if err != nil {
return nil, false, err
}
dec := NewWALDecoder(gr)
for {
msg, err = dec.Decode()
if err == io.EOF {
// check next file
break
}
if err != nil {
gr.Close()
return nil, false, err
}
if m, ok := msg.Msg.(EndHeightMessage); ok {
if m.Height == height { // found
wal.Logger.Debug("Found", "height", height, "index", index)
return gr, true, nil
}
}
}
gr.Close()
}
return nil, false, nil
}
///////////////////////////////////////////////////////////////////////////////
// A WALEncoder writes custom-encoded WAL messages to an output stream.
//
// Format: 4 bytes CRC sum + 4 bytes length + arbitrary-length value (go-wire encoded)
type WALEncoder struct {
wr io.Writer
}
// NewWALEncoder returns a new encoder that writes to wr.
func NewWALEncoder(wr io.Writer) *WALEncoder {
return &WALEncoder{wr}
}
// Encode writes the custom encoding of v to the stream.
func (enc *WALEncoder) Encode(v interface{}) error {
data := wire.BinaryBytes(v)
crc := crc32.Checksum(data, crc32c)
length := uint32(len(data))
totalLength := 8 + int(length)
msg := make([]byte, totalLength)
binary.BigEndian.PutUint32(msg[0:4], crc)
binary.BigEndian.PutUint32(msg[4:8], length)
copy(msg[8:], data)
_, err := enc.wr.Write(msg)
if err == nil {
// TODO [Anton Kaliaev 23 Oct 2017]: remove separator
_, err = enc.wr.Write(walSeparator)
}
return err
}
///////////////////////////////////////////////////////////////////////////////
// A WALDecoder reads and decodes custom-encoded WAL messages from an input
// stream. See WALEncoder for the format used.
//
// It will also compare the checksums and make sure data size is equal to the
// length from the header. If that is not the case, error will be returned.
type WALDecoder struct {
rd io.Reader
}
// NewWALDecoder returns a new decoder that reads from rd.
func NewWALDecoder(rd io.Reader) *WALDecoder {
return &WALDecoder{rd}
}
// Decode reads the next custom-encoded value from its reader and returns it.
func (dec *WALDecoder) Decode() (*TimedWALMessage, error) {
b := make([]byte, 4)
n, err := dec.rd.Read(b)
if err == io.EOF {
return nil, err
}
if err != nil {
return nil, fmt.Errorf("failed to read checksum: %v", err)
}
crc := binary.BigEndian.Uint32(b)
b = make([]byte, 4)
n, err = dec.rd.Read(b)
if err == io.EOF {
return nil, err
}
if err != nil {
return nil, fmt.Errorf("failed to read length: %v", err)
}
length := binary.BigEndian.Uint32(b)
data := make([]byte, length)
n, err = dec.rd.Read(data)
if err == io.EOF {
return nil, err
}
if err != nil {
return nil, fmt.Errorf("not enough bytes for data: %v (want: %d, read: %v)", err, length, n)
}
// check checksum before decoding data
actualCRC := crc32.Checksum(data, crc32c)
if actualCRC != crc {
return nil, fmt.Errorf("checksums do not match: (read: %v, actual: %v)", crc, actualCRC)
}
var nn int
var res *TimedWALMessage
res = wire.ReadBinary(&TimedWALMessage{}, bytes.NewBuffer(data), int(length), &nn, &err).(*TimedWALMessage)
if err != nil {
return nil, fmt.Errorf("failed to decode data: %v", err)
}
// TODO [Anton Kaliaev 23 Oct 2017]: remove separator
if err = readSeparator(dec.rd); err != nil {
return nil, err
}
return res, err
}
// readSeparator reads a separator from r. It returns any error from underlying
// reader or if it's not a separator.
func readSeparator(r io.Reader) error {
b := make([]byte, len(walSeparator))
_, err := r.Read(b)
if err != nil {
return fmt.Errorf("failed to read separator: %v", err)
}
if !bytes.Equal(b, walSeparator) {
return fmt.Errorf("not a separator: %v", b)
}
return nil
}

+ 62
- 0
consensus/wal_test.go View File

@ -0,0 +1,62 @@
package consensus
import (
"bytes"
"path"
"testing"
"time"
"github.com/tendermint/tendermint/consensus/types"
tmtypes "github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestWALEncoderDecoder(t *testing.T) {
now := time.Now()
msgs := []TimedWALMessage{
TimedWALMessage{Time: now, Msg: EndHeightMessage{0}},
TimedWALMessage{Time: now, Msg: timeoutInfo{Duration: time.Second, Height: 1, Round: 1, Step: types.RoundStepPropose}},
}
b := new(bytes.Buffer)
for _, msg := range msgs {
b.Reset()
enc := NewWALEncoder(b)
err := enc.Encode(&msg)
require.NoError(t, err)
dec := NewWALDecoder(b)
decoded, err := dec.Decode()
require.NoError(t, err)
assert.Equal(t, msg.Time.Truncate(time.Millisecond), decoded.Time)
assert.Equal(t, msg.Msg, decoded.Msg)
}
}
func TestSearchForEndHeight(t *testing.T) {
wal, err := NewWAL(path.Join(data_dir, "many_blocks.cswal"), false)
if err != nil {
t.Fatal(err)
}
h := 3
gr, found, err := wal.SearchForEndHeight(uint64(h))
assert.NoError(t, err, cmn.Fmt("expected not to err on height %d", h))
assert.True(t, found, cmn.Fmt("expected to find end height for %d", h))
assert.NotNil(t, gr, "expected group not to be nil")
defer gr.Close()
dec := NewWALDecoder(gr)
msg, err := dec.Decode()
assert.NoError(t, err, "expected to decode a message")
rs, ok := msg.Msg.(tmtypes.EventDataRoundState)
assert.True(t, ok, "expected message of type EventDataRoundState")
assert.Equal(t, rs.Height, h+1, cmn.Fmt("wrong height"))
}

+ 15
- 18
glide.lock View File

@ -1,5 +1,5 @@
hash: 9867fa4543ca4daea1a96a3883a7f483819c067ca34ed6d3aa67aace4a289e93
updated: 2017-10-25T07:15:06.075544403Z
hash: 58d209dee0c21d507226d6b56f7b7f49d24f60090ef9a6c1d89bc27ff00f90e4
updated: 2017-10-26T00:04:10.142172009-04:00
imports:
- name: github.com/btcsuite/btcd
version: c7588cbf7690cd9f047a28efa2dcd8f2435a4e5e
@ -10,7 +10,7 @@ imports:
- name: github.com/fsnotify/fsnotify
version: 4da3e2cfbabc9f751898f250b49f2439785783a1
- name: github.com/go-kit/kit
version: e2b298466b32c7cd5579a9b9b07e968fc9d9452c
version: 4dc7be5d2d12881735283bcab7352178e190fc71
subpackages:
- log
- log/level
@ -26,7 +26,7 @@ imports:
- name: github.com/go-stack/stack
version: 817915b46b97fd7bb80e8ab6b69f01a53ac3eebf
- name: github.com/gogo/protobuf
version: 117892bf1866fbaa2318c03e50e40564c8845457
version: 342cbe0a04158f6dcb03ca0079991a51a4248c02
subpackages:
- proto
- name: github.com/golang/protobuf
@ -40,7 +40,7 @@ imports:
- name: github.com/golang/snappy
version: 553a641470496b2327abcac10b36396bd98e45c9
- name: github.com/gorilla/websocket
version: 71fa72d4842364bc5f74185f4161e0099ea3624a
version: ea4d1f681babbce9545c9c5f3d5194a789c89f5b
- name: github.com/hashicorp/hcl
version: 23c074d0eceb2b8a5bfdbb271ab780cde70f05a8
subpackages:
@ -63,7 +63,7 @@ imports:
- name: github.com/mitchellh/mapstructure
version: 06020f85339e21b2478f756a78e295255ffa4d6a
- name: github.com/pelletier/go-toml
version: 8c31c2ec65b208cc2ad1608bf25a3ff91adf1944
version: 4e9e0ee19b60b13eb79915933f44d8ed5f268bdd
- name: github.com/pkg/errors
version: 645ef00459ed84a119197bfb8d8205042c6df63d
- name: github.com/rcrowley/go-metrics
@ -81,7 +81,7 @@ imports:
- name: github.com/spf13/pflag
version: 97afa5e7ca8a08a383cb259e06636b5e2cc7897f
- name: github.com/spf13/viper
version: 8ef37cbca71638bf32f3d5e194117d4cb46da163
version: 25b30aa063fc18e48662b86996252eabdcf2f0c7
- name: github.com/syndtr/goleveldb
version: b89cc31ef7977104127d34c1bd31ebd1a9db2199
subpackages:
@ -98,7 +98,7 @@ imports:
- leveldb/table
- leveldb/util
- name: github.com/tendermint/abci
version: bb9bb4aa465a31fd6a272765be381888e6898c74
version: a0e38dc58374f485481ea07b23659d85f670a694
subpackages:
- client
- example/counter
@ -118,11 +118,11 @@ imports:
- data
- data/base58
- name: github.com/tendermint/iavl
version: 721710e7aa59f61dbfbf558943a207ba3fe6b926
version: 595f3dcd5b6cd4a292e90757ae6d367fd7a6e653
subpackages:
- iavl
- name: github.com/tendermint/tmlibs
version: 0a652499ead7cd20a57a6a592f0491a2b493bb85
version: b30e3ba26d4077edeed83c50a4e0c38b0ec9ddb3
subpackages:
- autofile
- cli
@ -136,7 +136,7 @@ imports:
- merkle
- test
- name: golang.org/x/crypto
version: edd5e9b0879d13ee6970a50153d85b8fec9f7686
version: 2509b142fb2b797aa7587dad548f113b2c0f20ce
subpackages:
- curve25519
- nacl/box
@ -147,7 +147,7 @@ imports:
- ripemd160
- salsa20/salsa
- name: golang.org/x/net
version: cd69bc3fc700721b709c3a59e16e24c67b58f6ff
version: 4b14673ba32bee7f5ac0f990a48f033919fd418b
subpackages:
- context
- http2
@ -157,11 +157,11 @@ imports:
- lex/httplex
- trace
- name: golang.org/x/sys
version: 8dbc5d05d6edcc104950cc299a1ce6641235bc86
version: 176de7413414c01569163271c745672ff04a7267
subpackages:
- unix
- name: golang.org/x/text
version: c01e4764d870b77f8abe5096ee19ad20d80e8075
version: 6eab0e8f74e86c598ec3b6fad4888e0c11482d48
subpackages:
- secure/bidirule
- transform
@ -172,10 +172,9 @@ imports:
subpackages:
- googleapis/rpc/status
- name: google.golang.org/grpc
version: a5986a5c88227370a9c0a82e5277167229c034cd
version: f7bf885db0b7479a537ec317c6e48ce53145f3db
subpackages:
- balancer
- balancer/roundrobin
- codes
- connectivity
- credentials
@ -187,8 +186,6 @@ imports:
- naming
- peer
- resolver
- resolver/dns
- resolver/passthrough
- stats
- status
- tap


+ 5
- 0
glide.yaml View File

@ -2,17 +2,21 @@ package: github.com/tendermint/tendermint
import:
- package: github.com/ebuchman/fail-test
- package: github.com/gogo/protobuf
version: v0.5
subpackages:
- proto
- package: github.com/golang/protobuf
subpackages:
- proto
- package: github.com/gorilla/websocket
version: v1.2.0
- package: github.com/pkg/errors
version: ~0.8.0
- package: github.com/rcrowley/go-metrics
- package: github.com/spf13/cobra
version: v0.0.1
- package: github.com/spf13/viper
version: v1.0.0
- package: github.com/tendermint/abci
version: develop
subpackages:
@ -50,6 +54,7 @@ import:
subpackages:
- context
- package: google.golang.org/grpc
version: v1.7.0
testImport:
- package: github.com/go-kit/kit
subpackages:


+ 65
- 0
scripts/cutWALUntil/main.go View File

@ -0,0 +1,65 @@
/*
cutWALUntil is a small utility for cutting a WAL until the given height
(inclusively). Note it does not include last cs.EndHeightMessage.
Usage:
cutWALUntil <path-to-wal> height-to-stop <output-wal>
*/
package main
import (
"fmt"
"io"
"os"
"strconv"
cs "github.com/tendermint/tendermint/consensus"
)
func main() {
if len(os.Args) < 4 {
fmt.Println("3 arguments required: <path-to-wal> <height-to-stop> <output-wal>")
os.Exit(1)
}
var heightToStop uint64
var err error
if heightToStop, err = strconv.ParseUint(os.Args[2], 10, 64); err != nil {
panic(fmt.Errorf("failed to parse height: %v", err))
}
in, err := os.Open(os.Args[1])
if err != nil {
panic(fmt.Errorf("failed to open input WAL file: %v", err))
}
defer in.Close()
out, err := os.Create(os.Args[3])
if err != nil {
panic(fmt.Errorf("failed to open output WAL file: %v", err))
}
defer out.Close()
enc := cs.NewWALEncoder(out)
dec := cs.NewWALDecoder(in)
for {
msg, err := dec.Decode()
if err == io.EOF {
break
} else if err != nil {
panic(fmt.Errorf("failed to decode msg: %v", err))
}
if m, ok := msg.Msg.(cs.EndHeightMessage); ok {
if m.Height == heightToStop {
break
}
}
err = enc.Encode(msg)
if err != nil {
panic(fmt.Errorf("failed to encode msg: %v", err))
}
}
}

+ 50
- 0
scripts/wal2json/main.go View File

@ -0,0 +1,50 @@
/*
wal2json converts binary WAL file to JSON.
Usage:
wal2json <path-to-wal>
*/
package main
import (
"encoding/json"
"fmt"
"io"
"os"
cs "github.com/tendermint/tendermint/consensus"
)
func main() {
if len(os.Args) < 2 {
fmt.Println("missing one argument: <path-to-wal>")
os.Exit(1)
}
f, err := os.Open(os.Args[1])
if err != nil {
panic(fmt.Errorf("failed to open WAL file: %v", err))
}
defer f.Close()
dec := cs.NewWALDecoder(f)
for {
msg, err := dec.Decode()
if err == io.EOF {
break
} else if err != nil {
panic(fmt.Errorf("failed to decode msg: %v", err))
}
json, err := json.Marshal(msg)
if err != nil {
panic(fmt.Errorf("failed to marshal msg: %v", err))
}
os.Stdout.Write(json)
os.Stdout.Write([]byte("\n"))
if end, ok := msg.Msg.(cs.EndHeightMessage); ok {
os.Stdout.Write([]byte(fmt.Sprintf("ENDHEIGHT %d\n", end.Height)))
}
}
}

Loading…
Cancel
Save