|
|
@ -40,6 +40,7 @@ import ( |
|
|
|
"fmt" |
|
|
|
|
|
|
|
tmsync "github.com/tendermint/tendermint/internal/libs/sync" |
|
|
|
"github.com/tendermint/tendermint/libs/pubsub/query" |
|
|
|
"github.com/tendermint/tendermint/libs/service" |
|
|
|
) |
|
|
|
|
|
|
@ -73,6 +74,24 @@ type Query interface { |
|
|
|
String() string |
|
|
|
} |
|
|
|
|
|
|
|
type UnsubscribeArgs struct { |
|
|
|
ID string |
|
|
|
Subscriber string |
|
|
|
Query Query |
|
|
|
} |
|
|
|
|
|
|
|
func (args UnsubscribeArgs) Validate() error { |
|
|
|
if args.Subscriber == "" { |
|
|
|
return errors.New("must specify a subscriber") |
|
|
|
} |
|
|
|
|
|
|
|
if args.ID == "" && args.Query == nil { |
|
|
|
return fmt.Errorf("subscription is not fully defined [subscriber=%q]", args.Subscriber) |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
type cmd struct { |
|
|
|
op operation |
|
|
|
|
|
|
@ -96,8 +115,12 @@ type Server struct { |
|
|
|
|
|
|
|
// check if we have subscription before
|
|
|
|
// subscribing or unsubscribing
|
|
|
|
mtx tmsync.RWMutex |
|
|
|
subscriptions map[string]map[string]struct{} // subscriber -> query (string) -> empty struct
|
|
|
|
mtx tmsync.RWMutex |
|
|
|
|
|
|
|
// subscriber -> [query->id (string) OR id->query (string))],
|
|
|
|
// track connections both by ID (new) and query (legacy) to
|
|
|
|
// avoid breaking the interface.
|
|
|
|
subscriptions map[string]map[string]string |
|
|
|
} |
|
|
|
|
|
|
|
// Option sets a parameter for the server.
|
|
|
@ -108,7 +131,7 @@ type Option func(*Server) |
|
|
|
// provided, the resulting server's queue is unbuffered.
|
|
|
|
func NewServer(options ...Option) *Server { |
|
|
|
s := &Server{ |
|
|
|
subscriptions: make(map[string]map[string]struct{}), |
|
|
|
subscriptions: make(map[string]map[string]string), |
|
|
|
} |
|
|
|
s.BaseService = *service.NewBaseService(nil, "PubSub", s) |
|
|
|
|
|
|
@ -186,9 +209,10 @@ func (s *Server) subscribe(ctx context.Context, clientID string, query Query, ou |
|
|
|
case s.cmds <- cmd{op: sub, clientID: clientID, query: query, subscription: subscription}: |
|
|
|
s.mtx.Lock() |
|
|
|
if _, ok = s.subscriptions[clientID]; !ok { |
|
|
|
s.subscriptions[clientID] = make(map[string]struct{}) |
|
|
|
s.subscriptions[clientID] = make(map[string]string) |
|
|
|
} |
|
|
|
s.subscriptions[clientID][query.String()] = struct{}{} |
|
|
|
s.subscriptions[clientID][query.String()] = subscription.id |
|
|
|
s.subscriptions[clientID][subscription.id] = query.String() |
|
|
|
s.mtx.Unlock() |
|
|
|
return subscription, nil |
|
|
|
case <-ctx.Done(): |
|
|
@ -201,23 +225,45 @@ func (s *Server) subscribe(ctx context.Context, clientID string, query Query, ou |
|
|
|
// Unsubscribe removes the subscription on the given query. An error will be
|
|
|
|
// returned to the caller if the context is canceled or if subscription does
|
|
|
|
// not exist.
|
|
|
|
func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error { |
|
|
|
func (s *Server) Unsubscribe(ctx context.Context, args UnsubscribeArgs) error { |
|
|
|
if err := args.Validate(); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
var qs string |
|
|
|
if args.Query != nil { |
|
|
|
qs = args.Query.String() |
|
|
|
} |
|
|
|
|
|
|
|
s.mtx.RLock() |
|
|
|
clientSubscriptions, ok := s.subscriptions[clientID] |
|
|
|
if ok { |
|
|
|
_, ok = clientSubscriptions[query.String()] |
|
|
|
clientSubscriptions, ok := s.subscriptions[args.Subscriber] |
|
|
|
if args.ID != "" { |
|
|
|
qs, ok = clientSubscriptions[args.ID] |
|
|
|
|
|
|
|
if ok && args.Query == nil { |
|
|
|
var err error |
|
|
|
args.Query, err = query.New(qs) |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
} else if qs != "" { |
|
|
|
args.ID, ok = clientSubscriptions[qs] |
|
|
|
} |
|
|
|
|
|
|
|
s.mtx.RUnlock() |
|
|
|
if !ok { |
|
|
|
return ErrSubscriptionNotFound |
|
|
|
} |
|
|
|
|
|
|
|
select { |
|
|
|
case s.cmds <- cmd{op: unsub, clientID: clientID, query: query}: |
|
|
|
case s.cmds <- cmd{op: unsub, clientID: args.Subscriber, query: args.Query, subscription: &Subscription{id: args.ID}}: |
|
|
|
s.mtx.Lock() |
|
|
|
delete(clientSubscriptions, query.String()) |
|
|
|
|
|
|
|
delete(clientSubscriptions, args.ID) |
|
|
|
delete(clientSubscriptions, qs) |
|
|
|
|
|
|
|
if len(clientSubscriptions) == 0 { |
|
|
|
delete(s.subscriptions, clientID) |
|
|
|
delete(s.subscriptions, args.Subscriber) |
|
|
|
} |
|
|
|
s.mtx.Unlock() |
|
|
|
return nil |
|
|
@ -262,7 +308,7 @@ func (s *Server) NumClients() int { |
|
|
|
func (s *Server) NumClientSubscriptions(clientID string) int { |
|
|
|
s.mtx.RLock() |
|
|
|
defer s.mtx.RUnlock() |
|
|
|
return len(s.subscriptions[clientID]) |
|
|
|
return len(s.subscriptions[clientID]) / 2 |
|
|
|
} |
|
|
|
|
|
|
|
// Publish publishes the given message. An error will be returned to the caller
|
|
|
@ -325,7 +371,7 @@ loop: |
|
|
|
switch cmd.op { |
|
|
|
case unsub: |
|
|
|
if cmd.query != nil { |
|
|
|
state.remove(cmd.clientID, cmd.query.String(), ErrUnsubscribed) |
|
|
|
state.remove(cmd.clientID, cmd.query.String(), cmd.subscription.id, ErrUnsubscribed) |
|
|
|
} else { |
|
|
|
state.removeClient(cmd.clientID, ErrUnsubscribed) |
|
|
|
} |
|
|
@ -349,8 +395,14 @@ func (state *state) add(clientID string, q Query, subscription *Subscription) { |
|
|
|
if _, ok := state.subscriptions[qStr]; !ok { |
|
|
|
state.subscriptions[qStr] = make(map[string]*Subscription) |
|
|
|
} |
|
|
|
|
|
|
|
if _, ok := state.subscriptions[subscription.id]; !ok { |
|
|
|
state.subscriptions[subscription.id] = make(map[string]*Subscription) |
|
|
|
} |
|
|
|
|
|
|
|
// create subscription
|
|
|
|
state.subscriptions[qStr][clientID] = subscription |
|
|
|
state.subscriptions[subscription.id][clientID] = subscription |
|
|
|
|
|
|
|
// initialize query if needed
|
|
|
|
if _, ok := state.queries[qStr]; !ok { |
|
|
@ -360,7 +412,7 @@ func (state *state) add(clientID string, q Query, subscription *Subscription) { |
|
|
|
state.queries[qStr].refCount++ |
|
|
|
} |
|
|
|
|
|
|
|
func (state *state) remove(clientID string, qStr string, reason error) { |
|
|
|
func (state *state) remove(clientID string, qStr, id string, reason error) { |
|
|
|
clientSubscriptions, ok := state.subscriptions[qStr] |
|
|
|
if !ok { |
|
|
|
return |
|
|
@ -376,37 +428,62 @@ func (state *state) remove(clientID string, qStr string, reason error) { |
|
|
|
// remove client from query map.
|
|
|
|
// if query has no other clients subscribed, remove it.
|
|
|
|
delete(state.subscriptions[qStr], clientID) |
|
|
|
delete(state.subscriptions[id], clientID) |
|
|
|
if len(state.subscriptions[qStr]) == 0 { |
|
|
|
delete(state.subscriptions, qStr) |
|
|
|
} |
|
|
|
|
|
|
|
// decrease ref counter in queries
|
|
|
|
state.queries[qStr].refCount-- |
|
|
|
// remove the query if nobody else is using it
|
|
|
|
if state.queries[qStr].refCount == 0 { |
|
|
|
delete(state.queries, qStr) |
|
|
|
if ref, ok := state.queries[qStr]; ok { |
|
|
|
ref.refCount-- |
|
|
|
if ref.refCount == 0 { |
|
|
|
// remove the query if nobody else is using it
|
|
|
|
delete(state.queries, qStr) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (state *state) removeClient(clientID string, reason error) { |
|
|
|
seen := map[string]struct{}{} |
|
|
|
for qStr, clientSubscriptions := range state.subscriptions { |
|
|
|
if _, ok := clientSubscriptions[clientID]; ok { |
|
|
|
state.remove(clientID, qStr, reason) |
|
|
|
if sub, ok := clientSubscriptions[clientID]; ok { |
|
|
|
if _, ok = seen[sub.id]; ok { |
|
|
|
// all subscriptions are double indexed by ID and query, only
|
|
|
|
// process them once.
|
|
|
|
continue |
|
|
|
} |
|
|
|
state.remove(clientID, qStr, sub.id, reason) |
|
|
|
seen[sub.id] = struct{}{} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (state *state) removeAll(reason error) { |
|
|
|
for qStr, clientSubscriptions := range state.subscriptions { |
|
|
|
sub, ok := clientSubscriptions[qStr] |
|
|
|
if !ok || ok && sub.id == qStr { |
|
|
|
// all subscriptions are double indexed by ID and query, only
|
|
|
|
// process them once.
|
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|
for clientID := range clientSubscriptions { |
|
|
|
state.remove(clientID, qStr, reason) |
|
|
|
state.remove(clientID, qStr, sub.id, reason) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (state *state) send(msg interface{}, events map[string][]string) error { |
|
|
|
for qStr, clientSubscriptions := range state.subscriptions { |
|
|
|
q := state.queries[qStr].q |
|
|
|
if sub, ok := clientSubscriptions[qStr]; ok && sub.id == qStr { |
|
|
|
continue |
|
|
|
} |
|
|
|
var q Query |
|
|
|
if qi, ok := state.queries[qStr]; ok { |
|
|
|
q = qi.q |
|
|
|
} else { |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|
match, err := q.Matches(events) |
|
|
|
if err != nil { |
|
|
@ -417,13 +494,13 @@ func (state *state) send(msg interface{}, events map[string][]string) error { |
|
|
|
for clientID, subscription := range clientSubscriptions { |
|
|
|
if cap(subscription.out) == 0 { |
|
|
|
// block on unbuffered channel
|
|
|
|
subscription.out <- NewMessage(msg, events) |
|
|
|
subscription.out <- NewMessage(subscription.id, msg, events) |
|
|
|
} else { |
|
|
|
// don't block on buffered channels
|
|
|
|
select { |
|
|
|
case subscription.out <- NewMessage(msg, events): |
|
|
|
case subscription.out <- NewMessage(subscription.id, msg, events): |
|
|
|
default: |
|
|
|
state.remove(clientID, qStr, ErrOutOfCapacity) |
|
|
|
state.remove(clientID, qStr, subscription.id, ErrOutOfCapacity) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|