diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index a15112d48..3d0c7a99f 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -95,7 +95,7 @@ }, { "ImportPath": "github.com/tendermint/go-event-meter", - "Rev": "48308fb1a262b55db2fce92eb5cd55c5316217c1" + "Rev": "b94cb932ca63a80e711b8b8f4dc16baad0c54b98" }, { "ImportPath": "github.com/tendermint/go-events", diff --git a/setup.sh b/setup.sh index 0cdab14e5..1d40505b7 100644 --- a/setup.sh +++ b/setup.sh @@ -4,9 +4,9 @@ set -e # assumes machines already created N_MACHINES=4 -MACH_PREFIX=mach +MACH_PREFIX=netmon -TESTNET_DIR=~/testnets_mach +TESTNET_DIR=~/testnets_netmon CHAINS_AND_VALS=$TESTNET_DIR/chains_and_vals.json CHAINS_DIR=$TESTNET_DIR/chains VALS_DIR=$TESTNET_DIR/validators diff --git a/types/val.go b/types/val.go index 4e45c31c6..c6441b321 100644 --- a/types/val.go +++ b/types/val.go @@ -66,7 +66,7 @@ func (vs *ValidatorState) Start() error { vs.Config.mtx.Unlock() em := eventmeter.NewEventMeter(fmt.Sprintf("ws://%s/websocket", rpcAddr), ctypes.UnmarshalEvent) - if err := em.Start(); err != nil { + if _, err := em.Start(); err != nil { return err } vs.em = em diff --git a/vendor/github.com/tendermint/go-event-meter/event_meter.go b/vendor/github.com/tendermint/go-event-meter/event_meter.go index 32fefc0cf..47a67e469 100644 --- a/vendor/github.com/tendermint/go-event-meter/event_meter.go +++ b/vendor/github.com/tendermint/go-event-meter/event_meter.go @@ -78,6 +78,7 @@ type DisconnectCallbackFunc func() // Each node gets an event meter to track events for that node type EventMeter struct { + QuitService wsc *client.WSClient mtx sync.Mutex @@ -91,6 +92,8 @@ type EventMeter struct { disconnectCallback DisconnectCallbackFunc unmarshalEvent EventUnmarshalFunc + + done chan struct{} // clean shutdown } func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter { @@ -100,11 +103,19 @@ func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter { timer: metrics.NewTimer(), receivedPong: true, unmarshalEvent: unmarshalEvent, + done: make(chan struct{}), } + em.QuitService = *NewQuitService(log, "EventMeter", em) return em } -func (em *EventMeter) Start() error { +func (em *EventMeter) String() string { + return em.wsc.Address +} + +func (em *EventMeter) OnStart() error { + em.QuitService.OnStart() + if _, err := em.wsc.Start(); err != nil { return err } @@ -124,10 +135,21 @@ func (em *EventMeter) Start() error { return nil } -func (em *EventMeter) Stop() { +func (em *EventMeter) OnStop() { + em.QuitService.OnStop() + <-em.done + + em.RegisterDisconnectCallback(nil) // so we don't try and reconnect + em.wsc.Stop() // close(wsc.Quit) +} + +func (em *EventMeter) StopAndReconnect() { em.wsc.Stop() + + em.mtx.Lock() + defer em.mtx.Unlock() if em.disconnectCallback != nil { - em.disconnectCallback() + go em.disconnectCallback() } } @@ -206,17 +228,16 @@ func (em *EventMeter) receiveRoutine() { case <-pingTicker.C: if pingAttempts, err = em.pingForLatency(pingAttempts); err != nil { log.Error("Failed to write ping message on websocket", err) - em.Stop() + em.StopAndReconnect() return } else if pingAttempts >= maxPingsPerPong { log.Error(Fmt("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime)) - em.Stop() + em.StopAndReconnect() return } case r := <-em.wsc.ResultsCh: if r == nil { - // we might receive the closed ResultsCh before the Quit - em.Stop() // call stop to trigger the disconnect callback + em.StopAndReconnect() return } eventID, data, err := em.unmarshalEvent(r) @@ -228,10 +249,12 @@ func (em *EventMeter) receiveRoutine() { em.updateMetric(eventID, data) } case <-em.wsc.Quit: - em.Stop() // call stop to trigger the disconnect callback + em.StopAndReconnect() + return + case <-em.Quit: + close(em.done) return } - } }