You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

633 lines
17 KiB

  1. package conn
  2. import (
  3. "encoding/hex"
  4. "net"
  5. "testing"
  6. "time"
  7. "github.com/fortytw2/leaktest"
  8. "github.com/gogo/protobuf/proto"
  9. "github.com/stretchr/testify/assert"
  10. "github.com/stretchr/testify/require"
  11. "github.com/tendermint/tendermint/libs/log"
  12. "github.com/tendermint/tendermint/libs/protoio"
  13. tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
  14. "github.com/tendermint/tendermint/proto/tendermint/types"
  15. )
  // maxPingPongPacketSize is the size limit, in bytes, handed to the delimited
  // readers that the tests in this file use to read packets off the raw pipe.
  const maxPingPongPacketSize = 1 << 10 // 1024 bytes
  17. func createTestMConnection(conn net.Conn) *MConnection {
  18. onReceive := func(chID byte, msgBytes []byte) {
  19. }
  20. onError := func(r interface{}) {
  21. }
  22. c := createMConnectionWithCallbacks(conn, onReceive, onError)
  23. c.SetLogger(log.TestingLogger())
  24. return c
  25. }
  26. func createMConnectionWithCallbacks(
  27. conn net.Conn,
  28. onReceive func(chID byte, msgBytes []byte),
  29. onError func(r interface{}),
  30. ) *MConnection {
  31. cfg := DefaultMConnConfig()
  32. cfg.PingInterval = 90 * time.Millisecond
  33. cfg.PongTimeout = 45 * time.Millisecond
  34. chDescs := []*ChannelDescriptor{{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
  35. c := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, cfg)
  36. c.SetLogger(log.TestingLogger())
  37. return c
  38. }
  39. func TestMConnectionSendFlushStop(t *testing.T) {
  40. server, client := NetPipe()
  41. defer server.Close()
  42. defer client.Close()
  43. clientConn := createTestMConnection(client)
  44. err := clientConn.Start()
  45. require.Nil(t, err)
  46. defer clientConn.Stop() // nolint:errcheck // ignore for tests
  47. msg := []byte("abc")
  48. assert.True(t, clientConn.Send(0x01, msg))
  49. msgLength := 14
  50. // start the reader in a new routine, so we can flush
  51. errCh := make(chan error)
  52. go func() {
  53. msgB := make([]byte, msgLength)
  54. _, err := server.Read(msgB)
  55. if err != nil {
  56. t.Error(err)
  57. return
  58. }
  59. errCh <- err
  60. }()
  61. // stop the conn - it should flush all conns
  62. clientConn.FlushStop()
  63. timer := time.NewTimer(3 * time.Second)
  64. select {
  65. case <-errCh:
  66. case <-timer.C:
  67. t.Error("timed out waiting for msgs to be read")
  68. }
  69. }
  70. func TestMConnectionSend(t *testing.T) {
  71. server, client := NetPipe()
  72. defer server.Close()
  73. defer client.Close()
  74. mconn := createTestMConnection(client)
  75. err := mconn.Start()
  76. require.Nil(t, err)
  77. defer mconn.Stop() // nolint:errcheck // ignore for tests
  78. msg := []byte("Ant-Man")
  79. assert.True(t, mconn.Send(0x01, msg))
  80. // Note: subsequent Send/TrySend calls could pass because we are reading from
  81. // the send queue in a separate goroutine.
  82. _, err = server.Read(make([]byte, len(msg)))
  83. if err != nil {
  84. t.Error(err)
  85. }
  86. assert.True(t, mconn.CanSend(0x01))
  87. msg = []byte("Spider-Man")
  88. assert.True(t, mconn.TrySend(0x01, msg))
  89. _, err = server.Read(make([]byte, len(msg)))
  90. if err != nil {
  91. t.Error(err)
  92. }
  93. assert.False(t, mconn.CanSend(0x05), "CanSend should return false because channel is unknown")
  94. assert.False(t, mconn.Send(0x05, []byte("Absorbing Man")), "Send should return false because channel is unknown")
  95. }
  96. func TestMConnectionReceive(t *testing.T) {
  97. server, client := NetPipe()
  98. defer server.Close()
  99. defer client.Close()
  100. receivedCh := make(chan []byte)
  101. errorsCh := make(chan interface{})
  102. onReceive := func(chID byte, msgBytes []byte) {
  103. receivedCh <- msgBytes
  104. }
  105. onError := func(r interface{}) {
  106. errorsCh <- r
  107. }
  108. mconn1 := createMConnectionWithCallbacks(client, onReceive, onError)
  109. err := mconn1.Start()
  110. require.Nil(t, err)
  111. defer mconn1.Stop() // nolint:errcheck // ignore for tests
  112. mconn2 := createTestMConnection(server)
  113. err = mconn2.Start()
  114. require.Nil(t, err)
  115. defer mconn2.Stop() // nolint:errcheck // ignore for tests
  116. msg := []byte("Cyclops")
  117. assert.True(t, mconn2.Send(0x01, msg))
  118. select {
  119. case receivedBytes := <-receivedCh:
  120. assert.Equal(t, msg, receivedBytes)
  121. case err := <-errorsCh:
  122. t.Fatalf("Expected %s, got %+v", msg, err)
  123. case <-time.After(500 * time.Millisecond):
  124. t.Fatalf("Did not receive %s message in 500ms", msg)
  125. }
  126. }
  127. func TestMConnectionStatus(t *testing.T) {
  128. server, client := NetPipe()
  129. defer server.Close()
  130. defer client.Close()
  131. mconn := createTestMConnection(client)
  132. err := mconn.Start()
  133. require.Nil(t, err)
  134. defer mconn.Stop() // nolint:errcheck // ignore for tests
  135. status := mconn.Status()
  136. assert.NotNil(t, status)
  137. assert.Zero(t, status.Channels[0].SendQueueSize)
  138. }
  139. func TestMConnectionPongTimeoutResultsInError(t *testing.T) {
  140. server, client := net.Pipe()
  141. defer server.Close()
  142. defer client.Close()
  143. receivedCh := make(chan []byte)
  144. errorsCh := make(chan interface{})
  145. onReceive := func(chID byte, msgBytes []byte) {
  146. receivedCh <- msgBytes
  147. }
  148. onError := func(r interface{}) {
  149. errorsCh <- r
  150. }
  151. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  152. err := mconn.Start()
  153. require.Nil(t, err)
  154. defer mconn.Stop() // nolint:errcheck // ignore for tests
  155. serverGotPing := make(chan struct{})
  156. go func() {
  157. // read ping
  158. var pkt tmp2p.Packet
  159. _, err := protoio.NewDelimitedReader(server, maxPingPongPacketSize).ReadMsg(&pkt)
  160. require.NoError(t, err)
  161. serverGotPing <- struct{}{}
  162. }()
  163. <-serverGotPing
  164. pongTimerExpired := mconn.config.PongTimeout + 200*time.Millisecond
  165. select {
  166. case msgBytes := <-receivedCh:
  167. t.Fatalf("Expected error, but got %v", msgBytes)
  168. case err := <-errorsCh:
  169. assert.NotNil(t, err)
  170. case <-time.After(pongTimerExpired):
  171. t.Fatalf("Expected to receive error after %v", pongTimerExpired)
  172. }
  173. }
  174. func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) {
  175. server, client := net.Pipe()
  176. defer server.Close()
  177. defer client.Close()
  178. receivedCh := make(chan []byte)
  179. errorsCh := make(chan interface{})
  180. onReceive := func(chID byte, msgBytes []byte) {
  181. receivedCh <- msgBytes
  182. }
  183. onError := func(r interface{}) {
  184. errorsCh <- r
  185. }
  186. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  187. err := mconn.Start()
  188. require.Nil(t, err)
  189. defer mconn.Stop() // nolint:errcheck // ignore for tests
  190. // sending 3 pongs in a row (abuse)
  191. protoWriter := protoio.NewDelimitedWriter(server)
  192. _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
  193. require.NoError(t, err)
  194. _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
  195. require.NoError(t, err)
  196. _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
  197. require.NoError(t, err)
  198. serverGotPing := make(chan struct{})
  199. go func() {
  200. // read ping (one byte)
  201. var packet tmp2p.Packet
  202. _, err := protoio.NewDelimitedReader(server, maxPingPongPacketSize).ReadMsg(&packet)
  203. require.NoError(t, err)
  204. serverGotPing <- struct{}{}
  205. // respond with pong
  206. _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
  207. require.NoError(t, err)
  208. }()
  209. <-serverGotPing
  210. pongTimerExpired := mconn.config.PongTimeout + 20*time.Millisecond
  211. select {
  212. case msgBytes := <-receivedCh:
  213. t.Fatalf("Expected no data, but got %v", msgBytes)
  214. case err := <-errorsCh:
  215. t.Fatalf("Expected no error, but got %v", err)
  216. case <-time.After(pongTimerExpired):
  217. assert.True(t, mconn.IsRunning())
  218. }
  219. }
  220. func TestMConnectionMultiplePings(t *testing.T) {
  221. server, client := net.Pipe()
  222. defer server.Close()
  223. defer client.Close()
  224. receivedCh := make(chan []byte)
  225. errorsCh := make(chan interface{})
  226. onReceive := func(chID byte, msgBytes []byte) {
  227. receivedCh <- msgBytes
  228. }
  229. onError := func(r interface{}) {
  230. errorsCh <- r
  231. }
  232. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  233. err := mconn.Start()
  234. require.Nil(t, err)
  235. defer mconn.Stop() // nolint:errcheck // ignore for tests
  236. // sending 3 pings in a row (abuse)
  237. // see https://github.com/tendermint/tendermint/issues/1190
  238. protoReader := protoio.NewDelimitedReader(server, maxPingPongPacketSize)
  239. protoWriter := protoio.NewDelimitedWriter(server)
  240. var pkt tmp2p.Packet
  241. _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{}))
  242. require.NoError(t, err)
  243. _, err = protoReader.ReadMsg(&pkt)
  244. require.NoError(t, err)
  245. _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{}))
  246. require.NoError(t, err)
  247. _, err = protoReader.ReadMsg(&pkt)
  248. require.NoError(t, err)
  249. _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{}))
  250. require.NoError(t, err)
  251. _, err = protoReader.ReadMsg(&pkt)
  252. require.NoError(t, err)
  253. assert.True(t, mconn.IsRunning())
  254. }
  255. func TestMConnectionPingPongs(t *testing.T) {
  256. // check that we are not leaking any go-routines
  257. defer leaktest.CheckTimeout(t, 10*time.Second)()
  258. server, client := net.Pipe()
  259. defer server.Close()
  260. defer client.Close()
  261. receivedCh := make(chan []byte)
  262. errorsCh := make(chan interface{})
  263. onReceive := func(chID byte, msgBytes []byte) {
  264. receivedCh <- msgBytes
  265. }
  266. onError := func(r interface{}) {
  267. errorsCh <- r
  268. }
  269. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  270. err := mconn.Start()
  271. require.Nil(t, err)
  272. defer mconn.Stop() // nolint:errcheck // ignore for tests
  273. serverGotPing := make(chan struct{})
  274. go func() {
  275. protoReader := protoio.NewDelimitedReader(server, maxPingPongPacketSize)
  276. protoWriter := protoio.NewDelimitedWriter(server)
  277. var pkt tmp2p.PacketPing
  278. // read ping
  279. _, err = protoReader.ReadMsg(&pkt)
  280. require.NoError(t, err)
  281. serverGotPing <- struct{}{}
  282. // respond with pong
  283. _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
  284. require.NoError(t, err)
  285. time.Sleep(mconn.config.PingInterval)
  286. // read ping
  287. _, err = protoReader.ReadMsg(&pkt)
  288. require.NoError(t, err)
  289. serverGotPing <- struct{}{}
  290. // respond with pong
  291. _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
  292. require.NoError(t, err)
  293. }()
  294. <-serverGotPing
  295. <-serverGotPing
  296. pongTimerExpired := (mconn.config.PongTimeout + 20*time.Millisecond) * 2
  297. select {
  298. case msgBytes := <-receivedCh:
  299. t.Fatalf("Expected no data, but got %v", msgBytes)
  300. case err := <-errorsCh:
  301. t.Fatalf("Expected no error, but got %v", err)
  302. case <-time.After(2 * pongTimerExpired):
  303. assert.True(t, mconn.IsRunning())
  304. }
  305. }
  306. func TestMConnectionStopsAndReturnsError(t *testing.T) {
  307. server, client := NetPipe()
  308. defer server.Close()
  309. defer client.Close()
  310. receivedCh := make(chan []byte)
  311. errorsCh := make(chan interface{})
  312. onReceive := func(chID byte, msgBytes []byte) {
  313. receivedCh <- msgBytes
  314. }
  315. onError := func(r interface{}) {
  316. errorsCh <- r
  317. }
  318. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  319. err := mconn.Start()
  320. require.Nil(t, err)
  321. defer mconn.Stop() // nolint:errcheck // ignore for tests
  322. if err := client.Close(); err != nil {
  323. t.Error(err)
  324. }
  325. select {
  326. case receivedBytes := <-receivedCh:
  327. t.Fatalf("Expected error, got %v", receivedBytes)
  328. case err := <-errorsCh:
  329. assert.NotNil(t, err)
  330. assert.False(t, mconn.IsRunning())
  331. case <-time.After(500 * time.Millisecond):
  332. t.Fatal("Did not receive error in 500ms")
  333. }
  334. }
  335. func newClientAndServerConnsForReadErrors(t *testing.T, chOnErr chan struct{}) (*MConnection, *MConnection) {
  336. server, client := NetPipe()
  337. onReceive := func(chID byte, msgBytes []byte) {}
  338. onError := func(r interface{}) {}
  339. // create client conn with two channels
  340. chDescs := []*ChannelDescriptor{
  341. {ID: 0x01, Priority: 1, SendQueueCapacity: 1},
  342. {ID: 0x02, Priority: 1, SendQueueCapacity: 1},
  343. }
  344. mconnClient := NewMConnection(client, chDescs, onReceive, onError)
  345. mconnClient.SetLogger(log.TestingLogger().With("module", "client"))
  346. err := mconnClient.Start()
  347. require.Nil(t, err)
  348. // create server conn with 1 channel
  349. // it fires on chOnErr when there's an error
  350. serverLogger := log.TestingLogger().With("module", "server")
  351. onError = func(r interface{}) {
  352. chOnErr <- struct{}{}
  353. }
  354. mconnServer := createMConnectionWithCallbacks(server, onReceive, onError)
  355. mconnServer.SetLogger(serverLogger)
  356. err = mconnServer.Start()
  357. require.Nil(t, err)
  358. return mconnClient, mconnServer
  359. }
  // expectSend reports whether a value can be received from ch within five
  // seconds: true as soon as a receive succeeds, false once the timeout elapses.
  func expectSend(ch chan struct{}) bool {
  	const wait = 5 * time.Second

  	timeout := time.After(wait)
  	received := false
  	select {
  	case <-ch:
  		received = true
  	case <-timeout:
  	}
  	return received
  }
  369. func TestMConnectionReadErrorBadEncoding(t *testing.T) {
  370. chOnErr := make(chan struct{})
  371. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  372. client := mconnClient.conn
  373. // Write it.
  374. _, err := client.Write([]byte{1, 2, 3, 4, 5})
  375. require.NoError(t, err)
  376. assert.True(t, expectSend(chOnErr), "badly encoded msgPacket")
  377. t.Cleanup(func() {
  378. if err := mconnClient.Stop(); err != nil {
  379. t.Log(err)
  380. }
  381. })
  382. t.Cleanup(func() {
  383. if err := mconnServer.Stop(); err != nil {
  384. t.Log(err)
  385. }
  386. })
  387. }
  388. func TestMConnectionReadErrorUnknownChannel(t *testing.T) {
  389. chOnErr := make(chan struct{})
  390. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  391. msg := []byte("Ant-Man")
  392. // fail to send msg on channel unknown by client
  393. assert.False(t, mconnClient.Send(0x03, msg))
  394. // send msg on channel unknown by the server.
  395. // should cause an error
  396. assert.True(t, mconnClient.Send(0x02, msg))
  397. assert.True(t, expectSend(chOnErr), "unknown channel")
  398. t.Cleanup(func() {
  399. if err := mconnClient.Stop(); err != nil {
  400. t.Log(err)
  401. }
  402. })
  403. t.Cleanup(func() {
  404. if err := mconnServer.Stop(); err != nil {
  405. t.Log(err)
  406. }
  407. })
  408. }
  409. func TestMConnectionReadErrorLongMessage(t *testing.T) {
  410. chOnErr := make(chan struct{})
  411. chOnRcv := make(chan struct{})
  412. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  413. defer mconnClient.Stop() // nolint:errcheck // ignore for tests
  414. defer mconnServer.Stop() // nolint:errcheck // ignore for tests
  415. mconnServer.onReceive = func(chID byte, msgBytes []byte) {
  416. chOnRcv <- struct{}{}
  417. }
  418. client := mconnClient.conn
  419. protoWriter := protoio.NewDelimitedWriter(client)
  420. // send msg thats just right
  421. var packet = tmp2p.PacketMsg{
  422. ChannelID: 0x01,
  423. EOF: true,
  424. Data: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize),
  425. }
  426. _, err := protoWriter.WriteMsg(mustWrapPacket(&packet))
  427. require.NoError(t, err)
  428. assert.True(t, expectSend(chOnRcv), "msg just right")
  429. // send msg thats too long
  430. packet = tmp2p.PacketMsg{
  431. ChannelID: 0x01,
  432. EOF: true,
  433. Data: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize+100),
  434. }
  435. _, err = protoWriter.WriteMsg(mustWrapPacket(&packet))
  436. require.Error(t, err)
  437. assert.True(t, expectSend(chOnErr), "msg too long")
  438. }
  439. func TestMConnectionReadErrorUnknownMsgType(t *testing.T) {
  440. chOnErr := make(chan struct{})
  441. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  442. defer mconnClient.Stop() // nolint:errcheck // ignore for tests
  443. defer mconnServer.Stop() // nolint:errcheck // ignore for tests
  444. // send msg with unknown msg type
  445. _, err := protoio.NewDelimitedWriter(mconnClient.conn).WriteMsg(&types.Header{ChainID: "x"})
  446. require.NoError(t, err)
  447. assert.True(t, expectSend(chOnErr), "unknown msg type")
  448. }
  449. func TestMConnectionTrySend(t *testing.T) {
  450. server, client := NetPipe()
  451. defer server.Close()
  452. defer client.Close()
  453. mconn := createTestMConnection(client)
  454. err := mconn.Start()
  455. require.Nil(t, err)
  456. defer mconn.Stop() // nolint:errcheck // ignore for tests
  457. msg := []byte("Semicolon-Woman")
  458. resultCh := make(chan string, 2)
  459. assert.True(t, mconn.TrySend(0x01, msg))
  460. _, err = server.Read(make([]byte, len(msg)))
  461. require.NoError(t, err)
  462. assert.True(t, mconn.CanSend(0x01))
  463. assert.True(t, mconn.TrySend(0x01, msg))
  464. assert.False(t, mconn.CanSend(0x01))
  465. go func() {
  466. mconn.TrySend(0x01, msg)
  467. resultCh <- "TrySend"
  468. }()
  469. assert.False(t, mconn.CanSend(0x01))
  470. assert.False(t, mconn.TrySend(0x01, msg))
  471. assert.Equal(t, "TrySend", <-resultCh)
  472. }
  473. // nolint:lll //ignore line length for tests
  474. func TestConnVectors(t *testing.T) {
  475. testCases := []struct {
  476. testName string
  477. msg proto.Message
  478. expBytes string
  479. }{
  480. {"PacketPing", &tmp2p.PacketPing{}, "0a00"},
  481. {"PacketPong", &tmp2p.PacketPong{}, "1200"},
  482. {"PacketMsg", &tmp2p.PacketMsg{ChannelID: 1, EOF: false, Data: []byte("data transmitted over the wire")}, "1a2208011a1e64617461207472616e736d6974746564206f766572207468652077697265"},
  483. }
  484. for _, tc := range testCases {
  485. tc := tc
  486. pm := mustWrapPacket(tc.msg)
  487. bz, err := pm.Marshal()
  488. require.NoError(t, err, tc.testName)
  489. require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName)
  490. }
  491. }
  492. func TestMConnectionChannelOverflow(t *testing.T) {
  493. chOnErr := make(chan struct{})
  494. chOnRcv := make(chan struct{})
  495. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  496. t.Cleanup(stopAll(t, mconnClient, mconnServer))
  497. mconnServer.onReceive = func(chID byte, msgBytes []byte) {
  498. chOnRcv <- struct{}{}
  499. }
  500. client := mconnClient.conn
  501. protoWriter := protoio.NewDelimitedWriter(client)
  502. var packet = tmp2p.PacketMsg{
  503. ChannelID: 0x01,
  504. EOF: true,
  505. Data: []byte(`42`),
  506. }
  507. _, err := protoWriter.WriteMsg(mustWrapPacket(&packet))
  508. require.NoError(t, err)
  509. assert.True(t, expectSend(chOnRcv))
  510. packet.ChannelID = int32(1025)
  511. _, err = protoWriter.WriteMsg(mustWrapPacket(&packet))
  512. require.NoError(t, err)
  513. assert.False(t, expectSend(chOnRcv))
  514. }
  // stopper is anything that can be stopped and may report an error doing so.
  type stopper interface {
  	Stop() error
  }

  // stopAll returns a function, suitable for t.Cleanup, that stops every given
  // stopper in argument order. A Stop error is logged through t and does not
  // prevent the remaining stoppers from being stopped.
  func stopAll(t *testing.T, stoppers ...stopper) func() {
  	return func() {
  		for i := range stoppers {
  			err := stoppers[i].Stop()
  			if err == nil {
  				continue
  			}
  			t.Log(err)
  		}
  	}
  }