|
|
@ -216,7 +216,9 @@ loop: |
|
|
|
state.removeAll(cmd.clientID) |
|
|
|
} |
|
|
|
case shutdown: |
|
|
|
state.reset() |
|
|
|
for clientID, _ := range state.clients { |
|
|
|
state.removeAll(clientID) |
|
|
|
} |
|
|
|
break loop |
|
|
|
case sub: |
|
|
|
state.add(cmd.clientID, cmd.query, cmd.ch) |
|
|
@ -286,11 +288,6 @@ func (state *state) removeAll(clientID string) { |
|
|
|
delete(state.clients, clientID) |
|
|
|
} |
|
|
|
|
|
|
|
func (state *state) reset() { |
|
|
|
state.queries = make(map[Query]map[string]chan<- interface{}) |
|
|
|
state.clients = make(map[string]map[Query]struct{}) |
|
|
|
} |
|
|
|
|
|
|
|
func (state *state) send(msg interface{}, tags map[string]interface{}, slowClientStrategy overflowStrategy, logger log.Logger) { |
|
|
|
for q, clientToChannelMap := range state.queries { |
|
|
|
// NOTE we can use LRU cache to speed up common cases like query = "
|
|
|
|