From 0ff3d4b89dd316f049e3186bfa4a321e699a6ac0 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Tue, 7 Dec 2021 11:40:59 -0500 Subject: [PATCH] service: cleanup close channel in reactors (#7399) --- internal/blocksync/reactor.go | 26 ++--------------- internal/consensus/reactor.go | 47 ++++-------------------------- internal/evidence/reactor.go | 25 ++-------------- internal/statesync/dispatcher.go | 16 ++-------- internal/statesync/reactor.go | 32 ++++---------------- internal/statesync/reactor_test.go | 1 - internal/statesync/syncer.go | 17 ++++------- internal/statesync/syncer_test.go | 4 +-- 8 files changed, 26 insertions(+), 142 deletions(-) diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index f6ea48839..479506c71 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -86,7 +86,6 @@ type Reactor struct { // blockSyncCh.Out. blockSyncOutBridgeCh chan p2p.Envelope peerUpdates *p2p.PeerUpdates - closeCh chan struct{} requestsCh <-chan BlockRequest errorsCh <-chan peerError @@ -138,7 +137,6 @@ func NewReactor( blockSyncCh: blockSyncCh, blockSyncOutBridgeCh: make(chan p2p.Envelope), peerUpdates: peerUpdates, - closeCh: make(chan struct{}), metrics: metrics, syncStartTime: time.Time{}, } @@ -184,10 +182,6 @@ func (r *Reactor) OnStop() { // wait for the poolRoutine and requestRoutine goroutines to gracefully exit r.poolWG.Wait() - // Close closeCh to signal to all spawned goroutines to gracefully exit. All - // p2p Channels should execute Close(). - close(r.closeCh) - <-r.peerUpdates.Done() } @@ -295,6 +289,7 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context) { for { select { case <-ctx.Done(): + r.logger.Debug("stopped listening on block sync channel; closing...") return case envelope := <-r.blockSyncCh.In: if err := r.handleMessage(r.blockSyncCh.ID, envelope); err != nil { @@ -304,14 +299,8 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context) { Err: err, } } - case envelope := <-r.blockSyncOutBridgeCh: r.blockSyncCh.Out <- envelope - - case <-r.closeCh: - r.logger.Debug("stopped listening on block sync channel; closing...") - return - } } } @@ -350,13 +339,10 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) { for { select { case <-ctx.Done(): + r.logger.Debug("stopped listening on peer updates channel; closing...") return case peerUpdate := <-r.peerUpdates.Updates(): r.processPeerUpdate(peerUpdate) - - case <-r.closeCh: - r.logger.Debug("stopped listening on peer updates channel; closing...") - return } } } @@ -391,24 +377,18 @@ func (r *Reactor) requestRoutine(ctx context.Context) { for { select { - case <-r.closeCh: - return - case <-ctx.Done(): return - case request := <-r.requestsCh: r.blockSyncOutBridgeCh <- p2p.Envelope{ To: request.PeerID, Message: &bcproto.BlockRequest{Height: request.Height}, } - case pErr := <-r.errorsCh: r.blockSyncCh.Error <- p2p.PeerError{ NodeID: pErr.peerID, Err: pErr.err, } - case <-statusUpdateTicker.C: r.poolWG.Add(1) @@ -598,8 +578,6 @@ FOR_LOOP: case <-ctx.Done(): break FOR_LOOP - case <-r.closeCh: - break FOR_LOOP case <-r.pool.exitedCh: break FOR_LOOP } diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index d0d625e26..387f84b15 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -126,13 +126,6 @@ type Reactor struct { voteCh *p2p.Channel voteSetBitsCh *p2p.Channel peerUpdates *p2p.PeerUpdates - - // NOTE: We need a dedicated stateCloseCh channel for signaling closure of - // the StateChannel due to the fact that 
the StateChannel message handler - // performs a send on the VoteSetBitsChannel. This is an antipattern, so having - // this dedicated channel,stateCloseCh, is necessary in order to avoid data races. - stateCloseCh chan struct{} - closeCh chan struct{} } // NewReactor returns a reference to a new consensus reactor, which implements @@ -162,8 +155,6 @@ func NewReactor( voteCh: voteCh, voteSetBitsCh: voteSetBitsCh, peerUpdates: peerUpdates, - stateCloseCh: make(chan struct{}), - closeCh: make(chan struct{}), } r.BaseService = *service.NewBaseService(logger, "Consensus", r) @@ -230,14 +221,6 @@ func (r *Reactor) OnStop() { } r.mtx.Unlock() - // Close the StateChannel goroutine separately since it uses its own channel - // to signal closure. - close(r.stateCloseCh) - - // Close closeCh to signal to all spawned goroutines to gracefully exit. All - // p2p Channels should execute Close(). - close(r.closeCh) - <-r.peerUpdates.Done() } @@ -993,8 +976,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda case p2p.PeerStatusUp: // Do not allow starting new broadcasting goroutines after reactor shutdown // has been initiated. This can happen after we've manually closed all - // peer goroutines and closed r.closeCh, but the router still sends in-flight - // peer updates. + // peer goroutines, but the router still sends in-flight peer updates. if !r.IsRunning() { return } @@ -1337,6 +1319,7 @@ func (r *Reactor) processStateCh(ctx context.Context) { for { select { case <-ctx.Done(): + r.logger.Debug("stopped listening on StateChannel; closing...") return case envelope := <-r.stateCh.In: if err := r.handleMessage(r.stateCh.ID, envelope); err != nil { @@ -1346,10 +1329,6 @@ func (r *Reactor) processStateCh(ctx context.Context) { Err: err, } } - - case <-r.stateCloseCh: - r.logger.Debug("stopped listening on StateChannel; closing...") - return } } } @@ -1363,6 +1342,7 @@ func (r *Reactor) processDataCh(ctx context.Context) { for { select { case <-ctx.Done(): + r.logger.Debug("stopped listening on DataChannel; closing...") return case envelope := <-r.dataCh.In: if err := r.handleMessage(r.dataCh.ID, envelope); err != nil { @@ -1372,10 +1352,6 @@ func (r *Reactor) processDataCh(ctx context.Context) { Err: err, } } - - case <-r.closeCh: - r.logger.Debug("stopped listening on DataChannel; closing...") - return } } } @@ -1389,6 +1365,7 @@ func (r *Reactor) processVoteCh(ctx context.Context) { for { select { case <-ctx.Done(): + r.logger.Debug("stopped listening on VoteChannel; closing...") return case envelope := <-r.voteCh.In: if err := r.handleMessage(r.voteCh.ID, envelope); err != nil { @@ -1398,10 +1375,6 @@ func (r *Reactor) processVoteCh(ctx context.Context) { Err: err, } } - - case <-r.closeCh: - r.logger.Debug("stopped listening on VoteChannel; closing...") - return } } } @@ -1415,6 +1388,7 @@ func (r *Reactor) processVoteSetBitsCh(ctx context.Context) { for { select { case <-ctx.Done(): + r.logger.Debug("stopped listening on VoteSetBitsChannel; closing...") return case envelope := <-r.voteSetBitsCh.In: if err := r.handleMessage(r.voteSetBitsCh.ID, envelope); err != nil { @@ -1424,10 +1398,6 @@ func (r *Reactor) processVoteSetBitsCh(ctx context.Context) { Err: err, } } - - case <-r.closeCh: - r.logger.Debug("stopped listening on VoteSetBitsChannel; closing...") - return } } } @@ -1441,13 +1411,10 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) { for { select { case <-ctx.Done(): + r.logger.Debug("stopped listening on peer updates channel; closing...") return 
case peerUpdate := <-r.peerUpdates.Updates(): r.processPeerUpdate(ctx, peerUpdate) - - case <-r.closeCh: - r.logger.Debug("stopped listening on peer updates channel; closing...") - return } } } @@ -1486,8 +1453,6 @@ func (r *Reactor) peerStatsRoutine(ctx context.Context) { } case <-ctx.Done(): return - case <-r.closeCh: - return } } } diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go index b559f0a2c..908e7d5f6 100644 --- a/internal/evidence/reactor.go +++ b/internal/evidence/reactor.go @@ -50,7 +50,6 @@ type Reactor struct { evpool *Pool evidenceCh *p2p.Channel peerUpdates *p2p.PeerUpdates - closeCh chan struct{} peerWG sync.WaitGroup @@ -72,7 +71,6 @@ func NewReactor( evpool: evpool, evidenceCh: evidenceCh, peerUpdates: peerUpdates, - closeCh: make(chan struct{}), peerRoutines: make(map[types.NodeID]*tmsync.Closer), } @@ -104,10 +102,6 @@ func (r *Reactor) OnStop() { // exit. r.peerWG.Wait() - // Close closeCh to signal to all spawned goroutines to gracefully exit. All - // p2p Channels should execute Close(). - close(r.closeCh) - // Wait for all p2p Channels to be closed before returning. This ensures we // can easily reason about synchronization of all p2p Channels and ensure no // panics will occur. @@ -188,6 +182,7 @@ func (r *Reactor) processEvidenceCh(ctx context.Context) { for { select { case <-ctx.Done(): + r.logger.Debug("stopped listening on evidence channel; closing...") return case envelope := <-r.evidenceCh.In: if err := r.handleMessage(r.evidenceCh.ID, envelope); err != nil { @@ -197,10 +192,6 @@ func (r *Reactor) processEvidenceCh(ctx context.Context) { Err: err, } } - - case <-r.closeCh: - r.logger.Debug("stopped listening on evidence channel; closing...") - return } } } @@ -226,8 +217,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda case p2p.PeerStatusUp: // Do not allow starting new evidence broadcast loops after reactor shutdown // has been initiated. This can happen after we've manually closed all - // peer broadcast loops and closed r.closeCh, but the router still sends - // in-flight peer updates. + // peer broadcast loops, but the router still sends in-flight peer updates. if !r.IsRunning() { return } @@ -268,8 +258,6 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) { case peerUpdate := <-r.peerUpdates.Updates(): r.processPeerUpdate(ctx, peerUpdate) case <-ctx.Done(): - return - case <-r.closeCh: r.logger.Debug("stopped listening on peer updates channel; closing...") return } @@ -323,11 +311,6 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID // The peer is marked for removal via a PeerUpdate as the doneCh was // explicitly closed to signal we should exit. return - - case <-r.closeCh: - // The reactor has signaled that we are stopped and thus we should - // implicitly exit this peer's goroutine. - return } } @@ -366,9 +349,7 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID // explicitly closed to signal we should exit. return - case <-r.closeCh: - // The reactor has signaled that we are stopped and thus we should - // implicitly exit this peer's goroutine. 
+ case <-ctx.Done(): return } } diff --git a/internal/statesync/dispatcher.go b/internal/statesync/dispatcher.go index 844cb5e32..8620e6285 100644 --- a/internal/statesync/dispatcher.go +++ b/internal/statesync/dispatcher.go @@ -27,7 +27,6 @@ var ( type Dispatcher struct { // the channel with which to send light block requests on requestCh chan<- p2p.Envelope - closeCh chan struct{} mtx sync.Mutex // all pending calls that have been dispatched and are awaiting an answer @@ -37,7 +36,6 @@ type Dispatcher struct { func NewDispatcher(requestCh chan<- p2p.Envelope) *Dispatcher { return &Dispatcher{ requestCh: requestCh, - closeCh: make(chan struct{}), calls: make(map[types.NodeID]chan *types.LightBlock), } } @@ -47,7 +45,7 @@ func NewDispatcher(requestCh chan<- p2p.Envelope) *Dispatcher { // LightBlock response is used to signal that the peer doesn't have the requested LightBlock. func (d *Dispatcher) LightBlock(ctx context.Context, height int64, peer types.NodeID) (*types.LightBlock, error) { // dispatch the request to the peer - callCh, err := d.dispatch(peer, height) + callCh, err := d.dispatch(ctx, peer, height) if err != nil { return nil, err } @@ -69,19 +67,16 @@ func (d *Dispatcher) LightBlock(ctx context.Context, height int64, peer types.No case <-ctx.Done(): return nil, ctx.Err() - - case <-d.closeCh: - return nil, errDisconnected } } // dispatch takes a peer and allocates it a channel so long as it's not already // busy and the receiving channel is still running. It then dispatches the message -func (d *Dispatcher) dispatch(peer types.NodeID, height int64) (chan *types.LightBlock, error) { +func (d *Dispatcher) dispatch(ctx context.Context, peer types.NodeID, height int64) (chan *types.LightBlock, error) { d.mtx.Lock() defer d.mtx.Unlock() select { - case <-d.closeCh: + case <-ctx.Done(): return nil, errDisconnected default: } @@ -141,17 +136,12 @@ func (d *Dispatcher) Respond(lb *tmproto.LightBlock, peer types.NodeID) error { func (d *Dispatcher) Close() { d.mtx.Lock() defer d.mtx.Unlock() - close(d.closeCh) for peer, call := range d.calls { delete(d.calls, peer) close(call) } } -func (d *Dispatcher) Done() <-chan struct{} { - return d.closeCh -} - //---------------------------------------------------------------- // BlockProvider is a p2p based light provider which uses a dispatcher connected diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index cd3dde3ea..f6eac2a97 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -142,7 +142,6 @@ type Reactor struct { blockCh *p2p.Channel paramsCh *p2p.Channel peerUpdates *p2p.PeerUpdates - closeCh chan struct{} // Dispatcher is used to multiplex light block requests and responses over multiple // peers used by the p2p state provider and in reverse sync. @@ -192,7 +191,6 @@ func NewReactor( blockCh: blockCh, paramsCh: paramsCh, peerUpdates: peerUpdates, - closeCh: make(chan struct{}), tempDir: tempDir, stateStore: stateStore, blockStore: blockStore, @@ -227,12 +225,6 @@ func (r *Reactor) OnStart(ctx context.Context) error { func (r *Reactor) OnStop() { // tell the dispatcher to stop sending any more requests r.dispatcher.Close() - // wait for any remaining requests to complete - <-r.dispatcher.Done() - - // Close closeCh to signal to all spawned goroutines to gracefully exit. All - // p2p Channels should execute Close(). 
- close(r.closeCh) <-r.peerUpdates.Done() } @@ -268,7 +260,6 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) { r.stateProvider, r.snapshotCh.Out, r.chunkCh.Out, - ctx.Done(), r.tempDir, r.metrics, ) @@ -290,7 +281,6 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) { select { case <-ctx.Done(): - case <-r.closeCh: case r.snapshotCh.Out <- msg: } } @@ -446,9 +436,6 @@ func (r *Reactor) backfill( // verify all light blocks for { select { - case <-r.closeCh: - queue.close() - return nil case <-ctx.Done(): queue.close() return nil @@ -816,6 +803,7 @@ func (r *Reactor) processCh(ctx context.Context, ch *p2p.Channel, chName string) for { select { case <-ctx.Done(): + r.logger.Debug("channel closed", "channel", chName) return case envelope := <-ch.In: if err := r.handleMessage(ch.ID, envelope); err != nil { @@ -829,17 +817,13 @@ func (r *Reactor) processCh(ctx context.Context, ch *p2p.Channel, chName string) Err: err, } } - - case <-r.closeCh: - r.logger.Debug("channel closed", "channel", chName) - return } } } // processPeerUpdate processes a PeerUpdate, returning an error upon failing to // handle the PeerUpdate or if a panic is recovered. -func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { +func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) { r.logger.Info("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) switch peerUpdate.Status { @@ -859,7 +843,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { case p2p.PeerStatusUp: newProvider := NewBlockProvider(peerUpdate.NodeID, r.chainID, r.dispatcher) r.providers[peerUpdate.NodeID] = newProvider - err := r.syncer.AddPeer(peerUpdate.NodeID) + err := r.syncer.AddPeer(ctx, peerUpdate.NodeID) if err != nil { r.logger.Error("error adding peer to syncer", "error", err) return @@ -886,13 +870,10 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) { for { select { case <-ctx.Done(): - return - case peerUpdate := <-r.peerUpdates.Updates(): - r.processPeerUpdate(peerUpdate) - - case <-r.closeCh: r.logger.Debug("stopped listening on peer updates channel; closing...") return + case peerUpdate := <-r.peerUpdates.Updates(): + r.processPeerUpdate(ctx, peerUpdate) } } } @@ -981,9 +962,6 @@ func (r *Reactor) waitForEnoughPeers(ctx context.Context, numPeers int) error { case <-ctx.Done(): return fmt.Errorf("operation canceled while waiting for peers after %.2fs [%d/%d]", time.Since(startAt).Seconds(), r.peers.Len(), numPeers) - case <-r.closeCh: - return fmt.Errorf("shutdown while waiting for peers after %.2fs [%d/%d]", - time.Since(startAt).Seconds(), r.peers.Len(), numPeers) case <-t.C: continue case <-logT.C: diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index 82ec0f68d..b1863f17b 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -172,7 +172,6 @@ func setup( stateProvider, rts.snapshotOutCh, rts.chunkOutCh, - ctx.Done(), "", rts.reactor.metrics, ) diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go index f266017dd..a0f79494a 100644 --- a/internal/statesync/syncer.go +++ b/internal/statesync/syncer.go @@ -70,7 +70,6 @@ type syncer struct { avgChunkTime int64 lastSyncedSnapshotHeight int64 processingSnapshot *snapshot - closeCh <-chan struct{} } // newSyncer creates a new syncer. 
@@ -82,7 +81,6 @@ func newSyncer( stateProvider StateProvider, snapshotCh chan<- p2p.Envelope, chunkCh chan<- p2p.Envelope, - closeCh <-chan struct{}, tempDir string, metrics *Metrics, ) *syncer { @@ -98,7 +96,6 @@ func newSyncer( fetchers: cfg.Fetchers, retryTimeout: cfg.ChunkRequestTimeout, metrics: metrics, - closeCh: closeCh, } } @@ -141,7 +138,7 @@ func (s *syncer) AddSnapshot(peerID types.NodeID, snapshot *snapshot) (bool, err // AddPeer adds a peer to the pool. For now we just keep it simple and send a // single request to discover snapshots, later we may want to do retries and stuff. -func (s *syncer) AddPeer(peerID types.NodeID) (err error) { +func (s *syncer) AddPeer(ctx context.Context, peerID types.NodeID) (err error) { defer func() { // TODO: remove panic recover once AddPeer can no longer accientally send on // closed channel. @@ -160,7 +157,7 @@ func (s *syncer) AddPeer(peerID types.NodeID) (err error) { } select { - case <-s.closeCh: + case <-ctx.Done(): case s.snapshotCh <- msg: } return err @@ -494,8 +491,6 @@ func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *ch select { case <-ctx.Done(): return - case <-s.closeCh: - return case <-time.After(2 * time.Second): continue } @@ -511,7 +506,7 @@ func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *ch ticker := time.NewTicker(s.retryTimeout) defer ticker.Stop() - s.requestChunk(snapshot, index) + s.requestChunk(ctx, snapshot, index) select { case <-chunks.WaitFor(index): @@ -522,8 +517,6 @@ func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *ch case <-ctx.Done(): return - case <-s.closeCh: - return } ticker.Stop() @@ -531,7 +524,7 @@ func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *ch } // requestChunk requests a chunk from a peer. -func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) { +func (s *syncer) requestChunk(ctx context.Context, snapshot *snapshot, chunk uint32) { peer := s.snapshots.GetPeer(snapshot) if peer == "" { s.logger.Error("No valid peers found for snapshot", "height", snapshot.Height, @@ -558,7 +551,7 @@ func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) { select { case s.chunkCh <- msg: - case <-s.closeCh: + case <-ctx.Done(): } } diff --git a/internal/statesync/syncer_test.go b/internal/statesync/syncer_test.go index 4c240830f..816e6301a 100644 --- a/internal/statesync/syncer_test.go +++ b/internal/statesync/syncer_test.go @@ -78,13 +78,13 @@ func TestSyncer_SyncAny(t *testing.T) { require.Error(t, err) // Adding a couple of peers should trigger snapshot discovery messages - err = rts.syncer.AddPeer(peerAID) + err = rts.syncer.AddPeer(ctx, peerAID) require.NoError(t, err) e := <-rts.snapshotOutCh require.Equal(t, &ssproto.SnapshotsRequest{}, e.Message) require.Equal(t, peerAID, e.To) - err = rts.syncer.AddPeer(peerBID) + err = rts.syncer.AddPeer(ctx, peerBID) require.NoError(t, err) e = <-rts.snapshotOutCh require.Equal(t, &ssproto.SnapshotsRequest{}, e.Message)
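
Note on the pattern: every hunk above removes a per-reactor closeCh (previously closed in OnStop) and relies instead on the context.Context already threaded through each goroutine, so shutdown is signaled once by canceling that context. Below is a minimal, self-contained sketch of the loop shape the patch converges on; the exampleReactor and envelope names are illustrative only and do not appear anywhere in this patch.

// Sketch: a reactor-style event loop driven solely by context cancellation,
// in place of a dedicated closeCh. Names here are hypothetical.
package main

import (
	"context"
	"fmt"
	"time"
)

type envelope struct{ msg string }

type exampleReactor struct {
	in chan envelope
}

// processCh drains the inbound channel until the context is canceled.
// The ctx.Done() branch replaces the old `case <-r.closeCh:` branch.
func (r *exampleReactor) processCh(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("stopped listening on channel; closing...")
			return
		case env := <-r.in:
			fmt.Println("handled:", env.msg)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	r := &exampleReactor{in: make(chan envelope)}

	go r.processCh(ctx)

	r.in <- envelope{msg: "ping"}
	cancel() // one cancellation stops every goroutine; no close(closeCh) needed
	time.Sleep(100 * time.Millisecond)
}

With the loop written this way, OnStop no longer has to close a bespoke channel: canceling the service context unblocks every select, which is what the OnStop deletions across the blocksync, consensus, evidence, and statesync reactors in this patch rely on.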