From 81dcc8d1b44898dbac6154f4388a117f54e13af5 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Thu, 10 Feb 2022 07:29:54 -0500 Subject: [PATCH] mempool+evidence: simplify cleanup (#7794) --- internal/evidence/reactor.go | 46 +++++++++----------------------- internal/mempool/reactor.go | 43 ++++++----------------------- internal/mempool/reactor_test.go | 6 +---- 3 files changed, 21 insertions(+), 74 deletions(-) diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go index 76086f17e..62479874f 100644 --- a/internal/evidence/reactor.go +++ b/internal/evidence/reactor.go @@ -8,7 +8,6 @@ import ( "time" clist "github.com/tendermint/tendermint/internal/libs/clist" - tmsync "github.com/tendermint/tendermint/internal/libs/sync" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" @@ -51,10 +50,8 @@ type Reactor struct { evidenceCh *p2p.Channel peerUpdates *p2p.PeerUpdates - peerWG sync.WaitGroup - mtx sync.Mutex - peerRoutines map[types.NodeID]*tmsync.Closer + peerRoutines map[types.NodeID]context.CancelFunc } // NewReactor returns a reference to a new evidence reactor, which implements the @@ -77,7 +74,7 @@ func NewReactor( evpool: evpool, evidenceCh: evidenceCh, peerUpdates: peerUpdates, - peerRoutines: make(map[types.NodeID]*tmsync.Closer), + peerRoutines: make(map[types.NodeID]context.CancelFunc), } r.BaseService = *service.NewBaseService(logger, "Evidence", r) @@ -98,16 +95,6 @@ func (r *Reactor) OnStart(ctx context.Context) error { // OnStop stops the reactor by signaling to all spawned goroutines to exit and // blocking until they all exit. func (r *Reactor) OnStop() { - r.mtx.Lock() - for _, c := range r.peerRoutines { - c.Close() - } - r.mtx.Unlock() - - // Wait for all spawned peer evidence broadcasting goroutines to gracefully - // exit. - r.peerWG.Wait() - // Close the evidence db r.evpool.Close() } @@ -221,11 +208,9 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // safely, and finally start the goroutine to broadcast evidence to that peer. _, ok := r.peerRoutines[peerUpdate.NodeID] if !ok { - closer := tmsync.NewCloser() - - r.peerRoutines[peerUpdate.NodeID] = closer - r.peerWG.Add(1) - go r.broadcastEvidenceLoop(ctx, peerUpdate.NodeID, closer) + pctx, pcancel := context.WithCancel(ctx) + r.peerRoutines[peerUpdate.NodeID] = pcancel + go r.broadcastEvidenceLoop(pctx, peerUpdate.NodeID) } case p2p.PeerStatusDown: @@ -235,7 +220,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // from the map of peer evidence broadcasting goroutines. closer, ok := r.peerRoutines[peerUpdate.NodeID] if ok { - closer.Close() + closer() } } } @@ -265,7 +250,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) { // that the peer has already received or may not be ready for. // // REF: https://github.com/tendermint/tendermint/issues/4727 -func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID, closer *tmsync.Closer) { +func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID) { var next *clist.CElement defer func() { @@ -273,8 +258,6 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID delete(r.peerRoutines, peerID) r.mtx.Unlock() - r.peerWG.Done() - if e := recover(); e != nil { r.logger.Error( "recovering from broadcasting evidence loop", @@ -284,6 +267,9 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID } }() + timer := time.NewTimer(0) + defer timer.Stop() + for { // This happens because the CElement we were looking at got garbage // collected (removed). That is, .NextWaitChan() returned nil. So we can go @@ -297,10 +283,6 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID case <-ctx.Done(): return - case <-closer.Done(): - // The peer is marked for removal via a PeerUpdate as the doneCh was - // explicitly closed to signal we should exit. - return } } @@ -324,18 +306,14 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID r.logger.Debug("gossiped evidence to peer", "evidence", ev, "peer", peerID) select { - case <-time.After(time.Second * broadcastEvidenceIntervalS): + case <-timer.C: // start from the beginning after broadcastEvidenceIntervalS seconds + timer.Reset(time.Second * broadcastEvidenceIntervalS) next = nil case <-next.NextWaitChan(): next = next.Next() - case <-closer.Done(): - // The peer is marked for removal via a PeerUpdate as the doneCh was - // explicitly closed to signal we should exit. - return - case <-ctx.Done(): return } diff --git a/internal/mempool/reactor.go b/internal/mempool/reactor.go index 7ef80aecd..feda178b1 100644 --- a/internal/mempool/reactor.go +++ b/internal/mempool/reactor.go @@ -10,7 +10,6 @@ import ( "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/internal/libs/clist" - tmsync "github.com/tendermint/tendermint/internal/libs/sync" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" @@ -49,16 +48,12 @@ type Reactor struct { mempoolCh *p2p.Channel peerUpdates *p2p.PeerUpdates - // peerWG is used to coordinate graceful termination of all peer broadcasting - // goroutines. - peerWG sync.WaitGroup - // observePanic is a function for observing panics that were recovered in methods on // Reactor. observePanic is called with the recovered value. observePanic func(interface{}) mtx sync.Mutex - peerRoutines map[types.NodeID]*tmsync.Closer + peerRoutines map[types.NodeID]context.CancelFunc } // NewReactor returns a reference to a new reactor. @@ -85,7 +80,7 @@ func NewReactor( ids: NewMempoolIDs(), mempoolCh: ch, peerUpdates: peerUpdates, - peerRoutines: make(map[types.NodeID]*tmsync.Closer), + peerRoutines: make(map[types.NodeID]context.CancelFunc), observePanic: defaultObservePanic, } @@ -131,16 +126,7 @@ func (r *Reactor) OnStart(ctx context.Context) error { // OnStop stops the reactor by signaling to all spawned goroutines to exit and // blocking until they all exit. -func (r *Reactor) OnStop() { - r.mtx.Lock() - for _, c := range r.peerRoutines { - c.Close() - } - r.mtx.Unlock() - - // wait for all spawned peer tx broadcasting goroutines to gracefully exit - r.peerWG.Wait() -} +func (r *Reactor) OnStop() {} // handleMempoolMessage handles envelopes sent from peers on the MempoolChannel. // For every tx in the message, we execute CheckTx. It returns an error if an @@ -248,15 +234,13 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // safely, and finally start the goroutine to broadcast txs to that peer. _, ok := r.peerRoutines[peerUpdate.NodeID] if !ok { - closer := tmsync.NewCloser() - - r.peerRoutines[peerUpdate.NodeID] = closer - r.peerWG.Add(1) + pctx, pcancel := context.WithCancel(ctx) + r.peerRoutines[peerUpdate.NodeID] = pcancel r.ids.ReserveForPeer(peerUpdate.NodeID) // start a broadcast routine ensuring all txs are forwarded to the peer - go r.broadcastTxRoutine(ctx, peerUpdate.NodeID, closer) + go r.broadcastTxRoutine(pctx, peerUpdate.NodeID) } } @@ -269,7 +253,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // from the map of peer tx broadcasting goroutines. closer, ok := r.peerRoutines[peerUpdate.NodeID] if ok { - closer.Close() + closer() } } } @@ -288,7 +272,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) { } } -func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, closer *tmsync.Closer) { +func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID) { peerMempoolID := r.ids.GetForPeer(peerID) var nextGossipTx *clist.CElement @@ -298,8 +282,6 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, c delete(r.peerRoutines, peerID) r.mtx.Unlock() - r.peerWG.Done() - if e := recover(); e != nil { r.observePanic(e) r.logger.Error( @@ -326,11 +308,6 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, c if nextGossipTx = r.mempool.NextGossipTx(); nextGossipTx == nil { continue } - - case <-closer.Done(): - // The peer is marked for removal via a PeerUpdate as the doneCh was - // explicitly closed to signal we should exit. - return } } @@ -369,10 +346,6 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, c select { case <-nextGossipTx.NextWaitChan(): nextGossipTx = nextGossipTx.Next() - case <-closer.Done(): - // The peer is marked for removal via a PeerUpdate as the doneCh was - // explicitly closed to signal we should exit. - return case <-ctx.Done(): return } diff --git a/internal/mempool/reactor_test.go b/internal/mempool/reactor_test.go index cddbc3be8..b64b22120 100644 --- a/internal/mempool/reactor_test.go +++ b/internal/mempool/reactor_test.go @@ -15,7 +15,6 @@ import ( "github.com/tendermint/tendermint/abci/example/kvstore" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" - tmsync "github.com/tendermint/tendermint/internal/libs/sync" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p/p2ptest" "github.com/tendermint/tendermint/libs/log" @@ -172,9 +171,7 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) { // run the router rts.start(ctx, t) - closer := tmsync.NewCloser() - primaryReactor.peerWG.Add(1) - go primaryReactor.broadcastTxRoutine(ctx, secondary, closer) + go primaryReactor.broadcastTxRoutine(ctx, secondary) wg := &sync.WaitGroup{} for i := 0; i < 50; i++ { @@ -188,7 +185,6 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) { err := primaryReactor.Stop() require.NoError(t, err) - primaryReactor.peerWG.Wait() wg.Wait() }