You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

490 lines
14 KiB

  1. package conn
  2. import (
  3. "bytes"
  4. "net"
  5. "testing"
  6. "time"
  7. "github.com/fortytw2/leaktest"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/stretchr/testify/require"
  10. amino "github.com/tendermint/go-amino"
  11. "github.com/tendermint/tendermint/libs/log"
  12. )
  // maxPingPongPacketSize is the byte limit the tests pass to the decoder when
  // reading a single ping or pong packet off the wire.
  const maxPingPongPacketSize = 1 << 10
  14. func createTestMConnection(conn net.Conn) *MConnection {
  15. onReceive := func(chID byte, msgBytes []byte) {
  16. }
  17. onError := func(r interface{}) {
  18. }
  19. c := createMConnectionWithCallbacks(conn, onReceive, onError)
  20. c.SetLogger(log.TestingLogger())
  21. return c
  22. }
  23. func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *MConnection {
  24. cfg := DefaultMConnConfig()
  25. cfg.PingInterval = 90 * time.Millisecond
  26. cfg.PongTimeout = 45 * time.Millisecond
  27. chDescs := []*ChannelDescriptor{&ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
  28. c := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, cfg)
  29. c.SetLogger(log.TestingLogger())
  30. return c
  31. }
  32. func TestMConnectionSend(t *testing.T) {
  33. server, client := NetPipe()
  34. defer server.Close() // nolint: errcheck
  35. defer client.Close() // nolint: errcheck
  36. mconn := createTestMConnection(client)
  37. err := mconn.Start()
  38. require.Nil(t, err)
  39. defer mconn.Stop()
  40. msg := []byte("Ant-Man")
  41. assert.True(t, mconn.Send(0x01, msg))
  42. // Note: subsequent Send/TrySend calls could pass because we are reading from
  43. // the send queue in a separate goroutine.
  44. _, err = server.Read(make([]byte, len(msg)))
  45. if err != nil {
  46. t.Error(err)
  47. }
  48. assert.True(t, mconn.CanSend(0x01))
  49. msg = []byte("Spider-Man")
  50. assert.True(t, mconn.TrySend(0x01, msg))
  51. _, err = server.Read(make([]byte, len(msg)))
  52. if err != nil {
  53. t.Error(err)
  54. }
  55. assert.False(t, mconn.CanSend(0x05), "CanSend should return false because channel is unknown")
  56. assert.False(t, mconn.Send(0x05, []byte("Absorbing Man")), "Send should return false because channel is unknown")
  57. }
  58. func TestMConnectionReceive(t *testing.T) {
  59. server, client := NetPipe()
  60. defer server.Close() // nolint: errcheck
  61. defer client.Close() // nolint: errcheck
  62. receivedCh := make(chan []byte)
  63. errorsCh := make(chan interface{})
  64. onReceive := func(chID byte, msgBytes []byte) {
  65. receivedCh <- msgBytes
  66. }
  67. onError := func(r interface{}) {
  68. errorsCh <- r
  69. }
  70. mconn1 := createMConnectionWithCallbacks(client, onReceive, onError)
  71. err := mconn1.Start()
  72. require.Nil(t, err)
  73. defer mconn1.Stop()
  74. mconn2 := createTestMConnection(server)
  75. err = mconn2.Start()
  76. require.Nil(t, err)
  77. defer mconn2.Stop()
  78. msg := []byte("Cyclops")
  79. assert.True(t, mconn2.Send(0x01, msg))
  80. select {
  81. case receivedBytes := <-receivedCh:
  82. assert.Equal(t, []byte(msg), receivedBytes)
  83. case err := <-errorsCh:
  84. t.Fatalf("Expected %s, got %+v", msg, err)
  85. case <-time.After(500 * time.Millisecond):
  86. t.Fatalf("Did not receive %s message in 500ms", msg)
  87. }
  88. }
  89. func TestMConnectionStatus(t *testing.T) {
  90. server, client := NetPipe()
  91. defer server.Close() // nolint: errcheck
  92. defer client.Close() // nolint: errcheck
  93. mconn := createTestMConnection(client)
  94. err := mconn.Start()
  95. require.Nil(t, err)
  96. defer mconn.Stop()
  97. status := mconn.Status()
  98. assert.NotNil(t, status)
  99. assert.Zero(t, status.Channels[0].SendQueueSize)
  100. }
  101. func TestMConnectionPongTimeoutResultsInError(t *testing.T) {
  102. server, client := net.Pipe()
  103. defer server.Close()
  104. defer client.Close()
  105. receivedCh := make(chan []byte)
  106. errorsCh := make(chan interface{})
  107. onReceive := func(chID byte, msgBytes []byte) {
  108. receivedCh <- msgBytes
  109. }
  110. onError := func(r interface{}) {
  111. errorsCh <- r
  112. }
  113. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  114. err := mconn.Start()
  115. require.Nil(t, err)
  116. defer mconn.Stop()
  117. serverGotPing := make(chan struct{})
  118. go func() {
  119. // read ping
  120. var pkt PacketPing
  121. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  122. assert.Nil(t, err)
  123. serverGotPing <- struct{}{}
  124. }()
  125. <-serverGotPing
  126. pongTimerExpired := mconn.config.PongTimeout + 20*time.Millisecond
  127. select {
  128. case msgBytes := <-receivedCh:
  129. t.Fatalf("Expected error, but got %v", msgBytes)
  130. case err := <-errorsCh:
  131. assert.NotNil(t, err)
  132. case <-time.After(pongTimerExpired):
  133. t.Fatalf("Expected to receive error after %v", pongTimerExpired)
  134. }
  135. }
  136. func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) {
  137. server, client := net.Pipe()
  138. defer server.Close()
  139. defer client.Close()
  140. receivedCh := make(chan []byte)
  141. errorsCh := make(chan interface{})
  142. onReceive := func(chID byte, msgBytes []byte) {
  143. receivedCh <- msgBytes
  144. }
  145. onError := func(r interface{}) {
  146. errorsCh <- r
  147. }
  148. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  149. err := mconn.Start()
  150. require.Nil(t, err)
  151. defer mconn.Stop()
  152. // sending 3 pongs in a row (abuse)
  153. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  154. require.Nil(t, err)
  155. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  156. require.Nil(t, err)
  157. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  158. require.Nil(t, err)
  159. serverGotPing := make(chan struct{})
  160. go func() {
  161. // read ping (one byte)
  162. var packet, err = Packet(nil), error(nil)
  163. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &packet, maxPingPongPacketSize)
  164. require.Nil(t, err)
  165. serverGotPing <- struct{}{}
  166. // respond with pong
  167. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  168. require.Nil(t, err)
  169. }()
  170. <-serverGotPing
  171. pongTimerExpired := mconn.config.PongTimeout + 20*time.Millisecond
  172. select {
  173. case msgBytes := <-receivedCh:
  174. t.Fatalf("Expected no data, but got %v", msgBytes)
  175. case err := <-errorsCh:
  176. t.Fatalf("Expected no error, but got %v", err)
  177. case <-time.After(pongTimerExpired):
  178. assert.True(t, mconn.IsRunning())
  179. }
  180. }
  181. func TestMConnectionMultiplePings(t *testing.T) {
  182. server, client := net.Pipe()
  183. defer server.Close()
  184. defer client.Close()
  185. receivedCh := make(chan []byte)
  186. errorsCh := make(chan interface{})
  187. onReceive := func(chID byte, msgBytes []byte) {
  188. receivedCh <- msgBytes
  189. }
  190. onError := func(r interface{}) {
  191. errorsCh <- r
  192. }
  193. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  194. err := mconn.Start()
  195. require.Nil(t, err)
  196. defer mconn.Stop()
  197. // sending 3 pings in a row (abuse)
  198. // see https://github.com/tendermint/tendermint/issues/1190
  199. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPing{}))
  200. require.Nil(t, err)
  201. var pkt PacketPong
  202. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  203. require.Nil(t, err)
  204. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPing{}))
  205. require.Nil(t, err)
  206. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  207. require.Nil(t, err)
  208. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPing{}))
  209. require.Nil(t, err)
  210. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  211. require.Nil(t, err)
  212. assert.True(t, mconn.IsRunning())
  213. }
  214. func TestMConnectionPingPongs(t *testing.T) {
  215. // check that we are not leaking any go-routines
  216. defer leaktest.CheckTimeout(t, 10*time.Second)()
  217. server, client := net.Pipe()
  218. defer server.Close()
  219. defer client.Close()
  220. receivedCh := make(chan []byte)
  221. errorsCh := make(chan interface{})
  222. onReceive := func(chID byte, msgBytes []byte) {
  223. receivedCh <- msgBytes
  224. }
  225. onError := func(r interface{}) {
  226. errorsCh <- r
  227. }
  228. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  229. err := mconn.Start()
  230. require.Nil(t, err)
  231. defer mconn.Stop()
  232. serverGotPing := make(chan struct{})
  233. go func() {
  234. // read ping
  235. var pkt PacketPing
  236. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  237. require.Nil(t, err)
  238. serverGotPing <- struct{}{}
  239. // respond with pong
  240. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  241. require.Nil(t, err)
  242. time.Sleep(mconn.config.PingInterval)
  243. // read ping
  244. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
  245. require.Nil(t, err)
  246. // respond with pong
  247. _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
  248. require.Nil(t, err)
  249. }()
  250. <-serverGotPing
  251. pongTimerExpired := (mconn.config.PongTimeout + 20*time.Millisecond) * 2
  252. select {
  253. case msgBytes := <-receivedCh:
  254. t.Fatalf("Expected no data, but got %v", msgBytes)
  255. case err := <-errorsCh:
  256. t.Fatalf("Expected no error, but got %v", err)
  257. case <-time.After(2 * pongTimerExpired):
  258. assert.True(t, mconn.IsRunning())
  259. }
  260. }
  261. func TestMConnectionStopsAndReturnsError(t *testing.T) {
  262. server, client := NetPipe()
  263. defer server.Close() // nolint: errcheck
  264. defer client.Close() // nolint: errcheck
  265. receivedCh := make(chan []byte)
  266. errorsCh := make(chan interface{})
  267. onReceive := func(chID byte, msgBytes []byte) {
  268. receivedCh <- msgBytes
  269. }
  270. onError := func(r interface{}) {
  271. errorsCh <- r
  272. }
  273. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  274. err := mconn.Start()
  275. require.Nil(t, err)
  276. defer mconn.Stop()
  277. if err := client.Close(); err != nil {
  278. t.Error(err)
  279. }
  280. select {
  281. case receivedBytes := <-receivedCh:
  282. t.Fatalf("Expected error, got %v", receivedBytes)
  283. case err := <-errorsCh:
  284. assert.NotNil(t, err)
  285. assert.False(t, mconn.IsRunning())
  286. case <-time.After(500 * time.Millisecond):
  287. t.Fatal("Did not receive error in 500ms")
  288. }
  289. }
  290. func newClientAndServerConnsForReadErrors(t *testing.T, chOnErr chan struct{}) (*MConnection, *MConnection) {
  291. server, client := NetPipe()
  292. onReceive := func(chID byte, msgBytes []byte) {}
  293. onError := func(r interface{}) {}
  294. // create client conn with two channels
  295. chDescs := []*ChannelDescriptor{
  296. {ID: 0x01, Priority: 1, SendQueueCapacity: 1},
  297. {ID: 0x02, Priority: 1, SendQueueCapacity: 1},
  298. }
  299. mconnClient := NewMConnection(client, chDescs, onReceive, onError)
  300. mconnClient.SetLogger(log.TestingLogger().With("module", "client"))
  301. err := mconnClient.Start()
  302. require.Nil(t, err)
  303. // create server conn with 1 channel
  304. // it fires on chOnErr when there's an error
  305. serverLogger := log.TestingLogger().With("module", "server")
  306. onError = func(r interface{}) {
  307. chOnErr <- struct{}{}
  308. }
  309. mconnServer := createMConnectionWithCallbacks(server, onReceive, onError)
  310. mconnServer.SetLogger(serverLogger)
  311. err = mconnServer.Start()
  312. require.Nil(t, err)
  313. return mconnClient, mconnServer
  314. }
  // expectSend waits up to five seconds for a value (or a close) on ch. It
  // returns true if one arrives in time and false if the wait times out.
  func expectSend(ch chan struct{}) bool {
  	deadline := time.NewTimer(5 * time.Second)
  	defer deadline.Stop()

  	select {
  	case <-deadline.C:
  		return false
  	case <-ch:
  		return true
  	}
  }
  324. func TestMConnectionReadErrorBadEncoding(t *testing.T) {
  325. chOnErr := make(chan struct{})
  326. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  327. defer mconnClient.Stop()
  328. defer mconnServer.Stop()
  329. client := mconnClient.conn
  330. // send badly encoded msgPacket
  331. bz := cdc.MustMarshalBinaryLengthPrefixed(PacketMsg{})
  332. bz[4] += 0x01 // Invalid prefix bytes.
  333. // Write it.
  334. _, err := client.Write(bz)
  335. assert.Nil(t, err)
  336. assert.True(t, expectSend(chOnErr), "badly encoded msgPacket")
  337. }
  338. func TestMConnectionReadErrorUnknownChannel(t *testing.T) {
  339. chOnErr := make(chan struct{})
  340. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  341. defer mconnClient.Stop()
  342. defer mconnServer.Stop()
  343. msg := []byte("Ant-Man")
  344. // fail to send msg on channel unknown by client
  345. assert.False(t, mconnClient.Send(0x03, msg))
  346. // send msg on channel unknown by the server.
  347. // should cause an error
  348. assert.True(t, mconnClient.Send(0x02, msg))
  349. assert.True(t, expectSend(chOnErr), "unknown channel")
  350. }
  351. func TestMConnectionReadErrorLongMessage(t *testing.T) {
  352. chOnErr := make(chan struct{})
  353. chOnRcv := make(chan struct{})
  354. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  355. defer mconnClient.Stop()
  356. defer mconnServer.Stop()
  357. mconnServer.onReceive = func(chID byte, msgBytes []byte) {
  358. chOnRcv <- struct{}{}
  359. }
  360. client := mconnClient.conn
  361. // send msg thats just right
  362. var err error
  363. var buf = new(bytes.Buffer)
  364. var packet = PacketMsg{
  365. ChannelID: 0x01,
  366. EOF: 1,
  367. Bytes: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize),
  368. }
  369. _, err = cdc.MarshalBinaryLengthPrefixedWriter(buf, packet)
  370. assert.Nil(t, err)
  371. _, err = client.Write(buf.Bytes())
  372. assert.Nil(t, err)
  373. assert.True(t, expectSend(chOnRcv), "msg just right")
  374. // send msg thats too long
  375. buf = new(bytes.Buffer)
  376. packet = PacketMsg{
  377. ChannelID: 0x01,
  378. EOF: 1,
  379. Bytes: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize+100),
  380. }
  381. _, err = cdc.MarshalBinaryLengthPrefixedWriter(buf, packet)
  382. assert.Nil(t, err)
  383. _, err = client.Write(buf.Bytes())
  384. assert.NotNil(t, err)
  385. assert.True(t, expectSend(chOnErr), "msg too long")
  386. }
  387. func TestMConnectionReadErrorUnknownMsgType(t *testing.T) {
  388. chOnErr := make(chan struct{})
  389. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  390. defer mconnClient.Stop()
  391. defer mconnServer.Stop()
  392. // send msg with unknown msg type
  393. err := error(nil)
  394. err = amino.EncodeUvarint(mconnClient.conn, 4)
  395. assert.Nil(t, err)
  396. _, err = mconnClient.conn.Write([]byte{0xFF, 0xFF, 0xFF, 0xFF})
  397. assert.Nil(t, err)
  398. assert.True(t, expectSend(chOnErr), "unknown msg type")
  399. }
  400. func TestMConnectionTrySend(t *testing.T) {
  401. server, client := NetPipe()
  402. defer server.Close()
  403. defer client.Close()
  404. mconn := createTestMConnection(client)
  405. err := mconn.Start()
  406. require.Nil(t, err)
  407. defer mconn.Stop()
  408. msg := []byte("Semicolon-Woman")
  409. resultCh := make(chan string, 2)
  410. assert.True(t, mconn.TrySend(0x01, msg))
  411. server.Read(make([]byte, len(msg)))
  412. assert.True(t, mconn.CanSend(0x01))
  413. assert.True(t, mconn.TrySend(0x01, msg))
  414. assert.False(t, mconn.CanSend(0x01))
  415. go func() {
  416. mconn.TrySend(0x01, msg)
  417. resultCh <- "TrySend"
  418. }()
  419. assert.False(t, mconn.CanSend(0x01))
  420. assert.False(t, mconn.TrySend(0x01, msg))
  421. assert.Equal(t, "TrySend", <-resultCh)
  422. }