diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index a67b50874..841deb849 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -10,6 +10,7 @@ import ( "time" "github.com/tendermint/tendermint/internal/consensus" + "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/p2p" sm "github.com/tendermint/tendermint/internal/state" "github.com/tendermint/tendermint/internal/store" @@ -96,7 +97,8 @@ type Reactor struct { // stopping the p2p Channel(s). poolWG sync.WaitGroup - metrics *consensus.Metrics + metrics *consensus.Metrics + eventBus *eventbus.EventBus syncStartTime time.Time } @@ -113,6 +115,7 @@ func NewReactor( peerUpdates *p2p.PeerUpdates, blockSync bool, metrics *consensus.Metrics, + eventBus *eventbus.EventBus, ) (*Reactor, error) { if state.LastBlockHeight != store.Height() { @@ -146,6 +149,7 @@ func NewReactor( blockSyncOutBridgeCh: make(chan p2p.Envelope), peerUpdates: peerUpdates, metrics: metrics, + eventBus: eventBus, syncStartTime: time.Time{}, } @@ -638,6 +642,13 @@ func (r *Reactor) GetRemainingSyncTime() time.Duration { return time.Duration(int64(remain * float64(time.Second))) } +func (r *Reactor) PublishStatus(ctx context.Context, event types.EventDataBlockSyncStatus) error { + if r.eventBus == nil { + return errors.New("event bus is not configured") + } + return r.eventBus.PublishEventBlockSyncStatus(ctx, event) +} + // atomicBool is an atomic Boolean, safe for concurrent use by multiple // goroutines. type atomicBool int32 diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go index db2d0bdf0..73d5a3bc7 100644 --- a/internal/blocksync/reactor_test.go +++ b/internal/blocksync/reactor_test.go @@ -181,7 +181,9 @@ func (rts *reactorTestSuite) addNode( chCreator, rts.peerUpdates[nodeID], rts.blockSync, - consensus.NopMetrics()) + consensus.NopMetrics(), + nil, // eventbus, can be nil + ) require.NoError(t, err) require.NoError(t, rts.reactors[nodeID].Start(ctx)) diff --git a/internal/inspect/rpc/rpc.go b/internal/inspect/rpc/rpc.go index 5c0d1a7e9..ad69f2c80 100644 --- a/internal/inspect/rpc/rpc.go +++ b/internal/inspect/rpc/rpc.go @@ -8,14 +8,12 @@ import ( "github.com/rs/cors" "github.com/tendermint/tendermint/config" - "github.com/tendermint/tendermint/internal/consensus" "github.com/tendermint/tendermint/internal/pubsub" "github.com/tendermint/tendermint/internal/rpc/core" "github.com/tendermint/tendermint/internal/state" "github.com/tendermint/tendermint/internal/state/indexer" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/rpc/jsonrpc/server" - "github.com/tendermint/tendermint/types" ) // Server defines parameters for running an Inspector rpc server. @@ -33,12 +31,11 @@ type eventBusUnsubscriber interface { // Routes returns the set of routes used by the Inspector server. func Routes(cfg config.RPCConfig, s state.Store, bs state.BlockStore, es []indexer.EventSink, logger log.Logger) core.RoutesMap { env := &core.Environment{ - Config: cfg, - EventSinks: es, - StateStore: s, - BlockStore: bs, - ConsensusReactor: waitSyncCheckerImpl{}, - Logger: logger, + Config: cfg, + EventSinks: es, + StateStore: s, + BlockStore: bs, + Logger: logger, } return core.RoutesMap{ "blockchain": server.NewRPCFunc(env.BlockchainInfo, "minHeight,maxHeight"), @@ -93,16 +90,6 @@ func addCORSHandler(rpcConfig *config.RPCConfig, h http.Handler) http.Handler { return h } -type waitSyncCheckerImpl struct{} - -func (waitSyncCheckerImpl) WaitSync() bool { - return false -} - -func (waitSyncCheckerImpl) GetPeerState(peerID types.NodeID) (*consensus.PeerState, bool) { - return nil, false -} - // ListenAndServe listens on the address specified in srv.Addr and handles any // incoming requests over HTTP using the Inspector rpc handler specified on the server. func (srv *Server) ListenAndServe(ctx context.Context) error { diff --git a/internal/rpc/core/consensus.go b/internal/rpc/core/consensus.go index bc3a23ec8..6acdcc333 100644 --- a/internal/rpc/core/consensus.go +++ b/internal/rpc/core/consensus.go @@ -101,9 +101,7 @@ func (env *Environment) GetConsensusState(ctx context.Context) (*coretypes.Resul // ConsensusParams gets the consensus parameters at the given block height. // If no height is provided, it will fetch the latest consensus params. // More: https://docs.tendermint.com/master/rpc/#/Info/consensus_params -func (env *Environment) ConsensusParams( - ctx context.Context, - heightPtr *int64) (*coretypes.ResultConsensusParams, error) { +func (env *Environment) ConsensusParams(ctx context.Context, heightPtr *int64) (*coretypes.ResultConsensusParams, error) { // The latest consensus params that we know is the consensus params after the // last block. diff --git a/internal/rpc/core/env.go b/internal/rpc/core/env.go index 9adeeee71..6dfbb1478 100644 --- a/internal/rpc/core/env.go +++ b/internal/rpc/core/env.go @@ -7,6 +7,7 @@ import ( "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto" + "github.com/tendermint/tendermint/internal/blocksync" "github.com/tendermint/tendermint/internal/consensus" "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/mempool" @@ -52,11 +53,6 @@ type transport interface { NodeInfo() types.NodeInfo } -type consensusReactor interface { - WaitSync() bool - GetPeerState(peerID types.NodeID) (*consensus.PeerState, bool) -} - type peerManager interface { Peers() []types.NodeID Addresses(types.NodeID) []p2p.NodeAddress @@ -75,7 +71,8 @@ type Environment struct { BlockStore sm.BlockStore EvidencePool sm.EvidencePool ConsensusState consensusState - ConsensusReactor consensusReactor + ConsensusReactor *consensus.Reactor + BlockSyncReactor *blocksync.Reactor // Legacy p2p stack P2PTransport transport @@ -89,7 +86,6 @@ type Environment struct { EventSinks []indexer.EventSink EventBus *eventbus.EventBus // thread safe Mempool mempool.Mempool - BlockSyncReactor consensus.BlockSyncReactor StateSyncMetricer statesync.Metricer Logger log.Logger @@ -199,9 +195,13 @@ func (env *Environment) getHeight(latestHeight int64, heightPtr *int64) (int64, } func (env *Environment) latestUncommittedHeight() int64 { - nodeIsSyncing := env.ConsensusReactor.WaitSync() - if nodeIsSyncing { - return env.BlockStore.Height() + if env.ConsensusReactor != nil { + // consensus reactor can be nil in inspect mode. + + nodeIsSyncing := env.ConsensusReactor.WaitSync() + if nodeIsSyncing { + return env.BlockStore.Height() + } } return env.BlockStore.Height() + 1 } diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 25b30d9ad..78c9d8360 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -13,6 +13,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/proxy" sm "github.com/tendermint/tendermint/internal/state" @@ -156,6 +157,7 @@ type Reactor struct { providers map[types.NodeID]*BlockProvider stateProvider StateProvider + eventBus *eventbus.EventBus metrics *Metrics backfillBlockTotal int64 backfilledBlocks int64 @@ -179,6 +181,7 @@ func NewReactor( blockStore *store.BlockStore, tempDir string, ssMetrics *Metrics, + eventBus *eventbus.EventBus, ) (*Reactor, error) { chDesc := getChannelDescriptors() @@ -219,6 +222,7 @@ func NewReactor( dispatcher: NewDispatcher(blockCh), providers: make(map[types.NodeID]*BlockProvider), metrics: ssMetrics, + eventBus: eventBus, } r.BaseService = *service.NewBaseService(logger, "StateSync", r) @@ -248,6 +252,14 @@ func (r *Reactor) OnStop() { r.dispatcher.Close() } +func (r *Reactor) PublishStatus(ctx context.Context, event types.EventDataStateSyncStatus) error { + if r.eventBus == nil { + return errors.New("event system is not configured") + } + + return r.eventBus.PublishEventStateSyncStatus(ctx, event) +} + // Sync runs a state sync, fetching snapshots and providing chunks to the // application. At the close of the operation, Sync will bootstrap the state // store and persist the commit at that height so that either consensus or diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index 4e81d53b0..92209b0be 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -178,6 +178,7 @@ func setup( rts.blockStore, "", m, + nil, // eventbus can be nil ) require.NoError(t, err) diff --git a/node/node.go b/node/node.go index 7429c07db..6d2ce40cc 100644 --- a/node/node.go +++ b/node/node.go @@ -63,23 +63,17 @@ type nodeImpl struct { isListening bool // services - eventBus *eventbus.EventBus // pub/sub for services eventSinks []indexer.EventSink stateStore sm.Store - blockStore *store.BlockStore // store the blockchain to disk - bcReactor service.Service // for block-syncing - mempoolReactor service.Service // for gossipping transactions - mempool mempool.Mempool + blockStore *store.BlockStore // store the blockchain to disk stateSync bool // whether the node should state sync on startup stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots - consensusReactor *consensus.Reactor // for participating in the consensus - pexReactor service.Service // for exchanging peer addresses - evidenceReactor service.Service - rpcListeners []net.Listener // rpc servers - shutdownOps closer - indexerService service.Service - rpcEnv *rpccore.Environment - prometheusSrv *http.Server + + services []service.Service + rpcListeners []net.Listener // rpc servers + shutdownOps closer + rpcEnv *rpccore.Environment + prometheusSrv *http.Server } // newDefaultNode returns a Tendermint node with default settings for the @@ -339,6 +333,7 @@ func makeNode( peerManager.Subscribe(ctx), blockSync && !stateSync, nodeMetrics.consensus, + eventBus, ) if err != nil { return nil, combineCloseError( @@ -372,6 +367,7 @@ func makeNode( blockStore, cfg.StateSync.TempDir, nodeMetrics.statesync, + eventBus, ) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) @@ -384,7 +380,6 @@ func makeNode( return nil, combineCloseError(err, makeCloser(closers)) } } - node := &nodeImpl{ config: cfg, logger: logger, @@ -396,19 +391,22 @@ func makeNode( nodeInfo: nodeInfo, nodeKey: nodeKey, + eventSinks: eventSinks, + + services: []service.Service{ + eventBus, + indexerService, + evReactor, + mpReactor, + csReactor, + bcReactor, + pexReactor, + }, + stateStore: stateStore, blockStore: blockStore, - bcReactor: bcReactor, - mempoolReactor: mpReactor, - mempool: mp, - consensusReactor: csReactor, stateSyncReactor: stateSyncReactor, stateSync: stateSync, - pexReactor: pexReactor, - evidenceReactor: evReactor, - indexerService: indexerService, - eventBus: eventBus, - eventSinks: eventSinks, shutdownOps: makeCloser(closers), @@ -494,40 +492,25 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { } n.isListening = true - if err := n.bcReactor.Start(ctx); err != nil { - return err - } + for _, reactor := range n.services { + if err := reactor.Start(ctx); err != nil { + if errors.Is(err, service.ErrAlreadyStarted) { + continue + } - if err := n.consensusReactor.Start(ctx); err != nil { - return err + return fmt.Errorf("problem starting service '%T': %w ", reactor, err) + } } if err := n.stateSyncReactor.Start(ctx); err != nil { return err } - if err := n.mempoolReactor.Start(ctx); err != nil { - return err - } - - if err := n.evidenceReactor.Start(ctx); err != nil { - return err - } - - if n.config.P2P.PexReactor { - if err := n.pexReactor.Start(ctx); err != nil { - return err - } - } - // Run state sync // TODO: We shouldn't run state sync if we already have state that has a // LastBlockHeight that is not InitialHeight if n.stateSync { - bcR, ok := n.bcReactor.(consensus.BlockSyncReactor) - if !ok { - return fmt.Errorf("this blockchain reactor does not support switching from state sync") - } + bcR := n.rpcEnv.BlockSyncReactor // we need to get the genesis state to get parameters such as state, err := sm.MakeGenesisState(n.genesisDoc) @@ -540,7 +523,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { // At the beginning of the statesync start, we use the initialHeight as the event height // because of the statesync doesn't have the concreate state height before fetched the snapshot. d := types.EventDataStateSyncStatus{Complete: false, Height: state.InitialHeight} - if err := n.eventBus.PublishEventStateSyncStatus(ctx, d); err != nil { + if err := n.stateSyncReactor.PublishStatus(ctx, d); err != nil { n.logger.Error("failed to emit the statesync start event", "err", err) } @@ -559,9 +542,9 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { return err } - n.consensusReactor.SetStateSyncingMetrics(0) + n.rpcEnv.ConsensusReactor.SetStateSyncingMetrics(0) - if err := n.eventBus.PublishEventStateSyncStatus(ctx, + if err := n.stateSyncReactor.PublishStatus(ctx, types.EventDataStateSyncStatus{ Complete: true, Height: ssState.LastBlockHeight, @@ -574,13 +557,13 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { // advancing reactors to be able to control which one of the three // is running // FIXME Very ugly to have these metrics bleed through here. - n.consensusReactor.SetBlockSyncingMetrics(1) + n.rpcEnv.ConsensusReactor.SetBlockSyncingMetrics(1) if err := bcR.SwitchToBlockSync(ctx, ssState); err != nil { n.logger.Error("failed to switch to block sync", "err", err) return err } - if err := n.eventBus.PublishEventBlockSyncStatus(ctx, + if err := bcR.PublishStatus(ctx, types.EventDataBlockSyncStatus{ Complete: false, Height: ssState.LastBlockHeight, @@ -597,25 +580,17 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { func (n *nodeImpl) OnStop() { n.logger.Info("Stopping Node") - if n.eventBus != nil { - n.eventBus.Wait() - } - if n.indexerService != nil { - n.indexerService.Wait() - } - for _, es := range n.eventSinks { if err := es.Stop(); err != nil { n.logger.Error("failed to stop event sink", "err", err) } } - n.bcReactor.Wait() - n.consensusReactor.Wait() + for _, reactor := range n.services { + reactor.Wait() + } + n.stateSyncReactor.Wait() - n.mempoolReactor.Wait() - n.evidenceReactor.Wait() - n.pexReactor.Wait() n.router.Wait() n.isListening = false @@ -693,7 +668,7 @@ func (n *nodeImpl) startRPC(ctx context.Context) ([]net.Listener, error) { wmLogger := rpcLogger.With("protocol", "websocket") wm := rpcserver.NewWebsocketManager(routes, rpcserver.OnDisconnect(func(remoteAddr string) { - err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr) + err := n.rpcEnv.EventBus.UnsubscribeAll(context.Background(), remoteAddr) if err != nil && err != tmpubsub.ErrSubscriptionNotFound { wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err) } @@ -788,19 +763,9 @@ func (n *nodeImpl) startPrometheusServer(ctx context.Context, addr string) *http return srv } -// ConsensusReactor returns the Node's ConsensusReactor. -func (n *nodeImpl) ConsensusReactor() *consensus.Reactor { - return n.consensusReactor -} - -// Mempool returns the Node's mempool. -func (n *nodeImpl) Mempool() mempool.Mempool { - return n.mempool -} - // EventBus returns the Node's EventBus. func (n *nodeImpl) EventBus() *eventbus.EventBus { - return n.eventBus + return n.rpcEnv.EventBus } // PrivValidator returns the Node's PrivValidator. diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index 03dbca7c7..cc62ffebe 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -20,6 +20,7 @@ import ( "github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/crypto/encoding" "github.com/tendermint/tendermint/internal/mempool" + rpccore "github.com/tendermint/tendermint/internal/rpc/core" tmjson "github.com/tendermint/tendermint/libs/json" "github.com/tendermint/tendermint/libs/log" tmmath "github.com/tendermint/tendermint/libs/math" @@ -576,10 +577,10 @@ func TestClientMethodCalls(t *testing.T) { func getMempool(t *testing.T, srv service.Service) mempool.Mempool { t.Helper() n, ok := srv.(interface { - Mempool() mempool.Mempool + RPCEnvironment() *rpccore.Environment }) require.True(t, ok) - return n.Mempool() + return n.RPCEnvironment().Mempool } // these cases are roughly the same as the TestClientMethodCalls, but