You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

260 lines
6.1 KiB

  1. package monitor
  2. import (
  3. "encoding/json"
  4. "math"
  5. "time"
  6. "github.com/pkg/errors"
  7. "github.com/tendermint/tendermint/crypto"
  8. "github.com/tendermint/tendermint/libs/events"
  9. "github.com/tendermint/tendermint/libs/log"
  10. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  11. rpc_client "github.com/tendermint/tendermint/rpc/lib/client"
  12. em "github.com/tendermint/tendermint/tools/tm-monitor/eventmeter"
  13. tmtypes "github.com/tendermint/tendermint/types"
  14. )
  15. const maxRestarts = 25
  16. type Node struct {
  17. rpcAddr string
  18. IsValidator bool `json:"is_validator"` // validator or non-validator?
  19. pubKey crypto.PubKey `json:"pub_key"`
  20. Name string `json:"name"`
  21. Online bool `json:"online"`
  22. Height int64 `json:"height"`
  23. BlockLatency float64 `json:"block_latency" amino:"unsafe"` // ms, interval between block commits
  24. // em holds the ws connection. Each eventMeter callback is called in a separate go-routine.
  25. em eventMeter
  26. // rpcClient is an client for making RPC calls to TM
  27. rpcClient rpc_client.HTTPClient
  28. blockCh chan<- tmtypes.Header
  29. blockLatencyCh chan<- float64
  30. disconnectCh chan<- bool
  31. checkIsValidatorInterval time.Duration
  32. quit chan struct{}
  33. logger log.Logger
  34. }
  35. func NewNode(rpcAddr string, options ...func(*Node)) *Node {
  36. em := em.NewEventMeter(rpcAddr, UnmarshalEvent)
  37. rpcClient := rpc_client.NewURIClient(rpcAddr) // HTTP client by default
  38. rpcClient.SetCodec(cdc)
  39. return NewNodeWithEventMeterAndRpcClient(rpcAddr, em, rpcClient, options...)
  40. }
  41. func NewNodeWithEventMeterAndRpcClient(rpcAddr string, em eventMeter, rpcClient rpc_client.HTTPClient, options ...func(*Node)) *Node {
  42. n := &Node{
  43. rpcAddr: rpcAddr,
  44. em: em,
  45. rpcClient: rpcClient,
  46. Name: rpcAddr,
  47. quit: make(chan struct{}),
  48. checkIsValidatorInterval: 5 * time.Second,
  49. logger: log.NewNopLogger(),
  50. }
  51. for _, option := range options {
  52. option(n)
  53. }
  54. return n
  55. }
  56. // SetCheckIsValidatorInterval lets you change interval for checking whenever
  57. // node is still a validator or not.
  58. func SetCheckIsValidatorInterval(d time.Duration) func(n *Node) {
  59. return func(n *Node) {
  60. n.checkIsValidatorInterval = d
  61. }
  62. }
  63. func (n *Node) SendBlocksTo(ch chan<- tmtypes.Header) {
  64. n.blockCh = ch
  65. }
  66. func (n *Node) SendBlockLatenciesTo(ch chan<- float64) {
  67. n.blockLatencyCh = ch
  68. }
  69. func (n *Node) NotifyAboutDisconnects(ch chan<- bool) {
  70. n.disconnectCh = ch
  71. }
  72. // SetLogger lets you set your own logger
  73. func (n *Node) SetLogger(l log.Logger) {
  74. n.logger = l
  75. n.em.SetLogger(l)
  76. }
  77. func (n *Node) Start() error {
  78. if err := n.em.Start(); err != nil {
  79. return err
  80. }
  81. n.em.RegisterLatencyCallback(latencyCallback(n))
  82. err := n.em.Subscribe(tmtypes.EventQueryNewBlockHeader.String(), newBlockCallback(n))
  83. if err != nil {
  84. return err
  85. }
  86. n.em.RegisterDisconnectCallback(disconnectCallback(n))
  87. n.Online = true
  88. n.checkIsValidator()
  89. go n.checkIsValidatorLoop()
  90. return nil
  91. }
  92. func (n *Node) Stop() {
  93. n.Online = false
  94. n.em.Stop()
  95. close(n.quit)
  96. }
  97. // implements eventmeter.EventCallbackFunc
  98. func newBlockCallback(n *Node) em.EventCallbackFunc {
  99. return func(metric *em.EventMetric, data interface{}) {
  100. block := data.(tmtypes.TMEventData).(tmtypes.EventDataNewBlockHeader).Header
  101. n.Height = block.Height
  102. n.logger.Info("new block", "height", block.Height, "numTxs", block.NumTxs)
  103. if n.blockCh != nil {
  104. n.blockCh <- block
  105. }
  106. }
  107. }
  108. // implements eventmeter.EventLatencyFunc
  109. func latencyCallback(n *Node) em.LatencyCallbackFunc {
  110. return func(latency float64) {
  111. n.BlockLatency = latency / 1000000.0 // ns to ms
  112. n.logger.Info("new block latency", "latency", n.BlockLatency)
  113. if n.blockLatencyCh != nil {
  114. n.blockLatencyCh <- latency
  115. }
  116. }
  117. }
  118. // implements eventmeter.DisconnectCallbackFunc
  119. func disconnectCallback(n *Node) em.DisconnectCallbackFunc {
  120. return func() {
  121. n.Online = false
  122. n.logger.Info("status", "down")
  123. if n.disconnectCh != nil {
  124. n.disconnectCh <- true
  125. }
  126. }
  127. }
  128. func (n *Node) RestartEventMeterBackoff() error {
  129. attempt := 0
  130. for {
  131. d := time.Duration(math.Exp2(float64(attempt)))
  132. time.Sleep(d * time.Second)
  133. if err := n.em.Start(); err != nil {
  134. n.logger.Info("restart failed", "err", err)
  135. } else {
  136. // TODO: authenticate pubkey
  137. return nil
  138. }
  139. attempt++
  140. if attempt > maxRestarts {
  141. return errors.New("Reached max restarts")
  142. }
  143. }
  144. }
  145. func (n *Node) NumValidators() (height int64, num int, err error) {
  146. height, vals, err := n.validators()
  147. if err != nil {
  148. return 0, 0, err
  149. }
  150. return height, len(vals), nil
  151. }
  152. func (n *Node) validators() (height int64, validators []*tmtypes.Validator, err error) {
  153. vals := new(ctypes.ResultValidators)
  154. if _, err = n.rpcClient.Call("validators", nil, vals); err != nil {
  155. return 0, make([]*tmtypes.Validator, 0), err
  156. }
  157. return vals.BlockHeight, vals.Validators, nil
  158. }
  159. func (n *Node) checkIsValidatorLoop() {
  160. for {
  161. select {
  162. case <-n.quit:
  163. return
  164. case <-time.After(n.checkIsValidatorInterval):
  165. n.checkIsValidator()
  166. }
  167. }
  168. }
  169. func (n *Node) checkIsValidator() {
  170. _, validators, err := n.validators()
  171. if err == nil {
  172. for _, v := range validators {
  173. key, err1 := n.getPubKey()
  174. if err1 == nil && v.PubKey.Equals(key) {
  175. n.IsValidator = true
  176. }
  177. }
  178. } else {
  179. n.logger.Info("check is validator failed", "err", err)
  180. }
  181. }
  182. func (n *Node) getPubKey() (crypto.PubKey, error) {
  183. if n.pubKey != nil {
  184. return n.pubKey, nil
  185. }
  186. status := new(ctypes.ResultStatus)
  187. _, err := n.rpcClient.Call("status", nil, status)
  188. if err != nil {
  189. return nil, err
  190. }
  191. n.pubKey = status.ValidatorInfo.PubKey
  192. return n.pubKey, nil
  193. }
  194. type eventMeter interface {
  195. Start() error
  196. Stop()
  197. RegisterLatencyCallback(em.LatencyCallbackFunc)
  198. RegisterDisconnectCallback(em.DisconnectCallbackFunc)
  199. Subscribe(string, em.EventCallbackFunc) error
  200. Unsubscribe(string) error
  201. SetLogger(l log.Logger)
  202. }
  203. // UnmarshalEvent unmarshals a json event
  204. func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) {
  205. event := new(ctypes.ResultEvent)
  206. if err := cdc.UnmarshalJSON(b, event); err != nil {
  207. return "", nil, err
  208. }
  209. return event.Query, event.Data, nil
  210. }