Browse Source

Replace ResultsCh with ResponsesCh

pull/777/head
Silas Davis 7 years ago
parent
commit
f6adddb4a8
No known key found for this signature in database GPG Key ID: 4CBFD0FF2D395219
5 changed files with 47 additions and 49 deletions
  1. +1
    -1
      benchmarks/simu/counter.go
  2. +7
    -5
      rpc/client/httpclient.go
  3. +7
    -15
      rpc/lib/client/ws_client.go
  4. +10
    -12
      rpc/lib/client/ws_client_test.go
  5. +22
    -16
      rpc/lib/rpc_test.go

+ 1
- 1
benchmarks/simu/counter.go View File

@ -21,7 +21,7 @@ func main() {
// Read a bunch of responses // Read a bunch of responses
go func() { go func() {
for { for {
_, ok := <-wsc.ResultsCh
_, ok := <-wsc.ResponsesCh
if !ok { if !ok {
break break
} }


+ 7
- 5
rpc/client/httpclient.go View File

@ -318,16 +318,18 @@ func (w *WSEvents) redoSubscriptions() {
func (w *WSEvents) eventListener() { func (w *WSEvents) eventListener() {
for { for {
select { select {
case res := <-w.ws.ResultsCh:
case resp := <-w.ws.ResponsesCh:
// res is json.RawMessage // res is json.RawMessage
err := w.parseEvent(res)
if resp.Error != nil {
// FIXME: better logging/handling of errors??
fmt.Printf("ws err: %+v\n", resp.Error.Error())
continue
}
err := w.parseEvent(*resp.Result)
if err != nil { if err != nil {
// FIXME: better logging/handling of errors?? // FIXME: better logging/handling of errors??
fmt.Printf("ws result: %+v\n", err) fmt.Printf("ws result: %+v\n", err)
} }
case err := <-w.ws.ErrorsCh:
// FIXME: better logging/handling of errors??
fmt.Printf("ws err: %+v\n", err)
case <-w.quit: case <-w.quit:
// send a message so we can wait for the routine to exit // send a message so we can wait for the routine to exit
// before cleaning up the w.ws stuff // before cleaning up the w.ws stuff


+ 7
- 15
rpc/lib/client/ws_client.go View File

@ -40,9 +40,8 @@ type WSClient struct {
// https://godoc.org/github.com/rcrowley/go-metrics#Timer. // https://godoc.org/github.com/rcrowley/go-metrics#Timer.
PingPongLatencyTimer metrics.Timer PingPongLatencyTimer metrics.Timer
// user facing channels, closed only when the client is being stopped.
ResultsCh chan json.RawMessage
ErrorsCh chan error
// Single user facing channel to read RPCResponses from, closed only when the client is being stopped.
ResponsesCh chan types.RPCResponse
// Callback, which will be called each time after successful reconnect. // Callback, which will be called each time after successful reconnect.
onReconnect func() onReconnect func()
@ -149,8 +148,7 @@ func (c *WSClient) OnStart() error {
return err return err
} }
c.ResultsCh = make(chan json.RawMessage)
c.ErrorsCh = make(chan error)
c.ResponsesCh = make(chan types.RPCResponse)
c.send = make(chan types.RPCRequest) c.send = make(chan types.RPCRequest)
// 1 additional error may come from the read/write // 1 additional error may come from the read/write
@ -175,8 +173,7 @@ func (c *WSClient) Stop() bool {
success := c.BaseService.Stop() success := c.BaseService.Stop()
// only close user-facing channels when we can't write to them // only close user-facing channels when we can't write to them
c.wg.Wait() c.wg.Wait()
close(c.ResultsCh)
close(c.ErrorsCh)
close(c.ResponsesCh)
return success return success
} }
@ -193,7 +190,7 @@ func (c *WSClient) IsActive() bool {
} }
// Send the given RPC request to the server. Results will be available on // Send the given RPC request to the server. Results will be available on
// ResultsCh, errors, if any, on ErrorsCh. Will block until send succeeds or
// ResponsesCh, errors, if any, on ErrorsCh. Will block until send succeeds or
// ctx.Done is closed. // ctx.Done is closed.
func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error { func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error {
select { select {
@ -438,19 +435,14 @@ func (c *WSClient) readRoutine() {
err = json.Unmarshal(data, &response) err = json.Unmarshal(data, &response)
if err != nil { if err != nil {
c.Logger.Error("failed to parse response", "err", err, "data", string(data)) c.Logger.Error("failed to parse response", "err", err, "data", string(data))
c.ErrorsCh <- err
continue
}
if response.Error != nil {
c.ErrorsCh <- response.Error
continue continue
} }
c.Logger.Info("got response", "resp", response.Result) c.Logger.Info("got response", "resp", response.Result)
// Combine a non-blocking read on writeRoutineQuit with a non-blocking write on ResultsCh to avoid blocking
// Combine a non-blocking read on writeRoutineQuit with a non-blocking write on ResponsesCh to avoid blocking
// c.wg.Wait() in c.Stop() // c.wg.Wait() in c.Stop()
select { select {
case <-c.writeRoutineQuit: case <-c.writeRoutineQuit:
case c.ResultsCh <- *response.Result:
case c.ResponsesCh <- response:
} }
} }
} }


+ 10
- 12
rpc/lib/client/ws_client_test.go View File

@ -125,8 +125,7 @@ func TestWSClientReconnectFailure(t *testing.T) {
go func() { go func() {
for { for {
select { select {
case <-c.ResultsCh:
case <-c.ErrorsCh:
case <-c.ResponsesCh:
case <-c.Quit: case <-c.Quit:
return return
} }
@ -163,7 +162,7 @@ func TestWSClientReconnectFailure(t *testing.T) {
} }
func TestNotBlockingOnStop(t *testing.T) { func TestNotBlockingOnStop(t *testing.T) {
timeout := 2 *time.Second
timeout := 2 * time.Second
s := httptest.NewServer(&myHandler{}) s := httptest.NewServer(&myHandler{})
c := startClient(t, s.Listener.Addr()) c := startClient(t, s.Listener.Addr())
c.Call(context.Background(), "a", make(map[string]interface{})) c.Call(context.Background(), "a", make(map[string]interface{}))
@ -171,10 +170,10 @@ func TestNotBlockingOnStop(t *testing.T) {
time.Sleep(time.Second) time.Sleep(time.Second)
passCh := make(chan struct{}) passCh := make(chan struct{})
go func() { go func() {
// Unless we have a non-blocking write to ResultsCh from readRoutine
// Unless we have a non-blocking write to ResponsesCh from readRoutine
// this blocks forever ont the waitgroup // this blocks forever ont the waitgroup
c.Stop()
passCh <- struct{}{}
c.Stop()
passCh <- struct{}{}
}() }()
select { select {
case <-passCh: case <-passCh:
@ -201,13 +200,12 @@ func call(t *testing.T, method string, c *WSClient) {
func callWgDoneOnResult(t *testing.T, c *WSClient, wg *sync.WaitGroup) { func callWgDoneOnResult(t *testing.T, c *WSClient, wg *sync.WaitGroup) {
for { for {
select { select {
case res := <-c.ResultsCh:
if res != nil {
wg.Done()
case resp := <-c.ResponsesCh:
if resp.Error != nil {
t.Fatalf("unexpected error: %v", resp.Error)
} }
case err := <-c.ErrorsCh:
if err != nil {
t.Fatalf("unexpected error: %v", err)
if *resp.Result != nil {
wg.Done()
} }
case <-c.Quit: case <-c.Quit:
return return


+ 22
- 16
rpc/lib/rpc_test.go View File

@ -217,15 +217,17 @@ func echoViaWS(cl *client.WSClient, val string) (string, error) {
} }
select { select {
case msg := <-cl.ResultsCh:
case msg := <-cl.ResponsesCh:
if msg.Error != nil {
return "", err
}
result := new(ResultEcho) result := new(ResultEcho)
err = json.Unmarshal(msg, result)
err = json.Unmarshal(*msg.Result, result)
if err != nil { if err != nil {
return "", nil return "", nil
} }
return result.Value, nil return result.Value, nil
case err := <-cl.ErrorsCh:
return "", err
} }
} }
@ -239,15 +241,17 @@ func echoBytesViaWS(cl *client.WSClient, bytes []byte) ([]byte, error) {
} }
select { select {
case msg := <-cl.ResultsCh:
case msg := <-cl.ResponsesCh:
if msg.Error != nil {
return []byte{}, msg.Error
}
result := new(ResultEchoBytes) result := new(ResultEchoBytes)
err = json.Unmarshal(msg, result)
err = json.Unmarshal(*msg.Result, result)
if err != nil { if err != nil {
return []byte{}, nil return []byte{}, nil
} }
return result.Value, nil return result.Value, nil
case err := <-cl.ErrorsCh:
return []byte{}, err
} }
} }
@ -319,14 +323,15 @@ func TestWSNewWSRPCFunc(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
select { select {
case msg := <-cl.ResultsCh:
case msg := <-cl.ResponsesCh:
if msg.Error != nil {
t.Fatal(err)
}
result := new(ResultEcho) result := new(ResultEcho)
err = json.Unmarshal(msg, result)
err = json.Unmarshal(*msg.Result, result)
require.Nil(t, err) require.Nil(t, err)
got := result.Value got := result.Value
assert.Equal(t, got, val) assert.Equal(t, got, val)
case err := <-cl.ErrorsCh:
t.Fatal(err)
} }
} }
@ -343,14 +348,15 @@ func TestWSHandlesArrayParams(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
select { select {
case msg := <-cl.ResultsCh:
case msg := <-cl.ResponsesCh:
if msg.Error != nil {
t.Fatalf("%+v", err)
}
result := new(ResultEcho) result := new(ResultEcho)
err = json.Unmarshal(msg, result)
err = json.Unmarshal(*msg.Result, result)
require.Nil(t, err) require.Nil(t, err)
got := result.Value got := result.Value
assert.Equal(t, got, val) assert.Equal(t, got, val)
case err := <-cl.ErrorsCh:
t.Fatalf("%+v", err)
} }
} }


Loading…
Cancel
Save