You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

440 lines
11 KiB

8 years ago
8 years ago
7 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
7 years ago
8 years ago
8 years ago
7 years ago
7 years ago
7 years ago
8 years ago
  1. package abcicli
  2. import (
  3. "bufio"
  4. "container/list"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "reflect"
  9. "sync"
  10. "time"
  11. "github.com/tendermint/abci/types"
  12. cmn "github.com/tendermint/tmlibs/common"
  13. )
  14. const (
  15. OK = types.CodeType_OK
  16. LOG = ""
  17. )
  18. const reqQueueSize = 256 // TODO make configurable
  19. // const maxResponseSize = 1048576 // 1MB TODO make configurable
  20. const flushThrottleMS = 20 // Don't wait longer than...
// socketClient is an ABCI client that exchanges requests and responses with
// an application over a net.Conn.
//
// This is goroutine-safe, but users should beware that
// the application in general is not meant to be interfaced
// with concurrent callers.
type socketClient struct {
	cmn.BaseService

	// reqQueue buffers requests until sendRequestsRoutine writes them out.
	reqQueue chan *ReqRes
	// flushTimer is set whenever a non-flush request is queued; when it
	// fires, sendRequestsRoutine enqueues a flush request.
	flushTimer *cmn.ThrottleTimer
	// mustConnect makes OnStart return the first connection error instead
	// of retrying.
	mustConnect bool

	// mtx is held while touching err, reqSent and resCb, and while OnStop
	// closes conn.
	mtx sync.Mutex
	// addr is the address passed to cmn.Connect in OnStart.
	addr string
	// conn is the connection established by OnStart (nil before that).
	conn net.Conn
	// err records the first error passed to StopForError.
	err error
	// reqSent holds, in send order, the requests still awaiting a response.
	reqSent *list.List
	resCb   func(*types.Request, *types.Response) // listens to all callbacks
}
  36. func NewSocketClient(addr string, mustConnect bool) *socketClient {
  37. cli := &socketClient{
  38. reqQueue: make(chan *ReqRes, reqQueueSize),
  39. flushTimer: cmn.NewThrottleTimer("socketClient", flushThrottleMS),
  40. mustConnect: mustConnect,
  41. addr: addr,
  42. reqSent: list.New(),
  43. resCb: nil,
  44. }
  45. cli.BaseService = *cmn.NewBaseService(nil, "socketClient", cli)
  46. return cli
  47. }
  48. func (cli *socketClient) OnStart() error {
  49. if err := cli.BaseService.OnStart(); err != nil {
  50. return err
  51. }
  52. var err error
  53. var conn net.Conn
  54. RETRY_LOOP:
  55. for {
  56. conn, err = cmn.Connect(cli.addr)
  57. if err != nil {
  58. if cli.mustConnect {
  59. return err
  60. }
  61. cli.Logger.Error(fmt.Sprintf("abci.socketClient failed to connect to %v. Retrying...", cli.addr))
  62. time.Sleep(time.Second * 3)
  63. continue RETRY_LOOP
  64. }
  65. cli.conn = conn
  66. go cli.sendRequestsRoutine(conn)
  67. go cli.recvResponseRoutine(conn)
  68. return nil
  69. }
  70. }
  71. func (cli *socketClient) OnStop() {
  72. cli.BaseService.OnStop()
  73. cli.mtx.Lock()
  74. defer cli.mtx.Unlock()
  75. if cli.conn != nil {
  76. cli.conn.Close()
  77. }
  78. cli.flushQueue()
  79. }
  80. // Stop the client and set the error
  81. func (cli *socketClient) StopForError(err error) {
  82. cli.mtx.Lock()
  83. if !cli.IsRunning() {
  84. return
  85. }
  86. if cli.err == nil {
  87. cli.err = err
  88. }
  89. cli.mtx.Unlock()
  90. cli.Logger.Error(fmt.Sprintf("Stopping abci.socketClient for error: %v", err.Error()))
  91. cli.Stop()
  92. }
  93. func (cli *socketClient) Error() error {
  94. cli.mtx.Lock()
  95. defer cli.mtx.Unlock()
  96. return cli.err
  97. }
  98. // Set listener for all responses
  99. // NOTE: callback may get internally generated flush responses.
  100. func (cli *socketClient) SetResponseCallback(resCb Callback) {
  101. cli.mtx.Lock()
  102. defer cli.mtx.Unlock()
  103. cli.resCb = resCb
  104. }
  105. //----------------------------------------
  106. func (cli *socketClient) sendRequestsRoutine(conn net.Conn) {
  107. w := bufio.NewWriter(conn)
  108. for {
  109. select {
  110. case <-cli.flushTimer.Ch:
  111. select {
  112. case cli.reqQueue <- NewReqRes(types.ToRequestFlush()):
  113. default:
  114. // Probably will fill the buffer, or retry later.
  115. }
  116. case <-cli.BaseService.Quit:
  117. return
  118. case reqres := <-cli.reqQueue:
  119. cli.willSendReq(reqres)
  120. err := types.WriteMessage(reqres.Request, w)
  121. if err != nil {
  122. cli.StopForError(fmt.Errorf("Error writing msg: %v", err))
  123. return
  124. }
  125. // cli.Logger.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
  126. if _, ok := reqres.Request.Value.(*types.Request_Flush); ok {
  127. err = w.Flush()
  128. if err != nil {
  129. cli.StopForError(fmt.Errorf("Error flushing writer: %v", err))
  130. return
  131. }
  132. }
  133. }
  134. }
  135. }
  136. func (cli *socketClient) recvResponseRoutine(conn net.Conn) {
  137. r := bufio.NewReader(conn) // Buffer reads
  138. for {
  139. var res = &types.Response{}
  140. err := types.ReadMessage(r, res)
  141. if err != nil {
  142. cli.StopForError(err)
  143. return
  144. }
  145. switch r := res.Value.(type) {
  146. case *types.Response_Exception:
  147. // XXX After setting cli.err, release waiters (e.g. reqres.Done())
  148. cli.StopForError(errors.New(r.Exception.Error))
  149. return
  150. default:
  151. // cli.Logger.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
  152. err := cli.didRecvResponse(res)
  153. if err != nil {
  154. cli.StopForError(err)
  155. return
  156. }
  157. }
  158. }
  159. }
  160. func (cli *socketClient) willSendReq(reqres *ReqRes) {
  161. cli.mtx.Lock()
  162. defer cli.mtx.Unlock()
  163. cli.reqSent.PushBack(reqres)
  164. }
  165. func (cli *socketClient) didRecvResponse(res *types.Response) error {
  166. cli.mtx.Lock()
  167. defer cli.mtx.Unlock()
  168. // Get the first ReqRes
  169. next := cli.reqSent.Front()
  170. if next == nil {
  171. return fmt.Errorf("Unexpected result type %v when nothing expected", reflect.TypeOf(res.Value))
  172. }
  173. reqres := next.Value.(*ReqRes)
  174. if !resMatchesReq(reqres.Request, res) {
  175. return fmt.Errorf("Unexpected result type %v when response to %v expected",
  176. reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value))
  177. }
  178. reqres.Response = res // Set response
  179. reqres.Done() // Release waiters
  180. cli.reqSent.Remove(next) // Pop first item from linked list
  181. // Notify reqRes listener if set
  182. if cb := reqres.GetCallback(); cb != nil {
  183. cb(res)
  184. }
  185. // Notify client listener if set
  186. if cli.resCb != nil {
  187. cli.resCb(reqres.Request, res)
  188. }
  189. return nil
  190. }
  191. //----------------------------------------
  192. func (cli *socketClient) EchoAsync(msg string) *ReqRes {
  193. return cli.queueRequest(types.ToRequestEcho(msg))
  194. }
  195. func (cli *socketClient) FlushAsync() *ReqRes {
  196. return cli.queueRequest(types.ToRequestFlush())
  197. }
  198. func (cli *socketClient) InfoAsync(req types.RequestInfo) *ReqRes {
  199. return cli.queueRequest(types.ToRequestInfo(req))
  200. }
  201. func (cli *socketClient) SetOptionAsync(key string, value string) *ReqRes {
  202. return cli.queueRequest(types.ToRequestSetOption(key, value))
  203. }
  204. func (cli *socketClient) DeliverTxAsync(tx []byte) *ReqRes {
  205. return cli.queueRequest(types.ToRequestDeliverTx(tx))
  206. }
  207. func (cli *socketClient) CheckTxAsync(tx []byte) *ReqRes {
  208. return cli.queueRequest(types.ToRequestCheckTx(tx))
  209. }
  210. func (cli *socketClient) QueryAsync(reqQuery types.RequestQuery) *ReqRes {
  211. return cli.queueRequest(types.ToRequestQuery(reqQuery))
  212. }
  213. func (cli *socketClient) CommitAsync() *ReqRes {
  214. return cli.queueRequest(types.ToRequestCommit())
  215. }
  216. func (cli *socketClient) InitChainAsync(params types.RequestInitChain) *ReqRes {
  217. return cli.queueRequest(types.ToRequestInitChain(params))
  218. }
  219. func (cli *socketClient) BeginBlockAsync(params types.RequestBeginBlock) *ReqRes {
  220. return cli.queueRequest(types.ToRequestBeginBlock(params))
  221. }
  222. func (cli *socketClient) EndBlockAsync(height uint64) *ReqRes {
  223. return cli.queueRequest(types.ToRequestEndBlock(height))
  224. }
  225. //----------------------------------------
  226. func (cli *socketClient) EchoSync(msg string) (res types.Result) {
  227. reqres := cli.queueRequest(types.ToRequestEcho(msg))
  228. cli.FlushSync()
  229. if err := cli.Error(); err != nil {
  230. return types.ErrInternalError.SetLog(err.Error())
  231. }
  232. resp := reqres.Response.GetEcho()
  233. return types.Result{Code: OK, Data: []byte(resp.Message)}
  234. }
  235. func (cli *socketClient) FlushSync() error {
  236. reqRes := cli.queueRequest(types.ToRequestFlush())
  237. if err := cli.Error(); err != nil {
  238. return types.ErrInternalError.SetLog(err.Error())
  239. }
  240. reqRes.Wait() // NOTE: if we don't flush the queue, its possible to get stuck here
  241. return cli.Error()
  242. }
  243. func (cli *socketClient) InfoSync(req types.RequestInfo) (resInfo types.ResponseInfo, err error) {
  244. reqres := cli.queueRequest(types.ToRequestInfo(req))
  245. cli.FlushSync()
  246. if err := cli.Error(); err != nil {
  247. return resInfo, err
  248. }
  249. if resInfo_ := reqres.Response.GetInfo(); resInfo_ != nil {
  250. return *resInfo_, nil
  251. }
  252. return resInfo, nil
  253. }
  254. func (cli *socketClient) SetOptionSync(key string, value string) (res types.Result) {
  255. reqres := cli.queueRequest(types.ToRequestSetOption(key, value))
  256. cli.FlushSync()
  257. if err := cli.Error(); err != nil {
  258. return types.ErrInternalError.SetLog(err.Error())
  259. }
  260. resp := reqres.Response.GetSetOption()
  261. return types.Result{Code: OK, Data: nil, Log: resp.Log}
  262. }
  263. func (cli *socketClient) DeliverTxSync(tx []byte) (res types.Result) {
  264. reqres := cli.queueRequest(types.ToRequestDeliverTx(tx))
  265. cli.FlushSync()
  266. if err := cli.Error(); err != nil {
  267. return types.ErrInternalError.SetLog(err.Error())
  268. }
  269. resp := reqres.Response.GetDeliverTx()
  270. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  271. }
  272. func (cli *socketClient) CheckTxSync(tx []byte) (res types.Result) {
  273. reqres := cli.queueRequest(types.ToRequestCheckTx(tx))
  274. cli.FlushSync()
  275. if err := cli.Error(); err != nil {
  276. return types.ErrInternalError.SetLog(err.Error())
  277. }
  278. resp := reqres.Response.GetCheckTx()
  279. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  280. }
  281. func (cli *socketClient) QuerySync(reqQuery types.RequestQuery) (resQuery types.ResponseQuery, err error) {
  282. reqres := cli.queueRequest(types.ToRequestQuery(reqQuery))
  283. cli.FlushSync()
  284. if err := cli.Error(); err != nil {
  285. return resQuery, err
  286. }
  287. if resQuery_ := reqres.Response.GetQuery(); resQuery_ != nil {
  288. return *resQuery_, nil
  289. }
  290. return resQuery, nil
  291. }
  292. func (cli *socketClient) CommitSync() (res types.Result) {
  293. reqres := cli.queueRequest(types.ToRequestCommit())
  294. cli.FlushSync()
  295. if err := cli.Error(); err != nil {
  296. return types.ErrInternalError.SetLog(err.Error())
  297. }
  298. resp := reqres.Response.GetCommit()
  299. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  300. }
  301. func (cli *socketClient) InitChainSync(params types.RequestInitChain) (err error) {
  302. cli.queueRequest(types.ToRequestInitChain(params))
  303. cli.FlushSync()
  304. return cli.Error()
  305. }
  306. func (cli *socketClient) BeginBlockSync(params types.RequestBeginBlock) (err error) {
  307. cli.queueRequest(types.ToRequestBeginBlock(params))
  308. cli.FlushSync()
  309. return cli.Error()
  310. }
  311. func (cli *socketClient) EndBlockSync(height uint64) (resEndBlock types.ResponseEndBlock, err error) {
  312. reqres := cli.queueRequest(types.ToRequestEndBlock(height))
  313. cli.FlushSync()
  314. if err := cli.Error(); err != nil {
  315. return resEndBlock, err
  316. }
  317. if blk := reqres.Response.GetEndBlock(); blk != nil {
  318. return *blk, nil
  319. }
  320. return resEndBlock, nil
  321. }
  322. //----------------------------------------
  323. func (cli *socketClient) queueRequest(req *types.Request) *ReqRes {
  324. reqres := NewReqRes(req)
  325. // TODO: set cli.err if reqQueue times out
  326. cli.reqQueue <- reqres
  327. // Maybe auto-flush, or unset auto-flush
  328. switch req.Value.(type) {
  329. case *types.Request_Flush:
  330. cli.flushTimer.Unset()
  331. default:
  332. cli.flushTimer.Set()
  333. }
  334. return reqres
  335. }
  336. func (cli *socketClient) flushQueue() {
  337. LOOP:
  338. for {
  339. select {
  340. case reqres := <-cli.reqQueue:
  341. reqres.Done()
  342. default:
  343. break LOOP
  344. }
  345. }
  346. }
  347. //----------------------------------------
  348. func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
  349. switch req.Value.(type) {
  350. case *types.Request_Echo:
  351. _, ok = res.Value.(*types.Response_Echo)
  352. case *types.Request_Flush:
  353. _, ok = res.Value.(*types.Response_Flush)
  354. case *types.Request_Info:
  355. _, ok = res.Value.(*types.Response_Info)
  356. case *types.Request_SetOption:
  357. _, ok = res.Value.(*types.Response_SetOption)
  358. case *types.Request_DeliverTx:
  359. _, ok = res.Value.(*types.Response_DeliverTx)
  360. case *types.Request_CheckTx:
  361. _, ok = res.Value.(*types.Response_CheckTx)
  362. case *types.Request_Commit:
  363. _, ok = res.Value.(*types.Response_Commit)
  364. case *types.Request_Query:
  365. _, ok = res.Value.(*types.Response_Query)
  366. case *types.Request_InitChain:
  367. _, ok = res.Value.(*types.Response_InitChain)
  368. case *types.Request_BeginBlock:
  369. _, ok = res.Value.(*types.Response_BeginBlock)
  370. case *types.Request_EndBlock:
  371. _, ok = res.Value.(*types.Response_EndBlock)
  372. }
  373. return ok
  374. }