You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

304 lines
7.2 KiB

  1. package tmspcli
  2. import (
  3. "bufio"
  4. "container/list"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "sync"
  9. . "github.com/tendermint/go-common"
  10. "github.com/tendermint/tmsp/types"
  11. )
  12. const maxResponseSize = 1048576 // 1MB TODO make configurable
  13. const flushThrottleMS = 20 // Don't wait longer than...
  14. type Callback func(*types.Request, *types.Response)
  15. // This is goroutine-safe, but users should beware that
  16. // the application in general is not meant to be interfaced
  17. // with concurrent callers.
  18. type TMSPClient struct {
  19. QuitService
  20. sync.Mutex // [EB]: is this even used?
  21. reqQueue chan *reqRes
  22. flushTimer *ThrottleTimer
  23. mtx sync.Mutex
  24. conn net.Conn
  25. bufWriter *bufio.Writer
  26. err error
  27. reqSent *list.List
  28. resCb func(*types.Request, *types.Response)
  29. }
  30. func NewTMSPClient(conn net.Conn, bufferSize int) *TMSPClient {
  31. cli := &TMSPClient{
  32. reqQueue: make(chan *reqRes, bufferSize),
  33. flushTimer: NewThrottleTimer("TMSPClient", flushThrottleMS),
  34. conn: conn,
  35. bufWriter: bufio.NewWriter(conn),
  36. reqSent: list.New(),
  37. resCb: nil,
  38. }
  39. cli.QuitService = *NewQuitService(nil, "TMSPClient", cli)
  40. cli.Start() // Just start it, it's confusing for callers to remember to start.
  41. return cli
  42. }
  43. func (cli *TMSPClient) OnStart() error {
  44. cli.QuitService.OnStart()
  45. go cli.sendRequestsRoutine()
  46. go cli.recvResponseRoutine()
  47. return nil
  48. }
  49. func (cli *TMSPClient) OnStop() {
  50. cli.QuitService.OnStop()
  51. cli.conn.Close()
  52. }
  53. // NOTE: callback may get internally generated flush responses.
  54. func (cli *TMSPClient) SetResponseCallback(resCb Callback) {
  55. cli.mtx.Lock()
  56. defer cli.mtx.Unlock()
  57. cli.resCb = resCb
  58. }
  59. func (cli *TMSPClient) StopForError(err error) {
  60. cli.mtx.Lock()
  61. // log.Error("Stopping TMSPClient for error.", "error", err)
  62. if cli.err == nil {
  63. cli.err = err
  64. }
  65. cli.mtx.Unlock()
  66. cli.Stop()
  67. }
  68. func (cli *TMSPClient) Error() error {
  69. cli.mtx.Lock()
  70. defer cli.mtx.Unlock()
  71. return cli.err
  72. }
  73. //----------------------------------------
  74. func (cli *TMSPClient) sendRequestsRoutine() {
  75. for {
  76. select {
  77. case <-cli.flushTimer.Ch:
  78. select {
  79. case cli.reqQueue <- newReqRes(types.RequestFlush()):
  80. default:
  81. // Probably will fill the buffer, or retry later.
  82. }
  83. case <-cli.QuitService.Quit:
  84. return
  85. case reqres := <-cli.reqQueue:
  86. cli.willSendReq(reqres)
  87. err := types.WriteMessage(reqres.Request, cli.bufWriter)
  88. if err != nil {
  89. cli.StopForError(err)
  90. return
  91. }
  92. // log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
  93. if reqres.Request.Type == types.MessageType_Flush {
  94. err = cli.bufWriter.Flush()
  95. if err != nil {
  96. cli.StopForError(err)
  97. return
  98. }
  99. }
  100. }
  101. }
  102. }
  103. func (cli *TMSPClient) recvResponseRoutine() {
  104. r := bufio.NewReader(cli.conn) // Buffer reads
  105. for {
  106. var res = &types.Response{}
  107. err := types.ReadMessage(r, res)
  108. if err != nil {
  109. cli.StopForError(err)
  110. return
  111. }
  112. switch res.Type {
  113. case types.MessageType_Exception:
  114. // XXX After setting cli.err, release waiters (e.g. reqres.Done())
  115. cli.StopForError(errors.New(res.Error))
  116. default:
  117. // log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
  118. err := cli.didRecvResponse(res)
  119. if err != nil {
  120. cli.StopForError(err)
  121. }
  122. }
  123. }
  124. }
  125. func (cli *TMSPClient) willSendReq(reqres *reqRes) {
  126. cli.mtx.Lock()
  127. defer cli.mtx.Unlock()
  128. cli.reqSent.PushBack(reqres)
  129. }
  130. func (cli *TMSPClient) didRecvResponse(res *types.Response) error {
  131. cli.mtx.Lock()
  132. defer cli.mtx.Unlock()
  133. // Get the first reqRes
  134. next := cli.reqSent.Front()
  135. if next == nil {
  136. return fmt.Errorf("Unexpected result type %v when nothing expected", res.Type)
  137. }
  138. reqres := next.Value.(*reqRes)
  139. if !resMatchesReq(reqres.Request, res) {
  140. return fmt.Errorf("Unexpected result type %v when response to %v expected",
  141. res.Type, reqres.Request.Type)
  142. }
  143. reqres.Response = res // Set response
  144. reqres.Done() // Release waiters
  145. cli.reqSent.Remove(next) // Pop first item from linked list
  146. // Callback if there is a listener
  147. if cli.resCb != nil {
  148. cli.resCb(reqres.Request, res)
  149. }
  150. return nil
  151. }
  152. //----------------------------------------
  153. func (cli *TMSPClient) EchoAsync(msg string) {
  154. cli.queueRequest(types.RequestEcho(msg))
  155. }
  156. func (cli *TMSPClient) FlushAsync() {
  157. cli.queueRequest(types.RequestFlush())
  158. }
  159. func (cli *TMSPClient) SetOptionAsync(key string, value string) {
  160. cli.queueRequest(types.RequestSetOption(key, value))
  161. }
  162. func (cli *TMSPClient) AppendTxAsync(tx []byte) {
  163. cli.queueRequest(types.RequestAppendTx(tx))
  164. }
  165. func (cli *TMSPClient) CheckTxAsync(tx []byte) {
  166. cli.queueRequest(types.RequestCheckTx(tx))
  167. }
  168. func (cli *TMSPClient) GetHashAsync() {
  169. cli.queueRequest(types.RequestGetHash())
  170. }
  171. func (cli *TMSPClient) QueryAsync(query []byte) {
  172. cli.queueRequest(types.RequestQuery(query))
  173. }
  174. //----------------------------------------
  175. func (cli *TMSPClient) InfoSync() (info string, err error) {
  176. reqres := cli.queueRequest(types.RequestInfo())
  177. cli.FlushSync()
  178. if cli.err != nil {
  179. return "", cli.err
  180. }
  181. return string(reqres.Response.Data), nil
  182. }
  183. func (cli *TMSPClient) FlushSync() error {
  184. cli.queueRequest(types.RequestFlush()).Wait()
  185. return cli.err
  186. }
  187. func (cli *TMSPClient) AppendTxSync(tx []byte) (code types.RetCode, result []byte, log string, err error) {
  188. reqres := cli.queueRequest(types.RequestAppendTx(tx))
  189. cli.FlushSync()
  190. if cli.err != nil {
  191. return types.RetCodeInternalError, nil, "", cli.err
  192. }
  193. res := reqres.Response
  194. return types.RetCode(res.Code), res.Data, res.Log, nil
  195. }
  196. func (cli *TMSPClient) CheckTxSync(tx []byte) (code types.RetCode, result []byte, log string, err error) {
  197. reqres := cli.queueRequest(types.RequestCheckTx(tx))
  198. cli.FlushSync()
  199. if cli.err != nil {
  200. return types.RetCodeInternalError, nil, "", cli.err
  201. }
  202. res := reqres.Response
  203. return types.RetCode(res.Code), res.Data, res.Log, nil
  204. }
  205. func (cli *TMSPClient) GetHashSync() (hash []byte, log string, err error) {
  206. reqres := cli.queueRequest(types.RequestGetHash())
  207. cli.FlushSync()
  208. if cli.err != nil {
  209. return nil, "", cli.err
  210. }
  211. res := reqres.Response
  212. return res.Data, res.Log, nil
  213. }
  214. func (cli *TMSPClient) QuerySync(query []byte) (code types.RetCode, result []byte, log string, err error) {
  215. reqres := cli.queueRequest(types.RequestQuery(query))
  216. cli.FlushSync()
  217. if cli.err != nil {
  218. return types.RetCodeInternalError, nil, "", cli.err
  219. }
  220. res := reqres.Response
  221. return types.RetCode(res.Code), res.Data, res.Log, nil
  222. }
  223. //----------------------------------------
  224. func (cli *TMSPClient) queueRequest(req *types.Request) *reqRes {
  225. reqres := newReqRes(req)
  226. // TODO: set cli.err if reqQueue times out
  227. cli.reqQueue <- reqres
  228. // Maybe auto-flush, or unset auto-flush
  229. switch req.Type {
  230. case types.MessageType_Flush:
  231. cli.flushTimer.Unset()
  232. default:
  233. cli.flushTimer.Set()
  234. }
  235. return reqres
  236. }
  237. //----------------------------------------
  238. func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
  239. return req.Type == res.Type
  240. }
  241. type reqRes struct {
  242. *types.Request
  243. *sync.WaitGroup
  244. *types.Response // Not set atomically, so be sure to use WaitGroup.
  245. }
  246. func newReqRes(req *types.Request) *reqRes {
  247. return &reqRes{
  248. Request: req,
  249. WaitGroup: waitGroup1(),
  250. Response: nil,
  251. }
  252. }
  253. func waitGroup1() (wg *sync.WaitGroup) {
  254. wg = &sync.WaitGroup{}
  255. wg.Add(1)
  256. return
  257. }