From 5c9ec9344a7cf40e3dc01ff6a7a8718f37b4e7f2 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 6 Mar 2017 18:35:52 +0400 Subject: [PATCH] tests for node and monitor --- tm-monitor/mock/mock.go | 21 +++++++++++ tm-monitor/monitor.go | 72 +++++++++++++++++++++++++++++--------- tm-monitor/monitor_test.go | 67 +++++++++++++++++++++++++++++++++-- tm-monitor/network.go | 10 ++++++ tm-monitor/node.go | 68 ++++++++++++++++++++++++----------- tm-monitor/node_test.go | 45 +++++++++++++++++++----- tm-monitor/ton.go | 2 +- 7 files changed, 237 insertions(+), 48 deletions(-) diff --git a/tm-monitor/mock/mock.go b/tm-monitor/mock/mock.go index 80fca88b0..25b007e0a 100644 --- a/tm-monitor/mock/mock.go +++ b/tm-monitor/mock/mock.go @@ -1,7 +1,11 @@ package mock import ( + "log" + "reflect" + em "github.com/tendermint/go-event-meter" + ctypes "github.com/tendermint/tendermint/rpc/core/types" ) type EventMeter struct { @@ -35,3 +39,20 @@ func (e *EventMeter) Call(callback string, args ...interface{}) { e.eventCallback(args[0].(*em.EventMetric), args[1]) } } + +type RpcClient struct { + Stubs map[string]ctypes.TMResult +} + +func (c *RpcClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) { + s, ok := c.Stubs[method] + if !ok { + log.Fatalf("Call to %s, but no stub is defined for it", method) + } + + rv, rt := reflect.ValueOf(result), reflect.TypeOf(result) + rv, rt = rv.Elem(), rt.Elem() + rv.Set(reflect.ValueOf(s)) + + return s, nil +} diff --git a/tm-monitor/monitor.go b/tm-monitor/monitor.go index 4a2ff1124..9d83d0f67 100644 --- a/tm-monitor/monitor.go +++ b/tm-monitor/monitor.go @@ -10,23 +10,59 @@ import ( // waiting more than this many seconds for a block means we're unhealthy const nodeLivenessTimeout = 5 * time.Second +// Monitor keeps track of the nodes and updates common statistics upon +// receiving new events from nodes. +// +// Common statistics is stored in Network struct. type Monitor struct { Nodes map[string]*Node Network *Network monitorQuit chan struct{} // monitor exitting nodeQuit map[string]chan struct{} // node is being stopped and removed from under the monitor + + recalculateNetworkUptimeEvery time.Duration + numValidatorsUpdateInterval time.Duration +} + +// NewMonitor creates new instance of a Monitor. You can provide options to +// change some default values. +// +// Example: +// NewMonitor(monitor.SetNumValidatorsUpdateInterval(1 * time.Second)) +func NewMonitor(options ...func(*Monitor)) *Monitor { + m := &Monitor{ + Nodes: make(map[string]*Node), + Network: NewNetwork(), + monitorQuit: make(chan struct{}), + nodeQuit: make(map[string]chan struct{}), + recalculateNetworkUptimeEvery: 10 * time.Second, + numValidatorsUpdateInterval: 5 * time.Second, + } + + for _, option := range options { + option(m) + } + + return m +} + +// RecalculateNetworkUptimeEvery lets you change network uptime update interval. +func RecalculateNetworkUptimeEvery(d time.Duration) func(m *Monitor) { + return func(m *Monitor) { + m.recalculateNetworkUptimeEvery = d + } } -func NewMonitor() *Monitor { - return &Monitor{ - Nodes: make(map[string]*Node), - Network: NewNetwork(), - monitorQuit: make(chan struct{}), - nodeQuit: make(map[string]chan struct{}), +// SetNumValidatorsUpdateInterval lets you change num validators update interval. +func SetNumValidatorsUpdateInterval(d time.Duration) func(m *Monitor) { + return func(m *Monitor) { + m.numValidatorsUpdateInterval = d } } +// Monitor begins to monitor the node `n`. The node will be started and added +// to the monitor. func (m *Monitor) Monitor(n *Node) error { m.Nodes[n.Name] = n @@ -49,6 +85,8 @@ func (m *Monitor) Monitor(n *Node) error { return nil } +// Unmonitor stops monitoring node `n`. The node will be stopped and removed +// from the monitor. func (m *Monitor) Unmonitor(n *Node) { m.Network.NodeDeleted(n.Name) @@ -58,13 +96,16 @@ func (m *Monitor) Unmonitor(n *Node) { delete(m.Nodes, n.Name) } +// Start starts the monitor's routines: recalculating network uptime and +// updating number of validators. func (m *Monitor) Start() error { - go m.recalculateNetworkUptime() - go m.updateNumValidators() + go m.recalculateNetworkUptimeLoop() + go m.updateNumValidatorLoop() return nil } +// Stop stops the monitor's routines. func (m *Monitor) Stop() { close(m.monitorQuit) @@ -95,21 +136,21 @@ func (m *Monitor) listen(nodeName string, blockCh <-chan tmtypes.Header, blockLa } } -// recalculateNetworkUptime every N seconds. -func (m *Monitor) recalculateNetworkUptime() { +// recalculateNetworkUptimeLoop every N seconds. +func (m *Monitor) recalculateNetworkUptimeLoop() { for { select { case <-m.monitorQuit: return - case <-time.After(10 * time.Second): + case <-time.After(m.recalculateNetworkUptimeEvery): m.Network.RecalculateUptime() } } } -// updateNumValidators sends a request to a random node once every N seconds, +// updateNumValidatorLoop sends a request to a random node once every N seconds, // which in turn makes an RPC call to get the latest validators. -func (m *Monitor) updateNumValidators() { +func (m *Monitor) updateNumValidatorLoop() { rand.Seed(time.Now().Unix()) var height uint64 @@ -118,8 +159,7 @@ func (m *Monitor) updateNumValidators() { for { if 0 == len(m.Nodes) { - m.Network.NumValidators = 0 - time.Sleep(5 * time.Second) + time.Sleep(m.numValidatorsUpdateInterval) continue } @@ -128,7 +168,7 @@ func (m *Monitor) updateNumValidators() { select { case <-m.monitorQuit: return - case <-time.After(5 * time.Second): + case <-time.After(m.numValidatorsUpdateInterval): i := 0 for _, n := range m.Nodes { if i == randomNodeIndex { diff --git a/tm-monitor/monitor_test.go b/tm-monitor/monitor_test.go index de656de5f..bda29a452 100644 --- a/tm-monitor/monitor_test.go +++ b/tm-monitor/monitor_test.go @@ -1,11 +1,72 @@ package main_test -import "testing" +import ( + "testing" + "time" -func TestMonitorStartStop(t *testing.T) { + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + crypto "github.com/tendermint/go-crypto" + monitor "github.com/tendermint/netmon/tm-monitor" + mock "github.com/tendermint/netmon/tm-monitor/mock" + ctypes "github.com/tendermint/tendermint/rpc/core/types" + tmtypes "github.com/tendermint/tendermint/types" +) + +func TestMonitorUpdatesNumberOfValidators(t *testing.T) { + assert := assert.New(t) + + m := startMonitor(t) + defer m.Stop() + + n, _ := createValidatorNode(t) + m.Monitor(n) + assert.Equal(1, m.Network.NumNodesMonitored) + assert.Equal(1, m.Network.NumNodesMonitoredOnline) + + time.Sleep(1 * time.Second) + + assert.Equal(1, m.Network.NumValidators) +} + +func TestMonitorRecalculatesNetworkUptime(t *testing.T) { + assert := assert.New(t) + + m := startMonitor(t) + defer m.Stop() + assert.Equal(100.0, m.Network.Uptime()) + + n, _ := createValidatorNode(t) + m.Monitor(n) + + m.Network.NodeIsDown(n.Name) // simulate node failure + time.Sleep(200 * time.Millisecond) + m.Network.NodeIsOnline(n.Name) + time.Sleep(1 * time.Second) + + assert.True(m.Network.Uptime() < 100.0, "Uptime should be less than 100%") } -func TestMonitorReceivesNewBlocksFromNodes(t *testing.T) { +func startMonitor(t *testing.T) *monitor.Monitor { + m := monitor.NewMonitor( + monitor.SetNumValidatorsUpdateInterval(200*time.Millisecond), + monitor.RecalculateNetworkUptimeEvery(200*time.Millisecond), + ) + err := m.Start() + require.Nil(t, err) + return m +} + +func createValidatorNode(t *testing.T) (n *monitor.Node, emMock *mock.EventMeter) { + emMock = &mock.EventMeter{} + + stubs := make(map[string]ctypes.TMResult) + pubKey := crypto.GenPrivKeyEd25519().PubKey() + stubs["validators"] = &ctypes.ResultValidators{BlockHeight: blockHeight, Validators: []*tmtypes.Validator{tmtypes.NewValidator(pubKey, 0)}} + stubs["status"] = &ctypes.ResultStatus{PubKey: pubKey} + rpcClientMock := &mock.RpcClient{stubs} + n = monitor.NewNodeWithEventMeterAndRpcClient("tcp://127.0.0.1:46657", emMock, rpcClientMock) + return } diff --git a/tm-monitor/network.go b/tm-monitor/network.go index 1929bc08e..0e9809b5a 100644 --- a/tm-monitor/network.go +++ b/tm-monitor/network.go @@ -182,3 +182,13 @@ func (n *Network) GetHealthString() string { return "undefined" } } + +// Uptime returns network's uptime in percentages. +func (n *Network) Uptime() float64 { + return n.UptimeData.Uptime +} + +// StartTime returns time we started monitoring. +func (n *Network) StartTime() time.Time { + return n.UptimeData.StartTime +} diff --git a/tm-monitor/node.go b/tm-monitor/node.go index 66e5ae7a3..3e668772e 100644 --- a/tm-monitor/node.go +++ b/tm-monitor/node.go @@ -16,6 +16,11 @@ import ( ctypes "github.com/tendermint/tendermint/rpc/core/types" ) +// remove when https://github.com/tendermint/go-rpc/issues/8 will be fixed +type rpcClientI interface { + Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) +} + const maxRestarts = 25 type Node struct { @@ -32,28 +37,46 @@ type Node struct { // em holds the ws connection. Each eventMeter callback is called in a separate go-routine. em eventMeter - // rpcClient is an http client for making RPC calls to TM - rpcClient *rpc_client.ClientURI + // rpcClient is an client for making RPC calls to TM + rpcClient rpcClientI blockCh chan<- tmtypes.Header blockLatencyCh chan<- float64 disconnectCh chan<- bool + checkIsValidatorInterval time.Duration + quit chan struct{} } -func NewNode(rpcAddr string) *Node { +func NewNode(rpcAddr string, options ...func(*Node)) *Node { em := em.NewEventMeter(rpcAddr, UnmarshalEvent) - return NewNodeWithEventMeter(rpcAddr, em) + rpcClient := rpc_client.NewClientURI(rpcAddr) // HTTP client by default + return NewNodeWithEventMeterAndRpcClient(rpcAddr, em, rpcClient, options...) } -func NewNodeWithEventMeter(rpcAddr string, em eventMeter) *Node { - return &Node{ +func NewNodeWithEventMeterAndRpcClient(rpcAddr string, em eventMeter, rpcClient rpcClientI, options ...func(*Node)) *Node { + n := &Node{ rpcAddr: rpcAddr, em: em, - rpcClient: rpc_client.NewClientURI(rpcAddr), + rpcClient: rpcClient, Name: rpcAddr, quit: make(chan struct{}), + checkIsValidatorInterval: 5 * time.Second, + } + + for _, option := range options { + option(n) + } + + return n +} + +// SetCheckIsValidatorInterval lets you change interval for checking whenever +// node is still a validator or not. +func SetCheckIsValidatorInterval(d time.Duration) func(n *Node) { + return func(n *Node) { + n.checkIsValidatorInterval = d } } @@ -80,7 +103,8 @@ func (n *Node) Start() error { n.Online = true - go n.checkIsValidator() + n.checkIsValidator() + go n.checkIsValidatorLoop() return nil } @@ -179,24 +203,28 @@ func (n *Node) validators() (height uint64, validators []*tmtypes.Validator, err return uint64(vals.BlockHeight), vals.Validators, nil } -func (n *Node) checkIsValidator() { +func (n *Node) checkIsValidatorLoop() { for { select { case <-n.quit: return - case <-time.After(5 * time.Second): - _, validators, err := n.validators() - if err == nil { - for _, v := range validators { - key, err := n.getPubKey() - if err == nil && v.PubKey == key { - n.IsValidator = true - } - } - } else { - log.Debug(err.Error()) + case <-time.After(n.checkIsValidatorInterval): + n.checkIsValidator() + } + } +} + +func (n *Node) checkIsValidator() { + _, validators, err := n.validators() + if err == nil { + for _, v := range validators { + key, err := n.getPubKey() + if err == nil && v.PubKey == key { + n.IsValidator = true } } + } else { + log.Debug(err.Error()) } } diff --git a/tm-monitor/node_test.go b/tm-monitor/node_test.go index 89d81324c..3818237dc 100644 --- a/tm-monitor/node_test.go +++ b/tm-monitor/node_test.go @@ -6,26 +6,34 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + crypto "github.com/tendermint/go-crypto" em "github.com/tendermint/go-event-meter" monitor "github.com/tendermint/netmon/tm-monitor" mock "github.com/tendermint/netmon/tm-monitor/mock" + ctypes "github.com/tendermint/tendermint/rpc/core/types" tmtypes "github.com/tendermint/tendermint/types" ) +const ( + blockHeight = 1 +) + func TestNodeStartStop(t *testing.T) { assert := assert.New(t) - n, _ := setupNode(t) - assert.Equal(true, n.Online) + n, _ := startValidatorNode(t) + defer n.Stop() - n.Stop() + assert.Equal(true, n.Online) + assert.Equal(true, n.IsValidator) } func TestNodeNewBlockReceived(t *testing.T) { assert := assert.New(t) blockCh := make(chan tmtypes.Header, 100) - n, emMock := setupNode(t) + n, emMock := startValidatorNode(t) + defer n.Stop() n.SendBlocksTo(blockCh) blockHeader := &tmtypes.Header{Height: 5} @@ -39,7 +47,8 @@ func TestNodeNewBlockLatencyReceived(t *testing.T) { assert := assert.New(t) blockLatencyCh := make(chan float64, 100) - n, emMock := setupNode(t) + n, emMock := startValidatorNode(t) + defer n.Stop() n.SendBlockLatenciesTo(blockLatencyCh) emMock.Call("latencyCallback", 1000000.0) @@ -52,7 +61,8 @@ func TestNodeConnectionLost(t *testing.T) { assert := assert.New(t) disconnectCh := make(chan bool, 100) - n, emMock := setupNode(t) + n, emMock := startValidatorNode(t) + defer n.Stop() n.NotifyAboutDisconnects(disconnectCh) emMock.Call("disconnectCallback") @@ -64,9 +74,28 @@ func TestNodeConnectionLost(t *testing.T) { assert.Equal(true, n.Online) } -func setupNode(t *testing.T) (n *monitor.Node, emMock *mock.EventMeter) { +func TestNumValidators(t *testing.T) { + assert := assert.New(t) + + n, _ := startValidatorNode(t) + defer n.Stop() + + height, num, err := n.NumValidators() + assert.Nil(err) + assert.Equal(uint64(blockHeight), height) + assert.Equal(1, num) +} + +func startValidatorNode(t *testing.T) (n *monitor.Node, emMock *mock.EventMeter) { emMock = &mock.EventMeter{} - n = monitor.NewNodeWithEventMeter("tcp://127.0.0.1:46657", emMock) + + stubs := make(map[string]ctypes.TMResult) + pubKey := crypto.GenPrivKeyEd25519().PubKey() + stubs["validators"] = &ctypes.ResultValidators{BlockHeight: blockHeight, Validators: []*tmtypes.Validator{tmtypes.NewValidator(pubKey, 0)}} + stubs["status"] = &ctypes.ResultStatus{PubKey: pubKey} + rpcClientMock := &mock.RpcClient{stubs} + + n = monitor.NewNodeWithEventMeterAndRpcClient("tcp://127.0.0.1:46657", emMock, rpcClientMock) err := n.Start() require.Nil(t, err) diff --git a/tm-monitor/ton.go b/tm-monitor/ton.go index 5418d593e..5ffaf6e94 100644 --- a/tm-monitor/ton.go +++ b/tm-monitor/ton.go @@ -59,7 +59,7 @@ func (o *Ton) Stop() { func (o *Ton) printHeader() { n := o.monitor.Network - fmt.Fprintf(o.Output, "%v up %.2f%%\n", n.UptimeData.StartTime, n.UptimeData.Uptime) + fmt.Fprintf(o.Output, "%v up %.2f%%\n", n.StartTime(), n.Uptime()) fmt.Println() fmt.Fprintf(o.Output, "Height: %d\n", n.Height) fmt.Fprintf(o.Output, "Avg block time: %.3f ms\n", n.AvgBlockTime)