You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

585 lines
16 KiB

  1. package pex
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "os"
  6. "path/filepath"
  7. "testing"
  8. "time"
  9. "github.com/stretchr/testify/assert"
  10. "github.com/stretchr/testify/require"
  11. "github.com/tendermint/tendermint/config"
  12. "github.com/tendermint/tendermint/libs/log"
  13. "github.com/tendermint/tendermint/p2p"
  14. "github.com/tendermint/tendermint/p2p/mock"
  15. )
  16. var (
  17. cfg *config.P2PConfig
  18. )
  19. func init() {
  20. cfg = config.DefaultP2PConfig()
  21. cfg.PexReactor = true
  22. cfg.AllowDuplicateIP = true
  23. }
  24. func TestPEXReactorBasic(t *testing.T) {
  25. r, book := createReactor(&PEXReactorConfig{})
  26. defer teardownReactor(book)
  27. assert.NotNil(t, r)
  28. assert.NotEmpty(t, r.GetChannels())
  29. }
  30. func TestPEXReactorAddRemovePeer(t *testing.T) {
  31. r, book := createReactor(&PEXReactorConfig{})
  32. defer teardownReactor(book)
  33. size := book.Size()
  34. peer := p2p.CreateRandomPeer(false)
  35. r.AddPeer(peer)
  36. assert.Equal(t, size+1, book.Size())
  37. r.RemovePeer(peer, "peer not available")
  38. outboundPeer := p2p.CreateRandomPeer(true)
  39. r.AddPeer(outboundPeer)
  40. assert.Equal(t, size+1, book.Size(), "outbound peers should not be added to the address book")
  41. r.RemovePeer(outboundPeer, "peer not available")
  42. }
  43. // --- FAIL: TestPEXReactorRunning (11.10s)
  44. // pex_reactor_test.go:411: expected all switches to be connected to at
  45. // least one peer (switches: 0 => {outbound: 1, inbound: 0}, 1 =>
  46. // {outbound: 0, inbound: 1}, 2 => {outbound: 0, inbound: 0}, )
  47. //
  48. // EXPLANATION: peers are getting rejected because in switch#addPeer we check
  49. // if any peer (who we already connected to) has the same IP. Even though local
  50. // peers have different IP addresses, they all have the same underlying remote
  51. // IP: 127.0.0.1.
  52. //
  53. func TestPEXReactorRunning(t *testing.T) {
  54. N := 3
  55. switches := make([]*p2p.Switch, N)
  56. // directory to store address books
  57. dir, err := ioutil.TempDir("", "pex_reactor")
  58. require.Nil(t, err)
  59. defer os.RemoveAll(dir) // nolint: errcheck
  60. books := make([]*addrBook, N)
  61. logger := log.TestingLogger()
  62. // create switches
  63. for i := 0; i < N; i++ {
  64. switches[i] = p2p.MakeSwitch(cfg, i, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
  65. books[i] = NewAddrBook(filepath.Join(dir, fmt.Sprintf("addrbook%d.json", i)), false)
  66. books[i].SetLogger(logger.With("pex", i))
  67. sw.SetAddrBook(books[i])
  68. sw.SetLogger(logger.With("pex", i))
  69. r := NewPEXReactor(books[i], &PEXReactorConfig{})
  70. r.SetLogger(logger.With("pex", i))
  71. r.SetEnsurePeersPeriod(250 * time.Millisecond)
  72. sw.AddReactor("pex", r)
  73. return sw
  74. })
  75. }
  76. addOtherNodeAddrToAddrBook := func(switchIndex, otherSwitchIndex int) {
  77. addr := switches[otherSwitchIndex].NetAddress()
  78. books[switchIndex].AddAddress(addr, addr)
  79. }
  80. addOtherNodeAddrToAddrBook(0, 1)
  81. addOtherNodeAddrToAddrBook(1, 0)
  82. addOtherNodeAddrToAddrBook(2, 1)
  83. for _, sw := range switches {
  84. err := sw.Start() // start switch and reactors
  85. require.Nil(t, err)
  86. }
  87. assertPeersWithTimeout(t, switches, 10*time.Millisecond, 10*time.Second, N-1)
  88. // stop them
  89. for _, s := range switches {
  90. s.Stop()
  91. }
  92. }
  93. func TestPEXReactorReceive(t *testing.T) {
  94. r, book := createReactor(&PEXReactorConfig{})
  95. defer teardownReactor(book)
  96. peer := p2p.CreateRandomPeer(false)
  97. // we have to send a request to receive responses
  98. r.RequestAddrs(peer)
  99. size := book.Size()
  100. addrs := []*p2p.NetAddress{peer.SocketAddr()}
  101. msg := cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: addrs})
  102. r.Receive(PexChannel, peer, msg)
  103. assert.Equal(t, size+1, book.Size())
  104. msg = cdc.MustMarshalBinaryBare(&pexRequestMessage{})
  105. r.Receive(PexChannel, peer, msg) // should not panic.
  106. }
  107. func TestPEXReactorRequestMessageAbuse(t *testing.T) {
  108. r, book := createReactor(&PEXReactorConfig{})
  109. defer teardownReactor(book)
  110. sw := createSwitchAndAddReactors(r)
  111. sw.SetAddrBook(book)
  112. peer := mock.NewPeer(nil)
  113. p2p.AddPeerToSwitch(sw, peer)
  114. assert.True(t, sw.Peers().Has(peer.ID()))
  115. id := string(peer.ID())
  116. msg := cdc.MustMarshalBinaryBare(&pexRequestMessage{})
  117. // first time creates the entry
  118. r.Receive(PexChannel, peer, msg)
  119. assert.True(t, r.lastReceivedRequests.Has(id))
  120. assert.True(t, sw.Peers().Has(peer.ID()))
  121. // next time sets the last time value
  122. r.Receive(PexChannel, peer, msg)
  123. assert.True(t, r.lastReceivedRequests.Has(id))
  124. assert.True(t, sw.Peers().Has(peer.ID()))
  125. // third time is too many too soon - peer is removed
  126. r.Receive(PexChannel, peer, msg)
  127. assert.False(t, r.lastReceivedRequests.Has(id))
  128. assert.False(t, sw.Peers().Has(peer.ID()))
  129. }
  130. func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
  131. r, book := createReactor(&PEXReactorConfig{})
  132. defer teardownReactor(book)
  133. sw := createSwitchAndAddReactors(r)
  134. sw.SetAddrBook(book)
  135. peer := mock.NewPeer(nil)
  136. p2p.AddPeerToSwitch(sw, peer)
  137. assert.True(t, sw.Peers().Has(peer.ID()))
  138. id := string(peer.ID())
  139. // request addrs from the peer
  140. r.RequestAddrs(peer)
  141. assert.True(t, r.requestsSent.Has(id))
  142. assert.True(t, sw.Peers().Has(peer.ID()))
  143. addrs := []*p2p.NetAddress{peer.SocketAddr()}
  144. msg := cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: addrs})
  145. // receive some addrs. should clear the request
  146. r.Receive(PexChannel, peer, msg)
  147. assert.False(t, r.requestsSent.Has(id))
  148. assert.True(t, sw.Peers().Has(peer.ID()))
  149. // receiving more addrs causes a disconnect
  150. r.Receive(PexChannel, peer, msg)
  151. assert.False(t, sw.Peers().Has(peer.ID()))
  152. }
  153. func TestCheckSeeds(t *testing.T) {
  154. // directory to store address books
  155. dir, err := ioutil.TempDir("", "pex_reactor")
  156. require.Nil(t, err)
  157. defer os.RemoveAll(dir) // nolint: errcheck
  158. // 1. test creating peer with no seeds works
  159. peer := testCreateDefaultPeer(dir, 0)
  160. require.Nil(t, peer.Start())
  161. peer.Stop()
  162. // 2. create seed
  163. seed := testCreateSeed(dir, 1, []*p2p.NetAddress{}, []*p2p.NetAddress{})
  164. // 3. test create peer with online seed works
  165. peer = testCreatePeerWithSeed(dir, 2, seed)
  166. require.Nil(t, peer.Start())
  167. peer.Stop()
  168. // 4. test create peer with all seeds having unresolvable DNS fails
  169. badPeerConfig := &PEXReactorConfig{
  170. Seeds: []string{"ed3dfd27bfc4af18f67a49862f04cc100696e84d@bad.network.addr:26657",
  171. "d824b13cb5d40fa1d8a614e089357c7eff31b670@anotherbad.network.addr:26657"},
  172. }
  173. peer = testCreatePeerWithConfig(dir, 2, badPeerConfig)
  174. require.Error(t, peer.Start())
  175. peer.Stop()
  176. // 5. test create peer with one good seed address succeeds
  177. badPeerConfig = &PEXReactorConfig{
  178. Seeds: []string{"ed3dfd27bfc4af18f67a49862f04cc100696e84d@bad.network.addr:26657",
  179. "d824b13cb5d40fa1d8a614e089357c7eff31b670@anotherbad.network.addr:26657",
  180. seed.NetAddress().String()},
  181. }
  182. peer = testCreatePeerWithConfig(dir, 2, badPeerConfig)
  183. require.Nil(t, peer.Start())
  184. peer.Stop()
  185. }
  186. func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
  187. // directory to store address books
  188. dir, err := ioutil.TempDir("", "pex_reactor")
  189. require.Nil(t, err)
  190. defer os.RemoveAll(dir) // nolint: errcheck
  191. // 1. create seed
  192. seed := testCreateSeed(dir, 0, []*p2p.NetAddress{}, []*p2p.NetAddress{})
  193. require.Nil(t, seed.Start())
  194. defer seed.Stop()
  195. // 2. create usual peer with only seed configured.
  196. peer := testCreatePeerWithSeed(dir, 1, seed)
  197. require.Nil(t, peer.Start())
  198. defer peer.Stop()
  199. // 3. check that the peer connects to seed immediately
  200. assertPeersWithTimeout(t, []*p2p.Switch{peer}, 10*time.Millisecond, 3*time.Second, 1)
  201. }
  202. func TestConnectionSpeedForPeerReceivedFromSeed(t *testing.T) {
  203. // directory to store address books
  204. dir, err := ioutil.TempDir("", "pex_reactor")
  205. require.Nil(t, err)
  206. defer os.RemoveAll(dir) // nolint: errcheck
  207. // 1. create peer
  208. peerSwitch := testCreateDefaultPeer(dir, 1)
  209. require.Nil(t, peerSwitch.Start())
  210. defer peerSwitch.Stop()
  211. // 2. Create seed which knows about the peer
  212. peerAddr := peerSwitch.NetAddress()
  213. seed := testCreateSeed(dir, 2, []*p2p.NetAddress{peerAddr}, []*p2p.NetAddress{peerAddr})
  214. require.Nil(t, seed.Start())
  215. defer seed.Stop()
  216. // 3. create another peer with only seed configured.
  217. secondPeer := testCreatePeerWithSeed(dir, 3, seed)
  218. require.Nil(t, secondPeer.Start())
  219. defer secondPeer.Stop()
  220. // 4. check that the second peer connects to seed immediately
  221. assertPeersWithTimeout(t, []*p2p.Switch{secondPeer}, 10*time.Millisecond, 3*time.Second, 1)
  222. // 5. check that the second peer connects to the first peer immediately
  223. assertPeersWithTimeout(t, []*p2p.Switch{secondPeer}, 10*time.Millisecond, 1*time.Second, 2)
  224. }
  225. func TestPEXReactorCrawlStatus(t *testing.T) {
  226. pexR, book := createReactor(&PEXReactorConfig{SeedMode: true})
  227. defer teardownReactor(book)
  228. // Seed/Crawler mode uses data from the Switch
  229. sw := createSwitchAndAddReactors(pexR)
  230. sw.SetAddrBook(book)
  231. // Create a peer, add it to the peer set and the addrbook.
  232. peer := p2p.CreateRandomPeer(false)
  233. p2p.AddPeerToSwitch(pexR.Switch, peer)
  234. addr1 := peer.SocketAddr()
  235. pexR.book.AddAddress(addr1, addr1)
  236. // Add a non-connected address to the book.
  237. _, addr2 := p2p.CreateRoutableAddr()
  238. pexR.book.AddAddress(addr2, addr1)
  239. // Get some peerInfos to crawl
  240. peerInfos := pexR.getPeersToCrawl()
  241. // Make sure it has the proper number of elements
  242. assert.Equal(t, 2, len(peerInfos))
  243. // TODO: test
  244. }
  245. // connect a peer to a seed, wait a bit, then stop it.
  246. // this should give it time to request addrs and for the seed
  247. // to call FlushStop, and allows us to test calling Stop concurrently
  248. // with FlushStop. Before a fix, this non-deterministically reproduced
  249. // https://github.com/tendermint/tendermint/issues/3231.
  250. func TestPEXReactorSeedModeFlushStop(t *testing.T) {
  251. N := 2
  252. switches := make([]*p2p.Switch, N)
  253. // directory to store address books
  254. dir, err := ioutil.TempDir("", "pex_reactor")
  255. require.Nil(t, err)
  256. defer os.RemoveAll(dir) // nolint: errcheck
  257. books := make([]*addrBook, N)
  258. logger := log.TestingLogger()
  259. // create switches
  260. for i := 0; i < N; i++ {
  261. switches[i] = p2p.MakeSwitch(cfg, i, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
  262. books[i] = NewAddrBook(filepath.Join(dir, fmt.Sprintf("addrbook%d.json", i)), false)
  263. books[i].SetLogger(logger.With("pex", i))
  264. sw.SetAddrBook(books[i])
  265. sw.SetLogger(logger.With("pex", i))
  266. config := &PEXReactorConfig{}
  267. if i == 0 {
  268. // first one is a seed node
  269. config = &PEXReactorConfig{SeedMode: true}
  270. }
  271. r := NewPEXReactor(books[i], config)
  272. r.SetLogger(logger.With("pex", i))
  273. r.SetEnsurePeersPeriod(250 * time.Millisecond)
  274. sw.AddReactor("pex", r)
  275. return sw
  276. })
  277. }
  278. for _, sw := range switches {
  279. err := sw.Start() // start switch and reactors
  280. require.Nil(t, err)
  281. }
  282. reactor := switches[0].Reactors()["pex"].(*PEXReactor)
  283. peerID := switches[1].NodeInfo().ID()
  284. err = switches[1].DialPeerWithAddress(switches[0].NetAddress(), false)
  285. assert.NoError(t, err)
  286. // sleep up to a second while waiting for the peer to send us a message.
  287. // this isn't perfect since it's possible the peer sends us a msg and we FlushStop
  288. // before this loop catches it. but non-deterministically it works pretty well.
  289. for i := 0; i < 1000; i++ {
  290. v := reactor.lastReceivedRequests.Get(string(peerID))
  291. if v != nil {
  292. break
  293. }
  294. time.Sleep(time.Millisecond)
  295. }
  296. // by now the FlushStop should have happened. Try stopping the peer.
  297. // it should be safe to do this.
  298. peers := switches[0].Peers().List()
  299. for _, peer := range peers {
  300. peer.Stop()
  301. }
  302. // stop the switches
  303. for _, s := range switches {
  304. s.Stop()
  305. }
  306. }
  307. func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) {
  308. peer := p2p.CreateRandomPeer(false)
  309. pexR, book := createReactor(&PEXReactorConfig{})
  310. book.AddPrivateIDs([]string{string(peer.NodeInfo().ID())})
  311. defer teardownReactor(book)
  312. // we have to send a request to receive responses
  313. pexR.RequestAddrs(peer)
  314. size := book.Size()
  315. addrs := []*p2p.NetAddress{peer.SocketAddr()}
  316. msg := cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: addrs})
  317. pexR.Receive(PexChannel, peer, msg)
  318. assert.Equal(t, size, book.Size())
  319. pexR.AddPeer(peer)
  320. assert.Equal(t, size, book.Size())
  321. }
  322. func TestPEXReactorDialPeer(t *testing.T) {
  323. pexR, book := createReactor(&PEXReactorConfig{})
  324. defer teardownReactor(book)
  325. sw := createSwitchAndAddReactors(pexR)
  326. sw.SetAddrBook(book)
  327. peer := mock.NewPeer(nil)
  328. addr := peer.SocketAddr()
  329. assert.Equal(t, 0, pexR.AttemptsToDial(addr))
  330. // 1st unsuccessful attempt
  331. pexR.dialPeer(addr)
  332. assert.Equal(t, 1, pexR.AttemptsToDial(addr))
  333. // 2nd unsuccessful attempt
  334. pexR.dialPeer(addr)
  335. // must be skipped because it is too early
  336. assert.Equal(t, 1, pexR.AttemptsToDial(addr))
  337. if !testing.Short() {
  338. time.Sleep(3 * time.Second)
  339. // 3rd attempt
  340. pexR.dialPeer(addr)
  341. assert.Equal(t, 2, pexR.AttemptsToDial(addr))
  342. }
  343. }
  344. func assertPeersWithTimeout(
  345. t *testing.T,
  346. switches []*p2p.Switch,
  347. checkPeriod, timeout time.Duration,
  348. nPeers int,
  349. ) {
  350. var (
  351. ticker = time.NewTicker(checkPeriod)
  352. remaining = timeout
  353. )
  354. for {
  355. select {
  356. case <-ticker.C:
  357. // check peers are connected
  358. allGood := true
  359. for _, s := range switches {
  360. outbound, inbound, _ := s.NumPeers()
  361. if outbound+inbound < nPeers {
  362. allGood = false
  363. break
  364. }
  365. }
  366. remaining -= checkPeriod
  367. if remaining < 0 {
  368. remaining = 0
  369. }
  370. if allGood {
  371. return
  372. }
  373. case <-time.After(remaining):
  374. numPeersStr := ""
  375. for i, s := range switches {
  376. outbound, inbound, _ := s.NumPeers()
  377. numPeersStr += fmt.Sprintf("%d => {outbound: %d, inbound: %d}, ", i, outbound, inbound)
  378. }
  379. t.Errorf(
  380. "expected all switches to be connected to at least %d peer(s) (switches: %s)",
  381. nPeers, numPeersStr,
  382. )
  383. return
  384. }
  385. }
  386. }
  387. // Creates a peer with the provided config
  388. func testCreatePeerWithConfig(dir string, id int, config *PEXReactorConfig) *p2p.Switch {
  389. peer := p2p.MakeSwitch(
  390. cfg,
  391. id,
  392. "127.0.0.1",
  393. "123.123.123",
  394. func(i int, sw *p2p.Switch) *p2p.Switch {
  395. book := NewAddrBook(filepath.Join(dir, fmt.Sprintf("addrbook%d.json", id)), false)
  396. book.SetLogger(log.TestingLogger())
  397. sw.SetAddrBook(book)
  398. sw.SetLogger(log.TestingLogger())
  399. r := NewPEXReactor(
  400. book,
  401. config,
  402. )
  403. r.SetLogger(log.TestingLogger())
  404. sw.AddReactor("pex", r)
  405. return sw
  406. },
  407. )
  408. return peer
  409. }
  410. // Creates a peer with the default config
  411. func testCreateDefaultPeer(dir string, id int) *p2p.Switch {
  412. return testCreatePeerWithConfig(dir, id, &PEXReactorConfig{})
  413. }
  414. // Creates a seed which knows about the provided addresses / source address pairs.
  415. // Starting and stopping the seed is left to the caller
  416. func testCreateSeed(dir string, id int, knownAddrs, srcAddrs []*p2p.NetAddress) *p2p.Switch {
  417. seed := p2p.MakeSwitch(
  418. cfg,
  419. id,
  420. "127.0.0.1",
  421. "123.123.123",
  422. func(i int, sw *p2p.Switch) *p2p.Switch {
  423. book := NewAddrBook(filepath.Join(dir, "addrbookSeed.json"), false)
  424. book.SetLogger(log.TestingLogger())
  425. for j := 0; j < len(knownAddrs); j++ {
  426. book.AddAddress(knownAddrs[j], srcAddrs[j])
  427. book.MarkGood(knownAddrs[j])
  428. }
  429. sw.SetAddrBook(book)
  430. sw.SetLogger(log.TestingLogger())
  431. r := NewPEXReactor(book, &PEXReactorConfig{})
  432. r.SetLogger(log.TestingLogger())
  433. sw.AddReactor("pex", r)
  434. return sw
  435. },
  436. )
  437. return seed
  438. }
  439. // Creates a peer which knows about the provided seed.
  440. // Starting and stopping the peer is left to the caller
  441. func testCreatePeerWithSeed(dir string, id int, seed *p2p.Switch) *p2p.Switch {
  442. conf := &PEXReactorConfig{
  443. Seeds: []string{seed.NetAddress().String()},
  444. }
  445. return testCreatePeerWithConfig(dir, id, conf)
  446. }
  447. func createReactor(conf *PEXReactorConfig) (r *PEXReactor, book *addrBook) {
  448. // directory to store address book
  449. dir, err := ioutil.TempDir("", "pex_reactor")
  450. if err != nil {
  451. panic(err)
  452. }
  453. book = NewAddrBook(filepath.Join(dir, "addrbook.json"), true)
  454. book.SetLogger(log.TestingLogger())
  455. r = NewPEXReactor(book, conf)
  456. r.SetLogger(log.TestingLogger())
  457. return
  458. }
  459. func teardownReactor(book *addrBook) {
  460. err := os.RemoveAll(filepath.Dir(book.FilePath()))
  461. if err != nil {
  462. panic(err)
  463. }
  464. }
  465. func createSwitchAndAddReactors(reactors ...p2p.Reactor) *p2p.Switch {
  466. sw := p2p.MakeSwitch(cfg, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw })
  467. sw.SetLogger(log.TestingLogger())
  468. for _, r := range reactors {
  469. sw.AddReactor(r.String(), r)
  470. r.SetSwitch(sw)
  471. }
  472. return sw
  473. }