|
|
@ -100,27 +100,51 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error { |
|
|
|
cs.replayMode = true |
|
|
|
defer func() { cs.replayMode = false }() |
|
|
|
|
|
|
|
// Ensure that height+1 doesn't exist
|
|
|
|
gr, found, err := cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight+1)) |
|
|
|
if found { |
|
|
|
return errors.New(Fmt("WAL should not contain height %d.", csHeight+1)) |
|
|
|
} |
|
|
|
// Ensure that ENDHEIGHT for this height doesn't exist
|
|
|
|
// NOTE: This is just a sanity check. As far as we know things work fine without it,
|
|
|
|
// and Handshake could reuse ConsensusState if it weren't for this check (since we can crash after writing ENDHEIGHT).
|
|
|
|
gr, found, err := cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight)) |
|
|
|
if gr != nil { |
|
|
|
gr.Close() |
|
|
|
} |
|
|
|
if found { |
|
|
|
return errors.New(Fmt("WAL should not contain #ENDHEIGHT %d.", csHeight)) |
|
|
|
} |
|
|
|
|
|
|
|
// Search for height marker
|
|
|
|
gr, found, err = cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight)) |
|
|
|
// Search for last height marker
|
|
|
|
gr, found, err = cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight-1)) |
|
|
|
if err == io.EOF { |
|
|
|
log.Warn("Replay: wal.group.Search returned EOF", "height", csHeight) |
|
|
|
return nil |
|
|
|
log.Warn("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", csHeight-1) |
|
|
|
// if we upgraded from 0.9 to 0.9.1, we may have #HEIGHT instead
|
|
|
|
// TODO (0.10.0): remove this
|
|
|
|
gr, found, err = cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight)) |
|
|
|
if err == io.EOF { |
|
|
|
log.Warn("Replay: wal.group.Search returned EOF", "#HEIGHT", csHeight) |
|
|
|
return nil |
|
|
|
} else if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
} else if err != nil { |
|
|
|
return err |
|
|
|
} else { |
|
|
|
defer gr.Close() |
|
|
|
} |
|
|
|
if !found { |
|
|
|
return errors.New(Fmt("WAL does not contain height %d.", csHeight)) |
|
|
|
// if we upgraded from 0.9 to 0.9.1, we may have #HEIGHT instead
|
|
|
|
// TODO (0.10.0): remove this
|
|
|
|
gr, found, err = cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight)) |
|
|
|
if err == io.EOF { |
|
|
|
log.Warn("Replay: wal.group.Search returned EOF", "#HEIGHT", csHeight) |
|
|
|
return nil |
|
|
|
} else if err != nil { |
|
|
|
return err |
|
|
|
} else { |
|
|
|
defer gr.Close() |
|
|
|
} |
|
|
|
|
|
|
|
// TODO (0.10.0): uncomment
|
|
|
|
// return errors.New(Fmt("Cannot replay height %d. WAL does not contain #ENDHEIGHT for %d.", csHeight, csHeight-1))
|
|
|
|
} |
|
|
|
defer gr.Close() |
|
|
|
|
|
|
|
log.Notice("Catchup by replaying consensus messages", "height", csHeight) |
|
|
|
|
|
|
@ -147,7 +171,7 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error { |
|
|
|
//--------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
// Parses marker lines of the form:
|
|
|
|
// #HEIGHT: 12345
|
|
|
|
// #ENDHEIGHT: 12345
|
|
|
|
func makeHeightSearchFunc(height int) auto.SearchFunc { |
|
|
|
return func(line string) (int, error) { |
|
|
|
line = strings.TrimRight(line, "\n") |
|
|
@ -273,15 +297,18 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp p |
|
|
|
|
|
|
|
} else if appBlockHeight == stateBlockHeight { |
|
|
|
// We haven't run Commit (both the state and app are one block behind),
|
|
|
|
// so run through consensus with the real app
|
|
|
|
// so replayBlock with the real app.
|
|
|
|
// NOTE: We could instead use the cs.WAL on cs.Start,
|
|
|
|
// but we'd have to allow the WAL to replay a block that wrote its ENDHEIGHT
|
|
|
|
log.Info("Replay last block using real app") |
|
|
|
return h.replayLastBlock(proxyApp.Consensus()) |
|
|
|
return h.replayBlock(storeBlockHeight, proxyApp.Consensus()) |
|
|
|
|
|
|
|
} else if appBlockHeight == storeBlockHeight { |
|
|
|
// We ran Commit, but didn't save the state, so run through consensus with mock app
|
|
|
|
mockApp := newMockProxyApp(appHash) |
|
|
|
// We ran Commit, but didn't save the state, so replayBlock with mock app
|
|
|
|
abciResponses := h.state.LoadABCIResponses() |
|
|
|
mockApp := newMockProxyApp(appHash, abciResponses) |
|
|
|
log.Info("Replay last block using mock app") |
|
|
|
return h.replayLastBlock(mockApp) |
|
|
|
return h.replayBlock(storeBlockHeight, mockApp) |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
@ -290,24 +317,23 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp p |
|
|
|
return nil, nil |
|
|
|
} |
|
|
|
|
|
|
|
func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int, useReplayFunc bool) ([]byte, error) { |
|
|
|
func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int, mutateState bool) ([]byte, error) { |
|
|
|
// App is further behind than it should be, so we need to replay blocks.
|
|
|
|
// We replay all blocks from appBlockHeight+1.
|
|
|
|
// If useReplayFunc == true, stop short of the last block
|
|
|
|
// so it can be replayed using the WAL in ReplayBlocks.
|
|
|
|
// Note that we don't have an old version of the state,
|
|
|
|
// so we by-pass state validation using sm.ApplyBlock.
|
|
|
|
// so we by-pass state validation/mutation using sm.ExecCommitBlock.
|
|
|
|
// If mutateState == true, the final block is replayed with h.replayBlock()
|
|
|
|
|
|
|
|
var appHash []byte |
|
|
|
var err error |
|
|
|
finalBlock := storeBlockHeight |
|
|
|
if useReplayFunc { |
|
|
|
if mutateState { |
|
|
|
finalBlock -= 1 |
|
|
|
} |
|
|
|
for i := appBlockHeight + 1; i <= finalBlock; i++ { |
|
|
|
log.Info("Applying block", "height", i) |
|
|
|
block := h.store.LoadBlock(i) |
|
|
|
appHash, err = sm.ApplyBlock(proxyApp.Consensus(), block) |
|
|
|
appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block) |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
|
} |
|
|
@ -315,44 +341,29 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store |
|
|
|
h.nBlocks += 1 |
|
|
|
} |
|
|
|
|
|
|
|
if useReplayFunc { |
|
|
|
if mutateState { |
|
|
|
// sync the final block
|
|
|
|
return h.ReplayBlocks(appHash, finalBlock, proxyApp) |
|
|
|
return h.replayBlock(storeBlockHeight, proxyApp.Consensus()) |
|
|
|
} |
|
|
|
|
|
|
|
return appHash, h.checkAppHash(appHash) |
|
|
|
} |
|
|
|
|
|
|
|
// Replay the last block through the consensus and return the AppHash from after Commit.
|
|
|
|
func (h *Handshaker) replayLastBlock(proxyApp proxy.AppConnConsensus) ([]byte, error) { |
|
|
|
// ApplyBlock on the proxyApp with the last block.
|
|
|
|
func (h *Handshaker) replayBlock(height int, proxyApp proxy.AppConnConsensus) ([]byte, error) { |
|
|
|
mempool := types.MockMempool{} |
|
|
|
cs := NewConsensusState(h.config, h.state, proxyApp, h.store, mempool) |
|
|
|
|
|
|
|
evsw := types.NewEventSwitch() |
|
|
|
evsw.Start() |
|
|
|
defer evsw.Stop() |
|
|
|
cs.SetEventSwitch(evsw) |
|
|
|
newBlockCh := subscribeToEvent(evsw, "consensus-replay", types.EventStringNewBlock(), 1) |
|
|
|
var eventCache types.Fireable // nil
|
|
|
|
block := h.store.LoadBlock(height) |
|
|
|
meta := h.store.LoadBlockMeta(height) |
|
|
|
|
|
|
|
// run through the WAL, commit new block, stop
|
|
|
|
if _, err := cs.Start(); err != nil { |
|
|
|
if err := h.state.ApplyBlock(eventCache, proxyApp, block, meta.BlockID.PartsHeader, mempool); err != nil { |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
defer cs.Stop() |
|
|
|
|
|
|
|
timeout := h.config.GetInt("timeout_handshake") |
|
|
|
timer := time.NewTimer(time.Duration(timeout) * time.Millisecond) |
|
|
|
log.Notice("Attempting to replay last block", "height", h.store.Height(), "timeout", timeout) |
|
|
|
|
|
|
|
select { |
|
|
|
case <-newBlockCh: |
|
|
|
case <-timer.C: |
|
|
|
return nil, ErrReplayLastBlockTimeout |
|
|
|
} |
|
|
|
|
|
|
|
h.nBlocks += 1 |
|
|
|
|
|
|
|
return cs.state.AppHash, nil |
|
|
|
return h.state.AppHash, nil |
|
|
|
} |
|
|
|
|
|
|
|
func (h *Handshaker) checkAppHash(appHash []byte) error { |
|
|
@ -364,9 +375,14 @@ func (h *Handshaker) checkAppHash(appHash []byte) error { |
|
|
|
} |
|
|
|
|
|
|
|
//--------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
func newMockProxyApp(appHash []byte) proxy.AppConnConsensus { |
|
|
|
clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{appHash: appHash}) |
|
|
|
// mockProxyApp uses ABCIResponses to give the right results
|
|
|
|
// Useful because we don't want to call Commit() twice for the same block on the real app.
|
|
|
|
|
|
|
|
func newMockProxyApp(appHash []byte, abciResponses *sm.ABCIResponses) proxy.AppConnConsensus { |
|
|
|
clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{ |
|
|
|
appHash: appHash, |
|
|
|
abciResponses: abciResponses, |
|
|
|
}) |
|
|
|
cli, _ := clientCreator.NewABCIClient() |
|
|
|
return proxy.NewAppConnConsensus(cli) |
|
|
|
} |
|
|
@ -374,7 +390,24 @@ func newMockProxyApp(appHash []byte) proxy.AppConnConsensus { |
|
|
|
type mockProxyApp struct { |
|
|
|
abci.BaseApplication |
|
|
|
|
|
|
|
appHash []byte |
|
|
|
appHash []byte |
|
|
|
txCount int |
|
|
|
abciResponses *sm.ABCIResponses |
|
|
|
} |
|
|
|
|
|
|
|
func (mock *mockProxyApp) DeliverTx(tx []byte) abci.Result { |
|
|
|
r := mock.abciResponses.DeliverTx[mock.txCount] |
|
|
|
mock.txCount += 1 |
|
|
|
return abci.Result{ |
|
|
|
r.Code, |
|
|
|
r.Data, |
|
|
|
r.Log, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (mock *mockProxyApp) EndBlock(height uint64) abci.ResponseEndBlock { |
|
|
|
mock.txCount = 0 |
|
|
|
return mock.abciResponses.EndBlock |
|
|
|
} |
|
|
|
|
|
|
|
func (mock *mockProxyApp) Commit() abci.Result { |
|
|
|