From ce36a0111a9771882f7196aecab63e1c0a9804ff Mon Sep 17 00:00:00 2001 From: Alexandre Thibault Date: Fri, 29 Sep 2017 11:31:39 +0200 Subject: [PATCH] rpc: subscribe on reconnection (#689) * rpc: subscribe on reconnection * rpc: fix unit tests --- rpc/client/httpclient.go | 10 ++++++++++ rpc/lib/client/ws_client.go | 7 +++++-- rpc/lib/client/ws_client_test.go | 2 ++ 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index 7f29183d5..d068ee95b 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -305,6 +305,14 @@ func (w *WSEvents) RemoveListener(listenerID string) { w.EventSwitch.RemoveListener(listenerID) } +// After being reconnected, it is necessary to redo subscription +// to server otherwise no data will be automatically received +func (w *WSEvents) redoSubscriptions() { + for event, _ := range w.evtCount { + w.subscribe(event) + } +} + // eventListener is an infinite loop pulling all websocket events // and pushing them to the EventSwitch. // @@ -327,6 +335,8 @@ func (w *WSEvents) eventListener() { // before cleaning up the w.ws stuff w.done <- true return + case <-w.ws.ReconnectCh: + w.redoSubscriptions() } } } diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index 1407073ae..788cb8605 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -41,8 +41,9 @@ type WSClient struct { PingPongLatencyTimer metrics.Timer // user facing channels, closed only when the client is being stopped. - ResultsCh chan json.RawMessage - ErrorsCh chan error + ResultsCh chan json.RawMessage + ErrorsCh chan error + ReconnectCh chan bool // internal channels send chan types.RPCRequest // user requests @@ -139,6 +140,7 @@ func (c *WSClient) OnStart() error { c.ResultsCh = make(chan json.RawMessage) c.ErrorsCh = make(chan error) + c.ReconnectCh = make(chan bool) c.send = make(chan types.RPCRequest) // 1 additional error may come from the read/write @@ -254,6 +256,7 @@ func (c *WSClient) reconnect() error { c.Logger.Error("failed to redial", "err", err) } else { c.Logger.Info("reconnected") + c.ReconnectCh <- true return nil } diff --git a/rpc/lib/client/ws_client_test.go b/rpc/lib/client/ws_client_test.go index f5aa027fe..e90fc29d8 100644 --- a/rpc/lib/client/ws_client_test.go +++ b/rpc/lib/client/ws_client_test.go @@ -186,6 +186,8 @@ func callWgDoneOnResult(t *testing.T, c *WSClient, wg *sync.WaitGroup) { if err != nil { t.Fatalf("unexpected error: %v", err) } + case <-c.ReconnectCh: + t.Log("Reconnected") case <-c.Quit: return }