You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

126 lines
2.9 KiB

  1. package rpcclient
  2. import (
  3. "net/http"
  4. "strings"
  5. "time"
  6. "github.com/gorilla/websocket"
  7. . "github.com/tendermint/go-common"
  8. "github.com/tendermint/go-wire"
  9. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  10. "github.com/tendermint/tendermint/rpc/types"
  11. )
  12. const (
  13. wsEventsChannelCapacity = 10
  14. wsResultsChannelCapacity = 10
  15. wsWriteTimeoutSeconds = 10
  16. )
  17. type WSClient struct {
  18. QuitService
  19. Address string
  20. *websocket.Conn
  21. EventsCh chan ctypes.ResultEvent // closes upon WSClient.Stop()
  22. ResultsCh chan ctypes.Result // closes upon WSClient.Stop()
  23. }
  24. // create a new connection
  25. func NewWSClient(addr string) *WSClient {
  26. wsClient := &WSClient{
  27. Address: addr,
  28. Conn: nil,
  29. EventsCh: make(chan ctypes.ResultEvent, wsEventsChannelCapacity),
  30. ResultsCh: make(chan ctypes.Result, wsResultsChannelCapacity),
  31. }
  32. wsClient.QuitService = *NewQuitService(log, "WSClient", wsClient)
  33. return wsClient
  34. }
  35. func (wsc *WSClient) OnStart() error {
  36. wsc.QuitService.OnStart()
  37. err := wsc.dial()
  38. if err != nil {
  39. return err
  40. }
  41. go wsc.receiveEventsRoutine()
  42. return nil
  43. }
  44. func (wsc *WSClient) dial() error {
  45. // Dial
  46. dialer := websocket.DefaultDialer
  47. rHeader := http.Header{}
  48. con, _, err := dialer.Dial(wsc.Address, rHeader)
  49. if err != nil {
  50. return err
  51. }
  52. // Set the ping/pong handlers
  53. con.SetPingHandler(func(m string) error {
  54. // NOTE: https://github.com/gorilla/websocket/issues/97
  55. go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
  56. return nil
  57. })
  58. con.SetPongHandler(func(m string) error {
  59. // NOTE: https://github.com/gorilla/websocket/issues/97
  60. return nil
  61. })
  62. wsc.Conn = con
  63. return nil
  64. }
  65. func (wsc *WSClient) OnStop() {
  66. wsc.QuitService.OnStop()
  67. // EventsCh and ResultsCh are closed in receiveEventsRoutine.
  68. }
  69. func (wsc *WSClient) receiveEventsRoutine() {
  70. for {
  71. _, data, err := wsc.ReadMessage()
  72. if err != nil {
  73. log.Info("WSClient failed to read message", "error", err, "data", string(data))
  74. wsc.Stop()
  75. break
  76. } else {
  77. var response ctypes.Response
  78. wire.ReadJSON(&response, data, &err)
  79. if err != nil {
  80. log.Info("WSClient failed to parse message", "error", err)
  81. wsc.Stop()
  82. break
  83. }
  84. if strings.HasSuffix(response.ID, "#event") {
  85. wsc.EventsCh <- *response.Result.(*ctypes.ResultEvent)
  86. } else {
  87. wsc.ResultsCh <- response.Result
  88. }
  89. }
  90. }
  91. // Cleanup
  92. close(wsc.EventsCh)
  93. close(wsc.ResultsCh)
  94. }
  95. // subscribe to an event
  96. func (wsc *WSClient) Subscribe(eventid string) error {
  97. err := wsc.WriteJSON(rpctypes.RPCRequest{
  98. JSONRPC: "2.0",
  99. ID: "",
  100. Method: "subscribe",
  101. Params: []interface{}{eventid},
  102. })
  103. return err
  104. }
  105. // unsubscribe from an event
  106. func (wsc *WSClient) Unsubscribe(eventid string) error {
  107. err := wsc.WriteJSON(rpctypes.RPCRequest{
  108. JSONRPC: "2.0",
  109. ID: "",
  110. Method: "unsubscribe",
  111. Params: []interface{}{eventid},
  112. })
  113. return err
  114. }