- // Package pubsub implements an event dispatching server with a single publisher
- // and multiple subscriber clients. Multiple goroutines can safely publish to a
- // single Server instance.
- //
- // Clients register subscriptions with a query to select which messages they
- // wish to receive. When messages are published, they are broadcast to all
- // clients whose subscription query matches that message. Queries are
- // constructed using the github.com/tendermint/tendermint/internal/pubsub/query
- // package.
- //
- // Example:
- //
- // q, err := query.New(`account.name='John'`)
- // if err != nil {
- // return err
- // }
- // sub, err := pubsub.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
- // ClientID: "johns-transactions",
- // Query: q,
- // })
- // if err != nil {
- // return err
- // }
- //
- // for {
- // next, err := sub.Next(ctx)
- // if err == pubsub.ErrTerminated {
- // return err // terminated by publisher
- // } else if err != nil {
- // return err // timed out, client unsubscribed, etc.
- // }
- // process(next)
- // }
- //
- package pubsub
-
- import (
- "context"
- "errors"
- "fmt"
- "sync"
-
- "github.com/tendermint/tendermint/abci/types"
- "github.com/tendermint/tendermint/libs/log"
- "github.com/tendermint/tendermint/libs/service"
- )
-
- var (
- // ErrSubscriptionNotFound is returned when a client tries to unsubscribe
- // from not existing subscription.
- ErrSubscriptionNotFound = errors.New("subscription not found")
-
- // ErrAlreadySubscribed is returned when a client tries to subscribe twice or
- // more using the same query.
- ErrAlreadySubscribed = errors.New("already subscribed")
-
- // ErrServerStopped is returned when attempting to publish or subscribe to a
- // server that has been stopped.
- ErrServerStopped = errors.New("pubsub server is stopped")
- )
-
- // Query defines an interface for a query to be used for subscribing. A query
- // matches against a map of events. Each key in this map is a composite of the
- // even type and an attribute key (e.g. "{eventType}.{eventAttrKey}") and the
- // values are the event values that are contained under that relationship. This
- // allows event types to repeat themselves with the same set of keys and
- // different values.
- type Query interface {
- Matches(events []types.Event) (bool, error)
- String() string
- }
-
- // SubscribeArgs are the parameters to create a new subscription.
- type SubscribeArgs struct {
- ClientID string // Client ID
- Query Query // filter query for events (required)
- Limit int // subscription queue capacity limit (0 means 1)
- Quota int // subscription queue soft quota (0 uses Limit)
- }
-
- // UnsubscribeArgs are the parameters to remove a subscription.
- // The subscriber ID must be populated, and at least one of the client ID or
- // the registered query.
- type UnsubscribeArgs struct {
- Subscriber string // subscriber ID chosen by the client (required)
- ID string // subscription ID (assigned by the server)
- Query Query // the query registered with the subscription
- }
-
- // Validate returns nil if args are valid to identify a subscription to remove.
- // Otherwise, it reports an error.
- func (args UnsubscribeArgs) Validate() error {
- if args.Subscriber == "" {
- return errors.New("must specify a subscriber")
- }
- if args.ID == "" && args.Query == nil {
- return fmt.Errorf("subscription is not fully defined [subscriber=%q]", args.Subscriber)
- }
-
- return nil
- }
-
- // Server allows clients to subscribe/unsubscribe for messages, publishing
- // messages with or without events, and manages internal state.
- type Server struct {
- service.BaseService
- logger log.Logger
-
- queue chan item
- done <-chan struct{} // closed when server should exit
- pubs sync.RWMutex // excl: shutdown; shared: active publisher
- exited chan struct{} // server exited
-
- // All subscriptions currently known.
- // Lock exclusive to add, remove, or cancel subscriptions.
- // Lock shared to look up or publish to subscriptions.
- subs struct {
- sync.RWMutex
- index *subIndex
-
- // This function is called synchronously with each message published
- // before it is delivered to any other subscriber. This allows an index
- // to be persisted before any subscribers see the messages.
- observe func(Message) error
- }
-
- // TODO(creachadair): Rework the options so that this does not need to live
- // as a field. It is not otherwise needed.
- queueCap int
- }
-
- // Option sets a parameter for the server.
- type Option func(*Server)
-
- // NewServer returns a new server. See the commentary on the Option functions
- // for a detailed description of how to configure buffering. If no options are
- // provided, the resulting server's queue is unbuffered.
- func NewServer(logger log.Logger, options ...Option) *Server {
- s := &Server{logger: logger}
-
- s.BaseService = *service.NewBaseService(logger, "PubSub", s)
- for _, opt := range options {
- opt(s)
- }
-
- // The queue receives items to be published.
- s.queue = make(chan item, s.queueCap)
-
- // The index tracks subscriptions by ID and query terms.
- s.subs.index = newSubIndex()
-
- return s
- }
-
- // BufferCapacity allows you to specify capacity for publisher's queue. This
- // is the number of messages that can be published without blocking. If no
- // buffer is specified, publishing is synchronous with delivery. This function
- // will panic if cap < 0.
- func BufferCapacity(cap int) Option {
- if cap < 0 {
- panic("negative buffer capacity")
- }
- return func(s *Server) { s.queueCap = cap }
- }
-
- // BufferCapacity returns capacity of the publication queue.
- func (s *Server) BufferCapacity() int { return cap(s.queue) }
-
- // Subscribe creates a subscription for the given client ID and query.
- // If len(capacities) > 0, its first value is used as the queue capacity.
- //
- // Deprecated: Use SubscribeWithArgs. This method will be removed in v0.36.
- func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, capacities ...int) (*Subscription, error) {
- args := SubscribeArgs{
- ClientID: clientID,
- Query: query,
- Limit: 1,
- }
- if len(capacities) > 0 {
- args.Limit = capacities[0]
- if len(capacities) > 1 {
- args.Quota = capacities[1]
- }
- // bounds are checked below
- }
- return s.SubscribeWithArgs(ctx, args)
- }
-
- // Observe registers an observer function that will be called synchronously
- // with each published message matching any of the given queries, prior to it
- // being forwarded to any subscriber. If no queries are specified, all
- // messages will be observed. An error is reported if an observer is already
- // registered.
- func (s *Server) Observe(ctx context.Context, observe func(Message) error, queries ...Query) error {
- s.subs.Lock()
- defer s.subs.Unlock()
- if observe == nil {
- return errors.New("observe callback is nil")
- } else if s.subs.observe != nil {
- return errors.New("an observer is already registered")
- }
-
- // Compile the message filter.
- var matches func(Message) bool
- if len(queries) == 0 {
- matches = func(Message) bool { return true }
- } else {
- matches = func(msg Message) bool {
- for _, q := range queries {
- match, err := q.Matches(msg.events)
- if err == nil && match {
- return true
- }
- }
- return false
- }
- }
-
- s.subs.observe = func(msg Message) error {
- if matches(msg) {
- return observe(msg)
- }
- return nil // nothing to do for this message
- }
- return nil
- }
-
- // SubscribeWithArgs creates a subscription for the given arguments. It is an
- // error if the query is nil, a subscription already exists for the specified
- // client ID and query, or if the capacity arguments are invalid.
- func (s *Server) SubscribeWithArgs(ctx context.Context, args SubscribeArgs) (*Subscription, error) {
- if args.Query == nil {
- return nil, errors.New("query is nil")
- }
- s.subs.Lock()
- defer s.subs.Unlock()
-
- if s.subs.index == nil {
- return nil, ErrServerStopped
- } else if s.subs.index.contains(args.ClientID, args.Query.String()) {
- return nil, ErrAlreadySubscribed
- }
-
- if args.Limit == 0 {
- args.Limit = 1
- }
- sub, err := newSubscription(args.Quota, args.Limit)
- if err != nil {
- return nil, err
- }
- s.subs.index.add(&subInfo{
- clientID: args.ClientID,
- query: args.Query,
- subID: sub.id,
- sub: sub,
- })
- return sub, nil
- }
-
- // Unsubscribe removes the subscription for the given client and/or query. It
- // returns ErrSubscriptionNotFound if no such subscription exists.
- func (s *Server) Unsubscribe(ctx context.Context, args UnsubscribeArgs) error {
- if err := args.Validate(); err != nil {
- return err
- }
- s.subs.Lock()
- defer s.subs.Unlock()
- if s.subs.index == nil {
- return ErrServerStopped
- }
-
- // TODO(creachadair): Do we need to support unsubscription for an "empty"
- // query? I believe that case is not possible by the Query grammar, but we
- // should make sure.
- //
- // Revisit this logic once we are able to remove indexing by query.
-
- var evict subInfoSet
- if args.Subscriber != "" {
- evict = s.subs.index.findClientID(args.Subscriber)
- if args.Query != nil {
- evict = evict.withQuery(args.Query.String())
- }
- } else {
- evict = s.subs.index.findQuery(args.Query.String())
- }
-
- if len(evict) == 0 {
- return ErrSubscriptionNotFound
- }
- s.removeSubs(evict, ErrUnsubscribed)
- return nil
- }
-
- // UnsubscribeAll removes all subscriptions for the given client ID.
- // It returns ErrSubscriptionNotFound if no subscriptions exist for that client.
- func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
- s.subs.Lock()
- defer s.subs.Unlock()
-
- evict := s.subs.index.findClientID(clientID)
- if len(evict) == 0 {
- return ErrSubscriptionNotFound
- }
- s.removeSubs(evict, ErrUnsubscribed)
- return nil
- }
-
- // NumClients returns the number of clients.
- func (s *Server) NumClients() int {
- s.subs.RLock()
- defer s.subs.RUnlock()
- return len(s.subs.index.byClient)
- }
-
- // NumClientSubscriptions returns the number of subscriptions the client has.
- func (s *Server) NumClientSubscriptions(clientID string) int {
- s.subs.RLock()
- defer s.subs.RUnlock()
- return len(s.subs.index.findClientID(clientID))
- }
-
- // Publish publishes the given message. An error will be returned to the caller
- // if the context is canceled.
- func (s *Server) Publish(ctx context.Context, msg interface{}) error {
- return s.publish(ctx, msg, []types.Event{})
- }
-
- // PublishWithEvents publishes the given message with the set of events. The set
- // is matched with clients queries. If there is a match, the message is sent to
- // the client.
- func (s *Server) PublishWithEvents(ctx context.Context, msg interface{}, events []types.Event) error {
- return s.publish(ctx, msg, events)
- }
-
- // OnStop implements part of the Service interface. It is a no-op.
- func (s *Server) OnStop() {}
-
- // Wait implements Service.Wait by blocking until the server has exited, then
- // yielding to the base service wait.
- func (s *Server) Wait() { <-s.exited; s.BaseService.Wait() }
-
- // OnStart implements Service.OnStart by starting the server.
- func (s *Server) OnStart(ctx context.Context) error { s.run(ctx); return nil }
-
- // OnReset implements Service.OnReset. It has no effect for this service.
- func (s *Server) OnReset() error { return nil }
-
- func (s *Server) publish(ctx context.Context, data interface{}, events []types.Event) error {
- s.pubs.RLock()
- defer s.pubs.RUnlock()
-
- select {
- case <-s.done:
- return ErrServerStopped
- case <-ctx.Done():
- return ctx.Err()
- case s.queue <- item{
- Data: data,
- Events: events,
- }:
- return nil
- }
- }
-
- func (s *Server) run(ctx context.Context) {
- // The server runs until ctx is canceled.
- s.done = ctx.Done()
- queue := s.queue
-
- // Shutdown monitor: When the context ends, wait for any active publish
- // calls to exit, then close the queue to signal the sender to exit.
- go func() {
- <-ctx.Done()
- s.pubs.Lock()
- defer s.pubs.Unlock()
- close(s.queue)
- s.queue = nil
- }()
-
- s.exited = make(chan struct{})
- go func() {
- defer close(s.exited)
-
- // Sender: Service the queue and forward messages to subscribers.
- for it := range queue {
- if err := s.send(it.Data, it.Events); err != nil {
- s.logger.Error("Error sending event", "err", err)
- }
- }
- // Terminate all subscribers before exit.
- s.subs.Lock()
- defer s.subs.Unlock()
- for si := range s.subs.index.all {
- si.sub.stop(ErrTerminated)
- }
- s.subs.index = nil
- }()
- }
-
- // removeSubs cancels and removes all the subscriptions in evict with the given
- // error. The caller must hold the s.subs lock.
- func (s *Server) removeSubs(evict subInfoSet, reason error) {
- for si := range evict {
- si.sub.stop(reason)
- }
- s.subs.index.removeAll(evict)
- }
-
- // send delivers the given message to all matching subscribers. An error in
- // query matching stops transmission and is returned.
- func (s *Server) send(data interface{}, events []types.Event) error {
- // At exit, evict any subscriptions that were too slow.
- evict := make(subInfoSet)
- defer func() {
- if len(evict) != 0 {
- s.subs.Lock()
- defer s.subs.Unlock()
- s.removeSubs(evict, ErrTerminated)
- }
- }()
-
- // N.B. Order is important here. We must acquire and defer the lock release
- // AFTER deferring the eviction cleanup: The cleanup must happen after the
- // reader lock has released, or it will deadlock.
- s.subs.RLock()
- defer s.subs.RUnlock()
-
- // If an observer is defined, give it control of the message before
- // attempting to deliver it to any matching subscribers. If the observer
- // fails, the message will not be forwarded.
- if s.subs.observe != nil {
- err := s.subs.observe(Message{
- data: data,
- events: events,
- })
- if err != nil {
- return fmt.Errorf("observer failed on message: %w", err)
- }
- }
-
- for si := range s.subs.index.all {
- match, err := si.query.Matches(events)
- if err != nil {
- return fmt.Errorf("match failed against query: %w", err)
- // TODO(creachadair): Should we evict this subscription?
- } else if !match {
- continue
- }
-
- // Publish the events to the subscriber's queue. If this fails, e.g.,
- // because the queue is over capacity or out of quota, evict the
- // subscription from the index.
- if err := si.sub.publish(Message{
- subID: si.sub.id,
- data: data,
- events: events,
- }); err != nil {
- evict.add(si)
- }
- }
-
- return nil
- }
|