Browse Source

txindexer: Refactor Tx Search Aggregation (#3851)

- Replace the previous intersect call, which was called at each query condition, with a map intersection.
- Replace fmt.Sprintf with string()

closes: #3076

Benchmarks

```
Old
goos: darwin
goarch: amd64
pkg: github.com/tendermint/tendermint/state/txindex/kv
BenchmarkTxSearch-4   	     200	 103641206 ns/op	 7998416 B/op	   71171 allocs/op
PASS
ok  	github.com/tendermint/tendermint/state/txindex/kv	26.019s

New
goos: darwin
goarch: amd64
pkg: github.com/tendermint/tendermint/state/txindex/kv
BenchmarkTxSearch-4   	    1000	  38615024 ns/op	13515226 B/op	  166460 allocs/op
PASS
ok  	github.com/tendermint/tendermint/state/txindex/kv	53.618s
```

~62% performance improvement

Commits:

* Refactor tx search

* Add pending changelog entry

* Add tx search benchmarking

* remove intermediate hashes list

also reset timer in BenchmarkTxSearch
and fix other benchmark

* fix import

* Add test cases

* Fix searching

* Replace fmt.Sprintf with string

* Update state/txindex/kv/kv.go

Co-Authored-By: Anton Kaliaev <anton.kalyaev@gmail.com>

* Rename params

* Cleanup

* Check error in benchmarks
pull/3926/head
Alexander Bezobchuk 5 years ago
committed by Jack Zampolin
parent
commit
aacc71dc29
4 changed files with 185 additions and 43 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +103
    -40
      state/txindex/kv/kv.go
  3. +72
    -0
      state/txindex/kv/kv_bench_test.go
  4. +9
    -3
      state/txindex/kv/kv_test.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -32,6 +32,7 @@ program](https://hackerone.com/tendermint).
- [p2p] \#3664 p2p/conn: reuse buffer when write/read from secret connection
- [mempool] \#3826 Make `max_msg_bytes` configurable
- [blockchain] \#3561 Add early version of the new blockchain reactor, which is supposed to be more modular and testable compared to the old version. To try it, you'll have to change `version` in the config file, [here](https://github.com/tendermint/tendermint/blob/master/config/toml.go#L303) NOTE: It's not ready for a production yet. For further information, see [ADR-40](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-040-blockchain-reactor-refactor.md) & [ADR-43](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-043-blockchain-riri-org.md)
- [rpc] \#3076 Improve transaction search performance
### BUG FIXES:


+ 103
- 40
state/txindex/kv/kv.go View File

@ -11,11 +11,12 @@ import (
"github.com/pkg/errors"
dbm "github.com/tendermint/tm-db"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
dbm "github.com/tendermint/tm-db"
)
const (
@ -163,8 +164,8 @@ func (txi *TxIndex) indexEvents(result *types.TxResult, hash []byte, store dbm.S
// both lower and upper bounds, so we are not performing a full scan. Results
// from querying indexes are then intersected and returned to the caller.
func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
var hashes [][]byte
var hashesInitialized bool
filteredHashes := make(map[string][]byte)
// get a list of conditions (like "tx.height > 5")
conditions := q.Conditions()
@ -193,10 +194,16 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
for _, r := range ranges {
if !hashesInitialized {
hashes = txi.matchRange(r, startKey(r.key))
filteredHashes = txi.matchRange(r, startKey(r.key), filteredHashes, true)
hashesInitialized = true
// Ignore any remaining conditions if the first condition resulted
// in no matches (assuming implicit AND operand).
if len(filteredHashes) == 0 {
break
}
} else {
hashes = intersect(hashes, txi.matchRange(r, startKey(r.key)))
filteredHashes = txi.matchRange(r, startKey(r.key), filteredHashes, false)
}
}
}
@ -211,21 +218,26 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
}
if !hashesInitialized {
hashes = txi.match(c, startKeyForCondition(c, height))
filteredHashes = txi.match(c, startKeyForCondition(c, height), filteredHashes, true)
hashesInitialized = true
// Ignore any remaining conditions if the first condition resulted
// in no matches (assuming implicit AND operand).
if len(filteredHashes) == 0 {
break
}
} else {
hashes = intersect(hashes, txi.match(c, startKeyForCondition(c, height)))
filteredHashes = txi.match(c, startKeyForCondition(c, height), filteredHashes, false)
}
}
results := make([]*types.TxResult, len(hashes))
i := 0
for _, h := range hashes {
results[i], err = txi.Get(h)
results := make([]*types.TxResult, 0, len(filteredHashes))
for _, h := range filteredHashes {
res, err := txi.Get(h)
if err != nil {
return nil, errors.Wrapf(err, "failed to get Tx{%X}", h)
}
i++
results = append(results, res)
}
// sort by height & index by default
@ -353,63 +365,115 @@ func isRangeOperation(op query.Operator) bool {
}
}
func (txi *TxIndex) match(c query.Condition, startKeyBz []byte) (hashes [][]byte) {
// match returns all matching txs by hash that meet a given condition and start
// key. An already filtered result (filteredHashes) is provided such that any
// non-intersecting matches are removed.
//
// NOTE: filteredHashes may be empty if no previous condition has matched.
func (txi *TxIndex) match(c query.Condition, startKeyBz []byte, filteredHashes map[string][]byte, firstRun bool) map[string][]byte {
// A previous match was attempted but resulted in no matches, so we return
// no matches (assuming AND operand).
if !firstRun && len(filteredHashes) == 0 {
return filteredHashes
}
tmpHashes := make(map[string][]byte)
if c.Op == query.OpEqual {
it := dbm.IteratePrefix(txi.store, startKeyBz)
defer it.Close()
for ; it.Valid(); it.Next() {
hashes = append(hashes, it.Value())
tmpHashes[string(it.Value())] = it.Value()
}
} else if c.Op == query.OpContains {
// XXX: startKey does not apply here.
// For example, if startKey = "account.owner/an/" and search query = "account.owner CONTAINS an"
// we can't iterate with prefix "account.owner/an/" because we might miss keys like "account.owner/Ulan/"
it := dbm.IteratePrefix(txi.store, startKey(c.Tag))
defer it.Close()
for ; it.Valid(); it.Next() {
if !isTagKey(it.Key()) {
continue
}
if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) {
hashes = append(hashes, it.Value())
tmpHashes[string(it.Value())] = it.Value()
}
}
} else {
panic("other operators should be handled already")
}
return
if len(tmpHashes) == 0 || firstRun {
// Either:
//
// 1. Regardless if a previous match was attempted, which may have had
// results, but no match was found for the current condition, then we
// return no matches (assuming AND operand).
//
// 2. A previous match was not attempted, so we return all results.
return tmpHashes
}
// Remove/reduce matches in filteredHashes that were not found in this
// match (tmpHashes).
for k := range filteredHashes {
if tmpHashes[k] == nil {
delete(filteredHashes, k)
}
}
return filteredHashes
}
func (txi *TxIndex) matchRange(r queryRange, startKey []byte) (hashes [][]byte) {
// create a map to prevent duplicates
hashesMap := make(map[string][]byte)
// matchRange returns all matching txs by hash that meet a given queryRange and
// start key. An already filtered result (filteredHashes) is provided such that
// any non-intersecting matches are removed.
//
// NOTE: filteredHashes may be empty if no previous condition has matched.
func (txi *TxIndex) matchRange(r queryRange, startKey []byte, filteredHashes map[string][]byte, firstRun bool) map[string][]byte {
// A previous match was attempted but resulted in no matches, so we return
// no matches (assuming AND operand).
if !firstRun && len(filteredHashes) == 0 {
return filteredHashes
}
tmpHashes := make(map[string][]byte)
lowerBound := r.lowerBoundValue()
upperBound := r.upperBoundValue()
it := dbm.IteratePrefix(txi.store, startKey)
defer it.Close()
LOOP:
for ; it.Valid(); it.Next() {
if !isTagKey(it.Key()) {
continue
}
switch r.AnyBound().(type) {
case int64:
v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
if err != nil {
continue LOOP
}
include := true
if lowerBound != nil && v < lowerBound.(int64) {
include = false
}
if upperBound != nil && v > upperBound.(int64) {
include = false
}
if include {
hashesMap[fmt.Sprintf("%X", it.Value())] = it.Value()
tmpHashes[string(it.Value())] = it.Value()
}
// XXX: passing time in a ABCI Tags is not yet implemented
// case time.Time:
// v := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
@ -418,13 +482,27 @@ LOOP:
// }
}
}
hashes = make([][]byte, len(hashesMap))
i := 0
for _, h := range hashesMap {
hashes[i] = h
i++
if len(tmpHashes) == 0 || firstRun {
// Either:
//
// 1. Regardless if a previous match was attempted, which may have had
// results, but no match was found for the current condition, then we
// return no matches (assuming AND operand).
//
// 2. A previous match was not attempted, so we return all results.
return tmpHashes
}
return
// Remove/reduce matches in filteredHashes that were not found in this
// match (tmpHashes).
for k := range filteredHashes {
if tmpHashes[k] == nil {
delete(filteredHashes, k)
}
}
return filteredHashes
}
///////////////////////////////////////////////////////////////////////////////
@ -471,18 +549,3 @@ func startKey(fields ...interface{}) []byte {
}
return b.Bytes()
}
///////////////////////////////////////////////////////////////////////////////
// Utils
func intersect(as, bs [][]byte) [][]byte {
i := make([][]byte, 0, cmn.MinInt(len(as), len(bs)))
for _, a := range as {
for _, b := range bs {
if bytes.Equal(a, b) {
i = append(i, a)
}
}
}
return i
}

+ 72
- 0
state/txindex/kv/kv_bench_test.go View File

@ -0,0 +1,72 @@
package kv
import (
"crypto/rand"
"fmt"
"io/ioutil"
"testing"
abci "github.com/tendermint/tendermint/abci/types"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/types"
dbm "github.com/tendermint/tm-db"
)
func BenchmarkTxSearch(b *testing.B) {
dbDir, err := ioutil.TempDir("", "benchmark_tx_search_test")
if err != nil {
b.Errorf("failed to create temporary directory: %s", err)
}
db, err := dbm.NewGoLevelDB("benchmark_tx_search_test", dbDir)
if err != nil {
b.Errorf("failed to create database: %s", err)
}
allowedTags := []string{"transfer.address", "transfer.amount"}
indexer := NewTxIndex(db, IndexTags(allowedTags))
for i := 0; i < 35000; i++ {
events := []abci.Event{
{
Type: "transfer",
Attributes: []cmn.KVPair{
{Key: []byte("address"), Value: []byte(fmt.Sprintf("address_%d", i%100))},
{Key: []byte("amount"), Value: []byte("50")},
},
},
}
txBz := make([]byte, 8)
if _, err := rand.Read(txBz); err != nil {
b.Errorf("failed produce random bytes: %s", err)
}
txResult := &types.TxResult{
Height: int64(i),
Index: 0,
Tx: types.Tx(string(txBz)),
Result: abci.ResponseDeliverTx{
Data: []byte{0},
Code: abci.CodeTypeOK,
Log: "",
Events: events,
},
}
if err := indexer.Index(txResult); err != nil {
b.Errorf("failed to index tx: %s", err)
}
}
txQuery := query.MustParse("transfer.address = 'address_43' AND transfer.amount = 50")
b.ResetTimer()
for i := 0; i < b.N; i++ {
if _, err := indexer.Search(txQuery); err != nil {
b.Errorf("failed to query for txs: %s", err)
}
}
}

+ 9
- 3
state/txindex/kv/kv_test.go View File

@ -8,10 +8,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
cmn "github.com/tendermint/tendermint/libs/common"
db "github.com/tendermint/tm-db"
abci "github.com/tendermint/tendermint/abci/types"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
@ -89,6 +90,11 @@ func TestTxSearch(t *testing.T) {
{"account.number = 1 AND account.owner = 'Ivan'", 1},
// search by exact match (two tags)
{"account.number = 1 AND account.owner = 'Vlad'", 0},
{"account.owner = 'Vlad' AND account.number = 1", 0},
{"account.number >= 1 AND account.owner = 'Vlad'", 0},
{"account.owner = 'Vlad' AND account.number >= 1", 0},
{"account.number <= 0", 0},
{"account.number <= 0 AND account.owner = 'Ivan'", 0},
// search using a prefix of the stored value
{"account.owner = 'Iv'", 0},
// search by range
@ -310,7 +316,7 @@ func benchmarkTxIndex(txsCount int64, b *testing.B) {
}
defer os.RemoveAll(dir) // nolint: errcheck
store := db.NewDB("tx_index", "leveldb", dir)
store := db.NewDB("tx_index", "goleveldb", dir)
indexer := NewTxIndex(store)
batch := txindex.NewBatch(txsCount)


Loading…
Cancel
Save