|
|
@ -11,7 +11,6 @@ import ( |
|
|
|
|
|
|
|
"github.com/prometheus/client_golang/prometheus" |
|
|
|
"github.com/prometheus/client_golang/prometheus/promhttp" |
|
|
|
"github.com/rs/cors" |
|
|
|
abciclient "github.com/tendermint/tendermint/abci/client" |
|
|
|
abci "github.com/tendermint/tendermint/abci/types" |
|
|
|
"github.com/tendermint/tendermint/config" |
|
|
@ -23,7 +22,6 @@ import ( |
|
|
|
"github.com/tendermint/tendermint/internal/p2p" |
|
|
|
"github.com/tendermint/tendermint/internal/p2p/pex" |
|
|
|
"github.com/tendermint/tendermint/internal/proxy" |
|
|
|
tmpubsub "github.com/tendermint/tendermint/internal/pubsub" |
|
|
|
rpccore "github.com/tendermint/tendermint/internal/rpc/core" |
|
|
|
sm "github.com/tendermint/tendermint/internal/state" |
|
|
|
"github.com/tendermint/tendermint/internal/state/indexer" |
|
|
@ -34,7 +32,6 @@ import ( |
|
|
|
"github.com/tendermint/tendermint/libs/strings" |
|
|
|
tmtime "github.com/tendermint/tendermint/libs/time" |
|
|
|
"github.com/tendermint/tendermint/privval" |
|
|
|
rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server" |
|
|
|
"github.com/tendermint/tendermint/types" |
|
|
|
|
|
|
|
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
|
|
|
@ -449,11 +446,11 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { |
|
|
|
// Start the RPC server before the P2P server
|
|
|
|
// so we can eg. receive txs for the first block
|
|
|
|
if n.config.RPC.ListenAddress != "" { |
|
|
|
listeners, err := n.startRPC(ctx) |
|
|
|
var err error |
|
|
|
n.rpcListeners, err = n.rpcEnv.StartService(ctx, n.config) |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
n.rpcListeners = listeners |
|
|
|
} |
|
|
|
|
|
|
|
if n.config.Instrumentation.Prometheus && n.config.Instrumentation.PrometheusListenAddr != "" { |
|
|
@ -604,97 +601,6 @@ func (n *nodeImpl) OnStop() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (n *nodeImpl) startRPC(ctx context.Context) ([]net.Listener, error) { |
|
|
|
if err := n.rpcEnv.InitGenesisChunks(); err != nil { |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
|
|
|
|
listenAddrs := strings.SplitAndTrimEmpty(n.config.RPC.ListenAddress, ",", " ") |
|
|
|
routes := n.rpcEnv.GetRoutes() |
|
|
|
|
|
|
|
if n.config.RPC.Unsafe { |
|
|
|
n.rpcEnv.AddUnsafe(routes) |
|
|
|
} |
|
|
|
|
|
|
|
cfg := rpcserver.DefaultConfig() |
|
|
|
cfg.MaxBodyBytes = n.config.RPC.MaxBodyBytes |
|
|
|
cfg.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes |
|
|
|
cfg.MaxOpenConnections = n.config.RPC.MaxOpenConnections |
|
|
|
// If necessary adjust global WriteTimeout to ensure it's greater than
|
|
|
|
// TimeoutBroadcastTxCommit.
|
|
|
|
// See https://github.com/tendermint/tendermint/issues/3435
|
|
|
|
if cfg.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit { |
|
|
|
cfg.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second |
|
|
|
} |
|
|
|
|
|
|
|
// we may expose the rpc over both a unix and tcp socket
|
|
|
|
listeners := make([]net.Listener, len(listenAddrs)) |
|
|
|
for i, listenAddr := range listenAddrs { |
|
|
|
mux := http.NewServeMux() |
|
|
|
rpcLogger := n.logger.With("module", "rpc-server") |
|
|
|
wmLogger := rpcLogger.With("protocol", "websocket") |
|
|
|
wm := rpcserver.NewWebsocketManager(wmLogger, routes, |
|
|
|
rpcserver.OnDisconnect(func(remoteAddr string) { |
|
|
|
err := n.rpcEnv.EventBus.UnsubscribeAll(context.Background(), remoteAddr) |
|
|
|
if err != nil && err != tmpubsub.ErrSubscriptionNotFound { |
|
|
|
wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err) |
|
|
|
} |
|
|
|
}), |
|
|
|
rpcserver.ReadLimit(cfg.MaxBodyBytes), |
|
|
|
) |
|
|
|
mux.HandleFunc("/websocket", wm.WebsocketHandler) |
|
|
|
rpcserver.RegisterRPCFuncs(mux, routes, rpcLogger) |
|
|
|
listener, err := rpcserver.Listen( |
|
|
|
listenAddr, |
|
|
|
cfg.MaxOpenConnections, |
|
|
|
) |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
|
|
|
|
var rootHandler http.Handler = mux |
|
|
|
if n.config.RPC.IsCorsEnabled() { |
|
|
|
corsMiddleware := cors.New(cors.Options{ |
|
|
|
AllowedOrigins: n.config.RPC.CORSAllowedOrigins, |
|
|
|
AllowedMethods: n.config.RPC.CORSAllowedMethods, |
|
|
|
AllowedHeaders: n.config.RPC.CORSAllowedHeaders, |
|
|
|
}) |
|
|
|
rootHandler = corsMiddleware.Handler(mux) |
|
|
|
} |
|
|
|
if n.config.RPC.IsTLSEnabled() { |
|
|
|
go func() { |
|
|
|
if err := rpcserver.ServeTLS( |
|
|
|
ctx, |
|
|
|
listener, |
|
|
|
rootHandler, |
|
|
|
n.config.RPC.CertFile(), |
|
|
|
n.config.RPC.KeyFile(), |
|
|
|
rpcLogger, |
|
|
|
cfg, |
|
|
|
); err != nil { |
|
|
|
n.logger.Error("error serving server with TLS", "err", err) |
|
|
|
} |
|
|
|
}() |
|
|
|
} else { |
|
|
|
go func() { |
|
|
|
if err := rpcserver.Serve( |
|
|
|
ctx, |
|
|
|
listener, |
|
|
|
rootHandler, |
|
|
|
rpcLogger, |
|
|
|
cfg, |
|
|
|
); err != nil { |
|
|
|
n.logger.Error("error serving server", "err", err) |
|
|
|
} |
|
|
|
}() |
|
|
|
} |
|
|
|
|
|
|
|
listeners[i] = listener |
|
|
|
} |
|
|
|
|
|
|
|
return listeners, nil |
|
|
|
} |
|
|
|
|
|
|
|
// startPrometheusServer starts a Prometheus HTTP server, listening for metrics
|
|
|
|
// collectors on addr.
|
|
|
|
func (n *nodeImpl) startPrometheusServer(ctx context.Context, addr string) *http.Server { |
|
|
|