You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

407 lines
10 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package rpc
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/gorilla/websocket"
  8. "github.com/tendermint/tendermint/binary"
  9. "github.com/tendermint/tendermint/events"
  10. "io/ioutil"
  11. "net/http"
  12. "reflect"
  13. "sync/atomic"
  14. "time"
  15. )
  16. func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
  17. // HTTP endpoints
  18. for funcName, rpcFunc := range funcMap {
  19. mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc))
  20. }
  21. // JSONRPC endpoints
  22. mux.HandleFunc("/", makeJSONRPCHandler(funcMap))
  23. }
  24. func RegisterEventsHandler(mux *http.ServeMux, evsw *events.EventSwitch) {
  25. // websocket endpoint
  26. wm := NewWebsocketManager(evsw)
  27. mux.HandleFunc("/events", wm.websocketHandler) // websocket.Handler(w.eventsHandler))
  28. }
  29. //-------------------------------------
  30. // function introspection
  31. // holds all type information for each function
  32. type RPCFunc struct {
  33. f reflect.Value // underlying rpc function
  34. args []reflect.Type // type of each function arg
  35. returns []reflect.Type // type of each return arg
  36. argNames []string // name of each argument
  37. }
  38. // wraps a function for quicker introspection
  39. func NewRPCFunc(f interface{}, args []string) *RPCFunc {
  40. return &RPCFunc{
  41. f: reflect.ValueOf(f),
  42. args: funcArgTypes(f),
  43. returns: funcReturnTypes(f),
  44. argNames: args,
  45. }
  46. }
  47. // return a function's argument types
  48. func funcArgTypes(f interface{}) []reflect.Type {
  49. t := reflect.TypeOf(f)
  50. n := t.NumIn()
  51. types := make([]reflect.Type, n)
  52. for i := 0; i < n; i++ {
  53. types[i] = t.In(i)
  54. }
  55. return types
  56. }
  57. // return a function's return types
  58. func funcReturnTypes(f interface{}) []reflect.Type {
  59. t := reflect.TypeOf(f)
  60. n := t.NumOut()
  61. types := make([]reflect.Type, n)
  62. for i := 0; i < n; i++ {
  63. types[i] = t.Out(i)
  64. }
  65. return types
  66. }
  67. // function introspection
  68. //-----------------------------------------------------------------------------
  69. // rpc.json
  70. // jsonrpc calls grab the given method's function info and runs reflect.Call
  71. func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
  72. return func(w http.ResponseWriter, r *http.Request) {
  73. if len(r.URL.Path) > 1 {
  74. WriteRPCResponse(w, NewRPCResponse(nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
  75. return
  76. }
  77. b, _ := ioutil.ReadAll(r.Body)
  78. var request RPCRequest
  79. err := json.Unmarshal(b, &request)
  80. if err != nil {
  81. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  82. return
  83. }
  84. rpcFunc := funcMap[request.Method]
  85. if rpcFunc == nil {
  86. WriteRPCResponse(w, NewRPCResponse(nil, "RPC method unknown: "+request.Method))
  87. return
  88. }
  89. args, err := jsonParamsToArgs(rpcFunc, request.Params)
  90. if err != nil {
  91. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  92. return
  93. }
  94. returns := rpcFunc.f.Call(args)
  95. log.Debug("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
  96. response, err := unreflectResponse(returns)
  97. if err != nil {
  98. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  99. return
  100. }
  101. WriteRPCResponse(w, NewRPCResponse(response, ""))
  102. }
  103. }
  104. // covert a list of interfaces to properly typed values
  105. func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) {
  106. if len(rpcFunc.argNames) != len(params) {
  107. return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
  108. len(rpcFunc.argNames), rpcFunc.argNames, len(params), params))
  109. }
  110. values := make([]reflect.Value, len(params))
  111. for i, p := range params {
  112. ty := rpcFunc.args[i]
  113. v, err := _jsonObjectToArg(ty, p)
  114. if err != nil {
  115. return nil, err
  116. }
  117. values[i] = v
  118. }
  119. return values, nil
  120. }
  121. func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) {
  122. var err error
  123. v := reflect.New(ty)
  124. binary.ReadJSONObject(v.Interface(), object, &err)
  125. if err != nil {
  126. return v, err
  127. }
  128. v = v.Elem()
  129. return v, nil
  130. }
  131. // rpc.json
  132. //-----------------------------------------------------------------------------
  133. // rpc.http
  134. // convert from a function name to the http handler
  135. func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) {
  136. return func(w http.ResponseWriter, r *http.Request) {
  137. args, err := httpParamsToArgs(rpcFunc, r)
  138. if err != nil {
  139. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  140. return
  141. }
  142. returns := rpcFunc.f.Call(args)
  143. log.Debug("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
  144. response, err := unreflectResponse(returns)
  145. if err != nil {
  146. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  147. return
  148. }
  149. WriteRPCResponse(w, NewRPCResponse(response, ""))
  150. }
  151. }
  152. // Covert an http query to a list of properly typed values.
  153. // To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
  154. func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) {
  155. argTypes := rpcFunc.args
  156. argNames := rpcFunc.argNames
  157. var err error
  158. values := make([]reflect.Value, len(argNames))
  159. for i, name := range argNames {
  160. ty := argTypes[i]
  161. arg := GetParam(r, name)
  162. values[i], err = _jsonStringToArg(ty, arg)
  163. if err != nil {
  164. return nil, err
  165. }
  166. }
  167. return values, nil
  168. }
  169. func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
  170. var err error
  171. v := reflect.New(ty)
  172. binary.ReadJSON(v.Interface(), []byte(arg), &err)
  173. if err != nil {
  174. return v, err
  175. }
  176. v = v.Elem()
  177. return v, nil
  178. }
  179. // rpc.http
  180. //-----------------------------------------------------------------------------
  181. // rpc.websocket
  182. const (
  183. WSConnectionReaperSeconds = 5
  184. MaxFailedSends = 10
  185. WriteChanBufferSize = 10
  186. )
  187. // for requests coming in
  188. type WSRequest struct {
  189. Type string // subscribe or unsubscribe
  190. Event string
  191. }
  192. // for responses going out
  193. type WSResponse struct {
  194. Event string
  195. Data interface{}
  196. Error string
  197. }
  198. // a single websocket connection
  199. // contains listener id, underlying ws connection,
  200. // and the event switch for subscribing to events
  201. type WSConnection struct {
  202. id string
  203. wsConn *websocket.Conn
  204. writeChan chan WSResponse
  205. quitChan chan struct{}
  206. failedSends uint
  207. started uint32
  208. stopped uint32
  209. evsw *events.EventSwitch
  210. }
  211. // new websocket connection wrapper
  212. func NewWSConnection(wsConn *websocket.Conn) *WSConnection {
  213. return &WSConnection{
  214. id: wsConn.RemoteAddr().String(),
  215. wsConn: wsConn,
  216. writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full
  217. quitChan: make(chan struct{}),
  218. }
  219. }
  220. // start the connection and hand her the event switch
  221. func (con *WSConnection) Start(evsw *events.EventSwitch) {
  222. if atomic.CompareAndSwapUint32(&con.started, 0, 1) {
  223. con.evsw = evsw
  224. // read subscriptions/unsubscriptions to events
  225. go con.read()
  226. // write responses
  227. con.write()
  228. }
  229. }
  230. // close the connection
  231. func (con *WSConnection) Stop() {
  232. if atomic.CompareAndSwapUint32(&con.stopped, 0, 1) {
  233. con.evsw.RemoveListener(con.id)
  234. close(con.quitChan)
  235. // the write loop closes the websocket connection
  236. // when it exits its loop, and the read loop
  237. // closes the writeChan
  238. }
  239. }
  240. // attempt to write response to writeChan and record failures
  241. func (con *WSConnection) safeWrite(resp WSResponse) {
  242. select {
  243. case con.writeChan <- resp:
  244. // yay
  245. con.failedSends = 0
  246. default:
  247. // channel is full
  248. // if this happens too many times in a row,
  249. // close connection
  250. con.failedSends += 1
  251. }
  252. }
  253. // read from the socket and subscribe to or unsubscribe from events
  254. func (con *WSConnection) read() {
  255. defer close(con.writeChan)
  256. reaper := time.Tick(time.Second * WSConnectionReaperSeconds)
  257. for {
  258. select {
  259. // TODO: this actually doesn't work
  260. // since ReadMessage blocks. Really it needs its own
  261. // go routine
  262. case <-reaper:
  263. if con.failedSends > MaxFailedSends {
  264. // sending has failed too many times.
  265. // kill the connection
  266. con.Stop()
  267. return
  268. }
  269. default:
  270. var in []byte
  271. _, in, err := con.wsConn.ReadMessage()
  272. if err != nil {
  273. // an error reading the connection,
  274. // kill the connection
  275. con.Stop()
  276. return
  277. }
  278. var req WSRequest
  279. err = json.Unmarshal(in, &req)
  280. if err != nil {
  281. errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
  282. con.safeWrite(WSResponse{Error: errStr})
  283. continue
  284. }
  285. switch req.Type {
  286. case "subscribe":
  287. log.Info("New event subscription", "con id", con.id, "event", req.Event)
  288. con.evsw.AddListenerForEvent(con.id, req.Event, func(msg interface{}) {
  289. resp := WSResponse{
  290. Event: req.Event,
  291. Data: msg,
  292. }
  293. con.safeWrite(resp)
  294. })
  295. case "unsubscribe":
  296. if req.Event != "" {
  297. con.evsw.RemoveListenerForEvent(req.Event, con.id)
  298. } else {
  299. con.evsw.RemoveListener(con.id)
  300. }
  301. default:
  302. con.safeWrite(WSResponse{Error: "Unknown request type: " + req.Type})
  303. }
  304. }
  305. }
  306. }
  307. // receives on a write channel and writes out on the socket
  308. func (con *WSConnection) write() {
  309. defer con.wsConn.Close()
  310. n, err := new(int64), new(error)
  311. for {
  312. select {
  313. case msg := <-con.writeChan:
  314. buf := new(bytes.Buffer)
  315. binary.WriteJSON(msg, buf, n, err)
  316. if *err != nil {
  317. log.Error("Failed to marshal WSResponse to JSON", "error", err)
  318. } else {
  319. if err := con.wsConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil {
  320. log.Error("Failed to write response on websocket", "error", err)
  321. con.Stop()
  322. return
  323. }
  324. }
  325. case <-con.quitChan:
  326. return
  327. }
  328. }
  329. }
  330. // main manager for all websocket connections
  331. // holds the event switch
  332. type WebsocketManager struct {
  333. websocket.Upgrader
  334. evsw *events.EventSwitch
  335. }
  336. func NewWebsocketManager(evsw *events.EventSwitch) *WebsocketManager {
  337. return &WebsocketManager{
  338. evsw: evsw,
  339. Upgrader: websocket.Upgrader{
  340. ReadBufferSize: 1024,
  341. WriteBufferSize: 1024,
  342. CheckOrigin: func(r *http.Request) bool {
  343. // TODO
  344. return true
  345. },
  346. },
  347. }
  348. }
  349. func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Request) {
  350. wsConn, err := wm.Upgrade(w, r, nil)
  351. if err != nil {
  352. // TODO - return http error
  353. log.Error("Failed to upgrade to websocket connection", "error", err)
  354. return
  355. }
  356. // register connection
  357. con := NewWSConnection(wsConn)
  358. log.Info("New websocket connection", "origin", con.id)
  359. con.Start(wm.evsw)
  360. }
  361. // rpc.websocket
  362. //-----------------------------------------------------------------------------
  363. // returns is Response struct and error. If error is not nil, return it
  364. func unreflectResponse(returns []reflect.Value) (interface{}, error) {
  365. errV := returns[1]
  366. if errV.Interface() != nil {
  367. return nil, fmt.Errorf("%v", errV.Interface())
  368. }
  369. return returns[0].Interface(), nil
  370. }