From 273a65724d6958d34dd098f2d68a516f29716976 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 18 Jan 2016 15:57:57 -0500 Subject: [PATCH] replayCatchup test --- consensus/common_test.go | 31 ++++++++++------- consensus/replay.go | 15 ++++++--- consensus/replay_test.go | 72 ++++++++++++++++++++++++++++++++++++++++ consensus/state_test.go | 32 +++++++++--------- consensus/wal.go | 6 +++- 5 files changed, 123 insertions(+), 33 deletions(-) create mode 100644 consensus/replay_test.go diff --git a/consensus/common_test.go b/consensus/common_test.go index dae0882b0..c222db67a 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -9,9 +9,9 @@ import ( "time" dbm "github.com/tendermint/go-db" + "github.com/tendermint/go-events" bc "github.com/tendermint/tendermint/blockchain" _ "github.com/tendermint/tendermint/config/tendermint_test" - "github.com/tendermint/go-events" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" @@ -296,16 +296,16 @@ func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lo cs.mtx.Unlock() } -func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { - // Get State - state, privVals := randGenesisState(nValidators, false, 10) - - // fmt.Println(state.Validators) - - vss := make([]*validatorStub, nValidators) +func fixedConsensusState() *ConsensusState { + stateDB := dbm.NewMemDB() + state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) + privValidatorFile := config.GetString("priv_validator_file") + privValidator := types.LoadOrGenPrivValidator(privValidatorFile) + return newConsensusState(state, privValidator) - // make consensus state for lead validator +} +func newConsensusState(state *sm.State, pv *types.PrivValidator) *ConsensusState { // Get BlockStore blockDB := dbm.NewMemDB() blockStore := bc.NewBlockStore(blockDB) @@ -320,14 +320,21 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { // Make ConsensusReactor cs := NewConsensusState(state, proxyAppConnCon, blockStore, mempool) - cs.SetPrivValidator(privVals[0]) + cs.SetPrivValidator(pv) evsw := events.NewEventSwitch() cs.SetEventSwitch(evsw) evsw.Start() + return cs +} + +func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { + // Get State + state, privVals := randGenesisState(nValidators, false, 10) + + vss := make([]*validatorStub, nValidators) - // start the transition routines - // cs.startRoutines() + cs := newConsensusState(state, privVals[0]) for i := 0; i < nValidators; i++ { vss[i] = NewValidatorStub(privVals[i]) diff --git a/consensus/replay.go b/consensus/replay.go index 5d193efec..4f6cb1c59 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -24,6 +24,7 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte var msg ConsensusLogMessage wire.ReadJSON(&msg, msgBytes, &err) if err != nil { + fmt.Println(string(msgBytes)) return fmt.Errorf("Error reading json data: %v", err) } @@ -87,8 +88,6 @@ func (cs *ConsensusState) catchupReplay(height int) error { return nil } - log.Notice("Catchup by replaying consensus messages") - // starting from end of file, // read messages until a new height is found nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool { @@ -110,6 +109,13 @@ func (cs *ConsensusState) catchupReplay(height int) error { return err } + var beginning bool // if we had to go back to the beginning + if c, _ := cs.wal.fp.Seek(0, 1); c == 0 { + beginning = true + } + + log.Notice("Catchup by replaying consensus messages", "n", nLines) + // now we can replay the latest nLines on consensus state // note we can't use scan because we've already been reading from the file reader := bufio.NewReader(cs.wal.fp) @@ -122,8 +128,8 @@ func (cs *ConsensusState) catchupReplay(height int) error { } else if len(msgBytes) == 0 { continue } - // the first msg is the NewHeight event, so we can ignore it - if i == 1 { + // the first msg is (usually) the NewHeight event, so we can ignore it + if !beginning && i == 1 { continue } @@ -134,6 +140,7 @@ func (cs *ConsensusState) catchupReplay(height int) error { return err } } + log.Info("Done catchup replay") return nil } diff --git a/consensus/replay_test.go b/consensus/replay_test.go new file mode 100644 index 000000000..f94c73fcc --- /dev/null +++ b/consensus/replay_test.go @@ -0,0 +1,72 @@ +package consensus + +import ( + "io/ioutil" + "os" + "testing" + "time" + + "github.com/tendermint/tendermint/types" +) + +var testLog = `{"time":"2016-01-18T20:46:00.774Z","msg":[3,{"duration":982632969,"height":1,"round":0,"step":1}]} +{"time":"2016-01-18T20:46:00.776Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} +{"time":"2016-01-18T20:46:00.776Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"B6227255FF20758326B0B7DFF529F20E33E58F45"},"pol_round":-1,"signature":"A1803A1364F6398C154FE45D5649A89129039F18A0FE42B211BADFDF6E81EA53F48F83D3610DDD848C3A5284D3F09BDEB26FA1D856BDF70D48C507BF2453A70E"}}],"peer_key":""}]} +{"time":"2016-01-18T20:46:00.777Z","msg":[2,{"msg":[19,{"Height":1,"Round":0,"Part":{"index":0,"bytes":"0101010F74656E6465726D696E745F746573740101142AA030B15DDFC000000000000000000000000000000114C4B01D3810579550997AC5641E759E20D99B51C10001000100","proof":{"aunts":[]}}}],"peer_key":""}]} +{"time":"2016-01-18T20:46:00.781Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrevote"}]} +{"time":"2016-01-18T20:46:00.781Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":1,"block_hash":"E05D1DB8DEC7CDA507A42C8FF208EE4317C663F6","block_parts_header":{"total":1,"hash":"B6227255FF20758326B0B7DFF529F20E33E58F45"},"signature":"88F5708C802BEE54EFBF438967FBC6C6EAAFC41258A85D92B9B055481175BE9FA71007B1AAF2BFBC3BF3CC0542DB48A9812324B7BBA7307446CCDBF029077F07"}}],"peer_key":""}]} +{"time":"2016-01-18T20:46:00.786Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrecommit"}]} +{"time":"2016-01-18T20:46:00.786Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":2,"block_hash":"E05D1DB8DEC7CDA507A42C8FF208EE4317C663F6","block_parts_header":{"total":1,"hash":"B6227255FF20758326B0B7DFF529F20E33E58F45"},"signature":"65B0C9D2A8C9919FC9B036F82C3F1818E706E8BC066A78D99D3316E4814AB06594841E387B323AA7773F926D253C1E4D4A0930F7A8C8AE1E838CA15C673B2B02"}}],"peer_key":""}]} +` + +func TestReplayCatchup(t *testing.T) { + // write the needed wal to file + f, err := ioutil.TempFile(os.TempDir(), "replay_test_") + if err != nil { + t.Fatal(err) + } + name := f.Name() + _, err = f.WriteString(testLog) + if err != nil { + t.Fatal(err) + } + f.Close() + + cs := fixedConsensusState() + + // we've already precommitted on the first block + // without replay catchup we would be halted here forever + cs.privValidator.LastHeight = 1 // first block + cs.privValidator.LastStep = 3 // precommit + + newBlockCh := cs.evsw.SubscribeToEvent("tester", types.EventStringNewBlock(), 0) + + // start timeout and receive routines + cs.startRoutines(0) + + // open wal and run catchup messages + openWAL(t, cs, name) + if err := cs.catchupReplay(cs.Height); err != nil { + t.Fatalf("Error on catchup replay %v", err) + } + + cs.enterNewRound(cs.Height, cs.Round) + + after := time.After(time.Second * 2) + select { + case <-newBlockCh: + case <-after: + t.Fatal("Timed out waiting for new block") + } + +} + +func openWAL(t *testing.T, cs *ConsensusState, file string) { + // open the wal + wal, err := NewWAL(file) + if err != nil { + t.Fatal(err) + } + wal.exists = true + cs.wal = wal +} diff --git a/consensus/state_test.go b/consensus/state_test.go index ab6795a69..ba24a5ab6 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -51,7 +51,7 @@ func init() { } func TestProposerSelection0(t *testing.T) { - cs1, vss := simpleConsensusState(4) + cs1, vss := randConsensusState(4) height, round := cs1.Height, cs1.Round newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) @@ -85,7 +85,7 @@ func TestProposerSelection0(t *testing.T) { // Now let's do it all again, but starting from round 2 instead of 0 func TestProposerSelection2(t *testing.T) { - cs1, vss := simpleConsensusState(4) // test needs more work for more than 3 validators + cs1, vss := randConsensusState(4) // test needs more work for more than 3 validators newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) @@ -114,7 +114,7 @@ func TestProposerSelection2(t *testing.T) { // a non-validator should timeout into the prevote round func TestEnterProposeNoPrivValidator(t *testing.T) { - cs, _ := simpleConsensusState(1) + cs, _ := randConsensusState(1) cs.SetPrivValidator(nil) height, round := cs.Height, cs.Round @@ -139,7 +139,7 @@ func TestEnterProposeNoPrivValidator(t *testing.T) { // a validator should not timeout of the prevote round (TODO: unless the block is really big!) func TestEnterProposeYesPrivValidator(t *testing.T) { - cs, _ := simpleConsensusState(1) + cs, _ := randConsensusState(1) height, round := cs.Height, cs.Round // Listen for propose timeout event @@ -175,7 +175,7 @@ func TestEnterProposeYesPrivValidator(t *testing.T) { } func TestBadProposal(t *testing.T) { - cs1, vss := simpleConsensusState(2) + cs1, vss := randConsensusState(2) height, round := cs1.Height, cs1.Round cs2 := vss[1] @@ -231,7 +231,7 @@ func TestBadProposal(t *testing.T) { // propose, prevote, and precommit a block func TestFullRound1(t *testing.T) { - cs, vss := simpleConsensusState(1) + cs, vss := randConsensusState(1) height, round := cs.Height, cs.Round voteCh := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) @@ -259,7 +259,7 @@ func TestFullRound1(t *testing.T) { // nil is proposed, so prevote and precommit nil func TestFullRoundNil(t *testing.T) { - cs, vss := simpleConsensusState(1) + cs, vss := randConsensusState(1) height, round := cs.Height, cs.Round voteCh := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) @@ -277,7 +277,7 @@ func TestFullRoundNil(t *testing.T) { // run through propose, prevote, precommit commit with two validators // where the first validator has to wait for votes from the second func TestFullRound2(t *testing.T) { - cs1, vss := simpleConsensusState(2) + cs1, vss := randConsensusState(2) cs2 := vss[1] height, round := cs1.Height, cs1.Round @@ -318,7 +318,7 @@ func TestFullRound2(t *testing.T) { // two validators, 4 rounds. // two vals take turns proposing. val1 locks on first one, precommits nil on everything else func TestLockNoPOL(t *testing.T) { - cs1, vss := simpleConsensusState(2) + cs1, vss := randConsensusState(2) cs2 := vss[1] height := cs1.Height @@ -481,7 +481,7 @@ func TestLockNoPOL(t *testing.T) { // 4 vals, one precommits, other 3 polka at next round, so we unlock and precomit the polka func TestLockPOLRelock(t *testing.T) { - cs1, vss := simpleConsensusState(4) + cs1, vss := randConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) @@ -589,7 +589,7 @@ func TestLockPOLRelock(t *testing.T) { // 4 vals, one precommits, other 3 polka at next round, so we unlock and precomit the polka func TestLockPOLUnlock(t *testing.T) { - cs1, vss := simpleConsensusState(4) + cs1, vss := randConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) @@ -680,7 +680,7 @@ func TestLockPOLUnlock(t *testing.T) { // then a polka at round 2 that we lock on // then we see the polka from round 1 but shouldn't unlock func TestLockPOLSafety1(t *testing.T) { - cs1, vss := simpleConsensusState(4) + cs1, vss := randConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) @@ -799,7 +799,7 @@ func TestLockPOLSafety1(t *testing.T) { // What we want: // dont see P0, lock on P1 at R1, dont unlock using P0 at R2 func TestLockPOLSafety2(t *testing.T) { - cs1, vss := simpleConsensusState(4) + cs1, vss := randConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) @@ -889,7 +889,7 @@ func TestLockPOLSafety2(t *testing.T) { /* func TestSlashingPrevotes(t *testing.T) { - cs1, vss := simpleConsensusState(2) + cs1, vss := randConsensusState(2) cs2 := vss[1] @@ -924,7 +924,7 @@ func TestSlashingPrevotes(t *testing.T) { } func TestSlashingPrecommits(t *testing.T) { - cs1, vss := simpleConsensusState(2) + cs1, vss := randConsensusState(2) cs2 := vss[1] @@ -969,7 +969,7 @@ func TestSlashingPrecommits(t *testing.T) { // 4 vals. // we receive a final precommit after going into next round, but others might have gone to commit already! func TestHalt1(t *testing.T) { - cs1, vss := simpleConsensusState(4) + cs1, vss := randConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) diff --git a/consensus/wal.go b/consensus/wal.go index 5b6e3989a..e2599de2d 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -32,6 +32,8 @@ var _ = wire.RegisterInterface( // Write ahead logger writes msgs to disk before they are processed. // Can be used for crash-recovery and deterministic replay +// TODO: currently the wal is overwritten during replay catchup +// give it a mode so it's either reading or appending - must read to end to start appending again type WAL struct { fp *os.File exists bool // if the file already existed (restarted process) @@ -81,9 +83,11 @@ func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) { } // backup until we find the the right line + // current is how far we are from the beginning for { current -= 1 if current < 0 { + wal.fp.Seek(0, 0) // back to beginning return } // backup one and read a new byte @@ -95,6 +99,7 @@ func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) { return } if b[0] == '\n' || len(b) == 0 { + nLines += 1 // read a full line reader := bufio.NewReader(wal.fp) lineBytes, _ := reader.ReadBytes('\n') @@ -102,7 +107,6 @@ func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) { continue } - nLines += 1 if found(lineBytes) { wal.fp.Seek(0, 1) // (?) wal.fp.Seek(current, 0)