diff --git a/rpc/client/eventstream/eventstream.go b/rpc/client/eventstream/eventstream.go
new file mode 100644
index 000000000..887e723ce
--- /dev/null
+++ b/rpc/client/eventstream/eventstream.go
@@ -0,0 +1,194 @@
+// Package eventstream implements a convenience client for the Events method
+// of the Tendermint RPC service, allowing clients to observe a resumable
+// stream of events matching a query.
+package eventstream
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/tendermint/tendermint/rpc/coretypes"
+)
+
+// Client is the subset of the RPC client interface consumed by Stream.
+type Client interface {
+	Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error)
+}
+
+// ErrStopRunning is returned by a Run callback to signal that no more events
+// are wanted and that Run should return.
+var ErrStopRunning = errors.New("stop accepting events")
+
+// A Stream captures the state of a streaming event subscription.
+type Stream struct {
+	filter     *coretypes.EventFilter // the query being streamed
+	batchSize  int                    // request batch size
+	newestSeen string                 // from the latest item matching our query
+	waitTime   time.Duration          // the long-polling interval
+	client     Client
+}
+
+// New constructs a new stream for the given query and options.
+// If opts == nil, the stream uses default values as described by
+// StreamOptions. This function will panic if cli == nil.
+func New(cli Client, query string, opts *StreamOptions) *Stream {
+	if cli == nil {
+		panic("eventstream: nil client")
+	}
+	return &Stream{
+		filter:     &coretypes.EventFilter{Query: query},
+		batchSize:  opts.batchSize(),
+		newestSeen: opts.resumeFrom(),
+		waitTime:   opts.waitTime(),
+		client:     cli,
+	}
+}
+
+// Run polls the service for events matching the query, and calls accept for
+// each such event. Run handles pagination transparently, and delivers events
+// to accept in order of publication.
+//
+// Run continues until ctx ends or accept reports an error. If accept returns
+// ErrStopRunning, Run returns nil; otherwise Run returns the error reported by
+// accept or ctx. Run also returns an error if the server reports an error
+// from the Events method.
+//
+// If the stream falls behind the event log on the server, Run will stop and
+// report an error of concrete type *MissedItemsError. Call Reset to reset the
+// stream to the head of the log, and call Run again to resume.
+func (s *Stream) Run(ctx context.Context, accept func(*coretypes.EventItem) error) error {
+	for {
+		items, err := s.fetchPages(ctx)
+		if err != nil {
+			return err
+		}
+
+		// Deliver events from the current batch to the receiver. We visit the
+		// batch in reverse order so the receiver sees them in forward order.
+		for i := len(items) - 1; i >= 0; i-- {
+			if err := ctx.Err(); err != nil {
+				return err
+			}
+
+			itm := items[i]
+			err := accept(itm)
+			if itm.Cursor > s.newestSeen {
+				s.newestSeen = itm.Cursor // update the latest delivered
+			}
+			if errors.Is(err, ErrStopRunning) {
+				return nil
+			} else if err != nil {
+				return err
+			}
+		}
+	}
+}
+
+// Reset updates the stream's current cursor position to the head of the log.
+// This method may safely be called only when Run is not executing.
+func (s *Stream) Reset() { s.newestSeen = "" }
+
+// fetchPages fetches the next batch of matching results. If there are multiple
+// pages, all the matching pages are retrieved. An error is reported if the
+// current scan position falls out of the event log window.
+func (s *Stream) fetchPages(ctx context.Context) ([]*coretypes.EventItem, error) { + var pageCursor string // if non-empty, page through items before this + var items []*coretypes.EventItem + + // Fetch the next paginated batch of matching responses. + for { + rsp, err := s.client.Events(ctx, &coretypes.RequestEvents{ + Filter: s.filter, + MaxItems: s.batchSize, + After: s.newestSeen, + Before: pageCursor, + WaitTime: s.waitTime, + }) + if err != nil { + return nil, err + } + + // If the oldest item in the log is newer than our most recent item, + // it means we might have missed some events matching our query. + if s.newestSeen != "" && s.newestSeen < rsp.Oldest { + return nil, &MissedItemsError{ + Query: s.filter.Query, + NewestSeen: s.newestSeen, + OldestPresent: rsp.Oldest, + } + } + items = append(items, rsp.Items...) + + if rsp.More { + // There are more results matching this request, leave the baseline + // where it is and set the page cursor so that subsequent requests + // will get the next chunk. + pageCursor = items[len(items)-1].Cursor + } else if len(items) != 0 { + // We got everything matching so far. + return items, nil + } + } +} + +// StreamOptions are optional settings for a Stream value. A nil *StreamOptions +// is ready for use and provides default values as described. +type StreamOptions struct { + // How many items to request per call to the service. The stream may pin + // this value to a minimum default batch size. + BatchSize int + + // If set, resume streaming from this cursor. Typically this is set to the + // cursor of the most recently-received matching value. If empty, streaming + // begins at the head of the log (the default). + ResumeFrom string + + // Specifies the long poll interval. The stream may pin this value to a + // minimum default poll interval. + WaitTime time.Duration +} + +func (o *StreamOptions) batchSize() int { + const minBatchSize = 16 + if o == nil || o.BatchSize < minBatchSize { + return minBatchSize + } + return o.BatchSize +} + +func (o *StreamOptions) resumeFrom() string { + if o == nil { + return "" + } + return o.ResumeFrom +} + +func (o *StreamOptions) waitTime() time.Duration { + const minWaitTime = 5 * time.Second + if o == nil || o.WaitTime < minWaitTime { + return minWaitTime + } + return o.WaitTime +} + +// MissedItemsError is an error that indicates the stream missed (lost) some +// number of events matching the specified query. +type MissedItemsError struct { + // The cursor of the newest matching item the stream has observed. + NewestSeen string + + // The oldest cursor in the log at the point the miss was detected. + // Any matching events between NewestSeen and OldestPresent are lost. + OldestPresent string + + // The active query. + Query string +} + +// Error satisfies the error interface. 
+func (e *MissedItemsError) Error() string {
+	return fmt.Sprintf("missed events matching %q between %q and %q",
+		e.Query, e.NewestSeen, e.OldestPresent)
+}
diff --git a/rpc/client/eventstream/eventstream_test.go b/rpc/client/eventstream/eventstream_test.go
new file mode 100644
index 000000000..110dc8a90
--- /dev/null
+++ b/rpc/client/eventstream/eventstream_test.go
@@ -0,0 +1,225 @@
+package eventstream_test
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/fortytw2/leaktest"
+	"github.com/google/go-cmp/cmp"
+
+	"github.com/tendermint/tendermint/internal/eventlog"
+	"github.com/tendermint/tendermint/internal/eventlog/cursor"
+	rpccore "github.com/tendermint/tendermint/internal/rpc/core"
+	"github.com/tendermint/tendermint/rpc/client/eventstream"
+	"github.com/tendermint/tendermint/rpc/coretypes"
+	"github.com/tendermint/tendermint/types"
+)
+
+func TestStream_filterOrder(t *testing.T) {
+	defer leaktest.Check(t)()
+
+	s := newStreamTester(t, `tm.event = 'good'`, eventlog.LogSettings{
+		WindowSize: 30 * time.Second,
+	}, nil)
+
+	// Verify that events are delivered in forward time order (i.e., that the
+	// stream unpacks the pages correctly) and that events not matching the
+	// query (here, type="bad") are skipped.
+	//
+	// The minimum batch size is 16 and half the events we publish match, so we
+	// publish > 32 items (> 16 good) to ensure we exercise paging.
+	etype := [2]string{"good", "bad"}
+	var items []testItem
+	for i := 0; i < 40; i++ {
+		s.advance(100 * time.Millisecond)
+		text := fmt.Sprintf("item%d", i)
+		cur := s.publish(etype[i%2], text)
+
+		// Even-numbered items match the target type.
+		if i%2 == 0 {
+			items = append(items, makeTestItem(cur, text))
+		}
+	}
+
+	s.start()
+	for _, itm := range items {
+		s.mustItem(t, itm)
+	}
+	s.stopWait()
+}
+
+func TestStream_lostItem(t *testing.T) {
+	defer leaktest.Check(t)()
+
+	s := newStreamTester(t, ``, eventlog.LogSettings{
+		WindowSize: 30 * time.Second,
+	}, nil)
+
+	// Publish an item and let the client observe it.
+	cur := s.publish("ok", "whatever")
+	s.start()
+	s.mustItem(t, makeTestItem(cur, "whatever"))
+	s.stopWait()
+
+	// Time passes, and cur expires out of the window.
+	s.advance(50 * time.Second)
+	next1 := s.publish("ok", "more stuff")
+	s.advance(15 * time.Second)
+	next2 := s.publish("ok", "still more stuff")
+
+	// At this point, the oldest item in the log is newer than the point at
+	// which we continued, so we should get an error.
+	s.start()
+	var missed *eventstream.MissedItemsError
+	if err := s.mustError(t); !errors.As(err, &missed) {
+		t.Errorf("Wrong error: got %v, want %T", err, missed)
+	} else {
+		t.Logf("Correctly reported missed item: %v", missed)
+	}
+
+	// If we reset the stream and continue from head, we should catch up.
+	s.stopWait()
+	s.stream.Reset()
+	s.start()
+
+	s.mustItem(t, makeTestItem(next1, "more stuff"))
+	s.mustItem(t, makeTestItem(next2, "still more stuff"))
+	s.stopWait()
+}
+
+// testItem is a wrapper for comparing item results in a friendly output format
+// for the cmp package.
+type testItem struct {
+	Cursor string
+	Data   string
+
+	// N.B. Fields exported to simplify use in cmp.
+}
+
+func makeTestItem(cur, data string) testItem {
+	return testItem{
+		Cursor: cur,
+		Data:   fmt.Sprintf(`{"type":%q,"value":%q}`, types.EventDataString("").TypeTag(), data),
+	}
+}
+
+// streamTester is a simulation harness for an eventstream.Stream. It simulates
+// the production service by plumbing an event log into a stub RPC environment,
+// into which the test can publish events and advance the perceived time to
+// exercise various cases of the stream.
+type streamTester struct {
+	log    *eventlog.Log
+	env    *rpccore.Environment
+	clock  int64
+	index  int64
+	stream *eventstream.Stream
+	errc   chan error
+	recv   chan *coretypes.EventItem
+	stop   func()
+}
+
+func newStreamTester(t *testing.T, query string, logOpts eventlog.LogSettings, streamOpts *eventstream.StreamOptions) *streamTester {
+	t.Helper()
+	s := new(streamTester)
+
+	// Plumb a time source controlled by the tester into the event log.
+	logOpts.Source = cursor.Source{
+		TimeIndex: s.timeNow,
+	}
+	lg, err := eventlog.New(logOpts)
+	if err != nil {
+		t.Fatalf("Creating event log: %v", err)
+	}
+	s.log = lg
+	s.env = &rpccore.Environment{EventLog: lg}
+	s.stream = eventstream.New(s, query, streamOpts)
+	return s
+}
+
+// start starts the stream receiver, which runs until it is terminated by
+// calling stop.
+func (s *streamTester) start() {
+	ctx, cancel := context.WithCancel(context.Background())
+	s.errc = make(chan error, 1)
+	s.recv = make(chan *coretypes.EventItem)
+	s.stop = cancel
+	go func() {
+		defer close(s.errc)
+		s.errc <- s.stream.Run(ctx, func(itm *coretypes.EventItem) error {
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case s.recv <- itm:
+				return nil
+			}
+		})
+	}()
+}
+
+// publish adds a single event to the event log at the present moment.
+func (s *streamTester) publish(etype, payload string) string {
+	_ = s.log.Add(etype, types.EventDataString(payload))
+	s.index++
+	return fmt.Sprintf("%016x-%04x", s.clock, s.index)
+}
+
+// wait blocks until either an item is received or the runner stops.
+func (s *streamTester) wait() (*coretypes.EventItem, error) {
+	select {
+	case itm := <-s.recv:
+		return itm, nil
+	case err := <-s.errc:
+		return nil, err
+	}
+}
+
+// mustItem waits for an item and fails if either an error occurs or the item
+// does not match want.
+func (s *streamTester) mustItem(t *testing.T, want testItem) {
+	t.Helper()
+
+	itm, err := s.wait()
+	if err != nil {
+		t.Fatalf("Receive: got error %v, want item %v", err, want)
+	}
+	got := testItem{Cursor: itm.Cursor, Data: string(itm.Data)}
+	if diff := cmp.Diff(want, got); diff != "" {
+		t.Errorf("Item: (-want, +got)\n%s", diff)
+	}
+}
+
+// mustError waits for an error and fails if an item is returned.
+func (s *streamTester) mustError(t *testing.T) error {
+	t.Helper()
+	itm, err := s.wait()
+	if err == nil {
+		t.Fatalf("Receive: got item %v, want error", itm)
+	}
+	return err
+}
+
+// stopWait stops the runner and waits for it to terminate.
+func (s *streamTester) stopWait() { s.stop(); s.wait() } //nolint:errcheck
+
+// timeNow reports the current simulated time index.
+func (s *streamTester) timeNow() int64 { return s.clock }
+
+// advance moves the simulated time index.
+func (s *streamTester) advance(d time.Duration) { s.clock += int64(d) }
+
+// Events implements the eventstream.Client interface by delegating to a stub
+// environment as if it were a local RPC client. This works because the Events
+// method only requires the event log; the other fields are unused.
+func (s *streamTester) Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error) { + var before, after cursor.Cursor + if err := before.UnmarshalText([]byte(req.Before)); err != nil { + return nil, err + } + if err := after.UnmarshalText([]byte(req.After)); err != nil { + return nil, err + } + return s.env.Events(ctx, req.Filter, req.MaxItems, before, after, req.WaitTime) +} diff --git a/rpc/coretypes/responses.go b/rpc/coretypes/responses.go index 9f86742de..7aaf7552c 100644 --- a/rpc/coretypes/responses.go +++ b/rpc/coretypes/responses.go @@ -364,7 +364,7 @@ type RequestEvents struct { // The maximum number of eligible items to return. // If zero or negative, the server will report a default number. - MaxItems int `json:"max_items"` + MaxItems int `json:"maxItems"` // Return only items after this cursor. If empty, the limit is just // before the the beginning of the event log. @@ -375,7 +375,7 @@ type RequestEvents struct { Before string `json:"before"` // Wait for up to this long for events to be available. - WaitTime time.Duration `json:"wait_time"` + WaitTime time.Duration `json:"waitTime"` } // An EventFilter specifies which events are selected by an /events request.
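
Usage note for reviewers: the sketch below shows one way a caller might drive the new Stream, following the Run/Reset contract documented above. It is a minimal sketch, not part of this change; the package name eventwatch, the function followTxEvents, the tm.event = 'Tx' query, and the option values are illustrative assumptions. Any value satisfying the package's Client interface (anything exposing the Events RPC method) can be passed in.

// Package eventwatch is a hypothetical consumer of the eventstream package.
package eventwatch

import (
	"context"
	"errors"
	"log"
	"time"

	"github.com/tendermint/tendermint/rpc/client/eventstream"
	"github.com/tendermint/tendermint/rpc/coretypes"
)

// followTxEvents streams events matching the query from cli until ctx ends or
// the accept callback fails. If the stream falls behind the server's event
// log, it resets to the head of the log and keeps going.
func followTxEvents(ctx context.Context, cli eventstream.Client) error {
	stream := eventstream.New(cli, `tm.event = 'Tx'`, &eventstream.StreamOptions{
		BatchSize: 100,              // items per request (pinned to a minimum of 16)
		WaitTime:  10 * time.Second, // long-poll interval (pinned to a minimum of 5s)
	})
	for {
		err := stream.Run(ctx, func(itm *coretypes.EventItem) error {
			// Handle one event. Returning eventstream.ErrStopRunning here
			// would make Run return nil.
			log.Printf("event at cursor %s: %s", itm.Cursor, itm.Data)
			return nil
		})

		var missed *eventstream.MissedItemsError
		if errors.As(err, &missed) {
			// Items between missed.NewestSeen and missed.OldestPresent were
			// lost; drop our position and resume from the head of the log.
			log.Printf("missed events: %v; resuming from head of log", missed)
			stream.Reset()
			continue
		}
		return err // ctx ended, the callback failed, or Run returned nil
	}
}

Because cursors are plain strings, a caller can also persist itm.Cursor and pass it back later through StreamOptions.ResumeFrom to continue a stream across restarts, which is the resumption path the StreamOptions documentation describes.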