You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

470 lines
12 KiB

7 years ago
7 years ago
7 years ago
7 years ago
  1. package rpcclient
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "math"
  7. "net"
  8. "net/http"
  9. "sync"
  10. "time"
  11. "github.com/gorilla/websocket"
  12. "github.com/pkg/errors"
  13. metrics "github.com/rcrowley/go-metrics"
  14. types "github.com/tendermint/tendermint/rpc/lib/types"
  15. cmn "github.com/tendermint/tmlibs/common"
  16. )
  // Defaults that NewWSClient assigns before any option function runs.
  const (
  	// defaultMaxReconnectAttempts is the default value of
  	// WSClient.maxReconnectAttempts.
  	defaultMaxReconnectAttempts = 25

  	// A zero wait disables the corresponding deadline; a zero ping period
  	// disables pings.
  	defaultReadWait   = 0
  	defaultWriteWait  = 0
  	defaultPingPeriod = 0
  )
  23. // WSClient is a WebSocket client. The methods of WSClient are safe for use by
  24. // multiple goroutines.
  25. type WSClient struct {
  26. cmn.BaseService
  27. conn *websocket.Conn
  28. Address string // IP:PORT or /path/to/socket
  29. Endpoint string // /websocket/url/endpoint
  30. Dialer func(string, string) (net.Conn, error)
  31. // Time between sending a ping and receiving a pong. See
  32. // https://godoc.org/github.com/rcrowley/go-metrics#Timer.
  33. PingPongLatencyTimer metrics.Timer
  34. // user facing channels, closed only when the client is being stopped.
  35. ResultsCh chan json.RawMessage
  36. ErrorsCh chan error
  37. // Callback, which will be called each time after successful reconnect.
  38. onReconnect func()
  39. // internal channels
  40. send chan types.RPCRequest // user requests
  41. backlog chan types.RPCRequest // stores a single user request received during a conn failure
  42. reconnectAfter chan error // reconnect requests
  43. readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine
  44. wg sync.WaitGroup
  45. mtx sync.RWMutex
  46. sentLastPingAt time.Time
  47. reconnecting bool
  48. // Maximum reconnect attempts (0 or greater; default: 25).
  49. maxReconnectAttempts int
  50. // Time allowed to write a message to the server. 0 means block until operation succeeds.
  51. writeWait time.Duration
  52. // Time allowed to read the next message from the server. 0 means block until operation succeeds.
  53. readWait time.Duration
  54. // Send pings to server with this period. Must be less than readWait. If 0, no pings will be sent.
  55. pingPeriod time.Duration
  56. }
  57. // NewWSClient returns a new client. See the commentary on the func(*WSClient)
  58. // functions for a detailed description of how to configure ping period and
  59. // pong wait time. The endpoint argument must begin with a `/`.
  60. func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSClient {
  61. addr, dialer := makeHTTPDialer(remoteAddr)
  62. c := &WSClient{
  63. Address: addr,
  64. Dialer: dialer,
  65. Endpoint: endpoint,
  66. PingPongLatencyTimer: metrics.NewTimer(),
  67. maxReconnectAttempts: defaultMaxReconnectAttempts,
  68. readWait: defaultReadWait,
  69. writeWait: defaultWriteWait,
  70. pingPeriod: defaultPingPeriod,
  71. }
  72. c.BaseService = *cmn.NewBaseService(nil, "WSClient", c)
  73. for _, option := range options {
  74. option(c)
  75. }
  76. return c
  77. }
  78. // MaxReconnectAttempts sets the maximum number of reconnect attempts before returning an error.
  79. // It should only be used in the constructor and is not Goroutine-safe.
  80. func MaxReconnectAttempts(max int) func(*WSClient) {
  81. return func(c *WSClient) {
  82. c.maxReconnectAttempts = max
  83. }
  84. }
  85. // ReadWait sets the amount of time to wait before a websocket read times out.
  86. // It should only be used in the constructor and is not Goroutine-safe.
  87. func ReadWait(readWait time.Duration) func(*WSClient) {
  88. return func(c *WSClient) {
  89. c.readWait = readWait
  90. }
  91. }
  92. // WriteWait sets the amount of time to wait before a websocket write times out.
  93. // It should only be used in the constructor and is not Goroutine-safe.
  94. func WriteWait(writeWait time.Duration) func(*WSClient) {
  95. return func(c *WSClient) {
  96. c.writeWait = writeWait
  97. }
  98. }
  99. // PingPeriod sets the duration for sending websocket pings.
  100. // It should only be used in the constructor - not Goroutine-safe.
  101. func PingPeriod(pingPeriod time.Duration) func(*WSClient) {
  102. return func(c *WSClient) {
  103. c.pingPeriod = pingPeriod
  104. }
  105. }
  106. // OnReconnect sets the callback, which will be called every time after
  107. // successful reconnect.
  108. func OnReconnect(cb func()) func(*WSClient) {
  109. return func(c *WSClient) {
  110. c.onReconnect = cb
  111. }
  112. }
  113. // String returns WS client full address.
  114. func (c *WSClient) String() string {
  115. return fmt.Sprintf("%s (%s)", c.Address, c.Endpoint)
  116. }
  117. // OnStart implements cmn.Service by dialing a server and creating read and
  118. // write routines.
  119. func (c *WSClient) OnStart() error {
  120. err := c.dial()
  121. if err != nil {
  122. return err
  123. }
  124. c.ResultsCh = make(chan json.RawMessage)
  125. c.ErrorsCh = make(chan error)
  126. c.send = make(chan types.RPCRequest)
  127. // 1 additional error may come from the read/write
  128. // goroutine depending on which failed first.
  129. c.reconnectAfter = make(chan error, 1)
  130. // capacity for 1 request. a user won't be able to send more because the send
  131. // channel is unbuffered.
  132. c.backlog = make(chan types.RPCRequest, 1)
  133. c.startReadWriteRoutines()
  134. go c.reconnectRoutine()
  135. return nil
  136. }
  137. // OnStop implements cmn.Service.
  138. func (c *WSClient) OnStop() {}
  139. // Stop overrides cmn.Service#Stop. There is no other way to wait until Quit
  140. // channel is closed.
  141. func (c *WSClient) Stop() bool {
  142. success := c.BaseService.Stop()
  143. // only close user-facing channels when we can't write to them
  144. c.wg.Wait()
  145. close(c.ResultsCh)
  146. close(c.ErrorsCh)
  147. return success
  148. }
  149. // IsReconnecting returns true if the client is reconnecting right now.
  150. func (c *WSClient) IsReconnecting() bool {
  151. c.mtx.RLock()
  152. defer c.mtx.RUnlock()
  153. return c.reconnecting
  154. }
  155. // IsActive returns true if the client is running and not reconnecting.
  156. func (c *WSClient) IsActive() bool {
  157. return c.IsRunning() && !c.IsReconnecting()
  158. }
  159. // Send the given RPC request to the server. Results will be available on
  160. // ResultsCh, errors, if any, on ErrorsCh. Will block until send succeeds or
  161. // ctx.Done is closed.
  162. func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error {
  163. select {
  164. case c.send <- request:
  165. c.Logger.Info("sent a request", "req", request)
  166. return nil
  167. case <-ctx.Done():
  168. return ctx.Err()
  169. }
  170. }
  171. // Call the given method. See Send description.
  172. func (c *WSClient) Call(ctx context.Context, method string, params map[string]interface{}) error {
  173. request, err := types.MapToRequest("ws-client", method, params)
  174. if err != nil {
  175. return err
  176. }
  177. return c.Send(ctx, request)
  178. }
  179. // CallWithArrayParams the given method with params in a form of array. See
  180. // Send description.
  181. func (c *WSClient) CallWithArrayParams(ctx context.Context, method string, params []interface{}) error {
  182. request, err := types.ArrayToRequest("ws-client", method, params)
  183. if err != nil {
  184. return err
  185. }
  186. return c.Send(ctx, request)
  187. }
  188. ///////////////////////////////////////////////////////////////////////////////
  189. // Private methods
  190. func (c *WSClient) dial() error {
  191. dialer := &websocket.Dialer{
  192. NetDial: c.Dialer,
  193. Proxy: http.ProxyFromEnvironment,
  194. }
  195. rHeader := http.Header{}
  196. conn, _, err := dialer.Dial("ws://"+c.Address+c.Endpoint, rHeader)
  197. if err != nil {
  198. return err
  199. }
  200. c.conn = conn
  201. return nil
  202. }
  203. // reconnect tries to redial up to maxReconnectAttempts with exponential
  204. // backoff.
  205. func (c *WSClient) reconnect() error {
  206. attempt := 0
  207. c.mtx.Lock()
  208. c.reconnecting = true
  209. c.mtx.Unlock()
  210. defer func() {
  211. c.mtx.Lock()
  212. c.reconnecting = false
  213. c.mtx.Unlock()
  214. }()
  215. for {
  216. c.Logger.Info("reconnecting", "attempt", attempt+1)
  217. d := time.Duration(math.Exp2(float64(attempt)))
  218. time.Sleep(d * time.Second)
  219. err := c.dial()
  220. if err != nil {
  221. c.Logger.Error("failed to redial", "err", err)
  222. } else {
  223. c.Logger.Info("reconnected")
  224. if c.onReconnect != nil {
  225. go c.onReconnect()
  226. }
  227. return nil
  228. }
  229. attempt++
  230. if attempt > c.maxReconnectAttempts {
  231. return errors.Wrap(err, "reached maximum reconnect attempts")
  232. }
  233. }
  234. }
  235. func (c *WSClient) startReadWriteRoutines() {
  236. c.wg.Add(2)
  237. c.readRoutineQuit = make(chan struct{})
  238. go c.readRoutine()
  239. go c.writeRoutine()
  240. }
  241. func (c *WSClient) processBacklog() error {
  242. select {
  243. case request := <-c.backlog:
  244. if c.writeWait > 0 {
  245. c.conn.SetWriteDeadline(time.Now().Add(c.writeWait))
  246. }
  247. err := c.conn.WriteJSON(request)
  248. if err != nil {
  249. c.Logger.Error("failed to resend request", "err", err)
  250. c.reconnectAfter <- err
  251. // requeue request
  252. c.backlog <- request
  253. return err
  254. }
  255. c.Logger.Info("resend a request", "req", request)
  256. default:
  257. }
  258. return nil
  259. }
  260. func (c *WSClient) reconnectRoutine() {
  261. for {
  262. select {
  263. case originalError := <-c.reconnectAfter:
  264. // wait until writeRoutine and readRoutine finish
  265. c.wg.Wait()
  266. err := c.reconnect()
  267. if err != nil {
  268. c.Logger.Error("failed to reconnect", "err", err, "original_err", originalError)
  269. c.Stop()
  270. return
  271. } else {
  272. // drain reconnectAfter
  273. LOOP:
  274. for {
  275. select {
  276. case <-c.reconnectAfter:
  277. default:
  278. break LOOP
  279. }
  280. }
  281. err = c.processBacklog()
  282. if err == nil {
  283. c.startReadWriteRoutines()
  284. }
  285. }
  286. case <-c.Quit:
  287. return
  288. }
  289. }
  290. }
  291. // The client ensures that there is at most one writer to a connection by
  292. // executing all writes from this goroutine.
  293. func (c *WSClient) writeRoutine() {
  294. var ticker *time.Ticker
  295. if c.pingPeriod > 0 {
  296. // ticker with a predefined period
  297. ticker = time.NewTicker(c.pingPeriod)
  298. } else {
  299. // ticker that never fires
  300. ticker = &time.Ticker{C: make(<-chan time.Time)}
  301. }
  302. defer func() {
  303. ticker.Stop()
  304. c.conn.Close()
  305. c.wg.Done()
  306. }()
  307. for {
  308. select {
  309. case request := <-c.send:
  310. if c.writeWait > 0 {
  311. c.conn.SetWriteDeadline(time.Now().Add(c.writeWait))
  312. }
  313. err := c.conn.WriteJSON(request)
  314. if err != nil {
  315. c.Logger.Error("failed to send request", "err", err)
  316. c.reconnectAfter <- err
  317. // add request to the backlog, so we don't lose it
  318. c.backlog <- request
  319. return
  320. }
  321. case <-ticker.C:
  322. if c.writeWait > 0 {
  323. c.conn.SetWriteDeadline(time.Now().Add(c.writeWait))
  324. }
  325. err := c.conn.WriteMessage(websocket.PingMessage, []byte{})
  326. if err != nil {
  327. c.Logger.Error("failed to write ping", "err", err)
  328. c.reconnectAfter <- err
  329. return
  330. }
  331. c.mtx.Lock()
  332. c.sentLastPingAt = time.Now()
  333. c.mtx.Unlock()
  334. c.Logger.Debug("sent ping")
  335. case <-c.readRoutineQuit:
  336. return
  337. case <-c.Quit:
  338. c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  339. return
  340. }
  341. }
  342. }
  343. // The client ensures that there is at most one reader to a connection by
  344. // executing all reads from this goroutine.
  345. func (c *WSClient) readRoutine() {
  346. defer func() {
  347. c.conn.Close()
  348. c.wg.Done()
  349. }()
  350. c.conn.SetPongHandler(func(string) error {
  351. // gather latency stats
  352. c.mtx.RLock()
  353. t := c.sentLastPingAt
  354. c.mtx.RUnlock()
  355. c.PingPongLatencyTimer.UpdateSince(t)
  356. c.Logger.Debug("got pong")
  357. return nil
  358. })
  359. for {
  360. // reset deadline for every message type (control or data)
  361. if c.readWait > 0 {
  362. c.conn.SetReadDeadline(time.Now().Add(c.readWait))
  363. }
  364. _, data, err := c.conn.ReadMessage()
  365. if err != nil {
  366. if !websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
  367. return
  368. }
  369. c.Logger.Error("failed to read response", "err", err)
  370. close(c.readRoutineQuit)
  371. c.reconnectAfter <- err
  372. return
  373. }
  374. var response types.RPCResponse
  375. err = json.Unmarshal(data, &response)
  376. if err != nil {
  377. c.Logger.Error("failed to parse response", "err", err, "data", string(data))
  378. c.ErrorsCh <- err
  379. continue
  380. }
  381. if response.Error != nil {
  382. c.ErrorsCh <- response.Error
  383. continue
  384. }
  385. c.Logger.Info("got response", "resp", response.Result)
  386. c.ResultsCh <- *response.Result
  387. }
  388. }
  389. ///////////////////////////////////////////////////////////////////////////////
  390. // Predefined methods
  391. // Subscribe to an event. Note the server must have a "subscribe" route
  392. // defined.
  393. func (c *WSClient) Subscribe(ctx context.Context, eventType string) error {
  394. params := map[string]interface{}{"event": eventType}
  395. return c.Call(ctx, "subscribe", params)
  396. }
  397. // Unsubscribe from an event. Note the server must have a "unsubscribe" route
  398. // defined.
  399. func (c *WSClient) Unsubscribe(ctx context.Context, eventType string) error {
  400. params := map[string]interface{}{"event": eventType}
  401. return c.Call(ctx, "unsubscribe", params)
  402. }
  403. // UnsubscribeAll from all. Note the server must have a "unsubscribe_all" route
  404. // defined.
  405. func (c *WSClient) UnsubscribeAll(ctx context.Context) error {
  406. params := map[string]interface{}{}
  407. return c.Call(ctx, "unsubscribe_all", params)
  408. }