diff --git a/evidence/pool.go b/evidence/pool.go index 34c267c0c..bc09ea94e 100644 --- a/evidence/pool.go +++ b/evidence/pool.go @@ -23,8 +23,8 @@ import ( const ( // prefixes are unique across all tm db's - prefixCommitted = int64(8) - prefixPending = int64(9) + prefixCommitted = int64(9) + prefixPending = int64(10) ) // Pool maintains a pool of valid evidence to be broadcasted and committed @@ -132,7 +132,7 @@ func (evpool *Pool) Update(state sm.State, ev types.EvidenceList) { evpool.updateState(state) // move committed evidence out from the pending pool and into the committed pool - evpool.markEvidenceAsCommitted(ev) + evpool.markEvidenceAsCommitted(ev, state.LastBlockHeight) // Prune pending evidence when it has expired. This also updates when the next // evidence will expire. @@ -386,23 +386,18 @@ func (evpool *Pool) addPendingEvidence(ev types.Evidence) error { return nil } -func (evpool *Pool) removePendingEvidence(evidence types.Evidence) { - key := keyPending(evidence) - if err := evpool.evidenceStore.Delete(key); err != nil { - evpool.logger.Error("failed to delete pending evidence", "err", err) - } else { - atomic.AddUint32(&evpool.evidenceSize, ^uint32(0)) - evpool.logger.Debug("deleted pending evidence", "evidence", evidence) - } -} - // markEvidenceAsCommitted processes all the evidence in the block, marking it as // committed and removing it from the pending database. -func (evpool *Pool) markEvidenceAsCommitted(evidence types.EvidenceList) { +func (evpool *Pool) markEvidenceAsCommitted(evidence types.EvidenceList, height int64) { blockEvidenceMap := make(map[string]struct{}, len(evidence)) + batch := evpool.evidenceStore.NewBatch() + defer batch.Close() + for _, ev := range evidence { if evpool.isPending(ev) { - evpool.removePendingEvidence(ev) + if err := batch.Delete(keyPending(ev)); err != nil { + evpool.logger.Error("failed to batch pending evidence", "err", err) + } blockEvidenceMap[evMapKey(ev)] = struct{}{} } @@ -410,7 +405,7 @@ func (evpool *Pool) markEvidenceAsCommitted(evidence types.EvidenceList) { // we only need to record the height that it was saved at. key := keyCommitted(ev) - h := gogotypes.Int64Value{Value: ev.Height()} + h := gogotypes.Int64Value{Value: height} evBytes, err := proto.Marshal(&h) if err != nil { evpool.logger.Error("failed to marshal committed evidence", "key(height/hash)", key, "err", err) @@ -424,10 +419,22 @@ func (evpool *Pool) markEvidenceAsCommitted(evidence types.EvidenceList) { evpool.logger.Debug("marked evidence as committed", "evidence", ev) } - // remove committed evidence from the clist - if len(blockEvidenceMap) != 0 { - evpool.removeEvidenceFromList(blockEvidenceMap) + // check if we need to remove any pending evidence + if len(blockEvidenceMap) == 0 { + return } + + // remove committed evidence from pending bucket + if err := batch.WriteSync(); err != nil { + evpool.logger.Error("failed to batch delete pending evidence", "err", err) + return + } + + // remove committed evidence from the clist + evpool.removeEvidenceFromList(blockEvidenceMap) + + // update the evidence size + atomic.AddUint32(&evpool.evidenceSize, ^uint32(len(blockEvidenceMap)-1)) } // listEvidence retrieves lists evidence from oldest to newest within maxBytes. @@ -481,44 +488,73 @@ func (evpool *Pool) listEvidence(prefixKey int64, maxBytes int64) ([]types.Evide } func (evpool *Pool) removeExpiredPendingEvidence() (int64, time.Time) { - iter, err := dbm.IteratePrefix(evpool.evidenceStore, prefixToBytes(prefixPending)) - if err != nil { - evpool.logger.Error("failed to iterate over pending evidence", "err", err) + batch := evpool.evidenceStore.NewBatch() + defer batch.Close() + + height, time, blockEvidenceMap := evpool.batchExpiredPendingEvidence(batch) + + // if we haven't removed any evidence then return early + if len(blockEvidenceMap) == 0 { + return height, time + } + + evpool.logger.Debug("removing expired evidence", + "height", evpool.State().LastBlockHeight, + "time", evpool.State().LastBlockTime, + "expired evidence", len(blockEvidenceMap), + ) + + // remove expired evidence from pending bucket + if err := batch.WriteSync(); err != nil { + evpool.logger.Error("failed to batch delete pending evidence", "err", err) return evpool.State().LastBlockHeight, evpool.State().LastBlockTime } - defer iter.Close() + // remove evidence from the clist + evpool.removeEvidenceFromList(blockEvidenceMap) + // update the evidence size + atomic.AddUint32(&evpool.evidenceSize, ^uint32(len(blockEvidenceMap)-1)) + + return height, time +} + +func (evpool *Pool) batchExpiredPendingEvidence(batch dbm.Batch) (int64, time.Time, map[string]struct{}) { blockEvidenceMap := make(map[string]struct{}) + iter, err := dbm.IteratePrefix(evpool.evidenceStore, prefixToBytes(prefixPending)) + if err != nil { + evpool.logger.Error("failed to iterate over pending evidence", "err", err) + return evpool.State().LastBlockHeight, evpool.State().LastBlockTime, blockEvidenceMap + } + defer iter.Close() for ; iter.Valid(); iter.Next() { ev, err := bytesToEv(iter.Value()) if err != nil { - evpool.logger.Error("failed to transition evidence from protobuf", "err", err) + evpool.logger.Error("failed to transition evidence from protobuf", "err", err, "ev", ev) continue } + // if true, we have looped through all expired evidence if !evpool.isExpired(ev.Height(), ev.Time()) { - if len(blockEvidenceMap) != 0 { - evpool.removeEvidenceFromList(blockEvidenceMap) - } - // Return the height and time with which this evidence will have expired // so we know when to prune next. return ev.Height() + evpool.State().ConsensusParams.Evidence.MaxAgeNumBlocks + 1, - ev.Time().Add(evpool.State().ConsensusParams.Evidence.MaxAgeDuration).Add(time.Second) + ev.Time().Add(evpool.State().ConsensusParams.Evidence.MaxAgeDuration).Add(time.Second), + blockEvidenceMap } - evpool.removePendingEvidence(ev) - blockEvidenceMap[evMapKey(ev)] = struct{}{} - } + // else add to the batch + if err := batch.Delete(iter.Key()); err != nil { + evpool.logger.Error("failed to batch evidence", "err", err, "ev", ev) + continue + } - // we either have no pending evidence or all evidence has expired - if len(blockEvidenceMap) != 0 { - evpool.removeEvidenceFromList(blockEvidenceMap) + // and add to the map to remove the evidence from the clist + blockEvidenceMap[evMapKey(ev)] = struct{}{} } - return evpool.State().LastBlockHeight, evpool.State().LastBlockTime + return evpool.State().LastBlockHeight, evpool.State().LastBlockTime, blockEvidenceMap } func (evpool *Pool) removeEvidenceFromList( diff --git a/evidence/pool_test.go b/evidence/pool_test.go index 3d17fef13..522419a08 100644 --- a/evidence/pool_test.go +++ b/evidence/pool_test.go @@ -172,7 +172,7 @@ func TestEvidencePoolUpdate(t *testing.T) { pool, val := defaultTestPool(t, height) state := pool.State() - // create new block (no need to save it to blockStore) + // create two lots of old evidence that we expect to be pruned when we update prunedEv := types.NewMockDuplicateVoteEvidenceWithValidator( 1, defaultEvidenceTime.Add(1*time.Minute), @@ -180,7 +180,15 @@ func TestEvidencePoolUpdate(t *testing.T) { evidenceChainID, ) + notPrunedEv := types.NewMockDuplicateVoteEvidenceWithValidator( + 2, + defaultEvidenceTime.Add(2*time.Minute), + val, + evidenceChainID, + ) + require.NoError(t, pool.AddEvidence(prunedEv)) + require.NoError(t, pool.AddEvidence(notPrunedEv)) ev := types.NewMockDuplicateVoteEvidenceWithValidator( height, @@ -195,14 +203,23 @@ func TestEvidencePoolUpdate(t *testing.T) { state.LastBlockHeight = height + 1 state.LastBlockTime = defaultEvidenceTime.Add(22 * time.Minute) + evList, _ := pool.PendingEvidence(2 * defaultEvidenceMaxBytes) + require.Equal(t, 2, len(evList)) + + require.Equal(t, uint32(2), pool.Size()) + require.NoError(t, pool.CheckEvidence(types.EvidenceList{ev})) + evList, _ = pool.PendingEvidence(3 * defaultEvidenceMaxBytes) + require.Equal(t, 3, len(evList)) + + require.Equal(t, uint32(3), pool.Size()) + pool.Update(state, block.Evidence.Evidence) // a) Update marks evidence as committed so pending evidence should be empty - evList, evSize := pool.PendingEvidence(defaultEvidenceMaxBytes) - require.Empty(t, evList) - require.Zero(t, evSize) + evList, _ = pool.PendingEvidence(defaultEvidenceMaxBytes) + require.Equal(t, []types.Evidence{notPrunedEv}, evList) // b) If we try to check this evidence again it should fail because it has already been committed err := pool.CheckEvidence(types.EvidenceList{ev}) diff --git a/light/store/db/db.go b/light/store/db/db.go index ee3973c8e..9ed121a80 100644 --- a/light/store/db/db.go +++ b/light/store/db/db.go @@ -14,8 +14,8 @@ import ( ) const ( - prefixLightBlock = int64(0x0a) - prefixSize = int64(0x0b) + prefixLightBlock = int64(11) + prefixSize = int64(12) ) type dbs struct { @@ -230,27 +230,11 @@ func (s *dbs) Prune(size uint16) error { } numToPrune := sSize - size - // 2) Iterate over headers and perform a batch operation. - itr, err := s.db.Iterator( - s.lbKey(1), - append(s.lbKey(1<<63-1), byte(0x00)), - ) - if err != nil { - panic(err) - } - defer itr.Close() - b := s.db.NewBatch() defer b.Close() - for itr.Valid() && numToPrune > 0 { - if err = b.Delete(itr.Key()); err != nil { - return err - } - itr.Next() - numToPrune-- - } - if err = itr.Error(); err != nil { + // 2) use an iterator to batch together all the blocks that need to be deleted + if err := s.batchDelete(b, numToPrune); err != nil { return err } @@ -261,12 +245,7 @@ func (s *dbs) Prune(size uint16) error { } // 4) write batch deletion to disk - err = b.WriteSync() - if err != nil { - return err - } - - return nil + return b.WriteSync() } // Size returns the number of header & validator set pairs. @@ -278,6 +257,27 @@ func (s *dbs) Size() uint16 { return s.size } +func (s *dbs) batchDelete(batch dbm.Batch, numToPrune uint16) error { + itr, err := s.db.Iterator( + s.lbKey(1), + append(s.lbKey(1<<63-1), byte(0x00)), + ) + if err != nil { + return err + } + defer itr.Close() + + for itr.Valid() && numToPrune > 0 { + if err = batch.Delete(itr.Key()); err != nil { + return err + } + itr.Next() + numToPrune-- + } + + return itr.Error() +} + func (s *dbs) sizeKey() []byte { key, err := orderedcode.Append(nil, prefixSize) if err != nil { diff --git a/state/state.go b/state/state.go index ffebaf3b4..430dbf88b 100644 --- a/state/state.go +++ b/state/state.go @@ -16,11 +16,6 @@ import ( "github.com/tendermint/tendermint/version" ) -// database keys -var ( - stateKey = []byte("stateKey") -) - //----------------------------------------------------------------------------- type Version struct { diff --git a/state/store.go b/state/store.go index 19ac01daa..4829a8b75 100644 --- a/state/store.go +++ b/state/store.go @@ -32,6 +32,7 @@ const ( prefixValidators = int64(5) prefixConsensusParams = int64(6) prefixABCIResponses = int64(7) + prefixState = int64(8) ) func encodeKey(prefix int64, height int64) []byte { @@ -54,6 +55,17 @@ func abciResponsesKey(height int64) []byte { return encodeKey(prefixABCIResponses, height) } +// stateKey should never change after being set in init() +var stateKey []byte + +func init() { + var err error + stateKey, err = orderedcode.Append(nil, prefixState) + if err != nil { + panic(err) + } +} + //---------------------- //go:generate mockery --case underscore --name Store @@ -239,11 +251,16 @@ func (store dbStore) PruneStates(retainHeight int64) error { return fmt.Errorf("height %v must be greater than 0", retainHeight) } - if err := store.pruneValidatorSets(retainHeight); err != nil { + // NOTE: We need to prune consensus params first because the validator + // sets have always one extra height. If validator sets were pruned first + // we could get a situation where we prune up to the last validator set + // yet don't have the respective consensus params at that height and thus + // return an error + if err := store.pruneConsensusParams(retainHeight); err != nil { return err } - if err := store.pruneConsensusParams(retainHeight); err != nil { + if err := store.pruneValidatorSets(retainHeight); err != nil { return err } @@ -257,37 +274,48 @@ func (store dbStore) PruneStates(retainHeight int64) error { // pruneValidatorSets calls a reverse iterator from base height to retain height (exclusive), deleting // all validator sets in between. Due to the fact that most validator sets stored reference an earlier // validator set, it is likely that there will remain one validator set left after pruning. -func (store dbStore) pruneValidatorSets(height int64) error { - valInfo, err := loadValidatorsInfo(store.db, height) +func (store dbStore) pruneValidatorSets(retainHeight int64) error { + valInfo, err := loadValidatorsInfo(store.db, retainHeight) if err != nil { - return fmt.Errorf("validators at height %v not found: %w", height, err) + return fmt.Errorf("validators at height %v not found: %w", retainHeight, err) } // We will prune up to the validator set at the given "height". As we don't save validator sets every // height but only when they change or at a check point, it is likely that the validator set at the height // we prune to is empty and thus dependent on the validator set saved at a previous height. We must find // that validator set and make sure it is not pruned. - lastRecordedValSetHeight := lastStoredHeightFor(height, valInfo.LastHeightChanged) + lastRecordedValSetHeight := lastStoredHeightFor(retainHeight, valInfo.LastHeightChanged) lastRecordedValSet, err := loadValidatorsInfo(store.db, lastRecordedValSetHeight) if err != nil || lastRecordedValSet.ValidatorSet == nil { return fmt.Errorf("couldn't find validators at height %d (height %d was originally requested): %w", - lastStoredHeightFor(height, valInfo.LastHeightChanged), - height, + lastStoredHeightFor(retainHeight, valInfo.LastHeightChanged), + retainHeight, err, ) } - // batch delete all the validators sets up to height - return store.batchDelete( + // if this is not equal to the retain height, prune from the retain height to the height above + // the last saved validator set. This way we can skip over the dependent validator set. + if lastRecordedValSetHeight < retainHeight { + err := store.pruneRange( + validatorsKey(lastRecordedValSetHeight+1), + validatorsKey(retainHeight), + ) + if err != nil { + return err + } + } + + // prune all the validators sets up to last saved validator set + return store.pruneRange( validatorsKey(1), - validatorsKey(height), validatorsKey(lastRecordedValSetHeight), ) } // pruneConsensusParams calls a reverse iterator from base height to retain height batch deleting // all consensus params in between. If the consensus params at the new base height is dependent -// on a prior height then this will keep that lower height to. +// on a prior height then this will keep that lower height too. func (store dbStore) pruneConsensusParams(retainHeight int64) error { paramsInfo, err := store.loadConsensusParamsInfo(retainHeight) if err != nil { @@ -298,21 +326,31 @@ func (store dbStore) pruneConsensusParams(retainHeight int64) error { // we must not prune (or save) the last consensus params that the consensus params info at height // is dependent on. if paramsInfo.ConsensusParams.Equal(&tmproto.ConsensusParams{}) { + // sanity check that the consensus params at the last height it was changed is there lastRecordedConsensusParams, err := store.loadConsensusParamsInfo(paramsInfo.LastHeightChanged) if err != nil || lastRecordedConsensusParams.ConsensusParams.Equal(&tmproto.ConsensusParams{}) { return fmt.Errorf( - "couldn't find consensus params at height %d as last changed from height %d: %w", + "couldn't find consensus params at height %d (height %d was originally requested): %w", paramsInfo.LastHeightChanged, retainHeight, err, ) } + + // prune the params above the height with which it last changed and below the retain height. + err = store.pruneRange( + consensusParamsKey(paramsInfo.LastHeightChanged+1), + consensusParamsKey(retainHeight), + ) + if err != nil { + return err + } } - // batch delete all the consensus params up to the retain height - return store.batchDelete( + // prune all the consensus params up to either the last height the params changed or if the params + // last changed at the retain height, then up to the retain height. + return store.pruneRange( consensusParamsKey(1), - consensusParamsKey(retainHeight), consensusParamsKey(paramsInfo.LastHeightChanged), ) } @@ -320,72 +358,69 @@ func (store dbStore) pruneConsensusParams(retainHeight int64) error { // pruneABCIResponses calls a reverse iterator from base height to retain height batch deleting // all abci responses in between func (store dbStore) pruneABCIResponses(height int64) error { - return store.batchDelete(abciResponsesKey(1), abciResponsesKey(height), nil) + return store.pruneRange(abciResponsesKey(1), abciResponsesKey(height)) } -// batchDelete is a generic function for deleting a range of keys in reverse order. It will -// skip keys that have been -func (store dbStore) batchDelete(start []byte, end []byte, exception []byte) error { - iter, err := store.db.ReverseIterator(start, end) - if err != nil { - return fmt.Errorf("iterator error: %w", err) - } - defer iter.Close() - +// pruneRange is a generic function for deleting a range of keys in reverse order. +// we keep filling up batches of at most 1000 keys, perform a deletion and continue until +// we have gone through all of keys in the range. This avoids doing any writes whilst +// iterating. +func (store dbStore) pruneRange(start []byte, end []byte) error { + var err error batch := store.db.NewBatch() defer batch.Close() - pruned := 0 - for iter.Valid() { - key := iter.Key() - if bytes.Equal(key, exception) { - iter.Next() - continue + end, err = store.reverseBatchDelete(batch, start, end) + if err != nil { + return err + } + + // iterate until the last batch of the pruning range in which case we will perform a + // write sync + for !bytes.Equal(start, end) { + if err := batch.Write(); err != nil { + return err } - if err := batch.Delete(key); err != nil { - return fmt.Errorf("pruning error at key %X: %w", key, err) + if err := batch.Close(); err != nil { + return err } - pruned++ - // avoid batches growing too large by flushing to disk regularly - if pruned%1000 == 0 { - if err := iter.Error(); err != nil { - return err - } - if err := iter.Close(); err != nil { - return err - } - - if err := batch.Write(); err != nil { - return fmt.Errorf("pruning error at key %X: %w", key, err) - } - if err := batch.Close(); err != nil { - return err - } - - iter, err = store.db.ReverseIterator(start, end) - if err != nil { - return fmt.Errorf("iterator error: %w", err) - } - defer iter.Close() - - batch = store.db.NewBatch() - defer batch.Close() - } else { - iter.Next() + batch = store.db.NewBatch() + + // fill a new batch of keys for deletion over the remainding range + end, err = store.reverseBatchDelete(batch, start, end) + if err != nil { + return err } } - if err := iter.Error(); err != nil { - return fmt.Errorf("iterator error: %w", err) - } + return batch.WriteSync() +} - if err := batch.WriteSync(); err != nil { - return err +// reverseBatchDelete runs a reverse iterator (from end to start) filling up a batch until either +// (a) the iterator reaches the start or (b) the iterator has added a 1000 keys (this avoids the +// batch from growing too large) +func (store dbStore) reverseBatchDelete(batch dbm.Batch, start, end []byte) ([]byte, error) { + iter, err := store.db.ReverseIterator(start, end) + if err != nil { + return end, fmt.Errorf("iterator error: %w", err) } + defer iter.Close() - return nil + size := 0 + for ; iter.Valid(); iter.Next() { + if err := batch.Delete(iter.Key()); err != nil { + return end, fmt.Errorf("pruning error at key %X: %w", iter.Key(), err) + } + + // avoid batches growing too large by capping them + size++ + if size == 1000 { + return iter.Key(), iter.Error() + } + } + return start, iter.Error() } //------------------------------------------------------------------------ @@ -584,7 +619,7 @@ func (store dbStore) LoadConsensusParams(height int64) (types.ConsensusParams, e paramsInfo2, err := store.loadConsensusParamsInfo(paramsInfo.LastHeightChanged) if err != nil { return empty, fmt.Errorf( - "couldn't find consensus params at height %d as last changed from height %d: %w", + "couldn't find consensus params at height %d (height %d was originally requested): %w", paramsInfo.LastHeightChanged, height, err, diff --git a/state/store_test.go b/state/store_test.go index b51a25739..927f3ff45 100644 --- a/state/store_test.go +++ b/state/store_test.go @@ -159,26 +159,28 @@ func TestStoreLoadConsensusParams(t *testing.T) { func TestPruneStates(t *testing.T) { testcases := map[string]struct { - makeHeights int64 - pruneHeight int64 - expectErr bool - expectVals []int64 - expectParams []int64 - expectABCI []int64 + startHeight int64 + endHeight int64 + pruneHeight int64 + expectErr bool + remainingValSetHeight int64 + remainingParamsHeight int64 }{ - "error when prune height is 0": {100, 0, true, nil, nil, nil}, - "error when prune height is negative": {100, -10, true, nil, nil, nil}, - "error when prune height does not exist": {100, 101, true, nil, nil, nil}, - "prune all": {100, 100, false, []int64{93, 100}, []int64{95, 100}, []int64{100}}, - "prune some": {10, 8, false, []int64{3, 8, 9, 10}, - []int64{5, 8, 9, 10}, []int64{8, 9, 10}}, - "prune across checkpoint": {100002, 100002, false, []int64{100000, 100002}, - []int64{99995, 100002}, []int64{100002}}, + "error when prune height is 0": {1, 100, 0, true, 0, 0}, + "error when prune height is negative": {1, 100, -10, true, 0, 0}, + "error when prune height does not exist": {1, 100, 101, true, 0, 0}, + "prune all": {1, 100, 100, false, 93, 95}, + "prune from non 1 height": {10, 50, 40, false, 33, 35}, + "prune some": {1, 10, 8, false, 3, 5}, + // we test this because we flush to disk every 1000 "states" + "prune more than 1000 state": {1, 1010, 1010, false, 1003, 1005}, + "prune across checkpoint": {99900, 100002, 100002, false, 100000, 99995}, } for name, tc := range testcases { tc := tc t.Run(name, func(t *testing.T) { db := dbm.NewMemDB() + stateStore := sm.NewStore(db) pk := ed25519.GenPrivKey().PubKey() @@ -192,7 +194,7 @@ func TestPruneStates(t *testing.T) { valsChanged := int64(0) paramsChanged := int64(0) - for h := int64(1); h <= tc.makeHeights; h++ { + for h := tc.startHeight; h <= tc.endHeight; h++ { if valsChanged == 0 || h%10 == 2 { valsChanged = h + 1 // Have to add 1, since NextValidators is what's stored } @@ -237,36 +239,44 @@ func TestPruneStates(t *testing.T) { } require.NoError(t, err) - expectVals := sliceToMap(tc.expectVals) - expectParams := sliceToMap(tc.expectParams) - expectABCI := sliceToMap(tc.expectABCI) + for h := tc.pruneHeight; h <= tc.endHeight; h++ { + vals, err := stateStore.LoadValidators(h) + require.NoError(t, err, h) + require.NotNil(t, vals, h) + + params, err := stateStore.LoadConsensusParams(h) + require.NoError(t, err, h) + require.NotNil(t, params, h) + + abci, err := stateStore.LoadABCIResponses(h) + require.NoError(t, err, h) + require.NotNil(t, abci, h) + } - for h := int64(1); h <= tc.makeHeights; h++ { + emptyParams := types.ConsensusParams{} + + for h := tc.startHeight; h < tc.pruneHeight; h++ { vals, err := stateStore.LoadValidators(h) - if expectVals[h] { - require.NoError(t, err, "validators height %v", h) - require.NotNil(t, vals) + if h == tc.remainingValSetHeight { + require.NoError(t, err, h) + require.NotNil(t, vals, h) } else { - require.Error(t, err, "validators height %v", h) - require.Equal(t, sm.ErrNoValSetForHeight{Height: h}, err) + require.Error(t, err, h) + require.Nil(t, vals, h) } params, err := stateStore.LoadConsensusParams(h) - if expectParams[h] { - require.NoError(t, err, "params height %v", h) - require.False(t, params.Equals(&types.ConsensusParams{}), "params should not be empty") + if h == tc.remainingParamsHeight { + require.NoError(t, err, h) + require.NotEqual(t, emptyParams, params, h) } else { - require.Error(t, err, "params height %v", h) + require.Error(t, err, h) + require.Equal(t, emptyParams, params, h) } abci, err := stateStore.LoadABCIResponses(h) - if expectABCI[h] { - require.NoError(t, err, "abci height %v", h) - require.NotNil(t, abci) - } else { - require.Error(t, err, "abci height %v", h) - require.Equal(t, sm.ErrNoABCIResponsesForHeight{Height: h}, err) - } + require.Error(t, err, h) + require.Nil(t, abci, h) } }) } @@ -293,11 +303,3 @@ func TestABCIResponsesResultsHash(t *testing.T) { require.NoError(t, err) assert.NoError(t, proof.Verify(root, bz)) } - -func sliceToMap(s []int64) map[int64]bool { - m := make(map[int64]bool, len(s)) - for _, i := range s { - m[i] = true - } - return m -} diff --git a/store/store.go b/store/store.go index 6d1ac870f..ed335c3c8 100644 --- a/store/store.go +++ b/store/store.go @@ -1,6 +1,7 @@ package store import ( + "bytes" "fmt" "strconv" @@ -315,99 +316,111 @@ func (bs *BlockStore) PruneBlocks(height int64) (uint64, error) { // remove block meta first as this is used to indicate whether the block exists. // For this reason, we also use ony block meta as a measure of the amount of blocks pruned - pruned, err := bs.batchDelete(blockMetaKey(0), blockMetaKey(height), removeBlockHash) + pruned, err := bs.pruneRange(blockMetaKey(0), blockMetaKey(height), removeBlockHash) if err != nil { return pruned, err } - if _, err := bs.batchDelete(blockPartKey(0, 0), blockPartKey(height, 0), nil); err != nil { + if _, err := bs.pruneRange(blockPartKey(0, 0), blockPartKey(height, 0), nil); err != nil { return pruned, err } - if _, err := bs.batchDelete(blockCommitKey(0), blockCommitKey(height), nil); err != nil { + if _, err := bs.pruneRange(blockCommitKey(0), blockCommitKey(height), nil); err != nil { return pruned, err } - if _, err := bs.batchDelete(seenCommitKey(0), seenCommitKey(height), nil); err != nil { + if _, err := bs.pruneRange(seenCommitKey(0), seenCommitKey(height), nil); err != nil { return pruned, err } return pruned, nil } -// batchDelete is a generic function for deleting a range of values based on the lowest +// pruneRange is a generic function for deleting a range of values based on the lowest // height up to but excluding retainHeight. For each key/value pair, an optional hook can be -// executed before the deletion itself is made -func (bs *BlockStore) batchDelete( +// executed before the deletion itself is made. pruneRange will use batch delete to delete +// keys in batches of at most 1000 keys. +func (bs *BlockStore) pruneRange( start []byte, end []byte, preDeletionHook func(key, value []byte, batch dbm.Batch) error, ) (uint64, error) { + var ( + err error + pruned uint64 + totalPruned uint64 = 0 + ) + + batch := bs.db.NewBatch() + defer batch.Close() + + pruned, start, err = bs.batchDelete(batch, start, end, preDeletionHook) + if err != nil { + return totalPruned, err + } + + // loop until we have finished iterating over all the keys by writing, opening a new batch + // and incrementing through the next range of keys. + for !bytes.Equal(start, end) { + if err := batch.Write(); err != nil { + return totalPruned, err + } + + totalPruned += pruned + + if err := batch.Close(); err != nil { + return totalPruned, err + } + + batch = bs.db.NewBatch() + + pruned, start, err = bs.batchDelete(batch, start, end, preDeletionHook) + if err != nil { + return totalPruned, err + } + } + + // once we looped over all keys we do a final flush to disk + if err := batch.WriteSync(); err != nil { + return totalPruned, err + } + totalPruned += pruned + return totalPruned, nil +} + +// batchDelete runs an iterator over a set of keys, first preforming a pre deletion hook before adding it to the batch. +// The function ends when either 1000 keys have been added to the batch or the iterator has reached the end. +func (bs *BlockStore) batchDelete( + batch dbm.Batch, + start, end []byte, + preDeletionHook func(key, value []byte, batch dbm.Batch) error, +) (uint64, []byte, error) { + var pruned uint64 = 0 iter, err := bs.db.Iterator(start, end) if err != nil { - panic(err) + return pruned, start, err } defer iter.Close() - batch := bs.db.NewBatch() - defer batch.Close() - - pruned := uint64(0) - flushed := pruned - for iter.Valid() { + for ; iter.Valid(); iter.Next() { key := iter.Key() if preDeletionHook != nil { if err := preDeletionHook(key, iter.Value(), batch); err != nil { - return flushed, err + return 0, start, fmt.Errorf("pruning error at key %X: %w", iter.Key(), err) } } if err := batch.Delete(key); err != nil { - return flushed, fmt.Errorf("pruning error at key %X: %w", iter.Key(), err) + return 0, start, fmt.Errorf("pruning error at key %X: %w", iter.Key(), err) } pruned++ - // avoid batches growing too large by flushing to database regularly - if pruned%1000 == 0 { - if err := iter.Error(); err != nil { - return flushed, err - } - if err := iter.Close(); err != nil { - return flushed, err - } - - err := batch.Write() - if err != nil { - return flushed, fmt.Errorf("pruning error at key %X: %w", iter.Key(), err) - } - if err := batch.Close(); err != nil { - return flushed, err - } - flushed = pruned - - iter, err = bs.db.Iterator(start, end) - if err != nil { - panic(err) - } - defer iter.Close() - - batch = bs.db.NewBatch() - defer batch.Close() - } else { - iter.Next() + if pruned == 1000 { + return pruned, iter.Key(), iter.Error() } } - flushed = pruned - if err := iter.Error(); err != nil { - return flushed, err - } - - err = batch.WriteSync() - if err != nil { - return flushed, fmt.Errorf("pruning error at key %X: %w", iter.Key(), err) - } - return flushed, nil + return pruned, end, iter.Error() } // SaveBlock persists the given block, blockParts, and seenCommit to the underlying db.