package rpcclient

import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"net/http"
	"sync"
	"time"

	"github.com/gorilla/websocket"
	"github.com/pkg/errors"
	metrics "github.com/rcrowley/go-metrics"
	amino "github.com/tendermint/go-amino"

	cmn "github.com/tendermint/tendermint/libs/common"
	types "github.com/tendermint/tendermint/rpc/lib/types"
)

const (
	defaultMaxReconnectAttempts = 25
	defaultWriteWait            = 0
	defaultReadWait             = 0
	defaultPingPeriod           = 0
)

// WSClient is a WebSocket client. The methods of WSClient are safe for use by
// multiple goroutines.
type WSClient struct {
	cmn.BaseService

	conn *websocket.Conn
	cdc  *amino.Codec

	Address  string // IP:PORT or /path/to/socket
	Endpoint string // /websocket/url/endpoint
	Dialer   func(string, string) (net.Conn, error)

	// Time between sending a ping and receiving a pong. See
	// https://godoc.org/github.com/rcrowley/go-metrics#Timer.
	PingPongLatencyTimer metrics.Timer

	// Single user-facing channel to read RPCResponses from, closed only when the client is being stopped.
	ResponsesCh chan types.RPCResponse

	// Callback, which will be called each time after successful reconnect.
	onReconnect func()

	// internal channels
	send            chan types.RPCRequest // user requests
	backlog         chan types.RPCRequest // stores a single user request received during a conn failure
	reconnectAfter  chan error            // reconnect requests
	readRoutineQuit chan struct{}         // a way for readRoutine to close writeRoutine

	wg sync.WaitGroup

	mtx            sync.RWMutex
	sentLastPingAt time.Time
	reconnecting   bool

	// Maximum reconnect attempts (0 or greater; default: 25).
	maxReconnectAttempts int

	// Time allowed to write a message to the server. 0 means block until operation succeeds.
	writeWait time.Duration

	// Time allowed to read the next message from the server. 0 means block until operation succeeds.
	readWait time.Duration

	// Send pings to server with this period. Must be less than readWait. If 0, no pings will be sent.
	pingPeriod time.Duration

	// Support both ws and wss protocols
	protocol string
}

// NewWSClient returns a new client. See the commentary on the func(*WSClient)
// functions for a detailed description of how to configure ping period and
// pong wait time. The endpoint argument must begin with a `/`.
// The function panics if the provided address is invalid.
func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSClient {
	protocol, addr, err := toClientAddrAndParse(remoteAddr)
	if err != nil {
		panic(fmt.Sprintf("invalid remote %s: %s", remoteAddr, err))
	}

	// default to ws protocol, unless wss is explicitly specified
	if protocol != "wss" {
		protocol = "ws"
	}

	c := &WSClient{
		cdc:                  amino.NewCodec(),
		Address:              addr,
		Dialer:               makeHTTPDialer(remoteAddr),
		Endpoint:             endpoint,
		PingPongLatencyTimer: metrics.NewTimer(),

		maxReconnectAttempts: defaultMaxReconnectAttempts,
		readWait:             defaultReadWait,
		writeWait:            defaultWriteWait,
		pingPeriod:           defaultPingPeriod,
		protocol:             protocol,
	}
	c.BaseService = *cmn.NewBaseService(nil, "WSClient", c)
	for _, option := range options {
		option(c)
	}
	return c
}

// MaxReconnectAttempts sets the maximum number of reconnect attempts before returning an error.
// It should only be used in the constructor and is not Goroutine-safe.
func MaxReconnectAttempts(max int) func(*WSClient) {
	return func(c *WSClient) {
		c.maxReconnectAttempts = max
	}
}

// ReadWait sets the amount of time to wait before a websocket read times out.
// It should only be used in the constructor and is not Goroutine-safe.
func ReadWait(readWait time.Duration) func(*WSClient) {
	return func(c *WSClient) {
		c.readWait = readWait
	}
}

// WriteWait sets the amount of time to wait before a websocket write times out.
// It should only be used in the constructor and is not Goroutine-safe.
func WriteWait(writeWait time.Duration) func(*WSClient) {
	return func(c *WSClient) {
		c.writeWait = writeWait
	}
}

// PingPeriod sets the duration for sending websocket pings.
// It should only be used in the constructor and is not Goroutine-safe.
func PingPeriod(pingPeriod time.Duration) func(*WSClient) {
	return func(c *WSClient) {
		c.pingPeriod = pingPeriod
	}
}

// OnReconnect sets the callback, which will be called every time after a
// successful reconnect.
func OnReconnect(cb func()) func(*WSClient) {
	return func(c *WSClient) {
		c.onReconnect = cb
	}
}
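
// Illustrative construction sketch (not part of the original file): the
// functional options above are passed to NewWSClient at construction time.
// The address and durations below are arbitrary example values; note the
// struct comment requires pingPeriod to be less than readWait.
//
//	c := NewWSClient("tcp://localhost:26657", "/websocket",
//		MaxReconnectAttempts(10),
//		ReadWait(30*time.Second),
//		WriteWait(10*time.Second),
//		PingPeriod(10*time.Second),
//		OnReconnect(func() {
//			// e.g. re-subscribe to queries here
//		}),
//	)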

// String returns WS client full address.
func (c *WSClient) String() string {
	return fmt.Sprintf("%s (%s)", c.Address, c.Endpoint)
}

// OnStart implements cmn.Service by dialing a server and creating read and
// write routines.
func (c *WSClient) OnStart() error {
	err := c.dial()
	if err != nil {
		return err
	}

	c.ResponsesCh = make(chan types.RPCResponse)

	c.send = make(chan types.RPCRequest)
	// 1 additional error may come from the read/write
	// goroutine depending on which failed first.
	c.reconnectAfter = make(chan error, 1)
	// capacity for 1 request. a user won't be able to send more because the send
	// channel is unbuffered.
	c.backlog = make(chan types.RPCRequest, 1)

	c.startReadWriteRoutines()
	go c.reconnectRoutine()

	return nil
}

// Stop overrides cmn.Service#Stop. There is no other way to wait until Quit
// channel is closed.
func (c *WSClient) Stop() error {
	if err := c.BaseService.Stop(); err != nil {
		return err
	}
	// only close user-facing channels when we can't write to them
	c.wg.Wait()
	close(c.ResponsesCh)
	return nil
}

// IsReconnecting returns true if the client is reconnecting right now.
func (c *WSClient) IsReconnecting() bool {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	return c.reconnecting
}

// IsActive returns true if the client is running and not reconnecting.
func (c *WSClient) IsActive() bool {
	return c.IsRunning() && !c.IsReconnecting()
}

// Send the given RPC request to the server. Results will be available on
// ResponsesCh; errors, if any, are delivered there as part of the RPCResponse.
// Will block until send succeeds or ctx.Done is closed.
func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error {
	select {
	case c.send <- request:
		c.Logger.Info("sent a request", "req", request)
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Call the given method. See Send description.
func (c *WSClient) Call(ctx context.Context, method string, params map[string]interface{}) error {
	request, err := types.MapToRequest(c.cdc, types.JSONRPCStringID("ws-client"), method, params)
	if err != nil {
		return err
	}
	return c.Send(ctx, request)
}

// CallWithArrayParams calls the given method with params in the form of an
// array. See Send description.
func (c *WSClient) CallWithArrayParams(ctx context.Context, method string, params []interface{}) error {
	request, err := types.ArrayToRequest(c.cdc, types.JSONRPCStringID("ws-client"), method, params)
	if err != nil {
		return err
	}
	return c.Send(ctx, request)
}

func (c *WSClient) Codec() *amino.Codec {
	return c.cdc
}

func (c *WSClient) SetCodec(cdc *amino.Codec) {
	c.cdc = cdc
}
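
// Illustrative usage sketch (not part of the original file): start the
// client, issue a call, and read a response. The "health" method name and
// the address are assumptions about the connected server, for demonstration
// only.
//
//	c := NewWSClient("tcp://localhost:26657", "/websocket")
//	if err := c.Start(); err != nil {
//		// handle dial/start error
//	}
//	defer c.Stop()
//
//	ctx := context.Background()
//	if err := c.Call(ctx, "health", map[string]interface{}{}); err != nil {
//		// handle send error
//	}
//	resp := <-c.ResponsesCh // responses arrive asynchronously
//	_ = resp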

///////////////////////////////////////////////////////////////////////////////
// Private methods

func (c *WSClient) dial() error {
	dialer := &websocket.Dialer{
		NetDial: c.Dialer,
		Proxy:   http.ProxyFromEnvironment,
	}
	rHeader := http.Header{}
	conn, _, err := dialer.Dial(c.protocol+"://"+c.Address+c.Endpoint, rHeader)
	if err != nil {
		return err
	}
	c.conn = conn
	return nil
}

// reconnect tries to redial up to maxReconnectAttempts with exponential
// backoff.
func (c *WSClient) reconnect() error {
	attempt := 0

	c.mtx.Lock()
	c.reconnecting = true
	c.mtx.Unlock()
	defer func() {
		c.mtx.Lock()
		c.reconnecting = false
		c.mtx.Unlock()
	}()

	for {
		jitterSeconds := time.Duration(cmn.RandFloat64() * float64(time.Second)) // 1s == (1e9 ns)
		backoffDuration := jitterSeconds + ((1 << uint(attempt)) * time.Second)

		c.Logger.Info("reconnecting", "attempt", attempt+1, "backoff_duration", backoffDuration)
		time.Sleep(backoffDuration)

		err := c.dial()
		if err != nil {
			c.Logger.Error("failed to redial", "err", err)
		} else {
			c.Logger.Info("reconnected")
			if c.onReconnect != nil {
				go c.onReconnect()
			}
			return nil
		}

		attempt++

		if attempt > c.maxReconnectAttempts {
			return errors.Wrap(err, "reached maximum reconnect attempts")
		}
	}
}
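
// Sketch of the backoff schedule produced by reconnect (comment added for
// clarity, not part of the original file): attempt n sleeps (1 << n) seconds
// plus up to one second of random jitter, so successive waits are roughly
// 1s, 2s, 4s, 8s, ... until attempt exceeds maxReconnectAttempts.
//
//	for attempt := 0; attempt < 4; attempt++ {
//		base := time.Duration(1<<uint(attempt)) * time.Second
//		fmt.Println(base) // 1s, 2s, 4s, 8s (each plus jitter in [0s, 1s))
//	}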

func (c *WSClient) startReadWriteRoutines() {
	c.wg.Add(2)
	c.readRoutineQuit = make(chan struct{})
	go c.readRoutine()
	go c.writeRoutine()
}

func (c *WSClient) processBacklog() error {
	select {
	case request := <-c.backlog:
		if c.writeWait > 0 {
			if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
				c.Logger.Error("failed to set write deadline", "err", err)
			}
		}
		if err := c.conn.WriteJSON(request); err != nil {
			c.Logger.Error("failed to resend request", "err", err)
			c.reconnectAfter <- err
			// requeue request
			c.backlog <- request
			return err
		}
		c.Logger.Info("resent a request", "req", request)
	default:
	}
	return nil
}

func (c *WSClient) reconnectRoutine() {
	for {
		select {
		case originalError := <-c.reconnectAfter:
			// wait until writeRoutine and readRoutine finish
			c.wg.Wait()
			if err := c.reconnect(); err != nil {
				c.Logger.Error("failed to reconnect", "err", err, "original_err", originalError)
				c.Stop()
				return
			}
			// drain reconnectAfter
		LOOP:
			for {
				select {
				case <-c.reconnectAfter:
				default:
					break LOOP
				}
			}
			err := c.processBacklog()
			if err == nil {
				c.startReadWriteRoutines()
			}
		case <-c.Quit():
			return
		}
	}
}

// The client ensures that there is at most one writer to a connection by
// executing all writes from this goroutine.
func (c *WSClient) writeRoutine() {
	var ticker *time.Ticker
	if c.pingPeriod > 0 {
		// ticker with a predefined period
		ticker = time.NewTicker(c.pingPeriod)
	} else {
		// ticker that never fires
		ticker = &time.Ticker{C: make(<-chan time.Time)}
	}

	defer func() {
		ticker.Stop()
		// ignore the close error; it will trigger in tests,
		// likely because it's closing an already closed connection
		c.conn.Close()
		c.wg.Done()
	}()

	for {
		select {
		case request := <-c.send:
			if c.writeWait > 0 {
				if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
					c.Logger.Error("failed to set write deadline", "err", err)
				}
			}
			if err := c.conn.WriteJSON(request); err != nil {
				c.Logger.Error("failed to send request", "err", err)
				c.reconnectAfter <- err
				// add request to the backlog, so we don't lose it
				c.backlog <- request
				return
			}
		case <-ticker.C:
			if c.writeWait > 0 {
				if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
					c.Logger.Error("failed to set write deadline", "err", err)
				}
			}
			if err := c.conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
				c.Logger.Error("failed to write ping", "err", err)
				c.reconnectAfter <- err
				return
			}
			c.mtx.Lock()
			c.sentLastPingAt = time.Now()
			c.mtx.Unlock()
			c.Logger.Debug("sent ping")
		case <-c.readRoutineQuit:
			return
		case <-c.Quit():
			if err := c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil {
				c.Logger.Error("failed to write message", "err", err)
			}
			return
		}
	}
}

// The client ensures that there is at most one reader to a connection by
// executing all reads from this goroutine.
func (c *WSClient) readRoutine() {
	defer func() {
		// ignore the close error; it will trigger in tests,
		// likely because it's closing an already closed connection
		c.conn.Close()
		c.wg.Done()
	}()

	c.conn.SetPongHandler(func(string) error {
		// gather latency stats
		c.mtx.RLock()
		t := c.sentLastPingAt
		c.mtx.RUnlock()
		c.PingPongLatencyTimer.UpdateSince(t)

		c.Logger.Debug("got pong")
		return nil
	})

	for {
		// reset deadline for every message type (control or data)
		if c.readWait > 0 {
			if err := c.conn.SetReadDeadline(time.Now().Add(c.readWait)); err != nil {
				c.Logger.Error("failed to set read deadline", "err", err)
			}
		}
		_, data, err := c.conn.ReadMessage()
		if err != nil {
			if !websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
				return
			}

			c.Logger.Error("failed to read response", "err", err)
			close(c.readRoutineQuit)
			c.reconnectAfter <- err
			return
		}

		var response types.RPCResponse
		err = json.Unmarshal(data, &response)
		if err != nil {
			c.Logger.Error("failed to parse response", "err", err, "data", string(data))
			continue
		}
		c.Logger.Info("got response", "resp", response.Result)

		// Combine a non-blocking read on BaseService.Quit with a non-blocking
		// write on ResponsesCh to avoid blocking c.wg.Wait() in c.Stop(). Note
		// we rely on Quit being closed so that it sends unlimited Quit signals
		// to stop both readRoutine and writeRoutine.
		select {
		case <-c.Quit():
		case c.ResponsesCh <- response:
		}
	}
}

///////////////////////////////////////////////////////////////////////////////
// Predefined methods

// Subscribe to a query. Note the server must have a "subscribe" route
// defined.
func (c *WSClient) Subscribe(ctx context.Context, query string) error {
	params := map[string]interface{}{"query": query}
	return c.Call(ctx, "subscribe", params)
}

// Unsubscribe from a query. Note the server must have an "unsubscribe" route
// defined.
func (c *WSClient) Unsubscribe(ctx context.Context, query string) error {
	params := map[string]interface{}{"query": query}
	return c.Call(ctx, "unsubscribe", params)
}

// UnsubscribeAll from all. Note the server must have an "unsubscribe_all"
// route defined.
func (c *WSClient) UnsubscribeAll(ctx context.Context) error {
	params := map[string]interface{}{}
	return c.Call(ctx, "unsubscribe_all", params)
}
  439. }