You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

353 lines
9.8 KiB

  1. package client
  2. import (
  3. "fmt"
  4. "github.com/pkg/errors"
  5. events "github.com/tendermint/go-events"
  6. "github.com/tendermint/go-rpc/client"
  7. wire "github.com/tendermint/go-wire"
  8. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  9. "github.com/tendermint/tendermint/types"
  10. )
  11. /*
  12. HTTP is a Client implementation that communicates
  13. with a tendermint node over json rpc and websockets.
  14. This is the main implementation you probably want to use in
  15. production code. There are other implementations when calling
  16. the tendermint node in-process (local), or when you want to mock
  17. out the server for test code (mock).
  18. */
  19. type HTTP struct {
  20. remote string
  21. rpc *rpcclient.JSONRPCClient
  22. *WSEvents
  23. }
  24. // New takes a remote endpoint in the form tcp://<host>:<port>
  25. // and the websocket path (which always seems to be "/websocket")
  26. func NewHTTP(remote, wsEndpoint string) *HTTP {
  27. return &HTTP{
  28. rpc: rpcclient.NewJSONRPCClient(remote),
  29. remote: remote,
  30. WSEvents: newWSEvents(remote, wsEndpoint),
  31. }
  32. }
  33. func (c *HTTP) _assertIsClient() Client {
  34. return c
  35. }
  36. func (c *HTTP) _assertIsNetworkClient() NetworkClient {
  37. return c
  38. }
  39. func (c *HTTP) _assertIsEventSwitch() types.EventSwitch {
  40. return c
  41. }
  42. func (c *HTTP) Status() (*ctypes.ResultStatus, error) {
  43. tmResult := new(ctypes.TMResult)
  44. _, err := c.rpc.Call("status", map[string]interface{}{}, tmResult)
  45. if err != nil {
  46. return nil, errors.Wrap(err, "Status")
  47. }
  48. // note: panics if rpc doesn't match. okay???
  49. return (*tmResult).(*ctypes.ResultStatus), nil
  50. }
  51. func (c *HTTP) ABCIInfo() (*ctypes.ResultABCIInfo, error) {
  52. tmResult := new(ctypes.TMResult)
  53. _, err := c.rpc.Call("abci_info", map[string]interface{}{}, tmResult)
  54. if err != nil {
  55. return nil, errors.Wrap(err, "ABCIInfo")
  56. }
  57. return (*tmResult).(*ctypes.ResultABCIInfo), nil
  58. }
  59. func (c *HTTP) ABCIQuery(path string, data []byte, prove bool) (*ctypes.ResultABCIQuery, error) {
  60. tmResult := new(ctypes.TMResult)
  61. _, err := c.rpc.Call("abci_query",
  62. map[string]interface{}{"path": path, "data": data, "prove": prove},
  63. tmResult)
  64. if err != nil {
  65. return nil, errors.Wrap(err, "ABCIQuery")
  66. }
  67. return (*tmResult).(*ctypes.ResultABCIQuery), nil
  68. }
  69. func (c *HTTP) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
  70. tmResult := new(ctypes.TMResult)
  71. _, err := c.rpc.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, tmResult)
  72. if err != nil {
  73. return nil, errors.Wrap(err, "broadcast_tx_commit")
  74. }
  75. return (*tmResult).(*ctypes.ResultBroadcastTxCommit), nil
  76. }
  77. func (c *HTTP) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  78. return c.broadcastTX("broadcast_tx_async", tx)
  79. }
  80. func (c *HTTP) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  81. return c.broadcastTX("broadcast_tx_sync", tx)
  82. }
  83. func (c *HTTP) broadcastTX(route string, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  84. tmResult := new(ctypes.TMResult)
  85. _, err := c.rpc.Call(route, map[string]interface{}{"tx": tx}, tmResult)
  86. if err != nil {
  87. return nil, errors.Wrap(err, route)
  88. }
  89. return (*tmResult).(*ctypes.ResultBroadcastTx), nil
  90. }
  91. func (c *HTTP) NetInfo() (*ctypes.ResultNetInfo, error) {
  92. tmResult := new(ctypes.TMResult)
  93. _, err := c.rpc.Call("net_info", map[string]interface{}{}, tmResult)
  94. if err != nil {
  95. return nil, errors.Wrap(err, "NetInfo")
  96. }
  97. return (*tmResult).(*ctypes.ResultNetInfo), nil
  98. }
  99. func (c *HTTP) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
  100. tmResult := new(ctypes.TMResult)
  101. _, err := c.rpc.Call("dump_consensus_state", map[string]interface{}{}, tmResult)
  102. if err != nil {
  103. return nil, errors.Wrap(err, "DumpConsensusState")
  104. }
  105. return (*tmResult).(*ctypes.ResultDumpConsensusState), nil
  106. }
  107. func (c *HTTP) BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, error) {
  108. tmResult := new(ctypes.TMResult)
  109. _, err := c.rpc.Call("blockchain",
  110. map[string]interface{}{"minHeight": minHeight, "maxHeight": maxHeight},
  111. tmResult)
  112. if err != nil {
  113. return nil, errors.Wrap(err, "BlockchainInfo")
  114. }
  115. return (*tmResult).(*ctypes.ResultBlockchainInfo), nil
  116. }
  117. func (c *HTTP) Genesis() (*ctypes.ResultGenesis, error) {
  118. tmResult := new(ctypes.TMResult)
  119. _, err := c.rpc.Call("genesis", map[string]interface{}{}, tmResult)
  120. if err != nil {
  121. return nil, errors.Wrap(err, "Genesis")
  122. }
  123. return (*tmResult).(*ctypes.ResultGenesis), nil
  124. }
  125. func (c *HTTP) Block(height int) (*ctypes.ResultBlock, error) {
  126. tmResult := new(ctypes.TMResult)
  127. _, err := c.rpc.Call("block", map[string]interface{}{"height": height}, tmResult)
  128. if err != nil {
  129. return nil, errors.Wrap(err, "Block")
  130. }
  131. return (*tmResult).(*ctypes.ResultBlock), nil
  132. }
  133. func (c *HTTP) Commit(height int) (*ctypes.ResultCommit, error) {
  134. tmResult := new(ctypes.TMResult)
  135. _, err := c.rpc.Call("commit", map[string]interface{}{"height": height}, tmResult)
  136. if err != nil {
  137. return nil, errors.Wrap(err, "Commit")
  138. }
  139. return (*tmResult).(*ctypes.ResultCommit), nil
  140. }
  141. func (c *HTTP) Validators() (*ctypes.ResultValidators, error) {
  142. tmResult := new(ctypes.TMResult)
  143. _, err := c.rpc.Call("validators", map[string]interface{}{}, tmResult)
  144. if err != nil {
  145. return nil, errors.Wrap(err, "Validators")
  146. }
  147. return (*tmResult).(*ctypes.ResultValidators), nil
  148. }
  149. /** websocket event stuff here... **/
  150. type WSEvents struct {
  151. types.EventSwitch
  152. remote string
  153. endpoint string
  154. ws *rpcclient.WSClient
  155. // used for signaling the goroutine that feeds ws -> EventSwitch
  156. quit chan bool
  157. done chan bool
  158. // used to maintain counts of actively listened events
  159. // so we can properly subscribe/unsubscribe
  160. // FIXME: thread-safety???
  161. // FIXME: reuse code from go-events???
  162. evtCount map[string]int // count how many time each event is subscribed
  163. listeners map[string][]string // keep track of which events each listener is listening to
  164. }
  165. func newWSEvents(remote, endpoint string) *WSEvents {
  166. return &WSEvents{
  167. EventSwitch: types.NewEventSwitch(),
  168. endpoint: endpoint,
  169. remote: remote,
  170. quit: make(chan bool, 1),
  171. done: make(chan bool, 1),
  172. evtCount: map[string]int{},
  173. listeners: map[string][]string{},
  174. }
  175. }
  176. func (w *WSEvents) _assertIsEventSwitch() types.EventSwitch {
  177. return w
  178. }
  179. // Start is the only way I could think the extend OnStart from
  180. // events.eventSwitch. If only it wasn't private...
  181. // BaseService.Start -> eventSwitch.OnStart -> WSEvents.Start
  182. func (w *WSEvents) Start() (bool, error) {
  183. st, err := w.EventSwitch.Start()
  184. // if we did start, then OnStart here...
  185. if st && err == nil {
  186. ws := rpcclient.NewWSClient(w.remote, w.endpoint)
  187. _, err = ws.Start()
  188. if err == nil {
  189. w.ws = ws
  190. go w.eventListener()
  191. }
  192. }
  193. return st, errors.Wrap(err, "StartWSEvent")
  194. }
  195. // Stop wraps the BaseService/eventSwitch actions as Start does
  196. func (w *WSEvents) Stop() bool {
  197. stop := w.EventSwitch.Stop()
  198. if stop {
  199. // send a message to quit to stop the eventListener
  200. w.quit <- true
  201. <-w.done
  202. w.ws.Stop()
  203. w.ws = nil
  204. }
  205. return stop
  206. }
  207. /** TODO: more intelligent subscriptions! **/
  208. func (w *WSEvents) AddListenerForEvent(listenerID, event string, cb events.EventCallback) {
  209. // no one listening -> subscribe
  210. if w.evtCount[event] == 0 {
  211. w.subscribe(event)
  212. }
  213. // if this listener was already listening to this event, return early
  214. for _, s := range w.listeners[listenerID] {
  215. if event == s {
  216. return
  217. }
  218. }
  219. // otherwise, add this event to this listener
  220. w.evtCount[event] += 1
  221. w.listeners[listenerID] = append(w.listeners[listenerID], event)
  222. w.EventSwitch.AddListenerForEvent(listenerID, event, cb)
  223. }
  224. func (w *WSEvents) RemoveListenerForEvent(event string, listenerID string) {
  225. // if this listener is listening already, splice it out
  226. found := false
  227. l := w.listeners[listenerID]
  228. for i, s := range l {
  229. if event == s {
  230. found = true
  231. w.listeners[listenerID] = append(l[:i], l[i+1:]...)
  232. break
  233. }
  234. }
  235. // if the listener wasn't already listening to the event, exit early
  236. if !found {
  237. return
  238. }
  239. // now we can update the subscriptions
  240. w.evtCount[event] -= 1
  241. if w.evtCount[event] == 0 {
  242. w.unsubscribe(event)
  243. }
  244. w.EventSwitch.RemoveListenerForEvent(event, listenerID)
  245. }
  246. func (w *WSEvents) RemoveListener(listenerID string) {
  247. // remove all counts for this listener
  248. for _, s := range w.listeners[listenerID] {
  249. w.evtCount[s] -= 1
  250. if w.evtCount[s] == 0 {
  251. w.unsubscribe(s)
  252. }
  253. }
  254. w.listeners[listenerID] = nil
  255. // then let the switch do it's magic
  256. w.EventSwitch.RemoveListener(listenerID)
  257. }
  258. // eventListener is an infinite loop pulling all websocket events
  259. // and pushing them to the EventSwitch.
  260. //
  261. // the goroutine only stops by closing quit
  262. func (w *WSEvents) eventListener() {
  263. for {
  264. select {
  265. case res := <-w.ws.ResultsCh:
  266. // res is json.RawMessage
  267. err := w.parseEvent(res)
  268. if err != nil {
  269. // FIXME: better logging/handling of errors??
  270. fmt.Printf("ws result: %+v\n", err)
  271. }
  272. case err := <-w.ws.ErrorsCh:
  273. // FIXME: better logging/handling of errors??
  274. fmt.Printf("ws err: %+v\n", err)
  275. case <-w.quit:
  276. // send a message so we can wait for the routine to exit
  277. // before cleaning up the w.ws stuff
  278. w.done <- true
  279. return
  280. }
  281. }
  282. }
  283. // parseEvent unmarshals the json message and converts it into
  284. // some implementation of types.TMEventData, and sends it off
  285. // on the merry way to the EventSwitch
  286. func (w *WSEvents) parseEvent(data []byte) (err error) {
  287. result := new(ctypes.TMResult)
  288. wire.ReadJSONPtr(result, data, &err)
  289. if err != nil {
  290. return err
  291. }
  292. event, ok := (*result).(*ctypes.ResultEvent)
  293. if !ok {
  294. // ignore silently (eg. subscribe, unsubscribe and maybe other events)
  295. return nil
  296. }
  297. // looks good! let's fire this baby!
  298. w.EventSwitch.FireEvent(event.Name, event.Data)
  299. return nil
  300. }
  301. // no way of exposing these failures, so we panic.
  302. // is this right? or silently ignore???
  303. func (w *WSEvents) subscribe(event string) {
  304. err := w.ws.Subscribe(event)
  305. if err != nil {
  306. panic(err)
  307. }
  308. }
  309. func (w *WSEvents) unsubscribe(event string) {
  310. err := w.ws.Unsubscribe(event)
  311. if err != nil {
  312. panic(err)
  313. }
  314. }