Browse Source

fix shutdown lock

pull/8091/head
tycho garen 3 years ago
parent
commit
b5595e6983
2 changed files with 74 additions and 41 deletions
  1. +8
    -1
      internal/consensus/reactor.go
  2. +66
    -40
      internal/consensus/state.go

+ 8
- 1
internal/consensus/reactor.go View File

@ -357,8 +357,15 @@ func (r *Reactor) subscribeToBroadcastEvents() {
if err := r.broadcastNewRoundStepMessage(ctx, data.(*cstypes.RoundState)); err != nil { if err := r.broadcastNewRoundStepMessage(ctx, data.(*cstypes.RoundState)); err != nil {
return err return err
} }
onStopCh := func() chan *cstypes.RoundState {
r.state.mtx.Lock()
defer r.state.mtx.Unlock()
return r.state.onStopCh
}()
select { select {
case r.state.onStopCh <- data.(*cstypes.RoundState):
case onStopCh <- data.(*cstypes.RoundState):
return nil return nil
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()


+ 66
- 40
internal/consensus/state.go View File

@ -514,7 +514,12 @@ func (cs *State) OnStop() {
} }
} }
close(cs.onStopCh)
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
close(cs.onStopCh)
}()
if cs.evsw.IsRunning() { if cs.evsw.IsRunning() {
cs.evsw.Stop() cs.evsw.Stop()
@ -801,34 +806,38 @@ func (cs *State) updateToState(ctx context.Context, state sm.State) {
cs.updateHeight(height) cs.updateHeight(height)
cs.updateRoundStep(0, cstypes.RoundStepNewHeight) cs.updateRoundStep(0, cstypes.RoundStepNewHeight)
if cs.CommitTime.IsZero() {
// "Now" makes it easier to sync up dev nodes.
// We add timeoutCommit to allow transactions
// to be gathered for the first block.
// And alternative solution that relies on clocks:
// cs.StartTime = state.LastBlockTime.Add(timeoutCommit)
cs.StartTime = cs.config.Commit(tmtime.Now())
} else {
cs.StartTime = cs.config.Commit(cs.CommitTime)
}
cs.Validators = validators
cs.Proposal = nil
cs.ProposalReceiveTime = time.Time{}
cs.ProposalBlock = nil
cs.ProposalBlockParts = nil
cs.LockedRound = -1
cs.LockedBlock = nil
cs.LockedBlockParts = nil
cs.ValidRound = -1
cs.ValidBlock = nil
cs.ValidBlockParts = nil
cs.Votes = cstypes.NewHeightVoteSet(state.ChainID, height, validators)
cs.CommitRound = -1
cs.LastValidators = state.LastValidators
cs.TriggeredTimeoutPrecommit = false
cs.state = state
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
if cs.CommitTime.IsZero() {
// "Now" makes it easier to sync up dev nodes.
// We add timeoutCommit to allow transactions
// to be gathered for the first block.
// And alternative solution that relies on clocks:
// cs.StartTime = state.LastBlockTime.Add(timeoutCommit)
cs.StartTime = cs.config.Commit(tmtime.Now())
} else {
cs.StartTime = cs.config.Commit(cs.CommitTime)
}
cs.Validators = validators
cs.Proposal = nil
cs.ProposalReceiveTime = time.Time{}
cs.ProposalBlock = nil
cs.ProposalBlockParts = nil
cs.LockedRound = -1
cs.LockedBlock = nil
cs.LockedBlockParts = nil
cs.ValidRound = -1
cs.ValidBlock = nil
cs.ValidBlockParts = nil
cs.Votes = cstypes.NewHeightVoteSet(state.ChainID, height, validators)
cs.CommitRound = -1
cs.LastValidators = state.LastValidators
cs.TriggeredTimeoutPrecommit = false
cs.state = state
}()
// Finally, broadcast RoundState // Finally, broadcast RoundState
cs.newStep(ctx) cs.newStep(ctx)
@ -1068,11 +1077,15 @@ func (cs *State) handleTimeout(
} }
func (cs *State) handleTxsAvailable(ctx context.Context) { func (cs *State) handleTxsAvailable(ctx context.Context) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
// We only need to do this for round 0. // We only need to do this for round 0.
if cs.Round != 0 {
if func() bool {
cs.mtx.RLock()
defer cs.mtx.RUnlock()
if cs.Round != 0 {
return true
}
return false
}() {
return return
} }
@ -1136,17 +1149,26 @@ func (cs *State) enterNewRound(ctx context.Context, height int64, round int32) {
// we don't fire newStep for this step, // we don't fire newStep for this step,
// but we fire an event, so update the round step first // but we fire an event, so update the round step first
cs.updateRoundStep(round, cstypes.RoundStepNewRound) cs.updateRoundStep(round, cstypes.RoundStepNewRound)
cs.Validators = validators
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.Validators = validators
}()
if round == 0 { if round == 0 {
// We've already reset these upon new height, // We've already reset these upon new height,
// and meanwhile we might have received a proposal // and meanwhile we might have received a proposal
// for round 0. // for round 0.
} else { } else {
logger.Debug("resetting proposal info") logger.Debug("resetting proposal info")
cs.Proposal = nil
cs.ProposalReceiveTime = time.Time{}
cs.ProposalBlock = nil
cs.ProposalBlockParts = nil
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.Proposal = nil
cs.ProposalReceiveTime = time.Time{}
cs.ProposalBlock = nil
cs.ProposalBlockParts = nil
}()
} }
r, err := tmmath.SafeAddInt32(round, 1) r, err := tmmath.SafeAddInt32(round, 1)
@ -1154,8 +1176,12 @@ func (cs *State) enterNewRound(ctx context.Context, height int64, round int32) {
panic(err) panic(err)
} }
cs.Votes.SetRound(r) // also track next round (round+1) to allow round-skipping
cs.TriggeredTimeoutPrecommit = false
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.Votes.SetRound(r) // also track next round (round+1) to allow round-skipping
cs.TriggeredTimeoutPrecommit = false
}()
if err := cs.eventBus.PublishEventNewRound(ctx, cs.NewRoundEvent()); err != nil { if err := cs.eventBus.PublishEventNewRound(ctx, cs.NewRoundEvent()); err != nil {
cs.logger.Error("failed publishing new round", "err", err) cs.logger.Error("failed publishing new round", "err", err)


Loading…
Cancel
Save