You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

119 lines
2.6 KiB

  1. package rpcclient
  2. import (
  3. "net/http"
  4. "time"
  5. "github.com/gorilla/websocket"
  6. . "github.com/tendermint/go-common"
  7. "github.com/tendermint/go-rpc/types"
  8. "github.com/tendermint/go-wire"
  9. )
  10. const (
  11. wsResultsChannelCapacity = 10
  12. wsWriteTimeoutSeconds = 10
  13. )
  14. type WSClient struct {
  15. QuitService
  16. Address string
  17. *websocket.Conn
  18. ResultsCh chan rpctypes.Result // closes upon WSClient.Stop()
  19. }
  20. // create a new connection
  21. func NewWSClient(addr string) *WSClient {
  22. wsClient := &WSClient{
  23. Address: addr,
  24. Conn: nil,
  25. ResultsCh: make(chan rpctypes.Result, wsResultsChannelCapacity),
  26. }
  27. wsClient.QuitService = *NewQuitService(log, "WSClient", wsClient)
  28. return wsClient
  29. }
  30. func (wsc *WSClient) OnStart() error {
  31. wsc.QuitService.OnStart()
  32. err := wsc.dial()
  33. if err != nil {
  34. return err
  35. }
  36. go wsc.receiveEventsRoutine()
  37. return nil
  38. }
  39. func (wsc *WSClient) dial() error {
  40. // Dial
  41. dialer := websocket.DefaultDialer
  42. rHeader := http.Header{}
  43. con, _, err := dialer.Dial(wsc.Address, rHeader)
  44. if err != nil {
  45. return err
  46. }
  47. // Set the ping/pong handlers
  48. con.SetPingHandler(func(m string) error {
  49. // NOTE: https://github.com/gorilla/websocket/issues/97
  50. log.Debug("Client received ping, writing pong")
  51. go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
  52. return nil
  53. })
  54. con.SetPongHandler(func(m string) error {
  55. log.Debug("Client received pong")
  56. // NOTE: https://github.com/gorilla/websocket/issues/97
  57. return nil
  58. })
  59. wsc.Conn = con
  60. return nil
  61. }
  62. func (wsc *WSClient) OnStop() {
  63. wsc.QuitService.OnStop()
  64. // ResultsCh is closed in receiveEventsRoutine.
  65. }
  66. func (wsc *WSClient) receiveEventsRoutine() {
  67. for {
  68. log.Notice("Waiting for wsc message ...")
  69. _, data, err := wsc.ReadMessage()
  70. if err != nil {
  71. log.Info("WSClient failed to read message", "error", err, "data", string(data))
  72. wsc.Stop()
  73. break
  74. } else {
  75. var response rpctypes.RPCResponse
  76. wire.ReadJSON(&response, data, &err)
  77. if err != nil {
  78. log.Info("WSClient failed to parse message", "error", err, "data", string(data))
  79. wsc.Stop()
  80. break
  81. }
  82. wsc.ResultsCh <- response.Result
  83. }
  84. }
  85. // Cleanup
  86. close(wsc.ResultsCh)
  87. }
  88. // subscribe to an event
  89. func (wsc *WSClient) Subscribe(eventid string) error {
  90. err := wsc.WriteJSON(rpctypes.RPCRequest{
  91. JSONRPC: "2.0",
  92. ID: "",
  93. Method: "subscribe",
  94. Params: []interface{}{eventid},
  95. })
  96. return err
  97. }
  98. // unsubscribe from an event
  99. func (wsc *WSClient) Unsubscribe(eventid string) error {
  100. err := wsc.WriteJSON(rpctypes.RPCRequest{
  101. JSONRPC: "2.0",
  102. ID: "",
  103. Method: "unsubscribe",
  104. Params: []interface{}{eventid},
  105. })
  106. return err
  107. }