You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

165 lines
4.1 KiB

  1. package rpcclient
  2. import (
  3. "encoding/json"
  4. "net"
  5. "net/http"
  6. "time"
  7. "github.com/gorilla/websocket"
  8. "github.com/pkg/errors"
  9. cmn "github.com/tendermint/go-common"
  10. types "github.com/tendermint/go-rpc/types"
  11. wire "github.com/tendermint/go-wire"
  12. )
  13. const (
  14. wsResultsChannelCapacity = 10
  15. wsErrorsChannelCapacity = 1
  16. wsWriteTimeoutSeconds = 10
  17. )
  18. type WSClient struct {
  19. cmn.BaseService
  20. Address string // IP:PORT or /path/to/socket
  21. Endpoint string // /websocket/url/endpoint
  22. Dialer func(string, string) (net.Conn, error)
  23. *websocket.Conn
  24. ResultsCh chan json.RawMessage // closes upon WSClient.Stop()
  25. ErrorsCh chan error // closes upon WSClient.Stop()
  26. }
  27. // create a new connection
  28. func NewWSClient(remoteAddr, endpoint string) *WSClient {
  29. addr, dialer := makeHTTPDialer(remoteAddr)
  30. wsClient := &WSClient{
  31. Address: addr,
  32. Dialer: dialer,
  33. Endpoint: endpoint,
  34. Conn: nil,
  35. ResultsCh: make(chan json.RawMessage, wsResultsChannelCapacity),
  36. ErrorsCh: make(chan error, wsErrorsChannelCapacity),
  37. }
  38. wsClient.BaseService = *cmn.NewBaseService(log, "WSClient", wsClient)
  39. return wsClient
  40. }
  41. func (wsc *WSClient) String() string {
  42. return wsc.Address + ", " + wsc.Endpoint
  43. }
  44. func (wsc *WSClient) OnStart() error {
  45. wsc.BaseService.OnStart()
  46. err := wsc.dial()
  47. if err != nil {
  48. return err
  49. }
  50. go wsc.receiveEventsRoutine()
  51. return nil
  52. }
  53. func (wsc *WSClient) dial() error {
  54. // Dial
  55. dialer := &websocket.Dialer{
  56. NetDial: wsc.Dialer,
  57. Proxy: http.ProxyFromEnvironment,
  58. }
  59. rHeader := http.Header{}
  60. con, _, err := dialer.Dial("ws://"+wsc.Address+wsc.Endpoint, rHeader)
  61. if err != nil {
  62. return err
  63. }
  64. // Set the ping/pong handlers
  65. con.SetPingHandler(func(m string) error {
  66. // NOTE: https://github.com/gorilla/websocket/issues/97
  67. go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
  68. return nil
  69. })
  70. con.SetPongHandler(func(m string) error {
  71. // NOTE: https://github.com/gorilla/websocket/issues/97
  72. return nil
  73. })
  74. wsc.Conn = con
  75. return nil
  76. }
  77. func (wsc *WSClient) OnStop() {
  78. wsc.BaseService.OnStop()
  79. wsc.Conn.Close()
  80. // ResultsCh/ErrorsCh is closed in receiveEventsRoutine.
  81. }
  82. func (wsc *WSClient) receiveEventsRoutine() {
  83. for {
  84. _, data, err := wsc.ReadMessage()
  85. if err != nil {
  86. log.Info("WSClient failed to read message", "error", err, "data", string(data))
  87. wsc.Stop()
  88. break
  89. } else {
  90. var response types.RPCResponse
  91. err := json.Unmarshal(data, &response)
  92. if err != nil {
  93. log.Info("WSClient failed to parse message", "error", err, "data", string(data))
  94. wsc.ErrorsCh <- err
  95. continue
  96. }
  97. if response.Error != "" {
  98. wsc.ErrorsCh <- errors.Errorf(response.Error)
  99. continue
  100. }
  101. wsc.ResultsCh <- *response.Result
  102. }
  103. }
  104. // this must be modified in the same go-routine that reads from the
  105. // connection to avoid race conditions
  106. wsc.Conn = nil
  107. // Cleanup
  108. close(wsc.ResultsCh)
  109. close(wsc.ErrorsCh)
  110. }
  111. // Subscribe to an event. Note the server must have a "subscribe" route
  112. // defined.
  113. func (wsc *WSClient) Subscribe(eventid string) error {
  114. err := wsc.WriteJSON(types.RPCRequest{
  115. JSONRPC: "2.0",
  116. ID: "",
  117. Method: "subscribe",
  118. Params: map[string]interface{}{"event": eventid},
  119. })
  120. return err
  121. }
  122. // Unsubscribe from an event. Note the server must have a "unsubscribe" route
  123. // defined.
  124. func (wsc *WSClient) Unsubscribe(eventid string) error {
  125. err := wsc.WriteJSON(types.RPCRequest{
  126. JSONRPC: "2.0",
  127. ID: "",
  128. Method: "unsubscribe",
  129. Params: map[string]interface{}{"event": eventid},
  130. })
  131. return err
  132. }
  133. // Call asynchronously calls a given method by sending an RPCRequest to the
  134. // server. Results will be available on ResultsCh, errors, if any, on ErrorsCh.
  135. func (wsc *WSClient) Call(method string, params map[string]interface{}) error {
  136. // we need this step because we attempt to decode values using `go-wire`
  137. // (handlers.go:470) on the server side
  138. encodedParams := make(map[string]interface{})
  139. for k, v := range params {
  140. bytes := json.RawMessage(wire.JSONBytes(v))
  141. encodedParams[k] = &bytes
  142. }
  143. err := wsc.WriteJSON(types.RPCRequest{
  144. JSONRPC: "2.0",
  145. Method: method,
  146. Params: encodedParams,
  147. ID: "",
  148. })
  149. return err
  150. }