package pubsub_test

import (
	"context"
	"errors"
	"fmt"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	abci "github.com/tendermint/tendermint/abci/types"
	"github.com/tendermint/tendermint/internal/pubsub"
	"github.com/tendermint/tendermint/internal/pubsub/query"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/types"
)

const (
	// clientID is the default subscriber ID used by tests that need only
	// one client.
	clientID = "test-client"
)

// pubstring is a trivial implementation of the EventData interface for
// string-valued test data.
type pubstring string

// TypeTag returns the fixed type tag "pubstring".
func (pubstring) TypeTag() string { return "pubstring" }

// TestSubscribeWithArgs checks that subscriptions created with and without an
// explicit Limit receive a published item.
func TestSubscribeWithArgs(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	logger := log.TestingLogger()
	s := newTestServer(ctx, t, logger)

	t.Run("DefaultLimit", func(t *testing.T) {
		sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
			ClientID: clientID,
			Query:    query.All,
		}))

		require.Equal(t, 1, s.NumClients())
		require.Equal(t, 1, s.NumClientSubscriptions(clientID))

		require.NoError(t, s.Publish(ctx, pubstring("Ka-Zar")))
		sub.mustReceive(ctx, pubstring("Ka-Zar"))
	})
	t.Run("PositiveLimit", func(t *testing.T) {
		sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
			ClientID: clientID + "-2",
			Query:    query.All,
			Limit:    10,
		}))
		require.NoError(t, s.Publish(ctx, pubstring("Aggamon")))
		sub.mustReceive(ctx, pubstring("Aggamon"))
	})
}

// TestObserver checks that a registered observer callback is invoked with the
// data of a published message.
func TestObserver(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	logger := log.TestingLogger()
	s := newTestServer(ctx, t, logger)

	done := make(chan struct{})
	var got interface{}
	require.NoError(t, s.Observe(ctx, func(msg pubsub.Message) error {
		// Closing done after the assignment orders the write to got before
		// the read on the test goroutine below.
		defer close(done)
		got = msg.Data()
		return nil
	}))

	const input = pubstring("Lions and tigers and bears, oh my!")
	require.NoError(t, s.Publish(ctx, input))
	<-done
	// require.Equal takes (expected, actual); keep that order so a failure
	// message labels the two values correctly.
	require.Equal(t, input, got)
}

// TestObserverErrors checks the error results of Observe: a nil callback is
// rejected, a first valid registration succeeds, and a further registration
// attempt on the same server is rejected.
func TestObserverErrors(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	logger := log.TestingLogger()
	s := newTestServer(ctx, t, logger)

	require.Error(t, s.Observe(ctx, nil, query.All))
	require.NoError(t, s.Observe(ctx, func(pubsub.Message) error { return nil }))
	require.Error(t, s.Observe(ctx, func(pubsub.Message) error { return nil }, query.All))
}

// TestPublishDoesNotBlock checks that publishing several items completes
// within three seconds even though the subscriber is not reading, and that
// the subscriber then sees the first item followed by ErrTerminated.
func TestPublishDoesNotBlock(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	logger := log.TestingLogger()
	s := newTestServer(ctx, t, logger)

	sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
		ClientID: clientID,
		Query:    query.All,
	}))

	// The publisher goroutine reports its outcome over a channel instead of
	// asserting directly: require.* calls t.FailNow, which must only be
	// called from the goroutine running the test. The channel is buffered so
	// the goroutine can always finish, even if the test has already timed out.
	published := make(chan error, 1)
	go func() {
		for _, msg := range []pubstring{"Quicksilver", "Asylum", "Ivan"} {
			if err := s.Publish(ctx, msg); err != nil {
				published <- fmt.Errorf("publish %q: %w", string(msg), err)
				return
			}
		}
		published <- nil
	}()

	select {
	case err := <-published:
		require.NoError(t, err)
		sub.mustReceive(ctx, pubstring("Quicksilver"))
		sub.mustFail(ctx, pubsub.ErrTerminated)
	case <-time.After(3 * time.Second):
		t.Fatal("Publishing should not have blocked")
	}
}

// TestSubscribeErrors checks that invalid subscription arguments are rejected.
func TestSubscribeErrors(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	logger := log.TestingLogger()
	s := newTestServer(ctx, t, logger)

	t.Run("NegativeLimitErr", func(t *testing.T) {
		_, err := s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
			ClientID: clientID,
			Query:    query.All,
			Limit:    -5,
		})
		require.Error(t, err)
	})
}

// TestSlowSubscriber checks that a subscriber which does not keep up with the
// publisher receives what fit in its buffer and is then terminated.
func TestSlowSubscriber(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	logger := log.TestingLogger()
	s := newTestServer(ctx, t, logger)

	sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
		ClientID: clientID,
		Query:    query.All,
	}))

	require.NoError(t, s.Publish(ctx, pubstring("Fat Cobra")))
	require.NoError(t, s.Publish(ctx, pubstring("Viper")))
	require.NoError(t, s.Publish(ctx, pubstring("Black Panther")))

	// We had capacity for one item, so we should get that item, but after that
	// the subscription should have been terminated by the publisher.
	sub.mustReceive(ctx, pubstring("Fat Cobra"))
	sub.mustFail(ctx, pubsub.ErrTerminated)
}

// TestDifferentClients checks that several clients with different queries
// each receive only the published items whose events match their own query.
func TestDifferentClients(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	logger := log.TestingLogger()
	s := newTestServer(ctx, t, logger)

	sub1 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
		ClientID: "client-1",
		Query:    query.MustCompile(`tm.events.type='NewBlock'`),
	}))

	events := []abci.Event{{
		Type:       "tm.events",
		Attributes: []abci.EventAttribute{{Key: "type", Value: "NewBlock"}},
	}}

	require.NoError(t, s.PublishWithEvents(ctx, pubstring("Iceman"), events))
	sub1.mustReceive(ctx, pubstring("Iceman"))

	sub2 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
		ClientID: "client-2",
		Query:    query.MustCompile(`tm.events.type='NewBlock' AND abci.account.name='Igor'`),
	}))

	events = []abci.Event{
		{
			Type:       "tm.events",
			Attributes: []abci.EventAttribute{{Key: "type", Value: "NewBlock"}},
		},
		{
			Type:       "abci.account",
			Attributes: []abci.EventAttribute{{Key: "name", Value: "Igor"}},
		},
	}

	// Both queries match this event set, so both subscribers should see it.
	require.NoError(t, s.PublishWithEvents(ctx, pubstring("Ultimo"), events))
	sub1.mustReceive(ctx, pubstring("Ultimo"))
	sub2.mustReceive(ctx, pubstring("Ultimo"))

	sub3 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
		ClientID: "client-3",
		Query: query.MustCompile(
			`tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10`),
	}))

	events = []abci.Event{{
		Type:       "tm.events",
		Attributes: []abci.EventAttribute{{Key: "type", Value: "NewRoundStep"}},
	}}

	// Only one of the three conjuncts is satisfied, so nothing is delivered.
	require.NoError(t, s.PublishWithEvents(ctx, pubstring("Valeria Richards"), events))
	sub3.mustTimeOut(ctx, 100*time.Millisecond)
}

// TestSubscribeDuplicateKeys checks query matching against events that carry
// the same attribute key several times with different values.
func TestSubscribeDuplicateKeys(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	logger := log.TestingLogger()
	s := newTestServer(ctx, t, logger)

	// A nil expected value means the subscriber should receive nothing.
	testCases := []struct {
		query    string
		expected types.EventData
	}{
		{`withdraw.rewards='17'`, pubstring("Iceman")},
		{`withdraw.rewards='22'`, pubstring("Iceman")},
		{`withdraw.rewards='1' AND withdraw.rewards='22'`, pubstring("Iceman")},
		{`withdraw.rewards='100'`, nil},
	}

	for i, tc := range testCases {
		id := fmt.Sprintf("client-%d", i)
		q := query.MustCompile(tc.query)
		t.Run(id, func(t *testing.T) {
			sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
				ClientID: id,
				Query:    q,
			}))

			events := []abci.Event{
				{
					Type: "transfer",
					Attributes: []abci.EventAttribute{
						{Key: "sender", Value: "foo"},
						{Key: "sender", Value: "bar"},
						{Key: "sender", Value: "baz"},
					},
				},
				{
					Type: "withdraw",
					Attributes: []abci.EventAttribute{
						{Key: "rewards", Value: "1"},
						{Key: "rewards", Value: "17"},
						{Key: "rewards", Value: "22"},
					},
				},
			}

			require.NoError(t, s.PublishWithEvents(ctx, pubstring("Iceman"), events))

			if tc.expected != nil {
				sub.mustReceive(ctx, tc.expected)
			} else {
				sub.mustTimeOut(ctx, 100*time.Millisecond)
			}
		})
	}
}

// TestClientSubscribesTwice checks that a duplicate subscription (same client
// ID and query) is rejected without disturbing the existing subscription.
func TestClientSubscribesTwice(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	logger := log.TestingLogger()
	s := newTestServer(ctx, t, logger)

	q := query.MustCompile(`tm.events.type='NewBlock'`)
	events := []abci.Event{{
		Type:       "tm.events",
		Attributes: []abci.EventAttribute{{Key: "type", Value: "NewBlock"}},
	}}

	sub1 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
		ClientID: clientID,
		Query:    q,
	}))

	require.NoError(t, s.PublishWithEvents(ctx, pubstring("Goblin Queen"), events))
	sub1.mustReceive(ctx, pubstring("Goblin Queen"))

	// Subscribing a second time with the same client ID and query fails.
	{
		sub2, err := s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
			ClientID: clientID,
			Query:    q,
		})
		require.Error(t, err)
		require.Nil(t, sub2)
	}

	// The attempt to re-subscribe does not disrupt the existing sub.
	require.NoError(t, s.PublishWithEvents(ctx, pubstring("Spider-Man"), events))
	sub1.mustReceive(ctx, pubstring("Spider-Man"))
}

// TestUnsubscribe checks that removing a subscription succeeds, leaves
// publishing functional, and makes the subscription report ErrUnsubscribed.
func TestUnsubscribe(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	logger := log.TestingLogger()
	s := newTestServer(ctx, t, logger)

	sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
		ClientID: clientID,
		Query:    query.MustCompile(`tm.events.type='NewBlock'`),
	}))

	// Removing the subscription we just made should succeed.
	require.NoError(t, s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{
		Subscriber: clientID,
		Query:      query.MustCompile(`tm.events.type='NewBlock'`),
	}))

	// Publishing should still work.
	require.NoError(t, s.Publish(ctx, pubstring("Nick Fury")))

	// The unsubscribed subscriber should report as such.
	sub.mustFail(ctx, pubsub.ErrUnsubscribed)
}

// TestClientUnsubscribesTwice checks that unsubscribing again after a
// successful unsubscribe reports ErrSubscriptionNotFound, both for a single
// query and for UnsubscribeAll.
func TestClientUnsubscribesTwice(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	logger := log.TestingLogger()
	s := newTestServer(ctx, t, logger)

	newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
		ClientID: clientID,
		Query:    query.MustCompile(`tm.events.type='NewBlock'`),
	}))
	require.NoError(t, s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{
		Subscriber: clientID,
		Query:      query.MustCompile(`tm.events.type='NewBlock'`),
	}))
	require.ErrorIs(t, s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{
		Subscriber: clientID,
		Query:      query.MustCompile(`tm.events.type='NewBlock'`),
	}), pubsub.ErrSubscriptionNotFound)
	require.ErrorIs(t, s.UnsubscribeAll(ctx, clientID), pubsub.ErrSubscriptionNotFound)
}

// TestResubscribe checks that a client can subscribe again with the same
// arguments after unsubscribing, and that the new subscription receives
// published items.
func TestResubscribe(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	logger := log.TestingLogger()
	s := newTestServer(ctx, t, logger)

	args := pubsub.SubscribeArgs{
		ClientID: clientID,
		Query:    query.All,
	}
	newTestSub(t).must(s.SubscribeWithArgs(ctx, args))

	require.NoError(t, s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{
		Subscriber: clientID,
		Query:      query.All,
	}))

	sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, args))

	require.NoError(t, s.Publish(ctx, pubstring("Cable")))
	sub.mustReceive(ctx, pubstring("Cable"))
}

// TestUnsubscribeAll checks that UnsubscribeAll removes every subscription
// held by a client, so each one reports ErrUnsubscribed.
func TestUnsubscribeAll(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	logger := log.TestingLogger()
	s := newTestServer(ctx, t, logger)

	sub1 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
		ClientID: clientID,
		Query:    query.MustCompile(`tm.events.type='NewBlock'`),
	}))
	sub2 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
		ClientID: clientID,
		Query:    query.MustCompile(`tm.events.type='NewBlockHeader'`),
	}))

	require.NoError(t, s.UnsubscribeAll(ctx, clientID))
	require.NoError(t, s.Publish(ctx, pubstring("Nick Fury")))

	sub1.mustFail(ctx, pubsub.ErrUnsubscribed)
	sub2.mustFail(ctx, pubsub.ErrUnsubscribed)
}

// TestBufferCapacity checks that the BufferCapacity option is reported by the
// server and that a publish beyond that capacity fails with
// context.DeadlineExceeded once its context times out.
//
// NOTE(review): the server is never started in this test, so presumably
// nothing drains the queue and the third Publish blocks until the deadline;
// confirm against the Server implementation.
func TestBufferCapacity(t *testing.T) {
	logger := log.TestingLogger()
	s := pubsub.NewServer(logger, pubsub.BufferCapacity(2))

	require.Equal(t, 2, s.BufferCapacity())

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	require.NoError(t, s.Publish(ctx, pubstring("Nighthawk")))
	require.NoError(t, s.Publish(ctx, pubstring("Sage")))

	ctx, cancel = context.WithTimeout(ctx, 100*time.Millisecond)
	defer cancel()

	require.ErrorIs(t, s.Publish(ctx, pubstring("Ironclad")), context.DeadlineExceeded)
}

// newTestServer creates a pubsub server with the given logger, starts it with
// ctx, and registers s.Wait as a cleanup function on t. The test fails
// immediately if the server cannot be started.
func newTestServer(ctx context.Context, t testing.TB, logger log.Logger) *pubsub.Server {
	t.Helper()

	s := pubsub.NewServer(logger)

	require.NoError(t, s.Start(ctx))
	t.Cleanup(s.Wait)
	return s
}

// testSub wraps a subscription together with the test handle used to report
// failures from its must* helpers.
type testSub struct {
	t testing.TB
	*pubsub.Subscription
}

// newTestSub returns a testSub bound to t with no subscription attached yet;
// attach one with must.
func newTestSub(t testing.TB) *testSub { return &testSub{t: t} }

// must stores sub in s and returns s, failing the test immediately if err is
// non-nil or sub is nil. It is shaped to wrap a (subscription, error) call
// result directly.
func (s *testSub) must(sub *pubsub.Subscription, err error) *testSub {
	s.t.Helper()
	require.NoError(s.t, err)
	require.NotNil(s.t, sub)
	s.Subscription = sub
	return s
}

// mustReceive fetches the next message from the subscription and fails the
// test immediately unless it succeeds and carries data equal to want.
func (s *testSub) mustReceive(ctx context.Context, want types.EventData) {
	s.t.Helper()

	got, err := s.Next(ctx)
	require.NoError(s.t, err)
	require.Equal(s.t, want, got.Data())
}

// mustTimeOut waits up to dur for the next message and reports a test error
// unless the wait ends with context.DeadlineExceeded.
func (s *testSub) mustTimeOut(ctx context.Context, dur time.Duration) {
	s.t.Helper()

	tctx, cancel := context.WithTimeout(ctx, dur)
	defer cancel()
	got, err := s.Next(tctx)
	if !errors.Is(err, context.DeadlineExceeded) {
		s.t.Errorf("Next: got (%+v, %v), want %v", got, err, context.DeadlineExceeded)
	}
}

// mustFail fetches the next message and fails the test immediately unless the
// call returns an error matching want.
func (s *testSub) mustFail(ctx context.Context, want error) {
	s.t.Helper()

	got, err := s.Next(ctx)
	// Report the unexpectedly delivered message explicitly; ErrorIs alone
	// would not show what was received.
	if err == nil && want != nil {
		s.t.Fatalf("Next: got (%+v, %v), want error %v", got, err, want)
	}
	require.ErrorIs(s.t, err, want)
}