diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 1e2749b3c..491836106 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -88,7 +88,7 @@ jobs: go-version: "1.17" - name: test & coverage report creation run: | - cat pkgs.txt.part.${{ matrix.part }} | xargs go test -mod=readonly -timeout 8m -race -coverprofile=${{ matrix.part }}profile.out + cat pkgs.txt.part.${{ matrix.part }} | xargs go test -mod=readonly -timeout 15m -race -coverprofile=${{ matrix.part }}profile.out if: env.GIT_DIFF - uses: actions/upload-artifact@v2 with: diff --git a/abci/client/mocks/client.go b/abci/client/mocks/client.go index f0d82a50e..e727fd544 100644 --- a/abci/client/mocks/client.go +++ b/abci/client/mocks/client.go @@ -680,22 +680,6 @@ func (_m *Client) QuerySync(_a0 context.Context, _a1 types.RequestQuery) (*types return r0, r1 } -// Quit provides a mock function with given fields: -func (_m *Client) Quit() <-chan struct{} { - ret := _m.Called() - - var r0 <-chan struct{} - if rf, ok := ret.Get(0).(func() <-chan struct{}); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(<-chan struct{}) - } - } - - return r0 -} - // SetResponseCallback provides a mock function with given fields: _a0 func (_m *Client) SetResponseCallback(_a0 abciclient.Callback) { _m.Called(_a0) diff --git a/internal/eventbus/event_bus_test.go b/internal/eventbus/event_bus_test.go index 3e9069718..c65c8cdd6 100644 --- a/internal/eventbus/event_bus_test.go +++ b/internal/eventbus/event_bus_test.go @@ -38,7 +38,7 @@ func TestEventBusPublishEventTx(t *testing.T) { query := fmt.Sprintf("tm.event='Tx' AND tx.height=1 AND tx.hash='%X' AND testType.baz=1", tx.Hash()) txsSub, err := eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{ ClientID: "test", - Query: tmquery.MustParse(query), + Query: tmquery.MustCompile(query), }) require.NoError(t, err) @@ -96,7 +96,7 @@ func TestEventBusPublishEventNewBlock(t *testing.T) { query := "tm.event='NewBlock' AND testType.baz=1 AND testType.foz=2" blocksSub, err := eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{ ClientID: "test", - Query: tmquery.MustParse(query), + Query: tmquery.MustCompile(query), }) require.NoError(t, err) @@ -205,7 +205,7 @@ func TestEventBusPublishEventTxDuplicateKeys(t *testing.T) { sub, err := eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{ ClientID: fmt.Sprintf("client-%d", i), - Query: tmquery.MustParse(tc.query), + Query: tmquery.MustCompile(tc.query), }) require.NoError(t, err) @@ -269,7 +269,7 @@ func TestEventBusPublishEventNewBlockHeader(t *testing.T) { query := "tm.event='NewBlockHeader' AND testType.baz=1 AND testType.foz=2" headersSub, err := eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{ ClientID: "test", - Query: tmquery.MustParse(query), + Query: tmquery.MustCompile(query), }) require.NoError(t, err) @@ -312,7 +312,7 @@ func TestEventBusPublishEventNewEvidence(t *testing.T) { const query = `tm.event='NewEvidence'` evSub, err := eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{ ClientID: "test", - Query: tmquery.MustParse(query), + Query: tmquery.MustCompile(query), }) require.NoError(t, err) @@ -352,7 +352,7 @@ func TestEventBusPublish(t *testing.T) { sub, err := eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{ ClientID: "test", - Query: tmquery.Empty{}, + Query: tmquery.All, Limit: numEventsExpected, }) require.NoError(t, err) diff --git a/internal/inspect/inspect_test.go b/internal/inspect/inspect_test.go index 15a555ab0..ff6ade0d0 100644 --- a/internal/inspect/inspect_test.go +++ b/internal/inspect/inspect_test.go @@ -115,7 +115,7 @@ func TestBlock(t *testing.T) { func TestTxSearch(t *testing.T) { testHash := []byte("test") testTx := []byte("tx") - testQuery := fmt.Sprintf("tx.hash='%s'", string(testHash)) + testQuery := fmt.Sprintf("tx.hash = '%s'", string(testHash)) testTxResult := &abcitypes.TxResult{ Height: 1, Index: 100, diff --git a/internal/state/indexer/block/kv/kv.go b/internal/state/indexer/block/kv/kv.go index d52f06c96..26fdcf1fc 100644 --- a/internal/state/indexer/block/kv/kv.go +++ b/internal/state/indexer/block/kv/kv.go @@ -14,6 +14,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/state/indexer" "github.com/tendermint/tendermint/libs/pubsub/query" + "github.com/tendermint/tendermint/libs/pubsub/query/syntax" "github.com/tendermint/tendermint/types" ) @@ -91,10 +92,7 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64, default: } - conditions, err := q.Conditions() - if err != nil { - return nil, fmt.Errorf("failed to parse query conditions: %w", err) - } + conditions := q.Syntax() // If there is an exact height query, return the result immediately // (if it exists). @@ -158,7 +156,7 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64, continue } - startKey, err := orderedcode.Append(nil, c.CompositeKey, fmt.Sprintf("%v", c.Operand)) + startKey, err := orderedcode.Append(nil, c.Tag, c.Arg.Value()) if err != nil { return nil, err } @@ -327,7 +325,7 @@ iter: // matched. func (idx *BlockerIndexer) match( ctx context.Context, - c query.Condition, + c syntax.Condition, startKeyBz []byte, filteredHeights map[string][]byte, firstRun bool, @@ -342,7 +340,7 @@ func (idx *BlockerIndexer) match( tmpHeights := make(map[string][]byte) switch { - case c.Op == query.OpEqual: + case c.Op == syntax.TEq: it, err := dbm.IteratePrefix(idx.store, startKeyBz) if err != nil { return nil, fmt.Errorf("failed to create prefix iterator: %w", err) @@ -361,8 +359,8 @@ func (idx *BlockerIndexer) match( return nil, err } - case c.Op == query.OpExists: - prefix, err := orderedcode.Append(nil, c.CompositeKey) + case c.Op == syntax.TExists: + prefix, err := orderedcode.Append(nil, c.Tag) if err != nil { return nil, err } @@ -389,8 +387,8 @@ func (idx *BlockerIndexer) match( return nil, err } - case c.Op == query.OpContains: - prefix, err := orderedcode.Append(nil, c.CompositeKey) + case c.Op == syntax.TContains: + prefix, err := orderedcode.Append(nil, c.Tag) if err != nil { return nil, err } @@ -408,7 +406,7 @@ func (idx *BlockerIndexer) match( continue } - if strings.Contains(eventValue, c.Operand.(string)) { + if strings.Contains(eventValue, c.Arg.Value()) { tmpHeights[string(it.Value())] = it.Value() } diff --git a/internal/state/indexer/block/kv/kv_test.go b/internal/state/indexer/block/kv/kv_test.go index 024df332c..650723dbf 100644 --- a/internal/state/indexer/block/kv/kv_test.go +++ b/internal/state/indexer/block/kv/kv_test.go @@ -94,39 +94,39 @@ func TestBlockIndexer(t *testing.T) { results []int64 }{ "block.height = 100": { - q: query.MustParse("block.height = 100"), + q: query.MustCompile(`block.height = 100`), results: []int64{}, }, "block.height = 5": { - q: query.MustParse("block.height = 5"), + q: query.MustCompile(`block.height = 5`), results: []int64{5}, }, "begin_event.key1 = 'value1'": { - q: query.MustParse("begin_event.key1 = 'value1'"), + q: query.MustCompile(`begin_event.key1 = 'value1'`), results: []int64{}, }, "begin_event.proposer = 'FCAA001'": { - q: query.MustParse("begin_event.proposer = 'FCAA001'"), + q: query.MustCompile(`begin_event.proposer = 'FCAA001'`), results: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, }, "end_event.foo <= 5": { - q: query.MustParse("end_event.foo <= 5"), + q: query.MustCompile(`end_event.foo <= 5`), results: []int64{2, 4}, }, "end_event.foo >= 100": { - q: query.MustParse("end_event.foo >= 100"), + q: query.MustCompile(`end_event.foo >= 100`), results: []int64{1}, }, "block.height > 2 AND end_event.foo <= 8": { - q: query.MustParse("block.height > 2 AND end_event.foo <= 8"), + q: query.MustCompile(`block.height > 2 AND end_event.foo <= 8`), results: []int64{4, 6, 8}, }, "begin_event.proposer CONTAINS 'FFFFFFF'": { - q: query.MustParse("begin_event.proposer CONTAINS 'FFFFFFF'"), + q: query.MustCompile(`begin_event.proposer CONTAINS 'FFFFFFF'`), results: []int64{}, }, "begin_event.proposer CONTAINS 'FCAA001'": { - q: query.MustParse("begin_event.proposer CONTAINS 'FCAA001'"), + q: query.MustCompile(`begin_event.proposer CONTAINS 'FCAA001'`), results: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, }, } diff --git a/internal/state/indexer/block/kv/util.go b/internal/state/indexer/block/kv/util.go index c0b88018e..fff88046c 100644 --- a/internal/state/indexer/block/kv/util.go +++ b/internal/state/indexer/block/kv/util.go @@ -6,7 +6,7 @@ import ( "strconv" "github.com/google/orderedcode" - "github.com/tendermint/tendermint/libs/pubsub/query" + "github.com/tendermint/tendermint/libs/pubsub/query/syntax" "github.com/tendermint/tendermint/types" ) @@ -85,10 +85,10 @@ func parseValueFromEventKey(key []byte) (string, error) { return eventValue, nil } -func lookForHeight(conditions []query.Condition) (int64, bool) { +func lookForHeight(conditions []syntax.Condition) (int64, bool) { for _, c := range conditions { - if c.CompositeKey == types.BlockHeightKey && c.Op == query.OpEqual { - return c.Operand.(int64), true + if c.Tag == types.BlockHeightKey && c.Op == syntax.TEq { + return int64(c.Arg.Number()), true } } diff --git a/internal/state/indexer/query_range.go b/internal/state/indexer/query_range.go index b4edf53c5..4c026955d 100644 --- a/internal/state/indexer/query_range.go +++ b/internal/state/indexer/query_range.go @@ -3,7 +3,7 @@ package indexer import ( "time" - "github.com/tendermint/tendermint/libs/pubsub/query" + "github.com/tendermint/tendermint/libs/pubsub/query/syntax" ) // QueryRanges defines a mapping between a composite event key and a QueryRange. @@ -77,32 +77,32 @@ func (qr QueryRange) UpperBoundValue() interface{} { // LookForRanges returns a mapping of QueryRanges and the matching indexes in // the provided query conditions. -func LookForRanges(conditions []query.Condition) (ranges QueryRanges, indexes []int) { +func LookForRanges(conditions []syntax.Condition) (ranges QueryRanges, indexes []int) { ranges = make(QueryRanges) for i, c := range conditions { if IsRangeOperation(c.Op) { - r, ok := ranges[c.CompositeKey] + r, ok := ranges[c.Tag] if !ok { - r = QueryRange{Key: c.CompositeKey} + r = QueryRange{Key: c.Tag} } switch c.Op { - case query.OpGreater: - r.LowerBound = c.Operand + case syntax.TGt: + r.LowerBound = conditionArg(c) - case query.OpGreaterEqual: + case syntax.TGeq: r.IncludeLowerBound = true - r.LowerBound = c.Operand + r.LowerBound = conditionArg(c) - case query.OpLess: - r.UpperBound = c.Operand + case syntax.TLt: + r.UpperBound = conditionArg(c) - case query.OpLessEqual: + case syntax.TLeq: r.IncludeUpperBound = true - r.UpperBound = c.Operand + r.UpperBound = conditionArg(c) } - ranges[c.CompositeKey] = r + ranges[c.Tag] = r indexes = append(indexes, i) } } @@ -112,12 +112,26 @@ func LookForRanges(conditions []query.Condition) (ranges QueryRanges, indexes [] // IsRangeOperation returns a boolean signifying if a query Operator is a range // operation or not. -func IsRangeOperation(op query.Operator) bool { +func IsRangeOperation(op syntax.Token) bool { switch op { - case query.OpGreater, query.OpGreaterEqual, query.OpLess, query.OpLessEqual: + case syntax.TGt, syntax.TGeq, syntax.TLt, syntax.TLeq: return true default: return false } } + +func conditionArg(c syntax.Condition) interface{} { + if c.Arg == nil { + return nil + } + switch c.Arg.Type { + case syntax.TNumber: + return int64(c.Arg.Number()) + case syntax.TTime, syntax.TDate: + return c.Arg.Time() + default: + return c.Arg.Value() // string + } +} diff --git a/internal/state/indexer/sink/kv/kv_test.go b/internal/state/indexer/sink/kv/kv_test.go index 7d7552946..47b1f5364 100644 --- a/internal/state/indexer/sink/kv/kv_test.go +++ b/internal/state/indexer/sink/kv/kv_test.go @@ -111,39 +111,39 @@ func TestBlockFuncs(t *testing.T) { results []int64 }{ "block.height = 100": { - q: query.MustParse("block.height = 100"), + q: query.MustCompile(`block.height = 100`), results: []int64{}, }, "block.height = 5": { - q: query.MustParse("block.height = 5"), + q: query.MustCompile(`block.height = 5`), results: []int64{5}, }, "begin_event.key1 = 'value1'": { - q: query.MustParse("begin_event.key1 = 'value1'"), + q: query.MustCompile(`begin_event.key1 = 'value1'`), results: []int64{}, }, "begin_event.proposer = 'FCAA001'": { - q: query.MustParse("begin_event.proposer = 'FCAA001'"), + q: query.MustCompile(`begin_event.proposer = 'FCAA001'`), results: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, }, "end_event.foo <= 5": { - q: query.MustParse("end_event.foo <= 5"), + q: query.MustCompile(`end_event.foo <= 5`), results: []int64{2, 4}, }, "end_event.foo >= 100": { - q: query.MustParse("end_event.foo >= 100"), + q: query.MustCompile(`end_event.foo >= 100`), results: []int64{1}, }, "block.height > 2 AND end_event.foo <= 8": { - q: query.MustParse("block.height > 2 AND end_event.foo <= 8"), + q: query.MustCompile(`block.height > 2 AND end_event.foo <= 8`), results: []int64{4, 6, 8}, }, "begin_event.proposer CONTAINS 'FFFFFFF'": { - q: query.MustParse("begin_event.proposer CONTAINS 'FFFFFFF'"), + q: query.MustCompile(`begin_event.proposer CONTAINS 'FFFFFFF'`), results: []int64{}, }, "begin_event.proposer CONTAINS 'FCAA001'": { - q: query.MustParse("begin_event.proposer CONTAINS 'FCAA001'"), + q: query.MustCompile(`begin_event.proposer CONTAINS 'FCAA001'`), results: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, }, } @@ -175,7 +175,7 @@ func TestTxSearchWithCancelation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - results, err := indexer.SearchTxEvents(ctx, query.MustParse("account.number = 1")) + results, err := indexer.SearchTxEvents(ctx, query.MustCompile(`account.number = 1`)) assert.NoError(t, err) assert.Empty(t, results) } @@ -249,7 +249,7 @@ func TestTxSearchDeprecatedIndexing(t *testing.T) { for _, tc := range testCases { tc := tc t.Run(tc.q, func(t *testing.T) { - results, err := indexer.SearchTxEvents(ctx, query.MustParse(tc.q)) + results, err := indexer.SearchTxEvents(ctx, query.MustCompile(tc.q)) require.NoError(t, err) for _, txr := range results { for _, tr := range tc.results { @@ -273,7 +273,7 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) { ctx := context.Background() - results, err := indexer.SearchTxEvents(ctx, query.MustParse("account.number >= 1")) + results, err := indexer.SearchTxEvents(ctx, query.MustCompile(`account.number >= 1`)) assert.NoError(t, err) assert.Len(t, results, 1) @@ -330,7 +330,7 @@ func TestTxSearchMultipleTxs(t *testing.T) { ctx := context.Background() - results, err := indexer.SearchTxEvents(ctx, query.MustParse("account.number >= 1")) + results, err := indexer.SearchTxEvents(ctx, query.MustCompile(`account.number >= 1`)) assert.NoError(t, err) require.Len(t, results, 3) diff --git a/internal/state/indexer/tx/kv/kv.go b/internal/state/indexer/tx/kv/kv.go index f0550f8f3..4bcff958b 100644 --- a/internal/state/indexer/tx/kv/kv.go +++ b/internal/state/indexer/tx/kv/kv.go @@ -14,6 +14,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" indexer "github.com/tendermint/tendermint/internal/state/indexer" "github.com/tendermint/tendermint/libs/pubsub/query" + "github.com/tendermint/tendermint/libs/pubsub/query/syntax" "github.com/tendermint/tendermint/types" ) @@ -148,10 +149,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul filteredHashes := make(map[string][]byte) // get a list of conditions (like "tx.height > 5") - conditions, err := q.Conditions() - if err != nil { - return nil, fmt.Errorf("error during parsing conditions from query: %w", err) - } + conditions := q.Syntax() // if there is a hash condition, return the result immediately hash, ok, err := lookForHash(conditions) @@ -238,10 +236,10 @@ hashes: return results, nil } -func lookForHash(conditions []query.Condition) (hash []byte, ok bool, err error) { +func lookForHash(conditions []syntax.Condition) (hash []byte, ok bool, err error) { for _, c := range conditions { - if c.CompositeKey == types.TxHashKey { - decoded, err := hex.DecodeString(c.Operand.(string)) + if c.Tag == types.TxHashKey { + decoded, err := hex.DecodeString(c.Arg.Value()) return decoded, true, err } } @@ -249,10 +247,10 @@ func lookForHash(conditions []query.Condition) (hash []byte, ok bool, err error) } // lookForHeight returns a height if there is an "height=X" condition. -func lookForHeight(conditions []query.Condition) (height int64) { +func lookForHeight(conditions []syntax.Condition) (height int64) { for _, c := range conditions { - if c.CompositeKey == types.TxHeightKey && c.Op == query.OpEqual { - return c.Operand.(int64) + if c.Tag == types.TxHeightKey && c.Op == syntax.TEq { + return int64(c.Arg.Number()) } } return 0 @@ -265,7 +263,7 @@ func lookForHeight(conditions []query.Condition) (height int64) { // NOTE: filteredHashes may be empty if no previous condition has matched. func (txi *TxIndex) match( ctx context.Context, - c query.Condition, + c syntax.Condition, startKeyBz []byte, filteredHashes map[string][]byte, firstRun bool, @@ -279,7 +277,7 @@ func (txi *TxIndex) match( tmpHashes := make(map[string][]byte) switch { - case c.Op == query.OpEqual: + case c.Op == syntax.TEq: it, err := dbm.IteratePrefix(txi.store, startKeyBz) if err != nil { panic(err) @@ -301,10 +299,10 @@ func (txi *TxIndex) match( panic(err) } - case c.Op == query.OpExists: + case c.Op == syntax.TExists: // XXX: can't use startKeyBz here because c.Operand is nil // (e.g. "account.owner//" won't match w/ a single row) - it, err := dbm.IteratePrefix(txi.store, prefixFromCompositeKey(c.CompositeKey)) + it, err := dbm.IteratePrefix(txi.store, prefixFromCompositeKey(c.Tag)) if err != nil { panic(err) } @@ -325,11 +323,11 @@ func (txi *TxIndex) match( panic(err) } - case c.Op == query.OpContains: + case c.Op == syntax.TContains: // XXX: startKey does not apply here. // For example, if startKey = "account.owner/an/" and search query = "account.owner CONTAINS an" // we can't iterate with prefix "account.owner/an/" because we might miss keys like "account.owner/Ulan/" - it, err := dbm.IteratePrefix(txi.store, prefixFromCompositeKey(c.CompositeKey)) + it, err := dbm.IteratePrefix(txi.store, prefixFromCompositeKey(c.Tag)) if err != nil { panic(err) } @@ -341,7 +339,7 @@ func (txi *TxIndex) match( if err != nil { continue } - if strings.Contains(value, c.Operand.(string)) { + if strings.Contains(value, c.Arg.Value()) { tmpHashes[string(it.Value())] = it.Value() } @@ -577,8 +575,8 @@ func prefixFromCompositeKeyAndValue(compositeKey, value string) []byte { } // a small utility function for getting a keys prefix based on a condition and a height -func prefixForCondition(c query.Condition, height int64) []byte { - key := prefixFromCompositeKeyAndValue(c.CompositeKey, fmt.Sprintf("%v", c.Operand)) +func prefixForCondition(c syntax.Condition, height int64) []byte { + key := prefixFromCompositeKeyAndValue(c.Tag, c.Arg.Value()) if height > 0 { var err error key, err = orderedcode.Append(key, height) diff --git a/internal/state/indexer/tx/kv/kv_bench_test.go b/internal/state/indexer/tx/kv/kv_bench_test.go index e8504ebcc..b8f72b2fa 100644 --- a/internal/state/indexer/tx/kv/kv_bench_test.go +++ b/internal/state/indexer/tx/kv/kv_bench_test.go @@ -60,7 +60,7 @@ func BenchmarkTxSearch(b *testing.B) { } } - txQuery := query.MustParse("transfer.address = 'address_43' AND transfer.amount = 50") + txQuery := query.MustCompile(`transfer.address = 'address_43' AND transfer.amount = 50`) b.ResetTimer() diff --git a/internal/state/indexer/tx/kv/kv_test.go b/internal/state/indexer/tx/kv/kv_test.go index 985d58f42..e65f9ca2d 100644 --- a/internal/state/indexer/tx/kv/kv_test.go +++ b/internal/state/indexer/tx/kv/kv_test.go @@ -131,7 +131,7 @@ func TestTxSearch(t *testing.T) { for _, tc := range testCases { tc := tc t.Run(tc.q, func(t *testing.T) { - results, err := indexer.Search(ctx, query.MustParse(tc.q)) + results, err := indexer.Search(ctx, query.MustCompile(tc.q)) assert.NoError(t, err) assert.Len(t, results, tc.resultsLength) @@ -157,7 +157,7 @@ func TestTxSearchWithCancelation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - results, err := indexer.Search(ctx, query.MustParse("account.number = 1")) + results, err := indexer.Search(ctx, query.MustCompile(`account.number = 1`)) assert.NoError(t, err) assert.Empty(t, results) } @@ -230,7 +230,7 @@ func TestTxSearchDeprecatedIndexing(t *testing.T) { for _, tc := range testCases { tc := tc t.Run(tc.q, func(t *testing.T) { - results, err := indexer.Search(ctx, query.MustParse(tc.q)) + results, err := indexer.Search(ctx, query.MustCompile(tc.q)) require.NoError(t, err) for _, txr := range results { for _, tr := range tc.results { @@ -254,7 +254,7 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) { ctx := context.Background() - results, err := indexer.Search(ctx, query.MustParse("account.number >= 1")) + results, err := indexer.Search(ctx, query.MustCompile(`account.number >= 1`)) assert.NoError(t, err) assert.Len(t, results, 1) @@ -311,7 +311,7 @@ func TestTxSearchMultipleTxs(t *testing.T) { ctx := context.Background() - results, err := indexer.Search(ctx, query.MustParse("account.number >= 1")) + results, err := indexer.Search(ctx, query.MustCompile(`account.number >= 1`)) assert.NoError(t, err) require.Len(t, results, 3) diff --git a/libs/pubsub/example_test.go b/libs/pubsub/example_test.go index 4d317215f..0d608ceff 100644 --- a/libs/pubsub/example_test.go +++ b/libs/pubsub/example_test.go @@ -18,7 +18,7 @@ func TestExample(t *testing.T) { sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: "example-client", - Query: query.MustParse("abci.account.name='John'"), + Query: query.MustCompile(`abci.account.name='John'`), })) events := []abci.Event{ diff --git a/libs/pubsub/pubsub_test.go b/libs/pubsub/pubsub_test.go index be7f3e6e0..926a263a4 100644 --- a/libs/pubsub/pubsub_test.go +++ b/libs/pubsub/pubsub_test.go @@ -27,7 +27,7 @@ func TestSubscribeWithArgs(t *testing.T) { t.Run("DefaultLimit", func(t *testing.T) { sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: clientID, - Query: query.Empty{}, + Query: query.All, })) require.Equal(t, 1, s.NumClients()) @@ -39,7 +39,7 @@ func TestSubscribeWithArgs(t *testing.T) { t.Run("PositiveLimit", func(t *testing.T) { sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: clientID + "-2", - Query: query.Empty{}, + Query: query.All, Limit: 10, })) require.NoError(t, s.Publish(ctx, "Aggamon")) @@ -73,9 +73,9 @@ func TestObserverErrors(t *testing.T) { s := newTestServer(ctx, t) - require.Error(t, s.Observe(ctx, nil, query.Empty{})) + require.Error(t, s.Observe(ctx, nil, query.All)) require.NoError(t, s.Observe(ctx, func(pubsub.Message) error { return nil })) - require.Error(t, s.Observe(ctx, func(pubsub.Message) error { return nil }, query.Empty{})) + require.Error(t, s.Observe(ctx, func(pubsub.Message) error { return nil }, query.All)) } func TestPublishDoesNotBlock(t *testing.T) { @@ -86,7 +86,7 @@ func TestPublishDoesNotBlock(t *testing.T) { sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: clientID, - Query: query.Empty{}, + Query: query.All, })) published := make(chan struct{}) go func() { @@ -119,7 +119,7 @@ func TestSubscribeErrors(t *testing.T) { t.Run("NegativeLimitErr", func(t *testing.T) { _, err := s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: clientID, - Query: query.Empty{}, + Query: query.All, Limit: -5, }) require.Error(t, err) @@ -134,7 +134,7 @@ func TestSlowSubscriber(t *testing.T) { sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: clientID, - Query: query.Empty{}, + Query: query.All, })) require.NoError(t, s.Publish(ctx, "Fat Cobra")) @@ -155,7 +155,7 @@ func TestDifferentClients(t *testing.T) { sub1 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: "client-1", - Query: query.MustParse("tm.events.type='NewBlock'"), + Query: query.MustCompile(`tm.events.type='NewBlock'`), })) events := []abci.Event{{ @@ -168,7 +168,7 @@ func TestDifferentClients(t *testing.T) { sub2 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: "client-2", - Query: query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), + Query: query.MustCompile(`tm.events.type='NewBlock' AND abci.account.name='Igor'`), })) events = []abci.Event{ @@ -188,7 +188,8 @@ func TestDifferentClients(t *testing.T) { sub3 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: "client-3", - Query: query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), + Query: query.MustCompile( + `tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10`), })) events = []abci.Event{{ @@ -218,7 +219,7 @@ func TestSubscribeDuplicateKeys(t *testing.T) { for i, tc := range testCases { id := fmt.Sprintf("client-%d", i) - q := query.MustParse(tc.query) + q := query.MustCompile(tc.query) t.Run(id, func(t *testing.T) { sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: id, @@ -261,7 +262,7 @@ func TestClientSubscribesTwice(t *testing.T) { s := newTestServer(ctx, t) - q := query.MustParse("tm.events.type='NewBlock'") + q := query.MustCompile(`tm.events.type='NewBlock'`) events := []abci.Event{{ Type: "tm.events", Attributes: []abci.EventAttribute{{Key: "type", Value: "NewBlock"}}, @@ -298,13 +299,13 @@ func TestUnsubscribe(t *testing.T) { sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: clientID, - Query: query.MustParse("tm.events.type='NewBlock'"), + Query: query.MustCompile(`tm.events.type='NewBlock'`), })) // Removing the subscription we just made should succeed. require.NoError(t, s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{ Subscriber: clientID, - Query: query.MustParse("tm.events.type='NewBlock'"), + Query: query.MustCompile(`tm.events.type='NewBlock'`), })) // Publishing should still work. @@ -322,15 +323,15 @@ func TestClientUnsubscribesTwice(t *testing.T) { newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: clientID, - Query: query.MustParse("tm.events.type='NewBlock'"), + Query: query.MustCompile(`tm.events.type='NewBlock'`), })) require.NoError(t, s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{ Subscriber: clientID, - Query: query.MustParse("tm.events.type='NewBlock'"), + Query: query.MustCompile(`tm.events.type='NewBlock'`), })) require.ErrorIs(t, s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{ Subscriber: clientID, - Query: query.MustParse("tm.events.type='NewBlock'"), + Query: query.MustCompile(`tm.events.type='NewBlock'`), }), pubsub.ErrSubscriptionNotFound) require.ErrorIs(t, s.UnsubscribeAll(ctx, clientID), pubsub.ErrSubscriptionNotFound) } @@ -343,13 +344,13 @@ func TestResubscribe(t *testing.T) { args := pubsub.SubscribeArgs{ ClientID: clientID, - Query: query.Empty{}, + Query: query.All, } newTestSub(t).must(s.SubscribeWithArgs(ctx, args)) require.NoError(t, s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{ Subscriber: clientID, - Query: query.Empty{}, + Query: query.All, })) sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, args)) @@ -366,11 +367,11 @@ func TestUnsubscribeAll(t *testing.T) { sub1 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: clientID, - Query: query.MustParse("tm.events.type='NewBlock'"), + Query: query.MustCompile(`tm.events.type='NewBlock'`), })) sub2 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: clientID, - Query: query.MustParse("tm.events.type='NewBlockHeader'"), + Query: query.MustCompile(`tm.events.type='NewBlockHeader'`), })) require.NoError(t, s.UnsubscribeAll(ctx, clientID)) diff --git a/libs/pubsub/query/bench_test.go b/libs/pubsub/query/bench_test.go new file mode 100644 index 000000000..2e49b58ae --- /dev/null +++ b/libs/pubsub/query/bench_test.go @@ -0,0 +1,84 @@ +package query_test + +import ( + "testing" + + "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/pubsub/query" + oldquery "github.com/tendermint/tendermint/libs/pubsub/query/oldquery" +) + +const testQuery = `tm.events.type='NewBlock' AND abci.account.name='Igor'` + +var testEvents = []types.Event{ + { + Type: "tm.events", + Attributes: []types.EventAttribute{{ + Key: "index", + Value: "25", + }, { + Key: "type", + Value: "NewBlock", + }}, + }, + { + Type: "abci.account", + Attributes: []types.EventAttribute{{ + Key: "name", + Value: "Anya", + }, { + Key: "name", + Value: "Igor", + }}, + }, +} + +func BenchmarkParsePEG(b *testing.B) { + for i := 0; i < b.N; i++ { + _, err := oldquery.New(testQuery) + if err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkParseCustom(b *testing.B) { + for i := 0; i < b.N; i++ { + _, err := query.New(testQuery) + if err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkMatchPEG(b *testing.B) { + q, err := oldquery.New(testQuery) + if err != nil { + b.Fatal(err) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + ok, err := q.Matches(testEvents) + if err != nil { + b.Fatal(err) + } else if !ok { + b.Error("no match") + } + } +} + +func BenchmarkMatchCustom(b *testing.B) { + q, err := query.New(testQuery) + if err != nil { + b.Fatal(err) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + ok, err := q.Matches(testEvents) + if err != nil { + b.Fatal(err) + } else if !ok { + b.Error("no match") + } + } +} diff --git a/libs/pubsub/query/Makefile b/libs/pubsub/query/oldquery/Makefile similarity index 81% rename from libs/pubsub/query/Makefile rename to libs/pubsub/query/oldquery/Makefile index aef42b2df..8ec72981e 100644 --- a/libs/pubsub/query/Makefile +++ b/libs/pubsub/query/oldquery/Makefile @@ -1,6 +1,5 @@ gen_query_parser: - go get -u -v github.com/pointlander/peg - peg -inline -switch query.peg + go generate . fuzzy_test: go get -u -v github.com/dvyukov/go-fuzz/go-fuzz diff --git a/libs/pubsub/query/empty.go b/libs/pubsub/query/oldquery/empty.go similarity index 100% rename from libs/pubsub/query/empty.go rename to libs/pubsub/query/oldquery/empty.go diff --git a/libs/pubsub/query/empty_test.go b/libs/pubsub/query/oldquery/empty_test.go similarity index 92% rename from libs/pubsub/query/empty_test.go rename to libs/pubsub/query/oldquery/empty_test.go index 4bb3067d6..81f313d30 100644 --- a/libs/pubsub/query/empty_test.go +++ b/libs/pubsub/query/oldquery/empty_test.go @@ -5,7 +5,7 @@ import ( "github.com/stretchr/testify/require" abci "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/libs/pubsub/query" + query "github.com/tendermint/tendermint/libs/pubsub/query/oldquery" ) func TestEmptyQueryMatchesAnything(t *testing.T) { diff --git a/libs/pubsub/query/fuzz_test/main.go b/libs/pubsub/query/oldquery/fuzz_test/main.go similarity index 85% rename from libs/pubsub/query/fuzz_test/main.go rename to libs/pubsub/query/oldquery/fuzz_test/main.go index 7a46116b5..8bbcaa25f 100644 --- a/libs/pubsub/query/fuzz_test/main.go +++ b/libs/pubsub/query/oldquery/fuzz_test/main.go @@ -3,7 +3,7 @@ package fuzz_test import ( "fmt" - "github.com/tendermint/tendermint/libs/pubsub/query" + query "github.com/tendermint/tendermint/libs/pubsub/query/oldquery" ) func Fuzz(data []byte) int { diff --git a/libs/pubsub/query/parser_test.go b/libs/pubsub/query/oldquery/parser_test.go similarity index 97% rename from libs/pubsub/query/parser_test.go rename to libs/pubsub/query/oldquery/parser_test.go index a08a0d16d..661a80f93 100644 --- a/libs/pubsub/query/parser_test.go +++ b/libs/pubsub/query/oldquery/parser_test.go @@ -5,7 +5,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/tendermint/tendermint/libs/pubsub/query" + query "github.com/tendermint/tendermint/libs/pubsub/query/oldquery" ) // TODO: fuzzy testing? diff --git a/libs/pubsub/query/oldquery/peg.go b/libs/pubsub/query/oldquery/peg.go new file mode 100644 index 000000000..bf6789b58 --- /dev/null +++ b/libs/pubsub/query/oldquery/peg.go @@ -0,0 +1,3 @@ +package query + +//go:generate go run github.com/pointlander/peg@v1.0.0 -inline -switch query.peg diff --git a/libs/pubsub/query/oldquery/query.go b/libs/pubsub/query/oldquery/query.go new file mode 100644 index 000000000..7b1dfe0f9 --- /dev/null +++ b/libs/pubsub/query/oldquery/query.go @@ -0,0 +1,527 @@ +// Package query provides a parser for a custom query format: +// +// abci.invoice.number=22 AND abci.invoice.owner=Ivan +// +// See query.peg for the grammar, which is a https://en.wikipedia.org/wiki/Parsing_expression_grammar. +// More: https://github.com/PhilippeSigaud/Pegged/wiki/PEG-Basics +// +// It has a support for numbers (integer and floating point), dates and times. +package query + +import ( + "fmt" + "reflect" + "regexp" + "strconv" + "strings" + "time" + + "github.com/tendermint/tendermint/abci/types" +) + +var ( + numRegex = regexp.MustCompile(`([0-9\.]+)`) +) + +// Query holds the query string and the query parser. +type Query struct { + str string + parser *QueryParser +} + +// Condition represents a single condition within a query and consists of composite key +// (e.g. "tx.gas"), operator (e.g. "=") and operand (e.g. "7"). +type Condition struct { + CompositeKey string + Op Operator + Operand interface{} +} + +// New parses the given string and returns a query or error if the string is +// invalid. +func New(s string) (*Query, error) { + p := &QueryParser{Buffer: fmt.Sprintf(`"%s"`, s)} + p.Init() + if err := p.Parse(); err != nil { + return nil, err + } + return &Query{str: s, parser: p}, nil +} + +// MustParse turns the given string into a query or panics; for tests or others +// cases where you know the string is valid. +func MustParse(s string) *Query { + q, err := New(s) + if err != nil { + panic(fmt.Sprintf("failed to parse %s: %v", s, err)) + } + return q +} + +// String returns the original string. +func (q *Query) String() string { + return q.str +} + +// Operator is an operator that defines some kind of relation between composite key and +// operand (equality, etc.). +type Operator uint8 + +const ( + // "<=" + OpLessEqual Operator = iota + // ">=" + OpGreaterEqual + // "<" + OpLess + // ">" + OpGreater + // "=" + OpEqual + // "CONTAINS"; used to check if a string contains a certain sub string. + OpContains + // "EXISTS"; used to check if a certain event attribute is present. + OpExists +) + +const ( + // DateLayout defines a layout for all dates (`DATE date`) + DateLayout = "2006-01-02" + // TimeLayout defines a layout for all times (`TIME time`) + TimeLayout = time.RFC3339 +) + +// Conditions returns a list of conditions. It returns an error if there is any +// error with the provided grammar in the Query. +func (q *Query) Conditions() ([]Condition, error) { + var ( + eventAttr string + op Operator + ) + + conditions := make([]Condition, 0) + buffer, begin, end := q.parser.Buffer, 0, 0 + + // tokens must be in the following order: tag ("tx.gas") -> operator ("=") -> operand ("7") + for token := range q.parser.Tokens() { + switch token.pegRule { + case rulePegText: + begin, end = int(token.begin), int(token.end) + + case ruletag: + eventAttr = buffer[begin:end] + + case rulele: + op = OpLessEqual + + case rulege: + op = OpGreaterEqual + + case rulel: + op = OpLess + + case ruleg: + op = OpGreater + + case ruleequal: + op = OpEqual + + case rulecontains: + op = OpContains + + case ruleexists: + op = OpExists + conditions = append(conditions, Condition{eventAttr, op, nil}) + + case rulevalue: + // strip single quotes from value (i.e. "'NewBlock'" -> "NewBlock") + valueWithoutSingleQuotes := buffer[begin+1 : end-1] + conditions = append(conditions, Condition{eventAttr, op, valueWithoutSingleQuotes}) + + case rulenumber: + number := buffer[begin:end] + if strings.ContainsAny(number, ".") { // if it looks like a floating-point number + value, err := strconv.ParseFloat(number, 64) + if err != nil { + err = fmt.Errorf( + "got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", + err, number, + ) + return nil, err + } + + conditions = append(conditions, Condition{eventAttr, op, value}) + } else { + value, err := strconv.ParseInt(number, 10, 64) + if err != nil { + err = fmt.Errorf( + "got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", + err, number, + ) + return nil, err + } + + conditions = append(conditions, Condition{eventAttr, op, value}) + } + + case ruletime: + value, err := time.Parse(TimeLayout, buffer[begin:end]) + if err != nil { + err = fmt.Errorf( + "got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", + err, buffer[begin:end], + ) + return nil, err + } + + conditions = append(conditions, Condition{eventAttr, op, value}) + + case ruledate: + value, err := time.Parse("2006-01-02", buffer[begin:end]) + if err != nil { + err = fmt.Errorf( + "got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", + err, buffer[begin:end], + ) + return nil, err + } + + conditions = append(conditions, Condition{eventAttr, op, value}) + } + } + + return conditions, nil +} + +// Matches returns true if the query matches against any event in the given set +// of events, false otherwise. For each event, a match exists if the query is +// matched against *any* value in a slice of values. An error is returned if +// any attempted event match returns an error. +// +// For example, query "name=John" matches events = {"name": ["John", "Eric"]}. +// More examples could be found in parser_test.go and query_test.go. +func (q *Query) Matches(rawEvents []types.Event) (bool, error) { + if len(rawEvents) == 0 { + return false, nil + } + + events := flattenEvents(rawEvents) + + var ( + eventAttr string + op Operator + ) + + buffer, begin, end := q.parser.Buffer, 0, 0 + + // tokens must be in the following order: + + // tag ("tx.gas") -> operator ("=") -> operand ("7") + for token := range q.parser.Tokens() { + switch token.pegRule { + case rulePegText: + begin, end = int(token.begin), int(token.end) + + case ruletag: + eventAttr = buffer[begin:end] + + case rulele: + op = OpLessEqual + + case rulege: + op = OpGreaterEqual + + case rulel: + op = OpLess + + case ruleg: + op = OpGreater + + case ruleequal: + op = OpEqual + + case rulecontains: + op = OpContains + case ruleexists: + op = OpExists + if strings.Contains(eventAttr, ".") { + // Searching for a full "type.attribute" event. + _, ok := events[eventAttr] + if !ok { + return false, nil + } + } else { + foundEvent := false + + loop: + for compositeKey := range events { + if strings.Index(compositeKey, eventAttr) == 0 { + foundEvent = true + break loop + } + } + if !foundEvent { + return false, nil + } + } + + case rulevalue: + // strip single quotes from value (i.e. "'NewBlock'" -> "NewBlock") + valueWithoutSingleQuotes := buffer[begin+1 : end-1] + + // see if the triplet (event attribute, operator, operand) matches any event + // "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" } + match, err := match(eventAttr, op, reflect.ValueOf(valueWithoutSingleQuotes), events) + if err != nil { + return false, err + } + + if !match { + return false, nil + } + + case rulenumber: + number := buffer[begin:end] + if strings.ContainsAny(number, ".") { // if it looks like a floating-point number + value, err := strconv.ParseFloat(number, 64) + if err != nil { + err = fmt.Errorf( + "got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", + err, number, + ) + return false, err + } + + match, err := match(eventAttr, op, reflect.ValueOf(value), events) + if err != nil { + return false, err + } + + if !match { + return false, nil + } + } else { + value, err := strconv.ParseInt(number, 10, 64) + if err != nil { + err = fmt.Errorf( + "got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", + err, number, + ) + return false, err + } + + match, err := match(eventAttr, op, reflect.ValueOf(value), events) + if err != nil { + return false, err + } + + if !match { + return false, nil + } + } + + case ruletime: + value, err := time.Parse(TimeLayout, buffer[begin:end]) + if err != nil { + err = fmt.Errorf( + "got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", + err, buffer[begin:end], + ) + return false, err + } + + match, err := match(eventAttr, op, reflect.ValueOf(value), events) + if err != nil { + return false, err + } + + if !match { + return false, nil + } + + case ruledate: + value, err := time.Parse("2006-01-02", buffer[begin:end]) + if err != nil { + err = fmt.Errorf( + "got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", + err, buffer[begin:end], + ) + return false, err + } + + match, err := match(eventAttr, op, reflect.ValueOf(value), events) + if err != nil { + return false, err + } + + if !match { + return false, nil + } + } + } + + return true, nil +} + +// match returns true if the given triplet (attribute, operator, operand) matches +// any value in an event for that attribute. If any match fails with an error, +// that error is returned. +// +// First, it looks up the key in the events and if it finds one, tries to compare +// all the values from it to the operand using the operator. +// +// "tx.gas", "=", "7", {"tx": [{"gas": 7, "ID": "4AE393495334"}]} +func match(attr string, op Operator, operand reflect.Value, events map[string][]string) (bool, error) { + // look up the tag from the query in tags + values, ok := events[attr] + if !ok { + return false, nil + } + + for _, value := range values { + // return true if any value in the set of the event's values matches + match, err := matchValue(value, op, operand) + if err != nil { + return false, err + } + + if match { + return true, nil + } + } + + return false, nil +} + +// matchValue will attempt to match a string value against an operator an +// operand. A boolean is returned representing the match result. It will return +// an error if the value cannot be parsed and matched against the operand type. +func matchValue(value string, op Operator, operand reflect.Value) (bool, error) { + switch operand.Kind() { + case reflect.Struct: // time + operandAsTime := operand.Interface().(time.Time) + + // try our best to convert value from events to time.Time + var ( + v time.Time + err error + ) + + if strings.ContainsAny(value, "T") { + v, err = time.Parse(TimeLayout, value) + } else { + v, err = time.Parse(DateLayout, value) + } + if err != nil { + return false, fmt.Errorf("failed to convert value %v from event attribute to time.Time: %w", value, err) + } + + switch op { + case OpLessEqual: + return (v.Before(operandAsTime) || v.Equal(operandAsTime)), nil + case OpGreaterEqual: + return (v.Equal(operandAsTime) || v.After(operandAsTime)), nil + case OpLess: + return v.Before(operandAsTime), nil + case OpGreater: + return v.After(operandAsTime), nil + case OpEqual: + return v.Equal(operandAsTime), nil + } + + case reflect.Float64: + var v float64 + + operandFloat64 := operand.Interface().(float64) + filteredValue := numRegex.FindString(value) + + // try our best to convert value from tags to float64 + v, err := strconv.ParseFloat(filteredValue, 64) + if err != nil { + return false, fmt.Errorf("failed to convert value %v from event attribute to float64: %w", filteredValue, err) + } + + switch op { + case OpLessEqual: + return v <= operandFloat64, nil + case OpGreaterEqual: + return v >= operandFloat64, nil + case OpLess: + return v < operandFloat64, nil + case OpGreater: + return v > operandFloat64, nil + case OpEqual: + return v == operandFloat64, nil + } + + case reflect.Int64: + var v int64 + + operandInt := operand.Interface().(int64) + filteredValue := numRegex.FindString(value) + + // if value looks like float, we try to parse it as float + if strings.ContainsAny(filteredValue, ".") { + v1, err := strconv.ParseFloat(filteredValue, 64) + if err != nil { + return false, fmt.Errorf("failed to convert value %v from event attribute to float64: %w", filteredValue, err) + } + + v = int64(v1) + } else { + var err error + // try our best to convert value from tags to int64 + v, err = strconv.ParseInt(filteredValue, 10, 64) + if err != nil { + return false, fmt.Errorf("failed to convert value %v from event attribute to int64: %w", filteredValue, err) + } + } + + switch op { + case OpLessEqual: + return v <= operandInt, nil + case OpGreaterEqual: + return v >= operandInt, nil + case OpLess: + return v < operandInt, nil + case OpGreater: + return v > operandInt, nil + case OpEqual: + return v == operandInt, nil + } + + case reflect.String: + switch op { + case OpEqual: + return value == operand.String(), nil + case OpContains: + return strings.Contains(value, operand.String()), nil + } + + default: + return false, fmt.Errorf("unknown kind of operand %v", operand.Kind()) + } + + return false, nil +} + +func flattenEvents(events []types.Event) map[string][]string { + flattened := make(map[string][]string) + + for _, event := range events { + if len(event.Type) == 0 { + continue + } + + for _, attr := range event.Attributes { + if len(attr.Key) == 0 { + continue + } + + compositeEvent := fmt.Sprintf("%s.%s", event.Type, attr.Key) + flattened[compositeEvent] = append(flattened[compositeEvent], attr.Value) + } + } + + return flattened +} diff --git a/libs/pubsub/query/query.peg b/libs/pubsub/query/oldquery/query.peg similarity index 100% rename from libs/pubsub/query/query.peg rename to libs/pubsub/query/oldquery/query.peg diff --git a/libs/pubsub/query/query.peg.go b/libs/pubsub/query/oldquery/query.peg.go similarity index 100% rename from libs/pubsub/query/query.peg.go rename to libs/pubsub/query/oldquery/query.peg.go diff --git a/libs/pubsub/query/oldquery/query_test.go b/libs/pubsub/query/oldquery/query_test.go new file mode 100644 index 000000000..9db7651c7 --- /dev/null +++ b/libs/pubsub/query/oldquery/query_test.go @@ -0,0 +1,205 @@ +package query_test + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + abci "github.com/tendermint/tendermint/abci/types" + query "github.com/tendermint/tendermint/libs/pubsub/query/oldquery" +) + +func expandEvents(flattenedEvents map[string][]string) []abci.Event { + events := make([]abci.Event, len(flattenedEvents)) + + for composite, values := range flattenedEvents { + tokens := strings.Split(composite, ".") + + attrs := make([]abci.EventAttribute, len(values)) + for i, v := range values { + attrs[i] = abci.EventAttribute{ + Key: tokens[len(tokens)-1], + Value: v, + } + } + + events = append(events, abci.Event{ + Type: strings.Join(tokens[:len(tokens)-1], "."), + Attributes: attrs, + }) + } + + return events +} + +func TestMatches(t *testing.T) { + var ( + txDate = "2017-01-01" + txTime = "2018-05-03T14:45:00Z" + ) + + testCases := []struct { + s string + events map[string][]string + matches bool + }{ + {"tm.events.type='NewBlock'", map[string][]string{"tm.events.type": {"NewBlock"}}, true}, + {"tx.gas > 7", map[string][]string{"tx.gas": {"8"}}, true}, + {"transfer.amount > 7", map[string][]string{"transfer.amount": {"8stake"}}, true}, + {"transfer.amount > 7", map[string][]string{"transfer.amount": {"8.045stake"}}, true}, + {"transfer.amount > 7.043", map[string][]string{"transfer.amount": {"8.045stake"}}, true}, + {"transfer.amount > 8.045", map[string][]string{"transfer.amount": {"8.045stake"}}, false}, + {"tx.gas > 7 AND tx.gas < 9", map[string][]string{"tx.gas": {"8"}}, true}, + {"body.weight >= 3.5", map[string][]string{"body.weight": {"3.5"}}, true}, + {"account.balance < 1000.0", map[string][]string{"account.balance": {"900"}}, true}, + {"apples.kg <= 4", map[string][]string{"apples.kg": {"4.0"}}, true}, + {"body.weight >= 4.5", map[string][]string{"body.weight": {fmt.Sprintf("%v", float32(4.5))}}, true}, + { + "oranges.kg < 4 AND watermellons.kg > 10", + map[string][]string{"oranges.kg": {"3"}, "watermellons.kg": {"12"}}, + true, + }, + {"peaches.kg < 4", map[string][]string{"peaches.kg": {"5"}}, false}, + { + "tx.date > DATE 2017-01-01", + map[string][]string{"tx.date": {time.Now().Format(query.DateLayout)}}, + true, + }, + {"tx.date = DATE 2017-01-01", map[string][]string{"tx.date": {txDate}}, true}, + {"tx.date = DATE 2018-01-01", map[string][]string{"tx.date": {txDate}}, false}, + { + "tx.time >= TIME 2013-05-03T14:45:00Z", + map[string][]string{"tx.time": {time.Now().Format(query.TimeLayout)}}, + true, + }, + {"tx.time = TIME 2013-05-03T14:45:00Z", map[string][]string{"tx.time": {txTime}}, false}, + {"abci.owner.name CONTAINS 'Igor'", map[string][]string{"abci.owner.name": {"Igor,Ivan"}}, true}, + {"abci.owner.name CONTAINS 'Igor'", map[string][]string{"abci.owner.name": {"Pavel,Ivan"}}, false}, + {"abci.owner.name = 'Igor'", map[string][]string{"abci.owner.name": {"Igor", "Ivan"}}, true}, + { + "abci.owner.name = 'Ivan'", + map[string][]string{"abci.owner.name": {"Igor", "Ivan"}}, + true, + }, + { + "abci.owner.name = 'Ivan' AND abci.owner.name = 'Igor'", + map[string][]string{"abci.owner.name": {"Igor", "Ivan"}}, + true, + }, + { + "abci.owner.name = 'Ivan' AND abci.owner.name = 'John'", + map[string][]string{"abci.owner.name": {"Igor", "Ivan"}}, + false, + }, + { + "tm.events.type='NewBlock'", + map[string][]string{"tm.events.type": {"NewBlock"}, "app.name": {"fuzzed"}}, + true, + }, + { + "app.name = 'fuzzed'", + map[string][]string{"tm.events.type": {"NewBlock"}, "app.name": {"fuzzed"}}, + true, + }, + { + "tm.events.type='NewBlock' AND app.name = 'fuzzed'", + map[string][]string{"tm.events.type": {"NewBlock"}, "app.name": {"fuzzed"}}, + true, + }, + { + "tm.events.type='NewHeader' AND app.name = 'fuzzed'", + map[string][]string{"tm.events.type": {"NewBlock"}, "app.name": {"fuzzed"}}, + false, + }, + {"slash EXISTS", + map[string][]string{"slash.reason": {"missing_signature"}, "slash.power": {"6000"}}, + true, + }, + {"sl EXISTS", + map[string][]string{"slash.reason": {"missing_signature"}, "slash.power": {"6000"}}, + true, + }, + {"slash EXISTS", + map[string][]string{"transfer.recipient": {"cosmos1gu6y2a0ffteesyeyeesk23082c6998xyzmt9mz"}, + "transfer.sender": {"cosmos1crje20aj4gxdtyct7z3knxqry2jqt2fuaey6u5"}}, + false, + }, + {"slash.reason EXISTS AND slash.power > 1000", + map[string][]string{"slash.reason": {"missing_signature"}, "slash.power": {"6000"}}, + true, + }, + {"slash.reason EXISTS AND slash.power > 1000", + map[string][]string{"slash.reason": {"missing_signature"}, "slash.power": {"500"}}, + false, + }, + {"slash.reason EXISTS", + map[string][]string{"transfer.recipient": {"cosmos1gu6y2a0ffteesyeyeesk23082c6998xyzmt9mz"}, + "transfer.sender": {"cosmos1crje20aj4gxdtyct7z3knxqry2jqt2fuaey6u5"}}, + false, + }, + } + + for _, tc := range testCases { + q, err := query.New(tc.s) + require.Nil(t, err) + require.NotNil(t, q, "Query '%s' should not be nil", tc.s) + + rawEvents := expandEvents(tc.events) + match, err := q.Matches(rawEvents) + require.Nil(t, err, "Query '%s' should not error on input %v", tc.s, tc.events) + require.Equal(t, tc.matches, match, "Query '%s' on input %v: got %v, want %v", + tc.s, tc.events, match, tc.matches) + } +} + +func TestMustParse(t *testing.T) { + require.Panics(t, func() { query.MustParse("=") }) + require.NotPanics(t, func() { query.MustParse("tm.events.type='NewBlock'") }) +} + +func TestConditions(t *testing.T) { + txTime, err := time.Parse(time.RFC3339, "2013-05-03T14:45:00Z") + require.NoError(t, err) + + testCases := []struct { + s string + conditions []query.Condition + }{ + { + s: "tm.events.type='NewBlock'", + conditions: []query.Condition{ + {CompositeKey: "tm.events.type", Op: query.OpEqual, Operand: "NewBlock"}, + }, + }, + { + s: "tx.gas > 7 AND tx.gas < 9", + conditions: []query.Condition{ + {CompositeKey: "tx.gas", Op: query.OpGreater, Operand: int64(7)}, + {CompositeKey: "tx.gas", Op: query.OpLess, Operand: int64(9)}, + }, + }, + { + s: "tx.time >= TIME 2013-05-03T14:45:00Z", + conditions: []query.Condition{ + {CompositeKey: "tx.time", Op: query.OpGreaterEqual, Operand: txTime}, + }, + }, + { + s: "slashing EXISTS", + conditions: []query.Condition{ + {CompositeKey: "slashing", Op: query.OpExists}, + }, + }, + } + + for _, tc := range testCases { + q, err := query.New(tc.s) + require.Nil(t, err) + + c, err := q.Conditions() + require.NoError(t, err) + require.Equal(t, tc.conditions, c) + } +} diff --git a/libs/pubsub/query/peg.go b/libs/pubsub/query/peg.go deleted file mode 100644 index 816589f02..000000000 --- a/libs/pubsub/query/peg.go +++ /dev/null @@ -1,3 +0,0 @@ -package query - -//go:generate peg -inline -switch query.peg diff --git a/libs/pubsub/query/query.go b/libs/pubsub/query/query.go index 7b1dfe0f9..e874f037c 100644 --- a/libs/pubsub/query/query.go +++ b/libs/pubsub/query/query.go @@ -1,527 +1,327 @@ -// Package query provides a parser for a custom query format: +// Package query implements the custom query format used to filter event +// subscriptions in Tendermint. // -// abci.invoice.number=22 AND abci.invoice.owner=Ivan +// Query expressions describe properties of events and their attributes, using +// strings like: // -// See query.peg for the grammar, which is a https://en.wikipedia.org/wiki/Parsing_expression_grammar. -// More: https://github.com/PhilippeSigaud/Pegged/wiki/PEG-Basics +// abci.invoice.number = 22 AND abci.invoice.owner = 'Ivan' +// +// Query expressions can handle attribute values encoding numbers, strings, +// dates, and timestamps. The complete query grammar is described in the +// query/syntax package. // -// It has a support for numbers (integer and floating point), dates and times. package query import ( "fmt" - "reflect" "regexp" "strconv" "strings" "time" "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/pubsub/query/syntax" ) -var ( - numRegex = regexp.MustCompile(`([0-9\.]+)`) -) +// All is a query that matches all events. +var All *Query -// Query holds the query string and the query parser. +// A Query is the compiled form of a query. type Query struct { - str string - parser *QueryParser + ast syntax.Query + conds []condition } -// Condition represents a single condition within a query and consists of composite key -// (e.g. "tx.gas"), operator (e.g. "=") and operand (e.g. "7"). -type Condition struct { - CompositeKey string - Op Operator - Operand interface{} -} - -// New parses the given string and returns a query or error if the string is -// invalid. -func New(s string) (*Query, error) { - p := &QueryParser{Buffer: fmt.Sprintf(`"%s"`, s)} - p.Init() - if err := p.Parse(); err != nil { +// New parses and compiles the query expression into an executable query. +func New(query string) (*Query, error) { + ast, err := syntax.Parse(query) + if err != nil { return nil, err } - return &Query{str: s, parser: p}, nil + return Compile(ast) } -// MustParse turns the given string into a query or panics; for tests or others -// cases where you know the string is valid. -func MustParse(s string) *Query { - q, err := New(s) +// MustCompile compiles the query expression into an executable query. +// In case of error, MustCompile will panic. +// +// This is intended for use in program initialization; use query.New if you +// need to check errors. +func MustCompile(query string) *Query { + q, err := New(query) if err != nil { - panic(fmt.Sprintf("failed to parse %s: %v", s, err)) + panic(err) } return q } -// String returns the original string. -func (q *Query) String() string { - return q.str -} - -// Operator is an operator that defines some kind of relation between composite key and -// operand (equality, etc.). -type Operator uint8 - -const ( - // "<=" - OpLessEqual Operator = iota - // ">=" - OpGreaterEqual - // "<" - OpLess - // ">" - OpGreater - // "=" - OpEqual - // "CONTAINS"; used to check if a string contains a certain sub string. - OpContains - // "EXISTS"; used to check if a certain event attribute is present. - OpExists -) - -const ( - // DateLayout defines a layout for all dates (`DATE date`) - DateLayout = "2006-01-02" - // TimeLayout defines a layout for all times (`TIME time`) - TimeLayout = time.RFC3339 -) - -// Conditions returns a list of conditions. It returns an error if there is any -// error with the provided grammar in the Query. -func (q *Query) Conditions() ([]Condition, error) { - var ( - eventAttr string - op Operator - ) - - conditions := make([]Condition, 0) - buffer, begin, end := q.parser.Buffer, 0, 0 - - // tokens must be in the following order: tag ("tx.gas") -> operator ("=") -> operand ("7") - for token := range q.parser.Tokens() { - switch token.pegRule { - case rulePegText: - begin, end = int(token.begin), int(token.end) - - case ruletag: - eventAttr = buffer[begin:end] - - case rulele: - op = OpLessEqual - - case rulege: - op = OpGreaterEqual - - case rulel: - op = OpLess - - case ruleg: - op = OpGreater - - case ruleequal: - op = OpEqual - - case rulecontains: - op = OpContains - - case ruleexists: - op = OpExists - conditions = append(conditions, Condition{eventAttr, op, nil}) - - case rulevalue: - // strip single quotes from value (i.e. "'NewBlock'" -> "NewBlock") - valueWithoutSingleQuotes := buffer[begin+1 : end-1] - conditions = append(conditions, Condition{eventAttr, op, valueWithoutSingleQuotes}) - - case rulenumber: - number := buffer[begin:end] - if strings.ContainsAny(number, ".") { // if it looks like a floating-point number - value, err := strconv.ParseFloat(number, 64) - if err != nil { - err = fmt.Errorf( - "got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", - err, number, - ) - return nil, err - } - - conditions = append(conditions, Condition{eventAttr, op, value}) - } else { - value, err := strconv.ParseInt(number, 10, 64) - if err != nil { - err = fmt.Errorf( - "got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", - err, number, - ) - return nil, err - } - - conditions = append(conditions, Condition{eventAttr, op, value}) - } - - case ruletime: - value, err := time.Parse(TimeLayout, buffer[begin:end]) - if err != nil { - err = fmt.Errorf( - "got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", - err, buffer[begin:end], - ) - return nil, err - } - - conditions = append(conditions, Condition{eventAttr, op, value}) - - case ruledate: - value, err := time.Parse("2006-01-02", buffer[begin:end]) - if err != nil { - err = fmt.Errorf( - "got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", - err, buffer[begin:end], - ) - return nil, err - } - - conditions = append(conditions, Condition{eventAttr, op, value}) +// Compile compiles the given query AST so it can be used to match events. +func Compile(ast syntax.Query) (*Query, error) { + conds := make([]condition, len(ast)) + for i, q := range ast { + cond, err := compileCondition(q) + if err != nil { + return nil, fmt.Errorf("compile %s: %w", q, err) } + conds[i] = cond } - - return conditions, nil + return &Query{ast: ast, conds: conds}, nil } -// Matches returns true if the query matches against any event in the given set -// of events, false otherwise. For each event, a match exists if the query is -// matched against *any* value in a slice of values. An error is returned if -// any attempted event match returns an error. -// -// For example, query "name=John" matches events = {"name": ["John", "Eric"]}. -// More examples could be found in parser_test.go and query_test.go. -func (q *Query) Matches(rawEvents []types.Event) (bool, error) { - if len(rawEvents) == 0 { - return false, nil +// Matches satisfies part of the pubsub.Query interface. This implementation +// never reports an error. A nil *Query matches all events. +func (q *Query) Matches(events []types.Event) (bool, error) { + if q == nil { + return true, nil } + return q.matchesEvents(events), nil +} - events := flattenEvents(rawEvents) - - var ( - eventAttr string - op Operator - ) - - buffer, begin, end := q.parser.Buffer, 0, 0 - - // tokens must be in the following order: - - // tag ("tx.gas") -> operator ("=") -> operand ("7") - for token := range q.parser.Tokens() { - switch token.pegRule { - case rulePegText: - begin, end = int(token.begin), int(token.end) - - case ruletag: - eventAttr = buffer[begin:end] - - case rulele: - op = OpLessEqual - - case rulege: - op = OpGreaterEqual - - case rulel: - op = OpLess - - case ruleg: - op = OpGreater - - case ruleequal: - op = OpEqual - - case rulecontains: - op = OpContains - case ruleexists: - op = OpExists - if strings.Contains(eventAttr, ".") { - // Searching for a full "type.attribute" event. - _, ok := events[eventAttr] - if !ok { - return false, nil - } - } else { - foundEvent := false - - loop: - for compositeKey := range events { - if strings.Index(compositeKey, eventAttr) == 0 { - foundEvent = true - break loop - } - } - if !foundEvent { - return false, nil - } - } - - case rulevalue: - // strip single quotes from value (i.e. "'NewBlock'" -> "NewBlock") - valueWithoutSingleQuotes := buffer[begin+1 : end-1] - - // see if the triplet (event attribute, operator, operand) matches any event - // "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" } - match, err := match(eventAttr, op, reflect.ValueOf(valueWithoutSingleQuotes), events) - if err != nil { - return false, err - } - - if !match { - return false, nil - } - - case rulenumber: - number := buffer[begin:end] - if strings.ContainsAny(number, ".") { // if it looks like a floating-point number - value, err := strconv.ParseFloat(number, 64) - if err != nil { - err = fmt.Errorf( - "got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", - err, number, - ) - return false, err - } - - match, err := match(eventAttr, op, reflect.ValueOf(value), events) - if err != nil { - return false, err - } - - if !match { - return false, nil - } - } else { - value, err := strconv.ParseInt(number, 10, 64) - if err != nil { - err = fmt.Errorf( - "got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", - err, number, - ) - return false, err - } - - match, err := match(eventAttr, op, reflect.ValueOf(value), events) - if err != nil { - return false, err - } - - if !match { - return false, nil - } - } - - case ruletime: - value, err := time.Parse(TimeLayout, buffer[begin:end]) - if err != nil { - err = fmt.Errorf( - "got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", - err, buffer[begin:end], - ) - return false, err - } - - match, err := match(eventAttr, op, reflect.ValueOf(value), events) - if err != nil { - return false, err - } - - if !match { - return false, nil - } - - case ruledate: - value, err := time.Parse("2006-01-02", buffer[begin:end]) - if err != nil { - err = fmt.Errorf( - "got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", - err, buffer[begin:end], - ) - return false, err - } - - match, err := match(eventAttr, op, reflect.ValueOf(value), events) - if err != nil { - return false, err - } - - if !match { - return false, nil - } - } +// String matches part of the pubsub.Query interface. +func (q *Query) String() string { + if q == nil { + return "" } - - return true, nil + return q.ast.String() } -// match returns true if the given triplet (attribute, operator, operand) matches -// any value in an event for that attribute. If any match fails with an error, -// that error is returned. -// -// First, it looks up the key in the events and if it finds one, tries to compare -// all the values from it to the operand using the operator. -// -// "tx.gas", "=", "7", {"tx": [{"gas": 7, "ID": "4AE393495334"}]} -func match(attr string, op Operator, operand reflect.Value, events map[string][]string) (bool, error) { - // look up the tag from the query in tags - values, ok := events[attr] - if !ok { - return false, nil +// Syntax returns the syntax tree representation of q. +func (q *Query) Syntax() syntax.Query { + if q == nil { + return nil } + return q.ast +} - for _, value := range values { - // return true if any value in the set of the event's values matches - match, err := matchValue(value, op, operand) - if err != nil { - return false, err - } - - if match { - return true, nil +// matchesEvents reports whether all the conditions match the given events. +func (q *Query) matchesEvents(events []types.Event) bool { + for _, cond := range q.conds { + if !cond.matchesAny(events) { + return false } } - - return false, nil + return len(events) != 0 } -// matchValue will attempt to match a string value against an operator an -// operand. A boolean is returned representing the match result. It will return -// an error if the value cannot be parsed and matched against the operand type. -func matchValue(value string, op Operator, operand reflect.Value) (bool, error) { - switch operand.Kind() { - case reflect.Struct: // time - operandAsTime := operand.Interface().(time.Time) - - // try our best to convert value from events to time.Time - var ( - v time.Time - err error - ) +// A condition is a compiled match condition. A condition matches an event if +// the event has the designated type, contains an attribute with the given +// name, and the match function returns true for the attribute value. +type condition struct { + tag string // e.g., "tx.hash" + match func(s string) bool +} - if strings.ContainsAny(value, "T") { - v, err = time.Parse(TimeLayout, value) - } else { - v, err = time.Parse(DateLayout, value) - } - if err != nil { - return false, fmt.Errorf("failed to convert value %v from event attribute to time.Time: %w", value, err) +// findAttr returns a slice of attribute values from event matching the +// condition tag, and reports whether the event type strictly equals the +// condition tag. +func (c condition) findAttr(event types.Event) ([]string, bool) { + if !strings.HasPrefix(c.tag, event.Type) { + return nil, false // type does not match tag + } else if len(c.tag) == len(event.Type) { + return nil, true // type == tag + } + var vals []string + for _, attr := range event.Attributes { + fullName := event.Type + "." + attr.Key + if fullName == c.tag { + vals = append(vals, attr.Value) } + } + return vals, false +} - switch op { - case OpLessEqual: - return (v.Before(operandAsTime) || v.Equal(operandAsTime)), nil - case OpGreaterEqual: - return (v.Equal(operandAsTime) || v.After(operandAsTime)), nil - case OpLess: - return v.Before(operandAsTime), nil - case OpGreater: - return v.After(operandAsTime), nil - case OpEqual: - return v.Equal(operandAsTime), nil +// matchesAny reports whether c matches at least one of the given events. +func (c condition) matchesAny(events []types.Event) bool { + for _, event := range events { + if c.matchesEvent(event) { + return true } + } + return false +} - case reflect.Float64: - var v float64 - - operandFloat64 := operand.Interface().(float64) - filteredValue := numRegex.FindString(value) - - // try our best to convert value from tags to float64 - v, err := strconv.ParseFloat(filteredValue, 64) - if err != nil { - return false, fmt.Errorf("failed to convert value %v from event attribute to float64: %w", filteredValue, err) +// matchesEvent reports whether c matches the given event. +func (c condition) matchesEvent(event types.Event) bool { + vs, tagEqualsType := c.findAttr(event) + if len(vs) == 0 { + // As a special case, a condition tag that exactly matches the event type + // is matched against an empty string. This allows existence checks to + // work for type-only queries. + if tagEqualsType { + return c.match("") } + return false + } - switch op { - case OpLessEqual: - return v <= operandFloat64, nil - case OpGreaterEqual: - return v >= operandFloat64, nil - case OpLess: - return v < operandFloat64, nil - case OpGreater: - return v > operandFloat64, nil - case OpEqual: - return v == operandFloat64, nil + // At this point, we have candidate values. + for _, v := range vs { + if c.match(v) { + return true } + } + return false +} - case reflect.Int64: - var v int64 - - operandInt := operand.Interface().(int64) - filteredValue := numRegex.FindString(value) - - // if value looks like float, we try to parse it as float - if strings.ContainsAny(filteredValue, ".") { - v1, err := strconv.ParseFloat(filteredValue, 64) - if err != nil { - return false, fmt.Errorf("failed to convert value %v from event attribute to float64: %w", filteredValue, err) - } - - v = int64(v1) - } else { - var err error - // try our best to convert value from tags to int64 - v, err = strconv.ParseInt(filteredValue, 10, 64) - if err != nil { - return false, fmt.Errorf("failed to convert value %v from event attribute to int64: %w", filteredValue, err) - } - } +func compileCondition(cond syntax.Condition) (condition, error) { + out := condition{tag: cond.Tag} - switch op { - case OpLessEqual: - return v <= operandInt, nil - case OpGreaterEqual: - return v >= operandInt, nil - case OpLess: - return v < operandInt, nil - case OpGreater: - return v > operandInt, nil - case OpEqual: - return v == operandInt, nil - } + // Handle existence checks separately to simplify the logic below for + // comparisons that take arguments. + if cond.Op == syntax.TExists { + out.match = func(string) bool { return true } + return out, nil + } - case reflect.String: - switch op { - case OpEqual: - return value == operand.String(), nil - case OpContains: - return strings.Contains(value, operand.String()), nil - } + // All the other operators require an argument. + if cond.Arg == nil { + return condition{}, fmt.Errorf("missing argument for %v", cond.Op) + } + // Precompile the argument value matcher. + argType := cond.Arg.Type + var argValue interface{} + + switch argType { + case syntax.TString: + argValue = cond.Arg.Value() + case syntax.TNumber: + argValue = cond.Arg.Number() + case syntax.TTime, syntax.TDate: + argValue = cond.Arg.Time() default: - return false, fmt.Errorf("unknown kind of operand %v", operand.Kind()) + return condition{}, fmt.Errorf("unknown argument type %v", argType) } - return false, nil + mcons := opTypeMap[cond.Op][argType] + if mcons == nil { + return condition{}, fmt.Errorf("invalid op/arg combination (%v, %v)", cond.Op, argType) + } + out.match = mcons(argValue) + return out, nil } -func flattenEvents(events []types.Event) map[string][]string { - flattened := make(map[string][]string) +// TODO(creachadair): The existing implementation allows anything number shaped +// to be treated as a number. This preserves the parts of that behavior we had +// tests for, but we should probably get rid of that. +var extractNum = regexp.MustCompile(`^\d+(\.\d+)?`) - for _, event := range events { - if len(event.Type) == 0 { - continue - } +func parseNumber(s string) (float64, error) { + return strconv.ParseFloat(extractNum.FindString(s), 64) +} - for _, attr := range event.Attributes { - if len(attr.Key) == 0 { - continue +// A map of operator ⇒ argtype ⇒ match-constructor. +// An entry does not exist if the combination is not valid. +// +// Disable the dupl lint for this map. The result isn't even correct. +//nolint:dupl +var opTypeMap = map[syntax.Token]map[syntax.Token]func(interface{}) func(string) bool{ + syntax.TContains: { + syntax.TString: func(v interface{}) func(string) bool { + return func(s string) bool { + return strings.Contains(s, v.(string)) } - - compositeEvent := fmt.Sprintf("%s.%s", event.Type, attr.Key) - flattened[compositeEvent] = append(flattened[compositeEvent], attr.Value) - } - } - - return flattened + }, + }, + syntax.TEq: { + syntax.TString: func(v interface{}) func(string) bool { + return func(s string) bool { return s == v.(string) } + }, + syntax.TNumber: func(v interface{}) func(string) bool { + return func(s string) bool { + w, err := parseNumber(s) + return err == nil && w == v.(float64) + } + }, + syntax.TDate: func(v interface{}) func(string) bool { + return func(s string) bool { + ts, err := syntax.ParseDate(s) + return err == nil && ts.Equal(v.(time.Time)) + } + }, + syntax.TTime: func(v interface{}) func(string) bool { + return func(s string) bool { + ts, err := syntax.ParseTime(s) + return err == nil && ts.Equal(v.(time.Time)) + } + }, + }, + syntax.TLt: { + syntax.TNumber: func(v interface{}) func(string) bool { + return func(s string) bool { + w, err := parseNumber(s) + return err == nil && w < v.(float64) + } + }, + syntax.TDate: func(v interface{}) func(string) bool { + return func(s string) bool { + ts, err := syntax.ParseDate(s) + return err == nil && ts.Before(v.(time.Time)) + } + }, + syntax.TTime: func(v interface{}) func(string) bool { + return func(s string) bool { + ts, err := syntax.ParseTime(s) + return err == nil && ts.Before(v.(time.Time)) + } + }, + }, + syntax.TLeq: { + syntax.TNumber: func(v interface{}) func(string) bool { + return func(s string) bool { + w, err := parseNumber(s) + return err == nil && w <= v.(float64) + } + }, + syntax.TDate: func(v interface{}) func(string) bool { + return func(s string) bool { + ts, err := syntax.ParseDate(s) + return err == nil && !ts.After(v.(time.Time)) + } + }, + syntax.TTime: func(v interface{}) func(string) bool { + return func(s string) bool { + ts, err := syntax.ParseTime(s) + return err == nil && !ts.After(v.(time.Time)) + } + }, + }, + syntax.TGt: { + syntax.TNumber: func(v interface{}) func(string) bool { + return func(s string) bool { + w, err := parseNumber(s) + return err == nil && w > v.(float64) + } + }, + syntax.TDate: func(v interface{}) func(string) bool { + return func(s string) bool { + ts, err := syntax.ParseDate(s) + return err == nil && ts.After(v.(time.Time)) + } + }, + syntax.TTime: func(v interface{}) func(string) bool { + return func(s string) bool { + ts, err := syntax.ParseTime(s) + return err == nil && ts.After(v.(time.Time)) + } + }, + }, + syntax.TGeq: { + syntax.TNumber: func(v interface{}) func(string) bool { + return func(s string) bool { + w, err := parseNumber(s) + return err == nil && w >= v.(float64) + } + }, + syntax.TDate: func(v interface{}) func(string) bool { + return func(s string) bool { + ts, err := syntax.ParseDate(s) + return err == nil && !ts.Before(v.(time.Time)) + } + }, + syntax.TTime: func(v interface{}) func(string) bool { + return func(s string) bool { + ts, err := syntax.ParseTime(s) + return err == nil && !ts.Before(v.(time.Time)) + } + }, + }, } diff --git a/libs/pubsub/query/query_test.go b/libs/pubsub/query/query_test.go index 87f61aafe..883e771c6 100644 --- a/libs/pubsub/query/query_test.go +++ b/libs/pubsub/query/query_test.go @@ -6,242 +6,271 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/pubsub" "github.com/tendermint/tendermint/libs/pubsub/query" + "github.com/tendermint/tendermint/libs/pubsub/query/syntax" ) -func expandEvents(flattenedEvents map[string][]string) []abci.Event { - events := make([]abci.Event, len(flattenedEvents)) +var _ pubsub.Query = (*query.Query)(nil) - for composite, values := range flattenedEvents { - tokens := strings.Split(composite, ".") - - attrs := make([]abci.EventAttribute, len(values)) - for i, v := range values { - attrs[i] = abci.EventAttribute{ - Key: tokens[len(tokens)-1], - Value: v, - } - } - - events = append(events, abci.Event{ - Type: strings.Join(tokens[:len(tokens)-1], "."), - Attributes: attrs, - }) - } - - return events +// Example events from the OpenAPI documentation: +// https://github.com/tendermint/tendermint/blob/master/rpc/openapi/openapi.yaml +// +// Redactions: +// +// - Add an explicit "tm" event for the built-in attributes. +// - Remove Index fields (not relevant to tests). +// - Add explicit balance values (to use in tests). +// +var apiEvents = []types.Event{ + { + Type: "tm", + Attributes: []types.EventAttribute{ + {Key: "event", Value: "Tx"}, + {Key: "hash", Value: "XYZ"}, + {Key: "height", Value: "5"}, + }, + }, + { + Type: "rewards.withdraw", + Attributes: []types.EventAttribute{ + {Key: "address", Value: "AddrA"}, + {Key: "source", Value: "SrcX"}, + {Key: "amount", Value: "100"}, + {Key: "balance", Value: "1500"}, + }, + }, + { + Type: "rewards.withdraw", + Attributes: []types.EventAttribute{ + {Key: "address", Value: "AddrB"}, + {Key: "source", Value: "SrcY"}, + {Key: "amount", Value: "45"}, + {Key: "balance", Value: "999"}, + }, + }, + { + Type: "transfer", + Attributes: []types.EventAttribute{ + {Key: "sender", Value: "AddrC"}, + {Key: "recipient", Value: "AddrD"}, + {Key: "amount", Value: "160"}, + }, + }, } -func TestMatches(t *testing.T) { +func TestCompiledMatches(t *testing.T) { var ( txDate = "2017-01-01" txTime = "2018-05-03T14:45:00Z" ) + //nolint:lll testCases := []struct { - s string - events map[string][]string - err bool - matches bool - matchErr bool + s string + events []types.Event + matches bool }{ - {"tm.events.type='NewBlock'", map[string][]string{"tm.events.type": {"NewBlock"}}, false, true, false}, - {"tx.gas > 7", map[string][]string{"tx.gas": {"8"}}, false, true, false}, - {"transfer.amount > 7", map[string][]string{"transfer.amount": {"8stake"}}, false, true, false}, - {"transfer.amount > 7", map[string][]string{"transfer.amount": {"8.045stake"}}, false, true, false}, - {"transfer.amount > 7.043", map[string][]string{"transfer.amount": {"8.045stake"}}, false, true, false}, - {"transfer.amount > 8.045", map[string][]string{"transfer.amount": {"8.045stake"}}, false, false, false}, - {"tx.gas > 7 AND tx.gas < 9", map[string][]string{"tx.gas": {"8"}}, false, true, false}, - {"body.weight >= 3.5", map[string][]string{"body.weight": {"3.5"}}, false, true, false}, - {"account.balance < 1000.0", map[string][]string{"account.balance": {"900"}}, false, true, false}, - {"apples.kg <= 4", map[string][]string{"apples.kg": {"4.0"}}, false, true, false}, - {"body.weight >= 4.5", map[string][]string{"body.weight": {fmt.Sprintf("%v", float32(4.5))}}, false, true, false}, - { - "oranges.kg < 4 AND watermellons.kg > 10", - map[string][]string{"oranges.kg": {"3"}, "watermellons.kg": {"12"}}, - false, - true, - false, - }, - {"peaches.kg < 4", map[string][]string{"peaches.kg": {"5"}}, false, false, false}, - { - "tx.date > DATE 2017-01-01", - map[string][]string{"tx.date": {time.Now().Format(query.DateLayout)}}, - false, - true, - false, - }, - {"tx.date = DATE 2017-01-01", map[string][]string{"tx.date": {txDate}}, false, true, false}, - {"tx.date = DATE 2018-01-01", map[string][]string{"tx.date": {txDate}}, false, false, false}, - { - "tx.time >= TIME 2013-05-03T14:45:00Z", - map[string][]string{"tx.time": {time.Now().Format(query.TimeLayout)}}, - false, - true, - false, - }, - {"tx.time = TIME 2013-05-03T14:45:00Z", map[string][]string{"tx.time": {txTime}}, false, false, false}, - {"abci.owner.name CONTAINS 'Igor'", map[string][]string{"abci.owner.name": {"Igor,Ivan"}}, false, true, false}, - {"abci.owner.name CONTAINS 'Igor'", map[string][]string{"abci.owner.name": {"Pavel,Ivan"}}, false, false, false}, - {"abci.owner.name = 'Igor'", map[string][]string{"abci.owner.name": {"Igor", "Ivan"}}, false, true, false}, - { - "abci.owner.name = 'Ivan'", - map[string][]string{"abci.owner.name": {"Igor", "Ivan"}}, - false, - true, - false, - }, - { - "abci.owner.name = 'Ivan' AND abci.owner.name = 'Igor'", - map[string][]string{"abci.owner.name": {"Igor", "Ivan"}}, - false, - true, - false, - }, - { - "abci.owner.name = 'Ivan' AND abci.owner.name = 'John'", - map[string][]string{"abci.owner.name": {"Igor", "Ivan"}}, - false, - false, - false, - }, - { - "tm.events.type='NewBlock'", - map[string][]string{"tm.events.type": {"NewBlock"}, "app.name": {"fuzzed"}}, - false, - true, - false, - }, - { - "app.name = 'fuzzed'", - map[string][]string{"tm.events.type": {"NewBlock"}, "app.name": {"fuzzed"}}, - false, - true, - false, - }, - { - "tm.events.type='NewBlock' AND app.name = 'fuzzed'", - map[string][]string{"tm.events.type": {"NewBlock"}, "app.name": {"fuzzed"}}, - false, - true, - false, - }, - { - "tm.events.type='NewHeader' AND app.name = 'fuzzed'", - map[string][]string{"tm.events.type": {"NewBlock"}, "app.name": {"fuzzed"}}, - false, - false, - false, - }, - {"slash EXISTS", - map[string][]string{"slash.reason": {"missing_signature"}, "slash.power": {"6000"}}, - false, - true, - false, - }, - {"sl EXISTS", - map[string][]string{"slash.reason": {"missing_signature"}, "slash.power": {"6000"}}, - false, - true, - false, - }, - {"slash EXISTS", - map[string][]string{"transfer.recipient": {"cosmos1gu6y2a0ffteesyeyeesk23082c6998xyzmt9mz"}, - "transfer.sender": {"cosmos1crje20aj4gxdtyct7z3knxqry2jqt2fuaey6u5"}}, - false, - false, - false, - }, - {"slash.reason EXISTS AND slash.power > 1000", - map[string][]string{"slash.reason": {"missing_signature"}, "slash.power": {"6000"}}, - false, - true, - false, - }, - {"slash.reason EXISTS AND slash.power > 1000", - map[string][]string{"slash.reason": {"missing_signature"}, "slash.power": {"500"}}, - false, - false, - false, - }, - {"slash.reason EXISTS", - map[string][]string{"transfer.recipient": {"cosmos1gu6y2a0ffteesyeyeesk23082c6998xyzmt9mz"}, - "transfer.sender": {"cosmos1crje20aj4gxdtyct7z3knxqry2jqt2fuaey6u5"}}, - false, - false, - false, - }, + {`tm.events.type='NewBlock'`, + newTestEvents(`tm|events.type=NewBlock`), + true}, + {`tx.gas > 7`, + newTestEvents(`tx|gas=8`), + true}, + {`transfer.amount > 7`, + newTestEvents(`transfer|amount=8stake`), + true}, + {`transfer.amount > 7`, + newTestEvents(`transfer|amount=8.045`), + true}, + {`transfer.amount > 7.043`, + newTestEvents(`transfer|amount=8.045stake`), + true}, + {`transfer.amount > 8.045`, + newTestEvents(`transfer|amount=8.045stake`), + false}, + {`tx.gas > 7 AND tx.gas < 9`, + newTestEvents(`tx|gas=8`), + true}, + {`body.weight >= 3.5`, + newTestEvents(`body|weight=3.5`), + true}, + {`account.balance < 1000.0`, + newTestEvents(`account|balance=900`), + true}, + {`apples.kg <= 4`, + newTestEvents(`apples|kg=4.0`), + true}, + {`body.weight >= 4.5`, + newTestEvents(`body|weight=4.5`), + true}, + {`oranges.kg < 4 AND watermellons.kg > 10`, + newTestEvents(`oranges|kg=3`, `watermellons|kg=12`), + true}, + {`peaches.kg < 4`, + newTestEvents(`peaches|kg=5`), + false}, + {`tx.date > DATE 2017-01-01`, + newTestEvents(`tx|date=` + time.Now().Format(syntax.DateFormat)), + true}, + {`tx.date = DATE 2017-01-01`, + newTestEvents(`tx|date=` + txDate), + true}, + {`tx.date = DATE 2018-01-01`, + newTestEvents(`tx|date=` + txDate), + false}, + {`tx.time >= TIME 2013-05-03T14:45:00Z`, + newTestEvents(`tx|time=` + time.Now().Format(syntax.TimeFormat)), + true}, + {`tx.time = TIME 2013-05-03T14:45:00Z`, + newTestEvents(`tx|time=` + txTime), + false}, + {`abci.owner.name CONTAINS 'Igor'`, + newTestEvents(`abci|owner.name=Igor|owner.name=Ivan`), + true}, + {`abci.owner.name CONTAINS 'Igor'`, + newTestEvents(`abci|owner.name=Pavel|owner.name=Ivan`), + false}, + {`abci.owner.name = 'Igor'`, + newTestEvents(`abci|owner.name=Igor|owner.name=Ivan`), + true}, + {`abci.owner.name = 'Ivan'`, + newTestEvents(`abci|owner.name=Igor|owner.name=Ivan`), + true}, + {`abci.owner.name = 'Ivan' AND abci.owner.name = 'Igor'`, + newTestEvents(`abci|owner.name=Igor|owner.name=Ivan`), + true}, + {`abci.owner.name = 'Ivan' AND abci.owner.name = 'John'`, + newTestEvents(`abci|owner.name=Igor|owner.name=Ivan`), + false}, + {`tm.events.type='NewBlock'`, + newTestEvents(`tm|events.type=NewBlock`, `app|name=fuzzed`), + true}, + {`app.name = 'fuzzed'`, + newTestEvents(`tm|events.type=NewBlock`, `app|name=fuzzed`), + true}, + {`tm.events.type='NewBlock' AND app.name = 'fuzzed'`, + newTestEvents(`tm|events.type=NewBlock`, `app|name=fuzzed`), + true}, + {`tm.events.type='NewHeader' AND app.name = 'fuzzed'`, + newTestEvents(`tm|events.type=NewBlock`, `app|name=fuzzed`), + false}, + {`slash EXISTS`, + newTestEvents(`slash|reason=missing_signature|power=6000`), + true}, + {`slash EXISTS`, + newTestEvents(`transfer|recipient=cosmos1gu6y2a0ffteesyeyeesk23082c6998xyzmt9mz|sender=cosmos1crje20aj4gxdtyct7z3knxqry2jqt2fuaey6u5`), + false}, + {`slash.reason EXISTS AND slash.power > 1000`, + newTestEvents(`slash|reason=missing_signature|power=6000`), + true}, + {`slash.reason EXISTS AND slash.power > 1000`, + newTestEvents(`slash|reason=missing_signature|power=500`), + false}, + {`slash.reason EXISTS`, + newTestEvents(`transfer|recipient=cosmos1gu6y2a0ffteesyeyeesk23082c6998xyzmt9mz|sender=cosmos1crje20aj4gxdtyct7z3knxqry2jqt2fuaey6u5`), + false}, + + // Test cases based on the OpenAPI examples. + {`tm.event = 'Tx' AND rewards.withdraw.address = 'AddrA'`, + apiEvents, true}, + {`tm.event = 'Tx' AND rewards.withdraw.address = 'AddrA' AND rewards.withdraw.source = 'SrcY'`, + apiEvents, true}, + {`tm.event = 'Tx' AND transfer.sender = 'AddrA'`, + apiEvents, false}, + {`tm.event = 'Tx' AND transfer.sender = 'AddrC'`, + apiEvents, true}, + {`tm.event = 'Tx' AND transfer.sender = 'AddrZ'`, + apiEvents, false}, + {`tm.event = 'Tx' AND rewards.withdraw.address = 'AddrZ'`, + apiEvents, false}, + {`tm.event = 'Tx' AND rewards.withdraw.source = 'W'`, + apiEvents, false}, } - for _, tc := range testCases { - q, err := query.New(tc.s) - if !tc.err { - require.Nil(t, err) - } - require.NotNil(t, q, "Query '%s' should not be nil", tc.s) + // NOTE: The original implementation allowed arbitrary prefix matches on + // attribute tags, e.g., "sl" would match "slash". + // + // That is weird and probably wrong: "foo.ba" should not match "foo.bar", + // or there is no way to distinguish the case where there were two values + // for "foo.bar" or one value each for "foo.ba" and "foo.bar". + // + // Apart from a single test case, I could not find any attested usage of + // this implementation detail. It isn't documented in the OpenAPI docs and + // is not shown in any of the example inputs. + // + // On that basis, I removed that test case. This implementation still does + // correctly handle variable type/attribute splits ("x", "y.z" / "x.y", "z") + // since that was required by the original "flattened" event representation. - rawEvents := expandEvents(tc.events) + for i, tc := range testCases { + t.Run(fmt.Sprintf("%02d", i+1), func(t *testing.T) { + c, err := query.New(tc.s) + if err != nil { + t.Fatalf("NewCompiled %#q: unexpected error: %v", tc.s, err) + } - if tc.matches { - match, err := q.Matches(rawEvents) - require.Nil(t, err, "Query '%s' should not error on match %v", tc.s, tc.events) - require.True(t, match, "Query '%s' should match %v", tc.s, tc.events) - } else { - match, err := q.Matches(rawEvents) - require.Equal(t, tc.matchErr, err != nil, "Unexpected error for query '%s' match %v", tc.s, tc.events) - require.False(t, match, "Query '%s' should not match %v", tc.s, tc.events) - } + got, err := c.Matches(tc.events) + if err != nil { + t.Errorf("Query: %#q\nInput: %+v\nMatches: got error %v", + tc.s, tc.events, err) + } + if got != tc.matches { + t.Errorf("Query: %#q\nInput: %+v\nMatches: got %v, want %v", + tc.s, tc.events, got, tc.matches) + } + }) } } -func TestMustParse(t *testing.T) { - require.Panics(t, func() { query.MustParse("=") }) - require.NotPanics(t, func() { query.MustParse("tm.events.type='NewBlock'") }) +func TestAllMatchesAll(t *testing.T) { + events := newTestEvents( + ``, + `Asher|Roth=`, + `Route|66=`, + `Rilly|Blue=`, + ) + for i := 0; i < len(events); i++ { + match, err := query.All.Matches(events[:i]) + if err != nil { + t.Errorf("Matches failed: %v", err) + } else if !match { + t.Errorf("Did not match on %+v ", events[:i]) + } + } } -func TestConditions(t *testing.T) { - txTime, err := time.Parse(time.RFC3339, "2013-05-03T14:45:00Z") - require.NoError(t, err) - - testCases := []struct { - s string - conditions []query.Condition - }{ - { - s: "tm.events.type='NewBlock'", - conditions: []query.Condition{ - {CompositeKey: "tm.events.type", Op: query.OpEqual, Operand: "NewBlock"}, - }, - }, - { - s: "tx.gas > 7 AND tx.gas < 9", - conditions: []query.Condition{ - {CompositeKey: "tx.gas", Op: query.OpGreater, Operand: int64(7)}, - {CompositeKey: "tx.gas", Op: query.OpLess, Operand: int64(9)}, - }, - }, - { - s: "tx.time >= TIME 2013-05-03T14:45:00Z", - conditions: []query.Condition{ - {CompositeKey: "tx.time", Op: query.OpGreaterEqual, Operand: txTime}, - }, - }, - { - s: "slashing EXISTS", - conditions: []query.Condition{ - {CompositeKey: "slashing", Op: query.OpExists}, - }, - }, +// newTestEvent constructs an Event message from a template string. +// The format is "type|attr1=val1|attr2=val2|...". +func newTestEvent(s string) types.Event { + var event types.Event + parts := strings.Split(s, "|") + event.Type = parts[0] + if len(parts) == 1 { + return event // type only, no attributes } + for _, kv := range parts[1:] { + key, val := splitKV(kv) + event.Attributes = append(event.Attributes, types.EventAttribute{ + Key: key, + Value: val, + }) + } + return event +} - for _, tc := range testCases { - q, err := query.New(tc.s) - require.Nil(t, err) - - c, err := q.Conditions() - require.NoError(t, err) - require.Equal(t, tc.conditions, c) +// newTestEvents constructs a slice of Event messages by applying newTestEvent +// to each element of ss. +func newTestEvents(ss ...string) []types.Event { + events := make([]types.Event, len(ss)) + for i, s := range ss { + events[i] = newTestEvent(s) } + return events +} + +func splitKV(s string) (key, value string) { + kv := strings.SplitN(s, "=", 2) + return kv[0], kv[1] } diff --git a/libs/pubsub/query/syntax/doc.go b/libs/pubsub/query/syntax/doc.go new file mode 100644 index 000000000..e7a9896c4 --- /dev/null +++ b/libs/pubsub/query/syntax/doc.go @@ -0,0 +1,34 @@ +// Package syntax defines a scanner and parser for the Tendermint event filter +// query language. A query selects events by their types and attribute values. +// +// Grammar +// +// The grammar of the query language is defined by the following EBNF: +// +// query = conditions EOF +// conditions = condition {"AND" condition} +// condition = tag comparison +// comparison = equal / order / contains / "EXISTS" +// equal = "=" (date / number / time / value) +// order = cmp (date / number / time) +// contains = "CONTAINS" value +// cmp = "<" / "<=" / ">" / ">=" +// +// The lexical terms are defined here using RE2 regular expression notation: +// +// // The name of an event attribute (type.value) +// tag = #'\w+(\.\w+)*' +// +// // A datestamp (YYYY-MM-DD) +// date = #'DATE \d{4}-\d{2}-\d{2}' +// +// // A number with optional fractional parts (0, 10, 3.25) +// number = #'\d+(\.\d+)?' +// +// // An RFC3339 timestamp (2021-11-23T22:04:19-09:00) +// time = #'TIME \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}([-+]\d{2}:\d{2}|Z)' +// +// // A quoted literal string value ('a b c') +// value = #'\'[^\']*\'' +// +package syntax diff --git a/libs/pubsub/query/syntax/parser.go b/libs/pubsub/query/syntax/parser.go new file mode 100644 index 000000000..a100ec79c --- /dev/null +++ b/libs/pubsub/query/syntax/parser.go @@ -0,0 +1,213 @@ +package syntax + +import ( + "fmt" + "io" + "math" + "strconv" + "strings" + "time" +) + +// Parse parses the specified query string. It is shorthand for constructing a +// parser for s and calling its Parse method. +func Parse(s string) (Query, error) { + return NewParser(strings.NewReader(s)).Parse() +} + +// Query is the root of the parse tree for a query. A query is the conjunction +// of one or more conditions. +type Query []Condition + +func (q Query) String() string { + ss := make([]string, len(q)) + for i, cond := range q { + ss[i] = cond.String() + } + return strings.Join(ss, " AND ") +} + +// A Condition is a single conditional expression, consisting of a tag, a +// comparison operator, and an optional argument. The type of the argument +// depends on the operator. +type Condition struct { + Tag string + Op Token + Arg *Arg + + opText string +} + +func (c Condition) String() string { + s := c.Tag + " " + c.opText + if c.Arg != nil { + return s + " " + c.Arg.String() + } + return s +} + +// An Arg is the argument of a comparison operator. +type Arg struct { + Type Token + text string +} + +func (a *Arg) String() string { + if a == nil { + return "" + } + switch a.Type { + case TString: + return "'" + a.text + "'" + case TTime: + return "TIME " + a.text + case TDate: + return "DATE " + a.text + default: + return a.text + } +} + +// Number returns the value of the argument text as a number, or a NaN if the +// text does not encode a valid number value. +func (a *Arg) Number() float64 { + if a == nil { + return -1 + } + v, err := strconv.ParseFloat(a.text, 64) + if err == nil && v >= 0 { + return v + } + return math.NaN() +} + +// Time returns the value of the argument text as a time, or the zero value if +// the text does not encode a timestamp or datestamp. +func (a *Arg) Time() time.Time { + var ts time.Time + if a == nil { + return ts + } + var err error + switch a.Type { + case TDate: + ts, err = ParseDate(a.text) + case TTime: + ts, err = ParseTime(a.text) + } + if err == nil { + return ts + } + return time.Time{} +} + +// Value returns the value of the argument text as a string, or "". +func (a *Arg) Value() string { + if a == nil { + return "" + } + return a.text +} + +// Parser is a query expression parser. The grammar for query expressions is +// defined in the syntax package documentation. +type Parser struct { + scanner *Scanner +} + +// NewParser constructs a new parser that reads the input from r. +func NewParser(r io.Reader) *Parser { + return &Parser{scanner: NewScanner(r)} +} + +// Parse parses the complete input and returns the resulting query. +func (p *Parser) Parse() (Query, error) { + cond, err := p.parseCond() + if err != nil { + return nil, err + } + conds := []Condition{cond} + for p.scanner.Next() != io.EOF { + if tok := p.scanner.Token(); tok != TAnd { + return nil, fmt.Errorf("offset %d: got %v, want %v", p.scanner.Pos(), tok, TAnd) + } + cond, err := p.parseCond() + if err != nil { + return nil, err + } + conds = append(conds, cond) + } + return conds, nil +} + +// parseCond parses a conditional expression: tag OP value. +func (p *Parser) parseCond() (Condition, error) { + var cond Condition + if err := p.require(TTag); err != nil { + return cond, err + } + cond.Tag = p.scanner.Text() + if err := p.require(TLeq, TGeq, TLt, TGt, TEq, TContains, TExists); err != nil { + return cond, err + } + cond.Op = p.scanner.Token() + cond.opText = p.scanner.Text() + + var err error + switch cond.Op { + case TLeq, TGeq, TLt, TGt: + err = p.require(TNumber, TTime, TDate) + case TEq: + err = p.require(TNumber, TTime, TDate, TString) + case TContains: + err = p.require(TString) + case TExists: + // no argument + return cond, nil + default: + return cond, fmt.Errorf("offset %d: unexpected operator %v", p.scanner.Pos(), cond.Op) + } + if err != nil { + return cond, err + } + cond.Arg = &Arg{Type: p.scanner.Token(), text: p.scanner.Text()} + return cond, nil +} + +// require advances the scanner and requires that the resulting token is one of +// the specified token types. +func (p *Parser) require(tokens ...Token) error { + if err := p.scanner.Next(); err != nil { + return fmt.Errorf("offset %d: %w", p.scanner.Pos(), err) + } + got := p.scanner.Token() + for _, tok := range tokens { + if tok == got { + return nil + } + } + return fmt.Errorf("offset %d: got %v, wanted %s", p.scanner.Pos(), got, tokLabel(tokens)) +} + +// tokLabel makes a human-readable summary string for the given token types. +func tokLabel(tokens []Token) string { + if len(tokens) == 1 { + return tokens[0].String() + } + last := len(tokens) - 1 + ss := make([]string, len(tokens)-1) + for i, tok := range tokens[:last] { + ss[i] = tok.String() + } + return strings.Join(ss, ", ") + " or " + tokens[last].String() +} + +// ParseDate parses s as a date string in the format used by DATE values. +func ParseDate(s string) (time.Time, error) { + return time.Parse("2006-01-02", s) +} + +// ParseTime parses s as a timestamp in the format used by TIME values. +func ParseTime(s string) (time.Time, error) { + return time.Parse(time.RFC3339, s) +} diff --git a/libs/pubsub/query/syntax/scanner.go b/libs/pubsub/query/syntax/scanner.go new file mode 100644 index 000000000..332e3f7b1 --- /dev/null +++ b/libs/pubsub/query/syntax/scanner.go @@ -0,0 +1,312 @@ +package syntax + +import ( + "bufio" + "bytes" + "fmt" + "io" + "strings" + "time" + "unicode" +) + +// Token is the type of a lexical token in the query grammar. +type Token byte + +const ( + TInvalid = iota // invalid or unknown token + TTag // field tag: x.y + TString // string value: 'foo bar' + TNumber // number: 0, 15.5, 100 + TTime // timestamp: TIME yyyy-mm-ddThh:mm:ss([-+]hh:mm|Z) + TDate // datestamp: DATE yyyy-mm-dd + TAnd // operator: AND + TContains // operator: CONTAINS + TExists // operator: EXISTS + TEq // operator: = + TLt // operator: < + TLeq // operator: <= + TGt // operator: > + TGeq // operator: >= + + // Do not reorder these values without updating the scanner code. +) + +var tString = [...]string{ + TInvalid: "invalid token", + TTag: "tag", + TString: "string", + TNumber: "number", + TTime: "timestamp", + TDate: "datestamp", + TAnd: "AND operator", + TContains: "CONTAINS operator", + TExists: "EXISTS operator", + TEq: "= operator", + TLt: "< operator", + TLeq: "<= operator", + TGt: "> operator", + TGeq: ">= operator", +} + +func (t Token) String() string { + v := int(t) + if v > len(tString) { + return "unknown token type" + } + return tString[v] +} + +const ( + // TimeFormat is the format string used for timestamp values. + TimeFormat = time.RFC3339 + + // DateFormat is the format string used for datestamp values. + DateFormat = "2006-01-02" +) + +// Scanner reads lexical tokens of the query language from an input stream. +// Each call to Next advances the scanner to the next token, or reports an +// error. +type Scanner struct { + r *bufio.Reader + buf bytes.Buffer + tok Token + err error + + pos, last, end int +} + +// NewScanner constructs a new scanner that reads from r. +func NewScanner(r io.Reader) *Scanner { return &Scanner{r: bufio.NewReader(r)} } + +// Next advances s to the next token in the input, or reports an error. At the +// end of input, Next returns io.EOF. +func (s *Scanner) Next() error { + s.buf.Reset() + s.pos = s.end + s.tok = TInvalid + s.err = nil + + for { + ch, err := s.rune() + if err != nil { + return s.fail(err) + } + if unicode.IsSpace(ch) { + s.pos = s.end + continue // skip whitespace + } + if '0' <= ch && ch <= '9' { + return s.scanNumber(ch) + } else if isTagRune(ch) { + return s.scanTagLike(ch) + } + switch ch { + case '\'': + return s.scanString(ch) + case '<', '>', '=': + return s.scanCompare(ch) + default: + return s.invalid(ch) + } + } +} + +// Token returns the type of the current input token. +func (s *Scanner) Token() Token { return s.tok } + +// Text returns the text of the current input token. +func (s *Scanner) Text() string { return s.buf.String() } + +// Pos returns the start offset of the current token in the input. +func (s *Scanner) Pos() int { return s.pos } + +// Err returns the last error reported by Next, if any. +func (s *Scanner) Err() error { return s.err } + +// scanNumber scans for numbers with optional fractional parts. +// Examples: 0, 1, 3.14 +func (s *Scanner) scanNumber(first rune) error { + s.buf.WriteRune(first) + if err := s.scanWhile(isDigit); err != nil { + return err + } + + ch, err := s.rune() + if err != nil && err != io.EOF { + return err + } + if ch == '.' { + s.buf.WriteRune(ch) + if err := s.scanWhile(isDigit); err != nil { + return err + } + } else { + s.unrune() + } + s.tok = TNumber + return nil +} + +func (s *Scanner) scanString(first rune) error { + // discard opening quote + for { + ch, err := s.rune() + if err != nil { + return s.fail(err) + } else if ch == first { + // discard closing quote + s.tok = TString + return nil + } + s.buf.WriteRune(ch) + } +} + +func (s *Scanner) scanCompare(first rune) error { + s.buf.WriteRune(first) + switch first { + case '=': + s.tok = TEq + return nil + case '<': + s.tok = TLt + case '>': + s.tok = TGt + default: + return s.invalid(first) + } + + ch, err := s.rune() + if err == io.EOF { + return nil // the assigned token is correct + } else if err != nil { + return s.fail(err) + } + if ch == '=' { + s.buf.WriteRune(ch) + s.tok++ // depends on token order + return nil + } + s.unrune() + return nil +} + +func (s *Scanner) scanTagLike(first rune) error { + s.buf.WriteRune(first) + var hasSpace bool + for { + ch, err := s.rune() + if err == io.EOF { + break + } else if err != nil { + return s.fail(err) + } + if !isTagRune(ch) { + hasSpace = ch == ' ' // to check for TIME, DATE + break + } + s.buf.WriteRune(ch) + } + + text := s.buf.String() + switch text { + case "TIME": + if hasSpace { + return s.scanTimestamp() + } + s.tok = TTag + case "DATE": + if hasSpace { + return s.scanDatestamp() + } + s.tok = TTag + case "AND": + s.tok = TAnd + case "EXISTS": + s.tok = TExists + case "CONTAINS": + s.tok = TContains + default: + s.tok = TTag + } + s.unrune() + return nil +} + +func (s *Scanner) scanTimestamp() error { + s.buf.Reset() // discard "TIME" label + if err := s.scanWhile(isTimeRune); err != nil { + return err + } + if ts, err := time.Parse(TimeFormat, s.buf.String()); err != nil { + return s.fail(fmt.Errorf("invalid TIME value: %w", err)) + } else if y := ts.Year(); y < 1900 || y > 2999 { + return s.fail(fmt.Errorf("timestamp year %d out of range", ts.Year())) + } + s.tok = TTime + return nil +} + +func (s *Scanner) scanDatestamp() error { + s.buf.Reset() // discard "DATE" label + if err := s.scanWhile(isDateRune); err != nil { + return err + } + if ts, err := time.Parse(DateFormat, s.buf.String()); err != nil { + return s.fail(fmt.Errorf("invalid DATE value: %w", err)) + } else if y := ts.Year(); y < 1900 || y > 2999 { + return s.fail(fmt.Errorf("datestamp year %d out of range", ts.Year())) + } + s.tok = TDate + return nil +} + +func (s *Scanner) scanWhile(ok func(rune) bool) error { + for { + ch, err := s.rune() + if err == io.EOF { + return nil + } else if err != nil { + return s.fail(err) + } else if !ok(ch) { + s.unrune() + return nil + } + s.buf.WriteRune(ch) + } +} + +func (s *Scanner) rune() (rune, error) { + ch, nb, err := s.r.ReadRune() + s.last = nb + s.end += nb + return ch, err +} + +func (s *Scanner) unrune() { + _ = s.r.UnreadRune() + s.end -= s.last +} + +func (s *Scanner) fail(err error) error { + s.err = err + return err +} + +func (s *Scanner) invalid(ch rune) error { + return s.fail(fmt.Errorf("invalid input %c at offset %d", ch, s.end)) +} + +func isDigit(r rune) bool { return '0' <= r && r <= '9' } + +func isTagRune(r rune) bool { + return r == '.' || r == '_' || unicode.IsLetter(r) || unicode.IsDigit(r) +} + +func isTimeRune(r rune) bool { + return strings.ContainsRune("-T:+Z", r) || isDigit(r) +} + +func isDateRune(r rune) bool { return isDigit(r) || r == '-' } diff --git a/libs/pubsub/query/syntax/syntax_test.go b/libs/pubsub/query/syntax/syntax_test.go new file mode 100644 index 000000000..ac95fd8b1 --- /dev/null +++ b/libs/pubsub/query/syntax/syntax_test.go @@ -0,0 +1,190 @@ +package syntax_test + +import ( + "io" + "reflect" + "strings" + "testing" + + "github.com/tendermint/tendermint/libs/pubsub/query/syntax" +) + +func TestScanner(t *testing.T) { + tests := []struct { + input string + want []syntax.Token + }{ + // Empty inputs + {"", nil}, + {" ", nil}, + {"\t\n ", nil}, + + // Numbers + {`0 123`, []syntax.Token{syntax.TNumber, syntax.TNumber}}, + {`0.32 3.14`, []syntax.Token{syntax.TNumber, syntax.TNumber}}, + + // Tags + {`foo foo.bar`, []syntax.Token{syntax.TTag, syntax.TTag}}, + + // Strings (values) + {` '' x 'x' 'x y'`, []syntax.Token{syntax.TString, syntax.TTag, syntax.TString, syntax.TString}}, + {` 'you are not your job' `, []syntax.Token{syntax.TString}}, + + // Comparison operators + {`< <= = > >=`, []syntax.Token{ + syntax.TLt, syntax.TLeq, syntax.TEq, syntax.TGt, syntax.TGeq, + }}, + + // Mixed values of various kinds. + {`x AND y`, []syntax.Token{syntax.TTag, syntax.TAnd, syntax.TTag}}, + {`x.y CONTAINS 'z'`, []syntax.Token{syntax.TTag, syntax.TContains, syntax.TString}}, + {`foo EXISTS`, []syntax.Token{syntax.TTag, syntax.TExists}}, + {`and AND`, []syntax.Token{syntax.TTag, syntax.TAnd}}, + + // Timestamp + {`TIME 2021-11-23T15:16:17Z`, []syntax.Token{syntax.TTime}}, + + // Datestamp + {`DATE 2021-11-23`, []syntax.Token{syntax.TDate}}, + } + + for _, test := range tests { + s := syntax.NewScanner(strings.NewReader(test.input)) + var got []syntax.Token + for s.Next() == nil { + got = append(got, s.Token()) + } + if err := s.Err(); err != io.EOF { + t.Errorf("Next: unexpected error: %v", err) + } + + if !reflect.DeepEqual(got, test.want) { + t.Logf("Scanner input: %q", test.input) + t.Errorf("Wrong tokens:\ngot: %+v\nwant: %+v", got, test.want) + } + } +} + +func TestScannerErrors(t *testing.T) { + tests := []struct { + input string + }{ + {`'incomplete string`}, + {`-23`}, + {`&`}, + {`DATE xyz-pdq`}, + {`DATE xyzp-dq-zv`}, + {`DATE 0000-00-00`}, + {`DATE 0000-00-000`}, + {`DATE 2021-01-99`}, + {`TIME 2021-01-01T34:56:78Z`}, + {`TIME 2021-01-99T14:56:08Z`}, + {`TIME 2021-01-99T34:56:08`}, + {`TIME 2021-01-99T34:56:11+3`}, + } + for _, test := range tests { + s := syntax.NewScanner(strings.NewReader(test.input)) + if err := s.Next(); err == nil { + t.Errorf("Next: got %v (%#q), want error", s.Token(), s.Text()) + } + } +} + +// These parser tests were copied from the original implementation of the query +// parser, and are preserved here as a compatibility check. +func TestParseValid(t *testing.T) { + tests := []struct { + input string + valid bool + }{ + {"tm.events.type='NewBlock'", true}, + {"tm.events.type = 'NewBlock'", true}, + {"tm.events.name = ''", true}, + {"tm.events.type='TIME'", true}, + {"tm.events.type='DATE'", true}, + {"tm.events.type='='", true}, + {"tm.events.type='TIME", false}, + {"tm.events.type=TIME'", false}, + {"tm.events.type==", false}, + {"tm.events.type=NewBlock", false}, + {">==", false}, + {"tm.events.type 'NewBlock' =", false}, + {"tm.events.type>'NewBlock'", false}, + {"", false}, + {"=", false}, + {"='NewBlock'", false}, + {"tm.events.type=", false}, + + {"tm.events.typeNewBlock", false}, + {"tm.events.type'NewBlock'", false}, + {"'NewBlock'", false}, + {"NewBlock", false}, + {"", false}, + + {"tm.events.type='NewBlock' AND abci.account.name='Igor'", true}, + {"tm.events.type='NewBlock' AND", false}, + {"tm.events.type='NewBlock' AN", false}, + {"tm.events.type='NewBlock' AN tm.events.type='NewBlockHeader'", false}, + {"AND tm.events.type='NewBlock' ", false}, + + {"abci.account.name CONTAINS 'Igor'", true}, + + {"tx.date > DATE 2013-05-03", true}, + {"tx.date < DATE 2013-05-03", true}, + {"tx.date <= DATE 2013-05-03", true}, + {"tx.date >= DATE 2013-05-03", true}, + {"tx.date >= DAT 2013-05-03", false}, + {"tx.date <= DATE2013-05-03", false}, + {"tx.date <= DATE -05-03", false}, + {"tx.date >= DATE 20130503", false}, + {"tx.date >= DATE 2013+01-03", false}, + // incorrect year, month, day + {"tx.date >= DATE 0013-01-03", false}, + {"tx.date >= DATE 2013-31-03", false}, + {"tx.date >= DATE 2013-01-83", false}, + + {"tx.date > TIME 2013-05-03T14:45:00+07:00", true}, + {"tx.date < TIME 2013-05-03T14:45:00-02:00", true}, + {"tx.date <= TIME 2013-05-03T14:45:00Z", true}, + {"tx.date >= TIME 2013-05-03T14:45:00Z", true}, + {"tx.date >= TIME2013-05-03T14:45:00Z", false}, + {"tx.date = IME 2013-05-03T14:45:00Z", false}, + {"tx.date = TIME 2013-05-:45:00Z", false}, + {"tx.date >= TIME 2013-05-03T14:45:00", false}, + {"tx.date >= TIME 0013-00-00T14:45:00Z", false}, + {"tx.date >= TIME 2013+05=03T14:45:00Z", false}, + + {"account.balance=100", true}, + {"account.balance >= 200", true}, + {"account.balance >= -300", false}, + {"account.balance >>= 400", false}, + {"account.balance=33.22.1", false}, + + {"slashing.amount EXISTS", true}, + {"slashing.amount EXISTS AND account.balance=100", true}, + {"account.balance=100 AND slashing.amount EXISTS", true}, + {"slashing EXISTS", true}, + + {"hash='136E18F7E4C348B780CF873A0BF43922E5BAFA63'", true}, + {"hash=136E18F7E4C348B780CF873A0BF43922E5BAFA63", false}, + } + + for _, test := range tests { + q, err := syntax.Parse(test.input) + if test.valid != (err == nil) { + t.Errorf("Parse %#q: valid %v got err=%v", test.input, test.valid, err) + } + + // For valid queries, check that the query round-trips. + if test.valid { + qstr := q.String() + r, err := syntax.Parse(qstr) + if err != nil { + t.Errorf("Reparse %#q failed: %v", qstr, err) + } + if rstr := r.String(); rstr != qstr { + t.Errorf("Reparse diff\nold: %#q\nnew: %#q", qstr, rstr) + } + } + } +} diff --git a/types/events.go b/types/events.go index 46f150abd..7bb183fb7 100644 --- a/types/events.go +++ b/types/events.go @@ -232,11 +232,11 @@ var ( ) func EventQueryTxFor(tx Tx) tmpubsub.Query { - return tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s='%X'", EventTypeKey, EventTxValue, TxHashKey, tx.Hash())) + return tmquery.MustCompile(fmt.Sprintf("%s='%s' AND %s='%X'", EventTypeKey, EventTxValue, TxHashKey, tx.Hash())) } func QueryForEvent(eventValue string) tmpubsub.Query { - return tmquery.MustParse(fmt.Sprintf("%s='%s'", EventTypeKey, eventValue)) + return tmquery.MustCompile(fmt.Sprintf("%s='%s'", EventTypeKey, eventValue)) } // BlockEventPublisher publishes all block related events diff --git a/types/events_test.go b/types/events_test.go index dcd998ace..bd4bde264 100644 --- a/types/events_test.go +++ b/types/events_test.go @@ -10,18 +10,18 @@ import ( func TestQueryTxFor(t *testing.T) { tx := Tx("foo") assert.Equal(t, - fmt.Sprintf("tm.event='Tx' AND tx.hash='%X'", tx.Hash()), + fmt.Sprintf("tm.event = 'Tx' AND tx.hash = '%X'", tx.Hash()), EventQueryTxFor(tx).String(), ) } func TestQueryForEvent(t *testing.T) { assert.Equal(t, - "tm.event='NewBlock'", + "tm.event = 'NewBlock'", QueryForEvent(EventNewBlockValue).String(), ) assert.Equal(t, - "tm.event='NewEvidence'", + "tm.event = 'NewEvidence'", QueryForEvent(EventNewEvidenceValue).String(), ) }