You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

405 lines
12 KiB

// Package pubsub implements an event dispatching server with a single publisher
// and multiple subscriber clients. Multiple goroutines can safely publish to a
// single Server instance.
//
// Clients register subscriptions with a query to select which messages they
// wish to receive. When messages are published, they are broadcast to all
// clients whose subscription query matches that message. Queries are
// constructed using the github.com/tendermint/tendermint/libs/pubsub/query
// package.
//
// Example:
//
// q, err := query.New("account.name='John'")
// if err != nil {
// return err
// }
// ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
// defer cancel()
// subscription, err := pubsub.Subscribe(ctx, "johns-transactions", q)
// if err != nil {
// return err
// }
//
// for {
// select {
// case msg <- subscription.Out():
// // handle msg.Data() and msg.Events()
// case <-subscription.Canceled():
// return subscription.Err()
// }
// }
//
package pubsub
import (
"context"
"errors"
"fmt"
"sync"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/service"
)
var (
// ErrSubscriptionNotFound is returned when a client tries to unsubscribe
// from not existing subscription.
ErrSubscriptionNotFound = errors.New("subscription not found")
// ErrAlreadySubscribed is returned when a client tries to subscribe twice or
// more using the same query.
ErrAlreadySubscribed = errors.New("already subscribed")
// ErrServerStopped is returned when attempting to publish or subscribe to a
// server that has been stopped.
ErrServerStopped = errors.New("pubsub server is stopped")
)
// Query defines an interface for a query to be used for subscribing. A query
// matches against a map of events. Each key in this map is a composite of the
// even type and an attribute key (e.g. "{eventType}.{eventAttrKey}") and the
// values are the event values that are contained under that relationship. This
// allows event types to repeat themselves with the same set of keys and
// different values.
type Query interface {
Matches(events []types.Event) (bool, error)
String() string
}
// UnsubscribeArgs are the parameters to remove a subscription.
// The subscriber ID must be populated, and at least one of the client ID or
// the registered query.
type UnsubscribeArgs struct {
Subscriber string // subscriber ID chosen by the client (required)
ID string // subscription ID (assigned by the server)
Query Query // the query registered with the subscription
}
// Validate returns nil if args are valid to identify a subscription to remove.
// Otherwise, it reports an error.
func (args UnsubscribeArgs) Validate() error {
if args.Subscriber == "" {
return errors.New("must specify a subscriber")
}
if args.ID == "" && args.Query == nil {
return fmt.Errorf("subscription is not fully defined [subscriber=%q]", args.Subscriber)
}
return nil
}
// Server allows clients to subscribe/unsubscribe for messages, publishing
// messages with or without events, and manages internal state.
type Server struct {
service.BaseService
queue chan item
done <-chan struct{} // closed when server should exit
stop func() // signal the server to exit
pubs sync.RWMutex // excl: shutdown; shared: active publisher
exited chan struct{} // server exited
// All subscriptions currently known.
// Lock exclusive to add, remove, or cancel subscriptions.
// Lock shared to look up or publish to subscriptions.
subs struct {
sync.RWMutex
index *subIndex
}
// TODO(creachadair): Rework the options so that this does not need to live
// as a field. It is not otherwise needed.
queueCap int
}
// Option sets a parameter for the server.
type Option func(*Server)
// NewServer returns a new server. See the commentary on the Option functions
// for a detailed description of how to configure buffering. If no options are
// provided, the resulting server's queue is unbuffered.
func NewServer(options ...Option) *Server {
s := new(Server)
for _, opt := range options {
opt(s)
}
s.BaseService = *service.NewBaseService(nil, "PubSub", s)
// The queue receives items to be published.
s.queue = make(chan item, s.queueCap)
// The index tracks subscriptions by ID and query terms.
s.subs.index = newSubIndex()
return s
}
// BufferCapacity allows you to specify capacity for publisher's queue. This
// is the number of messages that can be published without blocking. If no
// buffer is specified, publishing is synchronous with delivery. This function
// will panic if cap < 0.
func BufferCapacity(cap int) Option {
if cap < 0 {
panic("negative buffer capacity")
}
return func(s *Server) { s.queueCap = cap }
}
// BufferCapacity returns capacity of the publication queue.
func (s *Server) BufferCapacity() int { return cap(s.queue) }
// Subscribe creates a subscription for the given client.
//
// An error will be returned to the caller if the context is canceled or if
// subscription already exist for pair clientID and query.
//
// outCapacity can be used to set a capacity for Subscription#Out channel (1 by
// default). Panics if outCapacity is less than or equal to zero. If you want
// an unbuffered channel, use SubscribeUnbuffered.
func (s *Server) Subscribe(
ctx context.Context,
clientID string,
query Query,
outCapacity ...int) (*Subscription, error) {
outCap := 1
if len(outCapacity) > 0 {
if outCapacity[0] <= 0 {
panic("Negative or zero capacity. Use SubscribeUnbuffered if you want an unbuffered channel")
}
outCap = outCapacity[0]
}
return s.subscribe(ctx, clientID, query, outCap)
}
// SubscribeUnbuffered does the same as Subscribe, except it returns a
// subscription with unbuffered channel. Use with caution as it can freeze the
// server.
func (s *Server) SubscribeUnbuffered(ctx context.Context, clientID string, query Query) (*Subscription, error) {
return s.subscribe(ctx, clientID, query, 0)
}
func (s *Server) subscribe(ctx context.Context, clientID string, query Query, outCapacity int) (*Subscription, error) {
s.subs.Lock()
defer s.subs.Unlock()
if s.subs.index == nil {
return nil, ErrServerStopped
} else if s.subs.index.contains(clientID, query.String()) {
return nil, ErrAlreadySubscribed
}
sub := NewSubscription(outCapacity)
s.subs.index.add(&subInfo{
clientID: clientID,
query: query,
subID: sub.id,
sub: sub,
})
return sub, nil
}
// Unsubscribe removes the subscription for the given client and/or query. It
// returns ErrSubscriptionNotFound if no such subscription exists.
func (s *Server) Unsubscribe(ctx context.Context, args UnsubscribeArgs) error {
if err := args.Validate(); err != nil {
return err
}
s.subs.Lock()
defer s.subs.Unlock()
if s.subs.index == nil {
return ErrServerStopped
}
// TODO(creachadair): Do we need to support unsubscription for an "empty"
// query? I believe that case is not possible by the Query grammar, but we
// should make sure.
//
// Revisit this logic once we are able to remove indexing by query.
var evict subInfoSet
if args.Subscriber != "" {
evict = s.subs.index.findClientID(args.Subscriber)
if args.Query != nil {
evict = evict.withQuery(args.Query.String())
}
} else {
evict = s.subs.index.findQuery(args.Query.String())
}
if len(evict) == 0 {
return ErrSubscriptionNotFound
}
s.removeSubs(evict, ErrUnsubscribed)
return nil
}
// UnsubscribeAll removes all subscriptions for the given client ID.
// It returns ErrSubscriptionNotFound if no subscriptions exist for that client.
func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
s.subs.Lock()
defer s.subs.Unlock()
evict := s.subs.index.findClientID(clientID)
if len(evict) == 0 {
return ErrSubscriptionNotFound
}
s.removeSubs(evict, ErrUnsubscribed)
return nil
}
// NumClients returns the number of clients.
func (s *Server) NumClients() int {
s.subs.RLock()
defer s.subs.RUnlock()
return len(s.subs.index.byClient)
}
// NumClientSubscriptions returns the number of subscriptions the client has.
func (s *Server) NumClientSubscriptions(clientID string) int {
s.subs.RLock()
defer s.subs.RUnlock()
return len(s.subs.index.findClientID(clientID))
}
// Publish publishes the given message. An error will be returned to the caller
// if the context is canceled.
func (s *Server) Publish(ctx context.Context, msg interface{}) error {
return s.publish(ctx, msg, []types.Event{})
}
// PublishWithEvents publishes the given message with the set of events. The set
// is matched with clients queries. If there is a match, the message is sent to
// the client.
func (s *Server) PublishWithEvents(ctx context.Context, msg interface{}, events []types.Event) error {
return s.publish(ctx, msg, events)
}
// OnStop implements Service.OnStop by shutting down the server.
func (s *Server) OnStop() { s.stop() }
// Wait implements Service.Wait by blocking until the server has exited, then
// yielding to the base service wait.
func (s *Server) Wait() { <-s.exited; s.BaseService.Wait() }
// OnStart implements Service.OnStart by starting the server.
func (s *Server) OnStart() error { s.run(); return nil }
// OnReset implements Service.OnReset. It has no effect for this service.
func (s *Server) OnReset() error { return nil }
func (s *Server) publish(ctx context.Context, data interface{}, events []types.Event) error {
s.pubs.RLock()
defer s.pubs.RUnlock()
select {
case <-s.done:
return ErrServerStopped
case <-ctx.Done():
return ctx.Err()
case s.queue <- item{
Data: data,
Events: events,
}:
return nil
}
}
func (s *Server) run() {
// The server runs until ctx is canceled.
ctx, cancel := context.WithCancel(context.Background())
s.done = ctx.Done()
s.stop = cancel
// Shutdown monitor: When the context ends, wait for any active publish
// calls to exit, then close the queue to signal the sender to exit.
go func() {
<-ctx.Done()
s.pubs.Lock()
defer s.pubs.Unlock()
close(s.queue)
}()
s.exited = make(chan struct{})
go func() {
defer close(s.exited)
// Sender: Service the queue and forward messages to subscribers.
for it := range s.queue {
if err := s.send(it.Data, it.Events); err != nil {
s.Logger.Error("Error sending event", "err", err)
}
}
// Terminate all subscribers without error before exit.
s.subs.Lock()
defer s.subs.Unlock()
for si := range s.subs.index.all {
si.sub.cancel(nil)
}
s.subs.index = nil
}()
}
// removeSubs cancels and removes all the subscriptions in evict with the given
// error. The caller must hold the s.subs lock.
func (s *Server) removeSubs(evict subInfoSet, reason error) {
for si := range evict {
si.sub.cancel(reason)
}
s.subs.index.removeAll(evict)
}
// send delivers the given message to all matching subscribers. An error in
// query matching stops transmission and is returned.
func (s *Server) send(data interface{}, events []types.Event) error {
// At exit, evict any subscriptions that were too slow.
evict := make(subInfoSet)
defer func() {
if len(evict) != 0 {
s.subs.Lock()
defer s.subs.Unlock()
s.removeSubs(evict, ErrOutOfCapacity)
}
}()
// N.B. Order is important here. We must acquire and defer the lock release
// AFTER deferring the eviction cleanup: The cleanup must happen after the
// reader lock has released, or it will deadlock.
s.subs.RLock()
defer s.subs.RUnlock()
for si := range s.subs.index.all {
match, err := si.query.Matches(events)
if err != nil {
return fmt.Errorf("match failed against query: %w", err)
// TODO(creachadair): Should we evict this subscription?
} else if !match {
continue
}
// Subscriptions may be buffered or unbuffered. Unbuffered subscriptions
// are intended for internal use such as indexing, where we don't want to
// penalize a slow reader. Buffered subscribers must keep up with their
// queue, or they will be terminated.
//
// TODO(creachadair): Unbuffered subscriptions used by the event indexer
// to avoid losing events if it happens to be slow. Rework this so that
// use case doesn't require this affordance, and then remove unbuffered
// subscriptions.
msg := NewMessage(si.sub.id, data, events)
if cap(si.sub.out) == 0 {
si.sub.out <- msg
continue
}
select {
case si.sub.out <- msg:
// ok, delivered
default:
// slow subscriber, cancel them
evict.add(si)
}
}
return nil
}