You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

116 lines
2.5 KiB

  1. package rpcclient
  2. import (
  3. "net/http"
  4. "time"
  5. "github.com/gorilla/websocket"
  6. . "github.com/tendermint/go-common"
  7. "github.com/tendermint/go-wire"
  8. "github.com/tendermint/tendermint/rpc/types"
  9. )
  10. const (
  11. wsResultsChannelCapacity = 10
  12. wsWriteTimeoutSeconds = 10
  13. )
  14. type WSClient struct {
  15. QuitService
  16. Address string
  17. *websocket.Conn
  18. ResultsCh chan rpctypes.Result // closes upon WSClient.Stop()
  19. }
  20. // create a new connection
  21. func NewWSClient(addr string) *WSClient {
  22. wsClient := &WSClient{
  23. Address: addr,
  24. Conn: nil,
  25. ResultsCh: make(chan rpctypes.Result, wsResultsChannelCapacity),
  26. }
  27. wsClient.QuitService = *NewQuitService(log, "WSClient", wsClient)
  28. return wsClient
  29. }
  30. func (wsc *WSClient) OnStart() error {
  31. wsc.QuitService.OnStart()
  32. err := wsc.dial()
  33. if err != nil {
  34. return err
  35. }
  36. go wsc.receiveEventsRoutine()
  37. return nil
  38. }
  39. func (wsc *WSClient) dial() error {
  40. // Dial
  41. dialer := websocket.DefaultDialer
  42. rHeader := http.Header{}
  43. con, _, err := dialer.Dial(wsc.Address, rHeader)
  44. if err != nil {
  45. return err
  46. }
  47. // Set the ping/pong handlers
  48. con.SetPingHandler(func(m string) error {
  49. // NOTE: https://github.com/gorilla/websocket/issues/97
  50. go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
  51. return nil
  52. })
  53. con.SetPongHandler(func(m string) error {
  54. // NOTE: https://github.com/gorilla/websocket/issues/97
  55. return nil
  56. })
  57. wsc.Conn = con
  58. return nil
  59. }
  60. func (wsc *WSClient) OnStop() {
  61. wsc.QuitService.OnStop()
  62. // ResultsCh is closed in receiveEventsRoutine.
  63. }
  64. func (wsc *WSClient) receiveEventsRoutine() {
  65. for {
  66. _, data, err := wsc.ReadMessage()
  67. if err != nil {
  68. log.Info("WSClient failed to read message", "error", err, "data", string(data))
  69. wsc.Stop()
  70. break
  71. } else {
  72. var response rpctypes.RPCResponse
  73. wire.ReadJSON(&response, data, &err)
  74. if err != nil {
  75. log.Info("WSClient failed to parse message", "error", err)
  76. wsc.Stop()
  77. break
  78. }
  79. wsc.ResultsCh <- response.Result
  80. }
  81. }
  82. // Cleanup
  83. close(wsc.ResultsCh)
  84. }
  85. // subscribe to an event
  86. func (wsc *WSClient) Subscribe(eventid string) error {
  87. err := wsc.WriteJSON(rpctypes.RPCRequest{
  88. JSONRPC: "2.0",
  89. ID: "",
  90. Method: "subscribe",
  91. Params: []interface{}{eventid},
  92. })
  93. return err
  94. }
  95. // unsubscribe from an event
  96. func (wsc *WSClient) Unsubscribe(eventid string) error {
  97. err := wsc.WriteJSON(rpctypes.RPCRequest{
  98. JSONRPC: "2.0",
  99. ID: "",
  100. Method: "unsubscribe",
  101. Params: []interface{}{eventid},
  102. })
  103. return err
  104. }