@ -41,6 +41,7 @@ import (
"sync"
"sync"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/internal/pubsub/query"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/libs/service"
)
)
@ -59,32 +60,21 @@ var (
ErrServerStopped = errors . New ( "pubsub server is stopped" )
ErrServerStopped = errors . New ( "pubsub server is stopped" )
)
)
// Query defines an interface for a query to be used for subscribing. A query
// matches against a map of events. Each key in this map is a composite of the
// even type and an attribute key (e.g. "{eventType}.{eventAttrKey}") and the
// values are the event values that are contained under that relationship. This
// allows event types to repeat themselves with the same set of keys and
// different values.
type Query interface {
Matches ( events [ ] types . Event ) ( bool , error )
String ( ) string
}
// SubscribeArgs are the parameters to create a new subscription.
// SubscribeArgs are the parameters to create a new subscription.
type SubscribeArgs struct {
type SubscribeArgs struct {
ClientID string // Client ID
Query Query // filter query for events (required)
Limit int // subscription queue capacity limit (0 means 1)
Quota int // subscription queue soft quota (0 uses Limit)
ClientID string // Client ID
Query * query . Query // filter query for events (required)
Limit int // subscription queue capacity limit (0 means 1)
Quota int // subscription queue soft quota (0 uses Limit)
}
}
// UnsubscribeArgs are the parameters to remove a subscription.
// UnsubscribeArgs are the parameters to remove a subscription.
// The subscriber ID must be populated, and at least one of the client ID or
// The subscriber ID must be populated, and at least one of the client ID or
// the registered query.
// the registered query.
type UnsubscribeArgs struct {
type UnsubscribeArgs struct {
Subscriber string // subscriber ID chosen by the client (required)
ID string // subscription ID (assigned by the server)
Query Query // the query registered with the subscription
Subscriber string // subscriber ID chosen by the client (required)
ID string // subscription ID (assigned by the server)
Query * query . Query // the query registered with the subscription
}
}
// Validate returns nil if args are valid to identify a subscription to remove.
// Validate returns nil if args are valid to identify a subscription to remove.
@ -93,10 +83,6 @@ func (args UnsubscribeArgs) Validate() error {
if args . Subscriber == "" {
if args . Subscriber == "" {
return errors . New ( "must specify a subscriber" )
return errors . New ( "must specify a subscriber" )
}
}
if args . ID == "" && args . Query == nil {
return fmt . Errorf ( "subscription is not fully defined [subscriber=%q]" , args . Subscriber )
}
return nil
return nil
}
}
@ -170,7 +156,7 @@ func (s *Server) BufferCapacity() int { return cap(s.queue) }
// If len(capacities) > 0, its first value is used as the queue capacity.
// If len(capacities) > 0, its first value is used as the queue capacity.
//
//
// Deprecated: Use SubscribeWithArgs. This method will be removed in v0.36.
// Deprecated: Use SubscribeWithArgs. This method will be removed in v0.36.
func ( s * Server ) Subscribe ( ctx context . Context , clientID string , query Query , capacities ... int ) ( * Subscription , error ) {
func ( s * Server ) Subscribe ( ctx context . Context , clientID string , query * query . Query , capacities ... int ) ( * Subscription , error ) {
args := SubscribeArgs {
args := SubscribeArgs {
ClientID : clientID ,
ClientID : clientID ,
Query : query ,
Query : query ,
@ -191,7 +177,7 @@ func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, ca
// being forwarded to any subscriber. If no queries are specified, all
// being forwarded to any subscriber. If no queries are specified, all
// messages will be observed. An error is reported if an observer is already
// messages will be observed. An error is reported if an observer is already
// registered.
// registered.
func ( s * Server ) Observe ( ctx context . Context , observe func ( Message ) error , queries ... Query ) error {
func ( s * Server ) Observe ( ctx context . Context , observe func ( Message ) error , queries ... * query . Query ) error {
s . subs . Lock ( )
s . subs . Lock ( )
defer s . subs . Unlock ( )
defer s . subs . Unlock ( )
if observe == nil {
if observe == nil {
@ -207,8 +193,7 @@ func (s *Server) Observe(ctx context.Context, observe func(Message) error, queri
} else {
} else {
matches = func ( msg Message ) bool {
matches = func ( msg Message ) bool {
for _ , q := range queries {
for _ , q := range queries {
match , err := q . Matches ( msg . events )
if err == nil && match {
if q . Matches ( msg . events ) {
return true
return true
}
}
}
}
@ -229,9 +214,6 @@ func (s *Server) Observe(ctx context.Context, observe func(Message) error, queri
// error if the query is nil, a subscription already exists for the specified
// error if the query is nil, a subscription already exists for the specified
// client ID and query, or if the capacity arguments are invalid.
// client ID and query, or if the capacity arguments are invalid.
func ( s * Server ) SubscribeWithArgs ( ctx context . Context , args SubscribeArgs ) ( * Subscription , error ) {
func ( s * Server ) SubscribeWithArgs ( ctx context . Context , args SubscribeArgs ) ( * Subscription , error ) {
if args . Query == nil {
return nil , errors . New ( "query is nil" )
}
s . subs . Lock ( )
s . subs . Lock ( )
defer s . subs . Unlock ( )
defer s . subs . Unlock ( )
@ -440,11 +422,7 @@ func (s *Server) send(data interface{}, events []types.Event) error {
}
}
for si := range s . subs . index . all {
for si := range s . subs . index . all {
match , err := si . query . Matches ( events )
if err != nil {
return fmt . Errorf ( "match failed against query: %w" , err )
// TODO(creachadair): Should we evict this subscription?
} else if ! match {
if ! si . query . Matches ( events ) {
continue
continue
}
}