|
|
@ -40,18 +40,17 @@ type WSClient struct { |
|
|
|
// https://godoc.org/github.com/rcrowley/go-metrics#Timer.
|
|
|
|
PingPongLatencyTimer metrics.Timer |
|
|
|
|
|
|
|
// user facing channels, closed only when the client is being stopped.
|
|
|
|
ResultsCh chan json.RawMessage |
|
|
|
ErrorsCh chan error |
|
|
|
// Single user facing channel to read RPCResponses from, closed only when the client is being stopped.
|
|
|
|
ResponsesCh chan types.RPCResponse |
|
|
|
|
|
|
|
// Callback, which will be called each time after successful reconnect.
|
|
|
|
onReconnect func() |
|
|
|
|
|
|
|
// internal channels
|
|
|
|
send chan types.RPCRequest // user requests
|
|
|
|
backlog chan types.RPCRequest // stores a single user request received during a conn failure
|
|
|
|
reconnectAfter chan error // reconnect requests
|
|
|
|
readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine
|
|
|
|
send chan types.RPCRequest // user requests
|
|
|
|
backlog chan types.RPCRequest // stores a single user request received during a conn failure
|
|
|
|
reconnectAfter chan error // reconnect requests
|
|
|
|
readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine
|
|
|
|
|
|
|
|
wg sync.WaitGroup |
|
|
|
|
|
|
@ -148,8 +147,7 @@ func (c *WSClient) OnStart() error { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
c.ResultsCh = make(chan json.RawMessage) |
|
|
|
c.ErrorsCh = make(chan error) |
|
|
|
c.ResponsesCh = make(chan types.RPCResponse) |
|
|
|
|
|
|
|
c.send = make(chan types.RPCRequest) |
|
|
|
// 1 additional error may come from the read/write
|
|
|
@ -174,8 +172,7 @@ func (c *WSClient) Stop() bool { |
|
|
|
success := c.BaseService.Stop() |
|
|
|
// only close user-facing channels when we can't write to them
|
|
|
|
c.wg.Wait() |
|
|
|
close(c.ResultsCh) |
|
|
|
close(c.ErrorsCh) |
|
|
|
close(c.ResponsesCh) |
|
|
|
return success |
|
|
|
} |
|
|
|
|
|
|
@ -192,7 +189,7 @@ func (c *WSClient) IsActive() bool { |
|
|
|
} |
|
|
|
|
|
|
|
// Send the given RPC request to the server. Results will be available on
|
|
|
|
// ResultsCh, errors, if any, on ErrorsCh. Will block until send succeeds or
|
|
|
|
// ResponsesCh, errors, if any, on ErrorsCh. Will block until send succeeds or
|
|
|
|
// ctx.Done is closed.
|
|
|
|
func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error { |
|
|
|
select { |
|
|
@ -433,15 +430,16 @@ func (c *WSClient) readRoutine() { |
|
|
|
err = json.Unmarshal(data, &response) |
|
|
|
if err != nil { |
|
|
|
c.Logger.Error("failed to parse response", "err", err, "data", string(data)) |
|
|
|
c.ErrorsCh <- err |
|
|
|
continue |
|
|
|
} |
|
|
|
if response.Error != nil { |
|
|
|
c.ErrorsCh <- response.Error |
|
|
|
continue |
|
|
|
} |
|
|
|
c.Logger.Info("got response", "resp", response.Result) |
|
|
|
c.ResultsCh <- *response.Result |
|
|
|
// Combine a non-blocking read on BaseService.Quit with a non-blocking write on ResponsesCh to avoid blocking
|
|
|
|
// c.wg.Wait() in c.Stop(). Note we rely on Quit being closed so that it sends unlimited Quit signals to stop
|
|
|
|
// both readRoutine and writeRoutine
|
|
|
|
select { |
|
|
|
case <-c.Quit: |
|
|
|
case c.ResponsesCh <- response: |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|