[pubsub] Prioritise internal subscribers (e.g. reactor) over external (e.g. RPC)pull/1630/merge
@ -0,0 +1,9 @@ | |||||
.PHONY: docs | |||||
REPO:=github.com/tendermint/tendermint/libs/events | |||||
docs: | |||||
@go get github.com/davecheney/godoc2md | |||||
godoc2md $(REPO) > README.md | |||||
test: | |||||
go test -v ./... |
@ -0,0 +1,175 @@ | |||||
# events | |||||
`import "github.com/tendermint/tendermint/libs/events"` | |||||
* [Overview](#pkg-overview) | |||||
* [Index](#pkg-index) | |||||
## <a name="pkg-overview">Overview</a> | |||||
Pub-Sub in go with event caching | |||||
## <a name="pkg-index">Index</a> | |||||
* [type EventCache](#EventCache) | |||||
* [func NewEventCache(evsw Fireable) *EventCache](#NewEventCache) | |||||
* [func (evc *EventCache) FireEvent(event string, data EventData)](#EventCache.FireEvent) | |||||
* [func (evc *EventCache) Flush()](#EventCache.Flush) | |||||
* [type EventCallback](#EventCallback) | |||||
* [type EventData](#EventData) | |||||
* [type EventSwitch](#EventSwitch) | |||||
* [func NewEventSwitch() EventSwitch](#NewEventSwitch) | |||||
* [type Eventable](#Eventable) | |||||
* [type Fireable](#Fireable) | |||||
#### <a name="pkg-files">Package files</a> | |||||
[event_cache.go](/src/github.com/tendermint/tendermint/libs/events/event_cache.go) [events.go](/src/github.com/tendermint/tendermint/libs/events/events.go) | |||||
## <a name="EventCache">type</a> [EventCache](/src/target/event_cache.go?s=116:179#L5) | |||||
``` go | |||||
type EventCache struct { | |||||
// contains filtered or unexported fields | |||||
} | |||||
``` | |||||
An EventCache buffers events for a Fireable | |||||
All events are cached. Filtering happens on Flush | |||||
### <a name="NewEventCache">func</a> [NewEventCache](/src/target/event_cache.go?s=239:284#L11) | |||||
``` go | |||||
func NewEventCache(evsw Fireable) *EventCache | |||||
``` | |||||
Create a new EventCache with an EventSwitch as backend | |||||
### <a name="EventCache.FireEvent">func</a> (\*EventCache) [FireEvent](/src/target/event_cache.go?s=449:511#L24) | |||||
``` go | |||||
func (evc *EventCache) FireEvent(event string, data EventData) | |||||
``` | |||||
Cache an event to be fired upon finality. | |||||
### <a name="EventCache.Flush">func</a> (\*EventCache) [Flush](/src/target/event_cache.go?s=735:765#L31) | |||||
``` go | |||||
func (evc *EventCache) Flush() | |||||
``` | |||||
Fire events by running evsw.FireEvent on all cached events. Blocks. | |||||
Clears cached events | |||||
## <a name="EventCallback">type</a> [EventCallback](/src/target/events.go?s=4201:4240#L185) | |||||
``` go | |||||
type EventCallback func(data EventData) | |||||
``` | |||||
## <a name="EventData">type</a> [EventData](/src/target/events.go?s=243:294#L14) | |||||
``` go | |||||
type EventData interface { | |||||
} | |||||
``` | |||||
Generic event data can be typed and registered with tendermint/go-amino | |||||
via concrete implementation of this interface | |||||
## <a name="EventSwitch">type</a> [EventSwitch](/src/target/events.go?s=560:771#L29) | |||||
``` go | |||||
type EventSwitch interface { | |||||
cmn.Service | |||||
Fireable | |||||
AddListenerForEvent(listenerID, event string, cb EventCallback) | |||||
RemoveListenerForEvent(event string, listenerID string) | |||||
RemoveListener(listenerID string) | |||||
} | |||||
``` | |||||
### <a name="NewEventSwitch">func</a> [NewEventSwitch](/src/target/events.go?s=917:950#L46) | |||||
``` go | |||||
func NewEventSwitch() EventSwitch | |||||
``` | |||||
## <a name="Eventable">type</a> [Eventable](/src/target/events.go?s=378:440#L20) | |||||
``` go | |||||
type Eventable interface { | |||||
SetEventSwitch(evsw EventSwitch) | |||||
} | |||||
``` | |||||
reactors and other modules should export | |||||
this interface to become eventable | |||||
## <a name="Fireable">type</a> [Fireable](/src/target/events.go?s=490:558#L25) | |||||
``` go | |||||
type Fireable interface { | |||||
FireEvent(event string, data EventData) | |||||
} | |||||
``` | |||||
an event switch or cache implements fireable | |||||
- - - | |||||
Generated by [godoc2md](http://godoc.org/github.com/davecheney/godoc2md) |
@ -0,0 +1,37 @@ | |||||
package events | |||||
// An EventCache buffers events for a Fireable | |||||
// All events are cached. Filtering happens on Flush | |||||
type EventCache struct { | |||||
evsw Fireable | |||||
events []eventInfo | |||||
} | |||||
// Create a new EventCache with an EventSwitch as backend | |||||
func NewEventCache(evsw Fireable) *EventCache { | |||||
return &EventCache{ | |||||
evsw: evsw, | |||||
} | |||||
} | |||||
// a cached event | |||||
type eventInfo struct { | |||||
event string | |||||
data EventData | |||||
} | |||||
// Cache an event to be fired upon finality. | |||||
func (evc *EventCache) FireEvent(event string, data EventData) { | |||||
// append to list (go will grow our backing array exponentially) | |||||
evc.events = append(evc.events, eventInfo{event, data}) | |||||
} | |||||
// Fire events by running evsw.FireEvent on all cached events. Blocks. | |||||
// Clears cached events | |||||
func (evc *EventCache) Flush() { | |||||
for _, ei := range evc.events { | |||||
evc.evsw.FireEvent(ei.event, ei.data) | |||||
} | |||||
// Clear the buffer, since we only add to it with append it's safe to just set it to nil and maybe safe an allocation | |||||
evc.events = nil | |||||
} |
@ -0,0 +1,35 @@ | |||||
package events | |||||
import ( | |||||
"testing" | |||||
"github.com/stretchr/testify/assert" | |||||
"github.com/stretchr/testify/require" | |||||
) | |||||
func TestEventCache_Flush(t *testing.T) { | |||||
evsw := NewEventSwitch() | |||||
evsw.Start() | |||||
evsw.AddListenerForEvent("nothingness", "", func(data EventData) { | |||||
// Check we are not initialising an empty buffer full of zeroed eventInfos in the EventCache | |||||
require.FailNow(t, "We should never receive a message on this switch since none are fired") | |||||
}) | |||||
evc := NewEventCache(evsw) | |||||
evc.Flush() | |||||
// Check after reset | |||||
evc.Flush() | |||||
fail := true | |||||
pass := false | |||||
evsw.AddListenerForEvent("somethingness", "something", func(data EventData) { | |||||
if fail { | |||||
require.FailNow(t, "Shouldn't see a message until flushed") | |||||
} | |||||
pass = true | |||||
}) | |||||
evc.FireEvent("something", struct{ int }{1}) | |||||
evc.FireEvent("something", struct{ int }{2}) | |||||
evc.FireEvent("something", struct{ int }{3}) | |||||
fail = false | |||||
evc.Flush() | |||||
assert.True(t, pass) | |||||
} |
@ -0,0 +1,220 @@ | |||||
/* | |||||
Pub-Sub in go with event caching | |||||
*/ | |||||
package events | |||||
import ( | |||||
"sync" | |||||
cmn "github.com/tendermint/tmlibs/common" | |||||
) | |||||
// Generic event data can be typed and registered with tendermint/go-amino | |||||
// via concrete implementation of this interface | |||||
type EventData interface { | |||||
//AssertIsEventData() | |||||
} | |||||
// reactors and other modules should export | |||||
// this interface to become eventable | |||||
type Eventable interface { | |||||
SetEventSwitch(evsw EventSwitch) | |||||
} | |||||
// an event switch or cache implements fireable | |||||
type Fireable interface { | |||||
FireEvent(event string, data EventData) | |||||
} | |||||
type EventSwitch interface { | |||||
cmn.Service | |||||
Fireable | |||||
AddListenerForEvent(listenerID, event string, cb EventCallback) | |||||
RemoveListenerForEvent(event string, listenerID string) | |||||
RemoveListener(listenerID string) | |||||
} | |||||
type eventSwitch struct { | |||||
cmn.BaseService | |||||
mtx sync.RWMutex | |||||
eventCells map[string]*eventCell | |||||
listeners map[string]*eventListener | |||||
} | |||||
func NewEventSwitch() EventSwitch { | |||||
evsw := &eventSwitch{ | |||||
eventCells: make(map[string]*eventCell), | |||||
listeners: make(map[string]*eventListener), | |||||
} | |||||
evsw.BaseService = *cmn.NewBaseService(nil, "EventSwitch", evsw) | |||||
return evsw | |||||
} | |||||
func (evsw *eventSwitch) OnStart() error { | |||||
return nil | |||||
} | |||||
func (evsw *eventSwitch) OnStop() {} | |||||
func (evsw *eventSwitch) AddListenerForEvent(listenerID, event string, cb EventCallback) { | |||||
// Get/Create eventCell and listener | |||||
evsw.mtx.Lock() | |||||
eventCell := evsw.eventCells[event] | |||||
if eventCell == nil { | |||||
eventCell = newEventCell() | |||||
evsw.eventCells[event] = eventCell | |||||
} | |||||
listener := evsw.listeners[listenerID] | |||||
if listener == nil { | |||||
listener = newEventListener(listenerID) | |||||
evsw.listeners[listenerID] = listener | |||||
} | |||||
evsw.mtx.Unlock() | |||||
// Add event and listener | |||||
eventCell.AddListener(listenerID, cb) | |||||
listener.AddEvent(event) | |||||
} | |||||
func (evsw *eventSwitch) RemoveListener(listenerID string) { | |||||
// Get and remove listener | |||||
evsw.mtx.RLock() | |||||
listener := evsw.listeners[listenerID] | |||||
evsw.mtx.RUnlock() | |||||
if listener == nil { | |||||
return | |||||
} | |||||
evsw.mtx.Lock() | |||||
delete(evsw.listeners, listenerID) | |||||
evsw.mtx.Unlock() | |||||
// Remove callback for each event. | |||||
listener.SetRemoved() | |||||
for _, event := range listener.GetEvents() { | |||||
evsw.RemoveListenerForEvent(event, listenerID) | |||||
} | |||||
} | |||||
func (evsw *eventSwitch) RemoveListenerForEvent(event string, listenerID string) { | |||||
// Get eventCell | |||||
evsw.mtx.Lock() | |||||
eventCell := evsw.eventCells[event] | |||||
evsw.mtx.Unlock() | |||||
if eventCell == nil { | |||||
return | |||||
} | |||||
// Remove listenerID from eventCell | |||||
numListeners := eventCell.RemoveListener(listenerID) | |||||
// Maybe garbage collect eventCell. | |||||
if numListeners == 0 { | |||||
// Lock again and double check. | |||||
evsw.mtx.Lock() // OUTER LOCK | |||||
eventCell.mtx.Lock() // INNER LOCK | |||||
if len(eventCell.listeners) == 0 { | |||||
delete(evsw.eventCells, event) | |||||
} | |||||
eventCell.mtx.Unlock() // INNER LOCK | |||||
evsw.mtx.Unlock() // OUTER LOCK | |||||
} | |||||
} | |||||
func (evsw *eventSwitch) FireEvent(event string, data EventData) { | |||||
// Get the eventCell | |||||
evsw.mtx.RLock() | |||||
eventCell := evsw.eventCells[event] | |||||
evsw.mtx.RUnlock() | |||||
if eventCell == nil { | |||||
return | |||||
} | |||||
// Fire event for all listeners in eventCell | |||||
eventCell.FireEvent(data) | |||||
} | |||||
//----------------------------------------------------------------------------- | |||||
// eventCell handles keeping track of listener callbacks for a given event. | |||||
type eventCell struct { | |||||
mtx sync.RWMutex | |||||
listeners map[string]EventCallback | |||||
} | |||||
func newEventCell() *eventCell { | |||||
return &eventCell{ | |||||
listeners: make(map[string]EventCallback), | |||||
} | |||||
} | |||||
func (cell *eventCell) AddListener(listenerID string, cb EventCallback) { | |||||
cell.mtx.Lock() | |||||
cell.listeners[listenerID] = cb | |||||
cell.mtx.Unlock() | |||||
} | |||||
func (cell *eventCell) RemoveListener(listenerID string) int { | |||||
cell.mtx.Lock() | |||||
delete(cell.listeners, listenerID) | |||||
numListeners := len(cell.listeners) | |||||
cell.mtx.Unlock() | |||||
return numListeners | |||||
} | |||||
func (cell *eventCell) FireEvent(data EventData) { | |||||
cell.mtx.RLock() | |||||
for _, listener := range cell.listeners { | |||||
listener(data) | |||||
} | |||||
cell.mtx.RUnlock() | |||||
} | |||||
//----------------------------------------------------------------------------- | |||||
type EventCallback func(data EventData) | |||||
type eventListener struct { | |||||
id string | |||||
mtx sync.RWMutex | |||||
removed bool | |||||
events []string | |||||
} | |||||
func newEventListener(id string) *eventListener { | |||||
return &eventListener{ | |||||
id: id, | |||||
removed: false, | |||||
events: nil, | |||||
} | |||||
} | |||||
func (evl *eventListener) AddEvent(event string) { | |||||
evl.mtx.Lock() | |||||
defer evl.mtx.Unlock() | |||||
if evl.removed { | |||||
return | |||||
} | |||||
evl.events = append(evl.events, event) | |||||
} | |||||
func (evl *eventListener) GetEvents() []string { | |||||
evl.mtx.RLock() | |||||
defer evl.mtx.RUnlock() | |||||
events := make([]string, len(evl.events)) | |||||
copy(events, evl.events) | |||||
return events | |||||
} | |||||
func (evl *eventListener) SetRemoved() { | |||||
evl.mtx.Lock() | |||||
defer evl.mtx.Unlock() | |||||
evl.removed = true | |||||
} |
@ -0,0 +1,380 @@ | |||||
package events | |||||
import ( | |||||
"fmt" | |||||
"math/rand" | |||||
"testing" | |||||
"time" | |||||
"github.com/stretchr/testify/assert" | |||||
) | |||||
// TestAddListenerForEventFireOnce sets up an EventSwitch, subscribes a single | |||||
// listener to an event, and sends a string "data". | |||||
func TestAddListenerForEventFireOnce(t *testing.T) { | |||||
evsw := NewEventSwitch() | |||||
err := evsw.Start() | |||||
if err != nil { | |||||
t.Errorf("Failed to start EventSwitch, error: %v", err) | |||||
} | |||||
messages := make(chan EventData) | |||||
evsw.AddListenerForEvent("listener", "event", | |||||
func(data EventData) { | |||||
messages <- data | |||||
}) | |||||
go evsw.FireEvent("event", "data") | |||||
received := <-messages | |||||
if received != "data" { | |||||
t.Errorf("Message received does not match: %v", received) | |||||
} | |||||
} | |||||
// TestAddListenerForEventFireMany sets up an EventSwitch, subscribes a single | |||||
// listener to an event, and sends a thousand integers. | |||||
func TestAddListenerForEventFireMany(t *testing.T) { | |||||
evsw := NewEventSwitch() | |||||
err := evsw.Start() | |||||
if err != nil { | |||||
t.Errorf("Failed to start EventSwitch, error: %v", err) | |||||
} | |||||
doneSum := make(chan uint64) | |||||
doneSending := make(chan uint64) | |||||
numbers := make(chan uint64, 4) | |||||
// subscribe one listener for one event | |||||
evsw.AddListenerForEvent("listener", "event", | |||||
func(data EventData) { | |||||
numbers <- data.(uint64) | |||||
}) | |||||
// collect received events | |||||
go sumReceivedNumbers(numbers, doneSum) | |||||
// go fire events | |||||
go fireEvents(evsw, "event", doneSending, uint64(1)) | |||||
checkSum := <-doneSending | |||||
close(numbers) | |||||
eventSum := <-doneSum | |||||
if checkSum != eventSum { | |||||
t.Errorf("Not all messages sent were received.\n") | |||||
} | |||||
} | |||||
// TestAddListenerForDifferentEvents sets up an EventSwitch, subscribes a single | |||||
// listener to three different events and sends a thousand integers for each | |||||
// of the three events. | |||||
func TestAddListenerForDifferentEvents(t *testing.T) { | |||||
evsw := NewEventSwitch() | |||||
err := evsw.Start() | |||||
if err != nil { | |||||
t.Errorf("Failed to start EventSwitch, error: %v", err) | |||||
} | |||||
doneSum := make(chan uint64) | |||||
doneSending1 := make(chan uint64) | |||||
doneSending2 := make(chan uint64) | |||||
doneSending3 := make(chan uint64) | |||||
numbers := make(chan uint64, 4) | |||||
// subscribe one listener to three events | |||||
evsw.AddListenerForEvent("listener", "event1", | |||||
func(data EventData) { | |||||
numbers <- data.(uint64) | |||||
}) | |||||
evsw.AddListenerForEvent("listener", "event2", | |||||
func(data EventData) { | |||||
numbers <- data.(uint64) | |||||
}) | |||||
evsw.AddListenerForEvent("listener", "event3", | |||||
func(data EventData) { | |||||
numbers <- data.(uint64) | |||||
}) | |||||
// collect received events | |||||
go sumReceivedNumbers(numbers, doneSum) | |||||
// go fire events | |||||
go fireEvents(evsw, "event1", doneSending1, uint64(1)) | |||||
go fireEvents(evsw, "event2", doneSending2, uint64(1)) | |||||
go fireEvents(evsw, "event3", doneSending3, uint64(1)) | |||||
var checkSum uint64 = 0 | |||||
checkSum += <-doneSending1 | |||||
checkSum += <-doneSending2 | |||||
checkSum += <-doneSending3 | |||||
close(numbers) | |||||
eventSum := <-doneSum | |||||
if checkSum != eventSum { | |||||
t.Errorf("Not all messages sent were received.\n") | |||||
} | |||||
} | |||||
// TestAddDifferentListenerForDifferentEvents sets up an EventSwitch, | |||||
// subscribes a first listener to three events, and subscribes a second | |||||
// listener to two of those three events, and then sends a thousand integers | |||||
// for each of the three events. | |||||
func TestAddDifferentListenerForDifferentEvents(t *testing.T) { | |||||
evsw := NewEventSwitch() | |||||
err := evsw.Start() | |||||
if err != nil { | |||||
t.Errorf("Failed to start EventSwitch, error: %v", err) | |||||
} | |||||
doneSum1 := make(chan uint64) | |||||
doneSum2 := make(chan uint64) | |||||
doneSending1 := make(chan uint64) | |||||
doneSending2 := make(chan uint64) | |||||
doneSending3 := make(chan uint64) | |||||
numbers1 := make(chan uint64, 4) | |||||
numbers2 := make(chan uint64, 4) | |||||
// subscribe two listener to three events | |||||
evsw.AddListenerForEvent("listener1", "event1", | |||||
func(data EventData) { | |||||
numbers1 <- data.(uint64) | |||||
}) | |||||
evsw.AddListenerForEvent("listener1", "event2", | |||||
func(data EventData) { | |||||
numbers1 <- data.(uint64) | |||||
}) | |||||
evsw.AddListenerForEvent("listener1", "event3", | |||||
func(data EventData) { | |||||
numbers1 <- data.(uint64) | |||||
}) | |||||
evsw.AddListenerForEvent("listener2", "event2", | |||||
func(data EventData) { | |||||
numbers2 <- data.(uint64) | |||||
}) | |||||
evsw.AddListenerForEvent("listener2", "event3", | |||||
func(data EventData) { | |||||
numbers2 <- data.(uint64) | |||||
}) | |||||
// collect received events for listener1 | |||||
go sumReceivedNumbers(numbers1, doneSum1) | |||||
// collect received events for listener2 | |||||
go sumReceivedNumbers(numbers2, doneSum2) | |||||
// go fire events | |||||
go fireEvents(evsw, "event1", doneSending1, uint64(1)) | |||||
go fireEvents(evsw, "event2", doneSending2, uint64(1001)) | |||||
go fireEvents(evsw, "event3", doneSending3, uint64(2001)) | |||||
checkSumEvent1 := <-doneSending1 | |||||
checkSumEvent2 := <-doneSending2 | |||||
checkSumEvent3 := <-doneSending3 | |||||
checkSum1 := checkSumEvent1 + checkSumEvent2 + checkSumEvent3 | |||||
checkSum2 := checkSumEvent2 + checkSumEvent3 | |||||
close(numbers1) | |||||
close(numbers2) | |||||
eventSum1 := <-doneSum1 | |||||
eventSum2 := <-doneSum2 | |||||
if checkSum1 != eventSum1 || | |||||
checkSum2 != eventSum2 { | |||||
t.Errorf("Not all messages sent were received for different listeners to different events.\n") | |||||
} | |||||
} | |||||
// TestAddAndRemoveListener sets up an EventSwitch, subscribes a listener to | |||||
// two events, fires a thousand integers for the first event, then unsubscribes | |||||
// the listener and fires a thousand integers for the second event. | |||||
func TestAddAndRemoveListener(t *testing.T) { | |||||
evsw := NewEventSwitch() | |||||
err := evsw.Start() | |||||
if err != nil { | |||||
t.Errorf("Failed to start EventSwitch, error: %v", err) | |||||
} | |||||
doneSum1 := make(chan uint64) | |||||
doneSum2 := make(chan uint64) | |||||
doneSending1 := make(chan uint64) | |||||
doneSending2 := make(chan uint64) | |||||
numbers1 := make(chan uint64, 4) | |||||
numbers2 := make(chan uint64, 4) | |||||
// subscribe two listener to three events | |||||
evsw.AddListenerForEvent("listener", "event1", | |||||
func(data EventData) { | |||||
numbers1 <- data.(uint64) | |||||
}) | |||||
evsw.AddListenerForEvent("listener", "event2", | |||||
func(data EventData) { | |||||
numbers2 <- data.(uint64) | |||||
}) | |||||
// collect received events for event1 | |||||
go sumReceivedNumbers(numbers1, doneSum1) | |||||
// collect received events for event2 | |||||
go sumReceivedNumbers(numbers2, doneSum2) | |||||
// go fire events | |||||
go fireEvents(evsw, "event1", doneSending1, uint64(1)) | |||||
checkSumEvent1 := <-doneSending1 | |||||
// after sending all event1, unsubscribe for all events | |||||
evsw.RemoveListener("listener") | |||||
go fireEvents(evsw, "event2", doneSending2, uint64(1001)) | |||||
checkSumEvent2 := <-doneSending2 | |||||
close(numbers1) | |||||
close(numbers2) | |||||
eventSum1 := <-doneSum1 | |||||
eventSum2 := <-doneSum2 | |||||
if checkSumEvent1 != eventSum1 || | |||||
// correct value asserted by preceding tests, suffices to be non-zero | |||||
checkSumEvent2 == uint64(0) || | |||||
eventSum2 != uint64(0) { | |||||
t.Errorf("Not all messages sent were received or unsubscription did not register.\n") | |||||
} | |||||
} | |||||
// TestRemoveListener does basic tests on adding and removing | |||||
func TestRemoveListener(t *testing.T) { | |||||
evsw := NewEventSwitch() | |||||
err := evsw.Start() | |||||
if err != nil { | |||||
t.Errorf("Failed to start EventSwitch, error: %v", err) | |||||
} | |||||
count := 10 | |||||
sum1, sum2 := 0, 0 | |||||
// add some listeners and make sure they work | |||||
evsw.AddListenerForEvent("listener", "event1", | |||||
func(data EventData) { | |||||
sum1++ | |||||
}) | |||||
evsw.AddListenerForEvent("listener", "event2", | |||||
func(data EventData) { | |||||
sum2++ | |||||
}) | |||||
for i := 0; i < count; i++ { | |||||
evsw.FireEvent("event1", true) | |||||
evsw.FireEvent("event2", true) | |||||
} | |||||
assert.Equal(t, count, sum1) | |||||
assert.Equal(t, count, sum2) | |||||
// remove one by event and make sure it is gone | |||||
evsw.RemoveListenerForEvent("event2", "listener") | |||||
for i := 0; i < count; i++ { | |||||
evsw.FireEvent("event1", true) | |||||
evsw.FireEvent("event2", true) | |||||
} | |||||
assert.Equal(t, count*2, sum1) | |||||
assert.Equal(t, count, sum2) | |||||
// remove the listener entirely and make sure both gone | |||||
evsw.RemoveListener("listener") | |||||
for i := 0; i < count; i++ { | |||||
evsw.FireEvent("event1", true) | |||||
evsw.FireEvent("event2", true) | |||||
} | |||||
assert.Equal(t, count*2, sum1) | |||||
assert.Equal(t, count, sum2) | |||||
} | |||||
// TestAddAndRemoveListenersAsync sets up an EventSwitch, subscribes two | |||||
// listeners to three events, and fires a thousand integers for each event. | |||||
// These two listeners serve as the baseline validation while other listeners | |||||
// are randomly subscribed and unsubscribed. | |||||
// More precisely it randomly subscribes new listeners (different from the first | |||||
// two listeners) to one of these three events. At the same time it starts | |||||
// randomly unsubscribing these additional listeners from all events they are | |||||
// at that point subscribed to. | |||||
// NOTE: it is important to run this test with race conditions tracking on, | |||||
// `go test -race`, to examine for possible race conditions. | |||||
func TestRemoveListenersAsync(t *testing.T) { | |||||
evsw := NewEventSwitch() | |||||
err := evsw.Start() | |||||
if err != nil { | |||||
t.Errorf("Failed to start EventSwitch, error: %v", err) | |||||
} | |||||
doneSum1 := make(chan uint64) | |||||
doneSum2 := make(chan uint64) | |||||
doneSending1 := make(chan uint64) | |||||
doneSending2 := make(chan uint64) | |||||
doneSending3 := make(chan uint64) | |||||
numbers1 := make(chan uint64, 4) | |||||
numbers2 := make(chan uint64, 4) | |||||
// subscribe two listener to three events | |||||
evsw.AddListenerForEvent("listener1", "event1", | |||||
func(data EventData) { | |||||
numbers1 <- data.(uint64) | |||||
}) | |||||
evsw.AddListenerForEvent("listener1", "event2", | |||||
func(data EventData) { | |||||
numbers1 <- data.(uint64) | |||||
}) | |||||
evsw.AddListenerForEvent("listener1", "event3", | |||||
func(data EventData) { | |||||
numbers1 <- data.(uint64) | |||||
}) | |||||
evsw.AddListenerForEvent("listener2", "event1", | |||||
func(data EventData) { | |||||
numbers2 <- data.(uint64) | |||||
}) | |||||
evsw.AddListenerForEvent("listener2", "event2", | |||||
func(data EventData) { | |||||
numbers2 <- data.(uint64) | |||||
}) | |||||
evsw.AddListenerForEvent("listener2", "event3", | |||||
func(data EventData) { | |||||
numbers2 <- data.(uint64) | |||||
}) | |||||
// collect received events for event1 | |||||
go sumReceivedNumbers(numbers1, doneSum1) | |||||
// collect received events for event2 | |||||
go sumReceivedNumbers(numbers2, doneSum2) | |||||
addListenersStress := func() { | |||||
s1 := rand.NewSource(time.Now().UnixNano()) | |||||
r1 := rand.New(s1) | |||||
for k := uint16(0); k < 400; k++ { | |||||
listenerNumber := r1.Intn(100) + 3 | |||||
eventNumber := r1.Intn(3) + 1 | |||||
go evsw.AddListenerForEvent(fmt.Sprintf("listener%v", listenerNumber), | |||||
fmt.Sprintf("event%v", eventNumber), | |||||
func(_ EventData) {}) | |||||
} | |||||
} | |||||
removeListenersStress := func() { | |||||
s2 := rand.NewSource(time.Now().UnixNano()) | |||||
r2 := rand.New(s2) | |||||
for k := uint16(0); k < 80; k++ { | |||||
listenerNumber := r2.Intn(100) + 3 | |||||
go evsw.RemoveListener(fmt.Sprintf("listener%v", listenerNumber)) | |||||
} | |||||
} | |||||
addListenersStress() | |||||
// go fire events | |||||
go fireEvents(evsw, "event1", doneSending1, uint64(1)) | |||||
removeListenersStress() | |||||
go fireEvents(evsw, "event2", doneSending2, uint64(1001)) | |||||
go fireEvents(evsw, "event3", doneSending3, uint64(2001)) | |||||
checkSumEvent1 := <-doneSending1 | |||||
checkSumEvent2 := <-doneSending2 | |||||
checkSumEvent3 := <-doneSending3 | |||||
checkSum := checkSumEvent1 + checkSumEvent2 + checkSumEvent3 | |||||
close(numbers1) | |||||
close(numbers2) | |||||
eventSum1 := <-doneSum1 | |||||
eventSum2 := <-doneSum2 | |||||
if checkSum != eventSum1 || | |||||
checkSum != eventSum2 { | |||||
t.Errorf("Not all messages sent were received.\n") | |||||
} | |||||
} | |||||
//------------------------------------------------------------------------------ | |||||
// Helper functions | |||||
// sumReceivedNumbers takes two channels and adds all numbers received | |||||
// until the receiving channel `numbers` is closed; it then sends the sum | |||||
// on `doneSum` and closes that channel. Expected to be run in a go-routine. | |||||
func sumReceivedNumbers(numbers, doneSum chan uint64) { | |||||
var sum uint64 = 0 | |||||
for { | |||||
j, more := <-numbers | |||||
sum += j | |||||
if !more { | |||||
doneSum <- sum | |||||
close(doneSum) | |||||
return | |||||
} | |||||
} | |||||
} | |||||
// fireEvents takes an EventSwitch and fires a thousand integers under | |||||
// a given `event` with the integers mootonically increasing from `offset` | |||||
// to `offset` + 999. It additionally returns the addition of all integers | |||||
// sent on `doneChan` for assertion that all events have been sent, and enabling | |||||
// the test to assert all events have also been received. | |||||
func fireEvents(evsw EventSwitch, event string, doneChan chan uint64, | |||||
offset uint64) { | |||||
var sentSum uint64 = 0 | |||||
for i := offset; i <= offset+uint64(999); i++ { | |||||
sentSum += i | |||||
evsw.FireEvent(event, i) | |||||
} | |||||
doneChan <- sentSum | |||||
close(doneChan) | |||||
} |
@ -0,0 +1,28 @@ | |||||
package pubsub_test | |||||
import ( | |||||
"context" | |||||
"testing" | |||||
"github.com/stretchr/testify/require" | |||||
"github.com/tendermint/tmlibs/log" | |||||
"github.com/tendermint/tendermint/libs/pubsub" | |||||
"github.com/tendermint/tendermint/libs/pubsub/query" | |||||
) | |||||
func TestExample(t *testing.T) { | |||||
s := pubsub.NewServer() | |||||
s.SetLogger(log.TestingLogger()) | |||||
s.Start() | |||||
defer s.Stop() | |||||
ctx := context.Background() | |||||
ch := make(chan interface{}, 1) | |||||
err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), ch) | |||||
require.NoError(t, err) | |||||
err = s.PublishWithTags(ctx, "Tombstone", pubsub.NewTagMap(map[string]interface{}{"abci.account.name": "John"})) | |||||
require.NoError(t, err) | |||||
assertReceive(t, "Tombstone", ch) | |||||
} |
@ -0,0 +1,342 @@ | |||||
// Package pubsub implements a pub-sub model with a single publisher (Server) | |||||
// and multiple subscribers (clients). | |||||
// | |||||
// Though you can have multiple publishers by sharing a pointer to a server or | |||||
// by giving the same channel to each publisher and publishing messages from | |||||
// that channel (fan-in). | |||||
// | |||||
// Clients subscribe for messages, which could be of any type, using a query. | |||||
// When some message is published, we match it with all queries. If there is a | |||||
// match, this message will be pushed to all clients, subscribed to that query. | |||||
// See query subpackage for our implementation. | |||||
package pubsub | |||||
import ( | |||||
"context" | |||||
"errors" | |||||
"sync" | |||||
cmn "github.com/tendermint/tmlibs/common" | |||||
) | |||||
type operation int | |||||
const ( | |||||
sub operation = iota | |||||
pub | |||||
unsub | |||||
shutdown | |||||
) | |||||
var ( | |||||
// ErrSubscriptionNotFound is returned when a client tries to unsubscribe | |||||
// from not existing subscription. | |||||
ErrSubscriptionNotFound = errors.New("subscription not found") | |||||
// ErrAlreadySubscribed is returned when a client tries to subscribe twice or | |||||
// more using the same query. | |||||
ErrAlreadySubscribed = errors.New("already subscribed") | |||||
) | |||||
// TagMap is used to associate tags to a message. | |||||
// They can be queried by subscribers to choose messages they will received. | |||||
type TagMap interface { | |||||
// Get returns the value for a key, or nil if no value is present. | |||||
// The ok result indicates whether value was found in the tags. | |||||
Get(key string) (value interface{}, ok bool) | |||||
// Len returns the number of tags. | |||||
Len() int | |||||
} | |||||
type tagMap map[string]interface{} | |||||
type cmd struct { | |||||
op operation | |||||
query Query | |||||
ch chan<- interface{} | |||||
clientID string | |||||
msg interface{} | |||||
tags TagMap | |||||
} | |||||
// Query defines an interface for a query to be used for subscribing. | |||||
type Query interface { | |||||
Matches(tags TagMap) bool | |||||
String() string | |||||
} | |||||
// Server allows clients to subscribe/unsubscribe for messages, publishing | |||||
// messages with or without tags, and manages internal state. | |||||
type Server struct { | |||||
cmn.BaseService | |||||
cmds chan cmd | |||||
cmdsCap int | |||||
mtx sync.RWMutex | |||||
subscriptions map[string]map[string]Query // subscriber -> query (string) -> Query | |||||
} | |||||
// Option sets a parameter for the server. | |||||
type Option func(*Server) | |||||
// NewTagMap constructs a new immutable tag set from a map. | |||||
func NewTagMap(data map[string]interface{}) TagMap { | |||||
return tagMap(data) | |||||
} | |||||
// Get returns the value for a key, or nil if no value is present. | |||||
// The ok result indicates whether value was found in the tags. | |||||
func (ts tagMap) Get(key string) (value interface{}, ok bool) { | |||||
value, ok = ts[key] | |||||
return | |||||
} | |||||
// Len returns the number of tags. | |||||
func (ts tagMap) Len() int { | |||||
return len(ts) | |||||
} | |||||
// NewServer returns a new server. See the commentary on the Option functions | |||||
// for a detailed description of how to configure buffering. If no options are | |||||
// provided, the resulting server's queue is unbuffered. | |||||
func NewServer(options ...Option) *Server { | |||||
s := &Server{ | |||||
subscriptions: make(map[string]map[string]Query), | |||||
} | |||||
s.BaseService = *cmn.NewBaseService(nil, "PubSub", s) | |||||
for _, option := range options { | |||||
option(s) | |||||
} | |||||
// if BufferCapacity option was not set, the channel is unbuffered | |||||
s.cmds = make(chan cmd, s.cmdsCap) | |||||
return s | |||||
} | |||||
// BufferCapacity allows you to specify capacity for the internal server's | |||||
// queue. Since the server, given Y subscribers, could only process X messages, | |||||
// this option could be used to survive spikes (e.g. high amount of | |||||
// transactions during peak hours). | |||||
func BufferCapacity(cap int) Option { | |||||
return func(s *Server) { | |||||
if cap > 0 { | |||||
s.cmdsCap = cap | |||||
} | |||||
} | |||||
} | |||||
// BufferCapacity returns capacity of the internal server's queue. | |||||
func (s *Server) BufferCapacity() int { | |||||
return s.cmdsCap | |||||
} | |||||
// Subscribe creates a subscription for the given client. It accepts a channel | |||||
// on which messages matching the given query can be received. An error will be | |||||
// returned to the caller if the context is canceled or if subscription already | |||||
// exist for pair clientID and query. | |||||
func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error { | |||||
s.mtx.RLock() | |||||
clientSubscriptions, ok := s.subscriptions[clientID] | |||||
if ok { | |||||
_, ok = clientSubscriptions[query.String()] | |||||
} | |||||
s.mtx.RUnlock() | |||||
if ok { | |||||
return ErrAlreadySubscribed | |||||
} | |||||
select { | |||||
case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}: | |||||
s.mtx.Lock() | |||||
if _, ok = s.subscriptions[clientID]; !ok { | |||||
s.subscriptions[clientID] = make(map[string]Query) | |||||
} | |||||
s.subscriptions[clientID][query.String()] = query | |||||
s.mtx.Unlock() | |||||
return nil | |||||
case <-ctx.Done(): | |||||
return ctx.Err() | |||||
} | |||||
} | |||||
// Unsubscribe removes the subscription on the given query. An error will be | |||||
// returned to the caller if the context is canceled or if subscription does | |||||
// not exist. | |||||
func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error { | |||||
var origQuery Query | |||||
s.mtx.RLock() | |||||
clientSubscriptions, ok := s.subscriptions[clientID] | |||||
if ok { | |||||
origQuery, ok = clientSubscriptions[query.String()] | |||||
} | |||||
s.mtx.RUnlock() | |||||
if !ok { | |||||
return ErrSubscriptionNotFound | |||||
} | |||||
// original query is used here because we're using pointers as map keys | |||||
select { | |||||
case s.cmds <- cmd{op: unsub, clientID: clientID, query: origQuery}: | |||||
s.mtx.Lock() | |||||
delete(clientSubscriptions, query.String()) | |||||
s.mtx.Unlock() | |||||
return nil | |||||
case <-ctx.Done(): | |||||
return ctx.Err() | |||||
} | |||||
} | |||||
// UnsubscribeAll removes all client subscriptions. An error will be returned | |||||
// to the caller if the context is canceled or if subscription does not exist. | |||||
func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error { | |||||
s.mtx.RLock() | |||||
_, ok := s.subscriptions[clientID] | |||||
s.mtx.RUnlock() | |||||
if !ok { | |||||
return ErrSubscriptionNotFound | |||||
} | |||||
select { | |||||
case s.cmds <- cmd{op: unsub, clientID: clientID}: | |||||
s.mtx.Lock() | |||||
delete(s.subscriptions, clientID) | |||||
s.mtx.Unlock() | |||||
return nil | |||||
case <-ctx.Done(): | |||||
return ctx.Err() | |||||
} | |||||
} | |||||
// Publish publishes the given message. An error will be returned to the caller | |||||
// if the context is canceled. | |||||
func (s *Server) Publish(ctx context.Context, msg interface{}) error { | |||||
return s.PublishWithTags(ctx, msg, NewTagMap(make(map[string]interface{}))) | |||||
} | |||||
// PublishWithTags publishes the given message with the set of tags. The set is | |||||
// matched with clients queries. If there is a match, the message is sent to | |||||
// the client. | |||||
func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags TagMap) error { | |||||
select { | |||||
case s.cmds <- cmd{op: pub, msg: msg, tags: tags}: | |||||
return nil | |||||
case <-ctx.Done(): | |||||
return ctx.Err() | |||||
} | |||||
} | |||||
// OnStop implements Service.OnStop by shutting down the server. | |||||
func (s *Server) OnStop() { | |||||
s.cmds <- cmd{op: shutdown} | |||||
} | |||||
// NOTE: not goroutine safe | |||||
type state struct { | |||||
// query -> client -> ch | |||||
queries map[Query]map[string]chan<- interface{} | |||||
// client -> query -> struct{} | |||||
clients map[string]map[Query]struct{} | |||||
} | |||||
// OnStart implements Service.OnStart by starting the server. | |||||
func (s *Server) OnStart() error { | |||||
go s.loop(state{ | |||||
queries: make(map[Query]map[string]chan<- interface{}), | |||||
clients: make(map[string]map[Query]struct{}), | |||||
}) | |||||
return nil | |||||
} | |||||
// OnReset implements Service.OnReset | |||||
func (s *Server) OnReset() error { | |||||
return nil | |||||
} | |||||
func (s *Server) loop(state state) { | |||||
loop: | |||||
for cmd := range s.cmds { | |||||
switch cmd.op { | |||||
case unsub: | |||||
if cmd.query != nil { | |||||
state.remove(cmd.clientID, cmd.query) | |||||
} else { | |||||
state.removeAll(cmd.clientID) | |||||
} | |||||
case shutdown: | |||||
for clientID := range state.clients { | |||||
state.removeAll(clientID) | |||||
} | |||||
break loop | |||||
case sub: | |||||
state.add(cmd.clientID, cmd.query, cmd.ch) | |||||
case pub: | |||||
state.send(cmd.msg, cmd.tags) | |||||
} | |||||
} | |||||
} | |||||
func (state *state) add(clientID string, q Query, ch chan<- interface{}) { | |||||
// add query if needed | |||||
if _, ok := state.queries[q]; !ok { | |||||
state.queries[q] = make(map[string]chan<- interface{}) | |||||
} | |||||
// create subscription | |||||
state.queries[q][clientID] = ch | |||||
// add client if needed | |||||
if _, ok := state.clients[clientID]; !ok { | |||||
state.clients[clientID] = make(map[Query]struct{}) | |||||
} | |||||
state.clients[clientID][q] = struct{}{} | |||||
} | |||||
func (state *state) remove(clientID string, q Query) { | |||||
clientToChannelMap, ok := state.queries[q] | |||||
if !ok { | |||||
return | |||||
} | |||||
ch, ok := clientToChannelMap[clientID] | |||||
if ok { | |||||
close(ch) | |||||
delete(state.clients[clientID], q) | |||||
// if it not subscribed to anything else, remove the client | |||||
if len(state.clients[clientID]) == 0 { | |||||
delete(state.clients, clientID) | |||||
} | |||||
delete(state.queries[q], clientID) | |||||
} | |||||
} | |||||
func (state *state) removeAll(clientID string) { | |||||
queryMap, ok := state.clients[clientID] | |||||
if !ok { | |||||
return | |||||
} | |||||
for q := range queryMap { | |||||
ch := state.queries[q][clientID] | |||||
close(ch) | |||||
delete(state.queries[q], clientID) | |||||
} | |||||
delete(state.clients, clientID) | |||||
} | |||||
func (state *state) send(msg interface{}, tags TagMap) { | |||||
for q, clientToChannelMap := range state.queries { | |||||
if q.Matches(tags) { | |||||
for _, ch := range clientToChannelMap { | |||||
ch <- msg | |||||
} | |||||
} | |||||
} | |||||
} |
@ -0,0 +1,253 @@ | |||||
package pubsub_test | |||||
import ( | |||||
"context" | |||||
"fmt" | |||||
"runtime/debug" | |||||
"testing" | |||||
"time" | |||||
"github.com/stretchr/testify/assert" | |||||
"github.com/stretchr/testify/require" | |||||
"github.com/tendermint/tmlibs/log" | |||||
"github.com/tendermint/tendermint/libs/pubsub" | |||||
"github.com/tendermint/tendermint/libs/pubsub/query" | |||||
) | |||||
const ( | |||||
clientID = "test-client" | |||||
) | |||||
func TestSubscribe(t *testing.T) { | |||||
s := pubsub.NewServer() | |||||
s.SetLogger(log.TestingLogger()) | |||||
s.Start() | |||||
defer s.Stop() | |||||
ctx := context.Background() | |||||
ch := make(chan interface{}, 1) | |||||
err := s.Subscribe(ctx, clientID, query.Empty{}, ch) | |||||
require.NoError(t, err) | |||||
err = s.Publish(ctx, "Ka-Zar") | |||||
require.NoError(t, err) | |||||
assertReceive(t, "Ka-Zar", ch) | |||||
err = s.Publish(ctx, "Quicksilver") | |||||
require.NoError(t, err) | |||||
assertReceive(t, "Quicksilver", ch) | |||||
} | |||||
func TestDifferentClients(t *testing.T) { | |||||
s := pubsub.NewServer() | |||||
s.SetLogger(log.TestingLogger()) | |||||
s.Start() | |||||
defer s.Stop() | |||||
ctx := context.Background() | |||||
ch1 := make(chan interface{}, 1) | |||||
err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1) | |||||
require.NoError(t, err) | |||||
err = s.PublishWithTags(ctx, "Iceman", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"})) | |||||
require.NoError(t, err) | |||||
assertReceive(t, "Iceman", ch1) | |||||
ch2 := make(chan interface{}, 1) | |||||
err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2) | |||||
require.NoError(t, err) | |||||
err = s.PublishWithTags(ctx, "Ultimo", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})) | |||||
require.NoError(t, err) | |||||
assertReceive(t, "Ultimo", ch1) | |||||
assertReceive(t, "Ultimo", ch2) | |||||
ch3 := make(chan interface{}, 1) | |||||
err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3) | |||||
require.NoError(t, err) | |||||
err = s.PublishWithTags(ctx, "Valeria Richards", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewRoundStep"})) | |||||
require.NoError(t, err) | |||||
assert.Zero(t, len(ch3)) | |||||
} | |||||
func TestClientSubscribesTwice(t *testing.T) { | |||||
s := pubsub.NewServer() | |||||
s.SetLogger(log.TestingLogger()) | |||||
s.Start() | |||||
defer s.Stop() | |||||
ctx := context.Background() | |||||
q := query.MustParse("tm.events.type='NewBlock'") | |||||
ch1 := make(chan interface{}, 1) | |||||
err := s.Subscribe(ctx, clientID, q, ch1) | |||||
require.NoError(t, err) | |||||
err = s.PublishWithTags(ctx, "Goblin Queen", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"})) | |||||
require.NoError(t, err) | |||||
assertReceive(t, "Goblin Queen", ch1) | |||||
ch2 := make(chan interface{}, 1) | |||||
err = s.Subscribe(ctx, clientID, q, ch2) | |||||
require.Error(t, err) | |||||
err = s.PublishWithTags(ctx, "Spider-Man", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"})) | |||||
require.NoError(t, err) | |||||
assertReceive(t, "Spider-Man", ch1) | |||||
} | |||||
func TestUnsubscribe(t *testing.T) { | |||||
s := pubsub.NewServer() | |||||
s.SetLogger(log.TestingLogger()) | |||||
s.Start() | |||||
defer s.Stop() | |||||
ctx := context.Background() | |||||
ch := make(chan interface{}) | |||||
err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch) | |||||
require.NoError(t, err) | |||||
err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'")) | |||||
require.NoError(t, err) | |||||
err = s.Publish(ctx, "Nick Fury") | |||||
require.NoError(t, err) | |||||
assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe") | |||||
_, ok := <-ch | |||||
assert.False(t, ok) | |||||
} | |||||
func TestResubscribe(t *testing.T) { | |||||
s := pubsub.NewServer() | |||||
s.SetLogger(log.TestingLogger()) | |||||
s.Start() | |||||
defer s.Stop() | |||||
ctx := context.Background() | |||||
ch := make(chan interface{}) | |||||
err := s.Subscribe(ctx, clientID, query.Empty{}, ch) | |||||
require.NoError(t, err) | |||||
err = s.Unsubscribe(ctx, clientID, query.Empty{}) | |||||
require.NoError(t, err) | |||||
ch = make(chan interface{}) | |||||
err = s.Subscribe(ctx, clientID, query.Empty{}, ch) | |||||
require.NoError(t, err) | |||||
err = s.Publish(ctx, "Cable") | |||||
require.NoError(t, err) | |||||
assertReceive(t, "Cable", ch) | |||||
} | |||||
func TestUnsubscribeAll(t *testing.T) { | |||||
s := pubsub.NewServer() | |||||
s.SetLogger(log.TestingLogger()) | |||||
s.Start() | |||||
defer s.Stop() | |||||
ctx := context.Background() | |||||
ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1) | |||||
err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch1) | |||||
require.NoError(t, err) | |||||
err = s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlockHeader'"), ch2) | |||||
require.NoError(t, err) | |||||
err = s.UnsubscribeAll(ctx, clientID) | |||||
require.NoError(t, err) | |||||
err = s.Publish(ctx, "Nick Fury") | |||||
require.NoError(t, err) | |||||
assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll") | |||||
assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll") | |||||
_, ok := <-ch1 | |||||
assert.False(t, ok) | |||||
_, ok = <-ch2 | |||||
assert.False(t, ok) | |||||
} | |||||
func TestBufferCapacity(t *testing.T) { | |||||
s := pubsub.NewServer(pubsub.BufferCapacity(2)) | |||||
s.SetLogger(log.TestingLogger()) | |||||
assert.Equal(t, 2, s.BufferCapacity()) | |||||
ctx := context.Background() | |||||
err := s.Publish(ctx, "Nighthawk") | |||||
require.NoError(t, err) | |||||
err = s.Publish(ctx, "Sage") | |||||
require.NoError(t, err) | |||||
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) | |||||
defer cancel() | |||||
err = s.Publish(ctx, "Ironclad") | |||||
if assert.Error(t, err) { | |||||
assert.Equal(t, context.DeadlineExceeded, err) | |||||
} | |||||
} | |||||
func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) } | |||||
func Benchmark100Clients(b *testing.B) { benchmarkNClients(100, b) } | |||||
func Benchmark1000Clients(b *testing.B) { benchmarkNClients(1000, b) } | |||||
func Benchmark10ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(10, b) } | |||||
func Benchmark100ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(100, b) } | |||||
func Benchmark1000ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(1000, b) } | |||||
func benchmarkNClients(n int, b *testing.B) { | |||||
s := pubsub.NewServer() | |||||
s.Start() | |||||
defer s.Stop() | |||||
ctx := context.Background() | |||||
for i := 0; i < n; i++ { | |||||
ch := make(chan interface{}) | |||||
go func() { | |||||
for range ch { | |||||
} | |||||
}() | |||||
s.Subscribe(ctx, clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = %d", i)), ch) | |||||
} | |||||
b.ReportAllocs() | |||||
b.ResetTimer() | |||||
for i := 0; i < b.N; i++ { | |||||
s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i})) | |||||
} | |||||
} | |||||
func benchmarkNClientsOneQuery(n int, b *testing.B) { | |||||
s := pubsub.NewServer() | |||||
s.Start() | |||||
defer s.Stop() | |||||
ctx := context.Background() | |||||
q := query.MustParse("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = 1") | |||||
for i := 0; i < n; i++ { | |||||
ch := make(chan interface{}) | |||||
go func() { | |||||
for range ch { | |||||
} | |||||
}() | |||||
s.Subscribe(ctx, clientID, q, ch) | |||||
} | |||||
b.ReportAllocs() | |||||
b.ResetTimer() | |||||
for i := 0; i < b.N; i++ { | |||||
s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1})) | |||||
} | |||||
} | |||||
/////////////////////////////////////////////////////////////////////////////// | |||||
/// HELPERS | |||||
/////////////////////////////////////////////////////////////////////////////// | |||||
func assertReceive(t *testing.T, expected interface{}, ch <-chan interface{}, msgAndArgs ...interface{}) { | |||||
select { | |||||
case actual := <-ch: | |||||
if actual != nil { | |||||
assert.Equal(t, expected, actual, msgAndArgs...) | |||||
} | |||||
case <-time.After(1 * time.Second): | |||||
t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected) | |||||
debug.PrintStack() | |||||
} | |||||
} |
@ -0,0 +1,11 @@ | |||||
gen_query_parser: | |||||
@go get github.com/pointlander/peg | |||||
peg -inline -switch query.peg | |||||
fuzzy_test: | |||||
@go get github.com/dvyukov/go-fuzz/go-fuzz | |||||
@go get github.com/dvyukov/go-fuzz/go-fuzz-build | |||||
go-fuzz-build github.com/tendermint/tendermint/libs/pubsub/query/fuzz_test | |||||
go-fuzz -bin=./fuzz_test-fuzz.zip -workdir=./fuzz_test/output | |||||
.PHONY: gen_query_parser fuzzy_test |
@ -0,0 +1,16 @@ | |||||
package query | |||||
import "github.com/tendermint/tendermint/libs/pubsub" | |||||
// Empty query matches any set of tags. | |||||
type Empty struct { | |||||
} | |||||
// Matches always returns true. | |||||
func (Empty) Matches(tags pubsub.TagMap) bool { | |||||
return true | |||||
} | |||||
func (Empty) String() string { | |||||
return "empty" | |||||
} |
@ -0,0 +1,18 @@ | |||||
package query_test | |||||
import ( | |||||
"testing" | |||||
"github.com/stretchr/testify/assert" | |||||
"github.com/tendermint/tendermint/libs/pubsub" | |||||
"github.com/tendermint/tendermint/libs/pubsub/query" | |||||
) | |||||
func TestEmptyQueryMatchesAnything(t *testing.T) { | |||||
q := query.Empty{} | |||||
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{}))) | |||||
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Asher": "Roth"}))) | |||||
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Route": 66}))) | |||||
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Route": 66, "Billy": "Blue"}))) | |||||
} |
@ -0,0 +1,30 @@ | |||||
package fuzz_test | |||||
import ( | |||||
"fmt" | |||||
"github.com/tendermint/tendermint/libs/pubsub/query" | |||||
) | |||||
func Fuzz(data []byte) int { | |||||
sdata := string(data) | |||||
q0, err := query.New(sdata) | |||||
if err != nil { | |||||
return 0 | |||||
} | |||||
sdata1 := q0.String() | |||||
q1, err := query.New(sdata1) | |||||
if err != nil { | |||||
panic(err) | |||||
} | |||||
sdata2 := q1.String() | |||||
if sdata1 != sdata2 { | |||||
fmt.Printf("q0: %q\n", sdata1) | |||||
fmt.Printf("q1: %q\n", sdata2) | |||||
panic("query changed") | |||||
} | |||||
return 1 | |||||
} |
@ -0,0 +1,92 @@ | |||||
package query_test | |||||
import ( | |||||
"testing" | |||||
"github.com/stretchr/testify/assert" | |||||
"github.com/tendermint/tendermint/libs/pubsub/query" | |||||
) | |||||
// TODO: fuzzy testing? | |||||
func TestParser(t *testing.T) { | |||||
cases := []struct { | |||||
query string | |||||
valid bool | |||||
}{ | |||||
{"tm.events.type='NewBlock'", true}, | |||||
{"tm.events.type = 'NewBlock'", true}, | |||||
{"tm.events.name = ''", true}, | |||||
{"tm.events.type='TIME'", true}, | |||||
{"tm.events.type='DATE'", true}, | |||||
{"tm.events.type='='", true}, | |||||
{"tm.events.type='TIME", false}, | |||||
{"tm.events.type=TIME'", false}, | |||||
{"tm.events.type==", false}, | |||||
{"tm.events.type=NewBlock", false}, | |||||
{">==", false}, | |||||
{"tm.events.type 'NewBlock' =", false}, | |||||
{"tm.events.type>'NewBlock'", false}, | |||||
{"", false}, | |||||
{"=", false}, | |||||
{"='NewBlock'", false}, | |||||
{"tm.events.type=", false}, | |||||
{"tm.events.typeNewBlock", false}, | |||||
{"tm.events.type'NewBlock'", false}, | |||||
{"'NewBlock'", false}, | |||||
{"NewBlock", false}, | |||||
{"", false}, | |||||
{"tm.events.type='NewBlock' AND abci.account.name='Igor'", true}, | |||||
{"tm.events.type='NewBlock' AND", false}, | |||||
{"tm.events.type='NewBlock' AN", false}, | |||||
{"tm.events.type='NewBlock' AN tm.events.type='NewBlockHeader'", false}, | |||||
{"AND tm.events.type='NewBlock' ", false}, | |||||
{"abci.account.name CONTAINS 'Igor'", true}, | |||||
{"tx.date > DATE 2013-05-03", true}, | |||||
{"tx.date < DATE 2013-05-03", true}, | |||||
{"tx.date <= DATE 2013-05-03", true}, | |||||
{"tx.date >= DATE 2013-05-03", true}, | |||||
{"tx.date >= DAT 2013-05-03", false}, | |||||
{"tx.date <= DATE2013-05-03", false}, | |||||
{"tx.date <= DATE -05-03", false}, | |||||
{"tx.date >= DATE 20130503", false}, | |||||
{"tx.date >= DATE 2013+01-03", false}, | |||||
// incorrect year, month, day | |||||
{"tx.date >= DATE 0013-01-03", false}, | |||||
{"tx.date >= DATE 2013-31-03", false}, | |||||
{"tx.date >= DATE 2013-01-83", false}, | |||||
{"tx.date > TIME 2013-05-03T14:45:00+07:00", true}, | |||||
{"tx.date < TIME 2013-05-03T14:45:00-02:00", true}, | |||||
{"tx.date <= TIME 2013-05-03T14:45:00Z", true}, | |||||
{"tx.date >= TIME 2013-05-03T14:45:00Z", true}, | |||||
{"tx.date >= TIME2013-05-03T14:45:00Z", false}, | |||||
{"tx.date = IME 2013-05-03T14:45:00Z", false}, | |||||
{"tx.date = TIME 2013-05-:45:00Z", false}, | |||||
{"tx.date >= TIME 2013-05-03T14:45:00", false}, | |||||
{"tx.date >= TIME 0013-00-00T14:45:00Z", false}, | |||||
{"tx.date >= TIME 2013+05=03T14:45:00Z", false}, | |||||
{"account.balance=100", true}, | |||||
{"account.balance >= 200", true}, | |||||
{"account.balance >= -300", false}, | |||||
{"account.balance >>= 400", false}, | |||||
{"account.balance=33.22.1", false}, | |||||
{"hash='136E18F7E4C348B780CF873A0BF43922E5BAFA63'", true}, | |||||
{"hash=136E18F7E4C348B780CF873A0BF43922E5BAFA63", false}, | |||||
} | |||||
for _, c := range cases { | |||||
_, err := query.New(c.query) | |||||
if c.valid { | |||||
assert.NoErrorf(t, err, "Query was '%s'", c.query) | |||||
} else { | |||||
assert.Errorf(t, err, "Query was '%s'", c.query) | |||||
} | |||||
} | |||||
} |
@ -0,0 +1,345 @@ | |||||
// Package query provides a parser for a custom query format: | |||||
// | |||||
// abci.invoice.number=22 AND abci.invoice.owner=Ivan | |||||
// | |||||
// See query.peg for the grammar, which is a https://en.wikipedia.org/wiki/Parsing_expression_grammar. | |||||
// More: https://github.com/PhilippeSigaud/Pegged/wiki/PEG-Basics | |||||
// | |||||
// It has a support for numbers (integer and floating point), dates and times. | |||||
package query | |||||
import ( | |||||
"fmt" | |||||
"reflect" | |||||
"strconv" | |||||
"strings" | |||||
"time" | |||||
"github.com/tendermint/tendermint/libs/pubsub" | |||||
) | |||||
// Query holds the query string and the query parser. | |||||
type Query struct { | |||||
str string | |||||
parser *QueryParser | |||||
} | |||||
// Condition represents a single condition within a query and consists of tag | |||||
// (e.g. "tx.gas"), operator (e.g. "=") and operand (e.g. "7"). | |||||
type Condition struct { | |||||
Tag string | |||||
Op Operator | |||||
Operand interface{} | |||||
} | |||||
// New parses the given string and returns a query or error if the string is | |||||
// invalid. | |||||
func New(s string) (*Query, error) { | |||||
p := &QueryParser{Buffer: fmt.Sprintf(`"%s"`, s)} | |||||
p.Init() | |||||
if err := p.Parse(); err != nil { | |||||
return nil, err | |||||
} | |||||
return &Query{str: s, parser: p}, nil | |||||
} | |||||
// MustParse turns the given string into a query or panics; for tests or others | |||||
// cases where you know the string is valid. | |||||
func MustParse(s string) *Query { | |||||
q, err := New(s) | |||||
if err != nil { | |||||
panic(fmt.Sprintf("failed to parse %s: %v", s, err)) | |||||
} | |||||
return q | |||||
} | |||||
// String returns the original string. | |||||
func (q *Query) String() string { | |||||
return q.str | |||||
} | |||||
// Operator is an operator that defines some kind of relation between tag and | |||||
// operand (equality, etc.). | |||||
type Operator uint8 | |||||
const ( | |||||
// "<=" | |||||
OpLessEqual Operator = iota | |||||
// ">=" | |||||
OpGreaterEqual | |||||
// "<" | |||||
OpLess | |||||
// ">" | |||||
OpGreater | |||||
// "=" | |||||
OpEqual | |||||
// "CONTAINS"; used to check if a string contains a certain sub string. | |||||
OpContains | |||||
) | |||||
// Conditions returns a list of conditions. | |||||
func (q *Query) Conditions() []Condition { | |||||
conditions := make([]Condition, 0) | |||||
buffer, begin, end := q.parser.Buffer, 0, 0 | |||||
var tag string | |||||
var op Operator | |||||
// tokens must be in the following order: tag ("tx.gas") -> operator ("=") -> operand ("7") | |||||
for _, token := range q.parser.Tokens() { | |||||
switch token.pegRule { | |||||
case rulePegText: | |||||
begin, end = int(token.begin), int(token.end) | |||||
case ruletag: | |||||
tag = buffer[begin:end] | |||||
case rulele: | |||||
op = OpLessEqual | |||||
case rulege: | |||||
op = OpGreaterEqual | |||||
case rulel: | |||||
op = OpLess | |||||
case ruleg: | |||||
op = OpGreater | |||||
case ruleequal: | |||||
op = OpEqual | |||||
case rulecontains: | |||||
op = OpContains | |||||
case rulevalue: | |||||
// strip single quotes from value (i.e. "'NewBlock'" -> "NewBlock") | |||||
valueWithoutSingleQuotes := buffer[begin+1 : end-1] | |||||
conditions = append(conditions, Condition{tag, op, valueWithoutSingleQuotes}) | |||||
case rulenumber: | |||||
number := buffer[begin:end] | |||||
if strings.Contains(number, ".") { // if it looks like a floating-point number | |||||
value, err := strconv.ParseFloat(number, 64) | |||||
if err != nil { | |||||
panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number)) | |||||
} | |||||
conditions = append(conditions, Condition{tag, op, value}) | |||||
} else { | |||||
value, err := strconv.ParseInt(number, 10, 64) | |||||
if err != nil { | |||||
panic(fmt.Sprintf("got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", err, number)) | |||||
} | |||||
conditions = append(conditions, Condition{tag, op, value}) | |||||
} | |||||
case ruletime: | |||||
value, err := time.Parse(time.RFC3339, buffer[begin:end]) | |||||
if err != nil { | |||||
panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end])) | |||||
} | |||||
conditions = append(conditions, Condition{tag, op, value}) | |||||
case ruledate: | |||||
value, err := time.Parse("2006-01-02", buffer[begin:end]) | |||||
if err != nil { | |||||
panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", err, buffer[begin:end])) | |||||
} | |||||
conditions = append(conditions, Condition{tag, op, value}) | |||||
} | |||||
} | |||||
return conditions | |||||
} | |||||
// Matches returns true if the query matches the given set of tags, false otherwise. | |||||
// | |||||
// For example, query "name=John" matches tags = {"name": "John"}. More | |||||
// examples could be found in parser_test.go and query_test.go. | |||||
func (q *Query) Matches(tags pubsub.TagMap) bool { | |||||
if tags.Len() == 0 { | |||||
return false | |||||
} | |||||
buffer, begin, end := q.parser.Buffer, 0, 0 | |||||
var tag string | |||||
var op Operator | |||||
// tokens must be in the following order: tag ("tx.gas") -> operator ("=") -> operand ("7") | |||||
for _, token := range q.parser.Tokens() { | |||||
switch token.pegRule { | |||||
case rulePegText: | |||||
begin, end = int(token.begin), int(token.end) | |||||
case ruletag: | |||||
tag = buffer[begin:end] | |||||
case rulele: | |||||
op = OpLessEqual | |||||
case rulege: | |||||
op = OpGreaterEqual | |||||
case rulel: | |||||
op = OpLess | |||||
case ruleg: | |||||
op = OpGreater | |||||
case ruleequal: | |||||
op = OpEqual | |||||
case rulecontains: | |||||
op = OpContains | |||||
case rulevalue: | |||||
// strip single quotes from value (i.e. "'NewBlock'" -> "NewBlock") | |||||
valueWithoutSingleQuotes := buffer[begin+1 : end-1] | |||||
// see if the triplet (tag, operator, operand) matches any tag | |||||
// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" } | |||||
if !match(tag, op, reflect.ValueOf(valueWithoutSingleQuotes), tags) { | |||||
return false | |||||
} | |||||
case rulenumber: | |||||
number := buffer[begin:end] | |||||
if strings.Contains(number, ".") { // if it looks like a floating-point number | |||||
value, err := strconv.ParseFloat(number, 64) | |||||
if err != nil { | |||||
panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number)) | |||||
} | |||||
if !match(tag, op, reflect.ValueOf(value), tags) { | |||||
return false | |||||
} | |||||
} else { | |||||
value, err := strconv.ParseInt(number, 10, 64) | |||||
if err != nil { | |||||
panic(fmt.Sprintf("got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", err, number)) | |||||
} | |||||
if !match(tag, op, reflect.ValueOf(value), tags) { | |||||
return false | |||||
} | |||||
} | |||||
case ruletime: | |||||
value, err := time.Parse(time.RFC3339, buffer[begin:end]) | |||||
if err != nil { | |||||
panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end])) | |||||
} | |||||
if !match(tag, op, reflect.ValueOf(value), tags) { | |||||
return false | |||||
} | |||||
case ruledate: | |||||
value, err := time.Parse("2006-01-02", buffer[begin:end]) | |||||
if err != nil { | |||||
panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", err, buffer[begin:end])) | |||||
} | |||||
if !match(tag, op, reflect.ValueOf(value), tags) { | |||||
return false | |||||
} | |||||
} | |||||
} | |||||
return true | |||||
} | |||||
// match returns true if the given triplet (tag, operator, operand) matches any tag. | |||||
// | |||||
// First, it looks up the tag in tags and if it finds one, tries to compare the | |||||
// value from it to the operand using the operator. | |||||
// | |||||
// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" } | |||||
func match(tag string, op Operator, operand reflect.Value, tags pubsub.TagMap) bool { | |||||
// look up the tag from the query in tags | |||||
value, ok := tags.Get(tag) | |||||
if !ok { | |||||
return false | |||||
} | |||||
switch operand.Kind() { | |||||
case reflect.Struct: // time | |||||
operandAsTime := operand.Interface().(time.Time) | |||||
v, ok := value.(time.Time) | |||||
if !ok { // if value from tags is not time.Time | |||||
return false | |||||
} | |||||
switch op { | |||||
case OpLessEqual: | |||||
return v.Before(operandAsTime) || v.Equal(operandAsTime) | |||||
case OpGreaterEqual: | |||||
return v.Equal(operandAsTime) || v.After(operandAsTime) | |||||
case OpLess: | |||||
return v.Before(operandAsTime) | |||||
case OpGreater: | |||||
return v.After(operandAsTime) | |||||
case OpEqual: | |||||
return v.Equal(operandAsTime) | |||||
} | |||||
case reflect.Float64: | |||||
operandFloat64 := operand.Interface().(float64) | |||||
var v float64 | |||||
// try our best to convert value from tags to float64 | |||||
switch vt := value.(type) { | |||||
case float64: | |||||
v = vt | |||||
case float32: | |||||
v = float64(vt) | |||||
case int: | |||||
v = float64(vt) | |||||
case int8: | |||||
v = float64(vt) | |||||
case int16: | |||||
v = float64(vt) | |||||
case int32: | |||||
v = float64(vt) | |||||
case int64: | |||||
v = float64(vt) | |||||
default: // fail for all other types | |||||
panic(fmt.Sprintf("Incomparable types: %T (%v) vs float64 (%v)", value, value, operandFloat64)) | |||||
} | |||||
switch op { | |||||
case OpLessEqual: | |||||
return v <= operandFloat64 | |||||
case OpGreaterEqual: | |||||
return v >= operandFloat64 | |||||
case OpLess: | |||||
return v < operandFloat64 | |||||
case OpGreater: | |||||
return v > operandFloat64 | |||||
case OpEqual: | |||||
return v == operandFloat64 | |||||
} | |||||
case reflect.Int64: | |||||
operandInt := operand.Interface().(int64) | |||||
var v int64 | |||||
// try our best to convert value from tags to int64 | |||||
switch vt := value.(type) { | |||||
case int64: | |||||
v = vt | |||||
case int8: | |||||
v = int64(vt) | |||||
case int16: | |||||
v = int64(vt) | |||||
case int32: | |||||
v = int64(vt) | |||||
case int: | |||||
v = int64(vt) | |||||
case float64: | |||||
v = int64(vt) | |||||
case float32: | |||||
v = int64(vt) | |||||
default: // fail for all other types | |||||
panic(fmt.Sprintf("Incomparable types: %T (%v) vs int64 (%v)", value, value, operandInt)) | |||||
} | |||||
switch op { | |||||
case OpLessEqual: | |||||
return v <= operandInt | |||||
case OpGreaterEqual: | |||||
return v >= operandInt | |||||
case OpLess: | |||||
return v < operandInt | |||||
case OpGreater: | |||||
return v > operandInt | |||||
case OpEqual: | |||||
return v == operandInt | |||||
} | |||||
case reflect.String: | |||||
v, ok := value.(string) | |||||
if !ok { // if value from tags is not string | |||||
return false | |||||
} | |||||
switch op { | |||||
case OpEqual: | |||||
return v == operand.String() | |||||
case OpContains: | |||||
return strings.Contains(v, operand.String()) | |||||
} | |||||
default: | |||||
panic(fmt.Sprintf("Unknown kind of operand %v", operand.Kind())) | |||||
} | |||||
return false | |||||
} |
@ -0,0 +1,33 @@ | |||||
package query | |||||
type QueryParser Peg { | |||||
} | |||||
e <- '\"' condition ( ' '+ and ' '+ condition )* '\"' !. | |||||
condition <- tag ' '* (le ' '* (number / time / date) | |||||
/ ge ' '* (number / time / date) | |||||
/ l ' '* (number / time / date) | |||||
/ g ' '* (number / time / date) | |||||
/ equal ' '* (number / time / date / value) | |||||
/ contains ' '* value | |||||
) | |||||
tag <- < (![ \t\n\r\\()"'=><] .)+ > | |||||
value <- < '\'' (!["'] .)* '\''> | |||||
number <- < ('0' | |||||
/ [1-9] digit* ('.' digit*)?) > | |||||
digit <- [0-9] | |||||
time <- "TIME " < year '-' month '-' day 'T' digit digit ':' digit digit ':' digit digit (('-' / '+') digit digit ':' digit digit / 'Z') > | |||||
date <- "DATE " < year '-' month '-' day > | |||||
year <- ('1' / '2') digit digit digit | |||||
month <- ('0' / '1') digit | |||||
day <- ('0' / '1' / '2' / '3') digit | |||||
and <- "AND" | |||||
equal <- "=" | |||||
contains <- "CONTAINS" | |||||
le <- "<=" | |||||
ge <- ">=" | |||||
l <- "<" | |||||
g <- ">" |
@ -0,0 +1,87 @@ | |||||
package query_test | |||||
import ( | |||||
"testing" | |||||
"time" | |||||
"github.com/stretchr/testify/assert" | |||||
"github.com/stretchr/testify/require" | |||||
"github.com/tendermint/tendermint/libs/pubsub" | |||||
"github.com/tendermint/tendermint/libs/pubsub/query" | |||||
) | |||||
func TestMatches(t *testing.T) { | |||||
const shortForm = "2006-Jan-02" | |||||
txDate, err := time.Parse(shortForm, "2017-Jan-01") | |||||
require.NoError(t, err) | |||||
txTime, err := time.Parse(time.RFC3339, "2018-05-03T14:45:00Z") | |||||
require.NoError(t, err) | |||||
testCases := []struct { | |||||
s string | |||||
tags map[string]interface{} | |||||
err bool | |||||
matches bool | |||||
}{ | |||||
{"tm.events.type='NewBlock'", map[string]interface{}{"tm.events.type": "NewBlock"}, false, true}, | |||||
{"tx.gas > 7", map[string]interface{}{"tx.gas": 8}, false, true}, | |||||
{"tx.gas > 7 AND tx.gas < 9", map[string]interface{}{"tx.gas": 8}, false, true}, | |||||
{"body.weight >= 3.5", map[string]interface{}{"body.weight": 3.5}, false, true}, | |||||
{"account.balance < 1000.0", map[string]interface{}{"account.balance": 900}, false, true}, | |||||
{"apples.kg <= 4", map[string]interface{}{"apples.kg": 4.0}, false, true}, | |||||
{"body.weight >= 4.5", map[string]interface{}{"body.weight": float32(4.5)}, false, true}, | |||||
{"oranges.kg < 4 AND watermellons.kg > 10", map[string]interface{}{"oranges.kg": 3, "watermellons.kg": 12}, false, true}, | |||||
{"peaches.kg < 4", map[string]interface{}{"peaches.kg": 5}, false, false}, | |||||
{"tx.date > DATE 2017-01-01", map[string]interface{}{"tx.date": time.Now()}, false, true}, | |||||
{"tx.date = DATE 2017-01-01", map[string]interface{}{"tx.date": txDate}, false, true}, | |||||
{"tx.date = DATE 2018-01-01", map[string]interface{}{"tx.date": txDate}, false, false}, | |||||
{"tx.time >= TIME 2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": time.Now()}, false, true}, | |||||
{"tx.time = TIME 2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": txTime}, false, false}, | |||||
{"abci.owner.name CONTAINS 'Igor'", map[string]interface{}{"abci.owner.name": "Igor,Ivan"}, false, true}, | |||||
{"abci.owner.name CONTAINS 'Igor'", map[string]interface{}{"abci.owner.name": "Pavel,Ivan"}, false, false}, | |||||
} | |||||
for _, tc := range testCases { | |||||
q, err := query.New(tc.s) | |||||
if !tc.err { | |||||
require.Nil(t, err) | |||||
} | |||||
if tc.matches { | |||||
assert.True(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should match %v", tc.s, tc.tags) | |||||
} else { | |||||
assert.False(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should not match %v", tc.s, tc.tags) | |||||
} | |||||
} | |||||
} | |||||
func TestMustParse(t *testing.T) { | |||||
assert.Panics(t, func() { query.MustParse("=") }) | |||||
assert.NotPanics(t, func() { query.MustParse("tm.events.type='NewBlock'") }) | |||||
} | |||||
func TestConditions(t *testing.T) { | |||||
txTime, err := time.Parse(time.RFC3339, "2013-05-03T14:45:00Z") | |||||
require.NoError(t, err) | |||||
testCases := []struct { | |||||
s string | |||||
conditions []query.Condition | |||||
}{ | |||||
{s: "tm.events.type='NewBlock'", conditions: []query.Condition{query.Condition{Tag: "tm.events.type", Op: query.OpEqual, Operand: "NewBlock"}}}, | |||||
{s: "tx.gas > 7 AND tx.gas < 9", conditions: []query.Condition{query.Condition{Tag: "tx.gas", Op: query.OpGreater, Operand: int64(7)}, query.Condition{Tag: "tx.gas", Op: query.OpLess, Operand: int64(9)}}}, | |||||
{s: "tx.time >= TIME 2013-05-03T14:45:00Z", conditions: []query.Condition{query.Condition{Tag: "tx.time", Op: query.OpGreaterEqual, Operand: txTime}}}, | |||||
} | |||||
for _, tc := range testCases { | |||||
q, err := query.New(tc.s) | |||||
require.Nil(t, err) | |||||
assert.Equal(t, tc.conditions, q.Conditions()) | |||||
} | |||||
} |