- package core
-
- import (
- "context"
- "errors"
- "fmt"
- "time"
-
- "github.com/tendermint/tendermint/internal/eventlog"
- "github.com/tendermint/tendermint/internal/eventlog/cursor"
- "github.com/tendermint/tendermint/internal/jsontypes"
- tmpubsub "github.com/tendermint/tendermint/internal/pubsub"
- tmquery "github.com/tendermint/tendermint/internal/pubsub/query"
- "github.com/tendermint/tendermint/rpc/coretypes"
- rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"
- )
-
- const (
- // Buffer on the Tendermint (server) side to allow some slowness in clients.
- subBufferSize = 100
-
- // maxQueryLength is the maximum length of a query string that will be
- // accepted. This is just a safety check to avoid outlandish queries.
- maxQueryLength = 512
- )
-
- // Subscribe for events via WebSocket.
- // More: https://docs.tendermint.com/master/rpc/#/Websocket/subscribe
- func (env *Environment) Subscribe(ctx context.Context, query string) (*coretypes.ResultSubscribe, error) {
- callInfo := rpctypes.GetCallInfo(ctx)
- addr := callInfo.RemoteAddr()
-
- if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients {
- return nil, fmt.Errorf("max_subscription_clients %d reached", env.Config.MaxSubscriptionClients)
- } else if env.EventBus.NumClientSubscriptions(addr) >= env.Config.MaxSubscriptionsPerClient {
- return nil, fmt.Errorf("max_subscriptions_per_client %d reached", env.Config.MaxSubscriptionsPerClient)
- } else if len(query) > maxQueryLength {
- return nil, errors.New("maximum query length exceeded")
- }
-
- env.Logger.Info("Subscribe to query", "remote", addr, "query", query)
-
- q, err := tmquery.New(query)
- if err != nil {
- return nil, fmt.Errorf("failed to parse query: %w", err)
- }
-
- subCtx, cancel := context.WithTimeout(ctx, SubscribeTimeout)
- defer cancel()
-
- sub, err := env.EventBus.SubscribeWithArgs(subCtx, tmpubsub.SubscribeArgs{
- ClientID: addr,
- Query: q,
- Limit: subBufferSize,
- })
- if err != nil {
- return nil, err
- }
-
- // Capture the current ID, since it can change in the future.
- subscriptionID := callInfo.RPCRequest.ID
- go func() {
- opctx, opcancel := context.WithCancel(context.TODO())
- defer opcancel()
-
- for {
- msg, err := sub.Next(opctx)
- if errors.Is(err, tmpubsub.ErrUnsubscribed) {
- // The subscription was removed by the client.
- return
- } else if errors.Is(err, tmpubsub.ErrTerminated) {
- // The subscription was terminated by the publisher.
- resp := callInfo.RPCRequest.MakeError(err)
- ok := callInfo.WSConn.TryWriteRPCResponse(opctx, resp)
- if !ok {
- env.Logger.Info("Unable to write response (slow client)",
- "to", addr, "subscriptionID", subscriptionID, "err", err)
- }
- return
- }
-
- // We have a message to deliver to the client.
- resp := callInfo.RPCRequest.MakeResponse(&coretypes.ResultEvent{
- Query: query,
- Data: msg.Data(),
- Events: msg.Events(),
- })
- wctx, cancel := context.WithTimeout(opctx, 10*time.Second)
- err = callInfo.WSConn.WriteRPCResponse(wctx, resp)
- cancel()
- if err != nil {
- env.Logger.Info("Unable to write response (slow client)",
- "to", addr, "subscriptionID", subscriptionID, "err", err)
- }
- }
- }()
-
- return &coretypes.ResultSubscribe{}, nil
- }
-
- // Unsubscribe from events via WebSocket.
- // More: https://docs.tendermint.com/master/rpc/#/Websocket/unsubscribe
- func (env *Environment) Unsubscribe(ctx context.Context, query string) (*coretypes.ResultUnsubscribe, error) {
- args := tmpubsub.UnsubscribeArgs{Subscriber: rpctypes.GetCallInfo(ctx).RemoteAddr()}
- env.Logger.Info("Unsubscribe from query", "remote", args.Subscriber, "subscription", query)
-
- var err error
- args.Query, err = tmquery.New(query)
-
- if err != nil {
- args.ID = query
- }
-
- err = env.EventBus.Unsubscribe(ctx, args)
- if err != nil {
- return nil, err
- }
- return &coretypes.ResultUnsubscribe{}, nil
- }
-
- // UnsubscribeAll from all events via WebSocket.
- // More: https://docs.tendermint.com/master/rpc/#/Websocket/unsubscribe_all
- func (env *Environment) UnsubscribeAll(ctx context.Context) (*coretypes.ResultUnsubscribe, error) {
- addr := rpctypes.GetCallInfo(ctx).RemoteAddr()
- env.Logger.Info("Unsubscribe from all", "remote", addr)
- err := env.EventBus.UnsubscribeAll(ctx, addr)
- if err != nil {
- return nil, err
- }
- return &coretypes.ResultUnsubscribe{}, nil
- }
-
- // Events applies a query to the event log. If an event log is not enabled,
- // Events reports an error. Otherwise, it filters the current contents of the
- // log to return matching events.
- //
- // Events returns up to maxItems of the newest eligible event items. An item is
- // eligible if it is older than before (or before is zero), it is newer than
- // after (or after is zero), and its data matches the filter. A nil filter
- // matches all event data.
- //
- // If before is zero and no eligible event items are available, Events waits
- // for up to waitTime for a matching item to become available. The wait is
- // terminated early if ctx ends.
- //
- // If maxItems ≤ 0, a default positive number of events is chosen. The values
- // of maxItems and waitTime may be capped to sensible internal maxima without
- // reporting an error to the caller.
- func (env *Environment) Events(ctx context.Context,
- filter *coretypes.EventFilter,
- maxItems int,
- before, after cursor.Cursor,
- waitTime time.Duration,
- ) (*coretypes.ResultEvents, error) {
- if env.EventLog == nil {
- return nil, errors.New("the event log is not enabled")
- }
-
- // Parse and validate parameters.
- if maxItems <= 0 {
- maxItems = 10
- } else if maxItems > 100 {
- maxItems = 100
- }
-
- const maxWaitTime = 30 * time.Second
- if waitTime > maxWaitTime {
- waitTime = maxWaitTime
- }
-
- query := tmquery.All
- if filter != nil && filter.Query != "" {
- q, err := tmquery.New(filter.Query)
- if err != nil {
- return nil, fmt.Errorf("invalid filter query: %w", err)
- }
- query = q
- }
-
- var info eventlog.Info
- var items []*eventlog.Item
- var err error
- accept := func(itm *eventlog.Item) error {
- // N.B. We accept up to one item more than requested, so we can tell how
- // to set the "more" flag in the response.
- if len(items) > maxItems {
- return eventlog.ErrStopScan
- }
- if cursorInRange(itm.Cursor, before, after) && query.Matches(itm.Events) {
- items = append(items, itm)
- }
- return nil
- }
-
- if waitTime > 0 && before.IsZero() {
- ctx, cancel := context.WithTimeout(ctx, waitTime)
- defer cancel()
-
- // Long poll. The loop here is because new items may not match the query,
- // and we want to keep waiting until we have relevant results (or time out).
- cur := after
- for len(items) == 0 {
- info, err = env.EventLog.WaitScan(ctx, cur, accept)
- if err != nil {
- // Don't report a timeout as a request failure.
- if errors.Is(err, context.DeadlineExceeded) {
- err = nil
- }
- break
- }
- cur = info.Newest
- }
- } else {
- // Quick poll, return only what is already available.
- info, err = env.EventLog.Scan(accept)
- }
- if err != nil {
- return nil, err
- }
-
- more := len(items) > maxItems
- if more {
- items = items[:len(items)-1]
- }
- enc, err := marshalItems(items)
- if err != nil {
- return nil, err
- }
- return &coretypes.ResultEvents{
- Items: enc,
- More: more,
- Oldest: cursorString(info.Oldest),
- Newest: cursorString(info.Newest),
- }, nil
- }
-
- func cursorString(c cursor.Cursor) string {
- if c.IsZero() {
- return ""
- }
- return c.String()
- }
-
- func cursorInRange(c, before, after cursor.Cursor) bool {
- return (before.IsZero() || c.Before(before)) && (after.IsZero() || after.Before(c))
- }
-
- func marshalItems(items []*eventlog.Item) ([]*coretypes.EventItem, error) {
- out := make([]*coretypes.EventItem, len(items))
- for i, itm := range items {
- v, err := jsontypes.Marshal(itm.Data)
- if err != nil {
- return nil, fmt.Errorf("encoding event data: %w", err)
- }
- out[i] = &coretypes.EventItem{Cursor: itm.Cursor.String(), Event: itm.Type}
- out[i].Data = v
- }
- return out, nil
- }
|