Browse Source

add p2p flush throttle to config

pull/587/head
Ethan Buchman 7 years ago
parent
commit
3c10f7a122
5 changed files with 43 additions and 20 deletions
  1. +1
    -0
      CHANGELOG.md
  2. +30
    -12
      config/config.go
  3. +6
    -3
      p2p/connection.go
  4. +4
    -4
      p2p/peer.go
  5. +2
    -1
      p2p/switch.go

+ 1
- 0
CHANGELOG.md View File

@ -4,6 +4,7 @@
FEATURES: FEATURES:
- Add consensus reactor sleep durations to the config - Add consensus reactor sleep durations to the config
- Add p2p flush throttle timeout to the config
## 0.10.1 (June 28, 2017) ## 0.10.1 (June 28, 2017)


+ 30
- 12
config/config.go View File

@ -199,23 +199,41 @@ func TestRPCConfig() *RPCConfig {
// P2PConfig defines the configuration options for the Tendermint peer-to-peer networking layer // P2PConfig defines the configuration options for the Tendermint peer-to-peer networking layer
type P2PConfig struct { type P2PConfig struct {
RootDir string `mapstructure:"home"`
ListenAddress string `mapstructure:"laddr"`
Seeds string `mapstructure:"seeds"`
SkipUPNP bool `mapstructure:"skip_upnp"`
AddrBook string `mapstructure:"addr_book_file"`
AddrBookStrict bool `mapstructure:"addr_book_strict"`
PexReactor bool `mapstructure:"pex"`
MaxNumPeers int `mapstructure:"max_num_peers"`
RootDir string `mapstructure:"home"`
// Address to listen for incoming connections
ListenAddress string `mapstructure:"laddr"`
// Comma separated list of seed nodes to connect to
Seeds string `mapstructure:"seeds"`
// Skip UPNP port forwarding
SkipUPNP bool `mapstructure:"skip_upnp"`
// Path to address book
AddrBook string `mapstructure:"addr_book_file"`
// Set true for strict address routability rules
AddrBookStrict bool `mapstructure:"addr_book_strict"`
// Set true to enable the peer-exchange reactor
PexReactor bool `mapstructure:"pex"`
// Maximum number of peers to connect to
MaxNumPeers int `mapstructure:"max_num_peers"`
// Time to wait before flushing messages out on the connection. In ms
FlushThrottleTimeout int `mapstructure:"flush_throttle_timeout"`
} }
// DefaultP2PConfig returns a default configuration for the peer-to-peer layer // DefaultP2PConfig returns a default configuration for the peer-to-peer layer
func DefaultP2PConfig() *P2PConfig { func DefaultP2PConfig() *P2PConfig {
return &P2PConfig{ return &P2PConfig{
ListenAddress: "tcp://0.0.0.0:46656",
AddrBook: "addrbook.json",
AddrBookStrict: true,
MaxNumPeers: 50,
ListenAddress: "tcp://0.0.0.0:46656",
AddrBook: "addrbook.json",
AddrBookStrict: true,
MaxNumPeers: 50,
FlushThrottleTimeout: 100,
} }
} }


+ 6
- 3
p2p/connection.go View File

@ -89,13 +89,16 @@ type MConnection struct {
type MConnConfig struct { type MConnConfig struct {
SendRate int64 `mapstructure:"send_rate"` SendRate int64 `mapstructure:"send_rate"`
RecvRate int64 `mapstructure:"recv_rate"` RecvRate int64 `mapstructure:"recv_rate"`
flushThrottle time.Duration
} }
// DefaultMConnConfig returns the default config. // DefaultMConnConfig returns the default config.
func DefaultMConnConfig() *MConnConfig { func DefaultMConnConfig() *MConnConfig {
return &MConnConfig{ return &MConnConfig{
SendRate: defaultSendRate,
RecvRate: defaultRecvRate,
SendRate: defaultSendRate,
RecvRate: defaultRecvRate,
flushThrottle: flushThrottle,
} }
} }
@ -148,7 +151,7 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec
func (c *MConnection) OnStart() error { func (c *MConnection) OnStart() error {
c.BaseService.OnStart() c.BaseService.OnStart()
c.quit = make(chan struct{}) c.quit = make(chan struct{})
c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle)
c.flushTimer = cmn.NewThrottleTimer("flush", c.config.flushThrottle)
c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout) c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout)
c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState) c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState)
go c.sendRoutine() go c.sendRoutine()


+ 4
- 4
p2p/peer.go View File

@ -59,9 +59,9 @@ func DefaultPeerConfig() *PeerConfig {
} }
} }
func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) {
/*func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) {
return newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig()) return newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig())
}
}*/
func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
conn, err := dial(addr, config) conn, err := dial(addr, config)
@ -77,9 +77,9 @@ func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor,
return peer, nil return peer, nil
} }
func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) {
/*func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) {
return newInboundPeerWithConfig(conn, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig()) return newInboundPeerWithConfig(conn, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig())
}
}*/
func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config) return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)


+ 2
- 1
p2p/switch.go View File

@ -90,6 +90,7 @@ func NewSwitch(config *cfg.P2PConfig) *Switch {
dialing: cmn.NewCMap(), dialing: cmn.NewCMap(),
nodeInfo: nil, nodeInfo: nil,
} }
sw.peerConfig.MConfig.flushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond // TODO: collapse the peerConfig into the config ?
sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw) sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
return sw return sw
} }
@ -547,7 +548,7 @@ func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch f
} }
func (sw *Switch) addPeerWithConnection(conn net.Conn) error { func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey)
peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
if err != nil { if err != nil {
conn.Close() conn.Close()
return err return err


Loading…
Cancel
Save