diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index 684ff358a..776e0653b 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -156,6 +156,8 @@ func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, ou if _, ok = s.subscriptions[clientID]; !ok { s.subscriptions[clientID] = make(map[string]Query) } + // preserve original query + // see Unsubscribe s.subscriptions[clientID][query.String()] = query s.mtx.Unlock() return nil @@ -314,6 +316,9 @@ func (state *state) remove(clientID string, q Query) { } delete(state.queries[q], clientID) + if len(state.queries[q]) == 0 { + delete(state.queries, q) + } } } @@ -328,8 +333,10 @@ func (state *state) removeAll(clientID string) { close(ch) delete(state.queries[q], clientID) + if len(state.queries[q]) == 0 { + delete(state.queries, q) + } } - delete(state.clients, clientID) }