Browse Source

Merge pull request #1769 from tendermint/1755-possible-memory-leak

Memory leak in Websocket
pull/1721/merge
Ethan Buchman 7 years ago
committed by GitHub
parent
commit
c793a72ac5
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 77 additions and 14 deletions
  1. +6
    -0
      CHANGELOG.md
  2. +8
    -1
      libs/pubsub/pubsub.go
  3. +9
    -10
      rpc/lib/server/handlers.go
  4. +50
    -1
      rpc/lib/server/handlers_test.go
  5. +3
    -1
      rpc/lib/types/types.go
  6. +1
    -1
      types/event_bus.go

+ 6
- 0
CHANGELOG.md View File

@ -1,5 +1,11 @@
# Changelog
## 0.20.1
BUG FIXES:
- [rpc] fix memory leak in Websocket (when using `/subscribe` method)
## 0.20.0
*June 6th, 2018*


+ 8
- 1
libs/pubsub/pubsub.go View File

@ -156,6 +156,8 @@ func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, ou
if _, ok = s.subscriptions[clientID]; !ok {
s.subscriptions[clientID] = make(map[string]Query)
}
// preserve original query
// see Unsubscribe
s.subscriptions[clientID][query.String()] = query
s.mtx.Unlock()
return nil
@ -314,6 +316,9 @@ func (state *state) remove(clientID string, q Query) {
}
delete(state.queries[q], clientID)
if len(state.queries[q]) == 0 {
delete(state.queries, q)
}
}
}
@ -328,8 +333,10 @@ func (state *state) removeAll(clientID string) {
close(ch)
delete(state.queries[q], clientID)
if len(state.queries[q]) == 0 {
delete(state.queries, q)
}
}
delete(state.clients, clientID)
}


+ 9
- 10
rpc/lib/server/handlers.go View File

@ -17,7 +17,7 @@ import (
"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/tendermint/go-amino"
amino "github.com/tendermint/go-amino"
types "github.com/tendermint/tendermint/rpc/lib/types"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
@ -613,11 +613,9 @@ func (wsc *wsConnection) readRoutine() {
if err != nil {
wsc.WriteRPCResponse(types.RPCInternalError(request.ID, err))
continue
} else {
wsc.WriteRPCResponse(types.NewRPCSuccessResponse(wsc.cdc, request.ID, result))
continue
}
wsc.WriteRPCResponse(types.NewRPCSuccessResponse(wsc.cdc, request.ID, result))
}
}
}
@ -684,20 +682,20 @@ func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error
//----------------------------------------
// WebsocketManager is the main manager for all websocket connections.
// It holds the event switch and a map of functions for routing.
// WebsocketManager provides a WS handler for incoming connections and passes a
// map of functions along with any additional params to new connections.
// NOTE: The websocket path is defined externally, e.g. in node/node.go
type WebsocketManager struct {
websocket.Upgrader
funcMap map[string]*RPCFunc
cdc *amino.Codec
logger log.Logger
wsConnOptions []func(*wsConnection)
}
// NewWebsocketManager returns a new WebsocketManager that routes according to
// the given funcMap and connects to the server with the given connection
// options.
// NewWebsocketManager returns a new WebsocketManager that passes a map of
// functions, connection options and logger to new WS connections.
func NewWebsocketManager(funcMap map[string]*RPCFunc, cdc *amino.Codec, wsConnOptions ...func(*wsConnection)) *WebsocketManager {
return &WebsocketManager{
funcMap: funcMap,
@ -718,7 +716,8 @@ func (wm *WebsocketManager) SetLogger(l log.Logger) {
wm.logger = l
}
// WebsocketHandler upgrades the request/response (via http.Hijack) and starts the wsConnection.
// WebsocketHandler upgrades the request/response (via http.Hijack) and starts
// the wsConnection.
func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) {
wsConn, err := wm.Upgrade(w, r, nil)
if err != nil {


+ 50
- 1
rpc/lib/server/handlers_test.go View File

@ -9,15 +9,23 @@ import (
"strings"
"testing"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/go-amino"
amino "github.com/tendermint/go-amino"
rs "github.com/tendermint/tendermint/rpc/lib/server"
types "github.com/tendermint/tendermint/rpc/lib/types"
"github.com/tendermint/tmlibs/log"
)
//////////////////////////////////////////////////////////////////////////////
// HTTP REST API
// TODO
//////////////////////////////////////////////////////////////////////////////
// JSON-RPC over HTTP
func testMux() *http.ServeMux {
funcMap := map[string]*rs.RPCFunc{
"c": rs.NewRPCFunc(func(s string, i int) (string, error) { return "foo", nil }, "s,i"),
@ -108,3 +116,44 @@ func TestUnknownRPCPath(t *testing.T) {
// Always expecting back a 404 error
require.Equal(t, http.StatusNotFound, res.StatusCode, "should always return 404")
}
//////////////////////////////////////////////////////////////////////////////
// JSON-RPC over WEBSOCKETS
func TestWebsocketManagerHandler(t *testing.T) {
s := newWSServer()
defer s.Close()
// check upgrader works
d := websocket.Dialer{}
c, dialResp, err := d.Dial("ws://"+s.Listener.Addr().String()+"/websocket", nil)
require.NoError(t, err)
if got, want := dialResp.StatusCode, http.StatusSwitchingProtocols; got != want {
t.Errorf("dialResp.StatusCode = %q, want %q", got, want)
}
// check basic functionality works
req, err := types.MapToRequest(amino.NewCodec(), "TestWebsocketManager", "c", map[string]interface{}{"s": "a", "i": 10})
require.NoError(t, err)
err = c.WriteJSON(req)
require.NoError(t, err)
var resp types.RPCResponse
err = c.ReadJSON(&resp)
require.NoError(t, err)
require.Nil(t, resp.Error)
}
func newWSServer() *httptest.Server {
funcMap := map[string]*rs.RPCFunc{
"c": rs.NewWSRPCFunc(func(wsCtx types.WSRPCContext, s string, i int) (string, error) { return "foo", nil }, "s,i"),
}
wm := rs.NewWebsocketManager(funcMap, amino.NewCodec())
wm.SetLogger(log.TestingLogger())
mux := http.NewServeMux()
mux.HandleFunc("/websocket", wm.WebsocketHandler)
return httptest.NewServer(mux)
}

+ 3
- 1
rpc/lib/types/types.go View File

@ -7,7 +7,9 @@ import (
"strings"
"github.com/pkg/errors"
"github.com/tendermint/go-amino"
amino "github.com/tendermint/go-amino"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
)


+ 1
- 1
types/event_bus.go View File

@ -9,7 +9,7 @@ import (
"github.com/tendermint/tmlibs/log"
)
const defaultCapacity = 1000
const defaultCapacity = 0
type EventBusSubscriber interface {
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error


Loading…
Cancel
Save