Browse Source

rewrite ws client to expose a callback instead of a channel

callback gives more power to the publisher. plus it is optional
comparing to a channel, which will block the whole client if you won't
read from it.
pull/692/head
Anton Kaliaev 7 years ago
committed by Zach Ramsay
parent
commit
45ff7cdd0c
3 changed files with 19 additions and 10 deletions
  1. +3
    -3
      rpc/client/httpclient.go
  2. +16
    -5
      rpc/lib/client/ws_client.go
  3. +0
    -2
      rpc/lib/client/ws_client_test.go

+ 3
- 3
rpc/client/httpclient.go View File

@ -226,7 +226,9 @@ func (w *WSEvents) Start() (bool, error) {
st, err := w.EventSwitch.Start() st, err := w.EventSwitch.Start()
// if we did start, then OnStart here... // if we did start, then OnStart here...
if st && err == nil { if st && err == nil {
ws := rpcclient.NewWSClient(w.remote, w.endpoint)
ws := rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() {
w.redoSubscriptions()
}))
_, err = ws.Start() _, err = ws.Start()
if err == nil { if err == nil {
w.ws = ws w.ws = ws
@ -335,8 +337,6 @@ func (w *WSEvents) eventListener() {
// before cleaning up the w.ws stuff // before cleaning up the w.ws stuff
w.done <- true w.done <- true
return return
case <-w.ws.ReconnectCh:
w.redoSubscriptions()
} }
} }
} }


+ 16
- 5
rpc/lib/client/ws_client.go View File

@ -41,9 +41,11 @@ type WSClient struct {
PingPongLatencyTimer metrics.Timer PingPongLatencyTimer metrics.Timer
// user facing channels, closed only when the client is being stopped. // user facing channels, closed only when the client is being stopped.
ResultsCh chan json.RawMessage
ErrorsCh chan error
ReconnectCh chan bool
ResultsCh chan json.RawMessage
ErrorsCh chan error
// Callback, which will be called each time after successful reconnect.
onReconnect func()
// internal channels // internal channels
send chan types.RPCRequest // user requests send chan types.RPCRequest // user requests
@ -125,6 +127,14 @@ func PingPeriod(pingPeriod time.Duration) func(*WSClient) {
} }
} }
// OnReconnect sets the callback, which will be called every time after
// successful reconnect.
func OnReconnect(cb func()) func(*WSClient) {
return func(c *WSClient) {
c.onReconnect = cb
}
}
// String returns WS client full address. // String returns WS client full address.
func (c *WSClient) String() string { func (c *WSClient) String() string {
return fmt.Sprintf("%s (%s)", c.Address, c.Endpoint) return fmt.Sprintf("%s (%s)", c.Address, c.Endpoint)
@ -140,7 +150,6 @@ func (c *WSClient) OnStart() error {
c.ResultsCh = make(chan json.RawMessage) c.ResultsCh = make(chan json.RawMessage)
c.ErrorsCh = make(chan error) c.ErrorsCh = make(chan error)
c.ReconnectCh = make(chan bool)
c.send = make(chan types.RPCRequest) c.send = make(chan types.RPCRequest)
// 1 additional error may come from the read/write // 1 additional error may come from the read/write
@ -256,7 +265,9 @@ func (c *WSClient) reconnect() error {
c.Logger.Error("failed to redial", "err", err) c.Logger.Error("failed to redial", "err", err)
} else { } else {
c.Logger.Info("reconnected") c.Logger.Info("reconnected")
c.ReconnectCh <- true
if c.onReconnect != nil {
go c.onReconnect()
}
return nil return nil
} }


+ 0
- 2
rpc/lib/client/ws_client_test.go View File

@ -186,8 +186,6 @@ func callWgDoneOnResult(t *testing.T, c *WSClient, wg *sync.WaitGroup) {
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
case <-c.ReconnectCh:
t.Log("Reconnected")
case <-c.Quit: case <-c.Quit:
return return
} }


Loading…
Cancel
Save