diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index ae642a4fb..007f93f32 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -290,11 +290,8 @@ func (state *state) removeAll(clientID string) { func (state *state) send(msg interface{}, tags map[string]interface{}, slowClientStrategy overflowStrategy, logger log.Logger) { for q, clientToChannelMap := range state.queries { - // NOTE we can use LRU cache to speed up common cases like query = " - // tm.events.type=NewBlock" and tags = {"tm.events.type": "NewBlock"} if q.Matches(tags) { for clientID, ch := range clientToChannelMap { - logger.Info("Sending message to client", "msg", msg, "client", clientID) switch slowClientStrategy { case drop: select { diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 570f76a82..3112ab5d3 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -193,13 +193,21 @@ func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) } func Benchmark100Clients(b *testing.B) { benchmarkNClients(100, b) } func Benchmark1000Clients(b *testing.B) { benchmarkNClients(1000, b) } +func Benchmark10ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(10, b) } +func Benchmark100ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(100, b) } +func Benchmark1000ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(1000, b) } + func benchmarkNClients(n int, b *testing.B) { - s := pubsub.NewServer(pubsub.BufferCapacity(b.N)) + s := pubsub.NewServer() s.Start() defer s.Stop() for i := 0; i < n; i++ { ch := make(chan interface{}) + go func() { + for range ch { + } + }() s.Subscribe(clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = Ivan AND abci.Invoices.Number = %d", i)), ch) } @@ -210,6 +218,28 @@ func benchmarkNClients(n int, b *testing.B) { } } +func benchmarkNClientsOneQuery(n int, b *testing.B) { + s := pubsub.NewServer() + s.Start() + defer s.Stop() + + q := query.MustParse("abci.Account.Owner = Ivan AND abci.Invoices.Number = 1") + for i := 0; i < n; i++ { + ch := make(chan interface{}) + go func() { + for range ch { + } + }() + s.Subscribe(clientID, q, ch) + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.PublishWithTags("Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1}) + } +} + /////////////////////////////////////////////////////////////////////////////// /// HELPERS ///////////////////////////////////////////////////////////////////////////////