You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

272 lines
7.8 KiB

  1. package pubsub_test
  2. import (
  3. "context"
  4. "fmt"
  5. "runtime/debug"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/stretchr/testify/require"
  10. "github.com/tendermint/tendermint/libs/log"
  11. "github.com/tendermint/tendermint/libs/pubsub"
  12. "github.com/tendermint/tendermint/libs/pubsub/query"
  13. )
  14. const (
  15. clientID = "test-client"
  16. )
  17. func TestSubscribe(t *testing.T) {
  18. s := pubsub.NewServer()
  19. s.SetLogger(log.TestingLogger())
  20. s.Start()
  21. defer s.Stop()
  22. ctx := context.Background()
  23. ch := make(chan interface{}, 1)
  24. err := s.Subscribe(ctx, clientID, query.Empty{}, ch)
  25. require.NoError(t, err)
  26. err = s.Publish(ctx, "Ka-Zar")
  27. require.NoError(t, err)
  28. assertReceive(t, "Ka-Zar", ch)
  29. err = s.Publish(ctx, "Quicksilver")
  30. require.NoError(t, err)
  31. assertReceive(t, "Quicksilver", ch)
  32. }
  33. func TestDifferentClients(t *testing.T) {
  34. s := pubsub.NewServer()
  35. s.SetLogger(log.TestingLogger())
  36. s.Start()
  37. defer s.Stop()
  38. ctx := context.Background()
  39. ch1 := make(chan interface{}, 1)
  40. err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1)
  41. require.NoError(t, err)
  42. err = s.PublishWithTags(ctx, "Iceman", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"}))
  43. require.NoError(t, err)
  44. assertReceive(t, "Iceman", ch1)
  45. ch2 := make(chan interface{}, 1)
  46. err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2)
  47. require.NoError(t, err)
  48. err = s.PublishWithTags(ctx, "Ultimo", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}))
  49. require.NoError(t, err)
  50. assertReceive(t, "Ultimo", ch1)
  51. assertReceive(t, "Ultimo", ch2)
  52. ch3 := make(chan interface{}, 1)
  53. err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3)
  54. require.NoError(t, err)
  55. err = s.PublishWithTags(ctx, "Valeria Richards", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewRoundStep"}))
  56. require.NoError(t, err)
  57. assert.Zero(t, len(ch3))
  58. }
  59. func TestClientSubscribesTwice(t *testing.T) {
  60. s := pubsub.NewServer()
  61. s.SetLogger(log.TestingLogger())
  62. s.Start()
  63. defer s.Stop()
  64. ctx := context.Background()
  65. q := query.MustParse("tm.events.type='NewBlock'")
  66. ch1 := make(chan interface{}, 1)
  67. err := s.Subscribe(ctx, clientID, q, ch1)
  68. require.NoError(t, err)
  69. err = s.PublishWithTags(ctx, "Goblin Queen", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"}))
  70. require.NoError(t, err)
  71. assertReceive(t, "Goblin Queen", ch1)
  72. ch2 := make(chan interface{}, 1)
  73. err = s.Subscribe(ctx, clientID, q, ch2)
  74. require.Error(t, err)
  75. err = s.PublishWithTags(ctx, "Spider-Man", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"}))
  76. require.NoError(t, err)
  77. assertReceive(t, "Spider-Man", ch1)
  78. }
  79. func TestUnsubscribe(t *testing.T) {
  80. s := pubsub.NewServer()
  81. s.SetLogger(log.TestingLogger())
  82. s.Start()
  83. defer s.Stop()
  84. ctx := context.Background()
  85. ch := make(chan interface{})
  86. err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch)
  87. require.NoError(t, err)
  88. err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
  89. require.NoError(t, err)
  90. err = s.Publish(ctx, "Nick Fury")
  91. require.NoError(t, err)
  92. assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe")
  93. _, ok := <-ch
  94. assert.False(t, ok)
  95. }
  96. func TestClientUnsubscribesTwice(t *testing.T) {
  97. s := pubsub.NewServer()
  98. s.SetLogger(log.TestingLogger())
  99. s.Start()
  100. defer s.Stop()
  101. ctx := context.Background()
  102. ch := make(chan interface{})
  103. err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch)
  104. require.NoError(t, err)
  105. err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
  106. require.NoError(t, err)
  107. err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
  108. assert.Equal(t, pubsub.ErrSubscriptionNotFound, err)
  109. err = s.UnsubscribeAll(ctx, clientID)
  110. assert.Equal(t, pubsub.ErrSubscriptionNotFound, err)
  111. }
  112. func TestResubscribe(t *testing.T) {
  113. s := pubsub.NewServer()
  114. s.SetLogger(log.TestingLogger())
  115. s.Start()
  116. defer s.Stop()
  117. ctx := context.Background()
  118. ch := make(chan interface{})
  119. err := s.Subscribe(ctx, clientID, query.Empty{}, ch)
  120. require.NoError(t, err)
  121. err = s.Unsubscribe(ctx, clientID, query.Empty{})
  122. require.NoError(t, err)
  123. ch = make(chan interface{})
  124. err = s.Subscribe(ctx, clientID, query.Empty{}, ch)
  125. require.NoError(t, err)
  126. err = s.Publish(ctx, "Cable")
  127. require.NoError(t, err)
  128. assertReceive(t, "Cable", ch)
  129. }
  130. func TestUnsubscribeAll(t *testing.T) {
  131. s := pubsub.NewServer()
  132. s.SetLogger(log.TestingLogger())
  133. s.Start()
  134. defer s.Stop()
  135. ctx := context.Background()
  136. ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1)
  137. err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch1)
  138. require.NoError(t, err)
  139. err = s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlockHeader'"), ch2)
  140. require.NoError(t, err)
  141. err = s.UnsubscribeAll(ctx, clientID)
  142. require.NoError(t, err)
  143. err = s.Publish(ctx, "Nick Fury")
  144. require.NoError(t, err)
  145. assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll")
  146. assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll")
  147. _, ok := <-ch1
  148. assert.False(t, ok)
  149. _, ok = <-ch2
  150. assert.False(t, ok)
  151. }
  152. func TestBufferCapacity(t *testing.T) {
  153. s := pubsub.NewServer(pubsub.BufferCapacity(2))
  154. s.SetLogger(log.TestingLogger())
  155. assert.Equal(t, 2, s.BufferCapacity())
  156. ctx := context.Background()
  157. err := s.Publish(ctx, "Nighthawk")
  158. require.NoError(t, err)
  159. err = s.Publish(ctx, "Sage")
  160. require.NoError(t, err)
  161. ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
  162. defer cancel()
  163. err = s.Publish(ctx, "Ironclad")
  164. if assert.Error(t, err) {
  165. assert.Equal(t, context.DeadlineExceeded, err)
  166. }
  167. }
  168. func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) }
  169. func Benchmark100Clients(b *testing.B) { benchmarkNClients(100, b) }
  170. func Benchmark1000Clients(b *testing.B) { benchmarkNClients(1000, b) }
  171. func Benchmark10ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(10, b) }
  172. func Benchmark100ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(100, b) }
  173. func Benchmark1000ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(1000, b) }
  174. func benchmarkNClients(n int, b *testing.B) {
  175. s := pubsub.NewServer()
  176. s.Start()
  177. defer s.Stop()
  178. ctx := context.Background()
  179. for i := 0; i < n; i++ {
  180. ch := make(chan interface{})
  181. go func() {
  182. for range ch {
  183. }
  184. }()
  185. s.Subscribe(ctx, clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = %d", i)), ch)
  186. }
  187. b.ReportAllocs()
  188. b.ResetTimer()
  189. for i := 0; i < b.N; i++ {
  190. s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": string(i)}))
  191. }
  192. }
  193. func benchmarkNClientsOneQuery(n int, b *testing.B) {
  194. s := pubsub.NewServer()
  195. s.Start()
  196. defer s.Stop()
  197. ctx := context.Background()
  198. q := query.MustParse("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = 1")
  199. for i := 0; i < n; i++ {
  200. ch := make(chan interface{})
  201. go func() {
  202. for range ch {
  203. }
  204. }()
  205. s.Subscribe(ctx, clientID, q, ch)
  206. }
  207. b.ReportAllocs()
  208. b.ResetTimer()
  209. for i := 0; i < b.N; i++ {
  210. s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": "1"}))
  211. }
  212. }
  213. ///////////////////////////////////////////////////////////////////////////////
  214. /// HELPERS
  215. ///////////////////////////////////////////////////////////////////////////////
  216. func assertReceive(t *testing.T, expected interface{}, ch <-chan interface{}, msgAndArgs ...interface{}) {
  217. select {
  218. case actual := <-ch:
  219. if actual != nil {
  220. assert.Equal(t, expected, actual, msgAndArgs...)
  221. }
  222. case <-time.After(1 * time.Second):
  223. t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected)
  224. debug.PrintStack()
  225. }
  226. }