You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

218 lines
5.4 KiB

  1. package monitor
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "time"
  6. "github.com/go-kit/kit/log"
  7. "github.com/pkg/errors"
  8. tmtypes "github.com/tendermint/tendermint/types"
  9. )
  // nodeLivenessTimeout is how long the per-node listen loop waits for any
  // event from a node before reporting that node as down.
  const nodeLivenessTimeout time.Duration = 5 * time.Second
  12. // Monitor keeps track of the nodes and updates common statistics upon
  13. // receiving new events from nodes.
  14. //
  15. // Common statistics is stored in Network struct.
  16. type Monitor struct {
  17. Nodes []*Node
  18. Network *Network
  19. monitorQuit chan struct{} // monitor exitting
  20. nodeQuit map[string]chan struct{} // node is being stopped and removed from under the monitor
  21. recalculateNetworkUptimeEvery time.Duration
  22. numValidatorsUpdateInterval time.Duration
  23. logger log.Logger
  24. }
  25. // NewMonitor creates new instance of a Monitor. You can provide options to
  26. // change some default values.
  27. //
  28. // Example:
  29. // NewMonitor(monitor.SetNumValidatorsUpdateInterval(1 * time.Second))
  30. func NewMonitor(options ...func(*Monitor)) *Monitor {
  31. m := &Monitor{
  32. Nodes: make([]*Node, 0),
  33. Network: NewNetwork(),
  34. monitorQuit: make(chan struct{}),
  35. nodeQuit: make(map[string]chan struct{}),
  36. recalculateNetworkUptimeEvery: 10 * time.Second,
  37. numValidatorsUpdateInterval: 5 * time.Second,
  38. logger: log.NewNopLogger(),
  39. }
  40. for _, option := range options {
  41. option(m)
  42. }
  43. return m
  44. }
  45. // RecalculateNetworkUptimeEvery lets you change network uptime update interval.
  46. func RecalculateNetworkUptimeEvery(d time.Duration) func(m *Monitor) {
  47. return func(m *Monitor) {
  48. m.recalculateNetworkUptimeEvery = d
  49. }
  50. }
  51. // SetNumValidatorsUpdateInterval lets you change num validators update interval.
  52. func SetNumValidatorsUpdateInterval(d time.Duration) func(m *Monitor) {
  53. return func(m *Monitor) {
  54. m.numValidatorsUpdateInterval = d
  55. }
  56. }
  57. // SetLogger lets you set your own logger
  58. func (m *Monitor) SetLogger(l log.Logger) {
  59. m.logger = l
  60. }
  61. // Monitor begins to monitor the node `n`. The node will be started and added
  62. // to the monitor.
  63. func (m *Monitor) Monitor(n *Node) error {
  64. m.Nodes = append(m.Nodes, n)
  65. blockCh := make(chan tmtypes.Header, 10)
  66. n.SendBlocksTo(blockCh)
  67. blockLatencyCh := make(chan float64, 10)
  68. n.SendBlockLatenciesTo(blockLatencyCh)
  69. disconnectCh := make(chan bool, 10)
  70. n.NotifyAboutDisconnects(disconnectCh)
  71. if err := n.Start(); err != nil {
  72. return err
  73. }
  74. m.Network.NewNode(n.Name)
  75. m.nodeQuit[n.Name] = make(chan struct{})
  76. go m.listen(n.Name, blockCh, blockLatencyCh, disconnectCh, m.nodeQuit[n.Name])
  77. return nil
  78. }
  79. // Unmonitor stops monitoring node `n`. The node will be stopped and removed
  80. // from the monitor.
  81. func (m *Monitor) Unmonitor(n *Node) {
  82. m.Network.NodeDeleted(n.Name)
  83. n.Stop()
  84. close(m.nodeQuit[n.Name])
  85. delete(m.nodeQuit, n.Name)
  86. i, _ := m.NodeByName(n.Name)
  87. m.Nodes[i] = m.Nodes[len(m.Nodes)-1]
  88. m.Nodes = m.Nodes[:len(m.Nodes)-1]
  89. }
  90. // NodeByName returns the node and its index if such node exists within the
  91. // monitor. Otherwise, -1 and nil are returned.
  92. func (m *Monitor) NodeByName(name string) (index int, node *Node) {
  93. for i, n := range m.Nodes {
  94. if name == n.Name {
  95. return i, n
  96. }
  97. }
  98. return -1, nil
  99. }
  100. // Start starts the monitor's routines: recalculating network uptime and
  101. // updating number of validators.
  102. func (m *Monitor) Start() error {
  103. go m.recalculateNetworkUptimeLoop()
  104. go m.updateNumValidatorLoop()
  105. return nil
  106. }
  107. // Stop stops the monitor's routines.
  108. func (m *Monitor) Stop() {
  109. close(m.monitorQuit)
  110. for _, n := range m.Nodes {
  111. m.Unmonitor(n)
  112. }
  113. }
  114. // main loop where we listen for events from the node
  115. func (m *Monitor) listen(nodeName string, blockCh <-chan tmtypes.Header, blockLatencyCh <-chan float64, disconnectCh <-chan bool, quit <-chan struct{}) {
  116. logger := log.With(m.logger, "node", nodeName)
  117. for {
  118. select {
  119. case <-quit:
  120. return
  121. case b := <-blockCh:
  122. m.Network.NewBlock(b)
  123. m.Network.NodeIsOnline(nodeName)
  124. case l := <-blockLatencyCh:
  125. m.Network.NewBlockLatency(l)
  126. m.Network.NodeIsOnline(nodeName)
  127. case disconnected := <-disconnectCh:
  128. if disconnected {
  129. m.Network.NodeIsDown(nodeName)
  130. } else {
  131. m.Network.NodeIsOnline(nodeName)
  132. }
  133. case <-time.After(nodeLivenessTimeout):
  134. logger.Log("event", fmt.Sprintf("node was not responding for %v", nodeLivenessTimeout))
  135. m.Network.NodeIsDown(nodeName)
  136. }
  137. }
  138. }
  139. // recalculateNetworkUptimeLoop every N seconds.
  140. func (m *Monitor) recalculateNetworkUptimeLoop() {
  141. for {
  142. select {
  143. case <-m.monitorQuit:
  144. return
  145. case <-time.After(m.recalculateNetworkUptimeEvery):
  146. m.Network.RecalculateUptime()
  147. }
  148. }
  149. }
  150. // updateNumValidatorLoop sends a request to a random node once every N seconds,
  151. // which in turn makes an RPC call to get the latest validators.
  152. func (m *Monitor) updateNumValidatorLoop() {
  153. rand.Seed(time.Now().Unix())
  154. var height uint64
  155. var num int
  156. var err error
  157. for {
  158. if 0 == len(m.Nodes) {
  159. time.Sleep(m.numValidatorsUpdateInterval)
  160. continue
  161. }
  162. randomNodeIndex := rand.Intn(len(m.Nodes))
  163. select {
  164. case <-m.monitorQuit:
  165. return
  166. case <-time.After(m.numValidatorsUpdateInterval):
  167. i := 0
  168. for _, n := range m.Nodes {
  169. if i == randomNodeIndex {
  170. height, num, err = n.NumValidators()
  171. if err != nil {
  172. m.logger.Log("err", errors.Wrap(err, "update num validators failed"))
  173. }
  174. break
  175. }
  176. i++
  177. }
  178. if m.Network.Height <= height {
  179. m.Network.NumValidators = num
  180. }
  181. }
  182. }
  183. }