|
@ -185,19 +185,13 @@ func (c *MConnection) OnStart() error { |
|
|
|
|
|
|
|
|
// OnStop implements BaseService
|
|
|
// OnStop implements BaseService
|
|
|
func (c *MConnection) OnStop() { |
|
|
func (c *MConnection) OnStop() { |
|
|
c.Logger.Debug("MConn.OnStop") |
|
|
|
|
|
c.BaseService.OnStop() |
|
|
c.BaseService.OnStop() |
|
|
c.Logger.Debug("MConn.flushTimer.Stop") |
|
|
|
|
|
c.flushTimer.Stop() |
|
|
c.flushTimer.Stop() |
|
|
c.Logger.Debug("MConn.pingTimer.Stop") |
|
|
|
|
|
c.pingTimer.Stop() |
|
|
c.pingTimer.Stop() |
|
|
c.Logger.Debug("MConn.chStatsTimer.Stop") |
|
|
|
|
|
c.chStatsTimer.Stop() |
|
|
c.chStatsTimer.Stop() |
|
|
if c.quit != nil { |
|
|
if c.quit != nil { |
|
|
c.Logger.Debug("MConn: Close Quit") |
|
|
|
|
|
close(c.quit) |
|
|
close(c.quit) |
|
|
} |
|
|
} |
|
|
c.Logger.Debug("MConn.conn.Close()") |
|
|
|
|
|
c.conn.Close() // nolint: errcheck
|
|
|
c.conn.Close() // nolint: errcheck
|
|
|
// We can't close pong safely here because
|
|
|
// We can't close pong safely here because
|
|
|
// recvRoutine may write to it after we've stopped.
|
|
|
// recvRoutine may write to it after we've stopped.
|
|
@ -339,7 +333,6 @@ FOR_LOOP: |
|
|
case <-c.send: |
|
|
case <-c.send: |
|
|
// Send some msgPackets
|
|
|
// Send some msgPackets
|
|
|
eof := c.sendSomeMsgPackets() |
|
|
eof := c.sendSomeMsgPackets() |
|
|
c.Logger.Debug("finished sendSomeMsgPackets", "eof", eof) |
|
|
|
|
|
if !eof { |
|
|
if !eof { |
|
|
// Keep sendRoutine awake.
|
|
|
// Keep sendRoutine awake.
|
|
|
select { |
|
|
select { |
|
@ -359,7 +352,6 @@ FOR_LOOP: |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
c.Logger.Debug("sendRoutine: End") |
|
|
|
|
|
// Cleanup
|
|
|
// Cleanup
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -369,12 +361,10 @@ func (c *MConnection) sendSomeMsgPackets() bool { |
|
|
// Block until .sendMonitor says we can write.
|
|
|
// Block until .sendMonitor says we can write.
|
|
|
// Once we're ready we send more than we asked for,
|
|
|
// Once we're ready we send more than we asked for,
|
|
|
// but amortized it should even out.
|
|
|
// but amortized it should even out.
|
|
|
c.Logger.Debug("sendMonitor.Limit") |
|
|
|
|
|
c.sendMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.SendRate), true) |
|
|
c.sendMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.SendRate), true) |
|
|
|
|
|
|
|
|
// Now send some msgPackets.
|
|
|
// Now send some msgPackets.
|
|
|
for i := 0; i < numBatchMsgPackets; i++ { |
|
|
for i := 0; i < numBatchMsgPackets; i++ { |
|
|
c.Logger.Debug("sendMsgPacket", "i", i) |
|
|
|
|
|
if c.sendMsgPacket() { |
|
|
if c.sendMsgPacket() { |
|
|
return true |
|
|
return true |
|
|
} |
|
|
} |
|
@ -403,7 +393,6 @@ func (c *MConnection) sendMsgPacket() bool { |
|
|
|
|
|
|
|
|
// Nothing to send?
|
|
|
// Nothing to send?
|
|
|
if leastChannel == nil { |
|
|
if leastChannel == nil { |
|
|
c.Logger.Debug("Least channel == nil") |
|
|
|
|
|
return true |
|
|
return true |
|
|
} else { |
|
|
} else { |
|
|
// c.Logger.Info("Found a msgPacket to send")
|
|
|
// c.Logger.Info("Found a msgPacket to send")
|
|
@ -416,7 +405,6 @@ func (c *MConnection) sendMsgPacket() bool { |
|
|
c.stopForError(err) |
|
|
c.stopForError(err) |
|
|
return true |
|
|
return true |
|
|
} |
|
|
} |
|
|
c.Logger.Debug("sendMonitor.Update") |
|
|
|
|
|
c.sendMonitor.Update(int(n)) |
|
|
c.sendMonitor.Update(int(n)) |
|
|
c.flushTimer.Set() |
|
|
c.flushTimer.Set() |
|
|
return false |
|
|
return false |
|
@ -430,7 +418,6 @@ func (c *MConnection) recvRoutine() { |
|
|
|
|
|
|
|
|
FOR_LOOP: |
|
|
FOR_LOOP: |
|
|
for { |
|
|
for { |
|
|
c.Logger.Debug("recvRoutine: recvMonitor.Limit") |
|
|
|
|
|
// Block until .recvMonitor says we can read.
|
|
|
// Block until .recvMonitor says we can read.
|
|
|
c.recvMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.RecvRate), true) |
|
|
c.recvMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.RecvRate), true) |
|
|
|
|
|
|
|
@ -452,9 +439,7 @@ FOR_LOOP: |
|
|
// Read packet type
|
|
|
// Read packet type
|
|
|
var n int |
|
|
var n int |
|
|
var err error |
|
|
var err error |
|
|
c.Logger.Debug("recvRoutine: ReadByte") |
|
|
|
|
|
pktType := wire.ReadByte(c.bufReader, &n, &err) |
|
|
pktType := wire.ReadByte(c.bufReader, &n, &err) |
|
|
c.Logger.Debug("recvRoutine: recvMonitor.Update") |
|
|
|
|
|
c.recvMonitor.Update(int(n)) |
|
|
c.recvMonitor.Update(int(n)) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
if c.IsRunning() { |
|
|
if c.IsRunning() { |
|
@ -470,15 +455,12 @@ FOR_LOOP: |
|
|
// TODO: prevent abuse, as they cause flush()'s.
|
|
|
// TODO: prevent abuse, as they cause flush()'s.
|
|
|
c.Logger.Debug("Receive Ping") |
|
|
c.Logger.Debug("Receive Ping") |
|
|
c.pong <- struct{}{} |
|
|
c.pong <- struct{}{} |
|
|
c.Logger.Debug("recvRoutine: trigger pong") |
|
|
|
|
|
case packetTypePong: |
|
|
case packetTypePong: |
|
|
// do nothing
|
|
|
// do nothing
|
|
|
c.Logger.Debug("Receive Pong") |
|
|
c.Logger.Debug("Receive Pong") |
|
|
case packetTypeMsg: |
|
|
case packetTypeMsg: |
|
|
pkt, n, err := msgPacket{}, int(0), error(nil) |
|
|
pkt, n, err := msgPacket{}, int(0), error(nil) |
|
|
c.Logger.Debug("recvRoutine: ReadBinaryPtr") |
|
|
|
|
|
wire.ReadBinaryPtr(&pkt, c.bufReader, c.config.maxMsgPacketTotalSize(), &n, &err) |
|
|
wire.ReadBinaryPtr(&pkt, c.bufReader, c.config.maxMsgPacketTotalSize(), &n, &err) |
|
|
c.Logger.Debug("recvRoutine: recvMonitor.Update") |
|
|
|
|
|
c.recvMonitor.Update(int(n)) |
|
|
c.recvMonitor.Update(int(n)) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
if c.IsRunning() { |
|
|
if c.IsRunning() { |
|
@ -494,9 +476,7 @@ FOR_LOOP: |
|
|
c.stopForError(err) |
|
|
c.stopForError(err) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
c.Logger.Debug("recvRoutine: recvMsgPacket") |
|
|
|
|
|
msgBytes, err := channel.recvMsgPacket(pkt) |
|
|
msgBytes, err := channel.recvMsgPacket(pkt) |
|
|
c.Logger.Debug("recvRoutine: msgBytes", "msgBytes", msgBytes, "err", err) |
|
|
|
|
|
if err != nil { |
|
|
if err != nil { |
|
|
if c.IsRunning() { |
|
|
if c.IsRunning() { |
|
|
c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err) |
|
|
c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err) |
|
@ -508,7 +488,6 @@ FOR_LOOP: |
|
|
c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes) |
|
|
c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes) |
|
|
// NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
|
|
|
// NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
|
|
|
c.onReceive(pkt.ChannelID, msgBytes) |
|
|
c.onReceive(pkt.ChannelID, msgBytes) |
|
|
c.Logger.Debug("done onReceive") |
|
|
|
|
|
} |
|
|
} |
|
|
default: |
|
|
default: |
|
|
err := fmt.Errorf("Unknown message type %X", pktType) |
|
|
err := fmt.Errorf("Unknown message type %X", pktType) |
|
@ -518,11 +497,8 @@ FOR_LOOP: |
|
|
|
|
|
|
|
|
// TODO: shouldn't this go in the sendRoutine?
|
|
|
// TODO: shouldn't this go in the sendRoutine?
|
|
|
// Better to send a ping packet when *we* haven't sent anything for a while.
|
|
|
// Better to send a ping packet when *we* haven't sent anything for a while.
|
|
|
c.Logger.Debug("pingTimer.Reset()") |
|
|
|
|
|
c.pingTimer.Reset() |
|
|
c.pingTimer.Reset() |
|
|
c.Logger.Debug("done pingTimer.Reset()") |
|
|
|
|
|
} |
|
|
} |
|
|
c.Logger.Debug("recvRoutine: End") |
|
|
|
|
|
|
|
|
|
|
|
// Cleanup
|
|
|
// Cleanup
|
|
|
close(c.pong) |
|
|
close(c.pong) |
|
|