From e664f9c68861299060174ba348ad64a6854551d3 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 14 Jul 2017 14:49:25 +0300 Subject: [PATCH] use context to provide timeouts! --- pubsub/example_test.go | 7 +++- pubsub/pubsub.go | 53 +++++++++++++++----------- pubsub/pubsub_test.go | 86 ++++++++++++++++++++++++------------------ 3 files changed, 85 insertions(+), 61 deletions(-) diff --git a/pubsub/example_test.go b/pubsub/example_test.go index 38026ccd6..6597c858d 100644 --- a/pubsub/example_test.go +++ b/pubsub/example_test.go @@ -1,6 +1,7 @@ package pubsub_test import ( + "context" "testing" "github.com/stretchr/testify/require" @@ -16,9 +17,11 @@ func TestExample(t *testing.T) { s.Start() defer s.Stop() + ctx := context.Background() ch := make(chan interface{}, 1) - err := s.Subscribe("example-client", query.MustParse("abci.account.name=John"), ch) + err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name=John"), ch) + require.NoError(t, err) + err = s.PublishWithTags(ctx, "Tombstone", map[string]interface{}{"abci.account.name": "John"}) require.NoError(t, err) - s.PublishWithTags("Tombstone", map[string]interface{}{"abci.account.name": "John"}) assertReceive(t, "Tombstone", ch) } diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 9ac260c93..34df86a45 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -14,8 +14,7 @@ package pubsub import ( - "errors" - "time" + "context" cmn "github.com/tendermint/tmlibs/common" ) @@ -29,12 +28,6 @@ const ( shutdown ) -const subscribeTimeout = 10 * time.Millisecond - -var ( - ErrorOverflow = errors.New("server overflowed") -) - type cmd struct { op operation query Query @@ -97,37 +90,53 @@ func (s Server) BufferCapacity() int { // Subscribe returns a channel on which messages matching the given query can // be received. If the subscription already exists old channel will be closed -// and new one returned. Error will be returned to the caller if the server is -// overflowed. -func (s *Server) Subscribe(clientID string, query Query, out chan<- interface{}) error { +// and new one returned. Error will be returned to the caller if the context is +// cancelled. +func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error { select { case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}: return nil - case <-time.After(subscribeTimeout): - return ErrorOverflow + case <-ctx.Done(): + return ctx.Err() } } -// Unsubscribe unsubscribes the given client from the query. Blocking. -func (s *Server) Unsubscribe(clientID string, query Query) { - s.cmds <- cmd{op: unsub, clientID: clientID, query: query} +// Unsubscribe unsubscribes the given client from the query. Error will be +// returned to the caller if the context is cancelled. +func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error { + select { + case s.cmds <- cmd{op: unsub, clientID: clientID, query: query}: + return nil + case <-ctx.Done(): + return ctx.Err() + } } // Unsubscribe unsubscribes the given channel. Blocking. -func (s *Server) UnsubscribeAll(clientID string) { - s.cmds <- cmd{op: unsub, clientID: clientID} +func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error { + select { + case s.cmds <- cmd{op: unsub, clientID: clientID}: + return nil + case <-ctx.Done(): + return ctx.Err() + } } // Publish publishes the given message. Blocking. -func (s *Server) Publish(msg interface{}) { - s.PublishWithTags(msg, make(map[string]interface{})) +func (s *Server) Publish(ctx context.Context, msg interface{}) error { + return s.PublishWithTags(ctx, msg, make(map[string]interface{})) } // PublishWithTags publishes the given message with a set of tags. This set of // tags will be matched with client queries. If there is a match, the message // will be sent to a client. Blocking. -func (s *Server) PublishWithTags(msg interface{}, tags map[string]interface{}) { - s.cmds <- cmd{op: pub, msg: msg, tags: tags} +func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags map[string]interface{}) error { + select { + case s.cmds <- cmd{op: pub, msg: msg, tags: tags}: + return nil + case <-ctx.Done(): + return ctx.Err() + } } // OnStop implements Service.OnStop by shutting down the server. diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index fb15b3489..9c9841440 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -1,6 +1,7 @@ package pubsub_test import ( + "context" "fmt" "runtime/debug" "testing" @@ -24,13 +25,16 @@ func TestSubscribe(t *testing.T) { s.Start() defer s.Stop() + ctx := context.Background() ch := make(chan interface{}, 1) - err := s.Subscribe(clientID, query.Empty{}, ch) + err := s.Subscribe(ctx, clientID, query.Empty{}, ch) + require.NoError(t, err) + err = s.Publish(ctx, "Ka-Zar") require.NoError(t, err) - s.Publish("Ka-Zar") assertReceive(t, "Ka-Zar", ch) - s.Publish("Quicksilver") + err = s.Publish(ctx, "Quicksilver") + require.NoError(t, err) assertReceive(t, "Quicksilver", ch) } @@ -39,23 +43,28 @@ func TestDifferentClients(t *testing.T) { s.SetLogger(log.TestingLogger()) s.Start() defer s.Stop() + + ctx := context.Background() ch1 := make(chan interface{}, 1) - err := s.Subscribe("client-1", query.MustParse("tm.events.type=NewBlock"), ch1) + err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type=NewBlock"), ch1) + require.NoError(t, err) + err = s.PublishWithTags(ctx, "Iceman", map[string]interface{}{"tm.events.type": "NewBlock"}) require.NoError(t, err) - s.PublishWithTags("Iceman", map[string]interface{}{"tm.events.type": "NewBlock"}) assertReceive(t, "Iceman", ch1) ch2 := make(chan interface{}, 1) - err = s.Subscribe("client-2", query.MustParse("tm.events.type=NewBlock AND abci.account.name=Igor"), ch2) + err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type=NewBlock AND abci.account.name=Igor"), ch2) + require.NoError(t, err) + err = s.PublishWithTags(ctx, "Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}) require.NoError(t, err) - s.PublishWithTags("Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}) assertReceive(t, "Ultimo", ch1) assertReceive(t, "Ultimo", ch2) ch3 := make(chan interface{}, 1) - err = s.Subscribe("client-3", query.MustParse("tm.events.type=NewRoundStep AND abci.account.name=Igor AND abci.invoice.number = 10"), ch3) + err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type=NewRoundStep AND abci.account.name=Igor AND abci.invoice.number = 10"), ch3) + require.NoError(t, err) + err = s.PublishWithTags(ctx, "Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"}) require.NoError(t, err) - s.PublishWithTags("Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"}) assert.Zero(t, len(ch3)) } @@ -65,22 +74,25 @@ func TestClientResubscribes(t *testing.T) { s.Start() defer s.Stop() + ctx := context.Background() q := query.MustParse("tm.events.type=NewBlock") ch1 := make(chan interface{}, 1) - err := s.Subscribe(clientID, q, ch1) + err := s.Subscribe(ctx, clientID, q, ch1) + require.NoError(t, err) + err = s.PublishWithTags(ctx, "Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"}) require.NoError(t, err) - s.PublishWithTags("Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"}) assertReceive(t, "Goblin Queen", ch1) ch2 := make(chan interface{}, 1) - err = s.Subscribe(clientID, q, ch2) + err = s.Subscribe(ctx, clientID, q, ch2) require.NoError(t, err) _, ok := <-ch1 assert.False(t, ok) - s.PublishWithTags("Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"}) + err = s.PublishWithTags(ctx, "Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"}) + require.NoError(t, err) assertReceive(t, "Spider-Man", ch2) } @@ -90,12 +102,15 @@ func TestUnsubscribe(t *testing.T) { s.Start() defer s.Stop() + ctx := context.Background() ch := make(chan interface{}) - err := s.Subscribe(clientID, query.Empty{}, ch) + err := s.Subscribe(ctx, clientID, query.Empty{}, ch) + require.NoError(t, err) + err = s.Unsubscribe(ctx, clientID, query.Empty{}) require.NoError(t, err) - s.Unsubscribe(clientID, query.Empty{}) - s.Publish("Nick Fury") + err = s.Publish(ctx, "Nick Fury") + require.NoError(t, err) assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe") _, ok := <-ch @@ -108,15 +123,18 @@ func TestUnsubscribeAll(t *testing.T) { s.Start() defer s.Stop() + ctx := context.Background() ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1) - err := s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlock"), ch1) + err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type=NewBlock"), ch1) require.NoError(t, err) - err = s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlockHeader"), ch2) + err = s.Subscribe(ctx, clientID, query.MustParse("tm.events.type=NewBlockHeader"), ch2) require.NoError(t, err) - s.UnsubscribeAll(clientID) + err = s.UnsubscribeAll(ctx, clientID) + require.NoError(t, err) - s.Publish("Nick Fury") + err = s.Publish(ctx, "Nick Fury") + require.NoError(t, err) assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll") assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll") @@ -130,19 +148,11 @@ func TestBufferCapacity(t *testing.T) { s := pubsub.NewServer(pubsub.BufferCapacity(2)) s.SetLogger(log.TestingLogger()) - s.Publish("Nighthawk") - s.Publish("Sage") -} - -func TestSubscribeReturnsErrorIfServerOverflowed(t *testing.T) { - s := pubsub.NewServer() - s.SetLogger(log.TestingLogger()) - - ch := make(chan interface{}, 1) - err := s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlock"), ch) - if assert.Error(t, err) { - assert.Equal(t, pubsub.ErrorOverflow, err) - } + ctx := context.Background() + err := s.Publish(ctx, "Nighthawk") + require.NoError(t, err) + err = s.Publish(ctx, "Sage") + require.NoError(t, err) } func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) } @@ -158,19 +168,20 @@ func benchmarkNClients(n int, b *testing.B) { s.Start() defer s.Stop() + ctx := context.Background() for i := 0; i < n; i++ { ch := make(chan interface{}) go func() { for range ch { } }() - s.Subscribe(clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = Ivan AND abci.Invoices.Number = %d", i)), ch) + s.Subscribe(ctx, clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = Ivan AND abci.Invoices.Number = %d", i)), ch) } b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - s.PublishWithTags("Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i}) + s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i}) } } @@ -179,6 +190,7 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) { s.Start() defer s.Stop() + ctx := context.Background() q := query.MustParse("abci.Account.Owner = Ivan AND abci.Invoices.Number = 1") for i := 0; i < n; i++ { ch := make(chan interface{}) @@ -186,13 +198,13 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) { for range ch { } }() - s.Subscribe(clientID, q, ch) + s.Subscribe(ctx, clientID, q, ch) } b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - s.PublishWithTags("Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1}) + s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1}) } }