You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

535 lines
14 KiB

  1. package conn
  2. import (
  3. "bytes"
  4. "net"
  5. "testing"
  6. "time"
  7. "github.com/fortytw2/leaktest"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/stretchr/testify/require"
  10. amino "github.com/tendermint/go-amino"
  11. "github.com/tendermint/tendermint/libs/log"
  12. )
  13. const maxPingPongPacketSize = 1024 // bytes
  14. func createTestMConnection(conn net.Conn) *MConnection {
  15. onReceive := func(chID byte, msgBytes []byte) {
  16. }
  17. onError := func(r interface{}) {
  18. }
  19. c := createMConnectionWithCallbacks(conn, onReceive, onError)
  20. c.SetLogger(log.TestingLogger())
  21. return c
  22. }
  23. func createMConnectionWithCallbacks(
  24. conn net.Conn,
  25. onReceive func(chID byte, msgBytes []byte),
  26. onError func(r interface{}),
  27. ) *MConnection {
  28. cfg := DefaultMConnConfig()
  29. cfg.PingInterval = 90 * time.Millisecond
  30. cfg.PongTimeout = 45 * time.Millisecond
  31. chDescs := []*ChannelDescriptor{{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
  32. c := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, cfg)
  33. c.SetLogger(log.TestingLogger())
  34. return c
  35. }
  36. func TestMConnectionSendFlushStop(t *testing.T) {
  37. server, client := NetPipe()
  38. defer server.Close() // nolint: errcheck
  39. defer client.Close() // nolint: errcheck
  40. clientConn := createTestMConnection(client)
  41. err := clientConn.Start()
  42. require.Nil(t, err)
  43. defer clientConn.Stop()
  44. msg := []byte("abc")
  45. assert.True(t, clientConn.Send(0x01, msg))
  46. aminoMsgLength := 14
  47. // start the reader in a new routine, so we can flush
  48. errCh := make(chan error)
  49. go func() {
  50. msgB := make([]byte, aminoMsgLength)
  51. _, err := server.Read(msgB)
  52. if err != nil {
  53. t.Error(err)
  54. return
  55. }
  56. errCh <- err
  57. }()
  58. // stop the conn - it should flush all conns
  59. clientConn.FlushStop()
  60. timer := time.NewTimer(3 * time.Second)
  61. select {
  62. case <-errCh:
  63. case <-timer.C:
  64. t.Error("timed out waiting for msgs to be read")
  65. }
  66. }
  67. func TestMConnectionSend(t *testing.T) {
  68. server, client := NetPipe()
  69. defer server.Close() // nolint: errcheck
  70. defer client.Close() // nolint: errcheck
  71. mconn := createTestMConnection(client)
  72. err := mconn.Start()
  73. require.Nil(t, err)
  74. defer mconn.Stop()
  75. msg := []byte("Ant-Man")
  76. assert.True(t, mconn.Send(0x01, msg))
  77. // Note: subsequent Send/TrySend calls could pass because we are reading from
  78. // the send queue in a separate goroutine.
  79. _, err = server.Read(make([]byte, len(msg)))
  80. if err != nil {
  81. t.Error(err)
  82. }
  83. assert.True(t, mconn.CanSend(0x01))
  84. msg = []byte("Spider-Man")
  85. assert.True(t, mconn.TrySend(0x01, msg))
  86. _, err = server.Read(make([]byte, len(msg)))
  87. if err != nil {
  88. t.Error(err)
  89. }
  90. assert.False(t, mconn.CanSend(0x05), "CanSend should return false because channel is unknown")
  91. assert.False(t, mconn.Send(0x05, []byte("Absorbing Man")), "Send should return false because channel is unknown")
  92. }
  93. func TestMConnectionReceive(t *testing.T) {
  94. server, client := NetPipe()
  95. defer server.Close() // nolint: errcheck
  96. defer client.Close() // nolint: errcheck
  97. receivedCh := make(chan []byte)
  98. errorsCh := make(chan interface{})
  99. onReceive := func(chID byte, msgBytes []byte) {
  100. receivedCh <- msgBytes
  101. }
  102. onError := func(r interface{}) {
  103. errorsCh <- r
  104. }
  105. mconn1 := createMConnectionWithCallbacks(client, onReceive, onError)
  106. err := mconn1.Start()
  107. require.Nil(t, err)
  108. defer mconn1.Stop()
  109. mconn2 := createTestMConnection(server)
  110. err = mconn2.Start()
  111. require.Nil(t, err)
  112. defer mconn2.Stop()
  113. msg := []byte("Cyclops")
  114. assert.True(t, mconn2.Send(0x01, msg))
  115. select {
  116. case receivedBytes := <-receivedCh:
  117. assert.Equal(t, msg, receivedBytes)
  118. case err := <-errorsCh:
  119. t.Fatalf("Expected %s, got %+v", msg, err)
  120. case <-time.After(500 * time.Millisecond):
  121. t.Fatalf("Did not receive %s message in 500ms", msg)
  122. }
  123. }
  124. func TestMConnectionStatus(t *testing.T) {
  125. server, client := NetPipe()
  126. defer server.Close() // nolint: errcheck
  127. defer client.Close() // nolint: errcheck
  128. mconn := createTestMConnection(client)
  129. err := mconn.Start()
  130. require.Nil(t, err)
  131. defer mconn.Stop()
  132. status := mconn.Status()
  133. assert.NotNil(t, status)
  134. assert.Zero(t, status.Channels[0].SendQueueSize)
  135. }
  136. func TestMConnectionPongTimeoutResultsInError(t *testing.T) {
  137. server, client := net.Pipe()
  138. defer server.Close()
  139. defer client.Close()
  140. receivedCh := make(chan []byte)
  141. errorsCh := make(chan interface{})
  142. onReceive := func(chID byte, msgBytes []byte) {
  143. receivedCh <- msgBytes
  144. }
  145. onError := func(r interface{}) {
  146. errorsCh <- r
  147. }
  148. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  149. err := mconn.Start()
  150. require.Nil(t, err)
  151. defer mconn.Stop()
  152. serverGotPing := make(chan struct{})
  153. go func() {
  154. // read ping
  155. var pkt PacketPing
  156. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  157. assert.Nil(t, err)
  158. serverGotPing <- struct{}{}
  159. }()
  160. <-serverGotPing
  161. pongTimerExpired := mconn.config.PongTimeout + 20*time.Millisecond
  162. select {
  163. case msgBytes := <-receivedCh:
  164. t.Fatalf("Expected error, but got %v", msgBytes)
  165. case err := <-errorsCh:
  166. assert.NotNil(t, err)
  167. case <-time.After(pongTimerExpired):
  168. t.Fatalf("Expected to receive error after %v", pongTimerExpired)
  169. }
  170. }
  171. func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) {
  172. server, client := net.Pipe()
  173. defer server.Close()
  174. defer client.Close()
  175. receivedCh := make(chan []byte)
  176. errorsCh := make(chan interface{})
  177. onReceive := func(chID byte, msgBytes []byte) {
  178. receivedCh <- msgBytes
  179. }
  180. onError := func(r interface{}) {
  181. errorsCh <- r
  182. }
  183. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  184. err := mconn.Start()
  185. require.Nil(t, err)
  186. defer mconn.Stop()
  187. // sending 3 pongs in a row (abuse)
  188. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  189. require.Nil(t, err)
  190. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  191. require.Nil(t, err)
  192. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  193. require.Nil(t, err)
  194. serverGotPing := make(chan struct{})
  195. go func() {
  196. // read ping (one byte)
  197. var (
  198. packet Packet
  199. err error
  200. )
  201. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &packet, maxPingPongPacketSize)
  202. require.Nil(t, err)
  203. serverGotPing <- struct{}{}
  204. // respond with pong
  205. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  206. require.Nil(t, err)
  207. }()
  208. <-serverGotPing
  209. pongTimerExpired := mconn.config.PongTimeout + 20*time.Millisecond
  210. select {
  211. case msgBytes := <-receivedCh:
  212. t.Fatalf("Expected no data, but got %v", msgBytes)
  213. case err := <-errorsCh:
  214. t.Fatalf("Expected no error, but got %v", err)
  215. case <-time.After(pongTimerExpired):
  216. assert.True(t, mconn.IsRunning())
  217. }
  218. }
  219. func TestMConnectionMultiplePings(t *testing.T) {
  220. server, client := net.Pipe()
  221. defer server.Close()
  222. defer client.Close()
  223. receivedCh := make(chan []byte)
  224. errorsCh := make(chan interface{})
  225. onReceive := func(chID byte, msgBytes []byte) {
  226. receivedCh <- msgBytes
  227. }
  228. onError := func(r interface{}) {
  229. errorsCh <- r
  230. }
  231. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  232. err := mconn.Start()
  233. require.Nil(t, err)
  234. defer mconn.Stop()
  235. // sending 3 pings in a row (abuse)
  236. // see https://github.com/tendermint/tendermint/issues/1190
  237. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPing{}))
  238. require.Nil(t, err)
  239. var pkt PacketPong
  240. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  241. require.Nil(t, err)
  242. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPing{}))
  243. require.Nil(t, err)
  244. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  245. require.Nil(t, err)
  246. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPing{}))
  247. require.Nil(t, err)
  248. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  249. require.Nil(t, err)
  250. assert.True(t, mconn.IsRunning())
  251. }
  252. func TestMConnectionPingPongs(t *testing.T) {
  253. // check that we are not leaking any go-routines
  254. defer leaktest.CheckTimeout(t, 10*time.Second)()
  255. server, client := net.Pipe()
  256. defer server.Close()
  257. defer client.Close()
  258. receivedCh := make(chan []byte)
  259. errorsCh := make(chan interface{})
  260. onReceive := func(chID byte, msgBytes []byte) {
  261. receivedCh <- msgBytes
  262. }
  263. onError := func(r interface{}) {
  264. errorsCh <- r
  265. }
  266. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  267. err := mconn.Start()
  268. require.Nil(t, err)
  269. defer mconn.Stop()
  270. serverGotPing := make(chan struct{})
  271. go func() {
  272. // read ping
  273. var pkt PacketPing
  274. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  275. require.Nil(t, err)
  276. serverGotPing <- struct{}{}
  277. // respond with pong
  278. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  279. require.Nil(t, err)
  280. time.Sleep(mconn.config.PingInterval)
  281. // read ping
  282. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  283. require.Nil(t, err)
  284. // respond with pong
  285. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  286. require.Nil(t, err)
  287. }()
  288. <-serverGotPing
  289. pongTimerExpired := (mconn.config.PongTimeout + 20*time.Millisecond) * 2
  290. select {
  291. case msgBytes := <-receivedCh:
  292. t.Fatalf("Expected no data, but got %v", msgBytes)
  293. case err := <-errorsCh:
  294. t.Fatalf("Expected no error, but got %v", err)
  295. case <-time.After(2 * pongTimerExpired):
  296. assert.True(t, mconn.IsRunning())
  297. }
  298. }
  299. func TestMConnectionStopsAndReturnsError(t *testing.T) {
  300. server, client := NetPipe()
  301. defer server.Close() // nolint: errcheck
  302. defer client.Close() // nolint: errcheck
  303. receivedCh := make(chan []byte)
  304. errorsCh := make(chan interface{})
  305. onReceive := func(chID byte, msgBytes []byte) {
  306. receivedCh <- msgBytes
  307. }
  308. onError := func(r interface{}) {
  309. errorsCh <- r
  310. }
  311. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  312. err := mconn.Start()
  313. require.Nil(t, err)
  314. defer mconn.Stop()
  315. if err := client.Close(); err != nil {
  316. t.Error(err)
  317. }
  318. select {
  319. case receivedBytes := <-receivedCh:
  320. t.Fatalf("Expected error, got %v", receivedBytes)
  321. case err := <-errorsCh:
  322. assert.NotNil(t, err)
  323. assert.False(t, mconn.IsRunning())
  324. case <-time.After(500 * time.Millisecond):
  325. t.Fatal("Did not receive error in 500ms")
  326. }
  327. }
  328. func newClientAndServerConnsForReadErrors(t *testing.T, chOnErr chan struct{}) (*MConnection, *MConnection) {
  329. server, client := NetPipe()
  330. onReceive := func(chID byte, msgBytes []byte) {}
  331. onError := func(r interface{}) {}
  332. // create client conn with two channels
  333. chDescs := []*ChannelDescriptor{
  334. {ID: 0x01, Priority: 1, SendQueueCapacity: 1},
  335. {ID: 0x02, Priority: 1, SendQueueCapacity: 1},
  336. }
  337. mconnClient := NewMConnection(client, chDescs, onReceive, onError)
  338. mconnClient.SetLogger(log.TestingLogger().With("module", "client"))
  339. err := mconnClient.Start()
  340. require.Nil(t, err)
  341. // create server conn with 1 channel
  342. // it fires on chOnErr when there's an error
  343. serverLogger := log.TestingLogger().With("module", "server")
  344. onError = func(r interface{}) {
  345. chOnErr <- struct{}{}
  346. }
  347. mconnServer := createMConnectionWithCallbacks(server, onReceive, onError)
  348. mconnServer.SetLogger(serverLogger)
  349. err = mconnServer.Start()
  350. require.Nil(t, err)
  351. return mconnClient, mconnServer
  352. }
  353. func expectSend(ch chan struct{}) bool {
  354. after := time.After(time.Second * 5)
  355. select {
  356. case <-ch:
  357. return true
  358. case <-after:
  359. return false
  360. }
  361. }
  362. func TestMConnectionReadErrorBadEncoding(t *testing.T) {
  363. chOnErr := make(chan struct{})
  364. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  365. defer mconnClient.Stop()
  366. defer mconnServer.Stop()
  367. client := mconnClient.conn
  368. // send badly encoded msgPacket
  369. bz := cdc.MustMarshalBinaryLengthPrefixed(PacketMsg{})
  370. bz[4] += 0x01 // Invalid prefix bytes.
  371. // Write it.
  372. _, err := client.Write(bz)
  373. assert.Nil(t, err)
  374. assert.True(t, expectSend(chOnErr), "badly encoded msgPacket")
  375. }
  376. func TestMConnectionReadErrorUnknownChannel(t *testing.T) {
  377. chOnErr := make(chan struct{})
  378. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  379. defer mconnClient.Stop()
  380. defer mconnServer.Stop()
  381. msg := []byte("Ant-Man")
  382. // fail to send msg on channel unknown by client
  383. assert.False(t, mconnClient.Send(0x03, msg))
  384. // send msg on channel unknown by the server.
  385. // should cause an error
  386. assert.True(t, mconnClient.Send(0x02, msg))
  387. assert.True(t, expectSend(chOnErr), "unknown channel")
  388. }
  389. func TestMConnectionReadErrorLongMessage(t *testing.T) {
  390. chOnErr := make(chan struct{})
  391. chOnRcv := make(chan struct{})
  392. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  393. defer mconnClient.Stop()
  394. defer mconnServer.Stop()
  395. mconnServer.onReceive = func(chID byte, msgBytes []byte) {
  396. chOnRcv <- struct{}{}
  397. }
  398. client := mconnClient.conn
  399. // send msg thats just right
  400. var err error
  401. var buf = new(bytes.Buffer)
  402. var packet = PacketMsg{
  403. ChannelID: 0x01,
  404. EOF: 1,
  405. Bytes: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize),
  406. }
  407. _, err = cdc.MarshalBinaryLengthPrefixedWriter(buf, packet)
  408. assert.Nil(t, err)
  409. _, err = client.Write(buf.Bytes())
  410. assert.Nil(t, err)
  411. assert.True(t, expectSend(chOnRcv), "msg just right")
  412. // send msg thats too long
  413. buf = new(bytes.Buffer)
  414. packet = PacketMsg{
  415. ChannelID: 0x01,
  416. EOF: 1,
  417. Bytes: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize+100),
  418. }
  419. _, err = cdc.MarshalBinaryLengthPrefixedWriter(buf, packet)
  420. assert.Nil(t, err)
  421. _, err = client.Write(buf.Bytes())
  422. assert.NotNil(t, err)
  423. assert.True(t, expectSend(chOnErr), "msg too long")
  424. }
  425. func TestMConnectionReadErrorUnknownMsgType(t *testing.T) {
  426. chOnErr := make(chan struct{})
  427. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  428. defer mconnClient.Stop()
  429. defer mconnServer.Stop()
  430. // send msg with unknown msg type
  431. err := amino.EncodeUvarint(mconnClient.conn, 4)
  432. assert.Nil(t, err)
  433. _, err = mconnClient.conn.Write([]byte{0xFF, 0xFF, 0xFF, 0xFF})
  434. assert.Nil(t, err)
  435. assert.True(t, expectSend(chOnErr), "unknown msg type")
  436. }
  437. func TestMConnectionTrySend(t *testing.T) {
  438. server, client := NetPipe()
  439. defer server.Close()
  440. defer client.Close()
  441. mconn := createTestMConnection(client)
  442. err := mconn.Start()
  443. require.Nil(t, err)
  444. defer mconn.Stop()
  445. msg := []byte("Semicolon-Woman")
  446. resultCh := make(chan string, 2)
  447. assert.True(t, mconn.TrySend(0x01, msg))
  448. server.Read(make([]byte, len(msg)))
  449. assert.True(t, mconn.CanSend(0x01))
  450. assert.True(t, mconn.TrySend(0x01, msg))
  451. assert.False(t, mconn.CanSend(0x01))
  452. go func() {
  453. mconn.TrySend(0x01, msg)
  454. resultCh <- "TrySend"
  455. }()
  456. assert.False(t, mconn.CanSend(0x01))
  457. assert.False(t, mconn.TrySend(0x01, msg))
  458. assert.Equal(t, "TrySend", <-resultCh)
  459. }