// Package eventmeter is a generic system to subscribe to events and record their frequency.
- package eventmeter
-
- import (
- "context"
- "encoding/json"
- "fmt"
- "sync"
- "time"
-
- metrics "github.com/rcrowley/go-metrics"
- client "github.com/tendermint/tendermint/rpc/lib/client"
- "github.com/tendermint/tmlibs/events"
- "github.com/tendermint/tmlibs/log"
- )
-
const (
	// latencyPeriod is how often the mean ping/pong latency is read from the
	// WebSocket client and passed to the registered LatencyCallbackFunc.
	latencyPeriod = time.Second

	// connectionCheckPeriod is how often disconnectRoutine polls the
	// WebSocket client to find out whether it is reconnecting.
	connectionCheckPeriod = 100 * time.Millisecond
)
-
- // EventMetric exposes metrics for an event.
- type EventMetric struct {
- ID string `json:"id"`
- Started time.Time `json:"start_time"`
- LastHeard time.Time `json:"last_heard"`
- MinDuration int64 `json:"min_duration"`
- MaxDuration int64 `json:"max_duration"`
-
- // tracks event count and rate
- meter metrics.Meter
-
- // filled in from the Meter
- Count int64 `json:"count"`
- Rate1 float64 `json:"rate_1" wire:"unsafe"`
- Rate5 float64 `json:"rate_5" wire:"unsafe"`
- Rate15 float64 `json:"rate_15" wire:"unsafe"`
- RateMean float64 `json:"rate_mean" wire:"unsafe"`
-
- // so the event can have effects in the eventmeter's consumer. runs in a go
- // routine.
- callback EventCallbackFunc
- }
-
- func (metric *EventMetric) Copy() *EventMetric {
- metricCopy := *metric
- metricCopy.meter = metric.meter.Snapshot()
- return &metricCopy
- }
-
- // called on GetMetric
- func (metric *EventMetric) fillMetric() *EventMetric {
- metric.Count = metric.meter.Count()
- metric.Rate1 = metric.meter.Rate1()
- metric.Rate5 = metric.meter.Rate5()
- metric.Rate15 = metric.meter.Rate15()
- metric.RateMean = metric.meter.RateMean()
- return metric
- }
-
- // EventCallbackFunc is a closure to enable side effects from receiving an
- // event.
- type EventCallbackFunc func(em *EventMetric, data interface{})
-
- // EventUnmarshalFunc is a closure to get the query and data out of the raw
- // JSON received over the RPC WebSocket.
- type EventUnmarshalFunc func(b json.RawMessage) (string, events.EventData, error)
-
- // LatencyCallbackFunc is a closure to enable side effects from receiving a latency.
- type LatencyCallbackFunc func(meanLatencyNanoSeconds float64)
-
- // DisconnectCallbackFunc is a closure to notify a consumer that the connection
- // has died.
- type DisconnectCallbackFunc func()
-
- // EventMeter tracks events, reports latency and disconnects.
- type EventMeter struct {
- wsc *client.WSClient
-
- mtx sync.Mutex
- queryToMetricMap map[string]*EventMetric
-
- unmarshalEvent EventUnmarshalFunc
- latencyCallback LatencyCallbackFunc
- disconnectCallback DisconnectCallbackFunc
- subscribed bool
-
- quit chan struct{}
-
- logger log.Logger
- }
-
- func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter {
- return &EventMeter{
- wsc: client.NewWSClient(addr, "/websocket", client.PingPeriod(1*time.Second)),
- queryToMetricMap: make(map[string]*EventMetric),
- unmarshalEvent: unmarshalEvent,
- logger: log.NewNopLogger(),
- }
- }
-
- // SetLogger lets you set your own logger.
- func (em *EventMeter) SetLogger(l log.Logger) {
- em.logger = l
- em.wsc.SetLogger(l.With("module", "rpcclient"))
- }
-
- // String returns a string representation of event meter.
- func (em *EventMeter) String() string {
- return em.wsc.Address
- }
-
- // Start boots up event meter.
- func (em *EventMeter) Start() error {
- if err := em.wsc.Start(); err != nil {
- return err
- }
-
- em.quit = make(chan struct{})
- go em.receiveRoutine()
- go em.disconnectRoutine()
-
- err := em.subscribe()
- if err != nil {
- return err
- }
- em.subscribed = true
- return nil
- }
-
- // Stop stops event meter.
- func (em *EventMeter) Stop() {
- close(em.quit)
-
- if em.wsc.IsRunning() {
- em.wsc.Stop()
- }
- }
-
- // Subscribe for the given query. Callback function will be called upon
- // receiving an event.
- func (em *EventMeter) Subscribe(query string, cb EventCallbackFunc) error {
- em.mtx.Lock()
- defer em.mtx.Unlock()
-
- if err := em.wsc.Subscribe(context.TODO(), query); err != nil {
- return err
- }
-
- metric := &EventMetric{
- meter: metrics.NewMeter(),
- callback: cb,
- }
- em.queryToMetricMap[query] = metric
- return nil
- }
-
- // Unsubscribe from the given query.
- func (em *EventMeter) Unsubscribe(query string) error {
- em.mtx.Lock()
- defer em.mtx.Unlock()
- if err := em.wsc.Unsubscribe(context.TODO(), query); err != nil {
- return err
- }
-
- return nil
- }
-
- // GetMetric fills in the latest data for an query and return a copy.
- func (em *EventMeter) GetMetric(query string) (*EventMetric, error) {
- em.mtx.Lock()
- defer em.mtx.Unlock()
- metric, ok := em.queryToMetricMap[query]
- if !ok {
- return nil, fmt.Errorf("unknown query: %s", query)
- }
- return metric.fillMetric().Copy(), nil
- }
-
- // RegisterLatencyCallback allows you to set latency callback.
- func (em *EventMeter) RegisterLatencyCallback(f LatencyCallbackFunc) {
- em.mtx.Lock()
- defer em.mtx.Unlock()
- em.latencyCallback = f
- }
-
- // RegisterDisconnectCallback allows you to set disconnect callback.
- func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) {
- em.mtx.Lock()
- defer em.mtx.Unlock()
- em.disconnectCallback = f
- }
-
- ///////////////////////////////////////////////////////////////////////////////
- // Private
-
- func (em *EventMeter) subscribe() error {
- for query, _ := range em.queryToMetricMap {
- if err := em.wsc.Subscribe(context.TODO(), query); err != nil {
- return err
- }
- }
- return nil
- }
-
- func (em *EventMeter) receiveRoutine() {
- latencyTicker := time.NewTicker(latencyPeriod)
- for {
- select {
- case resp := <-em.wsc.ResponsesCh:
- if resp.Error != nil {
- em.logger.Error("expected some event, got error", "err", resp.Error.Error())
- continue
- }
- query, data, err := em.unmarshalEvent(resp.Result)
- if err != nil {
- em.logger.Error("failed to unmarshal event", "err", err)
- continue
- }
- if query != "" { // FIXME how can it be an empty string?
- em.updateMetric(query, data)
- }
- case <-latencyTicker.C:
- if em.wsc.IsActive() {
- em.callLatencyCallback(em.wsc.PingPongLatencyTimer.Mean())
- }
- case <-em.wsc.Quit():
- return
- case <-em.quit:
- return
- }
- }
- }
-
- func (em *EventMeter) disconnectRoutine() {
- ticker := time.NewTicker(connectionCheckPeriod)
- for {
- select {
- case <-ticker.C:
- if em.wsc.IsReconnecting() && em.subscribed { // notify user about disconnect only once
- em.callDisconnectCallback()
- em.subscribed = false
- } else if !em.wsc.IsReconnecting() && !em.subscribed { // resubscribe
- em.subscribe()
- em.subscribed = true
- }
- case <-em.wsc.Quit():
- return
- case <-em.quit:
- return
- }
- }
- }
-
- func (em *EventMeter) updateMetric(query string, data events.EventData) {
- em.mtx.Lock()
- defer em.mtx.Unlock()
-
- metric, ok := em.queryToMetricMap[query]
- if !ok {
- // we already unsubscribed, or got an unexpected query
- return
- }
-
- last := metric.LastHeard
- metric.LastHeard = time.Now()
- metric.meter.Mark(1)
- dur := int64(metric.LastHeard.Sub(last))
- if dur < metric.MinDuration {
- metric.MinDuration = dur
- }
- if !last.IsZero() && dur > metric.MaxDuration {
- metric.MaxDuration = dur
- }
-
- if metric.callback != nil {
- go metric.callback(metric.Copy(), data)
- }
- }
-
- func (em *EventMeter) callDisconnectCallback() {
- em.mtx.Lock()
- if em.disconnectCallback != nil {
- go em.disconnectCallback()
- }
- em.mtx.Unlock()
- }
-
- func (em *EventMeter) callLatencyCallback(meanLatencyNanoSeconds float64) {
- em.mtx.Lock()
- if em.latencyCallback != nil {
- go em.latencyCallback(meanLatencyNanoSeconds)
- }
- em.mtx.Unlock()
- }
|