From b4bc6bb4e884aca1e3a19ef9aba5aacd6325159a Mon Sep 17 00:00:00 2001 From: William Banfield <4561443+williambanfield@users.noreply.github.com> Date: Tue, 26 Oct 2021 14:45:33 +0200 Subject: [PATCH] p2p: add message type into the send/recv bytes metrics (#7155) This pull request adds a new "mesage_type" label to the send/recv bytes metrics calculated in the p2p code. Below is a snippet of the updated metrics that includes the updated label: ``` tendermint_p2p_peer_receive_bytes_total{chID="32",chain_id="ci",message_type="consensus_HasVote",peer_id="2551a13ed720101b271a5df4816d1e4b3d3bd133"} 652 tendermint_p2p_peer_receive_bytes_total{chID="32",chain_id="ci",message_type="consensus_HasVote",peer_id="4b1068420ef739db63377250553562b9a978708a"} 631 tendermint_p2p_peer_receive_bytes_total{chID="32",chain_id="ci",message_type="consensus_HasVote",peer_id="927c50a5e508c747830ce3ba64a3f70fdda58ef2"} 631 tendermint_p2p_peer_receive_bytes_total{chID="32",chain_id="ci",message_type="consensus_NewRoundStep",peer_id="2551a13ed720101b271a5df4816d1e4b3d3bd133"} 393 tendermint_p2p_peer_receive_bytes_total{chID="32",chain_id="ci",message_type="consensus_NewRoundStep",peer_id="4b1068420ef739db63377250553562b9a978708a"} 357 tendermint_p2p_peer_receive_bytes_total{chID="32",chain_id="ci",message_type="consensus_NewRoundStep",peer_id="927c50a5e508c747830ce3ba64a3f70fdda58ef2"} 386 ``` --- internal/p2p/metrics.go | 47 ++++++++++++++++++++++++++++++++++-- internal/p2p/metrics_test.go | 19 +++++++++++++++ internal/p2p/pqueue.go | 3 ++- internal/p2p/router.go | 3 ++- 4 files changed, 68 insertions(+), 4 deletions(-) create mode 100644 internal/p2p/metrics_test.go diff --git a/internal/p2p/metrics.go b/internal/p2p/metrics.go index e3481058b..3677180de 100644 --- a/internal/p2p/metrics.go +++ b/internal/p2p/metrics.go @@ -1,6 +1,11 @@ package p2p import ( + "fmt" + "reflect" + "regexp" + "sync" + "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/discard" "github.com/go-kit/kit/metrics/prometheus" @@ -13,6 +18,13 @@ const ( MetricsSubsystem = "p2p" ) +var ( + // valueToLabelRegexp is used to find the golang package name and type name + // so that the name can be turned into a prometheus label where the characters + // in the label do not include prometheus special characters such as '*' and '.'. + valueToLabelRegexp = regexp.MustCompile(`\*?(\w+)\.(.*)`) +) + // Metrics contains metrics exposed by this package. type Metrics struct { // Number of peers. @@ -43,6 +55,9 @@ type Metrics struct { // PeerQueueMsgSize defines the average size of messages sent over a peer's // queue for a specific flow (i.e. Channel). PeerQueueMsgSize metrics.Gauge + + mtx *sync.RWMutex + messageLabelNames map[reflect.Type]string } // PrometheusMetrics returns Metrics build using Prometheus client library. @@ -68,14 +83,14 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Subsystem: MetricsSubsystem, Name: "peer_receive_bytes_total", Help: "Number of bytes received from a given peer.", - }, append(labels, "peer_id", "chID")).With(labelsAndValues...), + }, append(labels, "peer_id", "chID", "message_type")).With(labelsAndValues...), PeerSendBytesTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, Name: "peer_send_bytes_total", Help: "Number of bytes sent to a given peer.", - }, append(labels, "peer_id", "chID")).With(labelsAndValues...), + }, append(labels, "peer_id", "chID", "message_type")).With(labelsAndValues...), PeerPendingSendBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Namespace: namespace, @@ -118,6 +133,9 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "router_channel_queue_msg_size", Help: "The size of messages sent over a peer's queue for a specific p2p Channel.", }, append(labels, "ch_id")).With(labelsAndValues...), + + mtx: &sync.RWMutex{}, + messageLabelNames: map[reflect.Type]string{}, } } @@ -133,5 +151,30 @@ func NopMetrics() *Metrics { RouterChannelQueueSend: discard.NewHistogram(), PeerQueueDroppedMsgs: discard.NewCounter(), PeerQueueMsgSize: discard.NewGauge(), + mtx: &sync.RWMutex{}, + messageLabelNames: map[reflect.Type]string{}, + } +} + +// ValueToMetricLabel is a method that is used to produce a prometheus label value of the golang +// type that is passed in. +// This method uses a map on the Metrics struct so that each label name only needs +// to be produced once to prevent expensive string operations. +func (m *Metrics) ValueToMetricLabel(i interface{}) string { + t := reflect.TypeOf(i) + m.mtx.RLock() + + if s, ok := m.messageLabelNames[t]; ok { + m.mtx.RUnlock() + return s } + m.mtx.RUnlock() + + s := t.String() + ss := valueToLabelRegexp.FindStringSubmatch(s) + l := fmt.Sprintf("%s_%s", ss[1], ss[2]) + m.mtx.Lock() + defer m.mtx.Unlock() + m.messageLabelNames[t] = l + return l } diff --git a/internal/p2p/metrics_test.go b/internal/p2p/metrics_test.go new file mode 100644 index 000000000..53b3c47bd --- /dev/null +++ b/internal/p2p/metrics_test.go @@ -0,0 +1,19 @@ +package p2p + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/tendermint/tendermint/proto/tendermint/p2p" +) + +func TestValueToMetricsLabel(t *testing.T) { + m := NopMetrics() + r := &p2p.PexResponse{} + str := m.ValueToMetricLabel(r) + assert.Equal(t, "p2p_PexResponse", str) + + // subsequent calls to the function should produce the same result + str = m.ValueToMetricLabel(r) + assert.Equal(t, "p2p_PexResponse", str) +} diff --git a/internal/p2p/pqueue.go b/internal/p2p/pqueue.go index e0e812cf5..11cdbd130 100644 --- a/internal/p2p/pqueue.go +++ b/internal/p2p/pqueue.go @@ -257,7 +257,8 @@ func (s *pqScheduler) process() { s.metrics.PeerSendBytesTotal.With( "chID", chIDStr, - "peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size)) + "peer_id", string(pqEnv.envelope.To), + "message_type", s.metrics.ValueToMetricLabel(pqEnv.envelope.Message)).Add(float64(pqEnv.size)) s.metrics.PeerPendingSendBytes.With( "peer_id", string(pqEnv.envelope.To)).Add(float64(-pqEnv.size)) select { diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 402e2f0ed..4724e8d9b 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -936,7 +936,8 @@ func (r *Router) receivePeer(peerID types.NodeID, conn Connection) error { case queue.enqueue() <- Envelope{From: peerID, Message: msg}: r.metrics.PeerReceiveBytesTotal.With( "chID", fmt.Sprint(chID), - "peer_id", string(peerID)).Add(float64(proto.Size(msg))) + "peer_id", string(peerID), + "message_type", r.metrics.ValueToMetricLabel(msg)).Add(float64(proto.Size(msg))) r.metrics.RouterChannelQueueSend.Observe(time.Since(start).Seconds()) r.logger.Debug("received message", "peer", peerID, "message", msg)