From e3aaae570dbec3841f0b55a8260abc10f2fa5409 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Tue, 14 Dec 2021 14:56:28 -0500 Subject: [PATCH] node: minor package cleanups (#7444) --- node/node.go | 91 +++++++++++++++++++++++------------------------ node/node_test.go | 6 ++-- node/setup.go | 87 +++++++++++++------------------------------- 3 files changed, 74 insertions(+), 110 deletions(-) diff --git a/node/node.go b/node/node.go index 6ad02fbee..9878f60e5 100644 --- a/node/node.go +++ b/node/node.go @@ -171,17 +171,17 @@ func makeNode( nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID) // Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query). - proxyApp, err := createAndStartProxyAppConns(ctx, clientCreator, logger, nodeMetrics.proxy) - if err != nil { - return nil, combineCloseError(err, makeCloser(closers)) + proxyApp := proxy.NewAppConns(clientCreator, logger.With("module", "proxy"), nodeMetrics.proxy) + if err := proxyApp.Start(ctx); err != nil { + return nil, fmt.Errorf("error starting proxy app connections: %v", err) } // EventBus and IndexerService must be started before the handshake because // we might need to index the txs of the replayed block as this might not have happened // when the node stopped last time (i.e. the node stopped after it saved the block // but before it indexed the txs, or, endblocker panicked) - eventBus, err := createAndStartEventBus(ctx, logger) - if err != nil { + eventBus := eventbus.NewDefault(logger.With("module", "events")) + if err := eventBus.Start(ctx); err != nil { return nil, combineCloseError(err, makeCloser(closers)) } @@ -556,8 +556,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { n.rpcListeners = listeners } - if n.config.Instrumentation.Prometheus && - n.config.Instrumentation.PrometheusListenAddr != "" { + if n.config.Instrumentation.Prometheus && n.config.Instrumentation.PrometheusListenAddr != "" { n.prometheusSrv = n.startPrometheusServer(ctx, n.config.Instrumentation.PrometheusListenAddr) } @@ -623,50 +622,50 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { n.logger.Error("failed to emit the statesync start event", "err", err) } - // FIXME: We shouldn't allow state sync to silently error out without - // bubbling up the error and gracefully shutting down the rest of the node - go func() { - n.logger.Info("starting state sync") - state, err := n.stateSyncReactor.Sync(ctx) - if err != nil { - n.logger.Error("state sync failed; shutting down this node", "err", err) - // stop the node - if err := n.Stop(); err != nil { - n.logger.Error("failed to shut down node", "err", err) - } - return + // RUN STATE SYNC NOW: + // + // TODO: Eventually this should run as part of some + // separate orchestrator + n.logger.Info("starting state sync") + ssState, err := n.stateSyncReactor.Sync(ctx) + if err != nil { + n.logger.Error("state sync failed; shutting down this node", "err", err) + // stop the node + if err := n.Stop(); err != nil { + n.logger.Error("failed to shut down node", "err", err) } + return err + } - n.consensusReactor.SetStateSyncingMetrics(0) - - if err := n.eventBus.PublishEventStateSyncStatus(ctx, - types.EventDataStateSyncStatus{ - Complete: true, - Height: state.LastBlockHeight, - }); err != nil { - - n.logger.Error("failed to emit the statesync start event", "err", err) - } + n.consensusReactor.SetStateSyncingMetrics(0) - // TODO: Some form of orchestrator is needed here between the state - // advancing reactors to be able to control which one of the three - // is running - // FIXME Very ugly to have these metrics bleed through here. - n.consensusReactor.SetBlockSyncingMetrics(1) - if err := bcR.SwitchToBlockSync(ctx, state); err != nil { - n.logger.Error("failed to switch to block sync", "err", err) - return - } + if err := n.eventBus.PublishEventStateSyncStatus(ctx, + types.EventDataStateSyncStatus{ + Complete: true, + Height: ssState.LastBlockHeight, + }); err != nil { + n.logger.Error("failed to emit the statesync start event", "err", err) + return err + } - if err := n.eventBus.PublishEventBlockSyncStatus(ctx, - types.EventDataBlockSyncStatus{ - Complete: false, - Height: state.LastBlockHeight, - }); err != nil { + // TODO: Some form of orchestrator is needed here between the state + // advancing reactors to be able to control which one of the three + // is running + // FIXME Very ugly to have these metrics bleed through here. + n.consensusReactor.SetBlockSyncingMetrics(1) + if err := bcR.SwitchToBlockSync(ctx, ssState); err != nil { + n.logger.Error("failed to switch to block sync", "err", err) + return err + } - n.logger.Error("failed to emit the block sync starting event", "err", err) - } - }() + if err := n.eventBus.PublishEventBlockSyncStatus(ctx, + types.EventDataBlockSyncStatus{ + Complete: false, + Height: ssState.LastBlockHeight, + }); err != nil { + n.logger.Error("failed to emit the block sync starting event", "err", err) + return err + } } return nil diff --git a/node/node_test.go b/node/node_test.go index d9806c9f1..666192a5c 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -20,6 +20,7 @@ import ( "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/crypto/tmhash" + "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/evidence" "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/proxy" @@ -562,8 +563,9 @@ func TestNodeSetEventSink(t *testing.T) { logger := log.TestingLogger() setupTest := func(t *testing.T, conf *config.Config) []indexer.EventSink { - eventBus, err := createAndStartEventBus(ctx, logger) - require.NoError(t, err) + eventBus := eventbus.NewDefault(logger.With("module", "events")) + require.NoError(t, eventBus.Start(ctx)) + t.Cleanup(eventBus.Wait) genDoc, err := types.GenesisDocFromFile(cfg.GenesisFile()) require.NoError(t, err) diff --git a/node/setup.go b/node/setup.go index 910eefad6..5e626a117 100644 --- a/node/setup.go +++ b/node/setup.go @@ -10,7 +10,6 @@ import ( dbm "github.com/tendermint/tm-db" - abciclient "github.com/tendermint/tendermint/abci/client" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/internal/blocksync" @@ -93,29 +92,6 @@ func initDBs( return blockStore, stateDB, makeCloser(closers), nil } -func createAndStartProxyAppConns( - ctx context.Context, - clientCreator abciclient.Creator, - logger log.Logger, - metrics *proxy.Metrics, -) (proxy.AppConns, error) { - proxyApp := proxy.NewAppConns(clientCreator, logger.With("module", "proxy"), metrics) - - if err := proxyApp.Start(ctx); err != nil { - return nil, fmt.Errorf("error starting proxy app connections: %v", err) - } - - return proxyApp, nil -} - -func createAndStartEventBus(ctx context.Context, logger log.Logger) (*eventbus.EventBus, error) { - eventBus := eventbus.NewDefault(logger.With("module", "events")) - if err := eventBus.Start(ctx); err != nil { - return nil, err - } - return eventBus, nil -} - func createAndStartIndexerService( ctx context.Context, cfg *config.Config, @@ -368,21 +344,6 @@ func createConsensusReactor( return reactor, consensusState, nil } -func createTransport(logger log.Logger, cfg *config.Config) *p2p.MConnTransport { - conf := conn.DefaultMConnConfig() - conf.FlushThrottle = cfg.P2P.FlushThrottleTimeout - conf.SendRate = cfg.P2P.SendRate - conf.RecvRate = cfg.P2P.RecvRate - conf.MaxPacketMsgPayloadSize = cfg.P2P.MaxPacketMsgPayloadSize - - return p2p.NewMConnTransport( - logger, conf, []*p2p.ChannelDescriptor{}, - p2p.MConnTransportOptions{ - MaxAcceptedConnections: uint32(cfg.P2P.MaxConnections), - }, - ) -} - func createPeerManager( cfg *config.Config, dbProvider config.DBProvider, @@ -459,14 +420,25 @@ func createRouter( nodeInfo types.NodeInfo, nodeKey types.NodeKey, peerManager *p2p.PeerManager, - conf *config.Config, + cfg *config.Config, proxyApp proxy.AppConns, ) (*p2p.Router, error) { p2pLogger := logger.With("module", "p2p") - transport := createTransport(p2pLogger, conf) - ep, err := p2p.NewEndpoint(nodeKey.ID.AddressString(conf.P2P.ListenAddress)) + transportConf := conn.DefaultMConnConfig() + transportConf.FlushThrottle = cfg.P2P.FlushThrottleTimeout + transportConf.SendRate = cfg.P2P.SendRate + transportConf.RecvRate = cfg.P2P.RecvRate + transportConf.MaxPacketMsgPayloadSize = cfg.P2P.MaxPacketMsgPayloadSize + transport := p2p.NewMConnTransport( + p2pLogger, transportConf, []*p2p.ChannelDescriptor{}, + p2p.MConnTransportOptions{ + MaxAcceptedConnections: uint32(cfg.P2P.MaxConnections), + }, + ) + + ep, err := p2p.NewEndpoint(nodeKey.ID.AddressString(cfg.P2P.ListenAddress)) if err != nil { return nil, err } @@ -480,7 +452,7 @@ func createRouter( peerManager, []p2p.Transport{transport}, []p2p.Endpoint{ep}, - getRouterConfig(conf, proxyApp), + getRouterConfig(cfg, proxyApp), ) } @@ -506,14 +478,13 @@ func makeNodeInfo( genDoc *types.GenesisDoc, state sm.State, ) (types.NodeInfo, error) { + txIndexerStatus := "off" if indexer.IndexingEnabled(eventSinks) { txIndexerStatus = "on" } - bcChannel := byte(blocksync.BlockSyncChannel) - nodeInfo := types.NodeInfo{ ProtocolVersion: types.ProtocolVersion{ P2P: version.P2PProtocol, // global @@ -524,7 +495,7 @@ func makeNodeInfo( Network: genDoc.ChainID, Version: version.TMVersion, Channels: []byte{ - bcChannel, + byte(blocksync.BlockSyncChannel), byte(consensus.StateChannel), byte(consensus.DataChannel), byte(consensus.VoteChannel), @@ -547,16 +518,12 @@ func makeNodeInfo( nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel) } - lAddr := cfg.P2P.ExternalAddress - - if lAddr == "" { - lAddr = cfg.P2P.ListenAddress + nodeInfo.ListenAddr = cfg.P2P.ExternalAddress + if nodeInfo.ListenAddr == "" { + nodeInfo.ListenAddr = cfg.P2P.ListenAddress } - nodeInfo.ListenAddr = lAddr - - err := nodeInfo.Validate() - return nodeInfo, err + return nodeInfo, nodeInfo.Validate() } func makeSeedNodeInfo( @@ -586,14 +553,10 @@ func makeSeedNodeInfo( nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel) } - lAddr := cfg.P2P.ExternalAddress - - if lAddr == "" { - lAddr = cfg.P2P.ListenAddress + nodeInfo.ListenAddr = cfg.P2P.ExternalAddress + if nodeInfo.ListenAddr == "" { + nodeInfo.ListenAddr = cfg.P2P.ListenAddress } - nodeInfo.ListenAddr = lAddr - - err := nodeInfo.Validate() - return nodeInfo, err + return nodeInfo, nodeInfo.Validate() }