
node: cleanup construction (#7191)

pull/7204/head
Sam Kleinman, 3 years ago, committed by GitHub
commit e2b626fc92
2 changed files with 42 additions and 66 deletions:
  1. node/node.go   +16 -46
  2. node/setup.go  +26 -20

node/node.go  (+16 -46)

@@ -151,7 +151,6 @@ func makeNode(cfg *config.Config,
 	state, err := loadStateFromDBOrGenesisDocProvider(stateStore, genDoc)
 	if err != nil {
 		return nil, combineCloseError(err, makeCloser(closers))
-
 	}
 
 	nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID)
@@ -160,7 +159,6 @@ func makeNode(cfg *config.Config,
 	proxyApp, err := createAndStartProxyAppConns(clientCreator, logger, nodeMetrics.proxy)
 	if err != nil {
 		return nil, combineCloseError(err, makeCloser(closers))
-
 	}
 
 	// EventBus and IndexerService must be started before the handshake because
@@ -170,7 +168,6 @@ func makeNode(cfg *config.Config,
 	eventBus, err := createAndStartEventBus(logger)
 	if err != nil {
 		return nil, combineCloseError(err, makeCloser(closers))
-
 	}
 
 	indexerService, eventSinks, err := createAndStartIndexerService(cfg, dbProvider, eventBus, logger, genDoc.ChainID)
@@ -224,11 +221,9 @@ func makeNode(cfg *config.Config,
 	// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
 	// and replays any blocks as necessary to sync tendermint with the app.
-	consensusLogger := logger.With("module", "consensus")
 	if !stateSync {
-		if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
+		if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, logger); err != nil {
 			return nil, combineCloseError(err, makeCloser(closers))
 		}
 
 		// Reload the state. It will have the Version.Consensus.App set by the
@@ -246,7 +241,7 @@ func makeNode(cfg *config.Config,
 	// app may modify the validator set, specifying ourself as the only validator.
 	blockSync := !onlyValidatorIsUs(state, pubKey)
 
-	logNodeStartupInfo(state, pubKey, logger, consensusLogger, cfg.Mode)
+	logNodeStartupInfo(state, pubKey, logger, cfg.Mode)
 
 	// TODO: Fetch and provide real options and do proper p2p bootstrapping.
 	// TODO: Use a persistent peer database.
@@ -277,7 +272,6 @@ func makeNode(cfg *config.Config,
 	)
 	if err != nil {
 		return nil, combineCloseError(err, makeCloser(closers))
-
 	}
 
 	evReactor, evPool, err := createEvidenceReactor(
@@ -285,7 +279,6 @@ func makeNode(cfg *config.Config,
 	)
 	if err != nil {
 		return nil, combineCloseError(err, makeCloser(closers))
-
 	}
 
 	// make block executor for consensus and blockchain reactors to execute blocks
@@ -302,7 +295,7 @@ func makeNode(cfg *config.Config,
 	csReactor, csState, err := createConsensusReactor(
 		cfg, state, blockExec, blockStore, mp, evPool,
 		privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus,
-		peerManager, router, consensusLogger,
+		peerManager, router, logger,
 	)
 	if err != nil {
 		return nil, combineCloseError(err, makeCloser(closers))
@@ -332,7 +325,6 @@ func makeNode(cfg *config.Config,
 	// FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy,
 	// we should clean this whole thing up. See:
 	// https://github.com/tendermint/tendermint/issues/4644
-	ssLogger := logger.With("module", "statesync")
 	ssChDesc := statesync.GetChannelDescriptors()
 	channels := make(map[p2p.ChannelID]*p2p.Channel, len(ssChDesc))
 	for idx := range ssChDesc {
@@ -345,50 +337,29 @@ func makeNode(cfg *config.Config,
 		channels[ch.ID] = ch
 	}
 
-	peerUpdates := peerManager.Subscribe()
	stateSyncReactor := statesync.NewReactor(
 		genDoc.ChainID,
 		genDoc.InitialHeight,
 		*cfg.StateSync,
-		ssLogger,
+		logger.With("module", "statesync"),
 		proxyApp.Snapshot(),
 		proxyApp.Query(),
 		channels[statesync.SnapshotChannel],
 		channels[statesync.ChunkChannel],
 		channels[statesync.LightBlockChannel],
 		channels[statesync.ParamsChannel],
-		peerUpdates,
+		peerManager.Subscribe(),
 		stateStore,
 		blockStore,
 		cfg.StateSync.TempDir,
 		nodeMetrics.statesync,
 	)
 
-	// Optionally, start the pex reactor
-	//
-	// TODO:
-	//
-	// We need to set Seeds and PersistentPeers on the switch,
-	// since it needs to be able to use these (and their DNS names)
-	// even if the PEX is off. We can include the DNS name in the NetAddress,
-	// but it would still be nice to have a clear list of the current "PersistentPeers"
-	// somewhere that we can return with net_info.
-	//
-	// If PEX is on, it should handle dialing the seeds. Otherwise the switch does it.
-	// Note we currently use the addrBook regardless at least for AddOurAddress
 	pexReactor, err := createPEXReactor(logger, peerManager, router)
 	if err != nil {
 		return nil, combineCloseError(err, makeCloser(closers))
-
 	}
 
-	if cfg.RPC.PprofListenAddress != "" {
-		go func() {
-			logger.Info("Starting pprof server", "laddr", cfg.RPC.PprofListenAddress)
-			logger.Error("pprof server error", "err", http.ListenAndServe(cfg.RPC.PprofListenAddress, nil))
-		}()
-	}
-
 	node := &nodeImpl{
 		config:     cfg,
 		genesisDoc: genDoc,
@@ -461,7 +432,6 @@ func makeSeedNode(cfg *config.Config,
 	state, err := sm.MakeGenesisState(genDoc)
 	if err != nil {
 		return nil, err
-
 	}
 
 	nodeInfo, err := makeSeedNodeInfo(cfg, nodeKey, genDoc, state)
@@ -487,19 +457,9 @@ func makeSeedNode(cfg *config.Config,
 		closer)
 	}
 
-	var pexReactor service.Service
-
-	pexReactor, err = createPEXReactor(logger, peerManager, router)
+	pexReactor, err := createPEXReactor(logger, peerManager, router)
 	if err != nil {
 		return nil, combineCloseError(err, closer)
-
-	}
-
-	if cfg.RPC.PprofListenAddress != "" {
-		go func() {
-			logger.Info("Starting pprof server", "laddr", cfg.RPC.PprofListenAddress)
-			logger.Error("pprof server error", "err", http.ListenAndServe(cfg.RPC.PprofListenAddress, nil))
-		}()
 	}
 
 	node := &nodeImpl{
@@ -522,6 +482,16 @@
 
 // OnStart starts the Node. It implements service.Service.
 func (n *nodeImpl) OnStart() error {
+	if n.config.RPC.PprofListenAddress != "" {
+		// this service is not cleaned up (I believe that we'd
+		// need to have another thread and a potentially a
+		// context to get this functionality.)
+		go func() {
+			n.Logger.Info("Starting pprof server", "laddr", n.config.RPC.PprofListenAddress)
+			n.Logger.Error("pprof server error", "err", http.ListenAndServe(n.config.RPC.PprofListenAddress, nil))
+		}()
+	}
+
 	now := tmtime.Now()
 	genTime := n.genesisDoc.GenesisTime
 	if genTime.After(now) {
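
Note (not part of this commit): the comment added to OnStart above observes that the pprof server is never cleaned up and would need its own goroutine and a context to shut it down. Below is a minimal sketch of that idea using only the standard library; the package name and the startPprofServer helper are hypothetical, not code from this repository.

package pprofserver

import (
	"context"
	"net/http"
	_ "net/http/pprof" // registers the pprof handlers on http.DefaultServeMux
)

// startPprofServer serves the pprof endpoints on laddr until ctx is cancelled,
// then shuts the server down gracefully.
func startPprofServer(ctx context.Context, laddr string) *http.Server {
	srv := &http.Server{Addr: laddr, Handler: http.DefaultServeMux}
	go func() {
		<-ctx.Done()
		_ = srv.Shutdown(context.Background())
	}()
	go func() {
		// ListenAndServe returns http.ErrServerClosed once Shutdown has been called.
		_ = srv.ListenAndServe()
	}()
	return srv
}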


node/setup.go  (+26 -20)

@@ -135,18 +135,20 @@ func doHandshake(
 	genDoc *types.GenesisDoc,
 	eventBus types.BlockEventPublisher,
 	proxyApp proxy.AppConns,
-	consensusLogger log.Logger) error {
+	logger log.Logger,
+) error {
 	handshaker := consensus.NewHandshaker(stateStore, state, blockStore, genDoc)
-	handshaker.SetLogger(consensusLogger)
+	handshaker.SetLogger(logger.With("module", "handshaker"))
 	handshaker.SetEventBus(eventBus)
+
 	if err := handshaker.Handshake(proxyApp); err != nil {
 		return fmt.Errorf("error during handshake: %v", err)
 	}
 
 	return nil
 }
 
-func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger, mode string) {
+func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger log.Logger, mode string) {
 	// Log the version info.
 	logger.Info("Version info",
 		"tmVersion", version.TMVersion,
@@ -162,17 +164,23 @@ func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger, mode string) {
 			"state", state.Version.Consensus.Block,
 		)
 	}
-	switch {
-	case mode == config.ModeFull:
-		consensusLogger.Info("This node is a fullnode")
-	case mode == config.ModeValidator:
+
+	switch mode {
+	case config.ModeFull:
+		logger.Info("This node is a fullnode")
+	case config.ModeValidator:
 		addr := pubKey.Address()
 		// Log whether this node is a validator or an observer
 		if state.Validators.HasAddress(addr) {
-			consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey.Bytes())
+			logger.Info("This node is a validator",
+				"addr", addr,
+				"pubKey", pubKey.Bytes(),
+			)
 		} else {
-			consensusLogger.Info("This node is a validator (NOT in the active validator set)",
-				"addr", addr, "pubKey", pubKey.Bytes())
+			logger.Info("This node is a validator (NOT in the active validator set)",
+				"addr", addr,
+				"pubKey", pubKey.Bytes(),
+			)
 		}
 	}
 }
@@ -312,6 +320,7 @@ func createConsensusReactor(
 	router *p2p.Router,
 	logger log.Logger,
 ) (*consensus.Reactor, *consensus.State, error) {
+	logger = logger.With("module", "consensus")
 
 	consensusState := consensus.NewState(
 		cfg.Consensus,
@@ -339,8 +348,6 @@ func createConsensusReactor(
 		channels[ch.ID] = ch
 	}
 
-	peerUpdates := peerManager.Subscribe()
-
 	reactor := consensus.NewReactor(
 		logger,
 		consensusState,
@@ -348,7 +355,7 @@ func createConsensusReactor(
 		channels[consensus.DataChannel],
 		channels[consensus.VoteChannel],
 		channels[consensus.VoteSetBitsChannel],
-		peerUpdates,
+		peerManager.Subscribe(),
 		waitSync,
 		consensus.ReactorMetrics(csMetrics),
 	)
@@ -381,6 +388,11 @@ func createPeerManager(
 	nodeID types.NodeID,
 ) (*p2p.PeerManager, closer, error) {
+	privatePeerIDs := make(map[types.NodeID]struct{})
+	for _, id := range tmstrings.SplitAndTrimEmpty(cfg.P2P.PrivatePeerIDs, ",", " ") {
+		privatePeerIDs[types.NodeID(id)] = struct{}{}
+	}
+
 	var maxConns uint16
 
 	switch {
@@ -390,11 +402,6 @@ func createPeerManager(
 		maxConns = 64
 	}
 
-	privatePeerIDs := make(map[types.NodeID]struct{})
-	for _, id := range tmstrings.SplitAndTrimEmpty(cfg.P2P.PrivatePeerIDs, ",", " ") {
-		privatePeerIDs[types.NodeID(id)] = struct{}{}
-	}
-
 	options := p2p.PeerManagerOptions{
 		MaxConnected:        maxConns,
 		MaxConnectedUpgrade: 4,
@@ -485,8 +492,7 @@ func createPEXReactor(
 		return nil, err
 	}
 
-	peerUpdates := peerManager.Subscribe()
-	return pex.NewReactor(logger, peerManager, channel, peerUpdates), nil
+	return pex.NewReactor(logger, peerManager, channel, peerManager.Subscribe()), nil
 }
 
 func makeNodeInfo(
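
Note (not part of this commit): several hunks above replace the pre-scoped loggers that makeNode built up front (consensusLogger, ssLogger) with logger.With("module", ...) calls made inside the constructors themselves, so callers only ever pass the root logger. Below is a minimal sketch of that pattern against tendermint's libs/log.Logger interface; Thing and createThing are illustrative names only, not code from this repository.

package setupsketch

import "github.com/tendermint/tendermint/libs/log"

// Thing stands in for a reactor or service constructed in setup.go.
type Thing struct {
	logger log.Logger
}

// createThing derives its own module-scoped logger from the root logger it is
// handed, mirroring the change to createConsensusReactor above.
func createThing(logger log.Logger) *Thing {
	logger = logger.With("module", "thing")
	logger.Info("constructing")
	return &Thing{logger: logger}
}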

