From 003f394512977d8e3d0a6b64bb5f5d34d2ff6c4a Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Mon, 22 Mar 2021 11:47:35 -0400 Subject: [PATCH] rpc: index block events to support block event queries (#6226) --- CHANGELOG_PENDING.md | 1 + docs/app-dev/indexing-transactions.md | 67 +++- light/proxy/routes.go | 39 +- light/rpc/client.go | 40 ++- node/node.go | 37 +- rpc/client/http/http.go | 35 +- rpc/client/interface.go | 21 +- rpc/client/local/local.go | 11 +- rpc/client/mocks/client.go | 25 +- rpc/core/blocks.go | 69 ++++ rpc/core/env.go | 2 + rpc/core/routes.go | 1 + rpc/core/tx.go | 12 +- rpc/core/types/responses.go | 6 + rpc/openapi/openapi.yaml | 56 +++ state/indexer/block.go | 22 ++ state/indexer/block/kv/kv.go | 489 ++++++++++++++++++++++++++ state/indexer/block/kv/kv_test.go | 141 ++++++++ state/indexer/block/kv/util.go | 96 +++++ state/indexer/block/null/null.go | 26 ++ state/indexer/query_range.go | 123 +++++++ state/txindex/indexer.go | 9 +- state/txindex/indexer_service.go | 44 ++- state/txindex/indexer_service_test.go | 20 +- state/txindex/kv/kv.go | 119 +------ test/e2e/app/app.go | 26 +- types/events.go | 4 + 27 files changed, 1353 insertions(+), 188 deletions(-) create mode 100644 state/indexer/block.go create mode 100644 state/indexer/block/kv/kv.go create mode 100644 state/indexer/block/kv/kv_test.go create mode 100644 state/indexer/block/kv/util.go create mode 100644 state/indexer/block/null/null.go create mode 100644 state/indexer/query_range.go diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index e432b9293..da3fdd837 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -51,6 +51,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi ### FEATURES +- [rpc] \#6226 Index block events and expose a new RPC method, `/block_search`, to allow querying for blocks by `BeginBlock` and `EndBlock` events. (@alexanderbez) - [config] Add `--mode` flag and config variable. See [ADR-52](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-052-tendermint-mode.md) @dongsam ### IMPROVEMENTS diff --git a/docs/app-dev/indexing-transactions.md b/docs/app-dev/indexing-transactions.md index 579f8690d..78161a040 100644 --- a/docs/app-dev/indexing-transactions.md +++ b/docs/app-dev/indexing-transactions.md @@ -4,12 +4,13 @@ order: 6 # Indexing Transactions -Tendermint allows you to index transactions and later query or subscribe to their results. - -Events can be used to index transactions and blocks according to what happened -during their execution. Note that the set of events returned for a block from -`BeginBlock` and `EndBlock` are merged. In case both methods return the same -type, only the key-value pairs defined in `EndBlock` are used. +Tendermint allows you to index transactions and blocks and later query or +subscribe to their results. Transactions are indexed by `TxResult.Events` and +blocks are indexed by `Response(Begin|End)Block.Events`. However, transactions +are also indexed by a primary key which includes the transaction hash and maps +to and stores the corresponding `TxResult`. Blocks are indexed by a primary key +which includes the block height and maps to and stores the block height, i.e. +the block itself is never stored. Each event contains a type and a list of attributes, which are key-value pairs denoting something about what happened during the method's execution. For more @@ -17,7 +18,7 @@ details on `Events`, see the [ABCI](https://github.com/tendermint/spec/blob/master/spec/abci/abci.md#events) documentation. -An Event has a composite key associated with it. A `compositeKey` is +An `Event` has a composite key associated with it. A `compositeKey` is constructed by its type and key separated by a dot. For example: @@ -44,11 +45,29 @@ Let's take a look at the `[tx_index]` config section: indexer = "kv" ``` -By default, Tendermint will index all transactions by their respective -hashes and height using an embedded simple indexer. +By default, Tendermint will index all transactions by their respective hashes +and height and blocks by their height. You can turn off indexing completely by setting `tx_index` to `null`. +## Default Indexes + +The Tendermint tx and block event indexer indexes a few select reserved events +by default. + +### Transactions + +The following indexes are indexed by default: + +- `tx.height` +- `tx.hash` + +### Blocks + +The following indexes are indexed by default: + +- `block.height` + ## Adding Events Applications are free to define which events to index. Tendermint does not @@ -77,19 +96,21 @@ func (app *KVStoreApplication) DeliverTx(req types.RequestDeliverTx) types.Resul } ``` -The transaction will be indexed (if the indexer is not `null`) with a certain attribute if the attribute's `Index` field is set to `true`. -In the above example, all attributes will be indexed. +If the indexer is not `null`, the transaction will be indexed. Each event is +indexed using a composite key in the form of `{eventType}.{eventAttribute}={eventValue}`, +e.g. `transfer.sender=bob`. -## Querying Transactions +## Querying Transactions Events -You can query the transaction results by calling `/tx_search` RPC endpoint: +You can query for a paginated set of transaction by their events by calling the +`/tx_search` RPC endpoint: ```bash -curl "localhost:26657/tx_search?query=\"account.name='igor'\"&prove=true" +curl "localhost:26657/tx_search?query=\"message.sender='cosmos1...'\"&prove=true" ``` -Check out [API docs](https://docs.tendermint.com/master/rpc/#/Info/tx_search) for more information -on query syntax and other options. +Check out [API docs](https://docs.tendermint.com/master/rpc/#/Info/tx_search) +for more information on query syntax and other options. ## Subscribing to Transactions @@ -102,10 +123,22 @@ a query to `/subscribe` RPC endpoint. "method": "subscribe", "id": "0", "params": { - "query": "account.name='igor'" + "query": "message.sender='cosmos1...'" } } ``` Check out [API docs](https://docs.tendermint.com/master/rpc/#subscribe) for more information on query syntax and other options. + +## Querying Blocks Events + +You can query for a paginated set of blocks by their events by calling the +`/block_search` RPC endpoint: + +```bash +curl "localhost:26657/block_search?query=\"block.height > 10 AND val_set.num_changed > 0\"" +``` + +Check out [API docs](https://docs.tendermint.com/master/rpc/#/Info/block_search) +for more information on query syntax and other options. diff --git a/light/proxy/routes.go b/light/proxy/routes.go index 0ed7f9433..e28b23e0c 100644 --- a/light/proxy/routes.go +++ b/light/proxy/routes.go @@ -29,6 +29,7 @@ func RPCRoutes(c *lrpc.Client) map[string]*rpcserver.RPCFunc { "commit": rpcserver.NewRPCFunc(makeCommitFunc(c), "height"), "tx": rpcserver.NewRPCFunc(makeTxFunc(c), "hash,prove"), "tx_search": rpcserver.NewRPCFunc(makeTxSearchFunc(c), "query,prove,page,per_page,order_by"), + "block_search": rpcserver.NewRPCFunc(makeBlockSearchFunc(c), "query,page,per_page,order_by"), "validators": rpcserver.NewRPCFunc(makeValidatorsFunc(c), "height,page,per_page"), "dump_consensus_state": rpcserver.NewRPCFunc(makeDumpConsensusStateFunc(c), ""), "consensus_state": rpcserver.NewRPCFunc(makeConsensusStateFunc(c), ""), @@ -131,16 +132,46 @@ func makeTxFunc(c *lrpc.Client) rpcTxFunc { } } -type rpcTxSearchFunc func(ctx *rpctypes.Context, query string, prove bool, - page, perPage *int, orderBy string) (*ctypes.ResultTxSearch, error) +type rpcTxSearchFunc func( + ctx *rpctypes.Context, + query string, + prove bool, + page, perPage *int, + orderBy string, +) (*ctypes.ResultTxSearch, error) func makeTxSearchFunc(c *lrpc.Client) rpcTxSearchFunc { - return func(ctx *rpctypes.Context, query string, prove bool, page, perPage *int, orderBy string) ( - *ctypes.ResultTxSearch, error) { + return func( + ctx *rpctypes.Context, + query string, + prove bool, + page, perPage *int, + orderBy string, + ) (*ctypes.ResultTxSearch, error) { return c.TxSearch(ctx.Context(), query, prove, page, perPage, orderBy) } } +type rpcBlockSearchFunc func( + ctx *rpctypes.Context, + query string, + prove bool, + page, perPage *int, + orderBy string, +) (*ctypes.ResultBlockSearch, error) + +func makeBlockSearchFunc(c *lrpc.Client) rpcBlockSearchFunc { + return func( + ctx *rpctypes.Context, + query string, + prove bool, + page, perPage *int, + orderBy string, + ) (*ctypes.ResultBlockSearch, error) { + return c.BlockSearch(ctx.Context(), query, page, perPage, orderBy) + } +} + type rpcValidatorsFunc func(ctx *rpctypes.Context, height *int64, page, perPage *int) (*ctypes.ResultValidators, error) diff --git a/light/rpc/client.go b/light/rpc/client.go index 5c14619c8..6a216ed2e 100644 --- a/light/rpc/client.go +++ b/light/rpc/client.go @@ -33,15 +33,18 @@ type LightClient interface { TrustedLightBlock(height int64) (*types.LightBlock, error) } +var _ rpcclient.Client = (*Client)(nil) + // Client is an RPC client, which uses light#Client to verify data (if it can -// be proved!). merkle.DefaultProofRuntime is used to verify values returned by -// ABCIQuery. +// be proved). Note, merkle.DefaultProofRuntime is used to verify values +// returned by ABCI#Query. type Client struct { service.BaseService next rpcclient.Client lc LightClient - // Proof runtime used to verify values returned by ABCIQuery + + // proof runtime used to verify values returned by ABCIQuery prt *merkle.ProofRuntime keyPathFn KeyPathFunc } @@ -468,16 +471,34 @@ func (c *Client) Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.Resul return res, res.Proof.Validate(l.DataHash) } -func (c *Client) TxSearch(ctx context.Context, query string, prove bool, page, perPage *int, orderBy string) ( - *ctypes.ResultTxSearch, error) { +func (c *Client) TxSearch( + ctx context.Context, + query string, + prove bool, + page, perPage *int, + orderBy string, +) (*ctypes.ResultTxSearch, error) { return c.next.TxSearch(ctx, query, prove, page, perPage, orderBy) } +func (c *Client) BlockSearch( + ctx context.Context, + query string, + page, perPage *int, + orderBy string, +) (*ctypes.ResultBlockSearch, error) { + return c.next.BlockSearch(ctx, query, page, perPage, orderBy) +} + // Validators fetches and verifies validators. -func (c *Client) Validators(ctx context.Context, height *int64, pagePtr, perPagePtr *int) (*ctypes.ResultValidators, - error) { - // Update the light client if we're behind and retrieve the light block at the requested height - // or at the latest height if no height is provided. +func (c *Client) Validators( + ctx context.Context, + height *int64, + pagePtr, perPagePtr *int, +) (*ctypes.ResultValidators, error) { + + // Update the light client if we're behind and retrieve the light block at the + // requested height or at the latest height if no height is provided. l, err := c.updateLightClientIfNeededTo(ctx, height) if err != nil { return nil, err @@ -491,7 +512,6 @@ func (c *Client) Validators(ctx context.Context, height *int64, pagePtr, perPage } skipCount := validateSkipCount(page, perPage) - v := l.ValidatorSet.Validators[skipCount : skipCount+tmmath.MinInt(perPage, totalCount-skipCount)] return &ctypes.ResultValidators{ diff --git a/node/node.go b/node/node.go index 4f7901e3e..32374ac8c 100644 --- a/node/node.go +++ b/node/node.go @@ -15,7 +15,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/cors" - dbm "github.com/tendermint/tm-db" abci "github.com/tendermint/tendermint/abci/types" @@ -42,6 +41,9 @@ import ( grpccore "github.com/tendermint/tendermint/rpc/grpc" rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server" sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/state/indexer" + blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv" + blockidxnull "github.com/tendermint/tendermint/state/indexer/block/null" "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/state/txindex/kv" "github.com/tendermint/tendermint/state/txindex/null" @@ -230,6 +232,7 @@ type Node struct { proxyApp proxy.AppConns // connection to the application rpcListeners []net.Listener // rpc servers txIndexer txindex.TxIndexer + blockIndexer indexer.BlockIndexer indexerService *txindex.IndexerService prometheusSrv *http.Server } @@ -268,27 +271,40 @@ func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) { return eventBus, nil } -func createAndStartIndexerService(config *cfg.Config, dbProvider DBProvider, - eventBus *types.EventBus, logger log.Logger) (*txindex.IndexerService, txindex.TxIndexer, error) { +func createAndStartIndexerService( + config *cfg.Config, + dbProvider DBProvider, + eventBus *types.EventBus, + logger log.Logger, +) (*txindex.IndexerService, txindex.TxIndexer, indexer.BlockIndexer, error) { + + var ( + txIndexer txindex.TxIndexer + blockIndexer indexer.BlockIndexer + ) - var txIndexer txindex.TxIndexer switch config.TxIndex.Indexer { case "kv": store, err := dbProvider(&DBContext{"tx_index", config}) if err != nil { - return nil, nil, err + return nil, nil, nil, err } + txIndexer = kv.NewTxIndex(store) + blockIndexer = blockidxkv.New(dbm.NewPrefixDB(store, []byte("block_events"))) default: txIndexer = &null.TxIndex{} + blockIndexer = &blockidxnull.BlockerIndexer{} } - indexerService := txindex.NewIndexerService(txIndexer, eventBus) + indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus) indexerService.SetLogger(logger.With("module", "txindex")) + if err := indexerService.Start(); err != nil { - return nil, nil, err + return nil, nil, nil, err } - return indexerService, txIndexer, nil + + return indexerService, txIndexer, blockIndexer, nil } func doHandshake( @@ -962,8 +978,7 @@ func NewNode(config *cfg.Config, return nil, err } - // Transaction indexing - indexerService, txIndexer, err := createAndStartIndexerService(config, dbProvider, eventBus, logger) + indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(config, dbProvider, eventBus, logger) if err != nil { return nil, err } @@ -1215,6 +1230,7 @@ func NewNode(config *cfg.Config, proxyApp: proxyApp, txIndexer: txIndexer, indexerService: indexerService, + blockIndexer: blockIndexer, eventBus: eventBus, } node.BaseService = *service.NewBaseService(logger, "Node", node) @@ -1449,6 +1465,7 @@ func (n *Node) ConfigureRPC() error { GenDoc: n.genesisDoc, TxIndexer: n.txIndexer, + BlockIndexer: n.blockIndexer, ConsensusReactor: n.consensusReactor, EventBus: n.eventBus, Mempool: n.mempool, diff --git a/rpc/client/http/http.go b/rpc/client/http/http.go index 7009fad01..27f5576c8 100644 --- a/rpc/client/http/http.go +++ b/rpc/client/http/http.go @@ -471,24 +471,55 @@ func (c *baseRPCClient) TxSearch( page, perPage *int, orderBy string, -) ( - *ctypes.ResultTxSearch, error) { +) (*ctypes.ResultTxSearch, error) { + result := new(ctypes.ResultTxSearch) params := map[string]interface{}{ "query": query, "prove": prove, "order_by": orderBy, } + if page != nil { params["page"] = page } if perPage != nil { params["per_page"] = perPage } + _, err := c.caller.Call(ctx, "tx_search", params, result) if err != nil { return nil, err } + + return result, nil +} + +func (c *baseRPCClient) BlockSearch( + ctx context.Context, + query string, + page, perPage *int, + orderBy string, +) (*ctypes.ResultBlockSearch, error) { + + result := new(ctypes.ResultBlockSearch) + params := map[string]interface{}{ + "query": query, + "order_by": orderBy, + } + + if page != nil { + params["page"] = page + } + if perPage != nil { + params["per_page"] = perPage + } + + _, err := c.caller.Call(ctx, "block_search", params, result) + if err != nil { + return nil, err + } + return result, nil } diff --git a/rpc/client/interface.go b/rpc/client/interface.go index f2c68a5c2..e31d1457f 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -70,8 +70,25 @@ type SignClient interface { Commit(ctx context.Context, height *int64) (*ctypes.ResultCommit, error) Validators(ctx context.Context, height *int64, page, perPage *int) (*ctypes.ResultValidators, error) Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) - TxSearch(ctx context.Context, query string, prove bool, page, perPage *int, - orderBy string) (*ctypes.ResultTxSearch, error) + + // TxSearch defines a method to search for a paginated set of transactions by + // DeliverTx event search criteria. + TxSearch( + ctx context.Context, + query string, + prove bool, + page, perPage *int, + orderBy string, + ) (*ctypes.ResultTxSearch, error) + + // BlockSearch defines a method to search for a paginated set of blocks by + // BeginBlock and EndBlock event search criteria. + BlockSearch( + ctx context.Context, + query string, + page, perPage *int, + orderBy string, + ) (*ctypes.ResultBlockSearch, error) } // HistoryClient provides access to data from genesis to now in large chunks. diff --git a/rpc/client/local/local.go b/rpc/client/local/local.go index 841e325f4..7e508b603 100644 --- a/rpc/client/local/local.go +++ b/rpc/client/local/local.go @@ -178,7 +178,7 @@ func (c *Local) Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.Result } func (c *Local) TxSearch( - ctx context.Context, + _ context.Context, query string, prove bool, page, @@ -188,6 +188,15 @@ func (c *Local) TxSearch( return core.TxSearch(c.ctx, query, prove, page, perPage, orderBy) } +func (c *Local) BlockSearch( + _ context.Context, + query string, + page, perPage *int, + orderBy string, +) (*ctypes.ResultBlockSearch, error) { + return core.BlockSearch(c.ctx, query, page, perPage, orderBy) +} + func (c *Local) BroadcastEvidence(ctx context.Context, ev types.Evidence) (*ctypes.ResultBroadcastEvidence, error) { return core.BroadcastEvidence(c.ctx, ev) } diff --git a/rpc/client/mocks/client.go b/rpc/client/mocks/client.go index 6a9008717..265ba796d 100644 --- a/rpc/client/mocks/client.go +++ b/rpc/client/mocks/client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.3.0. DO NOT EDIT. +// Code generated by mockery v2.6.0. DO NOT EDIT. package mocks @@ -160,6 +160,29 @@ func (_m *Client) BlockResults(ctx context.Context, height *int64) (*coretypes.R return r0, r1 } +// BlockSearch provides a mock function with given fields: ctx, query, page, perPage, orderBy +func (_m *Client) BlockSearch(ctx context.Context, query string, page *int, perPage *int, orderBy string) (*coretypes.ResultBlockSearch, error) { + ret := _m.Called(ctx, query, page, perPage, orderBy) + + var r0 *coretypes.ResultBlockSearch + if rf, ok := ret.Get(0).(func(context.Context, string, *int, *int, string) *coretypes.ResultBlockSearch); ok { + r0 = rf(ctx, query, page, perPage, orderBy) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*coretypes.ResultBlockSearch) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, *int, *int, string) error); ok { + r1 = rf(ctx, query, page, perPage, orderBy) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // BlockchainInfo provides a mock function with given fields: ctx, minHeight, maxHeight func (_m *Client) BlockchainInfo(ctx context.Context, minHeight int64, maxHeight int64) (*coretypes.ResultBlockchainInfo, error) { ret := _m.Called(ctx, minHeight, maxHeight) diff --git a/rpc/core/blocks.go b/rpc/core/blocks.go index e846f0a89..51d25217c 100644 --- a/rpc/core/blocks.go +++ b/rpc/core/blocks.go @@ -1,11 +1,15 @@ package core import ( + "errors" "fmt" + "sort" tmmath "github.com/tendermint/tendermint/libs/math" + tmquery "github.com/tendermint/tendermint/libs/pubsub/query" ctypes "github.com/tendermint/tendermint/rpc/core/types" rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types" + blockidxnull "github.com/tendermint/tendermint/state/indexer/block/null" "github.com/tendermint/tendermint/types" ) @@ -164,3 +168,68 @@ func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockR ConsensusParamUpdates: results.EndBlock.ConsensusParamUpdates, }, nil } + +// BlockSearch searches for a paginated set of blocks matching BeginBlock and +// EndBlock event search criteria. +func BlockSearch( + ctx *rpctypes.Context, + query string, + pagePtr, perPagePtr *int, + orderBy string, +) (*ctypes.ResultBlockSearch, error) { + + // skip if block indexing is disabled + if _, ok := env.BlockIndexer.(*blockidxnull.BlockerIndexer); ok { + return nil, errors.New("block indexing is disabled") + } + + q, err := tmquery.New(query) + if err != nil { + return nil, err + } + + results, err := env.BlockIndexer.Search(ctx.Context(), q) + if err != nil { + return nil, err + } + + // sort results (must be done before pagination) + switch orderBy { + case "desc", "": + sort.Slice(results, func(i, j int) bool { return results[i] > results[j] }) + + case "asc": + sort.Slice(results, func(i, j int) bool { return results[i] < results[j] }) + + default: + return nil, fmt.Errorf("%w: expected order_by to be either `asc` or `desc` or empty", ctypes.ErrInvalidRequest) + } + + // paginate results + totalCount := len(results) + perPage := validatePerPage(perPagePtr) + + page, err := validatePage(pagePtr, perPage, totalCount) + if err != nil { + return nil, err + } + + skipCount := validateSkipCount(page, perPage) + pageSize := tmmath.MinInt(perPage, totalCount-skipCount) + + apiResults := make([]*ctypes.ResultBlock, 0, pageSize) + for i := skipCount; i < skipCount+pageSize; i++ { + block := env.BlockStore.LoadBlock(results[i]) + if block != nil { + blockMeta := env.BlockStore.LoadBlockMeta(block.Height) + if blockMeta != nil { + apiResults = append(apiResults, &ctypes.ResultBlock{ + Block: block, + BlockID: blockMeta.BlockID, + }) + } + } + } + + return &ctypes.ResultBlockSearch{Blocks: apiResults, TotalCount: totalCount}, nil +} diff --git a/rpc/core/env.go b/rpc/core/env.go index 7584023ca..889c1df14 100644 --- a/rpc/core/env.go +++ b/rpc/core/env.go @@ -13,6 +13,7 @@ import ( "github.com/tendermint/tendermint/proxy" ctypes "github.com/tendermint/tendermint/rpc/core/types" sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/state/indexer" "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" ) @@ -83,6 +84,7 @@ type Environment struct { PubKey crypto.PubKey GenDoc *types.GenesisDoc // cache the genesis structure TxIndexer txindex.TxIndexer + BlockIndexer indexer.BlockIndexer ConsensusReactor *consensus.Reactor EventBus *types.EventBus // thread safe Mempool mempl.Mempool diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 639a4be93..5583d6f29 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -26,6 +26,7 @@ var Routes = map[string]*rpc.RPCFunc{ "check_tx": rpc.NewRPCFunc(CheckTx, "tx"), "tx": rpc.NewRPCFunc(Tx, "hash,prove"), "tx_search": rpc.NewRPCFunc(TxSearch, "query,prove,page,per_page,order_by"), + "block_search": rpc.NewRPCFunc(BlockSearch, "query,page,per_page,order_by"), "validators": rpc.NewRPCFunc(Validators, "height,page,per_page"), "dump_consensus_state": rpc.NewRPCFunc(DumpConsensusState, ""), "consensus_state": rpc.NewRPCFunc(ConsensusState, ""), diff --git a/rpc/core/tx.go b/rpc/core/tx.go index 98721db8e..4922b040c 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -54,8 +54,14 @@ func Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error // TxSearch allows you to query for multiple transactions results. It returns a // list of transactions (maximum ?per_page entries) and the total count. // More: https://docs.tendermint.com/master/rpc/#/Info/tx_search -func TxSearch(ctx *rpctypes.Context, query string, prove bool, pagePtr, perPagePtr *int, orderBy string) ( - *ctypes.ResultTxSearch, error) { +func TxSearch( + ctx *rpctypes.Context, + query string, + prove bool, + pagePtr, perPagePtr *int, + orderBy string, +) (*ctypes.ResultTxSearch, error) { + // if index is disabled, return error if _, ok := env.TxIndexer.(*null.TxIndex); ok { return nil, errors.New("transaction indexing is disabled") @@ -94,10 +100,12 @@ func TxSearch(ctx *rpctypes.Context, query string, prove bool, pagePtr, perPageP // paginate results totalCount := len(results) perPage := validatePerPage(perPagePtr) + page, err := validatePage(pagePtr, perPage, totalCount) if err != nil { return nil, err } + skipCount := validateSkipCount(page, perPage) pageSize := tmmath.MinInt(perPage, totalCount-skipCount) diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 3390c6502..c5bd2fef2 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -208,6 +208,12 @@ type ResultTxSearch struct { TotalCount int `json:"total_count"` } +// ResultBlockSearch defines the RPC response type for a block search by events. +type ResultBlockSearch struct { + Blocks []*ResultBlock `json:"blocks"` + TotalCount int `json:"total_count"` +} + // List of mempool txs type ResultUnconfirmedTxs struct { Count int `json:"n_txs"` diff --git a/rpc/openapi/openapi.yaml b/rpc/openapi/openapi.yaml index e5985d080..008357fef 100644 --- a/rpc/openapi/openapi.yaml +++ b/rpc/openapi/openapi.yaml @@ -1021,6 +1021,62 @@ paths: application/json: schema: $ref: "#/components/schemas/ErrorResponse" + /block_search: + get: + summary: Search for blocks by BeginBlock and EndBlock events + description: | + Search for blocks by BeginBlock and EndBlock events. + + See /subscribe for the query syntax. + operationId: block_search + parameters: + - in: query + name: query + description: Query + required: true + schema: + type: string + example: "block.height > 1000 AND valset.changed > 0" + - in: query + name: page + description: "Page number (1-based)" + required: false + schema: + type: integer + default: 1 + example: 1 + - in: query + name: per_page + description: "Number of entries per page (max: 100)" + required: false + schema: + type: integer + default: 30 + example: 30 + - in: query + name: order_by + description: Order in which blocks are sorted ("asc" or "desc"), by height. If empty, default sorting will be still applied. + required: false + schema: + type: string + default: "desc" + example: "asc" + tags: + - Info + responses: + "200": + description: List of paginated blocks matching the search criteria. + content: + application/json: + schema: + $ref: "#/components/schemas/BlockSearchResponse" + "500": + description: Error + content: + application/json: + schema: + $ref: "#/components/schemas/ErrorResponse" + /tx: get: summary: Get transactions by hash diff --git a/state/indexer/block.go b/state/indexer/block.go new file mode 100644 index 000000000..a3a2abc5b --- /dev/null +++ b/state/indexer/block.go @@ -0,0 +1,22 @@ +package indexer + +import ( + "context" + + "github.com/tendermint/tendermint/libs/pubsub/query" + "github.com/tendermint/tendermint/types" +) + +// BlockIndexer defines an interface contract for indexing block events. +type BlockIndexer interface { + // Has returns true if the given height has been indexed. An error is returned + // upon database query failure. + Has(height int64) (bool, error) + + // Index indexes BeginBlock and EndBlock events for a given block by its height. + Index(types.EventDataNewBlockHeader) error + + // Search performs a query for block heights that match a given BeginBlock + // and Endblock event search criteria. + Search(ctx context.Context, q *query.Query) ([]int64, error) +} diff --git a/state/indexer/block/kv/kv.go b/state/indexer/block/kv/kv.go new file mode 100644 index 000000000..916c5f8ae --- /dev/null +++ b/state/indexer/block/kv/kv.go @@ -0,0 +1,489 @@ +package kv + +import ( + "context" + "errors" + "fmt" + "sort" + "strconv" + "strings" + + "github.com/google/orderedcode" + dbm "github.com/tendermint/tm-db" + + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/pubsub/query" + "github.com/tendermint/tendermint/state/indexer" + "github.com/tendermint/tendermint/types" +) + +var _ indexer.BlockIndexer = (*BlockerIndexer)(nil) + +// BlockerIndexer implements a block indexer, indexing BeginBlock and EndBlock +// events with an underlying KV store. Block events are indexed by their height, +// such that matching search criteria returns the respective block height(s). +type BlockerIndexer struct { + store dbm.DB +} + +func New(store dbm.DB) *BlockerIndexer { + return &BlockerIndexer{ + store: store, + } +} + +// Has returns true if the given height has been indexed. An error is returned +// upon database query failure. +func (idx *BlockerIndexer) Has(height int64) (bool, error) { + key, err := heightKey(height) + if err != nil { + return false, fmt.Errorf("failed to create block height index key: %w", err) + } + + return idx.store.Has(key) +} + +// Index indexes BeginBlock and EndBlock events for a given block by its height. +// The following is indexed: +// +// primary key: encode(block.height | height) => encode(height) +// BeginBlock events: encode(eventType.eventAttr|eventValue|height|begin_block) => encode(height) +// EndBlock events: encode(eventType.eventAttr|eventValue|height|end_block) => encode(height) +func (idx *BlockerIndexer) Index(bh types.EventDataNewBlockHeader) error { + batch := idx.store.NewBatch() + defer batch.Close() + + height := bh.Header.Height + + // 1. index by height + key, err := heightKey(height) + if err != nil { + return fmt.Errorf("failed to create block height index key: %w", err) + } + if err := batch.Set(key, int64ToBytes(height)); err != nil { + return err + } + + // 2. index BeginBlock events + if err := idx.indexEvents(batch, bh.ResultBeginBlock.Events, "begin_block", height); err != nil { + return fmt.Errorf("failed to index BeginBlock events: %w", err) + } + + // 3. index EndBlock events + if err := idx.indexEvents(batch, bh.ResultEndBlock.Events, "end_block", height); err != nil { + return fmt.Errorf("failed to index EndBlock events: %w", err) + } + + return batch.WriteSync() +} + +// Search performs a query for block heights that match a given BeginBlock +// and Endblock event search criteria. The given query can match against zero, +// one or more block heights. In the case of height queries, i.e. block.height=H, +// if the height is indexed, that height alone will be returned. An error and +// nil slice is returned. Otherwise, a non-nil slice and nil error is returned. +func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64, error) { + results := make([]int64, 0) + select { + case <-ctx.Done(): + return results, nil + + default: + } + + conditions, err := q.Conditions() + if err != nil { + return nil, fmt.Errorf("failed to parse query conditions: %w", err) + } + + // If there is an exact height query, return the result immediately + // (if it exists). + height, ok := lookForHeight(conditions) + if ok { + ok, err := idx.Has(height) + if err != nil { + return nil, err + } + + if ok { + return []int64{height}, nil + } + + return results, nil + } + + var heightsInitialized bool + filteredHeights := make(map[string][]byte) + + // conditions to skip because they're handled before "everything else" + skipIndexes := make([]int, 0) + + // Extract ranges. If both upper and lower bounds exist, it's better to get + // them in order as to not iterate over kvs that are not within range. + ranges, rangeIndexes := indexer.LookForRanges(conditions) + if len(ranges) > 0 { + skipIndexes = append(skipIndexes, rangeIndexes...) + + for _, qr := range ranges { + prefix, err := orderedcode.Append(nil, qr.Key) + if err != nil { + return nil, fmt.Errorf("failed to create prefix key: %w", err) + } + + if !heightsInitialized { + filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, true) + if err != nil { + return nil, err + } + + heightsInitialized = true + + // Ignore any remaining conditions if the first condition resulted in no + // matches (assuming implicit AND operand). + if len(filteredHeights) == 0 { + break + } + } else { + filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, false) + if err != nil { + return nil, err + } + } + } + } + + // for all other conditions + for i, c := range conditions { + if intInSlice(i, skipIndexes) { + continue + } + + startKey, err := orderedcode.Append(nil, c.CompositeKey, fmt.Sprintf("%v", c.Operand)) + if err != nil { + return nil, err + } + + if !heightsInitialized { + filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, true) + if err != nil { + return nil, err + } + + heightsInitialized = true + + // Ignore any remaining conditions if the first condition resulted in no + // matches (assuming implicit AND operand). + if len(filteredHeights) == 0 { + break + } + } else { + filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, false) + if err != nil { + return nil, err + } + } + } + + // fetch matching heights + results = make([]int64, 0, len(filteredHeights)) + for _, hBz := range filteredHeights { + h := int64FromBytes(hBz) + + ok, err := idx.Has(h) + if err != nil { + return nil, err + } + if ok { + results = append(results, h) + } + + select { + case <-ctx.Done(): + break + + default: + } + } + + sort.Slice(results, func(i, j int) bool { return results[i] < results[j] }) + + return results, nil +} + +// matchRange returns all matching block heights that match a given QueryRange +// and start key. An already filtered result (filteredHeights) is provided such +// that any non-intersecting matches are removed. +// +// NOTE: The provided filteredHeights may be empty if no previous condition has +// matched. +func (idx *BlockerIndexer) matchRange( + ctx context.Context, + qr indexer.QueryRange, + startKey []byte, + filteredHeights map[string][]byte, + firstRun bool, +) (map[string][]byte, error) { + + // A previous match was attempted but resulted in no matches, so we return + // no matches (assuming AND operand). + if !firstRun && len(filteredHeights) == 0 { + return filteredHeights, nil + } + + tmpHeights := make(map[string][]byte) + lowerBound := qr.LowerBoundValue() + upperBound := qr.UpperBoundValue() + + it, err := dbm.IteratePrefix(idx.store, startKey) + if err != nil { + return nil, fmt.Errorf("failed to create prefix iterator: %w", err) + } + defer it.Close() + +LOOP: + for ; it.Valid(); it.Next() { + var ( + eventValue string + err error + ) + + if qr.Key == types.BlockHeightKey { + eventValue, err = parseValueFromPrimaryKey(it.Key()) + } else { + eventValue, err = parseValueFromEventKey(it.Key()) + } + + if err != nil { + continue + } + + if _, ok := qr.AnyBound().(int64); ok { + v, err := strconv.ParseInt(eventValue, 10, 64) + if err != nil { + continue LOOP + } + + include := true + if lowerBound != nil && v < lowerBound.(int64) { + include = false + } + + if upperBound != nil && v > upperBound.(int64) { + include = false + } + + if include { + tmpHeights[string(it.Value())] = it.Value() + } + } + + select { + case <-ctx.Done(): + break + + default: + } + } + + if err := it.Error(); err != nil { + return nil, err + } + + if len(tmpHeights) == 0 || firstRun { + // Either: + // + // 1. Regardless if a previous match was attempted, which may have had + // results, but no match was found for the current condition, then we + // return no matches (assuming AND operand). + // + // 2. A previous match was not attempted, so we return all results. + return tmpHeights, nil + } + + // Remove/reduce matches in filteredHashes that were not found in this + // match (tmpHashes). + for k := range filteredHeights { + if tmpHeights[k] == nil { + delete(filteredHeights, k) + + select { + case <-ctx.Done(): + break + + default: + } + } + } + + return filteredHeights, nil +} + +// match returns all matching heights that meet a given query condition and start +// key. An already filtered result (filteredHeights) is provided such that any +// non-intersecting matches are removed. +// +// NOTE: The provided filteredHeights may be empty if no previous condition has +// matched. +func (idx *BlockerIndexer) match( + ctx context.Context, + c query.Condition, + startKeyBz []byte, + filteredHeights map[string][]byte, + firstRun bool, +) (map[string][]byte, error) { + + // A previous match was attempted but resulted in no matches, so we return + // no matches (assuming AND operand). + if !firstRun && len(filteredHeights) == 0 { + return filteredHeights, nil + } + + tmpHeights := make(map[string][]byte) + + switch { + case c.Op == query.OpEqual: + it, err := dbm.IteratePrefix(idx.store, startKeyBz) + if err != nil { + return nil, fmt.Errorf("failed to create prefix iterator: %w", err) + } + defer it.Close() + + for ; it.Valid(); it.Next() { + tmpHeights[string(it.Value())] = it.Value() + + if err := ctx.Err(); err != nil { + break + } + } + + if err := it.Error(); err != nil { + return nil, err + } + + case c.Op == query.OpExists: + prefix, err := orderedcode.Append(nil, c.CompositeKey) + if err != nil { + return nil, err + } + + it, err := dbm.IteratePrefix(idx.store, prefix) + if err != nil { + return nil, fmt.Errorf("failed to create prefix iterator: %w", err) + } + defer it.Close() + + for ; it.Valid(); it.Next() { + tmpHeights[string(it.Value())] = it.Value() + + select { + case <-ctx.Done(): + break + + default: + } + } + + if err := it.Error(); err != nil { + return nil, err + } + + case c.Op == query.OpContains: + prefix, err := orderedcode.Append(nil, c.CompositeKey) + if err != nil { + return nil, err + } + + it, err := dbm.IteratePrefix(idx.store, prefix) + if err != nil { + return nil, fmt.Errorf("failed to create prefix iterator: %w", err) + } + defer it.Close() + + for ; it.Valid(); it.Next() { + eventValue, err := parseValueFromEventKey(it.Key()) + if err != nil { + continue + } + + if strings.Contains(eventValue, c.Operand.(string)) { + tmpHeights[string(it.Value())] = it.Value() + } + + select { + case <-ctx.Done(): + break + + default: + } + } + if err := it.Error(); err != nil { + return nil, err + } + + default: + return nil, errors.New("other operators should be handled already") + } + + if len(tmpHeights) == 0 || firstRun { + // Either: + // + // 1. Regardless if a previous match was attempted, which may have had + // results, but no match was found for the current condition, then we + // return no matches (assuming AND operand). + // + // 2. A previous match was not attempted, so we return all results. + return tmpHeights, nil + } + + // Remove/reduce matches in filteredHeights that were not found in this + // match (tmpHeights). + for k := range filteredHeights { + if tmpHeights[k] == nil { + delete(filteredHeights, k) + + select { + case <-ctx.Done(): + break + + default: + } + } + } + + return filteredHeights, nil +} + +func (idx *BlockerIndexer) indexEvents(batch dbm.Batch, events []abci.Event, typ string, height int64) error { + heightBz := int64ToBytes(height) + + for _, event := range events { + // only index events with a non-empty type + if len(event.Type) == 0 { + continue + } + + for _, attr := range event.Attributes { + if len(attr.Key) == 0 { + continue + } + + // index iff the event specified index:true and it's not a reserved event + compositeKey := fmt.Sprintf("%s.%s", event.Type, string(attr.Key)) + if compositeKey == types.TxHashKey || compositeKey == types.TxHeightKey { + return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeKey) + } + if attr.GetIndex() { + key, err := eventKey(compositeKey, typ, string(attr.Value), height) + if err != nil { + return fmt.Errorf("failed to create block index key: %w", err) + } + + if err := batch.Set(key, heightBz); err != nil { + return err + } + } + } + } + + return nil +} diff --git a/state/indexer/block/kv/kv_test.go b/state/indexer/block/kv/kv_test.go new file mode 100644 index 000000000..eacf51c42 --- /dev/null +++ b/state/indexer/block/kv/kv_test.go @@ -0,0 +1,141 @@ +package kv_test + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/pubsub/query" + blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv" + "github.com/tendermint/tendermint/types" + db "github.com/tendermint/tm-db" +) + +func TestBlockIndexer(t *testing.T) { + store := db.NewPrefixDB(db.NewMemDB(), []byte("block_events")) + indexer := blockidxkv.New(store) + + require.NoError(t, indexer.Index(types.EventDataNewBlockHeader{ + Header: types.Header{Height: 1}, + ResultBeginBlock: abci.ResponseBeginBlock{ + Events: []abci.Event{ + { + Type: "begin_event", + Attributes: []abci.EventAttribute{ + { + Key: []byte("proposer"), + Value: []byte("FCAA001"), + Index: true, + }, + }, + }, + }, + }, + ResultEndBlock: abci.ResponseEndBlock{ + Events: []abci.Event{ + { + Type: "end_event", + Attributes: []abci.EventAttribute{ + { + Key: []byte("foo"), + Value: []byte("100"), + Index: true, + }, + }, + }, + }, + }, + })) + + for i := 2; i < 12; i++ { + var index bool + if i%2 == 0 { + index = true + } + + require.NoError(t, indexer.Index(types.EventDataNewBlockHeader{ + Header: types.Header{Height: int64(i)}, + ResultBeginBlock: abci.ResponseBeginBlock{ + Events: []abci.Event{ + { + Type: "begin_event", + Attributes: []abci.EventAttribute{ + { + Key: []byte("proposer"), + Value: []byte("FCAA001"), + Index: true, + }, + }, + }, + }, + }, + ResultEndBlock: abci.ResponseEndBlock{ + Events: []abci.Event{ + { + Type: "end_event", + Attributes: []abci.EventAttribute{ + { + Key: []byte("foo"), + Value: []byte(fmt.Sprintf("%d", i)), + Index: index, + }, + }, + }, + }, + }, + })) + } + + testCases := map[string]struct { + q *query.Query + results []int64 + }{ + "block.height = 100": { + q: query.MustParse("block.height = 100"), + results: []int64{}, + }, + "block.height = 5": { + q: query.MustParse("block.height = 5"), + results: []int64{5}, + }, + "begin_event.key1 = 'value1'": { + q: query.MustParse("begin_event.key1 = 'value1'"), + results: []int64{}, + }, + "begin_event.proposer = 'FCAA001'": { + q: query.MustParse("begin_event.proposer = 'FCAA001'"), + results: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, + }, + "end_event.foo <= 5": { + q: query.MustParse("end_event.foo <= 5"), + results: []int64{2, 4}, + }, + "end_event.foo >= 100": { + q: query.MustParse("end_event.foo >= 100"), + results: []int64{1}, + }, + "block.height > 2 AND end_event.foo <= 8": { + q: query.MustParse("block.height > 2 AND end_event.foo <= 8"), + results: []int64{4, 6, 8}, + }, + "begin_event.proposer CONTAINS 'FFFFFFF'": { + q: query.MustParse("begin_event.proposer CONTAINS 'FFFFFFF'"), + results: []int64{}, + }, + "begin_event.proposer CONTAINS 'FCAA001'": { + q: query.MustParse("begin_event.proposer CONTAINS 'FCAA001'"), + results: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, + }, + } + + for name, tc := range testCases { + tc := tc + t.Run(name, func(t *testing.T) { + results, err := indexer.Search(context.Background(), tc.q) + require.NoError(t, err) + require.Equal(t, tc.results, results) + }) + } +} diff --git a/state/indexer/block/kv/util.go b/state/indexer/block/kv/util.go new file mode 100644 index 000000000..c0b88018e --- /dev/null +++ b/state/indexer/block/kv/util.go @@ -0,0 +1,96 @@ +package kv + +import ( + "encoding/binary" + "fmt" + "strconv" + + "github.com/google/orderedcode" + "github.com/tendermint/tendermint/libs/pubsub/query" + "github.com/tendermint/tendermint/types" +) + +func intInSlice(a int, list []int) bool { + for _, b := range list { + if b == a { + return true + } + } + + return false +} + +func int64FromBytes(bz []byte) int64 { + v, _ := binary.Varint(bz) + return v +} + +func int64ToBytes(i int64) []byte { + buf := make([]byte, binary.MaxVarintLen64) + n := binary.PutVarint(buf, i) + return buf[:n] +} + +func heightKey(height int64) ([]byte, error) { + return orderedcode.Append( + nil, + types.BlockHeightKey, + height, + ) +} + +func eventKey(compositeKey, typ, eventValue string, height int64) ([]byte, error) { + return orderedcode.Append( + nil, + compositeKey, + eventValue, + height, + typ, + ) +} + +func parseValueFromPrimaryKey(key []byte) (string, error) { + var ( + compositeKey string + height int64 + ) + + remaining, err := orderedcode.Parse(string(key), &compositeKey, &height) + if err != nil { + return "", fmt.Errorf("failed to parse event key: %w", err) + } + + if len(remaining) != 0 { + return "", fmt.Errorf("unexpected remainder in key: %s", remaining) + } + + return strconv.FormatInt(height, 10), nil +} + +func parseValueFromEventKey(key []byte) (string, error) { + var ( + compositeKey, typ, eventValue string + height int64 + ) + + remaining, err := orderedcode.Parse(string(key), &compositeKey, &eventValue, &height, &typ) + if err != nil { + return "", fmt.Errorf("failed to parse event key: %w", err) + } + + if len(remaining) != 0 { + return "", fmt.Errorf("unexpected remainder in key: %s", remaining) + } + + return eventValue, nil +} + +func lookForHeight(conditions []query.Condition) (int64, bool) { + for _, c := range conditions { + if c.CompositeKey == types.BlockHeightKey && c.Op == query.OpEqual { + return c.Operand.(int64), true + } + } + + return 0, false +} diff --git a/state/indexer/block/null/null.go b/state/indexer/block/null/null.go new file mode 100644 index 000000000..7cc536ed6 --- /dev/null +++ b/state/indexer/block/null/null.go @@ -0,0 +1,26 @@ +package null + +import ( + "context" + + "github.com/tendermint/tendermint/libs/pubsub/query" + "github.com/tendermint/tendermint/state/indexer" + "github.com/tendermint/tendermint/types" +) + +var _ indexer.BlockIndexer = (*BlockerIndexer)(nil) + +// TxIndex implements a no-op block indexer. +type BlockerIndexer struct{} + +func (idx *BlockerIndexer) Has(height int64) (bool, error) { + return false, nil +} + +func (idx *BlockerIndexer) Index(types.EventDataNewBlockHeader) error { + return nil +} + +func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64, error) { + return nil, nil +} diff --git a/state/indexer/query_range.go b/state/indexer/query_range.go new file mode 100644 index 000000000..b4edf53c5 --- /dev/null +++ b/state/indexer/query_range.go @@ -0,0 +1,123 @@ +package indexer + +import ( + "time" + + "github.com/tendermint/tendermint/libs/pubsub/query" +) + +// QueryRanges defines a mapping between a composite event key and a QueryRange. +// +// e.g.account.number => queryRange{lowerBound: 1, upperBound: 5} +type QueryRanges map[string]QueryRange + +// QueryRange defines a range within a query condition. +type QueryRange struct { + LowerBound interface{} // int || time.Time + UpperBound interface{} // int || time.Time + Key string + IncludeLowerBound bool + IncludeUpperBound bool +} + +// AnyBound returns either the lower bound if non-nil, otherwise the upper bound. +func (qr QueryRange) AnyBound() interface{} { + if qr.LowerBound != nil { + return qr.LowerBound + } + + return qr.UpperBound +} + +// LowerBoundValue returns the value for the lower bound. If the lower bound is +// nil, nil will be returned. +func (qr QueryRange) LowerBoundValue() interface{} { + if qr.LowerBound == nil { + return nil + } + + if qr.IncludeLowerBound { + return qr.LowerBound + } + + switch t := qr.LowerBound.(type) { + case int64: + return t + 1 + + case time.Time: + return t.Unix() + 1 + + default: + panic("not implemented") + } +} + +// UpperBoundValue returns the value for the upper bound. If the upper bound is +// nil, nil will be returned. +func (qr QueryRange) UpperBoundValue() interface{} { + if qr.UpperBound == nil { + return nil + } + + if qr.IncludeUpperBound { + return qr.UpperBound + } + + switch t := qr.UpperBound.(type) { + case int64: + return t - 1 + + case time.Time: + return t.Unix() - 1 + + default: + panic("not implemented") + } +} + +// LookForRanges returns a mapping of QueryRanges and the matching indexes in +// the provided query conditions. +func LookForRanges(conditions []query.Condition) (ranges QueryRanges, indexes []int) { + ranges = make(QueryRanges) + for i, c := range conditions { + if IsRangeOperation(c.Op) { + r, ok := ranges[c.CompositeKey] + if !ok { + r = QueryRange{Key: c.CompositeKey} + } + + switch c.Op { + case query.OpGreater: + r.LowerBound = c.Operand + + case query.OpGreaterEqual: + r.IncludeLowerBound = true + r.LowerBound = c.Operand + + case query.OpLess: + r.UpperBound = c.Operand + + case query.OpLessEqual: + r.IncludeUpperBound = true + r.UpperBound = c.Operand + } + + ranges[c.CompositeKey] = r + indexes = append(indexes, i) + } + } + + return ranges, indexes +} + +// IsRangeOperation returns a boolean signifying if a query Operator is a range +// operation or not. +func IsRangeOperation(op query.Operator) bool { + switch op { + case query.OpGreater, query.OpGreaterEqual, query.OpLess, query.OpLessEqual: + return true + + default: + return false + } +} diff --git a/state/txindex/indexer.go b/state/txindex/indexer.go index e141f82d8..388d47c18 100644 --- a/state/txindex/indexer.go +++ b/state/txindex/indexer.go @@ -8,9 +8,10 @@ import ( "github.com/tendermint/tendermint/libs/pubsub/query" ) +// XXX/TODO: These types should be moved to the indexer package. + // TxIndexer interface defines methods to index and search transactions. type TxIndexer interface { - // AddBatch analyzes, indexes and stores a batch of transactions. AddBatch(b *Batch) error @@ -25,9 +26,6 @@ type TxIndexer interface { Search(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) } -//---------------------------------------------------- -// Txs are written as a batch - // Batch groups together multiple Index operations to be performed at the same time. // NOTE: Batch is NOT thread-safe and must not be modified after starting its execution. type Batch struct { @@ -52,8 +50,5 @@ func (b *Batch) Size() int { return len(b.Ops) } -//---------------------------------------------------- -// Errors - // ErrorEmptyHash indicates empty hash var ErrorEmptyHash = errors.New("transaction hash cannot be empty") diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go index b311bf0de..d7192c8fa 100644 --- a/state/txindex/indexer_service.go +++ b/state/txindex/indexer_service.go @@ -4,26 +4,34 @@ import ( "context" "github.com/tendermint/tendermint/libs/service" - + "github.com/tendermint/tendermint/state/indexer" "github.com/tendermint/tendermint/types" ) +// XXX/TODO: These types should be moved to the indexer package. + const ( subscriber = "IndexerService" ) -// IndexerService connects event bus and transaction indexer together in order -// to index transactions coming from event bus. +// IndexerService connects event bus, transaction and block indexers together in +// order to index transactions and blocks coming from the event bus. type IndexerService struct { service.BaseService - idr TxIndexer - eventBus *types.EventBus + txIdxr TxIndexer + blockIdxr indexer.BlockIndexer + eventBus *types.EventBus } // NewIndexerService returns a new service instance. -func NewIndexerService(idr TxIndexer, eventBus *types.EventBus) *IndexerService { - is := &IndexerService{idr: idr, eventBus: eventBus} +func NewIndexerService( + txIdxr TxIndexer, + blockIdxr indexer.BlockIndexer, + eventBus *types.EventBus, +) *IndexerService { + + is := &IndexerService{txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus} is.BaseService = *service.NewBaseService(nil, "IndexerService", is) return is } @@ -34,7 +42,6 @@ func (is *IndexerService) OnStart() error { // Use SubscribeUnbuffered here to ensure both subscriptions does not get // canceled due to not pulling messages fast enough. Cause this might // sometimes happen when there are no other subscribers. - blockHeadersSub, err := is.eventBus.SubscribeUnbuffered( context.Background(), subscriber, @@ -54,20 +61,31 @@ func (is *IndexerService) OnStart() error { eventDataHeader := msg.Data().(types.EventDataNewBlockHeader) height := eventDataHeader.Header.Height batch := NewBatch(eventDataHeader.NumTxs) + for i := int64(0); i < eventDataHeader.NumTxs; i++ { msg2 := <-txsSub.Out() txResult := msg2.Data().(types.EventDataTx).TxResult + if err = batch.Add(&txResult); err != nil { - is.Logger.Error("Can't add tx to batch", + is.Logger.Error( + "failed to add tx to batch", "height", height, "index", txResult.Index, - "err", err) + "err", err, + ) } } - if err = is.idr.AddBatch(batch); err != nil { - is.Logger.Error("Failed to index block", "height", height, "err", err) + + if err := is.blockIdxr.Index(eventDataHeader); err != nil { + is.Logger.Error("failed to index block", "height", height, "err", err) + } else { + is.Logger.Error("indexed block", "height", height, "err", err) + } + + if err = is.txIdxr.AddBatch(batch); err != nil { + is.Logger.Error("failed to index block txs", "height", height, "err", err) } else { - is.Logger.Debug("Indexed block", "height", height) + is.Logger.Debug("indexed block txs", "height", height, "num_txs", eventDataHeader.NumTxs) } } }() diff --git a/state/txindex/indexer_service_test.go b/state/txindex/indexer_service_test.go index 5394e3a7e..5cf512ec4 100644 --- a/state/txindex/indexer_service_test.go +++ b/state/txindex/indexer_service_test.go @@ -4,13 +4,12 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - db "github.com/tendermint/tm-db" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/log" + blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv" "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/state/txindex/kv" "github.com/tendermint/tendermint/types" @@ -31,8 +30,9 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) { // tx indexer store := db.NewMemDB() txIndexer := kv.NewTxIndex(store) + blockIndexer := blockidxkv.New(db.NewPrefixDB(store, []byte("block_events"))) - service := txindex.NewIndexerService(txIndexer, eventBus) + service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus) service.SetLogger(log.TestingLogger()) err = service.Start() require.NoError(t, err) @@ -67,11 +67,15 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) { time.Sleep(100 * time.Millisecond) - // check the result res, err := txIndexer.Get(types.Tx("foo").Hash()) - assert.NoError(t, err) - assert.Equal(t, txResult1, res) + require.NoError(t, err) + require.Equal(t, txResult1, res) + + ok, err := blockIndexer.Has(1) + require.NoError(t, err) + require.True(t, ok) + res, err = txIndexer.Get(types.Tx("bar").Hash()) - assert.NoError(t, err) - assert.Equal(t, txResult2, res) + require.NoError(t, err) + require.Equal(t, txResult2, res) } diff --git a/state/txindex/kv/kv.go b/state/txindex/kv/kv.go index bef0a2f87..1e27017f4 100644 --- a/state/txindex/kv/kv.go +++ b/state/txindex/kv/kv.go @@ -6,7 +6,6 @@ import ( "fmt" "strconv" "strings" - "time" "github.com/gogo/protobuf/proto" "github.com/google/orderedcode" @@ -14,6 +13,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/pubsub/query" + "github.com/tendermint/tendermint/state/indexer" "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" ) @@ -147,7 +147,7 @@ func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store dbm.Ba compositeTag := fmt.Sprintf("%s.%s", event.Type, string(attr.Key)) // ensure event does not conflict with a reserved prefix key if compositeTag == types.TxHashKey || compositeTag == types.TxHeightKey { - return fmt.Errorf("event type and attribute key \"%s\" is reserved. Please use a different key", compositeTag) + return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeTag) } if attr.GetIndex() { err := store.Set(keyFromEvent(compositeTag, attr.Value, result), hash) @@ -173,11 +173,10 @@ func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store dbm.Ba // Search will exit early and return any result fetched so far, // when a message is received on the context chan. func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) { - // Potentially exit early. select { case <-ctx.Done(): - results := make([]*abci.TxResult, 0) - return results, nil + return make([]*abci.TxResult, 0), nil + default: } @@ -212,13 +211,13 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul // extract ranges // if both upper and lower bounds exist, it's better to get them in order not // no iterate over kvs that are not within range. - ranges, rangeIndexes := lookForRanges(conditions) + ranges, rangeIndexes := indexer.LookForRanges(conditions) if len(ranges) > 0 { skipIndexes = append(skipIndexes, rangeIndexes...) - for _, r := range ranges { + for _, qr := range ranges { if !hashesInitialized { - filteredHashes = txi.matchRange(ctx, r, prefixFromCompositeKey(r.key), filteredHashes, true) + filteredHashes = txi.matchRange(ctx, qr, prefixFromCompositeKey(qr.Key), filteredHashes, true) hashesInitialized = true // Ignore any remaining conditions if the first condition resulted @@ -227,7 +226,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul break } } else { - filteredHashes = txi.matchRange(ctx, r, prefixFromCompositeKey(r.key), filteredHashes, false) + filteredHashes = txi.matchRange(ctx, qr, prefixFromCompositeKey(qr.Key), filteredHashes, false) } } } @@ -294,100 +293,6 @@ func lookForHeight(conditions []query.Condition) (height int64) { return 0 } -// special map to hold range conditions -// Example: account.number => queryRange{lowerBound: 1, upperBound: 5} -type queryRanges map[string]queryRange - -type queryRange struct { - lowerBound interface{} // int || time.Time - upperBound interface{} // int || time.Time - key string - includeLowerBound bool - includeUpperBound bool -} - -func (r queryRange) lowerBoundValue() interface{} { - if r.lowerBound == nil { - return nil - } - - if r.includeLowerBound { - return r.lowerBound - } - - switch t := r.lowerBound.(type) { - case int64: - return t + 1 - case time.Time: - return t.Unix() + 1 - default: - panic("not implemented") - } -} - -func (r queryRange) AnyBound() interface{} { - if r.lowerBound != nil { - return r.lowerBound - } - - return r.upperBound -} - -func (r queryRange) upperBoundValue() interface{} { - if r.upperBound == nil { - return nil - } - - if r.includeUpperBound { - return r.upperBound - } - - switch t := r.upperBound.(type) { - case int64: - return t - 1 - case time.Time: - return t.Unix() - 1 - default: - panic("not implemented") - } -} - -func lookForRanges(conditions []query.Condition) (ranges queryRanges, indexes []int) { - ranges = make(queryRanges) - for i, c := range conditions { - if isRangeOperation(c.Op) { - r, ok := ranges[c.CompositeKey] - if !ok { - r = queryRange{key: c.CompositeKey} - } - switch c.Op { - case query.OpGreater: - r.lowerBound = c.Operand - case query.OpGreaterEqual: - r.includeLowerBound = true - r.lowerBound = c.Operand - case query.OpLess: - r.upperBound = c.Operand - case query.OpLessEqual: - r.includeUpperBound = true - r.upperBound = c.Operand - } - ranges[c.CompositeKey] = r - indexes = append(indexes, i) - } - } - return ranges, indexes -} - -func isRangeOperation(op query.Operator) bool { - switch op { - case query.OpGreater, query.OpGreaterEqual, query.OpLess, query.OpLessEqual: - return true - default: - return false - } -} - // match returns all matching txs by hash that meet a given condition and start // key. An already filtered result (filteredHashes) is provided such that any // non-intersecting matches are removed. @@ -522,7 +427,7 @@ func (txi *TxIndex) match( // NOTE: filteredHashes may be empty if no previous condition has matched. func (txi *TxIndex) matchRange( ctx context.Context, - r queryRange, + qr indexer.QueryRange, startKey []byte, filteredHashes map[string][]byte, firstRun bool, @@ -534,8 +439,8 @@ func (txi *TxIndex) matchRange( } tmpHashes := make(map[string][]byte) - lowerBound := r.lowerBoundValue() - upperBound := r.upperBoundValue() + lowerBound := qr.LowerBoundValue() + upperBound := qr.UpperBoundValue() it, err := dbm.IteratePrefix(txi.store, startKey) if err != nil { @@ -549,7 +454,7 @@ LOOP: if err != nil { continue } - if _, ok := r.AnyBound().(int64); ok { + if _, ok := qr.AnyBound().(int64); ok { v, err := strconv.ParseInt(value, 10, 64) if err != nil { continue LOOP diff --git a/test/e2e/app/app.go b/test/e2e/app/app.go index 13002a708..09a11a49a 100644 --- a/test/e2e/app/app.go +++ b/test/e2e/app/app.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "path/filepath" + "strconv" "github.com/tendermint/tendermint/abci/example/code" abci "github.com/tendermint/tendermint/abci/types" @@ -98,12 +99,29 @@ func (app *Application) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDelive // EndBlock implements ABCI. func (app *Application) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock { - var err error - resp := abci.ResponseEndBlock{} - if resp.ValidatorUpdates, err = app.validatorUpdates(uint64(req.Height)); err != nil { + valUpdates, err := app.validatorUpdates(uint64(req.Height)) + if err != nil { panic(err) } - return resp + + return abci.ResponseEndBlock{ + ValidatorUpdates: valUpdates, + Events: []abci.Event{ + { + Type: "val_updates", + Attributes: []abci.EventAttribute{ + { + Key: []byte("size"), + Value: []byte(strconv.Itoa(valUpdates.Len())), + }, + { + Key: []byte("height"), + Value: []byte(strconv.Itoa(int(req.Height))), + }, + }, + }, + }, + } } // Commit implements ABCI. diff --git a/types/events.go b/types/events.go index 38b356983..b71661a05 100644 --- a/types/events.go +++ b/types/events.go @@ -136,6 +136,10 @@ const ( // TxHeightKey is a reserved key, used to specify transaction block's height. // see EventBus#PublishEventTx TxHeightKey = "tx.height" + + // BlockHeightKey is a reserved key used for indexing BeginBlock and Endblock + // events. + BlockHeightKey = "block.height" ) var (