You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

205 lines
5.0 KiB

  1. package monitor
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "time"
  6. "github.com/go-kit/kit/log"
  7. "github.com/pkg/errors"
  8. tmtypes "github.com/tendermint/tendermint/types"
  9. )
// nodeLivenessTimeout is how long a node's listen loop waits for any event
// (new block, block latency, disconnect notification) before it logs the node
// as not responding and marks it as down in the Network.
const nodeLivenessTimeout = 5 * time.Second
// Monitor keeps track of the nodes and updates common statistics upon
// receiving new events from nodes.
//
// Common statistics are stored in the Network struct.
type Monitor struct {
	// Nodes holds the monitored nodes; Monitor and Unmonitor key it by Node.Name.
	Nodes map[string]*Node
	// Network accumulates the statistics derived from node events.
	Network *Network

	monitorQuit chan struct{}            // closed by Stop to make the monitor's background loops exit
	nodeQuit    map[string]chan struct{} // per-node channel, closed by Unmonitor when the node is stopped and removed from under the monitor

	// recalculateNetworkUptimeEvery is the pause between calls to
	// Network.RecalculateUptime.
	recalculateNetworkUptimeEvery time.Duration
	// numValidatorsUpdateInterval is the pause between requests for the
	// number of validators.
	numValidatorsUpdateInterval time.Duration

	logger log.Logger
}
  25. // NewMonitor creates new instance of a Monitor. You can provide options to
  26. // change some default values.
  27. //
  28. // Example:
  29. // NewMonitor(monitor.SetNumValidatorsUpdateInterval(1 * time.Second))
  30. func NewMonitor(options ...func(*Monitor)) *Monitor {
  31. m := &Monitor{
  32. Nodes: make(map[string]*Node),
  33. Network: NewNetwork(),
  34. monitorQuit: make(chan struct{}),
  35. nodeQuit: make(map[string]chan struct{}),
  36. recalculateNetworkUptimeEvery: 10 * time.Second,
  37. numValidatorsUpdateInterval: 5 * time.Second,
  38. logger: log.NewNopLogger(),
  39. }
  40. for _, option := range options {
  41. option(m)
  42. }
  43. return m
  44. }
  45. // RecalculateNetworkUptimeEvery lets you change network uptime update interval.
  46. func RecalculateNetworkUptimeEvery(d time.Duration) func(m *Monitor) {
  47. return func(m *Monitor) {
  48. m.recalculateNetworkUptimeEvery = d
  49. }
  50. }
  51. // SetNumValidatorsUpdateInterval lets you change num validators update interval.
  52. func SetNumValidatorsUpdateInterval(d time.Duration) func(m *Monitor) {
  53. return func(m *Monitor) {
  54. m.numValidatorsUpdateInterval = d
  55. }
  56. }
// SetLogger replaces the monitor's logger with l. NewMonitor installs a no-op
// logger, so nothing is logged until this is called.
//
// Each node's listen loop derives its logger from the monitor's logger once,
// when the loop starts, so call SetLogger before Monitor for the change to
// reach that node's log output.
func (m *Monitor) SetLogger(l log.Logger) {
	m.logger = l
}
  61. // Monitor begins to monitor the node `n`. The node will be started and added
  62. // to the monitor.
  63. func (m *Monitor) Monitor(n *Node) error {
  64. m.Nodes[n.Name] = n
  65. blockCh := make(chan tmtypes.Header, 10)
  66. n.SendBlocksTo(blockCh)
  67. blockLatencyCh := make(chan float64, 10)
  68. n.SendBlockLatenciesTo(blockLatencyCh)
  69. disconnectCh := make(chan bool, 10)
  70. n.NotifyAboutDisconnects(disconnectCh)
  71. if err := n.Start(); err != nil {
  72. return err
  73. }
  74. m.Network.NewNode(n.Name)
  75. m.nodeQuit[n.Name] = make(chan struct{})
  76. go m.listen(n.Name, blockCh, blockLatencyCh, disconnectCh, m.nodeQuit[n.Name])
  77. return nil
  78. }
  79. // Unmonitor stops monitoring node `n`. The node will be stopped and removed
  80. // from the monitor.
  81. func (m *Monitor) Unmonitor(n *Node) {
  82. m.Network.NodeDeleted(n.Name)
  83. n.Stop()
  84. close(m.nodeQuit[n.Name])
  85. delete(m.nodeQuit, n.Name)
  86. delete(m.Nodes, n.Name)
  87. }
  88. // Start starts the monitor's routines: recalculating network uptime and
  89. // updating number of validators.
  90. func (m *Monitor) Start() error {
  91. go m.recalculateNetworkUptimeLoop()
  92. go m.updateNumValidatorLoop()
  93. return nil
  94. }
  95. // Stop stops the monitor's routines.
  96. func (m *Monitor) Stop() {
  97. close(m.monitorQuit)
  98. for _, n := range m.Nodes {
  99. m.Unmonitor(n)
  100. }
  101. }
  102. // main loop where we listen for events from the node
  103. func (m *Monitor) listen(nodeName string, blockCh <-chan tmtypes.Header, blockLatencyCh <-chan float64, disconnectCh <-chan bool, quit <-chan struct{}) {
  104. logger := log.With(m.logger, "node", nodeName)
  105. for {
  106. select {
  107. case <-quit:
  108. return
  109. case b := <-blockCh:
  110. m.Network.NewBlock(b)
  111. m.Network.NodeIsOnline(nodeName)
  112. case l := <-blockLatencyCh:
  113. m.Network.NewBlockLatency(l)
  114. m.Network.NodeIsOnline(nodeName)
  115. case disconnected := <-disconnectCh:
  116. if disconnected {
  117. m.Network.NodeIsDown(nodeName)
  118. } else {
  119. m.Network.NodeIsOnline(nodeName)
  120. }
  121. case <-time.After(nodeLivenessTimeout):
  122. logger.Log("event", fmt.Sprintf("node was not responding for %v", nodeLivenessTimeout))
  123. m.Network.NodeIsDown(nodeName)
  124. }
  125. }
  126. }
  127. // recalculateNetworkUptimeLoop every N seconds.
  128. func (m *Monitor) recalculateNetworkUptimeLoop() {
  129. for {
  130. select {
  131. case <-m.monitorQuit:
  132. return
  133. case <-time.After(m.recalculateNetworkUptimeEvery):
  134. m.Network.RecalculateUptime()
  135. }
  136. }
  137. }
  138. // updateNumValidatorLoop sends a request to a random node once every N seconds,
  139. // which in turn makes an RPC call to get the latest validators.
  140. func (m *Monitor) updateNumValidatorLoop() {
  141. rand.Seed(time.Now().Unix())
  142. var height uint64
  143. var num int
  144. var err error
  145. for {
  146. if 0 == len(m.Nodes) {
  147. time.Sleep(m.numValidatorsUpdateInterval)
  148. continue
  149. }
  150. randomNodeIndex := rand.Intn(len(m.Nodes))
  151. select {
  152. case <-m.monitorQuit:
  153. return
  154. case <-time.After(m.numValidatorsUpdateInterval):
  155. i := 0
  156. for _, n := range m.Nodes {
  157. if i == randomNodeIndex {
  158. height, num, err = n.NumValidators()
  159. if err != nil {
  160. m.logger.Log("err", errors.Wrap(err, "update num validators failed"))
  161. }
  162. break
  163. }
  164. i++
  165. }
  166. if m.Network.Height <= height {
  167. m.Network.NumValidators = num
  168. }
  169. }
  170. }
  171. }