You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

561 lines
16 KiB

  1. package rpcserver
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "net/http"
  9. "reflect"
  10. "sort"
  11. "strings"
  12. "time"
  13. "github.com/gorilla/websocket"
  14. . "github.com/tendermint/go-common"
  15. "github.com/tendermint/go-events"
  16. . "github.com/tendermint/go-rpc/types"
  17. "github.com/tendermint/go-wire"
  18. )
  19. // Adds a route for each function in the funcMap, as well as general jsonrpc and websocket handlers for all functions.
  20. // "result" is the interface on which the result objects are registered, and is popualted with every RPCResponse
  21. func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
  22. // HTTP endpoints
  23. for funcName, rpcFunc := range funcMap {
  24. mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc))
  25. }
  26. // JSONRPC endpoints
  27. mux.HandleFunc("/", makeJSONRPCHandler(funcMap))
  28. }
  29. //-------------------------------------
  30. // function introspection
  // RPCFunc holds the reflection data of one registered RPC function: the
// callable value, its signature, and the names of its arguments.
type RPCFunc struct {
	f             reflect.Value  // underlying rpc function
	args, returns []reflect.Type // type of each function arg / each return value
	argNames      []string       // name of each argument
	ws            bool           // websocket only
}
  // NewRPCFunc wraps f for quicker introspection.
// f is the function to expose and args is a comma-separated list of its
// argument names. The resulting RPCFunc is not restricted to websockets
// (it is built with ws=false).
func NewRPCFunc(f interface{}, args string) *RPCFunc {
	return newRPCFunc(f, args, false)
}
  // NewWSRPCFunc wraps f like NewRPCFunc but marks the result as websocket-only
// (ws=true); the plain HTTP and JSONRPC handlers reject such functions.
func NewWSRPCFunc(f interface{}, args string) *RPCFunc {
	return newRPCFunc(f, args, true)
}
  47. func newRPCFunc(f interface{}, args string, ws bool) *RPCFunc {
  48. var argNames []string
  49. if args != "" {
  50. argNames = strings.Split(args, ",")
  51. }
  52. return &RPCFunc{
  53. f: reflect.ValueOf(f),
  54. args: funcArgTypes(f),
  55. returns: funcReturnTypes(f),
  56. argNames: argNames,
  57. ws: ws,
  58. }
  59. }
  // funcArgTypes lists the parameter types of f in declaration order.
// f must be a function value; reflect panics on anything else.
func funcArgTypes(f interface{}) []reflect.Type {
	sig := reflect.TypeOf(f)
	in := make([]reflect.Type, sig.NumIn())
	for idx := range in {
		in[idx] = sig.In(idx)
	}
	return in
}
  // funcReturnTypes lists the result types of f in declaration order.
// f must be a function value; reflect panics on anything else.
func funcReturnTypes(f interface{}) []reflect.Type {
	sig := reflect.TypeOf(f)
	out := make([]reflect.Type, sig.NumOut())
	for idx := range out {
		out[idx] = sig.Out(idx)
	}
	return out
}
  80. // function introspection
  81. //-----------------------------------------------------------------------------
  82. // rpc.json
  83. // jsonrpc calls grab the given method's function info and runs reflect.Call
  84. func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
  85. return func(w http.ResponseWriter, r *http.Request) {
  86. b, _ := ioutil.ReadAll(r.Body)
  87. // if its an empty request (like from a browser),
  88. // just display a list of functions
  89. if len(b) == 0 {
  90. writeListOfEndpoints(w, r, funcMap)
  91. return
  92. }
  93. var request RPCRequest
  94. err := json.Unmarshal(b, &request)
  95. if err != nil {
  96. WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error()))
  97. return
  98. }
  99. if len(r.URL.Path) > 1 {
  100. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
  101. return
  102. }
  103. rpcFunc := funcMap[request.Method]
  104. if rpcFunc == nil {
  105. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
  106. return
  107. }
  108. if rpcFunc.ws {
  109. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method is only for websockets: "+request.Method))
  110. return
  111. }
  112. args, err := jsonParamsToArgs(rpcFunc, request.Params)
  113. if err != nil {
  114. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error()))
  115. return
  116. }
  117. returns := rpcFunc.f.Call(args)
  118. log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
  119. result, err := unreflectResult(returns)
  120. if err != nil {
  121. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error()))
  122. return
  123. }
  124. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, result, ""))
  125. }
  126. }
  127. // Convert a list of interfaces to properly typed values
  128. func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) {
  129. if len(rpcFunc.argNames) != len(params) {
  130. return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
  131. len(rpcFunc.argNames), rpcFunc.argNames, len(params), params))
  132. }
  133. values := make([]reflect.Value, len(params))
  134. for i, p := range params {
  135. ty := rpcFunc.args[i]
  136. v, err := _jsonObjectToArg(ty, p)
  137. if err != nil {
  138. return nil, err
  139. }
  140. values[i] = v
  141. }
  142. return values, nil
  143. }
  144. // Same as above, but with the first param the websocket connection
  145. func jsonParamsToArgsWS(rpcFunc *RPCFunc, params []interface{}, wsCtx WSRPCContext) ([]reflect.Value, error) {
  146. if len(rpcFunc.argNames) != len(params) {
  147. return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
  148. len(rpcFunc.argNames)-1, rpcFunc.argNames[1:], len(params), params))
  149. }
  150. values := make([]reflect.Value, len(params)+1)
  151. values[0] = reflect.ValueOf(wsCtx)
  152. for i, p := range params {
  153. ty := rpcFunc.args[i+1]
  154. v, err := _jsonObjectToArg(ty, p)
  155. if err != nil {
  156. return nil, err
  157. }
  158. values[i+1] = v
  159. }
  160. return values, nil
  161. }
  162. func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) {
  163. var err error
  164. v := reflect.New(ty)
  165. wire.ReadJSONObjectPtr(v.Interface(), object, &err)
  166. if err != nil {
  167. return v, err
  168. }
  169. v = v.Elem()
  170. return v, nil
  171. }
  172. // rpc.json
  173. //-----------------------------------------------------------------------------
  174. // rpc.http
  175. // convert from a function name to the http handler
  176. func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) {
  177. // Exception for websocket endpoints
  178. if rpcFunc.ws {
  179. return func(w http.ResponseWriter, r *http.Request) {
  180. WriteRPCResponseHTTP(w, NewRPCResponse("", nil, "This RPC method is only for websockets"))
  181. }
  182. }
  183. // All other endpoints
  184. return func(w http.ResponseWriter, r *http.Request) {
  185. args, err := httpParamsToArgs(rpcFunc, r)
  186. if err != nil {
  187. WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error()))
  188. return
  189. }
  190. returns := rpcFunc.f.Call(args)
  191. log.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
  192. result, err := unreflectResult(returns)
  193. if err != nil {
  194. WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error()))
  195. return
  196. }
  197. WriteRPCResponseHTTP(w, NewRPCResponse("", result, ""))
  198. }
  199. }
  200. // Covert an http query to a list of properly typed values.
  201. // To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
  202. func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) {
  203. argTypes := rpcFunc.args
  204. argNames := rpcFunc.argNames
  205. var err error
  206. values := make([]reflect.Value, len(argNames))
  207. for i, name := range argNames {
  208. ty := argTypes[i]
  209. arg := GetParam(r, name)
  210. values[i], err = _jsonStringToArg(ty, arg)
  211. if err != nil {
  212. return nil, err
  213. }
  214. }
  215. return values, nil
  216. }
  217. func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
  218. var err error
  219. v := reflect.New(ty)
  220. wire.ReadJSONPtr(v.Interface(), []byte(arg), &err)
  221. if err != nil {
  222. return v, err
  223. }
  224. v = v.Elem()
  225. return v, nil
  226. }
  227. // rpc.http
  228. //-----------------------------------------------------------------------------
  229. // rpc.websocket
  // Tuning constants for websocket connections. The *Seconds values are plain
// integers that are multiplied by time.Second where they are used.
const (
	writeChanCapacity     = 1000 // buffer size of each connection's writeChan
	wsWriteTimeoutSeconds = 30   // each write times out after this
	wsReadTimeoutSeconds  = 30   // connection times out if we haven't received *anything* in this long, not even pings.
	wsPingTickerSeconds   = 10   // send a ping every PingTickerSeconds.
)
  // wsConnection is a single websocket connection.
// It contains the listener id (the remote address), the underlying ws
// connection, and the event switch for subscribing to events.
type wsConnection struct {
	QuitService // embedded service lifecycle (Start/Stop and the Quit channel)

	remoteAddr  string              // remote address; also the listener id removed from evsw on stop
	baseConn    *websocket.Conn     // underlying websocket connection
	writeChan   chan RPCResponse    // responses queued for writeRoutine
	readTimeout *time.Timer         // fires when nothing (not even a ping or pong) arrived in time
	pingTicker  *time.Ticker        // paces the pings sent by writeRoutine
	funcMap     map[string]*RPCFunc // RPC functions callable over this connection
	evsw        *events.EventSwitch // event switch exposed via GetEventSwitch
}
  249. // new websocket connection wrapper
  250. func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *wsConnection {
  251. wsc := &wsConnection{
  252. remoteAddr: baseConn.RemoteAddr().String(),
  253. baseConn: baseConn,
  254. writeChan: make(chan RPCResponse, writeChanCapacity), // error when full.
  255. funcMap: funcMap,
  256. evsw: evsw,
  257. }
  258. wsc.QuitService = *NewQuitService(log, "wsConnection", wsc)
  259. return wsc
  260. }
  261. // wsc.Start() blocks until the connection closes.
  262. func (wsc *wsConnection) OnStart() error {
  263. wsc.QuitService.OnStart()
  264. // Read subscriptions/unsubscriptions to events
  265. go wsc.readRoutine()
  266. // Custom Ping handler to touch readTimeout
  267. wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds)
  268. wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds)
  269. wsc.baseConn.SetPingHandler(func(m string) error {
  270. // NOTE: https://github.com/gorilla/websocket/issues/97
  271. go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
  272. wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
  273. return nil
  274. })
  275. wsc.baseConn.SetPongHandler(func(m string) error {
  276. // NOTE: https://github.com/gorilla/websocket/issues/97
  277. wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
  278. return nil
  279. })
  280. go wsc.readTimeoutRoutine()
  281. // Write responses, BLOCKING.
  282. wsc.writeRoutine()
  283. return nil
  284. }
  285. func (wsc *wsConnection) OnStop() {
  286. wsc.QuitService.OnStop()
  287. wsc.evsw.RemoveListener(wsc.remoteAddr)
  288. wsc.readTimeout.Stop()
  289. wsc.pingTicker.Stop()
  290. // The write loop closes the websocket connection
  291. // when it exits its loop, and the read loop
  292. // closes the writeChan
  293. }
  294. func (wsc *wsConnection) readTimeoutRoutine() {
  295. select {
  296. case <-wsc.readTimeout.C:
  297. log.Notice("Stopping connection due to read timeout")
  298. wsc.Stop()
  299. case <-wsc.Quit:
  300. return
  301. }
  302. }
  // GetRemoteAddr implements WSRPCConnection.
// It returns the remote address recorded when the connection was created.
func (wsc *wsConnection) GetRemoteAddr() string {
	return wsc.remoteAddr
}
  // GetEventSwitch implements WSRPCConnection.
// It returns the event switch this connection was created with.
func (wsc *wsConnection) GetEventSwitch() *events.EventSwitch {
	return wsc.evsw
}
  311. // Implements WSRPCConnection
  312. // Blocking write to writeChan until service stops.
  313. func (wsc *wsConnection) WriteRPCResponse(resp RPCResponse) {
  314. select {
  315. case <-wsc.Quit:
  316. return
  317. case wsc.writeChan <- resp:
  318. }
  319. }
  320. // Implements WSRPCConnection
  321. // Nonblocking write.
  322. func (wsc *wsConnection) TryWriteRPCResponse(resp RPCResponse) bool {
  323. select {
  324. case <-wsc.Quit:
  325. return false
  326. case wsc.writeChan <- resp:
  327. return true
  328. default:
  329. return false
  330. }
  331. }
  332. // Read from the socket and subscribe to or unsubscribe from events
  333. func (wsc *wsConnection) readRoutine() {
  334. // Do not close writeChan, to allow WriteRPCResponse() to fail.
  335. // defer close(wsc.writeChan)
  336. for {
  337. select {
  338. case <-wsc.Quit:
  339. return
  340. default:
  341. var in []byte
  342. // Do not set a deadline here like below:
  343. // wsc.baseConn.SetReadDeadline(time.Now().Add(time.Second * wsReadTimeoutSeconds))
  344. // The client may not send anything for a while.
  345. // We use `readTimeout` to handle read timeouts.
  346. _, in, err := wsc.baseConn.ReadMessage()
  347. if err != nil {
  348. log.Notice("Failed to read from connection", "remote", wsc.remoteAddr)
  349. // an error reading the connection,
  350. // kill the connection
  351. wsc.Stop()
  352. return
  353. }
  354. var request RPCRequest
  355. err = json.Unmarshal(in, &request)
  356. if err != nil {
  357. errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
  358. wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, errStr))
  359. continue
  360. }
  361. // Now, fetch the RPCFunc and execute it.
  362. rpcFunc := wsc.funcMap[request.Method]
  363. if rpcFunc == nil {
  364. wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
  365. continue
  366. }
  367. var args []reflect.Value
  368. if rpcFunc.ws {
  369. wsCtx := WSRPCContext{Request: request, WSRPCConnection: wsc}
  370. args, err = jsonParamsToArgsWS(rpcFunc, request.Params, wsCtx)
  371. } else {
  372. args, err = jsonParamsToArgs(rpcFunc, request.Params)
  373. }
  374. if err != nil {
  375. wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error()))
  376. continue
  377. }
  378. returns := rpcFunc.f.Call(args)
  379. log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns)
  380. result, err := unreflectResult(returns)
  381. if err != nil {
  382. wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error()))
  383. continue
  384. } else {
  385. wsc.WriteRPCResponse(NewRPCResponse(request.ID, result, ""))
  386. continue
  387. }
  388. }
  389. }
  390. }
  391. // receives on a write channel and writes out on the socket
  392. func (wsc *wsConnection) writeRoutine() {
  393. defer wsc.baseConn.Close()
  394. for {
  395. select {
  396. case <-wsc.Quit:
  397. return
  398. case <-wsc.pingTicker.C:
  399. err := wsc.baseConn.WriteMessage(websocket.PingMessage, []byte{})
  400. if err != nil {
  401. log.Error("Failed to write ping message on websocket", "error", err)
  402. wsc.Stop()
  403. return
  404. }
  405. case msg := <-wsc.writeChan:
  406. jsonBytes, err := json.Marshal(msg)
  407. if err != nil {
  408. log.Error("Failed to marshal RPCResponse to JSON", "error", err)
  409. } else {
  410. wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds))
  411. if err = wsc.baseConn.WriteMessage(websocket.TextMessage, jsonBytes); err != nil {
  412. log.Warn("Failed to write response on websocket", "error", err)
  413. wsc.Stop()
  414. return
  415. }
  416. }
  417. }
  418. }
  419. }
  420. //----------------------------------------
  // WebsocketManager is the main manager for all websocket connections.
// It holds the RPC function map and the event switch that are handed to every
// new connection, and embeds the websocket.Upgrader used to upgrade requests.
// NOTE: The websocket path is defined externally, e.g. in node/node.go
type WebsocketManager struct {
	websocket.Upgrader
	funcMap map[string]*RPCFunc // RPC functions served to each connection
	evsw    *events.EventSwitch // event switch shared with each connection
}
  429. func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *WebsocketManager {
  430. return &WebsocketManager{
  431. funcMap: funcMap,
  432. evsw: evsw,
  433. Upgrader: websocket.Upgrader{
  434. ReadBufferSize: 1024,
  435. WriteBufferSize: 1024,
  436. CheckOrigin: func(r *http.Request) bool {
  437. // TODO
  438. return true
  439. },
  440. },
  441. }
  442. }
  443. // Upgrade the request/response (via http.Hijack) and starts the wsConnection.
  444. func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) {
  445. wsConn, err := wm.Upgrade(w, r, nil)
  446. if err != nil {
  447. // TODO - return http error
  448. log.Error("Failed to upgrade to websocket connection", "error", err)
  449. return
  450. }
  451. // register connection
  452. con := NewWSConnection(wsConn, wm.funcMap, wm.evsw)
  453. log.Notice("New websocket connection", "remote", con.remoteAddr)
  454. con.Start() // Blocking
  455. }
  456. // rpc.websocket
  457. //-----------------------------------------------------------------------------
  // unreflectResult turns the values returned by a reflective call into a
// (result, error) pair.
//
// returns is expected to hold the result followed by an error. If the error is
// non-nil it is returned (re-wrapped with its message) and the result is nil.
// Otherwise a pointer to a copy of the result is returned: the result is a
// registered interface, and a pointer to it is needed so it can be marshalled
// with its type byte.
//
// Fewer than two values yields an error instead of an index-out-of-range
// panic.
func unreflectResult(returns []reflect.Value) (interface{}, error) {
	if len(returns) < 2 {
		return nil, fmt.Errorf("expected 2 return values (result, error), got %d", len(returns))
	}
	if errV := returns[1].Interface(); errV != nil {
		return nil, fmt.Errorf("%v", errV)
	}
	rv := returns[0]
	rvp := reflect.New(rv.Type())
	rvp.Elem().Set(rv)
	return rvp.Interface(), nil
}
  471. // writes a list of available rpc endpoints as an html page
  472. func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) {
  473. noArgNames := []string{}
  474. argNames := []string{}
  475. for name, funcData := range funcMap {
  476. if len(funcData.args) == 0 {
  477. noArgNames = append(noArgNames, name)
  478. } else {
  479. argNames = append(argNames, name)
  480. }
  481. }
  482. sort.Strings(noArgNames)
  483. sort.Strings(argNames)
  484. buf := new(bytes.Buffer)
  485. buf.WriteString("<html><body>")
  486. buf.WriteString("<br>Available endpoints:<br>")
  487. for _, name := range noArgNames {
  488. link := fmt.Sprintf("http://%s/%s", r.Host, name)
  489. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  490. }
  491. buf.WriteString("<br>Endpoints that require arguments:<br>")
  492. for _, name := range argNames {
  493. link := fmt.Sprintf("http://%s/%s?", r.Host, name)
  494. funcData := funcMap[name]
  495. for i, argName := range funcData.argNames {
  496. link += argName + "=_"
  497. if i < len(funcData.argNames)-1 {
  498. link += "&"
  499. }
  500. }
  501. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  502. }
  503. buf.WriteString("</body></html>")
  504. w.Header().Set("Content-Type", "text/html")
  505. w.WriteHeader(200)
  506. w.Write(buf.Bytes())
  507. }