diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index ecb4a9e63..f58b83942 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -21,7 +21,7 @@ func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore) { // Get State stateDB := dbm.NewMemDB() state, _ := sm.GetState(stateDB, config.GenesisFile()) - sm.SaveState(stateDB, state, state.AppHash) + sm.SaveState(stateDB, state) return state, blockStore } diff --git a/consensus/common_test.go b/consensus/common_test.go index 6598c15eb..ba3564aae 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -235,16 +235,16 @@ func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} { //------------------------------------------------------------------------------- // consensus states -func newConsensusState(state *sm.State, pv types.PrivValidator, app abci.Application) *ConsensusState { +func newConsensusState(state sm.State, pv types.PrivValidator, app abci.Application) *ConsensusState { return newConsensusStateWithConfig(config, state, pv, app) } -func newConsensusStateWithConfig(thisConfig *cfg.Config, state *sm.State, pv types.PrivValidator, app abci.Application) *ConsensusState { +func newConsensusStateWithConfig(thisConfig *cfg.Config, state sm.State, pv types.PrivValidator, app abci.Application) *ConsensusState { blockDB := dbm.NewMemDB() return newConsensusStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB) } -func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state *sm.State, pv types.PrivValidator, app abci.Application, blockDB dbm.DB) *ConsensusState { +func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.State, pv types.PrivValidator, app abci.Application, blockDB dbm.DB) *ConsensusState { // Get BlockStore blockStore := bc.NewBlockStore(blockDB) @@ -264,7 +264,10 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state *sm. evpool := types.MockEvidencePool{} // Make ConsensusReactor - cs := NewConsensusState(thisConfig.Consensus, state, proxyAppConnCon, blockStore, mempool, evpool) + stateDB := dbm.NewMemDB() // XXX !! + blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), + nil, proxyAppConnCon, mempool, evpool) + cs := NewConsensusState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) cs.SetLogger(log.TestingLogger()) cs.SetPrivValidator(pv) @@ -284,9 +287,7 @@ func loadPrivValidator(config *cfg.Config) *types.PrivValidatorFS { } func fixedConsensusStateDummy(config *cfg.Config, logger log.Logger) *ConsensusState { - stateDB := dbm.NewMemDB() - state, _ := sm.MakeGenesisStateFromFile(stateDB, config.GenesisFile()) - state.SetLogger(logger.With("module", "state")) + state, _ := sm.MakeGenesisStateFromFile(config.GenesisFile()) privValidator := loadPrivValidator(config) cs := newConsensusState(state, privValidator, dummy.NewDummyApplication()) cs.SetLogger(logger) @@ -354,10 +355,9 @@ func randConsensusNet(nValidators int, testName string, tickerFunc func() Timeou css := make([]*ConsensusState, nValidators) logger := consensusLogger() for i := 0; i < nValidators; i++ { - db := dbm.NewMemDB() // each state needs its own db - state, _ := sm.MakeGenesisState(db, genDoc) - state.SetLogger(logger.With("module", "state", "validator", i)) - state.Save() + stateDB := dbm.NewMemDB() // each state needs its own db + state, _ := sm.MakeGenesisState(genDoc) + sm.SaveState(stateDB, state) thisConfig := ResetConfig(cmn.Fmt("%s_%d", testName, i)) for _, opt := range configOpts { opt(thisConfig) @@ -380,10 +380,9 @@ func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerF css := make([]*ConsensusState, nPeers) logger := consensusLogger() for i := 0; i < nPeers; i++ { - db := dbm.NewMemDB() // each state needs its own db - state, _ := sm.MakeGenesisState(db, genDoc) - state.SetLogger(logger.With("module", "state", "validator", i)) - state.Save() + stateDB := dbm.NewMemDB() // each state needs its own db + state, _ := sm.MakeGenesisState(genDoc) + sm.SaveState(stateDB, state) thisConfig := ResetConfig(cmn.Fmt("%s_%d", testName, i)) ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal var privVal types.PrivValidator @@ -437,12 +436,11 @@ func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.G }, privValidators } -func randGenesisState(numValidators int, randPower bool, minPower int64) (*sm.State, []*types.PrivValidatorFS) { +func randGenesisState(numValidators int, randPower bool, minPower int64) (sm.State, []*types.PrivValidatorFS) { genDoc, privValidators := randGenesisDoc(numValidators, randPower, minPower) + s0, _ := sm.MakeGenesisState(genDoc) db := dbm.NewMemDB() - s0, _ := sm.MakeGenesisState(db, genDoc) - s0.SetLogger(log.TestingLogger().With("module", "state")) - s0.Save() + sm.SaveState(db, s0) return s0, privValidators } diff --git a/consensus/reactor.go b/consensus/reactor.go index eb752ee17..9b3393e94 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -82,7 +82,7 @@ func (conR *ConsensusReactor) OnStop() { // SwitchToConsensus switches from fast_sync mode to consensus mode. // It resets the state, turns off fast_sync, and starts the consensus state-machine -func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State, blocksSynced int) { +func (conR *ConsensusReactor) SwitchToConsensus(state sm.State, blocksSynced int) { conR.Logger.Info("SwitchToConsensus") conR.conS.reconstructLastCommit(state) // NOTE: The line below causes broadcastNewRoundStepRoutine() to diff --git a/consensus/replay.go b/consensus/replay.go index a9aaeefcf..55c291783 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -13,6 +13,7 @@ import ( abci "github.com/tendermint/abci/types" //auto "github.com/tendermint/tmlibs/autofile" cmn "github.com/tendermint/tmlibs/common" + dbm "github.com/tendermint/tmlibs/db" "github.com/tendermint/tmlibs/log" "github.com/tendermint/tendermint/proxy" @@ -186,15 +187,16 @@ func makeHeightSearchFunc(height int64) auto.SearchFunc { // we were last and using the WAL to recover there type Handshaker struct { - state *sm.State - store types.BlockStore - logger log.Logger + stateDB dbm.DB + initialState sm.State + store types.BlockStore + logger log.Logger nBlocks int // number of blocks applied to the state } -func NewHandshaker(state *sm.State, store types.BlockStore) *Handshaker { - return &Handshaker{state, store, log.NewNopLogger(), 0} +func NewHandshaker(stateDB dbm.DB, state sm.State, store types.BlockStore) *Handshaker { + return &Handshaker{stateDB, state, store, log.NewNopLogger(), 0} } func (h *Handshaker) SetLogger(l log.Logger) { @@ -224,7 +226,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { // TODO: check version // replay blocks up to the latest in the blockstore - _, err = h.ReplayBlocks(appHash, blockHeight, proxyApp) + _, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp) if err != nil { return fmt.Errorf("Error on replay: %v", err) } @@ -238,15 +240,15 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { // Replay all blocks since appBlockHeight and ensure the result matches the current state. // Returns the final AppHash or an error -func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp proxy.AppConns) ([]byte, error) { +func (h *Handshaker) ReplayBlocks(state sm.State, appHash []byte, appBlockHeight int64, proxyApp proxy.AppConns) ([]byte, error) { storeBlockHeight := h.store.Height() - stateBlockHeight := h.state.LastBlockHeight + stateBlockHeight := state.LastBlockHeight h.logger.Info("ABCI Replay Blocks", "appHeight", appBlockHeight, "storeHeight", storeBlockHeight, "stateHeight", stateBlockHeight) // If appBlockHeight == 0 it means that we are at genesis and hence should send InitChain if appBlockHeight == 0 { - validators := types.TM2PB.Validators(h.state.Validators) + validators := types.TM2PB.Validators(state.Validators) if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil { return nil, err } @@ -254,7 +256,7 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp // First handle edge cases and constraints on the storeBlockHeight if storeBlockHeight == 0 { - return appHash, h.checkAppHash(appHash) + return appHash, checkAppHash(state, appHash) } else if storeBlockHeight < appBlockHeight { // the app should never be ahead of the store (but this is under app's control) @@ -269,6 +271,7 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp cmn.PanicSanity(cmn.Fmt("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1)) } + var err error // Now either store is equal to state, or one ahead. // For each, consider all cases of where the app could be, given app <= store if storeBlockHeight == stateBlockHeight { @@ -276,11 +279,11 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp // Either the app is asking for replay, or we're all synced up. if appBlockHeight < storeBlockHeight { // the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store) - return h.replayBlocks(proxyApp, appBlockHeight, storeBlockHeight, false) + return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false) } else if appBlockHeight == storeBlockHeight { // We're good! - return appHash, h.checkAppHash(appHash) + return appHash, checkAppHash(state, appHash) } } else if storeBlockHeight == stateBlockHeight+1 { @@ -289,7 +292,7 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp if appBlockHeight < stateBlockHeight { // the app is further behind than it should be, so replay blocks // but leave the last block to go through the WAL - return h.replayBlocks(proxyApp, appBlockHeight, storeBlockHeight, true) + return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true) } else if appBlockHeight == stateBlockHeight { // We haven't run Commit (both the state and app are one block behind), @@ -297,17 +300,19 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp // NOTE: We could instead use the cs.WAL on cs.Start, // but we'd have to allow the WAL to replay a block that wrote it's ENDHEIGHT h.logger.Info("Replay last block using real app") - return h.replayBlock(storeBlockHeight, proxyApp.Consensus()) + state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus()) + return state.AppHash, err } else if appBlockHeight == storeBlockHeight { // We ran Commit, but didn't save the state, so replayBlock with mock app - abciResponses, err := sm.LoadABCIResponses(h.state.DB(), storeBlockHeight) + abciResponses, err := sm.LoadABCIResponses(h.stateDB, storeBlockHeight) if err != nil { return nil, err } mockApp := newMockProxyApp(appHash, abciResponses) h.logger.Info("Replay last block using mock app") - return h.replayBlock(storeBlockHeight, mockApp) + state, err = h.replayBlock(state, storeBlockHeight, mockApp) + return state.AppHash, err } } @@ -316,7 +321,7 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp return nil, nil } -func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int64, mutateState bool) ([]byte, error) { +func (h *Handshaker) replayBlocks(state sm.State, proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int64, mutateState bool) ([]byte, error) { // App is further behind than it should be, so we need to replay blocks. // We replay all blocks from appBlockHeight+1. // @@ -336,7 +341,7 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store for i := appBlockHeight + 1; i <= finalBlock; i++ { h.logger.Info("Applying block", "height", i) block := h.store.LoadBlock(i) - appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger, h.state.LastValidators) + appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger) if err != nil { return nil, err } @@ -346,33 +351,41 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store if mutateState { // sync the final block - return h.replayBlock(storeBlockHeight, proxyApp.Consensus()) + state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus()) + if err != nil { + return nil, err + } + appHash = state.AppHash } - return appHash, h.checkAppHash(appHash) + return appHash, checkAppHash(state, appHash) } // ApplyBlock on the proxyApp with the last block. -func (h *Handshaker) replayBlock(height int64, proxyApp proxy.AppConnConsensus) ([]byte, error) { +func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.AppConnConsensus) (sm.State, error) { mempool := types.MockMempool{} evpool := types.MockEvidencePool{} block := h.store.LoadBlock(height) meta := h.store.LoadBlockMeta(height) - if err := h.state.ApplyBlock(types.NopEventBus{}, proxyApp, - block, meta.BlockID.PartsHeader, mempool, evpool); err != nil { - return nil, err + blockExec := sm.NewBlockExecutor(h.stateDB, h.logger, + types.NopEventBus{}, proxyApp, mempool, evpool) + + var err error + state, err = blockExec.ApplyBlock(state, meta.BlockID, block) + if err != nil { + return sm.State{}, err } h.nBlocks += 1 - return h.state.AppHash, nil + return state, nil } -func (h *Handshaker) checkAppHash(appHash []byte) error { - if !bytes.Equal(h.state.AppHash, appHash) { - panic(fmt.Errorf("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash).Error()) +func checkAppHash(state sm.State, appHash []byte) error { + if !bytes.Equal(state.AppHash, appHash) { + panic(fmt.Errorf("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, state.AppHash).Error()) } return nil } diff --git a/consensus/replay_file.go b/consensus/replay_file.go index 4db58ada8..d832abad4 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -18,6 +18,7 @@ import ( "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" dbm "github.com/tendermint/tmlibs/db" + "github.com/tendermint/tmlibs/log" ) const ( @@ -104,11 +105,11 @@ type playback struct { count int // how many lines/msgs into the file are we // replays can be reset to beginning - fileName string // so we can close/reopen the file - genesisState *sm.State // so the replay session knows where to restart from + fileName string // so we can close/reopen the file + genesisState sm.State // so the replay session knows where to restart from } -func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState *sm.State) *playback { +func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState sm.State) *playback { return &playback{ cs: cs, fp: fp, @@ -123,7 +124,7 @@ func (pb *playback) replayReset(count int, newStepCh chan interface{}) error { pb.cs.Stop() pb.cs.Wait() - newCS := NewConsensusState(pb.cs.config, pb.genesisState.Copy(), pb.cs.proxyAppConn, + newCS := NewConsensusState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec, pb.cs.blockStore, pb.cs.mempool, pb.cs.evpool) newCS.SetEventBus(pb.cs.eventBus) newCS.startForReplay() @@ -285,14 +286,14 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo // Get State stateDB := dbm.NewDB("state", config.DBBackend, config.DBDir()) - state, err := sm.MakeGenesisStateFromFile(stateDB, config.GenesisFile()) + state, err := sm.MakeGenesisStateFromFile(config.GenesisFile()) if err != nil { cmn.Exit(err.Error()) } // Create proxyAppConn connection (consensus, mempool, query) clientCreator := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()) - proxyApp := proxy.NewAppConns(clientCreator, NewHandshaker(state, blockStore)) + proxyApp := proxy.NewAppConns(clientCreator, NewHandshaker(stateDB, state, blockStore)) err = proxyApp.Start() if err != nil { cmn.Exit(cmn.Fmt("Error starting proxy app conns: %v", err)) @@ -303,8 +304,13 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo cmn.Exit(cmn.Fmt("Failed to start event bus: %v", err)) } - consensusState := NewConsensusState(csConfig, state.Copy(), proxyApp.Consensus(), - blockStore, types.MockMempool{}, types.MockEvidencePool{}) + mempool, evpool := types.MockMempool{}, types.MockEvidencePool{} + blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), + nil, proxyApp.Consensus(), + mempool, evpool) + + consensusState := NewConsensusState(csConfig, state.Copy(), blockExec, + blockStore, mempool, evpool) consensusState.SetEventBus(eventBus) return consensusState diff --git a/consensus/replay_test.go b/consensus/replay_test.go index f1a060ec1..4647ff3d6 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -54,7 +54,6 @@ func init() { func startNewConsensusStateAndWaitForBlock(t *testing.T, lastBlockHeight int64, blockDB dbm.DB, stateDB dbm.DB) { logger := log.TestingLogger() state, _ := sm.GetState(stateDB, consensusReplayConfig.GenesisFile()) - state.SetLogger(logger.With("module", "state")) privValidator := loadPrivValidator(consensusReplayConfig) cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, dummy.NewDummyApplication(), blockDB) cs.SetLogger(logger) @@ -98,22 +97,22 @@ func sendTxs(cs *ConsensusState, ctx context.Context) { func TestWALCrash(t *testing.T) { testCases := []struct { name string - initFn func(*ConsensusState, context.Context) + initFn func(dbm.DB, *ConsensusState, context.Context) heightToStop int64 }{ {"empty block", - func(cs *ConsensusState, ctx context.Context) {}, + func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) {}, 1}, {"block with a smaller part size", - func(cs *ConsensusState, ctx context.Context) { + func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) { // XXX: is there a better way to change BlockPartSizeBytes? cs.state.ConsensusParams.BlockPartSizeBytes = 512 - cs.state.Save() + sm.SaveState(stateDB, cs.state) go sendTxs(cs, ctx) }, 1}, {"many non-empty blocks", - func(cs *ConsensusState, ctx context.Context) { + func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) { go sendTxs(cs, ctx) }, 3}, @@ -126,7 +125,7 @@ func TestWALCrash(t *testing.T) { } } -func crashWALandCheckLiveness(t *testing.T, initFn func(*ConsensusState, context.Context), heightToStop int64) { +func crashWALandCheckLiveness(t *testing.T, initFn func(dbm.DB, *ConsensusState, context.Context), heightToStop int64) { walPaniced := make(chan error) crashingWal := &crashingWAL{panicCh: walPaniced, heightToStop: heightToStop} @@ -139,8 +138,7 @@ LOOP: // create consensus state from a clean slate logger := log.NewNopLogger() stateDB := dbm.NewMemDB() - state, _ := sm.MakeGenesisStateFromFile(stateDB, consensusReplayConfig.GenesisFile()) - state.SetLogger(logger.With("module", "state")) + state, _ := sm.MakeGenesisStateFromFile(consensusReplayConfig.GenesisFile()) privValidator := loadPrivValidator(consensusReplayConfig) blockDB := dbm.NewMemDB() cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, dummy.NewDummyApplication(), blockDB) @@ -148,7 +146,7 @@ LOOP: // start sending transactions ctx, cancel := context.WithCancel(context.Background()) - initFn(cs, ctx) + initFn(stateDB, cs, ctx) // clean up WAL file from the previous iteration walFile := cs.config.WalFile() @@ -344,12 +342,13 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { t.Fatalf(err.Error()) } - state, store := stateAndStore(config, privVal.GetPubKey()) + stateDB, state, store := stateAndStore(config, privVal.GetPubKey()) store.chain = chain store.commits = commits // run the chain through state.ApplyBlock to build up the tendermint state - latestAppHash := buildTMStateFromChain(config, state, chain, mode) + state = buildTMStateFromChain(config, stateDB, state, chain, mode) + latestAppHash := state.AppHash // make a new client creator dummyApp := dummy.NewPersistentDummyApplication(path.Join(config.DBDir(), "2")) @@ -358,12 +357,12 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { // run nBlocks against a new client to build up the app state. // use a throwaway tendermint state proxyApp := proxy.NewAppConns(clientCreator2, nil) - state, _ := stateAndStore(config, privVal.GetPubKey()) - buildAppStateFromChain(proxyApp, state, chain, nBlocks, mode) + stateDB, state, _ := stateAndStore(config, privVal.GetPubKey()) + buildAppStateFromChain(proxyApp, stateDB, state, chain, nBlocks, mode) } // now start the app using the handshake - it should sync - handshaker := NewHandshaker(state, store) + handshaker := NewHandshaker(stateDB, state, store) proxyApp := proxy.NewAppConns(clientCreator2, handshaker) if err := proxyApp.Start(); err != nil { t.Fatalf("Error starting proxy app connections: %v", err) @@ -393,16 +392,21 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { } } -func applyBlock(st *sm.State, blk *types.Block, proxyApp proxy.AppConns) { +func applyBlock(stateDB dbm.DB, st sm.State, blk *types.Block, proxyApp proxy.AppConns) sm.State { testPartSize := st.ConsensusParams.BlockPartSizeBytes - err := st.ApplyBlock(types.NopEventBus{}, proxyApp.Consensus(), blk, blk.MakePartSet(testPartSize).Header(), mempool, evpool) + blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), + types.NopEventBus{}, proxyApp.Consensus(), mempool, evpool) + + blkID := types.BlockID{blk.Hash(), blk.MakePartSet(testPartSize).Header()} + newState, err := blockExec.ApplyBlock(st, blkID, blk) if err != nil { panic(err) } + return newState } -func buildAppStateFromChain(proxyApp proxy.AppConns, - state *sm.State, chain []*types.Block, nBlocks int, mode uint) { +func buildAppStateFromChain(proxyApp proxy.AppConns, stateDB dbm.DB, + state sm.State, chain []*types.Block, nBlocks int, mode uint) { // start a new app without handshake, play nBlocks blocks if err := proxyApp.Start(); err != nil { panic(err) @@ -418,24 +422,24 @@ func buildAppStateFromChain(proxyApp proxy.AppConns, case 0: for i := 0; i < nBlocks; i++ { block := chain[i] - applyBlock(state, block, proxyApp) + state = applyBlock(stateDB, state, block, proxyApp) } case 1, 2: for i := 0; i < nBlocks-1; i++ { block := chain[i] - applyBlock(state, block, proxyApp) + state = applyBlock(stateDB, state, block, proxyApp) } if mode == 2 { // update the dummy height and apphash // as if we ran commit but not - applyBlock(state, chain[nBlocks-1], proxyApp) + state = applyBlock(stateDB, state, chain[nBlocks-1], proxyApp) } } } -func buildTMStateFromChain(config *cfg.Config, state *sm.State, chain []*types.Block, mode uint) []byte { +func buildTMStateFromChain(config *cfg.Config, stateDB dbm.DB, state sm.State, chain []*types.Block, mode uint) sm.State { // run the whole chain against this client to build up the tendermint state clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.DBDir(), "1"))) proxyApp := proxy.NewAppConns(clientCreator, nil) // sm.NewHandshaker(config, state, store, ReplayLastBlock)) @@ -449,31 +453,26 @@ func buildTMStateFromChain(config *cfg.Config, state *sm.State, chain []*types.B panic(err) } - var latestAppHash []byte - switch mode { case 0: // sync right up for _, block := range chain { - applyBlock(state, block, proxyApp) + state = applyBlock(stateDB, state, block, proxyApp) } - latestAppHash = state.AppHash case 1, 2: // sync up to the penultimate as if we stored the block. // whether we commit or not depends on the appHash for _, block := range chain[:len(chain)-1] { - applyBlock(state, block, proxyApp) + state = applyBlock(stateDB, state, block, proxyApp) } // apply the final block to a state copy so we can // get the right next appHash but keep the state back - stateCopy := state.Copy() - applyBlock(stateCopy, chain[len(chain)-1], proxyApp) - latestAppHash = stateCopy.AppHash + applyBlock(stateDB, state, chain[len(chain)-1], proxyApp) } - return latestAppHash + return state } //-------------------------- @@ -587,13 +586,11 @@ func readPieceFromWAL(msg *TimedWALMessage) interface{} { } // fresh state and mock store -func stateAndStore(config *cfg.Config, pubKey crypto.PubKey) (*sm.State, *mockBlockStore) { +func stateAndStore(config *cfg.Config, pubKey crypto.PubKey) (dbm.DB, sm.State, *mockBlockStore) { stateDB := dbm.NewMemDB() - state, _ := sm.MakeGenesisStateFromFile(stateDB, config.GenesisFile()) - state.SetLogger(log.TestingLogger().With("module", "state")) - + state, _ := sm.MakeGenesisStateFromFile(config.GenesisFile()) store := NewMockBlockStore(config, state.ConsensusParams) - return state, store + return stateDB, state, store } //---------------------------------- diff --git a/consensus/state.go b/consensus/state.go index 5e83e6a56..477d872b0 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -17,7 +17,6 @@ import ( cfg "github.com/tendermint/tendermint/config" cstypes "github.com/tendermint/tendermint/consensus/types" - "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -75,15 +74,16 @@ type ConsensusState struct { privValidator types.PrivValidator // for signing votes // services for creating and executing blocks - proxyAppConn proxy.AppConnConsensus - blockStore types.BlockStore - mempool types.Mempool - evpool types.EvidencePool + // TODO: encapsulate all of this in one "BlockManager" + blockExec *sm.BlockExecutor + blockStore types.BlockStore + mempool types.Mempool + evpool types.EvidencePool // internal state mtx sync.Mutex cstypes.RoundState - state *sm.State // State until height-1. + state sm.State // State until height-1. // state changes may be triggered by msgs from peers, // msgs from ourself, or by timeouts @@ -114,10 +114,10 @@ type ConsensusState struct { } // NewConsensusState returns a new ConsensusState. -func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore types.BlockStore, mempool types.Mempool, evpool types.EvidencePool) *ConsensusState { +func NewConsensusState(config *cfg.ConsensusConfig, state sm.State, blockExec *sm.BlockExecutor, blockStore types.BlockStore, mempool types.Mempool, evpool types.EvidencePool) *ConsensusState { cs := &ConsensusState{ config: config, - proxyAppConn: proxyAppConn, + blockExec: blockExec, blockStore: blockStore, mempool: mempool, peerMsgQueue: make(chan msgInfo, msgQueueSize), @@ -162,7 +162,7 @@ func (cs *ConsensusState) String() string { } // GetState returns a copy of the chain state. -func (cs *ConsensusState) GetState() *sm.State { +func (cs *ConsensusState) GetState() sm.State { cs.mtx.Lock() defer cs.mtx.Unlock() return cs.state.Copy() @@ -399,7 +399,7 @@ func (cs *ConsensusState) sendInternalMessage(mi msgInfo) { // Reconstruct LastCommit from SeenCommit, which we saved along with the block, // (which happens even before saving the state) -func (cs *ConsensusState) reconstructLastCommit(state *sm.State) { +func (cs *ConsensusState) reconstructLastCommit(state sm.State) { if state.LastBlockHeight == 0 { return } @@ -422,12 +422,12 @@ func (cs *ConsensusState) reconstructLastCommit(state *sm.State) { // Updates ConsensusState and increments height to match that of state. // The round becomes 0 and cs.Step becomes cstypes.RoundStepNewHeight. -func (cs *ConsensusState) updateToState(state *sm.State) { +func (cs *ConsensusState) updateToState(state sm.State) { if cs.CommitRound > -1 && 0 < cs.Height && cs.Height != state.LastBlockHeight { cmn.PanicSanity(cmn.Fmt("updateToState() expected state height of %v but found %v", cs.Height, state.LastBlockHeight)) } - if cs.state != nil && cs.state.LastBlockHeight+1 != cs.Height { + if !cs.state.IsEmpty() && cs.state.LastBlockHeight+1 != cs.Height { // This might happen when someone else is mutating cs.state. // Someone forgot to pass in state.Copy() somewhere?! cmn.PanicSanity(cmn.Fmt("Inconsistent cs.state.LastBlockHeight+1 %v vs cs.Height %v", @@ -437,7 +437,7 @@ func (cs *ConsensusState) updateToState(state *sm.State) { // If state isn't further out than cs.state, just ignore. // This happens when SwitchToConsensus() is called in the reactor. // We don't want to reset e.g. the Votes. - if cs.state != nil && (state.LastBlockHeight <= cs.state.LastBlockHeight) { + if !cs.state.IsEmpty() && (state.LastBlockHeight <= cs.state.LastBlockHeight) { cs.Logger.Info("Ignoring updateToState()", "newHeight", state.LastBlockHeight+1, "oldHeight", cs.state.LastBlockHeight+1) return } @@ -922,7 +922,7 @@ func (cs *ConsensusState) defaultDoPrevote(height int64, round int) { } // Validate proposal block - err := cs.state.ValidateBlock(cs.ProposalBlock) + err := sm.ValidateBlock(cs.state, cs.ProposalBlock) if err != nil { // ProposalBlock is invalid, prevote nil. logger.Error("enterPrevote: ProposalBlock is invalid", "err", err) @@ -1030,7 +1030,7 @@ func (cs *ConsensusState) enterPrecommit(height int64, round int) { if cs.ProposalBlock.HashesTo(blockID.Hash) { cs.Logger.Info("enterPrecommit: +2/3 prevoted proposal block. Locking", "hash", blockID.Hash) // Validate the block. - if err := cs.state.ValidateBlock(cs.ProposalBlock); err != nil { + if err := sm.ValidateBlock(cs.state, cs.ProposalBlock); err != nil { cmn.PanicConsensus(cmn.Fmt("enterPrecommit: +2/3 prevoted for an invalid block: %v", err)) } cs.LockedRound = round @@ -1165,7 +1165,7 @@ func (cs *ConsensusState) finalizeCommit(height int64) { if !block.HashesTo(blockID.Hash) { cmn.PanicSanity(cmn.Fmt("Cannot finalizeCommit, ProposalBlock does not hash to commit hash")) } - if err := cs.state.ValidateBlock(block); err != nil { + if err := sm.ValidateBlock(cs.state, block); err != nil { cmn.PanicConsensus(cmn.Fmt("+2/3 committed an invalid block: %v", err)) } @@ -1204,13 +1204,12 @@ func (cs *ConsensusState) finalizeCommit(height int64) { // and an event cache for txs stateCopy := cs.state.Copy() txEventBuffer := types.NewTxEventBuffer(cs.eventBus, int(block.NumTxs)) + cs.blockExec.SetTxEventPublisher(txEventBuffer) // Execute and commit the block, update and save the state, and update the mempool. - // All calls to the proxyAppConn come here. // NOTE: the block.AppHash wont reflect these txs until the next block - err := stateCopy.ApplyBlock(txEventBuffer, cs.proxyAppConn, - block, blockParts.Header(), - cs.mempool, cs.evpool) + var err error + stateCopy, err = cs.blockExec.ApplyBlock(stateCopy, types.BlockID{block.Hash(), blockParts.Header()}, block) if err != nil { cs.Logger.Error("Error on ApplyBlock. Did the application crash? Please restart tendermint", "err", err) err := cmn.Kill() diff --git a/consensus/wal_generator.go b/consensus/wal_generator.go index 73ad3e7fc..fe9066b3f 100644 --- a/consensus/wal_generator.go +++ b/consensus/wal_generator.go @@ -47,13 +47,12 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) { } stateDB := db.NewMemDB() blockStoreDB := db.NewMemDB() - state, err := sm.MakeGenesisState(stateDB, genDoc) - state.SetLogger(logger.With("module", "state")) + state, err := sm.MakeGenesisState(genDoc) if err != nil { return nil, errors.Wrap(err, "failed to make genesis state") } blockStore := bc.NewBlockStore(blockStoreDB) - handshaker := NewHandshaker(state, blockStore) + handshaker := NewHandshaker(stateDB, state, blockStore) proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app), handshaker) proxyApp.SetLogger(logger.With("module", "proxy")) if err := proxyApp.Start(); err != nil { @@ -68,7 +67,8 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) { defer eventBus.Stop() mempool := types.MockMempool{} evpool := types.MockEvidencePool{} - consensusState := NewConsensusState(config.Consensus, state.Copy(), proxyApp.Consensus(), blockStore, mempool, evpool) + blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), nil, proxyApp.Consensus(), mempool, evpool) + consensusState := NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool) consensusState.SetLogger(logger) consensusState.SetEventBus(eventBus) if privValidator != nil { diff --git a/evidence/pool.go b/evidence/pool.go index 381801df3..2e7cd4704 100644 --- a/evidence/pool.go +++ b/evidence/pool.go @@ -21,13 +21,13 @@ type EvidencePool struct { evidenceChan chan types.Evidence } -func NewEvidencePool(params types.EvidenceParams, evidenceStore *EvidenceStore, state types.State) *EvidencePool { +func NewEvidencePool(params types.EvidenceParams, evidenceStore *EvidenceStore) *EvidencePool { evpool := &EvidencePool{ params: params, logger: log.NewNopLogger(), evidenceStore: evidenceStore, - state: *state, - evidenceChan: make(chan types.Evidence), + // state: *state, + evidenceChan: make(chan types.Evidence), } return evpool } @@ -58,12 +58,15 @@ func (evpool *EvidencePool) AddEvidence(evidence types.Evidence) (err error) { // TODO: check if we already have evidence for this // validator at this height so we dont get spammed - priority, err := sm.VerifyEvidence(evpool.state, evidence) - if err != nil { - // TODO: if err is just that we cant find it cuz we pruned, ignore. - // TODO: if its actually bad evidence, punish peer - return err - } + // TODO + var priority int64 + /* + priority, err := sm.VerifyEvidence(evpool.state, evidence) + if err != nil { + // TODO: if err is just that we cant find it cuz we pruned, ignore. + // TODO: if its actually bad evidence, punish peer + return err + }*/ added := evpool.evidenceStore.AddNewEvidence(evidence, priority) if !added { diff --git a/node/node.go b/node/node.go index 53eab6e0e..15e4f0432 100644 --- a/node/node.go +++ b/node/node.go @@ -102,7 +102,8 @@ type Node struct { trustMetricStore *trust.TrustMetricStore // trust metrics for all peers // services - eventBus *types.EventBus // pub/sub for services + eventBus *types.EventBus // pub/sub for services + stateDB dbm.DB blockStore *bc.BlockStore // store the blockchain to disk bcReactor *bc.BlockchainReactor // for fast-syncing mempoolReactor *mempl.MempoolReactor // for gossipping transactions @@ -148,21 +149,20 @@ func NewNode(config *cfg.Config, saveGenesisDoc(stateDB, genDoc) } - stateLogger := logger.With("module", "state") state := sm.LoadState(stateDB) - if state == nil { - state, err = sm.MakeGenesisState(stateDB, genDoc) + if state.IsEmpty() { + state, err = sm.MakeGenesisState(genDoc) if err != nil { return nil, err } - state.Save() + sm.SaveState(stateDB, state) } - state.SetLogger(stateLogger) // Create the proxyApp, which manages connections (consensus, mempool, query) - // and sync tendermint and the app by replaying any necessary blocks + // and sync tendermint and the app by performing a handshake + // and replaying any necessary blocks consensusLogger := logger.With("module", "consensus") - handshaker := consensus.NewHandshaker(state, blockStore) + handshaker := consensus.NewHandshaker(stateDB, state, blockStore) handshaker.SetLogger(consensusLogger) proxyApp := proxy.NewAppConns(clientCreator, handshaker) proxyApp.SetLogger(logger.With("module", "proxy")) @@ -172,7 +172,6 @@ func NewNode(config *cfg.Config, // reload the state (it may have been updated by the handshake) state = sm.LoadState(stateDB) - state.SetLogger(stateLogger) // Generate node PrivKey privKey := crypto.GenPrivKeyEd25519() @@ -194,10 +193,6 @@ func NewNode(config *cfg.Config, consensusLogger.Info("This node is not a validator") } - // Make BlockchainReactor - bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyApp.Consensus(), blockStore, fastSync) - bcReactor.SetLogger(logger.With("module", "blockchain")) - // Make MempoolReactor mempoolLogger := logger.With("module", "mempool") mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight) @@ -216,14 +211,24 @@ func NewNode(config *cfg.Config, } evidenceLogger := logger.With("module", "evidence") evidenceStore := evidence.NewEvidenceStore(evidenceDB) - evidencePool := evidence.NewEvidencePool(state.ConsensusParams.EvidenceParams, evidenceStore, state.Copy()) + evidencePool := evidence.NewEvidencePool(state.ConsensusParams.EvidenceParams, evidenceStore) // , state.Copy()) evidencePool.SetLogger(evidenceLogger) evidenceReactor := evidence.NewEvidenceReactor(evidencePool) evidenceReactor.SetLogger(evidenceLogger) + blockExecLogger := logger.With("module", "state") + // make block executor for consensus and blockchain reactors to execute blocks + blockExec := sm.NewBlockExecutor(stateDB, blockExecLogger, + nil, proxyApp.Consensus(), + mempool, evidencePool) + + // Make BlockchainReactor + bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) + bcReactor.SetLogger(logger.With("module", "blockchain")) + // Make ConsensusReactor consensusState := consensus.NewConsensusState(config.Consensus, state.Copy(), - proxyApp.Consensus(), blockStore, mempool, evidencePool) + blockExec, blockStore, mempool, evidencePool) consensusState.SetLogger(consensusLogger) if privValidator != nil { consensusState.SetPrivValidator(privValidator) @@ -291,7 +296,6 @@ func NewNode(config *cfg.Config, eventBus.SetLogger(logger.With("module", "events")) // services which will be publishing and/or subscribing for messages (events) - bcReactor.SetEventBus(eventBus) consensusReactor.SetEventBus(eventBus) // Transaction indexing @@ -333,6 +337,7 @@ func NewNode(config *cfg.Config, addrBook: addrBook, trustMetricStore: trustMetricStore, + stateDB: stateDB, blockStore: blockStore, bcReactor: bcReactor, mempoolReactor: mempoolReactor, @@ -429,6 +434,7 @@ func (n *Node) AddListener(l p2p.Listener) { // ConfigureRPC sets all variables in rpccore so they will serve // rpc calls from this node func (n *Node) ConfigureRPC() { + rpccore.SetStateDB(n.stateDB) rpccore.SetBlockStore(n.blockStore) rpccore.SetConsensusState(n.consensusState) rpccore.SetMempool(n.mempoolReactor.Mempool) diff --git a/rpc/core/blocks.go b/rpc/core/blocks.go index 8b0ee4592..853bb0f7b 100644 --- a/rpc/core/blocks.go +++ b/rpc/core/blocks.go @@ -337,8 +337,7 @@ func BlockResults(heightPtr *int64) (*ctypes.ResultBlockResults, error) { } // load the results - state := consensusState.GetState() - results, err := sm.LoadABCIResponses(state.DB(), height) + results, err := sm.LoadABCIResponses(stateDB, height) if err != nil { return nil, err } diff --git a/rpc/core/consensus.go b/rpc/core/consensus.go index eedcce277..65c9fc364 100644 --- a/rpc/core/consensus.go +++ b/rpc/core/consensus.go @@ -50,8 +50,7 @@ func Validators(heightPtr *int64) (*ctypes.ResultValidators, error) { return nil, err } - state := consensusState.GetState() - validators, err := sm.LoadValidators(state.DB(), height) + validators, err := sm.LoadValidators(stateDB, height) if err != nil { return nil, err } diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 325625c79..927d7ccad 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -11,6 +11,7 @@ import ( sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" + dbm "github.com/tendermint/tmlibs/db" "github.com/tendermint/tmlibs/log" ) @@ -20,7 +21,7 @@ var subscribeTimeout = 5 * time.Second // These interfaces are used by RPC and must be thread safe type Consensus interface { - GetState() *sm.State + GetState() sm.State GetValidators() (int64, []*types.Validator) GetRoundState() *cstypes.RoundState } @@ -43,6 +44,7 @@ var ( proxyAppQuery proxy.AppConnQuery // interfaces defined in types and above + stateDB dbm.DB blockStore types.BlockStore mempool types.Mempool evidencePool types.EvidencePool @@ -60,6 +62,10 @@ var ( logger log.Logger ) +func SetStateDB(db dbm.DB) { + stateDB = db +} + func SetBlockStore(bs types.BlockStore) { blockStore = bs } diff --git a/state/db.go b/state/db.go index 32f625841..fbe99863b 100644 --- a/state/db.go +++ b/state/db.go @@ -36,7 +36,7 @@ func GetState(stateDB dbm.DB, genesisFile string) (State, error) { if err != nil { return state, err } - SaveState(stateDB, state, state.AppHash) + SaveState(stateDB, state) } return state, nil @@ -66,9 +66,7 @@ func loadState(db dbm.DB, key []byte) (state State) { } // SaveState persists the State, the ValidatorsInfo, and the ConsensusParamsInfo to the database. -// It sets the given appHash on the state before persisting. -func SaveState(db dbm.DB, s State, appHash []byte) { - s.AppHash = appHash +func SaveState(db dbm.DB, s State) { nextHeight := s.LastBlockHeight + 1 saveValidatorsInfo(db, nextHeight, s.LastHeightValidatorsChanged, s.Validators) saveConsensusParamsInfo(db, nextHeight, s.LastHeightConsensusParamsChanged, s.ConsensusParams) diff --git a/state/execution.go b/state/execution.go index 88ee1127a..ce45e4c43 100644 --- a/state/execution.go +++ b/state/execution.go @@ -30,6 +30,10 @@ type BlockExecutor struct { evpool types.EvidencePool } +func (blockExec *BlockExecutor) SetTxEventPublisher(txEventPublisher types.TxEventPublisher) { + blockExec.txEventPublisher = txEventPublisher +} + // NewBlockExecutor returns a new BlockExecutor. func NewBlockExecutor(db dbm.DB, logger log.Logger, txEventer types.TxEventPublisher, proxyApp proxy.AppConnConsensus, @@ -82,8 +86,9 @@ func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block fail.Fail() // XXX - // save the state and the validators - SaveState(blockExec.db, s, appHash) + // update the app hash and save the state + s.AppHash = appHash + SaveState(blockExec.db, s) return s, nil } @@ -110,7 +115,7 @@ func (blockExec *BlockExecutor) Commit(block *types.Block) ([]byte, error) { blockExec.logger.Debug("Commit.Log: " + res.Log) } - blockExec.logger.Info("Committed state", "height", block.Height, "txs", block.NumTxs, "hash", res.Data) + blockExec.logger.Info("Committed state", "height", block.Height, "txs", block.NumTxs, "appHash", res.Data) // Update evpool blockExec.evpool.MarkEvidenceAsCommitted(block.Evidence.Evidence) @@ -343,16 +348,14 @@ func updateState(s State, blockID types.BlockID, header *types.Header, } func fireEvents(txEventPublisher types.TxEventPublisher, block *types.Block, abciResponses *ABCIResponses) { - // TODO: Fire events - /* - tx := types.Tx(req.GetDeliverTx().Tx) + for i, tx := range block.Data.Txs { txEventPublisher.PublishEventTx(types.EventDataTx{types.TxResult{ Height: block.Height, - Index: uint32(txIndex), + Index: uint32(i), Tx: tx, - Result: *txRes, + Result: *(abciResponses.DeliverTx[i]), }}) - */ + } } //---------------------------------------------------------------------------------------------------- diff --git a/state/state.go b/state/state.go index ed8a20137..7ffa56edc 100644 --- a/state/state.go +++ b/state/state.go @@ -91,7 +91,7 @@ func (s State) Bytes() []byte { // IsEmpty returns true if the State is equal to the empty State. func (s State) IsEmpty() bool { - return s.LastBlockHeight == 0 // XXX can't compare to Empty + return s.Validators == nil // XXX can't compare to Empty } // GetValidators returns the last and current validator sets. diff --git a/state/state_test.go b/state/state_test.go index cbd3c8137..7e9ed2cf8 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -57,7 +57,7 @@ func TestStateSaveLoad(t *testing.T) { assert := assert.New(t) state.LastBlockHeight++ - SaveState(stateDB, state, state.AppHash) + SaveState(stateDB, state) loadedState := LoadState(stateDB) assert.True(state.Equals(loadedState), @@ -261,7 +261,7 @@ func TestManyValidatorChangesSaveLoad(t *testing.T) { const valSetSize = 7 tearDown, stateDB, state := setupTestCase(t) state.Validators = genValSet(valSetSize) - SaveState(stateDB, state, state.AppHash) + SaveState(stateDB, state) defer tearDown(t) const height = 1 @@ -425,7 +425,7 @@ func TestLessThanOneThirdOfVotingPowerPerBlockEnforced(t *testing.T) { for i, tc := range testCases { tearDown, stateDB, state := setupTestCase(t) state.Validators = genValSet(tc.initialValSetSize) - SaveState(stateDB, state, state.AppHash) + SaveState(stateDB, state) height := state.LastBlockHeight + 1 block := makeBlock(state, height) abciResponses := &ABCIResponses{