From 14fa8007738c51fe0951544a63eeea759679d495 Mon Sep 17 00:00:00 2001 From: Alexander Bezobchuk Date: Wed, 31 Jul 2019 08:01:55 -0700 Subject: [PATCH] txindexer: Refactor Tx Search Aggregation (#3851) - Replace the previous intersect call, which was called at each query condition, with a map intersection. - Replace fmt.Sprintf with string() closes: #3076 Benchmarks ``` Old goos: darwin goarch: amd64 pkg: github.com/tendermint/tendermint/state/txindex/kv BenchmarkTxSearch-4 200 103641206 ns/op 7998416 B/op 71171 allocs/op PASS ok github.com/tendermint/tendermint/state/txindex/kv 26.019s New goos: darwin goarch: amd64 pkg: github.com/tendermint/tendermint/state/txindex/kv BenchmarkTxSearch-4 1000 38615024 ns/op 13515226 B/op 166460 allocs/op PASS ok github.com/tendermint/tendermint/state/txindex/kv 53.618s ``` ~62% performance improvement Commits: * Refactor tx search * Add pending changelog entry * Add tx search benchmarking * remove intermediate hashes list also reset timer in BenchmarkTxSearch and fix other benchmark * fix import * Add test cases * Fix searching * Replace fmt.Sprintf with string * Update state/txindex/kv/kv.go Co-Authored-By: Anton Kaliaev * Rename params * Cleanup * Check error in benchmarks --- CHANGELOG_PENDING.md | 1 + state/txindex/kv/kv.go | 143 +++++++++++++++++++++--------- state/txindex/kv/kv_bench_test.go | 72 +++++++++++++++ state/txindex/kv/kv_test.go | 12 ++- 4 files changed, 185 insertions(+), 43 deletions(-) create mode 100644 state/txindex/kv/kv_bench_test.go diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 8d72dedb9..6a57dd9d9 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -32,6 +32,7 @@ program](https://hackerone.com/tendermint). - [p2p] \#3664 p2p/conn: reuse buffer when write/read from secret connection - [mempool] \#3826 Make `max_msg_bytes` configurable - [blockchain] \#3561 Add early version of the new blockchain reactor, which is supposed to be more modular and testable compared to the old version. To try it, you'll have to change `version` in the config file, [here](https://github.com/tendermint/tendermint/blob/master/config/toml.go#L303) NOTE: It's not ready for a production yet. For further information, see [ADR-40](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-040-blockchain-reactor-refactor.md) & [ADR-43](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-043-blockchain-riri-org.md) +- [rpc] \#3076 Improve transaction search performance ### BUG FIXES: diff --git a/state/txindex/kv/kv.go b/state/txindex/kv/kv.go index 4551aa9c9..89d8868a0 100644 --- a/state/txindex/kv/kv.go +++ b/state/txindex/kv/kv.go @@ -11,11 +11,12 @@ import ( "github.com/pkg/errors" + dbm "github.com/tendermint/tm-db" + cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" - dbm "github.com/tendermint/tm-db" ) const ( @@ -163,8 +164,8 @@ func (txi *TxIndex) indexEvents(result *types.TxResult, hash []byte, store dbm.S // both lower and upper bounds, so we are not performing a full scan. Results // from querying indexes are then intersected and returned to the caller. func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { - var hashes [][]byte var hashesInitialized bool + filteredHashes := make(map[string][]byte) // get a list of conditions (like "tx.height > 5") conditions := q.Conditions() @@ -193,10 +194,16 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { for _, r := range ranges { if !hashesInitialized { - hashes = txi.matchRange(r, startKey(r.key)) + filteredHashes = txi.matchRange(r, startKey(r.key), filteredHashes, true) hashesInitialized = true + + // Ignore any remaining conditions if the first condition resulted + // in no matches (assuming implicit AND operand). + if len(filteredHashes) == 0 { + break + } } else { - hashes = intersect(hashes, txi.matchRange(r, startKey(r.key))) + filteredHashes = txi.matchRange(r, startKey(r.key), filteredHashes, false) } } } @@ -211,21 +218,26 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { } if !hashesInitialized { - hashes = txi.match(c, startKeyForCondition(c, height)) + filteredHashes = txi.match(c, startKeyForCondition(c, height), filteredHashes, true) hashesInitialized = true + + // Ignore any remaining conditions if the first condition resulted + // in no matches (assuming implicit AND operand). + if len(filteredHashes) == 0 { + break + } } else { - hashes = intersect(hashes, txi.match(c, startKeyForCondition(c, height))) + filteredHashes = txi.match(c, startKeyForCondition(c, height), filteredHashes, false) } } - results := make([]*types.TxResult, len(hashes)) - i := 0 - for _, h := range hashes { - results[i], err = txi.Get(h) + results := make([]*types.TxResult, 0, len(filteredHashes)) + for _, h := range filteredHashes { + res, err := txi.Get(h) if err != nil { return nil, errors.Wrapf(err, "failed to get Tx{%X}", h) } - i++ + results = append(results, res) } // sort by height & index by default @@ -353,63 +365,115 @@ func isRangeOperation(op query.Operator) bool { } } -func (txi *TxIndex) match(c query.Condition, startKeyBz []byte) (hashes [][]byte) { +// match returns all matching txs by hash that meet a given condition and start +// key. An already filtered result (filteredHashes) is provided such that any +// non-intersecting matches are removed. +// +// NOTE: filteredHashes may be empty if no previous condition has matched. +func (txi *TxIndex) match(c query.Condition, startKeyBz []byte, filteredHashes map[string][]byte, firstRun bool) map[string][]byte { + // A previous match was attempted but resulted in no matches, so we return + // no matches (assuming AND operand). + if !firstRun && len(filteredHashes) == 0 { + return filteredHashes + } + + tmpHashes := make(map[string][]byte) + if c.Op == query.OpEqual { it := dbm.IteratePrefix(txi.store, startKeyBz) defer it.Close() + for ; it.Valid(); it.Next() { - hashes = append(hashes, it.Value()) + tmpHashes[string(it.Value())] = it.Value() } + } else if c.Op == query.OpContains { // XXX: startKey does not apply here. // For example, if startKey = "account.owner/an/" and search query = "account.owner CONTAINS an" // we can't iterate with prefix "account.owner/an/" because we might miss keys like "account.owner/Ulan/" it := dbm.IteratePrefix(txi.store, startKey(c.Tag)) defer it.Close() + for ; it.Valid(); it.Next() { if !isTagKey(it.Key()) { continue } + if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) { - hashes = append(hashes, it.Value()) + tmpHashes[string(it.Value())] = it.Value() } } } else { panic("other operators should be handled already") } - return + + if len(tmpHashes) == 0 || firstRun { + // Either: + // + // 1. Regardless if a previous match was attempted, which may have had + // results, but no match was found for the current condition, then we + // return no matches (assuming AND operand). + // + // 2. A previous match was not attempted, so we return all results. + return tmpHashes + } + + // Remove/reduce matches in filteredHashes that were not found in this + // match (tmpHashes). + for k := range filteredHashes { + if tmpHashes[k] == nil { + delete(filteredHashes, k) + } + } + + return filteredHashes } -func (txi *TxIndex) matchRange(r queryRange, startKey []byte) (hashes [][]byte) { - // create a map to prevent duplicates - hashesMap := make(map[string][]byte) +// matchRange returns all matching txs by hash that meet a given queryRange and +// start key. An already filtered result (filteredHashes) is provided such that +// any non-intersecting matches are removed. +// +// NOTE: filteredHashes may be empty if no previous condition has matched. +func (txi *TxIndex) matchRange(r queryRange, startKey []byte, filteredHashes map[string][]byte, firstRun bool) map[string][]byte { + // A previous match was attempted but resulted in no matches, so we return + // no matches (assuming AND operand). + if !firstRun && len(filteredHashes) == 0 { + return filteredHashes + } + tmpHashes := make(map[string][]byte) lowerBound := r.lowerBoundValue() upperBound := r.upperBoundValue() it := dbm.IteratePrefix(txi.store, startKey) defer it.Close() + LOOP: for ; it.Valid(); it.Next() { if !isTagKey(it.Key()) { continue } + switch r.AnyBound().(type) { case int64: v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64) if err != nil { continue LOOP } + include := true if lowerBound != nil && v < lowerBound.(int64) { include = false } + if upperBound != nil && v > upperBound.(int64) { include = false } + if include { - hashesMap[fmt.Sprintf("%X", it.Value())] = it.Value() + tmpHashes[string(it.Value())] = it.Value() } + // XXX: passing time in a ABCI Tags is not yet implemented // case time.Time: // v := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64) @@ -418,13 +482,27 @@ LOOP: // } } } - hashes = make([][]byte, len(hashesMap)) - i := 0 - for _, h := range hashesMap { - hashes[i] = h - i++ + + if len(tmpHashes) == 0 || firstRun { + // Either: + // + // 1. Regardless if a previous match was attempted, which may have had + // results, but no match was found for the current condition, then we + // return no matches (assuming AND operand). + // + // 2. A previous match was not attempted, so we return all results. + return tmpHashes } - return + + // Remove/reduce matches in filteredHashes that were not found in this + // match (tmpHashes). + for k := range filteredHashes { + if tmpHashes[k] == nil { + delete(filteredHashes, k) + } + } + + return filteredHashes } /////////////////////////////////////////////////////////////////////////////// @@ -471,18 +549,3 @@ func startKey(fields ...interface{}) []byte { } return b.Bytes() } - -/////////////////////////////////////////////////////////////////////////////// -// Utils - -func intersect(as, bs [][]byte) [][]byte { - i := make([][]byte, 0, cmn.MinInt(len(as), len(bs))) - for _, a := range as { - for _, b := range bs { - if bytes.Equal(a, b) { - i = append(i, a) - } - } - } - return i -} diff --git a/state/txindex/kv/kv_bench_test.go b/state/txindex/kv/kv_bench_test.go new file mode 100644 index 000000000..9c3442a01 --- /dev/null +++ b/state/txindex/kv/kv_bench_test.go @@ -0,0 +1,72 @@ +package kv + +import ( + "crypto/rand" + "fmt" + "io/ioutil" + "testing" + + abci "github.com/tendermint/tendermint/abci/types" + cmn "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/libs/pubsub/query" + "github.com/tendermint/tendermint/types" + dbm "github.com/tendermint/tm-db" +) + +func BenchmarkTxSearch(b *testing.B) { + dbDir, err := ioutil.TempDir("", "benchmark_tx_search_test") + if err != nil { + b.Errorf("failed to create temporary directory: %s", err) + } + + db, err := dbm.NewGoLevelDB("benchmark_tx_search_test", dbDir) + if err != nil { + b.Errorf("failed to create database: %s", err) + } + + allowedTags := []string{"transfer.address", "transfer.amount"} + indexer := NewTxIndex(db, IndexTags(allowedTags)) + + for i := 0; i < 35000; i++ { + events := []abci.Event{ + { + Type: "transfer", + Attributes: []cmn.KVPair{ + {Key: []byte("address"), Value: []byte(fmt.Sprintf("address_%d", i%100))}, + {Key: []byte("amount"), Value: []byte("50")}, + }, + }, + } + + txBz := make([]byte, 8) + if _, err := rand.Read(txBz); err != nil { + b.Errorf("failed produce random bytes: %s", err) + } + + txResult := &types.TxResult{ + Height: int64(i), + Index: 0, + Tx: types.Tx(string(txBz)), + Result: abci.ResponseDeliverTx{ + Data: []byte{0}, + Code: abci.CodeTypeOK, + Log: "", + Events: events, + }, + } + + if err := indexer.Index(txResult); err != nil { + b.Errorf("failed to index tx: %s", err) + } + } + + txQuery := query.MustParse("transfer.address = 'address_43' AND transfer.amount = 50") + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + if _, err := indexer.Search(txQuery); err != nil { + b.Errorf("failed to query for txs: %s", err) + } + } +} diff --git a/state/txindex/kv/kv_test.go b/state/txindex/kv/kv_test.go index 1175ae2f0..a0c833e49 100644 --- a/state/txindex/kv/kv_test.go +++ b/state/txindex/kv/kv_test.go @@ -8,10 +8,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - abci "github.com/tendermint/tendermint/abci/types" - cmn "github.com/tendermint/tendermint/libs/common" + db "github.com/tendermint/tm-db" + abci "github.com/tendermint/tendermint/abci/types" + cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" @@ -89,6 +90,11 @@ func TestTxSearch(t *testing.T) { {"account.number = 1 AND account.owner = 'Ivan'", 1}, // search by exact match (two tags) {"account.number = 1 AND account.owner = 'Vlad'", 0}, + {"account.owner = 'Vlad' AND account.number = 1", 0}, + {"account.number >= 1 AND account.owner = 'Vlad'", 0}, + {"account.owner = 'Vlad' AND account.number >= 1", 0}, + {"account.number <= 0", 0}, + {"account.number <= 0 AND account.owner = 'Ivan'", 0}, // search using a prefix of the stored value {"account.owner = 'Iv'", 0}, // search by range @@ -310,7 +316,7 @@ func benchmarkTxIndex(txsCount int64, b *testing.B) { } defer os.RemoveAll(dir) // nolint: errcheck - store := db.NewDB("tx_index", "leveldb", dir) + store := db.NewDB("tx_index", "goleveldb", dir) indexer := NewTxIndex(store) batch := txindex.NewBatch(txsCount)