package kv

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"strconv"
	"strings"

	"github.com/google/orderedcode"
	dbm "github.com/tendermint/tm-db"

	abci "github.com/tendermint/tendermint/abci/types"
	"github.com/tendermint/tendermint/internal/state/indexer"
	"github.com/tendermint/tendermint/libs/pubsub/query"
	"github.com/tendermint/tendermint/types"
)

var _ indexer.BlockIndexer = (*BlockerIndexer)(nil)

// BlockerIndexer implements a block indexer, indexing BeginBlock and EndBlock
// events with an underlying KV store. Block events are indexed by their height,
// such that matching search criteria returns the respective block height(s).
type BlockerIndexer struct {
	store dbm.DB
}

// New returns a BlockerIndexer that reads from and writes to the given store.
func New(store dbm.DB) *BlockerIndexer {
	return &BlockerIndexer{
		store: store,
	}
}

// Has returns true if the given height has been indexed. An error is returned
// upon database query failure.
func (idx *BlockerIndexer) Has(height int64) (bool, error) {
	key, err := heightKey(height)
	if err != nil {
		return false, fmt.Errorf("failed to create block height index key: %w", err)
	}

	return idx.store.Has(key)
}

// Index indexes BeginBlock and EndBlock events for a given block by its height.
// The following is indexed:
//
// primary key: encode(block.height | height) => encode(height)
// BeginBlock events: encode(eventType.eventAttr|eventValue|height|begin_block) => encode(height)
// EndBlock events: encode(eventType.eventAttr|eventValue|height|end_block) => encode(height)
//
// All entries are staged in a single batch that is written synchronously at
// the end, so nothing is written if any step fails beforehand.
func (idx *BlockerIndexer) Index(bh types.EventDataNewBlockHeader) error {
	batch := idx.store.NewBatch()
	defer batch.Close()

	height := bh.Header.Height

	// 1. index by height
	key, err := heightKey(height)
	if err != nil {
		return fmt.Errorf("failed to create block height index key: %w", err)
	}
	if err := batch.Set(key, int64ToBytes(height)); err != nil {
		return err
	}

	// 2. index BeginBlock events
	if err := idx.indexEvents(batch, bh.ResultBeginBlock.Events, "begin_block", height); err != nil {
		return fmt.Errorf("failed to index BeginBlock events: %w", err)
	}

	// 3. index EndBlock events
	if err := idx.indexEvents(batch, bh.ResultEndBlock.Events, "end_block", height); err != nil {
		return fmt.Errorf("failed to index EndBlock events: %w", err)
	}

	return batch.WriteSync()
}

// Search performs a query for block heights that match a given BeginBlock
// and Endblock event search criteria. The given query can match against zero,
// one or more block heights. In the case of height queries, i.e. block.height=H,
// if the height is indexed, that height alone will be returned.
//
// If an error occurs, a nil slice and the error are returned. Otherwise, a
// non-nil slice (sorted in ascending order) and a nil error are returned.
//
// If ctx is already done on entry, an empty result is returned. If ctx is
// cancelled while the search is in progress, iteration stops early and the
// returned heights may be incomplete; no error is reported in that case.
func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64, error) {
	results := make([]int64, 0)
	select {
	case <-ctx.Done():
		return results, nil

	default:
	}

	conditions, err := q.Conditions()
	if err != nil {
		return nil, fmt.Errorf("failed to parse query conditions: %w", err)
	}

	// If there is an exact height query, return the result immediately
	// (if it exists).
	height, ok := lookForHeight(conditions)
	if ok {
		found, err := idx.Has(height)
		if err != nil {
			return nil, err
		}

		if found {
			return []int64{height}, nil
		}

		return results, nil
	}

	var heightsInitialized bool
	filteredHeights := make(map[string][]byte)

	// conditions to skip because they're handled before "everything else"
	skipIndexes := make([]int, 0)

	// Extract ranges. If both upper and lower bounds exist, it's better to get
	// them in order as to not iterate over kvs that are not within range.
	ranges, rangeIndexes := indexer.LookForRanges(conditions)
	if len(ranges) > 0 {
		skipIndexes = append(skipIndexes, rangeIndexes...)

		for _, qr := range ranges {
			prefix, err := orderedcode.Append(nil, qr.Key)
			if err != nil {
				return nil, fmt.Errorf("failed to create prefix key: %w", err)
			}

			if !heightsInitialized {
				filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, true)
				if err != nil {
					return nil, err
				}

				heightsInitialized = true

				// Ignore any remaining conditions if the first condition resulted in no
				// matches (assuming implicit AND operand).
				if len(filteredHeights) == 0 {
					break
				}
			} else {
				filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, false)
				if err != nil {
					return nil, err
				}
			}
		}
	}

	// for all other conditions
	for i, c := range conditions {
		if intInSlice(i, skipIndexes) {
			continue
		}

		startKey, err := orderedcode.Append(nil, c.CompositeKey, fmt.Sprintf("%v", c.Operand))
		if err != nil {
			return nil, err
		}

		if !heightsInitialized {
			filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, true)
			if err != nil {
				return nil, err
			}

			heightsInitialized = true

			// Ignore any remaining conditions if the first condition resulted in no
			// matches (assuming implicit AND operand).
			if len(filteredHeights) == 0 {
				break
			}
		} else {
			filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, false)
			if err != nil {
				return nil, err
			}
		}
	}

	// fetch matching heights
	results = make([]int64, 0, len(filteredHeights))
heights:
	for _, hBz := range filteredHeights {
		h := int64FromBytes(hBz)

		// Only report heights whose primary (height) key is present.
		found, err := idx.Has(h)
		if err != nil {
			return nil, err
		}
		if found {
			results = append(results, h)
		}

		select {
		case <-ctx.Done():
			break heights

		default:
		}
	}

	// Map iteration order is random, so sort for a deterministic result.
	sort.Slice(results, func(i, j int) bool { return results[i] < results[j] })

	return results, nil
}

// matchRange returns all matching block heights that match a given QueryRange
// and start key. An already filtered result (filteredHeights) is provided such
// that any non-intersecting matches are removed.
//
// Only ranges whose bound is an int64 produce matches; keys whose value cannot
// be parsed as a base-10 int64 are skipped.
//
// NOTE: The provided filteredHeights may be empty if no previous condition has
// matched.
func (idx *BlockerIndexer) matchRange(
	ctx context.Context,
	qr indexer.QueryRange,
	startKey []byte,
	filteredHeights map[string][]byte,
	firstRun bool,
) (map[string][]byte, error) {

	// A previous match was attempted but resulted in no matches, so we return
	// no matches (assuming AND operand).
	if !firstRun && len(filteredHeights) == 0 {
		return filteredHeights, nil
	}

	tmpHeights := make(map[string][]byte)
	lowerBound := qr.LowerBoundValue()
	upperBound := qr.UpperBoundValue()

	it, err := dbm.IteratePrefix(idx.store, startKey)
	if err != nil {
		return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
	}
	defer it.Close()

iter:
	for ; it.Valid(); it.Next() {
		var (
			eventValue string
			err        error
		)

		// Height (primary) keys and event keys are parsed by different helpers.
		if qr.Key == types.BlockHeightKey {
			eventValue, err = parseValueFromPrimaryKey(it.Key())
		} else {
			eventValue, err = parseValueFromEventKey(it.Key())
		}

		if err != nil {
			// Skip keys that cannot be parsed.
			continue
		}

		if _, ok := qr.AnyBound().(int64); ok {
			v, err := strconv.ParseInt(eventValue, 10, 64)
			if err != nil {
				continue iter
			}

			include := true
			if lowerBound != nil && v < lowerBound.(int64) {
				include = false
			}

			if upperBound != nil && v > upperBound.(int64) {
				include = false
			}

			if include {
				tmpHeights[string(it.Value())] = it.Value()
			}
		}

		select {
		case <-ctx.Done():
			break iter

		default:
		}
	}

	if err := it.Error(); err != nil {
		return nil, err
	}

	if len(tmpHeights) == 0 || firstRun {
		// Either:
		//
		// 1. Regardless if a previous match was attempted, which may have had
		// results, but no match was found for the current condition, then we
		// return no matches (assuming AND operand).
		//
		// 2. A previous match was not attempted, so we return all results.
		return tmpHeights, nil
	}

	// Remove/reduce matches in filteredHeights that were not found in this
	// match (tmpHeights).
intersect:
	for k := range filteredHeights {
		if tmpHeights[k] == nil {
			delete(filteredHeights, k)

			select {
			case <-ctx.Done():
				// A bare break would only leave the select statement, so the
				// loop is exited through its label.
				break intersect

			default:
			}
		}
	}

	return filteredHeights, nil
}

// match returns all matching heights that meet a given query condition and start
// key. An already filtered result (filteredHeights) is provided such that any
// non-intersecting matches are removed.
//
// The supported operators are OpEqual, OpExists and OpContains; any other
// operator results in an error.
//
// NOTE: The provided filteredHeights may be empty if no previous condition has
// matched.
func (idx *BlockerIndexer) match(
	ctx context.Context,
	c query.Condition,
	startKeyBz []byte,
	filteredHeights map[string][]byte,
	firstRun bool,
) (map[string][]byte, error) {

	// A previous match was attempted but resulted in no matches, so we return
	// no matches (assuming AND operand).
	if !firstRun && len(filteredHeights) == 0 {
		return filteredHeights, nil
	}

	tmpHeights := make(map[string][]byte)

	switch {
	case c.Op == query.OpEqual:
		// startKeyBz already encodes both the composite key and the operand.
		it, err := dbm.IteratePrefix(idx.store, startKeyBz)
		if err != nil {
			return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
		}
		defer it.Close()

		for ; it.Valid(); it.Next() {
			tmpHeights[string(it.Value())] = it.Value()

			if ctx.Err() != nil {
				break
			}
		}

		if err := it.Error(); err != nil {
			return nil, err
		}

	case c.Op == query.OpExists:
		// Any value is acceptable, so iterate over the composite key alone.
		prefix, err := orderedcode.Append(nil, c.CompositeKey)
		if err != nil {
			return nil, err
		}

		it, err := dbm.IteratePrefix(idx.store, prefix)
		if err != nil {
			return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
		}
		defer it.Close()

	iterExists:
		for ; it.Valid(); it.Next() {
			tmpHeights[string(it.Value())] = it.Value()

			select {
			case <-ctx.Done():
				break iterExists

			default:
			}
		}

		if err := it.Error(); err != nil {
			return nil, err
		}

	case c.Op == query.OpContains:
		// Iterate over every value stored under the composite key and keep
		// those that contain the operand as a substring.
		prefix, err := orderedcode.Append(nil, c.CompositeKey)
		if err != nil {
			return nil, err
		}

		it, err := dbm.IteratePrefix(idx.store, prefix)
		if err != nil {
			return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
		}
		defer it.Close()

	iterContains:
		for ; it.Valid(); it.Next() {
			eventValue, err := parseValueFromEventKey(it.Key())
			if err != nil {
				// Skip keys that cannot be parsed.
				continue
			}

			// NOTE(review): panics if the operand is not a string; presumably
			// the query parser guarantees that for CONTAINS -- confirm.
			if strings.Contains(eventValue, c.Operand.(string)) {
				tmpHeights[string(it.Value())] = it.Value()
			}

			select {
			case <-ctx.Done():
				break iterContains

			default:
			}
		}
		if err := it.Error(); err != nil {
			return nil, err
		}

	default:
		return nil, errors.New("other operators should be handled already")
	}

	if len(tmpHeights) == 0 || firstRun {
		// Either:
		//
		// 1. Regardless if a previous match was attempted, which may have had
		// results, but no match was found for the current condition, then we
		// return no matches (assuming AND operand).
		//
		// 2. A previous match was not attempted, so we return all results.
		return tmpHeights, nil
	}

	// Remove/reduce matches in filteredHeights that were not found in this
	// match (tmpHeights).
intersect:
	for k := range filteredHeights {
		if tmpHeights[k] == nil {
			delete(filteredHeights, k)

			select {
			case <-ctx.Done():
				// A bare break would only leave the select statement, so the
				// loop is exited through its label.
				break intersect

			default:
			}
		}
	}

	return filteredHeights, nil
}

// indexEvents stages index entries for the given events in batch. Events with
// an empty type and attributes with an empty key are skipped. For every
// remaining attribute that requests indexing, an event key built from the
// composite key "<event type>.<attribute key>", typ, the attribute value and
// height is set to the encoded height.
//
// An error is returned if a composite key equals the reserved block height
// key, if an event key cannot be created, or if the batch rejects a write.
func (idx *BlockerIndexer) indexEvents(batch dbm.Batch, events []abci.Event, typ string, height int64) error {
	heightBz := int64ToBytes(height)

	for _, event := range events {
		// only index events with a non-empty type
		if len(event.Type) == 0 {
			continue
		}

		for _, attr := range event.Attributes {
			if len(attr.Key) == 0 {
				continue
			}

			// The reserved block height key is rejected regardless of whether
			// the attribute asks to be indexed.
			compositeKey := fmt.Sprintf("%s.%s", event.Type, attr.Key)
			if compositeKey == types.BlockHeightKey {
				return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeKey)
			}

			// index only if the attribute specified index:true
			if attr.GetIndex() {
				key, err := eventKey(compositeKey, typ, attr.Value, height)
				if err != nil {
					return fmt.Errorf("failed to create block index key: %w", err)
				}

				if err := batch.Set(key, heightBz); err != nil {
					return err
				}
			}
		}
	}

	return nil
}