You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

645 lines
18 KiB

  1. package rpcserver
  2. import (
  3. "bytes"
  4. "encoding/hex"
  5. "encoding/json"
  6. "fmt"
  7. "io/ioutil"
  8. "net/http"
  9. "reflect"
  10. "sort"
  11. "strings"
  12. "time"
  13. "github.com/gorilla/websocket"
  14. cmn "github.com/tendermint/go-common"
  15. events "github.com/tendermint/go-events"
  16. types "github.com/tendermint/go-rpc/types"
  17. wire "github.com/tendermint/go-wire"
  18. )
  19. // Adds a route for each function in the funcMap, as well as general jsonrpc and websocket handlers for all functions.
  20. // "result" is the interface on which the result objects are registered, and is popualted with every RPCResponse
  21. func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
  22. // HTTP endpoints
  23. for funcName, rpcFunc := range funcMap {
  24. mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc))
  25. }
  26. // JSONRPC endpoints
  27. mux.HandleFunc("/", makeJSONRPCHandler(funcMap))
  28. }
  29. //-------------------------------------
  30. // function introspection
  31. // holds all type information for each function
  32. type RPCFunc struct {
  33. f reflect.Value // underlying rpc function
  34. args []reflect.Type // type of each function arg
  35. returns []reflect.Type // type of each return arg
  36. argNames []string // name of each argument
  37. ws bool // websocket only
  38. }
  39. // wraps a function for quicker introspection
  40. // f is the function, args are comma separated argument names
  41. func NewRPCFunc(f interface{}, args string) *RPCFunc {
  42. return newRPCFunc(f, args, false)
  43. }
  44. func NewWSRPCFunc(f interface{}, args string) *RPCFunc {
  45. return newRPCFunc(f, args, true)
  46. }
  47. func newRPCFunc(f interface{}, args string, ws bool) *RPCFunc {
  48. var argNames []string
  49. if args != "" {
  50. argNames = strings.Split(args, ",")
  51. }
  52. return &RPCFunc{
  53. f: reflect.ValueOf(f),
  54. args: funcArgTypes(f),
  55. returns: funcReturnTypes(f),
  56. argNames: argNames,
  57. ws: ws,
  58. }
  59. }
  60. // return a function's argument types
  61. func funcArgTypes(f interface{}) []reflect.Type {
  62. t := reflect.TypeOf(f)
  63. n := t.NumIn()
  64. typez := make([]reflect.Type, n)
  65. for i := 0; i < n; i++ {
  66. typez[i] = t.In(i)
  67. }
  68. return typez
  69. }
  70. // return a function's return types
  71. func funcReturnTypes(f interface{}) []reflect.Type {
  72. t := reflect.TypeOf(f)
  73. n := t.NumOut()
  74. typez := make([]reflect.Type, n)
  75. for i := 0; i < n; i++ {
  76. typez[i] = t.Out(i)
  77. }
  78. return typez
  79. }
  80. // function introspection
  81. //-----------------------------------------------------------------------------
  82. // rpc.json
  83. // jsonrpc calls grab the given method's function info and runs reflect.Call
  84. func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
  85. return func(w http.ResponseWriter, r *http.Request) {
  86. b, _ := ioutil.ReadAll(r.Body)
  87. // if its an empty request (like from a browser),
  88. // just display a list of functions
  89. if len(b) == 0 {
  90. writeListOfEndpoints(w, r, funcMap)
  91. return
  92. }
  93. var request types.RPCRequest
  94. err := json.Unmarshal(b, &request)
  95. if err != nil {
  96. WriteRPCResponseHTTP(w, types.NewRPCResponse("", nil, fmt.Sprintf("Error unmarshalling request: %v", err.Error())))
  97. return
  98. }
  99. if len(r.URL.Path) > 1 {
  100. WriteRPCResponseHTTP(w, types.NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
  101. return
  102. }
  103. rpcFunc := funcMap[request.Method]
  104. if rpcFunc == nil {
  105. WriteRPCResponseHTTP(w, types.NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
  106. return
  107. }
  108. if rpcFunc.ws {
  109. WriteRPCResponseHTTP(w, types.NewRPCResponse(request.ID, nil, "RPC method is only for websockets: "+request.Method))
  110. return
  111. }
  112. args, err := jsonParamsToArgs(rpcFunc, request.Params)
  113. if err != nil {
  114. WriteRPCResponseHTTP(w, types.NewRPCResponse(request.ID, nil, fmt.Sprintf("Error converting json params to arguments: %v", err.Error())))
  115. return
  116. }
  117. returns := rpcFunc.f.Call(args)
  118. log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
  119. result, err := unreflectResult(returns)
  120. if err != nil {
  121. WriteRPCResponseHTTP(w, types.NewRPCResponse(request.ID, result, fmt.Sprintf("Error unreflecting result: %v", err.Error())))
  122. return
  123. }
  124. WriteRPCResponseHTTP(w, types.NewRPCResponse(request.ID, result, ""))
  125. }
  126. }
  127. // Convert a list of interfaces to properly typed values
  128. func jsonParamsToArgs(rpcFunc *RPCFunc, params map[string]interface{}) ([]reflect.Value, error) {
  129. values := make([]reflect.Value, len(rpcFunc.args))
  130. // fill each value with default
  131. for i, argType := range rpcFunc.args {
  132. values[i] = reflect.Zero(argType)
  133. }
  134. for name, param := range params {
  135. i := indexOf(name, rpcFunc.argNames)
  136. if -1 == i {
  137. return nil, fmt.Errorf("%s is not an argument (args: %v)", name, rpcFunc.argNames)
  138. }
  139. argType := rpcFunc.args[i]
  140. v, err := _jsonObjectToArg(argType, param)
  141. if err != nil {
  142. return nil, err
  143. }
  144. values[i] = v
  145. }
  146. return values, nil
  147. }
  148. // indexOf returns index of a string in a slice of strings, -1 if not found.
  149. func indexOf(value string, values []string) int {
  150. for i, v := range values {
  151. if v == value {
  152. return i
  153. }
  154. }
  155. return -1
  156. }
  157. // Same as above, but with the first param the websocket connection
  158. func jsonParamsToArgsWS(rpcFunc *RPCFunc, params map[string]interface{}, wsCtx types.WSRPCContext) ([]reflect.Value, error) {
  159. values := make([]reflect.Value, len(rpcFunc.args))
  160. values[0] = reflect.ValueOf(wsCtx)
  161. // fill each value with default
  162. for i, argType := range rpcFunc.args {
  163. values[i+1] = reflect.Zero(argType)
  164. }
  165. for name, param := range params {
  166. i := indexOf(name, rpcFunc.argNames)
  167. if -1 == i {
  168. return nil, fmt.Errorf("%s is not an argument (args: %v)", name, rpcFunc.argNames)
  169. }
  170. argType := rpcFunc.args[i+1]
  171. v, err := _jsonObjectToArg(argType, param)
  172. if err != nil {
  173. return nil, err
  174. }
  175. values[i+1] = v
  176. }
  177. return values, nil
  178. }
  179. func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) {
  180. var err error
  181. v := reflect.New(ty)
  182. wire.ReadJSONObjectPtr(v.Interface(), object, &err)
  183. if err != nil {
  184. return v, err
  185. }
  186. v = v.Elem()
  187. return v, nil
  188. }
  189. // rpc.json
  190. //-----------------------------------------------------------------------------
  191. // rpc.http
  192. // convert from a function name to the http handler
  193. func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) {
  194. // Exception for websocket endpoints
  195. if rpcFunc.ws {
  196. return func(w http.ResponseWriter, r *http.Request) {
  197. WriteRPCResponseHTTP(w, types.NewRPCResponse("", nil, "This RPC method is only for websockets"))
  198. }
  199. }
  200. // All other endpoints
  201. return func(w http.ResponseWriter, r *http.Request) {
  202. log.Debug("HTTP HANDLER", "req", r)
  203. args, err := httpParamsToArgs(rpcFunc, r)
  204. if err != nil {
  205. WriteRPCResponseHTTP(w, types.NewRPCResponse("", nil, fmt.Sprintf("Error converting http params to args: %v", err.Error())))
  206. return
  207. }
  208. returns := rpcFunc.f.Call(args)
  209. log.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
  210. result, err := unreflectResult(returns)
  211. if err != nil {
  212. WriteRPCResponseHTTP(w, types.NewRPCResponse("", nil, fmt.Sprintf("Error unreflecting result: %v", err.Error())))
  213. return
  214. }
  215. WriteRPCResponseHTTP(w, types.NewRPCResponse("", result, ""))
  216. }
  217. }
  218. // Covert an http query to a list of properly typed values.
  219. // To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
  220. func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) {
  221. values := make([]reflect.Value, len(rpcFunc.args))
  222. // fill each value with default
  223. for i, argType := range rpcFunc.args {
  224. values[i] = reflect.Zero(argType)
  225. }
  226. for i, name := range rpcFunc.argNames {
  227. argType := rpcFunc.args[i]
  228. arg := GetParam(r, name)
  229. // log.Notice("param to arg", "argType", argType, "name", name, "arg", arg)
  230. if "" == arg {
  231. continue
  232. }
  233. v, err, ok := nonJsonToArg(argType, arg)
  234. if err != nil {
  235. return nil, err
  236. }
  237. if ok {
  238. values[i] = v
  239. continue
  240. }
  241. // Pass values to go-wire
  242. values[i], err = _jsonStringToArg(argType, arg)
  243. if err != nil {
  244. return nil, err
  245. }
  246. }
  247. return values, nil
  248. }
  249. func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
  250. var err error
  251. v := reflect.New(ty)
  252. wire.ReadJSONPtr(v.Interface(), []byte(arg), &err)
  253. if err != nil {
  254. return v, err
  255. }
  256. v = v.Elem()
  257. return v, nil
  258. }
  259. func nonJsonToArg(ty reflect.Type, arg string) (reflect.Value, error, bool) {
  260. isQuotedString := strings.HasPrefix(arg, `"`) && strings.HasSuffix(arg, `"`)
  261. isHexString := strings.HasPrefix(strings.ToLower(arg), "0x")
  262. expectingString := ty.Kind() == reflect.String
  263. expectingByteSlice := ty.Kind() == reflect.Slice && ty.Elem().Kind() == reflect.Uint8
  264. if isHexString {
  265. if !expectingString && !expectingByteSlice {
  266. err := fmt.Errorf("Got a hex string arg, but expected '%s'",
  267. ty.Kind().String())
  268. return reflect.ValueOf(nil), err, false
  269. }
  270. var value []byte
  271. value, err := hex.DecodeString(arg[2:])
  272. if err != nil {
  273. return reflect.ValueOf(nil), err, false
  274. }
  275. if ty.Kind() == reflect.String {
  276. return reflect.ValueOf(string(value)), nil, true
  277. }
  278. return reflect.ValueOf([]byte(value)), nil, true
  279. }
  280. if isQuotedString && expectingByteSlice {
  281. var err error
  282. v := reflect.New(reflect.TypeOf(""))
  283. wire.ReadJSONPtr(v.Interface(), []byte(arg), &err)
  284. if err != nil {
  285. return reflect.ValueOf(nil), err, false
  286. }
  287. v = v.Elem()
  288. return reflect.ValueOf([]byte(v.String())), nil, true
  289. }
  290. return reflect.ValueOf(nil), nil, false
  291. }
  292. // rpc.http
  293. //-----------------------------------------------------------------------------
  294. // rpc.websocket
  295. const (
  296. writeChanCapacity = 1000
  297. wsWriteTimeoutSeconds = 30 // each write times out after this
  298. wsReadTimeoutSeconds = 30 // connection times out if we haven't received *anything* in this long, not even pings.
  299. wsPingTickerSeconds = 10 // send a ping every PingTickerSeconds.
  300. )
  301. // a single websocket connection
  302. // contains listener id, underlying ws connection,
  303. // and the event switch for subscribing to events
  304. type wsConnection struct {
  305. cmn.BaseService
  306. remoteAddr string
  307. baseConn *websocket.Conn
  308. writeChan chan types.RPCResponse
  309. readTimeout *time.Timer
  310. pingTicker *time.Ticker
  311. funcMap map[string]*RPCFunc
  312. evsw events.EventSwitch
  313. }
  314. // new websocket connection wrapper
  315. func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw events.EventSwitch) *wsConnection {
  316. wsc := &wsConnection{
  317. remoteAddr: baseConn.RemoteAddr().String(),
  318. baseConn: baseConn,
  319. writeChan: make(chan types.RPCResponse, writeChanCapacity), // error when full.
  320. funcMap: funcMap,
  321. evsw: evsw,
  322. }
  323. wsc.BaseService = *cmn.NewBaseService(log, "wsConnection", wsc)
  324. return wsc
  325. }
  326. // wsc.Start() blocks until the connection closes.
  327. func (wsc *wsConnection) OnStart() error {
  328. wsc.BaseService.OnStart()
  329. // Read subscriptions/unsubscriptions to events
  330. go wsc.readRoutine()
  331. // Custom Ping handler to touch readTimeout
  332. wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds)
  333. wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds)
  334. wsc.baseConn.SetPingHandler(func(m string) error {
  335. // NOTE: https://github.com/gorilla/websocket/issues/97
  336. go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
  337. wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
  338. return nil
  339. })
  340. wsc.baseConn.SetPongHandler(func(m string) error {
  341. // NOTE: https://github.com/gorilla/websocket/issues/97
  342. wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
  343. return nil
  344. })
  345. go wsc.readTimeoutRoutine()
  346. // Write responses, BLOCKING.
  347. wsc.writeRoutine()
  348. return nil
  349. }
  350. func (wsc *wsConnection) OnStop() {
  351. wsc.BaseService.OnStop()
  352. wsc.evsw.RemoveListener(wsc.remoteAddr)
  353. wsc.readTimeout.Stop()
  354. wsc.pingTicker.Stop()
  355. // The write loop closes the websocket connection
  356. // when it exits its loop, and the read loop
  357. // closes the writeChan
  358. }
  359. func (wsc *wsConnection) readTimeoutRoutine() {
  360. select {
  361. case <-wsc.readTimeout.C:
  362. log.Notice("Stopping connection due to read timeout")
  363. wsc.Stop()
  364. case <-wsc.Quit:
  365. return
  366. }
  367. }
  368. // Implements WSRPCConnection
  369. func (wsc *wsConnection) GetRemoteAddr() string {
  370. return wsc.remoteAddr
  371. }
  372. // Implements WSRPCConnection
  373. func (wsc *wsConnection) GetEventSwitch() events.EventSwitch {
  374. return wsc.evsw
  375. }
  376. // Implements WSRPCConnection
  377. // Blocking write to writeChan until service stops.
  378. // Goroutine-safe
  379. func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
  380. select {
  381. case <-wsc.Quit:
  382. return
  383. case wsc.writeChan <- resp:
  384. }
  385. }
  386. // Implements WSRPCConnection
  387. // Nonblocking write.
  388. // Goroutine-safe
  389. func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool {
  390. select {
  391. case <-wsc.Quit:
  392. return false
  393. case wsc.writeChan <- resp:
  394. return true
  395. default:
  396. return false
  397. }
  398. }
  399. // Read from the socket and subscribe to or unsubscribe from events
  400. func (wsc *wsConnection) readRoutine() {
  401. // Do not close writeChan, to allow WriteRPCResponse() to fail.
  402. // defer close(wsc.writeChan)
  403. for {
  404. select {
  405. case <-wsc.Quit:
  406. return
  407. default:
  408. var in []byte
  409. // Do not set a deadline here like below:
  410. // wsc.baseConn.SetReadDeadline(time.Now().Add(time.Second * wsReadTimeoutSeconds))
  411. // The client may not send anything for a while.
  412. // We use `readTimeout` to handle read timeouts.
  413. _, in, err := wsc.baseConn.ReadMessage()
  414. if err != nil {
  415. log.Notice("Failed to read from connection", "remote", wsc.remoteAddr, "err", err.Error())
  416. // an error reading the connection,
  417. // kill the connection
  418. wsc.Stop()
  419. return
  420. }
  421. var request types.RPCRequest
  422. err = json.Unmarshal(in, &request)
  423. if err != nil {
  424. errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
  425. wsc.WriteRPCResponse(types.NewRPCResponse(request.ID, nil, errStr))
  426. continue
  427. }
  428. // Now, fetch the RPCFunc and execute it.
  429. rpcFunc := wsc.funcMap[request.Method]
  430. if rpcFunc == nil {
  431. wsc.WriteRPCResponse(types.NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
  432. continue
  433. }
  434. var args []reflect.Value
  435. if rpcFunc.ws {
  436. wsCtx := types.WSRPCContext{Request: request, WSRPCConnection: wsc}
  437. args, err = jsonParamsToArgsWS(rpcFunc, request.Params, wsCtx)
  438. } else {
  439. args, err = jsonParamsToArgs(rpcFunc, request.Params)
  440. }
  441. if err != nil {
  442. wsc.WriteRPCResponse(types.NewRPCResponse(request.ID, nil, err.Error()))
  443. continue
  444. }
  445. returns := rpcFunc.f.Call(args)
  446. log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns)
  447. result, err := unreflectResult(returns)
  448. if err != nil {
  449. wsc.WriteRPCResponse(types.NewRPCResponse(request.ID, nil, err.Error()))
  450. continue
  451. } else {
  452. wsc.WriteRPCResponse(types.NewRPCResponse(request.ID, result, ""))
  453. continue
  454. }
  455. }
  456. }
  457. }
  458. // receives on a write channel and writes out on the socket
  459. func (wsc *wsConnection) writeRoutine() {
  460. defer wsc.baseConn.Close()
  461. for {
  462. select {
  463. case <-wsc.Quit:
  464. return
  465. case <-wsc.pingTicker.C:
  466. err := wsc.baseConn.WriteMessage(websocket.PingMessage, []byte{})
  467. if err != nil {
  468. log.Error("Failed to write ping message on websocket", "error", err)
  469. wsc.Stop()
  470. return
  471. }
  472. case msg := <-wsc.writeChan:
  473. jsonBytes, err := json.Marshal(msg)
  474. if err != nil {
  475. log.Error("Failed to marshal RPCResponse to JSON", "error", err)
  476. } else {
  477. wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds))
  478. if err = wsc.baseConn.WriteMessage(websocket.TextMessage, jsonBytes); err != nil {
  479. log.Warn("Failed to write response on websocket", "error", err)
  480. wsc.Stop()
  481. return
  482. }
  483. }
  484. }
  485. }
  486. }
  487. //----------------------------------------
  488. // Main manager for all websocket connections
  489. // Holds the event switch
  490. // NOTE: The websocket path is defined externally, e.g. in node/node.go
  491. type WebsocketManager struct {
  492. websocket.Upgrader
  493. funcMap map[string]*RPCFunc
  494. evsw events.EventSwitch
  495. }
  496. func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch) *WebsocketManager {
  497. return &WebsocketManager{
  498. funcMap: funcMap,
  499. evsw: evsw,
  500. Upgrader: websocket.Upgrader{
  501. ReadBufferSize: 1024,
  502. WriteBufferSize: 1024,
  503. CheckOrigin: func(r *http.Request) bool {
  504. // TODO
  505. return true
  506. },
  507. },
  508. }
  509. }
  510. // Upgrade the request/response (via http.Hijack) and starts the wsConnection.
  511. func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) {
  512. wsConn, err := wm.Upgrade(w, r, nil)
  513. if err != nil {
  514. // TODO - return http error
  515. log.Error("Failed to upgrade to websocket connection", "error", err)
  516. return
  517. }
  518. // register connection
  519. con := NewWSConnection(wsConn, wm.funcMap, wm.evsw)
  520. log.Notice("New websocket connection", "remote", con.remoteAddr)
  521. con.Start() // Blocking
  522. }
  523. // rpc.websocket
  524. //-----------------------------------------------------------------------------
  525. // NOTE: assume returns is result struct and error. If error is not nil, return it
  526. func unreflectResult(returns []reflect.Value) (interface{}, error) {
  527. errV := returns[1]
  528. if errV.Interface() != nil {
  529. return nil, fmt.Errorf("%v", errV.Interface())
  530. }
  531. rv := returns[0]
  532. // the result is a registered interface,
  533. // we need a pointer to it so we can marshal with type byte
  534. rvp := reflect.New(rv.Type())
  535. rvp.Elem().Set(rv)
  536. return rvp.Interface(), nil
  537. }
  538. // writes a list of available rpc endpoints as an html page
  539. func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) {
  540. noArgNames := []string{}
  541. argNames := []string{}
  542. for name, funcData := range funcMap {
  543. if len(funcData.args) == 0 {
  544. noArgNames = append(noArgNames, name)
  545. } else {
  546. argNames = append(argNames, name)
  547. }
  548. }
  549. sort.Strings(noArgNames)
  550. sort.Strings(argNames)
  551. buf := new(bytes.Buffer)
  552. buf.WriteString("<html><body>")
  553. buf.WriteString("<br>Available endpoints:<br>")
  554. for _, name := range noArgNames {
  555. link := fmt.Sprintf("http://%s/%s", r.Host, name)
  556. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  557. }
  558. buf.WriteString("<br>Endpoints that require arguments:<br>")
  559. for _, name := range argNames {
  560. link := fmt.Sprintf("http://%s/%s?", r.Host, name)
  561. funcData := funcMap[name]
  562. for i, argName := range funcData.argNames {
  563. link += argName + "=_"
  564. if i < len(funcData.argNames)-1 {
  565. link += "&"
  566. }
  567. }
  568. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  569. }
  570. buf.WriteString("</body></html>")
  571. w.Header().Set("Content-Type", "text/html")
  572. w.WriteHeader(200)
  573. w.Write(buf.Bytes())
  574. }