|
|
@ -97,12 +97,17 @@ type MConnConfig struct { |
|
|
|
SendRate int64 `mapstructure:"send_rate"` |
|
|
|
RecvRate int64 `mapstructure:"recv_rate"` |
|
|
|
|
|
|
|
MaxMsgPacketPayloadSize int |
|
|
|
// Maximum payload size
|
|
|
|
MaxMsgPacketPayloadSize int `mapstructure:"max_msg_packet_payload_size"` |
|
|
|
|
|
|
|
FlushThrottle time.Duration |
|
|
|
// Interval to flush writes (throttled)
|
|
|
|
FlushThrottle time.Duration `mapstructure:"flush_throttle"` |
|
|
|
|
|
|
|
pingInterval time.Duration |
|
|
|
pongTimeout time.Duration |
|
|
|
// Interval to send pings
|
|
|
|
PingInterval time.Duration `mapstructure:"ping_interval"` |
|
|
|
|
|
|
|
// Maximum wait time for pongs
|
|
|
|
PongTimeout time.Duration `mapstructure:"pong_timeout"` |
|
|
|
} |
|
|
|
|
|
|
|
func (cfg *MConnConfig) maxMsgPacketTotalSize() int { |
|
|
@ -116,8 +121,8 @@ func DefaultMConnConfig() *MConnConfig { |
|
|
|
RecvRate: defaultRecvRate, |
|
|
|
MaxMsgPacketPayloadSize: defaultMaxMsgPacketPayloadSize, |
|
|
|
FlushThrottle: defaultFlushThrottle, |
|
|
|
pingInterval: defaultPingInterval, |
|
|
|
pongTimeout: defaultPongTimeout, |
|
|
|
PingInterval: defaultPingInterval, |
|
|
|
PongTimeout: defaultPongTimeout, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -133,7 +138,7 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei |
|
|
|
|
|
|
|
// NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
|
|
|
|
func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection { |
|
|
|
if config.pongTimeout >= config.pingInterval { |
|
|
|
if config.PongTimeout >= config.PingInterval { |
|
|
|
panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)") |
|
|
|
} |
|
|
|
|
|
|
@ -180,9 +185,9 @@ func (c *MConnection) OnStart() error { |
|
|
|
return err |
|
|
|
} |
|
|
|
c.quit = make(chan struct{}) |
|
|
|
c.flushTimer = cmn.NewThrottleTimer("flush", c.config.flushThrottle) |
|
|
|
c.pingTimer = cmn.NewRepeatTimer("ping", c.config.pingInterval) |
|
|
|
c.pongTimer = time.NewTimer(c.config.pongTimeout) |
|
|
|
c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle) |
|
|
|
c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval) |
|
|
|
c.pongTimer = time.NewTimer(c.config.PongTimeout) |
|
|
|
// we start timer once we've send ping; needed here because we use start
|
|
|
|
// listening in recvRoutine
|
|
|
|
_ = c.pongTimer.Stop() |
|
|
@ -334,7 +339,7 @@ FOR_LOOP: |
|
|
|
c.sendMonitor.Update(int(n)) |
|
|
|
c.flush() |
|
|
|
c.Logger.Debug("Starting pong timer") |
|
|
|
c.pongTimer.Reset(c.config.pongTimeout) |
|
|
|
c.pongTimer.Reset(c.config.PongTimeout) |
|
|
|
case <-c.pongTimer.C: |
|
|
|
c.Logger.Debug("Pong timeout") |
|
|
|
// XXX: should we decrease peer score instead of closing connection?
|
|
|
|