You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

445 lines
12 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package rpcserver
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "net/http"
  9. "reflect"
  10. "sort"
  11. "sync/atomic"
  12. "time"
  13. "github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/gorilla/websocket"
  14. "github.com/tendermint/tendermint/binary"
  15. "github.com/tendermint/tendermint/events"
  16. . "github.com/tendermint/tendermint/rpc/types"
  17. )
  18. func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
  19. // HTTP endpoints
  20. for funcName, rpcFunc := range funcMap {
  21. mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc))
  22. }
  23. // JSONRPC endpoints
  24. mux.HandleFunc("/", makeJSONRPCHandler(funcMap))
  25. }
  26. func RegisterEventsHandler(mux *http.ServeMux, evsw *events.EventSwitch) {
  27. // websocket endpoint
  28. wm := NewWebsocketManager(evsw)
  29. mux.HandleFunc("/events", wm.websocketHandler) // websocket.Handler(w.eventsHandler))
  30. }
  31. //-------------------------------------
  32. // function introspection
  33. // holds all type information for each function
  34. type RPCFunc struct {
  35. f reflect.Value // underlying rpc function
  36. args []reflect.Type // type of each function arg
  37. returns []reflect.Type // type of each return arg
  38. argNames []string // name of each argument
  39. }
  40. // wraps a function for quicker introspection
  41. func NewRPCFunc(f interface{}, args []string) *RPCFunc {
  42. return &RPCFunc{
  43. f: reflect.ValueOf(f),
  44. args: funcArgTypes(f),
  45. returns: funcReturnTypes(f),
  46. argNames: args,
  47. }
  48. }
  49. // return a function's argument types
  50. func funcArgTypes(f interface{}) []reflect.Type {
  51. t := reflect.TypeOf(f)
  52. n := t.NumIn()
  53. types := make([]reflect.Type, n)
  54. for i := 0; i < n; i++ {
  55. types[i] = t.In(i)
  56. }
  57. return types
  58. }
  59. // return a function's return types
  60. func funcReturnTypes(f interface{}) []reflect.Type {
  61. t := reflect.TypeOf(f)
  62. n := t.NumOut()
  63. types := make([]reflect.Type, n)
  64. for i := 0; i < n; i++ {
  65. types[i] = t.Out(i)
  66. }
  67. return types
  68. }
  69. // function introspection
  70. //-----------------------------------------------------------------------------
  71. // rpc.json
  72. // jsonrpc calls grab the given method's function info and runs reflect.Call
  73. func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
  74. return func(w http.ResponseWriter, r *http.Request) {
  75. if len(r.URL.Path) > 1 {
  76. WriteRPCResponse(w, NewRPCResponse(nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
  77. return
  78. }
  79. b, _ := ioutil.ReadAll(r.Body)
  80. // if its an empty request (like from a browser),
  81. // just display a list of functions
  82. if len(b) == 0 {
  83. writeListOfEndpoints(w, r, funcMap)
  84. return
  85. }
  86. var request RPCRequest
  87. err := json.Unmarshal(b, &request)
  88. if err != nil {
  89. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  90. return
  91. }
  92. rpcFunc := funcMap[request.Method]
  93. if rpcFunc == nil {
  94. WriteRPCResponse(w, NewRPCResponse(nil, "RPC method unknown: "+request.Method))
  95. return
  96. }
  97. args, err := jsonParamsToArgs(rpcFunc, request.Params)
  98. if err != nil {
  99. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  100. return
  101. }
  102. returns := rpcFunc.f.Call(args)
  103. log.Debug("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
  104. response, err := unreflectResponse(returns)
  105. if err != nil {
  106. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  107. return
  108. }
  109. WriteRPCResponse(w, NewRPCResponse(response, ""))
  110. }
  111. }
  112. // covert a list of interfaces to properly typed values
  113. func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) {
  114. if len(rpcFunc.argNames) != len(params) {
  115. return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
  116. len(rpcFunc.argNames), rpcFunc.argNames, len(params), params))
  117. }
  118. values := make([]reflect.Value, len(params))
  119. for i, p := range params {
  120. ty := rpcFunc.args[i]
  121. v, err := _jsonObjectToArg(ty, p)
  122. if err != nil {
  123. return nil, err
  124. }
  125. values[i] = v
  126. }
  127. return values, nil
  128. }
  129. func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) {
  130. var err error
  131. v := reflect.New(ty)
  132. binary.ReadJSONObjectPtr(v.Interface(), object, &err)
  133. if err != nil {
  134. return v, err
  135. }
  136. v = v.Elem()
  137. return v, nil
  138. }
  139. // rpc.json
  140. //-----------------------------------------------------------------------------
  141. // rpc.http
  142. // convert from a function name to the http handler
  143. func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) {
  144. return func(w http.ResponseWriter, r *http.Request) {
  145. args, err := httpParamsToArgs(rpcFunc, r)
  146. if err != nil {
  147. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  148. return
  149. }
  150. returns := rpcFunc.f.Call(args)
  151. log.Debug("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
  152. response, err := unreflectResponse(returns)
  153. if err != nil {
  154. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  155. return
  156. }
  157. WriteRPCResponse(w, NewRPCResponse(response, ""))
  158. }
  159. }
  160. // Covert an http query to a list of properly typed values.
  161. // To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
  162. func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) {
  163. argTypes := rpcFunc.args
  164. argNames := rpcFunc.argNames
  165. var err error
  166. values := make([]reflect.Value, len(argNames))
  167. for i, name := range argNames {
  168. ty := argTypes[i]
  169. arg := GetParam(r, name)
  170. values[i], err = _jsonStringToArg(ty, arg)
  171. if err != nil {
  172. return nil, err
  173. }
  174. }
  175. return values, nil
  176. }
  177. func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
  178. var err error
  179. v := reflect.New(ty)
  180. binary.ReadJSONPtr(v.Interface(), []byte(arg), &err)
  181. if err != nil {
  182. return v, err
  183. }
  184. v = v.Elem()
  185. return v, nil
  186. }
  187. // rpc.http
  188. //-----------------------------------------------------------------------------
  189. // rpc.websocket
  190. const (
  191. WSConnectionReaperSeconds = 5
  192. MaxFailedSends = 10
  193. WriteChanBufferSize = 10
  194. )
  195. // a single websocket connection
  196. // contains listener id, underlying ws connection,
  197. // and the event switch for subscribing to events
  198. type WSConnection struct {
  199. id string
  200. wsConn *websocket.Conn
  201. writeChan chan WSResponse
  202. quitChan chan struct{}
  203. failedSends int
  204. started uint32
  205. stopped uint32
  206. evsw *events.EventSwitch
  207. }
  208. // new websocket connection wrapper
  209. func NewWSConnection(wsConn *websocket.Conn) *WSConnection {
  210. return &WSConnection{
  211. id: wsConn.RemoteAddr().String(),
  212. wsConn: wsConn,
  213. writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full
  214. quitChan: make(chan struct{}),
  215. }
  216. }
  217. // start the connection and hand her the event switch
  218. func (con *WSConnection) Start(evsw *events.EventSwitch) {
  219. if atomic.CompareAndSwapUint32(&con.started, 0, 1) {
  220. con.evsw = evsw
  221. // read subscriptions/unsubscriptions to events
  222. go con.read()
  223. // write responses
  224. con.write()
  225. }
  226. }
  227. // close the connection
  228. func (con *WSConnection) Stop() {
  229. if atomic.CompareAndSwapUint32(&con.stopped, 0, 1) {
  230. con.evsw.RemoveListener(con.id)
  231. close(con.quitChan)
  232. // the write loop closes the websocket connection
  233. // when it exits its loop, and the read loop
  234. // closes the writeChan
  235. }
  236. }
  237. // attempt to write response to writeChan and record failures
  238. func (con *WSConnection) safeWrite(resp WSResponse) {
  239. select {
  240. case con.writeChan <- resp:
  241. // yay
  242. con.failedSends = 0
  243. default:
  244. // channel is full
  245. // if this happens too many times in a row,
  246. // close connection
  247. con.failedSends += 1
  248. }
  249. }
  250. // read from the socket and subscribe to or unsubscribe from events
  251. func (con *WSConnection) read() {
  252. defer close(con.writeChan)
  253. reaper := time.Tick(time.Second * WSConnectionReaperSeconds)
  254. for {
  255. select {
  256. // TODO: this actually doesn't work
  257. // since ReadMessage blocks. Really it needs its own
  258. // go routine
  259. case <-reaper:
  260. if con.failedSends > MaxFailedSends {
  261. // sending has failed too many times.
  262. // kill the connection
  263. con.Stop()
  264. return
  265. }
  266. default:
  267. var in []byte
  268. _, in, err := con.wsConn.ReadMessage()
  269. if err != nil {
  270. // an error reading the connection,
  271. // kill the connection
  272. con.Stop()
  273. return
  274. }
  275. var req WSRequest
  276. err = json.Unmarshal(in, &req)
  277. if err != nil {
  278. errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
  279. con.safeWrite(WSResponse{Error: errStr})
  280. continue
  281. }
  282. switch req.Type {
  283. case "subscribe":
  284. log.Info("New event subscription", "con id", con.id, "event", req.Event)
  285. con.evsw.AddListenerForEvent(con.id, req.Event, func(msg interface{}) {
  286. resp := WSResponse{
  287. Event: req.Event,
  288. Data: msg,
  289. }
  290. con.safeWrite(resp)
  291. })
  292. case "unsubscribe":
  293. if req.Event != "" {
  294. con.evsw.RemoveListenerForEvent(req.Event, con.id)
  295. } else {
  296. con.evsw.RemoveListener(con.id)
  297. }
  298. default:
  299. con.safeWrite(WSResponse{Error: "Unknown request type: " + req.Type})
  300. }
  301. }
  302. }
  303. }
  304. // receives on a write channel and writes out on the socket
  305. func (con *WSConnection) write() {
  306. defer con.wsConn.Close()
  307. n, err := new(int64), new(error)
  308. for {
  309. select {
  310. case msg := <-con.writeChan:
  311. buf := new(bytes.Buffer)
  312. binary.WriteJSON(msg, buf, n, err)
  313. if *err != nil {
  314. log.Error("Failed to marshal WSResponse to JSON", "error", err)
  315. } else {
  316. if err := con.wsConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil {
  317. log.Warn("Failed to write response on websocket", "error", err)
  318. con.Stop()
  319. return
  320. }
  321. }
  322. case <-con.quitChan:
  323. return
  324. }
  325. }
  326. }
  327. // main manager for all websocket connections
  328. // holds the event switch
  329. type WebsocketManager struct {
  330. websocket.Upgrader
  331. evsw *events.EventSwitch
  332. }
  333. func NewWebsocketManager(evsw *events.EventSwitch) *WebsocketManager {
  334. return &WebsocketManager{
  335. evsw: evsw,
  336. Upgrader: websocket.Upgrader{
  337. ReadBufferSize: 1024,
  338. WriteBufferSize: 1024,
  339. CheckOrigin: func(r *http.Request) bool {
  340. // TODO
  341. return true
  342. },
  343. },
  344. }
  345. }
  346. func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Request) {
  347. wsConn, err := wm.Upgrade(w, r, nil)
  348. if err != nil {
  349. // TODO - return http error
  350. log.Error("Failed to upgrade to websocket connection", "error", err)
  351. return
  352. }
  353. // register connection
  354. con := NewWSConnection(wsConn)
  355. log.Info("New websocket connection", "origin", con.id)
  356. con.Start(wm.evsw)
  357. }
  358. // rpc.websocket
  359. //-----------------------------------------------------------------------------
  360. // returns is Response struct and error. If error is not nil, return it
  361. func unreflectResponse(returns []reflect.Value) (interface{}, error) {
  362. errV := returns[1]
  363. if errV.Interface() != nil {
  364. return nil, fmt.Errorf("%v", errV.Interface())
  365. }
  366. return returns[0].Interface(), nil
  367. }
  368. // writes a list of available rpc endpoints as an html page
  369. func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) {
  370. noArgNames := []string{}
  371. argNames := []string{}
  372. for name, funcData := range funcMap {
  373. if len(funcData.args) == 0 {
  374. noArgNames = append(noArgNames, name)
  375. } else {
  376. argNames = append(argNames, name)
  377. }
  378. }
  379. sort.Strings(noArgNames)
  380. sort.Strings(argNames)
  381. buf := new(bytes.Buffer)
  382. buf.WriteString("<html><body>")
  383. buf.WriteString("<br>Available endpoints:<br>")
  384. for _, name := range noArgNames {
  385. link := fmt.Sprintf("http://%s/%s", r.Host, name)
  386. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  387. }
  388. buf.WriteString("<br>Endpoints that require arguments:<br>")
  389. for _, name := range argNames {
  390. link := fmt.Sprintf("http://%s/%s?", r.Host, name)
  391. funcData := funcMap[name]
  392. for i, argName := range funcData.argNames {
  393. link += argName + "=_"
  394. if i < len(funcData.argNames)-1 {
  395. link += "&"
  396. }
  397. }
  398. buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
  399. }
  400. buf.WriteString("</body></html>")
  401. w.Header().Set("Content-Type", "text/html")
  402. w.WriteHeader(200)
  403. w.Write(buf.Bytes())
  404. }