From 0013053fae3fb7611c392ebcff15352bb7ec717b Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 4 Aug 2017 10:30:22 -0400 Subject: [PATCH] allow to change pong wait and ping period --- rpc/lib/client/ws_client.go | 46 +++++++++++++++++++++++--------- rpc/lib/client/ws_client_test.go | 7 +++++ 2 files changed, 40 insertions(+), 13 deletions(-) diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index 12eb97ab4..c617bf21a 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -22,14 +22,11 @@ const ( // Time allowed to write a message to the peer. writeWait = 10 * time.Second - // Time allowed to read the next pong message from the server. - pongWait = 30 * time.Second - - // Send pings to server with this period. Must be less than pongWait. - pingPeriod = (pongWait * 9) / 10 - // Maximum reconnect attempts maxReconnectAttempts = 25 + + defaultPongWait = 30 * time.Second + defaultPingPeriod = (defaultPongWait * 9) / 10 ) type WSClient struct { @@ -58,19 +55,42 @@ type WSClient struct { wg sync.WaitGroup mtx sync.RWMutex + + // Time allowed to read the next pong message from the server. + pongWait time.Duration + + // Send pings to server with this period. Must be less than pongWait. + pingPeriod time.Duration } // NewWSClient returns a new client. -func NewWSClient(remoteAddr, endpoint string) *WSClient { +func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSClient { addr, dialer := makeHTTPDialer(remoteAddr) - wsClient := &WSClient{ + c := &WSClient{ Address: addr, Dialer: dialer, Endpoint: endpoint, PingPongLatencyTimer: metrics.NewTimer(), + pongWait: defaultPongWait, + pingPeriod: defaultPingPeriod, + } + c.BaseService = *cmn.NewBaseService(nil, "WSClient", c) + for _, option := range options { + option(c) + } + return c +} + +// PingPong allows changing ping period and pong wait time. If ping period +// greater or equal to pong wait time, panic will be thrown. +func PingPong(pingPeriod, pongWait time.Duration) func(*WSClient) { + return func(c *WSClient) { + if pingPeriod >= pongWait { + panic(fmt.Sprintf("ping period (%v) must be less than pong wait time (%v)", pingPeriod, pongWait)) + } + c.pingPeriod = pingPeriod + c.pongWait = pongWait } - wsClient.BaseService = *cmn.NewBaseService(nil, "WSClient", wsClient) - return wsClient } // String returns WS client full address. @@ -248,7 +268,7 @@ func (c *WSClient) reconnectRoutine() { // The client ensures that there is at most one writer to a connection by // executing all writes from this goroutine. func (c *WSClient) writeRoutine() { - ticker := time.NewTicker(pingPeriod) + ticker := time.NewTicker(c.pingPeriod) defer func() { ticker.Stop() c.conn.Close() @@ -307,9 +327,9 @@ func (c *WSClient) receiveRoutine() { c.wg.Done() }() - c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.conn.SetReadDeadline(time.Now().Add(c.pongWait)) c.conn.SetPongHandler(func(string) error { - c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.conn.SetReadDeadline(time.Now().Add(c.pongWait)) c.mtx.RLock() c.PingPongLatencyTimer.UpdateSince(c.sentLastPingAt) c.mtx.RUnlock() diff --git a/rpc/lib/client/ws_client_test.go b/rpc/lib/client/ws_client_test.go index 6a72ad28d..6778a0894 100644 --- a/rpc/lib/client/ws_client_test.go +++ b/rpc/lib/client/ws_client_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tendermint/tmlibs/log" @@ -188,6 +189,12 @@ func TestWSClientReconnectFailure(t *testing.T) { } } +func TestWSClientPingPongOption(t *testing.T) { + assert.Panics(t, func() { + NewWSClient("tcp://localhost:8080", "/websocket", PingPong(2*time.Second, 2*time.Second)) + }) +} + func startClient(t *testing.T, addr net.Addr) *WSClient { c := NewWSClient(addr.String(), "/websocket") _, err := c.Start()