You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

180 lines
4.6 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
limit number of /subscribe clients and queries per client (#3269) * limit number of /subscribe clients and queries per client Add the following config variables (under [rpc] section): * max_subscription_clients * max_subscriptions_per_client * timeout_broadcast_tx_commit Fixes #2826 new HTTPClient interface for subscriptions finalize HTTPClient events interface remove EventSubscriber fix data race ``` WARNING: DATA RACE Read at 0x00c000a36060 by goroutine 129: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe.func1() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:168 +0x1f0 Previous write at 0x00c000a36060 by goroutine 132: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:191 +0x4e0 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 129 (running) created at: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:164 +0x4b7 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 132 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x659 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:119 +0x186 
testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 ================== ``` lite client works (tested manually) godoc comments httpclient: do not close the out channel use TimeoutBroadcastTxCommit no timeout for unsubscribe but 1s Local (5s HTTP) timeout for resubscribe format code change Subscribe#out cap to 1 and replace config vars with RPCConfig TimeoutBroadcastTxCommit can't be greater than rpcserver.WriteTimeout rpc: Context as first parameter to all functions reformat code fixes after my own review fixes after Ethan's review add test stubs fix config.toml * fixes after manual testing - rpc: do not recommend to use BroadcastTxCommit because it's slow and wastes Tendermint resources (pubsub) - rpc: better error in Subscribe and BroadcastTxCommit - HTTPClient: do not resubscribe if err = ErrAlreadySubscribed * fixes after Ismail's review * Update rpc/grpc/grpc_test.go Co-Authored-By: melekes <anton.kalyaev@gmail.com>
6 years ago
  1. package client_test
  2. import (
  3. "context"
  4. "fmt"
  5. "reflect"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/stretchr/testify/require"
  10. abci "github.com/tendermint/tendermint/abci/types"
  11. tmrand "github.com/tendermint/tendermint/libs/rand"
  12. "github.com/tendermint/tendermint/rpc/client"
  13. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  14. "github.com/tendermint/tendermint/types"
  15. )
  16. var waitForEventTimeout = 8 * time.Second
  17. // MakeTxKV returns a text transaction, allong with expected key, value pair
  18. func MakeTxKV() ([]byte, []byte, []byte) {
  19. k := []byte(tmrand.Str(8))
  20. v := []byte(tmrand.Str(8))
  21. return k, v, append(k, append([]byte("="), v...)...)
  22. }
  23. func TestHeaderEvents(t *testing.T) {
  24. for i, c := range GetClients() {
  25. i, c := i, c
  26. t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
  27. // start for this test it if it wasn't already running
  28. if !c.IsRunning() {
  29. // if so, then we start it, listen, and stop it.
  30. err := c.Start()
  31. require.Nil(t, err, "%d: %+v", i, err)
  32. t.Cleanup(func() {
  33. if err := c.Stop(); err != nil {
  34. t.Error(err)
  35. }
  36. })
  37. }
  38. evtTyp := types.EventNewBlockHeader
  39. evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
  40. require.Nil(t, err, "%d: %+v", i, err)
  41. _, ok := evt.(types.EventDataNewBlockHeader)
  42. require.True(t, ok, "%d: %#v", i, evt)
  43. // TODO: more checks...
  44. })
  45. }
  46. }
  47. // subscribe to new blocks and make sure height increments by 1
  48. func TestBlockEvents(t *testing.T) {
  49. for _, c := range GetClients() {
  50. c := c
  51. t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
  52. // start for this test it if it wasn't already running
  53. if !c.IsRunning() {
  54. // if so, then we start it, listen, and stop it.
  55. err := c.Start()
  56. require.Nil(t, err)
  57. t.Cleanup(func() {
  58. if err := c.Stop(); err != nil {
  59. t.Error(err)
  60. }
  61. })
  62. }
  63. const subscriber = "TestBlockEvents"
  64. eventCh, err := c.Subscribe(context.Background(), subscriber, types.QueryForEvent(types.EventNewBlock).String())
  65. require.NoError(t, err)
  66. t.Cleanup(func() {
  67. if err := c.UnsubscribeAll(context.Background(), subscriber); err != nil {
  68. t.Error(err)
  69. }
  70. })
  71. var firstBlockHeight int64
  72. for i := int64(0); i < 3; i++ {
  73. event := <-eventCh
  74. blockEvent, ok := event.Data.(types.EventDataNewBlock)
  75. require.True(t, ok)
  76. block := blockEvent.Block
  77. if firstBlockHeight == 0 {
  78. firstBlockHeight = block.Header.Height
  79. }
  80. require.Equal(t, firstBlockHeight+i, block.Header.Height)
  81. }
  82. })
  83. }
  84. }
  85. func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) { testTxEventsSent(t, "async") }
  86. func TestTxEventsSentWithBroadcastTxSync(t *testing.T) { testTxEventsSent(t, "sync") }
  87. func testTxEventsSent(t *testing.T, broadcastMethod string) {
  88. for _, c := range GetClients() {
  89. c := c
  90. t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
  91. // start for this test it if it wasn't already running
  92. if !c.IsRunning() {
  93. // if so, then we start it, listen, and stop it.
  94. err := c.Start()
  95. require.Nil(t, err)
  96. t.Cleanup(func() {
  97. if err := c.Stop(); err != nil {
  98. t.Error(err)
  99. }
  100. })
  101. }
  102. // make the tx
  103. _, _, tx := MakeTxKV()
  104. // send
  105. go func() {
  106. var (
  107. txres *ctypes.ResultBroadcastTx
  108. err error
  109. )
  110. switch broadcastMethod {
  111. case "async":
  112. txres, err = c.BroadcastTxAsync(tx)
  113. case "sync":
  114. txres, err = c.BroadcastTxSync(tx)
  115. default:
  116. panic(fmt.Sprintf("Unknown broadcastMethod %s", broadcastMethod))
  117. }
  118. if assert.NoError(t, err) {
  119. assert.Equal(t, txres.Code, abci.CodeTypeOK)
  120. }
  121. }()
  122. // and wait for confirmation
  123. evt, err := client.WaitForOneEvent(c, types.EventTx, waitForEventTimeout)
  124. require.Nil(t, err)
  125. // and make sure it has the proper info
  126. txe, ok := evt.(types.EventDataTx)
  127. require.True(t, ok)
  128. // make sure this is the proper tx
  129. require.EqualValues(t, tx, txe.Tx)
  130. require.True(t, txe.Result.IsOK())
  131. })
  132. }
  133. }
  134. // Test HTTPClient resubscribes upon disconnect && subscription error.
  135. // Test Local client resubscribes upon subscription error.
  136. func TestClientsResubscribe(t *testing.T) {
  137. // TODO(melekes)
  138. }
  139. func TestHTTPReturnsErrorIfClientIsNotRunning(t *testing.T) {
  140. c := getHTTPClient()
  141. // on Subscribe
  142. _, err := c.Subscribe(context.Background(), "TestHeaderEvents",
  143. types.QueryForEvent(types.EventNewBlockHeader).String())
  144. assert.Error(t, err)
  145. // on Unsubscribe
  146. err = c.Unsubscribe(context.Background(), "TestHeaderEvents",
  147. types.QueryForEvent(types.EventNewBlockHeader).String())
  148. assert.Error(t, err)
  149. // on UnsubscribeAll
  150. err = c.UnsubscribeAll(context.Background(), "TestHeaderEvents")
  151. assert.Error(t, err)
  152. }