From 7268ec8d10eb86d07f80fafda2682f3ed3c5da57 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 29 Jan 2019 18:46:59 +0400 Subject: [PATCH] green pubsub tests :OK: --- libs/pubsub/example_test.go | 5 +- libs/pubsub/pubsub.go | 206 ++++++++++++++++-------------------- libs/pubsub/pubsub_test.go | 104 +++++++++--------- libs/pubsub/subscription.go | 60 +++++++++++ libs/pubsub/tag_map.go | 32 ++++++ 5 files changed, 241 insertions(+), 166 deletions(-) create mode 100644 libs/pubsub/subscription.go create mode 100644 libs/pubsub/tag_map.go diff --git a/libs/pubsub/example_test.go b/libs/pubsub/example_test.go index 4e4634de5..f7ed17c88 100644 --- a/libs/pubsub/example_test.go +++ b/libs/pubsub/example_test.go @@ -19,10 +19,9 @@ func TestExample(t *testing.T) { defer s.Stop() ctx := context.Background() - ch := make(chan interface{}, 1) - err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), ch) + subscription, err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), 1) require.NoError(t, err) err = s.PublishWithTags(ctx, "Tombstone", pubsub.NewTagMap(map[string]string{"abci.account.name": "John"})) require.NoError(t, err) - assertReceive(t, "Tombstone", ch) + assertReceive(t, "Tombstone", subscription.Out()) } diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index a39c8c731..f898c0e75 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -10,44 +10,28 @@ // match, this message will be pushed to all clients, subscribed to that query. // See query subpackage for our implementation. // -// Due to the blocking send implementation, a single subscriber can freeze an -// entire server by not reading messages before it unsubscribes. To avoid such -// scenario, subscribers must either: +// Example: // -// a) make sure they continue to read from the out channel until -// Unsubscribe(All) is called -// -// s.Subscribe(ctx, sub, qry, out) -// go func() { -// for msg := range out { -// // handle msg -// // will exit automatically when out is closed by Unsubscribe(All) -// } -// }() -// s.UnsubscribeAll(ctx, sub) -// -// b) drain the out channel before calling Unsubscribe(All) -// -// s.Subscribe(ctx, sub, qry, out) -// defer func() { -// // drain out to make sure we don't block -// LOOP: -// for { -// select { -// case <-out: -// default: -// break LOOP -// } -// } -// s.UnsubscribeAll(ctx, sub) -// }() -// for msg := range out { -// // handle msg -// if err != nil { -// return err -// } +// q, err := query.New("account.name='John'") +// if err != nil { +// return err +// } +// ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Second) +// defer cancel() +// subscription, err := pubsub.Subscribe(ctx, "johns-transactions", q, 1) +// if err != nil { +// return err // } // +// for { +// select { +// case msgAndTags <- subscription.Out(): +// // handle msg and tags +// case <-subscription.Cancelled(): +// return subscription.Err() +// } +// } +// package pubsub import ( @@ -77,21 +61,25 @@ var ( ErrAlreadySubscribed = errors.New("already subscribed") ) -type cmd struct { - op operation - query Query - ch chan<- interface{} - clientID string - msg interface{} - tags TagMap -} - // Query defines an interface for a query to be used for subscribing. type Query interface { Matches(tags TagMap) bool String() string } +type cmd struct { + op operation + + // subscribe, unsubscribe + query Query + subscription *Subscription + clientID string + + // publish + msg interface{} + tags TagMap +} + // Server allows clients to subscribe/unsubscribe for messages, publishing // messages with or without tags, and manages internal state. type Server struct { @@ -107,37 +95,6 @@ type Server struct { // Option sets a parameter for the server. type Option func(*Server) -// TagMap is used to associate tags to a message. -// They can be queried by subscribers to choose messages they will received. -type TagMap interface { - // Get returns the value for a key, or nil if no value is present. - // The ok result indicates whether value was found in the tags. - Get(key string) (value string, ok bool) - // Len returns the number of tags. - Len() int -} - -type tagMap map[string]string - -var _ TagMap = (*tagMap)(nil) - -// NewTagMap constructs a new immutable tag set from a map. -func NewTagMap(data map[string]string) TagMap { - return tagMap(data) -} - -// Get returns the value for a key, or nil if no value is present. -// The ok result indicates whether value was found in the tags. -func (ts tagMap) Get(key string) (value string, ok bool) { - value, ok = ts[key] - return -} - -// Len returns the number of tags. -func (ts tagMap) Len() int { - return len(ts) -} - // NewServer returns a new server. See the commentary on the Option functions // for a detailed description of how to configure buffering. If no options are // provided, the resulting server's queue is unbuffered. @@ -174,11 +131,11 @@ func (s *Server) BufferCapacity() int { return s.cmdsCap } -// Subscribe creates a subscription for the given client. It accepts a channel -// on which messages matching the given query can be received. An error will be +// Subscribe creates a subscription for the given client. An error will be // returned to the caller if the context is canceled or if subscription already -// exist for pair clientID and query. -func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error { +// exist for pair clientID and query. outCapacity will be used to set a +// capacity for Subscription#Out channel. +func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, outCapacity int) (*Subscription, error) { s.mtx.RLock() clientSubscriptions, ok := s.subscriptions[clientID] if ok { @@ -186,22 +143,26 @@ func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, ou } s.mtx.RUnlock() if ok { - return ErrAlreadySubscribed + return nil, ErrAlreadySubscribed } + subscription := &Subscription{ + out: make(chan MsgAndTags, outCapacity), + cancelled: make(chan struct{}), + } select { - case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}: + case s.cmds <- cmd{op: sub, clientID: clientID, query: query, subscription: subscription}: s.mtx.Lock() if _, ok = s.subscriptions[clientID]; !ok { s.subscriptions[clientID] = make(map[string]struct{}) } s.subscriptions[clientID][query.String()] = struct{}{} s.mtx.Unlock() - return nil + return subscription, nil case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() case <-s.Quit(): - return nil + return nil, nil } } @@ -285,8 +246,8 @@ func (s *Server) OnStop() { // NOTE: not goroutine safe type state struct { - // query string -> client -> ch - queryToChanMap map[string]map[string]chan<- interface{} + // query string -> client -> subscription + queryToSubscriptionMap map[string]map[string]*Subscription // client -> query string -> struct{} clientToQueryMap map[string]map[string]struct{} // query string -> queryPlusRefCount @@ -303,9 +264,9 @@ type queryPlusRefCount struct { // OnStart implements Service.OnStart by starting the server. func (s *Server) OnStart() error { go s.loop(state{ - queryToChanMap: make(map[string]map[string]chan<- interface{}), - clientToQueryMap: make(map[string]map[string]struct{}), - queries: make(map[string]*queryPlusRefCount), + queryToSubscriptionMap: make(map[string]map[string]*Subscription), + clientToQueryMap: make(map[string]map[string]struct{}), + queries: make(map[string]*queryPlusRefCount), }) return nil } @@ -321,33 +282,32 @@ loop: switch cmd.op { case unsub: if cmd.query != nil { - state.remove(cmd.clientID, cmd.query) + state.remove(cmd.clientID, cmd.query, ErrUnsubscribed) } else { - state.removeAll(cmd.clientID) + state.removeAll(cmd.clientID, ErrUnsubscribed) } case shutdown: for clientID := range state.clientToQueryMap { - state.removeAll(clientID) + state.removeAll(clientID, nil) } break loop case sub: - state.add(cmd.clientID, cmd.query, cmd.ch) + state.add(cmd.clientID, cmd.query, cmd.subscription) case pub: state.send(cmd.msg, cmd.tags) } } } -func (state *state) add(clientID string, q Query, ch chan<- interface{}) { +func (state *state) add(clientID string, q Query, subscription *Subscription) { qStr := q.String() - // initialize clientToChannelMap per query if needed - if _, ok := state.queryToChanMap[qStr]; !ok { - state.queryToChanMap[qStr] = make(map[string]chan<- interface{}) + // initialize clientToSubscriptionMap per query if needed + if _, ok := state.queryToSubscriptionMap[qStr]; !ok { + state.queryToSubscriptionMap[qStr] = make(map[string]*Subscription) } - // create subscription - state.queryToChanMap[qStr][clientID] = ch + state.queryToSubscriptionMap[qStr][clientID] = subscription // initialize queries if needed if _, ok := state.queries[qStr]; !ok { @@ -363,20 +323,23 @@ func (state *state) add(clientID string, q Query, ch chan<- interface{}) { state.clientToQueryMap[clientID][qStr] = struct{}{} } -func (state *state) remove(clientID string, q Query) { +func (state *state) remove(clientID string, q Query, reason error) { qStr := q.String() - clientToChannelMap, ok := state.queryToChanMap[qStr] + clientToSubscriptionMap, ok := state.queryToSubscriptionMap[qStr] if !ok { return } - ch, ok := clientToChannelMap[clientID] + subscription, ok := clientToSubscriptionMap[clientID] if !ok { return } - close(ch) + subscription.mtx.Lock() + subscription.err = reason + subscription.mtx.Unlock() + close(subscription.cancelled) // remove the query from client map. // if client is not subscribed to anything else, remove it. @@ -387,9 +350,9 @@ func (state *state) remove(clientID string, q Query) { // remove the client from query map. // if query has no other clients subscribed, remove it. - delete(state.queryToChanMap[qStr], clientID) - if len(state.queryToChanMap[qStr]) == 0 { - delete(state.queryToChanMap, qStr) + delete(state.queryToSubscriptionMap[qStr], clientID) + if len(state.queryToSubscriptionMap[qStr]) == 0 { + delete(state.queryToSubscriptionMap, qStr) } // decrease ref counter in queries @@ -400,21 +363,24 @@ func (state *state) remove(clientID string, q Query) { } } -func (state *state) removeAll(clientID string) { +func (state *state) removeAll(clientID string, reason error) { queryMap, ok := state.clientToQueryMap[clientID] if !ok { return } for qStr := range queryMap { - ch := state.queryToChanMap[qStr][clientID] - close(ch) + subscription := state.queryToSubscriptionMap[qStr][clientID] + subscription.mtx.Lock() + subscription.err = reason + subscription.mtx.Unlock() + close(subscription.cancelled) // remove the client from query map. // if query has no other clients subscribed, remove it. - delete(state.queryToChanMap[qStr], clientID) - if len(state.queryToChanMap[qStr]) == 0 { - delete(state.queryToChanMap, qStr) + delete(state.queryToSubscriptionMap[qStr], clientID) + if len(state.queryToSubscriptionMap[qStr]) == 0 { + delete(state.queryToSubscriptionMap, qStr) } // decrease ref counter in queries @@ -430,11 +396,21 @@ func (state *state) removeAll(clientID string) { } func (state *state) send(msg interface{}, tags TagMap) { - for qStr, clientToChannelMap := range state.queryToChanMap { + for qStr, clientToSubscriptionMap := range state.queryToSubscriptionMap { q := state.queries[qStr].q if q.Matches(tags) { - for _, ch := range clientToChannelMap { - ch <- msg + for clientID, subscription := range clientToSubscriptionMap { + if cap(subscription.out) == 0 { + // block on unbuffered channel + subscription.out <- MsgAndTags{msg, tags} + } else { + // don't block on buffered channels + select { + case subscription.out <- MsgAndTags{msg, tags}: + default: + state.remove(clientID, q, ErrOutOfCapacity) + } + } } } } diff --git a/libs/pubsub/pubsub_test.go b/libs/pubsub/pubsub_test.go index bb660d9e0..9c6ef3b30 100644 --- a/libs/pubsub/pubsub_test.go +++ b/libs/pubsub/pubsub_test.go @@ -27,16 +27,15 @@ func TestSubscribe(t *testing.T) { defer s.Stop() ctx := context.Background() - ch := make(chan interface{}, 1) - err := s.Subscribe(ctx, clientID, query.Empty{}, ch) + subscription, err := s.Subscribe(ctx, clientID, query.Empty{}, 1) require.NoError(t, err) err = s.Publish(ctx, "Ka-Zar") require.NoError(t, err) - assertReceive(t, "Ka-Zar", ch) + assertReceive(t, "Ka-Zar", subscription.Out()) err = s.Publish(ctx, "Quicksilver") require.NoError(t, err) - assertReceive(t, "Quicksilver", ch) + assertReceive(t, "Quicksilver", subscription.Out()) } func TestDifferentClients(t *testing.T) { @@ -46,27 +45,24 @@ func TestDifferentClients(t *testing.T) { defer s.Stop() ctx := context.Background() - ch1 := make(chan interface{}, 1) - err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1) + subscription1, err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), 1) require.NoError(t, err) err = s.PublishWithTags(ctx, "Iceman", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"})) require.NoError(t, err) - assertReceive(t, "Iceman", ch1) + assertReceive(t, "Iceman", subscription1.Out()) - ch2 := make(chan interface{}, 1) - err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2) + subscription2, err := s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), 1) require.NoError(t, err) err = s.PublishWithTags(ctx, "Ultimo", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})) require.NoError(t, err) - assertReceive(t, "Ultimo", ch1) - assertReceive(t, "Ultimo", ch2) + assertReceive(t, "Ultimo", subscription1.Out()) + assertReceive(t, "Ultimo", subscription2.Out()) - ch3 := make(chan interface{}, 1) - err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3) + subscription3, err := s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), 1) require.NoError(t, err) err = s.PublishWithTags(ctx, "Valeria Richards", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewRoundStep"})) require.NoError(t, err) - assert.Zero(t, len(ch3)) + assert.Zero(t, len(subscription3.Out())) } func TestClientSubscribesTwice(t *testing.T) { @@ -78,20 +74,19 @@ func TestClientSubscribesTwice(t *testing.T) { ctx := context.Background() q := query.MustParse("tm.events.type='NewBlock'") - ch1 := make(chan interface{}, 1) - err := s.Subscribe(ctx, clientID, q, ch1) + subscription1, err := s.Subscribe(ctx, clientID, q, 1) require.NoError(t, err) err = s.PublishWithTags(ctx, "Goblin Queen", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"})) require.NoError(t, err) - assertReceive(t, "Goblin Queen", ch1) + assertReceive(t, "Goblin Queen", subscription1.Out()) - ch2 := make(chan interface{}, 1) - err = s.Subscribe(ctx, clientID, q, ch2) + subscription2, err := s.Subscribe(ctx, clientID, q, 1) require.Error(t, err) + require.Nil(t, subscription2) err = s.PublishWithTags(ctx, "Spider-Man", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"})) require.NoError(t, err) - assertReceive(t, "Spider-Man", ch1) + assertReceive(t, "Spider-Man", subscription1.Out()) } func TestUnsubscribe(t *testing.T) { @@ -101,18 +96,19 @@ func TestUnsubscribe(t *testing.T) { defer s.Stop() ctx := context.Background() - ch := make(chan interface{}) - err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch) + subscription, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), 1) require.NoError(t, err) err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'")) require.NoError(t, err) err = s.Publish(ctx, "Nick Fury") require.NoError(t, err) - assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe") + assert.Zero(t, len(subscription.Out()), "Should not receive anything after Unsubscribe") - _, ok := <-ch + _, ok := <-subscription.Cancelled() assert.False(t, ok) + + assert.Equal(t, pubsub.ErrUnsubscribed, subscription.Err()) } func TestClientUnsubscribesTwice(t *testing.T) { @@ -122,8 +118,7 @@ func TestClientUnsubscribesTwice(t *testing.T) { defer s.Stop() ctx := context.Background() - ch := make(chan interface{}) - err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch) + _, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), 1) require.NoError(t, err) err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'")) require.NoError(t, err) @@ -141,18 +136,16 @@ func TestResubscribe(t *testing.T) { defer s.Stop() ctx := context.Background() - ch := make(chan interface{}) - err := s.Subscribe(ctx, clientID, query.Empty{}, ch) + subscription, err := s.Subscribe(ctx, clientID, query.Empty{}, 1) require.NoError(t, err) err = s.Unsubscribe(ctx, clientID, query.Empty{}) require.NoError(t, err) - ch = make(chan interface{}) - err = s.Subscribe(ctx, clientID, query.Empty{}, ch) + subscription, err = s.Subscribe(ctx, clientID, query.Empty{}, 1) require.NoError(t, err) err = s.Publish(ctx, "Cable") require.NoError(t, err) - assertReceive(t, "Cable", ch) + assertReceive(t, "Cable", subscription.Out()) } func TestUnsubscribeAll(t *testing.T) { @@ -162,10 +155,9 @@ func TestUnsubscribeAll(t *testing.T) { defer s.Stop() ctx := context.Background() - ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1) - err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch1) + subscription1, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), 1) require.NoError(t, err) - err = s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlockHeader'"), ch2) + subscription2, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlockHeader'"), 1) require.NoError(t, err) err = s.UnsubscribeAll(ctx, clientID) @@ -173,13 +165,15 @@ func TestUnsubscribeAll(t *testing.T) { err = s.Publish(ctx, "Nick Fury") require.NoError(t, err) - assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll") - assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll") + assert.Zero(t, len(subscription1.Out()), "Should not receive anything after UnsubscribeAll") + assert.Zero(t, len(subscription2.Out()), "Should not receive anything after UnsubscribeAll") - _, ok := <-ch1 + _, ok := <-subscription1.Cancelled() assert.False(t, ok) - _, ok = <-ch2 + assert.Equal(t, pubsub.ErrUnsubscribed, subscription1.Err()) + _, ok = <-subscription2.Cancelled() assert.False(t, ok) + assert.Equal(t, pubsub.ErrUnsubscribed, subscription2.Err()) } func TestBufferCapacity(t *testing.T) { @@ -217,12 +211,20 @@ func benchmarkNClients(n int, b *testing.B) { ctx := context.Background() for i := 0; i < n; i++ { - ch := make(chan interface{}) + subscription, err := s.Subscribe(ctx, clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = %d", i)), 1) + if err != nil { + b.Fatal(err) + } go func() { - for range ch { + for { + select { + case <-subscription.Out(): + continue + case <-subscription.Cancelled(): + return + } } }() - s.Subscribe(ctx, clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = %d", i)), ch) } b.ReportAllocs() @@ -240,12 +242,20 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) { ctx := context.Background() q := query.MustParse("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = 1") for i := 0; i < n; i++ { - ch := make(chan interface{}) + subscription, err := s.Subscribe(ctx, clientID, q, 1) + if err != nil { + b.Fatal(err) + } go func() { - for range ch { + for { + select { + case <-subscription.Out(): + continue + case <-subscription.Cancelled(): + return + } } }() - s.Subscribe(ctx, clientID, q, ch) } b.ReportAllocs() @@ -259,12 +269,10 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) { /// HELPERS /////////////////////////////////////////////////////////////////////////////// -func assertReceive(t *testing.T, expected interface{}, ch <-chan interface{}, msgAndArgs ...interface{}) { +func assertReceive(t *testing.T, expected interface{}, ch <-chan pubsub.MsgAndTags, msgAndArgs ...interface{}) { select { case actual := <-ch: - if actual != nil { - assert.Equal(t, expected, actual, msgAndArgs...) - } + assert.Equal(t, expected, actual.Msg, msgAndArgs...) case <-time.After(1 * time.Second): t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected) debug.PrintStack() diff --git a/libs/pubsub/subscription.go b/libs/pubsub/subscription.go new file mode 100644 index 000000000..3bdfb0a48 --- /dev/null +++ b/libs/pubsub/subscription.go @@ -0,0 +1,60 @@ +package pubsub + +import ( + "errors" + "sync" +) + +var ( + // ErrUnsubscribed is returned by Err when a client unsubscribes. + ErrUnsubscribed = errors.New("client unsubscribed") + + // ErrOutOfCapacity is returned by Err when a client is not pulling messages + // fast enough. Note the client's subscription will be terminated. + ErrOutOfCapacity = errors.New("client is not pulling messages fast enough") +) + +// A Subscription represents a client subscription for a particular query and +// consists of three things: +// 1) channel onto which messages and tags are published +// 2) channel which is closed if a client is too slow or choose to unsubscribe +// 3) err indicating the reason for (2) +type Subscription struct { + out chan MsgAndTags + + cancelled chan struct{} + mtx sync.RWMutex + err error +} + +// Out returns a channel onto which messages and tags are published. +// Unsubscribe/UnsubscribeAll does not close the channel to avoid clients from +// receiving a nil message. +func (s *Subscription) Out() <-chan MsgAndTags { + return s.out +} + +// Cancelled returns a channel that's closed when the subscription is +// terminated and supposed to be used in a select statement. +func (s *Subscription) Cancelled() <-chan struct{} { + return s.cancelled +} + +// Err returns nil if the channel returned by Cancelled is not yet closed. +// If the channel is closed, Err returns a non-nil error explaining why: +// - ErrUnsubscribed if the subscriber choose to unsubscribe, +// - ErrOutOfCapacity if the subscriber is not pulling messages fast enough +// and the channel returned by Out became full, +// After Err returns a non-nil error, successive calls to Err return the same +// error. +func (s *Subscription) Err() error { + s.mtx.RLock() + defer s.mtx.RUnlock() + return s.err +} + +// MsgAndTags glues a message and tags together. +type MsgAndTags struct { + Msg interface{} + Tags TagMap +} diff --git a/libs/pubsub/tag_map.go b/libs/pubsub/tag_map.go new file mode 100644 index 000000000..3aa229aca --- /dev/null +++ b/libs/pubsub/tag_map.go @@ -0,0 +1,32 @@ +package pubsub + +// TagMap is used to associate tags to a message. +// They can be queried by subscribers to choose messages they will received. +type TagMap interface { + // Get returns the value for a key, or nil if no value is present. + // The ok result indicates whether value was found in the tags. + Get(key string) (value string, ok bool) + // Len returns the number of tags. + Len() int +} + +type tagMap map[string]string + +var _ TagMap = (*tagMap)(nil) + +// NewTagMap constructs a new immutable tag set from a map. +func NewTagMap(data map[string]string) TagMap { + return tagMap(data) +} + +// Get returns the value for a key, or nil if no value is present. +// The ok result indicates whether value was found in the tags. +func (ts tagMap) Get(key string) (value string, ok bool) { + value, ok = ts[key] + return +} + +// Len returns the number of tags. +func (ts tagMap) Len() int { + return len(ts) +}