|
|
@ -487,7 +487,6 @@ func (r *reactorTestSuite) listenFor( |
|
|
|
} |
|
|
|
|
|
|
|
func (r *reactorTestSuite) listenForRequest(ctx context.Context, t *testing.T, fromNode, toNode int, waitPeriod time.Duration) { |
|
|
|
r.logger.Info("Listening for request", "from", fromNode, "to", toNode) |
|
|
|
to, from := r.checkNodePair(t, toNode, fromNode) |
|
|
|
conditional := func(msg *p2p.Envelope) bool { |
|
|
|
_, ok := msg.Message.(*p2pproto.PexRequest) |
|
|
@ -508,7 +507,7 @@ func (r *reactorTestSuite) pingAndlistenForNAddresses( |
|
|
|
addresses int, |
|
|
|
) { |
|
|
|
t.Helper() |
|
|
|
r.logger.Info("Listening for addresses", "from", fromNode, "to", toNode) |
|
|
|
|
|
|
|
to, from := r.checkNodePair(t, toNode, fromNode) |
|
|
|
conditional := func(msg *p2p.Envelope) bool { |
|
|
|
_, ok := msg.Message.(*p2pproto.PexResponse) |
|
|
@ -541,11 +540,9 @@ func (r *reactorTestSuite) listenForResponse( |
|
|
|
waitPeriod time.Duration, |
|
|
|
addresses []p2pproto.PexAddress, |
|
|
|
) { |
|
|
|
r.logger.Info("Listening for response", "from", fromNode, "to", toNode) |
|
|
|
to, from := r.checkNodePair(t, toNode, fromNode) |
|
|
|
conditional := func(msg *p2p.Envelope) bool { |
|
|
|
_, ok := msg.Message.(*p2pproto.PexResponse) |
|
|
|
r.logger.Info("message", msg, "ok", ok) |
|
|
|
return ok && msg.From == from |
|
|
|
} |
|
|
|
assertion := func(t *testing.T, msg *p2p.Envelope) bool { |
|
|
@ -658,7 +655,6 @@ func (r *reactorTestSuite) connectN(ctx context.Context, t *testing.T, n int) { |
|
|
|
func (r *reactorTestSuite) connectPeers(ctx context.Context, t *testing.T, sourceNode, targetNode int) { |
|
|
|
t.Helper() |
|
|
|
node1, node2 := r.checkNodePair(t, sourceNode, targetNode) |
|
|
|
r.logger.Info("connecting peers", "sourceNode", sourceNode, "targetNode", targetNode) |
|
|
|
|
|
|
|
n1 := r.network.Nodes[node1] |
|
|
|
if n1 == nil { |
|
|
@ -676,16 +672,12 @@ func (r *reactorTestSuite) connectPeers(ctx context.Context, t *testing.T, sourc |
|
|
|
targetSub := n2.PeerManager.Subscribe(ctx) |
|
|
|
|
|
|
|
sourceAddress := n1.NodeAddress |
|
|
|
r.logger.Debug("source address", "address", sourceAddress) |
|
|
|
targetAddress := n2.NodeAddress |
|
|
|
r.logger.Debug("target address", "address", targetAddress) |
|
|
|
|
|
|
|
added, err := n1.PeerManager.Add(targetAddress) |
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
if !added { |
|
|
|
r.logger.Debug("nodes already know about one another", |
|
|
|
"sourceNode", sourceNode, "targetNode", targetNode) |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
@ -695,19 +687,16 @@ func (r *reactorTestSuite) connectPeers(ctx context.Context, t *testing.T, sourc |
|
|
|
NodeID: node1, |
|
|
|
Status: p2p.PeerStatusUp, |
|
|
|
}, peerUpdate) |
|
|
|
r.logger.Debug("target connected with source") |
|
|
|
case <-time.After(2 * time.Second): |
|
|
|
require.Fail(t, "timed out waiting for peer", "%v accepting %v", |
|
|
|
targetNode, sourceNode) |
|
|
|
} |
|
|
|
|
|
|
|
select { |
|
|
|
case peerUpdate := <-sourceSub.Updates(): |
|
|
|
require.Equal(t, p2p.PeerUpdate{ |
|
|
|
NodeID: node2, |
|
|
|
Status: p2p.PeerStatusUp, |
|
|
|
}, peerUpdate) |
|
|
|
r.logger.Debug("source connected with target") |
|
|
|
case <-time.After(2 * time.Second): |
|
|
|
require.Fail(t, "timed out waiting for peer", "%v dialing %v", |
|
|
|
sourceNode, targetNode) |
|
|
|