diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 822e8c627..52e547bf6 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -348,6 +348,8 @@ func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote) // internal pubsub defined in the consensus state to broadcast them to peers // upon receiving. func (r *Reactor) subscribeToBroadcastEvents() { + onStopCh := r.state.getOnStopCh() + err := r.state.evsw.AddListenerForEvent( listenerIDConsensus, types.EventNewRoundStepValue, @@ -356,7 +358,7 @@ func (r *Reactor) subscribeToBroadcastEvents() { return err } select { - case r.state.onStopCh <- data.(*cstypes.RoundState): + case onStopCh <- data.(*cstypes.RoundState): return nil case <-ctx.Done(): return ctx.Err() diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 7b48aa3bc..109e90683 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -493,12 +493,19 @@ func (cs *State) loadWalFile(ctx context.Context) error { return nil } +func (cs *State) getOnStopCh() chan *cstypes.RoundState { + cs.mtx.RLock() + defer cs.mtx.RUnlock() + + return cs.onStopCh +} + // OnStop implements service.Service. func (cs *State) OnStop() { // If the node is committing a new block, wait until it is finished! if cs.GetRoundState().Step == cstypes.RoundStepCommit { select { - case <-cs.onStopCh: + case <-cs.getOnStopCh(): case <-time.After(cs.config.TimeoutCommit): cs.logger.Error("OnStop: timeout waiting for commit to finish", "time", cs.config.TimeoutCommit) } diff --git a/internal/libs/autofile/group.go b/internal/libs/autofile/group.go index 0d1806f22..81e16feea 100644 --- a/internal/libs/autofile/group.go +++ b/internal/libs/autofile/group.go @@ -69,11 +69,6 @@ type Group struct { minIndex int // Includes head maxIndex int // Includes head, where Head will move to - // close this when the processTicks routine is done. - // this ensures we can cleanup the dir after calling Stop - // and the routine won't be trying to access it anymore - doneProcessTicks chan struct{} - // TODO: When we start deleting files, we need to start tracking GroupReaders // and their dependencies. } @@ -101,7 +96,6 @@ func OpenGroup(ctx context.Context, logger log.Logger, headPath string, groupOpt groupCheckDuration: defaultGroupCheckDuration, minIndex: 0, maxIndex: 0, - doneProcessTicks: make(chan struct{}), } for _, option := range groupOptions { @@ -154,13 +148,6 @@ func (g *Group) OnStop() { } } -// Wait blocks until all internal goroutines are finished. Supposed to be -// called after Stop. -func (g *Group) Wait() { - // wait for processTicks routine to finish - <-g.doneProcessTicks -} - // Close closes the head file. The group must be stopped by this moment. func (g *Group) Close() { if err := g.FlushAndSync(); err != nil { @@ -241,8 +228,6 @@ func (g *Group) FlushAndSync() error { } func (g *Group) processTicks(ctx context.Context) { - defer close(g.doneProcessTicks) - for { select { case <-ctx.Done():