Browse Source

Performance improvements for the event query API (#7338)

A manual backport of #7319 and #7336.
pull/7351/head
M. J. Fromberger 3 years ago
committed by GitHub
parent
commit
1c1ce83e2d
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 1438 additions and 2910 deletions
  1. +2
    -0
      CHANGELOG_PENDING.md
  2. +1
    -1
      internal/inspect/inspect_test.go
  3. +10
    -12
      internal/state/indexer/block/kv/kv.go
  4. +9
    -9
      internal/state/indexer/block/kv/kv_test.go
  5. +4
    -4
      internal/state/indexer/block/kv/util.go
  6. +29
    -15
      internal/state/indexer/query_range.go
  7. +13
    -13
      internal/state/indexer/sink/kv/kv_test.go
  8. +17
    -19
      internal/state/indexer/tx/kv/kv.go
  9. +1
    -1
      internal/state/indexer/tx/kv/kv_bench_test.go
  10. +5
    -5
      internal/state/indexer/tx/kv/kv_test.go
  11. +1
    -1
      libs/pubsub/example_test.go
  12. +23
    -23
      libs/pubsub/pubsub_test.go
  13. +0
    -11
      libs/pubsub/query/Makefile
  14. +58
    -0
      libs/pubsub/query/bench_test.go
  15. +0
    -18
      libs/pubsub/query/empty.go
  16. +0
    -55
      libs/pubsub/query/empty_test.go
  17. +0
    -30
      libs/pubsub/query/fuzz_test/main.go
  18. +0
    -97
      libs/pubsub/query/parser_test.go
  19. +0
    -3
      libs/pubsub/query/peg.go
  20. +263
    -463
      libs/pubsub/query/query.go
  21. +0
    -35
      libs/pubsub/query/query.peg
  22. +0
    -1871
      libs/pubsub/query/query.peg.go
  23. +242
    -213
      libs/pubsub/query/query_test.go
  24. +34
    -0
      libs/pubsub/query/syntax/doc.go
  25. +213
    -0
      libs/pubsub/query/syntax/parser.go
  26. +312
    -0
      libs/pubsub/query/syntax/scanner.go
  27. +190
    -0
      libs/pubsub/query/syntax/syntax_test.go
  28. +6
    -6
      types/event_bus_test.go
  29. +2
    -2
      types/events.go
  30. +3
    -3
      types/events_test.go

+ 2
- 0
CHANGELOG_PENDING.md View File

@ -28,6 +28,8 @@ Special thanks to external contributors on this release:
### IMPROVEMENTS
- [\#7338](https://github.com/tendermint/tendermint/pull/7338) pubsub: Performance improvements for the event query API (backport of #7319) (@creachadair)
### BUG FIXES
- [\#7310](https://github.com/tendermint/tendermint/issues/7310) pubsub: Report a non-nil error when shutting down (fixes #7306).

+ 1
- 1
internal/inspect/inspect_test.go View File

@ -114,7 +114,7 @@ func TestBlock(t *testing.T) {
func TestTxSearch(t *testing.T) {
testHash := []byte("test")
testTx := []byte("tx")
testQuery := fmt.Sprintf("tx.hash='%s'", string(testHash))
testQuery := fmt.Sprintf("tx.hash = '%s'", string(testHash))
testTxResult := &abcitypes.TxResult{
Height: 1,
Index: 100,


+ 10
- 12
internal/state/indexer/block/kv/kv.go View File

@ -14,6 +14,7 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/internal/state/indexer"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/libs/pubsub/query/syntax"
"github.com/tendermint/tendermint/types"
)
@ -91,10 +92,7 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64,
default:
}
conditions, err := q.Conditions()
if err != nil {
return nil, fmt.Errorf("failed to parse query conditions: %w", err)
}
conditions := q.Syntax()
// If there is an exact height query, return the result immediately
// (if it exists).
@ -158,7 +156,7 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64,
continue
}
startKey, err := orderedcode.Append(nil, c.CompositeKey, fmt.Sprintf("%v", c.Operand))
startKey, err := orderedcode.Append(nil, c.Tag, c.Arg.Value())
if err != nil {
return nil, err
}
@ -327,7 +325,7 @@ iter:
// matched.
func (idx *BlockerIndexer) match(
ctx context.Context,
c query.Condition,
c syntax.Condition,
startKeyBz []byte,
filteredHeights map[string][]byte,
firstRun bool,
@ -342,7 +340,7 @@ func (idx *BlockerIndexer) match(
tmpHeights := make(map[string][]byte)
switch {
case c.Op == query.OpEqual:
case c.Op == syntax.TEq:
it, err := dbm.IteratePrefix(idx.store, startKeyBz)
if err != nil {
return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
@ -361,8 +359,8 @@ func (idx *BlockerIndexer) match(
return nil, err
}
case c.Op == query.OpExists:
prefix, err := orderedcode.Append(nil, c.CompositeKey)
case c.Op == syntax.TExists:
prefix, err := orderedcode.Append(nil, c.Tag)
if err != nil {
return nil, err
}
@ -389,8 +387,8 @@ func (idx *BlockerIndexer) match(
return nil, err
}
case c.Op == query.OpContains:
prefix, err := orderedcode.Append(nil, c.CompositeKey)
case c.Op == syntax.TContains:
prefix, err := orderedcode.Append(nil, c.Tag)
if err != nil {
return nil, err
}
@ -408,7 +406,7 @@ func (idx *BlockerIndexer) match(
continue
}
if strings.Contains(eventValue, c.Operand.(string)) {
if strings.Contains(eventValue, c.Arg.Value()) {
tmpHeights[string(it.Value())] = it.Value()
}


+ 9
- 9
internal/state/indexer/block/kv/kv_test.go View File

@ -94,39 +94,39 @@ func TestBlockIndexer(t *testing.T) {
results []int64
}{
"block.height = 100": {
q: query.MustParse("block.height = 100"),
q: query.MustCompile(`block.height = 100`),
results: []int64{},
},
"block.height = 5": {
q: query.MustParse("block.height = 5"),
q: query.MustCompile(`block.height = 5`),
results: []int64{5},
},
"begin_event.key1 = 'value1'": {
q: query.MustParse("begin_event.key1 = 'value1'"),
q: query.MustCompile(`begin_event.key1 = 'value1'`),
results: []int64{},
},
"begin_event.proposer = 'FCAA001'": {
q: query.MustParse("begin_event.proposer = 'FCAA001'"),
q: query.MustCompile(`begin_event.proposer = 'FCAA001'`),
results: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
},
"end_event.foo <= 5": {
q: query.MustParse("end_event.foo <= 5"),
q: query.MustCompile(`end_event.foo <= 5`),
results: []int64{2, 4},
},
"end_event.foo >= 100": {
q: query.MustParse("end_event.foo >= 100"),
q: query.MustCompile(`end_event.foo >= 100`),
results: []int64{1},
},
"block.height > 2 AND end_event.foo <= 8": {
q: query.MustParse("block.height > 2 AND end_event.foo <= 8"),
q: query.MustCompile(`block.height > 2 AND end_event.foo <= 8`),
results: []int64{4, 6, 8},
},
"begin_event.proposer CONTAINS 'FFFFFFF'": {
q: query.MustParse("begin_event.proposer CONTAINS 'FFFFFFF'"),
q: query.MustCompile(`begin_event.proposer CONTAINS 'FFFFFFF'`),
results: []int64{},
},
"begin_event.proposer CONTAINS 'FCAA001'": {
q: query.MustParse("begin_event.proposer CONTAINS 'FCAA001'"),
q: query.MustCompile(`begin_event.proposer CONTAINS 'FCAA001'`),
results: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
},
}


+ 4
- 4
internal/state/indexer/block/kv/util.go View File

@ -6,7 +6,7 @@ import (
"strconv"
"github.com/google/orderedcode"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/libs/pubsub/query/syntax"
"github.com/tendermint/tendermint/types"
)
@ -85,10 +85,10 @@ func parseValueFromEventKey(key []byte) (string, error) {
return eventValue, nil
}
func lookForHeight(conditions []query.Condition) (int64, bool) {
func lookForHeight(conditions []syntax.Condition) (int64, bool) {
for _, c := range conditions {
if c.CompositeKey == types.BlockHeightKey && c.Op == query.OpEqual {
return c.Operand.(int64), true
if c.Tag == types.BlockHeightKey && c.Op == syntax.TEq {
return int64(c.Arg.Number()), true
}
}


+ 29
- 15
internal/state/indexer/query_range.go View File

@ -3,7 +3,7 @@ package indexer
import (
"time"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/libs/pubsub/query/syntax"
)
// QueryRanges defines a mapping between a composite event key and a QueryRange.
@ -77,32 +77,32 @@ func (qr QueryRange) UpperBoundValue() interface{} {
// LookForRanges returns a mapping of QueryRanges and the matching indexes in
// the provided query conditions.
func LookForRanges(conditions []query.Condition) (ranges QueryRanges, indexes []int) {
func LookForRanges(conditions []syntax.Condition) (ranges QueryRanges, indexes []int) {
ranges = make(QueryRanges)
for i, c := range conditions {
if IsRangeOperation(c.Op) {
r, ok := ranges[c.CompositeKey]
r, ok := ranges[c.Tag]
if !ok {
r = QueryRange{Key: c.CompositeKey}
r = QueryRange{Key: c.Tag}
}
switch c.Op {
case query.OpGreater:
r.LowerBound = c.Operand
case syntax.TGt:
r.LowerBound = conditionArg(c)
case query.OpGreaterEqual:
case syntax.TGeq:
r.IncludeLowerBound = true
r.LowerBound = c.Operand
r.LowerBound = conditionArg(c)
case query.OpLess:
r.UpperBound = c.Operand
case syntax.TLt:
r.UpperBound = conditionArg(c)
case query.OpLessEqual:
case syntax.TLeq:
r.IncludeUpperBound = true
r.UpperBound = c.Operand
r.UpperBound = conditionArg(c)
}
ranges[c.CompositeKey] = r
ranges[c.Tag] = r
indexes = append(indexes, i)
}
}
@ -112,12 +112,26 @@ func LookForRanges(conditions []query.Condition) (ranges QueryRanges, indexes []
// IsRangeOperation returns a boolean signifying if a query Operator is a range
// operation or not.
func IsRangeOperation(op query.Operator) bool {
func IsRangeOperation(op syntax.Token) bool {
switch op {
case query.OpGreater, query.OpGreaterEqual, query.OpLess, query.OpLessEqual:
case syntax.TGt, syntax.TGeq, syntax.TLt, syntax.TLeq:
return true
default:
return false
}
}
func conditionArg(c syntax.Condition) interface{} {
if c.Arg == nil {
return nil
}
switch c.Arg.Type {
case syntax.TNumber:
return int64(c.Arg.Number())
case syntax.TTime, syntax.TDate:
return c.Arg.Time()
default:
return c.Arg.Value() // string
}
}

+ 13
- 13
internal/state/indexer/sink/kv/kv_test.go View File

@ -111,39 +111,39 @@ func TestBlockFuncs(t *testing.T) {
results []int64
}{
"block.height = 100": {
q: query.MustParse("block.height = 100"),
q: query.MustCompile(`block.height = 100`),
results: []int64{},
},
"block.height = 5": {
q: query.MustParse("block.height = 5"),
q: query.MustCompile(`block.height = 5`),
results: []int64{5},
},
"begin_event.key1 = 'value1'": {
q: query.MustParse("begin_event.key1 = 'value1'"),
q: query.MustCompile(`begin_event.key1 = 'value1'`),
results: []int64{},
},
"begin_event.proposer = 'FCAA001'": {
q: query.MustParse("begin_event.proposer = 'FCAA001'"),
q: query.MustCompile(`begin_event.proposer = 'FCAA001'`),
results: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
},
"end_event.foo <= 5": {
q: query.MustParse("end_event.foo <= 5"),
q: query.MustCompile(`end_event.foo <= 5`),
results: []int64{2, 4},
},
"end_event.foo >= 100": {
q: query.MustParse("end_event.foo >= 100"),
q: query.MustCompile(`end_event.foo >= 100`),
results: []int64{1},
},
"block.height > 2 AND end_event.foo <= 8": {
q: query.MustParse("block.height > 2 AND end_event.foo <= 8"),
q: query.MustCompile(`block.height > 2 AND end_event.foo <= 8`),
results: []int64{4, 6, 8},
},
"begin_event.proposer CONTAINS 'FFFFFFF'": {
q: query.MustParse("begin_event.proposer CONTAINS 'FFFFFFF'"),
q: query.MustCompile(`begin_event.proposer CONTAINS 'FFFFFFF'`),
results: []int64{},
},
"begin_event.proposer CONTAINS 'FCAA001'": {
q: query.MustParse("begin_event.proposer CONTAINS 'FCAA001'"),
q: query.MustCompile(`begin_event.proposer CONTAINS 'FCAA001'`),
results: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
},
}
@ -175,7 +175,7 @@ func TestTxSearchWithCancelation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
results, err := indexer.SearchTxEvents(ctx, query.MustParse("account.number = 1"))
results, err := indexer.SearchTxEvents(ctx, query.MustCompile(`account.number = 1`))
assert.NoError(t, err)
assert.Empty(t, results)
}
@ -249,7 +249,7 @@ func TestTxSearchDeprecatedIndexing(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.q, func(t *testing.T) {
results, err := indexer.SearchTxEvents(ctx, query.MustParse(tc.q))
results, err := indexer.SearchTxEvents(ctx, query.MustCompile(tc.q))
require.NoError(t, err)
for _, txr := range results {
for _, tr := range tc.results {
@ -273,7 +273,7 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) {
ctx := context.Background()
results, err := indexer.SearchTxEvents(ctx, query.MustParse("account.number >= 1"))
results, err := indexer.SearchTxEvents(ctx, query.MustCompile(`account.number >= 1`))
assert.NoError(t, err)
assert.Len(t, results, 1)
@ -330,7 +330,7 @@ func TestTxSearchMultipleTxs(t *testing.T) {
ctx := context.Background()
results, err := indexer.SearchTxEvents(ctx, query.MustParse("account.number >= 1"))
results, err := indexer.SearchTxEvents(ctx, query.MustCompile(`account.number >= 1`))
assert.NoError(t, err)
require.Len(t, results, 3)


+ 17
- 19
internal/state/indexer/tx/kv/kv.go View File

@ -14,6 +14,7 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
indexer "github.com/tendermint/tendermint/internal/state/indexer"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/libs/pubsub/query/syntax"
"github.com/tendermint/tendermint/types"
)
@ -148,10 +149,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
filteredHashes := make(map[string][]byte)
// get a list of conditions (like "tx.height > 5")
conditions, err := q.Conditions()
if err != nil {
return nil, fmt.Errorf("error during parsing conditions from query: %w", err)
}
conditions := q.Syntax()
// if there is a hash condition, return the result immediately
hash, ok, err := lookForHash(conditions)
@ -238,10 +236,10 @@ hashes:
return results, nil
}
func lookForHash(conditions []query.Condition) (hash []byte, ok bool, err error) {
func lookForHash(conditions []syntax.Condition) (hash []byte, ok bool, err error) {
for _, c := range conditions {
if c.CompositeKey == types.TxHashKey {
decoded, err := hex.DecodeString(c.Operand.(string))
if c.Tag == types.TxHashKey {
decoded, err := hex.DecodeString(c.Arg.Value())
return decoded, true, err
}
}
@ -249,10 +247,10 @@ func lookForHash(conditions []query.Condition) (hash []byte, ok bool, err error)
}
// lookForHeight returns a height if there is an "height=X" condition.
func lookForHeight(conditions []query.Condition) (height int64) {
func lookForHeight(conditions []syntax.Condition) (height int64) {
for _, c := range conditions {
if c.CompositeKey == types.TxHeightKey && c.Op == query.OpEqual {
return c.Operand.(int64)
if c.Tag == types.TxHeightKey && c.Op == syntax.TEq {
return int64(c.Arg.Number())
}
}
return 0
@ -265,7 +263,7 @@ func lookForHeight(conditions []query.Condition) (height int64) {
// NOTE: filteredHashes may be empty if no previous condition has matched.
func (txi *TxIndex) match(
ctx context.Context,
c query.Condition,
c syntax.Condition,
startKeyBz []byte,
filteredHashes map[string][]byte,
firstRun bool,
@ -279,7 +277,7 @@ func (txi *TxIndex) match(
tmpHashes := make(map[string][]byte)
switch {
case c.Op == query.OpEqual:
case c.Op == syntax.TEq:
it, err := dbm.IteratePrefix(txi.store, startKeyBz)
if err != nil {
panic(err)
@ -301,10 +299,10 @@ func (txi *TxIndex) match(
panic(err)
}
case c.Op == query.OpExists:
case c.Op == syntax.TExists:
// XXX: can't use startKeyBz here because c.Operand is nil
// (e.g. "account.owner/<nil>/" won't match w/ a single row)
it, err := dbm.IteratePrefix(txi.store, prefixFromCompositeKey(c.CompositeKey))
it, err := dbm.IteratePrefix(txi.store, prefixFromCompositeKey(c.Tag))
if err != nil {
panic(err)
}
@ -325,11 +323,11 @@ func (txi *TxIndex) match(
panic(err)
}
case c.Op == query.OpContains:
case c.Op == syntax.TContains:
// XXX: startKey does not apply here.
// For example, if startKey = "account.owner/an/" and search query = "account.owner CONTAINS an"
// we can't iterate with prefix "account.owner/an/" because we might miss keys like "account.owner/Ulan/"
it, err := dbm.IteratePrefix(txi.store, prefixFromCompositeKey(c.CompositeKey))
it, err := dbm.IteratePrefix(txi.store, prefixFromCompositeKey(c.Tag))
if err != nil {
panic(err)
}
@ -341,7 +339,7 @@ func (txi *TxIndex) match(
if err != nil {
continue
}
if strings.Contains(value, c.Operand.(string)) {
if strings.Contains(value, c.Arg.Value()) {
tmpHashes[string(it.Value())] = it.Value()
}
@ -577,8 +575,8 @@ func prefixFromCompositeKeyAndValue(compositeKey, value string) []byte {
}
// a small utility function for getting a keys prefix based on a condition and a height
func prefixForCondition(c query.Condition, height int64) []byte {
key := prefixFromCompositeKeyAndValue(c.CompositeKey, fmt.Sprintf("%v", c.Operand))
func prefixForCondition(c syntax.Condition, height int64) []byte {
key := prefixFromCompositeKeyAndValue(c.Tag, c.Arg.Value())
if height > 0 {
var err error
key, err = orderedcode.Append(key, height)


+ 1
- 1
internal/state/indexer/tx/kv/kv_bench_test.go View File

@ -60,7 +60,7 @@ func BenchmarkTxSearch(b *testing.B) {
}
}
txQuery := query.MustParse("transfer.address = 'address_43' AND transfer.amount = 50")
txQuery := query.MustCompile(`transfer.address = 'address_43' AND transfer.amount = 50`)
b.ResetTimer()


+ 5
- 5
internal/state/indexer/tx/kv/kv_test.go View File

@ -132,7 +132,7 @@ func TestTxSearch(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.q, func(t *testing.T) {
results, err := indexer.Search(ctx, query.MustParse(tc.q))
results, err := indexer.Search(ctx, query.MustCompile(tc.q))
assert.NoError(t, err)
assert.Len(t, results, tc.resultsLength)
@ -158,7 +158,7 @@ func TestTxSearchWithCancelation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
results, err := indexer.Search(ctx, query.MustParse("account.number = 1"))
results, err := indexer.Search(ctx, query.MustCompile(`account.number = 1`))
assert.NoError(t, err)
assert.Empty(t, results)
}
@ -231,7 +231,7 @@ func TestTxSearchDeprecatedIndexing(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.q, func(t *testing.T) {
results, err := indexer.Search(ctx, query.MustParse(tc.q))
results, err := indexer.Search(ctx, query.MustCompile(tc.q))
require.NoError(t, err)
for _, txr := range results {
for _, tr := range tc.results {
@ -255,7 +255,7 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) {
ctx := context.Background()
results, err := indexer.Search(ctx, query.MustParse("account.number >= 1"))
results, err := indexer.Search(ctx, query.MustCompile(`account.number >= 1`))
assert.NoError(t, err)
assert.Len(t, results, 1)
@ -312,7 +312,7 @@ func TestTxSearchMultipleTxs(t *testing.T) {
ctx := context.Background()
results, err := indexer.Search(ctx, query.MustParse("account.number >= 1"))
results, err := indexer.Search(ctx, query.MustCompile(`account.number >= 1`))
assert.NoError(t, err)
require.Len(t, results, 3)


+ 1
- 1
libs/pubsub/example_test.go View File

@ -26,7 +26,7 @@ func TestExample(t *testing.T) {
ctx := context.Background()
subscription, err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"))
subscription, err := s.Subscribe(ctx, "example-client", query.MustCompile("abci.account.name='John'"))
require.NoError(t, err)
events := []abci.Event{


+ 23
- 23
libs/pubsub/pubsub_test.go View File

@ -33,7 +33,7 @@ func TestSubscribe(t *testing.T) {
})
ctx := context.Background()
subscription, err := s.Subscribe(ctx, clientID, query.Empty{})
subscription, err := s.Subscribe(ctx, clientID, query.All)
require.NoError(t, err)
require.Equal(t, 1, s.NumClients())
@ -79,14 +79,14 @@ func TestSubscribeWithCapacity(t *testing.T) {
ctx := context.Background()
require.Panics(t, func() {
_, err = s.Subscribe(ctx, clientID, query.Empty{}, -1)
_, err = s.Subscribe(ctx, clientID, query.All, -1)
require.NoError(t, err)
})
require.Panics(t, func() {
_, err = s.Subscribe(ctx, clientID, query.Empty{}, 0)
_, err = s.Subscribe(ctx, clientID, query.All, 0)
require.NoError(t, err)
})
subscription, err := s.Subscribe(ctx, clientID, query.Empty{}, 1)
subscription, err := s.Subscribe(ctx, clientID, query.All, 1)
require.NoError(t, err)
err = s.Publish(ctx, "Aggamon")
require.NoError(t, err)
@ -105,7 +105,7 @@ func TestSubscribeUnbuffered(t *testing.T) {
})
ctx := context.Background()
subscription, err := s.SubscribeUnbuffered(ctx, clientID, query.Empty{})
subscription, err := s.SubscribeUnbuffered(ctx, clientID, query.All)
require.NoError(t, err)
published := make(chan struct{})
@ -140,7 +140,7 @@ func TestSlowClientIsRemovedWithErrOutOfCapacity(t *testing.T) {
})
ctx := context.Background()
subscription, err := s.Subscribe(ctx, clientID, query.Empty{})
subscription, err := s.Subscribe(ctx, clientID, query.All)
require.NoError(t, err)
err = s.Publish(ctx, "Fat Cobra")
require.NoError(t, err)
@ -163,7 +163,7 @@ func TestDifferentClients(t *testing.T) {
ctx := context.Background()
subscription1, err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"))
subscription1, err := s.Subscribe(ctx, "client-1", query.MustCompile("tm.events.type='NewBlock'"))
require.NoError(t, err)
events := []abci.Event{
@ -179,7 +179,7 @@ func TestDifferentClients(t *testing.T) {
subscription2, err := s.Subscribe(
ctx,
"client-2",
query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"),
query.MustCompile("tm.events.type='NewBlock' AND abci.account.name='Igor'"),
)
require.NoError(t, err)
@ -201,7 +201,7 @@ func TestDifferentClients(t *testing.T) {
subscription3, err := s.Subscribe(
ctx,
"client-3",
query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"),
query.MustCompile("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"),
)
require.NoError(t, err)
@ -252,7 +252,7 @@ func TestSubscribeDuplicateKeys(t *testing.T) {
}
for i, tc := range testCases {
sub, err := s.Subscribe(ctx, fmt.Sprintf("client-%d", i), query.MustParse(tc.query))
sub, err := s.Subscribe(ctx, fmt.Sprintf("client-%d", i), query.MustCompile(tc.query))
require.NoError(t, err)
events := []abci.Event{
@ -296,7 +296,7 @@ func TestClientSubscribesTwice(t *testing.T) {
})
ctx := context.Background()
q := query.MustParse("tm.events.type='NewBlock'")
q := query.MustCompile("tm.events.type='NewBlock'")
subscription1, err := s.Subscribe(ctx, clientID, q)
require.NoError(t, err)
@ -331,11 +331,11 @@ func TestUnsubscribe(t *testing.T) {
})
ctx := context.Background()
subscription, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
subscription, err := s.Subscribe(ctx, clientID, query.MustCompile("tm.events.type='NewBlock'"))
require.NoError(t, err)
err = s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{
Subscriber: clientID,
Query: query.MustParse("tm.events.type='NewBlock'")})
Query: query.MustCompile("tm.events.type='NewBlock'")})
require.NoError(t, err)
err = s.Publish(ctx, "Nick Fury")
@ -357,16 +357,16 @@ func TestClientUnsubscribesTwice(t *testing.T) {
})
ctx := context.Background()
_, err = s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
_, err = s.Subscribe(ctx, clientID, query.MustCompile("tm.events.type='NewBlock'"))
require.NoError(t, err)
err = s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{
Subscriber: clientID,
Query: query.MustParse("tm.events.type='NewBlock'")})
Query: query.MustCompile("tm.events.type='NewBlock'")})
require.NoError(t, err)
err = s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{
Subscriber: clientID,
Query: query.MustParse("tm.events.type='NewBlock'")})
Query: query.MustCompile("tm.events.type='NewBlock'")})
require.Equal(t, pubsub.ErrSubscriptionNotFound, err)
err = s.UnsubscribeAll(ctx, clientID)
require.Equal(t, pubsub.ErrSubscriptionNotFound, err)
@ -384,11 +384,11 @@ func TestResubscribe(t *testing.T) {
})
ctx := context.Background()
_, err = s.Subscribe(ctx, clientID, query.Empty{})
_, err = s.Subscribe(ctx, clientID, query.All)
require.NoError(t, err)
err = s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{Subscriber: clientID, Query: query.Empty{}})
err = s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{Subscriber: clientID, Query: query.All})
require.NoError(t, err)
subscription, err := s.Subscribe(ctx, clientID, query.Empty{})
subscription, err := s.Subscribe(ctx, clientID, query.All)
require.NoError(t, err)
err = s.Publish(ctx, "Cable")
@ -408,9 +408,9 @@ func TestUnsubscribeAll(t *testing.T) {
})
ctx := context.Background()
subscription1, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
subscription1, err := s.Subscribe(ctx, clientID, query.MustCompile("tm.events.type='NewBlock'"))
require.NoError(t, err)
subscription2, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlockHeader'"))
subscription2, err := s.Subscribe(ctx, clientID, query.MustCompile("tm.events.type='NewBlockHeader'"))
require.NoError(t, err)
err = s.UnsubscribeAll(ctx, clientID)
@ -470,7 +470,7 @@ func benchmarkNClients(n int, b *testing.B) {
subscription, err := s.Subscribe(
ctx,
clientID,
query.MustParse(fmt.Sprintf("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = %d", i)),
query.MustCompile(fmt.Sprintf("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = %d", i)),
)
if err != nil {
b.Fatal(err)
@ -516,7 +516,7 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) {
})
ctx := context.Background()
q := query.MustParse("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = 1")
q := query.MustCompile("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = 1")
for i := 0; i < n; i++ {
id := fmt.Sprintf("clientID-%d", i+1)
subscription, err := s.Subscribe(ctx, id, q)


+ 0
- 11
libs/pubsub/query/Makefile View File

@ -1,11 +0,0 @@
gen_query_parser:
go get -u -v github.com/pointlander/peg
peg -inline -switch query.peg
fuzzy_test:
go get -u -v github.com/dvyukov/go-fuzz/go-fuzz
go get -u -v github.com/dvyukov/go-fuzz/go-fuzz-build
go-fuzz-build github.com/tendermint/tendermint/libs/pubsub/query/fuzz_test
go-fuzz -bin=./fuzz_test-fuzz.zip -workdir=./fuzz_test/output
.PHONY: gen_query_parser fuzzy_test

+ 58
- 0
libs/pubsub/query/bench_test.go View File

@ -0,0 +1,58 @@
package query_test
import (
"testing"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query"
)
const testQuery = `tm.events.type='NewBlock' AND abci.account.name='Igor'`
var testEvents = []types.Event{
{
Type: "tm.events",
Attributes: []types.EventAttribute{{
Key: "index",
Value: "25",
}, {
Key: "type",
Value: "NewBlock",
}},
},
{
Type: "abci.account",
Attributes: []types.EventAttribute{{
Key: "name",
Value: "Anya",
}, {
Key: "name",
Value: "Igor",
}},
},
}
func BenchmarkParseCustom(b *testing.B) {
for i := 0; i < b.N; i++ {
_, err := query.New(testQuery)
if err != nil {
b.Fatal(err)
}
}
}
func BenchmarkMatchCustom(b *testing.B) {
q, err := query.New(testQuery)
if err != nil {
b.Fatal(err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
ok, err := q.Matches(testEvents)
if err != nil {
b.Fatal(err)
} else if !ok {
b.Error("no match")
}
}
}

+ 0
- 18
libs/pubsub/query/empty.go View File

@ -1,18 +0,0 @@
package query
import (
"github.com/tendermint/tendermint/abci/types"
)
// Empty query matches any set of events.
type Empty struct {
}
// Matches always returns true.
func (Empty) Matches(events []types.Event) (bool, error) {
return true, nil
}
func (Empty) String() string {
return "empty"
}

+ 0
- 55
libs/pubsub/query/empty_test.go View File

@ -1,55 +0,0 @@
package query_test
import (
"testing"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query"
)
func TestEmptyQueryMatchesAnything(t *testing.T) {
q := query.Empty{}
testCases := []struct {
events []abci.Event
}{
{
[]abci.Event{},
},
{
[]abci.Event{
{
Type: "Asher",
Attributes: []abci.EventAttribute{{Key: "Roth"}},
},
},
},
{
[]abci.Event{
{
Type: "Route",
Attributes: []abci.EventAttribute{{Key: "66"}},
},
},
},
{
[]abci.Event{
{
Type: "Route",
Attributes: []abci.EventAttribute{{Key: "66"}},
},
{
Type: "Billy",
Attributes: []abci.EventAttribute{{Key: "Blue"}},
},
},
},
}
for _, tc := range testCases {
match, err := q.Matches(tc.events)
require.Nil(t, err)
require.True(t, match)
}
}

+ 0
- 30
libs/pubsub/query/fuzz_test/main.go View File

@ -1,30 +0,0 @@
package fuzz_test
import (
"fmt"
"github.com/tendermint/tendermint/libs/pubsub/query"
)
func Fuzz(data []byte) int {
sdata := string(data)
q0, err := query.New(sdata)
if err != nil {
return 0
}
sdata1 := q0.String()
q1, err := query.New(sdata1)
if err != nil {
panic(err)
}
sdata2 := q1.String()
if sdata1 != sdata2 {
fmt.Printf("q0: %q\n", sdata1)
fmt.Printf("q1: %q\n", sdata2)
panic("query changed")
}
return 1
}

+ 0
- 97
libs/pubsub/query/parser_test.go View File

@ -1,97 +0,0 @@
package query_test
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tendermint/tendermint/libs/pubsub/query"
)
// TODO: fuzzy testing?
func TestParser(t *testing.T) {
cases := []struct {
query string
valid bool
}{
{"tm.events.type='NewBlock'", true},
{"tm.events.type = 'NewBlock'", true},
{"tm.events.name = ''", true},
{"tm.events.type='TIME'", true},
{"tm.events.type='DATE'", true},
{"tm.events.type='='", true},
{"tm.events.type='TIME", false},
{"tm.events.type=TIME'", false},
{"tm.events.type==", false},
{"tm.events.type=NewBlock", false},
{">==", false},
{"tm.events.type 'NewBlock' =", false},
{"tm.events.type>'NewBlock'", false},
{"", false},
{"=", false},
{"='NewBlock'", false},
{"tm.events.type=", false},
{"tm.events.typeNewBlock", false},
{"tm.events.type'NewBlock'", false},
{"'NewBlock'", false},
{"NewBlock", false},
{"", false},
{"tm.events.type='NewBlock' AND abci.account.name='Igor'", true},
{"tm.events.type='NewBlock' AND", false},
{"tm.events.type='NewBlock' AN", false},
{"tm.events.type='NewBlock' AN tm.events.type='NewBlockHeader'", false},
{"AND tm.events.type='NewBlock' ", false},
{"abci.account.name CONTAINS 'Igor'", true},
{"tx.date > DATE 2013-05-03", true},
{"tx.date < DATE 2013-05-03", true},
{"tx.date <= DATE 2013-05-03", true},
{"tx.date >= DATE 2013-05-03", true},
{"tx.date >= DAT 2013-05-03", false},
{"tx.date <= DATE2013-05-03", false},
{"tx.date <= DATE -05-03", false},
{"tx.date >= DATE 20130503", false},
{"tx.date >= DATE 2013+01-03", false},
// incorrect year, month, day
{"tx.date >= DATE 0013-01-03", false},
{"tx.date >= DATE 2013-31-03", false},
{"tx.date >= DATE 2013-01-83", false},
{"tx.date > TIME 2013-05-03T14:45:00+07:00", true},
{"tx.date < TIME 2013-05-03T14:45:00-02:00", true},
{"tx.date <= TIME 2013-05-03T14:45:00Z", true},
{"tx.date >= TIME 2013-05-03T14:45:00Z", true},
{"tx.date >= TIME2013-05-03T14:45:00Z", false},
{"tx.date = IME 2013-05-03T14:45:00Z", false},
{"tx.date = TIME 2013-05-:45:00Z", false},
{"tx.date >= TIME 2013-05-03T14:45:00", false},
{"tx.date >= TIME 0013-00-00T14:45:00Z", false},
{"tx.date >= TIME 2013+05=03T14:45:00Z", false},
{"account.balance=100", true},
{"account.balance >= 200", true},
{"account.balance >= -300", false},
{"account.balance >>= 400", false},
{"account.balance=33.22.1", false},
{"slashing.amount EXISTS", true},
{"slashing.amount EXISTS AND account.balance=100", true},
{"account.balance=100 AND slashing.amount EXISTS", true},
{"slashing EXISTS", true},
{"hash='136E18F7E4C348B780CF873A0BF43922E5BAFA63'", true},
{"hash=136E18F7E4C348B780CF873A0BF43922E5BAFA63", false},
}
for _, c := range cases {
_, err := query.New(c.query)
if c.valid {
assert.NoErrorf(t, err, "Query was '%s'", c.query)
} else {
assert.Errorf(t, err, "Query was '%s'", c.query)
}
}
}

+ 0
- 3
libs/pubsub/query/peg.go View File

@ -1,3 +0,0 @@
package query
//go:generate peg -inline -switch query.peg

+ 263
- 463
libs/pubsub/query/query.go View File

@ -1,527 +1,327 @@
// Package query provides a parser for a custom query format:
// Package query implements the custom query format used to filter event
// subscriptions in Tendermint.
//
// abci.invoice.number=22 AND abci.invoice.owner=Ivan
// Query expressions describe properties of events and their attributes, using
// strings like:
//
// See query.peg for the grammar, which is a https://en.wikipedia.org/wiki/Parsing_expression_grammar.
// More: https://github.com/PhilippeSigaud/Pegged/wiki/PEG-Basics
// abci.invoice.number = 22 AND abci.invoice.owner = 'Ivan'
//
// Query expressions can handle attribute values encoding numbers, strings,
// dates, and timestamps. The complete query grammar is described in the
// query/syntax package.
//
// It has a support for numbers (integer and floating point), dates and times.
package query
import (
"fmt"
"reflect"
"regexp"
"strconv"
"strings"
"time"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query/syntax"
)
var (
numRegex = regexp.MustCompile(`([0-9\.]+)`)
)
// All is a query that matches all events.
var All *Query
// Query holds the query string and the query parser.
// A Query is the compiled form of a query.
type Query struct {
str string
parser *QueryParser
ast syntax.Query
conds []condition
}
// Condition represents a single condition within a query and consists of composite key
// (e.g. "tx.gas"), operator (e.g. "=") and operand (e.g. "7").
type Condition struct {
CompositeKey string
Op Operator
Operand interface{}
}
// New parses the given string and returns a query or error if the string is
// invalid.
func New(s string) (*Query, error) {
p := &QueryParser{Buffer: fmt.Sprintf(`"%s"`, s)}
p.Init()
if err := p.Parse(); err != nil {
// New parses and compiles the query expression into an executable query.
func New(query string) (*Query, error) {
ast, err := syntax.Parse(query)
if err != nil {
return nil, err
}
return &Query{str: s, parser: p}, nil
return Compile(ast)
}
// MustParse turns the given string into a query or panics; for tests or others
// cases where you know the string is valid.
func MustParse(s string) *Query {
q, err := New(s)
// MustCompile compiles the query expression into an executable query.
// In case of error, MustCompile will panic.
//
// This is intended for use in program initialization; use query.New if you
// need to check errors.
func MustCompile(query string) *Query {
q, err := New(query)
if err != nil {
panic(fmt.Sprintf("failed to parse %s: %v", s, err))
panic(err)
}
return q
}
// String returns the original string.
func (q *Query) String() string {
return q.str
}
// Operator is an operator that defines some kind of relation between composite key and
// operand (equality, etc.).
type Operator uint8
const (
// "<="
OpLessEqual Operator = iota
// ">="
OpGreaterEqual
// "<"
OpLess
// ">"
OpGreater
// "="
OpEqual
// "CONTAINS"; used to check if a string contains a certain sub string.
OpContains
// "EXISTS"; used to check if a certain event attribute is present.
OpExists
)
const (
// DateLayout defines a layout for all dates (`DATE date`)
DateLayout = "2006-01-02"
// TimeLayout defines a layout for all times (`TIME time`)
TimeLayout = time.RFC3339
)
// Conditions returns a list of conditions. It returns an error if there is any
// error with the provided grammar in the Query.
func (q *Query) Conditions() ([]Condition, error) {
var (
eventAttr string
op Operator
)
conditions := make([]Condition, 0)
buffer, begin, end := q.parser.Buffer, 0, 0
// tokens must be in the following order: tag ("tx.gas") -> operator ("=") -> operand ("7")
for token := range q.parser.Tokens() {
switch token.pegRule {
case rulePegText:
begin, end = int(token.begin), int(token.end)
case ruletag:
eventAttr = buffer[begin:end]
case rulele:
op = OpLessEqual
case rulege:
op = OpGreaterEqual
case rulel:
op = OpLess
case ruleg:
op = OpGreater
case ruleequal:
op = OpEqual
case rulecontains:
op = OpContains
case ruleexists:
op = OpExists
conditions = append(conditions, Condition{eventAttr, op, nil})
case rulevalue:
// strip single quotes from value (i.e. "'NewBlock'" -> "NewBlock")
valueWithoutSingleQuotes := buffer[begin+1 : end-1]
conditions = append(conditions, Condition{eventAttr, op, valueWithoutSingleQuotes})
case rulenumber:
number := buffer[begin:end]
if strings.ContainsAny(number, ".") { // if it looks like a floating-point number
value, err := strconv.ParseFloat(number, 64)
if err != nil {
err = fmt.Errorf(
"got %v while trying to parse %s as float64 (should never happen if the grammar is correct)",
err, number,
)
return nil, err
}
conditions = append(conditions, Condition{eventAttr, op, value})
} else {
value, err := strconv.ParseInt(number, 10, 64)
if err != nil {
err = fmt.Errorf(
"got %v while trying to parse %s as int64 (should never happen if the grammar is correct)",
err, number,
)
return nil, err
}
conditions = append(conditions, Condition{eventAttr, op, value})
}
case ruletime:
value, err := time.Parse(TimeLayout, buffer[begin:end])
if err != nil {
err = fmt.Errorf(
"got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)",
err, buffer[begin:end],
)
return nil, err
}
conditions = append(conditions, Condition{eventAttr, op, value})
case ruledate:
value, err := time.Parse("2006-01-02", buffer[begin:end])
if err != nil {
err = fmt.Errorf(
"got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)",
err, buffer[begin:end],
)
return nil, err
}
conditions = append(conditions, Condition{eventAttr, op, value})
// Compile compiles the given query AST so it can be used to match events.
func Compile(ast syntax.Query) (*Query, error) {
conds := make([]condition, len(ast))
for i, q := range ast {
cond, err := compileCondition(q)
if err != nil {
return nil, fmt.Errorf("compile %s: %w", q, err)
}
conds[i] = cond
}
return conditions, nil
return &Query{ast: ast, conds: conds}, nil
}
// Matches returns true if the query matches against any event in the given set
// of events, false otherwise. For each event, a match exists if the query is
// matched against *any* value in a slice of values. An error is returned if
// any attempted event match returns an error.
//
// For example, query "name=John" matches events = {"name": ["John", "Eric"]}.
// More examples could be found in parser_test.go and query_test.go.
func (q *Query) Matches(rawEvents []types.Event) (bool, error) {
if len(rawEvents) == 0 {
return false, nil
// Matches satisfies part of the pubsub.Query interface. This implementation
// never reports an error. A nil *Query matches all events.
func (q *Query) Matches(events []types.Event) (bool, error) {
if q == nil {
return true, nil
}
return q.matchesEvents(events), nil
}
events := flattenEvents(rawEvents)
var (
eventAttr string
op Operator
)
buffer, begin, end := q.parser.Buffer, 0, 0
// tokens must be in the following order:
// tag ("tx.gas") -> operator ("=") -> operand ("7")
for token := range q.parser.Tokens() {
switch token.pegRule {
case rulePegText:
begin, end = int(token.begin), int(token.end)
case ruletag:
eventAttr = buffer[begin:end]
case rulele:
op = OpLessEqual
case rulege:
op = OpGreaterEqual
case rulel:
op = OpLess
case ruleg:
op = OpGreater
case ruleequal:
op = OpEqual
case rulecontains:
op = OpContains
case ruleexists:
op = OpExists
if strings.Contains(eventAttr, ".") {
// Searching for a full "type.attribute" event.
_, ok := events[eventAttr]
if !ok {
return false, nil
}
} else {
foundEvent := false
loop:
for compositeKey := range events {
if strings.Index(compositeKey, eventAttr) == 0 {
foundEvent = true
break loop
}
}
if !foundEvent {
return false, nil
}
}
case rulevalue:
// strip single quotes from value (i.e. "'NewBlock'" -> "NewBlock")
valueWithoutSingleQuotes := buffer[begin+1 : end-1]
// see if the triplet (event attribute, operator, operand) matches any event
// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" }
match, err := match(eventAttr, op, reflect.ValueOf(valueWithoutSingleQuotes), events)
if err != nil {
return false, err
}
if !match {
return false, nil
}
case rulenumber:
number := buffer[begin:end]
if strings.ContainsAny(number, ".") { // if it looks like a floating-point number
value, err := strconv.ParseFloat(number, 64)
if err != nil {
err = fmt.Errorf(
"got %v while trying to parse %s as float64 (should never happen if the grammar is correct)",
err, number,
)
return false, err
}
match, err := match(eventAttr, op, reflect.ValueOf(value), events)
if err != nil {
return false, err
}
if !match {
return false, nil
}
} else {
value, err := strconv.ParseInt(number, 10, 64)
if err != nil {
err = fmt.Errorf(
"got %v while trying to parse %s as int64 (should never happen if the grammar is correct)",
err, number,
)
return false, err
}
match, err := match(eventAttr, op, reflect.ValueOf(value), events)
if err != nil {
return false, err
}
if !match {
return false, nil
}
}
case ruletime:
value, err := time.Parse(TimeLayout, buffer[begin:end])
if err != nil {
err = fmt.Errorf(
"got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)",
err, buffer[begin:end],
)
return false, err
}
match, err := match(eventAttr, op, reflect.ValueOf(value), events)
if err != nil {
return false, err
}
if !match {
return false, nil
}
case ruledate:
value, err := time.Parse("2006-01-02", buffer[begin:end])
if err != nil {
err = fmt.Errorf(
"got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)",
err, buffer[begin:end],
)
return false, err
}
match, err := match(eventAttr, op, reflect.ValueOf(value), events)
if err != nil {
return false, err
}
if !match {
return false, nil
}
}
// String matches part of the pubsub.Query interface.
func (q *Query) String() string {
if q == nil {
return "<empty>"
}
return true, nil
return q.ast.String()
}
// match returns true if the given triplet (attribute, operator, operand) matches
// any value in an event for that attribute. If any match fails with an error,
// that error is returned.
//
// First, it looks up the key in the events and if it finds one, tries to compare
// all the values from it to the operand using the operator.
//
// "tx.gas", "=", "7", {"tx": [{"gas": 7, "ID": "4AE393495334"}]}
func match(attr string, op Operator, operand reflect.Value, events map[string][]string) (bool, error) {
// look up the tag from the query in tags
values, ok := events[attr]
if !ok {
return false, nil
// Syntax returns the syntax tree representation of q.
func (q *Query) Syntax() syntax.Query {
if q == nil {
return nil
}
return q.ast
}
for _, value := range values {
// return true if any value in the set of the event's values matches
match, err := matchValue(value, op, operand)
if err != nil {
return false, err
}
if match {
return true, nil
// matchesEvents reports whether all the conditions match the given events.
func (q *Query) matchesEvents(events []types.Event) bool {
for _, cond := range q.conds {
if !cond.matchesAny(events) {
return false
}
}
return false, nil
return len(events) != 0
}
// matchValue will attempt to match a string value against an operator an
// operand. A boolean is returned representing the match result. It will return
// an error if the value cannot be parsed and matched against the operand type.
func matchValue(value string, op Operator, operand reflect.Value) (bool, error) {
switch operand.Kind() {
case reflect.Struct: // time
operandAsTime := operand.Interface().(time.Time)
// try our best to convert value from events to time.Time
var (
v time.Time
err error
)
// A condition is a compiled match condition. A condition matches an event if
// the event has the designated type, contains an attribute with the given
// name, and the match function returns true for the attribute value.
type condition struct {
tag string // e.g., "tx.hash"
match func(s string) bool
}
if strings.ContainsAny(value, "T") {
v, err = time.Parse(TimeLayout, value)
} else {
v, err = time.Parse(DateLayout, value)
}
if err != nil {
return false, fmt.Errorf("failed to convert value %v from event attribute to time.Time: %w", value, err)
// findAttr returns a slice of attribute values from event matching the
// condition tag, and reports whether the event type strictly equals the
// condition tag.
func (c condition) findAttr(event types.Event) ([]string, bool) {
if !strings.HasPrefix(c.tag, event.Type) {
return nil, false // type does not match tag
} else if len(c.tag) == len(event.Type) {
return nil, true // type == tag
}
var vals []string
for _, attr := range event.Attributes {
fullName := event.Type + "." + attr.Key
if fullName == c.tag {
vals = append(vals, attr.Value)
}
}
return vals, false
}
switch op {
case OpLessEqual:
return (v.Before(operandAsTime) || v.Equal(operandAsTime)), nil
case OpGreaterEqual:
return (v.Equal(operandAsTime) || v.After(operandAsTime)), nil
case OpLess:
return v.Before(operandAsTime), nil
case OpGreater:
return v.After(operandAsTime), nil
case OpEqual:
return v.Equal(operandAsTime), nil
// matchesAny reports whether c matches at least one of the given events.
func (c condition) matchesAny(events []types.Event) bool {
for _, event := range events {
if c.matchesEvent(event) {
return true
}
}
return false
}
case reflect.Float64:
var v float64
operandFloat64 := operand.Interface().(float64)
filteredValue := numRegex.FindString(value)
// try our best to convert value from tags to float64
v, err := strconv.ParseFloat(filteredValue, 64)
if err != nil {
return false, fmt.Errorf("failed to convert value %v from event attribute to float64: %w", filteredValue, err)
// matchesEvent reports whether c matches the given event.
func (c condition) matchesEvent(event types.Event) bool {
vs, tagEqualsType := c.findAttr(event)
if len(vs) == 0 {
// As a special case, a condition tag that exactly matches the event type
// is matched against an empty string. This allows existence checks to
// work for type-only queries.
if tagEqualsType {
return c.match("")
}
return false
}
switch op {
case OpLessEqual:
return v <= operandFloat64, nil
case OpGreaterEqual:
return v >= operandFloat64, nil
case OpLess:
return v < operandFloat64, nil
case OpGreater:
return v > operandFloat64, nil
case OpEqual:
return v == operandFloat64, nil
// At this point, we have candidate values.
for _, v := range vs {
if c.match(v) {
return true
}
}
return false
}
case reflect.Int64:
var v int64
operandInt := operand.Interface().(int64)
filteredValue := numRegex.FindString(value)
// if value looks like float, we try to parse it as float
if strings.ContainsAny(filteredValue, ".") {
v1, err := strconv.ParseFloat(filteredValue, 64)
if err != nil {
return false, fmt.Errorf("failed to convert value %v from event attribute to float64: %w", filteredValue, err)
}
v = int64(v1)
} else {
var err error
// try our best to convert value from tags to int64
v, err = strconv.ParseInt(filteredValue, 10, 64)
if err != nil {
return false, fmt.Errorf("failed to convert value %v from event attribute to int64: %w", filteredValue, err)
}
}
func compileCondition(cond syntax.Condition) (condition, error) {
out := condition{tag: cond.Tag}
switch op {
case OpLessEqual:
return v <= operandInt, nil
case OpGreaterEqual:
return v >= operandInt, nil
case OpLess:
return v < operandInt, nil
case OpGreater:
return v > operandInt, nil
case OpEqual:
return v == operandInt, nil
}
// Handle existence checks separately to simplify the logic below for
// comparisons that take arguments.
if cond.Op == syntax.TExists {
out.match = func(string) bool { return true }
return out, nil
}
case reflect.String:
switch op {
case OpEqual:
return value == operand.String(), nil
case OpContains:
return strings.Contains(value, operand.String()), nil
}
// All the other operators require an argument.
if cond.Arg == nil {
return condition{}, fmt.Errorf("missing argument for %v", cond.Op)
}
// Precompile the argument value matcher.
argType := cond.Arg.Type
var argValue interface{}
switch argType {
case syntax.TString:
argValue = cond.Arg.Value()
case syntax.TNumber:
argValue = cond.Arg.Number()
case syntax.TTime, syntax.TDate:
argValue = cond.Arg.Time()
default:
return false, fmt.Errorf("unknown kind of operand %v", operand.Kind())
return condition{}, fmt.Errorf("unknown argument type %v", argType)
}
return false, nil
mcons := opTypeMap[cond.Op][argType]
if mcons == nil {
return condition{}, fmt.Errorf("invalid op/arg combination (%v, %v)", cond.Op, argType)
}
out.match = mcons(argValue)
return out, nil
}
func flattenEvents(events []types.Event) map[string][]string {
flattened := make(map[string][]string)
// TODO(creachadair): The existing implementation allows anything number shaped
// to be treated as a number. This preserves the parts of that behavior we had
// tests for, but we should probably get rid of that.
var extractNum = regexp.MustCompile(`^\d+(\.\d+)?`)
for _, event := range events {
if len(event.Type) == 0 {
continue
}
func parseNumber(s string) (float64, error) {
return strconv.ParseFloat(extractNum.FindString(s), 64)
}
for _, attr := range event.Attributes {
if len(attr.Key) == 0 {
continue
// A map of operator ⇒ argtype ⇒ match-constructor.
// An entry does not exist if the combination is not valid.
//
// Disable the dupl lint for this map. The result isn't even correct.
//nolint:dupl
var opTypeMap = map[syntax.Token]map[syntax.Token]func(interface{}) func(string) bool{
syntax.TContains: {
syntax.TString: func(v interface{}) func(string) bool {
return func(s string) bool {
return strings.Contains(s, v.(string))
}
compositeEvent := fmt.Sprintf("%s.%s", event.Type, attr.Key)
flattened[compositeEvent] = append(flattened[compositeEvent], attr.Value)
}
}
return flattened
},
},
syntax.TEq: {
syntax.TString: func(v interface{}) func(string) bool {
return func(s string) bool { return s == v.(string) }
},
syntax.TNumber: func(v interface{}) func(string) bool {
return func(s string) bool {
w, err := parseNumber(s)
return err == nil && w == v.(float64)
}
},
syntax.TDate: func(v interface{}) func(string) bool {
return func(s string) bool {
ts, err := syntax.ParseDate(s)
return err == nil && ts.Equal(v.(time.Time))
}
},
syntax.TTime: func(v interface{}) func(string) bool {
return func(s string) bool {
ts, err := syntax.ParseTime(s)
return err == nil && ts.Equal(v.(time.Time))
}
},
},
syntax.TLt: {
syntax.TNumber: func(v interface{}) func(string) bool {
return func(s string) bool {
w, err := parseNumber(s)
return err == nil && w < v.(float64)
}
},
syntax.TDate: func(v interface{}) func(string) bool {
return func(s string) bool {
ts, err := syntax.ParseDate(s)
return err == nil && ts.Before(v.(time.Time))
}
},
syntax.TTime: func(v interface{}) func(string) bool {
return func(s string) bool {
ts, err := syntax.ParseTime(s)
return err == nil && ts.Before(v.(time.Time))
}
},
},
syntax.TLeq: {
syntax.TNumber: func(v interface{}) func(string) bool {
return func(s string) bool {
w, err := parseNumber(s)
return err == nil && w <= v.(float64)
}
},
syntax.TDate: func(v interface{}) func(string) bool {
return func(s string) bool {
ts, err := syntax.ParseDate(s)
return err == nil && !ts.After(v.(time.Time))
}
},
syntax.TTime: func(v interface{}) func(string) bool {
return func(s string) bool {
ts, err := syntax.ParseTime(s)
return err == nil && !ts.After(v.(time.Time))
}
},
},
syntax.TGt: {
syntax.TNumber: func(v interface{}) func(string) bool {
return func(s string) bool {
w, err := parseNumber(s)
return err == nil && w > v.(float64)
}
},
syntax.TDate: func(v interface{}) func(string) bool {
return func(s string) bool {
ts, err := syntax.ParseDate(s)
return err == nil && ts.After(v.(time.Time))
}
},
syntax.TTime: func(v interface{}) func(string) bool {
return func(s string) bool {
ts, err := syntax.ParseTime(s)
return err == nil && ts.After(v.(time.Time))
}
},
},
syntax.TGeq: {
syntax.TNumber: func(v interface{}) func(string) bool {
return func(s string) bool {
w, err := parseNumber(s)
return err == nil && w >= v.(float64)
}
},
syntax.TDate: func(v interface{}) func(string) bool {
return func(s string) bool {
ts, err := syntax.ParseDate(s)
return err == nil && !ts.Before(v.(time.Time))
}
},
syntax.TTime: func(v interface{}) func(string) bool {
return func(s string) bool {
ts, err := syntax.ParseTime(s)
return err == nil && !ts.Before(v.(time.Time))
}
},
},
}

+ 0
- 35
libs/pubsub/query/query.peg View File

@ -1,35 +0,0 @@
package query
type QueryParser Peg {
}
e <- '\"' condition ( ' '+ and ' '+ condition )* '\"' !.
condition <- tag ' '* (le ' '* (number / time / date)
/ ge ' '* (number / time / date)
/ l ' '* (number / time / date)
/ g ' '* (number / time / date)
/ equal ' '* (number / time / date / value)
/ contains ' '* value
/ exists
)
tag <- < (![ \t\n\r\\()"'=><] .)+ >
value <- < '\'' (!["'] .)* '\''>
number <- < ('0'
/ [1-9] digit* ('.' digit*)?) >
digit <- [0-9]
time <- "TIME " < year '-' month '-' day 'T' digit digit ':' digit digit ':' digit digit (('-' / '+') digit digit ':' digit digit / 'Z') >
date <- "DATE " < year '-' month '-' day >
year <- ('1' / '2') digit digit digit
month <- ('0' / '1') digit
day <- ('0' / '1' / '2' / '3') digit
and <- "AND"
equal <- "="
contains <- "CONTAINS"
exists <- "EXISTS"
le <- "<="
ge <- ">="
l <- "<"
g <- ">"

+ 0
- 1871
libs/pubsub/query/query.peg.go
File diff suppressed because it is too large
View File


+ 242
- 213
libs/pubsub/query/query_test.go View File

@ -6,242 +6,271 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/libs/pubsub/query/syntax"
)
func expandEvents(flattenedEvents map[string][]string) []abci.Event {
events := make([]abci.Event, len(flattenedEvents))
var _ pubsub.Query = (*query.Query)(nil)
for composite, values := range flattenedEvents {
tokens := strings.Split(composite, ".")
attrs := make([]abci.EventAttribute, len(values))
for i, v := range values {
attrs[i] = abci.EventAttribute{
Key: tokens[len(tokens)-1],
Value: v,
}
}
events = append(events, abci.Event{
Type: strings.Join(tokens[:len(tokens)-1], "."),
Attributes: attrs,
})
}
return events
// Example events from the OpenAPI documentation:
// https://github.com/tendermint/tendermint/blob/master/rpc/openapi/openapi.yaml
//
// Redactions:
//
// - Add an explicit "tm" event for the built-in attributes.
// - Remove Index fields (not relevant to tests).
// - Add explicit balance values (to use in tests).
//
var apiEvents = []types.Event{
{
Type: "tm",
Attributes: []types.EventAttribute{
{Key: "event", Value: "Tx"},
{Key: "hash", Value: "XYZ"},
{Key: "height", Value: "5"},
},
},
{
Type: "rewards.withdraw",
Attributes: []types.EventAttribute{
{Key: "address", Value: "AddrA"},
{Key: "source", Value: "SrcX"},
{Key: "amount", Value: "100"},
{Key: "balance", Value: "1500"},
},
},
{
Type: "rewards.withdraw",
Attributes: []types.EventAttribute{
{Key: "address", Value: "AddrB"},
{Key: "source", Value: "SrcY"},
{Key: "amount", Value: "45"},
{Key: "balance", Value: "999"},
},
},
{
Type: "transfer",
Attributes: []types.EventAttribute{
{Key: "sender", Value: "AddrC"},
{Key: "recipient", Value: "AddrD"},
{Key: "amount", Value: "160"},
},
},
}
func TestMatches(t *testing.T) {
func TestCompiledMatches(t *testing.T) {
var (
txDate = "2017-01-01"
txTime = "2018-05-03T14:45:00Z"
)
//nolint:lll
testCases := []struct {
s string
events map[string][]string
err bool
matches bool
matchErr bool
s string
events []types.Event
matches bool
}{
{"tm.events.type='NewBlock'", map[string][]string{"tm.events.type": {"NewBlock"}}, false, true, false},
{"tx.gas > 7", map[string][]string{"tx.gas": {"8"}}, false, true, false},
{"transfer.amount > 7", map[string][]string{"transfer.amount": {"8stake"}}, false, true, false},
{"transfer.amount > 7", map[string][]string{"transfer.amount": {"8.045stake"}}, false, true, false},
{"transfer.amount > 7.043", map[string][]string{"transfer.amount": {"8.045stake"}}, false, true, false},
{"transfer.amount > 8.045", map[string][]string{"transfer.amount": {"8.045stake"}}, false, false, false},
{"tx.gas > 7 AND tx.gas < 9", map[string][]string{"tx.gas": {"8"}}, false, true, false},
{"body.weight >= 3.5", map[string][]string{"body.weight": {"3.5"}}, false, true, false},
{"account.balance < 1000.0", map[string][]string{"account.balance": {"900"}}, false, true, false},
{"apples.kg <= 4", map[string][]string{"apples.kg": {"4.0"}}, false, true, false},
{"body.weight >= 4.5", map[string][]string{"body.weight": {fmt.Sprintf("%v", float32(4.5))}}, false, true, false},
{
"oranges.kg < 4 AND watermellons.kg > 10",
map[string][]string{"oranges.kg": {"3"}, "watermellons.kg": {"12"}},
false,
true,
false,
},
{"peaches.kg < 4", map[string][]string{"peaches.kg": {"5"}}, false, false, false},
{
"tx.date > DATE 2017-01-01",
map[string][]string{"tx.date": {time.Now().Format(query.DateLayout)}},
false,
true,
false,
},
{"tx.date = DATE 2017-01-01", map[string][]string{"tx.date": {txDate}}, false, true, false},
{"tx.date = DATE 2018-01-01", map[string][]string{"tx.date": {txDate}}, false, false, false},
{
"tx.time >= TIME 2013-05-03T14:45:00Z",
map[string][]string{"tx.time": {time.Now().Format(query.TimeLayout)}},
false,
true,
false,
},
{"tx.time = TIME 2013-05-03T14:45:00Z", map[string][]string{"tx.time": {txTime}}, false, false, false},
{"abci.owner.name CONTAINS 'Igor'", map[string][]string{"abci.owner.name": {"Igor,Ivan"}}, false, true, false},
{"abci.owner.name CONTAINS 'Igor'", map[string][]string{"abci.owner.name": {"Pavel,Ivan"}}, false, false, false},
{"abci.owner.name = 'Igor'", map[string][]string{"abci.owner.name": {"Igor", "Ivan"}}, false, true, false},
{
"abci.owner.name = 'Ivan'",
map[string][]string{"abci.owner.name": {"Igor", "Ivan"}},
false,
true,
false,
},
{
"abci.owner.name = 'Ivan' AND abci.owner.name = 'Igor'",
map[string][]string{"abci.owner.name": {"Igor", "Ivan"}},
false,
true,
false,
},
{
"abci.owner.name = 'Ivan' AND abci.owner.name = 'John'",
map[string][]string{"abci.owner.name": {"Igor", "Ivan"}},
false,
false,
false,
},
{
"tm.events.type='NewBlock'",
map[string][]string{"tm.events.type": {"NewBlock"}, "app.name": {"fuzzed"}},
false,
true,
false,
},
{
"app.name = 'fuzzed'",
map[string][]string{"tm.events.type": {"NewBlock"}, "app.name": {"fuzzed"}},
false,
true,
false,
},
{
"tm.events.type='NewBlock' AND app.name = 'fuzzed'",
map[string][]string{"tm.events.type": {"NewBlock"}, "app.name": {"fuzzed"}},
false,
true,
false,
},
{
"tm.events.type='NewHeader' AND app.name = 'fuzzed'",
map[string][]string{"tm.events.type": {"NewBlock"}, "app.name": {"fuzzed"}},
false,
false,
false,
},
{"slash EXISTS",
map[string][]string{"slash.reason": {"missing_signature"}, "slash.power": {"6000"}},
false,
true,
false,
},
{"sl EXISTS",
map[string][]string{"slash.reason": {"missing_signature"}, "slash.power": {"6000"}},
false,
true,
false,
},
{"slash EXISTS",
map[string][]string{"transfer.recipient": {"cosmos1gu6y2a0ffteesyeyeesk23082c6998xyzmt9mz"},
"transfer.sender": {"cosmos1crje20aj4gxdtyct7z3knxqry2jqt2fuaey6u5"}},
false,
false,
false,
},
{"slash.reason EXISTS AND slash.power > 1000",
map[string][]string{"slash.reason": {"missing_signature"}, "slash.power": {"6000"}},
false,
true,
false,
},
{"slash.reason EXISTS AND slash.power > 1000",
map[string][]string{"slash.reason": {"missing_signature"}, "slash.power": {"500"}},
false,
false,
false,
},
{"slash.reason EXISTS",
map[string][]string{"transfer.recipient": {"cosmos1gu6y2a0ffteesyeyeesk23082c6998xyzmt9mz"},
"transfer.sender": {"cosmos1crje20aj4gxdtyct7z3knxqry2jqt2fuaey6u5"}},
false,
false,
false,
},
{`tm.events.type='NewBlock'`,
newTestEvents(`tm|events.type=NewBlock`),
true},
{`tx.gas > 7`,
newTestEvents(`tx|gas=8`),
true},
{`transfer.amount > 7`,
newTestEvents(`transfer|amount=8stake`),
true},
{`transfer.amount > 7`,
newTestEvents(`transfer|amount=8.045`),
true},
{`transfer.amount > 7.043`,
newTestEvents(`transfer|amount=8.045stake`),
true},
{`transfer.amount > 8.045`,
newTestEvents(`transfer|amount=8.045stake`),
false},
{`tx.gas > 7 AND tx.gas < 9`,
newTestEvents(`tx|gas=8`),
true},
{`body.weight >= 3.5`,
newTestEvents(`body|weight=3.5`),
true},
{`account.balance < 1000.0`,
newTestEvents(`account|balance=900`),
true},
{`apples.kg <= 4`,
newTestEvents(`apples|kg=4.0`),
true},
{`body.weight >= 4.5`,
newTestEvents(`body|weight=4.5`),
true},
{`oranges.kg < 4 AND watermellons.kg > 10`,
newTestEvents(`oranges|kg=3`, `watermellons|kg=12`),
true},
{`peaches.kg < 4`,
newTestEvents(`peaches|kg=5`),
false},
{`tx.date > DATE 2017-01-01`,
newTestEvents(`tx|date=` + time.Now().Format(syntax.DateFormat)),
true},
{`tx.date = DATE 2017-01-01`,
newTestEvents(`tx|date=` + txDate),
true},
{`tx.date = DATE 2018-01-01`,
newTestEvents(`tx|date=` + txDate),
false},
{`tx.time >= TIME 2013-05-03T14:45:00Z`,
newTestEvents(`tx|time=` + time.Now().Format(syntax.TimeFormat)),
true},
{`tx.time = TIME 2013-05-03T14:45:00Z`,
newTestEvents(`tx|time=` + txTime),
false},
{`abci.owner.name CONTAINS 'Igor'`,
newTestEvents(`abci|owner.name=Igor|owner.name=Ivan`),
true},
{`abci.owner.name CONTAINS 'Igor'`,
newTestEvents(`abci|owner.name=Pavel|owner.name=Ivan`),
false},
{`abci.owner.name = 'Igor'`,
newTestEvents(`abci|owner.name=Igor|owner.name=Ivan`),
true},
{`abci.owner.name = 'Ivan'`,
newTestEvents(`abci|owner.name=Igor|owner.name=Ivan`),
true},
{`abci.owner.name = 'Ivan' AND abci.owner.name = 'Igor'`,
newTestEvents(`abci|owner.name=Igor|owner.name=Ivan`),
true},
{`abci.owner.name = 'Ivan' AND abci.owner.name = 'John'`,
newTestEvents(`abci|owner.name=Igor|owner.name=Ivan`),
false},
{`tm.events.type='NewBlock'`,
newTestEvents(`tm|events.type=NewBlock`, `app|name=fuzzed`),
true},
{`app.name = 'fuzzed'`,
newTestEvents(`tm|events.type=NewBlock`, `app|name=fuzzed`),
true},
{`tm.events.type='NewBlock' AND app.name = 'fuzzed'`,
newTestEvents(`tm|events.type=NewBlock`, `app|name=fuzzed`),
true},
{`tm.events.type='NewHeader' AND app.name = 'fuzzed'`,
newTestEvents(`tm|events.type=NewBlock`, `app|name=fuzzed`),
false},
{`slash EXISTS`,
newTestEvents(`slash|reason=missing_signature|power=6000`),
true},
{`slash EXISTS`,
newTestEvents(`transfer|recipient=cosmos1gu6y2a0ffteesyeyeesk23082c6998xyzmt9mz|sender=cosmos1crje20aj4gxdtyct7z3knxqry2jqt2fuaey6u5`),
false},
{`slash.reason EXISTS AND slash.power > 1000`,
newTestEvents(`slash|reason=missing_signature|power=6000`),
true},
{`slash.reason EXISTS AND slash.power > 1000`,
newTestEvents(`slash|reason=missing_signature|power=500`),
false},
{`slash.reason EXISTS`,
newTestEvents(`transfer|recipient=cosmos1gu6y2a0ffteesyeyeesk23082c6998xyzmt9mz|sender=cosmos1crje20aj4gxdtyct7z3knxqry2jqt2fuaey6u5`),
false},
// Test cases based on the OpenAPI examples.
{`tm.event = 'Tx' AND rewards.withdraw.address = 'AddrA'`,
apiEvents, true},
{`tm.event = 'Tx' AND rewards.withdraw.address = 'AddrA' AND rewards.withdraw.source = 'SrcY'`,
apiEvents, true},
{`tm.event = 'Tx' AND transfer.sender = 'AddrA'`,
apiEvents, false},
{`tm.event = 'Tx' AND transfer.sender = 'AddrC'`,
apiEvents, true},
{`tm.event = 'Tx' AND transfer.sender = 'AddrZ'`,
apiEvents, false},
{`tm.event = 'Tx' AND rewards.withdraw.address = 'AddrZ'`,
apiEvents, false},
{`tm.event = 'Tx' AND rewards.withdraw.source = 'W'`,
apiEvents, false},
}
for _, tc := range testCases {
q, err := query.New(tc.s)
if !tc.err {
require.Nil(t, err)
}
require.NotNil(t, q, "Query '%s' should not be nil", tc.s)
// NOTE: The original implementation allowed arbitrary prefix matches on
// attribute tags, e.g., "sl" would match "slash".
//
// That is weird and probably wrong: "foo.ba" should not match "foo.bar",
// or there is no way to distinguish the case where there were two values
// for "foo.bar" or one value each for "foo.ba" and "foo.bar".
//
// Apart from a single test case, I could not find any attested usage of
// this implementation detail. It isn't documented in the OpenAPI docs and
// is not shown in any of the example inputs.
//
// On that basis, I removed that test case. This implementation still does
// correctly handle variable type/attribute splits ("x", "y.z" / "x.y", "z")
// since that was required by the original "flattened" event representation.
rawEvents := expandEvents(tc.events)
for i, tc := range testCases {
t.Run(fmt.Sprintf("%02d", i+1), func(t *testing.T) {
c, err := query.New(tc.s)
if err != nil {
t.Fatalf("NewCompiled %#q: unexpected error: %v", tc.s, err)
}
if tc.matches {
match, err := q.Matches(rawEvents)
require.Nil(t, err, "Query '%s' should not error on match %v", tc.s, tc.events)
require.True(t, match, "Query '%s' should match %v", tc.s, tc.events)
} else {
match, err := q.Matches(rawEvents)
require.Equal(t, tc.matchErr, err != nil, "Unexpected error for query '%s' match %v", tc.s, tc.events)
require.False(t, match, "Query '%s' should not match %v", tc.s, tc.events)
}
got, err := c.Matches(tc.events)
if err != nil {
t.Errorf("Query: %#q\nInput: %+v\nMatches: got error %v",
tc.s, tc.events, err)
}
if got != tc.matches {
t.Errorf("Query: %#q\nInput: %+v\nMatches: got %v, want %v",
tc.s, tc.events, got, tc.matches)
}
})
}
}
func TestMustParse(t *testing.T) {
require.Panics(t, func() { query.MustParse("=") })
require.NotPanics(t, func() { query.MustParse("tm.events.type='NewBlock'") })
func TestAllMatchesAll(t *testing.T) {
events := newTestEvents(
``,
`Asher|Roth=`,
`Route|66=`,
`Rilly|Blue=`,
)
for i := 0; i < len(events); i++ {
match, err := query.All.Matches(events[:i])
if err != nil {
t.Errorf("Matches failed: %v", err)
} else if !match {
t.Errorf("Did not match on %+v ", events[:i])
}
}
}
func TestConditions(t *testing.T) {
txTime, err := time.Parse(time.RFC3339, "2013-05-03T14:45:00Z")
require.NoError(t, err)
testCases := []struct {
s string
conditions []query.Condition
}{
{
s: "tm.events.type='NewBlock'",
conditions: []query.Condition{
{CompositeKey: "tm.events.type", Op: query.OpEqual, Operand: "NewBlock"},
},
},
{
s: "tx.gas > 7 AND tx.gas < 9",
conditions: []query.Condition{
{CompositeKey: "tx.gas", Op: query.OpGreater, Operand: int64(7)},
{CompositeKey: "tx.gas", Op: query.OpLess, Operand: int64(9)},
},
},
{
s: "tx.time >= TIME 2013-05-03T14:45:00Z",
conditions: []query.Condition{
{CompositeKey: "tx.time", Op: query.OpGreaterEqual, Operand: txTime},
},
},
{
s: "slashing EXISTS",
conditions: []query.Condition{
{CompositeKey: "slashing", Op: query.OpExists},
},
},
// newTestEvent constructs an Event message from a template string.
// The format is "type|attr1=val1|attr2=val2|...".
func newTestEvent(s string) types.Event {
var event types.Event
parts := strings.Split(s, "|")
event.Type = parts[0]
if len(parts) == 1 {
return event // type only, no attributes
}
for _, kv := range parts[1:] {
key, val := splitKV(kv)
event.Attributes = append(event.Attributes, types.EventAttribute{
Key: key,
Value: val,
})
}
return event
}
for _, tc := range testCases {
q, err := query.New(tc.s)
require.Nil(t, err)
c, err := q.Conditions()
require.NoError(t, err)
require.Equal(t, tc.conditions, c)
// newTestEvents constructs a slice of Event messages by applying newTestEvent
// to each element of ss.
func newTestEvents(ss ...string) []types.Event {
events := make([]types.Event, len(ss))
for i, s := range ss {
events[i] = newTestEvent(s)
}
return events
}
func splitKV(s string) (key, value string) {
kv := strings.SplitN(s, "=", 2)
return kv[0], kv[1]
}

+ 34
- 0
libs/pubsub/query/syntax/doc.go View File

@ -0,0 +1,34 @@
// Package syntax defines a scanner and parser for the Tendermint event filter
// query language. A query selects events by their types and attribute values.
//
// Grammar
//
// The grammar of the query language is defined by the following EBNF:
//
// query = conditions EOF
// conditions = condition {"AND" condition}
// condition = tag comparison
// comparison = equal / order / contains / "EXISTS"
// equal = "=" (date / number / time / value)
// order = cmp (date / number / time)
// contains = "CONTAINS" value
// cmp = "<" / "<=" / ">" / ">="
//
// The lexical terms are defined here using RE2 regular expression notation:
//
// // The name of an event attribute (type.value)
// tag = #'\w+(\.\w+)*'
//
// // A datestamp (YYYY-MM-DD)
// date = #'DATE \d{4}-\d{2}-\d{2}'
//
// // A number with optional fractional parts (0, 10, 3.25)
// number = #'\d+(\.\d+)?'
//
// // An RFC3339 timestamp (2021-11-23T22:04:19-09:00)
// time = #'TIME \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}([-+]\d{2}:\d{2}|Z)'
//
// // A quoted literal string value ('a b c')
// value = #'\'[^\']*\''
//
package syntax

+ 213
- 0
libs/pubsub/query/syntax/parser.go View File

@ -0,0 +1,213 @@
package syntax
import (
"fmt"
"io"
"math"
"strconv"
"strings"
"time"
)
// Parse parses the specified query string. It is shorthand for constructing a
// parser for s and calling its Parse method.
func Parse(s string) (Query, error) {
return NewParser(strings.NewReader(s)).Parse()
}
// Query is the root of the parse tree for a query. A query is the conjunction
// of one or more conditions.
type Query []Condition
func (q Query) String() string {
ss := make([]string, len(q))
for i, cond := range q {
ss[i] = cond.String()
}
return strings.Join(ss, " AND ")
}
// A Condition is a single conditional expression, consisting of a tag, a
// comparison operator, and an optional argument. The type of the argument
// depends on the operator.
type Condition struct {
Tag string
Op Token
Arg *Arg
opText string
}
func (c Condition) String() string {
s := c.Tag + " " + c.opText
if c.Arg != nil {
return s + " " + c.Arg.String()
}
return s
}
// An Arg is the argument of a comparison operator.
type Arg struct {
Type Token
text string
}
func (a *Arg) String() string {
if a == nil {
return ""
}
switch a.Type {
case TString:
return "'" + a.text + "'"
case TTime:
return "TIME " + a.text
case TDate:
return "DATE " + a.text
default:
return a.text
}
}
// Number returns the value of the argument text as a number, or a NaN if the
// text does not encode a valid number value.
func (a *Arg) Number() float64 {
if a == nil {
return -1
}
v, err := strconv.ParseFloat(a.text, 64)
if err == nil && v >= 0 {
return v
}
return math.NaN()
}
// Time returns the value of the argument text as a time, or the zero value if
// the text does not encode a timestamp or datestamp.
func (a *Arg) Time() time.Time {
var ts time.Time
if a == nil {
return ts
}
var err error
switch a.Type {
case TDate:
ts, err = ParseDate(a.text)
case TTime:
ts, err = ParseTime(a.text)
}
if err == nil {
return ts
}
return time.Time{}
}
// Value returns the value of the argument text as a string, or "".
func (a *Arg) Value() string {
if a == nil {
return ""
}
return a.text
}
// Parser is a query expression parser. The grammar for query expressions is
// defined in the syntax package documentation.
type Parser struct {
scanner *Scanner
}
// NewParser constructs a new parser that reads the input from r.
func NewParser(r io.Reader) *Parser {
return &Parser{scanner: NewScanner(r)}
}
// Parse parses the complete input and returns the resulting query.
func (p *Parser) Parse() (Query, error) {
cond, err := p.parseCond()
if err != nil {
return nil, err
}
conds := []Condition{cond}
for p.scanner.Next() != io.EOF {
if tok := p.scanner.Token(); tok != TAnd {
return nil, fmt.Errorf("offset %d: got %v, want %v", p.scanner.Pos(), tok, TAnd)
}
cond, err := p.parseCond()
if err != nil {
return nil, err
}
conds = append(conds, cond)
}
return conds, nil
}
// parseCond parses a conditional expression: tag OP value.
func (p *Parser) parseCond() (Condition, error) {
var cond Condition
if err := p.require(TTag); err != nil {
return cond, err
}
cond.Tag = p.scanner.Text()
if err := p.require(TLeq, TGeq, TLt, TGt, TEq, TContains, TExists); err != nil {
return cond, err
}
cond.Op = p.scanner.Token()
cond.opText = p.scanner.Text()
var err error
switch cond.Op {
case TLeq, TGeq, TLt, TGt:
err = p.require(TNumber, TTime, TDate)
case TEq:
err = p.require(TNumber, TTime, TDate, TString)
case TContains:
err = p.require(TString)
case TExists:
// no argument
return cond, nil
default:
return cond, fmt.Errorf("offset %d: unexpected operator %v", p.scanner.Pos(), cond.Op)
}
if err != nil {
return cond, err
}
cond.Arg = &Arg{Type: p.scanner.Token(), text: p.scanner.Text()}
return cond, nil
}
// require advances the scanner and requires that the resulting token is one of
// the specified token types.
func (p *Parser) require(tokens ...Token) error {
if err := p.scanner.Next(); err != nil {
return fmt.Errorf("offset %d: %w", p.scanner.Pos(), err)
}
got := p.scanner.Token()
for _, tok := range tokens {
if tok == got {
return nil
}
}
return fmt.Errorf("offset %d: got %v, wanted %s", p.scanner.Pos(), got, tokLabel(tokens))
}
// tokLabel makes a human-readable summary string for the given token types.
func tokLabel(tokens []Token) string {
if len(tokens) == 1 {
return tokens[0].String()
}
last := len(tokens) - 1
ss := make([]string, len(tokens)-1)
for i, tok := range tokens[:last] {
ss[i] = tok.String()
}
return strings.Join(ss, ", ") + " or " + tokens[last].String()
}
// ParseDate parses s as a date string in the format used by DATE values.
func ParseDate(s string) (time.Time, error) {
return time.Parse("2006-01-02", s)
}
// ParseTime parses s as a timestamp in the format used by TIME values.
func ParseTime(s string) (time.Time, error) {
return time.Parse(time.RFC3339, s)
}

+ 312
- 0
libs/pubsub/query/syntax/scanner.go View File

@ -0,0 +1,312 @@
package syntax
import (
"bufio"
"bytes"
"fmt"
"io"
"strings"
"time"
"unicode"
)
// Token is the type of a lexical token in the query grammar.
type Token byte
const (
TInvalid = iota // invalid or unknown token
TTag // field tag: x.y
TString // string value: 'foo bar'
TNumber // number: 0, 15.5, 100
TTime // timestamp: TIME yyyy-mm-ddThh:mm:ss([-+]hh:mm|Z)
TDate // datestamp: DATE yyyy-mm-dd
TAnd // operator: AND
TContains // operator: CONTAINS
TExists // operator: EXISTS
TEq // operator: =
TLt // operator: <
TLeq // operator: <=
TGt // operator: >
TGeq // operator: >=
// Do not reorder these values without updating the scanner code.
)
var tString = [...]string{
TInvalid: "invalid token",
TTag: "tag",
TString: "string",
TNumber: "number",
TTime: "timestamp",
TDate: "datestamp",
TAnd: "AND operator",
TContains: "CONTAINS operator",
TExists: "EXISTS operator",
TEq: "= operator",
TLt: "< operator",
TLeq: "<= operator",
TGt: "> operator",
TGeq: ">= operator",
}
func (t Token) String() string {
v := int(t)
if v > len(tString) {
return "unknown token type"
}
return tString[v]
}
const (
// TimeFormat is the format string used for timestamp values.
TimeFormat = time.RFC3339
// DateFormat is the format string used for datestamp values.
DateFormat = "2006-01-02"
)
// Scanner reads lexical tokens of the query language from an input stream.
// Each call to Next advances the scanner to the next token, or reports an
// error.
type Scanner struct {
r *bufio.Reader
buf bytes.Buffer
tok Token
err error
pos, last, end int
}
// NewScanner constructs a new scanner that reads from r.
func NewScanner(r io.Reader) *Scanner { return &Scanner{r: bufio.NewReader(r)} }
// Next advances s to the next token in the input, or reports an error. At the
// end of input, Next returns io.EOF.
func (s *Scanner) Next() error {
s.buf.Reset()
s.pos = s.end
s.tok = TInvalid
s.err = nil
for {
ch, err := s.rune()
if err != nil {
return s.fail(err)
}
if unicode.IsSpace(ch) {
s.pos = s.end
continue // skip whitespace
}
if '0' <= ch && ch <= '9' {
return s.scanNumber(ch)
} else if isTagRune(ch) {
return s.scanTagLike(ch)
}
switch ch {
case '\'':
return s.scanString(ch)
case '<', '>', '=':
return s.scanCompare(ch)
default:
return s.invalid(ch)
}
}
}
// Token returns the type of the current input token.
func (s *Scanner) Token() Token { return s.tok }
// Text returns the text of the current input token.
func (s *Scanner) Text() string { return s.buf.String() }
// Pos returns the start offset of the current token in the input.
func (s *Scanner) Pos() int { return s.pos }
// Err returns the last error reported by Next, if any.
func (s *Scanner) Err() error { return s.err }
// scanNumber scans for numbers with optional fractional parts.
// Examples: 0, 1, 3.14
func (s *Scanner) scanNumber(first rune) error {
s.buf.WriteRune(first)
if err := s.scanWhile(isDigit); err != nil {
return err
}
ch, err := s.rune()
if err != nil && err != io.EOF {
return err
}
if ch == '.' {
s.buf.WriteRune(ch)
if err := s.scanWhile(isDigit); err != nil {
return err
}
} else {
s.unrune()
}
s.tok = TNumber
return nil
}
func (s *Scanner) scanString(first rune) error {
// discard opening quote
for {
ch, err := s.rune()
if err != nil {
return s.fail(err)
} else if ch == first {
// discard closing quote
s.tok = TString
return nil
}
s.buf.WriteRune(ch)
}
}
func (s *Scanner) scanCompare(first rune) error {
s.buf.WriteRune(first)
switch first {
case '=':
s.tok = TEq
return nil
case '<':
s.tok = TLt
case '>':
s.tok = TGt
default:
return s.invalid(first)
}
ch, err := s.rune()
if err == io.EOF {
return nil // the assigned token is correct
} else if err != nil {
return s.fail(err)
}
if ch == '=' {
s.buf.WriteRune(ch)
s.tok++ // depends on token order
return nil
}
s.unrune()
return nil
}
func (s *Scanner) scanTagLike(first rune) error {
s.buf.WriteRune(first)
var hasSpace bool
for {
ch, err := s.rune()
if err == io.EOF {
break
} else if err != nil {
return s.fail(err)
}
if !isTagRune(ch) {
hasSpace = ch == ' ' // to check for TIME, DATE
break
}
s.buf.WriteRune(ch)
}
text := s.buf.String()
switch text {
case "TIME":
if hasSpace {
return s.scanTimestamp()
}
s.tok = TTag
case "DATE":
if hasSpace {
return s.scanDatestamp()
}
s.tok = TTag
case "AND":
s.tok = TAnd
case "EXISTS":
s.tok = TExists
case "CONTAINS":
s.tok = TContains
default:
s.tok = TTag
}
s.unrune()
return nil
}
func (s *Scanner) scanTimestamp() error {
s.buf.Reset() // discard "TIME" label
if err := s.scanWhile(isTimeRune); err != nil {
return err
}
if ts, err := time.Parse(TimeFormat, s.buf.String()); err != nil {
return s.fail(fmt.Errorf("invalid TIME value: %w", err))
} else if y := ts.Year(); y < 1900 || y > 2999 {
return s.fail(fmt.Errorf("timestamp year %d out of range", ts.Year()))
}
s.tok = TTime
return nil
}
func (s *Scanner) scanDatestamp() error {
s.buf.Reset() // discard "DATE" label
if err := s.scanWhile(isDateRune); err != nil {
return err
}
if ts, err := time.Parse(DateFormat, s.buf.String()); err != nil {
return s.fail(fmt.Errorf("invalid DATE value: %w", err))
} else if y := ts.Year(); y < 1900 || y > 2999 {
return s.fail(fmt.Errorf("datestamp year %d out of range", ts.Year()))
}
s.tok = TDate
return nil
}
func (s *Scanner) scanWhile(ok func(rune) bool) error {
for {
ch, err := s.rune()
if err == io.EOF {
return nil
} else if err != nil {
return s.fail(err)
} else if !ok(ch) {
s.unrune()
return nil
}
s.buf.WriteRune(ch)
}
}
func (s *Scanner) rune() (rune, error) {
ch, nb, err := s.r.ReadRune()
s.last = nb
s.end += nb
return ch, err
}
func (s *Scanner) unrune() {
_ = s.r.UnreadRune()
s.end -= s.last
}
func (s *Scanner) fail(err error) error {
s.err = err
return err
}
func (s *Scanner) invalid(ch rune) error {
return s.fail(fmt.Errorf("invalid input %c at offset %d", ch, s.end))
}
func isDigit(r rune) bool { return '0' <= r && r <= '9' }
func isTagRune(r rune) bool {
return r == '.' || r == '_' || unicode.IsLetter(r) || unicode.IsDigit(r)
}
func isTimeRune(r rune) bool {
return strings.ContainsRune("-T:+Z", r) || isDigit(r)
}
func isDateRune(r rune) bool { return isDigit(r) || r == '-' }

+ 190
- 0
libs/pubsub/query/syntax/syntax_test.go View File

@ -0,0 +1,190 @@
package syntax_test
import (
"io"
"reflect"
"strings"
"testing"
"github.com/tendermint/tendermint/libs/pubsub/query/syntax"
)
func TestScanner(t *testing.T) {
tests := []struct {
input string
want []syntax.Token
}{
// Empty inputs
{"", nil},
{" ", nil},
{"\t\n ", nil},
// Numbers
{`0 123`, []syntax.Token{syntax.TNumber, syntax.TNumber}},
{`0.32 3.14`, []syntax.Token{syntax.TNumber, syntax.TNumber}},
// Tags
{`foo foo.bar`, []syntax.Token{syntax.TTag, syntax.TTag}},
// Strings (values)
{` '' x 'x' 'x y'`, []syntax.Token{syntax.TString, syntax.TTag, syntax.TString, syntax.TString}},
{` 'you are not your job' `, []syntax.Token{syntax.TString}},
// Comparison operators
{`< <= = > >=`, []syntax.Token{
syntax.TLt, syntax.TLeq, syntax.TEq, syntax.TGt, syntax.TGeq,
}},
// Mixed values of various kinds.
{`x AND y`, []syntax.Token{syntax.TTag, syntax.TAnd, syntax.TTag}},
{`x.y CONTAINS 'z'`, []syntax.Token{syntax.TTag, syntax.TContains, syntax.TString}},
{`foo EXISTS`, []syntax.Token{syntax.TTag, syntax.TExists}},
{`and AND`, []syntax.Token{syntax.TTag, syntax.TAnd}},
// Timestamp
{`TIME 2021-11-23T15:16:17Z`, []syntax.Token{syntax.TTime}},
// Datestamp
{`DATE 2021-11-23`, []syntax.Token{syntax.TDate}},
}
for _, test := range tests {
s := syntax.NewScanner(strings.NewReader(test.input))
var got []syntax.Token
for s.Next() == nil {
got = append(got, s.Token())
}
if err := s.Err(); err != io.EOF {
t.Errorf("Next: unexpected error: %v", err)
}
if !reflect.DeepEqual(got, test.want) {
t.Logf("Scanner input: %q", test.input)
t.Errorf("Wrong tokens:\ngot: %+v\nwant: %+v", got, test.want)
}
}
}
func TestScannerErrors(t *testing.T) {
tests := []struct {
input string
}{
{`'incomplete string`},
{`-23`},
{`&`},
{`DATE xyz-pdq`},
{`DATE xyzp-dq-zv`},
{`DATE 0000-00-00`},
{`DATE 0000-00-000`},
{`DATE 2021-01-99`},
{`TIME 2021-01-01T34:56:78Z`},
{`TIME 2021-01-99T14:56:08Z`},
{`TIME 2021-01-99T34:56:08`},
{`TIME 2021-01-99T34:56:11+3`},
}
for _, test := range tests {
s := syntax.NewScanner(strings.NewReader(test.input))
if err := s.Next(); err == nil {
t.Errorf("Next: got %v (%#q), want error", s.Token(), s.Text())
}
}
}
// These parser tests were copied from the original implementation of the query
// parser, and are preserved here as a compatibility check.
func TestParseValid(t *testing.T) {
tests := []struct {
input string
valid bool
}{
{"tm.events.type='NewBlock'", true},
{"tm.events.type = 'NewBlock'", true},
{"tm.events.name = ''", true},
{"tm.events.type='TIME'", true},
{"tm.events.type='DATE'", true},
{"tm.events.type='='", true},
{"tm.events.type='TIME", false},
{"tm.events.type=TIME'", false},
{"tm.events.type==", false},
{"tm.events.type=NewBlock", false},
{">==", false},
{"tm.events.type 'NewBlock' =", false},
{"tm.events.type>'NewBlock'", false},
{"", false},
{"=", false},
{"='NewBlock'", false},
{"tm.events.type=", false},
{"tm.events.typeNewBlock", false},
{"tm.events.type'NewBlock'", false},
{"'NewBlock'", false},
{"NewBlock", false},
{"", false},
{"tm.events.type='NewBlock' AND abci.account.name='Igor'", true},
{"tm.events.type='NewBlock' AND", false},
{"tm.events.type='NewBlock' AN", false},
{"tm.events.type='NewBlock' AN tm.events.type='NewBlockHeader'", false},
{"AND tm.events.type='NewBlock' ", false},
{"abci.account.name CONTAINS 'Igor'", true},
{"tx.date > DATE 2013-05-03", true},
{"tx.date < DATE 2013-05-03", true},
{"tx.date <= DATE 2013-05-03", true},
{"tx.date >= DATE 2013-05-03", true},
{"tx.date >= DAT 2013-05-03", false},
{"tx.date <= DATE2013-05-03", false},
{"tx.date <= DATE -05-03", false},
{"tx.date >= DATE 20130503", false},
{"tx.date >= DATE 2013+01-03", false},
// incorrect year, month, day
{"tx.date >= DATE 0013-01-03", false},
{"tx.date >= DATE 2013-31-03", false},
{"tx.date >= DATE 2013-01-83", false},
{"tx.date > TIME 2013-05-03T14:45:00+07:00", true},
{"tx.date < TIME 2013-05-03T14:45:00-02:00", true},
{"tx.date <= TIME 2013-05-03T14:45:00Z", true},
{"tx.date >= TIME 2013-05-03T14:45:00Z", true},
{"tx.date >= TIME2013-05-03T14:45:00Z", false},
{"tx.date = IME 2013-05-03T14:45:00Z", false},
{"tx.date = TIME 2013-05-:45:00Z", false},
{"tx.date >= TIME 2013-05-03T14:45:00", false},
{"tx.date >= TIME 0013-00-00T14:45:00Z", false},
{"tx.date >= TIME 2013+05=03T14:45:00Z", false},
{"account.balance=100", true},
{"account.balance >= 200", true},
{"account.balance >= -300", false},
{"account.balance >>= 400", false},
{"account.balance=33.22.1", false},
{"slashing.amount EXISTS", true},
{"slashing.amount EXISTS AND account.balance=100", true},
{"account.balance=100 AND slashing.amount EXISTS", true},
{"slashing EXISTS", true},
{"hash='136E18F7E4C348B780CF873A0BF43922E5BAFA63'", true},
{"hash=136E18F7E4C348B780CF873A0BF43922E5BAFA63", false},
}
for _, test := range tests {
q, err := syntax.Parse(test.input)
if test.valid != (err == nil) {
t.Errorf("Parse %#q: valid %v got err=%v", test.input, test.valid, err)
}
// For valid queries, check that the query round-trips.
if test.valid {
qstr := q.String()
r, err := syntax.Parse(qstr)
if err != nil {
t.Errorf("Reparse %#q failed: %v", qstr, err)
}
if rstr := r.String(); rstr != qstr {
t.Errorf("Reparse diff\nold: %#q\nnew: %#q", qstr, rstr)
}
}
}
}

+ 6
- 6
types/event_bus_test.go View File

@ -35,7 +35,7 @@ func TestEventBusPublishEventTx(t *testing.T) {
// PublishEventTx adds 3 composite keys, so the query below should work
query := fmt.Sprintf("tm.event='Tx' AND tx.height=1 AND tx.hash='%X' AND testType.baz=1", tx.Hash())
txsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
txsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustCompile(query))
require.NoError(t, err)
done := make(chan struct{})
@ -89,7 +89,7 @@ func TestEventBusPublishEventNewBlock(t *testing.T) {
// PublishEventNewBlock adds the tm.event compositeKey, so the query below should work
query := "tm.event='NewBlock' AND testType.baz=1 AND testType.foz=2"
blocksSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
blocksSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustCompile(query))
require.NoError(t, err)
done := make(chan struct{})
@ -186,7 +186,7 @@ func TestEventBusPublishEventTxDuplicateKeys(t *testing.T) {
}
for i, tc := range testCases {
sub, err := eventBus.Subscribe(context.Background(), fmt.Sprintf("client-%d", i), tmquery.MustParse(tc.query))
sub, err := eventBus.Subscribe(context.Background(), fmt.Sprintf("client-%d", i), tmquery.MustCompile(tc.query))
require.NoError(t, err)
done := make(chan struct{})
@ -250,7 +250,7 @@ func TestEventBusPublishEventNewBlockHeader(t *testing.T) {
// PublishEventNewBlockHeader adds the tm.event compositeKey, so the query below should work
query := "tm.event='NewBlockHeader' AND testType.baz=1 AND testType.foz=2"
headersSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
headersSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustCompile(query))
require.NoError(t, err)
done := make(chan struct{})
@ -290,7 +290,7 @@ func TestEventBusPublishEventNewEvidence(t *testing.T) {
ev := NewMockDuplicateVoteEvidence(1, time.Now(), "test-chain-id")
query := "tm.event='NewEvidence'"
evSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
evSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustCompile(query))
require.NoError(t, err)
done := make(chan struct{})
@ -327,7 +327,7 @@ func TestEventBusPublish(t *testing.T) {
const numEventsExpected = 14
sub, err := eventBus.Subscribe(context.Background(), "test", tmquery.Empty{}, numEventsExpected)
sub, err := eventBus.Subscribe(context.Background(), "test", tmquery.All, numEventsExpected)
require.NoError(t, err)
done := make(chan struct{})


+ 2
- 2
types/events.go View File

@ -232,11 +232,11 @@ var (
)
func EventQueryTxFor(tx Tx) tmpubsub.Query {
return tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s='%X'", EventTypeKey, EventTxValue, TxHashKey, tx.Hash()))
return tmquery.MustCompile(fmt.Sprintf("%s='%s' AND %s='%X'", EventTypeKey, EventTxValue, TxHashKey, tx.Hash()))
}
func QueryForEvent(eventValue string) tmpubsub.Query {
return tmquery.MustParse(fmt.Sprintf("%s='%s'", EventTypeKey, eventValue))
return tmquery.MustCompile(fmt.Sprintf("%s='%s'", EventTypeKey, eventValue))
}
// BlockEventPublisher publishes all block related events


+ 3
- 3
types/events_test.go View File

@ -10,18 +10,18 @@ import (
func TestQueryTxFor(t *testing.T) {
tx := Tx("foo")
assert.Equal(t,
fmt.Sprintf("tm.event='Tx' AND tx.hash='%X'", tx.Hash()),
fmt.Sprintf("tm.event = 'Tx' AND tx.hash = '%X'", tx.Hash()),
EventQueryTxFor(tx).String(),
)
}
func TestQueryForEvent(t *testing.T) {
assert.Equal(t,
"tm.event='NewBlock'",
"tm.event = 'NewBlock'",
QueryForEvent(EventNewBlockValue).String(),
)
assert.Equal(t,
"tm.event='NewEvidence'",
"tm.event = 'NewEvidence'",
QueryForEvent(EventNewEvidenceValue).String(),
)
}

Loading…
Cancel
Save