Browse Source

state: move pruneBlocks from consensus/state to state/execution (#6541)

state: move pruneBlocks function from consensus/state to state/execution

Closes #5414
pull/6568/head
JayT106 4 years ago
committed by GitHub
parent
commit
a456b71f1f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 123 additions and 95 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +1
    -1
      internal/blockchain/v0/reactor.go
  3. +2
    -1
      internal/blockchain/v0/reactor_test.go
  4. +1
    -1
      internal/blockchain/v2/processor_context.go
  5. +1
    -1
      internal/blockchain/v2/reactor.go
  6. +10
    -8
      internal/blockchain/v2/reactor_test.go
  7. +1
    -1
      internal/consensus/byzantine_test.go
  8. +1
    -1
      internal/consensus/common_test.go
  9. +1
    -1
      internal/consensus/reactor_test.go
  10. +3
    -3
      internal/consensus/replay.go
  11. +1
    -1
      internal/consensus/replay_file.go
  12. +17
    -13
      internal/consensus/replay_test.go
  13. +1
    -33
      internal/consensus/state.go
  14. +1
    -1
      internal/consensus/wal_generator.go
  15. +1
    -0
      node/node.go
  16. +5
    -0
      node/node_test.go
  17. +51
    -19
      state/execution.go
  18. +15
    -9
      state/execution_test.go
  19. +1
    -1
      state/helpers_test.go
  20. +8
    -0
      state/validation_test.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -64,6 +64,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
- Data Storage - Data Storage
- [store/state/evidence/light] \#5771 Use an order-preserving varint key encoding (@cmwaters) - [store/state/evidence/light] \#5771 Use an order-preserving varint key encoding (@cmwaters)
- [mempool] \#6396 Remove mempool's write ahead log (WAL), (previously unused by the tendermint code). (@tychoish) - [mempool] \#6396 Remove mempool's write ahead log (WAL), (previously unused by the tendermint code). (@tychoish)
- [state] \#6541 Move pruneBlocks from consensus/state to state/execution. (@JayT106)
- Tooling - Tooling
- [tools] \#6498 Set OS home dir to instead of the hardcoded PATH. (@JayT106) - [tools] \#6498 Set OS home dir to instead of the hardcoded PATH. (@JayT106)


+ 1
- 1
internal/blockchain/v0/reactor.go View File

@ -554,7 +554,7 @@ FOR_LOOP:
// TODO: Same thing for app - but we would need a way to get the hash // TODO: Same thing for app - but we would need a way to get the hash
// without persisting the state. // without persisting the state.
state, _, err = r.blockExec.ApplyBlock(state, firstID, first)
state, err = r.blockExec.ApplyBlock(state, firstID, first)
if err != nil { if err != nil {
// TODO: This is bad, are we zombie? // TODO: This is bad, are we zombie?
panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))


+ 2
- 1
internal/blockchain/v0/reactor_test.go View File

@ -113,6 +113,7 @@ func (rts *reactorTestSuite) addNode(t *testing.T,
rts.app[nodeID].Consensus(), rts.app[nodeID].Consensus(),
mock.Mempool{}, mock.Mempool{},
sm.EmptyEvidencePool{}, sm.EmptyEvidencePool{},
blockStore,
) )
for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ { for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {
@ -143,7 +144,7 @@ func (rts *reactorTestSuite) addNode(t *testing.T,
thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes) thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()} blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()}
state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock)
state, err = blockExec.ApplyBlock(state, blockID, thisBlock)
require.NoError(t, err) require.NoError(t, err)
blockStore.SaveBlock(thisBlock, thisParts, lastCommit) blockStore.SaveBlock(thisBlock, thisParts, lastCommit)


+ 1
- 1
internal/blockchain/v2/processor_context.go View File

@ -30,7 +30,7 @@ func newProcessorContext(st blockStore, ex blockApplier, s state.State) *pContex
} }
func (pc *pContext) applyBlock(blockID types.BlockID, block *types.Block) error { func (pc *pContext) applyBlock(blockID types.BlockID, block *types.Block) error {
newState, _, err := pc.applier.ApplyBlock(pc.state, blockID, block)
newState, err := pc.applier.ApplyBlock(pc.state, blockID, block)
pc.state = newState pc.state = newState
return err return err
} }


+ 1
- 1
internal/blockchain/v2/reactor.go View File

@ -50,7 +50,7 @@ type BlockchainReactor struct {
} }
type blockApplier interface { type blockApplier interface {
ApplyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, int64, error)
ApplyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, error)
} }
// XXX: unify naming in this package around tmState // XXX: unify naming in this package around tmState


+ 10
- 8
internal/blockchain/v2/reactor_test.go View File

@ -25,7 +25,7 @@ import (
bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain" bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/store"
tmstore "github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -84,9 +84,9 @@ type mockBlockApplier struct {
// XXX: Add whitelist/blacklist? // XXX: Add whitelist/blacklist?
func (mba *mockBlockApplier) ApplyBlock( func (mba *mockBlockApplier) ApplyBlock(
state sm.State, blockID types.BlockID, block *types.Block, state sm.State, blockID types.BlockID, block *types.Block,
) (sm.State, int64, error) {
) (sm.State, error) {
state.LastBlockHeight++ state.LastBlockHeight++
return state, 0, nil
return state, nil
} }
type mockSwitchIo struct { type mockSwitchIo struct {
@ -167,7 +167,9 @@ func newTestReactor(t *testing.T, p testReactorParams) *BlockchainReactor {
require.NoError(t, err) require.NoError(t, err)
db := dbm.NewMemDB() db := dbm.NewMemDB()
stateStore := sm.NewStore(db) stateStore := sm.NewStore(db)
appl = sm.NewBlockExecutor(stateStore, p.logger, proxyApp.Consensus(), mock.Mempool{}, sm.EmptyEvidencePool{})
blockStore := tmstore.NewBlockStore(dbm.NewMemDB())
appl = sm.NewBlockExecutor(
stateStore, p.logger, proxyApp.Consensus(), mock.Mempool{}, sm.EmptyEvidencePool{}, blockStore)
err = stateStore.Save(state) err = stateStore.Save(state)
require.NoError(t, err) require.NoError(t, err)
} }
@ -488,7 +490,7 @@ func newReactorStore(
t *testing.T, t *testing.T,
genDoc *types.GenesisDoc, genDoc *types.GenesisDoc,
privVals []types.PrivValidator, privVals []types.PrivValidator,
maxBlockHeight int64) (*store.BlockStore, sm.State, *sm.BlockExecutor) {
maxBlockHeight int64) (*tmstore.BlockStore, sm.State, *sm.BlockExecutor) {
t.Helper() t.Helper()
require.Len(t, privVals, 1) require.Len(t, privVals, 1)
@ -501,13 +503,13 @@ func newReactorStore(
} }
stateDB := dbm.NewMemDB() stateDB := dbm.NewMemDB()
blockStore := store.NewBlockStore(dbm.NewMemDB())
blockStore := tmstore.NewBlockStore(dbm.NewMemDB())
stateStore := sm.NewStore(stateDB) stateStore := sm.NewStore(stateDB)
state, err := sm.MakeGenesisState(genDoc) state, err := sm.MakeGenesisState(genDoc)
require.NoError(t, err) require.NoError(t, err)
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(),
mock.Mempool{}, sm.EmptyEvidencePool{})
mock.Mempool{}, sm.EmptyEvidencePool{}, blockStore)
err = stateStore.Save(state) err = stateStore.Save(state)
require.NoError(t, err) require.NoError(t, err)
@ -534,7 +536,7 @@ func newReactorStore(
thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes) thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()} blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()}
state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock)
state, err = blockExec.ApplyBlock(state, blockID, thisBlock)
require.NoError(t, err) require.NoError(t, err)
blockStore.SaveBlock(thisBlock, thisParts, lastCommit) blockStore.SaveBlock(thisBlock, thisParts, lastCommit)


+ 1
- 1
internal/consensus/byzantine_test.go View File

@ -78,7 +78,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// Make State // Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs.SetLogger(cs.Logger) cs.SetLogger(cs.Logger)
// set private validator // set private validator


+ 1
- 1
internal/consensus/common_test.go View File

@ -416,7 +416,7 @@ func newStateWithConfigAndBlockStore(
panic(err) panic(err)
} }
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs.SetLogger(log.TestingLogger().With("module", "consensus")) cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv) cs.SetPrivValidator(pv)


+ 1
- 1
internal/consensus/reactor_test.go View File

@ -335,7 +335,7 @@ func TestReactorWithEvidence(t *testing.T) {
evpool2 := sm.EmptyEvidencePool{} evpool2 := sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2) cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2)
cs.SetLogger(log.TestingLogger().With("module", "consensus")) cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv) cs.SetPrivValidator(pv)


+ 3
- 3
internal/consensus/replay.go View File

@ -473,7 +473,7 @@ func (h *Handshaker) replayBlocks(
// We emit events for the index services at the final block due to the sync issue when // We emit events for the index services at the final block due to the sync issue when
// the node shutdown during the block committing status. // the node shutdown during the block committing status.
blockExec := sm.NewBlockExecutor( blockExec := sm.NewBlockExecutor(
h.stateStore, h.logger, proxyApp.Consensus(), emptyMempool{}, sm.EmptyEvidencePool{})
h.stateStore, h.logger, proxyApp.Consensus(), emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
blockExec.SetEventBus(h.eventBus) blockExec.SetEventBus(h.eventBus)
appHash, err = sm.ExecCommitBlock( appHash, err = sm.ExecCommitBlock(
blockExec, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state) blockExec, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
@ -511,11 +511,11 @@ func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.Ap
// Use stubs for both mempool and evidence pool since no transactions nor // Use stubs for both mempool and evidence pool since no transactions nor
// evidence are needed here - block already exists. // evidence are needed here - block already exists.
blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{})
blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
blockExec.SetEventBus(h.eventBus) blockExec.SetEventBus(h.eventBus)
var err error var err error
state, _, err = blockExec.ApplyBlock(state, meta.BlockID, block)
state, err = blockExec.ApplyBlock(state, meta.BlockID, block)
if err != nil { if err != nil {
return sm.State{}, err return sm.State{}, err
} }


+ 1
- 1
internal/consensus/replay_file.go View File

@ -328,7 +328,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
} }
mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{} mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
consensusState := NewState(csConfig, state.Copy(), blockExec, consensusState := NewState(csConfig, state.Copy(), blockExec,
blockStore, mempool, evpool) blockStore, mempool, evpool)


+ 17
- 13
internal/consensus/replay_test.go View File

@ -731,7 +731,7 @@ func testHandshakeReplay(t *testing.T, sim *simulatorTestSuite, nBlocks int, mod
state := genesisState.Copy() state := genesisState.Copy()
// run the chain through state.ApplyBlock to build up the tendermint state // run the chain through state.ApplyBlock to build up the tendermint state
state = buildTMStateFromChain(config, sim.Mempool, sim.Evpool, stateStore, state, chain, nBlocks, mode)
state = buildTMStateFromChain(config, sim.Mempool, sim.Evpool, stateStore, state, chain, nBlocks, mode, store)
latestAppHash := state.AppHash latestAppHash := state.AppHash
// make a new client creator // make a new client creator
@ -748,7 +748,7 @@ func testHandshakeReplay(t *testing.T, sim *simulatorTestSuite, nBlocks int, mod
stateStore := sm.NewStore(stateDB1) stateStore := sm.NewStore(stateDB1)
err := stateStore.Save(genesisState) err := stateStore.Save(genesisState)
require.NoError(t, err) require.NoError(t, err)
buildAppStateFromChain(proxyApp, stateStore, sim.Mempool, sim.Evpool, genesisState, chain, nBlocks, mode)
buildAppStateFromChain(proxyApp, stateStore, sim.Mempool, sim.Evpool, genesisState, chain, nBlocks, mode, store)
} }
// Prune block store if requested // Prune block store if requested
@ -813,12 +813,13 @@ func applyBlock(stateStore sm.Store,
evpool sm.EvidencePool, evpool sm.EvidencePool,
st sm.State, st sm.State,
blk *types.Block, blk *types.Block,
proxyApp proxy.AppConns) sm.State {
proxyApp proxy.AppConns,
blockStore *mockBlockStore) sm.State {
testPartSize := types.BlockPartSizeBytes testPartSize := types.BlockPartSizeBytes
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
blkID := types.BlockID{Hash: blk.Hash(), PartSetHeader: blk.MakePartSet(testPartSize).Header()} blkID := types.BlockID{Hash: blk.Hash(), PartSetHeader: blk.MakePartSet(testPartSize).Header()}
newState, _, err := blockExec.ApplyBlock(st, blkID, blk)
newState, err := blockExec.ApplyBlock(st, blkID, blk)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -832,7 +833,9 @@ func buildAppStateFromChain(
evpool sm.EvidencePool, evpool sm.EvidencePool,
state sm.State, state sm.State,
chain []*types.Block, chain []*types.Block,
nBlocks int, mode uint) {
nBlocks int,
mode uint,
blockStore *mockBlockStore) {
// start a new app without handshake, play nBlocks blocks // start a new app without handshake, play nBlocks blocks
if err := proxyApp.Start(); err != nil { if err := proxyApp.Start(); err != nil {
panic(err) panic(err)
@ -853,18 +856,18 @@ func buildAppStateFromChain(
case 0: case 0:
for i := 0; i < nBlocks; i++ { for i := 0; i < nBlocks; i++ {
block := chain[i] block := chain[i]
state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp)
state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp, blockStore)
} }
case 1, 2, 3: case 1, 2, 3:
for i := 0; i < nBlocks-1; i++ { for i := 0; i < nBlocks-1; i++ {
block := chain[i] block := chain[i]
state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp)
state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp, blockStore)
} }
if mode == 2 || mode == 3 { if mode == 2 || mode == 3 {
// update the kvstore height and apphash // update the kvstore height and apphash
// as if we ran commit but not // as if we ran commit but not
state = applyBlock(stateStore, mempool, evpool, state, chain[nBlocks-1], proxyApp)
state = applyBlock(stateStore, mempool, evpool, state, chain[nBlocks-1], proxyApp, blockStore)
} }
default: default:
panic(fmt.Sprintf("unknown mode %v", mode)) panic(fmt.Sprintf("unknown mode %v", mode))
@ -880,7 +883,8 @@ func buildTMStateFromChain(
state sm.State, state sm.State,
chain []*types.Block, chain []*types.Block,
nBlocks int, nBlocks int,
mode uint) sm.State {
mode uint,
blockStore *mockBlockStore) sm.State {
// run the whole chain against this client to build up the tendermint state // run the whole chain against this client to build up the tendermint state
kvstoreApp := kvstore.NewPersistentKVStoreApplication( kvstoreApp := kvstore.NewPersistentKVStoreApplication(
filepath.Join(config.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode))) filepath.Join(config.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode)))
@ -907,19 +911,19 @@ func buildTMStateFromChain(
case 0: case 0:
// sync right up // sync right up
for _, block := range chain { for _, block := range chain {
state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp)
state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp, blockStore)
} }
case 1, 2, 3: case 1, 2, 3:
// sync up to the penultimate as if we stored the block. // sync up to the penultimate as if we stored the block.
// whether we commit or not depends on the appHash // whether we commit or not depends on the appHash
for _, block := range chain[:len(chain)-1] { for _, block := range chain[:len(chain)-1] {
state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp)
state = applyBlock(stateStore, mempool, evpool, state, block, proxyApp, blockStore)
} }
// apply the final block to a state copy so we can // apply the final block to a state copy so we can
// get the right next appHash but keep the state back // get the right next appHash but keep the state back
applyBlock(stateStore, mempool, evpool, state, chain[len(chain)-1], proxyApp)
applyBlock(stateStore, mempool, evpool, state, chain[len(chain)-1], proxyApp, blockStore)
default: default:
panic(fmt.Sprintf("unknown mode %v", mode)) panic(fmt.Sprintf("unknown mode %v", mode))
} }


+ 1
- 33
internal/consensus/state.go View File

@ -1655,12 +1655,7 @@ func (cs *State) finalizeCommit(height int64) {
// Execute and commit the block, update and save the state, and update the mempool. // Execute and commit the block, update and save the state, and update the mempool.
// NOTE The block.AppHash wont reflect these txs until the next block. // NOTE The block.AppHash wont reflect these txs until the next block.
var (
err error
retainHeight int64
)
stateCopy, retainHeight, err = cs.blockExec.ApplyBlock(
stateCopy, err := cs.blockExec.ApplyBlock(
stateCopy, stateCopy,
types.BlockID{ types.BlockID{
Hash: block.Hash(), Hash: block.Hash(),
@ -1675,16 +1670,6 @@ func (cs *State) finalizeCommit(height int64) {
fail.Fail() // XXX fail.Fail() // XXX
// Prune old heights, if requested by ABCI app.
if retainHeight > 0 {
pruned, err := cs.pruneBlocks(retainHeight)
if err != nil {
logger.Error("failed to prune blocks", "retain_height", retainHeight, "err", err)
} else {
logger.Debug("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
}
}
// must be called before we update state // must be called before we update state
cs.recordMetrics(height, block) cs.recordMetrics(height, block)
@ -1708,23 +1693,6 @@ func (cs *State) finalizeCommit(height int64) {
// * cs.StartTime is set to when we will start round0. // * cs.StartTime is set to when we will start round0.
} }
func (cs *State) pruneBlocks(retainHeight int64) (uint64, error) {
base := cs.blockStore.Base()
if retainHeight <= base {
return 0, nil
}
pruned, err := cs.blockStore.PruneBlocks(retainHeight)
if err != nil {
return 0, fmt.Errorf("failed to prune block store: %w", err)
}
err = cs.blockExec.Store().PruneStates(retainHeight)
if err != nil {
return 0, fmt.Errorf("failed to prune state store: %w", err)
}
return pruned, nil
}
func (cs *State) recordMetrics(height int64, block *types.Block) { func (cs *State) recordMetrics(height int64, block *types.Block) {
cs.metrics.Validators.Set(float64(cs.Validators.Size())) cs.metrics.Validators.Set(float64(cs.Validators.Size()))
cs.metrics.ValidatorsPower.Set(float64(cs.Validators.TotalVotingPower())) cs.metrics.ValidatorsPower.Set(float64(cs.Validators.TotalVotingPower()))


+ 1
- 1
internal/consensus/wal_generator.go View File

@ -87,7 +87,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
}) })
mempool := emptyMempool{} mempool := emptyMempool{}
evpool := sm.EmptyEvidencePool{} evpool := sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool) consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
consensusState.SetLogger(logger) consensusState.SetLogger(logger)
consensusState.SetEventBus(eventBus) consensusState.SetEventBus(eventBus)


+ 1
- 0
node/node.go View File

@ -276,6 +276,7 @@ func makeNode(config *cfg.Config,
proxyApp.Consensus(), proxyApp.Consensus(),
mp, mp,
evPool, evPool,
blockStore,
sm.BlockExecutorWithMetrics(smMetrics), sm.BlockExecutorWithMetrics(smMetrics),
) )


+ 5
- 0
node/node_test.go View File

@ -278,6 +278,7 @@ func TestCreateProposalBlock(t *testing.T) {
proxyApp.Consensus(), proxyApp.Consensus(),
mp, mp,
evidencePool, evidencePool,
blockStore,
) )
commit := types.NewCommit(height-1, 0, types.BlockID{}, nil) commit := types.NewCommit(height-1, 0, types.BlockID{}, nil)
@ -317,6 +318,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
const height int64 = 1 const height int64 = 1
state, stateDB, _ := state(1, height) state, stateDB, _ := state(1, height)
stateStore := sm.NewStore(stateDB) stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(dbm.NewMemDB())
const maxBytes int64 = 16384 const maxBytes int64 = 16384
const partSize uint32 = 256 const partSize uint32 = 256
state.ConsensusParams.Block.MaxBytes = maxBytes state.ConsensusParams.Block.MaxBytes = maxBytes
@ -345,6 +347,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
proxyApp.Consensus(), proxyApp.Consensus(),
mp, mp,
sm.EmptyEvidencePool{}, sm.EmptyEvidencePool{},
blockStore,
) )
commit := types.NewCommit(height-1, 0, types.BlockID{}, nil) commit := types.NewCommit(height-1, 0, types.BlockID{}, nil)
@ -376,6 +379,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
state, stateDB, _ := state(types.MaxVotesCount, int64(1)) state, stateDB, _ := state(types.MaxVotesCount, int64(1))
stateStore := sm.NewStore(stateDB) stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(dbm.NewMemDB())
const maxBytes int64 = 1024 * 1024 * 2 const maxBytes int64 = 1024 * 1024 * 2
state.ConsensusParams.Block.MaxBytes = maxBytes state.ConsensusParams.Block.MaxBytes = maxBytes
proposerAddr, _ := state.Validators.GetByIndex(0) proposerAddr, _ := state.Validators.GetByIndex(0)
@ -410,6 +414,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
proxyApp.Consensus(), proxyApp.Consensus(),
mp, mp,
sm.EmptyEvidencePool{}, sm.EmptyEvidencePool{},
blockStore,
) )
blockID := types.BlockID{ blockID := types.BlockID{


+ 51
- 19
state/execution.go View File

@ -26,6 +26,9 @@ type BlockExecutor struct {
// save state, validators, consensus params, abci responses here // save state, validators, consensus params, abci responses here
store Store store Store
// use blockstore for the pruning functions.
blockStore BlockStore
// execute the app against this // execute the app against this
proxyApp proxy.AppConnConsensus proxyApp proxy.AppConnConsensus
@ -60,17 +63,19 @@ func NewBlockExecutor(
proxyApp proxy.AppConnConsensus, proxyApp proxy.AppConnConsensus,
mempool mempl.Mempool, mempool mempl.Mempool,
evpool EvidencePool, evpool EvidencePool,
blockStore BlockStore,
options ...BlockExecutorOption, options ...BlockExecutorOption,
) *BlockExecutor { ) *BlockExecutor {
res := &BlockExecutor{ res := &BlockExecutor{
store: stateStore,
proxyApp: proxyApp,
eventBus: types.NopEventBus{},
mempool: mempool,
evpool: evpool,
logger: logger,
metrics: NopMetrics(),
cache: make(map[string]struct{}),
store: stateStore,
proxyApp: proxyApp,
eventBus: types.NopEventBus{},
mempool: mempool,
evpool: evpool,
logger: logger,
metrics: NopMetrics(),
cache: make(map[string]struct{}),
blockStore: blockStore,
} }
for _, option := range options { for _, option := range options {
@ -139,17 +144,17 @@ func (blockExec *BlockExecutor) ValidateBlock(state State, block *types.Block) e
// ApplyBlock validates the block against the state, executes it against the app, // ApplyBlock validates the block against the state, executes it against the app,
// fires the relevant events, commits the app, and saves the new state and responses. // fires the relevant events, commits the app, and saves the new state and responses.
// It returns the new state and the block height to retain (pruning older blocks).
// It returns the new state.
// It's the only function that needs to be called // It's the only function that needs to be called
// from outside this package to process and commit an entire block. // from outside this package to process and commit an entire block.
// It takes a blockID to avoid recomputing the parts hash. // It takes a blockID to avoid recomputing the parts hash.
func (blockExec *BlockExecutor) ApplyBlock( func (blockExec *BlockExecutor) ApplyBlock(
state State, blockID types.BlockID, block *types.Block, state State, blockID types.BlockID, block *types.Block,
) (State, int64, error) {
) (State, error) {
// validate the block if we haven't already // validate the block if we haven't already
if err := blockExec.ValidateBlock(state, block); err != nil { if err := blockExec.ValidateBlock(state, block); err != nil {
return state, 0, ErrInvalidBlock(err)
return state, ErrInvalidBlock(err)
} }
startTime := time.Now().UnixNano() startTime := time.Now().UnixNano()
@ -159,14 +164,14 @@ func (blockExec *BlockExecutor) ApplyBlock(
endTime := time.Now().UnixNano() endTime := time.Now().UnixNano()
blockExec.metrics.BlockProcessingTime.Observe(float64(endTime-startTime) / 1000000) blockExec.metrics.BlockProcessingTime.Observe(float64(endTime-startTime) / 1000000)
if err != nil { if err != nil {
return state, 0, ErrProxyAppConn(err)
return state, ErrProxyAppConn(err)
} }
fail.Fail() // XXX fail.Fail() // XXX
// Save the results before we commit. // Save the results before we commit.
if err := blockExec.store.SaveABCIResponses(block.Height, abciResponses); err != nil { if err := blockExec.store.SaveABCIResponses(block.Height, abciResponses); err != nil {
return state, 0, err
return state, err
} }
fail.Fail() // XXX fail.Fail() // XXX
@ -175,12 +180,12 @@ func (blockExec *BlockExecutor) ApplyBlock(
abciValUpdates := abciResponses.EndBlock.ValidatorUpdates abciValUpdates := abciResponses.EndBlock.ValidatorUpdates
err = validateValidatorUpdates(abciValUpdates, state.ConsensusParams.Validator) err = validateValidatorUpdates(abciValUpdates, state.ConsensusParams.Validator)
if err != nil { if err != nil {
return state, 0, fmt.Errorf("error in validator updates: %v", err)
return state, fmt.Errorf("error in validator updates: %v", err)
} }
validatorUpdates, err := types.PB2TM.ValidatorUpdates(abciValUpdates) validatorUpdates, err := types.PB2TM.ValidatorUpdates(abciValUpdates)
if err != nil { if err != nil {
return state, 0, err
return state, err
} }
if len(validatorUpdates) > 0 { if len(validatorUpdates) > 0 {
blockExec.logger.Debug("updates to validators", "updates", types.ValidatorListString(validatorUpdates)) blockExec.logger.Debug("updates to validators", "updates", types.ValidatorListString(validatorUpdates))
@ -189,13 +194,13 @@ func (blockExec *BlockExecutor) ApplyBlock(
// Update the state with the block and responses. // Update the state with the block and responses.
state, err = updateState(state, blockID, &block.Header, abciResponses, validatorUpdates) state, err = updateState(state, blockID, &block.Header, abciResponses, validatorUpdates)
if err != nil { if err != nil {
return state, 0, fmt.Errorf("commit failed for application: %v", err)
return state, fmt.Errorf("commit failed for application: %v", err)
} }
// Lock mempool, commit app state, update mempoool. // Lock mempool, commit app state, update mempoool.
appHash, retainHeight, err := blockExec.Commit(state, block, abciResponses.DeliverTxs) appHash, retainHeight, err := blockExec.Commit(state, block, abciResponses.DeliverTxs)
if err != nil { if err != nil {
return state, 0, fmt.Errorf("commit failed for application: %v", err)
return state, fmt.Errorf("commit failed for application: %v", err)
} }
// Update evpool with the latest state. // Update evpool with the latest state.
@ -206,11 +211,21 @@ func (blockExec *BlockExecutor) ApplyBlock(
// Update the app hash and save the state. // Update the app hash and save the state.
state.AppHash = appHash state.AppHash = appHash
if err := blockExec.store.Save(state); err != nil { if err := blockExec.store.Save(state); err != nil {
return state, 0, err
return state, err
} }
fail.Fail() // XXX fail.Fail() // XXX
// Prune old heights, if requested by ABCI app.
if retainHeight > 0 {
pruned, err := blockExec.pruneBlocks(retainHeight)
if err != nil {
blockExec.logger.Error("failed to prune blocks", "retain_height", retainHeight, "err", err)
} else {
blockExec.logger.Debug("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
}
}
// reset the verification cache // reset the verification cache
blockExec.cache = make(map[string]struct{}) blockExec.cache = make(map[string]struct{})
@ -218,7 +233,7 @@ func (blockExec *BlockExecutor) ApplyBlock(
// NOTE: if we crash between Commit and Save, events wont be fired during replay // NOTE: if we crash between Commit and Save, events wont be fired during replay
fireEvents(blockExec.logger, blockExec.eventBus, block, blockID, abciResponses, validatorUpdates) fireEvents(blockExec.logger, blockExec.eventBus, block, blockID, abciResponses, validatorUpdates)
return state, retainHeight, nil
return state, nil
} }
// Commit locks the mempool, runs the ABCI Commit message, and updates the // Commit locks the mempool, runs the ABCI Commit message, and updates the
@ -595,3 +610,20 @@ func ExecCommitBlock(
// ResponseCommit has no error or log, just data // ResponseCommit has no error or log, just data
return res.Data, nil return res.Data, nil
} }
func (blockExec *BlockExecutor) pruneBlocks(retainHeight int64) (uint64, error) {
base := blockExec.blockStore.Base()
if retainHeight <= base {
return 0, nil
}
pruned, err := blockExec.blockStore.PruneBlocks(retainHeight)
if err != nil {
return 0, fmt.Errorf("failed to prune block store: %w", err)
}
err = blockExec.Store().PruneStates(retainHeight)
if err != nil {
return 0, fmt.Errorf("failed to prune state store: %w", err)
}
return pruned, nil
}

+ 15
- 9
state/execution_test.go View File

@ -19,9 +19,11 @@ import (
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/mocks" "github.com/tendermint/tendermint/state/mocks"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time" tmtime "github.com/tendermint/tendermint/types/time"
"github.com/tendermint/tendermint/version" "github.com/tendermint/tendermint/version"
dbm "github.com/tendermint/tm-db"
) )
var ( var (
@ -40,16 +42,15 @@ func TestApplyBlock(t *testing.T) {
state, stateDB, _ := makeState(1, 1) state, stateDB, _ := makeState(1, 1)
stateStore := sm.NewStore(stateDB) stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(dbm.NewMemDB())
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(),
mmock.Mempool{}, sm.EmptyEvidencePool{})
mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore)
block := makeBlock(state, 1) block := makeBlock(state, 1)
blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: block.MakePartSet(testPartSize).Header()} blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: block.MakePartSet(testPartSize).Header()}
state, retainHeight, err := blockExec.ApplyBlock(state, blockID, block)
state, err = blockExec.ApplyBlock(state, blockID, block)
require.Nil(t, err) require.Nil(t, err)
assert.EqualValues(t, retainHeight, 1)
// TODO check state and mempool // TODO check state and mempool
assert.EqualValues(t, 1, state.Version.Consensus.App, "App version wasn't updated") assert.EqualValues(t, 1, state.Version.Consensus.App, "App version wasn't updated")
@ -196,17 +197,18 @@ func TestBeginBlockByzantineValidators(t *testing.T) {
evpool.On("Update", mock.AnythingOfType("state.State"), mock.AnythingOfType("types.EvidenceList")).Return() evpool.On("Update", mock.AnythingOfType("state.State"), mock.AnythingOfType("types.EvidenceList")).Return()
evpool.On("CheckEvidence", mock.AnythingOfType("types.EvidenceList")).Return(nil) evpool.On("CheckEvidence", mock.AnythingOfType("types.EvidenceList")).Return(nil)
blockStore := store.NewBlockStore(dbm.NewMemDB())
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(),
mmock.Mempool{}, evpool)
mmock.Mempool{}, evpool, blockStore)
block := makeBlock(state, 1) block := makeBlock(state, 1)
block.Evidence = types.EvidenceData{Evidence: ev} block.Evidence = types.EvidenceData{Evidence: ev}
block.Header.EvidenceHash = block.Evidence.Hash() block.Header.EvidenceHash = block.Evidence.Hash()
blockID = types.BlockID{Hash: block.Hash(), PartSetHeader: block.MakePartSet(testPartSize).Header()} blockID = types.BlockID{Hash: block.Hash(), PartSetHeader: block.MakePartSet(testPartSize).Header()}
state, retainHeight, err := blockExec.ApplyBlock(state, blockID, block)
_, err = blockExec.ApplyBlock(state, blockID, block)
require.Nil(t, err) require.Nil(t, err)
assert.EqualValues(t, retainHeight, 1)
// TODO check state and mempool // TODO check state and mempool
assert.Equal(t, abciEv, app.ByzantineValidators) assert.Equal(t, abciEv, app.ByzantineValidators)
@ -353,6 +355,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
state, stateDB, _ := makeState(1, 1) state, stateDB, _ := makeState(1, 1)
stateStore := sm.NewStore(stateDB) stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(dbm.NewMemDB())
blockExec := sm.NewBlockExecutor( blockExec := sm.NewBlockExecutor(
stateStore, stateStore,
@ -360,6 +363,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
proxyApp.Consensus(), proxyApp.Consensus(),
mmock.Mempool{}, mmock.Mempool{},
sm.EmptyEvidencePool{}, sm.EmptyEvidencePool{},
blockStore,
) )
eventBus := types.NewEventBus() eventBus := types.NewEventBus()
@ -386,7 +390,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
{PubKey: pk, Power: 10}, {PubKey: pk, Power: 10},
} }
state, _, err = blockExec.ApplyBlock(state, blockID, block)
state, err = blockExec.ApplyBlock(state, blockID, block)
require.Nil(t, err) require.Nil(t, err)
// test new validator was added to NextValidators // test new validator was added to NextValidators
if assert.Equal(t, state.Validators.Size()+1, state.NextValidators.Size()) { if assert.Equal(t, state.Validators.Size()+1, state.NextValidators.Size()) {
@ -424,12 +428,14 @@ func TestEndBlockValidatorUpdatesResultingInEmptySet(t *testing.T) {
state, stateDB, _ := makeState(1, 1) state, stateDB, _ := makeState(1, 1)
stateStore := sm.NewStore(stateDB) stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(dbm.NewMemDB())
blockExec := sm.NewBlockExecutor( blockExec := sm.NewBlockExecutor(
stateStore, stateStore,
log.TestingLogger(), log.TestingLogger(),
proxyApp.Consensus(), proxyApp.Consensus(),
mmock.Mempool{}, mmock.Mempool{},
sm.EmptyEvidencePool{}, sm.EmptyEvidencePool{},
blockStore,
) )
block := makeBlock(state, 1) block := makeBlock(state, 1)
@ -442,7 +448,7 @@ func TestEndBlockValidatorUpdatesResultingInEmptySet(t *testing.T) {
{PubKey: vp, Power: 0}, {PubKey: vp, Power: 0},
} }
assert.NotPanics(t, func() { state, _, err = blockExec.ApplyBlock(state, blockID, block) })
assert.NotPanics(t, func() { state, err = blockExec.ApplyBlock(state, blockID, block) })
assert.NotNil(t, err) assert.NotNil(t, err)
assert.NotEmpty(t, state.NextValidators.Validators) assert.NotEmpty(t, state.NextValidators.Validators)
} }


+ 1
- 1
state/helpers_test.go View File

@ -62,7 +62,7 @@ func makeAndApplyGoodBlock(state sm.State, height int64, lastCommit *types.Commi
} }
blockID := types.BlockID{Hash: block.Hash(), blockID := types.BlockID{Hash: block.Hash(),
PartSetHeader: types.PartSetHeader{Total: 3, Hash: tmrand.Bytes(32)}} PartSetHeader: types.PartSetHeader{Total: 3, Hash: tmrand.Bytes(32)}}
state, _, err := blockExec.ApplyBlock(state, blockID, block)
state, err := blockExec.ApplyBlock(state, blockID, block)
if err != nil { if err != nil {
return state, types.BlockID{}, err return state, types.BlockID{}, err
} }


+ 8
- 0
state/validation_test.go View File

@ -18,8 +18,10 @@ import (
tmproto "github.com/tendermint/tendermint/proto/tendermint/types" tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/mocks" "github.com/tendermint/tendermint/state/mocks"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time" tmtime "github.com/tendermint/tendermint/types/time"
dbm "github.com/tendermint/tm-db"
) )
const validationTestsStopHeight int64 = 10 const validationTestsStopHeight int64 = 10
@ -31,12 +33,14 @@ func TestValidateBlockHeader(t *testing.T) {
state, stateDB, privVals := makeState(3, 1) state, stateDB, privVals := makeState(3, 1)
stateStore := sm.NewStore(stateDB) stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(dbm.NewMemDB())
blockExec := sm.NewBlockExecutor( blockExec := sm.NewBlockExecutor(
stateStore, stateStore,
log.TestingLogger(), log.TestingLogger(),
proxyApp.Consensus(), proxyApp.Consensus(),
memmock.Mempool{}, memmock.Mempool{},
sm.EmptyEvidencePool{}, sm.EmptyEvidencePool{},
blockStore,
) )
lastCommit := types.NewCommit(0, 0, types.BlockID{}, nil) lastCommit := types.NewCommit(0, 0, types.BlockID{}, nil)
@ -122,12 +126,14 @@ func TestValidateBlockCommit(t *testing.T) {
state, stateDB, privVals := makeState(1, 1) state, stateDB, privVals := makeState(1, 1)
stateStore := sm.NewStore(stateDB) stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(dbm.NewMemDB())
blockExec := sm.NewBlockExecutor( blockExec := sm.NewBlockExecutor(
stateStore, stateStore,
log.TestingLogger(), log.TestingLogger(),
proxyApp.Consensus(), proxyApp.Consensus(),
memmock.Mempool{}, memmock.Mempool{},
sm.EmptyEvidencePool{}, sm.EmptyEvidencePool{},
blockStore,
) )
lastCommit := types.NewCommit(0, 0, types.BlockID{}, nil) lastCommit := types.NewCommit(0, 0, types.BlockID{}, nil)
wrongSigsCommit := types.NewCommit(1, 0, types.BlockID{}, nil) wrongSigsCommit := types.NewCommit(1, 0, types.BlockID{}, nil)
@ -241,6 +247,7 @@ func TestValidateBlockEvidence(t *testing.T) {
state, stateDB, privVals := makeState(4, 1) state, stateDB, privVals := makeState(4, 1)
stateStore := sm.NewStore(stateDB) stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(dbm.NewMemDB())
defaultEvidenceTime := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC) defaultEvidenceTime := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)
evpool := &mocks.EvidencePool{} evpool := &mocks.EvidencePool{}
@ -256,6 +263,7 @@ func TestValidateBlockEvidence(t *testing.T) {
proxyApp.Consensus(), proxyApp.Consensus(),
memmock.Mempool{}, memmock.Mempool{},
evpool, evpool,
blockStore,
) )
lastCommit := types.NewCommit(0, 0, types.BlockID{}, nil) lastCommit := types.NewCommit(0, 0, types.BlockID{}, nil)


Loading…
Cancel
Save