You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

140 lines
3.6 KiB

  1. package p2p_test
  2. import (
  3. "net"
  4. "testing"
  5. "time"
  6. "github.com/stretchr/testify/assert"
  7. "github.com/stretchr/testify/require"
  8. p2p "github.com/tendermint/go-p2p"
  9. )
  10. func createMConnection(conn net.Conn) *p2p.MConnection {
  11. onReceive := func(chID byte, msgBytes []byte) {
  12. }
  13. onError := func(r interface{}) {
  14. }
  15. return createMConnectionWithCallbacks(conn, onReceive, onError)
  16. }
  17. func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *p2p.MConnection {
  18. chDescs := []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
  19. return p2p.NewMConnection(conn, chDescs, onReceive, onError)
  20. }
  21. func TestMConnectionSend(t *testing.T) {
  22. assert, require := assert.New(t), require.New(t)
  23. server, client := net.Pipe()
  24. defer server.Close()
  25. defer client.Close()
  26. mconn := createMConnection(client)
  27. _, err := mconn.Start()
  28. require.Nil(err)
  29. defer mconn.Stop()
  30. msg := "Ant-Man"
  31. assert.True(mconn.Send(0x01, msg))
  32. // Note: subsequent Send/TrySend calls could pass because we are reading from
  33. // the send queue in a separate goroutine.
  34. assert.False(mconn.CanSend(0x01), "CanSend should return false because queue is full")
  35. server.Read(make([]byte, len(msg)))
  36. assert.True(mconn.CanSend(0x01))
  37. msg = "Spider-Man"
  38. assert.True(mconn.TrySend(0x01, msg))
  39. server.Read(make([]byte, len(msg)))
  40. assert.False(mconn.CanSend(0x05), "CanSend should return false because channel is unknown")
  41. assert.False(mconn.Send(0x05, "Absorbing Man"), "Send should return false because channel is unknown")
  42. }
  43. func TestMConnectionReceive(t *testing.T) {
  44. assert, require := assert.New(t), require.New(t)
  45. server, client := net.Pipe()
  46. defer server.Close()
  47. defer client.Close()
  48. receivedCh := make(chan []byte)
  49. errorsCh := make(chan interface{})
  50. onReceive := func(chID byte, msgBytes []byte) {
  51. receivedCh <- msgBytes
  52. }
  53. onError := func(r interface{}) {
  54. errorsCh <- r
  55. }
  56. mconn1 := createMConnectionWithCallbacks(client, onReceive, onError)
  57. _, err := mconn1.Start()
  58. require.Nil(err)
  59. defer mconn1.Stop()
  60. mconn2 := createMConnection(server)
  61. _, err = mconn2.Start()
  62. require.Nil(err)
  63. defer mconn2.Stop()
  64. msg := "Cyclops"
  65. assert.True(mconn2.Send(0x01, msg))
  66. select {
  67. case receivedBytes := <-receivedCh:
  68. assert.Equal([]byte(msg), receivedBytes[2:]) // first 3 bytes are internal
  69. case err := <-errorsCh:
  70. t.Fatalf("Expected %s, got %+v", msg, err)
  71. case <-time.After(500 * time.Millisecond):
  72. t.Fatalf("Did not receive %s message in 500ms", msg)
  73. }
  74. }
  75. func TestMConnectionStatus(t *testing.T) {
  76. assert, require := assert.New(t), require.New(t)
  77. server, client := net.Pipe()
  78. defer server.Close()
  79. defer client.Close()
  80. mconn := createMConnection(client)
  81. _, err := mconn.Start()
  82. require.Nil(err)
  83. defer mconn.Stop()
  84. status := mconn.Status()
  85. assert.NotNil(status)
  86. assert.Zero(status.Channels[0].SendQueueSize)
  87. }
  88. func TestMConnectionStopsAndReturnsError(t *testing.T) {
  89. assert, require := assert.New(t), require.New(t)
  90. server, client := net.Pipe()
  91. defer server.Close()
  92. defer client.Close()
  93. receivedCh := make(chan []byte)
  94. errorsCh := make(chan interface{})
  95. onReceive := func(chID byte, msgBytes []byte) {
  96. receivedCh <- msgBytes
  97. }
  98. onError := func(r interface{}) {
  99. errorsCh <- r
  100. }
  101. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  102. _, err := mconn.Start()
  103. require.Nil(err)
  104. defer mconn.Stop()
  105. client.Close()
  106. select {
  107. case receivedBytes := <-receivedCh:
  108. t.Fatalf("Expected error, got %v", receivedBytes)
  109. case err := <-errorsCh:
  110. assert.NotNil(err)
  111. assert.False(mconn.IsRunning())
  112. case <-time.After(500 * time.Millisecond):
  113. t.Fatal("Did not receive error in 500ms")
  114. }
  115. }