From 892f5d952481b956d6c317557a60fa7f48f31786 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Wed, 8 Dec 2021 08:44:32 -0500 Subject: [PATCH] service: cleanup mempool and peer update shutdown (#7401) --- internal/blocksync/reactor.go | 4 -- internal/blocksync/reactor_test.go | 4 +- internal/consensus/reactor.go | 4 -- internal/evidence/reactor.go | 7 -- internal/evidence/reactor_test.go | 4 +- internal/mempool/reactor.go | 47 +++--------- internal/mempool/reactor_test.go | 16 +++-- internal/p2p/p2ptest/network.go | 17 ++--- internal/p2p/p2ptest/require.go | 6 -- internal/p2p/peermanager.go | 60 ++++++---------- internal/p2p/peermanager_scoring_test.go | 6 +- internal/p2p/peermanager_test.go | 91 ++++++++++++++---------- internal/p2p/pex/reactor.go | 1 - internal/p2p/pex/reactor_test.go | 14 +--- internal/p2p/router.go | 4 +- internal/p2p/router_test.go | 7 -- internal/statesync/reactor.go | 4 -- 17 files changed, 110 insertions(+), 186 deletions(-) diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index 479506c71..6a5620f4e 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -181,8 +181,6 @@ func (r *Reactor) OnStop() { // wait for the poolRoutine and requestRoutine goroutines to gracefully exit r.poolWG.Wait() - - <-r.peerUpdates.Done() } // respondToPeer loads a block and sends it to the requesting peer, if we have it. @@ -334,8 +332,6 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { // PeerUpdate messages. When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. func (r *Reactor) processPeerUpdates(ctx context.Context) { - defer r.peerUpdates.Close() - for { select { case <-ctx.Done(): diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go index 5345cb5c4..d9d959ddc 100644 --- a/internal/blocksync/reactor_test.go +++ b/internal/blocksync/reactor_test.go @@ -82,8 +82,6 @@ func setup( t.Cleanup(func() { cancel() for _, nodeID := range rts.nodes { - rts.peerUpdates[nodeID].Close() - if rts.reactors[nodeID].IsRunning() { rts.reactors[nodeID].Wait() rts.app[nodeID].Wait() @@ -228,7 +226,7 @@ func TestReactor_AbruptDisconnect(t *testing.T) { Status: p2p.PeerStatusDown, NodeID: rts.nodes[0], } - rts.network.Nodes[rts.nodes[1]].PeerManager.Disconnected(rts.nodes[0]) + rts.network.Nodes[rts.nodes[1]].PeerManager.Disconnected(ctx, rts.nodes[0]) } func TestReactor_SyncTime(t *testing.T) { diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 387f84b15..8f5cdd0b1 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -220,8 +220,6 @@ func (r *Reactor) OnStop() { state.broadcastWG.Wait() } r.mtx.Unlock() - - <-r.peerUpdates.Done() } // SetEventBus sets the reactor's event bus. @@ -1406,8 +1404,6 @@ func (r *Reactor) processVoteSetBitsCh(ctx context.Context) { // PeerUpdate messages. When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. func (r *Reactor) processPeerUpdates(ctx context.Context) { - defer r.peerUpdates.Close() - for { select { case <-ctx.Done(): diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go index 908e7d5f6..31e927ba7 100644 --- a/internal/evidence/reactor.go +++ b/internal/evidence/reactor.go @@ -102,11 +102,6 @@ func (r *Reactor) OnStop() { // exit. r.peerWG.Wait() - // Wait for all p2p Channels to be closed before returning. 
This ensures we - // can easily reason about synchronization of all p2p Channels and ensure no - // panics will occur. - <-r.peerUpdates.Done() - // Close the evidence db r.evpool.Close() } @@ -251,8 +246,6 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // PeerUpdate messages. When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. func (r *Reactor) processPeerUpdates(ctx context.Context) { - defer r.peerUpdates.Close() - for { select { case peerUpdate := <-r.peerUpdates.Updates(): diff --git a/internal/evidence/reactor_test.go b/internal/evidence/reactor_test.go index b30f9e9b1..df636ba66 100644 --- a/internal/evidence/reactor_test.go +++ b/internal/evidence/reactor_test.go @@ -257,11 +257,11 @@ func TestReactorMultiDisconnect(t *testing.T) { // Ensure "disconnecting" the secondary peer from the primary more than once // is handled gracefully. - primary.PeerManager.Disconnected(secondary.NodeID) + primary.PeerManager.Disconnected(ctx, secondary.NodeID) require.Equal(t, primary.PeerManager.Status(secondary.NodeID), p2p.PeerStatusDown) _, err := primary.PeerManager.TryEvictNext() require.NoError(t, err) - primary.PeerManager.Disconnected(secondary.NodeID) + primary.PeerManager.Disconnected(ctx, secondary.NodeID) require.Equal(t, primary.PeerManager.Status(secondary.NodeID), p2p.PeerStatusDown) require.Equal(t, secondary.PeerManager.Status(primary.NodeID), p2p.PeerStatusUp) diff --git a/internal/mempool/reactor.go b/internal/mempool/reactor.go index 00345ccf8..43215f5f8 100644 --- a/internal/mempool/reactor.go +++ b/internal/mempool/reactor.go @@ -48,7 +48,6 @@ type Reactor struct { mempoolCh *p2p.Channel peerUpdates *p2p.PeerUpdates - closeCh chan struct{} // peerWG is used to coordinate graceful termination of all peer broadcasting // goroutines. @@ -80,7 +79,6 @@ func NewReactor( ids: NewMempoolIDs(), mempoolCh: mempoolCh, peerUpdates: peerUpdates, - closeCh: make(chan struct{}), peerRoutines: make(map[types.NodeID]*tmsync.Closer), observePanic: defaultObservePanic, } @@ -136,19 +134,13 @@ func (r *Reactor) OnStop() { // wait for all spawned peer tx broadcasting goroutines to gracefully exit r.peerWG.Wait() - - // Close closeCh to signal to all spawned goroutines to gracefully exit. All - // p2p Channels should execute Close(). - close(r.closeCh) - - <-r.peerUpdates.Done() } // handleMempoolMessage handles envelopes sent from peers on the MempoolChannel. // For every tx in the message, we execute CheckTx. It returns an error if an // empty set of txs are sent in an envelope or if we receive an unexpected // message type. -func (r *Reactor) handleMempoolMessage(envelope p2p.Envelope) error { +func (r *Reactor) handleMempoolMessage(ctx context.Context, envelope p2p.Envelope) error { logger := r.logger.With("peer", envelope.From) switch msg := envelope.Message.(type) { @@ -164,7 +156,7 @@ func (r *Reactor) handleMempoolMessage(envelope p2p.Envelope) error { } for _, tx := range protoTxs { - if err := r.mempool.CheckTx(context.Background(), types.Tx(tx), nil, txInfo); err != nil { + if err := r.mempool.CheckTx(ctx, types.Tx(tx), nil, txInfo); err != nil { logger.Error("checktx failed for tx", "tx", fmt.Sprintf("%X", types.Tx(tx).Hash()), "err", err) } } @@ -179,7 +171,7 @@ func (r *Reactor) handleMempoolMessage(envelope p2p.Envelope) error { // handleMessage handles an Envelope sent from a peer on a specific p2p Channel. // It will handle errors and any possible panics gracefully. 
A caller can handle // any error returned by sending a PeerError on the respective channel. -func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) { +func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope p2p.Envelope) (err error) { defer func() { if e := recover(); e != nil { r.observePanic(e) @@ -196,7 +188,7 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err switch chID { case MempoolChannel: - err = r.handleMempoolMessage(envelope) + err = r.handleMempoolMessage(ctx, envelope) default: err = fmt.Errorf("unknown channel ID (%d) for envelope (%T)", chID, envelope.Message) @@ -211,7 +203,7 @@ func (r *Reactor) processMempoolCh(ctx context.Context) { for { select { case envelope := <-r.mempoolCh.In: - if err := r.handleMessage(r.mempoolCh.ID, envelope); err != nil { + if err := r.handleMessage(ctx, r.mempoolCh.ID, envelope); err != nil { r.logger.Error("failed to process message", "ch_id", r.mempoolCh.ID, "envelope", envelope, "err", err) r.mempoolCh.Error <- p2p.PeerError{ NodeID: envelope.From, @@ -219,8 +211,6 @@ func (r *Reactor) processMempoolCh(ctx context.Context) { } } case <-ctx.Done(): - return - case <-r.closeCh: r.logger.Debug("stopped listening on mempool channel; closing...") return } @@ -242,8 +232,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda case p2p.PeerStatusUp: // Do not allow starting new tx broadcast loops after reactor shutdown // has been initiated. This can happen after we've manually closed all - // peer broadcast loops and closed r.closeCh, but the router still sends - // in-flight peer updates. + // peer broadcast, but the router still sends in-flight peer updates. if !r.IsRunning() { return } @@ -285,18 +274,13 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // PeerUpdate messages. When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. func (r *Reactor) processPeerUpdates(ctx context.Context) { - defer r.peerUpdates.Close() - for { select { case <-ctx.Done(): + r.logger.Debug("stopped listening on peer updates channel; closing...") return case peerUpdate := <-r.peerUpdates.Updates(): r.processPeerUpdate(ctx, peerUpdate) - - case <-r.closeCh: - r.logger.Debug("stopped listening on peer updates channel; closing...") - return } } } @@ -333,6 +317,8 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, c // start from the beginning. if nextGossipTx == nil { select { + case <-ctx.Done(): + return case <-r.mempool.WaitForNextTx(): // wait until a tx is available if nextGossipTx = r.mempool.NextGossipTx(); nextGossipTx == nil { continue @@ -342,14 +328,6 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, c // The peer is marked for removal via a PeerUpdate as the doneCh was // explicitly closed to signal we should exit. return - - case <-ctx.Done(): - return - - case <-r.closeCh: - // The reactor has signaled that we are stopped and thus we should - // implicitly exit this peer's goroutine. - return } } @@ -388,19 +366,12 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, c select { case <-nextGossipTx.NextWaitChan(): nextGossipTx = nextGossipTx.Next() - case <-closer.Done(): // The peer is marked for removal via a PeerUpdate as the doneCh was // explicitly closed to signal we should exit. 
return - case <-ctx.Done(): return - - case <-r.closeCh: - // The reactor has signaled that we are stopped and thus we should - // implicitly exit this peer's goroutine. - return } } } diff --git a/internal/mempool/reactor_test.go b/internal/mempool/reactor_test.go index 86a3b4db4..096544910 100644 --- a/internal/mempool/reactor_test.go +++ b/internal/mempool/reactor_test.go @@ -64,7 +64,7 @@ func setupReactors(ctx context.Context, t *testing.T, numNodes int, chBuf uint) mempool := setup(ctx, t, 0) rts.mempools[nodeID] = mempool - rts.peerChans[nodeID] = make(chan p2p.PeerUpdate) + rts.peerChans[nodeID] = make(chan p2p.PeerUpdate, chBuf) rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1) rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID]) @@ -102,6 +102,7 @@ func setupReactors(ctx context.Context, t *testing.T, numNodes int, chBuf uint) func (rts *reactorTestSuite) start(ctx context.Context, t *testing.T) { t.Helper() rts.network.Start(ctx, t) + require.Len(t, rts.network.RandomNode().PeerManager.Peers(), len(rts.nodes)-1, @@ -126,13 +127,17 @@ func (rts *reactorTestSuite) waitForTxns(t *testing.T, txs []types.Tx, ids ...ty if !p2ptest.NodeInSlice(name, ids) { continue } + if len(txs) == pool.Size() { + continue + } wg.Add(1) go func(pool *TxMempool) { defer wg.Done() require.Eventually(t, func() bool { return len(txs) == pool.Size() }, time.Minute, - 100*time.Millisecond, + 250*time.Millisecond, + "ntx=%d, size=%d", len(txs), pool.Size(), ) }(pool) } @@ -191,14 +196,15 @@ func TestReactorBroadcastTxs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rts := setupReactors(ctx, t, numNodes, 0) + rts := setupReactors(ctx, t, numNodes, uint(numTxs)) primary := rts.nodes[0] secondaries := rts.nodes[1:] txs := checkTxs(ctx, t, rts.reactors[primary].mempool, numTxs, UnknownPeerID) - // run the router + require.Equal(t, numTxs, rts.reactors[primary].mempool.Size()) + rts.start(ctx, t) // Wait till all secondary suites (reactor) received all mempool txs from the @@ -407,7 +413,7 @@ func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rts := setupReactors(ctx, t, 2, 0) + rts := setupReactors(ctx, t, 2, 2) primary := rts.nodes[0] secondary := rts.nodes[1] diff --git a/internal/p2p/p2ptest/network.go b/internal/p2p/p2ptest/network.go index 30f1a435f..bde96ba66 100644 --- a/internal/p2p/p2ptest/network.go +++ b/internal/p2p/p2ptest/network.go @@ -76,10 +76,11 @@ func (n *Network) Start(ctx context.Context, t *testing.T) { // for each node. 
dialQueue := []p2p.NodeAddress{} subs := map[types.NodeID]*p2p.PeerUpdates{} + subctx, subcancel := context.WithCancel(ctx) + defer subcancel() for _, node := range n.Nodes { dialQueue = append(dialQueue, node.NodeAddress) - subs[node.NodeID] = node.PeerManager.Subscribe(ctx) - defer subs[node.NodeID].Close() + subs[node.NodeID] = node.PeerManager.Subscribe(subctx) } // For each node, dial the nodes that it still doesn't have a connection to @@ -197,9 +198,10 @@ func (n *Network) Remove(ctx context.Context, t *testing.T, id types.NodeID) { delete(n.Nodes, id) subs := []*p2p.PeerUpdates{} + subctx, subcancel := context.WithCancel(ctx) + defer subcancel() for _, peer := range n.Nodes { - sub := peer.PeerManager.Subscribe(ctx) - defer sub.Close() + sub := peer.PeerManager.Subscribe(subctx) subs = append(subs, sub) } @@ -329,7 +331,6 @@ func (n *Node) MakePeerUpdates(ctx context.Context, t *testing.T) *p2p.PeerUpdat sub := n.PeerManager.Subscribe(ctx) t.Cleanup(func() { RequireNoUpdates(ctx, t, sub) - sub.Close() }) return sub @@ -339,11 +340,7 @@ func (n *Node) MakePeerUpdates(ctx context.Context, t *testing.T) *p2p.PeerUpdat // It does *not* check that all updates have been consumed, but will // close the update channel. func (n *Node) MakePeerUpdatesNoRequireEmpty(ctx context.Context, t *testing.T) *p2p.PeerUpdates { - sub := n.PeerManager.Subscribe(ctx) - - t.Cleanup(sub.Close) - - return sub + return n.PeerManager.Subscribe(ctx) } func MakeChannelDesc(chID p2p.ChannelID) *p2p.ChannelDescriptor { diff --git a/internal/p2p/p2ptest/require.go b/internal/p2p/p2ptest/require.go index 3a7731829..ce44a724c 100644 --- a/internal/p2p/p2ptest/require.go +++ b/internal/p2p/p2ptest/require.go @@ -119,9 +119,6 @@ func RequireUpdate(t *testing.T, peerUpdates *p2p.PeerUpdates, expect p2p.PeerUp case update := <-peerUpdates.Updates(): require.Equal(t, expect, update, "peer update did not match") - case <-peerUpdates.Done(): - require.Fail(t, "peer updates subscription is closed") - case <-timer.C: require.Fail(t, "timed out waiting for peer update", "expected %v", expect) } @@ -143,9 +140,6 @@ func RequireUpdates(t *testing.T, peerUpdates *p2p.PeerUpdates, expect []p2p.Pee return } - case <-peerUpdates.Done(): - require.Fail(t, "peer updates subscription is closed") - case <-timer.C: require.Equal(t, expect, actual, "did not receive expected peer updates") return diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go index 40dcf8464..f7e4fb730 100644 --- a/internal/p2p/peermanager.go +++ b/internal/p2p/peermanager.go @@ -56,8 +56,6 @@ type PeerUpdate struct { type PeerUpdates struct { routerUpdatesCh chan PeerUpdate reactorUpdatesCh chan PeerUpdate - closeOnce sync.Once - doneCh chan struct{} } // NewPeerUpdates creates a new PeerUpdates subscription. It is primarily for @@ -67,7 +65,6 @@ func NewPeerUpdates(updatesCh chan PeerUpdate, buf int) *PeerUpdates { return &PeerUpdates{ reactorUpdatesCh: updatesCh, routerUpdatesCh: make(chan PeerUpdate, buf), - doneCh: make(chan struct{}), } } @@ -76,21 +73,6 @@ func (pu *PeerUpdates) Updates() <-chan PeerUpdate { return pu.reactorUpdatesCh } -// Done returns a channel that is closed when the subscription is closed. -func (pu *PeerUpdates) Done() <-chan struct{} { - return pu.doneCh -} - -// Close closes the peer updates subscription. -func (pu *PeerUpdates) Close() { - pu.closeOnce.Do(func() { - // NOTE: We don't close updatesCh since multiple goroutines may be - // sending on it. 
The PeerManager senders will select on doneCh as well - // to avoid blocking on a closed subscription. - close(pu.doneCh) - }) -} - // SendUpdate pushes information about a peer into the routing layer, // presumably from a peer. func (pu *PeerUpdates) SendUpdate(ctx context.Context, update PeerUpdate) { @@ -692,13 +674,13 @@ func (m *PeerManager) Accepted(peerID types.NodeID) error { // peer must already be marked as connected. This is separate from Dialed() and // Accepted() to allow the router to set up its internal queues before reactors // start sending messages. -func (m *PeerManager) Ready(peerID types.NodeID) { +func (m *PeerManager) Ready(ctx context.Context, peerID types.NodeID) { m.mtx.Lock() defer m.mtx.Unlock() if m.connected[peerID] { m.ready[peerID] = true - m.broadcast(PeerUpdate{ + m.broadcast(ctx, PeerUpdate{ NodeID: peerID, Status: PeerStatusUp, }) @@ -759,7 +741,7 @@ func (m *PeerManager) TryEvictNext() (types.NodeID, error) { // Disconnected unmarks a peer as connected, allowing it to be dialed or // accepted again as appropriate. -func (m *PeerManager) Disconnected(peerID types.NodeID) { +func (m *PeerManager) Disconnected(ctx context.Context, peerID types.NodeID) { m.mtx.Lock() defer m.mtx.Unlock() @@ -772,7 +754,7 @@ func (m *PeerManager) Disconnected(peerID types.NodeID) { delete(m.ready, peerID) if ready { - m.broadcast(PeerUpdate{ + m.broadcast(ctx, PeerUpdate{ NodeID: peerID, Status: PeerStatusDown, }) @@ -854,8 +836,8 @@ func (m *PeerManager) Subscribe(ctx context.Context) *PeerUpdates { // otherwise the PeerManager will halt. func (m *PeerManager) Register(ctx context.Context, peerUpdates *PeerUpdates) { m.mtx.Lock() + defer m.mtx.Unlock() m.subscriptions[peerUpdates] = peerUpdates - m.mtx.Unlock() go func() { for { @@ -863,26 +845,27 @@ func (m *PeerManager) Register(ctx context.Context, peerUpdates *PeerUpdates) { case <-ctx.Done(): return case pu := <-peerUpdates.routerUpdatesCh: - m.processPeerEvent(pu) + m.processPeerEvent(ctx, pu) } } }() go func() { - select { - case <-peerUpdates.Done(): - m.mtx.Lock() - delete(m.subscriptions, peerUpdates) - m.mtx.Unlock() - case <-ctx.Done(): - } + <-ctx.Done() + m.mtx.Lock() + defer m.mtx.Unlock() + delete(m.subscriptions, peerUpdates) }() } -func (m *PeerManager) processPeerEvent(pu PeerUpdate) { +func (m *PeerManager) processPeerEvent(ctx context.Context, pu PeerUpdate) { m.mtx.Lock() defer m.mtx.Unlock() + if ctx.Err() != nil { + return + } + if _, ok := m.store.peers[pu.NodeID]; !ok { m.store.peers[pu.NodeID] = &peerInfo{} } @@ -902,18 +885,15 @@ func (m *PeerManager) processPeerEvent(pu PeerUpdate) { // // FIXME: Consider using an internal channel to buffer updates while also // maintaining order if this is a problem. -func (m *PeerManager) broadcast(peerUpdate PeerUpdate) { +func (m *PeerManager) broadcast(ctx context.Context, peerUpdate PeerUpdate) { for _, sub := range m.subscriptions { - // We have to check doneChan separately first, otherwise there's a 50% - // chance the second select will send on a closed subscription. 
- select { - case <-sub.doneCh: - continue - default: + if ctx.Err() != nil { + return } select { + case <-ctx.Done(): + return case sub.reactorUpdatesCh <- peerUpdate: - case <-sub.doneCh: } } } diff --git a/internal/p2p/peermanager_scoring_test.go b/internal/p2p/peermanager_scoring_test.go index ecaf71c98..4c7bef0cc 100644 --- a/internal/p2p/peermanager_scoring_test.go +++ b/internal/p2p/peermanager_scoring_test.go @@ -38,7 +38,7 @@ func TestPeerScoring(t *testing.T) { // add a bunch of good status updates and watch things increase. for i := 1; i < 10; i++ { - peerManager.processPeerEvent(PeerUpdate{ + peerManager.processPeerEvent(ctx, PeerUpdate{ NodeID: id, Status: PeerStatusGood, }) @@ -47,7 +47,7 @@ func TestPeerScoring(t *testing.T) { // watch the corresponding decreases respond to update for i := 10; i == 0; i-- { - peerManager.processPeerEvent(PeerUpdate{ + peerManager.processPeerEvent(ctx, PeerUpdate{ NodeID: id, Status: PeerStatusBad, }) @@ -57,7 +57,6 @@ func TestPeerScoring(t *testing.T) { t.Run("AsynchronousIncrement", func(t *testing.T) { start := peerManager.Scores()[id] pu := peerManager.Subscribe(ctx) - defer pu.Close() pu.SendUpdate(ctx, PeerUpdate{ NodeID: id, Status: PeerStatusGood, @@ -71,7 +70,6 @@ func TestPeerScoring(t *testing.T) { t.Run("AsynchronousDecrement", func(t *testing.T) { start := peerManager.Scores()[id] pu := peerManager.Subscribe(ctx) - defer pu.Close() pu.SendUpdate(ctx, PeerUpdate{ NodeID: id, Status: PeerStatusBad, diff --git a/internal/p2p/peermanager_test.go b/internal/p2p/peermanager_test.go index dec92dab0..2999e8d6d 100644 --- a/internal/p2p/peermanager_test.go +++ b/internal/p2p/peermanager_test.go @@ -461,9 +461,11 @@ func TestPeerManager_DialNext_WakeOnDisconnected(t *testing.T) { require.NoError(t, err) require.Zero(t, dial) + dctx, dcancel := context.WithTimeout(ctx, 300*time.Millisecond) + defer dcancel() go func() { time.Sleep(200 * time.Millisecond) - peerManager.Disconnected(a.NodeID) + peerManager.Disconnected(dctx, a.NodeID) }() ctx, cancel = context.WithTimeout(ctx, 3*time.Second) @@ -510,6 +512,9 @@ func TestPeerManager_TryDialNext_MaxConnected(t *testing.T) { } func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))} b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))} c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))} @@ -575,7 +580,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) { // Now, if we disconnect a, we should be allowed to dial d because we have a // free upgrade slot. - peerManager.Disconnected(a.NodeID) + peerManager.Disconnected(ctx, a.NodeID) dial, err = peerManager.TryDialNext() require.NoError(t, err) require.Equal(t, d, dial) @@ -584,7 +589,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) { // However, if we disconnect b (such that only c and d are connected), we // should not be allowed to dial e even though there are upgrade slots, // because there are no lower-scored nodes that can be upgraded. 
- peerManager.Disconnected(b.NodeID) + peerManager.Disconnected(ctx, b.NodeID) added, err = peerManager.Add(e) require.NoError(t, err) require.True(t, added) @@ -966,6 +971,9 @@ func TestPeerManager_Dialed_Upgrade(t *testing.T) { } func TestPeerManager_Dialed_UpgradeEvenLower(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))} b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))} c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))} @@ -1005,7 +1013,7 @@ func TestPeerManager_Dialed_UpgradeEvenLower(t *testing.T) { // In the meanwhile, a disconnects and d connects. d is even lower-scored // than b (1 vs 2), which is currently being upgraded. - peerManager.Disconnected(a.NodeID) + peerManager.Disconnected(ctx, a.NodeID) added, err = peerManager.Add(d) require.NoError(t, err) require.True(t, added) @@ -1020,6 +1028,9 @@ func TestPeerManager_Dialed_UpgradeEvenLower(t *testing.T) { } func TestPeerManager_Dialed_UpgradeNoEvict(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))} b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))} c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))} @@ -1055,7 +1066,7 @@ func TestPeerManager_Dialed_UpgradeNoEvict(t *testing.T) { require.Equal(t, c, dial) // In the meanwhile, b disconnects. - peerManager.Disconnected(b.NodeID) + peerManager.Disconnected(ctx, b.NodeID) // Once c completes the upgrade of b, there is no longer a need to // evict anything since we're at capacity. @@ -1188,6 +1199,9 @@ func TestPeerManager_Accepted_MaxConnectedUpgrade(t *testing.T) { } func TestPeerManager_Accepted_Upgrade(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))} b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))} c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))} @@ -1224,7 +1238,7 @@ func TestPeerManager_Accepted_Upgrade(t *testing.T) { evict, err := peerManager.TryEvictNext() require.NoError(t, err) require.Equal(t, a.NodeID, evict) - peerManager.Disconnected(a.NodeID) + peerManager.Disconnected(ctx, a.NodeID) // c still cannot get accepted, since it's not scored above b. require.Error(t, peerManager.Accepted(c.NodeID)) @@ -1288,7 +1302,6 @@ func TestPeerManager_Ready(t *testing.T) { require.NoError(t, err) sub := peerManager.Subscribe(ctx) - defer sub.Close() // Connecting to a should still have it as status down. added, err := peerManager.Add(a) @@ -1298,7 +1311,7 @@ func TestPeerManager_Ready(t *testing.T) { require.Equal(t, p2p.PeerStatusDown, peerManager.Status(a.NodeID)) // Marking a as ready should transition it to PeerStatusUp and send an update. 
- peerManager.Ready(a.NodeID) + peerManager.Ready(ctx, a.NodeID) require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID)) require.Equal(t, p2p.PeerUpdate{ NodeID: a.NodeID, @@ -1310,7 +1323,7 @@ func TestPeerManager_Ready(t *testing.T) { require.NoError(t, err) require.True(t, added) require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID)) - peerManager.Ready(b.NodeID) + peerManager.Ready(ctx, b.NodeID) require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID)) require.Empty(t, sub.Updates()) } @@ -1329,7 +1342,7 @@ func TestPeerManager_EvictNext(t *testing.T) { require.NoError(t, err) require.True(t, added) require.NoError(t, peerManager.Accepted(a.NodeID)) - peerManager.Ready(a.NodeID) + peerManager.Ready(ctx, a.NodeID) // Since there are no peers to evict, EvictNext should block until timeout. timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) @@ -1365,7 +1378,7 @@ func TestPeerManager_EvictNext_WakeOnError(t *testing.T) { require.NoError(t, err) require.True(t, added) require.NoError(t, peerManager.Accepted(a.NodeID)) - peerManager.Ready(a.NodeID) + peerManager.Ready(ctx, a.NodeID) // Spawn a goroutine to error a peer after a delay. go func() { @@ -1400,7 +1413,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeDialed(t *testing.T) { require.NoError(t, err) require.True(t, added) require.NoError(t, peerManager.Accepted(a.NodeID)) - peerManager.Ready(a.NodeID) + peerManager.Ready(ctx, a.NodeID) // Spawn a goroutine to upgrade to b with a delay. go func() { @@ -1441,7 +1454,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeAccepted(t *testing.T) { require.NoError(t, err) require.True(t, added) require.NoError(t, peerManager.Accepted(a.NodeID)) - peerManager.Ready(a.NodeID) + peerManager.Ready(ctx, a.NodeID) // Spawn a goroutine to upgrade b with a delay. go func() { @@ -1457,6 +1470,9 @@ func TestPeerManager_EvictNext_WakeOnUpgradeAccepted(t *testing.T) { require.Equal(t, a.NodeID, evict) } func TestPeerManager_TryEvictNext(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))} peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) @@ -1473,7 +1489,7 @@ func TestPeerManager_TryEvictNext(t *testing.T) { // Connecting to a won't evict anything either. require.NoError(t, peerManager.Accepted(a.NodeID)) - peerManager.Ready(a.NodeID) + peerManager.Ready(ctx, a.NodeID) // But if a errors it should be evicted. peerManager.Errored(a.NodeID, errors.New("foo")) @@ -1502,10 +1518,9 @@ func TestPeerManager_Disconnected(t *testing.T) { defer cancel() sub := peerManager.Subscribe(ctx) - defer sub.Close() // Disconnecting an unknown peer does nothing. - peerManager.Disconnected(a.NodeID) + peerManager.Disconnected(ctx, a.NodeID) require.Empty(t, peerManager.Peers()) require.Empty(t, sub.Updates()) @@ -1514,14 +1529,14 @@ func TestPeerManager_Disconnected(t *testing.T) { require.NoError(t, err) require.True(t, added) require.NoError(t, peerManager.Accepted(a.NodeID)) - peerManager.Disconnected(a.NodeID) + peerManager.Disconnected(ctx, a.NodeID) require.Empty(t, sub.Updates()) // Disconnecting a ready peer sends a status update. 
_, err = peerManager.Add(a) require.NoError(t, err) require.NoError(t, peerManager.Accepted(a.NodeID)) - peerManager.Ready(a.NodeID) + peerManager.Ready(ctx, a.NodeID) require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID)) require.NotEmpty(t, sub.Updates()) require.Equal(t, p2p.PeerUpdate{ @@ -1529,7 +1544,7 @@ func TestPeerManager_Disconnected(t *testing.T) { Status: p2p.PeerStatusUp, }, <-sub.Updates()) - peerManager.Disconnected(a.NodeID) + peerManager.Disconnected(ctx, a.NodeID) require.Equal(t, p2p.PeerStatusDown, peerManager.Status(a.NodeID)) require.NotEmpty(t, sub.Updates()) require.Equal(t, p2p.PeerUpdate{ @@ -1543,13 +1558,16 @@ func TestPeerManager_Disconnected(t *testing.T) { require.NoError(t, err) require.Equal(t, a, dial) - peerManager.Disconnected(a.NodeID) + peerManager.Disconnected(ctx, a.NodeID) dial, err = peerManager.TryDialNext() require.NoError(t, err) require.Zero(t, dial) } func TestPeerManager_Errored(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))} peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) @@ -1573,7 +1591,7 @@ func TestPeerManager_Errored(t *testing.T) { require.Zero(t, evict) require.NoError(t, peerManager.Accepted(a.NodeID)) - peerManager.Ready(a.NodeID) + peerManager.Ready(ctx, a.NodeID) evict, err = peerManager.TryEvictNext() require.NoError(t, err) require.Zero(t, evict) @@ -1596,7 +1614,6 @@ func TestPeerManager_Subscribe(t *testing.T) { // This tests all subscription events for full peer lifecycles. sub := peerManager.Subscribe(ctx) - defer sub.Close() added, err := peerManager.Add(a) require.NoError(t, err) @@ -1607,11 +1624,11 @@ func TestPeerManager_Subscribe(t *testing.T) { require.NoError(t, peerManager.Accepted(a.NodeID)) require.Empty(t, sub.Updates()) - peerManager.Ready(a.NodeID) + peerManager.Ready(ctx, a.NodeID) require.NotEmpty(t, sub.Updates()) require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates()) - peerManager.Disconnected(a.NodeID) + peerManager.Disconnected(ctx, a.NodeID) require.NotEmpty(t, sub.Updates()) require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates()) @@ -1624,7 +1641,7 @@ func TestPeerManager_Subscribe(t *testing.T) { require.NoError(t, peerManager.Dialed(a)) require.Empty(t, sub.Updates()) - peerManager.Ready(a.NodeID) + peerManager.Ready(ctx, a.NodeID) require.NotEmpty(t, sub.Updates()) require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates()) @@ -1635,7 +1652,7 @@ func TestPeerManager_Subscribe(t *testing.T) { require.NoError(t, err) require.Equal(t, a.NodeID, evict) - peerManager.Disconnected(a.NodeID) + peerManager.Disconnected(ctx, a.NodeID) require.NotEmpty(t, sub.Updates()) require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates()) @@ -1659,7 +1676,6 @@ func TestPeerManager_Subscribe_Close(t *testing.T) { require.NoError(t, err) sub := peerManager.Subscribe(ctx) - defer sub.Close() added, err := peerManager.Add(a) require.NoError(t, err) @@ -1667,13 +1683,13 @@ func TestPeerManager_Subscribe_Close(t *testing.T) { require.NoError(t, peerManager.Accepted(a.NodeID)) require.Empty(t, sub.Updates()) - peerManager.Ready(a.NodeID) + peerManager.Ready(ctx, a.NodeID) require.NotEmpty(t, sub.Updates()) require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates()) 
// Closing the subscription should not send us the disconnected update. - sub.Close() - peerManager.Disconnected(a.NodeID) + cancel() + peerManager.Disconnected(ctx, a.NodeID) require.Empty(t, sub.Updates()) } @@ -1688,19 +1704,19 @@ func TestPeerManager_Subscribe_Broadcast(t *testing.T) { peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) require.NoError(t, err) + s2ctx, s2cancel := context.WithCancel(ctx) + defer s2cancel() + s1 := peerManager.Subscribe(ctx) - defer s1.Close() - s2 := peerManager.Subscribe(ctx) - defer s2.Close() + s2 := peerManager.Subscribe(s2ctx) s3 := peerManager.Subscribe(ctx) - defer s3.Close() // Connecting to a peer should send updates on all subscriptions. added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) require.NoError(t, peerManager.Accepted(a.NodeID)) - peerManager.Ready(a.NodeID) + peerManager.Ready(ctx, a.NodeID) expectUp := p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp} require.NotEmpty(t, s1) @@ -1712,8 +1728,9 @@ func TestPeerManager_Subscribe_Broadcast(t *testing.T) { // We now close s2. Disconnecting the peer should only send updates // on s1 and s3. - s2.Close() - peerManager.Disconnected(a.NodeID) + s2cancel() + time.Sleep(250 * time.Millisecond) // give the thread a chance to exit + peerManager.Disconnected(ctx, a.NodeID) expectDown := p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown} require.NotEmpty(t, s1) diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index b42bb2f4b..6970c6fef 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -185,7 +185,6 @@ func (r *Reactor) processPexCh(ctx context.Context) { // PeerUpdate messages. When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. 
func (r *Reactor) processPeerUpdates(ctx context.Context) { - defer r.peerUpdates.Close() for { select { case <-ctx.Done(): diff --git a/internal/p2p/pex/reactor_test.go b/internal/p2p/pex/reactor_test.go index 5a061d76d..28da5c72c 100644 --- a/internal/p2p/pex/reactor_test.go +++ b/internal/p2p/pex/reactor_test.go @@ -296,10 +296,7 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor { reactor := pex.NewReactor(log.TestingLogger(), peerManager, pexCh, peerUpdates) require.NoError(t, reactor.Start(ctx)) - t.Cleanup(func() { - peerUpdates.Close() - reactor.Wait() - }) + t.Cleanup(reactor.Wait) return &singleTestReactor{ reactor: reactor, @@ -396,15 +393,11 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT require.Len(t, rts.reactors, realNodes) t.Cleanup(func() { - for nodeID, reactor := range rts.reactors { + for _, reactor := range rts.reactors { if reactor.IsRunning() { reactor.Wait() require.False(t, reactor.IsRunning()) } - rts.peerUpdates[nodeID].Close() - } - for _, nodeID := range rts.mocks { - rts.peerUpdates[nodeID].Close() } }) @@ -542,7 +535,6 @@ func (r *reactorTestSuite) listenForPeerUpdate( ) { on, with := r.checkNodePair(t, onNode, withNode) sub := r.network.Nodes[on].PeerManager.Subscribe(ctx) - defer sub.Close() timesUp := time.After(waitPeriod) for { select { @@ -649,9 +641,7 @@ func (r *reactorTestSuite) connectPeers(ctx context.Context, t *testing.T, sourc } sourceSub := n1.PeerManager.Subscribe(ctx) - defer sourceSub.Close() targetSub := n2.PeerManager.Subscribe(ctx) - defer targetSub.Close() sourceAddress := n1.NodeAddress r.logger.Debug("source address", "address", sourceAddress) diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 8f751ec6a..87842bee6 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -758,7 +758,7 @@ func (r *Router) runWithPeerMutex(fn func() error) error { // they are closed elsewhere it will cause this method to shut down and return. func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connection, channels channelIDs) { r.metrics.Peers.Add(1) - r.peerManager.Ready(peerID) + r.peerManager.Ready(ctx, peerID) sendQueue := r.getOrMakeQueue(peerID, channels) defer func() { @@ -769,7 +769,7 @@ func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connec sendQueue.close() - r.peerManager.Disconnected(peerID) + r.peerManager.Disconnected(ctx, peerID) r.metrics.Peers.Add(-1) }() diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go index a561f68cd..1a59a0239 100644 --- a/internal/p2p/router_test.go +++ b/internal/p2p/router_test.go @@ -409,7 +409,6 @@ func TestRouter_AcceptPeers(t *testing.T) { require.NoError(t, err) sub := peerManager.Subscribe(ctx) - defer sub.Close() router, err := p2p.NewRouter( ctx, @@ -433,7 +432,6 @@ func TestRouter_AcceptPeers(t *testing.T) { // force a context switch so that the // connection is handled. time.Sleep(time.Millisecond) - sub.Close() } else { select { case <-closer.Done(): @@ -659,7 +657,6 @@ func TestRouter_DialPeers(t *testing.T) { require.NoError(t, err) require.True(t, added) sub := peerManager.Subscribe(ctx) - defer sub.Close() router, err := p2p.NewRouter( ctx, @@ -683,7 +680,6 @@ func TestRouter_DialPeers(t *testing.T) { // force a context switch so that the // connection is handled. 
time.Sleep(time.Millisecond) - sub.Close() } else { select { case <-closer.Done(): @@ -822,7 +818,6 @@ func TestRouter_EvictPeers(t *testing.T) { require.NoError(t, err) sub := peerManager.Subscribe(ctx) - defer sub.Close() router, err := p2p.NewRouter( ctx, @@ -850,7 +845,6 @@ func TestRouter_EvictPeers(t *testing.T) { NodeID: peerInfo.NodeID, Status: p2p.PeerStatusDown, }) - sub.Close() require.NoError(t, router.Stop()) mockTransport.AssertExpectations(t) @@ -943,7 +937,6 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) { require.NoError(t, err) sub := peerManager.Subscribe(ctx) - defer sub.Close() router, err := p2p.NewRouter( ctx, diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index f6eac2a97..8cac68891 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -225,8 +225,6 @@ func (r *Reactor) OnStart(ctx context.Context) error { func (r *Reactor) OnStop() { // tell the dispatcher to stop sending any more requests r.dispatcher.Close() - - <-r.peerUpdates.Done() } // Sync runs a state sync, fetching snapshots and providing chunks to the @@ -865,8 +863,6 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // PeerUpdate messages. When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. func (r *Reactor) processPeerUpdates(ctx context.Context) { - defer r.peerUpdates.Close() - for { select { case <-ctx.Done():
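
The recurring change across these reactors is that a PeerUpdates subscription no longer carries its own Close()/Done() channels. Its lifetime is instead tied to the context passed to PeerManager.Subscribe and Register, and Ready, Disconnected, broadcast, and processPeerEvent now take that context so updates stop flowing once it is cancelled; reactor loops select on ctx.Done() alongside Updates(), making cancellation the single shutdown signal. The standalone sketch below illustrates that pattern only: the peerUpdate, manager, and subscription types are simplified stand-ins for illustration, not the internal/p2p API itself.

// Standalone sketch of the context-scoped subscription pattern this patch
// moves to. The types here are simplified stand-ins, not internal/p2p.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type peerUpdate struct {
	NodeID string
	Status string
}

type subscription struct {
	updates chan peerUpdate
}

type manager struct {
	mtx  sync.Mutex
	subs map[*subscription]struct{}
}

func newManager() *manager {
	return &manager{subs: make(map[*subscription]struct{})}
}

// Subscribe registers a subscription that is removed automatically when ctx
// is cancelled; callers no longer call Close() on the subscription.
func (m *manager) Subscribe(ctx context.Context) *subscription {
	sub := &subscription{updates: make(chan peerUpdate, 1)}

	m.mtx.Lock()
	m.subs[sub] = struct{}{}
	m.mtx.Unlock()

	go func() {
		<-ctx.Done()
		m.mtx.Lock()
		defer m.mtx.Unlock()
		delete(m.subs, sub)
	}()

	return sub
}

// broadcast mirrors the shape of PeerManager.broadcast after the patch: it
// takes the caller's context and bails out on cancellation rather than
// selecting on a per-subscription done channel.
func (m *manager) broadcast(ctx context.Context, pu peerUpdate) {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	for sub := range m.subs {
		if ctx.Err() != nil {
			return
		}
		select {
		case <-ctx.Done():
			return
		case sub.updates <- pu:
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	m := newManager()
	sub := m.Subscribe(ctx)

	m.broadcast(ctx, peerUpdate{NodeID: "a", Status: "up"})
	fmt.Println(<-sub.updates)

	// Cancelling the context takes the place of the old sub.Close(): the
	// registration is cleaned up in the background and later broadcasts
	// simply no longer reach this subscription.
	cancel()
	time.Sleep(50 * time.Millisecond) // give the cleanup goroutine a chance to run
	m.broadcast(context.Background(), peerUpdate{NodeID: "a", Status: "down"})
}

Under this scheme a reactor's OnStop no longer waits on <-r.peerUpdates.Done(); cancelling the context used at subscription time both unregisters the subscription (as the Register goroutine in peermanager.go now does) and causes broadcast to skip or abort sends, as sketched above.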