// Package pubsub implements an event dispatching server with a single publisher // and multiple subscriber clients. Multiple goroutines can safely publish to a // single Server instance. // // Clients register subscriptions with a query to select which messages they // wish to receive. When messages are published, they are broadcast to all // clients whose subscription query matches that message. Queries are // constructed using the github.com/tendermint/tendermint/libs/pubsub/query // package. // // Example: // // q, err := query.New("account.name='John'") // if err != nil { // return err // } // ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) // defer cancel() // subscription, err := pubsub.Subscribe(ctx, "johns-transactions", q) // if err != nil { // return err // } // // for { // select { // case msg <- subscription.Out(): // // handle msg.Data() and msg.Events() // case <-subscription.Canceled(): // return subscription.Err() // } // } // package pubsub import ( "context" "errors" "fmt" "sync" "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/service" ) var ( // ErrSubscriptionNotFound is returned when a client tries to unsubscribe // from not existing subscription. ErrSubscriptionNotFound = errors.New("subscription not found") // ErrAlreadySubscribed is returned when a client tries to subscribe twice or // more using the same query. ErrAlreadySubscribed = errors.New("already subscribed") // ErrServerStopped is returned when attempting to publish or subscribe to a // server that has been stopped. ErrServerStopped = errors.New("pubsub server is stopped") ) // Query defines an interface for a query to be used for subscribing. A query // matches against a map of events. Each key in this map is a composite of the // even type and an attribute key (e.g. "{eventType}.{eventAttrKey}") and the // values are the event values that are contained under that relationship. This // allows event types to repeat themselves with the same set of keys and // different values. type Query interface { Matches(events []types.Event) (bool, error) String() string } // UnsubscribeArgs are the parameters to remove a subscription. // The subscriber ID must be populated, and at least one of the client ID or // the registered query. type UnsubscribeArgs struct { Subscriber string // subscriber ID chosen by the client (required) ID string // subscription ID (assigned by the server) Query Query // the query registered with the subscription } // Validate returns nil if args are valid to identify a subscription to remove. // Otherwise, it reports an error. func (args UnsubscribeArgs) Validate() error { if args.Subscriber == "" { return errors.New("must specify a subscriber") } if args.ID == "" && args.Query == nil { return fmt.Errorf("subscription is not fully defined [subscriber=%q]", args.Subscriber) } return nil } // Server allows clients to subscribe/unsubscribe for messages, publishing // messages with or without events, and manages internal state. type Server struct { service.BaseService queue chan item done <-chan struct{} // closed when server should exit stop func() // signal the server to exit pubs sync.RWMutex // excl: shutdown; shared: active publisher exited chan struct{} // server exited // All subscriptions currently known. // Lock exclusive to add, remove, or cancel subscriptions. // Lock shared to look up or publish to subscriptions. subs struct { sync.RWMutex index *subIndex } // TODO(creachadair): Rework the options so that this does not need to live // as a field. It is not otherwise needed. queueCap int } // Option sets a parameter for the server. type Option func(*Server) // NewServer returns a new server. See the commentary on the Option functions // for a detailed description of how to configure buffering. If no options are // provided, the resulting server's queue is unbuffered. func NewServer(options ...Option) *Server { s := new(Server) for _, opt := range options { opt(s) } s.BaseService = *service.NewBaseService(nil, "PubSub", s) // The queue receives items to be published. s.queue = make(chan item, s.queueCap) // The index tracks subscriptions by ID and query terms. s.subs.index = newSubIndex() return s } // BufferCapacity allows you to specify capacity for publisher's queue. This // is the number of messages that can be published without blocking. If no // buffer is specified, publishing is synchronous with delivery. This function // will panic if cap < 0. func BufferCapacity(cap int) Option { if cap < 0 { panic("negative buffer capacity") } return func(s *Server) { s.queueCap = cap } } // BufferCapacity returns capacity of the publication queue. func (s *Server) BufferCapacity() int { return cap(s.queue) } // Subscribe creates a subscription for the given client. // // An error will be returned to the caller if the context is canceled or if // subscription already exist for pair clientID and query. // // outCapacity can be used to set a capacity for Subscription#Out channel (1 by // default). Panics if outCapacity is less than or equal to zero. If you want // an unbuffered channel, use SubscribeUnbuffered. func (s *Server) Subscribe( ctx context.Context, clientID string, query Query, outCapacity ...int) (*Subscription, error) { outCap := 1 if len(outCapacity) > 0 { if outCapacity[0] <= 0 { panic("Negative or zero capacity. Use SubscribeUnbuffered if you want an unbuffered channel") } outCap = outCapacity[0] } return s.subscribe(ctx, clientID, query, outCap) } // SubscribeUnbuffered does the same as Subscribe, except it returns a // subscription with unbuffered channel. Use with caution as it can freeze the // server. func (s *Server) SubscribeUnbuffered(ctx context.Context, clientID string, query Query) (*Subscription, error) { return s.subscribe(ctx, clientID, query, 0) } func (s *Server) subscribe(ctx context.Context, clientID string, query Query, outCapacity int) (*Subscription, error) { s.subs.Lock() defer s.subs.Unlock() if s.subs.index == nil { return nil, ErrServerStopped } else if s.subs.index.contains(clientID, query.String()) { return nil, ErrAlreadySubscribed } sub, err := newSubscription(outCapacity) if err != nil { return nil, err } s.subs.index.add(&subInfo{ clientID: clientID, query: query, subID: sub.id, sub: sub, }) return sub, nil } // Unsubscribe removes the subscription for the given client and/or query. It // returns ErrSubscriptionNotFound if no such subscription exists. func (s *Server) Unsubscribe(ctx context.Context, args UnsubscribeArgs) error { if err := args.Validate(); err != nil { return err } s.subs.Lock() defer s.subs.Unlock() if s.subs.index == nil { return ErrServerStopped } // TODO(creachadair): Do we need to support unsubscription for an "empty" // query? I believe that case is not possible by the Query grammar, but we // should make sure. // // Revisit this logic once we are able to remove indexing by query. var evict subInfoSet if args.Subscriber != "" { evict = s.subs.index.findClientID(args.Subscriber) if args.Query != nil { evict = evict.withQuery(args.Query.String()) } } else { evict = s.subs.index.findQuery(args.Query.String()) } if len(evict) == 0 { return ErrSubscriptionNotFound } s.removeSubs(evict, ErrUnsubscribed) return nil } // UnsubscribeAll removes all subscriptions for the given client ID. // It returns ErrSubscriptionNotFound if no subscriptions exist for that client. func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error { s.subs.Lock() defer s.subs.Unlock() evict := s.subs.index.findClientID(clientID) if len(evict) == 0 { return ErrSubscriptionNotFound } s.removeSubs(evict, ErrUnsubscribed) return nil } // NumClients returns the number of clients. func (s *Server) NumClients() int { s.subs.RLock() defer s.subs.RUnlock() return len(s.subs.index.byClient) } // NumClientSubscriptions returns the number of subscriptions the client has. func (s *Server) NumClientSubscriptions(clientID string) int { s.subs.RLock() defer s.subs.RUnlock() return len(s.subs.index.findClientID(clientID)) } // Publish publishes the given message. An error will be returned to the caller // if the context is canceled. func (s *Server) Publish(ctx context.Context, msg interface{}) error { return s.publish(ctx, msg, []types.Event{}) } // PublishWithEvents publishes the given message with the set of events. The set // is matched with clients queries. If there is a match, the message is sent to // the client. func (s *Server) PublishWithEvents(ctx context.Context, msg interface{}, events []types.Event) error { return s.publish(ctx, msg, events) } // OnStop implements Service.OnStop by shutting down the server. func (s *Server) OnStop() { s.stop() } // Wait implements Service.Wait by blocking until the server has exited, then // yielding to the base service wait. func (s *Server) Wait() { <-s.exited; s.BaseService.Wait() } // OnStart implements Service.OnStart by starting the server. func (s *Server) OnStart() error { s.run(); return nil } // OnReset implements Service.OnReset. It has no effect for this service. func (s *Server) OnReset() error { return nil } func (s *Server) publish(ctx context.Context, data interface{}, events []types.Event) error { s.pubs.RLock() defer s.pubs.RUnlock() select { case <-s.done: return ErrServerStopped case <-ctx.Done(): return ctx.Err() case s.queue <- item{ Data: data, Events: events, }: return nil } } func (s *Server) run() { // The server runs until ctx is canceled. ctx, cancel := context.WithCancel(context.Background()) s.done = ctx.Done() s.stop = cancel // Shutdown monitor: When the context ends, wait for any active publish // calls to exit, then close the queue to signal the sender to exit. go func() { <-ctx.Done() s.pubs.Lock() defer s.pubs.Unlock() close(s.queue) }() s.exited = make(chan struct{}) go func() { defer close(s.exited) // Sender: Service the queue and forward messages to subscribers. for it := range s.queue { if err := s.send(it.Data, it.Events); err != nil { s.Logger.Error("Error sending event", "err", err) } } // Terminate all subscribers without error before exit. s.subs.Lock() defer s.subs.Unlock() for si := range s.subs.index.all { si.sub.cancel(nil) } s.subs.index = nil }() } // removeSubs cancels and removes all the subscriptions in evict with the given // error. The caller must hold the s.subs lock. func (s *Server) removeSubs(evict subInfoSet, reason error) { for si := range evict { si.sub.cancel(reason) } s.subs.index.removeAll(evict) } // send delivers the given message to all matching subscribers. An error in // query matching stops transmission and is returned. func (s *Server) send(data interface{}, events []types.Event) error { // At exit, evict any subscriptions that were too slow. evict := make(subInfoSet) defer func() { if len(evict) != 0 { s.subs.Lock() defer s.subs.Unlock() s.removeSubs(evict, ErrOutOfCapacity) } }() // N.B. Order is important here. We must acquire and defer the lock release // AFTER deferring the eviction cleanup: The cleanup must happen after the // reader lock has released, or it will deadlock. s.subs.RLock() defer s.subs.RUnlock() for si := range s.subs.index.all { match, err := si.query.Matches(events) if err != nil { return fmt.Errorf("match failed against query: %w", err) // TODO(creachadair): Should we evict this subscription? } else if !match { continue } // Subscriptions may be buffered or unbuffered. Unbuffered subscriptions // are intended for internal use such as indexing, where we don't want to // penalize a slow reader. Buffered subscribers must keep up with their // queue, or they will be terminated. // // TODO(creachadair): Unbuffered subscriptions used by the event indexer // to avoid losing events if it happens to be slow. Rework this so that // use case doesn't require this affordance, and then remove unbuffered // subscriptions. msg := NewMessage(si.sub.id, data, events) if err := si.sub.putMessage(msg); err != nil { // The subscriber was too slow, cancel them. evict.add(si) } } return nil }