You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

284 lines
8.0 KiB

  1. package p2p
  2. import (
  3. "container/heap"
  4. "context"
  5. "sort"
  6. "strconv"
  7. "sync"
  8. "time"
  9. "github.com/gogo/protobuf/proto"
  10. "github.com/tendermint/tendermint/libs/log"
  11. )
  12. // pqEnvelope defines a wrapper around an Envelope with priority to be inserted
  13. // into a priority queue used for Envelope scheduling.
  14. type pqEnvelope struct {
  15. envelope Envelope
  16. priority uint
  17. size uint
  18. timestamp time.Time
  19. index int
  20. }
  21. // priorityQueue defines a type alias for a priority queue implementation.
  22. type priorityQueue []*pqEnvelope
  23. func (pq priorityQueue) get(i int) *pqEnvelope { return pq[i] }
  24. func (pq priorityQueue) Len() int { return len(pq) }
  25. func (pq priorityQueue) Less(i, j int) bool {
  26. // if both elements have the same priority, prioritize based on most recent
  27. if pq[i].priority == pq[j].priority {
  28. return pq[i].timestamp.After(pq[j].timestamp)
  29. }
  30. // otherwise, pick the pqEnvelope with the higher priority
  31. return pq[i].priority > pq[j].priority
  32. }
  33. func (pq priorityQueue) Swap(i, j int) {
  34. pq[i], pq[j] = pq[j], pq[i]
  35. pq[i].index = i
  36. pq[j].index = j
  37. }
  38. func (pq *priorityQueue) Push(x interface{}) {
  39. n := len(*pq)
  40. pqEnv := x.(*pqEnvelope)
  41. pqEnv.index = n
  42. *pq = append(*pq, pqEnv)
  43. }
  44. func (pq *priorityQueue) Pop() interface{} {
  45. old := *pq
  46. n := len(old)
  47. pqEnv := old[n-1]
  48. old[n-1] = nil
  49. pqEnv.index = -1
  50. *pq = old[:n-1]
  51. return pqEnv
  52. }
  53. // Assert the priority queue scheduler implements the queue interface at
  54. // compile-time.
  55. var _ queue = (*pqScheduler)(nil)
  56. type pqScheduler struct {
  57. logger log.Logger
  58. metrics *Metrics
  59. size uint
  60. sizes map[uint]uint // cumulative priority sizes
  61. pq *priorityQueue
  62. chDescs []*ChannelDescriptor
  63. capacity uint
  64. chPriorities map[ChannelID]uint
  65. enqueueCh chan Envelope
  66. dequeueCh chan Envelope
  67. closeFn func()
  68. closeCh <-chan struct{}
  69. done chan struct{}
  70. }
  71. func newPQScheduler(
  72. logger log.Logger,
  73. m *Metrics,
  74. chDescs []*ChannelDescriptor,
  75. enqueueBuf, dequeueBuf, capacity uint,
  76. ) *pqScheduler {
  77. // copy each ChannelDescriptor and sort them by ascending channel priority
  78. chDescsCopy := make([]*ChannelDescriptor, len(chDescs))
  79. copy(chDescsCopy, chDescs)
  80. sort.Slice(chDescsCopy, func(i, j int) bool { return chDescsCopy[i].Priority < chDescsCopy[j].Priority })
  81. var (
  82. chPriorities = make(map[ChannelID]uint)
  83. sizes = make(map[uint]uint)
  84. )
  85. for _, chDesc := range chDescsCopy {
  86. chID := chDesc.ID
  87. chPriorities[chID] = uint(chDesc.Priority)
  88. sizes[uint(chDesc.Priority)] = 0
  89. }
  90. pq := make(priorityQueue, 0)
  91. heap.Init(&pq)
  92. closeCh := make(chan struct{})
  93. once := &sync.Once{}
  94. return &pqScheduler{
  95. logger: logger.With("router", "scheduler"),
  96. metrics: m,
  97. chDescs: chDescsCopy,
  98. capacity: capacity,
  99. chPriorities: chPriorities,
  100. pq: &pq,
  101. sizes: sizes,
  102. enqueueCh: make(chan Envelope, enqueueBuf),
  103. dequeueCh: make(chan Envelope, dequeueBuf),
  104. closeFn: func() { once.Do(func() { close(closeCh) }) },
  105. closeCh: closeCh,
  106. done: make(chan struct{}),
  107. }
  108. }
  109. // start starts non-blocking process that starts the priority queue scheduler.
  110. func (s *pqScheduler) start(ctx context.Context) { go s.process(ctx) }
  111. func (s *pqScheduler) enqueue() chan<- Envelope { return s.enqueueCh }
  112. func (s *pqScheduler) dequeue() <-chan Envelope { return s.dequeueCh }
  113. func (s *pqScheduler) close() { s.closeFn() }
  114. func (s *pqScheduler) closed() <-chan struct{} { return s.done }
  115. // process starts a block process where we listen for Envelopes to enqueue. If
  116. // there is sufficient capacity, it will be enqueued into the priority queue,
  117. // otherwise, we attempt to dequeue enough elements from the priority queue to
  118. // make room for the incoming Envelope by dropping lower priority elements. If
  119. // there isn't sufficient capacity at lower priorities for the incoming Envelope,
  120. // it is dropped.
  121. //
  122. // After we attempt to enqueue the incoming Envelope, if the priority queue is
  123. // non-empty, we pop the top Envelope and send it on the dequeueCh.
  124. func (s *pqScheduler) process(ctx context.Context) {
  125. defer close(s.done)
  126. for {
  127. select {
  128. case e := <-s.enqueueCh:
  129. chIDStr := strconv.Itoa(int(e.ChannelID))
  130. pqEnv := &pqEnvelope{
  131. envelope: e,
  132. size: uint(proto.Size(e.Message)),
  133. priority: s.chPriorities[e.ChannelID],
  134. timestamp: time.Now().UTC(),
  135. }
  136. // enqueue
  137. // Check if we have sufficient capacity to simply enqueue the incoming
  138. // Envelope.
  139. if s.size+pqEnv.size <= s.capacity {
  140. s.metrics.PeerPendingSendBytes.With("peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size))
  141. // enqueue the incoming Envelope
  142. s.push(pqEnv)
  143. } else {
  144. // There is not sufficient capacity to simply enqueue the incoming
  145. // Envelope. So we have to attempt to make room for it by dropping lower
  146. // priority Envelopes or drop the incoming Envelope otherwise.
  147. // The cumulative size of all enqueue envelopes at the incoming envelope's
  148. // priority or lower.
  149. total := s.sizes[pqEnv.priority]
  150. if total >= pqEnv.size {
  151. // There is room for the incoming Envelope, so we drop as many lower
  152. // priority Envelopes as we need to.
  153. var (
  154. canEnqueue bool
  155. tmpSize = s.size
  156. i = s.pq.Len() - 1
  157. )
  158. // Drop lower priority Envelopes until sufficient capacity exists for
  159. // the incoming Envelope
  160. for i >= 0 && !canEnqueue {
  161. pqEnvTmp := s.pq.get(i)
  162. if pqEnvTmp.priority < pqEnv.priority {
  163. if tmpSize+pqEnv.size <= s.capacity {
  164. canEnqueue = true
  165. } else {
  166. pqEnvTmpChIDStr := strconv.Itoa(int(pqEnvTmp.envelope.ChannelID))
  167. s.metrics.PeerQueueDroppedMsgs.With("ch_id", pqEnvTmpChIDStr).Add(1)
  168. s.logger.Debug(
  169. "dropped envelope",
  170. "ch_id", pqEnvTmpChIDStr,
  171. "priority", pqEnvTmp.priority,
  172. "msg_size", pqEnvTmp.size,
  173. "capacity", s.capacity,
  174. )
  175. s.metrics.PeerPendingSendBytes.With("peer_id", string(pqEnvTmp.envelope.To)).Add(float64(-pqEnvTmp.size))
  176. // dequeue/drop from the priority queue
  177. heap.Remove(s.pq, pqEnvTmp.index)
  178. // update the size tracker
  179. tmpSize -= pqEnvTmp.size
  180. // start from the end again
  181. i = s.pq.Len() - 1
  182. }
  183. } else {
  184. i--
  185. }
  186. }
  187. // enqueue the incoming Envelope
  188. s.push(pqEnv)
  189. } else {
  190. // There is not sufficient capacity to drop lower priority Envelopes,
  191. // so we drop the incoming Envelope.
  192. s.metrics.PeerQueueDroppedMsgs.With("ch_id", chIDStr).Add(1)
  193. s.logger.Debug(
  194. "dropped envelope",
  195. "ch_id", chIDStr,
  196. "priority", pqEnv.priority,
  197. "msg_size", pqEnv.size,
  198. "capacity", s.capacity,
  199. )
  200. }
  201. }
  202. // dequeue
  203. for s.pq.Len() > 0 {
  204. pqEnv = heap.Pop(s.pq).(*pqEnvelope)
  205. s.size -= pqEnv.size
  206. // deduct the Envelope size from all the relevant cumulative sizes
  207. for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
  208. s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size
  209. }
  210. s.metrics.PeerSendBytesTotal.With(
  211. "chID", chIDStr,
  212. "peer_id", string(pqEnv.envelope.To),
  213. "message_type", s.metrics.ValueToMetricLabel(pqEnv.envelope.Message)).Add(float64(pqEnv.size))
  214. s.metrics.PeerPendingSendBytes.With(
  215. "peer_id", string(pqEnv.envelope.To)).Add(float64(-pqEnv.size))
  216. select {
  217. case s.dequeueCh <- pqEnv.envelope:
  218. case <-s.closeCh:
  219. return
  220. }
  221. }
  222. case <-ctx.Done():
  223. return
  224. case <-s.closeCh:
  225. return
  226. }
  227. }
  228. }
  229. func (s *pqScheduler) push(pqEnv *pqEnvelope) {
  230. chIDStr := strconv.Itoa(int(pqEnv.envelope.ChannelID))
  231. // enqueue the incoming Envelope
  232. heap.Push(s.pq, pqEnv)
  233. s.size += pqEnv.size
  234. s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Add(float64(pqEnv.size))
  235. // Update the cumulative sizes by adding the Envelope's size to every
  236. // priority less than or equal to it.
  237. for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
  238. s.sizes[uint(s.chDescs[i].Priority)] += pqEnv.size
  239. }
  240. }