@ -1,14 +1,12 @@
// Package pubsub implements a pub-sub model with a single publisher (Server)
// and multiple subscribers (clients).
// Package pubsub implements an event dispatching server with a single publisher
// and multiple subscriber clients. Multiple goroutines can safely publish to a
// single Server instance.
//
// Though you can have multiple publishers by sharing a pointer to a server or
// by giving the same channel to each publisher and publishing messages from
// that channel (fan-in).
//
// Clients subscribe for messages, which could be of any type, using a query.
// When some message is published, we match it with all queries. If there is a
// match, this message will be pushed to all clients, subscribed to that query.
// See query subpackage for our implementation.
// Clients register subscriptions with a query to select which messages they
// wish to receive. When messages are published, they are broadcast to all
// clients whose subscription query matches that message. Queries are
// constructed using the github.com/tendermint/tendermint/libs/pubsub/query
// package.
//
// Example:
//
@ -16,7 +14,7 @@
// if err != nil {
// return err
// }
// ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Second)
// ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
// defer cancel()
// subscription, err := pubsub.Subscribe(ctx, "johns-transactions", q)
// if err != nil {
@ -38,22 +36,12 @@ import (
"context"
"errors"
"fmt"
"sync"
"github.com/tendermint/tendermint/abci/types"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/libs/service"
)
type operation int
const (
sub operation = iota
pub
unsub
shutdown
)
var (
// ErrSubscriptionNotFound is returned when a client tries to unsubscribe
// from not existing subscription.
@ -62,6 +50,10 @@ var (
// ErrAlreadySubscribed is returned when a client tries to subscribe twice or
// more using the same query.
ErrAlreadySubscribed = errors . New ( "already subscribed" )
// ErrServerStopped is returned when attempting to publish or subscribe to a
// server that has been stopped.
ErrServerStopped = errors . New ( "pubsub server is stopped" )
)
// Query defines an interface for a query to be used for subscribing. A query
@ -75,17 +67,21 @@ type Query interface {
String ( ) string
}
// UnsubscribeArgs are the parameters to remove a subscription.
// The subscriber ID must be populated, and at least one of the client ID or
// the registered query.
type UnsubscribeArgs struct {
ID string
Subscriber string
Query Query
Subscriber string // subscriber ID chosen by the client (required)
ID string // subscription ID (assigned by the server)
Query Query // the query registered with the subscription
}
// Validate returns nil if args are valid to identify a subscription to remove.
// Otherwise, it reports an error.
func ( args UnsubscribeArgs ) Validate ( ) error {
if args . Subscriber == "" {
return errors . New ( "must specify a subscriber" )
}
if args . ID == "" && args . Query == nil {
return fmt . Errorf ( "subscription is not fully defined [subscriber=%q]" , args . Subscriber )
}
@ -93,35 +89,28 @@ func (args UnsubscribeArgs) Validate() error {
return nil
}
type cmd struct {
op operation
// subscribe, unsubscribe
query Query
subscription * Subscription
clientID string
// publish
msg interface { }
events [ ] types . Event
}
// Server allows clients to subscribe/unsubscribe for messages, publishing
// messages with or without events, and manages internal state.
type Server struct {
service . BaseService
cmds chan cmd
cmdsCap int
// check if we have subscription before
// subscribing or unsubscribing
mtx tmsync . RWMutex
queue chan item
done <- chan struct { } // closed when server should exit
stop func ( ) // signal the server to exit
pubs sync . RWMutex // excl: shutdown; shared: active publisher
exited chan struct { } // server exited
// All subscriptions currently known.
// Lock exclusive to add, remove, or cancel subscriptions.
// Lock shared to look up or publish to subscriptions.
subs struct {
sync . RWMutex
index * subIndex
}
// subscriber -> [query->id (string) OR id->query (string))],
// track connections both by ID (new) and query (legacy) to
// avoid breaking the interface.
subscriptions map [ string ] map [ string ] string
// TODO(creachadair): Rework the options so that this does not need to live
// as a field. It is not otherwise needed.
queueCap int
}
// Option sets a parameter for the server.
@ -131,37 +120,34 @@ type Option func(*Server)
// for a detailed description of how to configure buffering. If no options are
// provided, the resulting server's queue is unbuffered.
func NewServer ( options ... Option ) * Server {
s := & Server {
subscriptions : make ( map [ string ] map [ string ] string ) ,
s := new ( Server )
for _ , opt := range options {
opt ( s )
}
s . BaseService = * service . NewBaseService ( nil , "PubSub" , s )
for _ , option := range options {
option ( s )
}
// The queue receives items to be published.
s . queue = make ( chan item , s . queueCap )
// if BufferCapacity option was not set, the channel is unbuffered
s . cmds = make ( chan cmd , s . cmdsCap )
// The index tracks subscriptions by ID and query terms.
s . subs . index = newSubIndex ( )
return s
}
// BufferCapacity allows you to specify capacity for the internal server' s
// queue. Since the server, given Y subscribers, could only process X messages,
// this option could be used to survive spikes (e.g. high amount of
// transactions during peak hours) .
// BufferCapacity allows you to specify capacity for publisher's queue. Thi s
// is the number of messages that can be published without blocking. If no
// buffer is specified, publishing is synchronous with delivery. This function
// will panic if cap < 0 .
func BufferCapacity ( cap int ) Option {
return func ( s * Server ) {
if cap > 0 {
s . cmdsCap = cap
}
if cap < 0 {
panic ( "negative buffer capacity" )
}
return func ( s * Server ) { s . queueCap = cap }
}
// BufferCapacity returns capacity of the internal server's queue.
func ( s * Server ) BufferCapacity ( ) int {
return s . cmdsCap
}
// BufferCapacity returns capacity of the publication queue.
func ( s * Server ) BufferCapacity ( ) int { return cap ( s . queue ) }
// Subscribe creates a subscription for the given client.
//
@ -195,331 +181,223 @@ func (s *Server) SubscribeUnbuffered(ctx context.Context, clientID string, query
}
func ( s * Server ) subscribe ( ctx context . Context , clientID string , query Query , outCapacity int ) ( * Subscription , error ) {
s . mtx . RLock ( )
clientSubscriptions , ok := s . subscriptions [ clientID ]
if ok {
_ , ok = clientSubscriptions [ query . String ( ) ]
}
s . mtx . RUnlock ( )
if ok {
s . subs . Lock ( )
defer s . subs . Unlock ( )
if s . subs . index == nil {
return nil , ErrServerStopped
} else if s . subs . index . contains ( clientID , query . String ( ) ) {
return nil , ErrAlreadySubscribed
}
subscription := NewSubscription ( outCapacity )
select {
case s . cmds <- cmd { op : sub , clientID : clientID , query : query , subscription : subscription } :
s . mtx . Lock ( )
if _ , ok = s . subscriptions [ clientID ] ; ! ok {
s . subscriptions [ clientID ] = make ( map [ string ] string )
}
s . subscriptions [ clientID ] [ query . String ( ) ] = subscription . id
s . subscriptions [ clientID ] [ subscription . id ] = query . String ( )
s . mtx . Unlock ( )
return subscription , nil
case <- ctx . Done ( ) :
return nil , ctx . Err ( )
case <- s . Quit ( ) :
return nil , nil
}
sub := NewSubscription ( outCapacity )
s . subs . index . add ( & subInfo {
clientID : clientID ,
query : query ,
subID : sub . id ,
sub : sub ,
} )
return sub , nil
}
// Unsubscribe removes the subscription on the given query. An error will be
// returned to the caller if the context is canceled or if subscription does
// not exist.
// Unsubscribe removes the subscription for the given client and/or query. It
// returns ErrSubscriptionNotFound if no such subscription exists.
func ( s * Server ) Unsubscribe ( ctx context . Context , args UnsubscribeArgs ) error {
if err := args . Validate ( ) ; err != nil {
return err
}
var qs string
if args . Query ! = nil {
qs = args . Query . String ( )
s . subs . Lock ( )
defer s . subs . Unlock ( )
if s . subs . index = = nil {
return ErrServerStopped
}
clientSubscriptions , err := func ( ) ( map [ string ] string , error ) {
s . mtx . RLock ( )
defer s . mtx . RUnlock ( )
clientSubscriptions , ok := s . subscriptions [ args . Subscriber ]
if args . ID != "" {
qs , ok = clientSubscriptions [ args . ID ]
if ok && args . Query == nil {
var err error
args . Query , err = query . New ( qs )
if err != nil {
return nil , err
}
}
} else if qs != "" {
args . ID , ok = clientSubscriptions [ qs ]
}
if ! ok {
return nil , ErrSubscriptionNotFound
// TODO(creachadair): Do we need to support unsubscription for an "empty"
// query? I believe that case is not possible by the Query grammar, but we
// should make sure.
//
// Revisit this logic once we are able to remove indexing by query.
var evict subInfoSet
if args . Subscriber != "" {
evict = s . subs . index . findClientID ( args . Subscriber )
if args . Query != nil {
evict = evict . withQuery ( args . Query . String ( ) )
}
return clientSubscriptions , nil
} ( )
if err != nil {
return err
} else {
evict = s . subs . index . findQuery ( args . Query . String ( ) )
}
select {
case s . cmds <- cmd { op : unsub , clientID : args . Subscriber , query : args . Query , subscription : & Subscription { id : args . ID } } :
s . mtx . Lock ( )
defer s . mtx . Unlock ( )
delete ( clientSubscriptions , args . ID )
delete ( clientSubscriptions , qs )
if len ( clientSubscriptions ) == 0 {
delete ( s . subscriptions , args . Subscriber )
}
return nil
case <- ctx . Done ( ) :
return ctx . Err ( )
case <- s . Quit ( ) :
return nil
if len ( evict ) == 0 {
return ErrSubscriptionNotFound
}
s . removeSubs ( evict , ErrUnsubscribed )
return nil
}
// UnsubscribeAll removes all client subscriptions. An error will be returned
// to the caller if the context is canceled or if subscription does not exis t.
// UnsubscribeAll removes all subscriptions for the given client ID.
// It returns ErrSubscriptionNotFound if no subscriptions exist for that client.
func ( s * Server ) UnsubscribeAll ( ctx context . Context , clientID string ) error {
s . mtx . RLock ( )
_ , ok := s . subscriptions [ clientID ]
s . mtx . RUnlock ( )
if ! ok {
return ErrSubscriptionNotFound
}
s . subs . Lock ( )
defer s . subs . Unlock ( )
select {
case s . cmds <- cmd { op : unsub , clientID : clientID } :
s . mtx . Lock ( )
defer s . mtx . Unlock ( )
delete ( s . subscriptions , clientID )
return nil
case <- ctx . Done ( ) :
return ctx . Err ( )
case <- s . Quit ( ) :
return nil
evict := s . subs . index . findClientID ( clientID )
if len ( evict ) == 0 {
return ErrSubscriptionNotFound
}
s . removeSubs ( evict , ErrUnsubscribed )
return nil
}
// NumClients returns the number of clients.
func ( s * Server ) NumClients ( ) int {
s . mtx . RLock ( )
defer s . mtx . RUnlock ( )
return len ( s . subscriptions )
s . subs . RLock ( )
defer s . subs . RUnlock ( )
return len ( s . subs . index . byClient )
}
// NumClientSubscriptions returns the number of subscriptions the client has.
func ( s * Server ) NumClientSubscriptions ( clientID string ) int {
s . mtx . RLock ( )
defer s . mtx . RUnlock ( )
return len ( s . subscriptions [ clientID ] ) / 2
s . subs . RLock ( )
defer s . subs . RUnlock ( )
return len ( s . subs . index . findClientID ( clientID ) )
}
// Publish publishes the given message. An error will be returned to the caller
// if the context is canceled.
func ( s * Server ) Publish ( ctx context . Context , msg interface { } ) error {
return s . PublishWithEvents ( ctx , msg , [ ] types . Event { } )
return s . publish ( ctx , msg , [ ] types . Event { } )
}
// PublishWithEvents publishes the given message with the set of events. The set
// is matched with clients queries. If there is a match, the message is sent to
// the client.
func ( s * Server ) PublishWithEvents ( ctx context . Context , msg interface { } , events [ ] types . Event ) error {
select {
case s . cmds <- cmd { op : pub , msg : msg , events : events } :
return nil
case <- ctx . Done ( ) :
return ctx . Err ( )
case <- s . Quit ( ) :
return nil
}
return s . publish ( ctx , msg , events )
}
// OnStop implements Service.OnStop by shutting down the server.
func ( s * Server ) OnStop ( ) {
s . cmds <- cmd { op : shutdown }
}
// NOTE: not goroutine safe
type state struct {
// query string -> client -> subscription
subscriptions map [ string ] map [ string ] * Subscription
// query string -> queryPlusRefCount
queries map [ string ] * queryPlusRefCount
}
func ( s * Server ) OnStop ( ) { s . stop ( ) }
// queryPlusRefCount holds a pointer to a query and reference counter. When
// refCount is zero, query will be removed.
type queryPlusRefCount struct {
q Query
refCount int
}
// Wait implements Service.Wait by blocking until the server has exited, then
// yielding to the base service wait.
func ( s * Server ) Wait ( ) { <- s . exited ; s . BaseService . Wait ( ) }
// OnStart implements Service.OnStart by starting the server.
func ( s * Server ) OnStart ( ) error {
go s . loop ( state {
subscriptions : make ( map [ string ] map [ string ] * Subscription ) ,
queries : make ( map [ string ] * queryPlusRefCount ) ,
} )
return nil
}
func ( s * Server ) OnStart ( ) error { s . run ( ) ; return nil }
// OnReset implements Service.OnReset
func ( s * Server ) OnReset ( ) error {
return nil
}
// OnReset implements Service.OnReset. It has no effect for this service.
func ( s * Server ) OnReset ( ) error { return nil }
func ( s * Server ) loop ( state state ) {
loop :
for cmd := range s . cmds {
switch cmd . op {
case unsub :
if cmd . query != nil {
state . remove ( cmd . clientID , cmd . query . String ( ) , cmd . subscription . id , ErrUnsubscribed )
} else {
state . removeClient ( cmd . clientID , ErrUnsubscribed )
}
case shutdown :
state . removeAll ( nil )
break loop
case sub :
state . add ( cmd . clientID , cmd . query , cmd . subscription )
case pub :
if err := state . send ( cmd . msg , cmd . events ) ; err != nil {
s . Logger . Error ( "Error querying for events" , "err" , err )
}
}
}
}
func ( state * state ) add ( clientID string , q Query , subscription * Subscription ) {
qStr := q . String ( )
func ( s * Server ) publish ( ctx context . Context , data interface { } , events [ ] types . Event ) error {
s . pubs . RLock ( )
defer s . pubs . RUnlock ( )
// initialize subscription for this client per query if needed
if _ , ok := state . subscriptions [ qStr ] ; ! ok {
state . subscriptions [ qStr ] = make ( map [ string ] * Subscription )
}
if _ , ok := state . subscriptions [ subscription . id ] ; ! ok {
state . subscriptions [ subscription . id ] = make ( map [ string ] * Subscription )
}
// create subscription
state . subscriptions [ qStr ] [ clientID ] = subscription
state . subscriptions [ subscription . id ] [ clientID ] = subscription
// initialize query if needed
if _ , ok := state . queries [ qStr ] ; ! ok {
state . queries [ qStr ] = & queryPlusRefCount { q : q , refCount : 0 }
select {
case <- s . done :
return ErrServerStopped
case <- ctx . Done ( ) :
return ctx . Err ( )
case s . queue <- item {
Data : data ,
Events : events ,
} :
return nil
}
// increment reference counter
state . queries [ qStr ] . refCount ++
}
func ( state * state ) remove ( clientID string , qStr , id string , reason error ) {
clientSubscriptions , ok := state . subscriptions [ qStr ]
if ! ok {
return
}
subscription , ok := clientSubscriptions [ clientID ]
if ! ok {
return
}
func ( s * Server ) run ( ) {
// The server runs until ctx is canceled.
ctx , cancel := context . WithCancel ( context . Background ( ) )
s . done = ctx . Done ( )
s . stop = cancel
// Shutdown monitor: When the context ends, wait for any active publish
// calls to exit, then close the queue to signal the sender to exit.
go func ( ) {
<- ctx . Done ( )
s . pubs . Lock ( )
defer s . pubs . Unlock ( )
close ( s . queue )
} ( )
subscription . cancel ( reason )
s . exited = make ( chan struct { } )
go func ( ) {
defer close ( s . exited )
// remove client from query map.
// if query has no other clients subscribed, remove it.
delete ( state . subscriptions [ qStr ] , clientID )
delete ( state . subscriptions [ id ] , clientID )
if len ( state . subscriptions [ qStr ] ) == 0 {
delete ( state . subscriptions , qStr )
}
// decrease ref counter in queries
if ref , ok := state . queries [ qStr ] ; ok {
ref . refCount --
if ref . refCount == 0 {
// remove the query if nobody else is using it
delete ( state . queries , qStr )
// Sender: Service the queue and forward messages to subscribers.
for it := range s . queue {
if err := s . send ( it . Data , it . Events ) ; err != nil {
s . Logger . Error ( "Error sending event" , "err" , err )
}
}
}
// Terminate all subscribers without error before exit.
s . subs . Lock ( )
defer s . subs . Unlock ( )
for si := range s . subs . index . all {
si . sub . cancel ( nil )
}
s . subs . index = nil
} ( )
}
func ( state * state ) removeClient ( clientID string , reason error ) {
seen := map [ string ] struct { } { }
for qStr , clientSubscriptions := range state . subscriptions {
if sub , ok := clientSubscriptions [ clientID ] ; ok {
if _ , ok = seen [ sub . id ] ; ok {
// all subscriptions are double indexed by ID and query, only
// process them once.
continue
}
state . remove ( clientID , qStr , sub . id , reason )
seen [ sub . id ] = struct { } { }
}
// removeSubs cancels and removes all the subscriptions in evict with the given
// error. The caller must hold the s.subs lock.
func ( s * Server ) removeSubs ( evict subInfoSet , reason error ) {
for si := range evict {
si . sub . cancel ( reason )
}
s . subs . index . removeAll ( evict )
}
func ( state * state ) removeAll ( reason error ) {
for qStr , clientSubscriptions := range state . subscriptions {
sub , ok := clientSubscriptions [ qStr ]
if ! ok || ok && sub . id == qStr {
// all subscriptions are double indexed by ID and query, only
// process them once.
continue
// send delivers the given message to all matching subscribers. An error in
// query matching stops transmission and is returned.
func ( s * Server ) send ( data interface { } , events [ ] types . Event ) error {
// At exit, evict any subscriptions that were too slow.
evict := make ( subInfoSet )
defer func ( ) {
if len ( evict ) != 0 {
s . subs . Lock ( )
defer s . subs . Unlock ( )
s . removeSubs ( evict , ErrOutOfCapacity )
}
} ( )
for clientID := range clientSubscriptions {
state . remove ( clientID , qStr , sub . id , reason )
}
}
}
// N.B. Order is important here. We must acquire and defer the lock release
// AFTER deferring the eviction cleanup: The cleanup must happen after the
// reader lock has released, or it will deadlock.
s . subs . RLock ( )
defer s . subs . RUnlock ( )
func ( state * state ) send ( msg interface { } , events [ ] types . Event ) error {
for qStr , clientSubscriptions := range state . subscriptions {
if sub , ok := clientSubscriptions [ qStr ] ; ok && sub . id == qStr {
continue
}
var q Query
if qi , ok := state . queries [ qStr ] ; ok {
q = qi . q
} else {
for si := range s . subs . index . all {
match , err := si . query . Matches ( events )
if err != nil {
return fmt . Errorf ( "match failed against query: %w" , err )
// TODO(creachadair): Should we evict this subscription?
} else if ! match {
continue
}
match , err := q . Matches ( events )
if err != nil {
return fmt . Errorf ( "failed to match against query %s: %w" , q . String ( ) , err )
// Subscriptions may be buffered or unbuffered. Unbuffered subscriptions
// are intended for internal use such as indexing, where we don't want to
// penalize a slow reader. Buffered subscribers must keep up with their
// queue, or they will be terminated.
//
// TODO(creachadair): Unbuffered subscriptions used by the event indexer
// to avoid losing events if it happens to be slow. Rework this so that
// use case doesn't require this affordance, and then remove unbuffered
// subscriptions.
msg := NewMessage ( si . sub . id , data , events )
if cap ( si . sub . out ) == 0 {
si . sub . out <- msg
continue
}
if match {
for clientID , subscription := range clientSubscriptions {
if cap ( subscription . out ) == 0 {
// block on unbuffered channel
select {
case subscription . out <- NewMessage ( subscription . id , msg , events ) :
case <- subscription . canceled :
}
} else {
// don't block on buffered channels
select {
case subscription . out <- NewMessage ( subscription . id , msg , events ) :
default :
state . remove ( clientID , qStr , subscription . id , ErrOutOfCapacity )
}
}
}
select {
case si . sub . out <- msg :
// ok, delivered
default :
// slow subscriber, cancel them
evict . add ( si )
}
}