You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

262 lines
7.8 KiB

limit number of /subscribe clients and queries per client (#3269) * limit number of /subscribe clients and queries per client Add the following config variables (under [rpc] section): * max_subscription_clients * max_subscriptions_per_client * timeout_broadcast_tx_commit Fixes #2826 new HTTPClient interface for subscriptions finalize HTTPClient events interface remove EventSubscriber fix data race ``` WARNING: DATA RACE Read at 0x00c000a36060 by goroutine 129: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe.func1() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:168 +0x1f0 Previous write at 0x00c000a36060 by goroutine 132: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:191 +0x4e0 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 129 (running) created at: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:164 +0x4b7 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 132 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x659 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:119 +0x186 
testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 ================== ``` lite client works (tested manually) godoc comments httpclient: do not close the out channel use TimeoutBroadcastTxCommit no timeout for unsubscribe but 1s Local (5s HTTP) timeout for resubscribe format code change Subscribe#out cap to 1 and replace config vars with RPCConfig TimeoutBroadcastTxCommit can't be greater than rpcserver.WriteTimeout rpc: Context as first parameter to all functions reformat code fixes after my own review fixes after Ethan's review add test stubs fix config.toml * fixes after manual testing - rpc: do not recommend to use BroadcastTxCommit because it's slow and wastes Tendermint resources (pubsub) - rpc: better error in Subscribe and BroadcastTxCommit - HTTPClient: do not resubscribe if err = ErrAlreadySubscribed * fixes after Ismail's review * Update rpc/grpc/grpc_test.go Co-Authored-By: melekes <anton.kalyaev@gmail.com>
6 years ago
7 years ago
limit number of /subscribe clients and queries per client (#3269) * limit number of /subscribe clients and queries per client Add the following config variables (under [rpc] section): * max_subscription_clients * max_subscriptions_per_client * timeout_broadcast_tx_commit Fixes #2826 new HTTPClient interface for subscriptions finalize HTTPClient events interface remove EventSubscriber fix data race ``` WARNING: DATA RACE Read at 0x00c000a36060 by goroutine 129: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe.func1() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:168 +0x1f0 Previous write at 0x00c000a36060 by goroutine 132: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:191 +0x4e0 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 129 (running) created at: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:164 +0x4b7 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 132 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x659 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:119 +0x186 
testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 ================== ``` lite client works (tested manually) godoc comments httpclient: do not close the out channel use TimeoutBroadcastTxCommit no timeout for unsubscribe but 1s Local (5s HTTP) timeout for resubscribe format code change Subscribe#out cap to 1 and replace config vars with RPCConfig TimeoutBroadcastTxCommit can't be greater than rpcserver.WriteTimeout rpc: Context as first parameter to all functions reformat code fixes after my own review fixes after Ethan's review add test stubs fix config.toml * fixes after manual testing - rpc: do not recommend to use BroadcastTxCommit because it's slow and wastes Tendermint resources (pubsub) - rpc: better error in Subscribe and BroadcastTxCommit - HTTPClient: do not resubscribe if err = ErrAlreadySubscribed * fixes after Ismail's review * Update rpc/grpc/grpc_test.go Co-Authored-By: melekes <anton.kalyaev@gmail.com>
6 years ago
limit number of /subscribe clients and queries per client (#3269) * limit number of /subscribe clients and queries per client Add the following config variables (under [rpc] section): * max_subscription_clients * max_subscriptions_per_client * timeout_broadcast_tx_commit Fixes #2826 new HTTPClient interface for subscriptions finalize HTTPClient events interface remove EventSubscriber fix data race ``` WARNING: DATA RACE Read at 0x00c000a36060 by goroutine 129: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe.func1() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:168 +0x1f0 Previous write at 0x00c000a36060 by goroutine 132: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:191 +0x4e0 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 129 (running) created at: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:164 +0x4b7 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 132 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x659 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:119 +0x186 
testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 ================== ``` lite client works (tested manually) godoc comments httpclient: do not close the out channel use TimeoutBroadcastTxCommit no timeout for unsubscribe but 1s Local (5s HTTP) timeout for resubscribe format code change Subscribe#out cap to 1 and replace config vars with RPCConfig TimeoutBroadcastTxCommit can't be greater than rpcserver.WriteTimeout rpc: Context as first parameter to all functions reformat code fixes after my own review fixes after Ethan's review add test stubs fix config.toml * fixes after manual testing - rpc: do not recommend to use BroadcastTxCommit because it's slow and wastes Tendermint resources (pubsub) - rpc: better error in Subscribe and BroadcastTxCommit - HTTPClient: do not resubscribe if err = ErrAlreadySubscribed * fixes after Ismail's review * Update rpc/grpc/grpc_test.go Co-Authored-By: melekes <anton.kalyaev@gmail.com>
6 years ago
7 years ago
  1. package proxy
  2. import (
  3. "context"
  4. "fmt"
  5. cmn "github.com/tendermint/tendermint/libs/common"
  6. "github.com/tendermint/tendermint/crypto/merkle"
  7. "github.com/tendermint/tendermint/lite"
  8. rpcclient "github.com/tendermint/tendermint/rpc/client"
  9. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  10. rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
  11. )
  12. var _ rpcclient.Client = Wrapper{}
  13. // Wrapper wraps a rpcclient with a Verifier and double-checks any input that is
  14. // provable before passing it along. Allows you to make any rpcclient fully secure.
  15. type Wrapper struct {
  16. rpcclient.Client
  17. cert *lite.DynamicVerifier
  18. prt *merkle.ProofRuntime
  19. }
  20. // SecureClient uses a given Verifier to wrap an connection to an untrusted
  21. // host and return a cryptographically secure rpc client.
  22. //
  23. // If it is wrapping an HTTP rpcclient, it will also wrap the websocket interface
  24. func SecureClient(c rpcclient.Client, cert *lite.DynamicVerifier) Wrapper {
  25. prt := defaultProofRuntime()
  26. wrap := Wrapper{c, cert, prt}
  27. // TODO: no longer possible as no more such interface exposed....
  28. // if we wrap http client, then we can swap out the event switch to filter
  29. // if hc, ok := c.(*rpcclient.HTTP); ok {
  30. // evt := hc.WSEvents.EventSwitch
  31. // hc.WSEvents.EventSwitch = WrappedSwitch{evt, wrap}
  32. // }
  33. return wrap
  34. }
  35. // ABCIQueryWithOptions exposes all options for the ABCI query and verifies the returned proof
  36. func (w Wrapper) ABCIQueryWithOptions(path string, data cmn.HexBytes,
  37. opts rpcclient.ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) {
  38. res, err := GetWithProofOptions(w.prt, path, data, opts, w.Client, w.cert)
  39. return res, err
  40. }
  41. // ABCIQuery uses default options for the ABCI query and verifies the returned proof
  42. func (w Wrapper) ABCIQuery(path string, data cmn.HexBytes) (*ctypes.ResultABCIQuery, error) {
  43. return w.ABCIQueryWithOptions(path, data, rpcclient.DefaultABCIQueryOptions)
  44. }
  45. // Tx queries for a given tx and verifies the proof if it was requested
  46. func (w Wrapper) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
  47. res, err := w.Client.Tx(hash, prove)
  48. if !prove || err != nil {
  49. return res, err
  50. }
  51. h := int64(res.Height)
  52. sh, err := GetCertifiedCommit(h, w.Client, w.cert)
  53. if err != nil {
  54. return res, err
  55. }
  56. err = res.Proof.Validate(sh.DataHash)
  57. return res, err
  58. }
  59. // BlockchainInfo requests a list of headers and verifies them all...
  60. // Rather expensive.
  61. //
  62. // TODO: optimize this if used for anything needing performance
  63. func (w Wrapper) BlockchainInfo(minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) {
  64. r, err := w.Client.BlockchainInfo(minHeight, maxHeight)
  65. if err != nil {
  66. return nil, err
  67. }
  68. // go and verify every blockmeta in the result....
  69. for _, meta := range r.BlockMetas {
  70. // get a checkpoint to verify from
  71. res, err := w.Commit(&meta.Header.Height)
  72. if err != nil {
  73. return nil, err
  74. }
  75. sh := res.SignedHeader
  76. err = ValidateBlockMeta(meta, sh)
  77. if err != nil {
  78. return nil, err
  79. }
  80. }
  81. return r, nil
  82. }
  83. // Block returns an entire block and verifies all signatures
  84. func (w Wrapper) Block(height *int64) (*ctypes.ResultBlock, error) {
  85. resBlock, err := w.Client.Block(height)
  86. if err != nil {
  87. return nil, err
  88. }
  89. // get a checkpoint to verify from
  90. resCommit, err := w.Commit(height)
  91. if err != nil {
  92. return nil, err
  93. }
  94. sh := resCommit.SignedHeader
  95. // now verify
  96. err = ValidateBlockMeta(resBlock.BlockMeta, sh)
  97. if err != nil {
  98. return nil, err
  99. }
  100. err = ValidateBlock(resBlock.Block, sh)
  101. if err != nil {
  102. return nil, err
  103. }
  104. return resBlock, nil
  105. }
  106. // Commit downloads the Commit and certifies it with the lite.
  107. //
  108. // This is the foundation for all other verification in this module
  109. func (w Wrapper) Commit(height *int64) (*ctypes.ResultCommit, error) {
  110. if height == nil {
  111. resStatus, err := w.Client.Status()
  112. if err != nil {
  113. return nil, err
  114. }
  115. // NOTE: If resStatus.CatchingUp, there is a race
  116. // condition where the validator set for the next height
  117. // isn't available until some time after the blockstore
  118. // has height h on the remote node. This isn't an issue
  119. // once the node has caught up, and a syncing node likely
  120. // won't have this issue esp with the implementation we
  121. // have here, but we may have to address this at some
  122. // point.
  123. height = new(int64)
  124. *height = resStatus.SyncInfo.LatestBlockHeight
  125. }
  126. rpcclient.WaitForHeight(w.Client, *height, nil)
  127. res, err := w.Client.Commit(height)
  128. // if we got it, then verify it
  129. if err == nil {
  130. sh := res.SignedHeader
  131. err = w.cert.Verify(sh)
  132. }
  133. return res, err
  134. }
  135. func (w Wrapper) RegisterOpDecoder(typ string, dec merkle.OpDecoder) {
  136. w.prt.RegisterOpDecoder(typ, dec)
  137. }
  138. // SubscribeWS subscribes for events using the given query and remote address as
  139. // a subscriber, but does not verify responses (UNSAFE)!
  140. func (w Wrapper) SubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) {
  141. out, err := w.Client.Subscribe(context.Background(), ctx.RemoteAddr(), query)
  142. if err != nil {
  143. return nil, err
  144. }
  145. go func() {
  146. for {
  147. select {
  148. case resultEvent := <-out:
  149. // XXX(melekes) We should have a switch here that performs a validation
  150. // depending on the event's type.
  151. ctx.WSConn.TryWriteRPCResponse(
  152. rpctypes.NewRPCSuccessResponse(
  153. ctx.WSConn.Codec(),
  154. rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", ctx.JSONReq.ID)),
  155. resultEvent,
  156. ))
  157. case <-w.Client.Quit():
  158. return
  159. }
  160. }
  161. }()
  162. return &ctypes.ResultSubscribe{}, nil
  163. }
  164. // UnsubscribeWS calls original client's Unsubscribe using remote address as a
  165. // subscriber.
  166. func (w Wrapper) UnsubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) {
  167. err := w.Client.Unsubscribe(context.Background(), ctx.RemoteAddr(), query)
  168. if err != nil {
  169. return nil, err
  170. }
  171. return &ctypes.ResultUnsubscribe{}, nil
  172. }
  173. // UnsubscribeAllWS calls original client's UnsubscribeAll using remote address
  174. // as a subscriber.
  175. func (w Wrapper) UnsubscribeAllWS(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) {
  176. err := w.Client.UnsubscribeAll(context.Background(), ctx.RemoteAddr())
  177. if err != nil {
  178. return nil, err
  179. }
  180. return &ctypes.ResultUnsubscribe{}, nil
  181. }
  182. // // WrappedSwitch creates a websocket connection that auto-verifies any info
  183. // // coming through before passing it along.
  184. // //
  185. // // Since the verification takes 1-2 rpc calls, this is obviously only for
  186. // // relatively low-throughput situations that can tolerate a bit extra latency
  187. // type WrappedSwitch struct {
  188. // types.EventSwitch
  189. // client rpcclient.Client
  190. // }
  191. // // FireEvent verifies any block or header returned from the eventswitch
  192. // func (s WrappedSwitch) FireEvent(event string, data events.EventData) {
  193. // tm, ok := data.(types.TMEventData)
  194. // if !ok {
  195. // fmt.Printf("bad type %#v\n", data)
  196. // return
  197. // }
  198. // // check to validate it if possible, and drop if not valid
  199. // switch t := tm.(type) {
  200. // case types.EventDataNewBlockHeader:
  201. // err := verifyHeader(s.client, t.Header)
  202. // if err != nil {
  203. // fmt.Printf("Invalid header: %#v\n", err)
  204. // return
  205. // }
  206. // case types.EventDataNewBlock:
  207. // err := verifyBlock(s.client, t.Block)
  208. // if err != nil {
  209. // fmt.Printf("Invalid block: %#v\n", err)
  210. // return
  211. // }
  212. // // TODO: can we verify tx as well? anything else
  213. // }
  214. // // looks good, we fire it
  215. // s.EventSwitch.FireEvent(event, data)
  216. // }
  217. // func verifyHeader(c rpcclient.Client, head *types.Header) error {
  218. // // get a checkpoint to verify from
  219. // commit, err := c.Commit(&head.Height)
  220. // if err != nil {
  221. // return err
  222. // }
  223. // check := certclient.CommitFromResult(commit)
  224. // return ValidateHeader(head, check)
  225. // }
  226. //
  227. // func verifyBlock(c rpcclient.Client, block *types.Block) error {
  228. // // get a checkpoint to verify from
  229. // commit, err := c.Commit(&block.Height)
  230. // if err != nil {
  231. // return err
  232. // }
  233. // check := certclient.CommitFromResult(commit)
  234. // return ValidateBlock(block, check)
  235. // }