You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

160 lines
4.0 KiB

  1. package rpcclient
  2. import (
  3. "encoding/json"
  4. "net"
  5. "net/http"
  6. "time"
  7. "github.com/gorilla/websocket"
  8. "github.com/pkg/errors"
  9. types "github.com/tendermint/tendermint/rpc/lib/types"
  10. cmn "github.com/tendermint/tmlibs/common"
  11. )
  // Sizing and timing parameters used by WSClient.
  const (
  	// wsResultsChannelCapacity is the buffer size of WSClient.ResultsCh.
  	wsResultsChannelCapacity = 10

  	// wsErrorsChannelCapacity is the buffer size of WSClient.ErrorsCh.
  	wsErrorsChannelCapacity = 1

  	// wsWriteTimeoutSeconds is the write deadline, in seconds, applied to
  	// the pong control frames sent in reply to pings.
  	wsWriteTimeoutSeconds = 10
  )
  17. type WSClient struct {
  18. cmn.BaseService
  19. Address string // IP:PORT or /path/to/socket
  20. Endpoint string // /websocket/url/endpoint
  21. Dialer func(string, string) (net.Conn, error)
  22. *websocket.Conn
  23. ResultsCh chan json.RawMessage // closes upon WSClient.Stop()
  24. ErrorsCh chan error // closes upon WSClient.Stop()
  25. }
  26. // create a new connection
  27. func NewWSClient(remoteAddr, endpoint string) *WSClient {
  28. addr, dialer := makeHTTPDialer(remoteAddr)
  29. wsClient := &WSClient{
  30. Address: addr,
  31. Dialer: dialer,
  32. Endpoint: endpoint,
  33. Conn: nil,
  34. }
  35. wsClient.BaseService = *cmn.NewBaseService(log, "WSClient", wsClient)
  36. return wsClient
  37. }
  38. func (wsc *WSClient) String() string {
  39. return wsc.Address + ", " + wsc.Endpoint
  40. }
  41. // OnStart implements cmn.BaseService interface
  42. func (wsc *WSClient) OnStart() error {
  43. wsc.BaseService.OnStart()
  44. err := wsc.dial()
  45. if err != nil {
  46. return err
  47. }
  48. wsc.ResultsCh = make(chan json.RawMessage, wsResultsChannelCapacity)
  49. wsc.ErrorsCh = make(chan error, wsErrorsChannelCapacity)
  50. go wsc.receiveEventsRoutine()
  51. return nil
  52. }
  53. // OnReset implements cmn.BaseService interface
  54. func (wsc *WSClient) OnReset() error {
  55. return nil
  56. }
  57. func (wsc *WSClient) dial() error {
  58. // Dial
  59. dialer := &websocket.Dialer{
  60. NetDial: wsc.Dialer,
  61. Proxy: http.ProxyFromEnvironment,
  62. }
  63. rHeader := http.Header{}
  64. con, _, err := dialer.Dial("ws://"+wsc.Address+wsc.Endpoint, rHeader)
  65. if err != nil {
  66. return err
  67. }
  68. // Set the ping/pong handlers
  69. con.SetPingHandler(func(m string) error {
  70. // NOTE: https://github.com/gorilla/websocket/issues/97
  71. go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
  72. return nil
  73. })
  74. con.SetPongHandler(func(m string) error {
  75. // NOTE: https://github.com/gorilla/websocket/issues/97
  76. return nil
  77. })
  78. wsc.Conn = con
  79. return nil
  80. }
  81. // OnStop implements cmn.BaseService interface
  82. func (wsc *WSClient) OnStop() {
  83. wsc.BaseService.OnStop()
  84. wsc.Conn.Close()
  85. // ResultsCh/ErrorsCh is closed in receiveEventsRoutine.
  86. }
  87. func (wsc *WSClient) receiveEventsRoutine() {
  88. for {
  89. _, data, err := wsc.ReadMessage()
  90. if err != nil {
  91. log.Info("WSClient failed to read message", "error", err, "data", string(data))
  92. wsc.Stop()
  93. break
  94. } else {
  95. var response types.RPCResponse
  96. err := json.Unmarshal(data, &response)
  97. if err != nil {
  98. log.Info("WSClient failed to parse message", "error", err, "data", string(data))
  99. wsc.ErrorsCh <- err
  100. continue
  101. }
  102. if response.Error != "" {
  103. wsc.ErrorsCh <- errors.Errorf(response.Error)
  104. continue
  105. }
  106. wsc.ResultsCh <- *response.Result
  107. }
  108. }
  109. // this must be modified in the same go-routine that reads from the
  110. // connection to avoid race conditions
  111. wsc.Conn = nil
  112. // Cleanup
  113. close(wsc.ResultsCh)
  114. close(wsc.ErrorsCh)
  115. }
  116. // Subscribe to an event. Note the server must have a "subscribe" route
  117. // defined.
  118. func (wsc *WSClient) Subscribe(eventid string) error {
  119. params := map[string]interface{}{"event": eventid}
  120. request, err := types.MapToRequest("", "subscribe", params)
  121. if err == nil {
  122. err = wsc.WriteJSON(request)
  123. }
  124. return err
  125. }
  126. // Unsubscribe from an event. Note the server must have a "unsubscribe" route
  127. // defined.
  128. func (wsc *WSClient) Unsubscribe(eventid string) error {
  129. params := map[string]interface{}{"event": eventid}
  130. request, err := types.MapToRequest("", "unsubscribe", params)
  131. if err == nil {
  132. err = wsc.WriteJSON(request)
  133. }
  134. return err
  135. }
  136. // Call asynchronously calls a given method by sending an RPCRequest to the
  137. // server. Results will be available on ResultsCh, errors, if any, on ErrorsCh.
  138. func (wsc *WSClient) Call(method string, params map[string]interface{}) error {
  139. request, err := types.MapToRequest("", method, params)
  140. if err == nil {
  141. err = wsc.WriteJSON(request)
  142. }
  143. return err
  144. }