You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

427 lines
11 KiB

8 years ago
8 years ago
8 years ago
p2p: introduce peerConn to simplify peer creation (#1226) * expose AuthEnc in the P2P config if AuthEnc is true, dialed peers must have a node ID in the address and it must match the persistent pubkey from the secret handshake. Refs #1157 * fixes after my own review * fix docs * fix build failure ``` p2p/pex/pex_reactor_test.go:288:88: cannot use seed.NodeInfo().NetAddress() (type *p2p.NetAddress) as type string in array or slice literal ``` * p2p: introduce peerConn to simplify peer creation * Introduce `peerConn` containing the known fields of `peer` * `peer` only created in `sw.addPeer` once handshake is complete and NodeInfo is checked * Eliminates some mutable variables and makes the code flow better * Simplifies the `newXxxPeer` funcs * Use ID instead of PubKey where possible. * SetPubKeyFilter -> SetIDFilter * nodeInfo.Validate takes ID * remove peer.PubKey() * persistent node ids * fixes from review * test: use ip_plus_id.sh more * fix invalid memory panic during fast_sync test ``` 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: panic: runtime error: invalid memory address or nil pointer dereference 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: [signal SIGSEGV: segmentation violation code=0x1 addr=0x20 pc=0x98dd3e] 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: goroutine 3432 [running]: 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: github.com/tendermint/tendermint/p2p.newOutboundPeerConn(0xc423fd1380, 0xc420933e00, 0x1, 0x1239a60, 0 xc420128c40, 0x2, 0x42caf6, 0xc42001f300, 0xc422831d98, 0xc4227951c0, ...) 
2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/peer.go:123 +0x31e 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: github.com/tendermint/tendermint/p2p.(*Switch).addOutboundPeerWithConfig(0xc4200ad040, 0xc423fd1380, 0 xc420933e00, 0xc423f48801, 0x28, 0x2) 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/switch.go:455 +0x12b 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: github.com/tendermint/tendermint/p2p.(*Switch).DialPeerWithAddress(0xc4200ad040, 0xc423fd1380, 0x1, 0x 0, 0x0) 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/switch.go:371 +0xdc 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: github.com/tendermint/tendermint/p2p.(*Switch).reconnectToPeer(0xc4200ad040, 0x123e000, 0xc42007bb00) 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/switch.go:290 +0x25f 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: created by github.com/tendermint/tendermint/p2p.(*Switch).StopPeerForError 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/switch.go:256 +0x1b7 ```
7 years ago
8 years ago
  1. package pex
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "os"
  6. "path/filepath"
  7. "testing"
  8. "time"
  9. "github.com/stretchr/testify/assert"
  10. "github.com/stretchr/testify/require"
  11. crypto "github.com/tendermint/go-crypto"
  12. wire "github.com/tendermint/go-wire"
  13. cfg "github.com/tendermint/tendermint/config"
  14. "github.com/tendermint/tendermint/p2p"
  15. "github.com/tendermint/tendermint/p2p/conn"
  16. cmn "github.com/tendermint/tmlibs/common"
  17. "github.com/tendermint/tmlibs/log"
  18. )
  19. var (
  20. config *cfg.P2PConfig
  21. )
  22. func init() {
  23. config = cfg.DefaultP2PConfig()
  24. config.PexReactor = true
  25. }
  26. func TestPEXReactorBasic(t *testing.T) {
  27. r, book := createReactor(&PEXReactorConfig{})
  28. defer teardownReactor(book)
  29. assert.NotNil(t, r)
  30. assert.NotEmpty(t, r.GetChannels())
  31. }
  32. func TestPEXReactorAddRemovePeer(t *testing.T) {
  33. r, book := createReactor(&PEXReactorConfig{})
  34. defer teardownReactor(book)
  35. size := book.Size()
  36. peer := p2p.CreateRandomPeer(false)
  37. r.AddPeer(peer)
  38. assert.Equal(t, size+1, book.Size())
  39. r.RemovePeer(peer, "peer not available")
  40. assert.Equal(t, size+1, book.Size())
  41. outboundPeer := p2p.CreateRandomPeer(true)
  42. r.AddPeer(outboundPeer)
  43. assert.Equal(t, size+1, book.Size(), "outbound peers should not be added to the address book")
  44. r.RemovePeer(outboundPeer, "peer not available")
  45. assert.Equal(t, size+1, book.Size())
  46. }
  47. func TestPEXReactorRunning(t *testing.T) {
  48. N := 3
  49. switches := make([]*p2p.Switch, N)
  50. dir, err := ioutil.TempDir("", "pex_reactor")
  51. require.Nil(t, err)
  52. defer os.RemoveAll(dir) // nolint: errcheck
  53. book := NewAddrBook(filepath.Join(dir, "addrbook.json"), false)
  54. book.SetLogger(log.TestingLogger())
  55. // create switches
  56. for i := 0; i < N; i++ {
  57. switches[i] = p2p.MakeSwitch(config, i, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
  58. sw.SetLogger(log.TestingLogger().With("switch", i))
  59. r := NewPEXReactor(book, &PEXReactorConfig{})
  60. r.SetLogger(log.TestingLogger())
  61. r.SetEnsurePeersPeriod(250 * time.Millisecond)
  62. sw.AddReactor("pex", r)
  63. return sw
  64. })
  65. }
  66. // fill the address book and add listeners
  67. for _, s := range switches {
  68. addr := s.NodeInfo().NetAddress()
  69. book.AddAddress(addr, addr)
  70. s.AddListener(p2p.NewDefaultListener("tcp", s.NodeInfo().ListenAddr, true, log.TestingLogger()))
  71. }
  72. // start switches
  73. for _, s := range switches {
  74. err := s.Start() // start switch and reactors
  75. require.Nil(t, err)
  76. }
  77. assertPeersWithTimeout(t, switches, 10*time.Millisecond, 10*time.Second, N-1)
  78. // stop them
  79. for _, s := range switches {
  80. s.Stop()
  81. }
  82. }
  83. func TestPEXReactorReceive(t *testing.T) {
  84. r, book := createReactor(&PEXReactorConfig{})
  85. defer teardownReactor(book)
  86. peer := p2p.CreateRandomPeer(false)
  87. // we have to send a request to receive responses
  88. r.RequestAddrs(peer)
  89. size := book.Size()
  90. addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
  91. msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
  92. r.Receive(PexChannel, peer, msg)
  93. assert.Equal(t, size+1, book.Size())
  94. msg = wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}})
  95. r.Receive(PexChannel, peer, msg)
  96. }
  97. func TestPEXReactorRequestMessageAbuse(t *testing.T) {
  98. r, book := createReactor(&PEXReactorConfig{})
  99. defer teardownReactor(book)
  100. sw := createSwitchAndAddReactors(r)
  101. peer := newMockPeer()
  102. p2p.AddPeerToSwitch(sw, peer)
  103. assert.True(t, sw.Peers().Has(peer.ID()))
  104. id := string(peer.ID())
  105. msg := wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}})
  106. // first time creates the entry
  107. r.Receive(PexChannel, peer, msg)
  108. assert.True(t, r.lastReceivedRequests.Has(id))
  109. assert.True(t, sw.Peers().Has(peer.ID()))
  110. // next time sets the last time value
  111. r.Receive(PexChannel, peer, msg)
  112. assert.True(t, r.lastReceivedRequests.Has(id))
  113. assert.True(t, sw.Peers().Has(peer.ID()))
  114. // third time is too many too soon - peer is removed
  115. r.Receive(PexChannel, peer, msg)
  116. assert.False(t, r.lastReceivedRequests.Has(id))
  117. assert.False(t, sw.Peers().Has(peer.ID()))
  118. }
  119. func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
  120. r, book := createReactor(&PEXReactorConfig{})
  121. defer teardownReactor(book)
  122. sw := createSwitchAndAddReactors(r)
  123. peer := newMockPeer()
  124. p2p.AddPeerToSwitch(sw, peer)
  125. assert.True(t, sw.Peers().Has(peer.ID()))
  126. id := string(peer.ID())
  127. // request addrs from the peer
  128. r.RequestAddrs(peer)
  129. assert.True(t, r.requestsSent.Has(id))
  130. assert.True(t, sw.Peers().Has(peer.ID()))
  131. addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
  132. msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
  133. // receive some addrs. should clear the request
  134. r.Receive(PexChannel, peer, msg)
  135. assert.False(t, r.requestsSent.Has(id))
  136. assert.True(t, sw.Peers().Has(peer.ID()))
  137. // receiving more addrs causes a disconnect
  138. r.Receive(PexChannel, peer, msg)
  139. assert.False(t, sw.Peers().Has(peer.ID()))
  140. }
  141. func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
  142. dir, err := ioutil.TempDir("", "pex_reactor")
  143. require.Nil(t, err)
  144. defer os.RemoveAll(dir) // nolint: errcheck
  145. book := NewAddrBook(filepath.Join(dir, "addrbook.json"), false)
  146. book.SetLogger(log.TestingLogger())
  147. // 1. create seed
  148. seed := p2p.MakeSwitch(
  149. config,
  150. 0,
  151. "127.0.0.1",
  152. "123.123.123",
  153. func(i int, sw *p2p.Switch) *p2p.Switch {
  154. sw.SetLogger(log.TestingLogger())
  155. r := NewPEXReactor(book, &PEXReactorConfig{})
  156. r.SetLogger(log.TestingLogger())
  157. sw.AddReactor("pex", r)
  158. return sw
  159. },
  160. )
  161. seed.AddListener(
  162. p2p.NewDefaultListener(
  163. "tcp",
  164. seed.NodeInfo().ListenAddr,
  165. true,
  166. log.TestingLogger(),
  167. ),
  168. )
  169. require.Nil(t, seed.Start())
  170. defer seed.Stop()
  171. // 2. create usual peer with only seed configured.
  172. peer := p2p.MakeSwitch(
  173. config,
  174. 1,
  175. "127.0.0.1",
  176. "123.123.123",
  177. func(i int, sw *p2p.Switch) *p2p.Switch {
  178. sw.SetLogger(log.TestingLogger())
  179. r := NewPEXReactor(
  180. book,
  181. &PEXReactorConfig{
  182. Seeds: []string{seed.NodeInfo().NetAddress().String()},
  183. },
  184. )
  185. r.SetLogger(log.TestingLogger())
  186. sw.AddReactor("pex", r)
  187. return sw
  188. },
  189. )
  190. require.Nil(t, peer.Start())
  191. defer peer.Stop()
  192. // 3. check that the peer connects to seed immediately
  193. assertPeersWithTimeout(t, []*p2p.Switch{peer}, 10*time.Millisecond, 1*time.Second, 1)
  194. }
  195. func TestPEXReactorCrawlStatus(t *testing.T) {
  196. pexR, book := createReactor(&PEXReactorConfig{SeedMode: true})
  197. defer teardownReactor(book)
  198. // Seed/Crawler mode uses data from the Switch
  199. _ = createSwitchAndAddReactors(pexR)
  200. // Create a peer, add it to the peer set and the addrbook.
  201. peer := p2p.CreateRandomPeer(false)
  202. p2p.AddPeerToSwitch(pexR.Switch, peer)
  203. addr1 := peer.NodeInfo().NetAddress()
  204. pexR.book.AddAddress(addr1, addr1)
  205. // Add a non-connected address to the book.
  206. _, addr2 := p2p.CreateRoutableAddr()
  207. pexR.book.AddAddress(addr2, addr1)
  208. // Get some peerInfos to crawl
  209. peerInfos := pexR.getPeersToCrawl()
  210. // Make sure it has the proper number of elements
  211. assert.Equal(t, 2, len(peerInfos))
  212. // TODO: test
  213. }
  214. func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) {
  215. peer := p2p.CreateRandomPeer(false)
  216. pexR, book := createReactor(&PEXReactorConfig{PrivatePeerIDs: []string{string(peer.NodeInfo().ID())}})
  217. defer teardownReactor(book)
  218. // we have to send a request to receive responses
  219. pexR.RequestAddrs(peer)
  220. size := book.Size()
  221. addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
  222. msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
  223. pexR.Receive(PexChannel, peer, msg)
  224. assert.Equal(t, size, book.Size())
  225. pexR.AddPeer(peer)
  226. assert.Equal(t, size, book.Size())
  227. }
  228. func TestPEXReactorDialPeer(t *testing.T) {
  229. pexR, book := createReactor(&PEXReactorConfig{})
  230. defer teardownReactor(book)
  231. _ = createSwitchAndAddReactors(pexR)
  232. peer := newMockPeer()
  233. addr := peer.NodeInfo().NetAddress()
  234. assert.Equal(t, 0, pexR.AttemptsToDial(addr))
  235. // 1st unsuccessful attempt
  236. pexR.dialPeer(addr)
  237. assert.Equal(t, 1, pexR.AttemptsToDial(addr))
  238. // 2nd unsuccessful attempt
  239. pexR.dialPeer(addr)
  240. // must be skipped because it is too early
  241. assert.Equal(t, 1, pexR.AttemptsToDial(addr))
  242. if !testing.Short() {
  243. time.Sleep(3 * time.Second)
  244. // 3rd attempt
  245. pexR.dialPeer(addr)
  246. assert.Equal(t, 2, pexR.AttemptsToDial(addr))
  247. }
  248. }
  249. type mockPeer struct {
  250. *cmn.BaseService
  251. pubKey crypto.PubKey
  252. addr *p2p.NetAddress
  253. outbound, persistent bool
  254. }
  255. func newMockPeer() mockPeer {
  256. _, netAddr := p2p.CreateRoutableAddr()
  257. mp := mockPeer{
  258. addr: netAddr,
  259. pubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(),
  260. }
  261. mp.BaseService = cmn.NewBaseService(nil, "MockPeer", mp)
  262. mp.Start()
  263. return mp
  264. }
  265. func (mp mockPeer) ID() p2p.ID { return p2p.PubKeyToID(mp.pubKey) }
  266. func (mp mockPeer) IsOutbound() bool { return mp.outbound }
  267. func (mp mockPeer) IsPersistent() bool { return mp.persistent }
  268. func (mp mockPeer) NodeInfo() p2p.NodeInfo {
  269. return p2p.NodeInfo{
  270. PubKey: mp.pubKey,
  271. ListenAddr: mp.addr.DialString(),
  272. }
  273. }
  274. func (mp mockPeer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} }
  275. func (mp mockPeer) Send(byte, interface{}) bool { return false }
  276. func (mp mockPeer) TrySend(byte, interface{}) bool { return false }
  277. func (mp mockPeer) Set(string, interface{}) {}
  278. func (mp mockPeer) Get(string) interface{} { return nil }
  279. func assertPeersWithTimeout(
  280. t *testing.T,
  281. switches []*p2p.Switch,
  282. checkPeriod, timeout time.Duration,
  283. nPeers int,
  284. ) {
  285. var (
  286. ticker = time.NewTicker(checkPeriod)
  287. remaining = timeout
  288. )
  289. for {
  290. select {
  291. case <-ticker.C:
  292. // check peers are connected
  293. allGood := true
  294. for _, s := range switches {
  295. outbound, inbound, _ := s.NumPeers()
  296. if outbound+inbound < nPeers {
  297. allGood = false
  298. }
  299. }
  300. remaining -= checkPeriod
  301. if remaining < 0 {
  302. remaining = 0
  303. }
  304. if allGood {
  305. return
  306. }
  307. case <-time.After(remaining):
  308. numPeersStr := ""
  309. for i, s := range switches {
  310. outbound, inbound, _ := s.NumPeers()
  311. numPeersStr += fmt.Sprintf("%d => {outbound: %d, inbound: %d}, ", i, outbound, inbound)
  312. }
  313. t.Errorf(
  314. "expected all switches to be connected to at least one peer (switches: %s)",
  315. numPeersStr,
  316. )
  317. return
  318. }
  319. }
  320. }
  321. func createReactor(config *PEXReactorConfig) (r *PEXReactor, book *addrBook) {
  322. dir, err := ioutil.TempDir("", "pex_reactor")
  323. if err != nil {
  324. panic(err)
  325. }
  326. book = NewAddrBook(filepath.Join(dir, "addrbook.json"), true)
  327. book.SetLogger(log.TestingLogger())
  328. r = NewPEXReactor(book, config)
  329. r.SetLogger(log.TestingLogger())
  330. return
  331. }
  332. func teardownReactor(book *addrBook) {
  333. err := os.RemoveAll(filepath.Dir(book.FilePath()))
  334. if err != nil {
  335. panic(err)
  336. }
  337. }
  338. func createSwitchAndAddReactors(reactors ...p2p.Reactor) *p2p.Switch {
  339. sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw })
  340. sw.SetLogger(log.TestingLogger())
  341. for _, r := range reactors {
  342. sw.AddReactor(r.String(), r)
  343. r.SetSwitch(sw)
  344. }
  345. return sw
  346. }