From d8886614b54ced964afb0dc7b621fb72b74e44e3 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Thu, 23 Jul 2015 12:19:59 -0700 Subject: [PATCH] Read timeouts for WSConnection --- rpc/server/handlers.go | 146 +++++++++++++++++++++++------------------ rpc/test/ws_helpers.go | 40 ++++++++--- 2 files changed, 113 insertions(+), 73 deletions(-) diff --git a/rpc/server/handlers.go b/rpc/server/handlers.go index f43bb1029..3c2ab0028 100644 --- a/rpc/server/handlers.go +++ b/rpc/server/handlers.go @@ -211,9 +211,9 @@ func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) { // rpc.websocket const ( - WSConnectionReaperSeconds = 5 - MaxFailedSends = 10 - WriteChanBufferSize = 10 + writeChanCapacity = 20 + WSWriteTimeoutSeconds = 10 // exposed for tests + WSReadTimeoutSeconds = 10 // exposed for tests ) // a single websocket connection @@ -223,135 +223,154 @@ type WSConnection struct { QuitService id string - wsConn *websocket.Conn + baseConn *websocket.Conn writeChan chan WSResponse - failedSends int + readTimeout *time.Timer evsw *events.EventSwitch } // new websocket connection wrapper -func NewWSConnection(wsConn *websocket.Conn) *WSConnection { - con := &WSConnection{ - id: wsConn.RemoteAddr().String(), - wsConn: wsConn, - writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full +func NewWSConnection(baseConn *websocket.Conn) *WSConnection { + wsc := &WSConnection{ + id: baseConn.RemoteAddr().String(), + baseConn: baseConn, + writeChan: make(chan WSResponse, writeChanCapacity), // error when full. } - con.QuitService = *NewQuitService(log, "WSConnection", con) - return con + wsc.QuitService = *NewQuitService(log, "WSConnection", wsc) + return wsc } -func (con *WSConnection) OnStart() { - con.QuitService.OnStart() - // read subscriptions/unsubscriptions to events - go con.read() - // write responses - con.write() +// wsc.Start() blocks until the connection closes. +func (wsc *WSConnection) OnStart() { + wsc.QuitService.OnStart() + + // Read subscriptions/unsubscriptions to events + go wsc.readRoutine() + + // Custom Ping handler to touch readTimeout + wsc.readTimeout = time.NewTimer(time.Second * WSReadTimeoutSeconds) + wsc.baseConn.SetPingHandler(func(m string) error { + wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*WSWriteTimeoutSeconds)) + wsc.readTimeout.Reset(time.Second * WSReadTimeoutSeconds) + return nil + }) + go wsc.readTimeoutRoutine() + + // Write responses, BLOCKING. + wsc.writeRoutine() } -func (con *WSConnection) OnStop() { - con.QuitService.OnStop() - con.evsw.RemoveListener(con.id) - // the write loop closes the websocket connection +func (wsc *WSConnection) OnStop() { + wsc.QuitService.OnStop() + wsc.evsw.RemoveListener(wsc.id) + wsc.readTimeout.Stop() + // The write loop closes the websocket connection // when it exits its loop, and the read loop // closes the writeChan } -func (con *WSConnection) SetEventSwitch(evsw *events.EventSwitch) { con.evsw = evsw } +func (wsc *WSConnection) SetEventSwitch(evsw *events.EventSwitch) { wsc.evsw = evsw } + +func (wsc *WSConnection) readTimeoutRoutine() { + select { + case <-wsc.readTimeout.C: + log.Notice("Stopping connection due to read timeout") + wsc.Stop() + case <-wsc.Quit: + return + } +} -// attempt to write response to writeChan and record failures -func (con *WSConnection) safeWrite(resp WSResponse) { +// Attempt to write response to writeChan and record failures +func (wsc *WSConnection) writeResponse(resp WSResponse) { select { - case con.writeChan <- resp: - // yay - con.failedSends = 0 + case wsc.writeChan <- resp: default: - // channel is full - // if this happens too many times in a row, - // close connection - con.failedSends += 1 + log.Notice("Stopping connection due to writeChan overflow", "id", wsc.id) + wsc.Stop() // writeChan capacity exceeded, error. } } -// read from the socket and subscribe to or unsubscribe from events -func (con *WSConnection) read() { - defer close(con.writeChan) - reaper := time.Tick(time.Second * WSConnectionReaperSeconds) +// Read from the socket and subscribe to or unsubscribe from events +func (wsc *WSConnection) readRoutine() { + defer close(wsc.writeChan) + for { select { - // TODO: this actually doesn't work - // since ReadMessage blocks. Really it needs its own - // go routine - case <-reaper: - if con.failedSends > MaxFailedSends { - // sending has failed too many times. - // kill the connection - con.Stop() - return - } + case <-wsc.Quit: + return default: var in []byte - _, in, err := con.wsConn.ReadMessage() + // Do not set a deadline here like below: + // wsc.baseConn.SetReadDeadline(time.Now().Add(time.Second * WSReadTimeoutSeconds)) + // The client may not send anything for a while. + // We use `readTimeout` to handle read timeouts. + _, in, err := wsc.baseConn.ReadMessage() if err != nil { + log.Notice("Failed to read from connection", "id", wsc.id) // an error reading the connection, // kill the connection - con.Stop() + wsc.Stop() return } var req WSRequest err = json.Unmarshal(in, &req) if err != nil { errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) - con.safeWrite(WSResponse{Error: errStr}) + wsc.writeResponse(WSResponse{Error: errStr}) continue } switch req.Type { case "subscribe": - log.Notice("New event subscription", "con id", con.id, "event", req.Event) - con.evsw.AddListenerForEvent(con.id, req.Event, func(msg interface{}) { + log.Notice("New event subscription", "id", wsc.id, "event", req.Event) + wsc.evsw.AddListenerForEvent(wsc.id, req.Event, func(msg interface{}) { resp := WSResponse{ Event: req.Event, Data: msg, } - con.safeWrite(resp) + wsc.writeResponse(resp) }) case "unsubscribe": if req.Event != "" { - con.evsw.RemoveListenerForEvent(req.Event, con.id) + wsc.evsw.RemoveListenerForEvent(req.Event, wsc.id) } else { - con.evsw.RemoveListener(con.id) + wsc.evsw.RemoveListener(wsc.id) } default: - con.safeWrite(WSResponse{Error: "Unknown request type: " + req.Type}) + wsc.writeResponse(WSResponse{Error: "Unknown request type: " + req.Type}) } } } } // receives on a write channel and writes out on the socket -func (con *WSConnection) write() { - defer con.wsConn.Close() +func (wsc *WSConnection) writeRoutine() { + defer wsc.baseConn.Close() n, err := new(int64), new(error) for { select { - case msg := <-con.writeChan: + case <-wsc.Quit: + return + case msg := <-wsc.writeChan: buf := new(bytes.Buffer) binary.WriteJSON(msg, buf, n, err) if *err != nil { log.Error("Failed to marshal WSResponse to JSON", "error", err) } else { - if err := con.wsConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil { + wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * WSWriteTimeoutSeconds)) + if err := wsc.baseConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil { log.Warn("Failed to write response on websocket", "error", err) - con.Stop() + wsc.Stop() return } } - case <-con.Quit: - return } } } +//---------------------------------------- + // main manager for all websocket connections // holds the event switch type WebsocketManager struct { @@ -373,6 +392,7 @@ func NewWebsocketManager(evsw *events.EventSwitch) *WebsocketManager { } } +// Upgrade the request/response (via http.Hijack) and starts the WSConnection. func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Request) { wsConn, err := wm.Upgrade(w, r, nil) if err != nil { @@ -385,7 +405,7 @@ func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Requ con := NewWSConnection(wsConn) log.Notice("New websocket connection", "origin", con.id) con.SetEventSwitch(wm.evsw) - con.Start() + con.Start() // Blocking } // rpc.websocket diff --git a/rpc/test/ws_helpers.go b/rpc/test/ws_helpers.go index 2ab6c4d5b..6e2038b19 100644 --- a/rpc/test/ws_helpers.go +++ b/rpc/test/ws_helpers.go @@ -11,6 +11,7 @@ import ( "github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/gorilla/websocket" "github.com/tendermint/tendermint/binary" _ "github.com/tendermint/tendermint/config/tendermint_test" + "github.com/tendermint/tendermint/rpc/server" "github.com/tendermint/tendermint/rpc/types" "github.com/tendermint/tendermint/types" ) @@ -55,13 +56,32 @@ func unsubscribe(t *testing.T, con *websocket.Conn, eventid string) { // wait for an event; do things that might trigger events, and check them when they are received func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeout bool, f func(), check func(string, []byte) error) { // go routine to wait for webscoket msg - gch := make(chan []byte) // good channel - ech := make(chan error) // error channel + goodCh := make(chan []byte) + errCh := make(chan error) + quitCh := make(chan struct{}) + defer close(quitCh) + + // Write pings repeatedly + // TODO: Maybe move this out to something that manages the con? + go func() { + pingTicker := time.NewTicker((time.Second * rpcserver.WSReadTimeoutSeconds) / 2) + for { + select { + case <-quitCh: + pingTicker.Stop() + return + case <-pingTicker.C: + con.WriteControl(websocket.PingMessage, []byte("whatevs"), time.Now().Add(time.Second)) + } + } + }() + + // Read message go func() { for { _, p, err := con.ReadMessage() if err != nil { - ech <- err + errCh <- err break } else { // if the event id isnt what we're waiting on @@ -70,11 +90,11 @@ func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeou Event string `json:"event"` } if err := json.Unmarshal(p, &response); err != nil { - ech <- err + errCh <- err break } if response.Event == eventid { - gch <- p + goodCh <- p break } } @@ -84,17 +104,17 @@ func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeou // do stuff (transactions) f() - // wait for an event or 10 seconds - ticker := time.Tick(10 * time.Second) + // wait for an event or timeout + timeout := time.NewTimer(10 * time.Second) select { - case <-ticker: + case <-timeout.C: if dieOnTimeout { con.Close() t.Fatalf("%s event was not received in time", eventid) } // else that's great, we didn't hear the event // and we shouldn't have - case p := <-gch: + case p := <-goodCh: if dieOnTimeout { // message was received and expected // run the check @@ -106,7 +126,7 @@ func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeou con.Close() t.Fatalf("%s event was not expected", eventid) } - case err := <-ech: + case err := <-errCh: t.Fatal(err) } }