diff --git a/internal/rpc/core/env.go b/internal/rpc/core/env.go index 6dfbb1478..257479a27 100644 --- a/internal/rpc/core/env.go +++ b/internal/rpc/core/env.go @@ -1,10 +1,14 @@ package core import ( + "context" "encoding/base64" "fmt" + "net" + "net/http" "time" + "github.com/rs/cors" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/internal/blocksync" @@ -13,12 +17,15 @@ import ( "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/proxy" + tmpubsub "github.com/tendermint/tendermint/internal/pubsub" sm "github.com/tendermint/tendermint/internal/state" "github.com/tendermint/tendermint/internal/state/indexer" "github.com/tendermint/tendermint/internal/statesync" tmjson "github.com/tendermint/tendermint/libs/json" "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/strings" "github.com/tendermint/tendermint/rpc/coretypes" + rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server" "github.com/tendermint/tendermint/types" ) @@ -205,3 +212,99 @@ func (env *Environment) latestUncommittedHeight() int64 { } return env.BlockStore.Height() + 1 } + +// StartService constructs and starts listeners for the RPC service +// according to the config object, returning an error if the service +// cannot be constructed or started. The listeners, which provide +// access to the service, run until the context is canceled. +func (env *Environment) StartService(ctx context.Context, conf *config.Config) ([]net.Listener, error) { + if err := env.InitGenesisChunks(); err != nil { + return nil, err + } + + listenAddrs := strings.SplitAndTrimEmpty(conf.RPC.ListenAddress, ",", " ") + routes := env.GetRoutes() + + if conf.RPC.Unsafe { + env.AddUnsafe(routes) + } + + cfg := rpcserver.DefaultConfig() + cfg.MaxBodyBytes = conf.RPC.MaxBodyBytes + cfg.MaxHeaderBytes = conf.RPC.MaxHeaderBytes + cfg.MaxOpenConnections = conf.RPC.MaxOpenConnections + // If necessary adjust global WriteTimeout to ensure it's greater than + // TimeoutBroadcastTxCommit. + // See https://github.com/tendermint/tendermint/issues/3435 + if cfg.WriteTimeout <= conf.RPC.TimeoutBroadcastTxCommit { + cfg.WriteTimeout = conf.RPC.TimeoutBroadcastTxCommit + 1*time.Second + } + + // we may expose the rpc over both a unix and tcp socket + listeners := make([]net.Listener, len(listenAddrs)) + for i, listenAddr := range listenAddrs { + mux := http.NewServeMux() + rpcLogger := env.Logger.With("module", "rpc-server") + wmLogger := rpcLogger.With("protocol", "websocket") + wm := rpcserver.NewWebsocketManager(wmLogger, routes, + rpcserver.OnDisconnect(func(remoteAddr string) { + err := env.EventBus.UnsubscribeAll(context.Background(), remoteAddr) + if err != nil && err != tmpubsub.ErrSubscriptionNotFound { + wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err) + } + }), + rpcserver.ReadLimit(cfg.MaxBodyBytes), + ) + mux.HandleFunc("/websocket", wm.WebsocketHandler) + rpcserver.RegisterRPCFuncs(mux, routes, rpcLogger) + listener, err := rpcserver.Listen( + listenAddr, + cfg.MaxOpenConnections, + ) + if err != nil { + return nil, err + } + + var rootHandler http.Handler = mux + if conf.RPC.IsCorsEnabled() { + corsMiddleware := cors.New(cors.Options{ + AllowedOrigins: conf.RPC.CORSAllowedOrigins, + AllowedMethods: conf.RPC.CORSAllowedMethods, + AllowedHeaders: conf.RPC.CORSAllowedHeaders, + }) + rootHandler = corsMiddleware.Handler(mux) + } + if conf.RPC.IsTLSEnabled() { + go func() { + if err := rpcserver.ServeTLS( + ctx, + listener, + rootHandler, + conf.RPC.CertFile(), + conf.RPC.KeyFile(), + rpcLogger, + cfg, + ); err != nil { + env.Logger.Error("error serving server with TLS", "err", err) + } + }() + } else { + go func() { + if err := rpcserver.Serve( + ctx, + listener, + rootHandler, + rpcLogger, + cfg, + ); err != nil { + env.Logger.Error("error serving server", "err", err) + } + }() + } + + listeners[i] = listener + } + + return listeners, nil + +} diff --git a/node/node.go b/node/node.go index d653018dd..b559f057f 100644 --- a/node/node.go +++ b/node/node.go @@ -11,7 +11,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/rs/cors" abciclient "github.com/tendermint/tendermint/abci/client" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" @@ -23,7 +22,6 @@ import ( "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p/pex" "github.com/tendermint/tendermint/internal/proxy" - tmpubsub "github.com/tendermint/tendermint/internal/pubsub" rpccore "github.com/tendermint/tendermint/internal/rpc/core" sm "github.com/tendermint/tendermint/internal/state" "github.com/tendermint/tendermint/internal/state/indexer" @@ -34,7 +32,6 @@ import ( "github.com/tendermint/tendermint/libs/strings" tmtime "github.com/tendermint/tendermint/libs/time" "github.com/tendermint/tendermint/privval" - rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server" "github.com/tendermint/tendermint/types" _ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port @@ -449,11 +446,11 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { // Start the RPC server before the P2P server // so we can eg. receive txs for the first block if n.config.RPC.ListenAddress != "" { - listeners, err := n.startRPC(ctx) + var err error + n.rpcListeners, err = n.rpcEnv.StartService(ctx, n.config) if err != nil { return err } - n.rpcListeners = listeners } if n.config.Instrumentation.Prometheus && n.config.Instrumentation.PrometheusListenAddr != "" { @@ -604,97 +601,6 @@ func (n *nodeImpl) OnStop() { } } -func (n *nodeImpl) startRPC(ctx context.Context) ([]net.Listener, error) { - if err := n.rpcEnv.InitGenesisChunks(); err != nil { - return nil, err - } - - listenAddrs := strings.SplitAndTrimEmpty(n.config.RPC.ListenAddress, ",", " ") - routes := n.rpcEnv.GetRoutes() - - if n.config.RPC.Unsafe { - n.rpcEnv.AddUnsafe(routes) - } - - cfg := rpcserver.DefaultConfig() - cfg.MaxBodyBytes = n.config.RPC.MaxBodyBytes - cfg.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes - cfg.MaxOpenConnections = n.config.RPC.MaxOpenConnections - // If necessary adjust global WriteTimeout to ensure it's greater than - // TimeoutBroadcastTxCommit. - // See https://github.com/tendermint/tendermint/issues/3435 - if cfg.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit { - cfg.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second - } - - // we may expose the rpc over both a unix and tcp socket - listeners := make([]net.Listener, len(listenAddrs)) - for i, listenAddr := range listenAddrs { - mux := http.NewServeMux() - rpcLogger := n.logger.With("module", "rpc-server") - wmLogger := rpcLogger.With("protocol", "websocket") - wm := rpcserver.NewWebsocketManager(wmLogger, routes, - rpcserver.OnDisconnect(func(remoteAddr string) { - err := n.rpcEnv.EventBus.UnsubscribeAll(context.Background(), remoteAddr) - if err != nil && err != tmpubsub.ErrSubscriptionNotFound { - wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err) - } - }), - rpcserver.ReadLimit(cfg.MaxBodyBytes), - ) - mux.HandleFunc("/websocket", wm.WebsocketHandler) - rpcserver.RegisterRPCFuncs(mux, routes, rpcLogger) - listener, err := rpcserver.Listen( - listenAddr, - cfg.MaxOpenConnections, - ) - if err != nil { - return nil, err - } - - var rootHandler http.Handler = mux - if n.config.RPC.IsCorsEnabled() { - corsMiddleware := cors.New(cors.Options{ - AllowedOrigins: n.config.RPC.CORSAllowedOrigins, - AllowedMethods: n.config.RPC.CORSAllowedMethods, - AllowedHeaders: n.config.RPC.CORSAllowedHeaders, - }) - rootHandler = corsMiddleware.Handler(mux) - } - if n.config.RPC.IsTLSEnabled() { - go func() { - if err := rpcserver.ServeTLS( - ctx, - listener, - rootHandler, - n.config.RPC.CertFile(), - n.config.RPC.KeyFile(), - rpcLogger, - cfg, - ); err != nil { - n.logger.Error("error serving server with TLS", "err", err) - } - }() - } else { - go func() { - if err := rpcserver.Serve( - ctx, - listener, - rootHandler, - rpcLogger, - cfg, - ); err != nil { - n.logger.Error("error serving server", "err", err) - } - }() - } - - listeners[i] = listener - } - - return listeners, nil -} - // startPrometheusServer starts a Prometheus HTTP server, listening for metrics // collectors on addr. func (n *nodeImpl) startPrometheusServer(ctx context.Context, addr string) *http.Server {