
statesync/rpc: metrics for the statesync and the rpc SyncInfo (#6795)

Branch: pull/6970/head
JayT106 3 years ago
committed by GitHub
commit 84ffaaaf37
14 changed files with 500 additions and 52 deletions
  1. internal/statesync/chunks.go (+13 -0)
  2. internal/statesync/chunks_test.go (+28 -18)
  3. internal/statesync/metrics.go (+91 -0)
  4. internal/statesync/mocks/Metricer.go (+112 -0)
  5. internal/statesync/reactor.go (+95 -0)
  6. internal/statesync/reactor_test.go (+14 -0)
  7. internal/statesync/syncer.go (+22 -4)
  8. internal/statesync/syncer_test.go (+23 -3)
  9. node/node.go (+34 -15)
  10. rpc/client/mock/status_test.go (+21 -6)
  11. rpc/core/env.go (+8 -6)
  12. rpc/core/status.go (+10 -0)
  13. rpc/core/types/responses.go (+8 -0)
  14. rpc/openapi/openapi.yaml (+21 -0)

internal/statesync/chunks.go (+13 -0)

@@ -355,3 +355,16 @@ func (q *chunkQueue) WaitFor(index uint32) <-chan uint32 {
 	return ch
 }
+
+func (q *chunkQueue) numChunksReturned() int {
+	q.Lock()
+	defer q.Unlock()
+
+	cnt := 0
+	for _, b := range q.chunkReturned {
+		if b {
+			cnt++
+		}
+	}
+	return cnt
+}

internal/statesync/chunks_test.go (+28 -18)

@@ -421,15 +421,7 @@ func TestChunkQueue_Retry(t *testing.T) {
 	queue, teardown := setupChunkQueue(t)
 	defer teardown()
 
-	// Allocate and add all chunks to the queue
-	for i := uint32(0); i < queue.Size(); i++ {
-		_, err := queue.Allocate()
-		require.NoError(t, err)
-		_, err = queue.Add(&chunk{Height: 3, Format: 1, Index: i, Chunk: []byte{byte(i)}})
-		require.NoError(t, err)
-		_, err = queue.Next()
-		require.NoError(t, err)
-	}
+	allocateAddChunksToQueue(t, queue)
 
 	// Retrying a couple of chunks makes Next() return them, but they are not allocatable
 	queue.Retry(3)
@@ -454,15 +446,7 @@ func TestChunkQueue_RetryAll(t *testing.T) {
 	queue, teardown := setupChunkQueue(t)
 	defer teardown()
 
-	// Allocate and add all chunks to the queue
-	for i := uint32(0); i < queue.Size(); i++ {
-		_, err := queue.Allocate()
-		require.NoError(t, err)
-		_, err = queue.Add(&chunk{Height: 3, Format: 1, Index: i, Chunk: []byte{byte(i)}})
-		require.NoError(t, err)
-		_, err = queue.Next()
-		require.NoError(t, err)
-	}
+	allocateAddChunksToQueue(t, queue)
 
 	_, err := queue.Next()
 	assert.Equal(t, errDone, err)
@@ -552,3 +536,29 @@ func TestChunkQueue_WaitFor(t *testing.T) {
 	_, ok = <-w
 	assert.False(t, ok)
 }
+
+func TestNumChunkReturned(t *testing.T) {
+	queue, teardown := setupChunkQueue(t)
+	defer teardown()
+
+	assert.EqualValues(t, 5, queue.Size())
+
+	allocateAddChunksToQueue(t, queue)
+	assert.EqualValues(t, 5, queue.numChunksReturned())
+
+	err := queue.Close()
+	require.NoError(t, err)
+}
+
+// Allocate and add all chunks to the queue
+func allocateAddChunksToQueue(t *testing.T, q *chunkQueue) {
+	t.Helper()
+	for i := uint32(0); i < q.Size(); i++ {
+		_, err := q.Allocate()
+		require.NoError(t, err)
+		_, err = q.Add(&chunk{Height: 3, Format: 1, Index: i, Chunk: []byte{byte(i)}})
+		require.NoError(t, err)
+		_, err = q.Next()
+		require.NoError(t, err)
+	}
+}

internal/statesync/metrics.go (+91 -0)

@@ -0,0 +1,91 @@
+package statesync
+
+import (
+	"github.com/go-kit/kit/metrics"
+	"github.com/go-kit/kit/metrics/discard"
+	"github.com/go-kit/kit/metrics/prometheus"
+
+	stdprometheus "github.com/prometheus/client_golang/prometheus"
+)
+
+const (
+	// MetricsSubsystem is a subsystem shared by all metrics exposed by this package.
+	MetricsSubsystem = "statesync"
+)
+
+// Metrics contains metrics exposed by this package.
+type Metrics struct {
+	TotalSnapshots      metrics.Counter
+	ChunkProcessAvgTime metrics.Gauge
+	SnapshotHeight      metrics.Gauge
+	SnapshotChunk       metrics.Counter
+	SnapshotChunkTotal  metrics.Gauge
+	BackFilledBlocks    metrics.Counter
+	BackFillBlocksTotal metrics.Gauge
+}
+
+// PrometheusMetrics returns Metrics built using the Prometheus client library.
+// Optionally, labels can be provided along with their values ("foo",
+// "fooValue").
+func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
+	labels := []string{}
+	for i := 0; i < len(labelsAndValues); i += 2 {
+		labels = append(labels, labelsAndValues[i])
+	}
+	return &Metrics{
+		TotalSnapshots: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: MetricsSubsystem,
+			Name:      "total_snapshots",
+			Help:      "The total number of snapshots discovered.",
+		}, labels).With(labelsAndValues...),
+		ChunkProcessAvgTime: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: MetricsSubsystem,
+			Name:      "chunk_process_avg_time",
+			Help:      "The average processing time per chunk.",
+		}, labels).With(labelsAndValues...),
+		SnapshotHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: MetricsSubsystem,
+			Name:      "snapshot_height",
+			Help:      "The height of the current snapshot that has been processed.",
+		}, labels).With(labelsAndValues...),
+		SnapshotChunk: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: MetricsSubsystem,
+			Name:      "snapshot_chunk",
+			Help:      "The current number of chunks that have been processed.",
+		}, labels).With(labelsAndValues...),
+		SnapshotChunkTotal: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: MetricsSubsystem,
+			Name:      "snapshot_chunks_total",
+			Help:      "The total number of chunks in the current snapshot.",
+		}, labels).With(labelsAndValues...),
+		BackFilledBlocks: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: MetricsSubsystem,
+			Name:      "backfilled_blocks",
+			Help:      "The current number of blocks that have been back-filled.",
+		}, labels).With(labelsAndValues...),
+		BackFillBlocksTotal: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: MetricsSubsystem,
+			Name:      "backfilled_blocks_total",
+			Help:      "The total number of blocks that need to be back-filled.",
+		}, labels).With(labelsAndValues...),
+	}
+}
+
+// NopMetrics returns no-op Metrics.
+func NopMetrics() *Metrics {
+	return &Metrics{
+		TotalSnapshots:      discard.NewCounter(),
+		ChunkProcessAvgTime: discard.NewGauge(),
+		SnapshotHeight:      discard.NewGauge(),
+		SnapshotChunk:       discard.NewCounter(),
+		SnapshotChunkTotal:  discard.NewGauge(),
+		BackFilledBlocks:    discard.NewCounter(),
+		BackFillBlocksTotal: discard.NewGauge(),
+	}
+}
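
Note: the labelsAndValues variadic is consumed pairwise — even indices are label names, odd indices their values. A minimal usage sketch with assumed namespace and label values (the package is internal, so this only compiles inside the tendermint module; node.go below does the real wiring):

	// Assumed values for illustration; node.go passes the node's
	// instrumentation namespace and a "chain_id" label.
	m := statesync.PrometheusMetrics("tendermint", "chain_id", "test-chain")
	m.TotalSnapshots.Add(1)       // counter: one more snapshot discovered
	m.SnapshotChunkTotal.Set(100) // gauge: chunks in the current snapshot

	// Without Prometheus enabled, callers get the no-op implementation.
	nop := statesync.NopMetrics()
	nop.SnapshotHeight.Set(1262196) // silently discarded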

internal/statesync/mocks/Metricer.go (+112 -0)

@@ -0,0 +1,112 @@
+// Code generated by mockery 2.9.4. DO NOT EDIT.
+
+package mocks
+
+import (
+	mock "github.com/stretchr/testify/mock"
+
+	time "time"
+)
+
+// Metricer is an autogenerated mock type for the Metricer type
+type Metricer struct {
+	mock.Mock
+}
+
+// BackFillBlocksTotal provides a mock function with given fields:
+func (_m *Metricer) BackFillBlocksTotal() int64 {
+	ret := _m.Called()
+
+	var r0 int64
+	if rf, ok := ret.Get(0).(func() int64); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(int64)
+	}
+
+	return r0
+}
+
+// BackFilledBlocks provides a mock function with given fields:
+func (_m *Metricer) BackFilledBlocks() int64 {
+	ret := _m.Called()
+
+	var r0 int64
+	if rf, ok := ret.Get(0).(func() int64); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(int64)
+	}
+
+	return r0
+}
+
+// ChunkProcessAvgTime provides a mock function with given fields:
+func (_m *Metricer) ChunkProcessAvgTime() time.Duration {
+	ret := _m.Called()
+
+	var r0 time.Duration
+	if rf, ok := ret.Get(0).(func() time.Duration); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(time.Duration)
+	}
+
+	return r0
+}
+
+// SnapshotChunksCount provides a mock function with given fields:
+func (_m *Metricer) SnapshotChunksCount() int64 {
+	ret := _m.Called()
+
+	var r0 int64
+	if rf, ok := ret.Get(0).(func() int64); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(int64)
+	}
+
+	return r0
+}
+
+// SnapshotChunksTotal provides a mock function with given fields:
+func (_m *Metricer) SnapshotChunksTotal() int64 {
+	ret := _m.Called()
+
+	var r0 int64
+	if rf, ok := ret.Get(0).(func() int64); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(int64)
+	}
+
+	return r0
+}
+
+// SnapshotHeight provides a mock function with given fields:
+func (_m *Metricer) SnapshotHeight() int64 {
+	ret := _m.Called()
+
+	var r0 int64
+	if rf, ok := ret.Get(0).(func() int64); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(int64)
+	}
+
+	return r0
+}
+
+// TotalSnapshots provides a mock function with given fields:
+func (_m *Metricer) TotalSnapshots() int64 {
+	ret := _m.Called()
+
+	var r0 int64
+	if rf, ok := ret.Get(0).(func() int64); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(int64)
+	}
+
+	return r0
+}
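
The generated mock follows the standard testify pattern: a test stubs each accessor with On(...).Return(...) and the typed getter above unwraps the value. An illustrative fragment (not a test from this commit):

	m := &mocks.Metricer{}
	m.On("TotalSnapshots").Return(int64(5))
	m.On("SnapshotHeight").Return(int64(1262196))

	require.EqualValues(t, 5, m.TotalSnapshots())
	require.EqualValues(t, 1262196, m.SnapshotHeight())
	m.AssertExpectations(t)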

internal/statesync/reactor.go (+95 -0)

@@ -125,6 +125,18 @@ const (
 	maxLightBlockRequestRetries = 20
 )
 
+// Metricer defines an interface used for the RPC sync info query; see
+// statesync.metrics for details.
+type Metricer interface {
+	TotalSnapshots() int64
+	ChunkProcessAvgTime() time.Duration
+	SnapshotHeight() int64
+	SnapshotChunksCount() int64
+	SnapshotChunksTotal() int64
+	BackFilledBlocks() int64
+	BackFillBlocksTotal() int64
+}
+
 // Reactor handles state sync, both restoring snapshots for the local node and
 // serving snapshots for other nodes.
 type Reactor struct {
@@ -158,6 +170,10 @@ type Reactor struct {
 	syncer        *syncer
 	providers     map[types.NodeID]*BlockProvider
 	stateProvider StateProvider
+
+	metrics            *Metrics
+	backfillBlockTotal int64
+	backfilledBlocks   int64
 }
 
 // NewReactor returns a reference to a new state sync reactor, which implements
@@ -176,6 +192,7 @@ func NewReactor(
 	stateStore sm.Store,
 	blockStore *store.BlockStore,
 	tempDir string,
+	ssMetrics *Metrics,
 ) *Reactor {
 	r := &Reactor{
 		chainID: chainID,
@@ -195,6 +212,7 @@ func NewReactor(
 		peers:      newPeerList(),
 		dispatcher: NewDispatcher(blockCh.Out),
 		providers:  make(map[types.NodeID]*BlockProvider),
+		metrics:    ssMetrics,
 	}
 
 	r.BaseService = *service.NewBaseService(logger, "StateSync", r)
@@ -271,6 +289,7 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
 		r.snapshotCh.Out,
 		r.chunkCh.Out,
 		r.tempDir,
+		r.metrics,
 	)
 	r.mtx.Unlock()
 	defer func() {
@@ -347,6 +366,9 @@ func (r *Reactor) backfill(
 	r.Logger.Info("starting backfill process...", "startHeight", startHeight,
 		"stopHeight", stopHeight, "stopTime", stopTime, "trustedBlockID", trustedBlockID)
 
+	r.backfillBlockTotal = startHeight - stopHeight + 1
+	r.metrics.BackFillBlocksTotal.Set(float64(r.backfillBlockTotal))
+
 	const sleepTime = 1 * time.Second
 	var (
 		lastValidatorSet *types.ValidatorSet
@@ -481,6 +503,16 @@ func (r *Reactor) backfill(
 				lastValidatorSet = resp.block.ValidatorSet
 
+				r.backfilledBlocks++
+				r.metrics.BackFilledBlocks.Add(1)
+
+				// The block height might be less than the stopHeight because the
+				// stopTime condition hasn't been fulfilled yet.
+				if resp.block.Height < stopHeight {
+					r.backfillBlockTotal++
+					r.metrics.BackFillBlocksTotal.Set(float64(r.backfillBlockTotal))
+				}
+
 			case <-queue.done():
 				if err := queue.error(); err != nil {
 					return err
@@ -1005,3 +1037,66 @@ func (r *Reactor) initStateProvider(ctx context.Context, chainID string, initial
 	}
 	return nil
 }
+
+func (r *Reactor) TotalSnapshots() int64 {
+	r.mtx.RLock()
+	defer r.mtx.RUnlock()
+	if r.syncer != nil && r.syncer.snapshots != nil {
+		return int64(len(r.syncer.snapshots.snapshots))
+	}
+	return 0
+}
+
+func (r *Reactor) ChunkProcessAvgTime() time.Duration {
+	r.mtx.RLock()
+	defer r.mtx.RUnlock()
+	if r.syncer != nil {
+		return time.Duration(r.syncer.avgChunkTime)
+	}
+	return time.Duration(0)
+}
+
+func (r *Reactor) SnapshotHeight() int64 {
+	r.mtx.RLock()
+	defer r.mtx.RUnlock()
+	if r.syncer != nil {
+		return r.syncer.lastSyncedSnapshotHeight
+	}
+	return 0
+}
+
+func (r *Reactor) SnapshotChunksCount() int64 {
+	r.mtx.RLock()
+	defer r.mtx.RUnlock()
+	if r.syncer != nil && r.syncer.chunks != nil {
+		return int64(r.syncer.chunks.numChunksReturned())
+	}
+	return 0
+}
+
+func (r *Reactor) SnapshotChunksTotal() int64 {
+	r.mtx.RLock()
+	defer r.mtx.RUnlock()
+	if r.syncer != nil && r.syncer.processingSnapshot != nil {
+		return int64(r.syncer.processingSnapshot.Chunks)
+	}
+	return 0
+}
+
+func (r *Reactor) BackFilledBlocks() int64 {
+	r.mtx.RLock()
+	defer r.mtx.RUnlock()
+	return r.backfilledBlocks
+}
+
+func (r *Reactor) BackFillBlocksTotal() int64 {
+	r.mtx.RLock()
+	defer r.mtx.RUnlock()
+	return r.backfillBlockTotal
+}
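
These accessors make *Reactor satisfy the Metricer interface declared at the top of this file, which is what rpc/core consumes later in this commit. As a sketch of how they compose, a hypothetical helper (not in this diff) could derive restore progress:

	// restoreProgress is hypothetical, not part of this commit: it turns the
	// Metricer accessors into a rough chunk-restore percentage.
	func restoreProgress(m Metricer) float64 {
		total := m.SnapshotChunksTotal()
		if total == 0 {
			return 0 // no snapshot is being processed yet
		}
		return 100 * float64(m.SnapshotChunksCount()) / float64(total)
	}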

internal/statesync/reactor_test.go (+14 -0)

@@ -29,6 +29,10 @@ import (
 	"github.com/tendermint/tendermint/types"
 )
 
+var (
+	m = PrometheusMetrics(config.TestConfig().Instrumentation.Namespace)
+)
+
 type reactorTestSuite struct {
 	reactor *Reactor
 	syncer  *syncer
@@ -156,6 +160,7 @@ func setup(
 		rts.stateStore,
 		rts.blockStore,
 		"",
+		m,
 	)
 
 	rts.syncer = newSyncer(
@@ -167,6 +172,7 @@ func setup(
 		rts.snapshotOutCh,
 		rts.chunkOutCh,
 		"",
+		rts.reactor.metrics,
 	)
 
 	require.NoError(t, rts.reactor.Start())
@@ -596,6 +602,9 @@ func TestReactor_Backfill(t *testing.T) {
 			)
 			if failureRate > 3 {
 				require.Error(t, err)
+
+				require.NotEqual(t, rts.reactor.backfilledBlocks, rts.reactor.backfillBlockTotal)
+				require.Equal(t, startHeight-stopHeight+1, rts.reactor.backfillBlockTotal)
 			} else {
 				require.NoError(t, err)
 
@@ -606,7 +615,12 @@ func TestReactor_Backfill(t *testing.T) {
 				require.Nil(t, rts.blockStore.LoadBlockMeta(stopHeight-1))
 				require.Nil(t, rts.blockStore.LoadBlockMeta(startHeight+1))
+
+				require.Equal(t, startHeight-stopHeight+1, rts.reactor.backfilledBlocks)
+				require.Equal(t, startHeight-stopHeight+1, rts.reactor.backfillBlockTotal)
 			}
+			require.Equal(t, rts.reactor.backfilledBlocks, rts.reactor.BackFilledBlocks())
+			require.Equal(t, rts.reactor.backfillBlockTotal, rts.reactor.BackFillBlocksTotal())
 		})
 	}
 }


internal/statesync/syncer.go (+22 -4)

@@ -63,8 +63,13 @@ type syncer struct {
 	fetchers      int32
 	retryTimeout  time.Duration
 
-	mtx    tmsync.RWMutex
-	chunks *chunkQueue
+	mtx                      tmsync.RWMutex
+	chunks                   *chunkQueue
+	metrics                  *Metrics
+	avgChunkTime             int64
+	lastSyncedSnapshotHeight int64
+	processingSnapshot       *snapshot
 }
 
 // newSyncer creates a new syncer.
@@ -76,6 +81,7 @@ func newSyncer(
 	stateProvider StateProvider,
 	snapshotCh, chunkCh chan<- p2p.Envelope,
 	tempDir string,
+	metrics *Metrics,
 ) *syncer {
 	return &syncer{
 		logger:        logger,
@@ -88,6 +94,7 @@ func newSyncer(
 		tempDir:       tempDir,
 		fetchers:      cfg.Fetchers,
 		retryTimeout:  cfg.ChunkRequestTimeout,
+		metrics:       metrics,
 	}
 }
 
@@ -121,6 +128,7 @@ func (s *syncer) AddSnapshot(peerID types.NodeID, snapshot *snapshot) (bool, err
 		return false, err
 	}
 	if added {
+		s.metrics.TotalSnapshots.Add(1)
 		s.logger.Info("Discovered new snapshot", "height", snapshot.Height, "format", snapshot.Format,
 			"hash", snapshot.Hash)
 	}
@@ -190,9 +198,14 @@ func (s *syncer) SyncAny(
 			defer chunks.Close() // in case we forget to close it elsewhere
 		}
 
+		s.processingSnapshot = snapshot
+		s.metrics.SnapshotChunkTotal.Set(float64(snapshot.Chunks))
+
 		newState, commit, err := s.Sync(ctx, snapshot, chunks)
 		switch {
 		case err == nil:
+			s.metrics.SnapshotHeight.Set(float64(snapshot.Height))
+			s.lastSyncedSnapshotHeight = int64(snapshot.Height)
 			return newState, commit, nil
 
 		case errors.Is(err, errAbort):
@@ -237,6 +250,7 @@ func (s *syncer) SyncAny(
 		}
 		snapshot = nil
 		chunks = nil
+		s.processingSnapshot = nil
 	}
 }
 
@@ -286,6 +300,7 @@ func (s *syncer) Sync(ctx context.Context, snapshot *snapshot, chunks *chunkQueu
 	// Spawn chunk fetchers. They will terminate when the chunk queue is closed or context canceled.
 	fetchCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
+	fetchStartTime := time.Now()
 	for i := int32(0); i < s.fetchers; i++ {
 		go s.fetchChunks(fetchCtx, snapshot, chunks)
 	}
@@ -324,7 +339,7 @@ func (s *syncer) Sync(ctx context.Context, snapshot *snapshot, chunks *chunkQueu
 	}
 
 	// Restore snapshot
-	err = s.applyChunks(ctx, chunks)
+	err = s.applyChunks(ctx, chunks, fetchStartTime)
 	if err != nil {
 		return sm.State{}, nil, err
 	}
@@ -381,7 +396,7 @@ func (s *syncer) offerSnapshot(ctx context.Context, snapshot *snapshot) error {
 // applyChunks applies chunks to the app. It returns various errors depending on the app's
 // response, or nil once the snapshot is fully restored.
-func (s *syncer) applyChunks(ctx context.Context, chunks *chunkQueue) error {
+func (s *syncer) applyChunks(ctx context.Context, chunks *chunkQueue, start time.Time) error {
 	for {
 		chunk, err := chunks.Next()
 		if err == errDone {
@@ -423,6 +438,9 @@ func (s *syncer) applyChunks(ctx context.Context, chunks *chunkQueue) error {
 		switch resp.Result {
 		case abci.ResponseApplySnapshotChunk_ACCEPT:
+			s.metrics.SnapshotChunk.Add(1)
+			s.avgChunkTime = time.Since(start).Nanoseconds() / int64(chunks.numChunksReturned())
+			s.metrics.ChunkProcessAvgTime.Set(float64(s.avgChunkTime))
 		case abci.ResponseApplySnapshotChunk_ABORT:
 			return errAbort
 		case abci.ResponseApplySnapshotChunk_RETRY:
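
The average here is cumulative rather than windowed: on every accepted chunk, the total wall-clock time since the fetchers started is divided by the number of chunks returned so far. A standalone worked example of the same arithmetic (values assumed):

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		// Assume fetching started 30s ago and 5 chunks have been returned.
		elapsed := 30 * time.Second
		chunksReturned := int64(5)
		avgChunkTime := elapsed.Nanoseconds() / chunksReturned
		fmt.Println(time.Duration(avgChunkTime)) // prints "6s"
	}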


internal/statesync/syncer_test.go (+23 -3)

@@ -70,6 +70,8 @@ func TestSyncer_SyncAny(t *testing.T) {
 	peerCID := types.NodeID("cc")
 	rts := setup(t, connSnapshot, connQuery, stateProvider, 3)
 
+	rts.reactor.syncer = rts.syncer
+
 	// Adding a chunk should error when no sync is in progress
 	_, err := rts.syncer.AddChunk(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{1}})
 	require.Error(t, err)
@@ -195,6 +197,16 @@ func TestSyncer_SyncAny(t *testing.T) {
 	require.Equal(t, expectState, newState)
 	require.Equal(t, commit, lastCommit)
 
+	require.Equal(t, len(chunks), int(rts.syncer.processingSnapshot.Chunks))
+	require.Equal(t, expectState.LastBlockHeight, rts.syncer.lastSyncedSnapshotHeight)
+	require.True(t, rts.syncer.avgChunkTime > 0)
+
+	require.Equal(t, int64(rts.syncer.processingSnapshot.Chunks), rts.reactor.SnapshotChunksTotal())
+	require.Equal(t, rts.syncer.lastSyncedSnapshotHeight, rts.reactor.SnapshotHeight())
+	require.Equal(t, time.Duration(rts.syncer.avgChunkTime), rts.reactor.ChunkProcessAvgTime())
+	require.Equal(t, int64(len(rts.syncer.snapshots.snapshots)), rts.reactor.TotalSnapshots())
+	require.Equal(t, int64(0), rts.reactor.SnapshotChunksCount())
+
 	connSnapshot.AssertExpectations(t)
 	connQuery.AssertExpectations(t)
 }
@@ -448,6 +460,9 @@ func TestSyncer_applyChunks_Results(t *testing.T) {
 			body := []byte{1, 2, 3}
 			chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 1}, "")
 			require.NoError(t, err)
+
+			fetchStartTime := time.Now()
+
 			_, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: body})
 			require.NoError(t, err)
@@ -461,7 +476,7 @@ func TestSyncer_applyChunks_Results(t *testing.T) {
 					Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
 			}
 
-			err = rts.syncer.applyChunks(ctx, chunks)
+			err = rts.syncer.applyChunks(ctx, chunks, fetchStartTime)
 			if tc.expectErr == unknownErr {
 				require.Error(t, err)
 			} else {
@@ -498,6 +513,9 @@ func TestSyncer_applyChunks_RefetchChunks(t *testing.T) {
 			chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 3}, "")
 			require.NoError(t, err)
+
+			fetchStartTime := time.Now()
+
 			added, err := chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{0}})
 			require.True(t, added)
 			require.NoError(t, err)
@@ -526,7 +544,7 @@ func TestSyncer_applyChunks_RefetchChunks(t *testing.T) {
 			// check the queue contents, and finally close the queue to end the goroutine.
 			// We don't really care about the result of applyChunks, since it has separate test.
 			go func() {
-				rts.syncer.applyChunks(ctx, chunks) //nolint:errcheck // purposefully ignore error
+				rts.syncer.applyChunks(ctx, chunks, fetchStartTime) //nolint:errcheck // purposefully ignore error
 			}()
 
 			time.Sleep(50 * time.Millisecond)
@@ -588,6 +606,8 @@ func TestSyncer_applyChunks_RejectSenders(t *testing.T) {
 			chunks, err := newChunkQueue(s1, "")
 			require.NoError(t, err)
 
+			fetchStartTime := time.Now()
+
 			added, err := chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{0}, Sender: peerAID})
 			require.True(t, added)
 			require.NoError(t, err)
@@ -625,7 +645,7 @@ func TestSyncer_applyChunks_RejectSenders(t *testing.T) {
 			// However, it will block on e.g. retry result, so we spawn a goroutine that will
 			// be shut down when the chunk queue closes.
 			go func() {
-				rts.syncer.applyChunks(ctx, chunks) //nolint:errcheck // purposefully ignore error
+				rts.syncer.applyChunks(ctx, chunks, fetchStartTime) //nolint:errcheck // purposefully ignore error
 			}()
 
 			time.Sleep(50 * time.Millisecond)


node/node.go (+34 -15)

@@ -240,16 +240,17 @@ func makeNode(config *cfg.Config,
 		return nil, fmt.Errorf("failed to create peer manager: %w", err)
 	}
 
-	csMetrics, p2pMetrics, memplMetrics, smMetrics := defaultMetricsProvider(config.Instrumentation)(genDoc.ChainID)
+	nodeMetrics :=
+		defaultMetricsProvider(config.Instrumentation)(genDoc.ChainID)
 
-	router, err := createRouter(p2pLogger, p2pMetrics, nodeInfo, nodeKey.PrivKey,
+	router, err := createRouter(p2pLogger, nodeMetrics.p2p, nodeInfo, nodeKey.PrivKey,
 		peerManager, transport, getRouterConfig(config, proxyApp))
 	if err != nil {
 		return nil, fmt.Errorf("failed to create router: %w", err)
 	}
 
 	mpReactorShim, mpReactor, mp, err := createMempoolReactor(
-		config, proxyApp, state, memplMetrics, peerManager, router, logger,
+		config, proxyApp, state, nodeMetrics.mempool, peerManager, router, logger,
 	)
 	if err != nil {
 		return nil, err
@@ -270,12 +271,12 @@ func makeNode(config *cfg.Config,
 		mp,
 		evPool,
 		blockStore,
-		sm.BlockExecutorWithMetrics(smMetrics),
+		sm.BlockExecutorWithMetrics(nodeMetrics.state),
 	)
 
 	csReactorShim, csReactor, csState := createConsensusReactor(
 		config, state, blockExec, blockStore, mp, evPool,
-		privValidator, csMetrics, stateSync || blockSync, eventBus,
+		privValidator, nodeMetrics.cs, stateSync || blockSync, eventBus,
 		peerManager, router, consensusLogger,
 	)
 
@@ -283,7 +284,7 @@ func makeNode(config *cfg.Config,
 	// doing a state sync first.
 	bcReactorShim, bcReactor, err := createBlockchainReactor(
 		logger, config, state, blockExec, blockStore, csReactor,
-		peerManager, router, blockSync && !stateSync, csMetrics,
+		peerManager, router, blockSync && !stateSync, nodeMetrics.cs,
 	)
 	if err != nil {
 		return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
@@ -300,9 +301,9 @@ func makeNode(config *cfg.Config,
 	// Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first.
 	// FIXME We need to update metrics here, since other reactors don't have access to them.
 	if stateSync {
-		csMetrics.StateSyncing.Set(1)
+		nodeMetrics.cs.StateSyncing.Set(1)
 	} else if blockSync {
-		csMetrics.BlockSyncing.Set(1)
+		nodeMetrics.cs.BlockSyncing.Set(1)
 	}
 
 	// Set up state sync reactor, and schedule a sync if requested.
@@ -342,6 +343,7 @@ func makeNode(config *cfg.Config,
 		stateStore,
 		blockStore,
 		config.StateSync.TempDir,
+		nodeMetrics.statesync,
 	)
 
 	// add the channel descriptors to both the transports
@@ -379,7 +381,7 @@ func makeNode(config *cfg.Config,
 	if config.P2P.UseLegacy {
 		// setup Transport and Switch
 		sw = createSwitch(
-			config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch,
+			config, transport, nodeMetrics.p2p, mpReactorShim, bcReactorForSwitch,
 			stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger,
 		)
 
@@ -1035,20 +1037,37 @@ func defaultGenesisDocProviderFunc(config *cfg.Config) genesisDocProvider {
 	}
 }
 
-// metricsProvider returns a consensus, p2p and mempool Metrics.
-type metricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempool.Metrics, *sm.Metrics)
+type nodeMetrics struct {
+	cs        *cs.Metrics
+	p2p       *p2p.Metrics
+	mempool   *mempool.Metrics
+	state     *sm.Metrics
+	statesync *statesync.Metrics
+}
+
+// metricsProvider returns consensus, p2p, mempool, state, and statesync Metrics.
+type metricsProvider func(chainID string) *nodeMetrics
 
 // defaultMetricsProvider returns Metrics built using the Prometheus client library
 // if Prometheus is enabled. Otherwise, it returns no-op Metrics.
 func defaultMetricsProvider(config *cfg.InstrumentationConfig) metricsProvider {
-	return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempool.Metrics, *sm.Metrics) {
+	return func(chainID string) *nodeMetrics {
 		if config.Prometheus {
-			return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID),
+			return &nodeMetrics{
+				cs.PrometheusMetrics(config.Namespace, "chain_id", chainID),
 				p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID),
 				mempool.PrometheusMetrics(config.Namespace, "chain_id", chainID),
-				sm.PrometheusMetrics(config.Namespace, "chain_id", chainID)
+				sm.PrometheusMetrics(config.Namespace, "chain_id", chainID),
+				statesync.PrometheusMetrics(config.Namespace, "chain_id", chainID),
+			}
+		}
+		return &nodeMetrics{
+			cs.NopMetrics(),
+			p2p.NopMetrics(),
+			mempool.NopMetrics(),
+			sm.NopMetrics(),
+			statesync.NopMetrics(),
 		}
-		return cs.NopMetrics(), p2p.NopMetrics(), mempool.NopMetrics(), sm.NopMetrics()
 	}
 }


rpc/client/mock/status_test.go (+21 -6)

@@ -20,12 +20,19 @@ func TestStatus(t *testing.T) {
 		Call: mock.Call{
 			Response: &ctypes.ResultStatus{
 				SyncInfo: ctypes.SyncInfo{
-					LatestBlockHash:    bytes.HexBytes("block"),
-					LatestAppHash:      bytes.HexBytes("app"),
-					LatestBlockHeight:  10,
-					MaxPeerBlockHeight: 20,
-					TotalSyncedTime:    time.Second,
-					RemainingTime:      time.Minute,
+					LatestBlockHash:     bytes.HexBytes("block"),
+					LatestAppHash:       bytes.HexBytes("app"),
+					LatestBlockHeight:   10,
+					MaxPeerBlockHeight:  20,
+					TotalSyncedTime:     time.Second,
+					RemainingTime:       time.Minute,
+					TotalSnapshots:      10,
+					ChunkProcessAvgTime: time.Duration(10),
+					SnapshotHeight:      10,
+					SnapshotChunksCount: 9,
+					SnapshotChunksTotal: 10,
+					BackFilledBlocks:    9,
+					BackFillBlocksTotal: 10,
 				},
 			}},
 	}
@@ -56,4 +63,12 @@ func TestStatus(t *testing.T) {
 	assert.EqualValues(20, st.SyncInfo.MaxPeerBlockHeight)
 	assert.EqualValues(time.Second, status.SyncInfo.TotalSyncedTime)
 	assert.EqualValues(time.Minute, status.SyncInfo.RemainingTime)
+
+	assert.EqualValues(10, st.SyncInfo.TotalSnapshots)
+	assert.EqualValues(time.Duration(10), st.SyncInfo.ChunkProcessAvgTime)
+	assert.EqualValues(10, st.SyncInfo.SnapshotHeight)
+	assert.EqualValues(9, status.SyncInfo.SnapshotChunksCount)
+	assert.EqualValues(10, status.SyncInfo.SnapshotChunksTotal)
+	assert.EqualValues(9, status.SyncInfo.BackFilledBlocks)
+	assert.EqualValues(10, status.SyncInfo.BackFillBlocksTotal)
 }

rpc/core/env.go (+8 -6)

@@ -11,6 +11,7 @@ import (
 	mempl "github.com/tendermint/tendermint/internal/mempool"
 	"github.com/tendermint/tendermint/internal/p2p"
 	"github.com/tendermint/tendermint/internal/proxy"
+	"github.com/tendermint/tendermint/internal/statesync"
 	tmjson "github.com/tendermint/tendermint/libs/json"
 	"github.com/tendermint/tendermint/libs/log"
 	ctypes "github.com/tendermint/tendermint/rpc/core/types"
@@ -91,12 +92,13 @@ type Environment struct {
 	PeerManager peerManager
 
 	// objects
-	PubKey           crypto.PubKey
-	GenDoc           *types.GenesisDoc // cache the genesis structure
-	EventSinks       []indexer.EventSink
-	EventBus         *types.EventBus // thread safe
-	Mempool          mempl.Mempool
-	BlockSyncReactor consensus.BlockSyncReactor
+	PubKey            crypto.PubKey
+	GenDoc            *types.GenesisDoc // cache the genesis structure
+	EventSinks        []indexer.EventSink
+	EventBus          *types.EventBus // thread safe
+	Mempool           mempl.Mempool
+	BlockSyncReactor  consensus.BlockSyncReactor
+	StateSyncMetricer statesync.Metricer
 
 	Logger log.Logger
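
For Status to see these numbers, node setup must hand the state sync reactor (which implements statesync.Metricer, as shown above) to the RPC environment. A plausible wiring sketch — the rpcEnv variable name is assumed, not taken from this diff:

	// Assumed node-setup wiring: *statesync.Reactor satisfies Metricer.
	rpcEnv.StateSyncMetricer = stateSyncReactor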


rpc/core/status.go (+10 -0)

@@ -77,6 +77,16 @@ func (env *Environment) Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, err
 		ValidatorInfo: validatorInfo,
 	}
 
+	if env.StateSyncMetricer != nil {
+		result.SyncInfo.TotalSnapshots = env.StateSyncMetricer.TotalSnapshots()
+		result.SyncInfo.ChunkProcessAvgTime = env.StateSyncMetricer.ChunkProcessAvgTime()
+		result.SyncInfo.SnapshotHeight = env.StateSyncMetricer.SnapshotHeight()
+		result.SyncInfo.SnapshotChunksCount = env.StateSyncMetricer.SnapshotChunksCount()
+		result.SyncInfo.SnapshotChunksTotal = env.StateSyncMetricer.SnapshotChunksTotal()
+		result.SyncInfo.BackFilledBlocks = env.StateSyncMetricer.BackFilledBlocks()
+		result.SyncInfo.BackFillBlocksTotal = env.StateSyncMetricer.BackFillBlocksTotal()
+	}
+
 	return result, nil
 }
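
A client can then read restore progress straight out of /status. A hypothetical consumer (not part of this commit) over the ResultStatus type:

	// snapshotProgress is illustrative only; it summarizes the new SyncInfo
	// fields from a /status response.
	func snapshotProgress(st *ctypes.ResultStatus) string {
		si := st.SyncInfo
		if si.SnapshotChunksTotal == 0 {
			return "no state sync in progress"
		}
		return fmt.Sprintf("snapshot @%d: %d/%d chunks, avg %s/chunk",
			si.SnapshotHeight, si.SnapshotChunksCount, si.SnapshotChunksTotal,
			si.ChunkProcessAvgTime)
	}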


rpc/core/types/responses.go (+8 -0)

@@ -100,6 +100,14 @@ type SyncInfo struct {
 	TotalSyncedTime time.Duration `json:"total_synced_time"`
 	RemainingTime   time.Duration `json:"remaining_time"`
 
+	TotalSnapshots      int64         `json:"total_snapshots"`
+	ChunkProcessAvgTime time.Duration `json:"chunk_process_avg_time"`
+	SnapshotHeight      int64         `json:"snapshot_height"`
+	SnapshotChunksCount int64         `json:"snapshot_chunks_count"`
+	SnapshotChunksTotal int64         `json:"snapshot_chunks_total"`
+	BackFilledBlocks    int64         `json:"backfilled_blocks"`
+	BackFillBlocksTotal int64         `json:"backfill_blocks_total"`
 }
 
 // Info about the node's validator
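
These fields ride through tendermint's libs/json encoder, which (as I understand it) serializes 64-bit integers — and durations, whose underlying type is int64 — as quoted strings; that would explain the string-typed examples in openapi.yaml below. A hedged sketch:

	// Hedged sketch: tmjson is assumed to render int64 as a JSON string,
	// e.g. {"total_snapshots":"10",...}; verify against your version.
	bz, err := tmjson.Marshal(ctypes.SyncInfo{TotalSnapshots: 10})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(bz))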


rpc/openapi/openapi.yaml (+21 -0)

@@ -1396,6 +1396,27 @@ components:
         remaining_time:
           type: string
           example: "0"
+        total_snapshots:
+          type: string
+          example: "10"
+        chunk_process_avg_time:
+          type: string
+          example: "1000000000"
+        snapshot_height:
+          type: string
+          example: "1262196"
+        snapshot_chunks_count:
+          type: string
+          example: "10"
+        snapshot_chunks_total:
+          type: string
+          example: "100"
+        backfilled_blocks:
+          type: string
+          example: "10"
+        backfill_blocks_total:
+          type: string
+          example: "100"
     ValidatorInfo:
       type: object
       properties:

