You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

143 lines
3.4 KiB

  1. package rpcclient
  2. import (
  3. "encoding/json"
  4. "net"
  5. "net/http"
  6. "time"
  7. "github.com/gorilla/websocket"
  8. "github.com/pkg/errors"
  9. cmn "github.com/tendermint/go-common"
  10. types "github.com/tendermint/go-rpc/types"
  11. )
  12. const (
  13. wsResultsChannelCapacity = 10
  14. wsErrorsChannelCapacity = 1
  15. wsWriteTimeoutSeconds = 10
  16. )
  17. type WSClient struct {
  18. cmn.BaseService
  19. Address string // IP:PORT or /path/to/socket
  20. Endpoint string // /websocket/url/endpoint
  21. Dialer func(string, string) (net.Conn, error)
  22. *websocket.Conn
  23. ResultsCh chan json.RawMessage // closes upon WSClient.Stop()
  24. ErrorsCh chan error // closes upon WSClient.Stop()
  25. }
  26. // create a new connection
  27. func NewWSClient(remoteAddr, endpoint string) *WSClient {
  28. addr, dialer := makeHTTPDialer(remoteAddr)
  29. wsClient := &WSClient{
  30. Address: addr,
  31. Dialer: dialer,
  32. Endpoint: endpoint,
  33. Conn: nil,
  34. ResultsCh: make(chan json.RawMessage, wsResultsChannelCapacity),
  35. ErrorsCh: make(chan error, wsErrorsChannelCapacity),
  36. }
  37. wsClient.BaseService = *cmn.NewBaseService(log, "WSClient", wsClient)
  38. return wsClient
  39. }
  40. func (wsc *WSClient) String() string {
  41. return wsc.Address + ", " + wsc.Endpoint
  42. }
  43. func (wsc *WSClient) OnStart() error {
  44. wsc.BaseService.OnStart()
  45. err := wsc.dial()
  46. if err != nil {
  47. return err
  48. }
  49. go wsc.receiveEventsRoutine()
  50. return nil
  51. }
  52. func (wsc *WSClient) dial() error {
  53. // Dial
  54. dialer := &websocket.Dialer{
  55. NetDial: wsc.Dialer,
  56. Proxy: http.ProxyFromEnvironment,
  57. }
  58. rHeader := http.Header{}
  59. con, _, err := dialer.Dial("ws://"+wsc.Address+wsc.Endpoint, rHeader)
  60. if err != nil {
  61. return err
  62. }
  63. // Set the ping/pong handlers
  64. con.SetPingHandler(func(m string) error {
  65. // NOTE: https://github.com/gorilla/websocket/issues/97
  66. go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
  67. return nil
  68. })
  69. con.SetPongHandler(func(m string) error {
  70. // NOTE: https://github.com/gorilla/websocket/issues/97
  71. return nil
  72. })
  73. wsc.Conn = con
  74. return nil
  75. }
  76. func (wsc *WSClient) OnStop() {
  77. wsc.BaseService.OnStop()
  78. wsc.Conn.Close()
  79. // ResultsCh/ErrorsCh is closed in receiveEventsRoutine.
  80. }
  81. func (wsc *WSClient) receiveEventsRoutine() {
  82. for {
  83. _, data, err := wsc.ReadMessage()
  84. if err != nil {
  85. log.Info("WSClient failed to read message", "error", err, "data", string(data))
  86. wsc.Stop()
  87. break
  88. } else {
  89. var response types.RPCResponse
  90. err := json.Unmarshal(data, &response)
  91. if err != nil {
  92. log.Info("WSClient failed to parse message", "error", err, "data", string(data))
  93. wsc.ErrorsCh <- err
  94. continue
  95. }
  96. if response.Error != "" {
  97. wsc.ErrorsCh <- errors.Errorf(response.Error)
  98. continue
  99. }
  100. wsc.ResultsCh <- *response.Result
  101. }
  102. }
  103. // this must be modified in the same go-routine that reads from the
  104. // connection to avoid race conditions
  105. wsc.Conn = nil
  106. // Cleanup
  107. close(wsc.ResultsCh)
  108. close(wsc.ErrorsCh)
  109. }
  110. // subscribe to an event
  111. func (wsc *WSClient) Subscribe(eventid string) error {
  112. err := wsc.WriteJSON(types.RPCRequest{
  113. JSONRPC: "2.0",
  114. ID: "",
  115. Method: "subscribe",
  116. Params: map[string]interface{}{"event": eventid},
  117. })
  118. return err
  119. }
  120. // unsubscribe from an event
  121. func (wsc *WSClient) Unsubscribe(eventid string) error {
  122. err := wsc.WriteJSON(types.RPCRequest{
  123. JSONRPC: "2.0",
  124. ID: "",
  125. Method: "unsubscribe",
  126. Params: map[string]interface{}{"event": eventid},
  127. })
  128. return err
  129. }