diff --git a/tm-monitor/eventmeter/eventmeter.go b/tm-monitor/eventmeter/eventmeter.go index d32882fb4..82cd7a186 100644 --- a/tm-monitor/eventmeter/eventmeter.go +++ b/tm-monitor/eventmeter/eventmeter.go @@ -136,20 +136,28 @@ func (em *EventMeter) Start() error { } return nil }) + + em.quit = make(chan struct{}) go em.receiveRoutine() - return nil + + return em.resubscribe() } // Stop stops the EventMeter. func (em *EventMeter) Stop() { close(em.quit) - em.RegisterDisconnectCallback(nil) // so we don't try and reconnect - em.wsc.Stop() // close(wsc.Quit) + if em.wsc.IsRunning() { + em.wsc.Stop() + } } -func (em *EventMeter) StopAndReconnect() { - em.wsc.Stop() +// StopAndCallDisconnectCallback stops the EventMeter and calls +// disconnectCallback if present. +func (em *EventMeter) StopAndCallDisconnectCallback() { + if em.wsc.IsRunning() { + em.wsc.Stop() + } em.mtx.Lock() defer em.mtx.Unlock() @@ -223,38 +231,50 @@ func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) { //------------------------------------------------------ +func (em *EventMeter) resubscribe() error { + for eventID, _ := range em.events { + if err := em.wsc.Subscribe(eventID); err != nil { + return err + } + } + return nil +} + func (em *EventMeter) receiveRoutine() { pingTime := time.Second * 1 pingTicker := time.NewTicker(pingTime) pingAttempts := 0 // if this hits maxPingsPerPong we kill the conn + var err error for { select { case <-pingTicker.C: if pingAttempts, err = em.pingForLatency(pingAttempts); err != nil { - em.logger.Log("err", errors.Wrap(err, "Failed to write ping message on websocket")) - em.StopAndReconnect() + em.logger.Log("err", errors.Wrap(err, "failed to write ping message on websocket")) + em.StopAndCallDisconnectCallback() return } else if pingAttempts >= maxPingsPerPong { em.logger.Log("err", errors.Errorf("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime)) - em.StopAndReconnect() + em.StopAndCallDisconnectCallback() return } case r := <-em.wsc.ResultsCh: if r == nil { - em.StopAndReconnect() + em.logger.Log("err", errors.New("Expected some event, received nil")) + em.StopAndCallDisconnectCallback() return } eventID, data, err := em.unmarshalEvent(r) if err != nil { - em.logger.Log("err", err) + em.logger.Log("err", errors.Wrap(err, "failed to unmarshal event")) continue } if eventID != "" { em.updateMetric(eventID, data) } case <-em.wsc.Quit: - em.StopAndReconnect() + em.logger.Log("err", errors.New("WSClient closed unexpectedly")) + em.StopAndCallDisconnectCallback() return case <-em.quit: return diff --git a/tm-monitor/monitor/node.go b/tm-monitor/monitor/node.go index 85ffc1492..b8f873dcc 100644 --- a/tm-monitor/monitor/node.go +++ b/tm-monitor/monitor/node.go @@ -162,7 +162,7 @@ func disconnectCallback(n *Node) em.DisconnectCallbackFunc { n.disconnectCh <- true } - if err := n.RestartBackOff(); err != nil { + if err := n.RestartEventMeterBackoff(); err != nil { n.logger.Log("err", errors.Wrap(err, "restart failed")) } else { n.Online = true @@ -175,7 +175,7 @@ func disconnectCallback(n *Node) em.DisconnectCallbackFunc { } } -func (n *Node) RestartBackOff() error { +func (n *Node) RestartEventMeterBackoff() error { attempt := 0 for {