From 3c10f7a122cf5856c0cf230ad18779c504121d5d Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 7 Jul 2017 13:08:52 -0400 Subject: [PATCH] add p2p flush throttle to config --- CHANGELOG.md | 1 + config/config.go | 42 ++++++++++++++++++++++++++++++------------ p2p/connection.go | 9 ++++++--- p2p/peer.go | 8 ++++---- p2p/switch.go | 3 ++- 5 files changed, 43 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1dbf7a70a..6d9ee4b4e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ FEATURES: - Add consensus reactor sleep durations to the config +- Add p2p flush throttle timeout to the config ## 0.10.1 (June 28, 2017) diff --git a/config/config.go b/config/config.go index 567084421..e552b33b4 100644 --- a/config/config.go +++ b/config/config.go @@ -199,23 +199,41 @@ func TestRPCConfig() *RPCConfig { // P2PConfig defines the configuration options for the Tendermint peer-to-peer networking layer type P2PConfig struct { - RootDir string `mapstructure:"home"` - ListenAddress string `mapstructure:"laddr"` - Seeds string `mapstructure:"seeds"` - SkipUPNP bool `mapstructure:"skip_upnp"` - AddrBook string `mapstructure:"addr_book_file"` - AddrBookStrict bool `mapstructure:"addr_book_strict"` - PexReactor bool `mapstructure:"pex"` - MaxNumPeers int `mapstructure:"max_num_peers"` + RootDir string `mapstructure:"home"` + + // Address to listen for incoming connections + ListenAddress string `mapstructure:"laddr"` + + // Comma separated list of seed nodes to connect to + Seeds string `mapstructure:"seeds"` + + // Skip UPNP port forwarding + SkipUPNP bool `mapstructure:"skip_upnp"` + + // Path to address book + AddrBook string `mapstructure:"addr_book_file"` + + // Set true for strict address routability rules + AddrBookStrict bool `mapstructure:"addr_book_strict"` + + // Set true to enable the peer-exchange reactor + PexReactor bool `mapstructure:"pex"` + + // Maximum number of peers to connect to + MaxNumPeers int `mapstructure:"max_num_peers"` + + // Time to wait before flushing messages out on the connection. In ms + FlushThrottleTimeout int `mapstructure:"flush_throttle_timeout"` } // DefaultP2PConfig returns a default configuration for the peer-to-peer layer func DefaultP2PConfig() *P2PConfig { return &P2PConfig{ - ListenAddress: "tcp://0.0.0.0:46656", - AddrBook: "addrbook.json", - AddrBookStrict: true, - MaxNumPeers: 50, + ListenAddress: "tcp://0.0.0.0:46656", + AddrBook: "addrbook.json", + AddrBookStrict: true, + MaxNumPeers: 50, + FlushThrottleTimeout: 100, } } diff --git a/p2p/connection.go b/p2p/connection.go index 67c9d98f9..ce165a8d6 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -89,13 +89,16 @@ type MConnection struct { type MConnConfig struct { SendRate int64 `mapstructure:"send_rate"` RecvRate int64 `mapstructure:"recv_rate"` + + flushThrottle time.Duration } // DefaultMConnConfig returns the default config. func DefaultMConnConfig() *MConnConfig { return &MConnConfig{ - SendRate: defaultSendRate, - RecvRate: defaultRecvRate, + SendRate: defaultSendRate, + RecvRate: defaultRecvRate, + flushThrottle: flushThrottle, } } @@ -148,7 +151,7 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec func (c *MConnection) OnStart() error { c.BaseService.OnStart() c.quit = make(chan struct{}) - c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle) + c.flushTimer = cmn.NewThrottleTimer("flush", c.config.flushThrottle) c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout) c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState) go c.sendRoutine() diff --git a/p2p/peer.go b/p2p/peer.go index 2602206c1..58678b3dc 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -59,9 +59,9 @@ func DefaultPeerConfig() *PeerConfig { } } -func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) { +/*func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) { return newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig()) -} +}*/ func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { conn, err := dial(addr, config) @@ -77,9 +77,9 @@ func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, return peer, nil } -func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) { +/*func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) { return newInboundPeerWithConfig(conn, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig()) -} +}*/ func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config) diff --git a/p2p/switch.go b/p2p/switch.go index 2d8d34357..547db4bde 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -90,6 +90,7 @@ func NewSwitch(config *cfg.P2PConfig) *Switch { dialing: cmn.NewCMap(), nodeInfo: nil, } + sw.peerConfig.MConfig.flushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond // TODO: collapse the peerConfig into the config ? sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw) return sw } @@ -547,7 +548,7 @@ func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch f } func (sw *Switch) addPeerWithConnection(conn net.Conn) error { - peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey) + peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig) if err != nil { conn.Close() return err