diff --git a/.gitignore b/.gitignore index 62f28681c..34d0bf1f9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ vendor .glide + +pubsub/query/fuzz_test/output diff --git a/pubsub/example_test.go b/pubsub/example_test.go new file mode 100644 index 000000000..d64b96eab --- /dev/null +++ b/pubsub/example_test.go @@ -0,0 +1,24 @@ +package pubsub_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/tendermint/tmlibs/log" + "github.com/tendermint/tmlibs/pubsub" + "github.com/tendermint/tmlibs/pubsub/query" +) + +func TestExample(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ch := make(chan interface{}, 1) + s.Subscribe("example-client", query.MustParse("abci.account.name=John"), ch) + err := s.PublishWithTags("Tombstone", map[string]interface{}{"abci.account.name": "John"}) + require.NoError(t, err) + assertReceive(t, "Tombstone", ch) +} diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go new file mode 100644 index 000000000..264848161 --- /dev/null +++ b/pubsub/pubsub.go @@ -0,0 +1,314 @@ +// Package pubsub implements a pub-sub model with a single publisher (Server) +// and multiple subscribers (clients). +// +// Though you can have multiple publishers by sharing a pointer to a server or +// by giving the same channel to each publisher and publishing messages from +// that channel (fan-in). +// +// Clients subscribe for messages, which could be of any type, using a query. +// When some message is published, we match it with all queries. If there is a +// match, this message will be pushed to all clients, subscribed to that query. +// See query subpackage for our implementation. +// +// Overflow strategies (incoming publish requests): +// +// 1) drop - drops publish requests when there are too many of them +// 2) wait - blocks until the server is ready to accept more publish requests (default) +// +// Subscribe/Unsubscribe calls are always blocking. +// +// Overflow strategies (outgoing messages): +// +// 1) skip - do not send a message if the client is busy or slow (default) +// 2) wait - wait until the client is ready to accept new messages +// +package pubsub + +import ( + "errors" + + cmn "github.com/tendermint/tmlibs/common" + "github.com/tendermint/tmlibs/log" +) + +type operation int + +const ( + sub operation = iota + pub + unsub + shutdown +) + +type overflowStrategy int + +const ( + drop overflowStrategy = iota + wait +) + +var ( + ErrorOverflow = errors.New("Server overflowed") +) + +type cmd struct { + op operation + query Query + ch chan<- interface{} + clientID string + msg interface{} + tags map[string]interface{} +} + +// Query defines an interface for a query to be used for subscribing. +type Query interface { + Matches(tags map[string]interface{}) bool +} + +// Server allows clients to subscribe/unsubscribe for messages, pubsling +// messages with or without tags, and manages internal state. +type Server struct { + cmn.BaseService + + cmds chan cmd + + overflowStrategy overflowStrategy + slowClientStrategy overflowStrategy +} + +// Option sets a parameter for the server. +type Option func(*Server) + +// NewServer returns a new server. See the commentary on the Option functions +// for a detailed description of how to configure buffering and overflow +// behavior. If no options are provided, the resulting server's queue is +// unbuffered and it blocks when overflowed. +func NewServer(options ...Option) *Server { + s := &Server{overflowStrategy: wait, slowClientStrategy: drop} + s.BaseService = *cmn.NewBaseService(nil, "PubSub", s) + + for _, option := range options { + option(s) + } + + if s.cmds == nil { // if BufferCapacity was not set, create unbuffered channel + s.cmds = make(chan cmd) + } + + return s +} + +// BufferCapacity allows you to specify capacity for the internal server's +// queue. Since the server, given Y subscribers, could only process X messages, +// this option could be used to survive spikes (e.g. high amount of +// transactions during peak hours). +func BufferCapacity(cap int) Option { + return func(s *Server) { + if cap > 0 { + s.cmds = make(chan cmd, cap) + } + } +} + +// OverflowStrategyDrop will tell the server to drop messages when it can't +// process more messages. +func OverflowStrategyDrop() Option { + return func(s *Server) { + s.overflowStrategy = drop + } +} + +// OverflowStrategyWait will tell the server to block and wait for some time +// for server to process other messages. Default strategy. +func OverflowStrategyWait() func(*Server) { + return func(s *Server) { + s.overflowStrategy = wait + } +} + +// WaitSlowClients will tell the server to block and wait until subscriber +// reads a messages even if it is fast enough to process them. +func WaitSlowClients() func(*Server) { + return func(s *Server) { + s.slowClientStrategy = wait + } +} + +// SkipSlowClients will tell the server to skip subscriber if it is busy +// processing previous message(s). Default strategy. +func SkipSlowClients() func(*Server) { + return func(s *Server) { + s.slowClientStrategy = drop + } +} + +// Subscribe returns a channel on which messages matching the given query can +// be received. If the subscription already exists old channel will be closed +// and new one returned. +func (s *Server) Subscribe(clientID string, query Query, out chan<- interface{}) { + s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out} +} + +// Unsubscribe unsubscribes the given client from the query. +func (s *Server) Unsubscribe(clientID string, query Query) { + s.cmds <- cmd{op: unsub, clientID: clientID, query: query} +} + +// Unsubscribe unsubscribes the given channel. +func (s *Server) UnsubscribeAll(clientID string) { + s.cmds <- cmd{op: unsub, clientID: clientID} +} + +// Publish publishes the given message. +func (s *Server) Publish(msg interface{}) error { + return s.PublishWithTags(msg, make(map[string]interface{})) +} + +// PublishWithTags publishes the given message with a set of tags. This set of +// tags will be matched with client queries. If there is a match, the message +// will be sent to a client. +func (s *Server) PublishWithTags(msg interface{}, tags map[string]interface{}) error { + pubCmd := cmd{op: pub, msg: msg, tags: tags} + switch s.overflowStrategy { + case drop: + select { + case s.cmds <- pubCmd: + default: + s.Logger.Error("Server overflowed, dropping message...", "msg", msg) + return ErrorOverflow + } + case wait: + s.cmds <- pubCmd + } + return nil +} + +// OnStop implements Service.OnStop by shutting down the server. +func (s *Server) OnStop() { + s.cmds <- cmd{op: shutdown} +} + +// NOTE: not goroutine safe +type state struct { + // query -> client -> ch + queries map[Query]map[string]chan<- interface{} + // client -> query -> struct{} + clients map[string]map[Query]struct{} +} + +// OnStart implements Service.OnStart by creating a main loop. +func (s *Server) OnStart() error { + go s.loop(state{ + queries: make(map[Query]map[string]chan<- interface{}), + clients: make(map[string]map[Query]struct{}), + }) + return nil +} + +func (s *Server) loop(state state) { +loop: + for cmd := range s.cmds { + switch cmd.op { + case unsub: + if cmd.query != nil { + state.remove(cmd.clientID, cmd.query) + } else { + state.removeAll(cmd.clientID) + } + case shutdown: + state.reset() + break loop + case sub: + state.add(cmd.clientID, cmd.query, cmd.ch) + case pub: + state.send(cmd.msg, cmd.tags, s.slowClientStrategy, s.Logger) + } + } +} + +func (state *state) add(clientID string, q Query, ch chan<- interface{}) { + // add query if needed + if clientToChannelMap, ok := state.queries[q]; !ok { + state.queries[q] = make(map[string]chan<- interface{}) + } else { + // check if already subscribed + if oldCh, ok := clientToChannelMap[clientID]; ok { + close(oldCh) + } + } + state.queries[q][clientID] = ch + + // add client if needed + if _, ok := state.clients[clientID]; !ok { + state.clients[clientID] = make(map[Query]struct{}) + } + state.clients[clientID][q] = struct{}{} + + // create subscription + clientToChannelMap := state.queries[q] + clientToChannelMap[clientID] = ch +} + +func (state *state) remove(clientID string, q Query) { + clientToChannelMap, ok := state.queries[q] + if !ok { + return + } + + ch, ok := clientToChannelMap[clientID] + if ok { + close(ch) + + delete(state.clients[clientID], q) + + // if it not subscribed to anything else, remove the client + if len(state.clients[clientID]) == 0 { + delete(state.clients, clientID) + } + + delete(state.queries[q], clientID) + } +} + +func (state *state) removeAll(clientID string) { + queryMap, ok := state.clients[clientID] + if !ok { + return + } + + for q, _ := range queryMap { + ch := state.queries[q][clientID] + close(ch) + + delete(state.queries[q], clientID) + } + + delete(state.clients, clientID) +} + +func (state *state) reset() { + state.queries = make(map[Query]map[string]chan<- interface{}) + state.clients = make(map[string]map[Query]struct{}) +} + +func (state *state) send(msg interface{}, tags map[string]interface{}, slowClientStrategy overflowStrategy, logger log.Logger) { + for q, clientToChannelMap := range state.queries { + // NOTE we can use LRU cache to speed up common cases like query = " + // tm.events.type=NewBlock" and tags = {"tm.events.type": "NewBlock"} + if q.Matches(tags) { + for clientID, ch := range clientToChannelMap { + logger.Info("Sending message to client", "msg", msg, "client", clientID) + switch slowClientStrategy { + case drop: + select { + case ch <- msg: + default: + logger.Error("Client is busy, skipping...", "clientID", clientID) + } + case wait: + ch <- msg + } + } + } + } +} diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go new file mode 100644 index 000000000..570f76a82 --- /dev/null +++ b/pubsub/pubsub_test.go @@ -0,0 +1,227 @@ +package pubsub_test + +import ( + "fmt" + "runtime/debug" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/tendermint/tmlibs/log" + "github.com/tendermint/tmlibs/pubsub" + "github.com/tendermint/tmlibs/pubsub/query" +) + +const ( + clientID = "test-client" +) + +func TestSubscribe(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ch := make(chan interface{}, 1) + s.Subscribe(clientID, query.Empty{}, ch) + err := s.Publish("Ka-Zar") + require.NoError(t, err) + assertReceive(t, "Ka-Zar", ch) + + err = s.Publish("Quicksilver") + require.NoError(t, err) + assertReceive(t, "Quicksilver", ch) +} + +func TestDifferentClients(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + ch1 := make(chan interface{}, 1) + s.Subscribe("client-1", query.MustParse("tm.events.type=NewBlock"), ch1) + err := s.PublishWithTags("Iceman", map[string]interface{}{"tm.events.type": "NewBlock"}) + require.NoError(t, err) + assertReceive(t, "Iceman", ch1) + + ch2 := make(chan interface{}, 1) + s.Subscribe("client-2", query.MustParse("tm.events.type=NewBlock AND abci.account.name=Igor"), ch2) + err = s.PublishWithTags("Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}) + require.NoError(t, err) + assertReceive(t, "Ultimo", ch1) + assertReceive(t, "Ultimo", ch2) + + ch3 := make(chan interface{}, 1) + s.Subscribe("client-3", query.MustParse("tm.events.type=NewRoundStep AND abci.account.name=Igor AND abci.invoice.number = 10"), ch3) + err = s.PublishWithTags("Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"}) + require.NoError(t, err) + assert.Zero(t, len(ch3)) +} + +func TestClientResubscribes(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + q := query.MustParse("tm.events.type=NewBlock") + + ch1 := make(chan interface{}, 1) + s.Subscribe(clientID, q, ch1) + err := s.PublishWithTags("Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"}) + require.NoError(t, err) + assertReceive(t, "Goblin Queen", ch1) + + ch2 := make(chan interface{}, 1) + s.Subscribe(clientID, q, ch2) + + _, ok := <-ch1 + assert.False(t, ok) + + err = s.PublishWithTags("Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"}) + require.NoError(t, err) + assertReceive(t, "Spider-Man", ch2) +} + +func TestUnsubscribe(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ch := make(chan interface{}) + s.Subscribe(clientID, query.Empty{}, ch) + s.Unsubscribe(clientID, query.Empty{}) + + err := s.Publish("Nick Fury") + require.NoError(t, err) + assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe") + + _, ok := <-ch + assert.False(t, ok) +} + +func TestUnsubscribeAll(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1) + s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlock"), ch1) + s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlockHeader"), ch2) + + s.UnsubscribeAll(clientID) + + err := s.Publish("Nick Fury") + require.NoError(t, err) + assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll") + assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll") + + _, ok := <-ch1 + assert.False(t, ok) + _, ok = <-ch2 + assert.False(t, ok) +} + +func TestOverflowStrategyDrop(t *testing.T) { + s := pubsub.NewServer(pubsub.OverflowStrategyDrop()) + s.SetLogger(log.TestingLogger()) + + err := s.Publish("Veda") + if assert.Error(t, err) { + assert.Equal(t, pubsub.ErrorOverflow, err) + } +} + +func TestOverflowStrategyWait(t *testing.T) { + s := pubsub.NewServer(pubsub.OverflowStrategyWait()) + s.SetLogger(log.TestingLogger()) + + go func() { + time.Sleep(1 * time.Second) + s.Start() + defer s.Stop() + }() + + err := s.Publish("Veda") + assert.NoError(t, err) +} + +func TestBufferCapacity(t *testing.T) { + s := pubsub.NewServer(pubsub.BufferCapacity(2)) + s.SetLogger(log.TestingLogger()) + + err := s.Publish("Nighthawk") + require.NoError(t, err) + err = s.Publish("Sage") + require.NoError(t, err) +} + +func TestWaitSlowClients(t *testing.T) { + s := pubsub.NewServer(pubsub.WaitSlowClients()) + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ch := make(chan interface{}) + s.Subscribe(clientID, query.Empty{}, ch) + err := s.Publish("Wonderwoman") + require.NoError(t, err) + + time.Sleep(1 * time.Second) + + assertReceive(t, "Wonderwoman", ch) +} + +func TestSkipSlowClients(t *testing.T) { + s := pubsub.NewServer(pubsub.SkipSlowClients()) + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ch := make(chan interface{}) + s.Subscribe(clientID, query.Empty{}, ch) + err := s.Publish("Cyclops") + require.NoError(t, err) + assert.Zero(t, len(ch)) +} + +func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) } +func Benchmark100Clients(b *testing.B) { benchmarkNClients(100, b) } +func Benchmark1000Clients(b *testing.B) { benchmarkNClients(1000, b) } + +func benchmarkNClients(n int, b *testing.B) { + s := pubsub.NewServer(pubsub.BufferCapacity(b.N)) + s.Start() + defer s.Stop() + + for i := 0; i < n; i++ { + ch := make(chan interface{}) + s.Subscribe(clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = Ivan AND abci.Invoices.Number = %d", i)), ch) + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.PublishWithTags("Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i}) + } +} + +/////////////////////////////////////////////////////////////////////////////// +/// HELPERS +/////////////////////////////////////////////////////////////////////////////// + +func assertReceive(t *testing.T, expected interface{}, ch <-chan interface{}, msgAndArgs ...interface{}) { + select { + case actual := <-ch: + if actual != nil { + assert.Equal(t, expected, actual, msgAndArgs...) + } + case <-time.After(1 * time.Second): + t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected) + debug.PrintStack() + } +} diff --git a/pubsub/query/Makefile b/pubsub/query/Makefile new file mode 100644 index 000000000..ca3ff5b56 --- /dev/null +++ b/pubsub/query/Makefile @@ -0,0 +1,11 @@ +gen_query_parser: + @go get github.com/pointlander/peg + peg -inline -switch query.peg + +fuzzy_test: + @go get github.com/dvyukov/go-fuzz/go-fuzz + @go get github.com/dvyukov/go-fuzz/go-fuzz-build + go-fuzz-build github.com/tendermint/tmlibs/pubsub/query/fuzz_test + go-fuzz -bin=./fuzz_test-fuzz.zip -workdir=./fuzz_test/output + +.PHONY: gen_query_parser fuzzy_test diff --git a/pubsub/query/empty.go b/pubsub/query/empty.go new file mode 100644 index 000000000..2d60a8923 --- /dev/null +++ b/pubsub/query/empty.go @@ -0,0 +1,14 @@ +package query + +// Empty query matches any set of tags. +type Empty struct { +} + +// Matches always returns true. +func (Empty) Matches(tags map[string]interface{}) bool { + return true +} + +func (Empty) String() string { + return "empty" +} diff --git a/pubsub/query/empty_test.go b/pubsub/query/empty_test.go new file mode 100644 index 000000000..663acb191 --- /dev/null +++ b/pubsub/query/empty_test.go @@ -0,0 +1,16 @@ +package query_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/tendermint/tmlibs/pubsub/query" +) + +func TestEmptyQueryMatchesAnything(t *testing.T) { + q := query.Empty{} + assert.True(t, q.Matches(map[string]interface{}{})) + assert.True(t, q.Matches(map[string]interface{}{"Asher": "Roth"})) + assert.True(t, q.Matches(map[string]interface{}{"Route": 66})) + assert.True(t, q.Matches(map[string]interface{}{"Route": 66, "Billy": "Blue"})) +} diff --git a/pubsub/query/fuzz_test/main.go b/pubsub/query/fuzz_test/main.go new file mode 100644 index 000000000..3b0ef1473 --- /dev/null +++ b/pubsub/query/fuzz_test/main.go @@ -0,0 +1,30 @@ +package fuzz_test + +import ( + "fmt" + + "github.com/tendermint/tmlibs/pubsub/query" +) + +func Fuzz(data []byte) int { + sdata := string(data) + q0, err := query.New(sdata) + if err != nil { + return 0 + } + + sdata1 := q0.String() + q1, err := query.New(sdata1) + if err != nil { + panic(err) + } + + sdata2 := q1.String() + if sdata1 != sdata2 { + fmt.Printf("q0: %q\n", sdata1) + fmt.Printf("q1: %q\n", sdata2) + panic("query changed") + } + + return 1 +} diff --git a/pubsub/query/parser_test.go b/pubsub/query/parser_test.go new file mode 100644 index 000000000..194966664 --- /dev/null +++ b/pubsub/query/parser_test.go @@ -0,0 +1,81 @@ +package query_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/tendermint/tmlibs/pubsub/query" +) + +// TODO: fuzzy testing? +func TestParser(t *testing.T) { + cases := []struct { + query string + valid bool + }{ + {"tm.events.type=NewBlock", true}, + {"tm.events.type = NewBlock", true}, + {"tm.events.type=TIME", true}, + {"tm.events.type=DATE", true}, + {"tm.events.type==", false}, + {">==", false}, + {"tm.events.type NewBlock =", false}, + {"tm.events.type>NewBlock", false}, + {"", false}, + {"=", false}, + {"=NewBlock", false}, + {"tm.events.type=", false}, + + {"tm.events.typeNewBlock", false}, + {"NewBlock", false}, + {"", false}, + + {"tm.events.type=NewBlock AND abci.account.name=Igor", true}, + {"tm.events.type=NewBlock AND", false}, + {"tm.events.type=NewBlock AN", false}, + {"tm.events.type=NewBlock AN tm.events.type=NewBlockHeader", false}, + {"AND tm.events.type=NewBlock ", false}, + + {"abci.account.name CONTAINS Igor", true}, + + {"tx.date > DATE 2013-05-03", true}, + {"tx.date < DATE 2013-05-03", true}, + {"tx.date <= DATE 2013-05-03", true}, + {"tx.date >= DATE 2013-05-03", true}, + {"tx.date >= DAT 2013-05-03", false}, + {"tx.date <= DATE2013-05-03", false}, + {"tx.date <= DATE -05-03", false}, + {"tx.date >= DATE 20130503", false}, + {"tx.date >= DATE 2013+01-03", false}, + // incorrect year, month, day + {"tx.date >= DATE 0013-01-03", false}, + {"tx.date >= DATE 2013-31-03", false}, + {"tx.date >= DATE 2013-01-83", false}, + + {"tx.date > TIME 2013-05-03T14:45:00+07:00", true}, + {"tx.date < TIME 2013-05-03T14:45:00-02:00", true}, + {"tx.date <= TIME 2013-05-03T14:45:00Z", true}, + {"tx.date >= TIME 2013-05-03T14:45:00Z", true}, + {"tx.date >= TIME2013-05-03T14:45:00Z", false}, + {"tx.date = IME 2013-05-03T14:45:00Z", false}, + {"tx.date = TIME 2013-05-:45:00Z", false}, + {"tx.date >= TIME 2013-05-03T14:45:00", false}, + {"tx.date >= TIME 0013-00-00T14:45:00Z", false}, + {"tx.date >= TIME 2013+05=03T14:45:00Z", false}, + + {"account.balance=100", true}, + {"account.balance >= 200", true}, + {"account.balance >= -300", false}, + {"account.balance >>= 400", false}, + {"account.balance=33.22.1", false}, + } + + for _, c := range cases { + _, err := query.New(c.query) + if c.valid { + assert.NoError(t, err, "Query was '%s'", c.query) + } else { + assert.Error(t, err, "Query was '%s'", c.query) + } + } +} diff --git a/pubsub/query/query.go b/pubsub/query/query.go new file mode 100644 index 000000000..f084a3f98 --- /dev/null +++ b/pubsub/query/query.go @@ -0,0 +1,258 @@ +// Package query provides a parser for a custom query format: +// +// abci.invoice.number=22 AND abci.invoice.owner=Ivan +// +// See query.peg for the grammar, which is a https://en.wikipedia.org/wiki/Parsing_expression_grammar. +// More: https://github.com/PhilippeSigaud/Pegged/wiki/PEG-Basics +// +// It has a support for numbers (integer and floating point), dates and times. +package query + +import ( + "fmt" + "reflect" + "strconv" + "strings" + "time" +) + +// Query holds the query string and the query parser. +type Query struct { + str string + parser *QueryParser +} + +// New parses the given string and returns a query or error if the string is +// invalid. +func New(s string) (*Query, error) { + p := &QueryParser{Buffer: fmt.Sprintf(`"%s"`, s)} + p.Init() + if err := p.Parse(); err != nil { + return nil, err + } + return &Query{str: s, parser: p}, nil +} + +// MustParse turns the given string into a query or panics; for tests or others +// cases where you know the string is valid. +func MustParse(s string) *Query { + q, err := New(s) + if err != nil { + panic(fmt.Sprintf("failed to parse %s: %v", s, err)) + } + return q +} + +// String returns the original string. +func (q *Query) String() string { + return q.str +} + +type operator uint8 + +const ( + opLessEqual operator = iota + opGreaterEqual + opLess + opGreater + opEqual + opContains +) + +// Matches returns true if the query matches the given set of tags, false otherwise. +// +// For example, query "name=John" matches tags = {"name": "John"}. More +// examples could be found in parser_test.go and query_test.go. +func (q *Query) Matches(tags map[string]interface{}) bool { + if len(tags) == 0 { + return false + } + + buffer, begin, end := q.parser.Buffer, 0, 0 + + var tag string + var op operator + + // tokens must be in the following order: tag ("tx.gas") -> operator ("=") -> operand ("7") + for _, token := range q.parser.Tokens() { + switch token.pegRule { + + case rulePegText: + begin, end = int(token.begin), int(token.end) + case ruletag: + tag = buffer[begin:end] + case rulele: + op = opLessEqual + case rulege: + op = opGreaterEqual + case rulel: + op = opLess + case ruleg: + op = opGreater + case ruleequal: + op = opEqual + case rulecontains: + op = opContains + case rulevalue: + // see if the triplet (tag, operator, operand) matches any tag + // "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" } + if !match(tag, op, reflect.ValueOf(buffer[begin:end]), tags) { + return false + } + case rulenumber: + number := buffer[begin:end] + if strings.Contains(number, ".") { // if it looks like a floating-point number + value, err := strconv.ParseFloat(number, 64) + if err != nil { + panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number)) + } + if !match(tag, op, reflect.ValueOf(value), tags) { + return false + } + } else { + value, err := strconv.ParseInt(number, 10, 64) + if err != nil { + panic(fmt.Sprintf("got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", err, number)) + } + if !match(tag, op, reflect.ValueOf(value), tags) { + return false + } + } + case ruletime: + value, err := time.Parse(time.RFC3339, buffer[begin:end]) + if err != nil { + panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end])) + } + if !match(tag, op, reflect.ValueOf(value), tags) { + return false + } + case ruledate: + value, err := time.Parse("2006-01-02", buffer[begin:end]) + if err != nil { + panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", err, buffer[begin:end])) + } + if !match(tag, op, reflect.ValueOf(value), tags) { + return false + } + } + } + + return true +} + +// match returns true if the given triplet (tag, operator, operand) matches any tag. +// +// First, it looks up the tag in tags and if it finds one, tries to compare the +// value from it to the operand using the operator. +// +// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" } +func match(tag string, op operator, operand reflect.Value, tags map[string]interface{}) bool { + // look up the tag from the query in tags + value, ok := tags[tag] + if !ok { + return false + } + switch operand.Kind() { + case reflect.Struct: // time + operandAsTime := operand.Interface().(time.Time) + v, ok := value.(time.Time) + if !ok { // if value from tags is not time.Time + return false + } + switch op { + case opLessEqual: + return v.Before(operandAsTime) || v.Equal(operandAsTime) + case opGreaterEqual: + return v.Equal(operandAsTime) || v.After(operandAsTime) + case opLess: + return v.Before(operandAsTime) + case opGreater: + return v.After(operandAsTime) + case opEqual: + return v.Equal(operandAsTime) + } + case reflect.Float64: + operandFloat64 := operand.Interface().(float64) + var v float64 + // try our best to convert value from tags to float64 + switch vt := value.(type) { + case float64: + v = vt + case float32: + v = float64(vt) + case int: + v = float64(vt) + case int8: + v = float64(vt) + case int16: + v = float64(vt) + case int32: + v = float64(vt) + case int64: + v = float64(vt) + default: // fail for all other types + panic(fmt.Sprintf("Incomparable types: %T (%v) vs float64 (%v)", value, value, operandFloat64)) + } + switch op { + case opLessEqual: + return v <= operandFloat64 + case opGreaterEqual: + return v >= operandFloat64 + case opLess: + return v < operandFloat64 + case opGreater: + return v > operandFloat64 + case opEqual: + return v == operandFloat64 + } + case reflect.Int64: + operandInt := operand.Interface().(int64) + var v int64 + // try our best to convert value from tags to int64 + switch vt := value.(type) { + case int64: + v = vt + case int8: + v = int64(vt) + case int16: + v = int64(vt) + case int32: + v = int64(vt) + case int: + v = int64(vt) + case float64: + v = int64(vt) + case float32: + v = int64(vt) + default: // fail for all other types + panic(fmt.Sprintf("Incomparable types: %T (%v) vs int64 (%v)", value, value, operandInt)) + } + switch op { + case opLessEqual: + return v <= operandInt + case opGreaterEqual: + return v >= operandInt + case opLess: + return v < operandInt + case opGreater: + return v > operandInt + case opEqual: + return v == operandInt + } + case reflect.String: + v, ok := value.(string) + if !ok { // if value from tags is not string + return false + } + switch op { + case opEqual: + return v == operand.String() + case opContains: + return strings.Contains(v, operand.String()) + } + default: + panic(fmt.Sprintf("Unknown kind of operand %v", operand.Kind())) + } + + return false +} diff --git a/pubsub/query/query.peg b/pubsub/query/query.peg new file mode 100644 index 000000000..9654289c4 --- /dev/null +++ b/pubsub/query/query.peg @@ -0,0 +1,33 @@ +package query + +type QueryParser Peg { +} + +e <- '\"' condition ( ' '+ and ' '+ condition )* '\"' !. + +condition <- tag ' '* (le ' '* (number / time / date) + / ge ' '* (number / time / date) + / l ' '* (number / time / date) + / g ' '* (number / time / date) + / equal ' '* (number / time / date / value) + / contains ' '* value + ) + +tag <- < (![ \t\n\r\\()"=><] .)+ > +value <- < (![ \t\n\r\\()"=><] .)+ > +number <- < ('0' + / [1-9] digit* ('.' digit*)?) > +digit <- [0-9] +time <- "TIME " < year '-' month '-' day 'T' digit digit ':' digit digit ':' digit digit (('-' / '+') digit digit ':' digit digit / 'Z') > +date <- "DATE " < year '-' month '-' day > +year <- ('1' / '2') digit digit digit +month <- ('0' / '1') digit +day <- ('0' / '1' / '2' / '3') digit +and <- "AND" + +equal <- "=" +contains <- "CONTAINS" +le <- "<=" +ge <- ">=" +l <- "<" +g <- ">" diff --git a/pubsub/query/query.peg.go b/pubsub/query/query.peg.go new file mode 100644 index 000000000..5cd0a9e32 --- /dev/null +++ b/pubsub/query/query.peg.go @@ -0,0 +1,1668 @@ +package query + +import ( + "fmt" + "math" + "sort" + "strconv" +) + +const endSymbol rune = 1114112 + +/* The rule types inferred from the grammar are below. */ +type pegRule uint8 + +const ( + ruleUnknown pegRule = iota + rulee + rulecondition + ruletag + rulevalue + rulenumber + ruledigit + ruletime + ruledate + ruleyear + rulemonth + ruleday + ruleand + ruleequal + rulecontains + rulele + rulege + rulel + ruleg + rulePegText +) + +var rul3s = [...]string{ + "Unknown", + "e", + "condition", + "tag", + "value", + "number", + "digit", + "time", + "date", + "year", + "month", + "day", + "and", + "equal", + "contains", + "le", + "ge", + "l", + "g", + "PegText", +} + +type token32 struct { + pegRule + begin, end uint32 +} + +func (t *token32) String() string { + return fmt.Sprintf("\x1B[34m%v\x1B[m %v %v", rul3s[t.pegRule], t.begin, t.end) +} + +type node32 struct { + token32 + up, next *node32 +} + +func (node *node32) print(pretty bool, buffer string) { + var print func(node *node32, depth int) + print = func(node *node32, depth int) { + for node != nil { + for c := 0; c < depth; c++ { + fmt.Printf(" ") + } + rule := rul3s[node.pegRule] + quote := strconv.Quote(string(([]rune(buffer)[node.begin:node.end]))) + if !pretty { + fmt.Printf("%v %v\n", rule, quote) + } else { + fmt.Printf("\x1B[34m%v\x1B[m %v\n", rule, quote) + } + if node.up != nil { + print(node.up, depth+1) + } + node = node.next + } + } + print(node, 0) +} + +func (node *node32) Print(buffer string) { + node.print(false, buffer) +} + +func (node *node32) PrettyPrint(buffer string) { + node.print(true, buffer) +} + +type tokens32 struct { + tree []token32 +} + +func (t *tokens32) Trim(length uint32) { + t.tree = t.tree[:length] +} + +func (t *tokens32) Print() { + for _, token := range t.tree { + fmt.Println(token.String()) + } +} + +func (t *tokens32) AST() *node32 { + type element struct { + node *node32 + down *element + } + tokens := t.Tokens() + var stack *element + for _, token := range tokens { + if token.begin == token.end { + continue + } + node := &node32{token32: token} + for stack != nil && stack.node.begin >= token.begin && stack.node.end <= token.end { + stack.node.next = node.up + node.up = stack.node + stack = stack.down + } + stack = &element{node: node, down: stack} + } + if stack != nil { + return stack.node + } + return nil +} + +func (t *tokens32) PrintSyntaxTree(buffer string) { + t.AST().Print(buffer) +} + +func (t *tokens32) PrettyPrintSyntaxTree(buffer string) { + t.AST().PrettyPrint(buffer) +} + +func (t *tokens32) Add(rule pegRule, begin, end, index uint32) { + if tree := t.tree; int(index) >= len(tree) { + expanded := make([]token32, 2*len(tree)) + copy(expanded, tree) + t.tree = expanded + } + t.tree[index] = token32{ + pegRule: rule, + begin: begin, + end: end, + } +} + +func (t *tokens32) Tokens() []token32 { + return t.tree +} + +type QueryParser struct { + Buffer string + buffer []rune + rules [20]func() bool + parse func(rule ...int) error + reset func() + Pretty bool + tokens32 +} + +func (p *QueryParser) Parse(rule ...int) error { + return p.parse(rule...) +} + +func (p *QueryParser) Reset() { + p.reset() +} + +type textPosition struct { + line, symbol int +} + +type textPositionMap map[int]textPosition + +func translatePositions(buffer []rune, positions []int) textPositionMap { + length, translations, j, line, symbol := len(positions), make(textPositionMap, len(positions)), 0, 1, 0 + sort.Ints(positions) + +search: + for i, c := range buffer { + if c == '\n' { + line, symbol = line+1, 0 + } else { + symbol++ + } + if i == positions[j] { + translations[positions[j]] = textPosition{line, symbol} + for j++; j < length; j++ { + if i != positions[j] { + continue search + } + } + break search + } + } + + return translations +} + +type parseError struct { + p *QueryParser + max token32 +} + +func (e *parseError) Error() string { + tokens, error := []token32{e.max}, "\n" + positions, p := make([]int, 2*len(tokens)), 0 + for _, token := range tokens { + positions[p], p = int(token.begin), p+1 + positions[p], p = int(token.end), p+1 + } + translations := translatePositions(e.p.buffer, positions) + format := "parse error near %v (line %v symbol %v - line %v symbol %v):\n%v\n" + if e.p.Pretty { + format = "parse error near \x1B[34m%v\x1B[m (line %v symbol %v - line %v symbol %v):\n%v\n" + } + for _, token := range tokens { + begin, end := int(token.begin), int(token.end) + error += fmt.Sprintf(format, + rul3s[token.pegRule], + translations[begin].line, translations[begin].symbol, + translations[end].line, translations[end].symbol, + strconv.Quote(string(e.p.buffer[begin:end]))) + } + + return error +} + +func (p *QueryParser) PrintSyntaxTree() { + if p.Pretty { + p.tokens32.PrettyPrintSyntaxTree(p.Buffer) + } else { + p.tokens32.PrintSyntaxTree(p.Buffer) + } +} + +func (p *QueryParser) Init() { + var ( + max token32 + position, tokenIndex uint32 + buffer []rune + ) + p.reset = func() { + max = token32{} + position, tokenIndex = 0, 0 + + p.buffer = []rune(p.Buffer) + if len(p.buffer) == 0 || p.buffer[len(p.buffer)-1] != endSymbol { + p.buffer = append(p.buffer, endSymbol) + } + buffer = p.buffer + } + p.reset() + + _rules := p.rules + tree := tokens32{tree: make([]token32, math.MaxInt16)} + p.parse = func(rule ...int) error { + r := 1 + if len(rule) > 0 { + r = rule[0] + } + matches := p.rules[r]() + p.tokens32 = tree + if matches { + p.Trim(tokenIndex) + return nil + } + return &parseError{p, max} + } + + add := func(rule pegRule, begin uint32) { + tree.Add(rule, begin, position, tokenIndex) + tokenIndex++ + if begin != position && position > max.end { + max = token32{rule, begin, position} + } + } + + matchDot := func() bool { + if buffer[position] != endSymbol { + position++ + return true + } + return false + } + + /*matchChar := func(c byte) bool { + if buffer[position] == c { + position++ + return true + } + return false + }*/ + + /*matchRange := func(lower byte, upper byte) bool { + if c := buffer[position]; c >= lower && c <= upper { + position++ + return true + } + return false + }*/ + + _rules = [...]func() bool{ + nil, + /* 0 e <- <('"' condition (' '+ and ' '+ condition)* '"' !.)> */ + func() bool { + position0, tokenIndex0 := position, tokenIndex + { + position1 := position + if buffer[position] != rune('"') { + goto l0 + } + position++ + if !_rules[rulecondition]() { + goto l0 + } + l2: + { + position3, tokenIndex3 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l3 + } + position++ + l4: + { + position5, tokenIndex5 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l5 + } + position++ + goto l4 + l5: + position, tokenIndex = position5, tokenIndex5 + } + { + position6 := position + { + position7, tokenIndex7 := position, tokenIndex + if buffer[position] != rune('a') { + goto l8 + } + position++ + goto l7 + l8: + position, tokenIndex = position7, tokenIndex7 + if buffer[position] != rune('A') { + goto l3 + } + position++ + } + l7: + { + position9, tokenIndex9 := position, tokenIndex + if buffer[position] != rune('n') { + goto l10 + } + position++ + goto l9 + l10: + position, tokenIndex = position9, tokenIndex9 + if buffer[position] != rune('N') { + goto l3 + } + position++ + } + l9: + { + position11, tokenIndex11 := position, tokenIndex + if buffer[position] != rune('d') { + goto l12 + } + position++ + goto l11 + l12: + position, tokenIndex = position11, tokenIndex11 + if buffer[position] != rune('D') { + goto l3 + } + position++ + } + l11: + add(ruleand, position6) + } + if buffer[position] != rune(' ') { + goto l3 + } + position++ + l13: + { + position14, tokenIndex14 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l14 + } + position++ + goto l13 + l14: + position, tokenIndex = position14, tokenIndex14 + } + if !_rules[rulecondition]() { + goto l3 + } + goto l2 + l3: + position, tokenIndex = position3, tokenIndex3 + } + if buffer[position] != rune('"') { + goto l0 + } + position++ + { + position15, tokenIndex15 := position, tokenIndex + if !matchDot() { + goto l15 + } + goto l0 + l15: + position, tokenIndex = position15, tokenIndex15 + } + add(rulee, position1) + } + return true + l0: + position, tokenIndex = position0, tokenIndex0 + return false + }, + /* 1 condition <- <(tag ' '* ((le ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number))) / (ge ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number))) / ((&('=') (equal ' '* (number / time / date / value))) | (&('>') (g ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number)))) | (&('<') (l ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number)))) | (&('C' | 'c') (contains ' '* value)))))> */ + func() bool { + position16, tokenIndex16 := position, tokenIndex + { + position17 := position + { + position18 := position + { + position19 := position + { + position22, tokenIndex22 := position, tokenIndex + { + switch buffer[position] { + case '<': + if buffer[position] != rune('<') { + goto l22 + } + position++ + break + case '>': + if buffer[position] != rune('>') { + goto l22 + } + position++ + break + case '=': + if buffer[position] != rune('=') { + goto l22 + } + position++ + break + case '"': + if buffer[position] != rune('"') { + goto l22 + } + position++ + break + case ')': + if buffer[position] != rune(')') { + goto l22 + } + position++ + break + case '(': + if buffer[position] != rune('(') { + goto l22 + } + position++ + break + case '\\': + if buffer[position] != rune('\\') { + goto l22 + } + position++ + break + case '\r': + if buffer[position] != rune('\r') { + goto l22 + } + position++ + break + case '\n': + if buffer[position] != rune('\n') { + goto l22 + } + position++ + break + case '\t': + if buffer[position] != rune('\t') { + goto l22 + } + position++ + break + default: + if buffer[position] != rune(' ') { + goto l22 + } + position++ + break + } + } + + goto l16 + l22: + position, tokenIndex = position22, tokenIndex22 + } + if !matchDot() { + goto l16 + } + l20: + { + position21, tokenIndex21 := position, tokenIndex + { + position24, tokenIndex24 := position, tokenIndex + { + switch buffer[position] { + case '<': + if buffer[position] != rune('<') { + goto l24 + } + position++ + break + case '>': + if buffer[position] != rune('>') { + goto l24 + } + position++ + break + case '=': + if buffer[position] != rune('=') { + goto l24 + } + position++ + break + case '"': + if buffer[position] != rune('"') { + goto l24 + } + position++ + break + case ')': + if buffer[position] != rune(')') { + goto l24 + } + position++ + break + case '(': + if buffer[position] != rune('(') { + goto l24 + } + position++ + break + case '\\': + if buffer[position] != rune('\\') { + goto l24 + } + position++ + break + case '\r': + if buffer[position] != rune('\r') { + goto l24 + } + position++ + break + case '\n': + if buffer[position] != rune('\n') { + goto l24 + } + position++ + break + case '\t': + if buffer[position] != rune('\t') { + goto l24 + } + position++ + break + default: + if buffer[position] != rune(' ') { + goto l24 + } + position++ + break + } + } + + goto l21 + l24: + position, tokenIndex = position24, tokenIndex24 + } + if !matchDot() { + goto l21 + } + goto l20 + l21: + position, tokenIndex = position21, tokenIndex21 + } + add(rulePegText, position19) + } + add(ruletag, position18) + } + l26: + { + position27, tokenIndex27 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l27 + } + position++ + goto l26 + l27: + position, tokenIndex = position27, tokenIndex27 + } + { + position28, tokenIndex28 := position, tokenIndex + { + position30 := position + if buffer[position] != rune('<') { + goto l29 + } + position++ + if buffer[position] != rune('=') { + goto l29 + } + position++ + add(rulele, position30) + } + l31: + { + position32, tokenIndex32 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l32 + } + position++ + goto l31 + l32: + position, tokenIndex = position32, tokenIndex32 + } + { + switch buffer[position] { + case 'D', 'd': + if !_rules[ruledate]() { + goto l29 + } + break + case 'T', 't': + if !_rules[ruletime]() { + goto l29 + } + break + default: + if !_rules[rulenumber]() { + goto l29 + } + break + } + } + + goto l28 + l29: + position, tokenIndex = position28, tokenIndex28 + { + position35 := position + if buffer[position] != rune('>') { + goto l34 + } + position++ + if buffer[position] != rune('=') { + goto l34 + } + position++ + add(rulege, position35) + } + l36: + { + position37, tokenIndex37 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l37 + } + position++ + goto l36 + l37: + position, tokenIndex = position37, tokenIndex37 + } + { + switch buffer[position] { + case 'D', 'd': + if !_rules[ruledate]() { + goto l34 + } + break + case 'T', 't': + if !_rules[ruletime]() { + goto l34 + } + break + default: + if !_rules[rulenumber]() { + goto l34 + } + break + } + } + + goto l28 + l34: + position, tokenIndex = position28, tokenIndex28 + { + switch buffer[position] { + case '=': + { + position40 := position + if buffer[position] != rune('=') { + goto l16 + } + position++ + add(ruleequal, position40) + } + l41: + { + position42, tokenIndex42 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l42 + } + position++ + goto l41 + l42: + position, tokenIndex = position42, tokenIndex42 + } + { + position43, tokenIndex43 := position, tokenIndex + if !_rules[rulenumber]() { + goto l44 + } + goto l43 + l44: + position, tokenIndex = position43, tokenIndex43 + if !_rules[ruletime]() { + goto l45 + } + goto l43 + l45: + position, tokenIndex = position43, tokenIndex43 + if !_rules[ruledate]() { + goto l46 + } + goto l43 + l46: + position, tokenIndex = position43, tokenIndex43 + if !_rules[rulevalue]() { + goto l16 + } + } + l43: + break + case '>': + { + position47 := position + if buffer[position] != rune('>') { + goto l16 + } + position++ + add(ruleg, position47) + } + l48: + { + position49, tokenIndex49 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l49 + } + position++ + goto l48 + l49: + position, tokenIndex = position49, tokenIndex49 + } + { + switch buffer[position] { + case 'D', 'd': + if !_rules[ruledate]() { + goto l16 + } + break + case 'T', 't': + if !_rules[ruletime]() { + goto l16 + } + break + default: + if !_rules[rulenumber]() { + goto l16 + } + break + } + } + + break + case '<': + { + position51 := position + if buffer[position] != rune('<') { + goto l16 + } + position++ + add(rulel, position51) + } + l52: + { + position53, tokenIndex53 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l53 + } + position++ + goto l52 + l53: + position, tokenIndex = position53, tokenIndex53 + } + { + switch buffer[position] { + case 'D', 'd': + if !_rules[ruledate]() { + goto l16 + } + break + case 'T', 't': + if !_rules[ruletime]() { + goto l16 + } + break + default: + if !_rules[rulenumber]() { + goto l16 + } + break + } + } + + break + default: + { + position55 := position + { + position56, tokenIndex56 := position, tokenIndex + if buffer[position] != rune('c') { + goto l57 + } + position++ + goto l56 + l57: + position, tokenIndex = position56, tokenIndex56 + if buffer[position] != rune('C') { + goto l16 + } + position++ + } + l56: + { + position58, tokenIndex58 := position, tokenIndex + if buffer[position] != rune('o') { + goto l59 + } + position++ + goto l58 + l59: + position, tokenIndex = position58, tokenIndex58 + if buffer[position] != rune('O') { + goto l16 + } + position++ + } + l58: + { + position60, tokenIndex60 := position, tokenIndex + if buffer[position] != rune('n') { + goto l61 + } + position++ + goto l60 + l61: + position, tokenIndex = position60, tokenIndex60 + if buffer[position] != rune('N') { + goto l16 + } + position++ + } + l60: + { + position62, tokenIndex62 := position, tokenIndex + if buffer[position] != rune('t') { + goto l63 + } + position++ + goto l62 + l63: + position, tokenIndex = position62, tokenIndex62 + if buffer[position] != rune('T') { + goto l16 + } + position++ + } + l62: + { + position64, tokenIndex64 := position, tokenIndex + if buffer[position] != rune('a') { + goto l65 + } + position++ + goto l64 + l65: + position, tokenIndex = position64, tokenIndex64 + if buffer[position] != rune('A') { + goto l16 + } + position++ + } + l64: + { + position66, tokenIndex66 := position, tokenIndex + if buffer[position] != rune('i') { + goto l67 + } + position++ + goto l66 + l67: + position, tokenIndex = position66, tokenIndex66 + if buffer[position] != rune('I') { + goto l16 + } + position++ + } + l66: + { + position68, tokenIndex68 := position, tokenIndex + if buffer[position] != rune('n') { + goto l69 + } + position++ + goto l68 + l69: + position, tokenIndex = position68, tokenIndex68 + if buffer[position] != rune('N') { + goto l16 + } + position++ + } + l68: + { + position70, tokenIndex70 := position, tokenIndex + if buffer[position] != rune('s') { + goto l71 + } + position++ + goto l70 + l71: + position, tokenIndex = position70, tokenIndex70 + if buffer[position] != rune('S') { + goto l16 + } + position++ + } + l70: + add(rulecontains, position55) + } + l72: + { + position73, tokenIndex73 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l73 + } + position++ + goto l72 + l73: + position, tokenIndex = position73, tokenIndex73 + } + if !_rules[rulevalue]() { + goto l16 + } + break + } + } + + } + l28: + add(rulecondition, position17) + } + return true + l16: + position, tokenIndex = position16, tokenIndex16 + return false + }, + /* 2 tag <- <<(!((&('<') '<') | (&('>') '>') | (&('=') '=') | (&('"') '"') | (&(')') ')') | (&('(') '(') | (&('\\') '\\') | (&('\r') '\r') | (&('\n') '\n') | (&('\t') '\t') | (&(' ') ' ')) .)+>> */ + nil, + /* 3 value <- <<(!((&('<') '<') | (&('>') '>') | (&('=') '=') | (&('"') '"') | (&(')') ')') | (&('(') '(') | (&('\\') '\\') | (&('\r') '\r') | (&('\n') '\n') | (&('\t') '\t') | (&(' ') ' ')) .)+>> */ + func() bool { + position75, tokenIndex75 := position, tokenIndex + { + position76 := position + { + position77 := position + { + position80, tokenIndex80 := position, tokenIndex + { + switch buffer[position] { + case '<': + if buffer[position] != rune('<') { + goto l80 + } + position++ + break + case '>': + if buffer[position] != rune('>') { + goto l80 + } + position++ + break + case '=': + if buffer[position] != rune('=') { + goto l80 + } + position++ + break + case '"': + if buffer[position] != rune('"') { + goto l80 + } + position++ + break + case ')': + if buffer[position] != rune(')') { + goto l80 + } + position++ + break + case '(': + if buffer[position] != rune('(') { + goto l80 + } + position++ + break + case '\\': + if buffer[position] != rune('\\') { + goto l80 + } + position++ + break + case '\r': + if buffer[position] != rune('\r') { + goto l80 + } + position++ + break + case '\n': + if buffer[position] != rune('\n') { + goto l80 + } + position++ + break + case '\t': + if buffer[position] != rune('\t') { + goto l80 + } + position++ + break + default: + if buffer[position] != rune(' ') { + goto l80 + } + position++ + break + } + } + + goto l75 + l80: + position, tokenIndex = position80, tokenIndex80 + } + if !matchDot() { + goto l75 + } + l78: + { + position79, tokenIndex79 := position, tokenIndex + { + position82, tokenIndex82 := position, tokenIndex + { + switch buffer[position] { + case '<': + if buffer[position] != rune('<') { + goto l82 + } + position++ + break + case '>': + if buffer[position] != rune('>') { + goto l82 + } + position++ + break + case '=': + if buffer[position] != rune('=') { + goto l82 + } + position++ + break + case '"': + if buffer[position] != rune('"') { + goto l82 + } + position++ + break + case ')': + if buffer[position] != rune(')') { + goto l82 + } + position++ + break + case '(': + if buffer[position] != rune('(') { + goto l82 + } + position++ + break + case '\\': + if buffer[position] != rune('\\') { + goto l82 + } + position++ + break + case '\r': + if buffer[position] != rune('\r') { + goto l82 + } + position++ + break + case '\n': + if buffer[position] != rune('\n') { + goto l82 + } + position++ + break + case '\t': + if buffer[position] != rune('\t') { + goto l82 + } + position++ + break + default: + if buffer[position] != rune(' ') { + goto l82 + } + position++ + break + } + } + + goto l79 + l82: + position, tokenIndex = position82, tokenIndex82 + } + if !matchDot() { + goto l79 + } + goto l78 + l79: + position, tokenIndex = position79, tokenIndex79 + } + add(rulePegText, position77) + } + add(rulevalue, position76) + } + return true + l75: + position, tokenIndex = position75, tokenIndex75 + return false + }, + /* 4 number <- <<('0' / ([1-9] digit* ('.' digit*)?))>> */ + func() bool { + position84, tokenIndex84 := position, tokenIndex + { + position85 := position + { + position86 := position + { + position87, tokenIndex87 := position, tokenIndex + if buffer[position] != rune('0') { + goto l88 + } + position++ + goto l87 + l88: + position, tokenIndex = position87, tokenIndex87 + if c := buffer[position]; c < rune('1') || c > rune('9') { + goto l84 + } + position++ + l89: + { + position90, tokenIndex90 := position, tokenIndex + if !_rules[ruledigit]() { + goto l90 + } + goto l89 + l90: + position, tokenIndex = position90, tokenIndex90 + } + { + position91, tokenIndex91 := position, tokenIndex + if buffer[position] != rune('.') { + goto l91 + } + position++ + l93: + { + position94, tokenIndex94 := position, tokenIndex + if !_rules[ruledigit]() { + goto l94 + } + goto l93 + l94: + position, tokenIndex = position94, tokenIndex94 + } + goto l92 + l91: + position, tokenIndex = position91, tokenIndex91 + } + l92: + } + l87: + add(rulePegText, position86) + } + add(rulenumber, position85) + } + return true + l84: + position, tokenIndex = position84, tokenIndex84 + return false + }, + /* 5 digit <- <[0-9]> */ + func() bool { + position95, tokenIndex95 := position, tokenIndex + { + position96 := position + if c := buffer[position]; c < rune('0') || c > rune('9') { + goto l95 + } + position++ + add(ruledigit, position96) + } + return true + l95: + position, tokenIndex = position95, tokenIndex95 + return false + }, + /* 6 time <- <(('t' / 'T') ('i' / 'I') ('m' / 'M') ('e' / 'E') ' ' <(year '-' month '-' day 'T' digit digit ':' digit digit ':' digit digit ((('-' / '+') digit digit ':' digit digit) / 'Z'))>)> */ + func() bool { + position97, tokenIndex97 := position, tokenIndex + { + position98 := position + { + position99, tokenIndex99 := position, tokenIndex + if buffer[position] != rune('t') { + goto l100 + } + position++ + goto l99 + l100: + position, tokenIndex = position99, tokenIndex99 + if buffer[position] != rune('T') { + goto l97 + } + position++ + } + l99: + { + position101, tokenIndex101 := position, tokenIndex + if buffer[position] != rune('i') { + goto l102 + } + position++ + goto l101 + l102: + position, tokenIndex = position101, tokenIndex101 + if buffer[position] != rune('I') { + goto l97 + } + position++ + } + l101: + { + position103, tokenIndex103 := position, tokenIndex + if buffer[position] != rune('m') { + goto l104 + } + position++ + goto l103 + l104: + position, tokenIndex = position103, tokenIndex103 + if buffer[position] != rune('M') { + goto l97 + } + position++ + } + l103: + { + position105, tokenIndex105 := position, tokenIndex + if buffer[position] != rune('e') { + goto l106 + } + position++ + goto l105 + l106: + position, tokenIndex = position105, tokenIndex105 + if buffer[position] != rune('E') { + goto l97 + } + position++ + } + l105: + if buffer[position] != rune(' ') { + goto l97 + } + position++ + { + position107 := position + if !_rules[ruleyear]() { + goto l97 + } + if buffer[position] != rune('-') { + goto l97 + } + position++ + if !_rules[rulemonth]() { + goto l97 + } + if buffer[position] != rune('-') { + goto l97 + } + position++ + if !_rules[ruleday]() { + goto l97 + } + if buffer[position] != rune('T') { + goto l97 + } + position++ + if !_rules[ruledigit]() { + goto l97 + } + if !_rules[ruledigit]() { + goto l97 + } + if buffer[position] != rune(':') { + goto l97 + } + position++ + if !_rules[ruledigit]() { + goto l97 + } + if !_rules[ruledigit]() { + goto l97 + } + if buffer[position] != rune(':') { + goto l97 + } + position++ + if !_rules[ruledigit]() { + goto l97 + } + if !_rules[ruledigit]() { + goto l97 + } + { + position108, tokenIndex108 := position, tokenIndex + { + position110, tokenIndex110 := position, tokenIndex + if buffer[position] != rune('-') { + goto l111 + } + position++ + goto l110 + l111: + position, tokenIndex = position110, tokenIndex110 + if buffer[position] != rune('+') { + goto l109 + } + position++ + } + l110: + if !_rules[ruledigit]() { + goto l109 + } + if !_rules[ruledigit]() { + goto l109 + } + if buffer[position] != rune(':') { + goto l109 + } + position++ + if !_rules[ruledigit]() { + goto l109 + } + if !_rules[ruledigit]() { + goto l109 + } + goto l108 + l109: + position, tokenIndex = position108, tokenIndex108 + if buffer[position] != rune('Z') { + goto l97 + } + position++ + } + l108: + add(rulePegText, position107) + } + add(ruletime, position98) + } + return true + l97: + position, tokenIndex = position97, tokenIndex97 + return false + }, + /* 7 date <- <(('d' / 'D') ('a' / 'A') ('t' / 'T') ('e' / 'E') ' ' <(year '-' month '-' day)>)> */ + func() bool { + position112, tokenIndex112 := position, tokenIndex + { + position113 := position + { + position114, tokenIndex114 := position, tokenIndex + if buffer[position] != rune('d') { + goto l115 + } + position++ + goto l114 + l115: + position, tokenIndex = position114, tokenIndex114 + if buffer[position] != rune('D') { + goto l112 + } + position++ + } + l114: + { + position116, tokenIndex116 := position, tokenIndex + if buffer[position] != rune('a') { + goto l117 + } + position++ + goto l116 + l117: + position, tokenIndex = position116, tokenIndex116 + if buffer[position] != rune('A') { + goto l112 + } + position++ + } + l116: + { + position118, tokenIndex118 := position, tokenIndex + if buffer[position] != rune('t') { + goto l119 + } + position++ + goto l118 + l119: + position, tokenIndex = position118, tokenIndex118 + if buffer[position] != rune('T') { + goto l112 + } + position++ + } + l118: + { + position120, tokenIndex120 := position, tokenIndex + if buffer[position] != rune('e') { + goto l121 + } + position++ + goto l120 + l121: + position, tokenIndex = position120, tokenIndex120 + if buffer[position] != rune('E') { + goto l112 + } + position++ + } + l120: + if buffer[position] != rune(' ') { + goto l112 + } + position++ + { + position122 := position + if !_rules[ruleyear]() { + goto l112 + } + if buffer[position] != rune('-') { + goto l112 + } + position++ + if !_rules[rulemonth]() { + goto l112 + } + if buffer[position] != rune('-') { + goto l112 + } + position++ + if !_rules[ruleday]() { + goto l112 + } + add(rulePegText, position122) + } + add(ruledate, position113) + } + return true + l112: + position, tokenIndex = position112, tokenIndex112 + return false + }, + /* 8 year <- <(('1' / '2') digit digit digit)> */ + func() bool { + position123, tokenIndex123 := position, tokenIndex + { + position124 := position + { + position125, tokenIndex125 := position, tokenIndex + if buffer[position] != rune('1') { + goto l126 + } + position++ + goto l125 + l126: + position, tokenIndex = position125, tokenIndex125 + if buffer[position] != rune('2') { + goto l123 + } + position++ + } + l125: + if !_rules[ruledigit]() { + goto l123 + } + if !_rules[ruledigit]() { + goto l123 + } + if !_rules[ruledigit]() { + goto l123 + } + add(ruleyear, position124) + } + return true + l123: + position, tokenIndex = position123, tokenIndex123 + return false + }, + /* 9 month <- <(('0' / '1') digit)> */ + func() bool { + position127, tokenIndex127 := position, tokenIndex + { + position128 := position + { + position129, tokenIndex129 := position, tokenIndex + if buffer[position] != rune('0') { + goto l130 + } + position++ + goto l129 + l130: + position, tokenIndex = position129, tokenIndex129 + if buffer[position] != rune('1') { + goto l127 + } + position++ + } + l129: + if !_rules[ruledigit]() { + goto l127 + } + add(rulemonth, position128) + } + return true + l127: + position, tokenIndex = position127, tokenIndex127 + return false + }, + /* 10 day <- <(((&('3') '3') | (&('2') '2') | (&('1') '1') | (&('0') '0')) digit)> */ + func() bool { + position131, tokenIndex131 := position, tokenIndex + { + position132 := position + { + switch buffer[position] { + case '3': + if buffer[position] != rune('3') { + goto l131 + } + position++ + break + case '2': + if buffer[position] != rune('2') { + goto l131 + } + position++ + break + case '1': + if buffer[position] != rune('1') { + goto l131 + } + position++ + break + default: + if buffer[position] != rune('0') { + goto l131 + } + position++ + break + } + } + + if !_rules[ruledigit]() { + goto l131 + } + add(ruleday, position132) + } + return true + l131: + position, tokenIndex = position131, tokenIndex131 + return false + }, + /* 11 and <- <(('a' / 'A') ('n' / 'N') ('d' / 'D'))> */ + nil, + /* 12 equal <- <'='> */ + nil, + /* 13 contains <- <(('c' / 'C') ('o' / 'O') ('n' / 'N') ('t' / 'T') ('a' / 'A') ('i' / 'I') ('n' / 'N') ('s' / 'S'))> */ + nil, + /* 14 le <- <('<' '=')> */ + nil, + /* 15 ge <- <('>' '=')> */ + nil, + /* 16 l <- <'<'> */ + nil, + /* 17 g <- <'>'> */ + nil, + nil, + } + p.rules = _rules +} diff --git a/pubsub/query/query_test.go b/pubsub/query/query_test.go new file mode 100644 index 000000000..75d02ee49 --- /dev/null +++ b/pubsub/query/query_test.go @@ -0,0 +1,64 @@ +package query_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tendermint/tmlibs/pubsub/query" +) + +func TestMatches(t *testing.T) { + const shortForm = "2006-Jan-02" + txDate, err := time.Parse(shortForm, "2017-Jan-01") + require.NoError(t, err) + txTime, err := time.Parse(time.RFC3339, "2018-05-03T14:45:00Z") + require.NoError(t, err) + + testCases := []struct { + s string + tags map[string]interface{} + err bool + matches bool + }{ + {"tm.events.type=NewBlock", map[string]interface{}{"tm.events.type": "NewBlock"}, false, true}, + + {"tx.gas > 7", map[string]interface{}{"tx.gas": 8}, false, true}, + {"tx.gas > 7 AND tx.gas < 9", map[string]interface{}{"tx.gas": 8}, false, true}, + {"body.weight >= 3.5", map[string]interface{}{"body.weight": 3.5}, false, true}, + {"account.balance < 1000.0", map[string]interface{}{"account.balance": 900}, false, true}, + {"apples.kg <= 4", map[string]interface{}{"apples.kg": 4.0}, false, true}, + {"body.weight >= 4.5", map[string]interface{}{"body.weight": float32(4.5)}, false, true}, + {"oranges.kg < 4 AND watermellons.kg > 10", map[string]interface{}{"oranges.kg": 3, "watermellons.kg": 12}, false, true}, + {"peaches.kg < 4", map[string]interface{}{"peaches.kg": 5}, false, false}, + + {"tx.date > DATE 2017-01-01", map[string]interface{}{"tx.date": time.Now()}, false, true}, + {"tx.date = DATE 2017-01-01", map[string]interface{}{"tx.date": txDate}, false, true}, + {"tx.date = DATE 2018-01-01", map[string]interface{}{"tx.date": txDate}, false, false}, + + {"tx.time >= TIME 2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": time.Now()}, false, true}, + {"tx.time = TIME 2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": txTime}, false, false}, + + {"abci.owner.name CONTAINS Igor", map[string]interface{}{"abci.owner.name": "Igor,Ivan"}, false, true}, + {"abci.owner.name CONTAINS Igor", map[string]interface{}{"abci.owner.name": "Pavel,Ivan"}, false, false}, + } + + for _, tc := range testCases { + query, err := query.New(tc.s) + if !tc.err { + require.Nil(t, err) + } + + if tc.matches { + assert.True(t, query.Matches(tc.tags), "Query '%s' should match %v", tc.s, tc.tags) + } else { + assert.False(t, query.Matches(tc.tags), "Query '%s' should not match %v", tc.s, tc.tags) + } + } +} + +func TestMustParse(t *testing.T) { + assert.Panics(t, func() { query.MustParse("=") }) + assert.NotPanics(t, func() { query.MustParse("tm.events.type=NewBlock") }) +}