You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

306 lines
7.6 KiB

  1. // Package eventmeter provides a generic system to subscribe to events and record their frequency.
  2. package eventmeter
  3. import (
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "sync"
  8. "time"
  9. metrics "github.com/rcrowley/go-metrics"
  10. client "github.com/tendermint/tendermint/rpc/lib/client"
  11. "github.com/tendermint/tmlibs/events"
  12. "github.com/tendermint/tmlibs/log"
  13. )
  const (
  	// latencyPeriod is the interval at which the ping/pong latency is sampled
  	// and handed to the registered LatencyCallbackFunc.
  	latencyPeriod = time.Second

  	// connectionCheckPeriod is the interval at which the connection state of
  	// the WS client is polled.
  	connectionCheckPeriod = 100 * time.Millisecond
  )
  20. // EventMetric exposes metrics for an event.
  21. type EventMetric struct {
  22. ID string `json:"id"`
  23. Started time.Time `json:"start_time"`
  24. LastHeard time.Time `json:"last_heard"`
  25. MinDuration int64 `json:"min_duration"`
  26. MaxDuration int64 `json:"max_duration"`
  27. // tracks event count and rate
  28. meter metrics.Meter
  29. // filled in from the Meter
  30. Count int64 `json:"count"`
  31. Rate1 float64 `json:"rate_1" wire:"unsafe"`
  32. Rate5 float64 `json:"rate_5" wire:"unsafe"`
  33. Rate15 float64 `json:"rate_15" wire:"unsafe"`
  34. RateMean float64 `json:"rate_mean" wire:"unsafe"`
  35. // so the event can have effects in the eventmeter's consumer. runs in a go
  36. // routine.
  37. callback EventCallbackFunc
  38. }
  39. func (metric *EventMetric) Copy() *EventMetric {
  40. metricCopy := *metric
  41. metricCopy.meter = metric.meter.Snapshot()
  42. return &metricCopy
  43. }
  44. // called on GetMetric
  45. func (metric *EventMetric) fillMetric() *EventMetric {
  46. metric.Count = metric.meter.Count()
  47. metric.Rate1 = metric.meter.Rate1()
  48. metric.Rate5 = metric.meter.Rate5()
  49. metric.Rate15 = metric.meter.Rate15()
  50. metric.RateMean = metric.meter.RateMean()
  51. return metric
  52. }
  53. // EventCallbackFunc is a closure to enable side effects from receiving an
  54. // event.
  55. type EventCallbackFunc func(em *EventMetric, data interface{})
  56. // EventUnmarshalFunc is a closure to get the eventType and data out of the raw
  57. // JSON received over the RPC WebSocket.
  58. type EventUnmarshalFunc func(b json.RawMessage) (string, events.EventData, error)
  59. // LatencyCallbackFunc is a closure to enable side effects from receiving a latency.
  60. type LatencyCallbackFunc func(meanLatencyNanoSeconds float64)
  61. // DisconnectCallbackFunc is a closure to notify a consumer that the connection
  62. // has died.
  63. type DisconnectCallbackFunc func()
  64. // EventMeter tracks events, reports latency and disconnects.
  65. type EventMeter struct {
  66. wsc *client.WSClient
  67. mtx sync.Mutex
  68. events map[string]*EventMetric
  69. unmarshalEvent EventUnmarshalFunc
  70. latencyCallback LatencyCallbackFunc
  71. disconnectCallback DisconnectCallbackFunc
  72. subscribed bool
  73. quit chan struct{}
  74. logger log.Logger
  75. }
  76. func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter {
  77. return &EventMeter{
  78. wsc: client.NewWSClient(addr, "/websocket", client.PingPeriod(1*time.Second)),
  79. events: make(map[string]*EventMetric),
  80. unmarshalEvent: unmarshalEvent,
  81. logger: log.NewNopLogger(),
  82. }
  83. }
  84. // SetLogger lets you set your own logger.
  85. func (em *EventMeter) SetLogger(l log.Logger) {
  86. em.logger = l
  87. em.wsc.SetLogger(l.With("module", "rpcclient"))
  88. }
  89. // String returns a string representation of event meter.
  90. func (em *EventMeter) String() string {
  91. return em.wsc.Address
  92. }
  93. // Start boots up event meter.
  94. func (em *EventMeter) Start() error {
  95. if _, err := em.wsc.Start(); err != nil {
  96. return err
  97. }
  98. em.quit = make(chan struct{})
  99. go em.receiveRoutine()
  100. go em.disconnectRoutine()
  101. err := em.subscribe()
  102. if err != nil {
  103. return err
  104. }
  105. em.subscribed = true
  106. return nil
  107. }
  108. // Stop stops event meter.
  109. func (em *EventMeter) Stop() {
  110. close(em.quit)
  111. if em.wsc.IsRunning() {
  112. em.wsc.Stop()
  113. }
  114. }
  115. // Subscribe for the given event type. Callback function will be called upon
  116. // receiving an event.
  117. func (em *EventMeter) Subscribe(eventType string, cb EventCallbackFunc) error {
  118. em.mtx.Lock()
  119. defer em.mtx.Unlock()
  120. if _, ok := em.events[eventType]; ok {
  121. return fmt.Errorf("subscribtion already exists")
  122. }
  123. if err := em.wsc.Subscribe(context.TODO(), eventType); err != nil {
  124. return err
  125. }
  126. metric := &EventMetric{
  127. meter: metrics.NewMeter(),
  128. callback: cb,
  129. }
  130. em.events[eventType] = metric
  131. return nil
  132. }
  133. // Unsubscribe from the given event type.
  134. func (em *EventMeter) Unsubscribe(eventType string) error {
  135. em.mtx.Lock()
  136. defer em.mtx.Unlock()
  137. if err := em.wsc.Unsubscribe(context.TODO(), eventType); err != nil {
  138. return err
  139. }
  140. // XXX: should we persist or save this info first?
  141. delete(em.events, eventType)
  142. return nil
  143. }
  144. // GetMetric fills in the latest data for an event and return a copy.
  145. func (em *EventMeter) GetMetric(eventType string) (*EventMetric, error) {
  146. em.mtx.Lock()
  147. defer em.mtx.Unlock()
  148. metric, ok := em.events[eventType]
  149. if !ok {
  150. return nil, fmt.Errorf("unknown event: %s", eventType)
  151. }
  152. return metric.fillMetric().Copy(), nil
  153. }
  154. // RegisterLatencyCallback allows you to set latency callback.
  155. func (em *EventMeter) RegisterLatencyCallback(f LatencyCallbackFunc) {
  156. em.mtx.Lock()
  157. defer em.mtx.Unlock()
  158. em.latencyCallback = f
  159. }
  160. // RegisterDisconnectCallback allows you to set disconnect callback.
  161. func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) {
  162. em.mtx.Lock()
  163. defer em.mtx.Unlock()
  164. em.disconnectCallback = f
  165. }
  166. ///////////////////////////////////////////////////////////////////////////////
  167. // Private
  168. func (em *EventMeter) subscribe() error {
  169. for eventType, _ := range em.events {
  170. if err := em.wsc.Subscribe(context.TODO(), eventType); err != nil {
  171. return err
  172. }
  173. }
  174. return nil
  175. }
  176. func (em *EventMeter) receiveRoutine() {
  177. latencyTicker := time.NewTicker(latencyPeriod)
  178. for {
  179. select {
  180. case rawEvent := <-em.wsc.ResultsCh:
  181. if rawEvent == nil {
  182. em.logger.Error("expected some event, got nil")
  183. continue
  184. }
  185. eventType, data, err := em.unmarshalEvent(rawEvent)
  186. if err != nil {
  187. em.logger.Error("failed to unmarshal event", "err", err)
  188. continue
  189. }
  190. if eventType != "" { // FIXME how can it be an empty string?
  191. em.updateMetric(eventType, data)
  192. }
  193. case err := <-em.wsc.ErrorsCh:
  194. if err != nil {
  195. em.logger.Error("expected some event, got error", "err", err)
  196. }
  197. case <-latencyTicker.C:
  198. if em.wsc.IsActive() {
  199. em.callLatencyCallback(em.wsc.PingPongLatencyTimer.Mean())
  200. }
  201. case <-em.wsc.Quit:
  202. return
  203. case <-em.quit:
  204. return
  205. }
  206. }
  207. }
  208. func (em *EventMeter) disconnectRoutine() {
  209. ticker := time.NewTicker(connectionCheckPeriod)
  210. for {
  211. select {
  212. case <-ticker.C:
  213. if em.wsc.IsReconnecting() && em.subscribed { // notify user about disconnect only once
  214. em.callDisconnectCallback()
  215. em.subscribed = false
  216. } else if !em.wsc.IsReconnecting() && !em.subscribed { // resubscribe
  217. em.subscribe()
  218. em.subscribed = true
  219. }
  220. case <-em.wsc.Quit:
  221. return
  222. case <-em.quit:
  223. return
  224. }
  225. }
  226. }
  227. func (em *EventMeter) updateMetric(eventType string, data events.EventData) {
  228. em.mtx.Lock()
  229. defer em.mtx.Unlock()
  230. metric, ok := em.events[eventType]
  231. if !ok {
  232. // we already unsubscribed, or got an unexpected event
  233. return
  234. }
  235. last := metric.LastHeard
  236. metric.LastHeard = time.Now()
  237. metric.meter.Mark(1)
  238. dur := int64(metric.LastHeard.Sub(last))
  239. if dur < metric.MinDuration {
  240. metric.MinDuration = dur
  241. }
  242. if !last.IsZero() && dur > metric.MaxDuration {
  243. metric.MaxDuration = dur
  244. }
  245. if metric.callback != nil {
  246. go metric.callback(metric.Copy(), data)
  247. }
  248. }
  249. func (em *EventMeter) callDisconnectCallback() {
  250. em.mtx.Lock()
  251. if em.disconnectCallback != nil {
  252. go em.disconnectCallback()
  253. }
  254. em.mtx.Unlock()
  255. }
  256. func (em *EventMeter) callLatencyCallback(meanLatencyNanoSeconds float64) {
  257. em.mtx.Lock()
  258. if em.latencyCallback != nil {
  259. go em.latencyCallback(meanLatencyNanoSeconds)
  260. }
  261. em.mtx.Unlock()
  262. }