|
@ -51,7 +51,6 @@ type WSClient struct { |
|
|
backlog chan types.RPCRequest // stores a single user request received during a conn failure
|
|
|
backlog chan types.RPCRequest // stores a single user request received during a conn failure
|
|
|
reconnectAfter chan error // reconnect requests
|
|
|
reconnectAfter chan error // reconnect requests
|
|
|
readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine
|
|
|
readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine
|
|
|
writeRoutineQuit chan struct{} // a way for writeRoutine to close readRoutine (on <-BaseService.Quit)
|
|
|
|
|
|
|
|
|
|
|
|
wg sync.WaitGroup |
|
|
wg sync.WaitGroup |
|
|
|
|
|
|
|
@ -280,7 +279,6 @@ func (c *WSClient) reconnect() error { |
|
|
func (c *WSClient) startReadWriteRoutines() { |
|
|
func (c *WSClient) startReadWriteRoutines() { |
|
|
c.wg.Add(2) |
|
|
c.wg.Add(2) |
|
|
c.readRoutineQuit = make(chan struct{}) |
|
|
c.readRoutineQuit = make(chan struct{}) |
|
|
c.writeRoutineQuit = make(chan struct{}) |
|
|
|
|
|
go c.readRoutine() |
|
|
go c.readRoutine() |
|
|
go c.writeRoutine() |
|
|
go c.writeRoutine() |
|
|
} |
|
|
} |
|
@ -386,9 +384,6 @@ func (c *WSClient) writeRoutine() { |
|
|
case <-c.readRoutineQuit: |
|
|
case <-c.readRoutineQuit: |
|
|
return |
|
|
return |
|
|
case <-c.Quit: |
|
|
case <-c.Quit: |
|
|
// We need to fan out the quit message from the single BaseService Quit Channel to the readRoutine
|
|
|
|
|
|
// Use a non-blocking close rather than a send in case readRoutine is in the process of quitting
|
|
|
|
|
|
close(c.writeRoutineQuit) |
|
|
|
|
|
c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) |
|
|
c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
@ -438,10 +433,11 @@ func (c *WSClient) readRoutine() { |
|
|
continue |
|
|
continue |
|
|
} |
|
|
} |
|
|
c.Logger.Info("got response", "resp", response.Result) |
|
|
c.Logger.Info("got response", "resp", response.Result) |
|
|
// Combine a non-blocking read on writeRoutineQuit with a non-blocking write on ResponsesCh to avoid blocking
|
|
|
|
|
|
// c.wg.Wait() in c.Stop()
|
|
|
|
|
|
|
|
|
// Combine a non-blocking read on BaseService.Quit with a non-blocking write on ResponsesCh to avoid blocking
|
|
|
|
|
|
// c.wg.Wait() in c.Stop(). Note we rely on Quit being closed so that it sends unlimited Quit signals to stop
|
|
|
|
|
|
// both readRoutine and writeRoutine
|
|
|
select { |
|
|
select { |
|
|
case <-c.writeRoutineQuit: |
|
|
|
|
|
|
|
|
case <-c.Quit: |
|
|
case c.ResponsesCh <- response: |
|
|
case c.ResponsesCh <- response: |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|