Browse Source

consensus: more handshake replay tests

pull/408/head
Ethan Buchman 8 years ago
parent
commit
0c4b6cd071
2 changed files with 136 additions and 65 deletions
  1. +28
    -27
      consensus/replay.go
  2. +108
    -38
      consensus/replay_test.go

+ 28
- 27
consensus/replay.go View File

@ -174,25 +174,6 @@ func makeHeightSearchFunc(height int) auto.SearchFunc {
// by handshaking with the app to figure out where
// we were last and using the WAL to recover there
// Replay the last block through the consensus and return the AppHash from after Commit.
func replayLastBlock(config cfg.Config, state *sm.State, proxyApp proxy.AppConnConsensus, blockStore types.BlockStore) ([]byte, error) {
mempool := types.MockMempool{}
cs := NewConsensusState(config, state, proxyApp, blockStore, mempool)
evsw := types.NewEventSwitch()
evsw.Start()
defer evsw.Stop()
cs.SetEventSwitch(evsw)
newBlockCh := subscribeToEvent(evsw, "consensus-replay", types.EventStringNewBlock(), 1)
// run through the WAL, commit new block, stop
cs.Start()
<-newBlockCh // TODO: use a timeout and return err?
cs.Stop()
return cs.state.AppHash, nil
}
type Handshaker struct {
config cfg.Config
state *sm.State
@ -286,13 +267,13 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp p
// We haven't run Commit (both the state and app are one block behind),
// so run through consensus with the real app
log.Info("Replay last block using real app")
return replayLastBlock(h.config, h.state, proxyApp.Consensus(), h.store)
return h.replayLastBlock(proxyApp.Consensus())
} else if appBlockHeight == storeBlockHeight {
// We ran Commit, but didn't save the state, so run through consensus with mock app
mockApp := newMockProxyApp(appHash)
log.Info("Replay last block using mock app")
return replayLastBlock(h.config, h.state, mockApp, h.store)
return h.replayLastBlock(mockApp)
}
}
@ -316,28 +297,48 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store
}
for i := appBlockHeight + 1; i <= finalBlock; i++ {
log.Info("Applying block", "height", i)
h.nBlocks += 1
block := h.store.LoadBlock(i)
appHash, err = sm.ApplyBlock(proxyApp.Consensus(), block)
if err != nil {
return nil, err
}
h.nBlocks += 1
}
if useReplayFunc {
// sync the final block
appHash, err = h.ReplayBlocks(appHash, finalBlock, proxyApp)
if err != nil {
return appHash, err
}
return h.ReplayBlocks(appHash, finalBlock, proxyApp)
}
return appHash, h.checkAppHash(appHash)
}
// Replay the last block through the consensus and return the AppHash from after Commit.
func (h *Handshaker) replayLastBlock(proxyApp proxy.AppConnConsensus) ([]byte, error) {
mempool := types.MockMempool{}
cs := NewConsensusState(h.config, h.state, proxyApp, h.store, mempool)
evsw := types.NewEventSwitch()
evsw.Start()
defer evsw.Stop()
cs.SetEventSwitch(evsw)
newBlockCh := subscribeToEvent(evsw, "consensus-replay", types.EventStringNewBlock(), 1)
// run through the WAL, commit new block, stop
cs.Start()
<-newBlockCh // TODO: use a timeout and return err?
cs.Stop()
h.nBlocks += 1
return cs.state.AppHash, nil
}
func (h *Handshaker) checkAppHash(appHash []byte) error {
if !bytes.Equal(h.state.AppHash, appHash) {
return errors.New(Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash))
panic(errors.New(Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash)).Error())
return nil
}
return nil
}


+ 108
- 38
consensus/replay_test.go View File

@ -262,28 +262,41 @@ var (
//---------------------------------------
// Test handshake/replay
// 0 - all synced up
// 1 - saved block but app and state are behind
// 2 - save block and committed but state is behind
var modes = []uint{0, 1, 2}
// Sync from scratch
func TestHandshakeReplayAll(t *testing.T) {
testHandshakeReplay(t, 0)
for _, m := range modes {
testHandshakeReplay(t, 0, m)
}
}
// Sync many, not from scratch
func TestHandshakeReplaySome(t *testing.T) {
testHandshakeReplay(t, 1)
for _, m := range modes {
testHandshakeReplay(t, 1, m)
}
}
// Sync from lagging by one
func TestHandshakeReplayOne(t *testing.T) {
testHandshakeReplay(t, NUM_BLOCKS-1)
for _, m := range modes {
testHandshakeReplay(t, NUM_BLOCKS-1, m)
}
}
// Sync from caught up
func TestHandshakeReplayNone(t *testing.T) {
testHandshakeReplay(t, NUM_BLOCKS)
for _, m := range modes {
testHandshakeReplay(t, NUM_BLOCKS, m)
}
}
// Make some blocks. Start a fresh app and apply nBlocks blocks. Then restart the app and sync it up with the remaining blocks
func testHandshakeReplay(t *testing.T, nBlocks int) {
func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
config := tendermint_test.ResetConfig("proxy_test_")
// copy the many_blocks file
@ -310,44 +323,23 @@ func testHandshakeReplay(t *testing.T, nBlocks int) {
store.chain = chain
store.commits = commits
// run the whole chain against this client to build up the tendermint state
clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "1")))
proxyApp := proxy.NewAppConns(config, clientCreator, nil) // sm.NewHandshaker(config, state, store, ReplayLastBlock))
if _, err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err)
}
for _, block := range chain {
err := state.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), mempool)
if err != nil {
t.Fatal(err)
}
}
proxyApp.Stop()
latestAppHash := state.AppHash
// run the chain through state.ApplyBlock to build up the tendermint state
latestAppHash := buildTMStateFromChain(config, state, chain, mode)
// run nBlocks against a new client to build up the app state.
// use a throwaway tendermint state
clientCreator2 := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "2")))
// make a new client creator
dummyApp := dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "2"))
clientCreator2 := proxy.NewLocalClientCreator(dummyApp)
if nBlocks > 0 {
// start a new app without handshake, play nBlocks blocks
// run nBlocks against a new client to build up the app state.
// use a throwaway tendermint state
proxyApp := proxy.NewAppConns(config, clientCreator2, nil)
if _, err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err)
}
state2, _ := stateAndStore(config, privVal.PubKey)
for i := 0; i < nBlocks; i++ {
block := chain[i]
err := state2.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), mempool)
if err != nil {
t.Fatal(err)
}
}
proxyApp.Stop()
state, _ := stateAndStore(config, privVal.PubKey)
buildAppStateFromChain(proxyApp, state, chain, nBlocks, mode)
}
// now start the app using the handshake - it should sync
handshaker := NewHandshaker(config, state, store)
proxyApp = proxy.NewAppConns(config, clientCreator2, handshaker)
proxyApp := proxy.NewAppConns(config, clientCreator2, handshaker)
if _, err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err)
}
@ -363,9 +355,87 @@ func testHandshakeReplay(t *testing.T, nBlocks int) {
t.Fatalf("Expected app hashes to match after handshake/replay. got %X, expected %X", res.LastBlockAppHash, latestAppHash)
}
if handshaker.NBlocks() != NUM_BLOCKS-nBlocks {
t.Fatalf("Expected handshake to sync %d blocks, got %d", NUM_BLOCKS-nBlocks, handshaker.NBlocks())
expectedBlocksToSync := NUM_BLOCKS - nBlocks
if nBlocks == NUM_BLOCKS && mode > 0 {
expectedBlocksToSync += 1
} else if nBlocks > 0 && mode == 1 {
expectedBlocksToSync += 1
}
if handshaker.NBlocks() != expectedBlocksToSync {
t.Fatalf("Expected handshake to sync %d blocks, got %d", expectedBlocksToSync, handshaker.NBlocks())
}
}
func applyBlock(st *sm.State, blk *types.Block, proxyApp proxy.AppConns) {
err := st.ApplyBlock(nil, proxyApp.Consensus(), blk, blk.MakePartSet(testPartSize).Header(), mempool)
if err != nil {
panic(err)
}
}
func buildAppStateFromChain(proxyApp proxy.AppConns,
state *sm.State, chain []*types.Block, nBlocks int, mode uint) {
// start a new app without handshake, play nBlocks blocks
if _, err := proxyApp.Start(); err != nil {
panic(err)
}
defer proxyApp.Stop()
switch mode {
case 0:
for i := 0; i < nBlocks; i++ {
block := chain[i]
applyBlock(state, block, proxyApp)
}
case 1, 2:
for i := 0; i < nBlocks-1; i++ {
block := chain[i]
applyBlock(state, block, proxyApp)
}
if mode == 2 {
// update the dummy height and apphash
// as if we ran commit but not
applyBlock(state, chain[nBlocks-1], proxyApp)
}
}
}
func buildTMStateFromChain(config cfg.Config, state *sm.State, chain []*types.Block, mode uint) []byte {
// run the whole chain against this client to build up the tendermint state
clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "1")))
proxyApp := proxy.NewAppConns(config, clientCreator, nil) // sm.NewHandshaker(config, state, store, ReplayLastBlock))
if _, err := proxyApp.Start(); err != nil {
panic(err)
}
defer proxyApp.Stop()
var latestAppHash []byte
switch mode {
case 0:
// sync right up
for _, block := range chain {
applyBlock(state, block, proxyApp)
}
latestAppHash = state.AppHash
case 1, 2:
// sync up to the penultimate as if we stored the block.
// whether we commit or not depends on the appHash
for _, block := range chain[:len(chain)-1] {
applyBlock(state, block, proxyApp)
}
// apply the final block to a state copy so we can
// get the right next appHash but keep the state back
stateCopy := state.Copy()
applyBlock(stateCopy, chain[len(chain)-1], proxyApp)
latestAppHash = stateCopy.AppHash
}
return latestAppHash
}
//--------------------------


Loading…
Cancel
Save