Browse Source

Merge pull request #13 from tendermint/bugfix/6-tm-monitor-does-not-reconnect-to-a-node

tm monitor does not reconnect to a node
pull/1943/head
Anton Kaliaev 8 years ago
committed by GitHub
parent
commit
d07e164796
4 changed files with 56 additions and 28 deletions
  1. +48
    -18
      tm-monitor/eventmeter/eventmeter.go
  2. +0
    -1
      tm-monitor/glide.yaml
  3. +2
    -0
      tm-monitor/mock/mock.go
  4. +6
    -9
      tm-monitor/monitor/node.go

+ 48
- 18
tm-monitor/eventmeter/eventmeter.go View File

@ -6,16 +6,14 @@ import (
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
metrics "github.com/rcrowley/go-metrics"
events "github.com/tendermint/go-events"
client "github.com/tendermint/go-rpc/client"
log15 "github.com/tendermint/log15"
)
// Log allows you to set your own logger.
var Log log15.Logger
//------------------------------------------------------
// Generic system to subscribe to events and record their frequency
//------------------------------------------------------
@ -96,6 +94,8 @@ type EventMeter struct {
unmarshalEvent EventUnmarshalFunc
quit chan struct{}
logger log.Logger
}
func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter {
@ -105,16 +105,25 @@ func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter {
timer: metrics.NewTimer(),
receivedPong: true,
unmarshalEvent: unmarshalEvent,
quit: make(chan struct{}),
logger: log.NewNopLogger(),
}
return em
}
// SetLogger lets you set your own logger
func (em *EventMeter) SetLogger(l log.Logger) {
em.logger = l
}
func (em *EventMeter) String() string {
return em.wsc.Address
}
func (em *EventMeter) Start() error {
if _, err := em.wsc.Reset(); err != nil {
return err
}
if _, err := em.wsc.Start(); err != nil {
return err
}
@ -130,19 +139,28 @@ func (em *EventMeter) Start() error {
}
return nil
})
em.quit = make(chan struct{})
go em.receiveRoutine()
return nil
return em.resubscribe()
}
// Stop stops the EventMeter.
func (em *EventMeter) Stop() {
<-em.quit
close(em.quit)
em.RegisterDisconnectCallback(nil) // so we don't try and reconnect
em.wsc.Stop() // close(wsc.Quit)
if em.wsc.IsRunning() {
em.wsc.Stop()
}
}
func (em *EventMeter) StopAndReconnect() {
em.wsc.Stop()
// StopAndCallDisconnectCallback stops the EventMeter and calls
// disconnectCallback if present.
func (em *EventMeter) StopAndCallDisconnectCallback() {
if em.wsc.IsRunning() {
em.wsc.Stop()
}
em.mtx.Lock()
defer em.mtx.Unlock()
@ -216,38 +234,50 @@ func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) {
//------------------------------------------------------
func (em *EventMeter) resubscribe() error {
for eventID, _ := range em.events {
if err := em.wsc.Subscribe(eventID); err != nil {
return err
}
}
return nil
}
func (em *EventMeter) receiveRoutine() {
pingTime := time.Second * 1
pingTicker := time.NewTicker(pingTime)
pingAttempts := 0 // if this hits maxPingsPerPong we kill the conn
var err error
for {
select {
case <-pingTicker.C:
if pingAttempts, err = em.pingForLatency(pingAttempts); err != nil {
Log.Error("Failed to write ping message on websocket", err)
em.StopAndReconnect()
em.logger.Log("err", errors.Wrap(err, "failed to write ping message on websocket"))
em.StopAndCallDisconnectCallback()
return
} else if pingAttempts >= maxPingsPerPong {
Log.Error(fmt.Sprintf("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime))
em.StopAndReconnect()
em.logger.Log("err", errors.Errorf("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime))
em.StopAndCallDisconnectCallback()
return
}
case r := <-em.wsc.ResultsCh:
if r == nil {
em.StopAndReconnect()
em.logger.Log("err", errors.New("Expected some event, received nil"))
em.StopAndCallDisconnectCallback()
return
}
eventID, data, err := em.unmarshalEvent(r)
if err != nil {
Log.Error(err.Error())
em.logger.Log("err", errors.Wrap(err, "failed to unmarshal event"))
continue
}
if eventID != "" {
em.updateMetric(eventID, data)
}
case <-em.wsc.Quit:
em.StopAndReconnect()
em.logger.Log("err", errors.New("WSClient closed unexpectedly"))
em.StopAndCallDisconnectCallback()
return
case <-em.quit:
return


+ 0
- 1
tm-monitor/glide.yaml View File

@ -15,7 +15,6 @@ import:
- package: github.com/tendermint/go-rpc
subpackages:
- client
- package: github.com/tendermint/log15
- package: github.com/go-kit/kit
subpackages:
- log


+ 2
- 0
tm-monitor/mock/mock.go View File

@ -4,6 +4,7 @@ import (
"log"
"reflect"
gokitlog "github.com/go-kit/kit/log"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
em "github.com/tendermint/tools/tm-monitor/eventmeter"
)
@ -16,6 +17,7 @@ type EventMeter struct {
func (e *EventMeter) Start() error { return nil }
func (e *EventMeter) Stop() {}
func (e *EventMeter) SetLogger(l gokitlog.Logger) {}
func (e *EventMeter) RegisterLatencyCallback(cb em.LatencyCallbackFunc) { e.latencyCallback = cb }
func (e *EventMeter) RegisterDisconnectCallback(cb em.DisconnectCallbackFunc) {
e.disconnectCallback = cb


+ 6
- 9
tm-monitor/monitor/node.go View File

@ -98,6 +98,7 @@ func (n *Node) NotifyAboutDisconnects(ch chan<- bool) {
// SetLogger lets you set your own logger
func (n *Node) SetLogger(l log.Logger) {
n.logger = l
n.em.SetLogger(l)
}
func (n *Node) Start() error {
@ -120,12 +121,7 @@ func (n *Node) Start() error {
func (n *Node) Stop() {
n.Online = false
n.em.RegisterLatencyCallback(nil)
n.em.Unsubscribe(tmtypes.EventStringNewBlockHeader())
n.em.RegisterDisconnectCallback(nil)
// FIXME stop blocks at event_meter.go:140
// n.em.Stop()
n.em.Stop()
close(n.quit)
}
@ -166,7 +162,7 @@ func disconnectCallback(n *Node) em.DisconnectCallbackFunc {
n.disconnectCh <- true
}
if err := n.RestartBackOff(); err != nil {
if err := n.RestartEventMeterBackoff(); err != nil {
n.logger.Log("err", errors.Wrap(err, "restart failed"))
} else {
n.Online = true
@ -179,14 +175,14 @@ func disconnectCallback(n *Node) em.DisconnectCallbackFunc {
}
}
func (n *Node) RestartBackOff() error {
func (n *Node) RestartEventMeterBackoff() error {
attempt := 0
for {
d := time.Duration(math.Exp2(float64(attempt)))
time.Sleep(d * time.Second)
if err := n.Start(); err != nil {
if err := n.em.Start(); err != nil {
n.logger.Log("err", errors.Wrap(err, "restart failed"))
} else {
// TODO: authenticate pubkey
@ -265,6 +261,7 @@ type eventMeter interface {
RegisterDisconnectCallback(em.DisconnectCallbackFunc)
Subscribe(string, em.EventCallbackFunc) error
Unsubscribe(string) error
SetLogger(l log.Logger)
}
// UnmarshalEvent unmarshals a json event


Loading…
Cancel
Save