|
|
@ -14,21 +14,26 @@ import ( |
|
|
|
wire "github.com/tendermint/go-wire" |
|
|
|
"github.com/tendermint/tendermint/state/txindex" |
|
|
|
"github.com/tendermint/tendermint/types" |
|
|
|
cmn "github.com/tendermint/tmlibs/common" |
|
|
|
db "github.com/tendermint/tmlibs/db" |
|
|
|
"github.com/tendermint/tmlibs/pubsub/query" |
|
|
|
) |
|
|
|
|
|
|
|
const ( |
|
|
|
tagKeySeparator = "/" |
|
|
|
) |
|
|
|
|
|
|
|
var _ txindex.TxIndexer = (*TxIndex)(nil) |
|
|
|
|
|
|
|
// TxIndex is the simplest possible indexer, backed by Key-Value storage (levelDB).
|
|
|
|
// It can only index transaction by its identifier.
|
|
|
|
// TxIndex is the simplest possible indexer, backed by key-value storage (levelDB).
|
|
|
|
type TxIndex struct { |
|
|
|
store db.DB |
|
|
|
store db.DB |
|
|
|
tagsToIndex []string |
|
|
|
} |
|
|
|
|
|
|
|
// NewTxIndex returns new instance of TxIndex.
|
|
|
|
func NewTxIndex(store db.DB) *TxIndex { |
|
|
|
return &TxIndex{store: store} |
|
|
|
// NewTxIndex creates new KV indexer.
|
|
|
|
func NewTxIndex(store db.DB, tagsToIndex []string) *TxIndex { |
|
|
|
return &TxIndex{store: store, tagsToIndex: tagsToIndex} |
|
|
|
} |
|
|
|
|
|
|
|
// Get gets transaction from the TxIndex storage and returns it or nil if the
|
|
|
@ -55,7 +60,7 @@ func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) { |
|
|
|
} |
|
|
|
|
|
|
|
// AddBatch indexes a batch of transactions using the given list of tags.
|
|
|
|
func (txi *TxIndex) AddBatch(b *txindex.Batch, allowedTags []string) error { |
|
|
|
func (txi *TxIndex) AddBatch(b *txindex.Batch) error { |
|
|
|
storeBatch := txi.store.NewBatch() |
|
|
|
|
|
|
|
for _, result := range b.Ops { |
|
|
@ -63,7 +68,7 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch, allowedTags []string) error { |
|
|
|
|
|
|
|
// index tx by tags
|
|
|
|
for _, tag := range result.Result.Tags { |
|
|
|
if stringInSlice(tag.Key, allowedTags) { |
|
|
|
if stringInSlice(tag.Key, txi.tagsToIndex) { |
|
|
|
storeBatch.Set(keyForTag(tag, result), hash) |
|
|
|
} |
|
|
|
} |
|
|
@ -78,14 +83,21 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch, allowedTags []string) error { |
|
|
|
} |
|
|
|
|
|
|
|
// Index indexes a single transaction using the given list of tags.
|
|
|
|
func (txi *TxIndex) Index(result *types.TxResult, allowedTags []string) error { |
|
|
|
func (txi *TxIndex) Index(result *types.TxResult) error { |
|
|
|
batch := txindex.NewBatch(1) |
|
|
|
batch.Add(result) |
|
|
|
return txi.AddBatch(batch, allowedTags) |
|
|
|
return txi.AddBatch(batch) |
|
|
|
} |
|
|
|
|
|
|
|
// Search performs a search using the given query. It breaks the query into
|
|
|
|
// conditions (like "tx.height > 5"). For each condition, it queries the DB
|
|
|
|
// index. One special use cases here: (1) if "tx.hash" is found, it returns tx
|
|
|
|
// result for it (2) for range queries it is better for the client to provide
|
|
|
|
// both lower and upper bounds, so we are not performing a full scan. Results
|
|
|
|
// from querying indexes are then intersected and returned to the caller.
|
|
|
|
func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { |
|
|
|
hashes := make(map[string][]byte) // key - (base 16, upper-case hash)
|
|
|
|
var hashes [][]byte |
|
|
|
var hashesInitialized bool |
|
|
|
|
|
|
|
// get a list of conditions (like "tx.height > 5")
|
|
|
|
conditions := q.Conditions() |
|
|
@ -93,13 +105,13 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { |
|
|
|
// if there is a hash condition, return the result immediately
|
|
|
|
hash, err, ok := lookForHash(conditions) |
|
|
|
if err != nil { |
|
|
|
return []*types.TxResult{}, errors.Wrap(err, "error during searching for a hash in the query") |
|
|
|
return nil, errors.Wrap(err, "error during searching for a hash in the query") |
|
|
|
} else if ok { |
|
|
|
res, err := txi.Get(hash) |
|
|
|
return []*types.TxResult{res}, errors.Wrap(err, "error while retrieving the result") |
|
|
|
} |
|
|
|
|
|
|
|
// conditions to skip
|
|
|
|
// conditions to skip because they're handled before "everything else"
|
|
|
|
skipIndexes := make([]int, 0) |
|
|
|
|
|
|
|
// if there is a height condition ("tx.height=3"), extract it for faster lookups
|
|
|
@ -108,36 +120,19 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { |
|
|
|
skipIndexes = append(skipIndexes, heightIndex) |
|
|
|
} |
|
|
|
|
|
|
|
var hashes2 [][]byte |
|
|
|
|
|
|
|
// extract ranges
|
|
|
|
// if both upper and lower bounds exist, it's better to get them in order not
|
|
|
|
// no iterate over kvs that are not within range.
|
|
|
|
ranges, rangeIndexes := lookForRanges(conditions) |
|
|
|
if len(ranges) > 0 { |
|
|
|
skipIndexes = append(skipIndexes, rangeIndexes...) |
|
|
|
} |
|
|
|
for _, r := range ranges { |
|
|
|
hashes2 = txi.matchRange(r, startKeyForRange(r, height, heightIndex > 0)) |
|
|
|
|
|
|
|
// initialize hashes if we're running the first time
|
|
|
|
if len(hashes) == 0 { |
|
|
|
for _, h := range hashes2 { |
|
|
|
hashes[hashKey(h)] = h |
|
|
|
} |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|
// no matches
|
|
|
|
if len(hashes2) == 0 { |
|
|
|
hashes = make(map[string][]byte) |
|
|
|
} else { |
|
|
|
// perform intersection as we go
|
|
|
|
for _, h := range hashes2 { |
|
|
|
k := hashKey(h) |
|
|
|
if _, ok := hashes[k]; !ok { |
|
|
|
delete(hashes, k) |
|
|
|
} |
|
|
|
for _, r := range ranges { |
|
|
|
if !hashesInitialized { |
|
|
|
hashes = txi.matchRange(r, startKeyForRange(r, height)) |
|
|
|
hashesInitialized = true |
|
|
|
} else { |
|
|
|
hashes = intersect(hashes, txi.matchRange(r, startKeyForRange(r, height))) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -148,27 +143,11 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|
hashes2 = txi.match(c, startKey(c, height, heightIndex > 0)) |
|
|
|
|
|
|
|
// initialize hashes if we're running the first time
|
|
|
|
if len(hashes) == 0 { |
|
|
|
for _, h := range hashes2 { |
|
|
|
hashes[hashKey(h)] = h |
|
|
|
} |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|
// no matches
|
|
|
|
if len(hashes2) == 0 { |
|
|
|
hashes = make(map[string][]byte) |
|
|
|
if !hashesInitialized { |
|
|
|
hashes = txi.match(c, startKey(c, height)) |
|
|
|
hashesInitialized = true |
|
|
|
} else { |
|
|
|
// perform intersection as we go
|
|
|
|
for _, h := range hashes2 { |
|
|
|
k := hashKey(h) |
|
|
|
if _, ok := hashes[k]; !ok { |
|
|
|
delete(hashes, k) |
|
|
|
} |
|
|
|
} |
|
|
|
hashes = intersect(hashes, txi.match(c, startKey(c, height))) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -177,7 +156,7 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { |
|
|
|
for _, h := range hashes { |
|
|
|
results[i], err = txi.Get(h) |
|
|
|
if err != nil { |
|
|
|
return []*types.TxResult{}, errors.Wrapf(err, "failed to get Tx{%X}", h) |
|
|
|
return nil, errors.Wrapf(err, "failed to get Tx{%X}", h) |
|
|
|
} |
|
|
|
i++ |
|
|
|
} |
|
|
@ -253,15 +232,16 @@ func isRangeOperation(op query.Operator) bool { |
|
|
|
func (txi *TxIndex) match(c query.Condition, startKey []byte) (hashes [][]byte) { |
|
|
|
if c.Op == query.OpEqual { |
|
|
|
it := txi.store.IteratorPrefix(startKey) |
|
|
|
defer it.Release() |
|
|
|
for it.Next() { |
|
|
|
hashes = append(hashes, it.Value()) |
|
|
|
} |
|
|
|
} else if c.Op == query.OpContains { |
|
|
|
// XXX: full scan
|
|
|
|
// XXX: doing full scan because startKey does not apply here
|
|
|
|
it := txi.store.Iterator() |
|
|
|
defer it.Release() |
|
|
|
for it.Next() { |
|
|
|
// if it is a hash key, continue
|
|
|
|
if !strings.Contains(string(it.Key()), "/") { |
|
|
|
if !isTagKey(it.Key()) { |
|
|
|
continue |
|
|
|
} |
|
|
|
if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) { |
|
|
@ -274,9 +254,42 @@ func (txi *TxIndex) match(c query.Condition, startKey []byte) (hashes [][]byte) |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
func startKey(c query.Condition, height uint64, heightSpecified bool) []byte { |
|
|
|
func (txi *TxIndex) matchRange(r queryRange, startKey []byte) (hashes [][]byte) { |
|
|
|
it := txi.store.IteratorPrefix(startKey) |
|
|
|
defer it.Release() |
|
|
|
LOOP: |
|
|
|
for it.Next() { |
|
|
|
if !isTagKey(it.Key()) { |
|
|
|
continue |
|
|
|
} |
|
|
|
// no other way to stop iterator other than checking for upperBound
|
|
|
|
switch (r.upperBound).(type) { |
|
|
|
case int64: |
|
|
|
v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64) |
|
|
|
if err == nil && v == r.upperBound { |
|
|
|
if r.includeUpperBound { |
|
|
|
hashes = append(hashes, it.Value()) |
|
|
|
} |
|
|
|
break LOOP |
|
|
|
} |
|
|
|
// XXX: passing time in a ABCI Tags is not yet implemented
|
|
|
|
// case time.Time:
|
|
|
|
// v := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
|
|
|
|
// if v == r.upperBound {
|
|
|
|
// break
|
|
|
|
// }
|
|
|
|
} |
|
|
|
hashes = append(hashes, it.Value()) |
|
|
|
} |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
///////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Keys
|
|
|
|
|
|
|
|
func startKey(c query.Condition, height uint64) []byte { |
|
|
|
var key string |
|
|
|
if heightSpecified { |
|
|
|
if height > 0 { |
|
|
|
key = fmt.Sprintf("%s/%v/%d", c.Tag, c.Operand, height) |
|
|
|
} else { |
|
|
|
key = fmt.Sprintf("%s/%v", c.Tag, c.Operand) |
|
|
@ -284,7 +297,7 @@ func startKey(c query.Condition, height uint64, heightSpecified bool) []byte { |
|
|
|
return []byte(key) |
|
|
|
} |
|
|
|
|
|
|
|
func startKeyForRange(r queryRange, height uint64, heightSpecified bool) []byte { |
|
|
|
func startKeyForRange(r queryRange, height uint64) []byte { |
|
|
|
var lowerBound interface{} |
|
|
|
if r.includeLowerBound { |
|
|
|
lowerBound = r.lowerBound |
|
|
@ -299,7 +312,7 @@ func startKeyForRange(r queryRange, height uint64, heightSpecified bool) []byte |
|
|
|
} |
|
|
|
} |
|
|
|
var key string |
|
|
|
if heightSpecified { |
|
|
|
if height > 0 { |
|
|
|
key = fmt.Sprintf("%s/%v/%d", r.key, lowerBound, height) |
|
|
|
} else { |
|
|
|
key = fmt.Sprintf("%s/%v", r.key, lowerBound) |
|
|
@ -307,35 +320,12 @@ func startKeyForRange(r queryRange, height uint64, heightSpecified bool) []byte |
|
|
|
return []byte(key) |
|
|
|
} |
|
|
|
|
|
|
|
func (txi *TxIndex) matchRange(r queryRange, startKey []byte) (hashes [][]byte) { |
|
|
|
it := txi.store.IteratorPrefix(startKey) |
|
|
|
defer it.Release() |
|
|
|
for it.Next() { |
|
|
|
// no other way to stop iterator other than checking for upperBound
|
|
|
|
switch (r.upperBound).(type) { |
|
|
|
case int64: |
|
|
|
v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64) |
|
|
|
if err == nil && v == r.upperBound { |
|
|
|
if r.includeUpperBound { |
|
|
|
hashes = append(hashes, it.Value()) |
|
|
|
} |
|
|
|
break |
|
|
|
} |
|
|
|
// XXX: passing time in a ABCI Tags is not yet implemented
|
|
|
|
// case time.Time:
|
|
|
|
// v := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
|
|
|
|
// if v == r.upperBound {
|
|
|
|
// break
|
|
|
|
// }
|
|
|
|
} |
|
|
|
hashes = append(hashes, it.Value()) |
|
|
|
} |
|
|
|
return |
|
|
|
func isTagKey(key []byte) bool { |
|
|
|
return strings.Count(string(key), tagKeySeparator) == 3 |
|
|
|
} |
|
|
|
|
|
|
|
func extractValueFromKey(key []byte) string { |
|
|
|
s := string(key) |
|
|
|
parts := strings.SplitN(s, "/", 3) |
|
|
|
parts := strings.SplitN(string(key), tagKeySeparator, 3) |
|
|
|
return parts[1] |
|
|
|
} |
|
|
|
|
|
|
@ -356,6 +346,9 @@ func hashKey(hash []byte) string { |
|
|
|
return fmt.Sprintf("%X", hash) |
|
|
|
} |
|
|
|
|
|
|
|
///////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Utils
|
|
|
|
|
|
|
|
func stringInSlice(a string, list []string) bool { |
|
|
|
for _, b := range list { |
|
|
|
if b == a { |
|
|
@ -373,3 +366,15 @@ func intInSlice(a int, list []int) bool { |
|
|
|
} |
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
func intersect(as, bs [][]byte) [][]byte { |
|
|
|
i := make([][]byte, 0, cmn.MinInt(len(as), len(bs))) |
|
|
|
for _, a := range as { |
|
|
|
for _, b := range bs { |
|
|
|
if bytes.Equal(a, b) { |
|
|
|
i = append(i, a) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
return i |
|
|
|
} |