You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

149 lines
3.1 KiB

  1. package main
  2. import (
  3. "math/rand"
  4. "time"
  5. tmtypes "github.com/tendermint/tendermint/types"
  6. )
  7. // waiting more than this many seconds for a block means we're unhealthy
  8. const nodeLivenessTimeout = 5 * time.Second
  9. type Monitor struct {
  10. Nodes map[string]*Node
  11. Network *Network
  12. monitorQuit chan struct{} // monitor exitting
  13. nodeQuit map[string]chan struct{} // node is being stopped and removed from under the monitor
  14. }
  15. func NewMonitor() *Monitor {
  16. return &Monitor{
  17. Nodes: make(map[string]*Node),
  18. Network: NewNetwork(),
  19. monitorQuit: make(chan struct{}),
  20. nodeQuit: make(map[string]chan struct{}),
  21. }
  22. }
  23. func (m *Monitor) Monitor(n *Node) error {
  24. m.Nodes[n.Name] = n
  25. blockCh := make(chan tmtypes.Header, 10)
  26. n.SendBlocksTo(blockCh)
  27. blockLatencyCh := make(chan float64, 10)
  28. n.SendBlockLatenciesTo(blockLatencyCh)
  29. disconnectCh := make(chan bool, 10)
  30. n.NotifyAboutDisconnects(disconnectCh)
  31. if err := n.Start(); err != nil {
  32. return err
  33. }
  34. m.Network.NewNode(n.Name)
  35. m.nodeQuit[n.Name] = make(chan struct{})
  36. go m.listen(n.Name, blockCh, blockLatencyCh, disconnectCh, m.nodeQuit[n.Name])
  37. return nil
  38. }
  39. func (m *Monitor) Unmonitor(n *Node) {
  40. m.Network.NodeDeleted(n.Name)
  41. n.Stop()
  42. close(m.nodeQuit[n.Name])
  43. delete(m.nodeQuit, n.Name)
  44. delete(m.Nodes, n.Name)
  45. }
  46. func (m *Monitor) Start() error {
  47. go m.recalculateNetworkUptime()
  48. go m.updateNumValidators()
  49. return nil
  50. }
  51. func (m *Monitor) Stop() {
  52. close(m.monitorQuit)
  53. for _, n := range m.Nodes {
  54. m.Unmonitor(n)
  55. }
  56. }
  57. // main loop where we listen for events from the node
  58. func (m *Monitor) listen(nodeName string, blockCh <-chan tmtypes.Header, blockLatencyCh <-chan float64, disconnectCh <-chan bool, quit <-chan struct{}) {
  59. for {
  60. select {
  61. case <-quit:
  62. return
  63. case b := <-blockCh:
  64. m.Network.NewBlock(b)
  65. case l := <-blockLatencyCh:
  66. m.Network.NewBlockLatency(l)
  67. case disconnected := <-disconnectCh:
  68. if disconnected {
  69. m.Network.NodeIsDown(nodeName)
  70. } else {
  71. m.Network.NodeIsOnline(nodeName)
  72. }
  73. case <-time.After(nodeLivenessTimeout):
  74. m.Network.NodeIsDown(nodeName)
  75. }
  76. }
  77. }
  78. // recalculateNetworkUptime every N seconds.
  79. func (m *Monitor) recalculateNetworkUptime() {
  80. for {
  81. select {
  82. case <-m.monitorQuit:
  83. return
  84. case <-time.After(10 * time.Second):
  85. m.Network.RecalculateUptime()
  86. }
  87. }
  88. }
  89. // updateNumValidators sends a request to a random node once every N seconds,
  90. // which in turn makes an RPC call to get the latest validators.
  91. func (m *Monitor) updateNumValidators() {
  92. rand.Seed(time.Now().Unix())
  93. var height uint64
  94. var num int
  95. var err error
  96. for {
  97. if 0 == len(m.Nodes) {
  98. m.Network.NumValidators = 0
  99. time.Sleep(5 * time.Second)
  100. continue
  101. }
  102. randomNodeIndex := rand.Intn(len(m.Nodes))
  103. select {
  104. case <-m.monitorQuit:
  105. return
  106. case <-time.After(5 * time.Second):
  107. i := 0
  108. for _, n := range m.Nodes {
  109. if i == randomNodeIndex {
  110. height, num, err = n.NumValidators()
  111. if err != nil {
  112. log.Debug(err.Error())
  113. }
  114. break
  115. }
  116. i++
  117. }
  118. if m.Network.Height <= height {
  119. m.Network.NumValidators = num
  120. }
  121. }
  122. }
  123. }