Browse Source

Merge pull request #28 from tendermint/feature/376-events.v2

New pubsub package plus the query subpackage
Ethan Buchman 8 years ago
committed by GitHub
13 changed files with 2691 additions and 0 deletions
  1. +2
  2. +27
  3. +253
  4. +234
  5. +11
  6. +14
  7. +16
  8. +30
  9. +81
  10. +258
  11. +33
  12. +1668
  13. +64

+ 2
- 0
.gitignore View File

@ -1,3 +1,5 @@

+ 27
- 0
pubsub/example_test.go View File

@ -0,0 +1,27 @@
package pubsub_test
import (
func TestExample(t *testing.T) {
s := pubsub.NewServer()
defer s.Stop()
ctx := context.Background()
ch := make(chan interface{}, 1)
err := s.Subscribe(ctx, "example-client", query.MustParse(""), ch)
require.NoError(t, err)
err = s.PublishWithTags(ctx, "Tombstone", map[string]interface{}{"": "John"})
require.NoError(t, err)
assertReceive(t, "Tombstone", ch)

+ 253
- 0
pubsub/pubsub.go View File

@ -0,0 +1,253 @@
// Package pubsub implements a pub-sub model with a single publisher (Server)
// and multiple subscribers (clients).
// Though you can have multiple publishers by sharing a pointer to a server or
// by giving the same channel to each publisher and publishing messages from
// that channel (fan-in).
// Clients subscribe for messages, which could be of any type, using a query.
// When some message is published, we match it with all queries. If there is a
// match, this message will be pushed to all clients, subscribed to that query.
// See query subpackage for our implementation.
package pubsub
import (
cmn ""
type operation int
const (
sub operation = iota
type cmd struct {
op operation
query Query
ch chan<- interface{}
clientID string
msg interface{}
tags map[string]interface{}
// Query defines an interface for a query to be used for subscribing.
type Query interface {
Matches(tags map[string]interface{}) bool
// Server allows clients to subscribe/unsubscribe for messages, publishing
// messages with or without tags, and manages internal state.
type Server struct {
cmds chan cmd
cmdsCap int
// Option sets a parameter for the server.
type Option func(*Server)
// NewServer returns a new server. See the commentary on the Option functions
// for a detailed description of how to configure buffering. If no options are
// provided, the resulting server's queue is unbuffered.
func NewServer(options ...Option) *Server {
s := &Server{}
s.BaseService = *cmn.NewBaseService(nil, "PubSub", s)
for _, option := range options {
// if BufferCapacity option was not set, the channel is unbuffered
s.cmds = make(chan cmd, s.cmdsCap)
return s
// BufferCapacity allows you to specify capacity for the internal server's
// queue. Since the server, given Y subscribers, could only process X messages,
// this option could be used to survive spikes (e.g. high amount of
// transactions during peak hours).
func BufferCapacity(cap int) Option {
return func(s *Server) {
if cap > 0 {
s.cmdsCap = cap
// BufferCapacity returns capacity of the internal server's queue.
func (s Server) BufferCapacity() int {
return s.cmdsCap
// Subscribe creates a subscription for the given client. It accepts a channel
// on which messages matching the given query can be received. If the
// subscription already exists, the old channel will be closed. An error will
// be returned to the caller if the context is canceled.
func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error {
select {
case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}:
return nil
case <-ctx.Done():
return ctx.Err()
// Unsubscribe removes the subscription on the given query. An error will be
// returned to the caller if the context is canceled.
func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error {
select {
case s.cmds <- cmd{op: unsub, clientID: clientID, query: query}:
return nil
case <-ctx.Done():
return ctx.Err()
// UnsubscribeAll removes all client subscriptions. An error will be returned
// to the caller if the context is canceled.
func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
select {
case s.cmds <- cmd{op: unsub, clientID: clientID}:
return nil
case <-ctx.Done():
return ctx.Err()
// Publish publishes the given message. An error will be returned to the caller
// if the context is canceled.
func (s *Server) Publish(ctx context.Context, msg interface{}) error {
return s.PublishWithTags(ctx, msg, make(map[string]interface{}))
// PublishWithTags publishes the given message with the set of tags. The set is
// matched with clients queries. If there is a match, the message is sent to
// the client.
func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags map[string]interface{}) error {
select {
case s.cmds <- cmd{op: pub, msg: msg, tags: tags}:
return nil
case <-ctx.Done():
return ctx.Err()
// OnStop implements Service.OnStop by shutting down the server.
func (s *Server) OnStop() {
s.cmds <- cmd{op: shutdown}
// NOTE: not goroutine safe
type state struct {
// query -> client -> ch
queries map[Query]map[string]chan<- interface{}
// client -> query -> struct{}
clients map[string]map[Query]struct{}
// OnStart implements Service.OnStart by starting the server.
func (s *Server) OnStart() error {
go s.loop(state{
queries: make(map[Query]map[string]chan<- interface{}),
clients: make(map[string]map[Query]struct{}),
return nil
func (s *Server) loop(state state) {
for cmd := range s.cmds {
switch cmd.op {
case unsub:
if cmd.query != nil {
state.remove(cmd.clientID, cmd.query)
} else {
case shutdown:
for clientID := range state.clients {
break loop
case sub:
state.add(cmd.clientID, cmd.query,
case pub:
state.send(cmd.msg, cmd.tags)
func (state *state) add(clientID string, q Query, ch chan<- interface{}) {
// add query if needed
if clientToChannelMap, ok := state.queries[q]; !ok {
state.queries[q] = make(map[string]chan<- interface{})
} else {
// check if already subscribed
if oldCh, ok := clientToChannelMap[clientID]; ok {
// create subscription
state.queries[q][clientID] = ch
// add client if needed
if _, ok := state.clients[clientID]; !ok {
state.clients[clientID] = make(map[Query]struct{})
state.clients[clientID][q] = struct{}{}
func (state *state) remove(clientID string, q Query) {
clientToChannelMap, ok := state.queries[q]
if !ok {
ch, ok := clientToChannelMap[clientID]
if ok {
delete(state.clients[clientID], q)
// if it not subscribed to anything else, remove the client
if len(state.clients[clientID]) == 0 {
delete(state.clients, clientID)
delete(state.queries[q], clientID)
func (state *state) removeAll(clientID string) {
queryMap, ok := state.clients[clientID]
if !ok {
for q := range queryMap {
ch := state.queries[q][clientID]
delete(state.queries[q], clientID)
delete(state.clients, clientID)
func (state *state) send(msg interface{}, tags map[string]interface{}) {
for q, clientToChannelMap := range state.queries {
if q.Matches(tags) {
for _, ch := range clientToChannelMap {
ch <- msg

+ 234
- 0
pubsub/pubsub_test.go View File

@ -0,0 +1,234 @@
package pubsub_test
import (
const (
clientID = "test-client"
func TestSubscribe(t *testing.T) {
s := pubsub.NewServer()
defer s.Stop()
ctx := context.Background()
ch := make(chan interface{}, 1)
err := s.Subscribe(ctx, clientID, query.Empty{}, ch)
require.NoError(t, err)
err = s.Publish(ctx, "Ka-Zar")
require.NoError(t, err)
assertReceive(t, "Ka-Zar", ch)
err = s.Publish(ctx, "Quicksilver")
require.NoError(t, err)
assertReceive(t, "Quicksilver", ch)
func TestDifferentClients(t *testing.T) {
s := pubsub.NewServer()
defer s.Stop()
ctx := context.Background()
ch1 := make(chan interface{}, 1)
err := s.Subscribe(ctx, "client-1", query.MustParse(""), ch1)
require.NoError(t, err)
err = s.PublishWithTags(ctx, "Iceman", map[string]interface{}{"": "NewBlock"})
require.NoError(t, err)
assertReceive(t, "Iceman", ch1)
ch2 := make(chan interface{}, 1)
err = s.Subscribe(ctx, "client-2", query.MustParse(" AND"), ch2)
require.NoError(t, err)
err = s.PublishWithTags(ctx, "Ultimo", map[string]interface{}{"": "NewBlock", "": "Igor"})
require.NoError(t, err)
assertReceive(t, "Ultimo", ch1)
assertReceive(t, "Ultimo", ch2)
ch3 := make(chan interface{}, 1)
err = s.Subscribe(ctx, "client-3", query.MustParse(" AND AND abci.invoice.number = 10"), ch3)
require.NoError(t, err)
err = s.PublishWithTags(ctx, "Valeria Richards", map[string]interface{}{"": "NewRoundStep"})
require.NoError(t, err)
assert.Zero(t, len(ch3))
func TestClientSubscribesTwice(t *testing.T) {
s := pubsub.NewServer()
defer s.Stop()
ctx := context.Background()
q := query.MustParse("")
ch1 := make(chan interface{}, 1)
err := s.Subscribe(ctx, clientID, q, ch1)
require.NoError(t, err)
err = s.PublishWithTags(ctx, "Goblin Queen", map[string]interface{}{"": "NewBlock"})
require.NoError(t, err)
assertReceive(t, "Goblin Queen", ch1)
ch2 := make(chan interface{}, 1)
err = s.Subscribe(ctx, clientID, q, ch2)
require.NoError(t, err)
_, ok := <-ch1
assert.False(t, ok)
err = s.PublishWithTags(ctx, "Spider-Man", map[string]interface{}{"": "NewBlock"})
require.NoError(t, err)
assertReceive(t, "Spider-Man", ch2)
func TestUnsubscribe(t *testing.T) {
s := pubsub.NewServer()
defer s.Stop()
ctx := context.Background()
ch := make(chan interface{})
err := s.Subscribe(ctx, clientID, query.Empty{}, ch)
require.NoError(t, err)
err = s.Unsubscribe(ctx, clientID, query.Empty{})
require.NoError(t, err)
err = s.Publish(ctx, "Nick Fury")
require.NoError(t, err)
assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe")
_, ok := <-ch
assert.False(t, ok)
func TestUnsubscribeAll(t *testing.T) {
s := pubsub.NewServer()
defer s.Stop()
ctx := context.Background()
ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1)
err := s.Subscribe(ctx, clientID, query.Empty{}, ch1)
require.NoError(t, err)
err = s.Subscribe(ctx, clientID, query.Empty{}, ch2)
require.NoError(t, err)
err = s.UnsubscribeAll(ctx, clientID)
require.NoError(t, err)
err = s.Publish(ctx, "Nick Fury")
require.NoError(t, err)
assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll")
assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll")
_, ok := <-ch1
assert.False(t, ok)
_, ok = <-ch2
assert.False(t, ok)
func TestBufferCapacity(t *testing.T) {
s := pubsub.NewServer(pubsub.BufferCapacity(2))
assert.Equal(t, 2, s.BufferCapacity())
ctx := context.Background()
err := s.Publish(ctx, "Nighthawk")
require.NoError(t, err)
err = s.Publish(ctx, "Sage")
require.NoError(t, err)
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
err = s.Publish(ctx, "Ironclad")
if assert.Error(t, err) {
assert.Equal(t, context.DeadlineExceeded, err)
func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) }
func Benchmark100Clients(b *testing.B) { benchmarkNClients(100, b) }
func Benchmark1000Clients(b *testing.B) { benchmarkNClients(1000, b) }
func Benchmark10ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(10, b) }
func Benchmark100ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(100, b) }
func Benchmark1000ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(1000, b) }
func benchmarkNClients(n int, b *testing.B) {
s := pubsub.NewServer()
defer s.Stop()
ctx := context.Background()
for i := 0; i < n; i++ {
ch := make(chan interface{})
go func() {
for range ch {
s.Subscribe(ctx, clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = Ivan AND abci.Invoices.Number = %d", i)), ch)
for i := 0; i < b.N; i++ {
s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i})
func benchmarkNClientsOneQuery(n int, b *testing.B) {
s := pubsub.NewServer()
defer s.Stop()
ctx := context.Background()
q := query.MustParse("abci.Account.Owner = Ivan AND abci.Invoices.Number = 1")
for i := 0; i < n; i++ {
ch := make(chan interface{})
go func() {
for range ch {
s.Subscribe(ctx, clientID, q, ch)
for i := 0; i < b.N; i++ {
s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1})
func assertReceive(t *testing.T, expected interface{}, ch <-chan interface{}, msgAndArgs ...interface{}) {
select {
case actual := <-ch:
if actual != nil {
assert.Equal(t, expected, actual, msgAndArgs...)
case <-time.After(1 * time.Second):
t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected)

+ 11
- 0
pubsub/query/Makefile View File

@ -0,0 +1,11 @@
@go get
peg -inline -switch query.peg
@go get
@go get
go-fuzz -bin=./ -workdir=./fuzz_test/output
.PHONY: gen_query_parser fuzzy_test

+ 14
- 0
pubsub/query/empty.go View File

@ -0,0 +1,14 @@
package query
// Empty query matches any set of tags.
type Empty struct {
// Matches always returns true.
func (Empty) Matches(tags map[string]interface{}) bool {
return true
func (Empty) String() string {
return "empty"

+ 16
- 0
pubsub/query/empty_test.go View File

@ -0,0 +1,16 @@
package query_test
import (
func TestEmptyQueryMatchesAnything(t *testing.T) {
q := query.Empty{}
assert.True(t, q.Matches(map[string]interface{}{}))
assert.True(t, q.Matches(map[string]interface{}{"Asher": "Roth"}))
assert.True(t, q.Matches(map[string]interface{}{"Route": 66}))
assert.True(t, q.Matches(map[string]interface{}{"Route": 66, "Billy": "Blue"}))

+ 30
- 0
pubsub/query/fuzz_test/main.go View File

@ -0,0 +1,30 @@
package fuzz_test
import (
func Fuzz(data []byte) int {
sdata := string(data)
q0, err := query.New(sdata)
if err != nil {
return 0
sdata1 := q0.String()
q1, err := query.New(sdata1)
if err != nil {
sdata2 := q1.String()
if sdata1 != sdata2 {
fmt.Printf("q0: %q\n", sdata1)
fmt.Printf("q1: %q\n", sdata2)
panic("query changed")
return 1

+ 81
- 0
pubsub/query/parser_test.go View File

@ -0,0 +1,81 @@
package query_test
import (
// TODO: fuzzy testing?
func TestParser(t *testing.T) {
cases := []struct {
query string
valid bool
{"", true},
{" = NewBlock", true},
{"", true},
{"", true},
{"", false},
{">==", false},
{" NewBlock =", false},
{">NewBlock", false},
{"", false},
{"=", false},
{"=NewBlock", false},
{"", false},
{"", false},
{"NewBlock", false},
{"", false},
{" AND", true},
{" AND", false},
{" AN", false},
{" AN", false},
{"AND ", false},
{" CONTAINS Igor", true},
{" > DATE 2013-05-03", true},
{" < DATE 2013-05-03", true},
{" <= DATE 2013-05-03", true},
{" >= DATE 2013-05-03", true},
{" >= DAT 2013-05-03", false},
{" <= DATE2013-05-03", false},
{" <= DATE -05-03", false},
{" >= DATE 20130503", false},
{" >= DATE 2013+01-03", false},
// incorrect year, month, day
{" >= DATE 0013-01-03", false},
{" >= DATE 2013-31-03", false},
{" >= DATE 2013-01-83", false},
{" > TIME 2013-05-03T14:45:00+07:00", true},
{" < TIME 2013-05-03T14:45:00-02:00", true},
{" <= TIME 2013-05-03T14:45:00Z", true},
{" >= TIME 2013-05-03T14:45:00Z", true},
{" >= TIME2013-05-03T14:45:00Z", false},
{" = IME 2013-05-03T14:45:00Z", false},
{" = TIME 2013-05-:45:00Z", false},
{" >= TIME 2013-05-03T14:45:00", false},
{" >= TIME 0013-00-00T14:45:00Z", false},
{" >= TIME 2013+05=03T14:45:00Z", false},
{"account.balance=100", true},
{"account.balance >= 200", true},
{"account.balance >= -300", false},
{"account.balance >>= 400", false},
{"account.balance=33.22.1", false},
for _, c := range cases {
_, err := query.New(c.query)
if c.valid {
assert.NoError(t, err, "Query was '%s'", c.query)
} else {
assert.Error(t, err, "Query was '%s'", c.query)

+ 258
- 0
pubsub/query/query.go View File

@ -0,0 +1,258 @@
// Package query provides a parser for a custom query format:
// abci.invoice.number=22 AND abci.invoice.owner=Ivan
// See query.peg for the grammar, which is a
// More:
// It has a support for numbers (integer and floating point), dates and times.
package query
import (
// Query holds the query string and the query parser.
type Query struct {
str string
parser *QueryParser
// New parses the given string and returns a query or error if the string is
// invalid.
func New(s string) (*Query, error) {
p := &QueryParser{Buffer: fmt.Sprintf(`"%s"`, s)}
if err := p.Parse(); err != nil {
return nil, err
return &Query{str: s, parser: p}, nil
// MustParse turns the given string into a query or panics; for tests or others
// cases where you know the string is valid.
func MustParse(s string) *Query {
q, err := New(s)
if err != nil {
panic(fmt.Sprintf("failed to parse %s: %v", s, err))
return q
// String returns the original string.
func (q *Query) String() string {
return q.str
type operator uint8
const (
opLessEqual operator = iota
// Matches returns true if the query matches the given set of tags, false otherwise.
// For example, query "name=John" matches tags = {"name": "John"}. More
// examples could be found in parser_test.go and query_test.go.
func (q *Query) Matches(tags map[string]interface{}) bool {
if len(tags) == 0 {
return false
buffer, begin, end := q.parser.Buffer, 0, 0
var tag string
var op operator
// tokens must be in the following order: tag ("tx.gas") -> operator ("=") -> operand ("7")
for _, token := range q.parser.Tokens() {
switch token.pegRule {
case rulePegText:
begin, end = int(token.begin), int(token.end)
case ruletag:
tag = buffer[begin:end]
case rulele:
op = opLessEqual
case rulege:
op = opGreaterEqual
case rulel:
op = opLess
case ruleg:
op = opGreater
case ruleequal:
op = opEqual
case rulecontains:
op = opContains
case rulevalue:
// see if the triplet (tag, operator, operand) matches any tag
// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" }
if !match(tag, op, reflect.ValueOf(buffer[begin:end]), tags) {
return false
case rulenumber:
number := buffer[begin:end]
if strings.Contains(number, ".") { // if it looks like a floating-point number
value, err := strconv.ParseFloat(number, 64)
if err != nil {
panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number))
if !match(tag, op, reflect.ValueOf(value), tags) {
return false
} else {
value, err := strconv.ParseInt(number, 10, 64)
if err != nil {
panic(fmt.Sprintf("got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", err, number))
if !match(tag, op, reflect.ValueOf(value), tags) {
return false
case ruletime:
value, err := time.Parse(time.RFC3339, buffer[begin:end])
if err != nil {
panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end]))
if !match(tag, op, reflect.ValueOf(value), tags) {
return false
case ruledate:
value, err := time.Parse("2006-01-02", buffer[begin:end])
if err != nil {
panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", err, buffer[begin:end]))
if !match(tag, op, reflect.ValueOf(value), tags) {
return false
return true
// match returns true if the given triplet (tag, operator, operand) matches any tag.
// First, it looks up the tag in tags and if it finds one, tries to compare the
// value from it to the operand using the operator.
// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" }
func match(tag string, op operator, operand reflect.Value, tags map[string]interface{}) bool {
// look up the tag from the query in tags
value, ok := tags[tag]
if !ok {
return false
switch operand.Kind() {
case reflect.Struct: // time
operandAsTime := operand.Interface().(time.Time)
v, ok := value.(time.Time)
if !ok { // if value from tags is not time.Time
return false
switch op {
case opLessEqual:
return v.Before(operandAsTime) || v.Equal(operandAsTime)
case opGreaterEqual:
return v.Equal(operandAsTime) || v.After(operandAsTime)
case opLess:
return v.Before(operandAsTime)
case opGreater:
return v.After(operandAsTime)
case opEqual:
return v.Equal(operandAsTime)
case reflect.Float64:
operandFloat64 := operand.Interface().(float64)
var v float64
// try our best to convert value from tags to float64
switch vt := value.(type) {
case float64:
v = vt
case float32:
v = float64(vt)
case int:
v = float64(vt)
case int8:
v = float64(vt)
case int16:
v = float64(vt)
case int32:
v = float64(vt)
case int64:
v = float64(vt)
default: // fail for all other types
panic(fmt.Sprintf("Incomparable types: %T (%v) vs float64 (%v)", value, value, operandFloat64))
switch op {
case opLessEqual:
return v <= operandFloat64
case opGreaterEqual:
return v >= operandFloat64
case opLess:
return v < operandFloat64
case opGreater:
return v > operandFloat64
case opEqual:
return v == operandFloat64
case reflect.Int64:
operandInt := operand.Interface().(int64)
var v int64
// try our best to convert value from tags to int64
switch vt := value.(type) {
case int64:
v = vt
case int8:
v = int64(vt)
case int16:
v = int64(vt)
case int32:
v = int64(vt)
case int:
v = int64(vt)
case float64:
v = int64(vt)
case float32:
v = int64(vt)
default: // fail for all other types
panic(fmt.Sprintf("Incomparable types: %T (%v) vs int64 (%v)", value, value, operandInt))
switch op {
case opLessEqual:
return v <= operandInt
case opGreaterEqual:
return v >= operandInt
case opLess:
return v < operandInt
case opGreater:
return v > operandInt
case opEqual:
return v == operandInt
case reflect.String:
v, ok := value.(string)
if !ok { // if value from tags is not string
return false
switch op {
case opEqual:
return v == operand.String()
case opContains:
return strings.Contains(v, operand.String())
panic(fmt.Sprintf("Unknown kind of operand %v", operand.Kind()))
return false

+ 33
- 0
pubsub/query/query.peg View File

@ -0,0 +1,33 @@
package query
type QueryParser Peg {
e <- '\"' condition ( ' '+ and ' '+ condition )* '\"' !.
condition <- tag ' '* (le ' '* (number / time / date)
/ ge ' '* (number / time / date)
/ l ' '* (number / time / date)
/ g ' '* (number / time / date)
/ equal ' '* (number / time / date / value)
/ contains ' '* value
tag <- < (![ \t\n\r\\()"=><] .)+ >
value <- < (![ \t\n\r\\()"=><] .)+ >
number <- < ('0'
/ [1-9] digit* ('.' digit*)?) >
digit <- [0-9]
time <- "TIME " < year '-' month '-' day 'T' digit digit ':' digit digit ':' digit digit (('-' / '+') digit digit ':' digit digit / 'Z') >
date <- "DATE " < year '-' month '-' day >
year <- ('1' / '2') digit digit digit
month <- ('0' / '1') digit
day <- ('0' / '1' / '2' / '3') digit
and <- "AND"
equal <- "="
contains <- "CONTAINS"
le <- "<="
ge <- ">="
l <- "<"
g <- ">"

+ 1668
- 0
File diff suppressed because it is too large
View File

+ 64
- 0
pubsub/query/query_test.go View File

@ -0,0 +1,64 @@
package query_test
import (
func TestMatches(t *testing.T) {
const shortForm = "2006-Jan-02"
txDate, err := time.Parse(shortForm, "2017-Jan-01")
require.NoError(t, err)
txTime, err := time.Parse(time.RFC3339, "2018-05-03T14:45:00Z")
require.NoError(t, err)
testCases := []struct {
s string
tags map[string]interface{}
err bool
matches bool
{"", map[string]interface{}{"": "NewBlock"}, false, true},
{"tx.gas > 7", map[string]interface{}{"tx.gas": 8}, false, true},
{"tx.gas > 7 AND tx.gas < 9", map[string]interface{}{"tx.gas": 8}, false, true},
{"body.weight >= 3.5", map[string]interface{}{"body.weight": 3.5}, false, true},
{"account.balance < 1000.0", map[string]interface{}{"account.balance": 900}, false, true},
{" <= 4", map[string]interface{}{"": 4.0}, false, true},
{"body.weight >= 4.5", map[string]interface{}{"body.weight": float32(4.5)}, false, true},
{" < 4 AND > 10", map[string]interface{}{"": 3, "": 12}, false, true},
{" < 4", map[string]interface{}{"": 5}, false, false},
{" > DATE 2017-01-01", map[string]interface{}{"": time.Now()}, false, true},
{" = DATE 2017-01-01", map[string]interface{}{"": txDate}, false, true},
{" = DATE 2018-01-01", map[string]interface{}{"": txDate}, false, false},
{"tx.time >= TIME 2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": time.Now()}, false, true},
{"tx.time = TIME 2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": txTime}, false, false},
{" CONTAINS Igor", map[string]interface{}{"": "Igor,Ivan"}, false, true},
{" CONTAINS Igor", map[string]interface{}{"": "Pavel,Ivan"}, false, false},
for _, tc := range testCases {
query, err := query.New(tc.s)
if !tc.err {
require.Nil(t, err)
if tc.matches {
assert.True(t, query.Matches(tc.tags), "Query '%s' should match %v", tc.s, tc.tags)
} else {
assert.False(t, query.Matches(tc.tags), "Query '%s' should not match %v", tc.s, tc.tags)
func TestMustParse(t *testing.T) {
assert.Panics(t, func() { query.MustParse("=") })
assert.NotPanics(t, func() { query.MustParse("") })
