Browse Source

remove TagMap

it does not bring any additional benefits
pull/3227/head
Anton Kaliaev 6 years ago
parent
commit
1be7e341b1
No known key found for this signature in database GPG Key ID: 7B6881D965918214
15 changed files with 53 additions and 78 deletions
  1. +1
    -1
      libs/pubsub/example_test.go
  2. +8
    -8
      libs/pubsub/pubsub.go
  3. +7
    -7
      libs/pubsub/pubsub_test.go
  4. +1
    -3
      libs/pubsub/query/empty.go
  5. +4
    -5
      libs/pubsub/query/empty_test.go
  6. +4
    -6
      libs/pubsub/query/query.go
  7. +2
    -3
      libs/pubsub/query/query_test.go
  8. +2
    -2
      libs/pubsub/subscription.go
  9. +0
    -32
      libs/pubsub/tag_map.go
  10. +4
    -4
      rpc/client/httpclient.go
  11. +6
    -1
      rpc/client/interface.go
  12. +8
    -0
      rpc/client/types.go
  13. +1
    -2
      rpc/core/events.go
  14. +1
    -0
      rpc/core/types/responses.go
  15. +4
    -4
      types/event_bus.go

+ 1
- 1
libs/pubsub/example_test.go View File

@ -21,7 +21,7 @@ func TestExample(t *testing.T) {
ctx := context.Background()
subscription, err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"))
require.NoError(t, err)
err = s.PublishWithTags(ctx, "Tombstone", pubsub.NewTagMap(map[string]string{"abci.account.name": "John"}))
err = s.PublishWithTags(ctx, "Tombstone", map[string]string{"abci.account.name": "John"})
require.NoError(t, err)
assertReceive(t, "Tombstone", subscription.Out())
}

+ 8
- 8
libs/pubsub/pubsub.go View File

@ -63,7 +63,7 @@ var (
// Query defines an interface for a query to be used for subscribing.
type Query interface {
Matches(tags TagMap) bool
Matches(tags map[string]string) bool
String() string
}
@ -77,7 +77,7 @@ type cmd struct {
// publish
msg interface{}
tags TagMap
tags map[string]string
}
// Server allows clients to subscribe/unsubscribe for messages, publishing
@ -131,10 +131,10 @@ func (s *Server) BufferCapacity() int {
return s.cmdsCap
}
// Subscribe creates a subscription for the given client.
//
// Subscribe creates a subscription for the given client.
//
// An error will be returned to the caller if the context is canceled or if
// subscription already exist for pair clientID and query.
// subscription already exist for pair clientID and query.
//
// outCapacity can be used to set a capacity for Subscription#Out channel (1 by
// default). Panics if outCapacity is less than or equal to zero. If you want
@ -245,13 +245,13 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
// Publish publishes the given message. An error will be returned to the caller
// if the context is canceled.
func (s *Server) Publish(ctx context.Context, msg interface{}) error {
return s.PublishWithTags(ctx, msg, NewTagMap(make(map[string]string)))
return s.PublishWithTags(ctx, msg, make(map[string]string))
}
// PublishWithTags publishes the given message with the set of tags. The set is
// matched with clients queries. If there is a match, the message is sent to
// the client.
func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags TagMap) error {
func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags map[string]string) error {
select {
case s.cmds <- cmd{op: pub, msg: msg, tags: tags}:
return nil
@ -382,7 +382,7 @@ func (state *state) removeAll(reason error) {
}
}
func (state *state) send(msg interface{}, tags TagMap) {
func (state *state) send(msg interface{}, tags map[string]string) {
for qStr, clientSubscriptions := range state.subscriptions {
q := state.queries[qStr].q
if q.Matches(tags) {


+ 7
- 7
libs/pubsub/pubsub_test.go View File

@ -81,20 +81,20 @@ func TestDifferentClients(t *testing.T) {
ctx := context.Background()
subscription1, err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"))
require.NoError(t, err)
err = s.PublishWithTags(ctx, "Iceman", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"}))
err = s.PublishWithTags(ctx, "Iceman", map[string]string{"tm.events.type": "NewBlock"})
require.NoError(t, err)
assertReceive(t, "Iceman", subscription1.Out())
subscription2, err := s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"))
require.NoError(t, err)
err = s.PublishWithTags(ctx, "Ultimo", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}))
err = s.PublishWithTags(ctx, "Ultimo", map[string]string{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})
require.NoError(t, err)
assertReceive(t, "Ultimo", subscription1.Out())
assertReceive(t, "Ultimo", subscription2.Out())
subscription3, err := s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"))
require.NoError(t, err)
err = s.PublishWithTags(ctx, "Valeria Richards", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewRoundStep"}))
err = s.PublishWithTags(ctx, "Valeria Richards", map[string]string{"tm.events.type": "NewRoundStep"})
require.NoError(t, err)
assert.Zero(t, len(subscription3.Out()))
}
@ -110,7 +110,7 @@ func TestClientSubscribesTwice(t *testing.T) {
subscription1, err := s.Subscribe(ctx, clientID, q)
require.NoError(t, err)
err = s.PublishWithTags(ctx, "Goblin Queen", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"}))
err = s.PublishWithTags(ctx, "Goblin Queen", map[string]string{"tm.events.type": "NewBlock"})
require.NoError(t, err)
assertReceive(t, "Goblin Queen", subscription1.Out())
@ -118,7 +118,7 @@ func TestClientSubscribesTwice(t *testing.T) {
require.Error(t, err)
require.Nil(t, subscription2)
err = s.PublishWithTags(ctx, "Spider-Man", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"}))
err = s.PublishWithTags(ctx, "Spider-Man", map[string]string{"tm.events.type": "NewBlock"})
require.NoError(t, err)
assertReceive(t, "Spider-Man", subscription1.Out())
}
@ -264,7 +264,7 @@ func benchmarkNClients(n int, b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": string(i)}))
s.PublishWithTags(ctx, "Gamora", map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": string(i)})
}
}
@ -295,7 +295,7 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": "1"}))
s.PublishWithTags(ctx, "Gamora", map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": "1"})
}
}


+ 1
- 3
libs/pubsub/query/empty.go View File

@ -1,13 +1,11 @@
package query
import "github.com/tendermint/tendermint/libs/pubsub"
// Empty query matches any set of tags.
type Empty struct {
}
// Matches always returns true.
func (Empty) Matches(tags pubsub.TagMap) bool {
func (Empty) Matches(tags map[string]string) bool {
return true
}


+ 4
- 5
libs/pubsub/query/empty_test.go View File

@ -5,14 +5,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/libs/pubsub/query"
)
func TestEmptyQueryMatchesAnything(t *testing.T) {
q := query.Empty{}
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{})))
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{"Asher": "Roth"})))
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{"Route": "66"})))
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{"Route": "66", "Billy": "Blue"})))
assert.True(t, q.Matches(map[string]string{}))
assert.True(t, q.Matches(map[string]string{"Asher": "Roth"}))
assert.True(t, q.Matches(map[string]string{"Route": "66"}))
assert.True(t, q.Matches(map[string]string{"Route": "66", "Billy": "Blue"}))
}

+ 4
- 6
libs/pubsub/query/query.go View File

@ -14,8 +14,6 @@ import (
"strconv"
"strings"
"time"
"github.com/tendermint/tendermint/libs/pubsub"
)
// Query holds the query string and the query parser.
@ -154,8 +152,8 @@ func (q *Query) Conditions() []Condition {
//
// For example, query "name=John" matches tags = {"name": "John"}. More
// examples could be found in parser_test.go and query_test.go.
func (q *Query) Matches(tags pubsub.TagMap) bool {
if tags.Len() == 0 {
func (q *Query) Matches(tags map[string]string) bool {
if len(tags) == 0 {
return false
}
@ -240,9 +238,9 @@ func (q *Query) Matches(tags pubsub.TagMap) bool {
// value from it to the operand using the operator.
//
// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" }
func match(tag string, op Operator, operand reflect.Value, tags pubsub.TagMap) bool {
func match(tag string, op Operator, operand reflect.Value, tags map[string]string) bool {
// look up the tag from the query in tags
value, ok := tags.Get(tag)
value, ok := tags[tag]
if !ok {
return false
}


+ 2
- 3
libs/pubsub/query/query_test.go View File

@ -8,7 +8,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/libs/pubsub/query"
)
@ -53,9 +52,9 @@ func TestMatches(t *testing.T) {
}
if tc.matches {
assert.True(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should match %v", tc.s, tc.tags)
assert.True(t, q.Matches(tc.tags), "Query '%s' should match %v", tc.s, tc.tags)
} else {
assert.False(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should not match %v", tc.s, tc.tags)
assert.False(t, q.Matches(tc.tags), "Query '%s' should not match %v", tc.s, tc.tags)
}
}
}


+ 2
- 2
libs/pubsub/subscription.go View File

@ -56,7 +56,7 @@ func (s *Subscription) Err() error {
// Message glues data and tags together.
type Message struct {
data interface{}
tags TagMap
tags map[string]string
}
// Data returns an original data published.
@ -65,6 +65,6 @@ func (msg Message) Data() interface{} {
}
// Tags returns tags, which matched the client's query.
func (msg Message) Tags() TagMap {
func (msg Message) Tags() map[string]string {
return msg.tags
}

+ 0
- 32
libs/pubsub/tag_map.go View File

@ -1,32 +0,0 @@
package pubsub
// TagMap is used to associate tags to a message.
// They can be queried by subscribers to choose messages they will received.
type TagMap interface {
// Get returns the value for a key, or nil if no value is present.
// The ok result indicates whether value was found in the tags.
Get(key string) (value string, ok bool)
// Len returns the number of tags.
Len() int
}
type tagMap map[string]string
var _ TagMap = (*tagMap)(nil)
// NewTagMap constructs a new immutable tag set from a map.
func NewTagMap(data map[string]string) TagMap {
return tagMap(data)
}
// Get returns the value for a key, or nil if no value is present.
// The ok result indicates whether value was found in the tags.
func (ts tagMap) Get(key string) (value string, ok bool) {
value, ok = ts[key]
return
}
// Len returns the number of tags.
func (ts tagMap) Len() int {
return len(ts)
}

+ 4
- 4
rpc/client/httpclient.go View File

@ -257,7 +257,7 @@ type WSEvents struct {
ws *rpcclient.WSClient
mtx sync.RWMutex
subscriptions map[string]chan<- interface{}
subscriptions map[string]chan<- EventMessage
}
func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents {
@ -265,7 +265,7 @@ func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents {
cdc: cdc,
endpoint: endpoint,
remote: remote,
subscriptions: make(map[string]chan<- interface{}),
subscriptions: make(map[string]chan<- EventMessage),
}
wsEvents.BaseService = *cmn.NewBaseService(nil, "WSEvents", wsEvents)
@ -341,7 +341,7 @@ func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error
for _, ch := range w.subscriptions {
close(ch)
}
w.subscriptions = make(map[string]chan<- interface{})
w.subscriptions = make(map[string]chan<- EventMessage)
w.mtx.Unlock()
return nil
@ -382,7 +382,7 @@ func (w *WSEvents) eventListener() {
// Unsubscribe/UnsubscribeAll.
w.mtx.RLock()
if ch, ok := w.subscriptions[result.Query]; ok {
ch <- result.Data
ch <- EventMessage{result.Data, result.Tags}
}
w.mtx.RUnlock()
case <-w.Quit():


+ 6
- 1
rpc/client/interface.go View File

@ -21,7 +21,10 @@ implementation.
*/
import (
"context"
cmn "github.com/tendermint/tendermint/libs/common"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
)
@ -91,7 +94,9 @@ type NetworkClient interface {
// EventsClient is reactive, you can subscribe to any message, given the proper
// string. see tendermint/types/events.go
type EventsClient interface {
types.EventBusSubscriber
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
UnsubscribeAll(ctx context.Context, subscriber string) error
}
// MempoolClient shows us data about current mempool state.


+ 8
- 0
rpc/client/types.go View File

@ -1,5 +1,7 @@
package client
import "github.com/tendermint/tendermint/types"
// ABCIQueryOptions can be used to provide options for ABCIQuery call other
// than the DefaultABCIQueryOptions.
type ABCIQueryOptions struct {
@ -9,3 +11,9 @@ type ABCIQueryOptions struct {
// DefaultABCIQueryOptions are latest height (0) and prove false.
var DefaultABCIQueryOptions = ABCIQueryOptions{Height: 0, Prove: false}
// EventMessage combines event data and tags.
type EventMessage struct {
Data types.TMEventData
Tags map[string]string
}

+ 1
- 2
rpc/core/events.go View File

@ -9,7 +9,6 @@ import (
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
tmtypes "github.com/tendermint/tendermint/types"
)
// Subscribe for events via WebSocket.
@ -110,7 +109,7 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri
for {
select {
case msg := <-sub.Out():
resultEvent := &ctypes.ResultEvent{query, msg.Data().(tmtypes.TMEventData)}
resultEvent := &ctypes.ResultEvent{query, msg.Data(), msg.Tags()}
wsCtx.TryWriteRPCResponse(
rpctypes.NewRPCSuccessResponse(
wsCtx.Codec(),


+ 1
- 0
rpc/core/types/responses.go View File

@ -206,4 +206,5 @@ type (
type ResultEvent struct {
Query string `json:"query"`
Data types.TMEventData `json:"data"`
Tags map[string]string `json:"tags"`
}

+ 4
- 4
types/event_bus.go View File

@ -73,7 +73,7 @@ func (b *EventBus) UnsubscribeAll(ctx context.Context, subscriber string) error
func (b *EventBus) Publish(eventType string, eventData TMEventData) error {
// no explicit deadline for publishing events
ctx := context.Background()
b.pubsub.PublishWithTags(ctx, eventData, tmpubsub.NewTagMap(map[string]string{EventTypeKey: eventType}))
b.pubsub.PublishWithTags(ctx, eventData, map[string]string{EventTypeKey: eventType})
return nil
}
@ -101,7 +101,7 @@ func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error {
logIfTagExists(EventTypeKey, tags, b.Logger)
tags[EventTypeKey] = EventNewBlock
b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags))
b.pubsub.PublishWithTags(ctx, data, tags)
return nil
}
@ -117,7 +117,7 @@ func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) erro
logIfTagExists(EventTypeKey, tags, b.Logger)
tags[EventTypeKey] = EventNewBlockHeader
b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags))
b.pubsub.PublishWithTags(ctx, data, tags)
return nil
}
@ -148,7 +148,7 @@ func (b *EventBus) PublishEventTx(data EventDataTx) error {
logIfTagExists(TxHeightKey, tags, b.Logger)
tags[TxHeightKey] = fmt.Sprintf("%d", data.Height)
b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags))
b.pubsub.PublishWithTags(ctx, data, tags)
return nil
}


Loading…
Cancel
Save