From a456b71f1fb4a55223fc6ed6d6e37091e73fdea3 Mon Sep 17 00:00:00 2001 From: JayT106 Date: Thu, 10 Jun 2021 10:24:47 -0400 Subject: [PATCH] state: move pruneBlocks from consensus/state to state/execution (#6541) state: move pruneBlocks function from consensus/state to state/execution Closes #5414 --- CHANGELOG_PENDING.md | 1 + internal/blockchain/v0/reactor.go | 2 +- internal/blockchain/v0/reactor_test.go | 3 +- internal/blockchain/v2/processor_context.go | 2 +- internal/blockchain/v2/reactor.go | 2 +- internal/blockchain/v2/reactor_test.go | 18 +++--- internal/consensus/byzantine_test.go | 2 +- internal/consensus/common_test.go | 2 +- internal/consensus/reactor_test.go | 2 +- internal/consensus/replay.go | 6 +- internal/consensus/replay_file.go | 2 +- internal/consensus/replay_test.go | 30 +++++---- internal/consensus/state.go | 34 +--------- internal/consensus/wal_generator.go | 2 +- node/node.go | 1 + node/node_test.go | 5 ++ state/execution.go | 70 +++++++++++++++------ state/execution_test.go | 24 ++++--- state/helpers_test.go | 2 +- state/validation_test.go | 8 +++ 20 files changed, 123 insertions(+), 95 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 2ce47eb6b..616eb8297 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -64,6 +64,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi - Data Storage - [store/state/evidence/light] \#5771 Use an order-preserving varint key encoding (@cmwaters) - [mempool] \#6396 Remove mempool's write ahead log (WAL), (previously unused by the tendermint code). (@tychoish) + - [state] \#6541 Move pruneBlocks from consensus/state to state/execution. (@JayT106) - Tooling - [tools] \#6498 Set OS home dir to instead of the hardcoded PATH. (@JayT106) diff --git a/internal/blockchain/v0/reactor.go b/internal/blockchain/v0/reactor.go index 014f39f83..aa8208914 100644 --- a/internal/blockchain/v0/reactor.go +++ b/internal/blockchain/v0/reactor.go @@ -554,7 +554,7 @@ FOR_LOOP: // TODO: Same thing for app - but we would need a way to get the hash // without persisting the state. - state, _, err = r.blockExec.ApplyBlock(state, firstID, first) + state, err = r.blockExec.ApplyBlock(state, firstID, first) if err != nil { // TODO: This is bad, are we zombie? panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) diff --git a/internal/blockchain/v0/reactor_test.go b/internal/blockchain/v0/reactor_test.go index 4add435a3..ca14b30bd 100644 --- a/internal/blockchain/v0/reactor_test.go +++ b/internal/blockchain/v0/reactor_test.go @@ -113,6 +113,7 @@ func (rts *reactorTestSuite) addNode(t *testing.T, rts.app[nodeID].Consensus(), mock.Mempool{}, sm.EmptyEvidencePool{}, + blockStore, ) for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ { @@ -143,7 +144,7 @@ func (rts *reactorTestSuite) addNode(t *testing.T, thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes) blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()} - state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock) + state, err = blockExec.ApplyBlock(state, blockID, thisBlock) require.NoError(t, err) blockStore.SaveBlock(thisBlock, thisParts, lastCommit) diff --git a/internal/blockchain/v2/processor_context.go b/internal/blockchain/v2/processor_context.go index 6a0466550..7385bcc6e 100644 --- a/internal/blockchain/v2/processor_context.go +++ b/internal/blockchain/v2/processor_context.go @@ -30,7 +30,7 @@ func newProcessorContext(st blockStore, ex blockApplier, s state.State) *pContex } func (pc *pContext) applyBlock(blockID types.BlockID, block *types.Block) error { - newState, _, err := pc.applier.ApplyBlock(pc.state, blockID, block) + newState, err := pc.applier.ApplyBlock(pc.state, blockID, block) pc.state = newState return err } diff --git a/internal/blockchain/v2/reactor.go b/internal/blockchain/v2/reactor.go index 5ce08c7d0..50c9fa565 100644 --- a/internal/blockchain/v2/reactor.go +++ b/internal/blockchain/v2/reactor.go @@ -50,7 +50,7 @@ type BlockchainReactor struct { } type blockApplier interface { - ApplyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, int64, error) + ApplyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, error) } // XXX: unify naming in this package around tmState diff --git a/internal/blockchain/v2/reactor_test.go b/internal/blockchain/v2/reactor_test.go index 3e3755ef5..69ba1ed19 100644 --- a/internal/blockchain/v2/reactor_test.go +++ b/internal/blockchain/v2/reactor_test.go @@ -25,7 +25,7 @@ import ( bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" - "github.com/tendermint/tendermint/store" + tmstore "github.com/tendermint/tendermint/store" "github.com/tendermint/tendermint/types" ) @@ -84,9 +84,9 @@ type mockBlockApplier struct { // XXX: Add whitelist/blacklist? func (mba *mockBlockApplier) ApplyBlock( state sm.State, blockID types.BlockID, block *types.Block, -) (sm.State, int64, error) { +) (sm.State, error) { state.LastBlockHeight++ - return state, 0, nil + return state, nil } type mockSwitchIo struct { @@ -167,7 +167,9 @@ func newTestReactor(t *testing.T, p testReactorParams) *BlockchainReactor { require.NoError(t, err) db := dbm.NewMemDB() stateStore := sm.NewStore(db) - appl = sm.NewBlockExecutor(stateStore, p.logger, proxyApp.Consensus(), mock.Mempool{}, sm.EmptyEvidencePool{}) + blockStore := tmstore.NewBlockStore(dbm.NewMemDB()) + appl = sm.NewBlockExecutor( + stateStore, p.logger, proxyApp.Consensus(), mock.Mempool{}, sm.EmptyEvidencePool{}, blockStore) err = stateStore.Save(state) require.NoError(t, err) } @@ -488,7 +490,7 @@ func newReactorStore( t *testing.T, genDoc *types.GenesisDoc, privVals []types.PrivValidator, - maxBlockHeight int64) (*store.BlockStore, sm.State, *sm.BlockExecutor) { + maxBlockHeight int64) (*tmstore.BlockStore, sm.State, *sm.BlockExecutor) { t.Helper() require.Len(t, privVals, 1) @@ -501,13 +503,13 @@ func newReactorStore( } stateDB := dbm.NewMemDB() - blockStore := store.NewBlockStore(dbm.NewMemDB()) + blockStore := tmstore.NewBlockStore(dbm.NewMemDB()) stateStore := sm.NewStore(stateDB) state, err := sm.MakeGenesisState(genDoc) require.NoError(t, err) blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), - mock.Mempool{}, sm.EmptyEvidencePool{}) + mock.Mempool{}, sm.EmptyEvidencePool{}, blockStore) err = stateStore.Save(state) require.NoError(t, err) @@ -534,7 +536,7 @@ func newReactorStore( thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes) blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()} - state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock) + state, err = blockExec.ApplyBlock(state, blockID, thisBlock) require.NoError(t, err) blockStore.SaveBlock(thisBlock, thisParts, lastCommit) diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go index 2cabc2eb5..7522dda7f 100644 --- a/internal/consensus/byzantine_test.go +++ b/internal/consensus/byzantine_test.go @@ -78,7 +78,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { require.NoError(t, err) // Make State - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool) + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore) cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) cs.SetLogger(cs.Logger) // set private validator diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go index 073b73596..af8c3ca27 100644 --- a/internal/consensus/common_test.go +++ b/internal/consensus/common_test.go @@ -416,7 +416,7 @@ func newStateWithConfigAndBlockStore( panic(err) } - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool) + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore) cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) cs.SetLogger(log.TestingLogger().With("module", "consensus")) cs.SetPrivValidator(pv) diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index a83d6aefa..fcd2ae2a2 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -335,7 +335,7 @@ func TestReactorWithEvidence(t *testing.T) { evpool2 := sm.EmptyEvidencePool{} - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool) + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore) cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2) cs.SetLogger(log.TestingLogger().With("module", "consensus")) cs.SetPrivValidator(pv) diff --git a/internal/consensus/replay.go b/internal/consensus/replay.go index 03dc90e5a..9b22f4631 100644 --- a/internal/consensus/replay.go +++ b/internal/consensus/replay.go @@ -473,7 +473,7 @@ func (h *Handshaker) replayBlocks( // We emit events for the index services at the final block due to the sync issue when // the node shutdown during the block committing status. blockExec := sm.NewBlockExecutor( - h.stateStore, h.logger, proxyApp.Consensus(), emptyMempool{}, sm.EmptyEvidencePool{}) + h.stateStore, h.logger, proxyApp.Consensus(), emptyMempool{}, sm.EmptyEvidencePool{}, h.store) blockExec.SetEventBus(h.eventBus) appHash, err = sm.ExecCommitBlock( blockExec, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state) @@ -511,11 +511,11 @@ func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.Ap // Use stubs for both mempool and evidence pool since no transactions nor // evidence are needed here - block already exists. - blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}) + blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}, h.store) blockExec.SetEventBus(h.eventBus) var err error - state, _, err = blockExec.ApplyBlock(state, meta.BlockID, block) + state, err = blockExec.ApplyBlock(state, meta.BlockID, block) if err != nil { return sm.State{}, err } diff --git a/internal/consensus/replay_file.go b/internal/consensus/replay_file.go index 2244d868e..ed4007048 100644 --- a/internal/consensus/replay_file.go +++ b/internal/consensus/replay_file.go @@ -328,7 +328,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo } mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{} - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore) consensusState := NewState(csConfig, state.Copy(), blockExec, blockStore, mempool, evpool) diff --git a/internal/consensus/replay_test.go b/internal/consensus/replay_test.go index ca4ec7e98..f4a1c29bf 100644 --- a/internal/consensus/replay_test.go +++ b/internal/consensus/replay_test.go @@ -731,7 +731,7 @@ func testHandshakeReplay(t *testing.T, sim *simulatorTestSuite, nBlocks int, mod state := genesisState.Copy() // run the chain through state.ApplyBlock to build up the tendermint state - state = buildTMStateFromChain(config, sim.Mempool, sim.Evpool, stateStore, state, chain, nBlocks, mode) + state = buildTMStateFromChain(config, sim.Mempool, sim.Evpool, stateStore, state, chain, nBlocks, mode, store) latestAppHash := state.AppHash // make a new client creator @@ -748,7 +748,7 @@ func testHandshakeReplay(t *testing.T, sim *simulatorTestSuite, nBlocks int, mod stateStore := sm.NewStore(stateDB1) err := stateStore.Save(genesisState) require.NoError(t, err) - buildAppStateFromChain(proxyApp, stateStore, sim.Mempool, sim.Evpool, genesisState, chain, nBlocks, mode) + buildAppStateFromChain(proxyApp, stateStore, sim.Mempool, sim.Evpool, genesisState, chain, nBlocks, mode, store) } // Prune block store if requested @@ -813,12 +813,13 @@ func applyBlock(stateStore sm.Store, evpool sm.EvidencePool, st sm.State, blk *types.Block, - proxyApp proxy.AppConns) sm.State { + proxyApp proxy.AppConns, + blockStore *mockBlockStore) sm.State { testPartSize := types.BlockPartSizeBytes - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore) blkID := types.BlockID{Hash: blk.Hash(), PartSetHeader: blk.MakePartSet(testPartSize).Header()} - newState, _, err := blockExec.ApplyBlock(st, blkID, blk) + newState, err := blockExec.ApplyBlock(st, blkID, blk) if err != nil { panic(err) } @@ -832,7 +833,9 @@ func buildAppStateFromChain( evpool sm.EvidencePool, state sm.State, chain []*types.Block, - nBlocks int, mode uint) { + nBlocks int, + mode uint, + blockStore *mockBlockStore) { // start a new app without handshake, play nBlocks blocks if err := proxyApp.Start(); err != nil { panic(err) @@ -853,18 +856,18 @@ func buildAppStateFromChain( case 0: for i := 0; i < nBlocks; i++ { block := chain[i] - state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp) + state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp, blockStore) } case 1, 2, 3: for i := 0; i < nBlocks-1; i++ { block := chain[i] - state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp) + state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp, blockStore) } if mode == 2 || mode == 3 { // update the kvstore height and apphash // as if we ran commit but not - state = applyBlock(stateStore, mempool, evpool, state, chain[nBlocks-1], proxyApp) + state = applyBlock(stateStore, mempool, evpool, state, chain[nBlocks-1], proxyApp, blockStore) } default: panic(fmt.Sprintf("unknown mode %v", mode)) @@ -880,7 +883,8 @@ func buildTMStateFromChain( state sm.State, chain []*types.Block, nBlocks int, - mode uint) sm.State { + mode uint, + blockStore *mockBlockStore) sm.State { // run the whole chain against this client to build up the tendermint state kvstoreApp := kvstore.NewPersistentKVStoreApplication( filepath.Join(config.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode))) @@ -907,19 +911,19 @@ func buildTMStateFromChain( case 0: // sync right up for _, block := range chain { - state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp) + state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp, blockStore) } case 1, 2, 3: // sync up to the penultimate as if we stored the block. // whether we commit or not depends on the appHash for _, block := range chain[:len(chain)-1] { - state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp) + state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp, blockStore) } // apply the final block to a state copy so we can // get the right next appHash but keep the state back - applyBlock(stateStore, mempool, evpool, state, chain[len(chain)-1], proxyApp) + applyBlock(stateStore, mempool, evpool, state, chain[len(chain)-1], proxyApp, blockStore) default: panic(fmt.Sprintf("unknown mode %v", mode)) } diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 560be4d30..e117d7ddb 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -1655,12 +1655,7 @@ func (cs *State) finalizeCommit(height int64) { // Execute and commit the block, update and save the state, and update the mempool. // NOTE The block.AppHash wont reflect these txs until the next block. - var ( - err error - retainHeight int64 - ) - - stateCopy, retainHeight, err = cs.blockExec.ApplyBlock( + stateCopy, err := cs.blockExec.ApplyBlock( stateCopy, types.BlockID{ Hash: block.Hash(), @@ -1675,16 +1670,6 @@ func (cs *State) finalizeCommit(height int64) { fail.Fail() // XXX - // Prune old heights, if requested by ABCI app. - if retainHeight > 0 { - pruned, err := cs.pruneBlocks(retainHeight) - if err != nil { - logger.Error("failed to prune blocks", "retain_height", retainHeight, "err", err) - } else { - logger.Debug("pruned blocks", "pruned", pruned, "retain_height", retainHeight) - } - } - // must be called before we update state cs.recordMetrics(height, block) @@ -1708,23 +1693,6 @@ func (cs *State) finalizeCommit(height int64) { // * cs.StartTime is set to when we will start round0. } -func (cs *State) pruneBlocks(retainHeight int64) (uint64, error) { - base := cs.blockStore.Base() - if retainHeight <= base { - return 0, nil - } - pruned, err := cs.blockStore.PruneBlocks(retainHeight) - if err != nil { - return 0, fmt.Errorf("failed to prune block store: %w", err) - } - - err = cs.blockExec.Store().PruneStates(retainHeight) - if err != nil { - return 0, fmt.Errorf("failed to prune state store: %w", err) - } - return pruned, nil -} - func (cs *State) recordMetrics(height int64, block *types.Block) { cs.metrics.Validators.Set(float64(cs.Validators.Size())) cs.metrics.ValidatorsPower.Set(float64(cs.Validators.TotalVotingPower())) diff --git a/internal/consensus/wal_generator.go b/internal/consensus/wal_generator.go index b217e4e5b..b7ee90d4d 100644 --- a/internal/consensus/wal_generator.go +++ b/internal/consensus/wal_generator.go @@ -87,7 +87,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { }) mempool := emptyMempool{} evpool := sm.EmptyEvidencePool{} - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore) consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool) consensusState.SetLogger(logger) consensusState.SetEventBus(eventBus) diff --git a/node/node.go b/node/node.go index 37cff9776..3a84fb2b4 100644 --- a/node/node.go +++ b/node/node.go @@ -276,6 +276,7 @@ func makeNode(config *cfg.Config, proxyApp.Consensus(), mp, evPool, + blockStore, sm.BlockExecutorWithMetrics(smMetrics), ) diff --git a/node/node_test.go b/node/node_test.go index 6ab0301b4..741afeabf 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -278,6 +278,7 @@ func TestCreateProposalBlock(t *testing.T) { proxyApp.Consensus(), mp, evidencePool, + blockStore, ) commit := types.NewCommit(height-1, 0, types.BlockID{}, nil) @@ -317,6 +318,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) { const height int64 = 1 state, stateDB, _ := state(1, height) stateStore := sm.NewStore(stateDB) + blockStore := store.NewBlockStore(dbm.NewMemDB()) const maxBytes int64 = 16384 const partSize uint32 = 256 state.ConsensusParams.Block.MaxBytes = maxBytes @@ -345,6 +347,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) { proxyApp.Consensus(), mp, sm.EmptyEvidencePool{}, + blockStore, ) commit := types.NewCommit(height-1, 0, types.BlockID{}, nil) @@ -376,6 +379,7 @@ func TestMaxProposalBlockSize(t *testing.T) { state, stateDB, _ := state(types.MaxVotesCount, int64(1)) stateStore := sm.NewStore(stateDB) + blockStore := store.NewBlockStore(dbm.NewMemDB()) const maxBytes int64 = 1024 * 1024 * 2 state.ConsensusParams.Block.MaxBytes = maxBytes proposerAddr, _ := state.Validators.GetByIndex(0) @@ -410,6 +414,7 @@ func TestMaxProposalBlockSize(t *testing.T) { proxyApp.Consensus(), mp, sm.EmptyEvidencePool{}, + blockStore, ) blockID := types.BlockID{ diff --git a/state/execution.go b/state/execution.go index 4e7d17c1d..05d5bdd52 100644 --- a/state/execution.go +++ b/state/execution.go @@ -26,6 +26,9 @@ type BlockExecutor struct { // save state, validators, consensus params, abci responses here store Store + // use blockstore for the pruning functions. + blockStore BlockStore + // execute the app against this proxyApp proxy.AppConnConsensus @@ -60,17 +63,19 @@ func NewBlockExecutor( proxyApp proxy.AppConnConsensus, mempool mempl.Mempool, evpool EvidencePool, + blockStore BlockStore, options ...BlockExecutorOption, ) *BlockExecutor { res := &BlockExecutor{ - store: stateStore, - proxyApp: proxyApp, - eventBus: types.NopEventBus{}, - mempool: mempool, - evpool: evpool, - logger: logger, - metrics: NopMetrics(), - cache: make(map[string]struct{}), + store: stateStore, + proxyApp: proxyApp, + eventBus: types.NopEventBus{}, + mempool: mempool, + evpool: evpool, + logger: logger, + metrics: NopMetrics(), + cache: make(map[string]struct{}), + blockStore: blockStore, } for _, option := range options { @@ -139,17 +144,17 @@ func (blockExec *BlockExecutor) ValidateBlock(state State, block *types.Block) e // ApplyBlock validates the block against the state, executes it against the app, // fires the relevant events, commits the app, and saves the new state and responses. -// It returns the new state and the block height to retain (pruning older blocks). +// It returns the new state. // It's the only function that needs to be called // from outside this package to process and commit an entire block. // It takes a blockID to avoid recomputing the parts hash. func (blockExec *BlockExecutor) ApplyBlock( state State, blockID types.BlockID, block *types.Block, -) (State, int64, error) { +) (State, error) { // validate the block if we haven't already if err := blockExec.ValidateBlock(state, block); err != nil { - return state, 0, ErrInvalidBlock(err) + return state, ErrInvalidBlock(err) } startTime := time.Now().UnixNano() @@ -159,14 +164,14 @@ func (blockExec *BlockExecutor) ApplyBlock( endTime := time.Now().UnixNano() blockExec.metrics.BlockProcessingTime.Observe(float64(endTime-startTime) / 1000000) if err != nil { - return state, 0, ErrProxyAppConn(err) + return state, ErrProxyAppConn(err) } fail.Fail() // XXX // Save the results before we commit. if err := blockExec.store.SaveABCIResponses(block.Height, abciResponses); err != nil { - return state, 0, err + return state, err } fail.Fail() // XXX @@ -175,12 +180,12 @@ func (blockExec *BlockExecutor) ApplyBlock( abciValUpdates := abciResponses.EndBlock.ValidatorUpdates err = validateValidatorUpdates(abciValUpdates, state.ConsensusParams.Validator) if err != nil { - return state, 0, fmt.Errorf("error in validator updates: %v", err) + return state, fmt.Errorf("error in validator updates: %v", err) } validatorUpdates, err := types.PB2TM.ValidatorUpdates(abciValUpdates) if err != nil { - return state, 0, err + return state, err } if len(validatorUpdates) > 0 { blockExec.logger.Debug("updates to validators", "updates", types.ValidatorListString(validatorUpdates)) @@ -189,13 +194,13 @@ func (blockExec *BlockExecutor) ApplyBlock( // Update the state with the block and responses. state, err = updateState(state, blockID, &block.Header, abciResponses, validatorUpdates) if err != nil { - return state, 0, fmt.Errorf("commit failed for application: %v", err) + return state, fmt.Errorf("commit failed for application: %v", err) } // Lock mempool, commit app state, update mempoool. appHash, retainHeight, err := blockExec.Commit(state, block, abciResponses.DeliverTxs) if err != nil { - return state, 0, fmt.Errorf("commit failed for application: %v", err) + return state, fmt.Errorf("commit failed for application: %v", err) } // Update evpool with the latest state. @@ -206,11 +211,21 @@ func (blockExec *BlockExecutor) ApplyBlock( // Update the app hash and save the state. state.AppHash = appHash if err := blockExec.store.Save(state); err != nil { - return state, 0, err + return state, err } fail.Fail() // XXX + // Prune old heights, if requested by ABCI app. + if retainHeight > 0 { + pruned, err := blockExec.pruneBlocks(retainHeight) + if err != nil { + blockExec.logger.Error("failed to prune blocks", "retain_height", retainHeight, "err", err) + } else { + blockExec.logger.Debug("pruned blocks", "pruned", pruned, "retain_height", retainHeight) + } + } + // reset the verification cache blockExec.cache = make(map[string]struct{}) @@ -218,7 +233,7 @@ func (blockExec *BlockExecutor) ApplyBlock( // NOTE: if we crash between Commit and Save, events wont be fired during replay fireEvents(blockExec.logger, blockExec.eventBus, block, blockID, abciResponses, validatorUpdates) - return state, retainHeight, nil + return state, nil } // Commit locks the mempool, runs the ABCI Commit message, and updates the @@ -595,3 +610,20 @@ func ExecCommitBlock( // ResponseCommit has no error or log, just data return res.Data, nil } + +func (blockExec *BlockExecutor) pruneBlocks(retainHeight int64) (uint64, error) { + base := blockExec.blockStore.Base() + if retainHeight <= base { + return 0, nil + } + pruned, err := blockExec.blockStore.PruneBlocks(retainHeight) + if err != nil { + return 0, fmt.Errorf("failed to prune block store: %w", err) + } + + err = blockExec.Store().PruneStates(retainHeight) + if err != nil { + return 0, fmt.Errorf("failed to prune state store: %w", err) + } + return pruned, nil +} diff --git a/state/execution_test.go b/state/execution_test.go index e15b1686f..4ccf9ca7d 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -19,9 +19,11 @@ import ( "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/state/mocks" + "github.com/tendermint/tendermint/store" "github.com/tendermint/tendermint/types" tmtime "github.com/tendermint/tendermint/types/time" "github.com/tendermint/tendermint/version" + dbm "github.com/tendermint/tm-db" ) var ( @@ -40,16 +42,15 @@ func TestApplyBlock(t *testing.T) { state, stateDB, _ := makeState(1, 1) stateStore := sm.NewStore(stateDB) - + blockStore := store.NewBlockStore(dbm.NewMemDB()) blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), - mmock.Mempool{}, sm.EmptyEvidencePool{}) + mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore) block := makeBlock(state, 1) blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: block.MakePartSet(testPartSize).Header()} - state, retainHeight, err := blockExec.ApplyBlock(state, blockID, block) + state, err = blockExec.ApplyBlock(state, blockID, block) require.Nil(t, err) - assert.EqualValues(t, retainHeight, 1) // TODO check state and mempool assert.EqualValues(t, 1, state.Version.Consensus.App, "App version wasn't updated") @@ -196,17 +197,18 @@ func TestBeginBlockByzantineValidators(t *testing.T) { evpool.On("Update", mock.AnythingOfType("state.State"), mock.AnythingOfType("types.EvidenceList")).Return() evpool.On("CheckEvidence", mock.AnythingOfType("types.EvidenceList")).Return(nil) + blockStore := store.NewBlockStore(dbm.NewMemDB()) + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), - mmock.Mempool{}, evpool) + mmock.Mempool{}, evpool, blockStore) block := makeBlock(state, 1) block.Evidence = types.EvidenceData{Evidence: ev} block.Header.EvidenceHash = block.Evidence.Hash() blockID = types.BlockID{Hash: block.Hash(), PartSetHeader: block.MakePartSet(testPartSize).Header()} - state, retainHeight, err := blockExec.ApplyBlock(state, blockID, block) + _, err = blockExec.ApplyBlock(state, blockID, block) require.Nil(t, err) - assert.EqualValues(t, retainHeight, 1) // TODO check state and mempool assert.Equal(t, abciEv, app.ByzantineValidators) @@ -353,6 +355,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) { state, stateDB, _ := makeState(1, 1) stateStore := sm.NewStore(stateDB) + blockStore := store.NewBlockStore(dbm.NewMemDB()) blockExec := sm.NewBlockExecutor( stateStore, @@ -360,6 +363,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) { proxyApp.Consensus(), mmock.Mempool{}, sm.EmptyEvidencePool{}, + blockStore, ) eventBus := types.NewEventBus() @@ -386,7 +390,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) { {PubKey: pk, Power: 10}, } - state, _, err = blockExec.ApplyBlock(state, blockID, block) + state, err = blockExec.ApplyBlock(state, blockID, block) require.Nil(t, err) // test new validator was added to NextValidators if assert.Equal(t, state.Validators.Size()+1, state.NextValidators.Size()) { @@ -424,12 +428,14 @@ func TestEndBlockValidatorUpdatesResultingInEmptySet(t *testing.T) { state, stateDB, _ := makeState(1, 1) stateStore := sm.NewStore(stateDB) + blockStore := store.NewBlockStore(dbm.NewMemDB()) blockExec := sm.NewBlockExecutor( stateStore, log.TestingLogger(), proxyApp.Consensus(), mmock.Mempool{}, sm.EmptyEvidencePool{}, + blockStore, ) block := makeBlock(state, 1) @@ -442,7 +448,7 @@ func TestEndBlockValidatorUpdatesResultingInEmptySet(t *testing.T) { {PubKey: vp, Power: 0}, } - assert.NotPanics(t, func() { state, _, err = blockExec.ApplyBlock(state, blockID, block) }) + assert.NotPanics(t, func() { state, err = blockExec.ApplyBlock(state, blockID, block) }) assert.NotNil(t, err) assert.NotEmpty(t, state.NextValidators.Validators) } diff --git a/state/helpers_test.go b/state/helpers_test.go index 8163345b3..cc28204ac 100644 --- a/state/helpers_test.go +++ b/state/helpers_test.go @@ -62,7 +62,7 @@ func makeAndApplyGoodBlock(state sm.State, height int64, lastCommit *types.Commi } blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: types.PartSetHeader{Total: 3, Hash: tmrand.Bytes(32)}} - state, _, err := blockExec.ApplyBlock(state, blockID, block) + state, err := blockExec.ApplyBlock(state, blockID, block) if err != nil { return state, types.BlockID{}, err } diff --git a/state/validation_test.go b/state/validation_test.go index d124c6bd1..b483f9031 100644 --- a/state/validation_test.go +++ b/state/validation_test.go @@ -18,8 +18,10 @@ import ( tmproto "github.com/tendermint/tendermint/proto/tendermint/types" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/state/mocks" + "github.com/tendermint/tendermint/store" "github.com/tendermint/tendermint/types" tmtime "github.com/tendermint/tendermint/types/time" + dbm "github.com/tendermint/tm-db" ) const validationTestsStopHeight int64 = 10 @@ -31,12 +33,14 @@ func TestValidateBlockHeader(t *testing.T) { state, stateDB, privVals := makeState(3, 1) stateStore := sm.NewStore(stateDB) + blockStore := store.NewBlockStore(dbm.NewMemDB()) blockExec := sm.NewBlockExecutor( stateStore, log.TestingLogger(), proxyApp.Consensus(), memmock.Mempool{}, sm.EmptyEvidencePool{}, + blockStore, ) lastCommit := types.NewCommit(0, 0, types.BlockID{}, nil) @@ -122,12 +126,14 @@ func TestValidateBlockCommit(t *testing.T) { state, stateDB, privVals := makeState(1, 1) stateStore := sm.NewStore(stateDB) + blockStore := store.NewBlockStore(dbm.NewMemDB()) blockExec := sm.NewBlockExecutor( stateStore, log.TestingLogger(), proxyApp.Consensus(), memmock.Mempool{}, sm.EmptyEvidencePool{}, + blockStore, ) lastCommit := types.NewCommit(0, 0, types.BlockID{}, nil) wrongSigsCommit := types.NewCommit(1, 0, types.BlockID{}, nil) @@ -241,6 +247,7 @@ func TestValidateBlockEvidence(t *testing.T) { state, stateDB, privVals := makeState(4, 1) stateStore := sm.NewStore(stateDB) + blockStore := store.NewBlockStore(dbm.NewMemDB()) defaultEvidenceTime := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC) evpool := &mocks.EvidencePool{} @@ -256,6 +263,7 @@ func TestValidateBlockEvidence(t *testing.T) { proxyApp.Consensus(), memmock.Mempool{}, evpool, + blockStore, ) lastCommit := types.NewCommit(0, 0, types.BlockID{}, nil)