
Merge pull request #1781 from tendermint/1740-node-crashes-when-too-many-rpc-connections

limit number of open connections
pull/1813/head
Alexander Simmerl, 7 years ago, committed by GitHub
commit f62d6651e3
15 changed files with 280 additions and 43 deletions
  1. CHANGELOG.md (+14, -2)
  2. Gopkg.lock (+1, -0)
  3. Gopkg.toml (+4, -0)
  4. config/config.go (+23, -3)
  5. config/toml.go (+14, -0)
  6. docs/metrics.md (+17, -15)
  7. docs/running-in-production.md (+37, -0)
  8. docs/specification/configuration.md (+14, -0)
  9. lite/proxy/proxy.go (+3, -2)
  10. node/node.go (+13, -3)
  11. rpc/grpc/client_server.go (+15, -3)
  12. rpc/lib/rpc_test.go (+6, -6)
  13. rpc/lib/server/http_server.go (+55, -7)
  14. rpc/lib/server/http_server_test.go (+62, -0)
  15. rpc/lib/test/main.go (+2, -2)

CHANGELOG.md (+14, -2)

@@ -2,8 +2,13 @@
## TBD
FEATURES:
- [node] added metrics (served under /metrics using a Prometheus client; disabled by default)
BUG FIXES:
- [rpc] limited number of HTTP/WebSocket connections
(`rpc.max_open_connections`) and gRPC connections
(`rpc.grpc_max_open_connections`). Check out [Running In
Production](https://tendermint.readthedocs.io/en/master/running-in-production.html)
guide if you want to increase them.
## 0.21.0
@@ -20,6 +25,13 @@ IMPROVEMENT
- [pubsub] Set default capacity to 0
- [docs] Various improvements
FEATURES
- [main] added metrics (served under `/metrics` using a Prometheus client;
disabled by default). See the new `instrumentation` section in the config and
[metrics](https://tendermint.readthedocs.io/projects/tools/en/v0.21.0/metrics.html)
guide.
BUG FIXES
- [consensus] Fix an issue where we don't make blocks after `fast_sync` when `create_empty_blocks=false`


Gopkg.lock (+1, -0)

@@ -362,6 +362,7 @@
"http2/hpack",
"idna",
"internal/timeseries",
"netutil",
"trace"
]
revision = "db08ff08e8622530d9ed3a0e8ac279f6d4c02196"


Gopkg.toml (+4, -0)

@@ -93,3 +93,7 @@
[[constraint]]
name = "github.com/prometheus/client_golang"
version = "0.8.0"
[[constraint]]
branch = "master"
name = "golang.org/x/net"

config/config.go (+23, -3)

@@ -224,16 +224,36 @@ type RPCConfig struct {
// NOTE: This server only supports /broadcast_tx_commit
GRPCListenAddress string `mapstructure:"grpc_laddr"`
// Maximum number of simultaneous connections.
// Does not include RPC (HTTP&WebSocket) connections. See max_open_connections
// If you want to accept more significant number than the default, make sure
// you increase your OS limits.
// 0 - unlimited.
GRPCMaxOpenConnections int `mapstructure:"grpc_max_open_connections"`
// Activate unsafe RPC commands like /dial_persistent_peers and /unsafe_flush_mempool
Unsafe bool `mapstructure:"unsafe"`
// Maximum number of simultaneous connections (including WebSocket).
// Does not include gRPC connections. See grpc_max_open_connections
// If you want to accept more significant number than the default, make sure
// you increase your OS limits.
// 0 - unlimited.
MaxOpenConnections int `mapstructure:"max_open_connections"`
}
// DefaultRPCConfig returns a default configuration for the RPC server
func DefaultRPCConfig() *RPCConfig {
return &RPCConfig{
ListenAddress: "tcp://0.0.0.0:26657",
GRPCListenAddress: "",
Unsafe: false,
ListenAddress: "tcp://0.0.0.0:26657",
GRPCListenAddress: "",
GRPCMaxOpenConnections: 900, // no ipv4
Unsafe: false,
// should be < {ulimit -Sn} - {MaxNumPeers} - {N of wal, db and other open files}
// 1024 - 50 - 50 = 924 = ~900
MaxOpenConnections: 900,
}
}
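Both limits are plain `RPCConfig` fields, so they can also be tuned programmatically rather than through the TOML file. A hedged sketch (the value 450 is just an example, matching the one shown later in docs/specification/configuration.md):

```go
package main

import (
	"fmt"

	cfg "github.com/tendermint/tendermint/config"
)

func main() {
	// Start from the defaults added in this PR (900 for both limits) and
	// tighten them, e.g. for a host with a small `ulimit -Sn`.
	rpcCfg := cfg.DefaultRPCConfig()
	rpcCfg.MaxOpenConnections = 450     // HTTP and WebSocket connections
	rpcCfg.GRPCMaxOpenConnections = 450 // gRPC connections; 0 means unlimited
	fmt.Println(rpcCfg.MaxOpenConnections, rpcCfg.GRPCMaxOpenConnections)
}
```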


config/toml.go (+14, -0)

@@ -119,9 +119,23 @@ laddr = "{{ .RPC.ListenAddress }}"
# NOTE: This server only supports /broadcast_tx_commit
grpc_laddr = "{{ .RPC.GRPCListenAddress }}"
# Maximum number of simultaneous connections.
# Does not include RPC (HTTP&WebSocket) connections. See max_open_connections
# If you want to accept more significant number than the default, make sure
# you increase your OS limits.
# 0 - unlimited.
grpc_max_open_connections = {{ .RPC.GRPCMaxOpenConnections }}
# Activate unsafe RPC commands like /dial_seeds and /unsafe_flush_mempool
unsafe = {{ .RPC.Unsafe }}
# Maximum number of simultaneous connections (including WebSocket).
# Does not include gRPC connections. See grpc_max_open_connections
# If you want to accept more significant number than the default, make sure
# you increase your OS limits.
# 0 - unlimited.
max_open_connections = {{ .RPC.MaxOpenConnections }}
##### peer to peer configuration options #####
[p2p]


docs/metrics.md (+17, -15)

@@ -8,28 +8,30 @@ This functionality is disabled by default.
To enable the Prometheus metrics, set `instrumentation.prometheus=true` if your
config file. Metrics will be served under `/metrics` on 26660 port by default.
Listen address can be changed in the config file (see
`prometheus_listen_addr`).
`instrumentation.prometheus_listen_addr`).
## List of available metrics
The following metrics are available:
```
| Name | Type | Since | Description |
| --------------------------------------- | ------- | --------- | ----------------------------------------------------------------------------- |
| consensus_height | Gauge | 0.20.1 | Height of the chain |
| consensus_validators | Gauge | 0.20.1 | Number of validators |
| consensus_validators_power | Gauge | 0.20.1 | Total voting power of all validators |
| consensus_missing_validators | Gauge | 0.20.1 | Number of validators who did not sign |
| consensus_missing_validators_power | Gauge | 0.20.1 | Total voting power of the missing validators |
| consensus_byzantine_validators | Gauge | 0.20.1 | Number of validators who tried to double sign |
| consensus_byzantine_validators_power | Gauge | 0.20.1 | Total voting power of the byzantine validators |
| consensus_block_interval_seconds | Histogram | 0.20.1 | Time between this and last block (Block.Header.Time) in seconds |
| consensus_rounds | Gauge | 0.20.1 | Number of rounds |
| consensus_num_txs | Gauge | 0.20.1 | Number of transactions |
| mempool_size | Gauge | 0.20.1 | Number of uncommitted transactions |
| consensus_total_txs | Gauge | 0.20.1 | Total number of transactions committed |
| consensus_block_size_bytes | Gauge | 0.20.1 | Block size in bytes |
| p2p_peers | Gauge | 0.20.1 | Number of peers node's connected to |
| consensus_height | Gauge | 0.21.0 | Height of the chain |
| consensus_validators | Gauge | 0.21.0 | Number of validators |
| consensus_validators_power | Gauge | 0.21.0 | Total voting power of all validators |
| consensus_missing_validators | Gauge | 0.21.0 | Number of validators who did not sign |
| consensus_missing_validators_power | Gauge | 0.21.0 | Total voting power of the missing validators |
| consensus_byzantine_validators | Gauge | 0.21.0 | Number of validators who tried to double sign |
| consensus_byzantine_validators_power | Gauge | 0.21.0 | Total voting power of the byzantine validators |
| consensus_block_interval_seconds | Histogram | 0.21.0 | Time between this and last block (Block.Header.Time) in seconds |
| consensus_rounds | Gauge | 0.21.0 | Number of rounds |
| consensus_num_txs | Gauge | 0.21.0 | Number of transactions |
| mempool_size | Gauge | 0.21.0 | Number of uncommitted transactions |
| consensus_total_txs | Gauge | 0.21.0 | Total number of transactions committed |
| consensus_block_size_bytes | Gauge | 0.21.0 | Block size in bytes |
| p2p_peers | Gauge | 0.21.0 | Number of peers node's connected to |
```
## Useful queries


docs/running-in-production.md (+37, -0)

@@ -78,6 +78,9 @@ We have a small tool, called `tm-monitor`, which outputs information from
the endpoints above plus some statistics. The tool can be found
[here](https://github.com/tendermint/tools/tree/master/tm-monitor).
Tendermint also can report and serve Prometheus metrics. See
[Metrics](./metrics.md).
## What happens when my app dies?
You are supposed to run Tendermint under a [process
@@ -204,3 +207,37 @@ ranges](https://github.com/tendermint/tendermint/blob/27bd1deabe4ba6a2d9b463b8f3
This may not be the case for private networks, where your IP range is usually
strictly limited and private. If that case, you need to set `addr_book_strict`
to `false` (turn off).
- `rpc.max_open_connections`
By default, the number of simultaneous connections is limited because most OS
give you limited number of file descriptors.
If you want to accept greater number of connections, you will need to increase
these limits.
[Sysctls to tune the system to be able to open more connections](https://github.com/satori-com/tcpkali/blob/master/doc/tcpkali.man.md#sysctls-to-tune-the-system-to-be-able-to-open-more-connections)
...for N connections, such as 50k:
```
kern.maxfiles=10000+2*N # BSD
kern.maxfilesperproc=100+2*N # BSD
kern.ipc.maxsockets=10000+2*N # BSD
fs.file-max=10000+2*N # Linux
net.ipv4.tcp_max_orphans=N # Linux
# For load-generating clients.
net.ipv4.ip_local_port_range="10000 65535" # Linux.
net.inet.ip.portrange.first=10000 # BSD/Mac.
net.inet.ip.portrange.last=65535 # (Enough for N < 55535)
net.ipv4.tcp_tw_reuse=1 # Linux
net.inet.tcp.maxtcptw=2*N # BSD
# If using netfilter on Linux:
net.netfilter.nf_conntrack_max=N
echo $((N/8)) > /sys/module/nf_conntrack/parameters/hashsize
```
The similar option exists for limiting the number of gRPC connections -
`rpc.grpc_max_open_connections`.
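To choose a limit consistent with the rule of thumb in config/config.go (`ulimit -Sn` minus peers and other open files), an operator could query the file-descriptor limit from code. A rough Unix-only sketch; the 50 + 50 headroom mirrors the PR comment and is only an assumption:

```go
package main

import (
	"fmt"
	"syscall"
)

func main() {
	// RLIMIT_NOFILE is the programmatic equivalent of `ulimit -Sn`.
	var rl syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rl); err != nil {
		panic(err)
	}
	const headroom = 50 + 50 // peers + wal/db/other open files (assumption from the PR comment)
	suggested := int(rl.Cur) - headroom
	if suggested < 0 {
		suggested = 0
	}
	fmt.Println("suggested rpc.max_open_connections:", suggested)
}
```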

docs/specification/configuration.md (+14, -0)

@@ -73,9 +73,23 @@ laddr = "tcp://0.0.0.0:26657"
# NOTE: This server only supports /broadcast_tx_commit
grpc_laddr = ""
# Maximum number of simultaneous connections.
# Does not include RPC (HTTP&WebSocket) connections. See max_open_connections
# If you want to accept more significant number than the default, make sure
# you increase your OS limits.
# 0 - unlimited.
grpc_max_open_connections = 900
# Activate unsafe RPC commands like /dial_seeds and /unsafe_flush_mempool
unsafe = false
# Maximum number of simultaneous connections (including WebSocket).
# Does not include gRPC connections. See grpc_max_open_connections
# If you want to accept more significant number than the default, make sure
# you increase your OS limits.
# 0 - unlimited.
max_open_connections = 450
##### peer to peer configuration options #####
[p2p]


lite/proxy/proxy.go (+3, -2)

@@ -3,7 +3,7 @@ package proxy
import (
"net/http"
"github.com/tendermint/go-amino"
amino "github.com/tendermint/go-amino"
"github.com/tendermint/tmlibs/log"
rpcclient "github.com/tendermint/tendermint/rpc/client"
@@ -38,7 +38,8 @@ func StartProxy(c rpcclient.Client, listenAddr string, logger log.Logger) error
core.SetLogger(logger)
mux.HandleFunc(wsEndpoint, wm.WebsocketHandler)
_, err = rpc.StartHTTPServer(listenAddr, mux, logger)
// TODO: limit max number of open connections rpc.Config{MaxOpenConnections: X}
_, err = rpc.StartHTTPServer(listenAddr, mux, logger, rpc.Config{})
return err
}


node/node.go (+13, -3)

@@ -16,10 +16,10 @@ import (
dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log"
"github.com/tendermint/tendermint/crypto"
bc "github.com/tendermint/tendermint/blockchain"
cfg "github.com/tendermint/tendermint/config"
cs "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/evidence"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
@@ -562,7 +562,12 @@ func (n *Node) startRPC() ([]net.Listener, error) {
wm.SetLogger(rpcLogger.With("protocol", "websocket"))
mux.HandleFunc("/websocket", wm.WebsocketHandler)
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger)
listener, err := rpcserver.StartHTTPServer(listenAddr, mux, rpcLogger)
listener, err := rpcserver.StartHTTPServer(
listenAddr,
mux,
rpcLogger,
rpcserver.Config{MaxOpenConnections: n.config.RPC.MaxOpenConnections},
)
if err != nil {
return nil, err
}
@@ -572,7 +577,12 @@ func (n *Node) startRPC() ([]net.Listener, error) {
// we expose a simplified api over grpc for convenience to app devs
grpcListenAddr := n.config.RPC.GRPCListenAddress
if grpcListenAddr != "" {
listener, err := grpccore.StartGRPCServer(grpcListenAddr)
listener, err := grpccore.StartGRPCServer(
grpcListenAddr,
grpccore.Config{
MaxOpenConnections: n.config.RPC.GRPCMaxOpenConnections,
},
)
if err != nil {
return nil, err
}


rpc/grpc/client_server.go (+15, -3)

@@ -6,13 +6,21 @@ import (
"strings"
"time"
"golang.org/x/net/netutil"
"google.golang.org/grpc"
cmn "github.com/tendermint/tmlibs/common"
)
// Start the grpcServer in a go routine
func StartGRPCServer(protoAddr string) (net.Listener, error) {
// Config is an gRPC server configuration.
type Config struct {
MaxOpenConnections int
}
// StartGRPCServer starts a new gRPC BroadcastAPIServer, listening on
// protoAddr, in a goroutine. Returns a listener and an error, if it fails to
// parse an address.
func StartGRPCServer(protoAddr string, config Config) (net.Listener, error) {
parts := strings.SplitN(protoAddr, "://", 2)
if len(parts) != 2 {
return nil, fmt.Errorf("Invalid listen address for grpc server (did you forget a tcp:// prefix?) : %s", protoAddr)
@@ -22,6 +30,9 @@ func StartGRPCServer(protoAddr string) (net.Listener, error) {
if err != nil {
return nil, err
}
if config.MaxOpenConnections > 0 {
ln = netutil.LimitListener(ln, config.MaxOpenConnections)
}
grpcServer := grpc.NewServer()
RegisterBroadcastAPIServer(grpcServer, &broadcastAPI{})
@@ -30,7 +41,8 @@ func StartGRPCServer(protoAddr string) (net.Listener, error) {
return ln, nil
}
// Start the client by dialing the server
// StartGRPCClient dials the gRPC server using protoAddr and returns a new
// BroadcastAPIClient.
func StartGRPCClient(protoAddr string) BroadcastAPIClient {
conn, err := grpc.Dial(protoAddr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc))
if err != nil {
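With the gRPC changes above, callers start the broadcast server roughly like this. A hedged usage sketch (listen address and limit are placeholders; the `grpccore` alias matches the one used in node/node.go):

```go
package main

import (
	grpccore "github.com/tendermint/tendermint/rpc/grpc"
)

func main() {
	// Start the BroadcastAPI gRPC server with at most 900 simultaneous
	// connections; MaxOpenConnections == 0 would mean unlimited.
	ln, err := grpccore.StartGRPCServer("tcp://127.0.0.1:36658", grpccore.Config{
		MaxOpenConnections: 900,
	})
	if err != nil {
		panic(err)
	}
	defer ln.Close()
	select {} // StartGRPCServer serves in its own goroutine; block here
}
```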


rpc/lib/rpc_test.go (+6, -6)

@@ -17,7 +17,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/go-amino"
amino "github.com/tendermint/go-amino"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
@@ -123,7 +123,7 @@ func setup() {
wm.SetLogger(tcpLogger)
mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
go func() {
_, err := server.StartHTTPServer(tcpAddr, mux, tcpLogger)
_, err := server.StartHTTPServer(tcpAddr, mux, tcpLogger, server.Config{})
if err != nil {
panic(err)
}
@@ -136,7 +136,7 @@ func setup() {
wm.SetLogger(unixLogger)
mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
go func() {
_, err := server.StartHTTPServer(unixAddr, mux2, unixLogger)
_, err := server.StartHTTPServer(unixAddr, mux2, unixLogger, server.Config{})
if err != nil {
panic(err)
}
@@ -274,18 +274,18 @@ func TestServersAndClientsBasic(t *testing.T) {
serverAddrs := [...]string{tcpAddr, unixAddr}
for _, addr := range serverAddrs {
cl1 := client.NewURIClient(addr)
fmt.Printf("=== testing server on %s using %v client", addr, cl1)
fmt.Printf("=== testing server on %s using URI client", addr)
testWithHTTPClient(t, cl1)
cl2 := client.NewJSONRPCClient(addr)
fmt.Printf("=== testing server on %s using %v client", addr, cl2)
fmt.Printf("=== testing server on %s using JSONRPC client", addr)
testWithHTTPClient(t, cl2)
cl3 := client.NewWSClient(addr, websocketEndpoint)
cl3.SetLogger(log.TestingLogger())
err := cl3.Start()
require.Nil(t, err)
fmt.Printf("=== testing server on %s using %v client", addr, cl3)
fmt.Printf("=== testing server on %s using WS client", addr)
testWithWSClient(t, cl3)
cl3.Stop()
}


rpc/lib/server/http_server.go (+55, -7)

@@ -12,16 +12,32 @@ import (
"time"
"github.com/pkg/errors"
"golang.org/x/net/netutil"
types "github.com/tendermint/tendermint/rpc/lib/types"
"github.com/tendermint/tmlibs/log"
)
func StartHTTPServer(listenAddr string, handler http.Handler, logger log.Logger) (listener net.Listener, err error) {
// Config is an RPC server configuration.
type Config struct {
MaxOpenConnections int
}
// StartHTTPServer starts an HTTP server on listenAddr with the given handler.
// It wraps handler with RecoverAndLogHandler.
func StartHTTPServer(
listenAddr string,
handler http.Handler,
logger log.Logger,
config Config,
) (listener net.Listener, err error) {
var proto, addr string
parts := strings.SplitN(listenAddr, "://", 2)
if len(parts) != 2 {
return nil, errors.Errorf("Invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)", listenAddr)
return nil, errors.Errorf(
"Invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)",
listenAddr,
)
}
proto, addr = parts[0], parts[1]
@@ -30,6 +46,9 @@ func StartHTTPServer(listenAddr string, handler http.Handler, logger log.Logger)
if err != nil {
return nil, errors.Errorf("Failed to listen on %v: %v", listenAddr, err)
}
if config.MaxOpenConnections > 0 {
listener = netutil.LimitListener(listener, config.MaxOpenConnections)
}
go func() {
err := http.Serve(
@@ -41,19 +60,41 @@ func StartHTTPServer(listenAddr string, handler http.Handler, logger log.Logger)
return listener, nil
}
func StartHTTPAndTLSServer(listenAddr string, handler http.Handler, certFile, keyFile string, logger log.Logger) (listener net.Listener, err error) {
// StartHTTPAndTLSServer starts an HTTPS server on listenAddr with the given
// handler.
// It wraps handler with RecoverAndLogHandler.
func StartHTTPAndTLSServer(
listenAddr string,
handler http.Handler,
certFile, keyFile string,
logger log.Logger,
config Config,
) (listener net.Listener, err error) {
var proto, addr string
parts := strings.SplitN(listenAddr, "://", 2)
if len(parts) != 2 {
return nil, errors.Errorf("Invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)", listenAddr)
return nil, errors.Errorf(
"Invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)",
listenAddr,
)
}
proto, addr = parts[0], parts[1]
logger.Info(fmt.Sprintf("Starting RPC HTTPS server on %s (cert: %q, key: %q)", listenAddr, certFile, keyFile))
logger.Info(
fmt.Sprintf(
"Starting RPC HTTPS server on %s (cert: %q, key: %q)",
listenAddr,
certFile,
keyFile,
),
)
listener, err = net.Listen(proto, addr)
if err != nil {
return nil, errors.Errorf("Failed to listen on %v: %v", listenAddr, err)
}
if config.MaxOpenConnections > 0 {
listener = netutil.LimitListener(listener, config.MaxOpenConnections)
}
go func() {
err := http.ServeTLS(
@@ -67,7 +108,11 @@ func StartHTTPAndTLSServer(listenAddr string, handler http.Handler, certFile, ke
return listener, nil
}
func WriteRPCResponseHTTPError(w http.ResponseWriter, httpCode int, res types.RPCResponse) {
func WriteRPCResponseHTTPError(
w http.ResponseWriter,
httpCode int,
res types.RPCResponse,
) {
jsonBytes, err := json.MarshalIndent(res, "", " ")
if err != nil {
panic(err)
@@ -117,7 +162,10 @@ func RecoverAndLogHandler(handler http.Handler, logger log.Logger) http.Handler
WriteRPCResponseHTTP(rww, res)
} else {
// For the rest,
logger.Error("Panic in RPC HTTP handler", "err", e, "stack", string(debug.Stack()))
logger.Error(
"Panic in RPC HTTP handler", "err", e, "stack",
string(debug.Stack()),
)
rww.WriteHeader(http.StatusInternalServerError)
WriteRPCResponseHTTP(rww, types.RPCInternalError("", e.(error)))
}
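Every caller now passes a `Config`; the zero value keeps the previous unlimited behaviour. A hedged, standalone sketch of the updated `StartHTTPServer` API (the address, route, and limit of 100 are illustrative):

```go
package main

import (
	"net/http"
	"os"

	rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
	"github.com/tendermint/tmlibs/log"
)

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("ok"))
	})
	logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))

	// Cap the RPC server at 100 simultaneous HTTP/WebSocket connections.
	listener, err := rpcserver.StartHTTPServer("tcp://127.0.0.1:26657", mux, logger,
		rpcserver.Config{MaxOpenConnections: 100})
	if err != nil {
		panic(err)
	}
	defer listener.Close()
	select {} // StartHTTPServer serves in its own goroutine; block here
}
```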


rpc/lib/server/http_server_test.go (+62, -0)

@@ -0,0 +1,62 @@
package rpcserver
import (
"fmt"
"io"
"io/ioutil"
"net/http"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/tendermint/tmlibs/log"
)
func TestMaxOpenConnections(t *testing.T) {
const max = 5 // max simultaneous connections
// Start the server.
var open int32
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if n := atomic.AddInt32(&open, 1); n > int32(max) {
t.Errorf("%d open connections, want <= %d", n, max)
}
defer atomic.AddInt32(&open, -1)
time.Sleep(10 * time.Millisecond)
fmt.Fprint(w, "some body")
})
l, err := StartHTTPServer("tcp://127.0.0.1:0", mux, log.TestingLogger(), Config{MaxOpenConnections: max})
if err != nil {
t.Fatal(err)
}
defer l.Close()
// Make N GET calls to the server.
attempts := max * 2
var wg sync.WaitGroup
var failed int32
for i := 0; i < attempts; i++ {
wg.Add(1)
go func() {
defer wg.Done()
c := http.Client{Timeout: 3 * time.Second}
r, err := c.Get("http://" + l.Addr().String())
if err != nil {
t.Log(err)
atomic.AddInt32(&failed, 1)
return
}
defer r.Body.Close()
io.Copy(ioutil.Discard, r.Body)
}()
}
wg.Wait()
// We expect some Gets to fail as the server's accept queue is filled,
// but most should succeed.
if int(failed) >= attempts/2 {
t.Errorf("%d requests failed within %d attempts", failed, attempts)
}
}

rpc/lib/test/main.go (+2, -2)

@@ -5,7 +5,7 @@ import (
"net/http"
"os"
"github.com/tendermint/go-amino"
amino "github.com/tendermint/go-amino"
rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
@@ -28,7 +28,7 @@ func main() {
cdc := amino.NewCodec()
logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))
rpcserver.RegisterRPCFuncs(mux, routes, cdc, logger)
_, err := rpcserver.StartHTTPServer("0.0.0.0:8008", mux, logger)
_, err := rpcserver.StartHTTPServer("0.0.0.0:8008", mux, logger, rpcserver.Config{})
if err != nil {
cmn.Exit(err.Error())
}

