From e4f3f9d9bf327083eba26afa2b85ff09189856c3 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 12 Jul 2017 22:52:13 +0300 Subject: [PATCH] remove comment about LRU cache (see comments below) I've tried https://github.com/hashicorp/golang-lru/tree/master/simplelru today and here are the results: with LRU cache: ``` Benchmark10Clients-2 50000 29021 ns/op 3976 B/op 105 allocs/op Benchmark100Clients-2 3000 363432 ns/op 36382 B/op 1005 allocs/op Benchmark1000Clients-2 500 2473752 ns/op 360500 B/op 10009 allocs/op Benchmark10ClientsUsingTheSameQuery-2 300000 4059 ns/op 773 B/op 15 allocs/op Benchmark100ClientsUsingTheSameQuery-2 500000 4360 ns/op 773 B/op 15 allocs/op Benchmark1000ClientsUsingTheSameQuery-2 300000 4204 ns/op 773 B/op 15 allocs/op ``` without LRU cache: ``` Benchmark10Clients-2 200000 5267 ns/op 616 B/op 25 allocs/op Benchmark100Clients-2 30000 42134 ns/op 2776 B/op 205 allocs/op Benchmark1000Clients-2 3000 552648 ns/op 24376 B/op 2005 allocs/op Benchmark10ClientsOneQuery-2 1000000 2127 ns/op 462 B/op 9 allocs/op Benchmark100ClientsOneQuery-2 500000 2353 ns/op 462 B/op 9 allocs/op Benchmark1000ClientsOneQuery-2 500000 2339 ns/op 462 B/op 9 allocs/op ``` > How were you using the lru cache exactly? I was adding a KV pair each time there is a match plus checking if `lru.Contains(key)` before running the actual check (`q.Matches(tags)`). ``` key = fmt.Sprintf("%s/%v", query + tags) ``` --- pubsub/pubsub.go | 3 --- pubsub/pubsub_test.go | 32 +++++++++++++++++++++++++++++++- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index ae642a4fb..007f93f32 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -290,11 +290,8 @@ func (state *state) removeAll(clientID string) { func (state *state) send(msg interface{}, tags map[string]interface{}, slowClientStrategy overflowStrategy, logger log.Logger) { for q, clientToChannelMap := range state.queries { - // NOTE we can use LRU cache to speed up common cases like query = " - // tm.events.type=NewBlock" and tags = {"tm.events.type": "NewBlock"} if q.Matches(tags) { for clientID, ch := range clientToChannelMap { - logger.Info("Sending message to client", "msg", msg, "client", clientID) switch slowClientStrategy { case drop: select { diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 570f76a82..3112ab5d3 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -193,13 +193,21 @@ func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) } func Benchmark100Clients(b *testing.B) { benchmarkNClients(100, b) } func Benchmark1000Clients(b *testing.B) { benchmarkNClients(1000, b) } +func Benchmark10ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(10, b) } +func Benchmark100ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(100, b) } +func Benchmark1000ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(1000, b) } + func benchmarkNClients(n int, b *testing.B) { - s := pubsub.NewServer(pubsub.BufferCapacity(b.N)) + s := pubsub.NewServer() s.Start() defer s.Stop() for i := 0; i < n; i++ { ch := make(chan interface{}) + go func() { + for range ch { + } + }() s.Subscribe(clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = Ivan AND abci.Invoices.Number = %d", i)), ch) } @@ -210,6 +218,28 @@ func benchmarkNClients(n int, b *testing.B) { } } +func benchmarkNClientsOneQuery(n int, b *testing.B) { + s := pubsub.NewServer() + s.Start() + defer s.Stop() + + q := query.MustParse("abci.Account.Owner = Ivan AND abci.Invoices.Number = 1") + for i := 0; i < n; i++ { + ch := make(chan interface{}) + go func() { + for range ch { + } + }() + s.Subscribe(clientID, q, ch) + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.PublishWithTags("Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1}) + } +} + /////////////////////////////////////////////////////////////////////////////// /// HELPERS ///////////////////////////////////////////////////////////////////////////////