From 886519e3cafd8fc976b883fa7ab7292fa5758f37 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Tue, 15 Jun 2021 13:33:47 -0400 Subject: [PATCH] rpc: add subscription id to events (#6386) Addresses #3931 --- go.mod | 1 + go.sum | 2 + internal/consensus/replay_file.go | 7 +- libs/pubsub/pubsub.go | 129 ++++++++++++++++++++++++------ libs/pubsub/pubsub_test.go | 14 +++- libs/pubsub/subscription.go | 41 +++++++--- rpc/client/http/ws.go | 38 ++++++--- rpc/client/local/local.go | 21 ++++- rpc/core/events.go | 16 ++-- rpc/core/mempool.go | 4 +- rpc/core/types/responses.go | 7 +- types/event_bus.go | 9 ++- 12 files changed, 220 insertions(+), 69 deletions(-) diff --git a/go.mod b/go.mod index 6e7f26605..60bbffebd 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/golang/protobuf v1.5.2 github.com/google/orderedcode v0.0.1 + github.com/google/uuid v1.2.0 github.com/gorilla/websocket v1.4.2 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 diff --git a/go.sum b/go.sum index 9c1fb0e19..5eb80d021 100644 --- a/go.sum +++ b/go.sum @@ -218,6 +218,8 @@ github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OI github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= diff --git a/internal/consensus/replay_file.go b/internal/consensus/replay_file.go index ed4007048..51cb090d7 100644 --- a/internal/consensus/replay_file.go +++ b/internal/consensus/replay_file.go @@ -15,6 +15,7 @@ import ( cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/log" tmos "github.com/tendermint/tendermint/libs/os" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/store" @@ -58,7 +59,8 @@ func (cs *State) ReplayFile(file string, console bool) error { return fmt.Errorf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep) } defer func() { - if err := cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep); err != nil { + args := tmpubsub.UnsubscribeArgs{Subscriber: subscriber, Query: types.EventQueryNewRoundStep} + if err := cs.eventBus.Unsubscribe(ctx, args); err != nil { cs.Logger.Error("Error unsubscribing to event bus", "err", err) } }() @@ -225,7 +227,8 @@ func (pb *playback) replayConsoleLoop() int { tmos.Exit(fmt.Sprintf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep)) } defer func() { - if err := pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep); err != nil { + args := tmpubsub.UnsubscribeArgs{Subscriber: subscriber, Query: types.EventQueryNewRoundStep} + if err := pb.cs.eventBus.Unsubscribe(ctx, args); err != nil { pb.cs.Logger.Error("Error unsubscribing from eventBus", "err", err) } }() diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index aa030f07d..7b08319bb 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -40,6 +40,7 @@ import ( "fmt" tmsync "github.com/tendermint/tendermint/internal/libs/sync" + "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/libs/service" ) @@ -73,6 +74,24 @@ type Query interface { String() string } +type UnsubscribeArgs struct { + ID string + Subscriber string + Query Query +} + +func (args UnsubscribeArgs) Validate() error { + if args.Subscriber == "" { + return errors.New("must specify a subscriber") + } + + if args.ID == "" && args.Query == nil { + return fmt.Errorf("subscription is not fully defined [subscriber=%q]", args.Subscriber) + } + + return nil +} + type cmd struct { op operation @@ -96,8 +115,12 @@ type Server struct { // check if we have subscription before // subscribing or unsubscribing - mtx tmsync.RWMutex - subscriptions map[string]map[string]struct{} // subscriber -> query (string) -> empty struct + mtx tmsync.RWMutex + + // subscriber -> [query->id (string) OR id->query (string))], + // track connections both by ID (new) and query (legacy) to + // avoid breaking the interface. + subscriptions map[string]map[string]string } // Option sets a parameter for the server. @@ -108,7 +131,7 @@ type Option func(*Server) // provided, the resulting server's queue is unbuffered. func NewServer(options ...Option) *Server { s := &Server{ - subscriptions: make(map[string]map[string]struct{}), + subscriptions: make(map[string]map[string]string), } s.BaseService = *service.NewBaseService(nil, "PubSub", s) @@ -186,9 +209,10 @@ func (s *Server) subscribe(ctx context.Context, clientID string, query Query, ou case s.cmds <- cmd{op: sub, clientID: clientID, query: query, subscription: subscription}: s.mtx.Lock() if _, ok = s.subscriptions[clientID]; !ok { - s.subscriptions[clientID] = make(map[string]struct{}) + s.subscriptions[clientID] = make(map[string]string) } - s.subscriptions[clientID][query.String()] = struct{}{} + s.subscriptions[clientID][query.String()] = subscription.id + s.subscriptions[clientID][subscription.id] = query.String() s.mtx.Unlock() return subscription, nil case <-ctx.Done(): @@ -201,23 +225,45 @@ func (s *Server) subscribe(ctx context.Context, clientID string, query Query, ou // Unsubscribe removes the subscription on the given query. An error will be // returned to the caller if the context is canceled or if subscription does // not exist. -func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error { +func (s *Server) Unsubscribe(ctx context.Context, args UnsubscribeArgs) error { + if err := args.Validate(); err != nil { + return err + } + var qs string + if args.Query != nil { + qs = args.Query.String() + } + s.mtx.RLock() - clientSubscriptions, ok := s.subscriptions[clientID] - if ok { - _, ok = clientSubscriptions[query.String()] + clientSubscriptions, ok := s.subscriptions[args.Subscriber] + if args.ID != "" { + qs, ok = clientSubscriptions[args.ID] + + if ok && args.Query == nil { + var err error + args.Query, err = query.New(qs) + if err != nil { + return err + } + } + } else if qs != "" { + args.ID, ok = clientSubscriptions[qs] } + s.mtx.RUnlock() if !ok { return ErrSubscriptionNotFound } select { - case s.cmds <- cmd{op: unsub, clientID: clientID, query: query}: + case s.cmds <- cmd{op: unsub, clientID: args.Subscriber, query: args.Query, subscription: &Subscription{id: args.ID}}: s.mtx.Lock() - delete(clientSubscriptions, query.String()) + + delete(clientSubscriptions, args.ID) + delete(clientSubscriptions, qs) + if len(clientSubscriptions) == 0 { - delete(s.subscriptions, clientID) + delete(s.subscriptions, args.Subscriber) } s.mtx.Unlock() return nil @@ -262,7 +308,7 @@ func (s *Server) NumClients() int { func (s *Server) NumClientSubscriptions(clientID string) int { s.mtx.RLock() defer s.mtx.RUnlock() - return len(s.subscriptions[clientID]) + return len(s.subscriptions[clientID]) / 2 } // Publish publishes the given message. An error will be returned to the caller @@ -325,7 +371,7 @@ loop: switch cmd.op { case unsub: if cmd.query != nil { - state.remove(cmd.clientID, cmd.query.String(), ErrUnsubscribed) + state.remove(cmd.clientID, cmd.query.String(), cmd.subscription.id, ErrUnsubscribed) } else { state.removeClient(cmd.clientID, ErrUnsubscribed) } @@ -349,8 +395,14 @@ func (state *state) add(clientID string, q Query, subscription *Subscription) { if _, ok := state.subscriptions[qStr]; !ok { state.subscriptions[qStr] = make(map[string]*Subscription) } + + if _, ok := state.subscriptions[subscription.id]; !ok { + state.subscriptions[subscription.id] = make(map[string]*Subscription) + } + // create subscription state.subscriptions[qStr][clientID] = subscription + state.subscriptions[subscription.id][clientID] = subscription // initialize query if needed if _, ok := state.queries[qStr]; !ok { @@ -360,7 +412,7 @@ func (state *state) add(clientID string, q Query, subscription *Subscription) { state.queries[qStr].refCount++ } -func (state *state) remove(clientID string, qStr string, reason error) { +func (state *state) remove(clientID string, qStr, id string, reason error) { clientSubscriptions, ok := state.subscriptions[qStr] if !ok { return @@ -376,37 +428,62 @@ func (state *state) remove(clientID string, qStr string, reason error) { // remove client from query map. // if query has no other clients subscribed, remove it. delete(state.subscriptions[qStr], clientID) + delete(state.subscriptions[id], clientID) if len(state.subscriptions[qStr]) == 0 { delete(state.subscriptions, qStr) } // decrease ref counter in queries - state.queries[qStr].refCount-- - // remove the query if nobody else is using it - if state.queries[qStr].refCount == 0 { - delete(state.queries, qStr) + if ref, ok := state.queries[qStr]; ok { + ref.refCount-- + if ref.refCount == 0 { + // remove the query if nobody else is using it + delete(state.queries, qStr) + } } } func (state *state) removeClient(clientID string, reason error) { + seen := map[string]struct{}{} for qStr, clientSubscriptions := range state.subscriptions { - if _, ok := clientSubscriptions[clientID]; ok { - state.remove(clientID, qStr, reason) + if sub, ok := clientSubscriptions[clientID]; ok { + if _, ok = seen[sub.id]; ok { + // all subscriptions are double indexed by ID and query, only + // process them once. + continue + } + state.remove(clientID, qStr, sub.id, reason) + seen[sub.id] = struct{}{} } } } func (state *state) removeAll(reason error) { for qStr, clientSubscriptions := range state.subscriptions { + sub, ok := clientSubscriptions[qStr] + if !ok || ok && sub.id == qStr { + // all subscriptions are double indexed by ID and query, only + // process them once. + continue + } + for clientID := range clientSubscriptions { - state.remove(clientID, qStr, reason) + state.remove(clientID, qStr, sub.id, reason) } } } func (state *state) send(msg interface{}, events map[string][]string) error { for qStr, clientSubscriptions := range state.subscriptions { - q := state.queries[qStr].q + if sub, ok := clientSubscriptions[qStr]; ok && sub.id == qStr { + continue + } + var q Query + if qi, ok := state.queries[qStr]; ok { + q = qi.q + } else { + continue + } match, err := q.Matches(events) if err != nil { @@ -417,13 +494,13 @@ func (state *state) send(msg interface{}, events map[string][]string) error { for clientID, subscription := range clientSubscriptions { if cap(subscription.out) == 0 { // block on unbuffered channel - subscription.out <- NewMessage(msg, events) + subscription.out <- NewMessage(subscription.id, msg, events) } else { // don't block on buffered channels select { - case subscription.out <- NewMessage(msg, events): + case subscription.out <- NewMessage(subscription.id, msg, events): default: - state.remove(clientID, qStr, ErrOutOfCapacity) + state.remove(clientID, qStr, subscription.id, ErrOutOfCapacity) } } } diff --git a/libs/pubsub/pubsub_test.go b/libs/pubsub/pubsub_test.go index 3d5fb21e2..88c52cf25 100644 --- a/libs/pubsub/pubsub_test.go +++ b/libs/pubsub/pubsub_test.go @@ -291,7 +291,9 @@ func TestUnsubscribe(t *testing.T) { ctx := context.Background() subscription, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'")) require.NoError(t, err) - err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'")) + err = s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{ + Subscriber: clientID, + Query: query.MustParse("tm.events.type='NewBlock'")}) require.NoError(t, err) err = s.Publish(ctx, "Nick Fury") @@ -315,10 +317,14 @@ func TestClientUnsubscribesTwice(t *testing.T) { ctx := context.Background() _, err = s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'")) require.NoError(t, err) - err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'")) + err = s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{ + Subscriber: clientID, + Query: query.MustParse("tm.events.type='NewBlock'")}) require.NoError(t, err) - err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'")) + err = s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{ + Subscriber: clientID, + Query: query.MustParse("tm.events.type='NewBlock'")}) assert.Equal(t, pubsub.ErrSubscriptionNotFound, err) err = s.UnsubscribeAll(ctx, clientID) assert.Equal(t, pubsub.ErrSubscriptionNotFound, err) @@ -338,7 +344,7 @@ func TestResubscribe(t *testing.T) { ctx := context.Background() _, err = s.Subscribe(ctx, clientID, query.Empty{}) require.NoError(t, err) - err = s.Unsubscribe(ctx, clientID, query.Empty{}) + err = s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{Subscriber: clientID, Query: query.Empty{}}) require.NoError(t, err) subscription, err := s.Subscribe(ctx, clientID, query.Empty{}) require.NoError(t, err) diff --git a/libs/pubsub/subscription.go b/libs/pubsub/subscription.go index a5f9f398e..34eab997f 100644 --- a/libs/pubsub/subscription.go +++ b/libs/pubsub/subscription.go @@ -2,7 +2,9 @@ package pubsub import ( "errors" + "fmt" + "github.com/google/uuid" tmsync "github.com/tendermint/tendermint/internal/libs/sync" ) @@ -21,6 +23,7 @@ var ( // 2) channel which is closed if a client is too slow or choose to unsubscribe // 3) err indicating the reason for (2) type Subscription struct { + id string out chan Message canceled chan struct{} @@ -31,6 +34,7 @@ type Subscription struct { // NewSubscription returns a new subscription with the given outCapacity. func NewSubscription(outCapacity int) *Subscription { return &Subscription{ + id: uuid.NewString(), out: make(chan Message, outCapacity), canceled: make(chan struct{}), } @@ -43,6 +47,8 @@ func (s *Subscription) Out() <-chan Message { return s.out } +func (s *Subscription) ID() string { return s.id } + // Canceled returns a channel that's closed when the subscription is // terminated and supposed to be used in a select statement. func (s *Subscription) Canceled() <-chan struct{} { @@ -64,27 +70,42 @@ func (s *Subscription) Err() error { func (s *Subscription) cancel(err error) { s.mtx.Lock() - s.err = err - s.mtx.Unlock() + defer s.mtx.Unlock() + defer func() { + perr := recover() + if err == nil && perr != nil { + err = fmt.Errorf("problem closing subscription: %v", perr) + } + }() + + if s.err == nil && err != nil { + s.err = err + } + close(s.canceled) } // Message glues data and events together. type Message struct { + subID string data interface{} events map[string][]string } -func NewMessage(data interface{}, events map[string][]string) Message { - return Message{data, events} +func NewMessage(subID string, data interface{}, events map[string][]string) Message { + return Message{ + subID: subID, + data: data, + events: events, + } } +// SubscriptionID returns the unique identifier for the subscription +// that produced this message. +func (msg Message) SubscriptionID() string { return msg.subID } + // Data returns an original data published. -func (msg Message) Data() interface{} { - return msg.data -} +func (msg Message) Data() interface{} { return msg.data } // Events returns events, which matched the client's query. -func (msg Message) Events() map[string][]string { - return msg.events -} +func (msg Message) Events() map[string][]string { return msg.events } diff --git a/rpc/client/http/ws.go b/rpc/client/http/ws.go index bda943a63..afdaec861 100644 --- a/rpc/client/http/ws.go +++ b/rpc/client/http/ws.go @@ -52,7 +52,13 @@ type wsEvents struct { ws *jsonrpcclient.WSClient mtx tmsync.RWMutex - subscriptions map[string]chan ctypes.ResultEvent // query -> chan + subscriptions map[string]*wsSubscription +} + +type wsSubscription struct { + res chan ctypes.ResultEvent + id string + query string } var _ rpcclient.EventsClient = (*wsEvents)(nil) @@ -70,7 +76,7 @@ func newWsEvents(remote string, wso WSOptions) (*wsEvents, error) { } w := &wsEvents{ - subscriptions: make(map[string]chan ctypes.ResultEvent), + subscriptions: make(map[string]*wsSubscription), } w.BaseService = *service.NewBaseService(nil, "wsEvents", w) @@ -136,10 +142,10 @@ func (w *wsEvents) Subscribe(ctx context.Context, subscriber, query string, outc := make(chan ctypes.ResultEvent, outCap) w.mtx.Lock() + defer w.mtx.Unlock() // subscriber param is ignored because Tendermint will override it with // remote IP anyway. - w.subscriptions[query] = outc - w.mtx.Unlock() + w.subscriptions[query] = &wsSubscription{res: outc, query: query} return outc, nil } @@ -158,9 +164,12 @@ func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) er } w.mtx.Lock() - _, ok := w.subscriptions[query] + info, ok := w.subscriptions[query] if ok { - delete(w.subscriptions, query) + if info.id != "" { + delete(w.subscriptions, info.id) + } + delete(w.subscriptions, info.query) } w.mtx.Unlock() @@ -181,7 +190,7 @@ func (w *wsEvents) UnsubscribeAll(ctx context.Context, subscriber string) error } w.mtx.Lock() - w.subscriptions = make(map[string]chan ctypes.ResultEvent) + w.subscriptions = make(map[string]*wsSubscription) w.mtx.Unlock() return nil @@ -196,7 +205,11 @@ func (w *wsEvents) redoSubscriptionsAfter(d time.Duration) { w.mtx.Lock() defer w.mtx.Unlock() - for q := range w.subscriptions { + + for q, info := range w.subscriptions { + if q != "" && q == info.id { + continue + } err := w.ws.Subscribe(ctx, q) if err != nil { w.Logger.Error("failed to resubscribe", "query", q, "err", err) @@ -240,10 +253,17 @@ func (w *wsEvents) eventListener() { w.mtx.RLock() out, ok := w.subscriptions[result.Query] + if ok { + if _, idOk := w.subscriptions[result.SubscriptionID]; !idOk { + out.id = result.SubscriptionID + w.subscriptions[result.SubscriptionID] = out + } + } + w.mtx.RUnlock() if ok { select { - case out <- *result: + case out.res <- *result: case <-w.Quit(): return } diff --git a/rpc/client/local/local.go b/rpc/client/local/local.go index fe93902fb..0663ebf67 100644 --- a/rpc/client/local/local.go +++ b/rpc/client/local/local.go @@ -248,7 +248,13 @@ func (c *Local) eventsRoutine( for { select { case msg := <-sub.Out(): - result := ctypes.ResultEvent{Query: q.String(), Data: msg.Data(), Events: msg.Events()} + result := ctypes.ResultEvent{ + SubscriptionID: msg.SubscriptionID(), + Query: q.String(), + Data: msg.Data(), + Events: msg.Events(), + } + if cap(outc) == 0 { outc <- result } else { @@ -293,11 +299,18 @@ func (c *Local) resubscribe(subscriber string, q tmpubsub.Query) types.Subscript } func (c *Local) Unsubscribe(ctx context.Context, subscriber, query string) error { - q, err := tmquery.New(query) + args := tmpubsub.UnsubscribeArgs{Subscriber: subscriber} + var err error + args.Query, err = tmquery.New(query) if err != nil { - return fmt.Errorf("failed to parse query: %w", err) + // if this isn't a valid query it might be an ID, so + // we'll try that. It'll turn into an error when we + // try to unsubscribe. Eventually, perhaps, we'll want + // to change the interface to only allow + // unsubscription by ID, but that's a larger change. + args.ID = query } - return c.EventBus.Unsubscribe(ctx, subscriber, q) + return c.EventBus.Unsubscribe(ctx, args) } func (c *Local) UnsubscribeAll(ctx context.Context, subscriber string) error { diff --git a/rpc/core/events.go b/rpc/core/events.go index b73daa87f..e56295c52 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -86,13 +86,17 @@ func (env *Environment) Subscribe(ctx *rpctypes.Context, query string) (*ctypes. // Unsubscribe from events via WebSocket. // More: https://docs.tendermint.com/master/rpc/#/Websocket/unsubscribe func (env *Environment) Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) { - addr := ctx.RemoteAddr() - env.Logger.Info("Unsubscribe from query", "remote", addr, "query", query) - q, err := tmquery.New(query) + args := tmpubsub.UnsubscribeArgs{Subscriber: ctx.RemoteAddr()} + env.Logger.Info("Unsubscribe from query", "remote", args.Subscriber, "subscription", query) + + var err error + args.Query, err = tmquery.New(query) + if err != nil { - return nil, fmt.Errorf("failed to parse query: %w", err) + args.ID = query } - err = env.EventBus.Unsubscribe(context.Background(), addr, q) + + err = env.EventBus.Unsubscribe(ctx.Context(), args) if err != nil { return nil, err } @@ -104,7 +108,7 @@ func (env *Environment) Unsubscribe(ctx *rpctypes.Context, query string) (*ctype func (env *Environment) UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) { addr := ctx.RemoteAddr() env.Logger.Info("Unsubscribe from all", "remote", addr) - err := env.EventBus.UnsubscribeAll(context.Background(), addr) + err := env.EventBus.UnsubscribeAll(ctx.Context(), addr) if err != nil { return nil, err } diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 51360166f..392453777 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -8,6 +8,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" mempl "github.com/tendermint/tendermint/internal/mempool" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" ctypes "github.com/tendermint/tendermint/rpc/core/types" rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types" "github.com/tendermint/tendermint/types" @@ -77,7 +78,8 @@ func (env *Environment) BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (* return nil, err } defer func() { - if err := env.EventBus.Unsubscribe(context.Background(), subscriber, q); err != nil { + args := tmpubsub.UnsubscribeArgs{Subscriber: subscriber, Query: q} + if err := env.EventBus.Unsubscribe(context.Background(), args); err != nil { env.Logger.Error("Error unsubscribing from eventBus", "err", err) } }() diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 3685664eb..c69b520f0 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -258,7 +258,8 @@ type ( // Event data from a subscription type ResultEvent struct { - Query string `json:"query"` - Data types.TMEventData `json:"data"` - Events map[string][]string `json:"events"` + SubscriptionID string `json:"subscription_id"` + Query string `json:"query"` + Data types.TMEventData `json:"data"` + Events map[string][]string `json:"events"` } diff --git a/types/event_bus.go b/types/event_bus.go index 9aefdceee..094fd6748 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -14,7 +14,7 @@ const defaultCapacity = 0 type EventBusSubscriber interface { Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (Subscription, error) - Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error + Unsubscribe(ctx context.Context, args tmpubsub.UnsubscribeArgs) error UnsubscribeAll(ctx context.Context, subscriber string) error NumClients() int @@ -22,6 +22,7 @@ type EventBusSubscriber interface { } type Subscription interface { + ID() string Out() <-chan tmpubsub.Message Canceled() <-chan struct{} Err() error @@ -91,8 +92,8 @@ func (b *EventBus) SubscribeUnbuffered( return b.pubsub.SubscribeUnbuffered(ctx, subscriber, query) } -func (b *EventBus) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error { - return b.pubsub.Unsubscribe(ctx, subscriber, query) +func (b *EventBus) Unsubscribe(ctx context.Context, args tmpubsub.UnsubscribeArgs) error { + return b.pubsub.Unsubscribe(ctx, args) } func (b *EventBus) UnsubscribeAll(ctx context.Context, subscriber string) error { @@ -239,7 +240,7 @@ func (NopEventBus) Subscribe( return nil } -func (NopEventBus) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error { +func (NopEventBus) Unsubscribe(ctx context.Context, args tmpubsub.UnsubscribeArgs) error { return nil }