You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

403 lines
10 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package rpc
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/gorilla/websocket"
  8. "github.com/tendermint/tendermint/binary"
  9. "github.com/tendermint/tendermint/events"
  10. "io/ioutil"
  11. "net/http"
  12. "reflect"
  13. "sync/atomic"
  14. "time"
  15. )
  16. func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
  17. // HTTP endpoints
  18. for funcName, rpcFunc := range funcMap {
  19. mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc))
  20. }
  21. // JSONRPC endpoints
  22. mux.HandleFunc("/", makeJSONRPCHandler(funcMap))
  23. }
  24. func RegisterEventsHandler(mux *http.ServeMux, evsw *events.EventSwitch) {
  25. // websocket endpoint
  26. wm := NewWebsocketManager(evsw)
  27. mux.HandleFunc("/events", wm.websocketHandler) // websocket.Handler(w.eventsHandler))
  28. }
  29. //-------------------------------------
  30. // function introspection
  // RPCFunc caches the reflection data needed to invoke one RPC function, so
  // that it is computed once at registration rather than on every request.
  type RPCFunc struct {
  	// f is the reflected function that gets called.
  	f reflect.Value
  	// args lists the type of each parameter of f, in declaration order.
  	args []reflect.Type
  	// returns lists the type of each result of f, in declaration order.
  	returns []reflect.Type
  	// argNames gives the request-side name of each parameter.
  	argNames []string
  }
  38. // wraps a function for quicker introspection
  39. func NewRPCFunc(f interface{}, args []string) *RPCFunc {
  40. return &RPCFunc{
  41. f: reflect.ValueOf(f),
  42. args: funcArgTypes(f),
  43. returns: funcReturnTypes(f),
  44. argNames: args,
  45. }
  46. }
  // funcArgTypes returns the type of each parameter of the function f, in
  // declaration order.
  func funcArgTypes(f interface{}) []reflect.Type {
  	fnType := reflect.TypeOf(f)
  	argTypes := make([]reflect.Type, fnType.NumIn())
  	for i := range argTypes {
  		argTypes[i] = fnType.In(i)
  	}
  	return argTypes
  }
  // funcReturnTypes returns the type of each result of the function f, in
  // declaration order.
  func funcReturnTypes(f interface{}) []reflect.Type {
  	fnType := reflect.TypeOf(f)
  	resultTypes := make([]reflect.Type, fnType.NumOut())
  	for i := range resultTypes {
  		resultTypes[i] = fnType.Out(i)
  	}
  	return resultTypes
  }
  67. // function introspection
  68. //-----------------------------------------------------------------------------
  69. // rpc.json
  70. // jsonrpc calls grab the given method's function info and runs reflect.Call
  71. func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
  72. return func(w http.ResponseWriter, r *http.Request) {
  73. if len(r.URL.Path) > 1 {
  74. WriteRPCResponse(w, NewRPCResponse(nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
  75. return
  76. }
  77. b, _ := ioutil.ReadAll(r.Body)
  78. var request RPCRequest
  79. err := json.Unmarshal(b, &request)
  80. if err != nil {
  81. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  82. return
  83. }
  84. rpcFunc := funcMap[request.Method]
  85. if rpcFunc == nil {
  86. WriteRPCResponse(w, NewRPCResponse(nil, "RPC method unknown: "+request.Method))
  87. return
  88. }
  89. args, err := jsonParamsToArgs(rpcFunc, request.Params)
  90. if err != nil {
  91. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  92. return
  93. }
  94. returns := rpcFunc.f.Call(args)
  95. log.Debug("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
  96. response, err := unreflectResponse(returns)
  97. if err != nil {
  98. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  99. return
  100. }
  101. WriteRPCResponse(w, NewRPCResponse(response, ""))
  102. }
  103. }
  104. // covert a list of interfaces to properly typed values
  105. func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) {
  106. if len(rpcFunc.argNames) != len(params) {
  107. return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
  108. len(rpcFunc.argNames), rpcFunc.argNames, len(params), params))
  109. }
  110. values := make([]reflect.Value, len(params))
  111. for i, p := range params {
  112. ty := rpcFunc.args[i]
  113. v, err := _jsonObjectToArg(ty, p)
  114. if err != nil {
  115. return nil, err
  116. }
  117. values[i] = v
  118. }
  119. return values, nil
  120. }
  121. func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) {
  122. var err error
  123. v := reflect.New(ty)
  124. binary.ReadJSONObject(v.Interface(), object, &err)
  125. if err != nil {
  126. return v, err
  127. }
  128. v = v.Elem()
  129. return v, nil
  130. }
  131. // rpc.json
  132. //-----------------------------------------------------------------------------
  133. // rpc.http
  134. // convert from a function name to the http handler
  135. func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) {
  136. return func(w http.ResponseWriter, r *http.Request) {
  137. args, err := httpParamsToArgs(rpcFunc, r)
  138. if err != nil {
  139. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  140. return
  141. }
  142. returns := rpcFunc.f.Call(args)
  143. log.Debug("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
  144. response, err := unreflectResponse(returns)
  145. if err != nil {
  146. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  147. return
  148. }
  149. WriteRPCResponse(w, NewRPCResponse(response, ""))
  150. }
  151. }
  152. // Covert an http query to a list of properly typed values.
  153. // To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
  154. func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) {
  155. argTypes := rpcFunc.args
  156. argNames := rpcFunc.argNames
  157. var err error
  158. values := make([]reflect.Value, len(argNames))
  159. for i, name := range argNames {
  160. ty := argTypes[i]
  161. arg := GetParam(r, name)
  162. values[i], err = _jsonStringToArg(ty, arg)
  163. if err != nil {
  164. return nil, err
  165. }
  166. }
  167. return values, nil
  168. }
  169. func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
  170. var err error
  171. v := reflect.New(ty)
  172. binary.ReadJSON(v.Interface(), []byte(arg), &err)
  173. if err != nil {
  174. return v, err
  175. }
  176. v = v.Elem()
  177. return v, nil
  178. }
  179. // rpc.http
  180. //-----------------------------------------------------------------------------
  181. // rpc.websocket
  // Tunables for websocket connections.
  const (
  	// WSConnectionReaperSeconds is the interval, in seconds, at which a
  	// connection's read loop checks its failed-send count.
  	WSConnectionReaperSeconds = 5

  	// MaxFailedSends is the failed-send count above which the reaper check
  	// stops the connection.
  	MaxFailedSends = 10

  	// WriteChanBufferSize is the capacity of each connection's buffered
  	// channel of outgoing responses.
  	WriteChanBufferSize = 10
  )
  // WSRequest is a message received from a websocket client.
  type WSRequest struct {
  	// Type selects the action: "subscribe" or "unsubscribe".
  	Type string
  	// Event names the event the action applies to.
  	Event string
  }
  // WSResponse is a message sent to a websocket client.
  type WSResponse struct {
  	// Event names the event that Data belongs to.
  	Event string
  	// Data is the payload delivered for the event.
  	Data interface{}
  	// Error describes why a request could not be served.
  	Error string
  }
  198. // a single websocket connection
  199. // contains listener id, underlying ws connection,
  200. // and the event switch for subscribing to events
  201. type WSConnection struct {
  202. id string
  203. wsConn *websocket.Conn
  204. writeChan chan WSResponse
  205. quitChan chan struct{}
  206. failedSends uint
  207. started uint32
  208. stopped uint32
  209. evsw *events.EventSwitch
  210. }
  211. // new websocket connection wrapper
  212. func NewWSConnection(wsConn *websocket.Conn) *WSConnection {
  213. return &WSConnection{
  214. id: wsConn.RemoteAddr().String(),
  215. wsConn: wsConn,
  216. writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full
  217. quitChan: make(chan struct{}),
  218. }
  219. }
  220. // start the connection and hand her the event switch
  221. func (con *WSConnection) Start(evsw *events.EventSwitch) {
  222. if atomic.CompareAndSwapUint32(&con.started, 0, 1) {
  223. con.evsw = evsw
  224. // read subscriptions/unsubscriptions to events
  225. go con.read()
  226. // write responses
  227. con.write()
  228. }
  229. }
  230. // close the connection
  231. func (con *WSConnection) Stop() {
  232. if atomic.CompareAndSwapUint32(&con.stopped, 0, 1) {
  233. close(con.quitChan)
  234. // the write loop closes the websocket connection
  235. // when it exits its loop, and the read loop
  236. // closes the writeChan
  237. }
  238. }
  239. // attempt to write response to writeChan and record failures
  240. func (con *WSConnection) safeWrite(resp WSResponse) {
  241. select {
  242. case con.writeChan <- resp:
  243. // yay
  244. con.failedSends = 0
  245. default:
  246. // channel is full
  247. // if this happens too many times in a row,
  248. // close connection
  249. con.failedSends += 1
  250. }
  251. }
  252. // read from the socket and subscribe to or unsubscribe from events
  253. func (con *WSConnection) read() {
  254. defer close(con.writeChan)
  255. reaper := time.Tick(time.Second * WSConnectionReaperSeconds)
  256. for {
  257. select {
  258. case <-reaper:
  259. if con.failedSends > MaxFailedSends {
  260. // sending has failed too many times.
  261. // kill the connection
  262. con.Stop()
  263. return
  264. }
  265. default:
  266. var in []byte
  267. _, in, err := con.wsConn.ReadMessage()
  268. if err != nil {
  269. // an error reading the connection,
  270. // kill the connection
  271. con.Stop()
  272. return
  273. }
  274. var req WSRequest
  275. err = json.Unmarshal(in, &req)
  276. if err != nil {
  277. errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
  278. con.safeWrite(WSResponse{Error: errStr})
  279. continue
  280. }
  281. switch req.Type {
  282. case "subscribe":
  283. log.Info("New event subscription", "con id", con.id, "event", req.Event)
  284. con.evsw.AddListenerForEvent(con.id, req.Event, func(msg interface{}) {
  285. resp := WSResponse{
  286. Event: req.Event,
  287. Data: msg,
  288. }
  289. con.safeWrite(resp)
  290. })
  291. case "unsubscribe":
  292. if req.Event != "" {
  293. con.evsw.RemoveListenerForEvent(req.Event, con.id)
  294. } else {
  295. con.evsw.RemoveListener(con.id)
  296. }
  297. default:
  298. con.safeWrite(WSResponse{Error: "Unknown request type: " + req.Type})
  299. }
  300. }
  301. }
  302. }
  303. // receives on a write channel and writes out on the socket
  304. func (con *WSConnection) write() {
  305. defer con.wsConn.Close()
  306. n, err := new(int64), new(error)
  307. for {
  308. select {
  309. case msg := <-con.writeChan:
  310. buf := new(bytes.Buffer)
  311. binary.WriteJSON(msg, buf, n, err)
  312. if *err != nil {
  313. log.Error("Failed to marshal WSResponse to JSON", "error", err)
  314. } else {
  315. if err := con.wsConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil {
  316. log.Error("Failed to write response on websocket", "error", err)
  317. con.Stop()
  318. return
  319. }
  320. }
  321. case <-con.quitChan:
  322. return
  323. }
  324. }
  325. }
  326. // main manager for all websocket connections
  327. // holds the event switch
  328. type WebsocketManager struct {
  329. websocket.Upgrader
  330. evsw *events.EventSwitch
  331. }
  332. func NewWebsocketManager(evsw *events.EventSwitch) *WebsocketManager {
  333. return &WebsocketManager{
  334. evsw: evsw,
  335. Upgrader: websocket.Upgrader{
  336. ReadBufferSize: 1024,
  337. WriteBufferSize: 1024,
  338. CheckOrigin: func(r *http.Request) bool {
  339. // TODO
  340. return true
  341. },
  342. },
  343. }
  344. }
  345. func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Request) {
  346. wsConn, err := wm.Upgrade(w, r, nil)
  347. if err != nil {
  348. // TODO - return http error
  349. log.Error("Failed to upgrade to websocket connection", "error", err)
  350. return
  351. }
  352. // register connection
  353. con := NewWSConnection(wsConn)
  354. log.Info("New websocket connection", "origin", con.id)
  355. con.Start(wm.evsw)
  356. }
  357. // rpc.websocket
  358. //-----------------------------------------------------------------------------
  // unreflectResponse unpacks the results of a reflected RPC call. returns must
  // hold the response value followed by an error value. If the error is
  // non-nil it is returned, as an error with the same message, and the response
  // is nil; otherwise the response is returned.
  //
  // A result list with fewer than two values yields an error instead of a
  // panic, so a function registered with the wrong signature cannot crash the
  // handler that calls it.
  func unreflectResponse(returns []reflect.Value) (interface{}, error) {
  	if len(returns) < 2 {
  		return nil, fmt.Errorf("Expected 2 return values (response, error), got %v", len(returns))
  	}
  	if errI := returns[1].Interface(); errI != nil {
  		return nil, fmt.Errorf("%v", errI)
  	}
  	return returns[0].Interface(), nil
  }