@ -167,13 +167,12 @@ func (s *pqScheduler) process() {
timestamp : time . Now ( ) . UTC ( ) ,
timestamp : time . Now ( ) . UTC ( ) ,
}
}
s . metrics . PeerPendingSendBytes . With ( "peer_id" , string ( pqEnv . envelope . To ) ) . Add ( float64 ( pqEnv . size ) )
// enqueue
// enqueue
// Check if we have sufficient capacity to simply enqueue the incoming
// Check if we have sufficient capacity to simply enqueue the incoming
// Envelope.
// Envelope.
if s . size + pqEnv . size <= s . capacity {
if s . size + pqEnv . size <= s . capacity {
s . metrics . PeerPendingSendBytes . With ( "peer_id" , string ( pqEnv . envelope . To ) ) . Add ( float64 ( pqEnv . size ) )
// enqueue the incoming Envelope
// enqueue the incoming Envelope
s . push ( pqEnv )
s . push ( pqEnv )
} else {
} else {
@ -213,6 +212,8 @@ func (s *pqScheduler) process() {
"capacity" , s . capacity ,
"capacity" , s . capacity ,
)
)
s . metrics . PeerPendingSendBytes . With ( "peer_id" , string ( pqEnvTmp . envelope . To ) ) . Add ( float64 ( - pqEnvTmp . size ) )
// dequeue/drop from the priority queue
// dequeue/drop from the priority queue
heap . Remove ( s . pq , pqEnvTmp . index )
heap . Remove ( s . pq , pqEnvTmp . index )
@ -257,6 +258,8 @@ func (s *pqScheduler) process() {
s . metrics . PeerSendBytesTotal . With (
s . metrics . PeerSendBytesTotal . With (
"chID" , chIDStr ,
"chID" , chIDStr ,
"peer_id" , string ( pqEnv . envelope . To ) ) . Add ( float64 ( pqEnv . size ) )
"peer_id" , string ( pqEnv . envelope . To ) ) . Add ( float64 ( pqEnv . size ) )
s . metrics . PeerPendingSendBytes . With (
"peer_id" , string ( pqEnv . envelope . To ) ) . Add ( float64 ( - pqEnv . size ) )
select {
select {
case s . dequeueCh <- pqEnv . envelope :
case s . dequeueCh <- pqEnv . envelope :
case <- s . closer . Done ( ) :
case <- s . closer . Done ( ) :