diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go index 18e8bbe45..f3d4a9e0a 100644 --- a/internal/blocksync/pool.go +++ b/internal/blocksync/pool.go @@ -331,8 +331,16 @@ func (pool *BlockPool) SetPeerRange(peerID types.NodeID, base int64, height int6 peer.base = base peer.height = height } else { - peer = newBPPeer(pool, peerID, base, height) - peer.logger = pool.logger.With("peer", peerID) + peer = &bpPeer{ + pool: pool, + id: peerID, + base: base, + height: height, + numPending: 0, + logger: pool.logger.With("peer", peerID), + startAt: time.Now(), + } + pool.peers[peerID] = peer } @@ -490,24 +498,13 @@ type bpPeer struct { recvMonitor *flowrate.Monitor timeout *time.Timer + startAt time.Time logger log.Logger } -func newBPPeer(pool *BlockPool, peerID types.NodeID, base int64, height int64) *bpPeer { - peer := &bpPeer{ - pool: pool, - id: peerID, - base: base, - height: height, - numPending: 0, - logger: log.NewNopLogger(), - } - return peer -} - func (peer *bpPeer) resetMonitor() { - peer.recvMonitor = flowrate.New(time.Second, time.Second*40) + peer.recvMonitor = flowrate.New(peer.startAt, time.Second, time.Second*40) initialValue := float64(minRecvRate) * math.E peer.recvMonitor.SetREMA(initialValue) } diff --git a/internal/libs/flowrate/flowrate.go b/internal/libs/flowrate/flowrate.go index 2a053805c..c2234669b 100644 --- a/internal/libs/flowrate/flowrate.go +++ b/internal/libs/flowrate/flowrate.go @@ -14,11 +14,12 @@ import ( // Monitor monitors and limits the transfer rate of a data stream. type Monitor struct { - mu sync.Mutex // Mutex guarding access to all internal fields - active bool // Flag indicating an active transfer - start time.Duration // Transfer start time (clock() value) - bytes int64 // Total number of bytes transferred - samples int64 // Total number of samples taken + mu sync.Mutex // Mutex guarding access to all internal fields + active bool // Flag indicating an active transfer + start time.Duration // Transfer start time (clock() value) + pStartAt time.Time // time of process start + bytes int64 // Total number of bytes transferred + samples int64 // Total number of samples taken rSample float64 // Most recent transfer rate sample (bytes per second) rEMA float64 // Exponential moving average of rSample @@ -45,21 +46,22 @@ type Monitor struct { // // The default values for sampleRate and windowSize (if <= 0) are 100ms and 1s, // respectively. -func New(sampleRate, windowSize time.Duration) *Monitor { +func New(startAt time.Time, sampleRate, windowSize time.Duration) *Monitor { if sampleRate = clockRound(sampleRate); sampleRate <= 0 { sampleRate = 5 * clockRate } if windowSize <= 0 { windowSize = 1 * time.Second } - now := clock() + now := clock(startAt) return &Monitor{ - active: true, - start: now, - rWindow: windowSize.Seconds(), - sLast: now, - sRate: sampleRate, - tLast: now, + active: true, + start: now, + rWindow: windowSize.Seconds(), + sLast: now, + sRate: sampleRate, + tLast: now, + pStartAt: startAt, } } @@ -129,7 +131,7 @@ func (m *Monitor) Status() Status { now := m.update(0) s := Status{ Active: m.active, - Start: clockToTime(m.start), + Start: m.pStartAt.Add(m.start), Duration: m.sLast - m.start, Idle: now - m.tLast, Bytes: m.bytes, @@ -222,7 +224,7 @@ func (m *Monitor) update(n int) (now time.Duration) { if !m.active { return } - if now = clock(); n > 0 { + if now = clock(m.pStartAt); n > 0 { m.tLast = now } m.sBytes += int64(n) diff --git a/internal/libs/flowrate/util.go b/internal/libs/flowrate/util.go index b33ddc701..ef66f77e5 100644 --- a/internal/libs/flowrate/util.go +++ b/internal/libs/flowrate/util.go @@ -13,18 +13,9 @@ import ( // clockRate is the resolution and precision of clock(). const clockRate = 20 * time.Millisecond -// czero is the process start time rounded down to the nearest clockRate -// increment. -var czero = time.Now().Round(clockRate) - // clock returns a low resolution timestamp relative to the process start time. -func clock() time.Duration { - return time.Now().Round(clockRate).Sub(czero) -} - -// clockToTime converts a clock() timestamp to an absolute time.Time value. -func clockToTime(c time.Duration) time.Time { - return czero.Add(c) +func clock(startAt time.Time) time.Duration { + return time.Now().Round(clockRate).Sub(startAt) } // clockRound returns d rounded to the nearest clockRate increment. diff --git a/internal/p2p/conn/connection.go b/internal/p2p/conn/connection.go index ab0f45739..32e5ca6b8 100644 --- a/internal/p2p/conn/connection.go +++ b/internal/p2p/conn/connection.go @@ -134,6 +134,9 @@ type MConnConfig struct { // Maximum wait time for pongs PongTimeout time.Duration `mapstructure:"pong_timeout"` + + // Process/Transport Start time + StartTime time.Time `mapstructure:",omitempty"` } // DefaultMConnConfig returns the default config. @@ -145,33 +148,17 @@ func DefaultMConnConfig() MConnConfig { FlushThrottle: defaultFlushThrottle, PingInterval: defaultPingInterval, PongTimeout: defaultPongTimeout, + StartTime: time.Now(), } } -// NewMConnection wraps net.Conn and creates multiplex connection +// NewMConnection wraps net.Conn and creates multiplex connection with a config func NewMConnection( logger log.Logger, conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, -) *MConnection { - return NewMConnectionWithConfig( - logger, - conn, - chDescs, - onReceive, - onError, - DefaultMConnConfig()) -} - -// NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config -func NewMConnectionWithConfig( - logger log.Logger, - conn net.Conn, - chDescs []*ChannelDescriptor, - onReceive receiveCbFunc, - onError errorCbFunc, config MConnConfig, ) *MConnection { if config.PongTimeout >= config.PingInterval { @@ -183,8 +170,8 @@ func NewMConnectionWithConfig( conn: conn, bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize), bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize), - sendMonitor: flowrate.New(0, 0), - recvMonitor: flowrate.New(0, 0), + sendMonitor: flowrate.New(config.StartTime, 0, 0), + recvMonitor: flowrate.New(config.StartTime, 0, 0), send: make(chan struct{}, 1), pong: make(chan struct{}, 1), onReceive: onReceive, diff --git a/internal/p2p/conn/connection_test.go b/internal/p2p/conn/connection_test.go index dbbfe23bd..ab05eef21 100644 --- a/internal/p2p/conn/connection_test.go +++ b/internal/p2p/conn/connection_test.go @@ -42,7 +42,7 @@ func createMConnectionWithCallbacks( cfg.PingInterval = 90 * time.Millisecond cfg.PongTimeout = 45 * time.Millisecond chDescs := []*ChannelDescriptor{{ID: 0x01, Priority: 1, SendQueueCapacity: 1}} - c := NewMConnectionWithConfig(logger, conn, chDescs, onReceive, onError, cfg) + c := NewMConnection(logger, conn, chDescs, onReceive, onError, cfg) return c } @@ -453,7 +453,7 @@ func newClientAndServerConnsForReadErrors( } logger := log.TestingLogger() - mconnClient := NewMConnection(logger.With("module", "client"), client, chDescs, onReceive, onError) + mconnClient := NewMConnection(logger.With("module", "client"), client, chDescs, onReceive, onError, DefaultMConnConfig()) err := mconnClient.Start(ctx) require.NoError(t, err) diff --git a/internal/p2p/transport_mconn.go b/internal/p2p/transport_mconn.go index b52bd5d7b..6e6728f11 100644 --- a/internal/p2p/transport_mconn.go +++ b/internal/p2p/transport_mconn.go @@ -376,7 +376,7 @@ func (c *mConnConnection) handshake( return nil, types.NodeInfo{}, nil, err } - mconn := conn.NewMConnectionWithConfig( + mconn := conn.NewMConnection( c.logger.With("peer", c.RemoteEndpoint().NodeAddress(peerInfo.NodeID)), secretConn, c.channelDescs,