Browse Source

node/state: graceful shutdown in the consensus state (#6370)

pull/6444/head
JayT106 4 years ago
committed by GitHub
parent
commit
e334555393
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 60 additions and 6 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +5
    -0
      consensus/reactor.go
  3. +17
    -3
      consensus/replay.go
  4. +16
    -0
      consensus/state.go
  5. +0
    -1
      node/node.go
  6. +20
    -1
      state/execution.go
  7. +1
    -1
      state/execution_test.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -92,6 +92,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
- [privval] \#6240 Add `context.Context` to privval interface.
- [rpc] \#6265 set cache control in http-rpc response header (@JayT106)
- [statesync] \#6378 Retry requests for snapshots and add a minimum discovery time (5s) for new snapshots.
- [node/state] \#6370 graceful shutdown in the consensus reactor (@JayT106)
### BUG FIXES


+ 5
- 0
consensus/reactor.go View File

@ -196,6 +196,7 @@ func (r *Reactor) OnStart() error {
// blocking until they all exit, as well as unsubscribing from events and stopping
// state.
func (r *Reactor) OnStop() {
r.unsubscribeFromBroadcastEvents()
if err := r.state.Stop(); err != nil {
@ -368,6 +369,10 @@ func (r *Reactor) subscribeToBroadcastEvents() {
types.EventNewRoundStep,
func(data tmevents.EventData) {
r.broadcastNewRoundStepMessage(data.(*cstypes.RoundState))
select {
case r.state.onStopCh <- data.(*cstypes.RoundState):
default:
}
},
)
if err != nil {


+ 17
- 3
consensus/replay.go View File

@ -469,9 +469,23 @@ func (h *Handshaker) replayBlocks(
assertAppHashEqualsOneFromBlock(appHash, block)
}
appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight)
if err != nil {
return nil, err
if i == finalBlock && !mutateState {
// We emit events for the index services at the final block due to the sync issue when
// the node shutdown during the block committing status.
blockExec := sm.NewBlockExecutor(
h.stateStore, h.logger, proxyApp.Consensus(), emptyMempool{}, sm.EmptyEvidencePool{})
blockExec.SetEventBus(h.eventBus)
appHash, err = sm.ExecCommitBlock(
blockExec, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
if err != nil {
return nil, err
}
} else {
appHash, err = sm.ExecCommitBlock(
nil, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
if err != nil {
return nil, err
}
}
h.nBlocks++


+ 16
- 0
consensus/state.go View File

@ -140,6 +140,9 @@ type State struct {
// for reporting metrics
metrics *Metrics
// wait the channel event happening for shutting down the state gracefully
onStopCh chan *cstypes.RoundState
}
// StateOption sets an optional parameter on the State.
@ -170,6 +173,7 @@ func NewState(
evpool: evpool,
evsw: tmevents.NewEventSwitch(),
metrics: NopMetrics(),
onStopCh: make(chan *cstypes.RoundState),
}
// set function defaults (may be overwritten before calling Start)
@ -411,6 +415,18 @@ func (cs *State) loadWalFile() error {
// OnStop implements service.Service.
func (cs *State) OnStop() {
// If the node is committing a new block, wait until it is finished!
if cs.GetRoundState().Step == cstypes.RoundStepCommit {
select {
case <-cs.onStopCh:
case <-time.After(cs.config.TimeoutCommit):
cs.Logger.Error("OnStop: timeout waiting for commit to finish", "time", cs.config.TimeoutCommit)
}
}
close(cs.onStopCh)
if err := cs.evsw.Stop(); err != nil {
cs.Logger.Error("failed trying to stop eventSwitch", "error", err)
}


+ 0
- 1
node/node.go View File

@ -1435,7 +1435,6 @@ func (n *Node) OnStart() error {
// OnStop stops the Node. It implements service.Service.
func (n *Node) OnStop() {
n.BaseService.OnStop()
n.Logger.Info("Stopping Node")


+ 20
- 1
state/execution.go View File

@ -551,18 +551,37 @@ func fireEvents(
// ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state.
// It returns the application root hash (result of abci.Commit).
func ExecCommitBlock(
be *BlockExecutor,
appConnConsensus proxy.AppConnConsensus,
block *types.Block,
logger log.Logger,
store Store,
initialHeight int64,
s State,
) ([]byte, error) {
_, err := execBlockOnProxyApp(logger, appConnConsensus, block, store, initialHeight)
abciResponses, err := execBlockOnProxyApp(logger, appConnConsensus, block, store, initialHeight)
if err != nil {
logger.Error("failed executing block on proxy app", "height", block.Height, "err", err)
return nil, err
}
// the BlockExecutor condition is using for the final block replay process.
if be != nil {
abciValUpdates := abciResponses.EndBlock.ValidatorUpdates
err = validateValidatorUpdates(abciValUpdates, s.ConsensusParams.Validator)
if err != nil {
logger.Error("err", err)
return nil, err
}
validatorUpdates, err := types.PB2TM.ValidatorUpdates(abciValUpdates)
if err != nil {
logger.Error("err", err)
return nil, err
}
fireEvents(be.logger, be.eventBus, block, abciResponses, validatorUpdates)
}
// Commit block, get hash back
res, err := appConnConsensus.CommitSync(context.Background())
if err != nil {


+ 1
- 1
state/execution_test.go View File

@ -100,7 +100,7 @@ func TestBeginBlockValidators(t *testing.T) {
// block for height 2
block, _ := state.MakeBlock(2, makeTxs(2), lastCommit, nil, state.Validators.GetProposer().Address)
_, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, log.TestingLogger(), stateStore, 1)
_, err = sm.ExecCommitBlock(nil, proxyApp.Consensus(), block, log.TestingLogger(), stateStore, 1, state)
require.Nil(t, err, tc.desc)
// -> app receives a list of validators with a bool indicating if they signed


Loading…
Cancel
Save