You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

249 lines
6.0 KiB

  1. package http
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "strings"
  7. "time"
  8. tmjson "github.com/tendermint/tendermint/libs/json"
  9. tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
  10. "github.com/tendermint/tendermint/libs/service"
  11. tmsync "github.com/tendermint/tendermint/libs/sync"
  12. rpcclient "github.com/tendermint/tendermint/rpc/client"
  13. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  14. jsonrpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client"
  15. )
  // errNotRunning is the error returned by Subscribe, Unsubscribe and
  // UnsubscribeAll when they are called while wsEvents is not running.
  var errNotRunning = errors.New("client is not running. Use .Start() method to start")
  17. // WSOptions for the WS part of the HTTP client.
  18. type WSOptions struct {
  19. Path string // path (e.g. "/ws")
  20. jsonrpcclient.WSOptions // WSClient options
  21. }
  22. // DefaultWSOptions returns default WS options.
  23. // See jsonrpcclient.DefaultWSOptions.
  24. func DefaultWSOptions() WSOptions {
  25. return WSOptions{
  26. Path: "/websocket",
  27. WSOptions: jsonrpcclient.DefaultWSOptions(),
  28. }
  29. }
  30. // Validate performs a basic validation of WSOptions.
  31. func (wso WSOptions) Validate() error {
  32. if len(wso.Path) <= 1 {
  33. return errors.New("empty Path")
  34. }
  35. if wso.Path[0] != '/' {
  36. return errors.New("leading slash is missing in Path")
  37. }
  38. return nil
  39. }
  40. // wsEvents is a wrapper around WSClient, which implements EventsClient.
  41. type wsEvents struct {
  42. service.BaseService
  43. ws *jsonrpcclient.WSClient
  44. mtx tmsync.RWMutex
  45. subscriptions map[string]chan ctypes.ResultEvent // query -> chan
  46. }
  47. var _ rpcclient.EventsClient = (*wsEvents)(nil)
  48. func newWsEvents(remote string, wso WSOptions) (*wsEvents, error) {
  49. // validate options
  50. if err := wso.Validate(); err != nil {
  51. return nil, fmt.Errorf("invalid WSOptions: %w", err)
  52. }
  53. w := &wsEvents{
  54. subscriptions: make(map[string]chan ctypes.ResultEvent),
  55. }
  56. w.BaseService = *service.NewBaseService(nil, "wsEvents", w)
  57. var err error
  58. w.ws, err = jsonrpcclient.NewWSWithOptions(remote, wso.Path, wso.WSOptions)
  59. if err != nil {
  60. return nil, fmt.Errorf("can't create WS client: %w", err)
  61. }
  62. w.ws.OnReconnect(func() {
  63. // resubscribe immediately
  64. w.redoSubscriptionsAfter(0 * time.Second)
  65. })
  66. w.ws.SetLogger(w.Logger)
  67. return w, nil
  68. }
  69. // OnStart implements service.Service by starting WSClient and event loop.
  70. func (w *wsEvents) OnStart() error {
  71. if err := w.ws.Start(); err != nil {
  72. return err
  73. }
  74. go w.eventListener()
  75. return nil
  76. }
  77. // OnStop implements service.Service by stopping WSClient.
  78. func (w *wsEvents) OnStop() {
  79. if err := w.ws.Stop(); err != nil {
  80. w.Logger.Error("Can't stop ws client", "err", err)
  81. }
  82. }
  83. // Subscribe implements EventsClient by using WSClient to subscribe given
  84. // subscriber to query. By default, it returns a channel with cap=1. Error is
  85. // returned if it fails to subscribe.
  86. //
  87. // When reading from the channel, keep in mind there's a single events loop, so
  88. // if you don't read events for this subscription fast enough, other
  89. // subscriptions will slow down in effect.
  90. //
  91. // The channel is never closed to prevent clients from seeing an erroneous
  92. // event.
  93. //
  94. // It returns an error if wsEvents is not running.
  95. func (w *wsEvents) Subscribe(ctx context.Context, subscriber, query string,
  96. outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
  97. if !w.IsRunning() {
  98. return nil, errNotRunning
  99. }
  100. if err := w.ws.Subscribe(ctx, query); err != nil {
  101. return nil, err
  102. }
  103. outCap := 1
  104. if len(outCapacity) > 0 {
  105. outCap = outCapacity[0]
  106. }
  107. outc := make(chan ctypes.ResultEvent, outCap)
  108. w.mtx.Lock()
  109. // subscriber param is ignored because Tendermint will override it with
  110. // remote IP anyway.
  111. w.subscriptions[query] = outc
  112. w.mtx.Unlock()
  113. return outc, nil
  114. }
  115. // Unsubscribe implements EventsClient by using WSClient to unsubscribe given
  116. // subscriber from query.
  117. //
  118. // It returns an error if wsEvents is not running.
  119. func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) error {
  120. if !w.IsRunning() {
  121. return errNotRunning
  122. }
  123. if err := w.ws.Unsubscribe(ctx, query); err != nil {
  124. return err
  125. }
  126. w.mtx.Lock()
  127. _, ok := w.subscriptions[query]
  128. if ok {
  129. delete(w.subscriptions, query)
  130. }
  131. w.mtx.Unlock()
  132. return nil
  133. }
  134. // UnsubscribeAll implements EventsClient by using WSClient to unsubscribe
  135. // given subscriber from all the queries.
  136. //
  137. // It returns an error if wsEvents is not running.
  138. func (w *wsEvents) UnsubscribeAll(ctx context.Context, subscriber string) error {
  139. if !w.IsRunning() {
  140. return errNotRunning
  141. }
  142. if err := w.ws.UnsubscribeAll(ctx); err != nil {
  143. return err
  144. }
  145. w.mtx.Lock()
  146. w.subscriptions = make(map[string]chan ctypes.ResultEvent)
  147. w.mtx.Unlock()
  148. return nil
  149. }
  150. // After being reconnected, it is necessary to redo subscription to server
  151. // otherwise no data will be automatically received.
  152. func (w *wsEvents) redoSubscriptionsAfter(d time.Duration) {
  153. time.Sleep(d)
  154. ctx := context.Background()
  155. w.mtx.Lock()
  156. defer w.mtx.Unlock()
  157. for q := range w.subscriptions {
  158. err := w.ws.Subscribe(ctx, q)
  159. if err != nil {
  160. w.Logger.Error("failed to resubscribe", "query", q, "err", err)
  161. delete(w.subscriptions, q)
  162. }
  163. }
  164. }
  165. func isErrAlreadySubscribed(err error) bool {
  166. return strings.Contains(err.Error(), tmpubsub.ErrAlreadySubscribed.Error())
  167. }
  168. func (w *wsEvents) eventListener() {
  169. for {
  170. select {
  171. case resp, ok := <-w.ws.ResponsesCh:
  172. if !ok {
  173. return
  174. }
  175. if resp.Error != nil {
  176. w.Logger.Error("WS error", "err", resp.Error.Error())
  177. // Error can be ErrAlreadySubscribed or max client (subscriptions per
  178. // client) reached or Tendermint exited.
  179. // We can ignore ErrAlreadySubscribed, but need to retry in other
  180. // cases.
  181. if !isErrAlreadySubscribed(resp.Error) {
  182. // Resubscribe after 1 second to give Tendermint time to restart (if
  183. // crashed).
  184. w.redoSubscriptionsAfter(1 * time.Second)
  185. }
  186. continue
  187. }
  188. result := new(ctypes.ResultEvent)
  189. err := tmjson.Unmarshal(resp.Result, result)
  190. if err != nil {
  191. w.Logger.Error("failed to unmarshal response", "err", err)
  192. continue
  193. }
  194. w.mtx.RLock()
  195. out, ok := w.subscriptions[result.Query]
  196. w.mtx.RUnlock()
  197. if ok {
  198. select {
  199. case out <- *result:
  200. case <-w.Quit():
  201. return
  202. }
  203. }
  204. case <-w.Quit():
  205. return
  206. }
  207. }
  208. }