You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

386 lines
11 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
7 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
7 years ago
7 years ago
8 years ago
  1. package p2p
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "math/rand"
  6. "os"
  7. "testing"
  8. "time"
  9. "github.com/stretchr/testify/assert"
  10. "github.com/stretchr/testify/require"
  11. crypto "github.com/tendermint/go-crypto"
  12. wire "github.com/tendermint/go-wire"
  13. cmn "github.com/tendermint/tmlibs/common"
  14. "github.com/tendermint/tmlibs/log"
  15. )
  16. func TestPEXReactorBasic(t *testing.T) {
  17. assert, require := assert.New(t), require.New(t)
  18. dir, err := ioutil.TempDir("", "pex_reactor")
  19. require.Nil(err)
  20. defer os.RemoveAll(dir) // nolint: errcheck
  21. book := NewAddrBook(dir+"addrbook.json", true)
  22. book.SetLogger(log.TestingLogger())
  23. r := NewPEXReactor(book, &PEXReactorConfig{})
  24. r.SetLogger(log.TestingLogger())
  25. assert.NotNil(r)
  26. assert.NotEmpty(r.GetChannels())
  27. }
  28. func TestPEXReactorAddRemovePeer(t *testing.T) {
  29. assert, require := assert.New(t), require.New(t)
  30. dir, err := ioutil.TempDir("", "pex_reactor")
  31. require.Nil(err)
  32. defer os.RemoveAll(dir) // nolint: errcheck
  33. book := NewAddrBook(dir+"addrbook.json", true)
  34. book.SetLogger(log.TestingLogger())
  35. r := NewPEXReactor(book, &PEXReactorConfig{})
  36. r.SetLogger(log.TestingLogger())
  37. size := book.Size()
  38. peer := createRandomPeer(false)
  39. r.AddPeer(peer)
  40. assert.Equal(size+1, book.Size())
  41. r.RemovePeer(peer, "peer not available")
  42. assert.Equal(size+1, book.Size())
  43. outboundPeer := createRandomPeer(true)
  44. r.AddPeer(outboundPeer)
  45. assert.Equal(size+1, book.Size(), "outbound peers should not be added to the address book")
  46. r.RemovePeer(outboundPeer, "peer not available")
  47. assert.Equal(size+1, book.Size())
  48. }
  49. func TestPEXReactorRunning(t *testing.T) {
  50. N := 3
  51. switches := make([]*Switch, N)
  52. dir, err := ioutil.TempDir("", "pex_reactor")
  53. require.Nil(t, err)
  54. defer os.RemoveAll(dir) // nolint: errcheck
  55. book := NewAddrBook(dir+"addrbook.json", false)
  56. book.SetLogger(log.TestingLogger())
  57. // create switches
  58. for i := 0; i < N; i++ {
  59. switches[i] = makeSwitch(config, i, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch {
  60. sw.SetLogger(log.TestingLogger().With("switch", i))
  61. r := NewPEXReactor(book, &PEXReactorConfig{})
  62. r.SetLogger(log.TestingLogger())
  63. r.SetEnsurePeersPeriod(250 * time.Millisecond)
  64. sw.AddReactor("pex", r)
  65. return sw
  66. })
  67. }
  68. // fill the address book and add listeners
  69. for _, s := range switches {
  70. addr, _ := NewNetAddressString(s.NodeInfo().ListenAddr)
  71. book.AddAddress(addr, addr)
  72. s.AddListener(NewDefaultListener("tcp", s.NodeInfo().ListenAddr, true, log.TestingLogger()))
  73. }
  74. // start switches
  75. for _, s := range switches {
  76. err := s.Start() // start switch and reactors
  77. require.Nil(t, err)
  78. }
  79. assertSomePeersWithTimeout(t, switches, 10*time.Millisecond, 10*time.Second)
  80. // stop them
  81. for _, s := range switches {
  82. s.Stop()
  83. }
  84. }
  85. func assertSomePeersWithTimeout(t *testing.T, switches []*Switch, checkPeriod, timeout time.Duration) {
  86. ticker := time.NewTicker(checkPeriod)
  87. remaining := timeout
  88. for {
  89. select {
  90. case <-ticker.C:
  91. // check peers are connected
  92. allGood := true
  93. for _, s := range switches {
  94. outbound, inbound, _ := s.NumPeers()
  95. if outbound+inbound == 0 {
  96. allGood = false
  97. }
  98. }
  99. remaining -= checkPeriod
  100. if remaining < 0 {
  101. remaining = 0
  102. }
  103. if allGood {
  104. return
  105. }
  106. case <-time.After(remaining):
  107. numPeersStr := ""
  108. for i, s := range switches {
  109. outbound, inbound, _ := s.NumPeers()
  110. numPeersStr += fmt.Sprintf("%d => {outbound: %d, inbound: %d}, ", i, outbound, inbound)
  111. }
  112. t.Errorf("expected all switches to be connected to at least one peer (switches: %s)", numPeersStr)
  113. return
  114. }
  115. }
  116. }
  117. func TestPEXReactorReceive(t *testing.T) {
  118. assert, require := assert.New(t), require.New(t)
  119. dir, err := ioutil.TempDir("", "pex_reactor")
  120. require.Nil(err)
  121. defer os.RemoveAll(dir) // nolint: errcheck
  122. book := NewAddrBook(dir+"addrbook.json", false)
  123. book.SetLogger(log.TestingLogger())
  124. r := NewPEXReactor(book, &PEXReactorConfig{})
  125. r.SetLogger(log.TestingLogger())
  126. peer := createRandomPeer(false)
  127. // we have to send a request to receive responses
  128. r.RequestAddrs(peer)
  129. size := book.Size()
  130. addrs := []*NetAddress{peer.NodeInfo().NetAddress()}
  131. msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
  132. r.Receive(PexChannel, peer, msg)
  133. assert.Equal(size+1, book.Size())
  134. msg = wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}})
  135. r.Receive(PexChannel, peer, msg)
  136. }
  137. func TestPEXReactorRequestMessageAbuse(t *testing.T) {
  138. assert, require := assert.New(t), require.New(t)
  139. dir, err := ioutil.TempDir("", "pex_reactor")
  140. require.Nil(err)
  141. defer os.RemoveAll(dir) // nolint: errcheck
  142. book := NewAddrBook(dir+"addrbook.json", true)
  143. book.SetLogger(log.TestingLogger())
  144. r := NewPEXReactor(book, &PEXReactorConfig{})
  145. sw := makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { return sw })
  146. sw.SetLogger(log.TestingLogger())
  147. sw.AddReactor("PEX", r)
  148. r.SetSwitch(sw)
  149. r.SetLogger(log.TestingLogger())
  150. peer := newMockPeer()
  151. sw.peers.Add(peer)
  152. assert.True(sw.Peers().Has(peer.ID()))
  153. id := string(peer.ID())
  154. msg := wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}})
  155. // first time creates the entry
  156. r.Receive(PexChannel, peer, msg)
  157. assert.True(r.lastReceivedRequests.Has(id))
  158. assert.True(sw.Peers().Has(peer.ID()))
  159. // next time sets the last time value
  160. r.Receive(PexChannel, peer, msg)
  161. assert.True(r.lastReceivedRequests.Has(id))
  162. assert.True(sw.Peers().Has(peer.ID()))
  163. // third time is too many too soon - peer is removed
  164. r.Receive(PexChannel, peer, msg)
  165. assert.False(r.lastReceivedRequests.Has(id))
  166. assert.False(sw.Peers().Has(peer.ID()))
  167. }
  168. func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
  169. assert, require := assert.New(t), require.New(t)
  170. dir, err := ioutil.TempDir("", "pex_reactor")
  171. require.Nil(err)
  172. defer os.RemoveAll(dir) // nolint: errcheck
  173. book := NewAddrBook(dir+"addrbook.json", true)
  174. book.SetLogger(log.TestingLogger())
  175. r := NewPEXReactor(book, &PEXReactorConfig{})
  176. sw := makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { return sw })
  177. sw.SetLogger(log.TestingLogger())
  178. sw.AddReactor("PEX", r)
  179. r.SetSwitch(sw)
  180. r.SetLogger(log.TestingLogger())
  181. peer := newMockPeer()
  182. sw.peers.Add(peer)
  183. assert.True(sw.Peers().Has(peer.ID()))
  184. id := string(peer.ID())
  185. // request addrs from the peer
  186. r.RequestAddrs(peer)
  187. assert.True(r.requestsSent.Has(id))
  188. assert.True(sw.Peers().Has(peer.ID()))
  189. addrs := []*NetAddress{peer.NodeInfo().NetAddress()}
  190. msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
  191. // receive some addrs. should clear the request
  192. r.Receive(PexChannel, peer, msg)
  193. assert.False(r.requestsSent.Has(id))
  194. assert.True(sw.Peers().Has(peer.ID()))
  195. // receiving more addrs causes a disconnect
  196. r.Receive(PexChannel, peer, msg)
  197. assert.False(sw.Peers().Has(peer.ID()))
  198. }
  199. func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
  200. dir, err := ioutil.TempDir("", "pex_reactor")
  201. require.Nil(t, err)
  202. defer os.RemoveAll(dir) // nolint: errcheck
  203. book := NewAddrBook(dir+"addrbook.json", false)
  204. book.SetLogger(log.TestingLogger())
  205. // 1. create seed
  206. seed := makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch {
  207. sw.SetLogger(log.TestingLogger())
  208. r := NewPEXReactor(book, &PEXReactorConfig{})
  209. r.SetLogger(log.TestingLogger())
  210. r.SetEnsurePeersPeriod(250 * time.Millisecond)
  211. sw.AddReactor("pex", r)
  212. return sw
  213. })
  214. seed.AddListener(NewDefaultListener("tcp", seed.NodeInfo().ListenAddr, true, log.TestingLogger()))
  215. err = seed.Start()
  216. require.Nil(t, err)
  217. defer seed.Stop()
  218. // 2. create usual peer
  219. sw := makeSwitch(config, 1, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch {
  220. sw.SetLogger(log.TestingLogger())
  221. r := NewPEXReactor(book, &PEXReactorConfig{Seeds: []string{seed.NodeInfo().ListenAddr}})
  222. r.SetLogger(log.TestingLogger())
  223. r.SetEnsurePeersPeriod(250 * time.Millisecond)
  224. sw.AddReactor("pex", r)
  225. return sw
  226. })
  227. err = sw.Start()
  228. require.Nil(t, err)
  229. defer sw.Stop()
  230. // 3. check that peer at least connects to seed
  231. assertSomePeersWithTimeout(t, []*Switch{sw}, 10*time.Millisecond, 10*time.Second)
  232. }
  233. func TestPEXReactorCrawlStatus(t *testing.T) {
  234. assert, require := assert.New(t), require.New(t)
  235. dir, err := ioutil.TempDir("", "pex_reactor")
  236. require.Nil(err)
  237. defer os.RemoveAll(dir) // nolint: errcheck
  238. book := NewAddrBook(dir+"addrbook.json", false)
  239. book.SetLogger(log.TestingLogger())
  240. pexR := NewPEXReactor(book, &PEXReactorConfig{SeedMode: true})
  241. // Seed/Crawler mode uses data from the Switch
  242. makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch {
  243. pexR.SetLogger(log.TestingLogger())
  244. sw.SetLogger(log.TestingLogger().With("switch", i))
  245. sw.AddReactor("pex", pexR)
  246. return sw
  247. })
  248. // Create a peer, add it to the peer set and the addrbook.
  249. peer := createRandomPeer(false)
  250. pexR.Switch.peers.Add(peer)
  251. addr1 := peer.NodeInfo().NetAddress()
  252. pexR.book.AddAddress(addr1, addr1)
  253. // Add a non-connected address to the book.
  254. _, addr2 := createRoutableAddr()
  255. pexR.book.AddAddress(addr2, addr1)
  256. // Get some peerInfos to crawl
  257. peerInfos := pexR.getPeersToCrawl()
  258. // Make sure it has the proper number of elements
  259. assert.Equal(2, len(peerInfos))
  260. // TODO: test
  261. }
  262. func createRoutableAddr() (addr string, netAddr *NetAddress) {
  263. for {
  264. var err error
  265. addr = cmn.Fmt("%X@%v.%v.%v.%v:46656", cmn.RandBytes(20), rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256)
  266. netAddr, err = NewNetAddressString(addr)
  267. if err != nil {
  268. panic(err)
  269. }
  270. if netAddr.Routable() {
  271. break
  272. }
  273. }
  274. return
  275. }
  276. func createRandomPeer(outbound bool) *peer {
  277. addr, netAddr := createRoutableAddr()
  278. p := &peer{
  279. nodeInfo: NodeInfo{
  280. ListenAddr: netAddr.DialString(),
  281. PubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(),
  282. },
  283. outbound: outbound,
  284. mconn: &MConnection{},
  285. }
  286. p.SetLogger(log.TestingLogger().With("peer", addr))
  287. return p
  288. }
  289. type mockPeer struct {
  290. *cmn.BaseService
  291. pubKey crypto.PubKey
  292. addr *NetAddress
  293. outbound, persistent bool
  294. }
  295. func newMockPeer() mockPeer {
  296. _, netAddr := createRoutableAddr()
  297. mp := mockPeer{
  298. addr: netAddr,
  299. pubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(),
  300. }
  301. mp.BaseService = cmn.NewBaseService(nil, "MockPeer", mp)
  302. mp.Start()
  303. return mp
  304. }
  305. func (mp mockPeer) ID() ID { return PubKeyToID(mp.pubKey) }
  306. func (mp mockPeer) IsOutbound() bool { return mp.outbound }
  307. func (mp mockPeer) IsPersistent() bool { return mp.persistent }
  308. func (mp mockPeer) NodeInfo() NodeInfo {
  309. return NodeInfo{
  310. PubKey: mp.pubKey,
  311. ListenAddr: mp.addr.DialString(),
  312. }
  313. }
  314. func (mp mockPeer) Status() ConnectionStatus { return ConnectionStatus{} }
  315. func (mp mockPeer) Send(byte, interface{}) bool { return false }
  316. func (mp mockPeer) TrySend(byte, interface{}) bool { return false }
  317. func (mp mockPeer) Set(string, interface{}) {}
  318. func (mp mockPeer) Get(string) interface{} { return nil }