diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index 031c46512..19c066099 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -247,6 +247,24 @@ func (c *WSClient) startReadWriteRoutines() { go c.writeRoutine() } +func (c *WSClient) processBacklog() error { + select { + case request := <-c.backlog: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + err := c.conn.WriteJSON(request) + if err != nil { + c.Logger.Error("failed to resend request", "err", err) + c.reconnectAfter <- err + // requeue request + c.backlog <- request + return err + } + c.Logger.Info("resend a request", "req", request) + default: + } + return nil +} + func (c *WSClient) reconnectRoutine() { for { select { @@ -268,7 +286,10 @@ func (c *WSClient) reconnectRoutine() { break LOOP } } - c.startReadWriteRoutines() + err = c.processBacklog() + if err == nil { + c.startReadWriteRoutines() + } } case <-c.Quit: return @@ -288,17 +309,6 @@ func (c *WSClient) writeRoutine() { for { select { - case request := <-c.backlog: - c.conn.SetWriteDeadline(time.Now().Add(writeWait)) - err := c.conn.WriteJSON(request) - if err != nil { - c.Logger.Error("failed to resend request", "err", err) - c.reconnectAfter <- err - // add request to the backlog, so we don't lose it - c.backlog <- request - return - } - c.Logger.Info("resend a request", "req", request) case request := <-c.send: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) err := c.conn.WriteJSON(request) diff --git a/rpc/lib/client/ws_client_test.go b/rpc/lib/client/ws_client_test.go index 32385bfd7..d1b9764c3 100644 --- a/rpc/lib/client/ws_client_test.go +++ b/rpc/lib/client/ws_client_test.go @@ -138,7 +138,10 @@ func TestWSClientReconnectFailure(t *testing.T) { s.Close() // results in WS write error - call(t, "a", c) + // provide timeout to avoid blocking + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + c.Call(ctx, "a", make(map[string]interface{})) // expect to reconnect almost immediately time.Sleep(10 * time.Millisecond)