You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

482 lines
13 KiB

8 years ago
8 years ago
8 years ago
7 years ago
7 years ago
7 years ago
  1. package conn
  2. import (
  3. "net"
  4. "testing"
  5. "time"
  6. "github.com/stretchr/testify/assert"
  7. "github.com/stretchr/testify/require"
  8. wire "github.com/tendermint/go-wire"
  9. "github.com/tendermint/tmlibs/log"
  10. )
  11. func createTestMConnection(conn net.Conn) *MConnection {
  12. onReceive := func(chID byte, msgBytes []byte) {
  13. }
  14. onError := func(r interface{}) {
  15. }
  16. c := createMConnectionWithCallbacks(conn, onReceive, onError)
  17. c.SetLogger(log.TestingLogger())
  18. return c
  19. }
  20. func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *MConnection {
  21. cfg := DefaultMConnConfig()
  22. cfg.PingInterval = 90 * time.Millisecond
  23. cfg.PongTimeout = 45 * time.Millisecond
  24. chDescs := []*ChannelDescriptor{&ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
  25. c := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, cfg)
  26. c.SetLogger(log.TestingLogger())
  27. return c
  28. }
  29. func TestMConnectionSend(t *testing.T) {
  30. assert, require := assert.New(t), require.New(t)
  31. server, client := NetPipe()
  32. defer server.Close() // nolint: errcheck
  33. defer client.Close() // nolint: errcheck
  34. mconn := createTestMConnection(client)
  35. err := mconn.Start()
  36. require.Nil(err)
  37. defer mconn.Stop()
  38. msg := "Ant-Man"
  39. assert.True(mconn.Send(0x01, msg))
  40. // Note: subsequent Send/TrySend calls could pass because we are reading from
  41. // the send queue in a separate goroutine.
  42. _, err = server.Read(make([]byte, len(msg)))
  43. if err != nil {
  44. t.Error(err)
  45. }
  46. assert.True(mconn.CanSend(0x01))
  47. msg = "Spider-Man"
  48. assert.True(mconn.TrySend(0x01, msg))
  49. _, err = server.Read(make([]byte, len(msg)))
  50. if err != nil {
  51. t.Error(err)
  52. }
  53. assert.False(mconn.CanSend(0x05), "CanSend should return false because channel is unknown")
  54. assert.False(mconn.Send(0x05, "Absorbing Man"), "Send should return false because channel is unknown")
  55. }
  56. func TestMConnectionReceive(t *testing.T) {
  57. assert, require := assert.New(t), require.New(t)
  58. server, client := NetPipe()
  59. defer server.Close() // nolint: errcheck
  60. defer client.Close() // nolint: errcheck
  61. receivedCh := make(chan []byte)
  62. errorsCh := make(chan interface{})
  63. onReceive := func(chID byte, msgBytes []byte) {
  64. receivedCh <- msgBytes
  65. }
  66. onError := func(r interface{}) {
  67. errorsCh <- r
  68. }
  69. mconn1 := createMConnectionWithCallbacks(client, onReceive, onError)
  70. err := mconn1.Start()
  71. require.Nil(err)
  72. defer mconn1.Stop()
  73. mconn2 := createTestMConnection(server)
  74. err = mconn2.Start()
  75. require.Nil(err)
  76. defer mconn2.Stop()
  77. msg := "Cyclops"
  78. assert.True(mconn2.Send(0x01, msg))
  79. select {
  80. case receivedBytes := <-receivedCh:
  81. assert.Equal([]byte(msg), receivedBytes[2:]) // first 3 bytes are internal
  82. case err := <-errorsCh:
  83. t.Fatalf("Expected %s, got %+v", msg, err)
  84. case <-time.After(500 * time.Millisecond):
  85. t.Fatalf("Did not receive %s message in 500ms", msg)
  86. }
  87. }
  88. func TestMConnectionStatus(t *testing.T) {
  89. assert, require := assert.New(t), require.New(t)
  90. server, client := NetPipe()
  91. defer server.Close() // nolint: errcheck
  92. defer client.Close() // nolint: errcheck
  93. mconn := createTestMConnection(client)
  94. err := mconn.Start()
  95. require.Nil(err)
  96. defer mconn.Stop()
  97. status := mconn.Status()
  98. assert.NotNil(status)
  99. assert.Zero(status.Channels[0].SendQueueSize)
  100. }
  101. func TestMConnectionPongTimeoutResultsInError(t *testing.T) {
  102. server, client := net.Pipe()
  103. defer server.Close()
  104. defer client.Close()
  105. receivedCh := make(chan []byte)
  106. errorsCh := make(chan interface{})
  107. onReceive := func(chID byte, msgBytes []byte) {
  108. receivedCh <- msgBytes
  109. }
  110. onError := func(r interface{}) {
  111. errorsCh <- r
  112. }
  113. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  114. err := mconn.Start()
  115. require.Nil(t, err)
  116. defer mconn.Stop()
  117. serverGotPing := make(chan struct{})
  118. go func() {
  119. // read ping
  120. server.Read(make([]byte, 1))
  121. serverGotPing <- struct{}{}
  122. }()
  123. <-serverGotPing
  124. pongTimerExpired := mconn.config.PongTimeout + 20*time.Millisecond
  125. select {
  126. case msgBytes := <-receivedCh:
  127. t.Fatalf("Expected error, but got %v", msgBytes)
  128. case err := <-errorsCh:
  129. assert.NotNil(t, err)
  130. case <-time.After(pongTimerExpired):
  131. t.Fatalf("Expected to receive error after %v", pongTimerExpired)
  132. }
  133. }
  134. func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) {
  135. server, client := net.Pipe()
  136. defer server.Close()
  137. defer client.Close()
  138. receivedCh := make(chan []byte)
  139. errorsCh := make(chan interface{})
  140. onReceive := func(chID byte, msgBytes []byte) {
  141. receivedCh <- msgBytes
  142. }
  143. onError := func(r interface{}) {
  144. errorsCh <- r
  145. }
  146. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  147. err := mconn.Start()
  148. require.Nil(t, err)
  149. defer mconn.Stop()
  150. // sending 3 pongs in a row (abuse)
  151. _, err = server.Write([]byte{packetTypePong})
  152. require.Nil(t, err)
  153. _, err = server.Write([]byte{packetTypePong})
  154. require.Nil(t, err)
  155. _, err = server.Write([]byte{packetTypePong})
  156. require.Nil(t, err)
  157. serverGotPing := make(chan struct{})
  158. go func() {
  159. // read ping (one byte)
  160. _, err = server.Read(make([]byte, 1))
  161. require.Nil(t, err)
  162. serverGotPing <- struct{}{}
  163. // respond with pong
  164. _, err = server.Write([]byte{packetTypePong})
  165. require.Nil(t, err)
  166. }()
  167. <-serverGotPing
  168. pongTimerExpired := mconn.config.PongTimeout + 20*time.Millisecond
  169. select {
  170. case msgBytes := <-receivedCh:
  171. t.Fatalf("Expected no data, but got %v", msgBytes)
  172. case err := <-errorsCh:
  173. t.Fatalf("Expected no error, but got %v", err)
  174. case <-time.After(pongTimerExpired):
  175. assert.True(t, mconn.IsRunning())
  176. }
  177. }
  178. func TestMConnectionMultiplePings(t *testing.T) {
  179. server, client := net.Pipe()
  180. defer server.Close()
  181. defer client.Close()
  182. receivedCh := make(chan []byte)
  183. errorsCh := make(chan interface{})
  184. onReceive := func(chID byte, msgBytes []byte) {
  185. receivedCh <- msgBytes
  186. }
  187. onError := func(r interface{}) {
  188. errorsCh <- r
  189. }
  190. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  191. err := mconn.Start()
  192. require.Nil(t, err)
  193. defer mconn.Stop()
  194. // sending 3 pings in a row (abuse)
  195. // see https://github.com/tendermint/tendermint/issues/1190
  196. _, err = server.Write([]byte{packetTypePing})
  197. require.Nil(t, err)
  198. _, err = server.Read(make([]byte, 1))
  199. require.Nil(t, err)
  200. _, err = server.Write([]byte{packetTypePing})
  201. require.Nil(t, err)
  202. _, err = server.Read(make([]byte, 1))
  203. require.Nil(t, err)
  204. _, err = server.Write([]byte{packetTypePing})
  205. require.Nil(t, err)
  206. _, err = server.Read(make([]byte, 1))
  207. require.Nil(t, err)
  208. assert.True(t, mconn.IsRunning())
  209. }
  210. func TestMConnectionPingPongs(t *testing.T) {
  211. server, client := net.Pipe()
  212. defer server.Close()
  213. defer client.Close()
  214. receivedCh := make(chan []byte)
  215. errorsCh := make(chan interface{})
  216. onReceive := func(chID byte, msgBytes []byte) {
  217. receivedCh <- msgBytes
  218. }
  219. onError := func(r interface{}) {
  220. errorsCh <- r
  221. }
  222. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  223. err := mconn.Start()
  224. require.Nil(t, err)
  225. defer mconn.Stop()
  226. serverGotPing := make(chan struct{})
  227. go func() {
  228. // read ping
  229. server.Read(make([]byte, 1))
  230. serverGotPing <- struct{}{}
  231. // respond with pong
  232. _, err = server.Write([]byte{packetTypePong})
  233. require.Nil(t, err)
  234. time.Sleep(mconn.config.PingInterval)
  235. // read ping
  236. server.Read(make([]byte, 1))
  237. // respond with pong
  238. _, err = server.Write([]byte{packetTypePong})
  239. require.Nil(t, err)
  240. }()
  241. <-serverGotPing
  242. pongTimerExpired := (mconn.config.PongTimeout + 20*time.Millisecond) * 2
  243. select {
  244. case msgBytes := <-receivedCh:
  245. t.Fatalf("Expected no data, but got %v", msgBytes)
  246. case err := <-errorsCh:
  247. t.Fatalf("Expected no error, but got %v", err)
  248. case <-time.After(2 * pongTimerExpired):
  249. assert.True(t, mconn.IsRunning())
  250. }
  251. }
  252. func TestMConnectionStopsAndReturnsError(t *testing.T) {
  253. assert, require := assert.New(t), require.New(t)
  254. server, client := NetPipe()
  255. defer server.Close() // nolint: errcheck
  256. defer client.Close() // nolint: errcheck
  257. receivedCh := make(chan []byte)
  258. errorsCh := make(chan interface{})
  259. onReceive := func(chID byte, msgBytes []byte) {
  260. receivedCh <- msgBytes
  261. }
  262. onError := func(r interface{}) {
  263. errorsCh <- r
  264. }
  265. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  266. err := mconn.Start()
  267. require.Nil(err)
  268. defer mconn.Stop()
  269. if err := client.Close(); err != nil {
  270. t.Error(err)
  271. }
  272. select {
  273. case receivedBytes := <-receivedCh:
  274. t.Fatalf("Expected error, got %v", receivedBytes)
  275. case err := <-errorsCh:
  276. assert.NotNil(err)
  277. assert.False(mconn.IsRunning())
  278. case <-time.After(500 * time.Millisecond):
  279. t.Fatal("Did not receive error in 500ms")
  280. }
  281. }
  282. func newClientAndServerConnsForReadErrors(require *require.Assertions, chOnErr chan struct{}) (*MConnection, *MConnection) {
  283. server, client := NetPipe()
  284. onReceive := func(chID byte, msgBytes []byte) {}
  285. onError := func(r interface{}) {}
  286. // create client conn with two channels
  287. chDescs := []*ChannelDescriptor{
  288. {ID: 0x01, Priority: 1, SendQueueCapacity: 1},
  289. {ID: 0x02, Priority: 1, SendQueueCapacity: 1},
  290. }
  291. mconnClient := NewMConnection(client, chDescs, onReceive, onError)
  292. mconnClient.SetLogger(log.TestingLogger().With("module", "client"))
  293. err := mconnClient.Start()
  294. require.Nil(err)
  295. // create server conn with 1 channel
  296. // it fires on chOnErr when there's an error
  297. serverLogger := log.TestingLogger().With("module", "server")
  298. onError = func(r interface{}) {
  299. chOnErr <- struct{}{}
  300. }
  301. mconnServer := createMConnectionWithCallbacks(server, onReceive, onError)
  302. mconnServer.SetLogger(serverLogger)
  303. err = mconnServer.Start()
  304. require.Nil(err)
  305. return mconnClient, mconnServer
  306. }
  307. func expectSend(ch chan struct{}) bool {
  308. after := time.After(time.Second * 5)
  309. select {
  310. case <-ch:
  311. return true
  312. case <-after:
  313. return false
  314. }
  315. }
  316. func TestMConnectionReadErrorBadEncoding(t *testing.T) {
  317. assert, require := assert.New(t), require.New(t)
  318. chOnErr := make(chan struct{})
  319. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(require, chOnErr)
  320. defer mconnClient.Stop()
  321. defer mconnServer.Stop()
  322. client := mconnClient.conn
  323. msg := "Ant-Man"
  324. // send badly encoded msgPacket
  325. var n int
  326. var err error
  327. wire.WriteByte(packetTypeMsg, client, &n, &err)
  328. wire.WriteByteSlice([]byte(msg), client, &n, &err)
  329. assert.True(expectSend(chOnErr), "badly encoded msgPacket")
  330. }
  331. func TestMConnectionReadErrorUnknownChannel(t *testing.T) {
  332. assert, require := assert.New(t), require.New(t)
  333. chOnErr := make(chan struct{})
  334. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(require, chOnErr)
  335. defer mconnClient.Stop()
  336. defer mconnServer.Stop()
  337. msg := "Ant-Man"
  338. // fail to send msg on channel unknown by client
  339. assert.False(mconnClient.Send(0x03, msg))
  340. // send msg on channel unknown by the server.
  341. // should cause an error
  342. assert.True(mconnClient.Send(0x02, msg))
  343. assert.True(expectSend(chOnErr), "unknown channel")
  344. }
  345. func TestMConnectionReadErrorLongMessage(t *testing.T) {
  346. assert, require := assert.New(t), require.New(t)
  347. chOnErr := make(chan struct{})
  348. chOnRcv := make(chan struct{})
  349. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(require, chOnErr)
  350. defer mconnClient.Stop()
  351. defer mconnServer.Stop()
  352. mconnServer.onReceive = func(chID byte, msgBytes []byte) {
  353. chOnRcv <- struct{}{}
  354. }
  355. client := mconnClient.conn
  356. // send msg thats just right
  357. var n int
  358. var err error
  359. packet := msgPacket{
  360. ChannelID: 0x01,
  361. Bytes: make([]byte, mconnClient.config.maxMsgPacketTotalSize()-5),
  362. EOF: 1,
  363. }
  364. writeMsgPacketTo(packet, client, &n, &err)
  365. assert.True(expectSend(chOnRcv), "msg just right")
  366. // send msg thats too long
  367. packet = msgPacket{
  368. ChannelID: 0x01,
  369. Bytes: make([]byte, mconnClient.config.maxMsgPacketTotalSize()-4),
  370. EOF: 1,
  371. }
  372. writeMsgPacketTo(packet, client, &n, &err)
  373. assert.True(expectSend(chOnErr), "msg too long")
  374. }
  375. func TestMConnectionReadErrorUnknownMsgType(t *testing.T) {
  376. assert, require := assert.New(t), require.New(t)
  377. chOnErr := make(chan struct{})
  378. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(require, chOnErr)
  379. defer mconnClient.Stop()
  380. defer mconnServer.Stop()
  381. // send msg with unknown msg type
  382. var n int
  383. var err error
  384. wire.WriteByte(0x04, mconnClient.conn, &n, &err)
  385. assert.True(expectSend(chOnErr), "unknown msg type")
  386. }
  387. func TestMConnectionTrySend(t *testing.T) {
  388. assert, require := assert.New(t), require.New(t)
  389. server, client := NetPipe()
  390. defer server.Close()
  391. defer client.Close()
  392. mconn := createTestMConnection(client)
  393. err := mconn.Start()
  394. require.Nil(err)
  395. defer mconn.Stop()
  396. msg := "Semicolon-Woman"
  397. resultCh := make(chan string, 2)
  398. assert.True(mconn.TrySend(0x01, msg))
  399. server.Read(make([]byte, len(msg)))
  400. assert.True(mconn.CanSend(0x01))
  401. assert.True(mconn.TrySend(0x01, msg))
  402. assert.False(mconn.CanSend(0x01))
  403. go func() {
  404. mconn.TrySend(0x01, msg)
  405. resultCh <- "TrySend"
  406. }()
  407. assert.False(mconn.CanSend(0x01))
  408. assert.False(mconn.TrySend(0x01, msg))
  409. assert.Equal("TrySend", <-resultCh)
  410. }