diff --git a/CHANGELOG.md b/CHANGELOG.md index f9f0809d9..71fe223e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,9 +3,7 @@ ## Roadmap BREAKING CHANGES: -- Upgrade the header to support better proofs on validtors, results, evidence, and possibly more - Better support for injecting randomness -- Pass evidence/voteInfo through ABCI - Upgrade consensus for more real-time use of evidence FEATURES: @@ -32,6 +30,27 @@ BUG FIXES: BREAKING CHANGES: - [p2p] enable the Peer Exchange reactor by default - [types] add Timestamp field to Proposal/Vote +- [types] add new fields to Header: TotalTxs, ConsensusParamsHash, LastResultsHash, EvidenceHash +- [types] add Evidence to Block +- [types] simplify ValidateBasic +- [state] updates to support changes to the header +- [state] Enforce <1/3 of validator set can change at a time + +FEATURES: +- [state] Send indices of absent validators and addresses of byzantine validators in BeginBlock +- [state] Historical ConsensusParams and ABCIResponses +- [docs] Specification for the base Tendermint data structures. +- [evidence] New evidence reactor for gossiping and managing evidence +- [rpc] `/block_results?height=X` returns the DeliverTx results for a given height. + +IMPROVEMENTS: +- [consensus] Better handling of corrupt WAL file + +BUG FIXES: +- [lite] fix race +- [state] validate block.Header.ValidatorsHash +- [p2p] allow seed addresses to be prefixed with eg. `tcp://` +- [cmd] fix `tendermint init` to ignore files that are there and generate files that aren't. ## 0.14.0 (December 11, 2017) diff --git a/blockchain/reactor.go b/blockchain/reactor.go index f985e2849..d4b803dd6 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -4,11 +4,11 @@ import ( "bytes" "errors" "reflect" + "sync" "time" wire "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" @@ -34,29 +34,33 @@ const ( type consensusReactor interface { // for when we switch from blockchain reactor and fast sync to // the consensus machine - SwitchToConsensus(*sm.State, int) + SwitchToConsensus(sm.State, int) } // BlockchainReactor handles long-term catchup syncing. type BlockchainReactor struct { p2p.BaseReactor - state *sm.State - proxyAppConn proxy.AppConnConsensus // same as consensus.proxyAppConn - store *BlockStore - pool *BlockPool - fastSync bool - requestsCh chan BlockRequest - timeoutsCh chan string + mtx sync.Mutex + params types.ConsensusParams - eventBus *types.EventBus + // immutable + initialState sm.State + + blockExec *sm.BlockExecutor + store *BlockStore + pool *BlockPool + fastSync bool + requestsCh chan BlockRequest + timeoutsCh chan string } // NewBlockchainReactor returns new reactor instance. -func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor { +func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *BlockStore, fastSync bool) *BlockchainReactor { if state.LastBlockHeight != store.Height() { cmn.PanicSanity(cmn.Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height())) } + requestsCh := make(chan BlockRequest, defaultChannelCapacity) timeoutsCh := make(chan string, defaultChannelCapacity) pool := NewBlockPool( @@ -65,8 +69,9 @@ func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, timeoutsCh, ) bcR := &BlockchainReactor{ - state: state, - proxyAppConn: proxyAppConn, + params: state.ConsensusParams, + initialState: state, + blockExec: blockExec, store: store, pool: pool, fastSync: fastSync, @@ -183,7 +188,16 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) // maxMsgSize returns the maximum allowable size of a // message on the blockchain reactor. func (bcR *BlockchainReactor) maxMsgSize() int { - return bcR.state.ConsensusParams.BlockSize.MaxBytes + 2 + bcR.mtx.Lock() + defer bcR.mtx.Unlock() + return bcR.params.BlockSize.MaxBytes + 2 +} + +// updateConsensusParams updates the internal consensus params +func (bcR *BlockchainReactor) updateConsensusParams(params types.ConsensusParams) { + bcR.mtx.Lock() + defer bcR.mtx.Unlock() + bcR.params = params } // Handle messages from the poolReactor telling the reactor what to do. @@ -197,7 +211,8 @@ func (bcR *BlockchainReactor) poolRoutine() { blocksSynced := 0 - chainID := bcR.state.ChainID + chainID := bcR.initialState.ChainID + state := bcR.initialState lastHundred := time.Now() lastRate := 0.0 @@ -236,7 +251,7 @@ FOR_LOOP: bcR.pool.Stop() conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor) - conR.SwitchToConsensus(bcR.state, blocksSynced) + conR.SwitchToConsensus(state, blocksSynced) break FOR_LOOP } @@ -251,14 +266,15 @@ FOR_LOOP: // We need both to sync the first block. break SYNC_LOOP } - firstParts := first.MakePartSet(bcR.state.ConsensusParams.BlockPartSizeBytes) + firstParts := first.MakePartSet(state.ConsensusParams.BlockPartSizeBytes) firstPartsHeader := firstParts.Header() + firstID := types.BlockID{first.Hash(), firstPartsHeader} // Finally, verify the first block using the second's commit // NOTE: we can probably make this more efficient, but note that calling // first.Hash() doesn't verify the tx contents, so MakePartSet() is // currently necessary. - err := bcR.state.Validators.VerifyCommit( - chainID, types.BlockID{first.Hash(), firstPartsHeader}, first.Height, second.LastCommit) + err := state.Validators.VerifyCommit( + chainID, firstID, first.Height, second.LastCommit) if err != nil { bcR.Logger.Error("Error in validation", "err", err) bcR.pool.RedoRequest(first.Height) @@ -268,19 +284,20 @@ FOR_LOOP: bcR.store.SaveBlock(first, firstParts, second.LastCommit) - // TODO: should we be firing events? need to fire NewBlock events manually ... // NOTE: we could improve performance if we // didn't make the app commit to disk every block // ... but we would need a way to get the hash without it persisting - err := bcR.state.ApplyBlock(bcR.eventBus, bcR.proxyAppConn, - first, firstPartsHeader, - types.MockMempool{}, types.MockEvidencePool{}) // TODO unmock! + var err error + state, err = bcR.blockExec.ApplyBlock(state, firstID, first) if err != nil { // TODO This is bad, are we zombie? cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) } blocksSynced += 1 + // update the consensus params + bcR.updateConsensusParams(state.ConsensusParams) + if blocksSynced%100 == 0 { lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height, @@ -302,11 +319,6 @@ func (bcR *BlockchainReactor) BroadcastStatusRequest() error { return nil } -// SetEventBus sets event bus. -func (bcR *BlockchainReactor) SetEventBus(b *types.EventBus) { - bcR.eventBus = b -} - //----------------------------------------------------------------------------- // Messages diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index 36cdc080d..fcb8a6f86 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -10,19 +10,15 @@ import ( cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) -func makeStateAndBlockStore(logger log.Logger) (*sm.State, *BlockStore) { +func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore) { config := cfg.ResetTestRoot("blockchain_reactor_test") blockStore := NewBlockStore(dbm.NewMemDB()) - - // Get State - state, _ := sm.GetState(dbm.NewMemDB(), config.GenesisFile()) - state.SetLogger(logger.With("module", "state")) - state.Save() - + state, _ := sm.LoadStateFromDBOrGenesisFile(dbm.NewMemDB(), config.GenesisFile()) return state, blockStore } @@ -31,7 +27,10 @@ func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainRe // Make the blockchainReactor itself fastSync := true - bcReactor := NewBlockchainReactor(state.Copy(), nil, blockStore, fastSync) + var nilApp proxy.AppConnConsensus + blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), nilApp, types.MockMempool{}, types.MockEvidencePool{}) + + bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) bcReactor.SetLogger(logger.With("module", "blockchain")) // Next: we need to set a switch in order for peers to be added in @@ -51,7 +50,7 @@ func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainRe func TestNoBlockMessageResponse(t *testing.T) { maxBlockHeight := int64(20) - bcr := newBlockchainReactor(log.NewNopLogger(), maxBlockHeight) + bcr := newBlockchainReactor(log.TestingLogger(), maxBlockHeight) bcr.Start() defer bcr.Stop() @@ -71,6 +70,8 @@ func TestNoBlockMessageResponse(t *testing.T) { {100, false}, } + // receive a request message from peer, + // wait to hear response for _, tt := range tests { reqBlockMsg := &bcBlockRequestMessage{tt.height} reqBlockBytes := wire.BinaryBytes(struct{ BlockchainMessage }{reqBlockMsg}) @@ -104,7 +105,7 @@ func makeTxs(height int64) (txs []types.Tx) { return txs } -func makeBlock(height int64, state *sm.State) *types.Block { +func makeBlock(height int64, state sm.State) *types.Block { block, _ := state.MakeBlock(height, makeTxs(height), new(types.Commit)) return block } diff --git a/consensus/common_test.go b/consensus/common_test.go index 6598c15eb..249e77329 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -235,16 +235,16 @@ func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} { //------------------------------------------------------------------------------- // consensus states -func newConsensusState(state *sm.State, pv types.PrivValidator, app abci.Application) *ConsensusState { +func newConsensusState(state sm.State, pv types.PrivValidator, app abci.Application) *ConsensusState { return newConsensusStateWithConfig(config, state, pv, app) } -func newConsensusStateWithConfig(thisConfig *cfg.Config, state *sm.State, pv types.PrivValidator, app abci.Application) *ConsensusState { +func newConsensusStateWithConfig(thisConfig *cfg.Config, state sm.State, pv types.PrivValidator, app abci.Application) *ConsensusState { blockDB := dbm.NewMemDB() return newConsensusStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB) } -func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state *sm.State, pv types.PrivValidator, app abci.Application, blockDB dbm.DB) *ConsensusState { +func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.State, pv types.PrivValidator, app abci.Application, blockDB dbm.DB) *ConsensusState { // Get BlockStore blockStore := bc.NewBlockStore(blockDB) @@ -264,7 +264,9 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state *sm. evpool := types.MockEvidencePool{} // Make ConsensusReactor - cs := NewConsensusState(thisConfig.Consensus, state, proxyAppConnCon, blockStore, mempool, evpool) + stateDB := dbm.NewMemDB() + blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyAppConnCon, mempool, evpool) + cs := NewConsensusState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) cs.SetLogger(log.TestingLogger()) cs.SetPrivValidator(pv) @@ -284,9 +286,7 @@ func loadPrivValidator(config *cfg.Config) *types.PrivValidatorFS { } func fixedConsensusStateDummy(config *cfg.Config, logger log.Logger) *ConsensusState { - stateDB := dbm.NewMemDB() - state, _ := sm.MakeGenesisStateFromFile(stateDB, config.GenesisFile()) - state.SetLogger(logger.With("module", "state")) + state, _ := sm.MakeGenesisStateFromFile(config.GenesisFile()) privValidator := loadPrivValidator(config) cs := newConsensusState(state, privValidator, dummy.NewDummyApplication()) cs.SetLogger(logger) @@ -354,10 +354,8 @@ func randConsensusNet(nValidators int, testName string, tickerFunc func() Timeou css := make([]*ConsensusState, nValidators) logger := consensusLogger() for i := 0; i < nValidators; i++ { - db := dbm.NewMemDB() // each state needs its own db - state, _ := sm.MakeGenesisState(db, genDoc) - state.SetLogger(logger.With("module", "state", "validator", i)) - state.Save() + stateDB := dbm.NewMemDB() // each state needs its own db + state, _ := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc) thisConfig := ResetConfig(cmn.Fmt("%s_%d", testName, i)) for _, opt := range configOpts { opt(thisConfig) @@ -380,10 +378,8 @@ func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerF css := make([]*ConsensusState, nPeers) logger := consensusLogger() for i := 0; i < nPeers; i++ { - db := dbm.NewMemDB() // each state needs its own db - state, _ := sm.MakeGenesisState(db, genDoc) - state.SetLogger(logger.With("module", "state", "validator", i)) - state.Save() + stateDB := dbm.NewMemDB() // each state needs its own db + state, _ := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc) thisConfig := ResetConfig(cmn.Fmt("%s_%d", testName, i)) ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal var privVal types.PrivValidator @@ -437,12 +433,11 @@ func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.G }, privValidators } -func randGenesisState(numValidators int, randPower bool, minPower int64) (*sm.State, []*types.PrivValidatorFS) { +func randGenesisState(numValidators int, randPower bool, minPower int64) (sm.State, []*types.PrivValidatorFS) { genDoc, privValidators := randGenesisDoc(numValidators, randPower, minPower) + s0, _ := sm.MakeGenesisState(genDoc) db := dbm.NewMemDB() - s0, _ := sm.MakeGenesisState(db, genDoc) - s0.SetLogger(log.TestingLogger().With("module", "state")) - s0.Save() + sm.SaveState(db, s0) return s0, privValidators } diff --git a/consensus/reactor.go b/consensus/reactor.go index eb752ee17..9b3393e94 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -82,7 +82,7 @@ func (conR *ConsensusReactor) OnStop() { // SwitchToConsensus switches from fast_sync mode to consensus mode. // It resets the state, turns off fast_sync, and starts the consensus state-machine -func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State, blocksSynced int) { +func (conR *ConsensusReactor) SwitchToConsensus(state sm.State, blocksSynced int) { conR.Logger.Info("SwitchToConsensus") conR.conS.reconstructLastCommit(state) // NOTE: The line below causes broadcastNewRoundStepRoutine() to diff --git a/consensus/replay.go b/consensus/replay.go index 209ea5972..784e8bd6e 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -13,6 +13,7 @@ import ( abci "github.com/tendermint/abci/types" //auto "github.com/tendermint/tmlibs/autofile" cmn "github.com/tendermint/tmlibs/common" + dbm "github.com/tendermint/tmlibs/db" "github.com/tendermint/tmlibs/log" "github.com/tendermint/tendermint/proxy" @@ -186,15 +187,16 @@ func makeHeightSearchFunc(height int64) auto.SearchFunc { // we were last and using the WAL to recover there type Handshaker struct { - state *sm.State - store types.BlockStore - logger log.Logger + stateDB dbm.DB + initialState sm.State + store types.BlockStore + logger log.Logger nBlocks int // number of blocks applied to the state } -func NewHandshaker(state *sm.State, store types.BlockStore) *Handshaker { - return &Handshaker{state, store, log.NewNopLogger(), 0} +func NewHandshaker(stateDB dbm.DB, state sm.State, store types.BlockStore) *Handshaker { + return &Handshaker{stateDB, state, store, log.NewNopLogger(), 0} } func (h *Handshaker) SetLogger(l log.Logger) { @@ -224,7 +226,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { // TODO: check version // replay blocks up to the latest in the blockstore - _, err = h.ReplayBlocks(appHash, blockHeight, proxyApp) + _, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp) if err != nil { return fmt.Errorf("Error on replay: %v", err) } @@ -238,15 +240,15 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { // Replay all blocks since appBlockHeight and ensure the result matches the current state. // Returns the final AppHash or an error -func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp proxy.AppConns) ([]byte, error) { +func (h *Handshaker) ReplayBlocks(state sm.State, appHash []byte, appBlockHeight int64, proxyApp proxy.AppConns) ([]byte, error) { storeBlockHeight := h.store.Height() - stateBlockHeight := h.state.LastBlockHeight + stateBlockHeight := state.LastBlockHeight h.logger.Info("ABCI Replay Blocks", "appHeight", appBlockHeight, "storeHeight", storeBlockHeight, "stateHeight", stateBlockHeight) // If appBlockHeight == 0 it means that we are at genesis and hence should send InitChain if appBlockHeight == 0 { - validators := types.TM2PB.Validators(h.state.Validators) + validators := types.TM2PB.Validators(state.Validators) if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil { return nil, err } @@ -254,7 +256,7 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp // First handle edge cases and constraints on the storeBlockHeight if storeBlockHeight == 0 { - return appHash, h.checkAppHash(appHash) + return appHash, checkAppHash(state, appHash) } else if storeBlockHeight < appBlockHeight { // the app should never be ahead of the store (but this is under app's control) @@ -269,6 +271,7 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp cmn.PanicSanity(cmn.Fmt("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1)) } + var err error // Now either store is equal to state, or one ahead. // For each, consider all cases of where the app could be, given app <= store if storeBlockHeight == stateBlockHeight { @@ -276,11 +279,11 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp // Either the app is asking for replay, or we're all synced up. if appBlockHeight < storeBlockHeight { // the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store) - return h.replayBlocks(proxyApp, appBlockHeight, storeBlockHeight, false) + return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false) } else if appBlockHeight == storeBlockHeight { // We're good! - return appHash, h.checkAppHash(appHash) + return appHash, checkAppHash(state, appHash) } } else if storeBlockHeight == stateBlockHeight+1 { @@ -289,7 +292,7 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp if appBlockHeight < stateBlockHeight { // the app is further behind than it should be, so replay blocks // but leave the last block to go through the WAL - return h.replayBlocks(proxyApp, appBlockHeight, storeBlockHeight, true) + return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true) } else if appBlockHeight == stateBlockHeight { // We haven't run Commit (both the state and app are one block behind), @@ -297,17 +300,19 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp // NOTE: We could instead use the cs.WAL on cs.Start, // but we'd have to allow the WAL to replay a block that wrote it's ENDHEIGHT h.logger.Info("Replay last block using real app") - return h.replayBlock(storeBlockHeight, proxyApp.Consensus()) + state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus()) + return state.AppHash, err } else if appBlockHeight == storeBlockHeight { // We ran Commit, but didn't save the state, so replayBlock with mock app - abciResponses, err := h.state.LoadABCIResponses(storeBlockHeight) + abciResponses, err := sm.LoadABCIResponses(h.stateDB, storeBlockHeight) if err != nil { return nil, err } mockApp := newMockProxyApp(appHash, abciResponses) h.logger.Info("Replay last block using mock app") - return h.replayBlock(storeBlockHeight, mockApp) + state, err = h.replayBlock(state, storeBlockHeight, mockApp) + return state.AppHash, err } } @@ -316,7 +321,7 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp return nil, nil } -func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int64, mutateState bool) ([]byte, error) { +func (h *Handshaker) replayBlocks(state sm.State, proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int64, mutateState bool) ([]byte, error) { // App is further behind than it should be, so we need to replay blocks. // We replay all blocks from appBlockHeight+1. // @@ -336,7 +341,7 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store for i := appBlockHeight + 1; i <= finalBlock; i++ { h.logger.Info("Applying block", "height", i) block := h.store.LoadBlock(i) - appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger, h.state.LastValidators) + appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger) if err != nil { return nil, err } @@ -346,33 +351,37 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store if mutateState { // sync the final block - return h.replayBlock(storeBlockHeight, proxyApp.Consensus()) + state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus()) + if err != nil { + return nil, err + } + appHash = state.AppHash } - return appHash, h.checkAppHash(appHash) + return appHash, checkAppHash(state, appHash) } // ApplyBlock on the proxyApp with the last block. -func (h *Handshaker) replayBlock(height int64, proxyApp proxy.AppConnConsensus) ([]byte, error) { - mempool := types.MockMempool{} - evpool := types.MockEvidencePool{} - +func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.AppConnConsensus) (sm.State, error) { block := h.store.LoadBlock(height) meta := h.store.LoadBlockMeta(height) - if err := h.state.ApplyBlock(types.NopEventBus{}, proxyApp, - block, meta.BlockID.PartsHeader, mempool, evpool); err != nil { - return nil, err + blockExec := sm.NewBlockExecutor(h.stateDB, h.logger, proxyApp, types.MockMempool{}, types.MockEvidencePool{}) + + var err error + state, err = blockExec.ApplyBlock(state, meta.BlockID, block) + if err != nil { + return sm.State{}, err } h.nBlocks += 1 - return h.state.AppHash, nil + return state, nil } -func (h *Handshaker) checkAppHash(appHash []byte) error { - if !bytes.Equal(h.state.AppHash, appHash) { - panic(fmt.Errorf("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash).Error()) +func checkAppHash(state sm.State, appHash []byte) error { + if !bytes.Equal(state.AppHash, appHash) { + panic(fmt.Errorf("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, state.AppHash).Error()) } return nil } diff --git a/consensus/replay_file.go b/consensus/replay_file.go index 4db58ada8..26b8baebf 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -18,6 +18,7 @@ import ( "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" dbm "github.com/tendermint/tmlibs/db" + "github.com/tendermint/tmlibs/log" ) const ( @@ -104,11 +105,11 @@ type playback struct { count int // how many lines/msgs into the file are we // replays can be reset to beginning - fileName string // so we can close/reopen the file - genesisState *sm.State // so the replay session knows where to restart from + fileName string // so we can close/reopen the file + genesisState sm.State // so the replay session knows where to restart from } -func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState *sm.State) *playback { +func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState sm.State) *playback { return &playback{ cs: cs, fp: fp, @@ -123,7 +124,7 @@ func (pb *playback) replayReset(count int, newStepCh chan interface{}) error { pb.cs.Stop() pb.cs.Wait() - newCS := NewConsensusState(pb.cs.config, pb.genesisState.Copy(), pb.cs.proxyAppConn, + newCS := NewConsensusState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec, pb.cs.blockStore, pb.cs.mempool, pb.cs.evpool) newCS.SetEventBus(pb.cs.eventBus) newCS.startForReplay() @@ -285,14 +286,14 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo // Get State stateDB := dbm.NewDB("state", config.DBBackend, config.DBDir()) - state, err := sm.MakeGenesisStateFromFile(stateDB, config.GenesisFile()) + state, err := sm.MakeGenesisStateFromFile(config.GenesisFile()) if err != nil { cmn.Exit(err.Error()) } // Create proxyAppConn connection (consensus, mempool, query) clientCreator := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()) - proxyApp := proxy.NewAppConns(clientCreator, NewHandshaker(state, blockStore)) + proxyApp := proxy.NewAppConns(clientCreator, NewHandshaker(stateDB, state, blockStore)) err = proxyApp.Start() if err != nil { cmn.Exit(cmn.Fmt("Error starting proxy app conns: %v", err)) @@ -303,8 +304,11 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo cmn.Exit(cmn.Fmt("Failed to start event bus: %v", err)) } - consensusState := NewConsensusState(csConfig, state.Copy(), proxyApp.Consensus(), - blockStore, types.MockMempool{}, types.MockEvidencePool{}) + mempool, evpool := types.MockMempool{}, types.MockEvidencePool{} + blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) + + consensusState := NewConsensusState(csConfig, state.Copy(), blockExec, + blockStore, mempool, evpool) consensusState.SetEventBus(eventBus) return consensusState diff --git a/consensus/replay_test.go b/consensus/replay_test.go index f1a060ec1..c497ed546 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -53,8 +53,7 @@ func init() { func startNewConsensusStateAndWaitForBlock(t *testing.T, lastBlockHeight int64, blockDB dbm.DB, stateDB dbm.DB) { logger := log.TestingLogger() - state, _ := sm.GetState(stateDB, consensusReplayConfig.GenesisFile()) - state.SetLogger(logger.With("module", "state")) + state, _ := sm.LoadStateFromDBOrGenesisFile(stateDB, consensusReplayConfig.GenesisFile()) privValidator := loadPrivValidator(consensusReplayConfig) cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, dummy.NewDummyApplication(), blockDB) cs.SetLogger(logger) @@ -98,22 +97,22 @@ func sendTxs(cs *ConsensusState, ctx context.Context) { func TestWALCrash(t *testing.T) { testCases := []struct { name string - initFn func(*ConsensusState, context.Context) + initFn func(dbm.DB, *ConsensusState, context.Context) heightToStop int64 }{ {"empty block", - func(cs *ConsensusState, ctx context.Context) {}, + func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) {}, 1}, {"block with a smaller part size", - func(cs *ConsensusState, ctx context.Context) { + func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) { // XXX: is there a better way to change BlockPartSizeBytes? cs.state.ConsensusParams.BlockPartSizeBytes = 512 - cs.state.Save() + sm.SaveState(stateDB, cs.state) go sendTxs(cs, ctx) }, 1}, {"many non-empty blocks", - func(cs *ConsensusState, ctx context.Context) { + func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) { go sendTxs(cs, ctx) }, 3}, @@ -126,7 +125,7 @@ func TestWALCrash(t *testing.T) { } } -func crashWALandCheckLiveness(t *testing.T, initFn func(*ConsensusState, context.Context), heightToStop int64) { +func crashWALandCheckLiveness(t *testing.T, initFn func(dbm.DB, *ConsensusState, context.Context), heightToStop int64) { walPaniced := make(chan error) crashingWal := &crashingWAL{panicCh: walPaniced, heightToStop: heightToStop} @@ -139,8 +138,7 @@ LOOP: // create consensus state from a clean slate logger := log.NewNopLogger() stateDB := dbm.NewMemDB() - state, _ := sm.MakeGenesisStateFromFile(stateDB, consensusReplayConfig.GenesisFile()) - state.SetLogger(logger.With("module", "state")) + state, _ := sm.MakeGenesisStateFromFile(consensusReplayConfig.GenesisFile()) privValidator := loadPrivValidator(consensusReplayConfig) blockDB := dbm.NewMemDB() cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, dummy.NewDummyApplication(), blockDB) @@ -148,7 +146,7 @@ LOOP: // start sending transactions ctx, cancel := context.WithCancel(context.Background()) - initFn(cs, ctx) + initFn(stateDB, cs, ctx) // clean up WAL file from the previous iteration walFile := cs.config.WalFile() @@ -344,12 +342,13 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { t.Fatalf(err.Error()) } - state, store := stateAndStore(config, privVal.GetPubKey()) + stateDB, state, store := stateAndStore(config, privVal.GetPubKey()) store.chain = chain store.commits = commits // run the chain through state.ApplyBlock to build up the tendermint state - latestAppHash := buildTMStateFromChain(config, state, chain, mode) + state = buildTMStateFromChain(config, stateDB, state, chain, mode) + latestAppHash := state.AppHash // make a new client creator dummyApp := dummy.NewPersistentDummyApplication(path.Join(config.DBDir(), "2")) @@ -358,12 +357,12 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { // run nBlocks against a new client to build up the app state. // use a throwaway tendermint state proxyApp := proxy.NewAppConns(clientCreator2, nil) - state, _ := stateAndStore(config, privVal.GetPubKey()) - buildAppStateFromChain(proxyApp, state, chain, nBlocks, mode) + stateDB, state, _ := stateAndStore(config, privVal.GetPubKey()) + buildAppStateFromChain(proxyApp, stateDB, state, chain, nBlocks, mode) } // now start the app using the handshake - it should sync - handshaker := NewHandshaker(state, store) + handshaker := NewHandshaker(stateDB, state, store) proxyApp := proxy.NewAppConns(clientCreator2, handshaker) if err := proxyApp.Start(); err != nil { t.Fatalf("Error starting proxy app connections: %v", err) @@ -393,16 +392,20 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { } } -func applyBlock(st *sm.State, blk *types.Block, proxyApp proxy.AppConns) { +func applyBlock(stateDB dbm.DB, st sm.State, blk *types.Block, proxyApp proxy.AppConns) sm.State { testPartSize := st.ConsensusParams.BlockPartSizeBytes - err := st.ApplyBlock(types.NopEventBus{}, proxyApp.Consensus(), blk, blk.MakePartSet(testPartSize).Header(), mempool, evpool) + blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) + + blkID := types.BlockID{blk.Hash(), blk.MakePartSet(testPartSize).Header()} + newState, err := blockExec.ApplyBlock(st, blkID, blk) if err != nil { panic(err) } + return newState } -func buildAppStateFromChain(proxyApp proxy.AppConns, - state *sm.State, chain []*types.Block, nBlocks int, mode uint) { +func buildAppStateFromChain(proxyApp proxy.AppConns, stateDB dbm.DB, + state sm.State, chain []*types.Block, nBlocks int, mode uint) { // start a new app without handshake, play nBlocks blocks if err := proxyApp.Start(); err != nil { panic(err) @@ -418,24 +421,24 @@ func buildAppStateFromChain(proxyApp proxy.AppConns, case 0: for i := 0; i < nBlocks; i++ { block := chain[i] - applyBlock(state, block, proxyApp) + state = applyBlock(stateDB, state, block, proxyApp) } case 1, 2: for i := 0; i < nBlocks-1; i++ { block := chain[i] - applyBlock(state, block, proxyApp) + state = applyBlock(stateDB, state, block, proxyApp) } if mode == 2 { // update the dummy height and apphash // as if we ran commit but not - applyBlock(state, chain[nBlocks-1], proxyApp) + state = applyBlock(stateDB, state, chain[nBlocks-1], proxyApp) } } } -func buildTMStateFromChain(config *cfg.Config, state *sm.State, chain []*types.Block, mode uint) []byte { +func buildTMStateFromChain(config *cfg.Config, stateDB dbm.DB, state sm.State, chain []*types.Block, mode uint) sm.State { // run the whole chain against this client to build up the tendermint state clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.DBDir(), "1"))) proxyApp := proxy.NewAppConns(clientCreator, nil) // sm.NewHandshaker(config, state, store, ReplayLastBlock)) @@ -449,31 +452,26 @@ func buildTMStateFromChain(config *cfg.Config, state *sm.State, chain []*types.B panic(err) } - var latestAppHash []byte - switch mode { case 0: // sync right up for _, block := range chain { - applyBlock(state, block, proxyApp) + state = applyBlock(stateDB, state, block, proxyApp) } - latestAppHash = state.AppHash case 1, 2: // sync up to the penultimate as if we stored the block. // whether we commit or not depends on the appHash for _, block := range chain[:len(chain)-1] { - applyBlock(state, block, proxyApp) + state = applyBlock(stateDB, state, block, proxyApp) } // apply the final block to a state copy so we can // get the right next appHash but keep the state back - stateCopy := state.Copy() - applyBlock(stateCopy, chain[len(chain)-1], proxyApp) - latestAppHash = stateCopy.AppHash + applyBlock(stateDB, state, chain[len(chain)-1], proxyApp) } - return latestAppHash + return state } //-------------------------- @@ -587,13 +585,11 @@ func readPieceFromWAL(msg *TimedWALMessage) interface{} { } // fresh state and mock store -func stateAndStore(config *cfg.Config, pubKey crypto.PubKey) (*sm.State, *mockBlockStore) { +func stateAndStore(config *cfg.Config, pubKey crypto.PubKey) (dbm.DB, sm.State, *mockBlockStore) { stateDB := dbm.NewMemDB() - state, _ := sm.MakeGenesisStateFromFile(stateDB, config.GenesisFile()) - state.SetLogger(log.TestingLogger().With("module", "state")) - + state, _ := sm.MakeGenesisStateFromFile(config.GenesisFile()) store := NewMockBlockStore(config, state.ConsensusParams) - return state, store + return stateDB, state, store } //---------------------------------- diff --git a/consensus/state.go b/consensus/state.go index 5e83e6a56..518d81c58 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -17,7 +17,6 @@ import ( cfg "github.com/tendermint/tendermint/config" cstypes "github.com/tendermint/tendermint/consensus/types" - "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -75,15 +74,16 @@ type ConsensusState struct { privValidator types.PrivValidator // for signing votes // services for creating and executing blocks - proxyAppConn proxy.AppConnConsensus - blockStore types.BlockStore - mempool types.Mempool - evpool types.EvidencePool + // TODO: encapsulate all of this in one "BlockManager" + blockExec *sm.BlockExecutor + blockStore types.BlockStore + mempool types.Mempool + evpool types.EvidencePool // internal state mtx sync.Mutex cstypes.RoundState - state *sm.State // State until height-1. + state sm.State // State until height-1. // state changes may be triggered by msgs from peers, // msgs from ourself, or by timeouts @@ -114,10 +114,10 @@ type ConsensusState struct { } // NewConsensusState returns a new ConsensusState. -func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore types.BlockStore, mempool types.Mempool, evpool types.EvidencePool) *ConsensusState { +func NewConsensusState(config *cfg.ConsensusConfig, state sm.State, blockExec *sm.BlockExecutor, blockStore types.BlockStore, mempool types.Mempool, evpool types.EvidencePool) *ConsensusState { cs := &ConsensusState{ config: config, - proxyAppConn: proxyAppConn, + blockExec: blockExec, blockStore: blockStore, mempool: mempool, peerMsgQueue: make(chan msgInfo, msgQueueSize), @@ -153,6 +153,7 @@ func (cs *ConsensusState) SetLogger(l log.Logger) { // SetEventBus sets event bus. func (cs *ConsensusState) SetEventBus(b *types.EventBus) { cs.eventBus = b + cs.blockExec.SetEventBus(b) } // String returns a string. @@ -162,7 +163,7 @@ func (cs *ConsensusState) String() string { } // GetState returns a copy of the chain state. -func (cs *ConsensusState) GetState() *sm.State { +func (cs *ConsensusState) GetState() sm.State { cs.mtx.Lock() defer cs.mtx.Unlock() return cs.state.Copy() @@ -399,7 +400,7 @@ func (cs *ConsensusState) sendInternalMessage(mi msgInfo) { // Reconstruct LastCommit from SeenCommit, which we saved along with the block, // (which happens even before saving the state) -func (cs *ConsensusState) reconstructLastCommit(state *sm.State) { +func (cs *ConsensusState) reconstructLastCommit(state sm.State) { if state.LastBlockHeight == 0 { return } @@ -422,12 +423,12 @@ func (cs *ConsensusState) reconstructLastCommit(state *sm.State) { // Updates ConsensusState and increments height to match that of state. // The round becomes 0 and cs.Step becomes cstypes.RoundStepNewHeight. -func (cs *ConsensusState) updateToState(state *sm.State) { +func (cs *ConsensusState) updateToState(state sm.State) { if cs.CommitRound > -1 && 0 < cs.Height && cs.Height != state.LastBlockHeight { cmn.PanicSanity(cmn.Fmt("updateToState() expected state height of %v but found %v", cs.Height, state.LastBlockHeight)) } - if cs.state != nil && cs.state.LastBlockHeight+1 != cs.Height { + if !cs.state.IsEmpty() && cs.state.LastBlockHeight+1 != cs.Height { // This might happen when someone else is mutating cs.state. // Someone forgot to pass in state.Copy() somewhere?! cmn.PanicSanity(cmn.Fmt("Inconsistent cs.state.LastBlockHeight+1 %v vs cs.Height %v", @@ -437,7 +438,7 @@ func (cs *ConsensusState) updateToState(state *sm.State) { // If state isn't further out than cs.state, just ignore. // This happens when SwitchToConsensus() is called in the reactor. // We don't want to reset e.g. the Votes. - if cs.state != nil && (state.LastBlockHeight <= cs.state.LastBlockHeight) { + if !cs.state.IsEmpty() && (state.LastBlockHeight <= cs.state.LastBlockHeight) { cs.Logger.Info("Ignoring updateToState()", "newHeight", state.LastBlockHeight+1, "oldHeight", cs.state.LastBlockHeight+1) return } @@ -922,7 +923,7 @@ func (cs *ConsensusState) defaultDoPrevote(height int64, round int) { } // Validate proposal block - err := cs.state.ValidateBlock(cs.ProposalBlock) + err := cs.blockExec.ValidateBlock(cs.state, cs.ProposalBlock) if err != nil { // ProposalBlock is invalid, prevote nil. logger.Error("enterPrevote: ProposalBlock is invalid", "err", err) @@ -1030,7 +1031,7 @@ func (cs *ConsensusState) enterPrecommit(height int64, round int) { if cs.ProposalBlock.HashesTo(blockID.Hash) { cs.Logger.Info("enterPrecommit: +2/3 prevoted proposal block. Locking", "hash", blockID.Hash) // Validate the block. - if err := cs.state.ValidateBlock(cs.ProposalBlock); err != nil { + if err := cs.blockExec.ValidateBlock(cs.state, cs.ProposalBlock); err != nil { cmn.PanicConsensus(cmn.Fmt("enterPrecommit: +2/3 prevoted for an invalid block: %v", err)) } cs.LockedRound = round @@ -1165,7 +1166,7 @@ func (cs *ConsensusState) finalizeCommit(height int64) { if !block.HashesTo(blockID.Hash) { cmn.PanicSanity(cmn.Fmt("Cannot finalizeCommit, ProposalBlock does not hash to commit hash")) } - if err := cs.state.ValidateBlock(block); err != nil { + if err := cs.blockExec.ValidateBlock(cs.state, block); err != nil { cmn.PanicConsensus(cmn.Fmt("+2/3 committed an invalid block: %v", err)) } @@ -1203,14 +1204,11 @@ func (cs *ConsensusState) finalizeCommit(height int64) { // Create a copy of the state for staging // and an event cache for txs stateCopy := cs.state.Copy() - txEventBuffer := types.NewTxEventBuffer(cs.eventBus, int(block.NumTxs)) // Execute and commit the block, update and save the state, and update the mempool. - // All calls to the proxyAppConn come here. // NOTE: the block.AppHash wont reflect these txs until the next block - err := stateCopy.ApplyBlock(txEventBuffer, cs.proxyAppConn, - block, blockParts.Header(), - cs.mempool, cs.evpool) + var err error + stateCopy, err = cs.blockExec.ApplyBlock(stateCopy, types.BlockID{block.Hash(), blockParts.Header()}, block) if err != nil { cs.Logger.Error("Error on ApplyBlock. Did the application crash? Please restart tendermint", "err", err) err := cmn.Kill() @@ -1222,22 +1220,6 @@ func (cs *ConsensusState) finalizeCommit(height int64) { fail.Fail() // XXX - // Fire event for new block. - // NOTE: If we fail before firing, these events will never fire - // - // TODO: Either - // * Fire before persisting state, in ApplyBlock - // * Fire on start up if we haven't written any new WAL msgs - // Both options mean we may fire more than once. Is that fine ? - cs.eventBus.PublishEventNewBlock(types.EventDataNewBlock{block}) - cs.eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header}) - err = txEventBuffer.Flush() - if err != nil { - cs.Logger.Error("Failed to flush event buffer", "err", err) - } - - fail.Fail() // XXX - // NewHeightStep! cs.updateToState(stateCopy) diff --git a/consensus/wal_generator.go b/consensus/wal_generator.go index 73ad3e7fc..45609e568 100644 --- a/consensus/wal_generator.go +++ b/consensus/wal_generator.go @@ -47,13 +47,12 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) { } stateDB := db.NewMemDB() blockStoreDB := db.NewMemDB() - state, err := sm.MakeGenesisState(stateDB, genDoc) - state.SetLogger(logger.With("module", "state")) + state, err := sm.MakeGenesisState(genDoc) if err != nil { return nil, errors.Wrap(err, "failed to make genesis state") } blockStore := bc.NewBlockStore(blockStoreDB) - handshaker := NewHandshaker(state, blockStore) + handshaker := NewHandshaker(stateDB, state, blockStore) proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app), handshaker) proxyApp.SetLogger(logger.With("module", "proxy")) if err := proxyApp.Start(); err != nil { @@ -68,7 +67,8 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) { defer eventBus.Stop() mempool := types.MockMempool{} evpool := types.MockEvidencePool{} - consensusState := NewConsensusState(config.Consensus, state.Copy(), proxyApp.Consensus(), blockStore, mempool, evpool) + blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) + consensusState := NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool) consensusState.SetLogger(logger) consensusState.SetEventBus(eventBus) if privValidator != nil { diff --git a/evidence/pool.go b/evidence/pool.go index 2296ac028..07c351343 100644 --- a/evidence/pool.go +++ b/evidence/pool.go @@ -1,30 +1,40 @@ package evidence import ( + "fmt" + "sync" + + dbm "github.com/tendermint/tmlibs/db" "github.com/tendermint/tmlibs/log" + sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) // EvidencePool maintains a pool of valid evidence // in an EvidenceStore. type EvidencePool struct { - params types.EvidenceParams logger log.Logger - state types.State // TODO: update this on commit! evidenceStore *EvidenceStore + // needed to load validators to verify evidence + stateDB dbm.DB + + // latest state + mtx sync.Mutex + state sm.State + // never close evidenceChan chan types.Evidence } -func NewEvidencePool(params types.EvidenceParams, evidenceStore *EvidenceStore, state types.State) *EvidencePool { +func NewEvidencePool(stateDB dbm.DB, evidenceStore *EvidenceStore) *EvidencePool { evpool := &EvidencePool{ - params: params, + stateDB: stateDB, + state: sm.LoadState(stateDB), logger: log.NewNopLogger(), evidenceStore: evidenceStore, - state: state, evidenceChan: make(chan types.Evidence), } return evpool @@ -50,19 +60,45 @@ func (evpool *EvidencePool) PendingEvidence() []types.Evidence { return evpool.evidenceStore.PendingEvidence() } +// State returns the current state of the evpool. +func (evpool *EvidencePool) State() sm.State { + evpool.mtx.Lock() + defer evpool.mtx.Unlock() + return evpool.state +} + +// Update loads the latest +func (evpool *EvidencePool) Update(block *types.Block) { + evpool.mtx.Lock() + defer evpool.mtx.Unlock() + + state := sm.LoadState(evpool.stateDB) + if state.LastBlockHeight != block.Height { + panic(fmt.Sprintf("EvidencePool.Update: loaded state with height %d when block.Height=%d", state.LastBlockHeight, block.Height)) + } + evpool.state = state + + // NOTE: shouldn't need the mutex + evpool.MarkEvidenceAsCommitted(block.Evidence.Evidence) +} + // AddEvidence checks the evidence is valid and adds it to the pool. // Blocks on the EvidenceChan. func (evpool *EvidencePool) AddEvidence(evidence types.Evidence) (err error) { + // TODO: check if we already have evidence for this // validator at this height so we dont get spammed - priority, err := evpool.state.VerifyEvidence(evidence) - if err != nil { - // TODO: if err is just that we cant find it cuz we pruned, ignore. - // TODO: if its actually bad evidence, punish peer + if err := sm.VerifyEvidence(evpool.stateDB, evpool.State(), evidence); err != nil { return err } + // fetch the validator and return its voting power as its priority + // TODO: something better ? + valset, _ := sm.LoadValidators(evpool.stateDB, evidence.Height()) + _, val := valset.GetByAddress(evidence.Address()) + priority := val.VotingPower + added := evpool.evidenceStore.AddNewEvidence(evidence, priority) if !added { // evidence already known, just ignore diff --git a/evidence/pool_test.go b/evidence/pool_test.go index 0997505ca..f5b5205b3 100644 --- a/evidence/pool_test.go +++ b/evidence/pool_test.go @@ -3,29 +3,57 @@ package evidence import ( "sync" "testing" + "time" "github.com/stretchr/testify/assert" + sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" dbm "github.com/tendermint/tmlibs/db" ) -type mockState struct{} +var mockState = sm.State{} -func (m mockState) VerifyEvidence(ev types.Evidence) (int64, error) { - err := ev.Verify("") - return 10, err +func initializeValidatorState(valAddr []byte, height int64) dbm.DB { + stateDB := dbm.NewMemDB() + + // create validator set and state + valSet := &types.ValidatorSet{ + Validators: []*types.Validator{ + {Address: valAddr}, + }, + } + state := sm.State{ + LastBlockHeight: 0, + LastBlockTime: time.Now(), + Validators: valSet, + LastHeightValidatorsChanged: 1, + ConsensusParams: types.ConsensusParams{ + EvidenceParams: types.EvidenceParams{ + MaxAge: 1000000, + }, + }, + } + + // save all states up to height + for i := int64(0); i < height; i++ { + state.LastBlockHeight = i + sm.SaveState(stateDB, state) + } + + return stateDB } func TestEvidencePool(t *testing.T) { assert := assert.New(t) - params := types.EvidenceParams{} + valAddr := []byte("val1") + height := int64(5) + stateDB := initializeValidatorState(valAddr, height) store := NewEvidenceStore(dbm.NewMemDB()) - state := mockState{} - pool := NewEvidencePool(params, store, state) + pool := NewEvidencePool(stateDB, store) - goodEvidence := newMockGoodEvidence(5, 1, []byte("val1")) + goodEvidence := newMockGoodEvidence(height, 0, valAddr) badEvidence := MockBadEvidence{goodEvidence} err := pool.AddEvidence(badEvidence) diff --git a/evidence/reactor_test.go b/evidence/reactor_test.go index fb83667ce..77c58734e 100644 --- a/evidence/reactor_test.go +++ b/evidence/reactor_test.go @@ -32,15 +32,14 @@ func evidenceLogger() log.Logger { } // connect N evidence reactors through N switches -func makeAndConnectEvidenceReactors(config *cfg.Config, N int) []*EvidenceReactor { +func makeAndConnectEvidenceReactors(config *cfg.Config, stateDBs []dbm.DB) []*EvidenceReactor { + N := len(stateDBs) reactors := make([]*EvidenceReactor, N) logger := evidenceLogger() for i := 0; i < N; i++ { - params := types.EvidenceParams{} store := NewEvidenceStore(dbm.NewMemDB()) - state := mockState{} - pool := NewEvidencePool(params, store, state) + pool := NewEvidencePool(stateDBs[i], store) reactors[i] = NewEvidenceReactor(pool) reactors[i].SetLogger(logger.With("validator", i)) } @@ -99,10 +98,10 @@ func _waitForEvidence(t *testing.T, wg *sync.WaitGroup, evs types.EvidenceList, wg.Done() } -func sendEvidence(t *testing.T, evpool *EvidencePool, n int) types.EvidenceList { +func sendEvidence(t *testing.T, evpool *EvidencePool, valAddr []byte, n int) types.EvidenceList { evList := make([]types.Evidence, n) for i := 0; i < n; i++ { - ev := newMockGoodEvidence(int64(i), 2, []byte("val")) + ev := newMockGoodEvidence(int64(i+1), 0, valAddr) err := evpool.AddEvidence(ev) assert.Nil(t, err) evList[i] = ev @@ -111,17 +110,28 @@ func sendEvidence(t *testing.T, evpool *EvidencePool, n int) types.EvidenceList } var ( - NUM_EVIDENCE = 1000 + NUM_EVIDENCE = 1 TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow ) func TestReactorBroadcastEvidence(t *testing.T) { config := cfg.TestConfig() N := 7 - reactors := makeAndConnectEvidenceReactors(config, N) - // send a bunch of evidence to the first reactor's evpool + // create statedb for everyone + stateDBs := make([]dbm.DB, N) + valAddr := []byte("myval") + // we need validators saved for heights at least as high as we have evidence for + height := int64(NUM_EVIDENCE) + 10 + for i := 0; i < N; i++ { + stateDBs[i] = initializeValidatorState(valAddr, height) + } + + // make reactors from statedb + reactors := makeAndConnectEvidenceReactors(config, stateDBs) + + // send a bunch of valid evidence to the first reactor's evpool // and wait for them all to be received in the others - evList := sendEvidence(t, reactors[0].evpool, NUM_EVIDENCE) + evList := sendEvidence(t, reactors[0].evpool, valAddr, NUM_EVIDENCE) waitForEvidence(t, evList, reactors) } diff --git a/evidence/store_test.go b/evidence/store_test.go index 7828d37b9..192aabc2f 100644 --- a/evidence/store_test.go +++ b/evidence/store_test.go @@ -113,12 +113,14 @@ func TestStorePriority(t *testing.T) { //------------------------------------------- const ( - evidenceTypeMock = byte(0x01) + evidenceTypeMockGood = byte(0x01) + evidenceTypeMockBad = byte(0x02) ) var _ = wire.RegisterInterface( struct{ types.Evidence }{}, - wire.ConcreteType{MockGoodEvidence{}, evidenceTypeMock}, + wire.ConcreteType{MockGoodEvidence{}, evidenceTypeMockGood}, + wire.ConcreteType{MockBadEvidence{}, evidenceTypeMockBad}, ) type MockGoodEvidence struct { diff --git a/node/node.go b/node/node.go index 53eab6e0e..f922d8321 100644 --- a/node/node.go +++ b/node/node.go @@ -102,7 +102,8 @@ type Node struct { trustMetricStore *trust.TrustMetricStore // trust metrics for all peers // services - eventBus *types.EventBus // pub/sub for services + eventBus *types.EventBus // pub/sub for services + stateDB dbm.DB blockStore *bc.BlockStore // store the blockchain to disk bcReactor *bc.BlockchainReactor // for fast-syncing mempoolReactor *mempl.MempoolReactor // for gossipping transactions @@ -137,6 +138,7 @@ func NewNode(config *cfg.Config, } // Get genesis doc + // TODO: move to state package? genDoc, err := loadGenesisDoc(stateDB) if err != nil { genDoc, err = genesisDocProvider() @@ -148,21 +150,16 @@ func NewNode(config *cfg.Config, saveGenesisDoc(stateDB, genDoc) } - stateLogger := logger.With("module", "state") - state := sm.LoadState(stateDB) - if state == nil { - state, err = sm.MakeGenesisState(stateDB, genDoc) - if err != nil { - return nil, err - } - state.Save() + state, err := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc) + if err != nil { + return nil, err } - state.SetLogger(stateLogger) // Create the proxyApp, which manages connections (consensus, mempool, query) - // and sync tendermint and the app by replaying any necessary blocks + // and sync tendermint and the app by performing a handshake + // and replaying any necessary blocks consensusLogger := logger.With("module", "consensus") - handshaker := consensus.NewHandshaker(state, blockStore) + handshaker := consensus.NewHandshaker(stateDB, state, blockStore) handshaker.SetLogger(consensusLogger) proxyApp := proxy.NewAppConns(clientCreator, handshaker) proxyApp.SetLogger(logger.With("module", "proxy")) @@ -172,7 +169,6 @@ func NewNode(config *cfg.Config, // reload the state (it may have been updated by the handshake) state = sm.LoadState(stateDB) - state.SetLogger(stateLogger) // Generate node PrivKey privKey := crypto.GenPrivKeyEd25519() @@ -194,10 +190,6 @@ func NewNode(config *cfg.Config, consensusLogger.Info("This node is not a validator") } - // Make BlockchainReactor - bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyApp.Consensus(), blockStore, fastSync) - bcReactor.SetLogger(logger.With("module", "blockchain")) - // Make MempoolReactor mempoolLogger := logger.With("module", "mempool") mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight) @@ -216,14 +208,22 @@ func NewNode(config *cfg.Config, } evidenceLogger := logger.With("module", "evidence") evidenceStore := evidence.NewEvidenceStore(evidenceDB) - evidencePool := evidence.NewEvidencePool(state.ConsensusParams.EvidenceParams, evidenceStore, state.Copy()) + evidencePool := evidence.NewEvidencePool(stateDB, evidenceStore) evidencePool.SetLogger(evidenceLogger) evidenceReactor := evidence.NewEvidenceReactor(evidencePool) evidenceReactor.SetLogger(evidenceLogger) + blockExecLogger := logger.With("module", "state") + // make block executor for consensus and blockchain reactors to execute blocks + blockExec := sm.NewBlockExecutor(stateDB, blockExecLogger, proxyApp.Consensus(), mempool, evidencePool) + + // Make BlockchainReactor + bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) + bcReactor.SetLogger(logger.With("module", "blockchain")) + // Make ConsensusReactor consensusState := consensus.NewConsensusState(config.Consensus, state.Copy(), - proxyApp.Consensus(), blockStore, mempool, evidencePool) + blockExec, blockStore, mempool, evidencePool) consensusState.SetLogger(consensusLogger) if privValidator != nil { consensusState.SetPrivValidator(privValidator) @@ -291,7 +291,7 @@ func NewNode(config *cfg.Config, eventBus.SetLogger(logger.With("module", "events")) // services which will be publishing and/or subscribing for messages (events) - bcReactor.SetEventBus(eventBus) + // consensusReactor will set it on consensusState and blockExecutor consensusReactor.SetEventBus(eventBus) // Transaction indexing @@ -333,6 +333,7 @@ func NewNode(config *cfg.Config, addrBook: addrBook, trustMetricStore: trustMetricStore, + stateDB: stateDB, blockStore: blockStore, bcReactor: bcReactor, mempoolReactor: mempoolReactor, @@ -429,6 +430,7 @@ func (n *Node) AddListener(l p2p.Listener) { // ConfigureRPC sets all variables in rpccore so they will serve // rpc calls from this node func (n *Node) ConfigureRPC() { + rpccore.SetStateDB(n.stateDB) rpccore.SetBlockStore(n.blockStore) rpccore.SetConsensusState(n.consensusState) rpccore.SetMempool(n.mempoolReactor.Mempool) diff --git a/rpc/core/blocks.go b/rpc/core/blocks.go index 43edcd356..853bb0f7b 100644 --- a/rpc/core/blocks.go +++ b/rpc/core/blocks.go @@ -4,6 +4,7 @@ import ( "fmt" ctypes "github.com/tendermint/tendermint/rpc/core/types" + sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" ) @@ -336,8 +337,7 @@ func BlockResults(heightPtr *int64) (*ctypes.ResultBlockResults, error) { } // load the results - state := consensusState.GetState() - results, err := state.LoadABCIResponses(height) + results, err := sm.LoadABCIResponses(stateDB, height) if err != nil { return nil, err } diff --git a/rpc/core/consensus.go b/rpc/core/consensus.go index e358c4874..65c9fc364 100644 --- a/rpc/core/consensus.go +++ b/rpc/core/consensus.go @@ -4,6 +4,7 @@ import ( cm "github.com/tendermint/tendermint/consensus" cstypes "github.com/tendermint/tendermint/consensus/types" ctypes "github.com/tendermint/tendermint/rpc/core/types" + sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -49,8 +50,7 @@ func Validators(heightPtr *int64) (*ctypes.ResultValidators, error) { return nil, err } - state := consensusState.GetState() - validators, err := state.LoadValidators(height) + validators, err := sm.LoadValidators(stateDB, height) if err != nil { return nil, err } diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 325625c79..927d7ccad 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -11,6 +11,7 @@ import ( sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" + dbm "github.com/tendermint/tmlibs/db" "github.com/tendermint/tmlibs/log" ) @@ -20,7 +21,7 @@ var subscribeTimeout = 5 * time.Second // These interfaces are used by RPC and must be thread safe type Consensus interface { - GetState() *sm.State + GetState() sm.State GetValidators() (int64, []*types.Validator) GetRoundState() *cstypes.RoundState } @@ -43,6 +44,7 @@ var ( proxyAppQuery proxy.AppConnQuery // interfaces defined in types and above + stateDB dbm.DB blockStore types.BlockStore mempool types.Mempool evidencePool types.EvidencePool @@ -60,6 +62,10 @@ var ( logger log.Logger ) +func SetStateDB(db dbm.DB) { + stateDB = db +} + func SetBlockStore(bs types.BlockStore) { blockStore = bs } diff --git a/state/execution.go b/state/execution.go index c96861524..4ccf87d4a 100644 --- a/state/execution.go +++ b/state/execution.go @@ -1,7 +1,6 @@ package state import ( - "bytes" "errors" "fmt" @@ -10,36 +9,154 @@ import ( crypto "github.com/tendermint/go-crypto" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" + dbm "github.com/tendermint/tmlibs/db" "github.com/tendermint/tmlibs/log" ) -//-------------------------------------------------- -// Execute the block +//----------------------------------------------------------------------------- +// BlockExecutor handles block execution and state updates. +// It exposes ApplyBlock(), which validates & executes the block, updates state w/ ABCI responses, +// then commits and updates the mempool atomically, then saves state. + +// BlockExecutor provides the context and accessories for properly executing a block. +type BlockExecutor struct { + // save state, validators, consensus params, abci responses here + db dbm.DB + + // execute the app against this + proxyApp proxy.AppConnConsensus + + // events + eventBus types.BlockEventPublisher + + // update these with block results after commit + mempool types.Mempool + evpool types.EvidencePool + + logger log.Logger +} + +// NewBlockExecutor returns a new BlockExecutor with a NopEventBus. +// Call SetEventBus to provide one. +func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus, + mempool types.Mempool, evpool types.EvidencePool) *BlockExecutor { + return &BlockExecutor{ + db: db, + proxyApp: proxyApp, + eventBus: types.NopEventBus{}, + mempool: mempool, + evpool: evpool, + logger: logger, + } +} + +// SetEventBus - sets the event bus for publishing block related events. +// If not called, it defaults to types.NopEventBus. +func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) { + blockExec.eventBus = eventBus +} + +// ValidateBlock validates the given block against the given state. +// If the block is invalid, it returns an error. +// Validation does not mutate state, but does require historical information from the stateDB, +// ie. to verify evidence from a validator at an old height. +func (blockExec *BlockExecutor) ValidateBlock(s State, block *types.Block) error { + return validateBlock(blockExec.db, s, block) +} + +// ApplyBlock validates the block against the state, executes it against the app, +// fires the relevant events, commits the app, and saves the new state and responses. +// It's the only function that needs to be called +// from outside this package to process and commit an entire block. +// It takes a blockID to avoid recomputing the parts hash. +func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block *types.Block) (State, error) { -// ValExecBlock executes the block, but does NOT mutate State. -// + validates the block -// + executes block.Txs on the proxyAppConn -func (s *State) ValExecBlock(txEventPublisher types.TxEventPublisher, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) { - // Validate the block. - if err := s.validateBlock(block); err != nil { - return nil, ErrInvalidBlock(err) + if err := blockExec.ValidateBlock(s, block); err != nil { + return s, ErrInvalidBlock(err) } - // Execute the block txs - abciResponses, err := execBlockOnProxyApp(txEventPublisher, proxyAppConn, block, s.logger, s.LastValidators) + abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block) if err != nil { - // There was some error in proxyApp - // TODO Report error and wait for proxyApp to be available. - return nil, ErrProxyAppConn(err) + return s, ErrProxyAppConn(err) } - return abciResponses, nil + fail.Fail() // XXX + + // save the results before we commit + saveABCIResponses(blockExec.db, block.Height, abciResponses) + + fail.Fail() // XXX + + // update the state with the block and responses + s, err = updateState(s, blockID, block.Header, abciResponses) + if err != nil { + return s, fmt.Errorf("Commit failed for application: %v", err) + } + + // lock mempool, commit state, update mempoool + appHash, err := blockExec.Commit(block) + if err != nil { + return s, fmt.Errorf("Commit failed for application: %v", err) + } + + fail.Fail() // XXX + + // update the app hash and save the state + s.AppHash = appHash + SaveState(blockExec.db, s) + + fail.Fail() // XXX + + // Update evpool now that state is saved + // TODO: handle the crash/recover scenario + // ie. (may need to call Update for last block) + blockExec.evpool.Update(block) + + // events are fired after everything else + // NOTE: if we crash between Commit and Save, events wont be fired during replay + fireEvents(blockExec.logger, blockExec.eventBus, block, abciResponses) + + return s, nil } +// Commit locks the mempool, runs the ABCI Commit message, and updates the mempool. +// It returns the result of calling abci.Commit (the AppHash), and an error. +// The Mempool must be locked during commit and update because state is typically reset on Commit and old txs must be replayed +// against committed state before new txs are run in the mempool, lest they be invalid. +func (blockExec *BlockExecutor) Commit(block *types.Block) ([]byte, error) { + blockExec.mempool.Lock() + defer blockExec.mempool.Unlock() + + // Commit block, get hash back + res, err := blockExec.proxyApp.CommitSync() + if err != nil { + blockExec.logger.Error("Client error during proxyAppConn.CommitSync", "err", err) + return nil, err + } + if res.IsErr() { + blockExec.logger.Error("Error in proxyAppConn.CommitSync", "err", res) + return nil, res + } + if res.Log != "" { + blockExec.logger.Debug("Commit.Log: " + res.Log) + } + + blockExec.logger.Info("Committed state", "height", block.Height, "txs", block.NumTxs, "appHash", res.Data) + + // Update mempool. + if err := blockExec.mempool.Update(block.Height, block.Txs); err != nil { + return nil, err + } + + return res.Data, nil +} + +//--------------------------------------------------------- +// Helper functions for executing blocks and updating state + // Executes block's transactions on proxyAppConn. // Returns a list of transaction results and updates to the validator set -// TODO: Generate a bitmap or otherwise store tx validity in state. -func execBlockOnProxyApp(txEventPublisher types.TxEventPublisher, proxyAppConn proxy.AppConnConsensus, block *types.Block, logger log.Logger, lastValidators *types.ValidatorSet) (*ABCIResponses, error) { +func execBlockOnProxyApp(logger log.Logger, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) { var validTxs, invalidTxs = 0, 0 txIndex := 0 @@ -59,17 +176,6 @@ func execBlockOnProxyApp(txEventPublisher types.TxEventPublisher, proxyAppConn p logger.Debug("Invalid tx", "code", txRes.Code, "log", txRes.Log) invalidTxs++ } - - // NOTE: if we count we can access the tx from the block instead of - // pulling it from the req - tx := types.Tx(req.GetDeliverTx().Tx) - txEventPublisher.PublishEventTx(types.EventDataTx{types.TxResult{ - Height: block.Height, - Index: uint32(txIndex), - Tx: tx, - Result: *txRes, - }}) - abciResponses.DeliverTx[txIndex] = txRes txIndex++ } @@ -209,194 +315,91 @@ func changeInVotingPowerMoreOrEqualToOneThird(currentSet *types.ValidatorSet, up return false, nil } -// return a bit array of validators that signed the last commit -// NOTE: assumes commits have already been authenticated -/* function is currently unused -func commitBitArrayFromBlock(block *types.Block) *cmn.BitArray { - signed := cmn.NewBitArray(len(block.LastCommit.Precommits)) - for i, precommit := range block.LastCommit.Precommits { - if precommit != nil { - signed.SetIndex(i, true) // val_.LastCommitHeight = block.Height - 1 - } - } - return signed -} -*/ +// updateState returns a new State updated according to the header and responses. +func updateState(s State, blockID types.BlockID, header *types.Header, + abciResponses *ABCIResponses) (State, error) { -//----------------------------------------------------- -// Validate block + // copy the valset so we can apply changes from EndBlock + // and update s.LastValidators and s.Validators + prevValSet := s.Validators.Copy() + nextValSet := prevValSet.Copy() -// ValidateBlock validates the block against the state. -func (s *State) ValidateBlock(block *types.Block) error { - return s.validateBlock(block) -} - -// MakeBlock builds a block with the given txs and commit from the current state. -func (s *State) MakeBlock(height int64, txs []types.Tx, commit *types.Commit) (*types.Block, *types.PartSet) { - // build base block - block := types.MakeBlock(height, txs, commit) - - // fill header with state data - block.ChainID = s.ChainID - block.TotalTxs = s.LastBlockTotalTx + block.NumTxs - block.LastBlockID = s.LastBlockID - block.ValidatorsHash = s.Validators.Hash() - block.AppHash = s.AppHash - block.ConsensusHash = s.ConsensusParams.Hash() - block.LastResultsHash = s.LastResultsHash - - return block, block.MakePartSet(s.ConsensusParams.BlockGossip.BlockPartSizeBytes) -} - -func (s *State) validateBlock(b *types.Block) error { - // validate internal consistency - if err := b.ValidateBasic(); err != nil { - return err - } - - // validate basic info - if b.ChainID != s.ChainID { - return fmt.Errorf("Wrong Block.Header.ChainID. Expected %v, got %v", s.ChainID, b.ChainID) - } - if b.Height != s.LastBlockHeight+1 { - return fmt.Errorf("Wrong Block.Header.Height. Expected %v, got %v", s.LastBlockHeight+1, b.Height) - } - /* TODO: Determine bounds for Time - See blockchain/reactor "stopSyncingDurationMinutes" - - if !b.Time.After(lastBlockTime) { - return errors.New("Invalid Block.Header.Time") + // update the validator set with the latest abciResponses + lastHeightValsChanged := s.LastHeightValidatorsChanged + if len(abciResponses.EndBlock.ValidatorUpdates) > 0 { + err := updateValidators(nextValSet, abciResponses.EndBlock.ValidatorUpdates) + if err != nil { + return s, fmt.Errorf("Error changing validator set: %v", err) } - */ - - // validate prev block info - if !b.LastBlockID.Equals(s.LastBlockID) { - return fmt.Errorf("Wrong Block.Header.LastBlockID. Expected %v, got %v", s.LastBlockID, b.LastBlockID) - } - newTxs := int64(len(b.Data.Txs)) - if b.TotalTxs != s.LastBlockTotalTx+newTxs { - return fmt.Errorf("Wrong Block.Header.TotalTxs. Expected %v, got %v", s.LastBlockTotalTx+newTxs, b.TotalTxs) + // change results from this height but only applies to the next height + lastHeightValsChanged = header.Height + 1 } - // validate app info - if !bytes.Equal(b.AppHash, s.AppHash) { - return fmt.Errorf("Wrong Block.Header.AppHash. Expected %X, got %v", s.AppHash, b.AppHash) - } - if !bytes.Equal(b.ConsensusHash, s.ConsensusParams.Hash()) { - return fmt.Errorf("Wrong Block.Header.ConsensusHash. Expected %X, got %v", s.ConsensusParams.Hash(), b.ConsensusHash) - } - if !bytes.Equal(b.LastResultsHash, s.LastResultsHash) { - return fmt.Errorf("Wrong Block.Header.LastResultsHash. Expected %X, got %v", s.LastResultsHash, b.LastResultsHash) - } - if !bytes.Equal(b.ValidatorsHash, s.Validators.Hash()) { - return fmt.Errorf("Wrong Block.Header.ValidatorsHash. Expected %X, got %v", s.Validators.Hash(), b.ValidatorsHash) - } + // Update validator accums and set state variables + nextValSet.IncrementAccum(1) - // Validate block LastCommit. - if b.Height == 1 { - if len(b.LastCommit.Precommits) != 0 { - return errors.New("Block at height 1 (first block) should have no LastCommit precommits") - } - } else { - if len(b.LastCommit.Precommits) != s.LastValidators.Size() { - return fmt.Errorf("Invalid block commit size. Expected %v, got %v", - s.LastValidators.Size(), len(b.LastCommit.Precommits)) - } - err := s.LastValidators.VerifyCommit( - s.ChainID, s.LastBlockID, b.Height-1, b.LastCommit) + // update the params with the latest abciResponses + nextParams := s.ConsensusParams + lastHeightParamsChanged := s.LastHeightConsensusParamsChanged + if abciResponses.EndBlock.ConsensusParamUpdates != nil { + // NOTE: must not mutate s.ConsensusParams + nextParams = s.ConsensusParams.Update(abciResponses.EndBlock.ConsensusParamUpdates) + err := nextParams.Validate() if err != nil { - return err - } - } - - for _, ev := range b.Evidence.Evidence { - if _, err := s.VerifyEvidence(ev); err != nil { - return types.NewEvidenceInvalidErr(ev, err) + return s, fmt.Errorf("Error updating consensus params: %v", err) } - } - - return nil + // change results from this height but only applies to the next height + lastHeightParamsChanged = header.Height + 1 + } + + // NOTE: the AppHash has not been populated. + // It will be filled on state.Save. + return State{ + ChainID: s.ChainID, + LastBlockHeight: header.Height, + LastBlockTotalTx: s.LastBlockTotalTx + header.NumTxs, + LastBlockID: blockID, + LastBlockTime: header.Time, + Validators: nextValSet, + LastValidators: s.Validators.Copy(), + LastHeightValidatorsChanged: lastHeightValsChanged, + ConsensusParams: nextParams, + LastHeightConsensusParamsChanged: lastHeightParamsChanged, + LastResultsHash: abciResponses.ResultsHash(), + AppHash: nil, + }, nil } -//----------------------------------------------------------------------------- -// ApplyBlock validates & executes the block, updates state w/ ABCI responses, -// then commits and updates the mempool atomically, then saves state. - -// ApplyBlock validates the block against the state, executes it against the app, -// commits it, and saves the block and state. It's the only function that needs to be called -// from outside this package to process and commit an entire block. -func (s *State) ApplyBlock(txEventPublisher types.TxEventPublisher, proxyAppConn proxy.AppConnConsensus, - block *types.Block, partsHeader types.PartSetHeader, - mempool types.Mempool, evpool types.EvidencePool) error { - - abciResponses, err := s.ValExecBlock(txEventPublisher, proxyAppConn, block) - if err != nil { - return fmt.Errorf("Exec failed for application: %v", err) - } - - fail.Fail() // XXX - - // save the results before we commit - s.SaveABCIResponses(block.Height, abciResponses) - - fail.Fail() // XXX - - // now update the block and validators - err = s.SetBlockAndValidators(block.Header, partsHeader, abciResponses) +// Fire NewBlock, NewBlockHeader. +// Fire TxEvent for every tx. +// NOTE: if Tendermint crashes before commit, some or all of these events may be published again. +func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses) { + // NOTE: do we still need this buffer ? + txEventBuffer := types.NewTxEventBuffer(eventBus, int(block.NumTxs)) + for i, tx := range block.Data.Txs { + txEventBuffer.PublishEventTx(types.EventDataTx{types.TxResult{ + Height: block.Height, + Index: uint32(i), + Tx: tx, + Result: *(abciResponses.DeliverTx[i]), + }}) + } + + eventBus.PublishEventNewBlock(types.EventDataNewBlock{block}) + eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header}) + err := txEventBuffer.Flush() if err != nil { - return fmt.Errorf("Commit failed for application: %v", err) + logger.Error("Failed to flush event buffer", "err", err) } - - // lock mempool, commit state, update mempoool - err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool) - if err != nil { - return fmt.Errorf("Commit failed for application: %v", err) - } - - fail.Fail() // XXX - - evpool.MarkEvidenceAsCommitted(block.Evidence.Evidence) - - // save the state and the validators - s.Save() - - return nil } -// CommitStateUpdateMempool locks the mempool, runs the ABCI Commit message, and updates the mempool. -// The Mempool must be locked during commit and update because state is typically reset on Commit and old txs must be replayed -// against committed state before new txs are run in the mempool, lest they be invalid. -func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, block *types.Block, mempool types.Mempool) error { - mempool.Lock() - defer mempool.Unlock() - - // Commit block, get hash back - res, err := proxyAppConn.CommitSync() - if err != nil { - s.logger.Error("Client error during proxyAppConn.CommitSync", "err", err) - return err - } - if res.IsErr() { - s.logger.Error("Error in proxyAppConn.CommitSync", "err", res) - return res - } - if res.Log != "" { - s.logger.Debug("Commit.Log: " + res.Log) - } - - s.logger.Info("Committed state", "height", block.Height, "txs", block.NumTxs, "hash", res.Data) - - // Set the state's new AppHash - s.AppHash = res.Data - - // Update mempool. - return mempool.Update(block.Height, block.Txs) -} +//---------------------------------------------------------------------------------------------------- +// Execute block without state. TODO: eliminate // ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state. // It returns the application root hash (result of abci.Commit). -func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block, logger log.Logger, lastValidators *types.ValidatorSet) ([]byte, error) { - _, err := execBlockOnProxyApp(types.NopEventBus{}, appConnConsensus, block, logger, lastValidators) +func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block, logger log.Logger) ([]byte, error) { + _, err := execBlockOnProxyApp(logger, appConnConsensus, block) if err != nil { logger.Error("Error executing block on proxy app", "height", block.Height, "err", err) return nil, err diff --git a/state/execution_test.go b/state/execution_test.go index 7cda5c1da..9db269116 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -23,64 +23,6 @@ var ( nTxsPerBlock = 10 ) -func TestValidateBlock(t *testing.T) { - state := state() - state.SetLogger(log.TestingLogger()) - - // proper block must pass - block := makeBlock(state, 1) - err := state.ValidateBlock(block) - require.NoError(t, err) - - // wrong chain fails - block = makeBlock(state, 1) - block.ChainID = "not-the-real-one" - err = state.ValidateBlock(block) - require.Error(t, err) - - // wrong height fails - block = makeBlock(state, 1) - block.Height += 10 - err = state.ValidateBlock(block) - require.Error(t, err) - - // wrong total tx fails - block = makeBlock(state, 1) - block.TotalTxs += 10 - err = state.ValidateBlock(block) - require.Error(t, err) - - // wrong blockid fails - block = makeBlock(state, 1) - block.LastBlockID.PartsHeader.Total += 10 - err = state.ValidateBlock(block) - require.Error(t, err) - - // wrong app hash fails - block = makeBlock(state, 1) - block.AppHash = []byte("wrong app hash") - err = state.ValidateBlock(block) - require.Error(t, err) - - // wrong consensus hash fails - block = makeBlock(state, 1) - block.ConsensusHash = []byte("wrong consensus hash") - err = state.ValidateBlock(block) - require.Error(t, err) - - // wrong results hash fails - block = makeBlock(state, 1) - block.LastResultsHash = []byte("wrong results hash") - err = state.ValidateBlock(block) - require.Error(t, err) - - // wrong validators hash fails - block = makeBlock(state, 1) - block.ValidatorsHash = []byte("wrong validators hash") - err = state.ValidateBlock(block) - require.Error(t, err) -} - func TestApplyBlock(t *testing.T) { cc := proxy.NewLocalClientCreator(dummy.NewDummyApplication()) proxyApp := proxy.NewAppConns(cc, nil) @@ -88,15 +30,15 @@ func TestApplyBlock(t *testing.T) { require.Nil(t, err) defer proxyApp.Stop() - state := state() - state.SetLogger(log.TestingLogger()) + state, stateDB := state(), dbm.NewMemDB() - block := makeBlock(state, 1) - - err = state.ApplyBlock(types.NopEventBus{}, proxyApp.Consensus(), - block, block.MakePartSet(testPartSize).Header(), + blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), types.MockMempool{}, types.MockEvidencePool{}) + block := makeBlock(state, 1) + blockID := types.BlockID{block.Hash(), block.MakePartSet(testPartSize).Header()} + + state, err = blockExec.ApplyBlock(state, blockID, block) require.Nil(t, err) // TODO check state and mempool @@ -112,15 +54,6 @@ func TestBeginBlockAbsentValidators(t *testing.T) { defer proxyApp.Stop() state := state() - state.SetLogger(log.TestingLogger()) - - // there were 2 validators - val1PrivKey := crypto.GenPrivKeyEd25519() - val2PrivKey := crypto.GenPrivKeyEd25519() - lastValidators := types.NewValidatorSet([]*types.Validator{ - types.NewValidator(val1PrivKey.PubKey(), 10), - types.NewValidator(val2PrivKey.PubKey(), 5), - }) prevHash := state.LastBlockID.Hash prevParts := types.PartSetHeader{} @@ -141,7 +74,7 @@ func TestBeginBlockAbsentValidators(t *testing.T) { lastCommit := &types.Commit{BlockID: prevBlockID, Precommits: tc.lastCommitPrecommits} block, _ := state.MakeBlock(2, makeTxs(2), lastCommit) - _, err = ExecCommitBlock(proxyApp.Consensus(), block, log.TestingLogger(), lastValidators) + _, err = ExecCommitBlock(proxyApp.Consensus(), block, log.TestingLogger()) require.Nil(t, err, tc.desc) // -> app must receive an index of the absent validator @@ -159,8 +92,8 @@ func makeTxs(height int64) (txs []types.Tx) { return txs } -func state() *State { - s, _ := MakeGenesisState(dbm.NewMemDB(), &types.GenesisDoc{ +func state() State { + s, _ := MakeGenesisState(&types.GenesisDoc{ ChainID: chainID, Validators: []types.GenesisValidator{ {privKey.PubKey(), 10000, "test"}, @@ -170,7 +103,7 @@ func state() *State { return s } -func makeBlock(state *State, height int64) *types.Block { +func makeBlock(state State, height int64) *types.Block { block, _ := state.MakeBlock(height, makeTxs(state.LastBlockHeight), new(types.Commit)) return block } diff --git a/state/state.go b/state/state.go index 773b46fcc..575a1630e 100644 --- a/state/state.go +++ b/state/state.go @@ -4,15 +4,8 @@ import ( "bytes" "fmt" "io/ioutil" - "sync" "time" - abci "github.com/tendermint/abci/types" - - cmn "github.com/tendermint/tmlibs/common" - dbm "github.com/tendermint/tmlibs/db" - "github.com/tendermint/tmlibs/log" - wire "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" @@ -23,36 +16,19 @@ var ( stateKey = []byte("stateKey") ) -func calcValidatorsKey(height int64) []byte { - return []byte(cmn.Fmt("validatorsKey:%v", height)) -} - -func calcConsensusParamsKey(height int64) []byte { - return []byte(cmn.Fmt("consensusParamsKey:%v", height)) -} - -func calcABCIResponsesKey(height int64) []byte { - return []byte(cmn.Fmt("abciResponsesKey:%v", height)) -} - //----------------------------------------------------------------------------- // State is a short description of the latest committed block of the Tendermint consensus. // It keeps all information necessary to validate new blocks, // including the last validator set and the consensus params. // All fields are exposed so the struct can be easily serialized, -// but the fields should only be changed by calling state.SetBlockAndValidators. +// but none of them should be mutated directly. +// Instead, use state.Copy() or state.NextState(...). // NOTE: not goroutine-safe. type State struct { - // mtx for writing to db - mtx sync.Mutex - db dbm.DB - // Immutable ChainID string - // Exposed fields are updated by SetBlockAndValidators. - // LastBlockHeight=0 at genesis (ie. block(H=0) does not exist) LastBlockHeight int64 LastBlockTotalTx int64 @@ -78,61 +54,11 @@ type State struct { // The latest AppHash we've received from calling abci.Commit() AppHash []byte - - logger log.Logger -} - -// GetState loads the most recent state from the database, -// or creates a new one from the given genesisFile and persists the result -// to the database. -func GetState(stateDB dbm.DB, genesisFile string) (*State, error) { - state := LoadState(stateDB) - if state == nil { - var err error - state, err = MakeGenesisStateFromFile(stateDB, genesisFile) - if err != nil { - return nil, err - } - state.Save() - } - - return state, nil -} - -// LoadState loads the State from the database. -func LoadState(db dbm.DB) *State { - return loadState(db, stateKey) -} - -func loadState(db dbm.DB, key []byte) *State { - buf := db.Get(key) - if len(buf) == 0 { - return nil - } - - s := &State{db: db} - r, n, err := bytes.NewReader(buf), new(int), new(error) - wire.ReadBinaryPtr(&s, r, 0, n, err) - if *err != nil { - // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED - cmn.Exit(cmn.Fmt(`LoadState: Data has been corrupted or its spec has changed: - %v\n`, *err)) - } - // TODO: ensure that buf is completely read. - - return s -} - -// SetLogger sets the logger on the State. -func (s *State) SetLogger(l log.Logger) { - s.logger = l } // Copy makes a copy of the State for mutating. -func (s *State) Copy() *State { - return &State{ - db: s.db, - +func (s State) Copy() State { + return State{ ChainID: s.ChainID, LastBlockHeight: s.LastBlockHeight, @@ -150,327 +76,47 @@ func (s *State) Copy() *State { AppHash: s.AppHash, LastResultsHash: s.LastResultsHash, - - logger: s.logger, } } -// Save persists the State to the database. -func (s *State) Save() { - s.mtx.Lock() - defer s.mtx.Unlock() - - s.saveValidatorsInfo() - s.saveConsensusParamsInfo() - s.db.SetSync(stateKey, s.Bytes()) -} - -// SaveABCIResponses persists the ABCIResponses to the database. -// This is useful in case we crash after app.Commit and before s.Save(). -// Responses are indexed by height so they can also be loaded later to produce Merkle proofs. -func (s *State) SaveABCIResponses(height int64, abciResponses *ABCIResponses) { - s.db.SetSync(calcABCIResponsesKey(height), abciResponses.Bytes()) -} - -// LoadABCIResponses loads the ABCIResponses for the given height from the database. -// This is useful for recovering from crashes where we called app.Commit and before we called -// s.Save(). It can also be used to produce Merkle proofs of the result of txs. -func (s *State) LoadABCIResponses(height int64) (*ABCIResponses, error) { - buf := s.db.Get(calcABCIResponsesKey(height)) - if len(buf) == 0 { - return nil, ErrNoABCIResponsesForHeight{height} - } - - abciResponses := new(ABCIResponses) - r, n, err := bytes.NewReader(buf), new(int), new(error) - wire.ReadBinaryPtr(abciResponses, r, 0, n, err) - if *err != nil { - // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED - cmn.Exit(cmn.Fmt(`LoadABCIResponses: Data has been corrupted or its spec has - changed: %v\n`, *err)) - } - // TODO: ensure that buf is completely read. - - return abciResponses, nil -} - -// LoadValidators loads the ValidatorSet for a given height. -// Returns ErrNoValSetForHeight if the validator set can't be found for this height. -func (s *State) LoadValidators(height int64) (*types.ValidatorSet, error) { - valInfo := s.loadValidatorsInfo(height) - if valInfo == nil { - return nil, ErrNoValSetForHeight{height} - } - - if valInfo.ValidatorSet == nil { - valInfo = s.loadValidatorsInfo(valInfo.LastHeightChanged) - if valInfo == nil { - cmn.PanicSanity(fmt.Sprintf(`Couldn't find validators at height %d as - last changed from height %d`, valInfo.LastHeightChanged, height)) - } - } - - return valInfo.ValidatorSet, nil -} - -func (s *State) loadValidatorsInfo(height int64) *ValidatorsInfo { - buf := s.db.Get(calcValidatorsKey(height)) - if len(buf) == 0 { - return nil - } - - v := new(ValidatorsInfo) - r, n, err := bytes.NewReader(buf), new(int), new(error) - wire.ReadBinaryPtr(v, r, 0, n, err) - if *err != nil { - // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED - cmn.Exit(cmn.Fmt(`LoadValidators: Data has been corrupted or its spec has changed: - %v\n`, *err)) - } - // TODO: ensure that buf is completely read. - - return v -} - -// saveValidatorsInfo persists the validator set for the next block to disk. -// It should be called from s.Save(), right before the state itself is persisted. -// If the validator set did not change after processing the latest block, -// only the last height for which the validators changed is persisted. -func (s *State) saveValidatorsInfo() { - changeHeight := s.LastHeightValidatorsChanged - nextHeight := s.LastBlockHeight + 1 - valInfo := &ValidatorsInfo{ - LastHeightChanged: changeHeight, - } - if changeHeight == nextHeight { - valInfo.ValidatorSet = s.Validators - } - s.db.SetSync(calcValidatorsKey(nextHeight), valInfo.Bytes()) -} - -// LoadConsensusParams loads the ConsensusParams for a given height. -func (s *State) LoadConsensusParams(height int64) (types.ConsensusParams, error) { - empty := types.ConsensusParams{} - - paramsInfo := s.loadConsensusParamsInfo(height) - if paramsInfo == nil { - return empty, ErrNoConsensusParamsForHeight{height} - } - - if paramsInfo.ConsensusParams == empty { - paramsInfo = s.loadConsensusParamsInfo(paramsInfo.LastHeightChanged) - if paramsInfo == nil { - cmn.PanicSanity(fmt.Sprintf(`Couldn't find consensus params at height %d as - last changed from height %d`, paramsInfo.LastHeightChanged, height)) - } - } - - return paramsInfo.ConsensusParams, nil -} - -func (s *State) loadConsensusParamsInfo(height int64) *ConsensusParamsInfo { - buf := s.db.Get(calcConsensusParamsKey(height)) - if len(buf) == 0 { - return nil - } - - paramsInfo := new(ConsensusParamsInfo) - r, n, err := bytes.NewReader(buf), new(int), new(error) - wire.ReadBinaryPtr(paramsInfo, r, 0, n, err) - if *err != nil { - // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED - cmn.Exit(cmn.Fmt(`LoadConsensusParams: Data has been corrupted or its spec has changed: - %v\n`, *err)) - } - // TODO: ensure that buf is completely read. - - return paramsInfo -} - -// saveConsensusParamsInfo persists the consensus params for the next block to disk. -// It should be called from s.Save(), right before the state itself is persisted. -// If the consensus params did not change after processing the latest block, -// only the last height for which they changed is persisted. -func (s *State) saveConsensusParamsInfo() { - changeHeight := s.LastHeightConsensusParamsChanged - nextHeight := s.LastBlockHeight + 1 - paramsInfo := &ConsensusParamsInfo{ - LastHeightChanged: changeHeight, - } - if changeHeight == nextHeight { - paramsInfo.ConsensusParams = s.ConsensusParams - } - s.db.SetSync(calcConsensusParamsKey(nextHeight), paramsInfo.Bytes()) -} - // Equals returns true if the States are identical. -func (s *State) Equals(s2 *State) bool { +func (s State) Equals(s2 State) bool { return bytes.Equal(s.Bytes(), s2.Bytes()) } // Bytes serializes the State using go-wire. -func (s *State) Bytes() []byte { +func (s State) Bytes() []byte { return wire.BinaryBytes(s) } -// SetBlockAndValidators mutates State variables -// to update block and validators after running EndBlock. -func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, - abciResponses *ABCIResponses) error { - - // copy the valset so we can apply changes from EndBlock - // and update s.LastValidators and s.Validators - prevValSet := s.Validators.Copy() - nextValSet := prevValSet.Copy() - - // update the validator set with the latest abciResponses - if len(abciResponses.EndBlock.ValidatorUpdates) > 0 { - err := updateValidators(nextValSet, abciResponses.EndBlock.ValidatorUpdates) - if err != nil { - return fmt.Errorf("Error changing validator set: %v", err) - } - // change results from this height but only applies to the next height - s.LastHeightValidatorsChanged = header.Height + 1 - } - - // Update validator accums and set state variables - nextValSet.IncrementAccum(1) - - // update the params with the latest abciResponses - nextParams := s.ConsensusParams - if abciResponses.EndBlock.ConsensusParamUpdates != nil { - // NOTE: must not mutate s.ConsensusParams - nextParams = s.ConsensusParams.Update(abciResponses.EndBlock.ConsensusParamUpdates) - err := nextParams.Validate() - if err != nil { - return fmt.Errorf("Error updating consensus params: %v", err) - } - // change results from this height but only applies to the next height - s.LastHeightConsensusParamsChanged = header.Height + 1 - } - - s.setBlockAndValidators(header.Height, - header.NumTxs, - types.BlockID{header.Hash(), blockPartsHeader}, - header.Time, - nextValSet, - nextParams, - abciResponses.ResultsHash()) - return nil -} - -func (s *State) setBlockAndValidators(height int64, - newTxs int64, blockID types.BlockID, blockTime time.Time, - valSet *types.ValidatorSet, - params types.ConsensusParams, - resultsHash []byte) { - - s.LastBlockHeight = height - s.LastBlockTotalTx += newTxs - s.LastBlockID = blockID - s.LastBlockTime = blockTime - - s.LastValidators = s.Validators.Copy() - s.Validators = valSet - - s.ConsensusParams = params - - s.LastResultsHash = resultsHash +// IsEmpty returns true if the State is equal to the empty State. +func (s State) IsEmpty() bool { + return s.Validators == nil // XXX can't compare to Empty } // GetValidators returns the last and current validator sets. -func (s *State) GetValidators() (last *types.ValidatorSet, current *types.ValidatorSet) { +func (s State) GetValidators() (last *types.ValidatorSet, current *types.ValidatorSet) { return s.LastValidators, s.Validators } -// VerifyEvidence verifies the evidence fully by checking it is internally -// consistent and corresponds to an existing or previous validator. -// It returns the priority of this evidence, or an error. -// NOTE: return error may be ErrNoValSetForHeight, in which case the validator set -// for the evidence height could not be loaded. -func (s *State) VerifyEvidence(evidence types.Evidence) (priority int64, err error) { - evidenceAge := s.LastBlockHeight - evidence.Height() - maxAge := s.ConsensusParams.EvidenceParams.MaxAge - if evidenceAge > maxAge { - return priority, fmt.Errorf("Evidence from height %d is too old. Min height is %d", - evidence.Height(), s.LastBlockHeight-maxAge) - } - - if err := evidence.Verify(s.ChainID); err != nil { - return priority, err - } - - // The address must have been an active validator at the height - ev := evidence - height, addr, idx := ev.Height(), ev.Address(), ev.Index() - valset, err := s.LoadValidators(height) - if err != nil { - // XXX/TODO: what do we do if we can't load the valset? - // eg. if we have pruned the state or height is too high? - return priority, err - } - valIdx, val := valset.GetByAddress(addr) - if val == nil { - return priority, fmt.Errorf("Address %X was not a validator at height %d", addr, height) - } else if idx != valIdx { - return priority, fmt.Errorf("Address %X was validator %d at height %d, not %d", addr, valIdx, height, idx) - } - - priority = val.VotingPower - return priority, nil -} - //------------------------------------------------------------------------ +// Create a block from the latest state -// ABCIResponses retains the responses -// of the various ABCI calls during block processing. -// It is persisted to disk for each height before calling Commit. -type ABCIResponses struct { - DeliverTx []*abci.ResponseDeliverTx - EndBlock *abci.ResponseEndBlock -} - -// NewABCIResponses returns a new ABCIResponses -func NewABCIResponses(block *types.Block) *ABCIResponses { - return &ABCIResponses{ - DeliverTx: make([]*abci.ResponseDeliverTx, block.NumTxs), - } -} - -// Bytes serializes the ABCIResponse using go-wire -func (a *ABCIResponses) Bytes() []byte { - return wire.BinaryBytes(*a) -} - -func (a *ABCIResponses) ResultsHash() []byte { - results := types.NewResults(a.DeliverTx) - return results.Hash() -} - -//----------------------------------------------------------------------------- - -// ValidatorsInfo represents the latest validator set, or the last height it changed -type ValidatorsInfo struct { - ValidatorSet *types.ValidatorSet - LastHeightChanged int64 -} - -// Bytes serializes the ValidatorsInfo using go-wire -func (valInfo *ValidatorsInfo) Bytes() []byte { - return wire.BinaryBytes(*valInfo) -} +// MakeBlock builds a block with the given txs and commit from the current state. +func (s State) MakeBlock(height int64, txs []types.Tx, commit *types.Commit) (*types.Block, *types.PartSet) { + // build base block + block := types.MakeBlock(height, txs, commit) -//----------------------------------------------------------------------------- - -// ConsensusParamsInfo represents the latest consensus params, or the last height it changed -type ConsensusParamsInfo struct { - ConsensusParams types.ConsensusParams - LastHeightChanged int64 -} + // fill header with state data + block.ChainID = s.ChainID + block.TotalTxs = s.LastBlockTotalTx + block.NumTxs + block.LastBlockID = s.LastBlockID + block.ValidatorsHash = s.Validators.Hash() + block.AppHash = s.AppHash + block.ConsensusHash = s.ConsensusParams.Hash() + block.LastResultsHash = s.LastResultsHash -// Bytes serializes the ConsensusParamsInfo using go-wire -func (params ConsensusParamsInfo) Bytes() []byte { - return wire.BinaryBytes(params) + return block, block.MakePartSet(s.ConsensusParams.BlockGossip.BlockPartSizeBytes) } //------------------------------------------------------------------------ @@ -480,12 +126,12 @@ func (params ConsensusParamsInfo) Bytes() []byte { // file. // // Used during replay and in tests. -func MakeGenesisStateFromFile(db dbm.DB, genDocFile string) (*State, error) { +func MakeGenesisStateFromFile(genDocFile string) (State, error) { genDoc, err := MakeGenesisDocFromFile(genDocFile) if err != nil { - return nil, err + return State{}, err } - return MakeGenesisState(db, genDoc) + return MakeGenesisState(genDoc) } // MakeGenesisDocFromFile reads and unmarshals genesis doc from the given file. @@ -502,10 +148,10 @@ func MakeGenesisDocFromFile(genDocFile string) (*types.GenesisDoc, error) { } // MakeGenesisState creates state from types.GenesisDoc. -func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) (*State, error) { +func MakeGenesisState(genDoc *types.GenesisDoc) (State, error) { err := genDoc.ValidateAndComplete() if err != nil { - return nil, fmt.Errorf("Error in genesis file: %v", err) + return State{}, fmt.Errorf("Error in genesis file: %v", err) } // Make validators slice @@ -522,8 +168,7 @@ func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) (*State, error) { } } - return &State{ - db: db, + return State{ ChainID: genDoc.ChainID, diff --git a/state/state_test.go b/state/state_test.go index b1adc0d02..61b3167b3 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -14,19 +14,17 @@ import ( cmn "github.com/tendermint/tmlibs/common" dbm "github.com/tendermint/tmlibs/db" - "github.com/tendermint/tmlibs/log" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/types" ) // setupTestCase does setup common to all test cases -func setupTestCase(t *testing.T) (func(t *testing.T), dbm.DB, *State) { +func setupTestCase(t *testing.T) (func(t *testing.T), dbm.DB, State) { config := cfg.ResetTestRoot("state_") stateDB := dbm.NewDB("state", config.DBBackend, config.DBDir()) - state, err := GetState(stateDB, config.GenesisFile()) - assert.NoError(t, err, "expected no error on GetState") - state.SetLogger(log.TestingLogger()) + state, err := LoadStateFromDBOrGenesisFile(stateDB, config.GenesisFile()) + assert.NoError(t, err, "expected no error on LoadStateFromDBOrGenesisFile") tearDown := func(t *testing.T) {} @@ -59,7 +57,7 @@ func TestStateSaveLoad(t *testing.T) { assert := assert.New(t) state.LastBlockHeight++ - state.Save() + SaveState(stateDB, state) loadedState := LoadState(stateDB) assert.True(state.Equals(loadedState), @@ -69,7 +67,7 @@ func TestStateSaveLoad(t *testing.T) { // TestABCIResponsesSaveLoad tests saving and loading ABCIResponses. func TestABCIResponsesSaveLoad1(t *testing.T) { - tearDown, _, state := setupTestCase(t) + tearDown, stateDB, state := setupTestCase(t) defer tearDown(t) // nolint: vetshadow assert := assert.New(t) @@ -88,8 +86,8 @@ func TestABCIResponsesSaveLoad1(t *testing.T) { }, }} - state.SaveABCIResponses(block.Height, abciResponses) - loadedAbciResponses, err := state.LoadABCIResponses(block.Height) + saveABCIResponses(stateDB, block.Height, abciResponses) + loadedAbciResponses, err := LoadABCIResponses(stateDB, block.Height) assert.Nil(err) assert.Equal(abciResponses, loadedAbciResponses, cmn.Fmt(`ABCIResponses don't match: Got %v, Expected %v`, loadedAbciResponses, @@ -98,7 +96,7 @@ func TestABCIResponsesSaveLoad1(t *testing.T) { // TestResultsSaveLoad tests saving and loading abci results. func TestABCIResponsesSaveLoad2(t *testing.T) { - tearDown, _, state := setupTestCase(t) + tearDown, stateDB, _ := setupTestCase(t) defer tearDown(t) // nolint: vetshadow assert := assert.New(t) @@ -142,7 +140,7 @@ func TestABCIResponsesSaveLoad2(t *testing.T) { // query all before, should return error for i := range cases { h := int64(i + 1) - res, err := state.LoadABCIResponses(h) + res, err := LoadABCIResponses(stateDB, h) assert.Error(err, "%d: %#v", i, res) } @@ -153,13 +151,13 @@ func TestABCIResponsesSaveLoad2(t *testing.T) { DeliverTx: tc.added, EndBlock: &abci.ResponseEndBlock{}, } - state.SaveABCIResponses(h, responses) + saveABCIResponses(stateDB, h, responses) } // query all before, should return expected value for i, tc := range cases { h := int64(i + 1) - res, err := state.LoadABCIResponses(h) + res, err := LoadABCIResponses(stateDB, h) assert.NoError(err, "%d", i) assert.Equal(tc.expected.Hash(), res.ResultsHash(), "%d", i) } @@ -167,54 +165,57 @@ func TestABCIResponsesSaveLoad2(t *testing.T) { // TestValidatorSimpleSaveLoad tests saving and loading validators. func TestValidatorSimpleSaveLoad(t *testing.T) { - tearDown, _, state := setupTestCase(t) + tearDown, stateDB, state := setupTestCase(t) defer tearDown(t) // nolint: vetshadow assert := assert.New(t) // can't load anything for height 0 - v, err := state.LoadValidators(0) + v, err := LoadValidators(stateDB, 0) assert.IsType(ErrNoValSetForHeight{}, err, "expected err at height 0") // should be able to load for height 1 - v, err = state.LoadValidators(1) + v, err = LoadValidators(stateDB, 1) assert.Nil(err, "expected no err at height 1") assert.Equal(v.Hash(), state.Validators.Hash(), "expected validator hashes to match") // increment height, save; should be able to load for next height state.LastBlockHeight++ - state.saveValidatorsInfo() - v, err = state.LoadValidators(state.LastBlockHeight + 1) + nextHeight := state.LastBlockHeight + 1 + saveValidatorsInfo(stateDB, nextHeight, state.LastHeightValidatorsChanged, state.Validators) + v, err = LoadValidators(stateDB, nextHeight) assert.Nil(err, "expected no err") assert.Equal(v.Hash(), state.Validators.Hash(), "expected validator hashes to match") // increment height, save; should be able to load for next height state.LastBlockHeight += 10 - state.saveValidatorsInfo() - v, err = state.LoadValidators(state.LastBlockHeight + 1) + nextHeight = state.LastBlockHeight + 1 + saveValidatorsInfo(stateDB, nextHeight, state.LastHeightValidatorsChanged, state.Validators) + v, err = LoadValidators(stateDB, nextHeight) assert.Nil(err, "expected no err") assert.Equal(v.Hash(), state.Validators.Hash(), "expected validator hashes to match") // should be able to load for next next height - _, err = state.LoadValidators(state.LastBlockHeight + 2) + _, err = LoadValidators(stateDB, state.LastBlockHeight+2) assert.IsType(ErrNoValSetForHeight{}, err, "expected err at unknown height") } // TestValidatorChangesSaveLoad tests saving and loading a validator set with changes. func TestOneValidatorChangesSaveLoad(t *testing.T) { - tearDown, _, state := setupTestCase(t) + tearDown, stateDB, state := setupTestCase(t) defer tearDown(t) // change vals at these heights changeHeights := []int64{1, 2, 4, 5, 10, 15, 16, 17, 20} N := len(changeHeights) - // build the validator history by running SetBlockAndValidators + // build the validator history by running updateState // with the right validator set for each height highestHeight := changeHeights[N-1] + 5 changeIndex := 0 _, val := state.Validators.GetByIndex(0) power := val.VotingPower + var err error for i := int64(1); i < highestHeight; i++ { // when we get to a change height, // use the next pubkey @@ -222,10 +223,11 @@ func TestOneValidatorChangesSaveLoad(t *testing.T) { changeIndex++ power += 1 } - header, parts, responses := makeHeaderPartsResponsesValPowerChange(state, i, power) - err := state.SetBlockAndValidators(header, parts, responses) + header, blockID, responses := makeHeaderPartsResponsesValPowerChange(state, i, power) + state, err = updateState(state, blockID, header, responses) assert.Nil(t, err) - state.saveValidatorsInfo() + nextHeight := state.LastBlockHeight + 1 + saveValidatorsInfo(stateDB, nextHeight, state.LastHeightValidatorsChanged, state.Validators) } // on each change height, increment the power by one. @@ -243,7 +245,7 @@ func TestOneValidatorChangesSaveLoad(t *testing.T) { } for i, power := range testCases { - v, err := state.LoadValidators(int64(i + 1)) + v, err := LoadValidators(stateDB, int64(i+1)) assert.Nil(t, err, fmt.Sprintf("expected no err at height %d", i)) assert.Equal(t, v.Size(), 1, "validator set size is greater than 1: %d", v.Size()) _, val := v.GetByIndex(0) @@ -257,20 +259,22 @@ func TestOneValidatorChangesSaveLoad(t *testing.T) { // changes. func TestManyValidatorChangesSaveLoad(t *testing.T) { const valSetSize = 7 - tearDown, _, state := setupTestCase(t) + tearDown, stateDB, state := setupTestCase(t) state.Validators = genValSet(valSetSize) - state.Save() + SaveState(stateDB, state) defer tearDown(t) const height = 1 pubkey := crypto.GenPrivKeyEd25519().PubKey() // swap the first validator with a new one ^^^ (validator set size stays the same) - header, parts, responses := makeHeaderPartsResponsesValPubKeyChange(state, height, pubkey) - err := state.SetBlockAndValidators(header, parts, responses) + header, blockID, responses := makeHeaderPartsResponsesValPubKeyChange(state, height, pubkey) + var err error + state, err = updateState(state, blockID, header, responses) require.Nil(t, err) - state.saveValidatorsInfo() + nextHeight := state.LastBlockHeight + 1 + saveValidatorsInfo(stateDB, nextHeight, state.LastHeightValidatorsChanged, state.Validators) - v, err := state.LoadValidators(height + 1) + v, err := LoadValidators(stateDB, height+1) assert.Nil(t, err) assert.Equal(t, valSetSize, v.Size()) @@ -292,7 +296,7 @@ func genValSet(size int) *types.ValidatorSet { // TestConsensusParamsChangesSaveLoad tests saving and loading consensus params // with changes. func TestConsensusParamsChangesSaveLoad(t *testing.T) { - tearDown, _, state := setupTestCase(t) + tearDown, stateDB, state := setupTestCase(t) defer tearDown(t) // change vals at these heights @@ -308,11 +312,12 @@ func TestConsensusParamsChangesSaveLoad(t *testing.T) { params[i].BlockSize.MaxBytes += i } - // build the params history by running SetBlockAndValidators + // build the params history by running updateState // with the right params set for each height highestHeight := changeHeights[N-1] + 5 changeIndex := 0 cp := params[changeIndex] + var err error for i := int64(1); i < highestHeight; i++ { // when we get to a change height, // use the next params @@ -320,10 +325,12 @@ func TestConsensusParamsChangesSaveLoad(t *testing.T) { changeIndex++ cp = params[changeIndex] } - header, parts, responses := makeHeaderPartsResponsesParams(state, i, cp) - err := state.SetBlockAndValidators(header, parts, responses) + header, blockID, responses := makeHeaderPartsResponsesParams(state, i, cp) + state, err = updateState(state, blockID, header, responses) + require.Nil(t, err) - state.saveConsensusParamsInfo() + nextHeight := state.LastBlockHeight + 1 + saveConsensusParamsInfo(stateDB, nextHeight, state.LastHeightConsensusParamsChanged, state.ConsensusParams) } // make all the test cases by using the same params until after the change @@ -341,7 +348,7 @@ func TestConsensusParamsChangesSaveLoad(t *testing.T) { } for _, testCase := range testCases { - p, err := state.LoadConsensusParams(testCase.height) + p, err := LoadConsensusParams(stateDB, testCase.height) assert.Nil(t, err, fmt.Sprintf("expected no err at height %d", testCase.height)) assert.Equal(t, testCase.params, p, fmt.Sprintf(`unexpected consensus params at height %d`, testCase.height)) @@ -416,15 +423,15 @@ func TestLessThanOneThirdOfVotingPowerPerBlockEnforced(t *testing.T) { } for i, tc := range testCases { - tearDown, _, state := setupTestCase(t) + tearDown, stateDB, state := setupTestCase(t) state.Validators = genValSet(tc.initialValSetSize) - state.Save() + SaveState(stateDB, state) height := state.LastBlockHeight + 1 block := makeBlock(state, height) abciResponses := &ABCIResponses{ EndBlock: &abci.ResponseEndBlock{ValidatorUpdates: tc.valUpdatesFn(state.Validators)}, } - err := state.SetBlockAndValidators(block.Header, types.PartSetHeader{}, abciResponses) + state, err := updateState(state, types.BlockID{block.Hash(), types.PartSetHeader{}}, block.Header, abciResponses) if tc.shouldErr { assert.Error(t, err, "#%d", i) } else { @@ -484,8 +491,8 @@ func TestApplyUpdates(t *testing.T) { } } -func makeHeaderPartsResponsesValPubKeyChange(state *State, height int64, - pubkey crypto.PubKey) (*types.Header, types.PartSetHeader, *ABCIResponses) { +func makeHeaderPartsResponsesValPubKeyChange(state State, height int64, + pubkey crypto.PubKey) (*types.Header, types.BlockID, *ABCIResponses) { block := makeBlock(state, height) abciResponses := &ABCIResponses{ @@ -503,11 +510,11 @@ func makeHeaderPartsResponsesValPubKeyChange(state *State, height int64, } } - return block.Header, types.PartSetHeader{}, abciResponses + return block.Header, types.BlockID{block.Hash(), types.PartSetHeader{}}, abciResponses } -func makeHeaderPartsResponsesValPowerChange(state *State, height int64, - power int64) (*types.Header, types.PartSetHeader, *ABCIResponses) { +func makeHeaderPartsResponsesValPowerChange(state State, height int64, + power int64) (*types.Header, types.BlockID, *ABCIResponses) { block := makeBlock(state, height) abciResponses := &ABCIResponses{ @@ -524,17 +531,17 @@ func makeHeaderPartsResponsesValPowerChange(state *State, height int64, } } - return block.Header, types.PartSetHeader{}, abciResponses + return block.Header, types.BlockID{block.Hash(), types.PartSetHeader{}}, abciResponses } -func makeHeaderPartsResponsesParams(state *State, height int64, - params types.ConsensusParams) (*types.Header, types.PartSetHeader, *ABCIResponses) { +func makeHeaderPartsResponsesParams(state State, height int64, + params types.ConsensusParams) (*types.Header, types.BlockID, *ABCIResponses) { block := makeBlock(state, height) abciResponses := &ABCIResponses{ EndBlock: &abci.ResponseEndBlock{ConsensusParamUpdates: types.TM2PB.ConsensusParams(¶ms)}, } - return block.Header, types.PartSetHeader{}, abciResponses + return block.Header, types.BlockID{block.Hash(), types.PartSetHeader{}}, abciResponses } type paramsChangeTestCase struct { @@ -542,13 +549,13 @@ type paramsChangeTestCase struct { params types.ConsensusParams } -func makeHeaderPartsResults(state *State, height int64, - results []*abci.ResponseDeliverTx) (*types.Header, types.PartSetHeader, *ABCIResponses) { +func makeHeaderPartsResults(state State, height int64, + results []*abci.ResponseDeliverTx) (*types.Header, types.BlockID, *ABCIResponses) { block := makeBlock(state, height) abciResponses := &ABCIResponses{ DeliverTx: results, EndBlock: &abci.ResponseEndBlock{}, } - return block.Header, types.PartSetHeader{}, abciResponses + return block.Header, types.BlockID{block.Hash(), types.PartSetHeader{}}, abciResponses } diff --git a/state/store.go b/state/store.go new file mode 100644 index 000000000..de2d4d67c --- /dev/null +++ b/state/store.go @@ -0,0 +1,282 @@ +package state + +import ( + "bytes" + "fmt" + + abci "github.com/tendermint/abci/types" + wire "github.com/tendermint/go-wire" + "github.com/tendermint/tendermint/types" + cmn "github.com/tendermint/tmlibs/common" + dbm "github.com/tendermint/tmlibs/db" +) + +//------------------------------------------------------------------------ + +func calcValidatorsKey(height int64) []byte { + return []byte(cmn.Fmt("validatorsKey:%v", height)) +} + +func calcConsensusParamsKey(height int64) []byte { + return []byte(cmn.Fmt("consensusParamsKey:%v", height)) +} + +func calcABCIResponsesKey(height int64) []byte { + return []byte(cmn.Fmt("abciResponsesKey:%v", height)) +} + +// LoadStateFromDBOrGenesisFile loads the most recent state from the database, +// or creates a new one from the given genesisFilePath and persists the result +// to the database. +func LoadStateFromDBOrGenesisFile(stateDB dbm.DB, genesisFilePath string) (State, error) { + state := LoadState(stateDB) + if state.IsEmpty() { + var err error + state, err = MakeGenesisStateFromFile(genesisFilePath) + if err != nil { + return state, err + } + SaveState(stateDB, state) + } + + return state, nil +} + +// LoadStateFromDBOrGenesisDoc loads the most recent state from the database, +// or creates a new one from the given genesisDoc and persists the result +// to the database. +func LoadStateFromDBOrGenesisDoc(stateDB dbm.DB, genesisDoc *types.GenesisDoc) (State, error) { + state := LoadState(stateDB) + if state.IsEmpty() { + var err error + state, err = MakeGenesisState(genesisDoc) + if err != nil { + return state, err + } + SaveState(stateDB, state) + } + + return state, nil +} + +// LoadState loads the State from the database. +func LoadState(db dbm.DB) State { + return loadState(db, stateKey) +} + +func loadState(db dbm.DB, key []byte) (state State) { + buf := db.Get(key) + if len(buf) == 0 { + return state + } + + r, n, err := bytes.NewReader(buf), new(int), new(error) + wire.ReadBinaryPtr(&state, r, 0, n, err) + if *err != nil { + // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED + cmn.Exit(cmn.Fmt(`LoadState: Data has been corrupted or its spec has changed: + %v\n`, *err)) + } + // TODO: ensure that buf is completely read. + + return state +} + +// SaveState persists the State, the ValidatorsInfo, and the ConsensusParamsInfo to the database. +func SaveState(db dbm.DB, s State) { + saveState(db, s, stateKey) +} + +func saveState(db dbm.DB, s State, key []byte) { + nextHeight := s.LastBlockHeight + 1 + saveValidatorsInfo(db, nextHeight, s.LastHeightValidatorsChanged, s.Validators) + saveConsensusParamsInfo(db, nextHeight, s.LastHeightConsensusParamsChanged, s.ConsensusParams) + db.SetSync(stateKey, s.Bytes()) +} + +//------------------------------------------------------------------------ + +// ABCIResponses retains the responses +// of the various ABCI calls during block processing. +// It is persisted to disk for each height before calling Commit. +type ABCIResponses struct { + DeliverTx []*abci.ResponseDeliverTx + EndBlock *abci.ResponseEndBlock +} + +// NewABCIResponses returns a new ABCIResponses +func NewABCIResponses(block *types.Block) *ABCIResponses { + return &ABCIResponses{ + DeliverTx: make([]*abci.ResponseDeliverTx, block.NumTxs), + } +} + +// Bytes serializes the ABCIResponse using go-wire +func (a *ABCIResponses) Bytes() []byte { + return wire.BinaryBytes(*a) +} + +func (a *ABCIResponses) ResultsHash() []byte { + results := types.NewResults(a.DeliverTx) + return results.Hash() +} + +// LoadABCIResponses loads the ABCIResponses for the given height from the database. +// This is useful for recovering from crashes where we called app.Commit and before we called +// s.Save(). It can also be used to produce Merkle proofs of the result of txs. +func LoadABCIResponses(db dbm.DB, height int64) (*ABCIResponses, error) { + buf := db.Get(calcABCIResponsesKey(height)) + if len(buf) == 0 { + return nil, ErrNoABCIResponsesForHeight{height} + } + + abciResponses := new(ABCIResponses) + r, n, err := bytes.NewReader(buf), new(int), new(error) + wire.ReadBinaryPtr(abciResponses, r, 0, n, err) + if *err != nil { + // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED + cmn.Exit(cmn.Fmt(`LoadABCIResponses: Data has been corrupted or its spec has + changed: %v\n`, *err)) + } + // TODO: ensure that buf is completely read. + + return abciResponses, nil +} + +// SaveABCIResponses persists the ABCIResponses to the database. +// This is useful in case we crash after app.Commit and before s.Save(). +// Responses are indexed by height so they can also be loaded later to produce Merkle proofs. +func saveABCIResponses(db dbm.DB, height int64, abciResponses *ABCIResponses) { + db.SetSync(calcABCIResponsesKey(height), abciResponses.Bytes()) +} + +//----------------------------------------------------------------------------- + +// ValidatorsInfo represents the latest validator set, or the last height it changed +type ValidatorsInfo struct { + ValidatorSet *types.ValidatorSet + LastHeightChanged int64 +} + +// Bytes serializes the ValidatorsInfo using go-wire +func (valInfo *ValidatorsInfo) Bytes() []byte { + return wire.BinaryBytes(*valInfo) +} + +// LoadValidators loads the ValidatorSet for a given height. +// Returns ErrNoValSetForHeight if the validator set can't be found for this height. +func LoadValidators(db dbm.DB, height int64) (*types.ValidatorSet, error) { + valInfo := loadValidatorsInfo(db, height) + if valInfo == nil { + return nil, ErrNoValSetForHeight{height} + } + + if valInfo.ValidatorSet == nil { + valInfo = loadValidatorsInfo(db, valInfo.LastHeightChanged) + if valInfo == nil { + cmn.PanicSanity(fmt.Sprintf(`Couldn't find validators at height %d as + last changed from height %d`, valInfo.LastHeightChanged, height)) + } + } + + return valInfo.ValidatorSet, nil +} + +func loadValidatorsInfo(db dbm.DB, height int64) *ValidatorsInfo { + buf := db.Get(calcValidatorsKey(height)) + if len(buf) == 0 { + return nil + } + + v := new(ValidatorsInfo) + r, n, err := bytes.NewReader(buf), new(int), new(error) + wire.ReadBinaryPtr(v, r, 0, n, err) + if *err != nil { + // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED + cmn.Exit(cmn.Fmt(`LoadValidators: Data has been corrupted or its spec has changed: + %v\n`, *err)) + } + // TODO: ensure that buf is completely read. + + return v +} + +// saveValidatorsInfo persists the validator set for the next block to disk. +// It should be called from s.Save(), right before the state itself is persisted. +// If the validator set did not change after processing the latest block, +// only the last height for which the validators changed is persisted. +func saveValidatorsInfo(db dbm.DB, nextHeight, changeHeight int64, valSet *types.ValidatorSet) { + valInfo := &ValidatorsInfo{ + LastHeightChanged: changeHeight, + } + if changeHeight == nextHeight { + valInfo.ValidatorSet = valSet + } + db.SetSync(calcValidatorsKey(nextHeight), valInfo.Bytes()) +} + +//----------------------------------------------------------------------------- + +// ConsensusParamsInfo represents the latest consensus params, or the last height it changed +type ConsensusParamsInfo struct { + ConsensusParams types.ConsensusParams + LastHeightChanged int64 +} + +// Bytes serializes the ConsensusParamsInfo using go-wire +func (params ConsensusParamsInfo) Bytes() []byte { + return wire.BinaryBytes(params) +} + +// LoadConsensusParams loads the ConsensusParams for a given height. +func LoadConsensusParams(db dbm.DB, height int64) (types.ConsensusParams, error) { + empty := types.ConsensusParams{} + + paramsInfo := loadConsensusParamsInfo(db, height) + if paramsInfo == nil { + return empty, ErrNoConsensusParamsForHeight{height} + } + + if paramsInfo.ConsensusParams == empty { + paramsInfo = loadConsensusParamsInfo(db, paramsInfo.LastHeightChanged) + if paramsInfo == nil { + cmn.PanicSanity(fmt.Sprintf(`Couldn't find consensus params at height %d as + last changed from height %d`, paramsInfo.LastHeightChanged, height)) + } + } + + return paramsInfo.ConsensusParams, nil +} + +func loadConsensusParamsInfo(db dbm.DB, height int64) *ConsensusParamsInfo { + buf := db.Get(calcConsensusParamsKey(height)) + if len(buf) == 0 { + return nil + } + + paramsInfo := new(ConsensusParamsInfo) + r, n, err := bytes.NewReader(buf), new(int), new(error) + wire.ReadBinaryPtr(paramsInfo, r, 0, n, err) + if *err != nil { + // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED + cmn.Exit(cmn.Fmt(`LoadConsensusParams: Data has been corrupted or its spec has changed: + %v\n`, *err)) + } + // TODO: ensure that buf is completely read. + + return paramsInfo +} + +// saveConsensusParamsInfo persists the consensus params for the next block to disk. +// It should be called from s.Save(), right before the state itself is persisted. +// If the consensus params did not change after processing the latest block, +// only the last height for which they changed is persisted. +func saveConsensusParamsInfo(db dbm.DB, nextHeight, changeHeight int64, params types.ConsensusParams) { + paramsInfo := &ConsensusParamsInfo{ + LastHeightChanged: changeHeight, + } + if changeHeight == nextHeight { + paramsInfo.ConsensusParams = params + } + db.SetSync(calcConsensusParamsKey(nextHeight), paramsInfo.Bytes()) +} diff --git a/state/validation.go b/state/validation.go new file mode 100644 index 000000000..fb3e8d13d --- /dev/null +++ b/state/validation.go @@ -0,0 +1,122 @@ +package state + +import ( + "bytes" + "errors" + "fmt" + + "github.com/tendermint/tendermint/types" + dbm "github.com/tendermint/tmlibs/db" +) + +//----------------------------------------------------- +// Validate block + +func validateBlock(stateDB dbm.DB, s State, b *types.Block) error { + // validate internal consistency + if err := b.ValidateBasic(); err != nil { + return err + } + + // validate basic info + if b.ChainID != s.ChainID { + return fmt.Errorf("Wrong Block.Header.ChainID. Expected %v, got %v", s.ChainID, b.ChainID) + } + if b.Height != s.LastBlockHeight+1 { + return fmt.Errorf("Wrong Block.Header.Height. Expected %v, got %v", s.LastBlockHeight+1, b.Height) + } + /* TODO: Determine bounds for Time + See blockchain/reactor "stopSyncingDurationMinutes" + + if !b.Time.After(lastBlockTime) { + return errors.New("Invalid Block.Header.Time") + } + */ + + // validate prev block info + if !b.LastBlockID.Equals(s.LastBlockID) { + return fmt.Errorf("Wrong Block.Header.LastBlockID. Expected %v, got %v", s.LastBlockID, b.LastBlockID) + } + newTxs := int64(len(b.Data.Txs)) + if b.TotalTxs != s.LastBlockTotalTx+newTxs { + return fmt.Errorf("Wrong Block.Header.TotalTxs. Expected %v, got %v", s.LastBlockTotalTx+newTxs, b.TotalTxs) + } + + // validate app info + if !bytes.Equal(b.AppHash, s.AppHash) { + return fmt.Errorf("Wrong Block.Header.AppHash. Expected %X, got %v", s.AppHash, b.AppHash) + } + if !bytes.Equal(b.ConsensusHash, s.ConsensusParams.Hash()) { + return fmt.Errorf("Wrong Block.Header.ConsensusHash. Expected %X, got %v", s.ConsensusParams.Hash(), b.ConsensusHash) + } + if !bytes.Equal(b.LastResultsHash, s.LastResultsHash) { + return fmt.Errorf("Wrong Block.Header.LastResultsHash. Expected %X, got %v", s.LastResultsHash, b.LastResultsHash) + } + if !bytes.Equal(b.ValidatorsHash, s.Validators.Hash()) { + return fmt.Errorf("Wrong Block.Header.ValidatorsHash. Expected %X, got %v", s.Validators.Hash(), b.ValidatorsHash) + } + + // Validate block LastCommit. + if b.Height == 1 { + if len(b.LastCommit.Precommits) != 0 { + return errors.New("Block at height 1 (first block) should have no LastCommit precommits") + } + } else { + if len(b.LastCommit.Precommits) != s.LastValidators.Size() { + return fmt.Errorf("Invalid block commit size. Expected %v, got %v", + s.LastValidators.Size(), len(b.LastCommit.Precommits)) + } + err := s.LastValidators.VerifyCommit( + s.ChainID, s.LastBlockID, b.Height-1, b.LastCommit) + if err != nil { + return err + } + } + + for _, ev := range b.Evidence.Evidence { + if err := VerifyEvidence(stateDB, s, ev); err != nil { + return types.NewEvidenceInvalidErr(ev, err) + } + } + + return nil +} + +// XXX: What's cheaper (ie. what should be checked first): +// evidence internal validity (ie. sig checks) or validator existed (fetch historical val set from db) + +// VerifyEvidence verifies the evidence fully by checking it is internally +// consistent and sufficiently recent. +func VerifyEvidence(stateDB dbm.DB, s State, evidence types.Evidence) error { + height := s.LastBlockHeight + + evidenceAge := height - evidence.Height() + maxAge := s.ConsensusParams.EvidenceParams.MaxAge + if evidenceAge > maxAge { + return fmt.Errorf("Evidence from height %d is too old. Min height is %d", + evidence.Height(), height-maxAge) + } + + if err := evidence.Verify(s.ChainID); err != nil { + return err + } + + valset, err := LoadValidators(stateDB, evidence.Height()) + if err != nil { + // TODO: if err is just that we cant find it cuz we pruned, ignore. + // TODO: if its actually bad evidence, punish peer + return err + } + + // The address must have been an active validator at the height + ev := evidence + height, addr, idx := ev.Height(), ev.Address(), ev.Index() + valIdx, val := valset.GetByAddress(addr) + if val == nil { + return fmt.Errorf("Address %X was not a validator at height %d", addr, height) + } else if idx != valIdx { + return fmt.Errorf("Address %X was validator %d at height %d, not %d", addr, valIdx, height, idx) + } + + return nil +} diff --git a/state/validation_test.go b/state/validation_test.go new file mode 100644 index 000000000..e0b7fe9ee --- /dev/null +++ b/state/validation_test.go @@ -0,0 +1,68 @@ +package state + +import ( + "testing" + + "github.com/stretchr/testify/require" + dbm "github.com/tendermint/tmlibs/db" + "github.com/tendermint/tmlibs/log" +) + +func TestValidateBlock(t *testing.T) { + state := state() + + blockExec := NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), nil, nil, nil) + + // proper block must pass + block := makeBlock(state, 1) + err := blockExec.ValidateBlock(state, block) + require.NoError(t, err) + + // wrong chain fails + block = makeBlock(state, 1) + block.ChainID = "not-the-real-one" + err = blockExec.ValidateBlock(state, block) + require.Error(t, err) + + // wrong height fails + block = makeBlock(state, 1) + block.Height += 10 + err = blockExec.ValidateBlock(state, block) + require.Error(t, err) + + // wrong total tx fails + block = makeBlock(state, 1) + block.TotalTxs += 10 + err = blockExec.ValidateBlock(state, block) + require.Error(t, err) + + // wrong blockid fails + block = makeBlock(state, 1) + block.LastBlockID.PartsHeader.Total += 10 + err = blockExec.ValidateBlock(state, block) + require.Error(t, err) + + // wrong app hash fails + block = makeBlock(state, 1) + block.AppHash = []byte("wrong app hash") + err = blockExec.ValidateBlock(state, block) + require.Error(t, err) + + // wrong consensus hash fails + block = makeBlock(state, 1) + block.ConsensusHash = []byte("wrong consensus hash") + err = blockExec.ValidateBlock(state, block) + require.Error(t, err) + + // wrong results hash fails + block = makeBlock(state, 1) + block.LastResultsHash = []byte("wrong results hash") + err = blockExec.ValidateBlock(state, block) + require.Error(t, err) + + // wrong validators hash fails + block = makeBlock(state, 1) + block.ValidatorsHash = []byte("wrong validators hash") + err = blockExec.ValidateBlock(state, block) + require.Error(t, err) +} diff --git a/types/events.go b/types/events.go index 5c41c6df6..d6f7b012c 100644 --- a/types/events.go +++ b/types/events.go @@ -175,6 +175,13 @@ func QueryForEvent(eventType string) tmpubsub.Query { return tmquery.MustParse(fmt.Sprintf("%s='%s'", EventTypeKey, eventType)) } +// BlockEventPublisher publishes all block related events +type BlockEventPublisher interface { + PublishEventNewBlock(block EventDataNewBlock) error + PublishEventNewBlockHeader(header EventDataNewBlockHeader) error + PublishEventTx(EventDataTx) error +} + type TxEventPublisher interface { PublishEventTx(EventDataTx) error } diff --git a/types/services.go b/types/services.go index 787b1b99e..6900fae7d 100644 --- a/types/services.go +++ b/types/services.go @@ -70,15 +70,6 @@ type BlockStore interface { SaveBlock(block *Block, blockParts *PartSet, seenCommit *Commit) } -//------------------------------------------------------ -// state - -// State defines the stateful interface used to verify evidence. -// UNSTABLE -type State interface { - VerifyEvidence(Evidence) (priority int64, err error) -} - //------------------------------------------------------ // evidence pool @@ -87,7 +78,7 @@ type State interface { type EvidencePool interface { PendingEvidence() []Evidence AddEvidence(Evidence) error - MarkEvidenceAsCommitted([]Evidence) + Update(*Block) } // MockMempool is an empty implementation of a Mempool, useful for testing. @@ -95,6 +86,6 @@ type EvidencePool interface { type MockEvidencePool struct { } -func (m MockEvidencePool) PendingEvidence() []Evidence { return nil } -func (m MockEvidencePool) AddEvidence(Evidence) error { return nil } -func (m MockEvidencePool) MarkEvidenceAsCommitted([]Evidence) {} +func (m MockEvidencePool) PendingEvidence() []Evidence { return nil } +func (m MockEvidencePool) AddEvidence(Evidence) error { return nil } +func (m MockEvidencePool) Update(*Block) {}