Browse Source

update event meter

pull/1943/head
Ethan Buchman 9 years ago
parent
commit
75d31daacd
4 changed files with 36 additions and 13 deletions
  1. +1
    -1
      Godeps/Godeps.json
  2. +2
    -2
      setup.sh
  3. +1
    -1
      types/val.go
  4. +32
    -9
      vendor/github.com/tendermint/go-event-meter/event_meter.go

+ 1
- 1
Godeps/Godeps.json View File

@ -95,7 +95,7 @@
}, },
{ {
"ImportPath": "github.com/tendermint/go-event-meter", "ImportPath": "github.com/tendermint/go-event-meter",
"Rev": "48308fb1a262b55db2fce92eb5cd55c5316217c1"
"Rev": "b94cb932ca63a80e711b8b8f4dc16baad0c54b98"
}, },
{ {
"ImportPath": "github.com/tendermint/go-events", "ImportPath": "github.com/tendermint/go-events",


+ 2
- 2
setup.sh View File

@ -4,9 +4,9 @@ set -e
# assumes machines already created # assumes machines already created
N_MACHINES=4 N_MACHINES=4
MACH_PREFIX=mach
MACH_PREFIX=netmon
TESTNET_DIR=~/testnets_mach
TESTNET_DIR=~/testnets_netmon
CHAINS_AND_VALS=$TESTNET_DIR/chains_and_vals.json CHAINS_AND_VALS=$TESTNET_DIR/chains_and_vals.json
CHAINS_DIR=$TESTNET_DIR/chains CHAINS_DIR=$TESTNET_DIR/chains
VALS_DIR=$TESTNET_DIR/validators VALS_DIR=$TESTNET_DIR/validators


+ 1
- 1
types/val.go View File

@ -66,7 +66,7 @@ func (vs *ValidatorState) Start() error {
vs.Config.mtx.Unlock() vs.Config.mtx.Unlock()
em := eventmeter.NewEventMeter(fmt.Sprintf("ws://%s/websocket", rpcAddr), ctypes.UnmarshalEvent) em := eventmeter.NewEventMeter(fmt.Sprintf("ws://%s/websocket", rpcAddr), ctypes.UnmarshalEvent)
if err := em.Start(); err != nil {
if _, err := em.Start(); err != nil {
return err return err
} }
vs.em = em vs.em = em


+ 32
- 9
vendor/github.com/tendermint/go-event-meter/event_meter.go View File

@ -78,6 +78,7 @@ type DisconnectCallbackFunc func()
// Each node gets an event meter to track events for that node // Each node gets an event meter to track events for that node
type EventMeter struct { type EventMeter struct {
QuitService
wsc *client.WSClient wsc *client.WSClient
mtx sync.Mutex mtx sync.Mutex
@ -91,6 +92,8 @@ type EventMeter struct {
disconnectCallback DisconnectCallbackFunc disconnectCallback DisconnectCallbackFunc
unmarshalEvent EventUnmarshalFunc unmarshalEvent EventUnmarshalFunc
done chan struct{} // clean shutdown
} }
func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter { func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter {
@ -100,11 +103,19 @@ func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter {
timer: metrics.NewTimer(), timer: metrics.NewTimer(),
receivedPong: true, receivedPong: true,
unmarshalEvent: unmarshalEvent, unmarshalEvent: unmarshalEvent,
done: make(chan struct{}),
} }
em.QuitService = *NewQuitService(log, "EventMeter", em)
return em return em
} }
func (em *EventMeter) Start() error {
func (em *EventMeter) String() string {
return em.wsc.Address
}
func (em *EventMeter) OnStart() error {
em.QuitService.OnStart()
if _, err := em.wsc.Start(); err != nil { if _, err := em.wsc.Start(); err != nil {
return err return err
} }
@ -124,10 +135,21 @@ func (em *EventMeter) Start() error {
return nil return nil
} }
func (em *EventMeter) Stop() {
func (em *EventMeter) OnStop() {
em.QuitService.OnStop()
<-em.done
em.RegisterDisconnectCallback(nil) // so we don't try and reconnect
em.wsc.Stop() // close(wsc.Quit)
}
func (em *EventMeter) StopAndReconnect() {
em.wsc.Stop() em.wsc.Stop()
em.mtx.Lock()
defer em.mtx.Unlock()
if em.disconnectCallback != nil { if em.disconnectCallback != nil {
em.disconnectCallback()
go em.disconnectCallback()
} }
} }
@ -206,17 +228,16 @@ func (em *EventMeter) receiveRoutine() {
case <-pingTicker.C: case <-pingTicker.C:
if pingAttempts, err = em.pingForLatency(pingAttempts); err != nil { if pingAttempts, err = em.pingForLatency(pingAttempts); err != nil {
log.Error("Failed to write ping message on websocket", err) log.Error("Failed to write ping message on websocket", err)
em.Stop()
em.StopAndReconnect()
return return
} else if pingAttempts >= maxPingsPerPong { } else if pingAttempts >= maxPingsPerPong {
log.Error(Fmt("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime)) log.Error(Fmt("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime))
em.Stop()
em.StopAndReconnect()
return return
} }
case r := <-em.wsc.ResultsCh: case r := <-em.wsc.ResultsCh:
if r == nil { if r == nil {
// we might receive the closed ResultsCh before the Quit
em.Stop() // call stop to trigger the disconnect callback
em.StopAndReconnect()
return return
} }
eventID, data, err := em.unmarshalEvent(r) eventID, data, err := em.unmarshalEvent(r)
@ -228,10 +249,12 @@ func (em *EventMeter) receiveRoutine() {
em.updateMetric(eventID, data) em.updateMetric(eventID, data)
} }
case <-em.wsc.Quit: case <-em.wsc.Quit:
em.Stop() // call stop to trigger the disconnect callback
em.StopAndReconnect()
return
case <-em.Quit:
close(em.done)
return return
} }
} }
} }


Loading…
Cancel
Save