You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

366 lines
10 KiB

  1. package client
  2. import (
  3. "fmt"
  4. "github.com/pkg/errors"
  5. events "github.com/tendermint/go-events"
  6. "github.com/tendermint/go-rpc/client"
  7. wire "github.com/tendermint/go-wire"
  8. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  9. "github.com/tendermint/tendermint/types"
  10. )
  11. /*
  12. HTTP is a Client implementation that communicates
  13. with a tendermint node over json rpc and websockets.
  14. This is the main implementation you probably want to use in
  15. production code. There are other implementations when calling
  16. the tendermint node in-process (local), or when you want to mock
  17. out the server for test code (mock).
  18. */
  19. type HTTP struct {
  20. remote string
  21. rpc *rpcclient.JSONRPCClient
  22. *WSEvents
  23. }
  24. // New takes a remote endpoint in the form tcp://<host>:<port>
  25. // and the websocket path (which always seems to be "/websocket")
  26. func NewHTTP(remote, wsEndpoint string) *HTTP {
  27. return &HTTP{
  28. rpc: rpcclient.NewJSONRPCClient(remote),
  29. remote: remote,
  30. WSEvents: newWSEvents(remote, wsEndpoint),
  31. }
  32. }
  33. func (c *HTTP) _assertIsClient() Client {
  34. return c
  35. }
  36. func (c *HTTP) _assertIsNetworkClient() NetworkClient {
  37. return c
  38. }
  39. func (c *HTTP) _assertIsEventSwitch() types.EventSwitch {
  40. return c
  41. }
  42. func (c *HTTP) Status() (*ctypes.ResultStatus, error) {
  43. tmResult := new(ctypes.TMResult)
  44. _, err := c.rpc.Call("status", map[string]interface{}{}, tmResult)
  45. if err != nil {
  46. return nil, errors.Wrap(err, "Status")
  47. }
  48. // note: panics if rpc doesn't match. okay???
  49. return (*tmResult).(*ctypes.ResultStatus), nil
  50. }
  51. func (c *HTTP) ABCIInfo() (*ctypes.ResultABCIInfo, error) {
  52. tmResult := new(ctypes.TMResult)
  53. _, err := c.rpc.Call("abci_info", map[string]interface{}{}, tmResult)
  54. if err != nil {
  55. return nil, errors.Wrap(err, "ABCIInfo")
  56. }
  57. return (*tmResult).(*ctypes.ResultABCIInfo), nil
  58. }
  59. func (c *HTTP) ABCIQuery(path string, data []byte, prove bool) (*ctypes.ResultABCIQuery, error) {
  60. tmResult := new(ctypes.TMResult)
  61. _, err := c.rpc.Call("abci_query",
  62. map[string]interface{}{"path": path, "data": data, "prove": prove},
  63. tmResult)
  64. if err != nil {
  65. return nil, errors.Wrap(err, "ABCIQuery")
  66. }
  67. return (*tmResult).(*ctypes.ResultABCIQuery), nil
  68. }
  69. func (c *HTTP) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
  70. tmResult := new(ctypes.TMResult)
  71. _, err := c.rpc.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, tmResult)
  72. if err != nil {
  73. return nil, errors.Wrap(err, "broadcast_tx_commit")
  74. }
  75. return (*tmResult).(*ctypes.ResultBroadcastTxCommit), nil
  76. }
  77. func (c *HTTP) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  78. return c.broadcastTX("broadcast_tx_async", tx)
  79. }
  80. func (c *HTTP) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  81. return c.broadcastTX("broadcast_tx_sync", tx)
  82. }
  83. func (c *HTTP) broadcastTX(route string, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  84. tmResult := new(ctypes.TMResult)
  85. _, err := c.rpc.Call(route, map[string]interface{}{"tx": tx}, tmResult)
  86. if err != nil {
  87. return nil, errors.Wrap(err, route)
  88. }
  89. return (*tmResult).(*ctypes.ResultBroadcastTx), nil
  90. }
  91. func (c *HTTP) NetInfo() (*ctypes.ResultNetInfo, error) {
  92. tmResult := new(ctypes.TMResult)
  93. _, err := c.rpc.Call("net_info", map[string]interface{}{}, tmResult)
  94. if err != nil {
  95. return nil, errors.Wrap(err, "NetInfo")
  96. }
  97. return (*tmResult).(*ctypes.ResultNetInfo), nil
  98. }
  99. func (c *HTTP) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
  100. tmResult := new(ctypes.TMResult)
  101. _, err := c.rpc.Call("dump_consensus_state", map[string]interface{}{}, tmResult)
  102. if err != nil {
  103. return nil, errors.Wrap(err, "DumpConsensusState")
  104. }
  105. return (*tmResult).(*ctypes.ResultDumpConsensusState), nil
  106. }
  107. func (c *HTTP) BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, error) {
  108. tmResult := new(ctypes.TMResult)
  109. _, err := c.rpc.Call("blockchain",
  110. map[string]interface{}{"minHeight": minHeight, "maxHeight": maxHeight},
  111. tmResult)
  112. if err != nil {
  113. return nil, errors.Wrap(err, "BlockchainInfo")
  114. }
  115. return (*tmResult).(*ctypes.ResultBlockchainInfo), nil
  116. }
  117. func (c *HTTP) Genesis() (*ctypes.ResultGenesis, error) {
  118. tmResult := new(ctypes.TMResult)
  119. _, err := c.rpc.Call("genesis", map[string]interface{}{}, tmResult)
  120. if err != nil {
  121. return nil, errors.Wrap(err, "Genesis")
  122. }
  123. return (*tmResult).(*ctypes.ResultGenesis), nil
  124. }
  125. func (c *HTTP) Block(height int) (*ctypes.ResultBlock, error) {
  126. tmResult := new(ctypes.TMResult)
  127. _, err := c.rpc.Call("block", map[string]interface{}{"height": height}, tmResult)
  128. if err != nil {
  129. return nil, errors.Wrap(err, "Block")
  130. }
  131. return (*tmResult).(*ctypes.ResultBlock), nil
  132. }
  133. func (c *HTTP) Commit(height int) (*ctypes.ResultCommit, error) {
  134. tmResult := new(ctypes.TMResult)
  135. _, err := c.rpc.Call("commit", map[string]interface{}{"height": height}, tmResult)
  136. if err != nil {
  137. return nil, errors.Wrap(err, "Commit")
  138. }
  139. return (*tmResult).(*ctypes.ResultCommit), nil
  140. }
  141. func (c *HTTP) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
  142. tmResult := new(ctypes.TMResult)
  143. query := map[string]interface{}{
  144. "hash": hash,
  145. "prove": prove,
  146. }
  147. _, err := c.rpc.Call("tx", query, tmResult)
  148. if err != nil {
  149. return nil, errors.Wrap(err, "Tx")
  150. }
  151. return (*tmResult).(*ctypes.ResultTx), nil
  152. }
  153. func (c *HTTP) Validators() (*ctypes.ResultValidators, error) {
  154. tmResult := new(ctypes.TMResult)
  155. _, err := c.rpc.Call("validators", map[string]interface{}{}, tmResult)
  156. if err != nil {
  157. return nil, errors.Wrap(err, "Validators")
  158. }
  159. return (*tmResult).(*ctypes.ResultValidators), nil
  160. }
  161. /** websocket event stuff here... **/
  162. type WSEvents struct {
  163. types.EventSwitch
  164. remote string
  165. endpoint string
  166. ws *rpcclient.WSClient
  167. // used for signaling the goroutine that feeds ws -> EventSwitch
  168. quit chan bool
  169. done chan bool
  170. // used to maintain counts of actively listened events
  171. // so we can properly subscribe/unsubscribe
  172. // FIXME: thread-safety???
  173. // FIXME: reuse code from go-events???
  174. evtCount map[string]int // count how many time each event is subscribed
  175. listeners map[string][]string // keep track of which events each listener is listening to
  176. }
  177. func newWSEvents(remote, endpoint string) *WSEvents {
  178. return &WSEvents{
  179. EventSwitch: types.NewEventSwitch(),
  180. endpoint: endpoint,
  181. remote: remote,
  182. quit: make(chan bool, 1),
  183. done: make(chan bool, 1),
  184. evtCount: map[string]int{},
  185. listeners: map[string][]string{},
  186. }
  187. }
  188. func (w *WSEvents) _assertIsEventSwitch() types.EventSwitch {
  189. return w
  190. }
  191. // Start is the only way I could think the extend OnStart from
  192. // events.eventSwitch. If only it wasn't private...
  193. // BaseService.Start -> eventSwitch.OnStart -> WSEvents.Start
  194. func (w *WSEvents) Start() (bool, error) {
  195. st, err := w.EventSwitch.Start()
  196. // if we did start, then OnStart here...
  197. if st && err == nil {
  198. ws := rpcclient.NewWSClient(w.remote, w.endpoint)
  199. _, err = ws.Start()
  200. if err == nil {
  201. w.ws = ws
  202. go w.eventListener()
  203. }
  204. }
  205. return st, errors.Wrap(err, "StartWSEvent")
  206. }
  207. // Stop wraps the BaseService/eventSwitch actions as Start does
  208. func (w *WSEvents) Stop() bool {
  209. stop := w.EventSwitch.Stop()
  210. if stop {
  211. // send a message to quit to stop the eventListener
  212. w.quit <- true
  213. <-w.done
  214. w.ws.Stop()
  215. w.ws = nil
  216. }
  217. return stop
  218. }
  219. /** TODO: more intelligent subscriptions! **/
  220. func (w *WSEvents) AddListenerForEvent(listenerID, event string, cb events.EventCallback) {
  221. // no one listening -> subscribe
  222. if w.evtCount[event] == 0 {
  223. w.subscribe(event)
  224. }
  225. // if this listener was already listening to this event, return early
  226. for _, s := range w.listeners[listenerID] {
  227. if event == s {
  228. return
  229. }
  230. }
  231. // otherwise, add this event to this listener
  232. w.evtCount[event] += 1
  233. w.listeners[listenerID] = append(w.listeners[listenerID], event)
  234. w.EventSwitch.AddListenerForEvent(listenerID, event, cb)
  235. }
  236. func (w *WSEvents) RemoveListenerForEvent(event string, listenerID string) {
  237. // if this listener is listening already, splice it out
  238. found := false
  239. l := w.listeners[listenerID]
  240. for i, s := range l {
  241. if event == s {
  242. found = true
  243. w.listeners[listenerID] = append(l[:i], l[i+1:]...)
  244. break
  245. }
  246. }
  247. // if the listener wasn't already listening to the event, exit early
  248. if !found {
  249. return
  250. }
  251. // now we can update the subscriptions
  252. w.evtCount[event] -= 1
  253. if w.evtCount[event] == 0 {
  254. w.unsubscribe(event)
  255. }
  256. w.EventSwitch.RemoveListenerForEvent(event, listenerID)
  257. }
  258. func (w *WSEvents) RemoveListener(listenerID string) {
  259. // remove all counts for this listener
  260. for _, s := range w.listeners[listenerID] {
  261. w.evtCount[s] -= 1
  262. if w.evtCount[s] == 0 {
  263. w.unsubscribe(s)
  264. }
  265. }
  266. w.listeners[listenerID] = nil
  267. // then let the switch do it's magic
  268. w.EventSwitch.RemoveListener(listenerID)
  269. }
  270. // eventListener is an infinite loop pulling all websocket events
  271. // and pushing them to the EventSwitch.
  272. //
  273. // the goroutine only stops by closing quit
  274. func (w *WSEvents) eventListener() {
  275. for {
  276. select {
  277. case res := <-w.ws.ResultsCh:
  278. // res is json.RawMessage
  279. err := w.parseEvent(res)
  280. if err != nil {
  281. // FIXME: better logging/handling of errors??
  282. fmt.Printf("ws result: %+v\n", err)
  283. }
  284. case err := <-w.ws.ErrorsCh:
  285. // FIXME: better logging/handling of errors??
  286. fmt.Printf("ws err: %+v\n", err)
  287. case <-w.quit:
  288. // send a message so we can wait for the routine to exit
  289. // before cleaning up the w.ws stuff
  290. w.done <- true
  291. return
  292. }
  293. }
  294. }
  295. // parseEvent unmarshals the json message and converts it into
  296. // some implementation of types.TMEventData, and sends it off
  297. // on the merry way to the EventSwitch
  298. func (w *WSEvents) parseEvent(data []byte) (err error) {
  299. result := new(ctypes.TMResult)
  300. wire.ReadJSONPtr(result, data, &err)
  301. if err != nil {
  302. return err
  303. }
  304. event, ok := (*result).(*ctypes.ResultEvent)
  305. if !ok {
  306. // ignore silently (eg. subscribe, unsubscribe and maybe other events)
  307. return nil
  308. }
  309. // looks good! let's fire this baby!
  310. w.EventSwitch.FireEvent(event.Name, event.Data)
  311. return nil
  312. }
  313. // no way of exposing these failures, so we panic.
  314. // is this right? or silently ignore???
  315. func (w *WSEvents) subscribe(event string) {
  316. err := w.ws.Subscribe(event)
  317. if err != nil {
  318. panic(err)
  319. }
  320. }
  321. func (w *WSEvents) unsubscribe(event string) {
  322. err := w.ws.Unsubscribe(event)
  323. if err != nil {
  324. panic(err)
  325. }
  326. }