You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

277 lines
8.3 KiB

  1. //
  2. // Written by Maxim Khitrov (November 2012)
  3. //
  4. // Package flowrate provides the tools for monitoring and limiting the flow rate
  5. // of an arbitrary data stream.
  6. package flowrate
  7. import (
  8. "math"
  9. "sync"
  10. "time"
  11. )
  12. // Monitor monitors and limits the transfer rate of a data stream.
  13. type Monitor struct {
  14. mu sync.Mutex // Mutex guarding access to all internal fields
  15. active bool // Flag indicating an active transfer
  16. start time.Duration // Transfer start time (clock() value)
  17. pStartAt time.Time // time of process start
  18. bytes int64 // Total number of bytes transferred
  19. samples int64 // Total number of samples taken
  20. rSample float64 // Most recent transfer rate sample (bytes per second)
  21. rEMA float64 // Exponential moving average of rSample
  22. rPeak float64 // Peak transfer rate (max of all rSamples)
  23. rWindow float64 // rEMA window (seconds)
  24. sBytes int64 // Number of bytes transferred since sLast
  25. sLast time.Duration // Most recent sample time (stop time when inactive)
  26. sRate time.Duration // Sampling rate
  27. tBytes int64 // Number of bytes expected in the current transfer
  28. tLast time.Duration // Time of the most recent transfer of at least 1 byte
  29. }
  30. // New creates a new flow control monitor. Instantaneous transfer rate is
  31. // measured and updated for each sampleRate interval. windowSize determines the
  32. // weight of each sample in the exponential moving average (EMA) calculation.
  33. // The exact formulas are:
  34. //
  35. // sampleTime = currentTime - prevSampleTime
  36. // sampleRate = byteCount / sampleTime
  37. // weight = 1 - exp(-sampleTime/windowSize)
  38. // newRate = weight*sampleRate + (1-weight)*oldRate
  39. //
  40. // The default values for sampleRate and windowSize (if <= 0) are 100ms and 1s,
  41. // respectively.
  42. func New(startAt time.Time, sampleRate, windowSize time.Duration) *Monitor {
  43. if sampleRate = clockRound(sampleRate); sampleRate <= 0 {
  44. sampleRate = 5 * clockRate
  45. }
  46. if windowSize <= 0 {
  47. windowSize = 1 * time.Second
  48. }
  49. now := clock(startAt)
  50. return &Monitor{
  51. active: true,
  52. start: now,
  53. rWindow: windowSize.Seconds(),
  54. sLast: now,
  55. sRate: sampleRate,
  56. tLast: now,
  57. pStartAt: startAt,
  58. }
  59. }
  60. // Update records the transfer of n bytes and returns n. It should be called
  61. // after each Read/Write operation, even if n is 0.
  62. func (m *Monitor) Update(n int) int {
  63. m.mu.Lock()
  64. m.update(n)
  65. m.mu.Unlock()
  66. return n
  67. }
  68. // Hack to set the current rEMA.
  69. func (m *Monitor) SetREMA(rEMA float64) {
  70. m.mu.Lock()
  71. m.rEMA = rEMA
  72. m.samples++
  73. m.mu.Unlock()
  74. }
  75. // IO is a convenience method intended to wrap io.Reader and io.Writer method
  76. // execution. It calls m.Update(n) and then returns (n, err) unmodified.
  77. func (m *Monitor) IO(n int, err error) (int, error) {
  78. return m.Update(n), err
  79. }
  80. // Done marks the transfer as finished and prevents any further updates or
  81. // limiting. Instantaneous and current transfer rates drop to 0. Update, IO, and
  82. // Limit methods become NOOPs. It returns the total number of bytes transferred.
  83. func (m *Monitor) Done() int64 {
  84. m.mu.Lock()
  85. if now := m.update(0); m.sBytes > 0 {
  86. m.reset(now)
  87. }
  88. m.active = false
  89. m.tLast = 0
  90. n := m.bytes
  91. m.mu.Unlock()
  92. return n
  93. }
  94. // timeRemLimit is the maximum Status.TimeRem value.
  95. const timeRemLimit = 999*time.Hour + 59*time.Minute + 59*time.Second
  96. // Status represents the current Monitor status. All transfer rates are in bytes
  97. // per second rounded to the nearest byte.
  98. type Status struct {
  99. Start time.Time // Transfer start time
  100. Bytes int64 // Total number of bytes transferred
  101. Samples int64 // Total number of samples taken
  102. InstRate int64 // Instantaneous transfer rate
  103. CurRate int64 // Current transfer rate (EMA of InstRate)
  104. AvgRate int64 // Average transfer rate (Bytes / Duration)
  105. PeakRate int64 // Maximum instantaneous transfer rate
  106. BytesRem int64 // Number of bytes remaining in the transfer
  107. Duration time.Duration // Time period covered by the statistics
  108. Idle time.Duration // Time since the last transfer of at least 1 byte
  109. TimeRem time.Duration // Estimated time to completion
  110. Progress Percent // Overall transfer progress
  111. Active bool // Flag indicating an active transfer
  112. }
  113. // Status returns current transfer status information. The returned value
  114. // becomes static after a call to Done.
  115. func (m *Monitor) Status() Status {
  116. m.mu.Lock()
  117. now := m.update(0)
  118. s := Status{
  119. Active: m.active,
  120. Start: m.pStartAt.Add(m.start),
  121. Duration: m.sLast - m.start,
  122. Idle: now - m.tLast,
  123. Bytes: m.bytes,
  124. Samples: m.samples,
  125. PeakRate: round(m.rPeak),
  126. BytesRem: m.tBytes - m.bytes,
  127. Progress: percentOf(float64(m.bytes), float64(m.tBytes)),
  128. }
  129. if s.BytesRem < 0 {
  130. s.BytesRem = 0
  131. }
  132. if s.Duration > 0 {
  133. rAvg := float64(s.Bytes) / s.Duration.Seconds()
  134. s.AvgRate = round(rAvg)
  135. if s.Active {
  136. s.InstRate = round(m.rSample)
  137. s.CurRate = round(m.rEMA)
  138. if s.BytesRem > 0 {
  139. if tRate := 0.8*m.rEMA + 0.2*rAvg; tRate > 0 {
  140. ns := float64(s.BytesRem) / tRate * 1e9
  141. if ns > float64(timeRemLimit) {
  142. ns = float64(timeRemLimit)
  143. }
  144. s.TimeRem = clockRound(time.Duration(ns))
  145. }
  146. }
  147. }
  148. }
  149. m.mu.Unlock()
  150. return s
  151. }
  152. // Limit restricts the instantaneous (per-sample) data flow to rate bytes per
  153. // second. It returns the maximum number of bytes (0 <= n <= want) that may be
  154. // transferred immediately without exceeding the limit. If block == true, the
  155. // call blocks until n > 0. want is returned unmodified if want < 1, rate < 1,
  156. // or the transfer is inactive (after a call to Done).
  157. //
  158. // At least one byte is always allowed to be transferred in any given sampling
  159. // period. Thus, if the sampling rate is 100ms, the lowest achievable flow rate
  160. // is 10 bytes per second.
  161. //
  162. // For usage examples, see the implementation of Reader and Writer in io.go.
  163. func (m *Monitor) Limit(want int, rate int64, block bool) (n int) {
  164. if want < 1 || rate < 1 {
  165. return want
  166. }
  167. m.mu.Lock()
  168. // Determine the maximum number of bytes that can be sent in one sample
  169. limit := round(float64(rate) * m.sRate.Seconds())
  170. if limit <= 0 {
  171. limit = 1
  172. }
  173. // If block == true, wait until m.sBytes < limit
  174. if now := m.update(0); block {
  175. for m.sBytes >= limit && m.active {
  176. now = m.waitNextSample(now)
  177. }
  178. }
  179. // Make limit <= want (unlimited if the transfer is no longer active)
  180. if limit -= m.sBytes; limit > int64(want) || !m.active {
  181. limit = int64(want)
  182. }
  183. m.mu.Unlock()
  184. if limit < 0 {
  185. limit = 0
  186. }
  187. return int(limit)
  188. }
  189. // SetTransferSize specifies the total size of the data transfer, which allows
  190. // the Monitor to calculate the overall progress and time to completion.
  191. func (m *Monitor) SetTransferSize(bytes int64) {
  192. if bytes < 0 {
  193. bytes = 0
  194. }
  195. m.mu.Lock()
  196. m.tBytes = bytes
  197. m.mu.Unlock()
  198. }
  199. // update accumulates the transferred byte count for the current sample until
  200. // clock() - m.sLast >= m.sRate. The monitor status is updated once the current
  201. // sample is done.
  202. func (m *Monitor) update(n int) (now time.Duration) {
  203. if !m.active {
  204. return
  205. }
  206. if now = clock(m.pStartAt); n > 0 {
  207. m.tLast = now
  208. }
  209. m.sBytes += int64(n)
  210. if sTime := now - m.sLast; sTime >= m.sRate {
  211. t := sTime.Seconds()
  212. if m.rSample = float64(m.sBytes) / t; m.rSample > m.rPeak {
  213. m.rPeak = m.rSample
  214. }
  215. // Exponential moving average using a method similar to *nix load
  216. // average calculation. Longer sampling periods carry greater weight.
  217. if m.samples > 0 {
  218. w := math.Exp(-t / m.rWindow)
  219. m.rEMA = m.rSample + w*(m.rEMA-m.rSample)
  220. } else {
  221. m.rEMA = m.rSample
  222. }
  223. m.reset(now)
  224. }
  225. return
  226. }
  227. // reset clears the current sample state in preparation for the next sample.
  228. func (m *Monitor) reset(sampleTime time.Duration) {
  229. m.bytes += m.sBytes
  230. m.samples++
  231. m.sBytes = 0
  232. m.sLast = sampleTime
  233. }
  234. // waitNextSample sleeps for the remainder of the current sample. The lock is
  235. // released and reacquired during the actual sleep period, so it's possible for
  236. // the transfer to be inactive when this method returns.
  237. func (m *Monitor) waitNextSample(now time.Duration) time.Duration {
  238. const minWait = 5 * time.Millisecond
  239. current := m.sLast
  240. // sleep until the last sample time changes (ideally, just one iteration)
  241. for m.sLast == current && m.active {
  242. d := current + m.sRate - now
  243. m.mu.Unlock()
  244. if d < minWait {
  245. d = minWait
  246. }
  247. time.Sleep(d)
  248. m.mu.Lock()
  249. now = m.update(0)
  250. }
  251. return now
  252. }