diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index 68d1ec941..1ae111f63 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -1,14 +1,12 @@ -// Package pubsub implements a pub-sub model with a single publisher (Server) -// and multiple subscribers (clients). +// Package pubsub implements an event dispatching server with a single publisher +// and multiple subscriber clients. Multiple goroutines can safely publish to a +// single Server instance. // -// Though you can have multiple publishers by sharing a pointer to a server or -// by giving the same channel to each publisher and publishing messages from -// that channel (fan-in). -// -// Clients subscribe for messages, which could be of any type, using a query. -// When some message is published, we match it with all queries. If there is a -// match, this message will be pushed to all clients, subscribed to that query. -// See query subpackage for our implementation. +// Clients register subscriptions with a query to select which messages they +// wish to receive. When messages are published, they are broadcast to all +// clients whose subscription query matches that message. Queries are +// constructed using the github.com/tendermint/tendermint/libs/pubsub/query +// package. // // Example: // @@ -16,7 +14,7 @@ // if err != nil { // return err // } -// ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Second) +// ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) // defer cancel() // subscription, err := pubsub.Subscribe(ctx, "johns-transactions", q) // if err != nil { @@ -38,22 +36,12 @@ import ( "context" "errors" "fmt" + "sync" "github.com/tendermint/tendermint/abci/types" - tmsync "github.com/tendermint/tendermint/internal/libs/sync" - "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/libs/service" ) -type operation int - -const ( - sub operation = iota - pub - unsub - shutdown -) - var ( // ErrSubscriptionNotFound is returned when a client tries to unsubscribe // from not existing subscription. @@ -62,6 +50,10 @@ var ( // ErrAlreadySubscribed is returned when a client tries to subscribe twice or // more using the same query. ErrAlreadySubscribed = errors.New("already subscribed") + + // ErrServerStopped is returned when attempting to publish or subscribe to a + // server that has been stopped. + ErrServerStopped = errors.New("pubsub server is stopped") ) // Query defines an interface for a query to be used for subscribing. A query @@ -75,17 +67,21 @@ type Query interface { String() string } +// UnsubscribeArgs are the parameters to remove a subscription. +// The subscriber ID must be populated, and at least one of the client ID or +// the registered query. type UnsubscribeArgs struct { - ID string - Subscriber string - Query Query + Subscriber string // subscriber ID chosen by the client (required) + ID string // subscription ID (assigned by the server) + Query Query // the query registered with the subscription } +// Validate returns nil if args are valid to identify a subscription to remove. +// Otherwise, it reports an error. func (args UnsubscribeArgs) Validate() error { if args.Subscriber == "" { return errors.New("must specify a subscriber") } - if args.ID == "" && args.Query == nil { return fmt.Errorf("subscription is not fully defined [subscriber=%q]", args.Subscriber) } @@ -93,35 +89,28 @@ func (args UnsubscribeArgs) Validate() error { return nil } -type cmd struct { - op operation - - // subscribe, unsubscribe - query Query - subscription *Subscription - clientID string - - // publish - msg interface{} - events []types.Event -} - // Server allows clients to subscribe/unsubscribe for messages, publishing // messages with or without events, and manages internal state. type Server struct { service.BaseService - cmds chan cmd - cmdsCap int - - // check if we have subscription before - // subscribing or unsubscribing - mtx tmsync.RWMutex + queue chan item + done <-chan struct{} // closed when server should exit + stop func() // signal the server to exit + pubs sync.RWMutex // excl: shutdown; shared: active publisher + exited chan struct{} // server exited + + // All subscriptions currently known. + // Lock exclusive to add, remove, or cancel subscriptions. + // Lock shared to look up or publish to subscriptions. + subs struct { + sync.RWMutex + index *subIndex + } - // subscriber -> [query->id (string) OR id->query (string))], - // track connections both by ID (new) and query (legacy) to - // avoid breaking the interface. - subscriptions map[string]map[string]string + // TODO(creachadair): Rework the options so that this does not need to live + // as a field. It is not otherwise needed. + queueCap int } // Option sets a parameter for the server. @@ -131,37 +120,34 @@ type Option func(*Server) // for a detailed description of how to configure buffering. If no options are // provided, the resulting server's queue is unbuffered. func NewServer(options ...Option) *Server { - s := &Server{ - subscriptions: make(map[string]map[string]string), + s := new(Server) + for _, opt := range options { + opt(s) } s.BaseService = *service.NewBaseService(nil, "PubSub", s) - for _, option := range options { - option(s) - } + // The queue receives items to be published. + s.queue = make(chan item, s.queueCap) - // if BufferCapacity option was not set, the channel is unbuffered - s.cmds = make(chan cmd, s.cmdsCap) + // The index tracks subscriptions by ID and query terms. + s.subs.index = newSubIndex() return s } -// BufferCapacity allows you to specify capacity for the internal server's -// queue. Since the server, given Y subscribers, could only process X messages, -// this option could be used to survive spikes (e.g. high amount of -// transactions during peak hours). +// BufferCapacity allows you to specify capacity for publisher's queue. This +// is the number of messages that can be published without blocking. If no +// buffer is specified, publishing is synchronous with delivery. This function +// will panic if cap < 0. func BufferCapacity(cap int) Option { - return func(s *Server) { - if cap > 0 { - s.cmdsCap = cap - } + if cap < 0 { + panic("negative buffer capacity") } + return func(s *Server) { s.queueCap = cap } } -// BufferCapacity returns capacity of the internal server's queue. -func (s *Server) BufferCapacity() int { - return s.cmdsCap -} +// BufferCapacity returns capacity of the publication queue. +func (s *Server) BufferCapacity() int { return cap(s.queue) } // Subscribe creates a subscription for the given client. // @@ -195,331 +181,223 @@ func (s *Server) SubscribeUnbuffered(ctx context.Context, clientID string, query } func (s *Server) subscribe(ctx context.Context, clientID string, query Query, outCapacity int) (*Subscription, error) { - s.mtx.RLock() - clientSubscriptions, ok := s.subscriptions[clientID] - if ok { - _, ok = clientSubscriptions[query.String()] - } - s.mtx.RUnlock() - if ok { + s.subs.Lock() + defer s.subs.Unlock() + + if s.subs.index == nil { + return nil, ErrServerStopped + } else if s.subs.index.contains(clientID, query.String()) { return nil, ErrAlreadySubscribed } - subscription := NewSubscription(outCapacity) - select { - case s.cmds <- cmd{op: sub, clientID: clientID, query: query, subscription: subscription}: - s.mtx.Lock() - if _, ok = s.subscriptions[clientID]; !ok { - s.subscriptions[clientID] = make(map[string]string) - } - s.subscriptions[clientID][query.String()] = subscription.id - s.subscriptions[clientID][subscription.id] = query.String() - s.mtx.Unlock() - return subscription, nil - case <-ctx.Done(): - return nil, ctx.Err() - case <-s.Quit(): - return nil, nil - } + sub := NewSubscription(outCapacity) + s.subs.index.add(&subInfo{ + clientID: clientID, + query: query, + subID: sub.id, + sub: sub, + }) + return sub, nil } -// Unsubscribe removes the subscription on the given query. An error will be -// returned to the caller if the context is canceled or if subscription does -// not exist. +// Unsubscribe removes the subscription for the given client and/or query. It +// returns ErrSubscriptionNotFound if no such subscription exists. func (s *Server) Unsubscribe(ctx context.Context, args UnsubscribeArgs) error { if err := args.Validate(); err != nil { return err } - var qs string - - if args.Query != nil { - qs = args.Query.String() + s.subs.Lock() + defer s.subs.Unlock() + if s.subs.index == nil { + return ErrServerStopped } - clientSubscriptions, err := func() (map[string]string, error) { - s.mtx.RLock() - defer s.mtx.RUnlock() - - clientSubscriptions, ok := s.subscriptions[args.Subscriber] - if args.ID != "" { - qs, ok = clientSubscriptions[args.ID] - - if ok && args.Query == nil { - var err error - args.Query, err = query.New(qs) - if err != nil { - return nil, err - } - } - } else if qs != "" { - args.ID, ok = clientSubscriptions[qs] - } - - if !ok { - return nil, ErrSubscriptionNotFound + // TODO(creachadair): Do we need to support unsubscription for an "empty" + // query? I believe that case is not possible by the Query grammar, but we + // should make sure. + // + // Revisit this logic once we are able to remove indexing by query. + + var evict subInfoSet + if args.Subscriber != "" { + evict = s.subs.index.findClientID(args.Subscriber) + if args.Query != nil { + evict = evict.withQuery(args.Query.String()) } - - return clientSubscriptions, nil - }() - - if err != nil { - return err + } else { + evict = s.subs.index.findQuery(args.Query.String()) } - select { - case s.cmds <- cmd{op: unsub, clientID: args.Subscriber, query: args.Query, subscription: &Subscription{id: args.ID}}: - s.mtx.Lock() - defer s.mtx.Unlock() - - delete(clientSubscriptions, args.ID) - delete(clientSubscriptions, qs) - - if len(clientSubscriptions) == 0 { - delete(s.subscriptions, args.Subscriber) - } - return nil - case <-ctx.Done(): - return ctx.Err() - case <-s.Quit(): - return nil + if len(evict) == 0 { + return ErrSubscriptionNotFound } + s.removeSubs(evict, ErrUnsubscribed) + return nil } -// UnsubscribeAll removes all client subscriptions. An error will be returned -// to the caller if the context is canceled or if subscription does not exist. +// UnsubscribeAll removes all subscriptions for the given client ID. +// It returns ErrSubscriptionNotFound if no subscriptions exist for that client. func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error { - s.mtx.RLock() - _, ok := s.subscriptions[clientID] - s.mtx.RUnlock() - if !ok { - return ErrSubscriptionNotFound - } + s.subs.Lock() + defer s.subs.Unlock() - select { - case s.cmds <- cmd{op: unsub, clientID: clientID}: - s.mtx.Lock() - defer s.mtx.Unlock() - - delete(s.subscriptions, clientID) - - return nil - case <-ctx.Done(): - return ctx.Err() - case <-s.Quit(): - return nil + evict := s.subs.index.findClientID(clientID) + if len(evict) == 0 { + return ErrSubscriptionNotFound } + s.removeSubs(evict, ErrUnsubscribed) + return nil } // NumClients returns the number of clients. func (s *Server) NumClients() int { - s.mtx.RLock() - defer s.mtx.RUnlock() - return len(s.subscriptions) + s.subs.RLock() + defer s.subs.RUnlock() + return len(s.subs.index.byClient) } // NumClientSubscriptions returns the number of subscriptions the client has. func (s *Server) NumClientSubscriptions(clientID string) int { - s.mtx.RLock() - defer s.mtx.RUnlock() - return len(s.subscriptions[clientID]) / 2 + s.subs.RLock() + defer s.subs.RUnlock() + return len(s.subs.index.findClientID(clientID)) } // Publish publishes the given message. An error will be returned to the caller // if the context is canceled. func (s *Server) Publish(ctx context.Context, msg interface{}) error { - return s.PublishWithEvents(ctx, msg, []types.Event{}) + return s.publish(ctx, msg, []types.Event{}) } // PublishWithEvents publishes the given message with the set of events. The set // is matched with clients queries. If there is a match, the message is sent to // the client. func (s *Server) PublishWithEvents(ctx context.Context, msg interface{}, events []types.Event) error { - select { - case s.cmds <- cmd{op: pub, msg: msg, events: events}: - return nil - case <-ctx.Done(): - return ctx.Err() - case <-s.Quit(): - return nil - } + return s.publish(ctx, msg, events) } // OnStop implements Service.OnStop by shutting down the server. -func (s *Server) OnStop() { - s.cmds <- cmd{op: shutdown} -} - -// NOTE: not goroutine safe -type state struct { - // query string -> client -> subscription - subscriptions map[string]map[string]*Subscription - // query string -> queryPlusRefCount - queries map[string]*queryPlusRefCount -} +func (s *Server) OnStop() { s.stop() } -// queryPlusRefCount holds a pointer to a query and reference counter. When -// refCount is zero, query will be removed. -type queryPlusRefCount struct { - q Query - refCount int -} +// Wait implements Service.Wait by blocking until the server has exited, then +// yielding to the base service wait. +func (s *Server) Wait() { <-s.exited; s.BaseService.Wait() } // OnStart implements Service.OnStart by starting the server. -func (s *Server) OnStart() error { - go s.loop(state{ - subscriptions: make(map[string]map[string]*Subscription), - queries: make(map[string]*queryPlusRefCount), - }) - return nil -} +func (s *Server) OnStart() error { s.run(); return nil } -// OnReset implements Service.OnReset -func (s *Server) OnReset() error { - return nil -} +// OnReset implements Service.OnReset. It has no effect for this service. +func (s *Server) OnReset() error { return nil } -func (s *Server) loop(state state) { -loop: - for cmd := range s.cmds { - switch cmd.op { - case unsub: - if cmd.query != nil { - state.remove(cmd.clientID, cmd.query.String(), cmd.subscription.id, ErrUnsubscribed) - } else { - state.removeClient(cmd.clientID, ErrUnsubscribed) - } - case shutdown: - state.removeAll(nil) - break loop - case sub: - state.add(cmd.clientID, cmd.query, cmd.subscription) - case pub: - if err := state.send(cmd.msg, cmd.events); err != nil { - s.Logger.Error("Error querying for events", "err", err) - } - } - } -} - -func (state *state) add(clientID string, q Query, subscription *Subscription) { - qStr := q.String() +func (s *Server) publish(ctx context.Context, data interface{}, events []types.Event) error { + s.pubs.RLock() + defer s.pubs.RUnlock() - // initialize subscription for this client per query if needed - if _, ok := state.subscriptions[qStr]; !ok { - state.subscriptions[qStr] = make(map[string]*Subscription) - } - - if _, ok := state.subscriptions[subscription.id]; !ok { - state.subscriptions[subscription.id] = make(map[string]*Subscription) - } - - // create subscription - state.subscriptions[qStr][clientID] = subscription - state.subscriptions[subscription.id][clientID] = subscription - - // initialize query if needed - if _, ok := state.queries[qStr]; !ok { - state.queries[qStr] = &queryPlusRefCount{q: q, refCount: 0} + select { + case <-s.done: + return ErrServerStopped + case <-ctx.Done(): + return ctx.Err() + case s.queue <- item{ + Data: data, + Events: events, + }: + return nil } - // increment reference counter - state.queries[qStr].refCount++ } -func (state *state) remove(clientID string, qStr, id string, reason error) { - clientSubscriptions, ok := state.subscriptions[qStr] - if !ok { - return - } - - subscription, ok := clientSubscriptions[clientID] - if !ok { - return - } +func (s *Server) run() { + // The server runs until ctx is canceled. + ctx, cancel := context.WithCancel(context.Background()) + s.done = ctx.Done() + s.stop = cancel + + // Shutdown monitor: When the context ends, wait for any active publish + // calls to exit, then close the queue to signal the sender to exit. + go func() { + <-ctx.Done() + s.pubs.Lock() + defer s.pubs.Unlock() + close(s.queue) + }() - subscription.cancel(reason) + s.exited = make(chan struct{}) + go func() { + defer close(s.exited) - // remove client from query map. - // if query has no other clients subscribed, remove it. - delete(state.subscriptions[qStr], clientID) - delete(state.subscriptions[id], clientID) - if len(state.subscriptions[qStr]) == 0 { - delete(state.subscriptions, qStr) - } - - // decrease ref counter in queries - if ref, ok := state.queries[qStr]; ok { - ref.refCount-- - if ref.refCount == 0 { - // remove the query if nobody else is using it - delete(state.queries, qStr) + // Sender: Service the queue and forward messages to subscribers. + for it := range s.queue { + if err := s.send(it.Data, it.Events); err != nil { + s.Logger.Error("Error sending event", "err", err) + } } - } + // Terminate all subscribers without error before exit. + s.subs.Lock() + defer s.subs.Unlock() + for si := range s.subs.index.all { + si.sub.cancel(nil) + } + s.subs.index = nil + }() } -func (state *state) removeClient(clientID string, reason error) { - seen := map[string]struct{}{} - for qStr, clientSubscriptions := range state.subscriptions { - if sub, ok := clientSubscriptions[clientID]; ok { - if _, ok = seen[sub.id]; ok { - // all subscriptions are double indexed by ID and query, only - // process them once. - continue - } - state.remove(clientID, qStr, sub.id, reason) - seen[sub.id] = struct{}{} - } +// removeSubs cancels and removes all the subscriptions in evict with the given +// error. The caller must hold the s.subs lock. +func (s *Server) removeSubs(evict subInfoSet, reason error) { + for si := range evict { + si.sub.cancel(reason) } + s.subs.index.removeAll(evict) } -func (state *state) removeAll(reason error) { - for qStr, clientSubscriptions := range state.subscriptions { - sub, ok := clientSubscriptions[qStr] - if !ok || ok && sub.id == qStr { - // all subscriptions are double indexed by ID and query, only - // process them once. - continue +// send delivers the given message to all matching subscribers. An error in +// query matching stops transmission and is returned. +func (s *Server) send(data interface{}, events []types.Event) error { + // At exit, evict any subscriptions that were too slow. + evict := make(subInfoSet) + defer func() { + if len(evict) != 0 { + s.subs.Lock() + defer s.subs.Unlock() + s.removeSubs(evict, ErrOutOfCapacity) } + }() - for clientID := range clientSubscriptions { - state.remove(clientID, qStr, sub.id, reason) - } - } -} + // N.B. Order is important here. We must acquire and defer the lock release + // AFTER deferring the eviction cleanup: The cleanup must happen after the + // reader lock has released, or it will deadlock. + s.subs.RLock() + defer s.subs.RUnlock() -func (state *state) send(msg interface{}, events []types.Event) error { - for qStr, clientSubscriptions := range state.subscriptions { - if sub, ok := clientSubscriptions[qStr]; ok && sub.id == qStr { - continue - } - var q Query - if qi, ok := state.queries[qStr]; ok { - q = qi.q - } else { + for si := range s.subs.index.all { + match, err := si.query.Matches(events) + if err != nil { + return fmt.Errorf("match failed against query: %w", err) + // TODO(creachadair): Should we evict this subscription? + } else if !match { continue } - match, err := q.Matches(events) - if err != nil { - return fmt.Errorf("failed to match against query %s: %w", q.String(), err) + // Subscriptions may be buffered or unbuffered. Unbuffered subscriptions + // are intended for internal use such as indexing, where we don't want to + // penalize a slow reader. Buffered subscribers must keep up with their + // queue, or they will be terminated. + // + // TODO(creachadair): Unbuffered subscriptions used by the event indexer + // to avoid losing events if it happens to be slow. Rework this so that + // use case doesn't require this affordance, and then remove unbuffered + // subscriptions. + msg := NewMessage(si.sub.id, data, events) + if cap(si.sub.out) == 0 { + si.sub.out <- msg + continue } - - if match { - for clientID, subscription := range clientSubscriptions { - if cap(subscription.out) == 0 { - // block on unbuffered channel - select { - case subscription.out <- NewMessage(subscription.id, msg, events): - case <-subscription.canceled: - } - } else { - // don't block on buffered channels - select { - case subscription.out <- NewMessage(subscription.id, msg, events): - default: - state.remove(clientID, qStr, subscription.id, ErrOutOfCapacity) - } - } - } + select { + case si.sub.out <- msg: + // ok, delivered + default: + // slow subscriber, cancel them + evict.add(si) } } diff --git a/libs/pubsub/subindex.go b/libs/pubsub/subindex.go new file mode 100644 index 000000000..48dccf72d --- /dev/null +++ b/libs/pubsub/subindex.go @@ -0,0 +1,113 @@ +package pubsub + +import "github.com/tendermint/tendermint/abci/types" + +// An item to be published to subscribers. +type item struct { + Data interface{} + Events []types.Event +} + +// A subInfo value records a single subscription. +type subInfo struct { + clientID string // chosen by the client + query Query // chosen by the client + subID string // assigned at registration + sub *Subscription // receives published events +} + +// A subInfoSet is an unordered set of subscription info records. +type subInfoSet map[*subInfo]struct{} + +func (s subInfoSet) contains(si *subInfo) bool { _, ok := s[si]; return ok } +func (s subInfoSet) add(si *subInfo) { s[si] = struct{}{} } +func (s subInfoSet) remove(si *subInfo) { delete(s, si) } + +// withQuery returns the subset of s whose query string matches qs. +func (s subInfoSet) withQuery(qs string) subInfoSet { + out := make(subInfoSet) + for si := range s { + if si.query.String() == qs { + out.add(si) + } + } + return out +} + +// A subIndex is an indexed collection of subscription info records. +// The index is not safe for concurrent use without external synchronization. +type subIndex struct { + all subInfoSet // all subscriptions + byClient map[string]subInfoSet // per-client subscriptions + byQuery map[string]subInfoSet // per-query subscriptions + + // TODO(creachadair): We allow indexing by query to support existing use by + // the RPC service methods for event streaming. Fix up those methods not to + // require this, and then remove indexing by query. +} + +// newSubIndex constructs a new, empty subscription index. +func newSubIndex() *subIndex { + return &subIndex{ + all: make(subInfoSet), + byClient: make(map[string]subInfoSet), + byQuery: make(map[string]subInfoSet), + } +} + +// findClients returns the set of subscriptions for the given client ID, or nil. +func (idx *subIndex) findClientID(id string) subInfoSet { return idx.byClient[id] } + +// findQuery returns the set of subscriptions on the given query string, or nil. +func (idx *subIndex) findQuery(qs string) subInfoSet { return idx.byQuery[qs] } + +// contains reports whether idx contains any subscription matching the given +// client ID and query pair. +func (idx *subIndex) contains(clientID, query string) bool { + csubs, qsubs := idx.byClient[clientID], idx.byQuery[query] + if len(csubs) == 0 || len(qsubs) == 0 { + return false + } + for si := range csubs { + if qsubs.contains(si) { + return true + } + } + return false +} + +// add adds si to the index, replacing any previous entry with the same terms. +// It is the caller's responsibility to check for duplicates before adding. +// See also the contains method. +func (idx *subIndex) add(si *subInfo) { + idx.all.add(si) + if m := idx.byClient[si.clientID]; m == nil { + idx.byClient[si.clientID] = subInfoSet{si: struct{}{}} + } else { + m.add(si) + } + qs := si.query.String() + if m := idx.byQuery[qs]; m == nil { + idx.byQuery[qs] = subInfoSet{si: struct{}{}} + } else { + m.add(si) + } +} + +// removeAll removes all the elements of s from the index. +func (idx *subIndex) removeAll(s subInfoSet) { + for si := range s { + idx.all.remove(si) + idx.byClient[si.clientID].remove(si) + if len(idx.byClient[si.clientID]) == 0 { + delete(idx.byClient, si.clientID) + } + if si.query != nil { + qs := si.query.String() + idx.byQuery[qs].remove(si) + if len(idx.byQuery[qs]) == 0 { + delete(idx.byQuery, qs) + } + } + } +} diff --git a/libs/pubsub/subscription.go b/libs/pubsub/subscription.go index 40b84711e..4210416b6 100644 --- a/libs/pubsub/subscription.go +++ b/libs/pubsub/subscription.go @@ -44,9 +44,7 @@ func NewSubscription(outCapacity int) *Subscription { // Out returns a channel onto which messages and events are published. // Unsubscribe/UnsubscribeAll does not close the channel to avoid clients from // receiving a nil message. -func (s *Subscription) Out() <-chan Message { - return s.out -} +func (s *Subscription) Out() <-chan Message { return s.out } func (s *Subscription) ID() string { return s.id }