From a1c20ce866f0383eb4b24fde04632965d210353a Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 9 Aug 2016 17:18:29 -0400 Subject: [PATCH 1/4] types: privVal.LastSignature. closes #247 --- consensus/replay_test.go | 75 ++++++++++++++++++++++++--- types/priv_validator.go | 109 +++++++++++++++++++++------------------ 2 files changed, 127 insertions(+), 57 deletions(-) diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 46e862ed2..2e53f5199 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -7,6 +7,8 @@ import ( "time" . "github.com/tendermint/go-common" + "github.com/tendermint/go-events" + "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" ) @@ -46,24 +48,83 @@ import ( ``` */ -var testLog = `{"time":"2016-04-03T11:23:54.387Z","msg":[3,{"duration":972835254,"height":1,"round":0,"step":1}]} +var testLog1 = `{"time":"2016-04-03T11:23:54.387Z","msg":[3,{"duration":972835254,"height":1,"round":0,"step":1}]} {"time":"2016-04-03T11:23:54.388Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} {"time":"2016-04-03T11:23:54.388Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"pol_round":-1,"signature":"3A2ECD5023B21EC144EC16CFF1B992A4321317B83EEDD8969FDFEA6EB7BF4389F38DDA3E7BB109D63A07491C16277A197B241CF1F05F5E485C59882ECACD9E07"}}],"peer_key":""}]} {"time":"2016-04-03T11:23:54.389Z","msg":[2,{"msg":[19,{"Height":1,"Round":0,"Part":{"index":0,"bytes":"0101010F74656E6465726D696E745F7465737401011441D59F4B718AC00000000000000114C4B01D3810579550997AC5641E759E20D99B51C10001000100","proof":{"aunts":[]}}}],"peer_key":""}]} {"time":"2016-04-03T11:23:54.390Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrevote"}]} -{"time":"2016-04-03T11:23:54.390Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":1,"block_hash":"4291966B8A9DFBA00AEC7C700F2718E61DF4331D","block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"signature":"47D2A75A4E2F15DB1F0D1B656AC0637AF9AADDFEB6A156874F6553C73895E5D5DC948DBAEF15E61276C5342D0E638DFCB77C971CD282096EA8735A564A90F008"}}],"peer_key":""}]} -{"time":"2016-04-03T11:23:54.392Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrecommit"}]} +` + +// continuation; splitting allows us to test saving the privVal.LastSignature +// ... to test the case when we sign but crash before writing to the wal, +// we only run replay on testLog1 but stick this signature in the privVal.LastSignature after the proposal +var testLog2 = `{"time":"2016-04-03T11:23:54.390Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":1,"block_hash":"4291966B8A9DFBA00AEC7C700F2718E61DF4331D","block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"signature":"47D2A75A4E2F15DB1F0D1B656AC0637AF9AADDFEB6A156874F6553C73895E5D5DC948DBAEF15E61276C5342D0E638DFCB77C971CD282096EA8735A564A90F008"}}],"peer_key":""}]} +` + +// continuation; splitting allows us to test saving the privVal.LastSignature +var testLog3 = `{"time":"2016-04-03T11:23:54.392Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrecommit"}]} {"time":"2016-04-03T11:23:54.392Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":2,"block_hash":"4291966B8A9DFBA00AEC7C700F2718E61DF4331D","block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"signature":"39147DA595F08B73CF8C899967C8403B5872FD9042FFA4E239159E0B6C5D9665C9CA81D766EACA2AE658872F94C2FCD1E34BF51859CD5B274DA8512BACE4B50D"}}],"peer_key":""}]} ` -func TestReplayCatchup(t *testing.T) { +func TestReplayWithoutSig(t *testing.T) { + // write the needed wal to file + f, err := ioutil.TempFile(os.TempDir(), "replay_test_") + if err != nil { + panic(err) + } + _, err = f.WriteString(testLog1) + if err != nil { + panic(err) + } + f.Close() + + cs := fixedConsensusState() + + // we've already precommitted on the first block + // without replay catchup we would be halted here forever + cs.privValidator.LastHeight = 1 // first block + cs.privValidator.LastStep = 2 // prevote + + newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 0) + cs.evsw.AddListenerForEvent("tester", types.EventStringCompleteProposal(), func(data events.EventData) { + // Set LastSig + + // unmarshal log2 + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, []byte(testLog2), &err) + vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) + if err != nil { + t.Fatalf("Error reading json data: %v", err) + } + + cs.privValidator.LastSignature = vote.Vote.Signature + }) + + // start timeout and receive routines + cs.startRoutines(0) + + // open wal and run catchup messages + openWAL(t, cs, f.Name()) + if err := cs.catchupReplay(cs.Height); err != nil { + panic(Fmt("Error on catchup replay %v", err)) + } + + after := time.After(time.Second * 15) + select { + case <-newBlockCh: + case <-after: + panic("Timed out waiting for new block") + } +} + +func TestReplayWithSig(t *testing.T) { // write the needed wal to file f, err := ioutil.TempFile(os.TempDir(), "replay_test_") if err != nil { panic(err) } - name := f.Name() - _, err = f.WriteString(testLog) + _, err = f.WriteString(testLog1 + testLog2 + testLog3) if err != nil { panic(err) } @@ -82,7 +143,7 @@ func TestReplayCatchup(t *testing.T) { cs.startRoutines(0) // open wal and run catchup messages - openWAL(t, cs, name) + openWAL(t, cs, f.Name()) if err := cs.catchupReplay(cs.Height); err != nil { panic(Fmt("Error on catchup replay %v", err)) } diff --git a/types/priv_validator.go b/types/priv_validator.go index 30fdfff76..e9046c5a4 100644 --- a/types/priv_validator.go +++ b/types/priv_validator.go @@ -35,11 +35,12 @@ func voteToStep(vote *Vote) int8 { } type PrivValidator struct { - Address []byte `json:"address"` - PubKey crypto.PubKey `json:"pub_key"` - LastHeight int `json:"last_height"` - LastRound int `json:"last_round"` - LastStep int8 `json:"last_step"` + Address []byte `json:"address"` + PubKey crypto.PubKey `json:"pub_key"` + LastHeight int `json:"last_height"` + LastRound int `json:"last_round"` + LastStep int8 `json:"last_step"` + LastSignature crypto.Signature `json:"last_signature"` // so we dont lose signatures // PrivKey should be empty if a Signer other than the default is being used. PrivKey crypto.PrivKey `json:"priv_key"` @@ -85,14 +86,15 @@ func GenPrivValidator() *PrivValidator { pubKey := crypto.PubKeyEd25519(*pubKeyBytes) privKey := crypto.PrivKeyEd25519(*privKeyBytes) return &PrivValidator{ - Address: pubKey.Address(), - PubKey: pubKey, - PrivKey: privKey, - LastHeight: 0, - LastRound: 0, - LastStep: stepNone, - filePath: "", - Signer: NewDefaultSigner(privKey), + Address: pubKey.Address(), + PubKey: pubKey, + PrivKey: privKey, + LastHeight: 0, + LastRound: 0, + LastStep: stepNone, + LastSignature: nil, + filePath: "", + Signer: NewDefaultSigner(privKey), } } @@ -152,53 +154,60 @@ func (privVal *PrivValidator) save() { func (privVal *PrivValidator) SignVote(chainID string, vote *Vote) error { privVal.mtx.Lock() defer privVal.mtx.Unlock() + signature, err := privVal.signBytesHRS(vote.Height, vote.Round, voteToStep(vote), SignBytes(chainID, vote)) + if err != nil { + return errors.New(Fmt("Error signing vote: %v", err)) + } + vote.Signature = signature.(crypto.SignatureEd25519) + return nil +} + +func (privVal *PrivValidator) SignProposal(chainID string, proposal *Proposal) error { + privVal.mtx.Lock() + defer privVal.mtx.Unlock() + signature, err := privVal.signBytesHRS(proposal.Height, proposal.Round, stepPropose, SignBytes(chainID, proposal)) + if err != nil { + return errors.New(Fmt("Error signing proposal: %v", err)) + } + proposal.Signature = signature.(crypto.SignatureEd25519) + return nil +} - // If height regression, panic - if privVal.LastHeight > vote.Height { - return errors.New("Height regression in SignVote") +// check if there's a regression. Else sign and write the hrs+signature to disk +func (privVal *PrivValidator) signBytesHRS(height, round int, step int8, signBytes []byte) (crypto.Signature, error) { + // If height regression, err + if privVal.LastHeight > height { + return nil, errors.New("Height regression") } // More cases for when the height matches - if privVal.LastHeight == vote.Height { - // If round regression, panic - if privVal.LastRound > vote.Round { - return errors.New("Round regression in SignVote") + if privVal.LastHeight == height { + // If round regression, err + if privVal.LastRound > round { + return nil, errors.New("Round regression") } - // If step regression, panic - if privVal.LastRound == vote.Round && privVal.LastStep > voteToStep(vote) { - return errors.New("Step regression in SignVote") + // If step regression, err + if privVal.LastRound == round { + if privVal.LastStep > step { + return nil, errors.New("Step regression") + } else if privVal.LastStep == step { + if privVal.LastSignature != nil { + return privVal.LastSignature, nil + } + } } } + // Sign + signature := privVal.Sign(signBytes) + // Persist height/round/step - privVal.LastHeight = vote.Height - privVal.LastRound = vote.Round - privVal.LastStep = voteToStep(vote) + privVal.LastHeight = height + privVal.LastRound = round + privVal.LastStep = step + privVal.LastSignature = signature privVal.save() - // Sign - vote.Signature = privVal.Sign(SignBytes(chainID, vote)).(crypto.SignatureEd25519) - return nil -} - -func (privVal *PrivValidator) SignProposal(chainID string, proposal *Proposal) error { - privVal.mtx.Lock() - defer privVal.mtx.Unlock() - if privVal.LastHeight < proposal.Height || - privVal.LastHeight == proposal.Height && privVal.LastRound < proposal.Round || - privVal.LastHeight == 0 && privVal.LastRound == 0 && privVal.LastStep == stepNone { - - // Persist height/round/step - privVal.LastHeight = proposal.Height - privVal.LastRound = proposal.Round - privVal.LastStep = stepPropose - privVal.save() - - // Sign - proposal.Signature = privVal.Sign(SignBytes(chainID, proposal)).(crypto.SignatureEd25519) - return nil - } else { - return errors.New(fmt.Sprintf("Attempt of duplicate signing of proposal: Height %v, Round %v", proposal.Height, proposal.Round)) - } + return signature, nil } func (privVal *PrivValidator) String() string { From 1110c5d37db47a9f2b9a4cce665e54eb46cd4617 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 14 Aug 2016 12:31:24 -0400 Subject: [PATCH 2/4] privVal.LastSignBytes and more replay tests --- cmd/tendermint/reset_priv_validator.go | 5 +- consensus/common_test.go | 2 +- consensus/reactor.go | 1 + consensus/replay.go | 29 ++-- consensus/replay_test.go | 192 ++++++++++++++++--------- consensus/state.go | 16 ++- types/priv_validator.go | 27 +++- 7 files changed, 177 insertions(+), 95 deletions(-) diff --git a/cmd/tendermint/reset_priv_validator.go b/cmd/tendermint/reset_priv_validator.go index 795823c39..2887c10d0 100644 --- a/cmd/tendermint/reset_priv_validator.go +++ b/cmd/tendermint/reset_priv_validator.go @@ -22,10 +22,7 @@ func reset_priv_validator() { privValidatorFile := config.GetString("priv_validator_file") if _, err := os.Stat(privValidatorFile); err == nil { privValidator = types.LoadPrivValidator(privValidatorFile) - privValidator.LastHeight = 0 - privValidator.LastRound = 0 - privValidator.LastStep = 0 - privValidator.Save() + privValidator.Reset() log.Notice("Reset PrivValidator", "file", privValidatorFile) } else { privValidator = types.GenPrivValidator() diff --git a/consensus/common_test.go b/consensus/common_test.go index 693edbca1..7f55fc29f 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -316,8 +316,8 @@ func fixedConsensusState() *ConsensusState { state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) privValidatorFile := config.GetString("priv_validator_file") privValidator := types.LoadOrGenPrivValidator(privValidatorFile) + privValidator.Reset() return newConsensusState(state, privValidator, counter.NewCounterApplication(true)) - } func newConsensusState(state *sm.State, pv *types.PrivValidator, app tmsp.Application) *ConsensusState { diff --git a/consensus/reactor.go b/consensus/reactor.go index 98f6510dd..49dc73c2c 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -140,6 +140,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { // Messages affect either a peer state or the consensus state. // Peer state updates can happen in parallel, but processing of // proposals, block parts, and votes are ordered by the receiveRoutine +// NOTE: blocks on consensus state for proposals, block parts, and votes func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { if !conR.IsRunning() { log.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes) diff --git a/consensus/replay.go b/consensus/replay.go index ec85be095..c729fca27 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -19,6 +19,8 @@ import ( ) // unmarshal and apply a single message to the consensus state +// as if it were received in receiveRoutine +// NOTE: receiveRoutine should not be running func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan interface{}) error { var err error var msg ConsensusLogMessage @@ -31,7 +33,7 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte // for logging switch m := msg.Msg.(type) { case types.EventDataRoundState: - log.Notice("New Step", "height", m.Height, "round", m.Round, "step", m.Step) + log.Notice("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step) // these are playback checks ticker := time.After(time.Second * 2) if newStepCh != nil { @@ -53,31 +55,28 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte switch msg := m.Msg.(type) { case *ProposalMessage: p := msg.Proposal - log.Notice("Proposal", "height", p.Height, "round", p.Round, "header", + log.Notice("Replay: Proposal", "height", p.Height, "round", p.Round, "header", p.BlockPartsHeader, "pol", p.POLRound, "peer", peerKey) case *BlockPartMessage: - log.Notice("BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey) + log.Notice("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey) case *VoteMessage: v := msg.Vote - log.Notice("Vote", "height", v.Height, "round", v.Round, "type", v.Type, + log.Notice("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type, "hash", v.BlockHash, "header", v.BlockPartsHeader, "peer", peerKey) } - // internal or from peer - if m.PeerKey == "" { - cs.internalMsgQueue <- m - } else { - cs.peerMsgQueue <- m - } + + cs.handleMsg(m, cs.RoundState) case timeoutInfo: - log.Notice("Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) - cs.tockChan <- m + log.Notice("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) + cs.handleTimeout(m, cs.RoundState) default: - return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) + return fmt.Errorf("Replay: Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) } return nil } -// replay only those messages since the last block +// replay only those messages since the last block. +// timeoutRoutine should run concurrently to read off tickChan func (cs *ConsensusState) catchupReplay(height int) error { if cs.wal == nil { log.Warn("consensus msg log is nil") @@ -142,7 +141,7 @@ func (cs *ConsensusState) catchupReplay(height int) error { return err } } - log.Info("Done catchup replay") + log.Notice("Done catchup replay") return nil } diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 2e53f5199..56d8e684b 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -1,8 +1,10 @@ package consensus import ( + "fmt" "io/ioutil" "os" + "strings" "testing" "time" @@ -48,69 +50,47 @@ import ( ``` */ -var testLog1 = `{"time":"2016-04-03T11:23:54.387Z","msg":[3,{"duration":972835254,"height":1,"round":0,"step":1}]} +var testLog = `{"time":"2016-04-03T11:23:54.387Z","msg":[3,{"duration":972835254,"height":1,"round":0,"step":1}]} {"time":"2016-04-03T11:23:54.388Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} {"time":"2016-04-03T11:23:54.388Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"pol_round":-1,"signature":"3A2ECD5023B21EC144EC16CFF1B992A4321317B83EEDD8969FDFEA6EB7BF4389F38DDA3E7BB109D63A07491C16277A197B241CF1F05F5E485C59882ECACD9E07"}}],"peer_key":""}]} {"time":"2016-04-03T11:23:54.389Z","msg":[2,{"msg":[19,{"Height":1,"Round":0,"Part":{"index":0,"bytes":"0101010F74656E6465726D696E745F7465737401011441D59F4B718AC00000000000000114C4B01D3810579550997AC5641E759E20D99B51C10001000100","proof":{"aunts":[]}}}],"peer_key":""}]} {"time":"2016-04-03T11:23:54.390Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrevote"}]} -` - -// continuation; splitting allows us to test saving the privVal.LastSignature -// ... to test the case when we sign but crash before writing to the wal, -// we only run replay on testLog1 but stick this signature in the privVal.LastSignature after the proposal -var testLog2 = `{"time":"2016-04-03T11:23:54.390Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":1,"block_hash":"4291966B8A9DFBA00AEC7C700F2718E61DF4331D","block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"signature":"47D2A75A4E2F15DB1F0D1B656AC0637AF9AADDFEB6A156874F6553C73895E5D5DC948DBAEF15E61276C5342D0E638DFCB77C971CD282096EA8735A564A90F008"}}],"peer_key":""}]} -` - -// continuation; splitting allows us to test saving the privVal.LastSignature -var testLog3 = `{"time":"2016-04-03T11:23:54.392Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrecommit"}]} +{"time":"2016-04-03T11:23:54.390Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":1,"block_hash":"4291966B8A9DFBA00AEC7C700F2718E61DF4331D","block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"signature":"47D2A75A4E2F15DB1F0D1B656AC0637AF9AADDFEB6A156874F6553C73895E5D5DC948DBAEF15E61276C5342D0E638DFCB77C971CD282096EA8735A564A90F008"}}],"peer_key":""}]} +{"time":"2016-04-03T11:23:54.392Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrecommit"}]} {"time":"2016-04-03T11:23:54.392Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":2,"block_hash":"4291966B8A9DFBA00AEC7C700F2718E61DF4331D","block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"signature":"39147DA595F08B73CF8C899967C8403B5872FD9042FFA4E239159E0B6C5D9665C9CA81D766EACA2AE658872F94C2FCD1E34BF51859CD5B274DA8512BACE4B50D"}}],"peer_key":""}]} ` -func TestReplayWithoutSig(t *testing.T) { +// map lines in the above wal to privVal step +var mapPrivValStep = map[int]int8{ + 0: 0, + 1: 0, + 2: 1, + 3: 1, + 4: 1, + 5: 2, + 6: 2, + 7: 3, +} + +func writeWAL(log string) string { + fmt.Println("writing", log) // write the needed wal to file f, err := ioutil.TempFile(os.TempDir(), "replay_test_") if err != nil { panic(err) } - _, err = f.WriteString(testLog1) + + _, err = f.WriteString(log) if err != nil { panic(err) } + name := f.Name() f.Close() + return name +} - cs := fixedConsensusState() - - // we've already precommitted on the first block - // without replay catchup we would be halted here forever - cs.privValidator.LastHeight = 1 // first block - cs.privValidator.LastStep = 2 // prevote - - newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 0) - cs.evsw.AddListenerForEvent("tester", types.EventStringCompleteProposal(), func(data events.EventData) { - // Set LastSig - - // unmarshal log2 - var err error - var msg ConsensusLogMessage - wire.ReadJSON(&msg, []byte(testLog2), &err) - vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) - if err != nil { - t.Fatalf("Error reading json data: %v", err) - } - - cs.privValidator.LastSignature = vote.Vote.Signature - }) - - // start timeout and receive routines - cs.startRoutines(0) - - // open wal and run catchup messages - openWAL(t, cs, f.Name()) - if err := cs.catchupReplay(cs.Height); err != nil { - panic(Fmt("Error on catchup replay %v", err)) - } - - after := time.After(time.Second * 15) +func waitForBlock(newBlockCh chan interface{}) { + after := time.After(time.Second * 10) select { case <-newBlockCh: case <-after: @@ -118,42 +98,47 @@ func TestReplayWithoutSig(t *testing.T) { } } -func TestReplayWithSig(t *testing.T) { - // write the needed wal to file - f, err := ioutil.TempFile(os.TempDir(), "replay_test_") - if err != nil { - panic(err) +func runReplayTest(t *testing.T, cs *ConsensusState, fileName string, newBlockCh chan interface{}) { + // open wal and run catchup messages + openWAL(t, cs, fileName) + go cs.timeoutRoutine() + if err := cs.catchupReplay(cs.Height); err != nil { + panic(Fmt("Error on catchup replay %v", err)) } - _, err = f.WriteString(testLog1 + testLog2 + testLog3) - if err != nil { - panic(err) + go cs.receiveRoutine(0) + // wait to make a new block + waitForBlock(newBlockCh) + cs.QuitService.OnStop() +} + +func setupReplayTest(nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) { + fmt.Println("-------------------------------------") + log.Notice(Fmt("Starting replay test of %d lines of WAL (crash before write)", nLines)) + + lineStep := nLines + if crashAfter { + lineStep -= 1 } - f.Close() + + split := strings.Split(testLog, "\n") + lastMsg := split[nLines] + + // we write those lines up to (not including) one with the signature + fileName := writeWAL(strings.Join(split[:nLines], "\n") + "\n") cs := fixedConsensusState() + cs.QuitService.OnStart() // we've already precommitted on the first block // without replay catchup we would be halted here forever cs.privValidator.LastHeight = 1 // first block - cs.privValidator.LastStep = 3 // precommit + cs.privValidator.LastStep = mapPrivValStep[lineStep] - newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 0) + fmt.Println("LAST STEP", cs.privValidator.LastStep) - // start timeout and receive routines - cs.startRoutines(0) + newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1) - // open wal and run catchup messages - openWAL(t, cs, f.Name()) - if err := cs.catchupReplay(cs.Height); err != nil { - panic(Fmt("Error on catchup replay %v", err)) - } - - after := time.After(time.Second * 15) - select { - case <-newBlockCh: - case <-after: - panic("Timed out waiting for new block") - } + return cs, newBlockCh, lastMsg, fileName } func openWAL(t *testing.T, cs *ConsensusState, file string) { @@ -165,3 +150,68 @@ func openWAL(t *testing.T, cs *ConsensusState, file string) { wal.exists = true cs.wal = wal } + +//----------------------------------------------- +// Test the log at every iteration, and set the privVal last step +// as if the log was written after signing, before the crash + +func TestReplayCrashAfterWrite(t *testing.T) { + split := strings.Split(testLog, "\n") + for i := 0; i < len(split)-1; i++ { + cs, newBlockCh, _, f := setupReplayTest(i+1, true) + runReplayTest(t, cs, f, newBlockCh) + } +} + +//----------------------------------------------- +// Test the log as if we crashed after signing but before writing. +// This relies on privValidator.LastSignature being set + +func TestReplayCrashBeforeWritePropose(t *testing.T) { + cs, newBlockCh, proposalMsg, f := setupReplayTest(2, false) // propose + // Set LastSig + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, []byte(proposalMsg), &err) + proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage) + if err != nil { + t.Fatalf("Error reading json data: %v", err) + } + cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal) + cs.privValidator.LastSignature = proposal.Proposal.Signature + runReplayTest(t, cs, f, newBlockCh) +} + +func TestReplayCrashBeforeWritePrevote(t *testing.T) { + cs, newBlockCh, voteMsg, f := setupReplayTest(5, false) // prevote + cs.evsw.AddListenerForEvent("tester", types.EventStringCompleteProposal(), func(data events.EventData) { + // Set LastSig + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, []byte(voteMsg), &err) + vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) + if err != nil { + t.Fatalf("Error reading json data: %v", err) + } + cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote) + cs.privValidator.LastSignature = vote.Vote.Signature + }) + runReplayTest(t, cs, f, newBlockCh) +} + +func TestReplayCrashBeforeWritePrecommit(t *testing.T) { + cs, newBlockCh, voteMsg, f := setupReplayTest(7, false) // precommit + cs.evsw.AddListenerForEvent("tester", types.EventStringPolka(), func(data events.EventData) { + // Set LastSig + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, []byte(voteMsg), &err) + vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) + if err != nil { + t.Fatalf("Error reading json data: %v", err) + } + cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote) + cs.privValidator.LastSignature = vote.Vote.Signature + }) + runReplayTest(t, cs, f, newBlockCh) +} diff --git a/consensus/state.go b/consensus/state.go index 192253192..564f02529 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -298,8 +298,17 @@ func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) { func (cs *ConsensusState) OnStart() error { cs.QuitService.OnStart() - // start timeout and receive routines - cs.startRoutines(0) + err := cs.OpenWAL(cs.config.GetString("cswal")) + if err != nil { + return err + } + + // start timeout routine + // NOTE: we dont start receiveRoutine until after replay + // so we dont re-write events, and so we dont process + // peer msgs before replay on app restarts. + // timeoutRoutine needed to read off tickChan during replay + go cs.timeoutRoutine() // we may have lost some votes if the process crashed // reload from consensus log to catchup @@ -308,6 +317,9 @@ func (cs *ConsensusState) OnStart() error { // let's go for it anyways, maybe we're fine } + // start + go cs.receiveRoutine(0) + // schedule the first round! cs.scheduleRound0(cs.Height) diff --git a/types/priv_validator.go b/types/priv_validator.go index e9046c5a4..5700fdf59 100644 --- a/types/priv_validator.go +++ b/types/priv_validator.go @@ -41,6 +41,7 @@ type PrivValidator struct { LastRound int `json:"last_round"` LastStep int8 `json:"last_step"` LastSignature crypto.Signature `json:"last_signature"` // so we dont lose signatures + LastSignBytes []byte `json:"last_signbytes"` // so we dont lose signatures // PrivKey should be empty if a Signer other than the default is being used. PrivKey crypto.PrivKey `json:"priv_key"` @@ -93,6 +94,7 @@ func GenPrivValidator() *PrivValidator { LastRound: 0, LastStep: stepNone, LastSignature: nil, + LastSignBytes: nil, filePath: "", Signer: NewDefaultSigner(privKey), } @@ -151,6 +153,16 @@ func (privVal *PrivValidator) save() { } } +// NOTE: Unsafe! +func (privVal *PrivValidator) Reset() { + privVal.LastHeight = 0 + privVal.LastRound = 0 + privVal.LastStep = 0 + privVal.LastSignature = nil + privVal.LastSignBytes = nil + privVal.Save() +} + func (privVal *PrivValidator) SignVote(chainID string, vote *Vote) error { privVal.mtx.Lock() defer privVal.mtx.Unlock() @@ -190,9 +202,19 @@ func (privVal *PrivValidator) signBytesHRS(height, round int, step int8, signByt if privVal.LastStep > step { return nil, errors.New("Step regression") } else if privVal.LastStep == step { - if privVal.LastSignature != nil { - return privVal.LastSignature, nil + if privVal.LastSignBytes != nil { + if privVal.LastSignature == nil { + PanicSanity("privVal: LastSignature is nil but LastSignBytes is not!") + } + // so we dont sign a conflicting vote or proposal + // NOTE: proposals are non-deterministic (include time), + // so we can actually lose them, but will still never sign conflicting ones + if bytes.Equal(privVal.LastSignBytes, signBytes) { + log.Notice("Using privVal.LastSignature", "sig", privVal.LastSignature) + return privVal.LastSignature, nil + } } + return nil, errors.New("Step regression") } } } @@ -205,6 +227,7 @@ func (privVal *PrivValidator) signBytesHRS(height, round int, step int8, signByt privVal.LastRound = round privVal.LastStep = step privVal.LastSignature = signature + privVal.LastSignBytes = signBytes privVal.save() return signature, nil From 3998bdbfc16e3034926bbaa04e3fcf9288b6958c Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 17 Aug 2016 23:08:43 -0400 Subject: [PATCH 3/4] fixes from review --- consensus/common_test.go | 3 ++- consensus/replay.go | 10 +++------- consensus/replay_test.go | 25 ++++--------------------- consensus/wal.go | 8 ++++++++ 4 files changed, 17 insertions(+), 29 deletions(-) diff --git a/consensus/common_test.go b/consensus/common_test.go index 7f55fc29f..3e05d2564 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -317,7 +317,8 @@ func fixedConsensusState() *ConsensusState { privValidatorFile := config.GetString("priv_validator_file") privValidator := types.LoadOrGenPrivValidator(privValidatorFile) privValidator.Reset() - return newConsensusState(state, privValidator, counter.NewCounterApplication(true)) + cs := newConsensusState(state, privValidator, counter.NewCounterApplication(true)) + return cs } func newConsensusState(state *sm.State, pv *types.PrivValidator, app tmsp.Application) *ConsensusState { diff --git a/consensus/replay.go b/consensus/replay.go index c729fca27..19c117eac 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -78,12 +78,7 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte // replay only those messages since the last block. // timeoutRoutine should run concurrently to read off tickChan func (cs *ConsensusState) catchupReplay(height int) error { - if cs.wal == nil { - log.Warn("consensus msg log is nil") - return nil - } - if !cs.wal.exists { - // new wal, nothing to catchup on + if !cs.wal.Exists() { return nil } @@ -254,8 +249,9 @@ func (pb *playback) replayReset(count int, newStepCh chan interface{}) error { } func (cs *ConsensusState) startForReplay() { + // don't want to start full cs cs.BaseService.OnStart() - go cs.receiveRoutine(0) + // since we replay tocks we just ignore ticks go func() { for { diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 56d8e684b..a9376d0e4 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -99,16 +99,11 @@ func waitForBlock(newBlockCh chan interface{}) { } func runReplayTest(t *testing.T, cs *ConsensusState, fileName string, newBlockCh chan interface{}) { - // open wal and run catchup messages - openWAL(t, cs, fileName) - go cs.timeoutRoutine() - if err := cs.catchupReplay(cs.Height); err != nil { - panic(Fmt("Error on catchup replay %v", err)) - } - go cs.receiveRoutine(0) + cs.config.Set("cswal", fileName) + cs.Start() // wait to make a new block waitForBlock(newBlockCh) - cs.QuitService.OnStop() + cs.Stop() } func setupReplayTest(nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) { @@ -127,10 +122,8 @@ func setupReplayTest(nLines int, crashAfter bool) (*ConsensusState, chan interfa fileName := writeWAL(strings.Join(split[:nLines], "\n") + "\n") cs := fixedConsensusState() - cs.QuitService.OnStart() - // we've already precommitted on the first block - // without replay catchup we would be halted here forever + // set the last step according to when we crashed vs the wal cs.privValidator.LastHeight = 1 // first block cs.privValidator.LastStep = mapPrivValStep[lineStep] @@ -141,16 +134,6 @@ func setupReplayTest(nLines int, crashAfter bool) (*ConsensusState, chan interfa return cs, newBlockCh, lastMsg, fileName } -func openWAL(t *testing.T, cs *ConsensusState, file string) { - // open the wal - wal, err := NewWAL(file, config.GetBool("cswal_light")) - if err != nil { - panic(err) - } - wal.exists = true - cs.wal = wal -} - //----------------------------------------------- // Test the log at every iteration, and set the privVal last step // as if the log was written after signing, before the crash diff --git a/consensus/wal.go b/consensus/wal.go index 5b4747a6f..fb5b80568 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -60,6 +60,14 @@ func NewWAL(file string, light bool) (*WAL, error) { }, nil } +func (wal *WAL) Exists() bool { + if wal == nil { + log.Warn("consensus msg log is nil") + return false + } + return wal.exists +} + // called in newStep and for each pass in receiveRoutine func (wal *WAL) Save(clm ConsensusLogMessageInterface) { if wal != nil { From 678599c7d4efc36978d5a075488ba52da9205310 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 23 Aug 2016 11:33:18 -0400 Subject: [PATCH 4/4] consensus: add note about replay test --- consensus/replay_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/consensus/replay_test.go b/consensus/replay_test.go index a9376d0e4..654cec9c2 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -101,7 +101,10 @@ func waitForBlock(newBlockCh chan interface{}) { func runReplayTest(t *testing.T, cs *ConsensusState, fileName string, newBlockCh chan interface{}) { cs.config.Set("cswal", fileName) cs.Start() - // wait to make a new block + // Wait to make a new block. + // This is just a signal that we haven't halted; its not something contained in the WAL itself. + // Assuming the consensus state is running, replay of any WAL, including the empty one, + // should eventually be followed by a new block, or else something is wrong waitForBlock(newBlockCh) cs.Stop() }