You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

444 lines
11 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package abcicli
  2. import (
  3. "bufio"
  4. "container/list"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "reflect"
  9. "sync"
  10. "time"
  11. "github.com/tendermint/abci/types"
  12. cmn "github.com/tendermint/tmlibs/common"
  13. )
  14. const (
  15. OK = types.CodeType_OK
  16. LOG = ""
  17. )
  18. const reqQueueSize = 256 // TODO make configurable
  19. const maxResponseSize = 1048576 // 1MB TODO make configurable
  20. const flushThrottleMS = 20 // Don't wait longer than...
  21. // This is goroutine-safe, but users should beware that
  22. // the application in general is not meant to be interfaced
  23. // with concurrent callers.
  24. type socketClient struct {
  25. cmn.BaseService
  26. reqQueue chan *ReqRes
  27. flushTimer *cmn.ThrottleTimer
  28. mustConnect bool
  29. mtx sync.Mutex
  30. addr string
  31. conn net.Conn
  32. err error
  33. reqSent *list.List
  34. resCb func(*types.Request, *types.Response) // listens to all callbacks
  35. }
  36. func NewSocketClient(addr string, mustConnect bool) *socketClient {
  37. cli := &socketClient{
  38. reqQueue: make(chan *ReqRes, reqQueueSize),
  39. flushTimer: cmn.NewThrottleTimer("socketClient", flushThrottleMS),
  40. mustConnect: mustConnect,
  41. addr: addr,
  42. reqSent: list.New(),
  43. resCb: nil,
  44. }
  45. cli.BaseService = *cmn.NewBaseService(nil, "socketClient", cli)
  46. return cli
  47. }
  48. func (cli *socketClient) OnStart() error {
  49. cli.BaseService.OnStart()
  50. var err error
  51. var conn net.Conn
  52. RETRY_LOOP:
  53. for {
  54. conn, err = cmn.Connect(cli.addr)
  55. if err != nil {
  56. if cli.mustConnect {
  57. return err
  58. }
  59. cli.Logger.Error(fmt.Sprintf("abci.socketClient failed to connect to %v. Retrying...", cli.addr))
  60. time.Sleep(time.Second * 3)
  61. continue RETRY_LOOP
  62. }
  63. cli.conn = conn
  64. go cli.sendRequestsRoutine(conn)
  65. go cli.recvResponseRoutine(conn)
  66. return nil
  67. }
  68. }
  69. func (cli *socketClient) OnStop() {
  70. cli.BaseService.OnStop()
  71. cli.mtx.Lock()
  72. defer cli.mtx.Unlock()
  73. if cli.conn != nil {
  74. cli.conn.Close()
  75. }
  76. cli.flushQueue()
  77. }
  78. // Stop the client and set the error
  79. func (cli *socketClient) StopForError(err error) {
  80. cli.mtx.Lock()
  81. if !cli.IsRunning() {
  82. return
  83. }
  84. if cli.err == nil {
  85. cli.err = err
  86. }
  87. cli.mtx.Unlock()
  88. cli.Logger.Error(fmt.Sprintf("Stopping abci.socketClient for error: %v", err.Error()))
  89. cli.Stop()
  90. }
  91. func (cli *socketClient) Error() error {
  92. cli.mtx.Lock()
  93. defer cli.mtx.Unlock()
  94. return cli.err
  95. }
  96. // Set listener for all responses
  97. // NOTE: callback may get internally generated flush responses.
  98. func (cli *socketClient) SetResponseCallback(resCb Callback) {
  99. cli.mtx.Lock()
  100. defer cli.mtx.Unlock()
  101. cli.resCb = resCb
  102. }
  103. //----------------------------------------
  104. func (cli *socketClient) sendRequestsRoutine(conn net.Conn) {
  105. w := bufio.NewWriter(conn)
  106. for {
  107. select {
  108. case <-cli.flushTimer.Ch:
  109. select {
  110. case cli.reqQueue <- NewReqRes(types.ToRequestFlush()):
  111. default:
  112. // Probably will fill the buffer, or retry later.
  113. }
  114. case <-cli.BaseService.Quit:
  115. return
  116. case reqres := <-cli.reqQueue:
  117. cli.willSendReq(reqres)
  118. err := types.WriteMessage(reqres.Request, w)
  119. if err != nil {
  120. cli.StopForError(fmt.Errorf("Error writing msg: %v", err))
  121. return
  122. }
  123. // cli.Logger.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
  124. if _, ok := reqres.Request.Value.(*types.Request_Flush); ok {
  125. err = w.Flush()
  126. if err != nil {
  127. cli.StopForError(fmt.Errorf("Error flushing writer: %v", err))
  128. return
  129. }
  130. }
  131. }
  132. }
  133. }
  134. func (cli *socketClient) recvResponseRoutine(conn net.Conn) {
  135. r := bufio.NewReader(conn) // Buffer reads
  136. for {
  137. var res = &types.Response{}
  138. err := types.ReadMessage(r, res)
  139. if err != nil {
  140. cli.StopForError(err)
  141. return
  142. }
  143. switch r := res.Value.(type) {
  144. case *types.Response_Exception:
  145. // XXX After setting cli.err, release waiters (e.g. reqres.Done())
  146. cli.StopForError(errors.New(r.Exception.Error))
  147. return
  148. default:
  149. // cli.Logger.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
  150. err := cli.didRecvResponse(res)
  151. if err != nil {
  152. cli.StopForError(err)
  153. return
  154. }
  155. }
  156. }
  157. }
  158. func (cli *socketClient) willSendReq(reqres *ReqRes) {
  159. cli.mtx.Lock()
  160. defer cli.mtx.Unlock()
  161. cli.reqSent.PushBack(reqres)
  162. }
  163. func (cli *socketClient) didRecvResponse(res *types.Response) error {
  164. cli.mtx.Lock()
  165. defer cli.mtx.Unlock()
  166. // Get the first ReqRes
  167. next := cli.reqSent.Front()
  168. if next == nil {
  169. return fmt.Errorf("Unexpected result type %v when nothing expected", reflect.TypeOf(res.Value))
  170. }
  171. reqres := next.Value.(*ReqRes)
  172. if !resMatchesReq(reqres.Request, res) {
  173. return fmt.Errorf("Unexpected result type %v when response to %v expected",
  174. reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value))
  175. }
  176. reqres.Response = res // Set response
  177. reqres.Done() // Release waiters
  178. cli.reqSent.Remove(next) // Pop first item from linked list
  179. // Notify reqRes listener if set
  180. if cb := reqres.GetCallback(); cb != nil {
  181. cb(res)
  182. }
  183. // Notify client listener if set
  184. if cli.resCb != nil {
  185. cli.resCb(reqres.Request, res)
  186. }
  187. return nil
  188. }
  189. //----------------------------------------
  190. func (cli *socketClient) EchoAsync(msg string) *ReqRes {
  191. return cli.queueRequest(types.ToRequestEcho(msg))
  192. }
  193. func (cli *socketClient) FlushAsync() *ReqRes {
  194. return cli.queueRequest(types.ToRequestFlush())
  195. }
  196. func (cli *socketClient) InfoAsync() *ReqRes {
  197. return cli.queueRequest(types.ToRequestInfo())
  198. }
  199. func (cli *socketClient) SetOptionAsync(key string, value string) *ReqRes {
  200. return cli.queueRequest(types.ToRequestSetOption(key, value))
  201. }
  202. func (cli *socketClient) DeliverTxAsync(tx []byte) *ReqRes {
  203. return cli.queueRequest(types.ToRequestDeliverTx(tx))
  204. }
  205. func (cli *socketClient) CheckTxAsync(tx []byte) *ReqRes {
  206. return cli.queueRequest(types.ToRequestCheckTx(tx))
  207. }
  208. func (cli *socketClient) QueryAsync(reqQuery types.RequestQuery) *ReqRes {
  209. return cli.queueRequest(types.ToRequestQuery(reqQuery))
  210. }
  211. func (cli *socketClient) CommitAsync() *ReqRes {
  212. return cli.queueRequest(types.ToRequestCommit())
  213. }
  214. func (cli *socketClient) InitChainAsync(validators []*types.Validator) *ReqRes {
  215. return cli.queueRequest(types.ToRequestInitChain(validators))
  216. }
  217. func (cli *socketClient) BeginBlockAsync(hash []byte, header *types.Header) *ReqRes {
  218. return cli.queueRequest(types.ToRequestBeginBlock(hash, header))
  219. }
  220. func (cli *socketClient) EndBlockAsync(height uint64) *ReqRes {
  221. return cli.queueRequest(types.ToRequestEndBlock(height))
  222. }
  223. //----------------------------------------
  224. func (cli *socketClient) EchoSync(msg string) (res types.Result) {
  225. reqres := cli.queueRequest(types.ToRequestEcho(msg))
  226. cli.FlushSync()
  227. if err := cli.Error(); err != nil {
  228. return types.ErrInternalError.SetLog(err.Error())
  229. }
  230. resp := reqres.Response.GetEcho()
  231. return types.Result{Code: OK, Data: []byte(resp.Message)}
  232. }
  233. func (cli *socketClient) FlushSync() error {
  234. reqRes := cli.queueRequest(types.ToRequestFlush())
  235. if err := cli.Error(); err != nil {
  236. return types.ErrInternalError.SetLog(err.Error())
  237. }
  238. reqRes.Wait() // NOTE: if we don't flush the queue, its possible to get stuck here
  239. return cli.Error()
  240. }
  241. func (cli *socketClient) InfoSync() (resInfo types.ResponseInfo, err error) {
  242. reqres := cli.queueRequest(types.ToRequestInfo())
  243. cli.FlushSync()
  244. if err := cli.Error(); err != nil {
  245. return resInfo, err
  246. }
  247. if resInfo_ := reqres.Response.GetInfo(); resInfo_ != nil {
  248. return *resInfo_, nil
  249. }
  250. return resInfo, nil
  251. }
  252. func (cli *socketClient) SetOptionSync(key string, value string) (res types.Result) {
  253. reqres := cli.queueRequest(types.ToRequestSetOption(key, value))
  254. cli.FlushSync()
  255. if err := cli.Error(); err != nil {
  256. return types.ErrInternalError.SetLog(err.Error())
  257. }
  258. resp := reqres.Response.GetSetOption()
  259. return types.Result{Code: OK, Data: nil, Log: resp.Log}
  260. }
  261. func (cli *socketClient) DeliverTxSync(tx []byte) (res types.Result) {
  262. reqres := cli.queueRequest(types.ToRequestDeliverTx(tx))
  263. cli.FlushSync()
  264. if err := cli.Error(); err != nil {
  265. return types.ErrInternalError.SetLog(err.Error())
  266. }
  267. resp := reqres.Response.GetDeliverTx()
  268. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  269. }
  270. func (cli *socketClient) CheckTxSync(tx []byte) (res types.Result) {
  271. reqres := cli.queueRequest(types.ToRequestCheckTx(tx))
  272. cli.FlushSync()
  273. if err := cli.Error(); err != nil {
  274. return types.ErrInternalError.SetLog(err.Error())
  275. }
  276. resp := reqres.Response.GetCheckTx()
  277. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  278. }
  279. func (cli *socketClient) QuerySync(reqQuery types.RequestQuery) (resQuery types.ResponseQuery, err error) {
  280. reqres := cli.queueRequest(types.ToRequestQuery(reqQuery))
  281. cli.FlushSync()
  282. if err := cli.Error(); err != nil {
  283. return resQuery, err
  284. }
  285. if resQuery_ := reqres.Response.GetQuery(); resQuery_ != nil {
  286. return *resQuery_, nil
  287. }
  288. return resQuery, nil
  289. }
  290. func (cli *socketClient) CommitSync() (res types.Result) {
  291. reqres := cli.queueRequest(types.ToRequestCommit())
  292. cli.FlushSync()
  293. if err := cli.Error(); err != nil {
  294. return types.ErrInternalError.SetLog(err.Error())
  295. }
  296. resp := reqres.Response.GetCommit()
  297. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  298. }
  299. func (cli *socketClient) InitChainSync(validators []*types.Validator) (err error) {
  300. cli.queueRequest(types.ToRequestInitChain(validators))
  301. cli.FlushSync()
  302. if err := cli.Error(); err != nil {
  303. return err
  304. }
  305. return nil
  306. }
  307. func (cli *socketClient) BeginBlockSync(hash []byte, header *types.Header) (err error) {
  308. cli.queueRequest(types.ToRequestBeginBlock(hash, header))
  309. cli.FlushSync()
  310. if err := cli.Error(); err != nil {
  311. return err
  312. }
  313. return nil
  314. }
  315. func (cli *socketClient) EndBlockSync(height uint64) (resEndBlock types.ResponseEndBlock, err error) {
  316. reqres := cli.queueRequest(types.ToRequestEndBlock(height))
  317. cli.FlushSync()
  318. if err := cli.Error(); err != nil {
  319. return resEndBlock, err
  320. }
  321. if blk := reqres.Response.GetEndBlock(); blk != nil {
  322. return *blk, nil
  323. }
  324. return resEndBlock, nil
  325. }
  326. //----------------------------------------
  327. func (cli *socketClient) queueRequest(req *types.Request) *ReqRes {
  328. reqres := NewReqRes(req)
  329. // TODO: set cli.err if reqQueue times out
  330. cli.reqQueue <- reqres
  331. // Maybe auto-flush, or unset auto-flush
  332. switch req.Value.(type) {
  333. case *types.Request_Flush:
  334. cli.flushTimer.Unset()
  335. default:
  336. cli.flushTimer.Set()
  337. }
  338. return reqres
  339. }
  340. func (cli *socketClient) flushQueue() {
  341. LOOP:
  342. for {
  343. select {
  344. case reqres := <-cli.reqQueue:
  345. reqres.Done()
  346. default:
  347. break LOOP
  348. }
  349. }
  350. }
  351. //----------------------------------------
  352. func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
  353. switch req.Value.(type) {
  354. case *types.Request_Echo:
  355. _, ok = res.Value.(*types.Response_Echo)
  356. case *types.Request_Flush:
  357. _, ok = res.Value.(*types.Response_Flush)
  358. case *types.Request_Info:
  359. _, ok = res.Value.(*types.Response_Info)
  360. case *types.Request_SetOption:
  361. _, ok = res.Value.(*types.Response_SetOption)
  362. case *types.Request_DeliverTx:
  363. _, ok = res.Value.(*types.Response_DeliverTx)
  364. case *types.Request_CheckTx:
  365. _, ok = res.Value.(*types.Response_CheckTx)
  366. case *types.Request_Commit:
  367. _, ok = res.Value.(*types.Response_Commit)
  368. case *types.Request_Query:
  369. _, ok = res.Value.(*types.Response_Query)
  370. case *types.Request_InitChain:
  371. _, ok = res.Value.(*types.Response_InitChain)
  372. case *types.Request_BeginBlock:
  373. _, ok = res.Value.(*types.Response_BeginBlock)
  374. case *types.Request_EndBlock:
  375. _, ok = res.Value.(*types.Response_EndBlock)
  376. }
  377. return ok
  378. }