You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

289 lines
9.9 KiB

  1. package p2p
  2. import (
  3. "fmt"
  4. "sort"
  5. "strconv"
  6. "github.com/gogo/protobuf/proto"
  7. "github.com/tendermint/tendermint/libs/log"
  8. tmsync "github.com/tendermint/tendermint/libs/sync"
  9. )
  10. const defaultCapacity uint = 1048576 // 1MB
  11. // wrappedEnvelope wraps a p2p Envelope with its precomputed size.
  12. type wrappedEnvelope struct {
  13. envelope Envelope
  14. size uint
  15. }
  16. // assert the WDDR scheduler implements the queue interface at compile-time
  17. var _ queue = (*wdrrScheduler)(nil)
  18. // wdrrQueue implements a Weighted Deficit Round Robin (WDRR) scheduling
  19. // algorithm via the queue interface. A WDRR queue is created per peer, where
  20. // the queue will have N number of flows. Each flow corresponds to a p2p Channel,
  21. // so there are n input flows and a single output source, the peer's connection.
  22. //
  23. // The WDRR scheduler contains a shared buffer with a fixed capacity.
  24. //
  25. // Each flow has the following:
  26. // - quantum: The number of bytes that is added to the deficit counter of the
  27. // flow in each round. The flow can send at most quantum bytes at a time. Each
  28. // flow has its own unique quantum, which gives the queue its weighted nature.
  29. // A higher quantum corresponds to a higher weight/priority. The quantum is
  30. // computed as MaxSendBytes * Priority.
  31. // - deficit counter: The number of bytes that the flow is allowed to transmit
  32. // when it is its turn.
  33. //
  34. // See: https://en.wikipedia.org/wiki/Deficit_round_robin
  35. type wdrrScheduler struct {
  36. logger log.Logger
  37. metrics *Metrics
  38. chDescs []ChannelDescriptor
  39. capacity uint
  40. size uint
  41. chPriorities map[ChannelID]uint
  42. buffer map[ChannelID][]wrappedEnvelope
  43. quanta map[ChannelID]uint
  44. deficits map[ChannelID]uint
  45. closer *tmsync.Closer
  46. doneCh *tmsync.Closer
  47. enqueueCh chan Envelope
  48. dequeueCh chan Envelope
  49. }
  50. func newWDRRScheduler(
  51. logger log.Logger,
  52. m *Metrics,
  53. chDescs []ChannelDescriptor,
  54. enqueueBuf, dequeueBuf, capacity uint,
  55. ) *wdrrScheduler {
  56. // copy each ChannelDescriptor and sort them by channel priority
  57. chDescsCopy := make([]ChannelDescriptor, len(chDescs))
  58. copy(chDescsCopy, chDescs)
  59. sort.Slice(chDescsCopy, func(i, j int) bool { return chDescsCopy[i].Priority > chDescsCopy[j].Priority })
  60. var (
  61. buffer = make(map[ChannelID][]wrappedEnvelope)
  62. chPriorities = make(map[ChannelID]uint)
  63. quanta = make(map[ChannelID]uint)
  64. deficits = make(map[ChannelID]uint)
  65. )
  66. for _, chDesc := range chDescsCopy {
  67. chID := ChannelID(chDesc.ID)
  68. chPriorities[chID] = uint(chDesc.Priority)
  69. buffer[chID] = make([]wrappedEnvelope, 0)
  70. quanta[chID] = chDesc.MaxSendBytes * uint(chDesc.Priority)
  71. }
  72. return &wdrrScheduler{
  73. logger: logger.With("queue", "wdrr"),
  74. metrics: m,
  75. capacity: capacity,
  76. chPriorities: chPriorities,
  77. chDescs: chDescsCopy,
  78. buffer: buffer,
  79. quanta: quanta,
  80. deficits: deficits,
  81. closer: tmsync.NewCloser(),
  82. doneCh: tmsync.NewCloser(),
  83. enqueueCh: make(chan Envelope, enqueueBuf),
  84. dequeueCh: make(chan Envelope, dequeueBuf),
  85. }
  86. }
  87. // enqueue returns an unbuffered write-only channel which a producer can send on.
  88. func (s *wdrrScheduler) enqueue() chan<- Envelope {
  89. return s.enqueueCh
  90. }
  91. // dequeue returns an unbuffered read-only channel which a consumer can read from.
  92. func (s *wdrrScheduler) dequeue() <-chan Envelope {
  93. return s.dequeueCh
  94. }
  95. func (s *wdrrScheduler) closed() <-chan struct{} {
  96. return s.closer.Done()
  97. }
  98. // close closes the WDRR queue. After this call enqueue() will block, so the
  99. // caller must select on closed() as well to avoid blocking forever. The
  100. // enqueue() and dequeue() along with the internal channels will NOT be closed.
  101. // Note, close() will block until all externally spawned goroutines have exited.
  102. func (s *wdrrScheduler) close() {
  103. s.closer.Close()
  104. <-s.doneCh.Done()
  105. }
  106. // start starts the WDRR queue process in a blocking goroutine. This must be
  107. // called before the queue can start to process and accept Envelopes.
  108. func (s *wdrrScheduler) start() {
  109. go s.process()
  110. }
  111. // process starts a blocking WDRR scheduler process, where we continuously
  112. // evaluate if we need to attempt to enqueue an Envelope or schedule Envelopes
  113. // to be dequeued and subsequently read and sent on the source connection.
  114. // Internally, each p2p Channel maps to a flow, where each flow has a deficit
  115. // and a quantum.
  116. //
  117. // For each Envelope requested to be enqueued, we evaluate if there is sufficient
  118. // capacity in the shared buffer to add the Envelope. If so, it is added.
  119. // Otherwise, we evaluate all flows of lower priority where we attempt find an
  120. // existing Envelope in the shared buffer of sufficient size that can be dropped
  121. // in place of the incoming Envelope. If there is no such Envelope that can be
  122. // dropped, then the incoming Envelope is dropped.
  123. //
  124. // When there is nothing to be enqueued, we perform the WDRR algorithm and
  125. // determine which Envelopes can be dequeued. For each Envelope that can be
  126. // dequeued, it is sent on the dequeueCh. Specifically, for each flow, if it is
  127. // non-empty, its deficit counter is incremented by its quantum value. Then, the
  128. // value of the deficit counter is a maximal amount of bytes that can be sent at
  129. // this round. If the deficit counter is greater than the Envelopes's message
  130. // size at the head of the queue (HoQ), this envelope can be sent and the value
  131. // of the counter is decremented by the message's size. Then, the size of the
  132. // next Envelopes's message is compared to the counter value, etc. Once the flow
  133. // is empty or the value of the counter is insufficient, the scheduler will skip
  134. // to the next flow. If the flow is empty, the value of the deficit counter is
  135. // reset to 0.
  136. //
  137. // XXX/TODO: Evaluate the single goroutine scheduler mechanism. In other words,
  138. // evaluate the effectiveness and performance of having a single goroutine
  139. // perform handling both enqueueing and dequeueing logic. Specifically, there
  140. // is potentially contention between reading off of enqueueCh and trying to
  141. // enqueue while also attempting to perform the WDRR algorithm and find the next
  142. // set of Envelope(s) to send on the dequeueCh. Alternatively, we could consider
  143. // separate scheduling goroutines, but then that requires the use of mutexes and
  144. // possibly a degrading performance.
  145. func (s *wdrrScheduler) process() {
  146. defer s.doneCh.Close()
  147. for {
  148. select {
  149. case <-s.closer.Done():
  150. return
  151. case e := <-s.enqueueCh:
  152. // attempt to enqueue the incoming Envelope
  153. chIDStr := strconv.Itoa(int(e.channelID))
  154. wEnv := wrappedEnvelope{envelope: e, size: uint(proto.Size(e.Message))}
  155. msgSize := wEnv.size
  156. s.metrics.PeerPendingSendBytes.With("peer_id", string(e.To)).Add(float64(msgSize))
  157. // If we're at capacity, we need to either drop the incoming Envelope or
  158. // an Envelope from a lower priority flow. Otherwise, we add the (wrapped)
  159. // envelope to the flow's queue.
  160. if s.size+wEnv.size > s.capacity {
  161. chPriority := s.chPriorities[e.channelID]
  162. var (
  163. canDrop bool
  164. dropIdx int
  165. dropChID ChannelID
  166. )
  167. // Evaluate all lower priority flows and determine if there exists an
  168. // Envelope that is of equal or greater size that we can drop in favor
  169. // of the incoming Envelope.
  170. for i := len(s.chDescs) - 1; i >= 0 && uint(s.chDescs[i].Priority) < chPriority && !canDrop; i-- {
  171. currChID := ChannelID(s.chDescs[i].ID)
  172. flow := s.buffer[currChID]
  173. for j := 0; j < len(flow) && !canDrop; j++ {
  174. if flow[j].size >= wEnv.size {
  175. canDrop = true
  176. dropIdx = j
  177. dropChID = currChID
  178. break
  179. }
  180. }
  181. }
  182. // If we can drop an existing Envelope, drop it and enqueue the incoming
  183. // Envelope.
  184. if canDrop {
  185. chIDStr = strconv.Itoa(int(dropChID))
  186. chPriority = s.chPriorities[dropChID]
  187. msgSize = s.buffer[dropChID][dropIdx].size
  188. // Drop Envelope for the lower priority flow and update the queue's
  189. // buffer size
  190. s.size -= msgSize
  191. s.buffer[dropChID] = append(s.buffer[dropChID][:dropIdx], s.buffer[dropChID][dropIdx+1:]...)
  192. // add the incoming Envelope and update queue's buffer size
  193. s.size += wEnv.size
  194. s.buffer[e.channelID] = append(s.buffer[e.channelID], wEnv)
  195. s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Set(float64(wEnv.size))
  196. }
  197. // We either dropped the incoming Enevelope or one from an existing
  198. // lower priority flow.
  199. s.metrics.PeerQueueDroppedMsgs.With("ch_id", chIDStr).Add(1)
  200. s.logger.Debug(
  201. "dropped envelope",
  202. "ch_id", chIDStr,
  203. "priority", chPriority,
  204. "capacity", s.capacity,
  205. "msg_size", msgSize,
  206. )
  207. } else {
  208. // we have sufficient capacity to enqueue the incoming Envelope
  209. s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Set(float64(wEnv.size))
  210. s.buffer[e.channelID] = append(s.buffer[e.channelID], wEnv)
  211. s.size += wEnv.size
  212. }
  213. default:
  214. // perform the WDRR algorithm
  215. for _, chDesc := range s.chDescs {
  216. chID := ChannelID(chDesc.ID)
  217. // only consider non-empty flows
  218. if len(s.buffer[chID]) > 0 {
  219. // bump flow's quantum
  220. s.deficits[chID] += s.quanta[chID]
  221. // grab the flow's current deficit counter and HoQ (wrapped) Envelope
  222. d := s.deficits[chID]
  223. we := s.buffer[chID][0]
  224. // While the flow is non-empty and we can send the current Envelope
  225. // on the dequeueCh:
  226. //
  227. // 1. send the Envelope
  228. // 2. update the scheduler's shared buffer's size
  229. // 3. update the flow's deficit
  230. // 4. remove from the flow's queue
  231. // 5. grab the next HoQ Envelope and flow's deficit
  232. for len(s.buffer[chID]) > 0 && d >= we.size {
  233. s.metrics.PeerSendBytesTotal.With(
  234. "chID", fmt.Sprint(chID),
  235. "peer_id", string(we.envelope.To)).Add(float64(we.size))
  236. s.dequeueCh <- we.envelope
  237. s.size -= we.size
  238. s.deficits[chID] -= we.size
  239. s.buffer[chID] = s.buffer[chID][1:]
  240. if len(s.buffer[chID]) > 0 {
  241. d = s.deficits[chID]
  242. we = s.buffer[chID][0]
  243. }
  244. }
  245. }
  246. // reset the flow's deficit to zero if it is empty
  247. if len(s.buffer[chID]) == 0 {
  248. s.deficits[chID] = 0
  249. }
  250. }
  251. }
  252. }
  253. }