You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

565 lines
16 KiB

  1. package rpcserver
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "net/http"
  9. "reflect"
  10. "sort"
  11. "strings"
  12. "time"
  13. "github.com/gorilla/websocket"
  14. . "github.com/tendermint/go-common"
  15. "github.com/tendermint/go-events"
  16. . "github.com/tendermint/go-rpc/types"
  17. "github.com/tendermint/go-wire"
  18. )
  19. // Adds a route for each function in the funcMap, as well as general jsonrpc and websocket handlers for all functions.
  20. // "result" is the interface on which the result objects are registered, and is popualted with every RPCResponse
  21. func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
  22. // HTTP endpoints
  23. for funcName, rpcFunc := range funcMap {
  24. mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc))
  25. }
  26. // JSONRPC endpoints
  27. mux.HandleFunc("/", makeJSONRPCHandler(funcMap))
  28. }
  29. //-------------------------------------
  30. // function introspection
// RPCFunc holds the reflection data captured for one registered RPC function,
// so that handlers can decode arguments and invoke it without re-inspecting
// the function on every request.
type RPCFunc struct {
	f        reflect.Value  // underlying rpc function, invoked through reflect's Call
	args     []reflect.Type // type of each function arg, in declaration order
	returns  []reflect.Type // type of each return arg, in declaration order
	argNames []string       // name of each caller-supplied argument (split from the comma-separated string given at construction)
	ws       bool           // websocket only: the HTTP and JSON-RPC handlers refuse to call it
}
  39. // wraps a function for quicker introspection
  40. // f is the function, args are comma separated argument names
  41. func NewRPCFunc(f interface{}, args string) *RPCFunc {
  42. return newRPCFunc(f, args, false)
  43. }
  44. func NewWSRPCFunc(f interface{}, args string) *RPCFunc {
  45. return newRPCFunc(f, args, true)
  46. }
  47. func newRPCFunc(f interface{}, args string, ws bool) *RPCFunc {
  48. var argNames []string
  49. if args != "" {
  50. argNames = strings.Split(args, ",")
  51. }
  52. return &RPCFunc{
  53. f: reflect.ValueOf(f),
  54. args: funcArgTypes(f),
  55. returns: funcReturnTypes(f),
  56. argNames: argNames,
  57. ws: ws,
  58. }
  59. }
  60. // return a function's argument types
  61. func funcArgTypes(f interface{}) []reflect.Type {
  62. t := reflect.TypeOf(f)
  63. n := t.NumIn()
  64. typez := make([]reflect.Type, n)
  65. for i := 0; i < n; i++ {
  66. typez[i] = t.In(i)
  67. }
  68. return typez
  69. }
  70. // return a function's return types
  71. func funcReturnTypes(f interface{}) []reflect.Type {
  72. t := reflect.TypeOf(f)
  73. n := t.NumOut()
  74. typez := make([]reflect.Type, n)
  75. for i := 0; i < n; i++ {
  76. typez[i] = t.Out(i)
  77. }
  78. return typez
  79. }
  80. // function introspection
  81. //-----------------------------------------------------------------------------
  82. // rpc.json
  83. // jsonrpc calls grab the given method's function info and runs reflect.Call
  84. func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
  85. return func(w http.ResponseWriter, r *http.Request) {
  86. b, _ := ioutil.ReadAll(r.Body)
  87. // if its an empty request (like from a browser),
  88. // just display a list of functions
  89. if len(b) == 0 {
  90. writeListOfEndpoints(w, r, funcMap)
  91. return
  92. }
  93. var request RPCRequest
  94. err := json.Unmarshal(b, &request)
  95. if err != nil {
  96. WriteRPCResponseHTTP(w, NewRPCResponse("", nil, fmt.Sprintf("Error unmarshalling request: %v", err.Error())))
  97. return
  98. }
  99. if len(r.URL.Path) > 1 {
  100. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
  101. return
  102. }
  103. rpcFunc := funcMap[request.Method]
  104. if rpcFunc == nil {
  105. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
  106. return
  107. }
  108. if rpcFunc.ws {
  109. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method is only for websockets: "+request.Method))
  110. return
  111. }
  112. args, err := jsonParamsToArgs(rpcFunc, request.Params)
  113. if err != nil {
  114. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, fmt.Sprintf("Error converting json params to arguments: %v", err.Error())))
  115. return
  116. }
  117. returns := rpcFunc.f.Call(args)
  118. log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
  119. result, err := unreflectResult(returns)
  120. if err != nil {
  121. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, result, fmt.Sprintf("Error unreflecting result: %v", err.Error())))
  122. return
  123. }
  124. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, result, ""))
  125. }
  126. }
  127. // Convert a list of interfaces to properly typed values
  128. func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) {
  129. if len(rpcFunc.argNames) != len(params) {
  130. return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
  131. len(rpcFunc.argNames), rpcFunc.argNames, len(params), params))
  132. }
  133. values := make([]reflect.Value, len(params))
  134. for i, p := range params {
  135. ty := rpcFunc.args[i]
  136. v, err := _jsonObjectToArg(ty, p)
  137. if err != nil {
  138. return nil, err
  139. }
  140. values[i] = v
  141. }
  142. return values, nil
  143. }
  144. // Same as above, but with the first param the websocket connection
  145. func jsonParamsToArgsWS(rpcFunc *RPCFunc, params []interface{}, wsCtx WSRPCContext) ([]reflect.Value, error) {
  146. if len(rpcFunc.argNames) != len(params) {
  147. return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
  148. len(rpcFunc.argNames)-1, rpcFunc.argNames[1:], len(params), params))
  149. }
  150. values := make([]reflect.Value, len(params)+1)
  151. values[0] = reflect.ValueOf(wsCtx)
  152. for i, p := range params {
  153. ty := rpcFunc.args[i+1]
  154. v, err := _jsonObjectToArg(ty, p)
  155. if err != nil {
  156. return nil, err
  157. }
  158. values[i+1] = v
  159. }
  160. return values, nil
  161. }
  162. func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) {
  163. var err error
  164. v := reflect.New(ty)
  165. wire.ReadJSONObjectPtr(v.Interface(), object, &err)
  166. if err != nil {
  167. return v, err
  168. }
  169. v = v.Elem()
  170. return v, nil
  171. }
  172. // rpc.json
  173. //-----------------------------------------------------------------------------
  174. // rpc.http
  175. // convert from a function name to the http handler
  176. func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) {
  177. // Exception for websocket endpoints
  178. if rpcFunc.ws {
  179. return func(w http.ResponseWriter, r *http.Request) {
  180. WriteRPCResponseHTTP(w, NewRPCResponse("", nil, "This RPC method is only for websockets"))
  181. }
  182. }
  183. // All other endpoints
  184. return func(w http.ResponseWriter, r *http.Request) {
  185. log.Debug("HTTP HANDLER", "req", r)
  186. args, err := httpParamsToArgs(rpcFunc, r)
  187. if err != nil {
  188. WriteRPCResponseHTTP(w, NewRPCResponse("", nil, fmt.Sprintf("Error converting http params to args: %v", err.Error())))
  189. return
  190. }
  191. returns := rpcFunc.f.Call(args)
  192. log.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
  193. result, err := unreflectResult(returns)
  194. if err != nil {
  195. WriteRPCResponseHTTP(w, NewRPCResponse("", nil, fmt.Sprintf("Error unreflecting result: %v", err.Error())))
  196. return
  197. }
  198. WriteRPCResponseHTTP(w, NewRPCResponse("", result, ""))
  199. }
  200. }
  201. // Covert an http query to a list of properly typed values.
  202. // To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
  203. func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) {
  204. argTypes := rpcFunc.args
  205. argNames := rpcFunc.argNames
  206. var err error
  207. values := make([]reflect.Value, len(argNames))
  208. for i, name := range argNames {
  209. ty := argTypes[i]
  210. arg := GetParam(r, name)
  211. //log.Notice("param to arg", "ty", ty, "name", name, "arg", arg)
  212. values[i], err = _jsonStringToArg(ty, arg)
  213. if err != nil {
  214. return nil, err
  215. }
  216. }
  217. return values, nil
  218. }
  219. func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
  220. var err error
  221. v := reflect.New(ty)
  222. wire.ReadJSONPtr(v.Interface(), []byte(arg), &err)
  223. if err != nil {
  224. return v, err
  225. }
  226. v = v.Elem()
  227. return v, nil
  228. }
  229. // rpc.http
  230. //-----------------------------------------------------------------------------
  231. // rpc.websocket
// Tuning constants for websocket connections.
const (
	writeChanCapacity     = 1000 // buffer size of each connection's outgoing response channel
	wsWriteTimeoutSeconds = 30   // each write times out after this
	wsReadTimeoutSeconds  = 30   // connection times out if we haven't received *anything* in this long, not even pings. NOTE(review): in this file the timer is only reset by the ping/pong handlers.
	wsPingTickerSeconds   = 10   // send a ping every PingTickerSeconds.
)
// wsConnection is a single websocket connection.
// It contains the listener id (the remote address), the underlying ws
// connection, the table of callable RPC functions, and the event switch for
// subscribing to events.
type wsConnection struct {
	BaseService // embedded service base; its Quit channel is used by the routines below to detect shutdown

	remoteAddr  string              // remote address of the peer; also the id used to remove this connection's event listeners
	baseConn    *websocket.Conn     // underlying gorilla websocket connection
	writeChan   chan RPCResponse    // responses queued for the write routine
	readTimeout *time.Timer         // fires when the peer has been silent for too long; created in OnStart
	pingTicker  *time.Ticker        // triggers periodic pings to the peer; created in OnStart
	funcMap     map[string]*RPCFunc // RPC functions callable over this connection, keyed by method name
	evsw        events.EventSwitch  // event switch handed to RPC functions via GetEventSwitch
}
  251. // new websocket connection wrapper
  252. func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw events.EventSwitch) *wsConnection {
  253. wsc := &wsConnection{
  254. remoteAddr: baseConn.RemoteAddr().String(),
  255. baseConn: baseConn,
  256. writeChan: make(chan RPCResponse, writeChanCapacity), // error when full.
  257. funcMap: funcMap,
  258. evsw: evsw,
  259. }
  260. wsc.BaseService = *NewBaseService(log, "wsConnection", wsc)
  261. return wsc
  262. }
  263. // wsc.Start() blocks until the connection closes.
  264. func (wsc *wsConnection) OnStart() error {
  265. wsc.BaseService.OnStart()
  266. // Read subscriptions/unsubscriptions to events
  267. go wsc.readRoutine()
  268. // Custom Ping handler to touch readTimeout
  269. wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds)
  270. wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds)
  271. wsc.baseConn.SetPingHandler(func(m string) error {
  272. // NOTE: https://github.com/gorilla/websocket/issues/97
  273. go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
  274. wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
  275. return nil
  276. })
  277. wsc.baseConn.SetPongHandler(func(m string) error {
  278. // NOTE: https://github.com/gorilla/websocket/issues/97
  279. wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
  280. return nil
  281. })
  282. go wsc.readTimeoutRoutine()
  283. // Write responses, BLOCKING.
  284. wsc.writeRoutine()
  285. return nil
  286. }
  287. func (wsc *wsConnection) OnStop() {
  288. wsc.BaseService.OnStop()
  289. wsc.evsw.RemoveListener(wsc.remoteAddr)
  290. wsc.readTimeout.Stop()
  291. wsc.pingTicker.Stop()
  292. // The write loop closes the websocket connection
  293. // when it exits its loop, and the read loop
  294. // closes the writeChan
  295. }
  296. func (wsc *wsConnection) readTimeoutRoutine() {
  297. select {
  298. case <-wsc.readTimeout.C:
  299. log.Notice("Stopping connection due to read timeout")
  300. wsc.Stop()
  301. case <-wsc.Quit:
  302. return
  303. }
  304. }
// GetRemoteAddr returns the remote address recorded when the connection was
// created.
// Implements WSRPCConnection
func (wsc *wsConnection) GetRemoteAddr() string {
	return wsc.remoteAddr
}
// GetEventSwitch returns the event switch this connection was created with.
// Implements WSRPCConnection
func (wsc *wsConnection) GetEventSwitch() events.EventSwitch {
	return wsc.evsw
}
  313. // Implements WSRPCConnection
  314. // Blocking write to writeChan until service stops.
  315. // Goroutine-safe
  316. func (wsc *wsConnection) WriteRPCResponse(resp RPCResponse) {
  317. select {
  318. case <-wsc.Quit:
  319. return
  320. case wsc.writeChan <- resp:
  321. }
  322. }
  323. // Implements WSRPCConnection
  324. // Nonblocking write.
  325. // Goroutine-safe
  326. func (wsc *wsConnection) TryWriteRPCResponse(resp RPCResponse) bool {
  327. select {
  328. case <-wsc.Quit:
  329. return false
  330. case wsc.writeChan <- resp:
  331. return true
  332. default:
  333. return false
  334. }
  335. }
  336. // Read from the socket and subscribe to or unsubscribe from events
  337. func (wsc *wsConnection) readRoutine() {
  338. // Do not close writeChan, to allow WriteRPCResponse() to fail.
  339. // defer close(wsc.writeChan)
  340. for {
  341. select {
  342. case <-wsc.Quit:
  343. return
  344. default:
  345. var in []byte
  346. // Do not set a deadline here like below:
  347. // wsc.baseConn.SetReadDeadline(time.Now().Add(time.Second * wsReadTimeoutSeconds))
  348. // The client may not send anything for a while.
  349. // We use `readTimeout` to handle read timeouts.
  350. _, in, err := wsc.baseConn.ReadMessage()
  351. if err != nil {
  352. log.Notice("Failed to read from connection", "remote", wsc.remoteAddr, "err", err.Error())
  353. // an error reading the connection,
  354. // kill the connection
  355. wsc.Stop()
  356. return
  357. }
  358. var request RPCRequest
  359. err = json.Unmarshal(in, &request)
  360. if err != nil {
  361. errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
  362. wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, errStr))
  363. continue
  364. }
  365. // Now, fetch the RPCFunc and execute it.
  366. rpcFunc := wsc.funcMap[request.Method]
  367. if rpcFunc == nil {
  368. wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
  369. continue
  370. }
  371. var args []reflect.Value
  372. if rpcFunc.ws {
  373. wsCtx := WSRPCContext{Request: request, WSRPCConnection: wsc}
  374. args, err = jsonParamsToArgsWS(rpcFunc, request.Params, wsCtx)
  375. } else {
  376. args, err = jsonParamsToArgs(rpcFunc, request.Params)
  377. }
  378. if err != nil {
  379. wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error()))
  380. continue
  381. }
  382. returns := rpcFunc.f.Call(args)
  383. log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns)
  384. result, err := unreflectResult(returns)
  385. if err != nil {
  386. wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error()))
  387. continue
  388. } else {
  389. wsc.WriteRPCResponse(NewRPCResponse(request.ID, result, ""))
  390. continue
  391. }
  392. }
  393. }
  394. }
  395. // receives on a write channel and writes out on the socket
  396. func (wsc *wsConnection) writeRoutine() {
  397. defer wsc.baseConn.Close()
  398. for {
  399. select {
  400. case <-wsc.Quit:
  401. return
  402. case <-wsc.pingTicker.C:
  403. err := wsc.baseConn.WriteMessage(websocket.PingMessage, []byte{})
  404. if err != nil {
  405. log.Error("Failed to write ping message on websocket", "error", err)
  406. wsc.Stop()
  407. return
  408. }
  409. case msg := <-wsc.writeChan:
  410. jsonBytes, err := json.Marshal(msg)
  411. if err != nil {
  412. log.Error("Failed to marshal RPCResponse to JSON", "error", err)
  413. } else {
  414. wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds))
  415. if err = wsc.baseConn.WriteMessage(websocket.TextMessage, jsonBytes); err != nil {
  416. log.Warn("Failed to write response on websocket", "error", err)
  417. wsc.Stop()
  418. return
  419. }
  420. }
  421. }
  422. }
  423. }
  424. //----------------------------------------
// WebsocketManager is the main manager for all websocket connections.
// It upgrades incoming HTTP requests to websocket connections and hands each
// new connection the shared function table and event switch.
// NOTE: The websocket path is defined externally, e.g. in node/node.go
type WebsocketManager struct {
	websocket.Upgrader                     // embedded so that wm.Upgrade performs the HTTP -> websocket handshake
	funcMap            map[string]*RPCFunc // RPC functions made available to every connection
	evsw               events.EventSwitch  // event switch shared by every connection
}
  433. func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch) *WebsocketManager {
  434. return &WebsocketManager{
  435. funcMap: funcMap,
  436. evsw: evsw,
  437. Upgrader: websocket.Upgrader{
  438. ReadBufferSize: 1024,
  439. WriteBufferSize: 1024,
  440. CheckOrigin: func(r *http.Request) bool {
  441. // TODO
  442. return true
  443. },
  444. },
  445. }
  446. }
  447. // Upgrade the request/response (via http.Hijack) and starts the wsConnection.
  448. func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) {
  449. wsConn, err := wm.Upgrade(w, r, nil)
  450. if err != nil {
  451. // TODO - return http error
  452. log.Error("Failed to upgrade to websocket connection", "error", err)
  453. return
  454. }
  455. // register connection
  456. con := NewWSConnection(wsConn, wm.funcMap, wm.evsw)
  457. log.Notice("New websocket connection", "remote", con.remoteAddr)
  458. con.Start() // Blocking
  459. }
  460. // rpc.websocket
  461. //-----------------------------------------------------------------------------
  462. // NOTE: assume returns is result struct and error. If error is not nil, return it
  463. func unreflectResult(returns []reflect.Value) (interface{}, error) {
  464. errV := returns[1]
  465. if errV.Interface() != nil {
  466. return nil, fmt.Errorf("%v", errV.Interface())
  467. }
  468. rv := returns[0]
  469. // the result is a registered interface,
  470. // we need a pointer to it so we can marshal with type byte
  471. rvp := reflect.New(rv.Type())
  472. rvp.Elem().Set(rv)
  473. return rvp.Interface(), nil
  474. }
  475. // writes a list of available rpc endpoints as an html page
  476. func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) {
  477. noArgNames := []string{}
  478. argNames := []string{}
  479. for name, funcData := range funcMap {
  480. if len(funcData.args) == 0 {
  481. noArgNames = append(noArgNames, name)
  482. } else {
  483. argNames = append(argNames, name)
  484. }
  485. }
  486. sort.Strings(noArgNames)
  487. sort.Strings(argNames)
  488. buf := new(bytes.Buffer)
  489. buf.WriteString("<html><body>")
  490. buf.WriteString("<br>Available endpoints:<br>")
  491. for _, name := range noArgNames {
  492. link := fmt.Sprintf("http://%s/%s", r.Host, name)
  493. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  494. }
  495. buf.WriteString("<br>Endpoints that require arguments:<br>")
  496. for _, name := range argNames {
  497. link := fmt.Sprintf("http://%s/%s?", r.Host, name)
  498. funcData := funcMap[name]
  499. for i, argName := range funcData.argNames {
  500. link += argName + "=_"
  501. if i < len(funcData.argNames)-1 {
  502. link += "&"
  503. }
  504. }
  505. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  506. }
  507. buf.WriteString("</body></html>")
  508. w.Header().Set("Content-Type", "text/html")
  509. w.WriteHeader(200)
  510. w.Write(buf.Bytes())
  511. }