package kv import ( "context" "encoding/hex" "fmt" "strconv" "strings" "github.com/gogo/protobuf/proto" "github.com/google/orderedcode" dbm "github.com/tendermint/tm-db" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/state/indexer" "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" ) var _ txindex.TxIndexer = (*TxIndex)(nil) // TxIndex is the simplest possible indexer // It is backed by two kv stores: // 1. txhash - result (primary key) // 2. event - txhash (secondary key) type TxIndex struct { store dbm.DB } // NewTxIndex creates new KV indexer. func NewTxIndex(store dbm.DB) *TxIndex { return &TxIndex{ store: store, } } // Get gets transaction from the TxIndex storage and returns it or nil if the // transaction is not found. func (txi *TxIndex) Get(hash []byte) (*abci.TxResult, error) { if len(hash) == 0 { return nil, txindex.ErrorEmptyHash } rawBytes, err := txi.store.Get(primaryKey(hash)) if err != nil { panic(err) } if rawBytes == nil { return nil, nil } txResult := new(abci.TxResult) err = proto.Unmarshal(rawBytes, txResult) if err != nil { return nil, fmt.Errorf("error reading TxResult: %v", err) } return txResult, nil } // AddBatch indexes a batch of transactions using the given list of events. Each // key that indexed from the tx's events is a composite of the event type and // the respective attribute's key delimited by a "." (eg. "account.number"). // Any event with an empty type is not indexed. func (txi *TxIndex) AddBatch(b *txindex.Batch) error { storeBatch := txi.store.NewBatch() defer storeBatch.Close() for _, result := range b.Ops { hash := types.Tx(result.Tx).Hash() // index tx by events err := txi.indexEvents(result, hash, storeBatch) if err != nil { return err } // index by height (always) err = storeBatch.Set(keyFromHeight(result), hash) if err != nil { return err } rawBytes, err := proto.Marshal(result) if err != nil { return err } // index by hash (always) err = storeBatch.Set(primaryKey(hash), rawBytes) if err != nil { return err } } return storeBatch.WriteSync() } // Index indexes a single transaction using the given list of events. Each key // that indexed from the tx's events is a composite of the event type and the // respective attribute's key delimited by a "." (eg. "account.number"). // Any event with an empty type is not indexed. func (txi *TxIndex) Index(result *abci.TxResult) error { b := txi.store.NewBatch() defer b.Close() hash := types.Tx(result.Tx).Hash() // index tx by events err := txi.indexEvents(result, hash, b) if err != nil { return err } // index by height (always) err = b.Set(keyFromHeight(result), hash) if err != nil { return err } rawBytes, err := proto.Marshal(result) if err != nil { return err } // index by hash (always) err = b.Set(primaryKey(hash), rawBytes) if err != nil { return err } return b.WriteSync() } func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store dbm.Batch) error { for _, event := range result.Result.Events { // only index events with a non-empty type if len(event.Type) == 0 { continue } for _, attr := range event.Attributes { if len(attr.Key) == 0 { continue } // index if `index: true` is set compositeTag := fmt.Sprintf("%s.%s", event.Type, string(attr.Key)) // ensure event does not conflict with a reserved prefix key if compositeTag == types.TxHashKey || compositeTag == types.TxHeightKey { return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeTag) } if attr.GetIndex() { err := store.Set(keyFromEvent(compositeTag, attr.Value, result), hash) if err != nil { return err } } } } return nil } // Search performs a search using the given query. // // It breaks the query into conditions (like "tx.height > 5"). For each // condition, it queries the DB index. One special use cases here: (1) if // "tx.hash" is found, it returns tx result for it (2) for range queries it is // better for the client to provide both lower and upper bounds, so we are not // performing a full scan. Results from querying indexes are then intersected // and returned to the caller, in no particular order. // // Search will exit early and return any result fetched so far, // when a message is received on the context chan. func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) { select { case <-ctx.Done(): return make([]*abci.TxResult, 0), nil default: } var hashesInitialized bool filteredHashes := make(map[string][]byte) // get a list of conditions (like "tx.height > 5") conditions, err := q.Conditions() if err != nil { return nil, fmt.Errorf("error during parsing conditions from query: %w", err) } // if there is a hash condition, return the result immediately hash, ok, err := lookForHash(conditions) if err != nil { return nil, fmt.Errorf("error during searching for a hash in the query: %w", err) } else if ok { res, err := txi.Get(hash) switch { case err != nil: return []*abci.TxResult{}, fmt.Errorf("error while retrieving the result: %w", err) case res == nil: return []*abci.TxResult{}, nil default: return []*abci.TxResult{res}, nil } } // conditions to skip because they're handled before "everything else" skipIndexes := make([]int, 0) // extract ranges // if both upper and lower bounds exist, it's better to get them in order not // no iterate over kvs that are not within range. ranges, rangeIndexes := indexer.LookForRanges(conditions) if len(ranges) > 0 { skipIndexes = append(skipIndexes, rangeIndexes...) for _, qr := range ranges { if !hashesInitialized { filteredHashes = txi.matchRange(ctx, qr, prefixFromCompositeKey(qr.Key), filteredHashes, true) hashesInitialized = true // Ignore any remaining conditions if the first condition resulted // in no matches (assuming implicit AND operand). if len(filteredHashes) == 0 { break } } else { filteredHashes = txi.matchRange(ctx, qr, prefixFromCompositeKey(qr.Key), filteredHashes, false) } } } // if there is a height condition ("tx.height=3"), extract it height := lookForHeight(conditions) // for all other conditions for i, c := range conditions { if intInSlice(i, skipIndexes) { continue } if !hashesInitialized { filteredHashes = txi.match(ctx, c, prefixForCondition(c, height), filteredHashes, true) hashesInitialized = true // Ignore any remaining conditions if the first condition resulted // in no matches (assuming implicit AND operand). if len(filteredHashes) == 0 { break } } else { filteredHashes = txi.match(ctx, c, prefixForCondition(c, height), filteredHashes, false) } } results := make([]*abci.TxResult, 0, len(filteredHashes)) for _, h := range filteredHashes { res, err := txi.Get(h) if err != nil { return nil, fmt.Errorf("failed to get Tx{%X}: %w", h, err) } results = append(results, res) // Potentially exit early. select { case <-ctx.Done(): break default: } } return results, nil } func lookForHash(conditions []query.Condition) (hash []byte, ok bool, err error) { for _, c := range conditions { if c.CompositeKey == types.TxHashKey { decoded, err := hex.DecodeString(c.Operand.(string)) return decoded, true, err } } return } // lookForHeight returns a height if there is an "height=X" condition. func lookForHeight(conditions []query.Condition) (height int64) { for _, c := range conditions { if c.CompositeKey == types.TxHeightKey && c.Op == query.OpEqual { return c.Operand.(int64) } } return 0 } // match returns all matching txs by hash that meet a given condition and start // key. An already filtered result (filteredHashes) is provided such that any // non-intersecting matches are removed. // // NOTE: filteredHashes may be empty if no previous condition has matched. func (txi *TxIndex) match( ctx context.Context, c query.Condition, startKeyBz []byte, filteredHashes map[string][]byte, firstRun bool, ) map[string][]byte { // A previous match was attempted but resulted in no matches, so we return // no matches (assuming AND operand). if !firstRun && len(filteredHashes) == 0 { return filteredHashes } tmpHashes := make(map[string][]byte) switch { case c.Op == query.OpEqual: it, err := dbm.IteratePrefix(txi.store, startKeyBz) if err != nil { panic(err) } defer it.Close() for ; it.Valid(); it.Next() { tmpHashes[string(it.Value())] = it.Value() // Potentially exit early. select { case <-ctx.Done(): break default: } } if err := it.Error(); err != nil { panic(err) } case c.Op == query.OpExists: // XXX: can't use startKeyBz here because c.Operand is nil // (e.g. "account.owner//" won't match w/ a single row) it, err := dbm.IteratePrefix(txi.store, prefixFromCompositeKey(c.CompositeKey)) if err != nil { panic(err) } defer it.Close() for ; it.Valid(); it.Next() { tmpHashes[string(it.Value())] = it.Value() // Potentially exit early. select { case <-ctx.Done(): break default: } } if err := it.Error(); err != nil { panic(err) } case c.Op == query.OpContains: // XXX: startKey does not apply here. // For example, if startKey = "account.owner/an/" and search query = "account.owner CONTAINS an" // we can't iterate with prefix "account.owner/an/" because we might miss keys like "account.owner/Ulan/" it, err := dbm.IteratePrefix(txi.store, prefixFromCompositeKey(c.CompositeKey)) if err != nil { panic(err) } defer it.Close() for ; it.Valid(); it.Next() { value, err := parseValueFromKey(it.Key()) if err != nil { continue } if strings.Contains(value, c.Operand.(string)) { tmpHashes[string(it.Value())] = it.Value() } // Potentially exit early. select { case <-ctx.Done(): break default: } } if err := it.Error(); err != nil { panic(err) } default: panic("other operators should be handled already") } if len(tmpHashes) == 0 || firstRun { // Either: // // 1. Regardless if a previous match was attempted, which may have had // results, but no match was found for the current condition, then we // return no matches (assuming AND operand). // // 2. A previous match was not attempted, so we return all results. return tmpHashes } // Remove/reduce matches in filteredHashes that were not found in this // match (tmpHashes). for k := range filteredHashes { if tmpHashes[k] == nil { delete(filteredHashes, k) // Potentially exit early. select { case <-ctx.Done(): break default: } } } return filteredHashes } // matchRange returns all matching txs by hash that meet a given queryRange and // start key. An already filtered result (filteredHashes) is provided such that // any non-intersecting matches are removed. // // NOTE: filteredHashes may be empty if no previous condition has matched. func (txi *TxIndex) matchRange( ctx context.Context, qr indexer.QueryRange, startKey []byte, filteredHashes map[string][]byte, firstRun bool, ) map[string][]byte { // A previous match was attempted but resulted in no matches, so we return // no matches (assuming AND operand). if !firstRun && len(filteredHashes) == 0 { return filteredHashes } tmpHashes := make(map[string][]byte) lowerBound := qr.LowerBoundValue() upperBound := qr.UpperBoundValue() it, err := dbm.IteratePrefix(txi.store, startKey) if err != nil { panic(err) } defer it.Close() LOOP: for ; it.Valid(); it.Next() { value, err := parseValueFromKey(it.Key()) if err != nil { continue } if _, ok := qr.AnyBound().(int64); ok { v, err := strconv.ParseInt(value, 10, 64) if err != nil { continue LOOP } include := true if lowerBound != nil && v < lowerBound.(int64) { include = false } if upperBound != nil && v > upperBound.(int64) { include = false } if include { tmpHashes[string(it.Value())] = it.Value() } // XXX: passing time in a ABCI Events is not yet implemented // case time.Time: // v := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64) // if v == r.upperBound { // break // } } // Potentially exit early. select { case <-ctx.Done(): break default: } } if err := it.Error(); err != nil { panic(err) } if len(tmpHashes) == 0 || firstRun { // Either: // // 1. Regardless if a previous match was attempted, which may have had // results, but no match was found for the current condition, then we // return no matches (assuming AND operand). // // 2. A previous match was not attempted, so we return all results. return tmpHashes } // Remove/reduce matches in filteredHashes that were not found in this // match (tmpHashes). for k := range filteredHashes { if tmpHashes[k] == nil { delete(filteredHashes, k) // Potentially exit early. select { case <-ctx.Done(): break default: } } } return filteredHashes } // ########################## Keys ############################# // // The indexer has two types of kv stores: // 1. txhash - result (primary key) // 2. event - txhash (secondary key) // // The event key can be decomposed into 4 parts. // 1. A composite key which can be any string. // Usually something like "tx.height" or "account.owner" // 2. A value. That corresponds to the key. In the above // example the value could be "5" or "Ivan" // 3. The height of the Tx that aligns with the key and value. // 4. The index of the Tx that aligns with the key and value // the hash/primary key func primaryKey(hash []byte) []byte { key, err := orderedcode.Append( nil, types.TxHashKey, string(hash), ) if err != nil { panic(err) } return key } // The event/secondary key func secondaryKey(compositeKey, value string, height int64, index uint32) []byte { key, err := orderedcode.Append( nil, compositeKey, value, height, int64(index), ) if err != nil { panic(err) } return key } // parseValueFromKey parses an event key and extracts out the value, returning an error if one arises. // This will also involve ensuring that the key has the correct format. // CONTRACT: function doesn't check that the prefix is correct. This should have already been done by the iterator func parseValueFromKey(key []byte) (string, error) { var ( compositeKey, value string height, index int64 ) remaining, err := orderedcode.Parse(string(key), &compositeKey, &value, &height, &index) if err != nil { return "", err } if len(remaining) != 0 { return "", fmt.Errorf("unexpected remainder in key: %s", remaining) } return value, nil } func keyFromEvent(compositeKey string, value []byte, result *abci.TxResult) []byte { return secondaryKey(compositeKey, string(value), result.Height, result.Index) } func keyFromHeight(result *abci.TxResult) []byte { return secondaryKey(types.TxHeightKey, fmt.Sprintf("%d", result.Height), result.Height, result.Index) } // Prefixes: these represent an initial part of the key and are used by iterators to iterate over a small // section of the kv store during searches. func prefixFromCompositeKey(compositeKey string) []byte { key, err := orderedcode.Append(nil, compositeKey) if err != nil { panic(err) } return key } func prefixFromCompositeKeyAndValue(compositeKey, value string) []byte { key, err := orderedcode.Append(nil, compositeKey, value) if err != nil { panic(err) } return key } // a small utility function for getting a keys prefix based on a condition and a height func prefixForCondition(c query.Condition, height int64) []byte { key := prefixFromCompositeKeyAndValue(c.CompositeKey, fmt.Sprintf("%v", c.Operand)) if height > 0 { var err error key, err = orderedcode.Append(key, height) if err != nil { panic(err) } } return key }