diff --git a/internal/eventbus/event_bus.go b/internal/eventbus/event_bus.go index f93b231d2..add10da0a 100644 --- a/internal/eventbus/event_bus.go +++ b/internal/eventbus/event_bus.go @@ -73,7 +73,7 @@ func (b *EventBus) Observe(ctx context.Context, observe func(tmpubsub.Message) e return b.pubsub.Observe(ctx, observe, queries...) } -func (b *EventBus) Publish(ctx context.Context, eventValue string, eventData types.TMEventData) error { +func (b *EventBus) Publish(ctx context.Context, eventValue string, eventData types.EventData) error { tokens := strings.Split(types.EventTypeKey, ".") event := abci.Event{ Type: tokens[0], diff --git a/internal/pubsub/example_test.go b/internal/pubsub/example_test.go index 39b179981..5eea61eb8 100644 --- a/internal/pubsub/example_test.go +++ b/internal/pubsub/example_test.go @@ -29,6 +29,6 @@ func TestExample(t *testing.T) { Attributes: []abci.EventAttribute{{Key: "name", Value: "John"}}, }, } - require.NoError(t, s.PublishWithEvents(ctx, "Tombstone", events)) - sub.mustReceive(ctx, "Tombstone") + require.NoError(t, s.PublishWithEvents(ctx, pubstring("Tombstone"), events)) + sub.mustReceive(ctx, pubstring("Tombstone")) } diff --git a/internal/pubsub/pubsub.go b/internal/pubsub/pubsub.go index 53c631de4..5f6a1ee3b 100644 --- a/internal/pubsub/pubsub.go +++ b/internal/pubsub/pubsub.go @@ -40,10 +40,11 @@ import ( "fmt" "sync" - "github.com/tendermint/tendermint/abci/types" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/pubsub/query" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" + "github.com/tendermint/tendermint/types" ) var ( @@ -304,14 +305,14 @@ func (s *Server) NumClientSubscriptions(clientID string) int { // Publish publishes the given message. An error will be returned to the caller // if the context is canceled. -func (s *Server) Publish(ctx context.Context, msg interface{}) error { - return s.publish(ctx, msg, []types.Event{}) +func (s *Server) Publish(ctx context.Context, msg types.EventData) error { + return s.publish(ctx, msg, []abci.Event{}) } // PublishWithEvents publishes the given message with the set of events. The set // is matched with clients queries. If there is a match, the message is sent to // the client. -func (s *Server) PublishWithEvents(ctx context.Context, msg interface{}, events []types.Event) error { +func (s *Server) PublishWithEvents(ctx context.Context, msg types.EventData, events []abci.Event) error { return s.publish(ctx, msg, events) } @@ -328,7 +329,7 @@ func (s *Server) OnStart(ctx context.Context) error { s.run(ctx); return nil } // OnReset implements Service.OnReset. It has no effect for this service. func (s *Server) OnReset() error { return nil } -func (s *Server) publish(ctx context.Context, data interface{}, events []types.Event) error { +func (s *Server) publish(ctx context.Context, data types.EventData, events []abci.Event) error { s.pubs.RLock() defer s.pubs.RUnlock() @@ -391,7 +392,7 @@ func (s *Server) removeSubs(evict subInfoSet, reason error) { // send delivers the given message to all matching subscribers. An error in // query matching stops transmission and is returned. -func (s *Server) send(data interface{}, events []types.Event) error { +func (s *Server) send(data types.EventData, events []abci.Event) error { // At exit, evict any subscriptions that were too slow. evict := make(subInfoSet) defer func() { diff --git a/internal/pubsub/pubsub_test.go b/internal/pubsub/pubsub_test.go index c11ba4cf5..9ba515d70 100644 --- a/internal/pubsub/pubsub_test.go +++ b/internal/pubsub/pubsub_test.go @@ -12,12 +12,19 @@ import ( "github.com/tendermint/tendermint/internal/pubsub" "github.com/tendermint/tendermint/internal/pubsub/query" "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/types" ) const ( clientID = "test-client" ) +// pubstring is a trivial implementation of the EventData interface for +// string-valued test data. +type pubstring string + +func (pubstring) TypeTag() string { return "pubstring" } + func TestSubscribeWithArgs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -34,8 +41,8 @@ func TestSubscribeWithArgs(t *testing.T) { require.Equal(t, 1, s.NumClients()) require.Equal(t, 1, s.NumClientSubscriptions(clientID)) - require.NoError(t, s.Publish(ctx, "Ka-Zar")) - sub.mustReceive(ctx, "Ka-Zar") + require.NoError(t, s.Publish(ctx, pubstring("Ka-Zar"))) + sub.mustReceive(ctx, pubstring("Ka-Zar")) }) t.Run("PositiveLimit", func(t *testing.T) { sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ @@ -43,8 +50,8 @@ func TestSubscribeWithArgs(t *testing.T) { Query: query.All, Limit: 10, })) - require.NoError(t, s.Publish(ctx, "Aggamon")) - sub.mustReceive(ctx, "Aggamon") + require.NoError(t, s.Publish(ctx, pubstring("Aggamon"))) + sub.mustReceive(ctx, pubstring("Aggamon")) }) } @@ -63,7 +70,7 @@ func TestObserver(t *testing.T) { return nil })) - const input = "Lions and tigers and bears, oh my!" + const input = pubstring("Lions and tigers and bears, oh my!") require.NoError(t, s.Publish(ctx, input)) <-done require.Equal(t, got, input) @@ -98,14 +105,14 @@ func TestPublishDoesNotBlock(t *testing.T) { go func() { defer close(published) - require.NoError(t, s.Publish(ctx, "Quicksilver")) - require.NoError(t, s.Publish(ctx, "Asylum")) - require.NoError(t, s.Publish(ctx, "Ivan")) + require.NoError(t, s.Publish(ctx, pubstring("Quicksilver"))) + require.NoError(t, s.Publish(ctx, pubstring("Asylum"))) + require.NoError(t, s.Publish(ctx, pubstring("Ivan"))) }() select { case <-published: - sub.mustReceive(ctx, "Quicksilver") + sub.mustReceive(ctx, pubstring("Quicksilver")) sub.mustFail(ctx, pubsub.ErrTerminated) case <-time.After(3 * time.Second): t.Fatal("Publishing should not have blocked") @@ -141,13 +148,13 @@ func TestSlowSubscriber(t *testing.T) { Query: query.All, })) - require.NoError(t, s.Publish(ctx, "Fat Cobra")) - require.NoError(t, s.Publish(ctx, "Viper")) - require.NoError(t, s.Publish(ctx, "Black Panther")) + require.NoError(t, s.Publish(ctx, pubstring("Fat Cobra"))) + require.NoError(t, s.Publish(ctx, pubstring("Viper"))) + require.NoError(t, s.Publish(ctx, pubstring("Black Panther"))) // We had capacity for one item, so we should get that item, but after that // the subscription should have been terminated by the publisher. - sub.mustReceive(ctx, "Fat Cobra") + sub.mustReceive(ctx, pubstring("Fat Cobra")) sub.mustFail(ctx, pubsub.ErrTerminated) } @@ -168,8 +175,8 @@ func TestDifferentClients(t *testing.T) { Attributes: []abci.EventAttribute{{Key: "type", Value: "NewBlock"}}, }} - require.NoError(t, s.PublishWithEvents(ctx, "Iceman", events)) - sub1.mustReceive(ctx, "Iceman") + require.NoError(t, s.PublishWithEvents(ctx, pubstring("Iceman"), events)) + sub1.mustReceive(ctx, pubstring("Iceman")) sub2 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: "client-2", @@ -187,9 +194,9 @@ func TestDifferentClients(t *testing.T) { }, } - require.NoError(t, s.PublishWithEvents(ctx, "Ultimo", events)) - sub1.mustReceive(ctx, "Ultimo") - sub2.mustReceive(ctx, "Ultimo") + require.NoError(t, s.PublishWithEvents(ctx, pubstring("Ultimo"), events)) + sub1.mustReceive(ctx, pubstring("Ultimo")) + sub2.mustReceive(ctx, pubstring("Ultimo")) sub3 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: "client-3", @@ -202,7 +209,7 @@ func TestDifferentClients(t *testing.T) { Attributes: []abci.EventAttribute{{Key: "type", Value: "NewRoundStep"}}, }} - require.NoError(t, s.PublishWithEvents(ctx, "Valeria Richards", events)) + require.NoError(t, s.PublishWithEvents(ctx, pubstring("Valeria Richards"), events)) sub3.mustTimeOut(ctx, 100*time.Millisecond) } @@ -215,11 +222,11 @@ func TestSubscribeDuplicateKeys(t *testing.T) { testCases := []struct { query string - expected interface{} + expected types.EventData }{ - {`withdraw.rewards='17'`, "Iceman"}, - {`withdraw.rewards='22'`, "Iceman"}, - {`withdraw.rewards='1' AND withdraw.rewards='22'`, "Iceman"}, + {`withdraw.rewards='17'`, pubstring("Iceman")}, + {`withdraw.rewards='22'`, pubstring("Iceman")}, + {`withdraw.rewards='1' AND withdraw.rewards='22'`, pubstring("Iceman")}, {`withdraw.rewards='100'`, nil}, } @@ -251,7 +258,7 @@ func TestSubscribeDuplicateKeys(t *testing.T) { }, } - require.NoError(t, s.PublishWithEvents(ctx, "Iceman", events)) + require.NoError(t, s.PublishWithEvents(ctx, pubstring("Iceman"), events)) if tc.expected != nil { sub.mustReceive(ctx, tc.expected) @@ -280,8 +287,8 @@ func TestClientSubscribesTwice(t *testing.T) { Query: q, })) - require.NoError(t, s.PublishWithEvents(ctx, "Goblin Queen", events)) - sub1.mustReceive(ctx, "Goblin Queen") + require.NoError(t, s.PublishWithEvents(ctx, pubstring("Goblin Queen"), events)) + sub1.mustReceive(ctx, pubstring("Goblin Queen")) // Subscribing a second time with the same client ID and query fails. { @@ -294,8 +301,8 @@ func TestClientSubscribesTwice(t *testing.T) { } // The attempt to re-subscribe does not disrupt the existing sub. - require.NoError(t, s.PublishWithEvents(ctx, "Spider-Man", events)) - sub1.mustReceive(ctx, "Spider-Man") + require.NoError(t, s.PublishWithEvents(ctx, pubstring("Spider-Man"), events)) + sub1.mustReceive(ctx, pubstring("Spider-Man")) } func TestUnsubscribe(t *testing.T) { @@ -317,7 +324,7 @@ func TestUnsubscribe(t *testing.T) { })) // Publishing should still work. - require.NoError(t, s.Publish(ctx, "Nick Fury")) + require.NoError(t, s.Publish(ctx, pubstring("Nick Fury"))) // The unsubscribed subscriber should report as such. sub.mustFail(ctx, pubsub.ErrUnsubscribed) @@ -365,8 +372,8 @@ func TestResubscribe(t *testing.T) { sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, args)) - require.NoError(t, s.Publish(ctx, "Cable")) - sub.mustReceive(ctx, "Cable") + require.NoError(t, s.Publish(ctx, pubstring("Cable"))) + sub.mustReceive(ctx, pubstring("Cable")) } func TestUnsubscribeAll(t *testing.T) { @@ -386,7 +393,7 @@ func TestUnsubscribeAll(t *testing.T) { })) require.NoError(t, s.UnsubscribeAll(ctx, clientID)) - require.NoError(t, s.Publish(ctx, "Nick Fury")) + require.NoError(t, s.Publish(ctx, pubstring("Nick Fury"))) sub1.mustFail(ctx, pubsub.ErrUnsubscribed) sub2.mustFail(ctx, pubsub.ErrUnsubscribed) @@ -402,13 +409,13 @@ func TestBufferCapacity(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - require.NoError(t, s.Publish(ctx, "Nighthawk")) - require.NoError(t, s.Publish(ctx, "Sage")) + require.NoError(t, s.Publish(ctx, pubstring("Nighthawk"))) + require.NoError(t, s.Publish(ctx, pubstring("Sage"))) ctx, cancel = context.WithTimeout(ctx, 100*time.Millisecond) defer cancel() - require.ErrorIs(t, s.Publish(ctx, "Ironclad"), context.DeadlineExceeded) + require.ErrorIs(t, s.Publish(ctx, pubstring("Ironclad")), context.DeadlineExceeded) } func newTestServer(ctx context.Context, t testing.TB, logger log.Logger) *pubsub.Server { @@ -436,7 +443,7 @@ func (s *testSub) must(sub *pubsub.Subscription, err error) *testSub { return s } -func (s *testSub) mustReceive(ctx context.Context, want interface{}) { +func (s *testSub) mustReceive(ctx context.Context, want types.EventData) { s.t.Helper() got, err := s.Next(ctx) require.NoError(s.t, err) diff --git a/internal/pubsub/subindex.go b/internal/pubsub/subindex.go index c9bb1ae0e..eadb193af 100644 --- a/internal/pubsub/subindex.go +++ b/internal/pubsub/subindex.go @@ -1,14 +1,15 @@ package pubsub import ( - "github.com/tendermint/tendermint/abci/types" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/pubsub/query" + "github.com/tendermint/tendermint/types" ) // An item to be published to subscribers. type item struct { - Data interface{} - Events []types.Event + Data types.EventData + Events []abci.Event } // A subInfo value records a single subscription. diff --git a/internal/pubsub/subscription.go b/internal/pubsub/subscription.go index 6e8c6fd07..933e62e1c 100644 --- a/internal/pubsub/subscription.go +++ b/internal/pubsub/subscription.go @@ -5,8 +5,9 @@ import ( "errors" "github.com/google/uuid" - "github.com/tendermint/tendermint/abci/types" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/libs/queue" + "github.com/tendermint/tendermint/types" ) var ( @@ -73,8 +74,8 @@ func (s *Subscription) stop(err error) { // Message glues data and events together. type Message struct { subID string - data interface{} - events []types.Event + data types.EventData + events []abci.Event } // SubscriptionID returns the unique identifier for the subscription @@ -82,7 +83,7 @@ type Message struct { func (msg Message) SubscriptionID() string { return msg.subID } // Data returns an original data published. -func (msg Message) Data() interface{} { return msg.data } +func (msg Message) Data() types.EventData { return msg.data } // Events returns events, which matched the client's query. -func (msg Message) Events() []types.Event { return msg.events } +func (msg Message) Events() []abci.Event { return msg.events } diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index 470d8d548..2919345e0 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -59,7 +59,7 @@ func WaitForHeight(ctx context.Context, c StatusClient, h int64, waiter Waiter) // when the timeout duration has expired. // // This handles subscribing and unsubscribing under the hood -func WaitForOneEvent(c EventsClient, eventValue string, timeout time.Duration) (types.TMEventData, error) { +func WaitForOneEvent(c EventsClient, eventValue string, timeout time.Duration) (types.EventData, error) { const subscriber = "helpers" ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() diff --git a/rpc/coretypes/responses.go b/rpc/coretypes/responses.go index 2a1cf45b0..39a2755e9 100644 --- a/rpc/coretypes/responses.go +++ b/rpc/coretypes/responses.go @@ -3,7 +3,6 @@ package coretypes import ( "encoding/json" "errors" - "fmt" "time" abci "github.com/tendermint/tendermint/abci/types" @@ -307,7 +306,7 @@ type ( type ResultEvent struct { SubscriptionID string Query string - Data types.TMEventData + Data types.EventData Events []abci.Event } @@ -319,11 +318,7 @@ type resultEventJSON struct { } func (r ResultEvent) MarshalJSON() ([]byte, error) { - data, ok := r.Data.(jsontypes.Tagged) - if !ok { - return nil, fmt.Errorf("type %T is not tagged", r.Data) - } - evt, err := jsontypes.Marshal(data) + evt, err := jsontypes.Marshal(r.Data) if err != nil { return nil, err } diff --git a/types/events.go b/types/events.go index 68139fec6..6edf30420 100644 --- a/types/events.go +++ b/types/events.go @@ -88,8 +88,10 @@ var ( // ENCODING / DECODING -// TMEventData implements events.EventData. -type TMEventData interface{} +// EventData is satisfied by types that can be published as event data. +type EventData interface { + jsontypes.Tagged +} func init() { jsontypes.MustRegister(EventDataBlockSyncStatus{})