Browse Source

unsubscribe from all subscriptions on WS disconnect

pull/788/head
Anton Kaliaev 7 years ago
parent
commit
4ffe9304ba
No known key found for this signature in database GPG Key ID: 7B6881D965918214
2 changed files with 29 additions and 11 deletions
  1. +5
    -1
      node/node.go
  2. +24
    -10
      rpc/lib/server/handlers.go

+ 5
- 1
node/node.go View File

@ -2,6 +2,7 @@ package node
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -415,7 +416,10 @@ func (n *Node) startRPC() ([]net.Listener, error) {
for i, listenAddr := range listenAddrs { for i, listenAddr := range listenAddrs {
mux := http.NewServeMux() mux := http.NewServeMux()
rpcLogger := n.Logger.With("module", "rpc-server") rpcLogger := n.Logger.With("module", "rpc-server")
wm := rpcserver.NewWebsocketManager(rpccore.Routes)
onDisconnect := rpcserver.OnDisconnect(func(remoteAddr string) {
n.eventBus.UnsubscribeAll(context.Background(), remoteAddr)
})
wm := rpcserver.NewWebsocketManager(rpccore.Routes, onDisconnect)
wm.SetLogger(rpcLogger.With("protocol", "websocket")) wm.SetLogger(rpcLogger.With("protocol", "websocket"))
mux.HandleFunc("/websocket", wm.WebsocketHandler) mux.HandleFunc("/websocket", wm.WebsocketHandler)
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger) rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger)


+ 24
- 10
rpc/lib/server/handlers.go View File

@ -349,9 +349,10 @@ const (
defaultWSPingPeriod = (defaultWSReadWait * 9) / 10 defaultWSPingPeriod = (defaultWSReadWait * 9) / 10
) )
// a single websocket connection
// contains listener id, underlying ws connection,
// and the event switch for subscribing to events
// a single websocket connection contains listener id, underlying ws
// connection, and the event switch for subscribing to events.
//
// In case of an error, the connection is stopped.
type wsConnection struct { type wsConnection struct {
cmn.BaseService cmn.BaseService
@ -374,13 +375,17 @@ type wsConnection struct {
// Send pings to server with this period. Must be less than readWait, but greater than zero. // Send pings to server with this period. Must be less than readWait, but greater than zero.
pingPeriod time.Duration pingPeriod time.Duration
// called before stopping the connection.
onDisconnect func(remoteAddr string)
} }
// NewWSConnection wraps websocket.Conn. See the commentary on the
// func(*wsConnection) functions for a detailed description of how to configure
// ping period and pong wait time.
// NOTE: if the write buffer is full, pongs may be dropped, which may cause clients to disconnect.
// see https://github.com/gorilla/websocket/issues/97
// NewWSConnection wraps websocket.Conn.
//
// See the commentary on the func(*wsConnection) functions for a detailed
// description of how to configure ping period and pong wait time. NOTE: if the
// write buffer is full, pongs may be dropped, which may cause clients to
// disconnect. see https://github.com/gorilla/websocket/issues/97
func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, options ...func(*wsConnection)) *wsConnection { func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, options ...func(*wsConnection)) *wsConnection {
wsc := &wsConnection{ wsc := &wsConnection{
remoteAddr: baseConn.RemoteAddr().String(), remoteAddr: baseConn.RemoteAddr().String(),
@ -431,7 +436,16 @@ func PingPeriod(pingPeriod time.Duration) func(*wsConnection) {
} }
} }
// OnStart starts the read and write routines. It blocks until the connection closes.
// OnDisconnect called before stopping the connection.
// It should only be used in the constructor - not Goroutine-safe.
func OnDisconnect(cb func(remoteAddr string)) func(*wsConnection) {
return func(wsc *wsConnection) {
wsc.onDisconnect = cb
}
}
// OnStart implements cmn.Service by starting the read and write routines. It
// blocks until the connection closes.
func (wsc *wsConnection) OnStart() error { func (wsc *wsConnection) OnStart() error {
wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity) wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity)
@ -443,7 +457,7 @@ func (wsc *wsConnection) OnStart() error {
return nil return nil
} }
// OnStop unsubscribes from all events.
// OnStop is a nop.
func (wsc *wsConnection) OnStop() { func (wsc *wsConnection) OnStop() {
// Both read and write loops close the websocket connection when they exit their loops. // Both read and write loops close the websocket connection when they exit their loops.
// The writeChan is never closed, to allow WriteRPCResponse() to fail. // The writeChan is never closed, to allow WriteRPCResponse() to fail.


Loading…
Cancel
Save