Browse Source

fix tx_indexer's matchRange

before we're using IteratePrefix, which is wrong because we want full
range, not just "account.number=1".
pull/1374/head
Anton Kaliaev 6 years ago
parent
commit
2b63f57b4c
2 changed files with 116 additions and 53 deletions
  1. +83
    -49
      state/txindex/kv/kv.go
  2. +33
    -4
      state/txindex/kv/kv_test.go

+ 83
- 49
state/txindex/kv/kv.go View File

@ -171,10 +171,10 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
for _, r := range ranges {
if !hashesInitialized {
hashes = txi.matchRange(r, startKeyForRange(r, height))
hashes = txi.matchRange(r, []byte(r.key))
hashesInitialized = true
} else {
hashes = intersect(hashes, txi.matchRange(r, startKeyForRange(r, height)))
hashes = intersect(hashes, txi.matchRange(r, []byte(r.key)))
}
}
}
@ -242,6 +242,52 @@ type queryRange struct {
includeUpperBound bool
}
func (r queryRange) lowerBoundValue() interface{} {
if r.lowerBound == nil {
return nil
}
if r.includeLowerBound {
return r.lowerBound
} else {
switch t := r.lowerBound.(type) {
case int64:
return t + 1
case time.Time:
return t.Unix() + 1
default:
panic("not implemented")
}
}
}
func (r queryRange) AnyBound() interface{} {
if r.lowerBound != nil {
return r.lowerBound
} else {
return r.upperBound
}
}
func (r queryRange) upperBoundValue() interface{} {
if r.upperBound == nil {
return nil
}
if r.includeUpperBound {
return r.upperBound
} else {
switch t := r.upperBound.(type) {
case int64:
return t - 1
case time.Time:
return t.Unix() - 1
default:
panic("not implemented")
}
}
}
func lookForRanges(conditions []query.Condition) (ranges queryRanges, indexes []int) {
ranges = make(queryRanges)
for i, c := range conditions {
@ -305,34 +351,49 @@ func (txi *TxIndex) match(c query.Condition, startKey []byte) (hashes [][]byte)
return
}
func (txi *TxIndex) matchRange(r queryRange, startKey []byte) (hashes [][]byte) {
it := dbm.IteratePrefix(txi.store, startKey)
func (txi *TxIndex) matchRange(r queryRange, prefix []byte) (hashes [][]byte) {
// create a map to prevent duplicates
hashesMap := make(map[string][]byte)
lowerBound := r.lowerBoundValue()
upperBound := r.upperBoundValue()
it := dbm.IteratePrefix(txi.store, prefix)
defer it.Close()
LOOP:
for ; it.Valid(); it.Next() {
if !isTagKey(it.Key()) {
continue
}
if r.upperBound != nil {
// no other way to stop iterator other than checking for upperBound
switch (r.upperBound).(type) {
case int64:
v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
if err == nil && v == r.upperBound {
if r.includeUpperBound {
hashes = append(hashes, it.Value())
}
break LOOP
}
// XXX: passing time in a ABCI Tags is not yet implemented
// case time.Time:
// v := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
// if v == r.upperBound {
// break
// }
switch r.AnyBound().(type) {
case int64:
v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
if err != nil {
continue LOOP
}
include := true
if lowerBound != nil && v < lowerBound.(int64) {
include = false
}
if upperBound != nil && v > upperBound.(int64) {
include = false
}
if include {
hashesMap[fmt.Sprintf("%X", it.Value())] = it.Value()
}
// XXX: passing time in a ABCI Tags is not yet implemented
// case time.Time:
// v := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
// if v == r.upperBound {
// break
// }
}
hashes = append(hashes, it.Value())
}
hashes = make([][]byte, len(hashesMap))
i := 0
for _, h := range hashesMap {
hashes[i] = h
i++
}
return
}
@ -350,33 +411,6 @@ func startKey(c query.Condition, height int64) []byte {
return []byte(key)
}
func startKeyForRange(r queryRange, height int64) []byte {
if r.lowerBound == nil {
return []byte(r.key)
}
var lowerBound interface{}
if r.includeLowerBound {
lowerBound = r.lowerBound
} else {
switch t := r.lowerBound.(type) {
case int64:
lowerBound = t + 1
case time.Time:
lowerBound = t.Unix() + 1
default:
panic("not implemented")
}
}
var key string
if height > 0 {
key = fmt.Sprintf("%s/%v/%d", r.key, lowerBound, height)
} else {
key = fmt.Sprintf("%s/%v", r.key, lowerBound)
}
return []byte(key)
}
func isTagKey(key []byte) bool {
return strings.Count(string(key), tagKeySeparator) == 3
}


+ 33
- 4
state/txindex/kv/kv_test.go View File

@ -20,7 +20,7 @@ func TestTxIndex(t *testing.T) {
indexer := NewTxIndex(db.NewMemDB())
tx := types.Tx("HELLO WORLD")
txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeTypeOK, Log: "", Tags: []cmn.KVPair{}, Fee: cmn.KI64Pair{Key: []uint8{}, Value: 0}}}
txResult := &types.TxResult{Height: 1, Index: 0, Tx: tx, Result: abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeTypeOK, Log: "", Tags: []cmn.KVPair{}, Fee: cmn.KI64Pair{Key: []uint8{}, Value: 0}}}
hash := tx.Hash()
batch := txindex.NewBatch(1)
@ -35,7 +35,7 @@ func TestTxIndex(t *testing.T) {
assert.Equal(t, txResult, loadedTxResult)
tx2 := types.Tx("BYE BYE WORLD")
txResult2 := &types.TxResult{1, 0, tx2, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeTypeOK, Log: "", Tags: []cmn.KVPair{}, Fee: cmn.KI64Pair{Key: []uint8{}, Value: 0}}}
txResult2 := &types.TxResult{Height: 1, Index: 0, Tx: tx2, Result: abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeTypeOK, Log: "", Tags: []cmn.KVPair{}, Fee: cmn.KI64Pair{Key: []uint8{}, Value: 0}}}
hash2 := tx2.Hash()
err = indexer.Index(txResult2)
@ -122,6 +122,35 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) {
assert.Equal(t, []*types.TxResult{txResult}, results)
}
func TestTxSearchMultipleTxs(t *testing.T) {
allowedTags := []string{"account.number"}
indexer := NewTxIndex(db.NewMemDB(), IndexTags(allowedTags))
// indexed first, but bigger height (to test the order of transactions)
txResult := txResultWithTags([]cmn.KVPair{
{Key: []byte("account.number"), Value: []byte("1")},
})
txResult.Tx = types.Tx("Bob's account")
txResult.Height = 2
err := indexer.Index(txResult)
require.NoError(t, err)
// indexed second, but smaller height (to test the order of transactions)
txResult2 := txResultWithTags([]cmn.KVPair{
{Key: []byte("account.number"), Value: []byte("2")},
})
txResult2.Tx = types.Tx("Alice's account")
txResult2.Height = 1
err = indexer.Index(txResult2)
require.NoError(t, err)
results, err := indexer.Search(query.MustParse("account.number >= 1"))
assert.NoError(t, err)
require.Len(t, results, 2)
assert.Equal(t, []*types.TxResult{txResult2, txResult}, results)
}
func TestIndexAllTags(t *testing.T) {
indexer := NewTxIndex(db.NewMemDB(), IndexAllTags())
@ -146,12 +175,12 @@ func TestIndexAllTags(t *testing.T) {
func txResultWithTags(tags []cmn.KVPair) *types.TxResult {
tx := types.Tx("HELLO WORLD")
return &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeTypeOK, Log: "", Tags: tags, Fee: cmn.KI64Pair{Key: []uint8{}, Value: 0}}}
return &types.TxResult{Height: 1, Index: 0, Tx: tx, Result: abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeTypeOK, Log: "", Tags: tags, Fee: cmn.KI64Pair{Key: []uint8{}, Value: 0}}}
}
func benchmarkTxIndex(txsCount int, b *testing.B) {
tx := types.Tx("HELLO WORLD")
txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeTypeOK, Log: "", Tags: []cmn.KVPair{}, Fee: cmn.KI64Pair{Key: []uint8{}, Value: 0}}}
txResult := &types.TxResult{Height: 1, Index: 0, Tx: tx, Result: abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeTypeOK, Log: "", Tags: []cmn.KVPair{}, Fee: cmn.KI64Pair{Key: []uint8{}, Value: 0}}}
dir, err := ioutil.TempDir("", "tx_index_db")
if err != nil {


Loading…
Cancel
Save