Browse Source

fix shutdown lock

pull/8127/head
tycho garen 3 years ago
committed by M. J. Fromberger
parent
commit
3c63f657af
2 changed files with 67 additions and 39 deletions
  1. +7
    -0
      internal/consensus/reactor.go
  2. +60
    -39
      internal/consensus/state.go

+ 7
- 0
internal/consensus/reactor.go View File

@ -360,6 +360,13 @@ func (r *Reactor) subscribeToBroadcastEvents() {
if err := r.broadcastNewRoundStepMessage(ctx, data.(*cstypes.RoundState)); err != nil {
return err
}
onStopCh := func() chan *cstypes.RoundState {
r.state.mtx.Lock()
defer r.state.mtx.Unlock()
return r.state.onStopCh
}()
select {
case onStopCh <- data.(*cstypes.RoundState):
return nil


+ 60
- 39
internal/consensus/state.go View File

@ -787,34 +787,38 @@ func (cs *State) updateToState(ctx context.Context, state sm.State) {
cs.updateHeight(height)
cs.updateRoundStep(0, cstypes.RoundStepNewHeight)
if cs.CommitTime.IsZero() {
// "Now" makes it easier to sync up dev nodes.
// We add timeoutCommit to allow transactions
// to be gathered for the first block.
// And alternative solution that relies on clocks:
// cs.StartTime = state.LastBlockTime.Add(timeoutCommit)
cs.StartTime = cs.config.Commit(tmtime.Now())
} else {
cs.StartTime = cs.config.Commit(cs.CommitTime)
}
cs.Validators = validators
cs.Proposal = nil
cs.ProposalReceiveTime = time.Time{}
cs.ProposalBlock = nil
cs.ProposalBlockParts = nil
cs.LockedRound = -1
cs.LockedBlock = nil
cs.LockedBlockParts = nil
cs.ValidRound = -1
cs.ValidBlock = nil
cs.ValidBlockParts = nil
cs.Votes = cstypes.NewHeightVoteSet(state.ChainID, height, validators)
cs.CommitRound = -1
cs.LastValidators = state.LastValidators
cs.TriggeredTimeoutPrecommit = false
cs.state = state
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
if cs.CommitTime.IsZero() {
// "Now" makes it easier to sync up dev nodes.
// We add timeoutCommit to allow transactions
// to be gathered for the first block.
// And alternative solution that relies on clocks:
// cs.StartTime = state.LastBlockTime.Add(timeoutCommit)
cs.StartTime = cs.config.Commit(tmtime.Now())
} else {
cs.StartTime = cs.config.Commit(cs.CommitTime)
}
cs.Validators = validators
cs.Proposal = nil
cs.ProposalReceiveTime = time.Time{}
cs.ProposalBlock = nil
cs.ProposalBlockParts = nil
cs.LockedRound = -1
cs.LockedBlock = nil
cs.LockedBlockParts = nil
cs.ValidRound = -1
cs.ValidBlock = nil
cs.ValidBlockParts = nil
cs.Votes = cstypes.NewHeightVoteSet(state.ChainID, height, validators)
cs.CommitRound = -1
cs.LastValidators = state.LastValidators
cs.TriggeredTimeoutPrecommit = false
cs.state = state
}()
// Finally, broadcast RoundState
cs.newStep(ctx)
@ -1079,11 +1083,15 @@ func (cs *State) handleTimeout(
}
func (cs *State) handleTxsAvailable(ctx context.Context) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
// We only need to do this for round 0.
if cs.Round != 0 {
if func() bool {
cs.mtx.RLock()
defer cs.mtx.RUnlock()
if cs.Round != 0 {
return true
}
return false
}() {
return
}
@ -1147,17 +1155,26 @@ func (cs *State) enterNewRound(ctx context.Context, height int64, round int32) {
// we don't fire newStep for this step,
// but we fire an event, so update the round step first
cs.updateRoundStep(round, cstypes.RoundStepNewRound)
cs.Validators = validators
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.Validators = validators
}()
if round == 0 {
// We've already reset these upon new height,
// and meanwhile we might have received a proposal
// for round 0.
} else {
logger.Debug("resetting proposal info")
cs.Proposal = nil
cs.ProposalReceiveTime = time.Time{}
cs.ProposalBlock = nil
cs.ProposalBlockParts = nil
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.Proposal = nil
cs.ProposalReceiveTime = time.Time{}
cs.ProposalBlock = nil
cs.ProposalBlockParts = nil
}()
}
r, err := tmmath.SafeAddInt32(round, 1)
@ -1165,8 +1182,12 @@ func (cs *State) enterNewRound(ctx context.Context, height int64, round int32) {
panic(err)
}
cs.Votes.SetRound(r) // also track next round (round+1) to allow round-skipping
cs.TriggeredTimeoutPrecommit = false
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.Votes.SetRound(r) // also track next round (round+1) to allow round-skipping
cs.TriggeredTimeoutPrecommit = false
}()
if err := cs.eventBus.PublishEventNewRound(ctx, cs.NewRoundEvent()); err != nil {
cs.logger.Error("failed publishing new round", "err", err)


Loading…
Cancel
Save