* delete tm-monitor tool #4247 * removed docs * remove gox
@@ -1,92 +0,0 @@
# tm-monitor

Tendermint blockchain monitoring tool; it watches over one or more nodes,
collecting and providing various statistics to the user:

- [https://github.com/tendermint/tendermint/tree/master/tools/tm-monitor](https://github.com/tendermint/tendermint/tree/master/tools/tm-monitor)

## Quick Start

### Docker

Assuming your application is running in another container with the name
`app`:

```
docker run -it --rm -v "/tmp:/tendermint" tendermint/tendermint init
docker run -it --rm -v "/tmp:/tendermint" -p "26657:26657" --name=tm --link=app tendermint/tendermint node --proxy_app=tcp://app:26658
docker run -it --rm -p "26670:26670" --link=tm tendermint/monitor tm:26657
```

If you don't have an application yet, but still want to try the monitor out,
use `kvstore`:

```
docker run -it --rm -v "/tmp:/tendermint" tendermint/tendermint init
docker run -it --rm -v "/tmp:/tendermint" -p "26657:26657" --name=tm tendermint/tendermint node --proxy_app=kvstore
```

```
docker run -it --rm -p "26670:26670" --link=tm tendermint/monitor tm:26657
```

### Using Binaries

[Install Tendermint](../introduction/install.md).

Start a Tendermint node:

```
tendermint init
tendermint node --proxy_app=kvstore
```

In another window, run the monitor:

```
tm-monitor localhost:26657
```

## Usage

```
tm-monitor [-v] [-no-ton] [-listen-addr="tcp://0.0.0.0:26670"] [endpoints]

Examples:
  # monitor single instance
  tm-monitor localhost:26657

  # monitor a few instances by providing comma-separated list of RPC endpoints
  tm-monitor host1:26657,host2:26657

Flags:
  -listen-addr string
        HTTP and Websocket server listen address (default "tcp://0.0.0.0:26670")
  -no-ton
        Do not show ton (table of nodes)
  -v    verbose logging
```

### RPC UI

Run `tm-monitor` and visit http://localhost:26670. You should see the
list of the available RPC endpoints:

```
http://localhost:26670/status
http://localhost:26670/status/network
http://localhost:26670/monitor?endpoint=_
http://localhost:26670/status/node?name=_
http://localhost:26670/unmonitor?endpoint=_
```

The API is available as GET requests with URI encoded parameters, or as
JSONRPC POST requests. The JSONRPC methods are also exposed over
websocket.
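
As a quick sanity check, a minimal Go client can fetch the network statistics
over plain GET (a sketch, assuming the monitor is running locally on the
default port):

```
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// Hit the monitor's GET API (default listen address is tcp://0.0.0.0:26670).
	resp, err := http.Get("http://localhost:26670/status/network")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // JSON-encoded network statistics
}
```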
## Development

```
make tools
make test
```
@@ -1,6 +0,0 @@
FROM alpine:3.8

WORKDIR /app
COPY tm-monitor /app/tm-monitor

ENTRYPOINT ["./tm-monitor"]
@@ -1,11 +0,0 @@
FROM golang:latest

RUN mkdir -p /go/src/github.com/tendermint/tools/tm-monitor
WORKDIR /go/src/github.com/tendermint/tools/tm-monitor

COPY Makefile /go/src/github.com/tendermint/tools/tm-monitor/

RUN make tools

COPY . /go/src/github.com/tendermint/tools/tm-monitor
@@ -1,49 +0,0 @@
DIST_DIRS := find * -type d -exec
# Extract the version from ../../version/version.go (TMCoreSemVer = "X.Y.Z" -> vX.Y.Z).
VERSION := $(shell perl -ne '/^TMCoreSemVer = "([^"]+)"$$/ && print "v$$1\n"' ../../version/version.go)

all: build test install

########################################
### Build

build:
	@go build

install:
	@go install

test:
	@go test -race $(PACKAGES)

build-all:
	rm -rf ./dist
	gox -verbose \
		-ldflags "-s -w" \
		-arch="amd64 386 arm arm64" \
		-os="linux darwin windows freebsd" \
		-osarch="!darwin/arm !darwin/arm64" \
		-output="dist/{{.OS}}-{{.Arch}}/{{.Dir}}" .

dist: build-all
	cd dist && \
	$(DIST_DIRS) cp ../LICENSE {} \; && \
	$(DIST_DIRS) tar -zcf tm-monitor-${VERSION}-{}.tar.gz {} \; && \
	shasum -a256 ./*.tar.gz > "./tm-monitor_${VERSION}_SHA256SUMS" && \
	cd ..

########################################
### Docker

build-docker:
	rm -f ./tm-monitor
	docker run -it --rm -v "$(PWD)/../../:/go/src/github.com/tendermint/tendermint" -w "/go/src/github.com/tendermint/tendermint/tools/tm-monitor" -e "GO111MODULE=on" -e "CGO_ENABLED=0" golang:1.12 go build -ldflags "-s -w" -o tm-monitor
	docker build -t "tendermint/monitor" .

clean:
	rm -f ./tm-monitor
	rm -rf ./dist

# To avoid unintended conflicts with file names, always add to .PHONY
# unless there is a reason not to.
# https://www.gnu.org/software/make/manual/html_node/Phony-Targets.html
.PHONY: build install test build-all dist build-docker clean
@@ -1,91 +0,0 @@
# tm-monitor

Tendermint blockchain monitoring tool; it watches over one or more nodes,
collecting and providing various statistics to the user:

- [https://github.com/tendermint/tendermint/tree/master/tools/tm-monitor](https://github.com/tendermint/tendermint/tree/master/tools/tm-monitor)

## Quick Start

### Docker

Assuming your application is running in another container with the name
`app`:

```
docker run -it --rm -v "/tmp:/tendermint" tendermint/tendermint init
docker run -it --rm -v "/tmp:/tendermint" -p "26657:26657" --name=tm --link=app tendermint/tendermint node --proxy_app=tcp://app:26658
docker run -it --rm -p "26670:26670" --link=tm tendermint/monitor tm:26657
```

If you don't have an application yet, but still want to try the monitor out,
use `kvstore`:

```
docker run -it --rm -v "/tmp:/tendermint" tendermint/tendermint init
docker run -it --rm -v "/tmp:/tendermint" -p "26657:26657" --name=tm tendermint/tendermint node --proxy_app=kvstore
docker run -it --rm -p "26670:26670" --link=tm tendermint/monitor tm:26657
```

### Using Binaries

[Install Tendermint](https://github.com/tendermint/tendermint#install),
then run:

```
tendermint init
tendermint node --proxy_app=kvstore

tm-monitor localhost:26657
```

with the last command being run in a separate window.

## Usage

```
Tendermint monitor watches over one or more Tendermint core
applications, collecting and providing various statistics to the user.

Usage:
  tm-monitor [-no-ton] [-listen-addr="tcp://0.0.0.0:26670"] [endpoints]

Examples:
  # monitor single instance
  tm-monitor localhost:26657

  # monitor a few instances by providing comma-separated list of RPC endpoints
  tm-monitor host1:26657,host2:26657

Flags:
  -listen-addr string
        HTTP and Websocket server listen address (default "tcp://0.0.0.0:26670")
  -no-ton
        Do not show ton (table of nodes)
```

### RPC UI

Run `tm-monitor` and visit http://localhost:26670. You should see the
list of the available RPC endpoints:

```
http://localhost:26670/status
http://localhost:26670/status/network
http://localhost:26670/monitor?endpoint=_
http://localhost:26670/status/node?name=_
http://localhost:26670/unmonitor?endpoint=_
```

The API is available as GET requests with URI encoded parameters, or as
JSONRPC POST requests. The JSONRPC methods are also exposed over
websocket.
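
The same methods can also be called over JSONRPC. The sketch below posts a
`status/node` request using Go's standard library; the method and parameter
names are taken from the endpoint list above, while the request envelope is
assumed to follow standard JSONRPC 2.0 and the node name is a placeholder:

```
package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// Assumed JSONRPC 2.0 envelope, posted to the monitor's root endpoint.
	req := []byte(`{"jsonrpc":"2.0","id":"0","method":"status/node","params":{"name":"localhost:26657"}}`)

	resp, err := http.Post("http://localhost:26670", "application/json", bytes.NewReader(req))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // JSON-encoded node statistics
}
```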
## Development

```
make tools
make test
```
@@ -1,12 +0,0 @@
package main

import (
	amino "github.com/tendermint/go-amino"
	ctypes "github.com/tendermint/tendermint/rpc/core/types"
)

var cdc = amino.NewCodec()

func init() {
	ctypes.RegisterAmino(cdc)
}
@@ -1,296 +0,0 @@
// Package eventmeter provides a generic system to subscribe to events and
// record their frequency.
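//
// A minimal usage sketch, mirroring how tm-monitor's monitor.Node wires the
// meter up (the RPC address, query and unmarshal function below are
// placeholders):
//
//	em := eventmeter.NewEventMeter("tcp://127.0.0.1:26657", unmarshalFn)
//	if err := em.Start(); err != nil {
//		// handle error
//	}
//	defer em.Stop()
//	em.RegisterLatencyCallback(func(meanLatencyNanoSeconds float64) { /* ... */ })
//	if err := em.Subscribe(query, func(metric *eventmeter.EventMetric, data interface{}) {
//		// called in its own goroutine for every matching event
//	}); err != nil {
//		// handle error
//	}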
package eventmeter

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	metrics "github.com/rcrowley/go-metrics"

	"github.com/tendermint/tendermint/libs/events"
	"github.com/tendermint/tendermint/libs/log"
	client "github.com/tendermint/tendermint/rpc/lib/client"
)

const (
	// Get ping/pong latency and call LatencyCallbackFunc with this period.
	latencyPeriod = 1 * time.Second

	// Check if the WS client is connected with this period.
	connectionCheckPeriod = 100 * time.Millisecond
)

// EventMetric exposes metrics for an event.
type EventMetric struct {
	ID          string    `json:"id"`
	Started     time.Time `json:"start_time"`
	LastHeard   time.Time `json:"last_heard"`
	MinDuration int64     `json:"min_duration"`
	MaxDuration int64     `json:"max_duration"`

	// tracks event count and rate
	meter metrics.Meter

	// filled in from the Meter
	Count    int64   `json:"count"`
	Rate1    float64 `json:"rate_1" amino:"unsafe"`
	Rate5    float64 `json:"rate_5" amino:"unsafe"`
	Rate15   float64 `json:"rate_15" amino:"unsafe"`
	RateMean float64 `json:"rate_mean" amino:"unsafe"`

	// callback lets the event have side effects in the eventmeter's consumer;
	// it runs in a goroutine.
	callback EventCallbackFunc
}

func (metric *EventMetric) Copy() *EventMetric {
	metricCopy := *metric
	metricCopy.meter = metric.meter.Snapshot()
	return &metricCopy
}

// called on GetMetric
func (metric *EventMetric) fillMetric() *EventMetric {
	metric.Count = metric.meter.Count()
	metric.Rate1 = metric.meter.Rate1()
	metric.Rate5 = metric.meter.Rate5()
	metric.Rate15 = metric.meter.Rate15()
	metric.RateMean = metric.meter.RateMean()
	return metric
}

// EventCallbackFunc is a closure to enable side effects from receiving an
// event.
type EventCallbackFunc func(em *EventMetric, data interface{})

// EventUnmarshalFunc is a closure to get the query and data out of the raw
// JSON received over the RPC WebSocket.
type EventUnmarshalFunc func(b json.RawMessage) (string, events.EventData, error)

// LatencyCallbackFunc is a closure to enable side effects from receiving a latency.
type LatencyCallbackFunc func(meanLatencyNanoSeconds float64)

// DisconnectCallbackFunc is a closure to notify a consumer that the connection
// has died.
type DisconnectCallbackFunc func()

// EventMeter tracks events, reports latency and disconnects.
type EventMeter struct {
	wsc *client.WSClient

	mtx              sync.Mutex
	queryToMetricMap map[string]*EventMetric

	unmarshalEvent     EventUnmarshalFunc
	latencyCallback    LatencyCallbackFunc
	disconnectCallback DisconnectCallbackFunc
	subscribed         bool

	quit chan struct{}

	logger log.Logger
}

func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter {
	return &EventMeter{
		wsc:              client.NewWSClient(addr, "/websocket", client.PingPeriod(1*time.Second)),
		queryToMetricMap: make(map[string]*EventMetric),
		unmarshalEvent:   unmarshalEvent,
		logger:           log.NewNopLogger(),
	}
}

// SetLogger lets you set your own logger.
func (em *EventMeter) SetLogger(l log.Logger) {
	em.logger = l
	em.wsc.SetLogger(l.With("module", "rpcclient"))
}

// String returns a string representation of the event meter.
func (em *EventMeter) String() string {
	return em.wsc.Address
}

// Start boots up the event meter.
func (em *EventMeter) Start() error {
	if err := em.wsc.Start(); err != nil {
		return err
	}

	em.quit = make(chan struct{})
	go em.receiveRoutine()
	go em.disconnectRoutine()

	err := em.subscribe()
	if err != nil {
		return err
	}
	em.subscribed = true
	return nil
}

// Stop stops the event meter.
func (em *EventMeter) Stop() {
	close(em.quit)

	if em.wsc.IsRunning() {
		em.wsc.Stop()
	}
}

// Subscribe for the given query. The callback function will be called upon
// receiving an event.
func (em *EventMeter) Subscribe(query string, cb EventCallbackFunc) error {
	em.mtx.Lock()
	defer em.mtx.Unlock()

	if err := em.wsc.Subscribe(context.TODO(), query); err != nil {
		return err
	}

	metric := &EventMetric{
		meter:    metrics.NewMeter(),
		callback: cb,
	}
	em.queryToMetricMap[query] = metric
	return nil
}

// Unsubscribe from the given query.
func (em *EventMeter) Unsubscribe(query string) error {
	em.mtx.Lock()
	defer em.mtx.Unlock()

	return em.wsc.Unsubscribe(context.TODO(), query)
}

// GetMetric fills in the latest data for a query and returns a copy.
func (em *EventMeter) GetMetric(query string) (*EventMetric, error) {
	em.mtx.Lock()
	defer em.mtx.Unlock()

	metric, ok := em.queryToMetricMap[query]
	if !ok {
		return nil, fmt.Errorf("unknown query: %s", query)
	}
	return metric.fillMetric().Copy(), nil
}

// RegisterLatencyCallback allows you to set the latency callback.
func (em *EventMeter) RegisterLatencyCallback(f LatencyCallbackFunc) {
	em.mtx.Lock()
	defer em.mtx.Unlock()

	em.latencyCallback = f
}

// RegisterDisconnectCallback allows you to set the disconnect callback.
func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) {
	em.mtx.Lock()
	defer em.mtx.Unlock()

	em.disconnectCallback = f
}

///////////////////////////////////////////////////////////////////////////////
// Private

func (em *EventMeter) subscribe() error {
	for query := range em.queryToMetricMap {
		if err := em.wsc.Subscribe(context.TODO(), query); err != nil {
			return err
		}
	}
	return nil
}

func (em *EventMeter) receiveRoutine() {
	latencyTicker := time.NewTicker(latencyPeriod)
	for {
		select {
		case resp := <-em.wsc.ResponsesCh:
			if resp.Error != nil {
				em.logger.Error("expected some event, got error", "err", resp.Error.Error())
				continue
			}
			query, data, err := em.unmarshalEvent(resp.Result)
			if err != nil {
				em.logger.Error("failed to unmarshal event", "err", err)
				continue
			}
			if query != "" { // FIXME how can it be an empty string?
				em.updateMetric(query, data)
			}
		case <-latencyTicker.C:
			if em.wsc.IsActive() {
				em.callLatencyCallback(em.wsc.PingPongLatencyTimer.Mean())
			}
		case <-em.wsc.Quit():
			return
		case <-em.quit:
			return
		}
	}
}

func (em *EventMeter) disconnectRoutine() {
	ticker := time.NewTicker(connectionCheckPeriod)
	for {
		select {
		case <-ticker.C:
			if em.wsc.IsReconnecting() && em.subscribed { // notify user about disconnect only once
				em.callDisconnectCallback()
				em.subscribed = false
			} else if !em.wsc.IsReconnecting() && !em.subscribed { // resubscribe
				em.subscribe()
				em.subscribed = true
			}
		case <-em.wsc.Quit():
			return
		case <-em.quit:
			return
		}
	}
}

func (em *EventMeter) updateMetric(query string, data events.EventData) {
	em.mtx.Lock()
	defer em.mtx.Unlock()

	metric, ok := em.queryToMetricMap[query]
	if !ok {
		// we already unsubscribed, or got an unexpected query
		return
	}

	last := metric.LastHeard
	metric.LastHeard = time.Now()
	metric.meter.Mark(1)
	dur := int64(metric.LastHeard.Sub(last))
	if dur < metric.MinDuration {
		metric.MinDuration = dur
	}
	if !last.IsZero() && dur > metric.MaxDuration {
		metric.MaxDuration = dur
	}

	if metric.callback != nil {
		go metric.callback(metric.Copy(), data)
	}
}

func (em *EventMeter) callDisconnectCallback() {
	em.mtx.Lock()
	if em.disconnectCallback != nil {
		go em.disconnectCallback()
	}
	em.mtx.Unlock()
}

func (em *EventMeter) callLatencyCallback(meanLatencyNanoSeconds float64) {
	em.mtx.Lock()
	if em.latencyCallback != nil {
		go em.latencyCallback(meanLatencyNanoSeconds)
	}
	em.mtx.Unlock()
}
@@ -1,91 +0,0 @@
package main

import (
	"flag"
	"fmt"
	"os"
	"strings"

	"github.com/tendermint/tendermint/libs/log"
	tmos "github.com/tendermint/tendermint/libs/os"
	monitor "github.com/tendermint/tendermint/tools/tm-monitor/monitor"
)

var logger = log.NewNopLogger()

func main() {
	var listenAddr string
	var noton bool

	flag.StringVar(&listenAddr, "listen-addr", "tcp://0.0.0.0:26670", "HTTP and Websocket server listen address")
	flag.BoolVar(&noton, "no-ton", false, "Do not show ton (table of nodes)")

	flag.Usage = func() {
		fmt.Println(`Tendermint monitor watches over one or more Tendermint core
applications, collecting and providing various statistics to the user.

Usage:
  tm-monitor [-no-ton] [-listen-addr="tcp://0.0.0.0:26670"] [endpoints]

Examples:
  # monitor single instance
  tm-monitor localhost:26657

  # monitor a few instances by providing comma-separated list of RPC endpoints
  tm-monitor host1:26657,host2:26657`)
		fmt.Println("Flags:")
		flag.PrintDefaults()
	}

	flag.Parse()

	if flag.NArg() == 0 {
		flag.Usage()
		os.Exit(1)
	}

	if noton {
		logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout))
	}

	monitor := startMonitor(flag.Arg(0))

	listener := startRPC(listenAddr, monitor, logger)

	var ton *Ton
	if !noton {
		ton = NewTon(monitor)
		ton.Start()
	}

	// Stop upon receiving SIGTERM or CTRL-C.
	tmos.TrapSignal(logger, func() {
		if !noton {
			ton.Stop()
		}
		monitor.Stop()
		listener.Close()
	})

	// Run forever.
	select {}
}

func startMonitor(endpoints string) *monitor.Monitor {
	m := monitor.NewMonitor()
	m.SetLogger(logger.With("component", "monitor"))

	for _, e := range strings.Split(endpoints, ",") {
		n := monitor.NewNode(e)
		n.SetLogger(logger.With("node", e))
		if err := m.Monitor(n); err != nil {
			panic(err)
		}
	}

	if err := m.Start(); err != nil {
		panic(err)
	}

	return m
}
@@ -1,69 +0,0 @@
package mock

import (
	stdlog "log"
	"reflect"

	amino "github.com/tendermint/go-amino"

	"github.com/tendermint/tendermint/libs/log"
	em "github.com/tendermint/tendermint/tools/tm-monitor/eventmeter"
)

type EventMeter struct {
	latencyCallback    em.LatencyCallbackFunc
	disconnectCallback em.DisconnectCallbackFunc
	eventCallback      em.EventCallbackFunc
}

func (e *EventMeter) Start() error                                      { return nil }
func (e *EventMeter) Stop()                                             {}
func (e *EventMeter) SetLogger(l log.Logger)                            {}
func (e *EventMeter) RegisterLatencyCallback(cb em.LatencyCallbackFunc) { e.latencyCallback = cb }
func (e *EventMeter) RegisterDisconnectCallback(cb em.DisconnectCallbackFunc) {
	e.disconnectCallback = cb
}
func (e *EventMeter) Subscribe(query string, cb em.EventCallbackFunc) error {
	e.eventCallback = cb
	return nil
}
func (e *EventMeter) Unsubscribe(query string) error {
	e.eventCallback = nil
	return nil
}

func (e *EventMeter) Call(callback string, args ...interface{}) {
	switch callback {
	case "latencyCallback":
		e.latencyCallback(args[0].(float64))
	case "disconnectCallback":
		e.disconnectCallback()
	case "eventCallback":
		e.eventCallback(args[0].(*em.EventMetric), args[1])
	}
}

type RPCClient struct {
	Stubs map[string]interface{}
	cdc   *amino.Codec
}

func (c *RPCClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) {
	s, ok := c.Stubs[method]
	if !ok {
		stdlog.Fatalf("Call to %s, but no stub is defined for it", method)
	}

	rv, rt := reflect.ValueOf(result), reflect.TypeOf(result)
	rv, _ = rv.Elem(), rt.Elem()
	rv.Set(reflect.ValueOf(s))

	return s, nil
}

func (c *RPCClient) Codec() *amino.Codec {
	return c.cdc
}

func (c *RPCClient) SetCodec(cdc *amino.Codec) {
	c.cdc = cdc
}
@@ -1,12 +0,0 @@
package monitor

import (
	amino "github.com/tendermint/go-amino"
	ctypes "github.com/tendermint/tendermint/rpc/core/types"
)

var cdc = amino.NewCodec()

func init() {
	ctypes.RegisterAmino(cdc)
}
@@ -1,256 +0,0 @@
package monitor

import (
	"fmt"
	"math/rand"
	"sync"
	"time"

	"github.com/pkg/errors"

	"github.com/tendermint/tendermint/libs/log"
	tmtypes "github.com/tendermint/tendermint/types"
)

// waiting more than this long for a block means we're unhealthy
const nodeLivenessTimeout = 5 * time.Second

// Monitor keeps track of the nodes and updates common statistics upon
// receiving new events from nodes.
//
// Common statistics are stored in the Network struct.
type Monitor struct {
	mtx   sync.Mutex
	Nodes []*Node

	Network *Network

	monitorQuit chan struct{}            // monitor exiting
	nodeQuit    map[string]chan struct{} // node is being stopped and removed from under the monitor

	recalculateNetworkUptimeEvery time.Duration
	numValidatorsUpdateInterval   time.Duration

	logger log.Logger
}

// NewMonitor creates a new instance of a Monitor. You can provide options to
// change some default values.
//
// Example:
//
//	NewMonitor(monitor.SetNumValidatorsUpdateInterval(1 * time.Second))
func NewMonitor(options ...func(*Monitor)) *Monitor {
	m := &Monitor{
		Nodes:                         make([]*Node, 0),
		Network:                       NewNetwork(),
		monitorQuit:                   make(chan struct{}),
		nodeQuit:                      make(map[string]chan struct{}),
		recalculateNetworkUptimeEvery: 10 * time.Second,
		numValidatorsUpdateInterval:   5 * time.Second,
		logger:                        log.NewNopLogger(),
	}

	for _, option := range options {
		option(m)
	}

	return m
}

// RecalculateNetworkUptimeEvery lets you change the network uptime update interval.
func RecalculateNetworkUptimeEvery(d time.Duration) func(m *Monitor) {
	return func(m *Monitor) {
		m.recalculateNetworkUptimeEvery = d
	}
}

// SetNumValidatorsUpdateInterval lets you change the num validators update interval.
func SetNumValidatorsUpdateInterval(d time.Duration) func(m *Monitor) {
	return func(m *Monitor) {
		m.numValidatorsUpdateInterval = d
	}
}

// SetLogger lets you set your own logger.
func (m *Monitor) SetLogger(l log.Logger) {
	m.logger = l
}

// Monitor begins to monitor the node `n`. The node will be started and added
// to the monitor.
func (m *Monitor) Monitor(n *Node) error {
	m.mtx.Lock()
	m.Nodes = append(m.Nodes, n)
	m.mtx.Unlock()

	blockCh := make(chan *tmtypes.Block, 10)
	n.SendBlocksTo(blockCh)
	blockLatencyCh := make(chan float64, 10)
	n.SendBlockLatenciesTo(blockLatencyCh)
	disconnectCh := make(chan bool, 10)
	n.NotifyAboutDisconnects(disconnectCh)

	if err := n.Start(); err != nil {
		return err
	}

	m.Network.NewNode(n.Name)

	m.nodeQuit[n.Name] = make(chan struct{})
	go m.listen(n.Name, blockCh, blockLatencyCh, disconnectCh, m.nodeQuit[n.Name])

	return nil
}

// Unmonitor stops monitoring node `n`. The node will be stopped and removed
// from the monitor.
func (m *Monitor) Unmonitor(n *Node) {
	m.Network.NodeDeleted(n.Name)

	n.Stop()
	close(m.nodeQuit[n.Name])
	delete(m.nodeQuit, n.Name)

	i, _ := m.NodeByName(n.Name)

	m.mtx.Lock()
	m.Nodes[i] = m.Nodes[len(m.Nodes)-1]
	m.Nodes = m.Nodes[:len(m.Nodes)-1]
	m.mtx.Unlock()
}

// NodeByName returns the node and its index if such a node exists within the
// monitor. Otherwise, -1 and nil are returned.
func (m *Monitor) NodeByName(name string) (index int, node *Node) {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	for i, n := range m.Nodes {
		if name == n.Name {
			return i, n
		}
	}
	return -1, nil
}

// NodeIsOnline is called when connection to the node is restored.
// Must be safe to call multiple times.
func (m *Monitor) NodeIsOnline(name string) {
	_, node := m.NodeByName(name)
	if nil != node {
		if online, ok := m.Network.nodeStatusMap[name]; ok && online {
			m.mtx.Lock()
			node.Online = online
			m.mtx.Unlock()
		}
	}
}

// Start starts the monitor's routines: recalculating network uptime and
// updating the number of validators.
func (m *Monitor) Start() error {
	go m.recalculateNetworkUptimeLoop()
	go m.updateNumValidatorLoop()

	return nil
}

// Stop stops the monitor's routines.
func (m *Monitor) Stop() {
	close(m.monitorQuit)

	for _, n := range m.Nodes {
		m.Unmonitor(n)
	}
}

// main loop where we listen for events from the node
func (m *Monitor) listen(
	nodeName string,
	blockCh <-chan *tmtypes.Block,
	blockLatencyCh <-chan float64,
	disconnectCh <-chan bool,
	quit <-chan struct{}) {

	logger := m.logger.With("node", nodeName)

	for {
		select {
		case <-quit:
			return
		case b := <-blockCh:
			m.Network.NewBlock(b)
			m.Network.NodeIsOnline(nodeName)
			m.NodeIsOnline(nodeName)
		case l := <-blockLatencyCh:
			m.Network.NewBlockLatency(l)
			m.Network.NodeIsOnline(nodeName)
			m.NodeIsOnline(nodeName)
		case disconnected := <-disconnectCh:
			if disconnected {
				m.Network.NodeIsDown(nodeName)
			} else {
				m.Network.NodeIsOnline(nodeName)
				m.NodeIsOnline(nodeName)
			}
		case <-time.After(nodeLivenessTimeout):
			logger.Info("event", fmt.Sprintf("node was not responding for %v", nodeLivenessTimeout))
			m.Network.NodeIsDown(nodeName)
		}
	}
}

// recalculateNetworkUptimeLoop recalculates the network uptime every N seconds.
func (m *Monitor) recalculateNetworkUptimeLoop() {
	for {
		select {
		case <-m.monitorQuit:
			return
		case <-time.After(m.recalculateNetworkUptimeEvery):
			m.Network.RecalculateUptime()
		}
	}
}

// updateNumValidatorLoop sends a request to a random node once every N seconds,
// which in turn makes an RPC call to get the latest validators.
func (m *Monitor) updateNumValidatorLoop() {
	rand.Seed(time.Now().Unix())

	var height int64
	var num int
	var err error

	for {
		m.mtx.Lock()
		nodesCount := len(m.Nodes)
		m.mtx.Unlock()
		if nodesCount == 0 {
			time.Sleep(m.numValidatorsUpdateInterval)
			continue
		}

		randomNodeIndex := rand.Intn(nodesCount)

		select {
		case <-m.monitorQuit:
			return
		case <-time.After(m.numValidatorsUpdateInterval):
			i := 0

			m.mtx.Lock()
			for _, n := range m.Nodes {
				if i == randomNodeIndex {
					height, num, err = n.NumValidators()
					if err != nil {
						m.logger.Info("err", errors.Wrap(err, "update num validators failed"))
					}
					break
				}
				i++
			}
			m.mtx.Unlock()

			m.Network.UpdateNumValidatorsForHeight(num, height)
		}
	}
}
@@ -1,75 +0,0 @@
package monitor_test

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	amino "github.com/tendermint/go-amino"

	"github.com/tendermint/tendermint/crypto/ed25519"
	ctypes "github.com/tendermint/tendermint/rpc/core/types"
	mock "github.com/tendermint/tendermint/tools/tm-monitor/mock"
	monitor "github.com/tendermint/tendermint/tools/tm-monitor/monitor"
	tmtypes "github.com/tendermint/tendermint/types"
)

func TestMonitorUpdatesNumberOfValidators(t *testing.T) {
	m := startMonitor(t)
	defer m.Stop()

	n, _ := createValidatorNode(t)
	m.Monitor(n)
	assert.Equal(t, 1, m.Network.NumNodesMonitored)
	assert.Equal(t, 1, m.Network.NumNodesMonitoredOnline)

	time.Sleep(1 * time.Second)

	// DATA RACE
	// assert.Equal(t, 1, m.Network.NumValidators())
}

func TestMonitorRecalculatesNetworkUptime(t *testing.T) {
	m := startMonitor(t)
	defer m.Stop()
	assert.Equal(t, 100.0, m.Network.Uptime())

	n, _ := createValidatorNode(t)
	m.Monitor(n)

	m.Network.NodeIsDown(n.Name) // simulate node failure
	time.Sleep(200 * time.Millisecond)
	m.Network.NodeIsOnline(n.Name)
	time.Sleep(1 * time.Second)

	assert.True(t, m.Network.Uptime() < 100.0, "Uptime should be less than 100%")
}

func startMonitor(t *testing.T) *monitor.Monitor {
	m := monitor.NewMonitor(
		monitor.SetNumValidatorsUpdateInterval(200*time.Millisecond),
		monitor.RecalculateNetworkUptimeEvery(200*time.Millisecond),
	)
	err := m.Start()
	require.Nil(t, err)
	return m
}

func createValidatorNode(t *testing.T) (n *monitor.Node, emMock *mock.EventMeter) {
	emMock = &mock.EventMeter{}

	stubs := make(map[string]interface{})
	pubKey := ed25519.GenPrivKey().PubKey()
	stubs["validators"] = ctypes.ResultValidators{
		BlockHeight: blockHeight,
		Validators:  []*tmtypes.Validator{tmtypes.NewValidator(pubKey, 0)},
	}
	stubs["status"] = ctypes.ResultStatus{ValidatorInfo: ctypes.ValidatorInfo{PubKey: pubKey}}
	cdc := amino.NewCodec()
	rpcClientMock := &mock.RPCClient{Stubs: stubs}
	rpcClientMock.SetCodec(cdc)

	n = monitor.NewNodeWithEventMeterAndRPCClient("tcp://127.0.0.1:26657", emMock, rpcClientMock)
	return
}
@@ -1,209 +0,0 @@
package monitor

import (
	"sync"
	"time"

	metrics "github.com/rcrowley/go-metrics"

	tmtypes "github.com/tendermint/tendermint/types"
)

// UptimeData stores data for how long the network has been running.
type UptimeData struct {
	StartTime time.Time `json:"start_time"`
	Uptime    float64   `json:"uptime" amino:"unsafe"` // percentage of time we've been healthy, ever

	totalDownTime time.Duration // total downtime (only updated when we come back online)
	wentDown      time.Time
}

// Health describes the health of the network. Note that this applies only to
// the observed nodes, and not to the entire cluster, which may consist of
// thousands of machines. It may change in the future.
type Health int

const (
	// FullHealth means all nodes online, synced, validators making blocks
	FullHealth = Health(0)
	// ModerateHealth means we're making blocks
	ModerateHealth = Health(1)
	// Dead means we're not making blocks due to all validators freezing or crashing
	Dead = Health(2)
)

// Network holds common statistics for a network of nodes.
type Network struct {
	Height int64 `json:"height"`

	AvgBlockTime      float64 `json:"avg_block_time" amino:"unsafe"` // ms (avg over last minute)
	blockTimeMeter    metrics.Meter
	AvgTxThroughput   float64 `json:"avg_tx_throughput" amino:"unsafe"` // tx/s (avg over last minute)
	txThroughputMeter metrics.Meter
	AvgBlockLatency   float64 `json:"avg_block_latency" amino:"unsafe"` // ms (avg over last minute)
	blockLatencyMeter metrics.Meter

	NumValidators           int `json:"num_validators"`
	NumNodesMonitored       int `json:"num_nodes_monitored"`
	NumNodesMonitoredOnline int `json:"num_nodes_monitored_online"`

	Health Health `json:"health"`

	UptimeData *UptimeData `json:"uptime_data"`

	nodeStatusMap map[string]bool

	mu sync.Mutex
}

func NewNetwork() *Network {
	return &Network{
		blockTimeMeter:    metrics.NewMeter(),
		txThroughputMeter: metrics.NewMeter(),
		blockLatencyMeter: metrics.NewMeter(),
		Health:            FullHealth,
		UptimeData: &UptimeData{
			StartTime: time.Now(),
			Uptime:    100.0,
		},
		nodeStatusMap: make(map[string]bool),
	}
}

func (n *Network) NewBlock(b *tmtypes.Block) {
	n.mu.Lock()
	defer n.mu.Unlock()

	if n.Height >= b.Height {
		return
	}

	n.Height = b.Height

	n.blockTimeMeter.Mark(1)
	if n.blockTimeMeter.Rate1() > 0.0 {
		n.AvgBlockTime = (1.0 / n.blockTimeMeter.Rate1()) * 1000 // 1/s to ms
	} else {
		n.AvgBlockTime = 0.0
	}
	n.txThroughputMeter.Mark(int64(len(b.Data.Txs)))
	n.AvgTxThroughput = n.txThroughputMeter.Rate1()
}

func (n *Network) NewBlockLatency(l float64) {
	n.mu.Lock()
	defer n.mu.Unlock()

	n.blockLatencyMeter.Mark(int64(l))
	n.AvgBlockLatency = n.blockLatencyMeter.Rate1() / 1000000.0 // ns to ms
}
// RecalculateUptime calculates uptime on demand.
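//
// Uptime is the share of total monitoring time not spent down. As a worked
// example (numbers hypothetical): if monitoring started 100s ago, 4s of
// downtime have accumulated, and the network went down again 1s ago (health
// is not FullHealth), then uptime = (100s - 4s - 1s) / 100s * 100 = 95%.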
func (n *Network) RecalculateUptime() {
	n.mu.Lock()
	defer n.mu.Unlock()

	since := time.Since(n.UptimeData.StartTime)
	uptime := since - n.UptimeData.totalDownTime
	if n.Health != FullHealth {
		uptime -= time.Since(n.UptimeData.wentDown)
	}
	n.UptimeData.Uptime = (float64(uptime) / float64(since)) * 100.0
}

// NodeIsDown is called when the node disconnects for whatever reason.
// Must be safe to call multiple times.
func (n *Network) NodeIsDown(name string) {
	n.mu.Lock()
	defer n.mu.Unlock()

	if online, ok := n.nodeStatusMap[name]; !ok || online {
		n.nodeStatusMap[name] = false
		n.NumNodesMonitoredOnline--
		n.UptimeData.wentDown = time.Now()
		n.updateHealth()
	}
}

// NodeIsOnline is called when connection to the node is restored.
// Must be safe to call multiple times.
func (n *Network) NodeIsOnline(name string) {
	n.mu.Lock()
	defer n.mu.Unlock()

	if online, ok := n.nodeStatusMap[name]; ok && !online {
		n.nodeStatusMap[name] = true
		n.NumNodesMonitoredOnline++
		n.UptimeData.totalDownTime += time.Since(n.UptimeData.wentDown)
		n.updateHealth()
	}
}

// NewNode is called when a new node is added to the monitor.
func (n *Network) NewNode(name string) {
	n.mu.Lock()
	defer n.mu.Unlock()

	n.NumNodesMonitored++
	n.NumNodesMonitoredOnline++
	n.updateHealth()
}

// NodeDeleted is called when a node is deleted from under the monitor.
func (n *Network) NodeDeleted(name string) {
	n.mu.Lock()
	defer n.mu.Unlock()

	n.NumNodesMonitored--
	n.NumNodesMonitoredOnline--
	n.updateHealth()
}

func (n *Network) updateHealth() {
	// if we are connected to all validators, we're at full health
	// TODO: make sure they're all at the same height (within a block)
	// and all proposing (and possibly validating). Alternatively, just
	// check there hasn't been a new round in numValidators rounds.
	switch {
	case n.NumValidators != 0 && n.NumNodesMonitoredOnline == n.NumValidators:
		n.Health = FullHealth
	case n.NumNodesMonitoredOnline > 0 && n.NumNodesMonitoredOnline <= n.NumNodesMonitored:
		n.Health = ModerateHealth
	default:
		n.Health = Dead
	}
}

func (n *Network) UpdateNumValidatorsForHeight(num int, height int64) {
	n.mu.Lock()
	defer n.mu.Unlock()

	if n.Height <= height {
		n.NumValidators = num
	}

	n.updateHealth()
}

func (n *Network) GetHealthString() string {
	switch n.Health {
	case FullHealth:
		return "full"
	case ModerateHealth:
		return "moderate"
	case Dead:
		return "dead"
	default:
		return "undefined"
	}
}

// Uptime returns the network's uptime in percentage points.
func (n *Network) Uptime() float64 {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.UptimeData.Uptime
}

// StartTime returns the time we started monitoring.
func (n *Network) StartTime() time.Time {
	return n.UptimeData.StartTime
}
@@ -1,82 +0,0 @@
package monitor_test

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"

	monitor "github.com/tendermint/tendermint/tools/tm-monitor/monitor"
	tmtypes "github.com/tendermint/tendermint/types"
)

func TestNetworkNewBlock(t *testing.T) {
	n := monitor.NewNetwork()

	n.NewBlock(&tmtypes.Block{
		Header: tmtypes.Header{Height: 5},
	})
	assert.Equal(t, int64(5), n.Height)
	assert.Equal(t, 0.0, n.AvgBlockTime)
	assert.Equal(t, 0.0, n.AvgTxThroughput)
}

func TestNetworkNewBlockLatency(t *testing.T) {
	n := monitor.NewNetwork()

	n.NewBlockLatency(9000000.0) // nanoseconds
	assert.Equal(t, 0.0, n.AvgBlockLatency)
}

func TestNetworkNodeIsDownThenOnline(t *testing.T) {
	n := monitor.NewNetwork()
	n.NewNode("test")

	n.NodeIsDown("test")
	assert.Equal(t, 0, n.NumNodesMonitoredOnline)
	assert.Equal(t, monitor.Dead, n.Health)

	n.NodeIsDown("test")
	assert.Equal(t, 0, n.NumNodesMonitoredOnline)

	n.NodeIsOnline("test")
	assert.Equal(t, 1, n.NumNodesMonitoredOnline)
	assert.Equal(t, monitor.ModerateHealth, n.Health)

	n.NodeIsOnline("test")
	assert.Equal(t, 1, n.NumNodesMonitoredOnline)
}

func TestNetworkNewNode(t *testing.T) {
	n := monitor.NewNetwork()
	assert.Equal(t, 0, n.NumNodesMonitored)
	assert.Equal(t, 0, n.NumNodesMonitoredOnline)

	n.NewNode("test")
	assert.Equal(t, 1, n.NumNodesMonitored)
	assert.Equal(t, 1, n.NumNodesMonitoredOnline)
}

func TestNetworkNodeDeleted(t *testing.T) {
	n := monitor.NewNetwork()
	n.NewNode("test")

	n.NodeDeleted("test")
	assert.Equal(t, 0, n.NumNodesMonitored)
	assert.Equal(t, 0, n.NumNodesMonitoredOnline)
}

func TestNetworkGetHealthString(t *testing.T) {
	n := monitor.NewNetwork()
	assert.Equal(t, "full", n.GetHealthString())

	n.Health = monitor.ModerateHealth
	assert.Equal(t, "moderate", n.GetHealthString())

	n.Health = monitor.Dead
	assert.Equal(t, "dead", n.GetHealthString())
}

func TestNetworkUptime(t *testing.T) {
	n := monitor.NewNetwork()
	assert.Equal(t, 100.0, n.Uptime())
}

func TestNetworkStartTime(t *testing.T) {
	n := monitor.NewNetwork()
	assert.True(t, n.StartTime().Before(time.Now()))
}
@@ -1,265 +0,0 @@
package monitor

import (
	"encoding/json"
	"math"
	"time"

	"github.com/pkg/errors"

	"github.com/tendermint/tendermint/crypto"
	"github.com/tendermint/tendermint/libs/events"
	"github.com/tendermint/tendermint/libs/log"
	ctypes "github.com/tendermint/tendermint/rpc/core/types"
	rpc_client "github.com/tendermint/tendermint/rpc/lib/client"
	em "github.com/tendermint/tendermint/tools/tm-monitor/eventmeter"
	tmtypes "github.com/tendermint/tendermint/types"
)

const maxRestarts = 25

type Node struct {
	IsValidator bool  `json:"is_validator"` // validator or non-validator?
	Online      bool  `json:"online"`
	Height      int64 `json:"height"`

	rpcAddr string

	Name   string `json:"name"`
	pubKey crypto.PubKey

	BlockLatency float64 `json:"block_latency" amino:"unsafe"` // ms, interval between block commits

	// em holds the ws connection. Each eventMeter callback is called in a separate goroutine.
	em eventMeter

	// rpcClient is a client for making RPC calls to TM
	rpcClient rpc_client.HTTPClient

	blockCh        chan<- *tmtypes.Block
	blockLatencyCh chan<- float64
	disconnectCh   chan<- bool

	checkIsValidatorInterval time.Duration

	quit chan struct{}

	logger log.Logger
}

func NewNode(rpcAddr string, options ...func(*Node)) *Node {
	em := em.NewEventMeter(rpcAddr, UnmarshalEvent)
	rpcClient := rpc_client.NewURIClient(rpcAddr) // HTTP client by default
	rpcClient.SetCodec(cdc)
	return NewNodeWithEventMeterAndRPCClient(rpcAddr, em, rpcClient, options...)
}

func NewNodeWithEventMeterAndRPCClient(
	rpcAddr string,
	em eventMeter,
	rpcClient rpc_client.HTTPClient,
	options ...func(*Node),
) *Node {
	n := &Node{
		rpcAddr:                  rpcAddr,
		em:                       em,
		rpcClient:                rpcClient,
		Name:                     rpcAddr,
		quit:                     make(chan struct{}),
		checkIsValidatorInterval: 5 * time.Second,
		logger:                   log.NewNopLogger(),
	}

	for _, option := range options {
		option(n)
	}

	return n
}

// SetCheckIsValidatorInterval lets you change the interval for checking whether
// the node is still a validator or not.
func SetCheckIsValidatorInterval(d time.Duration) func(n *Node) {
	return func(n *Node) {
		n.checkIsValidatorInterval = d
	}
}

func (n *Node) SendBlocksTo(ch chan<- *tmtypes.Block) {
	n.blockCh = ch
}

func (n *Node) SendBlockLatenciesTo(ch chan<- float64) {
	n.blockLatencyCh = ch
}

func (n *Node) NotifyAboutDisconnects(ch chan<- bool) {
	n.disconnectCh = ch
}

// SetLogger lets you set your own logger.
func (n *Node) SetLogger(l log.Logger) {
	n.logger = l
	n.em.SetLogger(l)
}

func (n *Node) Start() error {
	if err := n.em.Start(); err != nil {
		return err
	}

	n.em.RegisterLatencyCallback(latencyCallback(n))
	err := n.em.Subscribe(tmtypes.EventQueryNewBlock.String(), newBlockCallback(n))
	if err != nil {
		return err
	}
	n.em.RegisterDisconnectCallback(disconnectCallback(n))

	n.Online = true

	n.checkIsValidator()
	go n.checkIsValidatorLoop()

	return nil
}

func (n *Node) Stop() {
	n.Online = false

	n.em.Stop()

	close(n.quit)
}

// implements eventmeter.EventCallbackFunc
func newBlockCallback(n *Node) em.EventCallbackFunc {
	return func(metric *em.EventMetric, data interface{}) {
		block := data.(tmtypes.TMEventData).(tmtypes.EventDataNewBlock).Block

		n.Height = block.Height
		n.logger.Info("new block", "height", block.Height)

		if n.blockCh != nil {
			n.blockCh <- block
		}
	}
}

// implements eventmeter.LatencyCallbackFunc
func latencyCallback(n *Node) em.LatencyCallbackFunc {
	return func(latency float64) {
		n.BlockLatency = latency / 1000000.0 // ns to ms
		n.logger.Info("new block latency", "latency", n.BlockLatency)

		if n.blockLatencyCh != nil {
			n.blockLatencyCh <- latency
		}
	}
}

// implements eventmeter.DisconnectCallbackFunc
func disconnectCallback(n *Node) em.DisconnectCallbackFunc {
	return func() {
		n.Online = false
		n.logger.Info("status", "down")

		if n.disconnectCh != nil {
			n.disconnectCh <- true
		}
	}
}

// RestartEventMeterBackoff restarts the event meter with exponential backoff
// (2^attempt seconds between attempts), giving up after maxRestarts attempts.
func (n *Node) RestartEventMeterBackoff() error {
	attempt := 0

	for {
		d := time.Duration(math.Exp2(float64(attempt)))
		time.Sleep(d * time.Second)

		if err := n.em.Start(); err != nil {
			n.logger.Info("restart failed", "err", err)
		} else {
			// TODO: authenticate pubkey
			return nil
		}

		attempt++

		if attempt > maxRestarts {
			return errors.New("reached max restarts")
		}
	}
}

func (n *Node) NumValidators() (height int64, num int, err error) {
	height, vals, err := n.validators()
	if err != nil {
		return 0, 0, err
	}
	return height, len(vals), nil
}

func (n *Node) validators() (height int64, validators []*tmtypes.Validator, err error) {
	vals := new(ctypes.ResultValidators)
	if _, err = n.rpcClient.Call("validators", nil, vals); err != nil {
		return 0, make([]*tmtypes.Validator, 0), err
	}
	return vals.BlockHeight, vals.Validators, nil
}

func (n *Node) checkIsValidatorLoop() {
	for {
		select {
		case <-n.quit:
			return
		case <-time.After(n.checkIsValidatorInterval):
			n.checkIsValidator()
		}
	}
}

func (n *Node) checkIsValidator() {
	_, validators, err := n.validators()
	if err == nil {
		for _, v := range validators {
			key, err1 := n.getPubKey()
			if err1 == nil && v.PubKey.Equals(key) {
				n.IsValidator = true
			}
		}
	} else {
		n.logger.Info("check is validator failed", "err", err)
	}
}

func (n *Node) getPubKey() (crypto.PubKey, error) {
	if n.pubKey != nil {
		return n.pubKey, nil
	}

	status := new(ctypes.ResultStatus)
	_, err := n.rpcClient.Call("status", nil, status)
	if err != nil {
		return nil, err
	}
	n.pubKey = status.ValidatorInfo.PubKey
	return n.pubKey, nil
}

type eventMeter interface {
	Start() error
	Stop()
	RegisterLatencyCallback(em.LatencyCallbackFunc)
	RegisterDisconnectCallback(em.DisconnectCallbackFunc)
	Subscribe(string, em.EventCallbackFunc) error
	Unsubscribe(string) error
	SetLogger(l log.Logger)
}

// UnmarshalEvent unmarshals a JSON event.
func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) {
	event := new(ctypes.ResultEvent)
	if err := cdc.UnmarshalJSON(b, event); err != nil {
		return "", nil, err
	}
	return event.Query, event.Data, nil
}
@@ -1,96 +0,0 @@
package monitor_test

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	amino "github.com/tendermint/go-amino"

	"github.com/tendermint/tendermint/crypto/ed25519"
	ctypes "github.com/tendermint/tendermint/rpc/core/types"
	em "github.com/tendermint/tendermint/tools/tm-monitor/eventmeter"
	mock "github.com/tendermint/tendermint/tools/tm-monitor/mock"
	monitor "github.com/tendermint/tendermint/tools/tm-monitor/monitor"
	tmtypes "github.com/tendermint/tendermint/types"
)

const (
	blockHeight = int64(1)
)

func TestNodeStartStop(t *testing.T) {
	n, _ := startValidatorNode(t)
	defer n.Stop()

	assert.Equal(t, true, n.Online)
	assert.Equal(t, true, n.IsValidator)
}

func TestNodeNewBlockReceived(t *testing.T) {
	blockCh := make(chan *tmtypes.Block, 100)
	n, emMock := startValidatorNode(t)
	defer n.Stop()
	n.SendBlocksTo(blockCh)

	block := &tmtypes.Block{Header: tmtypes.Header{Height: 5}}
	emMock.Call("eventCallback", &em.EventMetric{}, tmtypes.EventDataNewBlock{Block: block})

	assert.Equal(t, int64(5), n.Height)
	assert.Equal(t, block, <-blockCh)
}

func TestNodeNewBlockLatencyReceived(t *testing.T) {
	blockLatencyCh := make(chan float64, 100)
	n, emMock := startValidatorNode(t)
	defer n.Stop()
	n.SendBlockLatenciesTo(blockLatencyCh)

	emMock.Call("latencyCallback", 1000000.0)

	assert.Equal(t, 1.0, n.BlockLatency)
	assert.Equal(t, 1000000.0, <-blockLatencyCh)
}

func TestNodeConnectionLost(t *testing.T) {
	disconnectCh := make(chan bool, 100)
	n, emMock := startValidatorNode(t)
	defer n.Stop()
	n.NotifyAboutDisconnects(disconnectCh)

	emMock.Call("disconnectCallback")

	assert.Equal(t, true, <-disconnectCh)
	assert.Equal(t, false, n.Online)
}

func TestNumValidators(t *testing.T) {
	n, _ := startValidatorNode(t)
	defer n.Stop()

	height, num, err := n.NumValidators()
	assert.Nil(t, err)
	assert.Equal(t, blockHeight, height)
	assert.Equal(t, 1, num)
}

func startValidatorNode(t *testing.T) (n *monitor.Node, emMock *mock.EventMeter) {
	emMock = &mock.EventMeter{}

	stubs := make(map[string]interface{})
	pubKey := ed25519.GenPrivKey().PubKey()
	stubs["validators"] = ctypes.ResultValidators{
		BlockHeight: blockHeight,
		Validators:  []*tmtypes.Validator{tmtypes.NewValidator(pubKey, 0)},
	}
	stubs["status"] = ctypes.ResultStatus{ValidatorInfo: ctypes.ValidatorInfo{PubKey: pubKey}}
	cdc := amino.NewCodec()
	rpcClientMock := &mock.RPCClient{Stubs: stubs}
	rpcClientMock.SetCodec(cdc)

	n = monitor.NewNodeWithEventMeterAndRPCClient("tcp://127.0.0.1:26657", emMock, rpcClientMock)

	err := n.Start()
	require.Nil(t, err)

	return
}
@@ -1,131 +0,0 @@
package main

import (
	"errors"
	"net"
	"net/http"

	"github.com/tendermint/tendermint/libs/log"
	rpc "github.com/tendermint/tendermint/rpc/lib/server"
	rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
	"github.com/tendermint/tendermint/tools/tm-monitor/monitor"
)

func startRPC(listenAddr string, m *monitor.Monitor, logger log.Logger) net.Listener {
	routes := routes(m)

	mux := http.NewServeMux()
	wm := rpc.NewWebsocketManager(routes, nil)
	mux.HandleFunc("/websocket", wm.WebsocketHandler)
	rpc.RegisterRPCFuncs(mux, routes, cdc, logger)
	config := rpc.DefaultConfig()
	listener, err := rpc.Listen(listenAddr, config)
	if err != nil {
		panic(err)
	}
	go rpc.StartHTTPServer(listener, mux, logger, config)
	return listener
}

func routes(m *monitor.Monitor) map[string]*rpc.RPCFunc {
	return map[string]*rpc.RPCFunc{
		"status":         rpc.NewRPCFunc(RPCStatus(m), ""),
		"status/network": rpc.NewRPCFunc(RPCNetworkStatus(m), ""),
		"status/node":    rpc.NewRPCFunc(RPCNodeStatus(m), "name"),
		"monitor":        rpc.NewRPCFunc(RPCMonitor(m), "endpoint"),
		"unmonitor":      rpc.NewRPCFunc(RPCUnmonitor(m), "endpoint"),

		// "start_meter": rpc.NewRPCFunc(network.StartMeter, "chainID,valID,event"),
		// "stop_meter":  rpc.NewRPCFunc(network.StopMeter, "chainID,valID,event"),
		// "meter":       rpc.NewRPCFunc(GetMeterResult(network), "chainID,valID,event"),
	}
}

// RPCStatus returns common statistics for the network and statistics per node.
func RPCStatus(m *monitor.Monitor) func(*rpctypes.Context) (networkAndNodes, error) {
	return func(_ *rpctypes.Context) (networkAndNodes, error) {
		return networkAndNodes{m.Network, m.Nodes}, nil
	}
}

// RPCNetworkStatus returns common statistics for the network.
func RPCNetworkStatus(m *monitor.Monitor) func(*rpctypes.Context) (*monitor.Network, error) {
	return func(_ *rpctypes.Context) (*monitor.Network, error) {
		return m.Network, nil
	}
}

// RPCNodeStatus returns statistics for the given node.
func RPCNodeStatus(m *monitor.Monitor) func(*rpctypes.Context, string) (*monitor.Node, error) {
	return func(_ *rpctypes.Context, name string) (*monitor.Node, error) {
		if i, n := m.NodeByName(name); i != -1 {
			return n, nil
		}
		return nil, errors.New("cannot find node with that name")
	}
}

// RPCMonitor allows you to dynamically add an endpoint under the monitor.
// Safe to call multiple times.
func RPCMonitor(m *monitor.Monitor) func(*rpctypes.Context, string) (*monitor.Node, error) {
	return func(_ *rpctypes.Context, endpoint string) (*monitor.Node, error) {
		i, n := m.NodeByName(endpoint)
		if i == -1 {
			n = monitor.NewNode(endpoint)
			if err := m.Monitor(n); err != nil {
				return nil, err
			}
		}
		return n, nil
	}
}

// RPCUnmonitor removes the given endpoint from under the monitor.
func RPCUnmonitor(m *monitor.Monitor) func(*rpctypes.Context, string) (bool, error) {
	return func(_ *rpctypes.Context, endpoint string) (bool, error) {
		if i, n := m.NodeByName(endpoint); i != -1 {
			m.Unmonitor(n)
			return true, nil
		}
		return false, errors.New("cannot find node with that name")
	}
}

// func (tn *TendermintNetwork) StartMeter(chainID, valID, eventID string) error {
// 	tn.mtx.Lock()
// 	defer tn.mtx.Unlock()
// 	val, err := tn.getChainVal(chainID, valID)
// 	if err != nil {
// 		return err
// 	}
// 	return val.EventMeter().Subscribe(eventID, nil)
// }

// func (tn *TendermintNetwork) StopMeter(chainID, valID, eventID string) error {
// 	tn.mtx.Lock()
// 	defer tn.mtx.Unlock()
// 	val, err := tn.getChainVal(chainID, valID)
// 	if err != nil {
// 		return err
// 	}
// 	return val.EventMeter().Unsubscribe(eventID)
// }

// func (tn *TendermintNetwork) GetMeter(chainID, valID, eventID string) (*eventmeter.EventMetric, error) {
// 	tn.mtx.Lock()
// 	defer tn.mtx.Unlock()
// 	val, err := tn.getChainVal(chainID, valID)
// 	if err != nil {
// 		return nil, err
// 	}
// 	return val.EventMeter().GetMetric(eventID)
// }

//--> types

type networkAndNodes struct {
	Network *monitor.Network `json:"network"`
	Nodes   []*monitor.Node  `json:"nodes"`
}
@@ -1,105 +0,0 @@
package main

import (
	"fmt"
	"io"
	"os"
	"text/tabwriter"
	"time"

	monitor "github.com/tendermint/tendermint/tools/tm-monitor/monitor"
)

const (
	// Default refresh rate - 200ms
	defaultRefreshRate = time.Millisecond * 200
)

// Ton - table of nodes.
//
// It produces an unordered list of nodes and updates it periodically.
//
// Default output is stdout, but it can be changed. Note that for refresh to
// work properly, the output must support [ANSI escape
// codes](http://en.wikipedia.org/wiki/ANSI_escape_code).
//
// Ton was inspired by the [Linux top
// program](https://en.wikipedia.org/wiki/Top_(software)), as the name suggests.
type Ton struct {
	monitor *monitor.Monitor

	RefreshRate time.Duration
	Output      io.Writer
	quit        chan struct{}
}

func NewTon(m *monitor.Monitor) *Ton {
	return &Ton{
		RefreshRate: defaultRefreshRate,
		Output:      os.Stdout,
		quit:        make(chan struct{}),
		monitor:     m,
	}
}

func (o *Ton) Start() {
	clearScreen(o.Output)
	o.Print()
	go o.refresher()
}

func (o *Ton) Print() {
	moveCursor(o.Output, 1, 1)
	o.printHeader()
	fmt.Println()
	o.printTable()
}

func (o *Ton) Stop() {
	close(o.quit)
}

func (o *Ton) printHeader() {
	n := o.monitor.Network
	fmt.Fprintf(o.Output, "%v up %.2f%%\n", n.StartTime().Format(time.RFC1123Z), n.Uptime())
	fmt.Println()
	fmt.Fprintf(o.Output, "Height: %d\n", n.Height)
	fmt.Fprintf(o.Output, "Avg block time: %.3f ms\n", n.AvgBlockTime)
	fmt.Fprintf(o.Output, "Avg tx throughput: %.3f per sec\n", n.AvgTxThroughput)
	fmt.Fprintf(o.Output, "Avg block latency: %.3f ms\n", n.AvgBlockLatency)
	fmt.Fprintf(o.Output,
		"Active nodes: %d/%d (health: %s) Validators: %d\n",
		n.NumNodesMonitoredOnline,
		n.NumNodesMonitored,
		n.GetHealthString(),
		n.NumValidators)
}

func (o *Ton) printTable() {
	w := tabwriter.NewWriter(o.Output, 0, 0, 5, ' ', 0)
	fmt.Fprintln(w, "NAME\tHEIGHT\tBLOCK LATENCY\tONLINE\tVALIDATOR\t")
	for _, n := range o.monitor.Nodes {
		fmt.Fprintf(w, "%s\t%d\t%.3f ms\t%v\t%v\t\n", n.Name, n.Height, n.BlockLatency, n.Online, n.IsValidator)
	}
	w.Flush()
}

// Internal loop for refreshing.
func (o *Ton) refresher() {
	for {
		select {
		case <-o.quit:
			return
		case <-time.After(o.RefreshRate):
			o.Print()
		}
	}
}

func clearScreen(w io.Writer) {
	fmt.Fprint(w, "\033[2J")
}

func moveCursor(w io.Writer, x int, y int) {
	fmt.Fprintf(w, "\033[%d;%dH", x, y)
}