Browse Source

set network's `NumValidators` and node's `IsValidator`

pull/1943/head
Anton Kaliaev 8 years ago
parent
commit
8c6ae55bd0
No known key found for this signature in database GPG Key ID: 7B6881D965918214
6 changed files with 133 additions and 25 deletions
  1. +8
    -8
      tm-monitor/glide.lock
  2. +1
    -0
      tm-monitor/glide.yaml
  3. +48
    -0
      tm-monitor/monitor.go
  4. +1
    -6
      tm-monitor/network.go
  5. +72
    -8
      tm-monitor/node.go
  6. +3
    -3
      tm-monitor/ton.go

+ 8
- 8
tm-monitor/glide.lock View File

@ -1,5 +1,5 @@
hash: 8d69350ae306418c61ce3e1ffdc40ddecef9591eafd645373f94a83aea9aadb1
updated: 2017-02-28T14:36:53.495322474Z
hash: 9b2cdc3f80ccebf7154884c8466b1f2c0cb3e8b7f583baa732d49d671e83fe33
updated: 2017-03-02T08:52:42.063901822Z
imports: imports:
- name: github.com/btcsuite/btcd - name: github.com/btcsuite/btcd
version: 583684b21bfbde9b5fc4403916fd7c807feb0289 version: 583684b21bfbde9b5fc4403916fd7c807feb0289
@ -16,7 +16,7 @@ imports:
- name: github.com/golang/snappy - name: github.com/golang/snappy
version: 553a641470496b2327abcac10b36396bd98e45c9 version: 553a641470496b2327abcac10b36396bd98e45c9
- name: github.com/gorilla/websocket - name: github.com/gorilla/websocket
version: 3f3e394da2b801fbe732a935ef40724762a67a07
version: 4873052237e4eeda85cf50c071ef33836fe8e139
- name: github.com/jmhodges/levigo - name: github.com/jmhodges/levigo
version: c42d9e0ca023e2198120196f842701bb4c55d7b9 version: c42d9e0ca023e2198120196f842701bb4c55d7b9
- name: github.com/mattn/go-colorable - name: github.com/mattn/go-colorable
@ -26,12 +26,12 @@ imports:
- name: github.com/rcrowley/go-metrics - name: github.com/rcrowley/go-metrics
version: 1f30fe9094a513ce4c700b9a54458bbb0c96996c version: 1f30fe9094a513ce4c700b9a54458bbb0c96996c
- name: github.com/stretchr/testify - name: github.com/stretchr/testify
version: 69483b4bd14f5845b5a1e55bca19e954e827f1d0
version: 4d4bfba8f1d1027c4fdbe371823030df51419987
subpackages: subpackages:
- assert - assert
- require - require
- name: github.com/syndtr/goleveldb - name: github.com/syndtr/goleveldb
version: 23851d93a2292dcc56e71a18ec9e0624d84a0f65
version: 3c5717caf1475fd25964109a0fc640bd150fce43
subpackages: subpackages:
- leveldb - leveldb
- leveldb/cache - leveldb/cache
@ -117,11 +117,11 @@ imports:
- lex/httplex - lex/httplex
- trace - trace
- name: golang.org/x/sys - name: golang.org/x/sys
version: e4594059fe4cde2daf423055a596c2cd1e6c9adf
version: 76cc09b634294339fa19ec41b5f2a0b3932cea8b
subpackages: subpackages:
- unix - unix
- name: google.golang.org/grpc - name: google.golang.org/grpc
version: d122f1dfe6ecece11d0c914ce4b2d9cab6d4b4ff
version: 8b2e129857480cb0f07ef7d9d10b8b252c7ac984
subpackages: subpackages:
- codes - codes
- credentials - credentials
@ -135,7 +135,7 @@ imports:
- transport - transport
testImports: testImports:
- name: github.com/davecgh/go-spew - name: github.com/davecgh/go-spew
version: 6d212800a42e8ab5c146b8ace3490ee17e5225f9
version: 04cdfd42973bb9c8589fd6a731800cf222fde1a9
subpackages: subpackages:
- spew - spew
- name: github.com/pmezard/go-difflib - name: github.com/pmezard/go-difflib


+ 1
- 0
tm-monitor/glide.yaml View File

@ -11,3 +11,4 @@ import:
- package: github.com/tendermint/go-wire - package: github.com/tendermint/go-wire
- package: github.com/rcrowley/go-metrics - package: github.com/rcrowley/go-metrics
- package: github.com/stretchr/testify - package: github.com/stretchr/testify
- package: github.com/tendermint/go-crypto

+ 48
- 0
tm-monitor/monitor.go View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"math/rand"
"time" "time"
tmtypes "github.com/tendermint/tendermint/types" tmtypes "github.com/tendermint/tendermint/types"
@ -40,12 +41,17 @@ func (m *Monitor) Monitor(n *Node) error {
return err return err
} }
m.Network.NumValidatorsOnline++
m.nodeQuit[n.Name] = make(chan struct{}) m.nodeQuit[n.Name] = make(chan struct{})
go m.listen(n.Name, blockCh, blockLatencyCh, disconnectCh, m.nodeQuit[n.Name]) go m.listen(n.Name, blockCh, blockLatencyCh, disconnectCh, m.nodeQuit[n.Name])
return nil return nil
} }
func (m *Monitor) Unmonitor(n *Node) { func (m *Monitor) Unmonitor(n *Node) {
m.Network.NumValidatorsOnline--
n.Stop() n.Stop()
close(m.nodeQuit[n.Name]) close(m.nodeQuit[n.Name])
delete(m.nodeQuit, n.Name) delete(m.nodeQuit, n.Name)
@ -54,6 +60,7 @@ func (m *Monitor) Unmonitor(n *Node) {
func (m *Monitor) Start() error { func (m *Monitor) Start() error {
go m.recalculateNetworkUptime() go m.recalculateNetworkUptime()
go m.updateNumValidators()
return nil return nil
} }
@ -99,3 +106,44 @@ func (m *Monitor) recalculateNetworkUptime() {
} }
} }
} }
// updateNumValidators sends a request to a random node once every N seconds,
// which in turn makes an RPC call to get the latest validators.
func (m *Monitor) updateNumValidators() {
rand.Seed(time.Now().Unix())
var height uint64
var num int
var err error
for {
if 0 == len(m.Nodes) {
m.Network.NumValidators = 0
time.Sleep(5 * time.Second)
continue
}
randomNodeIndex := rand.Intn(len(m.Nodes))
select {
case <-m.monitorQuit:
return
case <-time.After(5 * time.Second):
i := 0
for _, n := range m.Nodes {
if i == randomNodeIndex {
height, num, err = n.NumValidators()
if err != nil {
log.Debug(err.Error())
}
break
}
i++
}
if m.Network.Height <= height {
m.Network.NumValidators = num
}
}
}
}

+ 1
- 6
tm-monitor/network.go View File

@ -1,7 +1,6 @@
package main package main
import ( import (
"fmt"
"sync" "sync"
"time" "time"
@ -145,11 +144,7 @@ func (n *Network) NodeIsOnline(name string) {
} }
func (n *Network) updateHealth() { func (n *Network) updateHealth() {
if n.NumValidatorsOnline > n.NumValidators {
panic(fmt.Sprintf("got %d validators. max %ds", n.NumValidatorsOnline, n.NumValidators))
}
if n.NumValidatorsOnline != n.NumValidators {
if n.NumValidatorsOnline < n.NumValidators {
n.Health = ModerateHealth n.Health = ModerateHealth
} }


+ 72
- 8
tm-monitor/node.go View File

@ -8,8 +8,10 @@ import (
em "github.com/tendermint/go-event-meter" em "github.com/tendermint/go-event-meter"
events "github.com/tendermint/go-events" events "github.com/tendermint/go-events"
rpc_client "github.com/tendermint/go-rpc/client"
tmtypes "github.com/tendermint/tendermint/types" tmtypes "github.com/tendermint/tendermint/types"
crypto "github.com/tendermint/go-crypto"
wire "github.com/tendermint/go-wire" wire "github.com/tendermint/go-wire"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
) )
@ -19,10 +21,8 @@ const maxRestarts = 25
type Node struct { type Node struct {
rpcAddr string rpcAddr string
IsValidator bool `json:"is_validator"` // validator or non-validator?
// "github.com/tendermint/go-crypto"
// PubKey crypto.PubKey `json:"pub_key"`
IsValidator bool `json:"is_validator"` // validator or non-validator?
pubKey crypto.PubKey `json:"pub_key"`
Name string `json:"name"` Name string `json:"name"`
Online bool `json:"online"` Online bool `json:"online"`
@ -32,9 +32,14 @@ type Node struct {
// em holds the ws connection. Each eventMeter callback is called in a separate go-routine. // em holds the ws connection. Each eventMeter callback is called in a separate go-routine.
em eventMeter em eventMeter
// rpcClient is an http client for making RPC calls to TM
rpcClient *rpc_client.ClientURI
blockCh chan<- tmtypes.Header blockCh chan<- tmtypes.Header
blockLatencyCh chan<- float64 blockLatencyCh chan<- float64
disconnectCh chan<- bool disconnectCh chan<- bool
quit chan struct{}
} }
func NewNode(rpcAddr string) *Node { func NewNode(rpcAddr string) *Node {
@ -44,9 +49,11 @@ func NewNode(rpcAddr string) *Node {
func NewNodeWithEventMeter(rpcAddr string, em eventMeter) *Node { func NewNodeWithEventMeter(rpcAddr string, em eventMeter) *Node {
return &Node{ return &Node{
rpcAddr: rpcAddr,
em: em,
Name: rpcAddr,
rpcAddr: rpcAddr,
em: em,
rpcClient: rpc_client.NewClientURI(rpcAddr),
Name: rpcAddr,
quit: make(chan struct{}),
} }
} }
@ -73,6 +80,8 @@ func (n *Node) Start() error {
n.Online = true n.Online = true
go n.checkIsValidator()
return nil return nil
} }
@ -85,6 +94,8 @@ func (n *Node) Stop() {
// FIXME stop blocks at event_meter.go:140 // FIXME stop blocks at event_meter.go:140
// n.em.Stop() // n.em.Stop()
close(n.quit)
} }
// implements eventmeter.EventCallbackFunc // implements eventmeter.EventCallbackFunc
@ -151,6 +162,59 @@ func (n *Node) RestartBackOff() error {
} }
} }
func (n *Node) NumValidators() (height uint64, num int, err error) {
height, vals, err := n.validators()
if err != nil {
return 0, 0, err
}
return height, len(vals), nil
}
func (n *Node) validators() (height uint64, validators []*tmtypes.Validator, err error) {
var result ctypes.TMResult
if _, err = n.rpcClient.Call("validators", nil, &result); err != nil {
return 0, make([]*tmtypes.Validator, 0), err
}
vals := result.(*ctypes.ResultValidators)
return uint64(vals.BlockHeight), vals.Validators, nil
}
func (n *Node) checkIsValidator() {
for {
select {
case <-n.quit:
return
case <-time.After(5 * time.Second):
_, validators, err := n.validators()
if err == nil {
for _, v := range validators {
key, err := n.getPubKey()
if err == nil && v.PubKey == key {
n.IsValidator = true
}
}
} else {
log.Debug(err.Error())
}
}
}
}
func (n *Node) getPubKey() (crypto.PubKey, error) {
if n.pubKey != nil {
return n.pubKey, nil
}
var result ctypes.TMResult
_, err := n.rpcClient.Call("status", nil, &result)
if err != nil {
return nil, err
}
status := result.(*ctypes.ResultStatus)
n.pubKey = status.PubKey
return n.pubKey, nil
}
type eventMeter interface { type eventMeter interface {
Start() (bool, error) Start() (bool, error)
Stop() bool Stop() bool
@ -160,7 +224,7 @@ type eventMeter interface {
Unsubscribe(string) error Unsubscribe(string) error
} }
// Unmarshal a json event
// UnmarshalEvent unmarshals a json event
func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) { func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) {
var err error var err error
result := new(ctypes.TMResult) result := new(ctypes.TMResult)


+ 3
- 3
tm-monitor/ton.go View File

@ -70,10 +70,10 @@ func (o *Ton) printHeader() {
} }
func (o *Ton) printTable() { func (o *Ton) printTable() {
w := tabwriter.NewWriter(o.Output, 0, 0, 4, ' ', 0)
fmt.Fprintln(w, "NAME\tHEIGHT\tBLOCK LATENCY\tONLINE\t")
w := tabwriter.NewWriter(o.Output, 0, 0, 5, ' ', 0)
fmt.Fprintln(w, "NAME\tHEIGHT\tBLOCK LATENCY\tONLINE\tVALIDATOR\t")
for _, n := range o.monitor.Nodes { for _, n := range o.monitor.Nodes {
fmt.Fprintln(w, fmt.Sprintf("%s\t%d\t%.3f ms\t%v\t", n.Name, n.Height, n.BlockLatency, n.Online))
fmt.Fprintln(w, fmt.Sprintf("%s\t%d\t%.3f ms\t%v\t%v\t", n.Name, n.Height, n.BlockLatency, n.Online, n.IsValidator))
} }
w.Flush() w.Flush()
} }


Loading…
Cancel
Save