Browse Source

Ensure WriteTimeout > TimeoutBroadcastTxCommit (#3443)

* Make sure config.TimeoutBroadcastTxCommit < rpcserver.WriteTimeout()

* remove redundant comment

* libs/rpc/http_server: move Read/WriteTimeout into Config

* increase defaults for read/write timeouts

Based on this article
https://www.digitalocean.com/community/tutorials/how-to-optimize-nginx-configuration

* WriteTimeout should be larger than TimeoutBroadcastTxCommit

* set a deadline for subscribing to txs

* extract duration into const

* add two changelog entries

* Update CHANGELOG_PENDING.md

Co-Authored-By: melekes <anton.kalyaev@gmail.com>

* Update CHANGELOG_PENDING.md

Co-Authored-By: melekes <anton.kalyaev@gmail.com>

* 12 -> 10

* changelog

* changelog
release/v0.31.0
Ismail Khoffi 5 years ago
committed by Ethan Buchman
parent
commit
1e3469789d
16 changed files with 123 additions and 46 deletions
  1. +4
    -2
      CHANGELOG.md
  2. +3
    -4
      config/config.go
  3. +3
    -0
      config/toml.go
  4. +3
    -0
      docs/tendermint-core/configuration.md
  5. +4
    -2
      lite/proxy/proxy.go
  6. +14
    -3
      node/node.go
  7. +1
    -2
      rpc/core/events.go
  8. +1
    -1
      rpc/core/mempool.go
  9. +6
    -3
      rpc/core/pipe.go
  10. +5
    -4
      rpc/lib/rpc_test.go
  11. +18
    -0
      rpc/lib/server/handlers.go
  12. +22
    -15
      rpc/lib/server/http_server.go
  13. +8
    -4
      rpc/lib/server/http_server_test.go
  14. +3
    -2
      rpc/lib/test/main.go
  15. +25
    -2
      rpc/lib/types/types.go
  16. +3
    -2
      tools/tm-monitor/rpc.go

+ 4
- 2
CHANGELOG.md View File

@ -11,8 +11,8 @@ This release is primarily about the new pubsub implementation, dubbed `pubsub 2.
like configurable limits on the number of active RPC subscriptions at a time (`max_subscription_clients`).
Pubsub 2.0 is an improved version of the older pubsub that is non-blocking and has a nicer API.
Note the improved pubsub API also resulted in some improvements to the HTTPClient interface and the API for WebSocket subscriptions.
This release also adds a configurable limit to the mempool size, `max_txs_bytes`, with
default 1GB, and includes many smaller improvements and bug-fixes.
This release also adds a configurable limit to the mempool size (`max_txs_bytes`, default 1GB)
and a configurable timeout for the `/broadcast_tx_commit` endpoint.
See the [v0.31.0
Milestone](https://github.com/tendermint/tendermint/milestone/19?closed=1) for
@ -30,6 +30,7 @@ program](https://hackerone.com/tendermint).
the subscription.
- [rpc] [\#3269](https://github.com/tendermint/tendermint/issues/2826) Limit number of unique clientIDs with open subscriptions. Configurable via `rpc.max_subscription_clients`
- [rpc] [\#3269](https://github.com/tendermint/tendermint/issues/2826) Limit number of unique queries a given client can subscribe to at once. Configurable via `rpc.max_subscriptions_per_client`.
- [rpc] [\#3435](https://github.com/tendermint/tendermint/issues/3435) Default ReadTimeout and WriteTimeout changed to 10s. WriteTimeout can increased by setting `rpc.timeout_broadcast_tx_commit` in the config.
- [rpc/client] [\#3269](https://github.com/tendermint/tendermint/issues/3269) Update `EventsClient` interface to reflect new pubsub/eventBus API [ADR-33](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-033-pubsub.md). This includes `Subscribe`, `Unsubscribe`, and `UnsubscribeAll` methods.
* Apps
@ -43,6 +44,7 @@ program](https://hackerone.com/tendermint).
* TrapSignal should not be responsible for blocking thread of execution
- [libs/db] [\#3397](https://github.com/tendermint/tendermint/pull/3397) Add possibility to `Close()` `Batch` to prevent memory leak when using ClevelDB. (@Stumble)
- [types] [\#3354](https://github.com/tendermint/tendermint/issues/3354) Remove RoundState from EventDataRoundState
- [rpc] [\#3435](https://github.com/tendermint/tendermint/issues/3435) `StartHTTPServer` / `StartHTTPAndTLSServer` now require a Config (use `rpcserver.DefaultConfig`)
* Blockchain Protocol


+ 3
- 4
config/config.go View File

@ -7,7 +7,6 @@ import (
"time"
"github.com/pkg/errors"
rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
)
const (
@ -336,6 +335,9 @@ type RPCConfig struct {
MaxSubscriptionsPerClient int `mapstructure:"max_subscriptions_per_client"`
// How long to wait for a tx to be committed during /broadcast_tx_commit
// WARNING: Using a value larger than 10s will result in increasing the
// global HTTP write timeout, which applies to all connections and endpoints.
// See https://github.com/tendermint/tendermint/issues/3435
TimeoutBroadcastTxCommit time.Duration `mapstructure:"timeout_broadcast_tx_commit"`
}
@ -385,9 +387,6 @@ func (cfg *RPCConfig) ValidateBasic() error {
if cfg.TimeoutBroadcastTxCommit < 0 {
return errors.New("timeout_broadcast_tx_commit can't be negative")
}
if cfg.TimeoutBroadcastTxCommit > rpcserver.WriteTimeout {
return fmt.Errorf("timeout_broadcast_tx_commit can't be greater than rpc server's write timeout: %v", rpcserver.WriteTimeout)
}
return nil
}


+ 3
- 0
config/toml.go View File

@ -176,6 +176,9 @@ max_subscription_clients = {{ .RPC.MaxSubscriptionClients }}
max_subscriptions_per_client = {{ .RPC.MaxSubscriptionsPerClient }}
# How long to wait for a tx to be committed during /broadcast_tx_commit.
# WARNING: Using a value larger than 10s will result in increasing the
# global HTTP write timeout, which applies to all connections and endpoints.
# See https://github.com/tendermint/tendermint/issues/3435
timeout_broadcast_tx_commit = "{{ .RPC.TimeoutBroadcastTxCommit }}"
##### peer to peer configuration options #####


+ 3
- 0
docs/tendermint-core/configuration.md View File

@ -122,6 +122,9 @@ max_subscription_clients = 100
max_subscriptions_per_client = 5
# How long to wait for a tx to be committed during /broadcast_tx_commit.
# WARNING: Using a value larger than 10s will result in increasing the
# global HTTP write timeout, which applies to all connections and endpoints.
# See https://github.com/tendermint/tendermint/issues/3435
timeout_broadcast_tx_commit = "10s"
##### peer to peer configuration options #####


+ 4
- 2
lite/proxy/proxy.go View File

@ -45,11 +45,13 @@ func StartProxy(c rpcclient.Client, listenAddr string, logger log.Logger, maxOpe
core.SetLogger(logger)
mux.HandleFunc(wsEndpoint, wm.WebsocketHandler)
l, err := rpcserver.Listen(listenAddr, rpcserver.Config{MaxOpenConnections: maxOpenConnections})
config := rpcserver.DefaultConfig()
config.MaxOpenConnections = maxOpenConnections
l, err := rpcserver.Listen(listenAddr, config)
if err != nil {
return err
}
return rpcserver.StartHTTPServer(l, mux, logger)
return rpcserver.StartHTTPServer(l, mux, logger, config)
}
// RPCRoutes just routes everything to the given client, as if it were


+ 14
- 3
node/node.go View File

@ -689,9 +689,18 @@ func (n *Node) startRPC() ([]net.Listener, error) {
mux.HandleFunc("/websocket", wm.WebsocketHandler)
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger)
config := rpcserver.DefaultConfig()
config.MaxOpenConnections = n.config.RPC.MaxOpenConnections
// If necessary adjust global WriteTimeout to ensure it's greater than
// TimeoutBroadcastTxCommit.
// See https://github.com/tendermint/tendermint/issues/3435
if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
}
listener, err := rpcserver.Listen(
listenAddr,
rpcserver.Config{MaxOpenConnections: n.config.RPC.MaxOpenConnections},
config,
)
if err != nil {
return nil, err
@ -711,6 +720,7 @@ func (n *Node) startRPC() ([]net.Listener, error) {
listener,
rootHandler,
rpcLogger,
config,
)
listeners[i] = listener
}
@ -718,8 +728,9 @@ func (n *Node) startRPC() ([]net.Listener, error) {
// we expose a simplified api over grpc for convenience to app devs
grpcListenAddr := n.config.RPC.GRPCListenAddress
if grpcListenAddr != "" {
listener, err := rpcserver.Listen(
grpcListenAddr, rpcserver.Config{MaxOpenConnections: n.config.RPC.GRPCMaxOpenConnections})
config := rpcserver.DefaultConfig()
config.MaxOpenConnections = n.config.RPC.MaxOpenConnections
listener, err := rpcserver.Listen(grpcListenAddr, config)
if err != nil {
return nil, err
}


+ 1
- 2
rpc/core/events.go View File

@ -105,8 +105,7 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er
if err != nil {
return nil, errors.Wrap(err, "failed to parse query")
}
subCtx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout)
defer cancel()
sub, err := eventBus.Subscribe(subCtx, addr, q)
if err != nil {


+ 1
- 1
rpc/core/mempool.go View File

@ -197,7 +197,7 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc
}
// Subscribe to tx being committed in block.
subCtx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout)
defer cancel()
q := types.EventQueryTxFor(tx)
deliverTxSub, err := eventBus.Subscribe(subCtx, subscriber, q)


+ 6
- 3
rpc/core/pipe.go View File

@ -1,6 +1,8 @@
package core
import (
"time"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto"
@ -9,7 +11,6 @@ import (
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/proxy"
rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
@ -19,9 +20,11 @@ const (
// see README
defaultPerPage = 30
maxPerPage = 100
)
var subscribeTimeout = rpcserver.WriteTimeout / 2
// SubscribeTimeout is the maximum time we wait to subscribe for an event.
// must be less than the server's write timeout (see rpcserver.DefaultConfig)
SubscribeTimeout = 5 * time.Second
)
//----------------------------------------------
// These interfaces are used by RPC and must be thread safe


+ 5
- 4
rpc/lib/rpc_test.go View File

@ -121,11 +121,12 @@ func setup() {
wm := server.NewWebsocketManager(Routes, RoutesCdc, server.ReadWait(5*time.Second), server.PingPeriod(1*time.Second))
wm.SetLogger(tcpLogger)
mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
listener1, err := server.Listen(tcpAddr, server.Config{})
config := server.DefaultConfig()
listener1, err := server.Listen(tcpAddr, config)
if err != nil {
panic(err)
}
go server.StartHTTPServer(listener1, mux, tcpLogger)
go server.StartHTTPServer(listener1, mux, tcpLogger, config)
unixLogger := logger.With("socket", "unix")
mux2 := http.NewServeMux()
@ -133,11 +134,11 @@ func setup() {
wm = server.NewWebsocketManager(Routes, RoutesCdc)
wm.SetLogger(unixLogger)
mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
listener2, err := server.Listen(unixAddr, server.Config{})
listener2, err := server.Listen(unixAddr, config)
if err != nil {
panic(err)
}
go server.StartHTTPServer(listener2, mux2, unixLogger)
go server.StartHTTPServer(listener2, mux2, unixLogger, config)
// wait for servers to start
time.Sleep(time.Second * 2)


+ 18
- 0
rpc/lib/server/handlers.go View File

@ -2,6 +2,7 @@ package rpcserver
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
@ -439,6 +440,9 @@ type wsConnection struct {
// callback which is called upon disconnect
onDisconnect func(remoteAddr string)
ctx context.Context
cancel context.CancelFunc
}
// NewWSConnection wraps websocket.Conn.
@ -532,6 +536,10 @@ func (wsc *wsConnection) OnStop() {
if wsc.onDisconnect != nil {
wsc.onDisconnect(wsc.remoteAddr)
}
if wsc.ctx != nil {
wsc.cancel()
}
}
// GetRemoteAddr returns the remote address of the underlying connection.
@ -569,6 +577,16 @@ func (wsc *wsConnection) Codec() *amino.Codec {
return wsc.cdc
}
// Context returns the connection's context.
// The context is canceled when the client's connection closes.
func (wsc *wsConnection) Context() context.Context {
if wsc.ctx != nil {
return wsc.ctx
}
wsc.ctx, wsc.cancel = context.WithCancel(context.Background())
return wsc.ctx
}
// Read from the socket and subscribe to or unsubscribe from events
func (wsc *wsConnection) readRoutine() {
defer func() {


+ 22
- 15
rpc/lib/server/http_server.go View File

@ -18,9 +18,23 @@ import (
types "github.com/tendermint/tendermint/rpc/lib/types"
)
// Config is an RPC server configuration.
// Config is a RPC server configuration.
type Config struct {
// see netutil.LimitListener
MaxOpenConnections int
// mirrors http.Server#ReadTimeout
ReadTimeout time.Duration
// mirrors http.Server#WriteTimeout
WriteTimeout time.Duration
}
// DefaultConfig returns a default configuration.
func DefaultConfig() *Config {
return &Config{
MaxOpenConnections: 0, // unlimited
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
}
const (
@ -30,25 +44,17 @@ const (
// same as the net/http default
maxHeaderBytes = 1 << 20
// Timeouts for reading/writing to the http connection.
// Public so handlers can read them -
// /broadcast_tx_commit has it's own timeout, which should
// be less than the WriteTimeout here.
// TODO: use a config instead.
ReadTimeout = 3 * time.Second
WriteTimeout = 20 * time.Second
)
// StartHTTPServer takes a listener and starts an HTTP server with the given handler.
// It wraps handler with RecoverAndLogHandler.
// NOTE: This function blocks - you may want to call it in a go-routine.
func StartHTTPServer(listener net.Listener, handler http.Handler, logger log.Logger) error {
func StartHTTPServer(listener net.Listener, handler http.Handler, logger log.Logger, config *Config) error {
logger.Info(fmt.Sprintf("Starting RPC HTTP server on %s", listener.Addr()))
s := &http.Server{
Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: maxBodyBytes}, logger),
ReadTimeout: ReadTimeout,
WriteTimeout: WriteTimeout,
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
MaxHeaderBytes: maxHeaderBytes,
}
err := s.Serve(listener)
@ -64,13 +70,14 @@ func StartHTTPAndTLSServer(
handler http.Handler,
certFile, keyFile string,
logger log.Logger,
config *Config,
) error {
logger.Info(fmt.Sprintf("Starting RPC HTTPS server on %s (cert: %q, key: %q)",
listener.Addr(), certFile, keyFile))
s := &http.Server{
Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: maxBodyBytes}, logger),
ReadTimeout: ReadTimeout,
WriteTimeout: WriteTimeout,
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
MaxHeaderBytes: maxHeaderBytes,
}
err := s.ServeTLS(listener, certFile, keyFile)
@ -180,7 +187,7 @@ func (h maxBytesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Listen starts a new net.Listener on the given address.
// It returns an error if the address is invalid or the call to Listen() fails.
func Listen(addr string, config Config) (listener net.Listener, err error) {
func Listen(addr string, config *Config) (listener net.Listener, err error) {
parts := strings.SplitN(addr, "://", 2)
if len(parts) != 2 {
return nil, errors.Errorf(


+ 8
- 4
rpc/lib/server/http_server_test.go View File

@ -30,10 +30,12 @@ func TestMaxOpenConnections(t *testing.T) {
time.Sleep(10 * time.Millisecond)
fmt.Fprint(w, "some body")
})
l, err := Listen("tcp://127.0.0.1:0", Config{MaxOpenConnections: max})
config := DefaultConfig()
config.MaxOpenConnections = max
l, err := Listen("tcp://127.0.0.1:0", config)
require.NoError(t, err)
defer l.Close()
go StartHTTPServer(l, mux, log.TestingLogger())
go StartHTTPServer(l, mux, log.TestingLogger(), config)
// Make N GET calls to the server.
attempts := max * 2
@ -64,15 +66,17 @@ func TestMaxOpenConnections(t *testing.T) {
}
func TestStartHTTPAndTLSServer(t *testing.T) {
config := DefaultConfig()
config.MaxOpenConnections = 1
// set up fixtures
listenerAddr := "tcp://0.0.0.0:0"
listener, err := Listen(listenerAddr, Config{MaxOpenConnections: 1})
listener, err := Listen(listenerAddr, config)
require.NoError(t, err)
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {})
// test failure
err = StartHTTPAndTLSServer(listener, mux, "", "", log.TestingLogger())
err = StartHTTPAndTLSServer(listener, mux, "", "", log.TestingLogger(), config)
require.IsType(t, (*os.PathError)(nil), err)
// TODO: test that starting the server can actually work


+ 3
- 2
rpc/lib/test/main.go View File

@ -36,9 +36,10 @@ func main() {
cmn.TrapSignal(logger, func() {})
rpcserver.RegisterRPCFuncs(mux, routes, cdc, logger)
listener, err := rpcserver.Listen("0.0.0.0:8008", rpcserver.Config{})
config := rpcserver.DefaultConfig()
listener, err := rpcserver.Listen("0.0.0.0:8008", config)
if err != nil {
cmn.Exit(err.Error())
}
rpcserver.StartHTTPServer(listener, mux, logger)
rpcserver.StartHTTPServer(listener, mux, logger, config)
}

+ 25
- 2
rpc/lib/types/types.go View File

@ -1,6 +1,7 @@
package rpctypes
import (
"context"
"encoding/json"
"fmt"
"net/http"
@ -243,6 +244,8 @@ type WSRPCConnection interface {
TryWriteRPCResponse(resp RPCResponse) bool
// Codec returns an Amino codec used.
Codec() *amino.Codec
// Context returns the connection's context.
Context() context.Context
}
// Context is the first parameter for all functions. It carries a json-rpc
@ -260,8 +263,12 @@ type Context struct {
HTTPReq *http.Request
}
// RemoteAddr returns either HTTPReq#RemoteAddr or result of the
// WSConn#GetRemoteAddr().
// RemoteAddr returns the remote address (usually a string "IP:port").
// If neither HTTPReq nor WSConn is set, an empty string is returned.
// HTTP:
// http.Request#RemoteAddr
// WS:
// result of GetRemoteAddr
func (ctx *Context) RemoteAddr() string {
if ctx.HTTPReq != nil {
return ctx.HTTPReq.RemoteAddr
@ -271,6 +278,22 @@ func (ctx *Context) RemoteAddr() string {
return ""
}
// Context returns the request's context.
// The returned context is always non-nil; it defaults to the background context.
// HTTP:
// The context is canceled when the client's connection closes, the request
// is canceled (with HTTP/2), or when the ServeHTTP method returns.
// WS:
// The context is canceled when the client's connections closes.
func (ctx *Context) Context() context.Context {
if ctx.HTTPReq != nil {
return ctx.HTTPReq.Context()
} else if ctx.WSConn != nil {
return ctx.WSConn.Context()
}
return context.Background()
}
//----------------------------------------
// SOCKETS


+ 3
- 2
tools/tm-monitor/rpc.go View File

@ -17,11 +17,12 @@ func startRPC(listenAddr string, m *monitor.Monitor, logger log.Logger) net.List
wm := rpc.NewWebsocketManager(routes, nil)
mux.HandleFunc("/websocket", wm.WebsocketHandler)
rpc.RegisterRPCFuncs(mux, routes, cdc, logger)
listener, err := rpc.Listen(listenAddr, rpc.Config{})
config := rpc.DefaultConfig()
listener, err := rpc.Listen(listenAddr, config)
if err != nil {
panic(err)
}
go rpc.StartHTTPServer(listener, mux, logger)
go rpc.StartHTTPServer(listener, mux, logger, config)
return listener
}


Loading…
Cancel
Save