You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

172 lines
4.3 KiB

  1. package rpcclient
  2. import (
  3. "encoding/json"
  4. "net"
  5. "net/http"
  6. "time"
  7. "github.com/gorilla/websocket"
  8. "github.com/pkg/errors"
  9. cmn "github.com/tendermint/go-common"
  10. types "github.com/tendermint/go-rpc/types"
  11. wire "github.com/tendermint/go-wire"
  12. )
  13. const (
  14. wsResultsChannelCapacity = 10
  15. wsErrorsChannelCapacity = 1
  16. wsWriteTimeoutSeconds = 10
  17. )
  18. type WSClient struct {
  19. cmn.BaseService
  20. Address string // IP:PORT or /path/to/socket
  21. Endpoint string // /websocket/url/endpoint
  22. Dialer func(string, string) (net.Conn, error)
  23. *websocket.Conn
  24. ResultsCh chan json.RawMessage // closes upon WSClient.Stop()
  25. ErrorsCh chan error // closes upon WSClient.Stop()
  26. }
  27. // create a new connection
  28. func NewWSClient(remoteAddr, endpoint string) *WSClient {
  29. addr, dialer := makeHTTPDialer(remoteAddr)
  30. wsClient := &WSClient{
  31. Address: addr,
  32. Dialer: dialer,
  33. Endpoint: endpoint,
  34. Conn: nil,
  35. }
  36. wsClient.BaseService = *cmn.NewBaseService(log, "WSClient", wsClient)
  37. return wsClient
  38. }
  39. func (wsc *WSClient) String() string {
  40. return wsc.Address + ", " + wsc.Endpoint
  41. }
  42. // OnStart implements cmn.BaseService interface
  43. func (wsc *WSClient) OnStart() error {
  44. wsc.BaseService.OnStart()
  45. err := wsc.dial()
  46. if err != nil {
  47. return err
  48. }
  49. wsc.ResultsCh = make(chan json.RawMessage, wsResultsChannelCapacity)
  50. wsc.ErrorsCh = make(chan error, wsErrorsChannelCapacity)
  51. go wsc.receiveEventsRoutine()
  52. return nil
  53. }
  54. // OnReset implements cmn.BaseService interface
  55. func (wsc *WSClient) OnReset() error {
  56. return nil
  57. }
  58. func (wsc *WSClient) dial() error {
  59. // Dial
  60. dialer := &websocket.Dialer{
  61. NetDial: wsc.Dialer,
  62. Proxy: http.ProxyFromEnvironment,
  63. }
  64. rHeader := http.Header{}
  65. con, _, err := dialer.Dial("ws://"+wsc.Address+wsc.Endpoint, rHeader)
  66. if err != nil {
  67. return err
  68. }
  69. // Set the ping/pong handlers
  70. con.SetPingHandler(func(m string) error {
  71. // NOTE: https://github.com/gorilla/websocket/issues/97
  72. go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
  73. return nil
  74. })
  75. con.SetPongHandler(func(m string) error {
  76. // NOTE: https://github.com/gorilla/websocket/issues/97
  77. return nil
  78. })
  79. wsc.Conn = con
  80. return nil
  81. }
  82. // OnStop implements cmn.BaseService interface
  83. func (wsc *WSClient) OnStop() {
  84. wsc.BaseService.OnStop()
  85. wsc.Conn.Close()
  86. // ResultsCh/ErrorsCh is closed in receiveEventsRoutine.
  87. }
  88. func (wsc *WSClient) receiveEventsRoutine() {
  89. for {
  90. _, data, err := wsc.ReadMessage()
  91. if err != nil {
  92. log.Info("WSClient failed to read message", "error", err, "data", string(data))
  93. wsc.Stop()
  94. break
  95. } else {
  96. var response types.RPCResponse
  97. err := json.Unmarshal(data, &response)
  98. if err != nil {
  99. log.Info("WSClient failed to parse message", "error", err, "data", string(data))
  100. wsc.ErrorsCh <- err
  101. continue
  102. }
  103. if response.Error != "" {
  104. wsc.ErrorsCh <- errors.Errorf(response.Error)
  105. continue
  106. }
  107. wsc.ResultsCh <- *response.Result
  108. }
  109. }
  110. // this must be modified in the same go-routine that reads from the
  111. // connection to avoid race conditions
  112. wsc.Conn = nil
  113. // Cleanup
  114. close(wsc.ResultsCh)
  115. close(wsc.ErrorsCh)
  116. }
  117. // Subscribe to an event. Note the server must have a "subscribe" route
  118. // defined.
  119. func (wsc *WSClient) Subscribe(eventid string) error {
  120. err := wsc.WriteJSON(types.RPCRequest{
  121. JSONRPC: "2.0",
  122. ID: "",
  123. Method: "subscribe",
  124. Params: map[string]interface{}{"event": eventid},
  125. })
  126. return err
  127. }
  128. // Unsubscribe from an event. Note the server must have a "unsubscribe" route
  129. // defined.
  130. func (wsc *WSClient) Unsubscribe(eventid string) error {
  131. err := wsc.WriteJSON(types.RPCRequest{
  132. JSONRPC: "2.0",
  133. ID: "",
  134. Method: "unsubscribe",
  135. Params: map[string]interface{}{"event": eventid},
  136. })
  137. return err
  138. }
  139. // Call asynchronously calls a given method by sending an RPCRequest to the
  140. // server. Results will be available on ResultsCh, errors, if any, on ErrorsCh.
  141. func (wsc *WSClient) Call(method string, params map[string]interface{}) error {
  142. // we need this step because we attempt to decode values using `go-wire`
  143. // (handlers.go:470) on the server side
  144. encodedParams := make(map[string]interface{})
  145. for k, v := range params {
  146. bytes := json.RawMessage(wire.JSONBytes(v))
  147. encodedParams[k] = &bytes
  148. }
  149. err := wsc.WriteJSON(types.RPCRequest{
  150. JSONRPC: "2.0",
  151. Method: method,
  152. Params: encodedParams,
  153. ID: "",
  154. })
  155. return err
  156. }