|
@ -18,7 +18,7 @@ import ( |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
const ( |
|
|
const ( |
|
|
defaultMaxPacketMsgSize = 1024 |
|
|
|
|
|
|
|
|
defaultMaxPacketMsgPayloadSize = 1024 |
|
|
|
|
|
|
|
|
numBatchPacketMsgs = 10 |
|
|
numBatchPacketMsgs = 10 |
|
|
minReadBufferSize = 1024 |
|
|
minReadBufferSize = 1024 |
|
@ -96,7 +96,7 @@ type MConnection struct { |
|
|
|
|
|
|
|
|
created time.Time // time of creation
|
|
|
created time.Time // time of creation
|
|
|
|
|
|
|
|
|
emptyPacketMsgSize int |
|
|
|
|
|
|
|
|
_maxPacketMsgSize int |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// MConnConfig is a MConnection configuration.
|
|
|
// MConnConfig is a MConnection configuration.
|
|
@ -105,7 +105,7 @@ type MConnConfig struct { |
|
|
RecvRate int64 `mapstructure:"recv_rate"` |
|
|
RecvRate int64 `mapstructure:"recv_rate"` |
|
|
|
|
|
|
|
|
// Maximum payload size
|
|
|
// Maximum payload size
|
|
|
MaxPacketMsgSize int `mapstructure:"max_packet_msg_size"` |
|
|
|
|
|
|
|
|
MaxPacketMsgPayloadSize int `mapstructure:"max_packet_msg_payload_size"` |
|
|
|
|
|
|
|
|
// Interval to flush writes (throttled)
|
|
|
// Interval to flush writes (throttled)
|
|
|
FlushThrottle time.Duration `mapstructure:"flush_throttle"` |
|
|
FlushThrottle time.Duration `mapstructure:"flush_throttle"` |
|
@ -120,12 +120,12 @@ type MConnConfig struct { |
|
|
// DefaultMConnConfig returns the default config.
|
|
|
// DefaultMConnConfig returns the default config.
|
|
|
func DefaultMConnConfig() MConnConfig { |
|
|
func DefaultMConnConfig() MConnConfig { |
|
|
return MConnConfig{ |
|
|
return MConnConfig{ |
|
|
SendRate: defaultSendRate, |
|
|
|
|
|
RecvRate: defaultRecvRate, |
|
|
|
|
|
MaxPacketMsgSize: defaultMaxPacketMsgSize, |
|
|
|
|
|
FlushThrottle: defaultFlushThrottle, |
|
|
|
|
|
PingInterval: defaultPingInterval, |
|
|
|
|
|
PongTimeout: defaultPongTimeout, |
|
|
|
|
|
|
|
|
SendRate: defaultSendRate, |
|
|
|
|
|
RecvRate: defaultRecvRate, |
|
|
|
|
|
MaxPacketMsgPayloadSize: defaultMaxPacketMsgPayloadSize, |
|
|
|
|
|
FlushThrottle: defaultFlushThrottle, |
|
|
|
|
|
PingInterval: defaultPingInterval, |
|
|
|
|
|
PongTimeout: defaultPongTimeout, |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -146,17 +146,16 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
mconn := &MConnection{ |
|
|
mconn := &MConnection{ |
|
|
conn: conn, |
|
|
|
|
|
bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize), |
|
|
|
|
|
bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize), |
|
|
|
|
|
sendMonitor: flow.New(0, 0), |
|
|
|
|
|
recvMonitor: flow.New(0, 0), |
|
|
|
|
|
send: make(chan struct{}, 1), |
|
|
|
|
|
pong: make(chan struct{}, 1), |
|
|
|
|
|
onReceive: onReceive, |
|
|
|
|
|
onError: onError, |
|
|
|
|
|
config: config, |
|
|
|
|
|
emptyPacketMsgSize: emptyPacketMsgSize(), |
|
|
|
|
|
|
|
|
conn: conn, |
|
|
|
|
|
bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize), |
|
|
|
|
|
bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize), |
|
|
|
|
|
sendMonitor: flow.New(0, 0), |
|
|
|
|
|
recvMonitor: flow.New(0, 0), |
|
|
|
|
|
send: make(chan struct{}, 1), |
|
|
|
|
|
pong: make(chan struct{}, 1), |
|
|
|
|
|
onReceive: onReceive, |
|
|
|
|
|
onError: onError, |
|
|
|
|
|
config: config, |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Create channels
|
|
|
// Create channels
|
|
@ -173,6 +172,9 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec |
|
|
|
|
|
|
|
|
mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn) |
|
|
mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn) |
|
|
|
|
|
|
|
|
|
|
|
// maxPacketMsgSize() is a bit heavy, so call just once
|
|
|
|
|
|
mconn._maxPacketMsgSize = mconn.maxPacketMsgSize() |
|
|
|
|
|
|
|
|
return mconn |
|
|
return mconn |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -397,7 +399,7 @@ func (c *MConnection) sendSomePacketMsgs() bool { |
|
|
// Block until .sendMonitor says we can write.
|
|
|
// Block until .sendMonitor says we can write.
|
|
|
// Once we're ready we send more than we asked for,
|
|
|
// Once we're ready we send more than we asked for,
|
|
|
// but amortized it should even out.
|
|
|
// but amortized it should even out.
|
|
|
c.sendMonitor.Limit(c.config.MaxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true) |
|
|
|
|
|
|
|
|
c.sendMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true) |
|
|
|
|
|
|
|
|
// Now send some PacketMsgs.
|
|
|
// Now send some PacketMsgs.
|
|
|
for i := 0; i < numBatchPacketMsgs; i++ { |
|
|
for i := 0; i < numBatchPacketMsgs; i++ { |
|
@ -455,7 +457,7 @@ func (c *MConnection) recvRoutine() { |
|
|
FOR_LOOP: |
|
|
FOR_LOOP: |
|
|
for { |
|
|
for { |
|
|
// Block until .recvMonitor says we can read.
|
|
|
// Block until .recvMonitor says we can read.
|
|
|
c.recvMonitor.Limit(c.config.MaxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true) |
|
|
|
|
|
|
|
|
c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true) |
|
|
|
|
|
|
|
|
// Peek into bufConnReader for debugging
|
|
|
// Peek into bufConnReader for debugging
|
|
|
/* |
|
|
/* |
|
@ -475,7 +477,7 @@ FOR_LOOP: |
|
|
var packet Packet |
|
|
var packet Packet |
|
|
var _n int64 |
|
|
var _n int64 |
|
|
var err error |
|
|
var err error |
|
|
_n, err = cdc.UnmarshalBinaryReader(c.bufConnReader, &packet, int64(c.config.MaxPacketMsgSize)) |
|
|
|
|
|
|
|
|
_n, err = cdc.UnmarshalBinaryReader(c.bufConnReader, &packet, int64(c._maxPacketMsgSize)) |
|
|
c.recvMonitor.Update(int(_n)) |
|
|
c.recvMonitor.Update(int(_n)) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
if c.IsRunning() { |
|
|
if c.IsRunning() { |
|
@ -548,6 +550,16 @@ func (c *MConnection) stopPongTimer() { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// maxPacketMsgSize returns a maximum size of PacketMsg, including the overhead
|
|
|
|
|
|
// of amino encoding.
|
|
|
|
|
|
func (c *MConnection) maxPacketMsgSize() int { |
|
|
|
|
|
return len(cdc.MustMarshalBinary(PacketMsg{ |
|
|
|
|
|
ChannelID: 0x01, |
|
|
|
|
|
EOF: 1, |
|
|
|
|
|
Bytes: make([]byte, c.config.MaxPacketMsgPayloadSize), |
|
|
|
|
|
})) + 10 // leave room for changes in amino
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
type ConnectionStatus struct { |
|
|
type ConnectionStatus struct { |
|
|
Duration time.Duration |
|
|
Duration time.Duration |
|
|
SendMonitor flow.Status |
|
|
SendMonitor flow.Status |
|
@ -631,7 +643,7 @@ func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel { |
|
|
desc: desc, |
|
|
desc: desc, |
|
|
sendQueue: make(chan []byte, desc.SendQueueCapacity), |
|
|
sendQueue: make(chan []byte, desc.SendQueueCapacity), |
|
|
recving: make([]byte, 0, desc.RecvBufferCapacity), |
|
|
recving: make([]byte, 0, desc.RecvBufferCapacity), |
|
|
maxPacketMsgPayloadSize: conn.config.MaxPacketMsgSize, |
|
|
|
|
|
|
|
|
maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize, |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -694,7 +706,7 @@ func (ch *Channel) isSendPending() bool { |
|
|
func (ch *Channel) nextPacketMsg() PacketMsg { |
|
|
func (ch *Channel) nextPacketMsg() PacketMsg { |
|
|
packet := PacketMsg{} |
|
|
packet := PacketMsg{} |
|
|
packet.ChannelID = byte(ch.desc.ID) |
|
|
packet.ChannelID = byte(ch.desc.ID) |
|
|
maxSize := ch.maxPacketMsgPayloadSize - ch.conn.emptyPacketMsgSize |
|
|
|
|
|
|
|
|
maxSize := ch.maxPacketMsgPayloadSize |
|
|
packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))] |
|
|
packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))] |
|
|
if len(ch.sending) <= maxSize { |
|
|
if len(ch.sending) <= maxSize { |
|
|
packet.EOF = byte(0x01) |
|
|
packet.EOF = byte(0x01) |
|
@ -780,25 +792,3 @@ type PacketMsg struct { |
|
|
func (mp PacketMsg) String() string { |
|
|
func (mp PacketMsg) String() string { |
|
|
return fmt.Sprintf("PacketMsg{%X:%X T:%X}", mp.ChannelID, mp.Bytes, mp.EOF) |
|
|
return fmt.Sprintf("PacketMsg{%X:%X T:%X}", mp.ChannelID, mp.Bytes, mp.EOF) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// - Uvarint length of MustMarshalBinary(packet) = 1 or 2 bytes
|
|
|
|
|
|
// (as long as it's less than 16,384 bytes)
|
|
|
|
|
|
// - Prefix bytes = 4 bytes
|
|
|
|
|
|
// - ChannelID field key + byte = 2 bytes
|
|
|
|
|
|
// - EOF field key + byte = 2 bytes
|
|
|
|
|
|
// - Bytes field key = 1 bytes
|
|
|
|
|
|
// - Uvarint length of MustMarshalBinary(bytes) = 1 or 2 bytes
|
|
|
|
|
|
// - Struct terminator = 1 byte
|
|
|
|
|
|
// = up to 14 bytes overhead for the packet.
|
|
|
|
|
|
|
|
|
|
|
|
func emptyPacketMsgSize() int { |
|
|
|
|
|
emptyPacketMsgSize := len(cdc.MustMarshalBinary(PacketMsg{ |
|
|
|
|
|
ChannelID: 0x01, |
|
|
|
|
|
EOF: 1, |
|
|
|
|
|
Bytes: make([]byte, 1), |
|
|
|
|
|
})) |
|
|
|
|
|
// -1 byte of data
|
|
|
|
|
|
// +1 byte because uvarint length of MustMarshalBinary(bytes) will be 2 bytes for big packets
|
|
|
|
|
|
// +1 byte because uvarint length of MustMarshalBinary(packet) will be 2 bytes for big packets
|
|
|
|
|
|
return emptyPacketMsgSize - 1 + 1 + 1 |
|
|
|
|
|
} |
|
|
|