You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

817 lines
22 KiB

  1. package pex_test
  2. import (
  3. "context"
  4. "strings"
  5. "testing"
  6. "time"
  7. "github.com/stretchr/testify/require"
  8. dbm "github.com/tendermint/tm-db"
  9. "github.com/tendermint/tendermint/crypto/ed25519"
  10. "github.com/tendermint/tendermint/libs/log"
  11. "github.com/tendermint/tendermint/p2p"
  12. "github.com/tendermint/tendermint/p2p/p2ptest"
  13. "github.com/tendermint/tendermint/p2p/pex"
  14. proto "github.com/tendermint/tendermint/proto/tendermint/p2p"
  15. )
  16. const (
  17. checkFrequency = 500 * time.Millisecond
  18. defaultBufferSize = 2
  19. shortWait = 10 * time.Second
  20. longWait = 60 * time.Second
  21. firstNode = 0
  22. secondNode = 1
  23. thirdNode = 2
  24. fourthNode = 3
  25. )
  26. func TestReactorBasic(t *testing.T) {
  27. // start a network with one mock reactor and one "real" reactor
  28. testNet := setupNetwork(t, testOptions{
  29. MockNodes: 1,
  30. TotalNodes: 2,
  31. })
  32. testNet.connectAll(t)
  33. testNet.start(t)
  34. // assert that the mock node receives a request from the real node
  35. testNet.listenForRequest(t, secondNode, firstNode, shortWait)
  36. // assert that when a mock node sends a request it receives a response (and
  37. // the correct one)
  38. testNet.sendRequest(t, firstNode, secondNode, true)
  39. testNet.listenForResponse(t, secondNode, firstNode, shortWait, []proto.PexAddressV2(nil))
  40. }
  41. func TestReactorConnectFullNetwork(t *testing.T) {
  42. testNet := setupNetwork(t, testOptions{
  43. TotalNodes: 8,
  44. })
  45. // make every node be only connected with one other node (it actually ends up
  46. // being two because of two way connections but oh well)
  47. testNet.connectN(t, 1)
  48. testNet.start(t)
  49. // assert that all nodes add each other in the network
  50. for idx := 0; idx < len(testNet.nodes); idx++ {
  51. testNet.requireNumberOfPeers(t, idx, len(testNet.nodes)-1, longWait)
  52. }
  53. }
  54. func TestReactorSendsRequestsTooOften(t *testing.T) {
  55. r := setupSingle(t)
  56. badNode := newNodeID(t, "b")
  57. r.pexInCh <- p2p.Envelope{
  58. From: badNode,
  59. Message: &proto.PexRequestV2{},
  60. }
  61. resp := <-r.pexOutCh
  62. msg, ok := resp.Message.(*proto.PexResponseV2)
  63. require.True(t, ok)
  64. require.Empty(t, msg.Addresses)
  65. r.pexInCh <- p2p.Envelope{
  66. From: badNode,
  67. Message: &proto.PexRequestV2{},
  68. }
  69. peerErr := <-r.pexErrCh
  70. require.Error(t, peerErr.Err)
  71. require.Empty(t, r.pexOutCh)
  72. require.Contains(t, peerErr.Err.Error(), "peer sent a request too close after a prior one")
  73. require.Equal(t, badNode, peerErr.NodeID)
  74. }
  75. func TestReactorSendsResponseWithoutRequest(t *testing.T) {
  76. testNet := setupNetwork(t, testOptions{
  77. MockNodes: 1,
  78. TotalNodes: 3,
  79. })
  80. testNet.connectAll(t)
  81. testNet.start(t)
  82. // firstNode sends the secondNode an unrequested response
  83. // NOTE: secondNode will send a request by default during startup so we send
  84. // two responses to counter that.
  85. testNet.sendResponse(t, firstNode, secondNode, []int{thirdNode}, true)
  86. testNet.sendResponse(t, firstNode, secondNode, []int{thirdNode}, true)
  87. // secondNode should evict the firstNode
  88. testNet.listenForPeerUpdate(t, secondNode, firstNode, p2p.PeerStatusDown, shortWait)
  89. }
  90. func TestReactorNeverSendsTooManyPeers(t *testing.T) {
  91. testNet := setupNetwork(t, testOptions{
  92. MockNodes: 1,
  93. TotalNodes: 2,
  94. })
  95. testNet.connectAll(t)
  96. testNet.start(t)
  97. testNet.addNodes(t, 110)
  98. nodes := make([]int, 110)
  99. for i := 0; i < len(nodes); i++ {
  100. nodes[i] = i + 2
  101. }
  102. testNet.addAddresses(t, secondNode, nodes)
  103. // first we check that even although we have 110 peers, honest pex reactors
  104. // only send 100 (test if secondNode sends firstNode 100 addresses)
  105. testNet.pingAndlistenForNAddresses(t, secondNode, firstNode, shortWait, 100)
  106. }
  107. func TestReactorErrorsOnReceivingTooManyPeers(t *testing.T) {
  108. r := setupSingle(t)
  109. peer := p2p.NodeAddress{Protocol: p2p.MemoryProtocol, NodeID: randomNodeID(t)}
  110. added, err := r.manager.Add(peer)
  111. require.NoError(t, err)
  112. require.True(t, added)
  113. addresses := make([]proto.PexAddressV2, 101)
  114. for i := 0; i < len(addresses); i++ {
  115. nodeAddress := p2p.NodeAddress{Protocol: p2p.MemoryProtocol, NodeID: randomNodeID(t)}
  116. addresses[i] = proto.PexAddressV2{
  117. URL: nodeAddress.String(),
  118. }
  119. }
  120. r.peerCh <- p2p.PeerUpdate{
  121. NodeID: peer.NodeID,
  122. Status: p2p.PeerStatusUp,
  123. }
  124. select {
  125. // wait for a request and then send a response with too many addresses
  126. case req := <-r.pexOutCh:
  127. if _, ok := req.Message.(*proto.PexRequestV2); !ok {
  128. t.Fatal("expected v2 pex request")
  129. }
  130. r.pexInCh <- p2p.Envelope{
  131. From: peer.NodeID,
  132. Message: &proto.PexResponseV2{
  133. Addresses: addresses,
  134. },
  135. }
  136. case <-time.After(10 * time.Second):
  137. t.Fatal("pex failed to send a request within 10 seconds")
  138. }
  139. peerErr := <-r.pexErrCh
  140. require.Error(t, peerErr.Err)
  141. require.Empty(t, r.pexOutCh)
  142. require.Contains(t, peerErr.Err.Error(), "peer sent too many addresses")
  143. require.Equal(t, peer.NodeID, peerErr.NodeID)
  144. }
  145. func TestReactorSmallPeerStoreInALargeNetwork(t *testing.T) {
  146. testNet := setupNetwork(t, testOptions{
  147. TotalNodes: 16,
  148. MaxPeers: 8,
  149. MaxConnected: 6,
  150. BufferSize: 8,
  151. })
  152. testNet.connectN(t, 1)
  153. testNet.start(t)
  154. // test that all nodes reach full capacity
  155. for _, nodeID := range testNet.nodes {
  156. require.Eventually(t, func() bool {
  157. // nolint:scopelint
  158. return testNet.network.Nodes[nodeID].PeerManager.PeerRatio() >= 0.9
  159. }, longWait, checkFrequency)
  160. }
  161. }
  162. func TestReactorLargePeerStoreInASmallNetwork(t *testing.T) {
  163. testNet := setupNetwork(t, testOptions{
  164. TotalNodes: 5,
  165. MaxPeers: 50,
  166. MaxConnected: 50,
  167. BufferSize: 5,
  168. })
  169. testNet.connectN(t, 1)
  170. testNet.start(t)
  171. // assert that all nodes add each other in the network
  172. for idx := 0; idx < len(testNet.nodes); idx++ {
  173. testNet.requireNumberOfPeers(t, idx, len(testNet.nodes)-1, longWait)
  174. }
  175. }
  176. func TestReactorWithNetworkGrowth(t *testing.T) {
  177. testNet := setupNetwork(t, testOptions{
  178. TotalNodes: 5,
  179. BufferSize: 5,
  180. })
  181. testNet.connectAll(t)
  182. testNet.start(t)
  183. // assert that all nodes add each other in the network
  184. for idx := 0; idx < len(testNet.nodes); idx++ {
  185. testNet.requireNumberOfPeers(t, idx, len(testNet.nodes)-1, shortWait)
  186. }
  187. // now we inject 10 more nodes
  188. testNet.addNodes(t, 10)
  189. for i := 5; i < testNet.total; i++ {
  190. node := testNet.nodes[i]
  191. require.NoError(t, testNet.reactors[node].Start())
  192. require.True(t, testNet.reactors[node].IsRunning())
  193. // we connect all new nodes to a single entry point and check that the
  194. // node can distribute the addresses to all the others
  195. testNet.connectPeers(t, 0, i)
  196. }
  197. require.Len(t, testNet.reactors, 15)
  198. // assert that all nodes add each other in the network
  199. for idx := 0; idx < len(testNet.nodes); idx++ {
  200. testNet.requireNumberOfPeers(t, idx, len(testNet.nodes)-1, longWait)
  201. }
  202. }
  203. func TestReactorIntegrationWithLegacyHandleRequest(t *testing.T) {
  204. testNet := setupNetwork(t, testOptions{
  205. MockNodes: 1,
  206. TotalNodes: 3,
  207. })
  208. testNet.connectAll(t)
  209. testNet.start(t)
  210. t.Log(testNet.nodes)
  211. // mock node sends a V1 Pex message to the second node
  212. testNet.sendRequest(t, firstNode, secondNode, false)
  213. addrs := testNet.getAddressesFor(t, []int{thirdNode})
  214. testNet.listenForLegacyResponse(t, secondNode, firstNode, shortWait, addrs)
  215. }
  216. func TestReactorIntegrationWithLegacyHandleResponse(t *testing.T) {
  217. testNet := setupNetwork(t, testOptions{
  218. MockNodes: 1,
  219. TotalNodes: 4,
  220. BufferSize: 4,
  221. })
  222. testNet.connectPeers(t, firstNode, secondNode)
  223. testNet.connectPeers(t, firstNode, thirdNode)
  224. testNet.connectPeers(t, firstNode, fourthNode)
  225. testNet.start(t)
  226. testNet.listenForRequest(t, secondNode, firstNode, shortWait)
  227. // send a v1 response instead
  228. testNet.sendResponse(t, firstNode, secondNode, []int{thirdNode, fourthNode}, false)
  229. testNet.requireNumberOfPeers(t, secondNode, len(testNet.nodes)-1, shortWait)
  230. }
  231. type singleTestReactor struct {
  232. reactor *pex.ReactorV2
  233. pexInCh chan p2p.Envelope
  234. pexOutCh chan p2p.Envelope
  235. pexErrCh chan p2p.PeerError
  236. pexCh *p2p.Channel
  237. peerCh chan p2p.PeerUpdate
  238. manager *p2p.PeerManager
  239. }
  240. func setupSingle(t *testing.T) *singleTestReactor {
  241. t.Helper()
  242. nodeID := newNodeID(t, "a")
  243. chBuf := 2
  244. pexInCh := make(chan p2p.Envelope, chBuf)
  245. pexOutCh := make(chan p2p.Envelope, chBuf)
  246. pexErrCh := make(chan p2p.PeerError, chBuf)
  247. pexCh := p2p.NewChannel(
  248. p2p.ChannelID(pex.PexChannel),
  249. new(proto.PexMessage),
  250. pexInCh,
  251. pexOutCh,
  252. pexErrCh,
  253. )
  254. peerCh := make(chan p2p.PeerUpdate, chBuf)
  255. peerUpdates := p2p.NewPeerUpdates(peerCh, chBuf)
  256. peerManager, err := p2p.NewPeerManager(nodeID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  257. require.NoError(t, err)
  258. reactor := pex.NewReactorV2(log.TestingLogger(), peerManager, pexCh, peerUpdates)
  259. require.NoError(t, reactor.Start())
  260. t.Cleanup(func() {
  261. err := reactor.Stop()
  262. if err != nil {
  263. t.Fatal(err)
  264. }
  265. pexCh.Close()
  266. peerUpdates.Close()
  267. })
  268. return &singleTestReactor{
  269. reactor: reactor,
  270. pexInCh: pexInCh,
  271. pexOutCh: pexOutCh,
  272. pexErrCh: pexErrCh,
  273. pexCh: pexCh,
  274. peerCh: peerCh,
  275. manager: peerManager,
  276. }
  277. }
  278. type reactorTestSuite struct {
  279. network *p2ptest.Network
  280. logger log.Logger
  281. reactors map[p2p.NodeID]*pex.ReactorV2
  282. pexChannels map[p2p.NodeID]*p2p.Channel
  283. peerChans map[p2p.NodeID]chan p2p.PeerUpdate
  284. peerUpdates map[p2p.NodeID]*p2p.PeerUpdates
  285. nodes []p2p.NodeID
  286. mocks []p2p.NodeID
  287. total int
  288. opts testOptions
  289. }
  290. type testOptions struct {
  291. MockNodes int
  292. TotalNodes int
  293. BufferSize int
  294. MaxPeers uint16
  295. MaxConnected uint16
  296. }
  297. // setup setups a test suite with a network of nodes. Mocknodes represent the
  298. // hollow nodes that the test can listen and send on
  299. func setupNetwork(t *testing.T, opts testOptions) *reactorTestSuite {
  300. t.Helper()
  301. require.Greater(t, opts.TotalNodes, opts.MockNodes)
  302. if opts.BufferSize == 0 {
  303. opts.BufferSize = defaultBufferSize
  304. }
  305. networkOpts := p2ptest.NetworkOptions{
  306. NumNodes: opts.TotalNodes,
  307. BufferSize: opts.BufferSize,
  308. NodeOpts: p2ptest.NodeOptions{
  309. MaxPeers: opts.MaxPeers,
  310. MaxConnected: opts.MaxConnected,
  311. },
  312. }
  313. chBuf := opts.BufferSize
  314. realNodes := opts.TotalNodes - opts.MockNodes
  315. rts := &reactorTestSuite{
  316. logger: log.TestingLogger().With("testCase", t.Name()),
  317. network: p2ptest.MakeNetwork(t, networkOpts),
  318. reactors: make(map[p2p.NodeID]*pex.ReactorV2, realNodes),
  319. pexChannels: make(map[p2p.NodeID]*p2p.Channel, opts.TotalNodes),
  320. peerChans: make(map[p2p.NodeID]chan p2p.PeerUpdate, opts.TotalNodes),
  321. peerUpdates: make(map[p2p.NodeID]*p2p.PeerUpdates, opts.TotalNodes),
  322. total: opts.TotalNodes,
  323. opts: opts,
  324. }
  325. // NOTE: we don't assert that the channels get drained after stopping the
  326. // reactor
  327. rts.pexChannels = rts.network.MakeChannelsNoCleanup(
  328. t, pex.ChannelDescriptor(), new(proto.PexMessage), chBuf,
  329. )
  330. idx := 0
  331. for nodeID := range rts.network.Nodes {
  332. rts.peerChans[nodeID] = make(chan p2p.PeerUpdate, chBuf)
  333. rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], chBuf)
  334. rts.network.Nodes[nodeID].PeerManager.Register(rts.peerUpdates[nodeID])
  335. // the first nodes in the array are always mock nodes
  336. if idx < opts.MockNodes {
  337. rts.mocks = append(rts.mocks, nodeID)
  338. } else {
  339. rts.reactors[nodeID] = pex.NewReactorV2(
  340. rts.logger.With("nodeID", nodeID),
  341. rts.network.Nodes[nodeID].PeerManager,
  342. rts.pexChannels[nodeID],
  343. rts.peerUpdates[nodeID],
  344. )
  345. }
  346. rts.nodes = append(rts.nodes, nodeID)
  347. idx++
  348. }
  349. require.Len(t, rts.reactors, realNodes)
  350. t.Cleanup(func() {
  351. for nodeID, reactor := range rts.reactors {
  352. if reactor.IsRunning() {
  353. require.NoError(t, reactor.Stop())
  354. require.False(t, reactor.IsRunning())
  355. }
  356. rts.pexChannels[nodeID].Close()
  357. rts.peerUpdates[nodeID].Close()
  358. }
  359. for _, nodeID := range rts.mocks {
  360. rts.pexChannels[nodeID].Close()
  361. rts.peerUpdates[nodeID].Close()
  362. }
  363. })
  364. return rts
  365. }
  366. // starts up the pex reactors for each node
  367. func (r *reactorTestSuite) start(t *testing.T) {
  368. t.Helper()
  369. for _, reactor := range r.reactors {
  370. require.NoError(t, reactor.Start())
  371. require.True(t, reactor.IsRunning())
  372. }
  373. }
  374. func (r *reactorTestSuite) addNodes(t *testing.T, nodes int) {
  375. t.Helper()
  376. for i := 0; i < nodes; i++ {
  377. node := r.network.MakeNode(t, p2ptest.NodeOptions{
  378. MaxPeers: r.opts.MaxPeers,
  379. MaxConnected: r.opts.MaxConnected,
  380. })
  381. r.network.Nodes[node.NodeID] = node
  382. nodeID := node.NodeID
  383. r.pexChannels[nodeID] = node.MakeChannelNoCleanup(
  384. t, pex.ChannelDescriptor(), new(proto.PexMessage), r.opts.BufferSize,
  385. )
  386. r.peerChans[nodeID] = make(chan p2p.PeerUpdate, r.opts.BufferSize)
  387. r.peerUpdates[nodeID] = p2p.NewPeerUpdates(r.peerChans[nodeID], r.opts.BufferSize)
  388. r.network.Nodes[nodeID].PeerManager.Register(r.peerUpdates[nodeID])
  389. r.reactors[nodeID] = pex.NewReactorV2(
  390. r.logger.With("nodeID", nodeID),
  391. r.network.Nodes[nodeID].PeerManager,
  392. r.pexChannels[nodeID],
  393. r.peerUpdates[nodeID],
  394. )
  395. r.nodes = append(r.nodes, nodeID)
  396. r.total++
  397. }
  398. }
  399. func (r *reactorTestSuite) listenFor(
  400. t *testing.T,
  401. node p2p.NodeID,
  402. conditional func(msg p2p.Envelope) bool,
  403. assertion func(t *testing.T, msg p2p.Envelope) bool,
  404. waitPeriod time.Duration,
  405. ) {
  406. timesUp := time.After(waitPeriod)
  407. for {
  408. select {
  409. case envelope := <-r.pexChannels[node].In:
  410. if conditional(envelope) && assertion(t, envelope) {
  411. return
  412. }
  413. case <-timesUp:
  414. require.Fail(t, "timed out waiting for message",
  415. "node=%v, waitPeriod=%s", node, waitPeriod)
  416. }
  417. }
  418. }
  419. func (r *reactorTestSuite) listenForRequest(t *testing.T, fromNode, toNode int, waitPeriod time.Duration) {
  420. r.logger.Info("Listening for request", "from", fromNode, "to", toNode)
  421. to, from := r.checkNodePair(t, toNode, fromNode)
  422. conditional := func(msg p2p.Envelope) bool {
  423. _, ok := msg.Message.(*proto.PexRequestV2)
  424. return ok && msg.From == from
  425. }
  426. assertion := func(t *testing.T, msg p2p.Envelope) bool {
  427. require.Equal(t, &proto.PexRequestV2{}, msg.Message)
  428. return true
  429. }
  430. r.listenFor(t, to, conditional, assertion, waitPeriod)
  431. }
  432. func (r *reactorTestSuite) pingAndlistenForNAddresses(
  433. t *testing.T,
  434. fromNode, toNode int,
  435. waitPeriod time.Duration,
  436. addresses int,
  437. ) {
  438. r.logger.Info("Listening for addresses", "from", fromNode, "to", toNode)
  439. to, from := r.checkNodePair(t, toNode, fromNode)
  440. conditional := func(msg p2p.Envelope) bool {
  441. _, ok := msg.Message.(*proto.PexResponseV2)
  442. return ok && msg.From == from
  443. }
  444. assertion := func(t *testing.T, msg p2p.Envelope) bool {
  445. m, ok := msg.Message.(*proto.PexResponseV2)
  446. if !ok {
  447. require.Fail(t, "expected pex response v2")
  448. return true
  449. }
  450. // assert the same amount of addresses
  451. if len(m.Addresses) == addresses {
  452. return true
  453. }
  454. // if we didn't get the right length, we wait and send the
  455. // request again
  456. time.Sleep(300 * time.Millisecond)
  457. r.sendRequest(t, toNode, fromNode, true)
  458. return false
  459. }
  460. r.sendRequest(t, toNode, fromNode, true)
  461. r.listenFor(t, to, conditional, assertion, waitPeriod)
  462. }
  463. func (r *reactorTestSuite) listenForResponse(
  464. t *testing.T,
  465. fromNode, toNode int,
  466. waitPeriod time.Duration,
  467. addresses []proto.PexAddressV2,
  468. ) {
  469. r.logger.Info("Listening for response", "from", fromNode, "to", toNode)
  470. to, from := r.checkNodePair(t, toNode, fromNode)
  471. conditional := func(msg p2p.Envelope) bool {
  472. _, ok := msg.Message.(*proto.PexResponseV2)
  473. r.logger.Info("message", msg, "ok", ok)
  474. return ok && msg.From == from
  475. }
  476. assertion := func(t *testing.T, msg p2p.Envelope) bool {
  477. require.Equal(t, &proto.PexResponseV2{Addresses: addresses}, msg.Message)
  478. return true
  479. }
  480. r.listenFor(t, to, conditional, assertion, waitPeriod)
  481. }
  482. func (r *reactorTestSuite) listenForLegacyResponse(
  483. t *testing.T,
  484. fromNode, toNode int,
  485. waitPeriod time.Duration,
  486. addresses []proto.PexAddress,
  487. ) {
  488. r.logger.Info("Listening for response", "from", fromNode, "to", toNode)
  489. to, from := r.checkNodePair(t, toNode, fromNode)
  490. conditional := func(msg p2p.Envelope) bool {
  491. _, ok := msg.Message.(*proto.PexResponse)
  492. return ok && msg.From == from
  493. }
  494. assertion := func(t *testing.T, msg p2p.Envelope) bool {
  495. require.Equal(t, &proto.PexResponse{Addresses: addresses}, msg.Message)
  496. return true
  497. }
  498. r.listenFor(t, to, conditional, assertion, waitPeriod)
  499. }
  500. func (r *reactorTestSuite) listenForPeerUpdate(
  501. t *testing.T,
  502. onNode, withNode int,
  503. status p2p.PeerStatus,
  504. waitPeriod time.Duration,
  505. ) {
  506. on, with := r.checkNodePair(t, onNode, withNode)
  507. sub := r.network.Nodes[on].PeerManager.Subscribe()
  508. defer sub.Close()
  509. timesUp := time.After(waitPeriod)
  510. for {
  511. select {
  512. case peerUpdate := <-sub.Updates():
  513. if peerUpdate.NodeID == with {
  514. require.Equal(t, status, peerUpdate.Status)
  515. return
  516. }
  517. case <-timesUp:
  518. require.Fail(t, "timed out waiting for peer status", "%v with status %v",
  519. with, status)
  520. return
  521. }
  522. }
  523. }
  524. func (r *reactorTestSuite) getV2AddressesFor(nodes []int) []proto.PexAddressV2 {
  525. addresses := make([]proto.PexAddressV2, len(nodes))
  526. for idx, node := range nodes {
  527. nodeID := r.nodes[node]
  528. addresses[idx] = proto.PexAddressV2{
  529. URL: r.network.Nodes[nodeID].NodeAddress.String(),
  530. }
  531. }
  532. return addresses
  533. }
  534. func (r *reactorTestSuite) getAddressesFor(t *testing.T, nodes []int) []proto.PexAddress {
  535. addresses := make([]proto.PexAddress, len(nodes))
  536. for idx, node := range nodes {
  537. nodeID := r.nodes[node]
  538. nodeAddrs := r.network.Nodes[nodeID].NodeAddress
  539. endpoints, err := nodeAddrs.Resolve(context.Background())
  540. require.NoError(t, err)
  541. require.Len(t, endpoints, 1)
  542. addresses[idx] = proto.PexAddress{
  543. ID: string(nodeAddrs.NodeID),
  544. IP: endpoints[0].IP.String(),
  545. Port: uint32(endpoints[0].Port),
  546. }
  547. }
  548. return addresses
  549. }
  550. func (r *reactorTestSuite) sendRequest(t *testing.T, fromNode, toNode int, v2 bool) {
  551. to, from := r.checkNodePair(t, toNode, fromNode)
  552. if v2 {
  553. r.pexChannels[from].Out <- p2p.Envelope{
  554. To: to,
  555. Message: &proto.PexRequestV2{},
  556. }
  557. } else {
  558. r.pexChannels[from].Out <- p2p.Envelope{
  559. To: to,
  560. Message: &proto.PexRequest{},
  561. }
  562. }
  563. }
  564. func (r *reactorTestSuite) sendResponse(
  565. t *testing.T,
  566. fromNode, toNode int,
  567. withNodes []int,
  568. v2 bool,
  569. ) {
  570. from, to := r.checkNodePair(t, fromNode, toNode)
  571. if v2 {
  572. addrs := r.getV2AddressesFor(withNodes)
  573. r.pexChannels[from].Out <- p2p.Envelope{
  574. To: to,
  575. Message: &proto.PexResponseV2{
  576. Addresses: addrs,
  577. },
  578. }
  579. } else {
  580. addrs := r.getAddressesFor(t, withNodes)
  581. r.pexChannels[from].Out <- p2p.Envelope{
  582. To: to,
  583. Message: &proto.PexResponse{
  584. Addresses: addrs,
  585. },
  586. }
  587. }
  588. }
  589. func (r *reactorTestSuite) requireNumberOfPeers(
  590. t *testing.T,
  591. nodeIndex, numPeers int,
  592. waitPeriod time.Duration,
  593. ) {
  594. t.Helper()
  595. require.Eventuallyf(t, func() bool {
  596. actualNumPeers := len(r.network.Nodes[r.nodes[nodeIndex]].PeerManager.Peers())
  597. return actualNumPeers >= numPeers
  598. }, waitPeriod, checkFrequency, "peer failed to connect with the asserted amount of peers "+
  599. "index=%d, node=%q, waitPeriod=%s expected=%d actual=%d",
  600. nodeIndex, r.nodes[nodeIndex], waitPeriod, numPeers,
  601. len(r.network.Nodes[r.nodes[nodeIndex]].PeerManager.Peers()),
  602. )
  603. }
  604. func (r *reactorTestSuite) connectAll(t *testing.T) {
  605. r.connectN(t, r.total-1)
  606. }
  607. // connects all nodes with n other nodes
  608. func (r *reactorTestSuite) connectN(t *testing.T, n int) {
  609. if n >= r.total {
  610. require.Fail(t, "connectN: n must be less than the size of the network - 1")
  611. }
  612. for i := 0; i < r.total; i++ {
  613. for j := 0; j < n; j++ {
  614. r.connectPeers(t, i, (i+j+1)%r.total)
  615. }
  616. }
  617. }
  618. // connects node1 to node2
  619. func (r *reactorTestSuite) connectPeers(t *testing.T, sourceNode, targetNode int) {
  620. t.Helper()
  621. node1, node2 := r.checkNodePair(t, sourceNode, targetNode)
  622. r.logger.Info("connecting peers", "sourceNode", sourceNode, "targetNode", targetNode)
  623. n1 := r.network.Nodes[node1]
  624. if n1 == nil {
  625. require.Fail(t, "connectPeers: source node %v is not part of the testnet", node1)
  626. return
  627. }
  628. n2 := r.network.Nodes[node2]
  629. if n2 == nil {
  630. require.Fail(t, "connectPeers: target node %v is not part of the testnet", node2)
  631. return
  632. }
  633. sourceSub := n1.PeerManager.Subscribe()
  634. defer sourceSub.Close()
  635. targetSub := n2.PeerManager.Subscribe()
  636. defer targetSub.Close()
  637. sourceAddress := n1.NodeAddress
  638. r.logger.Debug("source address", "address", sourceAddress)
  639. targetAddress := n2.NodeAddress
  640. r.logger.Debug("target address", "address", targetAddress)
  641. added, err := n1.PeerManager.Add(targetAddress)
  642. require.NoError(t, err)
  643. if !added {
  644. r.logger.Debug("nodes already know about one another",
  645. "sourceNode", sourceNode, "targetNode", targetNode)
  646. return
  647. }
  648. select {
  649. case peerUpdate := <-targetSub.Updates():
  650. require.Equal(t, p2p.PeerUpdate{
  651. NodeID: node1,
  652. Status: p2p.PeerStatusUp,
  653. }, peerUpdate)
  654. r.logger.Debug("target connected with source")
  655. case <-time.After(time.Second):
  656. require.Fail(t, "timed out waiting for peer", "%v accepting %v",
  657. targetNode, sourceNode)
  658. }
  659. select {
  660. case peerUpdate := <-sourceSub.Updates():
  661. require.Equal(t, p2p.PeerUpdate{
  662. NodeID: node2,
  663. Status: p2p.PeerStatusUp,
  664. }, peerUpdate)
  665. r.logger.Debug("source connected with target")
  666. case <-time.After(time.Second):
  667. require.Fail(t, "timed out waiting for peer", "%v dialing %v",
  668. sourceNode, targetNode)
  669. }
  670. added, err = n2.PeerManager.Add(sourceAddress)
  671. require.NoError(t, err)
  672. require.True(t, added)
  673. }
  674. // nolint: unused
  675. func (r *reactorTestSuite) pexAddresses(t *testing.T, nodeIndices []int) []proto.PexAddress {
  676. var addresses []proto.PexAddress
  677. for _, i := range nodeIndices {
  678. if i < len(r.nodes) {
  679. require.Fail(t, "index for pex address is greater than number of nodes")
  680. }
  681. nodeAddrs := r.network.Nodes[r.nodes[i]].NodeAddress
  682. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
  683. endpoints, err := nodeAddrs.Resolve(ctx)
  684. cancel()
  685. require.NoError(t, err)
  686. for _, endpoint := range endpoints {
  687. if endpoint.IP != nil {
  688. addresses = append(addresses, proto.PexAddress{
  689. ID: string(nodeAddrs.NodeID),
  690. IP: endpoint.IP.String(),
  691. Port: uint32(endpoint.Port),
  692. })
  693. }
  694. }
  695. }
  696. return addresses
  697. }
  698. func (r *reactorTestSuite) checkNodePair(t *testing.T, first, second int) (p2p.NodeID, p2p.NodeID) {
  699. require.NotEqual(t, first, second)
  700. require.Less(t, first, r.total)
  701. require.Less(t, second, r.total)
  702. return r.nodes[first], r.nodes[second]
  703. }
  704. func (r *reactorTestSuite) addAddresses(t *testing.T, node int, addrs []int) {
  705. peerManager := r.network.Nodes[r.nodes[node]].PeerManager
  706. for _, addr := range addrs {
  707. require.Less(t, addr, r.total)
  708. address := r.network.Nodes[r.nodes[addr]].NodeAddress
  709. added, err := peerManager.Add(address)
  710. require.NoError(t, err)
  711. require.True(t, added)
  712. }
  713. }
  714. func newNodeID(t *testing.T, id string) p2p.NodeID {
  715. nodeID, err := p2p.NewNodeID(strings.Repeat(id, 2*p2p.NodeIDByteLength))
  716. require.NoError(t, err)
  717. return nodeID
  718. }
  719. func randomNodeID(t *testing.T) p2p.NodeID {
  720. return p2p.NodeIDFromPubKey(ed25519.GenPrivKey().PubKey())
  721. }