You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

530 lines
14 KiB

  1. package conn
  2. import (
  3. "bytes"
  4. "net"
  5. "testing"
  6. "time"
  7. "github.com/fortytw2/leaktest"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/stretchr/testify/require"
  10. amino "github.com/tendermint/go-amino"
  11. "github.com/tendermint/tendermint/libs/log"
  12. )
  13. const maxPingPongPacketSize = 1024 // bytes
  14. func createTestMConnection(conn net.Conn) *MConnection {
  15. onReceive := func(chID byte, msgBytes []byte) {
  16. }
  17. onError := func(r interface{}) {
  18. }
  19. c := createMConnectionWithCallbacks(conn, onReceive, onError)
  20. c.SetLogger(log.TestingLogger())
  21. return c
  22. }
  23. func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *MConnection {
  24. cfg := DefaultMConnConfig()
  25. cfg.PingInterval = 90 * time.Millisecond
  26. cfg.PongTimeout = 45 * time.Millisecond
  27. chDescs := []*ChannelDescriptor{{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
  28. c := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, cfg)
  29. c.SetLogger(log.TestingLogger())
  30. return c
  31. }
  32. func TestMConnectionSendFlushStop(t *testing.T) {
  33. server, client := NetPipe()
  34. defer server.Close() // nolint: errcheck
  35. defer client.Close() // nolint: errcheck
  36. clientConn := createTestMConnection(client)
  37. err := clientConn.Start()
  38. require.Nil(t, err)
  39. defer clientConn.Stop()
  40. msg := []byte("abc")
  41. assert.True(t, clientConn.Send(0x01, msg))
  42. aminoMsgLength := 14
  43. // start the reader in a new routine, so we can flush
  44. errCh := make(chan error)
  45. go func() {
  46. msgB := make([]byte, aminoMsgLength)
  47. _, err := server.Read(msgB)
  48. if err != nil {
  49. t.Error(err)
  50. return
  51. }
  52. errCh <- err
  53. }()
  54. // stop the conn - it should flush all conns
  55. clientConn.FlushStop()
  56. timer := time.NewTimer(3 * time.Second)
  57. select {
  58. case <-errCh:
  59. case <-timer.C:
  60. t.Error("timed out waiting for msgs to be read")
  61. }
  62. }
  63. func TestMConnectionSend(t *testing.T) {
  64. server, client := NetPipe()
  65. defer server.Close() // nolint: errcheck
  66. defer client.Close() // nolint: errcheck
  67. mconn := createTestMConnection(client)
  68. err := mconn.Start()
  69. require.Nil(t, err)
  70. defer mconn.Stop()
  71. msg := []byte("Ant-Man")
  72. assert.True(t, mconn.Send(0x01, msg))
  73. // Note: subsequent Send/TrySend calls could pass because we are reading from
  74. // the send queue in a separate goroutine.
  75. _, err = server.Read(make([]byte, len(msg)))
  76. if err != nil {
  77. t.Error(err)
  78. }
  79. assert.True(t, mconn.CanSend(0x01))
  80. msg = []byte("Spider-Man")
  81. assert.True(t, mconn.TrySend(0x01, msg))
  82. _, err = server.Read(make([]byte, len(msg)))
  83. if err != nil {
  84. t.Error(err)
  85. }
  86. assert.False(t, mconn.CanSend(0x05), "CanSend should return false because channel is unknown")
  87. assert.False(t, mconn.Send(0x05, []byte("Absorbing Man")), "Send should return false because channel is unknown")
  88. }
  89. func TestMConnectionReceive(t *testing.T) {
  90. server, client := NetPipe()
  91. defer server.Close() // nolint: errcheck
  92. defer client.Close() // nolint: errcheck
  93. receivedCh := make(chan []byte)
  94. errorsCh := make(chan interface{})
  95. onReceive := func(chID byte, msgBytes []byte) {
  96. receivedCh <- msgBytes
  97. }
  98. onError := func(r interface{}) {
  99. errorsCh <- r
  100. }
  101. mconn1 := createMConnectionWithCallbacks(client, onReceive, onError)
  102. err := mconn1.Start()
  103. require.Nil(t, err)
  104. defer mconn1.Stop()
  105. mconn2 := createTestMConnection(server)
  106. err = mconn2.Start()
  107. require.Nil(t, err)
  108. defer mconn2.Stop()
  109. msg := []byte("Cyclops")
  110. assert.True(t, mconn2.Send(0x01, msg))
  111. select {
  112. case receivedBytes := <-receivedCh:
  113. assert.Equal(t, []byte(msg), receivedBytes)
  114. case err := <-errorsCh:
  115. t.Fatalf("Expected %s, got %+v", msg, err)
  116. case <-time.After(500 * time.Millisecond):
  117. t.Fatalf("Did not receive %s message in 500ms", msg)
  118. }
  119. }
  120. func TestMConnectionStatus(t *testing.T) {
  121. server, client := NetPipe()
  122. defer server.Close() // nolint: errcheck
  123. defer client.Close() // nolint: errcheck
  124. mconn := createTestMConnection(client)
  125. err := mconn.Start()
  126. require.Nil(t, err)
  127. defer mconn.Stop()
  128. status := mconn.Status()
  129. assert.NotNil(t, status)
  130. assert.Zero(t, status.Channels[0].SendQueueSize)
  131. }
  132. func TestMConnectionPongTimeoutResultsInError(t *testing.T) {
  133. server, client := net.Pipe()
  134. defer server.Close()
  135. defer client.Close()
  136. receivedCh := make(chan []byte)
  137. errorsCh := make(chan interface{})
  138. onReceive := func(chID byte, msgBytes []byte) {
  139. receivedCh <- msgBytes
  140. }
  141. onError := func(r interface{}) {
  142. errorsCh <- r
  143. }
  144. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  145. err := mconn.Start()
  146. require.Nil(t, err)
  147. defer mconn.Stop()
  148. serverGotPing := make(chan struct{})
  149. go func() {
  150. // read ping
  151. var pkt PacketPing
  152. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  153. assert.Nil(t, err)
  154. serverGotPing <- struct{}{}
  155. }()
  156. <-serverGotPing
  157. pongTimerExpired := mconn.config.PongTimeout + 20*time.Millisecond
  158. select {
  159. case msgBytes := <-receivedCh:
  160. t.Fatalf("Expected error, but got %v", msgBytes)
  161. case err := <-errorsCh:
  162. assert.NotNil(t, err)
  163. case <-time.After(pongTimerExpired):
  164. t.Fatalf("Expected to receive error after %v", pongTimerExpired)
  165. }
  166. }
  167. func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) {
  168. server, client := net.Pipe()
  169. defer server.Close()
  170. defer client.Close()
  171. receivedCh := make(chan []byte)
  172. errorsCh := make(chan interface{})
  173. onReceive := func(chID byte, msgBytes []byte) {
  174. receivedCh <- msgBytes
  175. }
  176. onError := func(r interface{}) {
  177. errorsCh <- r
  178. }
  179. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  180. err := mconn.Start()
  181. require.Nil(t, err)
  182. defer mconn.Stop()
  183. // sending 3 pongs in a row (abuse)
  184. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  185. require.Nil(t, err)
  186. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  187. require.Nil(t, err)
  188. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  189. require.Nil(t, err)
  190. serverGotPing := make(chan struct{})
  191. go func() {
  192. // read ping (one byte)
  193. var (
  194. packet Packet
  195. err error
  196. )
  197. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &packet, maxPingPongPacketSize)
  198. require.Nil(t, err)
  199. serverGotPing <- struct{}{}
  200. // respond with pong
  201. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  202. require.Nil(t, err)
  203. }()
  204. <-serverGotPing
  205. pongTimerExpired := mconn.config.PongTimeout + 20*time.Millisecond
  206. select {
  207. case msgBytes := <-receivedCh:
  208. t.Fatalf("Expected no data, but got %v", msgBytes)
  209. case err := <-errorsCh:
  210. t.Fatalf("Expected no error, but got %v", err)
  211. case <-time.After(pongTimerExpired):
  212. assert.True(t, mconn.IsRunning())
  213. }
  214. }
  215. func TestMConnectionMultiplePings(t *testing.T) {
  216. server, client := net.Pipe()
  217. defer server.Close()
  218. defer client.Close()
  219. receivedCh := make(chan []byte)
  220. errorsCh := make(chan interface{})
  221. onReceive := func(chID byte, msgBytes []byte) {
  222. receivedCh <- msgBytes
  223. }
  224. onError := func(r interface{}) {
  225. errorsCh <- r
  226. }
  227. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  228. err := mconn.Start()
  229. require.Nil(t, err)
  230. defer mconn.Stop()
  231. // sending 3 pings in a row (abuse)
  232. // see https://github.com/tendermint/tendermint/issues/1190
  233. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPing{}))
  234. require.Nil(t, err)
  235. var pkt PacketPong
  236. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  237. require.Nil(t, err)
  238. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPing{}))
  239. require.Nil(t, err)
  240. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  241. require.Nil(t, err)
  242. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPing{}))
  243. require.Nil(t, err)
  244. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  245. require.Nil(t, err)
  246. assert.True(t, mconn.IsRunning())
  247. }
  248. func TestMConnectionPingPongs(t *testing.T) {
  249. // check that we are not leaking any go-routines
  250. defer leaktest.CheckTimeout(t, 10*time.Second)()
  251. server, client := net.Pipe()
  252. defer server.Close()
  253. defer client.Close()
  254. receivedCh := make(chan []byte)
  255. errorsCh := make(chan interface{})
  256. onReceive := func(chID byte, msgBytes []byte) {
  257. receivedCh <- msgBytes
  258. }
  259. onError := func(r interface{}) {
  260. errorsCh <- r
  261. }
  262. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  263. err := mconn.Start()
  264. require.Nil(t, err)
  265. defer mconn.Stop()
  266. serverGotPing := make(chan struct{})
  267. go func() {
  268. // read ping
  269. var pkt PacketPing
  270. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  271. require.Nil(t, err)
  272. serverGotPing <- struct{}{}
  273. // respond with pong
  274. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  275. require.Nil(t, err)
  276. time.Sleep(mconn.config.PingInterval)
  277. // read ping
  278. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  279. require.Nil(t, err)
  280. // respond with pong
  281. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  282. require.Nil(t, err)
  283. }()
  284. <-serverGotPing
  285. pongTimerExpired := (mconn.config.PongTimeout + 20*time.Millisecond) * 2
  286. select {
  287. case msgBytes := <-receivedCh:
  288. t.Fatalf("Expected no data, but got %v", msgBytes)
  289. case err := <-errorsCh:
  290. t.Fatalf("Expected no error, but got %v", err)
  291. case <-time.After(2 * pongTimerExpired):
  292. assert.True(t, mconn.IsRunning())
  293. }
  294. }
  295. func TestMConnectionStopsAndReturnsError(t *testing.T) {
  296. server, client := NetPipe()
  297. defer server.Close() // nolint: errcheck
  298. defer client.Close() // nolint: errcheck
  299. receivedCh := make(chan []byte)
  300. errorsCh := make(chan interface{})
  301. onReceive := func(chID byte, msgBytes []byte) {
  302. receivedCh <- msgBytes
  303. }
  304. onError := func(r interface{}) {
  305. errorsCh <- r
  306. }
  307. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  308. err := mconn.Start()
  309. require.Nil(t, err)
  310. defer mconn.Stop()
  311. if err := client.Close(); err != nil {
  312. t.Error(err)
  313. }
  314. select {
  315. case receivedBytes := <-receivedCh:
  316. t.Fatalf("Expected error, got %v", receivedBytes)
  317. case err := <-errorsCh:
  318. assert.NotNil(t, err)
  319. assert.False(t, mconn.IsRunning())
  320. case <-time.After(500 * time.Millisecond):
  321. t.Fatal("Did not receive error in 500ms")
  322. }
  323. }
  324. func newClientAndServerConnsForReadErrors(t *testing.T, chOnErr chan struct{}) (*MConnection, *MConnection) {
  325. server, client := NetPipe()
  326. onReceive := func(chID byte, msgBytes []byte) {}
  327. onError := func(r interface{}) {}
  328. // create client conn with two channels
  329. chDescs := []*ChannelDescriptor{
  330. {ID: 0x01, Priority: 1, SendQueueCapacity: 1},
  331. {ID: 0x02, Priority: 1, SendQueueCapacity: 1},
  332. }
  333. mconnClient := NewMConnection(client, chDescs, onReceive, onError)
  334. mconnClient.SetLogger(log.TestingLogger().With("module", "client"))
  335. err := mconnClient.Start()
  336. require.Nil(t, err)
  337. // create server conn with 1 channel
  338. // it fires on chOnErr when there's an error
  339. serverLogger := log.TestingLogger().With("module", "server")
  340. onError = func(r interface{}) {
  341. chOnErr <- struct{}{}
  342. }
  343. mconnServer := createMConnectionWithCallbacks(server, onReceive, onError)
  344. mconnServer.SetLogger(serverLogger)
  345. err = mconnServer.Start()
  346. require.Nil(t, err)
  347. return mconnClient, mconnServer
  348. }
  349. func expectSend(ch chan struct{}) bool {
  350. after := time.After(time.Second * 5)
  351. select {
  352. case <-ch:
  353. return true
  354. case <-after:
  355. return false
  356. }
  357. }
  358. func TestMConnectionReadErrorBadEncoding(t *testing.T) {
  359. chOnErr := make(chan struct{})
  360. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  361. defer mconnClient.Stop()
  362. defer mconnServer.Stop()
  363. client := mconnClient.conn
  364. // send badly encoded msgPacket
  365. bz := cdc.MustMarshalBinaryLengthPrefixed(PacketMsg{})
  366. bz[4] += 0x01 // Invalid prefix bytes.
  367. // Write it.
  368. _, err := client.Write(bz)
  369. assert.Nil(t, err)
  370. assert.True(t, expectSend(chOnErr), "badly encoded msgPacket")
  371. }
  372. func TestMConnectionReadErrorUnknownChannel(t *testing.T) {
  373. chOnErr := make(chan struct{})
  374. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  375. defer mconnClient.Stop()
  376. defer mconnServer.Stop()
  377. msg := []byte("Ant-Man")
  378. // fail to send msg on channel unknown by client
  379. assert.False(t, mconnClient.Send(0x03, msg))
  380. // send msg on channel unknown by the server.
  381. // should cause an error
  382. assert.True(t, mconnClient.Send(0x02, msg))
  383. assert.True(t, expectSend(chOnErr), "unknown channel")
  384. }
  385. func TestMConnectionReadErrorLongMessage(t *testing.T) {
  386. chOnErr := make(chan struct{})
  387. chOnRcv := make(chan struct{})
  388. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  389. defer mconnClient.Stop()
  390. defer mconnServer.Stop()
  391. mconnServer.onReceive = func(chID byte, msgBytes []byte) {
  392. chOnRcv <- struct{}{}
  393. }
  394. client := mconnClient.conn
  395. // send msg thats just right
  396. var err error
  397. var buf = new(bytes.Buffer)
  398. var packet = PacketMsg{
  399. ChannelID: 0x01,
  400. EOF: 1,
  401. Bytes: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize),
  402. }
  403. _, err = cdc.MarshalBinaryLengthPrefixedWriter(buf, packet)
  404. assert.Nil(t, err)
  405. _, err = client.Write(buf.Bytes())
  406. assert.Nil(t, err)
  407. assert.True(t, expectSend(chOnRcv), "msg just right")
  408. // send msg thats too long
  409. buf = new(bytes.Buffer)
  410. packet = PacketMsg{
  411. ChannelID: 0x01,
  412. EOF: 1,
  413. Bytes: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize+100),
  414. }
  415. _, err = cdc.MarshalBinaryLengthPrefixedWriter(buf, packet)
  416. assert.Nil(t, err)
  417. _, err = client.Write(buf.Bytes())
  418. assert.NotNil(t, err)
  419. assert.True(t, expectSend(chOnErr), "msg too long")
  420. }
  421. func TestMConnectionReadErrorUnknownMsgType(t *testing.T) {
  422. chOnErr := make(chan struct{})
  423. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  424. defer mconnClient.Stop()
  425. defer mconnServer.Stop()
  426. // send msg with unknown msg type
  427. err := amino.EncodeUvarint(mconnClient.conn, 4)
  428. assert.Nil(t, err)
  429. _, err = mconnClient.conn.Write([]byte{0xFF, 0xFF, 0xFF, 0xFF})
  430. assert.Nil(t, err)
  431. assert.True(t, expectSend(chOnErr), "unknown msg type")
  432. }
  433. func TestMConnectionTrySend(t *testing.T) {
  434. server, client := NetPipe()
  435. defer server.Close()
  436. defer client.Close()
  437. mconn := createTestMConnection(client)
  438. err := mconn.Start()
  439. require.Nil(t, err)
  440. defer mconn.Stop()
  441. msg := []byte("Semicolon-Woman")
  442. resultCh := make(chan string, 2)
  443. assert.True(t, mconn.TrySend(0x01, msg))
  444. server.Read(make([]byte, len(msg)))
  445. assert.True(t, mconn.CanSend(0x01))
  446. assert.True(t, mconn.TrySend(0x01, msg))
  447. assert.False(t, mconn.CanSend(0x01))
  448. go func() {
  449. mconn.TrySend(0x01, msg)
  450. resultCh <- "TrySend"
  451. }()
  452. assert.False(t, mconn.CanSend(0x01))
  453. assert.False(t, mconn.TrySend(0x01, msg))
  454. assert.Equal(t, "TrySend", <-resultCh)
  455. }