Browse Source

rpc: set a minimum long-polling interval for Events (#8050)

Since the goal of reading events at the head of the event log is to satisfy a
subscription style interface, there is no point in allowing head polling with
no wait interval. The pagination case already bypasses long polling, so the
extra option is unneessary.

Set a minimum default long-polling interval for the head case.
Add a test for minimum delay.
pull/8054/head
M. J. Fromberger 3 years ago
committed by GitHub
parent
commit
af96ef2fe4
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 58 additions and 5 deletions
  1. +3
    -2
      docs/architecture/adr-075-rpc-subscription.md
  2. +6
    -3
      internal/rpc/core/events.go
  3. +49
    -0
      rpc/client/eventstream/eventstream_test.go

+ 3
- 2
docs/architecture/adr-075-rpc-subscription.md View File

@ -2,6 +2,7 @@
## Changelog ## Changelog
- 01-Mar-2022: Update long-polling interface (@creachadair).
- 10-Feb-2022: Updates to reflect implementation. - 10-Feb-2022: Updates to reflect implementation.
- 26-Jan-2022: Marked accepted. - 26-Jan-2022: Marked accepted.
- 22-Jan-2022: Updated and expanded (@creachadair). - 22-Jan-2022: Updated and expanded (@creachadair).
@ -347,8 +348,8 @@ limit.
The `wait_time` parameter is used to effect polling. If `before` is empty and The `wait_time` parameter is used to effect polling. If `before` is empty and
no items are available, the server will wait for up to `wait_time` for matching no items are available, the server will wait for up to `wait_time` for matching
items to arrive at the head of the log. If `wait_time` is zero, the server will
return whatever eligible items are available immediately.
items to arrive at the head of the log. If `wait_time` is zero or negative, the
server will wait for a default (positive) interval.
If `before` non-empty, `wait_time` is ignored: new results are only added to If `before` non-empty, `wait_time` is ignored: new results are only added to
the head of the log, so there is no need to wait. This allows the client to the head of the log, so there is no need to wait. This allows the client to


+ 6
- 3
internal/rpc/core/events.go View File

@ -165,8 +165,11 @@ func (env *Environment) Events(ctx context.Context,
maxItems = 100 maxItems = 100
} }
const minWaitTime = 1 * time.Second
const maxWaitTime = 30 * time.Second const maxWaitTime = 30 * time.Second
if waitTime > maxWaitTime {
if waitTime < minWaitTime {
waitTime = minWaitTime
} else if waitTime > maxWaitTime {
waitTime = maxWaitTime waitTime = maxWaitTime
} }
@ -185,7 +188,7 @@ func (env *Environment) Events(ctx context.Context,
accept := func(itm *eventlog.Item) error { accept := func(itm *eventlog.Item) error {
// N.B. We accept up to one item more than requested, so we can tell how // N.B. We accept up to one item more than requested, so we can tell how
// to set the "more" flag in the response. // to set the "more" flag in the response.
if len(items) > maxItems {
if len(items) > maxItems || itm.Cursor.Before(after) {
return eventlog.ErrStopScan return eventlog.ErrStopScan
} }
if cursorInRange(itm.Cursor, before, after) && query.Matches(itm.Events) { if cursorInRange(itm.Cursor, before, after) && query.Matches(itm.Events) {
@ -194,7 +197,7 @@ func (env *Environment) Events(ctx context.Context,
return nil return nil
} }
if waitTime > 0 && before.IsZero() {
if before.IsZero() {
ctx, cancel := context.WithTimeout(ctx, waitTime) ctx, cancel := context.WithTimeout(ctx, waitTime)
defer cancel() defer cancel()


+ 49
- 0
rpc/client/eventstream/eventstream_test.go View File

@ -90,6 +90,55 @@ func TestStream_lostItem(t *testing.T) {
s.stopWait() s.stopWait()
} }
func TestMinPollTime(t *testing.T) {
defer leaktest.Check(t)
s := newStreamTester(t, ``, eventlog.LogSettings{
WindowSize: 30 * time.Second,
}, nil)
s.publish("bad", "whatever")
// Waiting for an item on a log with no matching events incurs a minimum
// wait time and reports no events.
ctx := context.Background()
filter := &coretypes.EventFilter{Query: `tm.event = 'good'`}
var zero cursor.Cursor
t.Run("NoneMatch", func(t *testing.T) {
start := time.Now()
// Request a very short delay, and affirm we got the server's minimum.
rsp, err := s.env.Events(ctx, filter, 1, zero, zero, 10*time.Millisecond)
if err != nil {
t.Fatalf("Events failed: %v", err)
} else if elapsed := time.Since(start); elapsed < time.Second {
t.Errorf("Events returned too quickly: got %v, wanted 1s", elapsed)
} else if len(rsp.Items) != 0 {
t.Errorf("Events returned %d items, expected none", len(rsp.Items))
}
})
s.publish("good", "whatever")
// Waiting for an available matching item incurs no delay.
t.Run("SomeMatch", func(t *testing.T) {
start := time.Now()
// Request a long-ish delay and affirm we don't block for it.
// Check for this by ensuring we return sooner than the minimum delay,
// since we don't know the exact timing.
rsp, err := s.env.Events(ctx, filter, 1, zero, zero, 10*time.Second)
if err != nil {
t.Fatalf("Events failed: %v", err)
} else if elapsed := time.Since(start); elapsed > 500*time.Millisecond {
t.Errorf("Events returned too slowly: got %v, wanted immediate", elapsed)
} else if len(rsp.Items) == 0 {
t.Error("Events returned no items, wanted at least 1")
}
})
}
// testItem is a wrapper for comparing item results in a friendly output format // testItem is a wrapper for comparing item results in a friendly output format
// for the cmp package. // for the cmp package.
type testItem struct { type testItem struct {


Loading…
Cancel
Save