|
|
@ -22,15 +22,15 @@ const ( |
|
|
|
updateState = 2 * time.Second |
|
|
|
pingTimeout = 40 * time.Second |
|
|
|
|
|
|
|
// flushThrottle used here as a default.
|
|
|
|
// overwritten by the user config.
|
|
|
|
// TODO: remove
|
|
|
|
flushThrottle = 100 * time.Millisecond |
|
|
|
// some of these defaults are written in the user config
|
|
|
|
// flushThrottle, sendRate, recvRate
|
|
|
|
// TODO: remove values present in config
|
|
|
|
defaultFlushThrottle = 100 * time.Millisecond |
|
|
|
|
|
|
|
defaultSendQueueCapacity = 1 |
|
|
|
defaultSendRate = int64(512000) // 500KB/s
|
|
|
|
defaultRecvBufferCapacity = 4096 |
|
|
|
defaultRecvMessageCapacity = 22020096 // 21MB
|
|
|
|
defaultSendRate = int64(512000) // 500KB/s
|
|
|
|
defaultRecvRate = int64(512000) // 500KB/s
|
|
|
|
defaultSendTimeout = 10 * time.Second |
|
|
|
) |
|
|
@ -94,15 +94,22 @@ type MConnConfig struct { |
|
|
|
SendRate int64 `mapstructure:"send_rate"` |
|
|
|
RecvRate int64 `mapstructure:"recv_rate"` |
|
|
|
|
|
|
|
maxMsgPacketPayloadSize int |
|
|
|
|
|
|
|
flushThrottle time.Duration |
|
|
|
} |
|
|
|
|
|
|
|
func (cfg *MConnConfig) maxMsgPacketTotalSize() int { |
|
|
|
return cfg.maxMsgPacketPayloadSize + maxMsgPacketOverheadSize |
|
|
|
} |
|
|
|
|
|
|
|
// DefaultMConnConfig returns the default config.
|
|
|
|
func DefaultMConnConfig() *MConnConfig { |
|
|
|
return &MConnConfig{ |
|
|
|
SendRate: defaultSendRate, |
|
|
|
RecvRate: defaultRecvRate, |
|
|
|
flushThrottle: flushThrottle, |
|
|
|
SendRate: defaultSendRate, |
|
|
|
RecvRate: defaultRecvRate, |
|
|
|
maxMsgPacketPayloadSize: defaultMaxMsgPacketPayloadSize, |
|
|
|
flushThrottle: defaultFlushThrottle, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -342,7 +349,7 @@ func (c *MConnection) sendSomeMsgPackets() bool { |
|
|
|
// Block until .sendMonitor says we can write.
|
|
|
|
// Once we're ready we send more than we asked for,
|
|
|
|
// but amortized it should even out.
|
|
|
|
c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true) |
|
|
|
c.sendMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.SendRate), true) |
|
|
|
|
|
|
|
// Now send some msgPackets.
|
|
|
|
for i := 0; i < numBatchMsgPackets; i++ { |
|
|
@ -400,7 +407,7 @@ func (c *MConnection) recvRoutine() { |
|
|
|
FOR_LOOP: |
|
|
|
for { |
|
|
|
// Block until .recvMonitor says we can read.
|
|
|
|
c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true) |
|
|
|
c.recvMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.RecvRate), true) |
|
|
|
|
|
|
|
/* |
|
|
|
// Peek into bufReader for debugging
|
|
|
@ -441,7 +448,7 @@ FOR_LOOP: |
|
|
|
c.Logger.Debug("Receive Pong") |
|
|
|
case packetTypeMsg: |
|
|
|
pkt, n, err := msgPacket{}, int(0), error(nil) |
|
|
|
wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err) |
|
|
|
wire.ReadBinaryPtr(&pkt, c.bufReader, c.config.maxMsgPacketTotalSize(), &n, &err) |
|
|
|
c.recvMonitor.Update(int(n)) |
|
|
|
if err != nil { |
|
|
|
if c.IsRunning() { |
|
|
@ -547,6 +554,8 @@ type Channel struct { |
|
|
|
sending []byte |
|
|
|
priority int |
|
|
|
recentlySent int64 // exponential moving average
|
|
|
|
|
|
|
|
maxMsgPacketPayloadSize int |
|
|
|
} |
|
|
|
|
|
|
|
func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { |
|
|
@ -555,12 +564,13 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { |
|
|
|
cmn.PanicSanity("Channel default priority must be a postive integer") |
|
|
|
} |
|
|
|
return &Channel{ |
|
|
|
conn: conn, |
|
|
|
desc: desc, |
|
|
|
id: desc.ID, |
|
|
|
sendQueue: make(chan []byte, desc.SendQueueCapacity), |
|
|
|
recving: make([]byte, 0, desc.RecvBufferCapacity), |
|
|
|
priority: desc.Priority, |
|
|
|
conn: conn, |
|
|
|
desc: desc, |
|
|
|
id: desc.ID, |
|
|
|
sendQueue: make(chan []byte, desc.SendQueueCapacity), |
|
|
|
recving: make([]byte, 0, desc.RecvBufferCapacity), |
|
|
|
priority: desc.Priority, |
|
|
|
maxMsgPacketPayloadSize: conn.config.maxMsgPacketPayloadSize, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -619,14 +629,15 @@ func (ch *Channel) isSendPending() bool { |
|
|
|
func (ch *Channel) nextMsgPacket() msgPacket { |
|
|
|
packet := msgPacket{} |
|
|
|
packet.ChannelID = byte(ch.id) |
|
|
|
packet.Bytes = ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))] |
|
|
|
if len(ch.sending) <= maxMsgPacketPayloadSize { |
|
|
|
maxSize := ch.maxMsgPacketPayloadSize |
|
|
|
packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))] |
|
|
|
if len(ch.sending) <= maxSize { |
|
|
|
packet.EOF = byte(0x01) |
|
|
|
ch.sending = nil |
|
|
|
atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
|
|
|
|
} else { |
|
|
|
packet.EOF = byte(0x00) |
|
|
|
ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):] |
|
|
|
ch.sending = ch.sending[cmn.MinInt(maxSize, len(ch.sending)):] |
|
|
|
} |
|
|
|
return packet |
|
|
|
} |
|
|
@ -675,9 +686,9 @@ func (ch *Channel) updateStats() { |
|
|
|
//-----------------------------------------------------------------------------
|
|
|
|
|
|
|
|
const ( |
|
|
|
maxMsgPacketPayloadSize = 1024 |
|
|
|
defaultMaxMsgPacketPayloadSize = 1024 |
|
|
|
|
|
|
|
maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
|
|
|
|
maxMsgPacketTotalSize = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize |
|
|
|
packetTypePing = byte(0x01) |
|
|
|
packetTypePong = byte(0x02) |
|
|
|
packetTypeMsg = byte(0x03) |
|
|
|