You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

254 lines
6.7 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
7 years ago
8 years ago
8 years ago
7 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
7 years ago
7 years ago
8 years ago
  1. package p2p
  import (
  	"fmt"
  	"io/ioutil"
  	"math/rand"
  	"os"
  	"path/filepath"
  	"testing"
  	"time"

  	"github.com/stretchr/testify/assert"
  	"github.com/stretchr/testify/require"

  	crypto "github.com/tendermint/go-crypto"
  	wire "github.com/tendermint/go-wire"
  	cmn "github.com/tendermint/tmlibs/common"
  	"github.com/tendermint/tmlibs/log"
  )
  16. func TestPEXReactorBasic(t *testing.T) {
  17. assert, require := assert.New(t), require.New(t)
  18. dir, err := ioutil.TempDir("", "pex_reactor")
  19. require.Nil(err)
  20. defer os.RemoveAll(dir) // nolint: errcheck
  21. book := NewAddrBook(dir+"addrbook.json", true)
  22. book.SetLogger(log.TestingLogger())
  23. r := NewPEXReactor(book, &PEXReactorConfig{})
  24. r.SetLogger(log.TestingLogger())
  25. assert.NotNil(r)
  26. assert.NotEmpty(r.GetChannels())
  27. }
  28. func TestPEXReactorAddRemovePeer(t *testing.T) {
  29. assert, require := assert.New(t), require.New(t)
  30. dir, err := ioutil.TempDir("", "pex_reactor")
  31. require.Nil(err)
  32. defer os.RemoveAll(dir) // nolint: errcheck
  33. book := NewAddrBook(dir+"addrbook.json", true)
  34. book.SetLogger(log.TestingLogger())
  35. r := NewPEXReactor(book, &PEXReactorConfig{})
  36. r.SetLogger(log.TestingLogger())
  37. size := book.Size()
  38. peer := createRandomPeer(false)
  39. r.AddPeer(peer)
  40. assert.Equal(size+1, book.Size())
  41. r.RemovePeer(peer, "peer not available")
  42. assert.Equal(size+1, book.Size())
  43. outboundPeer := createRandomPeer(true)
  44. r.AddPeer(outboundPeer)
  45. assert.Equal(size+1, book.Size(), "outbound peers should not be added to the address book")
  46. r.RemovePeer(outboundPeer, "peer not available")
  47. assert.Equal(size+1, book.Size())
  48. }
  49. func TestPEXReactorRunning(t *testing.T) {
  50. N := 3
  51. switches := make([]*Switch, N)
  52. dir, err := ioutil.TempDir("", "pex_reactor")
  53. require.Nil(t, err)
  54. defer os.RemoveAll(dir) // nolint: errcheck
  55. book := NewAddrBook(dir+"addrbook.json", false)
  56. book.SetLogger(log.TestingLogger())
  57. // create switches
  58. for i := 0; i < N; i++ {
  59. switches[i] = makeSwitch(config, i, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch {
  60. sw.SetLogger(log.TestingLogger().With("switch", i))
  61. r := NewPEXReactor(book, &PEXReactorConfig{})
  62. r.SetLogger(log.TestingLogger())
  63. r.SetEnsurePeersPeriod(250 * time.Millisecond)
  64. sw.AddReactor("pex", r)
  65. return sw
  66. })
  67. }
  68. // fill the address book and add listeners
  69. for _, s := range switches {
  70. addr, _ := NewNetAddressString(s.NodeInfo().ListenAddr)
  71. book.AddAddress(addr, addr)
  72. s.AddListener(NewDefaultListener("tcp", s.NodeInfo().ListenAddr, true, log.TestingLogger()))
  73. }
  74. // start switches
  75. for _, s := range switches {
  76. err := s.Start() // start switch and reactors
  77. require.Nil(t, err)
  78. }
  79. assertSomePeersWithTimeout(t, switches, 10*time.Millisecond, 10*time.Second)
  80. // stop them
  81. for _, s := range switches {
  82. s.Stop()
  83. }
  84. }
  85. func assertSomePeersWithTimeout(t *testing.T, switches []*Switch, checkPeriod, timeout time.Duration) {
  86. ticker := time.NewTicker(checkPeriod)
  87. remaining := timeout
  88. for {
  89. select {
  90. case <-ticker.C:
  91. // check peers are connected
  92. allGood := true
  93. for _, s := range switches {
  94. outbound, inbound, _ := s.NumPeers()
  95. if outbound+inbound == 0 {
  96. allGood = false
  97. }
  98. }
  99. remaining -= checkPeriod
  100. if remaining < 0 {
  101. remaining = 0
  102. }
  103. if allGood {
  104. return
  105. }
  106. case <-time.After(remaining):
  107. numPeersStr := ""
  108. for i, s := range switches {
  109. outbound, inbound, _ := s.NumPeers()
  110. numPeersStr += fmt.Sprintf("%d => {outbound: %d, inbound: %d}, ", i, outbound, inbound)
  111. }
  112. t.Errorf("expected all switches to be connected to at least one peer (switches: %s)", numPeersStr)
  113. return
  114. }
  115. }
  116. }
  117. func TestPEXReactorReceive(t *testing.T) {
  118. assert, require := assert.New(t), require.New(t)
  119. dir, err := ioutil.TempDir("", "pex_reactor")
  120. require.Nil(err)
  121. defer os.RemoveAll(dir) // nolint: errcheck
  122. book := NewAddrBook(dir+"addrbook.json", false)
  123. book.SetLogger(log.TestingLogger())
  124. r := NewPEXReactor(book, &PEXReactorConfig{})
  125. r.SetLogger(log.TestingLogger())
  126. peer := createRandomPeer(false)
  127. size := book.Size()
  128. netAddr, _ := NewNetAddressString(peer.NodeInfo().ListenAddr)
  129. addrs := []*NetAddress{netAddr}
  130. msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
  131. r.Receive(PexChannel, peer, msg)
  132. assert.Equal(size+1, book.Size())
  133. msg = wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}})
  134. r.Receive(PexChannel, peer, msg)
  135. }
  136. func TestPEXReactorAbuseFromPeer(t *testing.T) {
  137. assert, require := assert.New(t), require.New(t)
  138. dir, err := ioutil.TempDir("", "pex_reactor")
  139. require.Nil(err)
  140. defer os.RemoveAll(dir) // nolint: errcheck
  141. book := NewAddrBook(dir+"addrbook.json", true)
  142. book.SetLogger(log.TestingLogger())
  143. r := NewPEXReactor(book, &PEXReactorConfig{})
  144. r.SetLogger(log.TestingLogger())
  145. r.SetMaxMsgCountByPeer(5)
  146. peer := createRandomPeer(false)
  147. msg := wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}})
  148. for i := 0; i < 10; i++ {
  149. r.Receive(PexChannel, peer, msg)
  150. }
  151. assert.True(r.ReachedMaxMsgCountForPeer(peer.NodeInfo().ID()))
  152. }
  153. func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
  154. dir, err := ioutil.TempDir("", "pex_reactor")
  155. require.Nil(t, err)
  156. defer os.RemoveAll(dir) // nolint: errcheck
  157. book := NewAddrBook(dir+"addrbook.json", false)
  158. book.SetLogger(log.TestingLogger())
  159. // 1. create seed
  160. seed := makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch {
  161. sw.SetLogger(log.TestingLogger())
  162. r := NewPEXReactor(book, &PEXReactorConfig{})
  163. r.SetLogger(log.TestingLogger())
  164. r.SetEnsurePeersPeriod(250 * time.Millisecond)
  165. sw.AddReactor("pex", r)
  166. return sw
  167. })
  168. seed.AddListener(NewDefaultListener("tcp", seed.NodeInfo().ListenAddr, true, log.TestingLogger()))
  169. err = seed.Start()
  170. require.Nil(t, err)
  171. defer seed.Stop()
  172. // 2. create usual peer
  173. sw := makeSwitch(config, 1, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch {
  174. sw.SetLogger(log.TestingLogger())
  175. r := NewPEXReactor(book, &PEXReactorConfig{Seeds: []string{seed.NodeInfo().ListenAddr}})
  176. r.SetLogger(log.TestingLogger())
  177. r.SetEnsurePeersPeriod(250 * time.Millisecond)
  178. sw.AddReactor("pex", r)
  179. return sw
  180. })
  181. err = sw.Start()
  182. require.Nil(t, err)
  183. defer sw.Stop()
  184. // 3. check that peer at least connects to seed
  185. assertSomePeersWithTimeout(t, []*Switch{sw}, 10*time.Millisecond, 10*time.Second)
  186. }
  187. func createRoutableAddr() (addr string, netAddr *NetAddress) {
  188. for {
  189. addr = cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256)
  190. netAddr, _ = NewNetAddressString(addr)
  191. if netAddr.Routable() {
  192. break
  193. }
  194. }
  195. return
  196. }
  197. func createRandomPeer(outbound bool) *peer {
  198. addr, netAddr := createRoutableAddr()
  199. p := &peer{
  200. nodeInfo: NodeInfo{
  201. ListenAddr: netAddr.String(),
  202. PubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(),
  203. },
  204. outbound: outbound,
  205. mconn: &MConnection{},
  206. }
  207. p.SetLogger(log.TestingLogger().With("peer", addr))
  208. return p
  209. }