From 53f74d052fbaacac009de4d4684424180ab1db73 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Mon, 7 Dec 2015 17:54:19 -0800 Subject: [PATCH] Fix deadlock bug in websocket client impl --- benchmarks/simu/counter.go | 44 ++++++++ rpc/client/{client.go => http_client.go} | 8 +- rpc/client/log.go | 1 - rpc/client/ws_client.go | 126 +++++++++++++++++++++++ rpc/server/handlers.go | 14 +-- rpc/types/types.go | 9 ++ 6 files changed, 191 insertions(+), 11 deletions(-) create mode 100644 benchmarks/simu/counter.go rename rpc/client/{client.go => http_client.go} (82%) create mode 100644 rpc/client/ws_client.go diff --git a/benchmarks/simu/counter.go b/benchmarks/simu/counter.go new file mode 100644 index 000000000..8b1b3e5e2 --- /dev/null +++ b/benchmarks/simu/counter.go @@ -0,0 +1,44 @@ +package main + +import ( + "fmt" + + "github.com/gorilla/websocket" + . "github.com/tendermint/go-common" + "github.com/tendermint/go-wire" + "github.com/tendermint/tendermint/rpc/client" + // ctypes "github.com/tendermint/tendermint/rpc/core/types" + "github.com/tendermint/tendermint/rpc/types" +) + +func main() { + ws := rpcclient.NewWSClient("ws://127.0.0.1:46657/websocket") + // ws := rpcclient.NewWSClient("ws://104.236.69.128:46657/websocket") + _, err := ws.Start() + if err != nil { + Exit(err.Error()) + } + + // Read a bunch of responses + go func() { + for { + res, ok := <-ws.ResultsCh + if !ok { + break + } + fmt.Println("Received response", res) + } + }() + + // Make a bunch of requests + request := rpctypes.NewRPCRequest("fakeid", "status", nil) + for i := 0; ; i++ { + reqBytes := wire.JSONBytes(request) + err := ws.WriteMessage(websocket.TextMessage, reqBytes) + if err != nil { + Exit(err.Error()) + } + } + + ws.Stop() +} diff --git a/rpc/client/client.go b/rpc/client/http_client.go similarity index 82% rename from rpc/client/client.go rename to rpc/client/http_client.go index 80e15744e..6cc275d02 100644 --- a/rpc/client/client.go +++ b/rpc/client/http_client.go @@ -9,12 +9,12 @@ import ( . "github.com/tendermint/go-common" "github.com/tendermint/go-wire" - . "github.com/tendermint/tendermint/rpc/types" + "github.com/tendermint/tendermint/rpc/types" ) -func Call(remote string, method string, params []interface{}, dest interface{}) (interface{}, error) { +func CallHTTP(remote string, method string, params []interface{}, dest interface{}) (interface{}, error) { // Make request and get responseBytes - request := RPCRequest{ + request := rpctypes.RPCRequest{ JSONRPC: "2.0", Method: method, Params: params, @@ -35,7 +35,7 @@ func Call(remote string, method string, params []interface{}, dest interface{}) log.Info(Fmt("RPC response: %v", string(responseBytes))) // Parse response into JSONResponse - response := RPCResponse{} + response := rpctypes.RPCResponse{} err = json.Unmarshal(responseBytes, &response) if err != nil { return dest, err diff --git a/rpc/client/log.go b/rpc/client/log.go index 465a5c445..8b33e2f10 100644 --- a/rpc/client/log.go +++ b/rpc/client/log.go @@ -1,4 +1,3 @@ - package rpcclient import ( diff --git a/rpc/client/ws_client.go b/rpc/client/ws_client.go new file mode 100644 index 000000000..2ca2f44fc --- /dev/null +++ b/rpc/client/ws_client.go @@ -0,0 +1,126 @@ +package rpcclient + +import ( + "net/http" + "strings" + "time" + + "github.com/gorilla/websocket" + . "github.com/tendermint/go-common" + "github.com/tendermint/go-wire" + ctypes "github.com/tendermint/tendermint/rpc/core/types" + "github.com/tendermint/tendermint/rpc/types" +) + +const ( + wsEventsChannelCapacity = 10 + wsResultsChannelCapacity = 10 + wsWriteTimeoutSeconds = 10 +) + +type WSClient struct { + QuitService + Address string + *websocket.Conn + EventsCh chan ctypes.ResultEvent // closes upon WSClient.Stop() + ResultsCh chan ctypes.Result // closes upon WSClient.Stop() +} + +// create a new connection +func NewWSClient(addr string) *WSClient { + wsClient := &WSClient{ + Address: addr, + Conn: nil, + EventsCh: make(chan ctypes.ResultEvent, wsEventsChannelCapacity), + ResultsCh: make(chan ctypes.Result, wsResultsChannelCapacity), + } + wsClient.QuitService = *NewQuitService(log, "WSClient", wsClient) + return wsClient +} + +func (wsc *WSClient) OnStart() error { + wsc.QuitService.OnStart() + err := wsc.dial() + if err != nil { + return err + } + go wsc.receiveEventsRoutine() + return nil +} + +func (wsc *WSClient) dial() error { + // Dial + dialer := websocket.DefaultDialer + rHeader := http.Header{} + con, _, err := dialer.Dial(wsc.Address, rHeader) + if err != nil { + return err + } + // Set the ping/pong handlers + con.SetPingHandler(func(m string) error { + // NOTE: https://github.com/gorilla/websocket/issues/97 + go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) + return nil + }) + con.SetPongHandler(func(m string) error { + // NOTE: https://github.com/gorilla/websocket/issues/97 + return nil + }) + wsc.Conn = con + return nil +} + +func (wsc *WSClient) OnStop() { + wsc.QuitService.OnStop() + // EventsCh and ResultsCh are closed in receiveEventsRoutine. +} + +func (wsc *WSClient) receiveEventsRoutine() { + for { + _, data, err := wsc.ReadMessage() + if err != nil { + log.Info("WSClient failed to read message", "error", err, "data", string(data)) + wsc.Stop() + break + } else { + var response ctypes.Response + wire.ReadJSON(&response, data, &err) + if err != nil { + log.Info("WSClient failed to parse message", "error", err) + wsc.Stop() + break + } + if strings.HasSuffix(response.ID, "#event") { + wsc.EventsCh <- *response.Result.(*ctypes.ResultEvent) + } else { + wsc.ResultsCh <- response.Result + } + } + } + + // Cleanup + close(wsc.EventsCh) + close(wsc.ResultsCh) +} + +// subscribe to an event +func (wsc *WSClient) Subscribe(eventid string) error { + err := wsc.WriteJSON(rpctypes.RPCRequest{ + JSONRPC: "2.0", + ID: "", + Method: "subscribe", + Params: []interface{}{eventid}, + }) + return err +} + +// unsubscribe from an event +func (wsc *WSClient) Unsubscribe(eventid string) error { + err := wsc.WriteJSON(rpctypes.RPCRequest{ + JSONRPC: "2.0", + ID: "", + Method: "unsubscribe", + Params: []interface{}{eventid}, + }) + return err +} diff --git a/rpc/server/handlers.go b/rpc/server/handlers.go index bdb942ffa..c26b60fe7 100644 --- a/rpc/server/handlers.go +++ b/rpc/server/handlers.go @@ -252,11 +252,13 @@ func (wsc *WSConnection) OnStart() error { wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds) wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds) wsc.baseConn.SetPingHandler(func(m string) error { - wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) + // NOTE: https://github.com/gorilla/websocket/issues/97 + go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds) return nil }) wsc.baseConn.SetPongHandler(func(m string) error { + // NOTE: https://github.com/gorilla/websocket/issues/97 wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds) return nil }) @@ -287,13 +289,12 @@ func (wsc *WSConnection) readTimeoutRoutine() { } } -// Attempt to write response to writeChan and record failures +// Block trying to write to writeChan until service stops. func (wsc *WSConnection) writeRPCResponse(resp RPCResponse) { select { + case <-wsc.Quit: + return case wsc.writeChan <- resp: - default: - log.Notice("Stopping connection due to writeChan overflow", "id", wsc.id) - wsc.Stop() // writeChan capacity exceeded, error. } } @@ -412,7 +413,8 @@ func (wsc *WSConnection) writeRoutine() { log.Error("Failed to marshal RPCResponse to JSON", "error", err) } else { wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds)) - if err = wsc.baseConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil { + bufBytes := buf.Bytes() + if err = wsc.baseConn.WriteMessage(websocket.TextMessage, bufBytes); err != nil { log.Warn("Failed to write response on websocket", "error", err) wsc.Stop() return diff --git a/rpc/types/types.go b/rpc/types/types.go index c93e6735b..ba6ff0d6e 100644 --- a/rpc/types/types.go +++ b/rpc/types/types.go @@ -7,6 +7,15 @@ type RPCRequest struct { Params []interface{} `json:"params"` } +func NewRPCRequest(id string, method string, params []interface{}) RPCRequest { + return RPCRequest{ + JSONRPC: "2.0", + ID: id, + Method: method, + Params: params, + } +} + type RPCResponse struct { JSONRPC string `json:"jsonrpc"` ID string `json:"id"`