Browse Source

fastsync: update the metrics during fast-sync (#6590)

Closes #3507
pull/6612/head
JayT106 3 years ago
committed by GitHub
parent
commit
2b0a3c151b
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 58 additions and 14 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +7
    -0
      internal/blockchain/v0/reactor.go
  3. +3
    -1
      internal/blockchain/v0/reactor_test.go
  4. +2
    -0
      internal/blockchain/v2/processor.go
  5. +13
    -1
      internal/blockchain/v2/processor_context.go
  6. +6
    -4
      internal/blockchain/v2/reactor.go
  7. +2
    -1
      internal/blockchain/v2/reactor_test.go
  8. +9
    -0
      internal/consensus/metrics.go
  9. +4
    -0
      internal/consensus/reactor.go
  10. +7
    -5
      internal/consensus/state.go
  11. +1
    -1
      node/node.go
  12. +3
    -1
      node/setup.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -132,3 +132,4 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
- [evidence] \#6375 Fix bug with inconsistent LightClientAttackEvidence hashing (cmwaters) - [evidence] \#6375 Fix bug with inconsistent LightClientAttackEvidence hashing (cmwaters)
- [rpc] \#6507 fix RPC client doesn't handle url's without ports (@JayT106) - [rpc] \#6507 fix RPC client doesn't handle url's without ports (@JayT106)
- [statesync] \#6463 Adds Reverse Sync feature to fetch historical light blocks after state sync in order to verify any evidence (@cmwaters) - [statesync] \#6463 Adds Reverse Sync feature to fetch historical light blocks after state sync in order to verify any evidence (@cmwaters)
- [fastsync] \#6590 Update the metrics during fast-sync (@JayT106)

+ 7
- 0
internal/blockchain/v0/reactor.go View File

@ -6,6 +6,7 @@ import (
"time" "time"
bc "github.com/tendermint/tendermint/internal/blockchain" bc "github.com/tendermint/tendermint/internal/blockchain"
cons "github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/libs/service"
@ -97,6 +98,8 @@ type Reactor struct {
// requestRoutine spawned goroutines when stopping the reactor and before // requestRoutine spawned goroutines when stopping the reactor and before
// stopping the p2p Channel(s). // stopping the p2p Channel(s).
poolWG sync.WaitGroup poolWG sync.WaitGroup
metrics *cons.Metrics
} }
// NewReactor returns new reactor instance. // NewReactor returns new reactor instance.
@ -109,6 +112,7 @@ func NewReactor(
blockchainCh *p2p.Channel, blockchainCh *p2p.Channel,
peerUpdates *p2p.PeerUpdates, peerUpdates *p2p.PeerUpdates,
fastSync bool, fastSync bool,
metrics *cons.Metrics,
) (*Reactor, error) { ) (*Reactor, error) {
if state.LastBlockHeight != store.Height() { if state.LastBlockHeight != store.Height() {
return nil, fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()) return nil, fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height())
@ -135,6 +139,7 @@ func NewReactor(
peerUpdates: peerUpdates, peerUpdates: peerUpdates,
peerUpdatesCh: make(chan p2p.Envelope), peerUpdatesCh: make(chan p2p.Envelope),
closeCh: make(chan struct{}), closeCh: make(chan struct{}),
metrics: metrics,
} }
r.BaseService = *service.NewBaseService(logger, "Blockchain", r) r.BaseService = *service.NewBaseService(logger, "Blockchain", r)
@ -560,6 +565,8 @@ FOR_LOOP:
panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
} }
r.metrics.RecordConsMetrics(first)
blocksSynced++ blocksSynced++
if blocksSynced%100 == 0 { if blocksSynced%100 == 0 {


+ 3
- 1
internal/blockchain/v0/reactor_test.go View File

@ -9,6 +9,7 @@ import (
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config"
cons "github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/mempool/mock" "github.com/tendermint/tendermint/internal/mempool/mock"
"github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/p2ptest" "github.com/tendermint/tendermint/internal/p2p/p2ptest"
@ -162,7 +163,8 @@ func (rts *reactorTestSuite) addNode(t *testing.T,
nil, nil,
rts.blockchainChannels[nodeID], rts.blockchainChannels[nodeID],
rts.peerUpdates[nodeID], rts.peerUpdates[nodeID],
rts.fastSync)
rts.fastSync,
cons.NopMetrics())
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, rts.reactors[nodeID].Start()) require.NoError(t, rts.reactors[nodeID].Start())


+ 2
- 0
internal/blockchain/v2/processor.go View File

@ -182,6 +182,8 @@ func (state *pcState) handle(event Event) (Event, error) {
panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
} }
state.context.recordConsMetrics(first)
delete(state.queue, first.Height) delete(state.queue, first.Height)
state.blocksSynced++ state.blocksSynced++


+ 13
- 1
internal/blockchain/v2/processor_context.go View File

@ -3,6 +3,7 @@ package v2
import ( import (
"fmt" "fmt"
cons "github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -13,19 +14,22 @@ type processorContext interface {
saveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) saveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit)
tmState() state.State tmState() state.State
setState(state.State) setState(state.State)
recordConsMetrics(block *types.Block)
} }
type pContext struct { type pContext struct {
store blockStore store blockStore
applier blockApplier applier blockApplier
state state.State state state.State
metrics *cons.Metrics
} }
func newProcessorContext(st blockStore, ex blockApplier, s state.State) *pContext {
func newProcessorContext(st blockStore, ex blockApplier, s state.State, m *cons.Metrics) *pContext {
return &pContext{ return &pContext{
store: st, store: st,
applier: ex, applier: ex,
state: s, state: s,
metrics: m,
} }
} }
@ -51,6 +55,10 @@ func (pc *pContext) saveBlock(block *types.Block, blockParts *types.PartSet, see
pc.store.SaveBlock(block, blockParts, seenCommit) pc.store.SaveBlock(block, blockParts, seenCommit)
} }
func (pc *pContext) recordConsMetrics(block *types.Block) {
pc.metrics.RecordConsMetrics(block)
}
type mockPContext struct { type mockPContext struct {
applicationBL []int64 applicationBL []int64
verificationBL []int64 verificationBL []int64
@ -98,3 +106,7 @@ func (mpc *mockPContext) setState(state state.State) {
func (mpc *mockPContext) tmState() state.State { func (mpc *mockPContext) tmState() state.State {
return mpc.state return mpc.state
} }
func (mpc *mockPContext) recordConsMetrics(block *types.Block) {
}

+ 6
- 4
internal/blockchain/v2/reactor.go View File

@ -9,6 +9,7 @@ import (
bc "github.com/tendermint/tendermint/internal/blockchain" bc "github.com/tendermint/tendermint/internal/blockchain"
"github.com/tendermint/tendermint/internal/blockchain/v2/internal/behavior" "github.com/tendermint/tendermint/internal/blockchain/v2/internal/behavior"
cons "github.com/tendermint/tendermint/internal/consensus"
tmsync "github.com/tendermint/tendermint/internal/libs/sync" tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
@ -55,13 +56,13 @@ type blockApplier interface {
// XXX: unify naming in this package around tmState // XXX: unify naming in this package around tmState
func newReactor(state state.State, store blockStore, reporter behavior.Reporter, func newReactor(state state.State, store blockStore, reporter behavior.Reporter,
blockApplier blockApplier, fastSync bool) *BlockchainReactor {
blockApplier blockApplier, fastSync bool, metrics *cons.Metrics) *BlockchainReactor {
initHeight := state.LastBlockHeight + 1 initHeight := state.LastBlockHeight + 1
if initHeight == 1 { if initHeight == 1 {
initHeight = state.InitialHeight initHeight = state.InitialHeight
} }
scheduler := newScheduler(initHeight, time.Now()) scheduler := newScheduler(initHeight, time.Now())
pContext := newProcessorContext(store, blockApplier, state)
pContext := newProcessorContext(store, blockApplier, state, metrics)
// TODO: Fix naming to just newProcesssor // TODO: Fix naming to just newProcesssor
// newPcState requires a processorContext // newPcState requires a processorContext
processor := newPcState(pContext) processor := newPcState(pContext)
@ -81,9 +82,10 @@ func NewBlockchainReactor(
state state.State, state state.State,
blockApplier blockApplier, blockApplier blockApplier,
store blockStore, store blockStore,
fastSync bool) *BlockchainReactor {
fastSync bool,
metrics *cons.Metrics) *BlockchainReactor {
reporter := behavior.NewMockReporter() reporter := behavior.NewMockReporter()
return newReactor(state, store, reporter, blockApplier, fastSync)
return newReactor(state, store, reporter, blockApplier, fastSync, metrics)
} }
// SetSwitch implements Reactor interface. // SetSwitch implements Reactor interface.


+ 2
- 1
internal/blockchain/v2/reactor_test.go View File

@ -16,6 +16,7 @@ import (
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/blockchain/v2/internal/behavior" "github.com/tendermint/tendermint/internal/blockchain/v2/internal/behavior"
cons "github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/mempool/mock" "github.com/tendermint/tendermint/internal/mempool/mock"
"github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/conn" "github.com/tendermint/tendermint/internal/p2p/conn"
@ -175,7 +176,7 @@ func newTestReactor(t *testing.T, p testReactorParams) *BlockchainReactor {
require.NoError(t, err) require.NoError(t, err)
} }
r := newReactor(state, store, reporter, appl, true)
r := newReactor(state, store, reporter, appl, true, cons.NopMetrics())
logger := log.TestingLogger() logger := log.TestingLogger()
r.SetLogger(logger.With("module", "blockchain")) r.SetLogger(logger.With("module", "blockchain"))


+ 9
- 0
internal/consensus/metrics.go View File

@ -3,6 +3,7 @@ package consensus
import ( import (
"github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/discard" "github.com/go-kit/kit/metrics/discard"
"github.com/tendermint/tendermint/types"
prometheus "github.com/go-kit/kit/metrics/prometheus" prometheus "github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus" stdprometheus "github.com/prometheus/client_golang/prometheus"
@ -218,3 +219,11 @@ func NopMetrics() *Metrics {
BlockParts: discard.NewCounter(), BlockParts: discard.NewCounter(),
} }
} }
// RecordConsMetrics uses for recording the block related metrics during fast-sync.
func (m *Metrics) RecordConsMetrics(block *types.Block) {
m.NumTxs.Set(float64(len(block.Data.Txs)))
m.TotalTxs.Add(float64(len(block.Data.Txs)))
m.BlockSizeBytes.Observe(float64(block.Size()))
m.CommittedHeight.Set(float64(block.Height))
}

+ 4
- 0
internal/consensus/reactor.go View File

@ -1403,3 +1403,7 @@ func (r *Reactor) peerStatsRoutine() {
} }
} }
} }
func (r *Reactor) GetConsensusState() *State {
return r.state
}

+ 7
- 5
internal/consensus/state.go View File

@ -1671,7 +1671,7 @@ func (cs *State) finalizeCommit(height int64) {
fail.Fail() // XXX fail.Fail() // XXX
// must be called before we update state // must be called before we update state
cs.recordMetrics(height, block)
cs.RecordMetrics(height, block)
// NewHeightStep! // NewHeightStep!
cs.updateToState(stateCopy) cs.updateToState(stateCopy)
@ -1693,7 +1693,7 @@ func (cs *State) finalizeCommit(height int64) {
// * cs.StartTime is set to when we will start round0. // * cs.StartTime is set to when we will start round0.
} }
func (cs *State) recordMetrics(height int64, block *types.Block) {
func (cs *State) RecordMetrics(height int64, block *types.Block) {
cs.metrics.Validators.Set(float64(cs.Validators.Size())) cs.metrics.Validators.Set(float64(cs.Validators.Size()))
cs.metrics.ValidatorsPower.Set(float64(cs.Validators.TotalVotingPower())) cs.metrics.ValidatorsPower.Set(float64(cs.Validators.TotalVotingPower()))
@ -1713,8 +1713,9 @@ func (cs *State) recordMetrics(height int64, block *types.Block) {
address types.Address address types.Address
) )
if commitSize != valSetLen { if commitSize != valSetLen {
panic(fmt.Sprintf("commit size (%d) doesn't match valset length (%d) at height %d\n\n%v\n\n%v",
cs.Logger.Error(fmt.Sprintf("commit size (%d) doesn't match valset length (%d) at height %d\n\n%v\n\n%v",
commitSize, valSetLen, block.Height, block.LastCommit.Signatures, cs.LastValidators.Validators)) commitSize, valSetLen, block.Height, block.LastCommit.Signatures, cs.LastValidators.Validators))
return
} }
if cs.privValidator != nil { if cs.privValidator != nil {
@ -1752,9 +1753,10 @@ func (cs *State) recordMetrics(height int64, block *types.Block) {
// NOTE: byzantine validators power and count is only for consensus evidence i.e. duplicate vote // NOTE: byzantine validators power and count is only for consensus evidence i.e. duplicate vote
var ( var (
byzantineValidatorsPower = int64(0)
byzantineValidatorsCount = int64(0)
byzantineValidatorsPower int64
byzantineValidatorsCount int64
) )
for _, ev := range block.Evidence.Evidence { for _, ev := range block.Evidence.Evidence {
if dve, ok := ev.(*types.DuplicateVoteEvidence); ok { if dve, ok := ev.(*types.DuplicateVoteEvidence); ok {
if _, val := cs.Validators.GetByAddress(dve.VoteA.ValidatorAddress); val != nil { if _, val := cs.Validators.GetByAddress(dve.VoteA.ValidatorAddress); val != nil {


+ 1
- 1
node/node.go View File

@ -290,7 +290,7 @@ func makeNode(config *cfg.Config,
// doing a state sync first. // doing a state sync first.
bcReactorShim, bcReactor, err := createBlockchainReactor( bcReactorShim, bcReactor, err := createBlockchainReactor(
logger, config, state, blockExec, blockStore, csReactor, logger, config, state, blockExec, blockStore, csReactor,
peerManager, router, fastSync && !stateSync,
peerManager, router, fastSync && !stateSync, csMetrics,
) )
if err != nil { if err != nil {
return nil, fmt.Errorf("could not create blockchain reactor: %w", err) return nil, fmt.Errorf("could not create blockchain reactor: %w", err)


+ 3
- 1
node/setup.go View File

@ -331,6 +331,7 @@ func createBlockchainReactor(
peerManager *p2p.PeerManager, peerManager *p2p.PeerManager,
router *p2p.Router, router *p2p.Router,
fastSync bool, fastSync bool,
metrics *cs.Metrics,
) (*p2p.ReactorShim, service.Service, error) { ) (*p2p.ReactorShim, service.Service, error) {
logger = logger.With("module", "blockchain") logger = logger.With("module", "blockchain")
@ -355,6 +356,7 @@ func createBlockchainReactor(
reactor, err := bcv0.NewReactor( reactor, err := bcv0.NewReactor(
logger, state.Copy(), blockExec, blockStore, csReactor, logger, state.Copy(), blockExec, blockStore, csReactor,
channels[bcv0.BlockchainChannel], peerUpdates, fastSync, channels[bcv0.BlockchainChannel], peerUpdates, fastSync,
metrics,
) )
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
@ -363,7 +365,7 @@ func createBlockchainReactor(
return reactorShim, reactor, nil return reactorShim, reactor, nil
case cfg.BlockchainV2: case cfg.BlockchainV2:
reactor := bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
reactor := bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync, metrics)
reactor.SetLogger(logger) reactor.SetLogger(logger)
return nil, reactor, nil return nil, reactor, nil


Loading…
Cancel
Save