You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

137 lines
3.8 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
limit number of /subscribe clients and queries per client (#3269) * limit number of /subscribe clients and queries per client Add the following config variables (under [rpc] section): * max_subscription_clients * max_subscriptions_per_client * timeout_broadcast_tx_commit Fixes #2826 new HTTPClient interface for subscriptions finalize HTTPClient events interface remove EventSubscriber fix data race ``` WARNING: DATA RACE Read at 0x00c000a36060 by goroutine 129: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe.func1() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:168 +0x1f0 Previous write at 0x00c000a36060 by goroutine 132: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:191 +0x4e0 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 129 (running) created at: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:164 +0x4b7 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 132 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x659 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:119 +0x186 
testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 ================== ``` lite client works (tested manually) godoc comments httpclient: do not close the out channel use TimeoutBroadcastTxCommit no timeout for unsubscribe but 1s Local (5s HTTP) timeout for resubscribe format code change Subscribe#out cap to 1 and replace config vars with RPCConfig TimeoutBroadcastTxCommit can't be greater than rpcserver.WriteTimeout rpc: Context as first parameter to all functions reformat code fixes after my own review fixes after Ethan's review add test stubs fix config.toml * fixes after manual testing - rpc: do not recommend to use BroadcastTxCommit because it's slow and wastes Tendermint resources (pubsub) - rpc: better error in Subscribe and BroadcastTxCommit - HTTPClient: do not resubscribe if err = ErrAlreadySubscribed * fixes after Ismail's review * Update rpc/grpc/grpc_test.go Co-Authored-By: melekes <anton.kalyaev@gmail.com>
6 years ago
  1. package client_test
  2. import (
  3. "fmt"
  4. "reflect"
  5. "testing"
  6. "time"
  7. "github.com/stretchr/testify/require"
  8. abci "github.com/tendermint/tendermint/abci/types"
  9. tmrand "github.com/tendermint/tendermint/libs/rand"
  10. "github.com/tendermint/tendermint/rpc/client"
  11. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  12. "github.com/tendermint/tendermint/types"
  13. )
// waitForEventTimeout is the timeout value handed to client.WaitForOneEvent
// by the tests in this file.
var waitForEventTimeout = 5 * time.Second
  15. // MakeTxKV returns a text transaction, allong with expected key, value pair
  16. func MakeTxKV() ([]byte, []byte, []byte) {
  17. k := []byte(tmrand.Str(8))
  18. v := []byte(tmrand.Str(8))
  19. return k, v, append(k, append([]byte("="), v...)...)
  20. }
  21. func TestHeaderEvents(t *testing.T) {
  22. for i, c := range GetClients() {
  23. i, c := i, c // capture params
  24. t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
  25. // start for this test it if it wasn't already running
  26. if !c.IsRunning() {
  27. // if so, then we start it, listen, and stop it.
  28. err := c.Start()
  29. require.Nil(t, err, "%d: %+v", i, err)
  30. defer c.Stop()
  31. }
  32. evtTyp := types.EventNewBlockHeader
  33. evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
  34. require.Nil(t, err, "%d: %+v", i, err)
  35. _, ok := evt.(types.EventDataNewBlockHeader)
  36. require.True(t, ok, "%d: %#v", i, evt)
  37. // TODO: more checks...
  38. })
  39. }
  40. }
  41. func TestBlockEvents(t *testing.T) {
  42. for i, c := range GetClients() {
  43. i, c := i, c // capture params
  44. t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
  45. // start for this test it if it wasn't already running
  46. if !c.IsRunning() {
  47. // if so, then we start it, listen, and stop it.
  48. err := c.Start()
  49. require.Nil(t, err, "%d: %+v", i, err)
  50. defer c.Stop()
  51. }
  52. // listen for a new block; ensure height increases by 1
  53. var firstBlockHeight int64
  54. for j := 0; j < 3; j++ {
  55. evtTyp := types.EventNewBlock
  56. evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
  57. require.Nil(t, err, "%d: %+v", j, err)
  58. blockEvent, ok := evt.(types.EventDataNewBlock)
  59. require.True(t, ok, "%d: %#v", j, evt)
  60. block := blockEvent.Block
  61. if j == 0 {
  62. firstBlockHeight = block.Header.Height
  63. continue
  64. }
  65. require.Equal(t, block.Header.Height, firstBlockHeight+int64(j))
  66. }
  67. })
  68. }
  69. }
  70. func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) { testTxEventsSent(t, "async") }
  71. func TestTxEventsSentWithBroadcastTxSync(t *testing.T) { testTxEventsSent(t, "sync") }
  72. func testTxEventsSent(t *testing.T, broadcastMethod string) {
  73. for i, c := range GetClients() {
  74. i, c := i, c // capture params
  75. t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
  76. // start for this test it if it wasn't already running
  77. if !c.IsRunning() {
  78. // if so, then we start it, listen, and stop it.
  79. err := c.Start()
  80. require.Nil(t, err, "%d: %+v", i, err)
  81. defer c.Stop()
  82. }
  83. // make the tx
  84. _, _, tx := MakeTxKV()
  85. evtTyp := types.EventTx
  86. // send
  87. var (
  88. txres *ctypes.ResultBroadcastTx
  89. err error
  90. )
  91. switch broadcastMethod {
  92. case "async":
  93. txres, err = c.BroadcastTxAsync(tx)
  94. case "sync":
  95. txres, err = c.BroadcastTxSync(tx)
  96. default:
  97. panic(fmt.Sprintf("Unknown broadcastMethod %s", broadcastMethod))
  98. }
  99. require.NoError(t, err)
  100. require.Equal(t, txres.Code, abci.CodeTypeOK)
  101. // and wait for confirmation
  102. evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
  103. require.Nil(t, err, "%d: %+v", i, err)
  104. // and make sure it has the proper info
  105. txe, ok := evt.(types.EventDataTx)
  106. require.True(t, ok, "%d: %#v", i, evt)
  107. // make sure this is the proper tx
  108. require.EqualValues(t, tx, txe.Tx)
  109. require.True(t, txe.Result.IsOK())
  110. })
  111. }
  112. }
  113. // Test HTTPClient resubscribes upon disconnect && subscription error.
  114. // Test Local client resubscribes upon subscription error.
  115. func TestClientsResubscribe(t *testing.T) {
  116. // TODO(melekes)
  117. }