Browse Source

fixes from review

pull/672/head
Ethan Buchman 7 years ago
parent
commit
57a684d5ac
6 changed files with 34 additions and 9 deletions
  1. +1
    -2
      consensus/replay.go
  2. +2
    -0
      consensus/replay_test.go
  3. +2
    -6
      consensus/wal.go
  4. +25
    -0
      consensus/wal_test.go
  5. +1
    -1
      scripts/cutWALUntil/main.go
  6. +3
    -0
      scripts/wal2json/main.go

+ 1
- 2
consensus/replay.go View File

@ -113,12 +113,11 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error {
cs.Logger.Error("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", csHeight-1) cs.Logger.Error("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", csHeight-1)
} else if err != nil { } else if err != nil {
return err return err
} else {
defer gr.Close()
} }
if !found { if !found {
return errors.New(cmn.Fmt("Cannot replay height %d. WAL does not contain #ENDHEIGHT for %d.", csHeight, csHeight-1)) return errors.New(cmn.Fmt("Cannot replay height %d. WAL does not contain #ENDHEIGHT for %d.", csHeight, csHeight-1))
} }
defer gr.Close()
cs.Logger.Info("Catchup by replaying consensus messages", "height", csHeight) cs.Logger.Info("Catchup by replaying consensus messages", "height", csHeight)


+ 2
- 0
consensus/replay_test.go View File

@ -219,6 +219,7 @@ func TestWALCrashAfterWrite(t *testing.T) {
for i := 0; i < splitSize-1; i++ { for i := 0; i < splitSize-1; i++ {
t.Run(fmt.Sprintf("%s:%d", thisCase.name, i), func(t *testing.T) { t.Run(fmt.Sprintf("%s:%d", thisCase.name, i), func(t *testing.T) {
cs, newBlockCh, _, walFile := setupReplayTest(t, thisCase, i+1, true) cs, newBlockCh, _, walFile := setupReplayTest(t, thisCase, i+1, true)
cs.config.TimeoutPropose = 100
runReplayTest(t, cs, walFile, newBlockCh, thisCase, i+1) runReplayTest(t, cs, walFile, newBlockCh, thisCase, i+1)
// cleanup // cleanup
os.Remove(walFile) os.Remove(walFile)
@ -237,6 +238,7 @@ func TestWALCrashBeforeWritePropose(t *testing.T) {
t.Run(fmt.Sprintf("%s:%d", thisCase.name, lineNum), func(t *testing.T) { t.Run(fmt.Sprintf("%s:%d", thisCase.name, lineNum), func(t *testing.T) {
// setup replay test where last message is a proposal // setup replay test where last message is a proposal
cs, newBlockCh, proposalMsg, walFile := setupReplayTest(t, thisCase, lineNum, false) cs, newBlockCh, proposalMsg, walFile := setupReplayTest(t, thisCase, lineNum, false)
cs.config.TimeoutPropose = 100
msg := readTimedWALMessage(t, proposalMsg) msg := readTimedWALMessage(t, proposalMsg)
proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage) proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage)
// Set LastSig // Set LastSig


+ 2
- 6
consensus/wal.go View File

@ -136,6 +136,7 @@ func (wal *WAL) SearchForEndHeight(height uint64) (gr *auto.GroupReader, found b
for { for {
msg, err = dec.Decode() msg, err = dec.Decode()
if err == io.EOF { if err == io.EOF {
// check next file
break break
} }
if err != nil { if err != nil {
@ -147,10 +148,6 @@ func (wal *WAL) SearchForEndHeight(height uint64) (gr *auto.GroupReader, found b
if m.Height == height { // found if m.Height == height { // found
wal.Logger.Debug("Found", "height", height, "index", index) wal.Logger.Debug("Found", "height", height, "index", index)
return gr, true, nil return gr, true, nil
} else if m.Height < height {
// we will never find it because we're starting from the end
gr.Close()
return nil, false, nil
} }
} }
} }
@ -214,8 +211,7 @@ func NewWALDecoder(rd io.Reader) *WALDecoder {
return &WALDecoder{rd} return &WALDecoder{rd}
} }
// Decode reads the next custom-encoded value from its input and stores it in
// the value pointed to by v.
// Decode reads the next custom-encoded value from its reader and returns it.
func (dec *WALDecoder) Decode() (*TimedWALMessage, error) { func (dec *WALDecoder) Decode() (*TimedWALMessage, error) {
b := make([]byte, 4) b := make([]byte, 4)


+ 25
- 0
consensus/wal_test.go View File

@ -2,10 +2,13 @@ package consensus
import ( import (
"bytes" "bytes"
"path"
"testing" "testing"
"time" "time"
"github.com/tendermint/tendermint/consensus/types" "github.com/tendermint/tendermint/consensus/types"
tmtypes "github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -35,3 +38,25 @@ func TestWALEncoderDecoder(t *testing.T) {
assert.Equal(t, msg.Msg, decoded.Msg) assert.Equal(t, msg.Msg, decoded.Msg)
} }
} }
func TestSearchForEndHeight(t *testing.T) {
wal, err := NewWAL(path.Join(data_dir, "many_blocks.cswal"), false)
if err != nil {
t.Fatal(err)
}
h := 3
gr, found, err := wal.SearchForEndHeight(uint64(h))
assert.NoError(t, err, cmn.Fmt("expected not to err on height %d", h))
assert.True(t, found, cmn.Fmt("expected to find end height for %d", h))
assert.NotNil(t, gr, "expected group not to be nil")
defer gr.Close()
dec := NewWALDecoder(gr)
msg, err := dec.Decode()
assert.NoError(t, err, "expected to decode a message")
rs, ok := msg.Msg.(tmtypes.EventDataRoundState)
assert.True(t, ok, "expected message of type EventDataRoundState")
assert.Equal(t, rs.Height, h+1, cmn.Fmt("wrong height"))
}

+ 1
- 1
scripts/cutWALUntil/main.go View File

@ -18,7 +18,7 @@ import (
func main() { func main() {
if len(os.Args) < 4 { if len(os.Args) < 4 {
fmt.Println("3 arguments required: <path-to-wal> height-to-stop <output-wal>")
fmt.Println("3 arguments required: <path-to-wal> <height-to-stop> <output-wal>")
os.Exit(1) os.Exit(1)
} }


+ 3
- 0
scripts/wal2json/main.go View File

@ -43,5 +43,8 @@ func main() {
os.Stdout.Write(json) os.Stdout.Write(json)
os.Stdout.Write([]byte("\n")) os.Stdout.Write([]byte("\n"))
if end, ok := msg.Msg.(cs.EndHeightMessage); ok {
os.Stdout.Write([]byte(fmt.Sprintf("ENDHEIGHT %d\n", end.Height)))
}
} }
} }

Loading…
Cancel
Save