|
@ -238,11 +238,9 @@ func (w *WSEvents) Stop() bool { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (w *WSEvents) Subscribe(ctx context.Context, query string, out chan<- interface{}) error { |
|
|
func (w *WSEvents) Subscribe(ctx context.Context, query string, out chan<- interface{}) error { |
|
|
w.mtx.RLock() |
|
|
|
|
|
if _, ok := w.subscriptions[query]; ok { |
|
|
|
|
|
|
|
|
if ch := w.getSubscription(query); ch != nil { |
|
|
return errors.New("already subscribed") |
|
|
return errors.New("already subscribed") |
|
|
} |
|
|
} |
|
|
w.mtx.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
err := w.ws.Subscribe(ctx, query) |
|
|
err := w.ws.Subscribe(ctx, query) |
|
|
if err != nil { |
|
|
if err != nil { |
|
@ -312,10 +310,15 @@ func (w *WSEvents) eventListener() { |
|
|
fmt.Printf("ws err: %+v\n", resp.Error.Error()) |
|
|
fmt.Printf("ws err: %+v\n", resp.Error.Error()) |
|
|
continue |
|
|
continue |
|
|
} |
|
|
} |
|
|
err := w.parseEvent(*resp.Result) |
|
|
|
|
|
|
|
|
result := new(ctypes.ResultEvent) |
|
|
|
|
|
err = json.Unmarshal(*resp.Result, result) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
// FIXME: better logging/handling of errors??
|
|
|
|
|
|
fmt.Printf("ws result: %+v\n", err) |
|
|
|
|
|
|
|
|
// ignore silently (eg. subscribe, unsubscribe and maybe other events)
|
|
|
|
|
|
// TODO: ?
|
|
|
|
|
|
continue |
|
|
|
|
|
} |
|
|
|
|
|
if ch := getSubscription(result.Query); ch != nil { |
|
|
|
|
|
ch <- result.Data |
|
|
} |
|
|
} |
|
|
case <-w.quit: |
|
|
case <-w.quit: |
|
|
// send a message so we can wait for the routine to exit
|
|
|
// send a message so we can wait for the routine to exit
|
|
@ -326,21 +329,8 @@ func (w *WSEvents) eventListener() { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// parseEvent unmarshals the json message and converts it into
|
|
|
|
|
|
// some implementation of types.TMEventData, and sends it off
|
|
|
|
|
|
// on the merry way to the EventSwitch
|
|
|
|
|
|
func (w *WSEvents) parseEvent(data []byte) (err error) { |
|
|
|
|
|
result := new(ctypes.ResultEvent) |
|
|
|
|
|
err = json.Unmarshal(data, result) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
// ignore silently (eg. subscribe, unsubscribe and maybe other events)
|
|
|
|
|
|
// TODO: ?
|
|
|
|
|
|
return nil |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
func (w *WSEvents) getSubscription(query string) chan<- interface{} { |
|
|
w.mtx.RLock() |
|
|
w.mtx.RLock() |
|
|
if ch, ok := w.subscriptions[result.Query]; ok { |
|
|
|
|
|
ch <- result.Data |
|
|
|
|
|
} |
|
|
|
|
|
w.mtx.RUnlock() |
|
|
|
|
|
return nil |
|
|
|
|
|
|
|
|
defer w.mtx.RUnlock() |
|
|
|
|
|
return w.subscriptions[query] |
|
|
} |
|
|
} |