You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

464 lines
14 KiB

// Package pubsub implements an event dispatching server with a single publisher
// and multiple subscriber clients. Multiple goroutines can safely publish to a
// single Server instance.
//
// Clients register subscriptions with a query to select which messages they
// wish to receive. When messages are published, they are broadcast to all
// clients whose subscription query matches that message. Queries are
// constructed using the github.com/tendermint/tendermint/libs/pubsub/query
// package.
//
// Example:
//
// q, err := query.New(`account.name='John'`)
// if err != nil {
// return err
// }
// sub, err := pubsub.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
// ClientID: "johns-transactions",
// Query: q,
// })
// if err != nil {
// return err
// }
//
// for {
// next, err := sub.Next(ctx)
// if err == pubsub.ErrTerminated {
// return err // terminated by publisher
// } else if err != nil {
// return err // timed out, client unsubscribed, etc.
// }
// process(next)
// }
//
package pubsub
import (
"context"
"errors"
"fmt"
"sync"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/service"
)
var (
// ErrSubscriptionNotFound is returned when a client tries to unsubscribe
// from not existing subscription.
ErrSubscriptionNotFound = errors.New("subscription not found")
// ErrAlreadySubscribed is returned when a client tries to subscribe twice or
// more using the same query.
ErrAlreadySubscribed = errors.New("already subscribed")
// ErrServerStopped is returned when attempting to publish or subscribe to a
// server that has been stopped.
ErrServerStopped = errors.New("pubsub server is stopped")
)
// Query defines an interface for a query to be used for subscribing. A query
// matches against a map of events. Each key in this map is a composite of the
// even type and an attribute key (e.g. "{eventType}.{eventAttrKey}") and the
// values are the event values that are contained under that relationship. This
// allows event types to repeat themselves with the same set of keys and
// different values.
type Query interface {
Matches(events []types.Event) (bool, error)
String() string
}
// SubscribeArgs are the parameters to create a new subscription.
type SubscribeArgs struct {
ClientID string // Client ID
Query Query // filter query for events (required)
Limit int // subscription queue capacity limit (0 means 1)
Quota int // subscription queue soft quota (0 uses Limit)
}
// UnsubscribeArgs are the parameters to remove a subscription.
// The subscriber ID must be populated, and at least one of the client ID or
// the registered query.
type UnsubscribeArgs struct {
Subscriber string // subscriber ID chosen by the client (required)
ID string // subscription ID (assigned by the server)
Query Query // the query registered with the subscription
}
// Validate returns nil if args are valid to identify a subscription to remove.
// Otherwise, it reports an error.
func (args UnsubscribeArgs) Validate() error {
if args.Subscriber == "" {
return errors.New("must specify a subscriber")
}
if args.ID == "" && args.Query == nil {
return fmt.Errorf("subscription is not fully defined [subscriber=%q]", args.Subscriber)
}
return nil
}
// Server allows clients to subscribe/unsubscribe for messages, publishing
// messages with or without events, and manages internal state.
type Server struct {
service.BaseService
queue chan item
done <-chan struct{} // closed when server should exit
stop func() // signal the server to exit
pubs sync.RWMutex // excl: shutdown; shared: active publisher
exited chan struct{} // server exited
// All subscriptions currently known.
// Lock exclusive to add, remove, or cancel subscriptions.
// Lock shared to look up or publish to subscriptions.
subs struct {
sync.RWMutex
index *subIndex
// This function is called synchronously with each message published
// before it is delivered to any other subscriber. This allows an index
// to be persisted before any subscribers see the messages.
observe func(Message) error
}
// TODO(creachadair): Rework the options so that this does not need to live
// as a field. It is not otherwise needed.
queueCap int
}
// Option sets a parameter for the server.
type Option func(*Server)
// NewServer returns a new server. See the commentary on the Option functions
// for a detailed description of how to configure buffering. If no options are
// provided, the resulting server's queue is unbuffered.
func NewServer(options ...Option) *Server {
s := new(Server)
for _, opt := range options {
opt(s)
}
s.BaseService = *service.NewBaseService(nil, "PubSub", s)
// The queue receives items to be published.
s.queue = make(chan item, s.queueCap)
// The index tracks subscriptions by ID and query terms.
s.subs.index = newSubIndex()
return s
}
// BufferCapacity allows you to specify capacity for publisher's queue. This
// is the number of messages that can be published without blocking. If no
// buffer is specified, publishing is synchronous with delivery. This function
// will panic if cap < 0.
func BufferCapacity(cap int) Option {
if cap < 0 {
panic("negative buffer capacity")
}
return func(s *Server) { s.queueCap = cap }
}
// BufferCapacity returns capacity of the publication queue.
func (s *Server) BufferCapacity() int { return cap(s.queue) }
// Subscribe creates a subscription for the given client ID and query.
// If len(capacities) > 0, its first value is used as the queue capacity.
//
// Deprecated: Use SubscribeWithArgs. This method will be removed in v0.36.
func (s *Server) Subscribe(ctx context.Context,
clientID string, query Query, capacities ...int) (*Subscription, error) {
args := SubscribeArgs{
ClientID: clientID,
Query: query,
Limit: 1,
}
if len(capacities) > 0 {
args.Limit = capacities[0]
if len(capacities) > 1 {
args.Quota = capacities[1]
}
// bounds are checked below
}
return s.SubscribeWithArgs(ctx, args)
}
// Observe registers an observer function that will be called synchronously
// with each published message matching any of the given queries, prior to it
// being forwarded to any subscriber. If no queries are specified, all
// messages will be observed. An error is reported if an observer is already
// registered.
func (s *Server) Observe(ctx context.Context, observe func(Message) error, queries ...Query) error {
s.subs.Lock()
defer s.subs.Unlock()
if observe == nil {
return errors.New("observe callback is nil")
} else if s.subs.observe != nil {
return errors.New("an observer is already registered")
}
// Compile the message filter.
var matches func(Message) bool
if len(queries) == 0 {
matches = func(Message) bool { return true }
} else {
matches = func(msg Message) bool {
for _, q := range queries {
match, err := q.Matches(msg.events)
if err == nil && match {
return true
}
}
return false
}
}
s.subs.observe = func(msg Message) error {
if matches(msg) {
return observe(msg)
}
return nil // nothing to do for this message
}
return nil
}
// SubscribeWithArgs creates a subscription for the given arguments. It is an
// error if the query is nil, a subscription already exists for the specified
// client ID and query, or if the capacity arguments are invalid.
func (s *Server) SubscribeWithArgs(ctx context.Context, args SubscribeArgs) (*Subscription, error) {
if args.Query == nil {
return nil, errors.New("query is nil")
}
s.subs.Lock()
defer s.subs.Unlock()
if s.subs.index == nil {
return nil, ErrServerStopped
} else if s.subs.index.contains(args.ClientID, args.Query.String()) {
return nil, ErrAlreadySubscribed
}
if args.Limit == 0 {
args.Limit = 1
}
sub, err := newSubscription(args.Quota, args.Limit)
if err != nil {
return nil, err
}
s.subs.index.add(&subInfo{
clientID: args.ClientID,
query: args.Query,
subID: sub.id,
sub: sub,
})
return sub, nil
}
// Unsubscribe removes the subscription for the given client and/or query. It
// returns ErrSubscriptionNotFound if no such subscription exists.
func (s *Server) Unsubscribe(ctx context.Context, args UnsubscribeArgs) error {
if err := args.Validate(); err != nil {
return err
}
s.subs.Lock()
defer s.subs.Unlock()
if s.subs.index == nil {
return ErrServerStopped
}
// TODO(creachadair): Do we need to support unsubscription for an "empty"
// query? I believe that case is not possible by the Query grammar, but we
// should make sure.
//
// Revisit this logic once we are able to remove indexing by query.
var evict subInfoSet
if args.Subscriber != "" {
evict = s.subs.index.findClientID(args.Subscriber)
if args.Query != nil {
evict = evict.withQuery(args.Query.String())
}
} else {
evict = s.subs.index.findQuery(args.Query.String())
}
if len(evict) == 0 {
return ErrSubscriptionNotFound
}
s.removeSubs(evict, ErrUnsubscribed)
return nil
}
// UnsubscribeAll removes all subscriptions for the given client ID.
// It returns ErrSubscriptionNotFound if no subscriptions exist for that client.
func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
s.subs.Lock()
defer s.subs.Unlock()
evict := s.subs.index.findClientID(clientID)
if len(evict) == 0 {
return ErrSubscriptionNotFound
}
s.removeSubs(evict, ErrUnsubscribed)
return nil
}
// NumClients returns the number of clients.
func (s *Server) NumClients() int {
s.subs.RLock()
defer s.subs.RUnlock()
return len(s.subs.index.byClient)
}
// NumClientSubscriptions returns the number of subscriptions the client has.
func (s *Server) NumClientSubscriptions(clientID string) int {
s.subs.RLock()
defer s.subs.RUnlock()
return len(s.subs.index.findClientID(clientID))
}
// Publish publishes the given message. An error will be returned to the caller
// if the context is canceled.
func (s *Server) Publish(ctx context.Context, msg interface{}) error {
return s.publish(ctx, msg, []types.Event{})
}
// PublishWithEvents publishes the given message with the set of events. The set
// is matched with clients queries. If there is a match, the message is sent to
// the client.
func (s *Server) PublishWithEvents(ctx context.Context, msg interface{}, events []types.Event) error {
return s.publish(ctx, msg, events)
}
// OnStop implements Service.OnStop by shutting down the server.
func (s *Server) OnStop() { s.stop() }
// Wait implements Service.Wait by blocking until the server has exited, then
// yielding to the base service wait.
func (s *Server) Wait() { <-s.exited; s.BaseService.Wait() }
// OnStart implements Service.OnStart by starting the server.
func (s *Server) OnStart() error { s.run(); return nil }
// OnReset implements Service.OnReset. It has no effect for this service.
func (s *Server) OnReset() error { return nil }
func (s *Server) publish(ctx context.Context, data interface{}, events []types.Event) error {
s.pubs.RLock()
defer s.pubs.RUnlock()
select {
case <-s.done:
return ErrServerStopped
case <-ctx.Done():
return ctx.Err()
case s.queue <- item{
Data: data,
Events: events,
}:
return nil
}
}
func (s *Server) run() {
// The server runs until ctx is canceled.
ctx, cancel := context.WithCancel(context.Background())
s.done = ctx.Done()
s.stop = cancel
// Shutdown monitor: When the context ends, wait for any active publish
// calls to exit, then close the queue to signal the sender to exit.
go func() {
<-ctx.Done()
s.pubs.Lock()
defer s.pubs.Unlock()
close(s.queue)
}()
s.exited = make(chan struct{})
go func() {
defer close(s.exited)
// Sender: Service the queue and forward messages to subscribers.
for it := range s.queue {
if err := s.send(it.Data, it.Events); err != nil {
s.Logger.Error("Error sending event", "err", err)
}
}
// Terminate all subscribers before exit.
s.subs.Lock()
defer s.subs.Unlock()
for si := range s.subs.index.all {
si.sub.stop(ErrTerminated)
}
s.subs.index = nil
}()
}
// removeSubs cancels and removes all the subscriptions in evict with the given
// error. The caller must hold the s.subs lock.
func (s *Server) removeSubs(evict subInfoSet, reason error) {
for si := range evict {
si.sub.stop(reason)
}
s.subs.index.removeAll(evict)
}
// send delivers the given message to all matching subscribers. An error in
// query matching stops transmission and is returned.
func (s *Server) send(data interface{}, events []types.Event) error {
// At exit, evict any subscriptions that were too slow.
evict := make(subInfoSet)
defer func() {
if len(evict) != 0 {
s.subs.Lock()
defer s.subs.Unlock()
s.removeSubs(evict, ErrTerminated)
}
}()
// N.B. Order is important here. We must acquire and defer the lock release
// AFTER deferring the eviction cleanup: The cleanup must happen after the
// reader lock has released, or it will deadlock.
s.subs.RLock()
defer s.subs.RUnlock()
// If an observer is defined, give it control of the message before
// attempting to deliver it to any matching subscribers. If the observer
// fails, the message will not be forwarded.
if s.subs.observe != nil {
err := s.subs.observe(Message{
data: data,
events: events,
})
if err != nil {
return fmt.Errorf("observer failed on message: %w", err)
}
}
for si := range s.subs.index.all {
match, err := si.query.Matches(events)
if err != nil {
return fmt.Errorf("match failed against query: %w", err)
// TODO(creachadair): Should we evict this subscription?
} else if !match {
continue
}
// Publish the events to the subscriber's queue. If this fails, e.g.,
// because the queue is over capacity or out of quota, evict the
// subscription from the index.
if err := si.sub.publish(Message{
subID: si.sub.id,
data: data,
events: events,
}); err != nil {
evict.add(si)
}
}
return nil
}