diff --git a/tm-monitor/eventmeter/eventmeter.go b/tm-monitor/eventmeter/eventmeter.go index 5626ba134..314e039d3 100644 --- a/tm-monitor/eventmeter/eventmeter.go +++ b/tm-monitor/eventmeter/eventmeter.go @@ -6,16 +6,14 @@ import ( "sync" "time" + "github.com/go-kit/kit/log" "github.com/gorilla/websocket" + "github.com/pkg/errors" metrics "github.com/rcrowley/go-metrics" events "github.com/tendermint/go-events" client "github.com/tendermint/go-rpc/client" - log15 "github.com/tendermint/log15" ) -// Log allows you to set your own logger. -var Log log15.Logger - //------------------------------------------------------ // Generic system to subscribe to events and record their frequency //------------------------------------------------------ @@ -96,6 +94,8 @@ type EventMeter struct { unmarshalEvent EventUnmarshalFunc quit chan struct{} + + logger log.Logger } func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter { @@ -105,16 +105,25 @@ func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter { timer: metrics.NewTimer(), receivedPong: true, unmarshalEvent: unmarshalEvent, - quit: make(chan struct{}), + logger: log.NewNopLogger(), } return em } +// SetLogger lets you set your own logger +func (em *EventMeter) SetLogger(l log.Logger) { + em.logger = l +} + func (em *EventMeter) String() string { return em.wsc.Address } func (em *EventMeter) Start() error { + if _, err := em.wsc.Reset(); err != nil { + return err + } + if _, err := em.wsc.Start(); err != nil { return err } @@ -130,19 +139,28 @@ func (em *EventMeter) Start() error { } return nil }) + + em.quit = make(chan struct{}) go em.receiveRoutine() - return nil + + return em.resubscribe() } +// Stop stops the EventMeter. func (em *EventMeter) Stop() { - <-em.quit + close(em.quit) - em.RegisterDisconnectCallback(nil) // so we don't try and reconnect - em.wsc.Stop() // close(wsc.Quit) + if em.wsc.IsRunning() { + em.wsc.Stop() + } } -func (em *EventMeter) StopAndReconnect() { - em.wsc.Stop() +// StopAndCallDisconnectCallback stops the EventMeter and calls +// disconnectCallback if present. +func (em *EventMeter) StopAndCallDisconnectCallback() { + if em.wsc.IsRunning() { + em.wsc.Stop() + } em.mtx.Lock() defer em.mtx.Unlock() @@ -216,38 +234,50 @@ func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) { //------------------------------------------------------ +func (em *EventMeter) resubscribe() error { + for eventID, _ := range em.events { + if err := em.wsc.Subscribe(eventID); err != nil { + return err + } + } + return nil +} + func (em *EventMeter) receiveRoutine() { pingTime := time.Second * 1 pingTicker := time.NewTicker(pingTime) pingAttempts := 0 // if this hits maxPingsPerPong we kill the conn + var err error for { select { case <-pingTicker.C: if pingAttempts, err = em.pingForLatency(pingAttempts); err != nil { - Log.Error("Failed to write ping message on websocket", err) - em.StopAndReconnect() + em.logger.Log("err", errors.Wrap(err, "failed to write ping message on websocket")) + em.StopAndCallDisconnectCallback() return } else if pingAttempts >= maxPingsPerPong { - Log.Error(fmt.Sprintf("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime)) - em.StopAndReconnect() + em.logger.Log("err", errors.Errorf("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime)) + em.StopAndCallDisconnectCallback() return } case r := <-em.wsc.ResultsCh: if r == nil { - em.StopAndReconnect() + em.logger.Log("err", errors.New("Expected some event, received nil")) + em.StopAndCallDisconnectCallback() return } eventID, data, err := em.unmarshalEvent(r) if err != nil { - Log.Error(err.Error()) + em.logger.Log("err", errors.Wrap(err, "failed to unmarshal event")) continue } if eventID != "" { em.updateMetric(eventID, data) } case <-em.wsc.Quit: - em.StopAndReconnect() + em.logger.Log("err", errors.New("WSClient closed unexpectedly")) + em.StopAndCallDisconnectCallback() return case <-em.quit: return diff --git a/tm-monitor/glide.yaml b/tm-monitor/glide.yaml index 14bc82fbc..f9bd303d1 100644 --- a/tm-monitor/glide.yaml +++ b/tm-monitor/glide.yaml @@ -15,7 +15,6 @@ import: - package: github.com/tendermint/go-rpc subpackages: - client -- package: github.com/tendermint/log15 - package: github.com/go-kit/kit subpackages: - log diff --git a/tm-monitor/mock/mock.go b/tm-monitor/mock/mock.go index e765dbe19..b614e5953 100644 --- a/tm-monitor/mock/mock.go +++ b/tm-monitor/mock/mock.go @@ -4,6 +4,7 @@ import ( "log" "reflect" + gokitlog "github.com/go-kit/kit/log" ctypes "github.com/tendermint/tendermint/rpc/core/types" em "github.com/tendermint/tools/tm-monitor/eventmeter" ) @@ -16,6 +17,7 @@ type EventMeter struct { func (e *EventMeter) Start() error { return nil } func (e *EventMeter) Stop() {} +func (e *EventMeter) SetLogger(l gokitlog.Logger) {} func (e *EventMeter) RegisterLatencyCallback(cb em.LatencyCallbackFunc) { e.latencyCallback = cb } func (e *EventMeter) RegisterDisconnectCallback(cb em.DisconnectCallbackFunc) { e.disconnectCallback = cb diff --git a/tm-monitor/monitor/node.go b/tm-monitor/monitor/node.go index 8f3071d12..b8f873dcc 100644 --- a/tm-monitor/monitor/node.go +++ b/tm-monitor/monitor/node.go @@ -98,6 +98,7 @@ func (n *Node) NotifyAboutDisconnects(ch chan<- bool) { // SetLogger lets you set your own logger func (n *Node) SetLogger(l log.Logger) { n.logger = l + n.em.SetLogger(l) } func (n *Node) Start() error { @@ -120,12 +121,7 @@ func (n *Node) Start() error { func (n *Node) Stop() { n.Online = false - n.em.RegisterLatencyCallback(nil) - n.em.Unsubscribe(tmtypes.EventStringNewBlockHeader()) - n.em.RegisterDisconnectCallback(nil) - - // FIXME stop blocks at event_meter.go:140 - // n.em.Stop() + n.em.Stop() close(n.quit) } @@ -166,7 +162,7 @@ func disconnectCallback(n *Node) em.DisconnectCallbackFunc { n.disconnectCh <- true } - if err := n.RestartBackOff(); err != nil { + if err := n.RestartEventMeterBackoff(); err != nil { n.logger.Log("err", errors.Wrap(err, "restart failed")) } else { n.Online = true @@ -179,14 +175,14 @@ func disconnectCallback(n *Node) em.DisconnectCallbackFunc { } } -func (n *Node) RestartBackOff() error { +func (n *Node) RestartEventMeterBackoff() error { attempt := 0 for { d := time.Duration(math.Exp2(float64(attempt))) time.Sleep(d * time.Second) - if err := n.Start(); err != nil { + if err := n.em.Start(); err != nil { n.logger.Log("err", errors.Wrap(err, "restart failed")) } else { // TODO: authenticate pubkey @@ -265,6 +261,7 @@ type eventMeter interface { RegisterDisconnectCallback(em.DisconnectCallbackFunc) Subscribe(string, em.EventCallbackFunc) error Unsubscribe(string) error + SetLogger(l log.Logger) } // UnmarshalEvent unmarshals a json event