You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

275 lines
8.1 KiB

  1. //
  2. // Written by Maxim Khitrov (November 2012)
  3. //
  4. // Package flowrate provides the tools for monitoring and limiting the flow rate
  5. // of an arbitrary data stream.
  6. package flowrate
  7. import (
  8. "math"
  9. "sync"
  10. "time"
  11. )
  // Monitor tracks the transfer rate of a data stream and can be used to limit
  // it. All fields are protected by mu.
  type Monitor struct {
  	mu sync.Mutex // guards every field below

  	// Transfer lifetime.
  	active bool          // true while the transfer is in progress
  	start  time.Duration // clock() value at which the transfer started

  	// Running totals (bytes is advanced each time a sample is closed).
  	bytes   int64 // total number of bytes transferred
  	samples int64 // total number of samples taken

  	// Transfer rates, in bytes per second unless noted otherwise.
  	rSample float64 // rate measured by the most recent sample
  	rEMA    float64 // exponential moving average of rSample
  	rPeak   float64 // largest rSample observed so far
  	rWindow float64 // averaging window used for rEMA, in seconds

  	// Sample currently being accumulated.
  	sBytes int64         // bytes transferred since sLast
  	sLast  time.Duration // time of the latest sample (stop time once inactive)
  	sRate  time.Duration // length of one sampling interval

  	// Whole-transfer bookkeeping.
  	tBytes int64         // number of bytes expected in the current transfer
  	tLast  time.Duration // time of the latest transfer of at least 1 byte
  }
  29. // New creates a new flow control monitor. Instantaneous transfer rate is
  30. // measured and updated for each sampleRate interval. windowSize determines the
  31. // weight of each sample in the exponential moving average (EMA) calculation.
  32. // The exact formulas are:
  33. //
  34. // sampleTime = currentTime - prevSampleTime
  35. // sampleRate = byteCount / sampleTime
  36. // weight = 1 - exp(-sampleTime/windowSize)
  37. // newRate = weight*sampleRate + (1-weight)*oldRate
  38. //
  39. // The default values for sampleRate and windowSize (if <= 0) are 100ms and 1s,
  40. // respectively.
  41. func New(sampleRate, windowSize time.Duration) *Monitor {
  42. if sampleRate = clockRound(sampleRate); sampleRate <= 0 {
  43. sampleRate = 5 * clockRate
  44. }
  45. if windowSize <= 0 {
  46. windowSize = 1 * time.Second
  47. }
  48. now := clock()
  49. return &Monitor{
  50. active: true,
  51. start: now,
  52. rWindow: windowSize.Seconds(),
  53. sLast: now,
  54. sRate: sampleRate,
  55. tLast: now,
  56. }
  57. }
  58. // Update records the transfer of n bytes and returns n. It should be called
  59. // after each Read/Write operation, even if n is 0.
  60. func (m *Monitor) Update(n int) int {
  61. m.mu.Lock()
  62. m.update(n)
  63. m.mu.Unlock()
  64. return n
  65. }
  66. // Hack to set the current rEMA.
  67. func (m *Monitor) SetREMA(rEMA float64) {
  68. m.mu.Lock()
  69. m.rEMA = rEMA
  70. m.samples++
  71. m.mu.Unlock()
  72. }
  73. // IO is a convenience method intended to wrap io.Reader and io.Writer method
  74. // execution. It calls m.Update(n) and then returns (n, err) unmodified.
  75. func (m *Monitor) IO(n int, err error) (int, error) {
  76. return m.Update(n), err
  77. }
  78. // Done marks the transfer as finished and prevents any further updates or
  79. // limiting. Instantaneous and current transfer rates drop to 0. Update, IO, and
  80. // Limit methods become NOOPs. It returns the total number of bytes transferred.
  81. func (m *Monitor) Done() int64 {
  82. m.mu.Lock()
  83. if now := m.update(0); m.sBytes > 0 {
  84. m.reset(now)
  85. }
  86. m.active = false
  87. m.tLast = 0
  88. n := m.bytes
  89. m.mu.Unlock()
  90. return n
  91. }
  // timeRemLimit is the largest value Status.TimeRem may take: 999h59m59s.
  const timeRemLimit = (999*60*60 + 59*60 + 59) * time.Second
  94. // Status represents the current Monitor status. All transfer rates are in bytes
  95. // per second rounded to the nearest byte.
  96. type Status struct {
  97. Active bool // Flag indicating an active transfer
  98. Start time.Time // Transfer start time
  99. Duration time.Duration // Time period covered by the statistics
  100. Idle time.Duration // Time since the last transfer of at least 1 byte
  101. Bytes int64 // Total number of bytes transferred
  102. Samples int64 // Total number of samples taken
  103. InstRate int64 // Instantaneous transfer rate
  104. CurRate int64 // Current transfer rate (EMA of InstRate)
  105. AvgRate int64 // Average transfer rate (Bytes / Duration)
  106. PeakRate int64 // Maximum instantaneous transfer rate
  107. BytesRem int64 // Number of bytes remaining in the transfer
  108. TimeRem time.Duration // Estimated time to completion
  109. Progress Percent // Overall transfer progress
  110. }
  111. // Status returns current transfer status information. The returned value
  112. // becomes static after a call to Done.
  113. func (m *Monitor) Status() Status {
  114. m.mu.Lock()
  115. now := m.update(0)
  116. s := Status{
  117. Active: m.active,
  118. Start: clockToTime(m.start),
  119. Duration: m.sLast - m.start,
  120. Idle: now - m.tLast,
  121. Bytes: m.bytes,
  122. Samples: m.samples,
  123. PeakRate: round(m.rPeak),
  124. BytesRem: m.tBytes - m.bytes,
  125. Progress: percentOf(float64(m.bytes), float64(m.tBytes)),
  126. }
  127. if s.BytesRem < 0 {
  128. s.BytesRem = 0
  129. }
  130. if s.Duration > 0 {
  131. rAvg := float64(s.Bytes) / s.Duration.Seconds()
  132. s.AvgRate = round(rAvg)
  133. if s.Active {
  134. s.InstRate = round(m.rSample)
  135. s.CurRate = round(m.rEMA)
  136. if s.BytesRem > 0 {
  137. if tRate := 0.8*m.rEMA + 0.2*rAvg; tRate > 0 {
  138. ns := float64(s.BytesRem) / tRate * 1e9
  139. if ns > float64(timeRemLimit) {
  140. ns = float64(timeRemLimit)
  141. }
  142. s.TimeRem = clockRound(time.Duration(ns))
  143. }
  144. }
  145. }
  146. }
  147. m.mu.Unlock()
  148. return s
  149. }
  150. // Limit restricts the instantaneous (per-sample) data flow to rate bytes per
  151. // second. It returns the maximum number of bytes (0 <= n <= want) that may be
  152. // transferred immediately without exceeding the limit. If block == true, the
  153. // call blocks until n > 0. want is returned unmodified if want < 1, rate < 1,
  154. // or the transfer is inactive (after a call to Done).
  155. //
  156. // At least one byte is always allowed to be transferred in any given sampling
  157. // period. Thus, if the sampling rate is 100ms, the lowest achievable flow rate
  158. // is 10 bytes per second.
  159. //
  160. // For usage examples, see the implementation of Reader and Writer in io.go.
  161. func (m *Monitor) Limit(want int, rate int64, block bool) (n int) {
  162. if want < 1 || rate < 1 {
  163. return want
  164. }
  165. m.mu.Lock()
  166. // Determine the maximum number of bytes that can be sent in one sample
  167. limit := round(float64(rate) * m.sRate.Seconds())
  168. if limit <= 0 {
  169. limit = 1
  170. }
  171. // If block == true, wait until m.sBytes < limit
  172. if now := m.update(0); block {
  173. for m.sBytes >= limit && m.active {
  174. now = m.waitNextSample(now)
  175. }
  176. }
  177. // Make limit <= want (unlimited if the transfer is no longer active)
  178. if limit -= m.sBytes; limit > int64(want) || !m.active {
  179. limit = int64(want)
  180. }
  181. m.mu.Unlock()
  182. if limit < 0 {
  183. limit = 0
  184. }
  185. return int(limit)
  186. }
  187. // SetTransferSize specifies the total size of the data transfer, which allows
  188. // the Monitor to calculate the overall progress and time to completion.
  189. func (m *Monitor) SetTransferSize(bytes int64) {
  190. if bytes < 0 {
  191. bytes = 0
  192. }
  193. m.mu.Lock()
  194. m.tBytes = bytes
  195. m.mu.Unlock()
  196. }
  197. // update accumulates the transferred byte count for the current sample until
  198. // clock() - m.sLast >= m.sRate. The monitor status is updated once the current
  199. // sample is done.
  200. func (m *Monitor) update(n int) (now time.Duration) {
  201. if !m.active {
  202. return
  203. }
  204. if now = clock(); n > 0 {
  205. m.tLast = now
  206. }
  207. m.sBytes += int64(n)
  208. if sTime := now - m.sLast; sTime >= m.sRate {
  209. t := sTime.Seconds()
  210. if m.rSample = float64(m.sBytes) / t; m.rSample > m.rPeak {
  211. m.rPeak = m.rSample
  212. }
  213. // Exponential moving average using a method similar to *nix load
  214. // average calculation. Longer sampling periods carry greater weight.
  215. if m.samples > 0 {
  216. w := math.Exp(-t / m.rWindow)
  217. m.rEMA = m.rSample + w*(m.rEMA-m.rSample)
  218. } else {
  219. m.rEMA = m.rSample
  220. }
  221. m.reset(now)
  222. }
  223. return
  224. }
  225. // reset clears the current sample state in preparation for the next sample.
  226. func (m *Monitor) reset(sampleTime time.Duration) {
  227. m.bytes += m.sBytes
  228. m.samples++
  229. m.sBytes = 0
  230. m.sLast = sampleTime
  231. }
  232. // waitNextSample sleeps for the remainder of the current sample. The lock is
  233. // released and reacquired during the actual sleep period, so it's possible for
  234. // the transfer to be inactive when this method returns.
  235. func (m *Monitor) waitNextSample(now time.Duration) time.Duration {
  236. const minWait = 5 * time.Millisecond
  237. current := m.sLast
  238. // sleep until the last sample time changes (ideally, just one iteration)
  239. for m.sLast == current && m.active {
  240. d := current + m.sRate - now
  241. m.mu.Unlock()
  242. if d < minWait {
  243. d = minWait
  244. }
  245. time.Sleep(d)
  246. m.mu.Lock()
  247. now = m.update(0)
  248. }
  249. return now
  250. }