Browse Source

replayCatchup test

pull/172/head
Ethan Buchman 9 years ago
parent
commit
273a65724d
5 changed files with 123 additions and 33 deletions
  1. +19
    -12
      consensus/common_test.go
  2. +11
    -4
      consensus/replay.go
  3. +72
    -0
      consensus/replay_test.go
  4. +16
    -16
      consensus/state_test.go
  5. +5
    -1
      consensus/wal.go

+ 19
- 12
consensus/common_test.go View File

@ -9,9 +9,9 @@ import (
"time" "time"
dbm "github.com/tendermint/go-db" dbm "github.com/tendermint/go-db"
"github.com/tendermint/go-events"
bc "github.com/tendermint/tendermint/blockchain" bc "github.com/tendermint/tendermint/blockchain"
_ "github.com/tendermint/tendermint/config/tendermint_test" _ "github.com/tendermint/tendermint/config/tendermint_test"
"github.com/tendermint/go-events"
mempl "github.com/tendermint/tendermint/mempool" mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
@ -296,16 +296,16 @@ func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lo
cs.mtx.Unlock() cs.mtx.Unlock()
} }
func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
// Get State
state, privVals := randGenesisState(nValidators, false, 10)
// fmt.Println(state.Validators)
vss := make([]*validatorStub, nValidators)
func fixedConsensusState() *ConsensusState {
stateDB := dbm.NewMemDB()
state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
privValidatorFile := config.GetString("priv_validator_file")
privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
return newConsensusState(state, privValidator)
// make consensus state for lead validator
}
func newConsensusState(state *sm.State, pv *types.PrivValidator) *ConsensusState {
// Get BlockStore // Get BlockStore
blockDB := dbm.NewMemDB() blockDB := dbm.NewMemDB()
blockStore := bc.NewBlockStore(blockDB) blockStore := bc.NewBlockStore(blockDB)
@ -320,14 +320,21 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
// Make ConsensusReactor // Make ConsensusReactor
cs := NewConsensusState(state, proxyAppConnCon, blockStore, mempool) cs := NewConsensusState(state, proxyAppConnCon, blockStore, mempool)
cs.SetPrivValidator(privVals[0])
cs.SetPrivValidator(pv)
evsw := events.NewEventSwitch() evsw := events.NewEventSwitch()
cs.SetEventSwitch(evsw) cs.SetEventSwitch(evsw)
evsw.Start() evsw.Start()
return cs
}
func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
// Get State
state, privVals := randGenesisState(nValidators, false, 10)
vss := make([]*validatorStub, nValidators)
// start the transition routines
// cs.startRoutines()
cs := newConsensusState(state, privVals[0])
for i := 0; i < nValidators; i++ { for i := 0; i < nValidators; i++ {
vss[i] = NewValidatorStub(privVals[i]) vss[i] = NewValidatorStub(privVals[i])


+ 11
- 4
consensus/replay.go View File

@ -24,6 +24,7 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte
var msg ConsensusLogMessage var msg ConsensusLogMessage
wire.ReadJSON(&msg, msgBytes, &err) wire.ReadJSON(&msg, msgBytes, &err)
if err != nil { if err != nil {
fmt.Println(string(msgBytes))
return fmt.Errorf("Error reading json data: %v", err) return fmt.Errorf("Error reading json data: %v", err)
} }
@ -87,8 +88,6 @@ func (cs *ConsensusState) catchupReplay(height int) error {
return nil return nil
} }
log.Notice("Catchup by replaying consensus messages")
// starting from end of file, // starting from end of file,
// read messages until a new height is found // read messages until a new height is found
nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool { nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool {
@ -110,6 +109,13 @@ func (cs *ConsensusState) catchupReplay(height int) error {
return err return err
} }
var beginning bool // if we had to go back to the beginning
if c, _ := cs.wal.fp.Seek(0, 1); c == 0 {
beginning = true
}
log.Notice("Catchup by replaying consensus messages", "n", nLines)
// now we can replay the latest nLines on consensus state // now we can replay the latest nLines on consensus state
// note we can't use scan because we've already been reading from the file // note we can't use scan because we've already been reading from the file
reader := bufio.NewReader(cs.wal.fp) reader := bufio.NewReader(cs.wal.fp)
@ -122,8 +128,8 @@ func (cs *ConsensusState) catchupReplay(height int) error {
} else if len(msgBytes) == 0 { } else if len(msgBytes) == 0 {
continue continue
} }
// the first msg is the NewHeight event, so we can ignore it
if i == 1 {
// the first msg is (usually) the NewHeight event, so we can ignore it
if !beginning && i == 1 {
continue continue
} }
@ -134,6 +140,7 @@ func (cs *ConsensusState) catchupReplay(height int) error {
return err return err
} }
} }
log.Info("Done catchup replay")
return nil return nil
} }


+ 72
- 0
consensus/replay_test.go View File

@ -0,0 +1,72 @@
package consensus
import (
"io/ioutil"
"os"
"testing"
"time"
"github.com/tendermint/tendermint/types"
)
var testLog = `{"time":"2016-01-18T20:46:00.774Z","msg":[3,{"duration":982632969,"height":1,"round":0,"step":1}]}
{"time":"2016-01-18T20:46:00.776Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]}
{"time":"2016-01-18T20:46:00.776Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"B6227255FF20758326B0B7DFF529F20E33E58F45"},"pol_round":-1,"signature":"A1803A1364F6398C154FE45D5649A89129039F18A0FE42B211BADFDF6E81EA53F48F83D3610DDD848C3A5284D3F09BDEB26FA1D856BDF70D48C507BF2453A70E"}}],"peer_key":""}]}
{"time":"2016-01-18T20:46:00.777Z","msg":[2,{"msg":[19,{"Height":1,"Round":0,"Part":{"index":0,"bytes":"0101010F74656E6465726D696E745F746573740101142AA030B15DDFC000000000000000000000000000000114C4B01D3810579550997AC5641E759E20D99B51C10001000100","proof":{"aunts":[]}}}],"peer_key":""}]}
{"time":"2016-01-18T20:46:00.781Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrevote"}]}
{"time":"2016-01-18T20:46:00.781Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":1,"block_hash":"E05D1DB8DEC7CDA507A42C8FF208EE4317C663F6","block_parts_header":{"total":1,"hash":"B6227255FF20758326B0B7DFF529F20E33E58F45"},"signature":"88F5708C802BEE54EFBF438967FBC6C6EAAFC41258A85D92B9B055481175BE9FA71007B1AAF2BFBC3BF3CC0542DB48A9812324B7BBA7307446CCDBF029077F07"}}],"peer_key":""}]}
{"time":"2016-01-18T20:46:00.786Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrecommit"}]}
{"time":"2016-01-18T20:46:00.786Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":2,"block_hash":"E05D1DB8DEC7CDA507A42C8FF208EE4317C663F6","block_parts_header":{"total":1,"hash":"B6227255FF20758326B0B7DFF529F20E33E58F45"},"signature":"65B0C9D2A8C9919FC9B036F82C3F1818E706E8BC066A78D99D3316E4814AB06594841E387B323AA7773F926D253C1E4D4A0930F7A8C8AE1E838CA15C673B2B02"}}],"peer_key":""}]}
`
func TestReplayCatchup(t *testing.T) {
// write the needed wal to file
f, err := ioutil.TempFile(os.TempDir(), "replay_test_")
if err != nil {
t.Fatal(err)
}
name := f.Name()
_, err = f.WriteString(testLog)
if err != nil {
t.Fatal(err)
}
f.Close()
cs := fixedConsensusState()
// we've already precommitted on the first block
// without replay catchup we would be halted here forever
cs.privValidator.LastHeight = 1 // first block
cs.privValidator.LastStep = 3 // precommit
newBlockCh := cs.evsw.SubscribeToEvent("tester", types.EventStringNewBlock(), 0)
// start timeout and receive routines
cs.startRoutines(0)
// open wal and run catchup messages
openWAL(t, cs, name)
if err := cs.catchupReplay(cs.Height); err != nil {
t.Fatalf("Error on catchup replay %v", err)
}
cs.enterNewRound(cs.Height, cs.Round)
after := time.After(time.Second * 2)
select {
case <-newBlockCh:
case <-after:
t.Fatal("Timed out waiting for new block")
}
}
func openWAL(t *testing.T, cs *ConsensusState, file string) {
// open the wal
wal, err := NewWAL(file)
if err != nil {
t.Fatal(err)
}
wal.exists = true
cs.wal = wal
}

+ 16
- 16
consensus/state_test.go View File

@ -51,7 +51,7 @@ func init() {
} }
func TestProposerSelection0(t *testing.T) { func TestProposerSelection0(t *testing.T) {
cs1, vss := simpleConsensusState(4)
cs1, vss := randConsensusState(4)
height, round := cs1.Height, cs1.Round height, round := cs1.Height, cs1.Round
newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1)
@ -85,7 +85,7 @@ func TestProposerSelection0(t *testing.T) {
// Now let's do it all again, but starting from round 2 instead of 0 // Now let's do it all again, but starting from round 2 instead of 0
func TestProposerSelection2(t *testing.T) { func TestProposerSelection2(t *testing.T) {
cs1, vss := simpleConsensusState(4) // test needs more work for more than 3 validators
cs1, vss := randConsensusState(4) // test needs more work for more than 3 validators
newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1)
@ -114,7 +114,7 @@ func TestProposerSelection2(t *testing.T) {
// a non-validator should timeout into the prevote round // a non-validator should timeout into the prevote round
func TestEnterProposeNoPrivValidator(t *testing.T) { func TestEnterProposeNoPrivValidator(t *testing.T) {
cs, _ := simpleConsensusState(1)
cs, _ := randConsensusState(1)
cs.SetPrivValidator(nil) cs.SetPrivValidator(nil)
height, round := cs.Height, cs.Round height, round := cs.Height, cs.Round
@ -139,7 +139,7 @@ func TestEnterProposeNoPrivValidator(t *testing.T) {
// a validator should not timeout of the prevote round (TODO: unless the block is really big!) // a validator should not timeout of the prevote round (TODO: unless the block is really big!)
func TestEnterProposeYesPrivValidator(t *testing.T) { func TestEnterProposeYesPrivValidator(t *testing.T) {
cs, _ := simpleConsensusState(1)
cs, _ := randConsensusState(1)
height, round := cs.Height, cs.Round height, round := cs.Height, cs.Round
// Listen for propose timeout event // Listen for propose timeout event
@ -175,7 +175,7 @@ func TestEnterProposeYesPrivValidator(t *testing.T) {
} }
func TestBadProposal(t *testing.T) { func TestBadProposal(t *testing.T) {
cs1, vss := simpleConsensusState(2)
cs1, vss := randConsensusState(2)
height, round := cs1.Height, cs1.Round height, round := cs1.Height, cs1.Round
cs2 := vss[1] cs2 := vss[1]
@ -231,7 +231,7 @@ func TestBadProposal(t *testing.T) {
// propose, prevote, and precommit a block // propose, prevote, and precommit a block
func TestFullRound1(t *testing.T) { func TestFullRound1(t *testing.T) {
cs, vss := simpleConsensusState(1)
cs, vss := randConsensusState(1)
height, round := cs.Height, cs.Round height, round := cs.Height, cs.Round
voteCh := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) voteCh := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0)
@ -259,7 +259,7 @@ func TestFullRound1(t *testing.T) {
// nil is proposed, so prevote and precommit nil // nil is proposed, so prevote and precommit nil
func TestFullRoundNil(t *testing.T) { func TestFullRoundNil(t *testing.T) {
cs, vss := simpleConsensusState(1)
cs, vss := randConsensusState(1)
height, round := cs.Height, cs.Round height, round := cs.Height, cs.Round
voteCh := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) voteCh := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0)
@ -277,7 +277,7 @@ func TestFullRoundNil(t *testing.T) {
// run through propose, prevote, precommit commit with two validators // run through propose, prevote, precommit commit with two validators
// where the first validator has to wait for votes from the second // where the first validator has to wait for votes from the second
func TestFullRound2(t *testing.T) { func TestFullRound2(t *testing.T) {
cs1, vss := simpleConsensusState(2)
cs1, vss := randConsensusState(2)
cs2 := vss[1] cs2 := vss[1]
height, round := cs1.Height, cs1.Round height, round := cs1.Height, cs1.Round
@ -318,7 +318,7 @@ func TestFullRound2(t *testing.T) {
// two validators, 4 rounds. // two validators, 4 rounds.
// two vals take turns proposing. val1 locks on first one, precommits nil on everything else // two vals take turns proposing. val1 locks on first one, precommits nil on everything else
func TestLockNoPOL(t *testing.T) { func TestLockNoPOL(t *testing.T) {
cs1, vss := simpleConsensusState(2)
cs1, vss := randConsensusState(2)
cs2 := vss[1] cs2 := vss[1]
height := cs1.Height height := cs1.Height
@ -481,7 +481,7 @@ func TestLockNoPOL(t *testing.T) {
// 4 vals, one precommits, other 3 polka at next round, so we unlock and precomit the polka // 4 vals, one precommits, other 3 polka at next round, so we unlock and precomit the polka
func TestLockPOLRelock(t *testing.T) { func TestLockPOLRelock(t *testing.T) {
cs1, vss := simpleConsensusState(4)
cs1, vss := randConsensusState(4)
cs2, cs3, cs4 := vss[1], vss[2], vss[3] cs2, cs3, cs4 := vss[1], vss[2], vss[3]
timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0)
@ -589,7 +589,7 @@ func TestLockPOLRelock(t *testing.T) {
// 4 vals, one precommits, other 3 polka at next round, so we unlock and precomit the polka // 4 vals, one precommits, other 3 polka at next round, so we unlock and precomit the polka
func TestLockPOLUnlock(t *testing.T) { func TestLockPOLUnlock(t *testing.T) {
cs1, vss := simpleConsensusState(4)
cs1, vss := randConsensusState(4)
cs2, cs3, cs4 := vss[1], vss[2], vss[3] cs2, cs3, cs4 := vss[1], vss[2], vss[3]
proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
@ -680,7 +680,7 @@ func TestLockPOLUnlock(t *testing.T) {
// then a polka at round 2 that we lock on // then a polka at round 2 that we lock on
// then we see the polka from round 1 but shouldn't unlock // then we see the polka from round 1 but shouldn't unlock
func TestLockPOLSafety1(t *testing.T) { func TestLockPOLSafety1(t *testing.T) {
cs1, vss := simpleConsensusState(4)
cs1, vss := randConsensusState(4)
cs2, cs3, cs4 := vss[1], vss[2], vss[3] cs2, cs3, cs4 := vss[1], vss[2], vss[3]
proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
@ -799,7 +799,7 @@ func TestLockPOLSafety1(t *testing.T) {
// What we want: // What we want:
// dont see P0, lock on P1 at R1, dont unlock using P0 at R2 // dont see P0, lock on P1 at R1, dont unlock using P0 at R2
func TestLockPOLSafety2(t *testing.T) { func TestLockPOLSafety2(t *testing.T) {
cs1, vss := simpleConsensusState(4)
cs1, vss := randConsensusState(4)
cs2, cs3, cs4 := vss[1], vss[2], vss[3] cs2, cs3, cs4 := vss[1], vss[2], vss[3]
proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
@ -889,7 +889,7 @@ func TestLockPOLSafety2(t *testing.T) {
/* /*
func TestSlashingPrevotes(t *testing.T) { func TestSlashingPrevotes(t *testing.T) {
cs1, vss := simpleConsensusState(2)
cs1, vss := randConsensusState(2)
cs2 := vss[1] cs2 := vss[1]
@ -924,7 +924,7 @@ func TestSlashingPrevotes(t *testing.T) {
} }
func TestSlashingPrecommits(t *testing.T) { func TestSlashingPrecommits(t *testing.T) {
cs1, vss := simpleConsensusState(2)
cs1, vss := randConsensusState(2)
cs2 := vss[1] cs2 := vss[1]
@ -969,7 +969,7 @@ func TestSlashingPrecommits(t *testing.T) {
// 4 vals. // 4 vals.
// we receive a final precommit after going into next round, but others might have gone to commit already! // we receive a final precommit after going into next round, but others might have gone to commit already!
func TestHalt1(t *testing.T) { func TestHalt1(t *testing.T) {
cs1, vss := simpleConsensusState(4)
cs1, vss := randConsensusState(4)
cs2, cs3, cs4 := vss[1], vss[2], vss[3] cs2, cs3, cs4 := vss[1], vss[2], vss[3]
proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)


+ 5
- 1
consensus/wal.go View File

@ -32,6 +32,8 @@ var _ = wire.RegisterInterface(
// Write ahead logger writes msgs to disk before they are processed. // Write ahead logger writes msgs to disk before they are processed.
// Can be used for crash-recovery and deterministic replay // Can be used for crash-recovery and deterministic replay
// TODO: currently the wal is overwritten during replay catchup
// give it a mode so it's either reading or appending - must read to end to start appending again
type WAL struct { type WAL struct {
fp *os.File fp *os.File
exists bool // if the file already existed (restarted process) exists bool // if the file already existed (restarted process)
@ -81,9 +83,11 @@ func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) {
} }
// backup until we find the the right line // backup until we find the the right line
// current is how far we are from the beginning
for { for {
current -= 1 current -= 1
if current < 0 { if current < 0 {
wal.fp.Seek(0, 0) // back to beginning
return return
} }
// backup one and read a new byte // backup one and read a new byte
@ -95,6 +99,7 @@ func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) {
return return
} }
if b[0] == '\n' || len(b) == 0 { if b[0] == '\n' || len(b) == 0 {
nLines += 1
// read a full line // read a full line
reader := bufio.NewReader(wal.fp) reader := bufio.NewReader(wal.fp)
lineBytes, _ := reader.ReadBytes('\n') lineBytes, _ := reader.ReadBytes('\n')
@ -102,7 +107,6 @@ func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) {
continue continue
} }
nLines += 1
if found(lineBytes) { if found(lineBytes) {
wal.fp.Seek(0, 1) // (?) wal.fp.Seek(0, 1) // (?)
wal.fp.Seek(current, 0) wal.fp.Seek(current, 0)


Loading…
Cancel
Save