From 19699d644fa9bcede24d5ce8ff153c0945bed395 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 15 Jun 2018 15:10:25 +0400 Subject: [PATCH] p2p metric, make height and totalTxs gauges --- blockchain/reactor_test.go | 2 +- consensus/byzantine_test.go | 2 +- consensus/metrics.go | 30 ++++--- consensus/state.go | 8 +- node/node.go | 152 +++++++++++++++++++----------------- p2p/metrics.go | 17 ++++ p2p/switch.go | 7 +- p2p/test_util.go | 2 +- 8 files changed, 128 insertions(+), 92 deletions(-) create mode 100644 p2p/metrics.go diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index c7f7e9afd..672cb83d6 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -42,7 +42,7 @@ func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainRe bcReactor.SetLogger(logger.With("module", "blockchain")) // Next: we need to set a switch in order for peers to be added in - bcReactor.Switch = p2p.NewSwitch(cfg.DefaultP2PConfig()) + bcReactor.Switch = p2p.NewSwitch(cfg.DefaultP2PConfig(), p2p.NopMetrics()) // Lastly: let's add some blocks in for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ { diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index d3be8c358..c0d2e636d 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -38,7 +38,7 @@ func TestByzantine(t *testing.T) { switches := make([]*p2p.Switch, N) p2pLogger := logger.With("module", "p2p") for i := 0; i < N; i++ { - switches[i] = p2p.NewSwitch(config.P2P) + switches[i] = p2p.NewSwitch(config.P2P, p2p.NopMetrics()) switches[i].SetLogger(p2pLogger.With("validator", i)) } diff --git a/consensus/metrics.go b/consensus/metrics.go index 169c2fd76..cd3bfc9b2 100644 --- a/consensus/metrics.go +++ b/consensus/metrics.go @@ -6,7 +6,8 @@ import "github.com/go-kit/kit/metrics/discard" // Metrics contains metrics exposed by this package. // see MetricsProvider for descriptions. type Metrics struct { - Height metrics.Counter + Height metrics.Gauge + Rounds metrics.Gauge Validators metrics.Gauge @@ -18,22 +19,29 @@ type Metrics struct { BlockIntervalSeconds metrics.Histogram - NumTxs metrics.Gauge - TotalTxs metrics.Counter - + NumTxs metrics.Gauge BlockSizeBytes metrics.Gauge + TotalTxs metrics.Gauge } // NopMetrics returns no-op Metrics. func NopMetrics() *Metrics { return &Metrics{ - Height: discard.NewCounter(), - Validators: discard.NewGauge(), - MissingValidators: discard.NewGauge(), - ByzantineValidators: discard.NewGauge(), + Height: discard.NewGauge(), + + Rounds: discard.NewGauge(), + + Validators: discard.NewGauge(), + ValidatorsPower: discard.NewGauge(), + MissingValidators: discard.NewGauge(), + MissingValidatorsPower: discard.NewGauge(), + ByzantineValidators: discard.NewGauge(), + ByzantineValidatorsPower: discard.NewGauge(), + BlockIntervalSeconds: discard.NewHistogram(), - NumTxs: discard.NewGauge(), - TotalTxs: discard.NewCounter(), - BlockSizeBytes: discard.NewGauge(), + + NumTxs: discard.NewGauge(), + BlockSizeBytes: discard.NewGauge(), + TotalTxs: discard.NewGauge(), } } diff --git a/consensus/state.go b/consensus/state.go index f078663dd..a92584532 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -391,7 +391,7 @@ func (cs *ConsensusState) SetProposalAndBlock(proposal *types.Proposal, block *t // internal functions for managing the state func (cs *ConsensusState) updateHeight(height int64) { - cs.metrics.Height.Add(float64(height - cs.Height)) + cs.metrics.Height.Set(float64(height)) cs.Height = height } @@ -697,7 +697,6 @@ func (cs *ConsensusState) enterNewRound(height int64, round int) { } logger.Info(cmn.Fmt("enterNewRound(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) - cs.metrics.Rounds.Set(float64(round)) // Increment validators if necessary validators := cs.Validators @@ -724,6 +723,7 @@ func (cs *ConsensusState) enterNewRound(height int64, round int) { cs.Votes.SetRound(round + 1) // also track next round (round+1) to allow round-skipping cs.eventBus.PublishEventNewRound(cs.RoundStateEvent()) + cs.metrics.Rounds.Set(float64(round)) // Wait for txs to be available in the mempool // before we enterPropose in round 0. If the last block changed the app hash, @@ -1282,6 +1282,7 @@ func (cs *ConsensusState) finalizeCommit(height int64) { fail.Fail() // XXX + // must be called before we update state cs.recordMetrics(height, block) // NewHeightStep! @@ -1332,9 +1333,8 @@ func (cs *ConsensusState) recordMetrics(height int64, block *types.Block) { } cs.metrics.NumTxs.Set(float64(block.NumTxs)) - cs.metrics.TotalTxs.Add(float64(block.NumTxs)) - cs.metrics.BlockSizeBytes.Set(float64(block.Size())) + cs.metrics.TotalTxs.Set(float64(block.TotalTxs)) } //----------------------------------------------------------------------------- diff --git a/node/node.go b/node/node.go index 5630df6c0..a8671c9f8 100644 --- a/node/node.go +++ b/node/node.go @@ -90,79 +90,84 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) { ) } -// MetricsProvider returns a consensus Metrics. -type MetricsProvider func() *cs.Metrics +// MetricsProvider returns a consensus and p2p Metrics. +type MetricsProvider func() (*cs.Metrics, *p2p.Metrics) -// DefaultMetrics returns a consensus Metrics build using Prometheus client -// library. -func DefaultMetricsProvider() *cs.Metrics { +// DefaultMetricsProvider returns a consensus and p2p Metrics build using +// Prometheus client library. +func DefaultMetricsProvider() (*cs.Metrics, *p2p.Metrics) { return &cs.Metrics{ - Height: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ - Subsystem: "consensus", - Name: "height", - Help: "Height of the chain.", - }, []string{}), - Rounds: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", - Name: "rounds", - Help: "Number of rounds.", - }, []string{}), - - Validators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", - Name: "validators", - Help: "Number of validators who signed.", - }, []string{}), - ValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", - Name: "validators_power", - Help: "Total power of all validators.", - }, []string{}), - MissingValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", - Name: "missing_validators", - Help: "Number of validators who did not sign.", - }, []string{}), - MissingValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", - Name: "missing_validators_power", - Help: "Total power of the missing validators.", - }, []string{}), - ByzantineValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", - Name: "byzantine_validators", - Help: "Number of validators who tried to double sign.", - }, []string{}), - ByzantineValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", - Name: "byzantine_validators_power", - Help: "Total power of the byzantine validators.", - }, []string{}), - - BlockIntervalSeconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ - Subsystem: "consensus", - Name: "block_interval_seconds", - Help: "Time between this and the last block.", - Buckets: []float64{1, 2.5, 5, 10, 60}, - }, []string{}), - - NumTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", - Name: "num_txs", - Help: "Number of transactions.", - }, []string{}), - TotalTxs: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ - Subsystem: "consensus", - Name: "total_txs", - Help: "Total number of transactions.", - }, []string{}), - - BlockSizeBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", - Name: "block_size_bytes", - Help: "Size of the block.", - }, []string{}), - } + Height: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: "consensus", + Name: "height", + Help: "Height of the chain.", + }, []string{}), + Rounds: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: "consensus", + Name: "rounds", + Help: "Number of rounds.", + }, []string{}), + + Validators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: "consensus", + Name: "validators", + Help: "Number of validators who signed.", + }, []string{}), + ValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: "consensus", + Name: "validators_power", + Help: "Total power of all validators.", + }, []string{}), + MissingValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: "consensus", + Name: "missing_validators", + Help: "Number of validators who did not sign.", + }, []string{}), + MissingValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: "consensus", + Name: "missing_validators_power", + Help: "Total power of the missing validators.", + }, []string{}), + ByzantineValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: "consensus", + Name: "byzantine_validators", + Help: "Number of validators who tried to double sign.", + }, []string{}), + ByzantineValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: "consensus", + Name: "byzantine_validators_power", + Help: "Total power of the byzantine validators.", + }, []string{}), + + BlockIntervalSeconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + Subsystem: "consensus", + Name: "block_interval_seconds", + Help: "Time between this and the last block.", + Buckets: []float64{1, 2.5, 5, 10, 60}, + }, []string{}), + + NumTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: "consensus", + Name: "num_txs", + Help: "Number of transactions.", + }, []string{}), + BlockSizeBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: "consensus", + Name: "block_size_bytes", + Help: "Size of the block.", + }, []string{}), + TotalTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: "consensus", + Name: "total_txs", + Help: "Total number of transactions.", + }, []string{}), + }, &p2p.Metrics{ + Peers: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: "p2p", + Name: "peers", + Help: "Number of peers.", + }, []string{}), + } } //------------------------------------------------------------------------------ @@ -321,8 +326,9 @@ func NewNode(config *cfg.Config, bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) bcReactor.SetLogger(logger.With("module", "blockchain")) + csMetrics, p2pMetrics := metricsProvider() + // Make ConsensusReactor - csMetrics := metricsProvider() consensusState := cs.NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evidencePool, csMetrics) consensusState.SetLogger(consensusLogger) @@ -334,7 +340,7 @@ func NewNode(config *cfg.Config, p2pLogger := logger.With("module", "p2p") - sw := p2p.NewSwitch(config.P2P) + sw := p2p.NewSwitch(config.P2P, p2pMetrics) sw.SetLogger(p2pLogger) sw.AddReactor("MEMPOOL", mempoolReactor) sw.AddReactor("BLOCKCHAIN", bcReactor) diff --git a/p2p/metrics.go b/p2p/metrics.go new file mode 100644 index 000000000..3de453026 --- /dev/null +++ b/p2p/metrics.go @@ -0,0 +1,17 @@ +package p2p + +import "github.com/go-kit/kit/metrics" +import "github.com/go-kit/kit/metrics/discard" + +// Metrics contains metrics exposed by this package. +// see MetricsProvider for descriptions. +type Metrics struct { + Peers metrics.Gauge +} + +// NopMetrics returns no-op Metrics. +func NopMetrics() *Metrics { + return &Metrics{ + Peers: discard.NewGauge(), + } +} diff --git a/p2p/switch.go b/p2p/switch.go index f1ceee5c6..2eb10cf8d 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -73,10 +73,12 @@ type Switch struct { mConfig conn.MConnConfig rng *cmn.Rand // seed for randomizing dial times and orders + + metrics *Metrics } // NewSwitch creates a new Switch with the given config. -func NewSwitch(cfg *config.P2PConfig) *Switch { +func NewSwitch(cfg *config.P2PConfig, metrics *Metrics) *Switch { sw := &Switch{ config: cfg, reactors: make(map[string]Reactor), @@ -85,6 +87,7 @@ func NewSwitch(cfg *config.P2PConfig) *Switch { peers: NewPeerSet(), dialing: cmn.NewCMap(), reconnecting: cmn.NewCMap(), + metrics: metrics, } // Ensure we have a completely undeterministic PRNG. @@ -279,6 +282,7 @@ func (sw *Switch) StopPeerGracefully(peer Peer) { func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) { sw.peers.Remove(peer) + sw.metrics.Peers.Add(float64(-1)) peer.Stop() for _, reactor := range sw.reactors { reactor.RemovePeer(peer, reason) @@ -623,6 +627,7 @@ func (sw *Switch) addPeer(pc peerConn) error { if err := sw.peers.Add(peer); err != nil { return err } + sw.metrics.Peers.Add(float64(1)) sw.Logger.Info("Added peer", "peer", peer) return nil diff --git a/p2p/test_util.go b/p2p/test_util.go index 0d2ba6c5e..02e4f644b 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -137,7 +137,7 @@ func MakeSwitch(cfg *config.P2PConfig, i int, network, version string, initSwitc nodeKey := &NodeKey{ PrivKey: crypto.GenPrivKeyEd25519(), } - sw := NewSwitch(cfg) + sw := NewSwitch(cfg, NopMetrics()) sw.SetLogger(log.TestingLogger()) sw = initSwitch(i, sw) ni := NodeInfo{