commit bec34fc8d7eec716c3f2136dc78c0d7eee050d7f Author: Maxim Khitrov Date: Fri Apr 18 20:31:57 2014 -0400 Import flowcontrol package from Google Code diff --git a/flowcontrol/flowcontrol.go b/flowcontrol/flowcontrol.go new file mode 100644 index 000000000..40db5d89e --- /dev/null +++ b/flowcontrol/flowcontrol.go @@ -0,0 +1,267 @@ +// +// Written by Maxim Khitrov (November 2012) +// + +// Package flowcontrol provides the tools for monitoring and limiting the +// transfer rate of an arbitrary data stream. +package flowcontrol + +import ( + "math" + "sync" + "time" +) + +// Monitor monitors and limits the transfer rate of a data stream. +type Monitor struct { + mu sync.Mutex // Mutex guarding access to all internal fields + active bool // Flag indicating an active transfer + start time.Duration // Transfer start time (clock() value) + bytes int64 // Total number of bytes transferred + samples int64 // Total number of samples taken + + rSample float64 // Most recent transfer rate sample (bytes per second) + rEMA float64 // Exponential moving average of rSample + rPeak float64 // Peak transfer rate (max of all rSamples) + rWindow float64 // rEMA window (seconds) + + sBytes int64 // Number of bytes transferred since sLast + sLast time.Duration // Most recent sample time (stop time when inactive) + sRate time.Duration // Sampling rate + + tBytes int64 // Number of bytes expected in the current transfer + tLast time.Duration // Time of the most recent transfer of at least 1 byte +} + +// New creates a new flow control monitor. Instantaneous transfer rate is +// measured and updated for each sampleRate interval. windowSize determines the +// weight of each sample in the exponential moving average (EMA) calculation. +// The exact formulas are: +// +// sampleTime = currentTime - prevSampleTime +// sampleRate = byteCount / sampleTime +// weight = 1 - exp(-sampleTime/windowSize) +// newRate = weight*sampleRate + (1-weight)*oldRate +// +// The default values for sampleRate and windowSize (if <= 0) are 100ms and 1s, +// respectively. +func New(sampleRate, windowSize time.Duration) *Monitor { + if sampleRate = clockRound(sampleRate); sampleRate <= 0 { + sampleRate = 5 * clockRate + } + if windowSize <= 0 { + windowSize = 1 * time.Second + } + now := clock() + return &Monitor{ + active: true, + start: now, + rWindow: windowSize.Seconds(), + sLast: now, + sRate: sampleRate, + tLast: now, + } +} + +// Update records the transfer of n bytes and returns n. It should be called +// after each Read/Write operation, even if n is 0. +func (m *Monitor) Update(n int) int { + m.mu.Lock() + m.update(n) + m.mu.Unlock() + return n +} + +// IO is a convenience method intended to wrap io.Reader and io.Writer method +// execution. It calls m.Update(n) and then returns (n, err) unmodified. +func (m *Monitor) IO(n int, err error) (int, error) { + return m.Update(n), err +} + +// Done marks the transfer as finished and prevents any further updates or +// limiting. Instantaneous and current transfer rates drop to 0. Update, IO, and +// Limit methods become NOOPs. It returns the total number of bytes transferred. +func (m *Monitor) Done() int64 { + m.mu.Lock() + if now := m.update(0); m.sBytes > 0 { + m.reset(now) + } + m.active = false + m.tLast = 0 + n := m.bytes + m.mu.Unlock() + return n +} + +// timeRemLimit is the maximum Status.TimeRem value. +const timeRemLimit = 999*time.Hour + 59*time.Minute + 59*time.Second + +// Status represents the current Monitor status. All transfer rates are in bytes +// per second rounded to the nearest byte. +type Status struct { + Active bool // Flag indicating an active transfer + Start time.Time // Transfer start time + Duration time.Duration // Time period covered by the statistics + Idle time.Duration // Time since the last transfer of at least 1 byte + Bytes int64 // Total number of bytes transferred + Samples int64 // Total number of samples taken + InstRate int64 // Instantaneous transfer rate + CurRate int64 // Current transfer rate (EMA of InstRate) + AvgRate int64 // Average transfer rate (Bytes / Duration) + PeakRate int64 // Maximum instantaneous transfer rate + BytesRem int64 // Number of bytes remaining in the transfer + TimeRem time.Duration // Estimated time to completion + Progress Percent // Overall transfer progress +} + +// Status returns current transfer status information. The returned value +// becomes static after a call to Done. +func (m *Monitor) Status() Status { + m.mu.Lock() + now := m.update(0) + s := Status{ + Active: m.active, + Start: clockToTime(m.start), + Duration: m.sLast - m.start, + Idle: now - m.tLast, + Bytes: m.bytes, + Samples: m.samples, + PeakRate: round(m.rPeak), + BytesRem: m.tBytes - m.bytes, + Progress: percentOf(float64(m.bytes), float64(m.tBytes)), + } + if s.BytesRem < 0 { + s.BytesRem = 0 + } + if s.Duration > 0 { + rAvg := float64(s.Bytes) / s.Duration.Seconds() + s.AvgRate = round(rAvg) + if s.Active { + s.InstRate = round(m.rSample) + s.CurRate = round(m.rEMA) + if s.BytesRem > 0 { + if tRate := 0.8*m.rEMA + 0.2*rAvg; tRate > 0 { + ns := float64(s.BytesRem) / tRate * 1e9 + if ns > float64(timeRemLimit) { + ns = float64(timeRemLimit) + } + s.TimeRem = clockRound(time.Duration(ns)) + } + } + } + } + m.mu.Unlock() + return s +} + +// Limit restricts the instantaneous (per-sample) data flow to rate bytes per +// second. It returns the maximum number of bytes (0 <= n <= want) that may be +// transferred immediately without exceeding the limit. If block == true, the +// call blocks until n > 0. want is returned unmodified if want < 1, rate < 1, +// or the transfer is inactive (after a call to Done). +// +// At least one byte is always allowed to be transferred in any given sampling +// period. Thus, if the sampling rate is 100ms, the lowest achievable flow rate +// is 10 bytes per second. +// +// For usage examples, see the implementation of Reader and Writer in io.go. +func (m *Monitor) Limit(want int, rate int64, block bool) (n int) { + if want < 1 || rate < 1 { + return want + } + m.mu.Lock() + + // Determine the maximum number of bytes that can be sent in one sample + limit := round(float64(rate) * m.sRate.Seconds()) + if limit <= 0 { + limit = 1 + } + + // If block == true, wait until m.sBytes < limit + if now := m.update(0); block { + for m.sBytes >= limit && m.active { + now = m.waitNextSample(now) + } + } + + // Make limit <= want (unlimited if the transfer is no longer active) + if limit -= m.sBytes; limit > int64(want) || !m.active { + limit = int64(want) + } + m.mu.Unlock() + + if limit < 0 { + limit = 0 + } + return int(limit) +} + +// SetTransferSize specifies the total size of the data transfer, which allows +// the Monitor to calculate the overall progress and time to completion. +func (m *Monitor) SetTransferSize(bytes int64) { + if bytes < 0 { + bytes = 0 + } + m.mu.Lock() + m.tBytes = bytes + m.mu.Unlock() +} + +// update accumulates the transferred byte count for the current sample until +// clock() - m.sLast >= m.sRate. The monitor status is updated once the current +// sample is done. +func (m *Monitor) update(n int) (now time.Duration) { + if !m.active { + return + } + if now = clock(); n > 0 { + m.tLast = now + } + m.sBytes += int64(n) + if sTime := now - m.sLast; sTime >= m.sRate { + t := sTime.Seconds() + if m.rSample = float64(m.sBytes) / t; m.rSample > m.rPeak { + m.rPeak = m.rSample + } + + // Exponential moving average using a method similar to *nix load + // average calculation. Longer sampling periods carry greater weight. + if m.samples > 0 { + w := math.Exp(-t / m.rWindow) + m.rEMA = m.rSample + w*(m.rEMA-m.rSample) + } else { + m.rEMA = m.rSample + } + m.reset(now) + } + return +} + +// reset clears the current sample state in preparation for the next sample. +func (m *Monitor) reset(sampleTime time.Duration) { + m.bytes += m.sBytes + m.samples++ + m.sBytes = 0 + m.sLast = sampleTime +} + +// waitNextSample sleeps for the remainder of the current sample. The lock is +// released and reacquired during the actual sleep period, so it's possible for +// the transfer to be inactive when this method returns. +func (m *Monitor) waitNextSample(now time.Duration) time.Duration { + const minWait = 5 * time.Millisecond + current := m.sLast + + // sleep until the last sample time changes (ideally, just one iteration) + for m.sLast == current && m.active { + d := current + m.sRate - now + m.mu.Unlock() + if d < minWait { + d = minWait + } + time.Sleep(d) + m.mu.Lock() + now = m.update(0) + } + return now +} diff --git a/flowcontrol/io.go b/flowcontrol/io.go new file mode 100644 index 000000000..12a753ddf --- /dev/null +++ b/flowcontrol/io.go @@ -0,0 +1,133 @@ +// +// Written by Maxim Khitrov (November 2012) +// + +package flowcontrol + +import ( + "errors" + "io" +) + +// ErrLimit is returned by the Writer when a non-blocking write is short due to +// the transfer rate limit. +var ErrLimit = errors.New("flowcontrol: transfer rate limit exceeded") + +// Limiter is implemented by the Reader and Writer to provide a consistent +// interface for monitoring and controlling data transfer. +type Limiter interface { + Done() int64 + Status() Status + SetTransferSize(bytes int64) + SetLimit(new int64) (old int64) + SetBlocking(new bool) (old bool) +} + +// Reader implements io.ReadCloser with a restriction on the rate of data +// transfer. +type Reader struct { + io.Reader // Data source + *Monitor // Flow control monitor + + limit int64 // Rate limit in bytes per second (unlimited when <= 0) + block bool // What to do when no new bytes can be read due to the limit +} + +// NewReader restricts all Read operations on r to limit bytes per second. +func NewReader(r io.Reader, limit int64) *Reader { + return &Reader{r, New(0, 0), limit, true} +} + +// Read reads up to len(p) bytes into p without exceeding the current transfer +// rate limit. It returns (0, nil) immediately if r is non-blocking and no new +// bytes can be read at this time. +func (r *Reader) Read(p []byte) (n int, err error) { + p = p[:r.Limit(len(p), r.limit, r.block)] + if len(p) > 0 { + n, err = r.IO(r.Reader.Read(p)) + } + return +} + +// SetLimit changes the transfer rate limit to new bytes per second and returns +// the previous setting. +func (r *Reader) SetLimit(new int64) (old int64) { + old, r.limit = r.limit, new + return +} + +// SetBlocking changes the blocking behavior and returns the previous setting. A +// Read call on a non-blocking reader returns immediately if no additional bytes +// may be read at this time due to the rate limit. +func (r *Reader) SetBlocking(new bool) (old bool) { + old, r.block = r.block, new + return +} + +// Close closes the underlying reader if it implements the io.Closer interface. +func (r *Reader) Close() error { + defer r.Done() + if c, ok := r.Reader.(io.Closer); ok { + return c.Close() + } + return nil +} + +// Writer implements io.WriteCloser with a restriction on the rate of data +// transfer. +type Writer struct { + io.Writer // Data destination + *Monitor // Flow control monitor + + limit int64 // Rate limit in bytes per second (unlimited when <= 0) + block bool // What to do when no new bytes can be written due to the limit +} + +// NewWriter restricts all Write operations on w to limit bytes per second. The +// transfer rate and the default blocking behavior (true) can be changed +// directly on the returned *Writer. +func NewWriter(w io.Writer, limit int64) *Writer { + return &Writer{w, New(0, 0), limit, true} +} + +// Write writes len(p) bytes from p to the underlying data stream without +// exceeding the current transfer rate limit. It returns (n, ErrLimit) if w is +// non-blocking and no additional bytes can be written at this time. +func (w *Writer) Write(p []byte) (n int, err error) { + var c int + for len(p) > 0 && err == nil { + s := p[:w.Limit(len(p), w.limit, w.block)] + if len(s) > 0 { + c, err = w.IO(w.Writer.Write(s)) + } else { + return n, ErrLimit + } + p = p[c:] + n += c + } + return +} + +// SetLimit changes the transfer rate limit to new bytes per second and returns +// the previous setting. +func (w *Writer) SetLimit(new int64) (old int64) { + old, w.limit = w.limit, new + return +} + +// SetBlocking changes the blocking behavior and returns the previous setting. A +// Write call on a non-blocking writer returns as soon as no additional bytes +// may be written at this time due to the rate limit. +func (w *Writer) SetBlocking(new bool) (old bool) { + old, w.block = w.block, new + return +} + +// Close closes the underlying writer if it implements the io.Closer interface. +func (w *Writer) Close() error { + defer w.Done() + if c, ok := w.Writer.(io.Closer); ok { + return c.Close() + } + return nil +} diff --git a/flowcontrol/io_test.go b/flowcontrol/io_test.go new file mode 100644 index 000000000..318069366 --- /dev/null +++ b/flowcontrol/io_test.go @@ -0,0 +1,146 @@ +// +// Written by Maxim Khitrov (November 2012) +// + +package flowcontrol + +import ( + "bytes" + "reflect" + "testing" + "time" +) + +const ( + _50ms = 50 * time.Millisecond + _100ms = 100 * time.Millisecond + _200ms = 200 * time.Millisecond + _300ms = 300 * time.Millisecond + _400ms = 400 * time.Millisecond + _500ms = 500 * time.Millisecond +) + +func nextStatus(m *Monitor) Status { + samples := m.samples + for i := 0; i < 30; i++ { + if s := m.Status(); s.Samples != samples { + return s + } + time.Sleep(5 * time.Millisecond) + } + return m.Status() +} + +func TestReader(t *testing.T) { + in := make([]byte, 100) + for i := range in { + in[i] = byte(i) + } + b := make([]byte, 100) + r := NewReader(bytes.NewReader(in), 100) + start := time.Now() + + // Make sure r implements Limiter + _ = Limiter(r) + + // 1st read of 10 bytes is performed immediately + if n, err := r.Read(b); n != 10 || err != nil { + t.Fatalf("r.Read(b) expected 10 (); got %v (%v)", n, err) + } else if rt := time.Since(start); rt > _50ms { + t.Fatalf("r.Read(b) took too long (%v)", rt) + } + + // No new Reads allowed in the current sample + r.SetBlocking(false) + if n, err := r.Read(b); n != 0 || err != nil { + t.Fatalf("r.Read(b) expected 0 (); got %v (%v)", n, err) + } else if rt := time.Since(start); rt > _50ms { + t.Fatalf("r.Read(b) took too long (%v)", rt) + } + + status := [6]Status{0: r.Status()} // No samples in the first status + + // 2nd read of 10 bytes blocks until the next sample + r.SetBlocking(true) + if n, err := r.Read(b[10:]); n != 10 || err != nil { + t.Fatalf("r.Read(b[10:]) expected 10 (); got %v (%v)", n, err) + } else if rt := time.Since(start); rt < _100ms { + t.Fatalf("r.Read(b[10:]) returned ahead of time (%v)", rt) + } + + status[1] = r.Status() // 1st sample + status[2] = nextStatus(r.Monitor) // 2nd sample + status[3] = nextStatus(r.Monitor) // No activity for the 3rd sample + + if n := r.Done(); n != 20 { + t.Fatalf("r.Done() expected 20; got %v", n) + } + + status[4] = r.Status() + status[5] = nextStatus(r.Monitor) // Timeout + start = status[0].Start + + // Active, Start, Duration, Idle, Bytes, Samples, InstRate, CurRate, AvgRate, PeakRate, BytesRem, TimeRem, Progress + want := []Status{ + Status{true, start, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Status{true, start, _100ms, 0, 10, 1, 100, 100, 100, 100, 0, 0, 0}, + Status{true, start, _200ms, _100ms, 20, 2, 100, 100, 100, 100, 0, 0, 0}, + Status{true, start, _300ms, _200ms, 20, 3, 0, 90, 67, 100, 0, 0, 0}, + Status{false, start, _300ms, 0, 20, 3, 0, 0, 67, 100, 0, 0, 0}, + Status{false, start, _300ms, 0, 20, 3, 0, 0, 67, 100, 0, 0, 0}, + } + for i, s := range status { + if !reflect.DeepEqual(&s, &want[i]) { + t.Errorf("r.Status(%v) expected %v; got %v", i, want[i], s) + } + } + if !bytes.Equal(b[:20], in[:20]) { + t.Errorf("r.Read() input doesn't match output") + } +} + +func TestWriter(t *testing.T) { + b := make([]byte, 100) + for i := range b { + b[i] = byte(i) + } + w := NewWriter(&bytes.Buffer{}, 200) + start := time.Now() + + // Make sure w implements Limiter + _ = Limiter(w) + + // Non-blocking 20-byte write for the first sample returns ErrLimit + w.SetBlocking(false) + if n, err := w.Write(b); n != 20 || err != ErrLimit { + t.Fatalf("w.Write(b) expected 20 (ErrLimit); got %v (%v)", n, err) + } else if rt := time.Since(start); rt > _50ms { + t.Fatalf("w.Write(b) took too long (%v)", rt) + } + + // Blocking 80-byte write + w.SetBlocking(true) + if n, err := w.Write(b[20:]); n != 80 || err != nil { + t.Fatalf("w.Write(b[20:]) expected 80 (); got %v (%v)", n, err) + } else if rt := time.Since(start); rt < _400ms { + t.Fatalf("w.Write(b[20:]) returned ahead of time (%v)", rt) + } + + w.SetTransferSize(100) + status := []Status{w.Status(), nextStatus(w.Monitor)} + start = status[0].Start + + // Active, Start, Duration, Idle, Bytes, Samples, InstRate, CurRate, AvgRate, PeakRate, BytesRem, TimeRem, Progress + want := []Status{ + Status{true, start, _400ms, 0, 80, 4, 200, 200, 200, 200, 20, _100ms, 80000}, + Status{true, start, _500ms, _100ms, 100, 5, 200, 200, 200, 200, 0, 0, 100000}, + } + for i, s := range status { + if !reflect.DeepEqual(&s, &want[i]) { + t.Errorf("w.Status(%v) expected %v; got %v", i, want[i], s) + } + } + if !bytes.Equal(b, w.Writer.(*bytes.Buffer).Bytes()) { + t.Errorf("w.Write() input doesn't match output") + } +} diff --git a/flowcontrol/util.go b/flowcontrol/util.go new file mode 100644 index 000000000..91efd8815 --- /dev/null +++ b/flowcontrol/util.go @@ -0,0 +1,67 @@ +// +// Written by Maxim Khitrov (November 2012) +// + +package flowcontrol + +import ( + "math" + "strconv" + "time" +) + +// clockRate is the resolution and precision of clock(). +const clockRate = 20 * time.Millisecond + +// czero is the process start time rounded down to the nearest clockRate +// increment. +var czero = time.Duration(time.Now().UnixNano()) / clockRate * clockRate + +// clock returns a low resolution timestamp relative to the process start time. +func clock() time.Duration { + return time.Duration(time.Now().UnixNano())/clockRate*clockRate - czero +} + +// clockToTime converts a clock() timestamp to an absolute time.Time value. +func clockToTime(c time.Duration) time.Time { + return time.Unix(0, int64(czero+c)) +} + +// clockRound returns d rounded to the nearest clockRate increment. +func clockRound(d time.Duration) time.Duration { + return (d + clockRate>>1) / clockRate * clockRate +} + +// round returns x rounded to the nearest int64 (non-negative values only). +func round(x float64) int64 { + if _, frac := math.Modf(x); frac >= 0.5 { + return int64(math.Ceil(x)) + } + return int64(math.Floor(x)) +} + +// Percent represents a percentage in increments of 1/1000th of a percent. +type Percent uint32 + +// percentOf calculates what percent of the total is x. +func percentOf(x, total float64) Percent { + if x < 0 || total <= 0 { + return 0 + } else if p := round(x / total * 1e5); p <= math.MaxUint32 { + return Percent(p) + } + return Percent(math.MaxUint32) +} + +func (p Percent) Float() float64 { + return float64(p) * 1e-3 +} + +func (p Percent) String() string { + var buf [12]byte + b := strconv.AppendUint(buf[:0], uint64(p)/1000, 10) + n := len(b) + b = strconv.AppendUint(b, 1000+uint64(p)%1000, 10) + b[n] = '.' + return string(append(b, '%')) +}