You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

367 lines
10 KiB

8 years ago
8 years ago
  1. package client
  2. import (
  3. "fmt"
  4. "github.com/pkg/errors"
  5. data "github.com/tendermint/go-wire/data"
  6. events "github.com/tendermint/tmlibs/events"
  7. "github.com/tendermint/tendermint/rpc/lib/client"
  8. wire "github.com/tendermint/go-wire"
  9. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  10. "github.com/tendermint/tendermint/types"
  11. )
  12. /*
  13. HTTP is a Client implementation that communicates
  14. with a tendermint node over json rpc and websockets.
  15. This is the main implementation you probably want to use in
  16. production code. There are other implementations when calling
  17. the tendermint node in-process (local), or when you want to mock
  18. out the server for test code (mock).
  19. */
  20. type HTTP struct {
  21. remote string
  22. rpc *rpcclient.JSONRPCClient
  23. *WSEvents
  24. }
  25. // New takes a remote endpoint in the form tcp://<host>:<port>
  26. // and the websocket path (which always seems to be "/websocket")
  27. func NewHTTP(remote, wsEndpoint string) *HTTP {
  28. return &HTTP{
  29. rpc: rpcclient.NewJSONRPCClient(remote),
  30. remote: remote,
  31. WSEvents: newWSEvents(remote, wsEndpoint),
  32. }
  33. }
  34. func (c *HTTP) _assertIsClient() Client {
  35. return c
  36. }
  37. func (c *HTTP) _assertIsNetworkClient() NetworkClient {
  38. return c
  39. }
  40. func (c *HTTP) _assertIsEventSwitch() types.EventSwitch {
  41. return c
  42. }
  43. func (c *HTTP) Status() (*ctypes.ResultStatus, error) {
  44. tmResult := new(ctypes.TMResult)
  45. _, err := c.rpc.Call("status", map[string]interface{}{}, tmResult)
  46. if err != nil {
  47. return nil, errors.Wrap(err, "Status")
  48. }
  49. // note: panics if rpc doesn't match. okay???
  50. return (*tmResult).(*ctypes.ResultStatus), nil
  51. }
  52. func (c *HTTP) ABCIInfo() (*ctypes.ResultABCIInfo, error) {
  53. tmResult := new(ctypes.TMResult)
  54. _, err := c.rpc.Call("abci_info", map[string]interface{}{}, tmResult)
  55. if err != nil {
  56. return nil, errors.Wrap(err, "ABCIInfo")
  57. }
  58. return (*tmResult).(*ctypes.ResultABCIInfo), nil
  59. }
  60. func (c *HTTP) ABCIQuery(path string, data data.Bytes, prove bool) (*ctypes.ResultABCIQuery, error) {
  61. tmResult := new(ctypes.TMResult)
  62. _, err := c.rpc.Call("abci_query",
  63. map[string]interface{}{"path": path, "data": data, "prove": prove},
  64. tmResult)
  65. if err != nil {
  66. return nil, errors.Wrap(err, "ABCIQuery")
  67. }
  68. return (*tmResult).(*ctypes.ResultABCIQuery), nil
  69. }
  70. func (c *HTTP) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
  71. tmResult := new(ctypes.TMResult)
  72. _, err := c.rpc.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, tmResult)
  73. if err != nil {
  74. return nil, errors.Wrap(err, "broadcast_tx_commit")
  75. }
  76. return (*tmResult).(*ctypes.ResultBroadcastTxCommit), nil
  77. }
  78. func (c *HTTP) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  79. return c.broadcastTX("broadcast_tx_async", tx)
  80. }
  81. func (c *HTTP) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  82. return c.broadcastTX("broadcast_tx_sync", tx)
  83. }
  84. func (c *HTTP) broadcastTX(route string, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  85. tmResult := new(ctypes.TMResult)
  86. _, err := c.rpc.Call(route, map[string]interface{}{"tx": tx}, tmResult)
  87. if err != nil {
  88. return nil, errors.Wrap(err, route)
  89. }
  90. return (*tmResult).(*ctypes.ResultBroadcastTx), nil
  91. }
  92. func (c *HTTP) NetInfo() (*ctypes.ResultNetInfo, error) {
  93. tmResult := new(ctypes.TMResult)
  94. _, err := c.rpc.Call("net_info", map[string]interface{}{}, tmResult)
  95. if err != nil {
  96. return nil, errors.Wrap(err, "NetInfo")
  97. }
  98. return (*tmResult).(*ctypes.ResultNetInfo), nil
  99. }
  100. func (c *HTTP) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
  101. tmResult := new(ctypes.TMResult)
  102. _, err := c.rpc.Call("dump_consensus_state", map[string]interface{}{}, tmResult)
  103. if err != nil {
  104. return nil, errors.Wrap(err, "DumpConsensusState")
  105. }
  106. return (*tmResult).(*ctypes.ResultDumpConsensusState), nil
  107. }
  108. func (c *HTTP) BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, error) {
  109. tmResult := new(ctypes.TMResult)
  110. _, err := c.rpc.Call("blockchain",
  111. map[string]interface{}{"minHeight": minHeight, "maxHeight": maxHeight},
  112. tmResult)
  113. if err != nil {
  114. return nil, errors.Wrap(err, "BlockchainInfo")
  115. }
  116. return (*tmResult).(*ctypes.ResultBlockchainInfo), nil
  117. }
  118. func (c *HTTP) Genesis() (*ctypes.ResultGenesis, error) {
  119. tmResult := new(ctypes.TMResult)
  120. _, err := c.rpc.Call("genesis", map[string]interface{}{}, tmResult)
  121. if err != nil {
  122. return nil, errors.Wrap(err, "Genesis")
  123. }
  124. return (*tmResult).(*ctypes.ResultGenesis), nil
  125. }
  126. func (c *HTTP) Block(height int) (*ctypes.ResultBlock, error) {
  127. tmResult := new(ctypes.TMResult)
  128. _, err := c.rpc.Call("block", map[string]interface{}{"height": height}, tmResult)
  129. if err != nil {
  130. return nil, errors.Wrap(err, "Block")
  131. }
  132. return (*tmResult).(*ctypes.ResultBlock), nil
  133. }
  134. func (c *HTTP) Commit(height int) (*ctypes.ResultCommit, error) {
  135. tmResult := new(ctypes.TMResult)
  136. _, err := c.rpc.Call("commit", map[string]interface{}{"height": height}, tmResult)
  137. if err != nil {
  138. return nil, errors.Wrap(err, "Commit")
  139. }
  140. return (*tmResult).(*ctypes.ResultCommit), nil
  141. }
  142. func (c *HTTP) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
  143. tmResult := new(ctypes.TMResult)
  144. query := map[string]interface{}{
  145. "hash": hash,
  146. "prove": prove,
  147. }
  148. _, err := c.rpc.Call("tx", query, tmResult)
  149. if err != nil {
  150. return nil, errors.Wrap(err, "Tx")
  151. }
  152. return (*tmResult).(*ctypes.ResultTx), nil
  153. }
  154. func (c *HTTP) Validators() (*ctypes.ResultValidators, error) {
  155. tmResult := new(ctypes.TMResult)
  156. _, err := c.rpc.Call("validators", map[string]interface{}{}, tmResult)
  157. if err != nil {
  158. return nil, errors.Wrap(err, "Validators")
  159. }
  160. return (*tmResult).(*ctypes.ResultValidators), nil
  161. }
  162. /** websocket event stuff here... **/
  163. type WSEvents struct {
  164. types.EventSwitch
  165. remote string
  166. endpoint string
  167. ws *rpcclient.WSClient
  168. // used for signaling the goroutine that feeds ws -> EventSwitch
  169. quit chan bool
  170. done chan bool
  171. // used to maintain counts of actively listened events
  172. // so we can properly subscribe/unsubscribe
  173. // FIXME: thread-safety???
  174. // FIXME: reuse code from tmlibs/events???
  175. evtCount map[string]int // count how many time each event is subscribed
  176. listeners map[string][]string // keep track of which events each listener is listening to
  177. }
  178. func newWSEvents(remote, endpoint string) *WSEvents {
  179. return &WSEvents{
  180. EventSwitch: types.NewEventSwitch(),
  181. endpoint: endpoint,
  182. remote: remote,
  183. quit: make(chan bool, 1),
  184. done: make(chan bool, 1),
  185. evtCount: map[string]int{},
  186. listeners: map[string][]string{},
  187. }
  188. }
  189. func (w *WSEvents) _assertIsEventSwitch() types.EventSwitch {
  190. return w
  191. }
  192. // Start is the only way I could think the extend OnStart from
  193. // events.eventSwitch. If only it wasn't private...
  194. // BaseService.Start -> eventSwitch.OnStart -> WSEvents.Start
  195. func (w *WSEvents) Start() (bool, error) {
  196. st, err := w.EventSwitch.Start()
  197. // if we did start, then OnStart here...
  198. if st && err == nil {
  199. ws := rpcclient.NewWSClient(w.remote, w.endpoint)
  200. _, err = ws.Start()
  201. if err == nil {
  202. w.ws = ws
  203. go w.eventListener()
  204. }
  205. }
  206. return st, errors.Wrap(err, "StartWSEvent")
  207. }
  208. // Stop wraps the BaseService/eventSwitch actions as Start does
  209. func (w *WSEvents) Stop() bool {
  210. stop := w.EventSwitch.Stop()
  211. if stop {
  212. // send a message to quit to stop the eventListener
  213. w.quit <- true
  214. <-w.done
  215. w.ws.Stop()
  216. w.ws = nil
  217. }
  218. return stop
  219. }
  220. /** TODO: more intelligent subscriptions! **/
  221. func (w *WSEvents) AddListenerForEvent(listenerID, event string, cb events.EventCallback) {
  222. // no one listening -> subscribe
  223. if w.evtCount[event] == 0 {
  224. w.subscribe(event)
  225. }
  226. // if this listener was already listening to this event, return early
  227. for _, s := range w.listeners[listenerID] {
  228. if event == s {
  229. return
  230. }
  231. }
  232. // otherwise, add this event to this listener
  233. w.evtCount[event] += 1
  234. w.listeners[listenerID] = append(w.listeners[listenerID], event)
  235. w.EventSwitch.AddListenerForEvent(listenerID, event, cb)
  236. }
  237. func (w *WSEvents) RemoveListenerForEvent(event string, listenerID string) {
  238. // if this listener is listening already, splice it out
  239. found := false
  240. l := w.listeners[listenerID]
  241. for i, s := range l {
  242. if event == s {
  243. found = true
  244. w.listeners[listenerID] = append(l[:i], l[i+1:]...)
  245. break
  246. }
  247. }
  248. // if the listener wasn't already listening to the event, exit early
  249. if !found {
  250. return
  251. }
  252. // now we can update the subscriptions
  253. w.evtCount[event] -= 1
  254. if w.evtCount[event] == 0 {
  255. w.unsubscribe(event)
  256. }
  257. w.EventSwitch.RemoveListenerForEvent(event, listenerID)
  258. }
  259. func (w *WSEvents) RemoveListener(listenerID string) {
  260. // remove all counts for this listener
  261. for _, s := range w.listeners[listenerID] {
  262. w.evtCount[s] -= 1
  263. if w.evtCount[s] == 0 {
  264. w.unsubscribe(s)
  265. }
  266. }
  267. w.listeners[listenerID] = nil
  268. // then let the switch do it's magic
  269. w.EventSwitch.RemoveListener(listenerID)
  270. }
  271. // eventListener is an infinite loop pulling all websocket events
  272. // and pushing them to the EventSwitch.
  273. //
  274. // the goroutine only stops by closing quit
  275. func (w *WSEvents) eventListener() {
  276. for {
  277. select {
  278. case res := <-w.ws.ResultsCh:
  279. // res is json.RawMessage
  280. err := w.parseEvent(res)
  281. if err != nil {
  282. // FIXME: better logging/handling of errors??
  283. fmt.Printf("ws result: %+v\n", err)
  284. }
  285. case err := <-w.ws.ErrorsCh:
  286. // FIXME: better logging/handling of errors??
  287. fmt.Printf("ws err: %+v\n", err)
  288. case <-w.quit:
  289. // send a message so we can wait for the routine to exit
  290. // before cleaning up the w.ws stuff
  291. w.done <- true
  292. return
  293. }
  294. }
  295. }
  296. // parseEvent unmarshals the json message and converts it into
  297. // some implementation of types.TMEventData, and sends it off
  298. // on the merry way to the EventSwitch
  299. func (w *WSEvents) parseEvent(data []byte) (err error) {
  300. result := new(ctypes.TMResult)
  301. wire.ReadJSONPtr(result, data, &err)
  302. if err != nil {
  303. return err
  304. }
  305. event, ok := (*result).(*ctypes.ResultEvent)
  306. if !ok {
  307. // ignore silently (eg. subscribe, unsubscribe and maybe other events)
  308. return nil
  309. }
  310. // looks good! let's fire this baby!
  311. w.EventSwitch.FireEvent(event.Name, event.Data)
  312. return nil
  313. }
  314. // no way of exposing these failures, so we panic.
  315. // is this right? or silently ignore???
  316. func (w *WSEvents) subscribe(event string) {
  317. err := w.ws.Subscribe(event)
  318. if err != nil {
  319. panic(err)
  320. }
  321. }
  322. func (w *WSEvents) unsubscribe(event string) {
  323. err := w.ws.Unsubscribe(event)
  324. if err != nil {
  325. panic(err)
  326. }
  327. }