Browse Source

fix websocket handler

pull/55/head
Ethan Buchman 9 years ago
parent
commit
8057fb7125
1 changed files with 22 additions and 19 deletions
  1. +22
    -19
      rpc/handlers.go

+ 22
- 19
rpc/handlers.go View File

@ -225,6 +225,7 @@ type WSConnection struct {
id string
wsConn *websocket.Conn
writeChan chan WSResponse
quitChan chan struct{}
failedSends uint
started uint32
stopped uint32
@ -238,6 +239,7 @@ func NewWSConnection(wsConn *websocket.Conn) *WSConnection {
id: wsConn.RemoteAddr().String(),
wsConn: wsConn,
writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full
quitChan: make(chan struct{}),
}
}
@ -256,8 +258,10 @@ func (con *WSConnection) Start(evsw *events.EventSwitch) {
// close the connection
func (con *WSConnection) Stop() {
if atomic.CompareAndSwapUint32(&con.stopped, 0, 1) {
con.wsConn.Close()
close(con.writeChan)
close(con.quitChan)
// the write loop closes the websocket connection
// when it exits its loop, and the read loop
// closes the writeChan
}
}
@ -277,6 +281,7 @@ func (con *WSConnection) safeWrite(resp WSResponse) {
// read from the socket and subscribe to or unsubscribe from events
func (con *WSConnection) read() {
defer close(con.writeChan)
reaper := time.Tick(time.Second * WSConnectionReaperSeconds)
for {
select {
@ -322,32 +327,30 @@ func (con *WSConnection) read() {
default:
con.safeWrite(WSResponse{Error: "Unknown request type: " + req.Type})
}
}
}
}
// receives on a write channel and writes out on the socket
func (con *WSConnection) write() {
defer con.wsConn.Close()
n, err := new(int64), new(error)
for {
msg, more := <-con.writeChan
if !more {
// the channel was closed, so ensure
// connection is stopped and return
con.Stop()
return
}
buf := new(bytes.Buffer)
binary.WriteJSON(msg, buf, n, err)
if *err != nil {
log.Error("Failed to write JSON WSResponse", "error", err)
} else {
if err := con.wsConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil {
log.Error("Failed to write response on websocket", "error", err)
con.Stop()
return
select {
case msg := <-con.writeChan:
buf := new(bytes.Buffer)
binary.WriteJSON(msg, buf, n, err)
if *err != nil {
log.Error("Failed to marshal WSResponse to JSON", "error", err)
} else {
if err := con.wsConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil {
log.Error("Failed to write response on websocket", "error", err)
con.Stop()
return
}
}
case <-con.quitChan:
return
}
}
}


Loading…
Cancel
Save