You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

375 lines
10 KiB

8 years ago
  1. package client
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/pkg/errors"
  7. data "github.com/tendermint/go-wire/data"
  8. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  9. rpcclient "github.com/tendermint/tendermint/rpc/lib/client"
  10. "github.com/tendermint/tendermint/types"
  11. events "github.com/tendermint/tmlibs/events"
  12. )
/*
HTTP is a Client implementation that communicates
with a tendermint node over json rpc and websockets.
This is the main implementation you probably want to use in
production code. There are other implementations when calling
the tendermint node in-process (local), or when you want to mock
out the server for test code (mock).
*/
type HTTP struct {
	// remote is the endpoint address that was handed to NewHTTP.
	remote string
	// rpc is the JSON-RPC client used by all request/response methods.
	rpc *rpcclient.JSONRPCClient
	// WSEvents is embedded, so its event-switch methods
	// (Start, Stop, AddListenerForEvent, ...) are promoted onto HTTP.
	*WSEvents
}
  26. // New takes a remote endpoint in the form tcp://<host>:<port>
  27. // and the websocket path (which always seems to be "/websocket")
  28. func NewHTTP(remote, wsEndpoint string) *HTTP {
  29. return &HTTP{
  30. rpc: rpcclient.NewJSONRPCClient(remote),
  31. remote: remote,
  32. WSEvents: newWSEvents(remote, wsEndpoint),
  33. }
  34. }
  35. func (c *HTTP) _assertIsClient() Client {
  36. return c
  37. }
  38. func (c *HTTP) _assertIsNetworkClient() NetworkClient {
  39. return c
  40. }
  41. func (c *HTTP) _assertIsEventSwitch() types.EventSwitch {
  42. return c
  43. }
  44. func (c *HTTP) Status() (*ctypes.ResultStatus, error) {
  45. result := new(ctypes.ResultStatus)
  46. _, err := c.rpc.Call("status", map[string]interface{}{}, result)
  47. if err != nil {
  48. return nil, errors.Wrap(err, "Status")
  49. }
  50. return result, nil
  51. }
  52. func (c *HTTP) ABCIInfo() (*ctypes.ResultABCIInfo, error) {
  53. result := new(ctypes.ResultABCIInfo)
  54. _, err := c.rpc.Call("abci_info", map[string]interface{}{}, result)
  55. if err != nil {
  56. return nil, errors.Wrap(err, "ABCIInfo")
  57. }
  58. return result, nil
  59. }
  60. func (c *HTTP) ABCIQuery(path string, data data.Bytes, prove bool) (*ctypes.ResultABCIQuery, error) {
  61. result := new(ctypes.ResultABCIQuery)
  62. _, err := c.rpc.Call("abci_query",
  63. map[string]interface{}{"path": path, "data": data, "prove": prove},
  64. result)
  65. if err != nil {
  66. return nil, errors.Wrap(err, "ABCIQuery")
  67. }
  68. return result, nil
  69. }
  70. func (c *HTTP) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
  71. result := new(ctypes.ResultBroadcastTxCommit)
  72. _, err := c.rpc.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, result)
  73. if err != nil {
  74. return nil, errors.Wrap(err, "broadcast_tx_commit")
  75. }
  76. return result, nil
  77. }
  78. func (c *HTTP) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  79. return c.broadcastTX("broadcast_tx_async", tx)
  80. }
  81. func (c *HTTP) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  82. return c.broadcastTX("broadcast_tx_sync", tx)
  83. }
  84. func (c *HTTP) broadcastTX(route string, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  85. result := new(ctypes.ResultBroadcastTx)
  86. _, err := c.rpc.Call(route, map[string]interface{}{"tx": tx}, result)
  87. if err != nil {
  88. return nil, errors.Wrap(err, route)
  89. }
  90. return result, nil
  91. }
  92. func (c *HTTP) NetInfo() (*ctypes.ResultNetInfo, error) {
  93. result := new(ctypes.ResultNetInfo)
  94. _, err := c.rpc.Call("net_info", map[string]interface{}{}, result)
  95. if err != nil {
  96. return nil, errors.Wrap(err, "NetInfo")
  97. }
  98. return result, nil
  99. }
  100. func (c *HTTP) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
  101. result := new(ctypes.ResultDumpConsensusState)
  102. _, err := c.rpc.Call("dump_consensus_state", map[string]interface{}{}, result)
  103. if err != nil {
  104. return nil, errors.Wrap(err, "DumpConsensusState")
  105. }
  106. return result, nil
  107. }
  108. func (c *HTTP) BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, error) {
  109. result := new(ctypes.ResultBlockchainInfo)
  110. _, err := c.rpc.Call("blockchain",
  111. map[string]interface{}{"minHeight": minHeight, "maxHeight": maxHeight},
  112. result)
  113. if err != nil {
  114. return nil, errors.Wrap(err, "BlockchainInfo")
  115. }
  116. return result, nil
  117. }
  118. func (c *HTTP) Genesis() (*ctypes.ResultGenesis, error) {
  119. result := new(ctypes.ResultGenesis)
  120. _, err := c.rpc.Call("genesis", map[string]interface{}{}, result)
  121. if err != nil {
  122. return nil, errors.Wrap(err, "Genesis")
  123. }
  124. return result, nil
  125. }
  126. func (c *HTTP) Block(height *int) (*ctypes.ResultBlock, error) {
  127. result := new(ctypes.ResultBlock)
  128. _, err := c.rpc.Call("block", map[string]interface{}{"height": height}, result)
  129. if err != nil {
  130. return nil, errors.Wrap(err, "Block")
  131. }
  132. return result, nil
  133. }
  134. func (c *HTTP) Commit(height *int) (*ctypes.ResultCommit, error) {
  135. result := new(ctypes.ResultCommit)
  136. _, err := c.rpc.Call("commit", map[string]interface{}{"height": height}, result)
  137. if err != nil {
  138. return nil, errors.Wrap(err, "Commit")
  139. }
  140. return result, nil
  141. }
  142. func (c *HTTP) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
  143. result := new(ctypes.ResultTx)
  144. query := map[string]interface{}{
  145. "hash": hash,
  146. "prove": prove,
  147. }
  148. _, err := c.rpc.Call("tx", query, result)
  149. if err != nil {
  150. return nil, errors.Wrap(err, "Tx")
  151. }
  152. return result, nil
  153. }
  154. func (c *HTTP) Validators(height *int) (*ctypes.ResultValidators, error) {
  155. result := new(ctypes.ResultValidators)
  156. _, err := c.rpc.Call("validators", map[string]interface{}{"height": height}, result)
  157. if err != nil {
  158. return nil, errors.Wrap(err, "Validators")
  159. }
  160. return result, nil
  161. }
  162. /** websocket event stuff here... **/
  163. type WSEvents struct {
  164. types.EventSwitch
  165. remote string
  166. endpoint string
  167. ws *rpcclient.WSClient
  168. // used for signaling the goroutine that feeds ws -> EventSwitch
  169. quit chan bool
  170. done chan bool
  171. // used to maintain counts of actively listened events
  172. // so we can properly subscribe/unsubscribe
  173. // FIXME: thread-safety???
  174. // FIXME: reuse code from tmlibs/events???
  175. evtCount map[string]int // count how many time each event is subscribed
  176. listeners map[string][]string // keep track of which events each listener is listening to
  177. }
  178. func newWSEvents(remote, endpoint string) *WSEvents {
  179. return &WSEvents{
  180. EventSwitch: types.NewEventSwitch(),
  181. endpoint: endpoint,
  182. remote: remote,
  183. quit: make(chan bool, 1),
  184. done: make(chan bool, 1),
  185. evtCount: map[string]int{},
  186. listeners: map[string][]string{},
  187. }
  188. }
  189. func (w *WSEvents) _assertIsEventSwitch() types.EventSwitch {
  190. return w
  191. }
  192. // Start is the only way I could think the extend OnStart from
  193. // events.eventSwitch. If only it wasn't private...
  194. // BaseService.Start -> eventSwitch.OnStart -> WSEvents.Start
  195. func (w *WSEvents) Start() (bool, error) {
  196. st, err := w.EventSwitch.Start()
  197. // if we did start, then OnStart here...
  198. if st && err == nil {
  199. ws := rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() {
  200. w.redoSubscriptions()
  201. }))
  202. _, err = ws.Start()
  203. if err == nil {
  204. w.ws = ws
  205. go w.eventListener()
  206. }
  207. }
  208. return st, errors.Wrap(err, "StartWSEvent")
  209. }
  210. // Stop wraps the BaseService/eventSwitch actions as Start does
  211. func (w *WSEvents) Stop() bool {
  212. stop := w.EventSwitch.Stop()
  213. if stop {
  214. // send a message to quit to stop the eventListener
  215. w.quit <- true
  216. <-w.done
  217. w.ws.Stop()
  218. w.ws = nil
  219. }
  220. return stop
  221. }
  222. /** TODO: more intelligent subscriptions! **/
  223. func (w *WSEvents) AddListenerForEvent(listenerID, event string, cb events.EventCallback) {
  224. // no one listening -> subscribe
  225. if w.evtCount[event] == 0 {
  226. w.subscribe(event)
  227. }
  228. // if this listener was already listening to this event, return early
  229. for _, s := range w.listeners[listenerID] {
  230. if event == s {
  231. return
  232. }
  233. }
  234. // otherwise, add this event to this listener
  235. w.evtCount[event] += 1
  236. w.listeners[listenerID] = append(w.listeners[listenerID], event)
  237. w.EventSwitch.AddListenerForEvent(listenerID, event, cb)
  238. }
  239. func (w *WSEvents) RemoveListenerForEvent(event string, listenerID string) {
  240. // if this listener is listening already, splice it out
  241. found := false
  242. l := w.listeners[listenerID]
  243. for i, s := range l {
  244. if event == s {
  245. found = true
  246. w.listeners[listenerID] = append(l[:i], l[i+1:]...)
  247. break
  248. }
  249. }
  250. // if the listener wasn't already listening to the event, exit early
  251. if !found {
  252. return
  253. }
  254. // now we can update the subscriptions
  255. w.evtCount[event] -= 1
  256. if w.evtCount[event] == 0 {
  257. w.unsubscribe(event)
  258. }
  259. w.EventSwitch.RemoveListenerForEvent(event, listenerID)
  260. }
  261. func (w *WSEvents) RemoveListener(listenerID string) {
  262. // remove all counts for this listener
  263. for _, s := range w.listeners[listenerID] {
  264. w.evtCount[s] -= 1
  265. if w.evtCount[s] == 0 {
  266. w.unsubscribe(s)
  267. }
  268. }
  269. w.listeners[listenerID] = nil
  270. // then let the switch do it's magic
  271. w.EventSwitch.RemoveListener(listenerID)
  272. }
  273. // After being reconnected, it is necessary to redo subscription
  274. // to server otherwise no data will be automatically received
  275. func (w *WSEvents) redoSubscriptions() {
  276. for event, _ := range w.evtCount {
  277. w.subscribe(event)
  278. }
  279. }
  280. // eventListener is an infinite loop pulling all websocket events
  281. // and pushing them to the EventSwitch.
  282. //
  283. // the goroutine only stops by closing quit
  284. func (w *WSEvents) eventListener() {
  285. for {
  286. select {
  287. case res := <-w.ws.ResultsCh:
  288. // res is json.RawMessage
  289. err := w.parseEvent(res)
  290. if err != nil {
  291. // FIXME: better logging/handling of errors??
  292. fmt.Printf("ws result: %+v\n", err)
  293. }
  294. case err := <-w.ws.ErrorsCh:
  295. // FIXME: better logging/handling of errors??
  296. fmt.Printf("ws err: %+v\n", err)
  297. case <-w.quit:
  298. // send a message so we can wait for the routine to exit
  299. // before cleaning up the w.ws stuff
  300. w.done <- true
  301. return
  302. }
  303. }
  304. }
  305. // parseEvent unmarshals the json message and converts it into
  306. // some implementation of types.TMEventData, and sends it off
  307. // on the merry way to the EventSwitch
  308. func (w *WSEvents) parseEvent(data []byte) (err error) {
  309. result := new(ctypes.ResultEvent)
  310. err = json.Unmarshal(data, result)
  311. if err != nil {
  312. // ignore silently (eg. subscribe, unsubscribe and maybe other events)
  313. // TODO: ?
  314. return nil
  315. }
  316. // looks good! let's fire this baby!
  317. w.EventSwitch.FireEvent(result.Name, result.Data)
  318. return nil
  319. }
  320. // no way of exposing these failures, so we panic.
  321. // is this right? or silently ignore???
  322. func (w *WSEvents) subscribe(event string) {
  323. err := w.ws.Subscribe(context.TODO(), event)
  324. if err != nil {
  325. panic(err)
  326. }
  327. }
  328. func (w *WSEvents) unsubscribe(event string) {
  329. err := w.ws.Unsubscribe(context.TODO(), event)
  330. if err != nil {
  331. panic(err)
  332. }
  333. }