You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

261 lines
7.8 KiB

package core
import (
"context"
"errors"
"fmt"
"time"
"github.com/tendermint/tendermint/internal/eventlog"
"github.com/tendermint/tendermint/internal/eventlog/cursor"
"github.com/tendermint/tendermint/internal/jsontypes"
tmpubsub "github.com/tendermint/tendermint/internal/pubsub"
tmquery "github.com/tendermint/tendermint/internal/pubsub/query"
"github.com/tendermint/tendermint/rpc/coretypes"
rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"
)
const (
// Buffer on the Tendermint (server) side to allow some slowness in clients.
subBufferSize = 100
// maxQueryLength is the maximum length of a query string that will be
// accepted. This is just a safety check to avoid outlandish queries.
maxQueryLength = 512
)
// Subscribe for events via WebSocket.
// More: https://docs.tendermint.com/master/rpc/#/Websocket/subscribe
func (env *Environment) Subscribe(ctx context.Context, query string) (*coretypes.ResultSubscribe, error) {
callInfo := rpctypes.GetCallInfo(ctx)
addr := callInfo.RemoteAddr()
if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients {
return nil, fmt.Errorf("max_subscription_clients %d reached", env.Config.MaxSubscriptionClients)
} else if env.EventBus.NumClientSubscriptions(addr) >= env.Config.MaxSubscriptionsPerClient {
return nil, fmt.Errorf("max_subscriptions_per_client %d reached", env.Config.MaxSubscriptionsPerClient)
} else if len(query) > maxQueryLength {
return nil, errors.New("maximum query length exceeded")
}
env.Logger.Info("WARNING: Websocket subscriptions are deprecated and will be removed " +
"in Tendermint v0.37. See https://tinyurl.com/adr075 for more information.")
env.Logger.Info("Subscribe to query", "remote", addr, "query", query)
q, err := tmquery.New(query)
if err != nil {
return nil, fmt.Errorf("failed to parse query: %w", err)
}
subCtx, cancel := context.WithTimeout(ctx, SubscribeTimeout)
defer cancel()
sub, err := env.EventBus.SubscribeWithArgs(subCtx, tmpubsub.SubscribeArgs{
ClientID: addr,
Query: q,
Limit: subBufferSize,
})
if err != nil {
return nil, err
}
// Capture the current ID, since it can change in the future.
subscriptionID := callInfo.RPCRequest.ID
go func() {
opctx, opcancel := context.WithCancel(context.TODO())
defer opcancel()
for {
msg, err := sub.Next(opctx)
if errors.Is(err, tmpubsub.ErrUnsubscribed) {
// The subscription was removed by the client.
return
} else if errors.Is(err, tmpubsub.ErrTerminated) {
// The subscription was terminated by the publisher.
resp := callInfo.RPCRequest.MakeError(err)
ok := callInfo.WSConn.TryWriteRPCResponse(opctx, resp)
if !ok {
env.Logger.Info("Unable to write response (slow client)",
"to", addr, "subscriptionID", subscriptionID, "err", err)
}
return
}
// We have a message to deliver to the client.
resp := callInfo.RPCRequest.MakeResponse(&coretypes.ResultEvent{
Query: query,
Data: msg.Data(),
Events: msg.Events(),
})
wctx, cancel := context.WithTimeout(opctx, 10*time.Second)
err = callInfo.WSConn.WriteRPCResponse(wctx, resp)
cancel()
if err != nil {
env.Logger.Info("Unable to write response (slow client)",
"to", addr, "subscriptionID", subscriptionID, "err", err)
}
}
}()
return &coretypes.ResultSubscribe{}, nil
}
// Unsubscribe from events via WebSocket.
// More: https://docs.tendermint.com/master/rpc/#/Websocket/unsubscribe
func (env *Environment) Unsubscribe(ctx context.Context, query string) (*coretypes.ResultUnsubscribe, error) {
args := tmpubsub.UnsubscribeArgs{Subscriber: rpctypes.GetCallInfo(ctx).RemoteAddr()}
env.Logger.Info("Unsubscribe from query", "remote", args.Subscriber, "subscription", query)
var err error
args.Query, err = tmquery.New(query)
if err != nil {
args.ID = query
}
err = env.EventBus.Unsubscribe(ctx, args)
if err != nil {
return nil, err
}
return &coretypes.ResultUnsubscribe{}, nil
}
// UnsubscribeAll from all events via WebSocket.
// More: https://docs.tendermint.com/master/rpc/#/Websocket/unsubscribe_all
func (env *Environment) UnsubscribeAll(ctx context.Context) (*coretypes.ResultUnsubscribe, error) {
addr := rpctypes.GetCallInfo(ctx).RemoteAddr()
env.Logger.Info("Unsubscribe from all", "remote", addr)
err := env.EventBus.UnsubscribeAll(ctx, addr)
if err != nil {
return nil, err
}
return &coretypes.ResultUnsubscribe{}, nil
}
// Events applies a query to the event log. If an event log is not enabled,
// Events reports an error. Otherwise, it filters the current contents of the
// log to return matching events.
//
// Events returns up to maxItems of the newest eligible event items. An item is
// eligible if it is older than before (or before is zero), it is newer than
// after (or after is zero), and its data matches the filter. A nil filter
// matches all event data.
//
// If before is zero and no eligible event items are available, Events waits
// for up to waitTime for a matching item to become available. The wait is
// terminated early if ctx ends.
//
// If maxItems ≤ 0, a default positive number of events is chosen. The values
// of maxItems and waitTime may be capped to sensible internal maxima without
// reporting an error to the caller.
func (env *Environment) Events(ctx context.Context,
filter *coretypes.EventFilter,
maxItems int,
before, after cursor.Cursor,
waitTime time.Duration,
) (*coretypes.ResultEvents, error) {
if env.EventLog == nil {
return nil, errors.New("the event log is not enabled")
}
// Parse and validate parameters.
if maxItems <= 0 {
maxItems = 10
} else if maxItems > 100 {
maxItems = 100
}
const maxWaitTime = 30 * time.Second
if waitTime > maxWaitTime {
waitTime = maxWaitTime
}
query := tmquery.All
if filter != nil && filter.Query != "" {
q, err := tmquery.New(filter.Query)
if err != nil {
return nil, fmt.Errorf("invalid filter query: %w", err)
}
query = q
}
var info eventlog.Info
var items []*eventlog.Item
var err error
accept := func(itm *eventlog.Item) error {
// N.B. We accept up to one item more than requested, so we can tell how
// to set the "more" flag in the response.
if len(items) > maxItems {
return eventlog.ErrStopScan
}
if cursorInRange(itm.Cursor, before, after) && query.Matches(itm.Events) {
items = append(items, itm)
}
return nil
}
if waitTime > 0 && before.IsZero() {
ctx, cancel := context.WithTimeout(ctx, waitTime)
defer cancel()
// Long poll. The loop here is because new items may not match the query,
// and we want to keep waiting until we have relevant results (or time out).
cur := after
for len(items) == 0 {
info, err = env.EventLog.WaitScan(ctx, cur, accept)
if err != nil {
// Don't report a timeout as a request failure.
if errors.Is(err, context.DeadlineExceeded) {
err = nil
}
break
}
cur = info.Newest
}
} else {
// Quick poll, return only what is already available.
info, err = env.EventLog.Scan(accept)
}
if err != nil {
return nil, err
}
more := len(items) > maxItems
if more {
items = items[:len(items)-1]
}
enc, err := marshalItems(items)
if err != nil {
return nil, err
}
return &coretypes.ResultEvents{
Items: enc,
More: more,
Oldest: cursorString(info.Oldest),
Newest: cursorString(info.Newest),
}, nil
}
func cursorString(c cursor.Cursor) string {
if c.IsZero() {
return ""
}
return c.String()
}
func cursorInRange(c, before, after cursor.Cursor) bool {
return (before.IsZero() || c.Before(before)) && (after.IsZero() || after.Before(c))
}
func marshalItems(items []*eventlog.Item) ([]*coretypes.EventItem, error) {
out := make([]*coretypes.EventItem, len(items))
for i, itm := range items {
v, err := jsontypes.Marshal(itm.Data)
if err != nil {
return nil, fmt.Errorf("encoding event data: %w", err)
}
out[i] = &coretypes.EventItem{Cursor: itm.Cursor.String(), Event: itm.Type}
out[i].Data = v
}
return out, nil
}