|
@ -68,7 +68,7 @@ func TestDifferentClients(t *testing.T) { |
|
|
assert.Zero(t, len(ch3)) |
|
|
assert.Zero(t, len(ch3)) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func TestClientResubscribes(t *testing.T) { |
|
|
|
|
|
|
|
|
func TestClientSubscribesTwice(t *testing.T) { |
|
|
s := pubsub.NewServer() |
|
|
s := pubsub.NewServer() |
|
|
s.SetLogger(log.TestingLogger()) |
|
|
s.SetLogger(log.TestingLogger()) |
|
|
s.Start() |
|
|
s.Start() |
|
@ -125,9 +125,9 @@ func TestUnsubscribeAll(t *testing.T) { |
|
|
|
|
|
|
|
|
ctx := context.Background() |
|
|
ctx := context.Background() |
|
|
ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1) |
|
|
ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1) |
|
|
err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type=NewBlock"), ch1) |
|
|
|
|
|
|
|
|
err := s.Subscribe(ctx, clientID, query.Empty{}, ch1) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
err = s.Subscribe(ctx, clientID, query.MustParse("tm.events.type=NewBlockHeader"), ch2) |
|
|
|
|
|
|
|
|
err = s.Subscribe(ctx, clientID, query.Empty{}, ch2) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
err = s.UnsubscribeAll(ctx, clientID) |
|
|
err = s.UnsubscribeAll(ctx, clientID) |
|
@ -148,6 +148,8 @@ func TestBufferCapacity(t *testing.T) { |
|
|
s := pubsub.NewServer(pubsub.BufferCapacity(2)) |
|
|
s := pubsub.NewServer(pubsub.BufferCapacity(2)) |
|
|
s.SetLogger(log.TestingLogger()) |
|
|
s.SetLogger(log.TestingLogger()) |
|
|
|
|
|
|
|
|
|
|
|
assert.Equal(t, 2, s.BufferCapacity()) |
|
|
|
|
|
|
|
|
ctx := context.Background() |
|
|
ctx := context.Background() |
|
|
err := s.Publish(ctx, "Nighthawk") |
|
|
err := s.Publish(ctx, "Nighthawk") |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|