Browse Source

node: minimize hardcoded service initialization (#6798)

* node: minimize hardcoded service initialization

* hacking

* nil safety

* reduce space

* remove genesis state store

* fix lint

* fix pex

* unwind some odering effects

* fix tests

* remove unused experiment
pull/6823/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
d56a44b884
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 153 additions and 204 deletions
  1. +99
    -165
      node/node.go
  2. +48
    -34
      node/node_test.go
  3. +1
    -1
      node/setup.go
  4. +5
    -4
      rpc/client/local/local.go

+ 99
- 165
node/node.go View File

@ -18,7 +18,6 @@ import (
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto"
cs "github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/evidence"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/pex"
@ -37,7 +36,6 @@ import (
grpccore "github.com/tendermint/tendermint/rpc/grpc"
rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
)
@ -71,16 +69,12 @@ type nodeImpl struct {
mempool mempool.Mempool
stateSync bool // whether the node should state sync on startup
stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots
consensusState *cs.State // latest consensus state
consensusReactor *cs.Reactor // for participating in the consensus
pexReactor *pex.Reactor // for exchanging peer addresses
pexReactorV2 *pex.ReactorV2 // for exchanging peer addresses
evidenceReactor *evidence.Reactor
evidencePool *evidence.Pool // tracking evidence
proxyApp proxy.AppConns // connection to the application
pexReactor service.Service // for exchanging peer addresses
evidenceReactor service.Service
rpcListeners []net.Listener // rpc servers
eventSinks []indexer.EventSink
indexerService *indexer.Service
indexerService service.Service
rpcEnv *rpccore.Environment
prometheusSrv *http.Server
}
@ -371,46 +365,43 @@ func makeNode(config *cfg.Config,
// Note we currently use the addrBook regardless at least for AddOurAddress
var (
pexReactor *pex.Reactor
pexReactorV2 *pex.ReactorV2
sw *p2p.Switch
addrBook pex.AddrBook
pexReactor service.Service
sw *p2p.Switch
addrBook pex.AddrBook
)
pexCh := pex.ChannelDescriptor()
transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh})
if config.P2P.PexReactor {
if config.P2P.DisableLegacy {
addrBook = nil
pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
return nil, err
}
} else {
// setup Transport and Switch
sw = createSwitch(
config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch,
stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger,
)
err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err)
}
if config.P2P.DisableLegacy {
addrBook = nil
pexReactor, err = createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
return nil, err
}
} else {
// setup Transport and Switch
sw = createSwitch(
config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch,
stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger,
)
err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}
err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err)
}
addrBook, err = createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
if err != nil {
return nil, fmt.Errorf("could not create addrbook: %w", err)
}
err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
addrBook, err = createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
if err != nil {
return nil, fmt.Errorf("could not create addrbook: %w", err)
}
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
}
if config.RPC.PprofListenAddress != "" {
@ -438,19 +429,37 @@ func makeNode(config *cfg.Config,
bcReactor: bcReactor,
mempoolReactor: mpReactor,
mempool: mp,
consensusState: csState,
consensusReactor: csReactor,
stateSyncReactor: stateSyncReactor,
stateSync: stateSync,
pexReactor: pexReactor,
pexReactorV2: pexReactorV2,
evidenceReactor: evReactor,
evidencePool: evPool,
proxyApp: proxyApp,
indexerService: indexerService,
eventBus: eventBus,
eventSinks: eventSinks,
rpcEnv: &rpccore.Environment{
ProxyAppQuery: proxyApp.Query(),
ProxyAppMempool: proxyApp.Mempool(),
StateStore: stateStore,
BlockStore: blockStore,
EvidencePool: evPool,
ConsensusState: csState,
P2PPeers: sw,
BlockSyncReactor: bcReactor.(cs.BlockSyncReactor),
GenDoc: genDoc,
EventSinks: eventSinks,
ConsensusReactor: csReactor,
EventBus: eventBus,
Mempool: mp,
Logger: logger.With("module", "rpc"),
Config: *config.RPC,
},
}
node.rpcEnv.P2PTransport = node
node.BaseService = *service.NewBaseService(logger, "Node", node)
return node, nil
@ -483,25 +492,6 @@ func makeSeedNode(config *cfg.Config,
p2pMetrics := p2p.PrometheusMetrics(config.Instrumentation.Namespace, "chain_id", genDoc.ChainID)
p2pLogger := logger.With("module", "p2p")
transport := createTransport(p2pLogger, config)
sw := createSwitch(
config, transport, p2pMetrics, nil, nil,
nil, nil, nil, nil, nodeInfo, nodeKey, p2pLogger,
)
err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err)
}
err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}
addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
if err != nil {
return nil, fmt.Errorf("could not create addrbook: %w", err)
}
peerManager, err := createPeerManager(config, dbProvider, p2pLogger, nodeKey.ID)
if err != nil {
@ -515,8 +505,9 @@ func makeSeedNode(config *cfg.Config,
}
var (
pexReactor *pex.Reactor
pexReactorV2 *pex.ReactorV2
pexReactor service.Service
sw *p2p.Switch
addrBook pex.AddrBook
)
// add the pex reactor
@ -526,11 +517,31 @@ func makeSeedNode(config *cfg.Config,
pexCh := pex.ChannelDescriptor()
transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh})
if config.P2P.DisableLegacy {
pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router)
pexReactor, err = createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
return nil, err
}
} else {
sw = createSwitch(
config, transport, p2pMetrics, nil, nil,
nil, nil, nil, nil, nodeInfo, nodeKey, p2pLogger,
)
err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err)
}
err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}
addrBook, err = createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
if err != nil {
return nil, fmt.Errorf("could not create addrbook: %w", err)
}
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
}
@ -553,8 +564,7 @@ func makeSeedNode(config *cfg.Config,
peerManager: peerManager,
router: router,
pexReactor: pexReactor,
pexReactorV2: pexReactorV2,
pexReactor: pexReactor,
}
node.BaseService = *service.NewBaseService(logger, "SeedNode", node)
@ -595,23 +605,22 @@ func (n *nodeImpl) OnStart() error {
}
n.isListening = true
n.Logger.Info("p2p service", "legacy_enabled", !n.config.P2P.DisableLegacy)
if n.config.P2P.DisableLegacy {
err = n.router.Start()
if err = n.router.Start(); err != nil {
return err
}
} else {
// Add private IDs to addrbook to block those peers being added
n.addrBook.AddPrivateIDs(strings.SplitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " "))
err = n.sw.Start()
}
if err != nil {
return err
if err = n.sw.Start(); err != nil {
return err
}
}
if n.config.Mode != cfg.ModeSeed {
if n.config.BlockSync.Version == cfg.BlockSyncV0 {
// Start the real blockchain reactor separately since the switch uses the shim.
if err := n.bcReactor.Start(); err != nil {
return err
}
@ -638,8 +647,8 @@ func (n *nodeImpl) OnStart() error {
}
}
if n.config.P2P.DisableLegacy && n.pexReactorV2 != nil {
if err := n.pexReactorV2.Start(); err != nil {
if n.config.P2P.DisableLegacy {
if err := n.pexReactor.Start(); err != nil {
return err
}
} else {
@ -648,7 +657,6 @@ func (n *nodeImpl) OnStart() error {
if err != nil {
return fmt.Errorf("could not dial peers from persistent-peers field: %w", err)
}
}
// Run state sync
@ -723,10 +731,8 @@ func (n *nodeImpl) OnStop() {
}
}
if n.config.P2P.DisableLegacy && n.pexReactorV2 != nil {
if err := n.pexReactorV2.Stop(); err != nil {
n.Logger.Error("failed to stop the PEX v2 reactor", "err", err)
}
if err := n.pexReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the PEX v2 reactor", "err", err)
}
if n.config.P2P.DisableLegacy {
@ -767,55 +773,23 @@ func (n *nodeImpl) OnStop() {
}
}
// ConfigureRPC makes sure RPC has all the objects it needs to operate.
func (n *nodeImpl) ConfigureRPC() (*rpccore.Environment, error) {
rpcCoreEnv := rpccore.Environment{
ProxyAppQuery: n.proxyApp.Query(),
ProxyAppMempool: n.proxyApp.Mempool(),
StateStore: n.stateStore,
BlockStore: n.blockStore,
EvidencePool: n.evidencePool,
ConsensusState: n.consensusState,
P2PPeers: n.sw,
P2PTransport: n,
GenDoc: n.genesisDoc,
EventSinks: n.eventSinks,
ConsensusReactor: n.consensusReactor,
EventBus: n.eventBus,
Mempool: n.mempool,
Logger: n.Logger.With("module", "rpc"),
Config: *n.config.RPC,
BlockSyncReactor: n.bcReactor.(cs.BlockSyncReactor),
}
func (n *nodeImpl) startRPC() ([]net.Listener, error) {
if n.config.Mode == cfg.ModeValidator {
pubKey, err := n.privValidator.GetPubKey(context.TODO())
if pubKey == nil || err != nil {
return nil, fmt.Errorf("can't get pubkey: %w", err)
}
rpcCoreEnv.PubKey = pubKey
}
if err := rpcCoreEnv.InitGenesisChunks(); err != nil {
return nil, err
n.rpcEnv.PubKey = pubKey
}
return &rpcCoreEnv, nil
}
func (n *nodeImpl) startRPC() ([]net.Listener, error) {
env, err := n.ConfigureRPC()
if err != nil {
if err := n.rpcEnv.InitGenesisChunks(); err != nil {
return nil, err
}
listenAddrs := strings.SplitAndTrimEmpty(n.config.RPC.ListenAddress, ",", " ")
routes := env.GetRoutes()
routes := n.rpcEnv.GetRoutes()
if n.config.RPC.Unsafe {
env.AddUnsafe(routes)
n.rpcEnv.AddUnsafe(routes)
}
config := rpcserver.DefaultConfig()
@ -912,7 +886,7 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) {
return nil, err
}
go func() {
if err := grpccore.StartGRPCServer(env, listener); err != nil {
if err := grpccore.StartGRPCServer(n.rpcEnv, listener); err != nil {
n.Logger.Error("Error starting gRPC server", "err", err)
}
}()
@ -945,46 +919,16 @@ func (n *nodeImpl) startPrometheusServer(addr string) *http.Server {
return srv
}
// Switch returns the Node's Switch.
func (n *nodeImpl) Switch() *p2p.Switch {
return n.sw
}
// BlockStore returns the Node's BlockStore.
func (n *nodeImpl) BlockStore() *store.BlockStore {
return n.blockStore
}
// ConsensusState returns the Node's ConsensusState.
func (n *nodeImpl) ConsensusState() *cs.State {
return n.consensusState
}
// ConsensusReactor returns the Node's ConsensusReactor.
func (n *nodeImpl) ConsensusReactor() *cs.Reactor {
return n.consensusReactor
}
// MempoolReactor returns the Node's mempool reactor.
func (n *nodeImpl) MempoolReactor() service.Service {
return n.mempoolReactor
}
// Mempool returns the Node's mempool.
func (n *nodeImpl) Mempool() mempool.Mempool {
return n.mempool
}
// PEXReactor returns the Node's PEXReactor. It returns nil if PEX is disabled.
func (n *nodeImpl) PEXReactor() *pex.Reactor {
return n.pexReactor
}
// EvidencePool returns the Node's EvidencePool.
func (n *nodeImpl) EvidencePool() *evidence.Pool {
return n.evidencePool
}
// EventBus returns the Node's EventBus.
func (n *nodeImpl) EventBus() *types.EventBus {
return n.eventBus
@ -1001,19 +945,9 @@ func (n *nodeImpl) GenesisDoc() *types.GenesisDoc {
return n.genesisDoc
}
// ProxyApp returns the Node's AppConns, representing its connections to the ABCI application.
func (n *nodeImpl) ProxyApp() proxy.AppConns {
return n.proxyApp
}
// Config returns the Node's config.
func (n *nodeImpl) Config() *cfg.Config {
return n.config
}
// EventSinks returns the Node's event indexing sinks.
func (n *nodeImpl) EventSinks() []indexer.EventSink {
return n.eventSinks
// RPCEnvironment makes sure RPC has all the objects it needs to operate.
func (n *nodeImpl) RPCEnvironment() *rpccore.Environment {
return n.rpcEnv
}
//------------------------------------------------------------------------------


+ 48
- 34
node/node_test.go View File

@ -513,36 +513,50 @@ func TestNodeSetEventSink(t *testing.T) {
config := cfg.ResetTestRoot("node_app_version_test")
defer os.RemoveAll(config.RootDir)
n := getTestNode(t, config, log.TestingLogger())
logger := log.TestingLogger()
setupTest := func(t *testing.T, conf *cfg.Config) []indexer.EventSink {
eventBus, err := createAndStartEventBus(logger)
require.NoError(t, err)
genDoc, err := types.GenesisDocFromFile(config.GenesisFile())
require.NoError(t, err)
indexService, eventSinks, err := createAndStartIndexerService(config,
cfg.DefaultDBProvider, eventBus, logger, genDoc.ChainID)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, indexService.Stop()) })
return eventSinks
}
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.KV, n.eventSinks[0].Type())
eventSinks := setupTest(t, config)
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.KV, eventSinks[0].Type())
config.TxIndex.Indexer = []string{"null"}
n = getTestNode(t, config, log.TestingLogger())
eventSinks = setupTest(t, config)
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.NULL, n.eventSinks[0].Type())
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.NULL, eventSinks[0].Type())
config.TxIndex.Indexer = []string{"null", "kv"}
n = getTestNode(t, config, log.TestingLogger())
eventSinks = setupTest(t, config)
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.NULL, n.eventSinks[0].Type())
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.NULL, eventSinks[0].Type())
config.TxIndex.Indexer = []string{"kvv"}
ns, err := newDefaultNode(config, log.TestingLogger())
ns, err := newDefaultNode(config, logger)
assert.Nil(t, ns)
assert.Equal(t, errors.New("unsupported event sink type"), err)
config.TxIndex.Indexer = []string{}
n = getTestNode(t, config, log.TestingLogger())
eventSinks = setupTest(t, config)
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.NULL, n.eventSinks[0].Type())
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.NULL, eventSinks[0].Type())
config.TxIndex.Indexer = []string{"psql"}
ns, err = newDefaultNode(config, log.TestingLogger())
ns, err = newDefaultNode(config, logger)
assert.Nil(t, ns)
assert.Equal(t, errors.New("the psql connection settings cannot be empty"), err)
@ -550,46 +564,46 @@ func TestNodeSetEventSink(t *testing.T) {
config.TxIndex.Indexer = []string{"psql"}
config.TxIndex.PsqlConn = psqlConn
n = getTestNode(t, config, log.TestingLogger())
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type())
n.OnStop()
eventSinks = setupTest(t, config)
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.PSQL, eventSinks[0].Type())
config.TxIndex.Indexer = []string{"psql", "kv"}
config.TxIndex.PsqlConn = psqlConn
n = getTestNode(t, config, log.TestingLogger())
assert.Equal(t, 2, len(n.eventSinks))
eventSinks = setupTest(t, config)
assert.Equal(t, 2, len(eventSinks))
// we use map to filter the duplicated sinks, so it's not guarantee the order when append sinks.
if n.eventSinks[0].Type() == indexer.KV {
assert.Equal(t, indexer.PSQL, n.eventSinks[1].Type())
if eventSinks[0].Type() == indexer.KV {
assert.Equal(t, indexer.PSQL, eventSinks[1].Type())
} else {
assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type())
assert.Equal(t, indexer.KV, n.eventSinks[1].Type())
assert.Equal(t, indexer.PSQL, eventSinks[0].Type())
assert.Equal(t, indexer.KV, eventSinks[1].Type())
}
n.OnStop()
config.TxIndex.Indexer = []string{"kv", "psql"}
config.TxIndex.PsqlConn = psqlConn
n = getTestNode(t, config, log.TestingLogger())
assert.Equal(t, 2, len(n.eventSinks))
if n.eventSinks[0].Type() == indexer.KV {
assert.Equal(t, indexer.PSQL, n.eventSinks[1].Type())
eventSinks = setupTest(t, config)
assert.Equal(t, 2, len(eventSinks))
if eventSinks[0].Type() == indexer.KV {
assert.Equal(t, indexer.PSQL, eventSinks[1].Type())
} else {
assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type())
assert.Equal(t, indexer.KV, n.eventSinks[1].Type())
assert.Equal(t, indexer.PSQL, eventSinks[0].Type())
assert.Equal(t, indexer.KV, eventSinks[1].Type())
}
n.OnStop()
var e = errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
config.TxIndex.Indexer = []string{"psql", "kv", "Kv"}
config.TxIndex.PsqlConn = psqlConn
_, err = newDefaultNode(config, log.TestingLogger())
_, err = newDefaultNode(config, logger)
require.Error(t, err)
assert.Equal(t, e, err)
config.TxIndex.Indexer = []string{"Psql", "kV", "kv", "pSql"}
config.TxIndex.PsqlConn = psqlConn
_, err = newDefaultNode(config, log.TestingLogger())
_, err = newDefaultNode(config, logger)
require.Error(t, err)
assert.Equal(t, e, err)
}


+ 1
- 1
node/setup.go View File

@ -700,7 +700,7 @@ func createPEXReactorV2(
logger log.Logger,
peerManager *p2p.PeerManager,
router *p2p.Router,
) (*pex.ReactorV2, error) {
) (service.Service, error) {
channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 128)
if err != nil {


+ 5
- 4
rpc/client/local/local.go View File

@ -2,6 +2,7 @@ package local
import (
"context"
"errors"
"fmt"
"time"
@ -46,15 +47,15 @@ type Local struct {
// NodeService describes the portion of the node interface that the
// local RPC client constructor needs to build a local client.
type NodeService interface {
ConfigureRPC() (*rpccore.Environment, error)
RPCEnvironment() *rpccore.Environment
EventBus() *types.EventBus
}
// New configures a client that calls the Node directly.
func New(node NodeService) (*Local, error) {
env, err := node.ConfigureRPC()
if err != nil {
return nil, err
env := node.RPCEnvironment()
if env == nil {
return nil, errors.New("rpc is nil")
}
return &Local{
EventBus: node.EventBus(),


Loading…
Cancel
Save