- package rpcclient
-
- import (
- "context"
- "encoding/json"
- "fmt"
- "net"
- "net/http"
- "sync"
- "time"
-
- "github.com/gorilla/websocket"
- "github.com/pkg/errors"
- metrics "github.com/rcrowley/go-metrics"
-
- "github.com/tendermint/go-amino"
- types "github.com/tendermint/tendermint/rpc/lib/types"
- cmn "github.com/tendermint/tmlibs/common"
- )
-
- const (
- defaultMaxReconnectAttempts = 25
- defaultWriteWait = 0
- defaultReadWait = 0
- defaultPingPeriod = 0
- )
-
- // WSClient is a WebSocket client. The methods of WSClient are safe for use by
- // multiple goroutines.
- type WSClient struct {
- cmn.BaseService
-
- conn *websocket.Conn
- cdc *amino.Codec
-
- Address string // IP:PORT or /path/to/socket
- Endpoint string // /websocket/url/endpoint
- Dialer func(string, string) (net.Conn, error)
-
- // Time between sending a ping and receiving a pong. See
- // https://godoc.org/github.com/rcrowley/go-metrics#Timer.
- PingPongLatencyTimer metrics.Timer
-
- // Single user facing channel to read RPCResponses from, closed only when the client is being stopped.
- ResponsesCh chan types.RPCResponse
-
- // Callback, which will be called each time after successful reconnect.
- onReconnect func()
-
- // internal channels
- send chan types.RPCRequest // user requests
- backlog chan types.RPCRequest // stores a single user request received during a conn failure
- reconnectAfter chan error // reconnect requests
- readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine
-
- wg sync.WaitGroup
-
- mtx sync.RWMutex
- sentLastPingAt time.Time
- reconnecting bool
-
- // Maximum reconnect attempts (0 or greater; default: 25).
- maxReconnectAttempts int
-
- // Time allowed to write a message to the server. 0 means block until operation succeeds.
- writeWait time.Duration
-
- // Time allowed to read the next message from the server. 0 means block until operation succeeds.
- readWait time.Duration
-
- // Send pings to server with this period. Must be less than readWait. If 0, no pings will be sent.
- pingPeriod time.Duration
- }
-
- // NewWSClient returns a new client. See the commentary on the func(*WSClient)
- // functions for a detailed description of how to configure ping period and
- // pong wait time. The endpoint argument must begin with a `/`.
- func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSClient {
- addr, dialer := makeHTTPDialer(remoteAddr)
- c := &WSClient{
- cdc: amino.NewCodec(),
- Address: addr,
- Dialer: dialer,
- Endpoint: endpoint,
- PingPongLatencyTimer: metrics.NewTimer(),
-
- maxReconnectAttempts: defaultMaxReconnectAttempts,
- readWait: defaultReadWait,
- writeWait: defaultWriteWait,
- pingPeriod: defaultPingPeriod,
- }
- c.BaseService = *cmn.NewBaseService(nil, "WSClient", c)
- for _, option := range options {
- option(c)
- }
- return c
- }
-
- // MaxReconnectAttempts sets the maximum number of reconnect attempts before returning an error.
- // It should only be used in the constructor and is not Goroutine-safe.
- func MaxReconnectAttempts(max int) func(*WSClient) {
- return func(c *WSClient) {
- c.maxReconnectAttempts = max
- }
- }
-
- // ReadWait sets the amount of time to wait before a websocket read times out.
- // It should only be used in the constructor and is not Goroutine-safe.
- func ReadWait(readWait time.Duration) func(*WSClient) {
- return func(c *WSClient) {
- c.readWait = readWait
- }
- }
-
- // WriteWait sets the amount of time to wait before a websocket write times out.
- // It should only be used in the constructor and is not Goroutine-safe.
- func WriteWait(writeWait time.Duration) func(*WSClient) {
- return func(c *WSClient) {
- c.writeWait = writeWait
- }
- }
-
- // PingPeriod sets the duration for sending websocket pings.
- // It should only be used in the constructor - not Goroutine-safe.
- func PingPeriod(pingPeriod time.Duration) func(*WSClient) {
- return func(c *WSClient) {
- c.pingPeriod = pingPeriod
- }
- }
-
- // OnReconnect sets the callback, which will be called every time after
- // successful reconnect.
- func OnReconnect(cb func()) func(*WSClient) {
- return func(c *WSClient) {
- c.onReconnect = cb
- }
- }
-
- // String returns WS client full address.
- func (c *WSClient) String() string {
- return fmt.Sprintf("%s (%s)", c.Address, c.Endpoint)
- }
-
- // OnStart implements cmn.Service by dialing a server and creating read and
- // write routines.
- func (c *WSClient) OnStart() error {
- err := c.dial()
- if err != nil {
- return err
- }
-
- c.ResponsesCh = make(chan types.RPCResponse)
-
- c.send = make(chan types.RPCRequest)
- // 1 additional error may come from the read/write
- // goroutine depending on which failed first.
- c.reconnectAfter = make(chan error, 1)
- // capacity for 1 request. a user won't be able to send more because the send
- // channel is unbuffered.
- c.backlog = make(chan types.RPCRequest, 1)
-
- c.startReadWriteRoutines()
- go c.reconnectRoutine()
-
- return nil
- }
-
- // OnStop implements cmn.Service.
- func (c *WSClient) OnStop() {}
-
- // Stop overrides cmn.Service#Stop. There is no other way to wait until Quit
- // channel is closed.
- func (c *WSClient) Stop() error {
- if err := c.BaseService.Stop(); err != nil {
- return err
- }
- // only close user-facing channels when we can't write to them
- c.wg.Wait()
- close(c.ResponsesCh)
-
- return nil
- }
-
- // IsReconnecting returns true if the client is reconnecting right now.
- func (c *WSClient) IsReconnecting() bool {
- c.mtx.RLock()
- defer c.mtx.RUnlock()
- return c.reconnecting
- }
-
- // IsActive returns true if the client is running and not reconnecting.
- func (c *WSClient) IsActive() bool {
- return c.IsRunning() && !c.IsReconnecting()
- }
-
- // Send the given RPC request to the server. Results will be available on
- // ResponsesCh, errors, if any, on ErrorsCh. Will block until send succeeds or
- // ctx.Done is closed.
- func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error {
- select {
- case c.send <- request:
- c.Logger.Info("sent a request", "req", request)
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }
-
- // Call the given method. See Send description.
- func (c *WSClient) Call(ctx context.Context, method string, params map[string]interface{}) error {
- request, err := types.MapToRequest(c.cdc, "ws-client", method, params)
- if err != nil {
- return err
- }
- return c.Send(ctx, request)
- }
-
- // CallWithArrayParams the given method with params in a form of array. See
- // Send description.
- func (c *WSClient) CallWithArrayParams(ctx context.Context, method string, params []interface{}) error {
- request, err := types.ArrayToRequest(c.cdc, "ws-client", method, params)
- if err != nil {
- return err
- }
- return c.Send(ctx, request)
- }
-
- func (c *WSClient) Codec() *amino.Codec {
- return c.cdc
- }
-
- func (c *WSClient) SetCodec(cdc *amino.Codec) {
- c.cdc = cdc
- }
-
- ///////////////////////////////////////////////////////////////////////////////
- // Private methods
-
- func (c *WSClient) dial() error {
- dialer := &websocket.Dialer{
- NetDial: c.Dialer,
- Proxy: http.ProxyFromEnvironment,
- }
- rHeader := http.Header{}
- conn, _, err := dialer.Dial("ws://"+c.Address+c.Endpoint, rHeader)
- if err != nil {
- return err
- }
- c.conn = conn
- return nil
- }
-
- // reconnect tries to redial up to maxReconnectAttempts with exponential
- // backoff.
- func (c *WSClient) reconnect() error {
- attempt := 0
-
- c.mtx.Lock()
- c.reconnecting = true
- c.mtx.Unlock()
- defer func() {
- c.mtx.Lock()
- c.reconnecting = false
- c.mtx.Unlock()
- }()
-
- for {
- jitterSeconds := time.Duration(cmn.RandFloat64() * float64(time.Second)) // 1s == (1e9 ns)
- backoffDuration := jitterSeconds + ((1 << uint(attempt)) * time.Second)
-
- c.Logger.Info("reconnecting", "attempt", attempt+1, "backoff_duration", backoffDuration)
- time.Sleep(backoffDuration)
-
- err := c.dial()
- if err != nil {
- c.Logger.Error("failed to redial", "err", err)
- } else {
- c.Logger.Info("reconnected")
- if c.onReconnect != nil {
- go c.onReconnect()
- }
- return nil
- }
-
- attempt++
-
- if attempt > c.maxReconnectAttempts {
- return errors.Wrap(err, "reached maximum reconnect attempts")
- }
- }
- }
-
- func (c *WSClient) startReadWriteRoutines() {
- c.wg.Add(2)
- c.readRoutineQuit = make(chan struct{})
- go c.readRoutine()
- go c.writeRoutine()
- }
-
- func (c *WSClient) processBacklog() error {
- select {
- case request := <-c.backlog:
- if c.writeWait > 0 {
- if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
- c.Logger.Error("failed to set write deadline", "err", err)
- }
- }
- if err := c.conn.WriteJSON(request); err != nil {
- c.Logger.Error("failed to resend request", "err", err)
- c.reconnectAfter <- err
- // requeue request
- c.backlog <- request
- return err
- }
- c.Logger.Info("resend a request", "req", request)
- default:
- }
- return nil
- }
-
- func (c *WSClient) reconnectRoutine() {
- for {
- select {
- case originalError := <-c.reconnectAfter:
- // wait until writeRoutine and readRoutine finish
- c.wg.Wait()
- if err := c.reconnect(); err != nil {
- c.Logger.Error("failed to reconnect", "err", err, "original_err", originalError)
- c.Stop()
- return
- }
- // drain reconnectAfter
- LOOP:
- for {
- select {
- case <-c.reconnectAfter:
- default:
- break LOOP
- }
- }
- err := c.processBacklog()
- if err == nil {
- c.startReadWriteRoutines()
- }
-
- case <-c.Quit():
- return
- }
- }
- }
-
- // The client ensures that there is at most one writer to a connection by
- // executing all writes from this goroutine.
- func (c *WSClient) writeRoutine() {
- var ticker *time.Ticker
- if c.pingPeriod > 0 {
- // ticker with a predefined period
- ticker = time.NewTicker(c.pingPeriod)
- } else {
- // ticker that never fires
- ticker = &time.Ticker{C: make(<-chan time.Time)}
- }
-
- defer func() {
- ticker.Stop()
- if err := c.conn.Close(); err != nil {
- // ignore error; it will trigger in tests
- // likely because it's closing an already closed connection
- }
- c.wg.Done()
- }()
-
- for {
- select {
- case request := <-c.send:
- if c.writeWait > 0 {
- if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
- c.Logger.Error("failed to set write deadline", "err", err)
- }
- }
- if err := c.conn.WriteJSON(request); err != nil {
- c.Logger.Error("failed to send request", "err", err)
- c.reconnectAfter <- err
- // add request to the backlog, so we don't lose it
- c.backlog <- request
- return
- }
- case <-ticker.C:
- if c.writeWait > 0 {
- if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
- c.Logger.Error("failed to set write deadline", "err", err)
- }
- }
- if err := c.conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
- c.Logger.Error("failed to write ping", "err", err)
- c.reconnectAfter <- err
- return
- }
- c.mtx.Lock()
- c.sentLastPingAt = time.Now()
- c.mtx.Unlock()
- c.Logger.Debug("sent ping")
- case <-c.readRoutineQuit:
- return
- case <-c.Quit():
- if err := c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil {
- c.Logger.Error("failed to write message", "err", err)
- }
- return
- }
- }
- }
-
- // The client ensures that there is at most one reader to a connection by
- // executing all reads from this goroutine.
- func (c *WSClient) readRoutine() {
- defer func() {
- if err := c.conn.Close(); err != nil {
- // ignore error; it will trigger in tests
- // likely because it's closing an already closed connection
- }
- c.wg.Done()
- }()
-
- c.conn.SetPongHandler(func(string) error {
- // gather latency stats
- c.mtx.RLock()
- t := c.sentLastPingAt
- c.mtx.RUnlock()
- c.PingPongLatencyTimer.UpdateSince(t)
-
- c.Logger.Debug("got pong")
- return nil
- })
-
- for {
- // reset deadline for every message type (control or data)
- if c.readWait > 0 {
- if err := c.conn.SetReadDeadline(time.Now().Add(c.readWait)); err != nil {
- c.Logger.Error("failed to set read deadline", "err", err)
- }
- }
- _, data, err := c.conn.ReadMessage()
- if err != nil {
- if !websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
- return
- }
-
- c.Logger.Error("failed to read response", "err", err)
- close(c.readRoutineQuit)
- c.reconnectAfter <- err
- return
- }
-
- var response types.RPCResponse
- err = json.Unmarshal(data, &response)
- if err != nil {
- c.Logger.Error("failed to parse response", "err", err, "data", string(data))
- continue
- }
- c.Logger.Info("got response", "resp", response.Result)
- // Combine a non-blocking read on BaseService.Quit with a non-blocking write on ResponsesCh to avoid blocking
- // c.wg.Wait() in c.Stop(). Note we rely on Quit being closed so that it sends unlimited Quit signals to stop
- // both readRoutine and writeRoutine
- select {
- case <-c.Quit():
- case c.ResponsesCh <- response:
- }
- }
- }
-
- ///////////////////////////////////////////////////////////////////////////////
- // Predefined methods
-
- // Subscribe to a query. Note the server must have a "subscribe" route
- // defined.
- func (c *WSClient) Subscribe(ctx context.Context, query string) error {
- params := map[string]interface{}{"query": query}
- return c.Call(ctx, "subscribe", params)
- }
-
- // Unsubscribe from a query. Note the server must have a "unsubscribe" route
- // defined.
- func (c *WSClient) Unsubscribe(ctx context.Context, query string) error {
- params := map[string]interface{}{"query": query}
- return c.Call(ctx, "unsubscribe", params)
- }
-
- // UnsubscribeAll from all. Note the server must have a "unsubscribe_all" route
- // defined.
- func (c *WSClient) UnsubscribeAll(ctx context.Context) error {
- params := map[string]interface{}{}
- return c.Call(ctx, "unsubscribe_all", params)
- }
|