Browse Source

rpc: stop txSearch result processing if context is done (#4418)

pull/4498/head
Gregory Terzian 5 years ago
committed by GitHub
parent
commit
c837a57ddd
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 182 additions and 13 deletions
  1. +10
    -0
      rpc/client/httpclient.go
  2. +22
    -0
      rpc/client/rpc_test.go
  3. +1
    -1
      rpc/core/tx.go
  4. +2
    -1
      state/txindex/indexer.go
  5. +61
    -5
      state/txindex/kv/kv.go
  6. +4
    -1
      state/txindex/kv/kv_bench_test.go
  7. +80
    -4
      state/txindex/kv/kv_test.go
  8. +2
    -1
      state/txindex/null/null.go

+ 10
- 0
rpc/client/httpclient.go View File

@ -96,6 +96,16 @@ func NewHTTP(remote, wsEndpoint string) (*HTTP, error) {
return NewHTTPWithClient(remote, wsEndpoint, httpClient) return NewHTTPWithClient(remote, wsEndpoint, httpClient)
} }
// Create timeout enabled http client
func NewHTTPWithTimeout(remote, wsEndpoint string, timeout uint) (*HTTP, error) {
httpClient, err := rpcclient.DefaultHTTPClient(remote)
if err != nil {
return nil, err
}
httpClient.Timeout = time.Duration(timeout) * time.Second
return NewHTTPWithClient(remote, wsEndpoint, httpClient)
}
// NewHTTPWithClient allows for setting a custom http client (See NewHTTP). // NewHTTPWithClient allows for setting a custom http client (See NewHTTP).
// An error is returned on invalid remote. The function panics when remote is nil. // An error is returned on invalid remote. The function panics when remote is nil.
func NewHTTPWithClient(remote, wsEndpoint string, client *http.Client) (*HTTP, error) { func NewHTTPWithClient(remote, wsEndpoint string, client *http.Client) (*HTTP, error) {


+ 22
- 0
rpc/client/rpc_test.go View File

@ -39,6 +39,16 @@ func getHTTPClient() *client.HTTP {
return c return c
} }
func getHTTPClientWithTimeout(timeout uint) *client.HTTP {
rpcAddr := rpctest.GetConfig().RPC.ListenAddress
c, err := client.NewHTTPWithTimeout(rpcAddr, "/websocket", timeout)
if err != nil {
panic(err)
}
c.SetLogger(log.TestingLogger())
return c
}
func getLocalClient() *client.Local { func getLocalClient() *client.Local {
return client.NewLocal(node) return client.NewLocal(node)
} }
@ -414,6 +424,18 @@ func TestTx(t *testing.T) {
} }
} }
func TestTxSearchWithTimeout(t *testing.T) {
// Get a client with a time-out of 10 secs.
timeoutClient := getHTTPClientWithTimeout(10)
// query using a compositeKey (see kvstore application)
result, err := timeoutClient.TxSearch("app.creator='Cosmoshi Netowoko'", false, 1, 30, "asc")
require.Nil(t, err)
if len(result.Txs) == 0 {
t.Fatal("expected a lot of transactions")
}
}
func TestTxSearch(t *testing.T) { func TestTxSearch(t *testing.T) {
c := getHTTPClient() c := getHTTPClient()


+ 1
- 1
rpc/core/tx.go View File

@ -67,7 +67,7 @@ func TxSearch(ctx *rpctypes.Context, query string, prove bool, page, perPage int
return nil, err return nil, err
} }
results, err := txIndexer.Search(q)
results, err := txIndexer.Search(ctx.Context(), q)
if err != nil { if err != nil {
return nil, err return nil, err
} }


+ 2
- 1
state/txindex/indexer.go View File

@ -1,6 +1,7 @@
package txindex package txindex
import ( import (
"context"
"errors" "errors"
"github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/libs/pubsub/query"
@ -21,7 +22,7 @@ type TxIndexer interface {
Get(hash []byte) (*types.TxResult, error) Get(hash []byte) (*types.TxResult, error)
// Search allows you to query for transactions. // Search allows you to query for transactions.
Search(q *query.Query) ([]*types.TxResult, error)
Search(ctx context.Context, q *query.Query) ([]*types.TxResult, error)
} }
//---------------------------------------------------- //----------------------------------------------------


+ 61
- 5
state/txindex/kv/kv.go View File

@ -2,6 +2,7 @@ package kv
import ( import (
"bytes" "bytes"
"context"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"strconv" "strconv"
@ -167,7 +168,18 @@ func (txi *TxIndex) indexEvents(result *types.TxResult, hash []byte, store dbm.S
// better for the client to provide both lower and upper bounds, so we are not // better for the client to provide both lower and upper bounds, so we are not
// performing a full scan. Results from querying indexes are then intersected // performing a full scan. Results from querying indexes are then intersected
// and returned to the caller, in no particular order. // and returned to the caller, in no particular order.
func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
//
// Search will exit early and return any result fetched so far,
// when a message is received on the context chan.
func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*types.TxResult, error) {
// Potentially exit early.
select {
case <-ctx.Done():
results := make([]*types.TxResult, 0)
return results, nil
default:
}
var hashesInitialized bool var hashesInitialized bool
filteredHashes := make(map[string][]byte) filteredHashes := make(map[string][]byte)
@ -205,7 +217,7 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
for _, r := range ranges { for _, r := range ranges {
if !hashesInitialized { if !hashesInitialized {
filteredHashes = txi.matchRange(r, startKey(r.key), filteredHashes, true)
filteredHashes = txi.matchRange(ctx, r, startKey(r.key), filteredHashes, true)
hashesInitialized = true hashesInitialized = true
// Ignore any remaining conditions if the first condition resulted // Ignore any remaining conditions if the first condition resulted
@ -214,7 +226,7 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
break break
} }
} else { } else {
filteredHashes = txi.matchRange(r, startKey(r.key), filteredHashes, false)
filteredHashes = txi.matchRange(ctx, r, startKey(r.key), filteredHashes, false)
} }
} }
} }
@ -229,7 +241,7 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
} }
if !hashesInitialized { if !hashesInitialized {
filteredHashes = txi.match(c, startKeyForCondition(c, height), filteredHashes, true)
filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, true)
hashesInitialized = true hashesInitialized = true
// Ignore any remaining conditions if the first condition resulted // Ignore any remaining conditions if the first condition resulted
@ -238,7 +250,7 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
break break
} }
} else { } else {
filteredHashes = txi.match(c, startKeyForCondition(c, height), filteredHashes, false)
filteredHashes = txi.match(ctx, c, startKeyForCondition(c, height), filteredHashes, false)
} }
} }
@ -249,6 +261,13 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
return nil, errors.Wrapf(err, "failed to get Tx{%X}", h) return nil, errors.Wrapf(err, "failed to get Tx{%X}", h)
} }
results = append(results, res) results = append(results, res)
// Potentially exit early.
select {
case <-ctx.Done():
break
default:
}
} }
return results, nil return results, nil
@ -374,6 +393,7 @@ func isRangeOperation(op query.Operator) bool {
// //
// NOTE: filteredHashes may be empty if no previous condition has matched. // NOTE: filteredHashes may be empty if no previous condition has matched.
func (txi *TxIndex) match( func (txi *TxIndex) match(
ctx context.Context,
c query.Condition, c query.Condition,
startKeyBz []byte, startKeyBz []byte,
filteredHashes map[string][]byte, filteredHashes map[string][]byte,
@ -397,6 +417,13 @@ func (txi *TxIndex) match(
for ; it.Valid(); it.Next() { for ; it.Valid(); it.Next() {
tmpHashes[string(it.Value())] = it.Value() tmpHashes[string(it.Value())] = it.Value()
// Potentially exit early.
select {
case <-ctx.Done():
break
default:
}
} }
case c.Op == query.OpContains: case c.Op == query.OpContains:
@ -417,6 +444,13 @@ func (txi *TxIndex) match(
if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) { if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) {
tmpHashes[string(it.Value())] = it.Value() tmpHashes[string(it.Value())] = it.Value()
} }
// Potentially exit early.
select {
case <-ctx.Done():
break
default:
}
} }
default: default:
panic("other operators should be handled already") panic("other operators should be handled already")
@ -438,6 +472,13 @@ func (txi *TxIndex) match(
for k := range filteredHashes { for k := range filteredHashes {
if tmpHashes[k] == nil { if tmpHashes[k] == nil {
delete(filteredHashes, k) delete(filteredHashes, k)
// Potentially exit early.
select {
case <-ctx.Done():
break
default:
}
} }
} }
@ -450,6 +491,7 @@ func (txi *TxIndex) match(
// //
// NOTE: filteredHashes may be empty if no previous condition has matched. // NOTE: filteredHashes may be empty if no previous condition has matched.
func (txi *TxIndex) matchRange( func (txi *TxIndex) matchRange(
ctx context.Context,
r queryRange, r queryRange,
startKey []byte, startKey []byte,
filteredHashes map[string][]byte, filteredHashes map[string][]byte,
@ -503,6 +545,13 @@ LOOP:
// break // break
// } // }
} }
// Potentially exit early.
select {
case <-ctx.Done():
break
default:
}
} }
if len(tmpHashes) == 0 || firstRun { if len(tmpHashes) == 0 || firstRun {
@ -521,6 +570,13 @@ LOOP:
for k := range filteredHashes { for k := range filteredHashes {
if tmpHashes[k] == nil { if tmpHashes[k] == nil {
delete(filteredHashes, k) delete(filteredHashes, k)
// Potentially exit early.
select {
case <-ctx.Done():
break
default:
}
} }
} }


+ 4
- 1
state/txindex/kv/kv_bench_test.go View File

@ -1,6 +1,7 @@
package kv package kv
import ( import (
"context"
"crypto/rand" "crypto/rand"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
@ -64,8 +65,10 @@ func BenchmarkTxSearch(b *testing.B) {
b.ResetTimer() b.ResetTimer()
ctx := context.Background()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
if _, err := indexer.Search(txQuery); err != nil {
if _, err := indexer.Search(ctx, txQuery); err != nil {
b.Errorf("failed to query for txs: %s", err) b.Errorf("failed to query for txs: %s", err)
} }
} }


+ 80
- 4
state/txindex/kv/kv_test.go View File

@ -1,6 +1,7 @@
package kv package kv
import ( import (
"context"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os" "os"
@ -118,10 +119,12 @@ func TestTxSearch(t *testing.T) {
{"account.number CONTAINS 'Iv'", 0}, {"account.number CONTAINS 'Iv'", 0},
} }
ctx := context.Background()
for _, tc := range testCases { for _, tc := range testCases {
tc := tc tc := tc
t.Run(tc.q, func(t *testing.T) { t.Run(tc.q, func(t *testing.T) {
results, err := indexer.Search(query.MustParse(tc.q))
results, err := indexer.Search(ctx, query.MustParse(tc.q))
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, results, tc.resultsLength) assert.Len(t, results, tc.resultsLength)
@ -132,6 +135,73 @@ func TestTxSearch(t *testing.T) {
} }
} }
func TestTxSearchWithCancelation(t *testing.T) {
allowedKeys := []string{"account.number", "account.owner", "account.date"}
indexer := NewTxIndex(db.NewMemDB(), IndexEvents(allowedKeys))
txResult := txResultWithEvents([]abci.Event{
{Type: "account", Attributes: []kv.Pair{{Key: []byte("number"), Value: []byte("1")}}},
{Type: "account", Attributes: []kv.Pair{{Key: []byte("owner"), Value: []byte("Ivan")}}},
{Type: "", Attributes: []kv.Pair{{Key: []byte("not_allowed"), Value: []byte("Vlad")}}},
})
hash := txResult.Tx.Hash()
err := indexer.Index(txResult)
require.NoError(t, err)
testCases := []struct {
q string
resultsLength int
}{
// search by hash
{fmt.Sprintf("tx.hash = '%X'", hash), 0},
// search by exact match (one key)
{"account.number = 1", 0},
// search by exact match (two keys)
{"account.number = 1 AND account.owner = 'Ivan'", 0},
// search by exact match (two keys)
{"account.number = 1 AND account.owner = 'Vlad'", 0},
{"account.owner = 'Vlad' AND account.number = 1", 0},
{"account.number >= 1 AND account.owner = 'Vlad'", 0},
{"account.owner = 'Vlad' AND account.number >= 1", 0},
{"account.number <= 0", 0},
{"account.number <= 0 AND account.owner = 'Ivan'", 0},
// search using a prefix of the stored value
{"account.owner = 'Iv'", 0},
// search by range
{"account.number >= 1 AND account.number <= 5", 0},
// search by range (lower bound)
{"account.number >= 1", 0},
// search by range (upper bound)
{"account.number <= 5", 0},
// search using not allowed key
{"not_allowed = 'boom'", 0},
// search for not existing tx result
{"account.number >= 2 AND account.number <= 5", 0},
// search using not existing key
{"account.date >= TIME 2013-05-03T14:45:00Z", 0},
// search using CONTAINS
{"account.owner CONTAINS 'an'", 0},
// search for non existing value using CONTAINS
{"account.owner CONTAINS 'Vlad'", 0},
// search using the wrong key (of numeric type) using CONTAINS
{"account.number CONTAINS 'Iv'", 0},
}
ctx, cancel := context.WithCancel(context.Background())
cancel()
for _, tc := range testCases {
tc := tc
t.Run(tc.q, func(t *testing.T) {
results, err := indexer.Search(ctx, query.MustParse(tc.q))
assert.NoError(t, err)
assert.Len(t, results, tc.resultsLength)
})
}
}
func TestTxSearchDeprecatedIndexing(t *testing.T) { func TestTxSearchDeprecatedIndexing(t *testing.T) {
allowedKeys := []string{"account.number", "sender"} allowedKeys := []string{"account.number", "sender"}
indexer := NewTxIndex(db.NewMemDB(), IndexEvents(allowedKeys)) indexer := NewTxIndex(db.NewMemDB(), IndexEvents(allowedKeys))
@ -192,10 +262,12 @@ func TestTxSearchDeprecatedIndexing(t *testing.T) {
{"sender = 'addr1'", []*types.TxResult{txResult2}}, {"sender = 'addr1'", []*types.TxResult{txResult2}},
} }
ctx := context.Background()
for _, tc := range testCases { for _, tc := range testCases {
tc := tc tc := tc
t.Run(tc.q, func(t *testing.T) { t.Run(tc.q, func(t *testing.T) {
results, err := indexer.Search(query.MustParse(tc.q))
results, err := indexer.Search(ctx, query.MustParse(tc.q))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, results, tc.results) require.Equal(t, results, tc.results)
}) })
@ -214,7 +286,9 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) {
err := indexer.Index(txResult) err := indexer.Index(txResult)
require.NoError(t, err) require.NoError(t, err)
results, err := indexer.Search(query.MustParse("account.number >= 1"))
ctx := context.Background()
results, err := indexer.Search(ctx, query.MustParse("account.number >= 1"))
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, results, 1) assert.Len(t, results, 1)
@ -268,7 +342,9 @@ func TestTxSearchMultipleTxs(t *testing.T) {
err = indexer.Index(txResult4) err = indexer.Index(txResult4)
require.NoError(t, err) require.NoError(t, err)
results, err := indexer.Search(query.MustParse("account.number >= 1"))
ctx := context.Background()
results, err := indexer.Search(ctx, query.MustParse("account.number >= 1"))
assert.NoError(t, err) assert.NoError(t, err)
require.Len(t, results, 3) require.Len(t, results, 3)


+ 2
- 1
state/txindex/null/null.go View File

@ -1,6 +1,7 @@
package null package null
import ( import (
"context"
"errors" "errors"
"github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/libs/pubsub/query"
@ -28,6 +29,6 @@ func (txi *TxIndex) Index(result *types.TxResult) error {
return nil return nil
} }
func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*types.TxResult, error) {
return []*types.TxResult{}, nil return []*types.TxResult{}, nil
} }

Loading…
Cancel
Save