From d578f7f81edde980fa2cdd15a3220f667e8f697e Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 31 Jul 2017 18:44:46 -0400 Subject: [PATCH] biff up WS client What's new: - auto reconnect - ping/pong - colored tests --- benchmarks/simu/counter.go | 32 +-- rpc/client/httpclient.go | 7 +- rpc/lib/client/ws_client.go | 347 +++++++++++++++++++++++-------- rpc/lib/client/ws_client_test.go | 204 ++++++++++++++++++ rpc/lib/rpc_test.go | 69 ++++-- rpc/lib/test/integration_test.sh | 3 + rpc/lib/types/types.go | 13 ++ 7 files changed, 554 insertions(+), 121 deletions(-) create mode 100644 rpc/lib/client/ws_client_test.go diff --git a/benchmarks/simu/counter.go b/benchmarks/simu/counter.go index e9502f956..d26f5e6f7 100644 --- a/benchmarks/simu/counter.go +++ b/benchmarks/simu/counter.go @@ -1,30 +1,28 @@ package main import ( + "context" "encoding/binary" "time" //"encoding/hex" "fmt" - "github.com/gorilla/websocket" - "github.com/tendermint/go-wire" - _ "github.com/tendermint/tendermint/rpc/core/types" // Register RPCResponse > Result types - "github.com/tendermint/tendermint/rpc/lib/client" - "github.com/tendermint/tendermint/rpc/lib/types" - . "github.com/tendermint/tmlibs/common" + rpcclient "github.com/tendermint/tendermint/rpc/lib/client" + cmn "github.com/tendermint/tmlibs/common" ) func main() { - ws := rpcclient.NewWSClient("127.0.0.1:46657", "/websocket") - _, err := ws.Start() + wsc := rpcclient.NewWSClient("127.0.0.1:46657", "/websocket") + _, err := wsc.Start() if err != nil { - Exit(err.Error()) + cmn.Exit(err.Error()) } + defer wsc.Stop() // Read a bunch of responses go func() { for { - _, ok := <-ws.ResultsCh + _, ok := <-wsc.ResultsCh if !ok { break } @@ -37,24 +35,14 @@ func main() { for i := 0; ; i++ { binary.BigEndian.PutUint64(buf, uint64(i)) //txBytes := hex.EncodeToString(buf[:n]) - request, err := rpctypes.MapToRequest("fakeid", - "broadcast_tx", - map[string]interface{}{"tx": buf[:8]}) - if err != nil { - Exit(err.Error()) - } - reqBytes := wire.JSONBytes(request) - //fmt.Println("!!", string(reqBytes)) fmt.Print(".") - err = ws.WriteMessage(websocket.TextMessage, reqBytes) + err = wsc.Call(context.TODO(), "broadcast_tx", map[string]interface{}{"tx": buf[:8]}) if err != nil { - Exit(err.Error()) + cmn.Exit(err.Error()) } if i%1000 == 0 { fmt.Println(i) } time.Sleep(time.Microsecond * 1000) } - - ws.Stop() } diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index cb7149406..9623229e6 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -1,13 +1,14 @@ package client import ( + "context" "encoding/json" "fmt" "github.com/pkg/errors" data "github.com/tendermint/go-wire/data" ctypes "github.com/tendermint/tendermint/rpc/core/types" - "github.com/tendermint/tendermint/rpc/lib/client" + rpcclient "github.com/tendermint/tendermint/rpc/lib/client" "github.com/tendermint/tendermint/types" events "github.com/tendermint/tmlibs/events" ) @@ -349,14 +350,14 @@ func (w *WSEvents) parseEvent(data []byte) (err error) { // no way of exposing these failures, so we panic. // is this right? or silently ignore??? func (w *WSEvents) subscribe(event string) { - err := w.ws.Subscribe(event) + err := w.ws.Subscribe(context.TODO(), event) if err != nil { panic(err) } } func (w *WSEvents) unsubscribe(event string) { - err := w.ws.Unsubscribe(event) + err := w.ws.Unsubscribe(context.TODO(), event) if err != nil { panic(err) } diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index f018de67c..2e4b8557d 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -1,9 +1,13 @@ package rpcclient import ( + "context" "encoding/json" + "fmt" + "math" "net" "net/http" + "sync" "time" "github.com/gorilla/websocket" @@ -13,148 +17,325 @@ import ( ) const ( - wsResultsChannelCapacity = 10 - wsErrorsChannelCapacity = 1 - wsWriteTimeoutSeconds = 10 + // Time allowed to write a message to the peer. + writeWait = 10 * time.Second + + // Time allowed to read the next pong message from the server. + pongWait = 30 * time.Second + + // Send pings to server with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 + + // Maximum reconnect attempts + maxReconnectAttempts = 25 ) type WSClient struct { cmn.BaseService + + conn *websocket.Conn + Address string // IP:PORT or /path/to/socket Endpoint string // /websocket/url/endpoint Dialer func(string, string) (net.Conn, error) - *websocket.Conn - ResultsCh chan json.RawMessage // closes upon WSClient.Stop() - ErrorsCh chan error // closes upon WSClient.Stop() + + // user facing channels, closed only when the client is being stopped. + ResultsCh chan json.RawMessage + ErrorsCh chan error + + // internal channels + send chan types.RPCRequest // user requests + backlog chan types.RPCRequest // stores a single user request received during a conn failure + reconnectAfter chan error // reconnect requests + receiveRoutineQuit chan struct{} // a way for receiveRoutine to close writeRoutine + + wg sync.WaitGroup } -// create a new connection +// NewWSClient returns a new client. func NewWSClient(remoteAddr, endpoint string) *WSClient { addr, dialer := makeHTTPDialer(remoteAddr) wsClient := &WSClient{ Address: addr, Dialer: dialer, Endpoint: endpoint, - Conn: nil, } wsClient.BaseService = *cmn.NewBaseService(nil, "WSClient", wsClient) return wsClient } -func (wsc *WSClient) String() string { - return wsc.Address + ", " + wsc.Endpoint +// String returns WS client full address. +func (c *WSClient) String() string { + return fmt.Sprintf("%s (%s)", c.Address, c.Endpoint) } -// OnStart implements cmn.BaseService interface -func (wsc *WSClient) OnStart() error { - wsc.BaseService.OnStart() - err := wsc.dial() +// OnStart implements cmn.Service by dialing a server and creating read and +// write routines. +func (c *WSClient) OnStart() error { + err := c.dial() if err != nil { return err } - wsc.ResultsCh = make(chan json.RawMessage, wsResultsChannelCapacity) - wsc.ErrorsCh = make(chan error, wsErrorsChannelCapacity) - go wsc.receiveEventsRoutine() + + c.ResultsCh = make(chan json.RawMessage) + c.ErrorsCh = make(chan error) + + c.send = make(chan types.RPCRequest) + // 1 additional error may come from the read/write + // goroutine depending on which failed first. + c.reconnectAfter = make(chan error, 1) + // capacity for 1 request. a user won't be able to send more because the send + // channel is unbuffered. + c.backlog = make(chan types.RPCRequest, 1) + + c.startReadWriteRoutines() + go c.reconnectRoutine() + return nil } -// OnReset implements cmn.BaseService interface -func (wsc *WSClient) OnReset() error { - return nil +// OnStop implements cmn.Service. +func (c *WSClient) OnStop() {} + +// Stop overrides cmn.Service#Stop. There is no other way to wait until Quit +// channel is closed. +func (c *WSClient) Stop() bool { + success := c.BaseService.Stop() + // only close user-facing channels when we can't write to them + c.wg.Wait() + close(c.ResultsCh) + close(c.ErrorsCh) + return success } -func (wsc *WSClient) dial() error { +// Send asynchronously sends the given RPCRequest to the server. Results will +// be available on ResultsCh, errors, if any, on ErrorsCh. +func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error { + select { + case c.send <- request: + c.Logger.Info("sent a request", "req", request) + return nil + case <-ctx.Done(): + return ctx.Err() + } +} - // Dial +// Call asynchronously calls a given method by sending an RPCRequest to the +// server. Results will be available on ResultsCh, errors, if any, on ErrorsCh. +func (c *WSClient) Call(ctx context.Context, method string, params map[string]interface{}) error { + request, err := types.MapToRequest("", method, params) + if err != nil { + return err + } + return c.Send(ctx, request) +} + +// CallWithArrayParams asynchronously calls a given method by sending an +// RPCRequest to the server. Results will be available on ResultsCh, errors, if +// any, on ErrorsCh. +func (c *WSClient) CallWithArrayParams(ctx context.Context, method string, params []interface{}) error { + request, err := types.ArrayToRequest("", method, params) + if err != nil { + return err + } + return c.Send(ctx, request) +} + +/////////////////////////////////////////////////////////////////////////////// +// Private methods + +func (c *WSClient) dial() error { dialer := &websocket.Dialer{ - NetDial: wsc.Dialer, + NetDial: c.Dialer, Proxy: http.ProxyFromEnvironment, } rHeader := http.Header{} - con, _, err := dialer.Dial("ws://"+wsc.Address+wsc.Endpoint, rHeader) + conn, _, err := dialer.Dial("ws://"+c.Address+c.Endpoint, rHeader) if err != nil { return err } - // Set the ping/pong handlers - con.SetPingHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) - return nil - }) - con.SetPongHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - return nil - }) - wsc.Conn = con + c.conn = conn return nil } -// OnStop implements cmn.BaseService interface -func (wsc *WSClient) OnStop() { - wsc.BaseService.OnStop() - wsc.Conn.Close() - // ResultsCh/ErrorsCh is closed in receiveEventsRoutine. -} +// reconnect tries to redial up to maxReconnectAttempts with exponential +// backoff. +func (c *WSClient) reconnect() error { + attempt := 0 -func (wsc *WSClient) receiveEventsRoutine() { for { - _, data, err := wsc.ReadMessage() + c.Logger.Info("reconnecting", "attempt", attempt+1) + + d := time.Duration(math.Exp2(float64(attempt))) + time.Sleep(d * time.Second) + + err := c.dial() if err != nil { - wsc.Logger.Info("WSClient failed to read message", "err", err, "data", string(data)) - wsc.Stop() - break + c.Logger.Error("failed to redial", "err", err) } else { - var response types.RPCResponse - err := json.Unmarshal(data, &response) + c.Logger.Info("reconnected") + return nil + } + + attempt++ + + if attempt > maxReconnectAttempts { + return errors.Wrap(err, "reached maximum reconnect attempts") + } + } +} + +func (c *WSClient) startReadWriteRoutines() { + c.wg.Add(2) + c.receiveRoutineQuit = make(chan struct{}) + go c.receiveRoutine() + go c.writeRoutine() +} + +func (c *WSClient) reconnectRoutine() { + for { + select { + case originalError := <-c.reconnectAfter: + // wait until writeRoutine and receiveRoutine finish + c.wg.Wait() + err := c.reconnect() + if err != nil { + c.Logger.Error("failed to reconnect", "err", err, "original_err", originalError) + c.Stop() + return + } else { + // drain reconnectAfter + LOOP: + for { + select { + case <-c.reconnectAfter: + default: + break LOOP + } + } + c.startReadWriteRoutines() + return + } + case <-c.Quit: + return + } + } +} + +// The client ensures that there is at most one writer to a connection by +// executing all writes from this goroutine. +func (c *WSClient) writeRoutine() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + c.conn.Close() + c.wg.Done() + }() + + for { + select { + case request := <-c.backlog: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + err := c.conn.WriteJSON(request) if err != nil { - wsc.Logger.Info("WSClient failed to parse message", "err", err, "data", string(data)) - wsc.ErrorsCh <- err - continue + c.Logger.Error("failed to resend request", "err", err) + c.reconnectAfter <- err + // add request to the backlog, so we don't lose it + c.backlog <- request + return } - if response.Error != "" { - wsc.ErrorsCh <- errors.Errorf(response.Error) - continue + c.Logger.Info("resend a request", "req", request) + case request := <-c.send: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + err := c.conn.WriteJSON(request) + if err != nil { + c.Logger.Error("failed to send request", "err", err) + c.reconnectAfter <- err + // add request to the backlog, so we don't lose it + c.backlog <- request + return + } + case <-ticker.C: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + err := c.conn.WriteMessage(websocket.PingMessage, []byte{}) + if err != nil { + c.Logger.Error("failed to write ping", "err", err) + c.reconnectAfter <- err + return } - wsc.ResultsCh <- *response.Result + c.Logger.Debug("sent ping") + case <-c.receiveRoutineQuit: + return + case <-c.Quit: + c.conn.WriteMessage(websocket.CloseMessage, []byte{}) + return } } - // this must be modified in the same go-routine that reads from the - // connection to avoid race conditions - wsc.Conn = nil +} + +// The client ensures that there is at most one reader to a connection by +// executing all reads from this goroutine. +func (c *WSClient) receiveRoutine() { + defer func() { + c.conn.Close() + c.wg.Done() + }() - // Cleanup - close(wsc.ResultsCh) - close(wsc.ErrorsCh) + c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.conn.SetPongHandler(func(string) error { + c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.Logger.Debug("got pong") + return nil + }) + for { + _, data, err := c.conn.ReadMessage() + if err != nil { + if !websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { + return + } + + c.Logger.Error("failed to read response", "err", err) + close(c.receiveRoutineQuit) + c.reconnectAfter <- err + return + } + + var response types.RPCResponse + err = json.Unmarshal(data, &response) + if err != nil { + c.Logger.Error("failed to parse response", "err", err, "data", string(data)) + c.ErrorsCh <- err + continue + } + if response.Error != "" { + c.ErrorsCh <- errors.Errorf(response.Error) + continue + } + c.Logger.Info("got response", "resp", response.Result) + c.ResultsCh <- *response.Result + } } +/////////////////////////////////////////////////////////////////////////////// +// Predefined methods + // Subscribe to an event. Note the server must have a "subscribe" route // defined. -func (wsc *WSClient) Subscribe(eventid string) error { - params := map[string]interface{}{"event": eventid} - request, err := types.MapToRequest("", "subscribe", params) - if err == nil { - err = wsc.WriteJSON(request) - } - return err +func (c *WSClient) Subscribe(ctx context.Context, eventType string) error { + params := map[string]interface{}{"event": eventType} + return c.Call(ctx, "subscribe", params) } // Unsubscribe from an event. Note the server must have a "unsubscribe" route // defined. -func (wsc *WSClient) Unsubscribe(eventid string) error { - params := map[string]interface{}{"event": eventid} - request, err := types.MapToRequest("", "unsubscribe", params) - if err == nil { - err = wsc.WriteJSON(request) - } - return err +func (c *WSClient) Unsubscribe(ctx context.Context, eventType string) error { + params := map[string]interface{}{"event": eventType} + return c.Call(ctx, "unsubscribe", params) } -// Call asynchronously calls a given method by sending an RPCRequest to the -// server. Results will be available on ResultsCh, errors, if any, on ErrorsCh. -func (wsc *WSClient) Call(method string, params map[string]interface{}) error { - request, err := types.MapToRequest("", method, params) - if err == nil { - err = wsc.WriteJSON(request) - } - return err +// UnsubscribeAll from all. Note the server must have a "unsubscribe_all" route +// defined. +func (c *WSClient) UnsubscribeAll(ctx context.Context) error { + params := map[string]interface{}{} + return c.Call(ctx, "unsubscribe_all", params) } diff --git a/rpc/lib/client/ws_client_test.go b/rpc/lib/client/ws_client_test.go new file mode 100644 index 000000000..6a72ad28d --- /dev/null +++ b/rpc/lib/client/ws_client_test.go @@ -0,0 +1,204 @@ +package rpcclient + +import ( + "context" + "encoding/json" + "net" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/gorilla/websocket" + "github.com/stretchr/testify/require" + "github.com/tendermint/tmlibs/log" + + types "github.com/tendermint/tendermint/rpc/lib/types" +) + +type myHandler struct { + closeConnAfterRead bool + mtx sync.RWMutex +} + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +func (h *myHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + panic(err) + } + for { + messageType, _, err := conn.ReadMessage() + if err != nil { + return + } + + h.mtx.RLock() + if h.closeConnAfterRead { + conn.Close() + } + h.mtx.RUnlock() + + res := json.RawMessage(`{}`) + emptyRespBytes, _ := json.Marshal(types.RPCResponse{Result: &res}) + if err := conn.WriteMessage(messageType, emptyRespBytes); err != nil { + return + } + } +} + +func TestWSClientReconnectsAfterReadFailure(t *testing.T) { + var wg sync.WaitGroup + + // start server + h := &myHandler{} + s := httptest.NewServer(h) + defer s.Close() + + c := startClient(t, s.Listener.Addr()) + defer c.Stop() + + wg.Add(1) + go func() { + for { + select { + case res := <-c.ResultsCh: + if res != nil { + wg.Done() + } + case err := <-c.ErrorsCh: + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + case <-c.Quit: + return + } + } + }() + + h.mtx.Lock() + h.closeConnAfterRead = true + h.mtx.Unlock() + + // results in error + call(t, "a", c) + + // expect to reconnect almost immediately + time.Sleep(10 * time.Millisecond) + h.mtx.Lock() + h.closeConnAfterRead = false + h.mtx.Unlock() + + // should succeed + call(t, "b", c) + + wg.Wait() +} + +func TestWSClientReconnectsAfterWriteFailure(t *testing.T) { + var wg sync.WaitGroup + + // start server + h := &myHandler{} + s := httptest.NewServer(h) + + c := startClient(t, s.Listener.Addr()) + defer c.Stop() + + wg.Add(2) + go func() { + for { + select { + case res := <-c.ResultsCh: + if res != nil { + wg.Done() + } + case err := <-c.ErrorsCh: + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + case <-c.Quit: + return + } + } + }() + + // hacky way to abort the connection before write + c.conn.Close() + + // results in error, the client should resend on reconnect + call(t, "a", c) + + // expect to reconnect almost immediately + time.Sleep(10 * time.Millisecond) + + // should succeed + call(t, "b", c) + + wg.Wait() +} + +func TestWSClientReconnectFailure(t *testing.T) { + // start server + h := &myHandler{} + s := httptest.NewServer(h) + + c := startClient(t, s.Listener.Addr()) + defer c.Stop() + + go func() { + for { + select { + case <-c.ResultsCh: + case <-c.ErrorsCh: + case <-c.Quit: + return + } + } + }() + + // hacky way to abort the connection before write + c.conn.Close() + s.Close() + + // results in error + call(t, "a", c) + + // expect to reconnect almost immediately + time.Sleep(10 * time.Millisecond) + + done := make(chan struct{}) + go func() { + // client should block on this + call(t, "b", c) + close(done) + }() + + // test that client blocks on the second send + select { + case <-done: + t.Fatal("client should block on calling 'b' during reconnect") + case <-time.After(5 * time.Second): + t.Log("All good") + } +} + +func startClient(t *testing.T, addr net.Addr) *WSClient { + c := NewWSClient(addr.String(), "/websocket") + _, err := c.Start() + require.Nil(t, err) + c.SetLogger(log.TestingLogger()) + return c +} + +func call(t *testing.T, method string, c *WSClient) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err := c.Call(ctx, method, make(map[string]interface{})) + require.NoError(t, err) +} diff --git a/rpc/lib/rpc_test.go b/rpc/lib/rpc_test.go index a79a5270e..50750e5db 100644 --- a/rpc/lib/rpc_test.go +++ b/rpc/lib/rpc_test.go @@ -2,15 +2,18 @@ package rpc import ( "bytes" + "context" crand "crypto/rand" "encoding/json" "fmt" "math/rand" "net/http" + "os" "os/exec" "testing" "time" + "github.com/go-kit/kit/log/term" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tendermint/go-wire/data" @@ -75,8 +78,29 @@ func EchoDataBytesResult(v data.Bytes) (*ResultEchoDataBytes, error) { return &ResultEchoDataBytes{v}, nil } +func TestMain(m *testing.M) { + setup() + code := m.Run() + os.Exit(code) +} + +var colorFn = func(keyvals ...interface{}) term.FgBgColor { + for i := 0; i < len(keyvals)-1; i += 2 { + if keyvals[i] == "socket" { + if keyvals[i+1] == "tcp" { + return term.FgBgColor{Fg: term.DarkBlue} + } else if keyvals[i+1] == "unix" { + return term.FgBgColor{Fg: term.DarkCyan} + } + } + } + return term.FgBgColor{} +} + // launch unix and tcp servers -func init() { +func setup() { + logger := log.NewTMLoggerWithColorFn(log.NewSyncWriter(os.Stdout), colorFn) + cmd := exec.Command("rm", "-f", unixSocket) err := cmd.Start() if err != nil { @@ -86,25 +110,27 @@ func init() { panic(err) } + tcpLogger := logger.With("socket", "tcp") mux := http.NewServeMux() - server.RegisterRPCFuncs(mux, Routes, log.TestingLogger()) + server.RegisterRPCFuncs(mux, Routes, tcpLogger) wm := server.NewWebsocketManager(Routes, nil) - wm.SetLogger(log.TestingLogger()) + wm.SetLogger(tcpLogger) mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler) go func() { - _, err := server.StartHTTPServer(tcpAddr, mux, log.TestingLogger()) + _, err := server.StartHTTPServer(tcpAddr, mux, tcpLogger) if err != nil { panic(err) } }() + unixLogger := logger.With("socket", "unix") mux2 := http.NewServeMux() - server.RegisterRPCFuncs(mux2, Routes, log.TestingLogger()) + server.RegisterRPCFuncs(mux2, Routes, unixLogger) wm = server.NewWebsocketManager(Routes, nil) - wm.SetLogger(log.TestingLogger()) + wm.SetLogger(unixLogger) mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler) go func() { - _, err := server.StartHTTPServer(unixAddr, mux2, log.TestingLogger()) + _, err := server.StartHTTPServer(unixAddr, mux2, unixLogger) if err != nil { panic(err) } @@ -184,7 +210,7 @@ func echoViaWS(cl *client.WSClient, val string) (string, error) { params := map[string]interface{}{ "arg": val, } - err := cl.Call("echo", params) + err := cl.Call(context.Background(), "echo", params) if err != nil { return "", err } @@ -206,7 +232,7 @@ func echoBytesViaWS(cl *client.WSClient, bytes []byte) ([]byte, error) { params := map[string]interface{}{ "arg": bytes, } - err := cl.Call("echo_bytes", params) + err := cl.Call(context.Background(), "echo_bytes", params) if err != nil { return []byte{}, err } @@ -252,6 +278,7 @@ func TestServersAndClientsBasic(t *testing.T) { cl3 := client.NewWSClient(addr, websocketEndpoint) _, err := cl3.Start() require.Nil(t, err) + cl3.SetLogger(log.TestingLogger()) fmt.Printf("=== testing server on %s using %v client", addr, cl3) testWithWSClient(t, cl3) cl3.Stop() @@ -280,13 +307,14 @@ func TestWSNewWSRPCFunc(t *testing.T) { cl := client.NewWSClient(tcpAddr, websocketEndpoint) _, err := cl.Start() require.Nil(t, err) + cl.SetLogger(log.TestingLogger()) defer cl.Stop() val := "acbd" params := map[string]interface{}{ "arg": val, } - err = cl.Call("echo_ws", params) + err = cl.Call(context.Background(), "echo_ws", params) require.Nil(t, err) select { @@ -305,13 +333,12 @@ func TestWSHandlesArrayParams(t *testing.T) { cl := client.NewWSClient(tcpAddr, websocketEndpoint) _, err := cl.Start() require.Nil(t, err) + cl.SetLogger(log.TestingLogger()) defer cl.Stop() val := "acbd" params := []interface{}{val} - request, err := types.ArrayToRequest("", "echo_ws", params) - require.Nil(t, err) - err = cl.WriteJSON(request) + err = cl.CallWithArrayParams(context.Background(), "echo_ws", params) require.Nil(t, err) select { @@ -326,6 +353,22 @@ func TestWSHandlesArrayParams(t *testing.T) { } } +// TestWSClientPingPong checks that a client & server exchange pings +// & pongs so connection stays alive. +func TestWSClientPingPong(t *testing.T) { + if testing.Short() { + t.Skip("skipping ping pong in short mode") + } + + cl := client.NewWSClient(tcpAddr, websocketEndpoint) + _, err := cl.Start() + require.Nil(t, err) + cl.SetLogger(log.TestingLogger()) + defer cl.Stop() + + time.Sleep(35 * time.Second) +} + func randBytes(t *testing.T) []byte { n := rand.Intn(10) + 2 buf := make([]byte, n) diff --git a/rpc/lib/test/integration_test.sh b/rpc/lib/test/integration_test.sh index 7c23be7d3..ca15440c1 100755 --- a/rpc/lib/test/integration_test.sh +++ b/rpc/lib/test/integration_test.sh @@ -9,6 +9,9 @@ DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )" # Change into that dir because we expect that. pushd "$DIR" +echo "==> Installing deps" +go get -v + echo "==> Building the server" go build -o rpcserver main.go diff --git a/rpc/lib/types/types.go b/rpc/lib/types/types.go index 8076e4b0d..f4a2cede0 100644 --- a/rpc/lib/types/types.go +++ b/rpc/lib/types/types.go @@ -2,6 +2,7 @@ package rpctypes import ( "encoding/json" + "fmt" "strings" events "github.com/tendermint/tmlibs/events" @@ -23,6 +24,10 @@ func NewRPCRequest(id string, method string, params json.RawMessage) RPCRequest } } +func (req RPCRequest) String() string { + return fmt.Sprintf("[%s %s]", req.ID, req.Method) +} + func MapToRequest(id string, method string, params map[string]interface{}) (RPCRequest, error) { payload, err := json.Marshal(params) if err != nil { @@ -70,6 +75,14 @@ func NewRPCResponse(id string, res interface{}, err string) RPCResponse { } } +func (resp RPCResponse) String() string { + if resp.Error == "" { + return fmt.Sprintf("[%s %v]", resp.ID, resp.Result) + } else { + return fmt.Sprintf("[%s %s]", resp.ID, resp.Error) + } +} + //---------------------------------------- // *wsConnection implements this interface.