You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

401 lines
10 KiB

8 years ago
8 years ago
8 years ago
8 years ago
  1. package tmspcli
  2. import (
  3. "bufio"
  4. "container/list"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "reflect"
  9. "sync"
  10. "time"
  11. . "github.com/tendermint/go-common"
  12. "github.com/tendermint/tmsp/types"
  13. )
  14. const (
  15. OK = types.CodeType_OK
  16. LOG = ""
  17. )
  18. const reqQueueSize = 256 // TODO make configurable
  19. const maxResponseSize = 1048576 // 1MB TODO make configurable
  20. const flushThrottleMS = 20 // Don't wait longer than...
  21. // This is goroutine-safe, but users should beware that
  22. // the application in general is not meant to be interfaced
  23. // with concurrent callers.
  24. type remoteClient struct {
  25. QuitService
  26. sync.Mutex // [EB]: is this even used?
  27. reqQueue chan *ReqRes
  28. flushTimer *ThrottleTimer
  29. mustConnect bool
  30. mtx sync.Mutex
  31. addr string
  32. conn net.Conn
  33. err error
  34. reqSent *list.List
  35. resCb func(*types.Request, *types.Response) // listens to all callbacks
  36. }
  37. func NewSocketClient(addr string, mustConnect bool) (*remoteClient, error) {
  38. cli := &remoteClient{
  39. reqQueue: make(chan *ReqRes, reqQueueSize),
  40. flushTimer: NewThrottleTimer("remoteClient", flushThrottleMS),
  41. mustConnect: mustConnect,
  42. addr: addr,
  43. reqSent: list.New(),
  44. resCb: nil,
  45. }
  46. cli.QuitService = *NewQuitService(nil, "remoteClient", cli)
  47. _, err := cli.Start() // Just start it, it's confusing for callers to remember to start.
  48. return cli, err
  49. }
  50. func (cli *remoteClient) OnStart() error {
  51. cli.QuitService.OnStart()
  52. RETRY_LOOP:
  53. for {
  54. conn, err := Connect(cli.addr)
  55. if err != nil {
  56. if cli.mustConnect {
  57. return err
  58. } else {
  59. fmt.Printf("tmsp.remoteClient failed to connect to %v. Retrying...\n", cli.addr)
  60. time.Sleep(time.Second * 3)
  61. continue RETRY_LOOP
  62. }
  63. }
  64. go cli.sendRequestsRoutine(conn)
  65. go cli.recvResponseRoutine(conn)
  66. return err
  67. }
  68. return nil // never happens
  69. }
  70. func (cli *remoteClient) OnStop() {
  71. cli.QuitService.OnStop()
  72. if cli.conn != nil {
  73. cli.conn.Close()
  74. }
  75. }
  76. // Set listener for all responses
  77. // NOTE: callback may get internally generated flush responses.
  78. func (cli *remoteClient) SetResponseCallback(resCb Callback) {
  79. cli.mtx.Lock()
  80. defer cli.mtx.Unlock()
  81. cli.resCb = resCb
  82. }
  83. func (cli *remoteClient) StopForError(err error) {
  84. cli.mtx.Lock()
  85. fmt.Printf("Stopping tmsp.remoteClient for error: %v\n", err.Error())
  86. if cli.err == nil {
  87. cli.err = err
  88. }
  89. cli.mtx.Unlock()
  90. cli.Stop()
  91. }
  92. func (cli *remoteClient) Error() error {
  93. cli.mtx.Lock()
  94. defer cli.mtx.Unlock()
  95. return cli.err
  96. }
  97. //----------------------------------------
  98. func (cli *remoteClient) sendRequestsRoutine(conn net.Conn) {
  99. w := bufio.NewWriter(conn)
  100. for {
  101. select {
  102. case <-cli.flushTimer.Ch:
  103. select {
  104. case cli.reqQueue <- NewReqRes(types.ToRequestFlush()): // cant this block ?
  105. default:
  106. // Probably will fill the buffer, or retry later.
  107. }
  108. case <-cli.QuitService.Quit:
  109. return
  110. case reqres := <-cli.reqQueue:
  111. cli.willSendReq(reqres)
  112. err := types.WriteMessage(reqres.Request, w)
  113. if err != nil {
  114. cli.StopForError(err)
  115. return
  116. }
  117. // log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
  118. if _, ok := reqres.Request.Value.(*types.Request_Flush); ok {
  119. err = w.Flush()
  120. if err != nil {
  121. cli.StopForError(err)
  122. return
  123. }
  124. }
  125. }
  126. }
  127. }
  128. func (cli *remoteClient) recvResponseRoutine(conn net.Conn) {
  129. r := bufio.NewReader(conn) // Buffer reads
  130. for {
  131. var res = &types.Response{}
  132. err := types.ReadMessage(r, res)
  133. if err != nil {
  134. cli.StopForError(err)
  135. return
  136. }
  137. switch r := res.Value.(type) {
  138. case *types.Response_Exception:
  139. // XXX After setting cli.err, release waiters (e.g. reqres.Done())
  140. cli.StopForError(errors.New(r.Exception.Error))
  141. default:
  142. // log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
  143. err := cli.didRecvResponse(res)
  144. if err != nil {
  145. cli.StopForError(err)
  146. }
  147. }
  148. }
  149. }
  150. func (cli *remoteClient) willSendReq(reqres *ReqRes) {
  151. cli.mtx.Lock()
  152. defer cli.mtx.Unlock()
  153. cli.reqSent.PushBack(reqres)
  154. }
  155. func (cli *remoteClient) didRecvResponse(res *types.Response) error {
  156. cli.mtx.Lock()
  157. defer cli.mtx.Unlock()
  158. // Get the first ReqRes
  159. next := cli.reqSent.Front()
  160. if next == nil {
  161. return fmt.Errorf("Unexpected result type %v when nothing expected", reflect.TypeOf(res.Value))
  162. }
  163. reqres := next.Value.(*ReqRes)
  164. if !resMatchesReq(reqres.Request, res) {
  165. return fmt.Errorf("Unexpected result type %v when response to %v expected",
  166. reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value))
  167. }
  168. reqres.Response = res // Set response
  169. reqres.Done() // Release waiters
  170. cli.reqSent.Remove(next) // Pop first item from linked list
  171. // Notify reqRes listener if set
  172. if cb := reqres.GetCallback(); cb != nil {
  173. cb(res)
  174. }
  175. // Notify client listener if set
  176. if cli.resCb != nil {
  177. cli.resCb(reqres.Request, res)
  178. }
  179. return nil
  180. }
  181. //----------------------------------------
  182. func (cli *remoteClient) EchoAsync(msg string) *ReqRes {
  183. return cli.queueRequest(types.ToRequestEcho(msg))
  184. }
  185. func (cli *remoteClient) FlushAsync() *ReqRes {
  186. return cli.queueRequest(types.ToRequestFlush())
  187. }
  188. func (cli *remoteClient) InfoAsync() *ReqRes {
  189. return cli.queueRequest(types.ToRequestInfo())
  190. }
  191. func (cli *remoteClient) SetOptionAsync(key string, value string) *ReqRes {
  192. return cli.queueRequest(types.ToRequestSetOption(key, value))
  193. }
  194. func (cli *remoteClient) AppendTxAsync(tx []byte) *ReqRes {
  195. return cli.queueRequest(types.ToRequestAppendTx(tx))
  196. }
  197. func (cli *remoteClient) CheckTxAsync(tx []byte) *ReqRes {
  198. return cli.queueRequest(types.ToRequestCheckTx(tx))
  199. }
  200. func (cli *remoteClient) QueryAsync(query []byte) *ReqRes {
  201. return cli.queueRequest(types.ToRequestQuery(query))
  202. }
  203. func (cli *remoteClient) CommitAsync() *ReqRes {
  204. return cli.queueRequest(types.ToRequestCommit())
  205. }
  206. func (cli *remoteClient) InitChainAsync(validators []*types.Validator) *ReqRes {
  207. return cli.queueRequest(types.ToRequestInitChain(validators))
  208. }
  209. func (cli *remoteClient) BeginBlockAsync(height uint64) *ReqRes {
  210. return cli.queueRequest(types.ToRequestBeginBlock(height))
  211. }
  212. func (cli *remoteClient) EndBlockAsync(height uint64) *ReqRes {
  213. return cli.queueRequest(types.ToRequestEndBlock(height))
  214. }
  215. //----------------------------------------
  216. func (cli *remoteClient) EchoSync(msg string) (res types.Result) {
  217. reqres := cli.queueRequest(types.ToRequestEcho(msg))
  218. cli.FlushSync()
  219. if cli.err != nil {
  220. return types.ErrInternalError.SetLog(cli.err.Error())
  221. }
  222. resp := reqres.Response.GetEcho()
  223. return types.Result{Code: OK, Data: []byte(resp.Message), Log: LOG}
  224. }
  225. func (cli *remoteClient) FlushSync() error {
  226. cli.queueRequest(types.ToRequestFlush()).Wait()
  227. return cli.err
  228. }
  229. func (cli *remoteClient) InfoSync() (res types.Result) {
  230. reqres := cli.queueRequest(types.ToRequestInfo())
  231. cli.FlushSync()
  232. if cli.err != nil {
  233. return types.ErrInternalError.SetLog(cli.err.Error())
  234. }
  235. resp := reqres.Response.GetInfo()
  236. return types.Result{Code: OK, Data: []byte(resp.Info), Log: LOG}
  237. }
  238. func (cli *remoteClient) SetOptionSync(key string, value string) (res types.Result) {
  239. reqres := cli.queueRequest(types.ToRequestSetOption(key, value))
  240. cli.FlushSync()
  241. if cli.err != nil {
  242. return types.ErrInternalError.SetLog(cli.err.Error())
  243. }
  244. resp := reqres.Response.GetSetOption()
  245. return types.Result{Code: OK, Data: nil, Log: resp.Log}
  246. }
  247. func (cli *remoteClient) AppendTxSync(tx []byte) (res types.Result) {
  248. reqres := cli.queueRequest(types.ToRequestAppendTx(tx))
  249. cli.FlushSync()
  250. if cli.err != nil {
  251. return types.ErrInternalError.SetLog(cli.err.Error())
  252. }
  253. resp := reqres.Response.GetAppendTx()
  254. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  255. }
  256. func (cli *remoteClient) CheckTxSync(tx []byte) (res types.Result) {
  257. reqres := cli.queueRequest(types.ToRequestCheckTx(tx))
  258. cli.FlushSync()
  259. if cli.err != nil {
  260. return types.ErrInternalError.SetLog(cli.err.Error())
  261. }
  262. resp := reqres.Response.GetCheckTx()
  263. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  264. }
  265. func (cli *remoteClient) QuerySync(query []byte) (res types.Result) {
  266. reqres := cli.queueRequest(types.ToRequestQuery(query))
  267. cli.FlushSync()
  268. if cli.err != nil {
  269. return types.ErrInternalError.SetLog(cli.err.Error())
  270. }
  271. resp := reqres.Response.GetQuery()
  272. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  273. }
  274. func (cli *remoteClient) CommitSync() (res types.Result) {
  275. reqres := cli.queueRequest(types.ToRequestCommit())
  276. cli.FlushSync()
  277. if cli.err != nil {
  278. return types.ErrInternalError.SetLog(cli.err.Error())
  279. }
  280. resp := reqres.Response.GetCommit()
  281. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  282. }
  283. func (cli *remoteClient) InitChainSync(validators []*types.Validator) (err error) {
  284. cli.queueRequest(types.ToRequestInitChain(validators))
  285. cli.FlushSync()
  286. if cli.err != nil {
  287. return cli.err
  288. }
  289. return nil
  290. }
  291. func (cli *remoteClient) BeginBlockSync(height uint64) (err error) {
  292. cli.queueRequest(types.ToRequestBeginBlock(height))
  293. cli.FlushSync()
  294. if cli.err != nil {
  295. return cli.err
  296. }
  297. return nil
  298. }
  299. func (cli *remoteClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) {
  300. reqres := cli.queueRequest(types.ToRequestEndBlock(height))
  301. cli.FlushSync()
  302. if cli.err != nil {
  303. return nil, cli.err
  304. }
  305. return reqres.Response.GetEndBlock().Diffs, nil
  306. }
  307. //----------------------------------------
  308. func (cli *remoteClient) queueRequest(req *types.Request) *ReqRes {
  309. reqres := NewReqRes(req)
  310. // TODO: set cli.err if reqQueue times out
  311. cli.reqQueue <- reqres
  312. // Maybe auto-flush, or unset auto-flush
  313. switch req.Value.(type) {
  314. case *types.Request_Flush:
  315. cli.flushTimer.Unset()
  316. default:
  317. cli.flushTimer.Set()
  318. }
  319. return reqres
  320. }
  321. //----------------------------------------
  322. func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
  323. switch req.Value.(type) {
  324. case *types.Request_Echo:
  325. _, ok = res.Value.(*types.Response_Echo)
  326. case *types.Request_Flush:
  327. _, ok = res.Value.(*types.Response_Flush)
  328. case *types.Request_Info:
  329. _, ok = res.Value.(*types.Response_Info)
  330. case *types.Request_SetOption:
  331. _, ok = res.Value.(*types.Response_SetOption)
  332. case *types.Request_AppendTx:
  333. _, ok = res.Value.(*types.Response_AppendTx)
  334. case *types.Request_CheckTx:
  335. _, ok = res.Value.(*types.Response_CheckTx)
  336. case *types.Request_Commit:
  337. _, ok = res.Value.(*types.Response_Commit)
  338. case *types.Request_Query:
  339. _, ok = res.Value.(*types.Response_Query)
  340. case *types.Request_InitChain:
  341. _, ok = res.Value.(*types.Response_InitChain)
  342. case *types.Request_EndBlock:
  343. _, ok = res.Value.(*types.Response_EndBlock)
  344. }
  345. return ok
  346. }