diff --git a/cmd/tendermint/reset_priv_validator.go b/cmd/tendermint/reset_priv_validator.go index 795823c39..2887c10d0 100644 --- a/cmd/tendermint/reset_priv_validator.go +++ b/cmd/tendermint/reset_priv_validator.go @@ -22,10 +22,7 @@ func reset_priv_validator() { privValidatorFile := config.GetString("priv_validator_file") if _, err := os.Stat(privValidatorFile); err == nil { privValidator = types.LoadPrivValidator(privValidatorFile) - privValidator.LastHeight = 0 - privValidator.LastRound = 0 - privValidator.LastStep = 0 - privValidator.Save() + privValidator.Reset() log.Notice("Reset PrivValidator", "file", privValidatorFile) } else { privValidator = types.GenPrivValidator() diff --git a/consensus/common_test.go b/consensus/common_test.go index 693edbca1..3e05d2564 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -316,8 +316,9 @@ func fixedConsensusState() *ConsensusState { state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) privValidatorFile := config.GetString("priv_validator_file") privValidator := types.LoadOrGenPrivValidator(privValidatorFile) - return newConsensusState(state, privValidator, counter.NewCounterApplication(true)) - + privValidator.Reset() + cs := newConsensusState(state, privValidator, counter.NewCounterApplication(true)) + return cs } func newConsensusState(state *sm.State, pv *types.PrivValidator, app tmsp.Application) *ConsensusState { diff --git a/consensus/reactor.go b/consensus/reactor.go index 98f6510dd..49dc73c2c 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -140,6 +140,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { // Messages affect either a peer state or the consensus state. // Peer state updates can happen in parallel, but processing of // proposals, block parts, and votes are ordered by the receiveRoutine +// NOTE: blocks on consensus state for proposals, block parts, and votes func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { if !conR.IsRunning() { log.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes) diff --git a/consensus/replay.go b/consensus/replay.go index ec85be095..19c117eac 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -19,6 +19,8 @@ import ( ) // unmarshal and apply a single message to the consensus state +// as if it were received in receiveRoutine +// NOTE: receiveRoutine should not be running func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan interface{}) error { var err error var msg ConsensusLogMessage @@ -31,7 +33,7 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte // for logging switch m := msg.Msg.(type) { case types.EventDataRoundState: - log.Notice("New Step", "height", m.Height, "round", m.Round, "step", m.Step) + log.Notice("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step) // these are playback checks ticker := time.After(time.Second * 2) if newStepCh != nil { @@ -53,38 +55,30 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte switch msg := m.Msg.(type) { case *ProposalMessage: p := msg.Proposal - log.Notice("Proposal", "height", p.Height, "round", p.Round, "header", + log.Notice("Replay: Proposal", "height", p.Height, "round", p.Round, "header", p.BlockPartsHeader, "pol", p.POLRound, "peer", peerKey) case *BlockPartMessage: - log.Notice("BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey) + log.Notice("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey) case *VoteMessage: v := msg.Vote - log.Notice("Vote", "height", v.Height, "round", v.Round, "type", v.Type, + log.Notice("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type, "hash", v.BlockHash, "header", v.BlockPartsHeader, "peer", peerKey) } - // internal or from peer - if m.PeerKey == "" { - cs.internalMsgQueue <- m - } else { - cs.peerMsgQueue <- m - } + + cs.handleMsg(m, cs.RoundState) case timeoutInfo: - log.Notice("Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) - cs.tockChan <- m + log.Notice("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) + cs.handleTimeout(m, cs.RoundState) default: - return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) + return fmt.Errorf("Replay: Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) } return nil } -// replay only those messages since the last block +// replay only those messages since the last block. +// timeoutRoutine should run concurrently to read off tickChan func (cs *ConsensusState) catchupReplay(height int) error { - if cs.wal == nil { - log.Warn("consensus msg log is nil") - return nil - } - if !cs.wal.exists { - // new wal, nothing to catchup on + if !cs.wal.Exists() { return nil } @@ -142,7 +136,7 @@ func (cs *ConsensusState) catchupReplay(height int) error { return err } } - log.Info("Done catchup replay") + log.Notice("Done catchup replay") return nil } @@ -255,8 +249,9 @@ func (pb *playback) replayReset(count int, newStepCh chan interface{}) error { } func (cs *ConsensusState) startForReplay() { + // don't want to start full cs cs.BaseService.OnStart() - go cs.receiveRoutine(0) + // since we replay tocks we just ignore ticks go func() { for { diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 46e862ed2..654cec9c2 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -1,12 +1,16 @@ package consensus import ( + "fmt" "io/ioutil" "os" + "strings" "testing" "time" . "github.com/tendermint/go-common" + "github.com/tendermint/go-events" + "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" ) @@ -56,51 +60,144 @@ var testLog = `{"time":"2016-04-03T11:23:54.387Z","msg":[3,{"duration":972835254 {"time":"2016-04-03T11:23:54.392Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":2,"block_hash":"4291966B8A9DFBA00AEC7C700F2718E61DF4331D","block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"signature":"39147DA595F08B73CF8C899967C8403B5872FD9042FFA4E239159E0B6C5D9665C9CA81D766EACA2AE658872F94C2FCD1E34BF51859CD5B274DA8512BACE4B50D"}}],"peer_key":""}]} ` -func TestReplayCatchup(t *testing.T) { +// map lines in the above wal to privVal step +var mapPrivValStep = map[int]int8{ + 0: 0, + 1: 0, + 2: 1, + 3: 1, + 4: 1, + 5: 2, + 6: 2, + 7: 3, +} + +func writeWAL(log string) string { + fmt.Println("writing", log) // write the needed wal to file f, err := ioutil.TempFile(os.TempDir(), "replay_test_") if err != nil { panic(err) } - name := f.Name() - _, err = f.WriteString(testLog) + + _, err = f.WriteString(log) if err != nil { panic(err) } + name := f.Name() f.Close() + return name +} + +func waitForBlock(newBlockCh chan interface{}) { + after := time.After(time.Second * 10) + select { + case <-newBlockCh: + case <-after: + panic("Timed out waiting for new block") + } +} + +func runReplayTest(t *testing.T, cs *ConsensusState, fileName string, newBlockCh chan interface{}) { + cs.config.Set("cswal", fileName) + cs.Start() + // Wait to make a new block. + // This is just a signal that we haven't halted; its not something contained in the WAL itself. + // Assuming the consensus state is running, replay of any WAL, including the empty one, + // should eventually be followed by a new block, or else something is wrong + waitForBlock(newBlockCh) + cs.Stop() +} + +func setupReplayTest(nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) { + fmt.Println("-------------------------------------") + log.Notice(Fmt("Starting replay test of %d lines of WAL (crash before write)", nLines)) + + lineStep := nLines + if crashAfter { + lineStep -= 1 + } + + split := strings.Split(testLog, "\n") + lastMsg := split[nLines] + + // we write those lines up to (not including) one with the signature + fileName := writeWAL(strings.Join(split[:nLines], "\n") + "\n") cs := fixedConsensusState() - // we've already precommitted on the first block - // without replay catchup we would be halted here forever + // set the last step according to when we crashed vs the wal cs.privValidator.LastHeight = 1 // first block - cs.privValidator.LastStep = 3 // precommit + cs.privValidator.LastStep = mapPrivValStep[lineStep] - newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 0) + fmt.Println("LAST STEP", cs.privValidator.LastStep) - // start timeout and receive routines - cs.startRoutines(0) + newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1) - // open wal and run catchup messages - openWAL(t, cs, name) - if err := cs.catchupReplay(cs.Height); err != nil { - panic(Fmt("Error on catchup replay %v", err)) - } + return cs, newBlockCh, lastMsg, fileName +} - after := time.After(time.Second * 15) - select { - case <-newBlockCh: - case <-after: - panic("Timed out waiting for new block") +//----------------------------------------------- +// Test the log at every iteration, and set the privVal last step +// as if the log was written after signing, before the crash + +func TestReplayCrashAfterWrite(t *testing.T) { + split := strings.Split(testLog, "\n") + for i := 0; i < len(split)-1; i++ { + cs, newBlockCh, _, f := setupReplayTest(i+1, true) + runReplayTest(t, cs, f, newBlockCh) } } -func openWAL(t *testing.T, cs *ConsensusState, file string) { - // open the wal - wal, err := NewWAL(file, config.GetBool("cswal_light")) +//----------------------------------------------- +// Test the log as if we crashed after signing but before writing. +// This relies on privValidator.LastSignature being set + +func TestReplayCrashBeforeWritePropose(t *testing.T) { + cs, newBlockCh, proposalMsg, f := setupReplayTest(2, false) // propose + // Set LastSig + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, []byte(proposalMsg), &err) + proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage) if err != nil { - panic(err) + t.Fatalf("Error reading json data: %v", err) } - wal.exists = true - cs.wal = wal + cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal) + cs.privValidator.LastSignature = proposal.Proposal.Signature + runReplayTest(t, cs, f, newBlockCh) +} + +func TestReplayCrashBeforeWritePrevote(t *testing.T) { + cs, newBlockCh, voteMsg, f := setupReplayTest(5, false) // prevote + cs.evsw.AddListenerForEvent("tester", types.EventStringCompleteProposal(), func(data events.EventData) { + // Set LastSig + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, []byte(voteMsg), &err) + vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) + if err != nil { + t.Fatalf("Error reading json data: %v", err) + } + cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote) + cs.privValidator.LastSignature = vote.Vote.Signature + }) + runReplayTest(t, cs, f, newBlockCh) +} + +func TestReplayCrashBeforeWritePrecommit(t *testing.T) { + cs, newBlockCh, voteMsg, f := setupReplayTest(7, false) // precommit + cs.evsw.AddListenerForEvent("tester", types.EventStringPolka(), func(data events.EventData) { + // Set LastSig + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, []byte(voteMsg), &err) + vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) + if err != nil { + t.Fatalf("Error reading json data: %v", err) + } + cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote) + cs.privValidator.LastSignature = vote.Vote.Signature + }) + runReplayTest(t, cs, f, newBlockCh) } diff --git a/consensus/state.go b/consensus/state.go index 192253192..564f02529 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -298,8 +298,17 @@ func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) { func (cs *ConsensusState) OnStart() error { cs.QuitService.OnStart() - // start timeout and receive routines - cs.startRoutines(0) + err := cs.OpenWAL(cs.config.GetString("cswal")) + if err != nil { + return err + } + + // start timeout routine + // NOTE: we dont start receiveRoutine until after replay + // so we dont re-write events, and so we dont process + // peer msgs before replay on app restarts. + // timeoutRoutine needed to read off tickChan during replay + go cs.timeoutRoutine() // we may have lost some votes if the process crashed // reload from consensus log to catchup @@ -308,6 +317,9 @@ func (cs *ConsensusState) OnStart() error { // let's go for it anyways, maybe we're fine } + // start + go cs.receiveRoutine(0) + // schedule the first round! cs.scheduleRound0(cs.Height) diff --git a/consensus/wal.go b/consensus/wal.go index 5b4747a6f..fb5b80568 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -60,6 +60,14 @@ func NewWAL(file string, light bool) (*WAL, error) { }, nil } +func (wal *WAL) Exists() bool { + if wal == nil { + log.Warn("consensus msg log is nil") + return false + } + return wal.exists +} + // called in newStep and for each pass in receiveRoutine func (wal *WAL) Save(clm ConsensusLogMessageInterface) { if wal != nil { diff --git a/types/priv_validator.go b/types/priv_validator.go index 30fdfff76..5700fdf59 100644 --- a/types/priv_validator.go +++ b/types/priv_validator.go @@ -35,11 +35,13 @@ func voteToStep(vote *Vote) int8 { } type PrivValidator struct { - Address []byte `json:"address"` - PubKey crypto.PubKey `json:"pub_key"` - LastHeight int `json:"last_height"` - LastRound int `json:"last_round"` - LastStep int8 `json:"last_step"` + Address []byte `json:"address"` + PubKey crypto.PubKey `json:"pub_key"` + LastHeight int `json:"last_height"` + LastRound int `json:"last_round"` + LastStep int8 `json:"last_step"` + LastSignature crypto.Signature `json:"last_signature"` // so we dont lose signatures + LastSignBytes []byte `json:"last_signbytes"` // so we dont lose signatures // PrivKey should be empty if a Signer other than the default is being used. PrivKey crypto.PrivKey `json:"priv_key"` @@ -85,14 +87,16 @@ func GenPrivValidator() *PrivValidator { pubKey := crypto.PubKeyEd25519(*pubKeyBytes) privKey := crypto.PrivKeyEd25519(*privKeyBytes) return &PrivValidator{ - Address: pubKey.Address(), - PubKey: pubKey, - PrivKey: privKey, - LastHeight: 0, - LastRound: 0, - LastStep: stepNone, - filePath: "", - Signer: NewDefaultSigner(privKey), + Address: pubKey.Address(), + PubKey: pubKey, + PrivKey: privKey, + LastHeight: 0, + LastRound: 0, + LastStep: stepNone, + LastSignature: nil, + LastSignBytes: nil, + filePath: "", + Signer: NewDefaultSigner(privKey), } } @@ -149,56 +153,84 @@ func (privVal *PrivValidator) save() { } } +// NOTE: Unsafe! +func (privVal *PrivValidator) Reset() { + privVal.LastHeight = 0 + privVal.LastRound = 0 + privVal.LastStep = 0 + privVal.LastSignature = nil + privVal.LastSignBytes = nil + privVal.Save() +} + func (privVal *PrivValidator) SignVote(chainID string, vote *Vote) error { privVal.mtx.Lock() defer privVal.mtx.Unlock() + signature, err := privVal.signBytesHRS(vote.Height, vote.Round, voteToStep(vote), SignBytes(chainID, vote)) + if err != nil { + return errors.New(Fmt("Error signing vote: %v", err)) + } + vote.Signature = signature.(crypto.SignatureEd25519) + return nil +} - // If height regression, panic - if privVal.LastHeight > vote.Height { - return errors.New("Height regression in SignVote") +func (privVal *PrivValidator) SignProposal(chainID string, proposal *Proposal) error { + privVal.mtx.Lock() + defer privVal.mtx.Unlock() + signature, err := privVal.signBytesHRS(proposal.Height, proposal.Round, stepPropose, SignBytes(chainID, proposal)) + if err != nil { + return errors.New(Fmt("Error signing proposal: %v", err)) + } + proposal.Signature = signature.(crypto.SignatureEd25519) + return nil +} + +// check if there's a regression. Else sign and write the hrs+signature to disk +func (privVal *PrivValidator) signBytesHRS(height, round int, step int8, signBytes []byte) (crypto.Signature, error) { + // If height regression, err + if privVal.LastHeight > height { + return nil, errors.New("Height regression") } // More cases for when the height matches - if privVal.LastHeight == vote.Height { - // If round regression, panic - if privVal.LastRound > vote.Round { - return errors.New("Round regression in SignVote") + if privVal.LastHeight == height { + // If round regression, err + if privVal.LastRound > round { + return nil, errors.New("Round regression") } - // If step regression, panic - if privVal.LastRound == vote.Round && privVal.LastStep > voteToStep(vote) { - return errors.New("Step regression in SignVote") + // If step regression, err + if privVal.LastRound == round { + if privVal.LastStep > step { + return nil, errors.New("Step regression") + } else if privVal.LastStep == step { + if privVal.LastSignBytes != nil { + if privVal.LastSignature == nil { + PanicSanity("privVal: LastSignature is nil but LastSignBytes is not!") + } + // so we dont sign a conflicting vote or proposal + // NOTE: proposals are non-deterministic (include time), + // so we can actually lose them, but will still never sign conflicting ones + if bytes.Equal(privVal.LastSignBytes, signBytes) { + log.Notice("Using privVal.LastSignature", "sig", privVal.LastSignature) + return privVal.LastSignature, nil + } + } + return nil, errors.New("Step regression") + } } } + // Sign + signature := privVal.Sign(signBytes) + // Persist height/round/step - privVal.LastHeight = vote.Height - privVal.LastRound = vote.Round - privVal.LastStep = voteToStep(vote) + privVal.LastHeight = height + privVal.LastRound = round + privVal.LastStep = step + privVal.LastSignature = signature + privVal.LastSignBytes = signBytes privVal.save() - // Sign - vote.Signature = privVal.Sign(SignBytes(chainID, vote)).(crypto.SignatureEd25519) - return nil -} - -func (privVal *PrivValidator) SignProposal(chainID string, proposal *Proposal) error { - privVal.mtx.Lock() - defer privVal.mtx.Unlock() - if privVal.LastHeight < proposal.Height || - privVal.LastHeight == proposal.Height && privVal.LastRound < proposal.Round || - privVal.LastHeight == 0 && privVal.LastRound == 0 && privVal.LastStep == stepNone { - - // Persist height/round/step - privVal.LastHeight = proposal.Height - privVal.LastRound = proposal.Round - privVal.LastStep = stepPropose - privVal.save() - - // Sign - proposal.Signature = privVal.Sign(SignBytes(chainID, proposal)).(crypto.SignatureEd25519) - return nil - } else { - return errors.New(fmt.Sprintf("Attempt of duplicate signing of proposal: Height %v, Round %v", proposal.Height, proposal.Round)) - } + return signature, nil } func (privVal *PrivValidator) String() string {