diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go
index f3d4a9e0a..4c905c660 100644
--- a/internal/blocksync/pool.go
+++ b/internal/blocksync/pool.go
@@ -168,7 +168,7 @@ func (pool *BlockPool) removeTimedoutPeers() {
 	for _, peer := range pool.peers {
 		// check if peer timed out
 		if !peer.didTimeout && peer.numPending > 0 {
-			curRate := peer.recvMonitor.Status().CurRate
+			curRate := peer.recvMonitor.CurrentTransferRate()
 			// curRate can be 0 on start
 			if curRate != 0 && curRate < minRecvRate {
 				err := errors.New("peer is not sending us data fast enough")
diff --git a/internal/libs/flowrate/flowrate.go b/internal/libs/flowrate/flowrate.go
index c2234669b..aaa54a22c 100644
--- a/internal/libs/flowrate/flowrate.go
+++ b/internal/libs/flowrate/flowrate.go
@@ -275,3 +275,15 @@ func (m *Monitor) waitNextSample(now time.Duration) time.Duration {
 	}
 	return now
 }
+
+// CurrentTransferRate returns the current transfer rate
+func (m *Monitor) CurrentTransferRate() int64 {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	if m.sLast > m.start && m.active {
+		return round(m.rEMA)
+	}
+
+	return 0
+}
diff --git a/internal/p2p/conn/connection.go b/internal/p2p/conn/connection.go
index 693a7ce58..4cbca7f19 100644
--- a/internal/p2p/conn/connection.go
+++ b/internal/p2p/conn/connection.go
@@ -413,7 +413,7 @@ func (c *MConnection) sendSomePacketMsgs(ctx context.Context) bool {
 	// Block until .sendMonitor says we can write.
 	// Once we're ready we send more than we asked for,
 	// but amortized it should even out.
-	c.sendMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true)
+	c.sendMonitor.Limit(c._maxPacketMsgSize, c.config.SendRate, true)

 	// Now send some PacketMsgs.
 	for i := 0; i < numBatchPacketMsgs; i++ {
@@ -481,7 +481,7 @@ FOR_LOOP:
 	}

 	// Block until .recvMonitor says we can read.
-	c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true)
+	c.recvMonitor.Limit(c._maxPacketMsgSize, c.config.RecvRate, true)

 	// Peek into bufConnReader for debugging
 	/*
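
For context, a minimal sketch of how the new Monitor.CurrentTransferRate accessor would be exercised. This is illustrative only: the flowrate package sits under internal/ and is not importable from outside the repository, and the zero arguments to flowrate.New are assumed to select the package's default sample and window durations.

package main

import (
	"fmt"
	"time"

	// Hypothetical import path; not importable outside the repo (internal package).
	"github.com/tendermint/tendermint/internal/libs/flowrate"
)

func main() {
	// Zero values fall back to the monitor's default sampling parameters.
	m := flowrate.New(0, 0)

	// Record roughly 10 KiB of transferred data in 1 KiB chunks.
	for i := 0; i < 10; i++ {
		m.Update(1024)
		time.Sleep(10 * time.Millisecond)
	}

	// Prints the smoothed transfer rate in bytes per second, or 0 when no
	// sample has been taken yet, matching the "curRate can be 0 on start"
	// check in pool.go.
	fmt.Printf("current transfer rate: %d B/s\n", m.CurrentTransferRate())
}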