From e334555393562e1e356cfe118d7119591ca529f7 Mon Sep 17 00:00:00 2001 From: JayT106 Date: Mon, 10 May 2021 15:45:26 -0400 Subject: [PATCH] node/state: graceful shutdown in the consensus state (#6370) --- CHANGELOG_PENDING.md | 1 + consensus/reactor.go | 5 +++++ consensus/replay.go | 20 +++++++++++++++++--- consensus/state.go | 16 ++++++++++++++++ node/node.go | 1 - state/execution.go | 21 ++++++++++++++++++++- state/execution_test.go | 2 +- 7 files changed, 60 insertions(+), 6 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index c35b4339e..aa4c19129 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -92,6 +92,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi - [privval] \#6240 Add `context.Context` to privval interface. - [rpc] \#6265 set cache control in http-rpc response header (@JayT106) - [statesync] \#6378 Retry requests for snapshots and add a minimum discovery time (5s) for new snapshots. +- [node/state] \#6370 graceful shutdown in the consensus reactor (@JayT106) ### BUG FIXES diff --git a/consensus/reactor.go b/consensus/reactor.go index cf3f3e42d..de02a682a 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -196,6 +196,7 @@ func (r *Reactor) OnStart() error { // blocking until they all exit, as well as unsubscribing from events and stopping // state. func (r *Reactor) OnStop() { + r.unsubscribeFromBroadcastEvents() if err := r.state.Stop(); err != nil { @@ -368,6 +369,10 @@ func (r *Reactor) subscribeToBroadcastEvents() { types.EventNewRoundStep, func(data tmevents.EventData) { r.broadcastNewRoundStepMessage(data.(*cstypes.RoundState)) + select { + case r.state.onStopCh <- data.(*cstypes.RoundState): + default: + } }, ) if err != nil { diff --git a/consensus/replay.go b/consensus/replay.go index 06f66ba5d..03dc90e5a 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -469,9 +469,23 @@ func (h *Handshaker) replayBlocks( assertAppHashEqualsOneFromBlock(appHash, block) } - appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight) - if err != nil { - return nil, err + if i == finalBlock && !mutateState { + // We emit events for the index services at the final block due to the sync issue when + // the node shutdown during the block committing status. + blockExec := sm.NewBlockExecutor( + h.stateStore, h.logger, proxyApp.Consensus(), emptyMempool{}, sm.EmptyEvidencePool{}) + blockExec.SetEventBus(h.eventBus) + appHash, err = sm.ExecCommitBlock( + blockExec, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state) + if err != nil { + return nil, err + } + } else { + appHash, err = sm.ExecCommitBlock( + nil, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state) + if err != nil { + return nil, err + } } h.nBlocks++ diff --git a/consensus/state.go b/consensus/state.go index 146344437..a4ab2122b 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -140,6 +140,9 @@ type State struct { // for reporting metrics metrics *Metrics + + // wait the channel event happening for shutting down the state gracefully + onStopCh chan *cstypes.RoundState } // StateOption sets an optional parameter on the State. @@ -170,6 +173,7 @@ func NewState( evpool: evpool, evsw: tmevents.NewEventSwitch(), metrics: NopMetrics(), + onStopCh: make(chan *cstypes.RoundState), } // set function defaults (may be overwritten before calling Start) @@ -411,6 +415,18 @@ func (cs *State) loadWalFile() error { // OnStop implements service.Service. func (cs *State) OnStop() { + + // If the node is committing a new block, wait until it is finished! + if cs.GetRoundState().Step == cstypes.RoundStepCommit { + select { + case <-cs.onStopCh: + case <-time.After(cs.config.TimeoutCommit): + cs.Logger.Error("OnStop: timeout waiting for commit to finish", "time", cs.config.TimeoutCommit) + } + } + + close(cs.onStopCh) + if err := cs.evsw.Stop(); err != nil { cs.Logger.Error("failed trying to stop eventSwitch", "error", err) } diff --git a/node/node.go b/node/node.go index f0f154a32..fe135a079 100644 --- a/node/node.go +++ b/node/node.go @@ -1435,7 +1435,6 @@ func (n *Node) OnStart() error { // OnStop stops the Node. It implements service.Service. func (n *Node) OnStop() { - n.BaseService.OnStop() n.Logger.Info("Stopping Node") diff --git a/state/execution.go b/state/execution.go index 5f0e68bbc..3408e873a 100644 --- a/state/execution.go +++ b/state/execution.go @@ -551,18 +551,37 @@ func fireEvents( // ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state. // It returns the application root hash (result of abci.Commit). func ExecCommitBlock( + be *BlockExecutor, appConnConsensus proxy.AppConnConsensus, block *types.Block, logger log.Logger, store Store, initialHeight int64, + s State, ) ([]byte, error) { - _, err := execBlockOnProxyApp(logger, appConnConsensus, block, store, initialHeight) + abciResponses, err := execBlockOnProxyApp(logger, appConnConsensus, block, store, initialHeight) if err != nil { logger.Error("failed executing block on proxy app", "height", block.Height, "err", err) return nil, err } + // the BlockExecutor condition is using for the final block replay process. + if be != nil { + abciValUpdates := abciResponses.EndBlock.ValidatorUpdates + err = validateValidatorUpdates(abciValUpdates, s.ConsensusParams.Validator) + if err != nil { + logger.Error("err", err) + return nil, err + } + validatorUpdates, err := types.PB2TM.ValidatorUpdates(abciValUpdates) + if err != nil { + logger.Error("err", err) + return nil, err + } + + fireEvents(be.logger, be.eventBus, block, abciResponses, validatorUpdates) + } + // Commit block, get hash back res, err := appConnConsensus.CommitSync(context.Background()) if err != nil { diff --git a/state/execution_test.go b/state/execution_test.go index 984b79822..5ccf03b9f 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -100,7 +100,7 @@ func TestBeginBlockValidators(t *testing.T) { // block for height 2 block, _ := state.MakeBlock(2, makeTxs(2), lastCommit, nil, state.Validators.GetProposer().Address) - _, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, log.TestingLogger(), stateStore, 1) + _, err = sm.ExecCommitBlock(nil, proxyApp.Consensus(), block, log.TestingLogger(), stateStore, 1, state) require.Nil(t, err, tc.desc) // -> app receives a list of validators with a bool indicating if they signed