diff --git a/config.go b/config.go index 025b55323..0779bb4c8 100644 --- a/config.go +++ b/config.go @@ -4,31 +4,26 @@ import ( cfg "github.com/tendermint/go-config" ) -var config cfg.Config = nil - -func init() { - initConfigureable(dialTimeoutKey, 3) - initConfigureable(handshakeTimeoutKey, 20) - initConfigureable(maxNumPeersKey, 50) - initConfigureable(sendRateKey, 512000) // 500KB/s - initConfigureable(recvRateKey, 512000) // 500KB/s - initConfigureable(maxPayloadSizeKey, 1024) - - initConfigureable(authEncKey, true) - - cfg.OnConfig(func(newConfig cfg.Config) { - config = newConfig - - // fill in any config values that might be missing - for key, value := range defaultConfigValues { - config.SetDefault(key, value) - } - }) -} +const ( + // Switch config keys + configKeyDialTimeoutSeconds = "p2p_dial_timeout_seconds" + configKeyHandshakeTimeoutSeconds = "p2p_handshake_timeout_seconds" + configKeyMaxNumPeers = "p2p_max_num_peers" + configKeyAuthEnc = "p2p_authenticated_encryption" + + // MConnection config keys + configKeySendRate = "p2p_send_rate" + configKeyRecvRate = "p2p_recv_rate" +) -// default config map -var defaultConfigValues = make(map[string]interface{}) +func setConfigDefaults(config cfg.Config) { + // Switch default config + config.SetDefault(configKeyDialTimeoutSeconds, 3) + config.SetDefault(configKeyHandshakeTimeoutSeconds, 20) + config.SetDefault(configKeyMaxNumPeers, 50) + config.SetDefault(configKeyAuthEnc, true) -func initConfigureable(key string, value interface{}) { - defaultConfigValues[key] = value + // MConnection default config + config.SetDefault(configKeySendRate, 512000) // 500KB/s + config.SetDefault(configKeyRecvRate, 512000) // 500KB/s } diff --git a/connection.go b/connection.go index 7c77edf58..aa707b97c 100644 --- a/connection.go +++ b/connection.go @@ -12,30 +12,25 @@ import ( flow "github.com/tendermint/flowcontrol" . "github.com/tendermint/go-common" + cfg "github.com/tendermint/go-config" "github.com/tendermint/go-wire" //"github.com/tendermint/log15" ) const ( - numBatchMsgPackets = 10 - minReadBufferSize = 1024 - minWriteBufferSize = 65536 - idleTimeoutMinutes = 5 - updateStatsSeconds = 2 - pingTimeoutSeconds = 40 - flushThrottleMS = 100 + numBatchMsgPackets = 10 + minReadBufferSize = 1024 + minWriteBufferSize = 65536 + idleTimeoutMinutes = 5 + updateStatsSeconds = 2 + pingTimeoutSeconds = 40 + flushThrottleMS = 100 + defaultSendQueueCapacity = 1 defaultRecvBufferCapacity = 4096 defaultRecvMessageCapacity = 22020096 // 21MB defaultSendTimeoutSeconds = 10 ) -// config keys -const ( - sendRateKey = "p2p_send_rate" - recvRateKey = "p2p_recv_rate" - maxPayloadSizeKey = "p2p_max_msg_packet_payload_size" -) - type receiveCbFunc func(chID byte, msgBytes []byte) type errorCbFunc func(interface{}) @@ -91,7 +86,8 @@ type MConnection struct { RemoteAddress *NetAddress } -func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection { +func NewMConnection(config cfg.Config, conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection { + setConfigDefaults(config) mconn := &MConnection{ conn: conn, @@ -99,8 +95,8 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize), sendMonitor: flow.New(0, 0), recvMonitor: flow.New(0, 0), - sendRate: int64(config.GetInt(sendRateKey)), - recvRate: int64(config.GetInt(recvRateKey)), + sendRate: int64(config.GetInt(configKeySendRate)), + recvRate: int64(config.GetInt(configKeyRecvRate)), send: make(chan struct{}, 1), pong: make(chan struct{}), onReceive: onReceive, @@ -319,7 +315,7 @@ func (c *MConnection) sendSomeMsgPackets() bool { // Block until .sendMonitor says we can write. // Once we're ready we send more than we asked for, // but amortized it should even out. - c.sendMonitor.Limit(maxMsgPacketTotalSize(), atomic.LoadInt64(&c.sendRate), true) + c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.sendRate), true) // Now send some msgPackets. for i := 0; i < numBatchMsgPackets; i++ { @@ -377,7 +373,7 @@ func (c *MConnection) recvRoutine() { FOR_LOOP: for { // Block until .recvMonitor says we can read. - c.recvMonitor.Limit(maxMsgPacketTotalSize(), atomic.LoadInt64(&c.recvRate), true) + c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.recvRate), true) /* // Peek into bufReader for debugging @@ -418,7 +414,7 @@ FOR_LOOP: log.Debug("Receive Pong") case packetTypeMsg: pkt, n, err := msgPacket{}, int(0), error(nil) - wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize(), &n, &err) + wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err) c.recvMonitor.Update(int(n)) if err != nil { if c.IsRunning() { @@ -598,15 +594,14 @@ func (ch *Channel) isSendPending() bool { func (ch *Channel) nextMsgPacket() msgPacket { packet := msgPacket{} packet.ChannelID = byte(ch.id) - maxPayloadSize := config.GetInt(maxPayloadSizeKey) - packet.Bytes = ch.sending[:MinInt(maxPayloadSize, len(ch.sending))] - if len(ch.sending) <= maxPayloadSize { + packet.Bytes = ch.sending[:MinInt(maxMsgPacketPayloadSize, len(ch.sending))] + if len(ch.sending) <= maxMsgPacketPayloadSize { packet.EOF = byte(0x01) ch.sending = nil atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize } else { packet.EOF = byte(0x00) - ch.sending = ch.sending[MinInt(maxPayloadSize, len(ch.sending)):] + ch.sending = ch.sending[MinInt(maxMsgPacketPayloadSize, len(ch.sending)):] } return packet } @@ -654,12 +649,10 @@ func (ch *Channel) updateStats() { //----------------------------------------------------------------------------- -func maxMsgPacketTotalSize() int { - return config.GetInt(maxPayloadSizeKey) + maxMsgPacketOverheadSize -} - const ( + maxMsgPacketPayloadSize = 1024 maxMsgPacketOverheadSize = 10 // It's actually lower but good enough + maxMsgPacketTotalSize = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize packetTypePing = byte(0x01) packetTypePong = byte(0x02) packetTypeMsg = byte(0x03) diff --git a/peer.go b/peer.go index a0f153fe3..7a2d647f8 100644 --- a/peer.go +++ b/peer.go @@ -6,6 +6,7 @@ import ( "net" . "github.com/tendermint/go-common" + cfg "github.com/tendermint/go-config" "github.com/tendermint/go-wire" ) @@ -47,7 +48,7 @@ func peerHandshake(conn net.Conn, ourNodeInfo *NodeInfo) (*NodeInfo, error) { } // NOTE: call peerHandshake on conn before calling newPeer(). -func newPeer(conn net.Conn, peerNodeInfo *NodeInfo, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer { +func newPeer(config cfg.Config, conn net.Conn, peerNodeInfo *NodeInfo, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer { var p *Peer onReceive := func(chID byte, msgBytes []byte) { reactor := reactorsByCh[chID] @@ -60,7 +61,7 @@ func newPeer(conn net.Conn, peerNodeInfo *NodeInfo, outbound bool, reactorsByCh p.Stop() onPeerError(p, r) } - mconn := NewMConnection(conn, chDescs, onReceive, onError) + mconn := NewMConnection(config, conn, chDescs, onReceive, onError) p = &Peer{ outbound: outbound, mconn: mconn, diff --git a/switch.go b/switch.go index b59c6b126..5aa8b64ed 100644 --- a/switch.go +++ b/switch.go @@ -8,6 +8,7 @@ import ( "time" . "github.com/tendermint/go-common" + cfg "github.com/tendermint/go-config" "github.com/tendermint/go-crypto" "github.com/tendermint/log15" ) @@ -55,6 +56,7 @@ incoming messages are received on the reactor. type Switch struct { BaseService + config cfg.Config listeners []Listener reactors map[string]Reactor chDescs []*ChannelDescriptor @@ -70,16 +72,11 @@ var ( ErrSwitchMaxPeersPerIPRange = errors.New("IP range has too many peers") ) -// config keys -const ( - dialTimeoutKey = "p2p_dial_timeout_seconds" - handshakeTimeoutKey = "p2p_handshake_timeout_seconds" - maxNumPeersKey = "p2p_max_num_peers" - authEncKey = "p2p_authenticated_encryption" -) +func NewSwitch(config cfg.Config) *Switch { + setConfigDefaults(config) -func NewSwitch() *Switch { sw := &Switch{ + config: config, reactors: make(map[string]Reactor), chDescs: make([]*ChannelDescriptor, 0), reactorsByCh: make(map[byte]Reactor), @@ -196,11 +193,12 @@ func (sw *Switch) OnStop() { // CONTRACT: Iff error is returned, peer is nil, and conn is immediately closed. func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { // Set deadline for handshake so we don't block forever on conn.ReadFull - conn.SetDeadline(time.Now().Add(time.Duration(config.GetInt(handshakeTimeoutKey)) * time.Second)) + conn.SetDeadline(time.Now().Add( + time.Duration(sw.config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second)) // First, encrypt the connection. var sconn net.Conn = conn - if config.GetBool(authEncKey) { + if sw.config.GetBool(configKeyAuthEnc) { var err error sconn, err = MakeSecretConnection(conn, sw.nodePrivKey) if err != nil { @@ -214,7 +212,7 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er sconn.Close() return nil, err } - if config.GetBool("p2p_authenticated_encryption") { + if sw.config.GetBool(configKeyAuthEnc) { // Check that the professed PubKey matches the sconn's. if !peerNodeInfo.PubKey.Equals(sconn.(*SecretConnection).RemotePubKey()) { sconn.Close() @@ -233,7 +231,7 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er return nil, err } - peer := newPeer(sconn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError) + peer := newPeer(sw.config, sconn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError) // Add the peer to .peers // ignore if duplicate or if we already have too many for that IP range @@ -287,7 +285,8 @@ func (sw *Switch) dialSeed(addr *NetAddress) { func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) { log.Info("Dialing address", "address", addr) sw.dialing.Set(addr.IP.String(), addr) - conn, err := addr.DialTimeout(time.Duration(config.GetInt(dialTimeoutKey)) * time.Second) + conn, err := addr.DialTimeout(time.Duration( + sw.config.GetInt(configKeyDialTimeoutSeconds)) * time.Second) sw.dialing.Delete(addr.IP.String()) if err != nil { log.Info("Failed dialing address", "address", addr, "error", err) @@ -378,7 +377,7 @@ func (sw *Switch) listenerRoutine(l Listener) { } // ignore connection if we already have enough - maxPeers := config.GetInt(maxNumPeersKey) + maxPeers := sw.config.GetInt(configKeyMaxNumPeers) if maxPeers <= sw.peers.Size() { log.Info("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", sw.peers.Size(), "max", maxPeers) continue