From d187962ec06c345445556c4cf8e3501c303c85a3 Mon Sep 17 00:00:00 2001
From: Sam Kleinman
Date: Thu, 17 Feb 2022 15:42:11 -0500
Subject: [PATCH] blocksync: shutdown cleanup (#7840)

Now that shutdown is handled by contexts in most cases, I think it's
fair to clean up the way this reactor shuts down. Additionally, there
were a few cases where the `blockSyncOutBridgeCh` was mishandled and
could have led to a deadlock, which I observed in some tests.
---
 internal/blocksync/reactor.go      | 56 +++++++++---------------------
 internal/blocksync/reactor_test.go |  2 ++
 2 files changed, 19 insertions(+), 39 deletions(-)

diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go
index 0261c055a..f4d69b8b0 100644
--- a/internal/blocksync/reactor.go
+++ b/internal/blocksync/reactor.go
@@ -5,7 +5,6 @@ import (
 	"errors"
 	"fmt"
 	"runtime/debug"
-	"sync"
 	"sync/atomic"
 	"time"
 
@@ -92,11 +91,6 @@ type Reactor struct {
 	requestsCh <-chan BlockRequest
 	errorsCh   <-chan peerError
 
-	// poolWG is used to synchronize the graceful shutdown of the poolRoutine and
-	// requestRoutine spawned goroutines when stopping the reactor and before
-	// stopping the p2p Channel(s).
-	poolWG sync.WaitGroup
-
 	metrics  *consensus.Metrics
 	eventBus *eventbus.EventBus
 
@@ -169,10 +163,8 @@ func (r *Reactor) OnStart(ctx context.Context) error {
 		if err := r.pool.Start(ctx); err != nil {
 			return err
 		}
-		r.poolWG.Add(1)
 		go r.requestRoutine(ctx)
 
-		r.poolWG.Add(1)
 		go r.poolRoutine(ctx, false)
 	}
 
@@ -189,9 +181,6 @@ func (r *Reactor) OnStop() {
 	if r.blockSync.IsSet() {
 		r.pool.Stop()
 	}
-
-	// wait for the poolRoutine and requestRoutine goroutines to gracefully exit
-	r.poolWG.Wait()
 }
 
 // respondToPeer loads a block and sends it to the requesting peer, if we have it.
@@ -376,10 +365,8 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error {
 	r.syncStartTime = time.Now()
 
-	r.poolWG.Add(1)
 	go r.requestRoutine(ctx)
 
-	r.poolWG.Add(1)
 	go r.poolRoutine(ctx, true)
 
 	return nil
 }
@@ -389,17 +376,20 @@ func (r *Reactor) requestRoutine(ctx context.Context) {
 	statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
 	defer statusUpdateTicker.Stop()
 
-	defer r.poolWG.Done()
-
 	for {
 		select {
 		case <-ctx.Done():
 			return
 		case request := <-r.requestsCh:
-			r.blockSyncOutBridgeCh <- p2p.Envelope{
+			select {
+			case <-ctx.Done():
+				return
+			case r.blockSyncOutBridgeCh <- p2p.Envelope{
 				To:      request.PeerID,
 				Message: &bcproto.BlockRequest{Height: request.Height},
+			}:
 			}
+
 		case pErr := <-r.errorsCh:
 			if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{
 				NodeID: pErr.peerID,
@@ -408,17 +398,14 @@ func (r *Reactor) requestRoutine(ctx context.Context) {
 				return
 			}
 		case <-statusUpdateTicker.C:
-			r.poolWG.Add(1)
-			go func() {
-				defer r.poolWG.Done()
 			select {
+			case <-ctx.Done():
+				return
 			case r.blockSyncOutBridgeCh <- p2p.Envelope{
 				Broadcast: true,
 				Message:   &bcproto.StatusRequest{},
 			}:
-			case <-ctx.Done():
 			}
-			}()
 		}
 	}
 }
@@ -448,11 +435,12 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) {
 	defer trySyncTicker.Stop()
 	defer switchToConsensusTicker.Stop()
 
-	defer r.poolWG.Done()
-
-FOR_LOOP:
 	for {
 		select {
+		case <-ctx.Done():
+			return
+		case <-r.pool.exitedCh:
+			return
 		case <-switchToConsensusTicker.C:
 			var (
 				height, numPending, lenRequesters = r.pool.GetStatus()
@@ -491,14 +479,13 @@ FOR_LOOP:
 				r.consReactor.SwitchToConsensus(ctx, state, blocksSynced > 0 || stateSynced)
 			}
 
-			break FOR_LOOP
+			return
 
 		case <-trySyncTicker.C:
 			select {
 			case didProcessCh <- struct{}{}:
 			default:
 			}
-
 		case <-didProcessCh:
 			// NOTE: It is a subtle mistake to process more than a single block at a
 			// time (e.g. 10) here, because we only send one BlockRequest per loop
@@ -513,7 +500,7 @@ FOR_LOOP:
 			first, second := r.pool.PeekTwoBlocks()
 			if first == nil || second == nil { // we need both to sync the first block
-				continue FOR_LOOP
+				continue
 			} else {
 				// try again quickly next loop
 				didProcessCh <- struct{}{}
 			}
@@ -524,7 +511,7 @@ FOR_LOOP:
 				r.logger.Error("failed to make ",
 					"height", first.Height,
 					"err", err.Error())
-				break FOR_LOOP
+				return
 			}
 
 			var (
@@ -553,7 +540,7 @@ FOR_LOOP:
 					NodeID: peerID,
 					Err:    err,
 				}); serr != nil {
-					break FOR_LOOP
+					return
 				}
 
 				peerID2 := r.pool.RedoRequest(second.Height)
@@ -562,11 +549,9 @@ FOR_LOOP:
 						NodeID: peerID2,
 						Err:    err,
 					}); serr != nil {
-						break FOR_LOOP
+						return
 					}
 				}
-
-				continue FOR_LOOP
 			} else {
 				r.pool.PopRequest()
 
@@ -599,13 +584,6 @@ FOR_LOOP:
 					lastHundred = time.Now()
 				}
 			}
-
-			continue FOR_LOOP
-
-		case <-ctx.Done():
-			break FOR_LOOP
-		case <-r.pool.exitedCh:
-			break FOR_LOOP
 		}
 	}
 }
diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go
index 3a6b7b2f5..68656fbc3 100644
--- a/internal/blocksync/reactor_test.go
+++ b/internal/blocksync/reactor_test.go
@@ -6,6 +6,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/fortytw2/leaktest"
 	"github.com/stretchr/testify/require"
 	dbm "github.com/tendermint/tm-db"
 
@@ -90,6 +91,7 @@ func setup(
 			}
 		}
 	})
+	t.Cleanup(leaktest.Check(t))
 
 	return rts
 }
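
Note on the pattern used above: the deadlock fix is to never write to
blockSyncOutBridgeCh with a bare channel send, but to pair every send with a
select that also watches ctx.Done(), so the sending goroutine cannot block
forever once the reactor is shutting down and nothing drains the channel. The
sketch below is illustrative only; it is not part of the patch, and the names
sendOrQuit and outCh are made up for the example.

	// sendorquit.go: minimal sketch of the shutdown-safe send pattern,
	// assuming an unbuffered channel whose reader may already be gone.
	package main

	import (
		"context"
		"fmt"
		"time"
	)

	// sendOrQuit blocks until v is accepted by outCh or ctx is canceled,
	// so the caller can never wedge during shutdown.
	func sendOrQuit(ctx context.Context, outCh chan<- int, v int) bool {
		select {
		case <-ctx.Done():
			return false
		case outCh <- v:
			return true
		}
	}

	func main() {
		ctx, cancel := context.WithCancel(context.Background())
		outCh := make(chan int) // unbuffered, like a bridge channel with no reader left

		// Simulate shutdown: cancel the context while nobody reads outCh.
		go func() {
			time.Sleep(50 * time.Millisecond)
			cancel()
		}()

		// A bare `outCh <- 42` would block forever here; the select returns instead.
		fmt.Println("sent:", sendOrQuit(ctx, outCh, 42))
	}

In the reactor itself the same select is written inline in requestRoutine for
both the per-request and the broadcast status sends, and poolRoutine now
returns directly on ctx.Done() or pool.exitedCh instead of breaking out of the
labeled FOR_LOOP.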