From 01be6fa3098786b57120083597ba51fea29995cf Mon Sep 17 00:00:00 2001 From: Silas Davis Date: Tue, 24 Oct 2017 13:20:49 +0100 Subject: [PATCH] Fix WSClient blocking in the readRoutine after Stop() as it tries to write to ResultsCh --- rpc/lib/client/ws_client.go | 20 +++++++++++++++----- rpc/lib/client/ws_client_test.go | 23 +++++++++++++++++++++++ 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index d233004b6..62dce4d44 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -48,10 +48,11 @@ type WSClient struct { onReconnect func() // internal channels - send chan types.RPCRequest // user requests - backlog chan types.RPCRequest // stores a single user request received during a conn failure - reconnectAfter chan error // reconnect requests - readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine + send chan types.RPCRequest // user requests + backlog chan types.RPCRequest // stores a single user request received during a conn failure + reconnectAfter chan error // reconnect requests + readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine + writeRoutineQuit chan struct{} // a way for writeRoutine to close readRoutine (on <-BaseService.Quit) wg sync.WaitGroup @@ -282,6 +283,7 @@ func (c *WSClient) reconnect() error { func (c *WSClient) startReadWriteRoutines() { c.wg.Add(2) c.readRoutineQuit = make(chan struct{}) + c.writeRoutineQuit = make(chan struct{}) go c.readRoutine() go c.writeRoutine() } @@ -387,6 +389,9 @@ func (c *WSClient) writeRoutine() { case <-c.readRoutineQuit: return case <-c.Quit: + // We need to fan out the quit message from the single BaseService Quit Channel to the readRoutine + // Use a non-blocking close rather than a send in case readRoutine is in the process of quitting + close(c.writeRoutineQuit) c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) return } @@ -441,7 +446,12 @@ func (c *WSClient) readRoutine() { continue } c.Logger.Info("got response", "resp", response.Result) - c.ResultsCh <- *response.Result + // Combine a non-blocking read on writeRoutineQuit with a non-blocking write on ResultsCh to avoid blocking + // c.wg.Wait() in c.Stop() + select { + case <-c.writeRoutineQuit: + case c.ResultsCh <- *response.Result: + } } } diff --git a/rpc/lib/client/ws_client_test.go b/rpc/lib/client/ws_client_test.go index f5aa027fe..049301f33 100644 --- a/rpc/lib/client/ws_client_test.go +++ b/rpc/lib/client/ws_client_test.go @@ -162,6 +162,29 @@ func TestWSClientReconnectFailure(t *testing.T) { } } +func TestNotBlockingOnStop(t *testing.T) { + timeout := 2 *time.Second + s := httptest.NewServer(&myHandler{}) + c := startClient(t, s.Listener.Addr()) + c.Call(context.Background(), "a", make(map[string]interface{})) + // Let the readRoutine get around to blocking + time.Sleep(time.Second) + passCh := make(chan struct{}) + go func() { + // Unless we have a non-blocking write to ResultsCh from readRoutine + // this blocks forever ont the waitgroup + c.Stop() + passCh <- struct{}{} + }() + select { + case <-passCh: + // Pass + case <-time.After(timeout): + t.Fatalf("WSClient did failed to stop within %v seconds - is one of the read/write routines blocking?", + timeout.Seconds()) + } +} + func startClient(t *testing.T, addr net.Addr) *WSClient { c := NewWSClient(addr.String(), "/websocket") _, err := c.Start()