diff --git a/pubsub/example_test.go b/pubsub/example_test.go index 3eda7d32d..71f1b9cd5 100644 --- a/pubsub/example_test.go +++ b/pubsub/example_test.go @@ -21,7 +21,7 @@ func TestExample(t *testing.T) { ch := make(chan interface{}, 1) err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), ch) require.NoError(t, err) - err = s.PublishWithTags(ctx, "Tombstone", map[string]interface{}{"abci.account.name": "John"}) + err = s.PublishWithTags(ctx, "Tombstone", pubsub.NewTagMap(map[string]interface{}{"abci.account.name": "John"})) require.NoError(t, err) assertReceive(t, "Tombstone", ch) } diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 90f6e4ae6..67f264ace 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -38,18 +38,30 @@ var ( ErrAlreadySubscribed = errors.New("already subscribed") ) +// TagMap is used to associate tags to a message. +// They can be queried by subscribers to choose messages they will received. +type TagMap interface { + // Get returns the value for a key, or nil if no value is present. + // The ok result indicates whether value was found in the tags. + Get(key string) (value interface{}, ok bool) + // Len returns the number of tags. + Len() int +} + +type tagMap map[string]interface{} + type cmd struct { op operation query Query ch chan<- interface{} clientID string msg interface{} - tags map[string]interface{} + tags TagMap } // Query defines an interface for a query to be used for subscribing. type Query interface { - Matches(tags map[string]interface{}) bool + Matches(tags TagMap) bool String() string } @@ -68,6 +80,23 @@ type Server struct { // Option sets a parameter for the server. type Option func(*Server) +// NewTagMap constructs a new immutable tag set from a map. +func NewTagMap(data map[string]interface{}) TagMap { + return tagMap(data) +} + +// Get returns the value for a key, or nil if no value is present. +// The ok result indicates whether value was found in the tags. +func (ts tagMap) Get(key string) (value interface{}, ok bool) { + value, ok = ts[key] + return +} + +// Len returns the number of tags. +func (ts tagMap) Len() int { + return len(ts) +} + // NewServer returns a new server. See the commentary on the Option functions // for a detailed description of how to configure buffering. If no options are // provided, the resulting server's queue is unbuffered. @@ -184,13 +213,13 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error { // Publish publishes the given message. An error will be returned to the caller // if the context is canceled. func (s *Server) Publish(ctx context.Context, msg interface{}) error { - return s.PublishWithTags(ctx, msg, make(map[string]interface{})) + return s.PublishWithTags(ctx, msg, NewTagMap(make(map[string]interface{}))) } // PublishWithTags publishes the given message with the set of tags. The set is // matched with clients queries. If there is a match, the message is sent to // the client. -func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags map[string]interface{}) error { +func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags TagMap) error { select { case s.cmds <- cmd{op: pub, msg: msg, tags: tags}: return nil @@ -302,7 +331,7 @@ func (state *state) removeAll(clientID string) { delete(state.clients, clientID) } -func (state *state) send(msg interface{}, tags map[string]interface{}) { +func (state *state) send(msg interface{}, tags TagMap) { for q, clientToChannelMap := range state.queries { if q.Matches(tags) { for _, ch := range clientToChannelMap { diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 2af7cea46..f853d163b 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -48,14 +48,14 @@ func TestDifferentClients(t *testing.T) { ch1 := make(chan interface{}, 1) err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1) require.NoError(t, err) - err = s.PublishWithTags(ctx, "Iceman", map[string]interface{}{"tm.events.type": "NewBlock"}) + err = s.PublishWithTags(ctx, "Iceman", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"})) require.NoError(t, err) assertReceive(t, "Iceman", ch1) ch2 := make(chan interface{}, 1) err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2) require.NoError(t, err) - err = s.PublishWithTags(ctx, "Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}) + err = s.PublishWithTags(ctx, "Ultimo", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})) require.NoError(t, err) assertReceive(t, "Ultimo", ch1) assertReceive(t, "Ultimo", ch2) @@ -63,7 +63,7 @@ func TestDifferentClients(t *testing.T) { ch3 := make(chan interface{}, 1) err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3) require.NoError(t, err) - err = s.PublishWithTags(ctx, "Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"}) + err = s.PublishWithTags(ctx, "Valeria Richards", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewRoundStep"})) require.NoError(t, err) assert.Zero(t, len(ch3)) } @@ -80,7 +80,7 @@ func TestClientSubscribesTwice(t *testing.T) { ch1 := make(chan interface{}, 1) err := s.Subscribe(ctx, clientID, q, ch1) require.NoError(t, err) - err = s.PublishWithTags(ctx, "Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"}) + err = s.PublishWithTags(ctx, "Goblin Queen", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"})) require.NoError(t, err) assertReceive(t, "Goblin Queen", ch1) @@ -88,7 +88,7 @@ func TestClientSubscribesTwice(t *testing.T) { err = s.Subscribe(ctx, clientID, q, ch2) require.Error(t, err) - err = s.PublishWithTags(ctx, "Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"}) + err = s.PublishWithTags(ctx, "Spider-Man", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"})) require.NoError(t, err) assertReceive(t, "Spider-Man", ch1) } @@ -208,7 +208,7 @@ func benchmarkNClients(n int, b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i}) + s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i})) } } @@ -231,7 +231,7 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1}) + s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1})) } } diff --git a/pubsub/query/empty.go b/pubsub/query/empty.go index 2d60a8923..cefdace4a 100644 --- a/pubsub/query/empty.go +++ b/pubsub/query/empty.go @@ -1,11 +1,13 @@ package query +import "github.com/tendermint/tmlibs/pubsub" + // Empty query matches any set of tags. type Empty struct { } // Matches always returns true. -func (Empty) Matches(tags map[string]interface{}) bool { +func (Empty) Matches(tags pubsub.TagMap) bool { return true } diff --git a/pubsub/query/empty_test.go b/pubsub/query/empty_test.go index 663acb191..b5e8a3001 100644 --- a/pubsub/query/empty_test.go +++ b/pubsub/query/empty_test.go @@ -4,13 +4,14 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/tendermint/tmlibs/pubsub" "github.com/tendermint/tmlibs/pubsub/query" ) func TestEmptyQueryMatchesAnything(t *testing.T) { q := query.Empty{} - assert.True(t, q.Matches(map[string]interface{}{})) - assert.True(t, q.Matches(map[string]interface{}{"Asher": "Roth"})) - assert.True(t, q.Matches(map[string]interface{}{"Route": 66})) - assert.True(t, q.Matches(map[string]interface{}{"Route": 66, "Billy": "Blue"})) + assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{}))) + assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Asher": "Roth"}))) + assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Route": 66}))) + assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Route": 66, "Billy": "Blue"}))) } diff --git a/pubsub/query/query.go b/pubsub/query/query.go index 56f2829d2..84c3aa180 100644 --- a/pubsub/query/query.go +++ b/pubsub/query/query.go @@ -14,6 +14,8 @@ import ( "strconv" "strings" "time" + + "github.com/tendermint/tmlibs/pubsub" ) // Query holds the query string and the query parser. @@ -145,8 +147,8 @@ func (q *Query) Conditions() []Condition { // // For example, query "name=John" matches tags = {"name": "John"}. More // examples could be found in parser_test.go and query_test.go. -func (q *Query) Matches(tags map[string]interface{}) bool { - if len(tags) == 0 { +func (q *Query) Matches(tags pubsub.TagMap) bool { + if tags.Len() == 0 { return false } @@ -231,9 +233,9 @@ func (q *Query) Matches(tags map[string]interface{}) bool { // value from it to the operand using the operator. // // "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" } -func match(tag string, op Operator, operand reflect.Value, tags map[string]interface{}) bool { +func match(tag string, op Operator, operand reflect.Value, tags pubsub.TagMap) bool { // look up the tag from the query in tags - value, ok := tags[tag] + value, ok := tags.Get(tag) if !ok { return false } diff --git a/pubsub/query/query_test.go b/pubsub/query/query_test.go index b980a79c0..7d3ac6ba4 100644 --- a/pubsub/query/query_test.go +++ b/pubsub/query/query_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/tendermint/tmlibs/pubsub" "github.com/tendermint/tmlibs/pubsub/query" ) @@ -51,9 +52,9 @@ func TestMatches(t *testing.T) { } if tc.matches { - assert.True(t, q.Matches(tc.tags), "Query '%s' should match %v", tc.s, tc.tags) + assert.True(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should match %v", tc.s, tc.tags) } else { - assert.False(t, q.Matches(tc.tags), "Query '%s' should not match %v", tc.s, tc.tags) + assert.False(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should not match %v", tc.s, tc.tags) } } }