|
@ -81,7 +81,7 @@ type EventMeter struct { |
|
|
wsc *client.WSClient |
|
|
wsc *client.WSClient |
|
|
|
|
|
|
|
|
mtx sync.Mutex |
|
|
mtx sync.Mutex |
|
|
queries map[string]*EventMetric |
|
|
|
|
|
|
|
|
queryToMetricMap map[string]*EventMetric |
|
|
|
|
|
|
|
|
unmarshalEvent EventUnmarshalFunc |
|
|
unmarshalEvent EventUnmarshalFunc |
|
|
latencyCallback LatencyCallbackFunc |
|
|
latencyCallback LatencyCallbackFunc |
|
@ -96,7 +96,7 @@ type EventMeter struct { |
|
|
func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter { |
|
|
func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter { |
|
|
return &EventMeter{ |
|
|
return &EventMeter{ |
|
|
wsc: client.NewWSClient(addr, "/websocket", client.PingPeriod(1*time.Second)), |
|
|
wsc: client.NewWSClient(addr, "/websocket", client.PingPeriod(1*time.Second)), |
|
|
queries: make(map[string]*EventMetric), |
|
|
|
|
|
|
|
|
queryToMetricMap: make(map[string]*EventMetric), |
|
|
unmarshalEvent: unmarshalEvent, |
|
|
unmarshalEvent: unmarshalEvent, |
|
|
logger: log.NewNopLogger(), |
|
|
logger: log.NewNopLogger(), |
|
|
} |
|
|
} |
|
@ -154,7 +154,7 @@ func (em *EventMeter) Subscribe(query string, cb EventCallbackFunc) error { |
|
|
meter: metrics.NewMeter(), |
|
|
meter: metrics.NewMeter(), |
|
|
callback: cb, |
|
|
callback: cb, |
|
|
} |
|
|
} |
|
|
em.queries[query] = metric |
|
|
|
|
|
|
|
|
em.queryToMetricMap[query] = metric |
|
|
return nil |
|
|
return nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -173,7 +173,7 @@ func (em *EventMeter) Unsubscribe(query string) error { |
|
|
func (em *EventMeter) GetMetric(query string) (*EventMetric, error) { |
|
|
func (em *EventMeter) GetMetric(query string) (*EventMetric, error) { |
|
|
em.mtx.Lock() |
|
|
em.mtx.Lock() |
|
|
defer em.mtx.Unlock() |
|
|
defer em.mtx.Unlock() |
|
|
metric, ok := em.queries[query] |
|
|
|
|
|
|
|
|
metric, ok := em.queryToMetricMap[query] |
|
|
if !ok { |
|
|
if !ok { |
|
|
return nil, fmt.Errorf("unknown query: %s", query) |
|
|
return nil, fmt.Errorf("unknown query: %s", query) |
|
|
} |
|
|
} |
|
@ -198,7 +198,7 @@ func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) { |
|
|
// Private
|
|
|
// Private
|
|
|
|
|
|
|
|
|
func (em *EventMeter) subscribe() error { |
|
|
func (em *EventMeter) subscribe() error { |
|
|
for query, _ := range em.queries { |
|
|
|
|
|
|
|
|
for query, _ := range em.queryToMetricMap { |
|
|
if err := em.wsc.Subscribe(context.TODO(), query); err != nil { |
|
|
if err := em.wsc.Subscribe(context.TODO(), query); err != nil { |
|
|
return err |
|
|
return err |
|
|
} |
|
|
} |
|
@ -259,7 +259,7 @@ func (em *EventMeter) updateMetric(query string, data events.EventData) { |
|
|
em.mtx.Lock() |
|
|
em.mtx.Lock() |
|
|
defer em.mtx.Unlock() |
|
|
defer em.mtx.Unlock() |
|
|
|
|
|
|
|
|
metric, ok := em.queries[query] |
|
|
|
|
|
|
|
|
metric, ok := em.queryToMetricMap[query] |
|
|
if !ok { |
|
|
if !ok { |
|
|
// we already unsubscribed, or got an unexpected query
|
|
|
// we already unsubscribed, or got an unexpected query
|
|
|
return |
|
|
return |
|
|