You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

119 lines
2.6 KiB

  1. package core_client
  2. import (
  3. "net/http"
  4. "strings"
  5. "time"
  6. "github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/gorilla/websocket"
  7. . "github.com/tendermint/tendermint/common"
  8. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  9. "github.com/tendermint/tendermint/rpc/types"
  10. "github.com/tendermint/tendermint/wire"
  11. )
  // Tunables for the websocket client.
  const (
  	// wsWriteTimeoutSeconds is the number of seconds added to time.Now()
  	// to form the deadline of the pong control write in dial.
  	wsWriteTimeoutSeconds = 10

  	// wsEventsChannelCapacity is the buffer size of WSClient.EventsCh.
  	wsEventsChannelCapacity = 10

  	// wsResultsChannelCapacity is the buffer size of WSClient.ResultsCh.
  	wsResultsChannelCapacity = 10
  )
  17. type WSClient struct {
  18. QuitService
  19. Address string
  20. *websocket.Conn
  21. EventsCh chan ctypes.ResultEvent
  22. ResultsCh chan ctypes.Result
  23. }
  24. // create a new connection
  25. func NewWSClient(addr string) *WSClient {
  26. wsClient := &WSClient{
  27. Address: addr,
  28. Conn: nil,
  29. EventsCh: make(chan ctypes.ResultEvent, wsEventsChannelCapacity),
  30. ResultsCh: make(chan ctypes.Result, wsResultsChannelCapacity),
  31. }
  32. wsClient.QuitService = *NewQuitService(log, "WSClient", wsClient)
  33. return wsClient
  34. }
  35. func (wsc *WSClient) OnStart() error {
  36. wsc.QuitService.OnStart()
  37. err := wsc.dial()
  38. if err != nil {
  39. return err
  40. }
  41. go wsc.receiveEventsRoutine()
  42. return nil
  43. }
  44. func (wsc *WSClient) dial() error {
  45. // Dial
  46. dialer := websocket.DefaultDialer
  47. rHeader := http.Header{}
  48. con, _, err := dialer.Dial(wsc.Address, rHeader)
  49. if err != nil {
  50. return err
  51. }
  52. // Set the ping/pong handlers
  53. con.SetPingHandler(func(m string) error {
  54. con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
  55. return nil
  56. })
  57. con.SetPongHandler(func(m string) error {
  58. return nil
  59. })
  60. wsc.Conn = con
  61. return nil
  62. }
  63. func (wsc *WSClient) OnStop() {
  64. wsc.QuitService.OnStop()
  65. }
  66. func (wsc *WSClient) receiveEventsRoutine() {
  67. for {
  68. _, data, err := wsc.ReadMessage()
  69. if err != nil {
  70. log.Info("WSClient failed to read message", "error", err, "data", string(data))
  71. wsc.Stop()
  72. break
  73. } else {
  74. var response ctypes.Response
  75. wire.ReadJSON(&response, data, &err)
  76. if err != nil {
  77. log.Info("WSClient failed to parse message", "error", err)
  78. wsc.Stop()
  79. break
  80. }
  81. if strings.HasSuffix(response.ID, "#event") {
  82. wsc.EventsCh <- *response.Result.(*ctypes.ResultEvent)
  83. } else {
  84. wsc.ResultsCh <- response.Result
  85. }
  86. }
  87. }
  88. }
  89. // subscribe to an event
  90. func (wsc *WSClient) Subscribe(eventid string) error {
  91. err := wsc.WriteJSON(rpctypes.RPCRequest{
  92. JSONRPC: "2.0",
  93. ID: "",
  94. Method: "subscribe",
  95. Params: []interface{}{eventid},
  96. })
  97. return err
  98. }
  99. // unsubscribe from an event
  100. func (wsc *WSClient) Unsubscribe(eventid string) error {
  101. err := wsc.WriteJSON(rpctypes.RPCRequest{
  102. JSONRPC: "2.0",
  103. ID: "",
  104. Method: "unsubscribe",
  105. Params: []interface{}{eventid},
  106. })
  107. return err
  108. }