From 070445bc1088a8cf08bace8080a4549e93dff5a3 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Tue, 30 Nov 2021 14:22:28 -0500 Subject: [PATCH] pex: improve goroutine lifecycle (#7343) I saw a race detected in a test here that I think would be better handled by just wiring up these threads. --- internal/p2p/pex/reactor.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index f6fcad5e1..53cd079bc 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -141,8 +141,8 @@ func NewReactor( // messages on that p2p channel accordingly. The caller must be sure to execute // OnStop to ensure the outbound p2p Channels are closed. func (r *Reactor) OnStart(ctx context.Context) error { - go r.processPexCh() - go r.processPeerUpdates() + go r.processPexCh(ctx) + go r.processPeerUpdates(ctx) return nil } @@ -162,17 +162,22 @@ func (r *Reactor) OnStop() { // processPexCh implements a blocking event loop where we listen for p2p // Envelope messages from the pexCh. -func (r *Reactor) processPexCh() { +func (r *Reactor) processPexCh(ctx context.Context) { defer r.pexCh.Close() - + timer := time.NewTimer(0) + defer timer.Stop() for { + timer.Reset(time.Until(r.nextRequestTime)) + select { + case <-ctx.Done(): + return case <-r.closeCh: r.Logger.Debug("stopped listening on PEX channel; closing...") return // outbound requests for new peers - case <-r.waitUntilNextRequest(): + case <-timer.C: r.sendRequestForPeers() // inbound requests for new peers or responses to requests sent by this @@ -192,11 +197,13 @@ func (r *Reactor) processPexCh() { // processPeerUpdates initiates a blocking process where we listen for and handle // PeerUpdate messages. When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. -func (r *Reactor) processPeerUpdates() { +func (r *Reactor) processPeerUpdates(ctx context.Context) { defer r.peerUpdates.Close() for { select { + case <-ctx.Done(): + return case peerUpdate := <-r.peerUpdates.Updates(): r.processPeerUpdate(peerUpdate) @@ -317,10 +324,6 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { } } -func (r *Reactor) waitUntilNextRequest() <-chan time.Time { - return time.After(time.Until(r.nextRequestTime)) -} - // sendRequestForPeers pops the first peerID off the list and sends the // peer a request for more peer addresses. The function then moves the // peer into the requestsSent bucket and calculates when the next request