From bb9aa85d2281aaf1705839a3f4a9f947ee018d9d Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 16 May 2018 11:03:11 +0400 Subject: [PATCH] copy events and pubsub packages from tmlibs Refs #847 --- .gitignore | 2 + Gopkg.lock | 5 +- consensus/reactor.go | 2 +- consensus/state.go | 2 +- consensus/state_test.go | 2 +- libs/events/Makefile | 9 + libs/events/README.md | 175 +++ libs/events/event_cache.go | 37 + libs/events/event_cache_test.go | 35 + libs/events/events.go | 226 ++++ libs/events/events_test.go | 380 +++++++ libs/pubsub/example_test.go | 28 + libs/pubsub/pubsub.go | 342 ++++++ libs/pubsub/pubsub_test.go | 253 +++++ libs/pubsub/query/Makefile | 11 + libs/pubsub/query/empty.go | 16 + libs/pubsub/query/empty_test.go | 18 + libs/pubsub/query/fuzz_test/main.go | 30 + libs/pubsub/query/parser_test.go | 92 ++ libs/pubsub/query/query.go | 345 ++++++ libs/pubsub/query/query.peg | 33 + libs/pubsub/query/query.peg.go | 1553 +++++++++++++++++++++++++++ libs/pubsub/query/query_test.go | 87 ++ rpc/client/httpclient.go | 2 +- rpc/client/localclient.go | 2 +- rpc/core/events.go | 6 +- rpc/core/tx.go | 5 +- rpc/lib/types/types.go | 2 +- state/txindex/indexer.go | 2 +- state/txindex/indexer_service.go | 3 +- state/txindex/kv/kv.go | 2 +- state/txindex/kv/kv_test.go | 2 +- state/txindex/null/null.go | 2 +- types/event_bus.go | 2 +- types/event_bus_test.go | 4 +- types/events.go | 6 +- types/nop_event_bus.go | 2 +- 37 files changed, 3698 insertions(+), 27 deletions(-) create mode 100644 libs/events/Makefile create mode 100644 libs/events/README.md create mode 100644 libs/events/event_cache.go create mode 100644 libs/events/event_cache_test.go create mode 100644 libs/events/events.go create mode 100644 libs/events/events_test.go create mode 100644 libs/pubsub/example_test.go create mode 100644 libs/pubsub/pubsub.go create mode 100644 libs/pubsub/pubsub_test.go create mode 100644 libs/pubsub/query/Makefile create mode 100644 libs/pubsub/query/empty.go create mode 100644 libs/pubsub/query/empty_test.go create mode 100644 libs/pubsub/query/fuzz_test/main.go create mode 100644 libs/pubsub/query/parser_test.go create mode 100644 libs/pubsub/query/query.go create mode 100644 libs/pubsub/query/query.peg create mode 100644 libs/pubsub/query/query.peg.go create mode 100644 libs/pubsub/query/query_test.go diff --git a/.gitignore b/.gitignore index e76fb1fc5..c67019bd9 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,5 @@ scripts/cutWALUntil/cutWALUntil .idea/ *.iml + +libs/pubsub/query/fuzz_test/output diff --git a/Gopkg.lock b/Gopkg.lock index df971a47b..6e34258f4 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -278,12 +278,9 @@ "clist", "common", "db", - "events", "flowrate", "log", "merkle", - "pubsub", - "pubsub/query", "test" ] revision = "cc5f287c4798ffe88c04d02df219ecb6932080fd" @@ -385,6 +382,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "a88c20b6e36b3529d6fdcffc3603d9eb193fc3809de8afbba07bad990539b256" + inputs-digest = "d85c98dcac32cc1fe05d006aa75e8985f6447a150a041b972a673a65e7681da9" solver-name = "gps-cdcl" solver-version = 1 diff --git a/consensus/reactor.go b/consensus/reactor.go index f171aa8d9..19b0c0fe2 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -10,10 +10,10 @@ import ( amino "github.com/tendermint/go-amino" cmn "github.com/tendermint/tmlibs/common" - tmevents "github.com/tendermint/tmlibs/events" "github.com/tendermint/tmlibs/log" cstypes "github.com/tendermint/tendermint/consensus/types" + tmevents "github.com/tendermint/tendermint/libs/events" "github.com/tendermint/tendermint/p2p" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" diff --git a/consensus/state.go b/consensus/state.go index 0c6f7b487..3b713e2ec 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -11,11 +11,11 @@ import ( fail "github.com/ebuchman/fail-test" cmn "github.com/tendermint/tmlibs/common" - tmevents "github.com/tendermint/tmlibs/events" "github.com/tendermint/tmlibs/log" cfg "github.com/tendermint/tendermint/config" cstypes "github.com/tendermint/tendermint/consensus/types" + tmevents "github.com/tendermint/tendermint/libs/events" "github.com/tendermint/tendermint/p2p" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" diff --git a/consensus/state_test.go b/consensus/state_test.go index 0d7cad484..f4d79ca77 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -11,7 +11,7 @@ import ( "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" - tmpubsub "github.com/tendermint/tmlibs/pubsub" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" ) func init() { diff --git a/libs/events/Makefile b/libs/events/Makefile new file mode 100644 index 000000000..696aafff1 --- /dev/null +++ b/libs/events/Makefile @@ -0,0 +1,9 @@ +.PHONY: docs +REPO:=github.com/tendermint/tendermint/libs/events + +docs: + @go get github.com/davecheney/godoc2md + godoc2md $(REPO) > README.md + +test: + go test -v ./... diff --git a/libs/events/README.md b/libs/events/README.md new file mode 100644 index 000000000..14aa498ff --- /dev/null +++ b/libs/events/README.md @@ -0,0 +1,175 @@ + + +# events +`import "github.com/tendermint/tendermint/libs/events"` + +* [Overview](#pkg-overview) +* [Index](#pkg-index) + +## Overview +Pub-Sub in go with event caching + + + + +## Index +* [type EventCache](#EventCache) + * [func NewEventCache(evsw Fireable) *EventCache](#NewEventCache) + * [func (evc *EventCache) FireEvent(event string, data EventData)](#EventCache.FireEvent) + * [func (evc *EventCache) Flush()](#EventCache.Flush) +* [type EventCallback](#EventCallback) +* [type EventData](#EventData) +* [type EventSwitch](#EventSwitch) + * [func NewEventSwitch() EventSwitch](#NewEventSwitch) +* [type Eventable](#Eventable) +* [type Fireable](#Fireable) + + +#### Package files +[event_cache.go](/src/github.com/tendermint/tendermint/libs/events/event_cache.go) [events.go](/src/github.com/tendermint/tendermint/libs/events/events.go) + + + + + + +## type [EventCache](/src/target/event_cache.go?s=116:179#L5) +``` go +type EventCache struct { + // contains filtered or unexported fields +} +``` +An EventCache buffers events for a Fireable +All events are cached. Filtering happens on Flush + + + + + + + +### func [NewEventCache](/src/target/event_cache.go?s=239:284#L11) +``` go +func NewEventCache(evsw Fireable) *EventCache +``` +Create a new EventCache with an EventSwitch as backend + + + + + +### func (\*EventCache) [FireEvent](/src/target/event_cache.go?s=449:511#L24) +``` go +func (evc *EventCache) FireEvent(event string, data EventData) +``` +Cache an event to be fired upon finality. + + + + +### func (\*EventCache) [Flush](/src/target/event_cache.go?s=735:765#L31) +``` go +func (evc *EventCache) Flush() +``` +Fire events by running evsw.FireEvent on all cached events. Blocks. +Clears cached events + + + + +## type [EventCallback](/src/target/events.go?s=4201:4240#L185) +``` go +type EventCallback func(data EventData) +``` + + + + + + + + + +## type [EventData](/src/target/events.go?s=243:294#L14) +``` go +type EventData interface { +} +``` +Generic event data can be typed and registered with tendermint/go-amino +via concrete implementation of this interface + + + + + + + + + + +## type [EventSwitch](/src/target/events.go?s=560:771#L29) +``` go +type EventSwitch interface { + cmn.Service + Fireable + + AddListenerForEvent(listenerID, event string, cb EventCallback) + RemoveListenerForEvent(event string, listenerID string) + RemoveListener(listenerID string) +} +``` + + + + + + +### func [NewEventSwitch](/src/target/events.go?s=917:950#L46) +``` go +func NewEventSwitch() EventSwitch +``` + + + + +## type [Eventable](/src/target/events.go?s=378:440#L20) +``` go +type Eventable interface { + SetEventSwitch(evsw EventSwitch) +} +``` +reactors and other modules should export +this interface to become eventable + + + + + + + + + + +## type [Fireable](/src/target/events.go?s=490:558#L25) +``` go +type Fireable interface { + FireEvent(event string, data EventData) +} +``` +an event switch or cache implements fireable + + + + + + + + + + + + + + +- - - +Generated by [godoc2md](http://godoc.org/github.com/davecheney/godoc2md) diff --git a/libs/events/event_cache.go b/libs/events/event_cache.go new file mode 100644 index 000000000..f508e873d --- /dev/null +++ b/libs/events/event_cache.go @@ -0,0 +1,37 @@ +package events + +// An EventCache buffers events for a Fireable +// All events are cached. Filtering happens on Flush +type EventCache struct { + evsw Fireable + events []eventInfo +} + +// Create a new EventCache with an EventSwitch as backend +func NewEventCache(evsw Fireable) *EventCache { + return &EventCache{ + evsw: evsw, + } +} + +// a cached event +type eventInfo struct { + event string + data EventData +} + +// Cache an event to be fired upon finality. +func (evc *EventCache) FireEvent(event string, data EventData) { + // append to list (go will grow our backing array exponentially) + evc.events = append(evc.events, eventInfo{event, data}) +} + +// Fire events by running evsw.FireEvent on all cached events. Blocks. +// Clears cached events +func (evc *EventCache) Flush() { + for _, ei := range evc.events { + evc.evsw.FireEvent(ei.event, ei.data) + } + // Clear the buffer, since we only add to it with append it's safe to just set it to nil and maybe safe an allocation + evc.events = nil +} diff --git a/libs/events/event_cache_test.go b/libs/events/event_cache_test.go new file mode 100644 index 000000000..ab321da3a --- /dev/null +++ b/libs/events/event_cache_test.go @@ -0,0 +1,35 @@ +package events + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEventCache_Flush(t *testing.T) { + evsw := NewEventSwitch() + evsw.Start() + evsw.AddListenerForEvent("nothingness", "", func(data EventData) { + // Check we are not initialising an empty buffer full of zeroed eventInfos in the EventCache + require.FailNow(t, "We should never receive a message on this switch since none are fired") + }) + evc := NewEventCache(evsw) + evc.Flush() + // Check after reset + evc.Flush() + fail := true + pass := false + evsw.AddListenerForEvent("somethingness", "something", func(data EventData) { + if fail { + require.FailNow(t, "Shouldn't see a message until flushed") + } + pass = true + }) + evc.FireEvent("something", struct{ int }{1}) + evc.FireEvent("something", struct{ int }{2}) + evc.FireEvent("something", struct{ int }{3}) + fail = false + evc.Flush() + assert.True(t, pass) +} diff --git a/libs/events/events.go b/libs/events/events.go new file mode 100644 index 000000000..f1b2a754e --- /dev/null +++ b/libs/events/events.go @@ -0,0 +1,226 @@ +/* +Pub-Sub in go with event caching +*/ +package events + +import ( + "sync" + + cmn "github.com/tendermint/tmlibs/common" +) + +// Generic event data can be typed and registered with tendermint/go-amino +// via concrete implementation of this interface +type EventData interface { + //AssertIsEventData() +} + +// reactors and other modules should export +// this interface to become eventable +type Eventable interface { + SetEventSwitch(evsw EventSwitch) +} + +// an event switch or cache implements fireable +type Fireable interface { + FireEvent(event string, data EventData) +} + +type EventSwitch interface { + cmn.Service + Fireable + + AddListenerForEvent(listenerID, event string, cb EventCallback) + RemoveListenerForEvent(event string, listenerID string) + RemoveListener(listenerID string) +} + +type eventSwitch struct { + cmn.BaseService + + mtx sync.RWMutex + eventCells map[string]*eventCell + listeners map[string]*eventListener +} + +func NewEventSwitch() EventSwitch { + evsw := &eventSwitch{} + evsw.BaseService = *cmn.NewBaseService(nil, "EventSwitch", evsw) + return evsw +} + +func (evsw *eventSwitch) OnStart() error { + evsw.BaseService.OnStart() + evsw.eventCells = make(map[string]*eventCell) + evsw.listeners = make(map[string]*eventListener) + return nil +} + +func (evsw *eventSwitch) OnStop() { + evsw.mtx.Lock() + defer evsw.mtx.Unlock() + evsw.BaseService.OnStop() + evsw.eventCells = nil + evsw.listeners = nil +} + +func (evsw *eventSwitch) AddListenerForEvent(listenerID, event string, cb EventCallback) { + // Get/Create eventCell and listener + evsw.mtx.Lock() + eventCell := evsw.eventCells[event] + if eventCell == nil { + eventCell = newEventCell() + evsw.eventCells[event] = eventCell + } + listener := evsw.listeners[listenerID] + if listener == nil { + listener = newEventListener(listenerID) + evsw.listeners[listenerID] = listener + } + evsw.mtx.Unlock() + + // Add event and listener + eventCell.AddListener(listenerID, cb) + listener.AddEvent(event) +} + +func (evsw *eventSwitch) RemoveListener(listenerID string) { + // Get and remove listener + evsw.mtx.RLock() + listener := evsw.listeners[listenerID] + evsw.mtx.RUnlock() + if listener == nil { + return + } + + evsw.mtx.Lock() + delete(evsw.listeners, listenerID) + evsw.mtx.Unlock() + + // Remove callback for each event. + listener.SetRemoved() + for _, event := range listener.GetEvents() { + evsw.RemoveListenerForEvent(event, listenerID) + } +} + +func (evsw *eventSwitch) RemoveListenerForEvent(event string, listenerID string) { + // Get eventCell + evsw.mtx.Lock() + eventCell := evsw.eventCells[event] + evsw.mtx.Unlock() + + if eventCell == nil { + return + } + + // Remove listenerID from eventCell + numListeners := eventCell.RemoveListener(listenerID) + + // Maybe garbage collect eventCell. + if numListeners == 0 { + // Lock again and double check. + evsw.mtx.Lock() // OUTER LOCK + eventCell.mtx.Lock() // INNER LOCK + if len(eventCell.listeners) == 0 { + delete(evsw.eventCells, event) + } + eventCell.mtx.Unlock() // INNER LOCK + evsw.mtx.Unlock() // OUTER LOCK + } +} + +func (evsw *eventSwitch) FireEvent(event string, data EventData) { + // Get the eventCell + evsw.mtx.RLock() + eventCell := evsw.eventCells[event] + evsw.mtx.RUnlock() + + if eventCell == nil { + return + } + + // Fire event for all listeners in eventCell + eventCell.FireEvent(data) +} + +//----------------------------------------------------------------------------- + +// eventCell handles keeping track of listener callbacks for a given event. +type eventCell struct { + mtx sync.RWMutex + listeners map[string]EventCallback +} + +func newEventCell() *eventCell { + return &eventCell{ + listeners: make(map[string]EventCallback), + } +} + +func (cell *eventCell) AddListener(listenerID string, cb EventCallback) { + cell.mtx.Lock() + cell.listeners[listenerID] = cb + cell.mtx.Unlock() +} + +func (cell *eventCell) RemoveListener(listenerID string) int { + cell.mtx.Lock() + delete(cell.listeners, listenerID) + numListeners := len(cell.listeners) + cell.mtx.Unlock() + return numListeners +} + +func (cell *eventCell) FireEvent(data EventData) { + cell.mtx.RLock() + for _, listener := range cell.listeners { + listener(data) + } + cell.mtx.RUnlock() +} + +//----------------------------------------------------------------------------- + +type EventCallback func(data EventData) + +type eventListener struct { + id string + + mtx sync.RWMutex + removed bool + events []string +} + +func newEventListener(id string) *eventListener { + return &eventListener{ + id: id, + removed: false, + events: nil, + } +} + +func (evl *eventListener) AddEvent(event string) { + evl.mtx.Lock() + defer evl.mtx.Unlock() + + if evl.removed { + return + } + evl.events = append(evl.events, event) +} + +func (evl *eventListener) GetEvents() []string { + evl.mtx.RLock() + defer evl.mtx.RUnlock() + + events := make([]string, len(evl.events)) + copy(events, evl.events) + return events +} + +func (evl *eventListener) SetRemoved() { + evl.mtx.Lock() + defer evl.mtx.Unlock() + evl.removed = true +} diff --git a/libs/events/events_test.go b/libs/events/events_test.go new file mode 100644 index 000000000..4995ae730 --- /dev/null +++ b/libs/events/events_test.go @@ -0,0 +1,380 @@ +package events + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// TestAddListenerForEventFireOnce sets up an EventSwitch, subscribes a single +// listener to an event, and sends a string "data". +func TestAddListenerForEventFireOnce(t *testing.T) { + evsw := NewEventSwitch() + err := evsw.Start() + if err != nil { + t.Errorf("Failed to start EventSwitch, error: %v", err) + } + messages := make(chan EventData) + evsw.AddListenerForEvent("listener", "event", + func(data EventData) { + messages <- data + }) + go evsw.FireEvent("event", "data") + received := <-messages + if received != "data" { + t.Errorf("Message received does not match: %v", received) + } +} + +// TestAddListenerForEventFireMany sets up an EventSwitch, subscribes a single +// listener to an event, and sends a thousand integers. +func TestAddListenerForEventFireMany(t *testing.T) { + evsw := NewEventSwitch() + err := evsw.Start() + if err != nil { + t.Errorf("Failed to start EventSwitch, error: %v", err) + } + doneSum := make(chan uint64) + doneSending := make(chan uint64) + numbers := make(chan uint64, 4) + // subscribe one listener for one event + evsw.AddListenerForEvent("listener", "event", + func(data EventData) { + numbers <- data.(uint64) + }) + // collect received events + go sumReceivedNumbers(numbers, doneSum) + // go fire events + go fireEvents(evsw, "event", doneSending, uint64(1)) + checkSum := <-doneSending + close(numbers) + eventSum := <-doneSum + if checkSum != eventSum { + t.Errorf("Not all messages sent were received.\n") + } +} + +// TestAddListenerForDifferentEvents sets up an EventSwitch, subscribes a single +// listener to three different events and sends a thousand integers for each +// of the three events. +func TestAddListenerForDifferentEvents(t *testing.T) { + evsw := NewEventSwitch() + err := evsw.Start() + if err != nil { + t.Errorf("Failed to start EventSwitch, error: %v", err) + } + doneSum := make(chan uint64) + doneSending1 := make(chan uint64) + doneSending2 := make(chan uint64) + doneSending3 := make(chan uint64) + numbers := make(chan uint64, 4) + // subscribe one listener to three events + evsw.AddListenerForEvent("listener", "event1", + func(data EventData) { + numbers <- data.(uint64) + }) + evsw.AddListenerForEvent("listener", "event2", + func(data EventData) { + numbers <- data.(uint64) + }) + evsw.AddListenerForEvent("listener", "event3", + func(data EventData) { + numbers <- data.(uint64) + }) + // collect received events + go sumReceivedNumbers(numbers, doneSum) + // go fire events + go fireEvents(evsw, "event1", doneSending1, uint64(1)) + go fireEvents(evsw, "event2", doneSending2, uint64(1)) + go fireEvents(evsw, "event3", doneSending3, uint64(1)) + var checkSum uint64 = 0 + checkSum += <-doneSending1 + checkSum += <-doneSending2 + checkSum += <-doneSending3 + close(numbers) + eventSum := <-doneSum + if checkSum != eventSum { + t.Errorf("Not all messages sent were received.\n") + } +} + +// TestAddDifferentListenerForDifferentEvents sets up an EventSwitch, +// subscribes a first listener to three events, and subscribes a second +// listener to two of those three events, and then sends a thousand integers +// for each of the three events. +func TestAddDifferentListenerForDifferentEvents(t *testing.T) { + evsw := NewEventSwitch() + err := evsw.Start() + if err != nil { + t.Errorf("Failed to start EventSwitch, error: %v", err) + } + doneSum1 := make(chan uint64) + doneSum2 := make(chan uint64) + doneSending1 := make(chan uint64) + doneSending2 := make(chan uint64) + doneSending3 := make(chan uint64) + numbers1 := make(chan uint64, 4) + numbers2 := make(chan uint64, 4) + // subscribe two listener to three events + evsw.AddListenerForEvent("listener1", "event1", + func(data EventData) { + numbers1 <- data.(uint64) + }) + evsw.AddListenerForEvent("listener1", "event2", + func(data EventData) { + numbers1 <- data.(uint64) + }) + evsw.AddListenerForEvent("listener1", "event3", + func(data EventData) { + numbers1 <- data.(uint64) + }) + evsw.AddListenerForEvent("listener2", "event2", + func(data EventData) { + numbers2 <- data.(uint64) + }) + evsw.AddListenerForEvent("listener2", "event3", + func(data EventData) { + numbers2 <- data.(uint64) + }) + // collect received events for listener1 + go sumReceivedNumbers(numbers1, doneSum1) + // collect received events for listener2 + go sumReceivedNumbers(numbers2, doneSum2) + // go fire events + go fireEvents(evsw, "event1", doneSending1, uint64(1)) + go fireEvents(evsw, "event2", doneSending2, uint64(1001)) + go fireEvents(evsw, "event3", doneSending3, uint64(2001)) + checkSumEvent1 := <-doneSending1 + checkSumEvent2 := <-doneSending2 + checkSumEvent3 := <-doneSending3 + checkSum1 := checkSumEvent1 + checkSumEvent2 + checkSumEvent3 + checkSum2 := checkSumEvent2 + checkSumEvent3 + close(numbers1) + close(numbers2) + eventSum1 := <-doneSum1 + eventSum2 := <-doneSum2 + if checkSum1 != eventSum1 || + checkSum2 != eventSum2 { + t.Errorf("Not all messages sent were received for different listeners to different events.\n") + } +} + +// TestAddAndRemoveListener sets up an EventSwitch, subscribes a listener to +// two events, fires a thousand integers for the first event, then unsubscribes +// the listener and fires a thousand integers for the second event. +func TestAddAndRemoveListener(t *testing.T) { + evsw := NewEventSwitch() + err := evsw.Start() + if err != nil { + t.Errorf("Failed to start EventSwitch, error: %v", err) + } + doneSum1 := make(chan uint64) + doneSum2 := make(chan uint64) + doneSending1 := make(chan uint64) + doneSending2 := make(chan uint64) + numbers1 := make(chan uint64, 4) + numbers2 := make(chan uint64, 4) + // subscribe two listener to three events + evsw.AddListenerForEvent("listener", "event1", + func(data EventData) { + numbers1 <- data.(uint64) + }) + evsw.AddListenerForEvent("listener", "event2", + func(data EventData) { + numbers2 <- data.(uint64) + }) + // collect received events for event1 + go sumReceivedNumbers(numbers1, doneSum1) + // collect received events for event2 + go sumReceivedNumbers(numbers2, doneSum2) + // go fire events + go fireEvents(evsw, "event1", doneSending1, uint64(1)) + checkSumEvent1 := <-doneSending1 + // after sending all event1, unsubscribe for all events + evsw.RemoveListener("listener") + go fireEvents(evsw, "event2", doneSending2, uint64(1001)) + checkSumEvent2 := <-doneSending2 + close(numbers1) + close(numbers2) + eventSum1 := <-doneSum1 + eventSum2 := <-doneSum2 + if checkSumEvent1 != eventSum1 || + // correct value asserted by preceding tests, suffices to be non-zero + checkSumEvent2 == uint64(0) || + eventSum2 != uint64(0) { + t.Errorf("Not all messages sent were received or unsubscription did not register.\n") + } +} + +// TestRemoveListener does basic tests on adding and removing +func TestRemoveListener(t *testing.T) { + evsw := NewEventSwitch() + err := evsw.Start() + if err != nil { + t.Errorf("Failed to start EventSwitch, error: %v", err) + } + count := 10 + sum1, sum2 := 0, 0 + // add some listeners and make sure they work + evsw.AddListenerForEvent("listener", "event1", + func(data EventData) { + sum1++ + }) + evsw.AddListenerForEvent("listener", "event2", + func(data EventData) { + sum2++ + }) + for i := 0; i < count; i++ { + evsw.FireEvent("event1", true) + evsw.FireEvent("event2", true) + } + assert.Equal(t, count, sum1) + assert.Equal(t, count, sum2) + + // remove one by event and make sure it is gone + evsw.RemoveListenerForEvent("event2", "listener") + for i := 0; i < count; i++ { + evsw.FireEvent("event1", true) + evsw.FireEvent("event2", true) + } + assert.Equal(t, count*2, sum1) + assert.Equal(t, count, sum2) + + // remove the listener entirely and make sure both gone + evsw.RemoveListener("listener") + for i := 0; i < count; i++ { + evsw.FireEvent("event1", true) + evsw.FireEvent("event2", true) + } + assert.Equal(t, count*2, sum1) + assert.Equal(t, count, sum2) +} + +// TestAddAndRemoveListenersAsync sets up an EventSwitch, subscribes two +// listeners to three events, and fires a thousand integers for each event. +// These two listeners serve as the baseline validation while other listeners +// are randomly subscribed and unsubscribed. +// More precisely it randomly subscribes new listeners (different from the first +// two listeners) to one of these three events. At the same time it starts +// randomly unsubscribing these additional listeners from all events they are +// at that point subscribed to. +// NOTE: it is important to run this test with race conditions tracking on, +// `go test -race`, to examine for possible race conditions. +func TestRemoveListenersAsync(t *testing.T) { + evsw := NewEventSwitch() + err := evsw.Start() + if err != nil { + t.Errorf("Failed to start EventSwitch, error: %v", err) + } + doneSum1 := make(chan uint64) + doneSum2 := make(chan uint64) + doneSending1 := make(chan uint64) + doneSending2 := make(chan uint64) + doneSending3 := make(chan uint64) + numbers1 := make(chan uint64, 4) + numbers2 := make(chan uint64, 4) + // subscribe two listener to three events + evsw.AddListenerForEvent("listener1", "event1", + func(data EventData) { + numbers1 <- data.(uint64) + }) + evsw.AddListenerForEvent("listener1", "event2", + func(data EventData) { + numbers1 <- data.(uint64) + }) + evsw.AddListenerForEvent("listener1", "event3", + func(data EventData) { + numbers1 <- data.(uint64) + }) + evsw.AddListenerForEvent("listener2", "event1", + func(data EventData) { + numbers2 <- data.(uint64) + }) + evsw.AddListenerForEvent("listener2", "event2", + func(data EventData) { + numbers2 <- data.(uint64) + }) + evsw.AddListenerForEvent("listener2", "event3", + func(data EventData) { + numbers2 <- data.(uint64) + }) + // collect received events for event1 + go sumReceivedNumbers(numbers1, doneSum1) + // collect received events for event2 + go sumReceivedNumbers(numbers2, doneSum2) + addListenersStress := func() { + s1 := rand.NewSource(time.Now().UnixNano()) + r1 := rand.New(s1) + for k := uint16(0); k < 400; k++ { + listenerNumber := r1.Intn(100) + 3 + eventNumber := r1.Intn(3) + 1 + go evsw.AddListenerForEvent(fmt.Sprintf("listener%v", listenerNumber), + fmt.Sprintf("event%v", eventNumber), + func(_ EventData) {}) + } + } + removeListenersStress := func() { + s2 := rand.NewSource(time.Now().UnixNano()) + r2 := rand.New(s2) + for k := uint16(0); k < 80; k++ { + listenerNumber := r2.Intn(100) + 3 + go evsw.RemoveListener(fmt.Sprintf("listener%v", listenerNumber)) + } + } + addListenersStress() + // go fire events + go fireEvents(evsw, "event1", doneSending1, uint64(1)) + removeListenersStress() + go fireEvents(evsw, "event2", doneSending2, uint64(1001)) + go fireEvents(evsw, "event3", doneSending3, uint64(2001)) + checkSumEvent1 := <-doneSending1 + checkSumEvent2 := <-doneSending2 + checkSumEvent3 := <-doneSending3 + checkSum := checkSumEvent1 + checkSumEvent2 + checkSumEvent3 + close(numbers1) + close(numbers2) + eventSum1 := <-doneSum1 + eventSum2 := <-doneSum2 + if checkSum != eventSum1 || + checkSum != eventSum2 { + t.Errorf("Not all messages sent were received.\n") + } +} + +//------------------------------------------------------------------------------ +// Helper functions + +// sumReceivedNumbers takes two channels and adds all numbers received +// until the receiving channel `numbers` is closed; it then sends the sum +// on `doneSum` and closes that channel. Expected to be run in a go-routine. +func sumReceivedNumbers(numbers, doneSum chan uint64) { + var sum uint64 = 0 + for { + j, more := <-numbers + sum += j + if !more { + doneSum <- sum + close(doneSum) + return + } + } +} + +// fireEvents takes an EventSwitch and fires a thousand integers under +// a given `event` with the integers mootonically increasing from `offset` +// to `offset` + 999. It additionally returns the addition of all integers +// sent on `doneChan` for assertion that all events have been sent, and enabling +// the test to assert all events have also been received. +func fireEvents(evsw EventSwitch, event string, doneChan chan uint64, + offset uint64) { + var sentSum uint64 = 0 + for i := offset; i <= offset+uint64(999); i++ { + sentSum += i + evsw.FireEvent(event, i) + } + doneChan <- sentSum + close(doneChan) +} diff --git a/libs/pubsub/example_test.go b/libs/pubsub/example_test.go new file mode 100644 index 000000000..550b4447e --- /dev/null +++ b/libs/pubsub/example_test.go @@ -0,0 +1,28 @@ +package pubsub_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/tendermint/tmlibs/log" + + "github.com/tendermint/tendermint/libs/pubsub" + "github.com/tendermint/tendermint/libs/pubsub/query" +) + +func TestExample(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ctx := context.Background() + ch := make(chan interface{}, 1) + err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), ch) + require.NoError(t, err) + err = s.PublishWithTags(ctx, "Tombstone", pubsub.NewTagMap(map[string]interface{}{"abci.account.name": "John"})) + require.NoError(t, err) + assertReceive(t, "Tombstone", ch) +} diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go new file mode 100644 index 000000000..67f264ace --- /dev/null +++ b/libs/pubsub/pubsub.go @@ -0,0 +1,342 @@ +// Package pubsub implements a pub-sub model with a single publisher (Server) +// and multiple subscribers (clients). +// +// Though you can have multiple publishers by sharing a pointer to a server or +// by giving the same channel to each publisher and publishing messages from +// that channel (fan-in). +// +// Clients subscribe for messages, which could be of any type, using a query. +// When some message is published, we match it with all queries. If there is a +// match, this message will be pushed to all clients, subscribed to that query. +// See query subpackage for our implementation. +package pubsub + +import ( + "context" + "errors" + "sync" + + cmn "github.com/tendermint/tmlibs/common" +) + +type operation int + +const ( + sub operation = iota + pub + unsub + shutdown +) + +var ( + // ErrSubscriptionNotFound is returned when a client tries to unsubscribe + // from not existing subscription. + ErrSubscriptionNotFound = errors.New("subscription not found") + + // ErrAlreadySubscribed is returned when a client tries to subscribe twice or + // more using the same query. + ErrAlreadySubscribed = errors.New("already subscribed") +) + +// TagMap is used to associate tags to a message. +// They can be queried by subscribers to choose messages they will received. +type TagMap interface { + // Get returns the value for a key, or nil if no value is present. + // The ok result indicates whether value was found in the tags. + Get(key string) (value interface{}, ok bool) + // Len returns the number of tags. + Len() int +} + +type tagMap map[string]interface{} + +type cmd struct { + op operation + query Query + ch chan<- interface{} + clientID string + msg interface{} + tags TagMap +} + +// Query defines an interface for a query to be used for subscribing. +type Query interface { + Matches(tags TagMap) bool + String() string +} + +// Server allows clients to subscribe/unsubscribe for messages, publishing +// messages with or without tags, and manages internal state. +type Server struct { + cmn.BaseService + + cmds chan cmd + cmdsCap int + + mtx sync.RWMutex + subscriptions map[string]map[string]Query // subscriber -> query (string) -> Query +} + +// Option sets a parameter for the server. +type Option func(*Server) + +// NewTagMap constructs a new immutable tag set from a map. +func NewTagMap(data map[string]interface{}) TagMap { + return tagMap(data) +} + +// Get returns the value for a key, or nil if no value is present. +// The ok result indicates whether value was found in the tags. +func (ts tagMap) Get(key string) (value interface{}, ok bool) { + value, ok = ts[key] + return +} + +// Len returns the number of tags. +func (ts tagMap) Len() int { + return len(ts) +} + +// NewServer returns a new server. See the commentary on the Option functions +// for a detailed description of how to configure buffering. If no options are +// provided, the resulting server's queue is unbuffered. +func NewServer(options ...Option) *Server { + s := &Server{ + subscriptions: make(map[string]map[string]Query), + } + s.BaseService = *cmn.NewBaseService(nil, "PubSub", s) + + for _, option := range options { + option(s) + } + + // if BufferCapacity option was not set, the channel is unbuffered + s.cmds = make(chan cmd, s.cmdsCap) + + return s +} + +// BufferCapacity allows you to specify capacity for the internal server's +// queue. Since the server, given Y subscribers, could only process X messages, +// this option could be used to survive spikes (e.g. high amount of +// transactions during peak hours). +func BufferCapacity(cap int) Option { + return func(s *Server) { + if cap > 0 { + s.cmdsCap = cap + } + } +} + +// BufferCapacity returns capacity of the internal server's queue. +func (s *Server) BufferCapacity() int { + return s.cmdsCap +} + +// Subscribe creates a subscription for the given client. It accepts a channel +// on which messages matching the given query can be received. An error will be +// returned to the caller if the context is canceled or if subscription already +// exist for pair clientID and query. +func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error { + s.mtx.RLock() + clientSubscriptions, ok := s.subscriptions[clientID] + if ok { + _, ok = clientSubscriptions[query.String()] + } + s.mtx.RUnlock() + if ok { + return ErrAlreadySubscribed + } + + select { + case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}: + s.mtx.Lock() + if _, ok = s.subscriptions[clientID]; !ok { + s.subscriptions[clientID] = make(map[string]Query) + } + s.subscriptions[clientID][query.String()] = query + s.mtx.Unlock() + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// Unsubscribe removes the subscription on the given query. An error will be +// returned to the caller if the context is canceled or if subscription does +// not exist. +func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error { + var origQuery Query + s.mtx.RLock() + clientSubscriptions, ok := s.subscriptions[clientID] + if ok { + origQuery, ok = clientSubscriptions[query.String()] + } + s.mtx.RUnlock() + if !ok { + return ErrSubscriptionNotFound + } + + // original query is used here because we're using pointers as map keys + select { + case s.cmds <- cmd{op: unsub, clientID: clientID, query: origQuery}: + s.mtx.Lock() + delete(clientSubscriptions, query.String()) + s.mtx.Unlock() + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// UnsubscribeAll removes all client subscriptions. An error will be returned +// to the caller if the context is canceled or if subscription does not exist. +func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error { + s.mtx.RLock() + _, ok := s.subscriptions[clientID] + s.mtx.RUnlock() + if !ok { + return ErrSubscriptionNotFound + } + + select { + case s.cmds <- cmd{op: unsub, clientID: clientID}: + s.mtx.Lock() + delete(s.subscriptions, clientID) + s.mtx.Unlock() + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// Publish publishes the given message. An error will be returned to the caller +// if the context is canceled. +func (s *Server) Publish(ctx context.Context, msg interface{}) error { + return s.PublishWithTags(ctx, msg, NewTagMap(make(map[string]interface{}))) +} + +// PublishWithTags publishes the given message with the set of tags. The set is +// matched with clients queries. If there is a match, the message is sent to +// the client. +func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags TagMap) error { + select { + case s.cmds <- cmd{op: pub, msg: msg, tags: tags}: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// OnStop implements Service.OnStop by shutting down the server. +func (s *Server) OnStop() { + s.cmds <- cmd{op: shutdown} +} + +// NOTE: not goroutine safe +type state struct { + // query -> client -> ch + queries map[Query]map[string]chan<- interface{} + // client -> query -> struct{} + clients map[string]map[Query]struct{} +} + +// OnStart implements Service.OnStart by starting the server. +func (s *Server) OnStart() error { + go s.loop(state{ + queries: make(map[Query]map[string]chan<- interface{}), + clients: make(map[string]map[Query]struct{}), + }) + return nil +} + +// OnReset implements Service.OnReset +func (s *Server) OnReset() error { + return nil +} + +func (s *Server) loop(state state) { +loop: + for cmd := range s.cmds { + switch cmd.op { + case unsub: + if cmd.query != nil { + state.remove(cmd.clientID, cmd.query) + } else { + state.removeAll(cmd.clientID) + } + case shutdown: + for clientID := range state.clients { + state.removeAll(clientID) + } + break loop + case sub: + state.add(cmd.clientID, cmd.query, cmd.ch) + case pub: + state.send(cmd.msg, cmd.tags) + } + } +} + +func (state *state) add(clientID string, q Query, ch chan<- interface{}) { + // add query if needed + if _, ok := state.queries[q]; !ok { + state.queries[q] = make(map[string]chan<- interface{}) + } + + // create subscription + state.queries[q][clientID] = ch + + // add client if needed + if _, ok := state.clients[clientID]; !ok { + state.clients[clientID] = make(map[Query]struct{}) + } + state.clients[clientID][q] = struct{}{} +} + +func (state *state) remove(clientID string, q Query) { + clientToChannelMap, ok := state.queries[q] + if !ok { + return + } + + ch, ok := clientToChannelMap[clientID] + if ok { + close(ch) + + delete(state.clients[clientID], q) + + // if it not subscribed to anything else, remove the client + if len(state.clients[clientID]) == 0 { + delete(state.clients, clientID) + } + + delete(state.queries[q], clientID) + } +} + +func (state *state) removeAll(clientID string) { + queryMap, ok := state.clients[clientID] + if !ok { + return + } + + for q := range queryMap { + ch := state.queries[q][clientID] + close(ch) + + delete(state.queries[q], clientID) + } + + delete(state.clients, clientID) +} + +func (state *state) send(msg interface{}, tags TagMap) { + for q, clientToChannelMap := range state.queries { + if q.Matches(tags) { + for _, ch := range clientToChannelMap { + ch <- msg + } + } + } +} diff --git a/libs/pubsub/pubsub_test.go b/libs/pubsub/pubsub_test.go new file mode 100644 index 000000000..a39d015ce --- /dev/null +++ b/libs/pubsub/pubsub_test.go @@ -0,0 +1,253 @@ +package pubsub_test + +import ( + "context" + "fmt" + "runtime/debug" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/tendermint/tmlibs/log" + + "github.com/tendermint/tendermint/libs/pubsub" + "github.com/tendermint/tendermint/libs/pubsub/query" +) + +const ( + clientID = "test-client" +) + +func TestSubscribe(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ctx := context.Background() + ch := make(chan interface{}, 1) + err := s.Subscribe(ctx, clientID, query.Empty{}, ch) + require.NoError(t, err) + err = s.Publish(ctx, "Ka-Zar") + require.NoError(t, err) + assertReceive(t, "Ka-Zar", ch) + + err = s.Publish(ctx, "Quicksilver") + require.NoError(t, err) + assertReceive(t, "Quicksilver", ch) +} + +func TestDifferentClients(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ctx := context.Background() + ch1 := make(chan interface{}, 1) + err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1) + require.NoError(t, err) + err = s.PublishWithTags(ctx, "Iceman", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"})) + require.NoError(t, err) + assertReceive(t, "Iceman", ch1) + + ch2 := make(chan interface{}, 1) + err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2) + require.NoError(t, err) + err = s.PublishWithTags(ctx, "Ultimo", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})) + require.NoError(t, err) + assertReceive(t, "Ultimo", ch1) + assertReceive(t, "Ultimo", ch2) + + ch3 := make(chan interface{}, 1) + err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3) + require.NoError(t, err) + err = s.PublishWithTags(ctx, "Valeria Richards", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewRoundStep"})) + require.NoError(t, err) + assert.Zero(t, len(ch3)) +} + +func TestClientSubscribesTwice(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ctx := context.Background() + q := query.MustParse("tm.events.type='NewBlock'") + + ch1 := make(chan interface{}, 1) + err := s.Subscribe(ctx, clientID, q, ch1) + require.NoError(t, err) + err = s.PublishWithTags(ctx, "Goblin Queen", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"})) + require.NoError(t, err) + assertReceive(t, "Goblin Queen", ch1) + + ch2 := make(chan interface{}, 1) + err = s.Subscribe(ctx, clientID, q, ch2) + require.Error(t, err) + + err = s.PublishWithTags(ctx, "Spider-Man", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"})) + require.NoError(t, err) + assertReceive(t, "Spider-Man", ch1) +} + +func TestUnsubscribe(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ctx := context.Background() + ch := make(chan interface{}) + err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch) + require.NoError(t, err) + err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'")) + require.NoError(t, err) + + err = s.Publish(ctx, "Nick Fury") + require.NoError(t, err) + assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe") + + _, ok := <-ch + assert.False(t, ok) +} + +func TestResubscribe(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ctx := context.Background() + ch := make(chan interface{}) + err := s.Subscribe(ctx, clientID, query.Empty{}, ch) + require.NoError(t, err) + err = s.Unsubscribe(ctx, clientID, query.Empty{}) + require.NoError(t, err) + ch = make(chan interface{}) + err = s.Subscribe(ctx, clientID, query.Empty{}, ch) + require.NoError(t, err) + + err = s.Publish(ctx, "Cable") + require.NoError(t, err) + assertReceive(t, "Cable", ch) +} + +func TestUnsubscribeAll(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ctx := context.Background() + ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1) + err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch1) + require.NoError(t, err) + err = s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlockHeader'"), ch2) + require.NoError(t, err) + + err = s.UnsubscribeAll(ctx, clientID) + require.NoError(t, err) + + err = s.Publish(ctx, "Nick Fury") + require.NoError(t, err) + assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll") + assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll") + + _, ok := <-ch1 + assert.False(t, ok) + _, ok = <-ch2 + assert.False(t, ok) +} + +func TestBufferCapacity(t *testing.T) { + s := pubsub.NewServer(pubsub.BufferCapacity(2)) + s.SetLogger(log.TestingLogger()) + + assert.Equal(t, 2, s.BufferCapacity()) + + ctx := context.Background() + err := s.Publish(ctx, "Nighthawk") + require.NoError(t, err) + err = s.Publish(ctx, "Sage") + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) + defer cancel() + err = s.Publish(ctx, "Ironclad") + if assert.Error(t, err) { + assert.Equal(t, context.DeadlineExceeded, err) + } +} + +func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) } +func Benchmark100Clients(b *testing.B) { benchmarkNClients(100, b) } +func Benchmark1000Clients(b *testing.B) { benchmarkNClients(1000, b) } + +func Benchmark10ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(10, b) } +func Benchmark100ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(100, b) } +func Benchmark1000ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(1000, b) } + +func benchmarkNClients(n int, b *testing.B) { + s := pubsub.NewServer() + s.Start() + defer s.Stop() + + ctx := context.Background() + for i := 0; i < n; i++ { + ch := make(chan interface{}) + go func() { + for range ch { + } + }() + s.Subscribe(ctx, clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = %d", i)), ch) + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i})) + } +} + +func benchmarkNClientsOneQuery(n int, b *testing.B) { + s := pubsub.NewServer() + s.Start() + defer s.Stop() + + ctx := context.Background() + q := query.MustParse("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = 1") + for i := 0; i < n; i++ { + ch := make(chan interface{}) + go func() { + for range ch { + } + }() + s.Subscribe(ctx, clientID, q, ch) + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1})) + } +} + +/////////////////////////////////////////////////////////////////////////////// +/// HELPERS +/////////////////////////////////////////////////////////////////////////////// + +func assertReceive(t *testing.T, expected interface{}, ch <-chan interface{}, msgAndArgs ...interface{}) { + select { + case actual := <-ch: + if actual != nil { + assert.Equal(t, expected, actual, msgAndArgs...) + } + case <-time.After(1 * time.Second): + t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected) + debug.PrintStack() + } +} diff --git a/libs/pubsub/query/Makefile b/libs/pubsub/query/Makefile new file mode 100644 index 000000000..91030ef09 --- /dev/null +++ b/libs/pubsub/query/Makefile @@ -0,0 +1,11 @@ +gen_query_parser: + @go get github.com/pointlander/peg + peg -inline -switch query.peg + +fuzzy_test: + @go get github.com/dvyukov/go-fuzz/go-fuzz + @go get github.com/dvyukov/go-fuzz/go-fuzz-build + go-fuzz-build github.com/tendermint/tendermint/libs/pubsub/query/fuzz_test + go-fuzz -bin=./fuzz_test-fuzz.zip -workdir=./fuzz_test/output + +.PHONY: gen_query_parser fuzzy_test diff --git a/libs/pubsub/query/empty.go b/libs/pubsub/query/empty.go new file mode 100644 index 000000000..17d7acefa --- /dev/null +++ b/libs/pubsub/query/empty.go @@ -0,0 +1,16 @@ +package query + +import "github.com/tendermint/tendermint/libs/pubsub" + +// Empty query matches any set of tags. +type Empty struct { +} + +// Matches always returns true. +func (Empty) Matches(tags pubsub.TagMap) bool { + return true +} + +func (Empty) String() string { + return "empty" +} diff --git a/libs/pubsub/query/empty_test.go b/libs/pubsub/query/empty_test.go new file mode 100644 index 000000000..9c82f73ed --- /dev/null +++ b/libs/pubsub/query/empty_test.go @@ -0,0 +1,18 @@ +package query_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/tendermint/tendermint/libs/pubsub" + "github.com/tendermint/tendermint/libs/pubsub/query" +) + +func TestEmptyQueryMatchesAnything(t *testing.T) { + q := query.Empty{} + assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{}))) + assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Asher": "Roth"}))) + assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Route": 66}))) + assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Route": 66, "Billy": "Blue"}))) +} diff --git a/libs/pubsub/query/fuzz_test/main.go b/libs/pubsub/query/fuzz_test/main.go new file mode 100644 index 000000000..7a46116b5 --- /dev/null +++ b/libs/pubsub/query/fuzz_test/main.go @@ -0,0 +1,30 @@ +package fuzz_test + +import ( + "fmt" + + "github.com/tendermint/tendermint/libs/pubsub/query" +) + +func Fuzz(data []byte) int { + sdata := string(data) + q0, err := query.New(sdata) + if err != nil { + return 0 + } + + sdata1 := q0.String() + q1, err := query.New(sdata1) + if err != nil { + panic(err) + } + + sdata2 := q1.String() + if sdata1 != sdata2 { + fmt.Printf("q0: %q\n", sdata1) + fmt.Printf("q1: %q\n", sdata2) + panic("query changed") + } + + return 1 +} diff --git a/libs/pubsub/query/parser_test.go b/libs/pubsub/query/parser_test.go new file mode 100644 index 000000000..708dee484 --- /dev/null +++ b/libs/pubsub/query/parser_test.go @@ -0,0 +1,92 @@ +package query_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/tendermint/tendermint/libs/pubsub/query" +) + +// TODO: fuzzy testing? +func TestParser(t *testing.T) { + cases := []struct { + query string + valid bool + }{ + {"tm.events.type='NewBlock'", true}, + {"tm.events.type = 'NewBlock'", true}, + {"tm.events.name = ''", true}, + {"tm.events.type='TIME'", true}, + {"tm.events.type='DATE'", true}, + {"tm.events.type='='", true}, + {"tm.events.type='TIME", false}, + {"tm.events.type=TIME'", false}, + {"tm.events.type==", false}, + {"tm.events.type=NewBlock", false}, + {">==", false}, + {"tm.events.type 'NewBlock' =", false}, + {"tm.events.type>'NewBlock'", false}, + {"", false}, + {"=", false}, + {"='NewBlock'", false}, + {"tm.events.type=", false}, + + {"tm.events.typeNewBlock", false}, + {"tm.events.type'NewBlock'", false}, + {"'NewBlock'", false}, + {"NewBlock", false}, + {"", false}, + + {"tm.events.type='NewBlock' AND abci.account.name='Igor'", true}, + {"tm.events.type='NewBlock' AND", false}, + {"tm.events.type='NewBlock' AN", false}, + {"tm.events.type='NewBlock' AN tm.events.type='NewBlockHeader'", false}, + {"AND tm.events.type='NewBlock' ", false}, + + {"abci.account.name CONTAINS 'Igor'", true}, + + {"tx.date > DATE 2013-05-03", true}, + {"tx.date < DATE 2013-05-03", true}, + {"tx.date <= DATE 2013-05-03", true}, + {"tx.date >= DATE 2013-05-03", true}, + {"tx.date >= DAT 2013-05-03", false}, + {"tx.date <= DATE2013-05-03", false}, + {"tx.date <= DATE -05-03", false}, + {"tx.date >= DATE 20130503", false}, + {"tx.date >= DATE 2013+01-03", false}, + // incorrect year, month, day + {"tx.date >= DATE 0013-01-03", false}, + {"tx.date >= DATE 2013-31-03", false}, + {"tx.date >= DATE 2013-01-83", false}, + + {"tx.date > TIME 2013-05-03T14:45:00+07:00", true}, + {"tx.date < TIME 2013-05-03T14:45:00-02:00", true}, + {"tx.date <= TIME 2013-05-03T14:45:00Z", true}, + {"tx.date >= TIME 2013-05-03T14:45:00Z", true}, + {"tx.date >= TIME2013-05-03T14:45:00Z", false}, + {"tx.date = IME 2013-05-03T14:45:00Z", false}, + {"tx.date = TIME 2013-05-:45:00Z", false}, + {"tx.date >= TIME 2013-05-03T14:45:00", false}, + {"tx.date >= TIME 0013-00-00T14:45:00Z", false}, + {"tx.date >= TIME 2013+05=03T14:45:00Z", false}, + + {"account.balance=100", true}, + {"account.balance >= 200", true}, + {"account.balance >= -300", false}, + {"account.balance >>= 400", false}, + {"account.balance=33.22.1", false}, + + {"hash='136E18F7E4C348B780CF873A0BF43922E5BAFA63'", true}, + {"hash=136E18F7E4C348B780CF873A0BF43922E5BAFA63", false}, + } + + for _, c := range cases { + _, err := query.New(c.query) + if c.valid { + assert.NoErrorf(t, err, "Query was '%s'", c.query) + } else { + assert.Errorf(t, err, "Query was '%s'", c.query) + } + } +} diff --git a/libs/pubsub/query/query.go b/libs/pubsub/query/query.go new file mode 100644 index 000000000..a900d9838 --- /dev/null +++ b/libs/pubsub/query/query.go @@ -0,0 +1,345 @@ +// Package query provides a parser for a custom query format: +// +// abci.invoice.number=22 AND abci.invoice.owner=Ivan +// +// See query.peg for the grammar, which is a https://en.wikipedia.org/wiki/Parsing_expression_grammar. +// More: https://github.com/PhilippeSigaud/Pegged/wiki/PEG-Basics +// +// It has a support for numbers (integer and floating point), dates and times. +package query + +import ( + "fmt" + "reflect" + "strconv" + "strings" + "time" + + "github.com/tendermint/tendermint/libs/pubsub" +) + +// Query holds the query string and the query parser. +type Query struct { + str string + parser *QueryParser +} + +// Condition represents a single condition within a query and consists of tag +// (e.g. "tx.gas"), operator (e.g. "=") and operand (e.g. "7"). +type Condition struct { + Tag string + Op Operator + Operand interface{} +} + +// New parses the given string and returns a query or error if the string is +// invalid. +func New(s string) (*Query, error) { + p := &QueryParser{Buffer: fmt.Sprintf(`"%s"`, s)} + p.Init() + if err := p.Parse(); err != nil { + return nil, err + } + return &Query{str: s, parser: p}, nil +} + +// MustParse turns the given string into a query or panics; for tests or others +// cases where you know the string is valid. +func MustParse(s string) *Query { + q, err := New(s) + if err != nil { + panic(fmt.Sprintf("failed to parse %s: %v", s, err)) + } + return q +} + +// String returns the original string. +func (q *Query) String() string { + return q.str +} + +// Operator is an operator that defines some kind of relation between tag and +// operand (equality, etc.). +type Operator uint8 + +const ( + // "<=" + OpLessEqual Operator = iota + // ">=" + OpGreaterEqual + // "<" + OpLess + // ">" + OpGreater + // "=" + OpEqual + // "CONTAINS"; used to check if a string contains a certain sub string. + OpContains +) + +// Conditions returns a list of conditions. +func (q *Query) Conditions() []Condition { + conditions := make([]Condition, 0) + + buffer, begin, end := q.parser.Buffer, 0, 0 + + var tag string + var op Operator + + // tokens must be in the following order: tag ("tx.gas") -> operator ("=") -> operand ("7") + for _, token := range q.parser.Tokens() { + switch token.pegRule { + + case rulePegText: + begin, end = int(token.begin), int(token.end) + case ruletag: + tag = buffer[begin:end] + case rulele: + op = OpLessEqual + case rulege: + op = OpGreaterEqual + case rulel: + op = OpLess + case ruleg: + op = OpGreater + case ruleequal: + op = OpEqual + case rulecontains: + op = OpContains + case rulevalue: + // strip single quotes from value (i.e. "'NewBlock'" -> "NewBlock") + valueWithoutSingleQuotes := buffer[begin+1 : end-1] + conditions = append(conditions, Condition{tag, op, valueWithoutSingleQuotes}) + case rulenumber: + number := buffer[begin:end] + if strings.Contains(number, ".") { // if it looks like a floating-point number + value, err := strconv.ParseFloat(number, 64) + if err != nil { + panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number)) + } + conditions = append(conditions, Condition{tag, op, value}) + } else { + value, err := strconv.ParseInt(number, 10, 64) + if err != nil { + panic(fmt.Sprintf("got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", err, number)) + } + conditions = append(conditions, Condition{tag, op, value}) + } + case ruletime: + value, err := time.Parse(time.RFC3339, buffer[begin:end]) + if err != nil { + panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end])) + } + conditions = append(conditions, Condition{tag, op, value}) + case ruledate: + value, err := time.Parse("2006-01-02", buffer[begin:end]) + if err != nil { + panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", err, buffer[begin:end])) + } + conditions = append(conditions, Condition{tag, op, value}) + } + } + + return conditions +} + +// Matches returns true if the query matches the given set of tags, false otherwise. +// +// For example, query "name=John" matches tags = {"name": "John"}. More +// examples could be found in parser_test.go and query_test.go. +func (q *Query) Matches(tags pubsub.TagMap) bool { + if tags.Len() == 0 { + return false + } + + buffer, begin, end := q.parser.Buffer, 0, 0 + + var tag string + var op Operator + + // tokens must be in the following order: tag ("tx.gas") -> operator ("=") -> operand ("7") + for _, token := range q.parser.Tokens() { + switch token.pegRule { + + case rulePegText: + begin, end = int(token.begin), int(token.end) + case ruletag: + tag = buffer[begin:end] + case rulele: + op = OpLessEqual + case rulege: + op = OpGreaterEqual + case rulel: + op = OpLess + case ruleg: + op = OpGreater + case ruleequal: + op = OpEqual + case rulecontains: + op = OpContains + case rulevalue: + // strip single quotes from value (i.e. "'NewBlock'" -> "NewBlock") + valueWithoutSingleQuotes := buffer[begin+1 : end-1] + + // see if the triplet (tag, operator, operand) matches any tag + // "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" } + if !match(tag, op, reflect.ValueOf(valueWithoutSingleQuotes), tags) { + return false + } + case rulenumber: + number := buffer[begin:end] + if strings.Contains(number, ".") { // if it looks like a floating-point number + value, err := strconv.ParseFloat(number, 64) + if err != nil { + panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number)) + } + if !match(tag, op, reflect.ValueOf(value), tags) { + return false + } + } else { + value, err := strconv.ParseInt(number, 10, 64) + if err != nil { + panic(fmt.Sprintf("got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", err, number)) + } + if !match(tag, op, reflect.ValueOf(value), tags) { + return false + } + } + case ruletime: + value, err := time.Parse(time.RFC3339, buffer[begin:end]) + if err != nil { + panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end])) + } + if !match(tag, op, reflect.ValueOf(value), tags) { + return false + } + case ruledate: + value, err := time.Parse("2006-01-02", buffer[begin:end]) + if err != nil { + panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", err, buffer[begin:end])) + } + if !match(tag, op, reflect.ValueOf(value), tags) { + return false + } + } + } + + return true +} + +// match returns true if the given triplet (tag, operator, operand) matches any tag. +// +// First, it looks up the tag in tags and if it finds one, tries to compare the +// value from it to the operand using the operator. +// +// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" } +func match(tag string, op Operator, operand reflect.Value, tags pubsub.TagMap) bool { + // look up the tag from the query in tags + value, ok := tags.Get(tag) + if !ok { + return false + } + switch operand.Kind() { + case reflect.Struct: // time + operandAsTime := operand.Interface().(time.Time) + v, ok := value.(time.Time) + if !ok { // if value from tags is not time.Time + return false + } + switch op { + case OpLessEqual: + return v.Before(operandAsTime) || v.Equal(operandAsTime) + case OpGreaterEqual: + return v.Equal(operandAsTime) || v.After(operandAsTime) + case OpLess: + return v.Before(operandAsTime) + case OpGreater: + return v.After(operandAsTime) + case OpEqual: + return v.Equal(operandAsTime) + } + case reflect.Float64: + operandFloat64 := operand.Interface().(float64) + var v float64 + // try our best to convert value from tags to float64 + switch vt := value.(type) { + case float64: + v = vt + case float32: + v = float64(vt) + case int: + v = float64(vt) + case int8: + v = float64(vt) + case int16: + v = float64(vt) + case int32: + v = float64(vt) + case int64: + v = float64(vt) + default: // fail for all other types + panic(fmt.Sprintf("Incomparable types: %T (%v) vs float64 (%v)", value, value, operandFloat64)) + } + switch op { + case OpLessEqual: + return v <= operandFloat64 + case OpGreaterEqual: + return v >= operandFloat64 + case OpLess: + return v < operandFloat64 + case OpGreater: + return v > operandFloat64 + case OpEqual: + return v == operandFloat64 + } + case reflect.Int64: + operandInt := operand.Interface().(int64) + var v int64 + // try our best to convert value from tags to int64 + switch vt := value.(type) { + case int64: + v = vt + case int8: + v = int64(vt) + case int16: + v = int64(vt) + case int32: + v = int64(vt) + case int: + v = int64(vt) + case float64: + v = int64(vt) + case float32: + v = int64(vt) + default: // fail for all other types + panic(fmt.Sprintf("Incomparable types: %T (%v) vs int64 (%v)", value, value, operandInt)) + } + switch op { + case OpLessEqual: + return v <= operandInt + case OpGreaterEqual: + return v >= operandInt + case OpLess: + return v < operandInt + case OpGreater: + return v > operandInt + case OpEqual: + return v == operandInt + } + case reflect.String: + v, ok := value.(string) + if !ok { // if value from tags is not string + return false + } + switch op { + case OpEqual: + return v == operand.String() + case OpContains: + return strings.Contains(v, operand.String()) + } + default: + panic(fmt.Sprintf("Unknown kind of operand %v", operand.Kind())) + } + + return false +} diff --git a/libs/pubsub/query/query.peg b/libs/pubsub/query/query.peg new file mode 100644 index 000000000..739892e4f --- /dev/null +++ b/libs/pubsub/query/query.peg @@ -0,0 +1,33 @@ +package query + +type QueryParser Peg { +} + +e <- '\"' condition ( ' '+ and ' '+ condition )* '\"' !. + +condition <- tag ' '* (le ' '* (number / time / date) + / ge ' '* (number / time / date) + / l ' '* (number / time / date) + / g ' '* (number / time / date) + / equal ' '* (number / time / date / value) + / contains ' '* value + ) + +tag <- < (![ \t\n\r\\()"'=><] .)+ > +value <- < '\'' (!["'] .)* '\''> +number <- < ('0' + / [1-9] digit* ('.' digit*)?) > +digit <- [0-9] +time <- "TIME " < year '-' month '-' day 'T' digit digit ':' digit digit ':' digit digit (('-' / '+') digit digit ':' digit digit / 'Z') > +date <- "DATE " < year '-' month '-' day > +year <- ('1' / '2') digit digit digit +month <- ('0' / '1') digit +day <- ('0' / '1' / '2' / '3') digit +and <- "AND" + +equal <- "=" +contains <- "CONTAINS" +le <- "<=" +ge <- ">=" +l <- "<" +g <- ">" diff --git a/libs/pubsub/query/query.peg.go b/libs/pubsub/query/query.peg.go new file mode 100644 index 000000000..c86e4a47f --- /dev/null +++ b/libs/pubsub/query/query.peg.go @@ -0,0 +1,1553 @@ +// nolint +package query + +import ( + "fmt" + "math" + "sort" + "strconv" +) + +const endSymbol rune = 1114112 + +/* The rule types inferred from the grammar are below. */ +type pegRule uint8 + +const ( + ruleUnknown pegRule = iota + rulee + rulecondition + ruletag + rulevalue + rulenumber + ruledigit + ruletime + ruledate + ruleyear + rulemonth + ruleday + ruleand + ruleequal + rulecontains + rulele + rulege + rulel + ruleg + rulePegText +) + +var rul3s = [...]string{ + "Unknown", + "e", + "condition", + "tag", + "value", + "number", + "digit", + "time", + "date", + "year", + "month", + "day", + "and", + "equal", + "contains", + "le", + "ge", + "l", + "g", + "PegText", +} + +type token32 struct { + pegRule + begin, end uint32 +} + +func (t *token32) String() string { + return fmt.Sprintf("\x1B[34m%v\x1B[m %v %v", rul3s[t.pegRule], t.begin, t.end) +} + +type node32 struct { + token32 + up, next *node32 +} + +func (node *node32) print(pretty bool, buffer string) { + var print func(node *node32, depth int) + print = func(node *node32, depth int) { + for node != nil { + for c := 0; c < depth; c++ { + fmt.Printf(" ") + } + rule := rul3s[node.pegRule] + quote := strconv.Quote(string(([]rune(buffer)[node.begin:node.end]))) + if !pretty { + fmt.Printf("%v %v\n", rule, quote) + } else { + fmt.Printf("\x1B[34m%v\x1B[m %v\n", rule, quote) + } + if node.up != nil { + print(node.up, depth+1) + } + node = node.next + } + } + print(node, 0) +} + +func (node *node32) Print(buffer string) { + node.print(false, buffer) +} + +func (node *node32) PrettyPrint(buffer string) { + node.print(true, buffer) +} + +type tokens32 struct { + tree []token32 +} + +func (t *tokens32) Trim(length uint32) { + t.tree = t.tree[:length] +} + +func (t *tokens32) Print() { + for _, token := range t.tree { + fmt.Println(token.String()) + } +} + +func (t *tokens32) AST() *node32 { + type element struct { + node *node32 + down *element + } + tokens := t.Tokens() + var stack *element + for _, token := range tokens { + if token.begin == token.end { + continue + } + node := &node32{token32: token} + for stack != nil && stack.node.begin >= token.begin && stack.node.end <= token.end { + stack.node.next = node.up + node.up = stack.node + stack = stack.down + } + stack = &element{node: node, down: stack} + } + if stack != nil { + return stack.node + } + return nil +} + +func (t *tokens32) PrintSyntaxTree(buffer string) { + t.AST().Print(buffer) +} + +func (t *tokens32) PrettyPrintSyntaxTree(buffer string) { + t.AST().PrettyPrint(buffer) +} + +func (t *tokens32) Add(rule pegRule, begin, end, index uint32) { + if tree := t.tree; int(index) >= len(tree) { + expanded := make([]token32, 2*len(tree)) + copy(expanded, tree) + t.tree = expanded + } + t.tree[index] = token32{ + pegRule: rule, + begin: begin, + end: end, + } +} + +func (t *tokens32) Tokens() []token32 { + return t.tree +} + +type QueryParser struct { + Buffer string + buffer []rune + rules [20]func() bool + parse func(rule ...int) error + reset func() + Pretty bool + tokens32 +} + +func (p *QueryParser) Parse(rule ...int) error { + return p.parse(rule...) +} + +func (p *QueryParser) Reset() { + p.reset() +} + +type textPosition struct { + line, symbol int +} + +type textPositionMap map[int]textPosition + +func translatePositions(buffer []rune, positions []int) textPositionMap { + length, translations, j, line, symbol := len(positions), make(textPositionMap, len(positions)), 0, 1, 0 + sort.Ints(positions) + +search: + for i, c := range buffer { + if c == '\n' { + line, symbol = line+1, 0 + } else { + symbol++ + } + if i == positions[j] { + translations[positions[j]] = textPosition{line, symbol} + for j++; j < length; j++ { + if i != positions[j] { + continue search + } + } + break search + } + } + + return translations +} + +type parseError struct { + p *QueryParser + max token32 +} + +func (e *parseError) Error() string { + tokens, error := []token32{e.max}, "\n" + positions, p := make([]int, 2*len(tokens)), 0 + for _, token := range tokens { + positions[p], p = int(token.begin), p+1 + positions[p], p = int(token.end), p+1 + } + translations := translatePositions(e.p.buffer, positions) + format := "parse error near %v (line %v symbol %v - line %v symbol %v):\n%v\n" + if e.p.Pretty { + format = "parse error near \x1B[34m%v\x1B[m (line %v symbol %v - line %v symbol %v):\n%v\n" + } + for _, token := range tokens { + begin, end := int(token.begin), int(token.end) + error += fmt.Sprintf(format, + rul3s[token.pegRule], + translations[begin].line, translations[begin].symbol, + translations[end].line, translations[end].symbol, + strconv.Quote(string(e.p.buffer[begin:end]))) + } + + return error +} + +func (p *QueryParser) PrintSyntaxTree() { + if p.Pretty { + p.tokens32.PrettyPrintSyntaxTree(p.Buffer) + } else { + p.tokens32.PrintSyntaxTree(p.Buffer) + } +} + +func (p *QueryParser) Init() { + var ( + max token32 + position, tokenIndex uint32 + buffer []rune + ) + p.reset = func() { + max = token32{} + position, tokenIndex = 0, 0 + + p.buffer = []rune(p.Buffer) + if len(p.buffer) == 0 || p.buffer[len(p.buffer)-1] != endSymbol { + p.buffer = append(p.buffer, endSymbol) + } + buffer = p.buffer + } + p.reset() + + _rules := p.rules + tree := tokens32{tree: make([]token32, math.MaxInt16)} + p.parse = func(rule ...int) error { + r := 1 + if len(rule) > 0 { + r = rule[0] + } + matches := p.rules[r]() + p.tokens32 = tree + if matches { + p.Trim(tokenIndex) + return nil + } + return &parseError{p, max} + } + + add := func(rule pegRule, begin uint32) { + tree.Add(rule, begin, position, tokenIndex) + tokenIndex++ + if begin != position && position > max.end { + max = token32{rule, begin, position} + } + } + + matchDot := func() bool { + if buffer[position] != endSymbol { + position++ + return true + } + return false + } + + /*matchChar := func(c byte) bool { + if buffer[position] == c { + position++ + return true + } + return false + }*/ + + /*matchRange := func(lower byte, upper byte) bool { + if c := buffer[position]; c >= lower && c <= upper { + position++ + return true + } + return false + }*/ + + _rules = [...]func() bool{ + nil, + /* 0 e <- <('"' condition (' '+ and ' '+ condition)* '"' !.)> */ + func() bool { + position0, tokenIndex0 := position, tokenIndex + { + position1 := position + if buffer[position] != rune('"') { + goto l0 + } + position++ + if !_rules[rulecondition]() { + goto l0 + } + l2: + { + position3, tokenIndex3 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l3 + } + position++ + l4: + { + position5, tokenIndex5 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l5 + } + position++ + goto l4 + l5: + position, tokenIndex = position5, tokenIndex5 + } + { + position6 := position + { + position7, tokenIndex7 := position, tokenIndex + if buffer[position] != rune('a') { + goto l8 + } + position++ + goto l7 + l8: + position, tokenIndex = position7, tokenIndex7 + if buffer[position] != rune('A') { + goto l3 + } + position++ + } + l7: + { + position9, tokenIndex9 := position, tokenIndex + if buffer[position] != rune('n') { + goto l10 + } + position++ + goto l9 + l10: + position, tokenIndex = position9, tokenIndex9 + if buffer[position] != rune('N') { + goto l3 + } + position++ + } + l9: + { + position11, tokenIndex11 := position, tokenIndex + if buffer[position] != rune('d') { + goto l12 + } + position++ + goto l11 + l12: + position, tokenIndex = position11, tokenIndex11 + if buffer[position] != rune('D') { + goto l3 + } + position++ + } + l11: + add(ruleand, position6) + } + if buffer[position] != rune(' ') { + goto l3 + } + position++ + l13: + { + position14, tokenIndex14 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l14 + } + position++ + goto l13 + l14: + position, tokenIndex = position14, tokenIndex14 + } + if !_rules[rulecondition]() { + goto l3 + } + goto l2 + l3: + position, tokenIndex = position3, tokenIndex3 + } + if buffer[position] != rune('"') { + goto l0 + } + position++ + { + position15, tokenIndex15 := position, tokenIndex + if !matchDot() { + goto l15 + } + goto l0 + l15: + position, tokenIndex = position15, tokenIndex15 + } + add(rulee, position1) + } + return true + l0: + position, tokenIndex = position0, tokenIndex0 + return false + }, + /* 1 condition <- <(tag ' '* ((le ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number))) / (ge ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number))) / ((&('=') (equal ' '* ((&('\'') value) | (&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number)))) | (&('>') (g ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number)))) | (&('<') (l ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number)))) | (&('C' | 'c') (contains ' '* value)))))> */ + func() bool { + position16, tokenIndex16 := position, tokenIndex + { + position17 := position + { + position18 := position + { + position19 := position + { + position22, tokenIndex22 := position, tokenIndex + { + switch buffer[position] { + case '<': + if buffer[position] != rune('<') { + goto l22 + } + position++ + break + case '>': + if buffer[position] != rune('>') { + goto l22 + } + position++ + break + case '=': + if buffer[position] != rune('=') { + goto l22 + } + position++ + break + case '\'': + if buffer[position] != rune('\'') { + goto l22 + } + position++ + break + case '"': + if buffer[position] != rune('"') { + goto l22 + } + position++ + break + case ')': + if buffer[position] != rune(')') { + goto l22 + } + position++ + break + case '(': + if buffer[position] != rune('(') { + goto l22 + } + position++ + break + case '\\': + if buffer[position] != rune('\\') { + goto l22 + } + position++ + break + case '\r': + if buffer[position] != rune('\r') { + goto l22 + } + position++ + break + case '\n': + if buffer[position] != rune('\n') { + goto l22 + } + position++ + break + case '\t': + if buffer[position] != rune('\t') { + goto l22 + } + position++ + break + default: + if buffer[position] != rune(' ') { + goto l22 + } + position++ + break + } + } + + goto l16 + l22: + position, tokenIndex = position22, tokenIndex22 + } + if !matchDot() { + goto l16 + } + l20: + { + position21, tokenIndex21 := position, tokenIndex + { + position24, tokenIndex24 := position, tokenIndex + { + switch buffer[position] { + case '<': + if buffer[position] != rune('<') { + goto l24 + } + position++ + break + case '>': + if buffer[position] != rune('>') { + goto l24 + } + position++ + break + case '=': + if buffer[position] != rune('=') { + goto l24 + } + position++ + break + case '\'': + if buffer[position] != rune('\'') { + goto l24 + } + position++ + break + case '"': + if buffer[position] != rune('"') { + goto l24 + } + position++ + break + case ')': + if buffer[position] != rune(')') { + goto l24 + } + position++ + break + case '(': + if buffer[position] != rune('(') { + goto l24 + } + position++ + break + case '\\': + if buffer[position] != rune('\\') { + goto l24 + } + position++ + break + case '\r': + if buffer[position] != rune('\r') { + goto l24 + } + position++ + break + case '\n': + if buffer[position] != rune('\n') { + goto l24 + } + position++ + break + case '\t': + if buffer[position] != rune('\t') { + goto l24 + } + position++ + break + default: + if buffer[position] != rune(' ') { + goto l24 + } + position++ + break + } + } + + goto l21 + l24: + position, tokenIndex = position24, tokenIndex24 + } + if !matchDot() { + goto l21 + } + goto l20 + l21: + position, tokenIndex = position21, tokenIndex21 + } + add(rulePegText, position19) + } + add(ruletag, position18) + } + l26: + { + position27, tokenIndex27 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l27 + } + position++ + goto l26 + l27: + position, tokenIndex = position27, tokenIndex27 + } + { + position28, tokenIndex28 := position, tokenIndex + { + position30 := position + if buffer[position] != rune('<') { + goto l29 + } + position++ + if buffer[position] != rune('=') { + goto l29 + } + position++ + add(rulele, position30) + } + l31: + { + position32, tokenIndex32 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l32 + } + position++ + goto l31 + l32: + position, tokenIndex = position32, tokenIndex32 + } + { + switch buffer[position] { + case 'D', 'd': + if !_rules[ruledate]() { + goto l29 + } + break + case 'T', 't': + if !_rules[ruletime]() { + goto l29 + } + break + default: + if !_rules[rulenumber]() { + goto l29 + } + break + } + } + + goto l28 + l29: + position, tokenIndex = position28, tokenIndex28 + { + position35 := position + if buffer[position] != rune('>') { + goto l34 + } + position++ + if buffer[position] != rune('=') { + goto l34 + } + position++ + add(rulege, position35) + } + l36: + { + position37, tokenIndex37 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l37 + } + position++ + goto l36 + l37: + position, tokenIndex = position37, tokenIndex37 + } + { + switch buffer[position] { + case 'D', 'd': + if !_rules[ruledate]() { + goto l34 + } + break + case 'T', 't': + if !_rules[ruletime]() { + goto l34 + } + break + default: + if !_rules[rulenumber]() { + goto l34 + } + break + } + } + + goto l28 + l34: + position, tokenIndex = position28, tokenIndex28 + { + switch buffer[position] { + case '=': + { + position40 := position + if buffer[position] != rune('=') { + goto l16 + } + position++ + add(ruleequal, position40) + } + l41: + { + position42, tokenIndex42 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l42 + } + position++ + goto l41 + l42: + position, tokenIndex = position42, tokenIndex42 + } + { + switch buffer[position] { + case '\'': + if !_rules[rulevalue]() { + goto l16 + } + break + case 'D', 'd': + if !_rules[ruledate]() { + goto l16 + } + break + case 'T', 't': + if !_rules[ruletime]() { + goto l16 + } + break + default: + if !_rules[rulenumber]() { + goto l16 + } + break + } + } + + break + case '>': + { + position44 := position + if buffer[position] != rune('>') { + goto l16 + } + position++ + add(ruleg, position44) + } + l45: + { + position46, tokenIndex46 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l46 + } + position++ + goto l45 + l46: + position, tokenIndex = position46, tokenIndex46 + } + { + switch buffer[position] { + case 'D', 'd': + if !_rules[ruledate]() { + goto l16 + } + break + case 'T', 't': + if !_rules[ruletime]() { + goto l16 + } + break + default: + if !_rules[rulenumber]() { + goto l16 + } + break + } + } + + break + case '<': + { + position48 := position + if buffer[position] != rune('<') { + goto l16 + } + position++ + add(rulel, position48) + } + l49: + { + position50, tokenIndex50 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l50 + } + position++ + goto l49 + l50: + position, tokenIndex = position50, tokenIndex50 + } + { + switch buffer[position] { + case 'D', 'd': + if !_rules[ruledate]() { + goto l16 + } + break + case 'T', 't': + if !_rules[ruletime]() { + goto l16 + } + break + default: + if !_rules[rulenumber]() { + goto l16 + } + break + } + } + + break + default: + { + position52 := position + { + position53, tokenIndex53 := position, tokenIndex + if buffer[position] != rune('c') { + goto l54 + } + position++ + goto l53 + l54: + position, tokenIndex = position53, tokenIndex53 + if buffer[position] != rune('C') { + goto l16 + } + position++ + } + l53: + { + position55, tokenIndex55 := position, tokenIndex + if buffer[position] != rune('o') { + goto l56 + } + position++ + goto l55 + l56: + position, tokenIndex = position55, tokenIndex55 + if buffer[position] != rune('O') { + goto l16 + } + position++ + } + l55: + { + position57, tokenIndex57 := position, tokenIndex + if buffer[position] != rune('n') { + goto l58 + } + position++ + goto l57 + l58: + position, tokenIndex = position57, tokenIndex57 + if buffer[position] != rune('N') { + goto l16 + } + position++ + } + l57: + { + position59, tokenIndex59 := position, tokenIndex + if buffer[position] != rune('t') { + goto l60 + } + position++ + goto l59 + l60: + position, tokenIndex = position59, tokenIndex59 + if buffer[position] != rune('T') { + goto l16 + } + position++ + } + l59: + { + position61, tokenIndex61 := position, tokenIndex + if buffer[position] != rune('a') { + goto l62 + } + position++ + goto l61 + l62: + position, tokenIndex = position61, tokenIndex61 + if buffer[position] != rune('A') { + goto l16 + } + position++ + } + l61: + { + position63, tokenIndex63 := position, tokenIndex + if buffer[position] != rune('i') { + goto l64 + } + position++ + goto l63 + l64: + position, tokenIndex = position63, tokenIndex63 + if buffer[position] != rune('I') { + goto l16 + } + position++ + } + l63: + { + position65, tokenIndex65 := position, tokenIndex + if buffer[position] != rune('n') { + goto l66 + } + position++ + goto l65 + l66: + position, tokenIndex = position65, tokenIndex65 + if buffer[position] != rune('N') { + goto l16 + } + position++ + } + l65: + { + position67, tokenIndex67 := position, tokenIndex + if buffer[position] != rune('s') { + goto l68 + } + position++ + goto l67 + l68: + position, tokenIndex = position67, tokenIndex67 + if buffer[position] != rune('S') { + goto l16 + } + position++ + } + l67: + add(rulecontains, position52) + } + l69: + { + position70, tokenIndex70 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l70 + } + position++ + goto l69 + l70: + position, tokenIndex = position70, tokenIndex70 + } + if !_rules[rulevalue]() { + goto l16 + } + break + } + } + + } + l28: + add(rulecondition, position17) + } + return true + l16: + position, tokenIndex = position16, tokenIndex16 + return false + }, + /* 2 tag <- <<(!((&('<') '<') | (&('>') '>') | (&('=') '=') | (&('\'') '\'') | (&('"') '"') | (&(')') ')') | (&('(') '(') | (&('\\') '\\') | (&('\r') '\r') | (&('\n') '\n') | (&('\t') '\t') | (&(' ') ' ')) .)+>> */ + nil, + /* 3 value <- <<('\'' (!('"' / '\'') .)* '\'')>> */ + func() bool { + position72, tokenIndex72 := position, tokenIndex + { + position73 := position + { + position74 := position + if buffer[position] != rune('\'') { + goto l72 + } + position++ + l75: + { + position76, tokenIndex76 := position, tokenIndex + { + position77, tokenIndex77 := position, tokenIndex + { + position78, tokenIndex78 := position, tokenIndex + if buffer[position] != rune('"') { + goto l79 + } + position++ + goto l78 + l79: + position, tokenIndex = position78, tokenIndex78 + if buffer[position] != rune('\'') { + goto l77 + } + position++ + } + l78: + goto l76 + l77: + position, tokenIndex = position77, tokenIndex77 + } + if !matchDot() { + goto l76 + } + goto l75 + l76: + position, tokenIndex = position76, tokenIndex76 + } + if buffer[position] != rune('\'') { + goto l72 + } + position++ + add(rulePegText, position74) + } + add(rulevalue, position73) + } + return true + l72: + position, tokenIndex = position72, tokenIndex72 + return false + }, + /* 4 number <- <<('0' / ([1-9] digit* ('.' digit*)?))>> */ + func() bool { + position80, tokenIndex80 := position, tokenIndex + { + position81 := position + { + position82 := position + { + position83, tokenIndex83 := position, tokenIndex + if buffer[position] != rune('0') { + goto l84 + } + position++ + goto l83 + l84: + position, tokenIndex = position83, tokenIndex83 + if c := buffer[position]; c < rune('1') || c > rune('9') { + goto l80 + } + position++ + l85: + { + position86, tokenIndex86 := position, tokenIndex + if !_rules[ruledigit]() { + goto l86 + } + goto l85 + l86: + position, tokenIndex = position86, tokenIndex86 + } + { + position87, tokenIndex87 := position, tokenIndex + if buffer[position] != rune('.') { + goto l87 + } + position++ + l89: + { + position90, tokenIndex90 := position, tokenIndex + if !_rules[ruledigit]() { + goto l90 + } + goto l89 + l90: + position, tokenIndex = position90, tokenIndex90 + } + goto l88 + l87: + position, tokenIndex = position87, tokenIndex87 + } + l88: + } + l83: + add(rulePegText, position82) + } + add(rulenumber, position81) + } + return true + l80: + position, tokenIndex = position80, tokenIndex80 + return false + }, + /* 5 digit <- <[0-9]> */ + func() bool { + position91, tokenIndex91 := position, tokenIndex + { + position92 := position + if c := buffer[position]; c < rune('0') || c > rune('9') { + goto l91 + } + position++ + add(ruledigit, position92) + } + return true + l91: + position, tokenIndex = position91, tokenIndex91 + return false + }, + /* 6 time <- <(('t' / 'T') ('i' / 'I') ('m' / 'M') ('e' / 'E') ' ' <(year '-' month '-' day 'T' digit digit ':' digit digit ':' digit digit ((('-' / '+') digit digit ':' digit digit) / 'Z'))>)> */ + func() bool { + position93, tokenIndex93 := position, tokenIndex + { + position94 := position + { + position95, tokenIndex95 := position, tokenIndex + if buffer[position] != rune('t') { + goto l96 + } + position++ + goto l95 + l96: + position, tokenIndex = position95, tokenIndex95 + if buffer[position] != rune('T') { + goto l93 + } + position++ + } + l95: + { + position97, tokenIndex97 := position, tokenIndex + if buffer[position] != rune('i') { + goto l98 + } + position++ + goto l97 + l98: + position, tokenIndex = position97, tokenIndex97 + if buffer[position] != rune('I') { + goto l93 + } + position++ + } + l97: + { + position99, tokenIndex99 := position, tokenIndex + if buffer[position] != rune('m') { + goto l100 + } + position++ + goto l99 + l100: + position, tokenIndex = position99, tokenIndex99 + if buffer[position] != rune('M') { + goto l93 + } + position++ + } + l99: + { + position101, tokenIndex101 := position, tokenIndex + if buffer[position] != rune('e') { + goto l102 + } + position++ + goto l101 + l102: + position, tokenIndex = position101, tokenIndex101 + if buffer[position] != rune('E') { + goto l93 + } + position++ + } + l101: + if buffer[position] != rune(' ') { + goto l93 + } + position++ + { + position103 := position + if !_rules[ruleyear]() { + goto l93 + } + if buffer[position] != rune('-') { + goto l93 + } + position++ + if !_rules[rulemonth]() { + goto l93 + } + if buffer[position] != rune('-') { + goto l93 + } + position++ + if !_rules[ruleday]() { + goto l93 + } + if buffer[position] != rune('T') { + goto l93 + } + position++ + if !_rules[ruledigit]() { + goto l93 + } + if !_rules[ruledigit]() { + goto l93 + } + if buffer[position] != rune(':') { + goto l93 + } + position++ + if !_rules[ruledigit]() { + goto l93 + } + if !_rules[ruledigit]() { + goto l93 + } + if buffer[position] != rune(':') { + goto l93 + } + position++ + if !_rules[ruledigit]() { + goto l93 + } + if !_rules[ruledigit]() { + goto l93 + } + { + position104, tokenIndex104 := position, tokenIndex + { + position106, tokenIndex106 := position, tokenIndex + if buffer[position] != rune('-') { + goto l107 + } + position++ + goto l106 + l107: + position, tokenIndex = position106, tokenIndex106 + if buffer[position] != rune('+') { + goto l105 + } + position++ + } + l106: + if !_rules[ruledigit]() { + goto l105 + } + if !_rules[ruledigit]() { + goto l105 + } + if buffer[position] != rune(':') { + goto l105 + } + position++ + if !_rules[ruledigit]() { + goto l105 + } + if !_rules[ruledigit]() { + goto l105 + } + goto l104 + l105: + position, tokenIndex = position104, tokenIndex104 + if buffer[position] != rune('Z') { + goto l93 + } + position++ + } + l104: + add(rulePegText, position103) + } + add(ruletime, position94) + } + return true + l93: + position, tokenIndex = position93, tokenIndex93 + return false + }, + /* 7 date <- <(('d' / 'D') ('a' / 'A') ('t' / 'T') ('e' / 'E') ' ' <(year '-' month '-' day)>)> */ + func() bool { + position108, tokenIndex108 := position, tokenIndex + { + position109 := position + { + position110, tokenIndex110 := position, tokenIndex + if buffer[position] != rune('d') { + goto l111 + } + position++ + goto l110 + l111: + position, tokenIndex = position110, tokenIndex110 + if buffer[position] != rune('D') { + goto l108 + } + position++ + } + l110: + { + position112, tokenIndex112 := position, tokenIndex + if buffer[position] != rune('a') { + goto l113 + } + position++ + goto l112 + l113: + position, tokenIndex = position112, tokenIndex112 + if buffer[position] != rune('A') { + goto l108 + } + position++ + } + l112: + { + position114, tokenIndex114 := position, tokenIndex + if buffer[position] != rune('t') { + goto l115 + } + position++ + goto l114 + l115: + position, tokenIndex = position114, tokenIndex114 + if buffer[position] != rune('T') { + goto l108 + } + position++ + } + l114: + { + position116, tokenIndex116 := position, tokenIndex + if buffer[position] != rune('e') { + goto l117 + } + position++ + goto l116 + l117: + position, tokenIndex = position116, tokenIndex116 + if buffer[position] != rune('E') { + goto l108 + } + position++ + } + l116: + if buffer[position] != rune(' ') { + goto l108 + } + position++ + { + position118 := position + if !_rules[ruleyear]() { + goto l108 + } + if buffer[position] != rune('-') { + goto l108 + } + position++ + if !_rules[rulemonth]() { + goto l108 + } + if buffer[position] != rune('-') { + goto l108 + } + position++ + if !_rules[ruleday]() { + goto l108 + } + add(rulePegText, position118) + } + add(ruledate, position109) + } + return true + l108: + position, tokenIndex = position108, tokenIndex108 + return false + }, + /* 8 year <- <(('1' / '2') digit digit digit)> */ + func() bool { + position119, tokenIndex119 := position, tokenIndex + { + position120 := position + { + position121, tokenIndex121 := position, tokenIndex + if buffer[position] != rune('1') { + goto l122 + } + position++ + goto l121 + l122: + position, tokenIndex = position121, tokenIndex121 + if buffer[position] != rune('2') { + goto l119 + } + position++ + } + l121: + if !_rules[ruledigit]() { + goto l119 + } + if !_rules[ruledigit]() { + goto l119 + } + if !_rules[ruledigit]() { + goto l119 + } + add(ruleyear, position120) + } + return true + l119: + position, tokenIndex = position119, tokenIndex119 + return false + }, + /* 9 month <- <(('0' / '1') digit)> */ + func() bool { + position123, tokenIndex123 := position, tokenIndex + { + position124 := position + { + position125, tokenIndex125 := position, tokenIndex + if buffer[position] != rune('0') { + goto l126 + } + position++ + goto l125 + l126: + position, tokenIndex = position125, tokenIndex125 + if buffer[position] != rune('1') { + goto l123 + } + position++ + } + l125: + if !_rules[ruledigit]() { + goto l123 + } + add(rulemonth, position124) + } + return true + l123: + position, tokenIndex = position123, tokenIndex123 + return false + }, + /* 10 day <- <(((&('3') '3') | (&('2') '2') | (&('1') '1') | (&('0') '0')) digit)> */ + func() bool { + position127, tokenIndex127 := position, tokenIndex + { + position128 := position + { + switch buffer[position] { + case '3': + if buffer[position] != rune('3') { + goto l127 + } + position++ + break + case '2': + if buffer[position] != rune('2') { + goto l127 + } + position++ + break + case '1': + if buffer[position] != rune('1') { + goto l127 + } + position++ + break + default: + if buffer[position] != rune('0') { + goto l127 + } + position++ + break + } + } + + if !_rules[ruledigit]() { + goto l127 + } + add(ruleday, position128) + } + return true + l127: + position, tokenIndex = position127, tokenIndex127 + return false + }, + /* 11 and <- <(('a' / 'A') ('n' / 'N') ('d' / 'D'))> */ + nil, + /* 12 equal <- <'='> */ + nil, + /* 13 contains <- <(('c' / 'C') ('o' / 'O') ('n' / 'N') ('t' / 'T') ('a' / 'A') ('i' / 'I') ('n' / 'N') ('s' / 'S'))> */ + nil, + /* 14 le <- <('<' '=')> */ + nil, + /* 15 ge <- <('>' '=')> */ + nil, + /* 16 l <- <'<'> */ + nil, + /* 17 g <- <'>'> */ + nil, + nil, + } + p.rules = _rules +} diff --git a/libs/pubsub/query/query_test.go b/libs/pubsub/query/query_test.go new file mode 100644 index 000000000..f266b1214 --- /dev/null +++ b/libs/pubsub/query/query_test.go @@ -0,0 +1,87 @@ +package query_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/tendermint/tendermint/libs/pubsub" + "github.com/tendermint/tendermint/libs/pubsub/query" +) + +func TestMatches(t *testing.T) { + const shortForm = "2006-Jan-02" + txDate, err := time.Parse(shortForm, "2017-Jan-01") + require.NoError(t, err) + txTime, err := time.Parse(time.RFC3339, "2018-05-03T14:45:00Z") + require.NoError(t, err) + + testCases := []struct { + s string + tags map[string]interface{} + err bool + matches bool + }{ + {"tm.events.type='NewBlock'", map[string]interface{}{"tm.events.type": "NewBlock"}, false, true}, + + {"tx.gas > 7", map[string]interface{}{"tx.gas": 8}, false, true}, + {"tx.gas > 7 AND tx.gas < 9", map[string]interface{}{"tx.gas": 8}, false, true}, + {"body.weight >= 3.5", map[string]interface{}{"body.weight": 3.5}, false, true}, + {"account.balance < 1000.0", map[string]interface{}{"account.balance": 900}, false, true}, + {"apples.kg <= 4", map[string]interface{}{"apples.kg": 4.0}, false, true}, + {"body.weight >= 4.5", map[string]interface{}{"body.weight": float32(4.5)}, false, true}, + {"oranges.kg < 4 AND watermellons.kg > 10", map[string]interface{}{"oranges.kg": 3, "watermellons.kg": 12}, false, true}, + {"peaches.kg < 4", map[string]interface{}{"peaches.kg": 5}, false, false}, + + {"tx.date > DATE 2017-01-01", map[string]interface{}{"tx.date": time.Now()}, false, true}, + {"tx.date = DATE 2017-01-01", map[string]interface{}{"tx.date": txDate}, false, true}, + {"tx.date = DATE 2018-01-01", map[string]interface{}{"tx.date": txDate}, false, false}, + + {"tx.time >= TIME 2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": time.Now()}, false, true}, + {"tx.time = TIME 2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": txTime}, false, false}, + + {"abci.owner.name CONTAINS 'Igor'", map[string]interface{}{"abci.owner.name": "Igor,Ivan"}, false, true}, + {"abci.owner.name CONTAINS 'Igor'", map[string]interface{}{"abci.owner.name": "Pavel,Ivan"}, false, false}, + } + + for _, tc := range testCases { + q, err := query.New(tc.s) + if !tc.err { + require.Nil(t, err) + } + + if tc.matches { + assert.True(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should match %v", tc.s, tc.tags) + } else { + assert.False(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should not match %v", tc.s, tc.tags) + } + } +} + +func TestMustParse(t *testing.T) { + assert.Panics(t, func() { query.MustParse("=") }) + assert.NotPanics(t, func() { query.MustParse("tm.events.type='NewBlock'") }) +} + +func TestConditions(t *testing.T) { + txTime, err := time.Parse(time.RFC3339, "2013-05-03T14:45:00Z") + require.NoError(t, err) + + testCases := []struct { + s string + conditions []query.Condition + }{ + {s: "tm.events.type='NewBlock'", conditions: []query.Condition{query.Condition{Tag: "tm.events.type", Op: query.OpEqual, Operand: "NewBlock"}}}, + {s: "tx.gas > 7 AND tx.gas < 9", conditions: []query.Condition{query.Condition{Tag: "tx.gas", Op: query.OpGreater, Operand: int64(7)}, query.Condition{Tag: "tx.gas", Op: query.OpLess, Operand: int64(9)}}}, + {s: "tx.time >= TIME 2013-05-03T14:45:00Z", conditions: []query.Condition{query.Condition{Tag: "tx.time", Op: query.OpGreaterEqual, Operand: txTime}}}, + } + + for _, tc := range testCases { + q, err := query.New(tc.s) + require.Nil(t, err) + + assert.Equal(t, tc.conditions, q.Conditions()) + } +} diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index ed1a5b32d..e25cac1f5 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -11,7 +11,7 @@ import ( rpcclient "github.com/tendermint/tendermint/rpc/lib/client" "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" - tmpubsub "github.com/tendermint/tmlibs/pubsub" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" ) /* diff --git a/rpc/client/localclient.go b/rpc/client/localclient.go index c9bdddf1c..d3eeb4261 100644 --- a/rpc/client/localclient.go +++ b/rpc/client/localclient.go @@ -8,7 +8,7 @@ import ( ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" - tmpubsub "github.com/tendermint/tmlibs/pubsub" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" ) /* diff --git a/rpc/core/events.go b/rpc/core/events.go index a46e0947c..36722fcf9 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -5,10 +5,10 @@ import ( "github.com/pkg/errors" + tmquery "github.com/tendermint/tendermint/libs/pubsub/query" ctypes "github.com/tendermint/tendermint/rpc/core/types" rpctypes "github.com/tendermint/tendermint/rpc/lib/types" tmtypes "github.com/tendermint/tendermint/types" - tmquery "github.com/tendermint/tmlibs/pubsub/query" ) // Subscribe for events via WebSocket. @@ -46,10 +46,10 @@ import ( // https://godoc.org/github.com/tendermint/tendermint/types#pkg-constants // // For complete query syntax, check out -// https://godoc.org/github.com/tendermint/tmlibs/pubsub/query. +// https://godoc.org/github.com/tendermint/tendermint/libs/pubsub/query. // // ```go -// import "github.com/tendermint/tmlibs/pubsub/query" +// import "github.com/tendermint/tendermint/libs/pubsub/query" // import "github.com/tendermint/tendermint/types" // // client := client.NewHTTP("tcp://0.0.0.0:46657", "/websocket") diff --git a/rpc/core/tx.go b/rpc/core/tx.go index 5fc01a86d..615136a92 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -3,11 +3,12 @@ package core import ( "fmt" + cmn "github.com/tendermint/tmlibs/common" + + tmquery "github.com/tendermint/tendermint/libs/pubsub/query" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/state/txindex/null" "github.com/tendermint/tendermint/types" - cmn "github.com/tendermint/tmlibs/common" - tmquery "github.com/tendermint/tmlibs/pubsub/query" ) // Tx allows you to query the transaction results. `nil` could mean the diff --git a/rpc/lib/types/types.go b/rpc/lib/types/types.go index 5fa723bb4..1eeb19ea8 100644 --- a/rpc/lib/types/types.go +++ b/rpc/lib/types/types.go @@ -8,7 +8,7 @@ import ( "github.com/pkg/errors" "github.com/tendermint/go-amino" - tmpubsub "github.com/tendermint/tmlibs/pubsub" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" ) //---------------------------------------- diff --git a/state/txindex/indexer.go b/state/txindex/indexer.go index e23840f14..bf7760fc8 100644 --- a/state/txindex/indexer.go +++ b/state/txindex/indexer.go @@ -4,7 +4,7 @@ import ( "errors" "github.com/tendermint/tendermint/types" - "github.com/tendermint/tmlibs/pubsub/query" + "github.com/tendermint/tendermint/libs/pubsub/query" ) // TxIndexer interface defines methods to index and search transactions. diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go index 93e6269e8..264be1fd8 100644 --- a/state/txindex/indexer_service.go +++ b/state/txindex/indexer_service.go @@ -3,8 +3,9 @@ package txindex import ( "context" - "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" + + "github.com/tendermint/tendermint/types" ) const ( diff --git a/state/txindex/kv/kv.go b/state/txindex/kv/kv.go index 87861f050..718a55d15 100644 --- a/state/txindex/kv/kv.go +++ b/state/txindex/kv/kv.go @@ -12,8 +12,8 @@ import ( "github.com/pkg/errors" cmn "github.com/tendermint/tmlibs/common" dbm "github.com/tendermint/tmlibs/db" - "github.com/tendermint/tmlibs/pubsub/query" + "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" ) diff --git a/state/txindex/kv/kv_test.go b/state/txindex/kv/kv_test.go index a8537219d..af35ec411 100644 --- a/state/txindex/kv/kv_test.go +++ b/state/txindex/kv/kv_test.go @@ -11,8 +11,8 @@ import ( abci "github.com/tendermint/abci/types" cmn "github.com/tendermint/tmlibs/common" db "github.com/tendermint/tmlibs/db" - "github.com/tendermint/tmlibs/pubsub/query" + "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" ) diff --git a/state/txindex/null/null.go b/state/txindex/null/null.go index 0764faa9e..2d3961e6b 100644 --- a/state/txindex/null/null.go +++ b/state/txindex/null/null.go @@ -5,7 +5,7 @@ import ( "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" - "github.com/tendermint/tmlibs/pubsub/query" + "github.com/tendermint/tendermint/libs/pubsub/query" ) var _ txindex.TxIndexer = (*TxIndex)(nil) diff --git a/types/event_bus.go b/types/event_bus.go index 460a3e294..925907fd0 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -6,7 +6,7 @@ import ( cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" - tmpubsub "github.com/tendermint/tmlibs/pubsub" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" ) const defaultCapacity = 1000 diff --git a/types/event_bus_test.go b/types/event_bus_test.go index 70a537745..95d061f40 100644 --- a/types/event_bus_test.go +++ b/types/event_bus_test.go @@ -12,8 +12,8 @@ import ( abci "github.com/tendermint/abci/types" cmn "github.com/tendermint/tmlibs/common" - tmpubsub "github.com/tendermint/tmlibs/pubsub" - tmquery "github.com/tendermint/tmlibs/pubsub/query" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" + tmquery "github.com/tendermint/tendermint/libs/pubsub/query" ) func TestEventBusPublishEventTx(t *testing.T) { diff --git a/types/events.go b/types/events.go index 342d4bc20..2b87297cd 100644 --- a/types/events.go +++ b/types/events.go @@ -3,9 +3,9 @@ package types import ( "fmt" - "github.com/tendermint/go-amino" - tmpubsub "github.com/tendermint/tmlibs/pubsub" - tmquery "github.com/tendermint/tmlibs/pubsub/query" + amino "github.com/tendermint/go-amino" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" + tmquery "github.com/tendermint/tendermint/libs/pubsub/query" ) // Reserved event types diff --git a/types/nop_event_bus.go b/types/nop_event_bus.go index 06b70987d..cd1eab8cd 100644 --- a/types/nop_event_bus.go +++ b/types/nop_event_bus.go @@ -3,7 +3,7 @@ package types import ( "context" - tmpubsub "github.com/tendermint/tmlibs/pubsub" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" ) type NopEventBus struct{}