- package pubsub_test
-
- import (
- "context"
- "errors"
- "fmt"
- "testing"
- "time"
-
- "github.com/stretchr/testify/require"
- abci "github.com/tendermint/tendermint/abci/types"
- "github.com/tendermint/tendermint/internal/pubsub"
- "github.com/tendermint/tendermint/internal/pubsub/query"
- "github.com/tendermint/tendermint/libs/log"
- "github.com/tendermint/tendermint/types"
- )
-
- const (
- clientID = "test-client"
- )
-
- // pubstring is a trivial implementation of the EventData interface for
- // string-valued test data.
- type pubstring string
-
- func (pubstring) TypeTag() string { return "pubstring" }
-
- func TestSubscribeWithArgs(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- logger := log.TestingLogger()
- s := newTestServer(ctx, t, logger)
-
- t.Run("DefaultLimit", func(t *testing.T) {
- sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
- ClientID: clientID,
- Query: query.All,
- }))
-
- require.Equal(t, 1, s.NumClients())
- require.Equal(t, 1, s.NumClientSubscriptions(clientID))
-
- require.NoError(t, s.Publish(ctx, pubstring("Ka-Zar")))
- sub.mustReceive(ctx, pubstring("Ka-Zar"))
- })
- t.Run("PositiveLimit", func(t *testing.T) {
- sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
- ClientID: clientID + "-2",
- Query: query.All,
- Limit: 10,
- }))
- require.NoError(t, s.Publish(ctx, pubstring("Aggamon")))
- sub.mustReceive(ctx, pubstring("Aggamon"))
- })
- }
-
- func TestObserver(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- logger := log.TestingLogger()
-
- s := newTestServer(ctx, t, logger)
-
- done := make(chan struct{})
- var got interface{}
- require.NoError(t, s.Observe(ctx, func(msg pubsub.Message) error {
- defer close(done)
- got = msg.Data()
- return nil
- }))
-
- const input = pubstring("Lions and tigers and bears, oh my!")
- require.NoError(t, s.Publish(ctx, input))
- <-done
- require.Equal(t, got, input)
- }
-
- func TestObserverErrors(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- logger := log.TestingLogger()
-
- s := newTestServer(ctx, t, logger)
-
- require.Error(t, s.Observe(ctx, nil, query.All))
- require.NoError(t, s.Observe(ctx, func(pubsub.Message) error { return nil }))
- require.Error(t, s.Observe(ctx, func(pubsub.Message) error { return nil }, query.All))
- }
-
- func TestPublishDoesNotBlock(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- logger := log.TestingLogger()
-
- s := newTestServer(ctx, t, logger)
-
- sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
- ClientID: clientID,
- Query: query.All,
- }))
- published := make(chan struct{})
- go func() {
- defer close(published)
-
- require.NoError(t, s.Publish(ctx, pubstring("Quicksilver")))
- require.NoError(t, s.Publish(ctx, pubstring("Asylum")))
- require.NoError(t, s.Publish(ctx, pubstring("Ivan")))
- }()
-
- select {
- case <-published:
- sub.mustReceive(ctx, pubstring("Quicksilver"))
- sub.mustFail(ctx, pubsub.ErrTerminated)
- case <-time.After(3 * time.Second):
- t.Fatal("Publishing should not have blocked")
- }
- }
-
- func TestSubscribeErrors(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- logger := log.TestingLogger()
- s := newTestServer(ctx, t, logger)
-
- t.Run("NegativeLimitErr", func(t *testing.T) {
- _, err := s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
- ClientID: clientID,
- Query: query.All,
- Limit: -5,
- })
- require.Error(t, err)
- })
- }
-
- func TestSlowSubscriber(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- logger := log.TestingLogger()
- s := newTestServer(ctx, t, logger)
-
- sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
- ClientID: clientID,
- Query: query.All,
- }))
-
- require.NoError(t, s.Publish(ctx, pubstring("Fat Cobra")))
- require.NoError(t, s.Publish(ctx, pubstring("Viper")))
- require.NoError(t, s.Publish(ctx, pubstring("Black Panther")))
-
- // We had capacity for one item, so we should get that item, but after that
- // the subscription should have been terminated by the publisher.
- sub.mustReceive(ctx, pubstring("Fat Cobra"))
- sub.mustFail(ctx, pubsub.ErrTerminated)
- }
-
- func TestDifferentClients(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- logger := log.TestingLogger()
- s := newTestServer(ctx, t, logger)
-
- sub1 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
- ClientID: "client-1",
- Query: query.MustCompile(`tm.events.type='NewBlock'`),
- }))
-
- events := []abci.Event{{
- Type: "tm.events",
- Attributes: []abci.EventAttribute{{Key: "type", Value: "NewBlock"}},
- }}
-
- require.NoError(t, s.PublishWithEvents(ctx, pubstring("Iceman"), events))
- sub1.mustReceive(ctx, pubstring("Iceman"))
-
- sub2 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
- ClientID: "client-2",
- Query: query.MustCompile(`tm.events.type='NewBlock' AND abci.account.name='Igor'`),
- }))
-
- events = []abci.Event{
- {
- Type: "tm.events",
- Attributes: []abci.EventAttribute{{Key: "type", Value: "NewBlock"}},
- },
- {
- Type: "abci.account",
- Attributes: []abci.EventAttribute{{Key: "name", Value: "Igor"}},
- },
- }
-
- require.NoError(t, s.PublishWithEvents(ctx, pubstring("Ultimo"), events))
- sub1.mustReceive(ctx, pubstring("Ultimo"))
- sub2.mustReceive(ctx, pubstring("Ultimo"))
-
- sub3 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
- ClientID: "client-3",
- Query: query.MustCompile(
- `tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10`),
- }))
-
- events = []abci.Event{{
- Type: "tm.events",
- Attributes: []abci.EventAttribute{{Key: "type", Value: "NewRoundStep"}},
- }}
-
- require.NoError(t, s.PublishWithEvents(ctx, pubstring("Valeria Richards"), events))
- sub3.mustTimeOut(ctx, 100*time.Millisecond)
- }
-
- func TestSubscribeDuplicateKeys(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- logger := log.TestingLogger()
- s := newTestServer(ctx, t, logger)
-
- testCases := []struct {
- query string
- expected types.EventData
- }{
- {`withdraw.rewards='17'`, pubstring("Iceman")},
- {`withdraw.rewards='22'`, pubstring("Iceman")},
- {`withdraw.rewards='1' AND withdraw.rewards='22'`, pubstring("Iceman")},
- {`withdraw.rewards='100'`, nil},
- }
-
- for i, tc := range testCases {
- id := fmt.Sprintf("client-%d", i)
- q := query.MustCompile(tc.query)
- t.Run(id, func(t *testing.T) {
- sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
- ClientID: id,
- Query: q,
- }))
-
- events := []abci.Event{
- {
- Type: "transfer",
- Attributes: []abci.EventAttribute{
- {Key: "sender", Value: "foo"},
- {Key: "sender", Value: "bar"},
- {Key: "sender", Value: "baz"},
- },
- },
- {
- Type: "withdraw",
- Attributes: []abci.EventAttribute{
- {Key: "rewards", Value: "1"},
- {Key: "rewards", Value: "17"},
- {Key: "rewards", Value: "22"},
- },
- },
- }
-
- require.NoError(t, s.PublishWithEvents(ctx, pubstring("Iceman"), events))
-
- if tc.expected != nil {
- sub.mustReceive(ctx, tc.expected)
- } else {
- sub.mustTimeOut(ctx, 100*time.Millisecond)
- }
- })
- }
- }
-
- func TestClientSubscribesTwice(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- logger := log.TestingLogger()
- s := newTestServer(ctx, t, logger)
-
- q := query.MustCompile(`tm.events.type='NewBlock'`)
- events := []abci.Event{{
- Type: "tm.events",
- Attributes: []abci.EventAttribute{{Key: "type", Value: "NewBlock"}},
- }}
-
- sub1 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
- ClientID: clientID,
- Query: q,
- }))
-
- require.NoError(t, s.PublishWithEvents(ctx, pubstring("Goblin Queen"), events))
- sub1.mustReceive(ctx, pubstring("Goblin Queen"))
-
- // Subscribing a second time with the same client ID and query fails.
- {
- sub2, err := s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
- ClientID: clientID,
- Query: q,
- })
- require.Error(t, err)
- require.Nil(t, sub2)
- }
-
- // The attempt to re-subscribe does not disrupt the existing sub.
- require.NoError(t, s.PublishWithEvents(ctx, pubstring("Spider-Man"), events))
- sub1.mustReceive(ctx, pubstring("Spider-Man"))
- }
-
- func TestUnsubscribe(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- logger := log.TestingLogger()
- s := newTestServer(ctx, t, logger)
-
- sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
- ClientID: clientID,
- Query: query.MustCompile(`tm.events.type='NewBlock'`),
- }))
-
- // Removing the subscription we just made should succeed.
- require.NoError(t, s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{
- Subscriber: clientID,
- Query: query.MustCompile(`tm.events.type='NewBlock'`),
- }))
-
- // Publishing should still work.
- require.NoError(t, s.Publish(ctx, pubstring("Nick Fury")))
-
- // The unsubscribed subscriber should report as such.
- sub.mustFail(ctx, pubsub.ErrUnsubscribed)
- }
-
- func TestClientUnsubscribesTwice(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- logger := log.TestingLogger()
- s := newTestServer(ctx, t, logger)
-
- newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
- ClientID: clientID,
- Query: query.MustCompile(`tm.events.type='NewBlock'`),
- }))
- require.NoError(t, s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{
- Subscriber: clientID,
- Query: query.MustCompile(`tm.events.type='NewBlock'`),
- }))
- require.ErrorIs(t, s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{
- Subscriber: clientID,
- Query: query.MustCompile(`tm.events.type='NewBlock'`),
- }), pubsub.ErrSubscriptionNotFound)
- require.ErrorIs(t, s.UnsubscribeAll(ctx, clientID), pubsub.ErrSubscriptionNotFound)
- }
-
- func TestResubscribe(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- logger := log.TestingLogger()
- s := newTestServer(ctx, t, logger)
-
- args := pubsub.SubscribeArgs{
- ClientID: clientID,
- Query: query.All,
- }
- newTestSub(t).must(s.SubscribeWithArgs(ctx, args))
-
- require.NoError(t, s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{
- Subscriber: clientID,
- Query: query.All,
- }))
-
- sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, args))
-
- require.NoError(t, s.Publish(ctx, pubstring("Cable")))
- sub.mustReceive(ctx, pubstring("Cable"))
- }
-
- func TestUnsubscribeAll(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- logger := log.TestingLogger()
- s := newTestServer(ctx, t, logger)
-
- sub1 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
- ClientID: clientID,
- Query: query.MustCompile(`tm.events.type='NewBlock'`),
- }))
- sub2 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
- ClientID: clientID,
- Query: query.MustCompile(`tm.events.type='NewBlockHeader'`),
- }))
-
- require.NoError(t, s.UnsubscribeAll(ctx, clientID))
- require.NoError(t, s.Publish(ctx, pubstring("Nick Fury")))
-
- sub1.mustFail(ctx, pubsub.ErrUnsubscribed)
- sub2.mustFail(ctx, pubsub.ErrUnsubscribed)
-
- }
-
- func TestBufferCapacity(t *testing.T) {
- logger := log.TestingLogger()
- s := pubsub.NewServer(logger, pubsub.BufferCapacity(2))
-
- require.Equal(t, 2, s.BufferCapacity())
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- require.NoError(t, s.Publish(ctx, pubstring("Nighthawk")))
- require.NoError(t, s.Publish(ctx, pubstring("Sage")))
-
- ctx, cancel = context.WithTimeout(ctx, 100*time.Millisecond)
- defer cancel()
-
- require.ErrorIs(t, s.Publish(ctx, pubstring("Ironclad")), context.DeadlineExceeded)
- }
-
- func newTestServer(ctx context.Context, t testing.TB, logger log.Logger) *pubsub.Server {
- t.Helper()
-
- s := pubsub.NewServer(logger)
-
- require.NoError(t, s.Start(ctx))
- t.Cleanup(s.Wait)
- return s
- }
-
- type testSub struct {
- t testing.TB
- *pubsub.Subscription
- }
-
- func newTestSub(t testing.TB) *testSub { return &testSub{t: t} }
-
- func (s *testSub) must(sub *pubsub.Subscription, err error) *testSub {
- s.t.Helper()
- require.NoError(s.t, err)
- require.NotNil(s.t, sub)
- s.Subscription = sub
- return s
- }
-
- func (s *testSub) mustReceive(ctx context.Context, want types.EventData) {
- s.t.Helper()
- got, err := s.Next(ctx)
- require.NoError(s.t, err)
- require.Equal(s.t, want, got.Data())
- }
-
- func (s *testSub) mustTimeOut(ctx context.Context, dur time.Duration) {
- s.t.Helper()
- tctx, cancel := context.WithTimeout(ctx, dur)
- defer cancel()
- got, err := s.Next(tctx)
- if !errors.Is(err, context.DeadlineExceeded) {
- s.t.Errorf("Next: got (%+v, %v), want %v", got, err, context.DeadlineExceeded)
- }
- }
-
- func (s *testSub) mustFail(ctx context.Context, want error) {
- s.t.Helper()
- got, err := s.Next(ctx)
- if err == nil && want != nil {
- s.t.Fatalf("Next: got (%+v, %v), want error %v", got, err, want)
- }
- require.ErrorIs(s.t, err, want)
- }
|