Browse Source

R4R: Add timeouts to http servers (#2780)

* Replaces our current http servers where connections stay open forever with ones with timeouts to prevent file descriptor exhaustion

* Use the correct handler

* Put in go routines

* fix err

* changelog

* rpc: export Read/WriteTimeout

The `broadcast_tx_commit` endpoint has it's own timeout.
If this is longer than the http server's WriteTimeout, the
user will receive an error. Here, we export the WriteTimeout
and set the broadcast_tx_commit timeout to be less than it.

In the future, we should use a config struct for the timeouts
to avoid the need to export. The broadcast_tx_commit timeout
may also become configurable, but we must check that it's less
than the server's WriteTimeout.
pull/2873/head
Zaki Manian 6 years ago
committed by Ethan Buchman
parent
commit
e6fc10faf6
4 changed files with 33 additions and 15 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +3
    -1
      rpc/core/mempool.go
  3. +2
    -3
      rpc/core/pipe.go
  4. +27
    -11
      rpc/lib/server/http_server.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -34,6 +34,7 @@ program](https://hackerone.com/tendermint).
### IMPROVEMENTS: ### IMPROVEMENTS:
- [rpc] \#2780 Add read and write timeouts to HTTP servers
- [state] \#2765 Make "Update to validators" msg value pretty (@danil-lashin) - [state] \#2765 Make "Update to validators" msg value pretty (@danil-lashin)
- [p2p] \#2857 "Send failed" is logged at debug level instead of error. - [p2p] \#2857 "Send failed" is logged at debug level instead of error.


+ 3
- 1
rpc/core/mempool.go View File

@ -9,6 +9,7 @@ import (
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -194,7 +195,8 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
} }
// Wait for the tx to be included in a block or timeout. // Wait for the tx to be included in a block or timeout.
var deliverTxTimeout = 10 * time.Second // TODO: configurable?
// TODO: configurable?
var deliverTxTimeout = rpcserver.WriteTimeout / 2
select { select {
case deliverTxResMsg := <-deliverTxResCh: // The tx was included in a block. case deliverTxResMsg := <-deliverTxResCh: // The tx was included in a block.
deliverTxRes := deliverTxResMsg.(types.EventDataTx) deliverTxRes := deliverTxResMsg.(types.EventDataTx)


+ 2
- 3
rpc/core/pipe.go View File

@ -1,8 +1,6 @@
package core package core
import ( import (
"time"
"github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/consensus"
crypto "github.com/tendermint/tendermint/crypto" crypto "github.com/tendermint/tendermint/crypto"
dbm "github.com/tendermint/tendermint/libs/db" dbm "github.com/tendermint/tendermint/libs/db"
@ -10,6 +8,7 @@ import (
mempl "github.com/tendermint/tendermint/mempool" mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
@ -21,7 +20,7 @@ const (
maxPerPage = 100 maxPerPage = 100
) )
var subscribeTimeout = 5 * time.Second
var subscribeTimeout = rpcserver.WriteTimeout / 2
//---------------------------------------------- //----------------------------------------------
// These interfaces are used by RPC and must be thread safe // These interfaces are used by RPC and must be thread safe


+ 27
- 11
rpc/lib/server/http_server.go View File

@ -27,6 +27,17 @@ const (
// maxBodyBytes controls the maximum number of bytes the // maxBodyBytes controls the maximum number of bytes the
// server will read parsing the request body. // server will read parsing the request body.
maxBodyBytes = int64(1000000) // 1MB maxBodyBytes = int64(1000000) // 1MB
// same as the net/http default
maxHeaderBytes = 1 << 20
// Timeouts for reading/writing to the http connection.
// Public so handlers can read them -
// /broadcast_tx_commit has it's own timeout, which should
// be less than the WriteTimeout here.
// TODO: use a config instead.
ReadTimeout = 3 * time.Second
WriteTimeout = 20 * time.Second
) )
// StartHTTPServer takes a listener and starts an HTTP server with the given handler. // StartHTTPServer takes a listener and starts an HTTP server with the given handler.
@ -34,10 +45,13 @@ const (
// NOTE: This function blocks - you may want to call it in a go-routine. // NOTE: This function blocks - you may want to call it in a go-routine.
func StartHTTPServer(listener net.Listener, handler http.Handler, logger log.Logger) error { func StartHTTPServer(listener net.Listener, handler http.Handler, logger log.Logger) error {
logger.Info(fmt.Sprintf("Starting RPC HTTP server on %s", listener.Addr())) logger.Info(fmt.Sprintf("Starting RPC HTTP server on %s", listener.Addr()))
err := http.Serve(
listener,
RecoverAndLogHandler(maxBytesHandler{h: handler, n: maxBodyBytes}, logger),
)
s := &http.Server{
Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: maxBodyBytes}, logger),
ReadTimeout: ReadTimeout,
WriteTimeout: WriteTimeout,
MaxHeaderBytes: maxHeaderBytes,
}
err := s.Serve(listener)
logger.Info("RPC HTTP server stopped", "err", err) logger.Info("RPC HTTP server stopped", "err", err)
return err return err
} }
@ -53,13 +67,15 @@ func StartHTTPAndTLSServer(
) error { ) error {
logger.Info(fmt.Sprintf("Starting RPC HTTPS server on %s (cert: %q, key: %q)", logger.Info(fmt.Sprintf("Starting RPC HTTPS server on %s (cert: %q, key: %q)",
listener.Addr(), certFile, keyFile)) listener.Addr(), certFile, keyFile))
err := http.ServeTLS(
listener,
RecoverAndLogHandler(maxBytesHandler{h: handler, n: maxBodyBytes}, logger),
certFile,
keyFile,
)
logger.Info("RPC HTTPS server stopped", "err", err)
s := &http.Server{
Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: maxBodyBytes}, logger),
ReadTimeout: ReadTimeout,
WriteTimeout: WriteTimeout,
MaxHeaderBytes: maxHeaderBytes,
}
err := s.ServeTLS(listener, certFile, keyFile)
logger.Error("RPC HTTPS server stopped", "err", err)
return err return err
} }


Loading…
Cancel
Save