diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index 6a932caf6..f56c45e99 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -92,24 +92,32 @@ func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSCli return c } +// MaxReconnectAttempts sets the maximum number of reconnect attempts before returning an error. +// It should only be used in the constructor and is not Goroutine-safe. func MaxReconnectAttempts(max int) func(*WSClient) { return func(c *WSClient) { c.maxReconnectAttempts = max } } +// ReadWait sets the amount of time to wait before a websocket read times out. +// It should only be used in the constructor and is not Goroutine-safe. func ReadWait(readWait time.Duration) func(*WSClient) { return func(c *WSClient) { c.readWait = readWait } } +// WriteWait sets the amount of time to wait before a websocket write times out. +// It should only be used in the constructor and is not Goroutine-safe. func WriteWait(writeWait time.Duration) func(*WSClient) { return func(c *WSClient) { c.writeWait = writeWait } } +// PingPeriod sets the duration for sending websocket pings. +// It should only be used in the constructor - not Goroutine-safe. func PingPeriod(pingPeriod time.Duration) func(*WSClient) { return func(c *WSClient) { c.pingPeriod = pingPeriod diff --git a/rpc/lib/client/ws_client_test.go b/rpc/lib/client/ws_client_test.go index 6f39690db..f5aa027fe 100644 --- a/rpc/lib/client/ws_client_test.go +++ b/rpc/lib/client/ws_client_test.go @@ -32,6 +32,7 @@ func (h *myHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if err != nil { panic(err) } + defer conn.Close() for { messageType, _, err := conn.ReadMessage() if err != nil { diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index 203a71ff7..3b81567e4 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -21,7 +21,7 @@ import ( "github.com/tendermint/tmlibs/log" ) -// Adds a route for each function in the funcMap, as well as general jsonrpc and websocket handlers for all functions. +// RegisterRPCFuncs adds a route for each function in the funcMap, as well as general jsonrpc and websocket handlers for all functions. // "result" is the interface on which the result objects are registered, and is popualted with every RPCResponse func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, logger log.Logger) { // HTTP endpoints @@ -36,7 +36,7 @@ func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, logger lo //------------------------------------- // function introspection -// holds all type information for each function +// RPCFunc contains the introspected type information for a function type RPCFunc struct { f reflect.Value // underlying rpc function args []reflect.Type // type of each function arg @@ -45,12 +45,13 @@ type RPCFunc struct { ws bool // websocket only } -// wraps a function for quicker introspection +// NewRPCFunc wraps a function for introspection. // f is the function, args are comma separated argument names func NewRPCFunc(f interface{}, args string) *RPCFunc { return newRPCFunc(f, args, false) } +// NewWSRPCFunc wraps a function for introspection and use in the websockets. func NewWSRPCFunc(f interface{}, args string) *RPCFunc { return newRPCFunc(f, args, true) } @@ -372,6 +373,8 @@ type wsConnection struct { // NewWSConnection wraps websocket.Conn. See the commentary on the // func(*wsConnection) functions for a detailed description of how to configure // ping period and pong wait time. +// NOTE: if the write buffer is full, pongs may be dropped, which may cause clients to disconnect. +// see https://github.com/gorilla/websocket/issues/97 func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw events.EventSwitch, options ...func(*wsConnection)) *wsConnection { wsc := &wsConnection{ remoteAddr: baseConn.RemoteAddr().String(), @@ -390,31 +393,39 @@ func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw return wsc } +// WriteWait sets the amount of time to wait before a websocket write times out. +// It should only be used in the constructor - not Goroutine-safe. func WriteWait(writeWait time.Duration) func(*wsConnection) { return func(wsc *wsConnection) { wsc.writeWait = writeWait } } +// WriteChanCapacity sets the capacity of the websocket write channel. +// It should only be used in the constructor - not Goroutine-safe. func WriteChanCapacity(cap int) func(*wsConnection) { return func(wsc *wsConnection) { wsc.writeChanCapacity = cap } } +// ReadWait sets the amount of time to wait before a websocket read times out. +// It should only be used in the constructor - not Goroutine-safe. func ReadWait(readWait time.Duration) func(*wsConnection) { return func(wsc *wsConnection) { wsc.readWait = readWait } } +// PingPeriod sets the duration for sending websocket pings. +// It should only be used in the constructor - not Goroutine-safe. func PingPeriod(pingPeriod time.Duration) func(*wsConnection) { return func(wsc *wsConnection) { wsc.pingPeriod = pingPeriod } } -// wsc.Start() blocks until the connection closes. +// OnStart starts the read and write routines. It blocks until the connection closes. func (wsc *wsConnection) OnStart() error { wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity) @@ -426,27 +437,29 @@ func (wsc *wsConnection) OnStart() error { return nil } +// OnStop unsubscribes from all events. func (wsc *wsConnection) OnStop() { if wsc.evsw != nil { wsc.evsw.RemoveListener(wsc.remoteAddr) } - // The write loop closes the websocket connection when it exits its loop, and - // the read loop closes the writeChan. + // Both read and write loops close the websocket connection when they exit their loops. + // The writeChan is never closed, to allow WriteRPCResponse() to fail. } -// Implements WSRPCConnection +// GetRemoteAddr returns the remote address of the underlying connection. +// It implements WSRPCConnection func (wsc *wsConnection) GetRemoteAddr() string { return wsc.remoteAddr } -// Implements WSRPCConnection +// GetEventSwitch returns the event switch. +// It implements WSRPCConnection func (wsc *wsConnection) GetEventSwitch() events.EventSwitch { return wsc.evsw } -// Implements WSRPCConnection -// Blocking write to writeChan until service stops. -// Goroutine-safe +// WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted. +// It implements WSRPCConnection. It is Goroutine-safe. func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { select { case <-wsc.Quit: @@ -455,9 +468,8 @@ func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { } } -// Implements WSRPCConnection -// Nonblocking write. -// Goroutine-safe +// TryWriteRPCResponse attempts to push a response to the writeChan, but does not block. +// It implements WSRPCConnection. It is Goroutine-safe func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool { select { case <-wsc.Quit: @@ -475,8 +487,6 @@ func (wsc *wsConnection) readRoutine() { wsc.baseConn.Close() }() - // Do not close writeChan, to allow WriteRPCResponse() to fail. - // defer close(wsc.writeChan) for { select { case <-wsc.Quit: @@ -598,8 +608,8 @@ func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error //---------------------------------------- -// Main manager for all websocket connections -// Holds the event switch +// WebsocketManager is the main manager for all websocket connections. +// It holds the event switch and a map of functions for routing. // NOTE: The websocket path is defined externally, e.g. in node/node.go type WebsocketManager struct { websocket.Upgrader @@ -609,6 +619,8 @@ type WebsocketManager struct { wsConnOptions []func(*wsConnection) } +// NewWebsocketManager returns a new WebsocketManager that routes according to the given funcMap, listens on the given event switch, +// and connects to the server with the given connection options. func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch, wsConnOptions ...func(*wsConnection)) *WebsocketManager { return &WebsocketManager{ funcMap: funcMap, @@ -624,11 +636,12 @@ func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch, w } } +// SetLogger sets the logger. func (wm *WebsocketManager) SetLogger(l log.Logger) { wm.logger = l } -// Upgrade the request/response (via http.Hijack) and starts the wsConnection. +// WebsocketHandler upgrades the request/response (via http.Hijack) and starts the wsConnection. func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) { wsConn, err := wm.Upgrade(w, r, nil) if err != nil {