You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

194 lines
6.1 KiB

  1. // Package eventstream implements a convenience client for the Events method
  2. // of the Tendermint RPC service, allowing clients to observe a resumable
  3. // stream of events matching a query.
  4. package eventstream
  5. import (
  6. "context"
  7. "errors"
  8. "fmt"
  9. "time"
  10. "github.com/tendermint/tendermint/rpc/coretypes"
  11. )
  12. // Client is the subset of the RPC client interface consumed by Stream.
  13. type Client interface {
  14. Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error)
  15. }
  16. // ErrStopRunning is returned by a Run callback to signal that no more events
  17. // are wanted and that Run should return.
  18. var ErrStopRunning = errors.New("stop accepting events")
  19. // A Stream cpatures the state of a streaming event subscription.
  20. type Stream struct {
  21. filter *coretypes.EventFilter // the query being streamed
  22. batchSize int // request batch size
  23. newestSeen string // from the latest item matching our query
  24. waitTime time.Duration // the long-polling interval
  25. client Client
  26. }
  27. // New constructs a new stream for the given query and options.
  28. // If opts == nil, the stream uses default values as described by
  29. // StreamOptions. This function will panic if cli == nil.
  30. func New(cli Client, query string, opts *StreamOptions) *Stream {
  31. if cli == nil {
  32. panic("eventstream: nil client")
  33. }
  34. return &Stream{
  35. filter: &coretypes.EventFilter{Query: query},
  36. batchSize: opts.batchSize(),
  37. newestSeen: opts.resumeFrom(),
  38. waitTime: opts.waitTime(),
  39. client: cli,
  40. }
  41. }
  42. // Run polls the service for events matching the query, and calls accept for
  43. // each such event. Run handles pagination transparently, and delivers events
  44. // to accept in order of publication.
  45. //
  46. // Run continues until ctx ends or accept reports an error. If accept returns
  47. // ErrStopRunning, Run returns nil; otherwise Run returns the error reported by
  48. // accept or ctx. Run also returns an error if the server reports an error
  49. // from the Events method.
  50. //
  51. // If the stream falls behind the event log on the server, Run will stop and
  52. // report an error of concrete type *MissedItemsError. Call Reset to reset the
  53. // stream to the head of the log, and call Run again to resume.
  54. func (s *Stream) Run(ctx context.Context, accept func(*coretypes.EventItem) error) error {
  55. for {
  56. items, err := s.fetchPages(ctx)
  57. if err != nil {
  58. return err
  59. }
  60. // Deliver events from the current batch to the receiver. We visit the
  61. // batch in reverse order so the receiver sees them in forward order.
  62. for i := len(items) - 1; i >= 0; i-- {
  63. if err := ctx.Err(); err != nil {
  64. return err
  65. }
  66. itm := items[i]
  67. err := accept(itm)
  68. if itm.Cursor > s.newestSeen {
  69. s.newestSeen = itm.Cursor // update the latest delivered
  70. }
  71. if errors.Is(err, ErrStopRunning) {
  72. return nil
  73. } else if err != nil {
  74. return err
  75. }
  76. }
  77. }
  78. }
  79. // Reset updates the stream's current cursor position to the head of the log.
  80. // This method may safely be called only when Run is not executing.
  81. func (s *Stream) Reset() { s.newestSeen = "" }
  82. // fetchPages fetches the next batch of matching results. If there are multiple
  83. // pages, all the matching pages are retrieved. An error is reported if the
  84. // current scan position falls out of the event log window.
  85. func (s *Stream) fetchPages(ctx context.Context) ([]*coretypes.EventItem, error) {
  86. var pageCursor string // if non-empty, page through items before this
  87. var items []*coretypes.EventItem
  88. // Fetch the next paginated batch of matching responses.
  89. for {
  90. rsp, err := s.client.Events(ctx, &coretypes.RequestEvents{
  91. Filter: s.filter,
  92. MaxItems: s.batchSize,
  93. After: s.newestSeen,
  94. Before: pageCursor,
  95. WaitTime: s.waitTime,
  96. })
  97. if err != nil {
  98. return nil, err
  99. }
  100. // If the oldest item in the log is newer than our most recent item,
  101. // it means we might have missed some events matching our query.
  102. if s.newestSeen != "" && s.newestSeen < rsp.Oldest {
  103. return nil, &MissedItemsError{
  104. Query: s.filter.Query,
  105. NewestSeen: s.newestSeen,
  106. OldestPresent: rsp.Oldest,
  107. }
  108. }
  109. items = append(items, rsp.Items...)
  110. if rsp.More {
  111. // There are more results matching this request, leave the baseline
  112. // where it is and set the page cursor so that subsequent requests
  113. // will get the next chunk.
  114. pageCursor = items[len(items)-1].Cursor
  115. } else if len(items) != 0 {
  116. // We got everything matching so far.
  117. return items, nil
  118. }
  119. }
  120. }
  121. // StreamOptions are optional settings for a Stream value. A nil *StreamOptions
  122. // is ready for use and provides default values as described.
  123. type StreamOptions struct {
  124. // How many items to request per call to the service. The stream may pin
  125. // this value to a minimum default batch size.
  126. BatchSize int
  127. // If set, resume streaming from this cursor. Typically this is set to the
  128. // cursor of the most recently-received matching value. If empty, streaming
  129. // begins at the head of the log (the default).
  130. ResumeFrom string
  131. // Specifies the long poll interval. The stream may pin this value to a
  132. // minimum default poll interval.
  133. WaitTime time.Duration
  134. }
  135. func (o *StreamOptions) batchSize() int {
  136. const minBatchSize = 16
  137. if o == nil || o.BatchSize < minBatchSize {
  138. return minBatchSize
  139. }
  140. return o.BatchSize
  141. }
  142. func (o *StreamOptions) resumeFrom() string {
  143. if o == nil {
  144. return ""
  145. }
  146. return o.ResumeFrom
  147. }
  148. func (o *StreamOptions) waitTime() time.Duration {
  149. const minWaitTime = 5 * time.Second
  150. if o == nil || o.WaitTime < minWaitTime {
  151. return minWaitTime
  152. }
  153. return o.WaitTime
  154. }
  155. // MissedItemsError is an error that indicates the stream missed (lost) some
  156. // number of events matching the specified query.
  157. type MissedItemsError struct {
  158. // The cursor of the newest matching item the stream has observed.
  159. NewestSeen string
  160. // The oldest cursor in the log at the point the miss was detected.
  161. // Any matching events between NewestSeen and OldestPresent are lost.
  162. OldestPresent string
  163. // The active query.
  164. Query string
  165. }
  166. // Error satisfies the error interface.
  167. func (e *MissedItemsError) Error() string {
  168. return fmt.Sprintf("missed events matching %q between %q and %q",
  169. e.Query, e.NewestSeen, e.OldestPresent)
  170. }