Browse Source

rpc: Add experimental config params to allow for subscription buffer size control (tm v0.34.x) (#7230)

A workaround for #6729. Add parameters to control buffer sizes for
event subscription RPC clients. On some networks, buffering causes
clients to be dropped and/or events to be lost.

For additional context, see the discussion on #7188.

- Add experimental_subscription_buffer_size config parameter
- Add experimental_websocket_write_buffer_size config parameter
- Add experimental_close_on_slow_client config parameter

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>
pull/7267/head
Thane Thomson 3 years ago
committed by GitHub
parent
commit
12e3419f2b
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 101 additions and 10 deletions
  1. +3
    -1
      CHANGELOG_PENDING.md
  2. +40
    -0
      config/config.go
  3. +27
    -0
      config/toml.go
  4. +1
    -0
      evidence/mocks/block_store.go
  5. +1
    -1
      libs/pubsub/subscription.go
  6. +1
    -0
      node/node.go
  7. +1
    -0
      proxy/mocks/app_conn_consensus.go
  8. +1
    -0
      proxy/mocks/app_conn_mempool.go
  9. +1
    -0
      proxy/mocks/client_creator.go
  10. +17
    -7
      rpc/core/events.go
  11. +2
    -1
      state/indexer/block/kv/kv_test.go
  12. +1
    -0
      state/indexer/block/kv/util.go
  13. +1
    -0
      state/indexer/sink/psql/psql.go
  14. +1
    -0
      state/indexer/sink/psql/psql_test.go
  15. +1
    -0
      state/mocks/evidence_pool.go
  16. +1
    -0
      state/mocks/store.go
  17. +1
    -0
      statesync/mocks/state_provider.go

+ 3
- 1
CHANGELOG_PENDING.md View File

@ -2,7 +2,7 @@
## v0.34.15
Special thanks to external contributors on this release:
Special thanks to external contributors on this release: @thanethomson
Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermint).
@ -10,6 +10,8 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- CLI/RPC/Config
- [config] \#7230 rpc: Add experimental config params to allow for subscription buffer size control (@thanethomson).
- Apps
- P2P Protocol


+ 40
- 0
config/config.go View File

@ -52,6 +52,9 @@ var (
defaultNodeKeyPath = filepath.Join(defaultConfigDir, defaultNodeKeyName)
defaultAddrBookPath = filepath.Join(defaultConfigDir, defaultAddrBookName)
minSubscriptionBufferSize = 100
defaultSubscriptionBufferSize = 200
)
// Config defines the top level configuration for a Tendermint node
@ -342,6 +345,29 @@ type RPCConfig struct {
// to the estimated maximum number of broadcast_tx_commit calls per block.
MaxSubscriptionsPerClient int `mapstructure:"max_subscriptions_per_client"`
// The number of events that can be buffered per subscription before
// returning `ErrOutOfCapacity`.
SubscriptionBufferSize int `mapstructure:"experimental_subscription_buffer_size"`
// The maximum number of responses that can be buffered per WebSocket
// client. If clients cannot read from the WebSocket endpoint fast enough,
// they will be disconnected, so increasing this parameter may reduce the
// chances of them being disconnected (but will cause the node to use more
// memory).
//
// Must be at least the same as `SubscriptionBufferSize`, otherwise
// connections may be dropped unnecessarily.
WebSocketWriteBufferSize int `mapstructure:"experimental_websocket_write_buffer_size"`
// If a WebSocket client cannot read fast enough, at present we may
// silently drop events instead of generating an error or disconnecting the
// client.
//
// Enabling this parameter will cause the WebSocket connection to be closed
// instead if it cannot read fast enough, allowing for greater
// predictability in subscription behaviour.
CloseOnSlowClient bool `mapstructure:"experimental_close_on_slow_client"`
// How long to wait for a tx to be committed during /broadcast_tx_commit
// WARNING: Using a value larger than 10s will result in increasing the
// global HTTP write timeout, which applies to all connections and endpoints.
@ -391,7 +417,9 @@ func DefaultRPCConfig() *RPCConfig {
MaxSubscriptionClients: 100,
MaxSubscriptionsPerClient: 5,
SubscriptionBufferSize: defaultSubscriptionBufferSize,
TimeoutBroadcastTxCommit: 10 * time.Second,
WebSocketWriteBufferSize: defaultSubscriptionBufferSize,
MaxBodyBytes: int64(1000000), // 1MB
MaxHeaderBytes: 1 << 20, // same as the net/http default
@ -425,6 +453,18 @@ func (cfg *RPCConfig) ValidateBasic() error {
if cfg.MaxSubscriptionsPerClient < 0 {
return errors.New("max_subscriptions_per_client can't be negative")
}
if cfg.SubscriptionBufferSize < minSubscriptionBufferSize {
return fmt.Errorf(
"experimental_subscription_buffer_size must be >= %d",
minSubscriptionBufferSize,
)
}
if cfg.WebSocketWriteBufferSize < cfg.SubscriptionBufferSize {
return fmt.Errorf(
"experimental_websocket_write_buffer_size must be >= experimental_subscription_buffer_size (%d)",
cfg.SubscriptionBufferSize,
)
}
if cfg.TimeoutBroadcastTxCommit < 0 {
return errors.New("timeout_broadcast_tx_commit can't be negative")
}


+ 27
- 0
config/toml.go View File

@ -206,6 +206,33 @@ max_subscription_clients = {{ .RPC.MaxSubscriptionClients }}
# the estimated # maximum number of broadcast_tx_commit calls per block.
max_subscriptions_per_client = {{ .RPC.MaxSubscriptionsPerClient }}
# Experimental parameter to specify the maximum number of events a node will
# buffer, per subscription, before returning an error and closing the
# subscription. Must be set to at least 100, but higher values will accommodate
# higher event throughput rates (and will use more memory).
experimental_subscription_buffer_size = {{ .RPC.SubscriptionBufferSize }}
# Experimental parameter to specify the maximum number of RPC responses that
# can be buffered per WebSocket client. If clients cannot read from the
# WebSocket endpoint fast enough, they will be disconnected, so increasing this
# parameter may reduce the chances of them being disconnected (but will cause
# the node to use more memory).
#
# Must be at least the same as "experimental_subscription_buffer_size",
# otherwise connections could be dropped unnecessarily. This value should
# ideally be somewhat higher than "experimental_subscription_buffer_size" to
# accommodate non-subscription-related RPC responses.
experimental_websocket_write_buffer_size = {{ .RPC.WebSocketWriteBufferSize }}
# If a WebSocket client cannot read fast enough, at present we may
# silently drop events instead of generating an error or disconnecting the
# client.
#
# Enabling this experimental parameter will cause the WebSocket connection to
# be closed instead if it cannot read fast enough, allowing for greater
# predictability in subscription behaviour.
experimental_close_on_slow_client = {{ .RPC.CloseOnSlowClient }}
# How long to wait for a tx to be committed during /broadcast_tx_commit.
# WARNING: Using a value larger than 10s will result in increasing the
# global HTTP write timeout, which applies to all connections and endpoints.


+ 1
- 0
evidence/mocks/block_store.go View File

@ -4,6 +4,7 @@ package mocks
import (
mock "github.com/stretchr/testify/mock"
types "github.com/tendermint/tendermint/types"
)


+ 1
- 1
libs/pubsub/subscription.go View File

@ -12,7 +12,7 @@ var (
// ErrOutOfCapacity is returned by Err when a client is not pulling messages
// fast enough. Note the client's subscription will be terminated.
ErrOutOfCapacity = errors.New("client is not pulling messages fast enough")
ErrOutOfCapacity = errors.New("internal subscription event buffer is out of capacity")
)
// A Subscription represents a client subscription for a particular query and


+ 1
- 0
node/node.go View File

@ -1090,6 +1090,7 @@ func (n *Node) startRPC() ([]net.Listener, error) {
}
}),
rpcserver.ReadLimit(config.MaxBodyBytes),
rpcserver.WriteChanCapacity(n.config.RPC.WebSocketWriteBufferSize),
)
wm.SetLogger(wmLogger)
mux.HandleFunc("/websocket", wm.WebsocketHandler)


+ 1
- 0
proxy/mocks/app_conn_consensus.go View File

@ -4,6 +4,7 @@ package mocks
import (
mock "github.com/stretchr/testify/mock"
abcicli "github.com/tendermint/tendermint/abci/client"
types "github.com/tendermint/tendermint/abci/types"


+ 1
- 0
proxy/mocks/app_conn_mempool.go View File

@ -4,6 +4,7 @@ package mocks
import (
mock "github.com/stretchr/testify/mock"
abcicli "github.com/tendermint/tendermint/abci/client"
types "github.com/tendermint/tendermint/abci/types"


+ 1
- 0
proxy/mocks/client_creator.go View File

@ -4,6 +4,7 @@ package mocks
import (
mock "github.com/stretchr/testify/mock"
abcicli "github.com/tendermint/tendermint/abci/client"
)


+ 17
- 7
rpc/core/events.go View File

@ -2,6 +2,7 @@ package core
import (
"context"
"errors"
"fmt"
"time"
@ -11,11 +12,6 @@ import (
rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"
)
const (
// Buffer on the Tendermint (server) side to allow some slowness in clients.
subBufferSize = 100
)
// Subscribe for events via WebSocket.
// More: https://docs.tendermint.com/master/rpc/#/Websocket/subscribe
func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) {
@ -37,11 +33,13 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er
subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout)
defer cancel()
sub, err := env.EventBus.Subscribe(subCtx, addr, q, subBufferSize)
sub, err := env.EventBus.Subscribe(subCtx, addr, q, env.Config.SubscriptionBufferSize)
if err != nil {
return nil, err
}
closeIfSlow := env.Config.CloseOnSlowClient
// Capture the current ID, since it can change in the future.
subscriptionID := ctx.JSONReq.ID
go func() {
@ -57,6 +55,18 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er
if err := ctx.WSConn.WriteRPCResponse(writeCtx, resp); err != nil {
env.Logger.Info("Can't write response (slow client)",
"to", addr, "subscriptionID", subscriptionID, "err", err)
if closeIfSlow {
var (
err = errors.New("subscription was cancelled (reason: slow client)")
resp = rpctypes.RPCServerError(subscriptionID, err)
)
if !ctx.WSConn.TryWriteRPCResponse(resp) {
env.Logger.Info("Can't write response (slow client)",
"to", addr, "subscriptionID", subscriptionID, "err", err)
}
return
}
}
case <-sub.Cancelled():
if sub.Err() != tmpubsub.ErrUnsubscribed {
@ -70,7 +80,7 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er
err = fmt.Errorf("subscription was cancelled (reason: %s)", reason)
resp = rpctypes.RPCServerError(subscriptionID, err)
)
if ok := ctx.WSConn.TryWriteRPCResponse(resp); !ok {
if !ctx.WSConn.TryWriteRPCResponse(resp) {
env.Logger.Info("Can't write response (slow client)",
"to", addr, "subscriptionID", subscriptionID, "err", err)
}


+ 2
- 1
state/indexer/block/kv/kv_test.go View File

@ -6,11 +6,12 @@ import (
"testing"
"github.com/stretchr/testify/require"
db "github.com/tendermint/tm-db"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query"
blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv"
"github.com/tendermint/tendermint/types"
db "github.com/tendermint/tm-db"
)
func TestBlockIndexer(t *testing.T) {


+ 1
- 0
state/indexer/block/kv/util.go View File

@ -6,6 +6,7 @@ import (
"strconv"
"github.com/google/orderedcode"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/types"
)


+ 1
- 0
state/indexer/sink/psql/psql.go View File

@ -10,6 +10,7 @@ import (
"time"
"github.com/gogo/protobuf/proto"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/types"


+ 1
- 0
state/indexer/sink/psql/psql_test.go View File

@ -18,6 +18,7 @@ import (
"github.com/ory/dockertest/docker"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/types"


+ 1
- 0
state/mocks/evidence_pool.go View File

@ -4,6 +4,7 @@ package mocks
import (
mock "github.com/stretchr/testify/mock"
state "github.com/tendermint/tendermint/state"
types "github.com/tendermint/tendermint/types"


+ 1
- 0
state/mocks/store.go View File

@ -4,6 +4,7 @@ package mocks
import (
mock "github.com/stretchr/testify/mock"
state "github.com/tendermint/tendermint/state"
tendermintstate "github.com/tendermint/tendermint/proto/tendermint/state"


+ 1
- 0
statesync/mocks/state_provider.go View File

@ -6,6 +6,7 @@ import (
context "context"
mock "github.com/stretchr/testify/mock"
state "github.com/tendermint/tendermint/state"
types "github.com/tendermint/tendermint/types"


Loading…
Cancel
Save