You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

356 lines
8.8 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package rpc
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/tendermint/tendermint/binary"
  7. "github.com/tendermint/tendermint/events"
  8. "golang.org/x/net/websocket"
  9. "io/ioutil"
  10. "net/http"
  11. "reflect"
  12. "time"
  13. )
  14. func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
  15. // HTTP endpoints
  16. for funcName, rpcFunc := range funcMap {
  17. mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc))
  18. }
  19. // JSONRPC endpoints
  20. mux.HandleFunc("/", makeJSONRPCHandler(funcMap))
  21. }
  22. func RegisterEventsHandler(mux *http.ServeMux, evsw *events.EventSwitch) {
  23. // websocket endpoint
  24. w := NewWebsocketManager(evsw)
  25. mux.Handle("/events", websocket.Handler(w.eventsHandler))
  26. }
  27. //-------------------------------------
  28. // function introspection
  // RPCFunc bundles an RPC function with the reflection data describing it, so
  // that the handlers do not have to re-derive it on every call.
  type RPCFunc struct {
  	// f is the underlying rpc function.
  	f reflect.Value
  	// args holds the type of each function argument, in order.
  	args []reflect.Type
  	// returns holds the type of each return value, in order.
  	returns []reflect.Type
  	// argNames holds the name of each argument, in order.
  	argNames []string
  }
  36. // wraps a function for quicker introspection
  37. func NewRPCFunc(f interface{}, args []string) *RPCFunc {
  38. return &RPCFunc{
  39. f: reflect.ValueOf(f),
  40. args: funcArgTypes(f),
  41. returns: funcReturnTypes(f),
  42. argNames: args,
  43. }
  44. }
  // funcArgTypes returns the types of the parameters of the function f, in
  // declaration order. The result is always non-nil; it is empty when f takes
  // no parameters.
  func funcArgTypes(f interface{}) []reflect.Type {
  	signature := reflect.TypeOf(f)
  	argTypes := make([]reflect.Type, signature.NumIn())
  	for idx := range argTypes {
  		argTypes[idx] = signature.In(idx)
  	}
  	return argTypes
  }
  // funcReturnTypes returns the types of the results of the function f, in
  // declaration order. The result is always non-nil; it is empty when f
  // returns nothing.
  func funcReturnTypes(f interface{}) []reflect.Type {
  	signature := reflect.TypeOf(f)
  	count := signature.NumOut()
  	outTypes := make([]reflect.Type, 0, count)
  	for pos := 0; pos < count; pos++ {
  		outTypes = append(outTypes, signature.Out(pos))
  	}
  	return outTypes
  }
  65. // function introspection
  66. //-----------------------------------------------------------------------------
  67. // rpc.json
  68. // jsonrpc calls grab the given method's function info and runs reflect.Call
  69. func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
  70. return func(w http.ResponseWriter, r *http.Request) {
  71. if len(r.URL.Path) > 1 {
  72. WriteRPCResponse(w, NewRPCResponse(nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
  73. return
  74. }
  75. b, _ := ioutil.ReadAll(r.Body)
  76. var request RPCRequest
  77. err := json.Unmarshal(b, &request)
  78. if err != nil {
  79. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  80. return
  81. }
  82. rpcFunc := funcMap[request.Method]
  83. if rpcFunc == nil {
  84. WriteRPCResponse(w, NewRPCResponse(nil, "RPC method unknown: "+request.Method))
  85. return
  86. }
  87. args, err := jsonParamsToArgs(rpcFunc, request.Params)
  88. if err != nil {
  89. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  90. return
  91. }
  92. returns := rpcFunc.f.Call(args)
  93. response, err := unreflectResponse(returns)
  94. if err != nil {
  95. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  96. return
  97. }
  98. WriteRPCResponse(w, NewRPCResponse(response, ""))
  99. }
  100. }
  101. // covert a list of interfaces to properly typed values
  102. func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) {
  103. values := make([]reflect.Value, len(params))
  104. for i, p := range params {
  105. ty := rpcFunc.args[i]
  106. v, err := _jsonObjectToArg(ty, p)
  107. if err != nil {
  108. return nil, err
  109. }
  110. values[i] = v
  111. }
  112. return values, nil
  113. }
  114. func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) {
  115. var err error
  116. v := reflect.New(ty)
  117. binary.ReadJSONFromObject(v.Interface(), object, &err)
  118. if err != nil {
  119. return v, err
  120. }
  121. v = v.Elem()
  122. return v, nil
  123. }
  124. // rpc.json
  125. //-----------------------------------------------------------------------------
  126. // rpc.http
  127. // convert from a function name to the http handler
  128. func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) {
  129. return func(w http.ResponseWriter, r *http.Request) {
  130. args, err := httpParamsToArgs(rpcFunc, r)
  131. if err != nil {
  132. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  133. return
  134. }
  135. returns := rpcFunc.f.Call(args)
  136. response, err := unreflectResponse(returns)
  137. if err != nil {
  138. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  139. return
  140. }
  141. WriteRPCResponse(w, NewRPCResponse(response, ""))
  142. }
  143. }
  144. // Covert an http query to a list of properly typed values.
  145. // To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
  146. func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) {
  147. argTypes := rpcFunc.args
  148. argNames := rpcFunc.argNames
  149. var err error
  150. values := make([]reflect.Value, len(argNames))
  151. for i, name := range argNames {
  152. ty := argTypes[i]
  153. arg := GetParam(r, name)
  154. values[i], err = _jsonStringToArg(ty, arg)
  155. if err != nil {
  156. return nil, err
  157. }
  158. }
  159. return values, nil
  160. }
  161. func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
  162. var err error
  163. v := reflect.New(ty)
  164. binary.ReadJSON(v.Interface(), []byte(arg), &err)
  165. if err != nil {
  166. return v, err
  167. }
  168. v = v.Elem()
  169. return v, nil
  170. }
  171. // rpc.http
  172. //-----------------------------------------------------------------------------
  173. // rpc.websocket
  // WsRequest is a request coming in over a websocket connection.
  type WsRequest struct {
  	// Type is the requested action: subscribe or unsubscribe.
  	Type string
  	// Event names the event the request refers to.
  	Event string
  }
  // WsResponse is a response going out over a websocket connection.
  type WsResponse struct {
  	// Event names the event this response belongs to, if any.
  	Event string
  	// Data is the payload delivered with the event.
  	Data interface{}
  	// Error carries an error message; it is empty when nothing went wrong.
  	Error string
  }
  185. // a single websocket connection
  186. // contains the listeners id
  187. type Connection struct {
  188. id string
  189. wsCon *websocket.Conn
  190. writeChan chan WsResponse
  191. quitChan chan struct{}
  192. failedSends uint
  193. }
  194. // new websocket connection wrapper
  195. func NewConnection(con *websocket.Conn) *Connection {
  196. return &Connection{
  197. id: con.RemoteAddr().String(),
  198. wsCon: con,
  199. writeChan: make(chan WsResponse, WriteChanBuffer), // buffered. we keep track when its full
  200. }
  201. }
  202. // close the connection
  203. func (c *Connection) Close() {
  204. c.wsCon.Close()
  205. close(c.writeChan)
  206. close(c.quitChan)
  207. }
  208. // main manager for all websocket connections
  209. // holds the event switch
  210. type WebsocketManager struct {
  211. ew *events.EventSwitch
  212. cons map[string]*Connection
  213. }
  214. func NewWebsocketManager(ew *events.EventSwitch) *WebsocketManager {
  215. return &WebsocketManager{
  216. ew: ew,
  217. cons: make(map[string]*Connection),
  218. }
  219. }
  220. func (w *WebsocketManager) eventsHandler(con *websocket.Conn) {
  221. // register connection
  222. c := NewConnection(con)
  223. w.cons[con.RemoteAddr().String()] = c
  224. // read subscriptions/unsubscriptions to events
  225. go w.read(c)
  226. // write responses
  227. go w.write(c)
  228. }
  229. const (
  230. WsConnectionReaperSeconds = 5
  231. MaxFailedSendsSeconds = 10
  232. WriteChanBuffer = 10
  233. )
  234. // read from the socket and subscribe to or unsubscribe from events
  235. func (w *WebsocketManager) read(con *Connection) {
  236. reaper := time.Tick(time.Second * WsConnectionReaperSeconds)
  237. for {
  238. select {
  239. case <-reaper:
  240. if con.failedSends > MaxFailedSendsSeconds {
  241. // sending has failed too many times.
  242. // kill the connection
  243. con.quitChan <- struct{}{}
  244. }
  245. default:
  246. var in []byte
  247. if err := websocket.Message.Receive(con.wsCon, &in); err != nil {
  248. // an error reading the connection,
  249. // so kill the connection
  250. con.quitChan <- struct{}{}
  251. }
  252. var req WsRequest
  253. err := json.Unmarshal(in, &req)
  254. if err != nil {
  255. errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
  256. con.writeChan <- WsResponse{Error: errStr}
  257. }
  258. switch req.Type {
  259. case "subscribe":
  260. w.ew.AddListenerForEvent(con.id, req.Event, func(msg interface{}) {
  261. resp := WsResponse{
  262. Event: req.Event,
  263. Data: msg,
  264. }
  265. select {
  266. case con.writeChan <- resp:
  267. // yay
  268. con.failedSends = 0
  269. default:
  270. // channel is full
  271. // if this happens too many times,
  272. // close connection
  273. con.failedSends += 1
  274. }
  275. })
  276. case "unsubscribe":
  277. if req.Event != "" {
  278. w.ew.RemoveListenerForEvent(req.Event, con.id)
  279. } else {
  280. w.ew.RemoveListener(con.id)
  281. }
  282. default:
  283. con.writeChan <- WsResponse{Error: "Unknown request type: " + req.Type}
  284. }
  285. }
  286. }
  287. }
  288. // receives on a write channel and writes out to the socket
  289. func (w *WebsocketManager) write(con *Connection) {
  290. n, err := new(int64), new(error)
  291. for {
  292. select {
  293. case msg := <-con.writeChan:
  294. buf := new(bytes.Buffer)
  295. binary.WriteJSON(msg, buf, n, err)
  296. if *err != nil {
  297. log.Error("Failed to write JSON WsResponse", "error", err)
  298. } else {
  299. websocket.Message.Send(con.wsCon, buf.Bytes())
  300. }
  301. case <-con.quitChan:
  302. w.closeConn(con)
  303. return
  304. }
  305. }
  306. }
  307. // close a connection and delete from manager
  308. func (w *WebsocketManager) closeConn(con *Connection) {
  309. con.Close()
  310. delete(w.cons, con.id)
  311. }
  312. // rpc.websocket
  313. //-----------------------------------------------------------------------------
  // unreflectResponse unpacks the result of a reflective call to an RPC
  // function. returns is expected to hold the response followed by an error.
  // If the error is not nil it is returned (formatted with %v) and the
  // response is dropped; otherwise the response is returned as-is.
  //
  // Fewer than two return values are reported as an error rather than
  // indexing past the end of the slice.
  func unreflectResponse(returns []reflect.Value) (interface{}, error) {
  	if len(returns) < 2 {
  		return nil, fmt.Errorf("expected 2 return values (response, error), got %d", len(returns))
  	}
  	if callErr := returns[1].Interface(); callErr != nil {
  		return nil, fmt.Errorf("%v", callErr)
  	}
  	return returns[0].Interface(), nil
  }