You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

715 lines
22 KiB

8 years ago
8 years ago
7 years ago
8 years ago
8 years ago
8 years ago
7 years ago
7 years ago
7 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
7 years ago
8 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
8 years ago
7 years ago
8 years ago
  1. package rpcserver
  2. import (
  3. "bytes"
  4. "encoding/hex"
  5. "encoding/json"
  6. "fmt"
  7. "io/ioutil"
  8. "net/http"
  9. "reflect"
  10. "sort"
  11. "strings"
  12. "time"
  13. "github.com/gorilla/websocket"
  14. "github.com/pkg/errors"
  15. types "github.com/tendermint/tendermint/rpc/lib/types"
  16. cmn "github.com/tendermint/tmlibs/common"
  17. events "github.com/tendermint/tmlibs/events"
  18. "github.com/tendermint/tmlibs/log"
  19. )
  20. // RegisterRPCFuncs adds a route for each function in the funcMap, as well as general jsonrpc and websocket handlers for all functions.
  21. // "result" is the interface on which the result objects are registered, and is popualted with every RPCResponse
  22. func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, logger log.Logger) {
  23. // HTTP endpoints
  24. for funcName, rpcFunc := range funcMap {
  25. mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc, logger))
  26. }
  27. // JSONRPC endpoints
  28. mux.HandleFunc("/", makeJSONRPCHandler(funcMap, logger))
  29. }
  30. //-------------------------------------
  31. // function introspection
  32. // RPCFunc contains the introspected type information for a function
  33. type RPCFunc struct {
  34. f reflect.Value // underlying rpc function
  35. args []reflect.Type // type of each function arg
  36. returns []reflect.Type // type of each return arg
  37. argNames []string // name of each argument
  38. ws bool // websocket only
  39. }
  40. // NewRPCFunc wraps a function for introspection.
  41. // f is the function, args are comma separated argument names
  42. func NewRPCFunc(f interface{}, args string) *RPCFunc {
  43. return newRPCFunc(f, args, false)
  44. }
  45. // NewWSRPCFunc wraps a function for introspection and use in the websockets.
  46. func NewWSRPCFunc(f interface{}, args string) *RPCFunc {
  47. return newRPCFunc(f, args, true)
  48. }
  49. func newRPCFunc(f interface{}, args string, ws bool) *RPCFunc {
  50. var argNames []string
  51. if args != "" {
  52. argNames = strings.Split(args, ",")
  53. }
  54. return &RPCFunc{
  55. f: reflect.ValueOf(f),
  56. args: funcArgTypes(f),
  57. returns: funcReturnTypes(f),
  58. argNames: argNames,
  59. ws: ws,
  60. }
  61. }
  62. // return a function's argument types
  63. func funcArgTypes(f interface{}) []reflect.Type {
  64. t := reflect.TypeOf(f)
  65. n := t.NumIn()
  66. typez := make([]reflect.Type, n)
  67. for i := 0; i < n; i++ {
  68. typez[i] = t.In(i)
  69. }
  70. return typez
  71. }
  72. // return a function's return types
  73. func funcReturnTypes(f interface{}) []reflect.Type {
  74. t := reflect.TypeOf(f)
  75. n := t.NumOut()
  76. typez := make([]reflect.Type, n)
  77. for i := 0; i < n; i++ {
  78. typez[i] = t.Out(i)
  79. }
  80. return typez
  81. }
  82. // function introspection
  83. //-----------------------------------------------------------------------------
  84. // rpc.json
  85. // jsonrpc calls grab the given method's function info and runs reflect.Call
  86. func makeJSONRPCHandler(funcMap map[string]*RPCFunc, logger log.Logger) http.HandlerFunc {
  87. return func(w http.ResponseWriter, r *http.Request) {
  88. b, _ := ioutil.ReadAll(r.Body)
  89. // if its an empty request (like from a browser),
  90. // just display a list of functions
  91. if len(b) == 0 {
  92. writeListOfEndpoints(w, r, funcMap)
  93. return
  94. }
  95. var request types.RPCRequest
  96. err := json.Unmarshal(b, &request)
  97. if err != nil {
  98. WriteRPCResponseHTTPError(w, http.StatusBadRequest, types.NewRPCResponse("", nil, fmt.Sprintf("Error unmarshalling request: %v", err.Error())))
  99. return
  100. }
  101. if len(r.URL.Path) > 1 {
  102. WriteRPCResponseHTTPError(w, http.StatusNotFound, types.NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
  103. return
  104. }
  105. rpcFunc := funcMap[request.Method]
  106. if rpcFunc == nil {
  107. WriteRPCResponseHTTPError(w, http.StatusNotFound, types.NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
  108. return
  109. }
  110. if rpcFunc.ws {
  111. WriteRPCResponseHTTPError(w, http.StatusMethodNotAllowed, types.NewRPCResponse(request.ID, nil, "RPC method is only for websockets: "+request.Method))
  112. return
  113. }
  114. args, err := jsonParamsToArgsRPC(rpcFunc, request.Params)
  115. if err != nil {
  116. WriteRPCResponseHTTPError(w, http.StatusBadRequest, types.NewRPCResponse(request.ID, nil, fmt.Sprintf("Error converting json params to arguments: %v", err.Error())))
  117. return
  118. }
  119. returns := rpcFunc.f.Call(args)
  120. logger.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
  121. result, err := unreflectResult(returns)
  122. if err != nil {
  123. WriteRPCResponseHTTPError(w, http.StatusInternalServerError, types.NewRPCResponse(request.ID, result, err.Error()))
  124. return
  125. }
  126. WriteRPCResponseHTTP(w, types.NewRPCResponse(request.ID, result, ""))
  127. }
  128. }
  129. func mapParamsToArgs(rpcFunc *RPCFunc, params map[string]*json.RawMessage, argsOffset int) ([]reflect.Value, error) {
  130. values := make([]reflect.Value, len(rpcFunc.argNames))
  131. for i, argName := range rpcFunc.argNames {
  132. argType := rpcFunc.args[i+argsOffset]
  133. if p, ok := params[argName]; ok && p != nil && len(*p) > 0 {
  134. val := reflect.New(argType)
  135. err := json.Unmarshal(*p, val.Interface())
  136. if err != nil {
  137. return nil, err
  138. }
  139. values[i] = val.Elem()
  140. } else { // use default for that type
  141. values[i] = reflect.Zero(argType)
  142. }
  143. }
  144. return values, nil
  145. }
  146. func arrayParamsToArgs(rpcFunc *RPCFunc, params []*json.RawMessage, argsOffset int) ([]reflect.Value, error) {
  147. if len(rpcFunc.argNames) != len(params) {
  148. return nil, errors.Errorf("Expected %v parameters (%v), got %v (%v)",
  149. len(rpcFunc.argNames), rpcFunc.argNames, len(params), params)
  150. }
  151. values := make([]reflect.Value, len(params))
  152. for i, p := range params {
  153. argType := rpcFunc.args[i+argsOffset]
  154. val := reflect.New(argType)
  155. err := json.Unmarshal(*p, val.Interface())
  156. if err != nil {
  157. return nil, err
  158. }
  159. values[i] = val.Elem()
  160. }
  161. return values, nil
  162. }
  163. // raw is unparsed json (from json.RawMessage) encoding either a map or an array.
  164. //
  165. // argsOffset should be 0 for RPC calls, and 1 for WS requests, where len(rpcFunc.args) != len(rpcFunc.argNames).
  166. // Example:
  167. // rpcFunc.args = [rpctypes.WSRPCContext string]
  168. // rpcFunc.argNames = ["arg"]
  169. func jsonParamsToArgs(rpcFunc *RPCFunc, raw []byte, argsOffset int) ([]reflect.Value, error) {
  170. // first, try to get the map..
  171. var m map[string]*json.RawMessage
  172. err := json.Unmarshal(raw, &m)
  173. if err == nil {
  174. return mapParamsToArgs(rpcFunc, m, argsOffset)
  175. }
  176. // otherwise, try an array
  177. var a []*json.RawMessage
  178. err = json.Unmarshal(raw, &a)
  179. if err == nil {
  180. return arrayParamsToArgs(rpcFunc, a, argsOffset)
  181. }
  182. // otherwise, bad format, we cannot parse
  183. return nil, errors.Errorf("Unknown type for JSON params: %v. Expected map or array", err)
  184. }
  185. // Convert a []interface{} OR a map[string]interface{} to properly typed values
  186. func jsonParamsToArgsRPC(rpcFunc *RPCFunc, params *json.RawMessage) ([]reflect.Value, error) {
  187. return jsonParamsToArgs(rpcFunc, *params, 0)
  188. }
  189. // Same as above, but with the first param the websocket connection
  190. func jsonParamsToArgsWS(rpcFunc *RPCFunc, params *json.RawMessage, wsCtx types.WSRPCContext) ([]reflect.Value, error) {
  191. values, err := jsonParamsToArgs(rpcFunc, *params, 1)
  192. if err != nil {
  193. return nil, err
  194. }
  195. return append([]reflect.Value{reflect.ValueOf(wsCtx)}, values...), nil
  196. }
  197. // rpc.json
  198. //-----------------------------------------------------------------------------
  199. // rpc.http
  200. // convert from a function name to the http handler
  201. func makeHTTPHandler(rpcFunc *RPCFunc, logger log.Logger) func(http.ResponseWriter, *http.Request) {
  202. // Exception for websocket endpoints
  203. if rpcFunc.ws {
  204. return func(w http.ResponseWriter, r *http.Request) {
  205. WriteRPCResponseHTTPError(w, http.StatusMethodNotAllowed, types.NewRPCResponse("", nil, "This RPC method is only for websockets"))
  206. }
  207. }
  208. // All other endpoints
  209. return func(w http.ResponseWriter, r *http.Request) {
  210. logger.Debug("HTTP HANDLER", "req", r)
  211. args, err := httpParamsToArgs(rpcFunc, r)
  212. if err != nil {
  213. WriteRPCResponseHTTPError(w, http.StatusBadRequest, types.NewRPCResponse("", nil, fmt.Sprintf("Error converting http params to args: %v", err.Error())))
  214. return
  215. }
  216. returns := rpcFunc.f.Call(args)
  217. logger.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
  218. result, err := unreflectResult(returns)
  219. if err != nil {
  220. WriteRPCResponseHTTPError(w, http.StatusInternalServerError, types.NewRPCResponse("", nil, err.Error()))
  221. return
  222. }
  223. WriteRPCResponseHTTP(w, types.NewRPCResponse("", result, ""))
  224. }
  225. }
  226. // Covert an http query to a list of properly typed values.
  227. // To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
  228. func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) {
  229. values := make([]reflect.Value, len(rpcFunc.args))
  230. for i, name := range rpcFunc.argNames {
  231. argType := rpcFunc.args[i]
  232. values[i] = reflect.Zero(argType) // set default for that type
  233. arg := GetParam(r, name)
  234. // log.Notice("param to arg", "argType", argType, "name", name, "arg", arg)
  235. if "" == arg {
  236. continue
  237. }
  238. v, err, ok := nonJsonToArg(argType, arg)
  239. if err != nil {
  240. return nil, err
  241. }
  242. if ok {
  243. values[i] = v
  244. continue
  245. }
  246. values[i], err = _jsonStringToArg(argType, arg)
  247. if err != nil {
  248. return nil, err
  249. }
  250. }
  251. return values, nil
  252. }
  253. func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
  254. v := reflect.New(ty)
  255. err := json.Unmarshal([]byte(arg), v.Interface())
  256. if err != nil {
  257. return v, err
  258. }
  259. v = v.Elem()
  260. return v, nil
  261. }
  262. func nonJsonToArg(ty reflect.Type, arg string) (reflect.Value, error, bool) {
  263. isQuotedString := strings.HasPrefix(arg, `"`) && strings.HasSuffix(arg, `"`)
  264. isHexString := strings.HasPrefix(strings.ToLower(arg), "0x")
  265. expectingString := ty.Kind() == reflect.String
  266. expectingByteSlice := ty.Kind() == reflect.Slice && ty.Elem().Kind() == reflect.Uint8
  267. if isHexString {
  268. if !expectingString && !expectingByteSlice {
  269. err := errors.Errorf("Got a hex string arg, but expected '%s'",
  270. ty.Kind().String())
  271. return reflect.ValueOf(nil), err, false
  272. }
  273. var value []byte
  274. value, err := hex.DecodeString(arg[2:])
  275. if err != nil {
  276. return reflect.ValueOf(nil), err, false
  277. }
  278. if ty.Kind() == reflect.String {
  279. return reflect.ValueOf(string(value)), nil, true
  280. }
  281. return reflect.ValueOf([]byte(value)), nil, true
  282. }
  283. if isQuotedString && expectingByteSlice {
  284. v := reflect.New(reflect.TypeOf(""))
  285. err := json.Unmarshal([]byte(arg), v.Interface())
  286. if err != nil {
  287. return reflect.ValueOf(nil), err, false
  288. }
  289. v = v.Elem()
  290. return reflect.ValueOf([]byte(v.String())), nil, true
  291. }
  292. return reflect.ValueOf(nil), nil, false
  293. }
  294. // rpc.http
  295. //-----------------------------------------------------------------------------
  296. // rpc.websocket
  297. const (
  298. defaultWSWriteChanCapacity = 1000
  299. defaultWSWriteWait = 10 * time.Second
  300. defaultWSReadWait = 30 * time.Second
  301. defaultWSPingPeriod = (defaultWSReadWait * 9) / 10
  302. )
  303. // a single websocket connection
  304. // contains listener id, underlying ws connection,
  305. // and the event switch for subscribing to events
  306. type wsConnection struct {
  307. cmn.BaseService
  308. remoteAddr string
  309. baseConn *websocket.Conn
  310. writeChan chan types.RPCResponse
  311. funcMap map[string]*RPCFunc
  312. evsw events.EventSwitch
  313. // write channel capacity
  314. writeChanCapacity int
  315. // each write times out after this.
  316. writeWait time.Duration
  317. // Connection times out if we haven't received *anything* in this long, not even pings.
  318. readWait time.Duration
  319. // Send pings to server with this period. Must be less than readWait, but greater than zero.
  320. pingPeriod time.Duration
  321. }
  322. // NewWSConnection wraps websocket.Conn. See the commentary on the
  323. // func(*wsConnection) functions for a detailed description of how to configure
  324. // ping period and pong wait time.
  325. // NOTE: if the write buffer is full, pongs may be dropped, which may cause clients to disconnect.
  326. // see https://github.com/gorilla/websocket/issues/97
  327. func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw events.EventSwitch, options ...func(*wsConnection)) *wsConnection {
  328. wsc := &wsConnection{
  329. remoteAddr: baseConn.RemoteAddr().String(),
  330. baseConn: baseConn,
  331. funcMap: funcMap,
  332. evsw: evsw,
  333. writeWait: defaultWSWriteWait,
  334. writeChanCapacity: defaultWSWriteChanCapacity,
  335. readWait: defaultWSReadWait,
  336. pingPeriod: defaultWSPingPeriod,
  337. }
  338. for _, option := range options {
  339. option(wsc)
  340. }
  341. wsc.BaseService = *cmn.NewBaseService(nil, "wsConnection", wsc)
  342. return wsc
  343. }
  344. // WriteWait sets the amount of time to wait before a websocket write times out.
  345. // It should only be used in the constructor - not Goroutine-safe.
  346. func WriteWait(writeWait time.Duration) func(*wsConnection) {
  347. return func(wsc *wsConnection) {
  348. wsc.writeWait = writeWait
  349. }
  350. }
  351. // WriteChanCapacity sets the capacity of the websocket write channel.
  352. // It should only be used in the constructor - not Goroutine-safe.
  353. func WriteChanCapacity(cap int) func(*wsConnection) {
  354. return func(wsc *wsConnection) {
  355. wsc.writeChanCapacity = cap
  356. }
  357. }
  358. // ReadWait sets the amount of time to wait before a websocket read times out.
  359. // It should only be used in the constructor - not Goroutine-safe.
  360. func ReadWait(readWait time.Duration) func(*wsConnection) {
  361. return func(wsc *wsConnection) {
  362. wsc.readWait = readWait
  363. }
  364. }
  365. // PingPeriod sets the duration for sending websocket pings.
  366. // It should only be used in the constructor - not Goroutine-safe.
  367. func PingPeriod(pingPeriod time.Duration) func(*wsConnection) {
  368. return func(wsc *wsConnection) {
  369. wsc.pingPeriod = pingPeriod
  370. }
  371. }
  372. // OnStart starts the read and write routines. It blocks until the connection closes.
  373. func (wsc *wsConnection) OnStart() error {
  374. wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity)
  375. // Read subscriptions/unsubscriptions to events
  376. go wsc.readRoutine()
  377. // Write responses, BLOCKING.
  378. wsc.writeRoutine()
  379. return nil
  380. }
  381. // OnStop unsubscribes from all events.
  382. func (wsc *wsConnection) OnStop() {
  383. if wsc.evsw != nil {
  384. wsc.evsw.RemoveListener(wsc.remoteAddr)
  385. }
  386. // Both read and write loops close the websocket connection when they exit their loops.
  387. // The writeChan is never closed, to allow WriteRPCResponse() to fail.
  388. }
  389. // GetRemoteAddr returns the remote address of the underlying connection.
  390. // It implements WSRPCConnection
  391. func (wsc *wsConnection) GetRemoteAddr() string {
  392. return wsc.remoteAddr
  393. }
  394. // GetEventSwitch returns the event switch.
  395. // It implements WSRPCConnection
  396. func (wsc *wsConnection) GetEventSwitch() events.EventSwitch {
  397. return wsc.evsw
  398. }
  399. // WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted.
  400. // It implements WSRPCConnection. It is Goroutine-safe.
  401. func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
  402. select {
  403. case <-wsc.Quit:
  404. return
  405. case wsc.writeChan <- resp:
  406. }
  407. }
  408. // TryWriteRPCResponse attempts to push a response to the writeChan, but does not block.
  409. // It implements WSRPCConnection. It is Goroutine-safe
  410. func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool {
  411. select {
  412. case <-wsc.Quit:
  413. return false
  414. case wsc.writeChan <- resp:
  415. return true
  416. default:
  417. return false
  418. }
  419. }
  420. // Read from the socket and subscribe to or unsubscribe from events
  421. func (wsc *wsConnection) readRoutine() {
  422. defer func() {
  423. wsc.baseConn.Close()
  424. }()
  425. for {
  426. select {
  427. case <-wsc.Quit:
  428. return
  429. default:
  430. // reset deadline for every type of message (control or data)
  431. wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.readWait))
  432. var in []byte
  433. _, in, err := wsc.baseConn.ReadMessage()
  434. if err != nil {
  435. if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
  436. wsc.Logger.Info("Client closed the connection")
  437. } else {
  438. wsc.Logger.Error("Failed to read request", "err", err)
  439. }
  440. wsc.Stop()
  441. return
  442. }
  443. var request types.RPCRequest
  444. err = json.Unmarshal(in, &request)
  445. if err != nil {
  446. errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
  447. wsc.WriteRPCResponse(types.NewRPCResponse(request.ID, nil, errStr))
  448. continue
  449. }
  450. // Now, fetch the RPCFunc and execute it.
  451. rpcFunc := wsc.funcMap[request.Method]
  452. if rpcFunc == nil {
  453. wsc.WriteRPCResponse(types.NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
  454. continue
  455. }
  456. var args []reflect.Value
  457. if rpcFunc.ws {
  458. wsCtx := types.WSRPCContext{Request: request, WSRPCConnection: wsc}
  459. args, err = jsonParamsToArgsWS(rpcFunc, request.Params, wsCtx)
  460. } else {
  461. args, err = jsonParamsToArgsRPC(rpcFunc, request.Params)
  462. }
  463. if err != nil {
  464. wsc.WriteRPCResponse(types.NewRPCResponse(request.ID, nil, err.Error()))
  465. continue
  466. }
  467. returns := rpcFunc.f.Call(args)
  468. // TODO: Need to encode args/returns to string if we want to log them
  469. wsc.Logger.Info("WSJSONRPC", "method", request.Method)
  470. result, err := unreflectResult(returns)
  471. if err != nil {
  472. wsc.WriteRPCResponse(types.NewRPCResponse(request.ID, nil, err.Error()))
  473. continue
  474. } else {
  475. wsc.WriteRPCResponse(types.NewRPCResponse(request.ID, result, ""))
  476. continue
  477. }
  478. }
  479. }
  480. }
  481. // receives on a write channel and writes out on the socket
  482. func (wsc *wsConnection) writeRoutine() {
  483. pingTicker := time.NewTicker(wsc.pingPeriod)
  484. defer func() {
  485. pingTicker.Stop()
  486. wsc.baseConn.Close()
  487. }()
  488. // https://github.com/gorilla/websocket/issues/97
  489. pongs := make(chan string, 1)
  490. wsc.baseConn.SetPingHandler(func(m string) error {
  491. select {
  492. case pongs <- m:
  493. default:
  494. }
  495. return nil
  496. })
  497. for {
  498. select {
  499. case m := <-pongs:
  500. err := wsc.writeMessageWithDeadline(websocket.PongMessage, []byte(m))
  501. if err != nil {
  502. wsc.Logger.Info("Failed to write pong (client may disconnect)", "err", err)
  503. }
  504. case <-pingTicker.C:
  505. err := wsc.writeMessageWithDeadline(websocket.PingMessage, []byte{})
  506. if err != nil {
  507. wsc.Logger.Error("Failed to write ping", "err", err)
  508. wsc.Stop()
  509. return
  510. }
  511. case msg := <-wsc.writeChan:
  512. jsonBytes, err := json.MarshalIndent(msg, "", " ")
  513. if err != nil {
  514. wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "err", err)
  515. } else {
  516. if err = wsc.writeMessageWithDeadline(websocket.TextMessage, jsonBytes); err != nil {
  517. wsc.Logger.Error("Failed to write response", "err", err)
  518. wsc.Stop()
  519. return
  520. }
  521. }
  522. case <-wsc.Quit:
  523. return
  524. }
  525. }
  526. }
  527. // All writes to the websocket must (re)set the write deadline.
  528. // If some writes don't set it while others do, they may timeout incorrectly (https://github.com/tendermint/tendermint/issues/553)
  529. func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error {
  530. wsc.baseConn.SetWriteDeadline(time.Now().Add(wsc.writeWait))
  531. return wsc.baseConn.WriteMessage(msgType, msg)
  532. }
  533. //----------------------------------------
  534. // WebsocketManager is the main manager for all websocket connections.
  535. // It holds the event switch and a map of functions for routing.
  536. // NOTE: The websocket path is defined externally, e.g. in node/node.go
  537. type WebsocketManager struct {
  538. websocket.Upgrader
  539. funcMap map[string]*RPCFunc
  540. evsw events.EventSwitch
  541. logger log.Logger
  542. wsConnOptions []func(*wsConnection)
  543. }
  544. // NewWebsocketManager returns a new WebsocketManager that routes according to the given funcMap, listens on the given event switch,
  545. // and connects to the server with the given connection options.
  546. func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch, wsConnOptions ...func(*wsConnection)) *WebsocketManager {
  547. return &WebsocketManager{
  548. funcMap: funcMap,
  549. evsw: evsw,
  550. Upgrader: websocket.Upgrader{
  551. CheckOrigin: func(r *http.Request) bool {
  552. // TODO ???
  553. return true
  554. },
  555. },
  556. logger: log.NewNopLogger(),
  557. wsConnOptions: wsConnOptions,
  558. }
  559. }
  560. // SetLogger sets the logger.
  561. func (wm *WebsocketManager) SetLogger(l log.Logger) {
  562. wm.logger = l
  563. }
  564. // WebsocketHandler upgrades the request/response (via http.Hijack) and starts the wsConnection.
  565. func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) {
  566. wsConn, err := wm.Upgrade(w, r, nil)
  567. if err != nil {
  568. // TODO - return http error
  569. wm.logger.Error("Failed to upgrade to websocket connection", "err", err)
  570. return
  571. }
  572. // register connection
  573. con := NewWSConnection(wsConn, wm.funcMap, wm.evsw, wm.wsConnOptions...)
  574. con.SetLogger(wm.logger.With("remote", wsConn.RemoteAddr()))
  575. wm.logger.Info("New websocket connection", "remote", con.remoteAddr)
  576. con.Start() // Blocking
  577. }
  578. // rpc.websocket
  579. //-----------------------------------------------------------------------------
  580. // NOTE: assume returns is result struct and error. If error is not nil, return it
  581. func unreflectResult(returns []reflect.Value) (interface{}, error) {
  582. errV := returns[1]
  583. if errV.Interface() != nil {
  584. return nil, errors.Errorf("%v", errV.Interface())
  585. }
  586. rv := returns[0]
  587. // the result is a registered interface,
  588. // we need a pointer to it so we can marshal with type byte
  589. rvp := reflect.New(rv.Type())
  590. rvp.Elem().Set(rv)
  591. return rvp.Interface(), nil
  592. }
  593. // writes a list of available rpc endpoints as an html page
  594. func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) {
  595. noArgNames := []string{}
  596. argNames := []string{}
  597. for name, funcData := range funcMap {
  598. if len(funcData.args) == 0 {
  599. noArgNames = append(noArgNames, name)
  600. } else {
  601. argNames = append(argNames, name)
  602. }
  603. }
  604. sort.Strings(noArgNames)
  605. sort.Strings(argNames)
  606. buf := new(bytes.Buffer)
  607. buf.WriteString("<html><body>")
  608. buf.WriteString("<br>Available endpoints:<br>")
  609. for _, name := range noArgNames {
  610. link := fmt.Sprintf("http://%s/%s", r.Host, name)
  611. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  612. }
  613. buf.WriteString("<br>Endpoints that require arguments:<br>")
  614. for _, name := range argNames {
  615. link := fmt.Sprintf("http://%s/%s?", r.Host, name)
  616. funcData := funcMap[name]
  617. for i, argName := range funcData.argNames {
  618. link += argName + "=_"
  619. if i < len(funcData.argNames)-1 {
  620. link += "&"
  621. }
  622. }
  623. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  624. }
  625. buf.WriteString("</body></html>")
  626. w.Header().Set("Content-Type", "text/html")
  627. w.WriteHeader(200)
  628. w.Write(buf.Bytes())
  629. }