diff --git a/pubsub/example_test.go b/pubsub/example_test.go index d64b96eab..38026ccd6 100644 --- a/pubsub/example_test.go +++ b/pubsub/example_test.go @@ -17,8 +17,8 @@ func TestExample(t *testing.T) { defer s.Stop() ch := make(chan interface{}, 1) - s.Subscribe("example-client", query.MustParse("abci.account.name=John"), ch) - err := s.PublishWithTags("Tombstone", map[string]interface{}{"abci.account.name": "John"}) + err := s.Subscribe("example-client", query.MustParse("abci.account.name=John"), ch) require.NoError(t, err) + s.PublishWithTags("Tombstone", map[string]interface{}{"abci.account.name": "John"}) assertReceive(t, "Tombstone", ch) } diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 2b1a569c7..9ac260c93 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -15,6 +15,7 @@ package pubsub import ( "errors" + "time" cmn "github.com/tendermint/tmlibs/common" ) @@ -28,8 +29,10 @@ const ( shutdown ) +const subscribeTimeout = 10 * time.Millisecond + var ( - ErrorOverflow = errors.New("Server overflowed") + ErrorOverflow = errors.New("server overflowed") ) type cmd struct { @@ -51,7 +54,8 @@ type Query interface { type Server struct { cmn.BaseService - cmds chan cmd + cmds chan cmd + cmdsCap int } // Option sets a parameter for the server. @@ -68,9 +72,8 @@ func NewServer(options ...Option) *Server { option(s) } - if s.cmds == nil { // if BufferCapacity was not set, create unbuffered channel - s.cmds = make(chan cmd) - } + // if BufferCapacity option was not set, the channel is unbuffered + s.cmds = make(chan cmd, s.cmdsCap) return s } @@ -82,40 +85,49 @@ func NewServer(options ...Option) *Server { func BufferCapacity(cap int) Option { return func(s *Server) { if cap > 0 { - s.cmds = make(chan cmd, cap) + s.cmdsCap = cap } } } +// Returns capacity of the internal server's queue. +func (s Server) BufferCapacity() int { + return s.cmdsCap +} + // Subscribe returns a channel on which messages matching the given query can // be received. If the subscription already exists old channel will be closed -// and new one returned. -func (s *Server) Subscribe(clientID string, query Query, out chan<- interface{}) { - s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out} +// and new one returned. Error will be returned to the caller if the server is +// overflowed. +func (s *Server) Subscribe(clientID string, query Query, out chan<- interface{}) error { + select { + case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}: + return nil + case <-time.After(subscribeTimeout): + return ErrorOverflow + } } -// Unsubscribe unsubscribes the given client from the query. +// Unsubscribe unsubscribes the given client from the query. Blocking. func (s *Server) Unsubscribe(clientID string, query Query) { s.cmds <- cmd{op: unsub, clientID: clientID, query: query} } -// Unsubscribe unsubscribes the given channel. +// Unsubscribe unsubscribes the given channel. Blocking. func (s *Server) UnsubscribeAll(clientID string) { s.cmds <- cmd{op: unsub, clientID: clientID} } -// Publish publishes the given message. -func (s *Server) Publish(msg interface{}) error { - return s.PublishWithTags(msg, make(map[string]interface{})) +// Publish publishes the given message. Blocking. +func (s *Server) Publish(msg interface{}) { + s.PublishWithTags(msg, make(map[string]interface{})) } // PublishWithTags publishes the given message with a set of tags. This set of // tags will be matched with client queries. If there is a match, the message -// will be sent to a client. -func (s *Server) PublishWithTags(msg interface{}, tags map[string]interface{}) error { - pubCmd := cmd{op: pub, msg: msg, tags: tags} - s.cmds <- pubCmd - return nil +// will be sent to a client. Blocking. +func (s *Server) PublishWithTags(msg interface{}, tags map[string]interface{}) { + s.cmds <- cmd{op: pub, msg: msg, tags: tags} } // OnStop implements Service.OnStop by shutting down the server. diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 7cc4e5998..fb15b3489 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -25,13 +25,12 @@ func TestSubscribe(t *testing.T) { defer s.Stop() ch := make(chan interface{}, 1) - s.Subscribe(clientID, query.Empty{}, ch) - err := s.Publish("Ka-Zar") + err := s.Subscribe(clientID, query.Empty{}, ch) require.NoError(t, err) + s.Publish("Ka-Zar") assertReceive(t, "Ka-Zar", ch) - err = s.Publish("Quicksilver") - require.NoError(t, err) + s.Publish("Quicksilver") assertReceive(t, "Quicksilver", ch) } @@ -41,22 +40,22 @@ func TestDifferentClients(t *testing.T) { s.Start() defer s.Stop() ch1 := make(chan interface{}, 1) - s.Subscribe("client-1", query.MustParse("tm.events.type=NewBlock"), ch1) - err := s.PublishWithTags("Iceman", map[string]interface{}{"tm.events.type": "NewBlock"}) + err := s.Subscribe("client-1", query.MustParse("tm.events.type=NewBlock"), ch1) require.NoError(t, err) + s.PublishWithTags("Iceman", map[string]interface{}{"tm.events.type": "NewBlock"}) assertReceive(t, "Iceman", ch1) ch2 := make(chan interface{}, 1) - s.Subscribe("client-2", query.MustParse("tm.events.type=NewBlock AND abci.account.name=Igor"), ch2) - err = s.PublishWithTags("Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}) + err = s.Subscribe("client-2", query.MustParse("tm.events.type=NewBlock AND abci.account.name=Igor"), ch2) require.NoError(t, err) + s.PublishWithTags("Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}) assertReceive(t, "Ultimo", ch1) assertReceive(t, "Ultimo", ch2) ch3 := make(chan interface{}, 1) - s.Subscribe("client-3", query.MustParse("tm.events.type=NewRoundStep AND abci.account.name=Igor AND abci.invoice.number = 10"), ch3) - err = s.PublishWithTags("Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"}) + err = s.Subscribe("client-3", query.MustParse("tm.events.type=NewRoundStep AND abci.account.name=Igor AND abci.invoice.number = 10"), ch3) require.NoError(t, err) + s.PublishWithTags("Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"}) assert.Zero(t, len(ch3)) } @@ -69,19 +68,19 @@ func TestClientResubscribes(t *testing.T) { q := query.MustParse("tm.events.type=NewBlock") ch1 := make(chan interface{}, 1) - s.Subscribe(clientID, q, ch1) - err := s.PublishWithTags("Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"}) + err := s.Subscribe(clientID, q, ch1) require.NoError(t, err) + s.PublishWithTags("Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"}) assertReceive(t, "Goblin Queen", ch1) ch2 := make(chan interface{}, 1) - s.Subscribe(clientID, q, ch2) + err = s.Subscribe(clientID, q, ch2) + require.NoError(t, err) _, ok := <-ch1 assert.False(t, ok) - err = s.PublishWithTags("Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"}) - require.NoError(t, err) + s.PublishWithTags("Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"}) assertReceive(t, "Spider-Man", ch2) } @@ -92,11 +91,11 @@ func TestUnsubscribe(t *testing.T) { defer s.Stop() ch := make(chan interface{}) - s.Subscribe(clientID, query.Empty{}, ch) + err := s.Subscribe(clientID, query.Empty{}, ch) + require.NoError(t, err) s.Unsubscribe(clientID, query.Empty{}) - err := s.Publish("Nick Fury") - require.NoError(t, err) + s.Publish("Nick Fury") assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe") _, ok := <-ch @@ -110,13 +109,14 @@ func TestUnsubscribeAll(t *testing.T) { defer s.Stop() ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1) - s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlock"), ch1) - s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlockHeader"), ch2) + err := s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlock"), ch1) + require.NoError(t, err) + err = s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlockHeader"), ch2) + require.NoError(t, err) s.UnsubscribeAll(clientID) - err := s.Publish("Nick Fury") - require.NoError(t, err) + s.Publish("Nick Fury") assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll") assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll") @@ -130,10 +130,19 @@ func TestBufferCapacity(t *testing.T) { s := pubsub.NewServer(pubsub.BufferCapacity(2)) s.SetLogger(log.TestingLogger()) - err := s.Publish("Nighthawk") - require.NoError(t, err) - err = s.Publish("Sage") - require.NoError(t, err) + s.Publish("Nighthawk") + s.Publish("Sage") +} + +func TestSubscribeReturnsErrorIfServerOverflowed(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + + ch := make(chan interface{}, 1) + err := s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlock"), ch) + if assert.Error(t, err) { + assert.Equal(t, pubsub.ErrorOverflow, err) + } } func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) }