diff --git a/config/config.go b/config/config.go index 79be27d11..0679ce82f 100644 --- a/config/config.go +++ b/config/config.go @@ -607,6 +607,15 @@ type P2PConfig struct { //nolint: maligned // Testing params. // Force dial to fail TestDialFail bool `mapstructure:"test-dial-fail"` + + // Mostly for testing, use rather than environment variables + // to turn on the new P2P stack. + UseNewP2P bool `mapstructure:"use-new-p2p"` + + // Makes it possible to configure which queue backend the p2p + // layer uses. Options are: "fifo", "priority" and "wdrr", + // with the default being "fifo". + QueueType string `mapstructure:"queue-type"` } // DefaultP2PConfig returns a default configuration for the peer-to-peer layer @@ -634,6 +643,7 @@ func DefaultP2PConfig() *P2PConfig { HandshakeTimeout: 20 * time.Second, DialTimeout: 3 * time.Second, TestDialFail: false, + QueueType: "priority", } } diff --git a/config/toml.go b/config/toml.go index 84d26935d..4e8e0d0ea 100644 --- a/config/toml.go +++ b/config/toml.go @@ -168,7 +168,6 @@ abci = "{{ .BaseConfig.ABCI }}" # so the app can decide if we should keep the connection or not filter-peers = {{ .BaseConfig.FilterPeers }} - ####################################################################### ### Advanced Configuration Options ### ####################################################################### @@ -262,6 +261,12 @@ pprof-laddr = "{{ .RPC.PprofListenAddress }}" ####################################################### [p2p] +# Enable the new p2p layer. +use-new-p2p = {{ .P2P.UseNewP2P }} + +# Select the p2p internal queue +queue-type = "{{ .P2P.QueueType }}" + # Address to listen for incoming connections laddr = "{{ .P2P.ListenAddress }}" diff --git a/node/node.go b/node/node.go index 477b4ca9a..6ba479537 100644 --- a/node/node.go +++ b/node/node.go @@ -8,7 +8,6 @@ import ( "net" "net/http" _ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port - "os" "strconv" "time" @@ -54,21 +53,6 @@ import ( "github.com/tendermint/tendermint/version" ) -var ( - useLegacyP2P = true - p2pRouterQueueType string -) - -func init() { - if v := os.Getenv("TM_LEGACY_P2P"); len(v) > 0 { - useLegacyP2P, _ = strconv.ParseBool(v) - } - - if v := os.Getenv("TM_P2P_QUEUE"); len(v) > 0 { - p2pRouterQueueType = v - } -} - // DBContext specifies config information for loading a new DB. type DBContext struct { ID string @@ -403,12 +387,12 @@ func createMempoolReactor( peerUpdates *p2p.PeerUpdates ) - if useLegacyP2P { - channels = getChannelsFromShim(reactorShim) - peerUpdates = reactorShim.PeerUpdates - } else { + if config.P2P.UseNewP2P { channels = makeChannelsFromShims(router, channelShims) peerUpdates = peerManager.Subscribe() + } else { + channels = getChannelsFromShim(reactorShim) + peerUpdates = reactorShim.PeerUpdates } reactor := mempl.NewReactor( @@ -454,12 +438,12 @@ func createEvidenceReactor( peerUpdates *p2p.PeerUpdates ) - if useLegacyP2P { - channels = getChannelsFromShim(reactorShim) - peerUpdates = reactorShim.PeerUpdates - } else { + if config.P2P.UseNewP2P { channels = makeChannelsFromShims(router, evidence.ChannelShims) peerUpdates = peerManager.Subscribe() + } else { + channels = getChannelsFromShim(reactorShim) + peerUpdates = reactorShim.PeerUpdates } evidenceReactor := evidence.NewReactor( @@ -495,12 +479,12 @@ func createBlockchainReactor( peerUpdates *p2p.PeerUpdates ) - if useLegacyP2P { - channels = getChannelsFromShim(reactorShim) - peerUpdates = reactorShim.PeerUpdates - } else { + if config.P2P.UseNewP2P { channels = makeChannelsFromShims(router, bcv0.ChannelShims) peerUpdates = peerManager.Subscribe() + } else { + channels = getChannelsFromShim(reactorShim) + peerUpdates = reactorShim.PeerUpdates } reactor, err := bcv0.NewReactor( @@ -561,12 +545,12 @@ func createConsensusReactor( peerUpdates *p2p.PeerUpdates ) - if useLegacyP2P { - channels = getChannelsFromShim(reactorShim) - peerUpdates = reactorShim.PeerUpdates - } else { + if config.P2P.UseNewP2P { channels = makeChannelsFromShims(router, cs.ChannelShims) peerUpdates = peerManager.Subscribe() + } else { + channels = getChannelsFromShim(reactorShim) + peerUpdates = reactorShim.PeerUpdates } reactor := cs.NewReactor( @@ -1154,12 +1138,12 @@ func NewNode(config *cfg.Config, stateSyncReactorShim = p2p.NewReactorShim(logger.With("module", "statesync"), "StateSyncShim", statesync.ChannelShims) - if useLegacyP2P { - channels = getChannelsFromShim(stateSyncReactorShim) - peerUpdates = stateSyncReactorShim.PeerUpdates - } else { + if config.P2P.UseNewP2P { channels = makeChannelsFromShims(router, statesync.ChannelShims) peerUpdates = peerManager.Subscribe() + } else { + channels = getChannelsFromShim(stateSyncReactorShim) + peerUpdates = stateSyncReactorShim.PeerUpdates } stateSyncReactor = statesync.NewReactor( @@ -1321,12 +1305,12 @@ func (n *Node) OnStart() error { n.isListening = true - n.Logger.Info("p2p service", "legacy_enabled", useLegacyP2P) + n.Logger.Info("p2p service", "legacy_enabled", !n.config.P2P.UseNewP2P) - if useLegacyP2P { - err = n.sw.Start() - } else { + if n.config.P2P.UseNewP2P { err = n.router.Start() + } else { + err = n.sw.Start() } if err != nil { return err @@ -1361,7 +1345,7 @@ func (n *Node) OnStart() error { } } - if !useLegacyP2P && n.pexReactorV2 != nil { + if n.config.P2P.UseNewP2P && n.pexReactorV2 != nil { if err := n.pexReactorV2.Start(); err != nil { return err } @@ -1434,20 +1418,20 @@ func (n *Node) OnStop() { } } - if !useLegacyP2P && n.pexReactorV2 != nil { + if n.config.P2P.UseNewP2P && n.pexReactorV2 != nil { if err := n.pexReactorV2.Stop(); err != nil { n.Logger.Error("failed to stop the PEX v2 reactor", "err", err) } } - if useLegacyP2P { - if err := n.sw.Stop(); err != nil { - n.Logger.Error("failed to stop switch", "err", err) - } - } else { + if n.config.P2P.UseNewP2P { if err := n.router.Stop(); err != nil { n.Logger.Error("failed to stop router", "err", err) } + } else { + if err := n.sw.Stop(); err != nil { + n.Logger.Error("failed to stop switch", "err", err) + } } // stop mempool WAL @@ -1967,7 +1951,7 @@ func createAndStartPrivValidatorGRPCClient( func getRouterConfig(conf *cfg.Config, proxyApp proxy.AppConns) p2p.RouterOptions { opts := p2p.RouterOptions{ - QueueType: p2pRouterQueueType, + QueueType: conf.P2P.QueueType, } if conf.P2P.MaxNumInboundPeers > 0 { diff --git a/p2p/peermanager.go b/p2p/peermanager.go index bbb1e31f9..a230cc797 100644 --- a/p2p/peermanager.go +++ b/p2p/peermanager.go @@ -257,11 +257,6 @@ func (o *PeerManagerOptions) optimize() { // lower-scored to evict. // - EvictNext: pick peer from evict, mark as evicting. // - Disconnected: unmark connected, upgrading[from]=to, evict, evicting. -// -// FIXME: The old stack supports ABCI-based peer ID filtering via -// /p2p/filter/id/ queries, we should implement this here as well by taking -// a peer ID filtering callback in PeerManagerOptions and configuring it during -// Node setup. type PeerManager struct { selfID NodeID options PeerManagerOptions diff --git a/p2p/pqueue.go b/p2p/pqueue.go index 0e081241d..ebb0ed92c 100644 --- a/p2p/pqueue.go +++ b/p2p/pqueue.go @@ -254,7 +254,9 @@ func (s *pqScheduler) process() { s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size } - s.metrics.PeerSendBytesTotal.With("peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size)) + s.metrics.PeerSendBytesTotal.With( + "chID", chIDStr, + "peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size)) s.dequeueCh <- pqEnv.envelope } diff --git a/p2p/router.go b/p2p/router.go index 5c55164f3..ffab0d502 100644 --- a/p2p/router.go +++ b/p2p/router.go @@ -855,7 +855,9 @@ func (r *Router) receivePeer(peerID NodeID, conn Connection) error { select { case queue.enqueue() <- Envelope{From: peerID, Message: msg}: - r.metrics.PeerReceiveBytesTotal.With("peer_id", string(peerID)).Add(float64(proto.Size(msg))) + r.metrics.PeerReceiveBytesTotal.With( + "chID", fmt.Sprint(chID), + "peer_id", string(peerID)).Add(float64(proto.Size(msg))) r.metrics.RouterChannelQueueSend.Observe(time.Since(start).Seconds()) r.logger.Debug("received message", "peer", peerID, "message", msg) diff --git a/p2p/wdrr_queue.go b/p2p/wdrr_queue.go index 80bd7ec08..9d49e6404 100644 --- a/p2p/wdrr_queue.go +++ b/p2p/wdrr_queue.go @@ -1,6 +1,7 @@ package p2p import ( + "fmt" "sort" "strconv" @@ -263,7 +264,9 @@ func (s *wdrrScheduler) process() { // 4. remove from the flow's queue // 5. grab the next HoQ Envelope and flow's deficit for len(s.buffer[chID]) > 0 && d >= we.size { - s.metrics.PeerSendBytesTotal.With("peer_id", string(we.envelope.To)).Add(float64(we.size)) + s.metrics.PeerSendBytesTotal.With( + "chID", fmt.Sprint(chID), + "peer_id", string(we.envelope.To)).Add(float64(we.size)) s.dequeueCh <- we.envelope s.size -= we.size s.deficits[chID] -= we.size diff --git a/test/e2e/docker/Dockerfile b/test/e2e/docker/Dockerfile index 8c76f1d5e..d32e70804 100644 --- a/test/e2e/docker/Dockerfile +++ b/test/e2e/docker/Dockerfile @@ -26,8 +26,6 @@ RUN cd test/e2e && make app && cp build/app /usr/bin/app WORKDIR /tendermint VOLUME /tendermint ENV TMHOME=/tendermint -ENV TM_LEGACY_P2P=true -ENV TM_P2P_QUEUE="priority" EXPOSE 26656 26657 26660 6060 ENTRYPOINT ["/usr/bin/entrypoint"] diff --git a/test/e2e/generator/generate.go b/test/e2e/generator/generate.go index f71e71856..2a1b905bb 100644 --- a/test/e2e/generator/generate.go +++ b/test/e2e/generator/generate.go @@ -16,6 +16,8 @@ var ( testnetCombinations = map[string][]interface{}{ "topology": {"single", "quad", "large"}, "ipv6": {false, true}, + "useNewP2P": {false, true, 2}, + "queueType": {"priority"}, // "fifo", "wdrr" "initialHeight": {0, 1000}, "initialState": { map[string]string{}, @@ -69,6 +71,17 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er Nodes: map[string]*e2e.ManifestNode{}, KeyType: opt["keyType"].(string), Evidence: evidence.Choose(r).(int), + QueueType: opt["queueType"].(string), + } + + var p2pNodeFactor int + + switch p2pInfo := opt["useNewP2P"].(type) { + case bool: + manifest.UseNewP2P = p2pInfo + case int: + manifest.UseNewP2P = false + p2pNodeFactor = p2pInfo } var numSeeds, numValidators, numFulls, numLightClients int @@ -89,8 +102,14 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er // First we generate seed nodes, starting at the initial height. for i := 1; i <= numSeeds; i++ { - manifest.Nodes[fmt.Sprintf("seed%02d", i)] = generateNode( - r, e2e.ModeSeed, 0, manifest.InitialHeight, false) + node := generateNode(r, e2e.ModeSeed, 0, manifest.InitialHeight, false) + node.QueueType = manifest.QueueType + if p2pNodeFactor == 0 { + node.UseNewP2P = manifest.UseNewP2P + } else if p2pNodeFactor%i == 0 { + node.UseNewP2P = !manifest.UseNewP2P + } + manifest.Nodes[fmt.Sprintf("seed%02d", i)] = node } // Next, we generate validators. We make sure a BFT quorum of validators start @@ -105,9 +124,17 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er nextStartAt += 5 } name := fmt.Sprintf("validator%02d", i) - manifest.Nodes[name] = generateNode( + node := generateNode( r, e2e.ModeValidator, startAt, manifest.InitialHeight, i <= 2) + node.QueueType = manifest.QueueType + if p2pNodeFactor == 0 { + node.UseNewP2P = manifest.UseNewP2P + } else if p2pNodeFactor%i == 0 { + node.UseNewP2P = !manifest.UseNewP2P + } + manifest.Nodes[name] = node + if startAt == 0 { (*manifest.Validators)[name] = int64(30 + r.Intn(71)) } else { @@ -134,8 +161,14 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er startAt = nextStartAt nextStartAt += 5 } - manifest.Nodes[fmt.Sprintf("full%02d", i)] = generateNode( - r, e2e.ModeFull, startAt, manifest.InitialHeight, false) + node := generateNode(r, e2e.ModeFull, startAt, manifest.InitialHeight, false) + node.QueueType = manifest.QueueType + if p2pNodeFactor == 0 { + node.UseNewP2P = manifest.UseNewP2P + } else if p2pNodeFactor%i == 0 { + node.UseNewP2P = !manifest.UseNewP2P + } + manifest.Nodes[fmt.Sprintf("full%02d", i)] = node } // We now set up peer discovery for nodes. Seed nodes are fully meshed with diff --git a/test/e2e/networks/ci.toml b/test/e2e/networks/ci.toml index 05875dace..fe0b61f7d 100644 --- a/test/e2e/networks/ci.toml +++ b/test/e2e/networks/ci.toml @@ -4,6 +4,8 @@ initial_height = 1000 evidence = 0 initial_state = { initial01 = "a", initial02 = "b", initial03 = "c" } +use_new_p2p = false +queue_type = "priority" [validators] validator01 = 100 diff --git a/test/e2e/pkg/manifest.go b/test/e2e/pkg/manifest.go index 84628594f..27166b64e 100644 --- a/test/e2e/pkg/manifest.go +++ b/test/e2e/pkg/manifest.go @@ -58,6 +58,13 @@ type Manifest struct { // LogLevel sets the log level of the entire testnet. This can be overridden // by individual nodes. LogLevel string `toml:"log_level"` + + // UseNewP2P enables use of the new p2p layer for all nodes in + // a test. + UseNewP2P bool `toml:"use_new_p2p"` + + // QueueType describes the type of queue that the system uses internally + QueueType string `toml:"queue_type"` } // ManifestNode represents a node in a testnet manifest. @@ -134,6 +141,13 @@ type ManifestNode struct { // This is helpful when debugging a specific problem. This overrides the network // level. LogLevel string `toml:"log_level"` + + // UseNewP2P enables use of the new p2p layer for this node. + UseNewP2P bool `toml:"use_new_p2p"` + + // QueueType describes the type of queue that the p2p layer + // uses internally. + QueueType string `toml:"queue_type"` } // Save saves the testnet manifest to a file. diff --git a/test/e2e/pkg/testnet.go b/test/e2e/pkg/testnet.go index 1e3eede90..7e760e8e7 100644 --- a/test/e2e/pkg/testnet.go +++ b/test/e2e/pkg/testnet.go @@ -90,6 +90,8 @@ type Node struct { PersistentPeers []*Node Perturbations []Perturbation LogLevel string + UseNewP2P bool + QueueType string } // LoadTestnet loads a testnet from a manifest file, using the filename to @@ -167,6 +169,8 @@ func LoadTestnet(file string) (*Testnet, error) { RetainBlocks: nodeManifest.RetainBlocks, Perturbations: []Perturbation{}, LogLevel: manifest.LogLevel, + UseNewP2P: manifest.UseNewP2P, + QueueType: manifest.QueueType, } if node.StartAt == testnet.InitialHeight { node.StartAt = 0 // normalize to 0 for initial nodes, since code expects this diff --git a/test/e2e/runner/setup.go b/test/e2e/runner/setup.go index b2dbfe5a9..f4acd53f3 100644 --- a/test/e2e/runner/setup.go +++ b/test/e2e/runner/setup.go @@ -239,6 +239,8 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) { cfg.RPC.PprofListenAddress = ":6060" cfg.P2P.ExternalAddress = fmt.Sprintf("tcp://%v", node.AddressP2P(false)) cfg.P2P.AddrBookStrict = false + cfg.P2P.UseNewP2P = node.UseNewP2P + cfg.P2P.QueueType = node.QueueType cfg.DBBackend = node.Database cfg.StateSync.DiscoveryTime = 5 * time.Second if node.Mode != e2e.ModeLight {