diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go index d10a32b0d..8117fe2d4 100644 --- a/internal/blocksync/pool.go +++ b/internal/blocksync/pool.go @@ -7,7 +7,7 @@ import ( "sync/atomic" "time" - flow "github.com/tendermint/tendermint/internal/libs/flowrate" + "github.com/tendermint/tendermint/internal/libs/flowrate" tmsync "github.com/tendermint/tendermint/internal/libs/sync" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" @@ -471,7 +471,7 @@ type bpPeer struct { base int64 pool *BlockPool id types.NodeID - recvMonitor *flow.Monitor + recvMonitor *flowrate.Monitor timeout *time.Timer @@ -495,7 +495,7 @@ func (peer *bpPeer) setLogger(l log.Logger) { } func (peer *bpPeer) resetMonitor() { - peer.recvMonitor = flow.New(time.Second, time.Second*40) + peer.recvMonitor = flowrate.New(time.Second, time.Second*40) initialValue := float64(minRecvRate) * math.E peer.recvMonitor.SetREMA(initialValue) } diff --git a/internal/libs/flowrate/README.md b/internal/libs/flowrate/README.md deleted file mode 100644 index caed79aa3..000000000 --- a/internal/libs/flowrate/README.md +++ /dev/null @@ -1,10 +0,0 @@ -Data Flow Rate Control -====================== - -To download and install this package run: - -go get github.com/mxk/go-flowrate/flowrate - -The documentation is available at: - - diff --git a/internal/libs/flowrate/io.go b/internal/libs/flowrate/io.go deleted file mode 100644 index fbe090972..000000000 --- a/internal/libs/flowrate/io.go +++ /dev/null @@ -1,133 +0,0 @@ -// -// Written by Maxim Khitrov (November 2012) -// - -package flowrate - -import ( - "errors" - "io" -) - -// ErrLimit is returned by the Writer when a non-blocking write is short due to -// the transfer rate limit. -var ErrLimit = errors.New("flowrate: flow rate limit exceeded") - -// Limiter is implemented by the Reader and Writer to provide a consistent -// interface for monitoring and controlling data transfer. -type Limiter interface { - Done() int64 - Status() Status - SetTransferSize(bytes int64) - SetLimit(new int64) (old int64) - SetBlocking(new bool) (old bool) -} - -// Reader implements io.ReadCloser with a restriction on the rate of data -// transfer. -type Reader struct { - io.Reader // Data source - *Monitor // Flow control monitor - - limit int64 // Rate limit in bytes per second (unlimited when <= 0) - block bool // What to do when no new bytes can be read due to the limit -} - -// NewReader restricts all Read operations on r to limit bytes per second. -func NewReader(r io.Reader, limit int64) *Reader { - return &Reader{r, New(0, 0), limit, true} -} - -// Read reads up to len(p) bytes into p without exceeding the current transfer -// rate limit. It returns (0, nil) immediately if r is non-blocking and no new -// bytes can be read at this time. -func (r *Reader) Read(p []byte) (n int, err error) { - p = p[:r.Limit(len(p), r.limit, r.block)] - if len(p) > 0 { - n, err = r.IO(r.Reader.Read(p)) - } - return -} - -// SetLimit changes the transfer rate limit to new bytes per second and returns -// the previous setting. -func (r *Reader) SetLimit(new int64) (old int64) { - old, r.limit = r.limit, new - return -} - -// SetBlocking changes the blocking behavior and returns the previous setting. A -// Read call on a non-blocking reader returns immediately if no additional bytes -// may be read at this time due to the rate limit. -func (r *Reader) SetBlocking(new bool) (old bool) { - old, r.block = r.block, new - return -} - -// Close closes the underlying reader if it implements the io.Closer interface. -func (r *Reader) Close() error { - defer r.Done() - if c, ok := r.Reader.(io.Closer); ok { - return c.Close() - } - return nil -} - -// Writer implements io.WriteCloser with a restriction on the rate of data -// transfer. -type Writer struct { - io.Writer // Data destination - *Monitor // Flow control monitor - - limit int64 // Rate limit in bytes per second (unlimited when <= 0) - block bool // What to do when no new bytes can be written due to the limit -} - -// NewWriter restricts all Write operations on w to limit bytes per second. The -// transfer rate and the default blocking behavior (true) can be changed -// directly on the returned *Writer. -func NewWriter(w io.Writer, limit int64) *Writer { - return &Writer{w, New(0, 0), limit, true} -} - -// Write writes len(p) bytes from p to the underlying data stream without -// exceeding the current transfer rate limit. It returns (n, ErrLimit) if w is -// non-blocking and no additional bytes can be written at this time. -func (w *Writer) Write(p []byte) (n int, err error) { - var c int - for len(p) > 0 && err == nil { - s := p[:w.Limit(len(p), w.limit, w.block)] - if len(s) > 0 { - c, err = w.IO(w.Writer.Write(s)) - } else { - return n, ErrLimit - } - p = p[c:] - n += c - } - return -} - -// SetLimit changes the transfer rate limit to new bytes per second and returns -// the previous setting. -func (w *Writer) SetLimit(new int64) (old int64) { - old, w.limit = w.limit, new - return -} - -// SetBlocking changes the blocking behavior and returns the previous setting. A -// Write call on a non-blocking writer returns as soon as no additional bytes -// may be written at this time due to the rate limit. -func (w *Writer) SetBlocking(new bool) (old bool) { - old, w.block = w.block, new - return -} - -// Close closes the underlying writer if it implements the io.Closer interface. -func (w *Writer) Close() error { - defer w.Done() - if c, ok := w.Writer.(io.Closer); ok { - return c.Close() - } - return nil -} diff --git a/internal/libs/flowrate/io_test.go b/internal/libs/flowrate/io_test.go deleted file mode 100644 index 4d7de417e..000000000 --- a/internal/libs/flowrate/io_test.go +++ /dev/null @@ -1,197 +0,0 @@ -// -// Written by Maxim Khitrov (November 2012) -// - -package flowrate - -import ( - "bytes" - "testing" - "time" -) - -const ( - _50ms = 50 * time.Millisecond - _100ms = 100 * time.Millisecond - _200ms = 200 * time.Millisecond - _300ms = 300 * time.Millisecond - _400ms = 400 * time.Millisecond - _500ms = 500 * time.Millisecond -) - -func nextStatus(m *Monitor) Status { - samples := m.samples - for i := 0; i < 30; i++ { - if s := m.Status(); s.Samples != samples { - return s - } - time.Sleep(5 * time.Millisecond) - } - return m.Status() -} - -func TestReader(t *testing.T) { - in := make([]byte, 100) - for i := range in { - in[i] = byte(i) - } - b := make([]byte, 100) - r := NewReader(bytes.NewReader(in), 100) - start := time.Now() - - // Make sure r implements Limiter - _ = Limiter(r) - - // 1st read of 10 bytes is performed immediately - if n, err := r.Read(b); n != 10 || err != nil { - t.Fatalf("r.Read(b) expected 10 (); got %v (%v)", n, err) - } else if rt := time.Since(start); rt > _50ms { - t.Fatalf("r.Read(b) took too long (%v)", rt) - } - - // No new Reads allowed in the current sample - r.SetBlocking(false) - if n, err := r.Read(b); n != 0 || err != nil { - t.Fatalf("r.Read(b) expected 0 (); got %v (%v)", n, err) - } else if rt := time.Since(start); rt > _50ms { - t.Fatalf("r.Read(b) took too long (%v)", rt) - } - - status := [6]Status{0: r.Status()} // No samples in the first status - - // 2nd read of 10 bytes blocks until the next sample - r.SetBlocking(true) - if n, err := r.Read(b[10:]); n != 10 || err != nil { - t.Fatalf("r.Read(b[10:]) expected 10 (); got %v (%v)", n, err) - } else if rt := time.Since(start); rt < _100ms { - t.Fatalf("r.Read(b[10:]) returned ahead of time (%v)", rt) - } - - status[1] = r.Status() // 1st sample - status[2] = nextStatus(r.Monitor) // 2nd sample - status[3] = nextStatus(r.Monitor) // No activity for the 3rd sample - - if n := r.Done(); n != 20 { - t.Fatalf("r.Done() expected 20; got %v", n) - } - - status[4] = r.Status() - status[5] = nextStatus(r.Monitor) // Timeout - start = status[0].Start - - // Active, Bytes, Samples, InstRate, CurRate, AvgRate, PeakRate, BytesRem, Start, Duration, Idle, TimeRem, Progress - want := []Status{ - {start, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, true}, - {start, 10, 1, 100, 100, 100, 100, 0, _100ms, 0, 0, 0, true}, - {start, 20, 2, 100, 100, 100, 100, 0, _200ms, _100ms, 0, 0, true}, - {start, 20, 3, 0, 90, 67, 100, 0, _300ms, _200ms, 0, 0, true}, - {start, 20, 3, 0, 0, 67, 100, 0, _300ms, 0, 0, 0, false}, - {start, 20, 3, 0, 0, 67, 100, 0, _300ms, 0, 0, 0, false}, - } - for i, s := range status { - s := s - if !statusesAreEqual(&s, &want[i]) { - t.Errorf("r.Status(%v)\nexpected: %v\ngot : %v", i, want[i], s) - } - } - if !bytes.Equal(b[:20], in[:20]) { - t.Errorf("r.Read() input doesn't match output") - } -} - -func TestWriter(t *testing.T) { - b := make([]byte, 100) - for i := range b { - b[i] = byte(i) - } - w := NewWriter(&bytes.Buffer{}, 200) - start := time.Now() - - // Make sure w implements Limiter - _ = Limiter(w) - - // Non-blocking 20-byte write for the first sample returns ErrLimit - w.SetBlocking(false) - if n, err := w.Write(b); n != 20 || err != ErrLimit { - t.Fatalf("w.Write(b) expected 20 (ErrLimit); got %v (%v)", n, err) - } else if rt := time.Since(start); rt > _50ms { - t.Fatalf("w.Write(b) took too long (%v)", rt) - } - - // Blocking 80-byte write - w.SetBlocking(true) - if n, err := w.Write(b[20:]); n != 80 || err != nil { - t.Fatalf("w.Write(b[20:]) expected 80 (); got %v (%v)", n, err) - } else if rt := time.Since(start); rt < _300ms { - // Explanation for `rt < _300ms` (as opposed to `< _400ms`) - // - // |<-- start | | - // epochs: -----0ms|---100ms|---200ms|---300ms|---400ms - // sends: 20|20 |20 |20 |20# - // - // NOTE: The '#' symbol can thus happen before 400ms is up. - // Thus, we can only panic if rt < _300ms. - t.Fatalf("w.Write(b[20:]) returned ahead of time (%v)", rt) - } - - w.SetTransferSize(100) - status := []Status{w.Status(), nextStatus(w.Monitor)} - start = status[0].Start - - // Active, Bytes, Samples, InstRate, CurRate, AvgRate, PeakRate, BytesRem, Start, Duration, Idle, TimeRem, Progress - want := []Status{ - {start, 80, 4, 200, 200, 200, 200, 20, _400ms, 0, _100ms, 80000, true}, - {start, 100, 5, 200, 200, 200, 200, 0, _500ms, _100ms, 0, 100000, true}, - } - - for i, s := range status { - s := s - if !statusesAreEqual(&s, &want[i]) { - t.Errorf("w.Status(%v)\nexpected: %v\ngot : %v\n", i, want[i], s) - } - } - if !bytes.Equal(b, w.Writer.(*bytes.Buffer).Bytes()) { - t.Errorf("w.Write() input doesn't match output") - } -} - -const maxDeviationForDuration = 50 * time.Millisecond -const maxDeviationForRate int64 = 50 - -// statusesAreEqual returns true if s1 is equal to s2. Equality here means -// general equality of fields except for the duration and rates, which can -// drift due to unpredictable delays (e.g. thread wakes up 25ms after -// `time.Sleep` has ended). -func statusesAreEqual(s1 *Status, s2 *Status) bool { - if s1.Active == s2.Active && - s1.Start == s2.Start && - durationsAreEqual(s1.Duration, s2.Duration, maxDeviationForDuration) && - s1.Idle == s2.Idle && - s1.Bytes == s2.Bytes && - s1.Samples == s2.Samples && - ratesAreEqual(s1.InstRate, s2.InstRate, maxDeviationForRate) && - ratesAreEqual(s1.CurRate, s2.CurRate, maxDeviationForRate) && - ratesAreEqual(s1.AvgRate, s2.AvgRate, maxDeviationForRate) && - ratesAreEqual(s1.PeakRate, s2.PeakRate, maxDeviationForRate) && - s1.BytesRem == s2.BytesRem && - durationsAreEqual(s1.TimeRem, s2.TimeRem, maxDeviationForDuration) && - s1.Progress == s2.Progress { - return true - } - return false -} - -func durationsAreEqual(d1 time.Duration, d2 time.Duration, maxDeviation time.Duration) bool { - return d2-d1 <= maxDeviation -} - -func ratesAreEqual(r1 int64, r2 int64, maxDeviation int64) bool { - sub := r1 - r2 - if sub < 0 { - sub = -sub - } - if sub <= maxDeviation { - return true - } - return false -} diff --git a/internal/p2p/conn/connection.go b/internal/p2p/conn/connection.go index a99e83dc5..4782d7717 100644 --- a/internal/p2p/conn/connection.go +++ b/internal/p2p/conn/connection.go @@ -14,7 +14,7 @@ import ( "github.com/gogo/protobuf/proto" - flow "github.com/tendermint/tendermint/internal/libs/flowrate" + "github.com/tendermint/tendermint/internal/libs/flowrate" "github.com/tendermint/tendermint/internal/libs/protoio" tmsync "github.com/tendermint/tendermint/internal/libs/sync" "github.com/tendermint/tendermint/internal/libs/timer" @@ -77,8 +77,8 @@ type MConnection struct { conn net.Conn bufConnReader *bufio.Reader bufConnWriter *bufio.Writer - sendMonitor *flow.Monitor - recvMonitor *flow.Monitor + sendMonitor *flowrate.Monitor + recvMonitor *flowrate.Monitor send chan struct{} pong chan struct{} channels []*channel @@ -175,8 +175,8 @@ func NewMConnectionWithConfig( conn: conn, bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize), bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize), - sendMonitor: flow.New(0, 0), - recvMonitor: flow.New(0, 0), + sendMonitor: flowrate.New(0, 0), + recvMonitor: flowrate.New(0, 0), send: make(chan struct{}, 1), pong: make(chan struct{}, 1), onReceive: onReceive,