|
|
- package kv
-
- import (
- "context"
- "errors"
- "fmt"
- "sort"
- "strconv"
- "strings"
-
- "github.com/google/orderedcode"
- dbm "github.com/tendermint/tm-db"
-
- abci "github.com/tendermint/tendermint/abci/types"
- "github.com/tendermint/tendermint/libs/pubsub/query"
- "github.com/tendermint/tendermint/state/indexer"
- "github.com/tendermint/tendermint/types"
- )
-
- var _ indexer.BlockIndexer = (*BlockerIndexer)(nil)
-
- // BlockerIndexer implements a block indexer, indexing BeginBlock and EndBlock
- // events with an underlying KV store. Block events are indexed by their height,
- // such that matching search criteria returns the respective block height(s).
- type BlockerIndexer struct {
- store dbm.DB
- }
-
- func New(store dbm.DB) *BlockerIndexer {
- return &BlockerIndexer{
- store: store,
- }
- }
-
- // Has returns true if the given height has been indexed. An error is returned
- // upon database query failure.
- func (idx *BlockerIndexer) Has(height int64) (bool, error) {
- key, err := heightKey(height)
- if err != nil {
- return false, fmt.Errorf("failed to create block height index key: %w", err)
- }
-
- return idx.store.Has(key)
- }
-
- // Index indexes BeginBlock and EndBlock events for a given block by its height.
- // The following is indexed:
- //
- // primary key: encode(block.height | height) => encode(height)
- // BeginBlock events: encode(eventType.eventAttr|eventValue|height|begin_block) => encode(height)
- // EndBlock events: encode(eventType.eventAttr|eventValue|height|end_block) => encode(height)
- func (idx *BlockerIndexer) Index(bh types.EventDataNewBlockHeader) error {
- batch := idx.store.NewBatch()
- defer batch.Close()
-
- height := bh.Header.Height
-
- // 1. index by height
- key, err := heightKey(height)
- if err != nil {
- return fmt.Errorf("failed to create block height index key: %w", err)
- }
- if err := batch.Set(key, int64ToBytes(height)); err != nil {
- return err
- }
-
- // 2. index BeginBlock events
- if err := idx.indexEvents(batch, bh.ResultBeginBlock.Events, "begin_block", height); err != nil {
- return fmt.Errorf("failed to index BeginBlock events: %w", err)
- }
-
- // 3. index EndBlock events
- if err := idx.indexEvents(batch, bh.ResultEndBlock.Events, "end_block", height); err != nil {
- return fmt.Errorf("failed to index EndBlock events: %w", err)
- }
-
- return batch.WriteSync()
- }
-
- // Search performs a query for block heights that match a given BeginBlock
- // and Endblock event search criteria. The given query can match against zero,
- // one or more block heights. In the case of height queries, i.e. block.height=H,
- // if the height is indexed, that height alone will be returned. An error and
- // nil slice is returned. Otherwise, a non-nil slice and nil error is returned.
- func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64, error) {
- results := make([]int64, 0)
- select {
- case <-ctx.Done():
- return results, nil
-
- default:
- }
-
- conditions, err := q.Conditions()
- if err != nil {
- return nil, fmt.Errorf("failed to parse query conditions: %w", err)
- }
-
- // If there is an exact height query, return the result immediately
- // (if it exists).
- height, ok := lookForHeight(conditions)
- if ok {
- ok, err := idx.Has(height)
- if err != nil {
- return nil, err
- }
-
- if ok {
- return []int64{height}, nil
- }
-
- return results, nil
- }
-
- var heightsInitialized bool
- filteredHeights := make(map[string][]byte)
-
- // conditions to skip because they're handled before "everything else"
- skipIndexes := make([]int, 0)
-
- // Extract ranges. If both upper and lower bounds exist, it's better to get
- // them in order as to not iterate over kvs that are not within range.
- ranges, rangeIndexes := indexer.LookForRanges(conditions)
- if len(ranges) > 0 {
- skipIndexes = append(skipIndexes, rangeIndexes...)
-
- for _, qr := range ranges {
- prefix, err := orderedcode.Append(nil, qr.Key)
- if err != nil {
- return nil, fmt.Errorf("failed to create prefix key: %w", err)
- }
-
- if !heightsInitialized {
- filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, true)
- if err != nil {
- return nil, err
- }
-
- heightsInitialized = true
-
- // Ignore any remaining conditions if the first condition resulted in no
- // matches (assuming implicit AND operand).
- if len(filteredHeights) == 0 {
- break
- }
- } else {
- filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, false)
- if err != nil {
- return nil, err
- }
- }
- }
- }
-
- // for all other conditions
- for i, c := range conditions {
- if intInSlice(i, skipIndexes) {
- continue
- }
-
- startKey, err := orderedcode.Append(nil, c.CompositeKey, fmt.Sprintf("%v", c.Operand))
- if err != nil {
- return nil, err
- }
-
- if !heightsInitialized {
- filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, true)
- if err != nil {
- return nil, err
- }
-
- heightsInitialized = true
-
- // Ignore any remaining conditions if the first condition resulted in no
- // matches (assuming implicit AND operand).
- if len(filteredHeights) == 0 {
- break
- }
- } else {
- filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, false)
- if err != nil {
- return nil, err
- }
- }
- }
-
- // fetch matching heights
- results = make([]int64, 0, len(filteredHeights))
- for _, hBz := range filteredHeights {
- h := int64FromBytes(hBz)
-
- ok, err := idx.Has(h)
- if err != nil {
- return nil, err
- }
- if ok {
- results = append(results, h)
- }
-
- select {
- case <-ctx.Done():
- break
-
- default:
- }
- }
-
- sort.Slice(results, func(i, j int) bool { return results[i] < results[j] })
-
- return results, nil
- }
-
- // matchRange returns all matching block heights that match a given QueryRange
- // and start key. An already filtered result (filteredHeights) is provided such
- // that any non-intersecting matches are removed.
- //
- // NOTE: The provided filteredHeights may be empty if no previous condition has
- // matched.
- func (idx *BlockerIndexer) matchRange(
- ctx context.Context,
- qr indexer.QueryRange,
- startKey []byte,
- filteredHeights map[string][]byte,
- firstRun bool,
- ) (map[string][]byte, error) {
-
- // A previous match was attempted but resulted in no matches, so we return
- // no matches (assuming AND operand).
- if !firstRun && len(filteredHeights) == 0 {
- return filteredHeights, nil
- }
-
- tmpHeights := make(map[string][]byte)
- lowerBound := qr.LowerBoundValue()
- upperBound := qr.UpperBoundValue()
-
- it, err := dbm.IteratePrefix(idx.store, startKey)
- if err != nil {
- return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
- }
- defer it.Close()
-
- LOOP:
- for ; it.Valid(); it.Next() {
- var (
- eventValue string
- err error
- )
-
- if qr.Key == types.BlockHeightKey {
- eventValue, err = parseValueFromPrimaryKey(it.Key())
- } else {
- eventValue, err = parseValueFromEventKey(it.Key())
- }
-
- if err != nil {
- continue
- }
-
- if _, ok := qr.AnyBound().(int64); ok {
- v, err := strconv.ParseInt(eventValue, 10, 64)
- if err != nil {
- continue LOOP
- }
-
- include := true
- if lowerBound != nil && v < lowerBound.(int64) {
- include = false
- }
-
- if upperBound != nil && v > upperBound.(int64) {
- include = false
- }
-
- if include {
- tmpHeights[string(it.Value())] = it.Value()
- }
- }
-
- select {
- case <-ctx.Done():
- break
-
- default:
- }
- }
-
- if err := it.Error(); err != nil {
- return nil, err
- }
-
- if len(tmpHeights) == 0 || firstRun {
- // Either:
- //
- // 1. Regardless if a previous match was attempted, which may have had
- // results, but no match was found for the current condition, then we
- // return no matches (assuming AND operand).
- //
- // 2. A previous match was not attempted, so we return all results.
- return tmpHeights, nil
- }
-
- // Remove/reduce matches in filteredHashes that were not found in this
- // match (tmpHashes).
- for k := range filteredHeights {
- if tmpHeights[k] == nil {
- delete(filteredHeights, k)
-
- select {
- case <-ctx.Done():
- break
-
- default:
- }
- }
- }
-
- return filteredHeights, nil
- }
-
- // match returns all matching heights that meet a given query condition and start
- // key. An already filtered result (filteredHeights) is provided such that any
- // non-intersecting matches are removed.
- //
- // NOTE: The provided filteredHeights may be empty if no previous condition has
- // matched.
- func (idx *BlockerIndexer) match(
- ctx context.Context,
- c query.Condition,
- startKeyBz []byte,
- filteredHeights map[string][]byte,
- firstRun bool,
- ) (map[string][]byte, error) {
-
- // A previous match was attempted but resulted in no matches, so we return
- // no matches (assuming AND operand).
- if !firstRun && len(filteredHeights) == 0 {
- return filteredHeights, nil
- }
-
- tmpHeights := make(map[string][]byte)
-
- switch {
- case c.Op == query.OpEqual:
- it, err := dbm.IteratePrefix(idx.store, startKeyBz)
- if err != nil {
- return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
- }
- defer it.Close()
-
- for ; it.Valid(); it.Next() {
- tmpHeights[string(it.Value())] = it.Value()
-
- if err := ctx.Err(); err != nil {
- break
- }
- }
-
- if err := it.Error(); err != nil {
- return nil, err
- }
-
- case c.Op == query.OpExists:
- prefix, err := orderedcode.Append(nil, c.CompositeKey)
- if err != nil {
- return nil, err
- }
-
- it, err := dbm.IteratePrefix(idx.store, prefix)
- if err != nil {
- return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
- }
- defer it.Close()
-
- for ; it.Valid(); it.Next() {
- tmpHeights[string(it.Value())] = it.Value()
-
- select {
- case <-ctx.Done():
- break
-
- default:
- }
- }
-
- if err := it.Error(); err != nil {
- return nil, err
- }
-
- case c.Op == query.OpContains:
- prefix, err := orderedcode.Append(nil, c.CompositeKey)
- if err != nil {
- return nil, err
- }
-
- it, err := dbm.IteratePrefix(idx.store, prefix)
- if err != nil {
- return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
- }
- defer it.Close()
-
- for ; it.Valid(); it.Next() {
- eventValue, err := parseValueFromEventKey(it.Key())
- if err != nil {
- continue
- }
-
- if strings.Contains(eventValue, c.Operand.(string)) {
- tmpHeights[string(it.Value())] = it.Value()
- }
-
- select {
- case <-ctx.Done():
- break
-
- default:
- }
- }
- if err := it.Error(); err != nil {
- return nil, err
- }
-
- default:
- return nil, errors.New("other operators should be handled already")
- }
-
- if len(tmpHeights) == 0 || firstRun {
- // Either:
- //
- // 1. Regardless if a previous match was attempted, which may have had
- // results, but no match was found for the current condition, then we
- // return no matches (assuming AND operand).
- //
- // 2. A previous match was not attempted, so we return all results.
- return tmpHeights, nil
- }
-
- // Remove/reduce matches in filteredHeights that were not found in this
- // match (tmpHeights).
- for k := range filteredHeights {
- if tmpHeights[k] == nil {
- delete(filteredHeights, k)
-
- select {
- case <-ctx.Done():
- break
-
- default:
- }
- }
- }
-
- return filteredHeights, nil
- }
-
- func (idx *BlockerIndexer) indexEvents(batch dbm.Batch, events []abci.Event, typ string, height int64) error {
- heightBz := int64ToBytes(height)
-
- for _, event := range events {
- // only index events with a non-empty type
- if len(event.Type) == 0 {
- continue
- }
-
- for _, attr := range event.Attributes {
- if len(attr.Key) == 0 {
- continue
- }
-
- // index iff the event specified index:true and it's not a reserved event
- compositeKey := fmt.Sprintf("%s.%s", event.Type, string(attr.Key))
- if compositeKey == types.TxHashKey || compositeKey == types.TxHeightKey {
- return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeKey)
- }
- if attr.GetIndex() {
- key, err := eventKey(compositeKey, typ, string(attr.Value), height)
- if err != nil {
- return fmt.Errorf("failed to create block index key: %w", err)
- }
-
- if err := batch.Set(key, heightBz); err != nil {
- return err
- }
- }
- }
- }
-
- return nil
- }
|