Browse Source

Merge branch 'master' into mempool-reduce-size-of-test

pull/8152/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
e03acadc35
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 11 additions and 17 deletions
  1. +3
    -1
      internal/consensus/reactor.go
  2. +8
    -1
      internal/consensus/state.go
  3. +0
    -15
      internal/libs/autofile/group.go

+ 3
- 1
internal/consensus/reactor.go View File

@ -348,6 +348,8 @@ func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote)
// internal pubsub defined in the consensus state to broadcast them to peers
// upon receiving.
func (r *Reactor) subscribeToBroadcastEvents() {
onStopCh := r.state.getOnStopCh()
err := r.state.evsw.AddListenerForEvent(
listenerIDConsensus,
types.EventNewRoundStepValue,
@ -356,7 +358,7 @@ func (r *Reactor) subscribeToBroadcastEvents() {
return err
}
select {
case r.state.onStopCh <- data.(*cstypes.RoundState):
case onStopCh <- data.(*cstypes.RoundState):
return nil
case <-ctx.Done():
return ctx.Err()


+ 8
- 1
internal/consensus/state.go View File

@ -493,12 +493,19 @@ func (cs *State) loadWalFile(ctx context.Context) error {
return nil
}
func (cs *State) getOnStopCh() chan *cstypes.RoundState {
cs.mtx.RLock()
defer cs.mtx.RUnlock()
return cs.onStopCh
}
// OnStop implements service.Service.
func (cs *State) OnStop() {
// If the node is committing a new block, wait until it is finished!
if cs.GetRoundState().Step == cstypes.RoundStepCommit {
select {
case <-cs.onStopCh:
case <-cs.getOnStopCh():
case <-time.After(cs.config.TimeoutCommit):
cs.logger.Error("OnStop: timeout waiting for commit to finish", "time", cs.config.TimeoutCommit)
}


+ 0
- 15
internal/libs/autofile/group.go View File

@ -69,11 +69,6 @@ type Group struct {
minIndex int // Includes head
maxIndex int // Includes head, where Head will move to
// close this when the processTicks routine is done.
// this ensures we can cleanup the dir after calling Stop
// and the routine won't be trying to access it anymore
doneProcessTicks chan struct{}
// TODO: When we start deleting files, we need to start tracking GroupReaders
// and their dependencies.
}
@ -101,7 +96,6 @@ func OpenGroup(ctx context.Context, logger log.Logger, headPath string, groupOpt
groupCheckDuration: defaultGroupCheckDuration,
minIndex: 0,
maxIndex: 0,
doneProcessTicks: make(chan struct{}),
}
for _, option := range groupOptions {
@ -154,13 +148,6 @@ func (g *Group) OnStop() {
}
}
// Wait blocks until all internal goroutines are finished. Supposed to be
// called after Stop.
func (g *Group) Wait() {
// wait for processTicks routine to finish
<-g.doneProcessTicks
}
// Close closes the head file. The group must be stopped by this moment.
func (g *Group) Close() {
if err := g.FlushAndSync(); err != nil {
@ -241,8 +228,6 @@ func (g *Group) FlushAndSync() error {
}
func (g *Group) processTicks(ctx context.Context) {
defer close(g.doneProcessTicks)
for {
select {
case <-ctx.Done():


Loading…
Cancel
Save