You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

274 lines
8.0 KiB

  1. package eventstream_test
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "testing"
  7. "time"
  8. "github.com/fortytw2/leaktest"
  9. "github.com/google/go-cmp/cmp"
  10. "github.com/tendermint/tendermint/internal/eventlog"
  11. "github.com/tendermint/tendermint/internal/eventlog/cursor"
  12. rpccore "github.com/tendermint/tendermint/internal/rpc/core"
  13. "github.com/tendermint/tendermint/rpc/client/eventstream"
  14. "github.com/tendermint/tendermint/rpc/coretypes"
  15. "github.com/tendermint/tendermint/types"
  16. )
  17. func TestStream_filterOrder(t *testing.T) {
  18. defer leaktest.Check(t)
  19. s := newStreamTester(t, `tm.event = 'good'`, eventlog.LogSettings{
  20. WindowSize: 30 * time.Second,
  21. }, nil)
  22. // Verify that events are delivered in forward time order (i.e., that the
  23. // stream unpacks the pages correctly) and that events not matching the
  24. // query (here, type="bad") are skipped.
  25. //
  26. // The minimum batch size is 16 and half the events we publish match, so we
  27. // publish > 32 items (> 16 good) to ensure we exercise paging.
  28. etype := [2]string{"good", "bad"}
  29. var items []testItem
  30. for i := 0; i < 40; i++ {
  31. s.advance(100 * time.Millisecond)
  32. text := fmt.Sprintf("item%d", i)
  33. cur := s.publish(etype[i%2], text)
  34. // Even-numbered items match the target type.
  35. if i%2 == 0 {
  36. items = append(items, makeTestItem(cur, text))
  37. }
  38. }
  39. s.start()
  40. for _, itm := range items {
  41. s.mustItem(t, itm)
  42. }
  43. s.stopWait()
  44. }
  45. func TestStream_lostItem(t *testing.T) {
  46. defer leaktest.Check(t)
  47. s := newStreamTester(t, ``, eventlog.LogSettings{
  48. WindowSize: 30 * time.Second,
  49. }, nil)
  50. // Publish an item and let the client observe it.
  51. cur := s.publish("ok", "whatever")
  52. s.start()
  53. s.mustItem(t, makeTestItem(cur, "whatever"))
  54. s.stopWait()
  55. // Time passes, and cur expires out of the window.
  56. s.advance(50 * time.Second)
  57. next1 := s.publish("ok", "more stuff")
  58. s.advance(15 * time.Second)
  59. next2 := s.publish("ok", "still more stuff")
  60. // At this point, the oldest item in the log is newer than the point at
  61. // which we continued, we should get an error.
  62. s.start()
  63. var missed *eventstream.MissedItemsError
  64. if err := s.mustError(t); !errors.As(err, &missed) {
  65. t.Errorf("Wrong error: got %v, want %T", err, missed)
  66. } else {
  67. t.Logf("Correctly reported missed item: %v", missed)
  68. }
  69. // If we reset the stream and continue from head, we should catch up.
  70. s.stopWait()
  71. s.stream.Reset()
  72. s.start()
  73. s.mustItem(t, makeTestItem(next1, "more stuff"))
  74. s.mustItem(t, makeTestItem(next2, "still more stuff"))
  75. s.stopWait()
  76. }
  77. func TestMinPollTime(t *testing.T) {
  78. defer leaktest.Check(t)
  79. s := newStreamTester(t, ``, eventlog.LogSettings{
  80. WindowSize: 30 * time.Second,
  81. }, nil)
  82. s.publish("bad", "whatever")
  83. // Waiting for an item on a log with no matching events incurs a minimum
  84. // wait time and reports no events.
  85. ctx := context.Background()
  86. filter := &coretypes.EventFilter{Query: `tm.event = 'good'`}
  87. var zero cursor.Cursor
  88. t.Run("NoneMatch", func(t *testing.T) {
  89. start := time.Now()
  90. // Request a very short delay, and affirm we got the server's minimum.
  91. rsp, err := s.env.Events(ctx, filter, 1, zero, zero, 10*time.Millisecond)
  92. if err != nil {
  93. t.Fatalf("Events failed: %v", err)
  94. } else if elapsed := time.Since(start); elapsed < time.Second {
  95. t.Errorf("Events returned too quickly: got %v, wanted 1s", elapsed)
  96. } else if len(rsp.Items) != 0 {
  97. t.Errorf("Events returned %d items, expected none", len(rsp.Items))
  98. }
  99. })
  100. s.publish("good", "whatever")
  101. // Waiting for an available matching item incurs no delay.
  102. t.Run("SomeMatch", func(t *testing.T) {
  103. start := time.Now()
  104. // Request a long-ish delay and affirm we don't block for it.
  105. // Check for this by ensuring we return sooner than the minimum delay,
  106. // since we don't know the exact timing.
  107. rsp, err := s.env.Events(ctx, filter, 1, zero, zero, 10*time.Second)
  108. if err != nil {
  109. t.Fatalf("Events failed: %v", err)
  110. } else if elapsed := time.Since(start); elapsed > 500*time.Millisecond {
  111. t.Errorf("Events returned too slowly: got %v, wanted immediate", elapsed)
  112. } else if len(rsp.Items) == 0 {
  113. t.Error("Events returned no items, wanted at least 1")
  114. }
  115. })
  116. }
  117. // testItem is a wrapper for comparing item results in a friendly output format
  118. // for the cmp package.
  119. type testItem struct {
  120. Cursor string
  121. Data string
  122. // N.B. Fields exported to simplify use in cmp.
  123. }
  124. func makeTestItem(cur, data string) testItem {
  125. return testItem{
  126. Cursor: cur,
  127. Data: fmt.Sprintf(`{"type":%q,"value":%q}`, types.EventDataString("").TypeTag(), data),
  128. }
  129. }
  130. // streamTester is a simulation harness for an eventstream.Stream. It simulates
  131. // the production service by plumbing an event log into a stub RPC environment,
  132. // into which the test can publish events and advance the perceived time to
  133. // exercise various cases of the stream.
  134. type streamTester struct {
  135. log *eventlog.Log
  136. env *rpccore.Environment
  137. clock int64
  138. index int64
  139. stream *eventstream.Stream
  140. errc chan error
  141. recv chan *coretypes.EventItem
  142. stop func()
  143. }
  144. func newStreamTester(t *testing.T, query string, logOpts eventlog.LogSettings, streamOpts *eventstream.StreamOptions) *streamTester {
  145. t.Helper()
  146. s := new(streamTester)
  147. // Plumb a time source controlled by the tester into the event log.
  148. logOpts.Source = cursor.Source{
  149. TimeIndex: s.timeNow,
  150. }
  151. lg, err := eventlog.New(logOpts)
  152. if err != nil {
  153. t.Fatalf("Creating event log: %v", err)
  154. }
  155. s.log = lg
  156. s.env = &rpccore.Environment{EventLog: lg}
  157. s.stream = eventstream.New(s, query, streamOpts)
  158. return s
  159. }
  160. // start starts the stream receiver, which runs until it it terminated by
  161. // calling stop.
  162. func (s *streamTester) start() {
  163. ctx, cancel := context.WithCancel(context.Background())
  164. s.errc = make(chan error, 1)
  165. s.recv = make(chan *coretypes.EventItem)
  166. s.stop = cancel
  167. go func() {
  168. defer close(s.errc)
  169. s.errc <- s.stream.Run(ctx, func(itm *coretypes.EventItem) error {
  170. select {
  171. case <-ctx.Done():
  172. return ctx.Err()
  173. case s.recv <- itm:
  174. return nil
  175. }
  176. })
  177. }()
  178. }
  179. // publish adds a single event to the event log at the present moment.
  180. func (s *streamTester) publish(etype, payload string) string {
  181. _ = s.log.Add(etype, types.EventDataString(payload))
  182. s.index++
  183. return fmt.Sprintf("%016x-%04x", s.clock, s.index)
  184. }
  185. // wait blocks until either an item is received or the runner stops.
  186. func (s *streamTester) wait() (*coretypes.EventItem, error) {
  187. select {
  188. case itm := <-s.recv:
  189. return itm, nil
  190. case err := <-s.errc:
  191. return nil, err
  192. }
  193. }
  194. // mustItem waits for an item and fails if either an error occurs or the item
  195. // does not match want.
  196. func (s *streamTester) mustItem(t *testing.T, want testItem) {
  197. t.Helper()
  198. itm, err := s.wait()
  199. if err != nil {
  200. t.Fatalf("Receive: got error %v, want item %v", err, want)
  201. }
  202. got := testItem{Cursor: itm.Cursor, Data: string(itm.Data)}
  203. if diff := cmp.Diff(want, got); diff != "" {
  204. t.Errorf("Item: (-want, +got)\n%s", diff)
  205. }
  206. }
  207. // mustError waits for an error and fails if an item is returned.
  208. func (s *streamTester) mustError(t *testing.T) error {
  209. t.Helper()
  210. itm, err := s.wait()
  211. if err == nil {
  212. t.Fatalf("Receive: got item %v, want error", itm)
  213. }
  214. return err
  215. }
  216. // stopWait stops the runner and waits for it to terminate.
  217. func (s *streamTester) stopWait() { s.stop(); s.wait() } //nolint:errcheck
  218. // timeNow reports the current simulated time index.
  219. func (s *streamTester) timeNow() int64 { return s.clock }
  220. // advance moves the simulated time index.
  221. func (s *streamTester) advance(d time.Duration) { s.clock += int64(d) }
  222. // Events implements the eventstream.Client interface by delegating to a stub
  223. // environment as if it were a local RPC client. This works because the Events
  224. // method only requires the event log, the other fields are unused.
  225. func (s *streamTester) Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error) {
  226. var before, after cursor.Cursor
  227. if err := before.UnmarshalText([]byte(req.Before)); err != nil {
  228. return nil, err
  229. }
  230. if err := after.UnmarshalText([]byte(req.After)); err != nil {
  231. return nil, err
  232. }
  233. return s.env.Events(ctx, req.Filter, req.MaxItems, before, after, req.WaitTime)
  234. }