You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

265 lines
6.0 KiB

lint: Enable Golint (#4212) * Fix many golint errors * Fix golint errors in the 'lite' package * Don't export Pool.store * Fix typo * Revert unwanted changes * Fix errors in counter package * Fix linter errors in kvstore package * Fix linter error in example package * Fix error in tests package * Fix linter errors in v2 package * Fix linter errors in consensus package * Fix linter errors in evidence package * Fix linter error in fail package * Fix linter errors in query package * Fix linter errors in core package * Fix linter errors in node package * Fix linter errors in mempool package * Fix linter error in conn package * Fix linter errors in pex package * Rename PEXReactor export to Reactor * Fix linter errors in trust package * Fix linter errors in upnp package * Fix linter errors in p2p package * Fix linter errors in proxy package * Fix linter errors in mock_test package * Fix linter error in client_test package * Fix linter errors in coretypes package * Fix linter errors in coregrpc package * Fix linter errors in rpcserver package * Fix linter errors in rpctypes package * Fix linter errors in rpctest package * Fix linter error in json2wal script * Fix linter error in wal2json script * Fix linter errors in kv package * Fix linter error in state package * Fix linter error in grpc_client * Fix linter errors in types package * Fix linter error in version package * Fix remaining errors * Address review comments * Fix broken tests * Reconcile package coregrpc * Fix golangci bot error * Fix new golint errors * Fix broken reference * Enable golint linter * minor changes to bring golint into line * fix failing test * fix pex reactor naming * address PR comments
5 years ago
  1. package monitor
  2. import (
  3. "encoding/json"
  4. "math"
  5. "time"
  6. "github.com/pkg/errors"
  7. "github.com/tendermint/tendermint/crypto"
  8. "github.com/tendermint/tendermint/libs/events"
  9. "github.com/tendermint/tendermint/libs/log"
  10. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  11. rpc_client "github.com/tendermint/tendermint/rpc/lib/client"
  12. em "github.com/tendermint/tendermint/tools/tm-monitor/eventmeter"
  13. tmtypes "github.com/tendermint/tendermint/types"
  14. )
  15. const maxRestarts = 25
  16. type Node struct {
  17. IsValidator bool `json:"is_validator"` // validator or non-validator?
  18. Online bool `json:"online"`
  19. Height int64 `json:"height"`
  20. rpcAddr string
  21. Name string `json:"name"`
  22. pubKey crypto.PubKey
  23. BlockLatency float64 `json:"block_latency" amino:"unsafe"` // ms, interval between block commits
  24. // em holds the ws connection. Each eventMeter callback is called in a separate go-routine.
  25. em eventMeter
  26. // rpcClient is an client for making RPC calls to TM
  27. rpcClient rpc_client.HTTPClient
  28. blockCh chan<- *tmtypes.Block
  29. blockLatencyCh chan<- float64
  30. disconnectCh chan<- bool
  31. checkIsValidatorInterval time.Duration
  32. quit chan struct{}
  33. logger log.Logger
  34. }
  35. func NewNode(rpcAddr string, options ...func(*Node)) *Node {
  36. em := em.NewEventMeter(rpcAddr, UnmarshalEvent)
  37. rpcClient := rpc_client.NewURIClient(rpcAddr) // HTTP client by default
  38. rpcClient.SetCodec(cdc)
  39. return NewNodeWithEventMeterAndRPCClient(rpcAddr, em, rpcClient, options...)
  40. }
  41. func NewNodeWithEventMeterAndRPCClient(
  42. rpcAddr string,
  43. em eventMeter,
  44. rpcClient rpc_client.HTTPClient,
  45. options ...func(*Node),
  46. ) *Node {
  47. n := &Node{
  48. rpcAddr: rpcAddr,
  49. em: em,
  50. rpcClient: rpcClient,
  51. Name: rpcAddr,
  52. quit: make(chan struct{}),
  53. checkIsValidatorInterval: 5 * time.Second,
  54. logger: log.NewNopLogger(),
  55. }
  56. for _, option := range options {
  57. option(n)
  58. }
  59. return n
  60. }
  61. // SetCheckIsValidatorInterval lets you change interval for checking whenever
  62. // node is still a validator or not.
  63. func SetCheckIsValidatorInterval(d time.Duration) func(n *Node) {
  64. return func(n *Node) {
  65. n.checkIsValidatorInterval = d
  66. }
  67. }
  68. func (n *Node) SendBlocksTo(ch chan<- *tmtypes.Block) {
  69. n.blockCh = ch
  70. }
  71. func (n *Node) SendBlockLatenciesTo(ch chan<- float64) {
  72. n.blockLatencyCh = ch
  73. }
  74. func (n *Node) NotifyAboutDisconnects(ch chan<- bool) {
  75. n.disconnectCh = ch
  76. }
  77. // SetLogger lets you set your own logger
  78. func (n *Node) SetLogger(l log.Logger) {
  79. n.logger = l
  80. n.em.SetLogger(l)
  81. }
  82. func (n *Node) Start() error {
  83. if err := n.em.Start(); err != nil {
  84. return err
  85. }
  86. n.em.RegisterLatencyCallback(latencyCallback(n))
  87. err := n.em.Subscribe(tmtypes.EventQueryNewBlock.String(), newBlockCallback(n))
  88. if err != nil {
  89. return err
  90. }
  91. n.em.RegisterDisconnectCallback(disconnectCallback(n))
  92. n.Online = true
  93. n.checkIsValidator()
  94. go n.checkIsValidatorLoop()
  95. return nil
  96. }
  97. func (n *Node) Stop() {
  98. n.Online = false
  99. n.em.Stop()
  100. close(n.quit)
  101. }
  102. // implements eventmeter.EventCallbackFunc
  103. func newBlockCallback(n *Node) em.EventCallbackFunc {
  104. return func(metric *em.EventMetric, data interface{}) {
  105. block := data.(tmtypes.TMEventData).(tmtypes.EventDataNewBlock).Block
  106. n.Height = block.Height
  107. n.logger.Info("new block", "height", block.Height)
  108. if n.blockCh != nil {
  109. n.blockCh <- block
  110. }
  111. }
  112. }
  113. // implements eventmeter.EventLatencyFunc
  114. func latencyCallback(n *Node) em.LatencyCallbackFunc {
  115. return func(latency float64) {
  116. n.BlockLatency = latency / 1000000.0 // ns to ms
  117. n.logger.Info("new block latency", "latency", n.BlockLatency)
  118. if n.blockLatencyCh != nil {
  119. n.blockLatencyCh <- latency
  120. }
  121. }
  122. }
  123. // implements eventmeter.DisconnectCallbackFunc
  124. func disconnectCallback(n *Node) em.DisconnectCallbackFunc {
  125. return func() {
  126. n.Online = false
  127. n.logger.Info("status", "down")
  128. if n.disconnectCh != nil {
  129. n.disconnectCh <- true
  130. }
  131. }
  132. }
  133. func (n *Node) RestartEventMeterBackoff() error {
  134. attempt := 0
  135. for {
  136. d := time.Duration(math.Exp2(float64(attempt)))
  137. time.Sleep(d * time.Second)
  138. if err := n.em.Start(); err != nil {
  139. n.logger.Info("restart failed", "err", err)
  140. } else {
  141. // TODO: authenticate pubkey
  142. return nil
  143. }
  144. attempt++
  145. if attempt > maxRestarts {
  146. return errors.New("reached max restarts")
  147. }
  148. }
  149. }
  150. func (n *Node) NumValidators() (height int64, num int, err error) {
  151. height, vals, err := n.validators()
  152. if err != nil {
  153. return 0, 0, err
  154. }
  155. return height, len(vals), nil
  156. }
  157. func (n *Node) validators() (height int64, validators []*tmtypes.Validator, err error) {
  158. vals := new(ctypes.ResultValidators)
  159. if _, err = n.rpcClient.Call("validators", nil, vals); err != nil {
  160. return 0, make([]*tmtypes.Validator, 0), err
  161. }
  162. return vals.BlockHeight, vals.Validators, nil
  163. }
  164. func (n *Node) checkIsValidatorLoop() {
  165. for {
  166. select {
  167. case <-n.quit:
  168. return
  169. case <-time.After(n.checkIsValidatorInterval):
  170. n.checkIsValidator()
  171. }
  172. }
  173. }
  174. func (n *Node) checkIsValidator() {
  175. _, validators, err := n.validators()
  176. if err == nil {
  177. for _, v := range validators {
  178. key, err1 := n.getPubKey()
  179. if err1 == nil && v.PubKey.Equals(key) {
  180. n.IsValidator = true
  181. }
  182. }
  183. } else {
  184. n.logger.Info("check is validator failed", "err", err)
  185. }
  186. }
  187. func (n *Node) getPubKey() (crypto.PubKey, error) {
  188. if n.pubKey != nil {
  189. return n.pubKey, nil
  190. }
  191. status := new(ctypes.ResultStatus)
  192. _, err := n.rpcClient.Call("status", nil, status)
  193. if err != nil {
  194. return nil, err
  195. }
  196. n.pubKey = status.ValidatorInfo.PubKey
  197. return n.pubKey, nil
  198. }
  199. type eventMeter interface {
  200. Start() error
  201. Stop()
  202. RegisterLatencyCallback(em.LatencyCallbackFunc)
  203. RegisterDisconnectCallback(em.DisconnectCallbackFunc)
  204. Subscribe(string, em.EventCallbackFunc) error
  205. Unsubscribe(string) error
  206. SetLogger(l log.Logger)
  207. }
  208. // UnmarshalEvent unmarshals a json event
  209. func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) {
  210. event := new(ctypes.ResultEvent)
  211. if err := cdc.UnmarshalJSON(b, event); err != nil {
  212. return "", nil, err
  213. }
  214. return event.Query, event.Data, nil
  215. }