You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

395 lines
10 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package rpcserver
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/gorilla/websocket"
  8. "github.com/tendermint/tendermint/binary"
  9. "github.com/tendermint/tendermint/events"
  10. . "github.com/tendermint/tendermint/rpc/types"
  11. "io/ioutil"
  12. "net/http"
  13. "reflect"
  14. "sync/atomic"
  15. "time"
  16. )
  17. func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
  18. // HTTP endpoints
  19. for funcName, rpcFunc := range funcMap {
  20. mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc))
  21. }
  22. // JSONRPC endpoints
  23. mux.HandleFunc("/", makeJSONRPCHandler(funcMap))
  24. }
  25. func RegisterEventsHandler(mux *http.ServeMux, evsw *events.EventSwitch) {
  26. // websocket endpoint
  27. wm := NewWebsocketManager(evsw)
  28. mux.HandleFunc("/events", wm.websocketHandler) // websocket.Handler(w.eventsHandler))
  29. }
  30. //-------------------------------------
  31. // function introspection
  // RPCFunc caches the reflection data of one rpc function so it does not have
  // to be recomputed on every call.
  type RPCFunc struct {
  	// underlying rpc function
  	f reflect.Value
  	// type of each function arg
  	args []reflect.Type
  	// type of each return arg
  	returns []reflect.Type
  	// name of each argument
  	argNames []string
  }
  39. // wraps a function for quicker introspection
  40. func NewRPCFunc(f interface{}, args []string) *RPCFunc {
  41. return &RPCFunc{
  42. f: reflect.ValueOf(f),
  43. args: funcArgTypes(f),
  44. returns: funcReturnTypes(f),
  45. argNames: args,
  46. }
  47. }
  // funcArgTypes returns the type of every parameter of the function f, in
  // declaration order.
  func funcArgTypes(f interface{}) []reflect.Type {
  	fnType := reflect.TypeOf(f)
  	argTypes := make([]reflect.Type, fnType.NumIn())
  	for idx := range argTypes {
  		argTypes[idx] = fnType.In(idx)
  	}
  	return argTypes
  }
  // funcReturnTypes returns the type of every result of the function f, in
  // declaration order.
  func funcReturnTypes(f interface{}) []reflect.Type {
  	fnType := reflect.TypeOf(f)
  	resultTypes := make([]reflect.Type, fnType.NumOut())
  	for idx := range resultTypes {
  		resultTypes[idx] = fnType.Out(idx)
  	}
  	return resultTypes
  }
  68. // function introspection
  69. //-----------------------------------------------------------------------------
  70. // rpc.json
  71. // jsonrpc calls grab the given method's function info and runs reflect.Call
  72. func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
  73. return func(w http.ResponseWriter, r *http.Request) {
  74. if len(r.URL.Path) > 1 {
  75. WriteRPCResponse(w, NewRPCResponse(nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
  76. return
  77. }
  78. b, _ := ioutil.ReadAll(r.Body)
  79. var request RPCRequest
  80. err := json.Unmarshal(b, &request)
  81. if err != nil {
  82. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  83. return
  84. }
  85. rpcFunc := funcMap[request.Method]
  86. if rpcFunc == nil {
  87. WriteRPCResponse(w, NewRPCResponse(nil, "RPC method unknown: "+request.Method))
  88. return
  89. }
  90. args, err := jsonParamsToArgs(rpcFunc, request.Params)
  91. if err != nil {
  92. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  93. return
  94. }
  95. returns := rpcFunc.f.Call(args)
  96. log.Debug("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
  97. response, err := unreflectResponse(returns)
  98. if err != nil {
  99. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  100. return
  101. }
  102. WriteRPCResponse(w, NewRPCResponse(response, ""))
  103. }
  104. }
  105. // covert a list of interfaces to properly typed values
  106. func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) {
  107. if len(rpcFunc.argNames) != len(params) {
  108. return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
  109. len(rpcFunc.argNames), rpcFunc.argNames, len(params), params))
  110. }
  111. values := make([]reflect.Value, len(params))
  112. for i, p := range params {
  113. ty := rpcFunc.args[i]
  114. v, err := _jsonObjectToArg(ty, p)
  115. if err != nil {
  116. return nil, err
  117. }
  118. values[i] = v
  119. }
  120. return values, nil
  121. }
  122. func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) {
  123. var err error
  124. v := reflect.New(ty)
  125. binary.ReadJSONObject(v.Interface(), object, &err)
  126. if err != nil {
  127. return v, err
  128. }
  129. v = v.Elem()
  130. return v, nil
  131. }
  132. // rpc.json
  133. //-----------------------------------------------------------------------------
  134. // rpc.http
  135. // convert from a function name to the http handler
  136. func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) {
  137. return func(w http.ResponseWriter, r *http.Request) {
  138. args, err := httpParamsToArgs(rpcFunc, r)
  139. if err != nil {
  140. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  141. return
  142. }
  143. returns := rpcFunc.f.Call(args)
  144. log.Debug("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
  145. response, err := unreflectResponse(returns)
  146. if err != nil {
  147. WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
  148. return
  149. }
  150. WriteRPCResponse(w, NewRPCResponse(response, ""))
  151. }
  152. }
  153. // Covert an http query to a list of properly typed values.
  154. // To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
  155. func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) {
  156. argTypes := rpcFunc.args
  157. argNames := rpcFunc.argNames
  158. var err error
  159. values := make([]reflect.Value, len(argNames))
  160. for i, name := range argNames {
  161. ty := argTypes[i]
  162. arg := GetParam(r, name)
  163. values[i], err = _jsonStringToArg(ty, arg)
  164. if err != nil {
  165. return nil, err
  166. }
  167. }
  168. return values, nil
  169. }
  170. func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
  171. var err error
  172. v := reflect.New(ty)
  173. binary.ReadJSON(v.Interface(), []byte(arg), &err)
  174. if err != nil {
  175. return v, err
  176. }
  177. v = v.Elem()
  178. return v, nil
  179. }
  180. // rpc.http
  181. //-----------------------------------------------------------------------------
  182. // rpc.websocket
  // Tunables for websocket connections.
  const (
  	// WSConnectionReaperSeconds is the period, in seconds, of the ticker the
  	// read loop uses to check for too many failed sends.
  	WSConnectionReaperSeconds = 5

  	// MaxFailedSends is the number of consecutive failed sends a connection
  	// may exceed before the reaper check stops it.
  	MaxFailedSends = 10

  	// WriteChanBufferSize is the capacity of each connection's write channel.
  	WriteChanBufferSize = 10
  )
  188. // a single websocket connection
  189. // contains listener id, underlying ws connection,
  190. // and the event switch for subscribing to events
  191. type WSConnection struct {
  192. id string
  193. wsConn *websocket.Conn
  194. writeChan chan WSResponse
  195. quitChan chan struct{}
  196. failedSends uint
  197. started uint32
  198. stopped uint32
  199. evsw *events.EventSwitch
  200. }
  201. // new websocket connection wrapper
  202. func NewWSConnection(wsConn *websocket.Conn) *WSConnection {
  203. return &WSConnection{
  204. id: wsConn.RemoteAddr().String(),
  205. wsConn: wsConn,
  206. writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full
  207. quitChan: make(chan struct{}),
  208. }
  209. }
  210. // start the connection and hand her the event switch
  211. func (con *WSConnection) Start(evsw *events.EventSwitch) {
  212. if atomic.CompareAndSwapUint32(&con.started, 0, 1) {
  213. con.evsw = evsw
  214. // read subscriptions/unsubscriptions to events
  215. go con.read()
  216. // write responses
  217. con.write()
  218. }
  219. }
  220. // close the connection
  221. func (con *WSConnection) Stop() {
  222. if atomic.CompareAndSwapUint32(&con.stopped, 0, 1) {
  223. con.evsw.RemoveListener(con.id)
  224. close(con.quitChan)
  225. // the write loop closes the websocket connection
  226. // when it exits its loop, and the read loop
  227. // closes the writeChan
  228. }
  229. }
  230. // attempt to write response to writeChan and record failures
  231. func (con *WSConnection) safeWrite(resp WSResponse) {
  232. select {
  233. case con.writeChan <- resp:
  234. // yay
  235. con.failedSends = 0
  236. default:
  237. // channel is full
  238. // if this happens too many times in a row,
  239. // close connection
  240. con.failedSends += 1
  241. }
  242. }
  243. // read from the socket and subscribe to or unsubscribe from events
  244. func (con *WSConnection) read() {
  245. defer close(con.writeChan)
  246. reaper := time.Tick(time.Second * WSConnectionReaperSeconds)
  247. for {
  248. select {
  249. // TODO: this actually doesn't work
  250. // since ReadMessage blocks. Really it needs its own
  251. // go routine
  252. case <-reaper:
  253. if con.failedSends > MaxFailedSends {
  254. // sending has failed too many times.
  255. // kill the connection
  256. con.Stop()
  257. return
  258. }
  259. default:
  260. var in []byte
  261. _, in, err := con.wsConn.ReadMessage()
  262. if err != nil {
  263. // an error reading the connection,
  264. // kill the connection
  265. con.Stop()
  266. return
  267. }
  268. var req WSRequest
  269. err = json.Unmarshal(in, &req)
  270. if err != nil {
  271. errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
  272. con.safeWrite(WSResponse{Error: errStr})
  273. continue
  274. }
  275. switch req.Type {
  276. case "subscribe":
  277. log.Info("New event subscription", "con id", con.id, "event", req.Event)
  278. con.evsw.AddListenerForEvent(con.id, req.Event, func(msg interface{}) {
  279. resp := WSResponse{
  280. Event: req.Event,
  281. Data: msg,
  282. }
  283. con.safeWrite(resp)
  284. })
  285. case "unsubscribe":
  286. if req.Event != "" {
  287. con.evsw.RemoveListenerForEvent(req.Event, con.id)
  288. } else {
  289. con.evsw.RemoveListener(con.id)
  290. }
  291. default:
  292. con.safeWrite(WSResponse{Error: "Unknown request type: " + req.Type})
  293. }
  294. }
  295. }
  296. }
  297. // receives on a write channel and writes out on the socket
  298. func (con *WSConnection) write() {
  299. defer con.wsConn.Close()
  300. n, err := new(int64), new(error)
  301. for {
  302. select {
  303. case msg := <-con.writeChan:
  304. buf := new(bytes.Buffer)
  305. binary.WriteJSON(msg, buf, n, err)
  306. if *err != nil {
  307. log.Error("Failed to marshal WSResponse to JSON", "error", err)
  308. } else {
  309. if err := con.wsConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil {
  310. log.Error("Failed to write response on websocket", "error", err)
  311. con.Stop()
  312. return
  313. }
  314. }
  315. case <-con.quitChan:
  316. return
  317. }
  318. }
  319. }
  320. // main manager for all websocket connections
  321. // holds the event switch
  322. type WebsocketManager struct {
  323. websocket.Upgrader
  324. evsw *events.EventSwitch
  325. }
  326. func NewWebsocketManager(evsw *events.EventSwitch) *WebsocketManager {
  327. return &WebsocketManager{
  328. evsw: evsw,
  329. Upgrader: websocket.Upgrader{
  330. ReadBufferSize: 1024,
  331. WriteBufferSize: 1024,
  332. CheckOrigin: func(r *http.Request) bool {
  333. // TODO
  334. return true
  335. },
  336. },
  337. }
  338. }
  339. func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Request) {
  340. wsConn, err := wm.Upgrade(w, r, nil)
  341. if err != nil {
  342. // TODO - return http error
  343. log.Error("Failed to upgrade to websocket connection", "error", err)
  344. return
  345. }
  346. // register connection
  347. con := NewWSConnection(wsConn)
  348. log.Info("New websocket connection", "origin", con.id)
  349. con.Start(wm.evsw)
  350. }
  351. // rpc.websocket
  352. //-----------------------------------------------------------------------------
  // unreflectResponse unpacks the result of a reflected rpc call. returns is
  // expected to hold the response value followed by an error value.
  //
  // If the error value is non-nil it is returned (re-created from its printed
  // form) with a nil response; otherwise the response value is returned.
  // A result list with fewer than two values is reported as an error instead
  // of panicking on the index.
  func unreflectResponse(returns []reflect.Value) (interface{}, error) {
  	if len(returns) < 2 {
  		return nil, fmt.Errorf("rpc function must return (response, error), got %d return values", len(returns))
  	}
  	if callErr := returns[1].Interface(); callErr != nil {
  		return nil, fmt.Errorf("%v", callErr)
  	}
  	return returns[0].Interface(), nil
  }