diff --git a/.gitignore b/.gitignore
index e76fb1fc5..c67019bd9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,3 +25,5 @@ scripts/cutWALUntil/cutWALUntil
.idea/
*.iml
+
+libs/pubsub/query/fuzz_test/output
diff --git a/Gopkg.lock b/Gopkg.lock
index df971a47b..6e34258f4 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -278,12 +278,9 @@
"clist",
"common",
"db",
- "events",
"flowrate",
"log",
"merkle",
- "pubsub",
- "pubsub/query",
"test"
]
revision = "cc5f287c4798ffe88c04d02df219ecb6932080fd"
@@ -385,6 +382,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
- inputs-digest = "a88c20b6e36b3529d6fdcffc3603d9eb193fc3809de8afbba07bad990539b256"
+ inputs-digest = "d85c98dcac32cc1fe05d006aa75e8985f6447a150a041b972a673a65e7681da9"
solver-name = "gps-cdcl"
solver-version = 1
diff --git a/consensus/reactor.go b/consensus/reactor.go
index f171aa8d9..19b0c0fe2 100644
--- a/consensus/reactor.go
+++ b/consensus/reactor.go
@@ -10,10 +10,10 @@ import (
amino "github.com/tendermint/go-amino"
cmn "github.com/tendermint/tmlibs/common"
- tmevents "github.com/tendermint/tmlibs/events"
"github.com/tendermint/tmlibs/log"
cstypes "github.com/tendermint/tendermint/consensus/types"
+ tmevents "github.com/tendermint/tendermint/libs/events"
"github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
diff --git a/consensus/state.go b/consensus/state.go
index 0c6f7b487..3b713e2ec 100644
--- a/consensus/state.go
+++ b/consensus/state.go
@@ -11,11 +11,11 @@ import (
fail "github.com/ebuchman/fail-test"
cmn "github.com/tendermint/tmlibs/common"
- tmevents "github.com/tendermint/tmlibs/events"
"github.com/tendermint/tmlibs/log"
cfg "github.com/tendermint/tendermint/config"
cstypes "github.com/tendermint/tendermint/consensus/types"
+ tmevents "github.com/tendermint/tendermint/libs/events"
"github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
diff --git a/consensus/state_test.go b/consensus/state_test.go
index 0d7cad484..f4d79ca77 100644
--- a/consensus/state_test.go
+++ b/consensus/state_test.go
@@ -11,7 +11,7 @@ import (
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
- tmpubsub "github.com/tendermint/tmlibs/pubsub"
+ tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
)
func init() {
diff --git a/libs/events/Makefile b/libs/events/Makefile
new file mode 100644
index 000000000..696aafff1
--- /dev/null
+++ b/libs/events/Makefile
@@ -0,0 +1,9 @@
+.PHONY: docs test
+REPO:=github.com/tendermint/tendermint/libs/events
+
+docs:
+ @go get github.com/davecheney/godoc2md
+ godoc2md $(REPO) > README.md
+
+test:
+ go test -v ./...
diff --git a/libs/events/README.md b/libs/events/README.md
new file mode 100644
index 000000000..14aa498ff
--- /dev/null
+++ b/libs/events/README.md
@@ -0,0 +1,175 @@
+
+
+# events
+`import "github.com/tendermint/tendermint/libs/events"`
+
+* [Overview](#pkg-overview)
+* [Index](#pkg-index)
+
+## Overview
+Pub-Sub in go with event caching
+
+
+
+
+## Index
+* [type EventCache](#EventCache)
+ * [func NewEventCache(evsw Fireable) *EventCache](#NewEventCache)
+ * [func (evc *EventCache) FireEvent(event string, data EventData)](#EventCache.FireEvent)
+ * [func (evc *EventCache) Flush()](#EventCache.Flush)
+* [type EventCallback](#EventCallback)
+* [type EventData](#EventData)
+* [type EventSwitch](#EventSwitch)
+ * [func NewEventSwitch() EventSwitch](#NewEventSwitch)
+* [type Eventable](#Eventable)
+* [type Fireable](#Fireable)
+
+
+#### Package files
+[event_cache.go](/src/github.com/tendermint/tendermint/libs/events/event_cache.go) [events.go](/src/github.com/tendermint/tendermint/libs/events/events.go)
+
+
+
+
+
+
+## type [EventCache](/src/target/event_cache.go?s=116:179#L5)
+``` go
+type EventCache struct {
+ // contains filtered or unexported fields
+}
+```
+An EventCache buffers events for a Fireable
+All events are cached. Filtering happens on Flush
+
+
+
+
+
+
+
+### func [NewEventCache](/src/target/event_cache.go?s=239:284#L11)
+``` go
+func NewEventCache(evsw Fireable) *EventCache
+```
+Create a new EventCache with an EventSwitch as backend
+
+
+
+
+
+### func (\*EventCache) [FireEvent](/src/target/event_cache.go?s=449:511#L24)
+``` go
+func (evc *EventCache) FireEvent(event string, data EventData)
+```
+Cache an event to be fired upon finality.
+
+
+
+
+### func (\*EventCache) [Flush](/src/target/event_cache.go?s=735:765#L31)
+``` go
+func (evc *EventCache) Flush()
+```
+Fire events by running evsw.FireEvent on all cached events. Blocks.
+Clears cached events
+
+
+
+
+## type [EventCallback](/src/target/events.go?s=4201:4240#L185)
+``` go
+type EventCallback func(data EventData)
+```
+
+
+
+
+
+
+
+
+
+## type [EventData](/src/target/events.go?s=243:294#L14)
+``` go
+type EventData interface {
+}
+```
+Generic event data can be typed and registered with tendermint/go-amino
+via concrete implementation of this interface
+
+
+
+
+
+
+
+
+
+
+## type [EventSwitch](/src/target/events.go?s=560:771#L29)
+``` go
+type EventSwitch interface {
+ cmn.Service
+ Fireable
+
+ AddListenerForEvent(listenerID, event string, cb EventCallback)
+ RemoveListenerForEvent(event string, listenerID string)
+ RemoveListener(listenerID string)
+}
+```
+
+
+
+
+
+
+### func [NewEventSwitch](/src/target/events.go?s=917:950#L46)
+``` go
+func NewEventSwitch() EventSwitch
+```
+
+
+
+
+## type [Eventable](/src/target/events.go?s=378:440#L20)
+``` go
+type Eventable interface {
+ SetEventSwitch(evsw EventSwitch)
+}
+```
+reactors and other modules should export
+this interface to become eventable
+
+
+
+
+
+
+
+
+
+
+## type [Fireable](/src/target/events.go?s=490:558#L25)
+``` go
+type Fireable interface {
+ FireEvent(event string, data EventData)
+}
+```
+an event switch or cache implements fireable
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+- - -
+Generated by [godoc2md](http://godoc.org/github.com/davecheney/godoc2md)
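
For orientation, here is a minimal usage sketch of the events package documented above, assuming only the API shown in this diff; the listener ID and event name are invented for illustration:

```go
package main

import (
	"fmt"

	"github.com/tendermint/tendermint/libs/events"
)

func main() {
	// Create and start the switch (it is a cmn.Service).
	evsw := events.NewEventSwitch()
	if err := evsw.Start(); err != nil {
		panic(err)
	}
	defer evsw.Stop()

	// Register a callback for a named event; the ID and name are illustrative.
	evsw.AddListenerForEvent("my-listener", "my-event", func(data events.EventData) {
		fmt.Printf("got: %v\n", data)
	})

	// Fire an event; the callback above runs synchronously.
	evsw.FireEvent("my-event", "hello")

	// Remove the listener from all events when done.
	evsw.RemoveListener("my-listener")
}
```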
diff --git a/libs/events/event_cache.go b/libs/events/event_cache.go
new file mode 100644
index 000000000..f508e873d
--- /dev/null
+++ b/libs/events/event_cache.go
@@ -0,0 +1,37 @@
+package events
+
+// An EventCache buffers events for a Fireable
+// All events are cached. Filtering happens on Flush
+type EventCache struct {
+ evsw Fireable
+ events []eventInfo
+}
+
+// Create a new EventCache with an EventSwitch as backend
+func NewEventCache(evsw Fireable) *EventCache {
+ return &EventCache{
+ evsw: evsw,
+ }
+}
+
+// a cached event
+type eventInfo struct {
+ event string
+ data EventData
+}
+
+// Cache an event to be fired upon finality.
+func (evc *EventCache) FireEvent(event string, data EventData) {
+ // append to list (go will grow our backing array exponentially)
+ evc.events = append(evc.events, eventInfo{event, data})
+}
+
+// Fire events by running evsw.FireEvent on all cached events. Blocks.
+// Clears cached events
+func (evc *EventCache) Flush() {
+ for _, ei := range evc.events {
+ evc.evsw.FireEvent(ei.event, ei.data)
+ }
+ // Clear the buffer; since we only add to it with append, it's safe to just set it to nil and maybe save an allocation
+ evc.events = nil
+}
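
A short, hedged sketch of the caching pattern this file implements: events fired on an EventCache are buffered and only reach the underlying switch when Flush is called. The listener ID and event name below are illustrative:

```go
package main

import (
	"fmt"

	"github.com/tendermint/tendermint/libs/events"
)

func main() {
	evsw := events.NewEventSwitch()
	if err := evsw.Start(); err != nil {
		panic(err)
	}
	defer evsw.Stop()

	evsw.AddListenerForEvent("logger", "tx", func(data events.EventData) {
		fmt.Println("tx event:", data)
	})

	// Buffer events; nothing reaches the listener yet.
	evc := events.NewEventCache(evsw)
	evc.FireEvent("tx", "first")
	evc.FireEvent("tx", "second")

	// Deliver everything that was cached; the buffer is then cleared.
	evc.Flush()
}
```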
diff --git a/libs/events/event_cache_test.go b/libs/events/event_cache_test.go
new file mode 100644
index 000000000..ab321da3a
--- /dev/null
+++ b/libs/events/event_cache_test.go
@@ -0,0 +1,35 @@
+package events
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestEventCache_Flush(t *testing.T) {
+ evsw := NewEventSwitch()
+ evsw.Start()
+ evsw.AddListenerForEvent("nothingness", "", func(data EventData) {
+ // Check we are not initialising an empty buffer full of zeroed eventInfos in the EventCache
+ require.FailNow(t, "We should never receive a message on this switch since none are fired")
+ })
+ evc := NewEventCache(evsw)
+ evc.Flush()
+ // Check after reset
+ evc.Flush()
+ fail := true
+ pass := false
+ evsw.AddListenerForEvent("somethingness", "something", func(data EventData) {
+ if fail {
+ require.FailNow(t, "Shouldn't see a message until flushed")
+ }
+ pass = true
+ })
+ evc.FireEvent("something", struct{ int }{1})
+ evc.FireEvent("something", struct{ int }{2})
+ evc.FireEvent("something", struct{ int }{3})
+ fail = false
+ evc.Flush()
+ assert.True(t, pass)
+}
diff --git a/libs/events/events.go b/libs/events/events.go
new file mode 100644
index 000000000..f1b2a754e
--- /dev/null
+++ b/libs/events/events.go
@@ -0,0 +1,226 @@
+/*
+Pub-Sub in go with event caching
+*/
+package events
+
+import (
+ "sync"
+
+ cmn "github.com/tendermint/tmlibs/common"
+)
+
+// Generic event data can be typed and registered with tendermint/go-amino
+// via a concrete implementation of this interface
+type EventData interface {
+ //AssertIsEventData()
+}
+
+// reactors and other modules should export
+// this interface to become eventable
+type Eventable interface {
+ SetEventSwitch(evsw EventSwitch)
+}
+
+// An event switch or cache implements Fireable
+type Fireable interface {
+ FireEvent(event string, data EventData)
+}
+
+type EventSwitch interface {
+ cmn.Service
+ Fireable
+
+ AddListenerForEvent(listenerID, event string, cb EventCallback)
+ RemoveListenerForEvent(event string, listenerID string)
+ RemoveListener(listenerID string)
+}
+
+type eventSwitch struct {
+ cmn.BaseService
+
+ mtx sync.RWMutex
+ eventCells map[string]*eventCell
+ listeners map[string]*eventListener
+}
+
+func NewEventSwitch() EventSwitch {
+ evsw := &eventSwitch{}
+ evsw.BaseService = *cmn.NewBaseService(nil, "EventSwitch", evsw)
+ return evsw
+}
+
+func (evsw *eventSwitch) OnStart() error {
+ evsw.BaseService.OnStart()
+ evsw.eventCells = make(map[string]*eventCell)
+ evsw.listeners = make(map[string]*eventListener)
+ return nil
+}
+
+func (evsw *eventSwitch) OnStop() {
+ evsw.mtx.Lock()
+ defer evsw.mtx.Unlock()
+ evsw.BaseService.OnStop()
+ evsw.eventCells = nil
+ evsw.listeners = nil
+}
+
+func (evsw *eventSwitch) AddListenerForEvent(listenerID, event string, cb EventCallback) {
+ // Get/Create eventCell and listener
+ evsw.mtx.Lock()
+ eventCell := evsw.eventCells[event]
+ if eventCell == nil {
+ eventCell = newEventCell()
+ evsw.eventCells[event] = eventCell
+ }
+ listener := evsw.listeners[listenerID]
+ if listener == nil {
+ listener = newEventListener(listenerID)
+ evsw.listeners[listenerID] = listener
+ }
+ evsw.mtx.Unlock()
+
+ // Add event and listener
+ eventCell.AddListener(listenerID, cb)
+ listener.AddEvent(event)
+}
+
+func (evsw *eventSwitch) RemoveListener(listenerID string) {
+ // Get and remove listener
+ evsw.mtx.RLock()
+ listener := evsw.listeners[listenerID]
+ evsw.mtx.RUnlock()
+ if listener == nil {
+ return
+ }
+
+ evsw.mtx.Lock()
+ delete(evsw.listeners, listenerID)
+ evsw.mtx.Unlock()
+
+ // Remove callback for each event.
+ listener.SetRemoved()
+ for _, event := range listener.GetEvents() {
+ evsw.RemoveListenerForEvent(event, listenerID)
+ }
+}
+
+func (evsw *eventSwitch) RemoveListenerForEvent(event string, listenerID string) {
+ // Get eventCell
+ evsw.mtx.Lock()
+ eventCell := evsw.eventCells[event]
+ evsw.mtx.Unlock()
+
+ if eventCell == nil {
+ return
+ }
+
+ // Remove listenerID from eventCell
+ numListeners := eventCell.RemoveListener(listenerID)
+
+ // Maybe garbage collect eventCell.
+ if numListeners == 0 {
+ // Lock again and double check.
+ evsw.mtx.Lock() // OUTER LOCK
+ eventCell.mtx.Lock() // INNER LOCK
+ if len(eventCell.listeners) == 0 {
+ delete(evsw.eventCells, event)
+ }
+ eventCell.mtx.Unlock() // INNER LOCK
+ evsw.mtx.Unlock() // OUTER LOCK
+ }
+}
+
+func (evsw *eventSwitch) FireEvent(event string, data EventData) {
+ // Get the eventCell
+ evsw.mtx.RLock()
+ eventCell := evsw.eventCells[event]
+ evsw.mtx.RUnlock()
+
+ if eventCell == nil {
+ return
+ }
+
+ // Fire event for all listeners in eventCell
+ eventCell.FireEvent(data)
+}
+
+//-----------------------------------------------------------------------------
+
+// eventCell handles keeping track of listener callbacks for a given event.
+type eventCell struct {
+ mtx sync.RWMutex
+ listeners map[string]EventCallback
+}
+
+func newEventCell() *eventCell {
+ return &eventCell{
+ listeners: make(map[string]EventCallback),
+ }
+}
+
+func (cell *eventCell) AddListener(listenerID string, cb EventCallback) {
+ cell.mtx.Lock()
+ cell.listeners[listenerID] = cb
+ cell.mtx.Unlock()
+}
+
+func (cell *eventCell) RemoveListener(listenerID string) int {
+ cell.mtx.Lock()
+ delete(cell.listeners, listenerID)
+ numListeners := len(cell.listeners)
+ cell.mtx.Unlock()
+ return numListeners
+}
+
+func (cell *eventCell) FireEvent(data EventData) {
+ cell.mtx.RLock()
+ for _, listener := range cell.listeners {
+ listener(data)
+ }
+ cell.mtx.RUnlock()
+}
+
+//-----------------------------------------------------------------------------
+
+type EventCallback func(data EventData)
+
+type eventListener struct {
+ id string
+
+ mtx sync.RWMutex
+ removed bool
+ events []string
+}
+
+func newEventListener(id string) *eventListener {
+ return &eventListener{
+ id: id,
+ removed: false,
+ events: nil,
+ }
+}
+
+func (evl *eventListener) AddEvent(event string) {
+ evl.mtx.Lock()
+ defer evl.mtx.Unlock()
+
+ if evl.removed {
+ return
+ }
+ evl.events = append(evl.events, event)
+}
+
+func (evl *eventListener) GetEvents() []string {
+ evl.mtx.RLock()
+ defer evl.mtx.RUnlock()
+
+ events := make([]string, len(evl.events))
+ copy(events, evl.events)
+ return events
+}
+
+func (evl *eventListener) SetRemoved() {
+ evl.mtx.Lock()
+ defer evl.mtx.Unlock()
+ evl.removed = true
+}
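
The Eventable interface above is how other modules opt in to receiving an EventSwitch. A hypothetical sketch of an implementation; the MyReactor type, event name, and payload are invented for illustration and are not part of this diff:

```go
package main

import (
	"github.com/tendermint/tendermint/libs/events"
)

// MyReactor is a hypothetical module that wants to publish events.
type MyReactor struct {
	evsw events.EventSwitch
}

// SetEventSwitch satisfies events.Eventable; the caller injects the switch.
func (r *MyReactor) SetEventSwitch(evsw events.EventSwitch) {
	r.evsw = evsw
}

// doWork fires an event when something interesting happens.
func (r *MyReactor) doWork() {
	if r.evsw != nil {
		r.evsw.FireEvent("work-done", struct{ ID int }{42})
	}
}

func main() {
	evsw := events.NewEventSwitch()
	if err := evsw.Start(); err != nil {
		panic(err)
	}
	defer evsw.Stop()

	r := &MyReactor{}
	r.SetEventSwitch(evsw)
	r.doWork()
}
```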
diff --git a/libs/events/events_test.go b/libs/events/events_test.go
new file mode 100644
index 000000000..4995ae730
--- /dev/null
+++ b/libs/events/events_test.go
@@ -0,0 +1,380 @@
+package events
+
+import (
+ "fmt"
+ "math/rand"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// TestAddListenerForEventFireOnce sets up an EventSwitch, subscribes a single
+// listener to an event, and sends a string "data".
+func TestAddListenerForEventFireOnce(t *testing.T) {
+ evsw := NewEventSwitch()
+ err := evsw.Start()
+ if err != nil {
+ t.Errorf("Failed to start EventSwitch, error: %v", err)
+ }
+ messages := make(chan EventData)
+ evsw.AddListenerForEvent("listener", "event",
+ func(data EventData) {
+ messages <- data
+ })
+ go evsw.FireEvent("event", "data")
+ received := <-messages
+ if received != "data" {
+ t.Errorf("Message received does not match: %v", received)
+ }
+}
+
+// TestAddListenerForEventFireMany sets up an EventSwitch, subscribes a single
+// listener to an event, and sends a thousand integers.
+func TestAddListenerForEventFireMany(t *testing.T) {
+ evsw := NewEventSwitch()
+ err := evsw.Start()
+ if err != nil {
+ t.Errorf("Failed to start EventSwitch, error: %v", err)
+ }
+ doneSum := make(chan uint64)
+ doneSending := make(chan uint64)
+ numbers := make(chan uint64, 4)
+ // subscribe one listener for one event
+ evsw.AddListenerForEvent("listener", "event",
+ func(data EventData) {
+ numbers <- data.(uint64)
+ })
+ // collect received events
+ go sumReceivedNumbers(numbers, doneSum)
+ // go fire events
+ go fireEvents(evsw, "event", doneSending, uint64(1))
+ checkSum := <-doneSending
+ close(numbers)
+ eventSum := <-doneSum
+ if checkSum != eventSum {
+ t.Errorf("Not all messages sent were received.\n")
+ }
+}
+
+// TestAddListenerForDifferentEvents sets up an EventSwitch, subscribes a single
+// listener to three different events and sends a thousand integers for each
+// of the three events.
+func TestAddListenerForDifferentEvents(t *testing.T) {
+ evsw := NewEventSwitch()
+ err := evsw.Start()
+ if err != nil {
+ t.Errorf("Failed to start EventSwitch, error: %v", err)
+ }
+ doneSum := make(chan uint64)
+ doneSending1 := make(chan uint64)
+ doneSending2 := make(chan uint64)
+ doneSending3 := make(chan uint64)
+ numbers := make(chan uint64, 4)
+ // subscribe one listener to three events
+ evsw.AddListenerForEvent("listener", "event1",
+ func(data EventData) {
+ numbers <- data.(uint64)
+ })
+ evsw.AddListenerForEvent("listener", "event2",
+ func(data EventData) {
+ numbers <- data.(uint64)
+ })
+ evsw.AddListenerForEvent("listener", "event3",
+ func(data EventData) {
+ numbers <- data.(uint64)
+ })
+ // collect received events
+ go sumReceivedNumbers(numbers, doneSum)
+ // go fire events
+ go fireEvents(evsw, "event1", doneSending1, uint64(1))
+ go fireEvents(evsw, "event2", doneSending2, uint64(1))
+ go fireEvents(evsw, "event3", doneSending3, uint64(1))
+ var checkSum uint64 = 0
+ checkSum += <-doneSending1
+ checkSum += <-doneSending2
+ checkSum += <-doneSending3
+ close(numbers)
+ eventSum := <-doneSum
+ if checkSum != eventSum {
+ t.Errorf("Not all messages sent were received.\n")
+ }
+}
+
+// TestAddDifferentListenerForDifferentEvents sets up an EventSwitch,
+// subscribes a first listener to three events, and subscribes a second
+// listener to two of those three events, and then sends a thousand integers
+// for each of the three events.
+func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
+ evsw := NewEventSwitch()
+ err := evsw.Start()
+ if err != nil {
+ t.Errorf("Failed to start EventSwitch, error: %v", err)
+ }
+ doneSum1 := make(chan uint64)
+ doneSum2 := make(chan uint64)
+ doneSending1 := make(chan uint64)
+ doneSending2 := make(chan uint64)
+ doneSending3 := make(chan uint64)
+ numbers1 := make(chan uint64, 4)
+ numbers2 := make(chan uint64, 4)
+ // subscribe two listeners: one to three events, the other to two of them
+ evsw.AddListenerForEvent("listener1", "event1",
+ func(data EventData) {
+ numbers1 <- data.(uint64)
+ })
+ evsw.AddListenerForEvent("listener1", "event2",
+ func(data EventData) {
+ numbers1 <- data.(uint64)
+ })
+ evsw.AddListenerForEvent("listener1", "event3",
+ func(data EventData) {
+ numbers1 <- data.(uint64)
+ })
+ evsw.AddListenerForEvent("listener2", "event2",
+ func(data EventData) {
+ numbers2 <- data.(uint64)
+ })
+ evsw.AddListenerForEvent("listener2", "event3",
+ func(data EventData) {
+ numbers2 <- data.(uint64)
+ })
+ // collect received events for listener1
+ go sumReceivedNumbers(numbers1, doneSum1)
+ // collect received events for listener2
+ go sumReceivedNumbers(numbers2, doneSum2)
+ // go fire events
+ go fireEvents(evsw, "event1", doneSending1, uint64(1))
+ go fireEvents(evsw, "event2", doneSending2, uint64(1001))
+ go fireEvents(evsw, "event3", doneSending3, uint64(2001))
+ checkSumEvent1 := <-doneSending1
+ checkSumEvent2 := <-doneSending2
+ checkSumEvent3 := <-doneSending3
+ checkSum1 := checkSumEvent1 + checkSumEvent2 + checkSumEvent3
+ checkSum2 := checkSumEvent2 + checkSumEvent3
+ close(numbers1)
+ close(numbers2)
+ eventSum1 := <-doneSum1
+ eventSum2 := <-doneSum2
+ if checkSum1 != eventSum1 ||
+ checkSum2 != eventSum2 {
+ t.Errorf("Not all messages sent were received for different listeners to different events.\n")
+ }
+}
+
+// TestAddAndRemoveListener sets up an EventSwitch, subscribes a listener to
+// two events, fires a thousand integers for the first event, then unsubscribes
+// the listener and fires a thousand integers for the second event.
+func TestAddAndRemoveListener(t *testing.T) {
+ evsw := NewEventSwitch()
+ err := evsw.Start()
+ if err != nil {
+ t.Errorf("Failed to start EventSwitch, error: %v", err)
+ }
+ doneSum1 := make(chan uint64)
+ doneSum2 := make(chan uint64)
+ doneSending1 := make(chan uint64)
+ doneSending2 := make(chan uint64)
+ numbers1 := make(chan uint64, 4)
+ numbers2 := make(chan uint64, 4)
+ // subscribe one listener to two events
+ evsw.AddListenerForEvent("listener", "event1",
+ func(data EventData) {
+ numbers1 <- data.(uint64)
+ })
+ evsw.AddListenerForEvent("listener", "event2",
+ func(data EventData) {
+ numbers2 <- data.(uint64)
+ })
+ // collect received events for event1
+ go sumReceivedNumbers(numbers1, doneSum1)
+ // collect received events for event2
+ go sumReceivedNumbers(numbers2, doneSum2)
+ // go fire events
+ go fireEvents(evsw, "event1", doneSending1, uint64(1))
+ checkSumEvent1 := <-doneSending1
+ // after sending all event1, unsubscribe for all events
+ evsw.RemoveListener("listener")
+ go fireEvents(evsw, "event2", doneSending2, uint64(1001))
+ checkSumEvent2 := <-doneSending2
+ close(numbers1)
+ close(numbers2)
+ eventSum1 := <-doneSum1
+ eventSum2 := <-doneSum2
+ if checkSumEvent1 != eventSum1 ||
+ // correct value asserted by preceding tests, suffices to be non-zero
+ checkSumEvent2 == uint64(0) ||
+ eventSum2 != uint64(0) {
+ t.Errorf("Not all messages sent were received or unsubscription did not register.\n")
+ }
+}
+
+// TestRemoveListener does basic tests on adding and removing
+func TestRemoveListener(t *testing.T) {
+ evsw := NewEventSwitch()
+ err := evsw.Start()
+ if err != nil {
+ t.Errorf("Failed to start EventSwitch, error: %v", err)
+ }
+ count := 10
+ sum1, sum2 := 0, 0
+ // add some listeners and make sure they work
+ evsw.AddListenerForEvent("listener", "event1",
+ func(data EventData) {
+ sum1++
+ })
+ evsw.AddListenerForEvent("listener", "event2",
+ func(data EventData) {
+ sum2++
+ })
+ for i := 0; i < count; i++ {
+ evsw.FireEvent("event1", true)
+ evsw.FireEvent("event2", true)
+ }
+ assert.Equal(t, count, sum1)
+ assert.Equal(t, count, sum2)
+
+ // remove one by event and make sure it is gone
+ evsw.RemoveListenerForEvent("event2", "listener")
+ for i := 0; i < count; i++ {
+ evsw.FireEvent("event1", true)
+ evsw.FireEvent("event2", true)
+ }
+ assert.Equal(t, count*2, sum1)
+ assert.Equal(t, count, sum2)
+
+ // remove the listener entirely and make sure both gone
+ evsw.RemoveListener("listener")
+ for i := 0; i < count; i++ {
+ evsw.FireEvent("event1", true)
+ evsw.FireEvent("event2", true)
+ }
+ assert.Equal(t, count*2, sum1)
+ assert.Equal(t, count, sum2)
+}
+
+// TestRemoveListenersAsync sets up an EventSwitch, subscribes two
+// listeners to three events, and fires a thousand integers for each event.
+// These two listeners serve as the baseline validation while other listeners
+// are randomly subscribed and unsubscribed.
+// More precisely it randomly subscribes new listeners (different from the first
+// two listeners) to one of these three events. At the same time it starts
+// randomly unsubscribing these additional listeners from all events they are
+// at that point subscribed to.
+// NOTE: it is important to run this test with the race detector enabled,
+// `go test -race`, to check for possible race conditions.
+func TestRemoveListenersAsync(t *testing.T) {
+ evsw := NewEventSwitch()
+ err := evsw.Start()
+ if err != nil {
+ t.Errorf("Failed to start EventSwitch, error: %v", err)
+ }
+ doneSum1 := make(chan uint64)
+ doneSum2 := make(chan uint64)
+ doneSending1 := make(chan uint64)
+ doneSending2 := make(chan uint64)
+ doneSending3 := make(chan uint64)
+ numbers1 := make(chan uint64, 4)
+ numbers2 := make(chan uint64, 4)
+ // subscribe two listeners to three events
+ evsw.AddListenerForEvent("listener1", "event1",
+ func(data EventData) {
+ numbers1 <- data.(uint64)
+ })
+ evsw.AddListenerForEvent("listener1", "event2",
+ func(data EventData) {
+ numbers1 <- data.(uint64)
+ })
+ evsw.AddListenerForEvent("listener1", "event3",
+ func(data EventData) {
+ numbers1 <- data.(uint64)
+ })
+ evsw.AddListenerForEvent("listener2", "event1",
+ func(data EventData) {
+ numbers2 <- data.(uint64)
+ })
+ evsw.AddListenerForEvent("listener2", "event2",
+ func(data EventData) {
+ numbers2 <- data.(uint64)
+ })
+ evsw.AddListenerForEvent("listener2", "event3",
+ func(data EventData) {
+ numbers2 <- data.(uint64)
+ })
+ // collect received events for event1
+ go sumReceivedNumbers(numbers1, doneSum1)
+ // collect received events for event2
+ go sumReceivedNumbers(numbers2, doneSum2)
+ addListenersStress := func() {
+ s1 := rand.NewSource(time.Now().UnixNano())
+ r1 := rand.New(s1)
+ for k := uint16(0); k < 400; k++ {
+ listenerNumber := r1.Intn(100) + 3
+ eventNumber := r1.Intn(3) + 1
+ go evsw.AddListenerForEvent(fmt.Sprintf("listener%v", listenerNumber),
+ fmt.Sprintf("event%v", eventNumber),
+ func(_ EventData) {})
+ }
+ }
+ removeListenersStress := func() {
+ s2 := rand.NewSource(time.Now().UnixNano())
+ r2 := rand.New(s2)
+ for k := uint16(0); k < 80; k++ {
+ listenerNumber := r2.Intn(100) + 3
+ go evsw.RemoveListener(fmt.Sprintf("listener%v", listenerNumber))
+ }
+ }
+ addListenersStress()
+ // go fire events
+ go fireEvents(evsw, "event1", doneSending1, uint64(1))
+ removeListenersStress()
+ go fireEvents(evsw, "event2", doneSending2, uint64(1001))
+ go fireEvents(evsw, "event3", doneSending3, uint64(2001))
+ checkSumEvent1 := <-doneSending1
+ checkSumEvent2 := <-doneSending2
+ checkSumEvent3 := <-doneSending3
+ checkSum := checkSumEvent1 + checkSumEvent2 + checkSumEvent3
+ close(numbers1)
+ close(numbers2)
+ eventSum1 := <-doneSum1
+ eventSum2 := <-doneSum2
+ if checkSum != eventSum1 ||
+ checkSum != eventSum2 {
+ t.Errorf("Not all messages sent were received.\n")
+ }
+}
+
+//------------------------------------------------------------------------------
+// Helper functions
+
+// sumReceivedNumbers takes two channels and adds all numbers received
+// until the receiving channel `numbers` is closed; it then sends the sum
+// on `doneSum` and closes that channel. Expected to be run in a go-routine.
+func sumReceivedNumbers(numbers, doneSum chan uint64) {
+ var sum uint64 = 0
+ for {
+ j, more := <-numbers
+ sum += j
+ if !more {
+ doneSum <- sum
+ close(doneSum)
+ return
+ }
+ }
+}
+
+// fireEvents takes an EventSwitch and fires a thousand integers under
+// a given `event`, with the integers monotonically increasing from `offset`
+// to `offset` + 999. It then sends the sum of all fired integers on
+// `doneChan`, so the test can assert that all events sent have also been
+// received.
+func fireEvents(evsw EventSwitch, event string, doneChan chan uint64,
+ offset uint64) {
+ var sentSum uint64 = 0
+ for i := offset; i <= offset+uint64(999); i++ {
+ sentSum += i
+ evsw.FireEvent(event, i)
+ }
+ doneChan <- sentSum
+ close(doneChan)
+}
diff --git a/libs/pubsub/example_test.go b/libs/pubsub/example_test.go
new file mode 100644
index 000000000..550b4447e
--- /dev/null
+++ b/libs/pubsub/example_test.go
@@ -0,0 +1,28 @@
+package pubsub_test
+
+import (
+ "context"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/tendermint/tmlibs/log"
+
+ "github.com/tendermint/tendermint/libs/pubsub"
+ "github.com/tendermint/tendermint/libs/pubsub/query"
+)
+
+func TestExample(t *testing.T) {
+ s := pubsub.NewServer()
+ s.SetLogger(log.TestingLogger())
+ s.Start()
+ defer s.Stop()
+
+ ctx := context.Background()
+ ch := make(chan interface{}, 1)
+ err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), ch)
+ require.NoError(t, err)
+ err = s.PublishWithTags(ctx, "Tombstone", pubsub.NewTagMap(map[string]interface{}{"abci.account.name": "John"}))
+ require.NoError(t, err)
+ assertReceive(t, "Tombstone", ch)
+}
diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go
new file mode 100644
index 000000000..67f264ace
--- /dev/null
+++ b/libs/pubsub/pubsub.go
@@ -0,0 +1,342 @@
+// Package pubsub implements a pub-sub model with a single publisher (Server)
+// and multiple subscribers (clients).
+//
+// You can, however, have multiple publishers by sharing a pointer to a server
+// or by giving the same channel to each publisher and publishing messages from
+// that channel (fan-in).
+//
+// Clients subscribe to messages, which can be of any type, using a query.
+// When a message is published, it is matched against all queries. If there is
+// a match, the message is pushed to every client subscribed to that query.
+// See the query subpackage for our implementation.
+package pubsub
+
+import (
+ "context"
+ "errors"
+ "sync"
+
+ cmn "github.com/tendermint/tmlibs/common"
+)
+
+type operation int
+
+const (
+ sub operation = iota
+ pub
+ unsub
+ shutdown
+)
+
+var (
+ // ErrSubscriptionNotFound is returned when a client tries to unsubscribe
+ // from a subscription that does not exist.
+ ErrSubscriptionNotFound = errors.New("subscription not found")
+
+ // ErrAlreadySubscribed is returned when a client tries to subscribe twice or
+ // more using the same query.
+ ErrAlreadySubscribed = errors.New("already subscribed")
+)
+
+// TagMap is used to associate tags with a message.
+// Subscribers can query the tags to choose which messages they will receive.
+type TagMap interface {
+ // Get returns the value for a key, or nil if no value is present.
+ // The ok result indicates whether value was found in the tags.
+ Get(key string) (value interface{}, ok bool)
+ // Len returns the number of tags.
+ Len() int
+}
+
+type tagMap map[string]interface{}
+
+type cmd struct {
+ op operation
+ query Query
+ ch chan<- interface{}
+ clientID string
+ msg interface{}
+ tags TagMap
+}
+
+// Query defines an interface for a query to be used for subscribing.
+type Query interface {
+ Matches(tags TagMap) bool
+ String() string
+}
+
+// Server allows clients to subscribe/unsubscribe for messages, publishing
+// messages with or without tags, and manages internal state.
+type Server struct {
+ cmn.BaseService
+
+ cmds chan cmd
+ cmdsCap int
+
+ mtx sync.RWMutex
+ subscriptions map[string]map[string]Query // subscriber -> query (string) -> Query
+}
+
+// Option sets a parameter for the server.
+type Option func(*Server)
+
+// NewTagMap constructs a new immutable tag set from a map.
+func NewTagMap(data map[string]interface{}) TagMap {
+ return tagMap(data)
+}
+
+// Get returns the value for a key, or nil if no value is present.
+// The ok result indicates whether value was found in the tags.
+func (ts tagMap) Get(key string) (value interface{}, ok bool) {
+ value, ok = ts[key]
+ return
+}
+
+// Len returns the number of tags.
+func (ts tagMap) Len() int {
+ return len(ts)
+}
+
+// NewServer returns a new server. See the commentary on the Option functions
+// for a detailed description of how to configure buffering. If no options are
+// provided, the resulting server's queue is unbuffered.
+func NewServer(options ...Option) *Server {
+ s := &Server{
+ subscriptions: make(map[string]map[string]Query),
+ }
+ s.BaseService = *cmn.NewBaseService(nil, "PubSub", s)
+
+ for _, option := range options {
+ option(s)
+ }
+
+ // if BufferCapacity option was not set, the channel is unbuffered
+ s.cmds = make(chan cmd, s.cmdsCap)
+
+ return s
+}
+
+// BufferCapacity allows you to specify the capacity of the internal server's
+// queue. Since the server can only process messages as fast as its
+// subscribers consume them, this option can be used to survive spikes (e.g. a
+// high volume of transactions during peak hours).
+func BufferCapacity(cap int) Option {
+ return func(s *Server) {
+ if cap > 0 {
+ s.cmdsCap = cap
+ }
+ }
+}
+
+// BufferCapacity returns capacity of the internal server's queue.
+func (s *Server) BufferCapacity() int {
+ return s.cmdsCap
+}
+
+// Subscribe creates a subscription for the given client. It accepts a channel
+// on which messages matching the given query can be received. An error will be
+// returned to the caller if the context is canceled or if a subscription
+// already exists for the given clientID and query.
+func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error {
+ s.mtx.RLock()
+ clientSubscriptions, ok := s.subscriptions[clientID]
+ if ok {
+ _, ok = clientSubscriptions[query.String()]
+ }
+ s.mtx.RUnlock()
+ if ok {
+ return ErrAlreadySubscribed
+ }
+
+ select {
+ case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}:
+ s.mtx.Lock()
+ if _, ok = s.subscriptions[clientID]; !ok {
+ s.subscriptions[clientID] = make(map[string]Query)
+ }
+ s.subscriptions[clientID][query.String()] = query
+ s.mtx.Unlock()
+ return nil
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+}
+
+// Unsubscribe removes the subscription on the given query. An error will be
+// returned to the caller if the context is canceled or if the subscription
+// does not exist.
+func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error {
+ var origQuery Query
+ s.mtx.RLock()
+ clientSubscriptions, ok := s.subscriptions[clientID]
+ if ok {
+ origQuery, ok = clientSubscriptions[query.String()]
+ }
+ s.mtx.RUnlock()
+ if !ok {
+ return ErrSubscriptionNotFound
+ }
+
+ // original query is used here because we're using pointers as map keys
+ select {
+ case s.cmds <- cmd{op: unsub, clientID: clientID, query: origQuery}:
+ s.mtx.Lock()
+ delete(clientSubscriptions, query.String())
+ s.mtx.Unlock()
+ return nil
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+}
+
+// UnsubscribeAll removes all client subscriptions. An error will be returned
+// to the caller if the context is canceled or if the client has no subscriptions.
+func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
+ s.mtx.RLock()
+ _, ok := s.subscriptions[clientID]
+ s.mtx.RUnlock()
+ if !ok {
+ return ErrSubscriptionNotFound
+ }
+
+ select {
+ case s.cmds <- cmd{op: unsub, clientID: clientID}:
+ s.mtx.Lock()
+ delete(s.subscriptions, clientID)
+ s.mtx.Unlock()
+ return nil
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+}
+
+// Publish publishes the given message. An error will be returned to the caller
+// if the context is canceled.
+func (s *Server) Publish(ctx context.Context, msg interface{}) error {
+ return s.PublishWithTags(ctx, msg, NewTagMap(make(map[string]interface{})))
+}
+
+// PublishWithTags publishes the given message with the set of tags. The set is
+// matched with clients queries. If there is a match, the message is sent to
+// the client.
+func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags TagMap) error {
+ select {
+ case s.cmds <- cmd{op: pub, msg: msg, tags: tags}:
+ return nil
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+}
+
+// OnStop implements Service.OnStop by shutting down the server.
+func (s *Server) OnStop() {
+ s.cmds <- cmd{op: shutdown}
+}
+
+// NOTE: not goroutine safe
+type state struct {
+ // query -> client -> ch
+ queries map[Query]map[string]chan<- interface{}
+ // client -> query -> struct{}
+ clients map[string]map[Query]struct{}
+}
+
+// OnStart implements Service.OnStart by starting the server.
+func (s *Server) OnStart() error {
+ go s.loop(state{
+ queries: make(map[Query]map[string]chan<- interface{}),
+ clients: make(map[string]map[Query]struct{}),
+ })
+ return nil
+}
+
+// OnReset implements Service.OnReset
+func (s *Server) OnReset() error {
+ return nil
+}
+
+func (s *Server) loop(state state) {
+loop:
+ for cmd := range s.cmds {
+ switch cmd.op {
+ case unsub:
+ if cmd.query != nil {
+ state.remove(cmd.clientID, cmd.query)
+ } else {
+ state.removeAll(cmd.clientID)
+ }
+ case shutdown:
+ for clientID := range state.clients {
+ state.removeAll(clientID)
+ }
+ break loop
+ case sub:
+ state.add(cmd.clientID, cmd.query, cmd.ch)
+ case pub:
+ state.send(cmd.msg, cmd.tags)
+ }
+ }
+}
+
+func (state *state) add(clientID string, q Query, ch chan<- interface{}) {
+ // add query if needed
+ if _, ok := state.queries[q]; !ok {
+ state.queries[q] = make(map[string]chan<- interface{})
+ }
+
+ // create subscription
+ state.queries[q][clientID] = ch
+
+ // add client if needed
+ if _, ok := state.clients[clientID]; !ok {
+ state.clients[clientID] = make(map[Query]struct{})
+ }
+ state.clients[clientID][q] = struct{}{}
+}
+
+func (state *state) remove(clientID string, q Query) {
+ clientToChannelMap, ok := state.queries[q]
+ if !ok {
+ return
+ }
+
+ ch, ok := clientToChannelMap[clientID]
+ if ok {
+ close(ch)
+
+ delete(state.clients[clientID], q)
+
+ // if it is not subscribed to anything else, remove the client
+ if len(state.clients[clientID]) == 0 {
+ delete(state.clients, clientID)
+ }
+
+ delete(state.queries[q], clientID)
+ }
+}
+
+func (state *state) removeAll(clientID string) {
+ queryMap, ok := state.clients[clientID]
+ if !ok {
+ return
+ }
+
+ for q := range queryMap {
+ ch := state.queries[q][clientID]
+ close(ch)
+
+ delete(state.queries[q], clientID)
+ }
+
+ delete(state.clients, clientID)
+}
+
+func (state *state) send(msg interface{}, tags TagMap) {
+ for q, clientToChannelMap := range state.queries {
+ if q.Matches(tags) {
+ for _, ch := range clientToChannelMap {
+ ch <- msg
+ }
+ }
+ }
+}
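
Complementing example_test.go above, here is a hedged end-to-end sketch of the server API in this file: configure buffering, subscribe with a query, publish with tags, and unsubscribe. The client ID, tag names, and values are illustrative:

```go
package main

import (
	"context"
	"fmt"

	"github.com/tendermint/tendermint/libs/pubsub"
	"github.com/tendermint/tendermint/libs/pubsub/query"
)

func main() {
	// A buffered command queue helps the server absorb publish spikes.
	s := pubsub.NewServer(pubsub.BufferCapacity(16))
	if err := s.Start(); err != nil {
		panic(err)
	}
	defer s.Stop()

	ctx := context.Background()
	ch := make(chan interface{}, 1)

	// Subscribe with a query; the client ID and tag name are illustrative.
	err := s.Subscribe(ctx, "demo-client", query.MustParse("tm.events.type='NewBlock'"), ch)
	if err != nil {
		panic(err)
	}

	// Only messages whose tags match the query reach the channel.
	err = s.PublishWithTags(ctx, "block #1",
		pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"}))
	if err != nil {
		panic(err)
	}
	fmt.Println(<-ch) // block #1

	// Unsubscribing closes the subscriber's channel for this query.
	if err := s.Unsubscribe(ctx, "demo-client", query.MustParse("tm.events.type='NewBlock'")); err != nil {
		panic(err)
	}
}
```

Note that passing a freshly parsed query to Unsubscribe works because lookups go through the query's string form, as the implementation above shows.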
diff --git a/libs/pubsub/pubsub_test.go b/libs/pubsub/pubsub_test.go
new file mode 100644
index 000000000..a39d015ce
--- /dev/null
+++ b/libs/pubsub/pubsub_test.go
@@ -0,0 +1,253 @@
+package pubsub_test
+
+import (
+ "context"
+ "fmt"
+ "runtime/debug"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/tendermint/tmlibs/log"
+
+ "github.com/tendermint/tendermint/libs/pubsub"
+ "github.com/tendermint/tendermint/libs/pubsub/query"
+)
+
+const (
+ clientID = "test-client"
+)
+
+func TestSubscribe(t *testing.T) {
+ s := pubsub.NewServer()
+ s.SetLogger(log.TestingLogger())
+ s.Start()
+ defer s.Stop()
+
+ ctx := context.Background()
+ ch := make(chan interface{}, 1)
+ err := s.Subscribe(ctx, clientID, query.Empty{}, ch)
+ require.NoError(t, err)
+ err = s.Publish(ctx, "Ka-Zar")
+ require.NoError(t, err)
+ assertReceive(t, "Ka-Zar", ch)
+
+ err = s.Publish(ctx, "Quicksilver")
+ require.NoError(t, err)
+ assertReceive(t, "Quicksilver", ch)
+}
+
+func TestDifferentClients(t *testing.T) {
+ s := pubsub.NewServer()
+ s.SetLogger(log.TestingLogger())
+ s.Start()
+ defer s.Stop()
+
+ ctx := context.Background()
+ ch1 := make(chan interface{}, 1)
+ err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1)
+ require.NoError(t, err)
+ err = s.PublishWithTags(ctx, "Iceman", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"}))
+ require.NoError(t, err)
+ assertReceive(t, "Iceman", ch1)
+
+ ch2 := make(chan interface{}, 1)
+ err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2)
+ require.NoError(t, err)
+ err = s.PublishWithTags(ctx, "Ultimo", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}))
+ require.NoError(t, err)
+ assertReceive(t, "Ultimo", ch1)
+ assertReceive(t, "Ultimo", ch2)
+
+ ch3 := make(chan interface{}, 1)
+ err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3)
+ require.NoError(t, err)
+ err = s.PublishWithTags(ctx, "Valeria Richards", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewRoundStep"}))
+ require.NoError(t, err)
+ assert.Zero(t, len(ch3))
+}
+
+func TestClientSubscribesTwice(t *testing.T) {
+ s := pubsub.NewServer()
+ s.SetLogger(log.TestingLogger())
+ s.Start()
+ defer s.Stop()
+
+ ctx := context.Background()
+ q := query.MustParse("tm.events.type='NewBlock'")
+
+ ch1 := make(chan interface{}, 1)
+ err := s.Subscribe(ctx, clientID, q, ch1)
+ require.NoError(t, err)
+ err = s.PublishWithTags(ctx, "Goblin Queen", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"}))
+ require.NoError(t, err)
+ assertReceive(t, "Goblin Queen", ch1)
+
+ ch2 := make(chan interface{}, 1)
+ err = s.Subscribe(ctx, clientID, q, ch2)
+ require.Error(t, err)
+
+ err = s.PublishWithTags(ctx, "Spider-Man", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"}))
+ require.NoError(t, err)
+ assertReceive(t, "Spider-Man", ch1)
+}
+
+func TestUnsubscribe(t *testing.T) {
+ s := pubsub.NewServer()
+ s.SetLogger(log.TestingLogger())
+ s.Start()
+ defer s.Stop()
+
+ ctx := context.Background()
+ ch := make(chan interface{})
+ err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch)
+ require.NoError(t, err)
+ err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
+ require.NoError(t, err)
+
+ err = s.Publish(ctx, "Nick Fury")
+ require.NoError(t, err)
+ assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe")
+
+ _, ok := <-ch
+ assert.False(t, ok)
+}
+
+func TestResubscribe(t *testing.T) {
+ s := pubsub.NewServer()
+ s.SetLogger(log.TestingLogger())
+ s.Start()
+ defer s.Stop()
+
+ ctx := context.Background()
+ ch := make(chan interface{})
+ err := s.Subscribe(ctx, clientID, query.Empty{}, ch)
+ require.NoError(t, err)
+ err = s.Unsubscribe(ctx, clientID, query.Empty{})
+ require.NoError(t, err)
+ ch = make(chan interface{})
+ err = s.Subscribe(ctx, clientID, query.Empty{}, ch)
+ require.NoError(t, err)
+
+ err = s.Publish(ctx, "Cable")
+ require.NoError(t, err)
+ assertReceive(t, "Cable", ch)
+}
+
+func TestUnsubscribeAll(t *testing.T) {
+ s := pubsub.NewServer()
+ s.SetLogger(log.TestingLogger())
+ s.Start()
+ defer s.Stop()
+
+ ctx := context.Background()
+ ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1)
+ err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch1)
+ require.NoError(t, err)
+ err = s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlockHeader'"), ch2)
+ require.NoError(t, err)
+
+ err = s.UnsubscribeAll(ctx, clientID)
+ require.NoError(t, err)
+
+ err = s.Publish(ctx, "Nick Fury")
+ require.NoError(t, err)
+ assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll")
+ assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll")
+
+ _, ok := <-ch1
+ assert.False(t, ok)
+ _, ok = <-ch2
+ assert.False(t, ok)
+}
+
+func TestBufferCapacity(t *testing.T) {
+ s := pubsub.NewServer(pubsub.BufferCapacity(2))
+ s.SetLogger(log.TestingLogger())
+
+ assert.Equal(t, 2, s.BufferCapacity())
+
+ ctx := context.Background()
+ err := s.Publish(ctx, "Nighthawk")
+ require.NoError(t, err)
+ err = s.Publish(ctx, "Sage")
+ require.NoError(t, err)
+
+ ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
+ defer cancel()
+ err = s.Publish(ctx, "Ironclad")
+ if assert.Error(t, err) {
+ assert.Equal(t, context.DeadlineExceeded, err)
+ }
+}
+
+func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) }
+func Benchmark100Clients(b *testing.B) { benchmarkNClients(100, b) }
+func Benchmark1000Clients(b *testing.B) { benchmarkNClients(1000, b) }
+
+func Benchmark10ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(10, b) }
+func Benchmark100ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(100, b) }
+func Benchmark1000ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(1000, b) }
+
+func benchmarkNClients(n int, b *testing.B) {
+ s := pubsub.NewServer()
+ s.Start()
+ defer s.Stop()
+
+ ctx := context.Background()
+ for i := 0; i < n; i++ {
+ ch := make(chan interface{})
+ go func() {
+ for range ch {
+ }
+ }()
+ s.Subscribe(ctx, clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = %d", i)), ch)
+ }
+
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i}))
+ }
+}
+
+func benchmarkNClientsOneQuery(n int, b *testing.B) {
+ s := pubsub.NewServer()
+ s.Start()
+ defer s.Stop()
+
+ ctx := context.Background()
+ q := query.MustParse("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = 1")
+ for i := 0; i < n; i++ {
+ ch := make(chan interface{})
+ go func() {
+ for range ch {
+ }
+ }()
+ s.Subscribe(ctx, clientID, q, ch)
+ }
+
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1}))
+ }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+/// HELPERS
+///////////////////////////////////////////////////////////////////////////////
+
+func assertReceive(t *testing.T, expected interface{}, ch <-chan interface{}, msgAndArgs ...interface{}) {
+ select {
+ case actual := <-ch:
+ if actual != nil {
+ assert.Equal(t, expected, actual, msgAndArgs...)
+ }
+ case <-time.After(1 * time.Second):
+ t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected)
+ debug.PrintStack()
+ }
+}
diff --git a/libs/pubsub/query/Makefile b/libs/pubsub/query/Makefile
new file mode 100644
index 000000000..91030ef09
--- /dev/null
+++ b/libs/pubsub/query/Makefile
@@ -0,0 +1,11 @@
+gen_query_parser:
+ @go get github.com/pointlander/peg
+ peg -inline -switch query.peg
+
+fuzzy_test:
+ @go get github.com/dvyukov/go-fuzz/go-fuzz
+ @go get github.com/dvyukov/go-fuzz/go-fuzz-build
+ go-fuzz-build github.com/tendermint/tendermint/libs/pubsub/query/fuzz_test
+ go-fuzz -bin=./fuzz_test-fuzz.zip -workdir=./fuzz_test/output
+
+.PHONY: gen_query_parser fuzzy_test
diff --git a/libs/pubsub/query/empty.go b/libs/pubsub/query/empty.go
new file mode 100644
index 000000000..17d7acefa
--- /dev/null
+++ b/libs/pubsub/query/empty.go
@@ -0,0 +1,16 @@
+package query
+
+import "github.com/tendermint/tendermint/libs/pubsub"
+
+// Empty query matches any set of tags.
+type Empty struct {
+}
+
+// Matches always returns true.
+func (Empty) Matches(tags pubsub.TagMap) bool {
+ return true
+}
+
+func (Empty) String() string {
+ return "empty"
+}
diff --git a/libs/pubsub/query/empty_test.go b/libs/pubsub/query/empty_test.go
new file mode 100644
index 000000000..9c82f73ed
--- /dev/null
+++ b/libs/pubsub/query/empty_test.go
@@ -0,0 +1,18 @@
+package query_test
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/tendermint/tendermint/libs/pubsub"
+ "github.com/tendermint/tendermint/libs/pubsub/query"
+)
+
+func TestEmptyQueryMatchesAnything(t *testing.T) {
+ q := query.Empty{}
+ assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{})))
+ assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Asher": "Roth"})))
+ assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Route": 66})))
+ assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Route": 66, "Billy": "Blue"})))
+}
diff --git a/libs/pubsub/query/fuzz_test/main.go b/libs/pubsub/query/fuzz_test/main.go
new file mode 100644
index 000000000..7a46116b5
--- /dev/null
+++ b/libs/pubsub/query/fuzz_test/main.go
@@ -0,0 +1,30 @@
+package fuzz_test
+
+import (
+ "fmt"
+
+ "github.com/tendermint/tendermint/libs/pubsub/query"
+)
+
+func Fuzz(data []byte) int {
+ sdata := string(data)
+ q0, err := query.New(sdata)
+ if err != nil {
+ return 0
+ }
+
+ sdata1 := q0.String()
+ q1, err := query.New(sdata1)
+ if err != nil {
+ panic(err)
+ }
+
+ sdata2 := q1.String()
+ if sdata1 != sdata2 {
+ fmt.Printf("q0: %q\n", sdata1)
+ fmt.Printf("q1: %q\n", sdata2)
+ panic("query changed")
+ }
+
+ return 1
+}
diff --git a/libs/pubsub/query/parser_test.go b/libs/pubsub/query/parser_test.go
new file mode 100644
index 000000000..708dee484
--- /dev/null
+++ b/libs/pubsub/query/parser_test.go
@@ -0,0 +1,92 @@
+package query_test
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/tendermint/tendermint/libs/pubsub/query"
+)
+
+// TODO: fuzzy testing?
+func TestParser(t *testing.T) {
+ cases := []struct {
+ query string
+ valid bool
+ }{
+ {"tm.events.type='NewBlock'", true},
+ {"tm.events.type = 'NewBlock'", true},
+ {"tm.events.name = ''", true},
+ {"tm.events.type='TIME'", true},
+ {"tm.events.type='DATE'", true},
+ {"tm.events.type='='", true},
+ {"tm.events.type='TIME", false},
+ {"tm.events.type=TIME'", false},
+ {"tm.events.type==", false},
+ {"tm.events.type=NewBlock", false},
+ {">==", false},
+ {"tm.events.type 'NewBlock' =", false},
+ {"tm.events.type>'NewBlock'", false},
+ {"", false},
+ {"=", false},
+ {"='NewBlock'", false},
+ {"tm.events.type=", false},
+
+ {"tm.events.typeNewBlock", false},
+ {"tm.events.type'NewBlock'", false},
+ {"'NewBlock'", false},
+ {"NewBlock", false},
+ {"", false},
+
+ {"tm.events.type='NewBlock' AND abci.account.name='Igor'", true},
+ {"tm.events.type='NewBlock' AND", false},
+ {"tm.events.type='NewBlock' AN", false},
+ {"tm.events.type='NewBlock' AN tm.events.type='NewBlockHeader'", false},
+ {"AND tm.events.type='NewBlock' ", false},
+
+ {"abci.account.name CONTAINS 'Igor'", true},
+
+ {"tx.date > DATE 2013-05-03", true},
+ {"tx.date < DATE 2013-05-03", true},
+ {"tx.date <= DATE 2013-05-03", true},
+ {"tx.date >= DATE 2013-05-03", true},
+ {"tx.date >= DAT 2013-05-03", false},
+ {"tx.date <= DATE2013-05-03", false},
+ {"tx.date <= DATE -05-03", false},
+ {"tx.date >= DATE 20130503", false},
+ {"tx.date >= DATE 2013+01-03", false},
+ // incorrect year, month, day
+ {"tx.date >= DATE 0013-01-03", false},
+ {"tx.date >= DATE 2013-31-03", false},
+ {"tx.date >= DATE 2013-01-83", false},
+
+ {"tx.date > TIME 2013-05-03T14:45:00+07:00", true},
+ {"tx.date < TIME 2013-05-03T14:45:00-02:00", true},
+ {"tx.date <= TIME 2013-05-03T14:45:00Z", true},
+ {"tx.date >= TIME 2013-05-03T14:45:00Z", true},
+ {"tx.date >= TIME2013-05-03T14:45:00Z", false},
+ {"tx.date = IME 2013-05-03T14:45:00Z", false},
+ {"tx.date = TIME 2013-05-:45:00Z", false},
+ {"tx.date >= TIME 2013-05-03T14:45:00", false},
+ {"tx.date >= TIME 0013-00-00T14:45:00Z", false},
+ {"tx.date >= TIME 2013+05=03T14:45:00Z", false},
+
+ {"account.balance=100", true},
+ {"account.balance >= 200", true},
+ {"account.balance >= -300", false},
+ {"account.balance >>= 400", false},
+ {"account.balance=33.22.1", false},
+
+ {"hash='136E18F7E4C348B780CF873A0BF43922E5BAFA63'", true},
+ {"hash=136E18F7E4C348B780CF873A0BF43922E5BAFA63", false},
+ }
+
+ for _, c := range cases {
+ _, err := query.New(c.query)
+ if c.valid {
+ assert.NoErrorf(t, err, "Query was '%s'", c.query)
+ } else {
+ assert.Errorf(t, err, "Query was '%s'", c.query)
+ }
+ }
+}
diff --git a/libs/pubsub/query/query.go b/libs/pubsub/query/query.go
new file mode 100644
index 000000000..a900d9838
--- /dev/null
+++ b/libs/pubsub/query/query.go
@@ -0,0 +1,345 @@
+// Package query provides a parser for a custom query format:
+//
+// abci.invoice.number=22 AND abci.invoice.owner='Ivan'
+//
+// See query.peg for the grammar, a parsing expression grammar (https://en.wikipedia.org/wiki/Parsing_expression_grammar).
+// More: https://github.com/PhilippeSigaud/Pegged/wiki/PEG-Basics
+//
+// It supports numbers (integer and floating point), dates and times.
+package query
+
+import (
+ "fmt"
+ "reflect"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/tendermint/tendermint/libs/pubsub"
+)
+
+// Query holds the query string and the query parser.
+type Query struct {
+ str string
+ parser *QueryParser
+}
+
+// Condition represents a single condition within a query and consists of tag
+// (e.g. "tx.gas"), operator (e.g. "=") and operand (e.g. "7").
+type Condition struct {
+ Tag string
+ Op Operator
+ Operand interface{}
+}
+
+// New parses the given string and returns a query or error if the string is
+// invalid.
+func New(s string) (*Query, error) {
+ p := &QueryParser{Buffer: fmt.Sprintf(`"%s"`, s)}
+ p.Init()
+ if err := p.Parse(); err != nil {
+ return nil, err
+ }
+ return &Query{str: s, parser: p}, nil
+}
+
+// MustParse turns the given string into a query or panics; use it in tests or
+// other cases where you know the string is valid.
+func MustParse(s string) *Query {
+ q, err := New(s)
+ if err != nil {
+ panic(fmt.Sprintf("failed to parse %s: %v", s, err))
+ }
+ return q
+}
+
+// String returns the original string.
+func (q *Query) String() string {
+ return q.str
+}
+
+// Operator defines the relation between a tag and an operand (equality,
+// greater than, etc.).
+type Operator uint8
+
+const (
+ // "<="
+ OpLessEqual Operator = iota
+ // ">="
+ OpGreaterEqual
+ // "<"
+ OpLess
+ // ">"
+ OpGreater
+ // "="
+ OpEqual
+ // "CONTAINS"; used to check if a string contains a certain sub string.
+ OpContains
+)
+
+// Conditions returns a list of conditions.
+func (q *Query) Conditions() []Condition {
+ conditions := make([]Condition, 0)
+
+ buffer, begin, end := q.parser.Buffer, 0, 0
+
+ var tag string
+ var op Operator
+
+ // tokens must be in the following order: tag ("tx.gas") -> operator ("=") -> operand ("7")
+ for _, token := range q.parser.Tokens() {
+ switch token.pegRule {
+
+ case rulePegText:
+ begin, end = int(token.begin), int(token.end)
+ case ruletag:
+ tag = buffer[begin:end]
+ case rulele:
+ op = OpLessEqual
+ case rulege:
+ op = OpGreaterEqual
+ case rulel:
+ op = OpLess
+ case ruleg:
+ op = OpGreater
+ case ruleequal:
+ op = OpEqual
+ case rulecontains:
+ op = OpContains
+ case rulevalue:
+ // strip single quotes from value (i.e. "'NewBlock'" -> "NewBlock")
+ valueWithoutSingleQuotes := buffer[begin+1 : end-1]
+ conditions = append(conditions, Condition{tag, op, valueWithoutSingleQuotes})
+ case rulenumber:
+ number := buffer[begin:end]
+ if strings.Contains(number, ".") { // if it looks like a floating-point number
+ value, err := strconv.ParseFloat(number, 64)
+ if err != nil {
+ panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number))
+ }
+ conditions = append(conditions, Condition{tag, op, value})
+ } else {
+ value, err := strconv.ParseInt(number, 10, 64)
+ if err != nil {
+ panic(fmt.Sprintf("got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", err, number))
+ }
+ conditions = append(conditions, Condition{tag, op, value})
+ }
+ case ruletime:
+ value, err := time.Parse(time.RFC3339, buffer[begin:end])
+ if err != nil {
+ panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end]))
+ }
+ conditions = append(conditions, Condition{tag, op, value})
+ case ruledate:
+ value, err := time.Parse("2006-01-02", buffer[begin:end])
+ if err != nil {
+ panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", err, buffer[begin:end]))
+ }
+ conditions = append(conditions, Condition{tag, op, value})
+ }
+ }
+
+ return conditions
+}
+
+// Matches returns true if the query matches the given set of tags, false otherwise.
+//
+// For example, query "name=John" matches tags = {"name": "John"}. More
+// examples can be found in parser_test.go and query_test.go.
+func (q *Query) Matches(tags pubsub.TagMap) bool {
+ if tags.Len() == 0 {
+ return false
+ }
+
+ buffer, begin, end := q.parser.Buffer, 0, 0
+
+ var tag string
+ var op Operator
+
+ // tokens must be in the following order: tag ("tx.gas") -> operator ("=") -> operand ("7")
+ for _, token := range q.parser.Tokens() {
+ switch token.pegRule {
+
+ case rulePegText:
+ begin, end = int(token.begin), int(token.end)
+ case ruletag:
+ tag = buffer[begin:end]
+ case rulele:
+ op = OpLessEqual
+ case rulege:
+ op = OpGreaterEqual
+ case rulel:
+ op = OpLess
+ case ruleg:
+ op = OpGreater
+ case ruleequal:
+ op = OpEqual
+ case rulecontains:
+ op = OpContains
+ case rulevalue:
+ // strip single quotes from value (i.e. "'NewBlock'" -> "NewBlock")
+ valueWithoutSingleQuotes := buffer[begin+1 : end-1]
+
+ // see if the triplet (tag, operator, operand) matches any tag
+ // "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" }
+ if !match(tag, op, reflect.ValueOf(valueWithoutSingleQuotes), tags) {
+ return false
+ }
+ case rulenumber:
+ number := buffer[begin:end]
+ if strings.Contains(number, ".") { // if it looks like a floating-point number
+ value, err := strconv.ParseFloat(number, 64)
+ if err != nil {
+ panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number))
+ }
+ if !match(tag, op, reflect.ValueOf(value), tags) {
+ return false
+ }
+ } else {
+ value, err := strconv.ParseInt(number, 10, 64)
+ if err != nil {
+ panic(fmt.Sprintf("got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", err, number))
+ }
+ if !match(tag, op, reflect.ValueOf(value), tags) {
+ return false
+ }
+ }
+ case ruletime:
+ value, err := time.Parse(time.RFC3339, buffer[begin:end])
+ if err != nil {
+ panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end]))
+ }
+ if !match(tag, op, reflect.ValueOf(value), tags) {
+ return false
+ }
+ case ruledate:
+ value, err := time.Parse("2006-01-02", buffer[begin:end])
+ if err != nil {
+ panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", err, buffer[begin:end]))
+ }
+ if !match(tag, op, reflect.ValueOf(value), tags) {
+ return false
+ }
+ }
+ }
+
+ return true
+}
+
+// match returns true if the given triplet (tag, operator, operand) matches any tag.
+//
+// First, it looks up the tag in tags and if it finds one, tries to compare the
+// value from it to the operand using the operator.
+//
+// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" }
+func match(tag string, op Operator, operand reflect.Value, tags pubsub.TagMap) bool {
+ // look up the tag from the query in tags
+ value, ok := tags.Get(tag)
+ if !ok {
+ return false
+ }
+ switch operand.Kind() {
+ case reflect.Struct: // time
+ operandAsTime := operand.Interface().(time.Time)
+ v, ok := value.(time.Time)
+ if !ok { // if value from tags is not time.Time
+ return false
+ }
+ switch op {
+ case OpLessEqual:
+ return v.Before(operandAsTime) || v.Equal(operandAsTime)
+ case OpGreaterEqual:
+ return v.Equal(operandAsTime) || v.After(operandAsTime)
+ case OpLess:
+ return v.Before(operandAsTime)
+ case OpGreater:
+ return v.After(operandAsTime)
+ case OpEqual:
+ return v.Equal(operandAsTime)
+ }
+ case reflect.Float64:
+ operandFloat64 := operand.Interface().(float64)
+ var v float64
+ // try our best to convert value from tags to float64
+ switch vt := value.(type) {
+ case float64:
+ v = vt
+ case float32:
+ v = float64(vt)
+ case int:
+ v = float64(vt)
+ case int8:
+ v = float64(vt)
+ case int16:
+ v = float64(vt)
+ case int32:
+ v = float64(vt)
+ case int64:
+ v = float64(vt)
+ default: // fail for all other types
+ panic(fmt.Sprintf("Incomparable types: %T (%v) vs float64 (%v)", value, value, operandFloat64))
+ }
+ switch op {
+ case OpLessEqual:
+ return v <= operandFloat64
+ case OpGreaterEqual:
+ return v >= operandFloat64
+ case OpLess:
+ return v < operandFloat64
+ case OpGreater:
+ return v > operandFloat64
+ case OpEqual:
+ return v == operandFloat64
+ }
+ case reflect.Int64:
+ operandInt := operand.Interface().(int64)
+ var v int64
+ // try our best to convert value from tags to int64
+ switch vt := value.(type) {
+ case int64:
+ v = vt
+ case int8:
+ v = int64(vt)
+ case int16:
+ v = int64(vt)
+ case int32:
+ v = int64(vt)
+ case int:
+ v = int64(vt)
+ case float64:
+ v = int64(vt)
+ case float32:
+ v = int64(vt)
+ default: // fail for all other types
+ panic(fmt.Sprintf("Incomparable types: %T (%v) vs int64 (%v)", value, value, operandInt))
+ }
+ switch op {
+ case OpLessEqual:
+ return v <= operandInt
+ case OpGreaterEqual:
+ return v >= operandInt
+ case OpLess:
+ return v < operandInt
+ case OpGreater:
+ return v > operandInt
+ case OpEqual:
+ return v == operandInt
+ }
+ case reflect.String:
+ v, ok := value.(string)
+ if !ok { // if value from tags is not string
+ return false
+ }
+ switch op {
+ case OpEqual:
+ return v == operand.String()
+ case OpContains:
+ return strings.Contains(v, operand.String())
+ }
+ default:
+ panic(fmt.Sprintf("Unknown kind of operand %v", operand.Kind()))
+ }
+
+ return false
+}
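
Taken together, `Matches` and `match` implement the evaluation path: the parser's token stream supplies (tag, operator, operand) triplets, and each triplet is checked against the supplied tag map. Below is a minimal usage sketch — illustrative only, not part of this change — assuming the import paths introduced elsewhere in this diff and mirroring the cases in query_test.go further down:

```go
package main

import (
	"fmt"

	"github.com/tendermint/tendermint/libs/pubsub"
	"github.com/tendermint/tendermint/libs/pubsub/query"
)

func main() {
	// New parses the query string and returns an error on invalid syntax.
	q, err := query.New("tx.gas > 7 AND abci.owner.name CONTAINS 'Igor'")
	if err != nil {
		panic(err)
	}

	// Matches walks the parsed token stream and evaluates each
	// (tag, operator, operand) triplet against the tag map via match.
	tags := pubsub.NewTagMap(map[string]interface{}{
		"tx.gas":          8,           // ints are converted to int64 before comparison
		"abci.owner.name": "Igor,Ivan", // CONTAINS is a substring check
	})
	fmt.Println(q.Matches(tags)) // true
}
```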
diff --git a/libs/pubsub/query/query.peg b/libs/pubsub/query/query.peg
new file mode 100644
index 000000000..739892e4f
--- /dev/null
+++ b/libs/pubsub/query/query.peg
@@ -0,0 +1,33 @@
+package query
+
+type QueryParser Peg {
+}
+
+e <- '\"' condition ( ' '+ and ' '+ condition )* '\"' !.
+
+condition <- tag ' '* (le ' '* (number / time / date)
+ / ge ' '* (number / time / date)
+ / l ' '* (number / time / date)
+ / g ' '* (number / time / date)
+ / equal ' '* (number / time / date / value)
+ / contains ' '* value
+ )
+
+tag <- < (![ \t\n\r\\()"'=><] .)+ >
+value <- < '\'' (!["'] .)* '\''>
+number <- < ('0'
+ / [1-9] digit* ('.' digit*)?) >
+digit <- [0-9]
+time <- "TIME " < year '-' month '-' day 'T' digit digit ':' digit digit ':' digit digit (('-' / '+') digit digit ':' digit digit / 'Z') >
+date <- "DATE " < year '-' month '-' day >
+year <- ('1' / '2') digit digit digit
+month <- ('0' / '1') digit
+day <- ('0' / '1' / '2' / '3') digit
+and <- "AND"
+
+equal <- "="
+contains <- "CONTAINS"
+le <- "<="
+ge <- ">="
+l <- "<"
+g <- ">"
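
The grammar above accepts one or more conditions joined by AND, each of the form tag, operator, operand, where the operator is one of `=`, `<`, `<=`, `>`, `>=`, or `CONTAINS`, and the operand is a number, a single-quoted value, a `TIME` literal (RFC3339), or a `DATE` literal (`2006-01-02`). The outer double quotes in rule `e` are not written by callers — the test queries below contain none — so they are presumably added when the query is constructed. A small illustrative sketch (not part of this change), using query strings taken from query_test.go below; MustParse panics on a parse error, as TestMustParse exercises:

```go
package main

import "github.com/tendermint/tendermint/libs/pubsub/query"

func main() {
	// Each of these parses successfully under the grammar above.
	query.MustParse("tm.events.type='NewBlock'")
	query.MustParse("tx.gas > 7 AND tx.gas < 9")
	query.MustParse("tx.date > DATE 2017-01-01")
	query.MustParse("tx.time >= TIME 2013-05-03T14:45:00Z")
	query.MustParse("abci.owner.name CONTAINS 'Igor'")

	// A bare operator is not a valid query; this would panic:
	// query.MustParse("=")
}
```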
diff --git a/libs/pubsub/query/query.peg.go b/libs/pubsub/query/query.peg.go
new file mode 100644
index 000000000..c86e4a47f
--- /dev/null
+++ b/libs/pubsub/query/query.peg.go
@@ -0,0 +1,1553 @@
+// nolint
+package query
+
+import (
+ "fmt"
+ "math"
+ "sort"
+ "strconv"
+)
+
+const endSymbol rune = 1114112
+
+/* The rule types inferred from the grammar are below. */
+type pegRule uint8
+
+const (
+ ruleUnknown pegRule = iota
+ rulee
+ rulecondition
+ ruletag
+ rulevalue
+ rulenumber
+ ruledigit
+ ruletime
+ ruledate
+ ruleyear
+ rulemonth
+ ruleday
+ ruleand
+ ruleequal
+ rulecontains
+ rulele
+ rulege
+ rulel
+ ruleg
+ rulePegText
+)
+
+var rul3s = [...]string{
+ "Unknown",
+ "e",
+ "condition",
+ "tag",
+ "value",
+ "number",
+ "digit",
+ "time",
+ "date",
+ "year",
+ "month",
+ "day",
+ "and",
+ "equal",
+ "contains",
+ "le",
+ "ge",
+ "l",
+ "g",
+ "PegText",
+}
+
+type token32 struct {
+ pegRule
+ begin, end uint32
+}
+
+func (t *token32) String() string {
+ return fmt.Sprintf("\x1B[34m%v\x1B[m %v %v", rul3s[t.pegRule], t.begin, t.end)
+}
+
+type node32 struct {
+ token32
+ up, next *node32
+}
+
+func (node *node32) print(pretty bool, buffer string) {
+ var print func(node *node32, depth int)
+ print = func(node *node32, depth int) {
+ for node != nil {
+ for c := 0; c < depth; c++ {
+ fmt.Printf(" ")
+ }
+ rule := rul3s[node.pegRule]
+ quote := strconv.Quote(string(([]rune(buffer)[node.begin:node.end])))
+ if !pretty {
+ fmt.Printf("%v %v\n", rule, quote)
+ } else {
+ fmt.Printf("\x1B[34m%v\x1B[m %v\n", rule, quote)
+ }
+ if node.up != nil {
+ print(node.up, depth+1)
+ }
+ node = node.next
+ }
+ }
+ print(node, 0)
+}
+
+func (node *node32) Print(buffer string) {
+ node.print(false, buffer)
+}
+
+func (node *node32) PrettyPrint(buffer string) {
+ node.print(true, buffer)
+}
+
+type tokens32 struct {
+ tree []token32
+}
+
+func (t *tokens32) Trim(length uint32) {
+ t.tree = t.tree[:length]
+}
+
+func (t *tokens32) Print() {
+ for _, token := range t.tree {
+ fmt.Println(token.String())
+ }
+}
+
+func (t *tokens32) AST() *node32 {
+ type element struct {
+ node *node32
+ down *element
+ }
+ tokens := t.Tokens()
+ var stack *element
+ for _, token := range tokens {
+ if token.begin == token.end {
+ continue
+ }
+ node := &node32{token32: token}
+ for stack != nil && stack.node.begin >= token.begin && stack.node.end <= token.end {
+ stack.node.next = node.up
+ node.up = stack.node
+ stack = stack.down
+ }
+ stack = &element{node: node, down: stack}
+ }
+ if stack != nil {
+ return stack.node
+ }
+ return nil
+}
+
+func (t *tokens32) PrintSyntaxTree(buffer string) {
+ t.AST().Print(buffer)
+}
+
+func (t *tokens32) PrettyPrintSyntaxTree(buffer string) {
+ t.AST().PrettyPrint(buffer)
+}
+
+func (t *tokens32) Add(rule pegRule, begin, end, index uint32) {
+ if tree := t.tree; int(index) >= len(tree) {
+ expanded := make([]token32, 2*len(tree))
+ copy(expanded, tree)
+ t.tree = expanded
+ }
+ t.tree[index] = token32{
+ pegRule: rule,
+ begin: begin,
+ end: end,
+ }
+}
+
+func (t *tokens32) Tokens() []token32 {
+ return t.tree
+}
+
+type QueryParser struct {
+ Buffer string
+ buffer []rune
+ rules [20]func() bool
+ parse func(rule ...int) error
+ reset func()
+ Pretty bool
+ tokens32
+}
+
+func (p *QueryParser) Parse(rule ...int) error {
+ return p.parse(rule...)
+}
+
+func (p *QueryParser) Reset() {
+ p.reset()
+}
+
+type textPosition struct {
+ line, symbol int
+}
+
+type textPositionMap map[int]textPosition
+
+func translatePositions(buffer []rune, positions []int) textPositionMap {
+ length, translations, j, line, symbol := len(positions), make(textPositionMap, len(positions)), 0, 1, 0
+ sort.Ints(positions)
+
+search:
+ for i, c := range buffer {
+ if c == '\n' {
+ line, symbol = line+1, 0
+ } else {
+ symbol++
+ }
+ if i == positions[j] {
+ translations[positions[j]] = textPosition{line, symbol}
+ for j++; j < length; j++ {
+ if i != positions[j] {
+ continue search
+ }
+ }
+ break search
+ }
+ }
+
+ return translations
+}
+
+type parseError struct {
+ p *QueryParser
+ max token32
+}
+
+func (e *parseError) Error() string {
+ tokens, error := []token32{e.max}, "\n"
+ positions, p := make([]int, 2*len(tokens)), 0
+ for _, token := range tokens {
+ positions[p], p = int(token.begin), p+1
+ positions[p], p = int(token.end), p+1
+ }
+ translations := translatePositions(e.p.buffer, positions)
+ format := "parse error near %v (line %v symbol %v - line %v symbol %v):\n%v\n"
+ if e.p.Pretty {
+ format = "parse error near \x1B[34m%v\x1B[m (line %v symbol %v - line %v symbol %v):\n%v\n"
+ }
+ for _, token := range tokens {
+ begin, end := int(token.begin), int(token.end)
+ error += fmt.Sprintf(format,
+ rul3s[token.pegRule],
+ translations[begin].line, translations[begin].symbol,
+ translations[end].line, translations[end].symbol,
+ strconv.Quote(string(e.p.buffer[begin:end])))
+ }
+
+ return error
+}
+
+func (p *QueryParser) PrintSyntaxTree() {
+ if p.Pretty {
+ p.tokens32.PrettyPrintSyntaxTree(p.Buffer)
+ } else {
+ p.tokens32.PrintSyntaxTree(p.Buffer)
+ }
+}
+
+func (p *QueryParser) Init() {
+ var (
+ max token32
+ position, tokenIndex uint32
+ buffer []rune
+ )
+ p.reset = func() {
+ max = token32{}
+ position, tokenIndex = 0, 0
+
+ p.buffer = []rune(p.Buffer)
+ if len(p.buffer) == 0 || p.buffer[len(p.buffer)-1] != endSymbol {
+ p.buffer = append(p.buffer, endSymbol)
+ }
+ buffer = p.buffer
+ }
+ p.reset()
+
+ _rules := p.rules
+ tree := tokens32{tree: make([]token32, math.MaxInt16)}
+ p.parse = func(rule ...int) error {
+ r := 1
+ if len(rule) > 0 {
+ r = rule[0]
+ }
+ matches := p.rules[r]()
+ p.tokens32 = tree
+ if matches {
+ p.Trim(tokenIndex)
+ return nil
+ }
+ return &parseError{p, max}
+ }
+
+ add := func(rule pegRule, begin uint32) {
+ tree.Add(rule, begin, position, tokenIndex)
+ tokenIndex++
+ if begin != position && position > max.end {
+ max = token32{rule, begin, position}
+ }
+ }
+
+ matchDot := func() bool {
+ if buffer[position] != endSymbol {
+ position++
+ return true
+ }
+ return false
+ }
+
+ /*matchChar := func(c byte) bool {
+ if buffer[position] == c {
+ position++
+ return true
+ }
+ return false
+ }*/
+
+ /*matchRange := func(lower byte, upper byte) bool {
+ if c := buffer[position]; c >= lower && c <= upper {
+ position++
+ return true
+ }
+ return false
+ }*/
+
+ _rules = [...]func() bool{
+ nil,
+ /* 0 e <- <('"' condition (' '+ and ' '+ condition)* '"' !.)> */
+ func() bool {
+ position0, tokenIndex0 := position, tokenIndex
+ {
+ position1 := position
+ if buffer[position] != rune('"') {
+ goto l0
+ }
+ position++
+ if !_rules[rulecondition]() {
+ goto l0
+ }
+ l2:
+ {
+ position3, tokenIndex3 := position, tokenIndex
+ if buffer[position] != rune(' ') {
+ goto l3
+ }
+ position++
+ l4:
+ {
+ position5, tokenIndex5 := position, tokenIndex
+ if buffer[position] != rune(' ') {
+ goto l5
+ }
+ position++
+ goto l4
+ l5:
+ position, tokenIndex = position5, tokenIndex5
+ }
+ {
+ position6 := position
+ {
+ position7, tokenIndex7 := position, tokenIndex
+ if buffer[position] != rune('a') {
+ goto l8
+ }
+ position++
+ goto l7
+ l8:
+ position, tokenIndex = position7, tokenIndex7
+ if buffer[position] != rune('A') {
+ goto l3
+ }
+ position++
+ }
+ l7:
+ {
+ position9, tokenIndex9 := position, tokenIndex
+ if buffer[position] != rune('n') {
+ goto l10
+ }
+ position++
+ goto l9
+ l10:
+ position, tokenIndex = position9, tokenIndex9
+ if buffer[position] != rune('N') {
+ goto l3
+ }
+ position++
+ }
+ l9:
+ {
+ position11, tokenIndex11 := position, tokenIndex
+ if buffer[position] != rune('d') {
+ goto l12
+ }
+ position++
+ goto l11
+ l12:
+ position, tokenIndex = position11, tokenIndex11
+ if buffer[position] != rune('D') {
+ goto l3
+ }
+ position++
+ }
+ l11:
+ add(ruleand, position6)
+ }
+ if buffer[position] != rune(' ') {
+ goto l3
+ }
+ position++
+ l13:
+ {
+ position14, tokenIndex14 := position, tokenIndex
+ if buffer[position] != rune(' ') {
+ goto l14
+ }
+ position++
+ goto l13
+ l14:
+ position, tokenIndex = position14, tokenIndex14
+ }
+ if !_rules[rulecondition]() {
+ goto l3
+ }
+ goto l2
+ l3:
+ position, tokenIndex = position3, tokenIndex3
+ }
+ if buffer[position] != rune('"') {
+ goto l0
+ }
+ position++
+ {
+ position15, tokenIndex15 := position, tokenIndex
+ if !matchDot() {
+ goto l15
+ }
+ goto l0
+ l15:
+ position, tokenIndex = position15, tokenIndex15
+ }
+ add(rulee, position1)
+ }
+ return true
+ l0:
+ position, tokenIndex = position0, tokenIndex0
+ return false
+ },
+ /* 1 condition <- <(tag ' '* ((le ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number))) / (ge ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number))) / ((&('=') (equal ' '* ((&('\'') value) | (&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number)))) | (&('>') (g ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number)))) | (&('<') (l ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number)))) | (&('C' | 'c') (contains ' '* value)))))> */
+ func() bool {
+ position16, tokenIndex16 := position, tokenIndex
+ {
+ position17 := position
+ {
+ position18 := position
+ {
+ position19 := position
+ {
+ position22, tokenIndex22 := position, tokenIndex
+ {
+ switch buffer[position] {
+ case '<':
+ if buffer[position] != rune('<') {
+ goto l22
+ }
+ position++
+ break
+ case '>':
+ if buffer[position] != rune('>') {
+ goto l22
+ }
+ position++
+ break
+ case '=':
+ if buffer[position] != rune('=') {
+ goto l22
+ }
+ position++
+ break
+ case '\'':
+ if buffer[position] != rune('\'') {
+ goto l22
+ }
+ position++
+ break
+ case '"':
+ if buffer[position] != rune('"') {
+ goto l22
+ }
+ position++
+ break
+ case ')':
+ if buffer[position] != rune(')') {
+ goto l22
+ }
+ position++
+ break
+ case '(':
+ if buffer[position] != rune('(') {
+ goto l22
+ }
+ position++
+ break
+ case '\\':
+ if buffer[position] != rune('\\') {
+ goto l22
+ }
+ position++
+ break
+ case '\r':
+ if buffer[position] != rune('\r') {
+ goto l22
+ }
+ position++
+ break
+ case '\n':
+ if buffer[position] != rune('\n') {
+ goto l22
+ }
+ position++
+ break
+ case '\t':
+ if buffer[position] != rune('\t') {
+ goto l22
+ }
+ position++
+ break
+ default:
+ if buffer[position] != rune(' ') {
+ goto l22
+ }
+ position++
+ break
+ }
+ }
+
+ goto l16
+ l22:
+ position, tokenIndex = position22, tokenIndex22
+ }
+ if !matchDot() {
+ goto l16
+ }
+ l20:
+ {
+ position21, tokenIndex21 := position, tokenIndex
+ {
+ position24, tokenIndex24 := position, tokenIndex
+ {
+ switch buffer[position] {
+ case '<':
+ if buffer[position] != rune('<') {
+ goto l24
+ }
+ position++
+ break
+ case '>':
+ if buffer[position] != rune('>') {
+ goto l24
+ }
+ position++
+ break
+ case '=':
+ if buffer[position] != rune('=') {
+ goto l24
+ }
+ position++
+ break
+ case '\'':
+ if buffer[position] != rune('\'') {
+ goto l24
+ }
+ position++
+ break
+ case '"':
+ if buffer[position] != rune('"') {
+ goto l24
+ }
+ position++
+ break
+ case ')':
+ if buffer[position] != rune(')') {
+ goto l24
+ }
+ position++
+ break
+ case '(':
+ if buffer[position] != rune('(') {
+ goto l24
+ }
+ position++
+ break
+ case '\\':
+ if buffer[position] != rune('\\') {
+ goto l24
+ }
+ position++
+ break
+ case '\r':
+ if buffer[position] != rune('\r') {
+ goto l24
+ }
+ position++
+ break
+ case '\n':
+ if buffer[position] != rune('\n') {
+ goto l24
+ }
+ position++
+ break
+ case '\t':
+ if buffer[position] != rune('\t') {
+ goto l24
+ }
+ position++
+ break
+ default:
+ if buffer[position] != rune(' ') {
+ goto l24
+ }
+ position++
+ break
+ }
+ }
+
+ goto l21
+ l24:
+ position, tokenIndex = position24, tokenIndex24
+ }
+ if !matchDot() {
+ goto l21
+ }
+ goto l20
+ l21:
+ position, tokenIndex = position21, tokenIndex21
+ }
+ add(rulePegText, position19)
+ }
+ add(ruletag, position18)
+ }
+ l26:
+ {
+ position27, tokenIndex27 := position, tokenIndex
+ if buffer[position] != rune(' ') {
+ goto l27
+ }
+ position++
+ goto l26
+ l27:
+ position, tokenIndex = position27, tokenIndex27
+ }
+ {
+ position28, tokenIndex28 := position, tokenIndex
+ {
+ position30 := position
+ if buffer[position] != rune('<') {
+ goto l29
+ }
+ position++
+ if buffer[position] != rune('=') {
+ goto l29
+ }
+ position++
+ add(rulele, position30)
+ }
+ l31:
+ {
+ position32, tokenIndex32 := position, tokenIndex
+ if buffer[position] != rune(' ') {
+ goto l32
+ }
+ position++
+ goto l31
+ l32:
+ position, tokenIndex = position32, tokenIndex32
+ }
+ {
+ switch buffer[position] {
+ case 'D', 'd':
+ if !_rules[ruledate]() {
+ goto l29
+ }
+ break
+ case 'T', 't':
+ if !_rules[ruletime]() {
+ goto l29
+ }
+ break
+ default:
+ if !_rules[rulenumber]() {
+ goto l29
+ }
+ break
+ }
+ }
+
+ goto l28
+ l29:
+ position, tokenIndex = position28, tokenIndex28
+ {
+ position35 := position
+ if buffer[position] != rune('>') {
+ goto l34
+ }
+ position++
+ if buffer[position] != rune('=') {
+ goto l34
+ }
+ position++
+ add(rulege, position35)
+ }
+ l36:
+ {
+ position37, tokenIndex37 := position, tokenIndex
+ if buffer[position] != rune(' ') {
+ goto l37
+ }
+ position++
+ goto l36
+ l37:
+ position, tokenIndex = position37, tokenIndex37
+ }
+ {
+ switch buffer[position] {
+ case 'D', 'd':
+ if !_rules[ruledate]() {
+ goto l34
+ }
+ break
+ case 'T', 't':
+ if !_rules[ruletime]() {
+ goto l34
+ }
+ break
+ default:
+ if !_rules[rulenumber]() {
+ goto l34
+ }
+ break
+ }
+ }
+
+ goto l28
+ l34:
+ position, tokenIndex = position28, tokenIndex28
+ {
+ switch buffer[position] {
+ case '=':
+ {
+ position40 := position
+ if buffer[position] != rune('=') {
+ goto l16
+ }
+ position++
+ add(ruleequal, position40)
+ }
+ l41:
+ {
+ position42, tokenIndex42 := position, tokenIndex
+ if buffer[position] != rune(' ') {
+ goto l42
+ }
+ position++
+ goto l41
+ l42:
+ position, tokenIndex = position42, tokenIndex42
+ }
+ {
+ switch buffer[position] {
+ case '\'':
+ if !_rules[rulevalue]() {
+ goto l16
+ }
+ break
+ case 'D', 'd':
+ if !_rules[ruledate]() {
+ goto l16
+ }
+ break
+ case 'T', 't':
+ if !_rules[ruletime]() {
+ goto l16
+ }
+ break
+ default:
+ if !_rules[rulenumber]() {
+ goto l16
+ }
+ break
+ }
+ }
+
+ break
+ case '>':
+ {
+ position44 := position
+ if buffer[position] != rune('>') {
+ goto l16
+ }
+ position++
+ add(ruleg, position44)
+ }
+ l45:
+ {
+ position46, tokenIndex46 := position, tokenIndex
+ if buffer[position] != rune(' ') {
+ goto l46
+ }
+ position++
+ goto l45
+ l46:
+ position, tokenIndex = position46, tokenIndex46
+ }
+ {
+ switch buffer[position] {
+ case 'D', 'd':
+ if !_rules[ruledate]() {
+ goto l16
+ }
+ break
+ case 'T', 't':
+ if !_rules[ruletime]() {
+ goto l16
+ }
+ break
+ default:
+ if !_rules[rulenumber]() {
+ goto l16
+ }
+ break
+ }
+ }
+
+ break
+ case '<':
+ {
+ position48 := position
+ if buffer[position] != rune('<') {
+ goto l16
+ }
+ position++
+ add(rulel, position48)
+ }
+ l49:
+ {
+ position50, tokenIndex50 := position, tokenIndex
+ if buffer[position] != rune(' ') {
+ goto l50
+ }
+ position++
+ goto l49
+ l50:
+ position, tokenIndex = position50, tokenIndex50
+ }
+ {
+ switch buffer[position] {
+ case 'D', 'd':
+ if !_rules[ruledate]() {
+ goto l16
+ }
+ break
+ case 'T', 't':
+ if !_rules[ruletime]() {
+ goto l16
+ }
+ break
+ default:
+ if !_rules[rulenumber]() {
+ goto l16
+ }
+ break
+ }
+ }
+
+ break
+ default:
+ {
+ position52 := position
+ {
+ position53, tokenIndex53 := position, tokenIndex
+ if buffer[position] != rune('c') {
+ goto l54
+ }
+ position++
+ goto l53
+ l54:
+ position, tokenIndex = position53, tokenIndex53
+ if buffer[position] != rune('C') {
+ goto l16
+ }
+ position++
+ }
+ l53:
+ {
+ position55, tokenIndex55 := position, tokenIndex
+ if buffer[position] != rune('o') {
+ goto l56
+ }
+ position++
+ goto l55
+ l56:
+ position, tokenIndex = position55, tokenIndex55
+ if buffer[position] != rune('O') {
+ goto l16
+ }
+ position++
+ }
+ l55:
+ {
+ position57, tokenIndex57 := position, tokenIndex
+ if buffer[position] != rune('n') {
+ goto l58
+ }
+ position++
+ goto l57
+ l58:
+ position, tokenIndex = position57, tokenIndex57
+ if buffer[position] != rune('N') {
+ goto l16
+ }
+ position++
+ }
+ l57:
+ {
+ position59, tokenIndex59 := position, tokenIndex
+ if buffer[position] != rune('t') {
+ goto l60
+ }
+ position++
+ goto l59
+ l60:
+ position, tokenIndex = position59, tokenIndex59
+ if buffer[position] != rune('T') {
+ goto l16
+ }
+ position++
+ }
+ l59:
+ {
+ position61, tokenIndex61 := position, tokenIndex
+ if buffer[position] != rune('a') {
+ goto l62
+ }
+ position++
+ goto l61
+ l62:
+ position, tokenIndex = position61, tokenIndex61
+ if buffer[position] != rune('A') {
+ goto l16
+ }
+ position++
+ }
+ l61:
+ {
+ position63, tokenIndex63 := position, tokenIndex
+ if buffer[position] != rune('i') {
+ goto l64
+ }
+ position++
+ goto l63
+ l64:
+ position, tokenIndex = position63, tokenIndex63
+ if buffer[position] != rune('I') {
+ goto l16
+ }
+ position++
+ }
+ l63:
+ {
+ position65, tokenIndex65 := position, tokenIndex
+ if buffer[position] != rune('n') {
+ goto l66
+ }
+ position++
+ goto l65
+ l66:
+ position, tokenIndex = position65, tokenIndex65
+ if buffer[position] != rune('N') {
+ goto l16
+ }
+ position++
+ }
+ l65:
+ {
+ position67, tokenIndex67 := position, tokenIndex
+ if buffer[position] != rune('s') {
+ goto l68
+ }
+ position++
+ goto l67
+ l68:
+ position, tokenIndex = position67, tokenIndex67
+ if buffer[position] != rune('S') {
+ goto l16
+ }
+ position++
+ }
+ l67:
+ add(rulecontains, position52)
+ }
+ l69:
+ {
+ position70, tokenIndex70 := position, tokenIndex
+ if buffer[position] != rune(' ') {
+ goto l70
+ }
+ position++
+ goto l69
+ l70:
+ position, tokenIndex = position70, tokenIndex70
+ }
+ if !_rules[rulevalue]() {
+ goto l16
+ }
+ break
+ }
+ }
+
+ }
+ l28:
+ add(rulecondition, position17)
+ }
+ return true
+ l16:
+ position, tokenIndex = position16, tokenIndex16
+ return false
+ },
+ /* 2 tag <- <<(!((&('<') '<') | (&('>') '>') | (&('=') '=') | (&('\'') '\'') | (&('"') '"') | (&(')') ')') | (&('(') '(') | (&('\\') '\\') | (&('\r') '\r') | (&('\n') '\n') | (&('\t') '\t') | (&(' ') ' ')) .)+>> */
+ nil,
+ /* 3 value <- <<('\'' (!('"' / '\'') .)* '\'')>> */
+ func() bool {
+ position72, tokenIndex72 := position, tokenIndex
+ {
+ position73 := position
+ {
+ position74 := position
+ if buffer[position] != rune('\'') {
+ goto l72
+ }
+ position++
+ l75:
+ {
+ position76, tokenIndex76 := position, tokenIndex
+ {
+ position77, tokenIndex77 := position, tokenIndex
+ {
+ position78, tokenIndex78 := position, tokenIndex
+ if buffer[position] != rune('"') {
+ goto l79
+ }
+ position++
+ goto l78
+ l79:
+ position, tokenIndex = position78, tokenIndex78
+ if buffer[position] != rune('\'') {
+ goto l77
+ }
+ position++
+ }
+ l78:
+ goto l76
+ l77:
+ position, tokenIndex = position77, tokenIndex77
+ }
+ if !matchDot() {
+ goto l76
+ }
+ goto l75
+ l76:
+ position, tokenIndex = position76, tokenIndex76
+ }
+ if buffer[position] != rune('\'') {
+ goto l72
+ }
+ position++
+ add(rulePegText, position74)
+ }
+ add(rulevalue, position73)
+ }
+ return true
+ l72:
+ position, tokenIndex = position72, tokenIndex72
+ return false
+ },
+ /* 4 number <- <<('0' / ([1-9] digit* ('.' digit*)?))>> */
+ func() bool {
+ position80, tokenIndex80 := position, tokenIndex
+ {
+ position81 := position
+ {
+ position82 := position
+ {
+ position83, tokenIndex83 := position, tokenIndex
+ if buffer[position] != rune('0') {
+ goto l84
+ }
+ position++
+ goto l83
+ l84:
+ position, tokenIndex = position83, tokenIndex83
+ if c := buffer[position]; c < rune('1') || c > rune('9') {
+ goto l80
+ }
+ position++
+ l85:
+ {
+ position86, tokenIndex86 := position, tokenIndex
+ if !_rules[ruledigit]() {
+ goto l86
+ }
+ goto l85
+ l86:
+ position, tokenIndex = position86, tokenIndex86
+ }
+ {
+ position87, tokenIndex87 := position, tokenIndex
+ if buffer[position] != rune('.') {
+ goto l87
+ }
+ position++
+ l89:
+ {
+ position90, tokenIndex90 := position, tokenIndex
+ if !_rules[ruledigit]() {
+ goto l90
+ }
+ goto l89
+ l90:
+ position, tokenIndex = position90, tokenIndex90
+ }
+ goto l88
+ l87:
+ position, tokenIndex = position87, tokenIndex87
+ }
+ l88:
+ }
+ l83:
+ add(rulePegText, position82)
+ }
+ add(rulenumber, position81)
+ }
+ return true
+ l80:
+ position, tokenIndex = position80, tokenIndex80
+ return false
+ },
+ /* 5 digit <- <[0-9]> */
+ func() bool {
+ position91, tokenIndex91 := position, tokenIndex
+ {
+ position92 := position
+ if c := buffer[position]; c < rune('0') || c > rune('9') {
+ goto l91
+ }
+ position++
+ add(ruledigit, position92)
+ }
+ return true
+ l91:
+ position, tokenIndex = position91, tokenIndex91
+ return false
+ },
+ /* 6 time <- <(('t' / 'T') ('i' / 'I') ('m' / 'M') ('e' / 'E') ' ' <(year '-' month '-' day 'T' digit digit ':' digit digit ':' digit digit ((('-' / '+') digit digit ':' digit digit) / 'Z'))>)> */
+ func() bool {
+ position93, tokenIndex93 := position, tokenIndex
+ {
+ position94 := position
+ {
+ position95, tokenIndex95 := position, tokenIndex
+ if buffer[position] != rune('t') {
+ goto l96
+ }
+ position++
+ goto l95
+ l96:
+ position, tokenIndex = position95, tokenIndex95
+ if buffer[position] != rune('T') {
+ goto l93
+ }
+ position++
+ }
+ l95:
+ {
+ position97, tokenIndex97 := position, tokenIndex
+ if buffer[position] != rune('i') {
+ goto l98
+ }
+ position++
+ goto l97
+ l98:
+ position, tokenIndex = position97, tokenIndex97
+ if buffer[position] != rune('I') {
+ goto l93
+ }
+ position++
+ }
+ l97:
+ {
+ position99, tokenIndex99 := position, tokenIndex
+ if buffer[position] != rune('m') {
+ goto l100
+ }
+ position++
+ goto l99
+ l100:
+ position, tokenIndex = position99, tokenIndex99
+ if buffer[position] != rune('M') {
+ goto l93
+ }
+ position++
+ }
+ l99:
+ {
+ position101, tokenIndex101 := position, tokenIndex
+ if buffer[position] != rune('e') {
+ goto l102
+ }
+ position++
+ goto l101
+ l102:
+ position, tokenIndex = position101, tokenIndex101
+ if buffer[position] != rune('E') {
+ goto l93
+ }
+ position++
+ }
+ l101:
+ if buffer[position] != rune(' ') {
+ goto l93
+ }
+ position++
+ {
+ position103 := position
+ if !_rules[ruleyear]() {
+ goto l93
+ }
+ if buffer[position] != rune('-') {
+ goto l93
+ }
+ position++
+ if !_rules[rulemonth]() {
+ goto l93
+ }
+ if buffer[position] != rune('-') {
+ goto l93
+ }
+ position++
+ if !_rules[ruleday]() {
+ goto l93
+ }
+ if buffer[position] != rune('T') {
+ goto l93
+ }
+ position++
+ if !_rules[ruledigit]() {
+ goto l93
+ }
+ if !_rules[ruledigit]() {
+ goto l93
+ }
+ if buffer[position] != rune(':') {
+ goto l93
+ }
+ position++
+ if !_rules[ruledigit]() {
+ goto l93
+ }
+ if !_rules[ruledigit]() {
+ goto l93
+ }
+ if buffer[position] != rune(':') {
+ goto l93
+ }
+ position++
+ if !_rules[ruledigit]() {
+ goto l93
+ }
+ if !_rules[ruledigit]() {
+ goto l93
+ }
+ {
+ position104, tokenIndex104 := position, tokenIndex
+ {
+ position106, tokenIndex106 := position, tokenIndex
+ if buffer[position] != rune('-') {
+ goto l107
+ }
+ position++
+ goto l106
+ l107:
+ position, tokenIndex = position106, tokenIndex106
+ if buffer[position] != rune('+') {
+ goto l105
+ }
+ position++
+ }
+ l106:
+ if !_rules[ruledigit]() {
+ goto l105
+ }
+ if !_rules[ruledigit]() {
+ goto l105
+ }
+ if buffer[position] != rune(':') {
+ goto l105
+ }
+ position++
+ if !_rules[ruledigit]() {
+ goto l105
+ }
+ if !_rules[ruledigit]() {
+ goto l105
+ }
+ goto l104
+ l105:
+ position, tokenIndex = position104, tokenIndex104
+ if buffer[position] != rune('Z') {
+ goto l93
+ }
+ position++
+ }
+ l104:
+ add(rulePegText, position103)
+ }
+ add(ruletime, position94)
+ }
+ return true
+ l93:
+ position, tokenIndex = position93, tokenIndex93
+ return false
+ },
+ /* 7 date <- <(('d' / 'D') ('a' / 'A') ('t' / 'T') ('e' / 'E') ' ' <(year '-' month '-' day)>)> */
+ func() bool {
+ position108, tokenIndex108 := position, tokenIndex
+ {
+ position109 := position
+ {
+ position110, tokenIndex110 := position, tokenIndex
+ if buffer[position] != rune('d') {
+ goto l111
+ }
+ position++
+ goto l110
+ l111:
+ position, tokenIndex = position110, tokenIndex110
+ if buffer[position] != rune('D') {
+ goto l108
+ }
+ position++
+ }
+ l110:
+ {
+ position112, tokenIndex112 := position, tokenIndex
+ if buffer[position] != rune('a') {
+ goto l113
+ }
+ position++
+ goto l112
+ l113:
+ position, tokenIndex = position112, tokenIndex112
+ if buffer[position] != rune('A') {
+ goto l108
+ }
+ position++
+ }
+ l112:
+ {
+ position114, tokenIndex114 := position, tokenIndex
+ if buffer[position] != rune('t') {
+ goto l115
+ }
+ position++
+ goto l114
+ l115:
+ position, tokenIndex = position114, tokenIndex114
+ if buffer[position] != rune('T') {
+ goto l108
+ }
+ position++
+ }
+ l114:
+ {
+ position116, tokenIndex116 := position, tokenIndex
+ if buffer[position] != rune('e') {
+ goto l117
+ }
+ position++
+ goto l116
+ l117:
+ position, tokenIndex = position116, tokenIndex116
+ if buffer[position] != rune('E') {
+ goto l108
+ }
+ position++
+ }
+ l116:
+ if buffer[position] != rune(' ') {
+ goto l108
+ }
+ position++
+ {
+ position118 := position
+ if !_rules[ruleyear]() {
+ goto l108
+ }
+ if buffer[position] != rune('-') {
+ goto l108
+ }
+ position++
+ if !_rules[rulemonth]() {
+ goto l108
+ }
+ if buffer[position] != rune('-') {
+ goto l108
+ }
+ position++
+ if !_rules[ruleday]() {
+ goto l108
+ }
+ add(rulePegText, position118)
+ }
+ add(ruledate, position109)
+ }
+ return true
+ l108:
+ position, tokenIndex = position108, tokenIndex108
+ return false
+ },
+ /* 8 year <- <(('1' / '2') digit digit digit)> */
+ func() bool {
+ position119, tokenIndex119 := position, tokenIndex
+ {
+ position120 := position
+ {
+ position121, tokenIndex121 := position, tokenIndex
+ if buffer[position] != rune('1') {
+ goto l122
+ }
+ position++
+ goto l121
+ l122:
+ position, tokenIndex = position121, tokenIndex121
+ if buffer[position] != rune('2') {
+ goto l119
+ }
+ position++
+ }
+ l121:
+ if !_rules[ruledigit]() {
+ goto l119
+ }
+ if !_rules[ruledigit]() {
+ goto l119
+ }
+ if !_rules[ruledigit]() {
+ goto l119
+ }
+ add(ruleyear, position120)
+ }
+ return true
+ l119:
+ position, tokenIndex = position119, tokenIndex119
+ return false
+ },
+ /* 9 month <- <(('0' / '1') digit)> */
+ func() bool {
+ position123, tokenIndex123 := position, tokenIndex
+ {
+ position124 := position
+ {
+ position125, tokenIndex125 := position, tokenIndex
+ if buffer[position] != rune('0') {
+ goto l126
+ }
+ position++
+ goto l125
+ l126:
+ position, tokenIndex = position125, tokenIndex125
+ if buffer[position] != rune('1') {
+ goto l123
+ }
+ position++
+ }
+ l125:
+ if !_rules[ruledigit]() {
+ goto l123
+ }
+ add(rulemonth, position124)
+ }
+ return true
+ l123:
+ position, tokenIndex = position123, tokenIndex123
+ return false
+ },
+ /* 10 day <- <(((&('3') '3') | (&('2') '2') | (&('1') '1') | (&('0') '0')) digit)> */
+ func() bool {
+ position127, tokenIndex127 := position, tokenIndex
+ {
+ position128 := position
+ {
+ switch buffer[position] {
+ case '3':
+ if buffer[position] != rune('3') {
+ goto l127
+ }
+ position++
+ break
+ case '2':
+ if buffer[position] != rune('2') {
+ goto l127
+ }
+ position++
+ break
+ case '1':
+ if buffer[position] != rune('1') {
+ goto l127
+ }
+ position++
+ break
+ default:
+ if buffer[position] != rune('0') {
+ goto l127
+ }
+ position++
+ break
+ }
+ }
+
+ if !_rules[ruledigit]() {
+ goto l127
+ }
+ add(ruleday, position128)
+ }
+ return true
+ l127:
+ position, tokenIndex = position127, tokenIndex127
+ return false
+ },
+ /* 11 and <- <(('a' / 'A') ('n' / 'N') ('d' / 'D'))> */
+ nil,
+ /* 12 equal <- <'='> */
+ nil,
+ /* 13 contains <- <(('c' / 'C') ('o' / 'O') ('n' / 'N') ('t' / 'T') ('a' / 'A') ('i' / 'I') ('n' / 'N') ('s' / 'S'))> */
+ nil,
+ /* 14 le <- <('<' '=')> */
+ nil,
+ /* 15 ge <- <('>' '=')> */
+ nil,
+ /* 16 l <- <'<'> */
+ nil,
+ /* 17 g <- <'>'> */
+ nil,
+ nil,
+ }
+ p.rules = _rules
+}
diff --git a/libs/pubsub/query/query_test.go b/libs/pubsub/query/query_test.go
new file mode 100644
index 000000000..f266b1214
--- /dev/null
+++ b/libs/pubsub/query/query_test.go
@@ -0,0 +1,87 @@
+package query_test
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/tendermint/tendermint/libs/pubsub"
+ "github.com/tendermint/tendermint/libs/pubsub/query"
+)
+
+func TestMatches(t *testing.T) {
+ const shortForm = "2006-Jan-02"
+ txDate, err := time.Parse(shortForm, "2017-Jan-01")
+ require.NoError(t, err)
+ txTime, err := time.Parse(time.RFC3339, "2018-05-03T14:45:00Z")
+ require.NoError(t, err)
+
+ testCases := []struct {
+ s string
+ tags map[string]interface{}
+ err bool
+ matches bool
+ }{
+ {"tm.events.type='NewBlock'", map[string]interface{}{"tm.events.type": "NewBlock"}, false, true},
+
+ {"tx.gas > 7", map[string]interface{}{"tx.gas": 8}, false, true},
+ {"tx.gas > 7 AND tx.gas < 9", map[string]interface{}{"tx.gas": 8}, false, true},
+ {"body.weight >= 3.5", map[string]interface{}{"body.weight": 3.5}, false, true},
+ {"account.balance < 1000.0", map[string]interface{}{"account.balance": 900}, false, true},
+ {"apples.kg <= 4", map[string]interface{}{"apples.kg": 4.0}, false, true},
+ {"body.weight >= 4.5", map[string]interface{}{"body.weight": float32(4.5)}, false, true},
+ {"oranges.kg < 4 AND watermellons.kg > 10", map[string]interface{}{"oranges.kg": 3, "watermellons.kg": 12}, false, true},
+ {"peaches.kg < 4", map[string]interface{}{"peaches.kg": 5}, false, false},
+
+ {"tx.date > DATE 2017-01-01", map[string]interface{}{"tx.date": time.Now()}, false, true},
+ {"tx.date = DATE 2017-01-01", map[string]interface{}{"tx.date": txDate}, false, true},
+ {"tx.date = DATE 2018-01-01", map[string]interface{}{"tx.date": txDate}, false, false},
+
+ {"tx.time >= TIME 2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": time.Now()}, false, true},
+ {"tx.time = TIME 2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": txTime}, false, false},
+
+ {"abci.owner.name CONTAINS 'Igor'", map[string]interface{}{"abci.owner.name": "Igor,Ivan"}, false, true},
+ {"abci.owner.name CONTAINS 'Igor'", map[string]interface{}{"abci.owner.name": "Pavel,Ivan"}, false, false},
+ }
+
+ for _, tc := range testCases {
+ q, err := query.New(tc.s)
+ if !tc.err {
+ require.Nil(t, err)
+ }
+
+ if tc.matches {
+ assert.True(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should match %v", tc.s, tc.tags)
+ } else {
+ assert.False(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should not match %v", tc.s, tc.tags)
+ }
+ }
+}
+
+func TestMustParse(t *testing.T) {
+ assert.Panics(t, func() { query.MustParse("=") })
+ assert.NotPanics(t, func() { query.MustParse("tm.events.type='NewBlock'") })
+}
+
+func TestConditions(t *testing.T) {
+ txTime, err := time.Parse(time.RFC3339, "2013-05-03T14:45:00Z")
+ require.NoError(t, err)
+
+ testCases := []struct {
+ s string
+ conditions []query.Condition
+ }{
+ {s: "tm.events.type='NewBlock'", conditions: []query.Condition{query.Condition{Tag: "tm.events.type", Op: query.OpEqual, Operand: "NewBlock"}}},
+ {s: "tx.gas > 7 AND tx.gas < 9", conditions: []query.Condition{query.Condition{Tag: "tx.gas", Op: query.OpGreater, Operand: int64(7)}, query.Condition{Tag: "tx.gas", Op: query.OpLess, Operand: int64(9)}}},
+ {s: "tx.time >= TIME 2013-05-03T14:45:00Z", conditions: []query.Condition{query.Condition{Tag: "tx.time", Op: query.OpGreaterEqual, Operand: txTime}}},
+ }
+
+ for _, tc := range testCases {
+ q, err := query.New(tc.s)
+ require.Nil(t, err)
+
+ assert.Equal(t, tc.conditions, q.Conditions())
+ }
+}
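
TestConditions also documents the concrete Go types the parser assigns to operands: an integer literal such as 7 comes back as int64, and a TIME literal as time.Time (decimals presumably become float64, mirroring the number handling in Matches). A minimal sketch, illustrative only, reusing query.New and Conditions as the test does:

```go
package main

import (
	"fmt"

	"github.com/tendermint/tendermint/libs/pubsub/query"
)

func main() {
	q, err := query.New("tx.gas > 7 AND tx.time >= TIME 2013-05-03T14:45:00Z")
	if err != nil {
		panic(err)
	}

	for _, c := range q.Conditions() {
		// Per TestConditions: tx.gas -> int64, tx.time -> time.Time
		fmt.Printf("%s -> %T\n", c.Tag, c.Operand)
	}
}
```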
diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go
index ed1a5b32d..e25cac1f5 100644
--- a/rpc/client/httpclient.go
+++ b/rpc/client/httpclient.go
@@ -11,7 +11,7 @@ import (
rpcclient "github.com/tendermint/tendermint/rpc/lib/client"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
- tmpubsub "github.com/tendermint/tmlibs/pubsub"
+ tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
)
/*
diff --git a/rpc/client/localclient.go b/rpc/client/localclient.go
index c9bdddf1c..d3eeb4261 100644
--- a/rpc/client/localclient.go
+++ b/rpc/client/localclient.go
@@ -8,7 +8,7 @@ import (
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
- tmpubsub "github.com/tendermint/tmlibs/pubsub"
+ tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
)
/*
diff --git a/rpc/core/events.go b/rpc/core/events.go
index a46e0947c..36722fcf9 100644
--- a/rpc/core/events.go
+++ b/rpc/core/events.go
@@ -5,10 +5,10 @@ import (
"github.com/pkg/errors"
+ tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
tmtypes "github.com/tendermint/tendermint/types"
- tmquery "github.com/tendermint/tmlibs/pubsub/query"
)
// Subscribe for events via WebSocket.
@@ -46,10 +46,10 @@ import (
// https://godoc.org/github.com/tendermint/tendermint/types#pkg-constants
//
// For complete query syntax, check out
-// https://godoc.org/github.com/tendermint/tmlibs/pubsub/query.
+// https://godoc.org/github.com/tendermint/tendermint/libs/pubsub/query.
//
// ```go
-// import "github.com/tendermint/tmlibs/pubsub/query"
+// import "github.com/tendermint/tendermint/libs/pubsub/query"
// import "github.com/tendermint/tendermint/types"
//
// client := client.NewHTTP("tcp://0.0.0.0:46657", "/websocket")
diff --git a/rpc/core/tx.go b/rpc/core/tx.go
index 5fc01a86d..615136a92 100644
--- a/rpc/core/tx.go
+++ b/rpc/core/tx.go
@@ -3,11 +3,12 @@ package core
import (
"fmt"
+ cmn "github.com/tendermint/tmlibs/common"
+
+ tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/state/txindex/null"
"github.com/tendermint/tendermint/types"
- cmn "github.com/tendermint/tmlibs/common"
- tmquery "github.com/tendermint/tmlibs/pubsub/query"
)
// Tx allows you to query the transaction results. `nil` could mean the
diff --git a/rpc/lib/types/types.go b/rpc/lib/types/types.go
index 5fa723bb4..1eeb19ea8 100644
--- a/rpc/lib/types/types.go
+++ b/rpc/lib/types/types.go
@@ -8,7 +8,7 @@ import (
"github.com/pkg/errors"
"github.com/tendermint/go-amino"
- tmpubsub "github.com/tendermint/tmlibs/pubsub"
+ tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
)
//----------------------------------------
diff --git a/state/txindex/indexer.go b/state/txindex/indexer.go
index e23840f14..bf7760fc8 100644
--- a/state/txindex/indexer.go
+++ b/state/txindex/indexer.go
@@ -4,7 +4,7 @@ import (
"errors"
"github.com/tendermint/tendermint/types"
- "github.com/tendermint/tmlibs/pubsub/query"
+ "github.com/tendermint/tendermint/libs/pubsub/query"
)
// TxIndexer interface defines methods to index and search transactions.
diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go
index 93e6269e8..264be1fd8 100644
--- a/state/txindex/indexer_service.go
+++ b/state/txindex/indexer_service.go
@@ -3,8 +3,9 @@ package txindex
import (
"context"
- "github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
+
+ "github.com/tendermint/tendermint/types"
)
const (
diff --git a/state/txindex/kv/kv.go b/state/txindex/kv/kv.go
index 87861f050..718a55d15 100644
--- a/state/txindex/kv/kv.go
+++ b/state/txindex/kv/kv.go
@@ -12,8 +12,8 @@ import (
"github.com/pkg/errors"
cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db"
- "github.com/tendermint/tmlibs/pubsub/query"
+ "github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
)
diff --git a/state/txindex/kv/kv_test.go b/state/txindex/kv/kv_test.go
index a8537219d..af35ec411 100644
--- a/state/txindex/kv/kv_test.go
+++ b/state/txindex/kv/kv_test.go
@@ -11,8 +11,8 @@ import (
abci "github.com/tendermint/abci/types"
cmn "github.com/tendermint/tmlibs/common"
db "github.com/tendermint/tmlibs/db"
- "github.com/tendermint/tmlibs/pubsub/query"
+ "github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
)
diff --git a/state/txindex/null/null.go b/state/txindex/null/null.go
index 0764faa9e..2d3961e6b 100644
--- a/state/txindex/null/null.go
+++ b/state/txindex/null/null.go
@@ -5,7 +5,7 @@ import (
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
- "github.com/tendermint/tmlibs/pubsub/query"
+ "github.com/tendermint/tendermint/libs/pubsub/query"
)
var _ txindex.TxIndexer = (*TxIndex)(nil)
diff --git a/types/event_bus.go b/types/event_bus.go
index 460a3e294..925907fd0 100644
--- a/types/event_bus.go
+++ b/types/event_bus.go
@@ -6,7 +6,7 @@ import (
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
- tmpubsub "github.com/tendermint/tmlibs/pubsub"
+ tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
)
const defaultCapacity = 1000
diff --git a/types/event_bus_test.go b/types/event_bus_test.go
index 70a537745..95d061f40 100644
--- a/types/event_bus_test.go
+++ b/types/event_bus_test.go
@@ -12,8 +12,8 @@ import (
abci "github.com/tendermint/abci/types"
cmn "github.com/tendermint/tmlibs/common"
- tmpubsub "github.com/tendermint/tmlibs/pubsub"
- tmquery "github.com/tendermint/tmlibs/pubsub/query"
+ tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
+ tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
)
func TestEventBusPublishEventTx(t *testing.T) {
diff --git a/types/events.go b/types/events.go
index 342d4bc20..2b87297cd 100644
--- a/types/events.go
+++ b/types/events.go
@@ -3,9 +3,9 @@ package types
import (
"fmt"
- "github.com/tendermint/go-amino"
- tmpubsub "github.com/tendermint/tmlibs/pubsub"
- tmquery "github.com/tendermint/tmlibs/pubsub/query"
+ amino "github.com/tendermint/go-amino"
+ tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
+ tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
)
// Reserved event types
diff --git a/types/nop_event_bus.go b/types/nop_event_bus.go
index 06b70987d..cd1eab8cd 100644
--- a/types/nop_event_bus.go
+++ b/types/nop_event_bus.go
@@ -3,7 +3,7 @@ package types
import (
"context"
- tmpubsub "github.com/tendermint/tmlibs/pubsub"
+ tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
)
type NopEventBus struct{}