Browse Source

rpc: remove the placeholder RunState type. (#7749)

* rpc/client: remove the placeholder RunState type.

I added the RunState type in #6971 to disconnect clients from the service
plumbing, which they do not need. Now that we have more complete context
plumbing, the lifecycle of a client no longer depends on this type: It serves
as a carrier for a logger, and a Boolean flag for "running" status, neither of
which is used outside of tests.

Logging in particular is defaulted to a no-op logger in all production use.
Arguably we could just remove the logging calls, since they are never invoked
except in tests. To defer the question of whether we should do that or make the
logging go somewhere more productive, I've preserved the existing use here.

Remove use of the IsRunning method that was provided by the RunState, and use
the Start method and context to govern client lifecycle.

Remove the one test that exercised "unstarted" clients. I would like to remove
that method entirely, but that will require updating the constructors for all
the client types to plumb a context and possibly other options. I have deferred
that for now.
pull/7769/head
M. J. Fromberger 2 years ago
committed by GitHub
parent
commit
ce61abc038
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 59 additions and 226 deletions
  1. +5
    -5
      light/rpc/client.go
  2. +0
    -68
      rpc/client/helpers.go
  3. +4
    -19
      rpc/client/http/ws.go
  4. +2
    -6
      rpc/client/interface.go
  5. +0
    -14
      rpc/client/mocks/client.go
  6. +11
    -38
      rpc/client/rpc_test.go
  7. +11
    -19
      rpc/jsonrpc/client/integration_test.go
  8. +12
    -21
      rpc/jsonrpc/client/ws_client.go
  9. +12
    -27
      rpc/jsonrpc/client/ws_client_test.go
  10. +2
    -9
      rpc/jsonrpc/server/ws_handler.go

+ 5
- 5
light/rpc/client.go View File

@ -105,12 +105,12 @@ func NewClient(logger log.Logger, next rpcclient.Client, lc LightClient, opts ..
}
func (c *Client) OnStart(ctx context.Context) error {
if !c.next.IsRunning() {
nctx, ncancel := context.WithCancel(ctx)
c.closers = append(c.closers, ncancel)
return c.next.Start(nctx)
nctx, ncancel := context.WithCancel(ctx)
if err := c.next.Start(nctx); err != nil {
ncancel()
return err
}
c.closers = append(c.closers, ncancel)
go func() {
defer close(c.quitCh)
c.Wait()


+ 0
- 68
rpc/client/helpers.go View File

@ -4,10 +4,8 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
)
@ -84,69 +82,3 @@ func WaitForOneEvent(c EventsClient, eventValue string, timeout time.Duration) (
return nil, errors.New("timed out waiting for event")
}
}
var (
// ErrClientRunning is returned by Start when the client is already running.
ErrClientRunning = errors.New("client already running")
// ErrClientNotRunning is returned by Stop when the client is not running.
ErrClientNotRunning = errors.New("client is not running")
)
// RunState is a helper that a client implementation can embed to implement
// common plumbing for keeping track of run state and logging.
//
// TODO(creachadair): This type is a temporary measure, and will be removed.
// See the discussion on #6971.
type RunState struct {
Logger log.Logger
mu sync.Mutex
name string
isRunning bool
}
// NewRunState returns a new unstarted run state tracker with the given logging
// label and log sink. If logger == nil, a no-op logger is provided by default.
func NewRunState(name string, logger log.Logger) *RunState {
if logger == nil {
logger = log.NewNopLogger()
}
return &RunState{
name: name,
Logger: logger,
}
}
// Start sets the state to running, or reports an error.
func (r *RunState) Start(context.Context) error {
r.mu.Lock()
defer r.mu.Unlock()
if r.isRunning {
r.Logger.Error("not starting client, it is already started", "client", r.name)
return ErrClientRunning
}
r.Logger.Info("starting client", "client", r.name)
r.isRunning = true
return nil
}
// Stop sets the state to not running, or reports an error.
func (r *RunState) Stop() error {
r.mu.Lock()
defer r.mu.Unlock()
if !r.isRunning {
r.Logger.Error("not stopping client; it is already stopped", "client", r.name)
return ErrClientNotRunning
}
r.Logger.Info("stopping client", "client", r.name)
r.isRunning = false
return nil
}
// IsRunning reports whether the state is running.
func (r *RunState) IsRunning() bool {
r.mu.Lock()
defer r.mu.Unlock()
return r.isRunning
}

+ 4
- 19
rpc/client/http/ws.go View File

@ -9,6 +9,7 @@ import (
"time"
"github.com/tendermint/tendermint/internal/pubsub"
"github.com/tendermint/tendermint/libs/log"
rpcclient "github.com/tendermint/tendermint/rpc/client"
"github.com/tendermint/tendermint/rpc/coretypes"
jsonrpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client"
@ -16,8 +17,8 @@ import (
// wsEvents is a wrapper around WSClient, which implements EventsClient.
type wsEvents struct {
*rpcclient.RunState
ws *jsonrpcclient.WSClient
Logger log.Logger
ws *jsonrpcclient.WSClient
mtx sync.RWMutex
subscriptions map[string]*wsSubscription
@ -33,7 +34,7 @@ var _ rpcclient.EventsClient = (*wsEvents)(nil)
func newWsEvents(remote string) (*wsEvents, error) {
w := &wsEvents{
RunState: rpcclient.NewRunState("wsEvents", nil),
Logger: log.NewNopLogger(),
subscriptions: make(map[string]*wsSubscription),
}
@ -60,9 +61,6 @@ func (w *wsEvents) Start(ctx context.Context) error {
return nil
}
// IsRunning reports whether the websocket client is running.
func (w *wsEvents) IsRunning() bool { return w.ws.IsRunning() }
// Stop shuts down the websocket client.
func (w *wsEvents) Stop() error { return w.ws.Stop() }
@ -80,11 +78,6 @@ func (w *wsEvents) Stop() error { return w.ws.Stop() }
// It returns an error if wsEvents is not running.
func (w *wsEvents) Subscribe(ctx context.Context, subscriber, query string,
outCapacity ...int) (out <-chan coretypes.ResultEvent, err error) {
if !w.IsRunning() {
return nil, rpcclient.ErrClientNotRunning
}
if err := w.ws.Subscribe(ctx, query); err != nil {
return nil, err
}
@ -109,10 +102,6 @@ func (w *wsEvents) Subscribe(ctx context.Context, subscriber, query string,
//
// It returns an error if wsEvents is not running.
func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) error {
if !w.IsRunning() {
return rpcclient.ErrClientNotRunning
}
if err := w.ws.Unsubscribe(ctx, query); err != nil {
return err
}
@ -135,10 +124,6 @@ func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) er
//
// It returns an error if wsEvents is not running.
func (w *wsEvents) UnsubscribeAll(ctx context.Context, subscriber string) error {
if !w.IsRunning() {
return rpcclient.ErrClientNotRunning
}
if err := w.ws.UnsubscribeAll(ctx); err != nil {
return err
}


+ 2
- 6
rpc/client/interface.go View File

@ -32,14 +32,10 @@ import (
// Client describes the interface of Tendermint RPC client implementations.
type Client interface {
// These methods define the operational structure of the client.
// Start the client. Start must report an error if the client is running.
// Start the client, which will run until the context terminates.
// An error from Start indicates the client could not start.
Start(context.Context) error
// IsRunning reports whether the client is running.
IsRunning() bool
// These embedded interfaces define the callable methods of the service.
ABCIClient
EventsClient


+ 0
- 14
rpc/client/mocks/client.go View File

@ -526,20 +526,6 @@ func (_m *Client) Health(_a0 context.Context) (*coretypes.ResultHealth, error) {
return r0, r1
}
// IsRunning provides a mock function with given fields:
func (_m *Client) IsRunning() bool {
ret := _m.Called()
var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// NetInfo provides a mock function with given fields: _a0
func (_m *Client) NetInfo(_a0 context.Context) (*coretypes.ResultNetInfo, error) {
ret := _m.Called(_a0)


+ 11
- 38
rpc/client/rpc_test.go View File

@ -40,12 +40,13 @@ func getHTTPClient(t *testing.T, logger log.Logger, conf *config.Config) *rpchtt
rpcAddr := conf.RPC.ListenAddress
c, err := rpchttp.NewWithClient(rpcAddr, http.DefaultClient)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
require.NoError(t, c.Start(ctx))
c.Logger = logger
t.Cleanup(func() {
if c.IsRunning() {
require.NoError(t, c.Stop())
}
cancel()
require.NoError(t, c.Stop())
})
return c
@ -56,16 +57,16 @@ func getHTTPClientWithTimeout(t *testing.T, logger log.Logger, conf *config.Conf
rpcAddr := conf.RPC.ListenAddress
http.DefaultClient.Timeout = timeout
c, err := rpchttp.NewWithClient(rpcAddr, http.DefaultClient)
tclient := &http.Client{Timeout: timeout}
c, err := rpchttp.NewWithClient(rpcAddr, tclient)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
require.NoError(t, c.Start(ctx))
c.Logger = logger
t.Cleanup(func() {
http.DefaultClient.Timeout = 0
if c.IsRunning() {
require.NoError(t, c.Stop())
}
cancel()
require.NoError(t, c.Stop())
})
return c
@ -184,25 +185,6 @@ func TestClientOperations(t *testing.T) {
wg.Wait()
})
})
t.Run("HTTPReturnsErrorIfClientIsNotRunning", func(t *testing.T) {
logger := log.NewTestingLogger(t)
c := getHTTPClientWithTimeout(t, logger, conf, 100*time.Millisecond)
// on Subscribe
_, err := c.Subscribe(ctx, "TestHeaderEvents",
types.QueryForEvent(types.EventNewBlockHeaderValue).String())
assert.Error(t, err)
// on Unsubscribe
err = c.Unsubscribe(ctx, "TestHeaderEvents",
types.QueryForEvent(types.EventNewBlockHeaderValue).String())
assert.Error(t, err)
// on UnsubscribeAll
err = c.UnsubscribeAll(ctx, "TestHeaderEvents")
assert.Error(t, err)
})
}
// Make sure info is correct (we connect properly)
@ -488,15 +470,6 @@ func TestClientMethodCalls(t *testing.T) {
assert.Equal(t, 0, pool.Size(), "mempool must be empty")
})
t.Run("Events", func(t *testing.T) {
// start for this test it if it wasn't already running
if !c.IsRunning() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// if so, then we start it, listen, and stop it.
err := c.Start(ctx)
require.NoError(t, err)
}
t.Run("Header", func(t *testing.T) {
evt, err := client.WaitForOneEvent(c, types.EventNewBlockHeaderValue, waitForEventTimeout)
require.NoError(t, err, "%d: %+v", i, err)
@ -538,7 +511,7 @@ func TestClientMethodCalls(t *testing.T) {
})
})
t.Run("Evidence", func(t *testing.T) {
t.Run("BraodcastDuplicateVote", func(t *testing.T) {
t.Run("BroadcastDuplicateVote", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()


+ 11
- 19
rpc/jsonrpc/client/integration_test.go View File

@ -19,24 +19,26 @@ import (
)
func TestWSClientReconnectWithJitter(t *testing.T) {
n := 8
var maxReconnectAttempts uint = 3
// Max wait time is ceil(1+0.999) + ceil(2+0.999) + ceil(4+0.999) + ceil(...) = 2 + 3 + 5 = 10s + ...
maxSleepTime := time.Second * time.Duration(((1<<uint(maxReconnectAttempts))-1)+maxReconnectAttempts)
const numClients = 8
const maxReconnectAttempts = 3
const maxSleepTime = time.Duration(((1<<maxReconnectAttempts)-1)+maxReconnectAttempts) * time.Second
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var errNotConnected = errors.New("not connected")
failDialer := func(net, addr string) (net.Conn, error) {
return nil, errors.New("not connected")
}
clientMap := make(map[int]*WSClient)
buf := new(bytes.Buffer)
for i := 0; i < n; i++ {
for i := 0; i < numClients; i++ {
c, err := NewWS("tcp://foo", "/websocket")
require.NoError(t, err)
c.Dialer = func(string, string) (net.Conn, error) {
return nil, errNotConnected
}
c.Dialer = failDialer
c.maxReconnectAttempts = maxReconnectAttempts
c.Start(ctx)
// Not invoking defer c.Stop() because
// after all the reconnect attempts have been
// exhausted, c.Stop is implicitly invoked.
@ -45,16 +47,6 @@ func TestWSClientReconnectWithJitter(t *testing.T) {
go c.reconnect(ctx)
}
stopCount := 0
time.Sleep(maxSleepTime)
for key, c := range clientMap {
if !c.IsActive() {
delete(clientMap, key)
stopCount++
}
}
require.Equal(t, stopCount, n, "expecting all clients to have been stopped")
// Next we have to examine the logs to ensure that no single time was repeated
backoffDurRegexp := regexp.MustCompile(`backoff_duration=(.+)`)
matches := backoffDurRegexp.FindAll(buf.Bytes(), -1)


+ 12
- 21
rpc/jsonrpc/client/ws_client.go View File

@ -13,7 +13,7 @@ import (
"github.com/gorilla/websocket"
metrics "github.com/rcrowley/go-metrics"
tmclient "github.com/tendermint/tendermint/rpc/client"
"github.com/tendermint/tendermint/libs/log"
rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"
)
@ -38,8 +38,8 @@ var defaultWSOptions = wsOptions{
//
// WSClient is safe for concurrent use by multiple goroutines.
type WSClient struct { // nolint: maligned
*tmclient.RunState
conn *websocket.Conn
Logger log.Logger
conn *websocket.Conn
Address string // IP:PORT or /path/to/socket
Endpoint string // /websocket/url/endpoint
@ -105,7 +105,7 @@ func NewWS(remoteAddr, endpoint string) (*WSClient, error) {
}
c := &WSClient{
RunState: tmclient.NewRunState("WSClient", nil),
Logger: log.NewNopLogger(),
Address: parsedURL.GetTrimmedHostWithPath(),
Dialer: dialFn,
Endpoint: endpoint,
@ -134,13 +134,11 @@ func (c *WSClient) String() string {
return fmt.Sprintf("WSClient{%s (%s)}", c.Address, c.Endpoint)
}
// Start dials the specified service address and starts the I/O routines.
// Start dials the specified service address and starts the I/O routines. The
// service routines run until ctx terminates. To wait for the client to exit
// after ctx ends, call Stop.
func (c *WSClient) Start(ctx context.Context) error {
if err := c.RunState.Start(ctx); err != nil {
return err
}
err := c.dial()
if err != nil {
if err := c.dial(); err != nil {
return err
}
@ -160,17 +158,15 @@ func (c *WSClient) Start(ctx context.Context) error {
return nil
}
// Stop shuts down the client.
// Stop blocks until the client is shut down and returns nil.
//
// TODO(creachadair): This method exists for compatibility with the original
// service plumbing. Give it a better name (e.g., Wait).
func (c *WSClient) Stop() error {
if err := c.RunState.Stop(); err != nil {
return err
}
// only close user-facing channels when we can't write to them
c.wg.Wait()
c.PingPongLatencyTimer.Stop()
close(c.ResponsesCh)
return nil
}
@ -181,11 +177,6 @@ func (c *WSClient) IsReconnecting() bool {
return c.reconnecting
}
// IsActive returns true if the client is running and not reconnecting.
func (c *WSClient) IsActive() bool {
return c.IsRunning() && !c.IsReconnecting()
}
// Send the given RPC request to the server. Results will be available on
// ResponsesCh, errors, if any, on ErrorsCh. Will block until send succeeds or
// ctx.Done is closed.


+ 12
- 27
rpc/jsonrpc/client/ws_client_test.go View File

@ -5,7 +5,6 @@ import (
"encoding/json"
"net/http"
"net/http/httptest"
"runtime"
"sync"
"testing"
"time"
@ -13,9 +12,9 @@ import (
"github.com/fortytw2/leaktest"
"github.com/gorilla/websocket"
metrics "github.com/rcrowley/go-metrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"
)
@ -193,38 +192,27 @@ func TestWSClientReconnectFailure(t *testing.T) {
func TestNotBlockingOnStop(t *testing.T) {
t.Cleanup(leaktest.Check(t))
timeout := 3 * time.Second
s := httptest.NewServer(&myTestHandler{t: t})
defer s.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := startClient(ctx, t, "//"+s.Listener.Addr().String())
c.Call(ctx, "a", make(map[string]interface{})) // nolint:errcheck // ignore for tests
// Let the readRoutine get around to blocking
time.Sleep(time.Second)
passCh := make(chan struct{})
require.NoError(t, c.Call(ctx, "a", make(map[string]interface{})))
time.Sleep(200 * time.Millisecond) // give service routines time to start ⚠️
done := make(chan struct{})
go func() {
// Unless we have a non-blocking write to ResponsesCh from readRoutine
// this blocks forever ont the waitgroup
cancel()
require.NoError(t, c.Stop())
select {
case <-ctx.Done():
case passCh <- struct{}{}:
if assert.NoError(t, c.Stop()) {
close(done)
}
}()
runtime.Gosched() // hacks: force context switch
select {
case <-passCh:
// Pass
case <-time.After(timeout):
if c.IsRunning() {
t.Fatalf("WSClient did failed to stop within %v seconds - is one of the read/write routines blocking?",
timeout.Seconds())
}
case <-done:
t.Log("Stopped client successfully")
case <-time.After(2 * time.Second):
t.Fatal("Timed out waiting for client to stop")
}
}
@ -234,11 +222,8 @@ func startClient(ctx context.Context, t *testing.T, addr string) *WSClient {
t.Cleanup(leaktest.Check(t))
c, err := NewWS(addr, "/websocket")
require.NoError(t, err)
err = c.Start(ctx)
require.NoError(t, err)
c.Logger = log.NewNopLogger()
require.NoError(t, c.Start(ctx))
return c
}


+ 2
- 9
rpc/jsonrpc/server/ws_handler.go View File

@ -11,7 +11,6 @@ import (
"github.com/gorilla/websocket"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/rpc/client"
rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"
)
@ -103,7 +102,7 @@ func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Requ
//
// In case of an error, the connection is stopped.
type wsConnection struct {
*client.RunState
Logger log.Logger
remoteAddr string
baseConn *websocket.Conn
@ -145,7 +144,7 @@ func newWSConnection(
options ...func(*wsConnection),
) *wsConnection {
wsc := &wsConnection{
RunState: client.NewRunState("wsConnection", logger),
Logger: logger,
remoteAddr: baseConn.RemoteAddr().String(),
baseConn: baseConn,
funcMap: funcMap,
@ -194,9 +193,6 @@ func ReadLimit(readLimit int64) func(*wsConnection) {
// Start starts the client service routines and blocks until there is an error.
func (wsc *wsConnection) Start(ctx context.Context) error {
if err := wsc.RunState.Start(ctx); err != nil {
return err
}
wsc.writeChan = make(chan rpctypes.RPCResponse, defaultWSWriteChanCapacity)
// Read subscriptions/unsubscriptions to events
@ -209,9 +205,6 @@ func (wsc *wsConnection) Start(ctx context.Context) error {
// Stop unsubscribes the remote from all subscriptions.
func (wsc *wsConnection) Stop() error {
if err := wsc.RunState.Stop(); err != nil {
return err
}
if wsc.onDisconnect != nil {
wsc.onDisconnect(wsc.remoteAddr)
}


Loading…
Cancel
Save