You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

269 lines
6.3 KiB

  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "math"
  6. "time"
  7. em "github.com/tendermint/go-event-meter"
  8. events "github.com/tendermint/go-events"
  9. rpc_client "github.com/tendermint/go-rpc/client"
  10. tmtypes "github.com/tendermint/tendermint/types"
  11. crypto "github.com/tendermint/go-crypto"
  12. wire "github.com/tendermint/go-wire"
  13. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  14. )
  15. // remove when https://github.com/tendermint/go-rpc/issues/8 will be fixed
  16. type rpcClientI interface {
  17. Call(method string, params map[string]interface{}, result interface{}) (interface{}, error)
  18. }
  19. const maxRestarts = 25
  20. type Node struct {
  21. rpcAddr string
  22. IsValidator bool `json:"is_validator"` // validator or non-validator?
  23. pubKey crypto.PubKey `json:"pub_key"`
  24. Name string `json:"name"`
  25. Online bool `json:"online"`
  26. Height uint64 `json:"height"`
  27. BlockLatency float64 `json:"block_latency" wire:"unsafe"` // ms, interval between block commits
  28. // em holds the ws connection. Each eventMeter callback is called in a separate go-routine.
  29. em eventMeter
  30. // rpcClient is an client for making RPC calls to TM
  31. rpcClient rpcClientI
  32. blockCh chan<- tmtypes.Header
  33. blockLatencyCh chan<- float64
  34. disconnectCh chan<- bool
  35. checkIsValidatorInterval time.Duration
  36. quit chan struct{}
  37. }
  38. func NewNode(rpcAddr string, options ...func(*Node)) *Node {
  39. em := em.NewEventMeter(rpcAddr, UnmarshalEvent)
  40. rpcClient := rpc_client.NewClientURI(rpcAddr) // HTTP client by default
  41. return NewNodeWithEventMeterAndRpcClient(rpcAddr, em, rpcClient, options...)
  42. }
  43. func NewNodeWithEventMeterAndRpcClient(rpcAddr string, em eventMeter, rpcClient rpcClientI, options ...func(*Node)) *Node {
  44. n := &Node{
  45. rpcAddr: rpcAddr,
  46. em: em,
  47. rpcClient: rpcClient,
  48. Name: rpcAddr,
  49. quit: make(chan struct{}),
  50. checkIsValidatorInterval: 5 * time.Second,
  51. }
  52. for _, option := range options {
  53. option(n)
  54. }
  55. return n
  56. }
  57. // SetCheckIsValidatorInterval lets you change interval for checking whenever
  58. // node is still a validator or not.
  59. func SetCheckIsValidatorInterval(d time.Duration) func(n *Node) {
  60. return func(n *Node) {
  61. n.checkIsValidatorInterval = d
  62. }
  63. }
  64. func (n *Node) SendBlocksTo(ch chan<- tmtypes.Header) {
  65. n.blockCh = ch
  66. }
  67. func (n *Node) SendBlockLatenciesTo(ch chan<- float64) {
  68. n.blockLatencyCh = ch
  69. }
  70. func (n *Node) NotifyAboutDisconnects(ch chan<- bool) {
  71. n.disconnectCh = ch
  72. }
  73. func (n *Node) Start() error {
  74. if _, err := n.em.Start(); err != nil {
  75. return err
  76. }
  77. n.em.RegisterLatencyCallback(latencyCallback(n))
  78. n.em.Subscribe(tmtypes.EventStringNewBlockHeader(), newBlockCallback(n))
  79. n.em.RegisterDisconnectCallback(disconnectCallback(n))
  80. n.Online = true
  81. n.checkIsValidator()
  82. go n.checkIsValidatorLoop()
  83. return nil
  84. }
  85. func (n *Node) Stop() {
  86. n.Online = false
  87. n.em.RegisterLatencyCallback(nil)
  88. n.em.Unsubscribe(tmtypes.EventStringNewBlockHeader())
  89. n.em.RegisterDisconnectCallback(nil)
  90. // FIXME stop blocks at event_meter.go:140
  91. // n.em.Stop()
  92. close(n.quit)
  93. }
  94. // implements eventmeter.EventCallbackFunc
  95. func newBlockCallback(n *Node) em.EventCallbackFunc {
  96. return func(metric *em.EventMetric, data events.EventData) {
  97. block := data.(tmtypes.EventDataNewBlockHeader).Header
  98. n.Height = uint64(block.Height)
  99. if n.blockCh != nil {
  100. n.blockCh <- *block
  101. }
  102. }
  103. }
  104. // implements eventmeter.EventLatencyFunc
  105. func latencyCallback(n *Node) em.LatencyCallbackFunc {
  106. return func(latency float64) {
  107. n.BlockLatency = latency / 1000000.0 // ns to ms
  108. if n.blockLatencyCh != nil {
  109. n.blockLatencyCh <- latency
  110. }
  111. }
  112. }
  113. // implements eventmeter.DisconnectCallbackFunc
  114. func disconnectCallback(n *Node) em.DisconnectCallbackFunc {
  115. return func() {
  116. n.Online = false
  117. if n.disconnectCh != nil {
  118. n.disconnectCh <- true
  119. }
  120. if err := n.RestartBackOff(); err != nil {
  121. log.Error(err.Error())
  122. } else {
  123. n.Online = true
  124. if n.disconnectCh != nil {
  125. n.disconnectCh <- false
  126. }
  127. }
  128. }
  129. }
  130. func (n *Node) RestartBackOff() error {
  131. attempt := 0
  132. for {
  133. d := time.Duration(math.Exp2(float64(attempt)))
  134. time.Sleep(d * time.Second)
  135. if err := n.Start(); err != nil {
  136. log.Debug("Can't connect to node %v due to %v", n, err)
  137. } else {
  138. // TODO: authenticate pubkey
  139. return nil
  140. }
  141. attempt++
  142. if attempt > maxRestarts {
  143. return fmt.Errorf("Reached max restarts for node %v", n)
  144. }
  145. }
  146. }
  147. func (n *Node) NumValidators() (height uint64, num int, err error) {
  148. height, vals, err := n.validators()
  149. if err != nil {
  150. return 0, 0, err
  151. }
  152. return height, len(vals), nil
  153. }
  154. func (n *Node) validators() (height uint64, validators []*tmtypes.Validator, err error) {
  155. var result ctypes.TMResult
  156. if _, err = n.rpcClient.Call("validators", nil, &result); err != nil {
  157. return 0, make([]*tmtypes.Validator, 0), err
  158. }
  159. vals := result.(*ctypes.ResultValidators)
  160. return uint64(vals.BlockHeight), vals.Validators, nil
  161. }
  162. func (n *Node) checkIsValidatorLoop() {
  163. for {
  164. select {
  165. case <-n.quit:
  166. return
  167. case <-time.After(n.checkIsValidatorInterval):
  168. n.checkIsValidator()
  169. }
  170. }
  171. }
  172. func (n *Node) checkIsValidator() {
  173. _, validators, err := n.validators()
  174. if err == nil {
  175. for _, v := range validators {
  176. key, err := n.getPubKey()
  177. if err == nil && v.PubKey == key {
  178. n.IsValidator = true
  179. }
  180. }
  181. } else {
  182. log.Debug(err.Error())
  183. }
  184. }
  185. func (n *Node) getPubKey() (crypto.PubKey, error) {
  186. if n.pubKey != nil {
  187. return n.pubKey, nil
  188. }
  189. var result ctypes.TMResult
  190. _, err := n.rpcClient.Call("status", nil, &result)
  191. if err != nil {
  192. return nil, err
  193. }
  194. status := result.(*ctypes.ResultStatus)
  195. n.pubKey = status.PubKey
  196. return n.pubKey, nil
  197. }
  198. type eventMeter interface {
  199. Start() (bool, error)
  200. Stop() bool
  201. RegisterLatencyCallback(em.LatencyCallbackFunc)
  202. RegisterDisconnectCallback(em.DisconnectCallbackFunc)
  203. Subscribe(string, em.EventCallbackFunc) error
  204. Unsubscribe(string) error
  205. }
  206. // UnmarshalEvent unmarshals a json event
  207. func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) {
  208. var err error
  209. result := new(ctypes.TMResult)
  210. wire.ReadJSONPtr(result, b, &err)
  211. if err != nil {
  212. return "", nil, err
  213. }
  214. event, ok := (*result).(*ctypes.ResultEvent)
  215. if !ok {
  216. return "", nil, nil // TODO: handle non-event messages (ie. return from subscribe/unsubscribe)
  217. // fmt.Errorf("Result is not type *ctypes.ResultEvent. Got %v", reflect.TypeOf(*result))
  218. }
  219. return event.Name, event.Data, nil
  220. }