diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 01b331e56..58dbd8b69 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -12,6 +12,8 @@ Special thanks to external contributors on this release: - CLI/RPC/Config + - [config] \#7276 rpc: Add experimental config params to allow for subscription buffer size control (@thanethomson). + - Apps - P2P Protocol diff --git a/config/config.go b/config/config.go index dfc4836da..3fe001e47 100644 --- a/config/config.go +++ b/config/config.go @@ -64,6 +64,9 @@ var ( defaultNodeKeyPath = filepath.Join(defaultConfigDir, defaultNodeKeyName) defaultAddrBookPath = filepath.Join(defaultConfigDir, defaultAddrBookName) + + minSubscriptionBufferSize = 100 + defaultSubscriptionBufferSize = 200 ) // Config defines the top level configuration for a Tendermint node @@ -496,6 +499,29 @@ type RPCConfig struct { // to the estimated maximum number of broadcast_tx_commit calls per block. MaxSubscriptionsPerClient int `mapstructure:"max-subscriptions-per-client"` + // The number of events that can be buffered per subscription before + // returning `ErrOutOfCapacity`. + SubscriptionBufferSize int `mapstructure:"experimental-subscription-buffer-size"` + + // The maximum number of responses that can be buffered per WebSocket + // client. If clients cannot read from the WebSocket endpoint fast enough, + // they will be disconnected, so increasing this parameter may reduce the + // chances of them being disconnected (but will cause the node to use more + // memory). + // + // Must be at least the same as `SubscriptionBufferSize`, otherwise + // connections may be dropped unnecessarily. + WebSocketWriteBufferSize int `mapstructure:"experimental-websocket-write-buffer-size"` + + // If a WebSocket client cannot read fast enough, at present we may + // silently drop events instead of generating an error or disconnecting the + // client. + // + // Enabling this parameter will cause the WebSocket connection to be closed + // instead if it cannot read fast enough, allowing for greater + // predictability in subscription behavior. + CloseOnSlowClient bool `mapstructure:"experimental-close-on-slow-client"` + // How long to wait for a tx to be committed during /broadcast_tx_commit // WARNING: Using a value larger than 10s will result in increasing the // global HTTP write timeout, which applies to all connections and endpoints. @@ -545,7 +571,9 @@ func DefaultRPCConfig() *RPCConfig { MaxSubscriptionClients: 100, MaxSubscriptionsPerClient: 5, + SubscriptionBufferSize: defaultSubscriptionBufferSize, TimeoutBroadcastTxCommit: 10 * time.Second, + WebSocketWriteBufferSize: defaultSubscriptionBufferSize, MaxBodyBytes: int64(1000000), // 1MB MaxHeaderBytes: 1 << 20, // same as the net/http default @@ -579,6 +607,18 @@ func (cfg *RPCConfig) ValidateBasic() error { if cfg.MaxSubscriptionsPerClient < 0 { return errors.New("max-subscriptions-per-client can't be negative") } + if cfg.SubscriptionBufferSize < minSubscriptionBufferSize { + return fmt.Errorf( + "experimental-subscription-buffer-size must be >= %d", + minSubscriptionBufferSize, + ) + } + if cfg.WebSocketWriteBufferSize < cfg.SubscriptionBufferSize { + return fmt.Errorf( + "experimental-websocket-write-buffer-size must be >= experimental-subscription-buffer-size (%d)", + cfg.SubscriptionBufferSize, + ) + } if cfg.TimeoutBroadcastTxCommit < 0 { return errors.New("timeout-broadcast-tx-commit can't be negative") } diff --git a/config/toml.go b/config/toml.go index 066eb17d7..0cb0b446f 100644 --- a/config/toml.go +++ b/config/toml.go @@ -236,6 +236,33 @@ max-subscription-clients = {{ .RPC.MaxSubscriptionClients }} # the estimated # maximum number of broadcast_tx_commit calls per block. max-subscriptions-per-client = {{ .RPC.MaxSubscriptionsPerClient }} +# Experimental parameter to specify the maximum number of events a node will +# buffer, per subscription, before returning an error and closing the +# subscription. Must be set to at least 100, but higher values will accommodate +# higher event throughput rates (and will use more memory). +experimental-subscription-buffer-size = {{ .RPC.SubscriptionBufferSize }} + +# Experimental parameter to specify the maximum number of RPC responses that +# can be buffered per WebSocket client. If clients cannot read from the +# WebSocket endpoint fast enough, they will be disconnected, so increasing this +# parameter may reduce the chances of them being disconnected (but will cause +# the node to use more memory). +# +# Must be at least the same as "experimental-subscription-buffer-size", +# otherwise connections could be dropped unnecessarily. This value should +# ideally be somewhat higher than "experimental-subscription-buffer-size" to +# accommodate non-subscription-related RPC responses. +experimental-websocket-write-buffer-size = {{ .RPC.WebSocketWriteBufferSize }} + +# If a WebSocket client cannot read fast enough, at present we may +# silently drop events instead of generating an error or disconnecting the +# client. +# +# Enabling this experimental parameter will cause the WebSocket connection to +# be closed instead if it cannot read fast enough, allowing for greater +# predictability in subscription behavior. +experimental-close-on-slow-client = {{ .RPC.CloseOnSlowClient }} + # How long to wait for a tx to be committed during /broadcast_tx_commit. # WARNING: Using a value larger than 10s will result in increasing the # global HTTP write timeout, which applies to all connections and endpoints. diff --git a/internal/rpc/core/events.go b/internal/rpc/core/events.go index 46d9ff6a7..a65e0146d 100644 --- a/internal/rpc/core/events.go +++ b/internal/rpc/core/events.go @@ -13,9 +13,6 @@ import ( ) const ( - // Buffer on the Tendermint (server) side to allow some slowness in clients. - subBufferSize = 100 - // maxQueryLength is the maximum length of a query string that will be // accepted. This is just a safety check to avoid outlandish queries. maxQueryLength = 512 @@ -44,11 +41,13 @@ func (env *Environment) Subscribe(ctx *rpctypes.Context, query string) (*coretyp subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout) defer cancel() - sub, err := env.EventBus.Subscribe(subCtx, addr, q, subBufferSize) + sub, err := env.EventBus.Subscribe(subCtx, addr, q, env.Config.SubscriptionBufferSize) if err != nil { return nil, err } + closeIfSlow := env.Config.CloseOnSlowClient + // Capture the current ID, since it can change in the future. subscriptionID := ctx.JSONReq.ID go func() { @@ -64,6 +63,18 @@ func (env *Environment) Subscribe(ctx *rpctypes.Context, query string) (*coretyp if err := ctx.WSConn.WriteRPCResponse(writeCtx, resp); err != nil { env.Logger.Info("Can't write response (slow client)", "to", addr, "subscriptionID", subscriptionID, "err", err) + + if closeIfSlow { + var ( + err = errors.New("subscription was canceled (reason: slow client)") + resp = rpctypes.RPCServerError(subscriptionID, err) + ) + if !ctx.WSConn.TryWriteRPCResponse(resp) { + env.Logger.Info("Can't write response (slow client)", + "to", addr, "subscriptionID", subscriptionID, "err", err) + } + return + } } case <-sub.Canceled(): if sub.Err() != tmpubsub.ErrUnsubscribed { diff --git a/libs/pubsub/subscription.go b/libs/pubsub/subscription.go index 40b84711e..16c50e4d6 100644 --- a/libs/pubsub/subscription.go +++ b/libs/pubsub/subscription.go @@ -15,7 +15,7 @@ var ( // ErrOutOfCapacity is returned by Err when a client is not pulling messages // fast enough. Note the client's subscription will be terminated. - ErrOutOfCapacity = errors.New("client is not pulling messages fast enough") + ErrOutOfCapacity = errors.New("internal subscription event buffer is out of capacity") ) // A Subscription represents a client subscription for a particular query and diff --git a/node/node.go b/node/node.go index fff1d50ce..81ba1f2c7 100644 --- a/node/node.go +++ b/node/node.go @@ -889,6 +889,7 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) { } }), rpcserver.ReadLimit(cfg.MaxBodyBytes), + rpcserver.WriteChanCapacity(n.config.RPC.WebSocketWriteBufferSize), ) wm.SetLogger(wmLogger) mux.HandleFunc("/websocket", wm.WebsocketHandler)