From d578f7f81edde980fa2cdd15a3220f667e8f697e Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 31 Jul 2017 18:44:46 -0400 Subject: [PATCH 01/16] biff up WS client What's new: - auto reconnect - ping/pong - colored tests --- benchmarks/simu/counter.go | 32 +-- rpc/client/httpclient.go | 7 +- rpc/lib/client/ws_client.go | 347 +++++++++++++++++++++++-------- rpc/lib/client/ws_client_test.go | 204 ++++++++++++++++++ rpc/lib/rpc_test.go | 69 ++++-- rpc/lib/test/integration_test.sh | 3 + rpc/lib/types/types.go | 13 ++ 7 files changed, 554 insertions(+), 121 deletions(-) create mode 100644 rpc/lib/client/ws_client_test.go diff --git a/benchmarks/simu/counter.go b/benchmarks/simu/counter.go index e9502f956..d26f5e6f7 100644 --- a/benchmarks/simu/counter.go +++ b/benchmarks/simu/counter.go @@ -1,30 +1,28 @@ package main import ( + "context" "encoding/binary" "time" //"encoding/hex" "fmt" - "github.com/gorilla/websocket" - "github.com/tendermint/go-wire" - _ "github.com/tendermint/tendermint/rpc/core/types" // Register RPCResponse > Result types - "github.com/tendermint/tendermint/rpc/lib/client" - "github.com/tendermint/tendermint/rpc/lib/types" - . "github.com/tendermint/tmlibs/common" + rpcclient "github.com/tendermint/tendermint/rpc/lib/client" + cmn "github.com/tendermint/tmlibs/common" ) func main() { - ws := rpcclient.NewWSClient("127.0.0.1:46657", "/websocket") - _, err := ws.Start() + wsc := rpcclient.NewWSClient("127.0.0.1:46657", "/websocket") + _, err := wsc.Start() if err != nil { - Exit(err.Error()) + cmn.Exit(err.Error()) } + defer wsc.Stop() // Read a bunch of responses go func() { for { - _, ok := <-ws.ResultsCh + _, ok := <-wsc.ResultsCh if !ok { break } @@ -37,24 +35,14 @@ func main() { for i := 0; ; i++ { binary.BigEndian.PutUint64(buf, uint64(i)) //txBytes := hex.EncodeToString(buf[:n]) - request, err := rpctypes.MapToRequest("fakeid", - "broadcast_tx", - map[string]interface{}{"tx": buf[:8]}) - if err != nil { - Exit(err.Error()) - } - reqBytes := wire.JSONBytes(request) - //fmt.Println("!!", string(reqBytes)) fmt.Print(".") - err = ws.WriteMessage(websocket.TextMessage, reqBytes) + err = wsc.Call(context.TODO(), "broadcast_tx", map[string]interface{}{"tx": buf[:8]}) if err != nil { - Exit(err.Error()) + cmn.Exit(err.Error()) } if i%1000 == 0 { fmt.Println(i) } time.Sleep(time.Microsecond * 1000) } - - ws.Stop() } diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index cb7149406..9623229e6 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -1,13 +1,14 @@ package client import ( + "context" "encoding/json" "fmt" "github.com/pkg/errors" data "github.com/tendermint/go-wire/data" ctypes "github.com/tendermint/tendermint/rpc/core/types" - "github.com/tendermint/tendermint/rpc/lib/client" + rpcclient "github.com/tendermint/tendermint/rpc/lib/client" "github.com/tendermint/tendermint/types" events "github.com/tendermint/tmlibs/events" ) @@ -349,14 +350,14 @@ func (w *WSEvents) parseEvent(data []byte) (err error) { // no way of exposing these failures, so we panic. // is this right? or silently ignore??? func (w *WSEvents) subscribe(event string) { - err := w.ws.Subscribe(event) + err := w.ws.Subscribe(context.TODO(), event) if err != nil { panic(err) } } func (w *WSEvents) unsubscribe(event string) { - err := w.ws.Unsubscribe(event) + err := w.ws.Unsubscribe(context.TODO(), event) if err != nil { panic(err) } diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index f018de67c..2e4b8557d 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -1,9 +1,13 @@ package rpcclient import ( + "context" "encoding/json" + "fmt" + "math" "net" "net/http" + "sync" "time" "github.com/gorilla/websocket" @@ -13,148 +17,325 @@ import ( ) const ( - wsResultsChannelCapacity = 10 - wsErrorsChannelCapacity = 1 - wsWriteTimeoutSeconds = 10 + // Time allowed to write a message to the peer. + writeWait = 10 * time.Second + + // Time allowed to read the next pong message from the server. + pongWait = 30 * time.Second + + // Send pings to server with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 + + // Maximum reconnect attempts + maxReconnectAttempts = 25 ) type WSClient struct { cmn.BaseService + + conn *websocket.Conn + Address string // IP:PORT or /path/to/socket Endpoint string // /websocket/url/endpoint Dialer func(string, string) (net.Conn, error) - *websocket.Conn - ResultsCh chan json.RawMessage // closes upon WSClient.Stop() - ErrorsCh chan error // closes upon WSClient.Stop() + + // user facing channels, closed only when the client is being stopped. + ResultsCh chan json.RawMessage + ErrorsCh chan error + + // internal channels + send chan types.RPCRequest // user requests + backlog chan types.RPCRequest // stores a single user request received during a conn failure + reconnectAfter chan error // reconnect requests + receiveRoutineQuit chan struct{} // a way for receiveRoutine to close writeRoutine + + wg sync.WaitGroup } -// create a new connection +// NewWSClient returns a new client. func NewWSClient(remoteAddr, endpoint string) *WSClient { addr, dialer := makeHTTPDialer(remoteAddr) wsClient := &WSClient{ Address: addr, Dialer: dialer, Endpoint: endpoint, - Conn: nil, } wsClient.BaseService = *cmn.NewBaseService(nil, "WSClient", wsClient) return wsClient } -func (wsc *WSClient) String() string { - return wsc.Address + ", " + wsc.Endpoint +// String returns WS client full address. +func (c *WSClient) String() string { + return fmt.Sprintf("%s (%s)", c.Address, c.Endpoint) } -// OnStart implements cmn.BaseService interface -func (wsc *WSClient) OnStart() error { - wsc.BaseService.OnStart() - err := wsc.dial() +// OnStart implements cmn.Service by dialing a server and creating read and +// write routines. +func (c *WSClient) OnStart() error { + err := c.dial() if err != nil { return err } - wsc.ResultsCh = make(chan json.RawMessage, wsResultsChannelCapacity) - wsc.ErrorsCh = make(chan error, wsErrorsChannelCapacity) - go wsc.receiveEventsRoutine() + + c.ResultsCh = make(chan json.RawMessage) + c.ErrorsCh = make(chan error) + + c.send = make(chan types.RPCRequest) + // 1 additional error may come from the read/write + // goroutine depending on which failed first. + c.reconnectAfter = make(chan error, 1) + // capacity for 1 request. a user won't be able to send more because the send + // channel is unbuffered. + c.backlog = make(chan types.RPCRequest, 1) + + c.startReadWriteRoutines() + go c.reconnectRoutine() + return nil } -// OnReset implements cmn.BaseService interface -func (wsc *WSClient) OnReset() error { - return nil +// OnStop implements cmn.Service. +func (c *WSClient) OnStop() {} + +// Stop overrides cmn.Service#Stop. There is no other way to wait until Quit +// channel is closed. +func (c *WSClient) Stop() bool { + success := c.BaseService.Stop() + // only close user-facing channels when we can't write to them + c.wg.Wait() + close(c.ResultsCh) + close(c.ErrorsCh) + return success } -func (wsc *WSClient) dial() error { +// Send asynchronously sends the given RPCRequest to the server. Results will +// be available on ResultsCh, errors, if any, on ErrorsCh. +func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error { + select { + case c.send <- request: + c.Logger.Info("sent a request", "req", request) + return nil + case <-ctx.Done(): + return ctx.Err() + } +} - // Dial +// Call asynchronously calls a given method by sending an RPCRequest to the +// server. Results will be available on ResultsCh, errors, if any, on ErrorsCh. +func (c *WSClient) Call(ctx context.Context, method string, params map[string]interface{}) error { + request, err := types.MapToRequest("", method, params) + if err != nil { + return err + } + return c.Send(ctx, request) +} + +// CallWithArrayParams asynchronously calls a given method by sending an +// RPCRequest to the server. Results will be available on ResultsCh, errors, if +// any, on ErrorsCh. +func (c *WSClient) CallWithArrayParams(ctx context.Context, method string, params []interface{}) error { + request, err := types.ArrayToRequest("", method, params) + if err != nil { + return err + } + return c.Send(ctx, request) +} + +/////////////////////////////////////////////////////////////////////////////// +// Private methods + +func (c *WSClient) dial() error { dialer := &websocket.Dialer{ - NetDial: wsc.Dialer, + NetDial: c.Dialer, Proxy: http.ProxyFromEnvironment, } rHeader := http.Header{} - con, _, err := dialer.Dial("ws://"+wsc.Address+wsc.Endpoint, rHeader) + conn, _, err := dialer.Dial("ws://"+c.Address+c.Endpoint, rHeader) if err != nil { return err } - // Set the ping/pong handlers - con.SetPingHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) - return nil - }) - con.SetPongHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - return nil - }) - wsc.Conn = con + c.conn = conn return nil } -// OnStop implements cmn.BaseService interface -func (wsc *WSClient) OnStop() { - wsc.BaseService.OnStop() - wsc.Conn.Close() - // ResultsCh/ErrorsCh is closed in receiveEventsRoutine. -} +// reconnect tries to redial up to maxReconnectAttempts with exponential +// backoff. +func (c *WSClient) reconnect() error { + attempt := 0 -func (wsc *WSClient) receiveEventsRoutine() { for { - _, data, err := wsc.ReadMessage() + c.Logger.Info("reconnecting", "attempt", attempt+1) + + d := time.Duration(math.Exp2(float64(attempt))) + time.Sleep(d * time.Second) + + err := c.dial() if err != nil { - wsc.Logger.Info("WSClient failed to read message", "err", err, "data", string(data)) - wsc.Stop() - break + c.Logger.Error("failed to redial", "err", err) } else { - var response types.RPCResponse - err := json.Unmarshal(data, &response) + c.Logger.Info("reconnected") + return nil + } + + attempt++ + + if attempt > maxReconnectAttempts { + return errors.Wrap(err, "reached maximum reconnect attempts") + } + } +} + +func (c *WSClient) startReadWriteRoutines() { + c.wg.Add(2) + c.receiveRoutineQuit = make(chan struct{}) + go c.receiveRoutine() + go c.writeRoutine() +} + +func (c *WSClient) reconnectRoutine() { + for { + select { + case originalError := <-c.reconnectAfter: + // wait until writeRoutine and receiveRoutine finish + c.wg.Wait() + err := c.reconnect() + if err != nil { + c.Logger.Error("failed to reconnect", "err", err, "original_err", originalError) + c.Stop() + return + } else { + // drain reconnectAfter + LOOP: + for { + select { + case <-c.reconnectAfter: + default: + break LOOP + } + } + c.startReadWriteRoutines() + return + } + case <-c.Quit: + return + } + } +} + +// The client ensures that there is at most one writer to a connection by +// executing all writes from this goroutine. +func (c *WSClient) writeRoutine() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + c.conn.Close() + c.wg.Done() + }() + + for { + select { + case request := <-c.backlog: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + err := c.conn.WriteJSON(request) if err != nil { - wsc.Logger.Info("WSClient failed to parse message", "err", err, "data", string(data)) - wsc.ErrorsCh <- err - continue + c.Logger.Error("failed to resend request", "err", err) + c.reconnectAfter <- err + // add request to the backlog, so we don't lose it + c.backlog <- request + return } - if response.Error != "" { - wsc.ErrorsCh <- errors.Errorf(response.Error) - continue + c.Logger.Info("resend a request", "req", request) + case request := <-c.send: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + err := c.conn.WriteJSON(request) + if err != nil { + c.Logger.Error("failed to send request", "err", err) + c.reconnectAfter <- err + // add request to the backlog, so we don't lose it + c.backlog <- request + return + } + case <-ticker.C: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + err := c.conn.WriteMessage(websocket.PingMessage, []byte{}) + if err != nil { + c.Logger.Error("failed to write ping", "err", err) + c.reconnectAfter <- err + return } - wsc.ResultsCh <- *response.Result + c.Logger.Debug("sent ping") + case <-c.receiveRoutineQuit: + return + case <-c.Quit: + c.conn.WriteMessage(websocket.CloseMessage, []byte{}) + return } } - // this must be modified in the same go-routine that reads from the - // connection to avoid race conditions - wsc.Conn = nil +} + +// The client ensures that there is at most one reader to a connection by +// executing all reads from this goroutine. +func (c *WSClient) receiveRoutine() { + defer func() { + c.conn.Close() + c.wg.Done() + }() - // Cleanup - close(wsc.ResultsCh) - close(wsc.ErrorsCh) + c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.conn.SetPongHandler(func(string) error { + c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.Logger.Debug("got pong") + return nil + }) + for { + _, data, err := c.conn.ReadMessage() + if err != nil { + if !websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { + return + } + + c.Logger.Error("failed to read response", "err", err) + close(c.receiveRoutineQuit) + c.reconnectAfter <- err + return + } + + var response types.RPCResponse + err = json.Unmarshal(data, &response) + if err != nil { + c.Logger.Error("failed to parse response", "err", err, "data", string(data)) + c.ErrorsCh <- err + continue + } + if response.Error != "" { + c.ErrorsCh <- errors.Errorf(response.Error) + continue + } + c.Logger.Info("got response", "resp", response.Result) + c.ResultsCh <- *response.Result + } } +/////////////////////////////////////////////////////////////////////////////// +// Predefined methods + // Subscribe to an event. Note the server must have a "subscribe" route // defined. -func (wsc *WSClient) Subscribe(eventid string) error { - params := map[string]interface{}{"event": eventid} - request, err := types.MapToRequest("", "subscribe", params) - if err == nil { - err = wsc.WriteJSON(request) - } - return err +func (c *WSClient) Subscribe(ctx context.Context, eventType string) error { + params := map[string]interface{}{"event": eventType} + return c.Call(ctx, "subscribe", params) } // Unsubscribe from an event. Note the server must have a "unsubscribe" route // defined. -func (wsc *WSClient) Unsubscribe(eventid string) error { - params := map[string]interface{}{"event": eventid} - request, err := types.MapToRequest("", "unsubscribe", params) - if err == nil { - err = wsc.WriteJSON(request) - } - return err +func (c *WSClient) Unsubscribe(ctx context.Context, eventType string) error { + params := map[string]interface{}{"event": eventType} + return c.Call(ctx, "unsubscribe", params) } -// Call asynchronously calls a given method by sending an RPCRequest to the -// server. Results will be available on ResultsCh, errors, if any, on ErrorsCh. -func (wsc *WSClient) Call(method string, params map[string]interface{}) error { - request, err := types.MapToRequest("", method, params) - if err == nil { - err = wsc.WriteJSON(request) - } - return err +// UnsubscribeAll from all. Note the server must have a "unsubscribe_all" route +// defined. +func (c *WSClient) UnsubscribeAll(ctx context.Context) error { + params := map[string]interface{}{} + return c.Call(ctx, "unsubscribe_all", params) } diff --git a/rpc/lib/client/ws_client_test.go b/rpc/lib/client/ws_client_test.go new file mode 100644 index 000000000..6a72ad28d --- /dev/null +++ b/rpc/lib/client/ws_client_test.go @@ -0,0 +1,204 @@ +package rpcclient + +import ( + "context" + "encoding/json" + "net" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/gorilla/websocket" + "github.com/stretchr/testify/require" + "github.com/tendermint/tmlibs/log" + + types "github.com/tendermint/tendermint/rpc/lib/types" +) + +type myHandler struct { + closeConnAfterRead bool + mtx sync.RWMutex +} + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +func (h *myHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + panic(err) + } + for { + messageType, _, err := conn.ReadMessage() + if err != nil { + return + } + + h.mtx.RLock() + if h.closeConnAfterRead { + conn.Close() + } + h.mtx.RUnlock() + + res := json.RawMessage(`{}`) + emptyRespBytes, _ := json.Marshal(types.RPCResponse{Result: &res}) + if err := conn.WriteMessage(messageType, emptyRespBytes); err != nil { + return + } + } +} + +func TestWSClientReconnectsAfterReadFailure(t *testing.T) { + var wg sync.WaitGroup + + // start server + h := &myHandler{} + s := httptest.NewServer(h) + defer s.Close() + + c := startClient(t, s.Listener.Addr()) + defer c.Stop() + + wg.Add(1) + go func() { + for { + select { + case res := <-c.ResultsCh: + if res != nil { + wg.Done() + } + case err := <-c.ErrorsCh: + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + case <-c.Quit: + return + } + } + }() + + h.mtx.Lock() + h.closeConnAfterRead = true + h.mtx.Unlock() + + // results in error + call(t, "a", c) + + // expect to reconnect almost immediately + time.Sleep(10 * time.Millisecond) + h.mtx.Lock() + h.closeConnAfterRead = false + h.mtx.Unlock() + + // should succeed + call(t, "b", c) + + wg.Wait() +} + +func TestWSClientReconnectsAfterWriteFailure(t *testing.T) { + var wg sync.WaitGroup + + // start server + h := &myHandler{} + s := httptest.NewServer(h) + + c := startClient(t, s.Listener.Addr()) + defer c.Stop() + + wg.Add(2) + go func() { + for { + select { + case res := <-c.ResultsCh: + if res != nil { + wg.Done() + } + case err := <-c.ErrorsCh: + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + case <-c.Quit: + return + } + } + }() + + // hacky way to abort the connection before write + c.conn.Close() + + // results in error, the client should resend on reconnect + call(t, "a", c) + + // expect to reconnect almost immediately + time.Sleep(10 * time.Millisecond) + + // should succeed + call(t, "b", c) + + wg.Wait() +} + +func TestWSClientReconnectFailure(t *testing.T) { + // start server + h := &myHandler{} + s := httptest.NewServer(h) + + c := startClient(t, s.Listener.Addr()) + defer c.Stop() + + go func() { + for { + select { + case <-c.ResultsCh: + case <-c.ErrorsCh: + case <-c.Quit: + return + } + } + }() + + // hacky way to abort the connection before write + c.conn.Close() + s.Close() + + // results in error + call(t, "a", c) + + // expect to reconnect almost immediately + time.Sleep(10 * time.Millisecond) + + done := make(chan struct{}) + go func() { + // client should block on this + call(t, "b", c) + close(done) + }() + + // test that client blocks on the second send + select { + case <-done: + t.Fatal("client should block on calling 'b' during reconnect") + case <-time.After(5 * time.Second): + t.Log("All good") + } +} + +func startClient(t *testing.T, addr net.Addr) *WSClient { + c := NewWSClient(addr.String(), "/websocket") + _, err := c.Start() + require.Nil(t, err) + c.SetLogger(log.TestingLogger()) + return c +} + +func call(t *testing.T, method string, c *WSClient) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err := c.Call(ctx, method, make(map[string]interface{})) + require.NoError(t, err) +} diff --git a/rpc/lib/rpc_test.go b/rpc/lib/rpc_test.go index a79a5270e..50750e5db 100644 --- a/rpc/lib/rpc_test.go +++ b/rpc/lib/rpc_test.go @@ -2,15 +2,18 @@ package rpc import ( "bytes" + "context" crand "crypto/rand" "encoding/json" "fmt" "math/rand" "net/http" + "os" "os/exec" "testing" "time" + "github.com/go-kit/kit/log/term" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tendermint/go-wire/data" @@ -75,8 +78,29 @@ func EchoDataBytesResult(v data.Bytes) (*ResultEchoDataBytes, error) { return &ResultEchoDataBytes{v}, nil } +func TestMain(m *testing.M) { + setup() + code := m.Run() + os.Exit(code) +} + +var colorFn = func(keyvals ...interface{}) term.FgBgColor { + for i := 0; i < len(keyvals)-1; i += 2 { + if keyvals[i] == "socket" { + if keyvals[i+1] == "tcp" { + return term.FgBgColor{Fg: term.DarkBlue} + } else if keyvals[i+1] == "unix" { + return term.FgBgColor{Fg: term.DarkCyan} + } + } + } + return term.FgBgColor{} +} + // launch unix and tcp servers -func init() { +func setup() { + logger := log.NewTMLoggerWithColorFn(log.NewSyncWriter(os.Stdout), colorFn) + cmd := exec.Command("rm", "-f", unixSocket) err := cmd.Start() if err != nil { @@ -86,25 +110,27 @@ func init() { panic(err) } + tcpLogger := logger.With("socket", "tcp") mux := http.NewServeMux() - server.RegisterRPCFuncs(mux, Routes, log.TestingLogger()) + server.RegisterRPCFuncs(mux, Routes, tcpLogger) wm := server.NewWebsocketManager(Routes, nil) - wm.SetLogger(log.TestingLogger()) + wm.SetLogger(tcpLogger) mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler) go func() { - _, err := server.StartHTTPServer(tcpAddr, mux, log.TestingLogger()) + _, err := server.StartHTTPServer(tcpAddr, mux, tcpLogger) if err != nil { panic(err) } }() + unixLogger := logger.With("socket", "unix") mux2 := http.NewServeMux() - server.RegisterRPCFuncs(mux2, Routes, log.TestingLogger()) + server.RegisterRPCFuncs(mux2, Routes, unixLogger) wm = server.NewWebsocketManager(Routes, nil) - wm.SetLogger(log.TestingLogger()) + wm.SetLogger(unixLogger) mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler) go func() { - _, err := server.StartHTTPServer(unixAddr, mux2, log.TestingLogger()) + _, err := server.StartHTTPServer(unixAddr, mux2, unixLogger) if err != nil { panic(err) } @@ -184,7 +210,7 @@ func echoViaWS(cl *client.WSClient, val string) (string, error) { params := map[string]interface{}{ "arg": val, } - err := cl.Call("echo", params) + err := cl.Call(context.Background(), "echo", params) if err != nil { return "", err } @@ -206,7 +232,7 @@ func echoBytesViaWS(cl *client.WSClient, bytes []byte) ([]byte, error) { params := map[string]interface{}{ "arg": bytes, } - err := cl.Call("echo_bytes", params) + err := cl.Call(context.Background(), "echo_bytes", params) if err != nil { return []byte{}, err } @@ -252,6 +278,7 @@ func TestServersAndClientsBasic(t *testing.T) { cl3 := client.NewWSClient(addr, websocketEndpoint) _, err := cl3.Start() require.Nil(t, err) + cl3.SetLogger(log.TestingLogger()) fmt.Printf("=== testing server on %s using %v client", addr, cl3) testWithWSClient(t, cl3) cl3.Stop() @@ -280,13 +307,14 @@ func TestWSNewWSRPCFunc(t *testing.T) { cl := client.NewWSClient(tcpAddr, websocketEndpoint) _, err := cl.Start() require.Nil(t, err) + cl.SetLogger(log.TestingLogger()) defer cl.Stop() val := "acbd" params := map[string]interface{}{ "arg": val, } - err = cl.Call("echo_ws", params) + err = cl.Call(context.Background(), "echo_ws", params) require.Nil(t, err) select { @@ -305,13 +333,12 @@ func TestWSHandlesArrayParams(t *testing.T) { cl := client.NewWSClient(tcpAddr, websocketEndpoint) _, err := cl.Start() require.Nil(t, err) + cl.SetLogger(log.TestingLogger()) defer cl.Stop() val := "acbd" params := []interface{}{val} - request, err := types.ArrayToRequest("", "echo_ws", params) - require.Nil(t, err) - err = cl.WriteJSON(request) + err = cl.CallWithArrayParams(context.Background(), "echo_ws", params) require.Nil(t, err) select { @@ -326,6 +353,22 @@ func TestWSHandlesArrayParams(t *testing.T) { } } +// TestWSClientPingPong checks that a client & server exchange pings +// & pongs so connection stays alive. +func TestWSClientPingPong(t *testing.T) { + if testing.Short() { + t.Skip("skipping ping pong in short mode") + } + + cl := client.NewWSClient(tcpAddr, websocketEndpoint) + _, err := cl.Start() + require.Nil(t, err) + cl.SetLogger(log.TestingLogger()) + defer cl.Stop() + + time.Sleep(35 * time.Second) +} + func randBytes(t *testing.T) []byte { n := rand.Intn(10) + 2 buf := make([]byte, n) diff --git a/rpc/lib/test/integration_test.sh b/rpc/lib/test/integration_test.sh index 7c23be7d3..ca15440c1 100755 --- a/rpc/lib/test/integration_test.sh +++ b/rpc/lib/test/integration_test.sh @@ -9,6 +9,9 @@ DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )" # Change into that dir because we expect that. pushd "$DIR" +echo "==> Installing deps" +go get -v + echo "==> Building the server" go build -o rpcserver main.go diff --git a/rpc/lib/types/types.go b/rpc/lib/types/types.go index 8076e4b0d..f4a2cede0 100644 --- a/rpc/lib/types/types.go +++ b/rpc/lib/types/types.go @@ -2,6 +2,7 @@ package rpctypes import ( "encoding/json" + "fmt" "strings" events "github.com/tendermint/tmlibs/events" @@ -23,6 +24,10 @@ func NewRPCRequest(id string, method string, params json.RawMessage) RPCRequest } } +func (req RPCRequest) String() string { + return fmt.Sprintf("[%s %s]", req.ID, req.Method) +} + func MapToRequest(id string, method string, params map[string]interface{}) (RPCRequest, error) { payload, err := json.Marshal(params) if err != nil { @@ -70,6 +75,14 @@ func NewRPCResponse(id string, res interface{}, err string) RPCResponse { } } +func (resp RPCResponse) String() string { + if resp.Error == "" { + return fmt.Sprintf("[%s %v]", resp.ID, resp.Result) + } else { + return fmt.Sprintf("[%s %s]", resp.ID, resp.Error) + } +} + //---------------------------------------- // *wsConnection implements this interface. From c08618f7e99bae3813c3ea52a1b1dfe49c542057 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 3 Aug 2017 14:19:39 -0400 Subject: [PATCH 02/16] expose latency timer on WSClient --- glide.lock | 7 ++++--- glide.yaml | 20 +++++++++++--------- rpc/lib/client/ws_client.go | 21 +++++++++++++++++---- rpc/lib/rpc_test.go | 8 ++++---- 4 files changed, 36 insertions(+), 20 deletions(-) diff --git a/glide.lock b/glide.lock index c1bf21c14..6ab0354eb 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 2c988aae9517b386ee911e4da5deb9f5034359b7e2ccf448952a3ddb9771222d -updated: 2017-06-28T13:04:20.907047164+02:00 +hash: 41581813ff97225a7feb86b5accb0fe4acb3e198b64592d7452240e9473c479f +updated: 2017-08-03T19:17:16.410522485Z imports: - name: github.com/btcsuite/btcd version: b8df516b4b267acf2de46be593a9d948d1d2c420 @@ -61,6 +61,8 @@ imports: version: 5ccdfb18c776b740aecaf085c4d9a2779199c279 - name: github.com/pkg/errors version: 645ef00459ed84a119197bfb8d8205042c6df63d +- name: github.com/rcrowley/go-metrics + version: 1f30fe9094a513ce4c700b9a54458bbb0c96996c - name: github.com/spf13/afero version: 9be650865eab0c12963d8753212f4f9c66cdcf12 subpackages: @@ -126,7 +128,6 @@ imports: - clist - common - db - - events - flowrate - log - merkle diff --git a/glide.yaml b/glide.yaml index 9bf95b508..78f9fec6e 100644 --- a/glide.yaml +++ b/glide.yaml @@ -7,11 +7,10 @@ import: - package: github.com/golang/protobuf subpackages: - proto -- package: github.com/pelletier/go-toml - version: ^1.0.0 - package: github.com/gorilla/websocket - package: github.com/pkg/errors version: ~0.8.0 +- package: github.com/rcrowley/go-metrics - package: github.com/spf13/cobra - package: github.com/spf13/viper - package: github.com/tendermint/abci @@ -26,21 +25,15 @@ import: version: ~0.6.2 subpackages: - data -- package: github.com/tendermint/merkleeyes - version: ~0.2.4 - subpackages: - - app - - iavl - - testutil - package: github.com/tendermint/tmlibs version: ~0.2.2 subpackages: - autofile - cli + - cli/flags - clist - common - db - - events - flowrate - log - merkle @@ -53,7 +46,16 @@ import: subpackages: - context - package: google.golang.org/grpc +- package: github.com/tendermint/merkleeyes + version: ~0.2.4 + subpackages: + - app + - iavl + - testutil testImport: +- package: github.com/go-kit/kit + subpackages: + - log/term - package: github.com/stretchr/testify subpackages: - assert diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index 2e4b8557d..9e456c6dc 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -12,6 +12,8 @@ import ( "github.com/gorilla/websocket" "github.com/pkg/errors" + metrics "github.com/rcrowley/go-metrics" + types "github.com/tendermint/tendermint/rpc/lib/types" cmn "github.com/tendermint/tmlibs/common" ) @@ -39,6 +41,9 @@ type WSClient struct { Endpoint string // /websocket/url/endpoint Dialer func(string, string) (net.Conn, error) + PingPongLatencyTimer metrics.Timer + sentLastPingAt time.Time + // user facing channels, closed only when the client is being stopped. ResultsCh chan json.RawMessage ErrorsCh chan error @@ -49,16 +54,18 @@ type WSClient struct { reconnectAfter chan error // reconnect requests receiveRoutineQuit chan struct{} // a way for receiveRoutine to close writeRoutine - wg sync.WaitGroup + wg sync.WaitGroup + mtx sync.RWMutex } // NewWSClient returns a new client. func NewWSClient(remoteAddr, endpoint string) *WSClient { addr, dialer := makeHTTPDialer(remoteAddr) wsClient := &WSClient{ - Address: addr, - Dialer: dialer, - Endpoint: endpoint, + Address: addr, + Dialer: dialer, + Endpoint: endpoint, + PingPongLatencyTimer: metrics.NewTimer(), } wsClient.BaseService = *cmn.NewBaseService(nil, "WSClient", wsClient) return wsClient @@ -263,6 +270,9 @@ func (c *WSClient) writeRoutine() { c.reconnectAfter <- err return } + c.mtx.Lock() + c.sentLastPingAt = time.Now() + c.mtx.Unlock() c.Logger.Debug("sent ping") case <-c.receiveRoutineQuit: return @@ -284,6 +294,9 @@ func (c *WSClient) receiveRoutine() { c.conn.SetReadDeadline(time.Now().Add(pongWait)) c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.mtx.RLock() + c.PingPongLatencyTimer.UpdateSince(c.sentLastPingAt) + c.mtx.RUnlock() c.Logger.Debug("got pong") return nil }) diff --git a/rpc/lib/rpc_test.go b/rpc/lib/rpc_test.go index 50750e5db..341aea467 100644 --- a/rpc/lib/rpc_test.go +++ b/rpc/lib/rpc_test.go @@ -276,9 +276,9 @@ func TestServersAndClientsBasic(t *testing.T) { testWithHTTPClient(t, cl2) cl3 := client.NewWSClient(addr, websocketEndpoint) + cl3.SetLogger(log.TestingLogger()) _, err := cl3.Start() require.Nil(t, err) - cl3.SetLogger(log.TestingLogger()) fmt.Printf("=== testing server on %s using %v client", addr, cl3) testWithWSClient(t, cl3) cl3.Stop() @@ -305,9 +305,9 @@ func TestQuotedStringArg(t *testing.T) { func TestWSNewWSRPCFunc(t *testing.T) { cl := client.NewWSClient(tcpAddr, websocketEndpoint) + cl.SetLogger(log.TestingLogger()) _, err := cl.Start() require.Nil(t, err) - cl.SetLogger(log.TestingLogger()) defer cl.Stop() val := "acbd" @@ -331,9 +331,9 @@ func TestWSNewWSRPCFunc(t *testing.T) { func TestWSHandlesArrayParams(t *testing.T) { cl := client.NewWSClient(tcpAddr, websocketEndpoint) + cl.SetLogger(log.TestingLogger()) _, err := cl.Start() require.Nil(t, err) - cl.SetLogger(log.TestingLogger()) defer cl.Stop() val := "acbd" @@ -361,9 +361,9 @@ func TestWSClientPingPong(t *testing.T) { } cl := client.NewWSClient(tcpAddr, websocketEndpoint) + cl.SetLogger(log.TestingLogger()) _, err := cl.Start() require.Nil(t, err) - cl.SetLogger(log.TestingLogger()) defer cl.Stop() time.Sleep(35 * time.Second) From 54903adeff36c250a00e47ded2b4fd3899a24253 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 3 Aug 2017 14:20:17 -0400 Subject: [PATCH 03/16] add IsReconnecting and IsActive methods --- rpc/lib/client/ws_client.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index 9e456c6dc..b70ed359d 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -54,6 +54,8 @@ type WSClient struct { reconnectAfter chan error // reconnect requests receiveRoutineQuit chan struct{} // a way for receiveRoutine to close writeRoutine + reconnecting bool + wg sync.WaitGroup mtx sync.RWMutex } @@ -115,6 +117,16 @@ func (c *WSClient) Stop() bool { return success } +// IsReconnecting returns true if the client is reconnecting right now. +func (c *WSClient) IsReconnecting() bool { + return c.reconnecting +} + +// IsActive returns true if the client is running and not reconnecting. +func (c *WSClient) IsActive() bool { + return c.IsRunning() && !c.IsReconnecting() +} + // Send asynchronously sends the given RPCRequest to the server. Results will // be available on ResultsCh, errors, if any, on ErrorsCh. func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error { @@ -170,6 +182,11 @@ func (c *WSClient) dial() error { func (c *WSClient) reconnect() error { attempt := 0 + c.reconnecting = true + defer func() { + c.reconnecting = false + }() + for { c.Logger.Info("reconnecting", "attempt", attempt+1) From 1abbb11b4436ebf3244fe9e22fecfd25a0c2e601 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 3 Aug 2017 22:44:18 -0400 Subject: [PATCH 04/16] do not exit from reconnectRoutine! --- rpc/lib/client/ws_client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index b70ed359d..12eb97ab4 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -238,7 +238,6 @@ func (c *WSClient) reconnectRoutine() { } } c.startReadWriteRoutines() - return } case <-c.Quit: return From 0013053fae3fb7611c392ebcff15352bb7ec717b Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 4 Aug 2017 10:30:22 -0400 Subject: [PATCH 05/16] allow to change pong wait and ping period --- rpc/lib/client/ws_client.go | 46 +++++++++++++++++++++++--------- rpc/lib/client/ws_client_test.go | 7 +++++ 2 files changed, 40 insertions(+), 13 deletions(-) diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index 12eb97ab4..c617bf21a 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -22,14 +22,11 @@ const ( // Time allowed to write a message to the peer. writeWait = 10 * time.Second - // Time allowed to read the next pong message from the server. - pongWait = 30 * time.Second - - // Send pings to server with this period. Must be less than pongWait. - pingPeriod = (pongWait * 9) / 10 - // Maximum reconnect attempts maxReconnectAttempts = 25 + + defaultPongWait = 30 * time.Second + defaultPingPeriod = (defaultPongWait * 9) / 10 ) type WSClient struct { @@ -58,19 +55,42 @@ type WSClient struct { wg sync.WaitGroup mtx sync.RWMutex + + // Time allowed to read the next pong message from the server. + pongWait time.Duration + + // Send pings to server with this period. Must be less than pongWait. + pingPeriod time.Duration } // NewWSClient returns a new client. -func NewWSClient(remoteAddr, endpoint string) *WSClient { +func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSClient { addr, dialer := makeHTTPDialer(remoteAddr) - wsClient := &WSClient{ + c := &WSClient{ Address: addr, Dialer: dialer, Endpoint: endpoint, PingPongLatencyTimer: metrics.NewTimer(), + pongWait: defaultPongWait, + pingPeriod: defaultPingPeriod, + } + c.BaseService = *cmn.NewBaseService(nil, "WSClient", c) + for _, option := range options { + option(c) + } + return c +} + +// PingPong allows changing ping period and pong wait time. If ping period +// greater or equal to pong wait time, panic will be thrown. +func PingPong(pingPeriod, pongWait time.Duration) func(*WSClient) { + return func(c *WSClient) { + if pingPeriod >= pongWait { + panic(fmt.Sprintf("ping period (%v) must be less than pong wait time (%v)", pingPeriod, pongWait)) + } + c.pingPeriod = pingPeriod + c.pongWait = pongWait } - wsClient.BaseService = *cmn.NewBaseService(nil, "WSClient", wsClient) - return wsClient } // String returns WS client full address. @@ -248,7 +268,7 @@ func (c *WSClient) reconnectRoutine() { // The client ensures that there is at most one writer to a connection by // executing all writes from this goroutine. func (c *WSClient) writeRoutine() { - ticker := time.NewTicker(pingPeriod) + ticker := time.NewTicker(c.pingPeriod) defer func() { ticker.Stop() c.conn.Close() @@ -307,9 +327,9 @@ func (c *WSClient) receiveRoutine() { c.wg.Done() }() - c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.conn.SetReadDeadline(time.Now().Add(c.pongWait)) c.conn.SetPongHandler(func(string) error { - c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.conn.SetReadDeadline(time.Now().Add(c.pongWait)) c.mtx.RLock() c.PingPongLatencyTimer.UpdateSince(c.sentLastPingAt) c.mtx.RUnlock() diff --git a/rpc/lib/client/ws_client_test.go b/rpc/lib/client/ws_client_test.go index 6a72ad28d..6778a0894 100644 --- a/rpc/lib/client/ws_client_test.go +++ b/rpc/lib/client/ws_client_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tendermint/tmlibs/log" @@ -188,6 +189,12 @@ func TestWSClientReconnectFailure(t *testing.T) { } } +func TestWSClientPingPongOption(t *testing.T) { + assert.Panics(t, func() { + NewWSClient("tcp://localhost:8080", "/websocket", PingPong(2*time.Second, 2*time.Second)) + }) +} + func startClient(t *testing.T, addr net.Addr) *WSClient { c := NewWSClient(addr.String(), "/websocket") _, err := c.Start() From 5d66d1c28c572f787438f12f71fb5679cf34dc02 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Sat, 5 Aug 2017 12:07:02 -0400 Subject: [PATCH 06/16] fixes from review --- rpc/lib/client/ws_client.go | 42 +++++++++++----------- rpc/lib/client/ws_client_test.go | 61 ++++++++++++-------------------- 2 files changed, 45 insertions(+), 58 deletions(-) diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index c617bf21a..758df658f 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -39,22 +39,23 @@ type WSClient struct { Dialer func(string, string) (net.Conn, error) PingPongLatencyTimer metrics.Timer - sentLastPingAt time.Time // user facing channels, closed only when the client is being stopped. ResultsCh chan json.RawMessage ErrorsCh chan error // internal channels - send chan types.RPCRequest // user requests - backlog chan types.RPCRequest // stores a single user request received during a conn failure - reconnectAfter chan error // reconnect requests - receiveRoutineQuit chan struct{} // a way for receiveRoutine to close writeRoutine + send chan types.RPCRequest // user requests + backlog chan types.RPCRequest // stores a single user request received during a conn failure + reconnectAfter chan error // reconnect requests + readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine reconnecting bool - wg sync.WaitGroup - mtx sync.RWMutex + wg sync.WaitGroup + + mtx sync.RWMutex + sentLastPingAt time.Time // Time allowed to read the next pong message from the server. pongWait time.Duration @@ -147,8 +148,9 @@ func (c *WSClient) IsActive() bool { return c.IsRunning() && !c.IsReconnecting() } -// Send asynchronously sends the given RPCRequest to the server. Results will -// be available on ResultsCh, errors, if any, on ErrorsCh. +// Send the given RPC request to the server. Results will be available on +// ResultsCh, errors, if any, on ErrorsCh. Will block until send succeeds or +// ctx.Done is closed. func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error { select { case c.send <- request: @@ -159,8 +161,7 @@ func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error { } } -// Call asynchronously calls a given method by sending an RPCRequest to the -// server. Results will be available on ResultsCh, errors, if any, on ErrorsCh. +// Call the given method. See Send description. func (c *WSClient) Call(ctx context.Context, method string, params map[string]interface{}) error { request, err := types.MapToRequest("", method, params) if err != nil { @@ -169,9 +170,8 @@ func (c *WSClient) Call(ctx context.Context, method string, params map[string]in return c.Send(ctx, request) } -// CallWithArrayParams asynchronously calls a given method by sending an -// RPCRequest to the server. Results will be available on ResultsCh, errors, if -// any, on ErrorsCh. +// CallWithArrayParams the given method with params in a form of array. See +// Send description. func (c *WSClient) CallWithArrayParams(ctx context.Context, method string, params []interface{}) error { request, err := types.ArrayToRequest("", method, params) if err != nil { @@ -231,8 +231,8 @@ func (c *WSClient) reconnect() error { func (c *WSClient) startReadWriteRoutines() { c.wg.Add(2) - c.receiveRoutineQuit = make(chan struct{}) - go c.receiveRoutine() + c.readRoutineQuit = make(chan struct{}) + go c.readRoutine() go c.writeRoutine() } @@ -240,7 +240,7 @@ func (c *WSClient) reconnectRoutine() { for { select { case originalError := <-c.reconnectAfter: - // wait until writeRoutine and receiveRoutine finish + // wait until writeRoutine and readRoutine finish c.wg.Wait() err := c.reconnect() if err != nil { @@ -310,7 +310,7 @@ func (c *WSClient) writeRoutine() { c.sentLastPingAt = time.Now() c.mtx.Unlock() c.Logger.Debug("sent ping") - case <-c.receiveRoutineQuit: + case <-c.readRoutineQuit: return case <-c.Quit: c.conn.WriteMessage(websocket.CloseMessage, []byte{}) @@ -321,13 +321,14 @@ func (c *WSClient) writeRoutine() { // The client ensures that there is at most one reader to a connection by // executing all reads from this goroutine. -func (c *WSClient) receiveRoutine() { +func (c *WSClient) readRoutine() { defer func() { c.conn.Close() c.wg.Done() }() c.conn.SetReadDeadline(time.Now().Add(c.pongWait)) + c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(c.pongWait)) c.mtx.RLock() @@ -336,6 +337,7 @@ func (c *WSClient) receiveRoutine() { c.Logger.Debug("got pong") return nil }) + for { _, data, err := c.conn.ReadMessage() if err != nil { @@ -344,7 +346,7 @@ func (c *WSClient) receiveRoutine() { } c.Logger.Error("failed to read response", "err", err) - close(c.receiveRoutineQuit) + close(c.readRoutineQuit) c.reconnectAfter <- err return } diff --git a/rpc/lib/client/ws_client_test.go b/rpc/lib/client/ws_client_test.go index 6778a0894..32385bfd7 100644 --- a/rpc/lib/client/ws_client_test.go +++ b/rpc/lib/client/ws_client_test.go @@ -65,28 +65,13 @@ func TestWSClientReconnectsAfterReadFailure(t *testing.T) { defer c.Stop() wg.Add(1) - go func() { - for { - select { - case res := <-c.ResultsCh: - if res != nil { - wg.Done() - } - case err := <-c.ErrorsCh: - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - case <-c.Quit: - return - } - } - }() + go callWgDoneOnResult(t, c, &wg) h.mtx.Lock() h.closeConnAfterRead = true h.mtx.Unlock() - // results in error + // results in WS read error, no send retry because write succeeded call(t, "a", c) // expect to reconnect almost immediately @@ -112,27 +97,12 @@ func TestWSClientReconnectsAfterWriteFailure(t *testing.T) { defer c.Stop() wg.Add(2) - go func() { - for { - select { - case res := <-c.ResultsCh: - if res != nil { - wg.Done() - } - case err := <-c.ErrorsCh: - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - case <-c.Quit: - return - } - } - }() + go callWgDoneOnResult(t, c, &wg) // hacky way to abort the connection before write c.conn.Close() - // results in error, the client should resend on reconnect + // results in WS write error, the client should resend on reconnect call(t, "a", c) // expect to reconnect almost immediately @@ -167,7 +137,7 @@ func TestWSClientReconnectFailure(t *testing.T) { c.conn.Close() s.Close() - // results in error + // results in WS write error call(t, "a", c) // expect to reconnect almost immediately @@ -204,8 +174,23 @@ func startClient(t *testing.T, addr net.Addr) *WSClient { } func call(t *testing.T, method string, c *WSClient) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - err := c.Call(ctx, method, make(map[string]interface{})) + err := c.Call(context.Background(), method, make(map[string]interface{})) require.NoError(t, err) } + +func callWgDoneOnResult(t *testing.T, c *WSClient, wg *sync.WaitGroup) { + for { + select { + case res := <-c.ResultsCh: + if res != nil { + wg.Done() + } + case err := <-c.ErrorsCh: + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + case <-c.Quit: + return + } + } +} From 57eee2466b304f5de9696176e1d842d04a93231e Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 7 Aug 2017 17:56:38 -0400 Subject: [PATCH 07/16] make WSClient thread-safe --- rpc/lib/client/ws_client.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index 758df658f..d1bfc1d37 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -29,6 +29,8 @@ const ( defaultPingPeriod = (defaultPongWait * 9) / 10 ) +// WSClient is a WebSocket client. The methods of WSClient are safe for use by +// multiple goroutines. type WSClient struct { cmn.BaseService @@ -50,12 +52,11 @@ type WSClient struct { reconnectAfter chan error // reconnect requests readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine - reconnecting bool - wg sync.WaitGroup mtx sync.RWMutex sentLastPingAt time.Time + reconnecting bool // Time allowed to read the next pong message from the server. pongWait time.Duration @@ -64,7 +65,9 @@ type WSClient struct { pingPeriod time.Duration } -// NewWSClient returns a new client. +// NewWSClient returns a new client. See the commentary on the func(*WSClient) +// functions for a detailed description of how to configure ping period and +// pong wait time. func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSClient { addr, dialer := makeHTTPDialer(remoteAddr) c := &WSClient{ @@ -140,6 +143,8 @@ func (c *WSClient) Stop() bool { // IsReconnecting returns true if the client is reconnecting right now. func (c *WSClient) IsReconnecting() bool { + c.mtx.RLock() + defer c.mtx.RUnlock() return c.reconnecting } @@ -202,9 +207,13 @@ func (c *WSClient) dial() error { func (c *WSClient) reconnect() error { attempt := 0 + c.mtx.Lock() c.reconnecting = true + c.mtx.Unlock() defer func() { + c.mtx.Lock() c.reconnecting = false + c.mtx.Unlock() }() for { From c14b39da5f3ce599d047bc6b42b2045c9630251d Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 7 Aug 2017 18:29:55 -0400 Subject: [PATCH 08/16] make RPC server's ping period and pong wait configurable via options --- rpc/lib/rpc_test.go | 18 +++++------ rpc/lib/server/handlers.go | 65 +++++++++++++++++++++++++++----------- 2 files changed, 54 insertions(+), 29 deletions(-) diff --git a/rpc/lib/rpc_test.go b/rpc/lib/rpc_test.go index 341aea467..8bac7aa3c 100644 --- a/rpc/lib/rpc_test.go +++ b/rpc/lib/rpc_test.go @@ -31,6 +31,8 @@ const ( unixAddr = "unix://" + unixSocket websocketEndpoint = "/websocket/endpoint" + + testPongWait = 2 * time.Second ) type ResultEcho struct { @@ -113,7 +115,7 @@ func setup() { tcpLogger := logger.With("socket", "tcp") mux := http.NewServeMux() server.RegisterRPCFuncs(mux, Routes, tcpLogger) - wm := server.NewWebsocketManager(Routes, nil) + wm := server.NewWebsocketManager(Routes, nil, server.PingPong((testPongWait*9)/10, testPongWait)) wm.SetLogger(tcpLogger) mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler) go func() { @@ -276,7 +278,7 @@ func TestServersAndClientsBasic(t *testing.T) { testWithHTTPClient(t, cl2) cl3 := client.NewWSClient(addr, websocketEndpoint) - cl3.SetLogger(log.TestingLogger()) + cl3.SetLogger(log.TestingLogger()) _, err := cl3.Start() require.Nil(t, err) fmt.Printf("=== testing server on %s using %v client", addr, cl3) @@ -305,7 +307,7 @@ func TestQuotedStringArg(t *testing.T) { func TestWSNewWSRPCFunc(t *testing.T) { cl := client.NewWSClient(tcpAddr, websocketEndpoint) - cl.SetLogger(log.TestingLogger()) + cl.SetLogger(log.TestingLogger()) _, err := cl.Start() require.Nil(t, err) defer cl.Stop() @@ -331,7 +333,7 @@ func TestWSNewWSRPCFunc(t *testing.T) { func TestWSHandlesArrayParams(t *testing.T) { cl := client.NewWSClient(tcpAddr, websocketEndpoint) - cl.SetLogger(log.TestingLogger()) + cl.SetLogger(log.TestingLogger()) _, err := cl.Start() require.Nil(t, err) defer cl.Stop() @@ -356,17 +358,13 @@ func TestWSHandlesArrayParams(t *testing.T) { // TestWSClientPingPong checks that a client & server exchange pings // & pongs so connection stays alive. func TestWSClientPingPong(t *testing.T) { - if testing.Short() { - t.Skip("skipping ping pong in short mode") - } - cl := client.NewWSClient(tcpAddr, websocketEndpoint) - cl.SetLogger(log.TestingLogger()) + cl.SetLogger(log.TestingLogger()) _, err := cl.Start() require.Nil(t, err) defer cl.Stop() - time.Sleep(35 * time.Second) + time.Sleep((testPongWait * 11) / 10) } func randBytes(t *testing.T) []byte { diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index b6431a1ec..b95f606c9 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -337,10 +337,10 @@ func nonJsonToArg(ty reflect.Type, arg string) (reflect.Value, error, bool) { // rpc.websocket const ( - writeChanCapacity = 1000 - wsWriteTimeoutSeconds = 30 // each write times out after this. - wsReadTimeoutSeconds = 30 // connection times out if we haven't received *anything* in this long, not even pings. - wsPingTickerSeconds = 10 // send a ping every PingTickerSeconds. + writeChanCapacity = 1000 + wsWriteWait = 30 * time.Second // each write times out after this. + defaultWSPongWait = 30 * time.Second + defaultWSPingPeriod = 10 * time.Second ) // a single websocket connection @@ -357,29 +357,54 @@ type wsConnection struct { funcMap map[string]*RPCFunc evsw events.EventSwitch + + // Connection times out if we haven't received *anything* in this long, not even pings. + pongWait time.Duration + + // Send pings to server with this period. Must be less than pongWait. + pingPeriod time.Duration } -// new websocket connection wrapper -func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw events.EventSwitch) *wsConnection { +// NewWSConnection wraps websocket.Conn. See the commentary on the +// func(*wsConnection) functions for a detailed description of how to configure +// ping period and pong wait time. +func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw events.EventSwitch, options ...func(*wsConnection)) *wsConnection { wsc := &wsConnection{ remoteAddr: baseConn.RemoteAddr().String(), baseConn: baseConn, writeChan: make(chan types.RPCResponse, writeChanCapacity), // error when full. funcMap: funcMap, evsw: evsw, + pongWait: defaultWSPongWait, + pingPeriod: defaultWSPingPeriod, + } + for _, option := range options { + option(wsc) } wsc.BaseService = *cmn.NewBaseService(nil, "wsConnection", wsc) return wsc } +// PingPong allows changing ping period and pong wait time. If ping period +// greater or equal to pong wait time, panic will be thrown. +func PingPong(pingPeriod, pongWait time.Duration) func(*wsConnection) { + return func(wsc *wsConnection) { + if pingPeriod >= pongWait { + panic(fmt.Sprintf("ping period (%v) must be less than pong wait time (%v)", pingPeriod, pongWait)) + } + wsc.pingPeriod = pingPeriod + wsc.pongWait = pongWait + } +} + // wsc.Start() blocks until the connection closes. func (wsc *wsConnection) OnStart() error { wsc.BaseService.OnStart() // these must be set before the readRoutine is created, as it may // call wsc.Stop(), which accesses these timers - wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds) - wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds) + wsc.readTimeout = time.NewTimer(wsc.pongWait) + wsc.pingTicker = time.NewTicker(wsc.pingPeriod) // Read subscriptions/unsubscriptions to events go wsc.readRoutine() @@ -387,13 +412,13 @@ func (wsc *wsConnection) OnStart() error { // Custom Ping handler to touch readTimeout wsc.baseConn.SetPingHandler(func(m string) error { // NOTE: https://github.com/gorilla/websocket/issues/97 - go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) - wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds) + go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(wsWriteWait)) + wsc.readTimeout.Reset(wsc.pongWait) return nil }) wsc.baseConn.SetPongHandler(func(m string) error { // NOTE: https://github.com/gorilla/websocket/issues/97 - wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds) + wsc.readTimeout.Reset(wsc.pongWait) return nil }) go wsc.readTimeoutRoutine() @@ -472,7 +497,7 @@ func (wsc *wsConnection) readRoutine() { default: var in []byte // Do not set a deadline here like below: - // wsc.baseConn.SetReadDeadline(time.Now().Add(time.Second * wsReadTimeoutSeconds)) + // wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.pongWait)) // The client may not send anything for a while. // We use `readTimeout` to handle read timeouts. _, in, err := wsc.baseConn.ReadMessage() @@ -559,7 +584,7 @@ func (wsc *wsConnection) writeRoutine() { // All writes to the websocket must (re)set the write deadline. // If some writes don't set it while others do, they may timeout incorrectly (https://github.com/tendermint/tendermint/issues/553) func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error { - wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds)) + wsc.baseConn.SetWriteDeadline(time.Now().Add(wsWriteWait)) return wsc.baseConn.WriteMessage(msgType, msg) } @@ -570,12 +595,13 @@ func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error // NOTE: The websocket path is defined externally, e.g. in node/node.go type WebsocketManager struct { websocket.Upgrader - funcMap map[string]*RPCFunc - evsw events.EventSwitch - logger log.Logger + funcMap map[string]*RPCFunc + evsw events.EventSwitch + logger log.Logger + wsConnOptions []func(*wsConnection) } -func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch) *WebsocketManager { +func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch, wsConnOptions ...func(*wsConnection)) *WebsocketManager { return &WebsocketManager{ funcMap: funcMap, evsw: evsw, @@ -587,7 +613,8 @@ func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch) * return true }, }, - logger: log.NewNopLogger(), + logger: log.NewNopLogger(), + wsConnOptions: wsConnOptions, } } @@ -605,7 +632,7 @@ func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Requ } // register connection - con := NewWSConnection(wsConn, wm.funcMap, wm.evsw) + con := NewWSConnection(wsConn, wm.funcMap, wm.evsw, wm.wsConnOptions...) con.SetLogger(wm.logger) wm.logger.Info("New websocket connection", "remote", con.remoteAddr) con.Start() // Blocking From 23a87304cc2d2c9d2ae64c19a0b802d23ec3be3d Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 8 Aug 2017 13:20:58 -0400 Subject: [PATCH 09/16] add a comment for PingPongLatencyTimer [ci skip] --- rpc/lib/client/ws_client.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index d1bfc1d37..2c379c86a 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -19,7 +19,7 @@ import ( ) const ( - // Time allowed to write a message to the peer. + // Time allowed to write a message to the server. writeWait = 10 * time.Second // Maximum reconnect attempts @@ -40,6 +40,8 @@ type WSClient struct { Endpoint string // /websocket/url/endpoint Dialer func(string, string) (net.Conn, error) + // Time between sending a ping and receiving a pong. See + // https://godoc.org/github.com/rcrowley/go-metrics#Timer. PingPongLatencyTimer metrics.Timer // user facing channels, closed only when the client is being stopped. From 6c85e4be4f027e2e5fe25bc79910a09407a6e90c Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 8 Aug 2017 13:21:59 -0400 Subject: [PATCH 10/16] change server ping period to be less frequent no need to ping ws every 10 sec --- rpc/lib/server/handlers.go | 2 +- rpc/lib/test/integration_test.sh | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index b95f606c9..78845c090 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -340,7 +340,7 @@ const ( writeChanCapacity = 1000 wsWriteWait = 30 * time.Second // each write times out after this. defaultWSPongWait = 30 * time.Second - defaultWSPingPeriod = 10 * time.Second + defaultWSPingPeriod = (defaultWSPongWait * 9) / 10 ) // a single websocket connection diff --git a/rpc/lib/test/integration_test.sh b/rpc/lib/test/integration_test.sh index ca15440c1..7c23be7d3 100755 --- a/rpc/lib/test/integration_test.sh +++ b/rpc/lib/test/integration_test.sh @@ -9,9 +9,6 @@ DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )" # Change into that dir because we expect that. pushd "$DIR" -echo "==> Installing deps" -go get -v - echo "==> Building the server" go build -o rpcserver main.go From 82679207495b7127c2ca66cbd25e69d18ccb09fe Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 8 Aug 2017 16:02:37 -0400 Subject: [PATCH 11/16] [ws-client] write normal close message --- rpc/lib/client/ws_client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index 2c379c86a..fac32d85a 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -324,7 +324,7 @@ func (c *WSClient) writeRoutine() { case <-c.readRoutineQuit: return case <-c.Quit: - c.conn.WriteMessage(websocket.CloseMessage, []byte{}) + c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) return } } @@ -352,7 +352,7 @@ func (c *WSClient) readRoutine() { for { _, data, err := c.conn.ReadMessage() if err != nil { - if !websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { return } From 9b5f21a650d3fc380aaa8b188c50af2f83422ce8 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 8 Aug 2017 16:03:04 -0400 Subject: [PATCH 12/16] [ws-server] reset readTimeout when we receive something --- rpc/lib/server/handlers.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index 78845c090..431ef742f 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -502,12 +502,16 @@ func (wsc *wsConnection) readRoutine() { // We use `readTimeout` to handle read timeouts. _, in, err := wsc.baseConn.ReadMessage() if err != nil { - wsc.Logger.Info("Failed to read from connection", "remote", wsc.remoteAddr, "err", err.Error()) - // an error reading the connection, - // kill the connection + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + wsc.Logger.Info("Client closed the connection", "remote", wsc.remoteAddr) + } else { + wsc.Logger.Info("Failed to read from connection", "remote", wsc.remoteAddr, "err", err.Error()) + } wsc.Stop() return } + wsc.readTimeout.Reset(wsc.pongWait) + var request types.RPCRequest err = json.Unmarshal(in, &request) if err != nil { From 797acbe911ab8fa6d70bb8053ad29497d1fc0302 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 8 Aug 2017 17:33:17 -0400 Subject: [PATCH 13/16] ws: small comment --- rpc/lib/client/ws_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index fac32d85a..031c46512 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -69,7 +69,7 @@ type WSClient struct { // NewWSClient returns a new client. See the commentary on the func(*WSClient) // functions for a detailed description of how to configure ping period and -// pong wait time. +// pong wait time. The endpoint argument must begin with a `/`. func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSClient { addr, dialer := makeHTTPDialer(remoteAddr) c := &WSClient{ From 236489aecf61f2ff0da6ed321e6edf2d11a97f89 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 8 Aug 2017 19:02:47 -0400 Subject: [PATCH 14/16] backlog must always have higher priority --- rpc/lib/client/ws_client.go | 34 +++++++++++++++++++++----------- rpc/lib/client/ws_client_test.go | 5 ++++- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index 031c46512..19c066099 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -247,6 +247,24 @@ func (c *WSClient) startReadWriteRoutines() { go c.writeRoutine() } +func (c *WSClient) processBacklog() error { + select { + case request := <-c.backlog: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + err := c.conn.WriteJSON(request) + if err != nil { + c.Logger.Error("failed to resend request", "err", err) + c.reconnectAfter <- err + // requeue request + c.backlog <- request + return err + } + c.Logger.Info("resend a request", "req", request) + default: + } + return nil +} + func (c *WSClient) reconnectRoutine() { for { select { @@ -268,7 +286,10 @@ func (c *WSClient) reconnectRoutine() { break LOOP } } - c.startReadWriteRoutines() + err = c.processBacklog() + if err == nil { + c.startReadWriteRoutines() + } } case <-c.Quit: return @@ -288,17 +309,6 @@ func (c *WSClient) writeRoutine() { for { select { - case request := <-c.backlog: - c.conn.SetWriteDeadline(time.Now().Add(writeWait)) - err := c.conn.WriteJSON(request) - if err != nil { - c.Logger.Error("failed to resend request", "err", err) - c.reconnectAfter <- err - // add request to the backlog, so we don't lose it - c.backlog <- request - return - } - c.Logger.Info("resend a request", "req", request) case request := <-c.send: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) err := c.conn.WriteJSON(request) diff --git a/rpc/lib/client/ws_client_test.go b/rpc/lib/client/ws_client_test.go index 32385bfd7..d1b9764c3 100644 --- a/rpc/lib/client/ws_client_test.go +++ b/rpc/lib/client/ws_client_test.go @@ -138,7 +138,10 @@ func TestWSClientReconnectFailure(t *testing.T) { s.Close() // results in WS write error - call(t, "a", c) + // provide timeout to avoid blocking + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + c.Call(ctx, "a", make(map[string]interface{})) // expect to reconnect almost immediately time.Sleep(10 * time.Millisecond) From 2fd8496bc109d010c6c2e415604131b500550e37 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 10 Aug 2017 17:39:38 -0400 Subject: [PATCH 15/16] correct handling of pings and pongs server: - always has read & write timeouts - ping handler never blocks the reader (see A) - sends regular pings to check up on a client A: at some point server write buffer can become full, so in order not to block reads from a client (see https://github.com/gorilla/websocket/issues/97), server may skip some pongs. As a result, client may disconnect. But you either have to do that or block the reader. There is no third way. client: - optional read & write timeouts - optional ping/pong to measure latency --- node/node.go | 4 +- rpc/lib/client/ws_client.go | 93 ++++++++++++------ rpc/lib/client/ws_client_test.go | 7 -- rpc/lib/rpc_test.go | 6 +- rpc/lib/server/handlers.go | 164 ++++++++++++++++--------------- 5 files changed, 151 insertions(+), 123 deletions(-) diff --git a/node/node.go b/node/node.go index 672e384b9..bcb549fc4 100644 --- a/node/node.go +++ b/node/node.go @@ -330,9 +330,9 @@ func (n *Node) startRPC() ([]net.Listener, error) { listeners := make([]net.Listener, len(listenAddrs)) for i, listenAddr := range listenAddrs { mux := http.NewServeMux() - wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw) rpcLogger := n.Logger.With("module", "rpc-server") - wm.SetLogger(rpcLogger) + wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw) + wm.SetLogger(rpcLogger.With("protocol", "websocket")) mux.HandleFunc("/websocket", wm.WebsocketHandler) rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger) listener, err := rpcserver.StartHTTPServer(listenAddr, mux, rpcLogger) diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index 19c066099..6a932caf6 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -19,14 +19,10 @@ import ( ) const ( - // Time allowed to write a message to the server. - writeWait = 10 * time.Second - - // Maximum reconnect attempts - maxReconnectAttempts = 25 - - defaultPongWait = 30 * time.Second - defaultPingPeriod = (defaultPongWait * 9) / 10 + defaultMaxReconnectAttempts = 25 + defaultWriteWait = 0 + defaultReadWait = 0 + defaultPingPeriod = 0 ) // WSClient is a WebSocket client. The methods of WSClient are safe for use by @@ -60,10 +56,16 @@ type WSClient struct { sentLastPingAt time.Time reconnecting bool - // Time allowed to read the next pong message from the server. - pongWait time.Duration + // Maximum reconnect attempts (0 or greater; default: 25). + maxReconnectAttempts int + + // Time allowed to write a message to the server. 0 means block until operation succeeds. + writeWait time.Duration + + // Time allowed to read the next message from the server. 0 means block until operation succeeds. + readWait time.Duration - // Send pings to server with this period. Must be less than pongWait. + // Send pings to server with this period. Must be less than readWait. If 0, no pings will be sent. pingPeriod time.Duration } @@ -77,7 +79,10 @@ func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSCli Dialer: dialer, Endpoint: endpoint, PingPongLatencyTimer: metrics.NewTimer(), - pongWait: defaultPongWait, + + maxReconnectAttempts: defaultMaxReconnectAttempts, + readWait: defaultReadWait, + writeWait: defaultWriteWait, pingPeriod: defaultPingPeriod, } c.BaseService = *cmn.NewBaseService(nil, "WSClient", c) @@ -87,15 +92,27 @@ func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSCli return c } -// PingPong allows changing ping period and pong wait time. If ping period -// greater or equal to pong wait time, panic will be thrown. -func PingPong(pingPeriod, pongWait time.Duration) func(*WSClient) { +func MaxReconnectAttempts(max int) func(*WSClient) { + return func(c *WSClient) { + c.maxReconnectAttempts = max + } +} + +func ReadWait(readWait time.Duration) func(*WSClient) { + return func(c *WSClient) { + c.readWait = readWait + } +} + +func WriteWait(writeWait time.Duration) func(*WSClient) { + return func(c *WSClient) { + c.writeWait = writeWait + } +} + +func PingPeriod(pingPeriod time.Duration) func(*WSClient) { return func(c *WSClient) { - if pingPeriod >= pongWait { - panic(fmt.Sprintf("ping period (%v) must be less than pong wait time (%v)", pingPeriod, pongWait)) - } c.pingPeriod = pingPeriod - c.pongWait = pongWait } } @@ -234,7 +251,7 @@ func (c *WSClient) reconnect() error { attempt++ - if attempt > maxReconnectAttempts { + if attempt > c.maxReconnectAttempts { return errors.Wrap(err, "reached maximum reconnect attempts") } } @@ -250,7 +267,9 @@ func (c *WSClient) startReadWriteRoutines() { func (c *WSClient) processBacklog() error { select { case request := <-c.backlog: - c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if c.writeWait > 0 { + c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)) + } err := c.conn.WriteJSON(request) if err != nil { c.Logger.Error("failed to resend request", "err", err) @@ -300,7 +319,15 @@ func (c *WSClient) reconnectRoutine() { // The client ensures that there is at most one writer to a connection by // executing all writes from this goroutine. func (c *WSClient) writeRoutine() { - ticker := time.NewTicker(c.pingPeriod) + var ticker *time.Ticker + if c.pingPeriod > 0 { + // ticker with a predefined period + ticker = time.NewTicker(c.pingPeriod) + } else { + // ticker that never fires + ticker = &time.Ticker{C: make(<-chan time.Time)} + } + defer func() { ticker.Stop() c.conn.Close() @@ -310,7 +337,9 @@ func (c *WSClient) writeRoutine() { for { select { case request := <-c.send: - c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if c.writeWait > 0 { + c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)) + } err := c.conn.WriteJSON(request) if err != nil { c.Logger.Error("failed to send request", "err", err) @@ -320,7 +349,9 @@ func (c *WSClient) writeRoutine() { return } case <-ticker.C: - c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if c.writeWait > 0 { + c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)) + } err := c.conn.WriteMessage(websocket.PingMessage, []byte{}) if err != nil { c.Logger.Error("failed to write ping", "err", err) @@ -348,21 +379,25 @@ func (c *WSClient) readRoutine() { c.wg.Done() }() - c.conn.SetReadDeadline(time.Now().Add(c.pongWait)) - c.conn.SetPongHandler(func(string) error { - c.conn.SetReadDeadline(time.Now().Add(c.pongWait)) + // gather latency stats c.mtx.RLock() - c.PingPongLatencyTimer.UpdateSince(c.sentLastPingAt) + t := c.sentLastPingAt c.mtx.RUnlock() + c.PingPongLatencyTimer.UpdateSince(t) + c.Logger.Debug("got pong") return nil }) for { + // reset deadline for every message type (control or data) + if c.readWait > 0 { + c.conn.SetReadDeadline(time.Now().Add(c.readWait)) + } _, data, err := c.conn.ReadMessage() if err != nil { - if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + if !websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) { return } diff --git a/rpc/lib/client/ws_client_test.go b/rpc/lib/client/ws_client_test.go index d1b9764c3..6f39690db 100644 --- a/rpc/lib/client/ws_client_test.go +++ b/rpc/lib/client/ws_client_test.go @@ -11,7 +11,6 @@ import ( "time" "github.com/gorilla/websocket" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tendermint/tmlibs/log" @@ -162,12 +161,6 @@ func TestWSClientReconnectFailure(t *testing.T) { } } -func TestWSClientPingPongOption(t *testing.T) { - assert.Panics(t, func() { - NewWSClient("tcp://localhost:8080", "/websocket", PingPong(2*time.Second, 2*time.Second)) - }) -} - func startClient(t *testing.T, addr net.Addr) *WSClient { c := NewWSClient(addr.String(), "/websocket") _, err := c.Start() diff --git a/rpc/lib/rpc_test.go b/rpc/lib/rpc_test.go index 8bac7aa3c..7415cb36d 100644 --- a/rpc/lib/rpc_test.go +++ b/rpc/lib/rpc_test.go @@ -31,8 +31,6 @@ const ( unixAddr = "unix://" + unixSocket websocketEndpoint = "/websocket/endpoint" - - testPongWait = 2 * time.Second ) type ResultEcho struct { @@ -115,7 +113,7 @@ func setup() { tcpLogger := logger.With("socket", "tcp") mux := http.NewServeMux() server.RegisterRPCFuncs(mux, Routes, tcpLogger) - wm := server.NewWebsocketManager(Routes, nil, server.PingPong((testPongWait*9)/10, testPongWait)) + wm := server.NewWebsocketManager(Routes, nil, server.ReadWait(5*time.Second), server.PingPeriod(1*time.Second)) wm.SetLogger(tcpLogger) mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler) go func() { @@ -364,7 +362,7 @@ func TestWSClientPingPong(t *testing.T) { require.Nil(t, err) defer cl.Stop() - time.Sleep((testPongWait * 11) / 10) + time.Sleep(3 * time.Second) } func randBytes(t *testing.T) []byte { diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index 431ef742f..203a71ff7 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -337,10 +337,10 @@ func nonJsonToArg(ty reflect.Type, arg string) (reflect.Value, error, bool) { // rpc.websocket const ( - writeChanCapacity = 1000 - wsWriteWait = 30 * time.Second // each write times out after this. - defaultWSPongWait = 30 * time.Second - defaultWSPingPeriod = (defaultWSPongWait * 9) / 10 + defaultWSWriteChanCapacity = 1000 + defaultWSWriteWait = 10 * time.Second + defaultWSReadWait = 30 * time.Second + defaultWSPingPeriod = (defaultWSReadWait * 9) / 10 ) // a single websocket connection @@ -349,19 +349,23 @@ const ( type wsConnection struct { cmn.BaseService - remoteAddr string - baseConn *websocket.Conn - writeChan chan types.RPCResponse - readTimeout *time.Timer - pingTicker *time.Ticker + remoteAddr string + baseConn *websocket.Conn + writeChan chan types.RPCResponse funcMap map[string]*RPCFunc evsw events.EventSwitch + // write channel capacity + writeChanCapacity int + + // each write times out after this. + writeWait time.Duration + // Connection times out if we haven't received *anything* in this long, not even pings. - pongWait time.Duration + readWait time.Duration - // Send pings to server with this period. Must be less than pongWait. + // Send pings to server with this period. Must be less than readWait, but greater than zero. pingPeriod time.Duration } @@ -370,13 +374,14 @@ type wsConnection struct { // ping period and pong wait time. func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw events.EventSwitch, options ...func(*wsConnection)) *wsConnection { wsc := &wsConnection{ - remoteAddr: baseConn.RemoteAddr().String(), - baseConn: baseConn, - writeChan: make(chan types.RPCResponse, writeChanCapacity), // error when full. - funcMap: funcMap, - evsw: evsw, - pongWait: defaultWSPongWait, - pingPeriod: defaultWSPingPeriod, + remoteAddr: baseConn.RemoteAddr().String(), + baseConn: baseConn, + funcMap: funcMap, + evsw: evsw, + writeWait: defaultWSWriteWait, + writeChanCapacity: defaultWSWriteChanCapacity, + readWait: defaultWSReadWait, + pingPeriod: defaultWSPingPeriod, } for _, option := range options { option(wsc) @@ -385,69 +390,48 @@ func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw return wsc } -// PingPong allows changing ping period and pong wait time. If ping period -// greater or equal to pong wait time, panic will be thrown. -func PingPong(pingPeriod, pongWait time.Duration) func(*wsConnection) { +func WriteWait(writeWait time.Duration) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.writeWait = writeWait + } +} + +func WriteChanCapacity(cap int) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.writeChanCapacity = cap + } +} + +func ReadWait(readWait time.Duration) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.readWait = readWait + } +} + +func PingPeriod(pingPeriod time.Duration) func(*wsConnection) { return func(wsc *wsConnection) { - if pingPeriod >= pongWait { - panic(fmt.Sprintf("ping period (%v) must be less than pong wait time (%v)", pingPeriod, pongWait)) - } wsc.pingPeriod = pingPeriod - wsc.pongWait = pongWait } } // wsc.Start() blocks until the connection closes. func (wsc *wsConnection) OnStart() error { - wsc.BaseService.OnStart() - - // these must be set before the readRoutine is created, as it may - // call wsc.Stop(), which accesses these timers - wsc.readTimeout = time.NewTimer(wsc.pongWait) - wsc.pingTicker = time.NewTicker(wsc.pingPeriod) + wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity) // Read subscriptions/unsubscriptions to events go wsc.readRoutine() - - // Custom Ping handler to touch readTimeout - wsc.baseConn.SetPingHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(wsWriteWait)) - wsc.readTimeout.Reset(wsc.pongWait) - return nil - }) - wsc.baseConn.SetPongHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - wsc.readTimeout.Reset(wsc.pongWait) - return nil - }) - go wsc.readTimeoutRoutine() - // Write responses, BLOCKING. wsc.writeRoutine() + return nil } func (wsc *wsConnection) OnStop() { - wsc.BaseService.OnStop() if wsc.evsw != nil { wsc.evsw.RemoveListener(wsc.remoteAddr) } - wsc.readTimeout.Stop() - wsc.pingTicker.Stop() - // The write loop closes the websocket connection - // when it exits its loop, and the read loop - // closes the writeChan -} - -func (wsc *wsConnection) readTimeoutRoutine() { - select { - case <-wsc.readTimeout.C: - wsc.Logger.Info("Stopping connection due to read timeout") - wsc.Stop() - case <-wsc.Quit: - return - } + // The write loop closes the websocket connection when it exits its loop, and + // the read loop closes the writeChan. } // Implements WSRPCConnection @@ -487,30 +471,30 @@ func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool { // Read from the socket and subscribe to or unsubscribe from events func (wsc *wsConnection) readRoutine() { + defer func() { + wsc.baseConn.Close() + }() + // Do not close writeChan, to allow WriteRPCResponse() to fail. // defer close(wsc.writeChan) - for { select { case <-wsc.Quit: return default: + // reset deadline for every type of message (control or data) + wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.readWait)) var in []byte - // Do not set a deadline here like below: - // wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.pongWait)) - // The client may not send anything for a while. - // We use `readTimeout` to handle read timeouts. _, in, err := wsc.baseConn.ReadMessage() if err != nil { if websocket.IsCloseError(err, websocket.CloseNormalClosure) { - wsc.Logger.Info("Client closed the connection", "remote", wsc.remoteAddr) + wsc.Logger.Info("Client closed the connection") } else { - wsc.Logger.Info("Failed to read from connection", "remote", wsc.remoteAddr, "err", err.Error()) + wsc.Logger.Error("Failed to read request", "err", err) } wsc.Stop() return } - wsc.readTimeout.Reset(wsc.pongWait) var request types.RPCRequest err = json.Unmarshal(in, &request) @@ -558,15 +542,33 @@ func (wsc *wsConnection) readRoutine() { // receives on a write channel and writes out on the socket func (wsc *wsConnection) writeRoutine() { - defer wsc.baseConn.Close() + pingTicker := time.NewTicker(wsc.pingPeriod) + defer func() { + pingTicker.Stop() + wsc.baseConn.Close() + }() + + // https://github.com/gorilla/websocket/issues/97 + pongs := make(chan string, 1) + wsc.baseConn.SetPingHandler(func(m string) error { + select { + case pongs <- m: + default: + } + return nil + }) + for { select { - case <-wsc.Quit: - return - case <-wsc.pingTicker.C: + case m := <-pongs: + err := wsc.writeMessageWithDeadline(websocket.PongMessage, []byte(m)) + if err != nil { + wsc.Logger.Info("Failed to write pong (client may disconnect)", "err", err) + } + case <-pingTicker.C: err := wsc.writeMessageWithDeadline(websocket.PingMessage, []byte{}) if err != nil { - wsc.Logger.Error("Failed to write ping message on websocket", "err", err) + wsc.Logger.Error("Failed to write ping", "err", err) wsc.Stop() return } @@ -576,11 +578,13 @@ func (wsc *wsConnection) writeRoutine() { wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "err", err) } else { if err = wsc.writeMessageWithDeadline(websocket.TextMessage, jsonBytes); err != nil { - wsc.Logger.Error("Failed to write response on websocket", "err", err) + wsc.Logger.Error("Failed to write response", "err", err) wsc.Stop() return } } + case <-wsc.Quit: + return } } } @@ -588,7 +592,7 @@ func (wsc *wsConnection) writeRoutine() { // All writes to the websocket must (re)set the write deadline. // If some writes don't set it while others do, they may timeout incorrectly (https://github.com/tendermint/tendermint/issues/553) func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error { - wsc.baseConn.SetWriteDeadline(time.Now().Add(wsWriteWait)) + wsc.baseConn.SetWriteDeadline(time.Now().Add(wsc.writeWait)) return wsc.baseConn.WriteMessage(msgType, msg) } @@ -610,10 +614,8 @@ func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch, w funcMap: funcMap, evsw: evsw, Upgrader: websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, CheckOrigin: func(r *http.Request) bool { - // TODO + // TODO ??? return true }, }, @@ -637,7 +639,7 @@ func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Requ // register connection con := NewWSConnection(wsConn, wm.funcMap, wm.evsw, wm.wsConnOptions...) - con.SetLogger(wm.logger) + con.SetLogger(wm.logger.With("remote", wsConn.RemoteAddr())) wm.logger.Info("New websocket connection", "remote", con.remoteAddr) con.Start() // Blocking } From 9dde1a0bd4f4ba1b30134def75f89686ad77cb83 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 24 Aug 2017 16:25:56 -0400 Subject: [PATCH 16/16] rpc: comments --- rpc/lib/client/ws_client.go | 8 +++++ rpc/lib/client/ws_client_test.go | 1 + rpc/lib/server/handlers.go | 51 ++++++++++++++++++++------------ 3 files changed, 41 insertions(+), 19 deletions(-) diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index 6a932caf6..f56c45e99 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -92,24 +92,32 @@ func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSCli return c } +// MaxReconnectAttempts sets the maximum number of reconnect attempts before returning an error. +// It should only be used in the constructor and is not Goroutine-safe. func MaxReconnectAttempts(max int) func(*WSClient) { return func(c *WSClient) { c.maxReconnectAttempts = max } } +// ReadWait sets the amount of time to wait before a websocket read times out. +// It should only be used in the constructor and is not Goroutine-safe. func ReadWait(readWait time.Duration) func(*WSClient) { return func(c *WSClient) { c.readWait = readWait } } +// WriteWait sets the amount of time to wait before a websocket write times out. +// It should only be used in the constructor and is not Goroutine-safe. func WriteWait(writeWait time.Duration) func(*WSClient) { return func(c *WSClient) { c.writeWait = writeWait } } +// PingPeriod sets the duration for sending websocket pings. +// It should only be used in the constructor - not Goroutine-safe. func PingPeriod(pingPeriod time.Duration) func(*WSClient) { return func(c *WSClient) { c.pingPeriod = pingPeriod diff --git a/rpc/lib/client/ws_client_test.go b/rpc/lib/client/ws_client_test.go index 6f39690db..f5aa027fe 100644 --- a/rpc/lib/client/ws_client_test.go +++ b/rpc/lib/client/ws_client_test.go @@ -32,6 +32,7 @@ func (h *myHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if err != nil { panic(err) } + defer conn.Close() for { messageType, _, err := conn.ReadMessage() if err != nil { diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index 203a71ff7..3b81567e4 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -21,7 +21,7 @@ import ( "github.com/tendermint/tmlibs/log" ) -// Adds a route for each function in the funcMap, as well as general jsonrpc and websocket handlers for all functions. +// RegisterRPCFuncs adds a route for each function in the funcMap, as well as general jsonrpc and websocket handlers for all functions. // "result" is the interface on which the result objects are registered, and is popualted with every RPCResponse func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, logger log.Logger) { // HTTP endpoints @@ -36,7 +36,7 @@ func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, logger lo //------------------------------------- // function introspection -// holds all type information for each function +// RPCFunc contains the introspected type information for a function type RPCFunc struct { f reflect.Value // underlying rpc function args []reflect.Type // type of each function arg @@ -45,12 +45,13 @@ type RPCFunc struct { ws bool // websocket only } -// wraps a function for quicker introspection +// NewRPCFunc wraps a function for introspection. // f is the function, args are comma separated argument names func NewRPCFunc(f interface{}, args string) *RPCFunc { return newRPCFunc(f, args, false) } +// NewWSRPCFunc wraps a function for introspection and use in the websockets. func NewWSRPCFunc(f interface{}, args string) *RPCFunc { return newRPCFunc(f, args, true) } @@ -372,6 +373,8 @@ type wsConnection struct { // NewWSConnection wraps websocket.Conn. See the commentary on the // func(*wsConnection) functions for a detailed description of how to configure // ping period and pong wait time. +// NOTE: if the write buffer is full, pongs may be dropped, which may cause clients to disconnect. +// see https://github.com/gorilla/websocket/issues/97 func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw events.EventSwitch, options ...func(*wsConnection)) *wsConnection { wsc := &wsConnection{ remoteAddr: baseConn.RemoteAddr().String(), @@ -390,31 +393,39 @@ func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw return wsc } +// WriteWait sets the amount of time to wait before a websocket write times out. +// It should only be used in the constructor - not Goroutine-safe. func WriteWait(writeWait time.Duration) func(*wsConnection) { return func(wsc *wsConnection) { wsc.writeWait = writeWait } } +// WriteChanCapacity sets the capacity of the websocket write channel. +// It should only be used in the constructor - not Goroutine-safe. func WriteChanCapacity(cap int) func(*wsConnection) { return func(wsc *wsConnection) { wsc.writeChanCapacity = cap } } +// ReadWait sets the amount of time to wait before a websocket read times out. +// It should only be used in the constructor - not Goroutine-safe. func ReadWait(readWait time.Duration) func(*wsConnection) { return func(wsc *wsConnection) { wsc.readWait = readWait } } +// PingPeriod sets the duration for sending websocket pings. +// It should only be used in the constructor - not Goroutine-safe. func PingPeriod(pingPeriod time.Duration) func(*wsConnection) { return func(wsc *wsConnection) { wsc.pingPeriod = pingPeriod } } -// wsc.Start() blocks until the connection closes. +// OnStart starts the read and write routines. It blocks until the connection closes. func (wsc *wsConnection) OnStart() error { wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity) @@ -426,27 +437,29 @@ func (wsc *wsConnection) OnStart() error { return nil } +// OnStop unsubscribes from all events. func (wsc *wsConnection) OnStop() { if wsc.evsw != nil { wsc.evsw.RemoveListener(wsc.remoteAddr) } - // The write loop closes the websocket connection when it exits its loop, and - // the read loop closes the writeChan. + // Both read and write loops close the websocket connection when they exit their loops. + // The writeChan is never closed, to allow WriteRPCResponse() to fail. } -// Implements WSRPCConnection +// GetRemoteAddr returns the remote address of the underlying connection. +// It implements WSRPCConnection func (wsc *wsConnection) GetRemoteAddr() string { return wsc.remoteAddr } -// Implements WSRPCConnection +// GetEventSwitch returns the event switch. +// It implements WSRPCConnection func (wsc *wsConnection) GetEventSwitch() events.EventSwitch { return wsc.evsw } -// Implements WSRPCConnection -// Blocking write to writeChan until service stops. -// Goroutine-safe +// WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted. +// It implements WSRPCConnection. It is Goroutine-safe. func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { select { case <-wsc.Quit: @@ -455,9 +468,8 @@ func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { } } -// Implements WSRPCConnection -// Nonblocking write. -// Goroutine-safe +// TryWriteRPCResponse attempts to push a response to the writeChan, but does not block. +// It implements WSRPCConnection. It is Goroutine-safe func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool { select { case <-wsc.Quit: @@ -475,8 +487,6 @@ func (wsc *wsConnection) readRoutine() { wsc.baseConn.Close() }() - // Do not close writeChan, to allow WriteRPCResponse() to fail. - // defer close(wsc.writeChan) for { select { case <-wsc.Quit: @@ -598,8 +608,8 @@ func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error //---------------------------------------- -// Main manager for all websocket connections -// Holds the event switch +// WebsocketManager is the main manager for all websocket connections. +// It holds the event switch and a map of functions for routing. // NOTE: The websocket path is defined externally, e.g. in node/node.go type WebsocketManager struct { websocket.Upgrader @@ -609,6 +619,8 @@ type WebsocketManager struct { wsConnOptions []func(*wsConnection) } +// NewWebsocketManager returns a new WebsocketManager that routes according to the given funcMap, listens on the given event switch, +// and connects to the server with the given connection options. func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch, wsConnOptions ...func(*wsConnection)) *WebsocketManager { return &WebsocketManager{ funcMap: funcMap, @@ -624,11 +636,12 @@ func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch, w } } +// SetLogger sets the logger. func (wm *WebsocketManager) SetLogger(l log.Logger) { wm.logger = l } -// Upgrade the request/response (via http.Hijack) and starts the wsConnection. +// WebsocketHandler upgrades the request/response (via http.Hijack) and starts the wsConnection. func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) { wsConn, err := wm.Upgrade(w, r, nil) if err != nil {