Browse Source

Use an interface for tags. (#195)

* Use an interface for tags.

* rename TagSet to TagMap.

* add documentation to TagMap.
pull/1780/head
Thomas Corbière 7 years ago
committed by Anton Kaliaev
parent
commit
75345c2046
7 changed files with 59 additions and 24 deletions
  1. +1
    -1
      pubsub/example_test.go
  2. +34
    -5
      pubsub/pubsub.go
  3. +7
    -7
      pubsub/pubsub_test.go
  4. +3
    -1
      pubsub/query/empty.go
  5. +5
    -4
      pubsub/query/empty_test.go
  6. +6
    -4
      pubsub/query/query.go
  7. +3
    -2
      pubsub/query/query_test.go

+ 1
- 1
pubsub/example_test.go View File

@ -21,7 +21,7 @@ func TestExample(t *testing.T) {
ch := make(chan interface{}, 1) ch := make(chan interface{}, 1)
err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), ch) err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), ch)
require.NoError(t, err) require.NoError(t, err)
err = s.PublishWithTags(ctx, "Tombstone", map[string]interface{}{"abci.account.name": "John"})
err = s.PublishWithTags(ctx, "Tombstone", pubsub.NewTagMap(map[string]interface{}{"abci.account.name": "John"}))
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Tombstone", ch) assertReceive(t, "Tombstone", ch)
} }

+ 34
- 5
pubsub/pubsub.go View File

@ -38,18 +38,30 @@ var (
ErrAlreadySubscribed = errors.New("already subscribed") ErrAlreadySubscribed = errors.New("already subscribed")
) )
// TagMap is used to associate tags to a message.
// They can be queried by subscribers to choose messages they will received.
type TagMap interface {
// Get returns the value for a key, or nil if no value is present.
// The ok result indicates whether value was found in the tags.
Get(key string) (value interface{}, ok bool)
// Len returns the number of tags.
Len() int
}
type tagMap map[string]interface{}
type cmd struct { type cmd struct {
op operation op operation
query Query query Query
ch chan<- interface{} ch chan<- interface{}
clientID string clientID string
msg interface{} msg interface{}
tags map[string]interface{}
tags TagMap
} }
// Query defines an interface for a query to be used for subscribing. // Query defines an interface for a query to be used for subscribing.
type Query interface { type Query interface {
Matches(tags map[string]interface{}) bool
Matches(tags TagMap) bool
String() string String() string
} }
@ -68,6 +80,23 @@ type Server struct {
// Option sets a parameter for the server. // Option sets a parameter for the server.
type Option func(*Server) type Option func(*Server)
// NewTagMap constructs a new immutable tag set from a map.
func NewTagMap(data map[string]interface{}) TagMap {
return tagMap(data)
}
// Get returns the value for a key, or nil if no value is present.
// The ok result indicates whether value was found in the tags.
func (ts tagMap) Get(key string) (value interface{}, ok bool) {
value, ok = ts[key]
return
}
// Len returns the number of tags.
func (ts tagMap) Len() int {
return len(ts)
}
// NewServer returns a new server. See the commentary on the Option functions // NewServer returns a new server. See the commentary on the Option functions
// for a detailed description of how to configure buffering. If no options are // for a detailed description of how to configure buffering. If no options are
// provided, the resulting server's queue is unbuffered. // provided, the resulting server's queue is unbuffered.
@ -184,13 +213,13 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
// Publish publishes the given message. An error will be returned to the caller // Publish publishes the given message. An error will be returned to the caller
// if the context is canceled. // if the context is canceled.
func (s *Server) Publish(ctx context.Context, msg interface{}) error { func (s *Server) Publish(ctx context.Context, msg interface{}) error {
return s.PublishWithTags(ctx, msg, make(map[string]interface{}))
return s.PublishWithTags(ctx, msg, NewTagMap(make(map[string]interface{})))
} }
// PublishWithTags publishes the given message with the set of tags. The set is // PublishWithTags publishes the given message with the set of tags. The set is
// matched with clients queries. If there is a match, the message is sent to // matched with clients queries. If there is a match, the message is sent to
// the client. // the client.
func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags map[string]interface{}) error {
func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags TagMap) error {
select { select {
case s.cmds <- cmd{op: pub, msg: msg, tags: tags}: case s.cmds <- cmd{op: pub, msg: msg, tags: tags}:
return nil return nil
@ -302,7 +331,7 @@ func (state *state) removeAll(clientID string) {
delete(state.clients, clientID) delete(state.clients, clientID)
} }
func (state *state) send(msg interface{}, tags map[string]interface{}) {
func (state *state) send(msg interface{}, tags TagMap) {
for q, clientToChannelMap := range state.queries { for q, clientToChannelMap := range state.queries {
if q.Matches(tags) { if q.Matches(tags) {
for _, ch := range clientToChannelMap { for _, ch := range clientToChannelMap {


+ 7
- 7
pubsub/pubsub_test.go View File

@ -48,14 +48,14 @@ func TestDifferentClients(t *testing.T) {
ch1 := make(chan interface{}, 1) ch1 := make(chan interface{}, 1)
err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1) err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1)
require.NoError(t, err) require.NoError(t, err)
err = s.PublishWithTags(ctx, "Iceman", map[string]interface{}{"tm.events.type": "NewBlock"})
err = s.PublishWithTags(ctx, "Iceman", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"}))
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Iceman", ch1) assertReceive(t, "Iceman", ch1)
ch2 := make(chan interface{}, 1) ch2 := make(chan interface{}, 1)
err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2) err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2)
require.NoError(t, err) require.NoError(t, err)
err = s.PublishWithTags(ctx, "Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})
err = s.PublishWithTags(ctx, "Ultimo", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}))
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Ultimo", ch1) assertReceive(t, "Ultimo", ch1)
assertReceive(t, "Ultimo", ch2) assertReceive(t, "Ultimo", ch2)
@ -63,7 +63,7 @@ func TestDifferentClients(t *testing.T) {
ch3 := make(chan interface{}, 1) ch3 := make(chan interface{}, 1)
err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3) err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3)
require.NoError(t, err) require.NoError(t, err)
err = s.PublishWithTags(ctx, "Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"})
err = s.PublishWithTags(ctx, "Valeria Richards", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewRoundStep"}))
require.NoError(t, err) require.NoError(t, err)
assert.Zero(t, len(ch3)) assert.Zero(t, len(ch3))
} }
@ -80,7 +80,7 @@ func TestClientSubscribesTwice(t *testing.T) {
ch1 := make(chan interface{}, 1) ch1 := make(chan interface{}, 1)
err := s.Subscribe(ctx, clientID, q, ch1) err := s.Subscribe(ctx, clientID, q, ch1)
require.NoError(t, err) require.NoError(t, err)
err = s.PublishWithTags(ctx, "Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"})
err = s.PublishWithTags(ctx, "Goblin Queen", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"}))
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Goblin Queen", ch1) assertReceive(t, "Goblin Queen", ch1)
@ -88,7 +88,7 @@ func TestClientSubscribesTwice(t *testing.T) {
err = s.Subscribe(ctx, clientID, q, ch2) err = s.Subscribe(ctx, clientID, q, ch2)
require.Error(t, err) require.Error(t, err)
err = s.PublishWithTags(ctx, "Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"})
err = s.PublishWithTags(ctx, "Spider-Man", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"}))
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Spider-Man", ch1) assertReceive(t, "Spider-Man", ch1)
} }
@ -208,7 +208,7 @@ func benchmarkNClients(n int, b *testing.B) {
b.ReportAllocs() b.ReportAllocs()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i})
s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i}))
} }
} }
@ -231,7 +231,7 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) {
b.ReportAllocs() b.ReportAllocs()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1})
s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1}))
} }
} }


+ 3
- 1
pubsub/query/empty.go View File

@ -1,11 +1,13 @@
package query package query
import "github.com/tendermint/tmlibs/pubsub"
// Empty query matches any set of tags. // Empty query matches any set of tags.
type Empty struct { type Empty struct {
} }
// Matches always returns true. // Matches always returns true.
func (Empty) Matches(tags map[string]interface{}) bool {
func (Empty) Matches(tags pubsub.TagMap) bool {
return true return true
} }


+ 5
- 4
pubsub/query/empty_test.go View File

@ -4,13 +4,14 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tendermint/tmlibs/pubsub"
"github.com/tendermint/tmlibs/pubsub/query" "github.com/tendermint/tmlibs/pubsub/query"
) )
func TestEmptyQueryMatchesAnything(t *testing.T) { func TestEmptyQueryMatchesAnything(t *testing.T) {
q := query.Empty{} q := query.Empty{}
assert.True(t, q.Matches(map[string]interface{}{}))
assert.True(t, q.Matches(map[string]interface{}{"Asher": "Roth"}))
assert.True(t, q.Matches(map[string]interface{}{"Route": 66}))
assert.True(t, q.Matches(map[string]interface{}{"Route": 66, "Billy": "Blue"}))
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{})))
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Asher": "Roth"})))
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Route": 66})))
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Route": 66, "Billy": "Blue"})))
} }

+ 6
- 4
pubsub/query/query.go View File

@ -14,6 +14,8 @@ import (
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/tendermint/tmlibs/pubsub"
) )
// Query holds the query string and the query parser. // Query holds the query string and the query parser.
@ -145,8 +147,8 @@ func (q *Query) Conditions() []Condition {
// //
// For example, query "name=John" matches tags = {"name": "John"}. More // For example, query "name=John" matches tags = {"name": "John"}. More
// examples could be found in parser_test.go and query_test.go. // examples could be found in parser_test.go and query_test.go.
func (q *Query) Matches(tags map[string]interface{}) bool {
if len(tags) == 0 {
func (q *Query) Matches(tags pubsub.TagMap) bool {
if tags.Len() == 0 {
return false return false
} }
@ -231,9 +233,9 @@ func (q *Query) Matches(tags map[string]interface{}) bool {
// value from it to the operand using the operator. // value from it to the operand using the operator.
// //
// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" } // "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" }
func match(tag string, op Operator, operand reflect.Value, tags map[string]interface{}) bool {
func match(tag string, op Operator, operand reflect.Value, tags pubsub.TagMap) bool {
// look up the tag from the query in tags // look up the tag from the query in tags
value, ok := tags[tag]
value, ok := tags.Get(tag)
if !ok { if !ok {
return false return false
} }


+ 3
- 2
pubsub/query/query_test.go View File

@ -6,6 +6,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/tendermint/tmlibs/pubsub"
"github.com/tendermint/tmlibs/pubsub/query" "github.com/tendermint/tmlibs/pubsub/query"
) )
@ -51,9 +52,9 @@ func TestMatches(t *testing.T) {
} }
if tc.matches { if tc.matches {
assert.True(t, q.Matches(tc.tags), "Query '%s' should match %v", tc.s, tc.tags)
assert.True(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should match %v", tc.s, tc.tags)
} else { } else {
assert.False(t, q.Matches(tc.tags), "Query '%s' should not match %v", tc.s, tc.tags)
assert.False(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should not match %v", tc.s, tc.tags)
} }
} }
} }


Loading…
Cancel
Save