You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

530 lines
14 KiB

  1. package pex
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "net"
  6. "os"
  7. "path/filepath"
  8. "testing"
  9. "time"
  10. "github.com/stretchr/testify/assert"
  11. "github.com/stretchr/testify/require"
  12. crypto "github.com/tendermint/tendermint/crypto"
  13. "github.com/tendermint/tendermint/crypto/ed25519"
  14. cmn "github.com/tendermint/tendermint/libs/common"
  15. "github.com/tendermint/tendermint/libs/log"
  16. "github.com/tendermint/tendermint/config"
  17. "github.com/tendermint/tendermint/p2p"
  18. "github.com/tendermint/tendermint/p2p/conn"
  19. )
  20. var (
  21. cfg *config.P2PConfig
  22. )
  23. func init() {
  24. cfg = config.DefaultP2PConfig()
  25. cfg.PexReactor = true
  26. cfg.AllowDuplicateIP = true
  27. }
  28. func TestPEXReactorBasic(t *testing.T) {
  29. r, book := createReactor(&PEXReactorConfig{})
  30. defer teardownReactor(book)
  31. assert.NotNil(t, r)
  32. assert.NotEmpty(t, r.GetChannels())
  33. }
  34. func TestPEXReactorAddRemovePeer(t *testing.T) {
  35. r, book := createReactor(&PEXReactorConfig{})
  36. defer teardownReactor(book)
  37. size := book.Size()
  38. peer := p2p.CreateRandomPeer(false)
  39. r.AddPeer(peer)
  40. assert.Equal(t, size+1, book.Size())
  41. r.RemovePeer(peer, "peer not available")
  42. outboundPeer := p2p.CreateRandomPeer(true)
  43. r.AddPeer(outboundPeer)
  44. assert.Equal(t, size+1, book.Size(), "outbound peers should not be added to the address book")
  45. r.RemovePeer(outboundPeer, "peer not available")
  46. }
  47. // --- FAIL: TestPEXReactorRunning (11.10s)
  48. // pex_reactor_test.go:411: expected all switches to be connected to at
  49. // least one peer (switches: 0 => {outbound: 1, inbound: 0}, 1 =>
  50. // {outbound: 0, inbound: 1}, 2 => {outbound: 0, inbound: 0}, )
  51. //
  52. // EXPLANATION: peers are getting rejected because in switch#addPeer we check
  53. // if any peer (who we already connected to) has the same IP. Even though local
  54. // peers have different IP addresses, they all have the same underlying remote
  55. // IP: 127.0.0.1.
  56. //
  57. func TestPEXReactorRunning(t *testing.T) {
  58. N := 3
  59. switches := make([]*p2p.Switch, N)
  60. // directory to store address books
  61. dir, err := ioutil.TempDir("", "pex_reactor")
  62. require.Nil(t, err)
  63. defer os.RemoveAll(dir) // nolint: errcheck
  64. books := make([]*addrBook, N)
  65. logger := log.TestingLogger()
  66. // create switches
  67. for i := 0; i < N; i++ {
  68. switches[i] = p2p.MakeSwitch(cfg, i, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
  69. books[i] = NewAddrBook(filepath.Join(dir, fmt.Sprintf("addrbook%d.json", i)), false)
  70. books[i].SetLogger(logger.With("pex", i))
  71. sw.SetAddrBook(books[i])
  72. sw.SetLogger(logger.With("pex", i))
  73. r := NewPEXReactor(books[i], &PEXReactorConfig{})
  74. r.SetLogger(logger.With("pex", i))
  75. r.SetEnsurePeersPeriod(250 * time.Millisecond)
  76. sw.AddReactor("pex", r)
  77. return sw
  78. })
  79. }
  80. addOtherNodeAddrToAddrBook := func(switchIndex, otherSwitchIndex int) {
  81. addr := switches[otherSwitchIndex].NodeInfo().NetAddress()
  82. books[switchIndex].AddAddress(addr, addr)
  83. }
  84. addOtherNodeAddrToAddrBook(0, 1)
  85. addOtherNodeAddrToAddrBook(1, 0)
  86. addOtherNodeAddrToAddrBook(2, 1)
  87. for i, sw := range switches {
  88. sw.AddListener(p2p.NewDefaultListener("tcp://"+sw.NodeInfo().ListenAddr, "", false, logger.With("pex", i)))
  89. err := sw.Start() // start switch and reactors
  90. require.Nil(t, err)
  91. }
  92. assertPeersWithTimeout(t, switches, 10*time.Millisecond, 10*time.Second, N-1)
  93. // stop them
  94. for _, s := range switches {
  95. s.Stop()
  96. }
  97. }
  98. func TestPEXReactorReceive(t *testing.T) {
  99. r, book := createReactor(&PEXReactorConfig{})
  100. defer teardownReactor(book)
  101. peer := p2p.CreateRandomPeer(false)
  102. // we have to send a request to receive responses
  103. r.RequestAddrs(peer)
  104. size := book.Size()
  105. addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
  106. msg := cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: addrs})
  107. r.Receive(PexChannel, peer, msg)
  108. assert.Equal(t, size+1, book.Size())
  109. msg = cdc.MustMarshalBinaryBare(&pexRequestMessage{})
  110. r.Receive(PexChannel, peer, msg) // should not panic.
  111. }
  112. func TestPEXReactorRequestMessageAbuse(t *testing.T) {
  113. r, book := createReactor(&PEXReactorConfig{})
  114. defer teardownReactor(book)
  115. sw := createSwitchAndAddReactors(r)
  116. sw.SetAddrBook(book)
  117. peer := newMockPeer()
  118. p2p.AddPeerToSwitch(sw, peer)
  119. assert.True(t, sw.Peers().Has(peer.ID()))
  120. id := string(peer.ID())
  121. msg := cdc.MustMarshalBinaryBare(&pexRequestMessage{})
  122. // first time creates the entry
  123. r.Receive(PexChannel, peer, msg)
  124. assert.True(t, r.lastReceivedRequests.Has(id))
  125. assert.True(t, sw.Peers().Has(peer.ID()))
  126. // next time sets the last time value
  127. r.Receive(PexChannel, peer, msg)
  128. assert.True(t, r.lastReceivedRequests.Has(id))
  129. assert.True(t, sw.Peers().Has(peer.ID()))
  130. // third time is too many too soon - peer is removed
  131. r.Receive(PexChannel, peer, msg)
  132. assert.False(t, r.lastReceivedRequests.Has(id))
  133. assert.False(t, sw.Peers().Has(peer.ID()))
  134. }
  135. func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
  136. r, book := createReactor(&PEXReactorConfig{})
  137. defer teardownReactor(book)
  138. sw := createSwitchAndAddReactors(r)
  139. sw.SetAddrBook(book)
  140. peer := newMockPeer()
  141. p2p.AddPeerToSwitch(sw, peer)
  142. assert.True(t, sw.Peers().Has(peer.ID()))
  143. id := string(peer.ID())
  144. // request addrs from the peer
  145. r.RequestAddrs(peer)
  146. assert.True(t, r.requestsSent.Has(id))
  147. assert.True(t, sw.Peers().Has(peer.ID()))
  148. addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
  149. msg := cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: addrs})
  150. // receive some addrs. should clear the request
  151. r.Receive(PexChannel, peer, msg)
  152. assert.False(t, r.requestsSent.Has(id))
  153. assert.True(t, sw.Peers().Has(peer.ID()))
  154. // receiving more addrs causes a disconnect
  155. r.Receive(PexChannel, peer, msg)
  156. assert.False(t, sw.Peers().Has(peer.ID()))
  157. }
  158. func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
  159. // directory to store address books
  160. dir, err := ioutil.TempDir("", "pex_reactor")
  161. require.Nil(t, err)
  162. defer os.RemoveAll(dir) // nolint: errcheck
  163. // 1. create seed
  164. seed := testCreateSeed(dir, 0, []*p2p.NetAddress{}, []*p2p.NetAddress{})
  165. require.Nil(t, seed.Start())
  166. defer seed.Stop()
  167. // 2. create usual peer with only seed configured.
  168. peer := testCreatePeerWithSeed(dir, 1, seed)
  169. require.Nil(t, peer.Start())
  170. defer peer.Stop()
  171. // 3. check that the peer connects to seed immediately
  172. assertPeersWithTimeout(t, []*p2p.Switch{peer}, 10*time.Millisecond, 3*time.Second, 1)
  173. }
  174. func TestConnectionSpeedForPeerReceivedFromSeed(t *testing.T) {
  175. // directory to store address books
  176. dir, err := ioutil.TempDir("", "pex_reactor")
  177. require.Nil(t, err)
  178. defer os.RemoveAll(dir) // nolint: errcheck
  179. // 1. create peer
  180. peer := p2p.MakeSwitch(
  181. cfg,
  182. 1,
  183. "127.0.0.1",
  184. "123.123.123",
  185. func(i int, sw *p2p.Switch) *p2p.Switch {
  186. book := NewAddrBook(filepath.Join(dir, "addrbook1.json"), false)
  187. book.SetLogger(log.TestingLogger())
  188. sw.SetAddrBook(book)
  189. sw.SetLogger(log.TestingLogger())
  190. r := NewPEXReactor(
  191. book,
  192. &PEXReactorConfig{},
  193. )
  194. r.SetLogger(log.TestingLogger())
  195. sw.AddReactor("pex", r)
  196. return sw
  197. },
  198. )
  199. peer.AddListener(
  200. p2p.NewDefaultListener("tcp://"+peer.NodeInfo().ListenAddr, "", false, log.TestingLogger()),
  201. )
  202. require.Nil(t, peer.Start())
  203. defer peer.Stop()
  204. // 2. Create seed which knows about the peer
  205. seed := testCreateSeed(dir, 2, []*p2p.NetAddress{peer.NodeInfo().NetAddress()}, []*p2p.NetAddress{peer.NodeInfo().NetAddress()})
  206. require.Nil(t, seed.Start())
  207. defer seed.Stop()
  208. // 3. create another peer with only seed configured.
  209. secondPeer := testCreatePeerWithSeed(dir, 3, seed)
  210. require.Nil(t, secondPeer.Start())
  211. defer secondPeer.Stop()
  212. // 4. check that the second peer connects to seed immediately
  213. assertPeersWithTimeout(t, []*p2p.Switch{secondPeer}, 10*time.Millisecond, 3*time.Second, 1)
  214. // 5. check that the second peer connects to the first peer immediately
  215. assertPeersWithTimeout(t, []*p2p.Switch{secondPeer}, 10*time.Millisecond, 1*time.Second, 2)
  216. }
  217. func TestPEXReactorCrawlStatus(t *testing.T) {
  218. pexR, book := createReactor(&PEXReactorConfig{SeedMode: true})
  219. defer teardownReactor(book)
  220. // Seed/Crawler mode uses data from the Switch
  221. sw := createSwitchAndAddReactors(pexR)
  222. sw.SetAddrBook(book)
  223. // Create a peer, add it to the peer set and the addrbook.
  224. peer := p2p.CreateRandomPeer(false)
  225. p2p.AddPeerToSwitch(pexR.Switch, peer)
  226. addr1 := peer.NodeInfo().NetAddress()
  227. pexR.book.AddAddress(addr1, addr1)
  228. // Add a non-connected address to the book.
  229. _, addr2 := p2p.CreateRoutableAddr()
  230. pexR.book.AddAddress(addr2, addr1)
  231. // Get some peerInfos to crawl
  232. peerInfos := pexR.getPeersToCrawl()
  233. // Make sure it has the proper number of elements
  234. assert.Equal(t, 2, len(peerInfos))
  235. // TODO: test
  236. }
  237. func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) {
  238. peer := p2p.CreateRandomPeer(false)
  239. pexR, book := createReactor(&PEXReactorConfig{})
  240. book.AddPrivateIDs([]string{string(peer.NodeInfo().ID)})
  241. defer teardownReactor(book)
  242. // we have to send a request to receive responses
  243. pexR.RequestAddrs(peer)
  244. size := book.Size()
  245. addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
  246. msg := cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: addrs})
  247. pexR.Receive(PexChannel, peer, msg)
  248. assert.Equal(t, size, book.Size())
  249. pexR.AddPeer(peer)
  250. assert.Equal(t, size, book.Size())
  251. }
  252. func TestPEXReactorDialPeer(t *testing.T) {
  253. pexR, book := createReactor(&PEXReactorConfig{})
  254. defer teardownReactor(book)
  255. sw := createSwitchAndAddReactors(pexR)
  256. sw.SetAddrBook(book)
  257. peer := newMockPeer()
  258. addr := peer.NodeInfo().NetAddress()
  259. assert.Equal(t, 0, pexR.AttemptsToDial(addr))
  260. // 1st unsuccessful attempt
  261. pexR.dialPeer(addr)
  262. assert.Equal(t, 1, pexR.AttemptsToDial(addr))
  263. // 2nd unsuccessful attempt
  264. pexR.dialPeer(addr)
  265. // must be skipped because it is too early
  266. assert.Equal(t, 1, pexR.AttemptsToDial(addr))
  267. if !testing.Short() {
  268. time.Sleep(3 * time.Second)
  269. // 3rd attempt
  270. pexR.dialPeer(addr)
  271. assert.Equal(t, 2, pexR.AttemptsToDial(addr))
  272. }
  273. }
  274. type mockPeer struct {
  275. *cmn.BaseService
  276. pubKey crypto.PubKey
  277. addr *p2p.NetAddress
  278. outbound, persistent bool
  279. }
  280. func newMockPeer() mockPeer {
  281. _, netAddr := p2p.CreateRoutableAddr()
  282. mp := mockPeer{
  283. addr: netAddr,
  284. pubKey: ed25519.GenPrivKey().PubKey(),
  285. }
  286. mp.BaseService = cmn.NewBaseService(nil, "MockPeer", mp)
  287. mp.Start()
  288. return mp
  289. }
  290. func (mp mockPeer) ID() p2p.ID { return mp.addr.ID }
  291. func (mp mockPeer) IsOutbound() bool { return mp.outbound }
  292. func (mp mockPeer) IsPersistent() bool { return mp.persistent }
  293. func (mp mockPeer) NodeInfo() p2p.NodeInfo {
  294. return p2p.NodeInfo{
  295. ID: mp.addr.ID,
  296. ListenAddr: mp.addr.DialString(),
  297. }
  298. }
  299. func (mockPeer) RemoteIP() net.IP { return net.ParseIP("127.0.0.1") }
  300. func (mockPeer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} }
  301. func (mockPeer) Send(byte, []byte) bool { return false }
  302. func (mockPeer) TrySend(byte, []byte) bool { return false }
  303. func (mockPeer) Set(string, interface{}) {}
  304. func (mockPeer) Get(string) interface{} { return nil }
  305. func (mockPeer) OriginalAddr() *p2p.NetAddress { return nil }
  306. func assertPeersWithTimeout(
  307. t *testing.T,
  308. switches []*p2p.Switch,
  309. checkPeriod, timeout time.Duration,
  310. nPeers int,
  311. ) {
  312. var (
  313. ticker = time.NewTicker(checkPeriod)
  314. remaining = timeout
  315. )
  316. for {
  317. select {
  318. case <-ticker.C:
  319. // check peers are connected
  320. allGood := true
  321. for _, s := range switches {
  322. outbound, inbound, _ := s.NumPeers()
  323. if outbound+inbound < nPeers {
  324. allGood = false
  325. break
  326. }
  327. }
  328. remaining -= checkPeriod
  329. if remaining < 0 {
  330. remaining = 0
  331. }
  332. if allGood {
  333. return
  334. }
  335. case <-time.After(remaining):
  336. numPeersStr := ""
  337. for i, s := range switches {
  338. outbound, inbound, _ := s.NumPeers()
  339. numPeersStr += fmt.Sprintf("%d => {outbound: %d, inbound: %d}, ", i, outbound, inbound)
  340. }
  341. t.Errorf(
  342. "expected all switches to be connected to at least %d peer(s) (switches: %s)",
  343. nPeers, numPeersStr,
  344. )
  345. return
  346. }
  347. }
  348. }
  349. // Creates a seed which knows about the provided addresses / source address pairs.
  350. // Starting and stopping the seed is left to the caller
  351. func testCreateSeed(dir string, id int, knownAddrs, srcAddrs []*p2p.NetAddress) *p2p.Switch {
  352. seed := p2p.MakeSwitch(
  353. cfg,
  354. id,
  355. "127.0.0.1",
  356. "123.123.123",
  357. func(i int, sw *p2p.Switch) *p2p.Switch {
  358. book := NewAddrBook(filepath.Join(dir, "addrbookSeed.json"), false)
  359. book.SetLogger(log.TestingLogger())
  360. for j := 0; j < len(knownAddrs); j++ {
  361. book.AddAddress(knownAddrs[j], srcAddrs[j])
  362. book.MarkGood(knownAddrs[j])
  363. }
  364. sw.SetAddrBook(book)
  365. sw.SetLogger(log.TestingLogger())
  366. r := NewPEXReactor(book, &PEXReactorConfig{})
  367. r.SetLogger(log.TestingLogger())
  368. sw.AddReactor("pex", r)
  369. return sw
  370. },
  371. )
  372. seed.AddListener(
  373. p2p.NewDefaultListener("tcp://"+seed.NodeInfo().ListenAddr, "", false, log.TestingLogger()),
  374. )
  375. return seed
  376. }
  377. // Creates a peer which knows about the provided seed.
  378. // Starting and stopping the peer is left to the caller
  379. func testCreatePeerWithSeed(dir string, id int, seed *p2p.Switch) *p2p.Switch {
  380. peer := p2p.MakeSwitch(
  381. cfg,
  382. id,
  383. "127.0.0.1",
  384. "123.123.123",
  385. func(i int, sw *p2p.Switch) *p2p.Switch {
  386. book := NewAddrBook(filepath.Join(dir, fmt.Sprintf("addrbook%d.json", id)), false)
  387. book.SetLogger(log.TestingLogger())
  388. sw.SetAddrBook(book)
  389. sw.SetLogger(log.TestingLogger())
  390. r := NewPEXReactor(
  391. book,
  392. &PEXReactorConfig{
  393. Seeds: []string{seed.NodeInfo().NetAddress().String()},
  394. },
  395. )
  396. r.SetLogger(log.TestingLogger())
  397. sw.AddReactor("pex", r)
  398. return sw
  399. },
  400. )
  401. peer.AddListener(
  402. p2p.NewDefaultListener("tcp://"+peer.NodeInfo().ListenAddr, "", false, log.TestingLogger()),
  403. )
  404. return peer
  405. }
  406. func createReactor(conf *PEXReactorConfig) (r *PEXReactor, book *addrBook) {
  407. // directory to store address book
  408. dir, err := ioutil.TempDir("", "pex_reactor")
  409. if err != nil {
  410. panic(err)
  411. }
  412. book = NewAddrBook(filepath.Join(dir, "addrbook.json"), true)
  413. book.SetLogger(log.TestingLogger())
  414. r = NewPEXReactor(book, conf)
  415. r.SetLogger(log.TestingLogger())
  416. return
  417. }
  418. func teardownReactor(book *addrBook) {
  419. err := os.RemoveAll(filepath.Dir(book.FilePath()))
  420. if err != nil {
  421. panic(err)
  422. }
  423. }
  424. func createSwitchAndAddReactors(reactors ...p2p.Reactor) *p2p.Switch {
  425. sw := p2p.MakeSwitch(cfg, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw })
  426. sw.SetLogger(log.TestingLogger())
  427. for _, r := range reactors {
  428. sw.AddReactor(r.String(), r)
  429. r.SetSwitch(sw)
  430. }
  431. return sw
  432. }