diff --git a/consensus/replay.go b/consensus/replay.go index 19c117eac..9bb0fb055 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -82,6 +82,10 @@ func (cs *ConsensusState) catchupReplay(height int) error { return nil } + // set replayMode + cs.replayMode = true + defer func() { cs.replayMode = false }() + // starting from end of file, // read messages until a new height is found nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool { diff --git a/consensus/state.go b/consensus/state.go index e7edb7308..714d17c77 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -233,7 +233,8 @@ type ConsensusState struct { evsw *events.EventSwitch - wal *WAL + wal *WAL + replayMode bool // so we don't log signing errors during replay nSteps int // used for testing to limit the number of transitions the state makes } @@ -303,11 +304,11 @@ func (cs *ConsensusState) OnStart() error { return err } - // start timeout routine - // NOTE: we dont start receiveRoutine until after replay - // so we dont re-write events, and so we dont process - // peer msgs before replay on app restarts. - // timeoutRoutine needed to read off tickChan during replay + // we need the timeoutRoutine for replay so + // we don't block on the tick chan. + // NOTE: we will get a build up of garbage go routines + // firing on the tockChan until the receiveRoutine is started + // to deal with them (by that point, at most one will be valid) go cs.timeoutRoutine() // we may have lost some votes if the process crashed @@ -317,12 +318,13 @@ func (cs *ConsensusState) OnStart() error { // let's go for it anyways, maybe we're fine } - // start - go cs.receiveRoutine(0) - // schedule the first round! cs.scheduleRound0(cs.Height) + // start the receiveRoutine last + // to avoid races (catchupReplay may have queued tocks/messages) + go cs.receiveRoutine(0) + return nil } @@ -855,7 +857,9 @@ func (cs *ConsensusState) decideProposal(height, round int) { log.Info("Signed proposal", "height", height, "round", round, "proposal", proposal) log.Debug(Fmt("Signed proposal block: %v", block)) } else { - log.Warn("enterPropose: Error signing proposal", "height", height, "round", round, "error", err) + if !cs.replayMode { + log.Warn("enterPropose: Error signing proposal", "height", height, "round", round, "error", err) + } } } @@ -1514,8 +1518,9 @@ func (cs *ConsensusState) signVote(type_ byte, hash []byte, header types.PartSet return vote, err } -// signs the vote, publishes on internalMsgQueue +// sign the vote and publish on internalMsgQueue func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.PartSetHeader) *types.Vote { + if cs.privValidator == nil || !cs.Validators.HasAddress(cs.privValidator.Address) { return nil } @@ -1527,7 +1532,9 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part log.Info("Signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err) return vote } else { - log.Warn("Error signing vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err) + if !cs.replayMode { + log.Warn("Error signing vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err) + } return nil } } diff --git a/consensus/state_test.go b/consensus/state_test.go index 9a8d60905..d890c4e93 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -236,7 +236,7 @@ func TestFullRound1(t *testing.T) { cs, vss := randConsensusState(1) height, round := cs.Height, cs.Round - voteCh := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1) + voteCh := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 0) propCh := subscribeToEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), 1) newRoundCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewRound(), 1) @@ -249,6 +249,8 @@ func TestFullRound1(t *testing.T) { propBlockHash := re.(types.EventDataRoundState).RoundState.(*RoundState).ProposalBlock.Hash() <-voteCh // wait for prevote + // NOTE: voteChan cap of 0 ensures we can complete this + // before consensus can move to the next height (and cause a race condition) validatePrevote(t, cs, round, vss[0], propBlockHash) <-voteCh // wait for precommit