You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

370 lines
9.3 KiB

  1. package tmspcli
  2. import (
  3. "bufio"
  4. "container/list"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "sync"
  9. "time"
  10. . "github.com/tendermint/go-common"
  11. "github.com/tendermint/tmsp/types"
  12. )
  13. const reqQueueSize = 256 // TODO make configurable
  14. const maxResponseSize = 1048576 // 1MB TODO make configurable
  15. const flushThrottleMS = 20 // Don't wait longer than...
  16. // This is goroutine-safe, but users should beware that
  17. // the application in general is not meant to be interfaced
  18. // with concurrent callers.
  19. type remoteClient struct {
  20. QuitService
  21. sync.Mutex // [EB]: is this even used?
  22. reqQueue chan *ReqRes
  23. flushTimer *ThrottleTimer
  24. mustConnect bool
  25. mtx sync.Mutex
  26. addr string
  27. conn net.Conn
  28. err error
  29. reqSent *list.List
  30. resCb func(*types.Request, *types.Response) // listens to all callbacks
  31. }
  32. func NewClient(addr string, mustConnect bool) (*remoteClient, error) {
  33. cli := &remoteClient{
  34. reqQueue: make(chan *ReqRes, reqQueueSize),
  35. flushTimer: NewThrottleTimer("remoteClient", flushThrottleMS),
  36. mustConnect: mustConnect,
  37. addr: addr,
  38. reqSent: list.New(),
  39. resCb: nil,
  40. }
  41. cli.QuitService = *NewQuitService(nil, "remoteClient", cli)
  42. _, err := cli.Start() // Just start it, it's confusing for callers to remember to start.
  43. if mustConnect {
  44. return nil, err
  45. } else {
  46. return cli, nil
  47. }
  48. }
  49. func (cli *remoteClient) OnStart() (err error) {
  50. cli.QuitService.OnStart()
  51. doneCh := make(chan struct{})
  52. go func() {
  53. RETRY_LOOP:
  54. for {
  55. conn, err_ := Connect(cli.addr)
  56. if err_ != nil {
  57. if cli.mustConnect {
  58. err = err_ // OnStart() will return this.
  59. close(doneCh)
  60. return
  61. } else {
  62. fmt.Printf("tmsp.remoteClient failed to connect to %v. Retrying...\n", cli.addr)
  63. time.Sleep(time.Second * 3)
  64. continue RETRY_LOOP
  65. }
  66. }
  67. go cli.sendRequestsRoutine(conn)
  68. go cli.recvResponseRoutine(conn)
  69. close(doneCh) // OnStart() will return no error.
  70. return
  71. }
  72. }()
  73. <-doneCh
  74. return // err
  75. }
  76. func (cli *remoteClient) OnStop() {
  77. cli.QuitService.OnStop()
  78. if cli.conn != nil {
  79. cli.conn.Close()
  80. }
  81. }
  82. // Set listener for all responses
  83. // NOTE: callback may get internally generated flush responses.
  84. func (cli *remoteClient) SetResponseCallback(resCb Callback) {
  85. cli.mtx.Lock()
  86. defer cli.mtx.Unlock()
  87. cli.resCb = resCb
  88. }
  89. func (cli *remoteClient) StopForError(err error) {
  90. cli.mtx.Lock()
  91. fmt.Printf("Stopping tmsp.remoteClient for error: %v\n", err.Error())
  92. if cli.err == nil {
  93. cli.err = err
  94. }
  95. cli.mtx.Unlock()
  96. cli.Stop()
  97. }
  98. func (cli *remoteClient) Error() error {
  99. cli.mtx.Lock()
  100. defer cli.mtx.Unlock()
  101. return cli.err
  102. }
  103. //----------------------------------------
  104. func (cli *remoteClient) sendRequestsRoutine(conn net.Conn) {
  105. w := bufio.NewWriter(conn)
  106. for {
  107. select {
  108. case <-cli.flushTimer.Ch:
  109. select {
  110. case cli.reqQueue <- NewReqRes(types.RequestFlush()):
  111. default:
  112. // Probably will fill the buffer, or retry later.
  113. }
  114. case <-cli.QuitService.Quit:
  115. return
  116. case reqres := <-cli.reqQueue:
  117. cli.willSendReq(reqres)
  118. err := types.WriteMessage(reqres.Request, w)
  119. if err != nil {
  120. cli.StopForError(err)
  121. return
  122. }
  123. // log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
  124. if reqres.Request.Type == types.MessageType_Flush {
  125. err = w.Flush()
  126. if err != nil {
  127. cli.StopForError(err)
  128. return
  129. }
  130. }
  131. }
  132. }
  133. }
  134. func (cli *remoteClient) recvResponseRoutine(conn net.Conn) {
  135. r := bufio.NewReader(conn) // Buffer reads
  136. for {
  137. var res = &types.Response{}
  138. err := types.ReadMessage(r, res)
  139. if err != nil {
  140. cli.StopForError(err)
  141. return
  142. }
  143. switch res.Type {
  144. case types.MessageType_Exception:
  145. // XXX After setting cli.err, release waiters (e.g. reqres.Done())
  146. cli.StopForError(errors.New(res.Error))
  147. default:
  148. // log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
  149. err := cli.didRecvResponse(res)
  150. if err != nil {
  151. cli.StopForError(err)
  152. }
  153. }
  154. }
  155. }
  156. func (cli *remoteClient) willSendReq(reqres *ReqRes) {
  157. cli.mtx.Lock()
  158. defer cli.mtx.Unlock()
  159. cli.reqSent.PushBack(reqres)
  160. }
  161. func (cli *remoteClient) didRecvResponse(res *types.Response) error {
  162. cli.mtx.Lock()
  163. defer cli.mtx.Unlock()
  164. // Get the first ReqRes
  165. next := cli.reqSent.Front()
  166. if next == nil {
  167. return fmt.Errorf("Unexpected result type %v when nothing expected", res.Type)
  168. }
  169. reqres := next.Value.(*ReqRes)
  170. if !resMatchesReq(reqres.Request, res) {
  171. return fmt.Errorf("Unexpected result type %v when response to %v expected",
  172. res.Type, reqres.Request.Type)
  173. }
  174. reqres.Response = res // Set response
  175. reqres.Done() // Release waiters
  176. cli.reqSent.Remove(next) // Pop first item from linked list
  177. // Notify reqRes listener if set
  178. if cb := reqres.GetCallback(); cb != nil {
  179. cb(res)
  180. }
  181. // Notify client listener if set
  182. if cli.resCb != nil {
  183. cli.resCb(reqres.Request, res)
  184. }
  185. return nil
  186. }
  187. //----------------------------------------
  188. func (cli *remoteClient) EchoAsync(msg string) *ReqRes {
  189. return cli.queueRequest(types.RequestEcho(msg))
  190. }
  191. func (cli *remoteClient) FlushAsync() *ReqRes {
  192. return cli.queueRequest(types.RequestFlush())
  193. }
  194. func (cli *remoteClient) InfoAsync() *ReqRes {
  195. return cli.queueRequest(types.RequestInfo())
  196. }
  197. func (cli *remoteClient) SetOptionAsync(key string, value string) *ReqRes {
  198. return cli.queueRequest(types.RequestSetOption(key, value))
  199. }
  200. func (cli *remoteClient) AppendTxAsync(tx []byte) *ReqRes {
  201. return cli.queueRequest(types.RequestAppendTx(tx))
  202. }
  203. func (cli *remoteClient) CheckTxAsync(tx []byte) *ReqRes {
  204. return cli.queueRequest(types.RequestCheckTx(tx))
  205. }
  206. func (cli *remoteClient) QueryAsync(query []byte) *ReqRes {
  207. return cli.queueRequest(types.RequestQuery(query))
  208. }
  209. func (cli *remoteClient) CommitAsync() *ReqRes {
  210. return cli.queueRequest(types.RequestCommit())
  211. }
  212. func (cli *remoteClient) InitChainAsync(validators []*types.Validator) *ReqRes {
  213. return cli.queueRequest(types.RequestInitChain(validators))
  214. }
  215. func (cli *remoteClient) EndBlockAsync(height uint64) *ReqRes {
  216. return cli.queueRequest(types.RequestEndBlock(height))
  217. }
  218. //----------------------------------------
  219. func (cli *remoteClient) EchoSync(msg string) (res types.Result) {
  220. reqres := cli.queueRequest(types.RequestEcho(msg))
  221. cli.FlushSync()
  222. if cli.err != nil {
  223. return types.ErrInternalError.SetLog(cli.err.Error())
  224. }
  225. resp := reqres.Response
  226. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  227. }
  228. func (cli *remoteClient) FlushSync() error {
  229. cli.queueRequest(types.RequestFlush()).Wait()
  230. return cli.err
  231. }
  232. func (cli *remoteClient) InfoSync() (res types.Result) {
  233. reqres := cli.queueRequest(types.RequestInfo())
  234. cli.FlushSync()
  235. if cli.err != nil {
  236. return types.ErrInternalError.SetLog(cli.err.Error())
  237. }
  238. resp := reqres.Response
  239. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  240. }
  241. func (cli *remoteClient) SetOptionSync(key string, value string) (res types.Result) {
  242. reqres := cli.queueRequest(types.RequestSetOption(key, value))
  243. cli.FlushSync()
  244. if cli.err != nil {
  245. return types.ErrInternalError.SetLog(cli.err.Error())
  246. }
  247. resp := reqres.Response
  248. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  249. }
  250. func (cli *remoteClient) AppendTxSync(tx []byte) (res types.Result) {
  251. reqres := cli.queueRequest(types.RequestAppendTx(tx))
  252. cli.FlushSync()
  253. if cli.err != nil {
  254. return types.ErrInternalError.SetLog(cli.err.Error())
  255. }
  256. resp := reqres.Response
  257. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  258. }
  259. func (cli *remoteClient) CheckTxSync(tx []byte) (res types.Result) {
  260. reqres := cli.queueRequest(types.RequestCheckTx(tx))
  261. cli.FlushSync()
  262. if cli.err != nil {
  263. return types.ErrInternalError.SetLog(cli.err.Error())
  264. }
  265. resp := reqres.Response
  266. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  267. }
  268. func (cli *remoteClient) QuerySync(query []byte) (res types.Result) {
  269. reqres := cli.queueRequest(types.RequestQuery(query))
  270. cli.FlushSync()
  271. if cli.err != nil {
  272. return types.ErrInternalError.SetLog(cli.err.Error())
  273. }
  274. resp := reqres.Response
  275. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  276. }
  277. func (cli *remoteClient) CommitSync() (res types.Result) {
  278. reqres := cli.queueRequest(types.RequestCommit())
  279. cli.FlushSync()
  280. if cli.err != nil {
  281. return types.ErrInternalError.SetLog(cli.err.Error())
  282. }
  283. resp := reqres.Response
  284. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  285. }
  286. func (cli *remoteClient) InitChainSync(validators []*types.Validator) (err error) {
  287. cli.queueRequest(types.RequestInitChain(validators))
  288. cli.FlushSync()
  289. if cli.err != nil {
  290. return cli.err
  291. }
  292. return nil
  293. }
  294. func (cli *remoteClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) {
  295. reqres := cli.queueRequest(types.RequestEndBlock(height))
  296. cli.FlushSync()
  297. if cli.err != nil {
  298. return nil, cli.err
  299. }
  300. return reqres.Response.Validators, nil
  301. }
  302. //----------------------------------------
  303. func (cli *remoteClient) queueRequest(req *types.Request) *ReqRes {
  304. reqres := NewReqRes(req)
  305. // TODO: set cli.err if reqQueue times out
  306. cli.reqQueue <- reqres
  307. // Maybe auto-flush, or unset auto-flush
  308. switch req.Type {
  309. case types.MessageType_Flush:
  310. cli.flushTimer.Unset()
  311. default:
  312. cli.flushTimer.Set()
  313. }
  314. return reqres
  315. }
  316. //----------------------------------------
  317. func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
  318. return req.Type == res.Type
  319. }