package p2p

import (
	"fmt"
	"sort"
	"strconv"

	"github.com/gogo/protobuf/proto"

	tmsync "github.com/tendermint/tendermint/internal/libs/sync"
	"github.com/tendermint/tendermint/libs/log"
)

// wrappedEnvelope wraps a p2p Envelope with its precomputed size.
type wrappedEnvelope struct {
	envelope Envelope
	size     uint
}

// assert the WDRR scheduler implements the queue interface at compile-time
var _ queue = (*wdrrScheduler)(nil)

// wdrrScheduler implements a Weighted Deficit Round Robin (WDRR) scheduling
// algorithm via the queue interface. One WDRR queue is created per peer, and
// each queue has N flows. Each flow corresponds to a p2p Channel, so there are
// N input flows and a single output sink, the peer's connection.
//
// The WDRR scheduler contains a shared buffer with a fixed capacity.
//
// Each flow has the following:
// - quantum: The number of bytes added to the flow's deficit counter in each
//   round. The flow can send at most quantum bytes at a time. Each flow has
//   its own unique quantum, which gives the queue its weighted nature. A
//   higher quantum corresponds to a higher weight/priority. The quantum is
//   computed as MaxSendBytes * Priority (see the example below).
// - deficit counter: The number of bytes that the flow is allowed to transmit
//   when it is its turn.
//
// See: https://en.wikipedia.org/wiki/Deficit_round_robin
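//
// For illustration only (the numbers here are hypothetical and not taken from
// any reactor configuration): a ChannelDescriptor with MaxSendBytes = 1000 and
// Priority = 5 is assigned
//
//	quantum := chDesc.MaxSendBytes * uint(chDesc.Priority) // 1000 * 5 = 5000 bytes per round
//
// so its deficit counter grows by 5000 bytes per round, allowing it to send
// roughly five times as many bytes as a Priority = 1 channel with the same
// MaxSendBytes.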
type wdrrScheduler struct {
	logger       log.Logger
	metrics      *Metrics
	chDescs      []ChannelDescriptor
	capacity     uint
	size         uint
	chPriorities map[ChannelID]uint
	buffer       map[ChannelID][]wrappedEnvelope
	quanta       map[ChannelID]uint
	deficits     map[ChannelID]uint

	closer *tmsync.Closer
	doneCh *tmsync.Closer

	enqueueCh chan Envelope
	dequeueCh chan Envelope
}

func newWDRRScheduler(
	logger log.Logger,
	m *Metrics,
	chDescs []ChannelDescriptor,
	enqueueBuf, dequeueBuf, capacity uint,
) *wdrrScheduler {
	// copy each ChannelDescriptor and sort them by channel priority
	chDescsCopy := make([]ChannelDescriptor, len(chDescs))
	copy(chDescsCopy, chDescs)
	sort.Slice(chDescsCopy, func(i, j int) bool { return chDescsCopy[i].Priority > chDescsCopy[j].Priority })

	var (
		buffer       = make(map[ChannelID][]wrappedEnvelope)
		chPriorities = make(map[ChannelID]uint)
		quanta       = make(map[ChannelID]uint)
		deficits     = make(map[ChannelID]uint)
	)

	for _, chDesc := range chDescsCopy {
		chID := ChannelID(chDesc.ID)
		chPriorities[chID] = uint(chDesc.Priority)
		buffer[chID] = make([]wrappedEnvelope, 0)
		quanta[chID] = chDesc.MaxSendBytes * uint(chDesc.Priority)
	}

	return &wdrrScheduler{
		logger:       logger.With("queue", "wdrr"),
		metrics:      m,
		capacity:     capacity,
		chPriorities: chPriorities,
		chDescs:      chDescsCopy,
		buffer:       buffer,
		quanta:       quanta,
		deficits:     deficits,
		closer:       tmsync.NewCloser(),
		doneCh:       tmsync.NewCloser(),
		enqueueCh:    make(chan Envelope, enqueueBuf),
		dequeueCh:    make(chan Envelope, dequeueBuf),
	}
}

// enqueue returns a write-only channel on which a producer can send Envelopes.
func (s *wdrrScheduler) enqueue() chan<- Envelope {
	return s.enqueueCh
}

// dequeue returns a read-only channel from which a consumer can read Envelopes.
func (s *wdrrScheduler) dequeue() <-chan Envelope {
	return s.dequeueCh
}

func (s *wdrrScheduler) closed() <-chan struct{} {
	return s.closer.Done()
}

// close closes the WDRR queue. After this call enqueue() will block, so the
// caller must select on closed() as well to avoid blocking forever. The
// enqueue() and dequeue() channels, along with the internal channels, will NOT
// be closed. Note, close() will block until all externally spawned goroutines
// have exited.
func (s *wdrrScheduler) close() {
	s.closer.Close()
	<-s.doneCh.Done()
}

// start starts the WDRR queue process in a blocking goroutine. This must be
// called before the queue can start to process and accept Envelopes.
func (s *wdrrScheduler) start() {
	go s.process()
}
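
// The following sketch is purely illustrative and not part of the package API:
// it shows how a hypothetical caller inside this package might drive the
// scheduler's lifecycle, assuming logger, metrics, chDescs, chID, msg, and
// peerID are defined elsewhere and that the buffer sizes are arbitrary:
//
//	q := newWDRRScheduler(logger, metrics, chDescs, 32, 32, 16*1024)
//	q.start()
//
//	// producer: select on closed() so the send cannot block after close()
//	select {
//	case q.enqueue() <- Envelope{channelID: chID, Message: msg, To: peerID}:
//	case <-q.closed():
//	}
//
//	// consumer: Envelopes scheduled by the WDRR algorithm arrive on dequeue()
//	select {
//	case e := <-q.dequeue():
//		_ = e // write e.Message to the peer's connection
//	case <-q.closed():
//	}
//
//	q.close()
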
// process starts a blocking WDRR scheduler process, where we continuously
// evaluate if we need to attempt to enqueue an Envelope or schedule Envelopes
// to be dequeued and subsequently read and sent on the source connection.
// Internally, each p2p Channel maps to a flow, where each flow has a deficit
// and a quantum.
//
// For each Envelope requested to be enqueued, we evaluate if there is sufficient
// capacity in the shared buffer to add the Envelope. If so, it is added.
// Otherwise, we evaluate all flows of lower priority and attempt to find an
// existing Envelope in the shared buffer of sufficient size that can be dropped
// in place of the incoming Envelope. If there is no such Envelope that can be
// dropped, then the incoming Envelope is dropped.
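//
// For example (hypothetical sizes): suppose capacity is 1024 bytes, the shared
// buffer currently holds 900 bytes, and an incoming Envelope is 200 bytes.
// Since s.size+wEnv.size > s.capacity, the scheduler scans the lower priority
// flows for a buffered Envelope of at least 200 bytes to drop; if none exists,
// the incoming 200-byte Envelope is dropped instead.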
//
// When there is nothing to be enqueued, we perform the WDRR algorithm and
// determine which Envelopes can be dequeued. For each Envelope that can be
// dequeued, it is sent on the dequeueCh. Specifically, for each flow, if it is
// non-empty, its deficit counter is incremented by its quantum value. The
// value of the deficit counter is then the maximum number of bytes that can be
// sent in this round. If the deficit counter is at least the size of the
// Envelope's message at the head of the queue (HoQ), that Envelope can be sent
// and the counter is decremented by the message's size. The size of the next
// Envelope's message is then compared to the counter value, and so on. Once
// the flow is empty or the value of the counter is insufficient, the scheduler
// skips to the next flow. If the flow is empty, the value of the deficit
// counter is reset to 0.
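//
// As a purely illustrative walkthrough (hypothetical numbers): a flow with a
// quantum of 500 bytes and queued Envelopes of 300, 300, and 100 bytes starts
// a round with a deficit of 0 + 500 = 500. The first 300-byte Envelope is sent
// (deficit 200), the next 300-byte Envelope does not fit, so the scheduler
// moves on. In the next round the deficit becomes 200 + 500 = 700, the
// remaining 300-byte and 100-byte Envelopes are sent (deficit 300), and the
// now-empty flow's deficit is reset to 0.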
//
// XXX/TODO: Evaluate the single goroutine scheduler mechanism. In other words,
// evaluate the effectiveness and performance of having a single goroutine
// handle both the enqueueing and dequeueing logic. Specifically, there is
// potential contention between reading off of enqueueCh and trying to enqueue
// while also attempting to perform the WDRR algorithm and find the next set of
// Envelope(s) to send on the dequeueCh. Alternatively, we could consider
// separate scheduling goroutines, but that would require the use of mutexes
// and could possibly degrade performance.
func (s *wdrrScheduler) process() {
	defer s.doneCh.Close()

	for {
		select {
		case <-s.closer.Done():
			return

		case e := <-s.enqueueCh:
			// attempt to enqueue the incoming Envelope
			chIDStr := strconv.Itoa(int(e.channelID))
			wEnv := wrappedEnvelope{envelope: e, size: uint(proto.Size(e.Message))}
			msgSize := wEnv.size

			s.metrics.PeerPendingSendBytes.With("peer_id", string(e.To)).Add(float64(msgSize))

			// If we're at capacity, we need to either drop the incoming Envelope or
			// an Envelope from a lower priority flow. Otherwise, we add the (wrapped)
			// envelope to the flow's queue.
			if s.size+wEnv.size > s.capacity {
				chPriority := s.chPriorities[e.channelID]

				var (
					canDrop  bool
					dropIdx  int
					dropChID ChannelID
				)

				// Evaluate all lower priority flows and determine if there exists an
				// Envelope that is of equal or greater size that we can drop in favor
				// of the incoming Envelope.
				for i := len(s.chDescs) - 1; i >= 0 && uint(s.chDescs[i].Priority) < chPriority && !canDrop; i-- {
					currChID := ChannelID(s.chDescs[i].ID)
					flow := s.buffer[currChID]

					for j := 0; j < len(flow) && !canDrop; j++ {
						if flow[j].size >= wEnv.size {
							canDrop = true
							dropIdx = j
							dropChID = currChID
							break
						}
					}
				}

				// If we can drop an existing Envelope, drop it and enqueue the incoming
				// Envelope.
				if canDrop {
					chIDStr = strconv.Itoa(int(dropChID))
					chPriority = s.chPriorities[dropChID]
					msgSize = s.buffer[dropChID][dropIdx].size

					// Drop the Envelope from the lower priority flow and update the
					// queue's buffer size.
					s.size -= msgSize
					s.buffer[dropChID] = append(s.buffer[dropChID][:dropIdx], s.buffer[dropChID][dropIdx+1:]...)

					// add the incoming Envelope and update the queue's buffer size
					s.size += wEnv.size
					s.buffer[e.channelID] = append(s.buffer[e.channelID], wEnv)
					s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Set(float64(wEnv.size))
				}

				// We either dropped the incoming Envelope or one from an existing
				// lower priority flow.
				s.metrics.PeerQueueDroppedMsgs.With("ch_id", chIDStr).Add(1)
				s.logger.Debug(
					"dropped envelope",
					"ch_id", chIDStr,
					"priority", chPriority,
					"capacity", s.capacity,
					"msg_size", msgSize,
				)
			} else {
				// we have sufficient capacity to enqueue the incoming Envelope
				s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Set(float64(wEnv.size))
				s.buffer[e.channelID] = append(s.buffer[e.channelID], wEnv)
				s.size += wEnv.size
			}

		default:
			// perform the WDRR algorithm
			for _, chDesc := range s.chDescs {
				chID := ChannelID(chDesc.ID)

				// only consider non-empty flows
				if len(s.buffer[chID]) > 0 {
					// increase the flow's deficit counter by its quantum
					s.deficits[chID] += s.quanta[chID]

					// grab the flow's current deficit counter and HoQ (wrapped) Envelope
					d := s.deficits[chID]
					we := s.buffer[chID][0]

					// While the flow is non-empty and we can send the current Envelope
					// on the dequeueCh:
					//
					// 1. send the Envelope
					// 2. update the scheduler's shared buffer's size
					// 3. update the flow's deficit
					// 4. remove the Envelope from the flow's queue
					// 5. grab the next HoQ Envelope and the flow's deficit
					for len(s.buffer[chID]) > 0 && d >= we.size {
						s.metrics.PeerSendBytesTotal.With(
							"chID", fmt.Sprint(chID),
							"peer_id", string(we.envelope.To)).Add(float64(we.size))
						s.dequeueCh <- we.envelope
						s.size -= we.size
						s.deficits[chID] -= we.size
						s.buffer[chID] = s.buffer[chID][1:]

						if len(s.buffer[chID]) > 0 {
							d = s.deficits[chID]
							we = s.buffer[chID][0]
						}
					}
				}

				// reset the flow's deficit to zero if it is empty
				if len(s.buffer[chID]) == 0 {
					s.deficits[chID] = 0
				}
			}
		}
	}
}