From 47b28fd6aa5dede9ccc31fee6fdc0b8aadc608c4 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Wed, 14 Apr 2021 09:35:43 -0400 Subject: [PATCH] p2p: minor cleanup + update router options (#6353) --- config/config.go | 42 +++++++++++++++++++--------- config/toml.go | 14 +++++++++- docs/nodes/configuration.md | 6 ++++ node/node.go | 48 +++++++++++++++++++++++--------- p2p/router.go | 30 ++++++++++---------- test/e2e/generator/generate.go | 19 +++++++------ test/e2e/networks/ci.toml | 50 +++++++++++++++++----------------- test/e2e/pkg/manifest.go | 7 ++--- test/e2e/pkg/testnet.go | 5 ++-- test/e2e/runner/setup.go | 2 +- 10 files changed, 140 insertions(+), 83 deletions(-) diff --git a/config/config.go b/config/config.go index 0679ce82f..4a3968c8c 100644 --- a/config/config.go +++ b/config/config.go @@ -567,11 +567,25 @@ type P2PConfig struct { //nolint: maligned AddrBookStrict bool `mapstructure:"addr-book-strict"` // Maximum number of inbound peers + // + // TODO: Remove once p2p refactor is complete in favor of MaxConnections. + // ref: https://github.com/tendermint/tendermint/issues/5670 MaxNumInboundPeers int `mapstructure:"max-num-inbound-peers"` - // Maximum number of outbound peers to connect to, excluding persistent peers + // Maximum number of outbound peers to connect to, excluding persistent peers. + // + // TODO: Remove once p2p refactor is complete in favor of MaxConnections. + // ref: https://github.com/tendermint/tendermint/issues/5670 MaxNumOutboundPeers int `mapstructure:"max-num-outbound-peers"` + // MaxConnections defines the maximum number of connected peers (inbound and + // outbound). + MaxConnections uint16 `mapstructure:"max-connections"` + + // MaxIncomingConnectionAttempts rate limits the number of incoming connection + // attempts per IP address. + MaxIncomingConnectionAttempts uint `mapstructure:"max-incoming-connection-attempts"` + // List of node IDs, to which a connection will be (re)established ignoring any existing limits UnconditionalPeerIDs string `mapstructure:"unconditional-peer-ids"` @@ -608,9 +622,9 @@ type P2PConfig struct { //nolint: maligned // Force dial to fail TestDialFail bool `mapstructure:"test-dial-fail"` - // Mostly for testing, use rather than environment variables - // to turn on the new P2P stack. - UseNewP2P bool `mapstructure:"use-new-p2p"` + // DisableLegacy is used mostly for testing to enable or disable the legacy + // P2P stack. + DisableLegacy bool `mapstructure:"disable-legacy"` // Makes it possible to configure which queue backend the p2p // layer uses. Options are: "fifo", "priority" and "wdrr", @@ -621,15 +635,17 @@ type P2PConfig struct { //nolint: maligned // DefaultP2PConfig returns a default configuration for the peer-to-peer layer func DefaultP2PConfig() *P2PConfig { return &P2PConfig{ - ListenAddress: "tcp://0.0.0.0:26656", - ExternalAddress: "", - UPNP: false, - AddrBook: defaultAddrBookPath, - AddrBookStrict: true, - MaxNumInboundPeers: 40, - MaxNumOutboundPeers: 10, - PersistentPeersMaxDialPeriod: 0 * time.Second, - FlushThrottleTimeout: 100 * time.Millisecond, + ListenAddress: "tcp://0.0.0.0:26656", + ExternalAddress: "", + UPNP: false, + AddrBook: defaultAddrBookPath, + AddrBookStrict: true, + MaxNumInboundPeers: 40, + MaxNumOutboundPeers: 10, + MaxConnections: 64, + MaxIncomingConnectionAttempts: 100, + PersistentPeersMaxDialPeriod: 0 * time.Second, + FlushThrottleTimeout: 100 * time.Millisecond, // The MTU (Maximum Transmission Unit) for Ethernet is 1500 bytes. // The IP header and the TCP header take up 20 bytes each at least (unless // optional header fields are used) and thus the max for (non-Jumbo frame) diff --git a/config/toml.go b/config/toml.go index 4e8e0d0ea..eeee75242 100644 --- a/config/toml.go +++ b/config/toml.go @@ -262,7 +262,7 @@ pprof-laddr = "{{ .RPC.PprofListenAddress }}" [p2p] # Enable the new p2p layer. -use-new-p2p = {{ .P2P.UseNewP2P }} +disable-legacy = {{ .P2P.DisableLegacy }} # Select the p2p internal queue queue-type = "{{ .P2P.QueueType }}" @@ -293,11 +293,23 @@ addr-book-file = "{{ js .P2P.AddrBook }}" addr-book-strict = {{ .P2P.AddrBookStrict }} # Maximum number of inbound peers +# +# TODO: Remove once p2p refactor is complete in favor of MaxConnections. +# ref: https://github.com/tendermint/tendermint/issues/5670 max-num-inbound-peers = {{ .P2P.MaxNumInboundPeers }} # Maximum number of outbound peers to connect to, excluding persistent peers +# +# TODO: Remove once p2p refactor is complete in favor of MaxConnections. +# ref: https://github.com/tendermint/tendermint/issues/5670 max-num-outbound-peers = {{ .P2P.MaxNumOutboundPeers }} +# Maximum number of connections (inbound and outbound). +max-connections = {{ .P2P.MaxConnections }} + +# Rate limits the number of incoming connection attempts per IP address. +max-incoming-connection-attempts = {{ .P2P.MaxIncomingConnectionAttempts }} + # List of node IDs, to which a connection will be (re)established ignoring any existing limits unconditional-peer-ids = "{{ .P2P.UnconditionalPeerIDs }}" diff --git a/docs/nodes/configuration.md b/docs/nodes/configuration.md index 35e639754..1b665bc39 100644 --- a/docs/nodes/configuration.md +++ b/docs/nodes/configuration.md @@ -233,6 +233,12 @@ max-num-inbound-peers = 40 # Maximum number of outbound peers to connect to, excluding persistent peers max-num-outbound-peers = 10 +# Maximum number of connections (inbound and outbound). +max-connections = 64 + +# Rate limits the number of incoming connection attempts per IP address. +max-incoming-connection-attempts = 100 + # List of node IDs, to which a connection will be (re)established ignoring any existing limits unconditional-peer-ids = "" diff --git a/node/node.go b/node/node.go index a4ea8470e..7d8555a25 100644 --- a/node/node.go +++ b/node/node.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "math" "net" "net/http" _ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port @@ -387,7 +388,7 @@ func createMempoolReactor( peerUpdates *p2p.PeerUpdates ) - if config.P2P.UseNewP2P { + if config.P2P.DisableLegacy { channels = makeChannelsFromShims(router, channelShims) peerUpdates = peerManager.Subscribe() } else { @@ -438,7 +439,7 @@ func createEvidenceReactor( peerUpdates *p2p.PeerUpdates ) - if config.P2P.UseNewP2P { + if config.P2P.DisableLegacy { channels = makeChannelsFromShims(router, evidence.ChannelShims) peerUpdates = peerManager.Subscribe() } else { @@ -479,7 +480,7 @@ func createBlockchainReactor( peerUpdates *p2p.PeerUpdates ) - if config.P2P.UseNewP2P { + if config.P2P.DisableLegacy { channels = makeChannelsFromShims(router, bcv0.ChannelShims) peerUpdates = peerManager.Subscribe() } else { @@ -545,7 +546,7 @@ func createConsensusReactor( peerUpdates *p2p.PeerUpdates ) - if config.P2P.UseNewP2P { + if config.P2P.DisableLegacy { channels = makeChannelsFromShims(router, cs.ChannelShims) peerUpdates = peerManager.Subscribe() } else { @@ -584,8 +585,30 @@ func createTransport(logger log.Logger, config *cfg.Config) *p2p.MConnTransport } func createPeerManager(config *cfg.Config, p2pLogger log.Logger, nodeID p2p.NodeID) (*p2p.PeerManager, error) { + var maxConns uint16 + switch { + case config.P2P.MaxConnections > 0: + maxConns = config.P2P.MaxConnections + + case config.P2P.MaxNumInboundPeers > 0 && config.P2P.MaxNumOutboundPeers > 0: + x := config.P2P.MaxNumInboundPeers + config.P2P.MaxNumOutboundPeers + if x > math.MaxUint16 { + return nil, fmt.Errorf( + "max inbound peers (%d) + max outbound peers (%d) exceeds maximum (%d)", + config.P2P.MaxNumInboundPeers, + config.P2P.MaxNumOutboundPeers, + math.MaxUint16, + ) + } + + maxConns = uint16(x) + + default: + maxConns = 64 + } + options := p2p.PeerManagerOptions{ - MaxConnected: 64, + MaxConnected: maxConns, MaxConnectedUpgrade: 4, MaxPeers: 1000, MinRetryTime: 100 * time.Millisecond, @@ -1138,7 +1161,7 @@ func NewNode(config *cfg.Config, stateSyncReactorShim = p2p.NewReactorShim(logger.With("module", "statesync"), "StateSyncShim", statesync.ChannelShims) - if config.P2P.UseNewP2P { + if config.P2P.DisableLegacy { channels = makeChannelsFromShims(router, statesync.ChannelShims) peerUpdates = peerManager.Subscribe() } else { @@ -1305,9 +1328,9 @@ func (n *Node) OnStart() error { n.isListening = true - n.Logger.Info("p2p service", "legacy_enabled", !n.config.P2P.UseNewP2P) + n.Logger.Info("p2p service", "legacy_enabled", !n.config.P2P.DisableLegacy) - if n.config.P2P.UseNewP2P { + if n.config.P2P.DisableLegacy { err = n.router.Start() } else { err = n.sw.Start() @@ -1345,7 +1368,7 @@ func (n *Node) OnStart() error { } } - if n.config.P2P.UseNewP2P && n.pexReactorV2 != nil { + if n.config.P2P.DisableLegacy && n.pexReactorV2 != nil { if err := n.pexReactorV2.Start(); err != nil { return err } @@ -1388,7 +1411,6 @@ func (n *Node) OnStop() { } if n.config.Mode != cfg.ModeSeed { - // now stop the reactors if n.config.FastSync.Version == "v0" { // Stop the real blockchain reactor separately since the switch uses the shim. @@ -1418,13 +1440,13 @@ func (n *Node) OnStop() { } } - if n.config.P2P.UseNewP2P && n.pexReactorV2 != nil { + if n.config.P2P.DisableLegacy && n.pexReactorV2 != nil { if err := n.pexReactorV2.Stop(); err != nil { n.Logger.Error("failed to stop the PEX v2 reactor", "err", err) } } - if n.config.P2P.UseNewP2P { + if n.config.P2P.DisableLegacy { if err := n.router.Stop(); err != nil { n.Logger.Error("failed to stop router", "err", err) } @@ -1960,7 +1982,7 @@ func getRouterConfig(conf *cfg.Config, proxyApp proxy.AppConns) p2p.RouterOption } if conf.P2P.MaxNumInboundPeers > 0 { - opts.MaxIncommingConnectionsPerIP = uint(conf.P2P.MaxNumInboundPeers) + opts.MaxIncomingConnectionAttempts = conf.P2P.MaxIncomingConnectionAttempts } if conf.FilterPeers && proxyApp != nil { diff --git a/p2p/router.go b/p2p/router.go index ffab0d502..afe696426 100644 --- a/p2p/router.go +++ b/p2p/router.go @@ -128,13 +128,13 @@ type RouterOptions struct { // no timeout. HandshakeTimeout time.Duration - // QueueType must be "wdrr" (Weighed Deficit Round Robin), - // "priority", or FIFO. Defaults to FIFO. + // QueueType must be "wdrr" (Weighed Deficit Round Robin), "priority", or + // "fifo". Defaults to "fifo". QueueType string - // MaxIncommingConnectionsPerIP limits the number of incoming - // connections per IP address. Defaults to 100. - MaxIncommingConnectionsPerIP uint + // MaxIncomingConnectionAttempts rate limits the number of incoming connection + // attempts per IP address. Defaults to 100. + MaxIncomingConnectionAttempts uint // IncomingConnectionWindow describes how often an IP address // can attempt to create a new connection. Defaults to 10 @@ -182,8 +182,8 @@ func (o *RouterOptions) Validate() error { o.IncomingConnectionWindow) } - if o.MaxIncommingConnectionsPerIP == 0 { - o.MaxIncommingConnectionsPerIP = 100 + if o.MaxIncomingConnectionAttempts == 0 { + o.MaxIncomingConnectionAttempts = 100 } return nil @@ -278,8 +278,9 @@ func NewRouter( nodeInfo: nodeInfo, privKey: privKey, connTracker: newConnTracker( - options.MaxIncommingConnectionsPerIP, - options.IncomingConnectionWindow), + options.MaxIncomingConnectionAttempts, + options.IncomingConnectionWindow, + ), chDescs: make([]ChannelDescriptor, 0), transports: transports, protocolTransports: map[Protocol]Transport{}, @@ -526,7 +527,8 @@ func (r *Router) acceptPeers(transport Transport) { r.logger.Debug("rate limiting incoming peer", "err", err, "ip", incomingIP.String(), - "closeErr", closeErr) + "close_err", closeErr, + ) return } @@ -545,9 +547,7 @@ func (r *Router) openConnection(ctx context.Context, conn Connection) { incomingIP := re.IP if err := r.filterPeersIP(ctx, incomingIP, re.Port); err != nil { - r.logger.Debug("peer filtered by IP", - "ip", incomingIP.String(), - "err", err) + r.logger.Debug("peer filtered by IP", "ip", incomingIP.String(), "err", err) return } @@ -576,9 +576,7 @@ func (r *Router) openConnection(ctx context.Context, conn Connection) { } if err := r.filterPeersID(ctx, peerInfo.NodeID); err != nil { - r.logger.Debug("peer filtered by node ID", - "node", peerInfo.NodeID, - "err", err) + r.logger.Debug("peer filtered by node ID", "node", peerInfo.NodeID, "err", err) return } diff --git a/test/e2e/generator/generate.go b/test/e2e/generator/generate.go index 2a1b905bb..2d84fab69 100644 --- a/test/e2e/generator/generate.go +++ b/test/e2e/generator/generate.go @@ -78,9 +78,9 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er switch p2pInfo := opt["useNewP2P"].(type) { case bool: - manifest.UseNewP2P = p2pInfo + manifest.DisableLegacyP2P = p2pInfo case int: - manifest.UseNewP2P = false + manifest.DisableLegacyP2P = false p2pNodeFactor = p2pInfo } @@ -104,11 +104,13 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er for i := 1; i <= numSeeds; i++ { node := generateNode(r, e2e.ModeSeed, 0, manifest.InitialHeight, false) node.QueueType = manifest.QueueType + if p2pNodeFactor == 0 { - node.UseNewP2P = manifest.UseNewP2P + node.DisableLegacyP2P = manifest.DisableLegacyP2P } else if p2pNodeFactor%i == 0 { - node.UseNewP2P = !manifest.UseNewP2P + node.DisableLegacyP2P = !manifest.DisableLegacyP2P } + manifest.Nodes[fmt.Sprintf("seed%02d", i)] = node } @@ -129,10 +131,11 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er node.QueueType = manifest.QueueType if p2pNodeFactor == 0 { - node.UseNewP2P = manifest.UseNewP2P + node.DisableLegacyP2P = manifest.DisableLegacyP2P } else if p2pNodeFactor%i == 0 { - node.UseNewP2P = !manifest.UseNewP2P + node.DisableLegacyP2P = !manifest.DisableLegacyP2P } + manifest.Nodes[name] = node if startAt == 0 { @@ -164,9 +167,9 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er node := generateNode(r, e2e.ModeFull, startAt, manifest.InitialHeight, false) node.QueueType = manifest.QueueType if p2pNodeFactor == 0 { - node.UseNewP2P = manifest.UseNewP2P + node.DisableLegacyP2P = manifest.DisableLegacyP2P } else if p2pNodeFactor%i == 0 { - node.UseNewP2P = !manifest.UseNewP2P + node.DisableLegacyP2P = !manifest.DisableLegacyP2P } manifest.Nodes[fmt.Sprintf("full%02d", i)] = node } diff --git a/test/e2e/networks/ci.toml b/test/e2e/networks/ci.toml index fe0b61f7d..3509a399d 100644 --- a/test/e2e/networks/ci.toml +++ b/test/e2e/networks/ci.toml @@ -1,10 +1,10 @@ # This testnet is run by CI, and attempts to cover a broad range of # functionality with a single network. -initial_height = 1000 +disable_legacy_p2p = false evidence = 0 -initial_state = { initial01 = "a", initial02 = "b", initial03 = "c" } -use_new_p2p = false +initial_height = 1000 +initial_state = {initial01 = "a", initial02 = "b", initial03 = "c"} queue_type = "priority" [validators] @@ -29,71 +29,71 @@ validator05 = 50 [node.seed01] mode = "seed" -seeds = ["seed02"] perturb = ["restart"] +seeds = ["seed02"] [node.seed02] mode = "seed" seeds = ["seed01"] [node.validator01] +perturb = ["disconnect"] seeds = ["seed01"] snapshot_interval = 5 -perturb = ["disconnect"] [node.validator02] -seeds = ["seed02"] -database = "boltdb" abci_protocol = "tcp" -privval_protocol = "tcp" +database = "boltdb" persist_interval = 0 perturb = ["restart"] +privval_protocol = "tcp" +seeds = ["seed02"] [node.validator03] -seeds = ["seed01"] database = "badgerdb" +seeds = ["seed01"] # FIXME: should be grpc, disabled due to https://github.com/tendermint/tendermint/issues/5439 #abci_protocol = "grpc" -privval_protocol = "grpc" persist_interval = 3 -retain_blocks = 3 perturb = ["kill"] +privval_protocol = "grpc" +retain_blocks = 3 [node.validator04] -persistent_peers = ["validator01"] -database = "rocksdb" abci_protocol = "builtin" +database = "rocksdb" +persistent_peers = ["validator01"] perturb = ["pause"] [node.validator05] +database = "cleveldb" +fast_sync = "v0" +seeds = ["seed02"] start_at = 1005 # Becomes part of the validator set at 1010 -seeds = ["seed02"] -database = "cleveldb" -fast_sync = "v0" # FIXME: should be grpc, disabled due to https://github.com/tendermint/tendermint/issues/5439 #abci_protocol = "grpc" -privval_protocol = "tcp" perturb = ["kill", "pause", "disconnect", "restart"] +privval_protocol = "tcp" [node.full01] -start_at = 1010 mode = "full" +start_at = 1010 # FIXME: should be v2, disabled due to flake fast_sync = "v0" persistent_peers = ["validator01", "validator02", "validator03", "validator04", "validator05"] -retain_blocks = 3 perturb = ["restart"] +retain_blocks = 3 [node.full02] -start_at = 1015 mode = "full" +start_at = 1015 # FIXME: should be v2, disabled due to flake fast_sync = "v0" -state_sync = true -seeds = ["seed01"] perturb = ["restart"] +seeds = ["seed01"] +state_sync = true [node.light01] -mode= "light" -start_at= 1010 -persistent_peers = ["validator01", "validator02", "validator03"] \ No newline at end of file +mode = "light" +persistent_peers = ["validator01", "validator02", "validator03"] +start_at = 1010 diff --git a/test/e2e/pkg/manifest.go b/test/e2e/pkg/manifest.go index 27166b64e..6c85ee45b 100644 --- a/test/e2e/pkg/manifest.go +++ b/test/e2e/pkg/manifest.go @@ -59,9 +59,8 @@ type Manifest struct { // by individual nodes. LogLevel string `toml:"log_level"` - // UseNewP2P enables use of the new p2p layer for all nodes in - // a test. - UseNewP2P bool `toml:"use_new_p2p"` + // DisableLegacyP2P enables use of the new p2p layer for all nodes in a test. + DisableLegacyP2P bool `toml:"disable_legacy_p2p"` // QueueType describes the type of queue that the system uses internally QueueType string `toml:"queue_type"` @@ -143,7 +142,7 @@ type ManifestNode struct { LogLevel string `toml:"log_level"` // UseNewP2P enables use of the new p2p layer for this node. - UseNewP2P bool `toml:"use_new_p2p"` + DisableLegacyP2P bool `toml:"disable_legacy_p2p"` // QueueType describes the type of queue that the p2p layer // uses internally. diff --git a/test/e2e/pkg/testnet.go b/test/e2e/pkg/testnet.go index 7e760e8e7..7bb64d7a6 100644 --- a/test/e2e/pkg/testnet.go +++ b/test/e2e/pkg/testnet.go @@ -90,7 +90,7 @@ type Node struct { PersistentPeers []*Node Perturbations []Perturbation LogLevel string - UseNewP2P bool + DisableLegacyP2P bool QueueType string } @@ -169,9 +169,10 @@ func LoadTestnet(file string) (*Testnet, error) { RetainBlocks: nodeManifest.RetainBlocks, Perturbations: []Perturbation{}, LogLevel: manifest.LogLevel, - UseNewP2P: manifest.UseNewP2P, + DisableLegacyP2P: manifest.DisableLegacyP2P, QueueType: manifest.QueueType, } + if node.StartAt == testnet.InitialHeight { node.StartAt = 0 // normalize to 0 for initial nodes, since code expects this } diff --git a/test/e2e/runner/setup.go b/test/e2e/runner/setup.go index f4acd53f3..9147b4da3 100644 --- a/test/e2e/runner/setup.go +++ b/test/e2e/runner/setup.go @@ -239,7 +239,7 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) { cfg.RPC.PprofListenAddress = ":6060" cfg.P2P.ExternalAddress = fmt.Sprintf("tcp://%v", node.AddressP2P(false)) cfg.P2P.AddrBookStrict = false - cfg.P2P.UseNewP2P = node.UseNewP2P + cfg.P2P.DisableLegacy = node.DisableLegacyP2P cfg.P2P.QueueType = node.QueueType cfg.DBBackend = node.Database cfg.StateSync.DiscoveryTime = 5 * time.Second