From 643a3f56f663fa1c5eda0a2934d455671e6b5aa7 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Mon, 8 Nov 2021 06:48:38 -0800 Subject: [PATCH] backport: Add basic metrics to the indexer package. (#7250) (#7252) --- internal/inspect/inspect.go | 7 ++- internal/state/indexer/indexer_service.go | 40 +++++++++++-- internal/state/indexer/metrics.go | 73 +++++++++++++++++++++++ node/node.go | 34 ++++++----- node/node_test.go | 3 +- node/setup.go | 9 ++- 6 files changed, 142 insertions(+), 24 deletions(-) create mode 100644 internal/state/indexer/metrics.go diff --git a/internal/inspect/inspect.go b/internal/inspect/inspect.go index 90e615341..c5fcf0a7e 100644 --- a/internal/inspect/inspect.go +++ b/internal/inspect/inspect.go @@ -46,8 +46,11 @@ func New(cfg *config.RPCConfig, bs state.BlockStore, ss state.Store, es []indexe routes := rpc.Routes(*cfg, ss, bs, es, logger) eb := types.NewEventBus() eb.SetLogger(logger.With("module", "events")) - is := indexer.NewIndexerService(es, eb) - is.SetLogger(logger.With("module", "txindex")) + is := indexer.NewService(indexer.ServiceArgs{ + Sinks: es, + EventBus: eb, + Logger: logger.With("module", "txindex"), + }) return &Inspector{ routes: routes, config: cfg, diff --git a/internal/state/indexer/indexer_service.go b/internal/state/indexer/indexer_service.go index 39a1847f8..c00b1e54b 100644 --- a/internal/state/indexer/indexer_service.go +++ b/internal/state/indexer/indexer_service.go @@ -2,7 +2,9 @@ package indexer import ( "context" + "time" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/types" ) @@ -20,14 +22,30 @@ type Service struct { eventSinks []EventSink eventBus *types.EventBus + metrics *Metrics +} + +// NewService constructs a new indexer service from the given arguments. +func NewService(args ServiceArgs) *Service { + is := &Service{ + eventSinks: args.Sinks, + eventBus: args.EventBus, + metrics: args.Metrics, + } + if is.metrics == nil { + is.metrics = NopMetrics() + } + is.BaseService = *service.NewBaseService(args.Logger, "IndexerService", is) + return is } // NewIndexerService returns a new service instance. +// Deprecated: Use NewService instead. func NewIndexerService(es []EventSink, eventBus *types.EventBus) *Service { - - is := &Service{eventSinks: es, eventBus: eventBus} - is.BaseService = *service.NewBaseService(nil, "IndexerService", is) - return is + return NewService(ServiceArgs{ + Sinks: es, + EventBus: eventBus, + }) } // OnStart implements service.Service by subscribing for all transactions @@ -79,17 +97,23 @@ func (is *Service) OnStart() error { } for _, sink := range is.eventSinks { + start := time.Now() if err := sink.IndexBlockEvents(eventDataHeader); err != nil { is.Logger.Error("failed to index block", "height", height, "err", err) } else { + is.metrics.BlockEventsSeconds.Observe(time.Since(start).Seconds()) + is.metrics.BlocksIndexed.Add(1) is.Logger.Debug("indexed block", "height", height, "sink", sink.Type()) } if len(batch.Ops) > 0 { + start := time.Now() err := sink.IndexTxEvents(batch.Ops) if err != nil { is.Logger.Error("failed to index block txs", "height", height, "err", err) } else { + is.metrics.TxEventsSeconds.Observe(time.Since(start).Seconds()) + is.metrics.TransactionsIndexed.Add(float64(len(batch.Ops))) is.Logger.Debug("indexed txs", "height", height, "sink", sink.Type()) } } @@ -114,6 +138,14 @@ func (is *Service) OnStop() { } } +// ServiceArgs are arguments for constructing a new indexer service. +type ServiceArgs struct { + Sinks []EventSink + EventBus *types.EventBus + Metrics *Metrics + Logger log.Logger +} + // KVSinkEnabled returns the given eventSinks is containing KVEventSink. func KVSinkEnabled(sinks []EventSink) bool { for _, sink := range sinks { diff --git a/internal/state/indexer/metrics.go b/internal/state/indexer/metrics.go new file mode 100644 index 000000000..aa64a4bb2 --- /dev/null +++ b/internal/state/indexer/metrics.go @@ -0,0 +1,73 @@ +package indexer + +import ( + "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/metrics/discard" + + prometheus "github.com/go-kit/kit/metrics/prometheus" + stdprometheus "github.com/prometheus/client_golang/prometheus" +) + +// MetricsSubsystem is a the subsystem label for the indexer package. +const MetricsSubsystem = "indexer" + +// Metrics contains metrics exposed by this package. +type Metrics struct { + // Latency for indexing block events. + BlockEventsSeconds metrics.Histogram + + // Latency for indexing transaction events. + TxEventsSeconds metrics.Histogram + + // Number of complete blocks indexed. + BlocksIndexed metrics.Counter + + // Number of transactions indexed. + TransactionsIndexed metrics.Counter +} + +// PrometheusMetrics returns Metrics build using Prometheus client library. +// Optionally, labels can be provided along with their values ("foo", +// "fooValue"). +func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { + labels := []string{} + for i := 0; i < len(labelsAndValues); i += 2 { + labels = append(labels, labelsAndValues[i]) + } + return &Metrics{ + BlockEventsSeconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "block_events_seconds", + Help: "Latency for indexing block events.", + }, labels).With(labelsAndValues...), + TxEventsSeconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "tx_events_seconds", + Help: "Latency for indexing transaction events.", + }, labels).With(labelsAndValues...), + BlocksIndexed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "blocks_indexed", + Help: "Number of complete blocks indexed.", + }, labels).With(labelsAndValues...), + TransactionsIndexed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "transactions_indexed", + Help: "Number of transactions indexed.", + }, labels).With(labelsAndValues...), + } +} + +// NopMetrics returns an indexer metrics stub that discards all samples. +func NopMetrics() *Metrics { + return &Metrics{ + BlockEventsSeconds: discard.NewHistogram(), + TxEventsSeconds: discard.NewHistogram(), + BlocksIndexed: discard.NewCounter(), + TransactionsIndexed: discard.NewCounter(), + } +} diff --git a/node/node.go b/node/node.go index 10d68c309..fff1d50ce 100644 --- a/node/node.go +++ b/node/node.go @@ -23,6 +23,7 @@ import ( "github.com/tendermint/tendermint/internal/proxy" rpccore "github.com/tendermint/tendermint/internal/rpc/core" sm "github.com/tendermint/tendermint/internal/state" + "github.com/tendermint/tendermint/internal/state/indexer" "github.com/tendermint/tendermint/internal/statesync" "github.com/tendermint/tendermint/internal/store" "github.com/tendermint/tendermint/libs/log" @@ -148,6 +149,8 @@ func makeNode(cfg *config.Config, return nil, err } + nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID) + // Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query). proxyApp, err := createAndStartProxyAppConns(clientCreator, logger) if err != nil { @@ -163,7 +166,8 @@ func makeNode(cfg *config.Config, return nil, err } - indexerService, eventSinks, err := createAndStartIndexerService(cfg, dbProvider, eventBus, logger, genDoc.ChainID) + indexerService, eventSinks, err := createAndStartIndexerService(cfg, dbProvider, + eventBus, logger, genDoc.ChainID, nodeMetrics.indexer) if err != nil { return nil, err } @@ -242,9 +246,6 @@ func makeNode(cfg *config.Config, return nil, fmt.Errorf("failed to create peer manager: %w", err) } - nodeMetrics := - defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID) - router, err := createRouter(p2pLogger, nodeMetrics.p2p, nodeInfo, nodeKey.PrivKey, peerManager, transport, getRouterConfig(cfg, proxyApp)) if err != nil { @@ -1053,8 +1054,9 @@ func defaultGenesisDocProviderFunc(cfg *config.Config) genesisDocProvider { type nodeMetrics struct { consensus *consensus.Metrics - p2p *p2p.Metrics + indexer *indexer.Metrics mempool *mempool.Metrics + p2p *p2p.Metrics state *sm.Metrics statesync *statesync.Metrics } @@ -1068,19 +1070,21 @@ func defaultMetricsProvider(cfg *config.InstrumentationConfig) metricsProvider { return func(chainID string) *nodeMetrics { if cfg.Prometheus { return &nodeMetrics{ - consensus.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), - p2p.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), - mempool.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), - sm.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), - statesync.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), + consensus: consensus.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), + indexer: indexer.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), + mempool: mempool.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), + p2p: p2p.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), + state: sm.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), + statesync: statesync.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), } } return &nodeMetrics{ - consensus.NopMetrics(), - p2p.NopMetrics(), - mempool.NopMetrics(), - sm.NopMetrics(), - statesync.NopMetrics(), + consensus: consensus.NopMetrics(), + indexer: indexer.NopMetrics(), + mempool: mempool.NopMetrics(), + p2p: p2p.NopMetrics(), + state: sm.NopMetrics(), + statesync: statesync.NopMetrics(), } } } diff --git a/node/node_test.go b/node/node_test.go index c733555d4..f83d21d45 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -529,7 +529,8 @@ func TestNodeSetEventSink(t *testing.T) { require.NoError(t, err) indexService, eventSinks, err := createAndStartIndexerService(cfg, - config.DefaultDBProvider, eventBus, logger, genDoc.ChainID) + config.DefaultDBProvider, eventBus, logger, genDoc.ChainID, + indexer.NopMetrics()) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, indexService.Stop()) }) return eventSinks diff --git a/node/setup.go b/node/setup.go index 3f305125d..776462b10 100644 --- a/node/setup.go +++ b/node/setup.go @@ -76,14 +76,19 @@ func createAndStartIndexerService( eventBus *types.EventBus, logger log.Logger, chainID string, + metrics *indexer.Metrics, ) (*indexer.Service, []indexer.EventSink, error) { eventSinks, err := sink.EventSinksFromConfig(cfg, dbProvider, chainID) if err != nil { return nil, nil, err } - indexerService := indexer.NewIndexerService(eventSinks, eventBus) - indexerService.SetLogger(logger.With("module", "txindex")) + indexerService := indexer.NewService(indexer.ServiceArgs{ + Sinks: eventSinks, + EventBus: eventBus, + Logger: logger.With("module", "txindex"), + Metrics: metrics, + }) if err := indexerService.Start(); err != nil { return nil, nil, err