
Add basic metrics to the indexer package. (#7250)

This follows the same model we used in the p2p package.

Rework the indexer service constructor to take a struct of arguments,
which makes it easier to supply the optional settings.
Deprecate but do not remove the existing constructor.

Clean up node initialization a little bit.
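
For illustration, a sketch of the new call shape (the variable names es, eventBus, and logger are placeholders; the actual call sites are in the diffs below). A caller that previously wrote

    is := indexer.NewIndexerService(es, eventBus)
    is.SetLogger(logger.With("module", "txindex"))

can now write

    is := indexer.NewService(indexer.ServiceArgs{
        Sinks:    es,
        EventBus: eventBus,
        Logger:   logger.With("module", "txindex"),
        // Metrics is optional; when it is nil the service falls back to NopMetrics().
    })
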
M. J. Fromberger authored 3 years ago, committed by GitHub
commit d5865af1f4
6 changed files with 142 additions and 23 deletions
  1. internal/inspect/inspect.go (+5, -2)
  2. internal/state/indexer/indexer_service.go (+36, -3)
  3. internal/state/indexer/metrics.go (+73, -0)
  4. node/node.go (+19, -15)
  5. node/node_test.go (+2, -1)
  6. node/setup.go (+7, -2)

internal/inspect/inspect.go (+5, -2)

@@ -47,8 +47,11 @@ func New(cfg *config.RPCConfig, bs state.BlockStore, ss state.Store, es []indexe
     routes := rpc.Routes(*cfg, ss, bs, es, logger)
     eb := eventbus.NewDefault()
     eb.SetLogger(logger.With("module", "events"))
-    is := indexer.NewIndexerService(es, eb)
-    is.SetLogger(logger.With("module", "txindex"))
+    is := indexer.NewService(indexer.ServiceArgs{
+        Sinks:    es,
+        EventBus: eb,
+        Logger:   logger.With("module", "txindex"),
+    })
     return &Inspector{
         routes: routes,
         config: cfg,


internal/state/indexer/indexer_service.go (+36, -3)

@@ -2,8 +2,10 @@ package indexer
 
 import (
     "context"
+    "time"
 
     "github.com/tendermint/tendermint/internal/eventbus"
+    "github.com/tendermint/tendermint/libs/log"
     "github.com/tendermint/tendermint/libs/pubsub"
     "github.com/tendermint/tendermint/libs/service"
     "github.com/tendermint/tendermint/types"
@@ -16,6 +18,7 @@ type Service struct {
     eventSinks []EventSink
     eventBus   *eventbus.EventBus
+    metrics    *Metrics
 
     currentBlock struct {
         header types.EventDataNewBlockHeader
@@ -24,11 +27,27 @@
     }
 }
 
+// NewService constructs a new indexer service from the given arguments.
+func NewService(args ServiceArgs) *Service {
+    is := &Service{
+        eventSinks: args.Sinks,
+        eventBus:   args.EventBus,
+        metrics:    args.Metrics,
+    }
+    if is.metrics == nil {
+        is.metrics = NopMetrics()
+    }
+    is.BaseService = *service.NewBaseService(args.Logger, "IndexerService", is)
+    return is
+}
+
 // NewIndexerService returns a new service instance.
+// Deprecated: Use NewService instead.
 func NewIndexerService(es []EventSink, eventBus *eventbus.EventBus) *Service {
-    is := &Service{eventSinks: es, eventBus: eventBus}
-    is.BaseService = *service.NewBaseService(nil, "IndexerService", is)
-    return is
+    return NewService(ServiceArgs{
+        Sinks:    es,
+        EventBus: eventBus,
+    })
 }
 
 // publish publishes a pubsub message to the service. The service blocks until
@@ -71,20 +90,26 @@ func (is *Service) publish(msg pubsub.Message) error {
     if curr.Pending == 0 {
         // INDEX: We have all the transactions we expect for the current block.
         for _, sink := range is.eventSinks {
+            start := time.Now()
             if err := sink.IndexBlockEvents(is.currentBlock.header); err != nil {
                 is.Logger.Error("failed to index block header",
                     "height", is.currentBlock.height, "err", err)
             } else {
+                is.metrics.BlockEventsSeconds.Observe(time.Since(start).Seconds())
+                is.metrics.BlocksIndexed.Add(1)
                 is.Logger.Debug("indexed block",
                     "height", is.currentBlock.height, "sink", sink.Type())
             }
 
             if curr.Size() != 0 {
+                start := time.Now()
                 err := sink.IndexTxEvents(curr.Ops)
                 if err != nil {
                     is.Logger.Error("failed to index block txs",
                         "height", is.currentBlock.height, "err", err)
                 } else {
+                    is.metrics.TxEventsSeconds.Observe(time.Since(start).Seconds())
+                    is.metrics.TransactionsIndexed.Add(float64(curr.Size()))
                     is.Logger.Debug("indexed txs",
                         "height", is.currentBlock.height, "sink", sink.Type())
                 }
@@ -122,6 +147,14 @@ func (is *Service) OnStop() {
     }
 }
 
+// ServiceArgs are arguments for constructing a new indexer service.
+type ServiceArgs struct {
+    Sinks    []EventSink
+    EventBus *eventbus.EventBus
+    Metrics  *Metrics
+    Logger   log.Logger
+}
+
 // KVSinkEnabled returns the given eventSinks is containing KVEventSink.
 func KVSinkEnabled(sinks []EventSink) bool {
     for _, sink := range sinks {

internal/state/indexer/metrics.go (+73, -0)

@@ -0,0 +1,73 @@
+package indexer
+
+import (
+    "github.com/go-kit/kit/metrics"
+    "github.com/go-kit/kit/metrics/discard"
+    prometheus "github.com/go-kit/kit/metrics/prometheus"
+    stdprometheus "github.com/prometheus/client_golang/prometheus"
+)
+
+// MetricsSubsystem is a the subsystem label for the indexer package.
+const MetricsSubsystem = "indexer"
+
+// Metrics contains metrics exposed by this package.
+type Metrics struct {
+    // Latency for indexing block events.
+    BlockEventsSeconds metrics.Histogram
+
+    // Latency for indexing transaction events.
+    TxEventsSeconds metrics.Histogram
+
+    // Number of complete blocks indexed.
+    BlocksIndexed metrics.Counter
+
+    // Number of transactions indexed.
+    TransactionsIndexed metrics.Counter
+}
+
+// PrometheusMetrics returns Metrics build using Prometheus client library.
+// Optionally, labels can be provided along with their values ("foo",
+// "fooValue").
+func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
+    labels := []string{}
+    for i := 0; i < len(labelsAndValues); i += 2 {
+        labels = append(labels, labelsAndValues[i])
+    }
+    return &Metrics{
+        BlockEventsSeconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
+            Namespace: namespace,
+            Subsystem: MetricsSubsystem,
+            Name:      "block_events_seconds",
+            Help:      "Latency for indexing block events.",
+        }, labels).With(labelsAndValues...),
+        TxEventsSeconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
+            Namespace: namespace,
+            Subsystem: MetricsSubsystem,
+            Name:      "tx_events_seconds",
+            Help:      "Latency for indexing transaction events.",
+        }, labels).With(labelsAndValues...),
+        BlocksIndexed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
+            Namespace: namespace,
+            Subsystem: MetricsSubsystem,
+            Name:      "blocks_indexed",
+            Help:      "Number of complete blocks indexed.",
+        }, labels).With(labelsAndValues...),
+        TransactionsIndexed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
+            Namespace: namespace,
+            Subsystem: MetricsSubsystem,
+            Name:      "transactions_indexed",
+            Help:      "Number of transactions indexed.",
+        }, labels).With(labelsAndValues...),
+    }
+}
+
+// NopMetrics returns an indexer metrics stub that discards all samples.
+func NopMetrics() *Metrics {
+    return &Metrics{
+        BlockEventsSeconds:  discard.NewHistogram(),
+        TxEventsSeconds:     discard.NewHistogram(),
+        BlocksIndexed:       discard.NewCounter(),
+        TransactionsIndexed: discard.NewCounter(),
+    }
+}
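
A usage sketch for these constructors (the "tendermint" namespace and the chainID, eventSinks, eventBus, and logger values are placeholders; the node wiring below passes cfg.Namespace and the "chain_id" label):

    // Prometheus-backed metrics labeled by chain ID; NopMetrics() would discard all samples instead.
    m := indexer.PrometheusMetrics("tendermint", "chain_id", chainID)
    svc := indexer.NewService(indexer.ServiceArgs{
        Sinks:    eventSinks,
        EventBus: eventBus,
        Metrics:  m,
        Logger:   logger.With("module", "txindex"),
    })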

node/node.go (+19, -15)

@@ -171,7 +171,8 @@ func makeNode(cfg *config.Config,
         return nil, combineCloseError(err, makeCloser(closers))
     }
 
-    indexerService, eventSinks, err := createAndStartIndexerService(cfg, dbProvider, eventBus, logger, genDoc.ChainID)
+    indexerService, eventSinks, err := createAndStartIndexerService(cfg, dbProvider, eventBus,
+        logger, genDoc.ChainID, nodeMetrics.indexer)
     if err != nil {
         return nil, combineCloseError(err, makeCloser(closers))
     }
@@ -900,11 +901,12 @@ func defaultGenesisDocProviderFunc(cfg *config.Config) genesisDocProvider {
 
 type nodeMetrics struct {
     consensus *consensus.Metrics
-    p2p       *p2p.Metrics
+    indexer   *indexer.Metrics
     mempool   *mempool.Metrics
+    p2p       *p2p.Metrics
+    proxy     *proxy.Metrics
     state     *sm.Metrics
     statesync *statesync.Metrics
-    proxy     *proxy.Metrics
 }
 
 // metricsProvider returns consensus, p2p, mempool, state, statesync Metrics.
@@ -916,21 +918,23 @@ func defaultMetricsProvider(cfg *config.InstrumentationConfig) metricsProvider {
     return func(chainID string) *nodeMetrics {
         if cfg.Prometheus {
             return &nodeMetrics{
-                consensus.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
-                p2p.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
-                mempool.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
-                sm.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
-                statesync.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
-                proxy.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
+                consensus: consensus.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
+                indexer:   indexer.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
+                mempool:   mempool.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
+                p2p:       p2p.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
+                proxy:     proxy.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
+                state:     sm.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
+                statesync: statesync.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
             }
         }
         return &nodeMetrics{
-            consensus.NopMetrics(),
-            p2p.NopMetrics(),
-            mempool.NopMetrics(),
-            sm.NopMetrics(),
-            statesync.NopMetrics(),
-            proxy.NopMetrics(),
+            consensus: consensus.NopMetrics(),
+            indexer:   indexer.NopMetrics(),
+            mempool:   mempool.NopMetrics(),
+            p2p:       p2p.NopMetrics(),
+            proxy:     proxy.NopMetrics(),
+            state:     sm.NopMetrics(),
+            statesync: statesync.NopMetrics(),
         }
     }
 }


node/node_test.go (+2, -1)

@@ -558,7 +558,8 @@ func TestNodeSetEventSink(t *testing.T) {
     require.NoError(t, err)
 
     indexService, eventSinks, err := createAndStartIndexerService(cfg,
-        config.DefaultDBProvider, eventBus, logger, genDoc.ChainID)
+        config.DefaultDBProvider, eventBus, logger, genDoc.ChainID,
+        indexer.NopMetrics())
     require.NoError(t, err)
     t.Cleanup(func() { require.NoError(t, indexService.Stop()) })
     return eventSinks


node/setup.go (+7, -2)

@@ -113,14 +113,19 @@ func createAndStartIndexerService(
     eventBus *eventbus.EventBus,
     logger log.Logger,
     chainID string,
+    metrics *indexer.Metrics,
 ) (*indexer.Service, []indexer.EventSink, error) {
     eventSinks, err := sink.EventSinksFromConfig(cfg, dbProvider, chainID)
     if err != nil {
         return nil, nil, err
     }
 
-    indexerService := indexer.NewIndexerService(eventSinks, eventBus)
-    indexerService.SetLogger(logger.With("module", "txindex"))
+    indexerService := indexer.NewService(indexer.ServiceArgs{
+        Sinks:    eventSinks,
+        EventBus: eventBus,
+        Logger:   logger.With("module", "txindex"),
+        Metrics:  metrics,
+    })
 
     if err := indexerService.Start(); err != nil {
         return nil, nil, err
