You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

300 lines
7.3 KiB

  1. package eventmeter
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/gorilla/websocket"
  8. metrics "github.com/rcrowley/go-metrics"
  9. events "github.com/tendermint/go-events"
  10. client "github.com/tendermint/go-rpc/client"
  11. log15 "github.com/tendermint/log15"
  12. )
  13. // Log allows you to set your own logger.
  14. var Log log15.Logger
  15. //------------------------------------------------------
  16. // Generic system to subscribe to events and record their frequency
  17. //------------------------------------------------------
  18. //------------------------------------------------------
  19. // Meter for a particular event
  20. // Closure to enable side effects from receiving an event
  21. type EventCallbackFunc func(em *EventMetric, data events.EventData)
  22. // Metrics for a given event
  23. type EventMetric struct {
  24. ID string `json:"id"`
  25. Started time.Time `json:"start_time"`
  26. LastHeard time.Time `json:"last_heard"`
  27. MinDuration int64 `json:"min_duration"`
  28. MaxDuration int64 `json:"max_duration"`
  29. // tracks event count and rate
  30. meter metrics.Meter
  31. // filled in from the Meter
  32. Count int64 `json:"count"`
  33. Rate1 float64 `json:"rate_1" wire:"unsafe"`
  34. Rate5 float64 `json:"rate_5" wire:"unsafe"`
  35. Rate15 float64 `json:"rate_15" wire:"unsafe"`
  36. RateMean float64 `json:"rate_mean" wire:"unsafe"`
  37. // so the event can have effects in the event-meter's consumer.
  38. // runs in a go routine
  39. callback EventCallbackFunc
  40. }
  41. func (metric *EventMetric) Copy() *EventMetric {
  42. metric2 := *metric
  43. metric2.meter = metric.meter.Snapshot()
  44. return &metric2
  45. }
  46. // called on GetMetric
  47. func (metric *EventMetric) fillMetric() *EventMetric {
  48. metric.Count = metric.meter.Count()
  49. metric.Rate1 = metric.meter.Rate1()
  50. metric.Rate5 = metric.meter.Rate5()
  51. metric.Rate15 = metric.meter.Rate15()
  52. metric.RateMean = metric.meter.RateMean()
  53. return metric
  54. }
  //------------------------------------------------------
  // Websocket client and event meter for many events

  // maxPingsPerPong is how many ping ticks may pass without a pong before
  // receiveRoutine gives up on the connection and kills it.
  const (
  	maxPingsPerPong = 30
  )
  58. // Get the eventID and data out of the raw json received over the go-rpc websocket
  59. type EventUnmarshalFunc func(b json.RawMessage) (string, events.EventData, error)
  60. // Closure to enable side effects from receiving a pong
  61. type LatencyCallbackFunc func(meanLatencyNanoSeconds float64)
  62. // Closure to notify consumer that the connection died
  63. type DisconnectCallbackFunc func()
  64. // Each node gets an event meter to track events for that node
  65. type EventMeter struct {
  66. wsc *client.WSClient
  67. mtx sync.Mutex
  68. events map[string]*EventMetric
  69. // to record ws latency
  70. timer metrics.Timer
  71. lastPing time.Time
  72. receivedPong bool
  73. latencyCallback LatencyCallbackFunc
  74. disconnectCallback DisconnectCallbackFunc
  75. unmarshalEvent EventUnmarshalFunc
  76. quit chan struct{}
  77. }
  78. func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter {
  79. em := &EventMeter{
  80. wsc: client.NewWSClient(addr, "/websocket"),
  81. events: make(map[string]*EventMetric),
  82. timer: metrics.NewTimer(),
  83. receivedPong: true,
  84. unmarshalEvent: unmarshalEvent,
  85. quit: make(chan struct{}),
  86. }
  87. return em
  88. }
  89. func (em *EventMeter) String() string {
  90. return em.wsc.Address
  91. }
  92. func (em *EventMeter) Start() error {
  93. if _, err := em.wsc.Start(); err != nil {
  94. return err
  95. }
  96. em.wsc.Conn.SetPongHandler(func(m string) error {
  97. // NOTE: https://github.com/gorilla/websocket/issues/97
  98. em.mtx.Lock()
  99. defer em.mtx.Unlock()
  100. em.receivedPong = true
  101. em.timer.UpdateSince(em.lastPing)
  102. if em.latencyCallback != nil {
  103. go em.latencyCallback(em.timer.Mean())
  104. }
  105. return nil
  106. })
  107. go em.receiveRoutine()
  108. return nil
  109. }
  110. func (em *EventMeter) Stop() {
  111. <-em.quit
  112. em.RegisterDisconnectCallback(nil) // so we don't try and reconnect
  113. em.wsc.Stop() // close(wsc.Quit)
  114. }
  115. func (em *EventMeter) StopAndReconnect() {
  116. em.wsc.Stop()
  117. em.mtx.Lock()
  118. defer em.mtx.Unlock()
  119. if em.disconnectCallback != nil {
  120. go em.disconnectCallback()
  121. }
  122. }
  123. func (em *EventMeter) Subscribe(eventID string, cb EventCallbackFunc) error {
  124. em.mtx.Lock()
  125. defer em.mtx.Unlock()
  126. if _, ok := em.events[eventID]; ok {
  127. return fmt.Errorf("subscribtion already exists")
  128. }
  129. if err := em.wsc.Subscribe(eventID); err != nil {
  130. return err
  131. }
  132. metric := &EventMetric{
  133. ID: eventID,
  134. Started: time.Now(),
  135. MinDuration: 1 << 62,
  136. meter: metrics.NewMeter(),
  137. callback: cb,
  138. }
  139. em.events[eventID] = metric
  140. return nil
  141. }
  142. func (em *EventMeter) Unsubscribe(eventID string) error {
  143. em.mtx.Lock()
  144. defer em.mtx.Unlock()
  145. if err := em.wsc.Unsubscribe(eventID); err != nil {
  146. return err
  147. }
  148. // XXX: should we persist or save this info first?
  149. delete(em.events, eventID)
  150. return nil
  151. }
  152. // Fill in the latest data for an event and return a copy
  153. func (em *EventMeter) GetMetric(eventID string) (*EventMetric, error) {
  154. em.mtx.Lock()
  155. defer em.mtx.Unlock()
  156. metric, ok := em.events[eventID]
  157. if !ok {
  158. return nil, fmt.Errorf("Unknown event %s", eventID)
  159. }
  160. return metric.fillMetric().Copy(), nil
  161. }
  162. // Return the average latency over the websocket
  163. func (em *EventMeter) Latency() float64 {
  164. em.mtx.Lock()
  165. defer em.mtx.Unlock()
  166. return em.timer.Mean()
  167. }
  168. func (em *EventMeter) RegisterLatencyCallback(f LatencyCallbackFunc) {
  169. em.mtx.Lock()
  170. defer em.mtx.Unlock()
  171. em.latencyCallback = f
  172. }
  173. func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) {
  174. em.mtx.Lock()
  175. defer em.mtx.Unlock()
  176. em.disconnectCallback = f
  177. }
  178. //------------------------------------------------------
  179. func (em *EventMeter) receiveRoutine() {
  180. pingTime := time.Second * 1
  181. pingTicker := time.NewTicker(pingTime)
  182. pingAttempts := 0 // if this hits maxPingsPerPong we kill the conn
  183. var err error
  184. for {
  185. select {
  186. case <-pingTicker.C:
  187. if pingAttempts, err = em.pingForLatency(pingAttempts); err != nil {
  188. Log.Error("Failed to write ping message on websocket", err)
  189. em.StopAndReconnect()
  190. return
  191. } else if pingAttempts >= maxPingsPerPong {
  192. Log.Error(fmt.Sprintf("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime))
  193. em.StopAndReconnect()
  194. return
  195. }
  196. case r := <-em.wsc.ResultsCh:
  197. if r == nil {
  198. em.StopAndReconnect()
  199. return
  200. }
  201. eventID, data, err := em.unmarshalEvent(r)
  202. if err != nil {
  203. Log.Error(err.Error())
  204. continue
  205. }
  206. if eventID != "" {
  207. em.updateMetric(eventID, data)
  208. }
  209. case <-em.wsc.Quit:
  210. em.StopAndReconnect()
  211. return
  212. case <-em.quit:
  213. return
  214. }
  215. }
  216. }
  217. func (em *EventMeter) pingForLatency(pingAttempts int) (int, error) {
  218. em.mtx.Lock()
  219. defer em.mtx.Unlock()
  220. // ping to record latency
  221. if !em.receivedPong {
  222. return pingAttempts + 1, nil
  223. }
  224. em.lastPing = time.Now()
  225. em.receivedPong = false
  226. err := em.wsc.Conn.WriteMessage(websocket.PingMessage, []byte{})
  227. if err != nil {
  228. return pingAttempts, err
  229. }
  230. return 0, nil
  231. }
  232. func (em *EventMeter) updateMetric(eventID string, data events.EventData) {
  233. em.mtx.Lock()
  234. defer em.mtx.Unlock()
  235. metric, ok := em.events[eventID]
  236. if !ok {
  237. // we already unsubscribed, or got an unexpected event
  238. return
  239. }
  240. last := metric.LastHeard
  241. metric.LastHeard = time.Now()
  242. metric.meter.Mark(1)
  243. dur := int64(metric.LastHeard.Sub(last))
  244. if dur < metric.MinDuration {
  245. metric.MinDuration = dur
  246. }
  247. if !last.IsZero() && dur > metric.MaxDuration {
  248. metric.MaxDuration = dur
  249. }
  250. if metric.callback != nil {
  251. go metric.callback(metric.Copy(), data)
  252. }
  253. }