You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

251 lines
6.0 KiB

package monitor
import (
"fmt"
"math/rand"
"sync"
"time"
"github.com/pkg/errors"
"github.com/tendermint/tendermint/libs/log"
tmtypes "github.com/tendermint/tendermint/types"
)
// nodeLivenessTimeout is how long the per-node listen loop waits for any
// event (block, block latency or disconnect notification) before it reports
// the node as down to the Network.
const nodeLivenessTimeout = 5 * time.Second
// Monitor keeps track of the nodes and updates common statistics upon
// receiving new events from nodes.
//
// Common statistics are stored in the Network struct.
type Monitor struct {
// mtx is held around most reads and writes of Nodes.
mtx sync.Mutex
// Nodes is the list of nodes currently registered with the monitor.
Nodes []*Node
// Network receives every node event and holds the aggregated statistics.
Network *Network
monitorQuit chan struct{} // monitor exiting; closed by Stop
nodeQuit map[string]chan struct{} // keyed by node name; node is being stopped and removed from under the monitor
// recalculateNetworkUptimeEvery is the delay between calls to
// Network.RecalculateUptime (NewMonitor default: 10s).
recalculateNetworkUptimeEvery time.Duration
// numValidatorsUpdateInterval is the delay between validator-count
// refreshes (NewMonitor default: 5s).
numValidatorsUpdateInterval time.Duration
// logger defaults to a no-op logger in NewMonitor; see SetLogger.
logger log.Logger
}
// NewMonitor builds a Monitor populated with default settings and then
// applies the supplied options, in the order given, on top of them.
//
// Defaults: network uptime is recalculated every 10 seconds, the number of
// validators is refreshed every 5 seconds, and logging is a no-op.
//
// Example:
//
//	NewMonitor(monitor.SetNumValidatorsUpdateInterval(1 * time.Second))
func NewMonitor(options ...func(*Monitor)) *Monitor {
	m := new(Monitor)
	m.Nodes = make([]*Node, 0)
	m.Network = NewNetwork()
	m.monitorQuit = make(chan struct{})
	m.nodeQuit = make(map[string]chan struct{})
	m.recalculateNetworkUptimeEvery = 10 * time.Second
	m.numValidatorsUpdateInterval = 5 * time.Second
	m.logger = log.NewNopLogger()

	// Options run last so they can override any of the defaults above.
	for i := range options {
		options[i](m)
	}
	return m
}
// RecalculateNetworkUptimeEvery returns a NewMonitor option that sets the
// interval between network uptime recalculations to d.
func RecalculateNetworkUptimeEvery(d time.Duration) func(m *Monitor) {
	setInterval := func(mon *Monitor) {
		mon.recalculateNetworkUptimeEvery = d
	}
	return setInterval
}
// SetNumValidatorsUpdateInterval returns a NewMonitor option that sets the
// interval between validator-count refreshes to d.
func SetNumValidatorsUpdateInterval(d time.Duration) func(m *Monitor) {
	var option func(m *Monitor) = func(target *Monitor) {
		target.numValidatorsUpdateInterval = d
	}
	return option
}
// SetLogger replaces the monitor's logger with l.
//
// The per-node listen loop derives its logger from the monitor's logger once,
// when it starts, so call SetLogger before Monitor if per-node messages
// should go to l. The assignment itself is not synchronized.
func (m *Monitor) SetLogger(l log.Logger) {
m.logger = l
}
// Monitor begins to monitor the node `n`. The node will be started and added
// to the monitor.
//
// The node's block, block-latency and disconnect notifications are wired to
// buffered channels before the node is started, so no early event is lost.
// If starting the node fails, the error is returned and the monitor is left
// unchanged: the node is not added to Nodes and no listener is spawned.
func (m *Monitor) Monitor(n *Node) error {
	blockCh := make(chan tmtypes.Header, 10)
	n.SendBlocksTo(blockCh)
	blockLatencyCh := make(chan float64, 10)
	n.SendBlockLatenciesTo(blockLatencyCh)
	disconnectCh := make(chan bool, 10)
	n.NotifyAboutDisconnects(disconnectCh)

	if err := n.Start(); err != nil {
		return err
	}

	// Register the node only once it is actually running, and do it under
	// the mutex: both the Nodes slice and the nodeQuit map are shared with
	// other goroutines, and Go maps are not safe for concurrent mutation.
	quit := make(chan struct{})
	m.mtx.Lock()
	m.Nodes = append(m.Nodes, n)
	m.nodeQuit[n.Name] = quit
	m.mtx.Unlock()

	m.Network.NewNode(n.Name)

	go m.listen(n.Name, blockCh, blockLatencyCh, disconnectCh, quit)
	return nil
}
// Unmonitor stops monitoring node `n`. The node will be stopped and removed
// from the monitor.
//
// Calling Unmonitor for a node that is not being monitored is a no-op apart
// from dropping any stale entry for it from Nodes; it no longer panics on a
// missing quit channel or a -1 index.
func (m *Monitor) Unmonitor(n *Node) {
	// Do all bookkeeping in one critical section so the index we find is
	// still valid when we use it. NodeByName cannot be used here because it
	// takes the same (non-reentrant) mutex.
	m.mtx.Lock()
	for i, cur := range m.Nodes {
		if cur.Name == n.Name {
			// Order of Nodes is not significant: overwrite the removed
			// slot with the last element and shrink by one.
			last := len(m.Nodes) - 1
			m.Nodes[i] = m.Nodes[last]
			m.Nodes = m.Nodes[:last]
			break
		}
	}
	quit, monitored := m.nodeQuit[n.Name]
	delete(m.nodeQuit, n.Name)
	m.mtx.Unlock()

	if !monitored {
		// No listener was ever started for this node, so there is nothing
		// to stop and nothing to remove from the network statistics.
		return
	}

	m.Network.NodeDeleted(n.Name)
	n.Stop()
	close(quit) // tells the node's listen goroutine to exit
}
// NodeByName looks up a monitored node by its name.
//
// It returns the node's position in Nodes together with the node itself, or
// -1 and nil when no node with that name is registered. The monitor's mutex
// is held for the duration of the search.
func (m *Monitor) NodeByName(name string) (index int, node *Node) {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	for idx := 0; idx < len(m.Nodes); idx++ {
		if candidate := m.Nodes[idx]; candidate.Name == name {
			return idx, candidate
		}
	}
	return -1, nil
}
// NodeIsOnline is called when connection to the node is restored.
// Must be safe to call multiple times.
//
// The node's Online flag is set only when the node is registered with the
// monitor and the Network's status map already reports it as online;
// otherwise the call does nothing.
func (m *Monitor) NodeIsOnline(name string) {
	_, node := m.NodeByName(name)
	if node == nil {
		return
	}

	// NOTE(review): nodeStatusMap is read directly here with no lock visible
	// in this file — confirm whether Network expects its own mutex to be held.
	online, known := m.Network.nodeStatusMap[name]
	if !known || !online {
		return
	}

	m.mtx.Lock()
	node.Online = online
	m.mtx.Unlock()
}
// Start starts the monitor's routines: recalculating network uptime and
// updating number of validators.
//
// Each routine runs in its own goroutine and watches monitorQuit, which Stop
// closes. Start does not block and always returns nil.
func (m *Monitor) Start() error {
go m.recalculateNetworkUptimeLoop()
go m.updateNumValidatorLoop()
return nil
}
// Stop stops the monitor's routines and unmonitors every registered node.
//
// monitorQuit is closed first to signal the background loops, then each node
// is passed to Unmonitor. Stop must be called at most once: closing
// monitorQuit a second time would panic.
func (m *Monitor) Stop() {
	close(m.monitorQuit)

	// Unmonitor rewrites m.Nodes in place, so iterate over a private copy
	// taken under the mutex instead of ranging over the live slice.
	m.mtx.Lock()
	nodes := make([]*Node, len(m.Nodes))
	copy(nodes, m.Nodes)
	m.mtx.Unlock()

	for _, n := range nodes {
		m.Unmonitor(n)
	}
}
// listen is the per-node event loop. It runs until quit is closed and
// forwards everything the node reports to the Network:
//
//   - a new block header or block latency is recorded and the node is
//     marked online;
//   - a disconnect notification marks the node down (true) or online (false);
//   - if nothing at all arrives within nodeLivenessTimeout, the node is
//     marked down and the event is logged.
func (m *Monitor) listen(nodeName string, blockCh <-chan tmtypes.Header, blockLatencyCh <-chan float64, disconnectCh <-chan bool, quit <-chan struct{}) {
	logger := m.logger.With("node", nodeName)

	// markOnline records the node as online both in the network statistics
	// and on the monitor's own Node entry.
	markOnline := func() {
		m.Network.NodeIsOnline(nodeName)
		m.NodeIsOnline(nodeName)
	}

	for {
		select {
		case <-quit:
			return

		case header := <-blockCh:
			m.Network.NewBlock(header)
			markOnline()

		case latency := <-blockLatencyCh:
			m.Network.NewBlockLatency(latency)
			markOnline()

		case down := <-disconnectCh:
			if !down {
				markOnline()
				continue
			}
			m.Network.NodeIsDown(nodeName)

		case <-time.After(nodeLivenessTimeout):
			// The timer is re-armed on every pass, so it only fires after a
			// full timeout with no event of any kind.
			// NOTE(review): "event" is passed as the log message and the
			// formatted text as a lone key with no value — confirm this is
			// the intended shape for log.Logger.Info.
			logger.Info("event", fmt.Sprintf("node was not responding for %v", nodeLivenessTimeout))
			m.Network.NodeIsDown(nodeName)
		}
	}
}
// recalculateNetworkUptimeLoop asks the Network to recalculate its uptime
// once per recalculateNetworkUptimeEvery, measured from the end of the
// previous pass. It returns as soon as monitorQuit is closed.
func (m *Monitor) recalculateNetworkUptimeLoop() {
	for {
		// A fresh timer per pass: the delay starts counting only after the
		// previous recalculation has finished.
		due := time.After(m.recalculateNetworkUptimeEvery)

		select {
		case <-due:
			m.Network.RecalculateUptime()
		case <-m.monitorQuit:
			return
		}
	}
}
// updateNumValidatorLoop sends a request to a random node once every N seconds,
// which in turn makes an RPC call to get the latest validators.
//
// N is numValidatorsUpdateInterval. The loop returns as soon as monitorQuit
// is closed, including while no nodes are registered. The node is chosen
// after the wait, so the choice always reflects the current contents of
// Nodes, and the RPC is made without holding the monitor's mutex so that a
// slow node cannot block other monitor operations. The Network is updated
// only when the RPC succeeds; a failure is logged and skipped.
func (m *Monitor) updateNumValidatorLoop() {
	rand.Seed(time.Now().Unix())

	for {
		select {
		case <-m.monitorQuit:
			return
		case <-time.After(m.numValidatorsUpdateInterval):
		}

		// Pick the target under the lock, then release it before the RPC.
		var node *Node
		m.mtx.Lock()
		if count := len(m.Nodes); count > 0 {
			node = m.Nodes[rand.Intn(count)]
		}
		m.mtx.Unlock()

		if node == nil {
			// Nothing to query yet; wait for the next tick (or for quit).
			continue
		}

		height, num, err := node.NumValidators()
		if err != nil {
			m.logger.Info("err", errors.Wrap(err, "update num validators failed"))
			// The returned values are meaningless on error; do not publish them.
			continue
		}
		m.Network.UpdateNumValidatorsForHeight(num, height)
	}
}