Browse Source

privVal.LastSignBytes and more replay tests

pull/249/head
Ethan Buchman 8 years ago
parent
commit
1110c5d37d
7 changed files with 177 additions and 95 deletions
  1. +1
    -4
      cmd/tendermint/reset_priv_validator.go
  2. +1
    -1
      consensus/common_test.go
  3. +1
    -0
      consensus/reactor.go
  4. +14
    -15
      consensus/replay.go
  5. +121
    -71
      consensus/replay_test.go
  6. +14
    -2
      consensus/state.go
  7. +25
    -2
      types/priv_validator.go

+ 1
- 4
cmd/tendermint/reset_priv_validator.go View File

@ -22,10 +22,7 @@ func reset_priv_validator() {
privValidatorFile := config.GetString("priv_validator_file")
if _, err := os.Stat(privValidatorFile); err == nil {
privValidator = types.LoadPrivValidator(privValidatorFile)
privValidator.LastHeight = 0
privValidator.LastRound = 0
privValidator.LastStep = 0
privValidator.Save()
privValidator.Reset()
log.Notice("Reset PrivValidator", "file", privValidatorFile)
} else {
privValidator = types.GenPrivValidator()


+ 1
- 1
consensus/common_test.go View File

@ -316,8 +316,8 @@ func fixedConsensusState() *ConsensusState {
state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
privValidatorFile := config.GetString("priv_validator_file")
privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
privValidator.Reset()
return newConsensusState(state, privValidator, counter.NewCounterApplication(true))
}
func newConsensusState(state *sm.State, pv *types.PrivValidator, app tmsp.Application) *ConsensusState {


+ 1
- 0
consensus/reactor.go View File

@ -140,6 +140,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
// Messages affect either a peer state or the consensus state.
// Peer state updates can happen in parallel, but processing of
// proposals, block parts, and votes are ordered by the receiveRoutine
// NOTE: blocks on consensus state for proposals, block parts, and votes
func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
if !conR.IsRunning() {
log.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes)


+ 14
- 15
consensus/replay.go View File

@ -19,6 +19,8 @@ import (
)
// unmarshal and apply a single message to the consensus state
// as if it were received in receiveRoutine
// NOTE: receiveRoutine should not be running
func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan interface{}) error {
var err error
var msg ConsensusLogMessage
@ -31,7 +33,7 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte
// for logging
switch m := msg.Msg.(type) {
case types.EventDataRoundState:
log.Notice("New Step", "height", m.Height, "round", m.Round, "step", m.Step)
log.Notice("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step)
// these are playback checks
ticker := time.After(time.Second * 2)
if newStepCh != nil {
@ -53,31 +55,28 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte
switch msg := m.Msg.(type) {
case *ProposalMessage:
p := msg.Proposal
log.Notice("Proposal", "height", p.Height, "round", p.Round, "header",
log.Notice("Replay: Proposal", "height", p.Height, "round", p.Round, "header",
p.BlockPartsHeader, "pol", p.POLRound, "peer", peerKey)
case *BlockPartMessage:
log.Notice("BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey)
log.Notice("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey)
case *VoteMessage:
v := msg.Vote
log.Notice("Vote", "height", v.Height, "round", v.Round, "type", v.Type,
log.Notice("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type,
"hash", v.BlockHash, "header", v.BlockPartsHeader, "peer", peerKey)
}
// internal or from peer
if m.PeerKey == "" {
cs.internalMsgQueue <- m
} else {
cs.peerMsgQueue <- m
}
cs.handleMsg(m, cs.RoundState)
case timeoutInfo:
log.Notice("Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
cs.tockChan <- m
log.Notice("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
cs.handleTimeout(m, cs.RoundState)
default:
return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg))
return fmt.Errorf("Replay: Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg))
}
return nil
}
// replay only those messages since the last block
// replay only those messages since the last block.
// timeoutRoutine should run concurrently to read off tickChan
func (cs *ConsensusState) catchupReplay(height int) error {
if cs.wal == nil {
log.Warn("consensus msg log is nil")
@ -142,7 +141,7 @@ func (cs *ConsensusState) catchupReplay(height int) error {
return err
}
}
log.Info("Done catchup replay")
log.Notice("Done catchup replay")
return nil
}


+ 121
- 71
consensus/replay_test.go View File

@ -1,8 +1,10 @@
package consensus
import (
"fmt"
"io/ioutil"
"os"
"strings"
"testing"
"time"
@ -48,69 +50,47 @@ import (
```
*/
var testLog1 = `{"time":"2016-04-03T11:23:54.387Z","msg":[3,{"duration":972835254,"height":1,"round":0,"step":1}]}
var testLog = `{"time":"2016-04-03T11:23:54.387Z","msg":[3,{"duration":972835254,"height":1,"round":0,"step":1}]}
{"time":"2016-04-03T11:23:54.388Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]}
{"time":"2016-04-03T11:23:54.388Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"pol_round":-1,"signature":"3A2ECD5023B21EC144EC16CFF1B992A4321317B83EEDD8969FDFEA6EB7BF4389F38DDA3E7BB109D63A07491C16277A197B241CF1F05F5E485C59882ECACD9E07"}}],"peer_key":""}]}
{"time":"2016-04-03T11:23:54.389Z","msg":[2,{"msg":[19,{"Height":1,"Round":0,"Part":{"index":0,"bytes":"0101010F74656E6465726D696E745F7465737401011441D59F4B718AC00000000000000114C4B01D3810579550997AC5641E759E20D99B51C10001000100","proof":{"aunts":[]}}}],"peer_key":""}]}
{"time":"2016-04-03T11:23:54.390Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrevote"}]}
`
// continuation; splitting allows us to test saving the privVal.LastSignature
// ... to test the case when we sign but crash before writing to the wal,
// we only run replay on testLog1 but stick this signature in the privVal.LastSignature after the proposal
var testLog2 = `{"time":"2016-04-03T11:23:54.390Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":1,"block_hash":"4291966B8A9DFBA00AEC7C700F2718E61DF4331D","block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"signature":"47D2A75A4E2F15DB1F0D1B656AC0637AF9AADDFEB6A156874F6553C73895E5D5DC948DBAEF15E61276C5342D0E638DFCB77C971CD282096EA8735A564A90F008"}}],"peer_key":""}]}
`
// continuation; splitting allows us to test saving the privVal.LastSignature
var testLog3 = `{"time":"2016-04-03T11:23:54.392Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrecommit"}]}
{"time":"2016-04-03T11:23:54.390Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":1,"block_hash":"4291966B8A9DFBA00AEC7C700F2718E61DF4331D","block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"signature":"47D2A75A4E2F15DB1F0D1B656AC0637AF9AADDFEB6A156874F6553C73895E5D5DC948DBAEF15E61276C5342D0E638DFCB77C971CD282096EA8735A564A90F008"}}],"peer_key":""}]}
{"time":"2016-04-03T11:23:54.392Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrecommit"}]}
{"time":"2016-04-03T11:23:54.392Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":2,"block_hash":"4291966B8A9DFBA00AEC7C700F2718E61DF4331D","block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"signature":"39147DA595F08B73CF8C899967C8403B5872FD9042FFA4E239159E0B6C5D9665C9CA81D766EACA2AE658872F94C2FCD1E34BF51859CD5B274DA8512BACE4B50D"}}],"peer_key":""}]}
`
func TestReplayWithoutSig(t *testing.T) {
// map lines in the above wal to privVal step
var mapPrivValStep = map[int]int8{
0: 0,
1: 0,
2: 1,
3: 1,
4: 1,
5: 2,
6: 2,
7: 3,
}
func writeWAL(log string) string {
fmt.Println("writing", log)
// write the needed wal to file
f, err := ioutil.TempFile(os.TempDir(), "replay_test_")
if err != nil {
panic(err)
}
_, err = f.WriteString(testLog1)
_, err = f.WriteString(log)
if err != nil {
panic(err)
}
name := f.Name()
f.Close()
return name
}
cs := fixedConsensusState()
// we've already precommitted on the first block
// without replay catchup we would be halted here forever
cs.privValidator.LastHeight = 1 // first block
cs.privValidator.LastStep = 2 // prevote
newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 0)
cs.evsw.AddListenerForEvent("tester", types.EventStringCompleteProposal(), func(data events.EventData) {
// Set LastSig
// unmarshal log2
var err error
var msg ConsensusLogMessage
wire.ReadJSON(&msg, []byte(testLog2), &err)
vote := msg.Msg.(msgInfo).Msg.(*VoteMessage)
if err != nil {
t.Fatalf("Error reading json data: %v", err)
}
cs.privValidator.LastSignature = vote.Vote.Signature
})
// start timeout and receive routines
cs.startRoutines(0)
// open wal and run catchup messages
openWAL(t, cs, f.Name())
if err := cs.catchupReplay(cs.Height); err != nil {
panic(Fmt("Error on catchup replay %v", err))
}
after := time.After(time.Second * 15)
func waitForBlock(newBlockCh chan interface{}) {
after := time.After(time.Second * 10)
select {
case <-newBlockCh:
case <-after:
@ -118,42 +98,47 @@ func TestReplayWithoutSig(t *testing.T) {
}
}
func TestReplayWithSig(t *testing.T) {
// write the needed wal to file
f, err := ioutil.TempFile(os.TempDir(), "replay_test_")
if err != nil {
panic(err)
func runReplayTest(t *testing.T, cs *ConsensusState, fileName string, newBlockCh chan interface{}) {
// open wal and run catchup messages
openWAL(t, cs, fileName)
go cs.timeoutRoutine()
if err := cs.catchupReplay(cs.Height); err != nil {
panic(Fmt("Error on catchup replay %v", err))
}
_, err = f.WriteString(testLog1 + testLog2 + testLog3)
if err != nil {
panic(err)
go cs.receiveRoutine(0)
// wait to make a new block
waitForBlock(newBlockCh)
cs.QuitService.OnStop()
}
func setupReplayTest(nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) {
fmt.Println("-------------------------------------")
log.Notice(Fmt("Starting replay test of %d lines of WAL (crash before write)", nLines))
lineStep := nLines
if crashAfter {
lineStep -= 1
}
f.Close()
split := strings.Split(testLog, "\n")
lastMsg := split[nLines]
// we write those lines up to (not including) one with the signature
fileName := writeWAL(strings.Join(split[:nLines], "\n") + "\n")
cs := fixedConsensusState()
cs.QuitService.OnStart()
// we've already precommitted on the first block
// without replay catchup we would be halted here forever
cs.privValidator.LastHeight = 1 // first block
cs.privValidator.LastStep = 3 // precommit
cs.privValidator.LastStep = mapPrivValStep[lineStep]
newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 0)
fmt.Println("LAST STEP", cs.privValidator.LastStep)
// start timeout and receive routines
cs.startRoutines(0)
newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1)
// open wal and run catchup messages
openWAL(t, cs, f.Name())
if err := cs.catchupReplay(cs.Height); err != nil {
panic(Fmt("Error on catchup replay %v", err))
}
after := time.After(time.Second * 15)
select {
case <-newBlockCh:
case <-after:
panic("Timed out waiting for new block")
}
return cs, newBlockCh, lastMsg, fileName
}
func openWAL(t *testing.T, cs *ConsensusState, file string) {
@ -165,3 +150,68 @@ func openWAL(t *testing.T, cs *ConsensusState, file string) {
wal.exists = true
cs.wal = wal
}
//-----------------------------------------------
// Test the log at every iteration, and set the privVal last step
// as if the log was written after signing, before the crash
func TestReplayCrashAfterWrite(t *testing.T) {
split := strings.Split(testLog, "\n")
for i := 0; i < len(split)-1; i++ {
cs, newBlockCh, _, f := setupReplayTest(i+1, true)
runReplayTest(t, cs, f, newBlockCh)
}
}
//-----------------------------------------------
// Test the log as if we crashed after signing but before writing.
// This relies on privValidator.LastSignature being set
func TestReplayCrashBeforeWritePropose(t *testing.T) {
cs, newBlockCh, proposalMsg, f := setupReplayTest(2, false) // propose
// Set LastSig
var err error
var msg ConsensusLogMessage
wire.ReadJSON(&msg, []byte(proposalMsg), &err)
proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage)
if err != nil {
t.Fatalf("Error reading json data: %v", err)
}
cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal)
cs.privValidator.LastSignature = proposal.Proposal.Signature
runReplayTest(t, cs, f, newBlockCh)
}
func TestReplayCrashBeforeWritePrevote(t *testing.T) {
cs, newBlockCh, voteMsg, f := setupReplayTest(5, false) // prevote
cs.evsw.AddListenerForEvent("tester", types.EventStringCompleteProposal(), func(data events.EventData) {
// Set LastSig
var err error
var msg ConsensusLogMessage
wire.ReadJSON(&msg, []byte(voteMsg), &err)
vote := msg.Msg.(msgInfo).Msg.(*VoteMessage)
if err != nil {
t.Fatalf("Error reading json data: %v", err)
}
cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote)
cs.privValidator.LastSignature = vote.Vote.Signature
})
runReplayTest(t, cs, f, newBlockCh)
}
func TestReplayCrashBeforeWritePrecommit(t *testing.T) {
cs, newBlockCh, voteMsg, f := setupReplayTest(7, false) // precommit
cs.evsw.AddListenerForEvent("tester", types.EventStringPolka(), func(data events.EventData) {
// Set LastSig
var err error
var msg ConsensusLogMessage
wire.ReadJSON(&msg, []byte(voteMsg), &err)
vote := msg.Msg.(msgInfo).Msg.(*VoteMessage)
if err != nil {
t.Fatalf("Error reading json data: %v", err)
}
cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote)
cs.privValidator.LastSignature = vote.Vote.Signature
})
runReplayTest(t, cs, f, newBlockCh)
}

+ 14
- 2
consensus/state.go View File

@ -298,8 +298,17 @@ func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) {
func (cs *ConsensusState) OnStart() error {
cs.QuitService.OnStart()
// start timeout and receive routines
cs.startRoutines(0)
err := cs.OpenWAL(cs.config.GetString("cswal"))
if err != nil {
return err
}
// start timeout routine
// NOTE: we dont start receiveRoutine until after replay
// so we dont re-write events, and so we dont process
// peer msgs before replay on app restarts.
// timeoutRoutine needed to read off tickChan during replay
go cs.timeoutRoutine()
// we may have lost some votes if the process crashed
// reload from consensus log to catchup
@ -308,6 +317,9 @@ func (cs *ConsensusState) OnStart() error {
// let's go for it anyways, maybe we're fine
}
// start
go cs.receiveRoutine(0)
// schedule the first round!
cs.scheduleRound0(cs.Height)


+ 25
- 2
types/priv_validator.go View File

@ -41,6 +41,7 @@ type PrivValidator struct {
LastRound int `json:"last_round"`
LastStep int8 `json:"last_step"`
LastSignature crypto.Signature `json:"last_signature"` // so we dont lose signatures
LastSignBytes []byte `json:"last_signbytes"` // so we dont lose signatures
// PrivKey should be empty if a Signer other than the default is being used.
PrivKey crypto.PrivKey `json:"priv_key"`
@ -93,6 +94,7 @@ func GenPrivValidator() *PrivValidator {
LastRound: 0,
LastStep: stepNone,
LastSignature: nil,
LastSignBytes: nil,
filePath: "",
Signer: NewDefaultSigner(privKey),
}
@ -151,6 +153,16 @@ func (privVal *PrivValidator) save() {
}
}
// NOTE: Unsafe!
func (privVal *PrivValidator) Reset() {
privVal.LastHeight = 0
privVal.LastRound = 0
privVal.LastStep = 0
privVal.LastSignature = nil
privVal.LastSignBytes = nil
privVal.Save()
}
func (privVal *PrivValidator) SignVote(chainID string, vote *Vote) error {
privVal.mtx.Lock()
defer privVal.mtx.Unlock()
@ -190,9 +202,19 @@ func (privVal *PrivValidator) signBytesHRS(height, round int, step int8, signByt
if privVal.LastStep > step {
return nil, errors.New("Step regression")
} else if privVal.LastStep == step {
if privVal.LastSignature != nil {
return privVal.LastSignature, nil
if privVal.LastSignBytes != nil {
if privVal.LastSignature == nil {
PanicSanity("privVal: LastSignature is nil but LastSignBytes is not!")
}
// so we dont sign a conflicting vote or proposal
// NOTE: proposals are non-deterministic (include time),
// so we can actually lose them, but will still never sign conflicting ones
if bytes.Equal(privVal.LastSignBytes, signBytes) {
log.Notice("Using privVal.LastSignature", "sig", privVal.LastSignature)
return privVal.LastSignature, nil
}
}
return nil, errors.New("Step regression")
}
}
}
@ -205,6 +227,7 @@ func (privVal *PrivValidator) signBytesHRS(height, round int, step int8, signByt
privVal.LastRound = round
privVal.LastStep = step
privVal.LastSignature = signature
privVal.LastSignBytes = signBytes
privVal.save()
return signature, nil


Loading…
Cancel
Save