From daa258ea6d42141ed3d65394fc6c85890f8e24b5 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 31 Aug 2017 00:51:43 -0400 Subject: [PATCH] p2p: put maxMsgPacketPayloadSize, recvRate, sendRate in config Updates #628 --- config/config.go | 22 ++++++++++++++----- p2p/connection.go | 55 ++++++++++++++++++++++++++++------------------- p2p/switch.go | 10 +++++++-- 3 files changed, 58 insertions(+), 29 deletions(-) diff --git a/config/config.go b/config/config.go index cc1c7d719..e29f37e41 100644 --- a/config/config.go +++ b/config/config.go @@ -224,16 +224,28 @@ type P2PConfig struct { // Time to wait before flushing messages out on the connection. In ms FlushThrottleTimeout int `mapstructure:"flush_throttle_timeout"` + + // Maximum size of a message packet payload + MaxMsgPacketPayloadSize int `mapstructure:"max_msg_packet_payload_size"` + + // Rate at which packets can be sent (in bytes/second) + SendRate int64 `mapstructure:"send_rate"` + + // Rate at which packets can be received (in bytes/second) + RecvRate int64 `mapstructure:"recv_rate"` } // DefaultP2PConfig returns a default configuration for the peer-to-peer layer func DefaultP2PConfig() *P2PConfig { return &P2PConfig{ - ListenAddress: "tcp://0.0.0.0:46656", - AddrBook: "addrbook.json", - AddrBookStrict: true, - MaxNumPeers: 50, - FlushThrottleTimeout: 100, + ListenAddress: "tcp://0.0.0.0:46656", + AddrBook: "addrbook.json", + AddrBookStrict: true, + MaxNumPeers: 50, + FlushThrottleTimeout: 100, + MaxMsgPacketPayloadSize: 1024, // 1 kB + SendRate: 512000, // 500 kB/s + RecvRate: 512000, // 500 kB/s } } diff --git a/p2p/connection.go b/p2p/connection.go index 1d97d4559..7d99e1ed3 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -22,15 +22,15 @@ const ( updateState = 2 * time.Second pingTimeout = 40 * time.Second - // flushThrottle used here as a default. - // overwritten by the user config. - // TODO: remove - flushThrottle = 100 * time.Millisecond + // some of these defaults are written in the user config + // flushThrottle, sendRate, recvRate + // TODO: remove values present in config + defaultFlushThrottle = 100 * time.Millisecond defaultSendQueueCapacity = 1 - defaultSendRate = int64(512000) // 500KB/s defaultRecvBufferCapacity = 4096 defaultRecvMessageCapacity = 22020096 // 21MB + defaultSendRate = int64(512000) // 500KB/s defaultRecvRate = int64(512000) // 500KB/s defaultSendTimeout = 10 * time.Second ) @@ -94,15 +94,22 @@ type MConnConfig struct { SendRate int64 `mapstructure:"send_rate"` RecvRate int64 `mapstructure:"recv_rate"` + maxMsgPacketPayloadSize int + flushThrottle time.Duration } +func (cfg *MConnConfig) maxMsgPacketTotalSize() int { + return cfg.maxMsgPacketPayloadSize + maxMsgPacketOverheadSize +} + // DefaultMConnConfig returns the default config. func DefaultMConnConfig() *MConnConfig { return &MConnConfig{ - SendRate: defaultSendRate, - RecvRate: defaultRecvRate, - flushThrottle: flushThrottle, + SendRate: defaultSendRate, + RecvRate: defaultRecvRate, + maxMsgPacketPayloadSize: defaultMaxMsgPacketPayloadSize, + flushThrottle: defaultFlushThrottle, } } @@ -342,7 +349,7 @@ func (c *MConnection) sendSomeMsgPackets() bool { // Block until .sendMonitor says we can write. // Once we're ready we send more than we asked for, // but amortized it should even out. - c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true) + c.sendMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.SendRate), true) // Now send some msgPackets. for i := 0; i < numBatchMsgPackets; i++ { @@ -400,7 +407,7 @@ func (c *MConnection) recvRoutine() { FOR_LOOP: for { // Block until .recvMonitor says we can read. - c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true) + c.recvMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.RecvRate), true) /* // Peek into bufReader for debugging @@ -441,7 +448,7 @@ FOR_LOOP: c.Logger.Debug("Receive Pong") case packetTypeMsg: pkt, n, err := msgPacket{}, int(0), error(nil) - wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err) + wire.ReadBinaryPtr(&pkt, c.bufReader, c.config.maxMsgPacketTotalSize(), &n, &err) c.recvMonitor.Update(int(n)) if err != nil { if c.IsRunning() { @@ -547,6 +554,8 @@ type Channel struct { sending []byte priority int recentlySent int64 // exponential moving average + + maxMsgPacketPayloadSize int } func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { @@ -555,12 +564,13 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { cmn.PanicSanity("Channel default priority must be a postive integer") } return &Channel{ - conn: conn, - desc: desc, - id: desc.ID, - sendQueue: make(chan []byte, desc.SendQueueCapacity), - recving: make([]byte, 0, desc.RecvBufferCapacity), - priority: desc.Priority, + conn: conn, + desc: desc, + id: desc.ID, + sendQueue: make(chan []byte, desc.SendQueueCapacity), + recving: make([]byte, 0, desc.RecvBufferCapacity), + priority: desc.Priority, + maxMsgPacketPayloadSize: conn.config.maxMsgPacketPayloadSize, } } @@ -619,14 +629,15 @@ func (ch *Channel) isSendPending() bool { func (ch *Channel) nextMsgPacket() msgPacket { packet := msgPacket{} packet.ChannelID = byte(ch.id) - packet.Bytes = ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))] - if len(ch.sending) <= maxMsgPacketPayloadSize { + maxSize := ch.maxMsgPacketPayloadSize + packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))] + if len(ch.sending) <= maxSize { packet.EOF = byte(0x01) ch.sending = nil atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize } else { packet.EOF = byte(0x00) - ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):] + ch.sending = ch.sending[cmn.MinInt(maxSize, len(ch.sending)):] } return packet } @@ -675,9 +686,9 @@ func (ch *Channel) updateStats() { //----------------------------------------------------------------------------- const ( - maxMsgPacketPayloadSize = 1024 + defaultMaxMsgPacketPayloadSize = 1024 + maxMsgPacketOverheadSize = 10 // It's actually lower but good enough - maxMsgPacketTotalSize = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize packetTypePing = byte(0x01) packetTypePong = byte(0x02) packetTypeMsg = byte(0x03) diff --git a/p2p/switch.go b/p2p/switch.go index 107c1d9d8..d92dd6370 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -90,7 +90,13 @@ func NewSwitch(config *cfg.P2PConfig) *Switch { dialing: cmn.NewCMap(), nodeInfo: nil, } - sw.peerConfig.MConfig.flushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond // TODO: collapse the peerConfig into the config ? + + // TODO: collapse the peerConfig into the config ? + sw.peerConfig.MConfig.flushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond + sw.peerConfig.MConfig.SendRate = config.SendRate + sw.peerConfig.MConfig.RecvRate = config.RecvRate + sw.peerConfig.MConfig.maxMsgPacketPayloadSize = config.MaxMsgPacketPayloadSize + sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw) return sw } @@ -176,7 +182,7 @@ func (sw *Switch) OnStart() error { return err } } - + // Start listeners for _, listener := range sw.listeners { go sw.listenerRoutine(listener)