You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

225 lines
6.4 KiB

  1. package eventstream_test
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "testing"
  7. "time"
  8. "github.com/fortytw2/leaktest"
  9. "github.com/google/go-cmp/cmp"
  10. "github.com/tendermint/tendermint/internal/eventlog"
  11. "github.com/tendermint/tendermint/internal/eventlog/cursor"
  12. rpccore "github.com/tendermint/tendermint/internal/rpc/core"
  13. "github.com/tendermint/tendermint/rpc/client/eventstream"
  14. "github.com/tendermint/tendermint/rpc/coretypes"
  15. "github.com/tendermint/tendermint/types"
  16. )
  17. func TestStream_filterOrder(t *testing.T) {
  18. defer leaktest.Check(t)
  19. s := newStreamTester(t, `tm.event = 'good'`, eventlog.LogSettings{
  20. WindowSize: 30 * time.Second,
  21. }, nil)
  22. // Verify that events are delivered in forward time order (i.e., that the
  23. // stream unpacks the pages correctly) and that events not matching the
  24. // query (here, type="bad") are skipped.
  25. //
  26. // The minimum batch size is 16 and half the events we publish match, so we
  27. // publish > 32 items (> 16 good) to ensure we exercise paging.
  28. etype := [2]string{"good", "bad"}
  29. var items []testItem
  30. for i := 0; i < 40; i++ {
  31. s.advance(100 * time.Millisecond)
  32. text := fmt.Sprintf("item%d", i)
  33. cur := s.publish(etype[i%2], text)
  34. // Even-numbered items match the target type.
  35. if i%2 == 0 {
  36. items = append(items, makeTestItem(cur, text))
  37. }
  38. }
  39. s.start()
  40. for _, itm := range items {
  41. s.mustItem(t, itm)
  42. }
  43. s.stopWait()
  44. }
  45. func TestStream_lostItem(t *testing.T) {
  46. defer leaktest.Check(t)
  47. s := newStreamTester(t, ``, eventlog.LogSettings{
  48. WindowSize: 30 * time.Second,
  49. }, nil)
  50. // Publish an item and let the client observe it.
  51. cur := s.publish("ok", "whatever")
  52. s.start()
  53. s.mustItem(t, makeTestItem(cur, "whatever"))
  54. s.stopWait()
  55. // Time passes, and cur expires out of the window.
  56. s.advance(50 * time.Second)
  57. next1 := s.publish("ok", "more stuff")
  58. s.advance(15 * time.Second)
  59. next2 := s.publish("ok", "still more stuff")
  60. // At this point, the oldest item in the log is newer than the point at
  61. // which we continued, we should get an error.
  62. s.start()
  63. var missed *eventstream.MissedItemsError
  64. if err := s.mustError(t); !errors.As(err, &missed) {
  65. t.Errorf("Wrong error: got %v, want %T", err, missed)
  66. } else {
  67. t.Logf("Correctly reported missed item: %v", missed)
  68. }
  69. // If we reset the stream and continue from head, we should catch up.
  70. s.stopWait()
  71. s.stream.Reset()
  72. s.start()
  73. s.mustItem(t, makeTestItem(next1, "more stuff"))
  74. s.mustItem(t, makeTestItem(next2, "still more stuff"))
  75. s.stopWait()
  76. }
  77. // testItem is a wrapper for comparing item results in a friendly output format
  78. // for the cmp package.
  79. type testItem struct {
  80. Cursor string
  81. Data string
  82. // N.B. Fields exported to simplify use in cmp.
  83. }
  84. func makeTestItem(cur, data string) testItem {
  85. return testItem{
  86. Cursor: cur,
  87. Data: fmt.Sprintf(`{"type":%q,"value":%q}`, types.EventDataString("").TypeTag(), data),
  88. }
  89. }
  90. // streamTester is a simulation harness for an eventstream.Stream. It simulates
  91. // the production service by plumbing an event log into a stub RPC environment,
  92. // into which the test can publish events and advance the perceived time to
  93. // exercise various cases of the stream.
  94. type streamTester struct {
  95. log *eventlog.Log
  96. env *rpccore.Environment
  97. clock int64
  98. index int64
  99. stream *eventstream.Stream
  100. errc chan error
  101. recv chan *coretypes.EventItem
  102. stop func()
  103. }
  104. func newStreamTester(t *testing.T, query string, logOpts eventlog.LogSettings, streamOpts *eventstream.StreamOptions) *streamTester {
  105. t.Helper()
  106. s := new(streamTester)
  107. // Plumb a time source controlled by the tester into the event log.
  108. logOpts.Source = cursor.Source{
  109. TimeIndex: s.timeNow,
  110. }
  111. lg, err := eventlog.New(logOpts)
  112. if err != nil {
  113. t.Fatalf("Creating event log: %v", err)
  114. }
  115. s.log = lg
  116. s.env = &rpccore.Environment{EventLog: lg}
  117. s.stream = eventstream.New(s, query, streamOpts)
  118. return s
  119. }
  120. // start starts the stream receiver, which runs until it it terminated by
  121. // calling stop.
  122. func (s *streamTester) start() {
  123. ctx, cancel := context.WithCancel(context.Background())
  124. s.errc = make(chan error, 1)
  125. s.recv = make(chan *coretypes.EventItem)
  126. s.stop = cancel
  127. go func() {
  128. defer close(s.errc)
  129. s.errc <- s.stream.Run(ctx, func(itm *coretypes.EventItem) error {
  130. select {
  131. case <-ctx.Done():
  132. return ctx.Err()
  133. case s.recv <- itm:
  134. return nil
  135. }
  136. })
  137. }()
  138. }
  139. // publish adds a single event to the event log at the present moment.
  140. func (s *streamTester) publish(etype, payload string) string {
  141. _ = s.log.Add(etype, types.EventDataString(payload))
  142. s.index++
  143. return fmt.Sprintf("%016x-%04x", s.clock, s.index)
  144. }
  145. // wait blocks until either an item is received or the runner stops.
  146. func (s *streamTester) wait() (*coretypes.EventItem, error) {
  147. select {
  148. case itm := <-s.recv:
  149. return itm, nil
  150. case err := <-s.errc:
  151. return nil, err
  152. }
  153. }
  154. // mustItem waits for an item and fails if either an error occurs or the item
  155. // does not match want.
  156. func (s *streamTester) mustItem(t *testing.T, want testItem) {
  157. t.Helper()
  158. itm, err := s.wait()
  159. if err != nil {
  160. t.Fatalf("Receive: got error %v, want item %v", err, want)
  161. }
  162. got := testItem{Cursor: itm.Cursor, Data: string(itm.Data)}
  163. if diff := cmp.Diff(want, got); diff != "" {
  164. t.Errorf("Item: (-want, +got)\n%s", diff)
  165. }
  166. }
  167. // mustError waits for an error and fails if an item is returned.
  168. func (s *streamTester) mustError(t *testing.T) error {
  169. t.Helper()
  170. itm, err := s.wait()
  171. if err == nil {
  172. t.Fatalf("Receive: got item %v, want error", itm)
  173. }
  174. return err
  175. }
  176. // stopWait stops the runner and waits for it to terminate.
  177. func (s *streamTester) stopWait() { s.stop(); s.wait() } //nolint:errcheck
  178. // timeNow reports the current simulated time index.
  179. func (s *streamTester) timeNow() int64 { return s.clock }
  180. // advance moves the simulated time index.
  181. func (s *streamTester) advance(d time.Duration) { s.clock += int64(d) }
  182. // Events implements the eventstream.Client interface by delegating to a stub
  183. // environment as if it were a local RPC client. This works because the Events
  184. // method only requires the event log, the other fields are unused.
  185. func (s *streamTester) Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error) {
  186. var before, after cursor.Cursor
  187. if err := before.UnmarshalText([]byte(req.Before)); err != nil {
  188. return nil, err
  189. }
  190. if err := after.UnmarshalText([]byte(req.After)); err != nil {
  191. return nil, err
  192. }
  193. return s.env.Events(ctx, req.Filter, req.MaxItems, before, after, req.WaitTime)
  194. }