You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

500 lines
14 KiB

8 years ago
8 years ago
8 years ago
7 years ago
7 years ago
7 years ago
  1. package conn
  2. import (
  3. "bytes"
  4. "net"
  5. "testing"
  6. "time"
  7. "github.com/fortytw2/leaktest"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/stretchr/testify/require"
  10. amino "github.com/tendermint/go-amino"
  11. "github.com/tendermint/tmlibs/log"
  12. )
  13. func createTestMConnection(conn net.Conn) *MConnection {
  14. onReceive := func(chID byte, msgBytes []byte) {
  15. }
  16. onError := func(r interface{}) {
  17. }
  18. c := createMConnectionWithCallbacks(conn, onReceive, onError)
  19. c.SetLogger(log.TestingLogger())
  20. return c
  21. }
  22. func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *MConnection {
  23. cfg := DefaultMConnConfig()
  24. cfg.PingInterval = 90 * time.Millisecond
  25. cfg.PongTimeout = 45 * time.Millisecond
  26. chDescs := []*ChannelDescriptor{&ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
  27. c := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, cfg)
  28. c.SetLogger(log.TestingLogger())
  29. return c
  30. }
  31. func TestMConnectionSend(t *testing.T) {
  32. server, client := NetPipe()
  33. defer server.Close() // nolint: errcheck
  34. defer client.Close() // nolint: errcheck
  35. mconn := createTestMConnection(client)
  36. err := mconn.Start()
  37. require.Nil(t, err)
  38. defer mconn.Stop()
  39. msg := []byte("Ant-Man")
  40. assert.True(t, mconn.Send(0x01, msg))
  41. // Note: subsequent Send/TrySend calls could pass because we are reading from
  42. // the send queue in a separate goroutine.
  43. _, err = server.Read(make([]byte, len(msg)))
  44. if err != nil {
  45. t.Error(err)
  46. }
  47. assert.True(t, mconn.CanSend(0x01))
  48. msg = []byte("Spider-Man")
  49. assert.True(t, mconn.TrySend(0x01, msg))
  50. _, err = server.Read(make([]byte, len(msg)))
  51. if err != nil {
  52. t.Error(err)
  53. }
  54. assert.False(t, mconn.CanSend(0x05), "CanSend should return false because channel is unknown")
  55. assert.False(t, mconn.Send(0x05, []byte("Absorbing Man")), "Send should return false because channel is unknown")
  56. }
  57. func TestMConnectionReceive(t *testing.T) {
  58. server, client := NetPipe()
  59. defer server.Close() // nolint: errcheck
  60. defer client.Close() // nolint: errcheck
  61. receivedCh := make(chan []byte)
  62. errorsCh := make(chan interface{})
  63. onReceive := func(chID byte, msgBytes []byte) {
  64. receivedCh <- msgBytes
  65. }
  66. onError := func(r interface{}) {
  67. errorsCh <- r
  68. }
  69. mconn1 := createMConnectionWithCallbacks(client, onReceive, onError)
  70. err := mconn1.Start()
  71. require.Nil(t, err)
  72. defer mconn1.Stop()
  73. mconn2 := createTestMConnection(server)
  74. err = mconn2.Start()
  75. require.Nil(t, err)
  76. defer mconn2.Stop()
  77. msg := []byte("Cyclops")
  78. assert.True(t, mconn2.Send(0x01, msg))
  79. select {
  80. case receivedBytes := <-receivedCh:
  81. assert.Equal(t, []byte(msg), receivedBytes)
  82. case err := <-errorsCh:
  83. t.Fatalf("Expected %s, got %+v", msg, err)
  84. case <-time.After(500 * time.Millisecond):
  85. t.Fatalf("Did not receive %s message in 500ms", msg)
  86. }
  87. }
  88. func TestMConnectionStatus(t *testing.T) {
  89. server, client := NetPipe()
  90. defer server.Close() // nolint: errcheck
  91. defer client.Close() // nolint: errcheck
  92. mconn := createTestMConnection(client)
  93. err := mconn.Start()
  94. require.Nil(t, err)
  95. defer mconn.Stop()
  96. status := mconn.Status()
  97. assert.NotNil(t, status)
  98. assert.Zero(t, status.Channels[0].SendQueueSize)
  99. }
  100. func TestMConnectionPongTimeoutResultsInError(t *testing.T) {
  101. server, client := net.Pipe()
  102. defer server.Close()
  103. defer client.Close()
  104. receivedCh := make(chan []byte)
  105. errorsCh := make(chan interface{})
  106. onReceive := func(chID byte, msgBytes []byte) {
  107. receivedCh <- msgBytes
  108. }
  109. onError := func(r interface{}) {
  110. errorsCh <- r
  111. }
  112. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  113. err := mconn.Start()
  114. require.Nil(t, err)
  115. defer mconn.Stop()
  116. serverGotPing := make(chan struct{})
  117. go func() {
  118. // read ping
  119. var pkt PacketPing
  120. const maxPacketPingSize = 1024
  121. _, err = cdc.UnmarshalBinaryReader(server, &pkt, maxPacketPingSize)
  122. assert.Nil(t, err)
  123. serverGotPing <- struct{}{}
  124. }()
  125. <-serverGotPing
  126. pongTimerExpired := mconn.config.PongTimeout + 20*time.Millisecond
  127. select {
  128. case msgBytes := <-receivedCh:
  129. t.Fatalf("Expected error, but got %v", msgBytes)
  130. case err := <-errorsCh:
  131. assert.NotNil(t, err)
  132. case <-time.After(pongTimerExpired):
  133. t.Fatalf("Expected to receive error after %v", pongTimerExpired)
  134. }
  135. }
  136. func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) {
  137. server, client := net.Pipe()
  138. defer server.Close()
  139. defer client.Close()
  140. receivedCh := make(chan []byte)
  141. errorsCh := make(chan interface{})
  142. onReceive := func(chID byte, msgBytes []byte) {
  143. receivedCh <- msgBytes
  144. }
  145. onError := func(r interface{}) {
  146. errorsCh <- r
  147. }
  148. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  149. err := mconn.Start()
  150. require.Nil(t, err)
  151. defer mconn.Stop()
  152. // sending 3 pongs in a row (abuse)
  153. _, err = server.Write(cdc.MustMarshalBinary(PacketPong{}))
  154. require.Nil(t, err)
  155. _, err = server.Write(cdc.MustMarshalBinary(PacketPong{}))
  156. require.Nil(t, err)
  157. _, err = server.Write(cdc.MustMarshalBinary(PacketPong{}))
  158. require.Nil(t, err)
  159. serverGotPing := make(chan struct{})
  160. go func() {
  161. // read ping (one byte)
  162. var packet, err = Packet(nil), error(nil)
  163. _, err = cdc.UnmarshalBinaryReader(server, &packet, 1024)
  164. require.Nil(t, err)
  165. serverGotPing <- struct{}{}
  166. // respond with pong
  167. _, err = server.Write(cdc.MustMarshalBinary(PacketPong{}))
  168. require.Nil(t, err)
  169. }()
  170. <-serverGotPing
  171. pongTimerExpired := mconn.config.PongTimeout + 20*time.Millisecond
  172. select {
  173. case msgBytes := <-receivedCh:
  174. t.Fatalf("Expected no data, but got %v", msgBytes)
  175. case err := <-errorsCh:
  176. t.Fatalf("Expected no error, but got %v", err)
  177. case <-time.After(pongTimerExpired):
  178. assert.True(t, mconn.IsRunning())
  179. }
  180. }
  181. func TestMConnectionMultiplePings(t *testing.T) {
  182. server, client := net.Pipe()
  183. defer server.Close()
  184. defer client.Close()
  185. receivedCh := make(chan []byte)
  186. errorsCh := make(chan interface{})
  187. onReceive := func(chID byte, msgBytes []byte) {
  188. receivedCh <- msgBytes
  189. }
  190. onError := func(r interface{}) {
  191. errorsCh <- r
  192. }
  193. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  194. err := mconn.Start()
  195. require.Nil(t, err)
  196. defer mconn.Stop()
  197. // sending 3 pings in a row (abuse)
  198. // see https://github.com/tendermint/tendermint/issues/1190
  199. _, err = server.Write(cdc.MustMarshalBinary(PacketPing{}))
  200. require.Nil(t, err)
  201. var pkt PacketPong
  202. _, err = cdc.UnmarshalBinaryReader(server, &pkt, 1024)
  203. require.Nil(t, err)
  204. _, err = server.Write(cdc.MustMarshalBinary(PacketPing{}))
  205. require.Nil(t, err)
  206. _, err = cdc.UnmarshalBinaryReader(server, &pkt, 1024)
  207. require.Nil(t, err)
  208. _, err = server.Write(cdc.MustMarshalBinary(PacketPing{}))
  209. require.Nil(t, err)
  210. _, err = cdc.UnmarshalBinaryReader(server, &pkt, 1024)
  211. require.Nil(t, err)
  212. assert.True(t, mconn.IsRunning())
  213. }
  214. func TestMConnectionPingPongs(t *testing.T) {
  215. // check that we are not leaking any go-routines
  216. defer leaktest.CheckTimeout(t, 10*time.Second)()
  217. server, client := net.Pipe()
  218. defer server.Close()
  219. defer client.Close()
  220. receivedCh := make(chan []byte)
  221. errorsCh := make(chan interface{})
  222. onReceive := func(chID byte, msgBytes []byte) {
  223. receivedCh <- msgBytes
  224. }
  225. onError := func(r interface{}) {
  226. errorsCh <- r
  227. }
  228. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  229. err := mconn.Start()
  230. require.Nil(t, err)
  231. defer mconn.Stop()
  232. serverGotPing := make(chan struct{})
  233. go func() {
  234. // read ping
  235. var pkt PacketPing
  236. _, err = cdc.UnmarshalBinaryReader(server, &pkt, 1024)
  237. require.Nil(t, err)
  238. serverGotPing <- struct{}{}
  239. // respond with pong
  240. _, err = server.Write(cdc.MustMarshalBinary(PacketPong{}))
  241. require.Nil(t, err)
  242. time.Sleep(mconn.config.PingInterval)
  243. // read ping
  244. _, err = cdc.UnmarshalBinaryReader(server, &pkt, 1024)
  245. require.Nil(t, err)
  246. // respond with pong
  247. _, err = server.Write(cdc.MustMarshalBinary(PacketPong{}))
  248. require.Nil(t, err)
  249. }()
  250. <-serverGotPing
  251. pongTimerExpired := (mconn.config.PongTimeout + 20*time.Millisecond) * 2
  252. select {
  253. case msgBytes := <-receivedCh:
  254. t.Fatalf("Expected no data, but got %v", msgBytes)
  255. case err := <-errorsCh:
  256. t.Fatalf("Expected no error, but got %v", err)
  257. case <-time.After(2 * pongTimerExpired):
  258. assert.True(t, mconn.IsRunning())
  259. }
  260. }
  261. func TestMConnectionStopsAndReturnsError(t *testing.T) {
  262. server, client := NetPipe()
  263. defer server.Close() // nolint: errcheck
  264. defer client.Close() // nolint: errcheck
  265. receivedCh := make(chan []byte)
  266. errorsCh := make(chan interface{})
  267. onReceive := func(chID byte, msgBytes []byte) {
  268. receivedCh <- msgBytes
  269. }
  270. onError := func(r interface{}) {
  271. errorsCh <- r
  272. }
  273. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  274. err := mconn.Start()
  275. require.Nil(t, err)
  276. defer mconn.Stop()
  277. if err := client.Close(); err != nil {
  278. t.Error(err)
  279. }
  280. select {
  281. case receivedBytes := <-receivedCh:
  282. t.Fatalf("Expected error, got %v", receivedBytes)
  283. case err := <-errorsCh:
  284. assert.NotNil(t, err)
  285. assert.False(t, mconn.IsRunning())
  286. case <-time.After(500 * time.Millisecond):
  287. t.Fatal("Did not receive error in 500ms")
  288. }
  289. }
  290. func newClientAndServerConnsForReadErrors(t *testing.T, chOnErr chan struct{}) (*MConnection, *MConnection) {
  291. server, client := NetPipe()
  292. onReceive := func(chID byte, msgBytes []byte) {}
  293. onError := func(r interface{}) {}
  294. // create client conn with two channels
  295. chDescs := []*ChannelDescriptor{
  296. {ID: 0x01, Priority: 1, SendQueueCapacity: 1},
  297. {ID: 0x02, Priority: 1, SendQueueCapacity: 1},
  298. }
  299. mconnClient := NewMConnection(client, chDescs, onReceive, onError)
  300. mconnClient.SetLogger(log.TestingLogger().With("module", "client"))
  301. err := mconnClient.Start()
  302. require.Nil(t, err)
  303. // create server conn with 1 channel
  304. // it fires on chOnErr when there's an error
  305. serverLogger := log.TestingLogger().With("module", "server")
  306. onError = func(r interface{}) {
  307. chOnErr <- struct{}{}
  308. }
  309. mconnServer := createMConnectionWithCallbacks(server, onReceive, onError)
  310. mconnServer.SetLogger(serverLogger)
  311. err = mconnServer.Start()
  312. require.Nil(t, err)
  313. return mconnClient, mconnServer
  314. }
  315. func expectSend(ch chan struct{}) bool {
  316. after := time.After(time.Second * 5)
  317. select {
  318. case <-ch:
  319. return true
  320. case <-after:
  321. return false
  322. }
  323. }
  324. func TestMConnectionReadErrorBadEncoding(t *testing.T) {
  325. chOnErr := make(chan struct{})
  326. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  327. defer mconnClient.Stop()
  328. defer mconnServer.Stop()
  329. client := mconnClient.conn
  330. // send badly encoded msgPacket
  331. bz := cdc.MustMarshalBinary(PacketMsg{})
  332. bz[4] += 0x01 // Invalid prefix bytes.
  333. // Write it.
  334. _, err := client.Write(bz)
  335. assert.Nil(t, err)
  336. assert.True(t, expectSend(chOnErr), "badly encoded msgPacket")
  337. }
  338. func TestMConnectionReadErrorUnknownChannel(t *testing.T) {
  339. chOnErr := make(chan struct{})
  340. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  341. defer mconnClient.Stop()
  342. defer mconnServer.Stop()
  343. msg := []byte("Ant-Man")
  344. // fail to send msg on channel unknown by client
  345. assert.False(t, mconnClient.Send(0x03, msg))
  346. // send msg on channel unknown by the server.
  347. // should cause an error
  348. assert.True(t, mconnClient.Send(0x02, msg))
  349. assert.True(t, expectSend(chOnErr), "unknown channel")
  350. }
  351. func TestMConnectionReadErrorLongMessage(t *testing.T) {
  352. chOnErr := make(chan struct{})
  353. chOnRcv := make(chan struct{})
  354. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  355. defer mconnClient.Stop()
  356. defer mconnServer.Stop()
  357. mconnServer.onReceive = func(chID byte, msgBytes []byte) {
  358. chOnRcv <- struct{}{}
  359. }
  360. client := mconnClient.conn
  361. // send msg thats just right
  362. var err error
  363. var buf = new(bytes.Buffer)
  364. // - Uvarint length of MustMarshalBinary(packet) = 1 or 2 bytes
  365. // (as long as it's less than 16,384 bytes)
  366. // - Prefix bytes = 4 bytes
  367. // - ChannelID field key + byte = 2 bytes
  368. // - EOF field key + byte = 2 bytes
  369. // - Bytes field key = 1 bytes
  370. // - Uvarint length of MustMarshalBinary(bytes) = 1 or 2 bytes
  371. // - Struct terminator = 1 byte
  372. // = up to 14 bytes overhead for the packet.
  373. var packet = PacketMsg{
  374. ChannelID: 0x01,
  375. EOF: 1,
  376. Bytes: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize),
  377. }
  378. _, err = cdc.MarshalBinaryWriter(buf, packet)
  379. assert.Nil(t, err)
  380. _, err = client.Write(buf.Bytes())
  381. assert.Nil(t, err)
  382. assert.True(t, expectSend(chOnRcv), "msg just right")
  383. assert.False(t, expectSend(chOnErr), "msg just right")
  384. // send msg thats too long
  385. buf = new(bytes.Buffer)
  386. packet = PacketMsg{
  387. ChannelID: 0x01,
  388. EOF: 1,
  389. Bytes: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize+1),
  390. }
  391. _, err = cdc.MarshalBinaryWriter(buf, packet)
  392. assert.Nil(t, err)
  393. _, err = client.Write(buf.Bytes())
  394. assert.NotNil(t, err)
  395. assert.False(t, expectSend(chOnRcv), "msg too long")
  396. assert.True(t, expectSend(chOnErr), "msg too long")
  397. }
  398. func TestMConnectionReadErrorUnknownMsgType(t *testing.T) {
  399. chOnErr := make(chan struct{})
  400. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  401. defer mconnClient.Stop()
  402. defer mconnServer.Stop()
  403. // send msg with unknown msg type
  404. err := error(nil)
  405. err = amino.EncodeUvarint(mconnClient.conn, 4)
  406. assert.Nil(t, err)
  407. _, err = mconnClient.conn.Write([]byte{0xFF, 0xFF, 0xFF, 0xFF})
  408. assert.Nil(t, err)
  409. assert.True(t, expectSend(chOnErr), "unknown msg type")
  410. }
  411. func TestMConnectionTrySend(t *testing.T) {
  412. server, client := NetPipe()
  413. defer server.Close()
  414. defer client.Close()
  415. mconn := createTestMConnection(client)
  416. err := mconn.Start()
  417. require.Nil(t, err)
  418. defer mconn.Stop()
  419. msg := []byte("Semicolon-Woman")
  420. resultCh := make(chan string, 2)
  421. assert.True(t, mconn.TrySend(0x01, msg))
  422. server.Read(make([]byte, len(msg)))
  423. assert.True(t, mconn.CanSend(0x01))
  424. assert.True(t, mconn.TrySend(0x01, msg))
  425. assert.False(t, mconn.CanSend(0x01))
  426. go func() {
  427. mconn.TrySend(0x01, msg)
  428. resultCh <- "TrySend"
  429. }()
  430. assert.False(t, mconn.CanSend(0x01))
  431. assert.False(t, mconn.TrySend(0x01, msg))
  432. assert.Equal(t, "TrySend", <-resultCh)
  433. }