From b646437ec761107b9bde415a6bf85b60cc01405b Mon Sep 17 00:00:00 2001 From: Alessio Treglia Date: Thu, 15 Nov 2018 20:33:04 +0000 Subject: [PATCH] Decouple StartHTTP{,AndTLS}Server from Listen() (#2791) * Decouple StartHTTP{,AndTLS}Server from Listen() This should help solve cosmos/cosmos-sdk#2715 * Fix small mistake * Update StartGRPCServer * s/rpc/rpcserver/ * Start grpccore.StartGRPCServer in a goroutine * Reinstate l.Close() * Fix rpc/lib/test/main.go * Update code comment * update changelog and comments * fix tm-monitor. more comments --- CHANGELOG_PENDING.md | 3 + lite/proxy/proxy.go | 47 ++++++------ node/node.go | 25 +++---- rpc/grpc/client_server.go | 27 ++----- rpc/lib/doc.go | 9 +-- rpc/lib/rpc_test.go | 19 ++--- rpc/lib/server/http_server.go | 112 +++++++++++++---------------- rpc/lib/server/http_server_test.go | 14 ++-- rpc/lib/test/main.go | 4 +- tools/tm-monitor/main.go | 9 +-- tools/tm-monitor/rpc.go | 8 ++- 11 files changed, 126 insertions(+), 151 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 6494867d7..eea80c57e 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -16,6 +16,9 @@ program](https://hackerone.com/tendermint). * Apps * Go API + - [rpc] \#2791 Functions that start HTTP servers are now blocking: + - Impacts: StartHTTPServer, StartHTTPAndTLSServer, and StartGRPCServer, + - These functions now take a `net.Listener` instead of an address * Blockchain Protocol diff --git a/lite/proxy/proxy.go b/lite/proxy/proxy.go index ffd9db1d7..d7ffb27d0 100644 --- a/lite/proxy/proxy.go +++ b/lite/proxy/proxy.go @@ -9,7 +9,7 @@ import ( rpcclient "github.com/tendermint/tendermint/rpc/client" "github.com/tendermint/tendermint/rpc/core" ctypes "github.com/tendermint/tendermint/rpc/core/types" - rpc "github.com/tendermint/tendermint/rpc/lib/server" + rpcserver "github.com/tendermint/tendermint/rpc/lib/server" ) const ( @@ -19,6 +19,7 @@ const ( // StartProxy will start the websocket manager on the client, // set up the rpc routes to proxy via the given client, // and start up an http/rpc server on the location given by bind (eg. :1234) +// NOTE: This function blocks - you may want to call it in a go-routine. func StartProxy(c rpcclient.Client, listenAddr string, logger log.Logger, maxOpenConnections int) error { err := c.Start() if err != nil { @@ -31,47 +32,49 @@ func StartProxy(c rpcclient.Client, listenAddr string, logger log.Logger, maxOpe // build the handler... mux := http.NewServeMux() - rpc.RegisterRPCFuncs(mux, r, cdc, logger) + rpcserver.RegisterRPCFuncs(mux, r, cdc, logger) - wm := rpc.NewWebsocketManager(r, cdc, rpc.EventSubscriber(c)) + wm := rpcserver.NewWebsocketManager(r, cdc, rpcserver.EventSubscriber(c)) wm.SetLogger(logger) core.SetLogger(logger) mux.HandleFunc(wsEndpoint, wm.WebsocketHandler) - _, err = rpc.StartHTTPServer(listenAddr, mux, logger, rpc.Config{MaxOpenConnections: maxOpenConnections}) - - return err + l, err := rpcserver.Listen(listenAddr, rpcserver.Config{MaxOpenConnections: maxOpenConnections}) + if err != nil { + return err + } + return rpcserver.StartHTTPServer(l, mux, logger) } // RPCRoutes just routes everything to the given client, as if it were // a tendermint fullnode. // // if we want security, the client must implement it as a secure client -func RPCRoutes(c rpcclient.Client) map[string]*rpc.RPCFunc { +func RPCRoutes(c rpcclient.Client) map[string]*rpcserver.RPCFunc { - return map[string]*rpc.RPCFunc{ + return map[string]*rpcserver.RPCFunc{ // Subscribe/unsubscribe are reserved for websocket events. // We can just use the core tendermint impl, which uses the // EventSwitch we registered in NewWebsocketManager above - "subscribe": rpc.NewWSRPCFunc(core.Subscribe, "query"), - "unsubscribe": rpc.NewWSRPCFunc(core.Unsubscribe, "query"), + "subscribe": rpcserver.NewWSRPCFunc(core.Subscribe, "query"), + "unsubscribe": rpcserver.NewWSRPCFunc(core.Unsubscribe, "query"), // info API - "status": rpc.NewRPCFunc(c.Status, ""), - "blockchain": rpc.NewRPCFunc(c.BlockchainInfo, "minHeight,maxHeight"), - "genesis": rpc.NewRPCFunc(c.Genesis, ""), - "block": rpc.NewRPCFunc(c.Block, "height"), - "commit": rpc.NewRPCFunc(c.Commit, "height"), - "tx": rpc.NewRPCFunc(c.Tx, "hash,prove"), - "validators": rpc.NewRPCFunc(c.Validators, ""), + "status": rpcserver.NewRPCFunc(c.Status, ""), + "blockchain": rpcserver.NewRPCFunc(c.BlockchainInfo, "minHeight,maxHeight"), + "genesis": rpcserver.NewRPCFunc(c.Genesis, ""), + "block": rpcserver.NewRPCFunc(c.Block, "height"), + "commit": rpcserver.NewRPCFunc(c.Commit, "height"), + "tx": rpcserver.NewRPCFunc(c.Tx, "hash,prove"), + "validators": rpcserver.NewRPCFunc(c.Validators, ""), // broadcast API - "broadcast_tx_commit": rpc.NewRPCFunc(c.BroadcastTxCommit, "tx"), - "broadcast_tx_sync": rpc.NewRPCFunc(c.BroadcastTxSync, "tx"), - "broadcast_tx_async": rpc.NewRPCFunc(c.BroadcastTxAsync, "tx"), + "broadcast_tx_commit": rpcserver.NewRPCFunc(c.BroadcastTxCommit, "tx"), + "broadcast_tx_sync": rpcserver.NewRPCFunc(c.BroadcastTxSync, "tx"), + "broadcast_tx_async": rpcserver.NewRPCFunc(c.BroadcastTxAsync, "tx"), // abci API - "abci_query": rpc.NewRPCFunc(c.ABCIQuery, "path,data,prove"), - "abci_info": rpc.NewRPCFunc(c.ABCIInfo, ""), + "abci_query": rpcserver.NewRPCFunc(c.ABCIQuery, "path,data,prove"), + "abci_info": rpcserver.NewRPCFunc(c.ABCIInfo, ""), } } diff --git a/node/node.go b/node/node.go index 796bbc2a8..f1da1df0e 100644 --- a/node/node.go +++ b/node/node.go @@ -653,6 +653,14 @@ func (n *Node) startRPC() ([]net.Listener, error) { mux.HandleFunc("/websocket", wm.WebsocketHandler) rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger) + listener, err := rpcserver.Listen( + listenAddr, + rpcserver.Config{MaxOpenConnections: n.config.RPC.MaxOpenConnections}, + ) + if err != nil { + return nil, err + } + var rootHandler http.Handler = mux if n.config.RPC.IsCorsEnabled() { corsMiddleware := cors.New(cors.Options{ @@ -663,30 +671,23 @@ func (n *Node) startRPC() ([]net.Listener, error) { rootHandler = corsMiddleware.Handler(mux) } - listener, err := rpcserver.StartHTTPServer( - listenAddr, + go rpcserver.StartHTTPServer( + listener, rootHandler, rpcLogger, - rpcserver.Config{MaxOpenConnections: n.config.RPC.MaxOpenConnections}, ) - if err != nil { - return nil, err - } listeners[i] = listener } // we expose a simplified api over grpc for convenience to app devs grpcListenAddr := n.config.RPC.GRPCListenAddress if grpcListenAddr != "" { - listener, err := grpccore.StartGRPCServer( - grpcListenAddr, - grpccore.Config{ - MaxOpenConnections: n.config.RPC.GRPCMaxOpenConnections, - }, - ) + listener, err := rpcserver.Listen( + grpcListenAddr, rpcserver.Config{MaxOpenConnections: n.config.RPC.GRPCMaxOpenConnections}) if err != nil { return nil, err } + go grpccore.StartGRPCServer(listener) listeners = append(listeners, listener) } diff --git a/rpc/grpc/client_server.go b/rpc/grpc/client_server.go index c88989685..2bc89864d 100644 --- a/rpc/grpc/client_server.go +++ b/rpc/grpc/client_server.go @@ -1,12 +1,9 @@ package core_grpc import ( - "fmt" "net" - "strings" "time" - "golang.org/x/net/netutil" "google.golang.org/grpc" cmn "github.com/tendermint/tendermint/libs/common" @@ -17,28 +14,12 @@ type Config struct { MaxOpenConnections int } -// StartGRPCServer starts a new gRPC BroadcastAPIServer, listening on -// protoAddr, in a goroutine. Returns a listener and an error, if it fails to -// parse an address. -func StartGRPCServer(protoAddr string, config Config) (net.Listener, error) { - parts := strings.SplitN(protoAddr, "://", 2) - if len(parts) != 2 { - return nil, fmt.Errorf("Invalid listen address for grpc server (did you forget a tcp:// prefix?) : %s", protoAddr) - } - proto, addr := parts[0], parts[1] - ln, err := net.Listen(proto, addr) - if err != nil { - return nil, err - } - if config.MaxOpenConnections > 0 { - ln = netutil.LimitListener(ln, config.MaxOpenConnections) - } - +// StartGRPCServer starts a new gRPC BroadcastAPIServer using the given net.Listener. +// NOTE: This function blocks - you may want to call it in a go-routine. +func StartGRPCServer(ln net.Listener) error { grpcServer := grpc.NewServer() RegisterBroadcastAPIServer(grpcServer, &broadcastAPI{}) - go grpcServer.Serve(ln) // nolint: errcheck - - return ln, nil + return grpcServer.Serve(ln) } // StartGRPCClient dials the gRPC server using protoAddr and returns a new diff --git a/rpc/lib/doc.go b/rpc/lib/doc.go index dbdb362da..aa9638bfd 100644 --- a/rpc/lib/doc.go +++ b/rpc/lib/doc.go @@ -70,12 +70,9 @@ // wm := rpcserver.NewWebsocketManager(Routes) // mux.HandleFunc("/websocket", wm.WebsocketHandler) // logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout)) -// go func() { -// _, err := rpcserver.StartHTTPServer("0.0.0.0:8008", mux, logger) -// if err != nil { -// panic(err) -// } -// }() +// listener, err := rpc.Listen("0.0.0.0:8080", rpcserver.Config{}) +// if err != nil { panic(err) } +// go rpcserver.StartHTTPServer(listener, mux, logger) // // Note that unix sockets are supported as well (eg. `/path/to/socket` instead of `0.0.0.0:8008`) // Now see all available endpoints by sending a GET request to `0.0.0.0:8008`. diff --git a/rpc/lib/rpc_test.go b/rpc/lib/rpc_test.go index 3d76db323..11b73ef19 100644 --- a/rpc/lib/rpc_test.go +++ b/rpc/lib/rpc_test.go @@ -121,12 +121,11 @@ func setup() { wm := server.NewWebsocketManager(Routes, RoutesCdc, server.ReadWait(5*time.Second), server.PingPeriod(1*time.Second)) wm.SetLogger(tcpLogger) mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler) - go func() { - _, err := server.StartHTTPServer(tcpAddr, mux, tcpLogger, server.Config{}) - if err != nil { - panic(err) - } - }() + listener1, err := server.Listen(tcpAddr, server.Config{}) + if err != nil { + panic(err) + } + go server.StartHTTPServer(listener1, mux, tcpLogger) unixLogger := logger.With("socket", "unix") mux2 := http.NewServeMux() @@ -134,12 +133,8 @@ func setup() { wm = server.NewWebsocketManager(Routes, RoutesCdc) wm.SetLogger(unixLogger) mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler) - go func() { - _, err := server.StartHTTPServer(unixAddr, mux2, unixLogger, server.Config{}) - if err != nil { - panic(err) - } - }() + listener2, err := server.Listen(unixAddr, server.Config{}) + go server.StartHTTPServer(listener2, mux2, unixLogger) // wait for servers to start time.Sleep(time.Second * 2) diff --git a/rpc/lib/server/http_server.go b/rpc/lib/server/http_server.go index 8cacaeefb..3e7632e2f 100644 --- a/rpc/lib/server/http_server.go +++ b/rpc/lib/server/http_server.go @@ -29,90 +29,46 @@ const ( maxBodyBytes = int64(1000000) // 1MB ) -// StartHTTPServer starts an HTTP server on listenAddr with the given handler. +// StartHTTPServer takes a listener and starts an HTTP server with the given handler. // It wraps handler with RecoverAndLogHandler. -func StartHTTPServer( - listenAddr string, - handler http.Handler, - logger log.Logger, - config Config, -) (listener net.Listener, err error) { - var proto, addr string - parts := strings.SplitN(listenAddr, "://", 2) - if len(parts) != 2 { - return nil, errors.Errorf( - "Invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)", - listenAddr, - ) - } - proto, addr = parts[0], parts[1] - - logger.Info(fmt.Sprintf("Starting RPC HTTP server on %s", listenAddr)) - listener, err = net.Listen(proto, addr) - if err != nil { - return nil, errors.Errorf("Failed to listen on %v: %v", listenAddr, err) - } - if config.MaxOpenConnections > 0 { - listener = netutil.LimitListener(listener, config.MaxOpenConnections) - } +// NOTE: This function blocks - you may want to call it in a go-routine. +func StartHTTPServer(listener net.Listener, handler http.Handler, logger log.Logger) error { + err := http.Serve( + listener, + RecoverAndLogHandler(maxBytesHandler{h: handler, n: maxBodyBytes}, logger), + ) + logger.Info("RPC HTTP server stopped", "err", err) - go func() { - err := http.Serve( - listener, - RecoverAndLogHandler(maxBytesHandler{h: handler, n: maxBodyBytes}, logger), - ) - logger.Info("RPC HTTP server stopped", "err", err) - }() - return listener, nil + return err } -// StartHTTPAndTLSServer starts an HTTPS server on listenAddr with the given -// handler. +// StartHTTPAndTLSServer takes a listener and starts an HTTPS server with the given handler. // It wraps handler with RecoverAndLogHandler. +// NOTE: This function blocks - you may want to call it in a go-routine. func StartHTTPAndTLSServer( - listenAddr string, + listener net.Listener, handler http.Handler, certFile, keyFile string, logger log.Logger, - config Config, -) (listener net.Listener, err error) { - var proto, addr string - parts := strings.SplitN(listenAddr, "://", 2) - if len(parts) != 2 { - return nil, errors.Errorf( - "Invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)", - listenAddr, - ) - } - proto, addr = parts[0], parts[1] - +) error { logger.Info( fmt.Sprintf( "Starting RPC HTTPS server on %s (cert: %q, key: %q)", - listenAddr, + listener.Addr(), certFile, keyFile, ), ) - listener, err = net.Listen(proto, addr) - if err != nil { - return nil, errors.Errorf("Failed to listen on %v: %v", listenAddr, err) - } - if config.MaxOpenConnections > 0 { - listener = netutil.LimitListener(listener, config.MaxOpenConnections) - } - - err = http.ServeTLS( + if err := http.ServeTLS( listener, RecoverAndLogHandler(maxBytesHandler{h: handler, n: maxBodyBytes}, logger), certFile, keyFile, - ) - if err != nil { + ); err != nil { logger.Error("RPC HTTPS server stopped", "err", err) - return nil, err + return err } - return listener, nil + return nil } func WriteRPCResponseHTTPError( @@ -213,3 +169,35 @@ func (h maxBytesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { r.Body = http.MaxBytesReader(w, r.Body, h.n) h.h.ServeHTTP(w, r) } + +// MustListen starts a new net.Listener on the given address. +// It panics in case of error. +func MustListen(addr string, config Config) net.Listener { + l, err := Listen(addr, config) + if err != nil { + panic(fmt.Errorf("Listen() failed: %v", err)) + } + return l +} + +// Listen starts a new net.Listener on the given address. +// It returns an error if the address is invalid or the call to Listen() fails. +func Listen(addr string, config Config) (listener net.Listener, err error) { + parts := strings.SplitN(addr, "://", 2) + if len(parts) != 2 { + return nil, errors.Errorf( + "Invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)", + addr, + ) + } + proto, addr := parts[0], parts[1] + listener, err = net.Listen(proto, addr) + if err != nil { + return nil, errors.Errorf("Failed to listen on %v: %v", addr, err) + } + if config.MaxOpenConnections > 0 { + listener = netutil.LimitListener(listener, config.MaxOpenConnections) + } + + return listener, nil +} diff --git a/rpc/lib/server/http_server_test.go b/rpc/lib/server/http_server_test.go index 73ebc2e7e..6b852afae 100644 --- a/rpc/lib/server/http_server_test.go +++ b/rpc/lib/server/http_server_test.go @@ -30,11 +30,10 @@ func TestMaxOpenConnections(t *testing.T) { time.Sleep(10 * time.Millisecond) fmt.Fprint(w, "some body") }) - l, err := StartHTTPServer("tcp://127.0.0.1:0", mux, log.TestingLogger(), Config{MaxOpenConnections: max}) - if err != nil { - t.Fatal(err) - } + l, err := Listen("tcp://127.0.0.1:0", Config{MaxOpenConnections: max}) + require.NoError(t, err) defer l.Close() + go StartHTTPServer(l, mux, log.TestingLogger()) // Make N GET calls to the server. attempts := max * 2 @@ -67,11 +66,14 @@ func TestMaxOpenConnections(t *testing.T) { func TestStartHTTPAndTLSServer(t *testing.T) { // set up fixtures listenerAddr := "tcp://0.0.0.0:0" + listener, err := Listen(listenerAddr, Config{MaxOpenConnections: 1}) + require.NoError(t, err) mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {}) // test failure - gotListener, err := StartHTTPAndTLSServer(listenerAddr, mux, "", "", log.TestingLogger(), Config{MaxOpenConnections: 1}) - require.Nil(t, gotListener) + err = StartHTTPAndTLSServer(listener, mux, "", "", log.TestingLogger()) require.IsType(t, (*os.PathError)(nil), err) + + // TODO: test that starting the server can actually work } diff --git a/rpc/lib/test/main.go b/rpc/lib/test/main.go index 544284b9c..0a9684d76 100644 --- a/rpc/lib/test/main.go +++ b/rpc/lib/test/main.go @@ -28,11 +28,11 @@ func main() { cdc := amino.NewCodec() logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout)) rpcserver.RegisterRPCFuncs(mux, routes, cdc, logger) - _, err := rpcserver.StartHTTPServer("0.0.0.0:8008", mux, logger, rpcserver.Config{}) + listener, err := rpcserver.Listen("0.0.0.0:8008", rpcserver.Config{}) if err != nil { cmn.Exit(err.Error()) } - + go rpcserver.StartHTTPServer(listener, mux, logger) // Wait forever cmn.TrapSignal(func() { }) diff --git a/tools/tm-monitor/main.go b/tools/tm-monitor/main.go index 32897b978..6e4aea5f9 100644 --- a/tools/tm-monitor/main.go +++ b/tools/tm-monitor/main.go @@ -48,13 +48,13 @@ Examples: logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout)) } - m := startMonitor(flag.Arg(0)) + monitor := startMonitor(flag.Arg(0)) - startRPC(listenAddr, m, logger) + listener := startRPC(listenAddr, monitor, logger) var ton *Ton if !noton { - ton = NewTon(m) + ton = NewTon(monitor) ton.Start() } @@ -62,7 +62,8 @@ Examples: if !noton { ton.Stop() } - m.Stop() + monitor.Stop() + listener.Close() }) } diff --git a/tools/tm-monitor/rpc.go b/tools/tm-monitor/rpc.go index ab62e0462..1a08a9ecd 100644 --- a/tools/tm-monitor/rpc.go +++ b/tools/tm-monitor/rpc.go @@ -2,6 +2,7 @@ package main import ( "errors" + "net" "net/http" "github.com/tendermint/tendermint/libs/log" @@ -9,16 +10,19 @@ import ( monitor "github.com/tendermint/tendermint/tools/tm-monitor/monitor" ) -func startRPC(listenAddr string, m *monitor.Monitor, logger log.Logger) { +func startRPC(listenAddr string, m *monitor.Monitor, logger log.Logger) net.Listener { routes := routes(m) mux := http.NewServeMux() wm := rpc.NewWebsocketManager(routes, nil) mux.HandleFunc("/websocket", wm.WebsocketHandler) rpc.RegisterRPCFuncs(mux, routes, cdc, logger) - if _, err := rpc.StartHTTPServer(listenAddr, mux, logger, rpc.Config{}); err != nil { + listener, err := rpc.Listen(listenAddr, rpc.Config{}) + if err != nil { panic(err) } + go rpc.StartHTTPServer(listener, mux, logger) + return listener } func routes(m *monitor.Monitor) map[string]*rpc.RPCFunc {