You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

225 lines
6.4 KiB

package eventstream_test
import (
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/fortytw2/leaktest"
"github.com/google/go-cmp/cmp"
"github.com/tendermint/tendermint/internal/eventlog"
"github.com/tendermint/tendermint/internal/eventlog/cursor"
rpccore "github.com/tendermint/tendermint/internal/rpc/core"
"github.com/tendermint/tendermint/rpc/client/eventstream"
"github.com/tendermint/tendermint/rpc/coretypes"
"github.com/tendermint/tendermint/types"
)
// TestStream_filterOrder checks that a stream delivers the events matching
// its query in forward time order and skips the events that do not match.
func TestStream_filterOrder(t *testing.T) {
	// leaktest.Check returns the function that performs the leak check, so it
	// is the returned function that must be deferred. Deferring Check itself
	// only takes the snapshot at exit and never verifies anything.
	defer leaktest.Check(t)()

	s := newStreamTester(t, `tm.event = 'good'`, eventlog.LogSettings{
		WindowSize: 30 * time.Second,
	}, nil)

	// Verify that events are delivered in forward time order (i.e., that the
	// stream unpacks the pages correctly) and that events not matching the
	// query (here, type="bad") are skipped.
	//
	// The minimum batch size is 16 and half the events we publish match, so we
	// publish > 32 items (> 16 good) to ensure we exercise paging.
	etype := [2]string{"good", "bad"}
	var items []testItem
	for i := 0; i < 40; i++ {
		s.advance(100 * time.Millisecond)
		text := fmt.Sprintf("item%d", i)
		cur := s.publish(etype[i%2], text)

		// Even-numbered items match the target type.
		if i%2 == 0 {
			items = append(items, makeTestItem(cur, text))
		}
	}

	s.start()
	for _, itm := range items {
		s.mustItem(t, itm)
	}
	s.stopWait()
}
// TestStream_lostItem checks that a stream reports a MissedItemsError when the
// item it last observed has expired out of the log window, and that resetting
// the stream lets it catch up from the head of the log.
func TestStream_lostItem(t *testing.T) {
	// leaktest.Check returns the function that performs the leak check, so it
	// is the returned function that must be deferred. Deferring Check itself
	// only takes the snapshot at exit and never verifies anything.
	defer leaktest.Check(t)()

	s := newStreamTester(t, ``, eventlog.LogSettings{
		WindowSize: 30 * time.Second,
	}, nil)

	// Publish an item and let the client observe it.
	cur := s.publish("ok", "whatever")
	s.start()
	s.mustItem(t, makeTestItem(cur, "whatever"))
	s.stopWait()

	// Time passes, and cur expires out of the window.
	s.advance(50 * time.Second)
	next1 := s.publish("ok", "more stuff")
	s.advance(15 * time.Second)
	next2 := s.publish("ok", "still more stuff")

	// At this point, the oldest item in the log is newer than the point at
	// which we continued, so we should get an error.
	s.start()
	var missed *eventstream.MissedItemsError
	if err := s.mustError(t); !errors.As(err, &missed) {
		t.Errorf("Wrong error: got %v, want %T", err, missed)
	} else {
		t.Logf("Correctly reported missed item: %v", missed)
	}

	// If we reset the stream and continue from head, we should catch up.
	s.stopWait()
	s.stream.Reset()
	s.start()
	s.mustItem(t, makeTestItem(next1, "more stuff"))
	s.mustItem(t, makeTestItem(next2, "still more stuff"))
	s.stopWait()
}
// testItem pairs an item's cursor with its data as plain strings, so that
// mismatches reported by the cmp package are easy to read.
type testItem struct {
	// N.B. Both fields are exported to simplify use in cmp.
	Cursor, Data string
}
// makeTestItem builds the testItem expected for an event published with the
// given cursor and string payload. The Data field is a JSON object carrying
// the type tag of types.EventDataString and the payload as its value.
func makeTestItem(cur, data string) testItem {
	tag := types.EventDataString("").TypeTag()
	encoded := fmt.Sprintf(`{"type":%q,"value":%q}`, tag, data)
	return testItem{Cursor: cur, Data: encoded}
}
// streamTester is a simulation harness for an eventstream.Stream. It simulates
// the production service by plumbing an event log into a stub RPC environment,
// into which the test can publish events and advance the perceived time to
// exercise various cases of the stream.
type streamTester struct {
// log is the event log that publish adds events to.
log *eventlog.Log
// env is the stub RPC environment wrapping log; Events delegates to it.
env *rpccore.Environment
// clock is the simulated time index reported by timeNow and moved by advance.
clock int64
// index counts the events published so far; publish increments it and uses
// it in the cursor string it returns.
index int64
// stream is the stream under test, which uses the tester as its client.
stream *eventstream.Stream
// errc receives the result of stream.Run and is closed when the runner
// goroutine started by start exits.
errc chan error
// recv carries the items delivered to the Run callback.
recv chan *coretypes.EventItem
// stop cancels the context of the runner started by the latest call to start.
stop func()
}
// newStreamTester constructs a harness whose event log is built from logOpts
// and whose stream filters on query with the given streamOpts. The Source of
// logOpts is replaced by one driven by the tester's simulated clock. The test
// fails immediately if the event log cannot be created.
func newStreamTester(t *testing.T, query string, logOpts eventlog.LogSettings, streamOpts *eventstream.StreamOptions) *streamTester {
	t.Helper()

	tester := &streamTester{}

	// The log takes its time index from the tester rather than a real clock.
	logOpts.Source = cursor.Source{TimeIndex: tester.timeNow}
	evlog, err := eventlog.New(logOpts)
	if err != nil {
		t.Fatalf("Creating event log: %v", err)
	}

	tester.log = evlog
	tester.env = &rpccore.Environment{EventLog: evlog}
	// The tester itself is passed as the stream's client.
	tester.stream = eventstream.New(tester, query, streamOpts)
	return tester
}
// start launches the stream receiver in a new goroutine. The receiver runs
// until it is terminated by calling stop. Each call installs fresh recv and
// errc channels: delivered items are sent on recv, and the result of Run is
// sent on errc, which is then closed.
func (s *streamTester) start() {
	ctx, cancel := context.WithCancel(context.Background())
	s.stop = cancel
	s.recv = make(chan *coretypes.EventItem)
	s.errc = make(chan error, 1)

	// deliver hands one item to the test, giving up if the run is cancelled
	// before anyone receives it.
	deliver := func(itm *coretypes.EventItem) error {
		select {
		case s.recv <- itm:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	go func() {
		defer close(s.errc)
		s.errc <- s.stream.Run(ctx, deliver)
	}()
}
// publish adds a single event of the given type, with payload as its string
// data, to the event log at the current simulated time. It returns a cursor
// string built from the simulated clock and the count of events published so
// far. The value returned by the log's Add method is discarded.
func (s *streamTester) publish(etype, payload string) string {
	data := types.EventDataString(payload)
	_ = s.log.Add(etype, data)

	s.index++
	cur := fmt.Sprintf("%016x-%04x", s.clock, s.index)
	return cur
}
// wait blocks until the runner either delivers an item or stops. It returns
// the item with a nil error in the first case, and a nil item with the value
// received from errc in the second.
func (s *streamTester) wait() (*coretypes.EventItem, error) {
	var (
		item *coretypes.EventItem
		err  error
	)
	select {
	case item = <-s.recv:
	case err = <-s.errc:
	}
	return item, err
}
// mustItem waits for an item and fails the test if an error occurs, if the
// runner stops without delivering an item, or if the item does not match want.
func (s *streamTester) mustItem(t *testing.T, want testItem) {
	t.Helper()

	itm, err := s.wait()
	if err != nil {
		t.Fatalf("Receive: got error %v, want item %v", err, want)
	}
	// wait returns a nil item with a nil error when errc yields nil, which
	// happens if Run returned nil or errc was already closed. Fail cleanly
	// rather than dereferencing the nil item below.
	if itm == nil {
		t.Fatalf("Receive: runner stopped without an item, want item %v", want)
	}

	got := testItem{Cursor: itm.Cursor, Data: string(itm.Data)}
	if diff := cmp.Diff(want, got); diff != "" {
		t.Errorf("Item: (-want, +got)\n%s", diff)
	}
}
// mustError waits for the runner to report an error and returns it. The test
// fails immediately if wait comes back without an error.
func (s *streamTester) mustError(t *testing.T) error {
	t.Helper()

	itm, err := s.wait()
	if err != nil {
		return err
	}
	t.Fatalf("Receive: got item %v, want error", itm)
	return nil // not reached: Fatalf stops the test goroutine
}
// stopWait stops the runner and waits for it to terminate.
//
// It blocks until errc is closed, which the runner goroutine does only after
// Run has returned. Waiting on errc alone, rather than on whichever of recv
// or errc becomes ready first, ensures that an item still in flight cannot
// make stopWait return while the runner is active.
func (s *streamTester) stopWait() {
	s.stop()
	for range s.errc {
		// Discard the runner's final result; only termination matters here.
	}
}
// timeNow returns the tester's current simulated time index. It is installed
// as the TimeIndex of the event log's cursor source.
func (s *streamTester) timeNow() int64 {
	return s.clock
}
// advance moves the simulated time index by d, adding the duration's
// nanosecond count to the clock.
func (s *streamTester) advance(d time.Duration) {
	s.clock += d.Nanoseconds()
}
// Events implements the eventstream.Client interface by delegating to a stub
// environment as if it were a local RPC client. This works because the Events
// method only requires the event log; the other fields are unused.
//
// The Before and After cursors of the request are parsed from their text
// form first. A parse failure is returned without calling the environment.
func (s *streamTester) Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error) {
	var before cursor.Cursor
	err := before.UnmarshalText([]byte(req.Before))
	if err != nil {
		return nil, err
	}

	var after cursor.Cursor
	err = after.UnmarshalText([]byte(req.After))
	if err != nil {
		return nil, err
	}

	return s.env.Events(ctx, req.Filter, req.MaxItems, before, after, req.WaitTime)
}