From d56a44b88476d4806478be66f83bbdf34d49dd4b Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Thu, 12 Aug 2021 09:38:17 -0400 Subject: [PATCH] node: minimize hardcoded service initialization (#6798) * node: minimize hardcoded service initialization * hacking * nil safety * reduce space * remove genesis state store * fix lint * fix pex * unwind some odering effects * fix tests * remove unused experiment --- node/node.go | 264 ++++++++++++++------------------------ node/node_test.go | 82 +++++++----- node/setup.go | 2 +- rpc/client/local/local.go | 9 +- 4 files changed, 153 insertions(+), 204 deletions(-) diff --git a/node/node.go b/node/node.go index ced8af729..751c78889 100644 --- a/node/node.go +++ b/node/node.go @@ -18,7 +18,6 @@ import ( cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto" cs "github.com/tendermint/tendermint/internal/consensus" - "github.com/tendermint/tendermint/internal/evidence" "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p/pex" @@ -37,7 +36,6 @@ import ( grpccore "github.com/tendermint/tendermint/rpc/grpc" rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server" sm "github.com/tendermint/tendermint/state" - "github.com/tendermint/tendermint/state/indexer" "github.com/tendermint/tendermint/store" "github.com/tendermint/tendermint/types" ) @@ -71,16 +69,12 @@ type nodeImpl struct { mempool mempool.Mempool stateSync bool // whether the node should state sync on startup stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots - consensusState *cs.State // latest consensus state consensusReactor *cs.Reactor // for participating in the consensus - pexReactor *pex.Reactor // for exchanging peer addresses - pexReactorV2 *pex.ReactorV2 // for exchanging peer addresses - evidenceReactor *evidence.Reactor - evidencePool *evidence.Pool // tracking evidence - proxyApp proxy.AppConns // connection to the application + pexReactor service.Service // for exchanging peer addresses + evidenceReactor service.Service rpcListeners []net.Listener // rpc servers - eventSinks []indexer.EventSink - indexerService *indexer.Service + indexerService service.Service + rpcEnv *rpccore.Environment prometheusSrv *http.Server } @@ -371,46 +365,43 @@ func makeNode(config *cfg.Config, // Note we currently use the addrBook regardless at least for AddOurAddress var ( - pexReactor *pex.Reactor - pexReactorV2 *pex.ReactorV2 - sw *p2p.Switch - addrBook pex.AddrBook + pexReactor service.Service + sw *p2p.Switch + addrBook pex.AddrBook ) pexCh := pex.ChannelDescriptor() transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh}) - if config.P2P.PexReactor { - if config.P2P.DisableLegacy { - addrBook = nil - pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router) - if err != nil { - return nil, err - } - } else { - // setup Transport and Switch - sw = createSwitch( - config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch, - stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger, - ) - - err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) - if err != nil { - return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err) - } + if config.P2P.DisableLegacy { + addrBook = nil + pexReactor, err = createPEXReactorV2(config, logger, peerManager, router) + if err != nil { + return nil, err + } + } else { + // setup Transport and Switch + sw = createSwitch( + config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch, + stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger, + ) - err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) - if err != nil { - return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err) - } + err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) + if err != nil { + return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err) + } - addrBook, err = createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey) - if err != nil { - return nil, fmt.Errorf("could not create addrbook: %w", err) - } + err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) + if err != nil { + return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err) + } - pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger) + addrBook, err = createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey) + if err != nil { + return nil, fmt.Errorf("could not create addrbook: %w", err) } + + pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger) } if config.RPC.PprofListenAddress != "" { @@ -438,19 +429,37 @@ func makeNode(config *cfg.Config, bcReactor: bcReactor, mempoolReactor: mpReactor, mempool: mp, - consensusState: csState, consensusReactor: csReactor, stateSyncReactor: stateSyncReactor, stateSync: stateSync, pexReactor: pexReactor, - pexReactorV2: pexReactorV2, evidenceReactor: evReactor, - evidencePool: evPool, - proxyApp: proxyApp, indexerService: indexerService, eventBus: eventBus, - eventSinks: eventSinks, + + rpcEnv: &rpccore.Environment{ + ProxyAppQuery: proxyApp.Query(), + ProxyAppMempool: proxyApp.Mempool(), + + StateStore: stateStore, + BlockStore: blockStore, + EvidencePool: evPool, + ConsensusState: csState, + P2PPeers: sw, + BlockSyncReactor: bcReactor.(cs.BlockSyncReactor), + + GenDoc: genDoc, + EventSinks: eventSinks, + ConsensusReactor: csReactor, + EventBus: eventBus, + Mempool: mp, + Logger: logger.With("module", "rpc"), + Config: *config.RPC, + }, } + + node.rpcEnv.P2PTransport = node + node.BaseService = *service.NewBaseService(logger, "Node", node) return node, nil @@ -483,25 +492,6 @@ func makeSeedNode(config *cfg.Config, p2pMetrics := p2p.PrometheusMetrics(config.Instrumentation.Namespace, "chain_id", genDoc.ChainID) p2pLogger := logger.With("module", "p2p") transport := createTransport(p2pLogger, config) - sw := createSwitch( - config, transport, p2pMetrics, nil, nil, - nil, nil, nil, nil, nodeInfo, nodeKey, p2pLogger, - ) - - err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) - if err != nil { - return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err) - } - - err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) - if err != nil { - return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err) - } - - addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey) - if err != nil { - return nil, fmt.Errorf("could not create addrbook: %w", err) - } peerManager, err := createPeerManager(config, dbProvider, p2pLogger, nodeKey.ID) if err != nil { @@ -515,8 +505,9 @@ func makeSeedNode(config *cfg.Config, } var ( - pexReactor *pex.Reactor - pexReactorV2 *pex.ReactorV2 + pexReactor service.Service + sw *p2p.Switch + addrBook pex.AddrBook ) // add the pex reactor @@ -526,11 +517,31 @@ func makeSeedNode(config *cfg.Config, pexCh := pex.ChannelDescriptor() transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh}) if config.P2P.DisableLegacy { - pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router) + pexReactor, err = createPEXReactorV2(config, logger, peerManager, router) if err != nil { return nil, err } } else { + sw = createSwitch( + config, transport, p2pMetrics, nil, nil, + nil, nil, nil, nil, nodeInfo, nodeKey, p2pLogger, + ) + + err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) + if err != nil { + return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err) + } + + err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) + if err != nil { + return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err) + } + + addrBook, err = createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey) + if err != nil { + return nil, fmt.Errorf("could not create addrbook: %w", err) + } + pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger) } @@ -553,8 +564,7 @@ func makeSeedNode(config *cfg.Config, peerManager: peerManager, router: router, - pexReactor: pexReactor, - pexReactorV2: pexReactorV2, + pexReactor: pexReactor, } node.BaseService = *service.NewBaseService(logger, "SeedNode", node) @@ -595,23 +605,22 @@ func (n *nodeImpl) OnStart() error { } n.isListening = true - n.Logger.Info("p2p service", "legacy_enabled", !n.config.P2P.DisableLegacy) if n.config.P2P.DisableLegacy { - err = n.router.Start() + if err = n.router.Start(); err != nil { + return err + } } else { // Add private IDs to addrbook to block those peers being added n.addrBook.AddPrivateIDs(strings.SplitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " ")) - err = n.sw.Start() - } - if err != nil { - return err + if err = n.sw.Start(); err != nil { + return err + } } if n.config.Mode != cfg.ModeSeed { if n.config.BlockSync.Version == cfg.BlockSyncV0 { - // Start the real blockchain reactor separately since the switch uses the shim. if err := n.bcReactor.Start(); err != nil { return err } @@ -638,8 +647,8 @@ func (n *nodeImpl) OnStart() error { } } - if n.config.P2P.DisableLegacy && n.pexReactorV2 != nil { - if err := n.pexReactorV2.Start(); err != nil { + if n.config.P2P.DisableLegacy { + if err := n.pexReactor.Start(); err != nil { return err } } else { @@ -648,7 +657,6 @@ func (n *nodeImpl) OnStart() error { if err != nil { return fmt.Errorf("could not dial peers from persistent-peers field: %w", err) } - } // Run state sync @@ -723,10 +731,8 @@ func (n *nodeImpl) OnStop() { } } - if n.config.P2P.DisableLegacy && n.pexReactorV2 != nil { - if err := n.pexReactorV2.Stop(); err != nil { - n.Logger.Error("failed to stop the PEX v2 reactor", "err", err) - } + if err := n.pexReactor.Stop(); err != nil { + n.Logger.Error("failed to stop the PEX v2 reactor", "err", err) } if n.config.P2P.DisableLegacy { @@ -767,55 +773,23 @@ func (n *nodeImpl) OnStop() { } } -// ConfigureRPC makes sure RPC has all the objects it needs to operate. -func (n *nodeImpl) ConfigureRPC() (*rpccore.Environment, error) { - rpcCoreEnv := rpccore.Environment{ - ProxyAppQuery: n.proxyApp.Query(), - ProxyAppMempool: n.proxyApp.Mempool(), - - StateStore: n.stateStore, - BlockStore: n.blockStore, - EvidencePool: n.evidencePool, - ConsensusState: n.consensusState, - P2PPeers: n.sw, - P2PTransport: n, - - GenDoc: n.genesisDoc, - EventSinks: n.eventSinks, - ConsensusReactor: n.consensusReactor, - EventBus: n.eventBus, - Mempool: n.mempool, - - Logger: n.Logger.With("module", "rpc"), - - Config: *n.config.RPC, - BlockSyncReactor: n.bcReactor.(cs.BlockSyncReactor), - } +func (n *nodeImpl) startRPC() ([]net.Listener, error) { if n.config.Mode == cfg.ModeValidator { pubKey, err := n.privValidator.GetPubKey(context.TODO()) if pubKey == nil || err != nil { return nil, fmt.Errorf("can't get pubkey: %w", err) } - rpcCoreEnv.PubKey = pubKey - } - if err := rpcCoreEnv.InitGenesisChunks(); err != nil { - return nil, err + n.rpcEnv.PubKey = pubKey } - - return &rpcCoreEnv, nil -} - -func (n *nodeImpl) startRPC() ([]net.Listener, error) { - env, err := n.ConfigureRPC() - if err != nil { + if err := n.rpcEnv.InitGenesisChunks(); err != nil { return nil, err } listenAddrs := strings.SplitAndTrimEmpty(n.config.RPC.ListenAddress, ",", " ") - routes := env.GetRoutes() + routes := n.rpcEnv.GetRoutes() if n.config.RPC.Unsafe { - env.AddUnsafe(routes) + n.rpcEnv.AddUnsafe(routes) } config := rpcserver.DefaultConfig() @@ -912,7 +886,7 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) { return nil, err } go func() { - if err := grpccore.StartGRPCServer(env, listener); err != nil { + if err := grpccore.StartGRPCServer(n.rpcEnv, listener); err != nil { n.Logger.Error("Error starting gRPC server", "err", err) } }() @@ -945,46 +919,16 @@ func (n *nodeImpl) startPrometheusServer(addr string) *http.Server { return srv } -// Switch returns the Node's Switch. -func (n *nodeImpl) Switch() *p2p.Switch { - return n.sw -} - -// BlockStore returns the Node's BlockStore. -func (n *nodeImpl) BlockStore() *store.BlockStore { - return n.blockStore -} - -// ConsensusState returns the Node's ConsensusState. -func (n *nodeImpl) ConsensusState() *cs.State { - return n.consensusState -} - // ConsensusReactor returns the Node's ConsensusReactor. func (n *nodeImpl) ConsensusReactor() *cs.Reactor { return n.consensusReactor } -// MempoolReactor returns the Node's mempool reactor. -func (n *nodeImpl) MempoolReactor() service.Service { - return n.mempoolReactor -} - // Mempool returns the Node's mempool. func (n *nodeImpl) Mempool() mempool.Mempool { return n.mempool } -// PEXReactor returns the Node's PEXReactor. It returns nil if PEX is disabled. -func (n *nodeImpl) PEXReactor() *pex.Reactor { - return n.pexReactor -} - -// EvidencePool returns the Node's EvidencePool. -func (n *nodeImpl) EvidencePool() *evidence.Pool { - return n.evidencePool -} - // EventBus returns the Node's EventBus. func (n *nodeImpl) EventBus() *types.EventBus { return n.eventBus @@ -1001,19 +945,9 @@ func (n *nodeImpl) GenesisDoc() *types.GenesisDoc { return n.genesisDoc } -// ProxyApp returns the Node's AppConns, representing its connections to the ABCI application. -func (n *nodeImpl) ProxyApp() proxy.AppConns { - return n.proxyApp -} - -// Config returns the Node's config. -func (n *nodeImpl) Config() *cfg.Config { - return n.config -} - -// EventSinks returns the Node's event indexing sinks. -func (n *nodeImpl) EventSinks() []indexer.EventSink { - return n.eventSinks +// RPCEnvironment makes sure RPC has all the objects it needs to operate. +func (n *nodeImpl) RPCEnvironment() *rpccore.Environment { + return n.rpcEnv } //------------------------------------------------------------------------------ diff --git a/node/node_test.go b/node/node_test.go index 16edb4210..64b28c0bb 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -513,36 +513,50 @@ func TestNodeSetEventSink(t *testing.T) { config := cfg.ResetTestRoot("node_app_version_test") defer os.RemoveAll(config.RootDir) - n := getTestNode(t, config, log.TestingLogger()) + logger := log.TestingLogger() + setupTest := func(t *testing.T, conf *cfg.Config) []indexer.EventSink { + eventBus, err := createAndStartEventBus(logger) + require.NoError(t, err) + + genDoc, err := types.GenesisDocFromFile(config.GenesisFile()) + require.NoError(t, err) + + indexService, eventSinks, err := createAndStartIndexerService(config, + cfg.DefaultDBProvider, eventBus, logger, genDoc.ChainID) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, indexService.Stop()) }) + return eventSinks + } - assert.Equal(t, 1, len(n.eventSinks)) - assert.Equal(t, indexer.KV, n.eventSinks[0].Type()) + eventSinks := setupTest(t, config) + assert.Equal(t, 1, len(eventSinks)) + assert.Equal(t, indexer.KV, eventSinks[0].Type()) config.TxIndex.Indexer = []string{"null"} - n = getTestNode(t, config, log.TestingLogger()) + eventSinks = setupTest(t, config) - assert.Equal(t, 1, len(n.eventSinks)) - assert.Equal(t, indexer.NULL, n.eventSinks[0].Type()) + assert.Equal(t, 1, len(eventSinks)) + assert.Equal(t, indexer.NULL, eventSinks[0].Type()) config.TxIndex.Indexer = []string{"null", "kv"} - n = getTestNode(t, config, log.TestingLogger()) + eventSinks = setupTest(t, config) - assert.Equal(t, 1, len(n.eventSinks)) - assert.Equal(t, indexer.NULL, n.eventSinks[0].Type()) + assert.Equal(t, 1, len(eventSinks)) + assert.Equal(t, indexer.NULL, eventSinks[0].Type()) config.TxIndex.Indexer = []string{"kvv"} - ns, err := newDefaultNode(config, log.TestingLogger()) + ns, err := newDefaultNode(config, logger) assert.Nil(t, ns) assert.Equal(t, errors.New("unsupported event sink type"), err) config.TxIndex.Indexer = []string{} - n = getTestNode(t, config, log.TestingLogger()) + eventSinks = setupTest(t, config) - assert.Equal(t, 1, len(n.eventSinks)) - assert.Equal(t, indexer.NULL, n.eventSinks[0].Type()) + assert.Equal(t, 1, len(eventSinks)) + assert.Equal(t, indexer.NULL, eventSinks[0].Type()) config.TxIndex.Indexer = []string{"psql"} - ns, err = newDefaultNode(config, log.TestingLogger()) + ns, err = newDefaultNode(config, logger) assert.Nil(t, ns) assert.Equal(t, errors.New("the psql connection settings cannot be empty"), err) @@ -550,46 +564,46 @@ func TestNodeSetEventSink(t *testing.T) { config.TxIndex.Indexer = []string{"psql"} config.TxIndex.PsqlConn = psqlConn - n = getTestNode(t, config, log.TestingLogger()) - assert.Equal(t, 1, len(n.eventSinks)) - assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type()) - n.OnStop() + eventSinks = setupTest(t, config) + + assert.Equal(t, 1, len(eventSinks)) + assert.Equal(t, indexer.PSQL, eventSinks[0].Type()) config.TxIndex.Indexer = []string{"psql", "kv"} config.TxIndex.PsqlConn = psqlConn - n = getTestNode(t, config, log.TestingLogger()) - assert.Equal(t, 2, len(n.eventSinks)) + eventSinks = setupTest(t, config) + + assert.Equal(t, 2, len(eventSinks)) // we use map to filter the duplicated sinks, so it's not guarantee the order when append sinks. - if n.eventSinks[0].Type() == indexer.KV { - assert.Equal(t, indexer.PSQL, n.eventSinks[1].Type()) + if eventSinks[0].Type() == indexer.KV { + assert.Equal(t, indexer.PSQL, eventSinks[1].Type()) } else { - assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type()) - assert.Equal(t, indexer.KV, n.eventSinks[1].Type()) + assert.Equal(t, indexer.PSQL, eventSinks[0].Type()) + assert.Equal(t, indexer.KV, eventSinks[1].Type()) } - n.OnStop() config.TxIndex.Indexer = []string{"kv", "psql"} config.TxIndex.PsqlConn = psqlConn - n = getTestNode(t, config, log.TestingLogger()) - assert.Equal(t, 2, len(n.eventSinks)) - if n.eventSinks[0].Type() == indexer.KV { - assert.Equal(t, indexer.PSQL, n.eventSinks[1].Type()) + eventSinks = setupTest(t, config) + + assert.Equal(t, 2, len(eventSinks)) + if eventSinks[0].Type() == indexer.KV { + assert.Equal(t, indexer.PSQL, eventSinks[1].Type()) } else { - assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type()) - assert.Equal(t, indexer.KV, n.eventSinks[1].Type()) + assert.Equal(t, indexer.PSQL, eventSinks[0].Type()) + assert.Equal(t, indexer.KV, eventSinks[1].Type()) } - n.OnStop() var e = errors.New("found duplicated sinks, please check the tx-index section in the config.toml") config.TxIndex.Indexer = []string{"psql", "kv", "Kv"} config.TxIndex.PsqlConn = psqlConn - _, err = newDefaultNode(config, log.TestingLogger()) + _, err = newDefaultNode(config, logger) require.Error(t, err) assert.Equal(t, e, err) config.TxIndex.Indexer = []string{"Psql", "kV", "kv", "pSql"} config.TxIndex.PsqlConn = psqlConn - _, err = newDefaultNode(config, log.TestingLogger()) + _, err = newDefaultNode(config, logger) require.Error(t, err) assert.Equal(t, e, err) } diff --git a/node/setup.go b/node/setup.go index af48fb382..ceadcd688 100644 --- a/node/setup.go +++ b/node/setup.go @@ -700,7 +700,7 @@ func createPEXReactorV2( logger log.Logger, peerManager *p2p.PeerManager, router *p2p.Router, -) (*pex.ReactorV2, error) { +) (service.Service, error) { channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 128) if err != nil { diff --git a/rpc/client/local/local.go b/rpc/client/local/local.go index 0663ebf67..d752e6a93 100644 --- a/rpc/client/local/local.go +++ b/rpc/client/local/local.go @@ -2,6 +2,7 @@ package local import ( "context" + "errors" "fmt" "time" @@ -46,15 +47,15 @@ type Local struct { // NodeService describes the portion of the node interface that the // local RPC client constructor needs to build a local client. type NodeService interface { - ConfigureRPC() (*rpccore.Environment, error) + RPCEnvironment() *rpccore.Environment EventBus() *types.EventBus } // New configures a client that calls the Node directly. func New(node NodeService) (*Local, error) { - env, err := node.ConfigureRPC() - if err != nil { - return nil, err + env := node.RPCEnvironment() + if env == nil { + return nil, errors.New("rpc is nil") } return &Local{ EventBus: node.EventBus(),