diff --git a/node/node.go b/node/node.go index 672e384b9..bcb549fc4 100644 --- a/node/node.go +++ b/node/node.go @@ -330,9 +330,9 @@ func (n *Node) startRPC() ([]net.Listener, error) { listeners := make([]net.Listener, len(listenAddrs)) for i, listenAddr := range listenAddrs { mux := http.NewServeMux() - wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw) rpcLogger := n.Logger.With("module", "rpc-server") - wm.SetLogger(rpcLogger) + wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw) + wm.SetLogger(rpcLogger.With("protocol", "websocket")) mux.HandleFunc("/websocket", wm.WebsocketHandler) rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger) listener, err := rpcserver.StartHTTPServer(listenAddr, mux, rpcLogger) diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index 19c066099..6a932caf6 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -19,14 +19,10 @@ import ( ) const ( - // Time allowed to write a message to the server. - writeWait = 10 * time.Second - - // Maximum reconnect attempts - maxReconnectAttempts = 25 - - defaultPongWait = 30 * time.Second - defaultPingPeriod = (defaultPongWait * 9) / 10 + defaultMaxReconnectAttempts = 25 + defaultWriteWait = 0 + defaultReadWait = 0 + defaultPingPeriod = 0 ) // WSClient is a WebSocket client. The methods of WSClient are safe for use by @@ -60,10 +56,16 @@ type WSClient struct { sentLastPingAt time.Time reconnecting bool - // Time allowed to read the next pong message from the server. - pongWait time.Duration + // Maximum reconnect attempts (0 or greater; default: 25). + maxReconnectAttempts int + + // Time allowed to write a message to the server. 0 means block until operation succeeds. + writeWait time.Duration + + // Time allowed to read the next message from the server. 0 means block until operation succeeds. + readWait time.Duration - // Send pings to server with this period. Must be less than pongWait. + // Send pings to server with this period. Must be less than readWait. If 0, no pings will be sent. pingPeriod time.Duration } @@ -77,7 +79,10 @@ func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSCli Dialer: dialer, Endpoint: endpoint, PingPongLatencyTimer: metrics.NewTimer(), - pongWait: defaultPongWait, + + maxReconnectAttempts: defaultMaxReconnectAttempts, + readWait: defaultReadWait, + writeWait: defaultWriteWait, pingPeriod: defaultPingPeriod, } c.BaseService = *cmn.NewBaseService(nil, "WSClient", c) @@ -87,15 +92,27 @@ func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSCli return c } -// PingPong allows changing ping period and pong wait time. If ping period -// greater or equal to pong wait time, panic will be thrown. -func PingPong(pingPeriod, pongWait time.Duration) func(*WSClient) { +func MaxReconnectAttempts(max int) func(*WSClient) { + return func(c *WSClient) { + c.maxReconnectAttempts = max + } +} + +func ReadWait(readWait time.Duration) func(*WSClient) { + return func(c *WSClient) { + c.readWait = readWait + } +} + +func WriteWait(writeWait time.Duration) func(*WSClient) { + return func(c *WSClient) { + c.writeWait = writeWait + } +} + +func PingPeriod(pingPeriod time.Duration) func(*WSClient) { return func(c *WSClient) { - if pingPeriod >= pongWait { - panic(fmt.Sprintf("ping period (%v) must be less than pong wait time (%v)", pingPeriod, pongWait)) - } c.pingPeriod = pingPeriod - c.pongWait = pongWait } } @@ -234,7 +251,7 @@ func (c *WSClient) reconnect() error { attempt++ - if attempt > maxReconnectAttempts { + if attempt > c.maxReconnectAttempts { return errors.Wrap(err, "reached maximum reconnect attempts") } } @@ -250,7 +267,9 @@ func (c *WSClient) startReadWriteRoutines() { func (c *WSClient) processBacklog() error { select { case request := <-c.backlog: - c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if c.writeWait > 0 { + c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)) + } err := c.conn.WriteJSON(request) if err != nil { c.Logger.Error("failed to resend request", "err", err) @@ -300,7 +319,15 @@ func (c *WSClient) reconnectRoutine() { // The client ensures that there is at most one writer to a connection by // executing all writes from this goroutine. func (c *WSClient) writeRoutine() { - ticker := time.NewTicker(c.pingPeriod) + var ticker *time.Ticker + if c.pingPeriod > 0 { + // ticker with a predefined period + ticker = time.NewTicker(c.pingPeriod) + } else { + // ticker that never fires + ticker = &time.Ticker{C: make(<-chan time.Time)} + } + defer func() { ticker.Stop() c.conn.Close() @@ -310,7 +337,9 @@ func (c *WSClient) writeRoutine() { for { select { case request := <-c.send: - c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if c.writeWait > 0 { + c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)) + } err := c.conn.WriteJSON(request) if err != nil { c.Logger.Error("failed to send request", "err", err) @@ -320,7 +349,9 @@ func (c *WSClient) writeRoutine() { return } case <-ticker.C: - c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if c.writeWait > 0 { + c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)) + } err := c.conn.WriteMessage(websocket.PingMessage, []byte{}) if err != nil { c.Logger.Error("failed to write ping", "err", err) @@ -348,21 +379,25 @@ func (c *WSClient) readRoutine() { c.wg.Done() }() - c.conn.SetReadDeadline(time.Now().Add(c.pongWait)) - c.conn.SetPongHandler(func(string) error { - c.conn.SetReadDeadline(time.Now().Add(c.pongWait)) + // gather latency stats c.mtx.RLock() - c.PingPongLatencyTimer.UpdateSince(c.sentLastPingAt) + t := c.sentLastPingAt c.mtx.RUnlock() + c.PingPongLatencyTimer.UpdateSince(t) + c.Logger.Debug("got pong") return nil }) for { + // reset deadline for every message type (control or data) + if c.readWait > 0 { + c.conn.SetReadDeadline(time.Now().Add(c.readWait)) + } _, data, err := c.conn.ReadMessage() if err != nil { - if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + if !websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) { return } diff --git a/rpc/lib/client/ws_client_test.go b/rpc/lib/client/ws_client_test.go index d1b9764c3..6f39690db 100644 --- a/rpc/lib/client/ws_client_test.go +++ b/rpc/lib/client/ws_client_test.go @@ -11,7 +11,6 @@ import ( "time" "github.com/gorilla/websocket" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tendermint/tmlibs/log" @@ -162,12 +161,6 @@ func TestWSClientReconnectFailure(t *testing.T) { } } -func TestWSClientPingPongOption(t *testing.T) { - assert.Panics(t, func() { - NewWSClient("tcp://localhost:8080", "/websocket", PingPong(2*time.Second, 2*time.Second)) - }) -} - func startClient(t *testing.T, addr net.Addr) *WSClient { c := NewWSClient(addr.String(), "/websocket") _, err := c.Start() diff --git a/rpc/lib/rpc_test.go b/rpc/lib/rpc_test.go index 8bac7aa3c..7415cb36d 100644 --- a/rpc/lib/rpc_test.go +++ b/rpc/lib/rpc_test.go @@ -31,8 +31,6 @@ const ( unixAddr = "unix://" + unixSocket websocketEndpoint = "/websocket/endpoint" - - testPongWait = 2 * time.Second ) type ResultEcho struct { @@ -115,7 +113,7 @@ func setup() { tcpLogger := logger.With("socket", "tcp") mux := http.NewServeMux() server.RegisterRPCFuncs(mux, Routes, tcpLogger) - wm := server.NewWebsocketManager(Routes, nil, server.PingPong((testPongWait*9)/10, testPongWait)) + wm := server.NewWebsocketManager(Routes, nil, server.ReadWait(5*time.Second), server.PingPeriod(1*time.Second)) wm.SetLogger(tcpLogger) mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler) go func() { @@ -364,7 +362,7 @@ func TestWSClientPingPong(t *testing.T) { require.Nil(t, err) defer cl.Stop() - time.Sleep((testPongWait * 11) / 10) + time.Sleep(3 * time.Second) } func randBytes(t *testing.T) []byte { diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index 431ef742f..203a71ff7 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -337,10 +337,10 @@ func nonJsonToArg(ty reflect.Type, arg string) (reflect.Value, error, bool) { // rpc.websocket const ( - writeChanCapacity = 1000 - wsWriteWait = 30 * time.Second // each write times out after this. - defaultWSPongWait = 30 * time.Second - defaultWSPingPeriod = (defaultWSPongWait * 9) / 10 + defaultWSWriteChanCapacity = 1000 + defaultWSWriteWait = 10 * time.Second + defaultWSReadWait = 30 * time.Second + defaultWSPingPeriod = (defaultWSReadWait * 9) / 10 ) // a single websocket connection @@ -349,19 +349,23 @@ const ( type wsConnection struct { cmn.BaseService - remoteAddr string - baseConn *websocket.Conn - writeChan chan types.RPCResponse - readTimeout *time.Timer - pingTicker *time.Ticker + remoteAddr string + baseConn *websocket.Conn + writeChan chan types.RPCResponse funcMap map[string]*RPCFunc evsw events.EventSwitch + // write channel capacity + writeChanCapacity int + + // each write times out after this. + writeWait time.Duration + // Connection times out if we haven't received *anything* in this long, not even pings. - pongWait time.Duration + readWait time.Duration - // Send pings to server with this period. Must be less than pongWait. + // Send pings to server with this period. Must be less than readWait, but greater than zero. pingPeriod time.Duration } @@ -370,13 +374,14 @@ type wsConnection struct { // ping period and pong wait time. func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw events.EventSwitch, options ...func(*wsConnection)) *wsConnection { wsc := &wsConnection{ - remoteAddr: baseConn.RemoteAddr().String(), - baseConn: baseConn, - writeChan: make(chan types.RPCResponse, writeChanCapacity), // error when full. - funcMap: funcMap, - evsw: evsw, - pongWait: defaultWSPongWait, - pingPeriod: defaultWSPingPeriod, + remoteAddr: baseConn.RemoteAddr().String(), + baseConn: baseConn, + funcMap: funcMap, + evsw: evsw, + writeWait: defaultWSWriteWait, + writeChanCapacity: defaultWSWriteChanCapacity, + readWait: defaultWSReadWait, + pingPeriod: defaultWSPingPeriod, } for _, option := range options { option(wsc) @@ -385,69 +390,48 @@ func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw return wsc } -// PingPong allows changing ping period and pong wait time. If ping period -// greater or equal to pong wait time, panic will be thrown. -func PingPong(pingPeriod, pongWait time.Duration) func(*wsConnection) { +func WriteWait(writeWait time.Duration) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.writeWait = writeWait + } +} + +func WriteChanCapacity(cap int) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.writeChanCapacity = cap + } +} + +func ReadWait(readWait time.Duration) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.readWait = readWait + } +} + +func PingPeriod(pingPeriod time.Duration) func(*wsConnection) { return func(wsc *wsConnection) { - if pingPeriod >= pongWait { - panic(fmt.Sprintf("ping period (%v) must be less than pong wait time (%v)", pingPeriod, pongWait)) - } wsc.pingPeriod = pingPeriod - wsc.pongWait = pongWait } } // wsc.Start() blocks until the connection closes. func (wsc *wsConnection) OnStart() error { - wsc.BaseService.OnStart() - - // these must be set before the readRoutine is created, as it may - // call wsc.Stop(), which accesses these timers - wsc.readTimeout = time.NewTimer(wsc.pongWait) - wsc.pingTicker = time.NewTicker(wsc.pingPeriod) + wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity) // Read subscriptions/unsubscriptions to events go wsc.readRoutine() - - // Custom Ping handler to touch readTimeout - wsc.baseConn.SetPingHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(wsWriteWait)) - wsc.readTimeout.Reset(wsc.pongWait) - return nil - }) - wsc.baseConn.SetPongHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - wsc.readTimeout.Reset(wsc.pongWait) - return nil - }) - go wsc.readTimeoutRoutine() - // Write responses, BLOCKING. wsc.writeRoutine() + return nil } func (wsc *wsConnection) OnStop() { - wsc.BaseService.OnStop() if wsc.evsw != nil { wsc.evsw.RemoveListener(wsc.remoteAddr) } - wsc.readTimeout.Stop() - wsc.pingTicker.Stop() - // The write loop closes the websocket connection - // when it exits its loop, and the read loop - // closes the writeChan -} - -func (wsc *wsConnection) readTimeoutRoutine() { - select { - case <-wsc.readTimeout.C: - wsc.Logger.Info("Stopping connection due to read timeout") - wsc.Stop() - case <-wsc.Quit: - return - } + // The write loop closes the websocket connection when it exits its loop, and + // the read loop closes the writeChan. } // Implements WSRPCConnection @@ -487,30 +471,30 @@ func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool { // Read from the socket and subscribe to or unsubscribe from events func (wsc *wsConnection) readRoutine() { + defer func() { + wsc.baseConn.Close() + }() + // Do not close writeChan, to allow WriteRPCResponse() to fail. // defer close(wsc.writeChan) - for { select { case <-wsc.Quit: return default: + // reset deadline for every type of message (control or data) + wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.readWait)) var in []byte - // Do not set a deadline here like below: - // wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.pongWait)) - // The client may not send anything for a while. - // We use `readTimeout` to handle read timeouts. _, in, err := wsc.baseConn.ReadMessage() if err != nil { if websocket.IsCloseError(err, websocket.CloseNormalClosure) { - wsc.Logger.Info("Client closed the connection", "remote", wsc.remoteAddr) + wsc.Logger.Info("Client closed the connection") } else { - wsc.Logger.Info("Failed to read from connection", "remote", wsc.remoteAddr, "err", err.Error()) + wsc.Logger.Error("Failed to read request", "err", err) } wsc.Stop() return } - wsc.readTimeout.Reset(wsc.pongWait) var request types.RPCRequest err = json.Unmarshal(in, &request) @@ -558,15 +542,33 @@ func (wsc *wsConnection) readRoutine() { // receives on a write channel and writes out on the socket func (wsc *wsConnection) writeRoutine() { - defer wsc.baseConn.Close() + pingTicker := time.NewTicker(wsc.pingPeriod) + defer func() { + pingTicker.Stop() + wsc.baseConn.Close() + }() + + // https://github.com/gorilla/websocket/issues/97 + pongs := make(chan string, 1) + wsc.baseConn.SetPingHandler(func(m string) error { + select { + case pongs <- m: + default: + } + return nil + }) + for { select { - case <-wsc.Quit: - return - case <-wsc.pingTicker.C: + case m := <-pongs: + err := wsc.writeMessageWithDeadline(websocket.PongMessage, []byte(m)) + if err != nil { + wsc.Logger.Info("Failed to write pong (client may disconnect)", "err", err) + } + case <-pingTicker.C: err := wsc.writeMessageWithDeadline(websocket.PingMessage, []byte{}) if err != nil { - wsc.Logger.Error("Failed to write ping message on websocket", "err", err) + wsc.Logger.Error("Failed to write ping", "err", err) wsc.Stop() return } @@ -576,11 +578,13 @@ func (wsc *wsConnection) writeRoutine() { wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "err", err) } else { if err = wsc.writeMessageWithDeadline(websocket.TextMessage, jsonBytes); err != nil { - wsc.Logger.Error("Failed to write response on websocket", "err", err) + wsc.Logger.Error("Failed to write response", "err", err) wsc.Stop() return } } + case <-wsc.Quit: + return } } } @@ -588,7 +592,7 @@ func (wsc *wsConnection) writeRoutine() { // All writes to the websocket must (re)set the write deadline. // If some writes don't set it while others do, they may timeout incorrectly (https://github.com/tendermint/tendermint/issues/553) func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error { - wsc.baseConn.SetWriteDeadline(time.Now().Add(wsWriteWait)) + wsc.baseConn.SetWriteDeadline(time.Now().Add(wsc.writeWait)) return wsc.baseConn.WriteMessage(msgType, msg) } @@ -610,10 +614,8 @@ func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch, w funcMap: funcMap, evsw: evsw, Upgrader: websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, CheckOrigin: func(r *http.Request) bool { - // TODO + // TODO ??? return true }, }, @@ -637,7 +639,7 @@ func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Requ // register connection con := NewWSConnection(wsConn, wm.funcMap, wm.evsw, wm.wsConnOptions...) - con.SetLogger(wm.logger) + con.SetLogger(wm.logger.With("remote", wsConn.RemoteAddr())) wm.logger.Info("New websocket connection", "remote", con.remoteAddr) con.Start() // Blocking }