
node: alter initialization order to avoid panic during statesync

Branch: pull/8127/head
Author: tycho garen, 2 years ago
Committed by: M. J. Fromberger
Commit: 8e65f78141
5 changed files with 61 additions and 43 deletions
  1. internal/consensus/replay.go (+2, -2)
  2. internal/consensus/state.go (+0, -4)
  3. internal/rpc/core/status.go (+6, -3)
  4. internal/state/state.go (+4, -4)
  5. node/node.go (+49, -30)

internal/consensus/replay.go (+2, -2)

@@ -220,7 +220,6 @@ func NewHandshaker(
eventBus *eventbus.EventBus,
genDoc *types.GenesisDoc,
) *Handshaker {
return &Handshaker{
stateStore: stateStore,
initialState: state,
@@ -452,7 +451,8 @@ func (h *Handshaker) replayBlocks(
appClient abciclient.Client,
appBlockHeight,
storeBlockHeight int64,
mutateState bool) ([]byte, error) {
mutateState bool,
) ([]byte, error) {
// App is further behind than it should be, so we need to replay blocks.
// We replay all blocks from appBlockHeight+1.
//


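The replay.go hunks are cosmetic: a stray blank line goes away and the long replayBlocks signature gains a trailing comma so the closing parenthesis can sit on its own line, the usual Go layout for multi-line parameter lists. A minimal, hypothetical illustration of the before/after formatting:

```go
package example

// replayOld shows the pre-change layout: the closing paren of the
// parameter list is glued to the last parameter.
func replayOld(appBlockHeight,
	storeBlockHeight int64,
	mutateState bool) ([]byte, error) {
	return nil, nil
}

// replayNew shows the post-change layout: trailing comma, with the
// closing paren and results on their own line.
func replayNew(
	appBlockHeight,
	storeBlockHeight int64,
	mutateState bool,
) ([]byte, error) {
	return nil, nil
}
```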
internal/consensus/state.go (+0, -4)

@@ -223,10 +223,6 @@ func NewState(
cs.doPrevote = cs.defaultDoPrevote
cs.setProposal = cs.defaultSetProposal
if err := cs.updateStateFromStore(ctx); err != nil {
return nil, err
}
// NOTE: we do not call scheduleRound0 yet, we do that upon Start()
cs.BaseService = *service.NewBaseService(logger, "State", cs)
for _, option := range options {


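The consensus/state.go hunk only removes lines; the surviving context shows the shape NewState keeps: defaults are wired up, state is hydrated from the store with an error return rather than a panic, and round scheduling is deliberately deferred to Start() (per the NOTE). A hedged sketch of that constructor split, using hypothetical names:

```go
package example

import "context"

// stateStore is a stand-in for the store NewState reads from.
type stateStore interface {
	Load(ctx context.Context) ([]byte, error)
}

// consensusState sketches the "hydrate in the constructor, schedule on
// Start" split visible in the surviving context of NewState.
type consensusState struct {
	store stateStore
	state []byte
}

// newConsensusState fails fast with an error instead of panicking when
// the store cannot be read; round scheduling is deliberately left to
// Start(), as the NOTE in the original code says.
func newConsensusState(ctx context.Context, store stateStore) (*consensusState, error) {
	cs := &consensusState{store: store}
	st, err := store.Load(ctx)
	if err != nil {
		return nil, err
	}
	cs.state = st
	return cs, nil
}

// Start would kick off round scheduling; omitted in this sketch.
func (cs *consensusState) Start(ctx context.Context) error { return nil }
```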
internal/rpc/core/status.go (+6, -3)

@@ -77,14 +77,17 @@ func (env *Environment) Status(ctx context.Context) (*coretypes.ResultStatus, er
EarliestAppHash: earliestAppHash,
EarliestBlockHeight: earliestBlockHeight,
EarliestBlockTime: time.Unix(0, earliestBlockTimeNano),
MaxPeerBlockHeight: env.BlockSyncReactor.GetMaxPeerBlockHeight(),
CatchingUp: env.ConsensusReactor.WaitSync(),
TotalSyncedTime: env.BlockSyncReactor.GetTotalSyncedTime(),
RemainingTime: env.BlockSyncReactor.GetRemainingSyncTime(),
},
ValidatorInfo: validatorInfo,
}
if env.BlockSyncReactor != nil && env.BlockSyncReactor.IsRunning() {
result.SyncInfo.MaxPeerBlockHeight = env.BlockSyncReactor.GetMaxPeerBlockHeight()
result.SyncInfo.TotalSyncedTime = env.BlockSyncReactor.GetTotalSyncedTime()
result.SyncInfo.RemainingTime = env.BlockSyncReactor.GetRemainingSyncTime()
}
if env.StateSyncMetricer != nil {
result.SyncInfo.TotalSnapshots = env.StateSyncMetricer.TotalSnapshots()
result.SyncInfo.ChunkProcessAvgTime = env.StateSyncMetricer.ChunkProcessAvgTime()


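The status.go change moves the block-sync fields behind a guard: they are only populated when the reactor exists and is running, so a status query issued while the node is still state syncing no longer touches an unstarted reactor. A small sketch of the guard pattern with stand-in types:

```go
package example

import "time"

// syncInfo holds the subset of status fields touched by the change.
type syncInfo struct {
	MaxPeerBlockHeight int64
	TotalSyncedTime    time.Duration
	RemainingTime      time.Duration
}

// blockSync is a stand-in for the block-sync reactor interface.
type blockSync interface {
	IsRunning() bool
	GetMaxPeerBlockHeight() int64
	GetTotalSyncedTime() time.Duration
	GetRemainingSyncTime() time.Duration
}

// fillSyncInfo copies reactor metrics only when the reactor is present
// and running; otherwise the fields simply stay at their zero values
// instead of triggering a panic.
func fillSyncInfo(info *syncInfo, r blockSync) {
	if r == nil || !r.IsRunning() {
		return
	}
	info.MaxPeerBlockHeight = r.GetMaxPeerBlockHeight()
	info.TotalSyncedTime = r.GetTotalSyncedTime()
	info.RemainingTime = r.GetRemainingSyncTime()
}
```

Leaving the fields at their zero values mirrors the diff: the catch-up metrics are simply absent from the status response until block sync is actually running.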
internal/state/state.go (+4, -4)

@@ -218,7 +218,7 @@ func FromProto(pb *tmstate.State) (*State, error) {
bi, err := types.BlockIDFromProto(&pb.LastBlockID)
if err != nil {
return nil, err
return nil, fmt.Errorf("block id: %w", err)
}
state.LastBlockID = *bi
state.LastBlockHeight = pb.LastBlockHeight
@@ -226,20 +226,20 @@ func FromProto(pb *tmstate.State) (*State, error) {
vals, err := types.ValidatorSetFromProto(pb.Validators)
if err != nil {
return nil, err
return nil, fmt.Errorf("validators: %w", err)
}
state.Validators = vals
nVals, err := types.ValidatorSetFromProto(pb.NextValidators)
if err != nil {
return nil, err
return nil, fmt.Errorf("next validators: %w", err)
}
state.NextValidators = nVals
if state.LastBlockHeight >= 1 { // At Block 1 LastValidators is nil
lVals, err := types.ValidatorSetFromProto(pb.LastValidators)
if err != nil {
return nil, err
return nil, fmt.Errorf("previous validators: %w", err)
}
state.LastValidators = lVals
} else {


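The state.go hunks change nothing but error reporting: each FromProto failure is wrapped with the field that failed to convert, so a crash or shutdown during state sync names the offending piece of state instead of surfacing a bare error. A tiny, self-contained illustration of %w wrapping and why the chain stays inspectable:

```go
package main

import (
	"errors"
	"fmt"
)

var errBadValidatorSet = errors.New("empty validator set")

// fromProto mimics the wrapping added in FromProto: the underlying
// error is preserved with %w while the message names the field.
func fromProto(valid bool) error {
	if !valid {
		return fmt.Errorf("validators: %w", errBadValidatorSet)
	}
	return nil
}

func main() {
	err := fromProto(false)
	fmt.Println(err)                                // validators: empty validator set
	fmt.Println(errors.Is(err, errBadValidatorSet)) // true: %w keeps the chain intact
}
```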
node/node.go (+49, -30)

@@ -68,6 +68,7 @@ type nodeImpl struct {
stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots
services []service.Service
pexService service.Service
rpcListeners []net.Listener // rpc servers
shutdownOps closer
rpcEnv *rpccore.Environment
@@ -228,7 +229,7 @@ func makeNode(
logger.With("module", "handshaker"),
stateStore, state, blockStore, eventBus, genDoc,
).Handshake(ctx, proxyApp); err != nil {
return nil, combineCloseError(err, makeCloser(closers))
return nil, combineCloseError(fmt.Errorf("handshake: %w", err), makeCloser(closers))
}
// Reload the state. It will have the Version.Consensus.App set by the
@@ -236,9 +237,7 @@
// what happened during block replay).
state, err = stateStore.Load()
if err != nil {
return nil, combineCloseError(
fmt.Errorf("cannot load state: %w", err),
makeCloser(closers))
return nil, combineCloseError(fmt.Errorf("reload state: %w", err), makeCloser(closers))
}
}
@@ -248,7 +247,7 @@
// TODO: Use a persistent peer database.
nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
return nil, combineCloseError(fmt.Errorf("nodeinfo construction: %w", err), makeCloser(closers))
}
peerManager, peerCloser, err := createPeerManager(cfg, dbProvider, nodeKey.ID)
@@ -271,7 +270,7 @@
cfg, proxyApp, stateStore, nodeMetrics.mempool, peerManager, router, logger,
)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
return nil, combineCloseError(fmt.Errorf("mempool construction: %w", err), makeCloser(closers))
}
evReactor, evPool, edbCloser, err := createEvidenceReactor(ctx,
@@ -279,7 +278,7 @@
)
closers = append(closers, edbCloser)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
return nil, combineCloseError(fmt.Errorf("evidence construction: %w", err), makeCloser(closers))
}
// make block executor for consensus and blockchain reactors to execute blocks
@@ -304,7 +303,7 @@
peerManager, router, logger,
)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
return nil, combineCloseError(fmt.Errorf("consensus construction: %w", err), makeCloser(closers))
}
// Create the blockchain reactor. Note, we do not start block sync if we're
@@ -355,14 +354,14 @@ func makeNode(
eventBus,
)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
return nil, combineCloseError(fmt.Errorf("statesync construction: %w", err), makeCloser(closers))
}
var pexReactor service.Service = service.NopService{}
if cfg.P2P.PexReactor {
pexReactor, err = pex.NewReactor(ctx, logger, peerManager, router.OpenChannel, peerManager.Subscribe(ctx))
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
return nil, combineCloseError(fmt.Errorf("pex construction: %w", err), makeCloser(closers))
}
}
node := &nodeImpl{
@@ -379,13 +378,19 @@ func makeNode(
eventSinks: eventSinks,
services: []service.Service{
// the order of these services controls
// initialization order. reorder with care.
eventBus,
evReactor,
mpReactor,
csReactor,
bcReactor,
pexReactor,
bcReactor,
csState,
csReactor,
mpReactor,
evReactor,
},
// separate so we can start
// before statesync
pexService: pexReactor,
stateStore: stateStore,
blockStore: blockStore,
@@ -466,12 +471,8 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
}
}
state, err := n.stateStore.Load()
if err != nil {
return err
}
if err := n.evPool.Start(state); err != nil {
return err
if n.config.Instrumentation.Prometheus && n.config.Instrumentation.PrometheusListenAddr != "" {
n.prometheusSrv = n.startPrometheusServer(ctx, n.config.Instrumentation.PrometheusListenAddr)
}
n.rpcEnv.NodeInfo = n.nodeInfo
@@ -485,25 +486,21 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
}
}
if n.config.Instrumentation.Prometheus && n.config.Instrumentation.PrometheusListenAddr != "" {
n.prometheusSrv = n.startPrometheusServer(ctx, n.config.Instrumentation.PrometheusListenAddr)
}
// Start the transport.
if err := n.router.Start(ctx); err != nil {
return err
}
n.rpcEnv.IsListening = true
for _, reactor := range n.services {
if err := reactor.Start(ctx); err != nil {
return fmt.Errorf("problem starting service '%T': %w ", reactor, err)
}
}
if err := n.stateSyncReactor.Start(ctx); err != nil {
return err
}
if err := n.rpcEnv.EventBus.Start(ctx); err != nil {
return err
}
if err := n.pexService.Start(ctx); err != nil {
return err
}
// Run state sync
// TODO: We shouldn't run state sync if we already have state that has a
@@ -550,6 +547,11 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
return err
}
if err := bcR.Start(ctx); err != nil {
n.logger.Error("failed to start block reactor", "err", err)
return err
}
// TODO: Some form of orchestrator is needed here between the state
// advancing reactors to be able to control which one of the three
// is running
@@ -570,6 +572,23 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
}
}
for _, reactor := range n.services {
if reactor.IsRunning() {
continue
}
if err := reactor.Start(ctx); err != nil {
return fmt.Errorf("problem starting service '%T': %w ", reactor, err)
}
}
state, err := n.stateStore.Load()
if err != nil {
return fmt.Errorf("loading state during service start: %w", err)
}
if err := n.evPool.Start(state); err != nil {
return err
}
return nil
}
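The node.go changes carry the actual fix: handshake and construction errors are wrapped with the step that failed, the services slice is rewritten so it reads as the real initialization order (with consensus state before the consensus reactor), the event bus, state-sync reactor, and PEX service are started before state sync runs, the later loop skips anything already running, and the evidence pool is only started at the end of OnStart once state can be loaded. A hedged sketch of that ordered, idempotent start loop, with hypothetical types:

```go
package example

import (
	"context"
	"fmt"
)

// Service is the minimal contract the start loop needs.
type Service interface {
	Start(ctx context.Context) error
	IsRunning() bool
}

// startServices walks the slice in order and starts whatever is not
// already running. Because slice order is initialization order,
// reordering the slice is a behavioral change, not a cosmetic one.
func startServices(ctx context.Context, services []Service) error {
	for _, svc := range services {
		if svc.IsRunning() {
			// Started earlier (e.g. on the state-sync path); leave it alone.
			continue
		}
		if err := svc.Start(ctx); err != nil {
			return fmt.Errorf("problem starting service '%T': %w", svc, err)
		}
	}
	return nil
}
```

The %T verb in the error prints the concrete service type, which is usually enough to tell which reactor failed to come up.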

