Browse Source

updates as per Bucky's comments

pull/1842/head
Anton Kaliaev 7 years ago
parent
commit
17d6091ef4
No known key found for this signature in database GPG Key ID: 7B6881D965918214
2 changed files with 25 additions and 20 deletions
  1. +18
    -20
      pubsub/pubsub.go
  2. +7
    -0
      pubsub/pubsub_test.go

+ 18
- 20
pubsub/pubsub.go View File

@ -9,8 +9,6 @@
// When some message is published, we match it with all queries. If there is a
// match, this message will be pushed to all clients, subscribed to that query.
// See query subpackage for our implementation.
//
// Subscribe/Unsubscribe calls are always blocking.
package pubsub
import (
@ -42,7 +40,7 @@ type Query interface {
Matches(tags map[string]interface{}) bool
}
// Server allows clients to subscribe/unsubscribe for messages, pubsling
// Server allows clients to subscribe/unsubscribe for messages, publishing
// messages with or without tags, and manages internal state.
type Server struct {
cmn.BaseService
@ -83,15 +81,15 @@ func BufferCapacity(cap int) Option {
}
}
// Returns capacity of the internal server's queue.
// BufferCapacity returns capacity of the internal server's queue.
func (s Server) BufferCapacity() int {
return s.cmdsCap
}
// Subscribe returns a channel on which messages matching the given query can
// be received. If the subscription already exists old channel will be closed
// and new one returned. Error will be returned to the caller if the context is
// cancelled.
// Subscribe creates a subscription for the given client. It accepts a channel
// on which messages matching the given query can be received. If the
// subscription already exists, the old channel will be closed. An error will
// be returned to the caller if the context is canceled.
func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error {
select {
case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}:
@ -101,8 +99,8 @@ func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, ou
}
}
// Unsubscribe unsubscribes the given client from the query. Error will be
// returned to the caller if the context is cancelled.
// Unsubscribe removes the subscription on the given query. An error will be
// returned to the caller if the context is canceled.
func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error {
select {
case s.cmds <- cmd{op: unsub, clientID: clientID, query: query}:
@ -112,7 +110,8 @@ func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query)
}
}
// Unsubscribe unsubscribes the given channel. Blocking.
// Unsubscribe removes all client subscriptions. An error will be returned to
// the caller if the context is canceled.
func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
select {
case s.cmds <- cmd{op: unsub, clientID: clientID}:
@ -122,14 +121,15 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
}
}
// Publish publishes the given message. Blocking.
// Publish publishes the given message. An error will be returned to the caller
// if the context is canceled.
func (s *Server) Publish(ctx context.Context, msg interface{}) error {
return s.PublishWithTags(ctx, msg, make(map[string]interface{}))
}
// PublishWithTags publishes the given message with a set of tags. This set of
// tags will be matched with client queries. If there is a match, the message
// will be sent to a client. Blocking.
// PublishWithTags publishes the given message with the set of tags. The set is
// matched with clients queries. If there is a match, the message is sent to
// the client.
func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags map[string]interface{}) error {
select {
case s.cmds <- cmd{op: pub, msg: msg, tags: tags}:
@ -152,7 +152,7 @@ type state struct {
clients map[string]map[Query]struct{}
}
// OnStart implements Service.OnStart by creating a main loop.
// OnStart implements Service.OnStart by starting the server.
func (s *Server) OnStart() error {
go s.loop(state{
queries: make(map[Query]map[string]chan<- interface{}),
@ -194,6 +194,8 @@ func (state *state) add(clientID string, q Query, ch chan<- interface{}) {
close(oldCh)
}
}
// create subscription
state.queries[q][clientID] = ch
// add client if needed
@ -201,10 +203,6 @@ func (state *state) add(clientID string, q Query, ch chan<- interface{}) {
state.clients[clientID] = make(map[Query]struct{})
}
state.clients[clientID][q] = struct{}{}
// create subscription
clientToChannelMap := state.queries[q]
clientToChannelMap[clientID] = ch
}
func (state *state) remove(clientID string, q Query) {


+ 7
- 0
pubsub/pubsub_test.go View File

@ -153,6 +153,13 @@ func TestBufferCapacity(t *testing.T) {
require.NoError(t, err)
err = s.Publish(ctx, "Sage")
require.NoError(t, err)
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
err = s.Publish(ctx, "Ironclad")
if assert.Error(t, err) {
assert.Equal(t, context.DeadlineExceeded, err)
}
}
func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) }


Loading…
Cancel
Save