You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

456 lines
12 KiB

7 years ago
7 years ago
7 years ago
7 years ago
  1. package rpcclient
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "math"
  7. "net"
  8. "net/http"
  9. "sync"
  10. "time"
  11. "github.com/gorilla/websocket"
  12. "github.com/pkg/errors"
  13. metrics "github.com/rcrowley/go-metrics"
  14. types "github.com/tendermint/tendermint/rpc/lib/types"
  15. cmn "github.com/tendermint/tmlibs/common"
  16. )
  17. const (
  18. defaultMaxReconnectAttempts = 25
  19. defaultWriteWait = 0
  20. defaultReadWait = 0
  21. defaultPingPeriod = 0
  22. )
  23. // WSClient is a WebSocket client. The methods of WSClient are safe for use by
  24. // multiple goroutines.
  25. type WSClient struct {
  26. cmn.BaseService
  27. conn *websocket.Conn
  28. Address string // IP:PORT or /path/to/socket
  29. Endpoint string // /websocket/url/endpoint
  30. Dialer func(string, string) (net.Conn, error)
  31. // Time between sending a ping and receiving a pong. See
  32. // https://godoc.org/github.com/rcrowley/go-metrics#Timer.
  33. PingPongLatencyTimer metrics.Timer
  34. // user facing channels, closed only when the client is being stopped.
  35. ResultsCh chan json.RawMessage
  36. ErrorsCh chan error
  37. // internal channels
  38. send chan types.RPCRequest // user requests
  39. backlog chan types.RPCRequest // stores a single user request received during a conn failure
  40. reconnectAfter chan error // reconnect requests
  41. readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine
  42. wg sync.WaitGroup
  43. mtx sync.RWMutex
  44. sentLastPingAt time.Time
  45. reconnecting bool
  46. // Maximum reconnect attempts (0 or greater; default: 25).
  47. maxReconnectAttempts int
  48. // Time allowed to write a message to the server. 0 means block until operation succeeds.
  49. writeWait time.Duration
  50. // Time allowed to read the next message from the server. 0 means block until operation succeeds.
  51. readWait time.Duration
  52. // Send pings to server with this period. Must be less than readWait. If 0, no pings will be sent.
  53. pingPeriod time.Duration
  54. }
  55. // NewWSClient returns a new client. See the commentary on the func(*WSClient)
  56. // functions for a detailed description of how to configure ping period and
  57. // pong wait time. The endpoint argument must begin with a `/`.
  58. func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSClient {
  59. addr, dialer := makeHTTPDialer(remoteAddr)
  60. c := &WSClient{
  61. Address: addr,
  62. Dialer: dialer,
  63. Endpoint: endpoint,
  64. PingPongLatencyTimer: metrics.NewTimer(),
  65. maxReconnectAttempts: defaultMaxReconnectAttempts,
  66. readWait: defaultReadWait,
  67. writeWait: defaultWriteWait,
  68. pingPeriod: defaultPingPeriod,
  69. }
  70. c.BaseService = *cmn.NewBaseService(nil, "WSClient", c)
  71. for _, option := range options {
  72. option(c)
  73. }
  74. return c
  75. }
  76. // MaxReconnectAttempts sets the maximum number of reconnect attempts before returning an error.
  77. // It should only be used in the constructor and is not Goroutine-safe.
  78. func MaxReconnectAttempts(max int) func(*WSClient) {
  79. return func(c *WSClient) {
  80. c.maxReconnectAttempts = max
  81. }
  82. }
  83. // ReadWait sets the amount of time to wait before a websocket read times out.
  84. // It should only be used in the constructor and is not Goroutine-safe.
  85. func ReadWait(readWait time.Duration) func(*WSClient) {
  86. return func(c *WSClient) {
  87. c.readWait = readWait
  88. }
  89. }
  90. // WriteWait sets the amount of time to wait before a websocket write times out.
  91. // It should only be used in the constructor and is not Goroutine-safe.
  92. func WriteWait(writeWait time.Duration) func(*WSClient) {
  93. return func(c *WSClient) {
  94. c.writeWait = writeWait
  95. }
  96. }
  97. // PingPeriod sets the duration for sending websocket pings.
  98. // It should only be used in the constructor - not Goroutine-safe.
  99. func PingPeriod(pingPeriod time.Duration) func(*WSClient) {
  100. return func(c *WSClient) {
  101. c.pingPeriod = pingPeriod
  102. }
  103. }
  104. // String returns WS client full address.
  105. func (c *WSClient) String() string {
  106. return fmt.Sprintf("%s (%s)", c.Address, c.Endpoint)
  107. }
  108. // OnStart implements cmn.Service by dialing a server and creating read and
  109. // write routines.
  110. func (c *WSClient) OnStart() error {
  111. err := c.dial()
  112. if err != nil {
  113. return err
  114. }
  115. c.ResultsCh = make(chan json.RawMessage)
  116. c.ErrorsCh = make(chan error)
  117. c.send = make(chan types.RPCRequest)
  118. // 1 additional error may come from the read/write
  119. // goroutine depending on which failed first.
  120. c.reconnectAfter = make(chan error, 1)
  121. // capacity for 1 request. a user won't be able to send more because the send
  122. // channel is unbuffered.
  123. c.backlog = make(chan types.RPCRequest, 1)
  124. c.startReadWriteRoutines()
  125. go c.reconnectRoutine()
  126. return nil
  127. }
  128. // OnStop implements cmn.Service.
  129. func (c *WSClient) OnStop() {}
  130. // Stop overrides cmn.Service#Stop. There is no other way to wait until Quit
  131. // channel is closed.
  132. func (c *WSClient) Stop() bool {
  133. success := c.BaseService.Stop()
  134. // only close user-facing channels when we can't write to them
  135. c.wg.Wait()
  136. close(c.ResultsCh)
  137. close(c.ErrorsCh)
  138. return success
  139. }
  140. // IsReconnecting returns true if the client is reconnecting right now.
  141. func (c *WSClient) IsReconnecting() bool {
  142. c.mtx.RLock()
  143. defer c.mtx.RUnlock()
  144. return c.reconnecting
  145. }
  146. // IsActive returns true if the client is running and not reconnecting.
  147. func (c *WSClient) IsActive() bool {
  148. return c.IsRunning() && !c.IsReconnecting()
  149. }
  150. // Send the given RPC request to the server. Results will be available on
  151. // ResultsCh, errors, if any, on ErrorsCh. Will block until send succeeds or
  152. // ctx.Done is closed.
  153. func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error {
  154. select {
  155. case c.send <- request:
  156. c.Logger.Info("sent a request", "req", request)
  157. return nil
  158. case <-ctx.Done():
  159. return ctx.Err()
  160. }
  161. }
  162. // Call the given method. See Send description.
  163. func (c *WSClient) Call(ctx context.Context, method string, params map[string]interface{}) error {
  164. request, err := types.MapToRequest("ws-client", method, params)
  165. if err != nil {
  166. return err
  167. }
  168. return c.Send(ctx, request)
  169. }
  170. // CallWithArrayParams the given method with params in a form of array. See
  171. // Send description.
  172. func (c *WSClient) CallWithArrayParams(ctx context.Context, method string, params []interface{}) error {
  173. request, err := types.ArrayToRequest("ws-client", method, params)
  174. if err != nil {
  175. return err
  176. }
  177. return c.Send(ctx, request)
  178. }
  179. ///////////////////////////////////////////////////////////////////////////////
  180. // Private methods
  181. func (c *WSClient) dial() error {
  182. dialer := &websocket.Dialer{
  183. NetDial: c.Dialer,
  184. Proxy: http.ProxyFromEnvironment,
  185. }
  186. rHeader := http.Header{}
  187. conn, _, err := dialer.Dial("ws://"+c.Address+c.Endpoint, rHeader)
  188. if err != nil {
  189. return err
  190. }
  191. c.conn = conn
  192. return nil
  193. }
  194. // reconnect tries to redial up to maxReconnectAttempts with exponential
  195. // backoff.
  196. func (c *WSClient) reconnect() error {
  197. attempt := 0
  198. c.mtx.Lock()
  199. c.reconnecting = true
  200. c.mtx.Unlock()
  201. defer func() {
  202. c.mtx.Lock()
  203. c.reconnecting = false
  204. c.mtx.Unlock()
  205. }()
  206. for {
  207. c.Logger.Info("reconnecting", "attempt", attempt+1)
  208. d := time.Duration(math.Exp2(float64(attempt)))
  209. time.Sleep(d * time.Second)
  210. err := c.dial()
  211. if err != nil {
  212. c.Logger.Error("failed to redial", "err", err)
  213. } else {
  214. c.Logger.Info("reconnected")
  215. return nil
  216. }
  217. attempt++
  218. if attempt > c.maxReconnectAttempts {
  219. return errors.Wrap(err, "reached maximum reconnect attempts")
  220. }
  221. }
  222. }
  223. func (c *WSClient) startReadWriteRoutines() {
  224. c.wg.Add(2)
  225. c.readRoutineQuit = make(chan struct{})
  226. go c.readRoutine()
  227. go c.writeRoutine()
  228. }
  229. func (c *WSClient) processBacklog() error {
  230. select {
  231. case request := <-c.backlog:
  232. if c.writeWait > 0 {
  233. c.conn.SetWriteDeadline(time.Now().Add(c.writeWait))
  234. }
  235. err := c.conn.WriteJSON(request)
  236. if err != nil {
  237. c.Logger.Error("failed to resend request", "err", err)
  238. c.reconnectAfter <- err
  239. // requeue request
  240. c.backlog <- request
  241. return err
  242. }
  243. c.Logger.Info("resend a request", "req", request)
  244. default:
  245. }
  246. return nil
  247. }
  248. func (c *WSClient) reconnectRoutine() {
  249. for {
  250. select {
  251. case originalError := <-c.reconnectAfter:
  252. // wait until writeRoutine and readRoutine finish
  253. c.wg.Wait()
  254. err := c.reconnect()
  255. if err != nil {
  256. c.Logger.Error("failed to reconnect", "err", err, "original_err", originalError)
  257. c.Stop()
  258. return
  259. } else {
  260. // drain reconnectAfter
  261. LOOP:
  262. for {
  263. select {
  264. case <-c.reconnectAfter:
  265. default:
  266. break LOOP
  267. }
  268. }
  269. err = c.processBacklog()
  270. if err == nil {
  271. c.startReadWriteRoutines()
  272. }
  273. }
  274. case <-c.Quit:
  275. return
  276. }
  277. }
  278. }
  279. // The client ensures that there is at most one writer to a connection by
  280. // executing all writes from this goroutine.
  281. func (c *WSClient) writeRoutine() {
  282. var ticker *time.Ticker
  283. if c.pingPeriod > 0 {
  284. // ticker with a predefined period
  285. ticker = time.NewTicker(c.pingPeriod)
  286. } else {
  287. // ticker that never fires
  288. ticker = &time.Ticker{C: make(<-chan time.Time)}
  289. }
  290. defer func() {
  291. ticker.Stop()
  292. c.conn.Close()
  293. c.wg.Done()
  294. }()
  295. for {
  296. select {
  297. case request := <-c.send:
  298. if c.writeWait > 0 {
  299. c.conn.SetWriteDeadline(time.Now().Add(c.writeWait))
  300. }
  301. err := c.conn.WriteJSON(request)
  302. if err != nil {
  303. c.Logger.Error("failed to send request", "err", err)
  304. c.reconnectAfter <- err
  305. // add request to the backlog, so we don't lose it
  306. c.backlog <- request
  307. return
  308. }
  309. case <-ticker.C:
  310. if c.writeWait > 0 {
  311. c.conn.SetWriteDeadline(time.Now().Add(c.writeWait))
  312. }
  313. err := c.conn.WriteMessage(websocket.PingMessage, []byte{})
  314. if err != nil {
  315. c.Logger.Error("failed to write ping", "err", err)
  316. c.reconnectAfter <- err
  317. return
  318. }
  319. c.mtx.Lock()
  320. c.sentLastPingAt = time.Now()
  321. c.mtx.Unlock()
  322. c.Logger.Debug("sent ping")
  323. case <-c.readRoutineQuit:
  324. return
  325. case <-c.Quit:
  326. c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  327. return
  328. }
  329. }
  330. }
  331. // The client ensures that there is at most one reader to a connection by
  332. // executing all reads from this goroutine.
  333. func (c *WSClient) readRoutine() {
  334. defer func() {
  335. c.conn.Close()
  336. c.wg.Done()
  337. }()
  338. c.conn.SetPongHandler(func(string) error {
  339. // gather latency stats
  340. c.mtx.RLock()
  341. t := c.sentLastPingAt
  342. c.mtx.RUnlock()
  343. c.PingPongLatencyTimer.UpdateSince(t)
  344. c.Logger.Debug("got pong")
  345. return nil
  346. })
  347. for {
  348. // reset deadline for every message type (control or data)
  349. if c.readWait > 0 {
  350. c.conn.SetReadDeadline(time.Now().Add(c.readWait))
  351. }
  352. _, data, err := c.conn.ReadMessage()
  353. if err != nil {
  354. if !websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
  355. return
  356. }
  357. c.Logger.Error("failed to read response", "err", err)
  358. close(c.readRoutineQuit)
  359. c.reconnectAfter <- err
  360. return
  361. }
  362. var response types.RPCResponse
  363. err = json.Unmarshal(data, &response)
  364. if err != nil {
  365. c.Logger.Error("failed to parse response", "err", err, "data", string(data))
  366. c.ErrorsCh <- err
  367. continue
  368. }
  369. if response.Error != nil {
  370. c.ErrorsCh <- errors.New(response.Error.Message)
  371. continue
  372. }
  373. c.Logger.Info("got response", "resp", response.Result)
  374. c.ResultsCh <- *response.Result
  375. }
  376. }
  377. ///////////////////////////////////////////////////////////////////////////////
  378. // Predefined methods
  379. // Subscribe to an event. Note the server must have a "subscribe" route
  380. // defined.
  381. func (c *WSClient) Subscribe(ctx context.Context, eventType string) error {
  382. params := map[string]interface{}{"event": eventType}
  383. return c.Call(ctx, "subscribe", params)
  384. }
  385. // Unsubscribe from an event. Note the server must have a "unsubscribe" route
  386. // defined.
  387. func (c *WSClient) Unsubscribe(ctx context.Context, eventType string) error {
  388. params := map[string]interface{}{"event": eventType}
  389. return c.Call(ctx, "unsubscribe", params)
  390. }
  391. // UnsubscribeAll from all. Note the server must have a "unsubscribe_all" route
  392. // defined.
  393. func (c *WSClient) UnsubscribeAll(ctx context.Context) error {
  394. params := map[string]interface{}{}
  395. return c.Call(ctx, "unsubscribe_all", params)
  396. }