Browse Source

tests for node and monitor

pull/1943/head
Anton Kaliaev 8 years ago
parent
commit
5c9ec9344a
No known key found for this signature in database GPG Key ID: 7B6881D965918214
7 changed files with 237 additions and 48 deletions
  1. +21
    -0
      tm-monitor/mock/mock.go
  2. +56
    -16
      tm-monitor/monitor.go
  3. +64
    -3
      tm-monitor/monitor_test.go
  4. +10
    -0
      tm-monitor/network.go
  5. +48
    -20
      tm-monitor/node.go
  6. +37
    -8
      tm-monitor/node_test.go
  7. +1
    -1
      tm-monitor/ton.go

+ 21
- 0
tm-monitor/mock/mock.go View File

@ -1,7 +1,11 @@
package mock package mock
import ( import (
"log"
"reflect"
em "github.com/tendermint/go-event-meter" em "github.com/tendermint/go-event-meter"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
) )
type EventMeter struct { type EventMeter struct {
@ -35,3 +39,20 @@ func (e *EventMeter) Call(callback string, args ...interface{}) {
e.eventCallback(args[0].(*em.EventMetric), args[1]) e.eventCallback(args[0].(*em.EventMetric), args[1])
} }
} }
type RpcClient struct {
Stubs map[string]ctypes.TMResult
}
func (c *RpcClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) {
s, ok := c.Stubs[method]
if !ok {
log.Fatalf("Call to %s, but no stub is defined for it", method)
}
rv, rt := reflect.ValueOf(result), reflect.TypeOf(result)
rv, rt = rv.Elem(), rt.Elem()
rv.Set(reflect.ValueOf(s))
return s, nil
}

+ 56
- 16
tm-monitor/monitor.go View File

@ -10,23 +10,59 @@ import (
// waiting more than this many seconds for a block means we're unhealthy // waiting more than this many seconds for a block means we're unhealthy
const nodeLivenessTimeout = 5 * time.Second const nodeLivenessTimeout = 5 * time.Second
// Monitor keeps track of the nodes and updates common statistics upon
// receiving new events from nodes.
//
// Common statistics is stored in Network struct.
type Monitor struct { type Monitor struct {
Nodes map[string]*Node Nodes map[string]*Node
Network *Network Network *Network
monitorQuit chan struct{} // monitor exitting monitorQuit chan struct{} // monitor exitting
nodeQuit map[string]chan struct{} // node is being stopped and removed from under the monitor nodeQuit map[string]chan struct{} // node is being stopped and removed from under the monitor
recalculateNetworkUptimeEvery time.Duration
numValidatorsUpdateInterval time.Duration
}
// NewMonitor creates new instance of a Monitor. You can provide options to
// change some default values.
//
// Example:
// NewMonitor(monitor.SetNumValidatorsUpdateInterval(1 * time.Second))
func NewMonitor(options ...func(*Monitor)) *Monitor {
m := &Monitor{
Nodes: make(map[string]*Node),
Network: NewNetwork(),
monitorQuit: make(chan struct{}),
nodeQuit: make(map[string]chan struct{}),
recalculateNetworkUptimeEvery: 10 * time.Second,
numValidatorsUpdateInterval: 5 * time.Second,
}
for _, option := range options {
option(m)
}
return m
}
// RecalculateNetworkUptimeEvery lets you change network uptime update interval.
func RecalculateNetworkUptimeEvery(d time.Duration) func(m *Monitor) {
return func(m *Monitor) {
m.recalculateNetworkUptimeEvery = d
}
} }
func NewMonitor() *Monitor {
return &Monitor{
Nodes: make(map[string]*Node),
Network: NewNetwork(),
monitorQuit: make(chan struct{}),
nodeQuit: make(map[string]chan struct{}),
// SetNumValidatorsUpdateInterval lets you change num validators update interval.
func SetNumValidatorsUpdateInterval(d time.Duration) func(m *Monitor) {
return func(m *Monitor) {
m.numValidatorsUpdateInterval = d
} }
} }
// Monitor begins to monitor the node `n`. The node will be started and added
// to the monitor.
func (m *Monitor) Monitor(n *Node) error { func (m *Monitor) Monitor(n *Node) error {
m.Nodes[n.Name] = n m.Nodes[n.Name] = n
@ -49,6 +85,8 @@ func (m *Monitor) Monitor(n *Node) error {
return nil return nil
} }
// Unmonitor stops monitoring node `n`. The node will be stopped and removed
// from the monitor.
func (m *Monitor) Unmonitor(n *Node) { func (m *Monitor) Unmonitor(n *Node) {
m.Network.NodeDeleted(n.Name) m.Network.NodeDeleted(n.Name)
@ -58,13 +96,16 @@ func (m *Monitor) Unmonitor(n *Node) {
delete(m.Nodes, n.Name) delete(m.Nodes, n.Name)
} }
// Start starts the monitor's routines: recalculating network uptime and
// updating number of validators.
func (m *Monitor) Start() error { func (m *Monitor) Start() error {
go m.recalculateNetworkUptime()
go m.updateNumValidators()
go m.recalculateNetworkUptimeLoop()
go m.updateNumValidatorLoop()
return nil return nil
} }
// Stop stops the monitor's routines.
func (m *Monitor) Stop() { func (m *Monitor) Stop() {
close(m.monitorQuit) close(m.monitorQuit)
@ -95,21 +136,21 @@ func (m *Monitor) listen(nodeName string, blockCh <-chan tmtypes.Header, blockLa
} }
} }
// recalculateNetworkUptime every N seconds.
func (m *Monitor) recalculateNetworkUptime() {
// recalculateNetworkUptimeLoop every N seconds.
func (m *Monitor) recalculateNetworkUptimeLoop() {
for { for {
select { select {
case <-m.monitorQuit: case <-m.monitorQuit:
return return
case <-time.After(10 * time.Second):
case <-time.After(m.recalculateNetworkUptimeEvery):
m.Network.RecalculateUptime() m.Network.RecalculateUptime()
} }
} }
} }
// updateNumValidators sends a request to a random node once every N seconds,
// updateNumValidatorLoop sends a request to a random node once every N seconds,
// which in turn makes an RPC call to get the latest validators. // which in turn makes an RPC call to get the latest validators.
func (m *Monitor) updateNumValidators() {
func (m *Monitor) updateNumValidatorLoop() {
rand.Seed(time.Now().Unix()) rand.Seed(time.Now().Unix())
var height uint64 var height uint64
@ -118,8 +159,7 @@ func (m *Monitor) updateNumValidators() {
for { for {
if 0 == len(m.Nodes) { if 0 == len(m.Nodes) {
m.Network.NumValidators = 0
time.Sleep(5 * time.Second)
time.Sleep(m.numValidatorsUpdateInterval)
continue continue
} }
@ -128,7 +168,7 @@ func (m *Monitor) updateNumValidators() {
select { select {
case <-m.monitorQuit: case <-m.monitorQuit:
return return
case <-time.After(5 * time.Second):
case <-time.After(m.numValidatorsUpdateInterval):
i := 0 i := 0
for _, n := range m.Nodes { for _, n := range m.Nodes {
if i == randomNodeIndex { if i == randomNodeIndex {


+ 64
- 3
tm-monitor/monitor_test.go View File

@ -1,11 +1,72 @@
package main_test package main_test
import "testing"
import (
"testing"
"time"
func TestMonitorStartStop(t *testing.T) {
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
crypto "github.com/tendermint/go-crypto"
monitor "github.com/tendermint/netmon/tm-monitor"
mock "github.com/tendermint/netmon/tm-monitor/mock"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
tmtypes "github.com/tendermint/tendermint/types"
)
func TestMonitorUpdatesNumberOfValidators(t *testing.T) {
assert := assert.New(t)
m := startMonitor(t)
defer m.Stop()
n, _ := createValidatorNode(t)
m.Monitor(n)
assert.Equal(1, m.Network.NumNodesMonitored)
assert.Equal(1, m.Network.NumNodesMonitoredOnline)
time.Sleep(1 * time.Second)
assert.Equal(1, m.Network.NumValidators)
}
func TestMonitorRecalculatesNetworkUptime(t *testing.T) {
assert := assert.New(t)
m := startMonitor(t)
defer m.Stop()
assert.Equal(100.0, m.Network.Uptime())
n, _ := createValidatorNode(t)
m.Monitor(n)
m.Network.NodeIsDown(n.Name) // simulate node failure
time.Sleep(200 * time.Millisecond)
m.Network.NodeIsOnline(n.Name)
time.Sleep(1 * time.Second)
assert.True(m.Network.Uptime() < 100.0, "Uptime should be less than 100%")
} }
func TestMonitorReceivesNewBlocksFromNodes(t *testing.T) {
func startMonitor(t *testing.T) *monitor.Monitor {
m := monitor.NewMonitor(
monitor.SetNumValidatorsUpdateInterval(200*time.Millisecond),
monitor.RecalculateNetworkUptimeEvery(200*time.Millisecond),
)
err := m.Start()
require.Nil(t, err)
return m
}
func createValidatorNode(t *testing.T) (n *monitor.Node, emMock *mock.EventMeter) {
emMock = &mock.EventMeter{}
stubs := make(map[string]ctypes.TMResult)
pubKey := crypto.GenPrivKeyEd25519().PubKey()
stubs["validators"] = &ctypes.ResultValidators{BlockHeight: blockHeight, Validators: []*tmtypes.Validator{tmtypes.NewValidator(pubKey, 0)}}
stubs["status"] = &ctypes.ResultStatus{PubKey: pubKey}
rpcClientMock := &mock.RpcClient{stubs}
n = monitor.NewNodeWithEventMeterAndRpcClient("tcp://127.0.0.1:46657", emMock, rpcClientMock)
return
} }

+ 10
- 0
tm-monitor/network.go View File

@ -182,3 +182,13 @@ func (n *Network) GetHealthString() string {
return "undefined" return "undefined"
} }
} }
// Uptime returns network's uptime in percentages.
func (n *Network) Uptime() float64 {
return n.UptimeData.Uptime
}
// StartTime returns time we started monitoring.
func (n *Network) StartTime() time.Time {
return n.UptimeData.StartTime
}

+ 48
- 20
tm-monitor/node.go View File

@ -16,6 +16,11 @@ import (
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
) )
// remove when https://github.com/tendermint/go-rpc/issues/8 will be fixed
type rpcClientI interface {
Call(method string, params map[string]interface{}, result interface{}) (interface{}, error)
}
const maxRestarts = 25 const maxRestarts = 25
type Node struct { type Node struct {
@ -32,28 +37,46 @@ type Node struct {
// em holds the ws connection. Each eventMeter callback is called in a separate go-routine. // em holds the ws connection. Each eventMeter callback is called in a separate go-routine.
em eventMeter em eventMeter
// rpcClient is an http client for making RPC calls to TM
rpcClient *rpc_client.ClientURI
// rpcClient is an client for making RPC calls to TM
rpcClient rpcClientI
blockCh chan<- tmtypes.Header blockCh chan<- tmtypes.Header
blockLatencyCh chan<- float64 blockLatencyCh chan<- float64
disconnectCh chan<- bool disconnectCh chan<- bool
checkIsValidatorInterval time.Duration
quit chan struct{} quit chan struct{}
} }
func NewNode(rpcAddr string) *Node {
func NewNode(rpcAddr string, options ...func(*Node)) *Node {
em := em.NewEventMeter(rpcAddr, UnmarshalEvent) em := em.NewEventMeter(rpcAddr, UnmarshalEvent)
return NewNodeWithEventMeter(rpcAddr, em)
rpcClient := rpc_client.NewClientURI(rpcAddr) // HTTP client by default
return NewNodeWithEventMeterAndRpcClient(rpcAddr, em, rpcClient, options...)
} }
func NewNodeWithEventMeter(rpcAddr string, em eventMeter) *Node {
return &Node{
func NewNodeWithEventMeterAndRpcClient(rpcAddr string, em eventMeter, rpcClient rpcClientI, options ...func(*Node)) *Node {
n := &Node{
rpcAddr: rpcAddr, rpcAddr: rpcAddr,
em: em, em: em,
rpcClient: rpc_client.NewClientURI(rpcAddr),
rpcClient: rpcClient,
Name: rpcAddr, Name: rpcAddr,
quit: make(chan struct{}), quit: make(chan struct{}),
checkIsValidatorInterval: 5 * time.Second,
}
for _, option := range options {
option(n)
}
return n
}
// SetCheckIsValidatorInterval lets you change interval for checking whenever
// node is still a validator or not.
func SetCheckIsValidatorInterval(d time.Duration) func(n *Node) {
return func(n *Node) {
n.checkIsValidatorInterval = d
} }
} }
@ -80,7 +103,8 @@ func (n *Node) Start() error {
n.Online = true n.Online = true
go n.checkIsValidator()
n.checkIsValidator()
go n.checkIsValidatorLoop()
return nil return nil
} }
@ -179,24 +203,28 @@ func (n *Node) validators() (height uint64, validators []*tmtypes.Validator, err
return uint64(vals.BlockHeight), vals.Validators, nil return uint64(vals.BlockHeight), vals.Validators, nil
} }
func (n *Node) checkIsValidator() {
func (n *Node) checkIsValidatorLoop() {
for { for {
select { select {
case <-n.quit: case <-n.quit:
return return
case <-time.After(5 * time.Second):
_, validators, err := n.validators()
if err == nil {
for _, v := range validators {
key, err := n.getPubKey()
if err == nil && v.PubKey == key {
n.IsValidator = true
}
}
} else {
log.Debug(err.Error())
case <-time.After(n.checkIsValidatorInterval):
n.checkIsValidator()
}
}
}
func (n *Node) checkIsValidator() {
_, validators, err := n.validators()
if err == nil {
for _, v := range validators {
key, err := n.getPubKey()
if err == nil && v.PubKey == key {
n.IsValidator = true
} }
} }
} else {
log.Debug(err.Error())
} }
} }


+ 37
- 8
tm-monitor/node_test.go View File

@ -6,26 +6,34 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
crypto "github.com/tendermint/go-crypto"
em "github.com/tendermint/go-event-meter" em "github.com/tendermint/go-event-meter"
monitor "github.com/tendermint/netmon/tm-monitor" monitor "github.com/tendermint/netmon/tm-monitor"
mock "github.com/tendermint/netmon/tm-monitor/mock" mock "github.com/tendermint/netmon/tm-monitor/mock"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
tmtypes "github.com/tendermint/tendermint/types" tmtypes "github.com/tendermint/tendermint/types"
) )
const (
blockHeight = 1
)
func TestNodeStartStop(t *testing.T) { func TestNodeStartStop(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
n, _ := setupNode(t)
assert.Equal(true, n.Online)
n, _ := startValidatorNode(t)
defer n.Stop()
n.Stop()
assert.Equal(true, n.Online)
assert.Equal(true, n.IsValidator)
} }
func TestNodeNewBlockReceived(t *testing.T) { func TestNodeNewBlockReceived(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
blockCh := make(chan tmtypes.Header, 100) blockCh := make(chan tmtypes.Header, 100)
n, emMock := setupNode(t)
n, emMock := startValidatorNode(t)
defer n.Stop()
n.SendBlocksTo(blockCh) n.SendBlocksTo(blockCh)
blockHeader := &tmtypes.Header{Height: 5} blockHeader := &tmtypes.Header{Height: 5}
@ -39,7 +47,8 @@ func TestNodeNewBlockLatencyReceived(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
blockLatencyCh := make(chan float64, 100) blockLatencyCh := make(chan float64, 100)
n, emMock := setupNode(t)
n, emMock := startValidatorNode(t)
defer n.Stop()
n.SendBlockLatenciesTo(blockLatencyCh) n.SendBlockLatenciesTo(blockLatencyCh)
emMock.Call("latencyCallback", 1000000.0) emMock.Call("latencyCallback", 1000000.0)
@ -52,7 +61,8 @@ func TestNodeConnectionLost(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
disconnectCh := make(chan bool, 100) disconnectCh := make(chan bool, 100)
n, emMock := setupNode(t)
n, emMock := startValidatorNode(t)
defer n.Stop()
n.NotifyAboutDisconnects(disconnectCh) n.NotifyAboutDisconnects(disconnectCh)
emMock.Call("disconnectCallback") emMock.Call("disconnectCallback")
@ -64,9 +74,28 @@ func TestNodeConnectionLost(t *testing.T) {
assert.Equal(true, n.Online) assert.Equal(true, n.Online)
} }
func setupNode(t *testing.T) (n *monitor.Node, emMock *mock.EventMeter) {
func TestNumValidators(t *testing.T) {
assert := assert.New(t)
n, _ := startValidatorNode(t)
defer n.Stop()
height, num, err := n.NumValidators()
assert.Nil(err)
assert.Equal(uint64(blockHeight), height)
assert.Equal(1, num)
}
func startValidatorNode(t *testing.T) (n *monitor.Node, emMock *mock.EventMeter) {
emMock = &mock.EventMeter{} emMock = &mock.EventMeter{}
n = monitor.NewNodeWithEventMeter("tcp://127.0.0.1:46657", emMock)
stubs := make(map[string]ctypes.TMResult)
pubKey := crypto.GenPrivKeyEd25519().PubKey()
stubs["validators"] = &ctypes.ResultValidators{BlockHeight: blockHeight, Validators: []*tmtypes.Validator{tmtypes.NewValidator(pubKey, 0)}}
stubs["status"] = &ctypes.ResultStatus{PubKey: pubKey}
rpcClientMock := &mock.RpcClient{stubs}
n = monitor.NewNodeWithEventMeterAndRpcClient("tcp://127.0.0.1:46657", emMock, rpcClientMock)
err := n.Start() err := n.Start()
require.Nil(t, err) require.Nil(t, err)


+ 1
- 1
tm-monitor/ton.go View File

@ -59,7 +59,7 @@ func (o *Ton) Stop() {
func (o *Ton) printHeader() { func (o *Ton) printHeader() {
n := o.monitor.Network n := o.monitor.Network
fmt.Fprintf(o.Output, "%v up %.2f%%\n", n.UptimeData.StartTime, n.UptimeData.Uptime)
fmt.Fprintf(o.Output, "%v up %.2f%%\n", n.StartTime(), n.Uptime())
fmt.Println() fmt.Println()
fmt.Fprintf(o.Output, "Height: %d\n", n.Height) fmt.Fprintf(o.Output, "Height: %d\n", n.Height)
fmt.Fprintf(o.Output, "Avg block time: %.3f ms\n", n.AvgBlockTime) fmt.Fprintf(o.Output, "Avg block time: %.3f ms\n", n.AvgBlockTime)


Loading…
Cancel
Save