You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

471 lines
12 KiB

7 years ago
7 years ago
7 years ago
7 years ago
  1. package rpcclient
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "math/rand"
  7. "net"
  8. "net/http"
  9. "sync"
  10. "time"
  11. "github.com/gorilla/websocket"
  12. "github.com/pkg/errors"
  13. metrics "github.com/rcrowley/go-metrics"
  14. types "github.com/tendermint/tendermint/rpc/lib/types"
  15. cmn "github.com/tendermint/tmlibs/common"
  16. )
  // Defaults applied by NewWSClient. Each one can be overridden with the
  // option function of the same name (MaxReconnectAttempts, WriteWait,
  // ReadWait, PingPeriod).
  const (
  	// defaultMaxReconnectAttempts is how many redials reconnect allows
  	// after the first failed one before giving up.
  	defaultMaxReconnectAttempts = 25

  	// A zero duration switches the feature off: no write deadline, no read
  	// deadline, and no pings respectively.
  	defaultWriteWait  = 0
  	defaultReadWait   = 0
  	defaultPingPeriod = 0
  )
  23. // WSClient is a WebSocket client. The methods of WSClient are safe for use by
  24. // multiple goroutines.
  25. type WSClient struct {
  26. cmn.BaseService
  27. conn *websocket.Conn
  28. Address string // IP:PORT or /path/to/socket
  29. Endpoint string // /websocket/url/endpoint
  30. Dialer func(string, string) (net.Conn, error)
  31. // Time between sending a ping and receiving a pong. See
  32. // https://godoc.org/github.com/rcrowley/go-metrics#Timer.
  33. PingPongLatencyTimer metrics.Timer
  34. // Single user facing channel to read RPCResponses from, closed only when the client is being stopped.
  35. ResponsesCh chan types.RPCResponse
  36. // Callback, which will be called each time after successful reconnect.
  37. onReconnect func()
  38. // internal channels
  39. send chan types.RPCRequest // user requests
  40. backlog chan types.RPCRequest // stores a single user request received during a conn failure
  41. reconnectAfter chan error // reconnect requests
  42. readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine
  43. wg sync.WaitGroup
  44. mtx sync.RWMutex
  45. sentLastPingAt time.Time
  46. reconnecting bool
  47. // Maximum reconnect attempts (0 or greater; default: 25).
  48. maxReconnectAttempts int
  49. // Time allowed to write a message to the server. 0 means block until operation succeeds.
  50. writeWait time.Duration
  51. // Time allowed to read the next message from the server. 0 means block until operation succeeds.
  52. readWait time.Duration
  53. // Send pings to server with this period. Must be less than readWait. If 0, no pings will be sent.
  54. pingPeriod time.Duration
  55. }
  56. // NewWSClient returns a new client. See the commentary on the func(*WSClient)
  57. // functions for a detailed description of how to configure ping period and
  58. // pong wait time. The endpoint argument must begin with a `/`.
  59. func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSClient {
  60. addr, dialer := makeHTTPDialer(remoteAddr)
  61. c := &WSClient{
  62. Address: addr,
  63. Dialer: dialer,
  64. Endpoint: endpoint,
  65. PingPongLatencyTimer: metrics.NewTimer(),
  66. maxReconnectAttempts: defaultMaxReconnectAttempts,
  67. readWait: defaultReadWait,
  68. writeWait: defaultWriteWait,
  69. pingPeriod: defaultPingPeriod,
  70. }
  71. c.BaseService = *cmn.NewBaseService(nil, "WSClient", c)
  72. for _, option := range options {
  73. option(c)
  74. }
  75. return c
  76. }
  77. // MaxReconnectAttempts sets the maximum number of reconnect attempts before returning an error.
  78. // It should only be used in the constructor and is not Goroutine-safe.
  79. func MaxReconnectAttempts(max int) func(*WSClient) {
  80. return func(c *WSClient) {
  81. c.maxReconnectAttempts = max
  82. }
  83. }
  84. // ReadWait sets the amount of time to wait before a websocket read times out.
  85. // It should only be used in the constructor and is not Goroutine-safe.
  86. func ReadWait(readWait time.Duration) func(*WSClient) {
  87. return func(c *WSClient) {
  88. c.readWait = readWait
  89. }
  90. }
  91. // WriteWait sets the amount of time to wait before a websocket write times out.
  92. // It should only be used in the constructor and is not Goroutine-safe.
  93. func WriteWait(writeWait time.Duration) func(*WSClient) {
  94. return func(c *WSClient) {
  95. c.writeWait = writeWait
  96. }
  97. }
  98. // PingPeriod sets the duration for sending websocket pings.
  99. // It should only be used in the constructor - not Goroutine-safe.
  100. func PingPeriod(pingPeriod time.Duration) func(*WSClient) {
  101. return func(c *WSClient) {
  102. c.pingPeriod = pingPeriod
  103. }
  104. }
  105. // OnReconnect sets the callback, which will be called every time after
  106. // successful reconnect.
  107. func OnReconnect(cb func()) func(*WSClient) {
  108. return func(c *WSClient) {
  109. c.onReconnect = cb
  110. }
  111. }
  112. // String returns WS client full address.
  113. func (c *WSClient) String() string {
  114. return fmt.Sprintf("%s (%s)", c.Address, c.Endpoint)
  115. }
  116. // OnStart implements cmn.Service by dialing a server and creating read and
  117. // write routines.
  118. func (c *WSClient) OnStart() error {
  119. err := c.dial()
  120. if err != nil {
  121. return err
  122. }
  123. c.ResponsesCh = make(chan types.RPCResponse)
  124. c.send = make(chan types.RPCRequest)
  125. // 1 additional error may come from the read/write
  126. // goroutine depending on which failed first.
  127. c.reconnectAfter = make(chan error, 1)
  128. // capacity for 1 request. a user won't be able to send more because the send
  129. // channel is unbuffered.
  130. c.backlog = make(chan types.RPCRequest, 1)
  131. c.startReadWriteRoutines()
  132. go c.reconnectRoutine()
  133. return nil
  134. }
  135. // OnStop implements cmn.Service.
  136. func (c *WSClient) OnStop() {}
  137. // Stop overrides cmn.Service#Stop. There is no other way to wait until Quit
  138. // channel is closed.
  139. func (c *WSClient) Stop() bool {
  140. success := c.BaseService.Stop()
  141. // only close user-facing channels when we can't write to them
  142. c.wg.Wait()
  143. close(c.ResponsesCh)
  144. return success
  145. }
  146. // IsReconnecting returns true if the client is reconnecting right now.
  147. func (c *WSClient) IsReconnecting() bool {
  148. c.mtx.RLock()
  149. defer c.mtx.RUnlock()
  150. return c.reconnecting
  151. }
  152. // IsActive returns true if the client is running and not reconnecting.
  153. func (c *WSClient) IsActive() bool {
  154. return c.IsRunning() && !c.IsReconnecting()
  155. }
  156. // Send the given RPC request to the server. Results will be available on
  157. // ResponsesCh, errors, if any, on ErrorsCh. Will block until send succeeds or
  158. // ctx.Done is closed.
  159. func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error {
  160. select {
  161. case c.send <- request:
  162. c.Logger.Info("sent a request", "req", request)
  163. return nil
  164. case <-ctx.Done():
  165. return ctx.Err()
  166. }
  167. }
  168. // Call the given method. See Send description.
  169. func (c *WSClient) Call(ctx context.Context, method string, params map[string]interface{}) error {
  170. request, err := types.MapToRequest("ws-client", method, params)
  171. if err != nil {
  172. return err
  173. }
  174. return c.Send(ctx, request)
  175. }
  176. // CallWithArrayParams the given method with params in a form of array. See
  177. // Send description.
  178. func (c *WSClient) CallWithArrayParams(ctx context.Context, method string, params []interface{}) error {
  179. request, err := types.ArrayToRequest("ws-client", method, params)
  180. if err != nil {
  181. return err
  182. }
  183. return c.Send(ctx, request)
  184. }
  185. ///////////////////////////////////////////////////////////////////////////////
  186. // Private methods
  187. func (c *WSClient) dial() error {
  188. dialer := &websocket.Dialer{
  189. NetDial: c.Dialer,
  190. Proxy: http.ProxyFromEnvironment,
  191. }
  192. rHeader := http.Header{}
  193. conn, _, err := dialer.Dial("ws://"+c.Address+c.Endpoint, rHeader)
  194. if err != nil {
  195. return err
  196. }
  197. c.conn = conn
  198. return nil
  199. }
  200. // reconnect tries to redial up to maxReconnectAttempts with exponential
  201. // backoff.
  202. func (c *WSClient) reconnect() error {
  203. attempt := 0
  204. c.mtx.Lock()
  205. c.reconnecting = true
  206. c.mtx.Unlock()
  207. defer func() {
  208. c.mtx.Lock()
  209. c.reconnecting = false
  210. c.mtx.Unlock()
  211. }()
  212. // 1s == (1e9 ns) == (1 Billion ns)
  213. billionNs := float64(time.Second.Nanoseconds())
  214. for {
  215. jitterSeconds := time.Duration(rand.Float64() * billionNs)
  216. backoffDuration := jitterSeconds + ((1 << uint(attempt)) * time.Second)
  217. c.Logger.Info("reconnecting", "attempt", attempt+1, "backoff_duration", backoffDuration)
  218. time.Sleep(backoffDuration)
  219. err := c.dial()
  220. if err != nil {
  221. c.Logger.Error("failed to redial", "err", err)
  222. } else {
  223. c.Logger.Info("reconnected")
  224. if c.onReconnect != nil {
  225. go c.onReconnect()
  226. }
  227. return nil
  228. }
  229. attempt++
  230. if attempt > c.maxReconnectAttempts {
  231. return errors.Wrap(err, "reached maximum reconnect attempts")
  232. }
  233. }
  234. }
  235. func (c *WSClient) startReadWriteRoutines() {
  236. c.wg.Add(2)
  237. c.readRoutineQuit = make(chan struct{})
  238. go c.readRoutine()
  239. go c.writeRoutine()
  240. }
  241. func (c *WSClient) processBacklog() error {
  242. select {
  243. case request := <-c.backlog:
  244. if c.writeWait > 0 {
  245. c.conn.SetWriteDeadline(time.Now().Add(c.writeWait))
  246. }
  247. err := c.conn.WriteJSON(request)
  248. if err != nil {
  249. c.Logger.Error("failed to resend request", "err", err)
  250. c.reconnectAfter <- err
  251. // requeue request
  252. c.backlog <- request
  253. return err
  254. }
  255. c.Logger.Info("resend a request", "req", request)
  256. default:
  257. }
  258. return nil
  259. }
  260. func (c *WSClient) reconnectRoutine() {
  261. for {
  262. select {
  263. case originalError := <-c.reconnectAfter:
  264. // wait until writeRoutine and readRoutine finish
  265. c.wg.Wait()
  266. err := c.reconnect()
  267. if err != nil {
  268. c.Logger.Error("failed to reconnect", "err", err, "original_err", originalError)
  269. c.Stop()
  270. return
  271. } else {
  272. // drain reconnectAfter
  273. LOOP:
  274. for {
  275. select {
  276. case <-c.reconnectAfter:
  277. default:
  278. break LOOP
  279. }
  280. }
  281. err = c.processBacklog()
  282. if err == nil {
  283. c.startReadWriteRoutines()
  284. }
  285. }
  286. case <-c.Quit:
  287. return
  288. }
  289. }
  290. }
  291. // The client ensures that there is at most one writer to a connection by
  292. // executing all writes from this goroutine.
  293. func (c *WSClient) writeRoutine() {
  294. var ticker *time.Ticker
  295. if c.pingPeriod > 0 {
  296. // ticker with a predefined period
  297. ticker = time.NewTicker(c.pingPeriod)
  298. } else {
  299. // ticker that never fires
  300. ticker = &time.Ticker{C: make(<-chan time.Time)}
  301. }
  302. defer func() {
  303. ticker.Stop()
  304. c.conn.Close()
  305. c.wg.Done()
  306. }()
  307. for {
  308. select {
  309. case request := <-c.send:
  310. if c.writeWait > 0 {
  311. c.conn.SetWriteDeadline(time.Now().Add(c.writeWait))
  312. }
  313. err := c.conn.WriteJSON(request)
  314. if err != nil {
  315. c.Logger.Error("failed to send request", "err", err)
  316. c.reconnectAfter <- err
  317. // add request to the backlog, so we don't lose it
  318. c.backlog <- request
  319. return
  320. }
  321. case <-ticker.C:
  322. if c.writeWait > 0 {
  323. c.conn.SetWriteDeadline(time.Now().Add(c.writeWait))
  324. }
  325. err := c.conn.WriteMessage(websocket.PingMessage, []byte{})
  326. if err != nil {
  327. c.Logger.Error("failed to write ping", "err", err)
  328. c.reconnectAfter <- err
  329. return
  330. }
  331. c.mtx.Lock()
  332. c.sentLastPingAt = time.Now()
  333. c.mtx.Unlock()
  334. c.Logger.Debug("sent ping")
  335. case <-c.readRoutineQuit:
  336. return
  337. case <-c.Quit:
  338. c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  339. return
  340. }
  341. }
  342. }
  343. // The client ensures that there is at most one reader to a connection by
  344. // executing all reads from this goroutine.
  345. func (c *WSClient) readRoutine() {
  346. defer func() {
  347. c.conn.Close()
  348. c.wg.Done()
  349. }()
  350. c.conn.SetPongHandler(func(string) error {
  351. // gather latency stats
  352. c.mtx.RLock()
  353. t := c.sentLastPingAt
  354. c.mtx.RUnlock()
  355. c.PingPongLatencyTimer.UpdateSince(t)
  356. c.Logger.Debug("got pong")
  357. return nil
  358. })
  359. for {
  360. // reset deadline for every message type (control or data)
  361. if c.readWait > 0 {
  362. c.conn.SetReadDeadline(time.Now().Add(c.readWait))
  363. }
  364. _, data, err := c.conn.ReadMessage()
  365. if err != nil {
  366. if !websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
  367. return
  368. }
  369. c.Logger.Error("failed to read response", "err", err)
  370. close(c.readRoutineQuit)
  371. c.reconnectAfter <- err
  372. return
  373. }
  374. var response types.RPCResponse
  375. err = json.Unmarshal(data, &response)
  376. if err != nil {
  377. c.Logger.Error("failed to parse response", "err", err, "data", string(data))
  378. continue
  379. }
  380. c.Logger.Info("got response", "resp", response.Result)
  381. // Combine a non-blocking read on BaseService.Quit with a non-blocking write on ResponsesCh to avoid blocking
  382. // c.wg.Wait() in c.Stop(). Note we rely on Quit being closed so that it sends unlimited Quit signals to stop
  383. // both readRoutine and writeRoutine
  384. select {
  385. case <-c.Quit:
  386. case c.ResponsesCh <- response:
  387. }
  388. }
  389. }
  390. ///////////////////////////////////////////////////////////////////////////////
  391. // Predefined methods
  392. // Subscribe to an event. Note the server must have a "subscribe" route
  393. // defined.
  394. func (c *WSClient) Subscribe(ctx context.Context, eventType string) error {
  395. params := map[string]interface{}{"event": eventType}
  396. return c.Call(ctx, "subscribe", params)
  397. }
  398. // Unsubscribe from an event. Note the server must have a "unsubscribe" route
  399. // defined.
  400. func (c *WSClient) Unsubscribe(ctx context.Context, eventType string) error {
  401. params := map[string]interface{}{"event": eventType}
  402. return c.Call(ctx, "unsubscribe", params)
  403. }
  404. // UnsubscribeAll from all. Note the server must have a "unsubscribe_all" route
  405. // defined.
  406. func (c *WSClient) UnsubscribeAll(ctx context.Context) error {
  407. params := map[string]interface{}{}
  408. return c.Call(ctx, "unsubscribe_all", params)
  409. }