You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

370 lines
10 KiB

8 years ago
  1. package client
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/pkg/errors"
  7. data "github.com/tendermint/go-wire/data"
  8. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  9. rpcclient "github.com/tendermint/tendermint/rpc/lib/client"
  10. "github.com/tendermint/tendermint/types"
  11. events "github.com/tendermint/tmlibs/events"
  12. )
  13. /*
  14. HTTP is a Client implementation that communicates
  15. with a tendermint node over json rpc and websockets.
  16. This is the main implementation you probably want to use in
  17. production code. There are other implementations when calling
  18. the tendermint node in-process (local), or when you want to mock
  19. out the server for test code (mock).
  20. */
  21. type HTTP struct {
  22. remote string
  23. rpc *rpcclient.JSONRPCClient
  24. *WSEvents
  25. }
  26. // New takes a remote endpoint in the form tcp://<host>:<port>
  27. // and the websocket path (which always seems to be "/websocket")
  28. func NewHTTP(remote, wsEndpoint string) *HTTP {
  29. return &HTTP{
  30. rpc: rpcclient.NewJSONRPCClient(remote),
  31. remote: remote,
  32. WSEvents: newWSEvents(remote, wsEndpoint),
  33. }
  34. }
  35. var (
  36. _ Client = (*HTTP)(nil)
  37. _ NetworkClient = (*HTTP)(nil)
  38. _ types.EventSwitch = (*HTTP)(nil)
  39. _ types.EventSwitch = (*WSEvents)(nil)
  40. )
  41. func (c *HTTP) Status() (*ctypes.ResultStatus, error) {
  42. result := new(ctypes.ResultStatus)
  43. _, err := c.rpc.Call("status", map[string]interface{}{}, result)
  44. if err != nil {
  45. return nil, errors.Wrap(err, "Status")
  46. }
  47. return result, nil
  48. }
  49. func (c *HTTP) ABCIInfo() (*ctypes.ResultABCIInfo, error) {
  50. result := new(ctypes.ResultABCIInfo)
  51. _, err := c.rpc.Call("abci_info", map[string]interface{}{}, result)
  52. if err != nil {
  53. return nil, errors.Wrap(err, "ABCIInfo")
  54. }
  55. return result, nil
  56. }
  57. func (c *HTTP) ABCIQuery(path string, data data.Bytes) (*ctypes.ResultABCIQuery, error) {
  58. return c.ABCIQueryWithOptions(path, data, DefaultABCIQueryOptions)
  59. }
  60. func (c *HTTP) ABCIQueryWithOptions(path string, data data.Bytes, opts ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) {
  61. result := new(ctypes.ResultABCIQuery)
  62. _, err := c.rpc.Call("abci_query",
  63. map[string]interface{}{"path": path, "data": data, "height": opts.Height, "trusted": opts.Trusted},
  64. result)
  65. if err != nil {
  66. return nil, errors.Wrap(err, "ABCIQuery")
  67. }
  68. return result, nil
  69. }
  70. func (c *HTTP) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
  71. result := new(ctypes.ResultBroadcastTxCommit)
  72. _, err := c.rpc.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, result)
  73. if err != nil {
  74. return nil, errors.Wrap(err, "broadcast_tx_commit")
  75. }
  76. return result, nil
  77. }
  78. func (c *HTTP) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  79. return c.broadcastTX("broadcast_tx_async", tx)
  80. }
  81. func (c *HTTP) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  82. return c.broadcastTX("broadcast_tx_sync", tx)
  83. }
  84. func (c *HTTP) broadcastTX(route string, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  85. result := new(ctypes.ResultBroadcastTx)
  86. _, err := c.rpc.Call(route, map[string]interface{}{"tx": tx}, result)
  87. if err != nil {
  88. return nil, errors.Wrap(err, route)
  89. }
  90. return result, nil
  91. }
  92. func (c *HTTP) NetInfo() (*ctypes.ResultNetInfo, error) {
  93. result := new(ctypes.ResultNetInfo)
  94. _, err := c.rpc.Call("net_info", map[string]interface{}{}, result)
  95. if err != nil {
  96. return nil, errors.Wrap(err, "NetInfo")
  97. }
  98. return result, nil
  99. }
  100. func (c *HTTP) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
  101. result := new(ctypes.ResultDumpConsensusState)
  102. _, err := c.rpc.Call("dump_consensus_state", map[string]interface{}{}, result)
  103. if err != nil {
  104. return nil, errors.Wrap(err, "DumpConsensusState")
  105. }
  106. return result, nil
  107. }
  108. func (c *HTTP) BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, error) {
  109. result := new(ctypes.ResultBlockchainInfo)
  110. _, err := c.rpc.Call("blockchain",
  111. map[string]interface{}{"minHeight": minHeight, "maxHeight": maxHeight},
  112. result)
  113. if err != nil {
  114. return nil, errors.Wrap(err, "BlockchainInfo")
  115. }
  116. return result, nil
  117. }
  118. func (c *HTTP) Genesis() (*ctypes.ResultGenesis, error) {
  119. result := new(ctypes.ResultGenesis)
  120. _, err := c.rpc.Call("genesis", map[string]interface{}{}, result)
  121. if err != nil {
  122. return nil, errors.Wrap(err, "Genesis")
  123. }
  124. return result, nil
  125. }
  126. func (c *HTTP) Block(height *int) (*ctypes.ResultBlock, error) {
  127. result := new(ctypes.ResultBlock)
  128. _, err := c.rpc.Call("block", map[string]interface{}{"height": height}, result)
  129. if err != nil {
  130. return nil, errors.Wrap(err, "Block")
  131. }
  132. return result, nil
  133. }
  134. func (c *HTTP) Commit(height *int) (*ctypes.ResultCommit, error) {
  135. result := new(ctypes.ResultCommit)
  136. _, err := c.rpc.Call("commit", map[string]interface{}{"height": height}, result)
  137. if err != nil {
  138. return nil, errors.Wrap(err, "Commit")
  139. }
  140. return result, nil
  141. }
  142. func (c *HTTP) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
  143. result := new(ctypes.ResultTx)
  144. query := map[string]interface{}{
  145. "hash": hash,
  146. "prove": prove,
  147. }
  148. _, err := c.rpc.Call("tx", query, result)
  149. if err != nil {
  150. return nil, errors.Wrap(err, "Tx")
  151. }
  152. return result, nil
  153. }
  154. func (c *HTTP) Validators(height *int) (*ctypes.ResultValidators, error) {
  155. result := new(ctypes.ResultValidators)
  156. _, err := c.rpc.Call("validators", map[string]interface{}{"height": height}, result)
  157. if err != nil {
  158. return nil, errors.Wrap(err, "Validators")
  159. }
  160. return result, nil
  161. }
  162. /** websocket event stuff here... **/
  163. type WSEvents struct {
  164. types.EventSwitch
  165. remote string
  166. endpoint string
  167. ws *rpcclient.WSClient
  168. // used for signaling the goroutine that feeds ws -> EventSwitch
  169. quit chan bool
  170. done chan bool
  171. // used to maintain counts of actively listened events
  172. // so we can properly subscribe/unsubscribe
  173. // FIXME: thread-safety???
  174. // FIXME: reuse code from tmlibs/events???
  175. evtCount map[string]int // count how many time each event is subscribed
  176. listeners map[string][]string // keep track of which events each listener is listening to
  177. }
  178. func newWSEvents(remote, endpoint string) *WSEvents {
  179. return &WSEvents{
  180. EventSwitch: types.NewEventSwitch(),
  181. endpoint: endpoint,
  182. remote: remote,
  183. quit: make(chan bool, 1),
  184. done: make(chan bool, 1),
  185. evtCount: map[string]int{},
  186. listeners: map[string][]string{},
  187. }
  188. }
  189. // Start is the only way I could think the extend OnStart from
  190. // events.eventSwitch. If only it wasn't private...
  191. // BaseService.Start -> eventSwitch.OnStart -> WSEvents.Start
  192. func (w *WSEvents) Start() (bool, error) {
  193. st, err := w.EventSwitch.Start()
  194. // if we did start, then OnStart here...
  195. if st && err == nil {
  196. ws := rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() {
  197. w.redoSubscriptions()
  198. }))
  199. _, err = ws.Start()
  200. if err == nil {
  201. w.ws = ws
  202. go w.eventListener()
  203. }
  204. }
  205. return st, errors.Wrap(err, "StartWSEvent")
  206. }
  207. // Stop wraps the BaseService/eventSwitch actions as Start does
  208. func (w *WSEvents) Stop() bool {
  209. stop := w.EventSwitch.Stop()
  210. if stop {
  211. // send a message to quit to stop the eventListener
  212. w.quit <- true
  213. <-w.done
  214. w.ws.Stop()
  215. w.ws = nil
  216. }
  217. return stop
  218. }
  219. /** TODO: more intelligent subscriptions! **/
  220. func (w *WSEvents) AddListenerForEvent(listenerID, event string, cb events.EventCallback) {
  221. // no one listening -> subscribe
  222. if w.evtCount[event] == 0 {
  223. w.subscribe(event)
  224. }
  225. // if this listener was already listening to this event, return early
  226. for _, s := range w.listeners[listenerID] {
  227. if event == s {
  228. return
  229. }
  230. }
  231. // otherwise, add this event to this listener
  232. w.evtCount[event] += 1
  233. w.listeners[listenerID] = append(w.listeners[listenerID], event)
  234. w.EventSwitch.AddListenerForEvent(listenerID, event, cb)
  235. }
  236. func (w *WSEvents) RemoveListenerForEvent(event string, listenerID string) {
  237. // if this listener is listening already, splice it out
  238. found := false
  239. l := w.listeners[listenerID]
  240. for i, s := range l {
  241. if event == s {
  242. found = true
  243. w.listeners[listenerID] = append(l[:i], l[i+1:]...)
  244. break
  245. }
  246. }
  247. // if the listener wasn't already listening to the event, exit early
  248. if !found {
  249. return
  250. }
  251. // now we can update the subscriptions
  252. w.evtCount[event] -= 1
  253. if w.evtCount[event] == 0 {
  254. w.unsubscribe(event)
  255. }
  256. w.EventSwitch.RemoveListenerForEvent(event, listenerID)
  257. }
  258. func (w *WSEvents) RemoveListener(listenerID string) {
  259. // remove all counts for this listener
  260. for _, s := range w.listeners[listenerID] {
  261. w.evtCount[s] -= 1
  262. if w.evtCount[s] == 0 {
  263. w.unsubscribe(s)
  264. }
  265. }
  266. w.listeners[listenerID] = nil
  267. // then let the switch do it's magic
  268. w.EventSwitch.RemoveListener(listenerID)
  269. }
  270. // After being reconnected, it is necessary to redo subscription
  271. // to server otherwise no data will be automatically received
  272. func (w *WSEvents) redoSubscriptions() {
  273. for event, _ := range w.evtCount {
  274. w.subscribe(event)
  275. }
  276. }
  277. // eventListener is an infinite loop pulling all websocket events
  278. // and pushing them to the EventSwitch.
  279. //
  280. // the goroutine only stops by closing quit
  281. func (w *WSEvents) eventListener() {
  282. for {
  283. select {
  284. case res := <-w.ws.ResultsCh:
  285. // res is json.RawMessage
  286. err := w.parseEvent(res)
  287. if err != nil {
  288. // FIXME: better logging/handling of errors??
  289. fmt.Printf("ws result: %+v\n", err)
  290. }
  291. case err := <-w.ws.ErrorsCh:
  292. // FIXME: better logging/handling of errors??
  293. fmt.Printf("ws err: %+v\n", err)
  294. case <-w.quit:
  295. // send a message so we can wait for the routine to exit
  296. // before cleaning up the w.ws stuff
  297. w.done <- true
  298. return
  299. }
  300. }
  301. }
  302. // parseEvent unmarshals the json message and converts it into
  303. // some implementation of types.TMEventData, and sends it off
  304. // on the merry way to the EventSwitch
  305. func (w *WSEvents) parseEvent(data []byte) (err error) {
  306. result := new(ctypes.ResultEvent)
  307. err = json.Unmarshal(data, result)
  308. if err != nil {
  309. // ignore silently (eg. subscribe, unsubscribe and maybe other events)
  310. // TODO: ?
  311. return nil
  312. }
  313. // looks good! let's fire this baby!
  314. w.EventSwitch.FireEvent(result.Name, result.Data)
  315. return nil
  316. }
  317. // no way of exposing these failures, so we panic.
  318. // is this right? or silently ignore???
  319. func (w *WSEvents) subscribe(event string) {
  320. err := w.ws.Subscribe(context.TODO(), event)
  321. if err != nil {
  322. panic(err)
  323. }
  324. }
  325. func (w *WSEvents) unsubscribe(event string) {
  326. err := w.ws.Unsubscribe(context.TODO(), event)
  327. if err != nil {
  328. panic(err)
  329. }
  330. }