Browse Source

store: fix deadlock in pruning (#6007)

pull/6031/head
Callum Waters 3 years ago
committed by GitHub
parent
commit
90d3f56797
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 334 additions and 236 deletions
  1. +72
    -36
      evidence/pool.go
  2. +21
    -4
      evidence/pool_test.go
  3. +26
    -26
      light/store/db/db.go
  4. +0
    -5
      state/state.go
  5. +103
    -68
      state/store.go
  6. +45
    -43
      state/store_test.go
  7. +67
    -54
      store/store.go

+ 72
- 36
evidence/pool.go View File

@ -23,8 +23,8 @@ import (
const (
// prefixes are unique across all tm db's
prefixCommitted = int64(8)
prefixPending = int64(9)
prefixCommitted = int64(9)
prefixPending = int64(10)
)
// Pool maintains a pool of valid evidence to be broadcasted and committed
@ -132,7 +132,7 @@ func (evpool *Pool) Update(state sm.State, ev types.EvidenceList) {
evpool.updateState(state)
// move committed evidence out from the pending pool and into the committed pool
evpool.markEvidenceAsCommitted(ev)
evpool.markEvidenceAsCommitted(ev, state.LastBlockHeight)
// Prune pending evidence when it has expired. This also updates when the next
// evidence will expire.
@ -386,23 +386,18 @@ func (evpool *Pool) addPendingEvidence(ev types.Evidence) error {
return nil
}
func (evpool *Pool) removePendingEvidence(evidence types.Evidence) {
key := keyPending(evidence)
if err := evpool.evidenceStore.Delete(key); err != nil {
evpool.logger.Error("failed to delete pending evidence", "err", err)
} else {
atomic.AddUint32(&evpool.evidenceSize, ^uint32(0))
evpool.logger.Debug("deleted pending evidence", "evidence", evidence)
}
}
// markEvidenceAsCommitted processes all the evidence in the block, marking it as
// committed and removing it from the pending database.
func (evpool *Pool) markEvidenceAsCommitted(evidence types.EvidenceList) {
func (evpool *Pool) markEvidenceAsCommitted(evidence types.EvidenceList, height int64) {
blockEvidenceMap := make(map[string]struct{}, len(evidence))
batch := evpool.evidenceStore.NewBatch()
defer batch.Close()
for _, ev := range evidence {
if evpool.isPending(ev) {
evpool.removePendingEvidence(ev)
if err := batch.Delete(keyPending(ev)); err != nil {
evpool.logger.Error("failed to batch pending evidence", "err", err)
}
blockEvidenceMap[evMapKey(ev)] = struct{}{}
}
@ -410,7 +405,7 @@ func (evpool *Pool) markEvidenceAsCommitted(evidence types.EvidenceList) {
// we only need to record the height that it was saved at.
key := keyCommitted(ev)
h := gogotypes.Int64Value{Value: ev.Height()}
h := gogotypes.Int64Value{Value: height}
evBytes, err := proto.Marshal(&h)
if err != nil {
evpool.logger.Error("failed to marshal committed evidence", "key(height/hash)", key, "err", err)
@ -424,10 +419,22 @@ func (evpool *Pool) markEvidenceAsCommitted(evidence types.EvidenceList) {
evpool.logger.Debug("marked evidence as committed", "evidence", ev)
}
// remove committed evidence from the clist
if len(blockEvidenceMap) != 0 {
evpool.removeEvidenceFromList(blockEvidenceMap)
// check if we need to remove any pending evidence
if len(blockEvidenceMap) == 0 {
return
}
// remove committed evidence from pending bucket
if err := batch.WriteSync(); err != nil {
evpool.logger.Error("failed to batch delete pending evidence", "err", err)
return
}
// remove committed evidence from the clist
evpool.removeEvidenceFromList(blockEvidenceMap)
// update the evidence size
atomic.AddUint32(&evpool.evidenceSize, ^uint32(len(blockEvidenceMap)-1))
}
// listEvidence retrieves lists evidence from oldest to newest within maxBytes.
@ -481,44 +488,73 @@ func (evpool *Pool) listEvidence(prefixKey int64, maxBytes int64) ([]types.Evide
}
func (evpool *Pool) removeExpiredPendingEvidence() (int64, time.Time) {
iter, err := dbm.IteratePrefix(evpool.evidenceStore, prefixToBytes(prefixPending))
if err != nil {
evpool.logger.Error("failed to iterate over pending evidence", "err", err)
batch := evpool.evidenceStore.NewBatch()
defer batch.Close()
height, time, blockEvidenceMap := evpool.batchExpiredPendingEvidence(batch)
// if we haven't removed any evidence then return early
if len(blockEvidenceMap) == 0 {
return height, time
}
evpool.logger.Debug("removing expired evidence",
"height", evpool.State().LastBlockHeight,
"time", evpool.State().LastBlockTime,
"expired evidence", len(blockEvidenceMap),
)
// remove expired evidence from pending bucket
if err := batch.WriteSync(); err != nil {
evpool.logger.Error("failed to batch delete pending evidence", "err", err)
return evpool.State().LastBlockHeight, evpool.State().LastBlockTime
}
defer iter.Close()
// remove evidence from the clist
evpool.removeEvidenceFromList(blockEvidenceMap)
// update the evidence size
atomic.AddUint32(&evpool.evidenceSize, ^uint32(len(blockEvidenceMap)-1))
return height, time
}
func (evpool *Pool) batchExpiredPendingEvidence(batch dbm.Batch) (int64, time.Time, map[string]struct{}) {
blockEvidenceMap := make(map[string]struct{})
iter, err := dbm.IteratePrefix(evpool.evidenceStore, prefixToBytes(prefixPending))
if err != nil {
evpool.logger.Error("failed to iterate over pending evidence", "err", err)
return evpool.State().LastBlockHeight, evpool.State().LastBlockTime, blockEvidenceMap
}
defer iter.Close()
for ; iter.Valid(); iter.Next() {
ev, err := bytesToEv(iter.Value())
if err != nil {
evpool.logger.Error("failed to transition evidence from protobuf", "err", err)
evpool.logger.Error("failed to transition evidence from protobuf", "err", err, "ev", ev)
continue
}
// if true, we have looped through all expired evidence
if !evpool.isExpired(ev.Height(), ev.Time()) {
if len(blockEvidenceMap) != 0 {
evpool.removeEvidenceFromList(blockEvidenceMap)
}
// Return the height and time with which this evidence will have expired
// so we know when to prune next.
return ev.Height() + evpool.State().ConsensusParams.Evidence.MaxAgeNumBlocks + 1,
ev.Time().Add(evpool.State().ConsensusParams.Evidence.MaxAgeDuration).Add(time.Second)
ev.Time().Add(evpool.State().ConsensusParams.Evidence.MaxAgeDuration).Add(time.Second),
blockEvidenceMap
}
evpool.removePendingEvidence(ev)
blockEvidenceMap[evMapKey(ev)] = struct{}{}
}
// else add to the batch
if err := batch.Delete(iter.Key()); err != nil {
evpool.logger.Error("failed to batch evidence", "err", err, "ev", ev)
continue
}
// we either have no pending evidence or all evidence has expired
if len(blockEvidenceMap) != 0 {
evpool.removeEvidenceFromList(blockEvidenceMap)
// and add to the map to remove the evidence from the clist
blockEvidenceMap[evMapKey(ev)] = struct{}{}
}
return evpool.State().LastBlockHeight, evpool.State().LastBlockTime
return evpool.State().LastBlockHeight, evpool.State().LastBlockTime, blockEvidenceMap
}
func (evpool *Pool) removeEvidenceFromList(


+ 21
- 4
evidence/pool_test.go View File

@ -172,7 +172,7 @@ func TestEvidencePoolUpdate(t *testing.T) {
pool, val := defaultTestPool(t, height)
state := pool.State()
// create new block (no need to save it to blockStore)
// create two lots of old evidence that we expect to be pruned when we update
prunedEv := types.NewMockDuplicateVoteEvidenceWithValidator(
1,
defaultEvidenceTime.Add(1*time.Minute),
@ -180,7 +180,15 @@ func TestEvidencePoolUpdate(t *testing.T) {
evidenceChainID,
)
notPrunedEv := types.NewMockDuplicateVoteEvidenceWithValidator(
2,
defaultEvidenceTime.Add(2*time.Minute),
val,
evidenceChainID,
)
require.NoError(t, pool.AddEvidence(prunedEv))
require.NoError(t, pool.AddEvidence(notPrunedEv))
ev := types.NewMockDuplicateVoteEvidenceWithValidator(
height,
@ -195,14 +203,23 @@ func TestEvidencePoolUpdate(t *testing.T) {
state.LastBlockHeight = height + 1
state.LastBlockTime = defaultEvidenceTime.Add(22 * time.Minute)
evList, _ := pool.PendingEvidence(2 * defaultEvidenceMaxBytes)
require.Equal(t, 2, len(evList))
require.Equal(t, uint32(2), pool.Size())
require.NoError(t, pool.CheckEvidence(types.EvidenceList{ev}))
evList, _ = pool.PendingEvidence(3 * defaultEvidenceMaxBytes)
require.Equal(t, 3, len(evList))
require.Equal(t, uint32(3), pool.Size())
pool.Update(state, block.Evidence.Evidence)
// a) Update marks evidence as committed so pending evidence should be empty
evList, evSize := pool.PendingEvidence(defaultEvidenceMaxBytes)
require.Empty(t, evList)
require.Zero(t, evSize)
evList, _ = pool.PendingEvidence(defaultEvidenceMaxBytes)
require.Equal(t, []types.Evidence{notPrunedEv}, evList)
// b) If we try to check this evidence again it should fail because it has already been committed
err := pool.CheckEvidence(types.EvidenceList{ev})


+ 26
- 26
light/store/db/db.go View File

@ -14,8 +14,8 @@ import (
)
const (
prefixLightBlock = int64(0x0a)
prefixSize = int64(0x0b)
prefixLightBlock = int64(11)
prefixSize = int64(12)
)
type dbs struct {
@ -230,27 +230,11 @@ func (s *dbs) Prune(size uint16) error {
}
numToPrune := sSize - size
// 2) Iterate over headers and perform a batch operation.
itr, err := s.db.Iterator(
s.lbKey(1),
append(s.lbKey(1<<63-1), byte(0x00)),
)
if err != nil {
panic(err)
}
defer itr.Close()
b := s.db.NewBatch()
defer b.Close()
for itr.Valid() && numToPrune > 0 {
if err = b.Delete(itr.Key()); err != nil {
return err
}
itr.Next()
numToPrune--
}
if err = itr.Error(); err != nil {
// 2) use an iterator to batch together all the blocks that need to be deleted
if err := s.batchDelete(b, numToPrune); err != nil {
return err
}
@ -261,12 +245,7 @@ func (s *dbs) Prune(size uint16) error {
}
// 4) write batch deletion to disk
err = b.WriteSync()
if err != nil {
return err
}
return nil
return b.WriteSync()
}
// Size returns the number of header & validator set pairs.
@ -278,6 +257,27 @@ func (s *dbs) Size() uint16 {
return s.size
}
func (s *dbs) batchDelete(batch dbm.Batch, numToPrune uint16) error {
itr, err := s.db.Iterator(
s.lbKey(1),
append(s.lbKey(1<<63-1), byte(0x00)),
)
if err != nil {
return err
}
defer itr.Close()
for itr.Valid() && numToPrune > 0 {
if err = batch.Delete(itr.Key()); err != nil {
return err
}
itr.Next()
numToPrune--
}
return itr.Error()
}
func (s *dbs) sizeKey() []byte {
key, err := orderedcode.Append(nil, prefixSize)
if err != nil {


+ 0
- 5
state/state.go View File

@ -16,11 +16,6 @@ import (
"github.com/tendermint/tendermint/version"
)
// database keys
var (
stateKey = []byte("stateKey")
)
//-----------------------------------------------------------------------------
type Version struct {


+ 103
- 68
state/store.go View File

@ -32,6 +32,7 @@ const (
prefixValidators = int64(5)
prefixConsensusParams = int64(6)
prefixABCIResponses = int64(7)
prefixState = int64(8)
)
func encodeKey(prefix int64, height int64) []byte {
@ -54,6 +55,17 @@ func abciResponsesKey(height int64) []byte {
return encodeKey(prefixABCIResponses, height)
}
// stateKey should never change after being set in init()
var stateKey []byte
func init() {
var err error
stateKey, err = orderedcode.Append(nil, prefixState)
if err != nil {
panic(err)
}
}
//----------------------
//go:generate mockery --case underscore --name Store
@ -239,11 +251,16 @@ func (store dbStore) PruneStates(retainHeight int64) error {
return fmt.Errorf("height %v must be greater than 0", retainHeight)
}
if err := store.pruneValidatorSets(retainHeight); err != nil {
// NOTE: We need to prune consensus params first because the validator
// sets have always one extra height. If validator sets were pruned first
// we could get a situation where we prune up to the last validator set
// yet don't have the respective consensus params at that height and thus
// return an error
if err := store.pruneConsensusParams(retainHeight); err != nil {
return err
}
if err := store.pruneConsensusParams(retainHeight); err != nil {
if err := store.pruneValidatorSets(retainHeight); err != nil {
return err
}
@ -257,37 +274,48 @@ func (store dbStore) PruneStates(retainHeight int64) error {
// pruneValidatorSets calls a reverse iterator from base height to retain height (exclusive), deleting
// all validator sets in between. Due to the fact that most validator sets stored reference an earlier
// validator set, it is likely that there will remain one validator set left after pruning.
func (store dbStore) pruneValidatorSets(height int64) error {
valInfo, err := loadValidatorsInfo(store.db, height)
func (store dbStore) pruneValidatorSets(retainHeight int64) error {
valInfo, err := loadValidatorsInfo(store.db, retainHeight)
if err != nil {
return fmt.Errorf("validators at height %v not found: %w", height, err)
return fmt.Errorf("validators at height %v not found: %w", retainHeight, err)
}
// We will prune up to the validator set at the given "height". As we don't save validator sets every
// height but only when they change or at a check point, it is likely that the validator set at the height
// we prune to is empty and thus dependent on the validator set saved at a previous height. We must find
// that validator set and make sure it is not pruned.
lastRecordedValSetHeight := lastStoredHeightFor(height, valInfo.LastHeightChanged)
lastRecordedValSetHeight := lastStoredHeightFor(retainHeight, valInfo.LastHeightChanged)
lastRecordedValSet, err := loadValidatorsInfo(store.db, lastRecordedValSetHeight)
if err != nil || lastRecordedValSet.ValidatorSet == nil {
return fmt.Errorf("couldn't find validators at height %d (height %d was originally requested): %w",
lastStoredHeightFor(height, valInfo.LastHeightChanged),
height,
lastStoredHeightFor(retainHeight, valInfo.LastHeightChanged),
retainHeight,
err,
)
}
// batch delete all the validators sets up to height
return store.batchDelete(
// if this is not equal to the retain height, prune from the retain height to the height above
// the last saved validator set. This way we can skip over the dependent validator set.
if lastRecordedValSetHeight < retainHeight {
err := store.pruneRange(
validatorsKey(lastRecordedValSetHeight+1),
validatorsKey(retainHeight),
)
if err != nil {
return err
}
}
// prune all the validators sets up to last saved validator set
return store.pruneRange(
validatorsKey(1),
validatorsKey(height),
validatorsKey(lastRecordedValSetHeight),
)
}
// pruneConsensusParams calls a reverse iterator from base height to retain height batch deleting
// all consensus params in between. If the consensus params at the new base height is dependent
// on a prior height then this will keep that lower height to.
// on a prior height then this will keep that lower height too.
func (store dbStore) pruneConsensusParams(retainHeight int64) error {
paramsInfo, err := store.loadConsensusParamsInfo(retainHeight)
if err != nil {
@ -298,21 +326,31 @@ func (store dbStore) pruneConsensusParams(retainHeight int64) error {
// we must not prune (or save) the last consensus params that the consensus params info at height
// is dependent on.
if paramsInfo.ConsensusParams.Equal(&tmproto.ConsensusParams{}) {
// sanity check that the consensus params at the last height it was changed is there
lastRecordedConsensusParams, err := store.loadConsensusParamsInfo(paramsInfo.LastHeightChanged)
if err != nil || lastRecordedConsensusParams.ConsensusParams.Equal(&tmproto.ConsensusParams{}) {
return fmt.Errorf(
"couldn't find consensus params at height %d as last changed from height %d: %w",
"couldn't find consensus params at height %d (height %d was originally requested): %w",
paramsInfo.LastHeightChanged,
retainHeight,
err,
)
}
// prune the params above the height with which it last changed and below the retain height.
err = store.pruneRange(
consensusParamsKey(paramsInfo.LastHeightChanged+1),
consensusParamsKey(retainHeight),
)
if err != nil {
return err
}
}
// batch delete all the consensus params up to the retain height
return store.batchDelete(
// prune all the consensus params up to either the last height the params changed or if the params
// last changed at the retain height, then up to the retain height.
return store.pruneRange(
consensusParamsKey(1),
consensusParamsKey(retainHeight),
consensusParamsKey(paramsInfo.LastHeightChanged),
)
}
@ -320,72 +358,69 @@ func (store dbStore) pruneConsensusParams(retainHeight int64) error {
// pruneABCIResponses calls a reverse iterator from base height to retain height batch deleting
// all abci responses in between
func (store dbStore) pruneABCIResponses(height int64) error {
return store.batchDelete(abciResponsesKey(1), abciResponsesKey(height), nil)
return store.pruneRange(abciResponsesKey(1), abciResponsesKey(height))
}
// batchDelete is a generic function for deleting a range of keys in reverse order. It will
// skip keys that have been
func (store dbStore) batchDelete(start []byte, end []byte, exception []byte) error {
iter, err := store.db.ReverseIterator(start, end)
if err != nil {
return fmt.Errorf("iterator error: %w", err)
}
defer iter.Close()
// pruneRange is a generic function for deleting a range of keys in reverse order.
// we keep filling up batches of at most 1000 keys, perform a deletion and continue until
// we have gone through all of keys in the range. This avoids doing any writes whilst
// iterating.
func (store dbStore) pruneRange(start []byte, end []byte) error {
var err error
batch := store.db.NewBatch()
defer batch.Close()
pruned := 0
for iter.Valid() {
key := iter.Key()
if bytes.Equal(key, exception) {
iter.Next()
continue
end, err = store.reverseBatchDelete(batch, start, end)
if err != nil {
return err
}
// iterate until the last batch of the pruning range in which case we will perform a
// write sync
for !bytes.Equal(start, end) {
if err := batch.Write(); err != nil {
return err
}
if err := batch.Delete(key); err != nil {
return fmt.Errorf("pruning error at key %X: %w", key, err)
if err := batch.Close(); err != nil {
return err
}
pruned++
// avoid batches growing too large by flushing to disk regularly
if pruned%1000 == 0 {
if err := iter.Error(); err != nil {
return err
}
if err := iter.Close(); err != nil {
return err
}
if err := batch.Write(); err != nil {
return fmt.Errorf("pruning error at key %X: %w", key, err)
}
if err := batch.Close(); err != nil {
return err
}
iter, err = store.db.ReverseIterator(start, end)
if err != nil {
return fmt.Errorf("iterator error: %w", err)
}
defer iter.Close()
batch = store.db.NewBatch()
defer batch.Close()
} else {
iter.Next()
batch = store.db.NewBatch()
// fill a new batch of keys for deletion over the remainding range
end, err = store.reverseBatchDelete(batch, start, end)
if err != nil {
return err
}
}
if err := iter.Error(); err != nil {
return fmt.Errorf("iterator error: %w", err)
}
return batch.WriteSync()
}
if err := batch.WriteSync(); err != nil {
return err
// reverseBatchDelete runs a reverse iterator (from end to start) filling up a batch until either
// (a) the iterator reaches the start or (b) the iterator has added a 1000 keys (this avoids the
// batch from growing too large)
func (store dbStore) reverseBatchDelete(batch dbm.Batch, start, end []byte) ([]byte, error) {
iter, err := store.db.ReverseIterator(start, end)
if err != nil {
return end, fmt.Errorf("iterator error: %w", err)
}
defer iter.Close()
return nil
size := 0
for ; iter.Valid(); iter.Next() {
if err := batch.Delete(iter.Key()); err != nil {
return end, fmt.Errorf("pruning error at key %X: %w", iter.Key(), err)
}
// avoid batches growing too large by capping them
size++
if size == 1000 {
return iter.Key(), iter.Error()
}
}
return start, iter.Error()
}
//------------------------------------------------------------------------
@ -584,7 +619,7 @@ func (store dbStore) LoadConsensusParams(height int64) (types.ConsensusParams, e
paramsInfo2, err := store.loadConsensusParamsInfo(paramsInfo.LastHeightChanged)
if err != nil {
return empty, fmt.Errorf(
"couldn't find consensus params at height %d as last changed from height %d: %w",
"couldn't find consensus params at height %d (height %d was originally requested): %w",
paramsInfo.LastHeightChanged,
height,
err,


+ 45
- 43
state/store_test.go View File

@ -159,26 +159,28 @@ func TestStoreLoadConsensusParams(t *testing.T) {
func TestPruneStates(t *testing.T) {
testcases := map[string]struct {
makeHeights int64
pruneHeight int64
expectErr bool
expectVals []int64
expectParams []int64
expectABCI []int64
startHeight int64
endHeight int64
pruneHeight int64
expectErr bool
remainingValSetHeight int64
remainingParamsHeight int64
}{
"error when prune height is 0": {100, 0, true, nil, nil, nil},
"error when prune height is negative": {100, -10, true, nil, nil, nil},
"error when prune height does not exist": {100, 101, true, nil, nil, nil},
"prune all": {100, 100, false, []int64{93, 100}, []int64{95, 100}, []int64{100}},
"prune some": {10, 8, false, []int64{3, 8, 9, 10},
[]int64{5, 8, 9, 10}, []int64{8, 9, 10}},
"prune across checkpoint": {100002, 100002, false, []int64{100000, 100002},
[]int64{99995, 100002}, []int64{100002}},
"error when prune height is 0": {1, 100, 0, true, 0, 0},
"error when prune height is negative": {1, 100, -10, true, 0, 0},
"error when prune height does not exist": {1, 100, 101, true, 0, 0},
"prune all": {1, 100, 100, false, 93, 95},
"prune from non 1 height": {10, 50, 40, false, 33, 35},
"prune some": {1, 10, 8, false, 3, 5},
// we test this because we flush to disk every 1000 "states"
"prune more than 1000 state": {1, 1010, 1010, false, 1003, 1005},
"prune across checkpoint": {99900, 100002, 100002, false, 100000, 99995},
}
for name, tc := range testcases {
tc := tc
t.Run(name, func(t *testing.T) {
db := dbm.NewMemDB()
stateStore := sm.NewStore(db)
pk := ed25519.GenPrivKey().PubKey()
@ -192,7 +194,7 @@ func TestPruneStates(t *testing.T) {
valsChanged := int64(0)
paramsChanged := int64(0)
for h := int64(1); h <= tc.makeHeights; h++ {
for h := tc.startHeight; h <= tc.endHeight; h++ {
if valsChanged == 0 || h%10 == 2 {
valsChanged = h + 1 // Have to add 1, since NextValidators is what's stored
}
@ -237,36 +239,44 @@ func TestPruneStates(t *testing.T) {
}
require.NoError(t, err)
expectVals := sliceToMap(tc.expectVals)
expectParams := sliceToMap(tc.expectParams)
expectABCI := sliceToMap(tc.expectABCI)
for h := tc.pruneHeight; h <= tc.endHeight; h++ {
vals, err := stateStore.LoadValidators(h)
require.NoError(t, err, h)
require.NotNil(t, vals, h)
params, err := stateStore.LoadConsensusParams(h)
require.NoError(t, err, h)
require.NotNil(t, params, h)
abci, err := stateStore.LoadABCIResponses(h)
require.NoError(t, err, h)
require.NotNil(t, abci, h)
}
for h := int64(1); h <= tc.makeHeights; h++ {
emptyParams := types.ConsensusParams{}
for h := tc.startHeight; h < tc.pruneHeight; h++ {
vals, err := stateStore.LoadValidators(h)
if expectVals[h] {
require.NoError(t, err, "validators height %v", h)
require.NotNil(t, vals)
if h == tc.remainingValSetHeight {
require.NoError(t, err, h)
require.NotNil(t, vals, h)
} else {
require.Error(t, err, "validators height %v", h)
require.Equal(t, sm.ErrNoValSetForHeight{Height: h}, err)
require.Error(t, err, h)
require.Nil(t, vals, h)
}
params, err := stateStore.LoadConsensusParams(h)
if expectParams[h] {
require.NoError(t, err, "params height %v", h)
require.False(t, params.Equals(&types.ConsensusParams{}), "params should not be empty")
if h == tc.remainingParamsHeight {
require.NoError(t, err, h)
require.NotEqual(t, emptyParams, params, h)
} else {
require.Error(t, err, "params height %v", h)
require.Error(t, err, h)
require.Equal(t, emptyParams, params, h)
}
abci, err := stateStore.LoadABCIResponses(h)
if expectABCI[h] {
require.NoError(t, err, "abci height %v", h)
require.NotNil(t, abci)
} else {
require.Error(t, err, "abci height %v", h)
require.Equal(t, sm.ErrNoABCIResponsesForHeight{Height: h}, err)
}
require.Error(t, err, h)
require.Nil(t, abci, h)
}
})
}
@ -293,11 +303,3 @@ func TestABCIResponsesResultsHash(t *testing.T) {
require.NoError(t, err)
assert.NoError(t, proof.Verify(root, bz))
}
func sliceToMap(s []int64) map[int64]bool {
m := make(map[int64]bool, len(s))
for _, i := range s {
m[i] = true
}
return m
}

+ 67
- 54
store/store.go View File

@ -1,6 +1,7 @@
package store
import (
"bytes"
"fmt"
"strconv"
@ -315,99 +316,111 @@ func (bs *BlockStore) PruneBlocks(height int64) (uint64, error) {
// remove block meta first as this is used to indicate whether the block exists.
// For this reason, we also use ony block meta as a measure of the amount of blocks pruned
pruned, err := bs.batchDelete(blockMetaKey(0), blockMetaKey(height), removeBlockHash)
pruned, err := bs.pruneRange(blockMetaKey(0), blockMetaKey(height), removeBlockHash)
if err != nil {
return pruned, err
}
if _, err := bs.batchDelete(blockPartKey(0, 0), blockPartKey(height, 0), nil); err != nil {
if _, err := bs.pruneRange(blockPartKey(0, 0), blockPartKey(height, 0), nil); err != nil {
return pruned, err
}
if _, err := bs.batchDelete(blockCommitKey(0), blockCommitKey(height), nil); err != nil {
if _, err := bs.pruneRange(blockCommitKey(0), blockCommitKey(height), nil); err != nil {
return pruned, err
}
if _, err := bs.batchDelete(seenCommitKey(0), seenCommitKey(height), nil); err != nil {
if _, err := bs.pruneRange(seenCommitKey(0), seenCommitKey(height), nil); err != nil {
return pruned, err
}
return pruned, nil
}
// batchDelete is a generic function for deleting a range of values based on the lowest
// pruneRange is a generic function for deleting a range of values based on the lowest
// height up to but excluding retainHeight. For each key/value pair, an optional hook can be
// executed before the deletion itself is made
func (bs *BlockStore) batchDelete(
// executed before the deletion itself is made. pruneRange will use batch delete to delete
// keys in batches of at most 1000 keys.
func (bs *BlockStore) pruneRange(
start []byte,
end []byte,
preDeletionHook func(key, value []byte, batch dbm.Batch) error,
) (uint64, error) {
var (
err error
pruned uint64
totalPruned uint64 = 0
)
batch := bs.db.NewBatch()
defer batch.Close()
pruned, start, err = bs.batchDelete(batch, start, end, preDeletionHook)
if err != nil {
return totalPruned, err
}
// loop until we have finished iterating over all the keys by writing, opening a new batch
// and incrementing through the next range of keys.
for !bytes.Equal(start, end) {
if err := batch.Write(); err != nil {
return totalPruned, err
}
totalPruned += pruned
if err := batch.Close(); err != nil {
return totalPruned, err
}
batch = bs.db.NewBatch()
pruned, start, err = bs.batchDelete(batch, start, end, preDeletionHook)
if err != nil {
return totalPruned, err
}
}
// once we looped over all keys we do a final flush to disk
if err := batch.WriteSync(); err != nil {
return totalPruned, err
}
totalPruned += pruned
return totalPruned, nil
}
// batchDelete runs an iterator over a set of keys, first preforming a pre deletion hook before adding it to the batch.
// The function ends when either 1000 keys have been added to the batch or the iterator has reached the end.
func (bs *BlockStore) batchDelete(
batch dbm.Batch,
start, end []byte,
preDeletionHook func(key, value []byte, batch dbm.Batch) error,
) (uint64, []byte, error) {
var pruned uint64 = 0
iter, err := bs.db.Iterator(start, end)
if err != nil {
panic(err)
return pruned, start, err
}
defer iter.Close()
batch := bs.db.NewBatch()
defer batch.Close()
pruned := uint64(0)
flushed := pruned
for iter.Valid() {
for ; iter.Valid(); iter.Next() {
key := iter.Key()
if preDeletionHook != nil {
if err := preDeletionHook(key, iter.Value(), batch); err != nil {
return flushed, err
return 0, start, fmt.Errorf("pruning error at key %X: %w", iter.Key(), err)
}
}
if err := batch.Delete(key); err != nil {
return flushed, fmt.Errorf("pruning error at key %X: %w", iter.Key(), err)
return 0, start, fmt.Errorf("pruning error at key %X: %w", iter.Key(), err)
}
pruned++
// avoid batches growing too large by flushing to database regularly
if pruned%1000 == 0 {
if err := iter.Error(); err != nil {
return flushed, err
}
if err := iter.Close(); err != nil {
return flushed, err
}
err := batch.Write()
if err != nil {
return flushed, fmt.Errorf("pruning error at key %X: %w", iter.Key(), err)
}
if err := batch.Close(); err != nil {
return flushed, err
}
flushed = pruned
iter, err = bs.db.Iterator(start, end)
if err != nil {
panic(err)
}
defer iter.Close()
batch = bs.db.NewBatch()
defer batch.Close()
} else {
iter.Next()
if pruned == 1000 {
return pruned, iter.Key(), iter.Error()
}
}
flushed = pruned
if err := iter.Error(); err != nil {
return flushed, err
}
err = batch.WriteSync()
if err != nil {
return flushed, fmt.Errorf("pruning error at key %X: %w", iter.Key(), err)
}
return flushed, nil
return pruned, end, iter.Error()
}
// SaveBlock persists the given block, blockParts, and seenCommit to the underlying db.


Loading…
Cancel
Save