From e2b626fc926fb2af6f31628631863e38a236ec33 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Wed, 3 Nov 2021 12:16:35 +0100 Subject: [PATCH] node: cleanup construction (#7191) --- node/node.go | 62 +++++++++++++-------------------------------------- node/setup.go | 46 +++++++++++++++++++++----------------- 2 files changed, 42 insertions(+), 66 deletions(-) diff --git a/node/node.go b/node/node.go index bfccff6ef..eec58ef8b 100644 --- a/node/node.go +++ b/node/node.go @@ -151,7 +151,6 @@ func makeNode(cfg *config.Config, state, err := loadStateFromDBOrGenesisDocProvider(stateStore, genDoc) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) - } nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID) @@ -160,7 +159,6 @@ func makeNode(cfg *config.Config, proxyApp, err := createAndStartProxyAppConns(clientCreator, logger, nodeMetrics.proxy) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) - } // EventBus and IndexerService must be started before the handshake because @@ -170,7 +168,6 @@ func makeNode(cfg *config.Config, eventBus, err := createAndStartEventBus(logger) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) - } indexerService, eventSinks, err := createAndStartIndexerService(cfg, dbProvider, eventBus, logger, genDoc.ChainID) @@ -224,11 +221,9 @@ func makeNode(cfg *config.Config, // Create the handshaker, which calls RequestInfo, sets the AppVersion on the state, // and replays any blocks as necessary to sync tendermint with the app. - consensusLogger := logger.With("module", "consensus") if !stateSync { - if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil { + if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, logger); err != nil { return nil, combineCloseError(err, makeCloser(closers)) - } // Reload the state. It will have the Version.Consensus.App set by the @@ -246,7 +241,7 @@ func makeNode(cfg *config.Config, // app may modify the validator set, specifying ourself as the only validator. blockSync := !onlyValidatorIsUs(state, pubKey) - logNodeStartupInfo(state, pubKey, logger, consensusLogger, cfg.Mode) + logNodeStartupInfo(state, pubKey, logger, cfg.Mode) // TODO: Fetch and provide real options and do proper p2p bootstrapping. // TODO: Use a persistent peer database. @@ -277,7 +272,6 @@ func makeNode(cfg *config.Config, ) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) - } evReactor, evPool, err := createEvidenceReactor( @@ -285,7 +279,6 @@ func makeNode(cfg *config.Config, ) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) - } // make block executor for consensus and blockchain reactors to execute blocks @@ -302,7 +295,7 @@ func makeNode(cfg *config.Config, csReactor, csState, err := createConsensusReactor( cfg, state, blockExec, blockStore, mp, evPool, privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus, - peerManager, router, consensusLogger, + peerManager, router, logger, ) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) @@ -332,7 +325,6 @@ func makeNode(cfg *config.Config, // FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy, // we should clean this whole thing up. See: // https://github.com/tendermint/tendermint/issues/4644 - ssLogger := logger.With("module", "statesync") ssChDesc := statesync.GetChannelDescriptors() channels := make(map[p2p.ChannelID]*p2p.Channel, len(ssChDesc)) for idx := range ssChDesc { @@ -345,50 +337,29 @@ func makeNode(cfg *config.Config, channels[ch.ID] = ch } - peerUpdates := peerManager.Subscribe() stateSyncReactor := statesync.NewReactor( genDoc.ChainID, genDoc.InitialHeight, *cfg.StateSync, - ssLogger, + logger.With("module", "statesync"), proxyApp.Snapshot(), proxyApp.Query(), channels[statesync.SnapshotChannel], channels[statesync.ChunkChannel], channels[statesync.LightBlockChannel], channels[statesync.ParamsChannel], - peerUpdates, + peerManager.Subscribe(), stateStore, blockStore, cfg.StateSync.TempDir, nodeMetrics.statesync, ) - // Optionally, start the pex reactor - // - // TODO: - // - // We need to set Seeds and PersistentPeers on the switch, - // since it needs to be able to use these (and their DNS names) - // even if the PEX is off. We can include the DNS name in the NetAddress, - // but it would still be nice to have a clear list of the current "PersistentPeers" - // somewhere that we can return with net_info. - // - // If PEX is on, it should handle dialing the seeds. Otherwise the switch does it. - // Note we currently use the addrBook regardless at least for AddOurAddress - pexReactor, err := createPEXReactor(logger, peerManager, router) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) } - if cfg.RPC.PprofListenAddress != "" { - go func() { - logger.Info("Starting pprof server", "laddr", cfg.RPC.PprofListenAddress) - logger.Error("pprof server error", "err", http.ListenAndServe(cfg.RPC.PprofListenAddress, nil)) - }() - } - node := &nodeImpl{ config: cfg, genesisDoc: genDoc, @@ -461,7 +432,6 @@ func makeSeedNode(cfg *config.Config, state, err := sm.MakeGenesisState(genDoc) if err != nil { return nil, err - } nodeInfo, err := makeSeedNodeInfo(cfg, nodeKey, genDoc, state) @@ -487,19 +457,9 @@ func makeSeedNode(cfg *config.Config, closer) } - var pexReactor service.Service - - pexReactor, err = createPEXReactor(logger, peerManager, router) + pexReactor, err := createPEXReactor(logger, peerManager, router) if err != nil { return nil, combineCloseError(err, closer) - - } - - if cfg.RPC.PprofListenAddress != "" { - go func() { - logger.Info("Starting pprof server", "laddr", cfg.RPC.PprofListenAddress) - logger.Error("pprof server error", "err", http.ListenAndServe(cfg.RPC.PprofListenAddress, nil)) - }() } node := &nodeImpl{ @@ -522,6 +482,16 @@ func makeSeedNode(cfg *config.Config, // OnStart starts the Node. It implements service.Service. func (n *nodeImpl) OnStart() error { + if n.config.RPC.PprofListenAddress != "" { + // this service is not cleaned up (I believe that we'd + // need to have another thread and a potentially a + // context to get this functionality.) + go func() { + n.Logger.Info("Starting pprof server", "laddr", n.config.RPC.PprofListenAddress) + n.Logger.Error("pprof server error", "err", http.ListenAndServe(n.config.RPC.PprofListenAddress, nil)) + }() + } + now := tmtime.Now() genTime := n.genesisDoc.GenesisTime if genTime.After(now) { diff --git a/node/setup.go b/node/setup.go index ffedbec33..a0d22a31c 100644 --- a/node/setup.go +++ b/node/setup.go @@ -135,18 +135,20 @@ func doHandshake( genDoc *types.GenesisDoc, eventBus types.BlockEventPublisher, proxyApp proxy.AppConns, - consensusLogger log.Logger) error { + logger log.Logger, +) error { handshaker := consensus.NewHandshaker(stateStore, state, blockStore, genDoc) - handshaker.SetLogger(consensusLogger) + handshaker.SetLogger(logger.With("module", "handshaker")) handshaker.SetEventBus(eventBus) + if err := handshaker.Handshake(proxyApp); err != nil { return fmt.Errorf("error during handshake: %v", err) } return nil } -func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger, mode string) { +func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger log.Logger, mode string) { // Log the version info. logger.Info("Version info", "tmVersion", version.TMVersion, @@ -162,17 +164,23 @@ func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusL "state", state.Version.Consensus.Block, ) } - switch { - case mode == config.ModeFull: - consensusLogger.Info("This node is a fullnode") - case mode == config.ModeValidator: + + switch mode { + case config.ModeFull: + logger.Info("This node is a fullnode") + case config.ModeValidator: addr := pubKey.Address() // Log whether this node is a validator or an observer if state.Validators.HasAddress(addr) { - consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey.Bytes()) + logger.Info("This node is a validator", + "addr", addr, + "pubKey", pubKey.Bytes(), + ) } else { - consensusLogger.Info("This node is a validator (NOT in the active validator set)", - "addr", addr, "pubKey", pubKey.Bytes()) + logger.Info("This node is a validator (NOT in the active validator set)", + "addr", addr, + "pubKey", pubKey.Bytes(), + ) } } } @@ -312,6 +320,7 @@ func createConsensusReactor( router *p2p.Router, logger log.Logger, ) (*consensus.Reactor, *consensus.State, error) { + logger = logger.With("module", "consensus") consensusState := consensus.NewState( cfg.Consensus, @@ -339,8 +348,6 @@ func createConsensusReactor( channels[ch.ID] = ch } - peerUpdates := peerManager.Subscribe() - reactor := consensus.NewReactor( logger, consensusState, @@ -348,7 +355,7 @@ func createConsensusReactor( channels[consensus.DataChannel], channels[consensus.VoteChannel], channels[consensus.VoteSetBitsChannel], - peerUpdates, + peerManager.Subscribe(), waitSync, consensus.ReactorMetrics(csMetrics), ) @@ -381,6 +388,11 @@ func createPeerManager( nodeID types.NodeID, ) (*p2p.PeerManager, closer, error) { + privatePeerIDs := make(map[types.NodeID]struct{}) + for _, id := range tmstrings.SplitAndTrimEmpty(cfg.P2P.PrivatePeerIDs, ",", " ") { + privatePeerIDs[types.NodeID(id)] = struct{}{} + } + var maxConns uint16 switch { @@ -390,11 +402,6 @@ func createPeerManager( maxConns = 64 } - privatePeerIDs := make(map[types.NodeID]struct{}) - for _, id := range tmstrings.SplitAndTrimEmpty(cfg.P2P.PrivatePeerIDs, ",", " ") { - privatePeerIDs[types.NodeID(id)] = struct{}{} - } - options := p2p.PeerManagerOptions{ MaxConnected: maxConns, MaxConnectedUpgrade: 4, @@ -485,8 +492,7 @@ func createPEXReactor( return nil, err } - peerUpdates := peerManager.Subscribe() - return pex.NewReactor(logger, peerManager, channel, peerUpdates), nil + return pex.NewReactor(logger, peerManager, channel, peerManager.Subscribe()), nil } func makeNodeInfo(