@ -0,0 +1,267 @@ | |||||
// | |||||
// Written by Maxim Khitrov (November 2012) | |||||
// | |||||
// Package flowcontrol provides the tools for monitoring and limiting the | |||||
// transfer rate of an arbitrary data stream. | |||||
package flowcontrol | |||||
import ( | |||||
"math" | |||||
"sync" | |||||
"time" | |||||
) | |||||
// Monitor monitors and limits the transfer rate of a data stream. | |||||
type Monitor struct { | |||||
mu sync.Mutex // Mutex guarding access to all internal fields | |||||
active bool // Flag indicating an active transfer | |||||
start time.Duration // Transfer start time (clock() value) | |||||
bytes int64 // Total number of bytes transferred | |||||
samples int64 // Total number of samples taken | |||||
rSample float64 // Most recent transfer rate sample (bytes per second) | |||||
rEMA float64 // Exponential moving average of rSample | |||||
rPeak float64 // Peak transfer rate (max of all rSamples) | |||||
rWindow float64 // rEMA window (seconds) | |||||
sBytes int64 // Number of bytes transferred since sLast | |||||
sLast time.Duration // Most recent sample time (stop time when inactive) | |||||
sRate time.Duration // Sampling rate | |||||
tBytes int64 // Number of bytes expected in the current transfer | |||||
tLast time.Duration // Time of the most recent transfer of at least 1 byte | |||||
} | |||||
// New creates a new flow control monitor. Instantaneous transfer rate is | |||||
// measured and updated for each sampleRate interval. windowSize determines the | |||||
// weight of each sample in the exponential moving average (EMA) calculation. | |||||
// The exact formulas are: | |||||
// | |||||
// sampleTime = currentTime - prevSampleTime | |||||
// sampleRate = byteCount / sampleTime | |||||
// weight = 1 - exp(-sampleTime/windowSize) | |||||
// newRate = weight*sampleRate + (1-weight)*oldRate | |||||
// | |||||
// The default values for sampleRate and windowSize (if <= 0) are 100ms and 1s, | |||||
// respectively. | |||||
func New(sampleRate, windowSize time.Duration) *Monitor { | |||||
if sampleRate = clockRound(sampleRate); sampleRate <= 0 { | |||||
sampleRate = 5 * clockRate | |||||
} | |||||
if windowSize <= 0 { | |||||
windowSize = 1 * time.Second | |||||
} | |||||
now := clock() | |||||
return &Monitor{ | |||||
active: true, | |||||
start: now, | |||||
rWindow: windowSize.Seconds(), | |||||
sLast: now, | |||||
sRate: sampleRate, | |||||
tLast: now, | |||||
} | |||||
} | |||||
// Update records the transfer of n bytes and returns n. It should be called | |||||
// after each Read/Write operation, even if n is 0. | |||||
func (m *Monitor) Update(n int) int { | |||||
m.mu.Lock() | |||||
m.update(n) | |||||
m.mu.Unlock() | |||||
return n | |||||
} | |||||
// IO is a convenience method intended to wrap io.Reader and io.Writer method | |||||
// execution. It calls m.Update(n) and then returns (n, err) unmodified. | |||||
func (m *Monitor) IO(n int, err error) (int, error) { | |||||
return m.Update(n), err | |||||
} | |||||
// Done marks the transfer as finished and prevents any further updates or | |||||
// limiting. Instantaneous and current transfer rates drop to 0. Update, IO, and | |||||
// Limit methods become NOOPs. It returns the total number of bytes transferred. | |||||
func (m *Monitor) Done() int64 { | |||||
m.mu.Lock() | |||||
if now := m.update(0); m.sBytes > 0 { | |||||
m.reset(now) | |||||
} | |||||
m.active = false | |||||
m.tLast = 0 | |||||
n := m.bytes | |||||
m.mu.Unlock() | |||||
return n | |||||
} | |||||
// timeRemLimit is the maximum Status.TimeRem value. | |||||
const timeRemLimit = 999*time.Hour + 59*time.Minute + 59*time.Second | |||||
// Status represents the current Monitor status. All transfer rates are in bytes | |||||
// per second rounded to the nearest byte. | |||||
type Status struct { | |||||
Active bool // Flag indicating an active transfer | |||||
Start time.Time // Transfer start time | |||||
Duration time.Duration // Time period covered by the statistics | |||||
Idle time.Duration // Time since the last transfer of at least 1 byte | |||||
Bytes int64 // Total number of bytes transferred | |||||
Samples int64 // Total number of samples taken | |||||
InstRate int64 // Instantaneous transfer rate | |||||
CurRate int64 // Current transfer rate (EMA of InstRate) | |||||
AvgRate int64 // Average transfer rate (Bytes / Duration) | |||||
PeakRate int64 // Maximum instantaneous transfer rate | |||||
BytesRem int64 // Number of bytes remaining in the transfer | |||||
TimeRem time.Duration // Estimated time to completion | |||||
Progress Percent // Overall transfer progress | |||||
} | |||||
// Status returns current transfer status information. The returned value | |||||
// becomes static after a call to Done. | |||||
func (m *Monitor) Status() Status { | |||||
m.mu.Lock() | |||||
now := m.update(0) | |||||
s := Status{ | |||||
Active: m.active, | |||||
Start: clockToTime(m.start), | |||||
Duration: m.sLast - m.start, | |||||
Idle: now - m.tLast, | |||||
Bytes: m.bytes, | |||||
Samples: m.samples, | |||||
PeakRate: round(m.rPeak), | |||||
BytesRem: m.tBytes - m.bytes, | |||||
Progress: percentOf(float64(m.bytes), float64(m.tBytes)), | |||||
} | |||||
if s.BytesRem < 0 { | |||||
s.BytesRem = 0 | |||||
} | |||||
if s.Duration > 0 { | |||||
rAvg := float64(s.Bytes) / s.Duration.Seconds() | |||||
s.AvgRate = round(rAvg) | |||||
if s.Active { | |||||
s.InstRate = round(m.rSample) | |||||
s.CurRate = round(m.rEMA) | |||||
if s.BytesRem > 0 { | |||||
if tRate := 0.8*m.rEMA + 0.2*rAvg; tRate > 0 { | |||||
ns := float64(s.BytesRem) / tRate * 1e9 | |||||
if ns > float64(timeRemLimit) { | |||||
ns = float64(timeRemLimit) | |||||
} | |||||
s.TimeRem = clockRound(time.Duration(ns)) | |||||
} | |||||
} | |||||
} | |||||
} | |||||
m.mu.Unlock() | |||||
return s | |||||
} | |||||
// Limit restricts the instantaneous (per-sample) data flow to rate bytes per | |||||
// second. It returns the maximum number of bytes (0 <= n <= want) that may be | |||||
// transferred immediately without exceeding the limit. If block == true, the | |||||
// call blocks until n > 0. want is returned unmodified if want < 1, rate < 1, | |||||
// or the transfer is inactive (after a call to Done). | |||||
// | |||||
// At least one byte is always allowed to be transferred in any given sampling | |||||
// period. Thus, if the sampling rate is 100ms, the lowest achievable flow rate | |||||
// is 10 bytes per second. | |||||
// | |||||
// For usage examples, see the implementation of Reader and Writer in io.go. | |||||
func (m *Monitor) Limit(want int, rate int64, block bool) (n int) { | |||||
if want < 1 || rate < 1 { | |||||
return want | |||||
} | |||||
m.mu.Lock() | |||||
// Determine the maximum number of bytes that can be sent in one sample | |||||
limit := round(float64(rate) * m.sRate.Seconds()) | |||||
if limit <= 0 { | |||||
limit = 1 | |||||
} | |||||
// If block == true, wait until m.sBytes < limit | |||||
if now := m.update(0); block { | |||||
for m.sBytes >= limit && m.active { | |||||
now = m.waitNextSample(now) | |||||
} | |||||
} | |||||
// Make limit <= want (unlimited if the transfer is no longer active) | |||||
if limit -= m.sBytes; limit > int64(want) || !m.active { | |||||
limit = int64(want) | |||||
} | |||||
m.mu.Unlock() | |||||
if limit < 0 { | |||||
limit = 0 | |||||
} | |||||
return int(limit) | |||||
} | |||||
// SetTransferSize specifies the total size of the data transfer, which allows | |||||
// the Monitor to calculate the overall progress and time to completion. | |||||
func (m *Monitor) SetTransferSize(bytes int64) { | |||||
if bytes < 0 { | |||||
bytes = 0 | |||||
} | |||||
m.mu.Lock() | |||||
m.tBytes = bytes | |||||
m.mu.Unlock() | |||||
} | |||||
// update accumulates the transferred byte count for the current sample until | |||||
// clock() - m.sLast >= m.sRate. The monitor status is updated once the current | |||||
// sample is done. | |||||
func (m *Monitor) update(n int) (now time.Duration) { | |||||
if !m.active { | |||||
return | |||||
} | |||||
if now = clock(); n > 0 { | |||||
m.tLast = now | |||||
} | |||||
m.sBytes += int64(n) | |||||
if sTime := now - m.sLast; sTime >= m.sRate { | |||||
t := sTime.Seconds() | |||||
if m.rSample = float64(m.sBytes) / t; m.rSample > m.rPeak { | |||||
m.rPeak = m.rSample | |||||
} | |||||
// Exponential moving average using a method similar to *nix load | |||||
// average calculation. Longer sampling periods carry greater weight. | |||||
if m.samples > 0 { | |||||
w := math.Exp(-t / m.rWindow) | |||||
m.rEMA = m.rSample + w*(m.rEMA-m.rSample) | |||||
} else { | |||||
m.rEMA = m.rSample | |||||
} | |||||
m.reset(now) | |||||
} | |||||
return | |||||
} | |||||
// reset clears the current sample state in preparation for the next sample. | |||||
func (m *Monitor) reset(sampleTime time.Duration) { | |||||
m.bytes += m.sBytes | |||||
m.samples++ | |||||
m.sBytes = 0 | |||||
m.sLast = sampleTime | |||||
} | |||||
// waitNextSample sleeps for the remainder of the current sample. The lock is | |||||
// released and reacquired during the actual sleep period, so it's possible for | |||||
// the transfer to be inactive when this method returns. | |||||
func (m *Monitor) waitNextSample(now time.Duration) time.Duration { | |||||
const minWait = 5 * time.Millisecond | |||||
current := m.sLast | |||||
// sleep until the last sample time changes (ideally, just one iteration) | |||||
for m.sLast == current && m.active { | |||||
d := current + m.sRate - now | |||||
m.mu.Unlock() | |||||
if d < minWait { | |||||
d = minWait | |||||
} | |||||
time.Sleep(d) | |||||
m.mu.Lock() | |||||
now = m.update(0) | |||||
} | |||||
return now | |||||
} |
@ -0,0 +1,133 @@ | |||||
// | |||||
// Written by Maxim Khitrov (November 2012) | |||||
// | |||||
package flowcontrol | |||||
import ( | |||||
"errors" | |||||
"io" | |||||
) | |||||
// ErrLimit is returned by the Writer when a non-blocking write is short due to | |||||
// the transfer rate limit. | |||||
var ErrLimit = errors.New("flowcontrol: transfer rate limit exceeded") | |||||
// Limiter is implemented by the Reader and Writer to provide a consistent | |||||
// interface for monitoring and controlling data transfer. | |||||
type Limiter interface { | |||||
Done() int64 | |||||
Status() Status | |||||
SetTransferSize(bytes int64) | |||||
SetLimit(new int64) (old int64) | |||||
SetBlocking(new bool) (old bool) | |||||
} | |||||
// Reader implements io.ReadCloser with a restriction on the rate of data | |||||
// transfer. | |||||
type Reader struct { | |||||
io.Reader // Data source | |||||
*Monitor // Flow control monitor | |||||
limit int64 // Rate limit in bytes per second (unlimited when <= 0) | |||||
block bool // What to do when no new bytes can be read due to the limit | |||||
} | |||||
// NewReader restricts all Read operations on r to limit bytes per second. | |||||
func NewReader(r io.Reader, limit int64) *Reader { | |||||
return &Reader{r, New(0, 0), limit, true} | |||||
} | |||||
// Read reads up to len(p) bytes into p without exceeding the current transfer | |||||
// rate limit. It returns (0, nil) immediately if r is non-blocking and no new | |||||
// bytes can be read at this time. | |||||
func (r *Reader) Read(p []byte) (n int, err error) { | |||||
p = p[:r.Limit(len(p), r.limit, r.block)] | |||||
if len(p) > 0 { | |||||
n, err = r.IO(r.Reader.Read(p)) | |||||
} | |||||
return | |||||
} | |||||
// SetLimit changes the transfer rate limit to new bytes per second and returns | |||||
// the previous setting. | |||||
func (r *Reader) SetLimit(new int64) (old int64) { | |||||
old, r.limit = r.limit, new | |||||
return | |||||
} | |||||
// SetBlocking changes the blocking behavior and returns the previous setting. A | |||||
// Read call on a non-blocking reader returns immediately if no additional bytes | |||||
// may be read at this time due to the rate limit. | |||||
func (r *Reader) SetBlocking(new bool) (old bool) { | |||||
old, r.block = r.block, new | |||||
return | |||||
} | |||||
// Close closes the underlying reader if it implements the io.Closer interface. | |||||
func (r *Reader) Close() error { | |||||
defer r.Done() | |||||
if c, ok := r.Reader.(io.Closer); ok { | |||||
return c.Close() | |||||
} | |||||
return nil | |||||
} | |||||
// Writer implements io.WriteCloser with a restriction on the rate of data | |||||
// transfer. | |||||
type Writer struct { | |||||
io.Writer // Data destination | |||||
*Monitor // Flow control monitor | |||||
limit int64 // Rate limit in bytes per second (unlimited when <= 0) | |||||
block bool // What to do when no new bytes can be written due to the limit | |||||
} | |||||
// NewWriter restricts all Write operations on w to limit bytes per second. The | |||||
// transfer rate and the default blocking behavior (true) can be changed | |||||
// directly on the returned *Writer. | |||||
func NewWriter(w io.Writer, limit int64) *Writer { | |||||
return &Writer{w, New(0, 0), limit, true} | |||||
} | |||||
// Write writes len(p) bytes from p to the underlying data stream without | |||||
// exceeding the current transfer rate limit. It returns (n, ErrLimit) if w is | |||||
// non-blocking and no additional bytes can be written at this time. | |||||
func (w *Writer) Write(p []byte) (n int, err error) { | |||||
var c int | |||||
for len(p) > 0 && err == nil { | |||||
s := p[:w.Limit(len(p), w.limit, w.block)] | |||||
if len(s) > 0 { | |||||
c, err = w.IO(w.Writer.Write(s)) | |||||
} else { | |||||
return n, ErrLimit | |||||
} | |||||
p = p[c:] | |||||
n += c | |||||
} | |||||
return | |||||
} | |||||
// SetLimit changes the transfer rate limit to new bytes per second and returns | |||||
// the previous setting. | |||||
func (w *Writer) SetLimit(new int64) (old int64) { | |||||
old, w.limit = w.limit, new | |||||
return | |||||
} | |||||
// SetBlocking changes the blocking behavior and returns the previous setting. A | |||||
// Write call on a non-blocking writer returns as soon as no additional bytes | |||||
// may be written at this time due to the rate limit. | |||||
func (w *Writer) SetBlocking(new bool) (old bool) { | |||||
old, w.block = w.block, new | |||||
return | |||||
} | |||||
// Close closes the underlying writer if it implements the io.Closer interface. | |||||
func (w *Writer) Close() error { | |||||
defer w.Done() | |||||
if c, ok := w.Writer.(io.Closer); ok { | |||||
return c.Close() | |||||
} | |||||
return nil | |||||
} |
@ -0,0 +1,146 @@ | |||||
// | |||||
// Written by Maxim Khitrov (November 2012) | |||||
// | |||||
package flowcontrol | |||||
import ( | |||||
"bytes" | |||||
"reflect" | |||||
"testing" | |||||
"time" | |||||
) | |||||
const ( | |||||
_50ms = 50 * time.Millisecond | |||||
_100ms = 100 * time.Millisecond | |||||
_200ms = 200 * time.Millisecond | |||||
_300ms = 300 * time.Millisecond | |||||
_400ms = 400 * time.Millisecond | |||||
_500ms = 500 * time.Millisecond | |||||
) | |||||
func nextStatus(m *Monitor) Status { | |||||
samples := m.samples | |||||
for i := 0; i < 30; i++ { | |||||
if s := m.Status(); s.Samples != samples { | |||||
return s | |||||
} | |||||
time.Sleep(5 * time.Millisecond) | |||||
} | |||||
return m.Status() | |||||
} | |||||
func TestReader(t *testing.T) { | |||||
in := make([]byte, 100) | |||||
for i := range in { | |||||
in[i] = byte(i) | |||||
} | |||||
b := make([]byte, 100) | |||||
r := NewReader(bytes.NewReader(in), 100) | |||||
start := time.Now() | |||||
// Make sure r implements Limiter | |||||
_ = Limiter(r) | |||||
// 1st read of 10 bytes is performed immediately | |||||
if n, err := r.Read(b); n != 10 || err != nil { | |||||
t.Fatalf("r.Read(b) expected 10 (<nil>); got %v (%v)", n, err) | |||||
} else if rt := time.Since(start); rt > _50ms { | |||||
t.Fatalf("r.Read(b) took too long (%v)", rt) | |||||
} | |||||
// No new Reads allowed in the current sample | |||||
r.SetBlocking(false) | |||||
if n, err := r.Read(b); n != 0 || err != nil { | |||||
t.Fatalf("r.Read(b) expected 0 (<nil>); got %v (%v)", n, err) | |||||
} else if rt := time.Since(start); rt > _50ms { | |||||
t.Fatalf("r.Read(b) took too long (%v)", rt) | |||||
} | |||||
status := [6]Status{0: r.Status()} // No samples in the first status | |||||
// 2nd read of 10 bytes blocks until the next sample | |||||
r.SetBlocking(true) | |||||
if n, err := r.Read(b[10:]); n != 10 || err != nil { | |||||
t.Fatalf("r.Read(b[10:]) expected 10 (<nil>); got %v (%v)", n, err) | |||||
} else if rt := time.Since(start); rt < _100ms { | |||||
t.Fatalf("r.Read(b[10:]) returned ahead of time (%v)", rt) | |||||
} | |||||
status[1] = r.Status() // 1st sample | |||||
status[2] = nextStatus(r.Monitor) // 2nd sample | |||||
status[3] = nextStatus(r.Monitor) // No activity for the 3rd sample | |||||
if n := r.Done(); n != 20 { | |||||
t.Fatalf("r.Done() expected 20; got %v", n) | |||||
} | |||||
status[4] = r.Status() | |||||
status[5] = nextStatus(r.Monitor) // Timeout | |||||
start = status[0].Start | |||||
// Active, Start, Duration, Idle, Bytes, Samples, InstRate, CurRate, AvgRate, PeakRate, BytesRem, TimeRem, Progress | |||||
want := []Status{ | |||||
Status{true, start, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, | |||||
Status{true, start, _100ms, 0, 10, 1, 100, 100, 100, 100, 0, 0, 0}, | |||||
Status{true, start, _200ms, _100ms, 20, 2, 100, 100, 100, 100, 0, 0, 0}, | |||||
Status{true, start, _300ms, _200ms, 20, 3, 0, 90, 67, 100, 0, 0, 0}, | |||||
Status{false, start, _300ms, 0, 20, 3, 0, 0, 67, 100, 0, 0, 0}, | |||||
Status{false, start, _300ms, 0, 20, 3, 0, 0, 67, 100, 0, 0, 0}, | |||||
} | |||||
for i, s := range status { | |||||
if !reflect.DeepEqual(&s, &want[i]) { | |||||
t.Errorf("r.Status(%v) expected %v; got %v", i, want[i], s) | |||||
} | |||||
} | |||||
if !bytes.Equal(b[:20], in[:20]) { | |||||
t.Errorf("r.Read() input doesn't match output") | |||||
} | |||||
} | |||||
func TestWriter(t *testing.T) { | |||||
b := make([]byte, 100) | |||||
for i := range b { | |||||
b[i] = byte(i) | |||||
} | |||||
w := NewWriter(&bytes.Buffer{}, 200) | |||||
start := time.Now() | |||||
// Make sure w implements Limiter | |||||
_ = Limiter(w) | |||||
// Non-blocking 20-byte write for the first sample returns ErrLimit | |||||
w.SetBlocking(false) | |||||
if n, err := w.Write(b); n != 20 || err != ErrLimit { | |||||
t.Fatalf("w.Write(b) expected 20 (ErrLimit); got %v (%v)", n, err) | |||||
} else if rt := time.Since(start); rt > _50ms { | |||||
t.Fatalf("w.Write(b) took too long (%v)", rt) | |||||
} | |||||
// Blocking 80-byte write | |||||
w.SetBlocking(true) | |||||
if n, err := w.Write(b[20:]); n != 80 || err != nil { | |||||
t.Fatalf("w.Write(b[20:]) expected 80 (<nil>); got %v (%v)", n, err) | |||||
} else if rt := time.Since(start); rt < _400ms { | |||||
t.Fatalf("w.Write(b[20:]) returned ahead of time (%v)", rt) | |||||
} | |||||
w.SetTransferSize(100) | |||||
status := []Status{w.Status(), nextStatus(w.Monitor)} | |||||
start = status[0].Start | |||||
// Active, Start, Duration, Idle, Bytes, Samples, InstRate, CurRate, AvgRate, PeakRate, BytesRem, TimeRem, Progress | |||||
want := []Status{ | |||||
Status{true, start, _400ms, 0, 80, 4, 200, 200, 200, 200, 20, _100ms, 80000}, | |||||
Status{true, start, _500ms, _100ms, 100, 5, 200, 200, 200, 200, 0, 0, 100000}, | |||||
} | |||||
for i, s := range status { | |||||
if !reflect.DeepEqual(&s, &want[i]) { | |||||
t.Errorf("w.Status(%v) expected %v; got %v", i, want[i], s) | |||||
} | |||||
} | |||||
if !bytes.Equal(b, w.Writer.(*bytes.Buffer).Bytes()) { | |||||
t.Errorf("w.Write() input doesn't match output") | |||||
} | |||||
} |
@ -0,0 +1,67 @@ | |||||
// | |||||
// Written by Maxim Khitrov (November 2012) | |||||
// | |||||
package flowcontrol | |||||
import ( | |||||
"math" | |||||
"strconv" | |||||
"time" | |||||
) | |||||
// clockRate is the resolution and precision of clock(). | |||||
const clockRate = 20 * time.Millisecond | |||||
// czero is the process start time rounded down to the nearest clockRate | |||||
// increment. | |||||
var czero = time.Duration(time.Now().UnixNano()) / clockRate * clockRate | |||||
// clock returns a low resolution timestamp relative to the process start time. | |||||
func clock() time.Duration { | |||||
return time.Duration(time.Now().UnixNano())/clockRate*clockRate - czero | |||||
} | |||||
// clockToTime converts a clock() timestamp to an absolute time.Time value. | |||||
func clockToTime(c time.Duration) time.Time { | |||||
return time.Unix(0, int64(czero+c)) | |||||
} | |||||
// clockRound returns d rounded to the nearest clockRate increment. | |||||
func clockRound(d time.Duration) time.Duration { | |||||
return (d + clockRate>>1) / clockRate * clockRate | |||||
} | |||||
// round returns x rounded to the nearest int64 (non-negative values only). | |||||
func round(x float64) int64 { | |||||
if _, frac := math.Modf(x); frac >= 0.5 { | |||||
return int64(math.Ceil(x)) | |||||
} | |||||
return int64(math.Floor(x)) | |||||
} | |||||
// Percent represents a percentage in increments of 1/1000th of a percent. | |||||
type Percent uint32 | |||||
// percentOf calculates what percent of the total is x. | |||||
func percentOf(x, total float64) Percent { | |||||
if x < 0 || total <= 0 { | |||||
return 0 | |||||
} else if p := round(x / total * 1e5); p <= math.MaxUint32 { | |||||
return Percent(p) | |||||
} | |||||
return Percent(math.MaxUint32) | |||||
} | |||||
func (p Percent) Float() float64 { | |||||
return float64(p) * 1e-3 | |||||
} | |||||
func (p Percent) String() string { | |||||
var buf [12]byte | |||||
b := strconv.AppendUint(buf[:0], uint64(p)/1000, 10) | |||||
n := len(b) | |||||
b = strconv.AppendUint(b, 1000+uint64(p)%1000, 10) | |||||
b[n] = '.' | |||||
return string(append(b, '%')) | |||||
} |