You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

366 lines
10 KiB

8 years ago
  1. package client
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/pkg/errors"
  7. data "github.com/tendermint/go-wire/data"
  8. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  9. rpcclient "github.com/tendermint/tendermint/rpc/lib/client"
  10. "github.com/tendermint/tendermint/types"
  11. events "github.com/tendermint/tmlibs/events"
  12. )
  13. /*
  14. HTTP is a Client implementation that communicates
  15. with a tendermint node over json rpc and websockets.
  16. This is the main implementation you probably want to use in
  17. production code. There are other implementations when calling
  18. the tendermint node in-process (local), or when you want to mock
  19. out the server for test code (mock).
  20. */
  21. type HTTP struct {
  22. remote string
  23. rpc *rpcclient.JSONRPCClient
  24. *WSEvents
  25. }
  26. // New takes a remote endpoint in the form tcp://<host>:<port>
  27. // and the websocket path (which always seems to be "/websocket")
  28. func NewHTTP(remote, wsEndpoint string) *HTTP {
  29. return &HTTP{
  30. rpc: rpcclient.NewJSONRPCClient(remote),
  31. remote: remote,
  32. WSEvents: newWSEvents(remote, wsEndpoint),
  33. }
  34. }
  35. var (
  36. _ Client = (*HTTP)(nil)
  37. _ NetworkClient = (*HTTP)(nil)
  38. _ types.EventSwitch = (*HTTP)(nil)
  39. _ types.EventSwitch = (*WSEvents)(nil)
  40. )
  41. func (c *HTTP) Status() (*ctypes.ResultStatus, error) {
  42. result := new(ctypes.ResultStatus)
  43. _, err := c.rpc.Call("status", map[string]interface{}{}, result)
  44. if err != nil {
  45. return nil, errors.Wrap(err, "Status")
  46. }
  47. return result, nil
  48. }
  49. func (c *HTTP) ABCIInfo() (*ctypes.ResultABCIInfo, error) {
  50. result := new(ctypes.ResultABCIInfo)
  51. _, err := c.rpc.Call("abci_info", map[string]interface{}{}, result)
  52. if err != nil {
  53. return nil, errors.Wrap(err, "ABCIInfo")
  54. }
  55. return result, nil
  56. }
  57. func (c *HTTP) ABCIQuery(path string, data data.Bytes, prove bool) (*ctypes.ResultABCIQuery, error) {
  58. result := new(ctypes.ResultABCIQuery)
  59. _, err := c.rpc.Call("abci_query",
  60. map[string]interface{}{"path": path, "data": data, "prove": prove},
  61. result)
  62. if err != nil {
  63. return nil, errors.Wrap(err, "ABCIQuery")
  64. }
  65. return result, nil
  66. }
  67. func (c *HTTP) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
  68. result := new(ctypes.ResultBroadcastTxCommit)
  69. _, err := c.rpc.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, result)
  70. if err != nil {
  71. return nil, errors.Wrap(err, "broadcast_tx_commit")
  72. }
  73. return result, nil
  74. }
  75. func (c *HTTP) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  76. return c.broadcastTX("broadcast_tx_async", tx)
  77. }
  78. func (c *HTTP) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  79. return c.broadcastTX("broadcast_tx_sync", tx)
  80. }
  81. func (c *HTTP) broadcastTX(route string, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  82. result := new(ctypes.ResultBroadcastTx)
  83. _, err := c.rpc.Call(route, map[string]interface{}{"tx": tx}, result)
  84. if err != nil {
  85. return nil, errors.Wrap(err, route)
  86. }
  87. return result, nil
  88. }
  89. func (c *HTTP) NetInfo() (*ctypes.ResultNetInfo, error) {
  90. result := new(ctypes.ResultNetInfo)
  91. _, err := c.rpc.Call("net_info", map[string]interface{}{}, result)
  92. if err != nil {
  93. return nil, errors.Wrap(err, "NetInfo")
  94. }
  95. return result, nil
  96. }
  97. func (c *HTTP) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
  98. result := new(ctypes.ResultDumpConsensusState)
  99. _, err := c.rpc.Call("dump_consensus_state", map[string]interface{}{}, result)
  100. if err != nil {
  101. return nil, errors.Wrap(err, "DumpConsensusState")
  102. }
  103. return result, nil
  104. }
  105. func (c *HTTP) BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, error) {
  106. result := new(ctypes.ResultBlockchainInfo)
  107. _, err := c.rpc.Call("blockchain",
  108. map[string]interface{}{"minHeight": minHeight, "maxHeight": maxHeight},
  109. result)
  110. if err != nil {
  111. return nil, errors.Wrap(err, "BlockchainInfo")
  112. }
  113. return result, nil
  114. }
  115. func (c *HTTP) Genesis() (*ctypes.ResultGenesis, error) {
  116. result := new(ctypes.ResultGenesis)
  117. _, err := c.rpc.Call("genesis", map[string]interface{}{}, result)
  118. if err != nil {
  119. return nil, errors.Wrap(err, "Genesis")
  120. }
  121. return result, nil
  122. }
  123. func (c *HTTP) Block(height *int) (*ctypes.ResultBlock, error) {
  124. result := new(ctypes.ResultBlock)
  125. _, err := c.rpc.Call("block", map[string]interface{}{"height": height}, result)
  126. if err != nil {
  127. return nil, errors.Wrap(err, "Block")
  128. }
  129. return result, nil
  130. }
  131. func (c *HTTP) Commit(height *int) (*ctypes.ResultCommit, error) {
  132. result := new(ctypes.ResultCommit)
  133. _, err := c.rpc.Call("commit", map[string]interface{}{"height": height}, result)
  134. if err != nil {
  135. return nil, errors.Wrap(err, "Commit")
  136. }
  137. return result, nil
  138. }
  139. func (c *HTTP) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
  140. result := new(ctypes.ResultTx)
  141. query := map[string]interface{}{
  142. "hash": hash,
  143. "prove": prove,
  144. }
  145. _, err := c.rpc.Call("tx", query, result)
  146. if err != nil {
  147. return nil, errors.Wrap(err, "Tx")
  148. }
  149. return result, nil
  150. }
  151. func (c *HTTP) Validators(height *int) (*ctypes.ResultValidators, error) {
  152. result := new(ctypes.ResultValidators)
  153. _, err := c.rpc.Call("validators", map[string]interface{}{"height": height}, result)
  154. if err != nil {
  155. return nil, errors.Wrap(err, "Validators")
  156. }
  157. return result, nil
  158. }
  159. /** websocket event stuff here... **/
  160. type WSEvents struct {
  161. types.EventSwitch
  162. remote string
  163. endpoint string
  164. ws *rpcclient.WSClient
  165. // used for signaling the goroutine that feeds ws -> EventSwitch
  166. quit chan bool
  167. done chan bool
  168. // used to maintain counts of actively listened events
  169. // so we can properly subscribe/unsubscribe
  170. // FIXME: thread-safety???
  171. // FIXME: reuse code from tmlibs/events???
  172. evtCount map[string]int // count how many time each event is subscribed
  173. listeners map[string][]string // keep track of which events each listener is listening to
  174. }
  175. func newWSEvents(remote, endpoint string) *WSEvents {
  176. return &WSEvents{
  177. EventSwitch: types.NewEventSwitch(),
  178. endpoint: endpoint,
  179. remote: remote,
  180. quit: make(chan bool, 1),
  181. done: make(chan bool, 1),
  182. evtCount: map[string]int{},
  183. listeners: map[string][]string{},
  184. }
  185. }
  186. // Start is the only way I could think the extend OnStart from
  187. // events.eventSwitch. If only it wasn't private...
  188. // BaseService.Start -> eventSwitch.OnStart -> WSEvents.Start
  189. func (w *WSEvents) Start() (bool, error) {
  190. st, err := w.EventSwitch.Start()
  191. // if we did start, then OnStart here...
  192. if st && err == nil {
  193. ws := rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() {
  194. w.redoSubscriptions()
  195. }))
  196. _, err = ws.Start()
  197. if err == nil {
  198. w.ws = ws
  199. go w.eventListener()
  200. }
  201. }
  202. return st, errors.Wrap(err, "StartWSEvent")
  203. }
  204. // Stop wraps the BaseService/eventSwitch actions as Start does
  205. func (w *WSEvents) Stop() bool {
  206. stop := w.EventSwitch.Stop()
  207. if stop {
  208. // send a message to quit to stop the eventListener
  209. w.quit <- true
  210. <-w.done
  211. w.ws.Stop()
  212. w.ws = nil
  213. }
  214. return stop
  215. }
  216. /** TODO: more intelligent subscriptions! **/
  217. func (w *WSEvents) AddListenerForEvent(listenerID, event string, cb events.EventCallback) {
  218. // no one listening -> subscribe
  219. if w.evtCount[event] == 0 {
  220. w.subscribe(event)
  221. }
  222. // if this listener was already listening to this event, return early
  223. for _, s := range w.listeners[listenerID] {
  224. if event == s {
  225. return
  226. }
  227. }
  228. // otherwise, add this event to this listener
  229. w.evtCount[event] += 1
  230. w.listeners[listenerID] = append(w.listeners[listenerID], event)
  231. w.EventSwitch.AddListenerForEvent(listenerID, event, cb)
  232. }
  233. func (w *WSEvents) RemoveListenerForEvent(event string, listenerID string) {
  234. // if this listener is listening already, splice it out
  235. found := false
  236. l := w.listeners[listenerID]
  237. for i, s := range l {
  238. if event == s {
  239. found = true
  240. w.listeners[listenerID] = append(l[:i], l[i+1:]...)
  241. break
  242. }
  243. }
  244. // if the listener wasn't already listening to the event, exit early
  245. if !found {
  246. return
  247. }
  248. // now we can update the subscriptions
  249. w.evtCount[event] -= 1
  250. if w.evtCount[event] == 0 {
  251. w.unsubscribe(event)
  252. }
  253. w.EventSwitch.RemoveListenerForEvent(event, listenerID)
  254. }
  255. func (w *WSEvents) RemoveListener(listenerID string) {
  256. // remove all counts for this listener
  257. for _, s := range w.listeners[listenerID] {
  258. w.evtCount[s] -= 1
  259. if w.evtCount[s] == 0 {
  260. w.unsubscribe(s)
  261. }
  262. }
  263. w.listeners[listenerID] = nil
  264. // then let the switch do it's magic
  265. w.EventSwitch.RemoveListener(listenerID)
  266. }
  267. // After being reconnected, it is necessary to redo subscription
  268. // to server otherwise no data will be automatically received
  269. func (w *WSEvents) redoSubscriptions() {
  270. for event, _ := range w.evtCount {
  271. w.subscribe(event)
  272. }
  273. }
  274. // eventListener is an infinite loop pulling all websocket events
  275. // and pushing them to the EventSwitch.
  276. //
  277. // the goroutine only stops by closing quit
  278. func (w *WSEvents) eventListener() {
  279. for {
  280. select {
  281. case res := <-w.ws.ResultsCh:
  282. // res is json.RawMessage
  283. err := w.parseEvent(res)
  284. if err != nil {
  285. // FIXME: better logging/handling of errors??
  286. fmt.Printf("ws result: %+v\n", err)
  287. }
  288. case err := <-w.ws.ErrorsCh:
  289. // FIXME: better logging/handling of errors??
  290. fmt.Printf("ws err: %+v\n", err)
  291. case <-w.quit:
  292. // send a message so we can wait for the routine to exit
  293. // before cleaning up the w.ws stuff
  294. w.done <- true
  295. return
  296. }
  297. }
  298. }
  299. // parseEvent unmarshals the json message and converts it into
  300. // some implementation of types.TMEventData, and sends it off
  301. // on the merry way to the EventSwitch
  302. func (w *WSEvents) parseEvent(data []byte) (err error) {
  303. result := new(ctypes.ResultEvent)
  304. err = json.Unmarshal(data, result)
  305. if err != nil {
  306. // ignore silently (eg. subscribe, unsubscribe and maybe other events)
  307. // TODO: ?
  308. return nil
  309. }
  310. // looks good! let's fire this baby!
  311. w.EventSwitch.FireEvent(result.Name, result.Data)
  312. return nil
  313. }
  314. // no way of exposing these failures, so we panic.
  315. // is this right? or silently ignore???
  316. func (w *WSEvents) subscribe(event string) {
  317. err := w.ws.Subscribe(context.TODO(), event)
  318. if err != nil {
  319. panic(err)
  320. }
  321. }
  322. func (w *WSEvents) unsubscribe(event string) {
  323. err := w.ws.Unsubscribe(context.TODO(), event)
  324. if err != nil {
  325. panic(err)
  326. }
  327. }