Browse Source

p2p: minor cleanup + update router options (#6353)

pull/6343/head
Aleksandr Bezobchuk 3 years ago
committed by GitHub
parent
commit
47b28fd6aa
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 140 additions and 83 deletions
  1. +29
    -13
      config/config.go
  2. +13
    -1
      config/toml.go
  3. +6
    -0
      docs/nodes/configuration.md
  4. +35
    -13
      node/node.go
  5. +14
    -16
      p2p/router.go
  6. +11
    -8
      test/e2e/generator/generate.go
  7. +25
    -25
      test/e2e/networks/ci.toml
  8. +3
    -4
      test/e2e/pkg/manifest.go
  9. +3
    -2
      test/e2e/pkg/testnet.go
  10. +1
    -1
      test/e2e/runner/setup.go

+ 29
- 13
config/config.go View File

@ -567,11 +567,25 @@ type P2PConfig struct { //nolint: maligned
AddrBookStrict bool `mapstructure:"addr-book-strict"`
// Maximum number of inbound peers
//
// TODO: Remove once p2p refactor is complete in favor of MaxConnections.
// ref: https://github.com/tendermint/tendermint/issues/5670
MaxNumInboundPeers int `mapstructure:"max-num-inbound-peers"`
// Maximum number of outbound peers to connect to, excluding persistent peers
// Maximum number of outbound peers to connect to, excluding persistent peers.
//
// TODO: Remove once p2p refactor is complete in favor of MaxConnections.
// ref: https://github.com/tendermint/tendermint/issues/5670
MaxNumOutboundPeers int `mapstructure:"max-num-outbound-peers"`
// MaxConnections defines the maximum number of connected peers (inbound and
// outbound).
MaxConnections uint16 `mapstructure:"max-connections"`
// MaxIncomingConnectionAttempts rate limits the number of incoming connection
// attempts per IP address.
MaxIncomingConnectionAttempts uint `mapstructure:"max-incoming-connection-attempts"`
// List of node IDs, to which a connection will be (re)established ignoring any existing limits
UnconditionalPeerIDs string `mapstructure:"unconditional-peer-ids"`
@ -608,9 +622,9 @@ type P2PConfig struct { //nolint: maligned
// Force dial to fail
TestDialFail bool `mapstructure:"test-dial-fail"`
// Mostly for testing, use rather than environment variables
// to turn on the new P2P stack.
UseNewP2P bool `mapstructure:"use-new-p2p"`
// DisableLegacy is used mostly for testing to enable or disable the legacy
// P2P stack.
DisableLegacy bool `mapstructure:"disable-legacy"`
// Makes it possible to configure which queue backend the p2p
// layer uses. Options are: "fifo", "priority" and "wdrr",
@ -621,15 +635,17 @@ type P2PConfig struct { //nolint: maligned
// DefaultP2PConfig returns a default configuration for the peer-to-peer layer
func DefaultP2PConfig() *P2PConfig {
return &P2PConfig{
ListenAddress: "tcp://0.0.0.0:26656",
ExternalAddress: "",
UPNP: false,
AddrBook: defaultAddrBookPath,
AddrBookStrict: true,
MaxNumInboundPeers: 40,
MaxNumOutboundPeers: 10,
PersistentPeersMaxDialPeriod: 0 * time.Second,
FlushThrottleTimeout: 100 * time.Millisecond,
ListenAddress: "tcp://0.0.0.0:26656",
ExternalAddress: "",
UPNP: false,
AddrBook: defaultAddrBookPath,
AddrBookStrict: true,
MaxNumInboundPeers: 40,
MaxNumOutboundPeers: 10,
MaxConnections: 64,
MaxIncomingConnectionAttempts: 100,
PersistentPeersMaxDialPeriod: 0 * time.Second,
FlushThrottleTimeout: 100 * time.Millisecond,
// The MTU (Maximum Transmission Unit) for Ethernet is 1500 bytes.
// The IP header and the TCP header take up 20 bytes each at least (unless
// optional header fields are used) and thus the max for (non-Jumbo frame)


+ 13
- 1
config/toml.go View File

@ -262,7 +262,7 @@ pprof-laddr = "{{ .RPC.PprofListenAddress }}"
[p2p]
# Enable the new p2p layer.
use-new-p2p = {{ .P2P.UseNewP2P }}
disable-legacy = {{ .P2P.DisableLegacy }}
# Select the p2p internal queue
queue-type = "{{ .P2P.QueueType }}"
@ -293,11 +293,23 @@ addr-book-file = "{{ js .P2P.AddrBook }}"
addr-book-strict = {{ .P2P.AddrBookStrict }}
# Maximum number of inbound peers
#
# TODO: Remove once p2p refactor is complete in favor of MaxConnections.
# ref: https://github.com/tendermint/tendermint/issues/5670
max-num-inbound-peers = {{ .P2P.MaxNumInboundPeers }}
# Maximum number of outbound peers to connect to, excluding persistent peers
#
# TODO: Remove once p2p refactor is complete in favor of MaxConnections.
# ref: https://github.com/tendermint/tendermint/issues/5670
max-num-outbound-peers = {{ .P2P.MaxNumOutboundPeers }}
# Maximum number of connections (inbound and outbound).
max-connections = {{ .P2P.MaxConnections }}
# Rate limits the number of incoming connection attempts per IP address.
max-incoming-connection-attempts = {{ .P2P.MaxIncomingConnectionAttempts }}
# List of node IDs, to which a connection will be (re)established ignoring any existing limits
unconditional-peer-ids = "{{ .P2P.UnconditionalPeerIDs }}"


+ 6
- 0
docs/nodes/configuration.md View File

@ -233,6 +233,12 @@ max-num-inbound-peers = 40
# Maximum number of outbound peers to connect to, excluding persistent peers
max-num-outbound-peers = 10
# Maximum number of connections (inbound and outbound).
max-connections = 64
# Rate limits the number of incoming connection attempts per IP address.
max-incoming-connection-attempts = 100
# List of node IDs, to which a connection will be (re)established ignoring any existing limits
unconditional-peer-ids = ""


+ 35
- 13
node/node.go View File

@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"math"
"net"
"net/http"
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
@ -387,7 +388,7 @@ func createMempoolReactor(
peerUpdates *p2p.PeerUpdates
)
if config.P2P.UseNewP2P {
if config.P2P.DisableLegacy {
channels = makeChannelsFromShims(router, channelShims)
peerUpdates = peerManager.Subscribe()
} else {
@ -438,7 +439,7 @@ func createEvidenceReactor(
peerUpdates *p2p.PeerUpdates
)
if config.P2P.UseNewP2P {
if config.P2P.DisableLegacy {
channels = makeChannelsFromShims(router, evidence.ChannelShims)
peerUpdates = peerManager.Subscribe()
} else {
@ -479,7 +480,7 @@ func createBlockchainReactor(
peerUpdates *p2p.PeerUpdates
)
if config.P2P.UseNewP2P {
if config.P2P.DisableLegacy {
channels = makeChannelsFromShims(router, bcv0.ChannelShims)
peerUpdates = peerManager.Subscribe()
} else {
@ -545,7 +546,7 @@ func createConsensusReactor(
peerUpdates *p2p.PeerUpdates
)
if config.P2P.UseNewP2P {
if config.P2P.DisableLegacy {
channels = makeChannelsFromShims(router, cs.ChannelShims)
peerUpdates = peerManager.Subscribe()
} else {
@ -584,8 +585,30 @@ func createTransport(logger log.Logger, config *cfg.Config) *p2p.MConnTransport
}
func createPeerManager(config *cfg.Config, p2pLogger log.Logger, nodeID p2p.NodeID) (*p2p.PeerManager, error) {
var maxConns uint16
switch {
case config.P2P.MaxConnections > 0:
maxConns = config.P2P.MaxConnections
case config.P2P.MaxNumInboundPeers > 0 && config.P2P.MaxNumOutboundPeers > 0:
x := config.P2P.MaxNumInboundPeers + config.P2P.MaxNumOutboundPeers
if x > math.MaxUint16 {
return nil, fmt.Errorf(
"max inbound peers (%d) + max outbound peers (%d) exceeds maximum (%d)",
config.P2P.MaxNumInboundPeers,
config.P2P.MaxNumOutboundPeers,
math.MaxUint16,
)
}
maxConns = uint16(x)
default:
maxConns = 64
}
options := p2p.PeerManagerOptions{
MaxConnected: 64,
MaxConnected: maxConns,
MaxConnectedUpgrade: 4,
MaxPeers: 1000,
MinRetryTime: 100 * time.Millisecond,
@ -1138,7 +1161,7 @@ func NewNode(config *cfg.Config,
stateSyncReactorShim = p2p.NewReactorShim(logger.With("module", "statesync"), "StateSyncShim", statesync.ChannelShims)
if config.P2P.UseNewP2P {
if config.P2P.DisableLegacy {
channels = makeChannelsFromShims(router, statesync.ChannelShims)
peerUpdates = peerManager.Subscribe()
} else {
@ -1305,9 +1328,9 @@ func (n *Node) OnStart() error {
n.isListening = true
n.Logger.Info("p2p service", "legacy_enabled", !n.config.P2P.UseNewP2P)
n.Logger.Info("p2p service", "legacy_enabled", !n.config.P2P.DisableLegacy)
if n.config.P2P.UseNewP2P {
if n.config.P2P.DisableLegacy {
err = n.router.Start()
} else {
err = n.sw.Start()
@ -1345,7 +1368,7 @@ func (n *Node) OnStart() error {
}
}
if n.config.P2P.UseNewP2P && n.pexReactorV2 != nil {
if n.config.P2P.DisableLegacy && n.pexReactorV2 != nil {
if err := n.pexReactorV2.Start(); err != nil {
return err
}
@ -1388,7 +1411,6 @@ func (n *Node) OnStop() {
}
if n.config.Mode != cfg.ModeSeed {
// now stop the reactors
if n.config.FastSync.Version == "v0" {
// Stop the real blockchain reactor separately since the switch uses the shim.
@ -1418,13 +1440,13 @@ func (n *Node) OnStop() {
}
}
if n.config.P2P.UseNewP2P && n.pexReactorV2 != nil {
if n.config.P2P.DisableLegacy && n.pexReactorV2 != nil {
if err := n.pexReactorV2.Stop(); err != nil {
n.Logger.Error("failed to stop the PEX v2 reactor", "err", err)
}
}
if n.config.P2P.UseNewP2P {
if n.config.P2P.DisableLegacy {
if err := n.router.Stop(); err != nil {
n.Logger.Error("failed to stop router", "err", err)
}
@ -1960,7 +1982,7 @@ func getRouterConfig(conf *cfg.Config, proxyApp proxy.AppConns) p2p.RouterOption
}
if conf.P2P.MaxNumInboundPeers > 0 {
opts.MaxIncommingConnectionsPerIP = uint(conf.P2P.MaxNumInboundPeers)
opts.MaxIncomingConnectionAttempts = conf.P2P.MaxIncomingConnectionAttempts
}
if conf.FilterPeers && proxyApp != nil {


+ 14
- 16
p2p/router.go View File

@ -128,13 +128,13 @@ type RouterOptions struct {
// no timeout.
HandshakeTimeout time.Duration
// QueueType must be "wdrr" (Weighed Deficit Round Robin),
// "priority", or FIFO. Defaults to FIFO.
// QueueType must be "wdrr" (Weighed Deficit Round Robin), "priority", or
// "fifo". Defaults to "fifo".
QueueType string
// MaxIncommingConnectionsPerIP limits the number of incoming
// connections per IP address. Defaults to 100.
MaxIncommingConnectionsPerIP uint
// MaxIncomingConnectionAttempts rate limits the number of incoming connection
// attempts per IP address. Defaults to 100.
MaxIncomingConnectionAttempts uint
// IncomingConnectionWindow describes how often an IP address
// can attempt to create a new connection. Defaults to 10
@ -182,8 +182,8 @@ func (o *RouterOptions) Validate() error {
o.IncomingConnectionWindow)
}
if o.MaxIncommingConnectionsPerIP == 0 {
o.MaxIncommingConnectionsPerIP = 100
if o.MaxIncomingConnectionAttempts == 0 {
o.MaxIncomingConnectionAttempts = 100
}
return nil
@ -278,8 +278,9 @@ func NewRouter(
nodeInfo: nodeInfo,
privKey: privKey,
connTracker: newConnTracker(
options.MaxIncommingConnectionsPerIP,
options.IncomingConnectionWindow),
options.MaxIncomingConnectionAttempts,
options.IncomingConnectionWindow,
),
chDescs: make([]ChannelDescriptor, 0),
transports: transports,
protocolTransports: map[Protocol]Transport{},
@ -526,7 +527,8 @@ func (r *Router) acceptPeers(transport Transport) {
r.logger.Debug("rate limiting incoming peer",
"err", err,
"ip", incomingIP.String(),
"closeErr", closeErr)
"close_err", closeErr,
)
return
}
@ -545,9 +547,7 @@ func (r *Router) openConnection(ctx context.Context, conn Connection) {
incomingIP := re.IP
if err := r.filterPeersIP(ctx, incomingIP, re.Port); err != nil {
r.logger.Debug("peer filtered by IP",
"ip", incomingIP.String(),
"err", err)
r.logger.Debug("peer filtered by IP", "ip", incomingIP.String(), "err", err)
return
}
@ -576,9 +576,7 @@ func (r *Router) openConnection(ctx context.Context, conn Connection) {
}
if err := r.filterPeersID(ctx, peerInfo.NodeID); err != nil {
r.logger.Debug("peer filtered by node ID",
"node", peerInfo.NodeID,
"err", err)
r.logger.Debug("peer filtered by node ID", "node", peerInfo.NodeID, "err", err)
return
}


+ 11
- 8
test/e2e/generator/generate.go View File

@ -78,9 +78,9 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
switch p2pInfo := opt["useNewP2P"].(type) {
case bool:
manifest.UseNewP2P = p2pInfo
manifest.DisableLegacyP2P = p2pInfo
case int:
manifest.UseNewP2P = false
manifest.DisableLegacyP2P = false
p2pNodeFactor = p2pInfo
}
@ -104,11 +104,13 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
for i := 1; i <= numSeeds; i++ {
node := generateNode(r, e2e.ModeSeed, 0, manifest.InitialHeight, false)
node.QueueType = manifest.QueueType
if p2pNodeFactor == 0 {
node.UseNewP2P = manifest.UseNewP2P
node.DisableLegacyP2P = manifest.DisableLegacyP2P
} else if p2pNodeFactor%i == 0 {
node.UseNewP2P = !manifest.UseNewP2P
node.DisableLegacyP2P = !manifest.DisableLegacyP2P
}
manifest.Nodes[fmt.Sprintf("seed%02d", i)] = node
}
@ -129,10 +131,11 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
node.QueueType = manifest.QueueType
if p2pNodeFactor == 0 {
node.UseNewP2P = manifest.UseNewP2P
node.DisableLegacyP2P = manifest.DisableLegacyP2P
} else if p2pNodeFactor%i == 0 {
node.UseNewP2P = !manifest.UseNewP2P
node.DisableLegacyP2P = !manifest.DisableLegacyP2P
}
manifest.Nodes[name] = node
if startAt == 0 {
@ -164,9 +167,9 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
node := generateNode(r, e2e.ModeFull, startAt, manifest.InitialHeight, false)
node.QueueType = manifest.QueueType
if p2pNodeFactor == 0 {
node.UseNewP2P = manifest.UseNewP2P
node.DisableLegacyP2P = manifest.DisableLegacyP2P
} else if p2pNodeFactor%i == 0 {
node.UseNewP2P = !manifest.UseNewP2P
node.DisableLegacyP2P = !manifest.DisableLegacyP2P
}
manifest.Nodes[fmt.Sprintf("full%02d", i)] = node
}


+ 25
- 25
test/e2e/networks/ci.toml View File

@ -1,10 +1,10 @@
# This testnet is run by CI, and attempts to cover a broad range of
# functionality with a single network.
initial_height = 1000
disable_legacy_p2p = false
evidence = 0
initial_state = { initial01 = "a", initial02 = "b", initial03 = "c" }
use_new_p2p = false
initial_height = 1000
initial_state = {initial01 = "a", initial02 = "b", initial03 = "c"}
queue_type = "priority"
[validators]
@ -29,71 +29,71 @@ validator05 = 50
[node.seed01]
mode = "seed"
seeds = ["seed02"]
perturb = ["restart"]
seeds = ["seed02"]
[node.seed02]
mode = "seed"
seeds = ["seed01"]
[node.validator01]
perturb = ["disconnect"]
seeds = ["seed01"]
snapshot_interval = 5
perturb = ["disconnect"]
[node.validator02]
seeds = ["seed02"]
database = "boltdb"
abci_protocol = "tcp"
privval_protocol = "tcp"
database = "boltdb"
persist_interval = 0
perturb = ["restart"]
privval_protocol = "tcp"
seeds = ["seed02"]
[node.validator03]
seeds = ["seed01"]
database = "badgerdb"
seeds = ["seed01"]
# FIXME: should be grpc, disabled due to https://github.com/tendermint/tendermint/issues/5439
#abci_protocol = "grpc"
privval_protocol = "grpc"
persist_interval = 3
retain_blocks = 3
perturb = ["kill"]
privval_protocol = "grpc"
retain_blocks = 3
[node.validator04]
persistent_peers = ["validator01"]
database = "rocksdb"
abci_protocol = "builtin"
database = "rocksdb"
persistent_peers = ["validator01"]
perturb = ["pause"]
[node.validator05]
database = "cleveldb"
fast_sync = "v0"
seeds = ["seed02"]
start_at = 1005 # Becomes part of the validator set at 1010
seeds = ["seed02"]
database = "cleveldb"
fast_sync = "v0"
# FIXME: should be grpc, disabled due to https://github.com/tendermint/tendermint/issues/5439
#abci_protocol = "grpc"
privval_protocol = "tcp"
perturb = ["kill", "pause", "disconnect", "restart"]
privval_protocol = "tcp"
[node.full01]
start_at = 1010
mode = "full"
start_at = 1010
# FIXME: should be v2, disabled due to flake
fast_sync = "v0"
persistent_peers = ["validator01", "validator02", "validator03", "validator04", "validator05"]
retain_blocks = 3
perturb = ["restart"]
retain_blocks = 3
[node.full02]
start_at = 1015
mode = "full"
start_at = 1015
# FIXME: should be v2, disabled due to flake
fast_sync = "v0"
state_sync = true
seeds = ["seed01"]
perturb = ["restart"]
seeds = ["seed01"]
state_sync = true
[node.light01]
mode= "light"
start_at= 1010
persistent_peers = ["validator01", "validator02", "validator03"]
mode = "light"
persistent_peers = ["validator01", "validator02", "validator03"]
start_at = 1010

+ 3
- 4
test/e2e/pkg/manifest.go View File

@ -59,9 +59,8 @@ type Manifest struct {
// by individual nodes.
LogLevel string `toml:"log_level"`
// UseNewP2P enables use of the new p2p layer for all nodes in
// a test.
UseNewP2P bool `toml:"use_new_p2p"`
// DisableLegacyP2P enables use of the new p2p layer for all nodes in a test.
DisableLegacyP2P bool `toml:"disable_legacy_p2p"`
// QueueType describes the type of queue that the system uses internally
QueueType string `toml:"queue_type"`
@ -143,7 +142,7 @@ type ManifestNode struct {
LogLevel string `toml:"log_level"`
// UseNewP2P enables use of the new p2p layer for this node.
UseNewP2P bool `toml:"use_new_p2p"`
DisableLegacyP2P bool `toml:"disable_legacy_p2p"`
// QueueType describes the type of queue that the p2p layer
// uses internally.


+ 3
- 2
test/e2e/pkg/testnet.go View File

@ -90,7 +90,7 @@ type Node struct {
PersistentPeers []*Node
Perturbations []Perturbation
LogLevel string
UseNewP2P bool
DisableLegacyP2P bool
QueueType string
}
@ -169,9 +169,10 @@ func LoadTestnet(file string) (*Testnet, error) {
RetainBlocks: nodeManifest.RetainBlocks,
Perturbations: []Perturbation{},
LogLevel: manifest.LogLevel,
UseNewP2P: manifest.UseNewP2P,
DisableLegacyP2P: manifest.DisableLegacyP2P,
QueueType: manifest.QueueType,
}
if node.StartAt == testnet.InitialHeight {
node.StartAt = 0 // normalize to 0 for initial nodes, since code expects this
}


+ 1
- 1
test/e2e/runner/setup.go View File

@ -239,7 +239,7 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
cfg.RPC.PprofListenAddress = ":6060"
cfg.P2P.ExternalAddress = fmt.Sprintf("tcp://%v", node.AddressP2P(false))
cfg.P2P.AddrBookStrict = false
cfg.P2P.UseNewP2P = node.UseNewP2P
cfg.P2P.DisableLegacy = node.DisableLegacyP2P
cfg.P2P.QueueType = node.QueueType
cfg.DBBackend = node.Database
cfg.StateSync.DiscoveryTime = 5 * time.Second


Loading…
Cancel
Save