package kv import ( "bytes" "encoding/hex" "fmt" "strconv" "strings" "time" "github.com/pkg/errors" abci "github.com/tendermint/abci/types" wire "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" db "github.com/tendermint/tmlibs/db" "github.com/tendermint/tmlibs/pubsub/query" ) const ( tagKeySeparator = "/" ) var _ txindex.TxIndexer = (*TxIndex)(nil) // TxIndex is the simplest possible indexer, backed by key-value storage (levelDB). type TxIndex struct { store db.DB tagsToIndex []string indexAllTags bool } // NewTxIndex creates new KV indexer. func NewTxIndex(store db.DB, options ...func(*TxIndex)) *TxIndex { txi := &TxIndex{store: store, tagsToIndex: make([]string, 0), indexAllTags: false} for _, o := range options { o(txi) } return txi } // IndexTags is an option for setting which tags to index. func IndexTags(tags []string) func(*TxIndex) { return func(txi *TxIndex) { txi.tagsToIndex = tags } } // IndexAllTags is an option for indexing all tags. func IndexAllTags() func(*TxIndex) { return func(txi *TxIndex) { txi.indexAllTags = true } } // Get gets transaction from the TxIndex storage and returns it or nil if the // transaction is not found. func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) { if len(hash) == 0 { return nil, txindex.ErrorEmptyHash } rawBytes := txi.store.Get(hash) if rawBytes == nil { return nil, nil } r := bytes.NewReader(rawBytes) var n int var err error txResult := wire.ReadBinary(&types.TxResult{}, r, 0, &n, &err).(*types.TxResult) if err != nil { return nil, fmt.Errorf("Error reading TxResult: %v", err) } return txResult, nil } // AddBatch indexes a batch of transactions using the given list of tags. func (txi *TxIndex) AddBatch(b *txindex.Batch) error { storeBatch := txi.store.NewBatch() for _, result := range b.Ops { hash := result.Tx.Hash() // index tx by tags for _, tag := range result.Result.Tags { if txi.indexAllTags || cmn.StringInSlice(tag.Key, txi.tagsToIndex) { storeBatch.Set(keyForTag(tag, result), hash) } } // index tx by hash rawBytes := wire.BinaryBytes(result) storeBatch.Set(hash, rawBytes) } storeBatch.Write() return nil } // Index indexes a single transaction using the given list of tags. func (txi *TxIndex) Index(result *types.TxResult) error { b := txi.store.NewBatch() hash := result.Tx.Hash() // index tx by tags for _, tag := range result.Result.Tags { if txi.indexAllTags || cmn.StringInSlice(tag.Key, txi.tagsToIndex) { b.Set(keyForTag(tag, result), hash) } } // index tx by hash rawBytes := wire.BinaryBytes(result) b.Set(hash, rawBytes) b.Write() return nil } // Search performs a search using the given query. It breaks the query into // conditions (like "tx.height > 5"). For each condition, it queries the DB // index. One special use cases here: (1) if "tx.hash" is found, it returns tx // result for it (2) for range queries it is better for the client to provide // both lower and upper bounds, so we are not performing a full scan. Results // from querying indexes are then intersected and returned to the caller. func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { var hashes [][]byte var hashesInitialized bool // get a list of conditions (like "tx.height > 5") conditions := q.Conditions() // if there is a hash condition, return the result immediately hash, err, ok := lookForHash(conditions) if err != nil { return nil, errors.Wrap(err, "error during searching for a hash in the query") } else if ok { res, err := txi.Get(hash) if res == nil { return []*types.TxResult{}, nil } else { return []*types.TxResult{res}, errors.Wrap(err, "error while retrieving the result") } } // conditions to skip because they're handled before "everything else" skipIndexes := make([]int, 0) // if there is a height condition ("tx.height=3"), extract it for faster lookups height, heightIndex := lookForHeight(conditions) if heightIndex >= 0 { skipIndexes = append(skipIndexes, heightIndex) } // extract ranges // if both upper and lower bounds exist, it's better to get them in order not // no iterate over kvs that are not within range. ranges, rangeIndexes := lookForRanges(conditions) if len(ranges) > 0 { skipIndexes = append(skipIndexes, rangeIndexes...) for _, r := range ranges { if !hashesInitialized { hashes = txi.matchRange(r, startKeyForRange(r, height)) hashesInitialized = true } else { hashes = intersect(hashes, txi.matchRange(r, startKeyForRange(r, height))) } } } // for all other conditions for i, c := range conditions { if cmn.IntInSlice(i, skipIndexes) { continue } if !hashesInitialized { hashes = txi.match(c, startKey(c, height)) hashesInitialized = true } else { hashes = intersect(hashes, txi.match(c, startKey(c, height))) } } results := make([]*types.TxResult, len(hashes)) i := 0 for _, h := range hashes { results[i], err = txi.Get(h) if err != nil { return nil, errors.Wrapf(err, "failed to get Tx{%X}", h) } i++ } return results, nil } func lookForHash(conditions []query.Condition) (hash []byte, err error, ok bool) { for _, c := range conditions { if c.Tag == types.TxHashKey { decoded, err := hex.DecodeString(c.Operand.(string)) return decoded, err, true } } return } func lookForHeight(conditions []query.Condition) (height int64, index int) { for i, c := range conditions { if c.Tag == types.TxHeightKey { return c.Operand.(int64), i } } return 0, -1 } // special map to hold range conditions // Example: account.number => queryRange{lowerBound: 1, upperBound: 5} type queryRanges map[string]queryRange type queryRange struct { key string lowerBound interface{} // int || time.Time includeLowerBound bool upperBound interface{} // int || time.Time includeUpperBound bool } func lookForRanges(conditions []query.Condition) (ranges queryRanges, indexes []int) { ranges = make(queryRanges) for i, c := range conditions { if isRangeOperation(c.Op) { r, ok := ranges[c.Tag] if !ok { r = queryRange{key: c.Tag} } switch c.Op { case query.OpGreater: r.lowerBound = c.Operand case query.OpGreaterEqual: r.includeLowerBound = true r.lowerBound = c.Operand case query.OpLess: r.upperBound = c.Operand case query.OpLessEqual: r.includeUpperBound = true r.upperBound = c.Operand } ranges[c.Tag] = r indexes = append(indexes, i) } } return ranges, indexes } func isRangeOperation(op query.Operator) bool { switch op { case query.OpGreater, query.OpGreaterEqual, query.OpLess, query.OpLessEqual: return true default: return false } } func (txi *TxIndex) match(c query.Condition, startKey []byte) (hashes [][]byte) { if c.Op == query.OpEqual { it := txi.store.IteratorPrefix(startKey) defer it.Release() for it.Next() { hashes = append(hashes, it.Value()) } } else if c.Op == query.OpContains { // XXX: doing full scan because startKey does not apply here // For example, if startKey = "account.owner=an" and search query = "accoutn.owner CONSISTS an" // we can't iterate with prefix "account.owner=an" because we might miss keys like "account.owner=Ulan" it := txi.store.Iterator() defer it.Release() for it.Next() { if !isTagKey(it.Key()) { continue } if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) { hashes = append(hashes, it.Value()) } } } else { panic("other operators should be handled already") } return } func (txi *TxIndex) matchRange(r queryRange, startKey []byte) (hashes [][]byte) { it := txi.store.IteratorPrefix(startKey) defer it.Release() LOOP: for it.Next() { if !isTagKey(it.Key()) { continue } if r.upperBound != nil { // no other way to stop iterator other than checking for upperBound switch (r.upperBound).(type) { case int64: v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64) if err == nil && v == r.upperBound { if r.includeUpperBound { hashes = append(hashes, it.Value()) } break LOOP } // XXX: passing time in a ABCI Tags is not yet implemented // case time.Time: // v := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64) // if v == r.upperBound { // break // } } } hashes = append(hashes, it.Value()) } return } /////////////////////////////////////////////////////////////////////////////// // Keys func startKey(c query.Condition, height int64) []byte { var key string if height > 0 { key = fmt.Sprintf("%s/%v/%d", c.Tag, c.Operand, height) } else { key = fmt.Sprintf("%s/%v", c.Tag, c.Operand) } return []byte(key) } func startKeyForRange(r queryRange, height int64) []byte { if r.lowerBound == nil { return []byte(fmt.Sprintf("%s", r.key)) } var lowerBound interface{} if r.includeLowerBound { lowerBound = r.lowerBound } else { switch t := r.lowerBound.(type) { case int64: lowerBound = t + 1 case time.Time: lowerBound = t.Unix() + 1 default: panic("not implemented") } } var key string if height > 0 { key = fmt.Sprintf("%s/%v/%d", r.key, lowerBound, height) } else { key = fmt.Sprintf("%s/%v", r.key, lowerBound) } return []byte(key) } func isTagKey(key []byte) bool { return strings.Count(string(key), tagKeySeparator) == 3 } func extractValueFromKey(key []byte) string { parts := strings.SplitN(string(key), tagKeySeparator, 3) return parts[1] } func keyForTag(tag *abci.KVPair, result *types.TxResult) []byte { switch tag.ValueType { case abci.KVPair_STRING: return []byte(fmt.Sprintf("%s/%v/%d/%d", tag.Key, tag.ValueString, result.Height, result.Index)) case abci.KVPair_INT: return []byte(fmt.Sprintf("%s/%v/%d/%d", tag.Key, tag.ValueInt, result.Height, result.Index)) // case abci.KVPair_TIME: // return []byte(fmt.Sprintf("%s/%d/%d/%d", tag.Key, tag.ValueTime.Unix(), result.Height, result.Index)) default: panic(fmt.Sprintf("Undefined value type: %v", tag.ValueType)) } } /////////////////////////////////////////////////////////////////////////////// // Utils func intersect(as, bs [][]byte) [][]byte { i := make([][]byte, 0, cmn.MinInt(len(as), len(bs))) for _, a := range as { for _, b := range bs { if bytes.Equal(a, b) { i = append(i, a) } } } return i }