|
@ -123,7 +123,7 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei |
|
|
// .Start() begins multiplexing packets to and from "channels".
|
|
|
// .Start() begins multiplexing packets to and from "channels".
|
|
|
func (c *MConnection) Start() { |
|
|
func (c *MConnection) Start() { |
|
|
if atomic.CompareAndSwapUint32(&c.started, 0, 1) { |
|
|
if atomic.CompareAndSwapUint32(&c.started, 0, 1) { |
|
|
log.Debug(Fmt("Starting %v", c)) |
|
|
|
|
|
|
|
|
log.Debug("Starting MConnection", "connection", c) |
|
|
go c.sendRoutine() |
|
|
go c.sendRoutine() |
|
|
go c.recvRoutine() |
|
|
go c.recvRoutine() |
|
|
} |
|
|
} |
|
@ -131,7 +131,7 @@ func (c *MConnection) Start() { |
|
|
|
|
|
|
|
|
func (c *MConnection) Stop() { |
|
|
func (c *MConnection) Stop() { |
|
|
if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) { |
|
|
if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) { |
|
|
log.Debug(Fmt("Stopping %v", c)) |
|
|
|
|
|
|
|
|
log.Debug("Stopping MConnection", "connection", c) |
|
|
close(c.quit) |
|
|
close(c.quit) |
|
|
c.conn.Close() |
|
|
c.conn.Close() |
|
|
c.flushTimer.Stop() |
|
|
c.flushTimer.Stop() |
|
@ -153,7 +153,7 @@ func (c *MConnection) flush() { |
|
|
err := c.bufWriter.Flush() |
|
|
err := c.bufWriter.Flush() |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
if atomic.LoadUint32(&c.stopped) != 1 { |
|
|
if atomic.LoadUint32(&c.stopped) != 1 { |
|
|
log.Warn(Fmt("MConnection flush failed: %v", err)) |
|
|
|
|
|
|
|
|
log.Warn("MConnection flush failed", "error", err) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
@ -209,7 +209,7 @@ func (c *MConnection) TrySend(chId byte, msg interface{}) bool { |
|
|
return false |
|
|
return false |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
log.Debug(Fmt("[%X] TRYSEND %v: %v", chId, c.RemoteAddress, msg)) |
|
|
|
|
|
|
|
|
log.Debug("TrySend", "channel", chId, "connection", c, "msg", msg) |
|
|
|
|
|
|
|
|
// Send message to channel.
|
|
|
// Send message to channel.
|
|
|
channel, ok := c.channelsIdx[chId] |
|
|
channel, ok := c.channelsIdx[chId] |
|
@ -286,7 +286,7 @@ FOR_LOOP: |
|
|
break FOR_LOOP |
|
|
break FOR_LOOP |
|
|
} |
|
|
} |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
log.Warn(Fmt("%v failed @ sendRoutine:\n%v", c, err)) |
|
|
|
|
|
|
|
|
log.Warn("Connection failed @ sendRoutine", "connection", c, "error", err) |
|
|
c.Stop() |
|
|
c.Stop() |
|
|
break FOR_LOOP |
|
|
break FOR_LOOP |
|
|
} |
|
|
} |
|
@ -341,7 +341,7 @@ func (c *MConnection) sendMsgPacket() bool { |
|
|
// Make & send a msgPacket from this channel
|
|
|
// Make & send a msgPacket from this channel
|
|
|
n, err := leastChannel.writeMsgPacketTo(c.bufWriter) |
|
|
n, err := leastChannel.writeMsgPacketTo(c.bufWriter) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
log.Warn(Fmt("Failed to write msgPacket. Error: %v", err)) |
|
|
|
|
|
|
|
|
log.Warn("Failed to write msgPacket", "error", err) |
|
|
c.stopForError(err) |
|
|
c.stopForError(err) |
|
|
return true |
|
|
return true |
|
|
} |
|
|
} |
|
@ -361,6 +361,18 @@ FOR_LOOP: |
|
|
// Block until .recvMonitor says we can read.
|
|
|
// Block until .recvMonitor says we can read.
|
|
|
c.recvMonitor.Limit(maxMsgPacketSize, atomic.LoadInt64(&c.recvRate), true) |
|
|
c.recvMonitor.Limit(maxMsgPacketSize, atomic.LoadInt64(&c.recvRate), true) |
|
|
|
|
|
|
|
|
|
|
|
// Peek into bufReader for debugging
|
|
|
|
|
|
log.Debug("Peek connection buffer", "bytes", log15.Lazy{func() []byte { |
|
|
|
|
|
numBytes := c.bufReader.Buffered() |
|
|
|
|
|
bytes, err := c.bufReader.Peek(MinInt(numBytes, 100)) |
|
|
|
|
|
if err == nil { |
|
|
|
|
|
return bytes |
|
|
|
|
|
} else { |
|
|
|
|
|
log.Warn("Error peeking connection buffer", "error", err) |
|
|
|
|
|
return nil |
|
|
|
|
|
} |
|
|
|
|
|
}}) |
|
|
|
|
|
|
|
|
// Read packet type
|
|
|
// Read packet type
|
|
|
var n int64 |
|
|
var n int64 |
|
|
var err error |
|
|
var err error |
|
@ -368,23 +380,12 @@ FOR_LOOP: |
|
|
c.recvMonitor.Update(int(n)) |
|
|
c.recvMonitor.Update(int(n)) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
if atomic.LoadUint32(&c.stopped) != 1 { |
|
|
if atomic.LoadUint32(&c.stopped) != 1 { |
|
|
log.Warn(Fmt("%v failed @ recvRoutine with err: %v", c, err)) |
|
|
|
|
|
|
|
|
log.Warn("Connection failed @ recvRoutine", "connection", c, "error", err) |
|
|
c.Stop() |
|
|
c.Stop() |
|
|
} |
|
|
} |
|
|
break FOR_LOOP |
|
|
break FOR_LOOP |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Peek into bufReader for debugging
|
|
|
|
|
|
log.Debug("%v", log15.Lazy{func() string { |
|
|
|
|
|
numBytes := c.bufReader.Buffered() |
|
|
|
|
|
bytes, err := c.bufReader.Peek(MinInt(numBytes, 100)) |
|
|
|
|
|
if err == nil { |
|
|
|
|
|
return fmt.Sprintf("recvRoutine packet type %X, peeked: %X", pktType, bytes) |
|
|
|
|
|
} else { |
|
|
|
|
|
return fmt.Sprintf("recvRoutine error: %v", err) |
|
|
|
|
|
} |
|
|
|
|
|
}}) |
|
|
|
|
|
|
|
|
|
|
|
// Read more depending on packet type.
|
|
|
// Read more depending on packet type.
|
|
|
switch pktType { |
|
|
switch pktType { |
|
|
case packetTypePing: |
|
|
case packetTypePing: |
|
@ -408,7 +409,6 @@ FOR_LOOP: |
|
|
panic(Fmt("Unknown channel %X", pkt.ChannelId)) |
|
|
panic(Fmt("Unknown channel %X", pkt.ChannelId)) |
|
|
} |
|
|
} |
|
|
msgBytes := channel.recvMsgPacket(pkt) |
|
|
msgBytes := channel.recvMsgPacket(pkt) |
|
|
log.Warn(Fmt("RECEIVE_MSG_BYTES: %X", msgBytes)) |
|
|
|
|
|
if msgBytes != nil { |
|
|
if msgBytes != nil { |
|
|
c.onReceive(pkt.ChannelId, msgBytes) |
|
|
c.onReceive(pkt.ChannelId, msgBytes) |
|
|
} |
|
|
} |
|
|