You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

399 lines
10 KiB

7 years ago
7 years ago
7 years ago
p2p: introduce peerConn to simplify peer creation (#1226) * expose AuthEnc in the P2P config if AuthEnc is true, dialed peers must have a node ID in the address and it must match the persistent pubkey from the secret handshake. Refs #1157 * fixes after my own review * fix docs * fix build failure ``` p2p/pex/pex_reactor_test.go:288:88: cannot use seed.NodeInfo().NetAddress() (type *p2p.NetAddress) as type string in array or slice literal ``` * p2p: introduce peerConn to simplify peer creation * Introduce `peerConn` containing the known fields of `peer` * `peer` only created in `sw.addPeer` once handshake is complete and NodeInfo is checked * Eliminates some mutable variables and makes the code flow better * Simplifies the `newXxxPeer` funcs * Use ID instead of PubKey where possible. * SetPubKeyFilter -> SetIDFilter * nodeInfo.Validate takes ID * remove peer.PubKey() * persistent node ids * fixes from review * test: use ip_plus_id.sh more * fix invalid memory panic during fast_sync test ``` 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: panic: runtime error: invalid memory address or nil pointer dereference 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: [signal SIGSEGV: segmentation violation code=0x1 addr=0x20 pc=0x98dd3e] 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: goroutine 3432 [running]: 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: github.com/tendermint/tendermint/p2p.newOutboundPeerConn(0xc423fd1380, 0xc420933e00, 0x1, 0x1239a60, 0 xc420128c40, 0x2, 0x42caf6, 0xc42001f300, 0xc422831d98, 0xc4227951c0, ...) 
2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/peer.go:123 +0x31e 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: github.com/tendermint/tendermint/p2p.(*Switch).addOutboundPeerWithConfig(0xc4200ad040, 0xc423fd1380, 0 xc420933e00, 0xc423f48801, 0x28, 0x2) 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/switch.go:455 +0x12b 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: github.com/tendermint/tendermint/p2p.(*Switch).DialPeerWithAddress(0xc4200ad040, 0xc423fd1380, 0x1, 0x 0, 0x0) 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/switch.go:371 +0xdc 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: github.com/tendermint/tendermint/p2p.(*Switch).reconnectToPeer(0xc4200ad040, 0x123e000, 0xc42007bb00) 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/switch.go:290 +0x25f 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: created by github.com/tendermint/tendermint/p2p.(*Switch).StopPeerForError 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/switch.go:256 +0x1b7 ```
6 years ago
7 years ago
  1. package pex
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "os"
  6. "path/filepath"
  7. "testing"
  8. "time"
  9. "github.com/stretchr/testify/assert"
  10. "github.com/stretchr/testify/require"
  11. crypto "github.com/tendermint/go-crypto"
  12. wire "github.com/tendermint/go-wire"
  13. cfg "github.com/tendermint/tendermint/config"
  14. "github.com/tendermint/tendermint/p2p"
  15. "github.com/tendermint/tendermint/p2p/conn"
  16. cmn "github.com/tendermint/tmlibs/common"
  17. "github.com/tendermint/tmlibs/log"
  18. )
  19. var (
  20. config *cfg.P2PConfig
  21. )
  22. func init() {
  23. config = cfg.DefaultP2PConfig()
  24. config.PexReactor = true
  25. }
  26. func TestPEXReactorBasic(t *testing.T) {
  27. r, book := createReactor(&PEXReactorConfig{})
  28. defer teardownReactor(book)
  29. assert.NotNil(t, r)
  30. assert.NotEmpty(t, r.GetChannels())
  31. }
  32. func TestPEXReactorAddRemovePeer(t *testing.T) {
  33. r, book := createReactor(&PEXReactorConfig{})
  34. defer teardownReactor(book)
  35. size := book.Size()
  36. peer := p2p.CreateRandomPeer(false)
  37. r.AddPeer(peer)
  38. assert.Equal(t, size+1, book.Size())
  39. r.RemovePeer(peer, "peer not available")
  40. assert.Equal(t, size+1, book.Size())
  41. outboundPeer := p2p.CreateRandomPeer(true)
  42. r.AddPeer(outboundPeer)
  43. assert.Equal(t, size+1, book.Size(), "outbound peers should not be added to the address book")
  44. r.RemovePeer(outboundPeer, "peer not available")
  45. assert.Equal(t, size+1, book.Size())
  46. }
  47. func TestPEXReactorRunning(t *testing.T) {
  48. N := 3
  49. switches := make([]*p2p.Switch, N)
  50. dir, err := ioutil.TempDir("", "pex_reactor")
  51. require.Nil(t, err)
  52. defer os.RemoveAll(dir) // nolint: errcheck
  53. book := NewAddrBook(filepath.Join(dir, "addrbook.json"), false)
  54. book.SetLogger(log.TestingLogger())
  55. // create switches
  56. for i := 0; i < N; i++ {
  57. switches[i] = p2p.MakeSwitch(config, i, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
  58. sw.SetLogger(log.TestingLogger().With("switch", i))
  59. r := NewPEXReactor(book, &PEXReactorConfig{})
  60. r.SetLogger(log.TestingLogger())
  61. r.SetEnsurePeersPeriod(250 * time.Millisecond)
  62. sw.AddReactor("pex", r)
  63. return sw
  64. })
  65. }
  66. // fill the address book and add listeners
  67. for _, s := range switches {
  68. addr := s.NodeInfo().NetAddress()
  69. book.AddAddress(addr, addr)
  70. s.AddListener(p2p.NewDefaultListener("tcp", s.NodeInfo().ListenAddr, true, log.TestingLogger()))
  71. }
  72. // start switches
  73. for _, s := range switches {
  74. err := s.Start() // start switch and reactors
  75. require.Nil(t, err)
  76. }
  77. assertPeersWithTimeout(t, switches, 10*time.Millisecond, 10*time.Second, N-1)
  78. // stop them
  79. for _, s := range switches {
  80. s.Stop()
  81. }
  82. }
  83. func TestPEXReactorReceive(t *testing.T) {
  84. r, book := createReactor(&PEXReactorConfig{})
  85. defer teardownReactor(book)
  86. peer := p2p.CreateRandomPeer(false)
  87. // we have to send a request to receive responses
  88. r.RequestAddrs(peer)
  89. size := book.Size()
  90. addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
  91. msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
  92. r.Receive(PexChannel, peer, msg)
  93. assert.Equal(t, size+1, book.Size())
  94. msg = wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}})
  95. r.Receive(PexChannel, peer, msg)
  96. }
  97. func TestPEXReactorRequestMessageAbuse(t *testing.T) {
  98. r, book := createReactor(&PEXReactorConfig{})
  99. defer teardownReactor(book)
  100. sw := createSwitchAndAddReactors(r)
  101. peer := newMockPeer()
  102. p2p.AddPeerToSwitch(sw, peer)
  103. assert.True(t, sw.Peers().Has(peer.ID()))
  104. id := string(peer.ID())
  105. msg := wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}})
  106. // first time creates the entry
  107. r.Receive(PexChannel, peer, msg)
  108. assert.True(t, r.lastReceivedRequests.Has(id))
  109. assert.True(t, sw.Peers().Has(peer.ID()))
  110. // next time sets the last time value
  111. r.Receive(PexChannel, peer, msg)
  112. assert.True(t, r.lastReceivedRequests.Has(id))
  113. assert.True(t, sw.Peers().Has(peer.ID()))
  114. // third time is too many too soon - peer is removed
  115. r.Receive(PexChannel, peer, msg)
  116. assert.False(t, r.lastReceivedRequests.Has(id))
  117. assert.False(t, sw.Peers().Has(peer.ID()))
  118. }
  119. func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
  120. r, book := createReactor(&PEXReactorConfig{})
  121. defer teardownReactor(book)
  122. sw := createSwitchAndAddReactors(r)
  123. peer := newMockPeer()
  124. p2p.AddPeerToSwitch(sw, peer)
  125. assert.True(t, sw.Peers().Has(peer.ID()))
  126. id := string(peer.ID())
  127. // request addrs from the peer
  128. r.RequestAddrs(peer)
  129. assert.True(t, r.requestsSent.Has(id))
  130. assert.True(t, sw.Peers().Has(peer.ID()))
  131. addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
  132. msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
  133. // receive some addrs. should clear the request
  134. r.Receive(PexChannel, peer, msg)
  135. assert.False(t, r.requestsSent.Has(id))
  136. assert.True(t, sw.Peers().Has(peer.ID()))
  137. // receiving more addrs causes a disconnect
  138. r.Receive(PexChannel, peer, msg)
  139. assert.False(t, sw.Peers().Has(peer.ID()))
  140. }
  141. func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
  142. dir, err := ioutil.TempDir("", "pex_reactor")
  143. require.Nil(t, err)
  144. defer os.RemoveAll(dir) // nolint: errcheck
  145. book := NewAddrBook(filepath.Join(dir, "addrbook.json"), false)
  146. book.SetLogger(log.TestingLogger())
  147. // 1. create seed
  148. seed := p2p.MakeSwitch(
  149. config,
  150. 0,
  151. "127.0.0.1",
  152. "123.123.123",
  153. func(i int, sw *p2p.Switch) *p2p.Switch {
  154. sw.SetLogger(log.TestingLogger())
  155. r := NewPEXReactor(book, &PEXReactorConfig{})
  156. r.SetLogger(log.TestingLogger())
  157. sw.AddReactor("pex", r)
  158. return sw
  159. },
  160. )
  161. seed.AddListener(
  162. p2p.NewDefaultListener(
  163. "tcp",
  164. seed.NodeInfo().ListenAddr,
  165. true,
  166. log.TestingLogger(),
  167. ),
  168. )
  169. require.Nil(t, seed.Start())
  170. defer seed.Stop()
  171. // 2. create usual peer with only seed configured.
  172. peer := p2p.MakeSwitch(
  173. config,
  174. 1,
  175. "127.0.0.1",
  176. "123.123.123",
  177. func(i int, sw *p2p.Switch) *p2p.Switch {
  178. sw.SetLogger(log.TestingLogger())
  179. r := NewPEXReactor(
  180. book,
  181. &PEXReactorConfig{
  182. Seeds: []string{seed.NodeInfo().NetAddress().String()},
  183. },
  184. )
  185. r.SetLogger(log.TestingLogger())
  186. sw.AddReactor("pex", r)
  187. return sw
  188. },
  189. )
  190. require.Nil(t, peer.Start())
  191. defer peer.Stop()
  192. // 3. check that the peer connects to seed immediately
  193. assertPeersWithTimeout(t, []*p2p.Switch{peer}, 10*time.Millisecond, 1*time.Second, 1)
  194. }
  195. func TestPEXReactorCrawlStatus(t *testing.T) {
  196. pexR, book := createReactor(&PEXReactorConfig{SeedMode: true})
  197. defer teardownReactor(book)
  198. // Seed/Crawler mode uses data from the Switch
  199. _ = createSwitchAndAddReactors(pexR)
  200. // Create a peer, add it to the peer set and the addrbook.
  201. peer := p2p.CreateRandomPeer(false)
  202. p2p.AddPeerToSwitch(pexR.Switch, peer)
  203. addr1 := peer.NodeInfo().NetAddress()
  204. pexR.book.AddAddress(addr1, addr1)
  205. // Add a non-connected address to the book.
  206. _, addr2 := p2p.CreateRoutableAddr()
  207. pexR.book.AddAddress(addr2, addr1)
  208. // Get some peerInfos to crawl
  209. peerInfos := pexR.getPeersToCrawl()
  210. // Make sure it has the proper number of elements
  211. assert.Equal(t, 2, len(peerInfos))
  212. // TODO: test
  213. }
  214. func TestPEXReactorDialPeer(t *testing.T) {
  215. pexR, book := createReactor(&PEXReactorConfig{})
  216. defer teardownReactor(book)
  217. _ = createSwitchAndAddReactors(pexR)
  218. peer := newMockPeer()
  219. addr := peer.NodeInfo().NetAddress()
  220. assert.Equal(t, 0, pexR.AttemptsToDial(addr))
  221. // 1st unsuccessful attempt
  222. pexR.dialPeer(addr)
  223. assert.Equal(t, 1, pexR.AttemptsToDial(addr))
  224. // 2nd unsuccessful attempt
  225. pexR.dialPeer(addr)
  226. // must be skipped because it is too early
  227. assert.Equal(t, 1, pexR.AttemptsToDial(addr))
  228. }
  229. type mockPeer struct {
  230. *cmn.BaseService
  231. pubKey crypto.PubKey
  232. addr *p2p.NetAddress
  233. outbound, persistent bool
  234. }
  235. func newMockPeer() mockPeer {
  236. _, netAddr := p2p.CreateRoutableAddr()
  237. mp := mockPeer{
  238. addr: netAddr,
  239. pubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(),
  240. }
  241. mp.BaseService = cmn.NewBaseService(nil, "MockPeer", mp)
  242. mp.Start()
  243. return mp
  244. }
  245. func (mp mockPeer) ID() p2p.ID { return p2p.PubKeyToID(mp.pubKey) }
  246. func (mp mockPeer) IsOutbound() bool { return mp.outbound }
  247. func (mp mockPeer) IsPersistent() bool { return mp.persistent }
  248. func (mp mockPeer) NodeInfo() p2p.NodeInfo {
  249. return p2p.NodeInfo{
  250. PubKey: mp.pubKey,
  251. ListenAddr: mp.addr.DialString(),
  252. }
  253. }
  254. func (mp mockPeer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} }
  255. func (mp mockPeer) Send(byte, interface{}) bool { return false }
  256. func (mp mockPeer) TrySend(byte, interface{}) bool { return false }
  257. func (mp mockPeer) Set(string, interface{}) {}
  258. func (mp mockPeer) Get(string) interface{} { return nil }
  259. func assertPeersWithTimeout(
  260. t *testing.T,
  261. switches []*p2p.Switch,
  262. checkPeriod, timeout time.Duration,
  263. nPeers int,
  264. ) {
  265. var (
  266. ticker = time.NewTicker(checkPeriod)
  267. remaining = timeout
  268. )
  269. for {
  270. select {
  271. case <-ticker.C:
  272. // check peers are connected
  273. allGood := true
  274. for _, s := range switches {
  275. outbound, inbound, _ := s.NumPeers()
  276. if outbound+inbound < nPeers {
  277. allGood = false
  278. }
  279. }
  280. remaining -= checkPeriod
  281. if remaining < 0 {
  282. remaining = 0
  283. }
  284. if allGood {
  285. return
  286. }
  287. case <-time.After(remaining):
  288. numPeersStr := ""
  289. for i, s := range switches {
  290. outbound, inbound, _ := s.NumPeers()
  291. numPeersStr += fmt.Sprintf("%d => {outbound: %d, inbound: %d}, ", i, outbound, inbound)
  292. }
  293. t.Errorf(
  294. "expected all switches to be connected to at least one peer (switches: %s)",
  295. numPeersStr,
  296. )
  297. return
  298. }
  299. }
  300. }
  301. func createReactor(config *PEXReactorConfig) (r *PEXReactor, book *addrBook) {
  302. dir, err := ioutil.TempDir("", "pex_reactor")
  303. if err != nil {
  304. panic(err)
  305. }
  306. book = NewAddrBook(filepath.Join(dir, "addrbook.json"), true)
  307. book.SetLogger(log.TestingLogger())
  308. r = NewPEXReactor(book, &PEXReactorConfig{})
  309. r.SetLogger(log.TestingLogger())
  310. return
  311. }
  312. func teardownReactor(book *addrBook) {
  313. err := os.RemoveAll(filepath.Dir(book.FilePath()))
  314. if err != nil {
  315. panic(err)
  316. }
  317. }
  318. func createSwitchAndAddReactors(reactors ...p2p.Reactor) *p2p.Switch {
  319. sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw })
  320. sw.SetLogger(log.TestingLogger())
  321. for _, r := range reactors {
  322. sw.AddReactor(r.String(), r)
  323. r.SetSwitch(sw)
  324. }
  325. return sw
  326. }