Browse Source

tx indexer: use different field separator for keys (#5865)

pull/5891/head
Callum Waters 4 years ago
committed by GitHub
parent
commit
5b698ed13b
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 119 additions and 54 deletions
  1. +112
    -53
      state/txindex/kv/kv.go
  2. +7
    -1
      state/txindex/kv/kv_test.go

+ 112
- 53
state/txindex/kv/kv.go View File

@ -1,7 +1,6 @@
package kv
import (
"bytes"
"context"
"encoding/hex"
"fmt"
@ -10,6 +9,7 @@ import (
"time"
"github.com/gogo/protobuf/proto"
"github.com/google/orderedcode"
dbm "github.com/tendermint/tm-db"
abci "github.com/tendermint/tendermint/abci/types"
@ -18,13 +18,12 @@ import (
"github.com/tendermint/tendermint/types"
)
const (
tagKeySeparator = "/"
)
var _ txindex.TxIndexer = (*TxIndex)(nil)
// TxIndex is the simplest possible indexer, backed by key-value storage (levelDB).
// TxIndex is the simplest possible indexer
// It is backed by two kv stores:
// 1. txhash - result (primary key)
// 2. event - txhash (secondary key)
type TxIndex struct {
store dbm.DB
}
@ -43,7 +42,7 @@ func (txi *TxIndex) Get(hash []byte) (*abci.TxResult, error) {
return nil, txindex.ErrorEmptyHash
}
rawBytes, err := txi.store.Get(hash)
rawBytes, err := txi.store.Get(primaryKey(hash))
if err != nil {
panic(err)
}
@ -78,7 +77,7 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
}
// index by height (always)
err = storeBatch.Set(keyForHeight(result), hash)
err = storeBatch.Set(keyFromHeight(result), hash)
if err != nil {
return err
}
@ -88,7 +87,7 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
return err
}
// index by hash (always)
err = storeBatch.Set(hash, rawBytes)
err = storeBatch.Set(primaryKey(hash), rawBytes)
if err != nil {
return err
}
@ -114,7 +113,7 @@ func (txi *TxIndex) Index(result *abci.TxResult) error {
}
// index by height (always)
err = b.Set(keyForHeight(result), hash)
err = b.Set(keyFromHeight(result), hash)
if err != nil {
return err
}
@ -124,7 +123,7 @@ func (txi *TxIndex) Index(result *abci.TxResult) error {
return err
}
// index by hash (always)
err = b.Set(hash, rawBytes)
err = b.Set(primaryKey(hash), rawBytes)
if err != nil {
return err
}
@ -146,8 +145,12 @@ func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store dbm.Ba
// index if `index: true` is set
compositeTag := fmt.Sprintf("%s.%s", event.Type, string(attr.Key))
// ensure event does not conflict with a reserved prefix key
if compositeTag == types.TxHashKey || compositeTag == types.TxHeightKey {
return fmt.Errorf("event type and attribute key \"%s\" is reserved. Please use a different key", compositeTag)
}
if attr.GetIndex() {
err := store.Set(keyForEvent(compositeTag, attr.Value, result), hash)
err := store.Set(keyFromEvent(compositeTag, attr.Value, result), hash)
if err != nil {
return err
}
@ -215,7 +218,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
for _, r := range ranges {
if !hashesInitialized {
filteredHashes = txi.matchRange(ctx, r, startKey(r.key), filteredHashes, true)
filteredHashes = txi.matchRange(ctx, r, prefixFromCompositeKey(r.key), filteredHashes, true)
hashesInitialized = true
// Ignore any remaining conditions if the first condition resulted
@ -224,7 +227,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
break
}
} else {
filteredHashes = txi.matchRange(ctx, r, startKey(r.key), filteredHashes, false)
filteredHashes = txi.matchRange(ctx, r, prefixFromCompositeKey(r.key), filteredHashes, false)
}
}
}
@ -239,7 +242,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
}
if !hashesInitialized {
filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, true)
filteredHashes = txi.match(ctx, c, prefixForCondition(c, height), filteredHashes, true)
hashesInitialized = true
// Ignore any remaining conditions if the first condition resulted
@ -248,7 +251,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
break
}
} else {
filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, false)
filteredHashes = txi.match(ctx, c, prefixForCondition(c, height), filteredHashes, false)
}
}
@ -430,7 +433,7 @@ func (txi *TxIndex) match(
case c.Op == query.OpExists:
// XXX: can't use startKeyBz here because c.Operand is nil
// (e.g. "account.owner/<nil>/" won't match w/ a single row)
it, err := dbm.IteratePrefix(txi.store, startKey(c.CompositeKey))
it, err := dbm.IteratePrefix(txi.store, prefixFromCompositeKey(c.CompositeKey))
if err != nil {
panic(err)
}
@ -454,18 +457,18 @@ func (txi *TxIndex) match(
// XXX: startKey does not apply here.
// For example, if startKey = "account.owner/an/" and search query = "account.owner CONTAINS an"
// we can't iterate with prefix "account.owner/an/" because we might miss keys like "account.owner/Ulan/"
it, err := dbm.IteratePrefix(txi.store, startKey(c.CompositeKey))
it, err := dbm.IteratePrefix(txi.store, prefixFromCompositeKey(c.CompositeKey))
if err != nil {
panic(err)
}
defer it.Close()
for ; it.Valid(); it.Next() {
if !isTagKey(it.Key()) {
value, err := parseValueFromKey(it.Key())
if err != nil {
continue
}
if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) {
if strings.Contains(value, c.Operand.(string)) {
tmpHashes[string(it.Value())] = it.Value()
}
@ -542,12 +545,12 @@ func (txi *TxIndex) matchRange(
LOOP:
for ; it.Valid(); it.Next() {
if !isTagKey(it.Key()) {
value, err := parseValueFromKey(it.Key())
if err != nil {
continue
}
if _, ok := r.AnyBound().(int64); ok {
v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
v, err := strconv.ParseInt(value, 10, 64)
if err != nil {
continue LOOP
}
@ -613,46 +616,102 @@ LOOP:
return filteredHashes
}
// Keys
// ########################## Keys #############################
//
// The indexer has two types of kv stores:
// 1. txhash - result (primary key)
// 2. event - txhash (secondary key)
//
// The event key can be decomposed into 4 parts.
// 1. A composite key which can be any string.
// Usually something like "tx.height" or "account.owner"
// 2. A value. That corresponds to the key. In the above
// example the value could be "5" or "Ivan"
// 3. The height of the Tx that aligns with the key and value.
// 4. The index of the Tx that aligns with the key and value
// the hash/primary key
func primaryKey(hash []byte) []byte {
key, err := orderedcode.Append(
nil,
types.TxHashKey,
string(hash),
)
if err != nil {
panic(err)
}
return key
}
// The event/secondary key
func secondaryKey(compositeKey, value string, height int64, index uint32) []byte {
key, err := orderedcode.Append(
nil,
compositeKey,
value,
height,
int64(index),
)
if err != nil {
panic(err)
}
return key
}
func isTagKey(key []byte) bool {
return strings.Count(string(key), tagKeySeparator) == 3
// parseValueFromKey parses an event key and extracts out the value, returning an error if one arises.
// This will also involve ensuring that the key has the correct format.
// CONTRACT: function doesn't check that the prefix is correct. This should have already been done by the iterator
func parseValueFromKey(key []byte) (string, error) {
var (
compositeKey, value string
height, index int64
)
remaining, err := orderedcode.Parse(string(key), &compositeKey, &value, &height, &index)
if err != nil {
return "", err
}
if len(remaining) != 0 {
return "", fmt.Errorf("unexpected remainder in key: %s", remaining)
}
return value, nil
}
func extractValueFromKey(key []byte) string {
parts := strings.SplitN(string(key), tagKeySeparator, 3)
return parts[1]
func keyFromEvent(compositeKey string, value []byte, result *abci.TxResult) []byte {
return secondaryKey(compositeKey, string(value), result.Height, result.Index)
}
func keyForEvent(key string, value []byte, result *abci.TxResult) []byte {
return []byte(fmt.Sprintf("%s/%s/%d/%d",
key,
value,
result.Height,
result.Index,
))
func keyFromHeight(result *abci.TxResult) []byte {
return secondaryKey(types.TxHeightKey, fmt.Sprintf("%d", result.Height), result.Height, result.Index)
}
func keyForHeight(result *abci.TxResult) []byte {
return []byte(fmt.Sprintf("%s/%d/%d/%d",
types.TxHeightKey,
result.Height,
result.Height,
result.Index,
))
// Prefixes: these represent an initial part of the key and are used by iterators to iterate over a small
// section of the kv store during searches.
func prefixFromCompositeKey(compositeKey string) []byte {
key, err := orderedcode.Append(nil, compositeKey)
if err != nil {
panic(err)
}
return key
}
func startKeyForCondition(c query.Condition, height int64) []byte {
if height > 0 {
return startKey(c.CompositeKey, c.Operand, height)
func prefixFromCompositeKeyAndValue(compositeKey, value string) []byte {
key, err := orderedcode.Append(nil, compositeKey, value)
if err != nil {
panic(err)
}
return startKey(c.CompositeKey, c.Operand)
return key
}
func startKey(fields ...interface{}) []byte {
var b bytes.Buffer
for _, f := range fields {
b.Write([]byte(fmt.Sprintf("%v", f) + tagKeySeparator))
// a small utility function for getting a keys prefix based on a condition and a height
func prefixForCondition(c query.Condition, height int64) []byte {
key := prefixFromCompositeKeyAndValue(c.CompositeKey, fmt.Sprintf("%v", c.Operand))
if height > 0 {
var err error
key, err = orderedcode.Append(key, height)
if err != nil {
panic(err)
}
}
return b.Bytes()
return key
}

+ 7
- 1
state/txindex/kv/kv_test.go View File

@ -120,6 +120,12 @@ func TestTxSearch(t *testing.T) {
{"account.number EXISTS", 1},
// search using EXISTS for non existing key
{"account.date EXISTS", 0},
// search using height
{"account.number = 1 AND tx.height = 1", 1},
// search using incorrect height
{"account.number = 1 AND tx.height = 3", 0},
// search using height only
{"tx.height = 1", 1},
}
ctx := context.Background()
@ -189,7 +195,7 @@ func TestTxSearchDeprecatedIndexing(t *testing.T) {
err = b.Set(depKey, hash2)
require.NoError(t, err)
err = b.Set(keyForHeight(txResult2), hash2)
err = b.Set(keyFromHeight(txResult2), hash2)
require.NoError(t, err)
err = b.Set(hash2, rawBytes)
require.NoError(t, err)


Loading…
Cancel
Save