Browse Source

Handshake uses ApplyBlock, no ConsensuState

pull/449/head
Ethan Buchman 8 years ago
parent
commit
5109746b1c
5 changed files with 116 additions and 50 deletions
  1. +43
    -21
      consensus/replay.go
  2. +16
    -8
      consensus/state.go
  3. +14
    -0
      state/execution.go
  4. +12
    -21
      state/state.go
  5. +31
    -0
      state/state_test.go

+ 43
- 21
consensus/replay.go View File

@ -101,6 +101,8 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error {
defer func() { cs.replayMode = false }() defer func() { cs.replayMode = false }()
// Ensure that ENDHEIGHT for this height doesn't exist // Ensure that ENDHEIGHT for this height doesn't exist
// NOTE: This is just a sanity check. As far as we know things work fine without it,
// and Handshake could reuse ConsensusState if it weren't for this check (since we can crash after writing ENDHEIGHT).
gr, found, err := cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight)) gr, found, err := cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight))
if found { if found {
return errors.New(Fmt("WAL should not contain height %d.", csHeight)) return errors.New(Fmt("WAL should not contain height %d.", csHeight))
@ -273,15 +275,18 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp p
} else if appBlockHeight == stateBlockHeight { } else if appBlockHeight == stateBlockHeight {
// We haven't run Commit (both the state and app are one block behind), // We haven't run Commit (both the state and app are one block behind),
// so run through consensus with the real app
// so ApplyBlock with the real app.
// NOTE: We could instead use the cs.WAL on cs.Start,
// but we'd have to allow the WAL to replay a block that wrote it's ENDHEIGHT
log.Info("Replay last block using real app") log.Info("Replay last block using real app")
return h.replayLastBlock(proxyApp.Consensus())
return h.replayLastBlock(storeBlockHeight, proxyApp.Consensus())
} else if appBlockHeight == storeBlockHeight { } else if appBlockHeight == storeBlockHeight {
// We ran Commit, but didn't save the state, so run through consensus with mock app
mockApp := newMockProxyApp(appHash)
// We ran Commit, but didn't save the state, so ApplyBlock with mock app
abciResponses := h.state.LoadABCIResponses()
mockApp := newMockProxyApp(appHash, abciResponses)
log.Info("Replay last block using mock app") log.Info("Replay last block using mock app")
return h.replayLastBlock(mockApp)
return h.replayLastBlock(storeBlockHeight, mockApp)
} }
} }
@ -323,26 +328,21 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store
return appHash, h.checkAppHash(appHash) return appHash, h.checkAppHash(appHash)
} }
// Replay the last block through the consensus and return the AppHash from after Commit.
func (h *Handshaker) replayLastBlock(proxyApp proxy.AppConnConsensus) ([]byte, error) {
// ApplyBlock on the proxyApp with the last block.
func (h *Handshaker) replayLastBlock(height int, proxyApp proxy.AppConnConsensus) ([]byte, error) {
mempool := types.MockMempool{} mempool := types.MockMempool{}
cs := NewConsensusState(h.config, h.state, proxyApp, h.store, mempool)
evsw := types.NewEventSwitch()
evsw.Start()
defer evsw.Stop()
cs.SetEventSwitch(evsw)
var eventCache types.Fireable // nil
block := h.store.LoadBlock(height)
meta := h.store.LoadBlockMeta(height)
log.Notice("Attempting to replay last block", "height", h.store.Height())
// run through the WAL, commit new block, stop
if _, err := cs.Start(); err != nil {
if err := h.state.ApplyBlock(eventCache, proxyApp, block, meta.BlockID.PartsHeader, mempool); err != nil {
return nil, err return nil, err
} }
cs.Stop()
h.nBlocks += 1 h.nBlocks += 1
return cs.state.AppHash, nil
return h.state.AppHash, nil
} }
func (h *Handshaker) checkAppHash(appHash []byte) error { func (h *Handshaker) checkAppHash(appHash []byte) error {
@ -354,9 +354,14 @@ func (h *Handshaker) checkAppHash(appHash []byte) error {
} }
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
func newMockProxyApp(appHash []byte) proxy.AppConnConsensus {
clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{appHash: appHash})
// mockProxyApp uses ABCIResponses to give the right results
// Useful because we don't want to call Commit() twice for the same block on the real app.
func newMockProxyApp(appHash []byte, abciResponses *sm.ABCIResponses) proxy.AppConnConsensus {
clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{
appHash: appHash,
abciResponses: abciResponses,
})
cli, _ := clientCreator.NewABCIClient() cli, _ := clientCreator.NewABCIClient()
return proxy.NewAppConnConsensus(cli) return proxy.NewAppConnConsensus(cli)
} }
@ -364,7 +369,24 @@ func newMockProxyApp(appHash []byte) proxy.AppConnConsensus {
type mockProxyApp struct { type mockProxyApp struct {
abci.BaseApplication abci.BaseApplication
appHash []byte
appHash []byte
txCount int
abciResponses *sm.ABCIResponses
}
func (mock *mockProxyApp) DeliverTx(tx []byte) abci.Result {
r := mock.abciResponses.DeliverTx[mock.txCount]
mock.txCount += 1
return abci.Result{
r.Code,
r.Data,
r.Log,
}
}
func (mock *mockProxyApp) EndBlock(height uint64) abci.ResponseEndBlock {
mock.txCount = 0
return mock.abciResponses.EndBlock
} }
func (mock *mockProxyApp) Commit() abci.Result { func (mock *mockProxyApp) Commit() abci.Result {


+ 16
- 8
consensus/state.go View File

@ -1202,12 +1202,6 @@ func (cs *ConsensusState) finalizeCommit(height int) {
fail.Fail() // XXX fail.Fail() // XXX
if cs.wal != nil {
cs.wal.writeEndHeight(height)
}
fail.Fail() // XXX
// Save to blockStore. // Save to blockStore.
if cs.blockStore.Height() < block.Height { if cs.blockStore.Height() < block.Height {
// NOTE: the seenCommit is local justification to commit this block, // NOTE: the seenCommit is local justification to commit this block,
@ -1222,13 +1216,22 @@ func (cs *ConsensusState) finalizeCommit(height int) {
fail.Fail() // XXX fail.Fail() // XXX
// Finish writing to the WAL for this height.
// NOTE: ConsensusState should not be started again
// until we successfully call ApplyBlock (eg. in Handshake after restart)
if cs.wal != nil {
cs.wal.writeEndHeight(height)
}
fail.Fail() // XXX
// Create a copy of the state for staging // Create a copy of the state for staging
// and an event cache for txs // and an event cache for txs
stateCopy := cs.state.Copy() stateCopy := cs.state.Copy()
eventCache := types.NewEventCache(cs.evsw) eventCache := types.NewEventCache(cs.evsw)
// Execute and commit the block, update and save the state, and update the mempool. // Execute and commit the block, update and save the state, and update the mempool.
// All calls to the proxyAppConn should come here.
// All calls to the proxyAppConn come here.
// NOTE: the block.AppHash wont reflect these txs until the next block // NOTE: the block.AppHash wont reflect these txs until the next block
err := stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool) err := stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool)
if err != nil { if err != nil {
@ -1238,7 +1241,12 @@ func (cs *ConsensusState) finalizeCommit(height int) {
fail.Fail() // XXX fail.Fail() // XXX
// Fire off event for new block.
// Fire event for new block.
// NOTE: If we fail before firing, these events will never fire
//
// Some options (for which they may fire more than once. I guess that's fine):
// * Fire before persisting state, in ApplyBlock
// * Fire on start up if we haven't written any new WAL msgs
types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block}) types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block})
types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header}) types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header})
eventCache.Flush() eventCache.Flush()


+ 14
- 0
state/execution.go View File

@ -223,6 +223,9 @@ func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConn
fail.Fail() // XXX fail.Fail() // XXX
// index txs. This could run in the background
s.indexTxs(abciResponses)
// save the results before we commit // save the results before we commit
s.SaveABCIResponses(abciResponses) s.SaveABCIResponses(abciResponses)
@ -278,6 +281,17 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl
return nil return nil
} }
func (s *State) indexTxs(abciResponses *ABCIResponses) {
// save the tx results using the TxIndexer
// NOTE: these may be overwriting, but the values should be the same.
batch := txindexer.NewBatch()
for i, d := range abciResponses.DeliverTx {
tx := abciResponses.txs[i]
batch.Index(tx.Hash(), types.TxResult{uint64(abciResponses.Height), uint32(i), *d})
}
s.TxIndexer.Batch(batch)
}
// Apply and commit a block, but without all the state validation. // Apply and commit a block, but without all the state validation.
// Returns the application root hash (result of abci.Commit) // Returns the application root hash (result of abci.Commit)
// TODO handle abciResponses // TODO handle abciResponses


+ 12
- 21
state/state.go View File

@ -64,7 +64,7 @@ func loadState(db dbm.DB, key []byte) *State {
wire.ReadBinaryPtr(&s, r, 0, n, err) wire.ReadBinaryPtr(&s, r, 0, n, err)
if *err != nil { if *err != nil {
// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
Exit(Fmt("Data has been corrupted or its spec has changed: %v\n", *err))
Exit(Fmt("LoadState: Data has been corrupted or its spec has changed: %v\n", *err))
} }
// TODO: ensure that buf is completely read. // TODO: ensure that buf is completely read.
} }
@ -95,18 +95,8 @@ func (s *State) Save() {
// Sets the ABCIResponses in the state and writes them to disk // Sets the ABCIResponses in the state and writes them to disk
// in case we crash after app.Commit and before s.Save() // in case we crash after app.Commit and before s.Save()
func (s *State) SaveABCIResponses(abciResponses *ABCIResponses) { func (s *State) SaveABCIResponses(abciResponses *ABCIResponses) {
// save the validators to the db // save the validators to the db
s.db.SetSync(abciResponsesKey, abciResponses.Bytes()) s.db.SetSync(abciResponsesKey, abciResponses.Bytes())
// save the tx results using the TxIndexer
// NOTE: these may be overwriting, but the values should be the same.
batch := txindexer.NewBatch()
for i, d := range abciResponses.DeliverTx {
tx := abciResponses.txs[i]
batch.Index(tx.Hash(), types.TxResult{uint64(abciResponses.height), uint32(i), *d})
}
s.TxIndexer.Batch(batch)
} }
func (s *State) LoadABCIResponses() *ABCIResponses { func (s *State) LoadABCIResponses() *ABCIResponses {
@ -115,10 +105,10 @@ func (s *State) LoadABCIResponses() *ABCIResponses {
buf := s.db.Get(abciResponsesKey) buf := s.db.Get(abciResponsesKey)
if len(buf) != 0 { if len(buf) != 0 {
r, n, err := bytes.NewReader(buf), new(int), new(error) r, n, err := bytes.NewReader(buf), new(int), new(error)
wire.ReadBinaryPtr(&abciResponses.EndBlock.Diffs, r, 0, n, err)
wire.ReadBinaryPtr(abciResponses, r, 0, n, err)
if *err != nil { if *err != nil {
// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
Exit(Fmt("Data has been corrupted or its spec has changed: %v\n", *err))
Exit(Fmt("LoadABCIResponses: Data has been corrupted or its spec has changed: %v\n", *err))
} }
// TODO: ensure that buf is completely read. // TODO: ensure that buf is completely read.
} }
@ -191,25 +181,26 @@ func GetState(config cfg.Config, stateDB dbm.DB) *State {
// ABCIResponses holds intermediate state during block processing // ABCIResponses holds intermediate state during block processing
type ABCIResponses struct { type ABCIResponses struct {
height int
txs types.Txs // for reference later
Height int
DeliverTx []*abci.ResponseDeliverTx // results of the txs, populated in the proxyCb
EndBlock abci.ResponseEndBlock // changes to the validator set
DeliverTx []*abci.ResponseDeliverTx
EndBlock abci.ResponseEndBlock
txs types.Txs // for reference later
} }
func NewABCIResponses(block *types.Block) *ABCIResponses { func NewABCIResponses(block *types.Block) *ABCIResponses {
return &ABCIResponses{ return &ABCIResponses{
height: block.Height,
txs: block.Data.Txs,
Height: block.Height,
DeliverTx: make([]*abci.ResponseDeliverTx, block.NumTxs), DeliverTx: make([]*abci.ResponseDeliverTx, block.NumTxs),
txs: block.Data.Txs,
} }
} }
// Serialize the list of validators
// Serialize the ABCIResponse
func (a *ABCIResponses) Bytes() []byte { func (a *ABCIResponses) Bytes() []byte {
buf, n, err := new(bytes.Buffer), new(int), new(error) buf, n, err := new(bytes.Buffer), new(int), new(error)
wire.WriteBinary(a.EndBlock, buf, n, err)
wire.WriteBinary(*a, buf, n, err)
if *err != nil { if *err != nil {
PanicCrisis(*err) PanicCrisis(*err)
} }


+ 31
- 0
state/state_test.go View File

@ -1,8 +1,12 @@
package state package state
import ( import (
"fmt"
"testing" "testing"
"github.com/stretchr/testify/assert"
abci "github.com/tendermint/abci/types"
"github.com/tendermint/go-crypto"
dbm "github.com/tendermint/go-db" dbm "github.com/tendermint/go-db"
"github.com/tendermint/tendermint/config/tendermint_test" "github.com/tendermint/tendermint/config/tendermint_test"
) )
@ -40,3 +44,30 @@ func TestStateSaveLoad(t *testing.T) {
t.Fatal("expected state and its copy to be identical. got %v\n expected %v\n", loadedState, state) t.Fatal("expected state and its copy to be identical. got %v\n expected %v\n", loadedState, state)
} }
} }
func TestABCIResponsesSaveLoad(t *testing.T) {
assert := assert.New(t)
config := tendermint_test.ResetConfig("state_")
stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir"))
state := GetState(config, stateDB)
state.LastBlockHeight += 1
// build mock responses
block := makeBlock(2, state)
abciResponses := NewABCIResponses(block)
abciResponses.DeliverTx[0] = &abci.ResponseDeliverTx{Data: []byte("foo")}
abciResponses.DeliverTx[1] = &abci.ResponseDeliverTx{Data: []byte("bar"), Log: "ok"}
abciResponses.EndBlock = abci.ResponseEndBlock{Diffs: []*abci.Validator{
{
PubKey: crypto.GenPrivKeyEd25519().PubKey().Bytes(),
Power: 10,
},
}}
abciResponses.txs = nil
state.SaveABCIResponses(abciResponses)
abciResponses2 := state.LoadABCIResponses()
assert.Equal(abciResponses, abciResponses2, fmt.Sprintf("ABCIResponses don't match: Got %v, Expected %v", abciResponses2, abciResponses))
}

Loading…
Cancel
Save