Browse Source

backport: Add basic metrics to the indexer package. (#7250) (#7252)

pull/7260/head
M. J. Fromberger 3 years ago
committed by GitHub
parent
commit
643a3f56f6
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 142 additions and 24 deletions
  1. +5
    -2
      internal/inspect/inspect.go
  2. +36
    -4
      internal/state/indexer/indexer_service.go
  3. +73
    -0
      internal/state/indexer/metrics.go
  4. +19
    -15
      node/node.go
  5. +2
    -1
      node/node_test.go
  6. +7
    -2
      node/setup.go

+ 5
- 2
internal/inspect/inspect.go View File

@ -46,8 +46,11 @@ func New(cfg *config.RPCConfig, bs state.BlockStore, ss state.Store, es []indexe
routes := rpc.Routes(*cfg, ss, bs, es, logger) routes := rpc.Routes(*cfg, ss, bs, es, logger)
eb := types.NewEventBus() eb := types.NewEventBus()
eb.SetLogger(logger.With("module", "events")) eb.SetLogger(logger.With("module", "events"))
is := indexer.NewIndexerService(es, eb)
is.SetLogger(logger.With("module", "txindex"))
is := indexer.NewService(indexer.ServiceArgs{
Sinks: es,
EventBus: eb,
Logger: logger.With("module", "txindex"),
})
return &Inspector{ return &Inspector{
routes: routes, routes: routes,
config: cfg, config: cfg,


+ 36
- 4
internal/state/indexer/indexer_service.go View File

@ -2,7 +2,9 @@ package indexer
import ( import (
"context" "context"
"time"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -20,14 +22,30 @@ type Service struct {
eventSinks []EventSink eventSinks []EventSink
eventBus *types.EventBus eventBus *types.EventBus
metrics *Metrics
}
// NewService constructs a new indexer service from the given arguments.
func NewService(args ServiceArgs) *Service {
is := &Service{
eventSinks: args.Sinks,
eventBus: args.EventBus,
metrics: args.Metrics,
}
if is.metrics == nil {
is.metrics = NopMetrics()
}
is.BaseService = *service.NewBaseService(args.Logger, "IndexerService", is)
return is
} }
// NewIndexerService returns a new service instance. // NewIndexerService returns a new service instance.
// Deprecated: Use NewService instead.
func NewIndexerService(es []EventSink, eventBus *types.EventBus) *Service { func NewIndexerService(es []EventSink, eventBus *types.EventBus) *Service {
is := &Service{eventSinks: es, eventBus: eventBus}
is.BaseService = *service.NewBaseService(nil, "IndexerService", is)
return is
return NewService(ServiceArgs{
Sinks: es,
EventBus: eventBus,
})
} }
// OnStart implements service.Service by subscribing for all transactions // OnStart implements service.Service by subscribing for all transactions
@ -79,17 +97,23 @@ func (is *Service) OnStart() error {
} }
for _, sink := range is.eventSinks { for _, sink := range is.eventSinks {
start := time.Now()
if err := sink.IndexBlockEvents(eventDataHeader); err != nil { if err := sink.IndexBlockEvents(eventDataHeader); err != nil {
is.Logger.Error("failed to index block", "height", height, "err", err) is.Logger.Error("failed to index block", "height", height, "err", err)
} else { } else {
is.metrics.BlockEventsSeconds.Observe(time.Since(start).Seconds())
is.metrics.BlocksIndexed.Add(1)
is.Logger.Debug("indexed block", "height", height, "sink", sink.Type()) is.Logger.Debug("indexed block", "height", height, "sink", sink.Type())
} }
if len(batch.Ops) > 0 { if len(batch.Ops) > 0 {
start := time.Now()
err := sink.IndexTxEvents(batch.Ops) err := sink.IndexTxEvents(batch.Ops)
if err != nil { if err != nil {
is.Logger.Error("failed to index block txs", "height", height, "err", err) is.Logger.Error("failed to index block txs", "height", height, "err", err)
} else { } else {
is.metrics.TxEventsSeconds.Observe(time.Since(start).Seconds())
is.metrics.TransactionsIndexed.Add(float64(len(batch.Ops)))
is.Logger.Debug("indexed txs", "height", height, "sink", sink.Type()) is.Logger.Debug("indexed txs", "height", height, "sink", sink.Type())
} }
} }
@ -114,6 +138,14 @@ func (is *Service) OnStop() {
} }
} }
// ServiceArgs are arguments for constructing a new indexer service.
type ServiceArgs struct {
Sinks []EventSink
EventBus *types.EventBus
Metrics *Metrics
Logger log.Logger
}
// KVSinkEnabled returns the given eventSinks is containing KVEventSink. // KVSinkEnabled returns the given eventSinks is containing KVEventSink.
func KVSinkEnabled(sinks []EventSink) bool { func KVSinkEnabled(sinks []EventSink) bool {
for _, sink := range sinks { for _, sink := range sinks {


+ 73
- 0
internal/state/indexer/metrics.go View File

@ -0,0 +1,73 @@
package indexer
import (
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/discard"
prometheus "github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
// MetricsSubsystem is a the subsystem label for the indexer package.
const MetricsSubsystem = "indexer"
// Metrics contains metrics exposed by this package.
type Metrics struct {
// Latency for indexing block events.
BlockEventsSeconds metrics.Histogram
// Latency for indexing transaction events.
TxEventsSeconds metrics.Histogram
// Number of complete blocks indexed.
BlocksIndexed metrics.Counter
// Number of transactions indexed.
TransactionsIndexed metrics.Counter
}
// PrometheusMetrics returns Metrics build using Prometheus client library.
// Optionally, labels can be provided along with their values ("foo",
// "fooValue").
func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
labels := []string{}
for i := 0; i < len(labelsAndValues); i += 2 {
labels = append(labels, labelsAndValues[i])
}
return &Metrics{
BlockEventsSeconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "block_events_seconds",
Help: "Latency for indexing block events.",
}, labels).With(labelsAndValues...),
TxEventsSeconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "tx_events_seconds",
Help: "Latency for indexing transaction events.",
}, labels).With(labelsAndValues...),
BlocksIndexed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "blocks_indexed",
Help: "Number of complete blocks indexed.",
}, labels).With(labelsAndValues...),
TransactionsIndexed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "transactions_indexed",
Help: "Number of transactions indexed.",
}, labels).With(labelsAndValues...),
}
}
// NopMetrics returns an indexer metrics stub that discards all samples.
func NopMetrics() *Metrics {
return &Metrics{
BlockEventsSeconds: discard.NewHistogram(),
TxEventsSeconds: discard.NewHistogram(),
BlocksIndexed: discard.NewCounter(),
TransactionsIndexed: discard.NewCounter(),
}
}

+ 19
- 15
node/node.go View File

@ -23,6 +23,7 @@ import (
"github.com/tendermint/tendermint/internal/proxy" "github.com/tendermint/tendermint/internal/proxy"
rpccore "github.com/tendermint/tendermint/internal/rpc/core" rpccore "github.com/tendermint/tendermint/internal/rpc/core"
sm "github.com/tendermint/tendermint/internal/state" sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/state/indexer"
"github.com/tendermint/tendermint/internal/statesync" "github.com/tendermint/tendermint/internal/statesync"
"github.com/tendermint/tendermint/internal/store" "github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
@ -148,6 +149,8 @@ func makeNode(cfg *config.Config,
return nil, err return nil, err
} }
nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID)
// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query). // Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
proxyApp, err := createAndStartProxyAppConns(clientCreator, logger) proxyApp, err := createAndStartProxyAppConns(clientCreator, logger)
if err != nil { if err != nil {
@ -163,7 +166,8 @@ func makeNode(cfg *config.Config,
return nil, err return nil, err
} }
indexerService, eventSinks, err := createAndStartIndexerService(cfg, dbProvider, eventBus, logger, genDoc.ChainID)
indexerService, eventSinks, err := createAndStartIndexerService(cfg, dbProvider,
eventBus, logger, genDoc.ChainID, nodeMetrics.indexer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -242,9 +246,6 @@ func makeNode(cfg *config.Config,
return nil, fmt.Errorf("failed to create peer manager: %w", err) return nil, fmt.Errorf("failed to create peer manager: %w", err)
} }
nodeMetrics :=
defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID)
router, err := createRouter(p2pLogger, nodeMetrics.p2p, nodeInfo, nodeKey.PrivKey, router, err := createRouter(p2pLogger, nodeMetrics.p2p, nodeInfo, nodeKey.PrivKey,
peerManager, transport, getRouterConfig(cfg, proxyApp)) peerManager, transport, getRouterConfig(cfg, proxyApp))
if err != nil { if err != nil {
@ -1053,8 +1054,9 @@ func defaultGenesisDocProviderFunc(cfg *config.Config) genesisDocProvider {
type nodeMetrics struct { type nodeMetrics struct {
consensus *consensus.Metrics consensus *consensus.Metrics
p2p *p2p.Metrics
indexer *indexer.Metrics
mempool *mempool.Metrics mempool *mempool.Metrics
p2p *p2p.Metrics
state *sm.Metrics state *sm.Metrics
statesync *statesync.Metrics statesync *statesync.Metrics
} }
@ -1068,19 +1070,21 @@ func defaultMetricsProvider(cfg *config.InstrumentationConfig) metricsProvider {
return func(chainID string) *nodeMetrics { return func(chainID string) *nodeMetrics {
if cfg.Prometheus { if cfg.Prometheus {
return &nodeMetrics{ return &nodeMetrics{
consensus.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
p2p.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
mempool.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
sm.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
statesync.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
consensus: consensus.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
indexer: indexer.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
mempool: mempool.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
p2p: p2p.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
state: sm.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
statesync: statesync.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
} }
} }
return &nodeMetrics{ return &nodeMetrics{
consensus.NopMetrics(),
p2p.NopMetrics(),
mempool.NopMetrics(),
sm.NopMetrics(),
statesync.NopMetrics(),
consensus: consensus.NopMetrics(),
indexer: indexer.NopMetrics(),
mempool: mempool.NopMetrics(),
p2p: p2p.NopMetrics(),
state: sm.NopMetrics(),
statesync: statesync.NopMetrics(),
} }
} }
} }


+ 2
- 1
node/node_test.go View File

@ -529,7 +529,8 @@ func TestNodeSetEventSink(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
indexService, eventSinks, err := createAndStartIndexerService(cfg, indexService, eventSinks, err := createAndStartIndexerService(cfg,
config.DefaultDBProvider, eventBus, logger, genDoc.ChainID)
config.DefaultDBProvider, eventBus, logger, genDoc.ChainID,
indexer.NopMetrics())
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, indexService.Stop()) }) t.Cleanup(func() { require.NoError(t, indexService.Stop()) })
return eventSinks return eventSinks


+ 7
- 2
node/setup.go View File

@ -76,14 +76,19 @@ func createAndStartIndexerService(
eventBus *types.EventBus, eventBus *types.EventBus,
logger log.Logger, logger log.Logger,
chainID string, chainID string,
metrics *indexer.Metrics,
) (*indexer.Service, []indexer.EventSink, error) { ) (*indexer.Service, []indexer.EventSink, error) {
eventSinks, err := sink.EventSinksFromConfig(cfg, dbProvider, chainID) eventSinks, err := sink.EventSinksFromConfig(cfg, dbProvider, chainID)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
indexerService := indexer.NewIndexerService(eventSinks, eventBus)
indexerService.SetLogger(logger.With("module", "txindex"))
indexerService := indexer.NewService(indexer.ServiceArgs{
Sinks: eventSinks,
EventBus: eventBus,
Logger: logger.With("module", "txindex"),
Metrics: metrics,
})
if err := indexerService.Start(); err != nil { if err := indexerService.Start(); err != nil {
return nil, nil, err return nil, nil, err


Loading…
Cancel
Save