Browse Source

Comment tweaks

pull/1576/head
Jae Kwon 7 years ago
parent
commit
e1a3f16fa4
2 changed files with 49 additions and 36 deletions
  1. +34
    -26
      consensus/replay.go
  2. +15
    -10
      consensus/state.go

+ 34
- 26
consensus/replay.go View File

@ -26,20 +26,24 @@ import (
var crc32c = crc32.MakeTable(crc32.Castagnoli) var crc32c = crc32.MakeTable(crc32.Castagnoli)
// Functionality to replay blocks and messages on recovery from a crash. // Functionality to replay blocks and messages on recovery from a crash.
// There are two general failure scenarios: failure during consensus, and failure while applying the block.
// The former is handled by the WAL, the latter by the proxyApp Handshake on restart,
// which ultimately hands off the work to the WAL.
// There are two general failure scenarios:
//
// 1. failure during consensus
// 2. failure while applying the block
//
// The former is handled by the WAL, the latter by the proxyApp Handshake on
// restart, which ultimately hands off the work to the WAL.
//----------------------------------------- //-----------------------------------------
// recover from failure during consensus
// by replaying messages from the WAL
// 1. Recover from failure during consensus
// (by replaying messages from the WAL)
//-----------------------------------------
// Unmarshal and apply a single message to the consensus state
// as if it were received in receiveRoutine
// Lines that start with "#" are ignored.
// NOTE: receiveRoutine should not be running
// Unmarshal and apply a single message to the consensus state as if it were
// received in receiveRoutine. Lines that start with "#" are ignored.
// NOTE: receiveRoutine should not be running.
func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepCh chan interface{}) error { func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepCh chan interface{}) error {
// skip meta messages
// Skip meta messages which exist for demarcating boundaries.
if _, ok := msg.Msg.(EndHeightMessage); ok { if _, ok := msg.Msg.(EndHeightMessage); ok {
return nil return nil
} }
@ -89,17 +93,18 @@ func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepCh chan
return nil return nil
} }
// replay only those messages since the last block.
// timeoutRoutine should run concurrently to read off tickChan
// Replay only those messages since the last block. `timeoutRoutine` should
// run concurrently to read off tickChan.
func (cs *ConsensusState) catchupReplay(csHeight int64) error { func (cs *ConsensusState) catchupReplay(csHeight int64) error {
// set replayMode
// Set replayMode to true so we don't log signing errors.
cs.replayMode = true cs.replayMode = true
defer func() { cs.replayMode = false }() defer func() { cs.replayMode = false }()
// Ensure that ENDHEIGHT for this height doesn't exist.
// Ensure that #ENDHEIGHT for this height doesn't exist.
// NOTE: This is just a sanity check. As far as we know things work fine // NOTE: This is just a sanity check. As far as we know things work fine
// without it, and Handshake could reuse ConsensusState if it weren't for // without it, and Handshake could reuse ConsensusState if it weren't for
// this check (since we can crash after writing ENDHEIGHT).
// this check (since we can crash after writing #ENDHEIGHT).
// //
// Ignore data corruption errors since this is a sanity check. // Ignore data corruption errors since this is a sanity check.
gr, found, err := cs.wal.SearchForEndHeight(csHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true}) gr, found, err := cs.wal.SearchForEndHeight(csHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
@ -115,7 +120,7 @@ func (cs *ConsensusState) catchupReplay(csHeight int64) error {
return fmt.Errorf("WAL should not contain #ENDHEIGHT %d", csHeight) return fmt.Errorf("WAL should not contain #ENDHEIGHT %d", csHeight)
} }
// Search for last height marker
// Search for last height marker.
// //
// Ignore data corruption errors in previous heights because we only care about last height // Ignore data corruption errors in previous heights because we only care about last height
gr, found, err = cs.wal.SearchForEndHeight(csHeight-1, &WALSearchOptions{IgnoreDataCorruptionErrors: true}) gr, found, err = cs.wal.SearchForEndHeight(csHeight-1, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
@ -182,10 +187,11 @@ func makeHeightSearchFunc(height int64) auto.SearchFunc {
} }
}*/ }*/
//----------------------------------------------
// Recover from failure during block processing
// by handshaking with the app to figure out where
// we were last and using the WAL to recover there
//---------------------------------------------------
// 2. Recover from failure while applying the block.
// (by handshaking with the app to figure out where
// we were last, and using the WAL to recover there.)
//---------------------------------------------------
type Handshaker struct { type Handshaker struct {
stateDB dbm.DB stateDB dbm.DB
@ -220,7 +226,8 @@ func (h *Handshaker) NBlocks() int {
// TODO: retry the handshake/replay if it fails ? // TODO: retry the handshake/replay if it fails ?
func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
// handshake is done via info request on the query conn
// Handshake is done via ABCI Info on the query conn.
res, err := proxyApp.Query().InfoSync(abci.RequestInfo{version.Version}) res, err := proxyApp.Query().InfoSync(abci.RequestInfo{version.Version})
if err != nil { if err != nil {
return fmt.Errorf("Error calling Info: %v", err) return fmt.Errorf("Error calling Info: %v", err)
@ -234,15 +241,16 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
h.logger.Info("ABCI Handshake", "appHeight", blockHeight, "appHash", fmt.Sprintf("%X", appHash)) h.logger.Info("ABCI Handshake", "appHeight", blockHeight, "appHash", fmt.Sprintf("%X", appHash))
// TODO: check version
// TODO: check app version.
// replay blocks up to the latest in the blockstore
// Replay blocks up to the latest in the blockstore.
_, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp) _, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp)
if err != nil { if err != nil {
return fmt.Errorf("Error on replay: %v", err) return fmt.Errorf("Error on replay: %v", err)
} }
h.logger.Info("Completed ABCI Handshake - Tendermint and App are synced", "appHeight", blockHeight, "appHash", fmt.Sprintf("%X", appHash))
h.logger.Info("Completed ABCI Handshake - Tendermint and App are synced",
"appHeight", blockHeight, "appHash", fmt.Sprintf("%X", appHash))
// TODO: (on restart) replay mempool // TODO: (on restart) replay mempool
@ -250,7 +258,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
} }
// Replay all blocks since appBlockHeight and ensure the result matches the current state. // Replay all blocks since appBlockHeight and ensure the result matches the current state.
// Returns the final AppHash or an error
// Returns the final AppHash or an error.
func (h *Handshaker) ReplayBlocks(state sm.State, appHash []byte, appBlockHeight int64, proxyApp proxy.AppConns) ([]byte, error) { func (h *Handshaker) ReplayBlocks(state sm.State, appHash []byte, appBlockHeight int64, proxyApp proxy.AppConns) ([]byte, error) {
storeBlockHeight := h.store.Height() storeBlockHeight := h.store.Height()
@ -314,7 +322,7 @@ func (h *Handshaker) ReplayBlocks(state sm.State, appHash []byte, appBlockHeight
// We haven't run Commit (both the state and app are one block behind), // We haven't run Commit (both the state and app are one block behind),
// so replayBlock with the real app. // so replayBlock with the real app.
// NOTE: We could instead use the cs.WAL on cs.Start, // NOTE: We could instead use the cs.WAL on cs.Start,
// but we'd have to allow the WAL to replay a block that wrote it's ENDHEIGHT
// but we'd have to allow the WAL to replay a block that wrote it's #ENDHEIGHT
h.logger.Info("Replay last block using real app") h.logger.Info("Replay last block using real app")
state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus()) state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus())
return state.AppHash, err return state.AppHash, err


+ 15
- 10
consensus/state.go View File

@ -1210,23 +1210,28 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
fail.Fail() // XXX fail.Fail() // XXX
// Finish writing to the WAL for this height.
// NOTE: If we fail before writing this, we'll never write it,
// and just recover by running ApplyBlock in the Handshake.
// If we moved it before persisting the block, we'd have to allow
// WAL replay for blocks with an #ENDHEIGHT
// As is, ConsensusState should not be started again
// until we successfully call ApplyBlock (ie. here or in Handshake after restart)
// Write EndHeightMessage{} for this height, implying that the blockstore
// has saved the block.
//
// If we crash before writing this EndHeightMessage{}, we will recover by
// running ApplyBlock during the ABCI handshake when we restart. If we
// didn't save the block to the blockstore before writing
// EndHeightMessage{}, we'd have to change WAL replay -- currently it
// complains about replaying for heights where an #ENDHEIGHT entry already
// exists.
//
// Either way, the ConsensusState should not be resumed until we
// successfully call ApplyBlock (ie. later here, or in Handshake after
// restart).
cs.wal.Save(EndHeightMessage{height}) cs.wal.Save(EndHeightMessage{height})
fail.Fail() // XXX fail.Fail() // XXX
// Create a copy of the state for staging
// and an event cache for txs
// Create a copy of the state for staging and an event cache for txs.
stateCopy := cs.state.Copy() stateCopy := cs.state.Copy()
// Execute and commit the block, update and save the state, and update the mempool. // Execute and commit the block, update and save the state, and update the mempool.
// NOTE: the block.AppHash wont reflect these txs until the next block
// NOTE The block.AppHash wont reflect these txs until the next block.
var err error var err error
stateCopy, err = cs.blockExec.ApplyBlock(stateCopy, types.BlockID{block.Hash(), blockParts.Header()}, block) stateCopy, err = cs.blockExec.ApplyBlock(stateCopy, types.BlockID{block.Hash(), blockParts.Header()}, block)
if err != nil { if err != nil {


Loading…
Cancel
Save