You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

636 lines
12 KiB

p2p: Implement PeerTransport This is the implementation for the design described in ADR 12[0]. It's the first step of a larger refactor of the p2p package as tracked in #2067[1]. The Transport is an interface bundling all concerns of low-level connection handling and isolating the rest of peer lifecycle management from the specifics of the low-level internet protocols. Even if the swappable implementation is never utilised, the isolation of conn-related code in one place will already help with reasoning about execution paths and with addressing security-sensitive issues surfaced through bounty programs and audits. We deliberately decided not to have Peer filtering and other management in the Transport; its sole responsibility is the translation of connections to Peers, handing those to the caller fully set up. It's the responsibility of the caller to reject those and/or keep track. Peer filtering will take place in the Switch and can be inspected in the following commit. This changeset is additionally an exercise in clean separation of logic from other infrastructural concerns like logging and instrumentation, by leveraging a clean and minimal interface. How this looks can be seen in a follow-up change. Design #2069[2] Refs #2067[3] Fixes #2047[4] Fixes #2046[5] changes: * describe Transport interface * implement new default Transport: MultiplexTransport * test MultiplexTransport with new constraints * implement ConnSet for concurrent management of net.Conn, synchronous to PeerSet * implement and expose duplicate IP filter * implement TransportOption for optional parametrisation [0] https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-012-peer-transport.md [1] https://github.com/tendermint/tendermint/issues/2067 [2] https://github.com/tendermint/tendermint/pull/2069 [3] https://github.com/tendermint/tendermint/issues/2067 [4] https://github.com/tendermint/tendermint/issues/2047 [5] https://github.com/tendermint/tendermint/issues/2046
6 years ago
  1. package p2p
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "net"
  6. "reflect"
  7. "testing"
  8. "time"
  9. "github.com/tendermint/tendermint/crypto/ed25519"
  10. )
  11. func TestTransportMultiplexConnFilter(t *testing.T) {
  12. mt := NewMultiplexTransport(
  13. NodeInfo{},
  14. NodeKey{
  15. PrivKey: ed25519.GenPrivKey(),
  16. },
  17. )
  18. MultiplexTransportConnFilters(
  19. func(_ ConnSet, _ net.Conn, _ []net.IP) error { return nil },
  20. func(_ ConnSet, _ net.Conn, _ []net.IP) error { return nil },
  21. func(_ ConnSet, _ net.Conn, _ []net.IP) error {
  22. return fmt.Errorf("rejected")
  23. },
  24. )(mt)
  25. addr, err := NewNetAddressStringWithOptionalID("127.0.0.1:0")
  26. if err != nil {
  27. t.Fatal(err)
  28. }
  29. if err := mt.Listen(*addr); err != nil {
  30. t.Fatal(err)
  31. }
  32. errc := make(chan error)
  33. go func() {
  34. addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
  35. if err != nil {
  36. errc <- err
  37. return
  38. }
  39. _, err = addr.Dial()
  40. if err != nil {
  41. errc <- err
  42. return
  43. }
  44. close(errc)
  45. }()
  46. if err := <-errc; err != nil {
  47. t.Errorf("connection failed: %v", err)
  48. }
  49. _, err = mt.Accept(peerConfig{})
  50. if err, ok := err.(ErrRejected); ok {
  51. if !err.IsFiltered() {
  52. t.Errorf("expected peer to be filtered")
  53. }
  54. } else {
  55. t.Errorf("expected ErrRejected")
  56. }
  57. }
  58. func TestTransportMultiplexConnFilterTimeout(t *testing.T) {
  59. mt := NewMultiplexTransport(
  60. NodeInfo{},
  61. NodeKey{
  62. PrivKey: ed25519.GenPrivKey(),
  63. },
  64. )
  65. MultiplexTransportFilterTimeout(5 * time.Millisecond)(mt)
  66. MultiplexTransportConnFilters(
  67. func(_ ConnSet, _ net.Conn, _ []net.IP) error {
  68. time.Sleep(10 * time.Millisecond)
  69. return nil
  70. },
  71. )(mt)
  72. addr, err := NewNetAddressStringWithOptionalID("127.0.0.1:0")
  73. if err != nil {
  74. t.Fatal(err)
  75. }
  76. if err := mt.Listen(*addr); err != nil {
  77. t.Fatal(err)
  78. }
  79. errc := make(chan error)
  80. go func() {
  81. addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
  82. if err != nil {
  83. errc <- err
  84. return
  85. }
  86. _, err = addr.Dial()
  87. if err != nil {
  88. errc <- err
  89. return
  90. }
  91. close(errc)
  92. }()
  93. if err := <-errc; err != nil {
  94. t.Errorf("connection failed: %v", err)
  95. }
  96. _, err = mt.Accept(peerConfig{})
  97. if _, ok := err.(ErrFilterTimeout); !ok {
  98. t.Errorf("expected ErrFilterTimeout")
  99. }
  100. }
  101. func TestTransportMultiplexAcceptMultiple(t *testing.T) {
  102. mt := testSetupMultiplexTransport(t)
  103. var (
  104. seed = rand.New(rand.NewSource(time.Now().UnixNano()))
  105. errc = make(chan error, seed.Intn(64)+64)
  106. )
  107. // Setup dialers.
  108. for i := 0; i < cap(errc); i++ {
  109. go func() {
  110. var (
  111. pv = ed25519.GenPrivKey()
  112. dialer = NewMultiplexTransport(
  113. NodeInfo{
  114. ID: PubKeyToID(pv.PubKey()),
  115. ListenAddr: "127.0.0.1:0",
  116. Moniker: "dialer",
  117. Version: "1.0.0",
  118. },
  119. NodeKey{
  120. PrivKey: pv,
  121. },
  122. )
  123. )
  124. addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
  125. if err != nil {
  126. errc <- err
  127. return
  128. }
  129. _, err = dialer.Dial(*addr, peerConfig{})
  130. if err != nil {
  131. errc <- err
  132. return
  133. }
  134. // Signal that the connection was established.
  135. errc <- nil
  136. }()
  137. }
  138. // Catch connection errors.
  139. for i := 0; i < cap(errc); i++ {
  140. if err := <-errc; err != nil {
  141. t.Fatal(err)
  142. }
  143. }
  144. ps := []Peer{}
  145. // Accept all peers.
  146. for i := 0; i < cap(errc); i++ {
  147. p, err := mt.Accept(peerConfig{})
  148. if err != nil {
  149. t.Fatal(err)
  150. }
  151. if err := p.Start(); err != nil {
  152. t.Fatal(err)
  153. }
  154. ps = append(ps, p)
  155. }
  156. if have, want := len(ps), cap(errc); have != want {
  157. t.Errorf("have %v, want %v", have, want)
  158. }
  159. // Stop all peers.
  160. for _, p := range ps {
  161. if err := p.Stop(); err != nil {
  162. t.Fatal(err)
  163. }
  164. }
  165. if err := mt.Close(); err != nil {
  166. t.Errorf("close errored: %v", err)
  167. }
  168. }
  169. func TestTransportMultiplexAcceptNonBlocking(t *testing.T) {
  170. mt := testSetupMultiplexTransport(t)
  171. var (
  172. fastNodePV = ed25519.GenPrivKey()
  173. fastNodeInfo = NodeInfo{
  174. ID: PubKeyToID(fastNodePV.PubKey()),
  175. ListenAddr: "127.0.0.1:0",
  176. Moniker: "fastNode",
  177. Version: "1.0.0",
  178. }
  179. errc = make(chan error)
  180. fastc = make(chan struct{})
  181. slowc = make(chan struct{})
  182. )
  183. // Simulate slow Peer.
  184. go func() {
  185. addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
  186. if err != nil {
  187. errc <- err
  188. return
  189. }
  190. c, err := addr.Dial()
  191. if err != nil {
  192. errc <- err
  193. return
  194. }
  195. close(slowc)
  196. select {
  197. case <-fastc:
  198. // Fast peer connected.
  199. case <-time.After(50 * time.Millisecond):
  200. // We error if the fast peer didn't succeed.
  201. errc <- fmt.Errorf("Fast peer timed out")
  202. }
  203. sc, err := upgradeSecretConn(c, 20*time.Millisecond, ed25519.GenPrivKey())
  204. if err != nil {
  205. errc <- err
  206. return
  207. }
  208. _, err = handshake(sc, 20*time.Millisecond, NodeInfo{
  209. ID: PubKeyToID(ed25519.GenPrivKey().PubKey()),
  210. ListenAddr: "127.0.0.1:0",
  211. Moniker: "slow_peer",
  212. })
  213. if err != nil {
  214. errc <- err
  215. return
  216. }
  217. }()
  218. // Simulate fast Peer.
  219. go func() {
  220. <-slowc
  221. var (
  222. dialer = NewMultiplexTransport(
  223. fastNodeInfo,
  224. NodeKey{
  225. PrivKey: fastNodePV,
  226. },
  227. )
  228. )
  229. addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
  230. if err != nil {
  231. errc <- err
  232. return
  233. }
  234. _, err = dialer.Dial(*addr, peerConfig{})
  235. if err != nil {
  236. errc <- err
  237. return
  238. }
  239. close(errc)
  240. close(fastc)
  241. }()
  242. if err := <-errc; err != nil {
  243. t.Errorf("connection failed: %v", err)
  244. }
  245. p, err := mt.Accept(peerConfig{})
  246. if err != nil {
  247. t.Fatal(err)
  248. }
  249. if have, want := p.NodeInfo(), fastNodeInfo; !reflect.DeepEqual(have, want) {
  250. t.Errorf("have %v, want %v", have, want)
  251. }
  252. }
  253. func TestTransportMultiplexValidateNodeInfo(t *testing.T) {
  254. mt := testSetupMultiplexTransport(t)
  255. errc := make(chan error)
  256. go func() {
  257. var (
  258. pv = ed25519.GenPrivKey()
  259. dialer = NewMultiplexTransport(
  260. NodeInfo{
  261. ID: PubKeyToID(pv.PubKey()),
  262. ListenAddr: "127.0.0.1:0",
  263. Moniker: "", // Should not be empty.
  264. Version: "1.0.0",
  265. },
  266. NodeKey{
  267. PrivKey: pv,
  268. },
  269. )
  270. )
  271. addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
  272. if err != nil {
  273. errc <- err
  274. return
  275. }
  276. _, err = dialer.Dial(*addr, peerConfig{})
  277. if err != nil {
  278. errc <- err
  279. return
  280. }
  281. close(errc)
  282. }()
  283. if err := <-errc; err != nil {
  284. t.Errorf("connection failed: %v", err)
  285. }
  286. _, err := mt.Accept(peerConfig{})
  287. if err, ok := err.(ErrRejected); ok {
  288. if !err.IsNodeInfoInvalid() {
  289. t.Errorf("expected NodeInfo to be invalid")
  290. }
  291. } else {
  292. t.Errorf("expected ErrRejected")
  293. }
  294. }
  295. func TestTransportMultiplexRejectMissmatchID(t *testing.T) {
  296. mt := testSetupMultiplexTransport(t)
  297. errc := make(chan error)
  298. go func() {
  299. dialer := NewMultiplexTransport(
  300. NodeInfo{
  301. ID: PubKeyToID(ed25519.GenPrivKey().PubKey()),
  302. ListenAddr: "127.0.0.1:0",
  303. Moniker: "dialer",
  304. Version: "1.0.0",
  305. },
  306. NodeKey{
  307. PrivKey: ed25519.GenPrivKey(),
  308. },
  309. )
  310. addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
  311. if err != nil {
  312. errc <- err
  313. return
  314. }
  315. _, err = dialer.Dial(*addr, peerConfig{})
  316. if err != nil {
  317. errc <- err
  318. return
  319. }
  320. close(errc)
  321. }()
  322. if err := <-errc; err != nil {
  323. t.Errorf("connection failed: %v", err)
  324. }
  325. _, err := mt.Accept(peerConfig{})
  326. if err, ok := err.(ErrRejected); ok {
  327. if !err.IsAuthFailure() {
  328. t.Errorf("expected auth failure")
  329. }
  330. } else {
  331. t.Errorf("expected ErrRejected")
  332. }
  333. }
  334. func TestTransportMultiplexRejectIncompatible(t *testing.T) {
  335. mt := testSetupMultiplexTransport(t)
  336. errc := make(chan error)
  337. go func() {
  338. var (
  339. pv = ed25519.GenPrivKey()
  340. dialer = NewMultiplexTransport(
  341. NodeInfo{
  342. ID: PubKeyToID(pv.PubKey()),
  343. ListenAddr: "127.0.0.1:0",
  344. Moniker: "dialer",
  345. Version: "2.0.0",
  346. },
  347. NodeKey{
  348. PrivKey: pv,
  349. },
  350. )
  351. )
  352. addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
  353. if err != nil {
  354. errc <- err
  355. return
  356. }
  357. _, err = dialer.Dial(*addr, peerConfig{})
  358. if err != nil {
  359. errc <- err
  360. return
  361. }
  362. close(errc)
  363. }()
  364. _, err := mt.Accept(peerConfig{})
  365. if err, ok := err.(ErrRejected); ok {
  366. if !err.IsIncompatible() {
  367. t.Errorf("expected to reject incompatible")
  368. }
  369. } else {
  370. t.Errorf("expected ErrRejected")
  371. }
  372. }
  373. func TestTransportMultiplexRejectSelf(t *testing.T) {
  374. mt := testSetupMultiplexTransport(t)
  375. errc := make(chan error)
  376. go func() {
  377. addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
  378. if err != nil {
  379. errc <- err
  380. return
  381. }
  382. _, err = mt.Dial(*addr, peerConfig{})
  383. if err != nil {
  384. errc <- err
  385. return
  386. }
  387. close(errc)
  388. }()
  389. if err := <-errc; err != nil {
  390. if err, ok := err.(ErrRejected); ok {
  391. if !err.IsSelf() {
  392. t.Errorf("expected to reject self")
  393. }
  394. } else {
  395. t.Errorf("expected ErrRejected")
  396. }
  397. } else {
  398. t.Errorf("expected connection failure")
  399. }
  400. _, err := mt.Accept(peerConfig{})
  401. if err, ok := err.(ErrRejected); ok {
  402. if !err.IsSelf() {
  403. t.Errorf("expected to reject self")
  404. }
  405. } else {
  406. t.Errorf("expected ErrRejected")
  407. }
  408. }
  409. func TestTransportConnDuplicateIPFilter(t *testing.T) {
  410. filter := ConnDuplicateIPFilter()
  411. if err := filter(nil, &testTransportConn{}, nil); err != nil {
  412. t.Fatal(err)
  413. }
  414. var (
  415. c = &testTransportConn{}
  416. cs = NewConnSet()
  417. )
  418. cs.Set(c, []net.IP{
  419. net.IP{10, 0, 10, 1},
  420. net.IP{10, 0, 10, 2},
  421. net.IP{10, 0, 10, 3},
  422. })
  423. if err := filter(cs, c, []net.IP{
  424. net.IP{10, 0, 10, 2},
  425. }); err == nil {
  426. t.Errorf("expected Peer to be rejected as duplicate")
  427. }
  428. }
  429. func TestTransportHandshake(t *testing.T) {
  430. ln, err := net.Listen("tcp", "127.0.0.1:0")
  431. if err != nil {
  432. t.Fatal(err)
  433. }
  434. var (
  435. peerPV = ed25519.GenPrivKey()
  436. peerNodeInfo = NodeInfo{
  437. ID: PubKeyToID(peerPV.PubKey()),
  438. }
  439. )
  440. go func() {
  441. c, err := net.Dial(ln.Addr().Network(), ln.Addr().String())
  442. if err != nil {
  443. t.Error(err)
  444. return
  445. }
  446. go func(c net.Conn) {
  447. _, err := cdc.MarshalBinaryWriter(c, peerNodeInfo)
  448. if err != nil {
  449. t.Error(err)
  450. }
  451. }(c)
  452. go func(c net.Conn) {
  453. ni := NodeInfo{}
  454. _, err := cdc.UnmarshalBinaryReader(
  455. c,
  456. &ni,
  457. int64(MaxNodeInfoSize()),
  458. )
  459. if err != nil {
  460. t.Error(err)
  461. }
  462. }(c)
  463. }()
  464. c, err := ln.Accept()
  465. if err != nil {
  466. t.Fatal(err)
  467. }
  468. ni, err := handshake(c, 20*time.Millisecond, NodeInfo{})
  469. if err != nil {
  470. t.Fatal(err)
  471. }
  472. if have, want := ni, peerNodeInfo; !reflect.DeepEqual(have, want) {
  473. t.Errorf("have %v, want %v", have, want)
  474. }
  475. }
  476. func testSetupMultiplexTransport(t *testing.T) *MultiplexTransport {
  477. var (
  478. pv = ed25519.GenPrivKey()
  479. mt = NewMultiplexTransport(
  480. NodeInfo{
  481. ID: PubKeyToID(pv.PubKey()),
  482. ListenAddr: "127.0.0.1:0",
  483. Moniker: "transport",
  484. Version: "1.0.0",
  485. },
  486. NodeKey{
  487. PrivKey: pv,
  488. },
  489. )
  490. )
  491. addr, err := NewNetAddressStringWithOptionalID("127.0.0.1:0")
  492. if err != nil {
  493. t.Fatal(err)
  494. }
  495. if err := mt.Listen(*addr); err != nil {
  496. t.Fatal(err)
  497. }
  498. return mt
  499. }
  // testTransportAddr is a stub net.Addr that always reports the network "tcp"
  // and the address "test.local:1234".
  type testTransportAddr struct{}

  // Compile-time check that the stub satisfies net.Addr.
  var _ net.Addr = (*testTransportAddr)(nil)

  // Network returns the fixed network name "tcp".
  func (a *testTransportAddr) Network() string { return "tcp" }

  // String returns the fixed address "test.local:1234".
  func (a *testTransportAddr) String() string { return "test.local:1234" }

  // testTransportConn is a stub net.Conn for tests that need a connection value
  // but never perform I/O on it. The address accessors return a
  // testTransportAddr; every other method returns a "not implemented" error.
  type testTransportConn struct{}

  // Compile-time check that the stub satisfies net.Conn.
  var _ net.Conn = (*testTransportConn)(nil)

  // Close always fails; the stub holds no resources.
  func (c *testTransportConn) Close() error {
  	return fmt.Errorf("Close() not implemented")
  }

  // LocalAddr returns a stub address.
  func (c *testTransportConn) LocalAddr() net.Addr {
  	return &testTransportAddr{}
  }

  // RemoteAddr returns a stub address.
  func (c *testTransportConn) RemoteAddr() net.Addr {
  	return &testTransportAddr{}
  }

  // Read always fails. It reports 0 bytes read, because the io.Reader contract
  // requires 0 <= n <= len(p) and a negative count would violate it.
  func (c *testTransportConn) Read(_ []byte) (int, error) {
  	return 0, fmt.Errorf("Read() not implemented")
  }

  // SetDeadline always fails.
  func (c *testTransportConn) SetDeadline(_ time.Time) error {
  	return fmt.Errorf("SetDeadline() not implemented")
  }

  // SetReadDeadline always fails.
  func (c *testTransportConn) SetReadDeadline(_ time.Time) error {
  	return fmt.Errorf("SetReadDeadline() not implemented")
  }

  // SetWriteDeadline always fails.
  func (c *testTransportConn) SetWriteDeadline(_ time.Time) error {
  	return fmt.Errorf("SetWriteDeadline() not implemented")
  }

  // Write always fails. It reports 0 bytes written, because the io.Writer
  // contract requires 0 <= n <= len(p) and a negative count would violate it.
  func (c *testTransportConn) Write(_ []byte) (int, error) {
  	return 0, fmt.Errorf("Write() not implemented")
  }
  527. }