diff --git a/tm-monitor/eventmeter/eventmeter.go b/tm-monitor/eventmeter/eventmeter.go index 92e50ad8f..494273af4 100644 --- a/tm-monitor/eventmeter/eventmeter.go +++ b/tm-monitor/eventmeter/eventmeter.go @@ -65,7 +65,7 @@ func (metric *EventMetric) fillMetric() *EventMetric { // event. type EventCallbackFunc func(em *EventMetric, data interface{}) -// EventUnmarshalFunc is a closure to get the eventType and data out of the raw +// EventUnmarshalFunc is a closure to get the query and data out of the raw // JSON received over the RPC WebSocket. type EventUnmarshalFunc func(b json.RawMessage) (string, events.EventData, error) @@ -81,7 +81,7 @@ type EventMeter struct { wsc *client.WSClient mtx sync.Mutex - events map[string]*EventMetric + queryToMetricMap map[string]*EventMetric unmarshalEvent EventUnmarshalFunc latencyCallback LatencyCallbackFunc @@ -96,7 +96,7 @@ type EventMeter struct { func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter { return &EventMeter{ wsc: client.NewWSClient(addr, "/websocket", client.PingPeriod(1*time.Second)), - events: make(map[string]*EventMetric), + queryToMetricMap: make(map[string]*EventMetric), unmarshalEvent: unmarshalEvent, logger: log.NewNopLogger(), } @@ -115,7 +115,7 @@ func (em *EventMeter) String() string { // Start boots up event meter. func (em *EventMeter) Start() error { - if _, err := em.wsc.Start(); err != nil { + if err := em.wsc.Start(); err != nil { return err } @@ -140,16 +140,13 @@ func (em *EventMeter) Stop() { } } -// Subscribe for the given event type. Callback function will be called upon +// Subscribe for the given query. Callback function will be called upon // receiving an event. -func (em *EventMeter) Subscribe(eventType string, cb EventCallbackFunc) error { +func (em *EventMeter) Subscribe(query string, cb EventCallbackFunc) error { em.mtx.Lock() defer em.mtx.Unlock() - if _, ok := em.events[eventType]; ok { - return fmt.Errorf("subscribtion already exists") - } - if err := em.wsc.Subscribe(context.TODO(), eventType); err != nil { + if err := em.wsc.Subscribe(context.TODO(), query); err != nil { return err } @@ -157,29 +154,28 @@ func (em *EventMeter) Subscribe(eventType string, cb EventCallbackFunc) error { meter: metrics.NewMeter(), callback: cb, } - em.events[eventType] = metric + em.queryToMetricMap[query] = metric return nil } -// Unsubscribe from the given event type. -func (em *EventMeter) Unsubscribe(eventType string) error { +// Unsubscribe from the given query. +func (em *EventMeter) Unsubscribe(query string) error { em.mtx.Lock() defer em.mtx.Unlock() - if err := em.wsc.Unsubscribe(context.TODO(), eventType); err != nil { + if err := em.wsc.Unsubscribe(context.TODO(), query); err != nil { return err } - // XXX: should we persist or save this info first? - delete(em.events, eventType) + return nil } -// GetMetric fills in the latest data for an event and return a copy. -func (em *EventMeter) GetMetric(eventType string) (*EventMetric, error) { +// GetMetric fills in the latest data for an query and return a copy. +func (em *EventMeter) GetMetric(query string) (*EventMetric, error) { em.mtx.Lock() defer em.mtx.Unlock() - metric, ok := em.events[eventType] + metric, ok := em.queryToMetricMap[query] if !ok { - return nil, fmt.Errorf("unknown event: %s", eventType) + return nil, fmt.Errorf("unknown query: %s", query) } return metric.fillMetric().Copy(), nil } @@ -202,8 +198,8 @@ func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) { // Private func (em *EventMeter) subscribe() error { - for eventType, _ := range em.events { - if err := em.wsc.Subscribe(context.TODO(), eventType); err != nil { + for query, _ := range em.queryToMetricMap { + if err := em.wsc.Subscribe(context.TODO(), query); err != nil { return err } } @@ -219,19 +215,19 @@ func (em *EventMeter) receiveRoutine() { em.logger.Error("expected some event, got error", "err", resp.Error.Error()) continue } - eventType, data, err := em.unmarshalEvent(*resp.Result) + query, data, err := em.unmarshalEvent(resp.Result) if err != nil { em.logger.Error("failed to unmarshal event", "err", err) continue } - if eventType != "" { // FIXME how can it be an empty string? - em.updateMetric(eventType, data) + if query != "" { // FIXME how can it be an empty string? + em.updateMetric(query, data) } case <-latencyTicker.C: if em.wsc.IsActive() { em.callLatencyCallback(em.wsc.PingPongLatencyTimer.Mean()) } - case <-em.wsc.Quit: + case <-em.wsc.Quit(): return case <-em.quit: return @@ -251,7 +247,7 @@ func (em *EventMeter) disconnectRoutine() { em.subscribe() em.subscribed = true } - case <-em.wsc.Quit: + case <-em.wsc.Quit(): return case <-em.quit: return @@ -259,13 +255,13 @@ func (em *EventMeter) disconnectRoutine() { } } -func (em *EventMeter) updateMetric(eventType string, data events.EventData) { +func (em *EventMeter) updateMetric(query string, data events.EventData) { em.mtx.Lock() defer em.mtx.Unlock() - metric, ok := em.events[eventType] + metric, ok := em.queryToMetricMap[query] if !ok { - // we already unsubscribed, or got an unexpected event + // we already unsubscribed, or got an unexpected query return } diff --git a/tm-monitor/glide.lock b/tm-monitor/glide.lock index 98b9d1b63..6fcb26101 100644 --- a/tm-monitor/glide.lock +++ b/tm-monitor/glide.lock @@ -1,50 +1,75 @@ -hash: 156fcaac82d95af15aa920438cd12ab6ba1ac0ea5dfe8a5ca7eae94eeae625be -updated: 2017-12-06T18:01:20.739645218Z +hash: defdaf7b594915e7916df4f96d43363c2976ffcb2c35f0dd50b23ec1dd2551fb +updated: 2018-03-27T16:08:55.112172+08:00 imports: - name: github.com/btcsuite/btcd - version: 583684b21bfbde9b5fc4403916fd7c807feb0289 + version: 2e60448ffcc6bf78332d1fe590260095f554dd78 subpackages: - btcec +- name: github.com/ebuchman/fail-test + version: 95f809107225be108efcf10a3509e4ea6ceef3c4 - name: github.com/go-kit/kit - version: b6f30a2e0632f5722fb26d8765d726335b79d3e6 + version: ca4112baa34cb55091301bdc13b1420a122b1b9e subpackages: - log - log/level - log/term - name: github.com/go-logfmt/logfmt version: 390ab7935ee28ec6b286364bba9b4dd6410cb3d5 -- name: github.com/go-playground/locales - version: e4cbcb5d0652150d40ad0646651076b6bd2be4f6 - subpackages: - - currency -- name: github.com/go-playground/universal-translator - version: 71201497bace774495daed26a3874fd339e0b538 - name: github.com/go-stack/stack - version: 100eb0c0a9c5b306ca2fb4f165df21d80ada4b82 + version: 259ab82a6cad3992b4e21ff5cac294ccb06474bc +- name: github.com/gogo/protobuf + version: 1adfc126b41513cc696b209667c8656ea7aac67c + subpackages: + - gogoproto + - jsonpb + - proto + - protoc-gen-gogo/descriptor + - sortkeys + - types - name: github.com/golang/protobuf - version: 69b215d01a5606c843240eab4937eab3acee6530 + version: 925541529c1fa6821df4e44ce2723319eb2be768 subpackages: - proto - ptypes - ptypes/any - ptypes/duration - ptypes/timestamp +- name: github.com/golang/snappy + version: 553a641470496b2327abcac10b36396bd98e45c9 - name: github.com/gorilla/websocket version: ea4d1f681babbce9545c9c5f3d5194a789c89f5b +- name: github.com/jmhodges/levigo + version: c42d9e0ca023e2198120196f842701bb4c55d7b9 - name: github.com/kr/logfmt version: b84e30acd515aadc4b783ad4ff83aff3299bdfe0 - name: github.com/pkg/errors version: 645ef00459ed84a119197bfb8d8205042c6df63d - name: github.com/rcrowley/go-metrics - version: 1f30fe9094a513ce4c700b9a54458bbb0c96996c + version: e181e095bae94582363434144c61a9653aff6e50 +- name: github.com/syndtr/goleveldb + version: adf24ef3f94bd13ec4163060b21a5678f22b429b + subpackages: + - leveldb + - leveldb/cache + - leveldb/comparer + - leveldb/errors + - leveldb/filter + - leveldb/iterator + - leveldb/journal + - leveldb/memdb + - leveldb/opt + - leveldb/storage + - leveldb/table + - leveldb/util - name: github.com/tendermint/abci - version: 76ef8a0697c6179220a74c479b36c27a5b53008a + version: 68592f4d8ee34e97db94b7a7976b1309efdb7eb9 subpackages: - client + - example/code - example/dummy - types - name: github.com/tendermint/ed25519 - version: 1f52c6f8b8a5c7908aff4497c186af344b428925 + version: d8387025d2b9d158cf4efb07e7ebf814bcce2057 subpackages: - edwards25519 - extra25519 @@ -55,27 +80,33 @@ imports: subpackages: - data - name: github.com/tendermint/tendermint - version: c7f923c5b0d0f0f26566281aa251259d1bef3a6c + version: c8a2bdf78ba7aaaf4284fa78c1b9b05c5e7342bc subpackages: - config - consensus/types - p2p + - p2p/conn - p2p/upnp + - proxy - rpc/core/types - rpc/lib/client - rpc/lib/server - rpc/lib/types + - state - types - name: github.com/tendermint/tmlibs - version: b854baa1fce7101c90b1d301b3359bb412f981c0 + version: 1b9b5652a199ab0be2e781393fb275b66377309d subpackages: - common + - db - events - flowrate - log - merkle + - pubsub + - pubsub/query - name: golang.org/x/crypto - version: 453249f01cfeb54c3d549ddb75ff152ca243f9d8 + version: 94eea52f7b742c7cbe0b03b22f0c4c8631ece122 subpackages: - curve25519 - nacl/box @@ -86,7 +117,7 @@ imports: - ripemd160 - salsa20/salsa - name: golang.org/x/net - version: 906cda9512f77671ab44f8c8563b13a8e707b230 + version: 5ccada7d0a7ba9aeb5d3aca8d3501b4c2a509fec subpackages: - context - http2 @@ -95,12 +126,23 @@ imports: - internal/timeseries - lex/httplex - trace +- name: golang.org/x/text + version: 57961680700a5336d15015c8c50686ca5ba362a4 + subpackages: + - secure/bidirule + - transform + - unicode/bidi + - unicode/norm - name: google.golang.org/genproto - version: 7f0da29060c682909f650ad8ed4e515bd74fa12a + version: a8101f21cf983e773d0c1133ebc5424792003214 + repo: https://github.com/google/go-genproto + vcs: git subpackages: - googleapis/rpc/status - name: google.golang.org/grpc - version: f7bf885db0b7479a537ec317c6e48ce53145f3db + version: 401e0e00e4bb830a10496d64cd95e068c5bf50de + repo: https://github.com/grpc/grpc-go + vcs: git subpackages: - balancer - codes @@ -118,8 +160,6 @@ imports: - status - tap - transport -- name: gopkg.in/go-playground/validator.v9 - version: 61caf9d3038e1af346dbf5c2e16f6678e1548364 testImports: - name: github.com/davecgh/go-spew version: 04cdfd42973bb9c8589fd6a731800cf222fde1a9 @@ -130,7 +170,7 @@ testImports: subpackages: - difflib - name: github.com/stretchr/testify - version: 4d4bfba8f1d1027c4fdbe371823030df51419987 + version: 2aa2c176b9dab406a6970f6a55f513e8a8c8b18f subpackages: - assert - require diff --git a/tm-monitor/glide.yaml b/tm-monitor/glide.yaml index c24ccad69..0e9a37441 100644 --- a/tm-monitor/glide.yaml +++ b/tm-monitor/glide.yaml @@ -5,18 +5,19 @@ import: - package: github.com/rcrowley/go-metrics - package: github.com/tendermint/go-crypto - package: github.com/tendermint/tendermint - version: v0.12.1 - subpackages: - - rpc/core/types - - rpc/lib/client - - rpc/lib/server - - types + version: v0.16.0 - package: github.com/tendermint/tmlibs - version: v0.4.1 + version: v0.7.0 subpackages: - common - events - log +- package: google.golang.org/grpc + repo: https://github.com/grpc/grpc-go + vcs: git +- package: google.golang.org/genproto + repo: https://github.com/google/go-genproto + vcs: git testImport: - package: github.com/stretchr/testify subpackages: diff --git a/tm-monitor/mock/eventmeter.go b/tm-monitor/mock/eventmeter.go index 949afe992..bab88e167 100644 --- a/tm-monitor/mock/eventmeter.go +++ b/tm-monitor/mock/eventmeter.go @@ -21,11 +21,11 @@ func (e *EventMeter) RegisterLatencyCallback(cb em.LatencyCallbackFunc) { e.late func (e *EventMeter) RegisterDisconnectCallback(cb em.DisconnectCallbackFunc) { e.disconnectCallback = cb } -func (e *EventMeter) Subscribe(eventID string, cb em.EventCallbackFunc) error { +func (e *EventMeter) Subscribe(query string, cb em.EventCallbackFunc) error { e.eventCallback = cb return nil } -func (e *EventMeter) Unsubscribe(eventID string) error { +func (e *EventMeter) Unsubscribe(query string) error { e.eventCallback = nil return nil } diff --git a/tm-monitor/monitor/node.go b/tm-monitor/monitor/node.go index 53f76f332..4b8feaaaf 100644 --- a/tm-monitor/monitor/node.go +++ b/tm-monitor/monitor/node.go @@ -101,7 +101,7 @@ func (n *Node) Start() error { } n.em.RegisterLatencyCallback(latencyCallback(n)) - err := n.em.Subscribe(tmtypes.EventStringNewBlockHeader(), newBlockCallback(n)) + err := n.em.Subscribe(tmtypes.EventQueryNewBlockHeader.String(), newBlockCallback(n)) if err != nil { return err } @@ -254,5 +254,5 @@ func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) { if err := json.Unmarshal(b, event); err != nil { return "", nil, err } - return event.Name, event.Data, nil + return event.Query, event.Data, nil }