|
|
@ -247,9 +247,7 @@ func (s *Server) OnStop() { |
|
|
|
// NOTE: not goroutine safe
|
|
|
|
type state struct { |
|
|
|
// query string -> client -> subscription
|
|
|
|
queryToSubscriptionMap map[string]map[string]*Subscription |
|
|
|
// client -> query string -> struct{}
|
|
|
|
clientToQueryMap map[string]map[string]struct{} |
|
|
|
subscriptions map[string]map[string]*Subscription |
|
|
|
// query string -> queryPlusRefCount
|
|
|
|
queries map[string]*queryPlusRefCount |
|
|
|
} |
|
|
@ -264,8 +262,7 @@ type queryPlusRefCount struct { |
|
|
|
// OnStart implements Service.OnStart by starting the server.
|
|
|
|
func (s *Server) OnStart() error { |
|
|
|
go s.loop(state{ |
|
|
|
queryToSubscriptionMap: make(map[string]map[string]*Subscription), |
|
|
|
clientToQueryMap: make(map[string]map[string]struct{}), |
|
|
|
subscriptions: make(map[string]map[string]*Subscription), |
|
|
|
queries: make(map[string]*queryPlusRefCount), |
|
|
|
}) |
|
|
|
return nil |
|
|
@ -282,14 +279,12 @@ loop: |
|
|
|
switch cmd.op { |
|
|
|
case unsub: |
|
|
|
if cmd.query != nil { |
|
|
|
state.remove(cmd.clientID, cmd.query, ErrUnsubscribed) |
|
|
|
state.remove(cmd.clientID, cmd.query.String(), ErrUnsubscribed) |
|
|
|
} else { |
|
|
|
state.removeAll(cmd.clientID, ErrUnsubscribed) |
|
|
|
state.removeClient(cmd.clientID, ErrUnsubscribed) |
|
|
|
} |
|
|
|
case shutdown: |
|
|
|
for clientID := range state.clientToQueryMap { |
|
|
|
state.removeAll(clientID, nil) |
|
|
|
} |
|
|
|
state.removeAll(nil) |
|
|
|
break loop |
|
|
|
case sub: |
|
|
|
state.add(cmd.clientID, cmd.query, cmd.subscription) |
|
|
@ -302,36 +297,28 @@ loop: |
|
|
|
func (state *state) add(clientID string, q Query, subscription *Subscription) { |
|
|
|
qStr := q.String() |
|
|
|
|
|
|
|
// initialize clientToSubscriptionMap per query if needed
|
|
|
|
if _, ok := state.queryToSubscriptionMap[qStr]; !ok { |
|
|
|
state.queryToSubscriptionMap[qStr] = make(map[string]*Subscription) |
|
|
|
// initialize subscription for this client per query if needed
|
|
|
|
if _, ok := state.subscriptions[qStr]; !ok { |
|
|
|
state.subscriptions[qStr] = make(map[string]*Subscription) |
|
|
|
} |
|
|
|
// create subscription
|
|
|
|
state.queryToSubscriptionMap[qStr][clientID] = subscription |
|
|
|
state.subscriptions[qStr][clientID] = subscription |
|
|
|
|
|
|
|
// initialize queries if needed
|
|
|
|
// initialize query if needed
|
|
|
|
if _, ok := state.queries[qStr]; !ok { |
|
|
|
state.queries[qStr] = &queryPlusRefCount{q: q, refCount: 0} |
|
|
|
} |
|
|
|
// increment reference counter
|
|
|
|
state.queries[qStr].refCount++ |
|
|
|
|
|
|
|
// add client if needed
|
|
|
|
if _, ok := state.clientToQueryMap[clientID]; !ok { |
|
|
|
state.clientToQueryMap[clientID] = make(map[string]struct{}) |
|
|
|
} |
|
|
|
state.clientToQueryMap[clientID][qStr] = struct{}{} |
|
|
|
} |
|
|
|
|
|
|
|
func (state *state) remove(clientID string, q Query, reason error) { |
|
|
|
qStr := q.String() |
|
|
|
|
|
|
|
clientToSubscriptionMap, ok := state.queryToSubscriptionMap[qStr] |
|
|
|
func (state *state) remove(clientID string, qStr string, reason error) { |
|
|
|
clientSubscriptions, ok := state.subscriptions[qStr] |
|
|
|
if !ok { |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
subscription, ok := clientToSubscriptionMap[clientID] |
|
|
|
subscription, ok := clientSubscriptions[clientID] |
|
|
|
if !ok { |
|
|
|
return |
|
|
|
} |
|
|
@ -341,18 +328,11 @@ func (state *state) remove(clientID string, q Query, reason error) { |
|
|
|
subscription.mtx.Unlock() |
|
|
|
close(subscription.cancelled) |
|
|
|
|
|
|
|
// remove the query from client map.
|
|
|
|
// if client is not subscribed to anything else, remove it.
|
|
|
|
delete(state.clientToQueryMap[clientID], qStr) |
|
|
|
if len(state.clientToQueryMap[clientID]) == 0 { |
|
|
|
delete(state.clientToQueryMap, clientID) |
|
|
|
} |
|
|
|
|
|
|
|
// remove the client from query map.
|
|
|
|
// remove client from query map.
|
|
|
|
// if query has no other clients subscribed, remove it.
|
|
|
|
delete(state.queryToSubscriptionMap[qStr], clientID) |
|
|
|
if len(state.queryToSubscriptionMap[qStr]) == 0 { |
|
|
|
delete(state.queryToSubscriptionMap, qStr) |
|
|
|
delete(state.subscriptions[qStr], clientID) |
|
|
|
if len(state.subscriptions[qStr]) == 0 { |
|
|
|
delete(state.subscriptions, qStr) |
|
|
|
} |
|
|
|
|
|
|
|
// decrease ref counter in queries
|
|
|
@ -363,43 +343,27 @@ func (state *state) remove(clientID string, q Query, reason error) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (state *state) removeAll(clientID string, reason error) { |
|
|
|
queryMap, ok := state.clientToQueryMap[clientID] |
|
|
|
if !ok { |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
for qStr := range queryMap { |
|
|
|
subscription := state.queryToSubscriptionMap[qStr][clientID] |
|
|
|
subscription.mtx.Lock() |
|
|
|
subscription.err = reason |
|
|
|
subscription.mtx.Unlock() |
|
|
|
close(subscription.cancelled) |
|
|
|
|
|
|
|
// remove the client from query map.
|
|
|
|
// if query has no other clients subscribed, remove it.
|
|
|
|
delete(state.queryToSubscriptionMap[qStr], clientID) |
|
|
|
if len(state.queryToSubscriptionMap[qStr]) == 0 { |
|
|
|
delete(state.queryToSubscriptionMap, qStr) |
|
|
|
func (state *state) removeClient(clientID string, reason error) { |
|
|
|
for qStr, clientSubscriptions := range state.subscriptions { |
|
|
|
if _, ok := clientSubscriptions[clientID]; ok { |
|
|
|
state.remove(clientID, qStr, reason) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// decrease ref counter in queries
|
|
|
|
state.queries[qStr].refCount-- |
|
|
|
// remove the query if nobody else is using it
|
|
|
|
if state.queries[qStr].refCount == 0 { |
|
|
|
delete(state.queries, qStr) |
|
|
|
func (state *state) removeAll(reason error) { |
|
|
|
for qStr, clientSubscriptions := range state.subscriptions { |
|
|
|
for clientID := range clientSubscriptions { |
|
|
|
state.remove(clientID, qStr, reason) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// remove the client.
|
|
|
|
delete(state.clientToQueryMap, clientID) |
|
|
|
} |
|
|
|
|
|
|
|
func (state *state) send(msg interface{}, tags TagMap) { |
|
|
|
for qStr, clientToSubscriptionMap := range state.queryToSubscriptionMap { |
|
|
|
for qStr, clientSubscriptions := range state.subscriptions { |
|
|
|
q := state.queries[qStr].q |
|
|
|
if q.Matches(tags) { |
|
|
|
for clientID, subscription := range clientToSubscriptionMap { |
|
|
|
for clientID, subscription := range clientSubscriptions { |
|
|
|
if cap(subscription.out) == 0 { |
|
|
|
// block on unbuffered channel
|
|
|
|
subscription.out <- MsgAndTags{msg, tags} |
|
|
@ -408,7 +372,7 @@ func (state *state) send(msg interface{}, tags TagMap) { |
|
|
|
select { |
|
|
|
case subscription.out <- MsgAndTags{msg, tags}: |
|
|
|
default: |
|
|
|
state.remove(clientID, q, ErrOutOfCapacity) |
|
|
|
state.remove(clientID, qStr, ErrOutOfCapacity) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|