You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

225 lines
5.0 KiB

  1. package main
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "os"
  7. "reflect"
  8. "sync"
  9. "time"
  10. . "github.com/tendermint/go-common"
  11. "github.com/tendermint/go-crypto"
  12. // register rpc and event types with go-
  13. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  14. // "github.com/tendermint/tendermint/types"
  15. client "github.com/tendermint/tendermint/rpc/client"
  16. "github.com/gorilla/websocket"
  17. "github.com/rcrowley/go-metrics"
  18. )
  19. //------------------------------------------------------
  20. // Connect to all validators for a blockchain
  21. type Blockchain struct {
  22. ID string
  23. Validators []Validator
  24. }
  25. type Validator struct {
  26. ID string
  27. PubKey crypto.PubKey
  28. IP string
  29. Port int
  30. }
  31. //------------------------------------------------------
  32. // Generic system to subscribe to events and record their frequency
  33. // Metrics for a given event
  34. type EventMetric struct {
  35. ID string `json:"id"`
  36. Started time.Time `json:"start_time"`
  37. LastHeard time.Time `json:"last_heard"`
  38. MinDuration int64 `json:"min_duration"`
  39. MaxDuration int64 `json:"max_duration"`
  40. // tracks event count and rate
  41. meter metrics.Meter
  42. // filled in from the Meter
  43. Count int64 `json:"count"`
  44. Rate1 float64 `json:"rate_1"`
  45. Rate5 float64 `json:"rate_5"`
  46. Rate15 float64 `json:"rate_15"`
  47. RateMean float64 `json:"rate_mean"`
  48. // XXX: move this
  49. // latency for node itself (not related to event)
  50. Latency float64 `json:"latency_mean"`
  51. }
  52. // Each node gets an event meter to track events for that node
  53. type EventMeter struct {
  54. QuitService
  55. wsc *client.WSClient
  56. mtx sync.Mutex
  57. events map[string]*EventMetric
  58. // to record latency
  59. timer metrics.Timer
  60. lastPing time.Time
  61. receivedPong bool
  62. }
  63. func NewEventMeter(addr string) *EventMeter {
  64. em := &EventMeter{
  65. wsc: client.NewWSClient(addr),
  66. events: make(map[string]*EventMetric),
  67. timer: metrics.NewTimer(),
  68. receivedPong: true,
  69. }
  70. em.QuitService = *NewQuitService(nil, "EventMeter", em)
  71. return em
  72. }
  73. func (em *EventMeter) OnStart() error {
  74. em.QuitService.OnStart()
  75. if err := em.wsc.OnStart(); err != nil {
  76. return err
  77. }
  78. em.wsc.Conn.SetPongHandler(func(m string) error {
  79. // NOTE: https://github.com/gorilla/websocket/issues/97
  80. em.mtx.Lock()
  81. defer em.mtx.Unlock()
  82. em.receivedPong = true
  83. em.timer.UpdateSince(em.lastPing)
  84. return nil
  85. })
  86. go em.receiveRoutine()
  87. return nil
  88. }
  89. func (em *EventMeter) OnStop() {
  90. em.wsc.OnStop()
  91. em.QuitService.OnStop()
  92. }
  93. func (em *EventMeter) Subscribe(eventid string) error {
  94. em.mtx.Lock()
  95. defer em.mtx.Unlock()
  96. if _, ok := em.events[eventid]; ok {
  97. return fmt.Errorf("Subscription already exists")
  98. }
  99. if err := em.wsc.Subscribe(eventid); err != nil {
  100. return err
  101. }
  102. em.events[eventid] = &EventMetric{
  103. Started: time.Now(),
  104. MinDuration: 1 << 62,
  105. meter: metrics.NewMeter(),
  106. }
  107. return nil
  108. }
  109. func (em *EventMeter) Unsubscribe(eventid string) error {
  110. em.mtx.Lock()
  111. defer em.mtx.Unlock()
  112. if err := em.wsc.Unsubscribe(eventid); err != nil {
  113. return err
  114. }
  115. // XXX: should we persist or save this info first?
  116. delete(em.events, eventid)
  117. return nil
  118. }
  119. //------------------------------------------------------
  120. func (em *EventMeter) receiveRoutine() {
  121. logTicker := time.NewTicker(time.Second * 3)
  122. pingTicker := time.NewTicker(time.Second * 1)
  123. for {
  124. select {
  125. case <-logTicker.C:
  126. em.mtx.Lock()
  127. for _, metric := range em.events {
  128. metric.Count = metric.meter.Count()
  129. metric.Rate1 = metric.meter.Rate1()
  130. metric.Rate5 = metric.meter.Rate5()
  131. metric.Rate15 = metric.meter.Rate15()
  132. metric.RateMean = metric.meter.RateMean()
  133. metric.Latency = em.timer.Mean()
  134. b, err := json.Marshal(metric)
  135. if err != nil {
  136. // TODO
  137. log.Error(err.Error())
  138. continue
  139. }
  140. var out bytes.Buffer
  141. json.Indent(&out, b, "", "\t")
  142. out.WriteTo(os.Stdout)
  143. }
  144. em.mtx.Unlock()
  145. case <-pingTicker.C:
  146. em.mtx.Lock()
  147. // ping to record latency
  148. if !em.receivedPong {
  149. // XXX: why is the pong taking so long? should we stop the conn?
  150. em.mtx.Unlock()
  151. continue
  152. }
  153. em.lastPing = time.Now()
  154. em.receivedPong = false
  155. err := em.wsc.Conn.WriteMessage(websocket.PingMessage, []byte{})
  156. if err != nil {
  157. log.Error("Failed to write ping message on websocket", "error", err)
  158. em.wsc.Stop()
  159. return
  160. }
  161. em.mtx.Unlock()
  162. case r := <-em.wsc.ResultsCh:
  163. em.mtx.Lock()
  164. switch r := r.(type) {
  165. case *ctypes.ResultEvent:
  166. id, _ := r.Event, r.Data
  167. metric, ok := em.events[id]
  168. if !ok {
  169. // we already unsubscribed, or got an unexpected event
  170. continue
  171. }
  172. last := metric.LastHeard
  173. metric.LastHeard = time.Now()
  174. metric.meter.Mark(1)
  175. dur := int64(metric.LastHeard.Sub(last))
  176. if dur < metric.MinDuration {
  177. metric.MinDuration = dur
  178. }
  179. if !last.IsZero() && dur > metric.MaxDuration {
  180. metric.MaxDuration = dur
  181. }
  182. default:
  183. log.Error("Unknown result event type", "type", reflect.TypeOf(r))
  184. }
  185. em.mtx.Unlock()
  186. case <-em.Quit:
  187. break
  188. }
  189. }
  190. }