You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

129 lines
2.8 KiB

  1. package rpcclient
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/http"
  6. "time"
  7. "github.com/gorilla/websocket"
  8. . "github.com/tendermint/go-common"
  9. "github.com/tendermint/go-rpc/types"
  10. )
  // Tunables for the websocket client.
  const (
  	// wsResultsChannelCapacity is the buffer size of WSClient.ResultsCh.
  	wsResultsChannelCapacity = 10

  	// wsErrorsChannelCapacity is the buffer size of WSClient.ErrorsCh.
  	wsErrorsChannelCapacity = 1

  	// wsWriteTimeoutSeconds is the number of seconds allowed for writing a
  	// pong control frame in reply to a ping.
  	wsWriteTimeoutSeconds = 10
  )
  16. type WSClient struct {
  17. QuitService
  18. Address string
  19. *websocket.Conn
  20. ResultsCh chan json.RawMessage // closes upon WSClient.Stop()
  21. ErrorsCh chan error // closes upon WSClient.Stop()
  22. }
  23. // create a new connection
  24. func NewWSClient(addr string) *WSClient {
  25. wsClient := &WSClient{
  26. Address: addr,
  27. Conn: nil,
  28. ResultsCh: make(chan json.RawMessage, wsResultsChannelCapacity),
  29. ErrorsCh: make(chan error, wsErrorsChannelCapacity),
  30. }
  31. wsClient.QuitService = *NewQuitService(log, "WSClient", wsClient)
  32. return wsClient
  33. }
  34. func (wsc *WSClient) String() string {
  35. return wsc.Address
  36. }
  37. func (wsc *WSClient) OnStart() error {
  38. wsc.QuitService.OnStart()
  39. err := wsc.dial()
  40. if err != nil {
  41. return err
  42. }
  43. go wsc.receiveEventsRoutine()
  44. return nil
  45. }
  46. func (wsc *WSClient) dial() error {
  47. // Dial
  48. dialer := websocket.DefaultDialer
  49. rHeader := http.Header{}
  50. con, _, err := dialer.Dial(wsc.Address, rHeader)
  51. if err != nil {
  52. return err
  53. }
  54. // Set the ping/pong handlers
  55. con.SetPingHandler(func(m string) error {
  56. // NOTE: https://github.com/gorilla/websocket/issues/97
  57. go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
  58. return nil
  59. })
  60. con.SetPongHandler(func(m string) error {
  61. // NOTE: https://github.com/gorilla/websocket/issues/97
  62. return nil
  63. })
  64. wsc.Conn = con
  65. return nil
  66. }
  67. func (wsc *WSClient) OnStop() {
  68. wsc.QuitService.OnStop()
  69. // ResultsCh/ErrorsCh is closed in receiveEventsRoutine.
  70. }
  71. func (wsc *WSClient) receiveEventsRoutine() {
  72. for {
  73. _, data, err := wsc.ReadMessage()
  74. if err != nil {
  75. log.Info("WSClient failed to read message", "error", err, "data", string(data))
  76. wsc.Stop()
  77. break
  78. } else {
  79. var response rpctypes.RPCResponse
  80. err := json.Unmarshal(data, &response)
  81. if err != nil {
  82. log.Info("WSClient failed to parse message", "error", err, "data", string(data))
  83. wsc.ErrorsCh <- err
  84. continue
  85. }
  86. if response.Error != "" {
  87. wsc.ErrorsCh <- fmt.Errorf(err.Error())
  88. continue
  89. }
  90. wsc.ResultsCh <- *response.Result
  91. }
  92. }
  93. // Cleanup
  94. close(wsc.ResultsCh)
  95. close(wsc.ErrorsCh)
  96. }
  97. // subscribe to an event
  98. func (wsc *WSClient) Subscribe(eventid string) error {
  99. err := wsc.WriteJSON(rpctypes.RPCRequest{
  100. JSONRPC: "2.0",
  101. ID: "",
  102. Method: "subscribe",
  103. Params: []interface{}{eventid},
  104. })
  105. return err
  106. }
  107. // unsubscribe from an event
  108. func (wsc *WSClient) Unsubscribe(eventid string) error {
  109. err := wsc.WriteJSON(rpctypes.RPCRequest{
  110. JSONRPC: "2.0",
  111. ID: "",
  112. Method: "unsubscribe",
  113. Params: []interface{}{eventid},
  114. })
  115. return err
  116. }