Browse Source

p2p+flowrate: rate control refactor (#7828)

Adding `CurrentTransferRate ` in the flowrate package because only the status of the transfer rate has been used.
pull/8109/head
JayT106 2 years ago
committed by GitHub
parent
commit
d9c9675e2a
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 15 additions and 3 deletions
  1. +1
    -1
      internal/blocksync/pool.go
  2. +12
    -0
      internal/libs/flowrate/flowrate.go
  3. +2
    -2
      internal/p2p/conn/connection.go

+ 1
- 1
internal/blocksync/pool.go View File

@ -168,7 +168,7 @@ func (pool *BlockPool) removeTimedoutPeers() {
for _, peer := range pool.peers {
// check if peer timed out
if !peer.didTimeout && peer.numPending > 0 {
curRate := peer.recvMonitor.Status().CurRate
curRate := peer.recvMonitor.CurrentTransferRate()
// curRate can be 0 on start
if curRate != 0 && curRate < minRecvRate {
err := errors.New("peer is not sending us data fast enough")


+ 12
- 0
internal/libs/flowrate/flowrate.go View File

@ -275,3 +275,15 @@ func (m *Monitor) waitNextSample(now time.Duration) time.Duration {
}
return now
}
// CurrentTransferRate returns the current transfer rate
func (m *Monitor) CurrentTransferRate() int64 {
m.mu.Lock()
defer m.mu.Unlock()
if m.sLast > m.start && m.active {
return round(m.rEMA)
}
return 0
}

+ 2
- 2
internal/p2p/conn/connection.go View File

@ -413,7 +413,7 @@ func (c *MConnection) sendSomePacketMsgs(ctx context.Context) bool {
// Block until .sendMonitor says we can write.
// Once we're ready we send more than we asked for,
// but amortized it should even out.
c.sendMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true)
c.sendMonitor.Limit(c._maxPacketMsgSize, c.config.SendRate, true)
// Now send some PacketMsgs.
for i := 0; i < numBatchPacketMsgs; i++ {
@ -481,7 +481,7 @@ FOR_LOOP:
}
// Block until .recvMonitor says we can read.
c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true)
c.recvMonitor.Limit(c._maxPacketMsgSize, c.config.RecvRate, true)
// Peek into bufConnReader for debugging
/*


Loading…
Cancel
Save