|
|
- package p2p
-
- import (
- "container/heap"
- "sort"
- "strconv"
- "time"
-
- "github.com/gogo/protobuf/proto"
- "github.com/tendermint/tendermint/libs/log"
- tmsync "github.com/tendermint/tendermint/libs/sync"
- )
-
- // pqEnvelope defines a wrapper around an Envelope with priority to be inserted
- // into a priority queue used for Envelope scheduling.
- type pqEnvelope struct {
- envelope Envelope
- priority uint
- size uint
- timestamp time.Time
-
- index int
- }
-
- // priorityQueue defines a type alias for a priority queue implementation.
- type priorityQueue []*pqEnvelope
-
- func (pq priorityQueue) get(i int) *pqEnvelope { return pq[i] }
- func (pq priorityQueue) Len() int { return len(pq) }
-
- func (pq priorityQueue) Less(i, j int) bool {
- // if both elements have the same priority, prioritize based on most recent
- if pq[i].priority == pq[j].priority {
- return pq[i].timestamp.After(pq[j].timestamp)
- }
-
- // otherwise, pick the pqEnvelope with the higher priority
- return pq[i].priority > pq[j].priority
- }
-
- func (pq priorityQueue) Swap(i, j int) {
- pq[i], pq[j] = pq[j], pq[i]
- pq[i].index = i
- pq[j].index = j
- }
-
- func (pq *priorityQueue) Push(x interface{}) {
- n := len(*pq)
- pqEnv := x.(*pqEnvelope)
- pqEnv.index = n
- *pq = append(*pq, pqEnv)
- }
-
- func (pq *priorityQueue) Pop() interface{} {
- old := *pq
- n := len(old)
- pqEnv := old[n-1]
- old[n-1] = nil
- pqEnv.index = -1
- *pq = old[:n-1]
- return pqEnv
- }
-
- // Assert the priority queue scheduler implements the queue interface at
- // compile-time.
- var _ queue = (*pqScheduler)(nil)
-
- type pqScheduler struct {
- logger log.Logger
- metrics *Metrics
- size uint
- sizes map[uint]uint // cumulative priority sizes
- pq *priorityQueue
- chDescs []ChannelDescriptor
- capacity uint
- chPriorities map[ChannelID]uint
-
- enqueueCh chan Envelope
- dequeueCh chan Envelope
- closer *tmsync.Closer
- done *tmsync.Closer
- }
-
- func newPQScheduler(
- logger log.Logger,
- m *Metrics,
- chDescs []ChannelDescriptor,
- enqueueBuf, dequeueBuf, capacity uint,
- ) *pqScheduler {
-
- // copy each ChannelDescriptor and sort them by ascending channel priority
- chDescsCopy := make([]ChannelDescriptor, len(chDescs))
- copy(chDescsCopy, chDescs)
- sort.Slice(chDescsCopy, func(i, j int) bool { return chDescsCopy[i].Priority < chDescsCopy[j].Priority })
-
- var (
- chPriorities = make(map[ChannelID]uint)
- sizes = make(map[uint]uint)
- )
-
- for _, chDesc := range chDescsCopy {
- chID := ChannelID(chDesc.ID)
- chPriorities[chID] = uint(chDesc.Priority)
- sizes[uint(chDesc.Priority)] = 0
- }
-
- pq := make(priorityQueue, 0)
- heap.Init(&pq)
-
- return &pqScheduler{
- logger: logger.With("router", "scheduler"),
- metrics: m,
- chDescs: chDescsCopy,
- capacity: capacity,
- chPriorities: chPriorities,
- pq: &pq,
- sizes: sizes,
- enqueueCh: make(chan Envelope, enqueueBuf),
- dequeueCh: make(chan Envelope, dequeueBuf),
- closer: tmsync.NewCloser(),
- done: tmsync.NewCloser(),
- }
- }
-
- func (s *pqScheduler) enqueue() chan<- Envelope {
- return s.enqueueCh
- }
-
- func (s *pqScheduler) dequeue() <-chan Envelope {
- return s.dequeueCh
- }
-
- func (s *pqScheduler) close() {
- s.closer.Close()
- <-s.done.Done()
- }
-
- func (s *pqScheduler) closed() <-chan struct{} {
- return s.closer.Done()
- }
-
- // start starts non-blocking process that starts the priority queue scheduler.
- func (s *pqScheduler) start() {
- go s.process()
- }
-
- // process starts a block process where we listen for Envelopes to enqueue. If
- // there is sufficient capacity, it will be enqueued into the priority queue,
- // otherwise, we attempt to dequeue enough elements from the priority queue to
- // make room for the incoming Envelope by dropping lower priority elements. If
- // there isn't sufficient capacity at lower priorities for the incoming Envelope,
- // it is dropped.
- //
- // After we attempt to enqueue the incoming Envelope, if the priority queue is
- // non-empty, we pop the top Envelope and send it on the dequeueCh.
- func (s *pqScheduler) process() {
- defer s.done.Close()
-
- for {
- select {
- case e := <-s.enqueueCh:
- chIDStr := strconv.Itoa(int(e.channelID))
- pqEnv := &pqEnvelope{
- envelope: e,
- size: uint(proto.Size(e.Message)),
- priority: s.chPriorities[e.channelID],
- timestamp: time.Now().UTC(),
- }
-
- s.metrics.PeerPendingSendBytes.With("peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size))
-
- // enqueue
-
- // Check if we have sufficient capacity to simply enqueue the incoming
- // Envelope.
- if s.size+pqEnv.size <= s.capacity {
- // enqueue the incoming Envelope
- s.push(pqEnv)
- } else {
- // There is not sufficient capacity to simply enqueue the incoming
- // Envelope. So we have to attempt to make room for it by dropping lower
- // priority Envelopes or drop the incoming Envelope otherwise.
-
- // The cumulative size of all enqueue envelopes at the incoming envelope's
- // priority or lower.
- total := s.sizes[pqEnv.priority]
-
- if total >= pqEnv.size {
- // There is room for the incoming Envelope, so we drop as many lower
- // priority Envelopes as we need to.
- var (
- canEnqueue bool
- tmpSize = s.size
- i = s.pq.Len() - 1
- )
-
- // Drop lower priority Envelopes until sufficient capacity exists for
- // the incoming Envelope
- for i >= 0 && !canEnqueue {
- pqEnvTmp := s.pq.get(i)
-
- if pqEnvTmp.priority < pqEnv.priority {
- if tmpSize+pqEnv.size <= s.capacity {
- canEnqueue = true
- } else {
- pqEnvTmpChIDStr := strconv.Itoa(int(pqEnvTmp.envelope.channelID))
- s.metrics.PeerQueueDroppedMsgs.With("ch_id", pqEnvTmpChIDStr).Add(1)
- s.logger.Debug(
- "dropped envelope",
- "ch_id", pqEnvTmpChIDStr,
- "priority", pqEnvTmp.priority,
- "msg_size", pqEnvTmp.size,
- "capacity", s.capacity,
- )
-
- // dequeue/drop from the priority queue
- heap.Remove(s.pq, pqEnvTmp.index)
-
- // update the size tracker
- tmpSize -= pqEnvTmp.size
-
- // start from the end again
- i = s.pq.Len() - 1
- }
- } else {
- i--
- }
- }
-
- // enqueue the incoming Envelope
- s.push(pqEnv)
- } else {
- // There is not sufficient capacity to drop lower priority Envelopes,
- // so we drop the incoming Envelope.
- s.metrics.PeerQueueDroppedMsgs.With("ch_id", chIDStr).Add(1)
- s.logger.Debug(
- "dropped envelope",
- "ch_id", chIDStr,
- "priority", pqEnv.priority,
- "msg_size", pqEnv.size,
- "capacity", s.capacity,
- )
- }
- }
-
- // dequeue
-
- for s.pq.Len() > 0 {
- pqEnv = heap.Pop(s.pq).(*pqEnvelope)
- s.size -= pqEnv.size
-
- // deduct the Envelope size from all the relevant cumulative sizes
- for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
- s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size
- }
-
- s.metrics.PeerSendBytesTotal.With("peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size))
- s.dequeueCh <- pqEnv.envelope
- }
-
- case <-s.closer.Done():
- return
- }
- }
- }
-
- func (s *pqScheduler) push(pqEnv *pqEnvelope) {
- chIDStr := strconv.Itoa(int(pqEnv.envelope.channelID))
-
- // enqueue the incoming Envelope
- heap.Push(s.pq, pqEnv)
- s.size += pqEnv.size
- s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Add(float64(pqEnv.size))
-
- // Update the cumulative sizes by adding the Envelope's size to every
- // priority less than or equal to it.
- for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
- s.sizes[uint(s.chDescs[i].Priority)] += pqEnv.size
- }
- }
|