From 8c6ae55bd0f2b34d07997028d78991da9556e977 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 2 Mar 2017 12:53:05 +0400 Subject: [PATCH] set network's `NumValidators` and node's `IsValidator` --- tm-monitor/glide.lock | 16 ++++----- tm-monitor/glide.yaml | 1 + tm-monitor/monitor.go | 48 ++++++++++++++++++++++++++ tm-monitor/network.go | 7 +--- tm-monitor/node.go | 80 ++++++++++++++++++++++++++++++++++++++----- tm-monitor/ton.go | 6 ++-- 6 files changed, 133 insertions(+), 25 deletions(-) diff --git a/tm-monitor/glide.lock b/tm-monitor/glide.lock index cd880823f..3797f60dd 100644 --- a/tm-monitor/glide.lock +++ b/tm-monitor/glide.lock @@ -1,5 +1,5 @@ -hash: 8d69350ae306418c61ce3e1ffdc40ddecef9591eafd645373f94a83aea9aadb1 -updated: 2017-02-28T14:36:53.495322474Z +hash: 9b2cdc3f80ccebf7154884c8466b1f2c0cb3e8b7f583baa732d49d671e83fe33 +updated: 2017-03-02T08:52:42.063901822Z imports: - name: github.com/btcsuite/btcd version: 583684b21bfbde9b5fc4403916fd7c807feb0289 @@ -16,7 +16,7 @@ imports: - name: github.com/golang/snappy version: 553a641470496b2327abcac10b36396bd98e45c9 - name: github.com/gorilla/websocket - version: 3f3e394da2b801fbe732a935ef40724762a67a07 + version: 4873052237e4eeda85cf50c071ef33836fe8e139 - name: github.com/jmhodges/levigo version: c42d9e0ca023e2198120196f842701bb4c55d7b9 - name: github.com/mattn/go-colorable @@ -26,12 +26,12 @@ imports: - name: github.com/rcrowley/go-metrics version: 1f30fe9094a513ce4c700b9a54458bbb0c96996c - name: github.com/stretchr/testify - version: 69483b4bd14f5845b5a1e55bca19e954e827f1d0 + version: 4d4bfba8f1d1027c4fdbe371823030df51419987 subpackages: - assert - require - name: github.com/syndtr/goleveldb - version: 23851d93a2292dcc56e71a18ec9e0624d84a0f65 + version: 3c5717caf1475fd25964109a0fc640bd150fce43 subpackages: - leveldb - leveldb/cache @@ -117,11 +117,11 @@ imports: - lex/httplex - trace - name: golang.org/x/sys - version: e4594059fe4cde2daf423055a596c2cd1e6c9adf + version: 76cc09b634294339fa19ec41b5f2a0b3932cea8b subpackages: - unix - name: google.golang.org/grpc - version: d122f1dfe6ecece11d0c914ce4b2d9cab6d4b4ff + version: 8b2e129857480cb0f07ef7d9d10b8b252c7ac984 subpackages: - codes - credentials @@ -135,7 +135,7 @@ imports: - transport testImports: - name: github.com/davecgh/go-spew - version: 6d212800a42e8ab5c146b8ace3490ee17e5225f9 + version: 04cdfd42973bb9c8589fd6a731800cf222fde1a9 subpackages: - spew - name: github.com/pmezard/go-difflib diff --git a/tm-monitor/glide.yaml b/tm-monitor/glide.yaml index 9a4c964d6..84e137afb 100644 --- a/tm-monitor/glide.yaml +++ b/tm-monitor/glide.yaml @@ -11,3 +11,4 @@ import: - package: github.com/tendermint/go-wire - package: github.com/rcrowley/go-metrics - package: github.com/stretchr/testify +- package: github.com/tendermint/go-crypto diff --git a/tm-monitor/monitor.go b/tm-monitor/monitor.go index e61d3a8e6..69858888b 100644 --- a/tm-monitor/monitor.go +++ b/tm-monitor/monitor.go @@ -1,6 +1,7 @@ package main import ( + "math/rand" "time" tmtypes "github.com/tendermint/tendermint/types" @@ -40,12 +41,17 @@ func (m *Monitor) Monitor(n *Node) error { return err } + m.Network.NumValidatorsOnline++ + m.nodeQuit[n.Name] = make(chan struct{}) go m.listen(n.Name, blockCh, blockLatencyCh, disconnectCh, m.nodeQuit[n.Name]) + return nil } func (m *Monitor) Unmonitor(n *Node) { + m.Network.NumValidatorsOnline-- + n.Stop() close(m.nodeQuit[n.Name]) delete(m.nodeQuit, n.Name) @@ -54,6 +60,7 @@ func (m *Monitor) Unmonitor(n *Node) { func (m *Monitor) Start() error { go m.recalculateNetworkUptime() + go m.updateNumValidators() return nil } @@ -99,3 +106,44 @@ func (m *Monitor) recalculateNetworkUptime() { } } } + +// updateNumValidators sends a request to a random node once every N seconds, +// which in turn makes an RPC call to get the latest validators. +func (m *Monitor) updateNumValidators() { + rand.Seed(time.Now().Unix()) + + var height uint64 + var num int + var err error + + for { + if 0 == len(m.Nodes) { + m.Network.NumValidators = 0 + time.Sleep(5 * time.Second) + continue + } + + randomNodeIndex := rand.Intn(len(m.Nodes)) + + select { + case <-m.monitorQuit: + return + case <-time.After(5 * time.Second): + i := 0 + for _, n := range m.Nodes { + if i == randomNodeIndex { + height, num, err = n.NumValidators() + if err != nil { + log.Debug(err.Error()) + } + break + } + i++ + } + + if m.Network.Height <= height { + m.Network.NumValidators = num + } + } + } +} diff --git a/tm-monitor/network.go b/tm-monitor/network.go index 88e4f7d8e..cfbb2f03d 100644 --- a/tm-monitor/network.go +++ b/tm-monitor/network.go @@ -1,7 +1,6 @@ package main import ( - "fmt" "sync" "time" @@ -145,11 +144,7 @@ func (n *Network) NodeIsOnline(name string) { } func (n *Network) updateHealth() { - if n.NumValidatorsOnline > n.NumValidators { - panic(fmt.Sprintf("got %d validators. max %ds", n.NumValidatorsOnline, n.NumValidators)) - } - - if n.NumValidatorsOnline != n.NumValidators { + if n.NumValidatorsOnline < n.NumValidators { n.Health = ModerateHealth } diff --git a/tm-monitor/node.go b/tm-monitor/node.go index b3037d720..66e5ae7a3 100644 --- a/tm-monitor/node.go +++ b/tm-monitor/node.go @@ -8,8 +8,10 @@ import ( em "github.com/tendermint/go-event-meter" events "github.com/tendermint/go-events" + rpc_client "github.com/tendermint/go-rpc/client" tmtypes "github.com/tendermint/tendermint/types" + crypto "github.com/tendermint/go-crypto" wire "github.com/tendermint/go-wire" ctypes "github.com/tendermint/tendermint/rpc/core/types" ) @@ -19,10 +21,8 @@ const maxRestarts = 25 type Node struct { rpcAddr string - IsValidator bool `json:"is_validator"` // validator or non-validator? - - // "github.com/tendermint/go-crypto" - // PubKey crypto.PubKey `json:"pub_key"` + IsValidator bool `json:"is_validator"` // validator or non-validator? + pubKey crypto.PubKey `json:"pub_key"` Name string `json:"name"` Online bool `json:"online"` @@ -32,9 +32,14 @@ type Node struct { // em holds the ws connection. Each eventMeter callback is called in a separate go-routine. em eventMeter + // rpcClient is an http client for making RPC calls to TM + rpcClient *rpc_client.ClientURI + blockCh chan<- tmtypes.Header blockLatencyCh chan<- float64 disconnectCh chan<- bool + + quit chan struct{} } func NewNode(rpcAddr string) *Node { @@ -44,9 +49,11 @@ func NewNode(rpcAddr string) *Node { func NewNodeWithEventMeter(rpcAddr string, em eventMeter) *Node { return &Node{ - rpcAddr: rpcAddr, - em: em, - Name: rpcAddr, + rpcAddr: rpcAddr, + em: em, + rpcClient: rpc_client.NewClientURI(rpcAddr), + Name: rpcAddr, + quit: make(chan struct{}), } } @@ -73,6 +80,8 @@ func (n *Node) Start() error { n.Online = true + go n.checkIsValidator() + return nil } @@ -85,6 +94,8 @@ func (n *Node) Stop() { // FIXME stop blocks at event_meter.go:140 // n.em.Stop() + + close(n.quit) } // implements eventmeter.EventCallbackFunc @@ -151,6 +162,59 @@ func (n *Node) RestartBackOff() error { } } +func (n *Node) NumValidators() (height uint64, num int, err error) { + height, vals, err := n.validators() + if err != nil { + return 0, 0, err + } + return height, len(vals), nil +} + +func (n *Node) validators() (height uint64, validators []*tmtypes.Validator, err error) { + var result ctypes.TMResult + if _, err = n.rpcClient.Call("validators", nil, &result); err != nil { + return 0, make([]*tmtypes.Validator, 0), err + } + vals := result.(*ctypes.ResultValidators) + return uint64(vals.BlockHeight), vals.Validators, nil +} + +func (n *Node) checkIsValidator() { + for { + select { + case <-n.quit: + return + case <-time.After(5 * time.Second): + _, validators, err := n.validators() + if err == nil { + for _, v := range validators { + key, err := n.getPubKey() + if err == nil && v.PubKey == key { + n.IsValidator = true + } + } + } else { + log.Debug(err.Error()) + } + } + } +} + +func (n *Node) getPubKey() (crypto.PubKey, error) { + if n.pubKey != nil { + return n.pubKey, nil + } + + var result ctypes.TMResult + _, err := n.rpcClient.Call("status", nil, &result) + if err != nil { + return nil, err + } + status := result.(*ctypes.ResultStatus) + n.pubKey = status.PubKey + return n.pubKey, nil +} + type eventMeter interface { Start() (bool, error) Stop() bool @@ -160,7 +224,7 @@ type eventMeter interface { Unsubscribe(string) error } -// Unmarshal a json event +// UnmarshalEvent unmarshals a json event func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) { var err error result := new(ctypes.TMResult) diff --git a/tm-monitor/ton.go b/tm-monitor/ton.go index 57296b9ae..a9189e644 100644 --- a/tm-monitor/ton.go +++ b/tm-monitor/ton.go @@ -70,10 +70,10 @@ func (o *Ton) printHeader() { } func (o *Ton) printTable() { - w := tabwriter.NewWriter(o.Output, 0, 0, 4, ' ', 0) - fmt.Fprintln(w, "NAME\tHEIGHT\tBLOCK LATENCY\tONLINE\t") + w := tabwriter.NewWriter(o.Output, 0, 0, 5, ' ', 0) + fmt.Fprintln(w, "NAME\tHEIGHT\tBLOCK LATENCY\tONLINE\tVALIDATOR\t") for _, n := range o.monitor.Nodes { - fmt.Fprintln(w, fmt.Sprintf("%s\t%d\t%.3f ms\t%v\t", n.Name, n.Height, n.BlockLatency, n.Online)) + fmt.Fprintln(w, fmt.Sprintf("%s\t%d\t%.3f ms\t%v\t%v\t", n.Name, n.Height, n.BlockLatency, n.Online, n.IsValidator)) } w.Flush() }