
state: prune states using an iterator (#5864)

pull/5881/head
Callum Waters 4 years ago
committed by GitHub
commit 03a6fb2777
No known key found for this signature in database. GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 274 additions and 149 deletions
  1. CHANGELOG_PENDING.md (+1 -0)
  2. consensus/common_test.go (+1 -1)
  3. consensus/replay_test.go (+1 -1)
  4. consensus/state.go (+3 -2)
  5. state/export_test.go (+0 -11)
  6. state/helpers_test.go (+33 -0)
  7. state/mocks/evidence_pool.go (+1 -1)
  8. state/mocks/store.go (+6 -6)
  9. state/store.go (+129 -102)
  10. state/store_test.go (+97 -24)
  11. test/maverick/consensus/state.go (+2 -1)
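
At the center of the diff is an API change to the state store: PruneStates(from, to int64) becomes PruneStates(retainHeight int64), and the store now finds the keys to delete with a reverse iterator instead of requiring callers to track the lower bound. Call sites shrink accordingly; the before/after at the consensus call site (shown in full further down):

    err = cs.blockExec.Store().PruneStates(base, retainHeight) // before: caller supplies both bounds
    err = cs.blockExec.Store().PruneStates(retainHeight)       // after: a single retain height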

CHANGELOG_PENDING.md (+1 -0)

@@ -36,6 +36,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [libs/os] `EnsureDir` now propagates IO errors and checks the file type (@erikgrinaker)
- [libs/os] Kill() and {Must,}{Read,Write}File() functions have been removed. (@alessio)
- [store] \#5848 Remove block store state in favor of using the db iterators directly (@cmwaters)
- [state] \#5864 Use an iterator when pruning state (@cmwaters)
- Blockchain Protocol


consensus/common_test.go (+1 -1)

@@ -389,7 +389,7 @@ func newStateWithConfigAndBlockStore(
evpool := sm.EmptyEvidencePool{}
// Make State
- stateDB := blockDB
+ stateDB := dbm.NewMemDB()
stateStore := sm.NewStore(stateDB)
if err := stateStore.Save(state); err != nil { // for save height 1's validators info
panic(err)


consensus/replay_test.go (+1 -1)

@@ -158,7 +158,7 @@ LOOP:
// create consensus state from a clean slate
logger := log.NewNopLogger()
blockDB := dbm.NewMemDB()
- stateDB := blockDB
+ stateDB := dbm.NewMemDB()
stateStore := sm.NewStore(stateDB)
state, err := sm.MakeGenesisStateFromFile(consensusReplayConfig.GenesisFile())
require.NoError(t, err)


consensus/state.go (+3 -2)

@@ -1619,9 +1619,10 @@ func (cs *State) pruneBlocks(retainHeight int64) (uint64, error) {
if err != nil {
return 0, fmt.Errorf("failed to prune block store: %w", err)
}
- err = cs.blockExec.Store().PruneStates(base, retainHeight)
+ err = cs.blockExec.Store().PruneStates(retainHeight)
if err != nil {
- return 0, fmt.Errorf("failed to prune state database: %w", err)
+ return 0, fmt.Errorf("failed to prune state store: %w", err)
}
return pruned, nil
}


state/export_test.go (+0 -11)

@@ -1,8 +1,6 @@
package state
import (
dbm "github.com/tendermint/tm-db"
abci "github.com/tendermint/tendermint/abci/types"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
@@ -20,8 +18,6 @@ import (
// easily testable from outside of the package.
//
const ValSetCheckpointInterval = valSetCheckpointInterval
// UpdateState is an alias for updateState exported from execution.go,
// exclusively and explicitly for testing.
func UpdateState(
@@ -39,10 +35,3 @@ func UpdateState(
func ValidateValidatorUpdates(abciUpdates []abci.ValidatorUpdate, params tmproto.ValidatorParams) error {
return validateValidatorUpdates(abciUpdates, params)
}
// SaveValidatorsInfo is an alias for the private saveValidatorsInfo method in
// store.go, exported exclusively and explicitly for testing.
func SaveValidatorsInfo(db dbm.DB, height, lastHeightChanged int64, valSet *types.ValidatorSet) error {
stateStore := dbStore{db}
return stateStore.saveValidatorsInfo(height, lastHeightChanged, valSet)
}

state/helpers_test.go (+33 -0)

@@ -228,6 +228,39 @@ func randomGenesisDoc() *types.GenesisDoc {
}
}
// used for testing by the state store
func makeRandomStateFromValidatorSet(
lastValSet *types.ValidatorSet,
height, lastHeightValidatorsChanged int64,
) sm.State {
return sm.State{
LastBlockHeight: height - 1,
NextValidators: lastValSet.CopyIncrementProposerPriority(2),
Validators: lastValSet.CopyIncrementProposerPriority(1),
LastValidators: lastValSet.Copy(),
LastHeightConsensusParamsChanged: height,
ConsensusParams: *types.DefaultConsensusParams(),
LastHeightValidatorsChanged: lastHeightValidatorsChanged,
InitialHeight: 1,
}
}
func makeRandomStateFromConsensusParams(consensusParams *tmproto.ConsensusParams,
height, lastHeightConsensusParamsChanged int64) sm.State {
val, _ := types.RandValidator(true, 10)
valSet := types.NewValidatorSet([]*types.Validator{val})
return sm.State{
LastBlockHeight: height - 1,
ConsensusParams: *consensusParams,
LastHeightConsensusParamsChanged: lastHeightConsensusParamsChanged,
NextValidators: valSet.CopyIncrementProposerPriority(2),
Validators: valSet.CopyIncrementProposerPriority(1),
LastValidators: valSet.Copy(),
LastHeightValidatorsChanged: height,
InitialHeight: 1,
}
}
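A minimal sketch of how these helpers are exercised (it mirrors the store tests further down; the MemDB-backed store and RandValidator setup are taken from them, and the exact heights are illustrative):

    stateDB := dbm.NewMemDB()
    stateStore := sm.NewStore(stateDB)
    val, _ := types.RandValidator(true, 10)
    vals := types.NewValidatorSet([]*types.Validator{val})
    // Save persists the validator info implied by this state; per the note in the
    // tests below, the next validators are recorded at height h + 1.
    if err := stateStore.Save(makeRandomStateFromValidatorSet(vals, 1, 1)); err != nil {
        panic(err)
    }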
//----------------------------------------------------------------------------
type testApp struct {


state/mocks/evidence_pool.go (+1 -1)

@@ -1,4 +1,4 @@
- // Code generated by mockery v2.1.0. DO NOT EDIT.
+ // Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks


state/mocks/store.go (+6 -6)

@@ -1,4 +1,4 @@
- // Code generated by mockery v2.1.0. DO NOT EDIT.
+ // Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks
@@ -162,13 +162,13 @@ func (_m *Store) LoadValidators(_a0 int64) (*tenderminttypes.ValidatorSet, error
return r0, r1
}
- // PruneStates provides a mock function with given fields: _a0, _a1
- func (_m *Store) PruneStates(_a0 int64, _a1 int64) error {
- ret := _m.Called(_a0, _a1)
+ // PruneStates provides a mock function with given fields: _a0
+ func (_m *Store) PruneStates(_a0 int64) error {
+ ret := _m.Called(_a0)
var r0 error
- if rf, ok := ret.Get(0).(func(int64, int64) error); ok {
- r0 = rf(_a0, _a1)
+ if rf, ok := ret.Get(0).(func(int64) error); ok {
+ r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
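
Tests that stub the store adjust their expectations to the one-argument form. A hedged sketch of typical testify usage (the mockery-generated Store embeds mock.Mock, so On/Return work as usual; the height is illustrative):

    store := &mocks.Store{}
    store.On("PruneStates", int64(100)).Return(nil) // expect the new single-argument call
    err := store.PruneStates(100)                   // returns nil, expectation satisfied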


state/store.go (+129 -102)

@@ -1,6 +1,7 @@
package state
import (
"bytes"
"errors"
"fmt"
@@ -82,8 +83,8 @@ type Store interface {
SaveABCIResponses(int64, *tmstate.ABCIResponses) error
// Bootstrap is used for bootstrapping state when not starting from an initial height.
Bootstrap(State) error
// PruneStates takes the height from which to start pruning and which height to stop at
PruneStates(int64, int64) error
// PruneStates prunes all states up to the given retain height (exclusive)
PruneStates(int64) error
}
// dbStore wraps a db (github.com/tendermint/tm-db)
@@ -228,132 +229,158 @@ func (store dbStore) Bootstrap(state State) error {
return store.db.SetSync(stateKey, state.Bytes())
}
// PruneStates deletes states between the given heights (including from, excluding to). It is not
// PruneStates deletes states up to the height specified (exclusive). It is not
// guaranteed to delete all states, since the last checkpointed state and states being pointed to by
// e.g. `LastHeightChanged` must remain. The state at to must also exist.
//
// The from parameter is necessary since we can't do a key scan in a performant way due to the key
// encoding not preserving ordering: https://github.com/tendermint/tendermint/issues/4567
// This will cause some old states to be left behind when doing incremental partial prunes,
// specifically older checkpoints and LastHeightChanged targets.
func (store dbStore) PruneStates(from int64, to int64) error {
if from <= 0 || to <= 0 {
return fmt.Errorf("from height %v and to height %v must be greater than 0", from, to)
}
if from >= to {
return fmt.Errorf("from height %v must be lower than to height %v", from, to)
}
valInfo, err := loadValidatorsInfo(store.db, to)
if err != nil {
return fmt.Errorf("validators at height %v not found: %w", to, err)
// e.g. `LastHeightChanged` must remain. The state at the retain height must also exist.
// Pruning is done in descending order.
func (store dbStore) PruneStates(retainHeight int64) error {
if retainHeight <= 0 {
return fmt.Errorf("height %v must be greater than 0", retainHeight)
}
paramsInfo, err := store.loadConsensusParamsInfo(to)
if err != nil {
return fmt.Errorf("consensus params at height %v not found: %w", to, err)
if err := store.pruneValidatorSets(retainHeight); err != nil {
return err
}
keepVals := make(map[int64]bool)
if valInfo.ValidatorSet == nil {
keepVals[valInfo.LastHeightChanged] = true
keepVals[lastStoredHeightFor(to, valInfo.LastHeightChanged)] = true // keep last checkpoint too
if err := store.pruneConsensusParams(retainHeight); err != nil {
return err
}
if err := store.pruneABCIResponses(retainHeight); err != nil {
return err
}
return nil
}
// pruneValidatorSets uses a reverse iterator from the base height to the retain height (exclusive), deleting
// all validator sets in between. Because most stored validator sets reference an earlier validator set,
// one validator set will likely remain after pruning.
func (store dbStore) pruneValidatorSets(height int64) error {
valInfo, err := loadValidatorsInfo(store.db, height)
if err != nil {
return fmt.Errorf("validators at height %v not found: %w", height, err)
}
// We will prune up to the validator set at the given "height". As we don't save validator sets at every
// height, but only when they change or at a checkpoint, it is likely that the validator set at the height
// we prune to is empty and thus dependent on the validator set saved at a previous height. We must find
// that validator set and make sure it is not pruned.
lastRecordedValSetHeight := lastStoredHeightFor(height, valInfo.LastHeightChanged)
lastRecordedValSet, err := loadValidatorsInfo(store.db, lastRecordedValSetHeight)
if err != nil || lastRecordedValSet.ValidatorSet == nil {
return fmt.Errorf("couldn't find validators at height %d (height %d was originally requested): %w",
lastStoredHeightFor(height, valInfo.LastHeightChanged),
height,
err,
)
}
// batch delete all the validators sets up to height
return store.batchDelete(
validatorsKey(1),
validatorsKey(height),
validatorsKey(lastRecordedValSetHeight),
)
}
// pruneConsensusParams uses a reverse iterator from the base height to the retain height, batch-deleting
// all consensus params in between. If the consensus params at the new base height depend
// on a prior height, then that lower height is kept too.
func (store dbStore) pruneConsensusParams(retainHeight int64) error {
paramsInfo, err := store.loadConsensusParamsInfo(retainHeight)
if err != nil {
return fmt.Errorf("consensus params at height %v not found: %w", retainHeight, err)
}
keepParams := make(map[int64]bool)
// As we don't save the consensus params at every height, but only when they change,
// we must not prune the last consensus params that the consensus params info at the
// retain height depends on.
if paramsInfo.ConsensusParams.Equal(&tmproto.ConsensusParams{}) {
keepParams[paramsInfo.LastHeightChanged] = true
lastRecordedConsensusParams, err := store.loadConsensusParamsInfo(paramsInfo.LastHeightChanged)
if err != nil || lastRecordedConsensusParams.ConsensusParams.Equal(&tmproto.ConsensusParams{}) {
return fmt.Errorf(
"couldn't find consensus params at height %d as last changed from height %d: %w",
paramsInfo.LastHeightChanged,
retainHeight,
err,
)
}
}
// batch delete all the consensus params up to the retain height
return store.batchDelete(
consensusParamsKey(1),
consensusParamsKey(retainHeight),
consensusParamsKey(paramsInfo.LastHeightChanged),
)
}
// pruneABCIResponses uses a reverse iterator from the base height to the retain height, batch-deleting
// all ABCI responses in between.
func (store dbStore) pruneABCIResponses(height int64) error {
return store.batchDelete(abciResponsesKey(1), abciResponsesKey(height), nil)
}
// batchDelete is a generic function for deleting a range of keys in reverse order. It will
// skip any key equal to the given exception key (used to protect a still-referenced entry).
func (store dbStore) batchDelete(start []byte, end []byte, exception []byte) error {
iter, err := store.db.ReverseIterator(start, end)
if err != nil {
return fmt.Errorf("iterator error: %w", err)
}
defer iter.Close()
batch := store.db.NewBatch()
defer batch.Close()
pruned := uint64(0)
// We have to delete in reverse order, to avoid deleting previous heights that have validator
// sets and consensus params that we may need to retrieve.
for h := to - 1; h >= from; h-- {
// For heights we keep, we must make sure they have the full validator set or consensus
// params, otherwise they will panic if they're retrieved directly (instead of
// indirectly via a LastHeightChanged pointer).
if keepVals[h] {
v, err := loadValidatorsInfo(store.db, h)
if err != nil || v.ValidatorSet == nil {
vip, err := store.LoadValidators(h)
if err != nil {
return err
}
pvi, err := vip.ToProto()
if err != nil {
return err
}
v.ValidatorSet = pvi
v.LastHeightChanged = h
bz, err := v.Marshal()
if err != nil {
return err
}
err = batch.Set(validatorsKey(h), bz)
if err != nil {
return err
}
}
} else {
err = batch.Delete(validatorsKey(h))
if err != nil {
return err
}
pruned := 0
for iter.Valid() {
key := iter.Key()
if bytes.Equal(key, exception) {
iter.Next()
continue
}
if keepParams[h] {
p, err := store.loadConsensusParamsInfo(h)
if err != nil {
if err := batch.Delete(key); err != nil {
return fmt.Errorf("pruning error at key %X: %w", key, err)
}
pruned++
// avoid batches growing too large by flushing to disk regularly
if pruned%1000 == 0 {
if err := iter.Error(); err != nil {
return err
}
if err := iter.Close(); err != nil {
return err
}
if p.ConsensusParams.Equal(&tmproto.ConsensusParams{}) {
p.ConsensusParams, err = store.LoadConsensusParams(h)
if err != nil {
return err
}
p.LastHeightChanged = h
bz, err := p.Marshal()
if err != nil {
return err
}
err = batch.Set(consensusParamsKey(h), bz)
if err != nil {
return err
}
if err := batch.Write(); err != nil {
return fmt.Errorf("pruning error at key %X: %w", key, err)
}
} else {
err = batch.Delete(consensusParamsKey(h))
if err != nil {
if err := batch.Close(); err != nil {
return err
}
}
err = batch.Delete(abciResponsesKey(h))
if err != nil {
return err
}
pruned++
// avoid batches growing too large by flushing to database regularly
if pruned%1000 == 0 && pruned > 0 {
err := batch.Write()
iter, err = store.db.ReverseIterator(start, end)
if err != nil {
return err
return fmt.Errorf("iterator error: %w", err)
}
batch.Close()
defer iter.Close()
batch = store.db.NewBatch()
defer batch.Close()
} else {
iter.Next()
}
}
err = batch.WriteSync()
if err != nil {
if err := iter.Error(); err != nil {
return fmt.Errorf("iterator error: %w", err)
}
if err := batch.WriteSync(); err != nil {
return err
}
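
The validator-set pruning above leans on lastStoredHeightFor, which this diff doesn't touch. For orientation, a sketch of the checkpoint arithmetic it performs (a reconstruction under the valSetCheckpointInterval = 100000 constant echoed in the tests below, not code from this change):

    // lastStoredHeightFor returns the height at which a validator set was last
    // physically written: the nearest checkpoint at or below height, or the height
    // the set last changed, whichever is greater.
    func lastStoredHeightFor(height, lastHeightChanged int64) int64 {
        checkpointHeight := height - height%valSetCheckpointInterval // e.g. 100002 -> 100000
        if checkpointHeight > lastHeightChanged {
            return checkpointHeight
        }
        return lastHeightChanged
    }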


state/store_test.go (+97 -24)

@@ -21,31 +21,84 @@ import (
"github.com/tendermint/tendermint/types"
)
const (
// make sure this is the same as in state/store.go
valSetCheckpointInterval = 100000
)
func TestStoreBootstrap(t *testing.T) {
stateDB := dbm.NewMemDB()
stateStore := sm.NewStore(stateDB)
val, _ := types.RandValidator(true, 10)
val2, _ := types.RandValidator(true, 10)
val3, _ := types.RandValidator(true, 10)
vals := types.NewValidatorSet([]*types.Validator{val, val2, val3})
bootstrapState := makeRandomStateFromValidatorSet(vals, 100, 100)
err := stateStore.Bootstrap(bootstrapState)
require.NoError(t, err)
// bootstrap should also save the previous validator set
_, err = stateStore.LoadValidators(99)
require.NoError(t, err)
_, err = stateStore.LoadValidators(100)
require.NoError(t, err)
_, err = stateStore.LoadValidators(101)
require.NoError(t, err)
state, err := stateStore.Load()
require.NoError(t, err)
require.Equal(t, bootstrapState, state)
}
func TestStoreLoadValidators(t *testing.T) {
stateDB := dbm.NewMemDB()
stateStore := sm.NewStore(stateDB)
val, _ := types.RandValidator(true, 10)
vals := types.NewValidatorSet([]*types.Validator{val})
val2, _ := types.RandValidator(true, 10)
val3, _ := types.RandValidator(true, 10)
vals := types.NewValidatorSet([]*types.Validator{val, val2, val3})
// 1) LoadValidators loads validators using a height where they were last changed
err := sm.SaveValidatorsInfo(stateDB, 1, 1, vals)
// Note that only the next validators at height h + 1 are saved
err := stateStore.Save(makeRandomStateFromValidatorSet(vals, 1, 1))
require.NoError(t, err)
err = sm.SaveValidatorsInfo(stateDB, 2, 1, vals)
err = stateStore.Save(makeRandomStateFromValidatorSet(vals.CopyIncrementProposerPriority(1), 2, 1))
require.NoError(t, err)
loadedVals, err := stateStore.LoadValidators(2)
loadedVals, err := stateStore.LoadValidators(3)
require.NoError(t, err)
assert.NotZero(t, loadedVals.Size())
require.Equal(t, vals.CopyIncrementProposerPriority(3), loadedVals)
// 2) LoadValidators loads validators using a checkpoint height
err = sm.SaveValidatorsInfo(stateDB, sm.ValSetCheckpointInterval, 1, vals)
// add a validator set at the checkpoint
err = stateStore.Save(makeRandomStateFromValidatorSet(vals, valSetCheckpointInterval, 1))
require.NoError(t, err)
loadedVals, err = stateStore.LoadValidators(sm.ValSetCheckpointInterval)
// check that a request will go back to the last checkpoint
_, err = stateStore.LoadValidators(valSetCheckpointInterval + 1)
require.Error(t, err)
require.Equal(t, fmt.Sprintf("couldn't find validators at height %d (height %d was originally requested): "+
"value retrieved from db is empty",
valSetCheckpointInterval, valSetCheckpointInterval+1), err.Error())
// now save a validator set at that checkpoint
err = stateStore.Save(makeRandomStateFromValidatorSet(vals, valSetCheckpointInterval-1, 1))
require.NoError(t, err)
assert.NotZero(t, loadedVals.Size())
loadedVals, err = stateStore.LoadValidators(valSetCheckpointInterval)
require.NoError(t, err)
// the validator set gets updated with the one given, hence we expect it to equal the next validators (with an
// increment of one) as opposed to an increment of 100000 - 1 (had we not saved at the checkpoint)
require.Equal(t, vals.CopyIncrementProposerPriority(2), loadedVals)
require.NotEqual(t, vals.CopyIncrementProposerPriority(valSetCheckpointInterval), loadedVals)
}
// This benchmarks the speed of loading validators from different heights if there is no validator set change.
// NOTE: This isn't too indicative of validator retrieval speed as the db is always (regardless of height) only
// performing two operations: 1) retrieve validator info at height x, which has a last validator set change of 1
// and 2) retrieve the validator set at the aforementioned height 1.
func BenchmarkLoadValidators(b *testing.B) {
const valSetSize = 100
@@ -67,9 +120,10 @@ func BenchmarkLoadValidators(b *testing.B) {
for i := 10; i < 10000000000; i *= 10 { // 10, 100, 1000, ...
i := i
if err := sm.SaveValidatorsInfo(stateDB,
int64(i), state.LastHeightValidatorsChanged, state.NextValidators); err != nil {
b.Fatal(err)
err = stateStore.Save(makeRandomStateFromValidatorSet(state.NextValidators,
int64(i)-1, state.LastHeightValidatorsChanged))
if err != nil {
b.Fatalf("error saving store: %v", err)
}
b.Run(fmt.Sprintf("height=%d", i), func(b *testing.B) {
@@ -83,25 +137,44 @@ }
}
}
func TestStoreLoadConsensusParams(t *testing.T) {
stateDB := dbm.NewMemDB()
stateStore := sm.NewStore(stateDB)
err := stateStore.Save(makeRandomStateFromConsensusParams(types.DefaultConsensusParams(), 1, 1))
require.NoError(t, err)
params, err := stateStore.LoadConsensusParams(1)
require.NoError(t, err)
require.Equal(t, types.DefaultConsensusParams(), &params)
// we give the state store different params but say that the height hasn't changed, hence
// it should save a pointer to the params at height 1
differentParams := types.DefaultConsensusParams()
differentParams.Block.MaxBytes = 20000
err = stateStore.Save(makeRandomStateFromConsensusParams(differentParams, 10, 1))
require.NoError(t, err)
res, err := stateStore.LoadConsensusParams(10)
require.NoError(t, err)
require.Equal(t, res, params)
require.NotEqual(t, res, differentParams)
}
func TestPruneStates(t *testing.T) {
testcases := map[string]struct {
makeHeights int64
pruneFrom int64
pruneTo int64
pruneHeight int64
expectErr bool
expectVals []int64
expectParams []int64
expectABCI []int64
}{
"error on pruning from 0": {100, 0, 5, true, nil, nil, nil},
"error when from > to": {100, 3, 2, true, nil, nil, nil},
"error when from == to": {100, 3, 3, true, nil, nil, nil},
"error when to does not exist": {100, 1, 101, true, nil, nil, nil},
"prune all": {100, 1, 100, false, []int64{93, 100}, []int64{95, 100}, []int64{100}},
"prune some": {10, 2, 8, false, []int64{1, 3, 8, 9, 10},
[]int64{1, 5, 8, 9, 10}, []int64{1, 8, 9, 10}},
"prune across checkpoint": {100001, 1, 100001, false, []int64{99993, 100000, 100001},
[]int64{99995, 100001}, []int64{100001}},
"error when prune height is 0": {100, 0, true, nil, nil, nil},
"error when prune height is negative": {100, -10, true, nil, nil, nil},
"error when prune height does not exist": {100, 101, true, nil, nil, nil},
"prune all": {100, 100, false, []int64{93, 100}, []int64{95, 100}, []int64{100}},
"prune some": {10, 8, false, []int64{3, 8, 9, 10},
[]int64{5, 8, 9, 10}, []int64{8, 9, 10}},
"prune across checkpoint": {100002, 100002, false, []int64{100000, 100002},
[]int64{99995, 100002}, []int64{100002}},
}
for name, tc := range testcases {
tc := tc
@@ -158,7 +231,7 @@ }
}
// Test assertions
err := stateStore.PruneStates(tc.pruneFrom, tc.pruneTo)
err := stateStore.PruneStates(tc.pruneHeight)
if tc.expectErr {
require.Error(t, err)
return
@@ -182,7 +255,7 @@ params, err := stateStore.LoadConsensusParams(h)
params, err := stateStore.LoadConsensusParams(h)
if expectParams[h] {
require.NoError(t, err, "params height %v", h)
require.False(t, params.Equal(&tmproto.ConsensusParams{}))
require.False(t, params.Equal(&tmproto.ConsensusParams{}), "params should not be empty")
} else {
require.Error(t, err, "params height %v", h)
}
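
Concretely, for the "prune some" row in the table above (10 heights saved, prune height 8), the new single-argument call behaves like this; a minimal sketch assuming a store seeded through height 10 as in the test:

    err := stateStore.PruneStates(8)
    require.NoError(t, err)
    _, err = stateStore.LoadValidators(9) // 8, 9 and 10 are retained
    require.NoError(t, err)
    _, err = stateStore.LoadValidators(2) // below the retain height (and not a kept target): pruned
    require.Error(t, err)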


test/maverick/consensus/state.go (+2 -1)

@@ -1559,7 +1559,8 @@ func (cs *State) pruneBlocks(retainHeight int64) (uint64, error) {
if err != nil {
return 0, fmt.Errorf("failed to prune block store: %w", err)
}
- err = cs.blockExec.Store().PruneStates(base, retainHeight)
+ err = cs.blockExec.Store().PruneStates(retainHeight)
if err != nil {
return 0, fmt.Errorf("failed to prune state database: %w", err)
}

