From 6e921f6644159eac604bae6032a76da7ce61aa60 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Wed, 25 Aug 2021 13:33:38 -0400 Subject: [PATCH] p2p: change default to use new stack (#6862) This is just a configuration change to default to using the new stack unless explicitly disabled (e.g. `UseLegacy`) this renames the configuration value and makes the configuration logic more clear. The legacy option is good to retain as a fallback if the new stack has issues operationally, but we should make sure that most of the time we're using the new stack. --- config/config.go | 10 +++-- config/toml.go | 2 +- node/node.go | 74 +++++++++++++++++++--------------- node/node_test.go | 3 +- node/setup.go | 32 +++++++-------- test/e2e/generator/generate.go | 18 ++++----- test/e2e/pkg/manifest.go | 8 ++-- test/e2e/pkg/testnet.go | 4 +- test/e2e/runner/setup.go | 24 +++++------ 9 files changed, 92 insertions(+), 83 deletions(-) diff --git a/config/config.go b/config/config.go index 7d19616aa..7e8dd5976 100644 --- a/config/config.go +++ b/config/config.go @@ -694,13 +694,14 @@ type P2PConfig struct { //nolint: maligned // Force dial to fail TestDialFail bool `mapstructure:"test-dial-fail"` - // DisableLegacy is used mostly for testing to enable or disable the legacy - // P2P stack. - DisableLegacy bool `mapstructure:"disable-legacy"` + // UseLegacy enables the "legacy" P2P implementation and + // disables the newer default implementation. This flag will + // be removed in a future release. + UseLegacy bool `mapstructure:"use-legacy"` // Makes it possible to configure which queue backend the p2p // layer uses. Options are: "fifo", "priority" and "wdrr", - // with the default being "fifo". + // with the default being "priority". QueueType string `mapstructure:"queue-type"` } @@ -732,6 +733,7 @@ func DefaultP2PConfig() *P2PConfig { DialTimeout: 3 * time.Second, TestDialFail: false, QueueType: "priority", + UseLegacy: false, } } diff --git a/config/toml.go b/config/toml.go index edb192109..76058802c 100644 --- a/config/toml.go +++ b/config/toml.go @@ -271,7 +271,7 @@ pprof-laddr = "{{ .RPC.PprofListenAddress }}" [p2p] # Enable the new p2p layer. -disable-legacy = {{ .P2P.DisableLegacy }} +use-legacy = {{ .P2P.UseLegacy }} # Select the p2p internal queue queue-type = "{{ .P2P.QueueType }}" diff --git a/node/node.go b/node/node.go index 9cb5315bc..0dedd2861 100644 --- a/node/node.go +++ b/node/node.go @@ -319,12 +319,12 @@ func makeNode(config *cfg.Config, stateSyncReactorShim = p2p.NewReactorShim(logger.With("module", "statesync"), "StateSyncShim", statesync.ChannelShims) - if config.P2P.DisableLegacy { - channels = makeChannelsFromShims(router, statesync.ChannelShims) - peerUpdates = peerManager.Subscribe() - } else { + if config.P2P.UseLegacy { channels = getChannelsFromShim(stateSyncReactorShim) peerUpdates = stateSyncReactorShim.PeerUpdates + } else { + channels = makeChannelsFromShims(router, statesync.ChannelShims) + peerUpdates = peerManager.Subscribe() } stateSyncReactor = statesync.NewReactor( @@ -373,13 +373,7 @@ func makeNode(config *cfg.Config, pexCh := pex.ChannelDescriptor() transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh}) - if config.P2P.DisableLegacy { - addrBook = nil - pexReactor, err = createPEXReactorV2(config, logger, peerManager, router) - if err != nil { - return nil, err - } - } else { + if config.P2P.UseLegacy { // setup Transport and Switch sw = createSwitch( config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch, @@ -402,6 +396,12 @@ func makeNode(config *cfg.Config, } pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger) + } else { + addrBook = nil + pexReactor, err = createPEXReactorV2(config, logger, peerManager, router) + if err != nil { + return nil, err + } } if config.RPC.PprofListenAddress != "" { @@ -461,6 +461,17 @@ func makeNode(config *cfg.Config, }, } + // this is a terrible, because typed nil interfaces are not == + // nil, so this is just cleanup to avoid having a non-nil + // value in the RPC environment that has the semantic + // properties of nil. + if sw == nil { + node.rpcEnv.P2PPeers = nil + } else if peerManager == nil { + node.rpcEnv.PeerManager = nil + } + // end hack + node.rpcEnv.P2PTransport = node node.BaseService = *service.NewBaseService(logger, "Node", node) @@ -519,12 +530,8 @@ func makeSeedNode(config *cfg.Config, // p2p stack is removed. pexCh := pex.ChannelDescriptor() transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh}) - if config.P2P.DisableLegacy { - pexReactor, err = createPEXReactorV2(config, logger, peerManager, router) - if err != nil { - return nil, err - } - } else { + + if config.P2P.UseLegacy { sw = createSwitch( config, transport, p2pMetrics, nil, nil, nil, nil, nil, nil, nodeInfo, nodeKey, p2pLogger, @@ -546,6 +553,11 @@ func makeSeedNode(config *cfg.Config, } pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger) + } else { + pexReactor, err = createPEXReactorV2(config, logger, peerManager, router) + if err != nil { + return nil, err + } } if config.RPC.PprofListenAddress != "" { @@ -608,18 +620,16 @@ func (n *nodeImpl) OnStart() error { } n.isListening = true - n.Logger.Info("p2p service", "legacy_enabled", !n.config.P2P.DisableLegacy) + n.Logger.Info("p2p service", "legacy_enabled", n.config.P2P.UseLegacy) - if n.config.P2P.DisableLegacy { - if err = n.router.Start(); err != nil { - return err - } - } else { + if n.config.P2P.UseLegacy { // Add private IDs to addrbook to block those peers being added n.addrBook.AddPrivateIDs(strings.SplitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " ")) if err = n.sw.Start(); err != nil { return err } + } else if err = n.router.Start(); err != nil { + return err } if n.config.Mode != cfg.ModeSeed { @@ -650,16 +660,14 @@ func (n *nodeImpl) OnStart() error { } } - if n.config.P2P.DisableLegacy { - if err := n.pexReactor.Start(); err != nil { - return err - } - } else { + if n.config.P2P.UseLegacy { // Always connect to persistent peers err = n.sw.DialPeersAsync(strings.SplitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " ")) if err != nil { return fmt.Errorf("could not dial peers from persistent-peers field: %w", err) } + } else if err := n.pexReactor.Start(); err != nil { + return err } // Run state sync @@ -738,14 +746,14 @@ func (n *nodeImpl) OnStop() { n.Logger.Error("failed to stop the PEX v2 reactor", "err", err) } - if n.config.P2P.DisableLegacy { - if err := n.router.Stop(); err != nil { - n.Logger.Error("failed to stop router", "err", err) - } - } else { + if n.config.P2P.UseLegacy { if err := n.sw.Stop(); err != nil { n.Logger.Error("failed to stop switch", "err", err) } + } else { + if err := n.router.Stop(); err != nil { + n.Logger.Error("failed to stop router", "err", err) + } } if err := n.transport.Close(); err != nil { diff --git a/node/node_test.go b/node/node_test.go index 64b28c0bb..885bddcfc 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -43,6 +43,7 @@ import ( func TestNodeStartStop(t *testing.T) { config := cfg.ResetTestRoot("node_node_test") + defer os.RemoveAll(config.RootDir) // create & start node @@ -53,8 +54,6 @@ func TestNodeStartStop(t *testing.T) { n, ok := ns.(*nodeImpl) require.True(t, ok) - t.Logf("Started node %v", n.sw.NodeInfo()) - // wait for the node to produce a block blocksSub, err := n.EventBus().Subscribe(context.Background(), "node_test", types.EventQueryNewBlock) require.NoError(t, err) diff --git a/node/setup.go b/node/setup.go index 6d2a7523b..00f8051f0 100644 --- a/node/setup.go +++ b/node/setup.go @@ -166,12 +166,12 @@ func createMempoolReactor( peerUpdates *p2p.PeerUpdates ) - if config.P2P.DisableLegacy { - channels = makeChannelsFromShims(router, channelShims) - peerUpdates = peerManager.Subscribe() - } else { + if config.P2P.UseLegacy { channels = getChannelsFromShim(reactorShim) peerUpdates = reactorShim.PeerUpdates + } else { + channels = makeChannelsFromShims(router, channelShims) + peerUpdates = peerManager.Subscribe() } switch config.Mempool.Version { @@ -260,12 +260,12 @@ func createEvidenceReactor( peerUpdates *p2p.PeerUpdates ) - if config.P2P.DisableLegacy { - channels = makeChannelsFromShims(router, evidence.ChannelShims) - peerUpdates = peerManager.Subscribe() - } else { + if config.P2P.UseLegacy { channels = getChannelsFromShim(reactorShim) peerUpdates = reactorShim.PeerUpdates + } else { + channels = makeChannelsFromShims(router, evidence.ChannelShims) + peerUpdates = peerManager.Subscribe() } evidenceReactor := evidence.NewReactor( @@ -302,12 +302,12 @@ func createBlockchainReactor( peerUpdates *p2p.PeerUpdates ) - if config.P2P.DisableLegacy { - channels = makeChannelsFromShims(router, bcv0.ChannelShims) - peerUpdates = peerManager.Subscribe() - } else { + if config.P2P.UseLegacy { channels = getChannelsFromShim(reactorShim) peerUpdates = reactorShim.PeerUpdates + } else { + channels = makeChannelsFromShims(router, bcv0.ChannelShims) + peerUpdates = peerManager.Subscribe() } reactor, err := bcv0.NewReactor( @@ -366,12 +366,12 @@ func createConsensusReactor( peerUpdates *p2p.PeerUpdates ) - if config.P2P.DisableLegacy { - channels = makeChannelsFromShims(router, cs.ChannelShims) - peerUpdates = peerManager.Subscribe() - } else { + if config.P2P.UseLegacy { channels = getChannelsFromShim(reactorShim) peerUpdates = reactorShim.PeerUpdates + } else { + channels = makeChannelsFromShims(router, cs.ChannelShims) + peerUpdates = peerManager.Subscribe() } reactor := cs.NewReactor( diff --git a/test/e2e/generator/generate.go b/test/e2e/generator/generate.go index f699b1162..12997eb81 100644 --- a/test/e2e/generator/generate.go +++ b/test/e2e/generator/generate.go @@ -107,11 +107,11 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er switch opt["p2p"].(P2PMode) { case NewP2PMode: - manifest.DisableLegacyP2P = true + manifest.UseLegacyP2P = true case LegacyP2PMode: - manifest.DisableLegacyP2P = false + manifest.UseLegacyP2P = false case HybridP2PMode: - manifest.DisableLegacyP2P = false + manifest.UseLegacyP2P = true p2pNodeFactor = 2 default: return manifest, fmt.Errorf("unknown p2p mode %s", opt["p2p"]) @@ -138,9 +138,9 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er node := generateNode(r, e2e.ModeSeed, 0, manifest.InitialHeight, false) if p2pNodeFactor == 0 { - node.DisableLegacyP2P = manifest.DisableLegacyP2P + node.UseLegacyP2P = manifest.UseLegacyP2P } else if p2pNodeFactor%i == 0 { - node.DisableLegacyP2P = !manifest.DisableLegacyP2P + node.UseLegacyP2P = !manifest.UseLegacyP2P } manifest.Nodes[fmt.Sprintf("seed%02d", i)] = node @@ -162,9 +162,9 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er r, e2e.ModeValidator, startAt, manifest.InitialHeight, i <= 2) if p2pNodeFactor == 0 { - node.DisableLegacyP2P = manifest.DisableLegacyP2P + node.UseLegacyP2P = manifest.UseLegacyP2P } else if p2pNodeFactor%i == 0 { - node.DisableLegacyP2P = !manifest.DisableLegacyP2P + node.UseLegacyP2P = !manifest.UseLegacyP2P } manifest.Nodes[name] = node @@ -198,9 +198,9 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er node := generateNode(r, e2e.ModeFull, startAt, manifest.InitialHeight, false) if p2pNodeFactor == 0 { - node.DisableLegacyP2P = manifest.DisableLegacyP2P + node.UseLegacyP2P = manifest.UseLegacyP2P } else if p2pNodeFactor%i == 0 { - node.DisableLegacyP2P = !manifest.DisableLegacyP2P + node.UseLegacyP2P = !manifest.UseLegacyP2P } manifest.Nodes[fmt.Sprintf("full%02d", i)] = node } diff --git a/test/e2e/pkg/manifest.go b/test/e2e/pkg/manifest.go index 5711be37d..1b0fc8753 100644 --- a/test/e2e/pkg/manifest.go +++ b/test/e2e/pkg/manifest.go @@ -59,8 +59,8 @@ type Manifest struct { // by individual nodes. LogLevel string `toml:"log_level"` - // DisableLegacyP2P enables use of the new p2p layer for all nodes in a test. - DisableLegacyP2P bool `toml:"disable_legacy_p2p"` + // UseLegacyP2P uses the legacy p2p layer for all nodes in a test. + UseLegacyP2P bool `toml:"use_legacy_p2p"` // QueueType describes the type of queue that the system uses internally QueueType string `toml:"queue_type"` @@ -147,8 +147,8 @@ type ManifestNode struct { // level. LogLevel string `toml:"log_level"` - // UseNewP2P enables use of the new p2p layer for this node. - DisableLegacyP2P bool `toml:"disable_legacy_p2p"` + // UseLegacyP2P enables use of the legacy p2p layer for this node. + UseLegacyP2P bool `toml:"use_legacy_p2p"` } // Save saves the testnet manifest to a file. diff --git a/test/e2e/pkg/testnet.go b/test/e2e/pkg/testnet.go index cec58bd20..e51fa859e 100644 --- a/test/e2e/pkg/testnet.go +++ b/test/e2e/pkg/testnet.go @@ -92,7 +92,7 @@ type Node struct { PersistentPeers []*Node Perturbations []Perturbation LogLevel string - DisableLegacyP2P bool + UseLegacyP2P bool QueueType string } @@ -177,7 +177,7 @@ func LoadTestnet(file string) (*Testnet, error) { Perturbations: []Perturbation{}, LogLevel: manifest.LogLevel, QueueType: manifest.QueueType, - DisableLegacyP2P: manifest.DisableLegacyP2P || nodeManifest.DisableLegacyP2P, + UseLegacyP2P: manifest.UseLegacyP2P && nodeManifest.UseLegacyP2P, } if node.StartAt == testnet.InitialHeight { diff --git a/test/e2e/runner/setup.go b/test/e2e/runner/setup.go index c968ef306..a0bd4996a 100644 --- a/test/e2e/runner/setup.go +++ b/test/e2e/runner/setup.go @@ -238,7 +238,7 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) { cfg.RPC.PprofListenAddress = ":6060" cfg.P2P.ExternalAddress = fmt.Sprintf("tcp://%v", node.AddressP2P(false)) cfg.P2P.AddrBookStrict = false - cfg.P2P.DisableLegacy = node.DisableLegacyP2P + cfg.P2P.UseLegacy = node.UseLegacyP2P cfg.P2P.QueueType = node.QueueType cfg.DBBackend = node.Database cfg.StateSync.DiscoveryTime = 5 * time.Second @@ -342,17 +342,17 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) { // MakeAppConfig generates an ABCI application config for a node. func MakeAppConfig(node *e2e.Node) ([]byte, error) { cfg := map[string]interface{}{ - "chain_id": node.Testnet.Name, - "dir": "data/app", - "listen": AppAddressUNIX, - "mode": node.Mode, - "proxy_port": node.ProxyPort, - "protocol": "socket", - "persist_interval": node.PersistInterval, - "snapshot_interval": node.SnapshotInterval, - "retain_blocks": node.RetainBlocks, - "key_type": node.PrivvalKey.Type(), - "disable_legacy_p2p": node.DisableLegacyP2P, + "chain_id": node.Testnet.Name, + "dir": "data/app", + "listen": AppAddressUNIX, + "mode": node.Mode, + "proxy_port": node.ProxyPort, + "protocol": "socket", + "persist_interval": node.PersistInterval, + "snapshot_interval": node.SnapshotInterval, + "retain_blocks": node.RetainBlocks, + "key_type": node.PrivvalKey.Type(), + "use_legacy_p2p": node.UseLegacyP2P, } switch node.ABCIProtocol { case e2e.ProtocolUNIX: