diff --git a/p2p/metrics.go b/p2p/metrics.go
index c7d199cd7..e3481058b 100644
--- a/p2p/metrics.go
+++ b/p2p/metrics.go
@@ -23,8 +23,6 @@ type Metrics struct {
 	PeerSendBytesTotal metrics.Counter
 	// Pending bytes to be sent to a given peer.
 	PeerPendingSendBytes metrics.Gauge
-	// Number of transactions submitted by each peer.
-	NumTxs metrics.Gauge
 
 	// RouterPeerQueueRecv defines the time taken to read off of a peer's queue
 	// before sending on the connection.
@@ -64,30 +62,28 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
 			Name:      "peers",
 			Help:      "Number of peers.",
 		}, labels).With(labelsAndValues...),
+
 		PeerReceiveBytesTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
 			Namespace: namespace,
 			Subsystem: MetricsSubsystem,
 			Name:      "peer_receive_bytes_total",
 			Help:      "Number of bytes received from a given peer.",
 		}, append(labels, "peer_id", "chID")).With(labelsAndValues...),
+
 		PeerSendBytesTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
 			Namespace: namespace,
 			Subsystem: MetricsSubsystem,
 			Name:      "peer_send_bytes_total",
 			Help:      "Number of bytes sent to a given peer.",
 		}, append(labels, "peer_id", "chID")).With(labelsAndValues...),
+
 		PeerPendingSendBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
 			Namespace: namespace,
 			Subsystem: MetricsSubsystem,
 			Name:      "peer_pending_send_bytes",
 			Help:      "Number of pending bytes to be sent to a given peer.",
 		}, append(labels, "peer_id")).With(labelsAndValues...),
-		NumTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
-			Namespace: namespace,
-			Subsystem: MetricsSubsystem,
-			Name:      "num_txs",
-			Help:      "Number of transactions submitted by each peer.",
-		}, append(labels, "peer_id")).With(labelsAndValues...),
+
 		RouterPeerQueueRecv: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
 			Namespace: namespace,
 			Subsystem: MetricsSubsystem,
@@ -132,7 +128,6 @@ func NopMetrics() *Metrics {
 		PeerReceiveBytesTotal:  discard.NewCounter(),
 		PeerSendBytesTotal:     discard.NewCounter(),
 		PeerPendingSendBytes:   discard.NewGauge(),
-		NumTxs:                 discard.NewGauge(),
 		RouterPeerQueueRecv:    discard.NewHistogram(),
 		RouterPeerQueueSend:    discard.NewHistogram(),
 		RouterChannelQueueSend: discard.NewHistogram(),
diff --git a/p2p/pqueue.go b/p2p/pqueue.go
index d97d98375..0e081241d 100644
--- a/p2p/pqueue.go
+++ b/p2p/pqueue.go
@@ -167,6 +167,8 @@ func (s *pqScheduler) process() {
 				timestamp: time.Now().UTC(),
 			}
 
+			s.metrics.PeerPendingSendBytes.With("peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size))
+
 			// enqueue
 
 			// Check if we have sufficient capacity to simply enqueue the incoming
@@ -252,6 +254,7 @@ func (s *pqScheduler) process() {
 						s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size
 					}
 
+					s.metrics.PeerSendBytesTotal.With("peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size))
 					s.dequeueCh <- pqEnv.envelope
 				}
 
@@ -267,7 +270,7 @@ func (s *pqScheduler) push(pqEnv *pqEnvelope) {
 	// enqueue the incoming Envelope
 	heap.Push(s.pq, pqEnv)
 	s.size += pqEnv.size
-	s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Set(float64(pqEnv.size))
+	s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Add(float64(pqEnv.size))
 
 	// Update the cumulative sizes by adding the Envelope's size to every
 	// priority less than or equal to it.
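For context on the hunks above, here is a minimal standalone sketch (not part of the patch) of how the go-kit Prometheus wrappers used here behave: NewCounterFrom registers the counter with the default registry, and each With(...) call binds label key/value pairs before Add accumulates. The namespace and label values below are illustrative only.

package main

import (
	"fmt"

	"github.com/go-kit/kit/metrics/prometheus"
	stdprometheus "github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Counter with per-peer and per-channel labels, mirroring
	// peer_send_bytes_total in the patch above.
	sendBytes := prometheus.NewCounterFrom(stdprometheus.CounterOpts{
		Namespace: "tendermint", // illustrative namespace
		Subsystem: "p2p",
		Name:      "peer_send_bytes_total",
		Help:      "Number of bytes sent to a given peer.",
	}, []string{"peer_id", "chID"})

	// With binds label values; Add accumulates per label set, which is
	// why the schedulers can safely call this on every dequeue.
	sendBytes.With("peer_id", "abcd1234", "chID", "0x30").Add(512)
	sendBytes.With("peer_id", "abcd1234", "chID", "0x30").Add(256)

	fmt.Println(`peer_send_bytes_total{peer_id="abcd1234",chID="0x30"} is now 768`)
}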
diff --git a/p2p/router.go b/p2p/router.go
index b7ebf7570..beb185a08 100644
--- a/p2p/router.go
+++ b/p2p/router.go
@@ -513,6 +513,8 @@ func (r *Router) acceptPeers(transport Transport) {
 			return
 		}
 
+		r.metrics.Peers.Add(1)
+
 		queue := r.queueFactory(queueBufferDefault)
 
 		r.peerMtx.Lock()
@@ -528,6 +530,8 @@ func (r *Router) acceptPeers(transport Transport) {
 			if err := r.peerManager.Disconnected(peerInfo.NodeID); err != nil {
 				r.logger.Error("failed to disconnect peer",
 					"peer", peerInfo.NodeID, "err", err)
+			} else {
+				r.metrics.Peers.Add(-1)
 			}
 		}()
 
@@ -590,6 +594,8 @@ func (r *Router) dialPeers() {
 			return
 		}
 
+		r.metrics.Peers.Add(1)
+
 		peerQueue := r.getOrMakeQueue(peerID)
 		defer func() {
 			r.peerMtx.Lock()
@@ -600,6 +606,8 @@ func (r *Router) dialPeers() {
 			if err := r.peerManager.Disconnected(peerID); err != nil {
 				r.logger.Error("failed to disconnect peer",
 					"peer", address, "err", err)
+			} else {
+				r.metrics.Peers.Add(-1)
 			}
 		}()
 
@@ -775,6 +783,7 @@ func (r *Router) receivePeer(peerID NodeID, conn Connection) error {
 		select {
 		case queue.enqueue() <- Envelope{From: peerID, Message: msg}:
+			r.metrics.PeerReceiveBytesTotal.With("peer_id", string(peerID)).Add(float64(proto.Size(msg)))
 			r.metrics.RouterChannelQueueSend.Observe(time.Since(start).Seconds())
 			r.logger.Debug("received message", "peer", peerID, "message", msg)
 
diff --git a/p2p/wdrr_queue.go b/p2p/wdrr_queue.go
index 57815c8dd..80bd7ec08 100644
--- a/p2p/wdrr_queue.go
+++ b/p2p/wdrr_queue.go
@@ -174,6 +174,8 @@ func (s *wdrrScheduler) process() {
 			wEnv := wrappedEnvelope{envelope: e, size: uint(proto.Size(e.Message))}
 			msgSize := wEnv.size
 
+			s.metrics.PeerPendingSendBytes.With("peer_id", string(e.To)).Add(float64(msgSize))
+
 			// If we're at capacity, we need to either drop the incoming Envelope or
 			// an Envelope from a lower priority flow. Otherwise, we add the (wrapped)
 			// envelope to the flow's queue.
@@ -261,6 +263,7 @@ func (s *wdrrScheduler) process() {
 			// 4. remove from the flow's queue
 			// 5. grab the next HoQ Envelope and flow's deficit
 			for len(s.buffer[chID]) > 0 && d >= we.size {
+				s.metrics.PeerSendBytesTotal.With("peer_id", string(we.envelope.To)).Add(float64(we.size))
 				s.dequeueCh <- we.envelope
 				s.size -= we.size
 				s.deficits[chID] -= we.size
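A hedged sketch of the peer-count bookkeeping the router hunks implement: the Peers gauge is incremented once a connection is accepted or dialed, and decremented only when Disconnected succeeds, so a failed disconnect cannot double-decrement or drive the gauge negative. trackPeer and its callbacks are hypothetical names used to distill the pattern; they are not part of the patch.

package main

import (
	"github.com/go-kit/kit/metrics"
	"github.com/go-kit/kit/metrics/discard"
)

// trackPeer is a hypothetical distillation of the lifecycle in
// acceptPeers/dialPeers above: Add(1) on connect, Add(-1) only on a
// clean disconnect, keeping the gauge consistent with peerManager state.
func trackPeer(peers metrics.Gauge, disconnect func() error, route func()) {
	peers.Add(1)
	defer func() {
		if err := disconnect(); err == nil {
			peers.Add(-1)
		}
	}()
	route() // stand-in for the routePeer/receivePeer work
}

func main() {
	// discard.NewGauge mirrors NopMetrics in the patch: a no-op sink.
	trackPeer(discard.NewGauge(), func() error { return nil }, func() {})
}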