diff --git a/node/node.go b/node/node.go index 6e4a53ab2..ed1bb20fe 100644 --- a/node/node.go +++ b/node/node.go @@ -8,6 +8,8 @@ import ( "net" "net/http" _ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port + "os" + "strconv" "time" "github.com/prometheus/client_golang/prometheus" @@ -34,6 +36,7 @@ import ( "github.com/tendermint/tendermint/p2p/pex" "github.com/tendermint/tendermint/privval" tmgrpc "github.com/tendermint/tendermint/privval/grpc" + protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p" "github.com/tendermint/tendermint/proxy" rpccore "github.com/tendermint/tendermint/rpc/core" grpccore "github.com/tendermint/tendermint/rpc/grpc" @@ -49,7 +52,13 @@ import ( "github.com/tendermint/tendermint/version" ) -//------------------------------------------------------------------------------ +var useLegacyP2P = true + +func init() { + if v := os.Getenv("TM_LEGACY_P2P"); len(v) > 0 { + useLegacyP2P, _ = strconv.ParseBool(v) + } +} // DBContext specifies config information for loading a new DB. type DBContext struct { @@ -182,7 +191,9 @@ type Node struct { // network transport *p2p.MConnTransport - sw *p2p.Switch // p2p connections + sw *p2p.Switch // p2p connections + peerManager *p2p.PeerManager + router *p2p.Router addrBook pex.AddrBook // known peers nodeInfo p2p.NodeInfo nodeKey p2p.NodeKey // our node privkey @@ -202,6 +213,7 @@ type Node struct { consensusState *cs.State // latest consensus state consensusReactor *cs.Reactor // for participating in the consensus pexReactor *pex.Reactor // for exchanging peer addresses + pexReactorV2 *pex.ReactorV2 // for exchanging peer addresses evidenceReactor *evidence.Reactor evidencePool *evidence.Pool // tracking evidence proxyApp proxy.AppConns // connection to the application @@ -324,7 +336,8 @@ func createMempoolReactor( proxyApp proxy.AppConns, state sm.State, memplMetrics *mempl.Metrics, - peerMgr *p2p.PeerManager, + peerManager *p2p.PeerManager, + router *p2p.Router, logger log.Logger, ) (*p2p.ReactorShim, *mempl.Reactor, *mempl.CListMempool) { @@ -340,14 +353,29 @@ func createMempoolReactor( mempool.SetLogger(logger) - reactorShim := p2p.NewReactorShim(logger, "MempoolShim", mempl.GetChannelShims(config.Mempool)) + channelShims := mempl.GetChannelShims(config.Mempool) + reactorShim := p2p.NewReactorShim(logger, "MempoolShim", channelShims) + + var ( + channels map[p2p.ChannelID]*p2p.Channel + peerUpdates *p2p.PeerUpdates + ) + + if useLegacyP2P { + channels = getChannelsFromShim(reactorShim) + peerUpdates = reactorShim.PeerUpdates + } else { + channels = makeChannelsFromShims(router, channelShims) + peerUpdates = peerManager.Subscribe() + } + reactor := mempl.NewReactor( logger, config.Mempool, - peerMgr, + peerManager, mempool, - reactorShim.GetChannel(mempl.MempoolChannel), - reactorShim.PeerUpdates, + channels[mempl.MempoolChannel], + peerUpdates, ) if config.Consensus.WaitForTxs() { @@ -362,6 +390,8 @@ func createEvidenceReactor( dbProvider DBProvider, stateDB dbm.DB, blockStore *store.BlockStore, + peerManager *p2p.PeerManager, + router *p2p.Router, logger log.Logger, ) (*p2p.ReactorShim, *evidence.Reactor, *evidence.Pool, error) { evidenceDB, err := dbProvider(&DBContext{"evidence", config}) @@ -370,21 +400,34 @@ func createEvidenceReactor( } logger = logger.With("module", "evidence") + reactorShim := p2p.NewReactorShim(logger, "EvidenceShim", evidence.ChannelShims) evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore) if err != nil { return nil, nil, nil, err } - evidenceReactorShim := p2p.NewReactorShim(logger, "EvidenceShim", evidence.ChannelShims) + var ( + channels map[p2p.ChannelID]*p2p.Channel + peerUpdates *p2p.PeerUpdates + ) + + if useLegacyP2P { + channels = getChannelsFromShim(reactorShim) + peerUpdates = reactorShim.PeerUpdates + } else { + channels = makeChannelsFromShims(router, evidence.ChannelShims) + peerUpdates = peerManager.Subscribe() + } + evidenceReactor := evidence.NewReactor( logger, - evidenceReactorShim.GetChannel(evidence.EvidenceChannel), - evidenceReactorShim.PeerUpdates, + channels[evidence.EvidenceChannel], + peerUpdates, evidencePool, ) - return evidenceReactorShim, evidenceReactor, evidencePool, nil + return reactorShim, evidenceReactor, evidencePool, nil } func createBlockchainReactor( @@ -394,6 +437,8 @@ func createBlockchainReactor( blockExec *sm.BlockExecutor, blockStore *store.BlockStore, csReactor *cs.Reactor, + peerManager *p2p.PeerManager, + router *p2p.Router, fastSync bool, ) (*p2p.ReactorShim, service.Service, error) { @@ -403,9 +448,22 @@ func createBlockchainReactor( case "v0": reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims) + var ( + channels map[p2p.ChannelID]*p2p.Channel + peerUpdates *p2p.PeerUpdates + ) + + if useLegacyP2P { + channels = getChannelsFromShim(reactorShim) + peerUpdates = reactorShim.PeerUpdates + } else { + channels = makeChannelsFromShims(router, bcv0.ChannelShims) + peerUpdates = peerManager.Subscribe() + } + reactor, err := bcv0.NewReactor( logger, state.Copy(), blockExec, blockStore, csReactor, - reactorShim.GetChannel(bcv0.BlockchainChannel), reactorShim.PeerUpdates, fastSync, + channels[bcv0.BlockchainChannel], peerUpdates, fastSync, ) if err != nil { return nil, nil, err @@ -435,6 +493,8 @@ func createConsensusReactor( csMetrics *cs.Metrics, waitSync bool, eventBus *types.EventBus, + peerManager *p2p.PeerManager, + router *p2p.Router, logger log.Logger, ) (*p2p.ReactorShim, *cs.Reactor, *cs.State) { @@ -455,14 +515,28 @@ func createConsensusReactor( } reactorShim := p2p.NewReactorShim(logger, "ConsensusShim", cs.ChannelShims) + + var ( + channels map[p2p.ChannelID]*p2p.Channel + peerUpdates *p2p.PeerUpdates + ) + + if useLegacyP2P { + channels = getChannelsFromShim(reactorShim) + peerUpdates = reactorShim.PeerUpdates + } else { + channels = makeChannelsFromShims(router, cs.ChannelShims) + peerUpdates = peerManager.Subscribe() + } + reactor := cs.NewReactor( logger, consensusState, - reactorShim.GetChannel(cs.StateChannel), - reactorShim.GetChannel(cs.DataChannel), - reactorShim.GetChannel(cs.VoteChannel), - reactorShim.GetChannel(cs.VoteSetBitsChannel), - reactorShim.PeerUpdates, + channels[cs.StateChannel], + channels[cs.DataChannel], + channels[cs.VoteChannel], + channels[cs.VoteSetBitsChannel], + peerUpdates, waitSync, cs.ReactorMetrics(csMetrics), ) @@ -474,10 +548,7 @@ func createConsensusReactor( return reactorShim, reactor, consensusState } -func createTransport( - logger log.Logger, - config *cfg.Config, -) *p2p.MConnTransport { +func createTransport(logger log.Logger, config *cfg.Config) *p2p.MConnTransport { return p2p.NewMConnTransport( logger, p2p.MConnConfig(config.P2P), []*p2p.ChannelDescriptor{}, p2p.MConnTransportOptions{ @@ -488,6 +559,52 @@ func createTransport( ) } +func createPeerManager(config *cfg.Config, p2pLogger log.Logger, nodeID p2p.NodeID) (*p2p.PeerManager, error) { + options := p2p.PeerManagerOptions{ + MaxConnected: 64, + MaxConnectedUpgrade: 4, + MaxPeers: 1000, + MinRetryTime: 100 * time.Millisecond, + MaxRetryTime: 8 * time.Hour, + MaxRetryTimePersistent: 5 * time.Minute, + RetryTimeJitter: 3 * time.Second, + } + + peers := []p2p.NodeAddress{} + for _, p := range splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ") { + address, err := p2p.ParseNodeAddress(p) + if err != nil { + return nil, fmt.Errorf("invalid peer address %q: %w", p, err) + } + + peers = append(peers, address) + options.PersistentPeers = append(options.PersistentPeers, address.NodeID) + } + + peerManager, err := p2p.NewPeerManager(nodeID, dbm.NewMemDB(), options) + if err != nil { + return nil, fmt.Errorf("failed to create peer manager: %w", err) + } + + for _, peer := range peers { + if err := peerManager.Add(peer); err != nil { + return nil, fmt.Errorf("failed to add peer %q: %w", peer, err) + } + } + + return peerManager, nil +} + +func createRouter( + p2pLogger log.Logger, + nodeInfo p2p.NodeInfo, + privKey crypto.PrivKey, + peerManager *p2p.PeerManager, + transport p2p.Transport, +) (*p2p.Router, error) { + return p2p.NewRouter(p2pLogger, nodeInfo, privKey, peerManager, []p2p.Transport{transport}, p2p.RouterOptions{}) +} + func createSwitch(config *cfg.Config, transport p2p.Transport, p2pMetrics *p2p.Metrics, @@ -499,7 +616,8 @@ func createSwitch(config *cfg.Config, proxyApp proxy.AppConns, nodeInfo p2p.NodeInfo, nodeKey p2p.NodeKey, - p2pLogger log.Logger) *p2p.Switch { + p2pLogger log.Logger, +) *p2p.Switch { var ( connFilters = []p2p.ConnFilterFunc{} @@ -557,6 +675,7 @@ func createSwitch(config *cfg.Config, p2p.SwitchPeerFilters(peerFilters...), p2p.SwitchConnFilters(connFilters...), ) + sw.SetLogger(p2pLogger) sw.AddReactor("MEMPOOL", mempoolReactor) sw.AddReactor("BLOCKCHAIN", bcReactor) @@ -567,6 +686,9 @@ func createSwitch(config *cfg.Config, sw.SetNodeInfo(nodeInfo) sw.SetNodeKey(nodeKey) + // XXX: needed to support old/new P2P stacks + sw.PutChannelDescsIntoTransport() + p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID, "file", config.NodeKeyFile()) return sw } @@ -619,6 +741,22 @@ func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config, return pexReactor } +func createPEXReactorV2( + config *cfg.Config, + logger log.Logger, + peerManager *p2p.PeerManager, + router *p2p.Router, +) (*pex.ReactorV2, error) { + + channel, err := router.OpenChannel(p2p.ChannelID(pex.PexChannel), &protop2p.PexMessage{}, 0) + if err != nil { + return nil, err + } + + peerUpdates := peerManager.Subscribe() + return pex.NewReactorV2(logger, peerManager, channel, peerUpdates), nil +} + // startStateSync starts an asynchronous state sync process, then switches to fast sync mode. func startStateSync(ssR *statesync.Reactor, bcR fastSyncReactor, conR *cs.Reactor, stateProvider statesync.StateProvider, config *cfg.StateSyncConfig, fastSync bool, @@ -775,15 +913,32 @@ func NewNode(config *cfg.Config, // TODO: Fetch and provide real options and do proper p2p bootstrapping. // TODO: Use a persistent peer database. - peerMgr, err := p2p.NewPeerManager(nodeKey.ID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state) if err != nil { return nil, err } + p2pLogger := logger.With("module", "p2p") + transport := createTransport(p2pLogger, config) + + peerManager, err := createPeerManager(config, p2pLogger, nodeKey.ID) + if err != nil { + return nil, fmt.Errorf("failed to create peer manager: %w", err) + } + + router, err := createRouter(p2pLogger, nodeInfo, nodeKey.PrivKey, peerManager, transport) + if err != nil { + return nil, fmt.Errorf("failed to create router: %w", err) + } + csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID) - mpReactorShim, mpReactor, mempool := createMempoolReactor(config, proxyApp, state, memplMetrics, peerMgr, logger) + mpReactorShim, mpReactor, mempool := createMempoolReactor( + config, proxyApp, state, memplMetrics, peerManager, router, logger, + ) - evReactorShim, evReactor, evPool, err := createEvidenceReactor(config, dbProvider, stateDB, blockStore, logger) + evReactorShim, evReactor, evPool, err := createEvidenceReactor( + config, dbProvider, stateDB, blockStore, peerManager, router, logger, + ) if err != nil { return nil, err } @@ -800,13 +955,15 @@ func NewNode(config *cfg.Config, csReactorShim, csReactor, csState := createConsensusReactor( config, state, blockExec, blockStore, mempool, evPool, - privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger, + privValidator, csMetrics, stateSync || fastSync, eventBus, + peerManager, router, consensusLogger, ) // Create the blockchain reactor. Note, we do not start fast sync if we're // doing a state sync first. bcReactorShim, bcReactor, err := createBlockchainReactor( - logger, config, state, blockExec, blockStore, csReactor, fastSync && !stateSync, + logger, config, state, blockExec, blockStore, csReactor, + peerManager, router, fastSync && !stateSync, ) if err != nil { return nil, fmt.Errorf("could not create blockchain reactor: %w", err) @@ -834,29 +991,34 @@ func NewNode(config *cfg.Config, // https://github.com/tendermint/tendermint/issues/4644 stateSyncReactorShim := p2p.NewReactorShim(logger.With("module", "statesync"), "StateSyncShim", statesync.ChannelShims) + var ( + channels map[p2p.ChannelID]*p2p.Channel + peerUpdates *p2p.PeerUpdates + ) + + if useLegacyP2P { + channels = getChannelsFromShim(stateSyncReactorShim) + peerUpdates = stateSyncReactorShim.PeerUpdates + } else { + channels = makeChannelsFromShims(router, statesync.ChannelShims) + peerUpdates = peerManager.Subscribe() + } + stateSyncReactor := statesync.NewReactor( stateSyncReactorShim.Logger, proxyApp.Snapshot(), proxyApp.Query(), - stateSyncReactorShim.GetChannel(statesync.SnapshotChannel), - stateSyncReactorShim.GetChannel(statesync.ChunkChannel), - stateSyncReactorShim.PeerUpdates, + channels[statesync.SnapshotChannel], + channels[statesync.ChunkChannel], + peerUpdates, config.StateSync.TempDir, ) - nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state) - if err != nil { - return nil, err - } - // Setup Transport and Switch. - p2pLogger := logger.With("module", "p2p") - transport := createTransport(p2pLogger, config) sw := createSwitch( config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch, stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger, ) - err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) if err != nil { return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err) @@ -884,9 +1046,17 @@ func NewNode(config *cfg.Config, // // If PEX is on, it should handle dialing the seeds. Otherwise the switch does it. // Note we currently use the addrBook regardless at least for AddOurAddress - var pexReactor *pex.Reactor + var ( + pexReactor *pex.Reactor + pexReactorV2 *pex.ReactorV2 + ) + if config.P2P.PexReactor { pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger) + pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router) + if err != nil { + return nil, err + } } if config.RPC.PprofListenAddress != "" { @@ -901,11 +1071,13 @@ func NewNode(config *cfg.Config, genesisDoc: genDoc, privValidator: privValidator, - transport: transport, - sw: sw, - addrBook: addrBook, - nodeInfo: nodeInfo, - nodeKey: nodeKey, + transport: transport, + sw: sw, + peerManager: peerManager, + router: router, + addrBook: addrBook, + nodeInfo: nodeInfo, + nodeKey: nodeKey, stateStore: stateStore, blockStore: blockStore, @@ -918,6 +1090,7 @@ func NewNode(config *cfg.Config, stateSync: stateSync, stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state pexReactor: pexReactor, + pexReactorV2: pexReactorV2, evidenceReactor: evReactor, evidencePool: evPool, proxyApp: proxyApp, @@ -980,8 +1153,13 @@ func (n *Node) OnStart() error { n.isListening = true - // Start the switch (the P2P server). - err = n.sw.Start() + n.Logger.Info("p2p service", "legacy_enabled", useLegacyP2P) + + if useLegacyP2P { + err = n.sw.Start() + } else { + err = n.router.Start() + } if err != nil { return err } @@ -1013,6 +1191,12 @@ func (n *Node) OnStart() error { return err } + if !useLegacyP2P && n.pexReactorV2 != nil { + if err := n.pexReactorV2.Start(); err != nil { + return err + } + } + // Always connect to persistent peers err = n.sw.DialPeersAsync(splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " ")) if err != nil { @@ -1050,10 +1234,6 @@ func (n *Node) OnStop() { } // now stop the reactors - if err := n.sw.Stop(); err != nil { - n.Logger.Error("Error closing switch", "err", err) - } - if n.config.FastSync.Version == "v0" { // Stop the real blockchain reactor separately since the switch uses the shim. if err := n.bcReactor.Stop(); err != nil { @@ -1081,6 +1261,22 @@ func (n *Node) OnStop() { n.Logger.Error("failed to stop the evidence reactor", "err", err) } + if !useLegacyP2P && n.pexReactorV2 != nil { + if err := n.pexReactorV2.Stop(); err != nil { + n.Logger.Error("failed to stop the PEX v2 reactor", "err", err) + } + } + + if useLegacyP2P { + if err := n.sw.Stop(); err != nil { + n.Logger.Error("failed to stop switch", "err", err) + } + } else { + if err := n.router.Stop(); err != nil { + n.Logger.Error("failed to stop router", "err", err) + } + } + // stop mempool WAL if n.config.Mempool.WalEnabled() { n.mempool.CloseWAL() @@ -1547,3 +1743,31 @@ func createAndStartPrivValidatorGRPCClient( return pvsc, nil } + +// FIXME: Temporary helper function, shims should be removed. +func makeChannelsFromShims( + router *p2p.Router, + chShims map[p2p.ChannelID]*p2p.ChannelDescriptorShim, +) map[p2p.ChannelID]*p2p.Channel { + + channels := map[p2p.ChannelID]*p2p.Channel{} + for chID, chShim := range chShims { + ch, err := router.OpenChannel(chID, chShim.MsgType, 0) + if err != nil { + panic(fmt.Sprintf("failed to open channel %v: %v", chID, err)) + } + + channels[chID] = ch + } + + return channels +} + +func getChannelsFromShim(reactorShim *p2p.ReactorShim) map[p2p.ChannelID]*p2p.Channel { + channels := map[p2p.ChannelID]*p2p.Channel{} + for chID := range reactorShim.Channels { + channels[chID] = reactorShim.GetChannel(chID) + } + + return channels +} diff --git a/p2p/switch.go b/p2p/switch.go index 71f4a0e1f..ba4b29f2b 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -1037,3 +1037,11 @@ func (sw *Switch) addPeer(p Peer) error { return nil } + +// FIXME: Eww, needed to wire up the new P2P stack along with the old one. This +// should be passed into the transport when it's constructed. +func (sw *Switch) PutChannelDescsIntoTransport() { + if t, ok := sw.transport.(*MConnTransport); ok { + t.channelDescs = sw.chDescs + } +} diff --git a/test/e2e/docker/Dockerfile b/test/e2e/docker/Dockerfile index b3b763ecb..cd28056b6 100644 --- a/test/e2e/docker/Dockerfile +++ b/test/e2e/docker/Dockerfile @@ -27,6 +27,7 @@ RUN cd test/e2e && make app && cp build/app /usr/bin/app WORKDIR /tendermint VOLUME /tendermint ENV TMHOME=/tendermint +ENV TM_LEGACY_P2P=true EXPOSE 26656 26657 26660 6060 ENTRYPOINT ["/usr/bin/entrypoint"] diff --git a/test/e2e/runner/rpc.go b/test/e2e/runner/rpc.go index c50ab6542..3184bb9a2 100644 --- a/test/e2e/runner/rpc.go +++ b/test/e2e/runner/rpc.go @@ -71,8 +71,10 @@ func waitForNode(node *e2e.Node, height int64, timeout time.Duration) (*rpctypes if err != nil { return nil, err } + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() + for { status, err := client.Status(ctx) switch {