
blocksync: shutdown cleanup (#7840)

Now that shutdown is handled by contexts in most cases, I think it's
fair to clean up the way this reactor shuts down. Additionally, there
were a few cases where the `blockSyncOutBridgeCh` was mishandled and
could have led to a deadlock, which I observed in some tests.
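
The deadlock class here is the classic one: an unguarded send on an unbuffered channel whose reader has already shut down. Below is a minimal sketch of the guarded-send shape the commit adopts; `Envelope`, `outCh`, and `sendGuarded` are illustrative stand-ins, not the reactor's real API.

package main

import (
	"context"
	"fmt"
)

// Envelope stands in for p2p.Envelope; all names here are illustrative.
type Envelope struct{ Height int64 }

// sendGuarded is the shape this commit converges on: a channel send raced
// against ctx.Done(), so shutdown can never strand the sender.
func sendGuarded(ctx context.Context, outCh chan<- Envelope, e Envelope) bool {
	select {
	case <-ctx.Done():
		return false
	case outCh <- e:
		return true
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	outCh := make(chan Envelope) // unbuffered, like the bridge channel

	cancel() // simulate shutdown: the consumer is gone

	// A bare `outCh <- e` here would block forever (the old bug); the
	// guarded send observes cancellation and returns.
	fmt.Println("sent:", sendGuarded(ctx, outCh, Envelope{Height: 1}))
}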
Sam Kleinman authored 3 years ago, committed by GitHub
commit d187962ec0 (branch: pull/7847/head)
2 changed files, with 19 additions and 39 deletions:
  1. internal/blocksync/reactor.go (+17, -39)
  2. internal/blocksync/reactor_test.go (+2, -0)

internal/blocksync/reactor.go (+17, -39)

@@ -5,7 +5,6 @@ import (
 	"errors"
 	"fmt"
 	"runtime/debug"
-	"sync"
 	"sync/atomic"
 	"time"

@@ -92,11 +91,6 @@ type Reactor struct {
 	requestsCh <-chan BlockRequest
 	errorsCh   <-chan peerError

-	// poolWG is used to synchronize the graceful shutdown of the poolRoutine and
-	// requestRoutine spawned goroutines when stopping the reactor and before
-	// stopping the p2p Channel(s).
-	poolWG sync.WaitGroup
-
 	metrics  *consensus.Metrics
 	eventBus *eventbus.EventBus

@@ -169,10 +163,8 @@ func (r *Reactor) OnStart(ctx context.Context) error {
 		if err := r.pool.Start(ctx); err != nil {
 			return err
 		}
-		r.poolWG.Add(1)
 		go r.requestRoutine(ctx)

-		r.poolWG.Add(1)
 		go r.poolRoutine(ctx, false)
 	}

@@ -189,9 +181,6 @@ func (r *Reactor) OnStop() {
 	if r.blockSync.IsSet() {
 		r.pool.Stop()
 	}
-
-	// wait for the poolRoutine and requestRoutine goroutines to gracefully exit
-	r.poolWG.Wait()
 }

 // respondToPeer loads a block and sends it to the requesting peer, if we have it.

@@ -376,10 +365,8 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error {
 	r.syncStartTime = time.Now()

-	r.poolWG.Add(1)
 	go r.requestRoutine(ctx)

-	r.poolWG.Add(1)
 	go r.poolRoutine(ctx, true)

 	return nil

@@ -389,17 +376,20 @@ func (r *Reactor) requestRoutine(ctx context.Context) {
 	statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
 	defer statusUpdateTicker.Stop()

-	defer r.poolWG.Done()
-
 	for {
 		select {
 		case <-ctx.Done():
 			return
 		case request := <-r.requestsCh:
-			r.blockSyncOutBridgeCh <- p2p.Envelope{
+			select {
+			case <-ctx.Done():
+				return
+
+			case r.blockSyncOutBridgeCh <- p2p.Envelope{
 				To:      request.PeerID,
 				Message: &bcproto.BlockRequest{Height: request.Height},
+			}:
 			}
 		case pErr := <-r.errorsCh:
 			if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{
 				NodeID: pErr.peerID,

@@ -408,17 +398,14 @@ func (r *Reactor) requestRoutine(ctx context.Context) {
 				return
 			}
 		case <-statusUpdateTicker.C:
-			r.poolWG.Add(1)
-
 			go func() {
-				defer r.poolWG.Done()
-
 				select {
+				case <-ctx.Done():
+					return
 				case r.blockSyncOutBridgeCh <- p2p.Envelope{
 					Broadcast: true,
 					Message:   &bcproto.StatusRequest{},
 				}:
-				case <-ctx.Done():
 				}
 			}()
 		}

@@ -448,11 +435,12 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) {
 	defer trySyncTicker.Stop()
 	defer switchToConsensusTicker.Stop()

-	defer r.poolWG.Done()
-
-FOR_LOOP:
 	for {
 		select {
+		case <-ctx.Done():
+			return
+		case <-r.pool.exitedCh:
+			return
 		case <-switchToConsensusTicker.C:
 			var (
 				height, numPending, lenRequesters = r.pool.GetStatus()

@@ -491,14 +479,13 @@ FOR_LOOP:
 				r.consReactor.SwitchToConsensus(ctx, state, blocksSynced > 0 || stateSynced)
 			}

-			break FOR_LOOP
-
+			return
 		case <-trySyncTicker.C:
 			select {
 			case didProcessCh <- struct{}{}:
 			default:
 			}

 		case <-didProcessCh:
 			// NOTE: It is a subtle mistake to process more than a single block at a
 			// time (e.g. 10) here, because we only send one BlockRequest per loop

@@ -513,7 +500,7 @@ FOR_LOOP:
 			first, second := r.pool.PeekTwoBlocks()
 			if first == nil || second == nil {
 				// we need both to sync the first block
-				continue FOR_LOOP
+				continue
 			} else {
 				// try again quickly next loop
 				didProcessCh <- struct{}{}

@@ -524,7 +511,7 @@ FOR_LOOP:
 				r.logger.Error("failed to make ",
 					"height", first.Height,
 					"err", err.Error())
-				break FOR_LOOP
+				return
 			}

 			var (

@@ -553,7 +540,7 @@ FOR_LOOP:
 					NodeID: peerID,
 					Err:    err,
 				}); serr != nil {
-					break FOR_LOOP
+					return
 				}

 				peerID2 := r.pool.RedoRequest(second.Height)

@@ -562,11 +549,9 @@ FOR_LOOP:
 					NodeID: peerID2,
 					Err:    err,
 				}); serr != nil {
-					break FOR_LOOP
+					return
 				}
 			}
-
-			continue FOR_LOOP
 		} else {
 			r.pool.PopRequest()

@@ -599,13 +584,6 @@ FOR_LOOP:
 				lastHundred = time.Now()
 			}
 		}
-
-			continue FOR_LOOP
-
-		case <-ctx.Done():
-			break FOR_LOOP
-		case <-r.pool.exitedCh:
-			break FOR_LOOP
 		}
 	}
 }
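
Taken together, the reactor.go changes swap WaitGroup bookkeeping for context scoping: every blocking point now races against ctx.Done() and exits with a plain return, so OnStop only has to cancel and stop, never Wait. A compilable sketch of that loop shape, with placeholder names and timings:

package main

import (
	"context"
	"fmt"
	"time"
)

// run mirrors the post-commit routine shape: the ctx.Done() case comes
// first and exits with return, which is what lets the reactor drop the
// WaitGroup entirely: canceling the context is the whole shutdown.
func run(ctx context.Context) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return // replaces "break FOR_LOOP" plus poolWG.Done()
		case <-ticker.C:
			fmt.Println("tick")
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 35*time.Millisecond)
	defer cancel()

	run(ctx) // returns when the context expires; nothing to Wait() on
}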


internal/blocksync/reactor_test.go (+2, -0)

@@ -6,6 +6,7 @@ import (
 	"testing"
 	"time"

+	"github.com/fortytw2/leaktest"
 	"github.com/stretchr/testify/require"

 	dbm "github.com/tendermint/tm-db"

@@ -90,6 +91,7 @@ func setup(
 			}
 		}
 	})
+	t.Cleanup(leaktest.Check(t))

 	return rts
 }
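
For reference, the hook added above uses leaktest's published API: leaktest.Check(t) snapshots the currently running goroutines and returns a function that fails the test if new goroutines are still alive when it is invoked (it waits briefly for stragglers to exit before failing), and t.Cleanup defers that function to test teardown. A self-contained sketch of the pattern; the test body itself is hypothetical:

package blocksync

import (
	"testing"

	"github.com/fortytw2/leaktest"
)

// TestShutdownLeavesNoGoroutines shows the pattern added in setup():
// register the leak check first so t.Cleanup's LIFO order runs it last,
// after every other cleanup has shut the goroutines down.
func TestShutdownLeavesNoGoroutines(t *testing.T) {
	t.Cleanup(leaktest.Check(t))

	done := make(chan struct{})
	go func() { <-done }() // would be reported as leaked if left blocked
	close(done)            // let the goroutine exit before the check runs
}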

