You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

553 lines
15 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package rpcserver
  import (
  	"bytes"
  	"encoding/json"
  	"errors"
  	"fmt"
  	"html"
  	"io/ioutil"
  	"net/http"
  	"reflect"
  	"sort"
  	"time"

  	"github.com/gorilla/websocket"
  	. "github.com/tendermint/go-common"
  	"github.com/tendermint/go-wire"
  	"github.com/tendermint/tendermint/events"
  	. "github.com/tendermint/tendermint/rpc/types"
  )
  18. func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
  19. // HTTP endpoints
  20. for funcName, rpcFunc := range funcMap {
  21. mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc))
  22. }
  23. // JSONRPC endpoints
  24. mux.HandleFunc("/", makeJSONRPCHandler(funcMap))
  25. }
  26. //-------------------------------------
  27. // function introspection
  // RPCFunc caches the reflection data of one registered rpc function so it
  // does not have to be recomputed on every call.
  type RPCFunc struct {
  	// f is the underlying rpc function, invoked through reflect.Value.Call.
  	f reflect.Value
  	// args holds the type of each function argument, in declaration order.
  	args []reflect.Type
  	// returns holds the type of each return value, in declaration order.
  	returns []reflect.Type
  	// argNames holds the name of each argument.
  	argNames []string
  	// ws marks a function that may only be called over a websocket.
  	ws bool
  }
  36. // wraps a function for quicker introspection
  37. func NewRPCFunc(f interface{}, args []string) *RPCFunc {
  38. return &RPCFunc{
  39. f: reflect.ValueOf(f),
  40. args: funcArgTypes(f),
  41. returns: funcReturnTypes(f),
  42. argNames: args,
  43. ws: false,
  44. }
  45. }
  46. func NewWSRPCFunc(f interface{}, args []string) *RPCFunc {
  47. return &RPCFunc{
  48. f: reflect.ValueOf(f),
  49. args: funcArgTypes(f),
  50. returns: funcReturnTypes(f),
  51. argNames: args,
  52. ws: true,
  53. }
  54. }
  // funcArgTypes returns the types of f's arguments in declaration order.
  // f must be a function; reflect panics otherwise.
  func funcArgTypes(f interface{}) []reflect.Type {
  	fnType := reflect.TypeOf(f)
  	argTypes := make([]reflect.Type, fnType.NumIn())
  	for idx := range argTypes {
  		argTypes[idx] = fnType.In(idx)
  	}
  	return argTypes
  }
  // funcReturnTypes returns the types of f's return values in declaration
  // order. f must be a function; reflect panics otherwise.
  func funcReturnTypes(f interface{}) []reflect.Type {
  	fnType := reflect.TypeOf(f)
  	outTypes := make([]reflect.Type, fnType.NumOut())
  	for idx := range outTypes {
  		outTypes[idx] = fnType.Out(idx)
  	}
  	return outTypes
  }
  75. // function introspection
  76. //-----------------------------------------------------------------------------
  77. // rpc.json
  78. // jsonrpc calls grab the given method's function info and runs reflect.Call
  79. func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
  80. return func(w http.ResponseWriter, r *http.Request) {
  81. b, _ := ioutil.ReadAll(r.Body)
  82. // if its an empty request (like from a browser),
  83. // just display a list of functions
  84. if len(b) == 0 {
  85. writeListOfEndpoints(w, r, funcMap)
  86. return
  87. }
  88. var request RPCRequest
  89. err := json.Unmarshal(b, &request)
  90. if err != nil {
  91. WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error()))
  92. return
  93. }
  94. if len(r.URL.Path) > 1 {
  95. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
  96. return
  97. }
  98. rpcFunc := funcMap[request.Method]
  99. if rpcFunc == nil {
  100. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
  101. return
  102. }
  103. if rpcFunc.ws {
  104. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method is only for websockets: "+request.Method))
  105. return
  106. }
  107. args, err := jsonParamsToArgs(rpcFunc, request.Params)
  108. if err != nil {
  109. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error()))
  110. return
  111. }
  112. returns := rpcFunc.f.Call(args)
  113. log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
  114. result, err := unreflectResult(returns)
  115. if err != nil {
  116. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error()))
  117. return
  118. }
  119. WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, result, ""))
  120. }
  121. }
  122. // Convert a list of interfaces to properly typed values
  123. func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) {
  124. if len(rpcFunc.argNames) != len(params) {
  125. return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
  126. len(rpcFunc.argNames), rpcFunc.argNames, len(params), params))
  127. }
  128. values := make([]reflect.Value, len(params))
  129. for i, p := range params {
  130. ty := rpcFunc.args[i]
  131. v, err := _jsonObjectToArg(ty, p)
  132. if err != nil {
  133. return nil, err
  134. }
  135. values[i] = v
  136. }
  137. return values, nil
  138. }
  139. // Same as above, but with the first param the websocket connection
  140. func jsonParamsToArgsWS(rpcFunc *RPCFunc, params []interface{}, wsCtx WSRPCContext) ([]reflect.Value, error) {
  141. if len(rpcFunc.argNames)-1 != len(params) {
  142. return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
  143. len(rpcFunc.argNames)-1, rpcFunc.argNames[1:], len(params), params))
  144. }
  145. values := make([]reflect.Value, len(params)+1)
  146. values[0] = reflect.ValueOf(wsCtx)
  147. for i, p := range params {
  148. ty := rpcFunc.args[i+1]
  149. v, err := _jsonObjectToArg(ty, p)
  150. if err != nil {
  151. return nil, err
  152. }
  153. values[i+1] = v
  154. }
  155. return values, nil
  156. }
  157. func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) {
  158. var err error
  159. v := reflect.New(ty)
  160. wire.ReadJSONObjectPtr(v.Interface(), object, &err)
  161. if err != nil {
  162. return v, err
  163. }
  164. v = v.Elem()
  165. return v, nil
  166. }
  167. // rpc.json
  168. //-----------------------------------------------------------------------------
  169. // rpc.http
  170. // convert from a function name to the http handler
  171. func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) {
  172. // Exception for websocket endpoints
  173. if rpcFunc.ws {
  174. return func(w http.ResponseWriter, r *http.Request) {
  175. WriteRPCResponseHTTP(w, NewRPCResponse("", nil, "This RPC method is only for websockets"))
  176. }
  177. }
  178. // All other endpoints
  179. return func(w http.ResponseWriter, r *http.Request) {
  180. args, err := httpParamsToArgs(rpcFunc, r)
  181. if err != nil {
  182. WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error()))
  183. return
  184. }
  185. returns := rpcFunc.f.Call(args)
  186. log.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
  187. result, err := unreflectResult(returns)
  188. if err != nil {
  189. WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error()))
  190. return
  191. }
  192. WriteRPCResponseHTTP(w, NewRPCResponse("", result, ""))
  193. }
  194. }
  195. // Covert an http query to a list of properly typed values.
  196. // To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
  197. func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) {
  198. argTypes := rpcFunc.args
  199. argNames := rpcFunc.argNames
  200. var err error
  201. values := make([]reflect.Value, len(argNames))
  202. for i, name := range argNames {
  203. ty := argTypes[i]
  204. arg := GetParam(r, name)
  205. values[i], err = _jsonStringToArg(ty, arg)
  206. if err != nil {
  207. return nil, err
  208. }
  209. }
  210. return values, nil
  211. }
  212. func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
  213. var err error
  214. v := reflect.New(ty)
  215. wire.ReadJSONPtr(v.Interface(), []byte(arg), &err)
  216. if err != nil {
  217. return v, err
  218. }
  219. v = v.Elem()
  220. return v, nil
  221. }
  222. // rpc.http
  223. //-----------------------------------------------------------------------------
  224. // rpc.websocket
  const (
  	// capacity of each connection's outgoing response channel
  	writeChanCapacity = 1000

  	// each write times out after this
  	wsWriteTimeoutSeconds = 30

  	// connection times out if we haven't received *anything* in this long,
  	// not even pings.
  	wsReadTimeoutSeconds = 30

  	// send a ping every PingTickerSeconds.
  	wsPingTickerSeconds = 10
  )
  231. // a single websocket connection
  232. // contains listener id, underlying ws connection,
  233. // and the event switch for subscribing to events
  234. type wsConnection struct {
  235. QuitService
  236. remoteAddr string
  237. baseConn *websocket.Conn
  238. writeChan chan RPCResponse
  239. readTimeout *time.Timer
  240. pingTicker *time.Ticker
  241. funcMap map[string]*RPCFunc
  242. evsw *events.EventSwitch
  243. }
  244. // new websocket connection wrapper
  245. func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *wsConnection {
  246. wsc := &wsConnection{
  247. remoteAddr: baseConn.RemoteAddr().String(),
  248. baseConn: baseConn,
  249. writeChan: make(chan RPCResponse, writeChanCapacity), // error when full.
  250. funcMap: funcMap,
  251. evsw: evsw,
  252. }
  253. wsc.QuitService = *NewQuitService(log, "wsConnection", wsc)
  254. return wsc
  255. }
  256. // wsc.Start() blocks until the connection closes.
  257. func (wsc *wsConnection) OnStart() error {
  258. wsc.QuitService.OnStart()
  259. // Read subscriptions/unsubscriptions to events
  260. go wsc.readRoutine()
  261. // Custom Ping handler to touch readTimeout
  262. wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds)
  263. wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds)
  264. wsc.baseConn.SetPingHandler(func(m string) error {
  265. // NOTE: https://github.com/gorilla/websocket/issues/97
  266. go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
  267. wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
  268. return nil
  269. })
  270. wsc.baseConn.SetPongHandler(func(m string) error {
  271. // NOTE: https://github.com/gorilla/websocket/issues/97
  272. wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
  273. return nil
  274. })
  275. go wsc.readTimeoutRoutine()
  276. // Write responses, BLOCKING.
  277. wsc.writeRoutine()
  278. return nil
  279. }
  280. func (wsc *wsConnection) OnStop() {
  281. wsc.QuitService.OnStop()
  282. wsc.evsw.RemoveListener(wsc.remoteAddr)
  283. wsc.readTimeout.Stop()
  284. wsc.pingTicker.Stop()
  285. // The write loop closes the websocket connection
  286. // when it exits its loop, and the read loop
  287. // closes the writeChan
  288. }
  289. func (wsc *wsConnection) readTimeoutRoutine() {
  290. select {
  291. case <-wsc.readTimeout.C:
  292. log.Notice("Stopping connection due to read timeout")
  293. wsc.Stop()
  294. case <-wsc.Quit:
  295. return
  296. }
  297. }
  298. // Implements WSRPCConnection
  299. func (wsc *wsConnection) GetRemoteAddr() string {
  300. return wsc.remoteAddr
  301. }
  302. // Implements WSRPCConnection
  303. func (wsc *wsConnection) GetEventSwitch() *events.EventSwitch {
  304. return wsc.evsw
  305. }
  306. // Implements WSRPCConnection
  307. // Blocking write to writeChan until service stops.
  308. func (wsc *wsConnection) WriteRPCResponse(resp RPCResponse) {
  309. select {
  310. case <-wsc.Quit:
  311. return
  312. case wsc.writeChan <- resp:
  313. }
  314. }
  315. // Implements WSRPCConnection
  316. // Nonblocking write.
  317. func (wsc *wsConnection) TryWriteRPCResponse(resp RPCResponse) bool {
  318. select {
  319. case <-wsc.Quit:
  320. return false
  321. case wsc.writeChan <- resp:
  322. return true
  323. default:
  324. return false
  325. }
  326. }
  327. // Read from the socket and subscribe to or unsubscribe from events
  328. func (wsc *wsConnection) readRoutine() {
  329. // Do not close writeChan, to allow WriteRPCResponse() to fail.
  330. // defer close(wsc.writeChan)
  331. for {
  332. select {
  333. case <-wsc.Quit:
  334. return
  335. default:
  336. var in []byte
  337. // Do not set a deadline here like below:
  338. // wsc.baseConn.SetReadDeadline(time.Now().Add(time.Second * wsReadTimeoutSeconds))
  339. // The client may not send anything for a while.
  340. // We use `readTimeout` to handle read timeouts.
  341. _, in, err := wsc.baseConn.ReadMessage()
  342. if err != nil {
  343. log.Notice("Failed to read from connection", "remote", wsc.remoteAddr)
  344. // an error reading the connection,
  345. // kill the connection
  346. wsc.Stop()
  347. return
  348. }
  349. var request RPCRequest
  350. err = json.Unmarshal(in, &request)
  351. if err != nil {
  352. errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
  353. wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, errStr))
  354. continue
  355. }
  356. // Now, fetch the RPCFunc and execute it.
  357. rpcFunc := wsc.funcMap[request.Method]
  358. if rpcFunc == nil {
  359. wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
  360. continue
  361. }
  362. var args []reflect.Value
  363. if rpcFunc.ws {
  364. wsCtx := WSRPCContext{Request: request, WSRPCConnection: wsc}
  365. args, err = jsonParamsToArgsWS(rpcFunc, request.Params, wsCtx)
  366. } else {
  367. args, err = jsonParamsToArgs(rpcFunc, request.Params)
  368. }
  369. if err != nil {
  370. wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error()))
  371. continue
  372. }
  373. returns := rpcFunc.f.Call(args)
  374. log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns)
  375. result, err := unreflectResult(returns)
  376. if err != nil {
  377. wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error()))
  378. continue
  379. } else {
  380. wsc.WriteRPCResponse(NewRPCResponse(request.ID, result, ""))
  381. continue
  382. }
  383. }
  384. }
  385. }
  386. // receives on a write channel and writes out on the socket
  387. func (wsc *wsConnection) writeRoutine() {
  388. defer wsc.baseConn.Close()
  389. var n, err = int(0), error(nil)
  390. for {
  391. select {
  392. case <-wsc.Quit:
  393. return
  394. case <-wsc.pingTicker.C:
  395. err := wsc.baseConn.WriteMessage(websocket.PingMessage, []byte{})
  396. if err != nil {
  397. log.Error("Failed to write ping message on websocket", "error", err)
  398. wsc.Stop()
  399. return
  400. }
  401. case msg := <-wsc.writeChan:
  402. buf := new(bytes.Buffer)
  403. wire.WriteJSON(msg, buf, &n, &err)
  404. if err != nil {
  405. log.Error("Failed to marshal RPCResponse to JSON", "error", err)
  406. } else {
  407. wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds))
  408. bufBytes := buf.Bytes()
  409. if err = wsc.baseConn.WriteMessage(websocket.TextMessage, bufBytes); err != nil {
  410. log.Warn("Failed to write response on websocket", "error", err)
  411. wsc.Stop()
  412. return
  413. }
  414. }
  415. }
  416. }
  417. }
  418. //----------------------------------------
  419. // Main manager for all websocket connections
  420. // Holds the event switch
  421. // NOTE: The websocket path is defined externally, e.g. in node/node.go
  422. type WebsocketManager struct {
  423. websocket.Upgrader
  424. funcMap map[string]*RPCFunc
  425. evsw *events.EventSwitch
  426. }
  427. func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *WebsocketManager {
  428. return &WebsocketManager{
  429. funcMap: funcMap,
  430. evsw: evsw,
  431. Upgrader: websocket.Upgrader{
  432. ReadBufferSize: 1024,
  433. WriteBufferSize: 1024,
  434. CheckOrigin: func(r *http.Request) bool {
  435. // TODO
  436. return true
  437. },
  438. },
  439. }
  440. }
  441. // Upgrade the request/response (via http.Hijack) and starts the wsConnection.
  442. func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) {
  443. wsConn, err := wm.Upgrade(w, r, nil)
  444. if err != nil {
  445. // TODO - return http error
  446. log.Error("Failed to upgrade to websocket connection", "error", err)
  447. return
  448. }
  449. // register connection
  450. con := NewWSConnection(wsConn, wm.funcMap, wm.evsw)
  451. log.Notice("New websocket connection", "remote", con.remoteAddr)
  452. con.Start() // Blocking
  453. }
  454. // rpc.websocket
  455. //-----------------------------------------------------------------------------
  // unreflectResult unpacks the values returned by a reflected rpc call.
  // returns is expected to hold the result followed by an error. If the
  // error is not nil it is returned (re-wrapped through its %v form) with a
  // nil result; otherwise the result is returned.
  //
  // Fewer than two return values yields an error instead of an index panic.
  func unreflectResult(returns []reflect.Value) (interface{}, error) {
  	if len(returns) < 2 {
  		return nil, fmt.Errorf("Expected RPC function to return (result, error), got %v return values", len(returns))
  	}
  	if errIface := returns[1].Interface(); errIface != nil {
  		return nil, fmt.Errorf("%v", errIface)
  	}
  	return returns[0].Interface(), nil
  }
  464. // writes a list of available rpc endpoints as an html page
  465. func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) {
  466. noArgNames := []string{}
  467. argNames := []string{}
  468. for name, funcData := range funcMap {
  469. if len(funcData.args) == 0 {
  470. noArgNames = append(noArgNames, name)
  471. } else {
  472. argNames = append(argNames, name)
  473. }
  474. }
  475. sort.Strings(noArgNames)
  476. sort.Strings(argNames)
  477. buf := new(bytes.Buffer)
  478. buf.WriteString("<html><body>")
  479. buf.WriteString("<br>Available endpoints:<br>")
  480. for _, name := range noArgNames {
  481. link := fmt.Sprintf("http://%s/%s", r.Host, name)
  482. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  483. }
  484. buf.WriteString("<br>Endpoints that require arguments:<br>")
  485. for _, name := range argNames {
  486. link := fmt.Sprintf("http://%s/%s?", r.Host, name)
  487. funcData := funcMap[name]
  488. for i, argName := range funcData.argNames {
  489. link += argName + "=_"
  490. if i < len(funcData.argNames)-1 {
  491. link += "&"
  492. }
  493. }
  494. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  495. }
  496. buf.WriteString("</body></html>")
  497. w.Header().Set("Content-Type", "text/html")
  498. w.WriteHeader(200)
  499. w.Write(buf.Bytes())
  500. }