You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

362 lines
8.5 KiB

  1. package tmspcli
  2. import (
  3. "bufio"
  4. "container/list"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "sync"
  9. . "github.com/tendermint/go-common"
  10. "github.com/tendermint/tmsp/types"
  11. )
  12. const reqQueueSize = 256 // TODO make configurable
  13. const maxResponseSize = 1048576 // 1MB TODO make configurable
  14. const flushThrottleMS = 20 // Don't wait longer than...
  15. type Callback func(*types.Request, *types.Response)
  16. // This is goroutine-safe, but users should beware that
  17. // the application in general is not meant to be interfaced
  18. // with concurrent callers.
  19. type Client struct {
  20. QuitService
  21. sync.Mutex // [EB]: is this even used?
  22. reqQueue chan *ReqRes
  23. flushTimer *ThrottleTimer
  24. mtx sync.Mutex
  25. addr string
  26. conn net.Conn
  27. bufWriter *bufio.Writer
  28. err error
  29. reqSent *list.List
  30. resCb func(*types.Request, *types.Response) // listens to all callbacks
  31. }
  32. func NewClient(addr string) (*Client, error) {
  33. conn, err := Connect(addr)
  34. if err != nil {
  35. return nil, err
  36. }
  37. cli := &Client{
  38. reqQueue: make(chan *ReqRes, reqQueueSize),
  39. flushTimer: NewThrottleTimer("Client", flushThrottleMS),
  40. conn: conn,
  41. bufWriter: bufio.NewWriter(conn),
  42. reqSent: list.New(),
  43. resCb: nil,
  44. }
  45. cli.QuitService = *NewQuitService(nil, "Client", cli)
  46. cli.Start() // Just start it, it's confusing for callers to remember to start.
  47. return cli, nil
  48. }
  49. func (cli *Client) OnStart() error {
  50. cli.QuitService.OnStart()
  51. go cli.sendRequestsRoutine()
  52. go cli.recvResponseRoutine()
  53. return nil
  54. }
  55. func (cli *Client) OnStop() {
  56. cli.QuitService.OnStop()
  57. cli.conn.Close()
  58. }
  59. // Set listener for all responses
  60. // NOTE: callback may get internally generated flush responses.
  61. func (cli *Client) SetResponseCallback(resCb Callback) {
  62. cli.mtx.Lock()
  63. defer cli.mtx.Unlock()
  64. cli.resCb = resCb
  65. }
  66. func (cli *Client) StopForError(err error) {
  67. cli.mtx.Lock()
  68. // log.Error("Stopping Client for error.", "error", err)
  69. if cli.err == nil {
  70. cli.err = err
  71. }
  72. cli.mtx.Unlock()
  73. cli.Stop()
  74. }
  75. func (cli *Client) Error() error {
  76. cli.mtx.Lock()
  77. defer cli.mtx.Unlock()
  78. return cli.err
  79. }
  80. //----------------------------------------
  81. func (cli *Client) sendRequestsRoutine() {
  82. for {
  83. select {
  84. case <-cli.flushTimer.Ch:
  85. select {
  86. case cli.reqQueue <- newReqRes(types.RequestFlush()):
  87. default:
  88. // Probably will fill the buffer, or retry later.
  89. }
  90. case <-cli.QuitService.Quit:
  91. return
  92. case reqres := <-cli.reqQueue:
  93. cli.willSendReq(reqres)
  94. err := types.WriteMessage(reqres.Request, cli.bufWriter)
  95. if err != nil {
  96. cli.StopForError(err)
  97. return
  98. }
  99. // log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
  100. if reqres.Request.Type == types.MessageType_Flush {
  101. err = cli.bufWriter.Flush()
  102. if err != nil {
  103. cli.StopForError(err)
  104. return
  105. }
  106. }
  107. }
  108. }
  109. }
  110. func (cli *Client) recvResponseRoutine() {
  111. r := bufio.NewReader(cli.conn) // Buffer reads
  112. for {
  113. var res = &types.Response{}
  114. err := types.ReadMessage(r, res)
  115. if err != nil {
  116. cli.StopForError(err)
  117. return
  118. }
  119. switch res.Type {
  120. case types.MessageType_Exception:
  121. // XXX After setting cli.err, release waiters (e.g. reqres.Done())
  122. cli.StopForError(errors.New(res.Error))
  123. default:
  124. // log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
  125. err := cli.didRecvResponse(res)
  126. if err != nil {
  127. cli.StopForError(err)
  128. }
  129. }
  130. }
  131. }
  132. func (cli *Client) willSendReq(reqres *ReqRes) {
  133. cli.mtx.Lock()
  134. defer cli.mtx.Unlock()
  135. cli.reqSent.PushBack(reqres)
  136. }
  137. func (cli *Client) didRecvResponse(res *types.Response) error {
  138. cli.mtx.Lock()
  139. defer cli.mtx.Unlock()
  140. // Get the first ReqRes
  141. next := cli.reqSent.Front()
  142. if next == nil {
  143. return fmt.Errorf("Unexpected result type %v when nothing expected", res.Type)
  144. }
  145. reqres := next.Value.(*ReqRes)
  146. if !resMatchesReq(reqres.Request, res) {
  147. return fmt.Errorf("Unexpected result type %v when response to %v expected",
  148. res.Type, reqres.Request.Type)
  149. }
  150. reqres.Response = res // Set response
  151. reqres.Done() // Release waiters
  152. cli.reqSent.Remove(next) // Pop first item from linked list
  153. // Notify reqRes listener if set
  154. if cb := reqres.GetCallback(); cb != nil {
  155. cb(res)
  156. }
  157. // Notify client listener if set
  158. if cli.resCb != nil {
  159. cli.resCb(reqres.Request, res)
  160. }
  161. return nil
  162. }
  163. //----------------------------------------
  164. func (cli *Client) EchoAsync(msg string) *ReqRes {
  165. return cli.queueRequest(types.RequestEcho(msg))
  166. }
  167. func (cli *Client) FlushAsync() *ReqRes {
  168. return cli.queueRequest(types.RequestFlush())
  169. }
  170. func (cli *Client) SetOptionAsync(key string, value string) *ReqRes {
  171. return cli.queueRequest(types.RequestSetOption(key, value))
  172. }
  173. func (cli *Client) AppendTxAsync(tx []byte) *ReqRes {
  174. return cli.queueRequest(types.RequestAppendTx(tx))
  175. }
  176. func (cli *Client) CheckTxAsync(tx []byte) *ReqRes {
  177. return cli.queueRequest(types.RequestCheckTx(tx))
  178. }
  179. func (cli *Client) CommitAsync() *ReqRes {
  180. return cli.queueRequest(types.RequestCommit())
  181. }
  182. func (cli *Client) QueryAsync(query []byte) *ReqRes {
  183. return cli.queueRequest(types.RequestQuery(query))
  184. }
  185. //----------------------------------------
  186. func (cli *Client) FlushSync() error {
  187. cli.queueRequest(types.RequestFlush()).Wait()
  188. return cli.err
  189. }
  190. func (cli *Client) InfoSync() (info string, err error) {
  191. reqres := cli.queueRequest(types.RequestInfo())
  192. cli.FlushSync()
  193. if cli.err != nil {
  194. return "", cli.err
  195. }
  196. return string(reqres.Response.Data), nil
  197. }
  198. func (cli *Client) SetOptionSync(key string, value string) (log string, err error) {
  199. reqres := cli.queueRequest(types.RequestSetOption(key, value))
  200. cli.FlushSync()
  201. if cli.err != nil {
  202. return "", cli.err
  203. }
  204. return reqres.Response.Log, nil
  205. }
  206. func (cli *Client) AppendTxSync(tx []byte) (code types.CodeType, result []byte, log string, err error) {
  207. reqres := cli.queueRequest(types.RequestAppendTx(tx))
  208. cli.FlushSync()
  209. if cli.err != nil {
  210. return types.CodeType_InternalError, nil, "", cli.err
  211. }
  212. res := reqres.Response
  213. return res.Code, res.Data, res.Log, nil
  214. }
  215. func (cli *Client) CheckTxSync(tx []byte) (code types.CodeType, result []byte, log string, err error) {
  216. reqres := cli.queueRequest(types.RequestCheckTx(tx))
  217. cli.FlushSync()
  218. if cli.err != nil {
  219. return types.CodeType_InternalError, nil, "", cli.err
  220. }
  221. res := reqres.Response
  222. return res.Code, res.Data, res.Log, nil
  223. }
  224. func (cli *Client) CommitSync() (hash []byte, log string, err error) {
  225. reqres := cli.queueRequest(types.RequestCommit())
  226. cli.FlushSync()
  227. if cli.err != nil {
  228. return nil, "", cli.err
  229. }
  230. res := reqres.Response
  231. return res.Data, res.Log, nil
  232. }
  233. func (cli *Client) QuerySync(query []byte) (code types.CodeType, result []byte, log string, err error) {
  234. reqres := cli.queueRequest(types.RequestQuery(query))
  235. cli.FlushSync()
  236. if cli.err != nil {
  237. return types.CodeType_InternalError, nil, "", cli.err
  238. }
  239. res := reqres.Response
  240. return res.Code, res.Data, res.Log, nil
  241. }
  242. //----------------------------------------
  243. func (cli *Client) queueRequest(req *types.Request) *ReqRes {
  244. reqres := newReqRes(req)
  245. // TODO: set cli.err if reqQueue times out
  246. cli.reqQueue <- reqres
  247. // Maybe auto-flush, or unset auto-flush
  248. switch req.Type {
  249. case types.MessageType_Flush:
  250. cli.flushTimer.Unset()
  251. default:
  252. cli.flushTimer.Set()
  253. }
  254. return reqres
  255. }
  256. //----------------------------------------
  257. func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
  258. return req.Type == res.Type
  259. }
  260. type ReqRes struct {
  261. *types.Request
  262. *sync.WaitGroup
  263. *types.Response // Not set atomically, so be sure to use WaitGroup.
  264. mtx sync.Mutex
  265. done bool // Gets set to true once *after* WaitGroup.Done().
  266. cb func(*types.Response) // A single callback that may be set.
  267. }
  268. func newReqRes(req *types.Request) *ReqRes {
  269. return &ReqRes{
  270. Request: req,
  271. WaitGroup: waitGroup1(),
  272. Response: nil,
  273. done: false,
  274. cb: nil,
  275. }
  276. }
  277. // Sets the callback for this ReqRes atomically.
  278. // If reqRes is already done, calls cb immediately.
  279. // NOTE: reqRes.cb should not change if reqRes.done.
  280. // NOTE: only one callback is supported.
  281. func (reqRes *ReqRes) SetCallback(cb func(res *types.Response)) {
  282. reqRes.mtx.Lock()
  283. if reqRes.done {
  284. reqRes.mtx.Unlock()
  285. cb(reqRes.Response)
  286. return
  287. }
  288. defer reqRes.mtx.Unlock()
  289. reqRes.cb = cb
  290. }
  291. func (reqRes *ReqRes) GetCallback() func(*types.Response) {
  292. reqRes.mtx.Lock()
  293. defer reqRes.mtx.Unlock()
  294. return reqRes.cb
  295. }
  296. // NOTE: it should be safe to read reqRes.cb without locks after this.
  297. func (reqRes *ReqRes) SetDone() {
  298. reqRes.mtx.Lock()
  299. reqRes.done = true
  300. reqRes.mtx.Unlock()
  301. }
  302. func waitGroup1() (wg *sync.WaitGroup) {
  303. wg = &sync.WaitGroup{}
  304. wg.Add(1)
  305. return
  306. }