From 211b80a4845c4e143f9d6720a072cfcda52a704b Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Thu, 24 Feb 2022 06:51:14 -0800 Subject: [PATCH] rpc/client: add Events method to the client interface (#7982) - Update documentation to deprecate the old methods. - Add Events methods to HTTP, WS, and Local clients. - Add Events method to the light client wrapper. - Rename legacy events client to SubscriptionClient. --- CHANGELOG_PENDING.md | 1 + light/proxy/routes.go | 13 +++++++----- light/rpc/client.go | 16 +++++++++------ rpc/client/helpers.go | 2 +- rpc/client/http/http.go | 8 ++++++++ rpc/client/http/ws.go | 14 ++++++------- rpc/client/interface.go | 43 ++++++++++++++++++++++++++++++--------- rpc/client/local/local.go | 12 +++++++++++ rpc/test/helpers.go | 1 + 9 files changed, 81 insertions(+), 29 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index a7d4ab676..2fa954519 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -18,6 +18,7 @@ Special thanks to external contributors on this release: - [rpc] \#7575 Rework how RPC responses are written back via HTTP. (@creachadair) - [rpc] \#7713 Remove unused options for websocket clients. (@creachadair) - [config] \#7930 Add new event subscription options and defaults. (@creachadair) + - [rpc] \#7982 Add new Events interface and deprecate Subscribe. (@creachadair) - Apps diff --git a/light/proxy/routes.go b/light/proxy/routes.go index bf1ecc5a9..5dd934ed1 100644 --- a/light/proxy/routes.go +++ b/light/proxy/routes.go @@ -2,7 +2,6 @@ package proxy import ( "context" - "errors" "time" "github.com/tendermint/tendermint/internal/eventlog/cursor" @@ -30,15 +29,19 @@ func (p proxyService) GetConsensusState(ctx context.Context) (*coretypes.ResultC return p.ConsensusState(ctx) } -// TODO(creachadair): Remove this once the RPC clients support the new method. -// This is just a placeholder to let things build during development. -func (proxyService) Events(ctx context.Context, +func (p proxyService) Events(ctx context.Context, filter *coretypes.EventFilter, maxItems int, before, after cursor.Cursor, waitTime time.Duration, ) (*coretypes.ResultEvents, error) { - return nil, errors.New("the /events method is not implemented") + return p.Client.Events(ctx, &coretypes.RequestEvents{ + Filter: filter, + MaxItems: maxItems, + Before: before.String(), + After: after.String(), + WaitTime: waitTime, + }) } func (p proxyService) Subscribe(ctx context.Context, query string) (*coretypes.ResultSubscribe, error) { diff --git a/light/rpc/client.go b/light/rpc/client.go index f7f741843..001e1c7f6 100644 --- a/light/rpc/client.go +++ b/light/rpc/client.go @@ -292,6 +292,10 @@ func (c *Client) ConsensusParams(ctx context.Context, height *int64) (*coretypes return res, nil } +func (c *Client) Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error) { + return c.next.Events(ctx, req) +} + func (c *Client) Health(ctx context.Context) (*coretypes.ResultHealth, error) { return c.next.Health(ctx) } @@ -597,15 +601,15 @@ func (c *Client) BroadcastEvidence(ctx context.Context, ev types.Evidence) (*cor func (c *Client) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan coretypes.ResultEvent, err error) { - return c.next.Subscribe(ctx, subscriber, query, outCapacity...) + return c.next.Subscribe(ctx, subscriber, query, outCapacity...) //nolint:staticcheck } func (c *Client) Unsubscribe(ctx context.Context, subscriber, query string) error { - return c.next.Unsubscribe(ctx, subscriber, query) + return c.next.Unsubscribe(ctx, subscriber, query) //nolint:staticcheck } func (c *Client) UnsubscribeAll(ctx context.Context, subscriber string) error { - return c.next.UnsubscribeAll(ctx, subscriber) + return c.next.UnsubscribeAll(ctx, subscriber) //nolint:staticcheck } func (c *Client) updateLightClientIfNeededTo(ctx context.Context, height *int64) (*types.LightBlock, error) { @@ -636,7 +640,7 @@ func (c *Client) SubscribeWS(ctx context.Context, query string) (*coretypes.Resu c.closers = append(c.closers, bcancel) callInfo := rpctypes.GetCallInfo(ctx) - out, err := c.next.Subscribe(bctx, callInfo.RemoteAddr(), query) + out, err := c.next.Subscribe(bctx, callInfo.RemoteAddr(), query) //nolint:staticcheck if err != nil { return nil, err } @@ -660,7 +664,7 @@ func (c *Client) SubscribeWS(ctx context.Context, query string) (*coretypes.Resu // UnsubscribeWS calls original client's Unsubscribe using remote address as a // subscriber. func (c *Client) UnsubscribeWS(ctx context.Context, query string) (*coretypes.ResultUnsubscribe, error) { - err := c.next.Unsubscribe(context.Background(), rpctypes.GetCallInfo(ctx).RemoteAddr(), query) + err := c.next.Unsubscribe(context.Background(), rpctypes.GetCallInfo(ctx).RemoteAddr(), query) //nolint:staticcheck if err != nil { return nil, err } @@ -670,7 +674,7 @@ func (c *Client) UnsubscribeWS(ctx context.Context, query string) (*coretypes.Re // UnsubscribeAllWS calls original client's UnsubscribeAll using remote address // as a subscriber. func (c *Client) UnsubscribeAllWS(ctx context.Context) (*coretypes.ResultUnsubscribe, error) { - err := c.next.UnsubscribeAll(context.Background(), rpctypes.GetCallInfo(ctx).RemoteAddr()) + err := c.next.UnsubscribeAll(context.Background(), rpctypes.GetCallInfo(ctx).RemoteAddr()) //nolint:staticcheck if err != nil { return nil, err } diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index f771d6156..520a95a47 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -57,7 +57,7 @@ func WaitForHeight(ctx context.Context, c StatusClient, h int64, waiter Waiter) // when the timeout duration has expired. // // This handles subscribing and unsubscribing under the hood -func WaitForOneEvent(ctx context.Context, c EventsClient, eventValue string, timeout time.Duration) (types.EventData, error) { +func WaitForOneEvent(ctx context.Context, c SubscriptionClient, eventValue string, timeout time.Duration) (types.EventData, error) { const subscriber = "helpers" ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() diff --git a/rpc/client/http/http.go b/rpc/client/http/http.go index ebdc18eb2..2a9507f8e 100644 --- a/rpc/client/http/http.go +++ b/rpc/client/http/http.go @@ -340,6 +340,14 @@ func (c *baseRPCClient) ConsensusParams( return result, nil } +func (c *baseRPCClient) Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error) { + result := new(coretypes.ResultEvents) + if err := c.caller.Call(ctx, "events", req, result); err != nil { + return nil, err + } + return result, nil +} + func (c *baseRPCClient) Health(ctx context.Context) (*coretypes.ResultHealth, error) { result := new(coretypes.ResultHealth) if err := c.caller.Call(ctx, "health", nil, result); err != nil { diff --git a/rpc/client/http/ws.go b/rpc/client/http/ws.go index 62a8ccf97..2f188a24d 100644 --- a/rpc/client/http/ws.go +++ b/rpc/client/http/ws.go @@ -15,7 +15,7 @@ import ( jsonrpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client" ) -// wsEvents is a wrapper around WSClient, which implements EventsClient. +// wsEvents is a wrapper around WSClient, which implements SubscriptionClient. type wsEvents struct { Logger log.Logger ws *jsonrpcclient.WSClient @@ -30,7 +30,7 @@ type wsSubscription struct { query string } -var _ rpcclient.EventsClient = (*wsEvents)(nil) +var _ rpcclient.SubscriptionClient = (*wsEvents)(nil) func newWsEvents(remote string) (*wsEvents, error) { w := &wsEvents{ @@ -64,7 +64,7 @@ func (w *wsEvents) Start(ctx context.Context) error { // Stop shuts down the websocket client. func (w *wsEvents) Stop() error { return w.ws.Stop() } -// Subscribe implements EventsClient by using WSClient to subscribe given +// Subscribe implements SubscriptionClient by using WSClient to subscribe given // subscriber to query. By default, it returns a channel with cap=1. Error is // returned if it fails to subscribe. // @@ -97,8 +97,8 @@ func (w *wsEvents) Subscribe(ctx context.Context, subscriber, query string, return outc, nil } -// Unsubscribe implements EventsClient by using WSClient to unsubscribe given -// subscriber from query. +// Unsubscribe implements SubscriptionClient by using WSClient to unsubscribe +// given subscriber from query. // // It returns an error if wsEvents is not running. func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) error { @@ -119,8 +119,8 @@ func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) er return nil } -// UnsubscribeAll implements EventsClient by using WSClient to unsubscribe -// given subscriber from all the queries. +// UnsubscribeAll implements SubscriptionClient by using WSClient to +// unsubscribe given subscriber from all the queries. // // It returns an error if wsEvents is not running. func (w *wsEvents) UnsubscribeAll(ctx context.Context, subscriber string) error { diff --git a/rpc/client/interface.go b/rpc/client/interface.go index b53079852..acc54889f 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -37,14 +37,16 @@ type Client interface { Start(context.Context) error // These embedded interfaces define the callable methods of the service. + ABCIClient EventsClient + EvidenceClient HistoryClient + MempoolClient NetworkClient SignClient StatusClient - EvidenceClient - MempoolClient + SubscriptionClient } // ABCIClient groups together the functionality that principally affects the @@ -119,20 +121,41 @@ type NetworkClient interface { Health(context.Context) (*coretypes.ResultHealth, error) } -// EventsClient is reactive, you can subscribe to any message, given the proper -// string. see tendermint/types/events.go +// EventsClient exposes the methods to retrieve events from the consensus engine. type EventsClient interface { - // Subscribe subscribes given subscriber to query. Returns a channel with - // cap=1 onto which events are published. An error is returned if it fails to - // subscribe. outCapacity can be used optionally to set capacity for the - // channel. Channel is never closed to prevent accidental reads. + // Events fetches a batch of events from the server matching the given query + // and time range. + Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error) +} + +// TODO(creachadair): This interface should be removed once the streaming event +// interface is removed in Tendermint v0.37. +type SubscriptionClient interface { + // Subscribe issues a subscription request for the given subscriber ID and + // query. This method does not block: If subscription fails, it reports an + // error, and if subscription succeeds it returns a channel that delivers + // matching events until the subscription is stopped. The channel is never + // closed; the client is responsible for knowing when no further data will + // be sent. + // + // The context only governs the initial subscription, it does not control + // the lifetime of the channel. To cancel a subscription call Unsubscribe or + // UnsubscribeAll. // - // ctx cannot be used to unsubscribe. To unsubscribe, use either Unsubscribe - // or UnsubscribeAll. + // Deprecated: This method will be removed in Tendermint v0.37, use Events + // instead. Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan coretypes.ResultEvent, err error) + // Unsubscribe unsubscribes given subscriber from query. + // + // Deprecated: This method will be removed in Tendermint v0.37, use Events + // instead. Unsubscribe(ctx context.Context, subscriber, query string) error + // UnsubscribeAll unsubscribes given subscriber from all the queries. + // + // Deprecated: This method will be removed in Tendermint v0.37, use Events + // instead. UnsubscribeAll(ctx context.Context, subscriber string) error } diff --git a/rpc/client/local/local.go b/rpc/client/local/local.go index 8b2a88314..b9091efac 100644 --- a/rpc/client/local/local.go +++ b/rpc/client/local/local.go @@ -7,6 +7,7 @@ import ( "time" "github.com/tendermint/tendermint/internal/eventbus" + "github.com/tendermint/tendermint/internal/eventlog/cursor" "github.com/tendermint/tendermint/internal/pubsub" "github.com/tendermint/tendermint/internal/pubsub/query" rpccore "github.com/tendermint/tendermint/internal/rpc/core" @@ -129,6 +130,17 @@ func (c *Local) ConsensusParams(ctx context.Context, height *int64) (*coretypes. return c.env.ConsensusParams(ctx, height) } +func (c *Local) Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error) { + var before, after cursor.Cursor + if err := before.UnmarshalText([]byte(req.Before)); err != nil { + return nil, err + } + if err := after.UnmarshalText([]byte(req.After)); err != nil { + return nil, err + } + return c.env.Events(ctx, req.Filter, req.MaxItems, before, after, req.WaitTime) +} + func (c *Local) Health(ctx context.Context) (*coretypes.ResultHealth, error) { return c.env.Health(ctx) } diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index 307fc3855..19deac607 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -67,6 +67,7 @@ func CreateConfig(t *testing.T, testName string) (*config.Config, error) { p2pAddr, rpcAddr := makeAddrs() c.P2P.ListenAddress = p2pAddr c.RPC.ListenAddress = rpcAddr + c.RPC.EventLogWindowSize = 5 * time.Minute c.Consensus.WalPath = "rpc-test" c.RPC.CORSAllowedOrigins = []string{"https://tendermint.com/"} return c, nil