@ -18,7 +18,7 @@ import (
)
const (
defaultMaxPacketMsgSize = 1024
defaultMaxPacketMsgPayload Size = 1024
numBatchPacketMsgs = 10
minReadBufferSize = 1024
@ -96,7 +96,7 @@ type MConnection struct {
created time . Time // time of creation
empty PacketMsgSize int
_max PacketMsgSize int
}
// MConnConfig is a MConnection configuration.
@ -105,7 +105,7 @@ type MConnConfig struct {
RecvRate int64 ` mapstructure:"recv_rate" `
// Maximum payload size
MaxPacketMsgSize int ` mapstructure:"max_packet_msg_size" `
MaxPacketMsgPayload Size int ` mapstructure:"max_packet_msg_payload _size" `
// Interval to flush writes (throttled)
FlushThrottle time . Duration ` mapstructure:"flush_throttle" `
@ -120,12 +120,12 @@ type MConnConfig struct {
// DefaultMConnConfig returns the default config.
func DefaultMConnConfig ( ) MConnConfig {
return MConnConfig {
SendRate : defaultSendRate ,
RecvRate : defaultRecvRate ,
MaxPacketMsgSize : defaultMaxPacketMsgSize ,
FlushThrottle : defaultFlushThrottle ,
PingInterval : defaultPingInterval ,
PongTimeout : defaultPongTimeout ,
SendRate : defaultSendRate ,
RecvRate : defaultRecvRate ,
MaxPacketMsgPayload Size : defaultMaxPacketMsgPayload Size ,
FlushThrottle : defaultFlushThrottle ,
PingInterval : defaultPingInterval ,
PongTimeout : defaultPongTimeout ,
}
}
@ -146,17 +146,16 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec
}
mconn := & MConnection {
conn : conn ,
bufConnReader : bufio . NewReaderSize ( conn , minReadBufferSize ) ,
bufConnWriter : bufio . NewWriterSize ( conn , minWriteBufferSize ) ,
sendMonitor : flow . New ( 0 , 0 ) ,
recvMonitor : flow . New ( 0 , 0 ) ,
send : make ( chan struct { } , 1 ) ,
pong : make ( chan struct { } , 1 ) ,
onReceive : onReceive ,
onError : onError ,
config : config ,
emptyPacketMsgSize : emptyPacketMsgSize ( config . MaxPacketMsgSize ) ,
conn : conn ,
bufConnReader : bufio . NewReaderSize ( conn , minReadBufferSize ) ,
bufConnWriter : bufio . NewWriterSize ( conn , minWriteBufferSize ) ,
sendMonitor : flow . New ( 0 , 0 ) ,
recvMonitor : flow . New ( 0 , 0 ) ,
send : make ( chan struct { } , 1 ) ,
pong : make ( chan struct { } , 1 ) ,
onReceive : onReceive ,
onError : onError ,
config : config ,
}
// Create channels
@ -173,6 +172,9 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec
mconn . BaseService = * cmn . NewBaseService ( nil , "MConnection" , mconn )
// maxPacketMsgSize() is a bit heavy, so call just once
mconn . _maxPacketMsgSize = mconn . maxPacketMsgSize ( )
return mconn
}
@ -397,7 +399,7 @@ func (c *MConnection) sendSomePacketMsgs() bool {
// Block until .sendMonitor says we can write.
// Once we're ready we send more than we asked for,
// but amortized it should even out.
c . sendMonitor . Limit ( c . config . M axPacketMsgSize, atomic . LoadInt64 ( & c . config . SendRate ) , true )
c . sendMonitor . Limit ( c . _m axPacketMsgSize, atomic . LoadInt64 ( & c . config . SendRate ) , true )
// Now send some PacketMsgs.
for i := 0 ; i < numBatchPacketMsgs ; i ++ {
@ -455,7 +457,7 @@ func (c *MConnection) recvRoutine() {
FOR_LOOP :
for {
// Block until .recvMonitor says we can read.
c . recvMonitor . Limit ( c . config . M axPacketMsgSize, atomic . LoadInt64 ( & c . config . RecvRate ) , true )
c . recvMonitor . Limit ( c . _m axPacketMsgSize, atomic . LoadInt64 ( & c . config . RecvRate ) , true )
// Peek into bufConnReader for debugging
/ *
@ -475,7 +477,7 @@ FOR_LOOP:
var packet Packet
var _n int64
var err error
_n , err = cdc . UnmarshalBinaryReader ( c . bufConnReader , & packet , int64 ( c . config . M axPacketMsgSize) )
_n , err = cdc . UnmarshalBinaryReader ( c . bufConnReader , & packet , int64 ( c . _m axPacketMsgSize) )
c . recvMonitor . Update ( int ( _n ) )
if err != nil {
if c . IsRunning ( ) {
@ -548,6 +550,16 @@ func (c *MConnection) stopPongTimer() {
}
}
// maxPacketMsgSize returns a maximum size of PacketMsg, including the overhead
// of amino encoding.
func ( c * MConnection ) maxPacketMsgSize ( ) int {
return len ( cdc . MustMarshalBinary ( PacketMsg {
ChannelID : 0x01 ,
EOF : 1 ,
Bytes : make ( [ ] byte , c . config . MaxPacketMsgPayloadSize ) ,
} ) ) + 10 // leave room for changes in amino
}
type ConnectionStatus struct {
Duration time . Duration
SendMonitor flow . Status
@ -631,7 +643,7 @@ func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
desc : desc ,
sendQueue : make ( chan [ ] byte , desc . SendQueueCapacity ) ,
recving : make ( [ ] byte , 0 , desc . RecvBufferCapacity ) ,
maxPacketMsgPayloadSize : conn . config . MaxPacketMsgSize - conn . emptyPacketMsg Size ,
maxPacketMsgPayloadSize : conn . config . MaxPacketMsgPayload Size ,
}
}
@ -780,24 +792,3 @@ type PacketMsg struct {
func ( mp PacketMsg ) String ( ) string {
return fmt . Sprintf ( "PacketMsg{%X:%X T:%X}" , mp . ChannelID , mp . Bytes , mp . EOF )
}
// - Uvarint length of MustMarshalBinary(packet) = 1 or 2 bytes
// (as long as it's less than 16,384 bytes)
// - Prefix bytes = 4 bytes
// - ChannelID field key + byte = 2 bytes
// - EOF field key + byte = 2 bytes
// - Bytes field key = 1 bytes
// - Uvarint length of MustMarshalBinary(bytes) = 1 or 2 bytes
// = up to 13 bytes overhead for the packet.
func emptyPacketMsgSize ( maxPayloadSize int ) int {
emptyPacketMsgSize := len ( cdc . MustMarshalBinary ( PacketMsg {
ChannelID : 0x01 ,
EOF : 1 ,
Bytes : make ( [ ] byte , maxPayloadSize ) ,
} ) )
// -1 byte of data
// +1 byte because uvarint length of MustMarshalBinary(bytes) will be 2 bytes for big packets
// +1 byte because uvarint length of MustMarshalBinary(packet) will be 2 bytes for big packets
return emptyPacketMsgSize - maxPayloadSize + 1 + 1
}