You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

446 lines
11 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package abcicli
  2. import (
  3. "bufio"
  4. "container/list"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "reflect"
  9. "sync"
  10. "time"
  11. "github.com/tendermint/abci/types"
  12. cmn "github.com/tendermint/go-common"
  13. )
  14. const (
  15. OK = types.CodeType_OK
  16. LOG = ""
  17. )
  18. const reqQueueSize = 256 // TODO make configurable
  19. const maxResponseSize = 1048576 // 1MB TODO make configurable
  20. const flushThrottleMS = 20 // Don't wait longer than...
  21. // This is goroutine-safe, but users should beware that
  22. // the application in general is not meant to be interfaced
  23. // with concurrent callers.
  24. type socketClient struct {
  25. cmn.BaseService
  26. reqQueue chan *ReqRes
  27. flushTimer *cmn.ThrottleTimer
  28. mustConnect bool
  29. mtx sync.Mutex
  30. addr string
  31. conn net.Conn
  32. err error
  33. reqSent *list.List
  34. resCb func(*types.Request, *types.Response) // listens to all callbacks
  35. }
  36. func NewSocketClient(addr string, mustConnect bool) (*socketClient, error) {
  37. cli := &socketClient{
  38. reqQueue: make(chan *ReqRes, reqQueueSize),
  39. flushTimer: cmn.NewThrottleTimer("socketClient", flushThrottleMS),
  40. mustConnect: mustConnect,
  41. addr: addr,
  42. reqSent: list.New(),
  43. resCb: nil,
  44. }
  45. cli.BaseService = *cmn.NewBaseService(nil, "socketClient", cli)
  46. _, err := cli.Start() // Just start it, it's confusing for callers to remember to start.
  47. return cli, err
  48. }
  49. func (cli *socketClient) OnStart() error {
  50. cli.BaseService.OnStart()
  51. var err error
  52. var conn net.Conn
  53. RETRY_LOOP:
  54. for {
  55. conn, err = cmn.Connect(cli.addr)
  56. if err != nil {
  57. if cli.mustConnect {
  58. return err
  59. }
  60. log.Warn(fmt.Sprintf("abci.socketClient failed to connect to %v. Retrying...", cli.addr))
  61. time.Sleep(time.Second * 3)
  62. continue RETRY_LOOP
  63. }
  64. cli.conn = conn
  65. go cli.sendRequestsRoutine(conn)
  66. go cli.recvResponseRoutine(conn)
  67. return nil
  68. }
  69. }
  70. func (cli *socketClient) OnStop() {
  71. cli.BaseService.OnStop()
  72. cli.mtx.Lock()
  73. defer cli.mtx.Unlock()
  74. if cli.conn != nil {
  75. cli.conn.Close()
  76. }
  77. cli.flushQueue()
  78. }
  79. // Stop the client and set the error
  80. func (cli *socketClient) StopForError(err error) {
  81. cli.mtx.Lock()
  82. if !cli.IsRunning() {
  83. return
  84. }
  85. if cli.err == nil {
  86. cli.err = err
  87. }
  88. cli.mtx.Unlock()
  89. log.Warn(fmt.Sprintf("Stopping abci.socketClient for error: %v", err.Error()))
  90. cli.Stop()
  91. }
  92. func (cli *socketClient) Error() error {
  93. cli.mtx.Lock()
  94. defer cli.mtx.Unlock()
  95. return cli.err
  96. }
  97. // Set listener for all responses
  98. // NOTE: callback may get internally generated flush responses.
  99. func (cli *socketClient) SetResponseCallback(resCb Callback) {
  100. cli.mtx.Lock()
  101. defer cli.mtx.Unlock()
  102. cli.resCb = resCb
  103. }
  104. //----------------------------------------
  105. func (cli *socketClient) sendRequestsRoutine(conn net.Conn) {
  106. w := bufio.NewWriter(conn)
  107. for {
  108. select {
  109. case <-cli.flushTimer.Ch:
  110. select {
  111. case cli.reqQueue <- NewReqRes(types.ToRequestFlush()):
  112. default:
  113. // Probably will fill the buffer, or retry later.
  114. }
  115. case <-cli.BaseService.Quit:
  116. return
  117. case reqres := <-cli.reqQueue:
  118. cli.willSendReq(reqres)
  119. err := types.WriteMessage(reqres.Request, w)
  120. if err != nil {
  121. cli.StopForError(fmt.Errorf("Error writing msg: %v", err))
  122. return
  123. }
  124. // log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
  125. if _, ok := reqres.Request.Value.(*types.Request_Flush); ok {
  126. err = w.Flush()
  127. if err != nil {
  128. cli.StopForError(fmt.Errorf("Error flushing writer: %v", err))
  129. return
  130. }
  131. }
  132. }
  133. }
  134. }
  135. func (cli *socketClient) recvResponseRoutine(conn net.Conn) {
  136. r := bufio.NewReader(conn) // Buffer reads
  137. for {
  138. var res = &types.Response{}
  139. err := types.ReadMessage(r, res)
  140. if err != nil {
  141. cli.StopForError(err)
  142. return
  143. }
  144. switch r := res.Value.(type) {
  145. case *types.Response_Exception:
  146. // XXX After setting cli.err, release waiters (e.g. reqres.Done())
  147. cli.StopForError(errors.New(r.Exception.Error))
  148. return
  149. default:
  150. // log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
  151. err := cli.didRecvResponse(res)
  152. if err != nil {
  153. cli.StopForError(err)
  154. return
  155. }
  156. }
  157. }
  158. }
  159. func (cli *socketClient) willSendReq(reqres *ReqRes) {
  160. cli.mtx.Lock()
  161. defer cli.mtx.Unlock()
  162. cli.reqSent.PushBack(reqres)
  163. }
  164. func (cli *socketClient) didRecvResponse(res *types.Response) error {
  165. cli.mtx.Lock()
  166. defer cli.mtx.Unlock()
  167. // Get the first ReqRes
  168. next := cli.reqSent.Front()
  169. if next == nil {
  170. return fmt.Errorf("Unexpected result type %v when nothing expected", reflect.TypeOf(res.Value))
  171. }
  172. reqres := next.Value.(*ReqRes)
  173. if !resMatchesReq(reqres.Request, res) {
  174. return fmt.Errorf("Unexpected result type %v when response to %v expected",
  175. reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value))
  176. }
  177. reqres.Response = res // Set response
  178. reqres.Done() // Release waiters
  179. cli.reqSent.Remove(next) // Pop first item from linked list
  180. // Notify reqRes listener if set
  181. if cb := reqres.GetCallback(); cb != nil {
  182. cb(res)
  183. }
  184. // Notify client listener if set
  185. if cli.resCb != nil {
  186. cli.resCb(reqres.Request, res)
  187. }
  188. return nil
  189. }
  190. //----------------------------------------
  191. func (cli *socketClient) EchoAsync(msg string) *ReqRes {
  192. return cli.queueRequest(types.ToRequestEcho(msg))
  193. }
  194. func (cli *socketClient) FlushAsync() *ReqRes {
  195. return cli.queueRequest(types.ToRequestFlush())
  196. }
  197. func (cli *socketClient) InfoAsync() *ReqRes {
  198. return cli.queueRequest(types.ToRequestInfo())
  199. }
  200. func (cli *socketClient) SetOptionAsync(key string, value string) *ReqRes {
  201. return cli.queueRequest(types.ToRequestSetOption(key, value))
  202. }
  203. func (cli *socketClient) DeliverTxAsync(tx []byte) *ReqRes {
  204. return cli.queueRequest(types.ToRequestDeliverTx(tx))
  205. }
  206. func (cli *socketClient) CheckTxAsync(tx []byte) *ReqRes {
  207. return cli.queueRequest(types.ToRequestCheckTx(tx))
  208. }
  209. func (cli *socketClient) QueryAsync(reqQuery types.RequestQuery) *ReqRes {
  210. return cli.queueRequest(types.ToRequestQuery(reqQuery))
  211. }
  212. func (cli *socketClient) CommitAsync() *ReqRes {
  213. return cli.queueRequest(types.ToRequestCommit())
  214. }
  215. func (cli *socketClient) InitChainAsync(validators []*types.Validator) *ReqRes {
  216. return cli.queueRequest(types.ToRequestInitChain(validators))
  217. }
  218. func (cli *socketClient) BeginBlockAsync(hash []byte, header *types.Header) *ReqRes {
  219. return cli.queueRequest(types.ToRequestBeginBlock(hash, header))
  220. }
  221. func (cli *socketClient) EndBlockAsync(height uint64) *ReqRes {
  222. return cli.queueRequest(types.ToRequestEndBlock(height))
  223. }
  224. //----------------------------------------
  225. func (cli *socketClient) EchoSync(msg string) (res types.Result) {
  226. reqres := cli.queueRequest(types.ToRequestEcho(msg))
  227. cli.FlushSync()
  228. if err := cli.Error(); err != nil {
  229. return types.ErrInternalError.SetLog(err.Error())
  230. }
  231. resp := reqres.Response.GetEcho()
  232. return types.Result{Code: OK, Data: []byte(resp.Message)}
  233. }
  234. func (cli *socketClient) FlushSync() error {
  235. reqRes := cli.queueRequest(types.ToRequestFlush())
  236. if err := cli.Error(); err != nil {
  237. return types.ErrInternalError.SetLog(err.Error())
  238. }
  239. reqRes.Wait() // NOTE: if we don't flush the queue, its possible to get stuck here
  240. return cli.Error()
  241. }
  242. func (cli *socketClient) InfoSync() (resInfo types.ResponseInfo, err error) {
  243. reqres := cli.queueRequest(types.ToRequestInfo())
  244. cli.FlushSync()
  245. if err := cli.Error(); err != nil {
  246. return resInfo, err
  247. }
  248. if resInfo_ := reqres.Response.GetInfo(); resInfo_ != nil {
  249. return *resInfo_, nil
  250. }
  251. return resInfo, nil
  252. }
  253. func (cli *socketClient) SetOptionSync(key string, value string) (res types.Result) {
  254. reqres := cli.queueRequest(types.ToRequestSetOption(key, value))
  255. cli.FlushSync()
  256. if err := cli.Error(); err != nil {
  257. return types.ErrInternalError.SetLog(err.Error())
  258. }
  259. resp := reqres.Response.GetSetOption()
  260. return types.Result{Code: OK, Data: nil, Log: resp.Log}
  261. }
  262. func (cli *socketClient) DeliverTxSync(tx []byte) (res types.Result) {
  263. reqres := cli.queueRequest(types.ToRequestDeliverTx(tx))
  264. cli.FlushSync()
  265. if err := cli.Error(); err != nil {
  266. return types.ErrInternalError.SetLog(err.Error())
  267. }
  268. resp := reqres.Response.GetDeliverTx()
  269. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  270. }
  271. func (cli *socketClient) CheckTxSync(tx []byte) (res types.Result) {
  272. reqres := cli.queueRequest(types.ToRequestCheckTx(tx))
  273. cli.FlushSync()
  274. if err := cli.Error(); err != nil {
  275. return types.ErrInternalError.SetLog(err.Error())
  276. }
  277. resp := reqres.Response.GetCheckTx()
  278. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  279. }
  280. func (cli *socketClient) QuerySync(reqQuery types.RequestQuery) (resQuery types.ResponseQuery, err error) {
  281. reqres := cli.queueRequest(types.ToRequestQuery(reqQuery))
  282. cli.FlushSync()
  283. if err := cli.Error(); err != nil {
  284. return resQuery, err
  285. }
  286. if resQuery_ := reqres.Response.GetQuery(); resQuery_ != nil {
  287. return *resQuery_, nil
  288. }
  289. return resQuery, nil
  290. }
  291. func (cli *socketClient) CommitSync() (res types.Result) {
  292. reqres := cli.queueRequest(types.ToRequestCommit())
  293. cli.FlushSync()
  294. if err := cli.Error(); err != nil {
  295. return types.ErrInternalError.SetLog(err.Error())
  296. }
  297. resp := reqres.Response.GetCommit()
  298. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  299. }
  300. func (cli *socketClient) InitChainSync(validators []*types.Validator) (err error) {
  301. cli.queueRequest(types.ToRequestInitChain(validators))
  302. cli.FlushSync()
  303. if err := cli.Error(); err != nil {
  304. return err
  305. }
  306. return nil
  307. }
  308. func (cli *socketClient) BeginBlockSync(hash []byte, header *types.Header) (err error) {
  309. cli.queueRequest(types.ToRequestBeginBlock(hash, header))
  310. cli.FlushSync()
  311. if err := cli.Error(); err != nil {
  312. return err
  313. }
  314. return nil
  315. }
  316. func (cli *socketClient) EndBlockSync(height uint64) (resEndBlock types.ResponseEndBlock, err error) {
  317. reqres := cli.queueRequest(types.ToRequestEndBlock(height))
  318. cli.FlushSync()
  319. if err := cli.Error(); err != nil {
  320. return resEndBlock, err
  321. }
  322. if blk := reqres.Response.GetEndBlock(); blk != nil {
  323. return *blk, nil
  324. }
  325. return resEndBlock, nil
  326. }
  327. //----------------------------------------
  328. func (cli *socketClient) queueRequest(req *types.Request) *ReqRes {
  329. reqres := NewReqRes(req)
  330. // TODO: set cli.err if reqQueue times out
  331. cli.reqQueue <- reqres
  332. // Maybe auto-flush, or unset auto-flush
  333. switch req.Value.(type) {
  334. case *types.Request_Flush:
  335. cli.flushTimer.Unset()
  336. default:
  337. cli.flushTimer.Set()
  338. }
  339. return reqres
  340. }
  341. func (cli *socketClient) flushQueue() {
  342. LOOP:
  343. for {
  344. select {
  345. case reqres := <-cli.reqQueue:
  346. reqres.Done()
  347. default:
  348. break LOOP
  349. }
  350. }
  351. }
  352. //----------------------------------------
  353. func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
  354. switch req.Value.(type) {
  355. case *types.Request_Echo:
  356. _, ok = res.Value.(*types.Response_Echo)
  357. case *types.Request_Flush:
  358. _, ok = res.Value.(*types.Response_Flush)
  359. case *types.Request_Info:
  360. _, ok = res.Value.(*types.Response_Info)
  361. case *types.Request_SetOption:
  362. _, ok = res.Value.(*types.Response_SetOption)
  363. case *types.Request_DeliverTx:
  364. _, ok = res.Value.(*types.Response_DeliverTx)
  365. case *types.Request_CheckTx:
  366. _, ok = res.Value.(*types.Response_CheckTx)
  367. case *types.Request_Commit:
  368. _, ok = res.Value.(*types.Response_Commit)
  369. case *types.Request_Query:
  370. _, ok = res.Value.(*types.Response_Query)
  371. case *types.Request_InitChain:
  372. _, ok = res.Value.(*types.Response_InitChain)
  373. case *types.Request_BeginBlock:
  374. _, ok = res.Value.(*types.Response_BeginBlock)
  375. case *types.Request_EndBlock:
  376. _, ok = res.Value.(*types.Response_EndBlock)
  377. }
  378. return ok
  379. }