diff --git a/benchmarks/simu/counter.go b/benchmarks/simu/counter.go index ff5b14c0d..c6b4c161c 100644 --- a/benchmarks/simu/counter.go +++ b/benchmarks/simu/counter.go @@ -21,7 +21,7 @@ func main() { // Read a bunch of responses go func() { for { - _, ok := <-wsc.ResultsCh + _, ok := <-wsc.ResponsesCh if !ok { break } diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index 393250673..e63fcd4ba 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -318,16 +318,18 @@ func (w *WSEvents) redoSubscriptions() { func (w *WSEvents) eventListener() { for { select { - case res := <-w.ws.ResultsCh: + case resp := <-w.ws.ResponsesCh: // res is json.RawMessage - err := w.parseEvent(res) + if resp.Error != nil { + // FIXME: better logging/handling of errors?? + fmt.Printf("ws err: %+v\n", resp.Error.Error()) + continue + } + err := w.parseEvent(*resp.Result) if err != nil { // FIXME: better logging/handling of errors?? fmt.Printf("ws result: %+v\n", err) } - case err := <-w.ws.ErrorsCh: - // FIXME: better logging/handling of errors?? - fmt.Printf("ws err: %+v\n", err) case <-w.quit: // send a message so we can wait for the routine to exit // before cleaning up the w.ws stuff diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index 62dce4d44..a15aca2bb 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -40,9 +40,8 @@ type WSClient struct { // https://godoc.org/github.com/rcrowley/go-metrics#Timer. PingPongLatencyTimer metrics.Timer - // user facing channels, closed only when the client is being stopped. - ResultsCh chan json.RawMessage - ErrorsCh chan error + // Single user facing channel to read RPCResponses from, closed only when the client is being stopped. + ResponsesCh chan types.RPCResponse // Callback, which will be called each time after successful reconnect. onReconnect func() @@ -149,8 +148,7 @@ func (c *WSClient) OnStart() error { return err } - c.ResultsCh = make(chan json.RawMessage) - c.ErrorsCh = make(chan error) + c.ResponsesCh = make(chan types.RPCResponse) c.send = make(chan types.RPCRequest) // 1 additional error may come from the read/write @@ -175,8 +173,7 @@ func (c *WSClient) Stop() bool { success := c.BaseService.Stop() // only close user-facing channels when we can't write to them c.wg.Wait() - close(c.ResultsCh) - close(c.ErrorsCh) + close(c.ResponsesCh) return success } @@ -193,7 +190,7 @@ func (c *WSClient) IsActive() bool { } // Send the given RPC request to the server. Results will be available on -// ResultsCh, errors, if any, on ErrorsCh. Will block until send succeeds or +// ResponsesCh, errors, if any, on ErrorsCh. Will block until send succeeds or // ctx.Done is closed. func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error { select { @@ -438,19 +435,14 @@ func (c *WSClient) readRoutine() { err = json.Unmarshal(data, &response) if err != nil { c.Logger.Error("failed to parse response", "err", err, "data", string(data)) - c.ErrorsCh <- err - continue - } - if response.Error != nil { - c.ErrorsCh <- response.Error continue } c.Logger.Info("got response", "resp", response.Result) - // Combine a non-blocking read on writeRoutineQuit with a non-blocking write on ResultsCh to avoid blocking + // Combine a non-blocking read on writeRoutineQuit with a non-blocking write on ResponsesCh to avoid blocking // c.wg.Wait() in c.Stop() select { case <-c.writeRoutineQuit: - case c.ResultsCh <- *response.Result: + case c.ResponsesCh <- response: } } } diff --git a/rpc/lib/client/ws_client_test.go b/rpc/lib/client/ws_client_test.go index 049301f33..23f19dc00 100644 --- a/rpc/lib/client/ws_client_test.go +++ b/rpc/lib/client/ws_client_test.go @@ -125,8 +125,7 @@ func TestWSClientReconnectFailure(t *testing.T) { go func() { for { select { - case <-c.ResultsCh: - case <-c.ErrorsCh: + case <-c.ResponsesCh: case <-c.Quit: return } @@ -163,7 +162,7 @@ func TestWSClientReconnectFailure(t *testing.T) { } func TestNotBlockingOnStop(t *testing.T) { - timeout := 2 *time.Second + timeout := 2 * time.Second s := httptest.NewServer(&myHandler{}) c := startClient(t, s.Listener.Addr()) c.Call(context.Background(), "a", make(map[string]interface{})) @@ -171,10 +170,10 @@ func TestNotBlockingOnStop(t *testing.T) { time.Sleep(time.Second) passCh := make(chan struct{}) go func() { - // Unless we have a non-blocking write to ResultsCh from readRoutine + // Unless we have a non-blocking write to ResponsesCh from readRoutine // this blocks forever ont the waitgroup - c.Stop() - passCh <- struct{}{} + c.Stop() + passCh <- struct{}{} }() select { case <-passCh: @@ -201,13 +200,12 @@ func call(t *testing.T, method string, c *WSClient) { func callWgDoneOnResult(t *testing.T, c *WSClient, wg *sync.WaitGroup) { for { select { - case res := <-c.ResultsCh: - if res != nil { - wg.Done() + case resp := <-c.ResponsesCh: + if resp.Error != nil { + t.Fatalf("unexpected error: %v", resp.Error) } - case err := <-c.ErrorsCh: - if err != nil { - t.Fatalf("unexpected error: %v", err) + if *resp.Result != nil { + wg.Done() } case <-c.Quit: return diff --git a/rpc/lib/rpc_test.go b/rpc/lib/rpc_test.go index 4e83d23ef..2ec3014d5 100644 --- a/rpc/lib/rpc_test.go +++ b/rpc/lib/rpc_test.go @@ -217,15 +217,17 @@ func echoViaWS(cl *client.WSClient, val string) (string, error) { } select { - case msg := <-cl.ResultsCh: + case msg := <-cl.ResponsesCh: + if msg.Error != nil { + return "", err + + } result := new(ResultEcho) - err = json.Unmarshal(msg, result) + err = json.Unmarshal(*msg.Result, result) if err != nil { return "", nil } return result.Value, nil - case err := <-cl.ErrorsCh: - return "", err } } @@ -239,15 +241,17 @@ func echoBytesViaWS(cl *client.WSClient, bytes []byte) ([]byte, error) { } select { - case msg := <-cl.ResultsCh: + case msg := <-cl.ResponsesCh: + if msg.Error != nil { + return []byte{}, msg.Error + + } result := new(ResultEchoBytes) - err = json.Unmarshal(msg, result) + err = json.Unmarshal(*msg.Result, result) if err != nil { return []byte{}, nil } return result.Value, nil - case err := <-cl.ErrorsCh: - return []byte{}, err } } @@ -319,14 +323,15 @@ func TestWSNewWSRPCFunc(t *testing.T) { require.Nil(t, err) select { - case msg := <-cl.ResultsCh: + case msg := <-cl.ResponsesCh: + if msg.Error != nil { + t.Fatal(err) + } result := new(ResultEcho) - err = json.Unmarshal(msg, result) + err = json.Unmarshal(*msg.Result, result) require.Nil(t, err) got := result.Value assert.Equal(t, got, val) - case err := <-cl.ErrorsCh: - t.Fatal(err) } } @@ -343,14 +348,15 @@ func TestWSHandlesArrayParams(t *testing.T) { require.Nil(t, err) select { - case msg := <-cl.ResultsCh: + case msg := <-cl.ResponsesCh: + if msg.Error != nil { + t.Fatalf("%+v", err) + } result := new(ResultEcho) - err = json.Unmarshal(msg, result) + err = json.Unmarshal(*msg.Result, result) require.Nil(t, err) got := result.Value assert.Equal(t, got, val) - case err := <-cl.ErrorsCh: - t.Fatalf("%+v", err) } }