diff --git a/internal/p2p/metrics.go b/internal/p2p/metrics.go
index e3481058b..3677180de 100644
--- a/internal/p2p/metrics.go
+++ b/internal/p2p/metrics.go
@@ -1,6 +1,11 @@
 package p2p
 
 import (
+	"fmt"
+	"reflect"
+	"regexp"
+	"sync"
+
 	"github.com/go-kit/kit/metrics"
 	"github.com/go-kit/kit/metrics/discard"
 	"github.com/go-kit/kit/metrics/prometheus"
@@ -13,6 +18,13 @@ const (
 	MetricsSubsystem = "p2p"
 )
 
+var (
+	// valueToLabelRegexp is used to find the golang package name and type name
+	// so that the name can be turned into a prometheus label where the characters
+	// in the label do not include prometheus special characters such as '*' and '.'.
+	valueToLabelRegexp = regexp.MustCompile(`\*?(\w+)\.(.*)`)
+)
+
 // Metrics contains metrics exposed by this package.
 type Metrics struct {
 	// Number of peers.
@@ -43,6 +55,9 @@ type Metrics struct {
 	// PeerQueueMsgSize defines the average size of messages sent over a peer's
 	// queue for a specific flow (i.e. Channel).
 	PeerQueueMsgSize metrics.Gauge
+
+	mtx               *sync.RWMutex
+	messageLabelNames map[reflect.Type]string
 }
 
 // PrometheusMetrics returns Metrics build using Prometheus client library.
@@ -68,14 +83,14 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
 			Subsystem: MetricsSubsystem,
 			Name:      "peer_receive_bytes_total",
 			Help:      "Number of bytes received from a given peer.",
-		}, append(labels, "peer_id", "chID")).With(labelsAndValues...),
+		}, append(labels, "peer_id", "chID", "message_type")).With(labelsAndValues...),
 
 		PeerSendBytesTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
 			Namespace: namespace,
 			Subsystem: MetricsSubsystem,
 			Name:      "peer_send_bytes_total",
 			Help:      "Number of bytes sent to a given peer.",
-		}, append(labels, "peer_id", "chID")).With(labelsAndValues...),
+		}, append(labels, "peer_id", "chID", "message_type")).With(labelsAndValues...),
 
 		PeerPendingSendBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
 			Namespace: namespace,
@@ -118,6 +133,9 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
 			Name:      "router_channel_queue_msg_size",
 			Help:      "The size of messages sent over a peer's queue for a specific p2p Channel.",
 		}, append(labels, "ch_id")).With(labelsAndValues...),
+
+		mtx:               &sync.RWMutex{},
+		messageLabelNames: map[reflect.Type]string{},
 	}
 }
 
@@ -133,5 +151,42 @@ func NopMetrics() *Metrics {
 		RouterChannelQueueSend: discard.NewHistogram(),
 		PeerQueueDroppedMsgs:   discard.NewCounter(),
 		PeerQueueMsgSize:       discard.NewGauge(),
+		mtx:                    &sync.RWMutex{},
+		messageLabelNames:      map[reflect.Type]string{},
 	}
 }
+
+// ValueToMetricLabel is a method that is used to produce a prometheus label value of the golang
+// type that is passed in.
+// This method uses a map on the Metrics struct so that each label name only needs
+// to be produced once to prevent expensive string operations.
+//
+// A nil value, or a type whose name is not package-qualified, yields the label
+// "unknown" instead of panicking.
+func (m *Metrics) ValueToMetricLabel(i interface{}) string {
+	t := reflect.TypeOf(i)
+	if t == nil {
+		// reflect.TypeOf returns nil for a nil interface value, and calling
+		// String on it would panic.
+		return "unknown"
+	}
+
+	m.mtx.RLock()
+	s, ok := m.messageLabelNames[t]
+	m.mtx.RUnlock()
+	if ok {
+		return s
+	}
+
+	l := "unknown"
+	// FindStringSubmatch returns nil when the type name has no package
+	// qualifier, so only index the result when both groups were captured.
+	if ss := valueToLabelRegexp.FindStringSubmatch(t.String()); len(ss) == 3 {
+		l = fmt.Sprintf("%s_%s", ss[1], ss[2])
+	}
+
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	m.messageLabelNames[t] = l
+	return l
+}
diff --git a/internal/p2p/metrics_test.go b/internal/p2p/metrics_test.go
new file mode 100644
index 000000000..53b3c47bd
--- /dev/null
+++ b/internal/p2p/metrics_test.go
@@ -0,0 +1,23 @@
+package p2p
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/tendermint/tendermint/proto/tendermint/p2p"
+)
+
+func TestValueToMetricsLabel(t *testing.T) {
+	m := NopMetrics()
+	r := &p2p.PexResponse{}
+	str := m.ValueToMetricLabel(r)
+	assert.Equal(t, "p2p_PexResponse", str)
+
+	// subsequent calls to the function should produce the same result
+	str = m.ValueToMetricLabel(r)
+	assert.Equal(t, "p2p_PexResponse", str)
+
+	// nil values and types without a package qualifier must not panic
+	assert.Equal(t, "unknown", m.ValueToMetricLabel(nil))
+	assert.Equal(t, "unknown", m.ValueToMetricLabel(1))
+}
diff --git a/internal/p2p/pqueue.go b/internal/p2p/pqueue.go
index e4560c7bd..d5051325f 100644
--- a/internal/p2p/pqueue.go
+++ b/internal/p2p/pqueue.go
@@ -256,7 +256,8 @@ func (s *pqScheduler) process() {
 
 				s.metrics.PeerSendBytesTotal.With(
 					"chID", chIDStr,
-					"peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size))
+					"peer_id", string(pqEnv.envelope.To),
+					"message_type", s.metrics.ValueToMetricLabel(pqEnv.envelope.Message)).Add(float64(pqEnv.size))
 				select {
 				case s.dequeueCh <- pqEnv.envelope:
 				case <-s.closer.Done():
diff --git a/internal/p2p/router.go b/internal/p2p/router.go
index 1171566d1..d0a8e5360 100644
--- a/internal/p2p/router.go
+++ b/internal/p2p/router.go
@@ -938,6 +938,7 @@ func (r *Router) receivePeer(peerID types.NodeID, conn Connection) error {
 		case queue.enqueue() <- Envelope{From: peerID, Message: msg}:
 			r.metrics.PeerReceiveBytesTotal.With(
 				"chID", fmt.Sprint(chID),
-				"peer_id", string(peerID)).Add(float64(proto.Size(msg)))
+				"peer_id", string(peerID),
+				"message_type", r.metrics.ValueToMetricLabel(msg)).Add(float64(proto.Size(msg)))
 			r.metrics.RouterChannelQueueSend.Observe(time.Since(start).Seconds())
 			r.logger.Debug("received message", "peer", peerID, "message", msg)