From c6e701524521f5277b2c02283ab4de3cc43eed9f Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 30 Jan 2019 10:24:14 +0400 Subject: [PATCH] get rid of clientToQueryMap --- libs/pubsub/pubsub.go | 96 ++++++++++++++----------------------------- 1 file changed, 30 insertions(+), 66 deletions(-) diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index f898c0e75..c118df0e0 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -247,9 +247,7 @@ func (s *Server) OnStop() { // NOTE: not goroutine safe type state struct { // query string -> client -> subscription - queryToSubscriptionMap map[string]map[string]*Subscription - // client -> query string -> struct{} - clientToQueryMap map[string]map[string]struct{} + subscriptions map[string]map[string]*Subscription // query string -> queryPlusRefCount queries map[string]*queryPlusRefCount } @@ -264,8 +262,7 @@ type queryPlusRefCount struct { // OnStart implements Service.OnStart by starting the server. func (s *Server) OnStart() error { go s.loop(state{ - queryToSubscriptionMap: make(map[string]map[string]*Subscription), - clientToQueryMap: make(map[string]map[string]struct{}), + subscriptions: make(map[string]map[string]*Subscription), queries: make(map[string]*queryPlusRefCount), }) return nil @@ -282,14 +279,12 @@ loop: switch cmd.op { case unsub: if cmd.query != nil { - state.remove(cmd.clientID, cmd.query, ErrUnsubscribed) + state.remove(cmd.clientID, cmd.query.String(), ErrUnsubscribed) } else { - state.removeAll(cmd.clientID, ErrUnsubscribed) + state.removeClient(cmd.clientID, ErrUnsubscribed) } case shutdown: - for clientID := range state.clientToQueryMap { - state.removeAll(clientID, nil) - } + state.removeAll(nil) break loop case sub: state.add(cmd.clientID, cmd.query, cmd.subscription) @@ -302,36 +297,28 @@ loop: func (state *state) add(clientID string, q Query, subscription *Subscription) { qStr := q.String() - // initialize clientToSubscriptionMap per query if needed - if _, ok := state.queryToSubscriptionMap[qStr]; !ok { - state.queryToSubscriptionMap[qStr] = make(map[string]*Subscription) + // initialize subscription for this client per query if needed + if _, ok := state.subscriptions[qStr]; !ok { + state.subscriptions[qStr] = make(map[string]*Subscription) } // create subscription - state.queryToSubscriptionMap[qStr][clientID] = subscription + state.subscriptions[qStr][clientID] = subscription - // initialize queries if needed + // initialize query if needed if _, ok := state.queries[qStr]; !ok { state.queries[qStr] = &queryPlusRefCount{q: q, refCount: 0} } // increment reference counter state.queries[qStr].refCount++ - - // add client if needed - if _, ok := state.clientToQueryMap[clientID]; !ok { - state.clientToQueryMap[clientID] = make(map[string]struct{}) - } - state.clientToQueryMap[clientID][qStr] = struct{}{} } -func (state *state) remove(clientID string, q Query, reason error) { - qStr := q.String() - - clientToSubscriptionMap, ok := state.queryToSubscriptionMap[qStr] +func (state *state) remove(clientID string, qStr string, reason error) { + clientSubscriptions, ok := state.subscriptions[qStr] if !ok { return } - subscription, ok := clientToSubscriptionMap[clientID] + subscription, ok := clientSubscriptions[clientID] if !ok { return } @@ -341,18 +328,11 @@ func (state *state) remove(clientID string, q Query, reason error) { subscription.mtx.Unlock() close(subscription.cancelled) - // remove the query from client map. - // if client is not subscribed to anything else, remove it. - delete(state.clientToQueryMap[clientID], qStr) - if len(state.clientToQueryMap[clientID]) == 0 { - delete(state.clientToQueryMap, clientID) - } - - // remove the client from query map. + // remove client from query map. // if query has no other clients subscribed, remove it. - delete(state.queryToSubscriptionMap[qStr], clientID) - if len(state.queryToSubscriptionMap[qStr]) == 0 { - delete(state.queryToSubscriptionMap, qStr) + delete(state.subscriptions[qStr], clientID) + if len(state.subscriptions[qStr]) == 0 { + delete(state.subscriptions, qStr) } // decrease ref counter in queries @@ -363,43 +343,27 @@ func (state *state) remove(clientID string, q Query, reason error) { } } -func (state *state) removeAll(clientID string, reason error) { - queryMap, ok := state.clientToQueryMap[clientID] - if !ok { - return - } - - for qStr := range queryMap { - subscription := state.queryToSubscriptionMap[qStr][clientID] - subscription.mtx.Lock() - subscription.err = reason - subscription.mtx.Unlock() - close(subscription.cancelled) - - // remove the client from query map. - // if query has no other clients subscribed, remove it. - delete(state.queryToSubscriptionMap[qStr], clientID) - if len(state.queryToSubscriptionMap[qStr]) == 0 { - delete(state.queryToSubscriptionMap, qStr) +func (state *state) removeClient(clientID string, reason error) { + for qStr, clientSubscriptions := range state.subscriptions { + if _, ok := clientSubscriptions[clientID]; ok { + state.remove(clientID, qStr, reason) } + } +} - // decrease ref counter in queries - state.queries[qStr].refCount-- - // remove the query if nobody else is using it - if state.queries[qStr].refCount == 0 { - delete(state.queries, qStr) +func (state *state) removeAll(reason error) { + for qStr, clientSubscriptions := range state.subscriptions { + for clientID := range clientSubscriptions { + state.remove(clientID, qStr, reason) } } - - // remove the client. - delete(state.clientToQueryMap, clientID) } func (state *state) send(msg interface{}, tags TagMap) { - for qStr, clientToSubscriptionMap := range state.queryToSubscriptionMap { + for qStr, clientSubscriptions := range state.subscriptions { q := state.queries[qStr].q if q.Matches(tags) { - for clientID, subscription := range clientToSubscriptionMap { + for clientID, subscription := range clientSubscriptions { if cap(subscription.out) == 0 { // block on unbuffered channel subscription.out <- MsgAndTags{msg, tags} @@ -408,7 +372,7 @@ func (state *state) send(msg interface{}, tags TagMap) { select { case subscription.out <- MsgAndTags{msg, tags}: default: - state.remove(clientID, q, ErrOutOfCapacity) + state.remove(clientID, qStr, ErrOutOfCapacity) } } }