diff --git a/docs/tendermint-core/running-in-production.md b/docs/tendermint-core/running-in-production.md index 6daa1f16e..1df370893 100644 --- a/docs/tendermint-core/running-in-production.md +++ b/docs/tendermint-core/running-in-production.md @@ -163,10 +163,6 @@ if something is wrong. Other useful endpoints include mentioned earlier `/status`, `/net_info` and `/validators`. -We have a small tool, called `tm-monitor`, which outputs information from -the endpoints above plus some statistics. The tool can be found -[here](https://github.com/tendermint/tendermint/tree/master/tools/tm-monitor). - Tendermint also can report and serve Prometheus metrics. See [Metrics](./metrics.md). diff --git a/docs/tools/monitoring.md b/docs/tools/monitoring.md deleted file mode 100644 index 0653a5639..000000000 --- a/docs/tools/monitoring.md +++ /dev/null @@ -1,92 +0,0 @@ -# tm-monitor - -Tendermint blockchain monitoring tool; watches over one or more nodes, -collecting and providing various statistics to the user: - -- [https://github.com/tendermint/tendermint/tree/master/tools/tm-monitor](https://github.com/tendermint/tendermint/tree/master/tools/tm-monitor) - -## Quick Start - -### Docker - -Assuming your application is running in another container with the name -`app`: - -``` -docker run -it --rm -v "/tmp:/tendermint" tendermint/tendermint init -docker run -it --rm -v "/tmp:/tendermint" -p "26657:26657" --name=tm --link=app tendermint/tendermint node --proxy_app=tcp://app:26658 - -docker run -it --rm -p "26670:26670" --link=tm tendermint/monitor tm:26657 -``` - -If you don't have an application yet, but still want to try monitor out, -use `kvstore`: - -``` -docker run -it --rm -v "/tmp:/tendermint" tendermint/tendermint init -docker run -it --rm -v "/tmp:/tendermint" -p "26657:26657" --name=tm tendermint/tendermint node --proxy_app=kvstore -``` - -``` -docker run -it --rm -p "26670:26670" --link=tm tendermint/monitor tm:26657 -``` - -### Using Binaries - -[Install Tendermint](../introduction/install.md). - -Start a Tendermint node: - -``` -tendermint init -tendermint node --proxy_app=kvstore -``` - -In another window, run the monitor: - -``` -tm-monitor localhost:26657 -``` - -## Usage - -``` -tm-monitor [-v] [-no-ton] [-listen-addr="tcp://0.0.0.0:26670"] [endpoints] - -Examples: - # monitor single instance - tm-monitor localhost:26657 - - # monitor a few instances by providing comma-separated list of RPC endpoints - tm-monitor host1:26657,host2:26657 -Flags: - -listen-addr string - HTTP and Websocket server listen address (default "tcp://0.0.0.0:26670") - -no-ton - Do not show ton (table of nodes) - -v verbose logging -``` - -### RPC UI - -Run `tm-monitor` and visit http://localhost:26670 You should see the -list of the available RPC endpoints: - -``` -http://localhost:26670/status -http://localhost:26670/status/network -http://localhost:26670/monitor?endpoint=_ -http://localhost:26670/status/node?name=_ -http://localhost:26670/unmonitor?endpoint=_ -``` - -The API is available as GET requests with URI encoded parameters, or as -JSONRPC POST requests. The JSONRPC methods are also exposed over -websocket. - -## Development - -``` -make tools -make test -``` diff --git a/rpc/lib/doc.go b/rpc/lib/doc.go index d22c2fc87..3e8314b80 100644 --- a/rpc/lib/doc.go +++ b/rpc/lib/doc.go @@ -82,5 +82,4 @@ // Examples // // - [Tendermint](https://github.com/tendermint/tendermint/blob/master/rpc/core/routes.go) -// - [tm-monitor](https://github.com/tendermint/tendermint/blob/master/tools/tm-monitor/rpc.go) package rpc diff --git a/tools.mk b/tools.mk index 3ad1f52dc..fcf3549e2 100644 --- a/tools.mk +++ b/tools.mk @@ -42,13 +42,10 @@ TOOLS_DESTDIR ?= $(GOPATH)/bin CERTSTRAP = $(TOOLS_DESTDIR)/certstrap PROTOBUF = $(TOOLS_DESTDIR)/protoc -GOX = $(TOOLS_DESTDIR)/gox GOODMAN = $(TOOLS_DESTDIR)/goodman all: tools -tools: certstrap protobuf gox goodman - check: check_tools check_tools: @@ -66,12 +63,6 @@ $(PROTOBUF): @echo "Get GoGo Protobuf" @go get github.com/gogo/protobuf/protoc-gen-gogo@v1.3.1 -# used to build tm-monitor binaries -gox: $(GOX) -$(GOX): - @echo "Get Gox" - @go get github.com/mitchellh/gox@v1.0.1 - goodman: $(GOODMAN) $(GOODMAN): @echo "Get Goodman" diff --git a/tools/tm-monitor/Dockerfile b/tools/tm-monitor/Dockerfile deleted file mode 100644 index 930fb639e..000000000 --- a/tools/tm-monitor/Dockerfile +++ /dev/null @@ -1,6 +0,0 @@ -FROM alpine:3.8 - -WORKDIR /app -COPY tm-monitor /app/tm-monitor - -ENTRYPOINT ["./tm-monitor"] diff --git a/tools/tm-monitor/Dockerfile.dev b/tools/tm-monitor/Dockerfile.dev deleted file mode 100644 index 347c7f0fb..000000000 --- a/tools/tm-monitor/Dockerfile.dev +++ /dev/null @@ -1,11 +0,0 @@ -FROM golang:latest - -RUN mkdir -p /go/src/github.com/tendermint/tools/tm-monitor -WORKDIR /go/src/github.com/tendermint/tools/tm-monitor - -COPY Makefile /go/src/github.com/tendermint/tools/tm-monitor/ - -RUN make tools - -COPY . /go/src/github.com/tendermint/tools/tm-monitor - diff --git a/tools/tm-monitor/Makefile b/tools/tm-monitor/Makefile deleted file mode 100644 index a71eb2642..000000000 --- a/tools/tm-monitor/Makefile +++ /dev/null @@ -1,49 +0,0 @@ -DIST_DIRS := find * -type d -exec -VERSION := $(shell perl -ne '/^TMCoreSemVer = "([^"]+)"$$/ && print "v$$1\n"' ../../version/version.go) - -all: build test install - -######################################## -### Build - -build: - @go build - -install: - @go install - -test: - @go test -race $(PACKAGES) - -build-all: - rm -rf ./dist - gox -verbose \ - -ldflags "-s -w" \ - -arch="amd64 386 arm arm64" \ - -os="linux darwin windows freebsd" \ - -osarch="!darwin/arm !darwin/arm64" \ - -output="dist/{{.OS}}-{{.Arch}}/{{.Dir}}" . - -dist: build-all - cd dist && \ - $(DIST_DIRS) cp ../LICENSE {} \; && \ - $(DIST_DIRS) tar -zcf tm-monitor-${VERSION}-{}.tar.gz {} \; && \ - shasum -a256 ./*.tar.gz > "./tm-monitor_${VERSION}_SHA256SUMS" && \ - cd .. - -######################################## -### Docker - -build-docker: - rm -f ./tm-monitor - docker run -it --rm -v "$(PWD)/../../:/go/src/github.com/tendermint/tendermint" -w "/go/src/github.com/tendermint/tendermint/tools/tm-monitor" -e "GO111MODULE=on" -e "CGO_ENABLED=0" golang:1.12 go build -ldflags "-s -w" -o tm-monitor - docker build -t "tendermint/monitor" . - -clean: - rm -f ./tm-monitor - rm -rf ./dist - -# To avoid unintended conflicts with file names, always add to .PHONY -# unless there is a reason not to. -# https://www.gnu.org/software/make/manual/html_node/Phony-Targets.html -.PHONY: build install test build-all dist build-docker clean diff --git a/tools/tm-monitor/README.md b/tools/tm-monitor/README.md deleted file mode 100644 index 1a8dfffc7..000000000 --- a/tools/tm-monitor/README.md +++ /dev/null @@ -1,91 +0,0 @@ -# tm-monitor - -Tendermint blockchain monitoring tool; watches over one or more nodes, -collecting and providing various statistics to the user: - -- [https://github.com/tendermint/tendermint/tree/master/tools/tm-monitor](https://github.com/tendermint/tendermint/tree/master/tools/tm-monitor) - -## Quick Start - -### Docker - -Assuming your application is running in another container with the name -`app`: - -``` -docker run -it --rm -v "/tmp:/tendermint" tendermint/tendermint init -docker run -it --rm -v "/tmp:/tendermint" -p "26657:26657" --name=tm --link=app tendermint/tendermint node --proxy_app=tcp://app:26658 - -docker run -it --rm -p "26670:26670" --link=tm tendermint/monitor tm:26657 -``` - -If you don't have an application yet, but still want to try monitor out, -use `kvstore`: - -``` -docker run -it --rm -v "/tmp:/tendermint" tendermint/tendermint init -docker run -it --rm -v "/tmp:/tendermint" -p "26657:26657" --name=tm tendermint/tendermint node --proxy_app=kvstore - -docker run -it --rm -p "26670:26670" --link=tm tendermint/monitor tm:26657 -``` - -### Using Binaries - -[Install Tendermint](https://github.com/tendermint/tendermint#install) - -then run: - -``` -tendermint init -tendermint node --proxy_app=kvstore - -tm-monitor localhost:26657 -``` - -with the last command being in a separate window. - -## Usage - -``` -Tendermint monitor watches over one or more Tendermint core -applications, collecting and providing various statistics to the user. - -Usage: - tm-monitor [-no-ton] [-listen-addr="tcp://0.0.0.0:26670"] [endpoints] - -Examples: - # monitor single instance - tm-monitor localhost:26657 - - # monitor a few instances by providing comma-separated list of RPC endpoints - tm-monitor host1:26657,host2:26657 -Flags: - -listen-addr string - HTTP and Websocket server listen address (default "tcp://0.0.0.0:26670") - -no-ton - Do not show ton (table of nodes) -``` - -### RPC UI - -Run `tm-monitor` and visit http://localhost:26670 You should see the -list of the available RPC endpoints: - -``` -http://localhost:26670/status -http://localhost:26670/status/network -http://localhost:26670/monitor?endpoint=_ -http://localhost:26670/status/node?name=_ -http://localhost:26670/unmonitor?endpoint=_ -``` - -The API is available as GET requests with URI encoded parameters, or as -JSONRPC POST requests. The JSONRPC methods are also exposed over -websocket. - -## Development - -``` -make tools -make test -``` diff --git a/tools/tm-monitor/codec.go b/tools/tm-monitor/codec.go deleted file mode 100644 index 071c363b0..000000000 --- a/tools/tm-monitor/codec.go +++ /dev/null @@ -1,12 +0,0 @@ -package main - -import ( - amino "github.com/tendermint/go-amino" - ctypes "github.com/tendermint/tendermint/rpc/core/types" -) - -var cdc = amino.NewCodec() - -func init() { - ctypes.RegisterAmino(cdc) -} diff --git a/tools/tm-monitor/eventmeter/eventmeter.go b/tools/tm-monitor/eventmeter/eventmeter.go deleted file mode 100644 index 63d58b96e..000000000 --- a/tools/tm-monitor/eventmeter/eventmeter.go +++ /dev/null @@ -1,296 +0,0 @@ -// eventmeter - generic system to subscribe to events and record their frequency. -package eventmeter - -import ( - "context" - "encoding/json" - "fmt" - "sync" - "time" - - metrics "github.com/rcrowley/go-metrics" - - "github.com/tendermint/tendermint/libs/events" - "github.com/tendermint/tendermint/libs/log" - client "github.com/tendermint/tendermint/rpc/lib/client" -) - -const ( - // Get ping/pong latency and call LatencyCallbackFunc with this period. - latencyPeriod = 1 * time.Second - - // Check if the WS client is connected every - connectionCheckPeriod = 100 * time.Millisecond -) - -// EventMetric exposes metrics for an event. -type EventMetric struct { - ID string `json:"id"` - Started time.Time `json:"start_time"` - LastHeard time.Time `json:"last_heard"` - MinDuration int64 `json:"min_duration"` - MaxDuration int64 `json:"max_duration"` - - // tracks event count and rate - meter metrics.Meter - - // filled in from the Meter - Count int64 `json:"count"` - Rate1 float64 `json:"rate_1" amino:"unsafe"` - Rate5 float64 `json:"rate_5" amino:"unsafe"` - Rate15 float64 `json:"rate_15" amino:"unsafe"` - RateMean float64 `json:"rate_mean" amino:"unsafe"` - - // so the event can have effects in the eventmeter's consumer. runs in a go - // routine. - callback EventCallbackFunc -} - -func (metric *EventMetric) Copy() *EventMetric { - metricCopy := *metric - metricCopy.meter = metric.meter.Snapshot() - return &metricCopy -} - -// called on GetMetric -func (metric *EventMetric) fillMetric() *EventMetric { - metric.Count = metric.meter.Count() - metric.Rate1 = metric.meter.Rate1() - metric.Rate5 = metric.meter.Rate5() - metric.Rate15 = metric.meter.Rate15() - metric.RateMean = metric.meter.RateMean() - return metric -} - -// EventCallbackFunc is a closure to enable side effects from receiving an -// event. -type EventCallbackFunc func(em *EventMetric, data interface{}) - -// EventUnmarshalFunc is a closure to get the query and data out of the raw -// JSON received over the RPC WebSocket. -type EventUnmarshalFunc func(b json.RawMessage) (string, events.EventData, error) - -// LatencyCallbackFunc is a closure to enable side effects from receiving a latency. -type LatencyCallbackFunc func(meanLatencyNanoSeconds float64) - -// DisconnectCallbackFunc is a closure to notify a consumer that the connection -// has died. -type DisconnectCallbackFunc func() - -// EventMeter tracks events, reports latency and disconnects. -type EventMeter struct { - wsc *client.WSClient - - mtx sync.Mutex - queryToMetricMap map[string]*EventMetric - - unmarshalEvent EventUnmarshalFunc - latencyCallback LatencyCallbackFunc - disconnectCallback DisconnectCallbackFunc - subscribed bool - - quit chan struct{} - - logger log.Logger -} - -func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter { - return &EventMeter{ - wsc: client.NewWSClient(addr, "/websocket", client.PingPeriod(1*time.Second)), - queryToMetricMap: make(map[string]*EventMetric), - unmarshalEvent: unmarshalEvent, - logger: log.NewNopLogger(), - } -} - -// SetLogger lets you set your own logger. -func (em *EventMeter) SetLogger(l log.Logger) { - em.logger = l - em.wsc.SetLogger(l.With("module", "rpcclient")) -} - -// String returns a string representation of event meter. -func (em *EventMeter) String() string { - return em.wsc.Address -} - -// Start boots up event meter. -func (em *EventMeter) Start() error { - if err := em.wsc.Start(); err != nil { - return err - } - - em.quit = make(chan struct{}) - go em.receiveRoutine() - go em.disconnectRoutine() - - err := em.subscribe() - if err != nil { - return err - } - em.subscribed = true - return nil -} - -// Stop stops event meter. -func (em *EventMeter) Stop() { - close(em.quit) - - if em.wsc.IsRunning() { - em.wsc.Stop() - } -} - -// Subscribe for the given query. Callback function will be called upon -// receiving an event. -func (em *EventMeter) Subscribe(query string, cb EventCallbackFunc) error { - em.mtx.Lock() - defer em.mtx.Unlock() - - if err := em.wsc.Subscribe(context.TODO(), query); err != nil { - return err - } - - metric := &EventMetric{ - meter: metrics.NewMeter(), - callback: cb, - } - em.queryToMetricMap[query] = metric - return nil -} - -// Unsubscribe from the given query. -func (em *EventMeter) Unsubscribe(query string) error { - em.mtx.Lock() - defer em.mtx.Unlock() - - return em.wsc.Unsubscribe(context.TODO(), query) -} - -// GetMetric fills in the latest data for an query and return a copy. -func (em *EventMeter) GetMetric(query string) (*EventMetric, error) { - em.mtx.Lock() - defer em.mtx.Unlock() - metric, ok := em.queryToMetricMap[query] - if !ok { - return nil, fmt.Errorf("unknown query: %s", query) - } - return metric.fillMetric().Copy(), nil -} - -// RegisterLatencyCallback allows you to set latency callback. -func (em *EventMeter) RegisterLatencyCallback(f LatencyCallbackFunc) { - em.mtx.Lock() - defer em.mtx.Unlock() - em.latencyCallback = f -} - -// RegisterDisconnectCallback allows you to set disconnect callback. -func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) { - em.mtx.Lock() - defer em.mtx.Unlock() - em.disconnectCallback = f -} - -/////////////////////////////////////////////////////////////////////////////// -// Private - -func (em *EventMeter) subscribe() error { - for query := range em.queryToMetricMap { - if err := em.wsc.Subscribe(context.TODO(), query); err != nil { - return err - } - } - return nil -} - -func (em *EventMeter) receiveRoutine() { - latencyTicker := time.NewTicker(latencyPeriod) - for { - select { - case resp := <-em.wsc.ResponsesCh: - if resp.Error != nil { - em.logger.Error("expected some event, got error", "err", resp.Error.Error()) - continue - } - query, data, err := em.unmarshalEvent(resp.Result) - if err != nil { - em.logger.Error("failed to unmarshal event", "err", err) - continue - } - if query != "" { // FIXME how can it be an empty string? - em.updateMetric(query, data) - } - case <-latencyTicker.C: - if em.wsc.IsActive() { - em.callLatencyCallback(em.wsc.PingPongLatencyTimer.Mean()) - } - case <-em.wsc.Quit(): - return - case <-em.quit: - return - } - } -} - -func (em *EventMeter) disconnectRoutine() { - ticker := time.NewTicker(connectionCheckPeriod) - for { - select { - case <-ticker.C: - if em.wsc.IsReconnecting() && em.subscribed { // notify user about disconnect only once - em.callDisconnectCallback() - em.subscribed = false - } else if !em.wsc.IsReconnecting() && !em.subscribed { // resubscribe - em.subscribe() - em.subscribed = true - } - case <-em.wsc.Quit(): - return - case <-em.quit: - return - } - } -} - -func (em *EventMeter) updateMetric(query string, data events.EventData) { - em.mtx.Lock() - defer em.mtx.Unlock() - - metric, ok := em.queryToMetricMap[query] - if !ok { - // we already unsubscribed, or got an unexpected query - return - } - - last := metric.LastHeard - metric.LastHeard = time.Now() - metric.meter.Mark(1) - dur := int64(metric.LastHeard.Sub(last)) - if dur < metric.MinDuration { - metric.MinDuration = dur - } - if !last.IsZero() && dur > metric.MaxDuration { - metric.MaxDuration = dur - } - - if metric.callback != nil { - go metric.callback(metric.Copy(), data) - } -} - -func (em *EventMeter) callDisconnectCallback() { - em.mtx.Lock() - if em.disconnectCallback != nil { - go em.disconnectCallback() - } - em.mtx.Unlock() -} - -func (em *EventMeter) callLatencyCallback(meanLatencyNanoSeconds float64) { - em.mtx.Lock() - if em.latencyCallback != nil { - go em.latencyCallback(meanLatencyNanoSeconds) - } - em.mtx.Unlock() -} diff --git a/tools/tm-monitor/main.go b/tools/tm-monitor/main.go deleted file mode 100644 index 7e4bffca2..000000000 --- a/tools/tm-monitor/main.go +++ /dev/null @@ -1,91 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "os" - "strings" - - "github.com/tendermint/tendermint/libs/log" - tmos "github.com/tendermint/tendermint/libs/os" - monitor "github.com/tendermint/tendermint/tools/tm-monitor/monitor" -) - -var logger = log.NewNopLogger() - -func main() { - var listenAddr string - var noton bool - - flag.StringVar(&listenAddr, "listen-addr", "tcp://0.0.0.0:26670", "HTTP and Websocket server listen address") - flag.BoolVar(¬on, "no-ton", false, "Do not show ton (table of nodes)") - - flag.Usage = func() { - fmt.Println(`Tendermint monitor watches over one or more Tendermint core -applications, collecting and providing various statistics to the user. - -Usage: - tm-monitor [-no-ton] [-listen-addr="tcp://0.0.0.0:26670"] [endpoints] - -Examples: - # monitor single instance - tm-monitor localhost:26657 - - # monitor a few instances by providing comma-separated list of RPC endpoints - tm-monitor host1:26657,host2:26657`) - fmt.Println("Flags:") - flag.PrintDefaults() - } - - flag.Parse() - - if flag.NArg() == 0 { - flag.Usage() - os.Exit(1) - } - - if noton { - logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout)) - } - - monitor := startMonitor(flag.Arg(0)) - - listener := startRPC(listenAddr, monitor, logger) - - var ton *Ton - if !noton { - ton = NewTon(monitor) - ton.Start() - } - - // Stop upon receiving SIGTERM or CTRL-C. - tmos.TrapSignal(logger, func() { - if !noton { - ton.Stop() - } - monitor.Stop() - listener.Close() - }) - - // Run forever. - select {} -} - -func startMonitor(endpoints string) *monitor.Monitor { - m := monitor.NewMonitor() - m.SetLogger(logger.With("component", "monitor")) - - for _, e := range strings.Split(endpoints, ",") { - n := monitor.NewNode(e) - n.SetLogger(logger.With("node", e)) - if err := m.Monitor(n); err != nil { - panic(err) - } - } - - if err := m.Start(); err != nil { - panic(err) - } - - return m -} diff --git a/tools/tm-monitor/mock/eventmeter.go b/tools/tm-monitor/mock/eventmeter.go deleted file mode 100644 index e70137e6f..000000000 --- a/tools/tm-monitor/mock/eventmeter.go +++ /dev/null @@ -1,69 +0,0 @@ -package mock - -import ( - stdlog "log" - "reflect" - - amino "github.com/tendermint/go-amino" - "github.com/tendermint/tendermint/libs/log" - em "github.com/tendermint/tendermint/tools/tm-monitor/eventmeter" -) - -type EventMeter struct { - latencyCallback em.LatencyCallbackFunc - disconnectCallback em.DisconnectCallbackFunc - eventCallback em.EventCallbackFunc -} - -func (e *EventMeter) Start() error { return nil } -func (e *EventMeter) Stop() {} -func (e *EventMeter) SetLogger(l log.Logger) {} -func (e *EventMeter) RegisterLatencyCallback(cb em.LatencyCallbackFunc) { e.latencyCallback = cb } -func (e *EventMeter) RegisterDisconnectCallback(cb em.DisconnectCallbackFunc) { - e.disconnectCallback = cb -} -func (e *EventMeter) Subscribe(query string, cb em.EventCallbackFunc) error { - e.eventCallback = cb - return nil -} -func (e *EventMeter) Unsubscribe(query string) error { - e.eventCallback = nil - return nil -} - -func (e *EventMeter) Call(callback string, args ...interface{}) { - switch callback { - case "latencyCallback": - e.latencyCallback(args[0].(float64)) - case "disconnectCallback": - e.disconnectCallback() - case "eventCallback": - e.eventCallback(args[0].(*em.EventMetric), args[1]) - } -} - -type RPCClient struct { - Stubs map[string]interface{} - cdc *amino.Codec -} - -func (c *RPCClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) { - s, ok := c.Stubs[method] - if !ok { - stdlog.Fatalf("Call to %s, but no stub is defined for it", method) - } - - rv, rt := reflect.ValueOf(result), reflect.TypeOf(result) - rv, _ = rv.Elem(), rt.Elem() - rv.Set(reflect.ValueOf(s)) - - return s, nil -} - -func (c *RPCClient) Codec() *amino.Codec { - return c.cdc -} - -func (c *RPCClient) SetCodec(cdc *amino.Codec) { - c.cdc = cdc -} diff --git a/tools/tm-monitor/monitor/codec.go b/tools/tm-monitor/monitor/codec.go deleted file mode 100644 index 696b02778..000000000 --- a/tools/tm-monitor/monitor/codec.go +++ /dev/null @@ -1,12 +0,0 @@ -package monitor - -import ( - amino "github.com/tendermint/go-amino" - ctypes "github.com/tendermint/tendermint/rpc/core/types" -) - -var cdc = amino.NewCodec() - -func init() { - ctypes.RegisterAmino(cdc) -} diff --git a/tools/tm-monitor/monitor/monitor.go b/tools/tm-monitor/monitor/monitor.go deleted file mode 100644 index 9aed914a8..000000000 --- a/tools/tm-monitor/monitor/monitor.go +++ /dev/null @@ -1,256 +0,0 @@ -package monitor - -import ( - "fmt" - "math/rand" - "sync" - "time" - - "github.com/pkg/errors" - "github.com/tendermint/tendermint/libs/log" - tmtypes "github.com/tendermint/tendermint/types" -) - -// waiting more than this many seconds for a block means we're unhealthy -const nodeLivenessTimeout = 5 * time.Second - -// Monitor keeps track of the nodes and updates common statistics upon -// receiving new events from nodes. -// -// Common statistics is stored in Network struct. -type Monitor struct { - mtx sync.Mutex - Nodes []*Node - - Network *Network - - monitorQuit chan struct{} // monitor exitting - nodeQuit map[string]chan struct{} // node is being stopped and removed from under the monitor - - recalculateNetworkUptimeEvery time.Duration - numValidatorsUpdateInterval time.Duration - - logger log.Logger -} - -// NewMonitor creates new instance of a Monitor. You can provide options to -// change some default values. -// -// Example: -// NewMonitor(monitor.SetNumValidatorsUpdateInterval(1 * time.Second)) -func NewMonitor(options ...func(*Monitor)) *Monitor { - m := &Monitor{ - Nodes: make([]*Node, 0), - Network: NewNetwork(), - monitorQuit: make(chan struct{}), - nodeQuit: make(map[string]chan struct{}), - recalculateNetworkUptimeEvery: 10 * time.Second, - numValidatorsUpdateInterval: 5 * time.Second, - logger: log.NewNopLogger(), - } - - for _, option := range options { - option(m) - } - - return m -} - -// RecalculateNetworkUptimeEvery lets you change network uptime update interval. -func RecalculateNetworkUptimeEvery(d time.Duration) func(m *Monitor) { - return func(m *Monitor) { - m.recalculateNetworkUptimeEvery = d - } -} - -// SetNumValidatorsUpdateInterval lets you change num validators update interval. -func SetNumValidatorsUpdateInterval(d time.Duration) func(m *Monitor) { - return func(m *Monitor) { - m.numValidatorsUpdateInterval = d - } -} - -// SetLogger lets you set your own logger -func (m *Monitor) SetLogger(l log.Logger) { - m.logger = l -} - -// Monitor begins to monitor the node `n`. The node will be started and added -// to the monitor. -func (m *Monitor) Monitor(n *Node) error { - m.mtx.Lock() - m.Nodes = append(m.Nodes, n) - m.mtx.Unlock() - - blockCh := make(chan *tmtypes.Block, 10) - n.SendBlocksTo(blockCh) - blockLatencyCh := make(chan float64, 10) - n.SendBlockLatenciesTo(blockLatencyCh) - disconnectCh := make(chan bool, 10) - n.NotifyAboutDisconnects(disconnectCh) - - if err := n.Start(); err != nil { - return err - } - - m.Network.NewNode(n.Name) - - m.nodeQuit[n.Name] = make(chan struct{}) - go m.listen(n.Name, blockCh, blockLatencyCh, disconnectCh, m.nodeQuit[n.Name]) - - return nil -} - -// Unmonitor stops monitoring node `n`. The node will be stopped and removed -// from the monitor. -func (m *Monitor) Unmonitor(n *Node) { - m.Network.NodeDeleted(n.Name) - - n.Stop() - close(m.nodeQuit[n.Name]) - delete(m.nodeQuit, n.Name) - i, _ := m.NodeByName(n.Name) - - m.mtx.Lock() - m.Nodes[i] = m.Nodes[len(m.Nodes)-1] - m.Nodes = m.Nodes[:len(m.Nodes)-1] - m.mtx.Unlock() -} - -// NodeByName returns the node and its index if such node exists within the -// monitor. Otherwise, -1 and nil are returned. -func (m *Monitor) NodeByName(name string) (index int, node *Node) { - m.mtx.Lock() - defer m.mtx.Unlock() - - for i, n := range m.Nodes { - if name == n.Name { - return i, n - } - } - return -1, nil -} - -// NodeIsOnline is called when connection to the node is restored. -// Must be safe to call multiple times. -func (m *Monitor) NodeIsOnline(name string) { - - _, node := m.NodeByName(name) - if nil != node { - if online, ok := m.Network.nodeStatusMap[name]; ok && online { - m.mtx.Lock() - node.Online = online - m.mtx.Unlock() - } - } - -} - -// Start starts the monitor's routines: recalculating network uptime and -// updating number of validators. -func (m *Monitor) Start() error { - go m.recalculateNetworkUptimeLoop() - go m.updateNumValidatorLoop() - - return nil -} - -// Stop stops the monitor's routines. -func (m *Monitor) Stop() { - close(m.monitorQuit) - - for _, n := range m.Nodes { - m.Unmonitor(n) - } -} - -// main loop where we listen for events from the node -func (m *Monitor) listen( - nodeName string, - blockCh <-chan *tmtypes.Block, - blockLatencyCh <-chan float64, - disconnectCh <-chan bool, - quit <-chan struct{}) { - logger := m.logger.With("node", nodeName) - - for { - select { - case <-quit: - return - case b := <-blockCh: - m.Network.NewBlock(b) - m.Network.NodeIsOnline(nodeName) - m.NodeIsOnline(nodeName) - case l := <-blockLatencyCh: - m.Network.NewBlockLatency(l) - m.Network.NodeIsOnline(nodeName) - m.NodeIsOnline(nodeName) - case disconnected := <-disconnectCh: - if disconnected { - m.Network.NodeIsDown(nodeName) - } else { - m.Network.NodeIsOnline(nodeName) - m.NodeIsOnline(nodeName) - } - case <-time.After(nodeLivenessTimeout): - logger.Info("event", fmt.Sprintf("node was not responding for %v", nodeLivenessTimeout)) - m.Network.NodeIsDown(nodeName) - } - } -} - -// recalculateNetworkUptimeLoop every N seconds. -func (m *Monitor) recalculateNetworkUptimeLoop() { - for { - select { - case <-m.monitorQuit: - return - case <-time.After(m.recalculateNetworkUptimeEvery): - m.Network.RecalculateUptime() - } - } -} - -// updateNumValidatorLoop sends a request to a random node once every N seconds, -// which in turn makes an RPC call to get the latest validators. -func (m *Monitor) updateNumValidatorLoop() { - rand.Seed(time.Now().Unix()) - - var height int64 - var num int - var err error - - for { - m.mtx.Lock() - nodesCount := len(m.Nodes) - m.mtx.Unlock() - if nodesCount == 0 { - time.Sleep(m.numValidatorsUpdateInterval) - continue - } - - randomNodeIndex := rand.Intn(nodesCount) - - select { - case <-m.monitorQuit: - return - case <-time.After(m.numValidatorsUpdateInterval): - i := 0 - - m.mtx.Lock() - for _, n := range m.Nodes { - if i == randomNodeIndex { - height, num, err = n.NumValidators() - if err != nil { - m.logger.Info("err", errors.Wrap(err, "update num validators failed")) - } - break - } - i++ - } - m.mtx.Unlock() - - m.Network.UpdateNumValidatorsForHeight(num, height) - } - } -} diff --git a/tools/tm-monitor/monitor/monitor_test.go b/tools/tm-monitor/monitor/monitor_test.go deleted file mode 100644 index e1318d146..000000000 --- a/tools/tm-monitor/monitor/monitor_test.go +++ /dev/null @@ -1,75 +0,0 @@ -package monitor_test - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - amino "github.com/tendermint/go-amino" - "github.com/tendermint/tendermint/crypto/ed25519" - ctypes "github.com/tendermint/tendermint/rpc/core/types" - mock "github.com/tendermint/tendermint/tools/tm-monitor/mock" - monitor "github.com/tendermint/tendermint/tools/tm-monitor/monitor" - tmtypes "github.com/tendermint/tendermint/types" -) - -func TestMonitorUpdatesNumberOfValidators(t *testing.T) { - m := startMonitor(t) - defer m.Stop() - - n, _ := createValidatorNode(t) - m.Monitor(n) - assert.Equal(t, 1, m.Network.NumNodesMonitored) - assert.Equal(t, 1, m.Network.NumNodesMonitoredOnline) - - time.Sleep(1 * time.Second) - - // DATA RACE - // assert.Equal(t, 1, m.Network.NumValidators()) -} - -func TestMonitorRecalculatesNetworkUptime(t *testing.T) { - m := startMonitor(t) - defer m.Stop() - assert.Equal(t, 100.0, m.Network.Uptime()) - - n, _ := createValidatorNode(t) - m.Monitor(n) - - m.Network.NodeIsDown(n.Name) // simulate node failure - time.Sleep(200 * time.Millisecond) - m.Network.NodeIsOnline(n.Name) - time.Sleep(1 * time.Second) - - assert.True(t, m.Network.Uptime() < 100.0, "Uptime should be less than 100%") -} - -func startMonitor(t *testing.T) *monitor.Monitor { - m := monitor.NewMonitor( - monitor.SetNumValidatorsUpdateInterval(200*time.Millisecond), - monitor.RecalculateNetworkUptimeEvery(200*time.Millisecond), - ) - err := m.Start() - require.Nil(t, err) - return m -} - -func createValidatorNode(t *testing.T) (n *monitor.Node, emMock *mock.EventMeter) { - emMock = &mock.EventMeter{} - - stubs := make(map[string]interface{}) - pubKey := ed25519.GenPrivKey().PubKey() - stubs["validators"] = ctypes.ResultValidators{ - BlockHeight: blockHeight, - Validators: []*tmtypes.Validator{tmtypes.NewValidator(pubKey, 0)}, - } - stubs["status"] = ctypes.ResultStatus{ValidatorInfo: ctypes.ValidatorInfo{PubKey: pubKey}} - cdc := amino.NewCodec() - rpcClientMock := &mock.RPCClient{Stubs: stubs} - rpcClientMock.SetCodec(cdc) - - n = monitor.NewNodeWithEventMeterAndRPCClient("tcp://127.0.0.1:26657", emMock, rpcClientMock) - return -} diff --git a/tools/tm-monitor/monitor/network.go b/tools/tm-monitor/monitor/network.go deleted file mode 100644 index 0c51b793a..000000000 --- a/tools/tm-monitor/monitor/network.go +++ /dev/null @@ -1,209 +0,0 @@ -package monitor - -import ( - "sync" - "time" - - metrics "github.com/rcrowley/go-metrics" - tmtypes "github.com/tendermint/tendermint/types" -) - -// UptimeData stores data for how long network has been running. -type UptimeData struct { - StartTime time.Time `json:"start_time"` - Uptime float64 `json:"uptime" amino:"unsafe"` // percentage of time we've been healthy, ever - - totalDownTime time.Duration // total downtime (only updated when we come back online) - wentDown time.Time -} - -// Health describes the health of the network. Note that this applies only to -// the observed nodes, and not to the entire cluster, which may consist of -// thousands of machines. It may change in the future. -type Health int - -const ( - // FullHealth means all nodes online, synced, validators making blocks - FullHealth = Health(0) - // ModerateHealth means we're making blocks - ModerateHealth = Health(1) - // Dead means we're not making blocks due to all validators freezing or crashing - Dead = Health(2) -) - -// Common statistics for network of nodes -type Network struct { - Height int64 `json:"height"` - - AvgBlockTime float64 `json:"avg_block_time" amino:"unsafe"` // ms (avg over last minute) - blockTimeMeter metrics.Meter - AvgTxThroughput float64 `json:"avg_tx_throughput" amino:"unsafe"` // tx/s (avg over last minute) - txThroughputMeter metrics.Meter - AvgBlockLatency float64 `json:"avg_block_latency" amino:"unsafe"` // ms (avg over last minute) - blockLatencyMeter metrics.Meter - - NumValidators int `json:"num_validators"` - NumNodesMonitored int `json:"num_nodes_monitored"` - NumNodesMonitoredOnline int `json:"num_nodes_monitored_online"` - - Health Health `json:"health"` - - UptimeData *UptimeData `json:"uptime_data"` - - nodeStatusMap map[string]bool - - mu sync.Mutex -} - -func NewNetwork() *Network { - return &Network{ - blockTimeMeter: metrics.NewMeter(), - txThroughputMeter: metrics.NewMeter(), - blockLatencyMeter: metrics.NewMeter(), - Health: FullHealth, - UptimeData: &UptimeData{ - StartTime: time.Now(), - Uptime: 100.0, - }, - nodeStatusMap: make(map[string]bool), - } -} - -func (n *Network) NewBlock(b *tmtypes.Block) { - n.mu.Lock() - defer n.mu.Unlock() - - if n.Height >= b.Height { - return - } - - n.Height = b.Height - - n.blockTimeMeter.Mark(1) - if n.blockTimeMeter.Rate1() > 0.0 { - n.AvgBlockTime = (1.0 / n.blockTimeMeter.Rate1()) * 1000 // 1/s to ms - } else { - n.AvgBlockTime = 0.0 - } - n.txThroughputMeter.Mark(int64(len(b.Data.Txs))) - n.AvgTxThroughput = n.txThroughputMeter.Rate1() -} - -func (n *Network) NewBlockLatency(l float64) { - n.mu.Lock() - defer n.mu.Unlock() - - n.blockLatencyMeter.Mark(int64(l)) - n.AvgBlockLatency = n.blockLatencyMeter.Rate1() / 1000000.0 // ns to ms -} - -// RecalculateUptime calculates uptime on demand. -func (n *Network) RecalculateUptime() { - n.mu.Lock() - defer n.mu.Unlock() - - since := time.Since(n.UptimeData.StartTime) - uptime := since - n.UptimeData.totalDownTime - if n.Health != FullHealth { - uptime -= time.Since(n.UptimeData.wentDown) - } - n.UptimeData.Uptime = (float64(uptime) / float64(since)) * 100.0 -} - -// NodeIsDown is called when the node disconnects for whatever reason. -// Must be safe to call multiple times. -func (n *Network) NodeIsDown(name string) { - n.mu.Lock() - defer n.mu.Unlock() - - if online, ok := n.nodeStatusMap[name]; !ok || online { - n.nodeStatusMap[name] = false - n.NumNodesMonitoredOnline-- - n.UptimeData.wentDown = time.Now() - n.updateHealth() - } -} - -// NodeIsOnline is called when connection to the node is restored. -// Must be safe to call multiple times. -func (n *Network) NodeIsOnline(name string) { - n.mu.Lock() - defer n.mu.Unlock() - - if online, ok := n.nodeStatusMap[name]; ok && !online { - n.nodeStatusMap[name] = true - n.NumNodesMonitoredOnline++ - n.UptimeData.totalDownTime += time.Since(n.UptimeData.wentDown) - n.updateHealth() - } -} - -// NewNode is called when the new node is added to the monitor. -func (n *Network) NewNode(name string) { - n.mu.Lock() - defer n.mu.Unlock() - - n.NumNodesMonitored++ - n.NumNodesMonitoredOnline++ - n.updateHealth() -} - -// NodeDeleted is called when the node is deleted from under the monitor. -func (n *Network) NodeDeleted(name string) { - n.mu.Lock() - defer n.mu.Unlock() - - n.NumNodesMonitored-- - n.NumNodesMonitoredOnline-- - n.updateHealth() -} - -func (n *Network) updateHealth() { - // if we are connected to all validators, we're at full health - // TODO: make sure they're all at the same height (within a block) - // and all proposing (and possibly validating ) Alternatively, just - // check there hasn't been a new round in numValidators rounds - switch { - case n.NumValidators != 0 && n.NumNodesMonitoredOnline == n.NumValidators: - n.Health = FullHealth - case n.NumNodesMonitoredOnline > 0 && n.NumNodesMonitoredOnline <= n.NumNodesMonitored: - n.Health = ModerateHealth - default: - n.Health = Dead - } -} - -func (n *Network) UpdateNumValidatorsForHeight(num int, height int64) { - n.mu.Lock() - defer n.mu.Unlock() - - if n.Height <= height { - n.NumValidators = num - } - n.updateHealth() -} - -func (n *Network) GetHealthString() string { - switch n.Health { - case FullHealth: - return "full" - case ModerateHealth: - return "moderate" - case Dead: - return "dead" - default: - return "undefined" - } -} - -// Uptime returns network's uptime in percentages. -func (n *Network) Uptime() float64 { - n.mu.Lock() - defer n.mu.Unlock() - return n.UptimeData.Uptime -} - -// StartTime returns time we started monitoring. -func (n *Network) StartTime() time.Time { - return n.UptimeData.StartTime -} diff --git a/tools/tm-monitor/monitor/network_test.go b/tools/tm-monitor/monitor/network_test.go deleted file mode 100644 index afaf12de9..000000000 --- a/tools/tm-monitor/monitor/network_test.go +++ /dev/null @@ -1,82 +0,0 @@ -package monitor_test - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - - monitor "github.com/tendermint/tendermint/tools/tm-monitor/monitor" - tmtypes "github.com/tendermint/tendermint/types" -) - -func TestNetworkNewBlock(t *testing.T) { - n := monitor.NewNetwork() - - n.NewBlock(&tmtypes.Block{ - Header: tmtypes.Header{Height: 5}, - }) - assert.Equal(t, int64(5), n.Height) - assert.Equal(t, 0.0, n.AvgBlockTime) - assert.Equal(t, 0.0, n.AvgTxThroughput) -} - -func TestNetworkNewBlockLatency(t *testing.T) { - n := monitor.NewNetwork() - - n.NewBlockLatency(9000000.0) // nanoseconds - assert.Equal(t, 0.0, n.AvgBlockLatency) -} - -func TestNetworkNodeIsDownThenOnline(t *testing.T) { - n := monitor.NewNetwork() - n.NewNode("test") - - n.NodeIsDown("test") - assert.Equal(t, 0, n.NumNodesMonitoredOnline) - assert.Equal(t, monitor.Dead, n.Health) - n.NodeIsDown("test") - assert.Equal(t, 0, n.NumNodesMonitoredOnline) - - n.NodeIsOnline("test") - assert.Equal(t, 1, n.NumNodesMonitoredOnline) - assert.Equal(t, monitor.ModerateHealth, n.Health) - n.NodeIsOnline("test") - assert.Equal(t, 1, n.NumNodesMonitoredOnline) -} - -func TestNetworkNewNode(t *testing.T) { - n := monitor.NewNetwork() - assert.Equal(t, 0, n.NumNodesMonitored) - assert.Equal(t, 0, n.NumNodesMonitoredOnline) - n.NewNode("test") - assert.Equal(t, 1, n.NumNodesMonitored) - assert.Equal(t, 1, n.NumNodesMonitoredOnline) -} - -func TestNetworkNodeDeleted(t *testing.T) { - n := monitor.NewNetwork() - n.NewNode("test") - n.NodeDeleted("test") - assert.Equal(t, 0, n.NumNodesMonitored) - assert.Equal(t, 0, n.NumNodesMonitoredOnline) -} - -func TestNetworkGetHealthString(t *testing.T) { - n := monitor.NewNetwork() - assert.Equal(t, "full", n.GetHealthString()) - n.Health = monitor.ModerateHealth - assert.Equal(t, "moderate", n.GetHealthString()) - n.Health = monitor.Dead - assert.Equal(t, "dead", n.GetHealthString()) -} - -func TestNetworkUptime(t *testing.T) { - n := monitor.NewNetwork() - assert.Equal(t, 100.0, n.Uptime()) -} - -func TestNetworkStartTime(t *testing.T) { - n := monitor.NewNetwork() - assert.True(t, n.StartTime().Before(time.Now())) -} diff --git a/tools/tm-monitor/monitor/node.go b/tools/tm-monitor/monitor/node.go deleted file mode 100644 index 7fd4ffe7f..000000000 --- a/tools/tm-monitor/monitor/node.go +++ /dev/null @@ -1,265 +0,0 @@ -package monitor - -import ( - "encoding/json" - "math" - "time" - - "github.com/pkg/errors" - - "github.com/tendermint/tendermint/crypto" - "github.com/tendermint/tendermint/libs/events" - "github.com/tendermint/tendermint/libs/log" - ctypes "github.com/tendermint/tendermint/rpc/core/types" - rpc_client "github.com/tendermint/tendermint/rpc/lib/client" - em "github.com/tendermint/tendermint/tools/tm-monitor/eventmeter" - tmtypes "github.com/tendermint/tendermint/types" -) - -const maxRestarts = 25 - -type Node struct { - IsValidator bool `json:"is_validator"` // validator or non-validator? - Online bool `json:"online"` - Height int64 `json:"height"` - rpcAddr string - Name string `json:"name"` - - pubKey crypto.PubKey - - BlockLatency float64 `json:"block_latency" amino:"unsafe"` // ms, interval between block commits - - // em holds the ws connection. Each eventMeter callback is called in a separate go-routine. - em eventMeter - - // rpcClient is an client for making RPC calls to TM - rpcClient rpc_client.HTTPClient - - blockCh chan<- *tmtypes.Block - blockLatencyCh chan<- float64 - disconnectCh chan<- bool - - checkIsValidatorInterval time.Duration - - quit chan struct{} - - logger log.Logger -} - -func NewNode(rpcAddr string, options ...func(*Node)) *Node { - em := em.NewEventMeter(rpcAddr, UnmarshalEvent) - rpcClient := rpc_client.NewURIClient(rpcAddr) // HTTP client by default - rpcClient.SetCodec(cdc) - return NewNodeWithEventMeterAndRPCClient(rpcAddr, em, rpcClient, options...) -} - -func NewNodeWithEventMeterAndRPCClient( - rpcAddr string, - em eventMeter, - rpcClient rpc_client.HTTPClient, - options ...func(*Node), -) *Node { - n := &Node{ - rpcAddr: rpcAddr, - em: em, - rpcClient: rpcClient, - Name: rpcAddr, - quit: make(chan struct{}), - checkIsValidatorInterval: 5 * time.Second, - logger: log.NewNopLogger(), - } - - for _, option := range options { - option(n) - } - - return n -} - -// SetCheckIsValidatorInterval lets you change interval for checking whenever -// node is still a validator or not. -func SetCheckIsValidatorInterval(d time.Duration) func(n *Node) { - return func(n *Node) { - n.checkIsValidatorInterval = d - } -} - -func (n *Node) SendBlocksTo(ch chan<- *tmtypes.Block) { - n.blockCh = ch -} - -func (n *Node) SendBlockLatenciesTo(ch chan<- float64) { - n.blockLatencyCh = ch -} - -func (n *Node) NotifyAboutDisconnects(ch chan<- bool) { - n.disconnectCh = ch -} - -// SetLogger lets you set your own logger -func (n *Node) SetLogger(l log.Logger) { - n.logger = l - n.em.SetLogger(l) -} - -func (n *Node) Start() error { - if err := n.em.Start(); err != nil { - return err - } - - n.em.RegisterLatencyCallback(latencyCallback(n)) - err := n.em.Subscribe(tmtypes.EventQueryNewBlock.String(), newBlockCallback(n)) - if err != nil { - return err - } - n.em.RegisterDisconnectCallback(disconnectCallback(n)) - - n.Online = true - - n.checkIsValidator() - go n.checkIsValidatorLoop() - - return nil -} - -func (n *Node) Stop() { - n.Online = false - - n.em.Stop() - - close(n.quit) -} - -// implements eventmeter.EventCallbackFunc -func newBlockCallback(n *Node) em.EventCallbackFunc { - return func(metric *em.EventMetric, data interface{}) { - block := data.(tmtypes.TMEventData).(tmtypes.EventDataNewBlock).Block - - n.Height = block.Height - n.logger.Info("new block", "height", block.Height) - - if n.blockCh != nil { - n.blockCh <- block - } - } -} - -// implements eventmeter.EventLatencyFunc -func latencyCallback(n *Node) em.LatencyCallbackFunc { - return func(latency float64) { - n.BlockLatency = latency / 1000000.0 // ns to ms - n.logger.Info("new block latency", "latency", n.BlockLatency) - - if n.blockLatencyCh != nil { - n.blockLatencyCh <- latency - } - } -} - -// implements eventmeter.DisconnectCallbackFunc -func disconnectCallback(n *Node) em.DisconnectCallbackFunc { - return func() { - n.Online = false - n.logger.Info("status", "down") - - if n.disconnectCh != nil { - n.disconnectCh <- true - } - } -} - -func (n *Node) RestartEventMeterBackoff() error { - attempt := 0 - - for { - d := time.Duration(math.Exp2(float64(attempt))) - time.Sleep(d * time.Second) - - if err := n.em.Start(); err != nil { - n.logger.Info("restart failed", "err", err) - } else { - // TODO: authenticate pubkey - return nil - } - - attempt++ - - if attempt > maxRestarts { - return errors.New("reached max restarts") - } - } -} - -func (n *Node) NumValidators() (height int64, num int, err error) { - height, vals, err := n.validators() - if err != nil { - return 0, 0, err - } - return height, len(vals), nil -} - -func (n *Node) validators() (height int64, validators []*tmtypes.Validator, err error) { - vals := new(ctypes.ResultValidators) - if _, err = n.rpcClient.Call("validators", nil, vals); err != nil { - return 0, make([]*tmtypes.Validator, 0), err - } - return vals.BlockHeight, vals.Validators, nil -} - -func (n *Node) checkIsValidatorLoop() { - for { - select { - case <-n.quit: - return - case <-time.After(n.checkIsValidatorInterval): - n.checkIsValidator() - } - } -} - -func (n *Node) checkIsValidator() { - _, validators, err := n.validators() - if err == nil { - for _, v := range validators { - key, err1 := n.getPubKey() - if err1 == nil && v.PubKey.Equals(key) { - n.IsValidator = true - } - } - } else { - n.logger.Info("check is validator failed", "err", err) - } -} - -func (n *Node) getPubKey() (crypto.PubKey, error) { - if n.pubKey != nil { - return n.pubKey, nil - } - - status := new(ctypes.ResultStatus) - _, err := n.rpcClient.Call("status", nil, status) - if err != nil { - return nil, err - } - n.pubKey = status.ValidatorInfo.PubKey - return n.pubKey, nil -} - -type eventMeter interface { - Start() error - Stop() - RegisterLatencyCallback(em.LatencyCallbackFunc) - RegisterDisconnectCallback(em.DisconnectCallbackFunc) - Subscribe(string, em.EventCallbackFunc) error - Unsubscribe(string) error - SetLogger(l log.Logger) -} - -// UnmarshalEvent unmarshals a json event -func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) { - event := new(ctypes.ResultEvent) - if err := cdc.UnmarshalJSON(b, event); err != nil { - return "", nil, err - } - return event.Query, event.Data, nil -} diff --git a/tools/tm-monitor/monitor/node_test.go b/tools/tm-monitor/monitor/node_test.go deleted file mode 100644 index 91e4da87a..000000000 --- a/tools/tm-monitor/monitor/node_test.go +++ /dev/null @@ -1,96 +0,0 @@ -package monitor_test - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - amino "github.com/tendermint/go-amino" - "github.com/tendermint/tendermint/crypto/ed25519" - ctypes "github.com/tendermint/tendermint/rpc/core/types" - em "github.com/tendermint/tendermint/tools/tm-monitor/eventmeter" - mock "github.com/tendermint/tendermint/tools/tm-monitor/mock" - monitor "github.com/tendermint/tendermint/tools/tm-monitor/monitor" - tmtypes "github.com/tendermint/tendermint/types" -) - -const ( - blockHeight = int64(1) -) - -func TestNodeStartStop(t *testing.T) { - n, _ := startValidatorNode(t) - defer n.Stop() - - assert.Equal(t, true, n.Online) - assert.Equal(t, true, n.IsValidator) -} - -func TestNodeNewBlockReceived(t *testing.T) { - blockCh := make(chan *tmtypes.Block, 100) - n, emMock := startValidatorNode(t) - defer n.Stop() - n.SendBlocksTo(blockCh) - - block := &tmtypes.Block{Header: tmtypes.Header{Height: 5}} - emMock.Call("eventCallback", &em.EventMetric{}, tmtypes.EventDataNewBlock{Block: block}) - - assert.Equal(t, int64(5), n.Height) - assert.Equal(t, block, <-blockCh) -} - -func TestNodeNewBlockLatencyReceived(t *testing.T) { - blockLatencyCh := make(chan float64, 100) - n, emMock := startValidatorNode(t) - defer n.Stop() - n.SendBlockLatenciesTo(blockLatencyCh) - - emMock.Call("latencyCallback", 1000000.0) - - assert.Equal(t, 1.0, n.BlockLatency) - assert.Equal(t, 1000000.0, <-blockLatencyCh) -} - -func TestNodeConnectionLost(t *testing.T) { - disconnectCh := make(chan bool, 100) - n, emMock := startValidatorNode(t) - defer n.Stop() - n.NotifyAboutDisconnects(disconnectCh) - - emMock.Call("disconnectCallback") - - assert.Equal(t, true, <-disconnectCh) - assert.Equal(t, false, n.Online) -} - -func TestNumValidators(t *testing.T) { - n, _ := startValidatorNode(t) - defer n.Stop() - - height, num, err := n.NumValidators() - assert.Nil(t, err) - assert.Equal(t, blockHeight, height) - assert.Equal(t, 1, num) -} - -func startValidatorNode(t *testing.T) (n *monitor.Node, emMock *mock.EventMeter) { - emMock = &mock.EventMeter{} - - stubs := make(map[string]interface{}) - pubKey := ed25519.GenPrivKey().PubKey() - stubs["validators"] = ctypes.ResultValidators{ - BlockHeight: blockHeight, - Validators: []*tmtypes.Validator{tmtypes.NewValidator(pubKey, 0)}, - } - stubs["status"] = ctypes.ResultStatus{ValidatorInfo: ctypes.ValidatorInfo{PubKey: pubKey}} - cdc := amino.NewCodec() - rpcClientMock := &mock.RPCClient{Stubs: stubs} - rpcClientMock.SetCodec(cdc) - - n = monitor.NewNodeWithEventMeterAndRPCClient("tcp://127.0.0.1:26657", emMock, rpcClientMock) - - err := n.Start() - require.Nil(t, err) - return -} diff --git a/tools/tm-monitor/rpc.go b/tools/tm-monitor/rpc.go deleted file mode 100644 index 42cc23075..000000000 --- a/tools/tm-monitor/rpc.go +++ /dev/null @@ -1,131 +0,0 @@ -package main - -import ( - "errors" - "net" - "net/http" - - rpctypes "github.com/tendermint/tendermint/rpc/lib/types" - - "github.com/tendermint/tendermint/libs/log" - rpc "github.com/tendermint/tendermint/rpc/lib/server" - "github.com/tendermint/tendermint/tools/tm-monitor/monitor" -) - -func startRPC(listenAddr string, m *monitor.Monitor, logger log.Logger) net.Listener { - routes := routes(m) - - mux := http.NewServeMux() - wm := rpc.NewWebsocketManager(routes, nil) - mux.HandleFunc("/websocket", wm.WebsocketHandler) - rpc.RegisterRPCFuncs(mux, routes, cdc, logger) - config := rpc.DefaultConfig() - listener, err := rpc.Listen(listenAddr, config) - if err != nil { - panic(err) - } - go rpc.StartHTTPServer(listener, mux, logger, config) - return listener -} - -func routes(m *monitor.Monitor) map[string]*rpc.RPCFunc { - return map[string]*rpc.RPCFunc{ - "status": rpc.NewRPCFunc(RPCStatus(m), ""), - "status/network": rpc.NewRPCFunc(RPCNetworkStatus(m), ""), - "status/node": rpc.NewRPCFunc(RPCNodeStatus(m), "name"), - "monitor": rpc.NewRPCFunc(RPCMonitor(m), "endpoint"), - "unmonitor": rpc.NewRPCFunc(RPCUnmonitor(m), "endpoint"), - - // "start_meter": rpc.NewRPCFunc(network.StartMeter, "chainID,valID,event"), - // "stop_meter": rpc.NewRPCFunc(network.StopMeter, "chainID,valID,event"), - // "meter": rpc.NewRPCFunc(GetMeterResult(network), "chainID,valID,event"), - } -} - -// RPCStatus returns common statistics for the network and statistics per node. -func RPCStatus(m *monitor.Monitor) func(*rpctypes.Context) (networkAndNodes, error) { - return func(_ *rpctypes.Context) (networkAndNodes, error) { - return networkAndNodes{m.Network, m.Nodes}, nil - } -} - -// RPCNetworkStatus returns common statistics for the network. -func RPCNetworkStatus(m *monitor.Monitor) func(*rpctypes.Context) (*monitor.Network, error) { - return func(_ *rpctypes.Context) (*monitor.Network, error) { - return m.Network, nil - } -} - -// RPCNodeStatus returns statistics for the given node. -func RPCNodeStatus(m *monitor.Monitor) func(*rpctypes.Context, string) (*monitor.Node, error) { - return func(_ *rpctypes.Context, name string) (*monitor.Node, error) { - if i, n := m.NodeByName(name); i != -1 { - return n, nil - } - return nil, errors.New("cannot find node with that name") - } -} - -// RPCMonitor allows to dynamically add a endpoint to under the monitor. Safe -// to call multiple times. -func RPCMonitor(m *monitor.Monitor) func(*rpctypes.Context, string) (*monitor.Node, error) { - return func(_ *rpctypes.Context, endpoint string) (*monitor.Node, error) { - i, n := m.NodeByName(endpoint) - if i == -1 { - n = monitor.NewNode(endpoint) - if err := m.Monitor(n); err != nil { - return nil, err - } - } - return n, nil - } -} - -// RPCUnmonitor removes the given endpoint from under the monitor. -func RPCUnmonitor(m *monitor.Monitor) func(*rpctypes.Context, string) (bool, error) { - return func(_ *rpctypes.Context, endpoint string) (bool, error) { - if i, n := m.NodeByName(endpoint); i != -1 { - m.Unmonitor(n) - return true, nil - } - return false, errors.New("cannot find node with that name") - } -} - -// func (tn *TendermintNetwork) StartMeter(chainID, valID, eventID string) error { -// tn.mtx.Lock() -// defer tn.mtx.Unlock() -// val, err := tn.getChainVal(chainID, valID) -// if err != nil { -// return err -// } -// return val.EventMeter().Subscribe(eventID, nil) -// } - -// func (tn *TendermintNetwork) StopMeter(chainID, valID, eventID string) error { -// tn.mtx.Lock() -// defer tn.mtx.Unlock() -// val, err := tn.getChainVal(chainID, valID) -// if err != nil { -// return err -// } -// return val.EventMeter().Unsubscribe(eventID) -// } - -// func (tn *TendermintNetwork) GetMeter(chainID, valID, eventID string) (*eventmeter.EventMetric, error) { -// tn.mtx.Lock() -// defer tn.mtx.Unlock() -// val, err := tn.getChainVal(chainID, valID) -// if err != nil { -// return nil, err -// } - -// return val.EventMeter().GetMetric(eventID) -// } - -//--> types - -type networkAndNodes struct { - Network *monitor.Network `json:"network"` - Nodes []*monitor.Node `json:"nodes"` -} diff --git a/tools/tm-monitor/ton.go b/tools/tm-monitor/ton.go deleted file mode 100644 index 18353836e..000000000 --- a/tools/tm-monitor/ton.go +++ /dev/null @@ -1,105 +0,0 @@ -package main - -import ( - "fmt" - "io" - "os" - "text/tabwriter" - "time" - - monitor "github.com/tendermint/tendermint/tools/tm-monitor/monitor" -) - -const ( - // Default refresh rate - 200ms - defaultRefreshRate = time.Millisecond * 200 -) - -// Ton - table of nodes. -// -// It produces the unordered list of nodes and updates it periodically. -// -// Default output is stdout, but it could be changed. Note if you want for -// refresh to work properly, output must support [ANSI escape -// codes](http://en.wikipedia.org/wiki/ANSI_escape_code). -// -// Ton was inspired by [Linux top -// program](https://en.wikipedia.org/wiki/Top_(software)) as the name suggests. -type Ton struct { - monitor *monitor.Monitor - - RefreshRate time.Duration - Output io.Writer - quit chan struct{} -} - -func NewTon(m *monitor.Monitor) *Ton { - return &Ton{ - RefreshRate: defaultRefreshRate, - Output: os.Stdout, - quit: make(chan struct{}), - monitor: m, - } -} - -func (o *Ton) Start() { - clearScreen(o.Output) - o.Print() - go o.refresher() -} - -func (o *Ton) Print() { - moveCursor(o.Output, 1, 1) - o.printHeader() - fmt.Println() - o.printTable() -} - -func (o *Ton) Stop() { - close(o.quit) -} - -func (o *Ton) printHeader() { - n := o.monitor.Network - fmt.Fprintf(o.Output, "%v up %.2f%%\n", n.StartTime().Format(time.RFC1123Z), n.Uptime()) - fmt.Println() - fmt.Fprintf(o.Output, "Height: %d\n", n.Height) - fmt.Fprintf(o.Output, "Avg block time: %.3f ms\n", n.AvgBlockTime) - fmt.Fprintf(o.Output, "Avg tx throughput: %.3f per sec\n", n.AvgTxThroughput) - fmt.Fprintf(o.Output, "Avg block latency: %.3f ms\n", n.AvgBlockLatency) - fmt.Fprintf(o.Output, - "Active nodes: %d/%d (health: %s) Validators: %d\n", - n.NumNodesMonitoredOnline, - n.NumNodesMonitored, - n.GetHealthString(), - n.NumValidators) -} - -func (o *Ton) printTable() { - w := tabwriter.NewWriter(o.Output, 0, 0, 5, ' ', 0) - fmt.Fprintln(w, "NAME\tHEIGHT\tBLOCK LATENCY\tONLINE\tVALIDATOR\t") - for _, n := range o.monitor.Nodes { - fmt.Fprintln(w, fmt.Sprintf("%s\t%d\t%.3f ms\t%v\t%v\t", n.Name, n.Height, n.BlockLatency, n.Online, n.IsValidator)) - } - w.Flush() -} - -// Internal loop for refreshing -func (o *Ton) refresher() { - for { - select { - case <-o.quit: - return - case <-time.After(o.RefreshRate): - o.Print() - } - } -} - -func clearScreen(w io.Writer) { - fmt.Fprint(w, "\033[2J") -} - -func moveCursor(w io.Writer, x int, y int) { - fmt.Fprintf(w, "\033[%d;%dH", x, y) -}