You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

292 lines
7.9 KiB

  1. package p2p
  2. import (
  3. "container/heap"
  4. "context"
  5. "sort"
  6. "strconv"
  7. "time"
  8. "github.com/gogo/protobuf/proto"
  9. tmsync "github.com/tendermint/tendermint/internal/libs/sync"
  10. "github.com/tendermint/tendermint/libs/log"
  11. )
  12. // pqEnvelope defines a wrapper around an Envelope with priority to be inserted
  13. // into a priority queue used for Envelope scheduling.
  14. type pqEnvelope struct {
  15. envelope Envelope
  16. priority uint
  17. size uint
  18. timestamp time.Time
  19. index int
  20. }
  21. // priorityQueue defines a type alias for a priority queue implementation.
  22. type priorityQueue []*pqEnvelope
  23. func (pq priorityQueue) get(i int) *pqEnvelope { return pq[i] }
  24. func (pq priorityQueue) Len() int { return len(pq) }
  25. func (pq priorityQueue) Less(i, j int) bool {
  26. // if both elements have the same priority, prioritize based on most recent
  27. if pq[i].priority == pq[j].priority {
  28. return pq[i].timestamp.After(pq[j].timestamp)
  29. }
  30. // otherwise, pick the pqEnvelope with the higher priority
  31. return pq[i].priority > pq[j].priority
  32. }
  33. func (pq priorityQueue) Swap(i, j int) {
  34. pq[i], pq[j] = pq[j], pq[i]
  35. pq[i].index = i
  36. pq[j].index = j
  37. }
  38. func (pq *priorityQueue) Push(x interface{}) {
  39. n := len(*pq)
  40. pqEnv := x.(*pqEnvelope)
  41. pqEnv.index = n
  42. *pq = append(*pq, pqEnv)
  43. }
  44. func (pq *priorityQueue) Pop() interface{} {
  45. old := *pq
  46. n := len(old)
  47. pqEnv := old[n-1]
  48. old[n-1] = nil
  49. pqEnv.index = -1
  50. *pq = old[:n-1]
  51. return pqEnv
  52. }
  53. // Assert the priority queue scheduler implements the queue interface at
  54. // compile-time.
  55. var _ queue = (*pqScheduler)(nil)
  56. type pqScheduler struct {
  57. logger log.Logger
  58. metrics *Metrics
  59. size uint
  60. sizes map[uint]uint // cumulative priority sizes
  61. pq *priorityQueue
  62. chDescs []*ChannelDescriptor
  63. capacity uint
  64. chPriorities map[ChannelID]uint
  65. enqueueCh chan Envelope
  66. dequeueCh chan Envelope
  67. closer *tmsync.Closer
  68. done *tmsync.Closer
  69. }
  70. func newPQScheduler(
  71. logger log.Logger,
  72. m *Metrics,
  73. chDescs []*ChannelDescriptor,
  74. enqueueBuf, dequeueBuf, capacity uint,
  75. ) *pqScheduler {
  76. // copy each ChannelDescriptor and sort them by ascending channel priority
  77. chDescsCopy := make([]*ChannelDescriptor, len(chDescs))
  78. copy(chDescsCopy, chDescs)
  79. sort.Slice(chDescsCopy, func(i, j int) bool { return chDescsCopy[i].Priority < chDescsCopy[j].Priority })
  80. var (
  81. chPriorities = make(map[ChannelID]uint)
  82. sizes = make(map[uint]uint)
  83. )
  84. for _, chDesc := range chDescsCopy {
  85. chID := chDesc.ID
  86. chPriorities[chID] = uint(chDesc.Priority)
  87. sizes[uint(chDesc.Priority)] = 0
  88. }
  89. pq := make(priorityQueue, 0)
  90. heap.Init(&pq)
  91. return &pqScheduler{
  92. logger: logger.With("router", "scheduler"),
  93. metrics: m,
  94. chDescs: chDescsCopy,
  95. capacity: capacity,
  96. chPriorities: chPriorities,
  97. pq: &pq,
  98. sizes: sizes,
  99. enqueueCh: make(chan Envelope, enqueueBuf),
  100. dequeueCh: make(chan Envelope, dequeueBuf),
  101. closer: tmsync.NewCloser(),
  102. done: tmsync.NewCloser(),
  103. }
  104. }
  105. func (s *pqScheduler) enqueue() chan<- Envelope {
  106. return s.enqueueCh
  107. }
  108. func (s *pqScheduler) dequeue() <-chan Envelope {
  109. return s.dequeueCh
  110. }
  111. func (s *pqScheduler) close() {
  112. s.closer.Close()
  113. <-s.done.Done()
  114. }
  115. func (s *pqScheduler) closed() <-chan struct{} {
  116. return s.closer.Done()
  117. }
  118. // start starts non-blocking process that starts the priority queue scheduler.
  119. func (s *pqScheduler) start(ctx context.Context) {
  120. go s.process(ctx)
  121. }
  122. // process starts a block process where we listen for Envelopes to enqueue. If
  123. // there is sufficient capacity, it will be enqueued into the priority queue,
  124. // otherwise, we attempt to dequeue enough elements from the priority queue to
  125. // make room for the incoming Envelope by dropping lower priority elements. If
  126. // there isn't sufficient capacity at lower priorities for the incoming Envelope,
  127. // it is dropped.
  128. //
  129. // After we attempt to enqueue the incoming Envelope, if the priority queue is
  130. // non-empty, we pop the top Envelope and send it on the dequeueCh.
  131. func (s *pqScheduler) process(ctx context.Context) {
  132. defer s.done.Close()
  133. for {
  134. select {
  135. case e := <-s.enqueueCh:
  136. chIDStr := strconv.Itoa(int(e.ChannelID))
  137. pqEnv := &pqEnvelope{
  138. envelope: e,
  139. size: uint(proto.Size(e.Message)),
  140. priority: s.chPriorities[e.ChannelID],
  141. timestamp: time.Now().UTC(),
  142. }
  143. // enqueue
  144. // Check if we have sufficient capacity to simply enqueue the incoming
  145. // Envelope.
  146. if s.size+pqEnv.size <= s.capacity {
  147. s.metrics.PeerPendingSendBytes.With("peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size))
  148. // enqueue the incoming Envelope
  149. s.push(pqEnv)
  150. } else {
  151. // There is not sufficient capacity to simply enqueue the incoming
  152. // Envelope. So we have to attempt to make room for it by dropping lower
  153. // priority Envelopes or drop the incoming Envelope otherwise.
  154. // The cumulative size of all enqueue envelopes at the incoming envelope's
  155. // priority or lower.
  156. total := s.sizes[pqEnv.priority]
  157. if total >= pqEnv.size {
  158. // There is room for the incoming Envelope, so we drop as many lower
  159. // priority Envelopes as we need to.
  160. var (
  161. canEnqueue bool
  162. tmpSize = s.size
  163. i = s.pq.Len() - 1
  164. )
  165. // Drop lower priority Envelopes until sufficient capacity exists for
  166. // the incoming Envelope
  167. for i >= 0 && !canEnqueue {
  168. pqEnvTmp := s.pq.get(i)
  169. if pqEnvTmp.priority < pqEnv.priority {
  170. if tmpSize+pqEnv.size <= s.capacity {
  171. canEnqueue = true
  172. } else {
  173. pqEnvTmpChIDStr := strconv.Itoa(int(pqEnvTmp.envelope.ChannelID))
  174. s.metrics.PeerQueueDroppedMsgs.With("ch_id", pqEnvTmpChIDStr).Add(1)
  175. s.logger.Debug(
  176. "dropped envelope",
  177. "ch_id", pqEnvTmpChIDStr,
  178. "priority", pqEnvTmp.priority,
  179. "msg_size", pqEnvTmp.size,
  180. "capacity", s.capacity,
  181. )
  182. s.metrics.PeerPendingSendBytes.With("peer_id", string(pqEnvTmp.envelope.To)).Add(float64(-pqEnvTmp.size))
  183. // dequeue/drop from the priority queue
  184. heap.Remove(s.pq, pqEnvTmp.index)
  185. // update the size tracker
  186. tmpSize -= pqEnvTmp.size
  187. // start from the end again
  188. i = s.pq.Len() - 1
  189. }
  190. } else {
  191. i--
  192. }
  193. }
  194. // enqueue the incoming Envelope
  195. s.push(pqEnv)
  196. } else {
  197. // There is not sufficient capacity to drop lower priority Envelopes,
  198. // so we drop the incoming Envelope.
  199. s.metrics.PeerQueueDroppedMsgs.With("ch_id", chIDStr).Add(1)
  200. s.logger.Debug(
  201. "dropped envelope",
  202. "ch_id", chIDStr,
  203. "priority", pqEnv.priority,
  204. "msg_size", pqEnv.size,
  205. "capacity", s.capacity,
  206. )
  207. }
  208. }
  209. // dequeue
  210. for s.pq.Len() > 0 {
  211. pqEnv = heap.Pop(s.pq).(*pqEnvelope)
  212. s.size -= pqEnv.size
  213. // deduct the Envelope size from all the relevant cumulative sizes
  214. for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
  215. s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size
  216. }
  217. s.metrics.PeerSendBytesTotal.With(
  218. "chID", chIDStr,
  219. "peer_id", string(pqEnv.envelope.To),
  220. "message_type", s.metrics.ValueToMetricLabel(pqEnv.envelope.Message)).Add(float64(pqEnv.size))
  221. s.metrics.PeerPendingSendBytes.With(
  222. "peer_id", string(pqEnv.envelope.To)).Add(float64(-pqEnv.size))
  223. select {
  224. case s.dequeueCh <- pqEnv.envelope:
  225. case <-s.closer.Done():
  226. return
  227. }
  228. }
  229. case <-ctx.Done():
  230. return
  231. case <-s.closer.Done():
  232. return
  233. }
  234. }
  235. }
  236. func (s *pqScheduler) push(pqEnv *pqEnvelope) {
  237. chIDStr := strconv.Itoa(int(pqEnv.envelope.ChannelID))
  238. // enqueue the incoming Envelope
  239. heap.Push(s.pq, pqEnv)
  240. s.size += pqEnv.size
  241. s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Add(float64(pqEnv.size))
  242. // Update the cumulative sizes by adding the Envelope's size to every
  243. // priority less than or equal to it.
  244. for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
  245. s.sizes[uint(s.chDescs[i].Priority)] += pqEnv.size
  246. }
  247. }