From 3c63f657af55173e1fb8e67772ac7f2ff2e72ce8 Mon Sep 17 00:00:00 2001 From: tycho garen Date: Fri, 11 Mar 2022 07:45:50 -0500 Subject: [PATCH] fix shutdown lock --- internal/consensus/reactor.go | 7 +++ internal/consensus/state.go | 99 +++++++++++++++++++++-------------- 2 files changed, 67 insertions(+), 39 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 16d83a269..2bbf963c8 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -360,6 +360,13 @@ func (r *Reactor) subscribeToBroadcastEvents() { if err := r.broadcastNewRoundStepMessage(ctx, data.(*cstypes.RoundState)); err != nil { return err } + + onStopCh := func() chan *cstypes.RoundState { + r.state.mtx.Lock() + defer r.state.mtx.Unlock() + return r.state.onStopCh + }() + select { case onStopCh <- data.(*cstypes.RoundState): return nil diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 50e2ab75c..99df337a3 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -787,34 +787,38 @@ func (cs *State) updateToState(ctx context.Context, state sm.State) { cs.updateHeight(height) cs.updateRoundStep(0, cstypes.RoundStepNewHeight) - if cs.CommitTime.IsZero() { - // "Now" makes it easier to sync up dev nodes. - // We add timeoutCommit to allow transactions - // to be gathered for the first block. - // And alternative solution that relies on clocks: - // cs.StartTime = state.LastBlockTime.Add(timeoutCommit) - cs.StartTime = cs.config.Commit(tmtime.Now()) - } else { - cs.StartTime = cs.config.Commit(cs.CommitTime) - } - - cs.Validators = validators - cs.Proposal = nil - cs.ProposalReceiveTime = time.Time{} - cs.ProposalBlock = nil - cs.ProposalBlockParts = nil - cs.LockedRound = -1 - cs.LockedBlock = nil - cs.LockedBlockParts = nil - cs.ValidRound = -1 - cs.ValidBlock = nil - cs.ValidBlockParts = nil - cs.Votes = cstypes.NewHeightVoteSet(state.ChainID, height, validators) - cs.CommitRound = -1 - cs.LastValidators = state.LastValidators - cs.TriggeredTimeoutPrecommit = false - - cs.state = state + func() { + cs.mtx.Lock() + defer cs.mtx.Unlock() + + if cs.CommitTime.IsZero() { + // "Now" makes it easier to sync up dev nodes. + // We add timeoutCommit to allow transactions + // to be gathered for the first block. + // And alternative solution that relies on clocks: + // cs.StartTime = state.LastBlockTime.Add(timeoutCommit) + cs.StartTime = cs.config.Commit(tmtime.Now()) + } else { + cs.StartTime = cs.config.Commit(cs.CommitTime) + } + + cs.Validators = validators + cs.Proposal = nil + cs.ProposalReceiveTime = time.Time{} + cs.ProposalBlock = nil + cs.ProposalBlockParts = nil + cs.LockedRound = -1 + cs.LockedBlock = nil + cs.LockedBlockParts = nil + cs.ValidRound = -1 + cs.ValidBlock = nil + cs.ValidBlockParts = nil + cs.Votes = cstypes.NewHeightVoteSet(state.ChainID, height, validators) + cs.CommitRound = -1 + cs.LastValidators = state.LastValidators + cs.TriggeredTimeoutPrecommit = false + cs.state = state + }() // Finally, broadcast RoundState cs.newStep(ctx) @@ -1079,11 +1083,15 @@ func (cs *State) handleTimeout( } func (cs *State) handleTxsAvailable(ctx context.Context) { - cs.mtx.Lock() - defer cs.mtx.Unlock() - // We only need to do this for round 0. - if cs.Round != 0 { + if func() bool { + cs.mtx.RLock() + defer cs.mtx.RUnlock() + if cs.Round != 0 { + return true + } + return false + }() { return } @@ -1147,17 +1155,26 @@ func (cs *State) enterNewRound(ctx context.Context, height int64, round int32) { // we don't fire newStep for this step, // but we fire an event, so update the round step first cs.updateRoundStep(round, cstypes.RoundStepNewRound) - cs.Validators = validators + func() { + cs.mtx.Lock() + defer cs.mtx.Unlock() + cs.Validators = validators + }() if round == 0 { // We've already reset these upon new height, // and meanwhile we might have received a proposal // for round 0. } else { logger.Debug("resetting proposal info") - cs.Proposal = nil - cs.ProposalReceiveTime = time.Time{} - cs.ProposalBlock = nil - cs.ProposalBlockParts = nil + func() { + cs.mtx.Lock() + defer cs.mtx.Unlock() + + cs.Proposal = nil + cs.ProposalReceiveTime = time.Time{} + cs.ProposalBlock = nil + cs.ProposalBlockParts = nil + }() } r, err := tmmath.SafeAddInt32(round, 1) @@ -1165,8 +1182,12 @@ func (cs *State) enterNewRound(ctx context.Context, height int64, round int32) { panic(err) } - cs.Votes.SetRound(r) // also track next round (round+1) to allow round-skipping - cs.TriggeredTimeoutPrecommit = false + func() { + cs.mtx.Lock() + defer cs.mtx.Unlock() + cs.Votes.SetRound(r) // also track next round (round+1) to allow round-skipping + cs.TriggeredTimeoutPrecommit = false + }() if err := cs.eventBus.PublishEventNewRound(ctx, cs.NewRoundEvent()); err != nil { cs.logger.Error("failed publishing new round", "err", err)