diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 1f7ae9a47..0c9618226 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -132,3 +132,4 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi - [evidence] \#6375 Fix bug with inconsistent LightClientAttackEvidence hashing (cmwaters) - [rpc] \#6507 fix RPC client doesn't handle url's without ports (@JayT106) - [statesync] \#6463 Adds Reverse Sync feature to fetch historical light blocks after state sync in order to verify any evidence (@cmwaters) +- [fastsync] \#6590 Update the metrics during fast-sync (@JayT106) diff --git a/internal/blockchain/v0/reactor.go b/internal/blockchain/v0/reactor.go index aa8208914..1fa997b17 100644 --- a/internal/blockchain/v0/reactor.go +++ b/internal/blockchain/v0/reactor.go @@ -6,6 +6,7 @@ import ( "time" bc "github.com/tendermint/tendermint/internal/blockchain" + cons "github.com/tendermint/tendermint/internal/consensus" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" @@ -97,6 +98,8 @@ type Reactor struct { // requestRoutine spawned goroutines when stopping the reactor and before // stopping the p2p Channel(s). poolWG sync.WaitGroup + + metrics *cons.Metrics } // NewReactor returns new reactor instance. @@ -109,6 +112,7 @@ func NewReactor( blockchainCh *p2p.Channel, peerUpdates *p2p.PeerUpdates, fastSync bool, + metrics *cons.Metrics, ) (*Reactor, error) { if state.LastBlockHeight != store.Height() { return nil, fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()) @@ -135,6 +139,7 @@ func NewReactor( peerUpdates: peerUpdates, peerUpdatesCh: make(chan p2p.Envelope), closeCh: make(chan struct{}), + metrics: metrics, } r.BaseService = *service.NewBaseService(logger, "Blockchain", r) @@ -560,6 +565,8 @@ FOR_LOOP: panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) } + r.metrics.RecordConsMetrics(first) + blocksSynced++ if blocksSynced%100 == 0 { diff --git a/internal/blockchain/v0/reactor_test.go b/internal/blockchain/v0/reactor_test.go index 841a4d64c..6020d4145 100644 --- a/internal/blockchain/v0/reactor_test.go +++ b/internal/blockchain/v0/reactor_test.go @@ -9,6 +9,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" cfg "github.com/tendermint/tendermint/config" + cons "github.com/tendermint/tendermint/internal/consensus" "github.com/tendermint/tendermint/internal/mempool/mock" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p/p2ptest" @@ -162,7 +163,8 @@ func (rts *reactorTestSuite) addNode(t *testing.T, nil, rts.blockchainChannels[nodeID], rts.peerUpdates[nodeID], - rts.fastSync) + rts.fastSync, + cons.NopMetrics()) require.NoError(t, err) require.NoError(t, rts.reactors[nodeID].Start()) diff --git a/internal/blockchain/v2/processor.go b/internal/blockchain/v2/processor.go index c74f9ca9f..8a8b0e4af 100644 --- a/internal/blockchain/v2/processor.go +++ b/internal/blockchain/v2/processor.go @@ -182,6 +182,8 @@ func (state *pcState) handle(event Event) (Event, error) { panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) } + state.context.recordConsMetrics(first) + delete(state.queue, first.Height) state.blocksSynced++ diff --git a/internal/blockchain/v2/processor_context.go b/internal/blockchain/v2/processor_context.go index 7385bcc6e..bc6852565 100644 --- a/internal/blockchain/v2/processor_context.go +++ b/internal/blockchain/v2/processor_context.go @@ -3,6 +3,7 @@ package v2 import ( "fmt" + cons "github.com/tendermint/tendermint/internal/consensus" "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -13,19 +14,22 @@ type processorContext interface { saveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) tmState() state.State setState(state.State) + recordConsMetrics(block *types.Block) } type pContext struct { store blockStore applier blockApplier state state.State + metrics *cons.Metrics } -func newProcessorContext(st blockStore, ex blockApplier, s state.State) *pContext { +func newProcessorContext(st blockStore, ex blockApplier, s state.State, m *cons.Metrics) *pContext { return &pContext{ store: st, applier: ex, state: s, + metrics: m, } } @@ -51,6 +55,10 @@ func (pc *pContext) saveBlock(block *types.Block, blockParts *types.PartSet, see pc.store.SaveBlock(block, blockParts, seenCommit) } +func (pc *pContext) recordConsMetrics(block *types.Block) { + pc.metrics.RecordConsMetrics(block) +} + type mockPContext struct { applicationBL []int64 verificationBL []int64 @@ -98,3 +106,7 @@ func (mpc *mockPContext) setState(state state.State) { func (mpc *mockPContext) tmState() state.State { return mpc.state } + +func (mpc *mockPContext) recordConsMetrics(block *types.Block) { + +} diff --git a/internal/blockchain/v2/reactor.go b/internal/blockchain/v2/reactor.go index 50c9fa565..43a18e1d2 100644 --- a/internal/blockchain/v2/reactor.go +++ b/internal/blockchain/v2/reactor.go @@ -9,6 +9,7 @@ import ( bc "github.com/tendermint/tendermint/internal/blockchain" "github.com/tendermint/tendermint/internal/blockchain/v2/internal/behavior" + cons "github.com/tendermint/tendermint/internal/consensus" tmsync "github.com/tendermint/tendermint/internal/libs/sync" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/libs/log" @@ -55,13 +56,13 @@ type blockApplier interface { // XXX: unify naming in this package around tmState func newReactor(state state.State, store blockStore, reporter behavior.Reporter, - blockApplier blockApplier, fastSync bool) *BlockchainReactor { + blockApplier blockApplier, fastSync bool, metrics *cons.Metrics) *BlockchainReactor { initHeight := state.LastBlockHeight + 1 if initHeight == 1 { initHeight = state.InitialHeight } scheduler := newScheduler(initHeight, time.Now()) - pContext := newProcessorContext(store, blockApplier, state) + pContext := newProcessorContext(store, blockApplier, state, metrics) // TODO: Fix naming to just newProcesssor // newPcState requires a processorContext processor := newPcState(pContext) @@ -81,9 +82,10 @@ func NewBlockchainReactor( state state.State, blockApplier blockApplier, store blockStore, - fastSync bool) *BlockchainReactor { + fastSync bool, + metrics *cons.Metrics) *BlockchainReactor { reporter := behavior.NewMockReporter() - return newReactor(state, store, reporter, blockApplier, fastSync) + return newReactor(state, store, reporter, blockApplier, fastSync, metrics) } // SetSwitch implements Reactor interface. diff --git a/internal/blockchain/v2/reactor_test.go b/internal/blockchain/v2/reactor_test.go index 4dd661fc5..c99153fa4 100644 --- a/internal/blockchain/v2/reactor_test.go +++ b/internal/blockchain/v2/reactor_test.go @@ -16,6 +16,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/internal/blockchain/v2/internal/behavior" + cons "github.com/tendermint/tendermint/internal/consensus" "github.com/tendermint/tendermint/internal/mempool/mock" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p/conn" @@ -175,7 +176,7 @@ func newTestReactor(t *testing.T, p testReactorParams) *BlockchainReactor { require.NoError(t, err) } - r := newReactor(state, store, reporter, appl, true) + r := newReactor(state, store, reporter, appl, true, cons.NopMetrics()) logger := log.TestingLogger() r.SetLogger(logger.With("module", "blockchain")) diff --git a/internal/consensus/metrics.go b/internal/consensus/metrics.go index a4f56f631..5b4c47502 100644 --- a/internal/consensus/metrics.go +++ b/internal/consensus/metrics.go @@ -3,6 +3,7 @@ package consensus import ( "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/discard" + "github.com/tendermint/tendermint/types" prometheus "github.com/go-kit/kit/metrics/prometheus" stdprometheus "github.com/prometheus/client_golang/prometheus" @@ -218,3 +219,11 @@ func NopMetrics() *Metrics { BlockParts: discard.NewCounter(), } } + +// RecordConsMetrics uses for recording the block related metrics during fast-sync. +func (m *Metrics) RecordConsMetrics(block *types.Block) { + m.NumTxs.Set(float64(len(block.Data.Txs))) + m.TotalTxs.Add(float64(len(block.Data.Txs))) + m.BlockSizeBytes.Observe(float64(block.Size())) + m.CommittedHeight.Set(float64(block.Height)) +} diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 904ac44ea..0eabeef45 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -1403,3 +1403,7 @@ func (r *Reactor) peerStatsRoutine() { } } } + +func (r *Reactor) GetConsensusState() *State { + return r.state +} diff --git a/internal/consensus/state.go b/internal/consensus/state.go index e31628b33..964ca6563 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -1671,7 +1671,7 @@ func (cs *State) finalizeCommit(height int64) { fail.Fail() // XXX // must be called before we update state - cs.recordMetrics(height, block) + cs.RecordMetrics(height, block) // NewHeightStep! cs.updateToState(stateCopy) @@ -1693,7 +1693,7 @@ func (cs *State) finalizeCommit(height int64) { // * cs.StartTime is set to when we will start round0. } -func (cs *State) recordMetrics(height int64, block *types.Block) { +func (cs *State) RecordMetrics(height int64, block *types.Block) { cs.metrics.Validators.Set(float64(cs.Validators.Size())) cs.metrics.ValidatorsPower.Set(float64(cs.Validators.TotalVotingPower())) @@ -1713,8 +1713,9 @@ func (cs *State) recordMetrics(height int64, block *types.Block) { address types.Address ) if commitSize != valSetLen { - panic(fmt.Sprintf("commit size (%d) doesn't match valset length (%d) at height %d\n\n%v\n\n%v", + cs.Logger.Error(fmt.Sprintf("commit size (%d) doesn't match valset length (%d) at height %d\n\n%v\n\n%v", commitSize, valSetLen, block.Height, block.LastCommit.Signatures, cs.LastValidators.Validators)) + return } if cs.privValidator != nil { @@ -1752,9 +1753,10 @@ func (cs *State) recordMetrics(height int64, block *types.Block) { // NOTE: byzantine validators power and count is only for consensus evidence i.e. duplicate vote var ( - byzantineValidatorsPower = int64(0) - byzantineValidatorsCount = int64(0) + byzantineValidatorsPower int64 + byzantineValidatorsCount int64 ) + for _, ev := range block.Evidence.Evidence { if dve, ok := ev.(*types.DuplicateVoteEvidence); ok { if _, val := cs.Validators.GetByAddress(dve.VoteA.ValidatorAddress); val != nil { diff --git a/node/node.go b/node/node.go index 9b9444a5e..058233de1 100644 --- a/node/node.go +++ b/node/node.go @@ -290,7 +290,7 @@ func makeNode(config *cfg.Config, // doing a state sync first. bcReactorShim, bcReactor, err := createBlockchainReactor( logger, config, state, blockExec, blockStore, csReactor, - peerManager, router, fastSync && !stateSync, + peerManager, router, fastSync && !stateSync, csMetrics, ) if err != nil { return nil, fmt.Errorf("could not create blockchain reactor: %w", err) diff --git a/node/setup.go b/node/setup.go index dc12bf989..505ee476f 100644 --- a/node/setup.go +++ b/node/setup.go @@ -331,6 +331,7 @@ func createBlockchainReactor( peerManager *p2p.PeerManager, router *p2p.Router, fastSync bool, + metrics *cs.Metrics, ) (*p2p.ReactorShim, service.Service, error) { logger = logger.With("module", "blockchain") @@ -355,6 +356,7 @@ func createBlockchainReactor( reactor, err := bcv0.NewReactor( logger, state.Copy(), blockExec, blockStore, csReactor, channels[bcv0.BlockchainChannel], peerUpdates, fastSync, + metrics, ) if err != nil { return nil, nil, err @@ -363,7 +365,7 @@ func createBlockchainReactor( return reactorShim, reactor, nil case cfg.BlockchainV2: - reactor := bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) + reactor := bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync, metrics) reactor.SetLogger(logger) return nil, reactor, nil