Browse Source

node: minor package cleanups (#7444)

pull/7449/head
Sam Kleinman 2 years ago
committed by GitHub
parent
commit
e3aaae570d
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 74 additions and 110 deletions
  1. +45
    -46
      node/node.go
  2. +4
    -2
      node/node_test.go
  3. +25
    -62
      node/setup.go

+ 45
- 46
node/node.go View File

@ -171,17 +171,17 @@ func makeNode(
nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID)
// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
proxyApp, err := createAndStartProxyAppConns(ctx, clientCreator, logger, nodeMetrics.proxy)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
proxyApp := proxy.NewAppConns(clientCreator, logger.With("module", "proxy"), nodeMetrics.proxy)
if err := proxyApp.Start(ctx); err != nil {
return nil, fmt.Errorf("error starting proxy app connections: %v", err)
}
// EventBus and IndexerService must be started before the handshake because
// we might need to index the txs of the replayed block as this might not have happened
// when the node stopped last time (i.e. the node stopped after it saved the block
// but before it indexed the txs, or, endblocker panicked)
eventBus, err := createAndStartEventBus(ctx, logger)
if err != nil {
eventBus := eventbus.NewDefault(logger.With("module", "events"))
if err := eventBus.Start(ctx); err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
@ -556,8 +556,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
n.rpcListeners = listeners
}
if n.config.Instrumentation.Prometheus &&
n.config.Instrumentation.PrometheusListenAddr != "" {
if n.config.Instrumentation.Prometheus && n.config.Instrumentation.PrometheusListenAddr != "" {
n.prometheusSrv = n.startPrometheusServer(ctx, n.config.Instrumentation.PrometheusListenAddr)
}
@ -623,50 +622,50 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
n.logger.Error("failed to emit the statesync start event", "err", err)
}
// FIXME: We shouldn't allow state sync to silently error out without
// bubbling up the error and gracefully shutting down the rest of the node
go func() {
n.logger.Info("starting state sync")
state, err := n.stateSyncReactor.Sync(ctx)
if err != nil {
n.logger.Error("state sync failed; shutting down this node", "err", err)
// stop the node
if err := n.Stop(); err != nil {
n.logger.Error("failed to shut down node", "err", err)
}
return
// RUN STATE SYNC NOW:
//
// TODO: Eventually this should run as part of some
// separate orchestrator
n.logger.Info("starting state sync")
ssState, err := n.stateSyncReactor.Sync(ctx)
if err != nil {
n.logger.Error("state sync failed; shutting down this node", "err", err)
// stop the node
if err := n.Stop(); err != nil {
n.logger.Error("failed to shut down node", "err", err)
}
return err
}
n.consensusReactor.SetStateSyncingMetrics(0)
if err := n.eventBus.PublishEventStateSyncStatus(ctx,
types.EventDataStateSyncStatus{
Complete: true,
Height: state.LastBlockHeight,
}); err != nil {
n.logger.Error("failed to emit the statesync start event", "err", err)
}
n.consensusReactor.SetStateSyncingMetrics(0)
// TODO: Some form of orchestrator is needed here between the state
// advancing reactors to be able to control which one of the three
// is running
// FIXME Very ugly to have these metrics bleed through here.
n.consensusReactor.SetBlockSyncingMetrics(1)
if err := bcR.SwitchToBlockSync(ctx, state); err != nil {
n.logger.Error("failed to switch to block sync", "err", err)
return
}
if err := n.eventBus.PublishEventStateSyncStatus(ctx,
types.EventDataStateSyncStatus{
Complete: true,
Height: ssState.LastBlockHeight,
}); err != nil {
n.logger.Error("failed to emit the statesync start event", "err", err)
return err
}
if err := n.eventBus.PublishEventBlockSyncStatus(ctx,
types.EventDataBlockSyncStatus{
Complete: false,
Height: state.LastBlockHeight,
}); err != nil {
// TODO: Some form of orchestrator is needed here between the state
// advancing reactors to be able to control which one of the three
// is running
// FIXME Very ugly to have these metrics bleed through here.
n.consensusReactor.SetBlockSyncingMetrics(1)
if err := bcR.SwitchToBlockSync(ctx, ssState); err != nil {
n.logger.Error("failed to switch to block sync", "err", err)
return err
}
n.logger.Error("failed to emit the block sync starting event", "err", err)
}
}()
if err := n.eventBus.PublishEventBlockSyncStatus(ctx,
types.EventDataBlockSyncStatus{
Complete: false,
Height: ssState.LastBlockHeight,
}); err != nil {
n.logger.Error("failed to emit the block sync starting event", "err", err)
return err
}
}
return nil


+ 4
- 2
node/node_test.go View File

@ -20,6 +20,7 @@ import (
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/evidence"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/proxy"
@ -562,8 +563,9 @@ func TestNodeSetEventSink(t *testing.T) {
logger := log.TestingLogger()
setupTest := func(t *testing.T, conf *config.Config) []indexer.EventSink {
eventBus, err := createAndStartEventBus(ctx, logger)
require.NoError(t, err)
eventBus := eventbus.NewDefault(logger.With("module", "events"))
require.NoError(t, eventBus.Start(ctx))
t.Cleanup(eventBus.Wait)
genDoc, err := types.GenesisDocFromFile(cfg.GenesisFile())
require.NoError(t, err)


+ 25
- 62
node/setup.go View File

@ -10,7 +10,6 @@ import (
dbm "github.com/tendermint/tm-db"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/internal/blocksync"
@ -93,29 +92,6 @@ func initDBs(
return blockStore, stateDB, makeCloser(closers), nil
}
func createAndStartProxyAppConns(
ctx context.Context,
clientCreator abciclient.Creator,
logger log.Logger,
metrics *proxy.Metrics,
) (proxy.AppConns, error) {
proxyApp := proxy.NewAppConns(clientCreator, logger.With("module", "proxy"), metrics)
if err := proxyApp.Start(ctx); err != nil {
return nil, fmt.Errorf("error starting proxy app connections: %v", err)
}
return proxyApp, nil
}
func createAndStartEventBus(ctx context.Context, logger log.Logger) (*eventbus.EventBus, error) {
eventBus := eventbus.NewDefault(logger.With("module", "events"))
if err := eventBus.Start(ctx); err != nil {
return nil, err
}
return eventBus, nil
}
func createAndStartIndexerService(
ctx context.Context,
cfg *config.Config,
@ -368,21 +344,6 @@ func createConsensusReactor(
return reactor, consensusState, nil
}
func createTransport(logger log.Logger, cfg *config.Config) *p2p.MConnTransport {
conf := conn.DefaultMConnConfig()
conf.FlushThrottle = cfg.P2P.FlushThrottleTimeout
conf.SendRate = cfg.P2P.SendRate
conf.RecvRate = cfg.P2P.RecvRate
conf.MaxPacketMsgPayloadSize = cfg.P2P.MaxPacketMsgPayloadSize
return p2p.NewMConnTransport(
logger, conf, []*p2p.ChannelDescriptor{},
p2p.MConnTransportOptions{
MaxAcceptedConnections: uint32(cfg.P2P.MaxConnections),
},
)
}
func createPeerManager(
cfg *config.Config,
dbProvider config.DBProvider,
@ -459,14 +420,25 @@ func createRouter(
nodeInfo types.NodeInfo,
nodeKey types.NodeKey,
peerManager *p2p.PeerManager,
conf *config.Config,
cfg *config.Config,
proxyApp proxy.AppConns,
) (*p2p.Router, error) {
p2pLogger := logger.With("module", "p2p")
transport := createTransport(p2pLogger, conf)
ep, err := p2p.NewEndpoint(nodeKey.ID.AddressString(conf.P2P.ListenAddress))
transportConf := conn.DefaultMConnConfig()
transportConf.FlushThrottle = cfg.P2P.FlushThrottleTimeout
transportConf.SendRate = cfg.P2P.SendRate
transportConf.RecvRate = cfg.P2P.RecvRate
transportConf.MaxPacketMsgPayloadSize = cfg.P2P.MaxPacketMsgPayloadSize
transport := p2p.NewMConnTransport(
p2pLogger, transportConf, []*p2p.ChannelDescriptor{},
p2p.MConnTransportOptions{
MaxAcceptedConnections: uint32(cfg.P2P.MaxConnections),
},
)
ep, err := p2p.NewEndpoint(nodeKey.ID.AddressString(cfg.P2P.ListenAddress))
if err != nil {
return nil, err
}
@ -480,7 +452,7 @@ func createRouter(
peerManager,
[]p2p.Transport{transport},
[]p2p.Endpoint{ep},
getRouterConfig(conf, proxyApp),
getRouterConfig(cfg, proxyApp),
)
}
@ -506,14 +478,13 @@ func makeNodeInfo(
genDoc *types.GenesisDoc,
state sm.State,
) (types.NodeInfo, error) {
txIndexerStatus := "off"
if indexer.IndexingEnabled(eventSinks) {
txIndexerStatus = "on"
}
bcChannel := byte(blocksync.BlockSyncChannel)
nodeInfo := types.NodeInfo{
ProtocolVersion: types.ProtocolVersion{
P2P: version.P2PProtocol, // global
@ -524,7 +495,7 @@ func makeNodeInfo(
Network: genDoc.ChainID,
Version: version.TMVersion,
Channels: []byte{
bcChannel,
byte(blocksync.BlockSyncChannel),
byte(consensus.StateChannel),
byte(consensus.DataChannel),
byte(consensus.VoteChannel),
@ -547,16 +518,12 @@ func makeNodeInfo(
nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
}
lAddr := cfg.P2P.ExternalAddress
if lAddr == "" {
lAddr = cfg.P2P.ListenAddress
nodeInfo.ListenAddr = cfg.P2P.ExternalAddress
if nodeInfo.ListenAddr == "" {
nodeInfo.ListenAddr = cfg.P2P.ListenAddress
}
nodeInfo.ListenAddr = lAddr
err := nodeInfo.Validate()
return nodeInfo, err
return nodeInfo, nodeInfo.Validate()
}
func makeSeedNodeInfo(
@ -586,14 +553,10 @@ func makeSeedNodeInfo(
nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
}
lAddr := cfg.P2P.ExternalAddress
if lAddr == "" {
lAddr = cfg.P2P.ListenAddress
nodeInfo.ListenAddr = cfg.P2P.ExternalAddress
if nodeInfo.ListenAddr == "" {
nodeInfo.ListenAddr = cfg.P2P.ListenAddress
}
nodeInfo.ListenAddr = lAddr
err := nodeInfo.Validate()
return nodeInfo, err
return nodeInfo, nodeInfo.Validate()
}

Loading…
Cancel
Save