You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

439 lines
12 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package rpcserver
  import (
  	"bytes"
  	"encoding/json"
  	"errors"
  	"fmt"
  	"html"
  	"io/ioutil"
  	"net/http"
  	"reflect"
  	"sort"
  	"time"

  	"github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/gorilla/websocket"
  	"github.com/tendermint/tendermint/binary"
  	. "github.com/tendermint/tendermint/common"
  	"github.com/tendermint/tendermint/events"
  	. "github.com/tendermint/tendermint/rpc/types"
  )
  18. func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
  19. // HTTP endpoints
  20. for funcName, rpcFunc := range funcMap {
  21. mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc))
  22. }
  23. // JSONRPC endpoints
  24. mux.HandleFunc("/", makeJSONRPCHandler(funcMap))
  25. }
  26. func RegisterEventsHandler(mux *http.ServeMux, evsw *events.EventSwitch) {
  27. // websocket endpoint
  28. wm := NewWebsocketManager(evsw)
  29. mux.HandleFunc("/events", wm.websocketHandler) // websocket.Handler(w.eventsHandler))
  30. }
  31. //-------------------------------------
  32. // function introspection
  // RPCFunc holds the reflection data collected for one rpc function so that
  // handlers can decode arguments and invoke it without re-inspecting it.
  type RPCFunc struct {
  	// f is the underlying rpc function; handlers invoke it via f.Call.
  	f reflect.Value
  	// args is the type of each function argument, in declaration order.
  	args []reflect.Type
  	// returns is the type of each return value, in declaration order.
  	returns []reflect.Type
  	// argNames is the name of each argument; the HTTP handler uses these as
  	// request parameter names.
  	argNames []string
  }
  40. // wraps a function for quicker introspection
  41. func NewRPCFunc(f interface{}, args []string) *RPCFunc {
  42. return &RPCFunc{
  43. f: reflect.ValueOf(f),
  44. args: funcArgTypes(f),
  45. returns: funcReturnTypes(f),
  46. argNames: args,
  47. }
  48. }
  // funcArgTypes returns the types of f's parameters in declaration order.
  // f must be a function value; reflect panics on any other kind.
  func funcArgTypes(f interface{}) []reflect.Type {
  	fnType := reflect.TypeOf(f)
  	argTypes := make([]reflect.Type, fnType.NumIn())
  	for idx := range argTypes {
  		argTypes[idx] = fnType.In(idx)
  	}
  	return argTypes
  }
  // funcReturnTypes returns the types of f's results in declaration order.
  // f must be a function value; reflect panics on any other kind.
  func funcReturnTypes(f interface{}) []reflect.Type {
  	fnType := reflect.TypeOf(f)
  	resultTypes := make([]reflect.Type, fnType.NumOut())
  	for idx := range resultTypes {
  		resultTypes[idx] = fnType.Out(idx)
  	}
  	return resultTypes
  }
  69. // function introspection
  70. //-----------------------------------------------------------------------------
  71. // rpc.json
  72. // jsonrpc calls grab the given method's function info and runs reflect.Call
  73. func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
  74. return func(w http.ResponseWriter, r *http.Request) {
  75. if len(r.URL.Path) > 1 {
  76. WriteRPCResponse(w, NewRPCResponse(nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
  77. return
  78. }
  79. b, _ := ioutil.ReadAll(r.Body)
  80. // if its an empty request (like from a browser),
  81. // just display a list of functions
  82. if len(b) == 0 {
  83. writeListOfEndpoints(w, r, funcMap)
  84. return
  85. }
  86. var request RPCRequest
  87. err := json.Unmarshal(b, &request)
  88. if err != nil {
  89. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  90. return
  91. }
  92. rpcFunc := funcMap[request.Method]
  93. if rpcFunc == nil {
  94. WriteRPCResponse(w, NewRPCResponse(nil, "RPC method unknown: "+request.Method))
  95. return
  96. }
  97. args, err := jsonParamsToArgs(rpcFunc, request.Params)
  98. if err != nil {
  99. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  100. return
  101. }
  102. returns := rpcFunc.f.Call(args)
  103. log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
  104. response, err := unreflectResponse(returns)
  105. if err != nil {
  106. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  107. return
  108. }
  109. WriteRPCResponse(w, NewRPCResponse(response, ""))
  110. }
  111. }
  112. // covert a list of interfaces to properly typed values
  113. func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) {
  114. if len(rpcFunc.argNames) != len(params) {
  115. return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
  116. len(rpcFunc.argNames), rpcFunc.argNames, len(params), params))
  117. }
  118. values := make([]reflect.Value, len(params))
  119. for i, p := range params {
  120. ty := rpcFunc.args[i]
  121. v, err := _jsonObjectToArg(ty, p)
  122. if err != nil {
  123. return nil, err
  124. }
  125. values[i] = v
  126. }
  127. return values, nil
  128. }
  129. func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) {
  130. var err error
  131. v := reflect.New(ty)
  132. binary.ReadJSONObjectPtr(v.Interface(), object, &err)
  133. if err != nil {
  134. return v, err
  135. }
  136. v = v.Elem()
  137. return v, nil
  138. }
  139. // rpc.json
  140. //-----------------------------------------------------------------------------
  141. // rpc.http
  142. // convert from a function name to the http handler
  143. func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) {
  144. return func(w http.ResponseWriter, r *http.Request) {
  145. args, err := httpParamsToArgs(rpcFunc, r)
  146. if err != nil {
  147. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  148. return
  149. }
  150. returns := rpcFunc.f.Call(args)
  151. log.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
  152. response, err := unreflectResponse(returns)
  153. if err != nil {
  154. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  155. return
  156. }
  157. WriteRPCResponse(w, NewRPCResponse(response, ""))
  158. }
  159. }
  160. // Covert an http query to a list of properly typed values.
  161. // To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
  162. func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) {
  163. argTypes := rpcFunc.args
  164. argNames := rpcFunc.argNames
  165. var err error
  166. values := make([]reflect.Value, len(argNames))
  167. for i, name := range argNames {
  168. ty := argTypes[i]
  169. arg := GetParam(r, name)
  170. values[i], err = _jsonStringToArg(ty, arg)
  171. if err != nil {
  172. return nil, err
  173. }
  174. }
  175. return values, nil
  176. }
  177. func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
  178. var err error
  179. v := reflect.New(ty)
  180. binary.ReadJSONPtr(v.Interface(), []byte(arg), &err)
  181. if err != nil {
  182. return v, err
  183. }
  184. v = v.Elem()
  185. return v, nil
  186. }
  187. // rpc.http
  188. //-----------------------------------------------------------------------------
  189. // rpc.websocket
  // Tunables for websocket connections.
  const (
  	// WSConnectionReaperSeconds is the interval, in seconds, at which the
  	// read loop checks the failed-send counter.
  	WSConnectionReaperSeconds = 5

  	// MaxFailedSends is the number of consecutive failed sends a connection
  	// may exceed before the reaper check stops it.
  	MaxFailedSends = 10

  	// WriteChanBufferSize is the capacity of each connection's write channel.
  	WriteChanBufferSize = 10
  )
  195. // a single websocket connection
  196. // contains listener id, underlying ws connection,
  197. // and the event switch for subscribing to events
  198. type WSConnection struct {
  199. QuitService
  200. id string
  201. wsConn *websocket.Conn
  202. writeChan chan WSResponse
  203. failedSends int
  204. evsw *events.EventSwitch
  205. }
  206. // new websocket connection wrapper
  207. func NewWSConnection(wsConn *websocket.Conn) *WSConnection {
  208. con := &WSConnection{
  209. id: wsConn.RemoteAddr().String(),
  210. wsConn: wsConn,
  211. writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full
  212. }
  213. con.QuitService = *NewQuitService(log, "WSConnection", con)
  214. return con
  215. }
  216. func (con *WSConnection) AfterStart() {
  217. // read subscriptions/unsubscriptions to events
  218. go con.read()
  219. // write responses
  220. con.write()
  221. }
  222. func (con *WSConnection) AfterStop() {
  223. con.evsw.RemoveListener(con.id)
  224. // the write loop closes the websocket connection
  225. // when it exits its loop, and the read loop
  226. // closes the writeChan
  227. }
  228. func (con *WSConnection) SetEventSwitch(evsw *events.EventSwitch) { con.evsw = evsw }
  229. // attempt to write response to writeChan and record failures
  230. func (con *WSConnection) safeWrite(resp WSResponse) {
  231. select {
  232. case con.writeChan <- resp:
  233. // yay
  234. con.failedSends = 0
  235. default:
  236. // channel is full
  237. // if this happens too many times in a row,
  238. // close connection
  239. con.failedSends += 1
  240. }
  241. }
  242. // read from the socket and subscribe to or unsubscribe from events
  243. func (con *WSConnection) read() {
  244. defer close(con.writeChan)
  245. reaper := time.Tick(time.Second * WSConnectionReaperSeconds)
  246. for {
  247. select {
  248. // TODO: this actually doesn't work
  249. // since ReadMessage blocks. Really it needs its own
  250. // go routine
  251. case <-reaper:
  252. if con.failedSends > MaxFailedSends {
  253. // sending has failed too many times.
  254. // kill the connection
  255. con.Stop()
  256. return
  257. }
  258. default:
  259. var in []byte
  260. _, in, err := con.wsConn.ReadMessage()
  261. if err != nil {
  262. // an error reading the connection,
  263. // kill the connection
  264. con.Stop()
  265. return
  266. }
  267. var req WSRequest
  268. err = json.Unmarshal(in, &req)
  269. if err != nil {
  270. errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
  271. con.safeWrite(WSResponse{Error: errStr})
  272. continue
  273. }
  274. switch req.Type {
  275. case "subscribe":
  276. log.Notice("New event subscription", "con id", con.id, "event", req.Event)
  277. con.evsw.AddListenerForEvent(con.id, req.Event, func(msg interface{}) {
  278. resp := WSResponse{
  279. Event: req.Event,
  280. Data: msg,
  281. }
  282. con.safeWrite(resp)
  283. })
  284. case "unsubscribe":
  285. if req.Event != "" {
  286. con.evsw.RemoveListenerForEvent(req.Event, con.id)
  287. } else {
  288. con.evsw.RemoveListener(con.id)
  289. }
  290. default:
  291. con.safeWrite(WSResponse{Error: "Unknown request type: " + req.Type})
  292. }
  293. }
  294. }
  295. }
  296. // receives on a write channel and writes out on the socket
  297. func (con *WSConnection) write() {
  298. defer con.wsConn.Close()
  299. n, err := new(int64), new(error)
  300. for {
  301. select {
  302. case msg := <-con.writeChan:
  303. buf := new(bytes.Buffer)
  304. binary.WriteJSON(msg, buf, n, err)
  305. if *err != nil {
  306. log.Error("Failed to marshal WSResponse to JSON", "error", err)
  307. } else {
  308. if err := con.wsConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil {
  309. log.Warn("Failed to write response on websocket", "error", err)
  310. con.Stop()
  311. return
  312. }
  313. }
  314. case <-con.Quit:
  315. return
  316. }
  317. }
  318. }
  319. // main manager for all websocket connections
  320. // holds the event switch
  321. type WebsocketManager struct {
  322. websocket.Upgrader
  323. evsw *events.EventSwitch
  324. }
  325. func NewWebsocketManager(evsw *events.EventSwitch) *WebsocketManager {
  326. return &WebsocketManager{
  327. evsw: evsw,
  328. Upgrader: websocket.Upgrader{
  329. ReadBufferSize: 1024,
  330. WriteBufferSize: 1024,
  331. CheckOrigin: func(r *http.Request) bool {
  332. // TODO
  333. return true
  334. },
  335. },
  336. }
  337. }
  338. func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Request) {
  339. wsConn, err := wm.Upgrade(w, r, nil)
  340. if err != nil {
  341. // TODO - return http error
  342. log.Error("Failed to upgrade to websocket connection", "error", err)
  343. return
  344. }
  345. // register connection
  346. con := NewWSConnection(wsConn)
  347. log.Notice("New websocket connection", "origin", con.id)
  348. con.SetEventSwitch(wm.evsw)
  349. con.Start()
  350. }
  351. // rpc.websocket
  352. //-----------------------------------------------------------------------------
  // unreflectResponse unpacks the result of a reflected rpc call. returns is
  // expected to hold the response value followed by an error value. If the
  // error is non-nil it is returned (re-wrapped via its formatted text) with a
  // nil response; otherwise the response value is returned.
  //
  // A function that yields fewer than two values produces an error instead of
  // an index-out-of-range panic.
  func unreflectResponse(returns []reflect.Value) (interface{}, error) {
  	if len(returns) < 2 {
  		return nil, fmt.Errorf("RPC function returned %d values, expected a response and an error", len(returns))
  	}

  	if callErr := returns[1].Interface(); callErr != nil {
  		return nil, fmt.Errorf("%v", callErr)
  	}
  	return returns[0].Interface(), nil
  }
  361. // writes a list of available rpc endpoints as an html page
  362. func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) {
  363. noArgNames := []string{}
  364. argNames := []string{}
  365. for name, funcData := range funcMap {
  366. if len(funcData.args) == 0 {
  367. noArgNames = append(noArgNames, name)
  368. } else {
  369. argNames = append(argNames, name)
  370. }
  371. }
  372. sort.Strings(noArgNames)
  373. sort.Strings(argNames)
  374. buf := new(bytes.Buffer)
  375. buf.WriteString("<html><body>")
  376. buf.WriteString("<br>Available endpoints:<br>")
  377. for _, name := range noArgNames {
  378. link := fmt.Sprintf("http://%s/%s", r.Host, name)
  379. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  380. }
  381. buf.WriteString("<br>Endpoints that require arguments:<br>")
  382. for _, name := range argNames {
  383. link := fmt.Sprintf("http://%s/%s?", r.Host, name)
  384. funcData := funcMap[name]
  385. for i, argName := range funcData.argNames {
  386. link += argName + "=_"
  387. if i < len(funcData.argNames)-1 {
  388. link += "&"
  389. }
  390. }
  391. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  392. }
  393. buf.WriteString("</body></html>")
  394. w.Header().Set("Content-Type", "text/html")
  395. w.WriteHeader(200)
  396. w.Write(buf.Bytes())
  397. }