You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

275 lines
6.6 KiB

  1. package http
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "strings"
  7. "time"
  8. tmsync "github.com/tendermint/tendermint/internal/libs/sync"
  9. tmjson "github.com/tendermint/tendermint/libs/json"
  10. tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
  11. "github.com/tendermint/tendermint/libs/service"
  12. rpcclient "github.com/tendermint/tendermint/rpc/client"
  13. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  14. jsonrpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client"
  15. )
  16. var errNotRunning = errors.New("client is not running. Use .Start() method to start")
  17. // WSOptions for the WS part of the HTTP client.
  18. type WSOptions struct {
  19. Path string // path (e.g. "/ws")
  20. jsonrpcclient.WSOptions // WSClient options
  21. }
  22. // DefaultWSOptions returns default WS options.
  23. // See jsonrpcclient.DefaultWSOptions.
  24. func DefaultWSOptions() WSOptions {
  25. return WSOptions{
  26. Path: "/websocket",
  27. WSOptions: jsonrpcclient.DefaultWSOptions(),
  28. }
  29. }
  30. // Validate performs a basic validation of WSOptions.
  31. func (wso WSOptions) Validate() error {
  32. if len(wso.Path) <= 1 {
  33. return errors.New("empty Path")
  34. }
  35. if wso.Path[0] != '/' {
  36. return errors.New("leading slash is missing in Path")
  37. }
  38. return nil
  39. }
  40. // wsEvents is a wrapper around WSClient, which implements EventsClient.
  41. type wsEvents struct {
  42. service.BaseService
  43. ws *jsonrpcclient.WSClient
  44. mtx tmsync.RWMutex
  45. subscriptions map[string]*wsSubscription
  46. }
  47. type wsSubscription struct {
  48. res chan ctypes.ResultEvent
  49. id string
  50. query string
  51. }
  52. var _ rpcclient.EventsClient = (*wsEvents)(nil)
  53. func newWsEvents(remote string, wso WSOptions) (*wsEvents, error) {
  54. // validate options
  55. if err := wso.Validate(); err != nil {
  56. return nil, fmt.Errorf("invalid WSOptions: %w", err)
  57. }
  58. // remove the trailing / from the remote else the websocket endpoint
  59. // won't parse correctly
  60. if remote[len(remote)-1] == '/' {
  61. remote = remote[:len(remote)-1]
  62. }
  63. w := &wsEvents{
  64. subscriptions: make(map[string]*wsSubscription),
  65. }
  66. w.BaseService = *service.NewBaseService(nil, "wsEvents", w)
  67. var err error
  68. w.ws, err = jsonrpcclient.NewWSWithOptions(remote, wso.Path, wso.WSOptions)
  69. if err != nil {
  70. return nil, fmt.Errorf("can't create WS client: %w", err)
  71. }
  72. w.ws.OnReconnect(func() {
  73. // resubscribe immediately
  74. w.redoSubscriptionsAfter(0 * time.Second)
  75. })
  76. w.ws.SetLogger(w.Logger)
  77. return w, nil
  78. }
  79. // OnStart implements service.Service by starting WSClient and event loop.
  80. func (w *wsEvents) OnStart() error {
  81. if err := w.ws.Start(); err != nil {
  82. return err
  83. }
  84. go w.eventListener()
  85. return nil
  86. }
  87. // OnStop implements service.Service by stopping WSClient.
  88. func (w *wsEvents) OnStop() {
  89. if err := w.ws.Stop(); err != nil {
  90. w.Logger.Error("Can't stop ws client", "err", err)
  91. }
  92. }
  93. // Subscribe implements EventsClient by using WSClient to subscribe given
  94. // subscriber to query. By default, it returns a channel with cap=1. Error is
  95. // returned if it fails to subscribe.
  96. //
  97. // When reading from the channel, keep in mind there's a single events loop, so
  98. // if you don't read events for this subscription fast enough, other
  99. // subscriptions will slow down in effect.
  100. //
  101. // The channel is never closed to prevent clients from seeing an erroneous
  102. // event.
  103. //
  104. // It returns an error if wsEvents is not running.
  105. func (w *wsEvents) Subscribe(ctx context.Context, subscriber, query string,
  106. outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
  107. if !w.IsRunning() {
  108. return nil, errNotRunning
  109. }
  110. if err := w.ws.Subscribe(ctx, query); err != nil {
  111. return nil, err
  112. }
  113. outCap := 1
  114. if len(outCapacity) > 0 {
  115. outCap = outCapacity[0]
  116. }
  117. outc := make(chan ctypes.ResultEvent, outCap)
  118. w.mtx.Lock()
  119. defer w.mtx.Unlock()
  120. // subscriber param is ignored because Tendermint will override it with
  121. // remote IP anyway.
  122. w.subscriptions[query] = &wsSubscription{res: outc, query: query}
  123. return outc, nil
  124. }
  125. // Unsubscribe implements EventsClient by using WSClient to unsubscribe given
  126. // subscriber from query.
  127. //
  128. // It returns an error if wsEvents is not running.
  129. func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) error {
  130. if !w.IsRunning() {
  131. return errNotRunning
  132. }
  133. if err := w.ws.Unsubscribe(ctx, query); err != nil {
  134. return err
  135. }
  136. w.mtx.Lock()
  137. info, ok := w.subscriptions[query]
  138. if ok {
  139. if info.id != "" {
  140. delete(w.subscriptions, info.id)
  141. }
  142. delete(w.subscriptions, info.query)
  143. }
  144. w.mtx.Unlock()
  145. return nil
  146. }
  147. // UnsubscribeAll implements EventsClient by using WSClient to unsubscribe
  148. // given subscriber from all the queries.
  149. //
  150. // It returns an error if wsEvents is not running.
  151. func (w *wsEvents) UnsubscribeAll(ctx context.Context, subscriber string) error {
  152. if !w.IsRunning() {
  153. return errNotRunning
  154. }
  155. if err := w.ws.UnsubscribeAll(ctx); err != nil {
  156. return err
  157. }
  158. w.mtx.Lock()
  159. w.subscriptions = make(map[string]*wsSubscription)
  160. w.mtx.Unlock()
  161. return nil
  162. }
  163. // After being reconnected, it is necessary to redo subscription to server
  164. // otherwise no data will be automatically received.
  165. func (w *wsEvents) redoSubscriptionsAfter(d time.Duration) {
  166. time.Sleep(d)
  167. ctx := context.Background()
  168. w.mtx.Lock()
  169. defer w.mtx.Unlock()
  170. for q, info := range w.subscriptions {
  171. if q != "" && q == info.id {
  172. continue
  173. }
  174. err := w.ws.Subscribe(ctx, q)
  175. if err != nil {
  176. w.Logger.Error("failed to resubscribe", "query", q, "err", err)
  177. delete(w.subscriptions, q)
  178. }
  179. }
  180. }
  181. func isErrAlreadySubscribed(err error) bool {
  182. return strings.Contains(err.Error(), tmpubsub.ErrAlreadySubscribed.Error())
  183. }
  184. func (w *wsEvents) eventListener() {
  185. for {
  186. select {
  187. case resp, ok := <-w.ws.ResponsesCh:
  188. if !ok {
  189. return
  190. }
  191. if resp.Error != nil {
  192. w.Logger.Error("WS error", "err", resp.Error.Error())
  193. // Error can be ErrAlreadySubscribed or max client (subscriptions per
  194. // client) reached or Tendermint exited.
  195. // We can ignore ErrAlreadySubscribed, but need to retry in other
  196. // cases.
  197. if !isErrAlreadySubscribed(resp.Error) {
  198. // Resubscribe after 1 second to give Tendermint time to restart (if
  199. // crashed).
  200. w.redoSubscriptionsAfter(1 * time.Second)
  201. }
  202. continue
  203. }
  204. result := new(ctypes.ResultEvent)
  205. err := tmjson.Unmarshal(resp.Result, result)
  206. if err != nil {
  207. w.Logger.Error("failed to unmarshal response", "err", err)
  208. continue
  209. }
  210. w.mtx.RLock()
  211. out, ok := w.subscriptions[result.Query]
  212. if ok {
  213. if _, idOk := w.subscriptions[result.SubscriptionID]; !idOk {
  214. out.id = result.SubscriptionID
  215. w.subscriptions[result.SubscriptionID] = out
  216. }
  217. }
  218. w.mtx.RUnlock()
  219. if ok {
  220. select {
  221. case out.res <- *result:
  222. case <-w.Quit():
  223. return
  224. }
  225. }
  226. case <-w.Quit():
  227. return
  228. }
  229. }
  230. }