Browse Source

Merge pull request #268 from tendermint/replay

consensus: no sign err in replay; fix a race
pull/274/head
Ethan Buchman 8 years ago
committed by GitHub
parent
commit
9fb84d66be
3 changed files with 26 additions and 13 deletions
  1. +4
    -0
      consensus/replay.go
  2. +19
    -12
      consensus/state.go
  3. +3
    -1
      consensus/state_test.go

+ 4
- 0
consensus/replay.go View File

@ -82,6 +82,10 @@ func (cs *ConsensusState) catchupReplay(height int) error {
return nil return nil
} }
// set replayMode
cs.replayMode = true
defer func() { cs.replayMode = false }()
// starting from end of file, // starting from end of file,
// read messages until a new height is found // read messages until a new height is found
nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool { nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool {


+ 19
- 12
consensus/state.go View File

@ -233,7 +233,8 @@ type ConsensusState struct {
evsw *events.EventSwitch evsw *events.EventSwitch
wal *WAL
wal *WAL
replayMode bool // so we don't log signing errors during replay
nSteps int // used for testing to limit the number of transitions the state makes nSteps int // used for testing to limit the number of transitions the state makes
} }
@ -303,11 +304,11 @@ func (cs *ConsensusState) OnStart() error {
return err return err
} }
// start timeout routine
// NOTE: we dont start receiveRoutine until after replay
// so we dont re-write events, and so we dont process
// peer msgs before replay on app restarts.
// timeoutRoutine needed to read off tickChan during replay
// we need the timeoutRoutine for replay so
// we don't block on the tick chan.
// NOTE: we will get a build up of garbage go routines
// firing on the tockChan until the receiveRoutine is started
// to deal with them (by that point, at most one will be valid)
go cs.timeoutRoutine() go cs.timeoutRoutine()
// we may have lost some votes if the process crashed // we may have lost some votes if the process crashed
@ -317,12 +318,13 @@ func (cs *ConsensusState) OnStart() error {
// let's go for it anyways, maybe we're fine // let's go for it anyways, maybe we're fine
} }
// start
go cs.receiveRoutine(0)
// schedule the first round! // schedule the first round!
cs.scheduleRound0(cs.Height) cs.scheduleRound0(cs.Height)
// start the receiveRoutine last
// to avoid races (catchupReplay may have queued tocks/messages)
go cs.receiveRoutine(0)
return nil return nil
} }
@ -855,7 +857,9 @@ func (cs *ConsensusState) decideProposal(height, round int) {
log.Info("Signed proposal", "height", height, "round", round, "proposal", proposal) log.Info("Signed proposal", "height", height, "round", round, "proposal", proposal)
log.Debug(Fmt("Signed proposal block: %v", block)) log.Debug(Fmt("Signed proposal block: %v", block))
} else { } else {
log.Warn("enterPropose: Error signing proposal", "height", height, "round", round, "error", err)
if !cs.replayMode {
log.Warn("enterPropose: Error signing proposal", "height", height, "round", round, "error", err)
}
} }
} }
@ -1514,8 +1518,9 @@ func (cs *ConsensusState) signVote(type_ byte, hash []byte, header types.PartSet
return vote, err return vote, err
} }
// signs the vote, publishes on internalMsgQueue
// sign the vote and publish on internalMsgQueue
func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.PartSetHeader) *types.Vote { func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.PartSetHeader) *types.Vote {
if cs.privValidator == nil || !cs.Validators.HasAddress(cs.privValidator.Address) { if cs.privValidator == nil || !cs.Validators.HasAddress(cs.privValidator.Address) {
return nil return nil
} }
@ -1527,7 +1532,9 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part
log.Info("Signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err) log.Info("Signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err)
return vote return vote
} else { } else {
log.Warn("Error signing vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err)
if !cs.replayMode {
log.Warn("Error signing vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err)
}
return nil return nil
} }
} }


+ 3
- 1
consensus/state_test.go View File

@ -236,7 +236,7 @@ func TestFullRound1(t *testing.T) {
cs, vss := randConsensusState(1) cs, vss := randConsensusState(1)
height, round := cs.Height, cs.Round height, round := cs.Height, cs.Round
voteCh := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1)
voteCh := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 0)
propCh := subscribeToEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), 1) propCh := subscribeToEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), 1)
newRoundCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewRound(), 1) newRoundCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewRound(), 1)
@ -249,6 +249,8 @@ func TestFullRound1(t *testing.T) {
propBlockHash := re.(types.EventDataRoundState).RoundState.(*RoundState).ProposalBlock.Hash() propBlockHash := re.(types.EventDataRoundState).RoundState.(*RoundState).ProposalBlock.Hash()
<-voteCh // wait for prevote <-voteCh // wait for prevote
// NOTE: voteChan cap of 0 ensures we can complete this
// before consensus can move to the next height (and cause a race condition)
validatePrevote(t, cs, round, vss[0], propBlockHash) validatePrevote(t, cs, round, vss[0], propBlockHash)
<-voteCh // wait for precommit <-voteCh // wait for precommit


Loading…
Cancel
Save