You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

287 lines
7.6 KiB

  1. package p2p
  2. import (
  3. "container/heap"
  4. "sort"
  5. "strconv"
  6. "time"
  7. "github.com/gogo/protobuf/proto"
  8. tmsync "github.com/tendermint/tendermint/internal/libs/sync"
  9. "github.com/tendermint/tendermint/libs/log"
  10. )
  11. // pqEnvelope defines a wrapper around an Envelope with priority to be inserted
  12. // into a priority queue used for Envelope scheduling.
  13. type pqEnvelope struct {
  14. envelope Envelope
  15. priority uint
  16. size uint
  17. timestamp time.Time
  18. index int
  19. }
  20. // priorityQueue defines a type alias for a priority queue implementation.
  21. type priorityQueue []*pqEnvelope
  22. func (pq priorityQueue) get(i int) *pqEnvelope { return pq[i] }
  23. func (pq priorityQueue) Len() int { return len(pq) }
  24. func (pq priorityQueue) Less(i, j int) bool {
  25. // if both elements have the same priority, prioritize based on most recent
  26. if pq[i].priority == pq[j].priority {
  27. return pq[i].timestamp.After(pq[j].timestamp)
  28. }
  29. // otherwise, pick the pqEnvelope with the higher priority
  30. return pq[i].priority > pq[j].priority
  31. }
  32. func (pq priorityQueue) Swap(i, j int) {
  33. pq[i], pq[j] = pq[j], pq[i]
  34. pq[i].index = i
  35. pq[j].index = j
  36. }
  37. func (pq *priorityQueue) Push(x interface{}) {
  38. n := len(*pq)
  39. pqEnv := x.(*pqEnvelope)
  40. pqEnv.index = n
  41. *pq = append(*pq, pqEnv)
  42. }
  43. func (pq *priorityQueue) Pop() interface{} {
  44. old := *pq
  45. n := len(old)
  46. pqEnv := old[n-1]
  47. old[n-1] = nil
  48. pqEnv.index = -1
  49. *pq = old[:n-1]
  50. return pqEnv
  51. }
  52. // Assert the priority queue scheduler implements the queue interface at
  53. // compile-time.
  54. var _ queue = (*pqScheduler)(nil)
  55. type pqScheduler struct {
  56. logger log.Logger
  57. metrics *Metrics
  58. size uint
  59. sizes map[uint]uint // cumulative priority sizes
  60. pq *priorityQueue
  61. chDescs []ChannelDescriptor
  62. capacity uint
  63. chPriorities map[ChannelID]uint
  64. enqueueCh chan Envelope
  65. dequeueCh chan Envelope
  66. closer *tmsync.Closer
  67. done *tmsync.Closer
  68. }
  69. func newPQScheduler(
  70. logger log.Logger,
  71. m *Metrics,
  72. chDescs []ChannelDescriptor,
  73. enqueueBuf, dequeueBuf, capacity uint,
  74. ) *pqScheduler {
  75. // copy each ChannelDescriptor and sort them by ascending channel priority
  76. chDescsCopy := make([]ChannelDescriptor, len(chDescs))
  77. copy(chDescsCopy, chDescs)
  78. sort.Slice(chDescsCopy, func(i, j int) bool { return chDescsCopy[i].Priority < chDescsCopy[j].Priority })
  79. var (
  80. chPriorities = make(map[ChannelID]uint)
  81. sizes = make(map[uint]uint)
  82. )
  83. for _, chDesc := range chDescsCopy {
  84. chID := ChannelID(chDesc.ID)
  85. chPriorities[chID] = uint(chDesc.Priority)
  86. sizes[uint(chDesc.Priority)] = 0
  87. }
  88. pq := make(priorityQueue, 0)
  89. heap.Init(&pq)
  90. return &pqScheduler{
  91. logger: logger.With("router", "scheduler"),
  92. metrics: m,
  93. chDescs: chDescsCopy,
  94. capacity: capacity,
  95. chPriorities: chPriorities,
  96. pq: &pq,
  97. sizes: sizes,
  98. enqueueCh: make(chan Envelope, enqueueBuf),
  99. dequeueCh: make(chan Envelope, dequeueBuf),
  100. closer: tmsync.NewCloser(),
  101. done: tmsync.NewCloser(),
  102. }
  103. }
  104. func (s *pqScheduler) enqueue() chan<- Envelope {
  105. return s.enqueueCh
  106. }
  107. func (s *pqScheduler) dequeue() <-chan Envelope {
  108. return s.dequeueCh
  109. }
  110. func (s *pqScheduler) close() {
  111. s.closer.Close()
  112. <-s.done.Done()
  113. }
  114. func (s *pqScheduler) closed() <-chan struct{} {
  115. return s.closer.Done()
  116. }
  117. // start starts non-blocking process that starts the priority queue scheduler.
  118. func (s *pqScheduler) start() {
  119. go s.process()
  120. }
  121. // process starts a block process where we listen for Envelopes to enqueue. If
  122. // there is sufficient capacity, it will be enqueued into the priority queue,
  123. // otherwise, we attempt to dequeue enough elements from the priority queue to
  124. // make room for the incoming Envelope by dropping lower priority elements. If
  125. // there isn't sufficient capacity at lower priorities for the incoming Envelope,
  126. // it is dropped.
  127. //
  128. // After we attempt to enqueue the incoming Envelope, if the priority queue is
  129. // non-empty, we pop the top Envelope and send it on the dequeueCh.
  130. func (s *pqScheduler) process() {
  131. defer s.done.Close()
  132. for {
  133. select {
  134. case e := <-s.enqueueCh:
  135. chIDStr := strconv.Itoa(int(e.channelID))
  136. pqEnv := &pqEnvelope{
  137. envelope: e,
  138. size: uint(proto.Size(e.Message)),
  139. priority: s.chPriorities[e.channelID],
  140. timestamp: time.Now().UTC(),
  141. }
  142. s.metrics.PeerPendingSendBytes.With("peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size))
  143. // enqueue
  144. // Check if we have sufficient capacity to simply enqueue the incoming
  145. // Envelope.
  146. if s.size+pqEnv.size <= s.capacity {
  147. // enqueue the incoming Envelope
  148. s.push(pqEnv)
  149. } else {
  150. // There is not sufficient capacity to simply enqueue the incoming
  151. // Envelope. So we have to attempt to make room for it by dropping lower
  152. // priority Envelopes or drop the incoming Envelope otherwise.
  153. // The cumulative size of all enqueue envelopes at the incoming envelope's
  154. // priority or lower.
  155. total := s.sizes[pqEnv.priority]
  156. if total >= pqEnv.size {
  157. // There is room for the incoming Envelope, so we drop as many lower
  158. // priority Envelopes as we need to.
  159. var (
  160. canEnqueue bool
  161. tmpSize = s.size
  162. i = s.pq.Len() - 1
  163. )
  164. // Drop lower priority Envelopes until sufficient capacity exists for
  165. // the incoming Envelope
  166. for i >= 0 && !canEnqueue {
  167. pqEnvTmp := s.pq.get(i)
  168. if pqEnvTmp.priority < pqEnv.priority {
  169. if tmpSize+pqEnv.size <= s.capacity {
  170. canEnqueue = true
  171. } else {
  172. pqEnvTmpChIDStr := strconv.Itoa(int(pqEnvTmp.envelope.channelID))
  173. s.metrics.PeerQueueDroppedMsgs.With("ch_id", pqEnvTmpChIDStr).Add(1)
  174. s.logger.Debug(
  175. "dropped envelope",
  176. "ch_id", pqEnvTmpChIDStr,
  177. "priority", pqEnvTmp.priority,
  178. "msg_size", pqEnvTmp.size,
  179. "capacity", s.capacity,
  180. )
  181. // dequeue/drop from the priority queue
  182. heap.Remove(s.pq, pqEnvTmp.index)
  183. // update the size tracker
  184. tmpSize -= pqEnvTmp.size
  185. // start from the end again
  186. i = s.pq.Len() - 1
  187. }
  188. } else {
  189. i--
  190. }
  191. }
  192. // enqueue the incoming Envelope
  193. s.push(pqEnv)
  194. } else {
  195. // There is not sufficient capacity to drop lower priority Envelopes,
  196. // so we drop the incoming Envelope.
  197. s.metrics.PeerQueueDroppedMsgs.With("ch_id", chIDStr).Add(1)
  198. s.logger.Debug(
  199. "dropped envelope",
  200. "ch_id", chIDStr,
  201. "priority", pqEnv.priority,
  202. "msg_size", pqEnv.size,
  203. "capacity", s.capacity,
  204. )
  205. }
  206. }
  207. // dequeue
  208. for s.pq.Len() > 0 {
  209. pqEnv = heap.Pop(s.pq).(*pqEnvelope)
  210. s.size -= pqEnv.size
  211. // deduct the Envelope size from all the relevant cumulative sizes
  212. for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
  213. s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size
  214. }
  215. s.metrics.PeerSendBytesTotal.With(
  216. "chID", chIDStr,
  217. "peer_id", string(pqEnv.envelope.To),
  218. "message_type", s.metrics.ValueToMetricLabel(pqEnv.envelope.Message)).Add(float64(pqEnv.size))
  219. select {
  220. case s.dequeueCh <- pqEnv.envelope:
  221. case <-s.closer.Done():
  222. return
  223. }
  224. }
  225. case <-s.closer.Done():
  226. return
  227. }
  228. }
  229. }
  230. func (s *pqScheduler) push(pqEnv *pqEnvelope) {
  231. chIDStr := strconv.Itoa(int(pqEnv.envelope.channelID))
  232. // enqueue the incoming Envelope
  233. heap.Push(s.pq, pqEnv)
  234. s.size += pqEnv.size
  235. s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Add(float64(pqEnv.size))
  236. // Update the cumulative sizes by adding the Envelope's size to every
  237. // priority less than or equal to it.
  238. for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
  239. s.sizes[uint(s.chDescs[i].Priority)] += pqEnv.size
  240. }
  241. }