You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

298 lines
7.3 KiB

// Package eventmeter is a generic system for subscribing to events and recording their frequency.
  2. package eventmeter
  3. import (
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "sync"
  8. "time"
  9. metrics "github.com/rcrowley/go-metrics"
  10. client "github.com/tendermint/tendermint/rpc/lib/client"
  11. "github.com/tendermint/tmlibs/events"
  12. "github.com/tendermint/tmlibs/log"
  13. )
  14. const (
  15. // Get ping/pong latency and call LatencyCallbackFunc with this period.
  16. latencyPeriod = 1 * time.Second
  17. // Check if the WS client is connected every
  18. connectionCheckPeriod = 100 * time.Millisecond
  19. )
// EventMetric exposes metrics for an event.
//
// The exported, json-tagged fields form the serialized view of the metric.
// The unexported meter is the live source of Count and the Rate* fields;
// those fields are only refreshed from it when fillMetric runs.
type EventMetric struct {
	// NOTE(review): ID is not assigned anywhere in this file; confirm who
	// fills it in.
	ID string `json:"id"`
	// NOTE(review): Started is not assigned anywhere in this file; confirm
	// who fills it in.
	Started time.Time `json:"start_time"`
	// LastHeard is when updateMetric last recorded an event for this query
	// (zero value until the first event).
	LastHeard time.Time `json:"last_heard"`
	// MinDuration and MaxDuration bound the gap between consecutive events,
	// stored as int64 nanoseconds (a converted time.Duration).
	MinDuration int64 `json:"min_duration"`
	MaxDuration int64 `json:"max_duration"`
	// tracks event count and rate
	meter metrics.Meter
	// filled in from the Meter by fillMetric; may be stale between calls
	Count int64 `json:"count"`
	// NOTE(review): the wire:"unsafe" tags presumably allow go-wire to
	// encode float fields; confirm against the go-wire version in use.
	Rate1    float64 `json:"rate_1" wire:"unsafe"`
	Rate5    float64 `json:"rate_5" wire:"unsafe"`
	Rate15   float64 `json:"rate_15" wire:"unsafe"`
	RateMean float64 `json:"rate_mean" wire:"unsafe"`
	// so the event can have effects in the eventmeter's consumer. It is
	// invoked in a new goroutine, with a copy of this metric, for every
	// recorded event.
	callback EventCallbackFunc
}
  39. func (metric *EventMetric) Copy() *EventMetric {
  40. metricCopy := *metric
  41. metricCopy.meter = metric.meter.Snapshot()
  42. return &metricCopy
  43. }
  44. // called on GetMetric
  45. func (metric *EventMetric) fillMetric() *EventMetric {
  46. metric.Count = metric.meter.Count()
  47. metric.Rate1 = metric.meter.Rate1()
  48. metric.Rate5 = metric.meter.Rate5()
  49. metric.Rate15 = metric.meter.Rate15()
  50. metric.RateMean = metric.meter.RateMean()
  51. return metric
  52. }
  53. // EventCallbackFunc is a closure to enable side effects from receiving an
  54. // event.
  55. type EventCallbackFunc func(em *EventMetric, data interface{})
  56. // EventUnmarshalFunc is a closure to get the query and data out of the raw
  57. // JSON received over the RPC WebSocket.
  58. type EventUnmarshalFunc func(b json.RawMessage) (string, events.EventData, error)
  59. // LatencyCallbackFunc is a closure to enable side effects from receiving a latency.
  60. type LatencyCallbackFunc func(meanLatencyNanoSeconds float64)
  61. // DisconnectCallbackFunc is a closure to notify a consumer that the connection
  62. // has died.
  63. type DisconnectCallbackFunc func()
// EventMeter tracks events, reports latency and disconnects.
//
// mtx is taken by Subscribe, Unsubscribe, GetMetric, the Register*Callback
// methods and updateMetric around their use of queryToMetricMap and the
// callbacks.
type EventMeter struct {
	// wsc is the RPC WebSocket client that events and latency come from.
	wsc *client.WSClient

	mtx sync.Mutex
	// queryToMetricMap maps each subscribed query to its metric.
	queryToMetricMap map[string]*EventMetric

	// unmarshalEvent extracts the query and event data from each response.
	unmarshalEvent     EventUnmarshalFunc
	latencyCallback    LatencyCallbackFunc
	disconnectCallback DisconnectCallbackFunc

	// subscribed tracks whether the queries are believed to be subscribed on
	// the current connection; disconnectRoutine uses it to fire the
	// disconnect callback only once and to resubscribe after reconnecting.
	// NOTE(review): read and written by Start and disconnectRoutine without
	// holding mtx.
	subscribed bool
	// quit is created by Start and closed by Stop to end the goroutines
	// started by Start.
	quit chan struct{}

	logger log.Logger
}
  76. func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter {
  77. return &EventMeter{
  78. wsc: client.NewWSClient(addr, "/websocket", client.PingPeriod(1*time.Second)),
  79. queryToMetricMap: make(map[string]*EventMetric),
  80. unmarshalEvent: unmarshalEvent,
  81. logger: log.NewNopLogger(),
  82. }
  83. }
  84. // SetLogger lets you set your own logger.
  85. func (em *EventMeter) SetLogger(l log.Logger) {
  86. em.logger = l
  87. em.wsc.SetLogger(l.With("module", "rpcclient"))
  88. }
  89. // String returns a string representation of event meter.
  90. func (em *EventMeter) String() string {
  91. return em.wsc.Address
  92. }
  93. // Start boots up event meter.
  94. func (em *EventMeter) Start() error {
  95. if err := em.wsc.Start(); err != nil {
  96. return err
  97. }
  98. em.quit = make(chan struct{})
  99. go em.receiveRoutine()
  100. go em.disconnectRoutine()
  101. err := em.subscribe()
  102. if err != nil {
  103. return err
  104. }
  105. em.subscribed = true
  106. return nil
  107. }
  108. // Stop stops event meter.
  109. func (em *EventMeter) Stop() {
  110. close(em.quit)
  111. if em.wsc.IsRunning() {
  112. em.wsc.Stop()
  113. }
  114. }
  115. // Subscribe for the given query. Callback function will be called upon
  116. // receiving an event.
  117. func (em *EventMeter) Subscribe(query string, cb EventCallbackFunc) error {
  118. em.mtx.Lock()
  119. defer em.mtx.Unlock()
  120. if err := em.wsc.Subscribe(context.TODO(), query); err != nil {
  121. return err
  122. }
  123. metric := &EventMetric{
  124. meter: metrics.NewMeter(),
  125. callback: cb,
  126. }
  127. em.queryToMetricMap[query] = metric
  128. return nil
  129. }
  130. // Unsubscribe from the given query.
  131. func (em *EventMeter) Unsubscribe(query string) error {
  132. em.mtx.Lock()
  133. defer em.mtx.Unlock()
  134. if err := em.wsc.Unsubscribe(context.TODO(), query); err != nil {
  135. return err
  136. }
  137. return nil
  138. }
  139. // GetMetric fills in the latest data for an query and return a copy.
  140. func (em *EventMeter) GetMetric(query string) (*EventMetric, error) {
  141. em.mtx.Lock()
  142. defer em.mtx.Unlock()
  143. metric, ok := em.queryToMetricMap[query]
  144. if !ok {
  145. return nil, fmt.Errorf("unknown query: %s", query)
  146. }
  147. return metric.fillMetric().Copy(), nil
  148. }
  149. // RegisterLatencyCallback allows you to set latency callback.
  150. func (em *EventMeter) RegisterLatencyCallback(f LatencyCallbackFunc) {
  151. em.mtx.Lock()
  152. defer em.mtx.Unlock()
  153. em.latencyCallback = f
  154. }
  155. // RegisterDisconnectCallback allows you to set disconnect callback.
  156. func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) {
  157. em.mtx.Lock()
  158. defer em.mtx.Unlock()
  159. em.disconnectCallback = f
  160. }
  161. ///////////////////////////////////////////////////////////////////////////////
  162. // Private
  163. func (em *EventMeter) subscribe() error {
  164. for query, _ := range em.queryToMetricMap {
  165. if err := em.wsc.Subscribe(context.TODO(), query); err != nil {
  166. return err
  167. }
  168. }
  169. return nil
  170. }
  171. func (em *EventMeter) receiveRoutine() {
  172. latencyTicker := time.NewTicker(latencyPeriod)
  173. for {
  174. select {
  175. case resp := <-em.wsc.ResponsesCh:
  176. if resp.Error != nil {
  177. em.logger.Error("expected some event, got error", "err", resp.Error.Error())
  178. continue
  179. }
  180. query, data, err := em.unmarshalEvent(resp.Result)
  181. if err != nil {
  182. em.logger.Error("failed to unmarshal event", "err", err)
  183. continue
  184. }
  185. if query != "" { // FIXME how can it be an empty string?
  186. em.updateMetric(query, data)
  187. }
  188. case <-latencyTicker.C:
  189. if em.wsc.IsActive() {
  190. em.callLatencyCallback(em.wsc.PingPongLatencyTimer.Mean())
  191. }
  192. case <-em.wsc.Quit():
  193. return
  194. case <-em.quit:
  195. return
  196. }
  197. }
  198. }
  199. func (em *EventMeter) disconnectRoutine() {
  200. ticker := time.NewTicker(connectionCheckPeriod)
  201. for {
  202. select {
  203. case <-ticker.C:
  204. if em.wsc.IsReconnecting() && em.subscribed { // notify user about disconnect only once
  205. em.callDisconnectCallback()
  206. em.subscribed = false
  207. } else if !em.wsc.IsReconnecting() && !em.subscribed { // resubscribe
  208. em.subscribe()
  209. em.subscribed = true
  210. }
  211. case <-em.wsc.Quit():
  212. return
  213. case <-em.quit:
  214. return
  215. }
  216. }
  217. }
  218. func (em *EventMeter) updateMetric(query string, data events.EventData) {
  219. em.mtx.Lock()
  220. defer em.mtx.Unlock()
  221. metric, ok := em.queryToMetricMap[query]
  222. if !ok {
  223. // we already unsubscribed, or got an unexpected query
  224. return
  225. }
  226. last := metric.LastHeard
  227. metric.LastHeard = time.Now()
  228. metric.meter.Mark(1)
  229. dur := int64(metric.LastHeard.Sub(last))
  230. if dur < metric.MinDuration {
  231. metric.MinDuration = dur
  232. }
  233. if !last.IsZero() && dur > metric.MaxDuration {
  234. metric.MaxDuration = dur
  235. }
  236. if metric.callback != nil {
  237. go metric.callback(metric.Copy(), data)
  238. }
  239. }
  240. func (em *EventMeter) callDisconnectCallback() {
  241. em.mtx.Lock()
  242. if em.disconnectCallback != nil {
  243. go em.disconnectCallback()
  244. }
  245. em.mtx.Unlock()
  246. }
  247. func (em *EventMeter) callLatencyCallback(meanLatencyNanoSeconds float64) {
  248. em.mtx.Lock()
  249. if em.latencyCallback != nil {
  250. go em.latencyCallback(meanLatencyNanoSeconds)
  251. }
  252. em.mtx.Unlock()
  253. }