You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

190 lines
4.8 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
limit number of /subscribe clients and queries per client (#3269) * limit number of /subscribe clients and queries per client Add the following config variables (under [rpc] section): * max_subscription_clients * max_subscriptions_per_client * timeout_broadcast_tx_commit Fixes #2826 new HTTPClient interface for subscriptions finalize HTTPClient events interface remove EventSubscriber fix data race ``` WARNING: DATA RACE Read at 0x00c000a36060 by goroutine 129: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe.func1() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:168 +0x1f0 Previous write at 0x00c000a36060 by goroutine 132: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:191 +0x4e0 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 129 (running) created at: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:164 +0x4b7 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 132 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x659 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:119 +0x186 
testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 ================== ``` lite client works (tested manually) godoc comments httpclient: do not close the out channel use TimeoutBroadcastTxCommit no timeout for unsubscribe but 1s Local (5s HTTP) timeout for resubscribe format code change Subscribe#out cap to 1 and replace config vars with RPCConfig TimeoutBroadcastTxCommit can't be greater than rpcserver.WriteTimeout rpc: Context as first parameter to all functions reformat code fixes after my own review fixes after Ethan's review add test stubs fix config.toml * fixes after manual testing - rpc: do not recommend to use BroadcastTxCommit because it's slow and wastes Tendermint resources (pubsub) - rpc: better error in Subscribe and BroadcastTxCommit - HTTPClient: do not resubscribe if err = ErrAlreadySubscribed * fixes after Ismail's review * Update rpc/grpc/grpc_test.go Co-Authored-By: melekes <anton.kalyaev@gmail.com>
6 years ago
  1. package client_test
  2. import (
  3. "context"
  4. "fmt"
  5. "reflect"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/stretchr/testify/require"
  10. abci "github.com/tendermint/tendermint/abci/types"
  11. tmrand "github.com/tendermint/tendermint/libs/rand"
  12. "github.com/tendermint/tendermint/rpc/client"
  13. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  14. "github.com/tendermint/tendermint/types"
  15. )
  // waitForEventTimeout is the timeout that tests in this file pass to
  // client.WaitForOneEvent when waiting for a single event.
  var waitForEventTimeout = 8 * time.Second
  17. // MakeTxKV returns a text transaction, allong with expected key, value pair
  18. func MakeTxKV() ([]byte, []byte, []byte) {
  19. k := []byte(tmrand.Str(8))
  20. v := []byte(tmrand.Str(8))
  21. return k, v, append(k, append([]byte("="), v...)...)
  22. }
  23. func TestHeaderEvents(t *testing.T) {
  24. n, conf := NodeSuite(t)
  25. for i, c := range GetClients(t, n, conf) {
  26. i, c := i, c
  27. t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
  28. // start for this test it if it wasn't already running
  29. if !c.IsRunning() {
  30. // if so, then we start it, listen, and stop it.
  31. err := c.Start()
  32. require.Nil(t, err, "%d: %+v", i, err)
  33. t.Cleanup(func() {
  34. if err := c.Stop(); err != nil {
  35. t.Error(err)
  36. }
  37. })
  38. }
  39. evtTyp := types.EventNewBlockHeader
  40. evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
  41. require.Nil(t, err, "%d: %+v", i, err)
  42. _, ok := evt.(types.EventDataNewBlockHeader)
  43. require.True(t, ok, "%d: %#v", i, evt)
  44. // TODO: more checks...
  45. })
  46. }
  47. }
  48. // subscribe to new blocks and make sure height increments by 1
  49. func TestBlockEvents(t *testing.T) {
  50. n, conf := NodeSuite(t)
  51. for _, c := range GetClients(t, n, conf) {
  52. c := c
  53. t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
  54. // start for this test it if it wasn't already running
  55. if !c.IsRunning() {
  56. // if so, then we start it, listen, and stop it.
  57. err := c.Start()
  58. require.Nil(t, err)
  59. t.Cleanup(func() {
  60. if err := c.Stop(); err != nil {
  61. t.Error(err)
  62. }
  63. })
  64. }
  65. const subscriber = "TestBlockEvents"
  66. eventCh, err := c.Subscribe(context.Background(), subscriber, types.QueryForEvent(types.EventNewBlock).String())
  67. require.NoError(t, err)
  68. t.Cleanup(func() {
  69. if err := c.UnsubscribeAll(context.Background(), subscriber); err != nil {
  70. t.Error(err)
  71. }
  72. })
  73. var firstBlockHeight int64
  74. for i := int64(0); i < 3; i++ {
  75. event := <-eventCh
  76. blockEvent, ok := event.Data.(types.EventDataNewBlock)
  77. require.True(t, ok)
  78. block := blockEvent.Block
  79. if firstBlockHeight == 0 {
  80. firstBlockHeight = block.Header.Height
  81. }
  82. require.Equal(t, firstBlockHeight+i, block.Header.Height)
  83. }
  84. })
  85. }
  86. }
  87. func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) { testTxEventsSent(t, "async") }
  88. func TestTxEventsSentWithBroadcastTxSync(t *testing.T) { testTxEventsSent(t, "sync") }
  89. func testTxEventsSent(t *testing.T, broadcastMethod string) {
  90. n, conf := NodeSuite(t)
  91. for _, c := range GetClients(t, n, conf) {
  92. c := c
  93. t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
  94. // start for this test it if it wasn't already running
  95. if !c.IsRunning() {
  96. // if so, then we start it, listen, and stop it.
  97. err := c.Start()
  98. require.Nil(t, err)
  99. t.Cleanup(func() {
  100. if err := c.Stop(); err != nil {
  101. t.Error(err)
  102. }
  103. })
  104. }
  105. // make the tx
  106. _, _, tx := MakeTxKV()
  107. // send
  108. go func() {
  109. var (
  110. txres *ctypes.ResultBroadcastTx
  111. err error
  112. ctx = context.Background()
  113. )
  114. switch broadcastMethod {
  115. case "async":
  116. txres, err = c.BroadcastTxAsync(ctx, tx)
  117. case "sync":
  118. txres, err = c.BroadcastTxSync(ctx, tx)
  119. default:
  120. panic(fmt.Sprintf("Unknown broadcastMethod %s", broadcastMethod))
  121. }
  122. if assert.NoError(t, err) {
  123. assert.Equal(t, txres.Code, abci.CodeTypeOK)
  124. }
  125. }()
  126. // and wait for confirmation
  127. evt, err := client.WaitForOneEvent(c, types.EventTx, waitForEventTimeout)
  128. require.Nil(t, err)
  129. // and make sure it has the proper info
  130. txe, ok := evt.(types.EventDataTx)
  131. require.True(t, ok)
  132. // make sure this is the proper tx
  133. require.EqualValues(t, tx, txe.Tx)
  134. require.True(t, txe.Result.IsOK())
  135. })
  136. }
  137. }
  // TestClientsResubscribe is a placeholder for two checks that are not written
  // yet:
  //   - HTTPClient resubscribes upon disconnect and upon subscription error.
  //   - Local client resubscribes upon subscription error.
  //
  // It is reported as skipped so that the missing coverage stays visible in the
  // test output instead of showing up as a pass.
  func TestClientsResubscribe(t *testing.T) {
  	t.Skip("TODO(melekes): resubscribe tests are not implemented yet")
  }
  143. func TestHTTPReturnsErrorIfClientIsNotRunning(t *testing.T) {
  144. ctx, cancel := context.WithCancel(context.Background())
  145. defer cancel()
  146. _, conf := NodeSuite(t)
  147. c := getHTTPClient(t, conf)
  148. // on Subscribe
  149. _, err := c.Subscribe(ctx, "TestHeaderEvents",
  150. types.QueryForEvent(types.EventNewBlockHeader).String())
  151. assert.Error(t, err)
  152. // on Unsubscribe
  153. err = c.Unsubscribe(ctx, "TestHeaderEvents",
  154. types.QueryForEvent(types.EventNewBlockHeader).String())
  155. assert.Error(t, err)
  156. // on UnsubscribeAll
  157. err = c.UnsubscribeAll(ctx, "TestHeaderEvents")
  158. assert.Error(t, err)
  159. }