You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

721 lines
22 KiB

8 years ago
8 years ago
7 years ago
8 years ago
8 years ago
8 years ago
7 years ago
7 years ago
7 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
7 years ago
8 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
8 years ago
7 years ago
8 years ago
  1. package rpcserver
  2. import (
  3. "bytes"
  4. "encoding/hex"
  5. "encoding/json"
  6. "fmt"
  7. "io/ioutil"
  8. "net/http"
  9. "reflect"
  10. "sort"
  11. "strings"
  12. "time"
  13. "github.com/gorilla/websocket"
  14. "github.com/pkg/errors"
  15. types "github.com/tendermint/tendermint/rpc/lib/types"
  16. cmn "github.com/tendermint/tmlibs/common"
  17. events "github.com/tendermint/tmlibs/events"
  18. "github.com/tendermint/tmlibs/log"
  19. )
  20. // RegisterRPCFuncs adds a route for each function in the funcMap, as well as general jsonrpc and websocket handlers for all functions.
  21. // "result" is the interface on which the result objects are registered, and is popualted with every RPCResponse
  22. func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, logger log.Logger) {
  23. // HTTP endpoints
  24. for funcName, rpcFunc := range funcMap {
  25. mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc, logger))
  26. }
  27. // JSONRPC endpoints
  28. mux.HandleFunc("/", makeJSONRPCHandler(funcMap, logger))
  29. }
  30. //-------------------------------------
  31. // function introspection
  32. // RPCFunc contains the introspected type information for a function
  33. type RPCFunc struct {
  34. f reflect.Value // underlying rpc function
  35. args []reflect.Type // type of each function arg
  36. returns []reflect.Type // type of each return arg
  37. argNames []string // name of each argument
  38. ws bool // websocket only
  39. }
  40. // NewRPCFunc wraps a function for introspection.
  41. // f is the function, args are comma separated argument names
  42. func NewRPCFunc(f interface{}, args string) *RPCFunc {
  43. return newRPCFunc(f, args, false)
  44. }
  45. // NewWSRPCFunc wraps a function for introspection and use in the websockets.
  46. func NewWSRPCFunc(f interface{}, args string) *RPCFunc {
  47. return newRPCFunc(f, args, true)
  48. }
  49. func newRPCFunc(f interface{}, args string, ws bool) *RPCFunc {
  50. var argNames []string
  51. if args != "" {
  52. argNames = strings.Split(args, ",")
  53. }
  54. return &RPCFunc{
  55. f: reflect.ValueOf(f),
  56. args: funcArgTypes(f),
  57. returns: funcReturnTypes(f),
  58. argNames: argNames,
  59. ws: ws,
  60. }
  61. }
  62. // return a function's argument types
  63. func funcArgTypes(f interface{}) []reflect.Type {
  64. t := reflect.TypeOf(f)
  65. n := t.NumIn()
  66. typez := make([]reflect.Type, n)
  67. for i := 0; i < n; i++ {
  68. typez[i] = t.In(i)
  69. }
  70. return typez
  71. }
  72. // return a function's return types
  73. func funcReturnTypes(f interface{}) []reflect.Type {
  74. t := reflect.TypeOf(f)
  75. n := t.NumOut()
  76. typez := make([]reflect.Type, n)
  77. for i := 0; i < n; i++ {
  78. typez[i] = t.Out(i)
  79. }
  80. return typez
  81. }
  82. // function introspection
  83. //-----------------------------------------------------------------------------
  84. // rpc.json
  85. // jsonrpc calls grab the given method's function info and runs reflect.Call
  86. func makeJSONRPCHandler(funcMap map[string]*RPCFunc, logger log.Logger) http.HandlerFunc {
  87. return func(w http.ResponseWriter, r *http.Request) {
  88. b, _ := ioutil.ReadAll(r.Body)
  89. // if its an empty request (like from a browser),
  90. // just display a list of functions
  91. if len(b) == 0 {
  92. writeListOfEndpoints(w, r, funcMap)
  93. return
  94. }
  95. var request types.RPCRequest
  96. err := json.Unmarshal(b, &request)
  97. if err != nil {
  98. WriteRPCResponseHTTP(w, types.RPCParseError("", errors.Wrap(err, "Error unmarshalling request")))
  99. return
  100. }
  101. // A Notification is a Request object without an "id" member.
  102. // The Server MUST NOT reply to a Notification, including those that are within a batch request.
  103. if request.ID == "" {
  104. return
  105. }
  106. if len(r.URL.Path) > 1 {
  107. WriteRPCResponseHTTP(w, types.RPCInvalidRequestError(request.ID, errors.Errorf("Path %s is invalid", r.URL.Path)))
  108. return
  109. }
  110. rpcFunc := funcMap[request.Method]
  111. if rpcFunc == nil || rpcFunc.ws {
  112. WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError(request.ID))
  113. return
  114. }
  115. args, err := jsonParamsToArgsRPC(rpcFunc, request.Params)
  116. if err != nil {
  117. WriteRPCResponseHTTP(w, types.RPCInvalidParamsError(request.ID, errors.Wrap(err, "Error converting json params to arguments")))
  118. return
  119. }
  120. returns := rpcFunc.f.Call(args)
  121. logger.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
  122. result, err := unreflectResult(returns)
  123. if err != nil {
  124. WriteRPCResponseHTTP(w, types.RPCInternalError(request.ID, err))
  125. return
  126. }
  127. WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(request.ID, result))
  128. }
  129. }
  130. func mapParamsToArgs(rpcFunc *RPCFunc, params map[string]*json.RawMessage, argsOffset int) ([]reflect.Value, error) {
  131. values := make([]reflect.Value, len(rpcFunc.argNames))
  132. for i, argName := range rpcFunc.argNames {
  133. argType := rpcFunc.args[i+argsOffset]
  134. if p, ok := params[argName]; ok && p != nil && len(*p) > 0 {
  135. val := reflect.New(argType)
  136. err := json.Unmarshal(*p, val.Interface())
  137. if err != nil {
  138. return nil, err
  139. }
  140. values[i] = val.Elem()
  141. } else { // use default for that type
  142. values[i] = reflect.Zero(argType)
  143. }
  144. }
  145. return values, nil
  146. }
  147. func arrayParamsToArgs(rpcFunc *RPCFunc, params []*json.RawMessage, argsOffset int) ([]reflect.Value, error) {
  148. if len(rpcFunc.argNames) != len(params) {
  149. return nil, errors.Errorf("Expected %v parameters (%v), got %v (%v)",
  150. len(rpcFunc.argNames), rpcFunc.argNames, len(params), params)
  151. }
  152. values := make([]reflect.Value, len(params))
  153. for i, p := range params {
  154. argType := rpcFunc.args[i+argsOffset]
  155. val := reflect.New(argType)
  156. err := json.Unmarshal(*p, val.Interface())
  157. if err != nil {
  158. return nil, err
  159. }
  160. values[i] = val.Elem()
  161. }
  162. return values, nil
  163. }
  164. // raw is unparsed json (from json.RawMessage) encoding either a map or an array.
  165. //
  166. // argsOffset should be 0 for RPC calls, and 1 for WS requests, where len(rpcFunc.args) != len(rpcFunc.argNames).
  167. // Example:
  168. // rpcFunc.args = [rpctypes.WSRPCContext string]
  169. // rpcFunc.argNames = ["arg"]
  170. func jsonParamsToArgs(rpcFunc *RPCFunc, raw []byte, argsOffset int) ([]reflect.Value, error) {
  171. // first, try to get the map..
  172. var m map[string]*json.RawMessage
  173. err := json.Unmarshal(raw, &m)
  174. if err == nil {
  175. return mapParamsToArgs(rpcFunc, m, argsOffset)
  176. }
  177. // otherwise, try an array
  178. var a []*json.RawMessage
  179. err = json.Unmarshal(raw, &a)
  180. if err == nil {
  181. return arrayParamsToArgs(rpcFunc, a, argsOffset)
  182. }
  183. // otherwise, bad format, we cannot parse
  184. return nil, errors.Errorf("Unknown type for JSON params: %v. Expected map or array", err)
  185. }
  186. // Convert a []interface{} OR a map[string]interface{} to properly typed values
  187. func jsonParamsToArgsRPC(rpcFunc *RPCFunc, params *json.RawMessage) ([]reflect.Value, error) {
  188. return jsonParamsToArgs(rpcFunc, *params, 0)
  189. }
  190. // Same as above, but with the first param the websocket connection
  191. func jsonParamsToArgsWS(rpcFunc *RPCFunc, params *json.RawMessage, wsCtx types.WSRPCContext) ([]reflect.Value, error) {
  192. values, err := jsonParamsToArgs(rpcFunc, *params, 1)
  193. if err != nil {
  194. return nil, err
  195. }
  196. return append([]reflect.Value{reflect.ValueOf(wsCtx)}, values...), nil
  197. }
  198. // rpc.json
  199. //-----------------------------------------------------------------------------
  200. // rpc.http
  201. // convert from a function name to the http handler
  202. func makeHTTPHandler(rpcFunc *RPCFunc, logger log.Logger) func(http.ResponseWriter, *http.Request) {
  203. // Exception for websocket endpoints
  204. if rpcFunc.ws {
  205. return func(w http.ResponseWriter, r *http.Request) {
  206. WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError(""))
  207. }
  208. }
  209. // All other endpoints
  210. return func(w http.ResponseWriter, r *http.Request) {
  211. logger.Debug("HTTP HANDLER", "req", r)
  212. args, err := httpParamsToArgs(rpcFunc, r)
  213. if err != nil {
  214. WriteRPCResponseHTTP(w, types.RPCInvalidParamsError("", errors.Wrap(err, "Error converting http params to arguments")))
  215. return
  216. }
  217. returns := rpcFunc.f.Call(args)
  218. logger.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
  219. result, err := unreflectResult(returns)
  220. if err != nil {
  221. WriteRPCResponseHTTP(w, types.RPCInternalError("", err))
  222. return
  223. }
  224. WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse("", result))
  225. }
  226. }
  227. // Covert an http query to a list of properly typed values.
  228. // To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
  229. func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) {
  230. values := make([]reflect.Value, len(rpcFunc.args))
  231. for i, name := range rpcFunc.argNames {
  232. argType := rpcFunc.args[i]
  233. values[i] = reflect.Zero(argType) // set default for that type
  234. arg := GetParam(r, name)
  235. // log.Notice("param to arg", "argType", argType, "name", name, "arg", arg)
  236. if "" == arg {
  237. continue
  238. }
  239. v, err, ok := nonJsonToArg(argType, arg)
  240. if err != nil {
  241. return nil, err
  242. }
  243. if ok {
  244. values[i] = v
  245. continue
  246. }
  247. values[i], err = _jsonStringToArg(argType, arg)
  248. if err != nil {
  249. return nil, err
  250. }
  251. }
  252. return values, nil
  253. }
  254. func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
  255. v := reflect.New(ty)
  256. err := json.Unmarshal([]byte(arg), v.Interface())
  257. if err != nil {
  258. return v, err
  259. }
  260. v = v.Elem()
  261. return v, nil
  262. }
  263. func nonJsonToArg(ty reflect.Type, arg string) (reflect.Value, error, bool) {
  264. isQuotedString := strings.HasPrefix(arg, `"`) && strings.HasSuffix(arg, `"`)
  265. isHexString := strings.HasPrefix(strings.ToLower(arg), "0x")
  266. expectingString := ty.Kind() == reflect.String
  267. expectingByteSlice := ty.Kind() == reflect.Slice && ty.Elem().Kind() == reflect.Uint8
  268. if isHexString {
  269. if !expectingString && !expectingByteSlice {
  270. err := errors.Errorf("Got a hex string arg, but expected '%s'",
  271. ty.Kind().String())
  272. return reflect.ValueOf(nil), err, false
  273. }
  274. var value []byte
  275. value, err := hex.DecodeString(arg[2:])
  276. if err != nil {
  277. return reflect.ValueOf(nil), err, false
  278. }
  279. if ty.Kind() == reflect.String {
  280. return reflect.ValueOf(string(value)), nil, true
  281. }
  282. return reflect.ValueOf([]byte(value)), nil, true
  283. }
  284. if isQuotedString && expectingByteSlice {
  285. v := reflect.New(reflect.TypeOf(""))
  286. err := json.Unmarshal([]byte(arg), v.Interface())
  287. if err != nil {
  288. return reflect.ValueOf(nil), err, false
  289. }
  290. v = v.Elem()
  291. return reflect.ValueOf([]byte(v.String())), nil, true
  292. }
  293. return reflect.ValueOf(nil), nil, false
  294. }
  295. // rpc.http
  296. //-----------------------------------------------------------------------------
  297. // rpc.websocket
  298. const (
  299. defaultWSWriteChanCapacity = 1000
  300. defaultWSWriteWait = 10 * time.Second
  301. defaultWSReadWait = 30 * time.Second
  302. defaultWSPingPeriod = (defaultWSReadWait * 9) / 10
  303. )
  304. // a single websocket connection
  305. // contains listener id, underlying ws connection,
  306. // and the event switch for subscribing to events
  307. type wsConnection struct {
  308. cmn.BaseService
  309. remoteAddr string
  310. baseConn *websocket.Conn
  311. writeChan chan types.RPCResponse
  312. funcMap map[string]*RPCFunc
  313. evsw events.EventSwitch
  314. // write channel capacity
  315. writeChanCapacity int
  316. // each write times out after this.
  317. writeWait time.Duration
  318. // Connection times out if we haven't received *anything* in this long, not even pings.
  319. readWait time.Duration
  320. // Send pings to server with this period. Must be less than readWait, but greater than zero.
  321. pingPeriod time.Duration
  322. }
  323. // NewWSConnection wraps websocket.Conn. See the commentary on the
  324. // func(*wsConnection) functions for a detailed description of how to configure
  325. // ping period and pong wait time.
  326. // NOTE: if the write buffer is full, pongs may be dropped, which may cause clients to disconnect.
  327. // see https://github.com/gorilla/websocket/issues/97
  328. func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw events.EventSwitch, options ...func(*wsConnection)) *wsConnection {
  329. wsc := &wsConnection{
  330. remoteAddr: baseConn.RemoteAddr().String(),
  331. baseConn: baseConn,
  332. funcMap: funcMap,
  333. evsw: evsw,
  334. writeWait: defaultWSWriteWait,
  335. writeChanCapacity: defaultWSWriteChanCapacity,
  336. readWait: defaultWSReadWait,
  337. pingPeriod: defaultWSPingPeriod,
  338. }
  339. for _, option := range options {
  340. option(wsc)
  341. }
  342. wsc.BaseService = *cmn.NewBaseService(nil, "wsConnection", wsc)
  343. return wsc
  344. }
  345. // WriteWait sets the amount of time to wait before a websocket write times out.
  346. // It should only be used in the constructor - not Goroutine-safe.
  347. func WriteWait(writeWait time.Duration) func(*wsConnection) {
  348. return func(wsc *wsConnection) {
  349. wsc.writeWait = writeWait
  350. }
  351. }
  352. // WriteChanCapacity sets the capacity of the websocket write channel.
  353. // It should only be used in the constructor - not Goroutine-safe.
  354. func WriteChanCapacity(cap int) func(*wsConnection) {
  355. return func(wsc *wsConnection) {
  356. wsc.writeChanCapacity = cap
  357. }
  358. }
  359. // ReadWait sets the amount of time to wait before a websocket read times out.
  360. // It should only be used in the constructor - not Goroutine-safe.
  361. func ReadWait(readWait time.Duration) func(*wsConnection) {
  362. return func(wsc *wsConnection) {
  363. wsc.readWait = readWait
  364. }
  365. }
  366. // PingPeriod sets the duration for sending websocket pings.
  367. // It should only be used in the constructor - not Goroutine-safe.
  368. func PingPeriod(pingPeriod time.Duration) func(*wsConnection) {
  369. return func(wsc *wsConnection) {
  370. wsc.pingPeriod = pingPeriod
  371. }
  372. }
  373. // OnStart starts the read and write routines. It blocks until the connection closes.
  374. func (wsc *wsConnection) OnStart() error {
  375. wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity)
  376. // Read subscriptions/unsubscriptions to events
  377. go wsc.readRoutine()
  378. // Write responses, BLOCKING.
  379. wsc.writeRoutine()
  380. return nil
  381. }
  382. // OnStop unsubscribes from all events.
  383. func (wsc *wsConnection) OnStop() {
  384. if wsc.evsw != nil {
  385. wsc.evsw.RemoveListener(wsc.remoteAddr)
  386. }
  387. // Both read and write loops close the websocket connection when they exit their loops.
  388. // The writeChan is never closed, to allow WriteRPCResponse() to fail.
  389. }
  390. // GetRemoteAddr returns the remote address of the underlying connection.
  391. // It implements WSRPCConnection
  392. func (wsc *wsConnection) GetRemoteAddr() string {
  393. return wsc.remoteAddr
  394. }
  395. // GetEventSwitch returns the event switch.
  396. // It implements WSRPCConnection
  397. func (wsc *wsConnection) GetEventSwitch() events.EventSwitch {
  398. return wsc.evsw
  399. }
  400. // WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted.
  401. // It implements WSRPCConnection. It is Goroutine-safe.
  402. func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
  403. select {
  404. case <-wsc.Quit:
  405. return
  406. case wsc.writeChan <- resp:
  407. }
  408. }
  409. // TryWriteRPCResponse attempts to push a response to the writeChan, but does not block.
  410. // It implements WSRPCConnection. It is Goroutine-safe
  411. func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool {
  412. select {
  413. case <-wsc.Quit:
  414. return false
  415. case wsc.writeChan <- resp:
  416. return true
  417. default:
  418. return false
  419. }
  420. }
  421. // Read from the socket and subscribe to or unsubscribe from events
  422. func (wsc *wsConnection) readRoutine() {
  423. defer func() {
  424. wsc.baseConn.Close()
  425. }()
  426. for {
  427. select {
  428. case <-wsc.Quit:
  429. return
  430. default:
  431. // reset deadline for every type of message (control or data)
  432. wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.readWait))
  433. var in []byte
  434. _, in, err := wsc.baseConn.ReadMessage()
  435. if err != nil {
  436. if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
  437. wsc.Logger.Info("Client closed the connection")
  438. } else {
  439. wsc.Logger.Error("Failed to read request", "err", err)
  440. }
  441. wsc.Stop()
  442. return
  443. }
  444. var request types.RPCRequest
  445. err = json.Unmarshal(in, &request)
  446. if err != nil {
  447. wsc.WriteRPCResponse(types.RPCParseError("", errors.Wrap(err, "Error unmarshaling request")))
  448. continue
  449. }
  450. // A Notification is a Request object without an "id" member.
  451. // The Server MUST NOT reply to a Notification, including those that are within a batch request.
  452. if request.ID == "" {
  453. continue
  454. }
  455. // Now, fetch the RPCFunc and execute it.
  456. rpcFunc := wsc.funcMap[request.Method]
  457. if rpcFunc == nil {
  458. wsc.WriteRPCResponse(types.RPCMethodNotFoundError(request.ID))
  459. continue
  460. }
  461. var args []reflect.Value
  462. if rpcFunc.ws {
  463. wsCtx := types.WSRPCContext{Request: request, WSRPCConnection: wsc}
  464. args, err = jsonParamsToArgsWS(rpcFunc, request.Params, wsCtx)
  465. } else {
  466. args, err = jsonParamsToArgsRPC(rpcFunc, request.Params)
  467. }
  468. if err != nil {
  469. wsc.WriteRPCResponse(types.RPCInternalError(request.ID, errors.Wrap(err, "Error converting json params to arguments")))
  470. continue
  471. }
  472. returns := rpcFunc.f.Call(args)
  473. // TODO: Need to encode args/returns to string if we want to log them
  474. wsc.Logger.Info("WSJSONRPC", "method", request.Method)
  475. result, err := unreflectResult(returns)
  476. if err != nil {
  477. wsc.WriteRPCResponse(types.RPCInternalError(request.ID, err))
  478. continue
  479. } else {
  480. wsc.WriteRPCResponse(types.NewRPCSuccessResponse(request.ID, result))
  481. continue
  482. }
  483. }
  484. }
  485. }
  486. // receives on a write channel and writes out on the socket
  487. func (wsc *wsConnection) writeRoutine() {
  488. pingTicker := time.NewTicker(wsc.pingPeriod)
  489. defer func() {
  490. pingTicker.Stop()
  491. wsc.baseConn.Close()
  492. }()
  493. // https://github.com/gorilla/websocket/issues/97
  494. pongs := make(chan string, 1)
  495. wsc.baseConn.SetPingHandler(func(m string) error {
  496. select {
  497. case pongs <- m:
  498. default:
  499. }
  500. return nil
  501. })
  502. for {
  503. select {
  504. case m := <-pongs:
  505. err := wsc.writeMessageWithDeadline(websocket.PongMessage, []byte(m))
  506. if err != nil {
  507. wsc.Logger.Info("Failed to write pong (client may disconnect)", "err", err)
  508. }
  509. case <-pingTicker.C:
  510. err := wsc.writeMessageWithDeadline(websocket.PingMessage, []byte{})
  511. if err != nil {
  512. wsc.Logger.Error("Failed to write ping", "err", err)
  513. wsc.Stop()
  514. return
  515. }
  516. case msg := <-wsc.writeChan:
  517. jsonBytes, err := json.MarshalIndent(msg, "", " ")
  518. if err != nil {
  519. wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "err", err)
  520. } else {
  521. if err = wsc.writeMessageWithDeadline(websocket.TextMessage, jsonBytes); err != nil {
  522. wsc.Logger.Error("Failed to write response", "err", err)
  523. wsc.Stop()
  524. return
  525. }
  526. }
  527. case <-wsc.Quit:
  528. return
  529. }
  530. }
  531. }
  532. // All writes to the websocket must (re)set the write deadline.
  533. // If some writes don't set it while others do, they may timeout incorrectly (https://github.com/tendermint/tendermint/issues/553)
  534. func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error {
  535. wsc.baseConn.SetWriteDeadline(time.Now().Add(wsc.writeWait))
  536. return wsc.baseConn.WriteMessage(msgType, msg)
  537. }
  538. //----------------------------------------
  539. // WebsocketManager is the main manager for all websocket connections.
  540. // It holds the event switch and a map of functions for routing.
  541. // NOTE: The websocket path is defined externally, e.g. in node/node.go
  542. type WebsocketManager struct {
  543. websocket.Upgrader
  544. funcMap map[string]*RPCFunc
  545. evsw events.EventSwitch
  546. logger log.Logger
  547. wsConnOptions []func(*wsConnection)
  548. }
  549. // NewWebsocketManager returns a new WebsocketManager that routes according to the given funcMap, listens on the given event switch,
  550. // and connects to the server with the given connection options.
  551. func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch, wsConnOptions ...func(*wsConnection)) *WebsocketManager {
  552. return &WebsocketManager{
  553. funcMap: funcMap,
  554. evsw: evsw,
  555. Upgrader: websocket.Upgrader{
  556. CheckOrigin: func(r *http.Request) bool {
  557. // TODO ???
  558. return true
  559. },
  560. },
  561. logger: log.NewNopLogger(),
  562. wsConnOptions: wsConnOptions,
  563. }
  564. }
  565. // SetLogger sets the logger.
  566. func (wm *WebsocketManager) SetLogger(l log.Logger) {
  567. wm.logger = l
  568. }
  569. // WebsocketHandler upgrades the request/response (via http.Hijack) and starts the wsConnection.
  570. func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) {
  571. wsConn, err := wm.Upgrade(w, r, nil)
  572. if err != nil {
  573. // TODO - return http error
  574. wm.logger.Error("Failed to upgrade to websocket connection", "err", err)
  575. return
  576. }
  577. // register connection
  578. con := NewWSConnection(wsConn, wm.funcMap, wm.evsw, wm.wsConnOptions...)
  579. con.SetLogger(wm.logger.With("remote", wsConn.RemoteAddr()))
  580. wm.logger.Info("New websocket connection", "remote", con.remoteAddr)
  581. con.Start() // Blocking
  582. }
  583. // rpc.websocket
  584. //-----------------------------------------------------------------------------
  585. // NOTE: assume returns is result struct and error. If error is not nil, return it
  586. func unreflectResult(returns []reflect.Value) (interface{}, error) {
  587. errV := returns[1]
  588. if errV.Interface() != nil {
  589. return nil, errors.Errorf("%v", errV.Interface())
  590. }
  591. rv := returns[0]
  592. // the result is a registered interface,
  593. // we need a pointer to it so we can marshal with type byte
  594. rvp := reflect.New(rv.Type())
  595. rvp.Elem().Set(rv)
  596. return rvp.Interface(), nil
  597. }
  598. // writes a list of available rpc endpoints as an html page
  599. func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) {
  600. noArgNames := []string{}
  601. argNames := []string{}
  602. for name, funcData := range funcMap {
  603. if len(funcData.args) == 0 {
  604. noArgNames = append(noArgNames, name)
  605. } else {
  606. argNames = append(argNames, name)
  607. }
  608. }
  609. sort.Strings(noArgNames)
  610. sort.Strings(argNames)
  611. buf := new(bytes.Buffer)
  612. buf.WriteString("<html><body>")
  613. buf.WriteString("<br>Available endpoints:<br>")
  614. for _, name := range noArgNames {
  615. link := fmt.Sprintf("http://%s/%s", r.Host, name)
  616. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  617. }
  618. buf.WriteString("<br>Endpoints that require arguments:<br>")
  619. for _, name := range argNames {
  620. link := fmt.Sprintf("http://%s/%s?", r.Host, name)
  621. funcData := funcMap[name]
  622. for i, argName := range funcData.argNames {
  623. link += argName + "=_"
  624. if i < len(funcData.argNames)-1 {
  625. link += "&"
  626. }
  627. }
  628. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  629. }
  630. buf.WriteString("</body></html>")
  631. w.Header().Set("Content-Type", "text/html")
  632. w.WriteHeader(200)
  633. w.Write(buf.Bytes())
  634. }