diff --git a/config/config.go b/config/config.go index 8b1c0e289..fc3671d8e 100644 --- a/config/config.go +++ b/config/config.go @@ -418,9 +418,17 @@ func (c *ConsensusConfig) SetWalFile(walFile string) { // indexer, including tags to index. type TxIndexConfig struct { // What indexer to use for transactions + // + // Options: + // 1) "null" (default) + // 2) "kv" - the simplest possible indexer, backed by key-value storage (defaults to levelDB; see DBBackend). Indexer string `mapstructure:"indexer"` - // Comma-separated list of tags to index (by default only by tx hash) + // Comma-separated list of tags to index (by default the only tag is tx hash) + // + // It's recommended to index only a subset of tags due to possible memory + // bloat. This, of course, depends on the indexer's DB and the volume of + // transactions. IndexTags string `mapstructure:"index_tags"` } diff --git a/node/node.go b/node/node.go index fff550bf8..57fbfbf28 100644 --- a/node/node.go +++ b/node/node.go @@ -287,7 +287,7 @@ func NewNode(config *cfg.Config, if err != nil { return nil, err } - txIndexer = kv.NewTxIndex(store) + txIndexer = kv.NewTxIndex(store, strings.Split(config.TxIndex.IndexTags, ",")) default: txIndexer = &null.TxIndex{} } @@ -299,7 +299,7 @@ func NewNode(config *cfg.Config, for event := range ch { // XXX: may be not perfomant to write one event at a time txResult := event.(types.TMEventData).Unwrap().(types.EventDataTx).TxResult - txIndexer.Index(&txResult, strings.Split(config.TxIndex.IndexTags, ",")) + txIndexer.Index(&txResult) } }() diff --git a/rpc/core/tx.go b/rpc/core/tx.go index 3609c05d9..20fc2c966 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -88,6 +88,7 @@ func Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { var proof types.TxProof if prove { + // TODO: handle overflow block := blockStore.LoadBlock(int(height)) proof = block.Data.Txs.Proof(int(index)) } @@ -109,21 +110,24 @@ func TxSearch(query string, prove bool) ([]*ctypes.ResultTx, error) { 
q, err := tmquery.New(query) if err != nil { - return []*ctypes.ResultTx{}, err + return nil, err } results, err := txIndexer.Search(q) if err != nil { - return []*ctypes.ResultTx{}, err + return nil, err } + // TODO: we may want to consider putting a maximum on this length and somehow + // informing the user that things were truncated. apiResults := make([]*ctypes.ResultTx, len(results)) + var proof types.TxProof for i, r := range results { height := r.Height index := r.Index - var proof types.TxProof if prove { + // TODO: handle overflow block := blockStore.LoadBlock(int(height)) proof = block.Data.Txs.Proof(int(index)) } diff --git a/state/txindex/indexer.go b/state/txindex/indexer.go index 07a544bd9..bd51fbb29 100644 --- a/state/txindex/indexer.go +++ b/state/txindex/indexer.go @@ -11,10 +11,10 @@ import ( type TxIndexer interface { // AddBatch analyzes, indexes and stores a batch of transactions. - AddBatch(b *Batch, allowedTags []string) error + AddBatch(b *Batch) error // Index analyzes, indexes and stores a single transaction. - Index(result *types.TxResult, allowedTags []string) error + Index(result *types.TxResult) error // Get returns the transaction specified by hash or nil if the transaction is not indexed // or stored. diff --git a/state/txindex/kv/kv.go b/state/txindex/kv/kv.go index ee81674b9..d77711ed8 100644 --- a/state/txindex/kv/kv.go +++ b/state/txindex/kv/kv.go @@ -14,21 +14,26 @@ import ( wire "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" + cmn "github.com/tendermint/tmlibs/common" db "github.com/tendermint/tmlibs/db" "github.com/tendermint/tmlibs/pubsub/query" ) +const ( + tagKeySeparator = "/" +) + var _ txindex.TxIndexer = (*TxIndex)(nil) -// TxIndex is the simplest possible indexer, backed by Key-Value storage (levelDB). -// It can only index transaction by its identifier. +// TxIndex is the simplest possible indexer, backed by key-value storage (levelDB). 
type TxIndex struct { - store db.DB + store db.DB + tagsToIndex []string } -// NewTxIndex returns new instance of TxIndex. -func NewTxIndex(store db.DB) *TxIndex { - return &TxIndex{store: store} +// NewTxIndex creates a new KV indexer. +func NewTxIndex(store db.DB, tagsToIndex []string) *TxIndex { + return &TxIndex{store: store, tagsToIndex: tagsToIndex} } // Get gets transaction from the TxIndex storage and returns it or nil if the @@ -55,7 +60,7 @@ func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) { } // AddBatch indexes a batch of transactions using the given list of tags. -func (txi *TxIndex) AddBatch(b *txindex.Batch, allowedTags []string) error { +func (txi *TxIndex) AddBatch(b *txindex.Batch) error { storeBatch := txi.store.NewBatch() for _, result := range b.Ops { @@ -63,7 +68,7 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch, allowedTags []string) error { // index tx by tags for _, tag := range result.Result.Tags { - if stringInSlice(tag.Key, allowedTags) { + if stringInSlice(tag.Key, txi.tagsToIndex) { storeBatch.Set(keyForTag(tag, result), hash) } } @@ -78,14 +83,21 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch, allowedTags []string) error { } // Index indexes a single transaction using the given list of tags. -func (txi *TxIndex) Index(result *types.TxResult, allowedTags []string) error { +func (txi *TxIndex) Index(result *types.TxResult) error { batch := txindex.NewBatch(1) batch.Add(result) - return txi.AddBatch(batch, allowedTags) + return txi.AddBatch(batch) } +// Search performs a search using the given query. It breaks the query into +// conditions (like "tx.height > 5"). For each condition, it queries the DB +// index. Two special cases here: (1) if "tx.hash" is found, it returns tx +// result for it (2) for range queries it is better for the client to provide +// both lower and upper bounds, so we are not performing a full scan. Results +// from querying indexes are then intersected and returned to the caller. 
func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { - hashes := make(map[string][]byte) // key - (base 16, upper-case hash) + var hashes [][]byte + var hashesInitialized bool // get a list of conditions (like "tx.height > 5") conditions := q.Conditions() @@ -93,13 +105,13 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { // if there is a hash condition, return the result immediately hash, err, ok := lookForHash(conditions) if err != nil { - return []*types.TxResult{}, errors.Wrap(err, "error during searching for a hash in the query") + return nil, errors.Wrap(err, "error during searching for a hash in the query") } else if ok { res, err := txi.Get(hash) return []*types.TxResult{res}, errors.Wrap(err, "error while retrieving the result") } - // conditions to skip + // conditions to skip because they're handled before "everything else" skipIndexes := make([]int, 0) // if there is a height condition ("tx.height=3"), extract it for faster lookups @@ -108,36 +120,19 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { skipIndexes = append(skipIndexes, heightIndex) } - var hashes2 [][]byte - // extract ranges // if both upper and lower bounds exist, it's better to get them in order not // no iterate over kvs that are not within range. ranges, rangeIndexes := lookForRanges(conditions) if len(ranges) > 0 { skipIndexes = append(skipIndexes, rangeIndexes...) 
- } - for _, r := range ranges { - hashes2 = txi.matchRange(r, startKeyForRange(r, height, heightIndex > 0)) - - // initialize hashes if we're running the first time - if len(hashes) == 0 { - for _, h := range hashes2 { - hashes[hashKey(h)] = h - } - continue - } - // no matches - if len(hashes2) == 0 { - hashes = make(map[string][]byte) - } else { - // perform intersection as we go - for _, h := range hashes2 { - k := hashKey(h) - if _, ok := hashes[k]; !ok { - delete(hashes, k) - } + for _, r := range ranges { + if !hashesInitialized { + hashes = txi.matchRange(r, startKeyForRange(r, height)) + hashesInitialized = true + } else { + hashes = intersect(hashes, txi.matchRange(r, startKeyForRange(r, height))) } } } @@ -148,27 +143,11 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { continue } - hashes2 = txi.match(c, startKey(c, height, heightIndex > 0)) - - // initialize hashes if we're running the first time - if len(hashes) == 0 { - for _, h := range hashes2 { - hashes[hashKey(h)] = h - } - continue - } - - // no matches - if len(hashes2) == 0 { - hashes = make(map[string][]byte) + if !hashesInitialized { + hashes = txi.match(c, startKey(c, height)) + hashesInitialized = true } else { - // perform intersection as we go - for _, h := range hashes2 { - k := hashKey(h) - if _, ok := hashes[k]; !ok { - delete(hashes, k) - } - } + hashes = intersect(hashes, txi.match(c, startKey(c, height))) } } @@ -177,7 +156,7 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { for _, h := range hashes { results[i], err = txi.Get(h) if err != nil { - return []*types.TxResult{}, errors.Wrapf(err, "failed to get Tx{%X}", h) + return nil, errors.Wrapf(err, "failed to get Tx{%X}", h) } i++ } @@ -253,15 +232,16 @@ func isRangeOperation(op query.Operator) bool { func (txi *TxIndex) match(c query.Condition, startKey []byte) (hashes [][]byte) { if c.Op == query.OpEqual { it := txi.store.IteratorPrefix(startKey) + defer it.Release() for 
it.Next() { hashes = append(hashes, it.Value()) } } else if c.Op == query.OpContains { - // XXX: full scan + // XXX: doing full scan because startKey does not apply here it := txi.store.Iterator() + defer it.Release() for it.Next() { - // if it is a hash key, continue - if !strings.Contains(string(it.Key()), "/") { + if !isTagKey(it.Key()) { continue } if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) { @@ -274,9 +254,42 @@ func (txi *TxIndex) match(c query.Condition, startKey []byte) (hashes [][]byte) return } -func startKey(c query.Condition, height uint64, heightSpecified bool) []byte { +func (txi *TxIndex) matchRange(r queryRange, startKey []byte) (hashes [][]byte) { + it := txi.store.IteratorPrefix(startKey) + defer it.Release() +LOOP: + for it.Next() { + if !isTagKey(it.Key()) { + continue + } + // no other way to stop iterator other than checking for upperBound + switch (r.upperBound).(type) { + case int64: + v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64) + if err == nil && v == r.upperBound { + if r.includeUpperBound { + hashes = append(hashes, it.Value()) + } + break LOOP + } + // XXX: passing time in a ABCI Tags is not yet implemented + // case time.Time: + // v := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64) + // if v == r.upperBound { + // break + // } + } + hashes = append(hashes, it.Value()) + } + return +} + +/////////////////////////////////////////////////////////////////////////////// +// Keys + +func startKey(c query.Condition, height uint64) []byte { var key string - if heightSpecified { + if height > 0 { key = fmt.Sprintf("%s/%v/%d", c.Tag, c.Operand, height) } else { key = fmt.Sprintf("%s/%v", c.Tag, c.Operand) @@ -284,7 +297,7 @@ func startKey(c query.Condition, height uint64, heightSpecified bool) []byte { return []byte(key) } -func startKeyForRange(r queryRange, height uint64, heightSpecified bool) []byte { +func startKeyForRange(r queryRange, height uint64) []byte { var lowerBound 
interface{} if r.includeLowerBound { lowerBound = r.lowerBound @@ -299,7 +312,7 @@ func startKeyForRange(r queryRange, height uint64, heightSpecified bool) []byte } } var key string - if heightSpecified { + if height > 0 { key = fmt.Sprintf("%s/%v/%d", r.key, lowerBound, height) } else { key = fmt.Sprintf("%s/%v", r.key, lowerBound) @@ -307,35 +320,12 @@ func startKeyForRange(r queryRange, height uint64, heightSpecified bool) []byte return []byte(key) } -func (txi *TxIndex) matchRange(r queryRange, startKey []byte) (hashes [][]byte) { - it := txi.store.IteratorPrefix(startKey) - defer it.Release() - for it.Next() { - // no other way to stop iterator other than checking for upperBound - switch (r.upperBound).(type) { - case int64: - v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64) - if err == nil && v == r.upperBound { - if r.includeUpperBound { - hashes = append(hashes, it.Value()) - } - break - } - // XXX: passing time in a ABCI Tags is not yet implemented - // case time.Time: - // v := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64) - // if v == r.upperBound { - // break - // } - } - hashes = append(hashes, it.Value()) - } - return +func isTagKey(key []byte) bool { + return strings.Count(string(key), tagKeySeparator) == 3 } func extractValueFromKey(key []byte) string { - s := string(key) - parts := strings.SplitN(s, "/", 3) + parts := strings.SplitN(string(key), tagKeySeparator, 3) return parts[1] } @@ -356,6 +346,9 @@ func hashKey(hash []byte) string { return fmt.Sprintf("%X", hash) } +/////////////////////////////////////////////////////////////////////////////// +// Utils + func stringInSlice(a string, list []string) bool { for _, b := range list { if b == a { @@ -373,3 +366,15 @@ func intInSlice(a int, list []int) bool { } return false } + +func intersect(as, bs [][]byte) [][]byte { + i := make([][]byte, 0, cmn.MinInt(len(as), len(bs))) + for _, a := range as { + for _, b := range bs { + if bytes.Equal(a, b) { + i = append(i, a) + } + 
} + } + return i +} diff --git a/state/txindex/kv/kv_test.go b/state/txindex/kv/kv_test.go index b1f9840e0..a51bb4bf8 100644 --- a/state/txindex/kv/kv_test.go +++ b/state/txindex/kv/kv_test.go @@ -16,7 +16,7 @@ import ( ) func TestTxIndex(t *testing.T) { - indexer := &TxIndex{store: db.NewMemDB()} + indexer := NewTxIndex(db.NewMemDB(), []string{}) tx := types.Tx("HELLO WORLD") txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: []*abci.KVPair{}}} @@ -26,7 +26,7 @@ func TestTxIndex(t *testing.T) { if err := batch.Add(txResult); err != nil { t.Error(err) } - err := indexer.AddBatch(batch, []string{}) + err := indexer.AddBatch(batch) require.NoError(t, err) loadedTxResult, err := indexer.Get(hash) @@ -37,7 +37,7 @@ func TestTxIndex(t *testing.T) { txResult2 := &types.TxResult{1, 0, tx2, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: []*abci.KVPair{}}} hash2 := tx2.Hash() - err = indexer.Index(txResult2, []string{}) + err = indexer.Index(txResult2) require.NoError(t, err) loadedTxResult2, err := indexer.Get(hash2) @@ -46,61 +46,55 @@ func TestTxIndex(t *testing.T) { } func TestTxSearch(t *testing.T) { - indexer := &TxIndex{store: db.NewMemDB()} + tagsToIndex := []string{"account.number", "account.owner", "account.date"} + indexer := NewTxIndex(db.NewMemDB(), tagsToIndex) tx := types.Tx("HELLO WORLD") tags := []*abci.KVPair{ - &abci.KVPair{Key: "account.number", ValueType: abci.KVPair_INT, ValueInt: 1}, - &abci.KVPair{Key: "account.owner", ValueType: abci.KVPair_STRING, ValueString: "Ivan"}, - &abci.KVPair{Key: "not_allowed", ValueType: abci.KVPair_STRING, ValueString: "Vlad"}, + {Key: "account.number", ValueType: abci.KVPair_INT, ValueInt: 1}, + {Key: "account.owner", ValueType: abci.KVPair_STRING, ValueString: "Ivan"}, + {Key: "not_allowed", ValueType: abci.KVPair_STRING, ValueString: "Vlad"}, } txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, 
Code: abci.CodeType_OK, Log: "", Tags: tags}} hash := tx.Hash() - allowedTags := []string{"account.number", "account.owner", "account.date"} - err := indexer.Index(txResult, allowedTags) + err := indexer.Index(txResult) require.NoError(t, err) testCases := []struct { q string - expectError bool resultsLength int - results []*types.TxResult }{ // search by hash - {fmt.Sprintf("tx.hash = '%X'", hash), false, 1, []*types.TxResult{txResult}}, + {fmt.Sprintf("tx.hash = '%X'", hash), 1}, // search by exact match (one tag) - {"account.number = 1", false, 1, []*types.TxResult{txResult}}, + {"account.number = 1", 1}, // search by exact match (two tags) - {"account.number = 1 AND account.owner = 'Ivan'", false, 1, []*types.TxResult{txResult}}, + {"account.number = 1 AND account.owner = 'Ivan'", 1}, // search by exact match (two tags) - {"account.number = 1 AND account.owner = 'Vlad'", false, 0, []*types.TxResult{}}, + {"account.number = 1 AND account.owner = 'Vlad'", 0}, // search by range - {"account.number >= 1 AND account.number <= 5", false, 1, []*types.TxResult{txResult}}, + {"account.number >= 1 AND account.number <= 5", 1}, // search using not allowed tag - {"not_allowed = 'boom'", false, 0, []*types.TxResult{}}, + {"not_allowed = 'boom'", 0}, // search for not existing tx result - {"account.number >= 2 AND account.number <= 5", false, 0, []*types.TxResult{}}, + {"account.number >= 2 AND account.number <= 5", 0}, // search using not existing tag - {"account.date >= TIME 2013-05-03T14:45:00Z", false, 0, []*types.TxResult{}}, + {"account.date >= TIME 2013-05-03T14:45:00Z", 0}, // search using CONTAINS - {"account.owner CONTAINS 'an'", false, 1, []*types.TxResult{txResult}}, + {"account.owner CONTAINS 'an'", 1}, // search using CONTAINS - {"account.owner CONTAINS 'Vlad'", false, 0, []*types.TxResult{}}, + {"account.owner CONTAINS 'Vlad'", 0}, } for _, tc := range testCases { t.Run(tc.q, func(t *testing.T) { results, err := indexer.Search(query.MustParse(tc.q)) - if 
tc.expectError { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } + assert.NoError(t, err) assert.Len(t, results, tc.resultsLength) if tc.resultsLength > 0 { - assert.Equal(t, tc.results, results) + assert.Equal(t, []*types.TxResult{txResult}, results) } }) } @@ -117,7 +111,7 @@ func benchmarkTxIndex(txsCount int, b *testing.B) { defer os.RemoveAll(dir) // nolint: errcheck store := db.NewDB("tx_index", "leveldb", dir) - indexer := &TxIndex{store: store} + indexer := NewTxIndex(store, []string{}) batch := txindex.NewBatch(txsCount) for i := 0; i < txsCount; i++ { @@ -130,7 +124,7 @@ func benchmarkTxIndex(txsCount int, b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - err = indexer.AddBatch(batch, []string{}) + err = indexer.AddBatch(batch) } if err != nil { b.Fatal(err) diff --git a/state/txindex/null/null.go b/state/txindex/null/null.go index 12f5eb91a..0764faa9e 100644 --- a/state/txindex/null/null.go +++ b/state/txindex/null/null.go @@ -19,12 +19,12 @@ func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) { } // AddBatch is a noop and always returns nil. -func (txi *TxIndex) AddBatch(batch *txindex.Batch, allowedTags []string) error { +func (txi *TxIndex) AddBatch(batch *txindex.Batch) error { return nil } // Index is a noop and always returns nil. 
-func (txi *TxIndex) Index(result *types.TxResult, allowedTags []string) error { +func (txi *TxIndex) Index(result *types.TxResult) error { return nil } diff --git a/types/event_bus.go b/types/event_bus.go index 1a89ef296..6cee1d82b 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -106,26 +106,15 @@ func (b *EventBus) PublishEventTx(event EventDataTx) error { } // add predefined tags - if tag, ok := tags[EventTypeKey]; ok { - b.Logger.Error("Found predefined tag (value will be overwritten)", "tag", tag) - } + logIfTagExists(EventTypeKey, tags, b.Logger) tags[EventTypeKey] = EventTx - if tag, ok := tags[TxHashKey]; ok { - b.Logger.Error("Found predefined tag (value will be overwritten)", "tag", tag) - } + logIfTagExists(TxHashKey, tags, b.Logger) tags[TxHashKey] = fmt.Sprintf("%X", event.Tx.Hash()) - if tag, ok := tags[TxHeightKey]; ok { - b.Logger.Error("Found predefined tag (value will be overwritten)", "tag", tag) - } + logIfTagExists(TxHeightKey, tags, b.Logger) tags[TxHeightKey] = event.Height - if tag, ok := tags[TxIndexKey]; ok { - b.Logger.Error("Found predefined tag (value will be overwritten)", "tag", tag) - } - tags[TxIndexKey] = event.Index - b.pubsub.PublishWithTags(ctx, TMEventData{event}, tags) return nil } @@ -171,3 +160,9 @@ func (b *EventBus) PublishEventRelock(event EventDataRoundState) error { func (b *EventBus) PublishEventLock(event EventDataRoundState) error { return b.Publish(EventLock, TMEventData{event}) } + +func logIfTagExists(tag string, tags map[string]interface{}, logger log.Logger) { + if value, ok := tags[tag]; ok { + logger.Error("Found predefined tag (value will be overwritten)", "tag", tag, "value", value) + } +} diff --git a/types/events.go b/types/events.go index 10df2643f..9bf7a5a43 100644 --- a/types/events.go +++ b/types/events.go @@ -143,9 +143,6 @@ const ( // TxHeightKey is a reserved key, used to specify transaction block's height. 
// see EventBus#PublishEventTx TxHeightKey = "tx.height" - // TxIndexKey is a reserved key, used to specify transaction's index within the block. - // see EventBus#PublishEventTx - TxIndexKey = "tx.index" ) var (