From f28f791fff661e83530a47bef2e18f7f4ee2abef Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 2 Mar 2016 22:32:39 +0000 Subject: [PATCH] make some params configurable --- config.go | 40 ++++++++++++++++++++++++++++++++++++++++ connection.go | 32 ++++++++++++++++++++------------ switch.go | 16 +++++++++------- version.go | 2 +- 4 files changed, 70 insertions(+), 20 deletions(-) create mode 100644 config.go diff --git a/config.go b/config.go new file mode 100644 index 000000000..c5ab0e27b --- /dev/null +++ b/config.go @@ -0,0 +1,40 @@ +package p2p + +import ( + cfg "github.com/tendermint/go-config" +) + +// XXX: go-p2p requires ApplyConfig be called +var config cfg.Config = nil + +func init() { + initConfigureable(dialTimeoutKey, 3) + initConfigureable(handshakeTimeoutKey, 20) + initConfigureable(maxNumPeersKey, 50) + + initConfigureable(sendRateKey, 512000) // 500KB/s + initConfigureable(recvRateKey, 512000) // 500KB/s + + initConfigureable(maxPayloadSizeKey, 1024) + + cfg.OnConfig(func(newConfig cfg.Config) { + config = newConfig + + // fill in any config values that might be missing + for key, value := range defaultConfigValues { + if !config.IsSet(key) { + config.Set(key, value) + } + } + }) + c := cfg.NewMapConfig(nil) + c.Set("log_level", "debug") + cfg.ApplyConfig(c) +} + +// default config map +var defaultConfigValues = make(map[string]int) + +func initConfigureable(key string, value int) { + defaultConfigValues[key] = value +} diff --git a/connection.go b/connection.go index f57f29db4..923a63283 100644 --- a/connection.go +++ b/connection.go @@ -22,8 +22,6 @@ const ( idleTimeoutMinutes = 5 updateStatsSeconds = 2 pingTimeoutSeconds = 40 - defaultSendRate = 512000 // 500KB/s - defaultRecvRate = 512000 // 500KB/s flushThrottleMS = 100 defaultSendQueueCapacity = 1 defaultRecvBufferCapacity = 4096 @@ -31,6 +29,13 @@ const ( defaultSendTimeoutSeconds = 10 ) +// config keys +const ( + sendRateKey = "p2p_send_rate" + recvRateKey = "p2p_recv_rate" + maxPayloadSizeKey = "p2p_max_msg_packet_payload_size" +) + type receiveCbFunc func(chID byte, msgBytes []byte) type errorCbFunc func(interface{}) @@ -94,8 +99,8 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize), sendMonitor: flow.New(0, 0), recvMonitor: flow.New(0, 0), - sendRate: defaultSendRate, - recvRate: defaultRecvRate, + sendRate: int64(config.GetInt(sendRateKey)), + recvRate: int64(config.GetInt(recvRateKey)), send: make(chan struct{}, 1), pong: make(chan struct{}), onReceive: onReceive, @@ -314,7 +319,7 @@ func (c *MConnection) sendSomeMsgPackets() bool { // Block until .sendMonitor says we can write. // Once we're ready we send more than we asked for, // but amortized it should even out. - c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.sendRate), true) + c.sendMonitor.Limit(maxMsgPacketTotalSize(), atomic.LoadInt64(&c.sendRate), true) // Now send some msgPackets. for i := 0; i < numBatchMsgPackets; i++ { @@ -372,7 +377,7 @@ func (c *MConnection) recvRoutine() { FOR_LOOP: for { // Block until .recvMonitor says we can read. - c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.recvRate), true) + c.recvMonitor.Limit(maxMsgPacketTotalSize(), atomic.LoadInt64(&c.recvRate), true) /* // Peek into bufReader for debugging @@ -413,7 +418,7 @@ FOR_LOOP: log.Info("Receive Pong") case packetTypeMsg: pkt, n, err := msgPacket{}, int(0), error(nil) - wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err) + wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize(), &n, &err) c.recvMonitor.Update(int(n)) if err != nil { if c.IsRunning() { @@ -593,14 +598,15 @@ func (ch *Channel) isSendPending() bool { func (ch *Channel) nextMsgPacket() msgPacket { packet := msgPacket{} packet.ChannelID = byte(ch.id) - packet.Bytes = ch.sending[:MinInt(maxMsgPacketPayloadSize, len(ch.sending))] - if len(ch.sending) <= maxMsgPacketPayloadSize { + maxPayloadSize := config.GetInt(maxPayloadSizeKey) + packet.Bytes = ch.sending[:MinInt(maxPayloadSize, len(ch.sending))] + if len(ch.sending) <= maxPayloadSize { packet.EOF = byte(0x01) ch.sending = nil atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize } else { packet.EOF = byte(0x00) - ch.sending = ch.sending[MinInt(maxMsgPacketPayloadSize, len(ch.sending)):] + ch.sending = ch.sending[MinInt(maxPayloadSize, len(ch.sending)):] } return packet } @@ -644,10 +650,12 @@ func (ch *Channel) updateStats() { //----------------------------------------------------------------------------- +func maxMsgPacketTotalSize() int { + return config.GetInt(maxPayloadSizeKey) + maxMsgPacketOverheadSize +} + const ( - maxMsgPacketPayloadSize = 1024 maxMsgPacketOverheadSize = 10 // It's actually lower but good enough - maxMsgPacketTotalSize = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize packetTypePing = byte(0x01) packetTypePong = byte(0x02) packetTypeMsg = byte(0x03) diff --git a/switch.go b/switch.go index 518c6dfa2..59ce3860e 100644 --- a/switch.go +++ b/switch.go @@ -70,10 +70,11 @@ var ( ErrSwitchMaxPeersPerIPRange = errors.New("IP range has too many peers") ) +// config keys const ( - peerDialTimeoutSeconds = 3 // TODO make this configurable - handshakeTimeoutSeconds = 20 // TODO make this configurable - maxNumPeers = 50 // TODO make this configurable + dialTimeoutKey = "p2p_dial_timeout_seconds" + handshakeTimeoutKey = "p2p_handshake_timeout_seconds" + maxNumPeersKey = "p2p_max_num_peers" ) func NewSwitch() *Switch { @@ -194,7 +195,7 @@ func (sw *Switch) OnStop() { // CONTRACT: Iff error is returned, peer is nil, and conn is immediately closed. func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { // Set deadline for handshake so we don't block forever on conn.ReadFull - conn.SetDeadline(time.Now().Add(handshakeTimeoutSeconds * time.Second)) + conn.SetDeadline(time.Now().Add(time.Duration(config.GetInt(handshakeTimeoutKey)) * time.Second)) // First, encrypt the connection. sconn, err := MakeSecretConnection(conn, sw.nodePrivKey) @@ -279,7 +280,7 @@ func (sw *Switch) dialSeed(addr *NetAddress) { func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) { log.Info("Dialing address", "address", addr) sw.dialing.Set(addr.IP.String(), addr) - conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second) + conn, err := addr.DialTimeout(time.Duration(config.GetInt(dialTimeoutKey)) * time.Second) sw.dialing.Delete(addr.IP.String()) if err != nil { log.Info("Failed dialing address", "address", addr, "error", err) @@ -370,8 +371,9 @@ func (sw *Switch) listenerRoutine(l Listener) { } // ignore connection if we already have enough - if maxNumPeers <= sw.peers.Size() { - log.Info("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", sw.peers.Size(), "max", maxNumPeers) + maxPeers := config.GetInt(maxNumPeersKey) + if maxPeers <= sw.peers.Size() { + log.Info("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", sw.peers.Size(), "max", maxPeers) continue } diff --git a/version.go b/version.go index 7e51463dc..335843c95 100644 --- a/version.go +++ b/version.go @@ -1,3 +1,3 @@ package p2p -const Version = "0.3.0" +const Version = "0.3.1" // configurable params