This allows the caller to stream events. It handles the bookkeeping for cursors and pagination, and delivers items to a callback. Handle missed items by reporting a structured error. The caller can use the Reset method to "catch up" to head after this happens. Add a manual test CLI to probe a running node. Requires the node to be configured with the event log settings. Add a unit test that scripts input to the stream to exercise it.pull/8015/head
@ -0,0 +1,194 @@ | |||
// Package eventstream implements a convenience client for the Events method | |||
// of the Tendermint RPC service, allowing clients to observe a resumable | |||
// stream of events matching a query. | |||
package eventstream | |||
import ( | |||
"context" | |||
"errors" | |||
"fmt" | |||
"time" | |||
"github.com/tendermint/tendermint/rpc/coretypes" | |||
) | |||
// Client is the subset of the RPC client interface consumed by Stream. | |||
type Client interface { | |||
Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error) | |||
} | |||
// ErrStopRunning is returned by a Run callback to signal that no more events | |||
// are wanted and that Run should return. | |||
var ErrStopRunning = errors.New("stop accepting events") | |||
// A Stream cpatures the state of a streaming event subscription. | |||
type Stream struct { | |||
filter *coretypes.EventFilter // the query being streamed | |||
batchSize int // request batch size | |||
newestSeen string // from the latest item matching our query | |||
waitTime time.Duration // the long-polling interval | |||
client Client | |||
} | |||
// New constructs a new stream for the given query and options. | |||
// If opts == nil, the stream uses default values as described by | |||
// StreamOptions. This function will panic if cli == nil. | |||
func New(cli Client, query string, opts *StreamOptions) *Stream { | |||
if cli == nil { | |||
panic("eventstream: nil client") | |||
} | |||
return &Stream{ | |||
filter: &coretypes.EventFilter{Query: query}, | |||
batchSize: opts.batchSize(), | |||
newestSeen: opts.resumeFrom(), | |||
waitTime: opts.waitTime(), | |||
client: cli, | |||
} | |||
} | |||
// Run polls the service for events matching the query, and calls accept for | |||
// each such event. Run handles pagination transparently, and delivers events | |||
// to accept in order of publication. | |||
// | |||
// Run continues until ctx ends or accept reports an error. If accept returns | |||
// ErrStopRunning, Run returns nil; otherwise Run returns the error reported by | |||
// accept or ctx. Run also returns an error if the server reports an error | |||
// from the Events method. | |||
// | |||
// If the stream falls behind the event log on the server, Run will stop and | |||
// report an error of concrete type *MissedItemsError. Call Reset to reset the | |||
// stream to the head of the log, and call Run again to resume. | |||
func (s *Stream) Run(ctx context.Context, accept func(*coretypes.EventItem) error) error { | |||
for { | |||
items, err := s.fetchPages(ctx) | |||
if err != nil { | |||
return err | |||
} | |||
// Deliver events from the current batch to the receiver. We visit the | |||
// batch in reverse order so the receiver sees them in forward order. | |||
for i := len(items) - 1; i >= 0; i-- { | |||
if err := ctx.Err(); err != nil { | |||
return err | |||
} | |||
itm := items[i] | |||
err := accept(itm) | |||
if itm.Cursor > s.newestSeen { | |||
s.newestSeen = itm.Cursor // update the latest delivered | |||
} | |||
if errors.Is(err, ErrStopRunning) { | |||
return nil | |||
} else if err != nil { | |||
return err | |||
} | |||
} | |||
} | |||
} | |||
// Reset updates the stream's current cursor position to the head of the log. | |||
// This method may safely be called only when Run is not executing. | |||
func (s *Stream) Reset() { s.newestSeen = "" } | |||
// fetchPages fetches the next batch of matching results. If there are multiple | |||
// pages, all the matching pages are retrieved. An error is reported if the | |||
// current scan position falls out of the event log window. | |||
func (s *Stream) fetchPages(ctx context.Context) ([]*coretypes.EventItem, error) { | |||
var pageCursor string // if non-empty, page through items before this | |||
var items []*coretypes.EventItem | |||
// Fetch the next paginated batch of matching responses. | |||
for { | |||
rsp, err := s.client.Events(ctx, &coretypes.RequestEvents{ | |||
Filter: s.filter, | |||
MaxItems: s.batchSize, | |||
After: s.newestSeen, | |||
Before: pageCursor, | |||
WaitTime: s.waitTime, | |||
}) | |||
if err != nil { | |||
return nil, err | |||
} | |||
// If the oldest item in the log is newer than our most recent item, | |||
// it means we might have missed some events matching our query. | |||
if s.newestSeen != "" && s.newestSeen < rsp.Oldest { | |||
return nil, &MissedItemsError{ | |||
Query: s.filter.Query, | |||
NewestSeen: s.newestSeen, | |||
OldestPresent: rsp.Oldest, | |||
} | |||
} | |||
items = append(items, rsp.Items...) | |||
if rsp.More { | |||
// There are more results matching this request, leave the baseline | |||
// where it is and set the page cursor so that subsequent requests | |||
// will get the next chunk. | |||
pageCursor = items[len(items)-1].Cursor | |||
} else if len(items) != 0 { | |||
// We got everything matching so far. | |||
return items, nil | |||
} | |||
} | |||
} | |||
// StreamOptions are optional settings for a Stream value. A nil *StreamOptions | |||
// is ready for use and provides default values as described. | |||
type StreamOptions struct { | |||
// How many items to request per call to the service. The stream may pin | |||
// this value to a minimum default batch size. | |||
BatchSize int | |||
// If set, resume streaming from this cursor. Typically this is set to the | |||
// cursor of the most recently-received matching value. If empty, streaming | |||
// begins at the head of the log (the default). | |||
ResumeFrom string | |||
// Specifies the long poll interval. The stream may pin this value to a | |||
// minimum default poll interval. | |||
WaitTime time.Duration | |||
} | |||
func (o *StreamOptions) batchSize() int { | |||
const minBatchSize = 16 | |||
if o == nil || o.BatchSize < minBatchSize { | |||
return minBatchSize | |||
} | |||
return o.BatchSize | |||
} | |||
func (o *StreamOptions) resumeFrom() string { | |||
if o == nil { | |||
return "" | |||
} | |||
return o.ResumeFrom | |||
} | |||
func (o *StreamOptions) waitTime() time.Duration { | |||
const minWaitTime = 5 * time.Second | |||
if o == nil || o.WaitTime < minWaitTime { | |||
return minWaitTime | |||
} | |||
return o.WaitTime | |||
} | |||
// MissedItemsError is an error that indicates the stream missed (lost) some | |||
// number of events matching the specified query. | |||
type MissedItemsError struct { | |||
// The cursor of the newest matching item the stream has observed. | |||
NewestSeen string | |||
// The oldest cursor in the log at the point the miss was detected. | |||
// Any matching events between NewestSeen and OldestPresent are lost. | |||
OldestPresent string | |||
// The active query. | |||
Query string | |||
} | |||
// Error satisfies the error interface. | |||
func (e *MissedItemsError) Error() string { | |||
return fmt.Sprintf("missed events matching %q between %q and %q", | |||
e.Query, e.NewestSeen, e.OldestPresent) | |||
} |
@ -0,0 +1,225 @@ | |||
package eventstream_test | |||
import ( | |||
"context" | |||
"errors" | |||
"fmt" | |||
"testing" | |||
"time" | |||
"github.com/fortytw2/leaktest" | |||
"github.com/google/go-cmp/cmp" | |||
"github.com/tendermint/tendermint/internal/eventlog" | |||
"github.com/tendermint/tendermint/internal/eventlog/cursor" | |||
rpccore "github.com/tendermint/tendermint/internal/rpc/core" | |||
"github.com/tendermint/tendermint/rpc/client/eventstream" | |||
"github.com/tendermint/tendermint/rpc/coretypes" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
func TestStream_filterOrder(t *testing.T) { | |||
defer leaktest.Check(t) | |||
s := newStreamTester(t, `tm.event = 'good'`, eventlog.LogSettings{ | |||
WindowSize: 30 * time.Second, | |||
}, nil) | |||
// Verify that events are delivered in forward time order (i.e., that the | |||
// stream unpacks the pages correctly) and that events not matching the | |||
// query (here, type="bad") are skipped. | |||
// | |||
// The minimum batch size is 16 and half the events we publish match, so we | |||
// publish > 32 items (> 16 good) to ensure we exercise paging. | |||
etype := [2]string{"good", "bad"} | |||
var items []testItem | |||
for i := 0; i < 40; i++ { | |||
s.advance(100 * time.Millisecond) | |||
text := fmt.Sprintf("item%d", i) | |||
cur := s.publish(etype[i%2], text) | |||
// Even-numbered items match the target type. | |||
if i%2 == 0 { | |||
items = append(items, makeTestItem(cur, text)) | |||
} | |||
} | |||
s.start() | |||
for _, itm := range items { | |||
s.mustItem(t, itm) | |||
} | |||
s.stopWait() | |||
} | |||
func TestStream_lostItem(t *testing.T) { | |||
defer leaktest.Check(t) | |||
s := newStreamTester(t, ``, eventlog.LogSettings{ | |||
WindowSize: 30 * time.Second, | |||
}, nil) | |||
// Publish an item and let the client observe it. | |||
cur := s.publish("ok", "whatever") | |||
s.start() | |||
s.mustItem(t, makeTestItem(cur, "whatever")) | |||
s.stopWait() | |||
// Time passes, and cur expires out of the window. | |||
s.advance(50 * time.Second) | |||
next1 := s.publish("ok", "more stuff") | |||
s.advance(15 * time.Second) | |||
next2 := s.publish("ok", "still more stuff") | |||
// At this point, the oldest item in the log is newer than the point at | |||
// which we continued, we should get an error. | |||
s.start() | |||
var missed *eventstream.MissedItemsError | |||
if err := s.mustError(t); !errors.As(err, &missed) { | |||
t.Errorf("Wrong error: got %v, want %T", err, missed) | |||
} else { | |||
t.Logf("Correctly reported missed item: %v", missed) | |||
} | |||
// If we reset the stream and continue from head, we should catch up. | |||
s.stopWait() | |||
s.stream.Reset() | |||
s.start() | |||
s.mustItem(t, makeTestItem(next1, "more stuff")) | |||
s.mustItem(t, makeTestItem(next2, "still more stuff")) | |||
s.stopWait() | |||
} | |||
// testItem is a wrapper for comparing item results in a friendly output format | |||
// for the cmp package. | |||
type testItem struct { | |||
Cursor string | |||
Data string | |||
// N.B. Fields exported to simplify use in cmp. | |||
} | |||
func makeTestItem(cur, data string) testItem { | |||
return testItem{ | |||
Cursor: cur, | |||
Data: fmt.Sprintf(`{"type":%q,"value":%q}`, types.EventDataString("").TypeTag(), data), | |||
} | |||
} | |||
// streamTester is a simulation harness for an eventstream.Stream. It simulates | |||
// the production service by plumbing an event log into a stub RPC environment, | |||
// into which the test can publish events and advance the perceived time to | |||
// exercise various cases of the stream. | |||
type streamTester struct { | |||
log *eventlog.Log | |||
env *rpccore.Environment | |||
clock int64 | |||
index int64 | |||
stream *eventstream.Stream | |||
errc chan error | |||
recv chan *coretypes.EventItem | |||
stop func() | |||
} | |||
func newStreamTester(t *testing.T, query string, logOpts eventlog.LogSettings, streamOpts *eventstream.StreamOptions) *streamTester { | |||
t.Helper() | |||
s := new(streamTester) | |||
// Plumb a time source controlled by the tester into the event log. | |||
logOpts.Source = cursor.Source{ | |||
TimeIndex: s.timeNow, | |||
} | |||
lg, err := eventlog.New(logOpts) | |||
if err != nil { | |||
t.Fatalf("Creating event log: %v", err) | |||
} | |||
s.log = lg | |||
s.env = &rpccore.Environment{EventLog: lg} | |||
s.stream = eventstream.New(s, query, streamOpts) | |||
return s | |||
} | |||
// start starts the stream receiver, which runs until it it terminated by | |||
// calling stop. | |||
func (s *streamTester) start() { | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
s.errc = make(chan error, 1) | |||
s.recv = make(chan *coretypes.EventItem) | |||
s.stop = cancel | |||
go func() { | |||
defer close(s.errc) | |||
s.errc <- s.stream.Run(ctx, func(itm *coretypes.EventItem) error { | |||
select { | |||
case <-ctx.Done(): | |||
return ctx.Err() | |||
case s.recv <- itm: | |||
return nil | |||
} | |||
}) | |||
}() | |||
} | |||
// publish adds a single event to the event log at the present moment. | |||
func (s *streamTester) publish(etype, payload string) string { | |||
_ = s.log.Add(etype, types.EventDataString(payload)) | |||
s.index++ | |||
return fmt.Sprintf("%016x-%04x", s.clock, s.index) | |||
} | |||
// wait blocks until either an item is received or the runner stops. | |||
func (s *streamTester) wait() (*coretypes.EventItem, error) { | |||
select { | |||
case itm := <-s.recv: | |||
return itm, nil | |||
case err := <-s.errc: | |||
return nil, err | |||
} | |||
} | |||
// mustItem waits for an item and fails if either an error occurs or the item | |||
// does not match want. | |||
func (s *streamTester) mustItem(t *testing.T, want testItem) { | |||
t.Helper() | |||
itm, err := s.wait() | |||
if err != nil { | |||
t.Fatalf("Receive: got error %v, want item %v", err, want) | |||
} | |||
got := testItem{Cursor: itm.Cursor, Data: string(itm.Data)} | |||
if diff := cmp.Diff(want, got); diff != "" { | |||
t.Errorf("Item: (-want, +got)\n%s", diff) | |||
} | |||
} | |||
// mustError waits for an error and fails if an item is returned. | |||
func (s *streamTester) mustError(t *testing.T) error { | |||
t.Helper() | |||
itm, err := s.wait() | |||
if err == nil { | |||
t.Fatalf("Receive: got item %v, want error", itm) | |||
} | |||
return err | |||
} | |||
// stopWait stops the runner and waits for it to terminate. | |||
func (s *streamTester) stopWait() { s.stop(); s.wait() } //nolint:errcheck | |||
// timeNow reports the current simulated time index. | |||
func (s *streamTester) timeNow() int64 { return s.clock } | |||
// advance moves the simulated time index. | |||
func (s *streamTester) advance(d time.Duration) { s.clock += int64(d) } | |||
// Events implements the eventstream.Client interface by delegating to a stub | |||
// environment as if it were a local RPC client. This works because the Events | |||
// method only requires the event log, the other fields are unused. | |||
func (s *streamTester) Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error) { | |||
var before, after cursor.Cursor | |||
if err := before.UnmarshalText([]byte(req.Before)); err != nil { | |||
return nil, err | |||
} | |||
if err := after.UnmarshalText([]byte(req.After)); err != nil { | |||
return nil, err | |||
} | |||
return s.env.Events(ctx, req.Filter, req.MaxItems, before, after, req.WaitTime) | |||
} |