|
|
@ -1,30 +1,28 @@ |
|
|
|
// eventmeter - generic system to subscribe to events and record their frequency.
|
|
|
|
package eventmeter |
|
|
|
|
|
|
|
import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"github.com/gorilla/websocket"
	"github.com/pkg/errors"
	metrics "github.com/rcrowley/go-metrics"
	client "github.com/tendermint/tendermint/rpc/lib/client"
	"github.com/tendermint/tmlibs/events"
	"github.com/tendermint/tmlibs/log"
)
|
|
|
|
|
|
|
//------------------------------------------------------
|
|
|
|
// Generic system to subscribe to events and record their frequency
|
|
|
|
//------------------------------------------------------
|
|
|
|
const (
	// latencyPeriod is how often the ping/pong latency is read and handed to
	// the registered LatencyCallbackFunc.
	latencyPeriod = 1 * time.Second

	// connectionCheckPeriod is how often the WS client is checked for being
	// connected.
	connectionCheckPeriod = 100 * time.Millisecond
)
|
|
|
|
|
|
|
// Metrics for a given event
|
|
|
|
// EventMetric exposes metrics for an event.
|
|
|
|
type EventMetric struct { |
|
|
|
ID string `json:"id"` |
|
|
|
Started time.Time `json:"start_time"` |
|
|
@ -42,15 +40,15 @@ type EventMetric struct { |
|
|
|
Rate15 float64 `json:"rate_15" wire:"unsafe"` |
|
|
|
RateMean float64 `json:"rate_mean" wire:"unsafe"` |
|
|
|
|
|
|
|
// so the event can have effects in the event-meter's consumer.
|
|
|
|
// runs in a go routine
|
|
|
|
// so the event can have effects in the eventmeter's consumer. runs in a go
|
|
|
|
// routine.
|
|
|
|
callback EventCallbackFunc |
|
|
|
} |
|
|
|
|
|
|
|
func (metric *EventMetric) Copy() *EventMetric { |
|
|
|
metric2 := *metric |
|
|
|
metric2.meter = metric.meter.Snapshot() |
|
|
|
return &metric2 |
|
|
|
metricCopy := *metric |
|
|
|
metricCopy.meter = metric.meter.Snapshot() |
|
|
|
return &metricCopy |
|
|
|
} |
|
|
|
|
|
|
|
// called on GetMetric
|
|
|
@ -63,35 +61,32 @@ func (metric *EventMetric) fillMetric() *EventMetric { |
|
|
|
return metric |
|
|
|
} |
|
|
|
|
|
|
|
//------------------------------------------------------
|
|
|
|
// Websocket client and event meter for many events
|
|
|
|
|
|
|
|
// maxPingsPerPong is the number of attempted pings without a pong after which
// the connection is killed.
const maxPingsPerPong = 30 // if we haven't received a pong in this many attempted pings we kill the conn
|
|
|
|
// EventCallbackFunc is a closure to enable side effects from receiving an
|
|
|
|
// event.
|
|
|
|
type EventCallbackFunc func(em *EventMetric, data interface{}) |
|
|
|
|
|
|
|
// Get the eventID and data out of the raw json received over the go-rpc websocket
|
|
|
|
// EventUnmarshalFunc is a closure to get the eventType and data out of the raw
|
|
|
|
// JSON received over the RPC WebSocket.
|
|
|
|
type EventUnmarshalFunc func(b json.RawMessage) (string, events.EventData, error) |
|
|
|
|
|
|
|
// LatencyCallbackFunc is a closure to enable side effects from receiving a
// latency. The argument is the mean latency in nanoseconds.
type LatencyCallbackFunc func(meanLatencyNanoSeconds float64)
|
|
|
|
|
|
|
// DisconnectCallbackFunc is a closure to notify a consumer that the connection
// has died.
type DisconnectCallbackFunc func()
|
|
|
|
|
|
|
// Each node gets an event meter to track events for that node
|
|
|
|
// EventMeter tracks events, reports latency and disconnects.
|
|
|
|
type EventMeter struct { |
|
|
|
wsc *client.WSClient |
|
|
|
|
|
|
|
mtx sync.Mutex |
|
|
|
events map[string]*EventMetric |
|
|
|
|
|
|
|
// to record ws latency
|
|
|
|
timer metrics.Timer |
|
|
|
lastPing time.Time |
|
|
|
receivedPong bool |
|
|
|
unmarshalEvent EventUnmarshalFunc |
|
|
|
latencyCallback LatencyCallbackFunc |
|
|
|
disconnectCallback DisconnectCallbackFunc |
|
|
|
|
|
|
|
unmarshalEvent EventUnmarshalFunc |
|
|
|
subscribed bool |
|
|
|
|
|
|
|
quit chan struct{} |
|
|
|
|
|
|
@ -99,54 +94,44 @@ type EventMeter struct { |
|
|
|
} |
|
|
|
|
|
|
|
func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter { |
|
|
|
em := &EventMeter{ |
|
|
|
wsc: client.NewWSClient(addr, "/websocket"), |
|
|
|
return &EventMeter{ |
|
|
|
wsc: client.NewWSClient(addr, "/websocket", client.PingPeriod(1*time.Second)), |
|
|
|
events: make(map[string]*EventMetric), |
|
|
|
timer: metrics.NewTimer(), |
|
|
|
receivedPong: true, |
|
|
|
unmarshalEvent: unmarshalEvent, |
|
|
|
logger: log.NewNopLogger(), |
|
|
|
} |
|
|
|
return em |
|
|
|
} |
|
|
|
|
|
|
|
// SetLogger lets you set your own logger
|
|
|
|
// SetLogger lets you set your own logger.
|
|
|
|
func (em *EventMeter) SetLogger(l log.Logger) { |
|
|
|
em.logger = l |
|
|
|
em.wsc.SetLogger(l.With("module", "rpcclient")) |
|
|
|
} |
|
|
|
|
|
|
|
// String returns a string representation of event meter.
|
|
|
|
func (em *EventMeter) String() string { |
|
|
|
return em.wsc.Address |
|
|
|
} |
|
|
|
|
|
|
|
// Start boots up event meter.
|
|
|
|
func (em *EventMeter) Start() error { |
|
|
|
if _, err := em.wsc.Reset(); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
if _, err := em.wsc.Start(); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
em.wsc.Conn.SetPongHandler(func(m string) error { |
|
|
|
// NOTE: https://github.com/gorilla/websocket/issues/97
|
|
|
|
em.mtx.Lock() |
|
|
|
defer em.mtx.Unlock() |
|
|
|
em.receivedPong = true |
|
|
|
em.timer.UpdateSince(em.lastPing) |
|
|
|
if em.latencyCallback != nil { |
|
|
|
go em.latencyCallback(em.timer.Mean()) |
|
|
|
} |
|
|
|
return nil |
|
|
|
}) |
|
|
|
|
|
|
|
em.quit = make(chan struct{}) |
|
|
|
go em.receiveRoutine() |
|
|
|
go em.disconnectRoutine() |
|
|
|
|
|
|
|
return em.resubscribe() |
|
|
|
err := em.subscribe() |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
em.subscribed = true |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// Stop stops the EventMeter.
|
|
|
|
// Stop stops event meter.
|
|
|
|
func (em *EventMeter) Stop() { |
|
|
|
close(em.quit) |
|
|
|
|
|
|
@ -155,88 +140,70 @@ func (em *EventMeter) Stop() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// StopAndCallDisconnectCallback stops the EventMeter and calls
|
|
|
|
// disconnectCallback if present.
|
|
|
|
func (em *EventMeter) StopAndCallDisconnectCallback() { |
|
|
|
if em.wsc.IsRunning() { |
|
|
|
em.wsc.Stop() |
|
|
|
} |
|
|
|
|
|
|
|
// Subscribe for the given event type. Callback function will be called upon
|
|
|
|
// receiving an event.
|
|
|
|
func (em *EventMeter) Subscribe(eventType string, cb EventCallbackFunc) error { |
|
|
|
em.mtx.Lock() |
|
|
|
defer em.mtx.Unlock() |
|
|
|
if em.disconnectCallback != nil { |
|
|
|
go em.disconnectCallback() |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (em *EventMeter) Subscribe(eventID string, cb EventCallbackFunc) error { |
|
|
|
em.mtx.Lock() |
|
|
|
defer em.mtx.Unlock() |
|
|
|
|
|
|
|
if _, ok := em.events[eventID]; ok { |
|
|
|
if _, ok := em.events[eventType]; ok { |
|
|
|
return fmt.Errorf("subscribtion already exists") |
|
|
|
} |
|
|
|
if err := em.wsc.Subscribe(eventID); err != nil { |
|
|
|
if err := em.wsc.Subscribe(context.TODO(), eventType); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
metric := &EventMetric{ |
|
|
|
ID: eventID, |
|
|
|
Started: time.Now(), |
|
|
|
MinDuration: 1 << 62, |
|
|
|
meter: metrics.NewMeter(), |
|
|
|
callback: cb, |
|
|
|
meter: metrics.NewMeter(), |
|
|
|
callback: cb, |
|
|
|
} |
|
|
|
em.events[eventID] = metric |
|
|
|
em.events[eventType] = metric |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func (em *EventMeter) Unsubscribe(eventID string) error { |
|
|
|
// Unsubscribe from the given event type.
|
|
|
|
func (em *EventMeter) Unsubscribe(eventType string) error { |
|
|
|
em.mtx.Lock() |
|
|
|
defer em.mtx.Unlock() |
|
|
|
if err := em.wsc.Unsubscribe(eventID); err != nil { |
|
|
|
if err := em.wsc.Unsubscribe(context.TODO(), eventType); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
// XXX: should we persist or save this info first?
|
|
|
|
delete(em.events, eventID) |
|
|
|
delete(em.events, eventType) |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// Fill in the latest data for an event and return a copy
|
|
|
|
func (em *EventMeter) GetMetric(eventID string) (*EventMetric, error) { |
|
|
|
// GetMetric fills in the latest data for an event and return a copy.
|
|
|
|
func (em *EventMeter) GetMetric(eventType string) (*EventMetric, error) { |
|
|
|
em.mtx.Lock() |
|
|
|
defer em.mtx.Unlock() |
|
|
|
metric, ok := em.events[eventID] |
|
|
|
metric, ok := em.events[eventType] |
|
|
|
if !ok { |
|
|
|
return nil, fmt.Errorf("Unknown event %s", eventID) |
|
|
|
return nil, fmt.Errorf("unknown event: %s", eventType) |
|
|
|
} |
|
|
|
return metric.fillMetric().Copy(), nil |
|
|
|
} |
|
|
|
|
|
|
|
// Return the average latency over the websocket
|
|
|
|
func (em *EventMeter) Latency() float64 { |
|
|
|
em.mtx.Lock() |
|
|
|
defer em.mtx.Unlock() |
|
|
|
return em.timer.Mean() |
|
|
|
} |
|
|
|
|
|
|
|
// RegisterLatencyCallback allows you to set latency callback.
|
|
|
|
func (em *EventMeter) RegisterLatencyCallback(f LatencyCallbackFunc) { |
|
|
|
em.mtx.Lock() |
|
|
|
defer em.mtx.Unlock() |
|
|
|
em.latencyCallback = f |
|
|
|
} |
|
|
|
|
|
|
|
// RegisterDisconnectCallback allows you to set disconnect callback.
|
|
|
|
func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) { |
|
|
|
em.mtx.Lock() |
|
|
|
defer em.mtx.Unlock() |
|
|
|
em.disconnectCallback = f |
|
|
|
} |
|
|
|
|
|
|
|
//------------------------------------------------------
|
|
|
|
///////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Private
|
|
|
|
|
|
|
|
func (em *EventMeter) resubscribe() error { |
|
|
|
for eventID, _ := range em.events { |
|
|
|
if err := em.wsc.Subscribe(eventID); err != nil { |
|
|
|
func (em *EventMeter) subscribe() error { |
|
|
|
for eventType, _ := range em.events { |
|
|
|
if err := em.wsc.Subscribe(context.TODO(), eventType); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
@ -244,40 +211,31 @@ func (em *EventMeter) resubscribe() error { |
|
|
|
} |
|
|
|
|
|
|
|
func (em *EventMeter) receiveRoutine() { |
|
|
|
pingTime := time.Second * 1 |
|
|
|
pingTicker := time.NewTicker(pingTime) |
|
|
|
pingAttempts := 0 // if this hits maxPingsPerPong we kill the conn
|
|
|
|
|
|
|
|
var err error |
|
|
|
latencyTicker := time.NewTicker(latencyPeriod) |
|
|
|
for { |
|
|
|
select { |
|
|
|
case <-pingTicker.C: |
|
|
|
if pingAttempts, err = em.pingForLatency(pingAttempts); err != nil { |
|
|
|
em.logger.Error("err", errors.Wrap(err, "failed to write ping message on websocket")) |
|
|
|
em.StopAndCallDisconnectCallback() |
|
|
|
return |
|
|
|
} else if pingAttempts >= maxPingsPerPong { |
|
|
|
em.logger.Error("err", errors.Errorf("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime)) |
|
|
|
em.StopAndCallDisconnectCallback() |
|
|
|
return |
|
|
|
} |
|
|
|
case r := <-em.wsc.ResultsCh: |
|
|
|
if r == nil { |
|
|
|
em.logger.Error("err", errors.New("Expected some event, received nil")) |
|
|
|
em.StopAndCallDisconnectCallback() |
|
|
|
return |
|
|
|
case rawEvent := <-em.wsc.ResultsCh: |
|
|
|
if rawEvent == nil { |
|
|
|
em.logger.Error("expected some event, got nil") |
|
|
|
continue |
|
|
|
} |
|
|
|
eventID, data, err := em.unmarshalEvent(r) |
|
|
|
eventType, data, err := em.unmarshalEvent(rawEvent) |
|
|
|
if err != nil { |
|
|
|
em.logger.Error("err", errors.Wrap(err, "failed to unmarshal event")) |
|
|
|
em.logger.Error("failed to unmarshal event", "err", err) |
|
|
|
continue |
|
|
|
} |
|
|
|
if eventID != "" { |
|
|
|
em.updateMetric(eventID, data) |
|
|
|
if eventType != "" { // FIXME how can it be an empty string?
|
|
|
|
em.updateMetric(eventType, data) |
|
|
|
} |
|
|
|
case err := <-em.wsc.ErrorsCh: |
|
|
|
if err != nil { |
|
|
|
em.logger.Error("expected some event, got error", "err", err) |
|
|
|
} |
|
|
|
case <-latencyTicker.C: |
|
|
|
if em.wsc.IsActive() { |
|
|
|
em.callLatencyCallback(em.wsc.PingPongLatencyTimer.Mean()) |
|
|
|
} |
|
|
|
case <-em.wsc.Quit: |
|
|
|
em.logger.Error("err", errors.New("WSClient closed unexpectedly")) |
|
|
|
em.StopAndCallDisconnectCallback() |
|
|
|
return |
|
|
|
case <-em.quit: |
|
|
|
return |
|
|
@ -285,29 +243,31 @@ func (em *EventMeter) receiveRoutine() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (em *EventMeter) pingForLatency(pingAttempts int) (int, error) { |
|
|
|
em.mtx.Lock() |
|
|
|
defer em.mtx.Unlock() |
|
|
|
|
|
|
|
// ping to record latency
|
|
|
|
if !em.receivedPong { |
|
|
|
return pingAttempts + 1, nil |
|
|
|
} |
|
|
|
|
|
|
|
em.lastPing = time.Now() |
|
|
|
em.receivedPong = false |
|
|
|
err := em.wsc.Conn.WriteMessage(websocket.PingMessage, []byte{}) |
|
|
|
if err != nil { |
|
|
|
return pingAttempts, err |
|
|
|
func (em *EventMeter) disconnectRoutine() { |
|
|
|
ticker := time.NewTicker(connectionCheckPeriod) |
|
|
|
for { |
|
|
|
select { |
|
|
|
case <-ticker.C: |
|
|
|
if em.wsc.IsReconnecting() && em.subscribed { // notify user about disconnect only once
|
|
|
|
em.callDisconnectCallback() |
|
|
|
em.subscribed = false |
|
|
|
} else if !em.wsc.IsReconnecting() && !em.subscribed { // resubscribe
|
|
|
|
em.subscribe() |
|
|
|
em.subscribed = true |
|
|
|
} |
|
|
|
case <-em.wsc.Quit: |
|
|
|
return |
|
|
|
case <-em.quit: |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
return 0, nil |
|
|
|
} |
|
|
|
|
|
|
|
func (em *EventMeter) updateMetric(eventID string, data events.EventData) { |
|
|
|
func (em *EventMeter) updateMetric(eventType string, data events.EventData) { |
|
|
|
em.mtx.Lock() |
|
|
|
defer em.mtx.Unlock() |
|
|
|
|
|
|
|
metric, ok := em.events[eventID] |
|
|
|
metric, ok := em.events[eventType] |
|
|
|
if !ok { |
|
|
|
// we already unsubscribed, or got an unexpected event
|
|
|
|
return |
|
|
@ -328,3 +288,19 @@ func (em *EventMeter) updateMetric(eventID string, data events.EventData) { |
|
|
|
go metric.callback(metric.Copy(), data) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (em *EventMeter) callDisconnectCallback() { |
|
|
|
em.mtx.Lock() |
|
|
|
if em.disconnectCallback != nil { |
|
|
|
go em.disconnectCallback() |
|
|
|
} |
|
|
|
em.mtx.Unlock() |
|
|
|
} |
|
|
|
|
|
|
|
func (em *EventMeter) callLatencyCallback(meanLatencyNanoSeconds float64) { |
|
|
|
em.mtx.Lock() |
|
|
|
if em.latencyCallback != nil { |
|
|
|
go em.latencyCallback(meanLatencyNanoSeconds) |
|
|
|
} |
|
|
|
em.mtx.Unlock() |
|
|
|
} |