|
|
@ -14,6 +14,7 @@ import ( |
|
|
|
tmlegacy "github.com/tendermint/go-wire/nowriter/tmlegacy" |
|
|
|
cmn "github.com/tendermint/tmlibs/common" |
|
|
|
flow "github.com/tendermint/tmlibs/flowrate" |
|
|
|
"github.com/tendermint/tmlibs/log" |
|
|
|
) |
|
|
|
|
|
|
|
var legacy = tmlegacy.TMEncoderLegacy{} |
|
|
@ -161,6 +162,13 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec |
|
|
|
return mconn |
|
|
|
} |
|
|
|
|
|
|
|
func (c *MConnection) SetLogger(l log.Logger) { |
|
|
|
c.BaseService.SetLogger(l) |
|
|
|
for _, ch := range c.channels { |
|
|
|
ch.SetLogger(l) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// OnStart implements BaseService
|
|
|
|
func (c *MConnection) OnStart() error { |
|
|
|
if err := c.BaseService.OnStart(); err != nil { |
|
|
@ -385,6 +393,7 @@ func (c *MConnection) sendMsgPacket() bool { |
|
|
|
|
|
|
|
// Nothing to send?
|
|
|
|
if leastChannel == nil { |
|
|
|
c.Logger.Debug("Least channel == nil") |
|
|
|
return true |
|
|
|
} else { |
|
|
|
// c.Logger.Info("Found a msgPacket to send")
|
|
|
@ -566,6 +575,8 @@ type Channel struct { |
|
|
|
recentlySent int64 // exponential moving average
|
|
|
|
|
|
|
|
maxMsgPacketPayloadSize int |
|
|
|
|
|
|
|
logger log.Logger |
|
|
|
} |
|
|
|
|
|
|
|
func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel { |
|
|
@ -582,6 +593,10 @@ func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (ch *Channel) SetLogger(l log.Logger) { |
|
|
|
ch.logger = l |
|
|
|
} |
|
|
|
|
|
|
|
// Queues message to send to this channel.
|
|
|
|
// Goroutine-safe
|
|
|
|
// Times out (and returns false) after defaultSendTimeout
|
|
|
@ -654,7 +669,7 @@ func (ch *Channel) nextMsgPacket() msgPacket { |
|
|
|
// Not goroutine-safe
|
|
|
|
func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) { |
|
|
|
packet := ch.nextMsgPacket() |
|
|
|
// log.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
|
|
|
|
ch.logger.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet) |
|
|
|
writeMsgPacketTo(packet, w, &n, &err) |
|
|
|
if err == nil { |
|
|
|
ch.recentlySent += int64(n) |
|
|
@ -670,7 +685,7 @@ func writeMsgPacketTo(packet msgPacket, w io.Writer, n *int, err *error) { |
|
|
|
// Handles incoming msgPackets. Returns a msg bytes if msg is complete.
|
|
|
|
// Not goroutine-safe
|
|
|
|
func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) { |
|
|
|
// log.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
|
|
|
|
ch.logger.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet) |
|
|
|
if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) { |
|
|
|
return nil, wire.ErrBinaryReadOverflow |
|
|
|
} |
|
|
|