service: cleanup mempool and peer update shutdown (#7401)

Branch: pull/7409/head
Author: Sam Kleinman (committed via GitHub)
Commit: 892f5d9524

17 changed files with 110 additions and 186 deletions
 1. internal/blocksync/reactor.go            +0   -4
 2. internal/blocksync/reactor_test.go       +1   -3
 3. internal/consensus/reactor.go            +0   -4
 4. internal/evidence/reactor.go             +0   -7
 5. internal/evidence/reactor_test.go        +2   -2
 6. internal/mempool/reactor.go              +9   -38
 7. internal/mempool/reactor_test.go         +11  -5
 8. internal/p2p/p2ptest/network.go          +7   -10
 9. internal/p2p/p2ptest/require.go          +0   -6
10. internal/p2p/peermanager.go              +20  -40
11. internal/p2p/peermanager_scoring_test.go +2   -4
12. internal/p2p/peermanager_test.go         +54  -37
13. internal/p2p/pex/reactor.go              +0   -1
14. internal/p2p/pex/reactor_test.go         +2   -12
15. internal/p2p/router.go                   +2   -2
16. internal/p2p/router_test.go              +0   -7
17. internal/statesync/reactor.go            +0   -4
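
The common thread across the diffs below: PeerUpdates loses its Close()/Done() machinery, a subscription lives exactly as long as the context passed to PeerManager.Subscribe/Register, and Ready/Disconnected now take a context so broadcasts stop once it is cancelled. The standalone sketch below models that lifecycle with toy types (manager and peerUpdates here are illustrative stand-ins, not the tendermint API):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// peerUpdate is a stand-in for p2p.PeerUpdate.
type peerUpdate struct{ NodeID, Status string }

// peerUpdates models the post-change subscription: just a channel,
// with no doneCh and no Close method.
type peerUpdates struct{ ch chan peerUpdate }

// manager is a toy stand-in for the parts of p2p.PeerManager touched
// by this commit.
type manager struct {
	mtx  sync.Mutex
	subs map[*peerUpdates]struct{}
}

// Subscribe ties the subscription's lifetime to ctx: when ctx ends the
// subscription is dropped, mirroring the new Register cleanup goroutine.
func (m *manager) Subscribe(ctx context.Context) *peerUpdates {
	pu := &peerUpdates{ch: make(chan peerUpdate, 1)}
	m.mtx.Lock()
	m.subs[pu] = struct{}{}
	m.mtx.Unlock()

	go func() {
		<-ctx.Done()
		m.mtx.Lock()
		defer m.mtx.Unlock()
		delete(m.subs, pu)
	}()
	return pu
}

// broadcast delivers an update to every live subscription and gives up
// early if ctx is cancelled, like the context-aware broadcast.
func (m *manager) broadcast(ctx context.Context, u peerUpdate) {
	m.mtx.Lock()
	defer m.mtx.Unlock()
	for pu := range m.subs {
		select {
		case <-ctx.Done():
			return
		case pu.ch <- u:
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	m := &manager{subs: map[*peerUpdates]struct{}{}}

	sub := m.Subscribe(ctx)
	m.broadcast(ctx, peerUpdate{NodeID: "a", Status: "up"})
	fmt.Println(<-sub.ch)

	cancel()                          // shutdown: no sub.Close(), no <-sub.Done()
	time.Sleep(50 * time.Millisecond) // let the cleanup goroutine run

	m.mtx.Lock()
	fmt.Println("live subscriptions:", len(m.subs))
	m.mtx.Unlock()
}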

internal/blocksync/reactor.go (+0, -4)

@ -181,8 +181,6 @@ func (r *Reactor) OnStop() {
// wait for the poolRoutine and requestRoutine goroutines to gracefully exit
r.poolWG.Wait()
<-r.peerUpdates.Done()
}
// respondToPeer loads a block and sends it to the requesting peer, if we have it.
@ -334,8 +332,6 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context) {
defer r.peerUpdates.Close()
for {
select {
case <-ctx.Done():
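
The same two removals (the defer r.peerUpdates.Close() in processPeerUpdates and the <-r.peerUpdates.Done() wait in OnStop) repeat in the consensus, evidence, mempool, pex and statesync reactors further down. What remains is a peer-update loop driven only by the context; roughly, with toy types rather than the real Reactor:

package main

import (
	"context"
	"fmt"
	"time"
)

type peerUpdate struct{ NodeID, Status string }

// processPeerUpdates is the post-commit loop shape shared by the touched
// reactors: no deferred peerUpdates.Close(), no extra closeCh case; the
// context alone controls shutdown.
func processPeerUpdates(ctx context.Context, updates <-chan peerUpdate) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("stopped listening on peer updates channel; closing...")
			return
		case u := <-updates:
			fmt.Println("peer update:", u.NodeID, u.Status)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	updates := make(chan peerUpdate, 1)
	updates <- peerUpdate{NodeID: "a", Status: "up"}

	go processPeerUpdates(ctx, updates)
	time.Sleep(50 * time.Millisecond)
	cancel() // OnStop no longer waits on <-peerUpdates.Done()
	time.Sleep(50 * time.Millisecond)
}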


internal/blocksync/reactor_test.go (+1, -3)

@ -82,8 +82,6 @@ func setup(
t.Cleanup(func() {
cancel()
for _, nodeID := range rts.nodes {
rts.peerUpdates[nodeID].Close()
if rts.reactors[nodeID].IsRunning() {
rts.reactors[nodeID].Wait()
rts.app[nodeID].Wait()
@ -228,7 +226,7 @@ func TestReactor_AbruptDisconnect(t *testing.T) {
Status: p2p.PeerStatusDown,
NodeID: rts.nodes[0],
}
rts.network.Nodes[rts.nodes[1]].PeerManager.Disconnected(rts.nodes[0])
rts.network.Nodes[rts.nodes[1]].PeerManager.Disconnected(ctx, rts.nodes[0])
}
func TestReactor_SyncTime(t *testing.T) {


internal/consensus/reactor.go (+0, -4)

@ -220,8 +220,6 @@ func (r *Reactor) OnStop() {
state.broadcastWG.Wait()
}
r.mtx.Unlock()
<-r.peerUpdates.Done()
}
// SetEventBus sets the reactor's event bus.
@ -1406,8 +1404,6 @@ func (r *Reactor) processVoteSetBitsCh(ctx context.Context) {
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context) {
defer r.peerUpdates.Close()
for {
select {
case <-ctx.Done():


internal/evidence/reactor.go (+0, -7)

@ -102,11 +102,6 @@ func (r *Reactor) OnStop() {
// exit.
r.peerWG.Wait()
// Wait for all p2p Channels to be closed before returning. This ensures we
// can easily reason about synchronization of all p2p Channels and ensure no
// panics will occur.
<-r.peerUpdates.Done()
// Close the evidence db
r.evpool.Close()
}
@ -251,8 +246,6 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context) {
defer r.peerUpdates.Close()
for {
select {
case peerUpdate := <-r.peerUpdates.Updates():


internal/evidence/reactor_test.go (+2, -2)

@ -257,11 +257,11 @@ func TestReactorMultiDisconnect(t *testing.T) {
// Ensure "disconnecting" the secondary peer from the primary more than once
// is handled gracefully.
primary.PeerManager.Disconnected(secondary.NodeID)
primary.PeerManager.Disconnected(ctx, secondary.NodeID)
require.Equal(t, primary.PeerManager.Status(secondary.NodeID), p2p.PeerStatusDown)
_, err := primary.PeerManager.TryEvictNext()
require.NoError(t, err)
primary.PeerManager.Disconnected(secondary.NodeID)
primary.PeerManager.Disconnected(ctx, secondary.NodeID)
require.Equal(t, primary.PeerManager.Status(secondary.NodeID), p2p.PeerStatusDown)
require.Equal(t, secondary.PeerManager.Status(primary.NodeID), p2p.PeerStatusUp)


internal/mempool/reactor.go (+9, -38)

@ -48,7 +48,6 @@ type Reactor struct {
mempoolCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
closeCh chan struct{}
// peerWG is used to coordinate graceful termination of all peer broadcasting
// goroutines.
@ -80,7 +79,6 @@ func NewReactor(
ids: NewMempoolIDs(),
mempoolCh: mempoolCh,
peerUpdates: peerUpdates,
closeCh: make(chan struct{}),
peerRoutines: make(map[types.NodeID]*tmsync.Closer),
observePanic: defaultObservePanic,
}
@ -136,19 +134,13 @@ func (r *Reactor) OnStop() {
// wait for all spawned peer tx broadcasting goroutines to gracefully exit
r.peerWG.Wait()
// Close closeCh to signal to all spawned goroutines to gracefully exit. All
// p2p Channels should execute Close().
close(r.closeCh)
<-r.peerUpdates.Done()
}
// handleMempoolMessage handles envelopes sent from peers on the MempoolChannel.
// For every tx in the message, we execute CheckTx. It returns an error if an
// empty set of txs are sent in an envelope or if we receive an unexpected
// message type.
func (r *Reactor) handleMempoolMessage(envelope p2p.Envelope) error {
func (r *Reactor) handleMempoolMessage(ctx context.Context, envelope p2p.Envelope) error {
logger := r.logger.With("peer", envelope.From)
switch msg := envelope.Message.(type) {
@ -164,7 +156,7 @@ func (r *Reactor) handleMempoolMessage(envelope p2p.Envelope) error {
}
for _, tx := range protoTxs {
if err := r.mempool.CheckTx(context.Background(), types.Tx(tx), nil, txInfo); err != nil {
if err := r.mempool.CheckTx(ctx, types.Tx(tx), nil, txInfo); err != nil {
logger.Error("checktx failed for tx", "tx", fmt.Sprintf("%X", types.Tx(tx).Hash()), "err", err)
}
}
@ -179,7 +171,7 @@ func (r *Reactor) handleMempoolMessage(envelope p2p.Envelope) error {
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) {
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope p2p.Envelope) (err error) {
defer func() {
if e := recover(); e != nil {
r.observePanic(e)
@ -196,7 +188,7 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
switch chID {
case MempoolChannel:
err = r.handleMempoolMessage(envelope)
err = r.handleMempoolMessage(ctx, envelope)
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%T)", chID, envelope.Message)
@ -211,7 +203,7 @@ func (r *Reactor) processMempoolCh(ctx context.Context) {
for {
select {
case envelope := <-r.mempoolCh.In:
if err := r.handleMessage(r.mempoolCh.ID, envelope); err != nil {
if err := r.handleMessage(ctx, r.mempoolCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.mempoolCh.ID, "envelope", envelope, "err", err)
r.mempoolCh.Error <- p2p.PeerError{
NodeID: envelope.From,
@ -219,8 +211,6 @@ func (r *Reactor) processMempoolCh(ctx context.Context) {
}
}
case <-ctx.Done():
return
case <-r.closeCh:
r.logger.Debug("stopped listening on mempool channel; closing...")
return
}
@ -242,8 +232,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
case p2p.PeerStatusUp:
// Do not allow starting new tx broadcast loops after reactor shutdown
// has been initiated. This can happen after we've manually closed all
// peer broadcast loops and closed r.closeCh, but the router still sends
// in-flight peer updates.
// peer broadcast, but the router still sends in-flight peer updates.
if !r.IsRunning() {
return
}
@ -285,18 +274,13 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context) {
defer r.peerUpdates.Close()
for {
select {
case <-ctx.Done():
r.logger.Debug("stopped listening on peer updates channel; closing...")
return
case peerUpdate := <-r.peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate)
case <-r.closeCh:
r.logger.Debug("stopped listening on peer updates channel; closing...")
return
}
}
}
@ -333,6 +317,8 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, c
// start from the beginning.
if nextGossipTx == nil {
select {
case <-ctx.Done():
return
case <-r.mempool.WaitForNextTx(): // wait until a tx is available
if nextGossipTx = r.mempool.NextGossipTx(); nextGossipTx == nil {
continue
@ -342,14 +328,6 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, c
// The peer is marked for removal via a PeerUpdate as the doneCh was
// explicitly closed to signal we should exit.
return
case <-ctx.Done():
return
case <-r.closeCh:
// The reactor has signaled that we are stopped and thus we should
// implicitly exit this peer's goroutine.
return
}
}
@ -388,19 +366,12 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, c
select {
case <-nextGossipTx.NextWaitChan():
nextGossipTx = nextGossipTx.Next()
case <-closer.Done():
// The peer is marked for removal via a PeerUpdate as the doneCh was
// explicitly closed to signal we should exit.
return
case <-ctx.Done():
return
case <-r.closeCh:
// The reactor has signaled that we are stopped and thus we should
// implicitly exit this peer's goroutine.
return
}
}
}
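
Two things change in the mempool reactor: handleMessage/handleMempoolMessage now take a context and pass it to CheckTx instead of using context.Background(), and broadcastTxRoutine selects only on the closer and the context (the closeCh cases are gone). A standalone sketch of the context threading; checkTx and handleTxs are hypothetical stand-ins, not the mempool API:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type tx []byte

// checkTx stands in for mempool.CheckTx: it now receives the caller's
// context rather than context.Background(), so an in-flight check is
// abandoned when the reactor shuts down.
func checkTx(ctx context.Context, t tx) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(10 * time.Millisecond): // pretend to validate
		return nil
	}
}

// handleTxs mirrors handleMempoolMessage after the change: the context is
// threaded down from the message-processing loop.
func handleTxs(ctx context.Context, txs []tx) error {
	for _, t := range txs {
		if err := checkTx(ctx, t); err != nil {
			return fmt.Errorf("checktx failed: %w", err)
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Millisecond)
	defer cancel()

	err := handleTxs(ctx, []tx{tx("aa"), tx("bb"), tx("cc")})
	if errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("shutdown interrupted CheckTx:", err)
	}
}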

internal/mempool/reactor_test.go (+11, -5)

@ -64,7 +64,7 @@ func setupReactors(ctx context.Context, t *testing.T, numNodes int, chBuf uint)
mempool := setup(ctx, t, 0)
rts.mempools[nodeID] = mempool
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate, chBuf)
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
@ -102,6 +102,7 @@ func setupReactors(ctx context.Context, t *testing.T, numNodes int, chBuf uint)
func (rts *reactorTestSuite) start(ctx context.Context, t *testing.T) {
t.Helper()
rts.network.Start(ctx, t)
require.Len(t,
rts.network.RandomNode().PeerManager.Peers(),
len(rts.nodes)-1,
@ -126,13 +127,17 @@ func (rts *reactorTestSuite) waitForTxns(t *testing.T, txs []types.Tx, ids ...ty
if !p2ptest.NodeInSlice(name, ids) {
continue
}
if len(txs) == pool.Size() {
continue
}
wg.Add(1)
go func(pool *TxMempool) {
defer wg.Done()
require.Eventually(t, func() bool { return len(txs) == pool.Size() },
time.Minute,
100*time.Millisecond,
250*time.Millisecond,
"ntx=%d, size=%d", len(txs), pool.Size(),
)
}(pool)
}
@ -191,14 +196,15 @@ func TestReactorBroadcastTxs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setupReactors(ctx, t, numNodes, 0)
rts := setupReactors(ctx, t, numNodes, uint(numTxs))
primary := rts.nodes[0]
secondaries := rts.nodes[1:]
txs := checkTxs(ctx, t, rts.reactors[primary].mempool, numTxs, UnknownPeerID)
// run the router
require.Equal(t, numTxs, rts.reactors[primary].mempool.Size())
rts.start(ctx, t)
// Wait till all secondary suites (reactor) received all mempool txs from the
@ -407,7 +413,7 @@ func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setupReactors(ctx, t, 2, 0)
rts := setupReactors(ctx, t, 2, 2)
primary := rts.nodes[0]
secondary := rts.nodes[1]


internal/p2p/p2ptest/network.go (+7, -10)

@ -76,10 +76,11 @@ func (n *Network) Start(ctx context.Context, t *testing.T) {
// for each node.
dialQueue := []p2p.NodeAddress{}
subs := map[types.NodeID]*p2p.PeerUpdates{}
subctx, subcancel := context.WithCancel(ctx)
defer subcancel()
for _, node := range n.Nodes {
dialQueue = append(dialQueue, node.NodeAddress)
subs[node.NodeID] = node.PeerManager.Subscribe(ctx)
defer subs[node.NodeID].Close()
subs[node.NodeID] = node.PeerManager.Subscribe(subctx)
}
// For each node, dial the nodes that it still doesn't have a connection to
@ -197,9 +198,10 @@ func (n *Network) Remove(ctx context.Context, t *testing.T, id types.NodeID) {
delete(n.Nodes, id)
subs := []*p2p.PeerUpdates{}
subctx, subcancel := context.WithCancel(ctx)
defer subcancel()
for _, peer := range n.Nodes {
sub := peer.PeerManager.Subscribe(ctx)
defer sub.Close()
sub := peer.PeerManager.Subscribe(subctx)
subs = append(subs, sub)
}
@ -329,7 +331,6 @@ func (n *Node) MakePeerUpdates(ctx context.Context, t *testing.T) *p2p.PeerUpdat
sub := n.PeerManager.Subscribe(ctx)
t.Cleanup(func() {
RequireNoUpdates(ctx, t, sub)
sub.Close()
})
return sub
@ -339,11 +340,7 @@ func (n *Node) MakePeerUpdates(ctx context.Context, t *testing.T) *p2p.PeerUpdat
// It does *not* check that all updates have been consumed, but will
// close the update channel.
func (n *Node) MakePeerUpdatesNoRequireEmpty(ctx context.Context, t *testing.T) *p2p.PeerUpdates {
sub := n.PeerManager.Subscribe(ctx)
t.Cleanup(sub.Close)
return sub
return n.PeerManager.Subscribe(ctx)
}
func MakeChannelDesc(chID p2p.ChannelID) *p2p.ChannelDescriptor {
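
Where these helpers previously deferred sub.Close(), they now derive a child context (subctx, subcancel) for the subscriptions and cancel that instead; the same idiom shows up again in peermanager_test.go below. A minimal standard-library sketch of the pattern; subscribe here is a stand-in for PeerManager.Subscribe:

package main

import (
	"context"
	"fmt"
)

// subscribe stands in for PeerManager.Subscribe: the returned channel is
// serviced only for as long as ctx lives, then closed.
func subscribe(ctx context.Context) <-chan string {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return
			case ch <- fmt.Sprintf("update-%d", i):
			}
		}
	}()
	return ch
}

func main() {
	ctx := context.Background()

	// Scope the subscription to a child context; this replaces the old
	// "sub := Subscribe(ctx); defer sub.Close()" in the test helpers.
	subctx, subcancel := context.WithCancel(ctx)
	defer subcancel()

	sub := subscribe(subctx)
	fmt.Println(<-sub)

	subcancel() // ends only this subscription, not the parent ctx
	for range sub {
		// drain until the producer observes cancellation and closes ch
	}
	fmt.Println("subscription finished")
}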


internal/p2p/p2ptest/require.go (+0, -6)

@ -119,9 +119,6 @@ func RequireUpdate(t *testing.T, peerUpdates *p2p.PeerUpdates, expect p2p.PeerUp
case update := <-peerUpdates.Updates():
require.Equal(t, expect, update, "peer update did not match")
case <-peerUpdates.Done():
require.Fail(t, "peer updates subscription is closed")
case <-timer.C:
require.Fail(t, "timed out waiting for peer update", "expected %v", expect)
}
@ -143,9 +140,6 @@ func RequireUpdates(t *testing.T, peerUpdates *p2p.PeerUpdates, expect []p2p.Pee
return
}
case <-peerUpdates.Done():
require.Fail(t, "peer updates subscription is closed")
case <-timer.C:
require.Equal(t, expect, actual, "did not receive expected peer updates")
return


internal/p2p/peermanager.go (+20, -40)

@ -56,8 +56,6 @@ type PeerUpdate struct {
type PeerUpdates struct {
routerUpdatesCh chan PeerUpdate
reactorUpdatesCh chan PeerUpdate
closeOnce sync.Once
doneCh chan struct{}
}
// NewPeerUpdates creates a new PeerUpdates subscription. It is primarily for
@ -67,7 +65,6 @@ func NewPeerUpdates(updatesCh chan PeerUpdate, buf int) *PeerUpdates {
return &PeerUpdates{
reactorUpdatesCh: updatesCh,
routerUpdatesCh: make(chan PeerUpdate, buf),
doneCh: make(chan struct{}),
}
}
@ -76,21 +73,6 @@ func (pu *PeerUpdates) Updates() <-chan PeerUpdate {
return pu.reactorUpdatesCh
}
// Done returns a channel that is closed when the subscription is closed.
func (pu *PeerUpdates) Done() <-chan struct{} {
return pu.doneCh
}
// Close closes the peer updates subscription.
func (pu *PeerUpdates) Close() {
pu.closeOnce.Do(func() {
// NOTE: We don't close updatesCh since multiple goroutines may be
// sending on it. The PeerManager senders will select on doneCh as well
// to avoid blocking on a closed subscription.
close(pu.doneCh)
})
}
// SendUpdate pushes information about a peer into the routing layer,
// presumably from a peer.
func (pu *PeerUpdates) SendUpdate(ctx context.Context, update PeerUpdate) {
@ -692,13 +674,13 @@ func (m *PeerManager) Accepted(peerID types.NodeID) error {
// peer must already be marked as connected. This is separate from Dialed() and
// Accepted() to allow the router to set up its internal queues before reactors
// start sending messages.
func (m *PeerManager) Ready(peerID types.NodeID) {
func (m *PeerManager) Ready(ctx context.Context, peerID types.NodeID) {
m.mtx.Lock()
defer m.mtx.Unlock()
if m.connected[peerID] {
m.ready[peerID] = true
m.broadcast(PeerUpdate{
m.broadcast(ctx, PeerUpdate{
NodeID: peerID,
Status: PeerStatusUp,
})
@ -759,7 +741,7 @@ func (m *PeerManager) TryEvictNext() (types.NodeID, error) {
// Disconnected unmarks a peer as connected, allowing it to be dialed or
// accepted again as appropriate.
func (m *PeerManager) Disconnected(peerID types.NodeID) {
func (m *PeerManager) Disconnected(ctx context.Context, peerID types.NodeID) {
m.mtx.Lock()
defer m.mtx.Unlock()
@ -772,7 +754,7 @@ func (m *PeerManager) Disconnected(peerID types.NodeID) {
delete(m.ready, peerID)
if ready {
m.broadcast(PeerUpdate{
m.broadcast(ctx, PeerUpdate{
NodeID: peerID,
Status: PeerStatusDown,
})
@ -854,8 +836,8 @@ func (m *PeerManager) Subscribe(ctx context.Context) *PeerUpdates {
// otherwise the PeerManager will halt.
func (m *PeerManager) Register(ctx context.Context, peerUpdates *PeerUpdates) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.subscriptions[peerUpdates] = peerUpdates
m.mtx.Unlock()
go func() {
for {
@ -863,26 +845,27 @@ func (m *PeerManager) Register(ctx context.Context, peerUpdates *PeerUpdates) {
case <-ctx.Done():
return
case pu := <-peerUpdates.routerUpdatesCh:
m.processPeerEvent(pu)
m.processPeerEvent(ctx, pu)
}
}
}()
go func() {
select {
case <-peerUpdates.Done():
m.mtx.Lock()
delete(m.subscriptions, peerUpdates)
m.mtx.Unlock()
case <-ctx.Done():
}
<-ctx.Done()
m.mtx.Lock()
defer m.mtx.Unlock()
delete(m.subscriptions, peerUpdates)
}()
}
func (m *PeerManager) processPeerEvent(pu PeerUpdate) {
func (m *PeerManager) processPeerEvent(ctx context.Context, pu PeerUpdate) {
m.mtx.Lock()
defer m.mtx.Unlock()
if ctx.Err() != nil {
return
}
if _, ok := m.store.peers[pu.NodeID]; !ok {
m.store.peers[pu.NodeID] = &peerInfo{}
}
@ -902,18 +885,15 @@ func (m *PeerManager) processPeerEvent(pu PeerUpdate) {
//
// FIXME: Consider using an internal channel to buffer updates while also
// maintaining order if this is a problem.
func (m *PeerManager) broadcast(peerUpdate PeerUpdate) {
func (m *PeerManager) broadcast(ctx context.Context, peerUpdate PeerUpdate) {
for _, sub := range m.subscriptions {
// We have to check doneChan separately first, otherwise there's a 50%
// chance the second select will send on a closed subscription.
select {
case <-sub.doneCh:
continue
default:
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case sub.reactorUpdatesCh <- peerUpdate:
case <-sub.doneCh:
}
}
}
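
The comment deleted from broadcast ("We have to check doneChan separately first, otherwise there's a 50% chance the second select will send on a closed subscription") is about Go's select choosing uniformly at random among ready cases. Since Register now drops a subscription from the map when its context ends, a single select against ctx.Done() is enough. A quick standalone demonstration of that select behaviour:

package main

import "fmt"

func main() {
	done := make(chan struct{})
	close(done) // simulate a closed subscription doneCh

	updates := make(chan int, 1000) // always ready to accept a send

	sent := 0
	for i := 0; i < 1000; i++ {
		select {
		case updates <- i: // both cases ready: chosen ~50% of the time
			sent++
		case <-done:
		}
	}
	// Without a separate doneCh pre-check, roughly half the sends land
	// after the subscription is "closed".
	fmt.Printf("sent %d/1000 updates after close\n", sent)
}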


internal/p2p/peermanager_scoring_test.go (+2, -4)

@ -38,7 +38,7 @@ func TestPeerScoring(t *testing.T) {
// add a bunch of good status updates and watch things increase.
for i := 1; i < 10; i++ {
peerManager.processPeerEvent(PeerUpdate{
peerManager.processPeerEvent(ctx, PeerUpdate{
NodeID: id,
Status: PeerStatusGood,
})
@ -47,7 +47,7 @@ func TestPeerScoring(t *testing.T) {
// watch the corresponding decreases respond to update
for i := 10; i == 0; i-- {
peerManager.processPeerEvent(PeerUpdate{
peerManager.processPeerEvent(ctx, PeerUpdate{
NodeID: id,
Status: PeerStatusBad,
})
@ -57,7 +57,6 @@ func TestPeerScoring(t *testing.T) {
t.Run("AsynchronousIncrement", func(t *testing.T) {
start := peerManager.Scores()[id]
pu := peerManager.Subscribe(ctx)
defer pu.Close()
pu.SendUpdate(ctx, PeerUpdate{
NodeID: id,
Status: PeerStatusGood,
@ -71,7 +70,6 @@ func TestPeerScoring(t *testing.T) {
t.Run("AsynchronousDecrement", func(t *testing.T) {
start := peerManager.Scores()[id]
pu := peerManager.Subscribe(ctx)
defer pu.Close()
pu.SendUpdate(ctx, PeerUpdate{
NodeID: id,
Status: PeerStatusBad,


internal/p2p/peermanager_test.go (+54, -37)

@ -461,9 +461,11 @@ func TestPeerManager_DialNext_WakeOnDisconnected(t *testing.T) {
require.NoError(t, err)
require.Zero(t, dial)
dctx, dcancel := context.WithTimeout(ctx, 300*time.Millisecond)
defer dcancel()
go func() {
time.Sleep(200 * time.Millisecond)
peerManager.Disconnected(a.NodeID)
peerManager.Disconnected(dctx, a.NodeID)
}()
ctx, cancel = context.WithTimeout(ctx, 3*time.Second)
@ -510,6 +512,9 @@ func TestPeerManager_TryDialNext_MaxConnected(t *testing.T) {
}
func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))}
@ -575,7 +580,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
// Now, if we disconnect a, we should be allowed to dial d because we have a
// free upgrade slot.
peerManager.Disconnected(a.NodeID)
peerManager.Disconnected(ctx, a.NodeID)
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, d, dial)
@ -584,7 +589,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
// However, if we disconnect b (such that only c and d are connected), we
// should not be allowed to dial e even though there are upgrade slots,
// because there are no lower-scored nodes that can be upgraded.
peerManager.Disconnected(b.NodeID)
peerManager.Disconnected(ctx, b.NodeID)
added, err = peerManager.Add(e)
require.NoError(t, err)
require.True(t, added)
@ -966,6 +971,9 @@ func TestPeerManager_Dialed_Upgrade(t *testing.T) {
}
func TestPeerManager_Dialed_UpgradeEvenLower(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))}
@ -1005,7 +1013,7 @@ func TestPeerManager_Dialed_UpgradeEvenLower(t *testing.T) {
// In the meanwhile, a disconnects and d connects. d is even lower-scored
// than b (1 vs 2), which is currently being upgraded.
peerManager.Disconnected(a.NodeID)
peerManager.Disconnected(ctx, a.NodeID)
added, err = peerManager.Add(d)
require.NoError(t, err)
require.True(t, added)
@ -1020,6 +1028,9 @@ func TestPeerManager_Dialed_UpgradeEvenLower(t *testing.T) {
}
func TestPeerManager_Dialed_UpgradeNoEvict(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))}
@ -1055,7 +1066,7 @@ func TestPeerManager_Dialed_UpgradeNoEvict(t *testing.T) {
require.Equal(t, c, dial)
// In the meanwhile, b disconnects.
peerManager.Disconnected(b.NodeID)
peerManager.Disconnected(ctx, b.NodeID)
// Once c completes the upgrade of b, there is no longer a need to
// evict anything since we're at capacity.
@ -1188,6 +1199,9 @@ func TestPeerManager_Accepted_MaxConnectedUpgrade(t *testing.T) {
}
func TestPeerManager_Accepted_Upgrade(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))}
@ -1224,7 +1238,7 @@ func TestPeerManager_Accepted_Upgrade(t *testing.T) {
evict, err := peerManager.TryEvictNext()
require.NoError(t, err)
require.Equal(t, a.NodeID, evict)
peerManager.Disconnected(a.NodeID)
peerManager.Disconnected(ctx, a.NodeID)
// c still cannot get accepted, since it's not scored above b.
require.Error(t, peerManager.Accepted(c.NodeID))
@ -1288,7 +1302,6 @@ func TestPeerManager_Ready(t *testing.T) {
require.NoError(t, err)
sub := peerManager.Subscribe(ctx)
defer sub.Close()
// Connecting to a should still have it as status down.
added, err := peerManager.Add(a)
@ -1298,7 +1311,7 @@ func TestPeerManager_Ready(t *testing.T) {
require.Equal(t, p2p.PeerStatusDown, peerManager.Status(a.NodeID))
// Marking a as ready should transition it to PeerStatusUp and send an update.
peerManager.Ready(a.NodeID)
peerManager.Ready(ctx, a.NodeID)
require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID))
require.Equal(t, p2p.PeerUpdate{
NodeID: a.NodeID,
@ -1310,7 +1323,7 @@ func TestPeerManager_Ready(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID))
peerManager.Ready(b.NodeID)
peerManager.Ready(ctx, b.NodeID)
require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID))
require.Empty(t, sub.Updates())
}
@ -1329,7 +1342,7 @@ func TestPeerManager_EvictNext(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(a.NodeID)
peerManager.Ready(ctx, a.NodeID)
// Since there are no peers to evict, EvictNext should block until timeout.
timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
@ -1365,7 +1378,7 @@ func TestPeerManager_EvictNext_WakeOnError(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(a.NodeID)
peerManager.Ready(ctx, a.NodeID)
// Spawn a goroutine to error a peer after a delay.
go func() {
@ -1400,7 +1413,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeDialed(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(a.NodeID)
peerManager.Ready(ctx, a.NodeID)
// Spawn a goroutine to upgrade to b with a delay.
go func() {
@ -1441,7 +1454,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeAccepted(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(a.NodeID)
peerManager.Ready(ctx, a.NodeID)
// Spawn a goroutine to upgrade b with a delay.
go func() {
@ -1457,6 +1470,9 @@ func TestPeerManager_EvictNext_WakeOnUpgradeAccepted(t *testing.T) {
require.Equal(t, a.NodeID, evict)
}
func TestPeerManager_TryEvictNext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
@ -1473,7 +1489,7 @@ func TestPeerManager_TryEvictNext(t *testing.T) {
// Connecting to a won't evict anything either.
require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(a.NodeID)
peerManager.Ready(ctx, a.NodeID)
// But if a errors it should be evicted.
peerManager.Errored(a.NodeID, errors.New("foo"))
@ -1502,10 +1518,9 @@ func TestPeerManager_Disconnected(t *testing.T) {
defer cancel()
sub := peerManager.Subscribe(ctx)
defer sub.Close()
// Disconnecting an unknown peer does nothing.
peerManager.Disconnected(a.NodeID)
peerManager.Disconnected(ctx, a.NodeID)
require.Empty(t, peerManager.Peers())
require.Empty(t, sub.Updates())
@ -1514,14 +1529,14 @@ func TestPeerManager_Disconnected(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Disconnected(a.NodeID)
peerManager.Disconnected(ctx, a.NodeID)
require.Empty(t, sub.Updates())
// Disconnecting a ready peer sends a status update.
_, err = peerManager.Add(a)
require.NoError(t, err)
require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(a.NodeID)
peerManager.Ready(ctx, a.NodeID)
require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID))
require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{
@ -1529,7 +1544,7 @@ func TestPeerManager_Disconnected(t *testing.T) {
Status: p2p.PeerStatusUp,
}, <-sub.Updates())
peerManager.Disconnected(a.NodeID)
peerManager.Disconnected(ctx, a.NodeID)
require.Equal(t, p2p.PeerStatusDown, peerManager.Status(a.NodeID))
require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{
@ -1543,13 +1558,16 @@ func TestPeerManager_Disconnected(t *testing.T) {
require.NoError(t, err)
require.Equal(t, a, dial)
peerManager.Disconnected(a.NodeID)
peerManager.Disconnected(ctx, a.NodeID)
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
}
func TestPeerManager_Errored(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
@ -1573,7 +1591,7 @@ func TestPeerManager_Errored(t *testing.T) {
require.Zero(t, evict)
require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(a.NodeID)
peerManager.Ready(ctx, a.NodeID)
evict, err = peerManager.TryEvictNext()
require.NoError(t, err)
require.Zero(t, evict)
@ -1596,7 +1614,6 @@ func TestPeerManager_Subscribe(t *testing.T) {
// This tests all subscription events for full peer lifecycles.
sub := peerManager.Subscribe(ctx)
defer sub.Close()
added, err := peerManager.Add(a)
require.NoError(t, err)
@ -1607,11 +1624,11 @@ func TestPeerManager_Subscribe(t *testing.T) {
require.NoError(t, peerManager.Accepted(a.NodeID))
require.Empty(t, sub.Updates())
peerManager.Ready(a.NodeID)
peerManager.Ready(ctx, a.NodeID)
require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
peerManager.Disconnected(a.NodeID)
peerManager.Disconnected(ctx, a.NodeID)
require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates())
@ -1624,7 +1641,7 @@ func TestPeerManager_Subscribe(t *testing.T) {
require.NoError(t, peerManager.Dialed(a))
require.Empty(t, sub.Updates())
peerManager.Ready(a.NodeID)
peerManager.Ready(ctx, a.NodeID)
require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
@ -1635,7 +1652,7 @@ func TestPeerManager_Subscribe(t *testing.T) {
require.NoError(t, err)
require.Equal(t, a.NodeID, evict)
peerManager.Disconnected(a.NodeID)
peerManager.Disconnected(ctx, a.NodeID)
require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates())
@ -1659,7 +1676,6 @@ func TestPeerManager_Subscribe_Close(t *testing.T) {
require.NoError(t, err)
sub := peerManager.Subscribe(ctx)
defer sub.Close()
added, err := peerManager.Add(a)
require.NoError(t, err)
@ -1667,13 +1683,13 @@ func TestPeerManager_Subscribe_Close(t *testing.T) {
require.NoError(t, peerManager.Accepted(a.NodeID))
require.Empty(t, sub.Updates())
peerManager.Ready(a.NodeID)
peerManager.Ready(ctx, a.NodeID)
require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
// Closing the subscription should not send us the disconnected update.
sub.Close()
peerManager.Disconnected(a.NodeID)
cancel()
peerManager.Disconnected(ctx, a.NodeID)
require.Empty(t, sub.Updates())
}
@ -1688,19 +1704,19 @@ func TestPeerManager_Subscribe_Broadcast(t *testing.T) {
peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err)
s2ctx, s2cancel := context.WithCancel(ctx)
defer s2cancel()
s1 := peerManager.Subscribe(ctx)
defer s1.Close()
s2 := peerManager.Subscribe(ctx)
defer s2.Close()
s2 := peerManager.Subscribe(s2ctx)
s3 := peerManager.Subscribe(ctx)
defer s3.Close()
// Connecting to a peer should send updates on all subscriptions.
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(a.NodeID)
peerManager.Ready(ctx, a.NodeID)
expectUp := p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}
require.NotEmpty(t, s1)
@ -1712,8 +1728,9 @@ func TestPeerManager_Subscribe_Broadcast(t *testing.T) {
// We now close s2. Disconnecting the peer should only send updates
// on s1 and s3.
s2.Close()
peerManager.Disconnected(a.NodeID)
s2cancel()
time.Sleep(250 * time.Millisecond) // give the thread a chance to exit
peerManager.Disconnected(ctx, a.NodeID)
expectDown := p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}
require.NotEmpty(t, s1)


internal/p2p/pex/reactor.go (+0, -1)

@ -185,7 +185,6 @@ func (r *Reactor) processPexCh(ctx context.Context) {
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context) {
defer r.peerUpdates.Close()
for {
select {
case <-ctx.Done():


internal/p2p/pex/reactor_test.go (+2, -12)

@ -296,10 +296,7 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor {
reactor := pex.NewReactor(log.TestingLogger(), peerManager, pexCh, peerUpdates)
require.NoError(t, reactor.Start(ctx))
t.Cleanup(func() {
peerUpdates.Close()
reactor.Wait()
})
t.Cleanup(reactor.Wait)
return &singleTestReactor{
reactor: reactor,
@ -396,15 +393,11 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT
require.Len(t, rts.reactors, realNodes)
t.Cleanup(func() {
for nodeID, reactor := range rts.reactors {
for _, reactor := range rts.reactors {
if reactor.IsRunning() {
reactor.Wait()
require.False(t, reactor.IsRunning())
}
rts.peerUpdates[nodeID].Close()
}
for _, nodeID := range rts.mocks {
rts.peerUpdates[nodeID].Close()
}
})
@ -542,7 +535,6 @@ func (r *reactorTestSuite) listenForPeerUpdate(
) {
on, with := r.checkNodePair(t, onNode, withNode)
sub := r.network.Nodes[on].PeerManager.Subscribe(ctx)
defer sub.Close()
timesUp := time.After(waitPeriod)
for {
select {
@ -649,9 +641,7 @@ func (r *reactorTestSuite) connectPeers(ctx context.Context, t *testing.T, sourc
}
sourceSub := n1.PeerManager.Subscribe(ctx)
defer sourceSub.Close()
targetSub := n2.PeerManager.Subscribe(ctx)
defer targetSub.Close()
sourceAddress := n1.NodeAddress
r.logger.Debug("source address", "address", sourceAddress)


internal/p2p/router.go (+2, -2)

@ -758,7 +758,7 @@ func (r *Router) runWithPeerMutex(fn func() error) error {
// they are closed elsewhere it will cause this method to shut down and return.
func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connection, channels channelIDs) {
r.metrics.Peers.Add(1)
r.peerManager.Ready(peerID)
r.peerManager.Ready(ctx, peerID)
sendQueue := r.getOrMakeQueue(peerID, channels)
defer func() {
@ -769,7 +769,7 @@ func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connec
sendQueue.close()
r.peerManager.Disconnected(peerID)
r.peerManager.Disconnected(ctx, peerID)
r.metrics.Peers.Add(-1)
}()


internal/p2p/router_test.go (+0, -7)

@ -409,7 +409,6 @@ func TestRouter_AcceptPeers(t *testing.T) {
require.NoError(t, err)
sub := peerManager.Subscribe(ctx)
defer sub.Close()
router, err := p2p.NewRouter(
ctx,
@ -433,7 +432,6 @@ func TestRouter_AcceptPeers(t *testing.T) {
// force a context switch so that the
// connection is handled.
time.Sleep(time.Millisecond)
sub.Close()
} else {
select {
case <-closer.Done():
@ -659,7 +657,6 @@ func TestRouter_DialPeers(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
sub := peerManager.Subscribe(ctx)
defer sub.Close()
router, err := p2p.NewRouter(
ctx,
@ -683,7 +680,6 @@ func TestRouter_DialPeers(t *testing.T) {
// force a context switch so that the
// connection is handled.
time.Sleep(time.Millisecond)
sub.Close()
} else {
select {
case <-closer.Done():
@ -822,7 +818,6 @@ func TestRouter_EvictPeers(t *testing.T) {
require.NoError(t, err)
sub := peerManager.Subscribe(ctx)
defer sub.Close()
router, err := p2p.NewRouter(
ctx,
@ -850,7 +845,6 @@ func TestRouter_EvictPeers(t *testing.T) {
NodeID: peerInfo.NodeID,
Status: p2p.PeerStatusDown,
})
sub.Close()
require.NoError(t, router.Stop())
mockTransport.AssertExpectations(t)
@ -943,7 +937,6 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) {
require.NoError(t, err)
sub := peerManager.Subscribe(ctx)
defer sub.Close()
router, err := p2p.NewRouter(
ctx,


internal/statesync/reactor.go (+0, -4)

@ -225,8 +225,6 @@ func (r *Reactor) OnStart(ctx context.Context) error {
func (r *Reactor) OnStop() {
// tell the dispatcher to stop sending any more requests
r.dispatcher.Close()
<-r.peerUpdates.Done()
}
// Sync runs a state sync, fetching snapshots and providing chunks to the
@ -865,8 +863,6 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context) {
defer r.peerUpdates.Close()
for {
select {
case <-ctx.Done():

