|
|
@ -65,7 +65,7 @@ func (metric *EventMetric) fillMetric() *EventMetric { |
|
|
|
// event.
|
|
|
|
type EventCallbackFunc func(em *EventMetric, data interface{}) |
|
|
|
|
|
|
|
// EventUnmarshalFunc is a closure to get the eventType and data out of the raw
|
|
|
|
// EventUnmarshalFunc is a closure to get the query and data out of the raw
|
|
|
|
// JSON received over the RPC WebSocket.
|
|
|
|
type EventUnmarshalFunc func(b json.RawMessage) (string, events.EventData, error) |
|
|
|
|
|
|
@ -81,7 +81,7 @@ type EventMeter struct { |
|
|
|
wsc *client.WSClient |
|
|
|
|
|
|
|
mtx sync.Mutex |
|
|
|
events map[string]*EventMetric |
|
|
|
queryToMetricMap map[string]*EventMetric |
|
|
|
|
|
|
|
unmarshalEvent EventUnmarshalFunc |
|
|
|
latencyCallback LatencyCallbackFunc |
|
|
@ -96,7 +96,7 @@ type EventMeter struct { |
|
|
|
func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter { |
|
|
|
return &EventMeter{ |
|
|
|
wsc: client.NewWSClient(addr, "/websocket", client.PingPeriod(1*time.Second)), |
|
|
|
events: make(map[string]*EventMetric), |
|
|
|
queryToMetricMap: make(map[string]*EventMetric), |
|
|
|
unmarshalEvent: unmarshalEvent, |
|
|
|
logger: log.NewNopLogger(), |
|
|
|
} |
|
|
@ -115,7 +115,7 @@ func (em *EventMeter) String() string { |
|
|
|
|
|
|
|
// Start boots up event meter.
|
|
|
|
func (em *EventMeter) Start() error { |
|
|
|
if _, err := em.wsc.Start(); err != nil { |
|
|
|
if err := em.wsc.Start(); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
@ -140,16 +140,13 @@ func (em *EventMeter) Stop() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Subscribe for the given event type. Callback function will be called upon
|
|
|
|
// Subscribe for the given query. Callback function will be called upon
|
|
|
|
// receiving an event.
|
|
|
|
func (em *EventMeter) Subscribe(eventType string, cb EventCallbackFunc) error { |
|
|
|
func (em *EventMeter) Subscribe(query string, cb EventCallbackFunc) error { |
|
|
|
em.mtx.Lock() |
|
|
|
defer em.mtx.Unlock() |
|
|
|
|
|
|
|
if _, ok := em.events[eventType]; ok { |
|
|
|
return fmt.Errorf("subscribtion already exists") |
|
|
|
} |
|
|
|
if err := em.wsc.Subscribe(context.TODO(), eventType); err != nil { |
|
|
|
if err := em.wsc.Subscribe(context.TODO(), query); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
@ -157,29 +154,28 @@ func (em *EventMeter) Subscribe(eventType string, cb EventCallbackFunc) error { |
|
|
|
meter: metrics.NewMeter(), |
|
|
|
callback: cb, |
|
|
|
} |
|
|
|
em.events[eventType] = metric |
|
|
|
em.queryToMetricMap[query] = metric |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// Unsubscribe from the given event type.
|
|
|
|
func (em *EventMeter) Unsubscribe(eventType string) error { |
|
|
|
// Unsubscribe from the given query.
|
|
|
|
func (em *EventMeter) Unsubscribe(query string) error { |
|
|
|
em.mtx.Lock() |
|
|
|
defer em.mtx.Unlock() |
|
|
|
if err := em.wsc.Unsubscribe(context.TODO(), eventType); err != nil { |
|
|
|
if err := em.wsc.Unsubscribe(context.TODO(), query); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
// XXX: should we persist or save this info first?
|
|
|
|
delete(em.events, eventType) |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// GetMetric fills in the latest data for an event and return a copy.
|
|
|
|
func (em *EventMeter) GetMetric(eventType string) (*EventMetric, error) { |
|
|
|
// GetMetric fills in the latest data for an query and return a copy.
|
|
|
|
func (em *EventMeter) GetMetric(query string) (*EventMetric, error) { |
|
|
|
em.mtx.Lock() |
|
|
|
defer em.mtx.Unlock() |
|
|
|
metric, ok := em.events[eventType] |
|
|
|
metric, ok := em.queryToMetricMap[query] |
|
|
|
if !ok { |
|
|
|
return nil, fmt.Errorf("unknown event: %s", eventType) |
|
|
|
return nil, fmt.Errorf("unknown query: %s", query) |
|
|
|
} |
|
|
|
return metric.fillMetric().Copy(), nil |
|
|
|
} |
|
|
@ -202,8 +198,8 @@ func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) { |
|
|
|
// Private
|
|
|
|
|
|
|
|
func (em *EventMeter) subscribe() error { |
|
|
|
for eventType, _ := range em.events { |
|
|
|
if err := em.wsc.Subscribe(context.TODO(), eventType); err != nil { |
|
|
|
for query, _ := range em.queryToMetricMap { |
|
|
|
if err := em.wsc.Subscribe(context.TODO(), query); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
@ -219,19 +215,19 @@ func (em *EventMeter) receiveRoutine() { |
|
|
|
em.logger.Error("expected some event, got error", "err", resp.Error.Error()) |
|
|
|
continue |
|
|
|
} |
|
|
|
eventType, data, err := em.unmarshalEvent(*resp.Result) |
|
|
|
query, data, err := em.unmarshalEvent(resp.Result) |
|
|
|
if err != nil { |
|
|
|
em.logger.Error("failed to unmarshal event", "err", err) |
|
|
|
continue |
|
|
|
} |
|
|
|
if eventType != "" { // FIXME how can it be an empty string?
|
|
|
|
em.updateMetric(eventType, data) |
|
|
|
if query != "" { // FIXME how can it be an empty string?
|
|
|
|
em.updateMetric(query, data) |
|
|
|
} |
|
|
|
case <-latencyTicker.C: |
|
|
|
if em.wsc.IsActive() { |
|
|
|
em.callLatencyCallback(em.wsc.PingPongLatencyTimer.Mean()) |
|
|
|
} |
|
|
|
case <-em.wsc.Quit: |
|
|
|
case <-em.wsc.Quit(): |
|
|
|
return |
|
|
|
case <-em.quit: |
|
|
|
return |
|
|
@ -251,7 +247,7 @@ func (em *EventMeter) disconnectRoutine() { |
|
|
|
em.subscribe() |
|
|
|
em.subscribed = true |
|
|
|
} |
|
|
|
case <-em.wsc.Quit: |
|
|
|
case <-em.wsc.Quit(): |
|
|
|
return |
|
|
|
case <-em.quit: |
|
|
|
return |
|
|
@ -259,13 +255,13 @@ func (em *EventMeter) disconnectRoutine() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (em *EventMeter) updateMetric(eventType string, data events.EventData) { |
|
|
|
func (em *EventMeter) updateMetric(query string, data events.EventData) { |
|
|
|
em.mtx.Lock() |
|
|
|
defer em.mtx.Unlock() |
|
|
|
|
|
|
|
metric, ok := em.events[eventType] |
|
|
|
metric, ok := em.queryToMetricMap[query] |
|
|
|
if !ok { |
|
|
|
// we already unsubscribed, or got an unexpected event
|
|
|
|
// we already unsubscribed, or got an unexpected query
|
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|