You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

269 lines
6.2 KiB

  1. package monitor
  2. import (
  3. "encoding/json"
  4. "math"
  5. "time"
  6. "github.com/pkg/errors"
  7. crypto "github.com/tendermint/go-crypto"
  8. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  9. rpc_client "github.com/tendermint/tendermint/rpc/lib/client"
  10. tmtypes "github.com/tendermint/tendermint/types"
  11. "github.com/tendermint/tmlibs/events"
  12. "github.com/tendermint/tmlibs/log"
  13. em "github.com/tendermint/tools/tm-monitor/eventmeter"
  14. )
  // maxRestarts caps the retry counter used by RestartEventMeterBackoff: the
  // loop gives up once its attempt counter exceeds this value.
  const (
  	maxRestarts = 25
  )
  16. type Node struct {
  17. rpcAddr string
  18. IsValidator bool `json:"is_validator"` // validator or non-validator?
  19. pubKey crypto.PubKey `json:"pub_key"`
  20. Name string `json:"name"`
  21. Online bool `json:"online"`
  22. Height uint64 `json:"height"`
  23. BlockLatency float64 `json:"block_latency" wire:"unsafe"` // ms, interval between block commits
  24. // em holds the ws connection. Each eventMeter callback is called in a separate go-routine.
  25. em eventMeter
  26. // rpcClient is an client for making RPC calls to TM
  27. rpcClient rpc_client.HTTPClient
  28. blockCh chan<- tmtypes.Header
  29. blockLatencyCh chan<- float64
  30. disconnectCh chan<- bool
  31. checkIsValidatorInterval time.Duration
  32. quit chan struct{}
  33. logger log.Logger
  34. }
  35. func NewNode(rpcAddr string, options ...func(*Node)) *Node {
  36. em := em.NewEventMeter(rpcAddr, UnmarshalEvent)
  37. rpcClient := rpc_client.NewURIClient(rpcAddr) // HTTP client by default
  38. return NewNodeWithEventMeterAndRpcClient(rpcAddr, em, rpcClient, options...)
  39. }
  40. func NewNodeWithEventMeterAndRpcClient(rpcAddr string, em eventMeter, rpcClient rpc_client.HTTPClient, options ...func(*Node)) *Node {
  41. n := &Node{
  42. rpcAddr: rpcAddr,
  43. em: em,
  44. rpcClient: rpcClient,
  45. Name: rpcAddr,
  46. quit: make(chan struct{}),
  47. checkIsValidatorInterval: 5 * time.Second,
  48. logger: log.NewNopLogger(),
  49. }
  50. for _, option := range options {
  51. option(n)
  52. }
  53. return n
  54. }
  55. // SetCheckIsValidatorInterval lets you change interval for checking whenever
  56. // node is still a validator or not.
  57. func SetCheckIsValidatorInterval(d time.Duration) func(n *Node) {
  58. return func(n *Node) {
  59. n.checkIsValidatorInterval = d
  60. }
  61. }
  62. func (n *Node) SendBlocksTo(ch chan<- tmtypes.Header) {
  63. n.blockCh = ch
  64. }
  65. func (n *Node) SendBlockLatenciesTo(ch chan<- float64) {
  66. n.blockLatencyCh = ch
  67. }
  68. func (n *Node) NotifyAboutDisconnects(ch chan<- bool) {
  69. n.disconnectCh = ch
  70. }
  71. // SetLogger lets you set your own logger
  72. func (n *Node) SetLogger(l log.Logger) {
  73. n.logger = l
  74. n.em.SetLogger(l)
  75. }
  76. func (n *Node) Start() error {
  77. if err := n.em.Start(); err != nil {
  78. return err
  79. }
  80. n.em.RegisterLatencyCallback(latencyCallback(n))
  81. err := n.em.Subscribe(tmtypes.EventStringNewBlockHeader(), newBlockCallback(n))
  82. if err != nil {
  83. return err
  84. }
  85. n.em.RegisterDisconnectCallback(disconnectCallback(n))
  86. n.Online = true
  87. n.checkIsValidator()
  88. go n.checkIsValidatorLoop()
  89. return nil
  90. }
  91. func (n *Node) Stop() {
  92. n.Online = false
  93. n.em.Stop()
  94. close(n.quit)
  95. }
  96. // implements eventmeter.EventCallbackFunc
  97. func newBlockCallback(n *Node) em.EventCallbackFunc {
  98. return func(metric *em.EventMetric, data events.EventData) {
  99. block := data.(tmtypes.TMEventData).Unwrap().(tmtypes.EventDataNewBlockHeader).Header
  100. n.Height = uint64(block.Height)
  101. n.logger.Info("event", "new block", "height", block.Height, "numTxs", block.NumTxs)
  102. if n.blockCh != nil {
  103. n.blockCh <- *block
  104. }
  105. }
  106. }
  107. // implements eventmeter.EventLatencyFunc
  108. func latencyCallback(n *Node) em.LatencyCallbackFunc {
  109. return func(latency float64) {
  110. n.BlockLatency = latency / 1000000.0 // ns to ms
  111. n.logger.Info("event", "new block latency", "latency", n.BlockLatency)
  112. if n.blockLatencyCh != nil {
  113. n.blockLatencyCh <- latency
  114. }
  115. }
  116. }
  117. // implements eventmeter.DisconnectCallbackFunc
  118. func disconnectCallback(n *Node) em.DisconnectCallbackFunc {
  119. return func() {
  120. n.Online = false
  121. n.logger.Info("status", "down")
  122. if n.disconnectCh != nil {
  123. n.disconnectCh <- true
  124. }
  125. if err := n.RestartEventMeterBackoff(); err != nil {
  126. n.logger.Info("err", errors.Wrap(err, "restart failed"))
  127. } else {
  128. n.Online = true
  129. n.logger.Info("status", "online")
  130. if n.disconnectCh != nil {
  131. n.disconnectCh <- false
  132. }
  133. }
  134. }
  135. }
  136. func (n *Node) RestartEventMeterBackoff() error {
  137. attempt := 0
  138. for {
  139. d := time.Duration(math.Exp2(float64(attempt)))
  140. time.Sleep(d * time.Second)
  141. if err := n.em.Start(); err != nil {
  142. n.logger.Info("err", errors.Wrap(err, "restart failed"))
  143. } else {
  144. // TODO: authenticate pubkey
  145. return nil
  146. }
  147. attempt++
  148. if attempt > maxRestarts {
  149. return errors.New("Reached max restarts")
  150. }
  151. }
  152. }
  153. func (n *Node) NumValidators() (height uint64, num int, err error) {
  154. height, vals, err := n.validators()
  155. if err != nil {
  156. return 0, 0, err
  157. }
  158. return height, len(vals), nil
  159. }
  160. func (n *Node) validators() (height uint64, validators []*tmtypes.Validator, err error) {
  161. vals := new(ctypes.ResultValidators)
  162. if _, err = n.rpcClient.Call("validators", nil, vals); err != nil {
  163. return 0, make([]*tmtypes.Validator, 0), err
  164. }
  165. return uint64(vals.BlockHeight), vals.Validators, nil
  166. }
  167. func (n *Node) checkIsValidatorLoop() {
  168. for {
  169. select {
  170. case <-n.quit:
  171. return
  172. case <-time.After(n.checkIsValidatorInterval):
  173. n.checkIsValidator()
  174. }
  175. }
  176. }
  177. func (n *Node) checkIsValidator() {
  178. _, validators, err := n.validators()
  179. if err == nil {
  180. for _, v := range validators {
  181. key, err := n.getPubKey()
  182. if err == nil && v.PubKey == key {
  183. n.IsValidator = true
  184. }
  185. }
  186. } else {
  187. n.logger.Info("err", errors.Wrap(err, "check is validator failed"))
  188. }
  189. }
  190. func (n *Node) getPubKey() (crypto.PubKey, error) {
  191. if !n.pubKey.Empty() {
  192. return n.pubKey, nil
  193. }
  194. status := new(ctypes.ResultStatus)
  195. _, err := n.rpcClient.Call("status", nil, status)
  196. if err != nil {
  197. return crypto.PubKey{}, err
  198. }
  199. n.pubKey = status.PubKey
  200. return n.pubKey, nil
  201. }
  202. type eventMeter interface {
  203. Start() error
  204. Stop()
  205. RegisterLatencyCallback(em.LatencyCallbackFunc)
  206. RegisterDisconnectCallback(em.DisconnectCallbackFunc)
  207. Subscribe(string, em.EventCallbackFunc) error
  208. Unsubscribe(string) error
  209. SetLogger(l log.Logger)
  210. }
  211. // UnmarshalEvent unmarshals a json event
  212. func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) {
  213. event := new(ctypes.ResultEvent)
  214. if err := json.Unmarshal(b, event); err != nil {
  215. return "", nil, err
  216. }
  217. return event.Name, event.Data, nil
  218. }