You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

364 lines
9.8 KiB

8 years ago
  1. package client
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/pkg/errors"
  7. data "github.com/tendermint/go-wire/data"
  8. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  9. rpcclient "github.com/tendermint/tendermint/rpc/lib/client"
  10. "github.com/tendermint/tendermint/types"
  11. events "github.com/tendermint/tmlibs/events"
  12. )
  13. /*
  14. HTTP is a Client implementation that communicates
  15. with a tendermint node over json rpc and websockets.
  16. This is the main implementation you probably want to use in
  17. production code. There are other implementations when calling
  18. the tendermint node in-process (local), or when you want to mock
  19. out the server for test code (mock).
  20. */
  21. type HTTP struct {
  22. remote string
  23. rpc *rpcclient.JSONRPCClient
  24. *WSEvents
  25. }
  26. // New takes a remote endpoint in the form tcp://<host>:<port>
  27. // and the websocket path (which always seems to be "/websocket")
  28. func NewHTTP(remote, wsEndpoint string) *HTTP {
  29. return &HTTP{
  30. rpc: rpcclient.NewJSONRPCClient(remote),
  31. remote: remote,
  32. WSEvents: newWSEvents(remote, wsEndpoint),
  33. }
  34. }
  35. func (c *HTTP) _assertIsClient() Client {
  36. return c
  37. }
  38. func (c *HTTP) _assertIsNetworkClient() NetworkClient {
  39. return c
  40. }
  41. func (c *HTTP) _assertIsEventSwitch() types.EventSwitch {
  42. return c
  43. }
  44. func (c *HTTP) Status() (*ctypes.ResultStatus, error) {
  45. result := new(ctypes.ResultStatus)
  46. _, err := c.rpc.Call("status", map[string]interface{}{}, result)
  47. if err != nil {
  48. return nil, errors.Wrap(err, "Status")
  49. }
  50. return result, nil
  51. }
  52. func (c *HTTP) ABCIInfo() (*ctypes.ResultABCIInfo, error) {
  53. result := new(ctypes.ResultABCIInfo)
  54. _, err := c.rpc.Call("abci_info", map[string]interface{}{}, result)
  55. if err != nil {
  56. return nil, errors.Wrap(err, "ABCIInfo")
  57. }
  58. return result, nil
  59. }
  60. func (c *HTTP) ABCIQuery(path string, data data.Bytes, prove bool) (*ctypes.ResultABCIQuery, error) {
  61. result := new(ctypes.ResultABCIQuery)
  62. _, err := c.rpc.Call("abci_query",
  63. map[string]interface{}{"path": path, "data": data, "prove": prove},
  64. result)
  65. if err != nil {
  66. return nil, errors.Wrap(err, "ABCIQuery")
  67. }
  68. return result, nil
  69. }
  70. func (c *HTTP) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
  71. result := new(ctypes.ResultBroadcastTxCommit)
  72. _, err := c.rpc.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, result)
  73. if err != nil {
  74. return nil, errors.Wrap(err, "broadcast_tx_commit")
  75. }
  76. return result, nil
  77. }
  78. func (c *HTTP) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  79. return c.broadcastTX("broadcast_tx_async", tx)
  80. }
  81. func (c *HTTP) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  82. return c.broadcastTX("broadcast_tx_sync", tx)
  83. }
  84. func (c *HTTP) broadcastTX(route string, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  85. result := new(ctypes.ResultBroadcastTx)
  86. _, err := c.rpc.Call(route, map[string]interface{}{"tx": tx}, result)
  87. if err != nil {
  88. return nil, errors.Wrap(err, route)
  89. }
  90. return result, nil
  91. }
  92. func (c *HTTP) NetInfo() (*ctypes.ResultNetInfo, error) {
  93. result := new(ctypes.ResultNetInfo)
  94. _, err := c.rpc.Call("net_info", map[string]interface{}{}, result)
  95. if err != nil {
  96. return nil, errors.Wrap(err, "NetInfo")
  97. }
  98. return result, nil
  99. }
  100. func (c *HTTP) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
  101. result := new(ctypes.ResultDumpConsensusState)
  102. _, err := c.rpc.Call("dump_consensus_state", map[string]interface{}{}, result)
  103. if err != nil {
  104. return nil, errors.Wrap(err, "DumpConsensusState")
  105. }
  106. return result, nil
  107. }
  108. func (c *HTTP) BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, error) {
  109. result := new(ctypes.ResultBlockchainInfo)
  110. _, err := c.rpc.Call("blockchain",
  111. map[string]interface{}{"minHeight": minHeight, "maxHeight": maxHeight},
  112. result)
  113. if err != nil {
  114. return nil, errors.Wrap(err, "BlockchainInfo")
  115. }
  116. return result, nil
  117. }
  118. func (c *HTTP) Genesis() (*ctypes.ResultGenesis, error) {
  119. result := new(ctypes.ResultGenesis)
  120. _, err := c.rpc.Call("genesis", map[string]interface{}{}, result)
  121. if err != nil {
  122. return nil, errors.Wrap(err, "Genesis")
  123. }
  124. return result, nil
  125. }
  126. func (c *HTTP) Block(height int) (*ctypes.ResultBlock, error) {
  127. result := new(ctypes.ResultBlock)
  128. _, err := c.rpc.Call("block", map[string]interface{}{"height": height}, result)
  129. if err != nil {
  130. return nil, errors.Wrap(err, "Block")
  131. }
  132. return result, nil
  133. }
  134. func (c *HTTP) Commit(height int) (*ctypes.ResultCommit, error) {
  135. result := new(ctypes.ResultCommit)
  136. _, err := c.rpc.Call("commit", map[string]interface{}{"height": height}, result)
  137. if err != nil {
  138. return nil, errors.Wrap(err, "Commit")
  139. }
  140. return result, nil
  141. }
  142. func (c *HTTP) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
  143. result := new(ctypes.ResultTx)
  144. query := map[string]interface{}{
  145. "hash": hash,
  146. "prove": prove,
  147. }
  148. _, err := c.rpc.Call("tx", query, result)
  149. if err != nil {
  150. return nil, errors.Wrap(err, "Tx")
  151. }
  152. return result, nil
  153. }
  154. func (c *HTTP) Validators() (*ctypes.ResultValidators, error) {
  155. result := new(ctypes.ResultValidators)
  156. _, err := c.rpc.Call("validators", map[string]interface{}{}, result)
  157. if err != nil {
  158. return nil, errors.Wrap(err, "Validators")
  159. }
  160. return result, nil
  161. }
  162. /** websocket event stuff here... **/
  163. type WSEvents struct {
  164. types.EventSwitch
  165. remote string
  166. endpoint string
  167. ws *rpcclient.WSClient
  168. // used for signaling the goroutine that feeds ws -> EventSwitch
  169. quit chan bool
  170. done chan bool
  171. // used to maintain counts of actively listened events
  172. // so we can properly subscribe/unsubscribe
  173. // FIXME: thread-safety???
  174. // FIXME: reuse code from tmlibs/events???
  175. evtCount map[string]int // count how many time each event is subscribed
  176. listeners map[string][]string // keep track of which events each listener is listening to
  177. }
  178. func newWSEvents(remote, endpoint string) *WSEvents {
  179. return &WSEvents{
  180. EventSwitch: types.NewEventSwitch(),
  181. endpoint: endpoint,
  182. remote: remote,
  183. quit: make(chan bool, 1),
  184. done: make(chan bool, 1),
  185. evtCount: map[string]int{},
  186. listeners: map[string][]string{},
  187. }
  188. }
  189. func (w *WSEvents) _assertIsEventSwitch() types.EventSwitch {
  190. return w
  191. }
  192. // Start is the only way I could think the extend OnStart from
  193. // events.eventSwitch. If only it wasn't private...
  194. // BaseService.Start -> eventSwitch.OnStart -> WSEvents.Start
  195. func (w *WSEvents) Start() (bool, error) {
  196. st, err := w.EventSwitch.Start()
  197. // if we did start, then OnStart here...
  198. if st && err == nil {
  199. ws := rpcclient.NewWSClient(w.remote, w.endpoint)
  200. _, err = ws.Start()
  201. if err == nil {
  202. w.ws = ws
  203. go w.eventListener()
  204. }
  205. }
  206. return st, errors.Wrap(err, "StartWSEvent")
  207. }
  208. // Stop wraps the BaseService/eventSwitch actions as Start does
  209. func (w *WSEvents) Stop() bool {
  210. stop := w.EventSwitch.Stop()
  211. if stop {
  212. // send a message to quit to stop the eventListener
  213. w.quit <- true
  214. <-w.done
  215. w.ws.Stop()
  216. w.ws = nil
  217. }
  218. return stop
  219. }
  220. /** TODO: more intelligent subscriptions! **/
  221. func (w *WSEvents) AddListenerForEvent(listenerID, event string, cb events.EventCallback) {
  222. // no one listening -> subscribe
  223. if w.evtCount[event] == 0 {
  224. w.subscribe(event)
  225. }
  226. // if this listener was already listening to this event, return early
  227. for _, s := range w.listeners[listenerID] {
  228. if event == s {
  229. return
  230. }
  231. }
  232. // otherwise, add this event to this listener
  233. w.evtCount[event] += 1
  234. w.listeners[listenerID] = append(w.listeners[listenerID], event)
  235. w.EventSwitch.AddListenerForEvent(listenerID, event, cb)
  236. }
  237. func (w *WSEvents) RemoveListenerForEvent(event string, listenerID string) {
  238. // if this listener is listening already, splice it out
  239. found := false
  240. l := w.listeners[listenerID]
  241. for i, s := range l {
  242. if event == s {
  243. found = true
  244. w.listeners[listenerID] = append(l[:i], l[i+1:]...)
  245. break
  246. }
  247. }
  248. // if the listener wasn't already listening to the event, exit early
  249. if !found {
  250. return
  251. }
  252. // now we can update the subscriptions
  253. w.evtCount[event] -= 1
  254. if w.evtCount[event] == 0 {
  255. w.unsubscribe(event)
  256. }
  257. w.EventSwitch.RemoveListenerForEvent(event, listenerID)
  258. }
  259. func (w *WSEvents) RemoveListener(listenerID string) {
  260. // remove all counts for this listener
  261. for _, s := range w.listeners[listenerID] {
  262. w.evtCount[s] -= 1
  263. if w.evtCount[s] == 0 {
  264. w.unsubscribe(s)
  265. }
  266. }
  267. w.listeners[listenerID] = nil
  268. // then let the switch do it's magic
  269. w.EventSwitch.RemoveListener(listenerID)
  270. }
  271. // eventListener is an infinite loop pulling all websocket events
  272. // and pushing them to the EventSwitch.
  273. //
  274. // the goroutine only stops by closing quit
  275. func (w *WSEvents) eventListener() {
  276. for {
  277. select {
  278. case res := <-w.ws.ResultsCh:
  279. // res is json.RawMessage
  280. err := w.parseEvent(res)
  281. if err != nil {
  282. // FIXME: better logging/handling of errors??
  283. fmt.Printf("ws result: %+v\n", err)
  284. }
  285. case err := <-w.ws.ErrorsCh:
  286. // FIXME: better logging/handling of errors??
  287. fmt.Printf("ws err: %+v\n", err)
  288. case <-w.quit:
  289. // send a message so we can wait for the routine to exit
  290. // before cleaning up the w.ws stuff
  291. w.done <- true
  292. return
  293. }
  294. }
  295. }
  296. // parseEvent unmarshals the json message and converts it into
  297. // some implementation of types.TMEventData, and sends it off
  298. // on the merry way to the EventSwitch
  299. func (w *WSEvents) parseEvent(data []byte) (err error) {
  300. result := new(ctypes.ResultEvent)
  301. err = json.Unmarshal(data, result)
  302. if err != nil {
  303. // ignore silently (eg. subscribe, unsubscribe and maybe other events)
  304. // TODO: ?
  305. return nil
  306. }
  307. // looks good! let's fire this baby!
  308. w.EventSwitch.FireEvent(result.Name, result.Data)
  309. return nil
  310. }
  311. // no way of exposing these failures, so we panic.
  312. // is this right? or silently ignore???
  313. func (w *WSEvents) subscribe(event string) {
  314. err := w.ws.Subscribe(context.TODO(), event)
  315. if err != nil {
  316. panic(err)
  317. }
  318. }
  319. func (w *WSEvents) unsubscribe(event string) {
  320. err := w.ws.Unsubscribe(context.TODO(), event)
  321. if err != nil {
  322. panic(err)
  323. }
  324. }