Browse Source

Rename eventType to query

pull/1943/head
zhangyelong 7 years ago
parent
commit
858cad05f1
2 changed files with 25 additions and 29 deletions
  1. +23
    -27
      tm-monitor/eventmeter/eventmeter.go
  2. +2
    -2
      tm-monitor/mock/eventmeter.go

+ 23
- 27
tm-monitor/eventmeter/eventmeter.go View File

@ -65,7 +65,7 @@ func (metric *EventMetric) fillMetric() *EventMetric {
// event.
type EventCallbackFunc func(em *EventMetric, data interface{})
// EventUnmarshalFunc is a closure to get the eventType and data out of the raw
// EventUnmarshalFunc is a closure to get the query and data out of the raw
// JSON received over the RPC WebSocket.
type EventUnmarshalFunc func(b json.RawMessage) (string, events.EventData, error)
@ -81,7 +81,7 @@ type EventMeter struct {
wsc *client.WSClient
mtx sync.Mutex
events map[string]*EventMetric
queries map[string]*EventMetric
unmarshalEvent EventUnmarshalFunc
latencyCallback LatencyCallbackFunc
@ -96,7 +96,7 @@ type EventMeter struct {
func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter {
return &EventMeter{
wsc: client.NewWSClient(addr, "/websocket", client.PingPeriod(1*time.Second)),
events: make(map[string]*EventMetric),
queries: make(map[string]*EventMetric),
unmarshalEvent: unmarshalEvent,
logger: log.NewNopLogger(),
}
@ -140,16 +140,13 @@ func (em *EventMeter) Stop() {
}
}
// Subscribe for the given event type. Callback function will be called upon
// Subscribe for the given query. Callback function will be called upon
// receiving an event.
func (em *EventMeter) Subscribe(eventType string, cb EventCallbackFunc) error {
func (em *EventMeter) Subscribe(query string, cb EventCallbackFunc) error {
em.mtx.Lock()
defer em.mtx.Unlock()
if _, ok := em.events[eventType]; ok {
return fmt.Errorf("subscribtion already exists")
}
if err := em.wsc.Subscribe(context.TODO(), eventType); err != nil {
if err := em.wsc.Subscribe(context.TODO(), query); err != nil {
return err
}
@ -157,29 +154,28 @@ func (em *EventMeter) Subscribe(eventType string, cb EventCallbackFunc) error {
meter: metrics.NewMeter(),
callback: cb,
}
em.events[eventType] = metric
em.queries[query] = metric
return nil
}
// Unsubscribe from the given event type.
func (em *EventMeter) Unsubscribe(eventType string) error {
// Unsubscribe from the given query.
func (em *EventMeter) Unsubscribe(query string) error {
em.mtx.Lock()
defer em.mtx.Unlock()
if err := em.wsc.Unsubscribe(context.TODO(), eventType); err != nil {
if err := em.wsc.Unsubscribe(context.TODO(), query); err != nil {
return err
}
// XXX: should we persist or save this info first?
delete(em.events, eventType)
return nil
}
// GetMetric fills in the latest data for an event and return a copy.
func (em *EventMeter) GetMetric(eventType string) (*EventMetric, error) {
// GetMetric fills in the latest data for an query and return a copy.
func (em *EventMeter) GetMetric(query string) (*EventMetric, error) {
em.mtx.Lock()
defer em.mtx.Unlock()
metric, ok := em.events[eventType]
metric, ok := em.queries[query]
if !ok {
return nil, fmt.Errorf("unknown event: %s", eventType)
return nil, fmt.Errorf("unknown query: %s", query)
}
return metric.fillMetric().Copy(), nil
}
@ -202,8 +198,8 @@ func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) {
// Private
func (em *EventMeter) subscribe() error {
for eventType, _ := range em.events {
if err := em.wsc.Subscribe(context.TODO(), eventType); err != nil {
for query, _ := range em.queries {
if err := em.wsc.Subscribe(context.TODO(), query); err != nil {
return err
}
}
@ -219,13 +215,13 @@ func (em *EventMeter) receiveRoutine() {
em.logger.Error("expected some event, got error", "err", resp.Error.Error())
continue
}
eventType, data, err := em.unmarshalEvent(resp.Result)
query, data, err := em.unmarshalEvent(resp.Result)
if err != nil {
em.logger.Error("failed to unmarshal event", "err", err)
continue
}
if eventType != "" { // FIXME how can it be an empty string?
em.updateMetric(eventType, data)
if query != "" { // FIXME how can it be an empty string?
em.updateMetric(query, data)
}
case <-latencyTicker.C:
if em.wsc.IsActive() {
@ -259,13 +255,13 @@ func (em *EventMeter) disconnectRoutine() {
}
}
func (em *EventMeter) updateMetric(eventType string, data events.EventData) {
func (em *EventMeter) updateMetric(query string, data events.EventData) {
em.mtx.Lock()
defer em.mtx.Unlock()
metric, ok := em.events[eventType]
metric, ok := em.queries[query]
if !ok {
// we already unsubscribed, or got an unexpected event
// we already unsubscribed, or got an unexpected query
return
}


+ 2
- 2
tm-monitor/mock/eventmeter.go View File

@ -21,11 +21,11 @@ func (e *EventMeter) RegisterLatencyCallback(cb em.LatencyCallbackFunc) { e.late
func (e *EventMeter) RegisterDisconnectCallback(cb em.DisconnectCallbackFunc) {
e.disconnectCallback = cb
}
func (e *EventMeter) Subscribe(eventID string, cb em.EventCallbackFunc) error {
func (e *EventMeter) Subscribe(query string, cb em.EventCallbackFunc) error {
e.eventCallback = cb
return nil
}
func (e *EventMeter) Unsubscribe(eventID string) error {
func (e *EventMeter) Unsubscribe(query string) error {
e.eventCallback = nil
return nil
}


Loading…
Cancel
Save