You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

330 lines
8.1 KiB

  1. package eventmeter
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/gorilla/websocket"
  8. "github.com/pkg/errors"
  9. metrics "github.com/rcrowley/go-metrics"
  10. client "github.com/tendermint/tendermint/rpc/lib/client"
  11. "github.com/tendermint/tmlibs/events"
  12. "github.com/tendermint/tmlibs/log"
  13. )
  14. //------------------------------------------------------
  15. // Generic system to subscribe to events and record their frequency
  16. //------------------------------------------------------
  17. //------------------------------------------------------
  18. // Meter for a particular event
  19. // Closure to enable side effects from receiving an event
  20. type EventCallbackFunc func(em *EventMetric, data interface{})
  21. // Metrics for a given event
  22. type EventMetric struct {
  23. ID string `json:"id"`
  24. Started time.Time `json:"start_time"`
  25. LastHeard time.Time `json:"last_heard"`
  26. MinDuration int64 `json:"min_duration"`
  27. MaxDuration int64 `json:"max_duration"`
  28. // tracks event count and rate
  29. meter metrics.Meter
  30. // filled in from the Meter
  31. Count int64 `json:"count"`
  32. Rate1 float64 `json:"rate_1" wire:"unsafe"`
  33. Rate5 float64 `json:"rate_5" wire:"unsafe"`
  34. Rate15 float64 `json:"rate_15" wire:"unsafe"`
  35. RateMean float64 `json:"rate_mean" wire:"unsafe"`
  36. // so the event can have effects in the event-meter's consumer.
  37. // runs in a go routine
  38. callback EventCallbackFunc
  39. }
  40. func (metric *EventMetric) Copy() *EventMetric {
  41. metricCopy := *metric
  42. metricCopy.meter = metric.meter.Snapshot()
  43. return &metricCopy
  44. }
  45. // called on GetMetric
  46. func (metric *EventMetric) fillMetric() *EventMetric {
  47. metric.Count = metric.meter.Count()
  48. metric.Rate1 = metric.meter.Rate1()
  49. metric.Rate5 = metric.meter.Rate5()
  50. metric.Rate15 = metric.meter.Rate15()
  51. metric.RateMean = metric.meter.RateMean()
  52. return metric
  53. }
  54. //------------------------------------------------------
  55. // Websocket client and event meter for many events
  56. const maxPingsPerPong = 30 // if we haven't received a pong in this many attempted pings we kill the conn
  57. // Get the eventID and data out of the raw json received over the go-rpc websocket
  58. type EventUnmarshalFunc func(b json.RawMessage) (string, events.EventData, error)
  59. // Closure to enable side effects from receiving a pong
  60. type LatencyCallbackFunc func(meanLatencyNanoSeconds float64)
  61. // Closure to notify consumer that the connection died
  62. type DisconnectCallbackFunc func()
  63. // Each node gets an event meter to track events for that node
  64. type EventMeter struct {
  65. wsc *client.WSClient
  66. mtx sync.Mutex
  67. events map[string]*EventMetric
  68. // to record ws latency
  69. timer metrics.Timer
  70. lastPing time.Time
  71. receivedPong bool
  72. latencyCallback LatencyCallbackFunc
  73. disconnectCallback DisconnectCallbackFunc
  74. unmarshalEvent EventUnmarshalFunc
  75. quit chan struct{}
  76. logger log.Logger
  77. }
  78. func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter {
  79. em := &EventMeter{
  80. wsc: client.NewWSClient(addr, "/websocket"),
  81. events: make(map[string]*EventMetric),
  82. timer: metrics.NewTimer(),
  83. receivedPong: true,
  84. unmarshalEvent: unmarshalEvent,
  85. logger: log.NewNopLogger(),
  86. }
  87. return em
  88. }
  89. // SetLogger lets you set your own logger
  90. func (em *EventMeter) SetLogger(l log.Logger) {
  91. em.logger = l
  92. }
  93. func (em *EventMeter) String() string {
  94. return em.wsc.Address
  95. }
  96. func (em *EventMeter) Start() error {
  97. if _, err := em.wsc.Reset(); err != nil {
  98. return err
  99. }
  100. if _, err := em.wsc.Start(); err != nil {
  101. return err
  102. }
  103. em.wsc.Conn.SetPongHandler(func(m string) error {
  104. // NOTE: https://github.com/gorilla/websocket/issues/97
  105. em.mtx.Lock()
  106. defer em.mtx.Unlock()
  107. em.receivedPong = true
  108. em.timer.UpdateSince(em.lastPing)
  109. if em.latencyCallback != nil {
  110. go em.latencyCallback(em.timer.Mean())
  111. }
  112. return nil
  113. })
  114. em.quit = make(chan struct{})
  115. go em.receiveRoutine()
  116. return em.resubscribe()
  117. }
  118. // Stop stops the EventMeter.
  119. func (em *EventMeter) Stop() {
  120. close(em.quit)
  121. if em.wsc.IsRunning() {
  122. em.wsc.Stop()
  123. }
  124. }
  125. // StopAndCallDisconnectCallback stops the EventMeter and calls
  126. // disconnectCallback if present.
  127. func (em *EventMeter) StopAndCallDisconnectCallback() {
  128. if em.wsc.IsRunning() {
  129. em.wsc.Stop()
  130. }
  131. em.mtx.Lock()
  132. defer em.mtx.Unlock()
  133. if em.disconnectCallback != nil {
  134. go em.disconnectCallback()
  135. }
  136. }
  137. func (em *EventMeter) Subscribe(eventID string, cb EventCallbackFunc) error {
  138. em.mtx.Lock()
  139. defer em.mtx.Unlock()
  140. if _, ok := em.events[eventID]; ok {
  141. return fmt.Errorf("subscribtion already exists")
  142. }
  143. if err := em.wsc.Subscribe(eventID); err != nil {
  144. return err
  145. }
  146. metric := &EventMetric{
  147. ID: eventID,
  148. Started: time.Now(),
  149. MinDuration: 1 << 62,
  150. meter: metrics.NewMeter(),
  151. callback: cb,
  152. }
  153. em.events[eventID] = metric
  154. return nil
  155. }
  156. func (em *EventMeter) Unsubscribe(eventID string) error {
  157. em.mtx.Lock()
  158. defer em.mtx.Unlock()
  159. if err := em.wsc.Unsubscribe(eventID); err != nil {
  160. return err
  161. }
  162. // XXX: should we persist or save this info first?
  163. delete(em.events, eventID)
  164. return nil
  165. }
  166. // Fill in the latest data for an event and return a copy
  167. func (em *EventMeter) GetMetric(eventID string) (*EventMetric, error) {
  168. em.mtx.Lock()
  169. defer em.mtx.Unlock()
  170. metric, ok := em.events[eventID]
  171. if !ok {
  172. return nil, fmt.Errorf("Unknown event %s", eventID)
  173. }
  174. return metric.fillMetric().Copy(), nil
  175. }
  176. // Return the average latency over the websocket
  177. func (em *EventMeter) Latency() float64 {
  178. em.mtx.Lock()
  179. defer em.mtx.Unlock()
  180. return em.timer.Mean()
  181. }
  182. func (em *EventMeter) RegisterLatencyCallback(f LatencyCallbackFunc) {
  183. em.mtx.Lock()
  184. defer em.mtx.Unlock()
  185. em.latencyCallback = f
  186. }
  187. func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) {
  188. em.mtx.Lock()
  189. defer em.mtx.Unlock()
  190. em.disconnectCallback = f
  191. }
  192. //------------------------------------------------------
  193. func (em *EventMeter) resubscribe() error {
  194. for eventID, _ := range em.events {
  195. if err := em.wsc.Subscribe(eventID); err != nil {
  196. return err
  197. }
  198. }
  199. return nil
  200. }
  201. func (em *EventMeter) receiveRoutine() {
  202. pingTime := time.Second * 1
  203. pingTicker := time.NewTicker(pingTime)
  204. pingAttempts := 0 // if this hits maxPingsPerPong we kill the conn
  205. var err error
  206. for {
  207. select {
  208. case <-pingTicker.C:
  209. if pingAttempts, err = em.pingForLatency(pingAttempts); err != nil {
  210. em.logger.Error("err", errors.Wrap(err, "failed to write ping message on websocket"))
  211. em.StopAndCallDisconnectCallback()
  212. return
  213. } else if pingAttempts >= maxPingsPerPong {
  214. em.logger.Error("err", errors.Errorf("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime))
  215. em.StopAndCallDisconnectCallback()
  216. return
  217. }
  218. case r := <-em.wsc.ResultsCh:
  219. if r == nil {
  220. em.logger.Error("err", errors.New("Expected some event, received nil"))
  221. em.StopAndCallDisconnectCallback()
  222. return
  223. }
  224. eventID, data, err := em.unmarshalEvent(r)
  225. if err != nil {
  226. em.logger.Error("err", errors.Wrap(err, "failed to unmarshal event"))
  227. continue
  228. }
  229. if eventID != "" {
  230. em.updateMetric(eventID, data)
  231. }
  232. case <-em.wsc.Quit:
  233. em.logger.Error("err", errors.New("WSClient closed unexpectedly"))
  234. em.StopAndCallDisconnectCallback()
  235. return
  236. case <-em.quit:
  237. return
  238. }
  239. }
  240. }
  241. func (em *EventMeter) pingForLatency(pingAttempts int) (int, error) {
  242. em.mtx.Lock()
  243. defer em.mtx.Unlock()
  244. // ping to record latency
  245. if !em.receivedPong {
  246. return pingAttempts + 1, nil
  247. }
  248. em.lastPing = time.Now()
  249. em.receivedPong = false
  250. err := em.wsc.Conn.WriteMessage(websocket.PingMessage, []byte{})
  251. if err != nil {
  252. return pingAttempts, err
  253. }
  254. return 0, nil
  255. }
  256. func (em *EventMeter) updateMetric(eventID string, data events.EventData) {
  257. em.mtx.Lock()
  258. defer em.mtx.Unlock()
  259. metric, ok := em.events[eventID]
  260. if !ok {
  261. // we already unsubscribed, or got an unexpected event
  262. return
  263. }
  264. last := metric.LastHeard
  265. metric.LastHeard = time.Now()
  266. metric.meter.Mark(1)
  267. dur := int64(metric.LastHeard.Sub(last))
  268. if dur < metric.MinDuration {
  269. metric.MinDuration = dur
  270. }
  271. if !last.IsZero() && dur > metric.MaxDuration {
  272. metric.MaxDuration = dur
  273. }
  274. if metric.callback != nil {
  275. go metric.callback(metric.Copy(), data)
  276. }
  277. }