You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

296 lines
7.3 KiB

  1. // eventmeter - generic system to subscribe to events and record their frequency.
  2. package eventmeter
  3. import (
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "sync"
  8. "time"
  9. metrics "github.com/rcrowley/go-metrics"
  10. "github.com/tendermint/tendermint/libs/events"
  11. "github.com/tendermint/tendermint/libs/log"
  12. client "github.com/tendermint/tendermint/rpc/lib/client"
  13. )
  14. const (
  15. // Get ping/pong latency and call LatencyCallbackFunc with this period.
  16. latencyPeriod = 1 * time.Second
  17. // Check if the WS client is connected every
  18. connectionCheckPeriod = 100 * time.Millisecond
  19. )
  20. // EventMetric exposes metrics for an event.
  21. type EventMetric struct {
  22. ID string `json:"id"`
  23. Started time.Time `json:"start_time"`
  24. LastHeard time.Time `json:"last_heard"`
  25. MinDuration int64 `json:"min_duration"`
  26. MaxDuration int64 `json:"max_duration"`
  27. // tracks event count and rate
  28. meter metrics.Meter
  29. // filled in from the Meter
  30. Count int64 `json:"count"`
  31. Rate1 float64 `json:"rate_1" amino:"unsafe"`
  32. Rate5 float64 `json:"rate_5" amino:"unsafe"`
  33. Rate15 float64 `json:"rate_15" amino:"unsafe"`
  34. RateMean float64 `json:"rate_mean" amino:"unsafe"`
  35. // so the event can have effects in the eventmeter's consumer. runs in a go
  36. // routine.
  37. callback EventCallbackFunc
  38. }
  39. func (metric *EventMetric) Copy() *EventMetric {
  40. metricCopy := *metric
  41. metricCopy.meter = metric.meter.Snapshot()
  42. return &metricCopy
  43. }
  44. // called on GetMetric
  45. func (metric *EventMetric) fillMetric() *EventMetric {
  46. metric.Count = metric.meter.Count()
  47. metric.Rate1 = metric.meter.Rate1()
  48. metric.Rate5 = metric.meter.Rate5()
  49. metric.Rate15 = metric.meter.Rate15()
  50. metric.RateMean = metric.meter.RateMean()
  51. return metric
  52. }
  53. // EventCallbackFunc is a closure to enable side effects from receiving an
  54. // event.
  55. type EventCallbackFunc func(em *EventMetric, data interface{})
  56. // EventUnmarshalFunc is a closure to get the query and data out of the raw
  57. // JSON received over the RPC WebSocket.
  58. type EventUnmarshalFunc func(b json.RawMessage) (string, events.EventData, error)
  59. // LatencyCallbackFunc is a closure to enable side effects from receiving a latency.
  60. type LatencyCallbackFunc func(meanLatencyNanoSeconds float64)
  61. // DisconnectCallbackFunc is a closure to notify a consumer that the connection
  62. // has died.
  63. type DisconnectCallbackFunc func()
  64. // EventMeter tracks events, reports latency and disconnects.
  65. type EventMeter struct {
  66. wsc *client.WSClient
  67. mtx sync.Mutex
  68. queryToMetricMap map[string]*EventMetric
  69. unmarshalEvent EventUnmarshalFunc
  70. latencyCallback LatencyCallbackFunc
  71. disconnectCallback DisconnectCallbackFunc
  72. subscribed bool
  73. quit chan struct{}
  74. logger log.Logger
  75. }
  76. func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter {
  77. return &EventMeter{
  78. wsc: client.NewWSClient(addr, "/websocket", client.PingPeriod(1*time.Second)),
  79. queryToMetricMap: make(map[string]*EventMetric),
  80. unmarshalEvent: unmarshalEvent,
  81. logger: log.NewNopLogger(),
  82. }
  83. }
  84. // SetLogger lets you set your own logger.
  85. func (em *EventMeter) SetLogger(l log.Logger) {
  86. em.logger = l
  87. em.wsc.SetLogger(l.With("module", "rpcclient"))
  88. }
  89. // String returns a string representation of event meter.
  90. func (em *EventMeter) String() string {
  91. return em.wsc.Address
  92. }
  93. // Start boots up event meter.
  94. func (em *EventMeter) Start() error {
  95. if err := em.wsc.Start(); err != nil {
  96. return err
  97. }
  98. em.quit = make(chan struct{})
  99. go em.receiveRoutine()
  100. go em.disconnectRoutine()
  101. err := em.subscribe()
  102. if err != nil {
  103. return err
  104. }
  105. em.subscribed = true
  106. return nil
  107. }
  108. // Stop stops event meter.
  109. func (em *EventMeter) Stop() {
  110. close(em.quit)
  111. if em.wsc.IsRunning() {
  112. em.wsc.Stop()
  113. }
  114. }
  115. // Subscribe for the given query. Callback function will be called upon
  116. // receiving an event.
  117. func (em *EventMeter) Subscribe(query string, cb EventCallbackFunc) error {
  118. em.mtx.Lock()
  119. defer em.mtx.Unlock()
  120. if err := em.wsc.Subscribe(context.TODO(), query); err != nil {
  121. return err
  122. }
  123. metric := &EventMetric{
  124. meter: metrics.NewMeter(),
  125. callback: cb,
  126. }
  127. em.queryToMetricMap[query] = metric
  128. return nil
  129. }
  130. // Unsubscribe from the given query.
  131. func (em *EventMeter) Unsubscribe(query string) error {
  132. em.mtx.Lock()
  133. defer em.mtx.Unlock()
  134. return em.wsc.Unsubscribe(context.TODO(), query)
  135. }
  136. // GetMetric fills in the latest data for an query and return a copy.
  137. func (em *EventMeter) GetMetric(query string) (*EventMetric, error) {
  138. em.mtx.Lock()
  139. defer em.mtx.Unlock()
  140. metric, ok := em.queryToMetricMap[query]
  141. if !ok {
  142. return nil, fmt.Errorf("unknown query: %s", query)
  143. }
  144. return metric.fillMetric().Copy(), nil
  145. }
  146. // RegisterLatencyCallback allows you to set latency callback.
  147. func (em *EventMeter) RegisterLatencyCallback(f LatencyCallbackFunc) {
  148. em.mtx.Lock()
  149. defer em.mtx.Unlock()
  150. em.latencyCallback = f
  151. }
  152. // RegisterDisconnectCallback allows you to set disconnect callback.
  153. func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) {
  154. em.mtx.Lock()
  155. defer em.mtx.Unlock()
  156. em.disconnectCallback = f
  157. }
  158. ///////////////////////////////////////////////////////////////////////////////
  159. // Private
  160. func (em *EventMeter) subscribe() error {
  161. for query, _ := range em.queryToMetricMap {
  162. if err := em.wsc.Subscribe(context.TODO(), query); err != nil {
  163. return err
  164. }
  165. }
  166. return nil
  167. }
  168. func (em *EventMeter) receiveRoutine() {
  169. latencyTicker := time.NewTicker(latencyPeriod)
  170. for {
  171. select {
  172. case resp := <-em.wsc.ResponsesCh:
  173. if resp.Error != nil {
  174. em.logger.Error("expected some event, got error", "err", resp.Error.Error())
  175. continue
  176. }
  177. query, data, err := em.unmarshalEvent(resp.Result)
  178. if err != nil {
  179. em.logger.Error("failed to unmarshal event", "err", err)
  180. continue
  181. }
  182. if query != "" { // FIXME how can it be an empty string?
  183. em.updateMetric(query, data)
  184. }
  185. case <-latencyTicker.C:
  186. if em.wsc.IsActive() {
  187. em.callLatencyCallback(em.wsc.PingPongLatencyTimer.Mean())
  188. }
  189. case <-em.wsc.Quit():
  190. return
  191. case <-em.quit:
  192. return
  193. }
  194. }
  195. }
  196. func (em *EventMeter) disconnectRoutine() {
  197. ticker := time.NewTicker(connectionCheckPeriod)
  198. for {
  199. select {
  200. case <-ticker.C:
  201. if em.wsc.IsReconnecting() && em.subscribed { // notify user about disconnect only once
  202. em.callDisconnectCallback()
  203. em.subscribed = false
  204. } else if !em.wsc.IsReconnecting() && !em.subscribed { // resubscribe
  205. em.subscribe()
  206. em.subscribed = true
  207. }
  208. case <-em.wsc.Quit():
  209. return
  210. case <-em.quit:
  211. return
  212. }
  213. }
  214. }
  215. func (em *EventMeter) updateMetric(query string, data events.EventData) {
  216. em.mtx.Lock()
  217. defer em.mtx.Unlock()
  218. metric, ok := em.queryToMetricMap[query]
  219. if !ok {
  220. // we already unsubscribed, or got an unexpected query
  221. return
  222. }
  223. last := metric.LastHeard
  224. metric.LastHeard = time.Now()
  225. metric.meter.Mark(1)
  226. dur := int64(metric.LastHeard.Sub(last))
  227. if dur < metric.MinDuration {
  228. metric.MinDuration = dur
  229. }
  230. if !last.IsZero() && dur > metric.MaxDuration {
  231. metric.MaxDuration = dur
  232. }
  233. if metric.callback != nil {
  234. go metric.callback(metric.Copy(), data)
  235. }
  236. }
  237. func (em *EventMeter) callDisconnectCallback() {
  238. em.mtx.Lock()
  239. if em.disconnectCallback != nil {
  240. go em.disconnectCallback()
  241. }
  242. em.mtx.Unlock()
  243. }
  244. func (em *EventMeter) callLatencyCallback(meanLatencyNanoSeconds float64) {
  245. em.mtx.Lock()
  246. if em.latencyCallback != nil {
  247. go em.latencyCallback(meanLatencyNanoSeconds)
  248. }
  249. em.mtx.Unlock()
  250. }