|
@ -21,14 +21,14 @@ const ( |
|
|
minReadBufferSize = 1024 |
|
|
minReadBufferSize = 1024 |
|
|
minWriteBufferSize = 65536 |
|
|
minWriteBufferSize = 65536 |
|
|
idleTimeoutMinutes = 5 |
|
|
idleTimeoutMinutes = 5 |
|
|
updateStatsSeconds = 2 |
|
|
|
|
|
pingTimeoutSeconds = 40 |
|
|
|
|
|
flushThrottleMS = 100 |
|
|
|
|
|
|
|
|
updateState = 2 * time.Second |
|
|
|
|
|
pingTimeout = 40 * time.Second |
|
|
|
|
|
flushThrottle = 100 * time.Millisecond |
|
|
|
|
|
|
|
|
defaultSendQueueCapacity = 1 |
|
|
defaultSendQueueCapacity = 1 |
|
|
defaultRecvBufferCapacity = 4096 |
|
|
defaultRecvBufferCapacity = 4096 |
|
|
defaultRecvMessageCapacity = 22020096 // 21MB
|
|
|
defaultRecvMessageCapacity = 22020096 // 21MB
|
|
|
defaultSendTimeoutSeconds = 10 |
|
|
|
|
|
|
|
|
defaultSendTimeout = 10 * time.Second |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
type receiveCbFunc func(chID byte, msgBytes []byte) |
|
|
type receiveCbFunc func(chID byte, msgBytes []byte) |
|
@ -131,9 +131,9 @@ func NewMConnection(config cfg.Config, conn net.Conn, chDescs []*ChannelDescript |
|
|
func (c *MConnection) OnStart() error { |
|
|
func (c *MConnection) OnStart() error { |
|
|
c.BaseService.OnStart() |
|
|
c.BaseService.OnStart() |
|
|
c.quit = make(chan struct{}) |
|
|
c.quit = make(chan struct{}) |
|
|
c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottleMS*time.Millisecond) |
|
|
|
|
|
c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeoutSeconds*time.Second) |
|
|
|
|
|
c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStatsSeconds*time.Second) |
|
|
|
|
|
|
|
|
c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle) |
|
|
|
|
|
c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout) |
|
|
|
|
|
c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState) |
|
|
go c.sendRoutine() |
|
|
go c.sendRoutine() |
|
|
go c.recvRoutine() |
|
|
go c.recvRoutine() |
|
|
return nil |
|
|
return nil |
|
@ -538,9 +538,9 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { |
|
|
|
|
|
|
|
|
// Queues message to send to this channel.
|
|
|
// Queues message to send to this channel.
|
|
|
// Goroutine-safe
|
|
|
// Goroutine-safe
|
|
|
// Times out (and returns false) after defaultSendTimeoutSeconds
|
|
|
|
|
|
|
|
|
// Times out (and returns false) after defaultSendTimeout
|
|
|
func (ch *Channel) sendBytes(bytes []byte) bool { |
|
|
func (ch *Channel) sendBytes(bytes []byte) bool { |
|
|
timeout := time.NewTimer(defaultSendTimeoutSeconds * time.Second) |
|
|
|
|
|
|
|
|
timeout := time.NewTimer(defaultSendTimeout) |
|
|
select { |
|
|
select { |
|
|
case <-timeout.C: |
|
|
case <-timeout.C: |
|
|
// timeout
|
|
|
// timeout
|
|
|