You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

563 lines
16 KiB

  1. package rpcserver
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "net/http"
  9. "reflect"
  10. "sort"
  11. "strings"
  12. "time"
  13. "github.com/gorilla/websocket"
  14. . "github.com/tendermint/go-common"
  15. "github.com/tendermint/go-events"
  16. . "github.com/tendermint/go-rpc/types"
  17. "github.com/tendermint/go-wire"
  18. )
  19. // Adds a route for each function in the funcMap, as well as general jsonrpc and websocket handlers for all functions.
  20. // "result" is the interface on which the result objects are registered, and is popualted with every RPCResponse
  21. func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
  22. // HTTP endpoints
  23. for funcName, rpcFunc := range funcMap {
  24. mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc))
  25. }
  26. // JSONRPC endpoints
  27. mux.HandleFunc("/", makeJSONRPCHandler(funcMap))
  28. }
  29. //-------------------------------------
  30. // function introspection
  // RPCFunc bundles a callable with the reflection data needed to invoke it
  // from decoded request parameters.
  type RPCFunc struct {
  	// f is the reflected handle of the wrapped function.
  	f reflect.Value
  	// args holds the type of every parameter of f, in declaration order.
  	args []reflect.Type
  	// returns holds the type of every result of f, in declaration order.
  	returns []reflect.Type
  	// argNames holds the request-facing name of each argument.
  	argNames []string
  	// ws marks the function as callable over websockets only.
  	ws bool
  }
  39. // wraps a function for quicker introspection
  40. // f is the function, args are comma separated argument names
  41. func NewRPCFunc(f interface{}, args string) *RPCFunc {
  42. return newRPCFunc(f, args, false)
  43. }
  44. func NewWSRPCFunc(f interface{}, args string) *RPCFunc {
  45. return newRPCFunc(f, args, true)
  46. }
  47. func newRPCFunc(f interface{}, args string, ws bool) *RPCFunc {
  48. var argNames []string
  49. if args != "" {
  50. argNames = strings.Split(args, ",")
  51. }
  52. return &RPCFunc{
  53. f: reflect.ValueOf(f),
  54. args: funcArgTypes(f),
  55. returns: funcReturnTypes(f),
  56. argNames: argNames,
  57. ws: ws,
  58. }
  59. }
  // funcArgTypes lists the parameter types of the function f, in declaration
  // order. f must be a function value; reflect panics otherwise.
  func funcArgTypes(f interface{}) []reflect.Type {
  	fnType := reflect.TypeOf(f)
  	argTypes := make([]reflect.Type, fnType.NumIn())
  	for idx := range argTypes {
  		argTypes[idx] = fnType.In(idx)
  	}
  	return argTypes
  }
  // funcReturnTypes lists the result types of the function f, in declaration
  // order. f must be a function value; reflect panics otherwise.
  func funcReturnTypes(f interface{}) []reflect.Type {
  	fnType := reflect.TypeOf(f)
  	resultTypes := make([]reflect.Type, fnType.NumOut())
  	for idx := range resultTypes {
  		resultTypes[idx] = fnType.Out(idx)
  	}
  	return resultTypes
  }
  80. // function introspection
  81. //-----------------------------------------------------------------------------
  82. // rpc.json
  83. // jsonrpc calls grab the given method's function info and runs reflect.Call
  84. func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
  85. return func(w http.ResponseWriter, r *http.Request) {
  86. b, _ := ioutil.ReadAll(r.Body)
  87. // if its an empty request (like from a browser),
  88. // just display a list of functions
  89. if len(b) == 0 {
  90. writeListOfEndpoints(w, r, funcMap)
  91. return
  92. }
  93. var request RPCRequest
  94. err := json.Unmarshal(b, &request)
  95. if err != nil {
  96. WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error()))
  97. return
  98. }
  99. if len(r.URL.Path) > 1 {
  100. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
  101. return
  102. }
  103. rpcFunc := funcMap[request.Method]
  104. if rpcFunc == nil {
  105. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
  106. return
  107. }
  108. if rpcFunc.ws {
  109. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method is only for websockets: "+request.Method))
  110. return
  111. }
  112. args, err := jsonParamsToArgs(rpcFunc, request.Params)
  113. if err != nil {
  114. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error()))
  115. return
  116. }
  117. returns := rpcFunc.f.Call(args)
  118. log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
  119. result, err := unreflectResult(returns)
  120. if err != nil {
  121. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error()))
  122. return
  123. }
  124. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, result, ""))
  125. }
  126. }
  127. // Convert a list of interfaces to properly typed values
  128. func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) {
  129. if len(rpcFunc.argNames) != len(params) {
  130. return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
  131. len(rpcFunc.argNames), rpcFunc.argNames, len(params), params))
  132. }
  133. values := make([]reflect.Value, len(params))
  134. for i, p := range params {
  135. ty := rpcFunc.args[i]
  136. v, err := _jsonObjectToArg(ty, p)
  137. if err != nil {
  138. return nil, err
  139. }
  140. values[i] = v
  141. }
  142. return values, nil
  143. }
  144. // Same as above, but with the first param the websocket connection
  145. func jsonParamsToArgsWS(rpcFunc *RPCFunc, params []interface{}, wsCtx WSRPCContext) ([]reflect.Value, error) {
  146. if len(rpcFunc.argNames) != len(params) {
  147. return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
  148. len(rpcFunc.argNames)-1, rpcFunc.argNames[1:], len(params), params))
  149. }
  150. values := make([]reflect.Value, len(params)+1)
  151. values[0] = reflect.ValueOf(wsCtx)
  152. for i, p := range params {
  153. ty := rpcFunc.args[i+1]
  154. v, err := _jsonObjectToArg(ty, p)
  155. if err != nil {
  156. return nil, err
  157. }
  158. values[i+1] = v
  159. }
  160. return values, nil
  161. }
  162. func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) {
  163. var err error
  164. v := reflect.New(ty)
  165. wire.ReadJSONObjectPtr(v.Interface(), object, &err)
  166. if err != nil {
  167. return v, err
  168. }
  169. v = v.Elem()
  170. return v, nil
  171. }
  172. // rpc.json
  173. //-----------------------------------------------------------------------------
  174. // rpc.http
  175. // convert from a function name to the http handler
  176. func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) {
  177. // Exception for websocket endpoints
  178. if rpcFunc.ws {
  179. return func(w http.ResponseWriter, r *http.Request) {
  180. WriteRPCResponseHTTP(w, NewRPCResponse("", nil, "This RPC method is only for websockets"))
  181. }
  182. }
  183. // All other endpoints
  184. return func(w http.ResponseWriter, r *http.Request) {
  185. args, err := httpParamsToArgs(rpcFunc, r)
  186. if err != nil {
  187. WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error()))
  188. return
  189. }
  190. returns := rpcFunc.f.Call(args)
  191. log.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
  192. result, err := unreflectResult(returns)
  193. if err != nil {
  194. WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error()))
  195. return
  196. }
  197. WriteRPCResponseHTTP(w, NewRPCResponse("", result, ""))
  198. }
  199. }
  200. // Covert an http query to a list of properly typed values.
  201. // To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
  202. func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) {
  203. argTypes := rpcFunc.args
  204. argNames := rpcFunc.argNames
  205. var err error
  206. values := make([]reflect.Value, len(argNames))
  207. for i, name := range argNames {
  208. ty := argTypes[i]
  209. arg := GetParam(r, name)
  210. values[i], err = _jsonStringToArg(ty, arg)
  211. if err != nil {
  212. return nil, err
  213. }
  214. }
  215. return values, nil
  216. }
  217. func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
  218. var err error
  219. v := reflect.New(ty)
  220. wire.ReadJSONPtr(v.Interface(), []byte(arg), &err)
  221. if err != nil {
  222. return v, err
  223. }
  224. v = v.Elem()
  225. return v, nil
  226. }
  227. // rpc.http
  228. //-----------------------------------------------------------------------------
  229. // rpc.websocket
  // Tunables for websocket connections.
  const (
  	// writeChanCapacity is the buffer size of a connection's outgoing
  	// response channel.
  	writeChanCapacity = 1000

  	// wsWriteTimeoutSeconds bounds each individual write, in seconds.
  	wsWriteTimeoutSeconds = 30

  	// wsReadTimeoutSeconds is how long, in seconds, a connection may stay
  	// silent (not even pings) before it is considered timed out.
  	wsReadTimeoutSeconds = 30

  	// wsPingTickerSeconds is the interval, in seconds, between pings sent to
  	// the peer.
  	wsPingTickerSeconds = 10
  )
  236. // a single websocket connection
  237. // contains listener id, underlying ws connection,
  238. // and the event switch for subscribing to events
  239. type wsConnection struct {
  240. QuitService
  241. remoteAddr string
  242. baseConn *websocket.Conn
  243. writeChan chan RPCResponse
  244. readTimeout *time.Timer
  245. pingTicker *time.Ticker
  246. funcMap map[string]*RPCFunc
  247. evsw *events.EventSwitch
  248. }
  249. // new websocket connection wrapper
  250. func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *wsConnection {
  251. wsc := &wsConnection{
  252. remoteAddr: baseConn.RemoteAddr().String(),
  253. baseConn: baseConn,
  254. writeChan: make(chan RPCResponse, writeChanCapacity), // error when full.
  255. funcMap: funcMap,
  256. evsw: evsw,
  257. }
  258. wsc.QuitService = *NewQuitService(log, "wsConnection", wsc)
  259. return wsc
  260. }
  261. // wsc.Start() blocks until the connection closes.
  262. func (wsc *wsConnection) OnStart() error {
  263. wsc.QuitService.OnStart()
  264. // Read subscriptions/unsubscriptions to events
  265. go wsc.readRoutine()
  266. // Custom Ping handler to touch readTimeout
  267. wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds)
  268. wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds)
  269. wsc.baseConn.SetPingHandler(func(m string) error {
  270. // NOTE: https://github.com/gorilla/websocket/issues/97
  271. go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
  272. wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
  273. return nil
  274. })
  275. wsc.baseConn.SetPongHandler(func(m string) error {
  276. // NOTE: https://github.com/gorilla/websocket/issues/97
  277. wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
  278. return nil
  279. })
  280. go wsc.readTimeoutRoutine()
  281. // Write responses, BLOCKING.
  282. wsc.writeRoutine()
  283. return nil
  284. }
  285. func (wsc *wsConnection) OnStop() {
  286. wsc.QuitService.OnStop()
  287. wsc.evsw.RemoveListener(wsc.remoteAddr)
  288. wsc.readTimeout.Stop()
  289. wsc.pingTicker.Stop()
  290. // The write loop closes the websocket connection
  291. // when it exits its loop, and the read loop
  292. // closes the writeChan
  293. }
  294. func (wsc *wsConnection) readTimeoutRoutine() {
  295. select {
  296. case <-wsc.readTimeout.C:
  297. log.Notice("Stopping connection due to read timeout")
  298. wsc.Stop()
  299. case <-wsc.Quit:
  300. return
  301. }
  302. }
  303. // Implements WSRPCConnection
  304. func (wsc *wsConnection) GetRemoteAddr() string {
  305. return wsc.remoteAddr
  306. }
  307. // Implements WSRPCConnection
  308. func (wsc *wsConnection) GetEventSwitch() *events.EventSwitch {
  309. return wsc.evsw
  310. }
  311. // Implements WSRPCConnection
  312. // Blocking write to writeChan until service stops.
  313. // Goroutine-safe
  314. func (wsc *wsConnection) WriteRPCResponse(resp RPCResponse) {
  315. select {
  316. case <-wsc.Quit:
  317. return
  318. case wsc.writeChan <- resp:
  319. }
  320. }
  321. // Implements WSRPCConnection
  322. // Nonblocking write.
  323. // Goroutine-safe
  324. func (wsc *wsConnection) TryWriteRPCResponse(resp RPCResponse) bool {
  325. select {
  326. case <-wsc.Quit:
  327. return false
  328. case wsc.writeChan <- resp:
  329. return true
  330. default:
  331. return false
  332. }
  333. }
  334. // Read from the socket and subscribe to or unsubscribe from events
  335. func (wsc *wsConnection) readRoutine() {
  336. // Do not close writeChan, to allow WriteRPCResponse() to fail.
  337. // defer close(wsc.writeChan)
  338. for {
  339. select {
  340. case <-wsc.Quit:
  341. return
  342. default:
  343. var in []byte
  344. // Do not set a deadline here like below:
  345. // wsc.baseConn.SetReadDeadline(time.Now().Add(time.Second * wsReadTimeoutSeconds))
  346. // The client may not send anything for a while.
  347. // We use `readTimeout` to handle read timeouts.
  348. _, in, err := wsc.baseConn.ReadMessage()
  349. if err != nil {
  350. log.Notice("Failed to read from connection", "remote", wsc.remoteAddr, "err", err.Error())
  351. // an error reading the connection,
  352. // kill the connection
  353. wsc.Stop()
  354. return
  355. }
  356. var request RPCRequest
  357. err = json.Unmarshal(in, &request)
  358. if err != nil {
  359. errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
  360. wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, errStr))
  361. continue
  362. }
  363. // Now, fetch the RPCFunc and execute it.
  364. rpcFunc := wsc.funcMap[request.Method]
  365. if rpcFunc == nil {
  366. wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
  367. continue
  368. }
  369. var args []reflect.Value
  370. if rpcFunc.ws {
  371. wsCtx := WSRPCContext{Request: request, WSRPCConnection: wsc}
  372. args, err = jsonParamsToArgsWS(rpcFunc, request.Params, wsCtx)
  373. } else {
  374. args, err = jsonParamsToArgs(rpcFunc, request.Params)
  375. }
  376. if err != nil {
  377. wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error()))
  378. continue
  379. }
  380. returns := rpcFunc.f.Call(args)
  381. log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns)
  382. result, err := unreflectResult(returns)
  383. if err != nil {
  384. wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error()))
  385. continue
  386. } else {
  387. wsc.WriteRPCResponse(NewRPCResponse(request.ID, result, ""))
  388. continue
  389. }
  390. }
  391. }
  392. }
  393. // receives on a write channel and writes out on the socket
  394. func (wsc *wsConnection) writeRoutine() {
  395. defer wsc.baseConn.Close()
  396. for {
  397. select {
  398. case <-wsc.Quit:
  399. return
  400. case <-wsc.pingTicker.C:
  401. err := wsc.baseConn.WriteMessage(websocket.PingMessage, []byte{})
  402. if err != nil {
  403. log.Error("Failed to write ping message on websocket", "error", err)
  404. wsc.Stop()
  405. return
  406. }
  407. case msg := <-wsc.writeChan:
  408. jsonBytes, err := json.Marshal(msg)
  409. if err != nil {
  410. log.Error("Failed to marshal RPCResponse to JSON", "error", err)
  411. } else {
  412. wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds))
  413. if err = wsc.baseConn.WriteMessage(websocket.TextMessage, jsonBytes); err != nil {
  414. log.Warn("Failed to write response on websocket", "error", err)
  415. wsc.Stop()
  416. return
  417. }
  418. }
  419. }
  420. }
  421. }
  422. //----------------------------------------
  423. // Main manager for all websocket connections
  424. // Holds the event switch
  425. // NOTE: The websocket path is defined externally, e.g. in node/node.go
  426. type WebsocketManager struct {
  427. websocket.Upgrader
  428. funcMap map[string]*RPCFunc
  429. evsw *events.EventSwitch
  430. }
  431. func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *WebsocketManager {
  432. return &WebsocketManager{
  433. funcMap: funcMap,
  434. evsw: evsw,
  435. Upgrader: websocket.Upgrader{
  436. ReadBufferSize: 1024,
  437. WriteBufferSize: 1024,
  438. CheckOrigin: func(r *http.Request) bool {
  439. // TODO
  440. return true
  441. },
  442. },
  443. }
  444. }
  445. // Upgrade the request/response (via http.Hijack) and starts the wsConnection.
  446. func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) {
  447. wsConn, err := wm.Upgrade(w, r, nil)
  448. if err != nil {
  449. // TODO - return http error
  450. log.Error("Failed to upgrade to websocket connection", "error", err)
  451. return
  452. }
  453. // register connection
  454. con := NewWSConnection(wsConn, wm.funcMap, wm.evsw)
  455. log.Notice("New websocket connection", "remote", con.remoteAddr)
  456. con.Start() // Blocking
  457. }
  458. // rpc.websocket
  459. //-----------------------------------------------------------------------------
  // unreflectResult unpacks the values produced by a reflective call to an RPC
  // function. The function is expected to return (result, error): returns[0] is
  // the result and returns[1] the error.
  //
  // When the error is non-nil it is returned as a new error carrying its
  // formatted text. Otherwise a pointer to a copy of the result is returned; the
  // result is a registered interface, and a pointer to it is needed so that it
  // can be marshalled with its type byte. A call that produced fewer than two
  // values is reported as an error instead of panicking on the index.
  func unreflectResult(returns []reflect.Value) (interface{}, error) {
  	if len(returns) < 2 {
  		return nil, fmt.Errorf("expected RPC function to return (result, error), got %d value(s)", len(returns))
  	}
  	errV := returns[1]
  	if errV.Interface() != nil {
  		return nil, fmt.Errorf("%v", errV.Interface())
  	}
  	rv := returns[0]
  	// Allocate a new value of the result's type and copy the result into it,
  	// so the caller receives a pointer.
  	rvp := reflect.New(rv.Type())
  	rvp.Elem().Set(rv)
  	return rvp.Interface(), nil
  }
  473. // writes a list of available rpc endpoints as an html page
  474. func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) {
  475. noArgNames := []string{}
  476. argNames := []string{}
  477. for name, funcData := range funcMap {
  478. if len(funcData.args) == 0 {
  479. noArgNames = append(noArgNames, name)
  480. } else {
  481. argNames = append(argNames, name)
  482. }
  483. }
  484. sort.Strings(noArgNames)
  485. sort.Strings(argNames)
  486. buf := new(bytes.Buffer)
  487. buf.WriteString("<html><body>")
  488. buf.WriteString("<br>Available endpoints:<br>")
  489. for _, name := range noArgNames {
  490. link := fmt.Sprintf("http://%s/%s", r.Host, name)
  491. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  492. }
  493. buf.WriteString("<br>Endpoints that require arguments:<br>")
  494. for _, name := range argNames {
  495. link := fmt.Sprintf("http://%s/%s?", r.Host, name)
  496. funcData := funcMap[name]
  497. for i, argName := range funcData.argNames {
  498. link += argName + "=_"
  499. if i < len(funcData.argNames)-1 {
  500. link += "&"
  501. }
  502. }
  503. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  504. }
  505. buf.WriteString("</body></html>")
  506. w.Header().Set("Content-Type", "text/html")
  507. w.WriteHeader(200)
  508. w.Write(buf.Bytes())
  509. }