Browse Source

Merge remote-tracking branch 'flowrate/master' into unstable

pull/1842/head
Ethan Buchman 8 years ago
parent
commit
125e25a929
6 changed files with 660 additions and 0 deletions
  1. +29
    -0
      LICENSE
  2. +10
    -0
      README
  3. +275
    -0
      flowrate/flowrate.go
  4. +133
    -0
      flowrate/io.go
  5. +146
    -0
      flowrate/io_test.go
  6. +67
    -0
      flowrate/util.go

+ 29
- 0
LICENSE View File

@ -0,0 +1,29 @@
Copyright (c) 2014 The Go-FlowRate Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the
distribution.
* Neither the name of the go-flowrate project nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

+ 10
- 0
README View File

@ -0,0 +1,10 @@
Data Flow Rate Control
======================
To download and install this package run:
go get github.com/mxk/go-flowrate/flowrate
The documentation is available at:
http://godoc.org/github.com/mxk/go-flowrate/flowrate

+ 275
- 0
flowrate/flowrate.go View File

@ -0,0 +1,275 @@
//
// Written by Maxim Khitrov (November 2012)
//
// Package flowrate provides the tools for monitoring and limiting the flow rate
// of an arbitrary data stream.
package flowrate
import (
"math"
"sync"
"time"
)
// Monitor monitors and limits the transfer rate of a data stream.
type Monitor struct {
mu sync.Mutex // Mutex guarding access to all internal fields
active bool // Flag indicating an active transfer
start time.Duration // Transfer start time (clock() value)
bytes int64 // Total number of bytes transferred
samples int64 // Total number of samples taken
rSample float64 // Most recent transfer rate sample (bytes per second)
rEMA float64 // Exponential moving average of rSample
rPeak float64 // Peak transfer rate (max of all rSamples)
rWindow float64 // rEMA window (seconds)
sBytes int64 // Number of bytes transferred since sLast
sLast time.Duration // Most recent sample time (stop time when inactive)
sRate time.Duration // Sampling rate
tBytes int64 // Number of bytes expected in the current transfer
tLast time.Duration // Time of the most recent transfer of at least 1 byte
}
// New creates a new flow control monitor. Instantaneous transfer rate is
// measured and updated for each sampleRate interval. windowSize determines the
// weight of each sample in the exponential moving average (EMA) calculation.
// The exact formulas are:
//
// sampleTime = currentTime - prevSampleTime
// sampleRate = byteCount / sampleTime
// weight = 1 - exp(-sampleTime/windowSize)
// newRate = weight*sampleRate + (1-weight)*oldRate
//
// The default values for sampleRate and windowSize (if <= 0) are 100ms and 1s,
// respectively.
func New(sampleRate, windowSize time.Duration) *Monitor {
if sampleRate = clockRound(sampleRate); sampleRate <= 0 {
sampleRate = 5 * clockRate
}
if windowSize <= 0 {
windowSize = 1 * time.Second
}
now := clock()
return &Monitor{
active: true,
start: now,
rWindow: windowSize.Seconds(),
sLast: now,
sRate: sampleRate,
tLast: now,
}
}
// Update records the transfer of n bytes and returns n. It should be called
// after each Read/Write operation, even if n is 0.
func (m *Monitor) Update(n int) int {
m.mu.Lock()
m.update(n)
m.mu.Unlock()
return n
}
// Hack to set the current rEMA.
func (m *Monitor) SetREMA(rEMA float64) {
m.mu.Lock()
m.rEMA = rEMA
m.samples++
m.mu.Unlock()
}
// IO is a convenience method intended to wrap io.Reader and io.Writer method
// execution. It calls m.Update(n) and then returns (n, err) unmodified.
func (m *Monitor) IO(n int, err error) (int, error) {
return m.Update(n), err
}
// Done marks the transfer as finished and prevents any further updates or
// limiting. Instantaneous and current transfer rates drop to 0. Update, IO, and
// Limit methods become NOOPs. It returns the total number of bytes transferred.
func (m *Monitor) Done() int64 {
m.mu.Lock()
if now := m.update(0); m.sBytes > 0 {
m.reset(now)
}
m.active = false
m.tLast = 0
n := m.bytes
m.mu.Unlock()
return n
}
// timeRemLimit is the maximum Status.TimeRem value.
const timeRemLimit = 999*time.Hour + 59*time.Minute + 59*time.Second
// Status represents the current Monitor status. All transfer rates are in bytes
// per second rounded to the nearest byte.
type Status struct {
Active bool // Flag indicating an active transfer
Start time.Time // Transfer start time
Duration time.Duration // Time period covered by the statistics
Idle time.Duration // Time since the last transfer of at least 1 byte
Bytes int64 // Total number of bytes transferred
Samples int64 // Total number of samples taken
InstRate int64 // Instantaneous transfer rate
CurRate int64 // Current transfer rate (EMA of InstRate)
AvgRate int64 // Average transfer rate (Bytes / Duration)
PeakRate int64 // Maximum instantaneous transfer rate
BytesRem int64 // Number of bytes remaining in the transfer
TimeRem time.Duration // Estimated time to completion
Progress Percent // Overall transfer progress
}
// Status returns current transfer status information. The returned value
// becomes static after a call to Done.
func (m *Monitor) Status() Status {
m.mu.Lock()
now := m.update(0)
s := Status{
Active: m.active,
Start: clockToTime(m.start),
Duration: m.sLast - m.start,
Idle: now - m.tLast,
Bytes: m.bytes,
Samples: m.samples,
PeakRate: round(m.rPeak),
BytesRem: m.tBytes - m.bytes,
Progress: percentOf(float64(m.bytes), float64(m.tBytes)),
}
if s.BytesRem < 0 {
s.BytesRem = 0
}
if s.Duration > 0 {
rAvg := float64(s.Bytes) / s.Duration.Seconds()
s.AvgRate = round(rAvg)
if s.Active {
s.InstRate = round(m.rSample)
s.CurRate = round(m.rEMA)
if s.BytesRem > 0 {
if tRate := 0.8*m.rEMA + 0.2*rAvg; tRate > 0 {
ns := float64(s.BytesRem) / tRate * 1e9
if ns > float64(timeRemLimit) {
ns = float64(timeRemLimit)
}
s.TimeRem = clockRound(time.Duration(ns))
}
}
}
}
m.mu.Unlock()
return s
}
// Limit restricts the instantaneous (per-sample) data flow to rate bytes per
// second. It returns the maximum number of bytes (0 <= n <= want) that may be
// transferred immediately without exceeding the limit. If block == true, the
// call blocks until n > 0. want is returned unmodified if want < 1, rate < 1,
// or the transfer is inactive (after a call to Done).
//
// At least one byte is always allowed to be transferred in any given sampling
// period. Thus, if the sampling rate is 100ms, the lowest achievable flow rate
// is 10 bytes per second.
//
// For usage examples, see the implementation of Reader and Writer in io.go.
func (m *Monitor) Limit(want int, rate int64, block bool) (n int) {
if want < 1 || rate < 1 {
return want
}
m.mu.Lock()
// Determine the maximum number of bytes that can be sent in one sample
limit := round(float64(rate) * m.sRate.Seconds())
if limit <= 0 {
limit = 1
}
// If block == true, wait until m.sBytes < limit
if now := m.update(0); block {
for m.sBytes >= limit && m.active {
now = m.waitNextSample(now)
}
}
// Make limit <= want (unlimited if the transfer is no longer active)
if limit -= m.sBytes; limit > int64(want) || !m.active {
limit = int64(want)
}
m.mu.Unlock()
if limit < 0 {
limit = 0
}
return int(limit)
}
// SetTransferSize specifies the total size of the data transfer, which allows
// the Monitor to calculate the overall progress and time to completion.
func (m *Monitor) SetTransferSize(bytes int64) {
if bytes < 0 {
bytes = 0
}
m.mu.Lock()
m.tBytes = bytes
m.mu.Unlock()
}
// update accumulates the transferred byte count for the current sample until
// clock() - m.sLast >= m.sRate. The monitor status is updated once the current
// sample is done.
func (m *Monitor) update(n int) (now time.Duration) {
if !m.active {
return
}
if now = clock(); n > 0 {
m.tLast = now
}
m.sBytes += int64(n)
if sTime := now - m.sLast; sTime >= m.sRate {
t := sTime.Seconds()
if m.rSample = float64(m.sBytes) / t; m.rSample > m.rPeak {
m.rPeak = m.rSample
}
// Exponential moving average using a method similar to *nix load
// average calculation. Longer sampling periods carry greater weight.
if m.samples > 0 {
w := math.Exp(-t / m.rWindow)
m.rEMA = m.rSample + w*(m.rEMA-m.rSample)
} else {
m.rEMA = m.rSample
}
m.reset(now)
}
return
}
// reset clears the current sample state in preparation for the next sample.
func (m *Monitor) reset(sampleTime time.Duration) {
m.bytes += m.sBytes
m.samples++
m.sBytes = 0
m.sLast = sampleTime
}
// waitNextSample sleeps for the remainder of the current sample. The lock is
// released and reacquired during the actual sleep period, so it's possible for
// the transfer to be inactive when this method returns.
func (m *Monitor) waitNextSample(now time.Duration) time.Duration {
const minWait = 5 * time.Millisecond
current := m.sLast
// sleep until the last sample time changes (ideally, just one iteration)
for m.sLast == current && m.active {
d := current + m.sRate - now
m.mu.Unlock()
if d < minWait {
d = minWait
}
time.Sleep(d)
m.mu.Lock()
now = m.update(0)
}
return now
}

+ 133
- 0
flowrate/io.go View File

@ -0,0 +1,133 @@
//
// Written by Maxim Khitrov (November 2012)
//
package flowrate
import (
"errors"
"io"
)
// ErrLimit is returned by the Writer when a non-blocking write is short due to
// the transfer rate limit.
var ErrLimit = errors.New("flowrate: flow rate limit exceeded")
// Limiter is implemented by the Reader and Writer to provide a consistent
// interface for monitoring and controlling data transfer.
type Limiter interface {
Done() int64
Status() Status
SetTransferSize(bytes int64)
SetLimit(new int64) (old int64)
SetBlocking(new bool) (old bool)
}
// Reader implements io.ReadCloser with a restriction on the rate of data
// transfer.
type Reader struct {
io.Reader // Data source
*Monitor // Flow control monitor
limit int64 // Rate limit in bytes per second (unlimited when <= 0)
block bool // What to do when no new bytes can be read due to the limit
}
// NewReader restricts all Read operations on r to limit bytes per second.
func NewReader(r io.Reader, limit int64) *Reader {
return &Reader{r, New(0, 0), limit, true}
}
// Read reads up to len(p) bytes into p without exceeding the current transfer
// rate limit. It returns (0, nil) immediately if r is non-blocking and no new
// bytes can be read at this time.
func (r *Reader) Read(p []byte) (n int, err error) {
p = p[:r.Limit(len(p), r.limit, r.block)]
if len(p) > 0 {
n, err = r.IO(r.Reader.Read(p))
}
return
}
// SetLimit changes the transfer rate limit to new bytes per second and returns
// the previous setting.
func (r *Reader) SetLimit(new int64) (old int64) {
old, r.limit = r.limit, new
return
}
// SetBlocking changes the blocking behavior and returns the previous setting. A
// Read call on a non-blocking reader returns immediately if no additional bytes
// may be read at this time due to the rate limit.
func (r *Reader) SetBlocking(new bool) (old bool) {
old, r.block = r.block, new
return
}
// Close closes the underlying reader if it implements the io.Closer interface.
func (r *Reader) Close() error {
defer r.Done()
if c, ok := r.Reader.(io.Closer); ok {
return c.Close()
}
return nil
}
// Writer implements io.WriteCloser with a restriction on the rate of data
// transfer.
type Writer struct {
io.Writer // Data destination
*Monitor // Flow control monitor
limit int64 // Rate limit in bytes per second (unlimited when <= 0)
block bool // What to do when no new bytes can be written due to the limit
}
// NewWriter restricts all Write operations on w to limit bytes per second. The
// transfer rate and the default blocking behavior (true) can be changed
// directly on the returned *Writer.
func NewWriter(w io.Writer, limit int64) *Writer {
return &Writer{w, New(0, 0), limit, true}
}
// Write writes len(p) bytes from p to the underlying data stream without
// exceeding the current transfer rate limit. It returns (n, ErrLimit) if w is
// non-blocking and no additional bytes can be written at this time.
func (w *Writer) Write(p []byte) (n int, err error) {
var c int
for len(p) > 0 && err == nil {
s := p[:w.Limit(len(p), w.limit, w.block)]
if len(s) > 0 {
c, err = w.IO(w.Writer.Write(s))
} else {
return n, ErrLimit
}
p = p[c:]
n += c
}
return
}
// SetLimit changes the transfer rate limit to new bytes per second and returns
// the previous setting.
func (w *Writer) SetLimit(new int64) (old int64) {
old, w.limit = w.limit, new
return
}
// SetBlocking changes the blocking behavior and returns the previous setting. A
// Write call on a non-blocking writer returns as soon as no additional bytes
// may be written at this time due to the rate limit.
func (w *Writer) SetBlocking(new bool) (old bool) {
old, w.block = w.block, new
return
}
// Close closes the underlying writer if it implements the io.Closer interface.
func (w *Writer) Close() error {
defer w.Done()
if c, ok := w.Writer.(io.Closer); ok {
return c.Close()
}
return nil
}

+ 146
- 0
flowrate/io_test.go View File

@ -0,0 +1,146 @@
//
// Written by Maxim Khitrov (November 2012)
//
package flowrate
import (
"bytes"
"reflect"
"testing"
"time"
)
const (
_50ms = 50 * time.Millisecond
_100ms = 100 * time.Millisecond
_200ms = 200 * time.Millisecond
_300ms = 300 * time.Millisecond
_400ms = 400 * time.Millisecond
_500ms = 500 * time.Millisecond
)
func nextStatus(m *Monitor) Status {
samples := m.samples
for i := 0; i < 30; i++ {
if s := m.Status(); s.Samples != samples {
return s
}
time.Sleep(5 * time.Millisecond)
}
return m.Status()
}
func TestReader(t *testing.T) {
in := make([]byte, 100)
for i := range in {
in[i] = byte(i)
}
b := make([]byte, 100)
r := NewReader(bytes.NewReader(in), 100)
start := time.Now()
// Make sure r implements Limiter
_ = Limiter(r)
// 1st read of 10 bytes is performed immediately
if n, err := r.Read(b); n != 10 || err != nil {
t.Fatalf("r.Read(b) expected 10 (<nil>); got %v (%v)", n, err)
} else if rt := time.Since(start); rt > _50ms {
t.Fatalf("r.Read(b) took too long (%v)", rt)
}
// No new Reads allowed in the current sample
r.SetBlocking(false)
if n, err := r.Read(b); n != 0 || err != nil {
t.Fatalf("r.Read(b) expected 0 (<nil>); got %v (%v)", n, err)
} else if rt := time.Since(start); rt > _50ms {
t.Fatalf("r.Read(b) took too long (%v)", rt)
}
status := [6]Status{0: r.Status()} // No samples in the first status
// 2nd read of 10 bytes blocks until the next sample
r.SetBlocking(true)
if n, err := r.Read(b[10:]); n != 10 || err != nil {
t.Fatalf("r.Read(b[10:]) expected 10 (<nil>); got %v (%v)", n, err)
} else if rt := time.Since(start); rt < _100ms {
t.Fatalf("r.Read(b[10:]) returned ahead of time (%v)", rt)
}
status[1] = r.Status() // 1st sample
status[2] = nextStatus(r.Monitor) // 2nd sample
status[3] = nextStatus(r.Monitor) // No activity for the 3rd sample
if n := r.Done(); n != 20 {
t.Fatalf("r.Done() expected 20; got %v", n)
}
status[4] = r.Status()
status[5] = nextStatus(r.Monitor) // Timeout
start = status[0].Start
// Active, Start, Duration, Idle, Bytes, Samples, InstRate, CurRate, AvgRate, PeakRate, BytesRem, TimeRem, Progress
want := []Status{
Status{true, start, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Status{true, start, _100ms, 0, 10, 1, 100, 100, 100, 100, 0, 0, 0},
Status{true, start, _200ms, _100ms, 20, 2, 100, 100, 100, 100, 0, 0, 0},
Status{true, start, _300ms, _200ms, 20, 3, 0, 90, 67, 100, 0, 0, 0},
Status{false, start, _300ms, 0, 20, 3, 0, 0, 67, 100, 0, 0, 0},
Status{false, start, _300ms, 0, 20, 3, 0, 0, 67, 100, 0, 0, 0},
}
for i, s := range status {
if !reflect.DeepEqual(&s, &want[i]) {
t.Errorf("r.Status(%v) expected %v; got %v", i, want[i], s)
}
}
if !bytes.Equal(b[:20], in[:20]) {
t.Errorf("r.Read() input doesn't match output")
}
}
func TestWriter(t *testing.T) {
b := make([]byte, 100)
for i := range b {
b[i] = byte(i)
}
w := NewWriter(&bytes.Buffer{}, 200)
start := time.Now()
// Make sure w implements Limiter
_ = Limiter(w)
// Non-blocking 20-byte write for the first sample returns ErrLimit
w.SetBlocking(false)
if n, err := w.Write(b); n != 20 || err != ErrLimit {
t.Fatalf("w.Write(b) expected 20 (ErrLimit); got %v (%v)", n, err)
} else if rt := time.Since(start); rt > _50ms {
t.Fatalf("w.Write(b) took too long (%v)", rt)
}
// Blocking 80-byte write
w.SetBlocking(true)
if n, err := w.Write(b[20:]); n != 80 || err != nil {
t.Fatalf("w.Write(b[20:]) expected 80 (<nil>); got %v (%v)", n, err)
} else if rt := time.Since(start); rt < _400ms {
t.Fatalf("w.Write(b[20:]) returned ahead of time (%v)", rt)
}
w.SetTransferSize(100)
status := []Status{w.Status(), nextStatus(w.Monitor)}
start = status[0].Start
// Active, Start, Duration, Idle, Bytes, Samples, InstRate, CurRate, AvgRate, PeakRate, BytesRem, TimeRem, Progress
want := []Status{
Status{true, start, _400ms, 0, 80, 4, 200, 200, 200, 200, 20, _100ms, 80000},
Status{true, start, _500ms, _100ms, 100, 5, 200, 200, 200, 200, 0, 0, 100000},
}
for i, s := range status {
if !reflect.DeepEqual(&s, &want[i]) {
t.Errorf("w.Status(%v) expected %v; got %v", i, want[i], s)
}
}
if !bytes.Equal(b, w.Writer.(*bytes.Buffer).Bytes()) {
t.Errorf("w.Write() input doesn't match output")
}
}

+ 67
- 0
flowrate/util.go View File

@ -0,0 +1,67 @@
//
// Written by Maxim Khitrov (November 2012)
//
package flowrate
import (
"math"
"strconv"
"time"
)
// clockRate is the resolution and precision of clock().
const clockRate = 20 * time.Millisecond
// czero is the process start time rounded down to the nearest clockRate
// increment.
var czero = time.Duration(time.Now().UnixNano()) / clockRate * clockRate
// clock returns a low resolution timestamp relative to the process start time.
func clock() time.Duration {
return time.Duration(time.Now().UnixNano())/clockRate*clockRate - czero
}
// clockToTime converts a clock() timestamp to an absolute time.Time value.
func clockToTime(c time.Duration) time.Time {
return time.Unix(0, int64(czero+c))
}
// clockRound returns d rounded to the nearest clockRate increment.
func clockRound(d time.Duration) time.Duration {
return (d + clockRate>>1) / clockRate * clockRate
}
// round returns x rounded to the nearest int64 (non-negative values only).
func round(x float64) int64 {
if _, frac := math.Modf(x); frac >= 0.5 {
return int64(math.Ceil(x))
}
return int64(math.Floor(x))
}
// Percent represents a percentage in increments of 1/1000th of a percent.
type Percent uint32
// percentOf calculates what percent of the total is x.
func percentOf(x, total float64) Percent {
if x < 0 || total <= 0 {
return 0
} else if p := round(x / total * 1e5); p <= math.MaxUint32 {
return Percent(p)
}
return Percent(math.MaxUint32)
}
func (p Percent) Float() float64 {
return float64(p) * 1e-3
}
func (p Percent) String() string {
var buf [12]byte
b := strconv.AppendUint(buf[:0], uint64(p)/1000, 10)
n := len(b)
b = strconv.AppendUint(b, 1000+uint64(p)%1000, 10)
b[n] = '.'
return string(append(b, '%'))
}

Loading…
Cancel
Save