Browse Source

rpc/client: add Events method to the client interface (#7982)

- Update documentation to deprecate the old methods.
- Add Events methods to HTTP, WS, and Local clients.
- Add Events method to the light client wrapper.
- Rename legacy events client to SubscriptionClient.
pull/7988/head
M. J. Fromberger 3 years ago
committed by GitHub
parent
commit
211b80a484
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 81 additions and 29 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +8
    -5
      light/proxy/routes.go
  3. +10
    -6
      light/rpc/client.go
  4. +1
    -1
      rpc/client/helpers.go
  5. +8
    -0
      rpc/client/http/http.go
  6. +7
    -7
      rpc/client/http/ws.go
  7. +33
    -10
      rpc/client/interface.go
  8. +12
    -0
      rpc/client/local/local.go
  9. +1
    -0
      rpc/test/helpers.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -18,6 +18,7 @@ Special thanks to external contributors on this release:
- [rpc] \#7575 Rework how RPC responses are written back via HTTP. (@creachadair) - [rpc] \#7575 Rework how RPC responses are written back via HTTP. (@creachadair)
- [rpc] \#7713 Remove unused options for websocket clients. (@creachadair) - [rpc] \#7713 Remove unused options for websocket clients. (@creachadair)
- [config] \#7930 Add new event subscription options and defaults. (@creachadair) - [config] \#7930 Add new event subscription options and defaults. (@creachadair)
- [rpc] \#7982 Add new Events interface and deprecate Subscribe. (@creachadair)
- Apps - Apps


+ 8
- 5
light/proxy/routes.go View File

@ -2,7 +2,6 @@ package proxy
import ( import (
"context" "context"
"errors"
"time" "time"
"github.com/tendermint/tendermint/internal/eventlog/cursor" "github.com/tendermint/tendermint/internal/eventlog/cursor"
@ -30,15 +29,19 @@ func (p proxyService) GetConsensusState(ctx context.Context) (*coretypes.ResultC
return p.ConsensusState(ctx) return p.ConsensusState(ctx)
} }
// TODO(creachadair): Remove this once the RPC clients support the new method.
// This is just a placeholder to let things build during development.
func (proxyService) Events(ctx context.Context,
func (p proxyService) Events(ctx context.Context,
filter *coretypes.EventFilter, filter *coretypes.EventFilter,
maxItems int, maxItems int,
before, after cursor.Cursor, before, after cursor.Cursor,
waitTime time.Duration, waitTime time.Duration,
) (*coretypes.ResultEvents, error) { ) (*coretypes.ResultEvents, error) {
return nil, errors.New("the /events method is not implemented")
return p.Client.Events(ctx, &coretypes.RequestEvents{
Filter: filter,
MaxItems: maxItems,
Before: before.String(),
After: after.String(),
WaitTime: waitTime,
})
} }
func (p proxyService) Subscribe(ctx context.Context, query string) (*coretypes.ResultSubscribe, error) { func (p proxyService) Subscribe(ctx context.Context, query string) (*coretypes.ResultSubscribe, error) {


+ 10
- 6
light/rpc/client.go View File

@ -292,6 +292,10 @@ func (c *Client) ConsensusParams(ctx context.Context, height *int64) (*coretypes
return res, nil return res, nil
} }
func (c *Client) Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error) {
return c.next.Events(ctx, req)
}
func (c *Client) Health(ctx context.Context) (*coretypes.ResultHealth, error) { func (c *Client) Health(ctx context.Context) (*coretypes.ResultHealth, error) {
return c.next.Health(ctx) return c.next.Health(ctx)
} }
@ -597,15 +601,15 @@ func (c *Client) BroadcastEvidence(ctx context.Context, ev types.Evidence) (*cor
func (c *Client) Subscribe(ctx context.Context, subscriber, query string, func (c *Client) Subscribe(ctx context.Context, subscriber, query string,
outCapacity ...int) (out <-chan coretypes.ResultEvent, err error) { outCapacity ...int) (out <-chan coretypes.ResultEvent, err error) {
return c.next.Subscribe(ctx, subscriber, query, outCapacity...)
return c.next.Subscribe(ctx, subscriber, query, outCapacity...) //nolint:staticcheck
} }
func (c *Client) Unsubscribe(ctx context.Context, subscriber, query string) error { func (c *Client) Unsubscribe(ctx context.Context, subscriber, query string) error {
return c.next.Unsubscribe(ctx, subscriber, query)
return c.next.Unsubscribe(ctx, subscriber, query) //nolint:staticcheck
} }
func (c *Client) UnsubscribeAll(ctx context.Context, subscriber string) error { func (c *Client) UnsubscribeAll(ctx context.Context, subscriber string) error {
return c.next.UnsubscribeAll(ctx, subscriber)
return c.next.UnsubscribeAll(ctx, subscriber) //nolint:staticcheck
} }
func (c *Client) updateLightClientIfNeededTo(ctx context.Context, height *int64) (*types.LightBlock, error) { func (c *Client) updateLightClientIfNeededTo(ctx context.Context, height *int64) (*types.LightBlock, error) {
@ -636,7 +640,7 @@ func (c *Client) SubscribeWS(ctx context.Context, query string) (*coretypes.Resu
c.closers = append(c.closers, bcancel) c.closers = append(c.closers, bcancel)
callInfo := rpctypes.GetCallInfo(ctx) callInfo := rpctypes.GetCallInfo(ctx)
out, err := c.next.Subscribe(bctx, callInfo.RemoteAddr(), query)
out, err := c.next.Subscribe(bctx, callInfo.RemoteAddr(), query) //nolint:staticcheck
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -660,7 +664,7 @@ func (c *Client) SubscribeWS(ctx context.Context, query string) (*coretypes.Resu
// UnsubscribeWS calls original client's Unsubscribe using remote address as a // UnsubscribeWS calls original client's Unsubscribe using remote address as a
// subscriber. // subscriber.
func (c *Client) UnsubscribeWS(ctx context.Context, query string) (*coretypes.ResultUnsubscribe, error) { func (c *Client) UnsubscribeWS(ctx context.Context, query string) (*coretypes.ResultUnsubscribe, error) {
err := c.next.Unsubscribe(context.Background(), rpctypes.GetCallInfo(ctx).RemoteAddr(), query)
err := c.next.Unsubscribe(context.Background(), rpctypes.GetCallInfo(ctx).RemoteAddr(), query) //nolint:staticcheck
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -670,7 +674,7 @@ func (c *Client) UnsubscribeWS(ctx context.Context, query string) (*coretypes.Re
// UnsubscribeAllWS calls original client's UnsubscribeAll using remote address // UnsubscribeAllWS calls original client's UnsubscribeAll using remote address
// as a subscriber. // as a subscriber.
func (c *Client) UnsubscribeAllWS(ctx context.Context) (*coretypes.ResultUnsubscribe, error) { func (c *Client) UnsubscribeAllWS(ctx context.Context) (*coretypes.ResultUnsubscribe, error) {
err := c.next.UnsubscribeAll(context.Background(), rpctypes.GetCallInfo(ctx).RemoteAddr())
err := c.next.UnsubscribeAll(context.Background(), rpctypes.GetCallInfo(ctx).RemoteAddr()) //nolint:staticcheck
if err != nil { if err != nil {
return nil, err return nil, err
} }


+ 1
- 1
rpc/client/helpers.go View File

@ -57,7 +57,7 @@ func WaitForHeight(ctx context.Context, c StatusClient, h int64, waiter Waiter)
// when the timeout duration has expired. // when the timeout duration has expired.
// //
// This handles subscribing and unsubscribing under the hood // This handles subscribing and unsubscribing under the hood
func WaitForOneEvent(ctx context.Context, c EventsClient, eventValue string, timeout time.Duration) (types.EventData, error) {
func WaitForOneEvent(ctx context.Context, c SubscriptionClient, eventValue string, timeout time.Duration) (types.EventData, error) {
const subscriber = "helpers" const subscriber = "helpers"
ctx, cancel := context.WithTimeout(ctx, timeout) ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel() defer cancel()


+ 8
- 0
rpc/client/http/http.go View File

@ -340,6 +340,14 @@ func (c *baseRPCClient) ConsensusParams(
return result, nil return result, nil
} }
func (c *baseRPCClient) Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error) {
result := new(coretypes.ResultEvents)
if err := c.caller.Call(ctx, "events", req, result); err != nil {
return nil, err
}
return result, nil
}
func (c *baseRPCClient) Health(ctx context.Context) (*coretypes.ResultHealth, error) { func (c *baseRPCClient) Health(ctx context.Context) (*coretypes.ResultHealth, error) {
result := new(coretypes.ResultHealth) result := new(coretypes.ResultHealth)
if err := c.caller.Call(ctx, "health", nil, result); err != nil { if err := c.caller.Call(ctx, "health", nil, result); err != nil {


+ 7
- 7
rpc/client/http/ws.go View File

@ -15,7 +15,7 @@ import (
jsonrpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client" jsonrpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client"
) )
// wsEvents is a wrapper around WSClient, which implements EventsClient.
// wsEvents is a wrapper around WSClient, which implements SubscriptionClient.
type wsEvents struct { type wsEvents struct {
Logger log.Logger Logger log.Logger
ws *jsonrpcclient.WSClient ws *jsonrpcclient.WSClient
@ -30,7 +30,7 @@ type wsSubscription struct {
query string query string
} }
var _ rpcclient.EventsClient = (*wsEvents)(nil)
var _ rpcclient.SubscriptionClient = (*wsEvents)(nil)
func newWsEvents(remote string) (*wsEvents, error) { func newWsEvents(remote string) (*wsEvents, error) {
w := &wsEvents{ w := &wsEvents{
@ -64,7 +64,7 @@ func (w *wsEvents) Start(ctx context.Context) error {
// Stop shuts down the websocket client. // Stop shuts down the websocket client.
func (w *wsEvents) Stop() error { return w.ws.Stop() } func (w *wsEvents) Stop() error { return w.ws.Stop() }
// Subscribe implements EventsClient by using WSClient to subscribe given
// Subscribe implements SubscriptionClient by using WSClient to subscribe given
// subscriber to query. By default, it returns a channel with cap=1. Error is // subscriber to query. By default, it returns a channel with cap=1. Error is
// returned if it fails to subscribe. // returned if it fails to subscribe.
// //
@ -97,8 +97,8 @@ func (w *wsEvents) Subscribe(ctx context.Context, subscriber, query string,
return outc, nil return outc, nil
} }
// Unsubscribe implements EventsClient by using WSClient to unsubscribe given
// subscriber from query.
// Unsubscribe implements SubscriptionClient by using WSClient to unsubscribe
// given subscriber from query.
// //
// It returns an error if wsEvents is not running. // It returns an error if wsEvents is not running.
func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) error { func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) error {
@ -119,8 +119,8 @@ func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) er
return nil return nil
} }
// UnsubscribeAll implements EventsClient by using WSClient to unsubscribe
// given subscriber from all the queries.
// UnsubscribeAll implements SubscriptionClient by using WSClient to
// unsubscribe given subscriber from all the queries.
// //
// It returns an error if wsEvents is not running. // It returns an error if wsEvents is not running.
func (w *wsEvents) UnsubscribeAll(ctx context.Context, subscriber string) error { func (w *wsEvents) UnsubscribeAll(ctx context.Context, subscriber string) error {


+ 33
- 10
rpc/client/interface.go View File

@ -37,14 +37,16 @@ type Client interface {
Start(context.Context) error Start(context.Context) error
// These embedded interfaces define the callable methods of the service. // These embedded interfaces define the callable methods of the service.
ABCIClient ABCIClient
EventsClient EventsClient
EvidenceClient
HistoryClient HistoryClient
MempoolClient
NetworkClient NetworkClient
SignClient SignClient
StatusClient StatusClient
EvidenceClient
MempoolClient
SubscriptionClient
} }
// ABCIClient groups together the functionality that principally affects the // ABCIClient groups together the functionality that principally affects the
@ -119,20 +121,41 @@ type NetworkClient interface {
Health(context.Context) (*coretypes.ResultHealth, error) Health(context.Context) (*coretypes.ResultHealth, error)
} }
// EventsClient is reactive, you can subscribe to any message, given the proper
// string. see tendermint/types/events.go
// EventsClient exposes the methods to retrieve events from the consensus engine.
type EventsClient interface { type EventsClient interface {
// Subscribe subscribes given subscriber to query. Returns a channel with
// cap=1 onto which events are published. An error is returned if it fails to
// subscribe. outCapacity can be used optionally to set capacity for the
// channel. Channel is never closed to prevent accidental reads.
// Events fetches a batch of events from the server matching the given query
// and time range.
Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error)
}
// TODO(creachadair): This interface should be removed once the streaming event
// interface is removed in Tendermint v0.37.
type SubscriptionClient interface {
// Subscribe issues a subscription request for the given subscriber ID and
// query. This method does not block: If subscription fails, it reports an
// error, and if subscription succeeds it returns a channel that delivers
// matching events until the subscription is stopped. The channel is never
// closed; the client is responsible for knowing when no further data will
// be sent.
//
// The context only governs the initial subscription, it does not control
// the lifetime of the channel. To cancel a subscription call Unsubscribe or
// UnsubscribeAll.
// //
// ctx cannot be used to unsubscribe. To unsubscribe, use either Unsubscribe
// or UnsubscribeAll.
// Deprecated: This method will be removed in Tendermint v0.37, use Events
// instead.
Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan coretypes.ResultEvent, err error) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan coretypes.ResultEvent, err error)
// Unsubscribe unsubscribes given subscriber from query. // Unsubscribe unsubscribes given subscriber from query.
//
// Deprecated: This method will be removed in Tendermint v0.37, use Events
// instead.
Unsubscribe(ctx context.Context, subscriber, query string) error Unsubscribe(ctx context.Context, subscriber, query string) error
// UnsubscribeAll unsubscribes given subscriber from all the queries. // UnsubscribeAll unsubscribes given subscriber from all the queries.
//
// Deprecated: This method will be removed in Tendermint v0.37, use Events
// instead.
UnsubscribeAll(ctx context.Context, subscriber string) error UnsubscribeAll(ctx context.Context, subscriber string) error
} }


+ 12
- 0
rpc/client/local/local.go View File

@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/eventlog/cursor"
"github.com/tendermint/tendermint/internal/pubsub" "github.com/tendermint/tendermint/internal/pubsub"
"github.com/tendermint/tendermint/internal/pubsub/query" "github.com/tendermint/tendermint/internal/pubsub/query"
rpccore "github.com/tendermint/tendermint/internal/rpc/core" rpccore "github.com/tendermint/tendermint/internal/rpc/core"
@ -129,6 +130,17 @@ func (c *Local) ConsensusParams(ctx context.Context, height *int64) (*coretypes.
return c.env.ConsensusParams(ctx, height) return c.env.ConsensusParams(ctx, height)
} }
func (c *Local) Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error) {
var before, after cursor.Cursor
if err := before.UnmarshalText([]byte(req.Before)); err != nil {
return nil, err
}
if err := after.UnmarshalText([]byte(req.After)); err != nil {
return nil, err
}
return c.env.Events(ctx, req.Filter, req.MaxItems, before, after, req.WaitTime)
}
func (c *Local) Health(ctx context.Context) (*coretypes.ResultHealth, error) { func (c *Local) Health(ctx context.Context) (*coretypes.ResultHealth, error) {
return c.env.Health(ctx) return c.env.Health(ctx)
} }


+ 1
- 0
rpc/test/helpers.go View File

@ -67,6 +67,7 @@ func CreateConfig(t *testing.T, testName string) (*config.Config, error) {
p2pAddr, rpcAddr := makeAddrs() p2pAddr, rpcAddr := makeAddrs()
c.P2P.ListenAddress = p2pAddr c.P2P.ListenAddress = p2pAddr
c.RPC.ListenAddress = rpcAddr c.RPC.ListenAddress = rpcAddr
c.RPC.EventLogWindowSize = 5 * time.Minute
c.Consensus.WalPath = "rpc-test" c.Consensus.WalPath = "rpc-test"
c.RPC.CORSAllowedOrigins = []string{"https://tendermint.com/"} c.RPC.CORSAllowedOrigins = []string{"https://tendermint.com/"}
return c, nil return c, nil


Loading…
Cancel
Save