You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

349 lines
9.6 KiB

  1. package client
  2. import (
  3. "fmt"
  4. "github.com/pkg/errors"
  5. events "github.com/tendermint/go-events"
  6. "github.com/tendermint/go-rpc/client"
  7. wire "github.com/tendermint/go-wire"
  8. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  9. "github.com/tendermint/tendermint/types"
  10. )
  11. /*
  12. HTTP is a Client implementation that communicates
  13. with a tendermint node over json rpc and websockets.
  14. This is the main implementation you probably want to use in
  15. production code. There are other implementations when calling
  16. the tendermint node in-process (local), or when you want to mock
  17. out the server for test code (mock).
  18. */
  19. type HTTP struct {
  20. remote string
  21. rpc *rpcclient.ClientJSONRPC
  22. *WSEvents
  23. }
  24. // New takes a remote endpoint in the form tcp://<host>:<port>
  25. // and the websocket path (which always seems to be "/websocket")
  26. func NewHTTP(remote, wsEndpoint string) *HTTP {
  27. return &HTTP{
  28. rpc: rpcclient.NewClientJSONRPC(remote),
  29. remote: remote,
  30. WSEvents: newWSEvents(remote, wsEndpoint),
  31. }
  32. }
  33. func (c *HTTP) _assertIsClient() Client {
  34. return c
  35. }
  36. func (c *HTTP) _assertIsNetworkClient() NetworkClient {
  37. return c
  38. }
  39. func (c *HTTP) _assertIsEventSwitch() types.EventSwitch {
  40. return c
  41. }
  42. func (c *HTTP) Status() (*ctypes.ResultStatus, error) {
  43. tmResult := new(ctypes.TMResult)
  44. _, err := c.rpc.Call("status", []interface{}{}, tmResult)
  45. if err != nil {
  46. return nil, errors.Wrap(err, "Status")
  47. }
  48. // note: panics if rpc doesn't match. okay???
  49. return (*tmResult).(*ctypes.ResultStatus), nil
  50. }
  51. func (c *HTTP) ABCIInfo() (*ctypes.ResultABCIInfo, error) {
  52. tmResult := new(ctypes.TMResult)
  53. _, err := c.rpc.Call("abci_info", []interface{}{}, tmResult)
  54. if err != nil {
  55. return nil, errors.Wrap(err, "ABCIInfo")
  56. }
  57. return (*tmResult).(*ctypes.ResultABCIInfo), nil
  58. }
  59. func (c *HTTP) ABCIQuery(path string, data []byte, prove bool) (*ctypes.ResultABCIQuery, error) {
  60. tmResult := new(ctypes.TMResult)
  61. _, err := c.rpc.Call("abci_query", []interface{}{path, data, prove}, tmResult)
  62. if err != nil {
  63. return nil, errors.Wrap(err, "ABCIQuery")
  64. }
  65. return (*tmResult).(*ctypes.ResultABCIQuery), nil
  66. }
  67. func (c *HTTP) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
  68. tmResult := new(ctypes.TMResult)
  69. _, err := c.rpc.Call("broadcast_tx_commit", []interface{}{tx}, tmResult)
  70. if err != nil {
  71. return nil, errors.Wrap(err, "broadcast_tx_commit")
  72. }
  73. return (*tmResult).(*ctypes.ResultBroadcastTxCommit), nil
  74. }
  75. func (c *HTTP) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  76. return c.broadcastTX("broadcast_tx_async", tx)
  77. }
  78. func (c *HTTP) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  79. return c.broadcastTX("broadcast_tx_sync", tx)
  80. }
  81. func (c *HTTP) broadcastTX(route string, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  82. tmResult := new(ctypes.TMResult)
  83. _, err := c.rpc.Call(route, []interface{}{tx}, tmResult)
  84. if err != nil {
  85. return nil, errors.Wrap(err, route)
  86. }
  87. return (*tmResult).(*ctypes.ResultBroadcastTx), nil
  88. }
  89. func (c *HTTP) NetInfo() (*ctypes.ResultNetInfo, error) {
  90. tmResult := new(ctypes.TMResult)
  91. _, err := c.rpc.Call("net_info", nil, tmResult)
  92. if err != nil {
  93. return nil, errors.Wrap(err, "NetInfo")
  94. }
  95. return (*tmResult).(*ctypes.ResultNetInfo), nil
  96. }
  97. func (c *HTTP) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
  98. tmResult := new(ctypes.TMResult)
  99. _, err := c.rpc.Call("dump_consensus_state", nil, tmResult)
  100. if err != nil {
  101. return nil, errors.Wrap(err, "DumpConsensusState")
  102. }
  103. return (*tmResult).(*ctypes.ResultDumpConsensusState), nil
  104. }
  105. func (c *HTTP) BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, error) {
  106. tmResult := new(ctypes.TMResult)
  107. _, err := c.rpc.Call("blockchain", []interface{}{minHeight, maxHeight}, tmResult)
  108. if err != nil {
  109. return nil, errors.Wrap(err, "BlockchainInfo")
  110. }
  111. return (*tmResult).(*ctypes.ResultBlockchainInfo), nil
  112. }
  113. func (c *HTTP) Genesis() (*ctypes.ResultGenesis, error) {
  114. tmResult := new(ctypes.TMResult)
  115. _, err := c.rpc.Call("genesis", nil, tmResult)
  116. if err != nil {
  117. return nil, errors.Wrap(err, "Genesis")
  118. }
  119. return (*tmResult).(*ctypes.ResultGenesis), nil
  120. }
  121. func (c *HTTP) Block(height int) (*ctypes.ResultBlock, error) {
  122. tmResult := new(ctypes.TMResult)
  123. _, err := c.rpc.Call("block", []interface{}{height}, tmResult)
  124. if err != nil {
  125. return nil, errors.Wrap(err, "Block")
  126. }
  127. return (*tmResult).(*ctypes.ResultBlock), nil
  128. }
  129. func (c *HTTP) Commit(height int) (*ctypes.ResultCommit, error) {
  130. tmResult := new(ctypes.TMResult)
  131. _, err := c.rpc.Call("commit", []interface{}{height}, tmResult)
  132. if err != nil {
  133. return nil, errors.Wrap(err, "Commit")
  134. }
  135. return (*tmResult).(*ctypes.ResultCommit), nil
  136. }
  137. func (c *HTTP) Validators() (*ctypes.ResultValidators, error) {
  138. tmResult := new(ctypes.TMResult)
  139. _, err := c.rpc.Call("validators", nil, tmResult)
  140. if err != nil {
  141. return nil, errors.Wrap(err, "Validators")
  142. }
  143. return (*tmResult).(*ctypes.ResultValidators), nil
  144. }
  145. /** websocket event stuff here... **/
  146. type WSEvents struct {
  147. types.EventSwitch
  148. remote string
  149. endpoint string
  150. ws *rpcclient.WSClient
  151. // used for signaling the goroutine that feeds ws -> EventSwitch
  152. quit chan bool
  153. done chan bool
  154. // used to maintain counts of actively listened events
  155. // so we can properly subscribe/unsubscribe
  156. // FIXME: thread-safety???
  157. // FIXME: reuse code from go-events???
  158. evtCount map[string]int // count how many time each event is subscribed
  159. listeners map[string][]string // keep track of which events each listener is listening to
  160. }
  161. func newWSEvents(remote, endpoint string) *WSEvents {
  162. return &WSEvents{
  163. EventSwitch: types.NewEventSwitch(),
  164. endpoint: endpoint,
  165. remote: remote,
  166. quit: make(chan bool, 1),
  167. done: make(chan bool, 1),
  168. evtCount: map[string]int{},
  169. listeners: map[string][]string{},
  170. }
  171. }
  172. func (w *WSEvents) _assertIsEventSwitch() types.EventSwitch {
  173. return w
  174. }
  175. // Start is the only way I could think the extend OnStart from
  176. // events.eventSwitch. If only it wasn't private...
  177. // BaseService.Start -> eventSwitch.OnStart -> WSEvents.Start
  178. func (w *WSEvents) Start() (bool, error) {
  179. st, err := w.EventSwitch.Start()
  180. // if we did start, then OnStart here...
  181. if st && err == nil {
  182. ws := rpcclient.NewWSClient(w.remote, w.endpoint)
  183. _, err = ws.Start()
  184. if err == nil {
  185. w.ws = ws
  186. go w.eventListener()
  187. }
  188. }
  189. return st, errors.Wrap(err, "StartWSEvent")
  190. }
  191. // Stop wraps the BaseService/eventSwitch actions as Start does
  192. func (w *WSEvents) Stop() bool {
  193. stop := w.EventSwitch.Stop()
  194. if stop {
  195. // send a message to quit to stop the eventListener
  196. w.quit <- true
  197. <-w.done
  198. w.ws.Stop()
  199. w.ws = nil
  200. }
  201. return stop
  202. }
  203. /** TODO: more intelligent subscriptions! **/
  204. func (w *WSEvents) AddListenerForEvent(listenerID, event string, cb events.EventCallback) {
  205. // no one listening -> subscribe
  206. if w.evtCount[event] == 0 {
  207. w.subscribe(event)
  208. }
  209. // if this listener was already listening to this event, return early
  210. for _, s := range w.listeners[listenerID] {
  211. if event == s {
  212. return
  213. }
  214. }
  215. // otherwise, add this event to this listener
  216. w.evtCount[event] += 1
  217. w.listeners[listenerID] = append(w.listeners[listenerID], event)
  218. w.EventSwitch.AddListenerForEvent(listenerID, event, cb)
  219. }
  220. func (w *WSEvents) RemoveListenerForEvent(event string, listenerID string) {
  221. // if this listener is listening already, splice it out
  222. found := false
  223. l := w.listeners[listenerID]
  224. for i, s := range l {
  225. if event == s {
  226. found = true
  227. w.listeners[listenerID] = append(l[:i], l[i+1:]...)
  228. break
  229. }
  230. }
  231. // if the listener wasn't already listening to the event, exit early
  232. if !found {
  233. return
  234. }
  235. // now we can update the subscriptions
  236. w.evtCount[event] -= 1
  237. if w.evtCount[event] == 0 {
  238. w.unsubscribe(event)
  239. }
  240. w.EventSwitch.RemoveListenerForEvent(event, listenerID)
  241. }
  242. func (w *WSEvents) RemoveListener(listenerID string) {
  243. // remove all counts for this listener
  244. for _, s := range w.listeners[listenerID] {
  245. w.evtCount[s] -= 1
  246. if w.evtCount[s] == 0 {
  247. w.unsubscribe(s)
  248. }
  249. }
  250. w.listeners[listenerID] = nil
  251. // then let the switch do it's magic
  252. w.EventSwitch.RemoveListener(listenerID)
  253. }
  254. // eventListener is an infinite loop pulling all websocket events
  255. // and pushing them to the EventSwitch.
  256. //
  257. // the goroutine only stops by closing quit
  258. func (w *WSEvents) eventListener() {
  259. for {
  260. select {
  261. case res := <-w.ws.ResultsCh:
  262. // res is json.RawMessage
  263. err := w.parseEvent(res)
  264. if err != nil {
  265. // FIXME: better logging/handling of errors??
  266. fmt.Printf("ws result: %+v\n", err)
  267. }
  268. case err := <-w.ws.ErrorsCh:
  269. // FIXME: better logging/handling of errors??
  270. fmt.Printf("ws err: %+v\n", err)
  271. case <-w.quit:
  272. // send a message so we can wait for the routine to exit
  273. // before cleaning up the w.ws stuff
  274. w.done <- true
  275. return
  276. }
  277. }
  278. }
  279. // parseEvent unmarshals the json message and converts it into
  280. // some implementation of types.TMEventData, and sends it off
  281. // on the merry way to the EventSwitch
  282. func (w *WSEvents) parseEvent(data []byte) (err error) {
  283. result := new(ctypes.TMResult)
  284. wire.ReadJSONPtr(result, data, &err)
  285. if err != nil {
  286. return err
  287. }
  288. event, ok := (*result).(*ctypes.ResultEvent)
  289. if !ok {
  290. // ignore silently (eg. subscribe, unsubscribe and maybe other events)
  291. return nil
  292. }
  293. // looks good! let's fire this baby!
  294. w.EventSwitch.FireEvent(event.Name, event.Data)
  295. return nil
  296. }
  297. // no way of exposing these failures, so we panic.
  298. // is this right? or silently ignore???
  299. func (w *WSEvents) subscribe(event string) {
  300. err := w.ws.Subscribe(event)
  301. if err != nil {
  302. panic(err)
  303. }
  304. }
  305. func (w *WSEvents) unsubscribe(event string) {
  306. err := w.ws.Unsubscribe(event)
  307. if err != nil {
  308. panic(err)
  309. }
  310. }