diff --git a/tm-monitor/eventmeter/eventmeter.go b/tm-monitor/eventmeter/eventmeter.go new file mode 100644 index 000000000..5626ba134 --- /dev/null +++ b/tm-monitor/eventmeter/eventmeter.go @@ -0,0 +1,300 @@ +package eventmeter + +import ( + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/gorilla/websocket" + metrics "github.com/rcrowley/go-metrics" + events "github.com/tendermint/go-events" + client "github.com/tendermint/go-rpc/client" + log15 "github.com/tendermint/log15" +) + +// Log allows you to set your own logger. +var Log log15.Logger + +//------------------------------------------------------ +// Generic system to subscribe to events and record their frequency +//------------------------------------------------------ + +//------------------------------------------------------ +// Meter for a particular event + +// Closure to enable side effects from receiving an event +type EventCallbackFunc func(em *EventMetric, data events.EventData) + +// Metrics for a given event +type EventMetric struct { + ID string `json:"id"` + Started time.Time `json:"start_time"` + LastHeard time.Time `json:"last_heard"` + MinDuration int64 `json:"min_duration"` + MaxDuration int64 `json:"max_duration"` + + // tracks event count and rate + meter metrics.Meter + + // filled in from the Meter + Count int64 `json:"count"` + Rate1 float64 `json:"rate_1" wire:"unsafe"` + Rate5 float64 `json:"rate_5" wire:"unsafe"` + Rate15 float64 `json:"rate_15" wire:"unsafe"` + RateMean float64 `json:"rate_mean" wire:"unsafe"` + + // so the event can have effects in the event-meter's consumer. + // runs in a go routine + callback EventCallbackFunc +} + +func (metric *EventMetric) Copy() *EventMetric { + metric2 := *metric + metric2.meter = metric.meter.Snapshot() + return &metric2 +} + +// called on GetMetric +func (metric *EventMetric) fillMetric() *EventMetric { + metric.Count = metric.meter.Count() + metric.Rate1 = metric.meter.Rate1() + metric.Rate5 = metric.meter.Rate5() + metric.Rate15 = metric.meter.Rate15() + metric.RateMean = metric.meter.RateMean() + return metric +} + +//------------------------------------------------------ +// Websocket client and event meter for many events + +const maxPingsPerPong = 30 // if we haven't received a pong in this many attempted pings we kill the conn + +// Get the eventID and data out of the raw json received over the go-rpc websocket +type EventUnmarshalFunc func(b json.RawMessage) (string, events.EventData, error) + +// Closure to enable side effects from receiving a pong +type LatencyCallbackFunc func(meanLatencyNanoSeconds float64) + +// Closure to notify consumer that the connection died +type DisconnectCallbackFunc func() + +// Each node gets an event meter to track events for that node +type EventMeter struct { + wsc *client.WSClient + + mtx sync.Mutex + events map[string]*EventMetric + + // to record ws latency + timer metrics.Timer + lastPing time.Time + receivedPong bool + latencyCallback LatencyCallbackFunc + disconnectCallback DisconnectCallbackFunc + + unmarshalEvent EventUnmarshalFunc + + quit chan struct{} +} + +func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter { + em := &EventMeter{ + wsc: client.NewWSClient(addr, "/websocket"), + events: make(map[string]*EventMetric), + timer: metrics.NewTimer(), + receivedPong: true, + unmarshalEvent: unmarshalEvent, + quit: make(chan struct{}), + } + return em +} + +func (em *EventMeter) String() string { + return em.wsc.Address +} + +func (em *EventMeter) Start() error { + if _, err := em.wsc.Start(); err != nil { + return err + } + + em.wsc.Conn.SetPongHandler(func(m string) error { + // NOTE: https://github.com/gorilla/websocket/issues/97 + em.mtx.Lock() + defer em.mtx.Unlock() + em.receivedPong = true + em.timer.UpdateSince(em.lastPing) + if em.latencyCallback != nil { + go em.latencyCallback(em.timer.Mean()) + } + return nil + }) + go em.receiveRoutine() + return nil +} + +func (em *EventMeter) Stop() { + <-em.quit + + em.RegisterDisconnectCallback(nil) // so we don't try and reconnect + em.wsc.Stop() // close(wsc.Quit) +} + +func (em *EventMeter) StopAndReconnect() { + em.wsc.Stop() + + em.mtx.Lock() + defer em.mtx.Unlock() + if em.disconnectCallback != nil { + go em.disconnectCallback() + } +} + +func (em *EventMeter) Subscribe(eventID string, cb EventCallbackFunc) error { + em.mtx.Lock() + defer em.mtx.Unlock() + + if _, ok := em.events[eventID]; ok { + return fmt.Errorf("subscribtion already exists") + } + if err := em.wsc.Subscribe(eventID); err != nil { + return err + } + + metric := &EventMetric{ + ID: eventID, + Started: time.Now(), + MinDuration: 1 << 62, + meter: metrics.NewMeter(), + callback: cb, + } + em.events[eventID] = metric + return nil +} + +func (em *EventMeter) Unsubscribe(eventID string) error { + em.mtx.Lock() + defer em.mtx.Unlock() + if err := em.wsc.Unsubscribe(eventID); err != nil { + return err + } + // XXX: should we persist or save this info first? + delete(em.events, eventID) + return nil +} + +// Fill in the latest data for an event and return a copy +func (em *EventMeter) GetMetric(eventID string) (*EventMetric, error) { + em.mtx.Lock() + defer em.mtx.Unlock() + metric, ok := em.events[eventID] + if !ok { + return nil, fmt.Errorf("Unknown event %s", eventID) + } + return metric.fillMetric().Copy(), nil +} + +// Return the average latency over the websocket +func (em *EventMeter) Latency() float64 { + em.mtx.Lock() + defer em.mtx.Unlock() + return em.timer.Mean() +} + +func (em *EventMeter) RegisterLatencyCallback(f LatencyCallbackFunc) { + em.mtx.Lock() + defer em.mtx.Unlock() + em.latencyCallback = f +} + +func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) { + em.mtx.Lock() + defer em.mtx.Unlock() + em.disconnectCallback = f +} + +//------------------------------------------------------ + +func (em *EventMeter) receiveRoutine() { + pingTime := time.Second * 1 + pingTicker := time.NewTicker(pingTime) + pingAttempts := 0 // if this hits maxPingsPerPong we kill the conn + var err error + for { + select { + case <-pingTicker.C: + if pingAttempts, err = em.pingForLatency(pingAttempts); err != nil { + Log.Error("Failed to write ping message on websocket", err) + em.StopAndReconnect() + return + } else if pingAttempts >= maxPingsPerPong { + Log.Error(fmt.Sprintf("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime)) + em.StopAndReconnect() + return + } + case r := <-em.wsc.ResultsCh: + if r == nil { + em.StopAndReconnect() + return + } + eventID, data, err := em.unmarshalEvent(r) + if err != nil { + Log.Error(err.Error()) + continue + } + if eventID != "" { + em.updateMetric(eventID, data) + } + case <-em.wsc.Quit: + em.StopAndReconnect() + return + case <-em.quit: + return + } + } +} + +func (em *EventMeter) pingForLatency(pingAttempts int) (int, error) { + em.mtx.Lock() + defer em.mtx.Unlock() + + // ping to record latency + if !em.receivedPong { + return pingAttempts + 1, nil + } + + em.lastPing = time.Now() + em.receivedPong = false + err := em.wsc.Conn.WriteMessage(websocket.PingMessage, []byte{}) + if err != nil { + return pingAttempts, err + } + return 0, nil +} + +func (em *EventMeter) updateMetric(eventID string, data events.EventData) { + em.mtx.Lock() + defer em.mtx.Unlock() + + metric, ok := em.events[eventID] + if !ok { + // we already unsubscribed, or got an unexpected event + return + } + + last := metric.LastHeard + metric.LastHeard = time.Now() + metric.meter.Mark(1) + dur := int64(metric.LastHeard.Sub(last)) + if dur < metric.MinDuration { + metric.MinDuration = dur + } + if !last.IsZero() && dur > metric.MaxDuration { + metric.MaxDuration = dur + } + + if metric.callback != nil { + go metric.callback(metric.Copy(), data) + } +} diff --git a/tm-monitor/glide.lock b/tm-monitor/glide.lock index 3797f60dd..040bc1264 100644 --- a/tm-monitor/glide.lock +++ b/tm-monitor/glide.lock @@ -1,5 +1,5 @@ -hash: 9b2cdc3f80ccebf7154884c8466b1f2c0cb3e8b7f583baa732d49d671e83fe33 -updated: 2017-03-02T08:52:42.063901822Z +hash: 3315dcf12e2554e2927f2a0907f2547cd86d5c8926461d055662a7bee88caa4a +updated: 2017-03-07T08:38:45.613512657Z imports: - name: github.com/btcsuite/btcd version: 583684b21bfbde9b5fc4403916fd7c807feb0289 @@ -16,7 +16,7 @@ imports: - name: github.com/golang/snappy version: 553a641470496b2327abcac10b36396bd98e45c9 - name: github.com/gorilla/websocket - version: 4873052237e4eeda85cf50c071ef33836fe8e139 + version: 3ab3a8b8831546bd18fd182c20687ca853b2bb13 - name: github.com/jmhodges/levigo version: c42d9e0ca023e2198120196f842701bb4c55d7b9 - name: github.com/mattn/go-colorable @@ -62,8 +62,6 @@ imports: version: 4b11d62bdb324027ea01554e5767b71174680ba0 - name: github.com/tendermint/go-db version: 72f6dacd22a686cdf7fcd60286503e3aceda77ba -- name: github.com/tendermint/go-event-meter - version: c9240a51209b7afbfc9270faac841e3cb033a4d9 - name: github.com/tendermint/go-events version: fddee66d90305fccb6f6d84d16c34fa65ea5b7f6 - name: github.com/tendermint/go-flowrate diff --git a/tm-monitor/glide.yaml b/tm-monitor/glide.yaml index 4a7892230..e881af990 100644 --- a/tm-monitor/glide.yaml +++ b/tm-monitor/glide.yaml @@ -1,7 +1,6 @@ package: github.com/tendermint/tools/tm-monitor import: - package: github.com/tendermint/go-common -- package: github.com/tendermint/go-event-meter - package: github.com/tendermint/go-events - package: github.com/tendermint/go-logger - package: github.com/tendermint/tendermint @@ -12,3 +11,8 @@ import: - package: github.com/rcrowley/go-metrics - package: github.com/stretchr/testify - package: github.com/tendermint/go-crypto +- package: github.com/gorilla/websocket +- package: github.com/tendermint/go-rpc + subpackages: + - client +- package: github.com/tendermint/log15 diff --git a/tm-monitor/main.go b/tm-monitor/main.go index 51de0fa9a..a4bfae091 100644 --- a/tm-monitor/main.go +++ b/tm-monitor/main.go @@ -9,6 +9,7 @@ import ( cmn "github.com/tendermint/go-common" logger "github.com/tendermint/go-logger" log15 "github.com/tendermint/log15" + em "github.com/tendermint/tools/tm-monitor/eventmeter" ) var version = "0.2.0" @@ -98,6 +99,7 @@ func logToStdout(verbose bool) { logger.BypassHandler(), )) } + em.Log = log } func logToFile(filename string, verbose bool) { @@ -112,4 +114,5 @@ func logToFile(filename string, verbose bool) { log15.Must.FileHandler(filename, log15.LogfmtFormat()), )) } + em.Log = log } diff --git a/tm-monitor/mock/mock.go b/tm-monitor/mock/mock.go index 25b007e0a..e765dbe19 100644 --- a/tm-monitor/mock/mock.go +++ b/tm-monitor/mock/mock.go @@ -4,8 +4,8 @@ import ( "log" "reflect" - em "github.com/tendermint/go-event-meter" ctypes "github.com/tendermint/tendermint/rpc/core/types" + em "github.com/tendermint/tools/tm-monitor/eventmeter" ) type EventMeter struct { @@ -14,8 +14,8 @@ type EventMeter struct { eventCallback em.EventCallbackFunc } -func (e *EventMeter) Start() (bool, error) { return true, nil } -func (e *EventMeter) Stop() bool { return true } +func (e *EventMeter) Start() error { return nil } +func (e *EventMeter) Stop() {} func (e *EventMeter) RegisterLatencyCallback(cb em.LatencyCallbackFunc) { e.latencyCallback = cb } func (e *EventMeter) RegisterDisconnectCallback(cb em.DisconnectCallbackFunc) { e.disconnectCallback = cb diff --git a/tm-monitor/node.go b/tm-monitor/node.go index 3e668772e..15a119dea 100644 --- a/tm-monitor/node.go +++ b/tm-monitor/node.go @@ -6,14 +6,13 @@ import ( "math" "time" - em "github.com/tendermint/go-event-meter" + crypto "github.com/tendermint/go-crypto" events "github.com/tendermint/go-events" rpc_client "github.com/tendermint/go-rpc/client" - tmtypes "github.com/tendermint/tendermint/types" - - crypto "github.com/tendermint/go-crypto" wire "github.com/tendermint/go-wire" ctypes "github.com/tendermint/tendermint/rpc/core/types" + tmtypes "github.com/tendermint/tendermint/types" + em "github.com/tendermint/tools/tm-monitor/eventmeter" ) // remove when https://github.com/tendermint/go-rpc/issues/8 will be fixed @@ -93,7 +92,7 @@ func (n *Node) NotifyAboutDisconnects(ch chan<- bool) { } func (n *Node) Start() error { - if _, err := n.em.Start(); err != nil { + if err := n.em.Start(); err != nil { return err } @@ -244,8 +243,8 @@ func (n *Node) getPubKey() (crypto.PubKey, error) { } type eventMeter interface { - Start() (bool, error) - Stop() bool + Start() error + Stop() RegisterLatencyCallback(em.LatencyCallbackFunc) RegisterDisconnectCallback(em.DisconnectCallbackFunc) Subscribe(string, em.EventCallbackFunc) error diff --git a/tm-monitor/node_test.go b/tm-monitor/node_test.go index 7ebbd04d1..15afe2717 100644 --- a/tm-monitor/node_test.go +++ b/tm-monitor/node_test.go @@ -7,10 +7,10 @@ import ( "github.com/stretchr/testify/require" crypto "github.com/tendermint/go-crypto" - em "github.com/tendermint/go-event-meter" ctypes "github.com/tendermint/tendermint/rpc/core/types" tmtypes "github.com/tendermint/tendermint/types" monitor "github.com/tendermint/tools/tm-monitor" + em "github.com/tendermint/tools/tm-monitor/eventmeter" mock "github.com/tendermint/tools/tm-monitor/mock" )