You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

529 lines
14 KiB

  1. package conn
  2. import (
  3. "bytes"
  4. "net"
  5. "testing"
  6. "time"
  7. "github.com/fortytw2/leaktest"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/stretchr/testify/require"
  10. amino "github.com/tendermint/go-amino"
  11. "github.com/tendermint/tendermint/libs/log"
  12. )
  13. const maxPingPongPacketSize = 1024 // bytes
  14. func createTestMConnection(conn net.Conn) *MConnection {
  15. onReceive := func(chID byte, msgBytes []byte) {
  16. }
  17. onError := func(r interface{}) {
  18. }
  19. c := createMConnectionWithCallbacks(conn, onReceive, onError)
  20. c.SetLogger(log.TestingLogger())
  21. return c
  22. }
  23. func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *MConnection {
  24. cfg := DefaultMConnConfig()
  25. cfg.PingInterval = 90 * time.Millisecond
  26. cfg.PongTimeout = 45 * time.Millisecond
  27. chDescs := []*ChannelDescriptor{{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
  28. c := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, cfg)
  29. c.SetLogger(log.TestingLogger())
  30. return c
  31. }
  32. func TestMConnectionSendFlushStop(t *testing.T) {
  33. server, client := NetPipe()
  34. defer server.Close() // nolint: errcheck
  35. defer client.Close() // nolint: errcheck
  36. clientConn := createTestMConnection(client)
  37. err := clientConn.Start()
  38. require.Nil(t, err)
  39. defer clientConn.Stop()
  40. msg := []byte("abc")
  41. assert.True(t, clientConn.Send(0x01, msg))
  42. aminoMsgLength := 14
  43. // start the reader in a new routine, so we can flush
  44. errCh := make(chan error)
  45. go func() {
  46. msgB := make([]byte, aminoMsgLength)
  47. _, err := server.Read(msgB)
  48. if err != nil {
  49. t.Fatal(err)
  50. }
  51. errCh <- err
  52. }()
  53. // stop the conn - it should flush all conns
  54. clientConn.FlushStop()
  55. timer := time.NewTimer(3 * time.Second)
  56. select {
  57. case <-errCh:
  58. case <-timer.C:
  59. t.Error("timed out waiting for msgs to be read")
  60. }
  61. }
  62. func TestMConnectionSend(t *testing.T) {
  63. server, client := NetPipe()
  64. defer server.Close() // nolint: errcheck
  65. defer client.Close() // nolint: errcheck
  66. mconn := createTestMConnection(client)
  67. err := mconn.Start()
  68. require.Nil(t, err)
  69. defer mconn.Stop()
  70. msg := []byte("Ant-Man")
  71. assert.True(t, mconn.Send(0x01, msg))
  72. // Note: subsequent Send/TrySend calls could pass because we are reading from
  73. // the send queue in a separate goroutine.
  74. _, err = server.Read(make([]byte, len(msg)))
  75. if err != nil {
  76. t.Error(err)
  77. }
  78. assert.True(t, mconn.CanSend(0x01))
  79. msg = []byte("Spider-Man")
  80. assert.True(t, mconn.TrySend(0x01, msg))
  81. _, err = server.Read(make([]byte, len(msg)))
  82. if err != nil {
  83. t.Error(err)
  84. }
  85. assert.False(t, mconn.CanSend(0x05), "CanSend should return false because channel is unknown")
  86. assert.False(t, mconn.Send(0x05, []byte("Absorbing Man")), "Send should return false because channel is unknown")
  87. }
  88. func TestMConnectionReceive(t *testing.T) {
  89. server, client := NetPipe()
  90. defer server.Close() // nolint: errcheck
  91. defer client.Close() // nolint: errcheck
  92. receivedCh := make(chan []byte)
  93. errorsCh := make(chan interface{})
  94. onReceive := func(chID byte, msgBytes []byte) {
  95. receivedCh <- msgBytes
  96. }
  97. onError := func(r interface{}) {
  98. errorsCh <- r
  99. }
  100. mconn1 := createMConnectionWithCallbacks(client, onReceive, onError)
  101. err := mconn1.Start()
  102. require.Nil(t, err)
  103. defer mconn1.Stop()
  104. mconn2 := createTestMConnection(server)
  105. err = mconn2.Start()
  106. require.Nil(t, err)
  107. defer mconn2.Stop()
  108. msg := []byte("Cyclops")
  109. assert.True(t, mconn2.Send(0x01, msg))
  110. select {
  111. case receivedBytes := <-receivedCh:
  112. assert.Equal(t, []byte(msg), receivedBytes)
  113. case err := <-errorsCh:
  114. t.Fatalf("Expected %s, got %+v", msg, err)
  115. case <-time.After(500 * time.Millisecond):
  116. t.Fatalf("Did not receive %s message in 500ms", msg)
  117. }
  118. }
  119. func TestMConnectionStatus(t *testing.T) {
  120. server, client := NetPipe()
  121. defer server.Close() // nolint: errcheck
  122. defer client.Close() // nolint: errcheck
  123. mconn := createTestMConnection(client)
  124. err := mconn.Start()
  125. require.Nil(t, err)
  126. defer mconn.Stop()
  127. status := mconn.Status()
  128. assert.NotNil(t, status)
  129. assert.Zero(t, status.Channels[0].SendQueueSize)
  130. }
  131. func TestMConnectionPongTimeoutResultsInError(t *testing.T) {
  132. server, client := net.Pipe()
  133. defer server.Close()
  134. defer client.Close()
  135. receivedCh := make(chan []byte)
  136. errorsCh := make(chan interface{})
  137. onReceive := func(chID byte, msgBytes []byte) {
  138. receivedCh <- msgBytes
  139. }
  140. onError := func(r interface{}) {
  141. errorsCh <- r
  142. }
  143. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  144. err := mconn.Start()
  145. require.Nil(t, err)
  146. defer mconn.Stop()
  147. serverGotPing := make(chan struct{})
  148. go func() {
  149. // read ping
  150. var pkt PacketPing
  151. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  152. assert.Nil(t, err)
  153. serverGotPing <- struct{}{}
  154. }()
  155. <-serverGotPing
  156. pongTimerExpired := mconn.config.PongTimeout + 20*time.Millisecond
  157. select {
  158. case msgBytes := <-receivedCh:
  159. t.Fatalf("Expected error, but got %v", msgBytes)
  160. case err := <-errorsCh:
  161. assert.NotNil(t, err)
  162. case <-time.After(pongTimerExpired):
  163. t.Fatalf("Expected to receive error after %v", pongTimerExpired)
  164. }
  165. }
  166. func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) {
  167. server, client := net.Pipe()
  168. defer server.Close()
  169. defer client.Close()
  170. receivedCh := make(chan []byte)
  171. errorsCh := make(chan interface{})
  172. onReceive := func(chID byte, msgBytes []byte) {
  173. receivedCh <- msgBytes
  174. }
  175. onError := func(r interface{}) {
  176. errorsCh <- r
  177. }
  178. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  179. err := mconn.Start()
  180. require.Nil(t, err)
  181. defer mconn.Stop()
  182. // sending 3 pongs in a row (abuse)
  183. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  184. require.Nil(t, err)
  185. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  186. require.Nil(t, err)
  187. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  188. require.Nil(t, err)
  189. serverGotPing := make(chan struct{})
  190. go func() {
  191. // read ping (one byte)
  192. var (
  193. packet Packet
  194. err error
  195. )
  196. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &packet, maxPingPongPacketSize)
  197. require.Nil(t, err)
  198. serverGotPing <- struct{}{}
  199. // respond with pong
  200. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  201. require.Nil(t, err)
  202. }()
  203. <-serverGotPing
  204. pongTimerExpired := mconn.config.PongTimeout + 20*time.Millisecond
  205. select {
  206. case msgBytes := <-receivedCh:
  207. t.Fatalf("Expected no data, but got %v", msgBytes)
  208. case err := <-errorsCh:
  209. t.Fatalf("Expected no error, but got %v", err)
  210. case <-time.After(pongTimerExpired):
  211. assert.True(t, mconn.IsRunning())
  212. }
  213. }
  214. func TestMConnectionMultiplePings(t *testing.T) {
  215. server, client := net.Pipe()
  216. defer server.Close()
  217. defer client.Close()
  218. receivedCh := make(chan []byte)
  219. errorsCh := make(chan interface{})
  220. onReceive := func(chID byte, msgBytes []byte) {
  221. receivedCh <- msgBytes
  222. }
  223. onError := func(r interface{}) {
  224. errorsCh <- r
  225. }
  226. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  227. err := mconn.Start()
  228. require.Nil(t, err)
  229. defer mconn.Stop()
  230. // sending 3 pings in a row (abuse)
  231. // see https://github.com/tendermint/tendermint/issues/1190
  232. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPing{}))
  233. require.Nil(t, err)
  234. var pkt PacketPong
  235. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  236. require.Nil(t, err)
  237. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPing{}))
  238. require.Nil(t, err)
  239. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  240. require.Nil(t, err)
  241. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPing{}))
  242. require.Nil(t, err)
  243. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  244. require.Nil(t, err)
  245. assert.True(t, mconn.IsRunning())
  246. }
  247. func TestMConnectionPingPongs(t *testing.T) {
  248. // check that we are not leaking any go-routines
  249. defer leaktest.CheckTimeout(t, 10*time.Second)()
  250. server, client := net.Pipe()
  251. defer server.Close()
  252. defer client.Close()
  253. receivedCh := make(chan []byte)
  254. errorsCh := make(chan interface{})
  255. onReceive := func(chID byte, msgBytes []byte) {
  256. receivedCh <- msgBytes
  257. }
  258. onError := func(r interface{}) {
  259. errorsCh <- r
  260. }
  261. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  262. err := mconn.Start()
  263. require.Nil(t, err)
  264. defer mconn.Stop()
  265. serverGotPing := make(chan struct{})
  266. go func() {
  267. // read ping
  268. var pkt PacketPing
  269. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  270. require.Nil(t, err)
  271. serverGotPing <- struct{}{}
  272. // respond with pong
  273. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  274. require.Nil(t, err)
  275. time.Sleep(mconn.config.PingInterval)
  276. // read ping
  277. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  278. require.Nil(t, err)
  279. // respond with pong
  280. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  281. require.Nil(t, err)
  282. }()
  283. <-serverGotPing
  284. pongTimerExpired := (mconn.config.PongTimeout + 20*time.Millisecond) * 2
  285. select {
  286. case msgBytes := <-receivedCh:
  287. t.Fatalf("Expected no data, but got %v", msgBytes)
  288. case err := <-errorsCh:
  289. t.Fatalf("Expected no error, but got %v", err)
  290. case <-time.After(2 * pongTimerExpired):
  291. assert.True(t, mconn.IsRunning())
  292. }
  293. }
  294. func TestMConnectionStopsAndReturnsError(t *testing.T) {
  295. server, client := NetPipe()
  296. defer server.Close() // nolint: errcheck
  297. defer client.Close() // nolint: errcheck
  298. receivedCh := make(chan []byte)
  299. errorsCh := make(chan interface{})
  300. onReceive := func(chID byte, msgBytes []byte) {
  301. receivedCh <- msgBytes
  302. }
  303. onError := func(r interface{}) {
  304. errorsCh <- r
  305. }
  306. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  307. err := mconn.Start()
  308. require.Nil(t, err)
  309. defer mconn.Stop()
  310. if err := client.Close(); err != nil {
  311. t.Error(err)
  312. }
  313. select {
  314. case receivedBytes := <-receivedCh:
  315. t.Fatalf("Expected error, got %v", receivedBytes)
  316. case err := <-errorsCh:
  317. assert.NotNil(t, err)
  318. assert.False(t, mconn.IsRunning())
  319. case <-time.After(500 * time.Millisecond):
  320. t.Fatal("Did not receive error in 500ms")
  321. }
  322. }
  323. func newClientAndServerConnsForReadErrors(t *testing.T, chOnErr chan struct{}) (*MConnection, *MConnection) {
  324. server, client := NetPipe()
  325. onReceive := func(chID byte, msgBytes []byte) {}
  326. onError := func(r interface{}) {}
  327. // create client conn with two channels
  328. chDescs := []*ChannelDescriptor{
  329. {ID: 0x01, Priority: 1, SendQueueCapacity: 1},
  330. {ID: 0x02, Priority: 1, SendQueueCapacity: 1},
  331. }
  332. mconnClient := NewMConnection(client, chDescs, onReceive, onError)
  333. mconnClient.SetLogger(log.TestingLogger().With("module", "client"))
  334. err := mconnClient.Start()
  335. require.Nil(t, err)
  336. // create server conn with 1 channel
  337. // it fires on chOnErr when there's an error
  338. serverLogger := log.TestingLogger().With("module", "server")
  339. onError = func(r interface{}) {
  340. chOnErr <- struct{}{}
  341. }
  342. mconnServer := createMConnectionWithCallbacks(server, onReceive, onError)
  343. mconnServer.SetLogger(serverLogger)
  344. err = mconnServer.Start()
  345. require.Nil(t, err)
  346. return mconnClient, mconnServer
  347. }
  348. func expectSend(ch chan struct{}) bool {
  349. after := time.After(time.Second * 5)
  350. select {
  351. case <-ch:
  352. return true
  353. case <-after:
  354. return false
  355. }
  356. }
  357. func TestMConnectionReadErrorBadEncoding(t *testing.T) {
  358. chOnErr := make(chan struct{})
  359. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  360. defer mconnClient.Stop()
  361. defer mconnServer.Stop()
  362. client := mconnClient.conn
  363. // send badly encoded msgPacket
  364. bz := cdc.MustMarshalBinaryLengthPrefixed(PacketMsg{})
  365. bz[4] += 0x01 // Invalid prefix bytes.
  366. // Write it.
  367. _, err := client.Write(bz)
  368. assert.Nil(t, err)
  369. assert.True(t, expectSend(chOnErr), "badly encoded msgPacket")
  370. }
  371. func TestMConnectionReadErrorUnknownChannel(t *testing.T) {
  372. chOnErr := make(chan struct{})
  373. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  374. defer mconnClient.Stop()
  375. defer mconnServer.Stop()
  376. msg := []byte("Ant-Man")
  377. // fail to send msg on channel unknown by client
  378. assert.False(t, mconnClient.Send(0x03, msg))
  379. // send msg on channel unknown by the server.
  380. // should cause an error
  381. assert.True(t, mconnClient.Send(0x02, msg))
  382. assert.True(t, expectSend(chOnErr), "unknown channel")
  383. }
  384. func TestMConnectionReadErrorLongMessage(t *testing.T) {
  385. chOnErr := make(chan struct{})
  386. chOnRcv := make(chan struct{})
  387. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  388. defer mconnClient.Stop()
  389. defer mconnServer.Stop()
  390. mconnServer.onReceive = func(chID byte, msgBytes []byte) {
  391. chOnRcv <- struct{}{}
  392. }
  393. client := mconnClient.conn
  394. // send msg thats just right
  395. var err error
  396. var buf = new(bytes.Buffer)
  397. var packet = PacketMsg{
  398. ChannelID: 0x01,
  399. EOF: 1,
  400. Bytes: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize),
  401. }
  402. _, err = cdc.MarshalBinaryLengthPrefixedWriter(buf, packet)
  403. assert.Nil(t, err)
  404. _, err = client.Write(buf.Bytes())
  405. assert.Nil(t, err)
  406. assert.True(t, expectSend(chOnRcv), "msg just right")
  407. // send msg thats too long
  408. buf = new(bytes.Buffer)
  409. packet = PacketMsg{
  410. ChannelID: 0x01,
  411. EOF: 1,
  412. Bytes: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize+100),
  413. }
  414. _, err = cdc.MarshalBinaryLengthPrefixedWriter(buf, packet)
  415. assert.Nil(t, err)
  416. _, err = client.Write(buf.Bytes())
  417. assert.NotNil(t, err)
  418. assert.True(t, expectSend(chOnErr), "msg too long")
  419. }
  420. func TestMConnectionReadErrorUnknownMsgType(t *testing.T) {
  421. chOnErr := make(chan struct{})
  422. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  423. defer mconnClient.Stop()
  424. defer mconnServer.Stop()
  425. // send msg with unknown msg type
  426. err := amino.EncodeUvarint(mconnClient.conn, 4)
  427. assert.Nil(t, err)
  428. _, err = mconnClient.conn.Write([]byte{0xFF, 0xFF, 0xFF, 0xFF})
  429. assert.Nil(t, err)
  430. assert.True(t, expectSend(chOnErr), "unknown msg type")
  431. }
  432. func TestMConnectionTrySend(t *testing.T) {
  433. server, client := NetPipe()
  434. defer server.Close()
  435. defer client.Close()
  436. mconn := createTestMConnection(client)
  437. err := mconn.Start()
  438. require.Nil(t, err)
  439. defer mconn.Stop()
  440. msg := []byte("Semicolon-Woman")
  441. resultCh := make(chan string, 2)
  442. assert.True(t, mconn.TrySend(0x01, msg))
  443. server.Read(make([]byte, len(msg)))
  444. assert.True(t, mconn.CanSend(0x01))
  445. assert.True(t, mconn.TrySend(0x01, msg))
  446. assert.False(t, mconn.CanSend(0x01))
  447. go func() {
  448. mconn.TrySend(0x01, msg)
  449. resultCh <- "TrySend"
  450. }()
  451. assert.False(t, mconn.CanSend(0x01))
  452. assert.False(t, mconn.TrySend(0x01, msg))
  453. assert.Equal(t, "TrySend", <-resultCh)
  454. }