From 936a65599047dda49acb14e3bbd89b2ef5aa96d1 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 20 Jun 2018 18:38:42 +0400 Subject: [PATCH] limit number of open connections Refs #1740 also, expose limit option for number concurrent streams for gRPC (unlimited by default) --- CHANGELOG.md | 16 +++++++- Gopkg.lock | 1 + Gopkg.toml | 4 ++ config/config.go | 27 +++++++++++-- config/toml.go | 14 +++++++ docs/metrics.md | 32 ++++++++------- docs/running-in-production.md | 37 +++++++++++++++++ docs/specification/configuration.md | 14 +++++++ lite/proxy/proxy.go | 5 ++- node/node.go | 16 ++++++-- rpc/grpc/client_server.go | 18 +++++++-- rpc/lib/rpc_test.go | 12 +++--- rpc/lib/server/http_server.go | 21 +++++++++- rpc/lib/server/http_server_test.go | 62 +++++++++++++++++++++++++++++ rpc/lib/test/main.go | 4 +- 15 files changed, 245 insertions(+), 38 deletions(-) create mode 100644 rpc/lib/server/http_server_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d307bbf6..9ffc95fac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,8 +2,13 @@ ## TBD -FEATURES: -- [node] added metrics (served under /metrics using a Prometheus client; disabled by default) +BUG FIXES: + +- [rpc] limited number of HTTP/WebSocket connections + (`rpc.max_open_connections`) and gRPC connections + (`rpc.grpc_max_open_connections`). Check out [Running In + Production](https://tendermint.readthedocs.io/en/master/running-in-production.html) + guide if you want to increase them. ## 0.21.0 @@ -20,6 +25,13 @@ IMPROVEMENT - [pubsub] Set default capacity to 0 - [docs] Various improvements +FEATURES + +- [main] added metrics (served under `/metrics` using a Prometheus client; + disabled by default). See the new `instrumentation` section in the config and + [metrics](https://tendermint.readthedocs.io/projects/tools/en/v0.21.0/metrics.html) + guide. 
+ BUG FIXES - [consensus] Fix an issue where we don't make blocks after `fast_sync` when `create_empty_blocks=false` diff --git a/Gopkg.lock b/Gopkg.lock index f2390c376..f9729ffab 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -362,6 +362,7 @@ "http2/hpack", "idna", "internal/timeseries", + "netutil", "trace" ] revision = "db08ff08e8622530d9ed3a0e8ac279f6d4c02196" diff --git a/Gopkg.toml b/Gopkg.toml index d9b381337..823ef76eb 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -97,3 +97,7 @@ [[constraint]] name = "github.com/prometheus/client_golang" version = "0.8.0" + +[[constraint]] + branch = "master" + name = "golang.org/x/net" diff --git a/config/config.go b/config/config.go index 418638a60..0c7fd5df6 100644 --- a/config/config.go +++ b/config/config.go @@ -224,16 +224,37 @@ type RPCConfig struct { // NOTE: This server only supports /broadcast_tx_commit GRPCListenAddress string `mapstructure:"grpc_laddr"` + // Maximum number of simultaneous connections. + // Does not include RPC (HTTP&WebSocket) connections. See max_open_connections + // If you want to accept more significant number than the default, make sure + // you increase your OS limits. + // 0 - unlimited. + GRPCMaxOpenConnections int `mapstructure:"grpc_max_open_connections"` + // Activate unsafe RPC commands like /dial_persistent_peers and /unsafe_flush_mempool Unsafe bool `mapstructure:"unsafe"` + + // Maximum number of simultaneous connections (including WebSocket). + // Does not include gRPC connections. See grpc_max_open_connections + // If you want to accept more significant number than the default, make sure + // you increase your OS limits. + // 0 - unlimited. 
+ MaxOpenConnections int `mapstructure:"max_open_connections"` } // DefaultRPCConfig returns a default configuration for the RPC server func DefaultRPCConfig() *RPCConfig { return &RPCConfig{ - ListenAddress: "tcp://0.0.0.0:26657", - GRPCListenAddress: "", - Unsafe: false, + ListenAddress: "tcp://0.0.0.0:26657", + + GRPCListenAddress: "", + GRPCMaxOpenConnections: 900, // no ipv4 + + Unsafe: false, + // should be < ({ulimit -Sn} - {MaxNumPeers} - {N of wal, db and other open files}) / 2 + // divided by 2 because 1 fd for ipv4, 1 fd - ipv6 + // 1024 - 50 - 50 = 924 / 2 = ~450 + MaxOpenConnections: 450, } } diff --git a/config/toml.go b/config/toml.go index 0bf9d5315..c0840e440 100644 --- a/config/toml.go +++ b/config/toml.go @@ -119,9 +119,23 @@ laddr = "{{ .RPC.ListenAddress }}" # NOTE: This server only supports /broadcast_tx_commit grpc_laddr = "{{ .RPC.GRPCListenAddress }}" +# Maximum number of simultaneous connections. +# Does not include RPC (HTTP&WebSocket) connections. See max_open_connections +# If you want to accept more significant number than the default, make sure +# you increase your OS limits. +# 0 - unlimited. +grpc_max_open_connections = {{ .RPC.GRPCMaxOpenConnections }} + # Activate unsafe RPC commands like /dial_seeds and /unsafe_flush_mempool unsafe = {{ .RPC.Unsafe }} +# Maximum number of simultaneous connections (including WebSocket). +# Does not include gRPC connections. See grpc_max_open_connections +# If you want to accept more significant number than the default, make sure +# you increase your OS limits. +# 0 - unlimited. +max_open_connections = {{ .RPC.MaxOpenConnections }} + ##### peer to peer configuration options ##### [p2p] diff --git a/docs/metrics.md b/docs/metrics.md index 1cd758b94..b469c6890 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -8,28 +8,30 @@ This functionality is disabled by default. To enable the Prometheus metrics, set `instrumentation.prometheus=true` if your config file. 
Metrics will be served under `/metrics` on 26660 port by default. Listen address can be changed in the config file (see -`prometheus_listen_addr`). +`instrumentation.prometheus_listen_addr`). ## List of available metrics The following metrics are available: +``` | Name | Type | Since | Description | | --------------------------------------- | ------- | --------- | ----------------------------------------------------------------------------- | -| consensus_height | Gauge | 0.20.1 | Height of the chain | -| consensus_validators | Gauge | 0.20.1 | Number of validators | -| consensus_validators_power | Gauge | 0.20.1 | Total voting power of all validators | -| consensus_missing_validators | Gauge | 0.20.1 | Number of validators who did not sign | -| consensus_missing_validators_power | Gauge | 0.20.1 | Total voting power of the missing validators | -| consensus_byzantine_validators | Gauge | 0.20.1 | Number of validators who tried to double sign | -| consensus_byzantine_validators_power | Gauge | 0.20.1 | Total voting power of the byzantine validators | -| consensus_block_interval_seconds | Histogram | 0.20.1 | Time between this and last block (Block.Header.Time) in seconds | -| consensus_rounds | Gauge | 0.20.1 | Number of rounds | -| consensus_num_txs | Gauge | 0.20.1 | Number of transactions | -| mempool_size | Gauge | 0.20.1 | Number of uncommitted transactions | -| consensus_total_txs | Gauge | 0.20.1 | Total number of transactions committed | -| consensus_block_size_bytes | Gauge | 0.20.1 | Block size in bytes | -| p2p_peers | Gauge | 0.20.1 | Number of peers node's connected to | +| consensus_height | Gauge | 0.21.0 | Height of the chain | +| consensus_validators | Gauge | 0.21.0 | Number of validators | +| consensus_validators_power | Gauge | 0.21.0 | Total voting power of all validators | +| consensus_missing_validators | Gauge | 0.21.0 | Number of validators who did not sign | +| consensus_missing_validators_power | Gauge | 0.21.0 | Total voting power of the 
missing validators | +| consensus_byzantine_validators | Gauge | 0.21.0 | Number of validators who tried to double sign | +| consensus_byzantine_validators_power | Gauge | 0.21.0 | Total voting power of the byzantine validators | +| consensus_block_interval_seconds | Histogram | 0.21.0 | Time between this and last block (Block.Header.Time) in seconds | +| consensus_rounds | Gauge | 0.21.0 | Number of rounds | +| consensus_num_txs | Gauge | 0.21.0 | Number of transactions | +| mempool_size | Gauge | 0.21.0 | Number of uncommitted transactions | +| consensus_total_txs | Gauge | 0.21.0 | Total number of transactions committed | +| consensus_block_size_bytes | Gauge | 0.21.0 | Block size in bytes | +| p2p_peers | Gauge | 0.21.0 | Number of peers node's connected to | +``` ## Useful queries diff --git a/docs/running-in-production.md b/docs/running-in-production.md index dc60c618d..3ceded499 100644 --- a/docs/running-in-production.md +++ b/docs/running-in-production.md @@ -78,6 +78,9 @@ We have a small tool, called `tm-monitor`, which outputs information from the endpoints above plus some statistics. The tool can be found [here](https://github.com/tendermint/tools/tree/master/tm-monitor). +Tendermint also can report and serve Prometheus metrics. See +[Metrics](./metrics.md). + ## What happens when my app dies? You are supposed to run Tendermint under a [process @@ -204,3 +207,37 @@ ranges](https://github.com/tendermint/tendermint/blob/27bd1deabe4ba6a2d9b463b8f3 This may not be the case for private networks, where your IP range is usually strictly limited and private. If that case, you need to set `addr_book_strict` to `false` (turn off). + +- `rpc.max_open_connections` + +By default, the number of simultaneous connections is limited because most OS +give you limited number of file descriptors. + +If you want to accept greater number of connections, you will need to increase +these limits. 
+ +[Sysctls to tune the system to be able to open more connections](https://github.com/satori-com/tcpkali/blob/master/doc/tcpkali.man.md#sysctls-to-tune-the-system-to-be-able-to-open-more-connections) + +...for N connections, such as 50k: + +``` +kern.maxfiles=10000+2*N # BSD +kern.maxfilesperproc=100+2*N # BSD +kern.ipc.maxsockets=10000+2*N # BSD +fs.file-max=10000+2*N # Linux +net.ipv4.tcp_max_orphans=N # Linux + +# For load-generating clients. +net.ipv4.ip_local_port_range="10000 65535" # Linux. +net.inet.ip.portrange.first=10000 # BSD/Mac. +net.inet.ip.portrange.last=65535 # (Enough for N < 55535) +net.ipv4.tcp_tw_reuse=1 # Linux +net.inet.tcp.maxtcptw=2*N # BSD + +# If using netfilter on Linux: +net.netfilter.nf_conntrack_max=N +echo $((N/8)) > /sys/module/nf_conntrack/parameters/hashsize +``` + +A similar option exists for limiting the number of gRPC connections - +`rpc.grpc_max_open_connections`. diff --git a/docs/specification/configuration.md b/docs/specification/configuration.md index 08981c067..59de9767b 100644 --- a/docs/specification/configuration.md +++ b/docs/specification/configuration.md @@ -73,9 +73,23 @@ laddr = "tcp://0.0.0.0:26657" # NOTE: This server only supports /broadcast_tx_commit grpc_laddr = "" +# Maximum number of simultaneous connections. +# Does not include RPC (HTTP&WebSocket) connections. See max_open_connections +# If you want to accept more significant number than the default, make sure +# you increase your OS limits. +# 0 - unlimited. +grpc_max_open_connections = 900 + # Activate unsafe RPC commands like /dial_seeds and /unsafe_flush_mempool unsafe = false +# Maximum number of simultaneous connections (including WebSocket). +# Does not include gRPC connections. See grpc_max_open_connections +# If you want to accept more significant number than the default, make sure +# you increase your OS limits. +# 0 - unlimited. 
+max_open_connections = 450 + ##### peer to peer configuration options ##### [p2p] diff --git a/lite/proxy/proxy.go b/lite/proxy/proxy.go index fe10399dc..2f068f160 100644 --- a/lite/proxy/proxy.go +++ b/lite/proxy/proxy.go @@ -3,7 +3,7 @@ package proxy import ( "net/http" - "github.com/tendermint/go-amino" + amino "github.com/tendermint/go-amino" "github.com/tendermint/tmlibs/log" rpcclient "github.com/tendermint/tendermint/rpc/client" @@ -38,7 +38,8 @@ func StartProxy(c rpcclient.Client, listenAddr string, logger log.Logger) error core.SetLogger(logger) mux.HandleFunc(wsEndpoint, wm.WebsocketHandler) - _, err = rpc.StartHTTPServer(listenAddr, mux, logger) + // TODO: limit max number of open connections rpc.Config{MaxOpenConnections: X} + _, err = rpc.StartHTTPServer(listenAddr, mux, logger, rpc.Config{}) return err } diff --git a/node/node.go b/node/node.go index 896840488..fb94bfbf2 100644 --- a/node/node.go +++ b/node/node.go @@ -16,10 +16,10 @@ import ( dbm "github.com/tendermint/tmlibs/db" "github.com/tendermint/tmlibs/log" - "github.com/tendermint/tendermint/crypto" bc "github.com/tendermint/tendermint/blockchain" cfg "github.com/tendermint/tendermint/config" cs "github.com/tendermint/tendermint/consensus" + "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/evidence" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" @@ -562,7 +562,12 @@ func (n *Node) startRPC() ([]net.Listener, error) { wm.SetLogger(rpcLogger.With("protocol", "websocket")) mux.HandleFunc("/websocket", wm.WebsocketHandler) rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger) - listener, err := rpcserver.StartHTTPServer(listenAddr, mux, rpcLogger) + listener, err := rpcserver.StartHTTPServer( + listenAddr, + mux, + rpcLogger, + rpcserver.Config{MaxOpenConnections: n.config.RPC.MaxOpenConnections}, + ) if err != nil { return nil, err } @@ -572,7 +577,12 @@ func (n *Node) startRPC() ([]net.Listener, error) { // we 
expose a simplified api over grpc for convenience to app devs grpcListenAddr := n.config.RPC.GRPCListenAddress if grpcListenAddr != "" { - listener, err := grpccore.StartGRPCServer(grpcListenAddr) + listener, err := grpccore.StartGRPCServer( + grpcListenAddr, + grpccore.Config{ + MaxOpenConnections: n.config.RPC.GRPCMaxOpenConnections, + }, + ) if err != nil { return nil, err } diff --git a/rpc/grpc/client_server.go b/rpc/grpc/client_server.go index 80d736f57..c06f1cf46 100644 --- a/rpc/grpc/client_server.go +++ b/rpc/grpc/client_server.go @@ -6,13 +6,21 @@ import ( "strings" "time" + "golang.org/x/net/netutil" "google.golang.org/grpc" cmn "github.com/tendermint/tmlibs/common" ) -// Start the grpcServer in a go routine -func StartGRPCServer(protoAddr string) (net.Listener, error) { +// Config is a gRPC server configuration. +type Config struct { + MaxOpenConnections int +} + +// StartGRPCServer starts a new gRPC BroadcastAPIServer, listening on +// protoAddr, in a goroutine. Returns a listener and an error, if it fails to +// parse an address. +func StartGRPCServer(protoAddr string, config Config) (net.Listener, error) { parts := strings.SplitN(protoAddr, "://", 2) if len(parts) != 2 { return nil, fmt.Errorf("Invalid listen address for grpc server (did you forget a tcp:// prefix?) : %s", protoAddr) @@ -22,6 +30,9 @@ func StartGRPCServer(protoAddr string) (net.Listener, error) { if err != nil { return nil, err } + if config.MaxOpenConnections > 0 { + ln = netutil.LimitListener(ln, config.MaxOpenConnections) + } grpcServer := grpc.NewServer() RegisterBroadcastAPIServer(grpcServer, &broadcastAPI{}) @@ -30,7 +41,8 @@ func StartGRPCServer(protoAddr string) (net.Listener, error) { return ln, nil } -// Start the client by dialing the server +// StartGRPCClient dials the gRPC server using protoAddr and returns a new +// BroadcastAPIClient. 
func StartGRPCClient(protoAddr string) BroadcastAPIClient { conn, err := grpc.Dial(protoAddr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc)) if err != nil { diff --git a/rpc/lib/rpc_test.go b/rpc/lib/rpc_test.go index f34b09f68..fe765473d 100644 --- a/rpc/lib/rpc_test.go +++ b/rpc/lib/rpc_test.go @@ -17,7 +17,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/tendermint/go-amino" + amino "github.com/tendermint/go-amino" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" @@ -123,7 +123,7 @@ func setup() { wm.SetLogger(tcpLogger) mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler) go func() { - _, err := server.StartHTTPServer(tcpAddr, mux, tcpLogger) + _, err := server.StartHTTPServer(tcpAddr, mux, tcpLogger, server.Config{}) if err != nil { panic(err) } @@ -136,7 +136,7 @@ func setup() { wm.SetLogger(unixLogger) mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler) go func() { - _, err := server.StartHTTPServer(unixAddr, mux2, unixLogger) + _, err := server.StartHTTPServer(unixAddr, mux2, unixLogger, server.Config{}) if err != nil { panic(err) } @@ -274,18 +274,18 @@ func TestServersAndClientsBasic(t *testing.T) { serverAddrs := [...]string{tcpAddr, unixAddr} for _, addr := range serverAddrs { cl1 := client.NewURIClient(addr) - fmt.Printf("=== testing server on %s using %v client", addr, cl1) + fmt.Printf("=== testing server on %s using URI client", addr) testWithHTTPClient(t, cl1) cl2 := client.NewJSONRPCClient(addr) - fmt.Printf("=== testing server on %s using %v client", addr, cl2) + fmt.Printf("=== testing server on %s using JSONRPC client", addr) testWithHTTPClient(t, cl2) cl3 := client.NewWSClient(addr, websocketEndpoint) cl3.SetLogger(log.TestingLogger()) err := cl3.Start() require.Nil(t, err) - fmt.Printf("=== testing server on %s using %v client", addr, cl3) + fmt.Printf("=== testing server on %s using WS client", addr) testWithWSClient(t, cl3) cl3.Stop() } diff --git 
a/rpc/lib/server/http_server.go b/rpc/lib/server/http_server.go index 3f54c61ef..2adab7f27 100644 --- a/rpc/lib/server/http_server.go +++ b/rpc/lib/server/http_server.go @@ -12,12 +12,20 @@ import ( "time" "github.com/pkg/errors" + "golang.org/x/net/netutil" types "github.com/tendermint/tendermint/rpc/lib/types" "github.com/tendermint/tmlibs/log" ) -func StartHTTPServer(listenAddr string, handler http.Handler, logger log.Logger) (listener net.Listener, err error) { +// Config is an RPC server configuration. +type Config struct { + MaxOpenConnections int +} + +// StartHTTPServer starts an HTTP server on listenAddr with the given handler. +// It wraps handler with RecoverAndLogHandler. +func StartHTTPServer(listenAddr string, handler http.Handler, logger log.Logger, config Config) (listener net.Listener, err error) { var proto, addr string parts := strings.SplitN(listenAddr, "://", 2) if len(parts) != 2 { @@ -30,6 +38,9 @@ func StartHTTPServer(listenAddr string, handler http.Handler, logger log.Logger) if err != nil { return nil, errors.Errorf("Failed to listen on %v: %v", listenAddr, err) } + if config.MaxOpenConnections > 0 { + listener = netutil.LimitListener(listener, config.MaxOpenConnections) + } go func() { err := http.Serve( @@ -41,7 +52,10 @@ func StartHTTPServer(listenAddr string, handler http.Handler, logger log.Logger) return listener, nil } -func StartHTTPAndTLSServer(listenAddr string, handler http.Handler, certFile, keyFile string, logger log.Logger) (listener net.Listener, err error) { +// StartHTTPAndTLSServer starts an HTTPS server on listenAddr with the given +// handler. +// It wraps handler with RecoverAndLogHandler. 
+func StartHTTPAndTLSServer(listenAddr string, handler http.Handler, certFile, keyFile string, logger log.Logger, config Config) (listener net.Listener, err error) { var proto, addr string parts := strings.SplitN(listenAddr, "://", 2) if len(parts) != 2 { @@ -54,6 +68,9 @@ func StartHTTPAndTLSServer(listenAddr string, handler http.Handler, certFile, ke if err != nil { return nil, errors.Errorf("Failed to listen on %v: %v", listenAddr, err) } + if config.MaxOpenConnections > 0 { + listener = netutil.LimitListener(listener, config.MaxOpenConnections) + } go func() { err := http.ServeTLS( diff --git a/rpc/lib/server/http_server_test.go b/rpc/lib/server/http_server_test.go new file mode 100644 index 000000000..22fd8a23b --- /dev/null +++ b/rpc/lib/server/http_server_test.go @@ -0,0 +1,62 @@ +package rpcserver + +import ( + "fmt" + "io" + "io/ioutil" + "net/http" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/tendermint/tmlibs/log" +) + +func TestMaxOpenConnections(t *testing.T) { + const max = 5 // max simultaneous connections + + // Start the server. + var open int32 + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + if n := atomic.AddInt32(&open, 1); n > int32(max) { + t.Errorf("%d open connections, want <= %d", n, max) + } + defer atomic.AddInt32(&open, -1) + time.Sleep(10 * time.Millisecond) + fmt.Fprint(w, "some body") + }) + l, err := StartHTTPServer("tcp://127.0.0.1:0", mux, log.TestingLogger(), Config{MaxOpenConnections: max}) + if err != nil { + t.Fatal(err) + } + defer l.Close() + + // Make N GET calls to the server. 
+ attempts := max * 2 + var wg sync.WaitGroup + var failed int32 + for i := 0; i < attempts; i++ { + wg.Add(1) + go func() { + defer wg.Done() + c := http.Client{Timeout: 3 * time.Second} + r, err := c.Get("http://" + l.Addr().String()) + if err != nil { + t.Log(err) + atomic.AddInt32(&failed, 1) + return + } + defer r.Body.Close() + io.Copy(ioutil.Discard, r.Body) + }() + } + wg.Wait() + + // We expect some Gets to fail as the server's accept queue is filled, + // but most should succeed. + if int(failed) >= attempts/2 { + t.Errorf("%d requests failed within %d attempts", failed, attempts) + } +} diff --git a/rpc/lib/test/main.go b/rpc/lib/test/main.go index 604cbd3d8..4dd95ce05 100644 --- a/rpc/lib/test/main.go +++ b/rpc/lib/test/main.go @@ -5,7 +5,7 @@ import ( "net/http" "os" - "github.com/tendermint/go-amino" + amino "github.com/tendermint/go-amino" rpcserver "github.com/tendermint/tendermint/rpc/lib/server" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" @@ -28,7 +28,7 @@ func main() { cdc := amino.NewCodec() logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout)) rpcserver.RegisterRPCFuncs(mux, routes, cdc, logger) - _, err := rpcserver.StartHTTPServer("0.0.0.0:8008", mux, logger) + _, err := rpcserver.StartHTTPServer("0.0.0.0:8008", mux, logger, rpcserver.Config{}) if err != nil { cmn.Exit(err.Error()) }