From 600458734788aa499ae2f8ffbe4d7e942262180e Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 28 May 2018 14:37:11 +0400 Subject: [PATCH] expect all tags to be strings (#1498) * expect all tags to be strings Refs #1369 * port changes from https://github.com/tendermint/tmlibs/pull/204 Refs #1369 --- CHANGELOG.md | 7 +++ libs/pubsub/example_test.go | 2 +- libs/pubsub/pubsub.go | 32 ++++++------ libs/pubsub/pubsub_test.go | 14 ++--- libs/pubsub/query/empty_test.go | 8 +-- libs/pubsub/query/query.go | 90 +++++++++++++++------------------ libs/pubsub/query/query_test.go | 52 +++++++++---------- state/state_test.go | 4 +- types/event_bus.go | 8 +-- types/event_bus_test.go | 4 +- 10 files changed, 112 insertions(+), 109 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bde5b2e40..a288d7404 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## 0.20.0 + +BREAKING: + +- [libs/pubsub] TagMap#Get returns a string value +- [libs/pubsub] NewTagMap accepts a map of strings + ## 0.19.6 FEATURES diff --git a/libs/pubsub/example_test.go b/libs/pubsub/example_test.go index 550b4447e..260521cd9 100644 --- a/libs/pubsub/example_test.go +++ b/libs/pubsub/example_test.go @@ -22,7 +22,7 @@ func TestExample(t *testing.T) { ch := make(chan interface{}, 1) err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), ch) require.NoError(t, err) - err = s.PublishWithTags(ctx, "Tombstone", pubsub.NewTagMap(map[string]interface{}{"abci.account.name": "John"})) + err = s.PublishWithTags(ctx, "Tombstone", pubsub.NewTagMap(map[string]string{"abci.account.name": "John"})) require.NoError(t, err) assertReceive(t, "Tombstone", ch) } diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index 67f264ace..684ff358a 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -38,18 +38,6 @@ var ( ErrAlreadySubscribed = errors.New("already subscribed") ) -// TagMap is used to associate tags to a message. -// They can be queried by subscribers to choose messages they will received. -type TagMap interface { - // Get returns the value for a key, or nil if no value is present. - // The ok result indicates whether value was found in the tags. - Get(key string) (value interface{}, ok bool) - // Len returns the number of tags. - Len() int -} - -type tagMap map[string]interface{} - type cmd struct { op operation query Query @@ -80,14 +68,28 @@ type Server struct { // Option sets a parameter for the server. type Option func(*Server) +// TagMap is used to associate tags to a message. +// They can be queried by subscribers to choose messages they will received. +type TagMap interface { + // Get returns the value for a key, or nil if no value is present. + // The ok result indicates whether value was found in the tags. + Get(key string) (value string, ok bool) + // Len returns the number of tags. + Len() int +} + +type tagMap map[string]string + +var _ TagMap = (*tagMap)(nil) + // NewTagMap constructs a new immutable tag set from a map. -func NewTagMap(data map[string]interface{}) TagMap { +func NewTagMap(data map[string]string) TagMap { return tagMap(data) } // Get returns the value for a key, or nil if no value is present. // The ok result indicates whether value was found in the tags. -func (ts tagMap) Get(key string) (value interface{}, ok bool) { +func (ts tagMap) Get(key string) (value string, ok bool) { value, ok = ts[key] return } @@ -213,7 +215,7 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error { // Publish publishes the given message. An error will be returned to the caller // if the context is canceled. func (s *Server) Publish(ctx context.Context, msg interface{}) error { - return s.PublishWithTags(ctx, msg, NewTagMap(make(map[string]interface{}))) + return s.PublishWithTags(ctx, msg, NewTagMap(make(map[string]string))) } // PublishWithTags publishes the given message with the set of tags. The set is diff --git a/libs/pubsub/pubsub_test.go b/libs/pubsub/pubsub_test.go index a39d015ce..fd6c11cf4 100644 --- a/libs/pubsub/pubsub_test.go +++ b/libs/pubsub/pubsub_test.go @@ -49,14 +49,14 @@ func TestDifferentClients(t *testing.T) { ch1 := make(chan interface{}, 1) err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1) require.NoError(t, err) - err = s.PublishWithTags(ctx, "Iceman", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"})) + err = s.PublishWithTags(ctx, "Iceman", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"})) require.NoError(t, err) assertReceive(t, "Iceman", ch1) ch2 := make(chan interface{}, 1) err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2) require.NoError(t, err) - err = s.PublishWithTags(ctx, "Ultimo", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})) + err = s.PublishWithTags(ctx, "Ultimo", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})) require.NoError(t, err) assertReceive(t, "Ultimo", ch1) assertReceive(t, "Ultimo", ch2) @@ -64,7 +64,7 @@ func TestDifferentClients(t *testing.T) { ch3 := make(chan interface{}, 1) err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3) require.NoError(t, err) - err = s.PublishWithTags(ctx, "Valeria Richards", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewRoundStep"})) + err = s.PublishWithTags(ctx, "Valeria Richards", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewRoundStep"})) require.NoError(t, err) assert.Zero(t, len(ch3)) } @@ -81,7 +81,7 @@ func TestClientSubscribesTwice(t *testing.T) { ch1 := make(chan interface{}, 1) err := s.Subscribe(ctx, clientID, q, ch1) require.NoError(t, err) - err = s.PublishWithTags(ctx, "Goblin Queen", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"})) + err = s.PublishWithTags(ctx, "Goblin Queen", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"})) require.NoError(t, err) assertReceive(t, "Goblin Queen", ch1) @@ -89,7 +89,7 @@ func TestClientSubscribesTwice(t *testing.T) { err = s.Subscribe(ctx, clientID, q, ch2) require.Error(t, err) - err = s.PublishWithTags(ctx, "Spider-Man", pubsub.NewTagMap(map[string]interface{}{"tm.events.type": "NewBlock"})) + err = s.PublishWithTags(ctx, "Spider-Man", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"})) require.NoError(t, err) assertReceive(t, "Spider-Man", ch1) } @@ -209,7 +209,7 @@ func benchmarkNClients(n int, b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i})) + s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": string(i)})) } } @@ -232,7 +232,7 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1})) + s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": "1"})) } } diff --git a/libs/pubsub/query/empty_test.go b/libs/pubsub/query/empty_test.go index 9c82f73ed..6183b6bd4 100644 --- a/libs/pubsub/query/empty_test.go +++ b/libs/pubsub/query/empty_test.go @@ -11,8 +11,8 @@ import ( func TestEmptyQueryMatchesAnything(t *testing.T) { q := query.Empty{} - assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{}))) - assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Asher": "Roth"}))) - assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Route": 66}))) - assert.True(t, q.Matches(pubsub.NewTagMap(map[string]interface{}{"Route": 66, "Billy": "Blue"}))) + assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{}))) + assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{"Asher": "Roth"}))) + assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{"Route": "66"}))) + assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{"Route": "66", "Billy": "Blue"}))) } diff --git a/libs/pubsub/query/query.go b/libs/pubsub/query/query.go index a900d9838..ec187486e 100644 --- a/libs/pubsub/query/query.go +++ b/libs/pubsub/query/query.go @@ -77,6 +77,13 @@ const ( OpContains ) +const ( + // DateLayout defines a layout for all dates (`DATE date`) + DateLayout = "2006-01-02" + // TimeLayout defines a layout for all times (`TIME time`) + TimeLayout = time.RFC3339 +) + // Conditions returns a list of conditions. func (q *Query) Conditions() []Condition { conditions := make([]Condition, 0) @@ -112,7 +119,7 @@ func (q *Query) Conditions() []Condition { conditions = append(conditions, Condition{tag, op, valueWithoutSingleQuotes}) case rulenumber: number := buffer[begin:end] - if strings.Contains(number, ".") { // if it looks like a floating-point number + if strings.ContainsAny(number, ".") { // if it looks like a floating-point number value, err := strconv.ParseFloat(number, 64) if err != nil { panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number)) @@ -126,7 +133,7 @@ func (q *Query) Conditions() []Condition { conditions = append(conditions, Condition{tag, op, value}) } case ruletime: - value, err := time.Parse(time.RFC3339, buffer[begin:end]) + value, err := time.Parse(TimeLayout, buffer[begin:end]) if err != nil { panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end])) } @@ -188,7 +195,7 @@ func (q *Query) Matches(tags pubsub.TagMap) bool { } case rulenumber: number := buffer[begin:end] - if strings.Contains(number, ".") { // if it looks like a floating-point number + if strings.ContainsAny(number, ".") { // if it looks like a floating-point number value, err := strconv.ParseFloat(number, 64) if err != nil { panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number)) @@ -206,7 +213,7 @@ func (q *Query) Matches(tags pubsub.TagMap) bool { } } case ruletime: - value, err := time.Parse(time.RFC3339, buffer[begin:end]) + value, err := time.Parse(TimeLayout, buffer[begin:end]) if err != nil { panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end])) } @@ -242,9 +249,18 @@ func match(tag string, op Operator, operand reflect.Value, tags pubsub.TagMap) b switch operand.Kind() { case reflect.Struct: // time operandAsTime := operand.Interface().(time.Time) - v, ok := value.(time.Time) - if !ok { // if value from tags is not time.Time - return false + // try our best to convert value from tags to time.Time + var ( + v time.Time + err error + ) + if strings.ContainsAny(value, "T") { + v, err = time.Parse(TimeLayout, value) + } else { + v, err = time.Parse(DateLayout, value) + } + if err != nil { + panic(fmt.Sprintf("Failed to convert value %v from tag to time.Time: %v", value, err)) } switch op { case OpLessEqual: @@ -262,23 +278,9 @@ func match(tag string, op Operator, operand reflect.Value, tags pubsub.TagMap) b operandFloat64 := operand.Interface().(float64) var v float64 // try our best to convert value from tags to float64 - switch vt := value.(type) { - case float64: - v = vt - case float32: - v = float64(vt) - case int: - v = float64(vt) - case int8: - v = float64(vt) - case int16: - v = float64(vt) - case int32: - v = float64(vt) - case int64: - v = float64(vt) - default: // fail for all other types - panic(fmt.Sprintf("Incomparable types: %T (%v) vs float64 (%v)", value, value, operandFloat64)) + v, err := strconv.ParseFloat(value, 64) + if err != nil { + panic(fmt.Sprintf("Failed to convert value %v from tag to float64: %v", value, err)) } switch op { case OpLessEqual: @@ -295,24 +297,20 @@ func match(tag string, op Operator, operand reflect.Value, tags pubsub.TagMap) b case reflect.Int64: operandInt := operand.Interface().(int64) var v int64 - // try our best to convert value from tags to int64 - switch vt := value.(type) { - case int64: - v = vt - case int8: - v = int64(vt) - case int16: - v = int64(vt) - case int32: - v = int64(vt) - case int: - v = int64(vt) - case float64: - v = int64(vt) - case float32: - v = int64(vt) - default: // fail for all other types - panic(fmt.Sprintf("Incomparable types: %T (%v) vs int64 (%v)", value, value, operandInt)) + // if value looks like float, we try to parse it as float + if strings.ContainsAny(value, ".") { + v1, err := strconv.ParseFloat(value, 64) + if err != nil { + panic(fmt.Sprintf("Failed to convert value %v from tag to float64: %v", value, err)) + } + v = int64(v1) + } else { + var err error + // try our best to convert value from tags to int64 + v, err = strconv.ParseInt(value, 10, 64) + if err != nil { + panic(fmt.Sprintf("Failed to convert value %v from tag to int64: %v", value, err)) + } } switch op { case OpLessEqual: @@ -327,15 +325,11 @@ func match(tag string, op Operator, operand reflect.Value, tags pubsub.TagMap) b return v == operandInt } case reflect.String: - v, ok := value.(string) - if !ok { // if value from tags is not string - return false - } switch op { case OpEqual: - return v == operand.String() + return value == operand.String() case OpContains: - return strings.Contains(v, operand.String()) + return strings.Contains(value, operand.String()) } default: panic(fmt.Sprintf("Unknown kind of operand %v", operand.Kind())) diff --git a/libs/pubsub/query/query_test.go b/libs/pubsub/query/query_test.go index f266b1214..f0d940992 100644 --- a/libs/pubsub/query/query_test.go +++ b/libs/pubsub/query/query_test.go @@ -1,6 +1,7 @@ package query_test import ( + "fmt" "testing" "time" @@ -12,38 +13,37 @@ import ( ) func TestMatches(t *testing.T) { - const shortForm = "2006-Jan-02" - txDate, err := time.Parse(shortForm, "2017-Jan-01") - require.NoError(t, err) - txTime, err := time.Parse(time.RFC3339, "2018-05-03T14:45:00Z") - require.NoError(t, err) + var ( + txDate = "2017-01-01" + txTime = "2018-05-03T14:45:00Z" + ) testCases := []struct { s string - tags map[string]interface{} + tags map[string]string err bool matches bool }{ - {"tm.events.type='NewBlock'", map[string]interface{}{"tm.events.type": "NewBlock"}, false, true}, - - {"tx.gas > 7", map[string]interface{}{"tx.gas": 8}, false, true}, - {"tx.gas > 7 AND tx.gas < 9", map[string]interface{}{"tx.gas": 8}, false, true}, - {"body.weight >= 3.5", map[string]interface{}{"body.weight": 3.5}, false, true}, - {"account.balance < 1000.0", map[string]interface{}{"account.balance": 900}, false, true}, - {"apples.kg <= 4", map[string]interface{}{"apples.kg": 4.0}, false, true}, - {"body.weight >= 4.5", map[string]interface{}{"body.weight": float32(4.5)}, false, true}, - {"oranges.kg < 4 AND watermellons.kg > 10", map[string]interface{}{"oranges.kg": 3, "watermellons.kg": 12}, false, true}, - {"peaches.kg < 4", map[string]interface{}{"peaches.kg": 5}, false, false}, - - {"tx.date > DATE 2017-01-01", map[string]interface{}{"tx.date": time.Now()}, false, true}, - {"tx.date = DATE 2017-01-01", map[string]interface{}{"tx.date": txDate}, false, true}, - {"tx.date = DATE 2018-01-01", map[string]interface{}{"tx.date": txDate}, false, false}, - - {"tx.time >= TIME 2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": time.Now()}, false, true}, - {"tx.time = TIME 2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": txTime}, false, false}, - - {"abci.owner.name CONTAINS 'Igor'", map[string]interface{}{"abci.owner.name": "Igor,Ivan"}, false, true}, - {"abci.owner.name CONTAINS 'Igor'", map[string]interface{}{"abci.owner.name": "Pavel,Ivan"}, false, false}, + {"tm.events.type='NewBlock'", map[string]string{"tm.events.type": "NewBlock"}, false, true}, + + {"tx.gas > 7", map[string]string{"tx.gas": "8"}, false, true}, + {"tx.gas > 7 AND tx.gas < 9", map[string]string{"tx.gas": "8"}, false, true}, + {"body.weight >= 3.5", map[string]string{"body.weight": "3.5"}, false, true}, + {"account.balance < 1000.0", map[string]string{"account.balance": "900"}, false, true}, + {"apples.kg <= 4", map[string]string{"apples.kg": "4.0"}, false, true}, + {"body.weight >= 4.5", map[string]string{"body.weight": fmt.Sprintf("%v", float32(4.5))}, false, true}, + {"oranges.kg < 4 AND watermellons.kg > 10", map[string]string{"oranges.kg": "3", "watermellons.kg": "12"}, false, true}, + {"peaches.kg < 4", map[string]string{"peaches.kg": "5"}, false, false}, + + {"tx.date > DATE 2017-01-01", map[string]string{"tx.date": time.Now().Format(query.DateLayout)}, false, true}, + {"tx.date = DATE 2017-01-01", map[string]string{"tx.date": txDate}, false, true}, + {"tx.date = DATE 2018-01-01", map[string]string{"tx.date": txDate}, false, false}, + + {"tx.time >= TIME 2013-05-03T14:45:00Z", map[string]string{"tx.time": time.Now().Format(query.TimeLayout)}, false, true}, + {"tx.time = TIME 2013-05-03T14:45:00Z", map[string]string{"tx.time": txTime}, false, false}, + + {"abci.owner.name CONTAINS 'Igor'", map[string]string{"abci.owner.name": "Igor,Ivan"}, false, true}, + {"abci.owner.name CONTAINS 'Igor'", map[string]string{"abci.owner.name": "Pavel,Ivan"}, false, false}, } for _, tc := range testCases { diff --git a/state/state_test.go b/state/state_test.go index ba995cc00..497695373 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -8,7 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" abci "github.com/tendermint/abci/types" - "github.com/tendermint/go-crypto" + crypto "github.com/tendermint/go-crypto" cmn "github.com/tendermint/tmlibs/common" dbm "github.com/tendermint/tmlibs/db" @@ -121,7 +121,7 @@ func TestABCIResponsesSaveLoad2(t *testing.T) { {Code: 383}, {Data: []byte("Gotcha!"), Tags: []cmn.KVPair{ - cmn.KVPair{[]byte("a"), []byte{1}}, + cmn.KVPair{[]byte("a"), []byte("1")}, cmn.KVPair{[]byte("build"), []byte("stuff")}, }}, }, diff --git a/types/event_bus.go b/types/event_bus.go index 925907fd0..2bc339da7 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -67,7 +67,7 @@ func (b *EventBus) UnsubscribeAll(ctx context.Context, subscriber string) error func (b *EventBus) Publish(eventType string, eventData TMEventData) error { // no explicit deadline for publishing events ctx := context.Background() - b.pubsub.PublishWithTags(ctx, eventData, tmpubsub.NewTagMap(map[string]interface{}{EventTypeKey: eventType})) + b.pubsub.PublishWithTags(ctx, eventData, tmpubsub.NewTagMap(map[string]string{EventTypeKey: eventType})) return nil } @@ -92,7 +92,7 @@ func (b *EventBus) PublishEventTx(event EventDataTx) error { // no explicit deadline for publishing events ctx := context.Background() - tags := make(map[string]interface{}) + tags := make(map[string]string) // validate and fill tags from tx result for _, tag := range event.Result.Tags { @@ -112,7 +112,7 @@ func (b *EventBus) PublishEventTx(event EventDataTx) error { tags[TxHashKey] = fmt.Sprintf("%X", event.Tx.Hash()) logIfTagExists(TxHeightKey, tags, b.Logger) - tags[TxHeightKey] = event.Height + tags[TxHeightKey] = fmt.Sprintf("%d", event.Height) b.pubsub.PublishWithTags(ctx, event, tmpubsub.NewTagMap(tags)) return nil @@ -160,7 +160,7 @@ func (b *EventBus) PublishEventLock(event EventDataRoundState) error { return b.Publish(EventLock, event) } -func logIfTagExists(tag string, tags map[string]interface{}, logger log.Logger) { +func logIfTagExists(tag string, tags map[string]string, logger log.Logger) { if value, ok := tags[tag]; ok { logger.Error("Found predefined tag (value will be overwritten)", "tag", tag, "value", value) } diff --git a/types/event_bus_test.go b/types/event_bus_test.go index 95d061f40..8358ad261 100644 --- a/types/event_bus_test.go +++ b/types/event_bus_test.go @@ -23,12 +23,12 @@ func TestEventBusPublishEventTx(t *testing.T) { defer eventBus.Stop() tx := Tx("foo") - result := abci.ResponseDeliverTx{Data: []byte("bar"), Tags: []cmn.KVPair{}, Fee: cmn.KI64Pair{Key: []uint8{}, Value: 0}} + result := abci.ResponseDeliverTx{Data: []byte("bar"), Tags: []cmn.KVPair{{[]byte("baz"), []byte("1")}}, Fee: cmn.KI64Pair{Key: []uint8{}, Value: 0}} txEventsCh := make(chan interface{}) // PublishEventTx adds all these 3 tags, so the query below should work - query := fmt.Sprintf("tm.event='Tx' AND tx.height=1 AND tx.hash='%X'", tx.Hash()) + query := fmt.Sprintf("tm.event='Tx' AND tx.height=1 AND tx.hash='%X' AND baz=1", tx.Hash()) err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh) require.NoError(t, err)