diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index 31a80f580..962ecfd7c 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -238,11 +238,9 @@ func (w *WSEvents) Stop() bool { } func (w *WSEvents) Subscribe(ctx context.Context, query string, out chan<- interface{}) error { - w.mtx.RLock() - if _, ok := w.subscriptions[query]; ok { + if ch := w.getSubscription(query); ch != nil { return errors.New("already subscribed") } - w.mtx.RUnlock() err := w.ws.Subscribe(ctx, query) if err != nil { @@ -312,10 +310,15 @@ func (w *WSEvents) eventListener() { fmt.Printf("ws err: %+v\n", resp.Error.Error()) continue } - err := w.parseEvent(*resp.Result) + result := new(ctypes.ResultEvent) + err = json.Unmarshal(*resp.Result, result) if err != nil { - // FIXME: better logging/handling of errors?? - fmt.Printf("ws result: %+v\n", err) + // ignore silently (eg. subscribe, unsubscribe and maybe other events) + // TODO: ? + continue + } + if ch := getSubscription(result.Query); ch != nil { + ch <- result.Data } case <-w.quit: // send a message so we can wait for the routine to exit @@ -326,21 +329,8 @@ func (w *WSEvents) eventListener() { } } -// parseEvent unmarshals the json message and converts it into -// some implementation of types.TMEventData, and sends it off -// on the merry way to the EventSwitch -func (w *WSEvents) parseEvent(data []byte) (err error) { - result := new(ctypes.ResultEvent) - err = json.Unmarshal(data, result) - if err != nil { - // ignore silently (eg. subscribe, unsubscribe and maybe other events) - // TODO: ? - return nil - } +func (w *WSEvents) getSubscription(query string) chan<- interface{} { w.mtx.RLock() - if ch, ok := w.subscriptions[result.Query]; ok { - ch <- result.Data - } - w.mtx.RUnlock() - return nil + defer w.mtx.RUnlock() + return w.subscriptions[query] }