diff --git a/docs/architecture/adr-075-rpc-subscription.md b/docs/architecture/adr-075-rpc-subscription.md index f2c4dcf12..1ca48e712 100644 --- a/docs/architecture/adr-075-rpc-subscription.md +++ b/docs/architecture/adr-075-rpc-subscription.md @@ -2,6 +2,7 @@ ## Changelog +- 01-Mar-2022: Update long-polling interface (@creachadair). - 10-Feb-2022: Updates to reflect implementation. - 26-Jan-2022: Marked accepted. - 22-Jan-2022: Updated and expanded (@creachadair). @@ -347,8 +348,8 @@ limit. The `wait_time` parameter is used to effect polling. If `before` is empty and no items are available, the server will wait for up to `wait_time` for matching -items to arrive at the head of the log. If `wait_time` is zero, the server will -return whatever eligible items are available immediately. +items to arrive at the head of the log. If `wait_time` is zero or negative, the +server will wait for a default (positive) interval. If `before` non-empty, `wait_time` is ignored: new results are only added to the head of the log, so there is no need to wait. This allows the client to diff --git a/internal/rpc/core/events.go b/internal/rpc/core/events.go index bc21fadc6..4e0d2ac8a 100644 --- a/internal/rpc/core/events.go +++ b/internal/rpc/core/events.go @@ -165,8 +165,11 @@ func (env *Environment) Events(ctx context.Context, maxItems = 100 } + const minWaitTime = 1 * time.Second const maxWaitTime = 30 * time.Second - if waitTime > maxWaitTime { + if waitTime < minWaitTime { + waitTime = minWaitTime + } else if waitTime > maxWaitTime { waitTime = maxWaitTime } @@ -185,7 +188,7 @@ func (env *Environment) Events(ctx context.Context, accept := func(itm *eventlog.Item) error { // N.B. We accept up to one item more than requested, so we can tell how // to set the "more" flag in the response. - if len(items) > maxItems { + if len(items) > maxItems || itm.Cursor.Before(after) { return eventlog.ErrStopScan } if cursorInRange(itm.Cursor, before, after) && query.Matches(itm.Events) { @@ -194,7 +197,7 @@ func (env *Environment) Events(ctx context.Context, return nil } - if waitTime > 0 && before.IsZero() { + if before.IsZero() { ctx, cancel := context.WithTimeout(ctx, waitTime) defer cancel() diff --git a/rpc/client/eventstream/eventstream_test.go b/rpc/client/eventstream/eventstream_test.go index 110dc8a90..ca27734e2 100644 --- a/rpc/client/eventstream/eventstream_test.go +++ b/rpc/client/eventstream/eventstream_test.go @@ -90,6 +90,55 @@ func TestStream_lostItem(t *testing.T) { s.stopWait() } +func TestMinPollTime(t *testing.T) { + defer leaktest.Check(t) + + s := newStreamTester(t, ``, eventlog.LogSettings{ + WindowSize: 30 * time.Second, + }, nil) + + s.publish("bad", "whatever") + + // Waiting for an item on a log with no matching events incurs a minimum + // wait time and reports no events. + ctx := context.Background() + filter := &coretypes.EventFilter{Query: `tm.event = 'good'`} + var zero cursor.Cursor + + t.Run("NoneMatch", func(t *testing.T) { + start := time.Now() + + // Request a very short delay, and affirm we got the server's minimum. + rsp, err := s.env.Events(ctx, filter, 1, zero, zero, 10*time.Millisecond) + if err != nil { + t.Fatalf("Events failed: %v", err) + } else if elapsed := time.Since(start); elapsed < time.Second { + t.Errorf("Events returned too quickly: got %v, wanted 1s", elapsed) + } else if len(rsp.Items) != 0 { + t.Errorf("Events returned %d items, expected none", len(rsp.Items)) + } + }) + + s.publish("good", "whatever") + + // Waiting for an available matching item incurs no delay. + t.Run("SomeMatch", func(t *testing.T) { + start := time.Now() + + // Request a long-ish delay and affirm we don't block for it. + // Check for this by ensuring we return sooner than the minimum delay, + // since we don't know the exact timing. + rsp, err := s.env.Events(ctx, filter, 1, zero, zero, 10*time.Second) + if err != nil { + t.Fatalf("Events failed: %v", err) + } else if elapsed := time.Since(start); elapsed > 500*time.Millisecond { + t.Errorf("Events returned too slowly: got %v, wanted immediate", elapsed) + } else if len(rsp.Items) == 0 { + t.Error("Events returned no items, wanted at least 1") + } + }) +} + // testItem is a wrapper for comparing item results in a friendly output format // for the cmp package. type testItem struct {