Browse Source

handshake replay through consensus using mockApp

pull/408/head
Ethan Buchman 8 years ago
parent
commit
cbe6dbe7a1
6 changed files with 95 additions and 90 deletions
  1. +1
    -1
      consensus/replay_file.go
  2. +15
    -3
      consensus/state.go
  3. +4
    -4
      mempool/mempool.go
  4. +1
    -1
      node/node.go
  5. +1
    -1
      rpc/core/pipe.go
  6. +73
    -80
      state/execution.go

+ 1
- 1
consensus/replay_file.go View File

@ -248,7 +248,7 @@ func newConsensusStateForReplay(config cfg.Config) *ConsensusState {
state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
// Create proxyAppConn connection (consensus, mempool, query) // Create proxyAppConn connection (consensus, mempool, query)
proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), sm.NewHandshaker(config, state, blockStore))
proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), sm.NewHandshaker(config, state, blockStore, ReplayLastBlock))
_, err := proxyApp.Start() _, err := proxyApp.Start()
if err != nil { if err != nil {
Exit(Fmt("Error starting proxy app conns: %v", err)) Exit(Fmt("Error starting proxy app conns: %v", err))


+ 15
- 3
consensus/state.go View File

@ -15,7 +15,6 @@ import (
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
bc "github.com/tendermint/tendermint/blockchain" bc "github.com/tendermint/tendermint/blockchain"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
@ -227,7 +226,7 @@ type ConsensusState struct {
config cfg.Config config cfg.Config
proxyAppConn proxy.AppConnConsensus proxyAppConn proxy.AppConnConsensus
blockStore *bc.BlockStore blockStore *bc.BlockStore
mempool *mempl.Mempool
mempool sm.Mempool
privValidator PrivValidator // for signing votes privValidator PrivValidator // for signing votes
@ -255,7 +254,20 @@ type ConsensusState struct {
done chan struct{} done chan struct{}
} }
func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState {
func ReplayLastBlock(config cfg.Config, state *sm.State, proxyApp proxy.AppConnConsensus, blockStore sm.BlockStore) {
mempool := sm.MockMempool{}
cs := NewConsensusState(config, state, proxyApp, blockStore.(*bc.BlockStore), mempool)
evsw := types.NewEventSwitch()
cs.SetEventSwitch(evsw)
newBlockCh := subscribeToEvent(evsw, "consensus-replay", types.EventStringNewBlock(), 0)
cs.Start()
<-newBlockCh
cs.Stop()
}
func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore *bc.BlockStore, mempool sm.Mempool) *ConsensusState {
cs := &ConsensusState{ cs := &ConsensusState{
config: config, config: config,
proxyAppConn: proxyAppConn, proxyAppConn: proxyAppConn,


+ 4
- 4
mempool/mempool.go View File

@ -7,13 +7,13 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
abci "github.com/tendermint/abci/types"
auto "github.com/tendermint/go-autofile" auto "github.com/tendermint/go-autofile"
"github.com/tendermint/go-clist" "github.com/tendermint/go-clist"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
abci "github.com/tendermint/abci/types"
) )
/* /*
@ -249,7 +249,7 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) {
// Get the valid transactions remaining // Get the valid transactions remaining
// If maxTxs is -1, there is no cap on returned transactions. // If maxTxs is -1, there is no cap on returned transactions.
func (mem *Mempool) Reap(maxTxs int) []types.Tx {
func (mem *Mempool) Reap(maxTxs int) types.Txs {
mem.proxyMtx.Lock() mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock() defer mem.proxyMtx.Unlock()
@ -263,7 +263,7 @@ func (mem *Mempool) Reap(maxTxs int) []types.Tx {
} }
// maxTxs: -1 means uncapped, 0 means none // maxTxs: -1 means uncapped, 0 means none
func (mem *Mempool) collectTxs(maxTxs int) []types.Tx {
func (mem *Mempool) collectTxs(maxTxs int) types.Txs {
if maxTxs == 0 { if maxTxs == 0 {
return []types.Tx{} return []types.Tx{}
} else if maxTxs < 0 { } else if maxTxs < 0 {
@ -281,7 +281,7 @@ func (mem *Mempool) collectTxs(maxTxs int) []types.Tx {
// Mempool will discard these txs. // Mempool will discard these txs.
// NOTE: this should be called *after* block is committed by consensus. // NOTE: this should be called *after* block is committed by consensus.
// NOTE: unsafe; Lock/Unlock must be managed by caller // NOTE: unsafe; Lock/Unlock must be managed by caller
func (mem *Mempool) Update(height int, txs []types.Tx) {
func (mem *Mempool) Update(height int, txs types.Txs) {
// TODO: check err ? // TODO: check err ?
mem.proxyAppConn.FlushSync() // To flush async resCb calls e.g. from CheckTx mem.proxyAppConn.FlushSync() // To flush async resCb calls e.g. from CheckTx


+ 1
- 1
node/node.go View File

@ -64,7 +64,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato
state := sm.GetState(config, stateDB) state := sm.GetState(config, stateDB)
// Create the proxyApp, which manages connections (consensus, mempool, query) // Create the proxyApp, which manages connections (consensus, mempool, query)
proxyApp := proxy.NewAppConns(config, clientCreator, sm.NewHandshaker(config, state, blockStore))
proxyApp := proxy.NewAppConns(config, clientCreator, sm.NewHandshaker(config, state, blockStore, consensus.ReplayLastBlock))
if _, err := proxyApp.Start(); err != nil { if _, err := proxyApp.Start(); err != nil {
cmn.Exit(cmn.Fmt("Error starting proxy app connections: %v", err)) cmn.Exit(cmn.Fmt("Error starting proxy app connections: %v", err))
} }


+ 1
- 1
rpc/core/pipe.go View File

@ -31,7 +31,7 @@ type Consensus interface {
type Mempool interface { type Mempool interface {
Size() int Size() int
CheckTx(types.Tx, func(*abci.Response)) error CheckTx(types.Tx, func(*abci.Response)) error
Reap(int) []types.Tx
Reap(int) types.Txs
Flush() Flush()
} }


+ 73
- 80
state/execution.go View File

@ -1,7 +1,6 @@
package state package state
import ( import (
"bytes"
"errors" "errors"
"github.com/ebuchman/fail-test" "github.com/ebuchman/fail-test"
@ -278,15 +277,20 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl
type Mempool interface { type Mempool interface {
Lock() Lock()
Unlock() Unlock()
Update(height int, txs []types.Tx)
CheckTx(types.Tx, func(*abci.Response)) error
Reap(int) types.Txs
Update(height int, txs types.Txs)
} }
type MockMempool struct { type MockMempool struct {
} }
func (m MockMempool) Lock() {}
func (m MockMempool) Unlock() {}
func (m MockMempool) Update(height int, txs []types.Tx) {}
func (m MockMempool) Lock() {}
func (m MockMempool) Unlock() {}
func (m MockMempool) CheckTx(tx types.Tx, cb func(*abci.Response)) error { return nil }
func (m MockMempool) Reap(n int) types.Txs { return types.Txs{} }
func (m MockMempool) Update(height int, txs types.Txs) {}
//---------------------------------------------------------------- //----------------------------------------------------------------
// Handshake with app to sync to latest state of core by replaying blocks // Handshake with app to sync to latest state of core by replaying blocks
@ -298,16 +302,19 @@ type BlockStore interface {
LoadBlockMeta(height int) *types.BlockMeta LoadBlockMeta(height int) *types.BlockMeta
} }
type blockReplayFunc func(cfg.Config, *State, proxy.AppConnConsensus, BlockStore)
type Handshaker struct { type Handshaker struct {
config cfg.Config
state *State
store BlockStore
config cfg.Config
state *State
store BlockStore
replayLastBlock blockReplayFunc
nBlocks int // number of blocks applied to the state nBlocks int // number of blocks applied to the state
} }
func NewHandshaker(config cfg.Config, state *State, store BlockStore) *Handshaker {
return &Handshaker{config, state, store, 0}
func NewHandshaker(config cfg.Config, state *State, store BlockStore, f blockReplayFunc) *Handshaker {
return &Handshaker{config, state, store, f, 0}
} }
// TODO: retry the handshake/replay if it fails ? // TODO: retry the handshake/replay if it fails ?
@ -326,7 +333,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
// TODO: check version // TODO: check version
// replay blocks up to the latest in the blockstore // replay blocks up to the latest in the blockstore
err = h.ReplayBlocks(appHash, blockHeight, proxyApp.Consensus())
err = h.ReplayBlocks(appHash, blockHeight, proxyApp)
if err != nil { if err != nil {
return errors.New(Fmt("Error on replay: %v", err)) return errors.New(Fmt("Error on replay: %v", err))
} }
@ -340,7 +347,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
} }
// Replay all blocks after blockHeight and ensure the result matches the current state. // Replay all blocks after blockHeight and ensure the result matches the current state.
func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, appConnConsensus proxy.AppConnConsensus) error {
func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp proxy.AppConns) error {
storeBlockHeight := h.store.Height() storeBlockHeight := h.store.Height()
stateBlockHeight := h.state.LastBlockHeight stateBlockHeight := h.state.LastBlockHeight
@ -353,85 +360,71 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, appConnCon
return ErrAppBlockHeightTooHigh{storeBlockHeight, appBlockHeight} return ErrAppBlockHeightTooHigh{storeBlockHeight, appBlockHeight}
} else if storeBlockHeight == appBlockHeight { } else if storeBlockHeight == appBlockHeight {
// We ran Commit, but if we crashed before state.Save(),
// load the intermediate state and update the state.AppHash.
// NOTE: If ABCI allowed rollbacks, we could just replay the
// block even though it's been committed
stateAppHash := h.state.AppHash
lastBlockAppHash := h.store.LoadBlock(storeBlockHeight).AppHash
if bytes.Equal(stateAppHash, appHash) {
// we're all synced up
log.Debug("ABCI RelpayBlocks: Already synced")
} else if bytes.Equal(stateAppHash, lastBlockAppHash) {
// we crashed after commit and before saving state,
// so load the intermediate state and update the hash
h.state.LoadIntermediate()
h.state.AppHash = appHash
log.Debug("ABCI RelpayBlocks: Loaded intermediate state and updated state.AppHash")
} else {
PanicSanity(Fmt("Unexpected state.AppHash: state.AppHash %X; app.AppHash %X, lastBlock.AppHash %X", stateAppHash, appHash, lastBlockAppHash))
// We already ran Commit, so run through consensus with mock app
mockApp := newMockProxyApp(appHash)
}
return nil
h.replayLastBlock(h.config, h.state, mockApp, h.store)
} else if storeBlockHeight == appBlockHeight+1 &&
storeBlockHeight == stateBlockHeight+1 {
} else if storeBlockHeight == appBlockHeight+1 {
// We crashed after saving the block // We crashed after saving the block
// but before Commit (both the state and app are behind), // but before Commit (both the state and app are behind),
// so just replay the block
// check that the lastBlock.AppHash matches the state apphash
block := h.store.LoadBlock(storeBlockHeight)
if !bytes.Equal(block.Header.AppHash, appHash) {
return ErrLastStateMismatch{storeBlockHeight, block.Header.AppHash, appHash}
}
blockMeta := h.store.LoadBlockMeta(storeBlockHeight)
h.nBlocks += 1
var eventCache types.Fireable // nil
// so run through consensus with the real app
h.replayLastBlock(h.config, h.state, proxyApp.Consensus(), h.store)
// replay the latest block
return h.state.ApplyBlock(eventCache, appConnConsensus, block, blockMeta.BlockID.PartsHeader, MockMempool{})
} else if storeBlockHeight != stateBlockHeight {
// unless we failed before committing or saving state (previous 2 case),
// the store and state should be at the same height!
PanicSanity(Fmt("Expected storeHeight (%d) and stateHeight (%d) to match.", storeBlockHeight, stateBlockHeight))
} else { } else {
// store is more than one ahead, // store is more than one ahead,
// so app wants to replay many blocks // so app wants to replay many blocks
// replay all blocks starting with appBlockHeight+1
var eventCache types.Fireable // nil
// TODO: use stateBlockHeight instead and let the consensus state
// do the replay
var appHash []byte
for i := appBlockHeight + 1; i <= storeBlockHeight; i++ {
h.nBlocks += 1
block := h.store.LoadBlock(i)
_, err := execBlockOnProxyApp(eventCache, appConnConsensus, block)
if err != nil {
log.Warn("Error executing block on proxy app", "height", i, "err", err)
return err
/*
// replay all blocks starting with appBlockHeight+1
var eventCache types.Fireable // nil
// TODO: use stateBlockHeight instead and let the consensus state
// do the replay
var appHash []byte
for i := appBlockHeight + 1; i <= storeBlockHeight; i++ {
h.nBlocks += 1
block := h.store.LoadBlock(i)
_, err := execBlockOnProxyApp(eventCache, appConnConsensus, block)
if err != nil {
log.Warn("Error executing block on proxy app", "height", i, "err", err)
return err
}
// Commit block, get hash back
res := appConnConsensus.CommitSync()
if res.IsErr() {
log.Warn("Error in proxyAppConn.CommitSync", "error", res)
return res
}
if res.Log != "" {
log.Info("Commit.Log: " + res.Log)
}
appHash = res.Data
} }
// Commit block, get hash back
res := appConnConsensus.CommitSync()
if res.IsErr() {
log.Warn("Error in proxyAppConn.CommitSync", "error", res)
return res
if !bytes.Equal(h.state.AppHash, appHash) {
return errors.New(Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash))
} }
if res.Log != "" {
log.Info("Commit.Log: " + res.Log)
}
appHash = res.Data
}
if !bytes.Equal(h.state.AppHash, appHash) {
return errors.New(Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash))
}
*/
return nil return nil
} }
return nil return nil
} }
//--------------------------------------------------------------------------------
func newMockProxyApp(appHash []byte) proxy.AppConnConsensus {
clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{appHash: appHash})
cli, _ := clientCreator.NewABCIClient()
return proxy.NewAppConnConsensus(cli)
}
type mockProxyApp struct {
abci.BaseApplication
appHash []byte
}
func (mock *mockProxyApp) Commit() abci.Result {
return abci.NewResultOK(mock.appHash, "")
}

Loading…
Cancel
Save