From f795d3f3609126d4fa2dd0c2161be8c9e455d6db Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Thu, 24 Feb 2022 11:21:40 -0800 Subject: [PATCH] rpc/client: rewrite the WaitForOneEvent helper (#7986) Update usage in tests. --- rpc/client/event_test.go | 9 +++++++- rpc/client/helpers.go | 49 +++++++++++++++++----------------------- rpc/client/rpc_test.go | 5 +++- 3 files changed, 33 insertions(+), 30 deletions(-) diff --git a/rpc/client/event_test.go b/rpc/client/event_test.go index 2e4b2242a..e59fcc83b 100644 --- a/rpc/client/event_test.go +++ b/rpc/client/event_test.go @@ -2,6 +2,7 @@ package client_test import ( "context" + "fmt" "testing" "time" @@ -51,7 +52,13 @@ func testTxEventsSent(ctx context.Context, t *testing.T, broadcastMethod string, }() // and wait for confirmation - evt, err := client.WaitForOneEvent(ctx, c, types.EventTxValue, waitForEventTimeout) + ectx, cancel := context.WithTimeout(ctx, waitForEventTimeout) + defer cancel() + + // Wait for the transaction we sent to be confirmed. + query := fmt.Sprintf(`tm.event = '%s' AND tx.hash = '%X'`, + types.EventTxValue, types.Tx(tx).Hash()) + evt, err := client.WaitForOneEvent(ectx, c, query) require.NoError(t, err) // and make sure it has the proper info diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index 520a95a47..05694afff 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -2,10 +2,11 @@ package client import ( "context" - "errors" "fmt" "time" + "github.com/tendermint/tendermint/internal/jsontypes" + "github.com/tendermint/tendermint/rpc/coretypes" "github.com/tendermint/tendermint/types" ) @@ -52,33 +53,25 @@ func WaitForHeight(ctx context.Context, c StatusClient, h int64, waiter Waiter) return nil } -// WaitForOneEvent subscribes to a websocket event for the given -// event time and returns upon receiving it one time, or -// when the timeout duration has expired. -// -// This handles subscribing and unsubscribing under the hood -func WaitForOneEvent(ctx context.Context, c SubscriptionClient, eventValue string, timeout time.Duration) (types.EventData, error) { - const subscriber = "helpers" - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - // register for the next event of this type - eventCh, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(eventValue).String()) - if err != nil { - return nil, fmt.Errorf("failed to subscribe: %w", err) - } - - // make sure to un-register after the test is over - defer func() { - if deferErr := c.UnsubscribeAll(ctx, subscriber); deferErr != nil { - panic(deferErr) +// WaitForOneEvent waits for the first event matching the given query on c, or +// until ctx ends. It reports an error if ctx ends before a matching event is +// received. +func WaitForOneEvent(ctx context.Context, c EventsClient, query string) (types.EventData, error) { + for { + rsp, err := c.Events(ctx, &coretypes.RequestEvents{ + Filter: &coretypes.EventFilter{Query: query}, + MaxItems: 1, + WaitTime: 10 * time.Second, // duration doesn't matter, limited by ctx timeout + }) + if err != nil { + return nil, err + } else if len(rsp.Items) == 0 { + continue // continue polling until ctx expires } - }() - - select { - case event := <-eventCh: - return event.Data, nil - case <-ctx.Done(): - return nil, errors.New("timed out waiting for event") + var result types.EventData + if err := jsontypes.Unmarshal(rsp.Items[0].Data, &result); err != nil { + return nil, err + } + return result, nil } } diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index 14e689405..3ad241380 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -471,7 +471,10 @@ func TestClientMethodCalls(t *testing.T) { }) t.Run("Events", func(t *testing.T) { t.Run("Header", func(t *testing.T) { - evt, err := client.WaitForOneEvent(ctx, c, types.EventNewBlockHeaderValue, waitForEventTimeout) + ctx, cancel := context.WithTimeout(ctx, waitForEventTimeout) + defer cancel() + query := types.QueryForEvent(types.EventNewBlockHeaderValue).String() + evt, err := client.WaitForOneEvent(ctx, c, query) require.NoError(t, err, "%d: %+v", i, err) _, ok := evt.(types.EventDataNewBlockHeader) require.True(t, ok, "%d: %#v", i, evt)