diff --git a/internal/consensus/replay.go b/internal/consensus/replay.go
index 5d097df21..f117eb381 100644
--- a/internal/consensus/replay.go
+++ b/internal/consensus/replay.go
@@ -220,7 +220,6 @@ func NewHandshaker(
 	eventBus *eventbus.EventBus,
 	genDoc *types.GenesisDoc,
 ) *Handshaker {
-
 	return &Handshaker{
 		stateStore:   stateStore,
 		initialState: state,
@@ -452,7 +451,8 @@ func (h *Handshaker) replayBlocks(
 	appClient abciclient.Client,
 	appBlockHeight,
 	storeBlockHeight int64,
-	mutateState bool) ([]byte, error) {
+	mutateState bool,
+) ([]byte, error) {
 	// App is further behind than it should be, so we need to replay blocks.
 	// We replay all blocks from appBlockHeight+1.
 	//
diff --git a/internal/consensus/state.go b/internal/consensus/state.go
index bd79f4f83..e4a6521a2 100644
--- a/internal/consensus/state.go
+++ b/internal/consensus/state.go
@@ -223,10 +223,6 @@ func NewState(
 	cs.doPrevote = cs.defaultDoPrevote
 	cs.setProposal = cs.defaultSetProposal
 
-	if err := cs.updateStateFromStore(ctx); err != nil {
-		return nil, err
-	}
-
 	// NOTE: we do not call scheduleRound0 yet, we do that upon Start()
 	cs.BaseService = *service.NewBaseService(logger, "State", cs)
 	for _, option := range options {
diff --git a/internal/rpc/core/status.go b/internal/rpc/core/status.go
index 46b8a6fcd..3b1499835 100644
--- a/internal/rpc/core/status.go
+++ b/internal/rpc/core/status.go
@@ -77,14 +77,17 @@ func (env *Environment) Status(ctx context.Context) (*coretypes.ResultStatus, er
 			EarliestAppHash:     earliestAppHash,
 			EarliestBlockHeight: earliestBlockHeight,
 			EarliestBlockTime:   time.Unix(0, earliestBlockTimeNano),
-			MaxPeerBlockHeight:  env.BlockSyncReactor.GetMaxPeerBlockHeight(),
 			CatchingUp:          env.ConsensusReactor.WaitSync(),
-			TotalSyncedTime:     env.BlockSyncReactor.GetTotalSyncedTime(),
-			RemainingTime:       env.BlockSyncReactor.GetRemainingSyncTime(),
 		},
 		ValidatorInfo: validatorInfo,
 	}
 
+	if env.BlockSyncReactor != nil && env.BlockSyncReactor.IsRunning() {
+		result.SyncInfo.MaxPeerBlockHeight = env.BlockSyncReactor.GetMaxPeerBlockHeight()
+		result.SyncInfo.TotalSyncedTime = env.BlockSyncReactor.GetTotalSyncedTime()
+		result.SyncInfo.RemainingTime = env.BlockSyncReactor.GetRemainingSyncTime()
+	}
+
 	if env.StateSyncMetricer != nil {
 		result.SyncInfo.TotalSnapshots = env.StateSyncMetricer.TotalSnapshots()
 		result.SyncInfo.ChunkProcessAvgTime = env.StateSyncMetricer.ChunkProcessAvgTime()
diff --git a/internal/state/state.go b/internal/state/state.go
index a31d8baad..cdc1f9dde 100644
--- a/internal/state/state.go
+++ b/internal/state/state.go
@@ -218,7 +218,7 @@ func FromProto(pb *tmstate.State) (*State, error) {
 
 	bi, err := types.BlockIDFromProto(&pb.LastBlockID)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("block id: %w", err)
 	}
 	state.LastBlockID = *bi
 	state.LastBlockHeight = pb.LastBlockHeight
@@ -226,20 +226,20 @@ func FromProto(pb *tmstate.State) (*State, error) {
 
 	vals, err := types.ValidatorSetFromProto(pb.Validators)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("validators: %w", err)
 	}
 	state.Validators = vals
 
 	nVals, err := types.ValidatorSetFromProto(pb.NextValidators)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("next validators: %w", err)
 	}
 	state.NextValidators = nVals
 
 	if state.LastBlockHeight >= 1 { // At Block 1 LastValidators is nil
 		lVals, err := types.ValidatorSetFromProto(pb.LastValidators)
 		if err != nil {
-			return nil, err
+			return nil, fmt.Errorf("previous validators: %w", err)
 		}
 		state.LastValidators = lVals
 	} else {
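A note on the FromProto hunks above: each decode failure now names the field that produced it while wrapping the cause with %w, so callers can still match the underlying error with errors.Is or errors.As. A minimal standalone sketch of the pattern (errBadValidator and decodeValidators are illustrative names, not identifiers from this codebase):

```go
package main

import (
	"errors"
	"fmt"
)

// errBadValidator is an illustrative sentinel, standing in for
// whatever error types.ValidatorSetFromProto might return.
var errBadValidator = errors.New("invalid validator set")

// decodeValidators (hypothetical) fails the way FromProto now does:
// the field name is prefixed and the cause is wrapped with %w.
func decodeValidators() error {
	return fmt.Errorf("validators: %w", errBadValidator)
}

func main() {
	err := decodeValidators()
	fmt.Println(err) // validators: invalid validator set

	// Because %w (not %v) was used, callers can still match the cause.
	fmt.Println(errors.Is(err, errBadValidator)) // true
}
```

Had %v been used instead of %w, the prefix would survive but the errors.Is match would fail.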
diff --git a/node/node.go b/node/node.go
index c2acfa7a8..bc386de6d 100644
--- a/node/node.go
+++ b/node/node.go
@@ -68,6 +68,7 @@ type nodeImpl struct {
 	stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots
 
 	services     []service.Service
+	pexService   service.Service
 	rpcListeners []net.Listener // rpc servers
 	shutdownOps  closer
 	rpcEnv       *rpccore.Environment
@@ -228,7 +229,7 @@ func makeNode(
 			logger.With("module", "handshaker"),
 			stateStore, state, blockStore, eventBus, genDoc,
 		).Handshake(ctx, proxyApp); err != nil {
-			return nil, combineCloseError(err, makeCloser(closers))
+			return nil, combineCloseError(fmt.Errorf("handshake: %w", err), makeCloser(closers))
 		}
 
 		// Reload the state. It will have the Version.Consensus.App set by the
@@ -236,9 +237,7 @@
 		// what happened during block replay).
 		state, err = stateStore.Load()
 		if err != nil {
-			return nil, combineCloseError(
-				fmt.Errorf("cannot load state: %w", err),
-				makeCloser(closers))
+			return nil, combineCloseError(fmt.Errorf("reload state: %w", err), makeCloser(closers))
 		}
 	}
 
@@ -248,7 +247,7 @@
 	// TODO: Use a persistent peer database.
 	nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state)
 	if err != nil {
-		return nil, combineCloseError(err, makeCloser(closers))
+		return nil, combineCloseError(fmt.Errorf("nodeinfo construction: %w", err), makeCloser(closers))
 	}
 
 	peerManager, peerCloser, err := createPeerManager(cfg, dbProvider, nodeKey.ID)
@@ -271,7 +270,7 @@
 		cfg, proxyApp, stateStore, nodeMetrics.mempool, peerManager, router, logger,
 	)
 	if err != nil {
-		return nil, combineCloseError(err, makeCloser(closers))
+		return nil, combineCloseError(fmt.Errorf("mempool construction: %w", err), makeCloser(closers))
 	}
 
 	evReactor, evPool, edbCloser, err := createEvidenceReactor(ctx,
@@ -279,7 +278,7 @@
 	)
 	closers = append(closers, edbCloser)
 	if err != nil {
-		return nil, combineCloseError(err, makeCloser(closers))
+		return nil, combineCloseError(fmt.Errorf("evidence construction: %w", err), makeCloser(closers))
 	}
 
 	// make block executor for consensus and blockchain reactors to execute blocks
@@ -304,7 +303,7 @@
 		peerManager, router, logger,
 	)
 	if err != nil {
-		return nil, combineCloseError(err, makeCloser(closers))
+		return nil, combineCloseError(fmt.Errorf("consensus construction: %w", err), makeCloser(closers))
 	}
 
 	// Create the blockchain reactor. Note, we do not start block sync if we're
@@ -355,14 +354,14 @@
 		eventBus,
 	)
 	if err != nil {
-		return nil, combineCloseError(err, makeCloser(closers))
+		return nil, combineCloseError(fmt.Errorf("statesync construction: %w", err), makeCloser(closers))
 	}
 
 	var pexReactor service.Service = service.NopService{}
 	if cfg.P2P.PexReactor {
 		pexReactor, err = pex.NewReactor(ctx, logger, peerManager, router.OpenChannel, peerManager.Subscribe(ctx))
 		if err != nil {
-			return nil, combineCloseError(err, makeCloser(closers))
+			return nil, combineCloseError(fmt.Errorf("pex construction: %w", err), makeCloser(closers))
 		}
 	}
 	node := &nodeImpl{
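The makeNode error paths above all funnel through combineCloseError, so a failed constructor still releases whatever resources were already opened. The definitions of combineCloseError and makeCloser are not part of this diff; the following is only a sketch of the shape such a helper could take, under that assumption, not the actual implementation:

```go
package main

import (
	"errors"
	"fmt"
)

// closer mirrors the accumulated-shutdown pattern; the real makeCloser
// in node.go presumably folds many of these into one (assumption).
type closer func() error

// combineCloseError (sketch): return the construction error, and if
// running the closers also fails, surface that failure alongside it.
func combineCloseError(err error, closeAll closer) error {
	cerr := closeAll()
	if cerr == nil {
		return err
	}
	if err == nil {
		return cerr
	}
	return fmt.Errorf("%w; additionally, closing resources failed: %v", err, cerr)
}

func main() {
	constructErr := errors.New("mempool construction: unknown mempool version")
	closeAll := func() error { return errors.New("evidence db: already closed") }
	fmt.Println(combineCloseError(constructErr, closeAll))
}
```

Wrapping the construction error (rather than the close error) keeps errors.Is/errors.As working against the failure the caller actually cares about.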
@@ -379,13 +378,19 @@
 		eventSinks: eventSinks,
 		services: []service.Service{
+			// the order of these services controls
+			// initialization order. reorder with care.
 			eventBus,
-			evReactor,
-			mpReactor,
-			csReactor,
-			bcReactor,
 			pexReactor,
+			bcReactor,
+			csState,
+			csReactor,
+			mpReactor,
+			evReactor,
 		},
+		// kept separate so we can start it
+		// before statesync
+		pexService: pexReactor,
 
 		stateStore: stateStore,
 		blockStore: blockStore,
 
@@ -466,12 +471,8 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
 		}
 	}
 
-	state, err := n.stateStore.Load()
-	if err != nil {
-		return err
-	}
-	if err := n.evPool.Start(state); err != nil {
-		return err
+	if n.config.Instrumentation.Prometheus && n.config.Instrumentation.PrometheusListenAddr != "" {
+		n.prometheusSrv = n.startPrometheusServer(ctx, n.config.Instrumentation.PrometheusListenAddr)
 	}
 
 	n.rpcEnv.NodeInfo = n.nodeInfo
@@ -485,25 +486,21 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
 		}
 	}
 
-	if n.config.Instrumentation.Prometheus && n.config.Instrumentation.PrometheusListenAddr != "" {
-		n.prometheusSrv = n.startPrometheusServer(ctx, n.config.Instrumentation.PrometheusListenAddr)
-	}
-
 	// Start the transport.
 	if err := n.router.Start(ctx); err != nil {
 		return err
 	}
 	n.rpcEnv.IsListening = true
 
-	for _, reactor := range n.services {
-		if err := reactor.Start(ctx); err != nil {
-			return fmt.Errorf("problem starting service '%T': %w ", reactor, err)
-		}
-	}
-
 	if err := n.stateSyncReactor.Start(ctx); err != nil {
 		return err
 	}
 
+	if err := n.rpcEnv.EventBus.Start(ctx); err != nil {
+		return err
+	}
+	if err := n.pexService.Start(ctx); err != nil {
+		return err
+	}
 	// Run state sync
 	// TODO: We shouldn't run state sync if we already have state that has a
@@ -550,6 +547,11 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
 			return err
 		}
 
+		if err := bcR.Start(ctx); err != nil {
+			n.logger.Error("failed to start block reactor", "err", err)
+			return err
+		}
+
 		// TODO: Some form of orchestrator is needed here between the state
 		// advancing reactors to be able to control which one of the three
 		// is running
@@ -570,6 +572,23 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
 		}
 	}
 
+	for _, reactor := range n.services {
+		if reactor.IsRunning() {
+			continue
+		}
+		if err := reactor.Start(ctx); err != nil {
+			return fmt.Errorf("problem starting service '%T': %w", reactor, err)
+		}
+	}
+
+	state, err := n.stateStore.Load()
+	if err != nil {
+		return fmt.Errorf("loading state during service start: %w", err)
+	}
+	if err := n.evPool.Start(state); err != nil {
+		return err
+	}
+
 	return nil
 }
 
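With these changes, OnStart brings up the event bus, PEX, and the sync reactors by hand, then sweeps n.services in declaration order, skipping anything already running. A self-contained sketch of that start loop, using a trimmed-down Service interface in place of Tendermint's service.Service (the fakeService type and names are illustrative only):

```go
package main

import (
	"context"
	"fmt"
)

// Service is a stand-in for Tendermint's service.Service, keeping
// only the two methods the start loop below needs.
type Service interface {
	Start(ctx context.Context) error
	IsRunning() bool
}

// startAll starts services in slice order, so position in the slice
// is the initialization order. Services an earlier phase (e.g. state
// sync or block sync) already brought up are skipped, which makes the
// loop safe to run after those phases.
func startAll(ctx context.Context, services []Service) error {
	for _, svc := range services {
		if svc.IsRunning() {
			continue // idempotent: never double-start a service
		}
		if err := svc.Start(ctx); err != nil {
			return fmt.Errorf("problem starting service '%T': %w", svc, err)
		}
	}
	return nil
}

// fakeService simulates a reactor for the example below.
type fakeService struct {
	name    string
	running bool
}

func (f *fakeService) Start(ctx context.Context) error {
	f.running = true
	fmt.Println("started", f.name)
	return nil
}

func (f *fakeService) IsRunning() bool { return f.running }

func main() {
	pex := &fakeService{name: "pex", running: true} // started earlier, before state sync
	bc := &fakeService{name: "blocksync"}

	// Only blocksync is started here; pex is skipped as already running.
	if err := startAll(context.Background(), []Service{pex, bc}); err != nil {
		panic(err)
	}
}
```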