You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

371 lines
11 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
7 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package pex
import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	crypto "github.com/tendermint/go-crypto"
	wire "github.com/tendermint/go-wire"
	cmn "github.com/tendermint/tmlibs/common"
	"github.com/tendermint/tmlibs/log"

	cfg "github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/p2p/conn"
)
  18. var (
  19. config *cfg.P2PConfig
  20. )
  21. func init() {
  22. config = cfg.DefaultP2PConfig()
  23. config.PexReactor = true
  24. }
  25. func TestPEXReactorBasic(t *testing.T) {
  26. assert, require := assert.New(t), require.New(t)
  27. dir, err := ioutil.TempDir("", "pex_reactor")
  28. require.Nil(err)
  29. defer os.RemoveAll(dir) // nolint: errcheck
  30. book := NewAddrBook(dir+"addrbook.json", true)
  31. book.SetLogger(log.TestingLogger())
  32. r := NewPEXReactor(book, &PEXReactorConfig{})
  33. r.SetLogger(log.TestingLogger())
  34. assert.NotNil(r)
  35. assert.NotEmpty(r.GetChannels())
  36. }
  37. func TestPEXReactorAddRemovePeer(t *testing.T) {
  38. assert, require := assert.New(t), require.New(t)
  39. dir, err := ioutil.TempDir("", "pex_reactor")
  40. require.Nil(err)
  41. defer os.RemoveAll(dir) // nolint: errcheck
  42. book := NewAddrBook(dir+"addrbook.json", true)
  43. book.SetLogger(log.TestingLogger())
  44. r := NewPEXReactor(book, &PEXReactorConfig{})
  45. r.SetLogger(log.TestingLogger())
  46. size := book.Size()
  47. peer := p2p.CreateRandomPeer(false)
  48. r.AddPeer(peer)
  49. assert.Equal(size+1, book.Size())
  50. r.RemovePeer(peer, "peer not available")
  51. assert.Equal(size+1, book.Size())
  52. outboundPeer := p2p.CreateRandomPeer(true)
  53. r.AddPeer(outboundPeer)
  54. assert.Equal(size+1, book.Size(), "outbound peers should not be added to the address book")
  55. r.RemovePeer(outboundPeer, "peer not available")
  56. assert.Equal(size+1, book.Size())
  57. }
  58. func TestPEXReactorRunning(t *testing.T) {
  59. N := 3
  60. switches := make([]*p2p.Switch, N)
  61. dir, err := ioutil.TempDir("", "pex_reactor")
  62. require.Nil(t, err)
  63. defer os.RemoveAll(dir) // nolint: errcheck
  64. book := NewAddrBook(dir+"addrbook.json", false)
  65. book.SetLogger(log.TestingLogger())
  66. // create switches
  67. for i := 0; i < N; i++ {
  68. switches[i] = p2p.MakeSwitch(config, i, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
  69. sw.SetLogger(log.TestingLogger().With("switch", i))
  70. r := NewPEXReactor(book, &PEXReactorConfig{})
  71. r.SetLogger(log.TestingLogger())
  72. r.SetEnsurePeersPeriod(250 * time.Millisecond)
  73. sw.AddReactor("pex", r)
  74. return sw
  75. })
  76. }
  77. // fill the address book and add listeners
  78. for _, s := range switches {
  79. addr, _ := p2p.NewNetAddressString(s.NodeInfo().ListenAddr)
  80. book.AddAddress(addr, addr)
  81. s.AddListener(p2p.NewDefaultListener("tcp", s.NodeInfo().ListenAddr, true, log.TestingLogger()))
  82. }
  83. // start switches
  84. for _, s := range switches {
  85. err := s.Start() // start switch and reactors
  86. require.Nil(t, err)
  87. }
  88. assertPeersWithTimeout(t, switches, 10*time.Millisecond, 10*time.Second, N-1)
  89. // stop them
  90. for _, s := range switches {
  91. s.Stop()
  92. }
  93. }
  94. func assertPeersWithTimeout(t *testing.T, switches []*p2p.Switch, checkPeriod, timeout time.Duration, nPeers int) {
  95. ticker := time.NewTicker(checkPeriod)
  96. remaining := timeout
  97. for {
  98. select {
  99. case <-ticker.C:
  100. // check peers are connected
  101. allGood := true
  102. for _, s := range switches {
  103. outbound, inbound, _ := s.NumPeers()
  104. if outbound+inbound < nPeers {
  105. allGood = false
  106. }
  107. }
  108. remaining -= checkPeriod
  109. if remaining < 0 {
  110. remaining = 0
  111. }
  112. if allGood {
  113. return
  114. }
  115. case <-time.After(remaining):
  116. numPeersStr := ""
  117. for i, s := range switches {
  118. outbound, inbound, _ := s.NumPeers()
  119. numPeersStr += fmt.Sprintf("%d => {outbound: %d, inbound: %d}, ", i, outbound, inbound)
  120. }
  121. t.Errorf("expected all switches to be connected to at least one peer (switches: %s)", numPeersStr)
  122. return
  123. }
  124. }
  125. }
  126. func TestPEXReactorReceive(t *testing.T) {
  127. assert, require := assert.New(t), require.New(t)
  128. dir, err := ioutil.TempDir("", "pex_reactor")
  129. require.Nil(err)
  130. defer os.RemoveAll(dir) // nolint: errcheck
  131. book := NewAddrBook(dir+"addrbook.json", false)
  132. book.SetLogger(log.TestingLogger())
  133. r := NewPEXReactor(book, &PEXReactorConfig{})
  134. r.SetLogger(log.TestingLogger())
  135. peer := p2p.CreateRandomPeer(false)
  136. // we have to send a request to receive responses
  137. r.RequestAddrs(peer)
  138. size := book.Size()
  139. addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
  140. msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
  141. r.Receive(PexChannel, peer, msg)
  142. assert.Equal(size+1, book.Size())
  143. msg = wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}})
  144. r.Receive(PexChannel, peer, msg)
  145. }
  146. func TestPEXReactorRequestMessageAbuse(t *testing.T) {
  147. assert, require := assert.New(t), require.New(t)
  148. dir, err := ioutil.TempDir("", "pex_reactor")
  149. require.Nil(err)
  150. defer os.RemoveAll(dir) // nolint: errcheck
  151. book := NewAddrBook(dir+"addrbook.json", true)
  152. book.SetLogger(log.TestingLogger())
  153. r := NewPEXReactor(book, &PEXReactorConfig{})
  154. sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw })
  155. sw.SetLogger(log.TestingLogger())
  156. sw.AddReactor("PEX", r)
  157. r.SetSwitch(sw)
  158. r.SetLogger(log.TestingLogger())
  159. peer := newMockPeer()
  160. p2p.AddPeerToSwitch(sw, peer)
  161. assert.True(sw.Peers().Has(peer.ID()))
  162. id := string(peer.ID())
  163. msg := wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}})
  164. // first time creates the entry
  165. r.Receive(PexChannel, peer, msg)
  166. assert.True(r.lastReceivedRequests.Has(id))
  167. assert.True(sw.Peers().Has(peer.ID()))
  168. // next time sets the last time value
  169. r.Receive(PexChannel, peer, msg)
  170. assert.True(r.lastReceivedRequests.Has(id))
  171. assert.True(sw.Peers().Has(peer.ID()))
  172. // third time is too many too soon - peer is removed
  173. r.Receive(PexChannel, peer, msg)
  174. assert.False(r.lastReceivedRequests.Has(id))
  175. assert.False(sw.Peers().Has(peer.ID()))
  176. }
  177. func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
  178. assert, require := assert.New(t), require.New(t)
  179. dir, err := ioutil.TempDir("", "pex_reactor")
  180. require.Nil(err)
  181. defer os.RemoveAll(dir) // nolint: errcheck
  182. book := NewAddrBook(dir+"addrbook.json", true)
  183. book.SetLogger(log.TestingLogger())
  184. r := NewPEXReactor(book, &PEXReactorConfig{})
  185. sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw })
  186. sw.SetLogger(log.TestingLogger())
  187. sw.AddReactor("PEX", r)
  188. r.SetSwitch(sw)
  189. r.SetLogger(log.TestingLogger())
  190. peer := newMockPeer()
  191. p2p.AddPeerToSwitch(sw, peer)
  192. assert.True(sw.Peers().Has(peer.ID()))
  193. id := string(peer.ID())
  194. // request addrs from the peer
  195. r.RequestAddrs(peer)
  196. assert.True(r.requestsSent.Has(id))
  197. assert.True(sw.Peers().Has(peer.ID()))
  198. addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
  199. msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
  200. // receive some addrs. should clear the request
  201. r.Receive(PexChannel, peer, msg)
  202. assert.False(r.requestsSent.Has(id))
  203. assert.True(sw.Peers().Has(peer.ID()))
  204. // receiving more addrs causes a disconnect
  205. r.Receive(PexChannel, peer, msg)
  206. assert.False(sw.Peers().Has(peer.ID()))
  207. }
  208. func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
  209. dir, err := ioutil.TempDir("", "pex_reactor")
  210. require.Nil(t, err)
  211. defer os.RemoveAll(dir) // nolint: errcheck
  212. book := NewAddrBook(dir+"addrbook.json", false)
  213. book.SetLogger(log.TestingLogger())
  214. // 1. create seed
  215. seed := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
  216. sw.SetLogger(log.TestingLogger())
  217. r := NewPEXReactor(book, &PEXReactorConfig{})
  218. r.SetLogger(log.TestingLogger())
  219. r.SetEnsurePeersPeriod(250 * time.Millisecond)
  220. sw.AddReactor("pex", r)
  221. return sw
  222. })
  223. seed.AddListener(p2p.NewDefaultListener("tcp", seed.NodeInfo().ListenAddr, true, log.TestingLogger()))
  224. err = seed.Start()
  225. require.Nil(t, err)
  226. defer seed.Stop()
  227. // 2. create usual peer
  228. sw := p2p.MakeSwitch(config, 1, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
  229. sw.SetLogger(log.TestingLogger())
  230. r := NewPEXReactor(book, &PEXReactorConfig{Seeds: []string{seed.NodeInfo().ListenAddr}})
  231. r.SetLogger(log.TestingLogger())
  232. r.SetEnsurePeersPeriod(250 * time.Millisecond)
  233. sw.AddReactor("pex", r)
  234. return sw
  235. })
  236. err = sw.Start()
  237. require.Nil(t, err)
  238. defer sw.Stop()
  239. // 3. check that peer at least connects to seed
  240. assertPeersWithTimeout(t, []*p2p.Switch{sw}, 10*time.Millisecond, 10*time.Second, 1)
  241. }
  242. func TestPEXReactorCrawlStatus(t *testing.T) {
  243. assert, require := assert.New(t), require.New(t)
  244. dir, err := ioutil.TempDir("", "pex_reactor")
  245. require.Nil(err)
  246. defer os.RemoveAll(dir) // nolint: errcheck
  247. book := NewAddrBook(dir+"addrbook.json", false)
  248. book.SetLogger(log.TestingLogger())
  249. pexR := NewPEXReactor(book, &PEXReactorConfig{SeedMode: true})
  250. // Seed/Crawler mode uses data from the Switch
  251. p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
  252. pexR.SetLogger(log.TestingLogger())
  253. sw.SetLogger(log.TestingLogger().With("switch", i))
  254. sw.AddReactor("pex", pexR)
  255. return sw
  256. })
  257. // Create a peer, add it to the peer set and the addrbook.
  258. peer := p2p.CreateRandomPeer(false)
  259. p2p.AddPeerToSwitch(pexR.Switch, peer)
  260. addr1 := peer.NodeInfo().NetAddress()
  261. pexR.book.AddAddress(addr1, addr1)
  262. // Add a non-connected address to the book.
  263. _, addr2 := p2p.CreateRoutableAddr()
  264. pexR.book.AddAddress(addr2, addr1)
  265. // Get some peerInfos to crawl
  266. peerInfos := pexR.getPeersToCrawl()
  267. // Make sure it has the proper number of elements
  268. assert.Equal(2, len(peerInfos))
  269. // TODO: test
  270. }
  271. type mockPeer struct {
  272. *cmn.BaseService
  273. pubKey crypto.PubKey
  274. addr *p2p.NetAddress
  275. outbound, persistent bool
  276. }
  277. func newMockPeer() mockPeer {
  278. _, netAddr := p2p.CreateRoutableAddr()
  279. mp := mockPeer{
  280. addr: netAddr,
  281. pubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(),
  282. }
  283. mp.BaseService = cmn.NewBaseService(nil, "MockPeer", mp)
  284. mp.Start()
  285. return mp
  286. }
  287. func (mp mockPeer) ID() p2p.ID { return p2p.PubKeyToID(mp.pubKey) }
  288. func (mp mockPeer) IsOutbound() bool { return mp.outbound }
  289. func (mp mockPeer) IsPersistent() bool { return mp.persistent }
  290. func (mp mockPeer) NodeInfo() p2p.NodeInfo {
  291. return p2p.NodeInfo{
  292. PubKey: mp.pubKey,
  293. ListenAddr: mp.addr.DialString(),
  294. }
  295. }
  296. func (mp mockPeer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} }
  297. func (mp mockPeer) Send(byte, interface{}) bool { return false }
  298. func (mp mockPeer) TrySend(byte, interface{}) bool { return false }
  299. func (mp mockPeer) Set(string, interface{}) {}
  300. func (mp mockPeer) Get(string) interface{} { return nil }
  301. func (mp mockPeer) QuitChan() <-chan struct{} { return mp.Quit }