You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

189 lines
4.8 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
limit number of /subscribe clients and queries per client (#3269) * limit number of /subscribe clients and queries per client Add the following config variables (under [rpc] section): * max_subscription_clients * max_subscriptions_per_client * timeout_broadcast_tx_commit Fixes #2826 new HTTPClient interface for subscriptions finalize HTTPClient events interface remove EventSubscriber fix data race ``` WARNING: DATA RACE Read at 0x00c000a36060 by goroutine 129: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe.func1() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:168 +0x1f0 Previous write at 0x00c000a36060 by goroutine 132: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:191 +0x4e0 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 129 (running) created at: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:164 +0x4b7 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 132 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x659 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:119 +0x186 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 ================== ``` lite client works (tested manually) godoc comments httpclient: do not close the out channel use TimeoutBroadcastTxCommit no timeout for unsubscribe but 1s Local (5s HTTP) timeout for resubscribe format code change Subscribe#out cap to 1 and replace config vars with RPCConfig TimeoutBroadcastTxCommit can't be greater than rpcserver.WriteTimeout rpc: Context as first parameter to all functions reformat code fixes after my own review fixes after Ethan's review add test stubs fix config.toml * fixes after manual testing - rpc: do not recommend to use BroadcastTxCommit because it's slow and wastes Tendermint resources (pubsub) - rpc: better error in Subscribe and BroadcastTxCommit - HTTPClient: do not resubscribe if err = ErrAlreadySubscribed * fixes after Ismail's review * Update rpc/grpc/grpc_test.go Co-Authored-By: melekes <anton.kalyaev@gmail.com>
6 years ago
  1. package client_test
  2. import (
  3. "context"
  4. "fmt"
  5. "reflect"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/stretchr/testify/require"
  10. abci "github.com/tendermint/tendermint/abci/types"
  11. tmrand "github.com/tendermint/tendermint/libs/rand"
  12. "github.com/tendermint/tendermint/rpc/client"
  13. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  14. "github.com/tendermint/tendermint/types"
  15. )
  16. var waitForEventTimeout = 8 * time.Second
  17. // MakeTxKV returns a text transaction, allong with expected key, value pair
  18. func MakeTxKV() ([]byte, []byte, []byte) {
  19. k := []byte(tmrand.Str(8))
  20. v := []byte(tmrand.Str(8))
  21. return k, v, append(k, append([]byte("="), v...)...)
  22. }
  23. func TestHeaderEvents(t *testing.T) {
  24. n, conf := NodeSuite(t)
  25. for i, c := range GetClients(t, n, conf) {
  26. i, c := i, c
  27. t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
  28. // start for this test it if it wasn't already running
  29. if !c.IsRunning() {
  30. // if so, then we start it, listen, and stop it.
  31. err := c.Start()
  32. require.Nil(t, err, "%d: %+v", i, err)
  33. t.Cleanup(func() {
  34. if err := c.Stop(); err != nil {
  35. t.Error(err)
  36. }
  37. })
  38. }
  39. evt, err := client.WaitForOneEvent(c, types.EventNewBlockHeaderValue, waitForEventTimeout)
  40. require.Nil(t, err, "%d: %+v", i, err)
  41. _, ok := evt.(types.EventDataNewBlockHeader)
  42. require.True(t, ok, "%d: %#v", i, evt)
  43. // TODO: more checks...
  44. })
  45. }
  46. }
  47. // subscribe to new blocks and make sure height increments by 1
  48. func TestBlockEvents(t *testing.T) {
  49. n, conf := NodeSuite(t)
  50. for _, c := range GetClients(t, n, conf) {
  51. c := c
  52. t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
  53. // start for this test it if it wasn't already running
  54. if !c.IsRunning() {
  55. // if so, then we start it, listen, and stop it.
  56. err := c.Start()
  57. require.Nil(t, err)
  58. t.Cleanup(func() {
  59. if err := c.Stop(); err != nil {
  60. t.Error(err)
  61. }
  62. })
  63. }
  64. const subscriber = "TestBlockEvents"
  65. eventCh, err := c.Subscribe(context.Background(), subscriber, types.QueryForEvent(types.EventNewBlockValue).String())
  66. require.NoError(t, err)
  67. t.Cleanup(func() {
  68. if err := c.UnsubscribeAll(context.Background(), subscriber); err != nil {
  69. t.Error(err)
  70. }
  71. })
  72. var firstBlockHeight int64
  73. for i := int64(0); i < 3; i++ {
  74. event := <-eventCh
  75. blockEvent, ok := event.Data.(types.EventDataNewBlock)
  76. require.True(t, ok)
  77. block := blockEvent.Block
  78. if firstBlockHeight == 0 {
  79. firstBlockHeight = block.Header.Height
  80. }
  81. require.Equal(t, firstBlockHeight+i, block.Header.Height)
  82. }
  83. })
  84. }
  85. }
  86. func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) { testTxEventsSent(t, "async") }
  87. func TestTxEventsSentWithBroadcastTxSync(t *testing.T) { testTxEventsSent(t, "sync") }
  88. func testTxEventsSent(t *testing.T, broadcastMethod string) {
  89. n, conf := NodeSuite(t)
  90. for _, c := range GetClients(t, n, conf) {
  91. c := c
  92. t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
  93. // start for this test it if it wasn't already running
  94. if !c.IsRunning() {
  95. // if so, then we start it, listen, and stop it.
  96. err := c.Start()
  97. require.Nil(t, err)
  98. t.Cleanup(func() {
  99. if err := c.Stop(); err != nil {
  100. t.Error(err)
  101. }
  102. })
  103. }
  104. // make the tx
  105. _, _, tx := MakeTxKV()
  106. // send
  107. go func() {
  108. var (
  109. txres *ctypes.ResultBroadcastTx
  110. err error
  111. ctx = context.Background()
  112. )
  113. switch broadcastMethod {
  114. case "async":
  115. txres, err = c.BroadcastTxAsync(ctx, tx)
  116. case "sync":
  117. txres, err = c.BroadcastTxSync(ctx, tx)
  118. default:
  119. panic(fmt.Sprintf("Unknown broadcastMethod %s", broadcastMethod))
  120. }
  121. if assert.NoError(t, err) {
  122. assert.Equal(t, txres.Code, abci.CodeTypeOK)
  123. }
  124. }()
  125. // and wait for confirmation
  126. evt, err := client.WaitForOneEvent(c, types.EventTxValue, waitForEventTimeout)
  127. require.Nil(t, err)
  128. // and make sure it has the proper info
  129. txe, ok := evt.(types.EventDataTx)
  130. require.True(t, ok)
  131. // make sure this is the proper tx
  132. require.EqualValues(t, tx, txe.Tx)
  133. require.True(t, txe.Result.IsOK())
  134. })
  135. }
  136. }
  137. // Test HTTPClient resubscribes upon disconnect && subscription error.
  138. // Test Local client resubscribes upon subscription error.
  139. func TestClientsResubscribe(t *testing.T) {
  140. // TODO(melekes)
  141. }
  142. func TestHTTPReturnsErrorIfClientIsNotRunning(t *testing.T) {
  143. ctx, cancel := context.WithCancel(context.Background())
  144. defer cancel()
  145. _, conf := NodeSuite(t)
  146. c := getHTTPClient(t, conf)
  147. // on Subscribe
  148. _, err := c.Subscribe(ctx, "TestHeaderEvents",
  149. types.QueryForEvent(types.EventNewBlockHeaderValue).String())
  150. assert.Error(t, err)
  151. // on Unsubscribe
  152. err = c.Unsubscribe(ctx, "TestHeaderEvents",
  153. types.QueryForEvent(types.EventNewBlockHeaderValue).String())
  154. assert.Error(t, err)
  155. // on UnsubscribeAll
  156. err = c.UnsubscribeAll(ctx, "TestHeaderEvents")
  157. assert.Error(t, err)
  158. }