You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

745 lines
22 KiB

8 years ago
8 years ago
7 years ago
8 years ago
8 years ago
8 years ago
7 years ago
7 years ago
7 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
7 years ago
8 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
8 years ago
7 years ago
8 years ago
  1. package rpcserver
  2. import (
  3. "bytes"
  4. "encoding/hex"
  5. "encoding/json"
  6. "fmt"
  7. "io/ioutil"
  8. "net/http"
  9. "reflect"
  10. "runtime/debug"
  11. "sort"
  12. "strings"
  13. "time"
  14. "github.com/gorilla/websocket"
  15. "github.com/pkg/errors"
  16. types "github.com/tendermint/tendermint/rpc/lib/types"
  17. cmn "github.com/tendermint/tmlibs/common"
  18. events "github.com/tendermint/tmlibs/events"
  19. "github.com/tendermint/tmlibs/log"
  20. )
  21. // RegisterRPCFuncs adds a route for each function in the funcMap, as well as general jsonrpc and websocket handlers for all functions.
  22. // "result" is the interface on which the result objects are registered, and is popualted with every RPCResponse
  23. func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, logger log.Logger) {
  24. // HTTP endpoints
  25. for funcName, rpcFunc := range funcMap {
  26. mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc, logger))
  27. }
  28. // JSONRPC endpoints
  29. mux.HandleFunc("/", makeJSONRPCHandler(funcMap, logger))
  30. }
  31. //-------------------------------------
  32. // function introspection
  33. // RPCFunc contains the introspected type information for a function
  34. type RPCFunc struct {
  35. f reflect.Value // underlying rpc function
  36. args []reflect.Type // type of each function arg
  37. returns []reflect.Type // type of each return arg
  38. argNames []string // name of each argument
  39. ws bool // websocket only
  40. }
  41. // NewRPCFunc wraps a function for introspection.
  42. // f is the function, args are comma separated argument names
  43. func NewRPCFunc(f interface{}, args string) *RPCFunc {
  44. return newRPCFunc(f, args, false)
  45. }
  46. // NewWSRPCFunc wraps a function for introspection and use in the websockets.
  47. func NewWSRPCFunc(f interface{}, args string) *RPCFunc {
  48. return newRPCFunc(f, args, true)
  49. }
  50. func newRPCFunc(f interface{}, args string, ws bool) *RPCFunc {
  51. var argNames []string
  52. if args != "" {
  53. argNames = strings.Split(args, ",")
  54. }
  55. return &RPCFunc{
  56. f: reflect.ValueOf(f),
  57. args: funcArgTypes(f),
  58. returns: funcReturnTypes(f),
  59. argNames: argNames,
  60. ws: ws,
  61. }
  62. }
  63. // return a function's argument types
  64. func funcArgTypes(f interface{}) []reflect.Type {
  65. t := reflect.TypeOf(f)
  66. n := t.NumIn()
  67. typez := make([]reflect.Type, n)
  68. for i := 0; i < n; i++ {
  69. typez[i] = t.In(i)
  70. }
  71. return typez
  72. }
  73. // return a function's return types
  74. func funcReturnTypes(f interface{}) []reflect.Type {
  75. t := reflect.TypeOf(f)
  76. n := t.NumOut()
  77. typez := make([]reflect.Type, n)
  78. for i := 0; i < n; i++ {
  79. typez[i] = t.Out(i)
  80. }
  81. return typez
  82. }
  83. // function introspection
  84. //-----------------------------------------------------------------------------
  85. // rpc.json
  86. // jsonrpc calls grab the given method's function info and runs reflect.Call
  87. func makeJSONRPCHandler(funcMap map[string]*RPCFunc, logger log.Logger) http.HandlerFunc {
  88. return func(w http.ResponseWriter, r *http.Request) {
  89. b, _ := ioutil.ReadAll(r.Body)
  90. // if its an empty request (like from a browser),
  91. // just display a list of functions
  92. if len(b) == 0 {
  93. writeListOfEndpoints(w, r, funcMap)
  94. return
  95. }
  96. var request types.RPCRequest
  97. err := json.Unmarshal(b, &request)
  98. if err != nil {
  99. WriteRPCResponseHTTP(w, types.RPCParseError("", errors.Wrap(err, "Error unmarshalling request")))
  100. return
  101. }
  102. // A Notification is a Request object without an "id" member.
  103. // The Server MUST NOT reply to a Notification, including those that are within a batch request.
  104. if request.ID == "" {
  105. logger.Debug("HTTPJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)")
  106. return
  107. }
  108. if len(r.URL.Path) > 1 {
  109. WriteRPCResponseHTTP(w, types.RPCInvalidRequestError(request.ID, errors.Errorf("Path %s is invalid", r.URL.Path)))
  110. return
  111. }
  112. rpcFunc := funcMap[request.Method]
  113. if rpcFunc == nil || rpcFunc.ws {
  114. WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError(request.ID))
  115. return
  116. }
  117. var args []reflect.Value
  118. if len(request.Params) > 0 {
  119. args, err = jsonParamsToArgsRPC(rpcFunc, request.Params)
  120. if err != nil {
  121. WriteRPCResponseHTTP(w, types.RPCInvalidParamsError(request.ID, errors.Wrap(err, "Error converting json params to arguments")))
  122. return
  123. }
  124. }
  125. returns := rpcFunc.f.Call(args)
  126. logger.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
  127. result, err := unreflectResult(returns)
  128. if err != nil {
  129. WriteRPCResponseHTTP(w, types.RPCInternalError(request.ID, err))
  130. return
  131. }
  132. WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(request.ID, result))
  133. }
  134. }
  135. func mapParamsToArgs(rpcFunc *RPCFunc, params map[string]*json.RawMessage, argsOffset int) ([]reflect.Value, error) {
  136. values := make([]reflect.Value, len(rpcFunc.argNames))
  137. for i, argName := range rpcFunc.argNames {
  138. argType := rpcFunc.args[i+argsOffset]
  139. if p, ok := params[argName]; ok && p != nil && len(*p) > 0 {
  140. val := reflect.New(argType)
  141. err := json.Unmarshal(*p, val.Interface())
  142. if err != nil {
  143. return nil, err
  144. }
  145. values[i] = val.Elem()
  146. } else { // use default for that type
  147. values[i] = reflect.Zero(argType)
  148. }
  149. }
  150. return values, nil
  151. }
  152. func arrayParamsToArgs(rpcFunc *RPCFunc, params []*json.RawMessage, argsOffset int) ([]reflect.Value, error) {
  153. if len(rpcFunc.argNames) != len(params) {
  154. return nil, errors.Errorf("Expected %v parameters (%v), got %v (%v)",
  155. len(rpcFunc.argNames), rpcFunc.argNames, len(params), params)
  156. }
  157. values := make([]reflect.Value, len(params))
  158. for i, p := range params {
  159. argType := rpcFunc.args[i+argsOffset]
  160. val := reflect.New(argType)
  161. err := json.Unmarshal(*p, val.Interface())
  162. if err != nil {
  163. return nil, err
  164. }
  165. values[i] = val.Elem()
  166. }
  167. return values, nil
  168. }
  169. // raw is unparsed json (from json.RawMessage) encoding either a map or an array.
  170. //
  171. // argsOffset should be 0 for RPC calls, and 1 for WS requests, where len(rpcFunc.args) != len(rpcFunc.argNames).
  172. // Example:
  173. // rpcFunc.args = [rpctypes.WSRPCContext string]
  174. // rpcFunc.argNames = ["arg"]
  175. func jsonParamsToArgs(rpcFunc *RPCFunc, raw []byte, argsOffset int) ([]reflect.Value, error) {
  176. // first, try to get the map..
  177. var m map[string]*json.RawMessage
  178. err := json.Unmarshal(raw, &m)
  179. if err == nil {
  180. return mapParamsToArgs(rpcFunc, m, argsOffset)
  181. }
  182. // otherwise, try an array
  183. var a []*json.RawMessage
  184. err = json.Unmarshal(raw, &a)
  185. if err == nil {
  186. return arrayParamsToArgs(rpcFunc, a, argsOffset)
  187. }
  188. // otherwise, bad format, we cannot parse
  189. return nil, errors.Errorf("Unknown type for JSON params: %v. Expected map or array", err)
  190. }
  191. // Convert a []interface{} OR a map[string]interface{} to properly typed values
  192. func jsonParamsToArgsRPC(rpcFunc *RPCFunc, params json.RawMessage) ([]reflect.Value, error) {
  193. return jsonParamsToArgs(rpcFunc, params, 0)
  194. }
  195. // Same as above, but with the first param the websocket connection
  196. func jsonParamsToArgsWS(rpcFunc *RPCFunc, params json.RawMessage, wsCtx types.WSRPCContext) ([]reflect.Value, error) {
  197. values, err := jsonParamsToArgs(rpcFunc, params, 1)
  198. if err != nil {
  199. return nil, err
  200. }
  201. return append([]reflect.Value{reflect.ValueOf(wsCtx)}, values...), nil
  202. }
  203. // rpc.json
  204. //-----------------------------------------------------------------------------
  205. // rpc.http
  206. // convert from a function name to the http handler
  207. func makeHTTPHandler(rpcFunc *RPCFunc, logger log.Logger) func(http.ResponseWriter, *http.Request) {
  208. // Exception for websocket endpoints
  209. if rpcFunc.ws {
  210. return func(w http.ResponseWriter, r *http.Request) {
  211. WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError(""))
  212. }
  213. }
  214. // All other endpoints
  215. return func(w http.ResponseWriter, r *http.Request) {
  216. logger.Debug("HTTP HANDLER", "req", r)
  217. args, err := httpParamsToArgs(rpcFunc, r)
  218. if err != nil {
  219. WriteRPCResponseHTTP(w, types.RPCInvalidParamsError("", errors.Wrap(err, "Error converting http params to arguments")))
  220. return
  221. }
  222. returns := rpcFunc.f.Call(args)
  223. logger.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
  224. result, err := unreflectResult(returns)
  225. if err != nil {
  226. WriteRPCResponseHTTP(w, types.RPCInternalError("", err))
  227. return
  228. }
  229. WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse("", result))
  230. }
  231. }
  232. // Covert an http query to a list of properly typed values.
  233. // To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
  234. func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) {
  235. values := make([]reflect.Value, len(rpcFunc.args))
  236. for i, name := range rpcFunc.argNames {
  237. argType := rpcFunc.args[i]
  238. values[i] = reflect.Zero(argType) // set default for that type
  239. arg := GetParam(r, name)
  240. // log.Notice("param to arg", "argType", argType, "name", name, "arg", arg)
  241. if "" == arg {
  242. continue
  243. }
  244. v, err, ok := nonJsonToArg(argType, arg)
  245. if err != nil {
  246. return nil, err
  247. }
  248. if ok {
  249. values[i] = v
  250. continue
  251. }
  252. values[i], err = _jsonStringToArg(argType, arg)
  253. if err != nil {
  254. return nil, err
  255. }
  256. }
  257. return values, nil
  258. }
  259. func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
  260. v := reflect.New(ty)
  261. err := json.Unmarshal([]byte(arg), v.Interface())
  262. if err != nil {
  263. return v, err
  264. }
  265. v = v.Elem()
  266. return v, nil
  267. }
  268. func nonJsonToArg(ty reflect.Type, arg string) (reflect.Value, error, bool) {
  269. isQuotedString := strings.HasPrefix(arg, `"`) && strings.HasSuffix(arg, `"`)
  270. isHexString := strings.HasPrefix(strings.ToLower(arg), "0x")
  271. expectingString := ty.Kind() == reflect.String
  272. expectingByteSlice := ty.Kind() == reflect.Slice && ty.Elem().Kind() == reflect.Uint8
  273. if isHexString {
  274. if !expectingString && !expectingByteSlice {
  275. err := errors.Errorf("Got a hex string arg, but expected '%s'",
  276. ty.Kind().String())
  277. return reflect.ValueOf(nil), err, false
  278. }
  279. var value []byte
  280. value, err := hex.DecodeString(arg[2:])
  281. if err != nil {
  282. return reflect.ValueOf(nil), err, false
  283. }
  284. if ty.Kind() == reflect.String {
  285. return reflect.ValueOf(string(value)), nil, true
  286. }
  287. return reflect.ValueOf([]byte(value)), nil, true
  288. }
  289. if isQuotedString && expectingByteSlice {
  290. v := reflect.New(reflect.TypeOf(""))
  291. err := json.Unmarshal([]byte(arg), v.Interface())
  292. if err != nil {
  293. return reflect.ValueOf(nil), err, false
  294. }
  295. v = v.Elem()
  296. return reflect.ValueOf([]byte(v.String())), nil, true
  297. }
  298. return reflect.ValueOf(nil), nil, false
  299. }
  300. // rpc.http
  301. //-----------------------------------------------------------------------------
  302. // rpc.websocket
  303. const (
  304. defaultWSWriteChanCapacity = 1000
  305. defaultWSWriteWait = 10 * time.Second
  306. defaultWSReadWait = 30 * time.Second
  307. defaultWSPingPeriod = (defaultWSReadWait * 9) / 10
  308. )
  309. // a single websocket connection
  310. // contains listener id, underlying ws connection,
  311. // and the event switch for subscribing to events
  312. type wsConnection struct {
  313. cmn.BaseService
  314. remoteAddr string
  315. baseConn *websocket.Conn
  316. writeChan chan types.RPCResponse
  317. funcMap map[string]*RPCFunc
  318. evsw events.EventSwitch
  319. // write channel capacity
  320. writeChanCapacity int
  321. // each write times out after this.
  322. writeWait time.Duration
  323. // Connection times out if we haven't received *anything* in this long, not even pings.
  324. readWait time.Duration
  325. // Send pings to server with this period. Must be less than readWait, but greater than zero.
  326. pingPeriod time.Duration
  327. }
  328. // NewWSConnection wraps websocket.Conn. See the commentary on the
  329. // func(*wsConnection) functions for a detailed description of how to configure
  330. // ping period and pong wait time.
  331. // NOTE: if the write buffer is full, pongs may be dropped, which may cause clients to disconnect.
  332. // see https://github.com/gorilla/websocket/issues/97
  333. func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw events.EventSwitch, options ...func(*wsConnection)) *wsConnection {
  334. wsc := &wsConnection{
  335. remoteAddr: baseConn.RemoteAddr().String(),
  336. baseConn: baseConn,
  337. funcMap: funcMap,
  338. evsw: evsw,
  339. writeWait: defaultWSWriteWait,
  340. writeChanCapacity: defaultWSWriteChanCapacity,
  341. readWait: defaultWSReadWait,
  342. pingPeriod: defaultWSPingPeriod,
  343. }
  344. for _, option := range options {
  345. option(wsc)
  346. }
  347. wsc.BaseService = *cmn.NewBaseService(nil, "wsConnection", wsc)
  348. return wsc
  349. }
  350. // WriteWait sets the amount of time to wait before a websocket write times out.
  351. // It should only be used in the constructor - not Goroutine-safe.
  352. func WriteWait(writeWait time.Duration) func(*wsConnection) {
  353. return func(wsc *wsConnection) {
  354. wsc.writeWait = writeWait
  355. }
  356. }
  357. // WriteChanCapacity sets the capacity of the websocket write channel.
  358. // It should only be used in the constructor - not Goroutine-safe.
  359. func WriteChanCapacity(cap int) func(*wsConnection) {
  360. return func(wsc *wsConnection) {
  361. wsc.writeChanCapacity = cap
  362. }
  363. }
  364. // ReadWait sets the amount of time to wait before a websocket read times out.
  365. // It should only be used in the constructor - not Goroutine-safe.
  366. func ReadWait(readWait time.Duration) func(*wsConnection) {
  367. return func(wsc *wsConnection) {
  368. wsc.readWait = readWait
  369. }
  370. }
  371. // PingPeriod sets the duration for sending websocket pings.
  372. // It should only be used in the constructor - not Goroutine-safe.
  373. func PingPeriod(pingPeriod time.Duration) func(*wsConnection) {
  374. return func(wsc *wsConnection) {
  375. wsc.pingPeriod = pingPeriod
  376. }
  377. }
  378. // OnStart starts the read and write routines. It blocks until the connection closes.
  379. func (wsc *wsConnection) OnStart() error {
  380. wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity)
  381. // Read subscriptions/unsubscriptions to events
  382. go wsc.readRoutine()
  383. // Write responses, BLOCKING.
  384. wsc.writeRoutine()
  385. return nil
  386. }
  387. // OnStop unsubscribes from all events.
  388. func (wsc *wsConnection) OnStop() {
  389. if wsc.evsw != nil {
  390. wsc.evsw.RemoveListener(wsc.remoteAddr)
  391. }
  392. // Both read and write loops close the websocket connection when they exit their loops.
  393. // The writeChan is never closed, to allow WriteRPCResponse() to fail.
  394. }
  395. // GetRemoteAddr returns the remote address of the underlying connection.
  396. // It implements WSRPCConnection
  397. func (wsc *wsConnection) GetRemoteAddr() string {
  398. return wsc.remoteAddr
  399. }
  400. // GetEventSwitch returns the event switch.
  401. // It implements WSRPCConnection
  402. func (wsc *wsConnection) GetEventSwitch() events.EventSwitch {
  403. return wsc.evsw
  404. }
  405. // WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted.
  406. // It implements WSRPCConnection. It is Goroutine-safe.
  407. func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
  408. select {
  409. case <-wsc.Quit:
  410. return
  411. case wsc.writeChan <- resp:
  412. }
  413. }
  414. // TryWriteRPCResponse attempts to push a response to the writeChan, but does not block.
  415. // It implements WSRPCConnection. It is Goroutine-safe
  416. func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool {
  417. select {
  418. case <-wsc.Quit:
  419. return false
  420. case wsc.writeChan <- resp:
  421. return true
  422. default:
  423. return false
  424. }
  425. }
  426. // Read from the socket and subscribe to or unsubscribe from events
  427. func (wsc *wsConnection) readRoutine() {
  428. defer func() {
  429. if r := recover(); r != nil {
  430. err, ok := r.(error)
  431. if !ok {
  432. err = fmt.Errorf("WSJSONRPC: %v", r)
  433. }
  434. wsc.Logger.Error("Panic in WSJSONRPC handler", "err", err, "stack", string(debug.Stack()))
  435. wsc.WriteRPCResponse(types.RPCInternalError("unknown", err))
  436. go wsc.readRoutine()
  437. } else {
  438. wsc.baseConn.Close()
  439. }
  440. }()
  441. wsc.baseConn.SetPongHandler(func(m string) error {
  442. return wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.readWait))
  443. })
  444. for {
  445. select {
  446. case <-wsc.Quit:
  447. return
  448. default:
  449. // reset deadline for every type of message (control or data)
  450. wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.readWait))
  451. var in []byte
  452. _, in, err := wsc.baseConn.ReadMessage()
  453. if err != nil {
  454. if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
  455. wsc.Logger.Info("Client closed the connection")
  456. } else {
  457. wsc.Logger.Error("Failed to read request", "err", err)
  458. }
  459. wsc.Stop()
  460. return
  461. }
  462. var request types.RPCRequest
  463. err = json.Unmarshal(in, &request)
  464. if err != nil {
  465. wsc.WriteRPCResponse(types.RPCParseError("", errors.Wrap(err, "Error unmarshaling request")))
  466. continue
  467. }
  468. // A Notification is a Request object without an "id" member.
  469. // The Server MUST NOT reply to a Notification, including those that are within a batch request.
  470. if request.ID == "" {
  471. wsc.Logger.Debug("WSJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)")
  472. continue
  473. }
  474. // Now, fetch the RPCFunc and execute it.
  475. rpcFunc := wsc.funcMap[request.Method]
  476. if rpcFunc == nil {
  477. wsc.WriteRPCResponse(types.RPCMethodNotFoundError(request.ID))
  478. continue
  479. }
  480. var args []reflect.Value
  481. if rpcFunc.ws {
  482. wsCtx := types.WSRPCContext{Request: request, WSRPCConnection: wsc}
  483. if len(request.Params) > 0 {
  484. args, err = jsonParamsToArgsWS(rpcFunc, request.Params, wsCtx)
  485. }
  486. } else {
  487. if len(request.Params) > 0 {
  488. args, err = jsonParamsToArgsRPC(rpcFunc, request.Params)
  489. }
  490. }
  491. if err != nil {
  492. wsc.WriteRPCResponse(types.RPCInternalError(request.ID, errors.Wrap(err, "Error converting json params to arguments")))
  493. continue
  494. }
  495. returns := rpcFunc.f.Call(args)
  496. // TODO: Need to encode args/returns to string if we want to log them
  497. wsc.Logger.Info("WSJSONRPC", "method", request.Method)
  498. result, err := unreflectResult(returns)
  499. if err != nil {
  500. wsc.WriteRPCResponse(types.RPCInternalError(request.ID, err))
  501. continue
  502. } else {
  503. wsc.WriteRPCResponse(types.NewRPCSuccessResponse(request.ID, result))
  504. continue
  505. }
  506. }
  507. }
  508. }
  509. // receives on a write channel and writes out on the socket
  510. func (wsc *wsConnection) writeRoutine() {
  511. pingTicker := time.NewTicker(wsc.pingPeriod)
  512. defer func() {
  513. pingTicker.Stop()
  514. wsc.baseConn.Close()
  515. }()
  516. // https://github.com/gorilla/websocket/issues/97
  517. pongs := make(chan string, 1)
  518. wsc.baseConn.SetPingHandler(func(m string) error {
  519. select {
  520. case pongs <- m:
  521. default:
  522. }
  523. return nil
  524. })
  525. for {
  526. select {
  527. case m := <-pongs:
  528. err := wsc.writeMessageWithDeadline(websocket.PongMessage, []byte(m))
  529. if err != nil {
  530. wsc.Logger.Info("Failed to write pong (client may disconnect)", "err", err)
  531. }
  532. case <-pingTicker.C:
  533. err := wsc.writeMessageWithDeadline(websocket.PingMessage, []byte{})
  534. if err != nil {
  535. wsc.Logger.Error("Failed to write ping", "err", err)
  536. wsc.Stop()
  537. return
  538. }
  539. case msg := <-wsc.writeChan:
  540. jsonBytes, err := json.MarshalIndent(msg, "", " ")
  541. if err != nil {
  542. wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "err", err)
  543. } else {
  544. if err = wsc.writeMessageWithDeadline(websocket.TextMessage, jsonBytes); err != nil {
  545. wsc.Logger.Error("Failed to write response", "err", err)
  546. wsc.Stop()
  547. return
  548. }
  549. }
  550. case <-wsc.Quit:
  551. return
  552. }
  553. }
  554. }
  555. // All writes to the websocket must (re)set the write deadline.
  556. // If some writes don't set it while others do, they may timeout incorrectly (https://github.com/tendermint/tendermint/issues/553)
  557. func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error {
  558. wsc.baseConn.SetWriteDeadline(time.Now().Add(wsc.writeWait))
  559. return wsc.baseConn.WriteMessage(msgType, msg)
  560. }
  561. //----------------------------------------
  562. // WebsocketManager is the main manager for all websocket connections.
  563. // It holds the event switch and a map of functions for routing.
  564. // NOTE: The websocket path is defined externally, e.g. in node/node.go
  565. type WebsocketManager struct {
  566. websocket.Upgrader
  567. funcMap map[string]*RPCFunc
  568. evsw events.EventSwitch
  569. logger log.Logger
  570. wsConnOptions []func(*wsConnection)
  571. }
  572. // NewWebsocketManager returns a new WebsocketManager that routes according to the given funcMap, listens on the given event switch,
  573. // and connects to the server with the given connection options.
  574. func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch, wsConnOptions ...func(*wsConnection)) *WebsocketManager {
  575. return &WebsocketManager{
  576. funcMap: funcMap,
  577. evsw: evsw,
  578. Upgrader: websocket.Upgrader{
  579. CheckOrigin: func(r *http.Request) bool {
  580. // TODO ???
  581. return true
  582. },
  583. },
  584. logger: log.NewNopLogger(),
  585. wsConnOptions: wsConnOptions,
  586. }
  587. }
  588. // SetLogger sets the logger.
  589. func (wm *WebsocketManager) SetLogger(l log.Logger) {
  590. wm.logger = l
  591. }
  592. // WebsocketHandler upgrades the request/response (via http.Hijack) and starts the wsConnection.
  593. func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) {
  594. wsConn, err := wm.Upgrade(w, r, nil)
  595. if err != nil {
  596. // TODO - return http error
  597. wm.logger.Error("Failed to upgrade to websocket connection", "err", err)
  598. return
  599. }
  600. // register connection
  601. con := NewWSConnection(wsConn, wm.funcMap, wm.evsw, wm.wsConnOptions...)
  602. con.SetLogger(wm.logger.With("remote", wsConn.RemoteAddr()))
  603. wm.logger.Info("New websocket connection", "remote", con.remoteAddr)
  604. con.Start() // Blocking
  605. }
  606. // rpc.websocket
  607. //-----------------------------------------------------------------------------
  608. // NOTE: assume returns is result struct and error. If error is not nil, return it
  609. func unreflectResult(returns []reflect.Value) (interface{}, error) {
  610. errV := returns[1]
  611. if errV.Interface() != nil {
  612. return nil, errors.Errorf("%v", errV.Interface())
  613. }
  614. rv := returns[0]
  615. // the result is a registered interface,
  616. // we need a pointer to it so we can marshal with type byte
  617. rvp := reflect.New(rv.Type())
  618. rvp.Elem().Set(rv)
  619. return rvp.Interface(), nil
  620. }
  621. // writes a list of available rpc endpoints as an html page
  622. func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) {
  623. noArgNames := []string{}
  624. argNames := []string{}
  625. for name, funcData := range funcMap {
  626. if len(funcData.args) == 0 {
  627. noArgNames = append(noArgNames, name)
  628. } else {
  629. argNames = append(argNames, name)
  630. }
  631. }
  632. sort.Strings(noArgNames)
  633. sort.Strings(argNames)
  634. buf := new(bytes.Buffer)
  635. buf.WriteString("<html><body>")
  636. buf.WriteString("<br>Available endpoints:<br>")
  637. for _, name := range noArgNames {
  638. link := fmt.Sprintf("http://%s/%s", r.Host, name)
  639. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  640. }
  641. buf.WriteString("<br>Endpoints that require arguments:<br>")
  642. for _, name := range argNames {
  643. link := fmt.Sprintf("http://%s/%s?", r.Host, name)
  644. funcData := funcMap[name]
  645. for i, argName := range funcData.argNames {
  646. link += argName + "=_"
  647. if i < len(funcData.argNames)-1 {
  648. link += "&"
  649. }
  650. }
  651. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  652. }
  653. buf.WriteString("</body></html>")
  654. w.Header().Set("Content-Type", "text/html")
  655. w.WriteHeader(200)
  656. w.Write(buf.Bytes())
  657. }