diff --git a/cmd/tendermint/reset_priv_validator.go b/cmd/tendermint/reset_priv_validator.go index 795823c39..2887c10d0 100644 --- a/cmd/tendermint/reset_priv_validator.go +++ b/cmd/tendermint/reset_priv_validator.go @@ -22,10 +22,7 @@ func reset_priv_validator() { privValidatorFile := config.GetString("priv_validator_file") if _, err := os.Stat(privValidatorFile); err == nil { privValidator = types.LoadPrivValidator(privValidatorFile) - privValidator.LastHeight = 0 - privValidator.LastRound = 0 - privValidator.LastStep = 0 - privValidator.Save() + privValidator.Reset() log.Notice("Reset PrivValidator", "file", privValidatorFile) } else { privValidator = types.GenPrivValidator() diff --git a/consensus/common_test.go b/consensus/common_test.go index 693edbca1..7f55fc29f 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -316,8 +316,8 @@ func fixedConsensusState() *ConsensusState { state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) privValidatorFile := config.GetString("priv_validator_file") privValidator := types.LoadOrGenPrivValidator(privValidatorFile) + privValidator.Reset() return newConsensusState(state, privValidator, counter.NewCounterApplication(true)) - } func newConsensusState(state *sm.State, pv *types.PrivValidator, app tmsp.Application) *ConsensusState { diff --git a/consensus/reactor.go b/consensus/reactor.go index 98f6510dd..49dc73c2c 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -140,6 +140,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { // Messages affect either a peer state or the consensus state. // Peer state updates can happen in parallel, but processing of // proposals, block parts, and votes are ordered by the receiveRoutine +// NOTE: blocks on consensus state for proposals, block parts, and votes func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { if !conR.IsRunning() { log.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes) diff --git a/consensus/replay.go b/consensus/replay.go index ec85be095..c729fca27 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -19,6 +19,8 @@ import ( ) // unmarshal and apply a single message to the consensus state +// as if it were received in receiveRoutine +// NOTE: receiveRoutine should not be running func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan interface{}) error { var err error var msg ConsensusLogMessage @@ -31,7 +33,7 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte // for logging switch m := msg.Msg.(type) { case types.EventDataRoundState: - log.Notice("New Step", "height", m.Height, "round", m.Round, "step", m.Step) + log.Notice("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step) // these are playback checks ticker := time.After(time.Second * 2) if newStepCh != nil { @@ -53,31 +55,28 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte switch msg := m.Msg.(type) { case *ProposalMessage: p := msg.Proposal - log.Notice("Proposal", "height", p.Height, "round", p.Round, "header", + log.Notice("Replay: Proposal", "height", p.Height, "round", p.Round, "header", p.BlockPartsHeader, "pol", p.POLRound, "peer", peerKey) case *BlockPartMessage: - log.Notice("BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey) + log.Notice("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey) case *VoteMessage: v := msg.Vote - 
log.Notice("Vote", "height", v.Height, "round", v.Round, "type", v.Type, + log.Notice("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type, "hash", v.BlockHash, "header", v.BlockPartsHeader, "peer", peerKey) } - // internal or from peer - if m.PeerKey == "" { - cs.internalMsgQueue <- m - } else { - cs.peerMsgQueue <- m - } + + cs.handleMsg(m, cs.RoundState) case timeoutInfo: - log.Notice("Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) - cs.tockChan <- m + log.Notice("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) + cs.handleTimeout(m, cs.RoundState) default: - return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) + return fmt.Errorf("Replay: Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) } return nil } -// replay only those messages since the last block +// replay only those messages since the last block. +// timeoutRoutine should run concurrently to read off tickChan func (cs *ConsensusState) catchupReplay(height int) error { if cs.wal == nil { log.Warn("consensus msg log is nil") @@ -142,7 +141,7 @@ func (cs *ConsensusState) catchupReplay(height int) error { return err } } - log.Info("Done catchup replay") + log.Notice("Done catchup replay") return nil } diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 2e53f5199..56d8e684b 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -1,8 +1,10 @@ package consensus import ( + "fmt" "io/ioutil" "os" + "strings" "testing" "time" @@ -48,69 +50,47 @@ import ( ``` */ -var testLog1 = `{"time":"2016-04-03T11:23:54.387Z","msg":[3,{"duration":972835254,"height":1,"round":0,"step":1}]} +var testLog = `{"time":"2016-04-03T11:23:54.387Z","msg":[3,{"duration":972835254,"height":1,"round":0,"step":1}]} {"time":"2016-04-03T11:23:54.388Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} {"time":"2016-04-03T11:23:54.388Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"pol_round":-1,"signature":"3A2ECD5023B21EC144EC16CFF1B992A4321317B83EEDD8969FDFEA6EB7BF4389F38DDA3E7BB109D63A07491C16277A197B241CF1F05F5E485C59882ECACD9E07"}}],"peer_key":""}]} {"time":"2016-04-03T11:23:54.389Z","msg":[2,{"msg":[19,{"Height":1,"Round":0,"Part":{"index":0,"bytes":"0101010F74656E6465726D696E745F7465737401011441D59F4B718AC00000000000000114C4B01D3810579550997AC5641E759E20D99B51C10001000100","proof":{"aunts":[]}}}],"peer_key":""}]} {"time":"2016-04-03T11:23:54.390Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrevote"}]} -` - -// continuation; splitting allows us to test saving the privVal.LastSignature -// ... 
to test the case when we sign but crash before writing to the wal, -// we only run replay on testLog1 but stick this signature in the privVal.LastSignature after the proposal -var testLog2 = `{"time":"2016-04-03T11:23:54.390Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":1,"block_hash":"4291966B8A9DFBA00AEC7C700F2718E61DF4331D","block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"signature":"47D2A75A4E2F15DB1F0D1B656AC0637AF9AADDFEB6A156874F6553C73895E5D5DC948DBAEF15E61276C5342D0E638DFCB77C971CD282096EA8735A564A90F008"}}],"peer_key":""}]} -` - -// continuation; splitting allows us to test saving the privVal.LastSignature -var testLog3 = `{"time":"2016-04-03T11:23:54.392Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrecommit"}]} +{"time":"2016-04-03T11:23:54.390Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":1,"block_hash":"4291966B8A9DFBA00AEC7C700F2718E61DF4331D","block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"signature":"47D2A75A4E2F15DB1F0D1B656AC0637AF9AADDFEB6A156874F6553C73895E5D5DC948DBAEF15E61276C5342D0E638DFCB77C971CD282096EA8735A564A90F008"}}],"peer_key":""}]} +{"time":"2016-04-03T11:23:54.392Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrecommit"}]} {"time":"2016-04-03T11:23:54.392Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":2,"block_hash":"4291966B8A9DFBA00AEC7C700F2718E61DF4331D","block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"signature":"39147DA595F08B73CF8C899967C8403B5872FD9042FFA4E239159E0B6C5D9665C9CA81D766EACA2AE658872F94C2FCD1E34BF51859CD5B274DA8512BACE4B50D"}}],"peer_key":""}]} ` -func TestReplayWithoutSig(t *testing.T) { +// map lines in the above wal to privVal step +var mapPrivValStep = map[int]int8{ + 0: 0, + 1: 0, + 2: 1, + 3: 1, + 4: 1, + 5: 2, + 6: 2, + 7: 3, +} + +func writeWAL(log string) string { + fmt.Println("writing", log) // write the needed wal to file f, err := ioutil.TempFile(os.TempDir(), "replay_test_") if err != nil { panic(err) } - _, err = f.WriteString(testLog1) + + _, err = f.WriteString(log) if err != nil { panic(err) } + name := f.Name() f.Close() + return name +} - cs := fixedConsensusState() - - // we've already precommitted on the first block - // without replay catchup we would be halted here forever - cs.privValidator.LastHeight = 1 // first block - cs.privValidator.LastStep = 2 // prevote - - newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 0) - cs.evsw.AddListenerForEvent("tester", types.EventStringCompleteProposal(), func(data events.EventData) { - // Set LastSig - - // unmarshal log2 - var err error - var msg ConsensusLogMessage - wire.ReadJSON(&msg, []byte(testLog2), &err) - vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) - if err != nil { - t.Fatalf("Error reading json data: %v", err) - } - - cs.privValidator.LastSignature = vote.Vote.Signature - }) - - // start timeout and receive routines - cs.startRoutines(0) - - // open wal and run catchup messages - openWAL(t, cs, f.Name()) - if err := cs.catchupReplay(cs.Height); err != nil { - panic(Fmt("Error on catchup replay %v", err)) - } - - after := time.After(time.Second * 15) +func waitForBlock(newBlockCh chan interface{}) { + after := time.After(time.Second * 10) select { case <-newBlockCh: case <-after: @@ -118,42 +98,47 @@ func TestReplayWithoutSig(t *testing.T) { } } -func TestReplayWithSig(t *testing.T) { - // write the needed wal to file - f, err 
:= ioutil.TempFile(os.TempDir(), "replay_test_") - if err != nil { - panic(err) +func runReplayTest(t *testing.T, cs *ConsensusState, fileName string, newBlockCh chan interface{}) { + // open wal and run catchup messages + openWAL(t, cs, fileName) + go cs.timeoutRoutine() + if err := cs.catchupReplay(cs.Height); err != nil { + panic(Fmt("Error on catchup replay %v", err)) } - _, err = f.WriteString(testLog1 + testLog2 + testLog3) - if err != nil { - panic(err) + go cs.receiveRoutine(0) + // wait to make a new block + waitForBlock(newBlockCh) + cs.QuitService.OnStop() +} + +func setupReplayTest(nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) { + fmt.Println("-------------------------------------") + log.Notice(Fmt("Starting replay test of %d lines of WAL (crash before write)", nLines)) + + lineStep := nLines + if crashAfter { + lineStep -= 1 } - f.Close() + + split := strings.Split(testLog, "\n") + lastMsg := split[nLines] + + // we write those lines up to (not including) one with the signature + fileName := writeWAL(strings.Join(split[:nLines], "\n") + "\n") cs := fixedConsensusState() + cs.QuitService.OnStart() // we've already precommitted on the first block // without replay catchup we would be halted here forever cs.privValidator.LastHeight = 1 // first block - cs.privValidator.LastStep = 3 // precommit + cs.privValidator.LastStep = mapPrivValStep[lineStep] - newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 0) + fmt.Println("LAST STEP", cs.privValidator.LastStep) - // start timeout and receive routines - cs.startRoutines(0) + newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1) - // open wal and run catchup messages - openWAL(t, cs, f.Name()) - if err := cs.catchupReplay(cs.Height); err != nil { - panic(Fmt("Error on catchup replay %v", err)) - } - - after := time.After(time.Second * 15) - select { - case <-newBlockCh: - case <-after: - panic("Timed out waiting for new block") - } + return cs, newBlockCh, lastMsg, fileName } func openWAL(t *testing.T, cs *ConsensusState, file string) { @@ -165,3 +150,68 @@ func openWAL(t *testing.T, cs *ConsensusState, file string) { wal.exists = true cs.wal = wal } + +//----------------------------------------------- +// Test the log at every iteration, and set the privVal last step +// as if the log was written after signing, before the crash + +func TestReplayCrashAfterWrite(t *testing.T) { + split := strings.Split(testLog, "\n") + for i := 0; i < len(split)-1; i++ { + cs, newBlockCh, _, f := setupReplayTest(i+1, true) + runReplayTest(t, cs, f, newBlockCh) + } +} + +//----------------------------------------------- +// Test the log as if we crashed after signing but before writing. 
+// This relies on privValidator.LastSignature being set + +func TestReplayCrashBeforeWritePropose(t *testing.T) { + cs, newBlockCh, proposalMsg, f := setupReplayTest(2, false) // propose + // Set LastSig + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, []byte(proposalMsg), &err) + proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage) + if err != nil { + t.Fatalf("Error reading json data: %v", err) + } + cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal) + cs.privValidator.LastSignature = proposal.Proposal.Signature + runReplayTest(t, cs, f, newBlockCh) +} + +func TestReplayCrashBeforeWritePrevote(t *testing.T) { + cs, newBlockCh, voteMsg, f := setupReplayTest(5, false) // prevote + cs.evsw.AddListenerForEvent("tester", types.EventStringCompleteProposal(), func(data events.EventData) { + // Set LastSig + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, []byte(voteMsg), &err) + vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) + if err != nil { + t.Fatalf("Error reading json data: %v", err) + } + cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote) + cs.privValidator.LastSignature = vote.Vote.Signature + }) + runReplayTest(t, cs, f, newBlockCh) +} + +func TestReplayCrashBeforeWritePrecommit(t *testing.T) { + cs, newBlockCh, voteMsg, f := setupReplayTest(7, false) // precommit + cs.evsw.AddListenerForEvent("tester", types.EventStringPolka(), func(data events.EventData) { + // Set LastSig + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, []byte(voteMsg), &err) + vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) + if err != nil { + t.Fatalf("Error reading json data: %v", err) + } + cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote) + cs.privValidator.LastSignature = vote.Vote.Signature + }) + runReplayTest(t, cs, f, newBlockCh) +} diff --git a/consensus/state.go b/consensus/state.go index 192253192..564f02529 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -298,8 +298,17 @@ func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) { func (cs *ConsensusState) OnStart() error { cs.QuitService.OnStart() - // start timeout and receive routines - cs.startRoutines(0) + err := cs.OpenWAL(cs.config.GetString("cswal")) + if err != nil { + return err + } + + // start timeout routine + // NOTE: we dont start receiveRoutine until after replay + // so we dont re-write events, and so we dont process + // peer msgs before replay on app restarts. + // timeoutRoutine needed to read off tickChan during replay + go cs.timeoutRoutine() // we may have lost some votes if the process crashed // reload from consensus log to catchup @@ -308,6 +317,9 @@ func (cs *ConsensusState) OnStart() error { // let's go for it anyways, maybe we're fine } + // start + go cs.receiveRoutine(0) + // schedule the first round! cs.scheduleRound0(cs.Height) diff --git a/types/priv_validator.go b/types/priv_validator.go index e9046c5a4..5700fdf59 100644 --- a/types/priv_validator.go +++ b/types/priv_validator.go @@ -41,6 +41,7 @@ type PrivValidator struct { LastRound int `json:"last_round"` LastStep int8 `json:"last_step"` LastSignature crypto.Signature `json:"last_signature"` // so we dont lose signatures + LastSignBytes []byte `json:"last_signbytes"` // so we dont lose signatures // PrivKey should be empty if a Signer other than the default is being used. 
PrivKey crypto.PrivKey `json:"priv_key"` @@ -93,6 +94,7 @@ func GenPrivValidator() *PrivValidator { LastRound: 0, LastStep: stepNone, LastSignature: nil, + LastSignBytes: nil, filePath: "", Signer: NewDefaultSigner(privKey), } @@ -151,6 +153,16 @@ func (privVal *PrivValidator) save() { } } +// NOTE: Unsafe! +func (privVal *PrivValidator) Reset() { + privVal.LastHeight = 0 + privVal.LastRound = 0 + privVal.LastStep = 0 + privVal.LastSignature = nil + privVal.LastSignBytes = nil + privVal.Save() +} + func (privVal *PrivValidator) SignVote(chainID string, vote *Vote) error { privVal.mtx.Lock() defer privVal.mtx.Unlock() @@ -190,9 +202,19 @@ func (privVal *PrivValidator) signBytesHRS(height, round int, step int8, signByt if privVal.LastStep > step { return nil, errors.New("Step regression") } else if privVal.LastStep == step { - if privVal.LastSignature != nil { - return privVal.LastSignature, nil + if privVal.LastSignBytes != nil { + if privVal.LastSignature == nil { + PanicSanity("privVal: LastSignature is nil but LastSignBytes is not!") + } + // so we dont sign a conflicting vote or proposal + // NOTE: proposals are non-deterministic (include time), + // so we can actually lose them, but will still never sign conflicting ones + if bytes.Equal(privVal.LastSignBytes, signBytes) { + log.Notice("Using privVal.LastSignature", "sig", privVal.LastSignature) + return privVal.LastSignature, nil + } } + return nil, errors.New("Step regression") } } } @@ -205,6 +227,7 @@ func (privVal *PrivValidator) signBytesHRS(height, round int, step int8, signByt privVal.LastRound = round privVal.LastStep = step privVal.LastSignature = signature + privVal.LastSignBytes = signBytes privVal.save() return signature, nil
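
For reference, below is a minimal, self-contained sketch of the double-sign protection this patch adds to signBytesHRS. The type and helper names here (lastSignState, sign) are simplified stand-ins, not the actual tendermint/types API; the point is the rule the diff encodes: at the same height/round/step, a previously saved signature is reused only when the new sign bytes match the saved LastSignBytes exactly, so a crash followed by WAL replay can never produce a conflicting signature.

// sketch.go - illustrative only, assumed simplified types
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// lastSignState is a hypothetical, simplified stand-in for PrivValidator's
// LastHeight/LastRound/LastStep/LastSignBytes/LastSignature fields.
type lastSignState struct {
	Height    int
	Round     int
	Step      int8
	SignBytes []byte
	Signature []byte
}

// sign stands in for the real signer; it just tags the bytes.
func sign(b []byte) []byte { return append([]byte("sig:"), b...) }

func signBytesHRS(s *lastSignState, height, round int, step int8, signBytes []byte) ([]byte, error) {
	// regression in height, round, or step is always refused
	if s.Height > height ||
		(s.Height == height && s.Round > round) ||
		(s.Height == height && s.Round == round && s.Step > step) {
		return nil, errors.New("height/round/step regression")
	}
	if s.Height == height && s.Round == round && s.Step == step {
		// replay of a request we already signed: reuse the signature only if
		// the bytes are identical; anything else would be a conflicting sign
		if s.SignBytes != nil && bytes.Equal(s.SignBytes, signBytes) {
			return s.Signature, nil
		}
		return nil, errors.New("step regression / conflicting data")
	}
	sig := sign(signBytes)
	s.Height, s.Round, s.Step = height, round, step
	s.SignBytes, s.Signature = signBytes, sig
	// the real PrivValidator persists this state to disk before returning
	return sig, nil
}

func main() {
	s := &lastSignState{}
	v := []byte(`{"height":1,"round":0,"type":1}`)
	sig1, _ := signBytesHRS(s, 1, 0, 2, v)
	sig2, _ := signBytesHRS(s, 1, 0, 2, v) // replayed request: same bytes, same signature
	fmt.Println(bytes.Equal(sig1, sig2))   // true
	if _, err := signBytesHRS(s, 1, 0, 2, []byte(`different`)); err != nil {
		fmt.Println("refused conflicting sign:", err) // double sign prevented
	}
}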