Browse Source

rpc: Strip down the base RPC client interface. (#6971)

* rpc: Strip down the base RPC client interface.

Prior to this change, the RPC client interface requires implementing the entire
Service interface, but most of the methods of Service are not needed by the
concrete clients. Dissociate the Client interface from the Service interface.

- Extract only those methods of Service that are necessary to make the existing
  clients work.

- Update the clients to combine Start/Onstart and Stop/OnStop. This does not
  change what the clients do to start or stop. Only the websocket clients make
  use of this functionality anyway.

  The websocket implementation uses some plumbing from the BaseService helper.
  We should be able to excising that entirely, but the current interface
  dependencies among the clients would require a much larger change, and one
  that leaks into other (non-RPC) packages.

  As a less-invasive intermediate step, preserve the existing client behaviour
  (and tests) by extracting the necessary subset of the BaseService
  functionality to an analogous RunState helper for clients. I plan to obsolete
  that type in a future PR, but for now this makes a useful waypoint.

Related:
- Clean up client implementations.
- Update mocks.
pull/6974/head
M. J. Fromberger 3 years ago
committed by GitHub
parent
commit
1995ef2572
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 138 additions and 149 deletions
  1. +85
    -0
      rpc/client/helpers.go
  2. +0
    -6
      rpc/client/http/http.go
  3. +12
    -18
      rpc/client/http/ws.go
  4. +13
    -4
      rpc/client/interface.go
  5. +1
    -10
      rpc/client/mock/client.go
  6. +0
    -89
      rpc/client/mocks/client.go
  7. +10
    -10
      rpc/jsonrpc/client/ws_client.go
  8. +17
    -12
      rpc/jsonrpc/server/ws_handler.go

+ 85
- 0
rpc/client/helpers.go View File

@ -4,8 +4,10 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
)
@ -82,3 +84,86 @@ func WaitForOneEvent(c EventsClient, eventValue string, timeout time.Duration) (
return nil, errors.New("timed out waiting for event")
}
}
var (
// ErrClientRunning is returned by Start when the client is already running.
ErrClientRunning = errors.New("client already running")
// ErrClientNotRunning is returned by Stop when the client is not running.
ErrClientNotRunning = errors.New("client is not running")
)
// RunState is a helper that a client implementation can embed to implement
// common plumbing for keeping track of run state and logging.
//
// TODO(creachadair): This type is a temporary measure, and will be removed.
// See the discussion on #6971.
type RunState struct {
Logger log.Logger
mu sync.Mutex
name string
isRunning bool
quit chan struct{}
}
// NewRunState returns a new unstarted run state tracker with the given logging
// label and log sink. If logger == nil, a no-op logger is provided by default.
func NewRunState(name string, logger log.Logger) *RunState {
if logger == nil {
logger = log.NewNopLogger()
}
return &RunState{
name: name,
Logger: logger,
}
}
// Start sets the state to running, or reports an error.
func (r *RunState) Start() error {
r.mu.Lock()
defer r.mu.Unlock()
if r.isRunning {
r.Logger.Error("not starting client, it is already started", "client", r.name)
return ErrClientRunning
}
r.Logger.Info("starting client", "client", r.name)
r.isRunning = true
r.quit = make(chan struct{})
return nil
}
// Stop sets the state to not running, or reports an error.
func (r *RunState) Stop() error {
r.mu.Lock()
defer r.mu.Unlock()
if !r.isRunning {
r.Logger.Error("not stopping client; it is already stopped", "client", r.name)
return ErrClientNotRunning
}
r.Logger.Info("stopping client", "client", r.name)
r.isRunning = false
close(r.quit)
return nil
}
// SetLogger updates the log sink.
func (r *RunState) SetLogger(logger log.Logger) {
r.mu.Lock()
defer r.mu.Unlock()
r.Logger = logger
}
// IsRunning reports whether the state is running.
func (r *RunState) IsRunning() bool {
r.mu.Lock()
defer r.mu.Unlock()
return r.isRunning
}
// Quit returns a channel that is closed when a call to Stop succeeds.
func (r *RunState) Quit() <-chan struct{} {
r.mu.Lock()
defer r.mu.Unlock()
return r.quit
}

+ 0
- 6
rpc/client/http/http.go View File

@ -6,7 +6,6 @@ import (
"time"
"github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/libs/log"
rpcclient "github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/coretypes"
jsonrpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client"
@ -158,11 +157,6 @@ func NewWithClientAndWSOptions(remote string, c *http.Client, wso WSOptions) (*H
var _ rpcclient.Client = (*HTTP)(nil)
// SetLogger sets a logger.
func (c *HTTP) SetLogger(l log.Logger) {
c.wsEvents.SetLogger(l)
}
// Remote returns the remote network address in a string form.
func (c *HTTP) Remote() string {
return c.remote


+ 12
- 18
rpc/client/http/ws.go View File

@ -10,14 +10,11 @@ import (
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
tmjson "github.com/tendermint/tendermint/libs/json"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/libs/service"
rpcclient "github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/coretypes"
jsonrpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client"
)
var errNotRunning = errors.New("client is not running. Use .Start() method to start")
// WSOptions for the WS part of the HTTP client.
type WSOptions struct {
Path string // path (e.g. "/ws")
@ -48,7 +45,7 @@ func (wso WSOptions) Validate() error {
// wsEvents is a wrapper around WSClient, which implements EventsClient.
type wsEvents struct {
service.BaseService
*rpcclient.RunState
ws *jsonrpcclient.WSClient
mtx tmsync.RWMutex
@ -78,7 +75,7 @@ func newWsEvents(remote string, wso WSOptions) (*wsEvents, error) {
w := &wsEvents{
subscriptions: make(map[string]*wsSubscription),
}
w.BaseService = *service.NewBaseService(nil, "wsEvents", w)
w.RunState = rpcclient.NewRunState("wsEvents", nil)
var err error
w.ws, err = jsonrpcclient.NewWSWithOptions(remote, wso.Path, wso.WSOptions)
@ -94,23 +91,20 @@ func newWsEvents(remote string, wso WSOptions) (*wsEvents, error) {
return w, nil
}
// OnStart implements service.Service by starting WSClient and event loop.
func (w *wsEvents) OnStart() error {
// Start starts the websocket client and the event loop.
func (w *wsEvents) Start() error {
if err := w.ws.Start(); err != nil {
return err
}
go w.eventListener()
return nil
}
// OnStop implements service.Service by stopping WSClient.
func (w *wsEvents) OnStop() {
if err := w.ws.Stop(); err != nil {
w.Logger.Error("Can't stop ws client", "err", err)
}
}
// IsRunning reports whether the websocket client is running.
func (w *wsEvents) IsRunning() bool { return w.ws.IsRunning() }
// Stop shuts down the websocket client.
func (w *wsEvents) Stop() error { return w.ws.Stop() }
// Subscribe implements EventsClient by using WSClient to subscribe given
// subscriber to query. By default, it returns a channel with cap=1. Error is
@ -128,7 +122,7 @@ func (w *wsEvents) Subscribe(ctx context.Context, subscriber, query string,
outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
if !w.IsRunning() {
return nil, errNotRunning
return nil, rpcclient.ErrClientNotRunning
}
if err := w.ws.Subscribe(ctx, query); err != nil {
@ -156,7 +150,7 @@ func (w *wsEvents) Subscribe(ctx context.Context, subscriber, query string,
// It returns an error if wsEvents is not running.
func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) error {
if !w.IsRunning() {
return errNotRunning
return rpcclient.ErrClientNotRunning
}
if err := w.ws.Unsubscribe(ctx, query); err != nil {
@ -182,7 +176,7 @@ func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) er
// It returns an error if wsEvents is not running.
func (w *wsEvents) UnsubscribeAll(ctx context.Context, subscriber string) error {
if !w.IsRunning() {
return errNotRunning
return rpcclient.ErrClientNotRunning
}
if err := w.ws.UnsubscribeAll(ctx); err != nil {


+ 13
- 4
rpc/client/interface.go View File

@ -24,17 +24,26 @@ import (
"context"
"github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/libs/service"
ctypes "github.com/tendermint/tendermint/rpc/coretypes"
"github.com/tendermint/tendermint/types"
)
//go:generate ../../scripts/mockery_generate.sh Client
// Client wraps most important rpc calls a client would make if you want to
// listen for events, test if it also implements events.EventSwitch.
// Client describes the interface of Tendermint RPC client implementations.
type Client interface {
service.Service
// These methods define the operational structure of the client.
// Start the client. Start must report an error if the client is running.
Start() error
// Stop the client. Stop must report an error if the client is not running.
Stop() error
// IsRunning reports whether the client is running.
IsRunning() bool
// These embedded interfaces define the callable methods of the service.
ABCIClient
EventsClient
HistoryClient


+ 1
- 10
rpc/client/mock/client.go View File

@ -20,7 +20,6 @@ import (
"github.com/tendermint/tendermint/internal/rpc/core"
"github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/coretypes"
rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"
@ -29,15 +28,7 @@ import (
// Client wraps arbitrary implementations of the various interfaces.
type Client struct {
client.ABCIClient
client.SignClient
client.HistoryClient
client.StatusClient
client.EventsClient
client.EvidenceClient
client.MempoolClient
service.Service
client.Client
env *core.Environment
}


+ 0
- 89
rpc/client/mocks/client.go View File

@ -10,8 +10,6 @@ import (
coretypes "github.com/tendermint/tendermint/rpc/coretypes"
log "github.com/tendermint/tendermint/libs/log"
mock "github.com/stretchr/testify/mock"
types "github.com/tendermint/tendermint/types"
@ -542,74 +540,6 @@ func (_m *Client) NumUnconfirmedTxs(_a0 context.Context) (*coretypes.ResultUncon
return r0, r1
}
// OnReset provides a mock function with given fields:
func (_m *Client) OnReset() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// OnStart provides a mock function with given fields:
func (_m *Client) OnStart() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// OnStop provides a mock function with given fields:
func (_m *Client) OnStop() {
_m.Called()
}
// Quit provides a mock function with given fields:
func (_m *Client) Quit() <-chan struct{} {
ret := _m.Called()
var r0 <-chan struct{}
if rf, ok := ret.Get(0).(func() <-chan struct{}); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan struct{})
}
}
return r0
}
// Reset provides a mock function with given fields:
func (_m *Client) Reset() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// SetLogger provides a mock function with given fields: _a0
func (_m *Client) SetLogger(_a0 log.Logger) {
_m.Called(_a0)
}
// Start provides a mock function with given fields:
func (_m *Client) Start() error {
ret := _m.Called()
@ -661,20 +591,6 @@ func (_m *Client) Stop() error {
return r0
}
// String provides a mock function with given fields:
func (_m *Client) String() string {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// Subscribe provides a mock function with given fields: ctx, subscriber, query, outCapacity
func (_m *Client) Subscribe(ctx context.Context, subscriber string, query string, outCapacity ...int) (<-chan coretypes.ResultEvent, error) {
_va := make([]interface{}, len(outCapacity))
@ -824,8 +740,3 @@ func (_m *Client) Validators(ctx context.Context, height *int64, page *int, perP
return r0, r1
}
// Wait provides a mock function with given fields:
func (_m *Client) Wait() {
_m.Called()
}

+ 10
- 10
rpc/jsonrpc/client/ws_client.go View File

@ -14,7 +14,7 @@ import (
metrics "github.com/rcrowley/go-metrics"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/libs/service"
tmclient "github.com/tendermint/tendermint/rpc/client"
types "github.com/tendermint/tendermint/rpc/jsonrpc/types"
)
@ -41,6 +41,7 @@ func DefaultWSOptions() WSOptions {
//
// WSClient is safe for concurrent use by multiple goroutines.
type WSClient struct { // nolint: maligned
*tmclient.RunState
conn *websocket.Conn
Address string // IP:PORT or /path/to/socket
@ -83,8 +84,6 @@ type WSClient struct { // nolint: maligned
// Send pings to server with this period. Must be less than readWait. If 0, no pings will be sent.
pingPeriod time.Duration
service.BaseService
// Time between sending a ping and receiving a pong. See
// https://godoc.org/github.com/rcrowley/go-metrics#Timer.
PingPongLatencyTimer metrics.Timer
@ -114,6 +113,7 @@ func NewWSWithOptions(remoteAddr, endpoint string, opts WSOptions) (*WSClient, e
}
c := &WSClient{
RunState: tmclient.NewRunState("WSClient", nil),
Address: parsedURL.GetTrimmedHostWithPath(),
Dialer: dialFn,
Endpoint: endpoint,
@ -127,7 +127,6 @@ func NewWSWithOptions(remoteAddr, endpoint string, opts WSOptions) (*WSClient, e
// sentIDs: make(map[types.JSONRPCIntID]bool),
}
c.BaseService = *service.NewBaseService(nil, "WSClient", c)
return c, nil
}
@ -143,9 +142,11 @@ func (c *WSClient) String() string {
return fmt.Sprintf("WSClient{%s (%s)}", c.Address, c.Endpoint)
}
// OnStart implements service.Service by dialing a server and creating read and
// write routines.
func (c *WSClient) OnStart() error {
// Start dials the specified service address and starts the I/O routines.
func (c *WSClient) Start() error {
if err := c.RunState.Start(); err != nil {
return err
}
err := c.dial()
if err != nil {
return err
@ -167,10 +168,9 @@ func (c *WSClient) OnStart() error {
return nil
}
// Stop overrides service.Service#Stop. There is no other way to wait until Quit
// channel is closed.
// Stop shuts down the client.
func (c *WSClient) Stop() error {
if err := c.BaseService.Stop(); err != nil {
if err := c.RunState.Stop(); err != nil {
return err
}
// only close user-facing channels when we can't write to them


+ 17
- 12
rpc/jsonrpc/server/ws_handler.go View File

@ -13,7 +13,7 @@ import (
"github.com/gorilla/websocket"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/coretypes"
types "github.com/tendermint/tendermint/rpc/jsonrpc/types"
)
@ -86,8 +86,8 @@ func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Requ
}()
// register connection
con := newWSConnection(wsConn, wm.funcMap, wm.wsConnOptions...)
con.SetLogger(wm.logger.With("remote", wsConn.RemoteAddr()))
logger := wm.logger.With("remote", wsConn.RemoteAddr())
con := newWSConnection(wsConn, wm.funcMap, logger, wm.wsConnOptions...)
wm.logger.Info("New websocket connection", "remote", con.remoteAddr)
err = con.Start() // BLOCKING
if err != nil {
@ -106,7 +106,7 @@ func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Requ
//
// In case of an error, the connection is stopped.
type wsConnection struct {
service.BaseService
*client.RunState
remoteAddr string
baseConn *websocket.Conn
@ -150,9 +150,11 @@ type wsConnection struct {
func newWSConnection(
baseConn *websocket.Conn,
funcMap map[string]*RPCFunc,
logger log.Logger,
options ...func(*wsConnection),
) *wsConnection {
wsc := &wsConnection{
RunState: client.NewRunState("wsConnection", logger),
remoteAddr: baseConn.RemoteAddr().String(),
baseConn: baseConn,
funcMap: funcMap,
@ -166,7 +168,6 @@ func newWSConnection(
option(wsc)
}
wsc.baseConn.SetReadLimit(wsc.readLimit)
wsc.BaseService = *service.NewBaseService(nil, "wsConnection", wsc)
return wsc
}
@ -218,9 +219,11 @@ func ReadLimit(readLimit int64) func(*wsConnection) {
}
}
// OnStart implements service.Service by starting the read and write routines. It
// blocks until there's some error.
func (wsc *wsConnection) OnStart() error {
// Start starts the client service routines and blocks until there is an error.
func (wsc *wsConnection) Start() error {
if err := wsc.RunState.Start(); err != nil {
return err
}
wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity)
// Read subscriptions/unsubscriptions to events
@ -231,16 +234,18 @@ func (wsc *wsConnection) OnStart() error {
return nil
}
// OnStop implements service.Service by unsubscribing remoteAddr from all
// subscriptions.
func (wsc *wsConnection) OnStop() {
// Stop unsubscribes the remote from all subscriptions.
func (wsc *wsConnection) Stop() error {
if err := wsc.RunState.Stop(); err != nil {
return err
}
if wsc.onDisconnect != nil {
wsc.onDisconnect(wsc.remoteAddr)
}
if wsc.ctx != nil {
wsc.cancel()
}
return nil
}
// GetRemoteAddr returns the remote address of the underlying connection.


Loading…
Cancel
Save