diff --git a/libs/pubsub/example_test.go b/libs/pubsub/example_test.go index 888368b9e..a43696266 100644 --- a/libs/pubsub/example_test.go +++ b/libs/pubsub/example_test.go @@ -21,7 +21,7 @@ func TestExample(t *testing.T) { ctx := context.Background() subscription, err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'")) require.NoError(t, err) - err = s.PublishWithTags(ctx, "Tombstone", pubsub.NewTagMap(map[string]string{"abci.account.name": "John"})) + err = s.PublishWithTags(ctx, "Tombstone", map[string]string{"abci.account.name": "John"}) require.NoError(t, err) assertReceive(t, "Tombstone", subscription.Out()) } diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index d5318c464..fd4a26936 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -63,7 +63,7 @@ var ( // Query defines an interface for a query to be used for subscribing. type Query interface { - Matches(tags TagMap) bool + Matches(tags map[string]string) bool String() string } @@ -77,7 +77,7 @@ type cmd struct { // publish msg interface{} - tags TagMap + tags map[string]string } // Server allows clients to subscribe/unsubscribe for messages, publishing @@ -131,10 +131,10 @@ func (s *Server) BufferCapacity() int { return s.cmdsCap } -// Subscribe creates a subscription for the given client. -// +// Subscribe creates a subscription for the given client. +// // An error will be returned to the caller if the context is canceled or if -// subscription already exist for pair clientID and query. +// subscription already exist for pair clientID and query. // // outCapacity can be used to set a capacity for Subscription#Out channel (1 by // default). Panics if outCapacity is less than or equal to zero. If you want @@ -245,13 +245,13 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error { // Publish publishes the given message. An error will be returned to the caller // if the context is canceled. func (s *Server) Publish(ctx context.Context, msg interface{}) error { - return s.PublishWithTags(ctx, msg, NewTagMap(make(map[string]string))) + return s.PublishWithTags(ctx, msg, make(map[string]string)) } // PublishWithTags publishes the given message with the set of tags. The set is // matched with clients queries. If there is a match, the message is sent to // the client. -func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags TagMap) error { +func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags map[string]string) error { select { case s.cmds <- cmd{op: pub, msg: msg, tags: tags}: return nil @@ -382,7 +382,7 @@ func (state *state) removeAll(reason error) { } } -func (state *state) send(msg interface{}, tags TagMap) { +func (state *state) send(msg interface{}, tags map[string]string) { for qStr, clientSubscriptions := range state.subscriptions { q := state.queries[qStr].q if q.Matches(tags) { diff --git a/libs/pubsub/pubsub_test.go b/libs/pubsub/pubsub_test.go index 973d6ba06..c72e56983 100644 --- a/libs/pubsub/pubsub_test.go +++ b/libs/pubsub/pubsub_test.go @@ -81,20 +81,20 @@ func TestDifferentClients(t *testing.T) { ctx := context.Background() subscription1, err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'")) require.NoError(t, err) - err = s.PublishWithTags(ctx, "Iceman", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"})) + err = s.PublishWithTags(ctx, "Iceman", map[string]string{"tm.events.type": "NewBlock"}) require.NoError(t, err) assertReceive(t, "Iceman", subscription1.Out()) subscription2, err := s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'")) require.NoError(t, err) - err = s.PublishWithTags(ctx, "Ultimo", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})) + err = s.PublishWithTags(ctx, "Ultimo", map[string]string{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}) require.NoError(t, err) assertReceive(t, "Ultimo", subscription1.Out()) assertReceive(t, "Ultimo", subscription2.Out()) subscription3, err := s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10")) require.NoError(t, err) - err = s.PublishWithTags(ctx, "Valeria Richards", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewRoundStep"})) + err = s.PublishWithTags(ctx, "Valeria Richards", map[string]string{"tm.events.type": "NewRoundStep"}) require.NoError(t, err) assert.Zero(t, len(subscription3.Out())) } @@ -110,7 +110,7 @@ func TestClientSubscribesTwice(t *testing.T) { subscription1, err := s.Subscribe(ctx, clientID, q) require.NoError(t, err) - err = s.PublishWithTags(ctx, "Goblin Queen", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"})) + err = s.PublishWithTags(ctx, "Goblin Queen", map[string]string{"tm.events.type": "NewBlock"}) require.NoError(t, err) assertReceive(t, "Goblin Queen", subscription1.Out()) @@ -118,7 +118,7 @@ func TestClientSubscribesTwice(t *testing.T) { require.Error(t, err) require.Nil(t, subscription2) - err = s.PublishWithTags(ctx, "Spider-Man", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"})) + err = s.PublishWithTags(ctx, "Spider-Man", map[string]string{"tm.events.type": "NewBlock"}) require.NoError(t, err) assertReceive(t, "Spider-Man", subscription1.Out()) } @@ -264,7 +264,7 @@ func benchmarkNClients(n int, b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": string(i)})) + s.PublishWithTags(ctx, "Gamora", map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": string(i)}) } } @@ -295,7 +295,7 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": "1"})) + s.PublishWithTags(ctx, "Gamora", map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": "1"}) } } diff --git a/libs/pubsub/query/empty.go b/libs/pubsub/query/empty.go index 17d7acefa..83271f047 100644 --- a/libs/pubsub/query/empty.go +++ b/libs/pubsub/query/empty.go @@ -1,13 +1,11 @@ package query -import "github.com/tendermint/tendermint/libs/pubsub" - // Empty query matches any set of tags. type Empty struct { } // Matches always returns true. -func (Empty) Matches(tags pubsub.TagMap) bool { +func (Empty) Matches(tags map[string]string) bool { return true } diff --git a/libs/pubsub/query/empty_test.go b/libs/pubsub/query/empty_test.go index 6183b6bd4..141fb9515 100644 --- a/libs/pubsub/query/empty_test.go +++ b/libs/pubsub/query/empty_test.go @@ -5,14 +5,13 @@ import ( "github.com/stretchr/testify/assert" - "github.com/tendermint/tendermint/libs/pubsub" "github.com/tendermint/tendermint/libs/pubsub/query" ) func TestEmptyQueryMatchesAnything(t *testing.T) { q := query.Empty{} - assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{}))) - assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{"Asher": "Roth"}))) - assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{"Route": "66"}))) - assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{"Route": "66", "Billy": "Blue"}))) + assert.True(t, q.Matches(map[string]string{})) + assert.True(t, q.Matches(map[string]string{"Asher": "Roth"})) + assert.True(t, q.Matches(map[string]string{"Route": "66"})) + assert.True(t, q.Matches(map[string]string{"Route": "66", "Billy": "Blue"})) } diff --git a/libs/pubsub/query/query.go b/libs/pubsub/query/query.go index ec187486e..189110a3e 100644 --- a/libs/pubsub/query/query.go +++ b/libs/pubsub/query/query.go @@ -14,8 +14,6 @@ import ( "strconv" "strings" "time" - - "github.com/tendermint/tendermint/libs/pubsub" ) // Query holds the query string and the query parser. @@ -154,8 +152,8 @@ func (q *Query) Conditions() []Condition { // // For example, query "name=John" matches tags = {"name": "John"}. More // examples could be found in parser_test.go and query_test.go. -func (q *Query) Matches(tags pubsub.TagMap) bool { - if tags.Len() == 0 { +func (q *Query) Matches(tags map[string]string) bool { + if len(tags) == 0 { return false } @@ -240,9 +238,9 @@ func (q *Query) Matches(tags pubsub.TagMap) bool { // value from it to the operand using the operator. // // "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" } -func match(tag string, op Operator, operand reflect.Value, tags pubsub.TagMap) bool { +func match(tag string, op Operator, operand reflect.Value, tags map[string]string) bool { // look up the tag from the query in tags - value, ok := tags.Get(tag) + value, ok := tags[tag] if !ok { return false } diff --git a/libs/pubsub/query/query_test.go b/libs/pubsub/query/query_test.go index d1810f466..a3d83b259 100644 --- a/libs/pubsub/query/query_test.go +++ b/libs/pubsub/query/query_test.go @@ -8,7 +8,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/tendermint/tendermint/libs/pubsub" "github.com/tendermint/tendermint/libs/pubsub/query" ) @@ -53,9 +52,9 @@ func TestMatches(t *testing.T) { } if tc.matches { - assert.True(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should match %v", tc.s, tc.tags) + assert.True(t, q.Matches(tc.tags), "Query '%s' should match %v", tc.s, tc.tags) } else { - assert.False(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should not match %v", tc.s, tc.tags) + assert.False(t, q.Matches(tc.tags), "Query '%s' should not match %v", tc.s, tc.tags) } } } diff --git a/libs/pubsub/subscription.go b/libs/pubsub/subscription.go index e852e4906..e86693245 100644 --- a/libs/pubsub/subscription.go +++ b/libs/pubsub/subscription.go @@ -56,7 +56,7 @@ func (s *Subscription) Err() error { // Message glues data and tags together. type Message struct { data interface{} - tags TagMap + tags map[string]string } // Data returns an original data published. @@ -65,6 +65,6 @@ func (msg Message) Data() interface{} { } // Tags returns tags, which matched the client's query. -func (msg Message) Tags() TagMap { +func (msg Message) Tags() map[string]string { return msg.tags } diff --git a/libs/pubsub/tag_map.go b/libs/pubsub/tag_map.go deleted file mode 100644 index 3aa229aca..000000000 --- a/libs/pubsub/tag_map.go +++ /dev/null @@ -1,32 +0,0 @@ -package pubsub - -// TagMap is used to associate tags to a message. -// They can be queried by subscribers to choose messages they will received. -type TagMap interface { - // Get returns the value for a key, or nil if no value is present. - // The ok result indicates whether value was found in the tags. - Get(key string) (value string, ok bool) - // Len returns the number of tags. - Len() int -} - -type tagMap map[string]string - -var _ TagMap = (*tagMap)(nil) - -// NewTagMap constructs a new immutable tag set from a map. -func NewTagMap(data map[string]string) TagMap { - return tagMap(data) -} - -// Get returns the value for a key, or nil if no value is present. -// The ok result indicates whether value was found in the tags. -func (ts tagMap) Get(key string) (value string, ok bool) { - value, ok = ts[key] - return -} - -// Len returns the number of tags. -func (ts tagMap) Len() int { - return len(ts) -} diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index 1a1d88c47..51c1d2e21 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -257,7 +257,7 @@ type WSEvents struct { ws *rpcclient.WSClient mtx sync.RWMutex - subscriptions map[string]chan<- interface{} + subscriptions map[string]chan<- EventMessage } func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents { @@ -265,7 +265,7 @@ func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents { cdc: cdc, endpoint: endpoint, remote: remote, - subscriptions: make(map[string]chan<- interface{}), + subscriptions: make(map[string]chan<- EventMessage), } wsEvents.BaseService = *cmn.NewBaseService(nil, "WSEvents", wsEvents) @@ -341,7 +341,7 @@ func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error for _, ch := range w.subscriptions { close(ch) } - w.subscriptions = make(map[string]chan<- interface{}) + w.subscriptions = make(map[string]chan<- EventMessage) w.mtx.Unlock() return nil @@ -382,7 +382,7 @@ func (w *WSEvents) eventListener() { // Unsubscribe/UnsubscribeAll. w.mtx.RLock() if ch, ok := w.subscriptions[result.Query]; ok { - ch <- result.Data + ch <- EventMessage{result.Data, result.Tags} } w.mtx.RUnlock() case <-w.Quit(): diff --git a/rpc/client/interface.go b/rpc/client/interface.go index 7477225e9..eeb10d2b9 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -21,7 +21,10 @@ implementation. */ import ( + "context" + cmn "github.com/tendermint/tendermint/libs/common" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" ) @@ -91,7 +94,9 @@ type NetworkClient interface { // EventsClient is reactive, you can subscribe to any message, given the proper // string. see tendermint/types/events.go type EventsClient interface { - types.EventBusSubscriber + Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error + Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error + UnsubscribeAll(ctx context.Context, subscriber string) error } // MempoolClient shows us data about current mempool state. diff --git a/rpc/client/types.go b/rpc/client/types.go index 6a23fa450..b2c73479b 100644 --- a/rpc/client/types.go +++ b/rpc/client/types.go @@ -1,5 +1,7 @@ package client +import "github.com/tendermint/tendermint/types" + // ABCIQueryOptions can be used to provide options for ABCIQuery call other // than the DefaultABCIQueryOptions. type ABCIQueryOptions struct { @@ -9,3 +11,9 @@ type ABCIQueryOptions struct { // DefaultABCIQueryOptions are latest height (0) and prove false. var DefaultABCIQueryOptions = ABCIQueryOptions{Height: 0, Prove: false} + +// EventMessage combines event data and tags. +type EventMessage struct { + Data types.TMEventData + Tags map[string]string +} diff --git a/rpc/core/events.go b/rpc/core/events.go index 2ab44d386..8b4966c05 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -9,7 +9,6 @@ import ( tmquery "github.com/tendermint/tendermint/libs/pubsub/query" ctypes "github.com/tendermint/tendermint/rpc/core/types" rpctypes "github.com/tendermint/tendermint/rpc/lib/types" - tmtypes "github.com/tendermint/tendermint/types" ) // Subscribe for events via WebSocket. @@ -110,7 +109,7 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri for { select { case msg := <-sub.Out(): - resultEvent := &ctypes.ResultEvent{query, msg.Data().(tmtypes.TMEventData)} + resultEvent := &ctypes.ResultEvent{query, msg.Data(), msg.Tags()} wsCtx.TryWriteRPCResponse( rpctypes.NewRPCSuccessResponse( wsCtx.Codec(), diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 62be1cafd..b7932f28a 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -206,4 +206,5 @@ type ( type ResultEvent struct { Query string `json:"query"` Data types.TMEventData `json:"data"` + Tags map[string]string `json:"tags"` } diff --git a/types/event_bus.go b/types/event_bus.go index 2f017854b..04b272b2f 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -73,7 +73,7 @@ func (b *EventBus) UnsubscribeAll(ctx context.Context, subscriber string) error func (b *EventBus) Publish(eventType string, eventData TMEventData) error { // no explicit deadline for publishing events ctx := context.Background() - b.pubsub.PublishWithTags(ctx, eventData, tmpubsub.NewTagMap(map[string]string{EventTypeKey: eventType})) + b.pubsub.PublishWithTags(ctx, eventData, map[string]string{EventTypeKey: eventType}) return nil } @@ -101,7 +101,7 @@ func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error { logIfTagExists(EventTypeKey, tags, b.Logger) tags[EventTypeKey] = EventNewBlock - b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags)) + b.pubsub.PublishWithTags(ctx, data, tags) return nil } @@ -117,7 +117,7 @@ func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) erro logIfTagExists(EventTypeKey, tags, b.Logger) tags[EventTypeKey] = EventNewBlockHeader - b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags)) + b.pubsub.PublishWithTags(ctx, data, tags) return nil } @@ -148,7 +148,7 @@ func (b *EventBus) PublishEventTx(data EventDataTx) error { logIfTagExists(TxHeightKey, tags, b.Logger) tags[TxHeightKey] = fmt.Sprintf("%d", data.Height) - b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags)) + b.pubsub.PublishWithTags(ctx, data, tags) return nil }