You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

531 lines
15 KiB

10 years ago
10 years ago
10 years ago
9 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package rpcserver
  import (
  	"bytes"
  	"encoding/json"
  	"errors"
  	"fmt"
  	"html"
  	"io/ioutil"
  	"net/http"
  	"reflect"
  	"sort"
  	"time"

  	"github.com/gorilla/websocket"
  	. "github.com/tendermint/go-common"
  	"github.com/tendermint/go-wire"
  	"github.com/tendermint/tendermint/events"
  	ctypes "github.com/tendermint/tendermint/rpc/core/types"
  	. "github.com/tendermint/tendermint/rpc/types"
  	"github.com/tendermint/tendermint/types"
  )
  20. func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
  21. // HTTP endpoints
  22. for funcName, rpcFunc := range funcMap {
  23. mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc))
  24. }
  25. // JSONRPC endpoints
  26. mux.HandleFunc("/", makeJSONRPCHandler(funcMap))
  27. }
  28. //-------------------------------------
  29. // function introspection
  // RPCFunc holds the reflection data needed to call one RPC function
  // without re-inspecting its signature on every request.
  type RPCFunc struct {
  	// f is the underlying rpc function.
  	f reflect.Value
  	// args is the type of each function argument, in declaration order.
  	args []reflect.Type
  	// returns is the type of each return value, in declaration order.
  	returns []reflect.Type
  	// argNames is the name of each argument, as supplied at registration.
  	argNames []string
  }
  37. // wraps a function for quicker introspection
  38. func NewRPCFunc(f interface{}, args []string) *RPCFunc {
  39. return &RPCFunc{
  40. f: reflect.ValueOf(f),
  41. args: funcArgTypes(f),
  42. returns: funcReturnTypes(f),
  43. argNames: args,
  44. }
  45. }
  // funcArgTypes returns the types of f's parameters in declaration order.
  // f must be a function value; reflect panics on anything else.
  func funcArgTypes(f interface{}) []reflect.Type {
  	fnType := reflect.TypeOf(f)
  	in := make([]reflect.Type, 0, fnType.NumIn())
  	for i := 0; i < cap(in); i++ {
  		in = append(in, fnType.In(i))
  	}
  	return in
  }
  // funcReturnTypes returns the types of f's results in declaration order.
  // f must be a function value; reflect panics on anything else.
  func funcReturnTypes(f interface{}) []reflect.Type {
  	fnType := reflect.TypeOf(f)
  	out := make([]reflect.Type, 0, fnType.NumOut())
  	for i := 0; i < cap(out); i++ {
  		out = append(out, fnType.Out(i))
  	}
  	return out
  }
  66. // function introspection
  67. //-----------------------------------------------------------------------------
  68. // rpc.json
  69. // jsonrpc calls grab the given method's function info and runs reflect.Call
  70. func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
  71. return func(w http.ResponseWriter, r *http.Request) {
  72. b, _ := ioutil.ReadAll(r.Body)
  73. // if its an empty request (like from a browser),
  74. // just display a list of functions
  75. if len(b) == 0 {
  76. writeListOfEndpoints(w, r, funcMap)
  77. return
  78. }
  79. var request RPCRequest
  80. err := json.Unmarshal(b, &request)
  81. if err != nil {
  82. WriteRPCResponse(w, NewRPCResponse("", nil, err.Error()))
  83. return
  84. }
  85. if len(r.URL.Path) > 1 {
  86. WriteRPCResponse(w, NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
  87. return
  88. }
  89. rpcFunc := funcMap[request.Method]
  90. if rpcFunc == nil {
  91. WriteRPCResponse(w, NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
  92. return
  93. }
  94. args, err := jsonParamsToArgs(rpcFunc, request.Params)
  95. if err != nil {
  96. WriteRPCResponse(w, NewRPCResponse(request.ID, nil, err.Error()))
  97. return
  98. }
  99. returns := rpcFunc.f.Call(args)
  100. log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
  101. result, err := unreflectResult(returns)
  102. if err != nil {
  103. WriteRPCResponse(w, NewRPCResponse(request.ID, nil, err.Error()))
  104. return
  105. }
  106. WriteRPCResponse(w, NewRPCResponse(request.ID, result, ""))
  107. }
  108. }
  109. // covert a list of interfaces to properly typed values
  110. func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) {
  111. if len(rpcFunc.argNames) != len(params) {
  112. return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
  113. len(rpcFunc.argNames), rpcFunc.argNames, len(params), params))
  114. }
  115. values := make([]reflect.Value, len(params))
  116. for i, p := range params {
  117. ty := rpcFunc.args[i]
  118. v, err := _jsonObjectToArg(ty, p)
  119. if err != nil {
  120. return nil, err
  121. }
  122. values[i] = v
  123. }
  124. return values, nil
  125. }
  126. func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) {
  127. var err error
  128. v := reflect.New(ty)
  129. wire.ReadJSONObjectPtr(v.Interface(), object, &err)
  130. if err != nil {
  131. return v, err
  132. }
  133. v = v.Elem()
  134. return v, nil
  135. }
  136. // rpc.json
  137. //-----------------------------------------------------------------------------
  138. // rpc.http
  139. // convert from a function name to the http handler
  140. func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) {
  141. return func(w http.ResponseWriter, r *http.Request) {
  142. args, err := httpParamsToArgs(rpcFunc, r)
  143. if err != nil {
  144. WriteRPCResponse(w, NewRPCResponse("", nil, err.Error()))
  145. return
  146. }
  147. returns := rpcFunc.f.Call(args)
  148. log.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
  149. result, err := unreflectResult(returns)
  150. if err != nil {
  151. WriteRPCResponse(w, NewRPCResponse("", nil, err.Error()))
  152. return
  153. }
  154. WriteRPCResponse(w, NewRPCResponse("", result, ""))
  155. }
  156. }
  157. // Covert an http query to a list of properly typed values.
  158. // To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
  159. func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) {
  160. argTypes := rpcFunc.args
  161. argNames := rpcFunc.argNames
  162. var err error
  163. values := make([]reflect.Value, len(argNames))
  164. for i, name := range argNames {
  165. ty := argTypes[i]
  166. arg := GetParam(r, name)
  167. values[i], err = _jsonStringToArg(ty, arg)
  168. if err != nil {
  169. return nil, err
  170. }
  171. }
  172. return values, nil
  173. }
  174. func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
  175. var err error
  176. v := reflect.New(ty)
  177. wire.ReadJSONPtr(v.Interface(), []byte(arg), &err)
  178. if err != nil {
  179. return v, err
  180. }
  181. v = v.Elem()
  182. return v, nil
  183. }
  184. // rpc.http
  185. //-----------------------------------------------------------------------------
  186. // rpc.websocket
  // Tunables for websocket connections.
  const (
  	// Buffer size of each connection's outgoing response channel.
  	writeChanCapacity = 1000

  	// Each write times out after this many seconds.
  	wsWriteTimeoutSeconds = 30

  	// The connection times out if we haven't received *anything* in this
  	// many seconds, not even pings.
  	wsReadTimeoutSeconds = 30

  	// A ping is sent every wsPingTickerSeconds seconds.
  	wsPingTickerSeconds = 10
  )
  193. // a single websocket connection
  194. // contains listener id, underlying ws connection,
  195. // and the event switch for subscribing to events
  196. type WSConnection struct {
  197. QuitService
  198. id string
  199. baseConn *websocket.Conn
  200. writeChan chan RPCResponse
  201. readTimeout *time.Timer
  202. pingTicker *time.Ticker
  203. funcMap map[string]*RPCFunc
  204. evsw *events.EventSwitch
  205. }
  206. // new websocket connection wrapper
  207. func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *WSConnection {
  208. wsc := &WSConnection{
  209. id: baseConn.RemoteAddr().String(),
  210. baseConn: baseConn,
  211. writeChan: make(chan RPCResponse, writeChanCapacity), // error when full.
  212. funcMap: funcMap,
  213. evsw: evsw,
  214. }
  215. wsc.QuitService = *NewQuitService(log, "WSConnection", wsc)
  216. return wsc
  217. }
  218. // wsc.Start() blocks until the connection closes.
  219. func (wsc *WSConnection) OnStart() error {
  220. wsc.QuitService.OnStart()
  221. // Read subscriptions/unsubscriptions to events
  222. go wsc.readRoutine()
  223. // Custom Ping handler to touch readTimeout
  224. wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds)
  225. wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds)
  226. wsc.baseConn.SetPingHandler(func(m string) error {
  227. // NOTE: https://github.com/gorilla/websocket/issues/97
  228. go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
  229. wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
  230. return nil
  231. })
  232. wsc.baseConn.SetPongHandler(func(m string) error {
  233. // NOTE: https://github.com/gorilla/websocket/issues/97
  234. wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
  235. return nil
  236. })
  237. go wsc.readTimeoutRoutine()
  238. // Write responses, BLOCKING.
  239. wsc.writeRoutine()
  240. return nil
  241. }
  242. func (wsc *WSConnection) OnStop() {
  243. wsc.QuitService.OnStop()
  244. wsc.evsw.RemoveListener(wsc.id)
  245. wsc.readTimeout.Stop()
  246. wsc.pingTicker.Stop()
  247. // The write loop closes the websocket connection
  248. // when it exits its loop, and the read loop
  249. // closes the writeChan
  250. }
  251. func (wsc *WSConnection) readTimeoutRoutine() {
  252. select {
  253. case <-wsc.readTimeout.C:
  254. log.Notice("Stopping connection due to read timeout")
  255. wsc.Stop()
  256. case <-wsc.Quit:
  257. return
  258. }
  259. }
  260. // Blocking write to writeChan until service stops.
  261. func (wsc *WSConnection) writeRPCResponse(resp RPCResponse) {
  262. select {
  263. case <-wsc.Quit:
  264. return
  265. case wsc.writeChan <- resp:
  266. }
  267. }
  268. // Nonblocking write.
  269. func (wsc *WSConnection) tryWriteRPCResponse(resp RPCResponse) bool {
  270. select {
  271. case <-wsc.Quit:
  272. return false
  273. case wsc.writeChan <- resp:
  274. return true
  275. default:
  276. return false
  277. }
  278. }
  279. // Read from the socket and subscribe to or unsubscribe from events
  280. func (wsc *WSConnection) readRoutine() {
  281. // Do not close writeChan, to allow writeRPCResponse() to fail.
  282. // defer close(wsc.writeChan)
  283. for {
  284. select {
  285. case <-wsc.Quit:
  286. return
  287. default:
  288. var in []byte
  289. // Do not set a deadline here like below:
  290. // wsc.baseConn.SetReadDeadline(time.Now().Add(time.Second * wsReadTimeoutSeconds))
  291. // The client may not send anything for a while.
  292. // We use `readTimeout` to handle read timeouts.
  293. _, in, err := wsc.baseConn.ReadMessage()
  294. if err != nil {
  295. log.Notice("Failed to read from connection", "id", wsc.id)
  296. // an error reading the connection,
  297. // kill the connection
  298. wsc.Stop()
  299. return
  300. }
  301. var request RPCRequest
  302. err = json.Unmarshal(in, &request)
  303. if err != nil {
  304. errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
  305. wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, errStr))
  306. continue
  307. }
  308. switch request.Method {
  309. case "subscribe":
  310. if len(request.Params) != 1 {
  311. wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "subscribe takes 1 event parameter string"))
  312. continue
  313. }
  314. if event, ok := request.Params[0].(string); !ok {
  315. wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "subscribe takes 1 event parameter string"))
  316. continue
  317. } else {
  318. log.Notice("Subscribe to event", "id", wsc.id, "event", event)
  319. wsc.evsw.AddListenerForEvent(wsc.id, event, func(msg types.EventData) {
  320. // NOTE: EventSwitch callbacks must be nonblocking
  321. // NOTE: RPCResponses of subscribed events have id suffix "#event"
  322. wsc.tryWriteRPCResponse(NewRPCResponse(request.ID+"#event", ctypes.ResultEvent{event, msg}, ""))
  323. })
  324. continue
  325. }
  326. case "unsubscribe":
  327. if len(request.Params) == 0 {
  328. log.Notice("Unsubscribe from all events", "id", wsc.id)
  329. wsc.evsw.RemoveListener(wsc.id)
  330. wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, ""))
  331. continue
  332. } else if len(request.Params) == 1 {
  333. if event, ok := request.Params[0].(string); !ok {
  334. wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "unsubscribe takes 0 or 1 event parameter strings"))
  335. continue
  336. } else {
  337. log.Notice("Unsubscribe from event", "id", wsc.id, "event", event)
  338. wsc.evsw.RemoveListenerForEvent(event, wsc.id)
  339. wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, ""))
  340. continue
  341. }
  342. } else {
  343. wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "unsubscribe takes 0 or 1 event parameter strings"))
  344. continue
  345. }
  346. default:
  347. rpcFunc := wsc.funcMap[request.Method]
  348. if rpcFunc == nil {
  349. wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
  350. continue
  351. }
  352. args, err := jsonParamsToArgs(rpcFunc, request.Params)
  353. if err != nil {
  354. wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, err.Error()))
  355. continue
  356. }
  357. returns := rpcFunc.f.Call(args)
  358. log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns)
  359. result, err := unreflectResult(returns)
  360. if err != nil {
  361. wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, err.Error()))
  362. continue
  363. } else {
  364. wsc.writeRPCResponse(NewRPCResponse(request.ID, result, ""))
  365. continue
  366. }
  367. }
  368. }
  369. }
  370. }
  371. // receives on a write channel and writes out on the socket
  372. func (wsc *WSConnection) writeRoutine() {
  373. defer wsc.baseConn.Close()
  374. var n, err = int(0), error(nil)
  375. for {
  376. select {
  377. case <-wsc.Quit:
  378. return
  379. case <-wsc.pingTicker.C:
  380. err := wsc.baseConn.WriteMessage(websocket.PingMessage, []byte{})
  381. if err != nil {
  382. log.Error("Failed to write ping message on websocket", "error", err)
  383. wsc.Stop()
  384. return
  385. }
  386. case msg := <-wsc.writeChan:
  387. buf := new(bytes.Buffer)
  388. wire.WriteJSON(msg, buf, &n, &err)
  389. if err != nil {
  390. log.Error("Failed to marshal RPCResponse to JSON", "error", err)
  391. } else {
  392. wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds))
  393. bufBytes := buf.Bytes()
  394. if err = wsc.baseConn.WriteMessage(websocket.TextMessage, bufBytes); err != nil {
  395. log.Warn("Failed to write response on websocket", "error", err)
  396. wsc.Stop()
  397. return
  398. }
  399. }
  400. }
  401. }
  402. }
  403. //----------------------------------------
  404. // Main manager for all websocket connections
  405. // Holds the event switch
  406. // NOTE: The websocket path is defined externally, e.g. in node/node.go
  407. type WebsocketManager struct {
  408. websocket.Upgrader
  409. funcMap map[string]*RPCFunc
  410. evsw *events.EventSwitch
  411. }
  412. func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *WebsocketManager {
  413. return &WebsocketManager{
  414. funcMap: funcMap,
  415. evsw: evsw,
  416. Upgrader: websocket.Upgrader{
  417. ReadBufferSize: 1024,
  418. WriteBufferSize: 1024,
  419. CheckOrigin: func(r *http.Request) bool {
  420. // TODO
  421. return true
  422. },
  423. },
  424. }
  425. }
  426. // Upgrade the request/response (via http.Hijack) and starts the WSConnection.
  427. func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) {
  428. wsConn, err := wm.Upgrade(w, r, nil)
  429. if err != nil {
  430. // TODO - return http error
  431. log.Error("Failed to upgrade to websocket connection", "error", err)
  432. return
  433. }
  434. // register connection
  435. con := NewWSConnection(wsConn, wm.funcMap, wm.evsw)
  436. log.Notice("New websocket connection", "origin", con.id)
  437. con.Start() // Blocking
  438. }
  439. // rpc.websocket
  440. //-----------------------------------------------------------------------------
  // unreflectResult unpacks the values returned by a reflected RPC call.
  // returns must hold exactly two values, the result and an error, in that
  // order. If the error is not nil it is returned (re-wrapped through its
  // "%v" formatting) and the result is dropped; otherwise the result is
  // returned as a plain interface value.
  //
  // A slice of any other length yields an error instead of an
  // index-out-of-range panic.
  func unreflectResult(returns []reflect.Value) (interface{}, error) {
  	if len(returns) != 2 {
  		return nil, fmt.Errorf("expected 2 return values (result, error), got %d", len(returns))
  	}

  	if errI := returns[1].Interface(); errI != nil {
  		return nil, fmt.Errorf("%v", errI)
  	}
  	return returns[0].Interface(), nil
  }
  449. // writes a list of available rpc endpoints as an html page
  450. func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) {
  451. noArgNames := []string{}
  452. argNames := []string{}
  453. for name, funcData := range funcMap {
  454. if len(funcData.args) == 0 {
  455. noArgNames = append(noArgNames, name)
  456. } else {
  457. argNames = append(argNames, name)
  458. }
  459. }
  460. sort.Strings(noArgNames)
  461. sort.Strings(argNames)
  462. buf := new(bytes.Buffer)
  463. buf.WriteString("<html><body>")
  464. buf.WriteString("<br>Available endpoints:<br>")
  465. for _, name := range noArgNames {
  466. link := fmt.Sprintf("http://%s/%s", r.Host, name)
  467. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  468. }
  469. buf.WriteString("<br>Endpoints that require arguments:<br>")
  470. for _, name := range argNames {
  471. link := fmt.Sprintf("http://%s/%s?", r.Host, name)
  472. funcData := funcMap[name]
  473. for i, argName := range funcData.argNames {
  474. link += argName + "=_"
  475. if i < len(funcData.argNames)-1 {
  476. link += "&"
  477. }
  478. }
  479. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  480. }
  481. buf.WriteString("</body></html>")
  482. w.Header().Set("Content-Type", "text/html")
  483. w.WriteHeader(200)
  484. w.Write(buf.Bytes())
  485. }