Browse Source

[tm-monitor] update to tendermint 0.10.0

pull/1943/head
Ethan Buchman 8 years ago
parent
commit
309812389a
6 changed files with 72 additions and 113 deletions
  1. +8
    -8
      tm-monitor/eventmeter/eventmeter.go
  2. +29
    -45
      tm-monitor/glide.lock
  3. +5
    -16
      tm-monitor/main.go
  4. +4
    -4
      tm-monitor/monitor/monitor.go
  5. +21
    -36
      tm-monitor/monitor/node.go
  6. +5
    -4
      tm-monitor/rpc.go

+ 8
- 8
tm-monitor/eventmeter/eventmeter.go View File

@ -6,12 +6,12 @@ import (
"sync" "sync"
"time" "time"
"github.com/go-kit/kit/log"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/pkg/errors" "github.com/pkg/errors"
metrics "github.com/rcrowley/go-metrics" metrics "github.com/rcrowley/go-metrics"
events "github.com/tendermint/go-events"
client "github.com/tendermint/go-rpc/client"
client "github.com/tendermint/tendermint/rpc/lib/client"
"github.com/tendermint/tmlibs/events"
"github.com/tendermint/tmlibs/log"
) )
//------------------------------------------------------ //------------------------------------------------------
@ -253,30 +253,30 @@ func (em *EventMeter) receiveRoutine() {
select { select {
case <-pingTicker.C: case <-pingTicker.C:
if pingAttempts, err = em.pingForLatency(pingAttempts); err != nil { if pingAttempts, err = em.pingForLatency(pingAttempts); err != nil {
em.logger.Log("err", errors.Wrap(err, "failed to write ping message on websocket"))
em.logger.Error("err", errors.Wrap(err, "failed to write ping message on websocket"))
em.StopAndCallDisconnectCallback() em.StopAndCallDisconnectCallback()
return return
} else if pingAttempts >= maxPingsPerPong { } else if pingAttempts >= maxPingsPerPong {
em.logger.Log("err", errors.Errorf("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime))
em.logger.Error("err", errors.Errorf("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime))
em.StopAndCallDisconnectCallback() em.StopAndCallDisconnectCallback()
return return
} }
case r := <-em.wsc.ResultsCh: case r := <-em.wsc.ResultsCh:
if r == nil { if r == nil {
em.logger.Log("err", errors.New("Expected some event, received nil"))
em.logger.Error("err", errors.New("Expected some event, received nil"))
em.StopAndCallDisconnectCallback() em.StopAndCallDisconnectCallback()
return return
} }
eventID, data, err := em.unmarshalEvent(r) eventID, data, err := em.unmarshalEvent(r)
if err != nil { if err != nil {
em.logger.Log("err", errors.Wrap(err, "failed to unmarshal event"))
em.logger.Error("err", errors.Wrap(err, "failed to unmarshal event"))
continue continue
} }
if eventID != "" { if eventID != "" {
em.updateMetric(eventID, data) em.updateMetric(eventID, data)
} }
case <-em.wsc.Quit: case <-em.wsc.Quit:
em.logger.Log("err", errors.New("WSClient closed unexpectedly"))
em.logger.Error("err", errors.New("WSClient closed unexpectedly"))
em.StopAndCallDisconnectCallback() em.StopAndCallDisconnectCallback()
return return
case <-em.quit: case <-em.quit:


+ 29
- 45
tm-monitor/glide.lock View File

@ -1,16 +1,15 @@
hash: 7cd23c2bc6306917cf22ce16d4a1fc681a2572bc06d2cece54a37152bba7030d hash: 7cd23c2bc6306917cf22ce16d4a1fc681a2572bc06d2cece54a37152bba7030d
updated: 2017-04-20T19:07:14.174540927-04:00
updated: 2017-05-20T17:16:08.287741796-04:00
imports: imports:
- name: github.com/btcsuite/btcd - name: github.com/btcsuite/btcd
version: 583684b21bfbde9b5fc4403916fd7c807feb0289 version: 583684b21bfbde9b5fc4403916fd7c807feb0289
subpackages: subpackages:
- btcec - btcec
- name: github.com/BurntSushi/toml
version: 99064174e013895bbd9b025c31100bd1d9b590ca
- name: github.com/go-kit/kit - name: github.com/go-kit/kit
version: b6f30a2e0632f5722fb26d8765d726335b79d3e6 version: b6f30a2e0632f5722fb26d8765d726335b79d3e6
subpackages: subpackages:
- log - log
- log/level
- log/term - log/term
- term - term
- name: github.com/go-logfmt/logfmt - name: github.com/go-logfmt/logfmt
@ -21,12 +20,8 @@ imports:
version: 69b215d01a5606c843240eab4937eab3acee6530 version: 69b215d01a5606c843240eab4937eab3acee6530
subpackages: subpackages:
- proto - proto
- name: github.com/golang/snappy
version: 553a641470496b2327abcac10b36396bd98e45c9
- name: github.com/gorilla/websocket - name: github.com/gorilla/websocket
version: 3ab3a8b8831546bd18fd182c20687ca853b2bb13 version: 3ab3a8b8831546bd18fd182c20687ca853b2bb13
- name: github.com/jmhodges/levigo
version: c42d9e0ca023e2198120196f842701bb4c55d7b9
- name: github.com/kr/logfmt - name: github.com/kr/logfmt
version: b84e30acd515aadc4b783ad4ff83aff3299bdfe0 version: b84e30acd515aadc4b783ad4ff83aff3299bdfe0
- name: github.com/mattn/go-colorable - name: github.com/mattn/go-colorable
@ -42,24 +37,11 @@ imports:
subpackages: subpackages:
- assert - assert
- require - require
- name: github.com/syndtr/goleveldb
version: 3c5717caf1475fd25964109a0fc640bd150fce43
subpackages:
- leveldb
- leveldb/cache
- leveldb/comparer
- leveldb/errors
- leveldb/filter
- leveldb/iterator
- leveldb/journal
- leveldb/memdb
- leveldb/opt
- leveldb/storage
- leveldb/table
- leveldb/util
- name: github.com/tendermint/abci - name: github.com/tendermint/abci
version: 56e13d87f4e3ec1ea756957d6b23caa6ebcf0998
version: 864d1f80b36b440bde030a5c18d8ac3aa8c2949d
subpackages: subpackages:
- client
- example/dummy
- types - types
- name: github.com/tendermint/ed25519 - name: github.com/tendermint/ed25519
version: 1f52c6f8b8a5c7908aff4497c186af344b428925 version: 1f52c6f8b8a5c7908aff4497c186af344b428925
@ -68,45 +50,47 @@ imports:
- extra25519 - extra25519
- name: github.com/tendermint/go-common - name: github.com/tendermint/go-common
version: e289af53b6bf6af28da129d9ef64389a4cf7987f version: e289af53b6bf6af28da129d9ef64389a4cf7987f
- name: github.com/tendermint/go-config
version: 620dcbbd7d587cf3599dedbf329b64311b0c307a
- name: github.com/tendermint/go-crypto - name: github.com/tendermint/go-crypto
version: 0ca2c6fdb0706001ca4c4b9b80c9f428e8cf39da
- name: github.com/tendermint/go-data
version: e7fcc6d081ec8518912fcdc103188275f83a3ee5
- name: github.com/tendermint/go-db
version: 9643f60bc2578693844aacf380a7c32e4c029fee
version: 7dff40942a64cdeefefa9446b2d104750b349f8a
- name: github.com/tendermint/go-events - name: github.com/tendermint/go-events
version: fddee66d90305fccb6f6d84d16c34fa65ea5b7f6 version: fddee66d90305fccb6f6d84d16c34fa65ea5b7f6
- name: github.com/tendermint/go-flowrate
version: a20c98e61957faa93b4014fbd902f20ab9317a6a
subpackages:
- flowrate
- name: github.com/tendermint/go-logger - name: github.com/tendermint/go-logger
version: cefb3a45c0bf3c493a04e9bcd9b1540528be59f2 version: cefb3a45c0bf3c493a04e9bcd9b1540528be59f2
- name: github.com/tendermint/go-merkle
version: 714d4d04557fd068a7c2a1748241ce8428015a96
- name: github.com/tendermint/go-p2p
version: 17124989a93774833df33107fbf17157a7f8ef31
subpackages:
- upnp
- name: github.com/tendermint/go-rpc - name: github.com/tendermint/go-rpc
version: 1a42f946dc6bcd88f9f58c7f2fb86f785584d793
version: 15d5b2ac497da95cd2dceb9c087910ccec4dacb2
subpackages: subpackages:
- client - client
- server
- types
- name: github.com/tendermint/go-wire - name: github.com/tendermint/go-wire
version: 2f3b7aafe21c80b19b6ee3210ecb3e3d07c7a471
version: 5f88da3dbc1a72844e6dfaf274ce87f851d488eb
subpackages:
- data
- name: github.com/tendermint/log15 - name: github.com/tendermint/log15
version: ae0f3d6450da9eac7074b439c8e1c3cabf0d5ce6 version: ae0f3d6450da9eac7074b439c8e1c3cabf0d5ce6
subpackages: subpackages:
- term - term
- name: github.com/tendermint/tendermint - name: github.com/tendermint/tendermint
version: 083fe959e25421fca3d41298d9111167a3b47122
version: 267f134d44e76efb2adef5f0c993da8a5d5bd1b8
subpackages: subpackages:
- config
- p2p
- p2p/upnp
- rpc/core/types - rpc/core/types
- rpc/lib/client
- rpc/lib/server
- rpc/lib/types
- types - types
- name: github.com/tendermint/tmlibs
version: 306795ae1d8e4f4a10dcc8bdb32a00455843c9d5
subpackages:
- autofile
- cli
- clist
- common
- db
- events
- flowrate
- log
- merkle
- name: golang.org/x/crypto - name: golang.org/x/crypto
version: 453249f01cfeb54c3d549ddb75ff152ca243f9d8 version: 453249f01cfeb54c3d549ddb75ff152ca243f9d8
subpackages: subpackages:


+ 5
- 16
tm-monitor/main.go View File

@ -6,9 +6,8 @@ import (
"os" "os"
"strings" "strings"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/term"
cmn "github.com/tendermint/go-common" cmn "github.com/tendermint/go-common"
"github.com/tendermint/tmlibs/log"
monitor "github.com/tendermint/tools/tm-monitor/monitor" monitor "github.com/tendermint/tools/tm-monitor/monitor"
) )
@ -48,22 +47,12 @@ Examples:
} }
if noton { if noton {
// Color errors red
colorFn := func(keyvals ...interface{}) term.FgBgColor {
for i := 1; i < len(keyvals); i += 2 {
if _, ok := keyvals[i].(error); ok {
return term.FgBgColor{Fg: term.White, Bg: term.Red}
}
}
return term.FgBgColor{}
}
logger = term.NewLogger(os.Stdout, log.NewLogfmtLogger, colorFn)
logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout)).With("module", "tm-monitor")
} }
m := startMonitor(flag.Arg(0)) m := startMonitor(flag.Arg(0))
startRPC(listenAddr, m)
startRPC(listenAddr, m, logger)
var ton *Ton var ton *Ton
if !noton { if !noton {
@ -81,11 +70,11 @@ Examples:
func startMonitor(endpoints string) *monitor.Monitor { func startMonitor(endpoints string) *monitor.Monitor {
m := monitor.NewMonitor() m := monitor.NewMonitor()
m.SetLogger(log.With(logger, "component", "monitor"))
m.SetLogger(logger.With("component", "monitor"))
for _, e := range strings.Split(endpoints, ",") { for _, e := range strings.Split(endpoints, ",") {
n := monitor.NewNode(e) n := monitor.NewNode(e)
n.SetLogger(log.With(logger, "node", e))
n.SetLogger(logger.With("node", e))
if err := m.Monitor(n); err != nil { if err := m.Monitor(n); err != nil {
panic(err) panic(err)
} }


+ 4
- 4
tm-monitor/monitor/monitor.go View File

@ -5,9 +5,9 @@ import (
"math/rand" "math/rand"
"time" "time"
"github.com/go-kit/kit/log"
"github.com/pkg/errors" "github.com/pkg/errors"
tmtypes "github.com/tendermint/tendermint/types" tmtypes "github.com/tendermint/tendermint/types"
"github.com/tendermint/tmlibs/log"
) )
// waiting more than this many seconds for a block means we're unhealthy // waiting more than this many seconds for a block means we're unhealthy
@ -140,7 +140,7 @@ func (m *Monitor) Stop() {
// main loop where we listen for events from the node // main loop where we listen for events from the node
func (m *Monitor) listen(nodeName string, blockCh <-chan tmtypes.Header, blockLatencyCh <-chan float64, disconnectCh <-chan bool, quit <-chan struct{}) { func (m *Monitor) listen(nodeName string, blockCh <-chan tmtypes.Header, blockLatencyCh <-chan float64, disconnectCh <-chan bool, quit <-chan struct{}) {
logger := log.With(m.logger, "node", nodeName)
logger := m.logger.With("node", nodeName)
for { for {
select { select {
@ -159,7 +159,7 @@ func (m *Monitor) listen(nodeName string, blockCh <-chan tmtypes.Header, blockLa
m.Network.NodeIsOnline(nodeName) m.Network.NodeIsOnline(nodeName)
} }
case <-time.After(nodeLivenessTimeout): case <-time.After(nodeLivenessTimeout):
logger.Log("event", fmt.Sprintf("node was not responding for %v", nodeLivenessTimeout))
logger.Info("event", fmt.Sprintf("node was not responding for %v", nodeLivenessTimeout))
m.Network.NodeIsDown(nodeName) m.Network.NodeIsDown(nodeName)
} }
} }
@ -203,7 +203,7 @@ func (m *Monitor) updateNumValidatorLoop() {
if i == randomNodeIndex { if i == randomNodeIndex {
height, num, err = n.NumValidators() height, num, err = n.NumValidators()
if err != nil { if err != nil {
m.logger.Log("err", errors.Wrap(err, "update num validators failed"))
m.logger.Info("err", errors.Wrap(err, "update num validators failed"))
} }
break break
} }


+ 21
- 36
tm-monitor/monitor/node.go View File

@ -5,22 +5,16 @@ import (
"math" "math"
"time" "time"
"github.com/go-kit/kit/log"
"github.com/pkg/errors" "github.com/pkg/errors"
crypto "github.com/tendermint/go-crypto" crypto "github.com/tendermint/go-crypto"
events "github.com/tendermint/go-events"
rpc_client "github.com/tendermint/go-rpc/client"
wire "github.com/tendermint/go-wire"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpc_client "github.com/tendermint/tendermint/rpc/lib/client"
tmtypes "github.com/tendermint/tendermint/types" tmtypes "github.com/tendermint/tendermint/types"
"github.com/tendermint/tmlibs/events"
"github.com/tendermint/tmlibs/log"
em "github.com/tendermint/tools/tm-monitor/eventmeter" em "github.com/tendermint/tools/tm-monitor/eventmeter"
) )
// remove when https://github.com/tendermint/go-rpc/issues/8 will be fixed
type rpcClientI interface {
Call(method string, params map[string]interface{}, result interface{}) (interface{}, error)
}
const maxRestarts = 25 const maxRestarts = 25
type Node struct { type Node struct {
@ -38,7 +32,7 @@ type Node struct {
em eventMeter em eventMeter
// rpcClient is an client for making RPC calls to TM // rpcClient is an client for making RPC calls to TM
rpcClient rpcClientI
rpcClient rpc_client.HTTPClient
blockCh chan<- tmtypes.Header blockCh chan<- tmtypes.Header
blockLatencyCh chan<- float64 blockLatencyCh chan<- float64
@ -57,7 +51,7 @@ func NewNode(rpcAddr string, options ...func(*Node)) *Node {
return NewNodeWithEventMeterAndRpcClient(rpcAddr, em, rpcClient, options...) return NewNodeWithEventMeterAndRpcClient(rpcAddr, em, rpcClient, options...)
} }
func NewNodeWithEventMeterAndRpcClient(rpcAddr string, em eventMeter, rpcClient rpcClientI, options ...func(*Node)) *Node {
func NewNodeWithEventMeterAndRpcClient(rpcAddr string, em eventMeter, rpcClient rpc_client.HTTPClient, options ...func(*Node)) *Node {
n := &Node{ n := &Node{
rpcAddr: rpcAddr, rpcAddr: rpcAddr,
em: em, em: em,
@ -132,10 +126,10 @@ func (n *Node) Stop() {
// implements eventmeter.EventCallbackFunc // implements eventmeter.EventCallbackFunc
func newBlockCallback(n *Node) em.EventCallbackFunc { func newBlockCallback(n *Node) em.EventCallbackFunc {
return func(metric *em.EventMetric, data events.EventData) { return func(metric *em.EventMetric, data events.EventData) {
block := data.(tmtypes.EventDataNewBlockHeader).Header
block := data.(tmtypes.TMEventData).Unwrap().(tmtypes.EventDataNewBlockHeader).Header
n.Height = uint64(block.Height) n.Height = uint64(block.Height)
n.logger.Log("event", "new block", "height", block.Height, "numTxs", block.NumTxs)
n.logger.Info("event", "new block", "height", block.Height, "numTxs", block.NumTxs)
if n.blockCh != nil { if n.blockCh != nil {
n.blockCh <- *block n.blockCh <- *block
@ -147,7 +141,7 @@ func newBlockCallback(n *Node) em.EventCallbackFunc {
func latencyCallback(n *Node) em.LatencyCallbackFunc { func latencyCallback(n *Node) em.LatencyCallbackFunc {
return func(latency float64) { return func(latency float64) {
n.BlockLatency = latency / 1000000.0 // ns to ms n.BlockLatency = latency / 1000000.0 // ns to ms
n.logger.Log("event", "new block latency", "latency", n.BlockLatency)
n.logger.Info("event", "new block latency", "latency", n.BlockLatency)
if n.blockLatencyCh != nil { if n.blockLatencyCh != nil {
n.blockLatencyCh <- latency n.blockLatencyCh <- latency
@ -159,17 +153,17 @@ func latencyCallback(n *Node) em.LatencyCallbackFunc {
func disconnectCallback(n *Node) em.DisconnectCallbackFunc { func disconnectCallback(n *Node) em.DisconnectCallbackFunc {
return func() { return func() {
n.Online = false n.Online = false
n.logger.Log("status", "down")
n.logger.Info("status", "down")
if n.disconnectCh != nil { if n.disconnectCh != nil {
n.disconnectCh <- true n.disconnectCh <- true
} }
if err := n.RestartEventMeterBackoff(); err != nil { if err := n.RestartEventMeterBackoff(); err != nil {
n.logger.Log("err", errors.Wrap(err, "restart failed"))
n.logger.Info("err", errors.Wrap(err, "restart failed"))
} else { } else {
n.Online = true n.Online = true
n.logger.Log("status", "online")
n.logger.Info("status", "online")
if n.disconnectCh != nil { if n.disconnectCh != nil {
n.disconnectCh <- false n.disconnectCh <- false
@ -186,7 +180,7 @@ func (n *Node) RestartEventMeterBackoff() error {
time.Sleep(d * time.Second) time.Sleep(d * time.Second)
if err := n.em.Start(); err != nil { if err := n.em.Start(); err != nil {
n.logger.Log("err", errors.Wrap(err, "restart failed"))
n.logger.Info("err", errors.Wrap(err, "restart failed"))
} else { } else {
// TODO: authenticate pubkey // TODO: authenticate pubkey
return nil return nil
@ -209,11 +203,10 @@ func (n *Node) NumValidators() (height uint64, num int, err error) {
} }
func (n *Node) validators() (height uint64, validators []*tmtypes.Validator, err error) { func (n *Node) validators() (height uint64, validators []*tmtypes.Validator, err error) {
var result ctypes.TMResult
if _, err = n.rpcClient.Call("validators", nil, &result); err != nil {
vals := new(ctypes.ResultValidators)
if _, err = n.rpcClient.Call("validators", nil, vals); err != nil {
return 0, make([]*tmtypes.Validator, 0), err return 0, make([]*tmtypes.Validator, 0), err
} }
vals := result.(*ctypes.ResultValidators)
return uint64(vals.BlockHeight), vals.Validators, nil return uint64(vals.BlockHeight), vals.Validators, nil
} }
@ -238,21 +231,20 @@ func (n *Node) checkIsValidator() {
} }
} }
} else { } else {
n.logger.Log("err", errors.Wrap(err, "check is validator failed"))
n.logger.Info("err", errors.Wrap(err, "check is validator failed"))
} }
} }
func (n *Node) getPubKey() (crypto.PubKey, error) { func (n *Node) getPubKey() (crypto.PubKey, error) {
if n.pubKey != nil {
if !n.pubKey.Empty() {
return n.pubKey, nil return n.pubKey, nil
} }
var result ctypes.TMResult
_, err := n.rpcClient.Call("status", nil, &result)
status := new(ctypes.ResultStatus)
_, err := n.rpcClient.Call("status", nil, status)
if err != nil { if err != nil {
return nil, err
return crypto.PubKey{}, err
} }
status := result.(*ctypes.ResultStatus)
n.pubKey = status.PubKey n.pubKey = status.PubKey
return n.pubKey, nil return n.pubKey, nil
} }
@ -269,16 +261,9 @@ type eventMeter interface {
// UnmarshalEvent unmarshals a json event // UnmarshalEvent unmarshals a json event
func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) { func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) {
var err error
result := new(ctypes.TMResult)
wire.ReadJSONPtr(result, b, &err)
if err != nil {
event := new(ctypes.ResultEvent)
if err := json.Unmarshal(b, event); err != nil {
return "", nil, err return "", nil, err
} }
event, ok := (*result).(*ctypes.ResultEvent)
if !ok {
return "", nil, nil // TODO: handle non-event messages (ie. return from subscribe/unsubscribe)
// fmt.Errorf("Result is not type *ctypes.ResultEvent. Got %v", reflect.TypeOf(*result))
}
return event.Name, event.Data, nil return event.Name, event.Data, nil
} }

+ 5
- 4
tm-monitor/rpc.go View File

@ -4,19 +4,20 @@ import (
"errors" "errors"
"net/http" "net/http"
rpc "github.com/tendermint/go-rpc/server"
rpc "github.com/tendermint/tendermint/rpc/lib/server"
"github.com/tendermint/tmlibs/log"
monitor "github.com/tendermint/tools/tm-monitor/monitor" monitor "github.com/tendermint/tools/tm-monitor/monitor"
) )
func startRPC(listenAddr string, m *monitor.Monitor) {
func startRPC(listenAddr string, m *monitor.Monitor, logger log.Logger) {
routes := routes(m) routes := routes(m)
// serve http and ws // serve http and ws
mux := http.NewServeMux() mux := http.NewServeMux()
wm := rpc.NewWebsocketManager(routes, nil) // TODO: evsw wm := rpc.NewWebsocketManager(routes, nil) // TODO: evsw
mux.HandleFunc("/websocket", wm.WebsocketHandler) mux.HandleFunc("/websocket", wm.WebsocketHandler)
rpc.RegisterRPCFuncs(mux, routes)
if _, err := rpc.StartHTTPServer(listenAddr, mux); err != nil {
rpc.RegisterRPCFuncs(mux, routes, logger)
if _, err := rpc.StartHTTPServer(listenAddr, mux, logger); err != nil {
panic(err) panic(err)
} }
} }


Loading…
Cancel
Save