You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

315 lines
8.4 KiB

8 years ago
8 years ago
8 years ago
  1. package p2p
  2. import (
  3. "net"
  4. "testing"
  5. "time"
  6. "github.com/stretchr/testify/assert"
  7. "github.com/stretchr/testify/require"
  8. wire "github.com/tendermint/go-wire"
  9. "github.com/tendermint/tmlibs/log"
  10. )
  11. func createTestMConnection(conn net.Conn) *MConnection {
  12. onReceive := func(chID byte, msgBytes []byte) {
  13. }
  14. onError := func(r interface{}) {
  15. }
  16. c := createMConnectionWithCallbacks(conn, onReceive, onError)
  17. c.SetLogger(log.TestingLogger())
  18. return c
  19. }
  20. func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *MConnection {
  21. chDescs := []*ChannelDescriptor{&ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
  22. c := NewMConnection(conn, chDescs, onReceive, onError)
  23. c.SetLogger(log.TestingLogger())
  24. return c
  25. }
  26. func TestMConnectionSend(t *testing.T) {
  27. assert, require := assert.New(t), require.New(t)
  28. server, client := netPipe()
  29. defer server.Close() // nolint: errcheck
  30. defer client.Close() // nolint: errcheck
  31. mconn := createTestMConnection(client)
  32. _, err := mconn.Start()
  33. require.Nil(err)
  34. defer mconn.Stop()
  35. msg := "Ant-Man"
  36. assert.True(mconn.Send(0x01, msg))
  37. // Note: subsequent Send/TrySend calls could pass because we are reading from
  38. // the send queue in a separate goroutine.
  39. _, err = server.Read(make([]byte, len(msg)))
  40. if err != nil {
  41. t.Error(err)
  42. }
  43. assert.True(mconn.CanSend(0x01))
  44. msg = "Spider-Man"
  45. assert.True(mconn.TrySend(0x01, msg))
  46. _, err = server.Read(make([]byte, len(msg)))
  47. if err != nil {
  48. t.Error(err)
  49. }
  50. assert.False(mconn.CanSend(0x05), "CanSend should return false because channel is unknown")
  51. assert.False(mconn.Send(0x05, "Absorbing Man"), "Send should return false because channel is unknown")
  52. }
  53. func TestMConnectionReceive(t *testing.T) {
  54. assert, require := assert.New(t), require.New(t)
  55. server, client := netPipe()
  56. defer server.Close() // nolint: errcheck
  57. defer client.Close() // nolint: errcheck
  58. receivedCh := make(chan []byte)
  59. errorsCh := make(chan interface{})
  60. onReceive := func(chID byte, msgBytes []byte) {
  61. receivedCh <- msgBytes
  62. }
  63. onError := func(r interface{}) {
  64. errorsCh <- r
  65. }
  66. mconn1 := createMConnectionWithCallbacks(client, onReceive, onError)
  67. _, err := mconn1.Start()
  68. require.Nil(err)
  69. defer mconn1.Stop()
  70. mconn2 := createTestMConnection(server)
  71. _, err = mconn2.Start()
  72. require.Nil(err)
  73. defer mconn2.Stop()
  74. msg := "Cyclops"
  75. assert.True(mconn2.Send(0x01, msg))
  76. select {
  77. case receivedBytes := <-receivedCh:
  78. assert.Equal([]byte(msg), receivedBytes[2:]) // first 3 bytes are internal
  79. case err := <-errorsCh:
  80. t.Fatalf("Expected %s, got %+v", msg, err)
  81. case <-time.After(500 * time.Millisecond):
  82. t.Fatalf("Did not receive %s message in 500ms", msg)
  83. }
  84. }
  85. func TestMConnectionStatus(t *testing.T) {
  86. assert, require := assert.New(t), require.New(t)
  87. server, client := netPipe()
  88. defer server.Close() // nolint: errcheck
  89. defer client.Close() // nolint: errcheck
  90. mconn := createTestMConnection(client)
  91. _, err := mconn.Start()
  92. require.Nil(err)
  93. defer mconn.Stop()
  94. status := mconn.Status()
  95. assert.NotNil(status)
  96. assert.Zero(status.Channels[0].SendQueueSize)
  97. }
  98. func TestMConnectionStopsAndReturnsError(t *testing.T) {
  99. assert, require := assert.New(t), require.New(t)
  100. server, client := netPipe()
  101. defer server.Close() // nolint: errcheck
  102. defer client.Close() // nolint: errcheck
  103. receivedCh := make(chan []byte)
  104. errorsCh := make(chan interface{})
  105. onReceive := func(chID byte, msgBytes []byte) {
  106. receivedCh <- msgBytes
  107. }
  108. onError := func(r interface{}) {
  109. errorsCh <- r
  110. }
  111. mconn := createMConnectionWithCallbacks(client, onReceive, onError)
  112. _, err := mconn.Start()
  113. require.Nil(err)
  114. defer mconn.Stop()
  115. if err := client.Close(); err != nil {
  116. t.Error(err)
  117. }
  118. select {
  119. case receivedBytes := <-receivedCh:
  120. t.Fatalf("Expected error, got %v", receivedBytes)
  121. case err := <-errorsCh:
  122. assert.NotNil(err)
  123. assert.False(mconn.IsRunning())
  124. case <-time.After(500 * time.Millisecond):
  125. t.Fatal("Did not receive error in 500ms")
  126. }
  127. }
  128. func newClientAndServerConnsForReadErrors(require *require.Assertions, chOnErr chan struct{}) (*MConnection, *MConnection) {
  129. server, client := netPipe()
  130. onReceive := func(chID byte, msgBytes []byte) {}
  131. onError := func(r interface{}) {}
  132. // create client conn with two channels
  133. chDescs := []*ChannelDescriptor{
  134. {ID: 0x01, Priority: 1, SendQueueCapacity: 1},
  135. {ID: 0x02, Priority: 1, SendQueueCapacity: 1},
  136. }
  137. mconnClient := NewMConnection(client, chDescs, onReceive, onError)
  138. mconnClient.SetLogger(log.TestingLogger().With("module", "client"))
  139. _, err := mconnClient.Start()
  140. require.Nil(err)
  141. // create server conn with 1 channel
  142. // it fires on chOnErr when there's an error
  143. serverLogger := log.TestingLogger().With("module", "server")
  144. onError = func(r interface{}) {
  145. chOnErr <- struct{}{}
  146. }
  147. mconnServer := createMConnectionWithCallbacks(server, onReceive, onError)
  148. mconnServer.SetLogger(serverLogger)
  149. _, err = mconnServer.Start()
  150. require.Nil(err)
  151. return mconnClient, mconnServer
  152. }
  153. func expectSend(ch chan struct{}) bool {
  154. after := time.After(time.Second * 5)
  155. select {
  156. case <-ch:
  157. return true
  158. case <-after:
  159. return false
  160. }
  161. }
  162. func TestMConnectionReadErrorBadEncoding(t *testing.T) {
  163. assert, require := assert.New(t), require.New(t)
  164. chOnErr := make(chan struct{})
  165. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(require, chOnErr)
  166. defer mconnClient.Stop()
  167. defer mconnServer.Stop()
  168. client := mconnClient.conn
  169. msg := "Ant-Man"
  170. // send badly encoded msgPacket
  171. var n int
  172. var err error
  173. wire.WriteByte(packetTypeMsg, client, &n, &err)
  174. wire.WriteByteSlice([]byte(msg), client, &n, &err)
  175. assert.True(expectSend(chOnErr), "badly encoded msgPacket")
  176. }
  177. func TestMConnectionReadErrorUnknownChannel(t *testing.T) {
  178. assert, require := assert.New(t), require.New(t)
  179. chOnErr := make(chan struct{})
  180. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(require, chOnErr)
  181. defer mconnClient.Stop()
  182. defer mconnServer.Stop()
  183. msg := "Ant-Man"
  184. // fail to send msg on channel unknown by client
  185. assert.False(mconnClient.Send(0x03, msg))
  186. // send msg on channel unknown by the server.
  187. // should cause an error
  188. assert.True(mconnClient.Send(0x02, msg))
  189. assert.True(expectSend(chOnErr), "unknown channel")
  190. }
  191. func TestMConnectionReadErrorLongMessage(t *testing.T) {
  192. assert, require := assert.New(t), require.New(t)
  193. chOnErr := make(chan struct{})
  194. chOnRcv := make(chan struct{})
  195. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(require, chOnErr)
  196. defer mconnClient.Stop()
  197. defer mconnServer.Stop()
  198. mconnServer.onReceive = func(chID byte, msgBytes []byte) {
  199. chOnRcv <- struct{}{}
  200. }
  201. client := mconnClient.conn
  202. // send msg thats just right
  203. var n int
  204. var err error
  205. packet := msgPacket{
  206. ChannelID: 0x01,
  207. Bytes: make([]byte, mconnClient.config.maxMsgPacketTotalSize()-5),
  208. EOF: 1,
  209. }
  210. writeMsgPacketTo(packet, client, &n, &err)
  211. assert.True(expectSend(chOnRcv), "msg just right")
  212. // send msg thats too long
  213. packet = msgPacket{
  214. ChannelID: 0x01,
  215. Bytes: make([]byte, mconnClient.config.maxMsgPacketTotalSize()-4),
  216. EOF: 1,
  217. }
  218. writeMsgPacketTo(packet, client, &n, &err)
  219. assert.True(expectSend(chOnErr), "msg too long")
  220. }
  221. func TestMConnectionReadErrorUnknownMsgType(t *testing.T) {
  222. assert, require := assert.New(t), require.New(t)
  223. chOnErr := make(chan struct{})
  224. mconnClient, mconnServer := newClientAndServerConnsForReadErrors(require, chOnErr)
  225. defer mconnClient.Stop()
  226. defer mconnServer.Stop()
  227. // send msg with unknown msg type
  228. var n int
  229. var err error
  230. wire.WriteByte(0x04, mconnClient.conn, &n, &err)
  231. assert.True(expectSend(chOnErr), "unknown msg type")
  232. }
  233. func TestMConnectionTrySend(t *testing.T) {
  234. assert, require := assert.New(t), require.New(t)
  235. server, client := netPipe()
  236. defer server.Close()
  237. defer client.Close()
  238. mconn := createTestMConnection(client)
  239. _, err := mconn.Start()
  240. require.Nil(err)
  241. defer mconn.Stop()
  242. msg := "Semicolon-Woman"
  243. resultCh := make(chan string, 2)
  244. assert.True(mconn.TrySend(0x01, msg))
  245. server.Read(make([]byte, len(msg)))
  246. assert.True(mconn.CanSend(0x01))
  247. assert.True(mconn.TrySend(0x01, msg))
  248. assert.False(mconn.CanSend(0x01))
  249. go func() {
  250. mconn.TrySend(0x01, msg)
  251. resultCh <- "TrySend"
  252. }()
  253. go func() {
  254. mconn.Send(0x01, msg)
  255. resultCh <- "Send"
  256. }()
  257. assert.False(mconn.CanSend(0x01))
  258. assert.False(mconn.TrySend(0x01, msg))
  259. assert.Equal("TrySend", <-resultCh)
  260. server.Read(make([]byte, len(msg)))
  261. assert.Equal("Send", <-resultCh) // Order constrained by parallel blocking above
  262. }