From a99b8a6210071060600a43b04daed800d8f4d125 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 20 Jun 2017 17:25:42 +0400 Subject: [PATCH 01/10] new events package query parser use parser compiler to generate query parser I used https://github.com/pointlander/peg which has a nice API and seems to be the most popular Golang compiler parser using PEG on Github. More about PEG: - https://en.wikipedia.org/wiki/Parsing_expression_grammar - https://github.com/PhilippeSigaud/Pegged/wiki/PEG-Basics - https://github.com/PhilippeSigaud/Pegged/wiki/Grammar-Examples rename implement query match function match function uncomment test lines add more test cases for query#Matches fix int case rename events to pubsub add comment about cache assertReceive helper to not block on receive in tests fix bug with multiple conditions uncomment benchmark first results: ``` Benchmark10Clients-2 1000 1305493 ns/op 3957519 B/op 355 allocs/op Benchmark100Clients-2 100 12278304 ns/op 39571751 B/op 3505 allocs/op Benchmark1000Clients-2 10 124120909 ns/op 395714004 B/op 35005 allocs/op ``` 124ms to publish message to 1000 clients. A lot. use AST from query.peg.go separate pubsub and query packages by using Query interface in pubsub wrote docs and refactor code updates from Frey's review refactor type assertion to use type switch cleanup during shutdown subscriber should create output channel, not the server overflow strategies, server buffer capacity context as the first argument for Publish log error introduce Option type update NewServer comment move helpers into pubsub_test increase assertReceive timeout add query.MustParse add more false tests for parser add more false tests for query.Matches parse numbers as int64 / float64 try our best to convert from other types add number to panic output add more comments save commit introduce client argument as first argument to Subscribe > Why we do not specify buffer size on the output channel in Subscribe? The choice of buffer size of N here depends on knowing the number of messages server will receive and the number of messages downstream subscribers will consume. This is fragile: if we publish an additional message, or if one of the downstream subscribers reads any fewer messages, we will again have blocked goroutines. save commit remove reference counting fix test test client resubscribe test UnsubscribeAll client options [pubsub/query] fuzzy testing do not print msg as it creates data race! --- .gitignore | 2 + pubsub/example_test.go | 24 + pubsub/pubsub.go | 314 ++++++ pubsub/pubsub_test.go | 227 +++++ pubsub/query/Makefile | 11 + pubsub/query/empty.go | 14 + pubsub/query/empty_test.go | 16 + pubsub/query/fuzz_test/main.go | 30 + pubsub/query/parser_test.go | 81 ++ pubsub/query/query.go | 258 +++++ pubsub/query/query.peg | 33 + pubsub/query/query.peg.go | 1668 ++++++++++++++++++++++++++++++++ pubsub/query/query_test.go | 64 ++ 13 files changed, 2742 insertions(+) create mode 100644 pubsub/example_test.go create mode 100644 pubsub/pubsub.go create mode 100644 pubsub/pubsub_test.go create mode 100644 pubsub/query/Makefile create mode 100644 pubsub/query/empty.go create mode 100644 pubsub/query/empty_test.go create mode 100644 pubsub/query/fuzz_test/main.go create mode 100644 pubsub/query/parser_test.go create mode 100644 pubsub/query/query.go create mode 100644 pubsub/query/query.peg create mode 100644 pubsub/query/query.peg.go create mode 100644 pubsub/query/query_test.go diff --git a/.gitignore b/.gitignore index 62f28681c..34d0bf1f9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ vendor .glide + +pubsub/query/fuzz_test/output diff --git a/pubsub/example_test.go b/pubsub/example_test.go new file mode 100644 index 000000000..d64b96eab --- /dev/null +++ b/pubsub/example_test.go @@ -0,0 +1,24 @@ +package pubsub_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/tendermint/tmlibs/log" + "github.com/tendermint/tmlibs/pubsub" + "github.com/tendermint/tmlibs/pubsub/query" +) + +func TestExample(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ch := make(chan interface{}, 1) + s.Subscribe("example-client", query.MustParse("abci.account.name=John"), ch) + err := s.PublishWithTags("Tombstone", map[string]interface{}{"abci.account.name": "John"}) + require.NoError(t, err) + assertReceive(t, "Tombstone", ch) +} diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go new file mode 100644 index 000000000..264848161 --- /dev/null +++ b/pubsub/pubsub.go @@ -0,0 +1,314 @@ +// Package pubsub implements a pub-sub model with a single publisher (Server) +// and multiple subscribers (clients). +// +// Though you can have multiple publishers by sharing a pointer to a server or +// by giving the same channel to each publisher and publishing messages from +// that channel (fan-in). +// +// Clients subscribe for messages, which could be of any type, using a query. +// When some message is published, we match it with all queries. If there is a +// match, this message will be pushed to all clients, subscribed to that query. +// See query subpackage for our implementation. +// +// Overflow strategies (incoming publish requests): +// +// 1) drop - drops publish requests when there are too many of them +// 2) wait - blocks until the server is ready to accept more publish requests (default) +// +// Subscribe/Unsubscribe calls are always blocking. +// +// Overflow strategies (outgoing messages): +// +// 1) skip - do not send a message if the client is busy or slow (default) +// 2) wait - wait until the client is ready to accept new messages +// +package pubsub + +import ( + "errors" + + cmn "github.com/tendermint/tmlibs/common" + "github.com/tendermint/tmlibs/log" +) + +type operation int + +const ( + sub operation = iota + pub + unsub + shutdown +) + +type overflowStrategy int + +const ( + drop overflowStrategy = iota + wait +) + +var ( + ErrorOverflow = errors.New("Server overflowed") +) + +type cmd struct { + op operation + query Query + ch chan<- interface{} + clientID string + msg interface{} + tags map[string]interface{} +} + +// Query defines an interface for a query to be used for subscribing. +type Query interface { + Matches(tags map[string]interface{}) bool +} + +// Server allows clients to subscribe/unsubscribe for messages, pubsling +// messages with or without tags, and manages internal state. +type Server struct { + cmn.BaseService + + cmds chan cmd + + overflowStrategy overflowStrategy + slowClientStrategy overflowStrategy +} + +// Option sets a parameter for the server. +type Option func(*Server) + +// NewServer returns a new server. See the commentary on the Option functions +// for a detailed description of how to configure buffering and overflow +// behavior. If no options are provided, the resulting server's queue is +// unbuffered and it blocks when overflowed. +func NewServer(options ...Option) *Server { + s := &Server{overflowStrategy: wait, slowClientStrategy: drop} + s.BaseService = *cmn.NewBaseService(nil, "PubSub", s) + + for _, option := range options { + option(s) + } + + if s.cmds == nil { // if BufferCapacity was not set, create unbuffered channel + s.cmds = make(chan cmd) + } + + return s +} + +// BufferCapacity allows you to specify capacity for the internal server's +// queue. Since the server, given Y subscribers, could only process X messages, +// this option could be used to survive spikes (e.g. high amount of +// transactions during peak hours). +func BufferCapacity(cap int) Option { + return func(s *Server) { + if cap > 0 { + s.cmds = make(chan cmd, cap) + } + } +} + +// OverflowStrategyDrop will tell the server to drop messages when it can't +// process more messages. +func OverflowStrategyDrop() Option { + return func(s *Server) { + s.overflowStrategy = drop + } +} + +// OverflowStrategyWait will tell the server to block and wait for some time +// for server to process other messages. Default strategy. +func OverflowStrategyWait() func(*Server) { + return func(s *Server) { + s.overflowStrategy = wait + } +} + +// WaitSlowClients will tell the server to block and wait until subscriber +// reads a messages even if it is fast enough to process them. +func WaitSlowClients() func(*Server) { + return func(s *Server) { + s.slowClientStrategy = wait + } +} + +// SkipSlowClients will tell the server to skip subscriber if it is busy +// processing previous message(s). Default strategy. +func SkipSlowClients() func(*Server) { + return func(s *Server) { + s.slowClientStrategy = drop + } +} + +// Subscribe returns a channel on which messages matching the given query can +// be received. If the subscription already exists old channel will be closed +// and new one returned. +func (s *Server) Subscribe(clientID string, query Query, out chan<- interface{}) { + s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out} +} + +// Unsubscribe unsubscribes the given client from the query. +func (s *Server) Unsubscribe(clientID string, query Query) { + s.cmds <- cmd{op: unsub, clientID: clientID, query: query} +} + +// Unsubscribe unsubscribes the given channel. +func (s *Server) UnsubscribeAll(clientID string) { + s.cmds <- cmd{op: unsub, clientID: clientID} +} + +// Publish publishes the given message. +func (s *Server) Publish(msg interface{}) error { + return s.PublishWithTags(msg, make(map[string]interface{})) +} + +// PublishWithTags publishes the given message with a set of tags. This set of +// tags will be matched with client queries. If there is a match, the message +// will be sent to a client. +func (s *Server) PublishWithTags(msg interface{}, tags map[string]interface{}) error { + pubCmd := cmd{op: pub, msg: msg, tags: tags} + switch s.overflowStrategy { + case drop: + select { + case s.cmds <- pubCmd: + default: + s.Logger.Error("Server overflowed, dropping message...", "msg", msg) + return ErrorOverflow + } + case wait: + s.cmds <- pubCmd + } + return nil +} + +// OnStop implements Service.OnStop by shutting down the server. +func (s *Server) OnStop() { + s.cmds <- cmd{op: shutdown} +} + +// NOTE: not goroutine safe +type state struct { + // query -> client -> ch + queries map[Query]map[string]chan<- interface{} + // client -> query -> struct{} + clients map[string]map[Query]struct{} +} + +// OnStart implements Service.OnStart by creating a main loop. +func (s *Server) OnStart() error { + go s.loop(state{ + queries: make(map[Query]map[string]chan<- interface{}), + clients: make(map[string]map[Query]struct{}), + }) + return nil +} + +func (s *Server) loop(state state) { +loop: + for cmd := range s.cmds { + switch cmd.op { + case unsub: + if cmd.query != nil { + state.remove(cmd.clientID, cmd.query) + } else { + state.removeAll(cmd.clientID) + } + case shutdown: + state.reset() + break loop + case sub: + state.add(cmd.clientID, cmd.query, cmd.ch) + case pub: + state.send(cmd.msg, cmd.tags, s.slowClientStrategy, s.Logger) + } + } +} + +func (state *state) add(clientID string, q Query, ch chan<- interface{}) { + // add query if needed + if clientToChannelMap, ok := state.queries[q]; !ok { + state.queries[q] = make(map[string]chan<- interface{}) + } else { + // check if already subscribed + if oldCh, ok := clientToChannelMap[clientID]; ok { + close(oldCh) + } + } + state.queries[q][clientID] = ch + + // add client if needed + if _, ok := state.clients[clientID]; !ok { + state.clients[clientID] = make(map[Query]struct{}) + } + state.clients[clientID][q] = struct{}{} + + // create subscription + clientToChannelMap := state.queries[q] + clientToChannelMap[clientID] = ch +} + +func (state *state) remove(clientID string, q Query) { + clientToChannelMap, ok := state.queries[q] + if !ok { + return + } + + ch, ok := clientToChannelMap[clientID] + if ok { + close(ch) + + delete(state.clients[clientID], q) + + // if it not subscribed to anything else, remove the client + if len(state.clients[clientID]) == 0 { + delete(state.clients, clientID) + } + + delete(state.queries[q], clientID) + } +} + +func (state *state) removeAll(clientID string) { + queryMap, ok := state.clients[clientID] + if !ok { + return + } + + for q, _ := range queryMap { + ch := state.queries[q][clientID] + close(ch) + + delete(state.queries[q], clientID) + } + + delete(state.clients, clientID) +} + +func (state *state) reset() { + state.queries = make(map[Query]map[string]chan<- interface{}) + state.clients = make(map[string]map[Query]struct{}) +} + +func (state *state) send(msg interface{}, tags map[string]interface{}, slowClientStrategy overflowStrategy, logger log.Logger) { + for q, clientToChannelMap := range state.queries { + // NOTE we can use LRU cache to speed up common cases like query = " + // tm.events.type=NewBlock" and tags = {"tm.events.type": "NewBlock"} + if q.Matches(tags) { + for clientID, ch := range clientToChannelMap { + logger.Info("Sending message to client", "msg", msg, "client", clientID) + switch slowClientStrategy { + case drop: + select { + case ch <- msg: + default: + logger.Error("Client is busy, skipping...", "clientID", clientID) + } + case wait: + ch <- msg + } + } + } + } +} diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go new file mode 100644 index 000000000..570f76a82 --- /dev/null +++ b/pubsub/pubsub_test.go @@ -0,0 +1,227 @@ +package pubsub_test + +import ( + "fmt" + "runtime/debug" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/tendermint/tmlibs/log" + "github.com/tendermint/tmlibs/pubsub" + "github.com/tendermint/tmlibs/pubsub/query" +) + +const ( + clientID = "test-client" +) + +func TestSubscribe(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ch := make(chan interface{}, 1) + s.Subscribe(clientID, query.Empty{}, ch) + err := s.Publish("Ka-Zar") + require.NoError(t, err) + assertReceive(t, "Ka-Zar", ch) + + err = s.Publish("Quicksilver") + require.NoError(t, err) + assertReceive(t, "Quicksilver", ch) +} + +func TestDifferentClients(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + ch1 := make(chan interface{}, 1) + s.Subscribe("client-1", query.MustParse("tm.events.type=NewBlock"), ch1) + err := s.PublishWithTags("Iceman", map[string]interface{}{"tm.events.type": "NewBlock"}) + require.NoError(t, err) + assertReceive(t, "Iceman", ch1) + + ch2 := make(chan interface{}, 1) + s.Subscribe("client-2", query.MustParse("tm.events.type=NewBlock AND abci.account.name=Igor"), ch2) + err = s.PublishWithTags("Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}) + require.NoError(t, err) + assertReceive(t, "Ultimo", ch1) + assertReceive(t, "Ultimo", ch2) + + ch3 := make(chan interface{}, 1) + s.Subscribe("client-3", query.MustParse("tm.events.type=NewRoundStep AND abci.account.name=Igor AND abci.invoice.number = 10"), ch3) + err = s.PublishWithTags("Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"}) + require.NoError(t, err) + assert.Zero(t, len(ch3)) +} + +func TestClientResubscribes(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + q := query.MustParse("tm.events.type=NewBlock") + + ch1 := make(chan interface{}, 1) + s.Subscribe(clientID, q, ch1) + err := s.PublishWithTags("Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"}) + require.NoError(t, err) + assertReceive(t, "Goblin Queen", ch1) + + ch2 := make(chan interface{}, 1) + s.Subscribe(clientID, q, ch2) + + _, ok := <-ch1 + assert.False(t, ok) + + err = s.PublishWithTags("Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"}) + require.NoError(t, err) + assertReceive(t, "Spider-Man", ch2) +} + +func TestUnsubscribe(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ch := make(chan interface{}) + s.Subscribe(clientID, query.Empty{}, ch) + s.Unsubscribe(clientID, query.Empty{}) + + err := s.Publish("Nick Fury") + require.NoError(t, err) + assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe") + + _, ok := <-ch + assert.False(t, ok) +} + +func TestUnsubscribeAll(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1) + s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlock"), ch1) + s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlockHeader"), ch2) + + s.UnsubscribeAll(clientID) + + err := s.Publish("Nick Fury") + require.NoError(t, err) + assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll") + assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll") + + _, ok := <-ch1 + assert.False(t, ok) + _, ok = <-ch2 + assert.False(t, ok) +} + +func TestOverflowStrategyDrop(t *testing.T) { + s := pubsub.NewServer(pubsub.OverflowStrategyDrop()) + s.SetLogger(log.TestingLogger()) + + err := s.Publish("Veda") + if assert.Error(t, err) { + assert.Equal(t, pubsub.ErrorOverflow, err) + } +} + +func TestOverflowStrategyWait(t *testing.T) { + s := pubsub.NewServer(pubsub.OverflowStrategyWait()) + s.SetLogger(log.TestingLogger()) + + go func() { + time.Sleep(1 * time.Second) + s.Start() + defer s.Stop() + }() + + err := s.Publish("Veda") + assert.NoError(t, err) +} + +func TestBufferCapacity(t *testing.T) { + s := pubsub.NewServer(pubsub.BufferCapacity(2)) + s.SetLogger(log.TestingLogger()) + + err := s.Publish("Nighthawk") + require.NoError(t, err) + err = s.Publish("Sage") + require.NoError(t, err) +} + +func TestWaitSlowClients(t *testing.T) { + s := pubsub.NewServer(pubsub.WaitSlowClients()) + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ch := make(chan interface{}) + s.Subscribe(clientID, query.Empty{}, ch) + err := s.Publish("Wonderwoman") + require.NoError(t, err) + + time.Sleep(1 * time.Second) + + assertReceive(t, "Wonderwoman", ch) +} + +func TestSkipSlowClients(t *testing.T) { + s := pubsub.NewServer(pubsub.SkipSlowClients()) + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ch := make(chan interface{}) + s.Subscribe(clientID, query.Empty{}, ch) + err := s.Publish("Cyclops") + require.NoError(t, err) + assert.Zero(t, len(ch)) +} + +func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) } +func Benchmark100Clients(b *testing.B) { benchmarkNClients(100, b) } +func Benchmark1000Clients(b *testing.B) { benchmarkNClients(1000, b) } + +func benchmarkNClients(n int, b *testing.B) { + s := pubsub.NewServer(pubsub.BufferCapacity(b.N)) + s.Start() + defer s.Stop() + + for i := 0; i < n; i++ { + ch := make(chan interface{}) + s.Subscribe(clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = Ivan AND abci.Invoices.Number = %d", i)), ch) + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.PublishWithTags("Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i}) + } +} + +/////////////////////////////////////////////////////////////////////////////// +/// HELPERS +/////////////////////////////////////////////////////////////////////////////// + +func assertReceive(t *testing.T, expected interface{}, ch <-chan interface{}, msgAndArgs ...interface{}) { + select { + case actual := <-ch: + if actual != nil { + assert.Equal(t, expected, actual, msgAndArgs...) + } + case <-time.After(1 * time.Second): + t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected) + debug.PrintStack() + } +} diff --git a/pubsub/query/Makefile b/pubsub/query/Makefile new file mode 100644 index 000000000..ca3ff5b56 --- /dev/null +++ b/pubsub/query/Makefile @@ -0,0 +1,11 @@ +gen_query_parser: + @go get github.com/pointlander/peg + peg -inline -switch query.peg + +fuzzy_test: + @go get github.com/dvyukov/go-fuzz/go-fuzz + @go get github.com/dvyukov/go-fuzz/go-fuzz-build + go-fuzz-build github.com/tendermint/tmlibs/pubsub/query/fuzz_test + go-fuzz -bin=./fuzz_test-fuzz.zip -workdir=./fuzz_test/output + +.PHONY: gen_query_parser fuzzy_test diff --git a/pubsub/query/empty.go b/pubsub/query/empty.go new file mode 100644 index 000000000..2d60a8923 --- /dev/null +++ b/pubsub/query/empty.go @@ -0,0 +1,14 @@ +package query + +// Empty query matches any set of tags. +type Empty struct { +} + +// Matches always returns true. +func (Empty) Matches(tags map[string]interface{}) bool { + return true +} + +func (Empty) String() string { + return "empty" +} diff --git a/pubsub/query/empty_test.go b/pubsub/query/empty_test.go new file mode 100644 index 000000000..663acb191 --- /dev/null +++ b/pubsub/query/empty_test.go @@ -0,0 +1,16 @@ +package query_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/tendermint/tmlibs/pubsub/query" +) + +func TestEmptyQueryMatchesAnything(t *testing.T) { + q := query.Empty{} + assert.True(t, q.Matches(map[string]interface{}{})) + assert.True(t, q.Matches(map[string]interface{}{"Asher": "Roth"})) + assert.True(t, q.Matches(map[string]interface{}{"Route": 66})) + assert.True(t, q.Matches(map[string]interface{}{"Route": 66, "Billy": "Blue"})) +} diff --git a/pubsub/query/fuzz_test/main.go b/pubsub/query/fuzz_test/main.go new file mode 100644 index 000000000..3b0ef1473 --- /dev/null +++ b/pubsub/query/fuzz_test/main.go @@ -0,0 +1,30 @@ +package fuzz_test + +import ( + "fmt" + + "github.com/tendermint/tmlibs/pubsub/query" +) + +func Fuzz(data []byte) int { + sdata := string(data) + q0, err := query.New(sdata) + if err != nil { + return 0 + } + + sdata1 := q0.String() + q1, err := query.New(sdata1) + if err != nil { + panic(err) + } + + sdata2 := q1.String() + if sdata1 != sdata2 { + fmt.Printf("q0: %q\n", sdata1) + fmt.Printf("q1: %q\n", sdata2) + panic("query changed") + } + + return 1 +} diff --git a/pubsub/query/parser_test.go b/pubsub/query/parser_test.go new file mode 100644 index 000000000..194966664 --- /dev/null +++ b/pubsub/query/parser_test.go @@ -0,0 +1,81 @@ +package query_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/tendermint/tmlibs/pubsub/query" +) + +// TODO: fuzzy testing? +func TestParser(t *testing.T) { + cases := []struct { + query string + valid bool + }{ + {"tm.events.type=NewBlock", true}, + {"tm.events.type = NewBlock", true}, + {"tm.events.type=TIME", true}, + {"tm.events.type=DATE", true}, + {"tm.events.type==", false}, + {">==", false}, + {"tm.events.type NewBlock =", false}, + {"tm.events.type>NewBlock", false}, + {"", false}, + {"=", false}, + {"=NewBlock", false}, + {"tm.events.type=", false}, + + {"tm.events.typeNewBlock", false}, + {"NewBlock", false}, + {"", false}, + + {"tm.events.type=NewBlock AND abci.account.name=Igor", true}, + {"tm.events.type=NewBlock AND", false}, + {"tm.events.type=NewBlock AN", false}, + {"tm.events.type=NewBlock AN tm.events.type=NewBlockHeader", false}, + {"AND tm.events.type=NewBlock ", false}, + + {"abci.account.name CONTAINS Igor", true}, + + {"tx.date > DATE 2013-05-03", true}, + {"tx.date < DATE 2013-05-03", true}, + {"tx.date <= DATE 2013-05-03", true}, + {"tx.date >= DATE 2013-05-03", true}, + {"tx.date >= DAT 2013-05-03", false}, + {"tx.date <= DATE2013-05-03", false}, + {"tx.date <= DATE -05-03", false}, + {"tx.date >= DATE 20130503", false}, + {"tx.date >= DATE 2013+01-03", false}, + // incorrect year, month, day + {"tx.date >= DATE 0013-01-03", false}, + {"tx.date >= DATE 2013-31-03", false}, + {"tx.date >= DATE 2013-01-83", false}, + + {"tx.date > TIME 2013-05-03T14:45:00+07:00", true}, + {"tx.date < TIME 2013-05-03T14:45:00-02:00", true}, + {"tx.date <= TIME 2013-05-03T14:45:00Z", true}, + {"tx.date >= TIME 2013-05-03T14:45:00Z", true}, + {"tx.date >= TIME2013-05-03T14:45:00Z", false}, + {"tx.date = IME 2013-05-03T14:45:00Z", false}, + {"tx.date = TIME 2013-05-:45:00Z", false}, + {"tx.date >= TIME 2013-05-03T14:45:00", false}, + {"tx.date >= TIME 0013-00-00T14:45:00Z", false}, + {"tx.date >= TIME 2013+05=03T14:45:00Z", false}, + + {"account.balance=100", true}, + {"account.balance >= 200", true}, + {"account.balance >= -300", false}, + {"account.balance >>= 400", false}, + {"account.balance=33.22.1", false}, + } + + for _, c := range cases { + _, err := query.New(c.query) + if c.valid { + assert.NoError(t, err, "Query was '%s'", c.query) + } else { + assert.Error(t, err, "Query was '%s'", c.query) + } + } +} diff --git a/pubsub/query/query.go b/pubsub/query/query.go new file mode 100644 index 000000000..f084a3f98 --- /dev/null +++ b/pubsub/query/query.go @@ -0,0 +1,258 @@ +// Package query provides a parser for a custom query format: +// +// abci.invoice.number=22 AND abci.invoice.owner=Ivan +// +// See query.peg for the grammar, which is a https://en.wikipedia.org/wiki/Parsing_expression_grammar. +// More: https://github.com/PhilippeSigaud/Pegged/wiki/PEG-Basics +// +// It has a support for numbers (integer and floating point), dates and times. +package query + +import ( + "fmt" + "reflect" + "strconv" + "strings" + "time" +) + +// Query holds the query string and the query parser. +type Query struct { + str string + parser *QueryParser +} + +// New parses the given string and returns a query or error if the string is +// invalid. +func New(s string) (*Query, error) { + p := &QueryParser{Buffer: fmt.Sprintf(`"%s"`, s)} + p.Init() + if err := p.Parse(); err != nil { + return nil, err + } + return &Query{str: s, parser: p}, nil +} + +// MustParse turns the given string into a query or panics; for tests or others +// cases where you know the string is valid. +func MustParse(s string) *Query { + q, err := New(s) + if err != nil { + panic(fmt.Sprintf("failed to parse %s: %v", s, err)) + } + return q +} + +// String returns the original string. +func (q *Query) String() string { + return q.str +} + +type operator uint8 + +const ( + opLessEqual operator = iota + opGreaterEqual + opLess + opGreater + opEqual + opContains +) + +// Matches returns true if the query matches the given set of tags, false otherwise. +// +// For example, query "name=John" matches tags = {"name": "John"}. More +// examples could be found in parser_test.go and query_test.go. +func (q *Query) Matches(tags map[string]interface{}) bool { + if len(tags) == 0 { + return false + } + + buffer, begin, end := q.parser.Buffer, 0, 0 + + var tag string + var op operator + + // tokens must be in the following order: tag ("tx.gas") -> operator ("=") -> operand ("7") + for _, token := range q.parser.Tokens() { + switch token.pegRule { + + case rulePegText: + begin, end = int(token.begin), int(token.end) + case ruletag: + tag = buffer[begin:end] + case rulele: + op = opLessEqual + case rulege: + op = opGreaterEqual + case rulel: + op = opLess + case ruleg: + op = opGreater + case ruleequal: + op = opEqual + case rulecontains: + op = opContains + case rulevalue: + // see if the triplet (tag, operator, operand) matches any tag + // "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" } + if !match(tag, op, reflect.ValueOf(buffer[begin:end]), tags) { + return false + } + case rulenumber: + number := buffer[begin:end] + if strings.Contains(number, ".") { // if it looks like a floating-point number + value, err := strconv.ParseFloat(number, 64) + if err != nil { + panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number)) + } + if !match(tag, op, reflect.ValueOf(value), tags) { + return false + } + } else { + value, err := strconv.ParseInt(number, 10, 64) + if err != nil { + panic(fmt.Sprintf("got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", err, number)) + } + if !match(tag, op, reflect.ValueOf(value), tags) { + return false + } + } + case ruletime: + value, err := time.Parse(time.RFC3339, buffer[begin:end]) + if err != nil { + panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end])) + } + if !match(tag, op, reflect.ValueOf(value), tags) { + return false + } + case ruledate: + value, err := time.Parse("2006-01-02", buffer[begin:end]) + if err != nil { + panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", err, buffer[begin:end])) + } + if !match(tag, op, reflect.ValueOf(value), tags) { + return false + } + } + } + + return true +} + +// match returns true if the given triplet (tag, operator, operand) matches any tag. +// +// First, it looks up the tag in tags and if it finds one, tries to compare the +// value from it to the operand using the operator. +// +// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" } +func match(tag string, op operator, operand reflect.Value, tags map[string]interface{}) bool { + // look up the tag from the query in tags + value, ok := tags[tag] + if !ok { + return false + } + switch operand.Kind() { + case reflect.Struct: // time + operandAsTime := operand.Interface().(time.Time) + v, ok := value.(time.Time) + if !ok { // if value from tags is not time.Time + return false + } + switch op { + case opLessEqual: + return v.Before(operandAsTime) || v.Equal(operandAsTime) + case opGreaterEqual: + return v.Equal(operandAsTime) || v.After(operandAsTime) + case opLess: + return v.Before(operandAsTime) + case opGreater: + return v.After(operandAsTime) + case opEqual: + return v.Equal(operandAsTime) + } + case reflect.Float64: + operandFloat64 := operand.Interface().(float64) + var v float64 + // try our best to convert value from tags to float64 + switch vt := value.(type) { + case float64: + v = vt + case float32: + v = float64(vt) + case int: + v = float64(vt) + case int8: + v = float64(vt) + case int16: + v = float64(vt) + case int32: + v = float64(vt) + case int64: + v = float64(vt) + default: // fail for all other types + panic(fmt.Sprintf("Incomparable types: %T (%v) vs float64 (%v)", value, value, operandFloat64)) + } + switch op { + case opLessEqual: + return v <= operandFloat64 + case opGreaterEqual: + return v >= operandFloat64 + case opLess: + return v < operandFloat64 + case opGreater: + return v > operandFloat64 + case opEqual: + return v == operandFloat64 + } + case reflect.Int64: + operandInt := operand.Interface().(int64) + var v int64 + // try our best to convert value from tags to int64 + switch vt := value.(type) { + case int64: + v = vt + case int8: + v = int64(vt) + case int16: + v = int64(vt) + case int32: + v = int64(vt) + case int: + v = int64(vt) + case float64: + v = int64(vt) + case float32: + v = int64(vt) + default: // fail for all other types + panic(fmt.Sprintf("Incomparable types: %T (%v) vs int64 (%v)", value, value, operandInt)) + } + switch op { + case opLessEqual: + return v <= operandInt + case opGreaterEqual: + return v >= operandInt + case opLess: + return v < operandInt + case opGreater: + return v > operandInt + case opEqual: + return v == operandInt + } + case reflect.String: + v, ok := value.(string) + if !ok { // if value from tags is not string + return false + } + switch op { + case opEqual: + return v == operand.String() + case opContains: + return strings.Contains(v, operand.String()) + } + default: + panic(fmt.Sprintf("Unknown kind of operand %v", operand.Kind())) + } + + return false +} diff --git a/pubsub/query/query.peg b/pubsub/query/query.peg new file mode 100644 index 000000000..9654289c4 --- /dev/null +++ b/pubsub/query/query.peg @@ -0,0 +1,33 @@ +package query + +type QueryParser Peg { +} + +e <- '\"' condition ( ' '+ and ' '+ condition )* '\"' !. + +condition <- tag ' '* (le ' '* (number / time / date) + / ge ' '* (number / time / date) + / l ' '* (number / time / date) + / g ' '* (number / time / date) + / equal ' '* (number / time / date / value) + / contains ' '* value + ) + +tag <- < (![ \t\n\r\\()"=><] .)+ > +value <- < (![ \t\n\r\\()"=><] .)+ > +number <- < ('0' + / [1-9] digit* ('.' digit*)?) > +digit <- [0-9] +time <- "TIME " < year '-' month '-' day 'T' digit digit ':' digit digit ':' digit digit (('-' / '+') digit digit ':' digit digit / 'Z') > +date <- "DATE " < year '-' month '-' day > +year <- ('1' / '2') digit digit digit +month <- ('0' / '1') digit +day <- ('0' / '1' / '2' / '3') digit +and <- "AND" + +equal <- "=" +contains <- "CONTAINS" +le <- "<=" +ge <- ">=" +l <- "<" +g <- ">" diff --git a/pubsub/query/query.peg.go b/pubsub/query/query.peg.go new file mode 100644 index 000000000..5cd0a9e32 --- /dev/null +++ b/pubsub/query/query.peg.go @@ -0,0 +1,1668 @@ +package query + +import ( + "fmt" + "math" + "sort" + "strconv" +) + +const endSymbol rune = 1114112 + +/* The rule types inferred from the grammar are below. */ +type pegRule uint8 + +const ( + ruleUnknown pegRule = iota + rulee + rulecondition + ruletag + rulevalue + rulenumber + ruledigit + ruletime + ruledate + ruleyear + rulemonth + ruleday + ruleand + ruleequal + rulecontains + rulele + rulege + rulel + ruleg + rulePegText +) + +var rul3s = [...]string{ + "Unknown", + "e", + "condition", + "tag", + "value", + "number", + "digit", + "time", + "date", + "year", + "month", + "day", + "and", + "equal", + "contains", + "le", + "ge", + "l", + "g", + "PegText", +} + +type token32 struct { + pegRule + begin, end uint32 +} + +func (t *token32) String() string { + return fmt.Sprintf("\x1B[34m%v\x1B[m %v %v", rul3s[t.pegRule], t.begin, t.end) +} + +type node32 struct { + token32 + up, next *node32 +} + +func (node *node32) print(pretty bool, buffer string) { + var print func(node *node32, depth int) + print = func(node *node32, depth int) { + for node != nil { + for c := 0; c < depth; c++ { + fmt.Printf(" ") + } + rule := rul3s[node.pegRule] + quote := strconv.Quote(string(([]rune(buffer)[node.begin:node.end]))) + if !pretty { + fmt.Printf("%v %v\n", rule, quote) + } else { + fmt.Printf("\x1B[34m%v\x1B[m %v\n", rule, quote) + } + if node.up != nil { + print(node.up, depth+1) + } + node = node.next + } + } + print(node, 0) +} + +func (node *node32) Print(buffer string) { + node.print(false, buffer) +} + +func (node *node32) PrettyPrint(buffer string) { + node.print(true, buffer) +} + +type tokens32 struct { + tree []token32 +} + +func (t *tokens32) Trim(length uint32) { + t.tree = t.tree[:length] +} + +func (t *tokens32) Print() { + for _, token := range t.tree { + fmt.Println(token.String()) + } +} + +func (t *tokens32) AST() *node32 { + type element struct { + node *node32 + down *element + } + tokens := t.Tokens() + var stack *element + for _, token := range tokens { + if token.begin == token.end { + continue + } + node := &node32{token32: token} + for stack != nil && stack.node.begin >= token.begin && stack.node.end <= token.end { + stack.node.next = node.up + node.up = stack.node + stack = stack.down + } + stack = &element{node: node, down: stack} + } + if stack != nil { + return stack.node + } + return nil +} + +func (t *tokens32) PrintSyntaxTree(buffer string) { + t.AST().Print(buffer) +} + +func (t *tokens32) PrettyPrintSyntaxTree(buffer string) { + t.AST().PrettyPrint(buffer) +} + +func (t *tokens32) Add(rule pegRule, begin, end, index uint32) { + if tree := t.tree; int(index) >= len(tree) { + expanded := make([]token32, 2*len(tree)) + copy(expanded, tree) + t.tree = expanded + } + t.tree[index] = token32{ + pegRule: rule, + begin: begin, + end: end, + } +} + +func (t *tokens32) Tokens() []token32 { + return t.tree +} + +type QueryParser struct { + Buffer string + buffer []rune + rules [20]func() bool + parse func(rule ...int) error + reset func() + Pretty bool + tokens32 +} + +func (p *QueryParser) Parse(rule ...int) error { + return p.parse(rule...) +} + +func (p *QueryParser) Reset() { + p.reset() +} + +type textPosition struct { + line, symbol int +} + +type textPositionMap map[int]textPosition + +func translatePositions(buffer []rune, positions []int) textPositionMap { + length, translations, j, line, symbol := len(positions), make(textPositionMap, len(positions)), 0, 1, 0 + sort.Ints(positions) + +search: + for i, c := range buffer { + if c == '\n' { + line, symbol = line+1, 0 + } else { + symbol++ + } + if i == positions[j] { + translations[positions[j]] = textPosition{line, symbol} + for j++; j < length; j++ { + if i != positions[j] { + continue search + } + } + break search + } + } + + return translations +} + +type parseError struct { + p *QueryParser + max token32 +} + +func (e *parseError) Error() string { + tokens, error := []token32{e.max}, "\n" + positions, p := make([]int, 2*len(tokens)), 0 + for _, token := range tokens { + positions[p], p = int(token.begin), p+1 + positions[p], p = int(token.end), p+1 + } + translations := translatePositions(e.p.buffer, positions) + format := "parse error near %v (line %v symbol %v - line %v symbol %v):\n%v\n" + if e.p.Pretty { + format = "parse error near \x1B[34m%v\x1B[m (line %v symbol %v - line %v symbol %v):\n%v\n" + } + for _, token := range tokens { + begin, end := int(token.begin), int(token.end) + error += fmt.Sprintf(format, + rul3s[token.pegRule], + translations[begin].line, translations[begin].symbol, + translations[end].line, translations[end].symbol, + strconv.Quote(string(e.p.buffer[begin:end]))) + } + + return error +} + +func (p *QueryParser) PrintSyntaxTree() { + if p.Pretty { + p.tokens32.PrettyPrintSyntaxTree(p.Buffer) + } else { + p.tokens32.PrintSyntaxTree(p.Buffer) + } +} + +func (p *QueryParser) Init() { + var ( + max token32 + position, tokenIndex uint32 + buffer []rune + ) + p.reset = func() { + max = token32{} + position, tokenIndex = 0, 0 + + p.buffer = []rune(p.Buffer) + if len(p.buffer) == 0 || p.buffer[len(p.buffer)-1] != endSymbol { + p.buffer = append(p.buffer, endSymbol) + } + buffer = p.buffer + } + p.reset() + + _rules := p.rules + tree := tokens32{tree: make([]token32, math.MaxInt16)} + p.parse = func(rule ...int) error { + r := 1 + if len(rule) > 0 { + r = rule[0] + } + matches := p.rules[r]() + p.tokens32 = tree + if matches { + p.Trim(tokenIndex) + return nil + } + return &parseError{p, max} + } + + add := func(rule pegRule, begin uint32) { + tree.Add(rule, begin, position, tokenIndex) + tokenIndex++ + if begin != position && position > max.end { + max = token32{rule, begin, position} + } + } + + matchDot := func() bool { + if buffer[position] != endSymbol { + position++ + return true + } + return false + } + + /*matchChar := func(c byte) bool { + if buffer[position] == c { + position++ + return true + } + return false + }*/ + + /*matchRange := func(lower byte, upper byte) bool { + if c := buffer[position]; c >= lower && c <= upper { + position++ + return true + } + return false + }*/ + + _rules = [...]func() bool{ + nil, + /* 0 e <- <('"' condition (' '+ and ' '+ condition)* '"' !.)> */ + func() bool { + position0, tokenIndex0 := position, tokenIndex + { + position1 := position + if buffer[position] != rune('"') { + goto l0 + } + position++ + if !_rules[rulecondition]() { + goto l0 + } + l2: + { + position3, tokenIndex3 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l3 + } + position++ + l4: + { + position5, tokenIndex5 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l5 + } + position++ + goto l4 + l5: + position, tokenIndex = position5, tokenIndex5 + } + { + position6 := position + { + position7, tokenIndex7 := position, tokenIndex + if buffer[position] != rune('a') { + goto l8 + } + position++ + goto l7 + l8: + position, tokenIndex = position7, tokenIndex7 + if buffer[position] != rune('A') { + goto l3 + } + position++ + } + l7: + { + position9, tokenIndex9 := position, tokenIndex + if buffer[position] != rune('n') { + goto l10 + } + position++ + goto l9 + l10: + position, tokenIndex = position9, tokenIndex9 + if buffer[position] != rune('N') { + goto l3 + } + position++ + } + l9: + { + position11, tokenIndex11 := position, tokenIndex + if buffer[position] != rune('d') { + goto l12 + } + position++ + goto l11 + l12: + position, tokenIndex = position11, tokenIndex11 + if buffer[position] != rune('D') { + goto l3 + } + position++ + } + l11: + add(ruleand, position6) + } + if buffer[position] != rune(' ') { + goto l3 + } + position++ + l13: + { + position14, tokenIndex14 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l14 + } + position++ + goto l13 + l14: + position, tokenIndex = position14, tokenIndex14 + } + if !_rules[rulecondition]() { + goto l3 + } + goto l2 + l3: + position, tokenIndex = position3, tokenIndex3 + } + if buffer[position] != rune('"') { + goto l0 + } + position++ + { + position15, tokenIndex15 := position, tokenIndex + if !matchDot() { + goto l15 + } + goto l0 + l15: + position, tokenIndex = position15, tokenIndex15 + } + add(rulee, position1) + } + return true + l0: + position, tokenIndex = position0, tokenIndex0 + return false + }, + /* 1 condition <- <(tag ' '* ((le ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number))) / (ge ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number))) / ((&('=') (equal ' '* (number / time / date / value))) | (&('>') (g ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number)))) | (&('<') (l ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number)))) | (&('C' | 'c') (contains ' '* value)))))> */ + func() bool { + position16, tokenIndex16 := position, tokenIndex + { + position17 := position + { + position18 := position + { + position19 := position + { + position22, tokenIndex22 := position, tokenIndex + { + switch buffer[position] { + case '<': + if buffer[position] != rune('<') { + goto l22 + } + position++ + break + case '>': + if buffer[position] != rune('>') { + goto l22 + } + position++ + break + case '=': + if buffer[position] != rune('=') { + goto l22 + } + position++ + break + case '"': + if buffer[position] != rune('"') { + goto l22 + } + position++ + break + case ')': + if buffer[position] != rune(')') { + goto l22 + } + position++ + break + case '(': + if buffer[position] != rune('(') { + goto l22 + } + position++ + break + case '\\': + if buffer[position] != rune('\\') { + goto l22 + } + position++ + break + case '\r': + if buffer[position] != rune('\r') { + goto l22 + } + position++ + break + case '\n': + if buffer[position] != rune('\n') { + goto l22 + } + position++ + break + case '\t': + if buffer[position] != rune('\t') { + goto l22 + } + position++ + break + default: + if buffer[position] != rune(' ') { + goto l22 + } + position++ + break + } + } + + goto l16 + l22: + position, tokenIndex = position22, tokenIndex22 + } + if !matchDot() { + goto l16 + } + l20: + { + position21, tokenIndex21 := position, tokenIndex + { + position24, tokenIndex24 := position, tokenIndex + { + switch buffer[position] { + case '<': + if buffer[position] != rune('<') { + goto l24 + } + position++ + break + case '>': + if buffer[position] != rune('>') { + goto l24 + } + position++ + break + case '=': + if buffer[position] != rune('=') { + goto l24 + } + position++ + break + case '"': + if buffer[position] != rune('"') { + goto l24 + } + position++ + break + case ')': + if buffer[position] != rune(')') { + goto l24 + } + position++ + break + case '(': + if buffer[position] != rune('(') { + goto l24 + } + position++ + break + case '\\': + if buffer[position] != rune('\\') { + goto l24 + } + position++ + break + case '\r': + if buffer[position] != rune('\r') { + goto l24 + } + position++ + break + case '\n': + if buffer[position] != rune('\n') { + goto l24 + } + position++ + break + case '\t': + if buffer[position] != rune('\t') { + goto l24 + } + position++ + break + default: + if buffer[position] != rune(' ') { + goto l24 + } + position++ + break + } + } + + goto l21 + l24: + position, tokenIndex = position24, tokenIndex24 + } + if !matchDot() { + goto l21 + } + goto l20 + l21: + position, tokenIndex = position21, tokenIndex21 + } + add(rulePegText, position19) + } + add(ruletag, position18) + } + l26: + { + position27, tokenIndex27 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l27 + } + position++ + goto l26 + l27: + position, tokenIndex = position27, tokenIndex27 + } + { + position28, tokenIndex28 := position, tokenIndex + { + position30 := position + if buffer[position] != rune('<') { + goto l29 + } + position++ + if buffer[position] != rune('=') { + goto l29 + } + position++ + add(rulele, position30) + } + l31: + { + position32, tokenIndex32 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l32 + } + position++ + goto l31 + l32: + position, tokenIndex = position32, tokenIndex32 + } + { + switch buffer[position] { + case 'D', 'd': + if !_rules[ruledate]() { + goto l29 + } + break + case 'T', 't': + if !_rules[ruletime]() { + goto l29 + } + break + default: + if !_rules[rulenumber]() { + goto l29 + } + break + } + } + + goto l28 + l29: + position, tokenIndex = position28, tokenIndex28 + { + position35 := position + if buffer[position] != rune('>') { + goto l34 + } + position++ + if buffer[position] != rune('=') { + goto l34 + } + position++ + add(rulege, position35) + } + l36: + { + position37, tokenIndex37 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l37 + } + position++ + goto l36 + l37: + position, tokenIndex = position37, tokenIndex37 + } + { + switch buffer[position] { + case 'D', 'd': + if !_rules[ruledate]() { + goto l34 + } + break + case 'T', 't': + if !_rules[ruletime]() { + goto l34 + } + break + default: + if !_rules[rulenumber]() { + goto l34 + } + break + } + } + + goto l28 + l34: + position, tokenIndex = position28, tokenIndex28 + { + switch buffer[position] { + case '=': + { + position40 := position + if buffer[position] != rune('=') { + goto l16 + } + position++ + add(ruleequal, position40) + } + l41: + { + position42, tokenIndex42 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l42 + } + position++ + goto l41 + l42: + position, tokenIndex = position42, tokenIndex42 + } + { + position43, tokenIndex43 := position, tokenIndex + if !_rules[rulenumber]() { + goto l44 + } + goto l43 + l44: + position, tokenIndex = position43, tokenIndex43 + if !_rules[ruletime]() { + goto l45 + } + goto l43 + l45: + position, tokenIndex = position43, tokenIndex43 + if !_rules[ruledate]() { + goto l46 + } + goto l43 + l46: + position, tokenIndex = position43, tokenIndex43 + if !_rules[rulevalue]() { + goto l16 + } + } + l43: + break + case '>': + { + position47 := position + if buffer[position] != rune('>') { + goto l16 + } + position++ + add(ruleg, position47) + } + l48: + { + position49, tokenIndex49 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l49 + } + position++ + goto l48 + l49: + position, tokenIndex = position49, tokenIndex49 + } + { + switch buffer[position] { + case 'D', 'd': + if !_rules[ruledate]() { + goto l16 + } + break + case 'T', 't': + if !_rules[ruletime]() { + goto l16 + } + break + default: + if !_rules[rulenumber]() { + goto l16 + } + break + } + } + + break + case '<': + { + position51 := position + if buffer[position] != rune('<') { + goto l16 + } + position++ + add(rulel, position51) + } + l52: + { + position53, tokenIndex53 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l53 + } + position++ + goto l52 + l53: + position, tokenIndex = position53, tokenIndex53 + } + { + switch buffer[position] { + case 'D', 'd': + if !_rules[ruledate]() { + goto l16 + } + break + case 'T', 't': + if !_rules[ruletime]() { + goto l16 + } + break + default: + if !_rules[rulenumber]() { + goto l16 + } + break + } + } + + break + default: + { + position55 := position + { + position56, tokenIndex56 := position, tokenIndex + if buffer[position] != rune('c') { + goto l57 + } + position++ + goto l56 + l57: + position, tokenIndex = position56, tokenIndex56 + if buffer[position] != rune('C') { + goto l16 + } + position++ + } + l56: + { + position58, tokenIndex58 := position, tokenIndex + if buffer[position] != rune('o') { + goto l59 + } + position++ + goto l58 + l59: + position, tokenIndex = position58, tokenIndex58 + if buffer[position] != rune('O') { + goto l16 + } + position++ + } + l58: + { + position60, tokenIndex60 := position, tokenIndex + if buffer[position] != rune('n') { + goto l61 + } + position++ + goto l60 + l61: + position, tokenIndex = position60, tokenIndex60 + if buffer[position] != rune('N') { + goto l16 + } + position++ + } + l60: + { + position62, tokenIndex62 := position, tokenIndex + if buffer[position] != rune('t') { + goto l63 + } + position++ + goto l62 + l63: + position, tokenIndex = position62, tokenIndex62 + if buffer[position] != rune('T') { + goto l16 + } + position++ + } + l62: + { + position64, tokenIndex64 := position, tokenIndex + if buffer[position] != rune('a') { + goto l65 + } + position++ + goto l64 + l65: + position, tokenIndex = position64, tokenIndex64 + if buffer[position] != rune('A') { + goto l16 + } + position++ + } + l64: + { + position66, tokenIndex66 := position, tokenIndex + if buffer[position] != rune('i') { + goto l67 + } + position++ + goto l66 + l67: + position, tokenIndex = position66, tokenIndex66 + if buffer[position] != rune('I') { + goto l16 + } + position++ + } + l66: + { + position68, tokenIndex68 := position, tokenIndex + if buffer[position] != rune('n') { + goto l69 + } + position++ + goto l68 + l69: + position, tokenIndex = position68, tokenIndex68 + if buffer[position] != rune('N') { + goto l16 + } + position++ + } + l68: + { + position70, tokenIndex70 := position, tokenIndex + if buffer[position] != rune('s') { + goto l71 + } + position++ + goto l70 + l71: + position, tokenIndex = position70, tokenIndex70 + if buffer[position] != rune('S') { + goto l16 + } + position++ + } + l70: + add(rulecontains, position55) + } + l72: + { + position73, tokenIndex73 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l73 + } + position++ + goto l72 + l73: + position, tokenIndex = position73, tokenIndex73 + } + if !_rules[rulevalue]() { + goto l16 + } + break + } + } + + } + l28: + add(rulecondition, position17) + } + return true + l16: + position, tokenIndex = position16, tokenIndex16 + return false + }, + /* 2 tag <- <<(!((&('<') '<') | (&('>') '>') | (&('=') '=') | (&('"') '"') | (&(')') ')') | (&('(') '(') | (&('\\') '\\') | (&('\r') '\r') | (&('\n') '\n') | (&('\t') '\t') | (&(' ') ' ')) .)+>> */ + nil, + /* 3 value <- <<(!((&('<') '<') | (&('>') '>') | (&('=') '=') | (&('"') '"') | (&(')') ')') | (&('(') '(') | (&('\\') '\\') | (&('\r') '\r') | (&('\n') '\n') | (&('\t') '\t') | (&(' ') ' ')) .)+>> */ + func() bool { + position75, tokenIndex75 := position, tokenIndex + { + position76 := position + { + position77 := position + { + position80, tokenIndex80 := position, tokenIndex + { + switch buffer[position] { + case '<': + if buffer[position] != rune('<') { + goto l80 + } + position++ + break + case '>': + if buffer[position] != rune('>') { + goto l80 + } + position++ + break + case '=': + if buffer[position] != rune('=') { + goto l80 + } + position++ + break + case '"': + if buffer[position] != rune('"') { + goto l80 + } + position++ + break + case ')': + if buffer[position] != rune(')') { + goto l80 + } + position++ + break + case '(': + if buffer[position] != rune('(') { + goto l80 + } + position++ + break + case '\\': + if buffer[position] != rune('\\') { + goto l80 + } + position++ + break + case '\r': + if buffer[position] != rune('\r') { + goto l80 + } + position++ + break + case '\n': + if buffer[position] != rune('\n') { + goto l80 + } + position++ + break + case '\t': + if buffer[position] != rune('\t') { + goto l80 + } + position++ + break + default: + if buffer[position] != rune(' ') { + goto l80 + } + position++ + break + } + } + + goto l75 + l80: + position, tokenIndex = position80, tokenIndex80 + } + if !matchDot() { + goto l75 + } + l78: + { + position79, tokenIndex79 := position, tokenIndex + { + position82, tokenIndex82 := position, tokenIndex + { + switch buffer[position] { + case '<': + if buffer[position] != rune('<') { + goto l82 + } + position++ + break + case '>': + if buffer[position] != rune('>') { + goto l82 + } + position++ + break + case '=': + if buffer[position] != rune('=') { + goto l82 + } + position++ + break + case '"': + if buffer[position] != rune('"') { + goto l82 + } + position++ + break + case ')': + if buffer[position] != rune(')') { + goto l82 + } + position++ + break + case '(': + if buffer[position] != rune('(') { + goto l82 + } + position++ + break + case '\\': + if buffer[position] != rune('\\') { + goto l82 + } + position++ + break + case '\r': + if buffer[position] != rune('\r') { + goto l82 + } + position++ + break + case '\n': + if buffer[position] != rune('\n') { + goto l82 + } + position++ + break + case '\t': + if buffer[position] != rune('\t') { + goto l82 + } + position++ + break + default: + if buffer[position] != rune(' ') { + goto l82 + } + position++ + break + } + } + + goto l79 + l82: + position, tokenIndex = position82, tokenIndex82 + } + if !matchDot() { + goto l79 + } + goto l78 + l79: + position, tokenIndex = position79, tokenIndex79 + } + add(rulePegText, position77) + } + add(rulevalue, position76) + } + return true + l75: + position, tokenIndex = position75, tokenIndex75 + return false + }, + /* 4 number <- <<('0' / ([1-9] digit* ('.' digit*)?))>> */ + func() bool { + position84, tokenIndex84 := position, tokenIndex + { + position85 := position + { + position86 := position + { + position87, tokenIndex87 := position, tokenIndex + if buffer[position] != rune('0') { + goto l88 + } + position++ + goto l87 + l88: + position, tokenIndex = position87, tokenIndex87 + if c := buffer[position]; c < rune('1') || c > rune('9') { + goto l84 + } + position++ + l89: + { + position90, tokenIndex90 := position, tokenIndex + if !_rules[ruledigit]() { + goto l90 + } + goto l89 + l90: + position, tokenIndex = position90, tokenIndex90 + } + { + position91, tokenIndex91 := position, tokenIndex + if buffer[position] != rune('.') { + goto l91 + } + position++ + l93: + { + position94, tokenIndex94 := position, tokenIndex + if !_rules[ruledigit]() { + goto l94 + } + goto l93 + l94: + position, tokenIndex = position94, tokenIndex94 + } + goto l92 + l91: + position, tokenIndex = position91, tokenIndex91 + } + l92: + } + l87: + add(rulePegText, position86) + } + add(rulenumber, position85) + } + return true + l84: + position, tokenIndex = position84, tokenIndex84 + return false + }, + /* 5 digit <- <[0-9]> */ + func() bool { + position95, tokenIndex95 := position, tokenIndex + { + position96 := position + if c := buffer[position]; c < rune('0') || c > rune('9') { + goto l95 + } + position++ + add(ruledigit, position96) + } + return true + l95: + position, tokenIndex = position95, tokenIndex95 + return false + }, + /* 6 time <- <(('t' / 'T') ('i' / 'I') ('m' / 'M') ('e' / 'E') ' ' <(year '-' month '-' day 'T' digit digit ':' digit digit ':' digit digit ((('-' / '+') digit digit ':' digit digit) / 'Z'))>)> */ + func() bool { + position97, tokenIndex97 := position, tokenIndex + { + position98 := position + { + position99, tokenIndex99 := position, tokenIndex + if buffer[position] != rune('t') { + goto l100 + } + position++ + goto l99 + l100: + position, tokenIndex = position99, tokenIndex99 + if buffer[position] != rune('T') { + goto l97 + } + position++ + } + l99: + { + position101, tokenIndex101 := position, tokenIndex + if buffer[position] != rune('i') { + goto l102 + } + position++ + goto l101 + l102: + position, tokenIndex = position101, tokenIndex101 + if buffer[position] != rune('I') { + goto l97 + } + position++ + } + l101: + { + position103, tokenIndex103 := position, tokenIndex + if buffer[position] != rune('m') { + goto l104 + } + position++ + goto l103 + l104: + position, tokenIndex = position103, tokenIndex103 + if buffer[position] != rune('M') { + goto l97 + } + position++ + } + l103: + { + position105, tokenIndex105 := position, tokenIndex + if buffer[position] != rune('e') { + goto l106 + } + position++ + goto l105 + l106: + position, tokenIndex = position105, tokenIndex105 + if buffer[position] != rune('E') { + goto l97 + } + position++ + } + l105: + if buffer[position] != rune(' ') { + goto l97 + } + position++ + { + position107 := position + if !_rules[ruleyear]() { + goto l97 + } + if buffer[position] != rune('-') { + goto l97 + } + position++ + if !_rules[rulemonth]() { + goto l97 + } + if buffer[position] != rune('-') { + goto l97 + } + position++ + if !_rules[ruleday]() { + goto l97 + } + if buffer[position] != rune('T') { + goto l97 + } + position++ + if !_rules[ruledigit]() { + goto l97 + } + if !_rules[ruledigit]() { + goto l97 + } + if buffer[position] != rune(':') { + goto l97 + } + position++ + if !_rules[ruledigit]() { + goto l97 + } + if !_rules[ruledigit]() { + goto l97 + } + if buffer[position] != rune(':') { + goto l97 + } + position++ + if !_rules[ruledigit]() { + goto l97 + } + if !_rules[ruledigit]() { + goto l97 + } + { + position108, tokenIndex108 := position, tokenIndex + { + position110, tokenIndex110 := position, tokenIndex + if buffer[position] != rune('-') { + goto l111 + } + position++ + goto l110 + l111: + position, tokenIndex = position110, tokenIndex110 + if buffer[position] != rune('+') { + goto l109 + } + position++ + } + l110: + if !_rules[ruledigit]() { + goto l109 + } + if !_rules[ruledigit]() { + goto l109 + } + if buffer[position] != rune(':') { + goto l109 + } + position++ + if !_rules[ruledigit]() { + goto l109 + } + if !_rules[ruledigit]() { + goto l109 + } + goto l108 + l109: + position, tokenIndex = position108, tokenIndex108 + if buffer[position] != rune('Z') { + goto l97 + } + position++ + } + l108: + add(rulePegText, position107) + } + add(ruletime, position98) + } + return true + l97: + position, tokenIndex = position97, tokenIndex97 + return false + }, + /* 7 date <- <(('d' / 'D') ('a' / 'A') ('t' / 'T') ('e' / 'E') ' ' <(year '-' month '-' day)>)> */ + func() bool { + position112, tokenIndex112 := position, tokenIndex + { + position113 := position + { + position114, tokenIndex114 := position, tokenIndex + if buffer[position] != rune('d') { + goto l115 + } + position++ + goto l114 + l115: + position, tokenIndex = position114, tokenIndex114 + if buffer[position] != rune('D') { + goto l112 + } + position++ + } + l114: + { + position116, tokenIndex116 := position, tokenIndex + if buffer[position] != rune('a') { + goto l117 + } + position++ + goto l116 + l117: + position, tokenIndex = position116, tokenIndex116 + if buffer[position] != rune('A') { + goto l112 + } + position++ + } + l116: + { + position118, tokenIndex118 := position, tokenIndex + if buffer[position] != rune('t') { + goto l119 + } + position++ + goto l118 + l119: + position, tokenIndex = position118, tokenIndex118 + if buffer[position] != rune('T') { + goto l112 + } + position++ + } + l118: + { + position120, tokenIndex120 := position, tokenIndex + if buffer[position] != rune('e') { + goto l121 + } + position++ + goto l120 + l121: + position, tokenIndex = position120, tokenIndex120 + if buffer[position] != rune('E') { + goto l112 + } + position++ + } + l120: + if buffer[position] != rune(' ') { + goto l112 + } + position++ + { + position122 := position + if !_rules[ruleyear]() { + goto l112 + } + if buffer[position] != rune('-') { + goto l112 + } + position++ + if !_rules[rulemonth]() { + goto l112 + } + if buffer[position] != rune('-') { + goto l112 + } + position++ + if !_rules[ruleday]() { + goto l112 + } + add(rulePegText, position122) + } + add(ruledate, position113) + } + return true + l112: + position, tokenIndex = position112, tokenIndex112 + return false + }, + /* 8 year <- <(('1' / '2') digit digit digit)> */ + func() bool { + position123, tokenIndex123 := position, tokenIndex + { + position124 := position + { + position125, tokenIndex125 := position, tokenIndex + if buffer[position] != rune('1') { + goto l126 + } + position++ + goto l125 + l126: + position, tokenIndex = position125, tokenIndex125 + if buffer[position] != rune('2') { + goto l123 + } + position++ + } + l125: + if !_rules[ruledigit]() { + goto l123 + } + if !_rules[ruledigit]() { + goto l123 + } + if !_rules[ruledigit]() { + goto l123 + } + add(ruleyear, position124) + } + return true + l123: + position, tokenIndex = position123, tokenIndex123 + return false + }, + /* 9 month <- <(('0' / '1') digit)> */ + func() bool { + position127, tokenIndex127 := position, tokenIndex + { + position128 := position + { + position129, tokenIndex129 := position, tokenIndex + if buffer[position] != rune('0') { + goto l130 + } + position++ + goto l129 + l130: + position, tokenIndex = position129, tokenIndex129 + if buffer[position] != rune('1') { + goto l127 + } + position++ + } + l129: + if !_rules[ruledigit]() { + goto l127 + } + add(rulemonth, position128) + } + return true + l127: + position, tokenIndex = position127, tokenIndex127 + return false + }, + /* 10 day <- <(((&('3') '3') | (&('2') '2') | (&('1') '1') | (&('0') '0')) digit)> */ + func() bool { + position131, tokenIndex131 := position, tokenIndex + { + position132 := position + { + switch buffer[position] { + case '3': + if buffer[position] != rune('3') { + goto l131 + } + position++ + break + case '2': + if buffer[position] != rune('2') { + goto l131 + } + position++ + break + case '1': + if buffer[position] != rune('1') { + goto l131 + } + position++ + break + default: + if buffer[position] != rune('0') { + goto l131 + } + position++ + break + } + } + + if !_rules[ruledigit]() { + goto l131 + } + add(ruleday, position132) + } + return true + l131: + position, tokenIndex = position131, tokenIndex131 + return false + }, + /* 11 and <- <(('a' / 'A') ('n' / 'N') ('d' / 'D'))> */ + nil, + /* 12 equal <- <'='> */ + nil, + /* 13 contains <- <(('c' / 'C') ('o' / 'O') ('n' / 'N') ('t' / 'T') ('a' / 'A') ('i' / 'I') ('n' / 'N') ('s' / 'S'))> */ + nil, + /* 14 le <- <('<' '=')> */ + nil, + /* 15 ge <- <('>' '=')> */ + nil, + /* 16 l <- <'<'> */ + nil, + /* 17 g <- <'>'> */ + nil, + nil, + } + p.rules = _rules +} diff --git a/pubsub/query/query_test.go b/pubsub/query/query_test.go new file mode 100644 index 000000000..75d02ee49 --- /dev/null +++ b/pubsub/query/query_test.go @@ -0,0 +1,64 @@ +package query_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tendermint/tmlibs/pubsub/query" +) + +func TestMatches(t *testing.T) { + const shortForm = "2006-Jan-02" + txDate, err := time.Parse(shortForm, "2017-Jan-01") + require.NoError(t, err) + txTime, err := time.Parse(time.RFC3339, "2018-05-03T14:45:00Z") + require.NoError(t, err) + + testCases := []struct { + s string + tags map[string]interface{} + err bool + matches bool + }{ + {"tm.events.type=NewBlock", map[string]interface{}{"tm.events.type": "NewBlock"}, false, true}, + + {"tx.gas > 7", map[string]interface{}{"tx.gas": 8}, false, true}, + {"tx.gas > 7 AND tx.gas < 9", map[string]interface{}{"tx.gas": 8}, false, true}, + {"body.weight >= 3.5", map[string]interface{}{"body.weight": 3.5}, false, true}, + {"account.balance < 1000.0", map[string]interface{}{"account.balance": 900}, false, true}, + {"apples.kg <= 4", map[string]interface{}{"apples.kg": 4.0}, false, true}, + {"body.weight >= 4.5", map[string]interface{}{"body.weight": float32(4.5)}, false, true}, + {"oranges.kg < 4 AND watermellons.kg > 10", map[string]interface{}{"oranges.kg": 3, "watermellons.kg": 12}, false, true}, + {"peaches.kg < 4", map[string]interface{}{"peaches.kg": 5}, false, false}, + + {"tx.date > DATE 2017-01-01", map[string]interface{}{"tx.date": time.Now()}, false, true}, + {"tx.date = DATE 2017-01-01", map[string]interface{}{"tx.date": txDate}, false, true}, + {"tx.date = DATE 2018-01-01", map[string]interface{}{"tx.date": txDate}, false, false}, + + {"tx.time >= TIME 2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": time.Now()}, false, true}, + {"tx.time = TIME 2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": txTime}, false, false}, + + {"abci.owner.name CONTAINS Igor", map[string]interface{}{"abci.owner.name": "Igor,Ivan"}, false, true}, + {"abci.owner.name CONTAINS Igor", map[string]interface{}{"abci.owner.name": "Pavel,Ivan"}, false, false}, + } + + for _, tc := range testCases { + query, err := query.New(tc.s) + if !tc.err { + require.Nil(t, err) + } + + if tc.matches { + assert.True(t, query.Matches(tc.tags), "Query '%s' should match %v", tc.s, tc.tags) + } else { + assert.False(t, query.Matches(tc.tags), "Query '%s' should not match %v", tc.s, tc.tags) + } + } +} + +func TestMustParse(t *testing.T) { + assert.Panics(t, func() { query.MustParse("=") }) + assert.NotPanics(t, func() { query.MustParse("tm.events.type=NewBlock") }) +} From 8062ade7876d1d82b7385c09a3cdb6aa0b7ba490 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 12 Jul 2017 13:10:36 +0300 Subject: [PATCH 02/10] remove all clients (including closing all channels) on shutdown --- pubsub/pubsub.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 264848161..ae642a4fb 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -216,7 +216,9 @@ loop: state.removeAll(cmd.clientID) } case shutdown: - state.reset() + for clientID, _ := range state.clients { + state.removeAll(clientID) + } break loop case sub: state.add(cmd.clientID, cmd.query, cmd.ch) @@ -286,11 +288,6 @@ func (state *state) removeAll(clientID string) { delete(state.clients, clientID) } -func (state *state) reset() { - state.queries = make(map[Query]map[string]chan<- interface{}) - state.clients = make(map[string]map[Query]struct{}) -} - func (state *state) send(msg interface{}, tags map[string]interface{}, slowClientStrategy overflowStrategy, logger log.Logger) { for q, clientToChannelMap := range state.queries { // NOTE we can use LRU cache to speed up common cases like query = " From e4f3f9d9bf327083eba26afa2b85ff09189856c3 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 12 Jul 2017 22:52:13 +0300 Subject: [PATCH 03/10] remove comment about LRU cache (see comments below) I've tried https://github.com/hashicorp/golang-lru/tree/master/simplelru today and here are the results: with LRU cache: ``` Benchmark10Clients-2 50000 29021 ns/op 3976 B/op 105 allocs/op Benchmark100Clients-2 3000 363432 ns/op 36382 B/op 1005 allocs/op Benchmark1000Clients-2 500 2473752 ns/op 360500 B/op 10009 allocs/op Benchmark10ClientsUsingTheSameQuery-2 300000 4059 ns/op 773 B/op 15 allocs/op Benchmark100ClientsUsingTheSameQuery-2 500000 4360 ns/op 773 B/op 15 allocs/op Benchmark1000ClientsUsingTheSameQuery-2 300000 4204 ns/op 773 B/op 15 allocs/op ``` without LRU cache: ``` Benchmark10Clients-2 200000 5267 ns/op 616 B/op 25 allocs/op Benchmark100Clients-2 30000 42134 ns/op 2776 B/op 205 allocs/op Benchmark1000Clients-2 3000 552648 ns/op 24376 B/op 2005 allocs/op Benchmark10ClientsOneQuery-2 1000000 2127 ns/op 462 B/op 9 allocs/op Benchmark100ClientsOneQuery-2 500000 2353 ns/op 462 B/op 9 allocs/op Benchmark1000ClientsOneQuery-2 500000 2339 ns/op 462 B/op 9 allocs/op ``` > How were you using the lru cache exactly? I was adding a KV pair each time there is a match plus checking if `lru.Contains(key)` before running the actual check (`q.Matches(tags)`). ``` key = fmt.Sprintf("%s/%v", query + tags) ``` --- pubsub/pubsub.go | 3 --- pubsub/pubsub_test.go | 32 +++++++++++++++++++++++++++++++- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index ae642a4fb..007f93f32 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -290,11 +290,8 @@ func (state *state) removeAll(clientID string) { func (state *state) send(msg interface{}, tags map[string]interface{}, slowClientStrategy overflowStrategy, logger log.Logger) { for q, clientToChannelMap := range state.queries { - // NOTE we can use LRU cache to speed up common cases like query = " - // tm.events.type=NewBlock" and tags = {"tm.events.type": "NewBlock"} if q.Matches(tags) { for clientID, ch := range clientToChannelMap { - logger.Info("Sending message to client", "msg", msg, "client", clientID) switch slowClientStrategy { case drop: select { diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 570f76a82..3112ab5d3 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -193,13 +193,21 @@ func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) } func Benchmark100Clients(b *testing.B) { benchmarkNClients(100, b) } func Benchmark1000Clients(b *testing.B) { benchmarkNClients(1000, b) } +func Benchmark10ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(10, b) } +func Benchmark100ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(100, b) } +func Benchmark1000ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(1000, b) } + func benchmarkNClients(n int, b *testing.B) { - s := pubsub.NewServer(pubsub.BufferCapacity(b.N)) + s := pubsub.NewServer() s.Start() defer s.Stop() for i := 0; i < n; i++ { ch := make(chan interface{}) + go func() { + for range ch { + } + }() s.Subscribe(clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = Ivan AND abci.Invoices.Number = %d", i)), ch) } @@ -210,6 +218,28 @@ func benchmarkNClients(n int, b *testing.B) { } } +func benchmarkNClientsOneQuery(n int, b *testing.B) { + s := pubsub.NewServer() + s.Start() + defer s.Stop() + + q := query.MustParse("abci.Account.Owner = Ivan AND abci.Invoices.Number = 1") + for i := 0; i < n; i++ { + ch := make(chan interface{}) + go func() { + for range ch { + } + }() + s.Subscribe(clientID, q, ch) + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.PublishWithTags("Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1}) + } +} + /////////////////////////////////////////////////////////////////////////////// /// HELPERS /////////////////////////////////////////////////////////////////////////////// From 4aa024d843b081977304c0184e8c56c05d22c32f Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 12 Jul 2017 23:10:36 +0300 Subject: [PATCH 04/10] add more info to error messages --- pubsub/pubsub.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 007f93f32..aec60bcc0 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -26,6 +26,7 @@ package pubsub import ( "errors" + "fmt" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" @@ -174,7 +175,7 @@ func (s *Server) PublishWithTags(msg interface{}, tags map[string]interface{}) e select { case s.cmds <- pubCmd: default: - s.Logger.Error("Server overflowed, dropping message...", "msg", msg) + s.Logger.Error("Server overflowed, dropping message...", "msg", msg, "tags", fmt.Sprintf("%v", tags)) return ErrorOverflow } case wait: @@ -297,7 +298,7 @@ func (state *state) send(msg interface{}, tags map[string]interface{}, slowClien select { case ch <- msg: default: - logger.Error("Client is busy, skipping...", "clientID", clientID) + logger.Error("Wanted to send a message, but the client is busy", "msg", msg, "tags", fmt.Sprintf("%v", tags), "clientID", clientID) } case wait: ch <- msg From 13207a5927e21d96afb9a9520f5a1a7b42b323bb Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 14 Jul 2017 12:32:01 +0300 Subject: [PATCH 05/10] remove overflow options --- pubsub/pubsub.go | 91 ++++--------------------------------------- pubsub/pubsub_test.go | 53 ------------------------- 2 files changed, 8 insertions(+), 136 deletions(-) diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index aec60bcc0..2b1a569c7 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -10,26 +10,13 @@ // match, this message will be pushed to all clients, subscribed to that query. // See query subpackage for our implementation. // -// Overflow strategies (incoming publish requests): -// -// 1) drop - drops publish requests when there are too many of them -// 2) wait - blocks until the server is ready to accept more publish requests (default) -// // Subscribe/Unsubscribe calls are always blocking. -// -// Overflow strategies (outgoing messages): -// -// 1) skip - do not send a message if the client is busy or slow (default) -// 2) wait - wait until the client is ready to accept new messages -// package pubsub import ( "errors" - "fmt" cmn "github.com/tendermint/tmlibs/common" - "github.com/tendermint/tmlibs/log" ) type operation int @@ -41,13 +28,6 @@ const ( shutdown ) -type overflowStrategy int - -const ( - drop overflowStrategy = iota - wait -) - var ( ErrorOverflow = errors.New("Server overflowed") ) @@ -72,20 +52,16 @@ type Server struct { cmn.BaseService cmds chan cmd - - overflowStrategy overflowStrategy - slowClientStrategy overflowStrategy } // Option sets a parameter for the server. type Option func(*Server) // NewServer returns a new server. See the commentary on the Option functions -// for a detailed description of how to configure buffering and overflow -// behavior. If no options are provided, the resulting server's queue is -// unbuffered and it blocks when overflowed. +// for a detailed description of how to configure buffering. If no options are +// provided, the resulting server's queue is unbuffered. func NewServer(options ...Option) *Server { - s := &Server{overflowStrategy: wait, slowClientStrategy: drop} + s := &Server{} s.BaseService = *cmn.NewBaseService(nil, "PubSub", s) for _, option := range options { @@ -111,38 +87,6 @@ func BufferCapacity(cap int) Option { } } -// OverflowStrategyDrop will tell the server to drop messages when it can't -// process more messages. -func OverflowStrategyDrop() Option { - return func(s *Server) { - s.overflowStrategy = drop - } -} - -// OverflowStrategyWait will tell the server to block and wait for some time -// for server to process other messages. Default strategy. -func OverflowStrategyWait() func(*Server) { - return func(s *Server) { - s.overflowStrategy = wait - } -} - -// WaitSlowClients will tell the server to block and wait until subscriber -// reads a messages even if it is fast enough to process them. -func WaitSlowClients() func(*Server) { - return func(s *Server) { - s.slowClientStrategy = wait - } -} - -// SkipSlowClients will tell the server to skip subscriber if it is busy -// processing previous message(s). Default strategy. -func SkipSlowClients() func(*Server) { - return func(s *Server) { - s.slowClientStrategy = drop - } -} - // Subscribe returns a channel on which messages matching the given query can // be received. If the subscription already exists old channel will be closed // and new one returned. @@ -170,17 +114,7 @@ func (s *Server) Publish(msg interface{}) error { // will be sent to a client. func (s *Server) PublishWithTags(msg interface{}, tags map[string]interface{}) error { pubCmd := cmd{op: pub, msg: msg, tags: tags} - switch s.overflowStrategy { - case drop: - select { - case s.cmds <- pubCmd: - default: - s.Logger.Error("Server overflowed, dropping message...", "msg", msg, "tags", fmt.Sprintf("%v", tags)) - return ErrorOverflow - } - case wait: - s.cmds <- pubCmd - } + s.cmds <- pubCmd return nil } @@ -224,7 +158,7 @@ loop: case sub: state.add(cmd.clientID, cmd.query, cmd.ch) case pub: - state.send(cmd.msg, cmd.tags, s.slowClientStrategy, s.Logger) + state.send(cmd.msg, cmd.tags) } } } @@ -289,20 +223,11 @@ func (state *state) removeAll(clientID string) { delete(state.clients, clientID) } -func (state *state) send(msg interface{}, tags map[string]interface{}, slowClientStrategy overflowStrategy, logger log.Logger) { +func (state *state) send(msg interface{}, tags map[string]interface{}) { for q, clientToChannelMap := range state.queries { if q.Matches(tags) { - for clientID, ch := range clientToChannelMap { - switch slowClientStrategy { - case drop: - select { - case ch <- msg: - default: - logger.Error("Wanted to send a message, but the client is busy", "msg", msg, "tags", fmt.Sprintf("%v", tags), "clientID", clientID) - } - case wait: - ch <- msg - } + for _, ch := range clientToChannelMap { + ch <- msg } } } diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 3112ab5d3..7cc4e5998 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -126,30 +126,6 @@ func TestUnsubscribeAll(t *testing.T) { assert.False(t, ok) } -func TestOverflowStrategyDrop(t *testing.T) { - s := pubsub.NewServer(pubsub.OverflowStrategyDrop()) - s.SetLogger(log.TestingLogger()) - - err := s.Publish("Veda") - if assert.Error(t, err) { - assert.Equal(t, pubsub.ErrorOverflow, err) - } -} - -func TestOverflowStrategyWait(t *testing.T) { - s := pubsub.NewServer(pubsub.OverflowStrategyWait()) - s.SetLogger(log.TestingLogger()) - - go func() { - time.Sleep(1 * time.Second) - s.Start() - defer s.Stop() - }() - - err := s.Publish("Veda") - assert.NoError(t, err) -} - func TestBufferCapacity(t *testing.T) { s := pubsub.NewServer(pubsub.BufferCapacity(2)) s.SetLogger(log.TestingLogger()) @@ -160,35 +136,6 @@ func TestBufferCapacity(t *testing.T) { require.NoError(t, err) } -func TestWaitSlowClients(t *testing.T) { - s := pubsub.NewServer(pubsub.WaitSlowClients()) - s.SetLogger(log.TestingLogger()) - s.Start() - defer s.Stop() - - ch := make(chan interface{}) - s.Subscribe(clientID, query.Empty{}, ch) - err := s.Publish("Wonderwoman") - require.NoError(t, err) - - time.Sleep(1 * time.Second) - - assertReceive(t, "Wonderwoman", ch) -} - -func TestSkipSlowClients(t *testing.T) { - s := pubsub.NewServer(pubsub.SkipSlowClients()) - s.SetLogger(log.TestingLogger()) - s.Start() - defer s.Stop() - - ch := make(chan interface{}) - s.Subscribe(clientID, query.Empty{}, ch) - err := s.Publish("Cyclops") - require.NoError(t, err) - assert.Zero(t, len(ch)) -} - func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) } func Benchmark100Clients(b *testing.B) { benchmarkNClients(100, b) } func Benchmark1000Clients(b *testing.B) { benchmarkNClients(1000, b) } From 0006bfc359e2b50a6a083ea750e2b1368477fcbc Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 14 Jul 2017 13:02:32 +0300 Subject: [PATCH 06/10] return ErrorOverflow on Subscribe if server is overflowed > why we need it? most of our subscribers will be RPC WS subscribers, so if there are too many, nothing wrong with rejecting to subscribe. however, consensus reactor must be the first to subscribe, since its work depends on the pubsub package. --- pubsub/example_test.go | 4 +-- pubsub/pubsub.go | 50 +++++++++++++++++++++------------- pubsub/pubsub_test.go | 61 ++++++++++++++++++++++++------------------ 3 files changed, 68 insertions(+), 47 deletions(-) diff --git a/pubsub/example_test.go b/pubsub/example_test.go index d64b96eab..38026ccd6 100644 --- a/pubsub/example_test.go +++ b/pubsub/example_test.go @@ -17,8 +17,8 @@ func TestExample(t *testing.T) { defer s.Stop() ch := make(chan interface{}, 1) - s.Subscribe("example-client", query.MustParse("abci.account.name=John"), ch) - err := s.PublishWithTags("Tombstone", map[string]interface{}{"abci.account.name": "John"}) + err := s.Subscribe("example-client", query.MustParse("abci.account.name=John"), ch) require.NoError(t, err) + s.PublishWithTags("Tombstone", map[string]interface{}{"abci.account.name": "John"}) assertReceive(t, "Tombstone", ch) } diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 2b1a569c7..9ac260c93 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -15,6 +15,7 @@ package pubsub import ( "errors" + "time" cmn "github.com/tendermint/tmlibs/common" ) @@ -28,8 +29,10 @@ const ( shutdown ) +const subscribeTimeout = 10 * time.Millisecond + var ( - ErrorOverflow = errors.New("Server overflowed") + ErrorOverflow = errors.New("server overflowed") ) type cmd struct { @@ -51,7 +54,8 @@ type Query interface { type Server struct { cmn.BaseService - cmds chan cmd + cmds chan cmd + cmdsCap int } // Option sets a parameter for the server. @@ -68,9 +72,8 @@ func NewServer(options ...Option) *Server { option(s) } - if s.cmds == nil { // if BufferCapacity was not set, create unbuffered channel - s.cmds = make(chan cmd) - } + // if BufferCapacity option was not set, the channel is unbuffered + s.cmds = make(chan cmd, s.cmdsCap) return s } @@ -82,40 +85,49 @@ func NewServer(options ...Option) *Server { func BufferCapacity(cap int) Option { return func(s *Server) { if cap > 0 { - s.cmds = make(chan cmd, cap) + s.cmdsCap = cap } } } +// Returns capacity of the internal server's queue. +func (s Server) BufferCapacity() int { + return s.cmdsCap +} + // Subscribe returns a channel on which messages matching the given query can // be received. If the subscription already exists old channel will be closed -// and new one returned. -func (s *Server) Subscribe(clientID string, query Query, out chan<- interface{}) { - s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out} +// and new one returned. Error will be returned to the caller if the server is +// overflowed. +func (s *Server) Subscribe(clientID string, query Query, out chan<- interface{}) error { + select { + case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}: + return nil + case <-time.After(subscribeTimeout): + return ErrorOverflow + } } -// Unsubscribe unsubscribes the given client from the query. +// Unsubscribe unsubscribes the given client from the query. Blocking. func (s *Server) Unsubscribe(clientID string, query Query) { s.cmds <- cmd{op: unsub, clientID: clientID, query: query} } -// Unsubscribe unsubscribes the given channel. +// Unsubscribe unsubscribes the given channel. Blocking. func (s *Server) UnsubscribeAll(clientID string) { s.cmds <- cmd{op: unsub, clientID: clientID} } -// Publish publishes the given message. -func (s *Server) Publish(msg interface{}) error { - return s.PublishWithTags(msg, make(map[string]interface{})) +// Publish publishes the given message. Blocking. +func (s *Server) Publish(msg interface{}) { + s.PublishWithTags(msg, make(map[string]interface{})) } // PublishWithTags publishes the given message with a set of tags. This set of // tags will be matched with client queries. If there is a match, the message -// will be sent to a client. -func (s *Server) PublishWithTags(msg interface{}, tags map[string]interface{}) error { - pubCmd := cmd{op: pub, msg: msg, tags: tags} - s.cmds <- pubCmd - return nil +// will be sent to a client. Blocking. +func (s *Server) PublishWithTags(msg interface{}, tags map[string]interface{}) { + s.cmds <- cmd{op: pub, msg: msg, tags: tags} } // OnStop implements Service.OnStop by shutting down the server. diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 7cc4e5998..fb15b3489 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -25,13 +25,12 @@ func TestSubscribe(t *testing.T) { defer s.Stop() ch := make(chan interface{}, 1) - s.Subscribe(clientID, query.Empty{}, ch) - err := s.Publish("Ka-Zar") + err := s.Subscribe(clientID, query.Empty{}, ch) require.NoError(t, err) + s.Publish("Ka-Zar") assertReceive(t, "Ka-Zar", ch) - err = s.Publish("Quicksilver") - require.NoError(t, err) + s.Publish("Quicksilver") assertReceive(t, "Quicksilver", ch) } @@ -41,22 +40,22 @@ func TestDifferentClients(t *testing.T) { s.Start() defer s.Stop() ch1 := make(chan interface{}, 1) - s.Subscribe("client-1", query.MustParse("tm.events.type=NewBlock"), ch1) - err := s.PublishWithTags("Iceman", map[string]interface{}{"tm.events.type": "NewBlock"}) + err := s.Subscribe("client-1", query.MustParse("tm.events.type=NewBlock"), ch1) require.NoError(t, err) + s.PublishWithTags("Iceman", map[string]interface{}{"tm.events.type": "NewBlock"}) assertReceive(t, "Iceman", ch1) ch2 := make(chan interface{}, 1) - s.Subscribe("client-2", query.MustParse("tm.events.type=NewBlock AND abci.account.name=Igor"), ch2) - err = s.PublishWithTags("Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}) + err = s.Subscribe("client-2", query.MustParse("tm.events.type=NewBlock AND abci.account.name=Igor"), ch2) require.NoError(t, err) + s.PublishWithTags("Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}) assertReceive(t, "Ultimo", ch1) assertReceive(t, "Ultimo", ch2) ch3 := make(chan interface{}, 1) - s.Subscribe("client-3", query.MustParse("tm.events.type=NewRoundStep AND abci.account.name=Igor AND abci.invoice.number = 10"), ch3) - err = s.PublishWithTags("Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"}) + err = s.Subscribe("client-3", query.MustParse("tm.events.type=NewRoundStep AND abci.account.name=Igor AND abci.invoice.number = 10"), ch3) require.NoError(t, err) + s.PublishWithTags("Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"}) assert.Zero(t, len(ch3)) } @@ -69,19 +68,19 @@ func TestClientResubscribes(t *testing.T) { q := query.MustParse("tm.events.type=NewBlock") ch1 := make(chan interface{}, 1) - s.Subscribe(clientID, q, ch1) - err := s.PublishWithTags("Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"}) + err := s.Subscribe(clientID, q, ch1) require.NoError(t, err) + s.PublishWithTags("Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"}) assertReceive(t, "Goblin Queen", ch1) ch2 := make(chan interface{}, 1) - s.Subscribe(clientID, q, ch2) + err = s.Subscribe(clientID, q, ch2) + require.NoError(t, err) _, ok := <-ch1 assert.False(t, ok) - err = s.PublishWithTags("Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"}) - require.NoError(t, err) + s.PublishWithTags("Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"}) assertReceive(t, "Spider-Man", ch2) } @@ -92,11 +91,11 @@ func TestUnsubscribe(t *testing.T) { defer s.Stop() ch := make(chan interface{}) - s.Subscribe(clientID, query.Empty{}, ch) + err := s.Subscribe(clientID, query.Empty{}, ch) + require.NoError(t, err) s.Unsubscribe(clientID, query.Empty{}) - err := s.Publish("Nick Fury") - require.NoError(t, err) + s.Publish("Nick Fury") assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe") _, ok := <-ch @@ -110,13 +109,14 @@ func TestUnsubscribeAll(t *testing.T) { defer s.Stop() ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1) - s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlock"), ch1) - s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlockHeader"), ch2) + err := s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlock"), ch1) + require.NoError(t, err) + err = s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlockHeader"), ch2) + require.NoError(t, err) s.UnsubscribeAll(clientID) - err := s.Publish("Nick Fury") - require.NoError(t, err) + s.Publish("Nick Fury") assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll") assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll") @@ -130,10 +130,19 @@ func TestBufferCapacity(t *testing.T) { s := pubsub.NewServer(pubsub.BufferCapacity(2)) s.SetLogger(log.TestingLogger()) - err := s.Publish("Nighthawk") - require.NoError(t, err) - err = s.Publish("Sage") - require.NoError(t, err) + s.Publish("Nighthawk") + s.Publish("Sage") +} + +func TestSubscribeReturnsErrorIfServerOverflowed(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + + ch := make(chan interface{}, 1) + err := s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlock"), ch) + if assert.Error(t, err) { + assert.Equal(t, pubsub.ErrorOverflow, err) + } } func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) } From e664f9c68861299060174ba348ad64a6854551d3 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 14 Jul 2017 14:49:25 +0300 Subject: [PATCH 07/10] use context to provide timeouts! --- pubsub/example_test.go | 7 +++- pubsub/pubsub.go | 53 +++++++++++++++----------- pubsub/pubsub_test.go | 86 ++++++++++++++++++++++++------------------ 3 files changed, 85 insertions(+), 61 deletions(-) diff --git a/pubsub/example_test.go b/pubsub/example_test.go index 38026ccd6..6597c858d 100644 --- a/pubsub/example_test.go +++ b/pubsub/example_test.go @@ -1,6 +1,7 @@ package pubsub_test import ( + "context" "testing" "github.com/stretchr/testify/require" @@ -16,9 +17,11 @@ func TestExample(t *testing.T) { s.Start() defer s.Stop() + ctx := context.Background() ch := make(chan interface{}, 1) - err := s.Subscribe("example-client", query.MustParse("abci.account.name=John"), ch) + err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name=John"), ch) + require.NoError(t, err) + err = s.PublishWithTags(ctx, "Tombstone", map[string]interface{}{"abci.account.name": "John"}) require.NoError(t, err) - s.PublishWithTags("Tombstone", map[string]interface{}{"abci.account.name": "John"}) assertReceive(t, "Tombstone", ch) } diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 9ac260c93..34df86a45 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -14,8 +14,7 @@ package pubsub import ( - "errors" - "time" + "context" cmn "github.com/tendermint/tmlibs/common" ) @@ -29,12 +28,6 @@ const ( shutdown ) -const subscribeTimeout = 10 * time.Millisecond - -var ( - ErrorOverflow = errors.New("server overflowed") -) - type cmd struct { op operation query Query @@ -97,37 +90,53 @@ func (s Server) BufferCapacity() int { // Subscribe returns a channel on which messages matching the given query can // be received. If the subscription already exists old channel will be closed -// and new one returned. Error will be returned to the caller if the server is -// overflowed. -func (s *Server) Subscribe(clientID string, query Query, out chan<- interface{}) error { +// and new one returned. Error will be returned to the caller if the context is +// cancelled. +func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error { select { case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}: return nil - case <-time.After(subscribeTimeout): - return ErrorOverflow + case <-ctx.Done(): + return ctx.Err() } } -// Unsubscribe unsubscribes the given client from the query. Blocking. -func (s *Server) Unsubscribe(clientID string, query Query) { - s.cmds <- cmd{op: unsub, clientID: clientID, query: query} +// Unsubscribe unsubscribes the given client from the query. Error will be +// returned to the caller if the context is cancelled. +func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error { + select { + case s.cmds <- cmd{op: unsub, clientID: clientID, query: query}: + return nil + case <-ctx.Done(): + return ctx.Err() + } } // Unsubscribe unsubscribes the given channel. Blocking. -func (s *Server) UnsubscribeAll(clientID string) { - s.cmds <- cmd{op: unsub, clientID: clientID} +func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error { + select { + case s.cmds <- cmd{op: unsub, clientID: clientID}: + return nil + case <-ctx.Done(): + return ctx.Err() + } } // Publish publishes the given message. Blocking. -func (s *Server) Publish(msg interface{}) { - s.PublishWithTags(msg, make(map[string]interface{})) +func (s *Server) Publish(ctx context.Context, msg interface{}) error { + return s.PublishWithTags(ctx, msg, make(map[string]interface{})) } // PublishWithTags publishes the given message with a set of tags. This set of // tags will be matched with client queries. If there is a match, the message // will be sent to a client. Blocking. -func (s *Server) PublishWithTags(msg interface{}, tags map[string]interface{}) { - s.cmds <- cmd{op: pub, msg: msg, tags: tags} +func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags map[string]interface{}) error { + select { + case s.cmds <- cmd{op: pub, msg: msg, tags: tags}: + return nil + case <-ctx.Done(): + return ctx.Err() + } } // OnStop implements Service.OnStop by shutting down the server. diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index fb15b3489..9c9841440 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -1,6 +1,7 @@ package pubsub_test import ( + "context" "fmt" "runtime/debug" "testing" @@ -24,13 +25,16 @@ func TestSubscribe(t *testing.T) { s.Start() defer s.Stop() + ctx := context.Background() ch := make(chan interface{}, 1) - err := s.Subscribe(clientID, query.Empty{}, ch) + err := s.Subscribe(ctx, clientID, query.Empty{}, ch) + require.NoError(t, err) + err = s.Publish(ctx, "Ka-Zar") require.NoError(t, err) - s.Publish("Ka-Zar") assertReceive(t, "Ka-Zar", ch) - s.Publish("Quicksilver") + err = s.Publish(ctx, "Quicksilver") + require.NoError(t, err) assertReceive(t, "Quicksilver", ch) } @@ -39,23 +43,28 @@ func TestDifferentClients(t *testing.T) { s.SetLogger(log.TestingLogger()) s.Start() defer s.Stop() + + ctx := context.Background() ch1 := make(chan interface{}, 1) - err := s.Subscribe("client-1", query.MustParse("tm.events.type=NewBlock"), ch1) + err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type=NewBlock"), ch1) + require.NoError(t, err) + err = s.PublishWithTags(ctx, "Iceman", map[string]interface{}{"tm.events.type": "NewBlock"}) require.NoError(t, err) - s.PublishWithTags("Iceman", map[string]interface{}{"tm.events.type": "NewBlock"}) assertReceive(t, "Iceman", ch1) ch2 := make(chan interface{}, 1) - err = s.Subscribe("client-2", query.MustParse("tm.events.type=NewBlock AND abci.account.name=Igor"), ch2) + err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type=NewBlock AND abci.account.name=Igor"), ch2) + require.NoError(t, err) + err = s.PublishWithTags(ctx, "Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}) require.NoError(t, err) - s.PublishWithTags("Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}) assertReceive(t, "Ultimo", ch1) assertReceive(t, "Ultimo", ch2) ch3 := make(chan interface{}, 1) - err = s.Subscribe("client-3", query.MustParse("tm.events.type=NewRoundStep AND abci.account.name=Igor AND abci.invoice.number = 10"), ch3) + err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type=NewRoundStep AND abci.account.name=Igor AND abci.invoice.number = 10"), ch3) + require.NoError(t, err) + err = s.PublishWithTags(ctx, "Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"}) require.NoError(t, err) - s.PublishWithTags("Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"}) assert.Zero(t, len(ch3)) } @@ -65,22 +74,25 @@ func TestClientResubscribes(t *testing.T) { s.Start() defer s.Stop() + ctx := context.Background() q := query.MustParse("tm.events.type=NewBlock") ch1 := make(chan interface{}, 1) - err := s.Subscribe(clientID, q, ch1) + err := s.Subscribe(ctx, clientID, q, ch1) + require.NoError(t, err) + err = s.PublishWithTags(ctx, "Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"}) require.NoError(t, err) - s.PublishWithTags("Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"}) assertReceive(t, "Goblin Queen", ch1) ch2 := make(chan interface{}, 1) - err = s.Subscribe(clientID, q, ch2) + err = s.Subscribe(ctx, clientID, q, ch2) require.NoError(t, err) _, ok := <-ch1 assert.False(t, ok) - s.PublishWithTags("Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"}) + err = s.PublishWithTags(ctx, "Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"}) + require.NoError(t, err) assertReceive(t, "Spider-Man", ch2) } @@ -90,12 +102,15 @@ func TestUnsubscribe(t *testing.T) { s.Start() defer s.Stop() + ctx := context.Background() ch := make(chan interface{}) - err := s.Subscribe(clientID, query.Empty{}, ch) + err := s.Subscribe(ctx, clientID, query.Empty{}, ch) + require.NoError(t, err) + err = s.Unsubscribe(ctx, clientID, query.Empty{}) require.NoError(t, err) - s.Unsubscribe(clientID, query.Empty{}) - s.Publish("Nick Fury") + err = s.Publish(ctx, "Nick Fury") + require.NoError(t, err) assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe") _, ok := <-ch @@ -108,15 +123,18 @@ func TestUnsubscribeAll(t *testing.T) { s.Start() defer s.Stop() + ctx := context.Background() ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1) - err := s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlock"), ch1) + err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type=NewBlock"), ch1) require.NoError(t, err) - err = s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlockHeader"), ch2) + err = s.Subscribe(ctx, clientID, query.MustParse("tm.events.type=NewBlockHeader"), ch2) require.NoError(t, err) - s.UnsubscribeAll(clientID) + err = s.UnsubscribeAll(ctx, clientID) + require.NoError(t, err) - s.Publish("Nick Fury") + err = s.Publish(ctx, "Nick Fury") + require.NoError(t, err) assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll") assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll") @@ -130,19 +148,11 @@ func TestBufferCapacity(t *testing.T) { s := pubsub.NewServer(pubsub.BufferCapacity(2)) s.SetLogger(log.TestingLogger()) - s.Publish("Nighthawk") - s.Publish("Sage") -} - -func TestSubscribeReturnsErrorIfServerOverflowed(t *testing.T) { - s := pubsub.NewServer() - s.SetLogger(log.TestingLogger()) - - ch := make(chan interface{}, 1) - err := s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlock"), ch) - if assert.Error(t, err) { - assert.Equal(t, pubsub.ErrorOverflow, err) - } + ctx := context.Background() + err := s.Publish(ctx, "Nighthawk") + require.NoError(t, err) + err = s.Publish(ctx, "Sage") + require.NoError(t, err) } func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) } @@ -158,19 +168,20 @@ func benchmarkNClients(n int, b *testing.B) { s.Start() defer s.Stop() + ctx := context.Background() for i := 0; i < n; i++ { ch := make(chan interface{}) go func() { for range ch { } }() - s.Subscribe(clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = Ivan AND abci.Invoices.Number = %d", i)), ch) + s.Subscribe(ctx, clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = Ivan AND abci.Invoices.Number = %d", i)), ch) } b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - s.PublishWithTags("Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i}) + s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i}) } } @@ -179,6 +190,7 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) { s.Start() defer s.Stop() + ctx := context.Background() q := query.MustParse("abci.Account.Owner = Ivan AND abci.Invoices.Number = 1") for i := 0; i < n; i++ { ch := make(chan interface{}) @@ -186,13 +198,13 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) { for range ch { } }() - s.Subscribe(clientID, q, ch) + s.Subscribe(ctx, clientID, q, ch) } b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - s.PublishWithTags("Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1}) + s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1}) } } From 17d6091ef42305e0a32a119e815f961924b64de2 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Sat, 15 Jul 2017 13:33:47 +0300 Subject: [PATCH 08/10] updates as per Bucky's comments --- pubsub/pubsub.go | 38 ++++++++++++++++++-------------------- pubsub/pubsub_test.go | 7 +++++++ 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 34df86a45..f5df418a8 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -9,8 +9,6 @@ // When some message is published, we match it with all queries. If there is a // match, this message will be pushed to all clients, subscribed to that query. // See query subpackage for our implementation. -// -// Subscribe/Unsubscribe calls are always blocking. package pubsub import ( @@ -42,7 +40,7 @@ type Query interface { Matches(tags map[string]interface{}) bool } -// Server allows clients to subscribe/unsubscribe for messages, pubsling +// Server allows clients to subscribe/unsubscribe for messages, publishing // messages with or without tags, and manages internal state. type Server struct { cmn.BaseService @@ -83,15 +81,15 @@ func BufferCapacity(cap int) Option { } } -// Returns capacity of the internal server's queue. +// BufferCapacity returns capacity of the internal server's queue. func (s Server) BufferCapacity() int { return s.cmdsCap } -// Subscribe returns a channel on which messages matching the given query can -// be received. If the subscription already exists old channel will be closed -// and new one returned. Error will be returned to the caller if the context is -// cancelled. +// Subscribe creates a subscription for the given client. It accepts a channel +// on which messages matching the given query can be received. If the +// subscription already exists, the old channel will be closed. An error will +// be returned to the caller if the context is canceled. func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error { select { case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}: @@ -101,8 +99,8 @@ func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, ou } } -// Unsubscribe unsubscribes the given client from the query. Error will be -// returned to the caller if the context is cancelled. +// Unsubscribe removes the subscription on the given query. An error will be +// returned to the caller if the context is canceled. func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error { select { case s.cmds <- cmd{op: unsub, clientID: clientID, query: query}: @@ -112,7 +110,8 @@ func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) } } -// Unsubscribe unsubscribes the given channel. Blocking. +// Unsubscribe removes all client subscriptions. An error will be returned to +// the caller if the context is canceled. func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error { select { case s.cmds <- cmd{op: unsub, clientID: clientID}: @@ -122,14 +121,15 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error { } } -// Publish publishes the given message. Blocking. +// Publish publishes the given message. An error will be returned to the caller +// if the context is canceled. func (s *Server) Publish(ctx context.Context, msg interface{}) error { return s.PublishWithTags(ctx, msg, make(map[string]interface{})) } -// PublishWithTags publishes the given message with a set of tags. This set of -// tags will be matched with client queries. If there is a match, the message -// will be sent to a client. Blocking. +// PublishWithTags publishes the given message with the set of tags. The set is +// matched with clients queries. If there is a match, the message is sent to +// the client. func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags map[string]interface{}) error { select { case s.cmds <- cmd{op: pub, msg: msg, tags: tags}: @@ -152,7 +152,7 @@ type state struct { clients map[string]map[Query]struct{} } -// OnStart implements Service.OnStart by creating a main loop. +// OnStart implements Service.OnStart by starting the server. func (s *Server) OnStart() error { go s.loop(state{ queries: make(map[Query]map[string]chan<- interface{}), @@ -194,6 +194,8 @@ func (state *state) add(clientID string, q Query, ch chan<- interface{}) { close(oldCh) } } + + // create subscription state.queries[q][clientID] = ch // add client if needed @@ -201,10 +203,6 @@ func (state *state) add(clientID string, q Query, ch chan<- interface{}) { state.clients[clientID] = make(map[Query]struct{}) } state.clients[clientID][q] = struct{}{} - - // create subscription - clientToChannelMap := state.queries[q] - clientToChannelMap[clientID] = ch } func (state *state) remove(clientID string, q Query) { diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 9c9841440..9d003cff0 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -153,6 +153,13 @@ func TestBufferCapacity(t *testing.T) { require.NoError(t, err) err = s.Publish(ctx, "Sage") require.NoError(t, err) + + ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) + defer cancel() + err = s.Publish(ctx, "Ironclad") + if assert.Error(t, err) { + assert.Equal(t, context.DeadlineExceeded, err) + } } func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) } From 992c54253f14b1d76766403bb2ca25ede71d5022 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 18 Jul 2017 11:47:30 +0300 Subject: [PATCH 09/10] fixes from gometalinter review --- pubsub/pubsub.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index f5df418a8..52b8361f8 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -110,8 +110,8 @@ func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) } } -// Unsubscribe removes all client subscriptions. An error will be returned to -// the caller if the context is canceled. +// UnsubscribeAll removes all client subscriptions. An error will be returned +// to the caller if the context is canceled. func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error { select { case s.cmds <- cmd{op: unsub, clientID: clientID}: @@ -172,7 +172,7 @@ loop: state.removeAll(cmd.clientID) } case shutdown: - for clientID, _ := range state.clients { + for clientID := range state.clients { state.removeAll(clientID) } break loop @@ -232,7 +232,7 @@ func (state *state) removeAll(clientID string) { return } - for q, _ := range queryMap { + for q := range queryMap { ch := state.queries[q][clientID] close(ch) From 77f6febb034e36424791a92848385bd420930a9b Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 20 Jul 2017 11:46:22 +0300 Subject: [PATCH 10/10] rename TestClientResubscribes to TestClientSubscribesTwice test UnsubscribeAll properly test BufferCapacity getter --- pubsub/pubsub_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 9d003cff0..85b4b1e4b 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -68,7 +68,7 @@ func TestDifferentClients(t *testing.T) { assert.Zero(t, len(ch3)) } -func TestClientResubscribes(t *testing.T) { +func TestClientSubscribesTwice(t *testing.T) { s := pubsub.NewServer() s.SetLogger(log.TestingLogger()) s.Start() @@ -125,9 +125,9 @@ func TestUnsubscribeAll(t *testing.T) { ctx := context.Background() ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1) - err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type=NewBlock"), ch1) + err := s.Subscribe(ctx, clientID, query.Empty{}, ch1) require.NoError(t, err) - err = s.Subscribe(ctx, clientID, query.MustParse("tm.events.type=NewBlockHeader"), ch2) + err = s.Subscribe(ctx, clientID, query.Empty{}, ch2) require.NoError(t, err) err = s.UnsubscribeAll(ctx, clientID) @@ -148,6 +148,8 @@ func TestBufferCapacity(t *testing.T) { s := pubsub.NewServer(pubsub.BufferCapacity(2)) s.SetLogger(log.TestingLogger()) + assert.Equal(t, 2, s.BufferCapacity()) + ctx := context.Background() err := s.Publish(ctx, "Nighthawk") require.NoError(t, err)