You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

363 lines
9.1 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package rpc
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/tendermint/tendermint/binary"
  8. "github.com/tendermint/tendermint/events"
  9. "golang.org/x/net/websocket"
  10. "io/ioutil"
  11. "net/http"
  12. "reflect"
  13. "time"
  14. )
  15. func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
  16. // HTTP endpoints
  17. for funcName, rpcFunc := range funcMap {
  18. mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc))
  19. }
  20. // JSONRPC endpoints
  21. mux.HandleFunc("/", makeJSONRPCHandler(funcMap))
  22. }
  23. func RegisterEventsHandler(mux *http.ServeMux, evsw *events.EventSwitch) {
  24. // websocket endpoint
  25. w := NewWebsocketManager(evsw)
  26. mux.Handle("/events", websocket.Handler(w.eventsHandler))
  27. }
  28. //-------------------------------------
  29. // function introspection
  30. // holds all type information for each function
  31. type RPCFunc struct {
  32. f reflect.Value // underlying rpc function
  33. args []reflect.Type // type of each function arg
  34. returns []reflect.Type // type of each return arg
  35. argNames []string // name of each argument
  36. }
  37. // wraps a function for quicker introspection
  38. func NewRPCFunc(f interface{}, args []string) *RPCFunc {
  39. return &RPCFunc{
  40. f: reflect.ValueOf(f),
  41. args: funcArgTypes(f),
  42. returns: funcReturnTypes(f),
  43. argNames: args,
  44. }
  45. }
  46. // return a function's argument types
  47. func funcArgTypes(f interface{}) []reflect.Type {
  48. t := reflect.TypeOf(f)
  49. n := t.NumIn()
  50. types := make([]reflect.Type, n)
  51. for i := 0; i < n; i++ {
  52. types[i] = t.In(i)
  53. }
  54. return types
  55. }
  56. // return a function's return types
  57. func funcReturnTypes(f interface{}) []reflect.Type {
  58. t := reflect.TypeOf(f)
  59. n := t.NumOut()
  60. types := make([]reflect.Type, n)
  61. for i := 0; i < n; i++ {
  62. types[i] = t.Out(i)
  63. }
  64. return types
  65. }
  66. // function introspection
  67. //-----------------------------------------------------------------------------
  68. // rpc.json
  69. // jsonrpc calls grab the given method's function info and runs reflect.Call
  70. func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
  71. return func(w http.ResponseWriter, r *http.Request) {
  72. if len(r.URL.Path) > 1 {
  73. WriteRPCResponse(w, NewRPCResponse(nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
  74. return
  75. }
  76. b, _ := ioutil.ReadAll(r.Body)
  77. var request RPCRequest
  78. err := json.Unmarshal(b, &request)
  79. if err != nil {
  80. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  81. return
  82. }
  83. rpcFunc := funcMap[request.Method]
  84. if rpcFunc == nil {
  85. WriteRPCResponse(w, NewRPCResponse(nil, "RPC method unknown: "+request.Method))
  86. return
  87. }
  88. args, err := jsonParamsToArgs(rpcFunc, request.Params)
  89. if err != nil {
  90. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  91. return
  92. }
  93. returns := rpcFunc.f.Call(args)
  94. log.Debug("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
  95. response, err := unreflectResponse(returns)
  96. if err != nil {
  97. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  98. return
  99. }
  100. WriteRPCResponse(w, NewRPCResponse(response, ""))
  101. }
  102. }
  103. // covert a list of interfaces to properly typed values
  104. func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) {
  105. if len(rpcFunc.argNames) != len(params) {
  106. return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
  107. len(rpcFunc.argNames), rpcFunc.argNames, len(params), params))
  108. }
  109. values := make([]reflect.Value, len(params))
  110. for i, p := range params {
  111. ty := rpcFunc.args[i]
  112. v, err := _jsonObjectToArg(ty, p)
  113. if err != nil {
  114. return nil, err
  115. }
  116. values[i] = v
  117. }
  118. return values, nil
  119. }
  120. func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) {
  121. var err error
  122. v := reflect.New(ty)
  123. binary.ReadJSONObject(v.Interface(), object, &err)
  124. if err != nil {
  125. return v, err
  126. }
  127. v = v.Elem()
  128. return v, nil
  129. }
  130. // rpc.json
  131. //-----------------------------------------------------------------------------
  132. // rpc.http
  133. // convert from a function name to the http handler
  134. func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) {
  135. return func(w http.ResponseWriter, r *http.Request) {
  136. args, err := httpParamsToArgs(rpcFunc, r)
  137. if err != nil {
  138. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  139. return
  140. }
  141. returns := rpcFunc.f.Call(args)
  142. log.Debug("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
  143. response, err := unreflectResponse(returns)
  144. if err != nil {
  145. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  146. return
  147. }
  148. WriteRPCResponse(w, NewRPCResponse(response, ""))
  149. }
  150. }
  151. // Covert an http query to a list of properly typed values.
  152. // To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
  153. func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) {
  154. argTypes := rpcFunc.args
  155. argNames := rpcFunc.argNames
  156. var err error
  157. values := make([]reflect.Value, len(argNames))
  158. for i, name := range argNames {
  159. ty := argTypes[i]
  160. arg := GetParam(r, name)
  161. values[i], err = _jsonStringToArg(ty, arg)
  162. if err != nil {
  163. return nil, err
  164. }
  165. }
  166. return values, nil
  167. }
  168. func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
  169. var err error
  170. v := reflect.New(ty)
  171. binary.ReadJSON(v.Interface(), []byte(arg), &err)
  172. if err != nil {
  173. return v, err
  174. }
  175. v = v.Elem()
  176. return v, nil
  177. }
  178. // rpc.http
  179. //-----------------------------------------------------------------------------
  180. // rpc.websocket
  181. // for requests coming in
  182. type WsRequest struct {
  183. Type string // subscribe or unsubscribe
  184. Event string
  185. }
  186. // for responses going out
  187. type WsResponse struct {
  188. Event string
  189. Data interface{}
  190. Error string
  191. }
  192. // a single websocket connection
  193. // contains the listeners id
  194. type Connection struct {
  195. id string
  196. wsCon *websocket.Conn
  197. writeChan chan WsResponse
  198. quitChan chan struct{}
  199. failedSends uint
  200. }
  201. // new websocket connection wrapper
  202. func NewConnection(con *websocket.Conn) *Connection {
  203. return &Connection{
  204. id: con.RemoteAddr().String(),
  205. wsCon: con,
  206. writeChan: make(chan WsResponse, WriteChanBuffer), // buffered. we keep track when its full
  207. }
  208. }
  209. // close the connection
  210. func (c *Connection) Close() {
  211. c.wsCon.Close()
  212. close(c.writeChan)
  213. close(c.quitChan)
  214. }
  215. // main manager for all websocket connections
  216. // holds the event switch
  217. type WebsocketManager struct {
  218. ew *events.EventSwitch
  219. cons map[string]*Connection
  220. }
  221. func NewWebsocketManager(ew *events.EventSwitch) *WebsocketManager {
  222. return &WebsocketManager{
  223. ew: ew,
  224. cons: make(map[string]*Connection),
  225. }
  226. }
  227. func (w *WebsocketManager) eventsHandler(con *websocket.Conn) {
  228. // register connection
  229. c := NewConnection(con)
  230. w.cons[con.RemoteAddr().String()] = c
  231. // read subscriptions/unsubscriptions to events
  232. go w.read(c)
  233. // write responses
  234. go w.write(c)
  235. }
  236. const (
  237. WsConnectionReaperSeconds = 5
  238. MaxFailedSendsSeconds = 10
  239. WriteChanBuffer = 10
  240. )
  241. // read from the socket and subscribe to or unsubscribe from events
  242. func (w *WebsocketManager) read(con *Connection) {
  243. reaper := time.Tick(time.Second * WsConnectionReaperSeconds)
  244. for {
  245. select {
  246. case <-reaper:
  247. if con.failedSends > MaxFailedSendsSeconds {
  248. // sending has failed too many times.
  249. // kill the connection
  250. con.quitChan <- struct{}{}
  251. }
  252. default:
  253. var in []byte
  254. if err := websocket.Message.Receive(con.wsCon, &in); err != nil {
  255. // an error reading the connection,
  256. // so kill the connection
  257. con.quitChan <- struct{}{}
  258. }
  259. var req WsRequest
  260. err := json.Unmarshal(in, &req)
  261. if err != nil {
  262. errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
  263. con.writeChan <- WsResponse{Error: errStr}
  264. }
  265. switch req.Type {
  266. case "subscribe":
  267. w.ew.AddListenerForEvent(con.id, req.Event, func(msg interface{}) {
  268. resp := WsResponse{
  269. Event: req.Event,
  270. Data: msg,
  271. }
  272. select {
  273. case con.writeChan <- resp:
  274. // yay
  275. con.failedSends = 0
  276. default:
  277. // channel is full
  278. // if this happens too many times,
  279. // close connection
  280. con.failedSends += 1
  281. }
  282. })
  283. case "unsubscribe":
  284. if req.Event != "" {
  285. w.ew.RemoveListenerForEvent(req.Event, con.id)
  286. } else {
  287. w.ew.RemoveListener(con.id)
  288. }
  289. default:
  290. con.writeChan <- WsResponse{Error: "Unknown request type: " + req.Type}
  291. }
  292. }
  293. }
  294. }
  295. // receives on a write channel and writes out to the socket
  296. func (w *WebsocketManager) write(con *Connection) {
  297. n, err := new(int64), new(error)
  298. for {
  299. select {
  300. case msg := <-con.writeChan:
  301. buf := new(bytes.Buffer)
  302. binary.WriteJSON(msg, buf, n, err)
  303. if *err != nil {
  304. log.Error("Failed to write JSON WsResponse", "error", err)
  305. } else {
  306. websocket.Message.Send(con.wsCon, buf.Bytes())
  307. }
  308. case <-con.quitChan:
  309. w.closeConn(con)
  310. return
  311. }
  312. }
  313. }
  314. // close a connection and delete from manager
  315. func (w *WebsocketManager) closeConn(con *Connection) {
  316. con.Close()
  317. delete(w.cons, con.id)
  318. }
  319. // rpc.websocket
  320. //-----------------------------------------------------------------------------
  321. // returns is Response struct and error. If error is not nil, return it
  322. func unreflectResponse(returns []reflect.Value) (interface{}, error) {
  323. errV := returns[1]
  324. if errV.Interface() != nil {
  325. return nil, fmt.Errorf("%v", errV.Interface())
  326. }
  327. return returns[0].Interface(), nil
  328. }