You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

400 lines
10 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package rpc
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/gorilla/websocket"
  8. "github.com/tendermint/tendermint/binary"
  9. "github.com/tendermint/tendermint/events"
  10. "io/ioutil"
  11. "net/http"
  12. "reflect"
  13. "sync/atomic"
  14. "time"
  15. )
  16. func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
  17. // HTTP endpoints
  18. for funcName, rpcFunc := range funcMap {
  19. mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc))
  20. }
  21. // JSONRPC endpoints
  22. mux.HandleFunc("/", makeJSONRPCHandler(funcMap))
  23. }
  24. func RegisterEventsHandler(mux *http.ServeMux, evsw *events.EventSwitch) {
  25. // websocket endpoint
  26. wm := NewWebsocketManager(evsw)
  27. mux.HandleFunc("/events", wm.websocketHandler) // websocket.Handler(w.eventsHandler))
  28. }
  29. //-------------------------------------
  30. // function introspection
  31. // holds all type information for each function
  32. type RPCFunc struct {
  33. f reflect.Value // underlying rpc function
  34. args []reflect.Type // type of each function arg
  35. returns []reflect.Type // type of each return arg
  36. argNames []string // name of each argument
  37. }
  38. // wraps a function for quicker introspection
  39. func NewRPCFunc(f interface{}, args []string) *RPCFunc {
  40. return &RPCFunc{
  41. f: reflect.ValueOf(f),
  42. args: funcArgTypes(f),
  43. returns: funcReturnTypes(f),
  44. argNames: args,
  45. }
  46. }
  47. // return a function's argument types
  48. func funcArgTypes(f interface{}) []reflect.Type {
  49. t := reflect.TypeOf(f)
  50. n := t.NumIn()
  51. types := make([]reflect.Type, n)
  52. for i := 0; i < n; i++ {
  53. types[i] = t.In(i)
  54. }
  55. return types
  56. }
  57. // return a function's return types
  58. func funcReturnTypes(f interface{}) []reflect.Type {
  59. t := reflect.TypeOf(f)
  60. n := t.NumOut()
  61. types := make([]reflect.Type, n)
  62. for i := 0; i < n; i++ {
  63. types[i] = t.Out(i)
  64. }
  65. return types
  66. }
  67. // function introspection
  68. //-----------------------------------------------------------------------------
  69. // rpc.json
  70. // jsonrpc calls grab the given method's function info and runs reflect.Call
  71. func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
  72. return func(w http.ResponseWriter, r *http.Request) {
  73. if len(r.URL.Path) > 1 {
  74. WriteRPCResponse(w, NewRPCResponse(nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
  75. return
  76. }
  77. b, _ := ioutil.ReadAll(r.Body)
  78. var request RPCRequest
  79. err := json.Unmarshal(b, &request)
  80. if err != nil {
  81. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  82. return
  83. }
  84. rpcFunc := funcMap[request.Method]
  85. if rpcFunc == nil {
  86. WriteRPCResponse(w, NewRPCResponse(nil, "RPC method unknown: "+request.Method))
  87. return
  88. }
  89. args, err := jsonParamsToArgs(rpcFunc, request.Params)
  90. if err != nil {
  91. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  92. return
  93. }
  94. returns := rpcFunc.f.Call(args)
  95. log.Debug("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
  96. response, err := unreflectResponse(returns)
  97. if err != nil {
  98. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  99. return
  100. }
  101. WriteRPCResponse(w, NewRPCResponse(response, ""))
  102. }
  103. }
  104. // covert a list of interfaces to properly typed values
  105. func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) {
  106. if len(rpcFunc.argNames) != len(params) {
  107. return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
  108. len(rpcFunc.argNames), rpcFunc.argNames, len(params), params))
  109. }
  110. values := make([]reflect.Value, len(params))
  111. for i, p := range params {
  112. ty := rpcFunc.args[i]
  113. v, err := _jsonObjectToArg(ty, p)
  114. if err != nil {
  115. return nil, err
  116. }
  117. values[i] = v
  118. }
  119. return values, nil
  120. }
  121. func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) {
  122. var err error
  123. v := reflect.New(ty)
  124. binary.ReadJSONObject(v.Interface(), object, &err)
  125. if err != nil {
  126. return v, err
  127. }
  128. v = v.Elem()
  129. return v, nil
  130. }
  131. // rpc.json
  132. //-----------------------------------------------------------------------------
  133. // rpc.http
  134. // convert from a function name to the http handler
  135. func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) {
  136. return func(w http.ResponseWriter, r *http.Request) {
  137. args, err := httpParamsToArgs(rpcFunc, r)
  138. if err != nil {
  139. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  140. return
  141. }
  142. returns := rpcFunc.f.Call(args)
  143. log.Debug("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
  144. response, err := unreflectResponse(returns)
  145. if err != nil {
  146. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  147. return
  148. }
  149. WriteRPCResponse(w, NewRPCResponse(response, ""))
  150. }
  151. }
  152. // Covert an http query to a list of properly typed values.
  153. // To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
  154. func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) {
  155. argTypes := rpcFunc.args
  156. argNames := rpcFunc.argNames
  157. var err error
  158. values := make([]reflect.Value, len(argNames))
  159. for i, name := range argNames {
  160. ty := argTypes[i]
  161. arg := GetParam(r, name)
  162. values[i], err = _jsonStringToArg(ty, arg)
  163. if err != nil {
  164. return nil, err
  165. }
  166. }
  167. return values, nil
  168. }
  169. func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
  170. var err error
  171. v := reflect.New(ty)
  172. binary.ReadJSON(v.Interface(), []byte(arg), &err)
  173. if err != nil {
  174. return v, err
  175. }
  176. v = v.Elem()
  177. return v, nil
  178. }
  179. // rpc.http
  180. //-----------------------------------------------------------------------------
  181. // rpc.websocket
  182. const (
  183. WSConnectionReaperSeconds = 5
  184. MaxFailedSends = 10
  185. WriteChanBufferSize = 10
  186. )
  187. // for requests coming in
  188. type WSRequest struct {
  189. Type string // subscribe or unsubscribe
  190. Event string
  191. }
  192. // for responses going out
  193. type WSResponse struct {
  194. Event string
  195. Data interface{}
  196. Error string
  197. }
  198. // a single websocket connection
  199. // contains listener id, underlying ws connection,
  200. // and the event switch for subscribing to events
  201. type WSConnection struct {
  202. id string
  203. wsConn *websocket.Conn
  204. writeChan chan WSResponse
  205. failedSends uint
  206. started uint32
  207. stopped uint32
  208. evsw *events.EventSwitch
  209. }
  210. // new websocket connection wrapper
  211. func NewWSConnection(wsConn *websocket.Conn) *WSConnection {
  212. return &WSConnection{
  213. id: wsConn.RemoteAddr().String(),
  214. wsConn: wsConn,
  215. writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full
  216. }
  217. }
  218. // start the connection and hand her the event switch
  219. func (con *WSConnection) Start(evsw *events.EventSwitch) {
  220. if atomic.CompareAndSwapUint32(&con.started, 0, 1) {
  221. con.evsw = evsw
  222. // read subscriptions/unsubscriptions to events
  223. go con.read()
  224. // write responses
  225. con.write()
  226. }
  227. }
  228. // close the connection
  229. func (con *WSConnection) Stop() {
  230. if atomic.CompareAndSwapUint32(&con.stopped, 0, 1) {
  231. con.wsConn.Close()
  232. close(con.writeChan)
  233. }
  234. }
  235. // attempt to write response to writeChan and record failures
  236. func (con *WSConnection) safeWrite(resp WSResponse) {
  237. select {
  238. case con.writeChan <- resp:
  239. // yay
  240. con.failedSends = 0
  241. default:
  242. // channel is full
  243. // if this happens too many times in a row,
  244. // close connection
  245. con.failedSends += 1
  246. }
  247. }
  248. // read from the socket and subscribe to or unsubscribe from events
  249. func (con *WSConnection) read() {
  250. reaper := time.Tick(time.Second * WSConnectionReaperSeconds)
  251. for {
  252. select {
  253. case <-reaper:
  254. if con.failedSends > MaxFailedSends {
  255. // sending has failed too many times.
  256. // kill the connection
  257. con.Stop()
  258. return
  259. }
  260. default:
  261. var in []byte
  262. _, in, err := con.wsConn.ReadMessage()
  263. if err != nil {
  264. // an error reading the connection,
  265. // kill the connection
  266. con.Stop()
  267. return
  268. }
  269. var req WSRequest
  270. err = json.Unmarshal(in, &req)
  271. if err != nil {
  272. errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
  273. con.safeWrite(WSResponse{Error: errStr})
  274. continue
  275. }
  276. switch req.Type {
  277. case "subscribe":
  278. log.Info("New event subscription", "con id", con.id, "event", req.Event)
  279. con.evsw.AddListenerForEvent(con.id, req.Event, func(msg interface{}) {
  280. resp := WSResponse{
  281. Event: req.Event,
  282. Data: msg,
  283. }
  284. con.safeWrite(resp)
  285. })
  286. case "unsubscribe":
  287. if req.Event != "" {
  288. con.evsw.RemoveListenerForEvent(req.Event, con.id)
  289. } else {
  290. con.evsw.RemoveListener(con.id)
  291. }
  292. default:
  293. con.safeWrite(WSResponse{Error: "Unknown request type: " + req.Type})
  294. }
  295. }
  296. }
  297. }
  298. // receives on a write channel and writes out on the socket
  299. func (con *WSConnection) write() {
  300. n, err := new(int64), new(error)
  301. for {
  302. msg, more := <-con.writeChan
  303. if !more {
  304. // the channel was closed, so ensure
  305. // connection is stopped and return
  306. con.Stop()
  307. return
  308. }
  309. buf := new(bytes.Buffer)
  310. binary.WriteJSON(msg, buf, n, err)
  311. if *err != nil {
  312. log.Error("Failed to write JSON WSResponse", "error", err)
  313. } else {
  314. if err := con.wsConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil {
  315. log.Error("Failed to write response on websocket", "error", err)
  316. con.Stop()
  317. return
  318. }
  319. }
  320. }
  321. }
  322. // main manager for all websocket connections
  323. // holds the event switch
  324. type WebsocketManager struct {
  325. websocket.Upgrader
  326. evsw *events.EventSwitch
  327. }
  328. func NewWebsocketManager(evsw *events.EventSwitch) *WebsocketManager {
  329. return &WebsocketManager{
  330. evsw: evsw,
  331. Upgrader: websocket.Upgrader{
  332. ReadBufferSize: 1024,
  333. WriteBufferSize: 1024,
  334. CheckOrigin: func(r *http.Request) bool {
  335. // TODO
  336. return true
  337. },
  338. },
  339. }
  340. }
  341. func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Request) {
  342. wsConn, err := wm.Upgrade(w, r, nil)
  343. if err != nil {
  344. // TODO - return http error
  345. log.Error("Failed to upgrade to websocket connection", "error", err)
  346. return
  347. }
  348. // register connection
  349. con := NewWSConnection(wsConn)
  350. log.Info("New websocket connection", "origin", con.id)
  351. con.Start(wm.evsw)
  352. }
  353. // rpc.websocket
  354. //-----------------------------------------------------------------------------
  355. // returns is Response struct and error. If error is not nil, return it
  356. func unreflectResponse(returns []reflect.Value) (interface{}, error) {
  357. errV := returns[1]
  358. if errV.Interface() != nil {
  359. return nil, fmt.Errorf("%v", errV.Interface())
  360. }
  361. return returns[0].Interface(), nil
  362. }