You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

563 lines
15 KiB

  1. package conn
  2. import (
  3. "net"
  4. "testing"
  5. "time"
  6. "github.com/fortytw2/leaktest"
  7. "github.com/stretchr/testify/assert"
  8. "github.com/stretchr/testify/require"
  9. "github.com/tendermint/tendermint/libs/log"
  10. "github.com/tendermint/tendermint/libs/protoio"
  11. tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
  12. "github.com/tendermint/tendermint/proto/tendermint/types"
  13. )
  14. const maxPingPongPacketSize = 1024 // bytes
  15. func createTestMConnection(conn net.Conn) *MConnection {
  16. onReceive := func(chID byte, msgBytes []byte) {
  17. }
  18. onError := func(r interface{}) {
  19. }
  20. c := createMConnectionWithCallbacks(conn, onReceive, onError)
  21. c.SetLogger(log.TestingLogger())
  22. return c
  23. }
  24. func createMConnectionWithCallbacks(
  25. conn net.Conn,
  26. onReceive func(chID byte, msgBytes []byte),
  27. onError func(r interface{}),
  28. ) *MConnection {
  29. cfg := DefaultMConnConfig()
  30. cfg.PingInterval = 90 * time.Millisecond
  31. cfg.PongTimeout = 45 * time.Millisecond
  32. chDescs := []*ChannelDescriptor{{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
  33. c := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, cfg)
  34. c.SetLogger(log.TestingLogger())
  35. return c
  36. }
  37. func TestMConnectionSendFlushStop(t *testing.T) {
  38. server, client := NetPipe()
  39. defer server.Close()
  40. defer client.Close()
  41. clientConn := createTestMConnection(client)
  42. err := clientConn.Start()
  43. require.Nil(t, err)
  44. defer clientConn.Stop() // nolint:errcheck // ignore for tests
  45. msg := []byte("abc")
  46. assert.True(t, clientConn.Send(0x01, msg))
  47. msgLength := 14
  48. // start the reader in a new routine, so we can flush
  49. errCh := make(chan error)
  50. go func() {
  51. msgB := make([]byte, msgLength)
  52. _, err := server.Read(msgB)
  53. if err != nil {
  54. t.Error(err)
  55. return
  56. }
  57. errCh <- err
  58. }()
  59. // stop the conn - it should flush all conns
  60. clientConn.FlushStop()
  61. timer := time.NewTimer(3 * time.Second)
  62. select {
  63. case <-errCh:
  64. case <-timer.C:
  65. t.Error("timed out waiting for msgs to be read")
  66. }
  67. }
  68. func TestMConnectionSend(t *testing.T) {
  69. server, client := NetPipe()
  70. defer server.Close()
  71. defer client.Close()
  72. mconn := createTestMConnection(client)
  73. err := mconn.Start()
  74. require.Nil(t, err)
  75. defer mconn.Stop() // nolint:errcheck // ignore for tests
  76. msg := []byte("Ant-Man")
  77. assert.True(t, mconn.Send(0x01, msg))
  78. // Note: subsequent Send/TrySend calls could pass because we are reading from
  79. // the send queue in a separate goroutine.
  80. _, err = server.Read(make([]byte, len(msg)))
  81. if err != nil {
  82. t.Error(err)
  83. }
  84. assert.True(t, mconn.CanSend(0x01))
  85. msg = []byte("Spider-Man")
  86. assert.True(t, mconn.TrySend(0x01, msg))
  87. _, err = server.Read(make([]byte, len(msg)))
  88. if err != nil {
  89. t.Error(err)
  90. }
  91. assert.False(t, mconn.CanSend(0x05), "CanSend should return false because channel is unknown")
  92. assert.False(t, mconn.Send(0x05, []byte("Absorbing Man")), "Send should return false because channel is unknown")
  93. }
  94. func TestMConnectionReceive(t *testing.T) {
  95. server, client := NetPipe()
  96. defer server.Close()
  97. defer client.Close()
  98. receivedCh := make(chan []byte)
  99. errorsCh := make(chan interface{})
  100. onReceive := func(chID byte, msgBytes []byte) {
  101. receivedCh <- msgBytes
  102. }
  103. onError := func(r interface{}) {
  104. errorsCh <- r
  105. }
  106. mconn1 := createMConnectionWithCallbacks(client, onReceive, onError)
  107. err := mconn1.Start()
  108. require.Nil(t, err)
  109. defer mconn1.Stop() // nolint:errcheck // ignore for tests
  110. mconn2 := createTestMConnection(server)
  111. err = mconn2.Start()
  112. require.Nil(t, err)
  113. defer mconn2.Stop() // nolint:errcheck // ignore for tests
  114. msg := []byte("Cyclops")
  115. assert.True(t, mconn2.Send(0x01, msg))
  116. select {
  117. case receivedBytes := <-receivedCh:
  118. assert.Equal(t, msg, receivedBytes)
  119. case err := <-errorsCh:
  120. t.Fatalf("Expected %s, got %+v", msg, err)
  121. case <-time.After(500 * time.Millisecond):
  122. t.Fatalf("Did not receive %s message in 500ms", msg)
  123. }
  124. }
  125. func TestMConnectionStatus(t *testing.T) {
  126. server, client := NetPipe()
  127. defer server.Close()
  128. defer client.Close()
  129. mconn := createTestMConnection(client)
  130. err := mconn.Start()
  131. require.Nil(t, err)
  132. defer mconn.Stop() // nolint:errcheck // ignore for tests
  133. status := mconn.Status()
  134. assert.NotNil(t, status)
  135. assert.Zero(t, status.Channels[0].SendQueueSize)
  136. }
  137. func TestMConnectionPongTimeoutResultsInError(t *testing.T) {
  138. server, client := net.Pipe()
  139. defer server.Close()
  140. defer client.Close()
  141. receivedCh := make(chan []byte)
  142. errorsCh := make(chan interface{})
  143. onReceive := func(chID byte, msgBytes []byte) {
  144. receivedCh <- msgBytes
  145. }
  146. onError := func(r interface{}) {
  147. errorsCh <- r
  148. }
  149. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  150. err := mconn.Start()
  151. require.Nil(t, err)
  152. defer mconn.Stop() // nolint:errcheck // ignore for tests
  153. serverGotPing := make(chan struct{})
  154. go func() {
  155. // read ping
  156. var pkt tmp2p.Packet
  157. err := protoio.NewDelimitedReader(server, maxPingPongPacketSize).ReadMsg(&pkt)
  158. require.NoError(t, err)
  159. serverGotPing <- struct{}{}
  160. }()
  161. <-serverGotPing
  162. pongTimerExpired := mconn.config.PongTimeout + 200*time.Millisecond
  163. select {
  164. case msgBytes := <-receivedCh:
  165. t.Fatalf("Expected error, but got %v", msgBytes)
  166. case err := <-errorsCh:
  167. assert.NotNil(t, err)
  168. case <-time.After(pongTimerExpired):
  169. t.Fatalf("Expected to receive error after %v", pongTimerExpired)
  170. }
  171. }
  172. func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) {
  173. server, client := net.Pipe()
  174. defer server.Close()
  175. defer client.Close()
  176. receivedCh := make(chan []byte)
  177. errorsCh := make(chan interface{})
  178. onReceive := func(chID byte, msgBytes []byte) {
  179. receivedCh <- msgBytes
  180. }
  181. onError := func(r interface{}) {
  182. errorsCh <- r
  183. }
  184. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  185. err := mconn.Start()
  186. require.Nil(t, err)
  187. defer mconn.Stop() // nolint:errcheck // ignore for tests
  188. // sending 3 pongs in a row (abuse)
  189. protoWriter := protoio.NewDelimitedWriter(server)
  190. _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
  191. require.NoError(t, err)
  192. _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
  193. require.NoError(t, err)
  194. _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
  195. require.NoError(t, err)
  196. serverGotPing := make(chan struct{})
  197. go func() {
  198. // read ping (one byte)
  199. var packet tmp2p.Packet
  200. err := protoio.NewDelimitedReader(server, maxPingPongPacketSize).ReadMsg(&packet)
  201. require.NoError(t, err)
  202. serverGotPing <- struct{}{}
  203. // respond with pong
  204. _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
  205. require.NoError(t, err)
  206. }()
  207. <-serverGotPing
  208. pongTimerExpired := mconn.config.PongTimeout + 20*time.Millisecond
  209. select {
  210. case msgBytes := <-receivedCh:
  211. t.Fatalf("Expected no data, but got %v", msgBytes)
  212. case err := <-errorsCh:
  213. t.Fatalf("Expected no error, but got %v", err)
  214. case <-time.After(pongTimerExpired):
  215. assert.True(t, mconn.IsRunning())
  216. }
  217. }
  218. func TestMConnectionMultiplePings(t *testing.T) {
  219. server, client := net.Pipe()
  220. defer server.Close()
  221. defer client.Close()
  222. receivedCh := make(chan []byte)
  223. errorsCh := make(chan interface{})
  224. onReceive := func(chID byte, msgBytes []byte) {
  225. receivedCh <- msgBytes
  226. }
  227. onError := func(r interface{}) {
  228. errorsCh <- r
  229. }
  230. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  231. err := mconn.Start()
  232. require.Nil(t, err)
  233. defer mconn.Stop() // nolint:errcheck // ignore for tests
  234. // sending 3 pings in a row (abuse)
  235. // see https://github.com/tendermint/tendermint/issues/1190
  236. protoReader := protoio.NewDelimitedReader(server, maxPingPongPacketSize)
  237. protoWriter := protoio.NewDelimitedWriter(server)
  238. var pkt tmp2p.Packet
  239. _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{}))
  240. require.NoError(t, err)
  241. err = protoReader.ReadMsg(&pkt)
  242. require.NoError(t, err)
  243. _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{}))
  244. require.NoError(t, err)
  245. err = protoReader.ReadMsg(&pkt)
  246. require.NoError(t, err)
  247. _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{}))
  248. require.NoError(t, err)
  249. err = protoReader.ReadMsg(&pkt)
  250. require.NoError(t, err)
  251. assert.True(t, mconn.IsRunning())
  252. }
  253. func TestMConnectionPingPongs(t *testing.T) {
  254. // check that we are not leaking any go-routines
  255. defer leaktest.CheckTimeout(t, 10*time.Second)()
  256. server, client := net.Pipe()
  257. defer server.Close()
  258. defer client.Close()
  259. receivedCh := make(chan []byte)
  260. errorsCh := make(chan interface{})
  261. onReceive := func(chID byte, msgBytes []byte) {
  262. receivedCh <- msgBytes
  263. }
  264. onError := func(r interface{}) {
  265. errorsCh <- r
  266. }
  267. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  268. err := mconn.Start()
  269. require.Nil(t, err)
  270. defer mconn.Stop() // nolint:errcheck // ignore for tests
  271. serverGotPing := make(chan struct{})
  272. go func() {
  273. protoReader := protoio.NewDelimitedReader(server, maxPingPongPacketSize)
  274. protoWriter := protoio.NewDelimitedWriter(server)
  275. var pkt tmp2p.PacketPing
  276. // read ping
  277. err = protoReader.ReadMsg(&pkt)
  278. require.NoError(t, err)
  279. serverGotPing <- struct{}{}
  280. // respond with pong
  281. _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
  282. require.NoError(t, err)
  283. time.Sleep(mconn.config.PingInterval)
  284. // read ping
  285. err = protoReader.ReadMsg(&pkt)
  286. require.NoError(t, err)
  287. serverGotPing <- struct{}{}
  288. // respond with pong
  289. _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
  290. require.NoError(t, err)
  291. }()
  292. <-serverGotPing
  293. <-serverGotPing
  294. pongTimerExpired := (mconn.config.PongTimeout + 20*time.Millisecond) * 2
  295. select {
  296. case msgBytes := <-receivedCh:
  297. t.Fatalf("Expected no data, but got %v", msgBytes)
  298. case err := <-errorsCh:
  299. t.Fatalf("Expected no error, but got %v", err)
  300. case <-time.After(2 * pongTimerExpired):
  301. assert.True(t, mconn.IsRunning())
  302. }
  303. }
  304. func TestMConnectionStopsAndReturnsError(t *testing.T) {
  305. server, client := NetPipe()
  306. defer server.Close()
  307. defer client.Close()
  308. receivedCh := make(chan []byte)
  309. errorsCh := make(chan interface{})
  310. onReceive := func(chID byte, msgBytes []byte) {
  311. receivedCh <- msgBytes
  312. }
  313. onError := func(r interface{}) {
  314. errorsCh <- r
  315. }
  316. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  317. err := mconn.Start()
  318. require.Nil(t, err)
  319. defer mconn.Stop() // nolint:errcheck // ignore for tests
  320. if err := client.Close(); err != nil {
  321. t.Error(err)
  322. }
  323. select {
  324. case receivedBytes := <-receivedCh:
  325. t.Fatalf("Expected error, got %v", receivedBytes)
  326. case err := <-errorsCh:
  327. assert.NotNil(t, err)
  328. assert.False(t, mconn.IsRunning())
  329. case <-time.After(500 * time.Millisecond):
  330. t.Fatal("Did not receive error in 500ms")
  331. }
  332. }
  333. func newClientAndServerConnsForReadErrors(t *testing.T, chOnErr chan struct{}) (*MConnection, *MConnection) {
  334. server, client := NetPipe()
  335. onReceive := func(chID byte, msgBytes []byte) {}
  336. onError := func(r interface{}) {}
  337. // create client conn with two channels
  338. chDescs := []*ChannelDescriptor{
  339. {ID: 0x01, Priority: 1, SendQueueCapacity: 1},
  340. {ID: 0x02, Priority: 1, SendQueueCapacity: 1},
  341. }
  342. mconnClient := NewMConnection(client, chDescs, onReceive, onError)
  343. mconnClient.SetLogger(log.TestingLogger().With("module", "client"))
  344. err := mconnClient.Start()
  345. require.Nil(t, err)
  346. // create server conn with 1 channel
  347. // it fires on chOnErr when there's an error
  348. serverLogger := log.TestingLogger().With("module", "server")
  349. onError = func(r interface{}) {
  350. chOnErr <- struct{}{}
  351. }
  352. mconnServer := createMConnectionWithCallbacks(server, onReceive, onError)
  353. mconnServer.SetLogger(serverLogger)
  354. err = mconnServer.Start()
  355. require.Nil(t, err)
  356. return mconnClient, mconnServer
  357. }
  358. func expectSend(ch chan struct{}) bool {
  359. after := time.After(time.Second * 5)
  360. select {
  361. case <-ch:
  362. return true
  363. case <-after:
  364. return false
  365. }
  366. }
  367. func TestMConnectionReadErrorBadEncoding(t *testing.T) {
  368. chOnErr := make(chan struct{})
  369. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  370. client := mconnClient.conn
  371. // Write it.
  372. _, err := client.Write([]byte{1, 2, 3, 4, 5})
  373. require.NoError(t, err)
  374. assert.True(t, expectSend(chOnErr), "badly encoded msgPacket")
  375. t.Cleanup(func() {
  376. if err := mconnClient.Stop(); err != nil {
  377. t.Log(err)
  378. }
  379. })
  380. t.Cleanup(func() {
  381. if err := mconnServer.Stop(); err != nil {
  382. t.Log(err)
  383. }
  384. })
  385. }
  386. func TestMConnectionReadErrorUnknownChannel(t *testing.T) {
  387. chOnErr := make(chan struct{})
  388. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  389. msg := []byte("Ant-Man")
  390. // fail to send msg on channel unknown by client
  391. assert.False(t, mconnClient.Send(0x03, msg))
  392. // send msg on channel unknown by the server.
  393. // should cause an error
  394. assert.True(t, mconnClient.Send(0x02, msg))
  395. assert.True(t, expectSend(chOnErr), "unknown channel")
  396. t.Cleanup(func() {
  397. if err := mconnClient.Stop(); err != nil {
  398. t.Log(err)
  399. }
  400. })
  401. t.Cleanup(func() {
  402. if err := mconnServer.Stop(); err != nil {
  403. t.Log(err)
  404. }
  405. })
  406. }
  407. func TestMConnectionReadErrorLongMessage(t *testing.T) {
  408. chOnErr := make(chan struct{})
  409. chOnRcv := make(chan struct{})
  410. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  411. defer mconnClient.Stop() // nolint:errcheck // ignore for tests
  412. defer mconnServer.Stop() // nolint:errcheck // ignore for tests
  413. mconnServer.onReceive = func(chID byte, msgBytes []byte) {
  414. chOnRcv <- struct{}{}
  415. }
  416. client := mconnClient.conn
  417. protoWriter := protoio.NewDelimitedWriter(client)
  418. // send msg thats just right
  419. var packet = tmp2p.PacketMsg{
  420. ChannelID: 0x01,
  421. EOF: true,
  422. Data: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize),
  423. }
  424. _, err := protoWriter.WriteMsg(mustWrapPacket(&packet))
  425. require.NoError(t, err)
  426. assert.True(t, expectSend(chOnRcv), "msg just right")
  427. // send msg thats too long
  428. packet = tmp2p.PacketMsg{
  429. ChannelID: 0x01,
  430. EOF: true,
  431. Data: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize+100),
  432. }
  433. _, err = protoWriter.WriteMsg(mustWrapPacket(&packet))
  434. require.Error(t, err)
  435. assert.True(t, expectSend(chOnErr), "msg too long")
  436. }
  437. func TestMConnectionReadErrorUnknownMsgType(t *testing.T) {
  438. chOnErr := make(chan struct{})
  439. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
  440. defer mconnClient.Stop() // nolint:errcheck // ignore for tests
  441. defer mconnServer.Stop() // nolint:errcheck // ignore for tests
  442. // send msg with unknown msg type
  443. _, err := protoio.NewDelimitedWriter(mconnClient.conn).WriteMsg(&types.Header{ChainID: "x"})
  444. require.NoError(t, err)
  445. assert.True(t, expectSend(chOnErr), "unknown msg type")
  446. }
  447. func TestMConnectionTrySend(t *testing.T) {
  448. server, client := NetPipe()
  449. defer server.Close()
  450. defer client.Close()
  451. mconn := createTestMConnection(client)
  452. err := mconn.Start()
  453. require.Nil(t, err)
  454. defer mconn.Stop() // nolint:errcheck // ignore for tests
  455. msg := []byte("Semicolon-Woman")
  456. resultCh := make(chan string, 2)
  457. assert.True(t, mconn.TrySend(0x01, msg))
  458. _, err = server.Read(make([]byte, len(msg)))
  459. require.NoError(t, err)
  460. assert.True(t, mconn.CanSend(0x01))
  461. assert.True(t, mconn.TrySend(0x01, msg))
  462. assert.False(t, mconn.CanSend(0x01))
  463. go func() {
  464. mconn.TrySend(0x01, msg)
  465. resultCh <- "TrySend"
  466. }()
  467. assert.False(t, mconn.CanSend(0x01))
  468. assert.False(t, mconn.TrySend(0x01, msg))
  469. assert.Equal(t, "TrySend", <-resultCh)
  470. }