Browse Source

Read timeouts for WSConnection

pull/122/head
Jae Kwon 9 years ago
parent
commit
d8886614b5
2 changed files with 113 additions and 73 deletions
  1. +83
    -63
      rpc/server/handlers.go
  2. +30
    -10
      rpc/test/ws_helpers.go

+ 83
- 63
rpc/server/handlers.go View File

@ -211,9 +211,9 @@ func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
// rpc.websocket
const (
WSConnectionReaperSeconds = 5
MaxFailedSends = 10
WriteChanBufferSize = 10
writeChanCapacity = 20
WSWriteTimeoutSeconds = 10 // exposed for tests
WSReadTimeoutSeconds = 10 // exposed for tests
)
// a single websocket connection
@ -223,135 +223,154 @@ type WSConnection struct {
QuitService
id string
wsConn *websocket.Conn
baseConn *websocket.Conn
writeChan chan WSResponse
failedSends int
readTimeout *time.Timer
evsw *events.EventSwitch
}
// new websocket connection wrapper
func NewWSConnection(wsConn *websocket.Conn) *WSConnection {
con := &WSConnection{
id: wsConn.RemoteAddr().String(),
wsConn: wsConn,
writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full
func NewWSConnection(baseConn *websocket.Conn) *WSConnection {
wsc := &WSConnection{
id: baseConn.RemoteAddr().String(),
baseConn: baseConn,
writeChan: make(chan WSResponse, writeChanCapacity), // error when full.
}
con.QuitService = *NewQuitService(log, "WSConnection", con)
return con
wsc.QuitService = *NewQuitService(log, "WSConnection", wsc)
return wsc
}
func (con *WSConnection) OnStart() {
con.QuitService.OnStart()
// read subscriptions/unsubscriptions to events
go con.read()
// write responses
con.write()
// wsc.Start() blocks until the connection closes.
func (wsc *WSConnection) OnStart() {
wsc.QuitService.OnStart()
// Read subscriptions/unsubscriptions to events
go wsc.readRoutine()
// Custom Ping handler to touch readTimeout
wsc.readTimeout = time.NewTimer(time.Second * WSReadTimeoutSeconds)
wsc.baseConn.SetPingHandler(func(m string) error {
wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*WSWriteTimeoutSeconds))
wsc.readTimeout.Reset(time.Second * WSReadTimeoutSeconds)
return nil
})
go wsc.readTimeoutRoutine()
// Write responses, BLOCKING.
wsc.writeRoutine()
}
func (con *WSConnection) OnStop() {
con.QuitService.OnStop()
con.evsw.RemoveListener(con.id)
// the write loop closes the websocket connection
func (wsc *WSConnection) OnStop() {
wsc.QuitService.OnStop()
wsc.evsw.RemoveListener(wsc.id)
wsc.readTimeout.Stop()
// The write loop closes the websocket connection
// when it exits its loop, and the read loop
// closes the writeChan
}
func (con *WSConnection) SetEventSwitch(evsw *events.EventSwitch) { con.evsw = evsw }
func (wsc *WSConnection) SetEventSwitch(evsw *events.EventSwitch) { wsc.evsw = evsw }
func (wsc *WSConnection) readTimeoutRoutine() {
select {
case <-wsc.readTimeout.C:
log.Notice("Stopping connection due to read timeout")
wsc.Stop()
case <-wsc.Quit:
return
}
}
// attempt to write response to writeChan and record failures
func (con *WSConnection) safeWrite(resp WSResponse) {
// Attempt to write response to writeChan and record failures
func (wsc *WSConnection) writeResponse(resp WSResponse) {
select {
case con.writeChan <- resp:
// yay
con.failedSends = 0
case wsc.writeChan <- resp:
default:
// channel is full
// if this happens too many times in a row,
// close connection
con.failedSends += 1
log.Notice("Stopping connection due to writeChan overflow", "id", wsc.id)
wsc.Stop() // writeChan capacity exceeded, error.
}
}
// read from the socket and subscribe to or unsubscribe from events
func (con *WSConnection) read() {
defer close(con.writeChan)
reaper := time.Tick(time.Second * WSConnectionReaperSeconds)
// Read from the socket and subscribe to or unsubscribe from events
func (wsc *WSConnection) readRoutine() {
defer close(wsc.writeChan)
for {
select {
// TODO: this actually doesn't work
// since ReadMessage blocks. Really it needs its own
// go routine
case <-reaper:
if con.failedSends > MaxFailedSends {
// sending has failed too many times.
// kill the connection
con.Stop()
return
}
case <-wsc.Quit:
return
default:
var in []byte
_, in, err := con.wsConn.ReadMessage()
// Do not set a deadline here like below:
// wsc.baseConn.SetReadDeadline(time.Now().Add(time.Second * WSReadTimeoutSeconds))
// The client may not send anything for a while.
// We use `readTimeout` to handle read timeouts.
_, in, err := wsc.baseConn.ReadMessage()
if err != nil {
log.Notice("Failed to read from connection", "id", wsc.id)
// an error reading the connection,
// kill the connection
con.Stop()
wsc.Stop()
return
}
var req WSRequest
err = json.Unmarshal(in, &req)
if err != nil {
errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
con.safeWrite(WSResponse{Error: errStr})
wsc.writeResponse(WSResponse{Error: errStr})
continue
}
switch req.Type {
case "subscribe":
log.Notice("New event subscription", "con id", con.id, "event", req.Event)
con.evsw.AddListenerForEvent(con.id, req.Event, func(msg interface{}) {
log.Notice("New event subscription", "id", wsc.id, "event", req.Event)
wsc.evsw.AddListenerForEvent(wsc.id, req.Event, func(msg interface{}) {
resp := WSResponse{
Event: req.Event,
Data: msg,
}
con.safeWrite(resp)
wsc.writeResponse(resp)
})
case "unsubscribe":
if req.Event != "" {
con.evsw.RemoveListenerForEvent(req.Event, con.id)
wsc.evsw.RemoveListenerForEvent(req.Event, wsc.id)
} else {
con.evsw.RemoveListener(con.id)
wsc.evsw.RemoveListener(wsc.id)
}
default:
con.safeWrite(WSResponse{Error: "Unknown request type: " + req.Type})
wsc.writeResponse(WSResponse{Error: "Unknown request type: " + req.Type})
}
}
}
}
// receives on a write channel and writes out on the socket
func (con *WSConnection) write() {
defer con.wsConn.Close()
func (wsc *WSConnection) writeRoutine() {
defer wsc.baseConn.Close()
n, err := new(int64), new(error)
for {
select {
case msg := <-con.writeChan:
case <-wsc.Quit:
return
case msg := <-wsc.writeChan:
buf := new(bytes.Buffer)
binary.WriteJSON(msg, buf, n, err)
if *err != nil {
log.Error("Failed to marshal WSResponse to JSON", "error", err)
} else {
if err := con.wsConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil {
wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * WSWriteTimeoutSeconds))
if err := wsc.baseConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil {
log.Warn("Failed to write response on websocket", "error", err)
con.Stop()
wsc.Stop()
return
}
}
case <-con.Quit:
return
}
}
}
//----------------------------------------
// main manager for all websocket connections
// holds the event switch
type WebsocketManager struct {
@ -373,6 +392,7 @@ func NewWebsocketManager(evsw *events.EventSwitch) *WebsocketManager {
}
}
// Upgrade the request/response (via http.Hijack) and starts the WSConnection.
func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Request) {
wsConn, err := wm.Upgrade(w, r, nil)
if err != nil {
@ -385,7 +405,7 @@ func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Requ
con := NewWSConnection(wsConn)
log.Notice("New websocket connection", "origin", con.id)
con.SetEventSwitch(wm.evsw)
con.Start()
con.Start() // Blocking
}
// rpc.websocket


+ 30
- 10
rpc/test/ws_helpers.go View File

@ -11,6 +11,7 @@ import (
"github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/gorilla/websocket"
"github.com/tendermint/tendermint/binary"
_ "github.com/tendermint/tendermint/config/tendermint_test"
"github.com/tendermint/tendermint/rpc/server"
"github.com/tendermint/tendermint/rpc/types"
"github.com/tendermint/tendermint/types"
)
@ -55,13 +56,32 @@ func unsubscribe(t *testing.T, con *websocket.Conn, eventid string) {
// wait for an event; do things that might trigger events, and check them when they are received
func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeout bool, f func(), check func(string, []byte) error) {
// go routine to wait for webscoket msg
gch := make(chan []byte) // good channel
ech := make(chan error) // error channel
goodCh := make(chan []byte)
errCh := make(chan error)
quitCh := make(chan struct{})
defer close(quitCh)
// Write pings repeatedly
// TODO: Maybe move this out to something that manages the con?
go func() {
pingTicker := time.NewTicker((time.Second * rpcserver.WSReadTimeoutSeconds) / 2)
for {
select {
case <-quitCh:
pingTicker.Stop()
return
case <-pingTicker.C:
con.WriteControl(websocket.PingMessage, []byte("whatevs"), time.Now().Add(time.Second))
}
}
}()
// Read message
go func() {
for {
_, p, err := con.ReadMessage()
if err != nil {
ech <- err
errCh <- err
break
} else {
// if the event id isnt what we're waiting on
@ -70,11 +90,11 @@ func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeou
Event string `json:"event"`
}
if err := json.Unmarshal(p, &response); err != nil {
ech <- err
errCh <- err
break
}
if response.Event == eventid {
gch <- p
goodCh <- p
break
}
}
@ -84,17 +104,17 @@ func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeou
// do stuff (transactions)
f()
// wait for an event or 10 seconds
ticker := time.Tick(10 * time.Second)
// wait for an event or timeout
timeout := time.NewTimer(10 * time.Second)
select {
case <-ticker:
case <-timeout.C:
if dieOnTimeout {
con.Close()
t.Fatalf("%s event was not received in time", eventid)
}
// else that's great, we didn't hear the event
// and we shouldn't have
case p := <-gch:
case p := <-goodCh:
if dieOnTimeout {
// message was received and expected
// run the check
@ -106,7 +126,7 @@ func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeou
con.Close()
t.Fatalf("%s event was not expected", eventid)
}
case err := <-ech:
case err := <-errCh:
t.Fatal(err)
}
}


Loading…
Cancel
Save