From 441ecbaeec73ee31bae73bc77e2b7c6af2850059 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Wed, 26 Jan 2022 07:01:55 -0800 Subject: [PATCH] types: rename and extend the EventData interface (#7687) This is the interface shared by types that can be used as event data in, for example, subscriptions via the RPC. To be compatible with the RPC service, data need to support JSON encoding. Require this as part of the interface. --- internal/eventbus/event_bus.go | 2 +- internal/pubsub/example_test.go | 4 +- internal/pubsub/pubsub.go | 13 +++--- internal/pubsub/pubsub_test.go | 79 ++++++++++++++++++--------------- internal/pubsub/subindex.go | 7 +-- internal/pubsub/subscription.go | 11 ++--- rpc/client/helpers.go | 2 +- rpc/coretypes/responses.go | 9 +--- types/events.go | 6 ++- 9 files changed, 70 insertions(+), 63 deletions(-) diff --git a/internal/eventbus/event_bus.go b/internal/eventbus/event_bus.go index f93b231d2..add10da0a 100644 --- a/internal/eventbus/event_bus.go +++ b/internal/eventbus/event_bus.go @@ -73,7 +73,7 @@ func (b *EventBus) Observe(ctx context.Context, observe func(tmpubsub.Message) e return b.pubsub.Observe(ctx, observe, queries...) } -func (b *EventBus) Publish(ctx context.Context, eventValue string, eventData types.TMEventData) error { +func (b *EventBus) Publish(ctx context.Context, eventValue string, eventData types.EventData) error { tokens := strings.Split(types.EventTypeKey, ".") event := abci.Event{ Type: tokens[0], diff --git a/internal/pubsub/example_test.go b/internal/pubsub/example_test.go index 39b179981..5eea61eb8 100644 --- a/internal/pubsub/example_test.go +++ b/internal/pubsub/example_test.go @@ -29,6 +29,6 @@ func TestExample(t *testing.T) { Attributes: []abci.EventAttribute{{Key: "name", Value: "John"}}, }, } - require.NoError(t, s.PublishWithEvents(ctx, "Tombstone", events)) - sub.mustReceive(ctx, "Tombstone") + require.NoError(t, s.PublishWithEvents(ctx, pubstring("Tombstone"), events)) + sub.mustReceive(ctx, pubstring("Tombstone")) } diff --git a/internal/pubsub/pubsub.go b/internal/pubsub/pubsub.go index 53c631de4..5f6a1ee3b 100644 --- a/internal/pubsub/pubsub.go +++ b/internal/pubsub/pubsub.go @@ -40,10 +40,11 @@ import ( "fmt" "sync" - "github.com/tendermint/tendermint/abci/types" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/pubsub/query" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" + "github.com/tendermint/tendermint/types" ) var ( @@ -304,14 +305,14 @@ func (s *Server) NumClientSubscriptions(clientID string) int { // Publish publishes the given message. An error will be returned to the caller // if the context is canceled. -func (s *Server) Publish(ctx context.Context, msg interface{}) error { - return s.publish(ctx, msg, []types.Event{}) +func (s *Server) Publish(ctx context.Context, msg types.EventData) error { + return s.publish(ctx, msg, []abci.Event{}) } // PublishWithEvents publishes the given message with the set of events. The set // is matched with clients queries. If there is a match, the message is sent to // the client. -func (s *Server) PublishWithEvents(ctx context.Context, msg interface{}, events []types.Event) error { +func (s *Server) PublishWithEvents(ctx context.Context, msg types.EventData, events []abci.Event) error { return s.publish(ctx, msg, events) } @@ -328,7 +329,7 @@ func (s *Server) OnStart(ctx context.Context) error { s.run(ctx); return nil } // OnReset implements Service.OnReset. It has no effect for this service. func (s *Server) OnReset() error { return nil } -func (s *Server) publish(ctx context.Context, data interface{}, events []types.Event) error { +func (s *Server) publish(ctx context.Context, data types.EventData, events []abci.Event) error { s.pubs.RLock() defer s.pubs.RUnlock() @@ -391,7 +392,7 @@ func (s *Server) removeSubs(evict subInfoSet, reason error) { // send delivers the given message to all matching subscribers. An error in // query matching stops transmission and is returned. -func (s *Server) send(data interface{}, events []types.Event) error { +func (s *Server) send(data types.EventData, events []abci.Event) error { // At exit, evict any subscriptions that were too slow. evict := make(subInfoSet) defer func() { diff --git a/internal/pubsub/pubsub_test.go b/internal/pubsub/pubsub_test.go index c11ba4cf5..9ba515d70 100644 --- a/internal/pubsub/pubsub_test.go +++ b/internal/pubsub/pubsub_test.go @@ -12,12 +12,19 @@ import ( "github.com/tendermint/tendermint/internal/pubsub" "github.com/tendermint/tendermint/internal/pubsub/query" "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/types" ) const ( clientID = "test-client" ) +// pubstring is a trivial implementation of the EventData interface for +// string-valued test data. +type pubstring string + +func (pubstring) TypeTag() string { return "pubstring" } + func TestSubscribeWithArgs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -34,8 +41,8 @@ func TestSubscribeWithArgs(t *testing.T) { require.Equal(t, 1, s.NumClients()) require.Equal(t, 1, s.NumClientSubscriptions(clientID)) - require.NoError(t, s.Publish(ctx, "Ka-Zar")) - sub.mustReceive(ctx, "Ka-Zar") + require.NoError(t, s.Publish(ctx, pubstring("Ka-Zar"))) + sub.mustReceive(ctx, pubstring("Ka-Zar")) }) t.Run("PositiveLimit", func(t *testing.T) { sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ @@ -43,8 +50,8 @@ func TestSubscribeWithArgs(t *testing.T) { Query: query.All, Limit: 10, })) - require.NoError(t, s.Publish(ctx, "Aggamon")) - sub.mustReceive(ctx, "Aggamon") + require.NoError(t, s.Publish(ctx, pubstring("Aggamon"))) + sub.mustReceive(ctx, pubstring("Aggamon")) }) } @@ -63,7 +70,7 @@ func TestObserver(t *testing.T) { return nil })) - const input = "Lions and tigers and bears, oh my!" + const input = pubstring("Lions and tigers and bears, oh my!") require.NoError(t, s.Publish(ctx, input)) <-done require.Equal(t, got, input) @@ -98,14 +105,14 @@ func TestPublishDoesNotBlock(t *testing.T) { go func() { defer close(published) - require.NoError(t, s.Publish(ctx, "Quicksilver")) - require.NoError(t, s.Publish(ctx, "Asylum")) - require.NoError(t, s.Publish(ctx, "Ivan")) + require.NoError(t, s.Publish(ctx, pubstring("Quicksilver"))) + require.NoError(t, s.Publish(ctx, pubstring("Asylum"))) + require.NoError(t, s.Publish(ctx, pubstring("Ivan"))) }() select { case <-published: - sub.mustReceive(ctx, "Quicksilver") + sub.mustReceive(ctx, pubstring("Quicksilver")) sub.mustFail(ctx, pubsub.ErrTerminated) case <-time.After(3 * time.Second): t.Fatal("Publishing should not have blocked") @@ -141,13 +148,13 @@ func TestSlowSubscriber(t *testing.T) { Query: query.All, })) - require.NoError(t, s.Publish(ctx, "Fat Cobra")) - require.NoError(t, s.Publish(ctx, "Viper")) - require.NoError(t, s.Publish(ctx, "Black Panther")) + require.NoError(t, s.Publish(ctx, pubstring("Fat Cobra"))) + require.NoError(t, s.Publish(ctx, pubstring("Viper"))) + require.NoError(t, s.Publish(ctx, pubstring("Black Panther"))) // We had capacity for one item, so we should get that item, but after that // the subscription should have been terminated by the publisher. - sub.mustReceive(ctx, "Fat Cobra") + sub.mustReceive(ctx, pubstring("Fat Cobra")) sub.mustFail(ctx, pubsub.ErrTerminated) } @@ -168,8 +175,8 @@ func TestDifferentClients(t *testing.T) { Attributes: []abci.EventAttribute{{Key: "type", Value: "NewBlock"}}, }} - require.NoError(t, s.PublishWithEvents(ctx, "Iceman", events)) - sub1.mustReceive(ctx, "Iceman") + require.NoError(t, s.PublishWithEvents(ctx, pubstring("Iceman"), events)) + sub1.mustReceive(ctx, pubstring("Iceman")) sub2 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: "client-2", @@ -187,9 +194,9 @@ func TestDifferentClients(t *testing.T) { }, } - require.NoError(t, s.PublishWithEvents(ctx, "Ultimo", events)) - sub1.mustReceive(ctx, "Ultimo") - sub2.mustReceive(ctx, "Ultimo") + require.NoError(t, s.PublishWithEvents(ctx, pubstring("Ultimo"), events)) + sub1.mustReceive(ctx, pubstring("Ultimo")) + sub2.mustReceive(ctx, pubstring("Ultimo")) sub3 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: "client-3", @@ -202,7 +209,7 @@ func TestDifferentClients(t *testing.T) { Attributes: []abci.EventAttribute{{Key: "type", Value: "NewRoundStep"}}, }} - require.NoError(t, s.PublishWithEvents(ctx, "Valeria Richards", events)) + require.NoError(t, s.PublishWithEvents(ctx, pubstring("Valeria Richards"), events)) sub3.mustTimeOut(ctx, 100*time.Millisecond) } @@ -215,11 +222,11 @@ func TestSubscribeDuplicateKeys(t *testing.T) { testCases := []struct { query string - expected interface{} + expected types.EventData }{ - {`withdraw.rewards='17'`, "Iceman"}, - {`withdraw.rewards='22'`, "Iceman"}, - {`withdraw.rewards='1' AND withdraw.rewards='22'`, "Iceman"}, + {`withdraw.rewards='17'`, pubstring("Iceman")}, + {`withdraw.rewards='22'`, pubstring("Iceman")}, + {`withdraw.rewards='1' AND withdraw.rewards='22'`, pubstring("Iceman")}, {`withdraw.rewards='100'`, nil}, } @@ -251,7 +258,7 @@ func TestSubscribeDuplicateKeys(t *testing.T) { }, } - require.NoError(t, s.PublishWithEvents(ctx, "Iceman", events)) + require.NoError(t, s.PublishWithEvents(ctx, pubstring("Iceman"), events)) if tc.expected != nil { sub.mustReceive(ctx, tc.expected) @@ -280,8 +287,8 @@ func TestClientSubscribesTwice(t *testing.T) { Query: q, })) - require.NoError(t, s.PublishWithEvents(ctx, "Goblin Queen", events)) - sub1.mustReceive(ctx, "Goblin Queen") + require.NoError(t, s.PublishWithEvents(ctx, pubstring("Goblin Queen"), events)) + sub1.mustReceive(ctx, pubstring("Goblin Queen")) // Subscribing a second time with the same client ID and query fails. { @@ -294,8 +301,8 @@ func TestClientSubscribesTwice(t *testing.T) { } // The attempt to re-subscribe does not disrupt the existing sub. - require.NoError(t, s.PublishWithEvents(ctx, "Spider-Man", events)) - sub1.mustReceive(ctx, "Spider-Man") + require.NoError(t, s.PublishWithEvents(ctx, pubstring("Spider-Man"), events)) + sub1.mustReceive(ctx, pubstring("Spider-Man")) } func TestUnsubscribe(t *testing.T) { @@ -317,7 +324,7 @@ func TestUnsubscribe(t *testing.T) { })) // Publishing should still work. - require.NoError(t, s.Publish(ctx, "Nick Fury")) + require.NoError(t, s.Publish(ctx, pubstring("Nick Fury"))) // The unsubscribed subscriber should report as such. sub.mustFail(ctx, pubsub.ErrUnsubscribed) @@ -365,8 +372,8 @@ func TestResubscribe(t *testing.T) { sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, args)) - require.NoError(t, s.Publish(ctx, "Cable")) - sub.mustReceive(ctx, "Cable") + require.NoError(t, s.Publish(ctx, pubstring("Cable"))) + sub.mustReceive(ctx, pubstring("Cable")) } func TestUnsubscribeAll(t *testing.T) { @@ -386,7 +393,7 @@ func TestUnsubscribeAll(t *testing.T) { })) require.NoError(t, s.UnsubscribeAll(ctx, clientID)) - require.NoError(t, s.Publish(ctx, "Nick Fury")) + require.NoError(t, s.Publish(ctx, pubstring("Nick Fury"))) sub1.mustFail(ctx, pubsub.ErrUnsubscribed) sub2.mustFail(ctx, pubsub.ErrUnsubscribed) @@ -402,13 +409,13 @@ func TestBufferCapacity(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - require.NoError(t, s.Publish(ctx, "Nighthawk")) - require.NoError(t, s.Publish(ctx, "Sage")) + require.NoError(t, s.Publish(ctx, pubstring("Nighthawk"))) + require.NoError(t, s.Publish(ctx, pubstring("Sage"))) ctx, cancel = context.WithTimeout(ctx, 100*time.Millisecond) defer cancel() - require.ErrorIs(t, s.Publish(ctx, "Ironclad"), context.DeadlineExceeded) + require.ErrorIs(t, s.Publish(ctx, pubstring("Ironclad")), context.DeadlineExceeded) } func newTestServer(ctx context.Context, t testing.TB, logger log.Logger) *pubsub.Server { @@ -436,7 +443,7 @@ func (s *testSub) must(sub *pubsub.Subscription, err error) *testSub { return s } -func (s *testSub) mustReceive(ctx context.Context, want interface{}) { +func (s *testSub) mustReceive(ctx context.Context, want types.EventData) { s.t.Helper() got, err := s.Next(ctx) require.NoError(s.t, err) diff --git a/internal/pubsub/subindex.go b/internal/pubsub/subindex.go index c9bb1ae0e..eadb193af 100644 --- a/internal/pubsub/subindex.go +++ b/internal/pubsub/subindex.go @@ -1,14 +1,15 @@ package pubsub import ( - "github.com/tendermint/tendermint/abci/types" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/pubsub/query" + "github.com/tendermint/tendermint/types" ) // An item to be published to subscribers. type item struct { - Data interface{} - Events []types.Event + Data types.EventData + Events []abci.Event } // A subInfo value records a single subscription. diff --git a/internal/pubsub/subscription.go b/internal/pubsub/subscription.go index 6e8c6fd07..933e62e1c 100644 --- a/internal/pubsub/subscription.go +++ b/internal/pubsub/subscription.go @@ -5,8 +5,9 @@ import ( "errors" "github.com/google/uuid" - "github.com/tendermint/tendermint/abci/types" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/libs/queue" + "github.com/tendermint/tendermint/types" ) var ( @@ -73,8 +74,8 @@ func (s *Subscription) stop(err error) { // Message glues data and events together. type Message struct { subID string - data interface{} - events []types.Event + data types.EventData + events []abci.Event } // SubscriptionID returns the unique identifier for the subscription @@ -82,7 +83,7 @@ type Message struct { func (msg Message) SubscriptionID() string { return msg.subID } // Data returns an original data published. -func (msg Message) Data() interface{} { return msg.data } +func (msg Message) Data() types.EventData { return msg.data } // Events returns events, which matched the client's query. -func (msg Message) Events() []types.Event { return msg.events } +func (msg Message) Events() []abci.Event { return msg.events } diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index 470d8d548..2919345e0 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -59,7 +59,7 @@ func WaitForHeight(ctx context.Context, c StatusClient, h int64, waiter Waiter) // when the timeout duration has expired. // // This handles subscribing and unsubscribing under the hood -func WaitForOneEvent(c EventsClient, eventValue string, timeout time.Duration) (types.TMEventData, error) { +func WaitForOneEvent(c EventsClient, eventValue string, timeout time.Duration) (types.EventData, error) { const subscriber = "helpers" ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() diff --git a/rpc/coretypes/responses.go b/rpc/coretypes/responses.go index 2a1cf45b0..39a2755e9 100644 --- a/rpc/coretypes/responses.go +++ b/rpc/coretypes/responses.go @@ -3,7 +3,6 @@ package coretypes import ( "encoding/json" "errors" - "fmt" "time" abci "github.com/tendermint/tendermint/abci/types" @@ -307,7 +306,7 @@ type ( type ResultEvent struct { SubscriptionID string Query string - Data types.TMEventData + Data types.EventData Events []abci.Event } @@ -319,11 +318,7 @@ type resultEventJSON struct { } func (r ResultEvent) MarshalJSON() ([]byte, error) { - data, ok := r.Data.(jsontypes.Tagged) - if !ok { - return nil, fmt.Errorf("type %T is not tagged", r.Data) - } - evt, err := jsontypes.Marshal(data) + evt, err := jsontypes.Marshal(r.Data) if err != nil { return nil, err } diff --git a/types/events.go b/types/events.go index 68139fec6..6edf30420 100644 --- a/types/events.go +++ b/types/events.go @@ -88,8 +88,10 @@ var ( // ENCODING / DECODING -// TMEventData implements events.EventData. -type TMEventData interface{} +// EventData is satisfied by types that can be published as event data. +type EventData interface { + jsontypes.Tagged +} func init() { jsontypes.MustRegister(EventDataBlockSyncStatus{})