You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

139 lines
3.2 KiB

  1. package rpcclient
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net"
  6. "net/http"
  7. "time"
  8. "github.com/gorilla/websocket"
  9. cmn "github.com/tendermint/go-common"
  10. types "github.com/tendermint/go-rpc/types"
  11. )
  12. const (
  13. wsResultsChannelCapacity = 10
  14. wsErrorsChannelCapacity = 1
  15. wsWriteTimeoutSeconds = 10
  16. )
  17. type WSClient struct {
  18. cmn.BaseService
  19. Address string // IP:PORT or /path/to/socket
  20. Endpoint string // /websocket/url/endpoint
  21. Dialer func(string, string) (net.Conn, error)
  22. *websocket.Conn
  23. ResultsCh chan json.RawMessage // closes upon WSClient.Stop()
  24. ErrorsCh chan error // closes upon WSClient.Stop()
  25. }
  26. // create a new connection
  27. func NewWSClient(remoteAddr, endpoint string) *WSClient {
  28. addr, dialer := makeHTTPDialer(remoteAddr)
  29. wsClient := &WSClient{
  30. Address: addr,
  31. Dialer: dialer,
  32. Endpoint: endpoint,
  33. Conn: nil,
  34. ResultsCh: make(chan json.RawMessage, wsResultsChannelCapacity),
  35. ErrorsCh: make(chan error, wsErrorsChannelCapacity),
  36. }
  37. wsClient.BaseService = *cmn.NewBaseService(log, "WSClient", wsClient)
  38. return wsClient
  39. }
  40. func (wsc *WSClient) String() string {
  41. return wsc.Address + ", " + wsc.Endpoint
  42. }
  43. func (wsc *WSClient) OnStart() error {
  44. wsc.BaseService.OnStart()
  45. err := wsc.dial()
  46. if err != nil {
  47. return err
  48. }
  49. go wsc.receiveEventsRoutine()
  50. return nil
  51. }
  52. func (wsc *WSClient) dial() error {
  53. // Dial
  54. dialer := &websocket.Dialer{
  55. NetDial: wsc.Dialer,
  56. Proxy: http.ProxyFromEnvironment,
  57. }
  58. rHeader := http.Header{}
  59. con, _, err := dialer.Dial("ws://"+wsc.Address+wsc.Endpoint, rHeader)
  60. if err != nil {
  61. return err
  62. }
  63. // Set the ping/pong handlers
  64. con.SetPingHandler(func(m string) error {
  65. // NOTE: https://github.com/gorilla/websocket/issues/97
  66. go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
  67. return nil
  68. })
  69. con.SetPongHandler(func(m string) error {
  70. // NOTE: https://github.com/gorilla/websocket/issues/97
  71. return nil
  72. })
  73. wsc.Conn = con
  74. return nil
  75. }
  76. func (wsc *WSClient) OnStop() {
  77. wsc.BaseService.OnStop()
  78. // ResultsCh/ErrorsCh is closed in receiveEventsRoutine.
  79. }
  80. func (wsc *WSClient) receiveEventsRoutine() {
  81. for {
  82. _, data, err := wsc.ReadMessage()
  83. if err != nil {
  84. log.Info("WSClient failed to read message", "error", err, "data", string(data))
  85. wsc.Stop()
  86. break
  87. } else {
  88. var response types.RPCResponse
  89. err := json.Unmarshal(data, &response)
  90. if err != nil {
  91. log.Info("WSClient failed to parse message", "error", err, "data", string(data))
  92. wsc.ErrorsCh <- err
  93. continue
  94. }
  95. if response.Error != "" {
  96. wsc.ErrorsCh <- fmt.Errorf(response.Error)
  97. continue
  98. }
  99. wsc.ResultsCh <- *response.Result
  100. }
  101. }
  102. // Cleanup
  103. close(wsc.ResultsCh)
  104. close(wsc.ErrorsCh)
  105. }
  106. // subscribe to an event
  107. func (wsc *WSClient) Subscribe(eventid string) error {
  108. err := wsc.WriteJSON(types.RPCRequest{
  109. JSONRPC: "2.0",
  110. ID: "",
  111. Method: "subscribe",
  112. Params: map[string]interface{}{"event": eventid},
  113. })
  114. return err
  115. }
  116. // unsubscribe from an event
  117. func (wsc *WSClient) Unsubscribe(eventid string) error {
  118. err := wsc.WriteJSON(types.RPCRequest{
  119. JSONRPC: "2.0",
  120. ID: "",
  121. Method: "unsubscribe",
  122. Params: map[string]interface{}{"event": eventid},
  123. })
  124. return err
  125. }