From ce61abc038c794c3b22f210f2fa9e2a50697df92 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Wed, 2 Feb 2022 12:02:04 -0800 Subject: [PATCH] rpc: remove the placeholder RunState type. (#7749) * rpc/client: remove the placeholder RunState type. I added the RunState type in #6971 to disconnect clients from the service plumbing, which they do not need. Now that we have more complete context plumbing, the lifecycle of a client no longer depends on this type: It serves as a carrier for a logger, and a Boolean flag for "running" status, neither of which is used outside of tests. Logging in particular is defaulted to a no-op logger in all production use. Arguably we could just remove the logging calls, since they are never invoked except in tests. To defer the question of whether we should do that or make the logging go somewhere more productive, I've preserved the existing use here. Remove use of the IsRunning method that was provided by the RunState, and use the Start method and context to govern client lifecycle. Remove the one test that exercised "unstarted" clients. I would like to remove that method entirely, but that will require updating the constructors for all the client types to plumb a context and possibly other options. I have deferred that for now. --- light/rpc/client.go | 10 ++-- rpc/client/helpers.go | 68 -------------------------- rpc/client/http/ws.go | 23 ++------- rpc/client/interface.go | 8 +-- rpc/client/mocks/client.go | 14 ------ rpc/client/rpc_test.go | 49 +++++-------------- rpc/jsonrpc/client/integration_test.go | 30 +++++------- rpc/jsonrpc/client/ws_client.go | 33 +++++-------- rpc/jsonrpc/client/ws_client_test.go | 39 +++++---------- rpc/jsonrpc/server/ws_handler.go | 11 +---- 10 files changed, 59 insertions(+), 226 deletions(-) diff --git a/light/rpc/client.go b/light/rpc/client.go index ba7a2c8fa..cfbdaa409 100644 --- a/light/rpc/client.go +++ b/light/rpc/client.go @@ -105,12 +105,12 @@ func NewClient(logger log.Logger, next rpcclient.Client, lc LightClient, opts .. } func (c *Client) OnStart(ctx context.Context) error { - if !c.next.IsRunning() { - nctx, ncancel := context.WithCancel(ctx) - c.closers = append(c.closers, ncancel) - return c.next.Start(nctx) + nctx, ncancel := context.WithCancel(ctx) + if err := c.next.Start(nctx); err != nil { + ncancel() + return err } - + c.closers = append(c.closers, ncancel) go func() { defer close(c.quitCh) c.Wait() diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index 529537082..b9ad05aac 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -4,10 +4,8 @@ import ( "context" "errors" "fmt" - "sync" "time" - "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/types" ) @@ -84,69 +82,3 @@ func WaitForOneEvent(c EventsClient, eventValue string, timeout time.Duration) ( return nil, errors.New("timed out waiting for event") } } - -var ( - // ErrClientRunning is returned by Start when the client is already running. - ErrClientRunning = errors.New("client already running") - - // ErrClientNotRunning is returned by Stop when the client is not running. - ErrClientNotRunning = errors.New("client is not running") -) - -// RunState is a helper that a client implementation can embed to implement -// common plumbing for keeping track of run state and logging. -// -// TODO(creachadair): This type is a temporary measure, and will be removed. -// See the discussion on #6971. -type RunState struct { - Logger log.Logger - - mu sync.Mutex - name string - isRunning bool -} - -// NewRunState returns a new unstarted run state tracker with the given logging -// label and log sink. If logger == nil, a no-op logger is provided by default. -func NewRunState(name string, logger log.Logger) *RunState { - if logger == nil { - logger = log.NewNopLogger() - } - return &RunState{ - name: name, - Logger: logger, - } -} - -// Start sets the state to running, or reports an error. -func (r *RunState) Start(context.Context) error { - r.mu.Lock() - defer r.mu.Unlock() - if r.isRunning { - r.Logger.Error("not starting client, it is already started", "client", r.name) - return ErrClientRunning - } - r.Logger.Info("starting client", "client", r.name) - r.isRunning = true - return nil -} - -// Stop sets the state to not running, or reports an error. -func (r *RunState) Stop() error { - r.mu.Lock() - defer r.mu.Unlock() - if !r.isRunning { - r.Logger.Error("not stopping client; it is already stopped", "client", r.name) - return ErrClientNotRunning - } - r.Logger.Info("stopping client", "client", r.name) - r.isRunning = false - return nil -} - -// IsRunning reports whether the state is running. -func (r *RunState) IsRunning() bool { - r.mu.Lock() - defer r.mu.Unlock() - return r.isRunning -} diff --git a/rpc/client/http/ws.go b/rpc/client/http/ws.go index c36ad9fc6..43a70b74e 100644 --- a/rpc/client/http/ws.go +++ b/rpc/client/http/ws.go @@ -9,6 +9,7 @@ import ( "time" "github.com/tendermint/tendermint/internal/pubsub" + "github.com/tendermint/tendermint/libs/log" rpcclient "github.com/tendermint/tendermint/rpc/client" "github.com/tendermint/tendermint/rpc/coretypes" jsonrpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client" @@ -16,8 +17,8 @@ import ( // wsEvents is a wrapper around WSClient, which implements EventsClient. type wsEvents struct { - *rpcclient.RunState - ws *jsonrpcclient.WSClient + Logger log.Logger + ws *jsonrpcclient.WSClient mtx sync.RWMutex subscriptions map[string]*wsSubscription @@ -33,7 +34,7 @@ var _ rpcclient.EventsClient = (*wsEvents)(nil) func newWsEvents(remote string) (*wsEvents, error) { w := &wsEvents{ - RunState: rpcclient.NewRunState("wsEvents", nil), + Logger: log.NewNopLogger(), subscriptions: make(map[string]*wsSubscription), } @@ -60,9 +61,6 @@ func (w *wsEvents) Start(ctx context.Context) error { return nil } -// IsRunning reports whether the websocket client is running. -func (w *wsEvents) IsRunning() bool { return w.ws.IsRunning() } - // Stop shuts down the websocket client. func (w *wsEvents) Stop() error { return w.ws.Stop() } @@ -80,11 +78,6 @@ func (w *wsEvents) Stop() error { return w.ws.Stop() } // It returns an error if wsEvents is not running. func (w *wsEvents) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan coretypes.ResultEvent, err error) { - - if !w.IsRunning() { - return nil, rpcclient.ErrClientNotRunning - } - if err := w.ws.Subscribe(ctx, query); err != nil { return nil, err } @@ -109,10 +102,6 @@ func (w *wsEvents) Subscribe(ctx context.Context, subscriber, query string, // // It returns an error if wsEvents is not running. func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) error { - if !w.IsRunning() { - return rpcclient.ErrClientNotRunning - } - if err := w.ws.Unsubscribe(ctx, query); err != nil { return err } @@ -135,10 +124,6 @@ func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) er // // It returns an error if wsEvents is not running. func (w *wsEvents) UnsubscribeAll(ctx context.Context, subscriber string) error { - if !w.IsRunning() { - return rpcclient.ErrClientNotRunning - } - if err := w.ws.UnsubscribeAll(ctx); err != nil { return err } diff --git a/rpc/client/interface.go b/rpc/client/interface.go index d5bbaec1b..b53079852 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -32,14 +32,10 @@ import ( // Client describes the interface of Tendermint RPC client implementations. type Client interface { - // These methods define the operational structure of the client. - - // Start the client. Start must report an error if the client is running. + // Start the client, which will run until the context terminates. + // An error from Start indicates the client could not start. Start(context.Context) error - // IsRunning reports whether the client is running. - IsRunning() bool - // These embedded interfaces define the callable methods of the service. ABCIClient EventsClient diff --git a/rpc/client/mocks/client.go b/rpc/client/mocks/client.go index a74b6438f..ffa1d1f29 100644 --- a/rpc/client/mocks/client.go +++ b/rpc/client/mocks/client.go @@ -526,20 +526,6 @@ func (_m *Client) Health(_a0 context.Context) (*coretypes.ResultHealth, error) { return r0, r1 } -// IsRunning provides a mock function with given fields: -func (_m *Client) IsRunning() bool { - ret := _m.Called() - - var r0 bool - if rf, ok := ret.Get(0).(func() bool); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(bool) - } - - return r0 -} - // NetInfo provides a mock function with given fields: _a0 func (_m *Client) NetInfo(_a0 context.Context) (*coretypes.ResultNetInfo, error) { ret := _m.Called(_a0) diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index ee4cb467c..883716c19 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -40,12 +40,13 @@ func getHTTPClient(t *testing.T, logger log.Logger, conf *config.Config) *rpchtt rpcAddr := conf.RPC.ListenAddress c, err := rpchttp.NewWithClient(rpcAddr, http.DefaultClient) require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + require.NoError(t, c.Start(ctx)) c.Logger = logger t.Cleanup(func() { - if c.IsRunning() { - require.NoError(t, c.Stop()) - } + cancel() + require.NoError(t, c.Stop()) }) return c @@ -56,16 +57,16 @@ func getHTTPClientWithTimeout(t *testing.T, logger log.Logger, conf *config.Conf rpcAddr := conf.RPC.ListenAddress - http.DefaultClient.Timeout = timeout - c, err := rpchttp.NewWithClient(rpcAddr, http.DefaultClient) + tclient := &http.Client{Timeout: timeout} + c, err := rpchttp.NewWithClient(rpcAddr, tclient) require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + require.NoError(t, c.Start(ctx)) c.Logger = logger t.Cleanup(func() { - http.DefaultClient.Timeout = 0 - if c.IsRunning() { - require.NoError(t, c.Stop()) - } + cancel() + require.NoError(t, c.Stop()) }) return c @@ -184,25 +185,6 @@ func TestClientOperations(t *testing.T) { wg.Wait() }) }) - t.Run("HTTPReturnsErrorIfClientIsNotRunning", func(t *testing.T) { - logger := log.NewTestingLogger(t) - - c := getHTTPClientWithTimeout(t, logger, conf, 100*time.Millisecond) - - // on Subscribe - _, err := c.Subscribe(ctx, "TestHeaderEvents", - types.QueryForEvent(types.EventNewBlockHeaderValue).String()) - assert.Error(t, err) - - // on Unsubscribe - err = c.Unsubscribe(ctx, "TestHeaderEvents", - types.QueryForEvent(types.EventNewBlockHeaderValue).String()) - assert.Error(t, err) - - // on UnsubscribeAll - err = c.UnsubscribeAll(ctx, "TestHeaderEvents") - assert.Error(t, err) - }) } // Make sure info is correct (we connect properly) @@ -488,15 +470,6 @@ func TestClientMethodCalls(t *testing.T) { assert.Equal(t, 0, pool.Size(), "mempool must be empty") }) t.Run("Events", func(t *testing.T) { - // start for this test it if it wasn't already running - if !c.IsRunning() { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - // if so, then we start it, listen, and stop it. - err := c.Start(ctx) - require.NoError(t, err) - } t.Run("Header", func(t *testing.T) { evt, err := client.WaitForOneEvent(c, types.EventNewBlockHeaderValue, waitForEventTimeout) require.NoError(t, err, "%d: %+v", i, err) @@ -538,7 +511,7 @@ func TestClientMethodCalls(t *testing.T) { }) }) t.Run("Evidence", func(t *testing.T) { - t.Run("BraodcastDuplicateVote", func(t *testing.T) { + t.Run("BroadcastDuplicateVote", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/rpc/jsonrpc/client/integration_test.go b/rpc/jsonrpc/client/integration_test.go index 961b9a619..f53b28802 100644 --- a/rpc/jsonrpc/client/integration_test.go +++ b/rpc/jsonrpc/client/integration_test.go @@ -19,24 +19,26 @@ import ( ) func TestWSClientReconnectWithJitter(t *testing.T) { - n := 8 - var maxReconnectAttempts uint = 3 - // Max wait time is ceil(1+0.999) + ceil(2+0.999) + ceil(4+0.999) + ceil(...) = 2 + 3 + 5 = 10s + ... - maxSleepTime := time.Second * time.Duration(((1<