You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

557 lines
16 KiB

  1. package rpcserver
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "net/http"
  9. "reflect"
  10. "sort"
  11. "strings"
  12. "time"
  13. "github.com/gorilla/websocket"
  14. . "github.com/tendermint/go-common"
  15. "github.com/tendermint/go-events"
  16. . "github.com/tendermint/go-rpc/types"
  17. "github.com/tendermint/go-wire"
  18. )
  19. func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
  20. // HTTP endpoints
  21. for funcName, rpcFunc := range funcMap {
  22. mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc))
  23. }
  24. // JSONRPC endpoints
  25. mux.HandleFunc("/", makeJSONRPCHandler(funcMap))
  26. }
  27. //-------------------------------------
  28. // function introspection
  29. // holds all type information for each function
  30. type RPCFunc struct {
  31. f reflect.Value // underlying rpc function
  32. args []reflect.Type // type of each function arg
  33. returns []reflect.Type // type of each return arg
  34. argNames []string // name of each argument
  35. ws bool // websocket only
  36. }
  37. // wraps a function for quicker introspection
  38. // f is the function, args are comma separated argument names
  39. func NewRPCFunc(f interface{}, args string) *RPCFunc {
  40. return newRPCFunc(f, args, false)
  41. }
  42. func NewWSRPCFunc(f interface{}, args string) *RPCFunc {
  43. return newRPCFunc(f, args, true)
  44. }
  45. func newRPCFunc(f interface{}, args string, ws bool) *RPCFunc {
  46. var argNames []string
  47. if args != "" {
  48. argNames = strings.Split(args, ",")
  49. }
  50. return &RPCFunc{
  51. f: reflect.ValueOf(f),
  52. args: funcArgTypes(f),
  53. returns: funcReturnTypes(f),
  54. argNames: argNames,
  55. ws: ws,
  56. }
  57. }
  58. // return a function's argument types
  59. func funcArgTypes(f interface{}) []reflect.Type {
  60. t := reflect.TypeOf(f)
  61. n := t.NumIn()
  62. typez := make([]reflect.Type, n)
  63. for i := 0; i < n; i++ {
  64. typez[i] = t.In(i)
  65. }
  66. return typez
  67. }
  68. // return a function's return types
  69. func funcReturnTypes(f interface{}) []reflect.Type {
  70. t := reflect.TypeOf(f)
  71. n := t.NumOut()
  72. typez := make([]reflect.Type, n)
  73. for i := 0; i < n; i++ {
  74. typez[i] = t.Out(i)
  75. }
  76. return typez
  77. }
  78. // function introspection
  79. //-----------------------------------------------------------------------------
  80. // rpc.json
  81. // jsonrpc calls grab the given method's function info and runs reflect.Call
  82. func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
  83. return func(w http.ResponseWriter, r *http.Request) {
  84. b, _ := ioutil.ReadAll(r.Body)
  85. // if its an empty request (like from a browser),
  86. // just display a list of functions
  87. if len(b) == 0 {
  88. writeListOfEndpoints(w, r, funcMap)
  89. return
  90. }
  91. var request RPCRequest
  92. err := json.Unmarshal(b, &request)
  93. if err != nil {
  94. WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error()))
  95. return
  96. }
  97. if len(r.URL.Path) > 1 {
  98. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
  99. return
  100. }
  101. rpcFunc := funcMap[request.Method]
  102. if rpcFunc == nil {
  103. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
  104. return
  105. }
  106. if rpcFunc.ws {
  107. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method is only for websockets: "+request.Method))
  108. return
  109. }
  110. args, err := jsonParamsToArgs(rpcFunc, request.Params)
  111. if err != nil {
  112. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error()))
  113. return
  114. }
  115. returns := rpcFunc.f.Call(args)
  116. log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
  117. result, err := unreflectResult(returns)
  118. if err != nil {
  119. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error()))
  120. return
  121. }
  122. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, result, ""))
  123. }
  124. }
  125. // Convert a list of interfaces to properly typed values
  126. func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) {
  127. if len(rpcFunc.argNames) != len(params) {
  128. return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
  129. len(rpcFunc.argNames), rpcFunc.argNames, len(params), params))
  130. }
  131. values := make([]reflect.Value, len(params))
  132. for i, p := range params {
  133. ty := rpcFunc.args[i]
  134. v, err := _jsonObjectToArg(ty, p)
  135. if err != nil {
  136. return nil, err
  137. }
  138. values[i] = v
  139. }
  140. return values, nil
  141. }
  142. // Same as above, but with the first param the websocket connection
  143. func jsonParamsToArgsWS(rpcFunc *RPCFunc, params []interface{}, wsCtx WSRPCContext) ([]reflect.Value, error) {
  144. if len(rpcFunc.argNames) != len(params) {
  145. return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
  146. len(rpcFunc.argNames)-1, rpcFunc.argNames[1:], len(params), params))
  147. }
  148. values := make([]reflect.Value, len(params)+1)
  149. values[0] = reflect.ValueOf(wsCtx)
  150. for i, p := range params {
  151. ty := rpcFunc.args[i+1]
  152. v, err := _jsonObjectToArg(ty, p)
  153. if err != nil {
  154. return nil, err
  155. }
  156. values[i+1] = v
  157. }
  158. return values, nil
  159. }
  160. func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) {
  161. var err error
  162. v := reflect.New(ty)
  163. wire.ReadJSONObjectPtr(v.Interface(), object, &err)
  164. if err != nil {
  165. return v, err
  166. }
  167. v = v.Elem()
  168. return v, nil
  169. }
  170. // rpc.json
  171. //-----------------------------------------------------------------------------
  172. // rpc.http
  173. // convert from a function name to the http handler
  174. func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) {
  175. // Exception for websocket endpoints
  176. if rpcFunc.ws {
  177. return func(w http.ResponseWriter, r *http.Request) {
  178. WriteRPCResponseHTTP(w, NewRPCResponse("", nil, "This RPC method is only for websockets"))
  179. }
  180. }
  181. // All other endpoints
  182. return func(w http.ResponseWriter, r *http.Request) {
  183. args, err := httpParamsToArgs(rpcFunc, r)
  184. if err != nil {
  185. WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error()))
  186. return
  187. }
  188. returns := rpcFunc.f.Call(args)
  189. log.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
  190. result, err := unreflectResult(returns)
  191. if err != nil {
  192. WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error()))
  193. return
  194. }
  195. WriteRPCResponseHTTP(w, NewRPCResponse("", result, ""))
  196. }
  197. }
  198. // Covert an http query to a list of properly typed values.
  199. // To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
  200. func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) {
  201. argTypes := rpcFunc.args
  202. argNames := rpcFunc.argNames
  203. var err error
  204. values := make([]reflect.Value, len(argNames))
  205. for i, name := range argNames {
  206. ty := argTypes[i]
  207. arg := GetParam(r, name)
  208. values[i], err = _jsonStringToArg(ty, arg)
  209. if err != nil {
  210. return nil, err
  211. }
  212. }
  213. return values, nil
  214. }
  215. func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
  216. var err error
  217. v := reflect.New(ty)
  218. wire.ReadJSONPtr(v.Interface(), []byte(arg), &err)
  219. if err != nil {
  220. return v, err
  221. }
  222. v = v.Elem()
  223. return v, nil
  224. }
  225. // rpc.http
  226. //-----------------------------------------------------------------------------
  227. // rpc.websocket
  228. const (
  229. writeChanCapacity = 1000
  230. wsWriteTimeoutSeconds = 30 // each write times out after this
  231. wsReadTimeoutSeconds = 30 // connection times out if we haven't received *anything* in this long, not even pings.
  232. wsPingTickerSeconds = 10 // send a ping every PingTickerSeconds.
  233. )
  234. // a single websocket connection
  235. // contains listener id, underlying ws connection,
  236. // and the event switch for subscribing to events
  237. type wsConnection struct {
  238. QuitService
  239. remoteAddr string
  240. baseConn *websocket.Conn
  241. writeChan chan RPCResponse
  242. readTimeout *time.Timer
  243. pingTicker *time.Ticker
  244. funcMap map[string]*RPCFunc
  245. evsw *events.EventSwitch
  246. }
  247. // new websocket connection wrapper
  248. func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *wsConnection {
  249. wsc := &wsConnection{
  250. remoteAddr: baseConn.RemoteAddr().String(),
  251. baseConn: baseConn,
  252. writeChan: make(chan RPCResponse, writeChanCapacity), // error when full.
  253. funcMap: funcMap,
  254. evsw: evsw,
  255. }
  256. wsc.QuitService = *NewQuitService(log, "wsConnection", wsc)
  257. return wsc
  258. }
  259. // wsc.Start() blocks until the connection closes.
  260. func (wsc *wsConnection) OnStart() error {
  261. wsc.QuitService.OnStart()
  262. // Read subscriptions/unsubscriptions to events
  263. go wsc.readRoutine()
  264. // Custom Ping handler to touch readTimeout
  265. wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds)
  266. wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds)
  267. wsc.baseConn.SetPingHandler(func(m string) error {
  268. // NOTE: https://github.com/gorilla/websocket/issues/97
  269. go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
  270. wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
  271. return nil
  272. })
  273. wsc.baseConn.SetPongHandler(func(m string) error {
  274. // NOTE: https://github.com/gorilla/websocket/issues/97
  275. wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
  276. return nil
  277. })
  278. go wsc.readTimeoutRoutine()
  279. // Write responses, BLOCKING.
  280. wsc.writeRoutine()
  281. return nil
  282. }
  283. func (wsc *wsConnection) OnStop() {
  284. wsc.QuitService.OnStop()
  285. wsc.evsw.RemoveListener(wsc.remoteAddr)
  286. wsc.readTimeout.Stop()
  287. wsc.pingTicker.Stop()
  288. // The write loop closes the websocket connection
  289. // when it exits its loop, and the read loop
  290. // closes the writeChan
  291. }
  292. func (wsc *wsConnection) readTimeoutRoutine() {
  293. select {
  294. case <-wsc.readTimeout.C:
  295. log.Notice("Stopping connection due to read timeout")
  296. wsc.Stop()
  297. case <-wsc.Quit:
  298. return
  299. }
  300. }
  301. // Implements WSRPCConnection
  302. func (wsc *wsConnection) GetRemoteAddr() string {
  303. return wsc.remoteAddr
  304. }
  305. // Implements WSRPCConnection
  306. func (wsc *wsConnection) GetEventSwitch() *events.EventSwitch {
  307. return wsc.evsw
  308. }
  309. // Implements WSRPCConnection
  310. // Blocking write to writeChan until service stops.
  311. func (wsc *wsConnection) WriteRPCResponse(resp RPCResponse) {
  312. select {
  313. case <-wsc.Quit:
  314. return
  315. case wsc.writeChan <- resp:
  316. }
  317. }
  318. // Implements WSRPCConnection
  319. // Nonblocking write.
  320. func (wsc *wsConnection) TryWriteRPCResponse(resp RPCResponse) bool {
  321. select {
  322. case <-wsc.Quit:
  323. return false
  324. case wsc.writeChan <- resp:
  325. return true
  326. default:
  327. return false
  328. }
  329. }
  330. // Read from the socket and subscribe to or unsubscribe from events
  331. func (wsc *wsConnection) readRoutine() {
  332. // Do not close writeChan, to allow WriteRPCResponse() to fail.
  333. // defer close(wsc.writeChan)
  334. for {
  335. select {
  336. case <-wsc.Quit:
  337. return
  338. default:
  339. var in []byte
  340. // Do not set a deadline here like below:
  341. // wsc.baseConn.SetReadDeadline(time.Now().Add(time.Second * wsReadTimeoutSeconds))
  342. // The client may not send anything for a while.
  343. // We use `readTimeout` to handle read timeouts.
  344. _, in, err := wsc.baseConn.ReadMessage()
  345. if err != nil {
  346. log.Notice("Failed to read from connection", "remote", wsc.remoteAddr)
  347. // an error reading the connection,
  348. // kill the connection
  349. wsc.Stop()
  350. return
  351. }
  352. var request RPCRequest
  353. err = json.Unmarshal(in, &request)
  354. if err != nil {
  355. errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
  356. wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, errStr))
  357. continue
  358. }
  359. // Now, fetch the RPCFunc and execute it.
  360. rpcFunc := wsc.funcMap[request.Method]
  361. if rpcFunc == nil {
  362. wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
  363. continue
  364. }
  365. var args []reflect.Value
  366. if rpcFunc.ws {
  367. wsCtx := WSRPCContext{Request: request, WSRPCConnection: wsc}
  368. args, err = jsonParamsToArgsWS(rpcFunc, request.Params, wsCtx)
  369. } else {
  370. args, err = jsonParamsToArgs(rpcFunc, request.Params)
  371. }
  372. if err != nil {
  373. wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error()))
  374. continue
  375. }
  376. returns := rpcFunc.f.Call(args)
  377. log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns)
  378. result, err := unreflectResult(returns)
  379. if err != nil {
  380. wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error()))
  381. continue
  382. } else {
  383. wsc.WriteRPCResponse(NewRPCResponse(request.ID, result, ""))
  384. continue
  385. }
  386. }
  387. }
  388. }
  389. // receives on a write channel and writes out on the socket
  390. func (wsc *wsConnection) writeRoutine() {
  391. defer wsc.baseConn.Close()
  392. var n, err = int(0), error(nil)
  393. for {
  394. select {
  395. case <-wsc.Quit:
  396. return
  397. case <-wsc.pingTicker.C:
  398. err := wsc.baseConn.WriteMessage(websocket.PingMessage, []byte{})
  399. if err != nil {
  400. log.Error("Failed to write ping message on websocket", "error", err)
  401. wsc.Stop()
  402. return
  403. }
  404. case msg := <-wsc.writeChan:
  405. buf := new(bytes.Buffer)
  406. wire.WriteJSON(msg, buf, &n, &err)
  407. if err != nil {
  408. log.Error("Failed to marshal RPCResponse to JSON", "error", err)
  409. } else {
  410. wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds))
  411. bufBytes := buf.Bytes()
  412. if err = wsc.baseConn.WriteMessage(websocket.TextMessage, bufBytes); err != nil {
  413. log.Warn("Failed to write response on websocket", "error", err)
  414. wsc.Stop()
  415. return
  416. }
  417. }
  418. }
  419. }
  420. }
  421. //----------------------------------------
  422. // Main manager for all websocket connections
  423. // Holds the event switch
  424. // NOTE: The websocket path is defined externally, e.g. in node/node.go
  425. type WebsocketManager struct {
  426. websocket.Upgrader
  427. funcMap map[string]*RPCFunc
  428. evsw *events.EventSwitch
  429. }
  430. func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *WebsocketManager {
  431. return &WebsocketManager{
  432. funcMap: funcMap,
  433. evsw: evsw,
  434. Upgrader: websocket.Upgrader{
  435. ReadBufferSize: 1024,
  436. WriteBufferSize: 1024,
  437. CheckOrigin: func(r *http.Request) bool {
  438. // TODO
  439. return true
  440. },
  441. },
  442. }
  443. }
  444. // Upgrade the request/response (via http.Hijack) and starts the wsConnection.
  445. func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) {
  446. wsConn, err := wm.Upgrade(w, r, nil)
  447. if err != nil {
  448. // TODO - return http error
  449. log.Error("Failed to upgrade to websocket connection", "error", err)
  450. return
  451. }
  452. // register connection
  453. con := NewWSConnection(wsConn, wm.funcMap, wm.evsw)
  454. log.Notice("New websocket connection", "remote", con.remoteAddr)
  455. con.Start() // Blocking
  456. }
  457. // rpc.websocket
  458. //-----------------------------------------------------------------------------
  459. // returns is result struct and error. If error is not nil, return it
  460. func unreflectResult(returns []reflect.Value) (interface{}, error) {
  461. errV := returns[1]
  462. if errV.Interface() != nil {
  463. return nil, fmt.Errorf("%v", errV.Interface())
  464. }
  465. return returns[0].Interface(), nil
  466. }
  467. // writes a list of available rpc endpoints as an html page
  468. func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) {
  469. noArgNames := []string{}
  470. argNames := []string{}
  471. for name, funcData := range funcMap {
  472. if len(funcData.args) == 0 {
  473. noArgNames = append(noArgNames, name)
  474. } else {
  475. argNames = append(argNames, name)
  476. }
  477. }
  478. sort.Strings(noArgNames)
  479. sort.Strings(argNames)
  480. buf := new(bytes.Buffer)
  481. buf.WriteString("<html><body>")
  482. buf.WriteString("<br>Available endpoints:<br>")
  483. for _, name := range noArgNames {
  484. link := fmt.Sprintf("http://%s/%s", r.Host, name)
  485. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  486. }
  487. buf.WriteString("<br>Endpoints that require arguments:<br>")
  488. for _, name := range argNames {
  489. link := fmt.Sprintf("http://%s/%s?", r.Host, name)
  490. funcData := funcMap[name]
  491. for i, argName := range funcData.argNames {
  492. link += argName + "=_"
  493. if i < len(funcData.argNames)-1 {
  494. link += "&"
  495. }
  496. }
  497. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  498. }
  499. buf.WriteString("</body></html>")
  500. w.Header().Set("Content-Type", "text/html")
  501. w.WriteHeader(200)
  502. w.Write(buf.Bytes())
  503. }