diff --git a/light/rpc/client.go b/light/rpc/client.go index ba7a2c8fa..cfbdaa409 100644 --- a/light/rpc/client.go +++ b/light/rpc/client.go @@ -105,12 +105,12 @@ func NewClient(logger log.Logger, next rpcclient.Client, lc LightClient, opts .. } func (c *Client) OnStart(ctx context.Context) error { - if !c.next.IsRunning() { - nctx, ncancel := context.WithCancel(ctx) - c.closers = append(c.closers, ncancel) - return c.next.Start(nctx) + nctx, ncancel := context.WithCancel(ctx) + if err := c.next.Start(nctx); err != nil { + ncancel() + return err } - + c.closers = append(c.closers, ncancel) go func() { defer close(c.quitCh) c.Wait() diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index 529537082..b9ad05aac 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -4,10 +4,8 @@ import ( "context" "errors" "fmt" - "sync" "time" - "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/types" ) @@ -84,69 +82,3 @@ func WaitForOneEvent(c EventsClient, eventValue string, timeout time.Duration) ( return nil, errors.New("timed out waiting for event") } } - -var ( - // ErrClientRunning is returned by Start when the client is already running. - ErrClientRunning = errors.New("client already running") - - // ErrClientNotRunning is returned by Stop when the client is not running. - ErrClientNotRunning = errors.New("client is not running") -) - -// RunState is a helper that a client implementation can embed to implement -// common plumbing for keeping track of run state and logging. -// -// TODO(creachadair): This type is a temporary measure, and will be removed. -// See the discussion on #6971. -type RunState struct { - Logger log.Logger - - mu sync.Mutex - name string - isRunning bool -} - -// NewRunState returns a new unstarted run state tracker with the given logging -// label and log sink. If logger == nil, a no-op logger is provided by default. -func NewRunState(name string, logger log.Logger) *RunState { - if logger == nil { - logger = log.NewNopLogger() - } - return &RunState{ - name: name, - Logger: logger, - } -} - -// Start sets the state to running, or reports an error. -func (r *RunState) Start(context.Context) error { - r.mu.Lock() - defer r.mu.Unlock() - if r.isRunning { - r.Logger.Error("not starting client, it is already started", "client", r.name) - return ErrClientRunning - } - r.Logger.Info("starting client", "client", r.name) - r.isRunning = true - return nil -} - -// Stop sets the state to not running, or reports an error. -func (r *RunState) Stop() error { - r.mu.Lock() - defer r.mu.Unlock() - if !r.isRunning { - r.Logger.Error("not stopping client; it is already stopped", "client", r.name) - return ErrClientNotRunning - } - r.Logger.Info("stopping client", "client", r.name) - r.isRunning = false - return nil -} - -// IsRunning reports whether the state is running. -func (r *RunState) IsRunning() bool { - r.mu.Lock() - defer r.mu.Unlock() - return r.isRunning -} diff --git a/rpc/client/http/ws.go b/rpc/client/http/ws.go index c36ad9fc6..43a70b74e 100644 --- a/rpc/client/http/ws.go +++ b/rpc/client/http/ws.go @@ -9,6 +9,7 @@ import ( "time" "github.com/tendermint/tendermint/internal/pubsub" + "github.com/tendermint/tendermint/libs/log" rpcclient "github.com/tendermint/tendermint/rpc/client" "github.com/tendermint/tendermint/rpc/coretypes" jsonrpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client" @@ -16,8 +17,8 @@ import ( // wsEvents is a wrapper around WSClient, which implements EventsClient. type wsEvents struct { - *rpcclient.RunState - ws *jsonrpcclient.WSClient + Logger log.Logger + ws *jsonrpcclient.WSClient mtx sync.RWMutex subscriptions map[string]*wsSubscription @@ -33,7 +34,7 @@ var _ rpcclient.EventsClient = (*wsEvents)(nil) func newWsEvents(remote string) (*wsEvents, error) { w := &wsEvents{ - RunState: rpcclient.NewRunState("wsEvents", nil), + Logger: log.NewNopLogger(), subscriptions: make(map[string]*wsSubscription), } @@ -60,9 +61,6 @@ func (w *wsEvents) Start(ctx context.Context) error { return nil } -// IsRunning reports whether the websocket client is running. -func (w *wsEvents) IsRunning() bool { return w.ws.IsRunning() } - // Stop shuts down the websocket client. func (w *wsEvents) Stop() error { return w.ws.Stop() } @@ -80,11 +78,6 @@ func (w *wsEvents) Stop() error { return w.ws.Stop() } // It returns an error if wsEvents is not running. func (w *wsEvents) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan coretypes.ResultEvent, err error) { - - if !w.IsRunning() { - return nil, rpcclient.ErrClientNotRunning - } - if err := w.ws.Subscribe(ctx, query); err != nil { return nil, err } @@ -109,10 +102,6 @@ func (w *wsEvents) Subscribe(ctx context.Context, subscriber, query string, // // It returns an error if wsEvents is not running. func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) error { - if !w.IsRunning() { - return rpcclient.ErrClientNotRunning - } - if err := w.ws.Unsubscribe(ctx, query); err != nil { return err } @@ -135,10 +124,6 @@ func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) er // // It returns an error if wsEvents is not running. func (w *wsEvents) UnsubscribeAll(ctx context.Context, subscriber string) error { - if !w.IsRunning() { - return rpcclient.ErrClientNotRunning - } - if err := w.ws.UnsubscribeAll(ctx); err != nil { return err } diff --git a/rpc/client/interface.go b/rpc/client/interface.go index d5bbaec1b..b53079852 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -32,14 +32,10 @@ import ( // Client describes the interface of Tendermint RPC client implementations. type Client interface { - // These methods define the operational structure of the client. - - // Start the client. Start must report an error if the client is running. + // Start the client, which will run until the context terminates. + // An error from Start indicates the client could not start. Start(context.Context) error - // IsRunning reports whether the client is running. - IsRunning() bool - // These embedded interfaces define the callable methods of the service. ABCIClient EventsClient diff --git a/rpc/client/mocks/client.go b/rpc/client/mocks/client.go index a74b6438f..ffa1d1f29 100644 --- a/rpc/client/mocks/client.go +++ b/rpc/client/mocks/client.go @@ -526,20 +526,6 @@ func (_m *Client) Health(_a0 context.Context) (*coretypes.ResultHealth, error) { return r0, r1 } -// IsRunning provides a mock function with given fields: -func (_m *Client) IsRunning() bool { - ret := _m.Called() - - var r0 bool - if rf, ok := ret.Get(0).(func() bool); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(bool) - } - - return r0 -} - // NetInfo provides a mock function with given fields: _a0 func (_m *Client) NetInfo(_a0 context.Context) (*coretypes.ResultNetInfo, error) { ret := _m.Called(_a0) diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index ee4cb467c..883716c19 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -40,12 +40,13 @@ func getHTTPClient(t *testing.T, logger log.Logger, conf *config.Config) *rpchtt rpcAddr := conf.RPC.ListenAddress c, err := rpchttp.NewWithClient(rpcAddr, http.DefaultClient) require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + require.NoError(t, c.Start(ctx)) c.Logger = logger t.Cleanup(func() { - if c.IsRunning() { - require.NoError(t, c.Stop()) - } + cancel() + require.NoError(t, c.Stop()) }) return c @@ -56,16 +57,16 @@ func getHTTPClientWithTimeout(t *testing.T, logger log.Logger, conf *config.Conf rpcAddr := conf.RPC.ListenAddress - http.DefaultClient.Timeout = timeout - c, err := rpchttp.NewWithClient(rpcAddr, http.DefaultClient) + tclient := &http.Client{Timeout: timeout} + c, err := rpchttp.NewWithClient(rpcAddr, tclient) require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + require.NoError(t, c.Start(ctx)) c.Logger = logger t.Cleanup(func() { - http.DefaultClient.Timeout = 0 - if c.IsRunning() { - require.NoError(t, c.Stop()) - } + cancel() + require.NoError(t, c.Stop()) }) return c @@ -184,25 +185,6 @@ func TestClientOperations(t *testing.T) { wg.Wait() }) }) - t.Run("HTTPReturnsErrorIfClientIsNotRunning", func(t *testing.T) { - logger := log.NewTestingLogger(t) - - c := getHTTPClientWithTimeout(t, logger, conf, 100*time.Millisecond) - - // on Subscribe - _, err := c.Subscribe(ctx, "TestHeaderEvents", - types.QueryForEvent(types.EventNewBlockHeaderValue).String()) - assert.Error(t, err) - - // on Unsubscribe - err = c.Unsubscribe(ctx, "TestHeaderEvents", - types.QueryForEvent(types.EventNewBlockHeaderValue).String()) - assert.Error(t, err) - - // on UnsubscribeAll - err = c.UnsubscribeAll(ctx, "TestHeaderEvents") - assert.Error(t, err) - }) } // Make sure info is correct (we connect properly) @@ -488,15 +470,6 @@ func TestClientMethodCalls(t *testing.T) { assert.Equal(t, 0, pool.Size(), "mempool must be empty") }) t.Run("Events", func(t *testing.T) { - // start for this test it if it wasn't already running - if !c.IsRunning() { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - // if so, then we start it, listen, and stop it. - err := c.Start(ctx) - require.NoError(t, err) - } t.Run("Header", func(t *testing.T) { evt, err := client.WaitForOneEvent(c, types.EventNewBlockHeaderValue, waitForEventTimeout) require.NoError(t, err, "%d: %+v", i, err) @@ -538,7 +511,7 @@ func TestClientMethodCalls(t *testing.T) { }) }) t.Run("Evidence", func(t *testing.T) { - t.Run("BraodcastDuplicateVote", func(t *testing.T) { + t.Run("BroadcastDuplicateVote", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/rpc/jsonrpc/client/integration_test.go b/rpc/jsonrpc/client/integration_test.go index 961b9a619..f53b28802 100644 --- a/rpc/jsonrpc/client/integration_test.go +++ b/rpc/jsonrpc/client/integration_test.go @@ -19,24 +19,26 @@ import ( ) func TestWSClientReconnectWithJitter(t *testing.T) { - n := 8 - var maxReconnectAttempts uint = 3 - // Max wait time is ceil(1+0.999) + ceil(2+0.999) + ceil(4+0.999) + ceil(...) = 2 + 3 + 5 = 10s + ... - maxSleepTime := time.Second * time.Duration(((1<