|
|
@ -8,8 +8,6 @@ import ( |
|
|
|
"github.com/google/orderedcode" |
|
|
|
dbm "github.com/tendermint/tm-db" |
|
|
|
|
|
|
|
tmsync "github.com/tendermint/tendermint/libs/sync" |
|
|
|
tmstore "github.com/tendermint/tendermint/proto/tendermint/store" |
|
|
|
tmproto "github.com/tendermint/tendermint/proto/tendermint/types" |
|
|
|
"github.com/tendermint/tendermint/types" |
|
|
|
) |
|
|
@ -33,60 +31,98 @@ The store can be assumed to contain all contiguous blocks between base and heigh |
|
|
|
*/ |
|
|
|
type BlockStore struct { |
|
|
|
db dbm.DB |
|
|
|
|
|
|
|
// mtx guards access to the struct fields listed below it. We rely on the database to enforce
|
|
|
|
// fine-grained concurrency control for its data, and thus this mutex does not apply to
|
|
|
|
// database contents. The only reason for keeping these fields in the struct is that the data
|
|
|
|
// can't efficiently be queried from the database since the key encoding we use is not
|
|
|
|
// lexicographically ordered (see https://github.com/tendermint/tendermint/issues/4567).
|
|
|
|
mtx tmsync.RWMutex |
|
|
|
base int64 |
|
|
|
height int64 |
|
|
|
} |
|
|
|
|
|
|
|
// NewBlockStore returns a new BlockStore with the given DB,
|
|
|
|
// initialized to the last height that was committed to the DB.
|
|
|
|
func NewBlockStore(db dbm.DB) *BlockStore { |
|
|
|
bs := LoadBlockStoreState(db) |
|
|
|
return &BlockStore{ |
|
|
|
base: bs.Base, |
|
|
|
height: bs.Height, |
|
|
|
db: db, |
|
|
|
} |
|
|
|
return &BlockStore{db} |
|
|
|
} |
|
|
|
|
|
|
|
// Base returns the first known contiguous block height, or 0 for empty block stores.
|
|
|
|
func (bs *BlockStore) Base() int64 { |
|
|
|
bs.mtx.RLock() |
|
|
|
defer bs.mtx.RUnlock() |
|
|
|
return bs.base |
|
|
|
iter, err := bs.db.Iterator( |
|
|
|
blockMetaKey(1), |
|
|
|
blockMetaKey(1<<63-1), |
|
|
|
) |
|
|
|
if err != nil { |
|
|
|
panic(err) |
|
|
|
} |
|
|
|
defer iter.Close() |
|
|
|
|
|
|
|
if iter.Valid() { |
|
|
|
height, err := decodeBlockMetaKey(iter.Key()) |
|
|
|
if err == nil { |
|
|
|
return height |
|
|
|
} |
|
|
|
} |
|
|
|
if err := iter.Error(); err != nil { |
|
|
|
panic(err) |
|
|
|
} |
|
|
|
|
|
|
|
return 0 |
|
|
|
} |
|
|
|
|
|
|
|
// Height returns the last known contiguous block height, or 0 for empty block stores.
|
|
|
|
func (bs *BlockStore) Height() int64 { |
|
|
|
bs.mtx.RLock() |
|
|
|
defer bs.mtx.RUnlock() |
|
|
|
return bs.height |
|
|
|
iter, err := bs.db.ReverseIterator( |
|
|
|
blockMetaKey(1), |
|
|
|
blockMetaKey(1<<63-1), |
|
|
|
) |
|
|
|
if err != nil { |
|
|
|
panic(err) |
|
|
|
} |
|
|
|
defer iter.Close() |
|
|
|
|
|
|
|
if iter.Valid() { |
|
|
|
height, err := decodeBlockMetaKey(iter.Key()) |
|
|
|
if err == nil { |
|
|
|
return height |
|
|
|
} |
|
|
|
} |
|
|
|
if err := iter.Error(); err != nil { |
|
|
|
panic(err) |
|
|
|
} |
|
|
|
|
|
|
|
return 0 |
|
|
|
} |
|
|
|
|
|
|
|
// Size returns the number of blocks in the block store.
|
|
|
|
func (bs *BlockStore) Size() int64 { |
|
|
|
bs.mtx.RLock() |
|
|
|
defer bs.mtx.RUnlock() |
|
|
|
if bs.height == 0 { |
|
|
|
height := bs.Height() |
|
|
|
if height == 0 { |
|
|
|
return 0 |
|
|
|
} |
|
|
|
return bs.height - bs.base + 1 |
|
|
|
return height + 1 - bs.Base() |
|
|
|
} |
|
|
|
|
|
|
|
// LoadBase atomically loads the base block meta, or returns nil if no base is found.
|
|
|
|
func (bs *BlockStore) LoadBaseMeta() *types.BlockMeta { |
|
|
|
bs.mtx.RLock() |
|
|
|
defer bs.mtx.RUnlock() |
|
|
|
if bs.base == 0 { |
|
|
|
iter, err := bs.db.Iterator( |
|
|
|
blockMetaKey(1), |
|
|
|
blockMetaKey(1<<63-1), |
|
|
|
) |
|
|
|
if err != nil { |
|
|
|
return nil |
|
|
|
} |
|
|
|
return bs.LoadBlockMeta(bs.base) |
|
|
|
defer iter.Close() |
|
|
|
|
|
|
|
if iter.Valid() { |
|
|
|
var pbbm = new(tmproto.BlockMeta) |
|
|
|
err = proto.Unmarshal(iter.Value(), pbbm) |
|
|
|
if err != nil { |
|
|
|
panic(fmt.Errorf("unmarshal to tmproto.BlockMeta: %w", err)) |
|
|
|
} |
|
|
|
|
|
|
|
blockMeta, err := types.BlockMetaFromProto(pbbm) |
|
|
|
if err != nil { |
|
|
|
panic(fmt.Errorf("error from proto blockMeta: %w", err)) |
|
|
|
} |
|
|
|
|
|
|
|
return blockMeta |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// LoadBlock returns the block with the given height.
|
|
|
@ -245,82 +281,133 @@ func (bs *BlockStore) LoadSeenCommit(height int64) *types.Commit { |
|
|
|
return commit |
|
|
|
} |
|
|
|
|
|
|
|
// PruneBlocks removes block up to (but not including) a height. It returns number of blocks pruned.
|
|
|
|
// PruneBlocks removes block up to (but not including) a height. It returns the number of blocks pruned.
|
|
|
|
func (bs *BlockStore) PruneBlocks(height int64) (uint64, error) { |
|
|
|
if height <= 0 { |
|
|
|
return 0, fmt.Errorf("height must be greater than 0") |
|
|
|
} |
|
|
|
bs.mtx.RLock() |
|
|
|
if height > bs.height { |
|
|
|
bs.mtx.RUnlock() |
|
|
|
return 0, fmt.Errorf("cannot prune beyond the latest height %v", bs.height) |
|
|
|
} |
|
|
|
base := bs.base |
|
|
|
bs.mtx.RUnlock() |
|
|
|
if height < base { |
|
|
|
return 0, fmt.Errorf("cannot prune to height %v, it is lower than base height %v", |
|
|
|
height, base) |
|
|
|
|
|
|
|
if height > bs.Height() { |
|
|
|
return 0, fmt.Errorf("height must be equal to or less than the latest height %d", bs.Height()) |
|
|
|
} |
|
|
|
|
|
|
|
pruned := uint64(0) |
|
|
|
batch := bs.db.NewBatch() |
|
|
|
defer batch.Close() |
|
|
|
flush := func(batch dbm.Batch, base int64) error { |
|
|
|
// We can't trust batches to be atomic, so update base first to make sure noone
|
|
|
|
// tries to access missing blocks.
|
|
|
|
bs.mtx.Lock() |
|
|
|
bs.base = base |
|
|
|
bs.mtx.Unlock() |
|
|
|
bs.saveState() |
|
|
|
|
|
|
|
err := batch.WriteSync() |
|
|
|
// when removing the block meta, use the hash to remove the hash key at the same time
|
|
|
|
removeBlockHash := func(key, value []byte, batch dbm.Batch) error { |
|
|
|
// unmarshal block meta
|
|
|
|
var pbbm = new(tmproto.BlockMeta) |
|
|
|
err := proto.Unmarshal(value, pbbm) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("failed to prune up to height %v: %w", base, err) |
|
|
|
return fmt.Errorf("unmarshal to tmproto.BlockMeta: %w", err) |
|
|
|
} |
|
|
|
batch.Close() |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
for h := base; h < height; h++ { |
|
|
|
meta := bs.LoadBlockMeta(h) |
|
|
|
if meta == nil { // assume already deleted
|
|
|
|
continue |
|
|
|
} |
|
|
|
if err := batch.Delete(blockMetaKey(h)); err != nil { |
|
|
|
return 0, err |
|
|
|
} |
|
|
|
if err := batch.Delete(blockHashKey(meta.BlockID.Hash)); err != nil { |
|
|
|
return 0, err |
|
|
|
} |
|
|
|
if err := batch.Delete(blockCommitKey(h)); err != nil { |
|
|
|
return 0, err |
|
|
|
blockMeta, err := types.BlockMetaFromProto(pbbm) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("error from proto blockMeta: %w", err) |
|
|
|
} |
|
|
|
if err := batch.Delete(seenCommitKey(h)); err != nil { |
|
|
|
return 0, err |
|
|
|
|
|
|
|
// delete the hash key corresponding to the block meta's hash
|
|
|
|
if err := batch.Delete(blockHashKey(blockMeta.BlockID.Hash)); err != nil { |
|
|
|
return fmt.Errorf("failed to delete hash key: %X: %w", blockHashKey(blockMeta.BlockID.Hash), err) |
|
|
|
} |
|
|
|
for p := 0; p < int(meta.BlockID.PartSetHeader.Total); p++ { |
|
|
|
if err := batch.Delete(blockPartKey(h, p)); err != nil { |
|
|
|
return 0, err |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// remove block meta first as this is used to indicate whether the block exists.
|
|
|
|
// For this reason, we also use ony block meta as a measure of the amount of blocks pruned
|
|
|
|
pruned, err := bs.batchDelete(blockMetaKey(0), blockMetaKey(height), removeBlockHash) |
|
|
|
if err != nil { |
|
|
|
return pruned, err |
|
|
|
} |
|
|
|
|
|
|
|
if _, err := bs.batchDelete(blockPartKey(0, 0), blockPartKey(height, 0), nil); err != nil { |
|
|
|
return pruned, err |
|
|
|
} |
|
|
|
|
|
|
|
if _, err := bs.batchDelete(blockCommitKey(0), blockCommitKey(height), nil); err != nil { |
|
|
|
return pruned, err |
|
|
|
} |
|
|
|
|
|
|
|
if _, err := bs.batchDelete(seenCommitKey(0), seenCommitKey(height), nil); err != nil { |
|
|
|
return pruned, err |
|
|
|
} |
|
|
|
|
|
|
|
return pruned, nil |
|
|
|
} |
|
|
|
|
|
|
|
// batchDelete is a generic function for deleting a range of values based on the lowest
|
|
|
|
// height up to but excluding retainHeight. For each key/value pair, an optional hook can be
|
|
|
|
// executed before the deletion itself is made
|
|
|
|
func (bs *BlockStore) batchDelete( |
|
|
|
start []byte, |
|
|
|
end []byte, |
|
|
|
preDeletionHook func(key, value []byte, batch dbm.Batch) error, |
|
|
|
) (uint64, error) { |
|
|
|
iter, err := bs.db.Iterator(start, end) |
|
|
|
if err != nil { |
|
|
|
panic(err) |
|
|
|
} |
|
|
|
defer iter.Close() |
|
|
|
|
|
|
|
batch := bs.db.NewBatch() |
|
|
|
defer batch.Close() |
|
|
|
|
|
|
|
pruned := uint64(0) |
|
|
|
flushed := pruned |
|
|
|
for iter.Valid() { |
|
|
|
key := iter.Key() |
|
|
|
if preDeletionHook != nil { |
|
|
|
if err := preDeletionHook(key, iter.Value(), batch); err != nil { |
|
|
|
return flushed, err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if err := batch.Delete(key); err != nil { |
|
|
|
return flushed, fmt.Errorf("pruning error at key %X: %w", iter.Key(), err) |
|
|
|
} |
|
|
|
|
|
|
|
pruned++ |
|
|
|
// avoid batches growing too large by flushing to database regularly
|
|
|
|
if pruned%1000 == 0 { |
|
|
|
if err := iter.Error(); err != nil { |
|
|
|
return flushed, err |
|
|
|
} |
|
|
|
if err := iter.Close(); err != nil { |
|
|
|
return flushed, err |
|
|
|
} |
|
|
|
|
|
|
|
// flush every 1000 blocks to avoid batches becoming too large
|
|
|
|
if pruned%1000 == 0 && pruned > 0 { |
|
|
|
err := flush(batch, h) |
|
|
|
err := batch.Write() |
|
|
|
if err != nil { |
|
|
|
return 0, err |
|
|
|
return flushed, fmt.Errorf("pruning error at key %X: %w", iter.Key(), err) |
|
|
|
} |
|
|
|
if err := batch.Close(); err != nil { |
|
|
|
return flushed, err |
|
|
|
} |
|
|
|
flushed = pruned |
|
|
|
|
|
|
|
iter, err = bs.db.Iterator(start, end) |
|
|
|
if err != nil { |
|
|
|
panic(err) |
|
|
|
} |
|
|
|
defer iter.Close() |
|
|
|
|
|
|
|
batch = bs.db.NewBatch() |
|
|
|
defer batch.Close() |
|
|
|
} else { |
|
|
|
iter.Next() |
|
|
|
} |
|
|
|
} |
|
|
|
flushed = pruned |
|
|
|
if err := iter.Error(); err != nil { |
|
|
|
return flushed, err |
|
|
|
} |
|
|
|
|
|
|
|
err := flush(batch, height) |
|
|
|
err = batch.WriteSync() |
|
|
|
if err != nil { |
|
|
|
return 0, err |
|
|
|
return flushed, fmt.Errorf("pruning error at key %X: %w", iter.Key(), err) |
|
|
|
} |
|
|
|
return pruned, nil |
|
|
|
|
|
|
|
return flushed, nil |
|
|
|
} |
|
|
|
|
|
|
|
// SaveBlock persists the given block, blockParts, and seenCommit to the underlying db.
|
|
|
@ -378,20 +465,10 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s |
|
|
|
// NOTE: we can delete this at a later height
|
|
|
|
pbsc := seenCommit.ToProto() |
|
|
|
seenCommitBytes := mustEncode(pbsc) |
|
|
|
if err := bs.db.Set(seenCommitKey(height), seenCommitBytes); err != nil { |
|
|
|
if err := bs.db.SetSync(seenCommitKey(height), seenCommitBytes); err != nil { |
|
|
|
panic(err) |
|
|
|
} |
|
|
|
|
|
|
|
// Done!
|
|
|
|
bs.mtx.Lock() |
|
|
|
bs.height = height |
|
|
|
if bs.base == 0 { |
|
|
|
bs.base = height |
|
|
|
} |
|
|
|
bs.mtx.Unlock() |
|
|
|
|
|
|
|
// Save new BlockStoreState descriptor. This also flushes the database.
|
|
|
|
bs.saveState() |
|
|
|
} |
|
|
|
|
|
|
|
func (bs *BlockStore) saveBlockPart(height int64, index int, part *types.Part) { |
|
|
@ -405,16 +482,6 @@ func (bs *BlockStore) saveBlockPart(height int64, index int, part *types.Part) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (bs *BlockStore) saveState() { |
|
|
|
bs.mtx.RLock() |
|
|
|
bss := tmstore.BlockStoreState{ |
|
|
|
Base: bs.base, |
|
|
|
Height: bs.height, |
|
|
|
} |
|
|
|
bs.mtx.RUnlock() |
|
|
|
SaveBlockStoreState(&bss, bs.db) |
|
|
|
} |
|
|
|
|
|
|
|
// SaveSeenCommit saves a seen commit, used by e.g. the state sync reactor when bootstrapping node.
|
|
|
|
func (bs *BlockStore) SaveSeenCommit(height int64, seenCommit *types.Commit) error { |
|
|
|
pbc := seenCommit.ToProto() |
|
|
@ -445,6 +512,21 @@ func blockMetaKey(height int64) []byte { |
|
|
|
return key |
|
|
|
} |
|
|
|
|
|
|
|
func decodeBlockMetaKey(key []byte) (height int64, err error) { |
|
|
|
var prefix int64 |
|
|
|
remaining, err := orderedcode.Parse(string(key), &prefix, &height) |
|
|
|
if err != nil { |
|
|
|
return |
|
|
|
} |
|
|
|
if len(remaining) != 0 { |
|
|
|
return -1, fmt.Errorf("expected complete key but got remainder: %s", remaining) |
|
|
|
} |
|
|
|
if prefix != prefixBlockMeta { |
|
|
|
return -1, fmt.Errorf("incorrect prefix. Expected %v, got %v", prefixBlockMeta, prefix) |
|
|
|
} |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
func blockPartKey(height int64, partIndex int) []byte { |
|
|
|
key, err := orderedcode.Append(nil, prefixBlockPart, height, int64(partIndex)) |
|
|
|
if err != nil { |
|
|
@ -479,46 +561,6 @@ func blockHashKey(hash []byte) []byte { |
|
|
|
|
|
|
|
//-----------------------------------------------------------------------------
|
|
|
|
|
|
|
|
var blockStoreKey = []byte("blockStore") |
|
|
|
|
|
|
|
// SaveBlockStoreState persists the blockStore state to the database.
|
|
|
|
func SaveBlockStoreState(bsj *tmstore.BlockStoreState, db dbm.DB) { |
|
|
|
bytes, err := proto.Marshal(bsj) |
|
|
|
if err != nil { |
|
|
|
panic(fmt.Sprintf("Could not marshal state bytes: %v", err)) |
|
|
|
} |
|
|
|
if err := db.SetSync(blockStoreKey, bytes); err != nil { |
|
|
|
panic(err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// LoadBlockStoreState returns the BlockStoreState as loaded from disk.
|
|
|
|
// If no BlockStoreState was previously persisted, it returns the zero value.
|
|
|
|
func LoadBlockStoreState(db dbm.DB) tmstore.BlockStoreState { |
|
|
|
bytes, err := db.Get(blockStoreKey) |
|
|
|
if err != nil { |
|
|
|
panic(err) |
|
|
|
} |
|
|
|
|
|
|
|
if len(bytes) == 0 { |
|
|
|
return tmstore.BlockStoreState{ |
|
|
|
Base: 0, |
|
|
|
Height: 0, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
var bsj tmstore.BlockStoreState |
|
|
|
if err := proto.Unmarshal(bytes, &bsj); err != nil { |
|
|
|
panic(fmt.Sprintf("Could not unmarshal bytes: %X", bytes)) |
|
|
|
} |
|
|
|
|
|
|
|
// Backwards compatibility with persisted data from before Base existed.
|
|
|
|
if bsj.Height > 0 && bsj.Base == 0 { |
|
|
|
bsj.Base = 1 |
|
|
|
} |
|
|
|
return bsj |
|
|
|
} |
|
|
|
|
|
|
|
// mustEncode proto encodes a proto.message and panics if fails
|
|
|
|
func mustEncode(pb proto.Message) []byte { |
|
|
|
bz, err := proto.Marshal(pb) |
|
|
|