diff --git a/addrbook.go b/addrbook.go index 367ced8d9..a951dffd3 100644 --- a/addrbook.go +++ b/addrbook.go @@ -65,7 +65,8 @@ const ( minGetSelection = 32 // max addresses returned by GetSelection - maxGetSelection = 2500 + // NOTE: this must match "maxPexMessageSize" + maxGetSelection = 250 // current version of the on-disk format. serializationVersion = 1 diff --git a/connection.go b/connection.go index baa0dc1b4..494fdd753 100644 --- a/connection.go +++ b/connection.go @@ -16,18 +16,19 @@ import ( ) const ( - numBatchMsgPackets = 10 - minReadBufferSize = 1024 - minWriteBufferSize = 1024 - idleTimeoutMinutes = 5 - updateStatsSeconds = 2 - pingTimeoutSeconds = 40 - defaultSendRate = 51200 // 50Kb/s - defaultRecvRate = 51200 // 50Kb/s - flushThrottleMS = 100 - defaultSendQueueCapacity = 1 - defaultRecvBufferCapacity = 4096 - defaultSendTimeoutSeconds = 10 + numBatchMsgPackets = 10 + minReadBufferSize = 1024 + minWriteBufferSize = 1024 + idleTimeoutMinutes = 5 + updateStatsSeconds = 2 + pingTimeoutSeconds = 40 + defaultSendRate = 51200 // 50KB/s + defaultRecvRate = 51200 // 50KB/s + flushThrottleMS = 100 + defaultSendQueueCapacity = 1 + defaultRecvBufferCapacity = 4096 + defaultRecvMessageCapacity = 22020096 // 21MB + defaultSendTimeoutSeconds = 10 ) type receiveCbFunc func(chID byte, msgBytes []byte) @@ -259,7 +260,7 @@ func (c *MConnection) sendRoutine() { FOR_LOOP: for { - var n int64 + var n int var err error select { case <-c.flushTimer.Ch: @@ -313,7 +314,7 @@ func (c *MConnection) sendSomeMsgPackets() bool { // Block until .sendMonitor says we can write. // Once we're ready we send more than we asked for, // but amortized it should even out. - c.sendMonitor.Limit(maxMsgPacketSize, atomic.LoadInt64(&c.sendRate), true) + c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.sendRate), true) // Now send some msgPackets. for i := 0; i < numBatchMsgPackets; i++ { @@ -371,7 +372,7 @@ func (c *MConnection) recvRoutine() { FOR_LOOP: for { // Block until .recvMonitor says we can read. - c.recvMonitor.Limit(maxMsgPacketSize, atomic.LoadInt64(&c.recvRate), true) + c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.recvRate), true) /* // Peek into bufReader for debugging @@ -389,7 +390,7 @@ FOR_LOOP: */ // Read packet type - var n int64 + var n int var err error pktType := wire.ReadByte(c.bufReader, &n, &err) c.recvMonitor.Update(int(n)) @@ -411,8 +412,8 @@ FOR_LOOP: // do nothing log.Info("Receive Pong") case packetTypeMsg: - pkt, n, err := msgPacket{}, int64(0), error(nil) - wire.ReadBinaryPtr(&pkt, c.bufReader, &n, &err) + pkt, n, err := msgPacket{}, int(0), error(nil) + wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err) c.recvMonitor.Update(int(n)) if err != nil { if c.IsRunning() { @@ -456,10 +457,11 @@ FOR_LOOP: //----------------------------------------------------------------------------- type ChannelDescriptor struct { - ID byte - Priority int - SendQueueCapacity int - RecvBufferCapacity int + ID byte + Priority int + SendQueueCapacity int + RecvBufferCapacity int + RecvMessageCapacity int } func (chDesc *ChannelDescriptor) FillDefaults() { @@ -469,6 +471,9 @@ func (chDesc *ChannelDescriptor) FillDefaults() { if chDesc.RecvBufferCapacity == 0 { chDesc.RecvBufferCapacity = defaultRecvBufferCapacity } + if chDesc.RecvMessageCapacity == 0 { + chDesc.RecvMessageCapacity = defaultRecvMessageCapacity + } } // TODO: lowercase. @@ -557,27 +562,27 @@ func (ch *Channel) isSendPending() bool { func (ch *Channel) nextMsgPacket() msgPacket { packet := msgPacket{} packet.ChannelID = byte(ch.id) - packet.Bytes = ch.sending[:MinInt(maxMsgPacketSize, len(ch.sending))] - if len(ch.sending) <= maxMsgPacketSize { + packet.Bytes = ch.sending[:MinInt(maxMsgPacketPayloadSize, len(ch.sending))] + if len(ch.sending) <= maxMsgPacketPayloadSize { packet.EOF = byte(0x01) ch.sending = nil atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize } else { packet.EOF = byte(0x00) - ch.sending = ch.sending[MinInt(maxMsgPacketSize, len(ch.sending)):] + ch.sending = ch.sending[MinInt(maxMsgPacketPayloadSize, len(ch.sending)):] } return packet } // Writes next msgPacket to w. // Not goroutine-safe -func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int64, err error) { +func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) { packet := ch.nextMsgPacket() log.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet) wire.WriteByte(packetTypeMsg, w, &n, &err) wire.WriteBinary(packet, w, &n, &err) if err != nil { - ch.recentlySent += n + ch.recentlySent += int64(n) } return } @@ -586,7 +591,7 @@ func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int64, err error) { // Not goroutine-safe func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) { // log.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet) - if wire.MaxBinaryReadSize < len(ch.recving)+len(packet.Bytes) { + if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) { return nil, wire.ErrBinaryReadSizeOverflow } ch.recving = append(ch.recving, packet.Bytes...) @@ -609,10 +614,12 @@ func (ch *Channel) updateStats() { //----------------------------------------------------------------------------- const ( - maxMsgPacketSize = 1024 - packetTypePing = byte(0x01) - packetTypePong = byte(0x02) - packetTypeMsg = byte(0x03) + maxMsgPacketPayloadSize = 1024 + maxMsgPacketOverheadSize = 10 // It's actually lower but good enough + maxMsgPacketTotalSize = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize + packetTypePing = byte(0x01) + packetTypePong = byte(0x02) + packetTypeMsg = byte(0x03) ) // Messages in channels are chopped into smaller msgPackets for multiplexing. @@ -625,16 +632,3 @@ type msgPacket struct { func (p msgPacket) String() string { return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF) } - -//----------------------------------------------------------------------------- - -// Convenience struct for writing typed messages. -// Reading requires a custom decoder that switches on the first type byte of a byteslice. -type TypedMessage struct { - Type byte - Msg interface{} -} - -func (tm TypedMessage) String() string { - return fmt.Sprintf("TMsg{%X:%v}", tm.Type, tm.Msg) -} diff --git a/peer.go b/peer.go index 61e7cd688..47a6955a9 100644 --- a/peer.go +++ b/peer.go @@ -28,12 +28,12 @@ func peerHandshake(conn net.Conn, ourNodeInfo *NodeInfo) (*NodeInfo, error) { var err2 error Parallel( func() { - var n int64 + var n int wire.WriteBinary(ourNodeInfo, conn, &n, &err1) }, func() { - var n int64 - wire.ReadBinary(peerNodeInfo, conn, &n, &err2) + var n int + wire.ReadBinary(peerNodeInfo, conn, maxNodeInfoSize, &n, &err2) log.Notice("Peer handshake", "peerNodeInfo", peerNodeInfo) }) if err1 != nil { @@ -112,7 +112,9 @@ func (p *Peer) CanSend(chID byte) bool { } func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { - wire.WriteString(p.Key, w, &n, &err) + var n_ int + wire.WriteString(p.Key, w, &n_, &err) + n += int64(n_) return } diff --git a/pex_reactor.go b/pex_reactor.go index b6d8bf0e1..1a421b366 100644 --- a/pex_reactor.go +++ b/pex_reactor.go @@ -18,6 +18,7 @@ const ( PexChannel = byte(0x00) ensurePeersPeriodSeconds = 30 minNumOutboundPeers = 10 + maxPexMessageSize = 1048576 // 1MB ) /* @@ -227,9 +228,9 @@ var _ = wire.RegisterInterface( func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) { msgType = bz[0] - n := new(int64) + n := new(int) r := bytes.NewReader(bz) - msg = wire.ReadBinary(struct{ PexMessage }{}, r, n, &err).(struct{ PexMessage }).PexMessage + msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage return } diff --git a/secret_connection.go b/secret_connection.go index bec465e62..1e1cdb4b2 100644 --- a/secret_connection.go +++ b/secret_connection.go @@ -279,8 +279,8 @@ func shareAuthSignature(sc *SecretConnection, pubKey crypto.PubKeyEd25519, signa if err2 != nil { return } - n := int64(0) // not used. - recvMsg = wire.ReadBinary(authSigMessage{}, bytes.NewBuffer(readBuffer), &n, &err2).(authSigMessage) + n := int(0) // not used. + recvMsg = wire.ReadBinary(authSigMessage{}, bytes.NewBuffer(readBuffer), authSigMsgSize, &n, &err2).(authSigMessage) }) if err1 != nil { diff --git a/types.go b/types.go index 0e7fc681d..fa139124e 100644 --- a/types.go +++ b/types.go @@ -9,6 +9,8 @@ import ( "github.com/tendermint/go-crypto" ) +const maxNodeInfoSize = 10240 // 10Kb + type NodeInfo struct { PubKey crypto.PubKeyEd25519 `json:"pub_key"` Moniker string `json:"moniker"`