From c837a57ddd5f1c05dcc822f3555e9a6202f05d8e Mon Sep 17 00:00:00 2001 From: Gregory Terzian <2792687+gterzian@users.noreply.github.com> Date: Fri, 28 Feb 2020 18:38:28 +0800 Subject: [PATCH] rpc: stop txSearch result processing if context is done (#4418) --- rpc/client/httpclient.go | 10 ++++ rpc/client/rpc_test.go | 22 ++++++++ rpc/core/tx.go | 2 +- state/txindex/indexer.go | 3 +- state/txindex/kv/kv.go | 66 ++++++++++++++++++++++-- state/txindex/kv/kv_bench_test.go | 5 +- state/txindex/kv/kv_test.go | 84 +++++++++++++++++++++++++++++-- state/txindex/null/null.go | 3 +- 8 files changed, 182 insertions(+), 13 deletions(-) diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index c844113c3..98875c91e 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -96,6 +96,16 @@ func NewHTTP(remote, wsEndpoint string) (*HTTP, error) { return NewHTTPWithClient(remote, wsEndpoint, httpClient) } +// Create timeout enabled http client +func NewHTTPWithTimeout(remote, wsEndpoint string, timeout uint) (*HTTP, error) { + httpClient, err := rpcclient.DefaultHTTPClient(remote) + if err != nil { + return nil, err + } + httpClient.Timeout = time.Duration(timeout) * time.Second + return NewHTTPWithClient(remote, wsEndpoint, httpClient) +} + // NewHTTPWithClient allows for setting a custom http client (See NewHTTP). // An error is returned on invalid remote. The function panics when remote is nil. func NewHTTPWithClient(remote, wsEndpoint string, client *http.Client) (*HTTP, error) { diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index 90612d5e0..9196bead0 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -39,6 +39,16 @@ func getHTTPClient() *client.HTTP { return c } +func getHTTPClientWithTimeout(timeout uint) *client.HTTP { + rpcAddr := rpctest.GetConfig().RPC.ListenAddress + c, err := client.NewHTTPWithTimeout(rpcAddr, "/websocket", timeout) + if err != nil { + panic(err) + } + c.SetLogger(log.TestingLogger()) + return c +} + func getLocalClient() *client.Local { return client.NewLocal(node) } @@ -414,6 +424,18 @@ func TestTx(t *testing.T) { } } +func TestTxSearchWithTimeout(t *testing.T) { + // Get a client with a time-out of 10 secs. + timeoutClient := getHTTPClientWithTimeout(10) + + // query using a compositeKey (see kvstore application) + result, err := timeoutClient.TxSearch("app.creator='Cosmoshi Netowoko'", false, 1, 30, "asc") + require.Nil(t, err) + if len(result.Txs) == 0 { + t.Fatal("expected a lot of transactions") + } +} + func TestTxSearch(t *testing.T) { c := getHTTPClient() diff --git a/rpc/core/tx.go b/rpc/core/tx.go index f894424b7..e7e2582f6 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -67,7 +67,7 @@ func TxSearch(ctx *rpctypes.Context, query string, prove bool, page, perPage int return nil, err } - results, err := txIndexer.Search(q) + results, err := txIndexer.Search(ctx.Context(), q) if err != nil { return nil, err } diff --git a/state/txindex/indexer.go b/state/txindex/indexer.go index c66f4322c..5f7ee7544 100644 --- a/state/txindex/indexer.go +++ b/state/txindex/indexer.go @@ -1,6 +1,7 @@ package txindex import ( + "context" "errors" "github.com/tendermint/tendermint/libs/pubsub/query" @@ -21,7 +22,7 @@ type TxIndexer interface { Get(hash []byte) (*types.TxResult, error) // Search allows you to query for transactions. - Search(q *query.Query) ([]*types.TxResult, error) + Search(ctx context.Context, q *query.Query) ([]*types.TxResult, error) } //---------------------------------------------------- diff --git a/state/txindex/kv/kv.go b/state/txindex/kv/kv.go index 5fcd5ab73..c5ba99ca3 100644 --- a/state/txindex/kv/kv.go +++ b/state/txindex/kv/kv.go @@ -2,6 +2,7 @@ package kv import ( "bytes" + "context" "encoding/hex" "fmt" "strconv" @@ -167,7 +168,18 @@ func (txi *TxIndex) indexEvents(result *types.TxResult, hash []byte, store dbm.S // better for the client to provide both lower and upper bounds, so we are not // performing a full scan. Results from querying indexes are then intersected // and returned to the caller, in no particular order. -func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { +// +// Search will exit early and return any result fetched so far, +// when a message is received on the context chan. +func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*types.TxResult, error) { + // Potentially exit early. + select { + case <-ctx.Done(): + results := make([]*types.TxResult, 0) + return results, nil + default: + } + var hashesInitialized bool filteredHashes := make(map[string][]byte) @@ -205,7 +217,7 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { for _, r := range ranges { if !hashesInitialized { - filteredHashes = txi.matchRange(r, startKey(r.key), filteredHashes, true) + filteredHashes = txi.matchRange(ctx, r, startKey(r.key), filteredHashes, true) hashesInitialized = true // Ignore any remaining conditions if the first condition resulted @@ -214,7 +226,7 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { break } } else { - filteredHashes = txi.matchRange(r, startKey(r.key), filteredHashes, false) + filteredHashes = txi.matchRange(ctx, r, startKey(r.key), filteredHashes, false) } } } @@ -229,7 +241,7 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { } if !hashesInitialized { - filteredHashes = txi.match(c, startKeyForCondition(c, height), filteredHashes, true) + filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, true) hashesInitialized = true // Ignore any remaining conditions if the first condition resulted @@ -238,7 +250,7 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { break } } else { - filteredHashes = txi.match(c, startKeyForCondition(c, height), filteredHashes, false) + filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, false) } } @@ -249,6 +261,13 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { return nil, errors.Wrapf(err, "failed to get Tx{%X}", h) } results = append(results, res) + + // Potentially exit early. + select { + case <-ctx.Done(): + break + default: + } } return results, nil @@ -374,6 +393,7 @@ func isRangeOperation(op query.Operator) bool { // // NOTE: filteredHashes may be empty if no previous condition has matched. func (txi *TxIndex) match( + ctx context.Context, c query.Condition, startKeyBz []byte, filteredHashes map[string][]byte, @@ -397,6 +417,13 @@ func (txi *TxIndex) match( for ; it.Valid(); it.Next() { tmpHashes[string(it.Value())] = it.Value() + + // Potentially exit early. + select { + case <-ctx.Done(): + break + default: + } } case c.Op == query.OpContains: @@ -417,6 +444,13 @@ func (txi *TxIndex) match( if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) { tmpHashes[string(it.Value())] = it.Value() } + + // Potentially exit early. + select { + case <-ctx.Done(): + break + default: + } } default: panic("other operators should be handled already") @@ -438,6 +472,13 @@ func (txi *TxIndex) match( for k := range filteredHashes { if tmpHashes[k] == nil { delete(filteredHashes, k) + + // Potentially exit early. + select { + case <-ctx.Done(): + break + default: + } } } @@ -450,6 +491,7 @@ func (txi *TxIndex) match( // // NOTE: filteredHashes may be empty if no previous condition has matched. func (txi *TxIndex) matchRange( + ctx context.Context, r queryRange, startKey []byte, filteredHashes map[string][]byte, @@ -503,6 +545,13 @@ LOOP: // break // } } + + // Potentially exit early. + select { + case <-ctx.Done(): + break + default: + } } if len(tmpHashes) == 0 || firstRun { @@ -521,6 +570,13 @@ LOOP: for k := range filteredHashes { if tmpHashes[k] == nil { delete(filteredHashes, k) + + // Potentially exit early. + select { + case <-ctx.Done(): + break + default: + } } } diff --git a/state/txindex/kv/kv_bench_test.go b/state/txindex/kv/kv_bench_test.go index 3e3bdced3..34d770040 100644 --- a/state/txindex/kv/kv_bench_test.go +++ b/state/txindex/kv/kv_bench_test.go @@ -1,6 +1,7 @@ package kv import ( + "context" "crypto/rand" "fmt" "io/ioutil" @@ -64,8 +65,10 @@ func BenchmarkTxSearch(b *testing.B) { b.ResetTimer() + ctx := context.Background() + for i := 0; i < b.N; i++ { - if _, err := indexer.Search(txQuery); err != nil { + if _, err := indexer.Search(ctx, txQuery); err != nil { b.Errorf("failed to query for txs: %s", err) } } diff --git a/state/txindex/kv/kv_test.go b/state/txindex/kv/kv_test.go index efb767f21..8756bc344 100644 --- a/state/txindex/kv/kv_test.go +++ b/state/txindex/kv/kv_test.go @@ -1,6 +1,7 @@ package kv import ( + "context" "fmt" "io/ioutil" "os" @@ -118,10 +119,12 @@ func TestTxSearch(t *testing.T) { {"account.number CONTAINS 'Iv'", 0}, } + ctx := context.Background() + for _, tc := range testCases { tc := tc t.Run(tc.q, func(t *testing.T) { - results, err := indexer.Search(query.MustParse(tc.q)) + results, err := indexer.Search(ctx, query.MustParse(tc.q)) assert.NoError(t, err) assert.Len(t, results, tc.resultsLength) @@ -132,6 +135,73 @@ func TestTxSearch(t *testing.T) { } } +func TestTxSearchWithCancelation(t *testing.T) { + allowedKeys := []string{"account.number", "account.owner", "account.date"} + indexer := NewTxIndex(db.NewMemDB(), IndexEvents(allowedKeys)) + + txResult := txResultWithEvents([]abci.Event{ + {Type: "account", Attributes: []kv.Pair{{Key: []byte("number"), Value: []byte("1")}}}, + {Type: "account", Attributes: []kv.Pair{{Key: []byte("owner"), Value: []byte("Ivan")}}}, + {Type: "", Attributes: []kv.Pair{{Key: []byte("not_allowed"), Value: []byte("Vlad")}}}, + }) + hash := txResult.Tx.Hash() + + err := indexer.Index(txResult) + require.NoError(t, err) + + testCases := []struct { + q string + resultsLength int + }{ + // search by hash + {fmt.Sprintf("tx.hash = '%X'", hash), 0}, + // search by exact match (one key) + {"account.number = 1", 0}, + // search by exact match (two keys) + {"account.number = 1 AND account.owner = 'Ivan'", 0}, + // search by exact match (two keys) + {"account.number = 1 AND account.owner = 'Vlad'", 0}, + {"account.owner = 'Vlad' AND account.number = 1", 0}, + {"account.number >= 1 AND account.owner = 'Vlad'", 0}, + {"account.owner = 'Vlad' AND account.number >= 1", 0}, + {"account.number <= 0", 0}, + {"account.number <= 0 AND account.owner = 'Ivan'", 0}, + // search using a prefix of the stored value + {"account.owner = 'Iv'", 0}, + // search by range + {"account.number >= 1 AND account.number <= 5", 0}, + // search by range (lower bound) + {"account.number >= 1", 0}, + // search by range (upper bound) + {"account.number <= 5", 0}, + // search using not allowed key + {"not_allowed = 'boom'", 0}, + // search for not existing tx result + {"account.number >= 2 AND account.number <= 5", 0}, + // search using not existing key + {"account.date >= TIME 2013-05-03T14:45:00Z", 0}, + // search using CONTAINS + {"account.owner CONTAINS 'an'", 0}, + // search for non existing value using CONTAINS + {"account.owner CONTAINS 'Vlad'", 0}, + // search using the wrong key (of numeric type) using CONTAINS + {"account.number CONTAINS 'Iv'", 0}, + } + + ctx, cancel := context.WithCancel(context.Background()) + + cancel() + + for _, tc := range testCases { + tc := tc + t.Run(tc.q, func(t *testing.T) { + results, err := indexer.Search(ctx, query.MustParse(tc.q)) + assert.NoError(t, err) + assert.Len(t, results, tc.resultsLength) + }) + } +} + func TestTxSearchDeprecatedIndexing(t *testing.T) { allowedKeys := []string{"account.number", "sender"} indexer := NewTxIndex(db.NewMemDB(), IndexEvents(allowedKeys)) @@ -192,10 +262,12 @@ func TestTxSearchDeprecatedIndexing(t *testing.T) { {"sender = 'addr1'", []*types.TxResult{txResult2}}, } + ctx := context.Background() + for _, tc := range testCases { tc := tc t.Run(tc.q, func(t *testing.T) { - results, err := indexer.Search(query.MustParse(tc.q)) + results, err := indexer.Search(ctx, query.MustParse(tc.q)) require.NoError(t, err) require.Equal(t, results, tc.results) }) @@ -214,7 +286,9 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) { err := indexer.Index(txResult) require.NoError(t, err) - results, err := indexer.Search(query.MustParse("account.number >= 1")) + ctx := context.Background() + + results, err := indexer.Search(ctx, query.MustParse("account.number >= 1")) assert.NoError(t, err) assert.Len(t, results, 1) @@ -268,7 +342,9 @@ func TestTxSearchMultipleTxs(t *testing.T) { err = indexer.Index(txResult4) require.NoError(t, err) - results, err := indexer.Search(query.MustParse("account.number >= 1")) + ctx := context.Background() + + results, err := indexer.Search(ctx, query.MustParse("account.number >= 1")) assert.NoError(t, err) require.Len(t, results, 3) diff --git a/state/txindex/null/null.go b/state/txindex/null/null.go index ae496a851..1ae7f7942 100644 --- a/state/txindex/null/null.go +++ b/state/txindex/null/null.go @@ -1,6 +1,7 @@ package null import ( + "context" "errors" "github.com/tendermint/tendermint/libs/pubsub/query" @@ -28,6 +29,6 @@ func (txi *TxIndex) Index(result *types.TxResult) error { return nil } -func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { +func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*types.TxResult, error) { return []*types.TxResult{}, nil }