- // Package pubsub implements an event dispatching server with a single publisher
- // and multiple subscriber clients. Multiple goroutines can safely publish to a
- // single Server instance.
- //
- // Clients register subscriptions with a query to select which messages they
- // wish to receive. When messages are published, they are broadcast to all
- // clients whose subscription query matches that message. Queries are
- // constructed using the github.com/tendermint/tendermint/libs/pubsub/query
- // package.
- //
- // Example:
- //
- // q, err := query.New("account.name='John'")
- // if err != nil {
- // return err
- // }
- // ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
- // defer cancel()
- // subscription, err := pubsub.Subscribe(ctx, "johns-transactions", q)
- // if err != nil {
- // return err
- // }
- //
- // for {
- // select {
- // case msg := <-subscription.Out():
- // // handle msg.Data() and msg.Events()
- // case <-subscription.Canceled():
- // return subscription.Err()
- // }
- // }
- //
- package pubsub
-
- import (
- "context"
- "errors"
- "fmt"
- "sync"
-
- "github.com/tendermint/tendermint/abci/types"
- "github.com/tendermint/tendermint/libs/service"
- )
-
// Sentinel errors reported by the Server's subscription and publishing
// methods.
var (
	// ErrSubscriptionNotFound is returned when a client tries to unsubscribe
	// from a subscription that does not exist.
	ErrSubscriptionNotFound = errors.New("subscription not found")

	// ErrAlreadySubscribed is returned when a client tries to subscribe more
	// than once using the same query.
	ErrAlreadySubscribed = errors.New("already subscribed")

	// ErrServerStopped is returned when attempting to publish or subscribe to
	// a server that has been stopped.
	ErrServerStopped = errors.New("pubsub server is stopped")
)
-
- // Query defines an interface for a query to be used for subscribing. A query
- // matches against a map of events. Each key in this map is a composite of the
- // even type and an attribute key (e.g. "{eventType}.{eventAttrKey}") and the
- // values are the event values that are contained under that relationship. This
- // allows event types to repeat themselves with the same set of keys and
- // different values.
- type Query interface {
- Matches(events []types.Event) (bool, error)
- String() string
- }
-
- // UnsubscribeArgs are the parameters to remove a subscription.
- // The subscriber ID must be populated, and at least one of the client ID or
- // the registered query.
- type UnsubscribeArgs struct {
- Subscriber string // subscriber ID chosen by the client (required)
- ID string // subscription ID (assigned by the server)
- Query Query // the query registered with the subscription
- }
-
- // Validate returns nil if args are valid to identify a subscription to remove.
- // Otherwise, it reports an error.
- func (args UnsubscribeArgs) Validate() error {
- if args.Subscriber == "" {
- return errors.New("must specify a subscriber")
- }
- if args.ID == "" && args.Query == nil {
- return fmt.Errorf("subscription is not fully defined [subscriber=%q]", args.Subscriber)
- }
-
- return nil
- }
-
- // Server allows clients to subscribe/unsubscribe for messages, publishing
- // messages with or without events, and manages internal state.
- type Server struct {
- service.BaseService
-
- queue chan item
- done <-chan struct{} // closed when server should exit
- stop func() // signal the server to exit
- pubs sync.RWMutex // excl: shutdown; shared: active publisher
- exited chan struct{} // server exited
-
- // All subscriptions currently known.
- // Lock exclusive to add, remove, or cancel subscriptions.
- // Lock shared to look up or publish to subscriptions.
- subs struct {
- sync.RWMutex
- index *subIndex
- }
-
- // TODO(creachadair): Rework the options so that this does not need to live
- // as a field. It is not otherwise needed.
- queueCap int
- }
-
- // Option sets a parameter for the server.
- type Option func(*Server)
-
- // NewServer returns a new server. See the commentary on the Option functions
- // for a detailed description of how to configure buffering. If no options are
- // provided, the resulting server's queue is unbuffered.
- func NewServer(options ...Option) *Server {
- s := new(Server)
- for _, opt := range options {
- opt(s)
- }
- s.BaseService = *service.NewBaseService(nil, "PubSub", s)
-
- // The queue receives items to be published.
- s.queue = make(chan item, s.queueCap)
-
- // The index tracks subscriptions by ID and query terms.
- s.subs.index = newSubIndex()
-
- return s
- }
-
- // BufferCapacity allows you to specify capacity for publisher's queue. This
- // is the number of messages that can be published without blocking. If no
- // buffer is specified, publishing is synchronous with delivery. This function
- // will panic if cap < 0.
- func BufferCapacity(cap int) Option {
- if cap < 0 {
- panic("negative buffer capacity")
- }
- return func(s *Server) { s.queueCap = cap }
- }
-
- // BufferCapacity returns capacity of the publication queue.
- func (s *Server) BufferCapacity() int { return cap(s.queue) }
-
- // Subscribe creates a subscription for the given client.
- //
- // An error will be returned to the caller if the context is canceled or if
- // subscription already exist for pair clientID and query.
- //
- // outCapacity can be used to set a capacity for Subscription#Out channel (1 by
- // default). Panics if outCapacity is less than or equal to zero. If you want
- // an unbuffered channel, use SubscribeUnbuffered.
- func (s *Server) Subscribe(
- ctx context.Context,
- clientID string,
- query Query,
- outCapacity ...int) (*Subscription, error) {
- outCap := 1
- if len(outCapacity) > 0 {
- if outCapacity[0] <= 0 {
- panic("Negative or zero capacity. Use SubscribeUnbuffered if you want an unbuffered channel")
- }
- outCap = outCapacity[0]
- }
-
- return s.subscribe(ctx, clientID, query, outCap)
- }
-
- // SubscribeUnbuffered does the same as Subscribe, except it returns a
- // subscription with unbuffered channel. Use with caution as it can freeze the
- // server.
- func (s *Server) SubscribeUnbuffered(ctx context.Context, clientID string, query Query) (*Subscription, error) {
- return s.subscribe(ctx, clientID, query, 0)
- }
-
- func (s *Server) subscribe(ctx context.Context, clientID string, query Query, outCapacity int) (*Subscription, error) {
- s.subs.Lock()
- defer s.subs.Unlock()
-
- if s.subs.index == nil {
- return nil, ErrServerStopped
- } else if s.subs.index.contains(clientID, query.String()) {
- return nil, ErrAlreadySubscribed
- }
-
- sub, err := newSubscription(outCapacity)
- if err != nil {
- return nil, err
- }
- s.subs.index.add(&subInfo{
- clientID: clientID,
- query: query,
- subID: sub.id,
- sub: sub,
- })
- return sub, nil
- }
-
- // Unsubscribe removes the subscription for the given client and/or query. It
- // returns ErrSubscriptionNotFound if no such subscription exists.
- func (s *Server) Unsubscribe(ctx context.Context, args UnsubscribeArgs) error {
- if err := args.Validate(); err != nil {
- return err
- }
- s.subs.Lock()
- defer s.subs.Unlock()
- if s.subs.index == nil {
- return ErrServerStopped
- }
-
- // TODO(creachadair): Do we need to support unsubscription for an "empty"
- // query? I believe that case is not possible by the Query grammar, but we
- // should make sure.
- //
- // Revisit this logic once we are able to remove indexing by query.
-
- var evict subInfoSet
- if args.Subscriber != "" {
- evict = s.subs.index.findClientID(args.Subscriber)
- if args.Query != nil {
- evict = evict.withQuery(args.Query.String())
- }
- } else {
- evict = s.subs.index.findQuery(args.Query.String())
- }
-
- if len(evict) == 0 {
- return ErrSubscriptionNotFound
- }
- s.removeSubs(evict, ErrUnsubscribed)
- return nil
- }
-
- // UnsubscribeAll removes all subscriptions for the given client ID.
- // It returns ErrSubscriptionNotFound if no subscriptions exist for that client.
- func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
- s.subs.Lock()
- defer s.subs.Unlock()
-
- evict := s.subs.index.findClientID(clientID)
- if len(evict) == 0 {
- return ErrSubscriptionNotFound
- }
- s.removeSubs(evict, ErrUnsubscribed)
- return nil
- }
-
- // NumClients returns the number of clients.
- func (s *Server) NumClients() int {
- s.subs.RLock()
- defer s.subs.RUnlock()
- return len(s.subs.index.byClient)
- }
-
- // NumClientSubscriptions returns the number of subscriptions the client has.
- func (s *Server) NumClientSubscriptions(clientID string) int {
- s.subs.RLock()
- defer s.subs.RUnlock()
- return len(s.subs.index.findClientID(clientID))
- }
-
- // Publish publishes the given message. An error will be returned to the caller
- // if the context is canceled.
- func (s *Server) Publish(ctx context.Context, msg interface{}) error {
- return s.publish(ctx, msg, []types.Event{})
- }
-
- // PublishWithEvents publishes the given message with the set of events. The set
- // is matched with clients queries. If there is a match, the message is sent to
- // the client.
- func (s *Server) PublishWithEvents(ctx context.Context, msg interface{}, events []types.Event) error {
- return s.publish(ctx, msg, events)
- }
-
- // OnStop implements Service.OnStop by shutting down the server.
- func (s *Server) OnStop() { s.stop() }
-
- // Wait implements Service.Wait by blocking until the server has exited, then
- // yielding to the base service wait.
- func (s *Server) Wait() { <-s.exited; s.BaseService.Wait() }
-
- // OnStart implements Service.OnStart by starting the server.
- func (s *Server) OnStart() error { s.run(); return nil }
-
- // OnReset implements Service.OnReset. It has no effect for this service.
- func (s *Server) OnReset() error { return nil }
-
- func (s *Server) publish(ctx context.Context, data interface{}, events []types.Event) error {
- s.pubs.RLock()
- defer s.pubs.RUnlock()
-
- select {
- case <-s.done:
- return ErrServerStopped
- case <-ctx.Done():
- return ctx.Err()
- case s.queue <- item{
- Data: data,
- Events: events,
- }:
- return nil
- }
- }
-
- func (s *Server) run() {
- // The server runs until ctx is canceled.
- ctx, cancel := context.WithCancel(context.Background())
- s.done = ctx.Done()
- s.stop = cancel
-
- // Shutdown monitor: When the context ends, wait for any active publish
- // calls to exit, then close the queue to signal the sender to exit.
- go func() {
- <-ctx.Done()
- s.pubs.Lock()
- defer s.pubs.Unlock()
- close(s.queue)
- }()
-
- s.exited = make(chan struct{})
- go func() {
- defer close(s.exited)
-
- // Sender: Service the queue and forward messages to subscribers.
- for it := range s.queue {
- if err := s.send(it.Data, it.Events); err != nil {
- s.Logger.Error("Error sending event", "err", err)
- }
- }
- // Terminate all subscribers without error before exit.
- s.subs.Lock()
- defer s.subs.Unlock()
- for si := range s.subs.index.all {
- si.sub.cancel(nil)
- }
- s.subs.index = nil
- }()
- }
-
- // removeSubs cancels and removes all the subscriptions in evict with the given
- // error. The caller must hold the s.subs lock.
- func (s *Server) removeSubs(evict subInfoSet, reason error) {
- for si := range evict {
- si.sub.cancel(reason)
- }
- s.subs.index.removeAll(evict)
- }
-
- // send delivers the given message to all matching subscribers. An error in
- // query matching stops transmission and is returned.
- func (s *Server) send(data interface{}, events []types.Event) error {
- // At exit, evict any subscriptions that were too slow.
- evict := make(subInfoSet)
- defer func() {
- if len(evict) != 0 {
- s.subs.Lock()
- defer s.subs.Unlock()
- s.removeSubs(evict, ErrOutOfCapacity)
- }
- }()
-
- // N.B. Order is important here. We must acquire and defer the lock release
- // AFTER deferring the eviction cleanup: The cleanup must happen after the
- // reader lock has released, or it will deadlock.
- s.subs.RLock()
- defer s.subs.RUnlock()
-
- for si := range s.subs.index.all {
- match, err := si.query.Matches(events)
- if err != nil {
- return fmt.Errorf("match failed against query: %w", err)
- // TODO(creachadair): Should we evict this subscription?
- } else if !match {
- continue
- }
-
- // Subscriptions may be buffered or unbuffered. Unbuffered subscriptions
- // are intended for internal use such as indexing, where we don't want to
- // penalize a slow reader. Buffered subscribers must keep up with their
- // queue, or they will be terminated.
- //
- // TODO(creachadair): Unbuffered subscriptions used by the event indexer
- // to avoid losing events if it happens to be slow. Rework this so that
- // use case doesn't require this affordance, and then remove unbuffered
- // subscriptions.
- msg := NewMessage(si.sub.id, data, events)
- if err := si.sub.putMessage(msg); err != nil {
- // The subscriber was too slow, cancel them.
- evict.add(si)
- }
- }
-
- return nil
- }
|