From 8057fb7125e809222d3570071e44fafc96e9c20f Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 17 Apr 2015 20:51:15 -0700 Subject: [PATCH] fix websocket handler --- rpc/handlers.go | 41 ++++++++++++++++++++++------------------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/rpc/handlers.go b/rpc/handlers.go index 2d0bb1eaf..5d2064b89 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -225,6 +225,7 @@ type WSConnection struct { id string wsConn *websocket.Conn writeChan chan WSResponse + quitChan chan struct{} failedSends uint started uint32 stopped uint32 @@ -238,6 +239,7 @@ func NewWSConnection(wsConn *websocket.Conn) *WSConnection { id: wsConn.RemoteAddr().String(), wsConn: wsConn, writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full + quitChan: make(chan struct{}), } } @@ -256,8 +258,10 @@ func (con *WSConnection) Start(evsw *events.EventSwitch) { // close the connection func (con *WSConnection) Stop() { if atomic.CompareAndSwapUint32(&con.stopped, 0, 1) { - con.wsConn.Close() - close(con.writeChan) + close(con.quitChan) + // the write loop closes the websocket connection + // when it exits its loop, and the read loop + // closes the writeChan } } @@ -277,6 +281,7 @@ func (con *WSConnection) safeWrite(resp WSResponse) { // read from the socket and subscribe to or unsubscribe from events func (con *WSConnection) read() { + defer close(con.writeChan) reaper := time.Tick(time.Second * WSConnectionReaperSeconds) for { select { @@ -322,32 +327,30 @@ func (con *WSConnection) read() { default: con.safeWrite(WSResponse{Error: "Unknown request type: " + req.Type}) } - } } } // receives on a write channel and writes out on the socket func (con *WSConnection) write() { + defer con.wsConn.Close() n, err := new(int64), new(error) for { - msg, more := <-con.writeChan - if !more { - // the channel was closed, so ensure - // connection is stopped and return - con.Stop() - return - } - buf := new(bytes.Buffer) - binary.WriteJSON(msg, buf, n, err) - if *err != nil { - log.Error("Failed to write JSON WSResponse", "error", err) - } else { - if err := con.wsConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil { - log.Error("Failed to write response on websocket", "error", err) - con.Stop() - return + select { + case msg := <-con.writeChan: + buf := new(bytes.Buffer) + binary.WriteJSON(msg, buf, n, err) + if *err != nil { + log.Error("Failed to marshal WSResponse to JSON", "error", err) + } else { + if err := con.wsConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil { + log.Error("Failed to write response on websocket", "error", err) + con.Stop() + return + } } + case <-con.quitChan: + return } } }