You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

177 lines
4.0 KiB

  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "math"
  6. "time"
  7. em "github.com/tendermint/go-event-meter"
  8. events "github.com/tendermint/go-events"
  9. tmtypes "github.com/tendermint/tendermint/types"
  10. wire "github.com/tendermint/go-wire"
  11. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  12. )
  13. const maxRestarts = 25
  14. type Node struct {
  15. rpcAddr string
  16. IsValidator bool `json:"is_validator"` // validator or non-validator?
  17. // "github.com/tendermint/go-crypto"
  18. // PubKey crypto.PubKey `json:"pub_key"`
  19. Name string `json:"name"`
  20. Online bool `json:"online"`
  21. Height uint64 `json:"height"`
  22. BlockLatency float64 `json:"block_latency" wire:"unsafe"` // ms, interval between block commits
  23. // em holds the ws connection. Each eventMeter callback is called in a separate go-routine.
  24. em eventMeter
  25. blockCh chan<- tmtypes.Header
  26. blockLatencyCh chan<- float64
  27. disconnectCh chan<- bool
  28. }
  29. func NewNode(rpcAddr string) *Node {
  30. em := em.NewEventMeter(rpcAddr, UnmarshalEvent)
  31. return NewNodeWithEventMeter(rpcAddr, em)
  32. }
  33. func NewNodeWithEventMeter(rpcAddr string, em eventMeter) *Node {
  34. return &Node{
  35. rpcAddr: rpcAddr,
  36. em: em,
  37. Name: rpcAddr,
  38. }
  39. }
  40. func (n *Node) SendBlocksTo(ch chan<- tmtypes.Header) {
  41. n.blockCh = ch
  42. }
  43. func (n *Node) SendBlockLatenciesTo(ch chan<- float64) {
  44. n.blockLatencyCh = ch
  45. }
  46. func (n *Node) NotifyAboutDisconnects(ch chan<- bool) {
  47. n.disconnectCh = ch
  48. }
  49. func (n *Node) Start() error {
  50. if _, err := n.em.Start(); err != nil {
  51. return err
  52. }
  53. n.em.RegisterLatencyCallback(latencyCallback(n))
  54. n.em.Subscribe(tmtypes.EventStringNewBlockHeader(), newBlockCallback(n))
  55. n.em.RegisterDisconnectCallback(disconnectCallback(n))
  56. n.Online = true
  57. return nil
  58. }
  59. func (n *Node) Stop() {
  60. n.Online = false
  61. n.em.RegisterLatencyCallback(nil)
  62. n.em.Unsubscribe(tmtypes.EventStringNewBlockHeader())
  63. n.em.RegisterDisconnectCallback(nil)
  64. // FIXME stop blocks at event_meter.go:140
  65. // n.em.Stop()
  66. }
  67. // implements eventmeter.EventCallbackFunc
  68. func newBlockCallback(n *Node) em.EventCallbackFunc {
  69. return func(metric *em.EventMetric, data events.EventData) {
  70. block := data.(tmtypes.EventDataNewBlockHeader).Header
  71. n.Height = uint64(block.Height)
  72. if n.blockCh != nil {
  73. n.blockCh <- *block
  74. }
  75. }
  76. }
  77. // implements eventmeter.EventLatencyFunc
  78. func latencyCallback(n *Node) em.LatencyCallbackFunc {
  79. return func(latency float64) {
  80. n.BlockLatency = latency / 1000000.0 // ns to ms
  81. if n.blockLatencyCh != nil {
  82. n.blockLatencyCh <- latency
  83. }
  84. }
  85. }
  86. // implements eventmeter.DisconnectCallbackFunc
  87. func disconnectCallback(n *Node) em.DisconnectCallbackFunc {
  88. return func() {
  89. n.Online = false
  90. if n.disconnectCh != nil {
  91. n.disconnectCh <- true
  92. }
  93. if err := n.RestartBackOff(); err != nil {
  94. log.Error(err.Error())
  95. } else {
  96. n.Online = true
  97. if n.disconnectCh != nil {
  98. n.disconnectCh <- false
  99. }
  100. }
  101. }
  102. }
  103. func (n *Node) RestartBackOff() error {
  104. attempt := 0
  105. for {
  106. d := time.Duration(math.Exp2(float64(attempt)))
  107. time.Sleep(d * time.Second)
  108. if err := n.Start(); err != nil {
  109. log.Debug("Can't connect to node %v due to %v", n, err)
  110. } else {
  111. // TODO: authenticate pubkey
  112. return nil
  113. }
  114. attempt++
  115. if attempt > maxRestarts {
  116. return fmt.Errorf("Reached max restarts for node %v", n)
  117. }
  118. }
  119. }
  120. type eventMeter interface {
  121. Start() (bool, error)
  122. Stop() bool
  123. RegisterLatencyCallback(em.LatencyCallbackFunc)
  124. RegisterDisconnectCallback(em.DisconnectCallbackFunc)
  125. Subscribe(string, em.EventCallbackFunc) error
  126. Unsubscribe(string) error
  127. }
  128. // Unmarshal a json event
  129. func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) {
  130. var err error
  131. result := new(ctypes.TMResult)
  132. wire.ReadJSONPtr(result, b, &err)
  133. if err != nil {
  134. return "", nil, err
  135. }
  136. event, ok := (*result).(*ctypes.ResultEvent)
  137. if !ok {
  138. return "", nil, nil // TODO: handle non-event messages (ie. return from subscribe/unsubscribe)
  139. // fmt.Errorf("Result is not type *ctypes.ResultEvent. Got %v", reflect.TypeOf(*result))
  140. }
  141. return event.Name, event.Data, nil
  142. }