You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

463 lines
12 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package abcicli
  2. import (
  3. "bufio"
  4. "container/list"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "reflect"
  9. "sync"
  10. "time"
  11. . "github.com/tendermint/go-common"
  12. "github.com/tendermint/abci/types"
  13. )
  14. const (
  15. OK = types.CodeType_OK
  16. LOG = ""
  17. )
  18. const reqQueueSize = 256 // TODO make configurable
  19. const maxResponseSize = 1048576 // 1MB TODO make configurable
  20. const flushThrottleMS = 20 // Don't wait longer than...
  21. // This is goroutine-safe, but users should beware that
  22. // the application in general is not meant to be interfaced
  23. // with concurrent callers.
  24. type socketClient struct {
  25. BaseService
  26. reqQueue chan *ReqRes
  27. flushTimer *ThrottleTimer
  28. mustConnect bool
  29. mtx sync.Mutex
  30. addr string
  31. conn net.Conn
  32. err error
  33. reqSent *list.List
  34. resCb func(*types.Request, *types.Response) // listens to all callbacks
  35. }
  36. func NewSocketClient(addr string, mustConnect bool) (*socketClient, error) {
  37. cli := &socketClient{
  38. reqQueue: make(chan *ReqRes, reqQueueSize),
  39. flushTimer: NewThrottleTimer("socketClient", flushThrottleMS),
  40. mustConnect: mustConnect,
  41. addr: addr,
  42. reqSent: list.New(),
  43. resCb: nil,
  44. }
  45. cli.BaseService = *NewBaseService(nil, "socketClient", cli)
  46. _, err := cli.Start() // Just start it, it's confusing for callers to remember to start.
  47. return cli, err
  48. }
  49. func (cli *socketClient) OnStart() error {
  50. cli.BaseService.OnStart()
  51. var err error
  52. var conn net.Conn
  53. RETRY_LOOP:
  54. for {
  55. conn, err = Connect(cli.addr)
  56. if err != nil {
  57. if cli.mustConnect {
  58. return err
  59. } else {
  60. log.Warn(Fmt("abci.socketClient failed to connect to %v. Retrying...", cli.addr))
  61. time.Sleep(time.Second * 3)
  62. continue RETRY_LOOP
  63. }
  64. }
  65. cli.conn = conn
  66. go cli.sendRequestsRoutine(conn)
  67. go cli.recvResponseRoutine(conn)
  68. return nil
  69. }
  70. return nil // never happens
  71. }
  72. func (cli *socketClient) OnStop() {
  73. cli.BaseService.OnStop()
  74. cli.mtx.Lock()
  75. defer cli.mtx.Unlock()
  76. if cli.conn != nil {
  77. cli.conn.Close()
  78. }
  79. cli.flushQueue()
  80. }
  81. // Stop the client and set the error
  82. func (cli *socketClient) StopForError(err error) {
  83. cli.mtx.Lock()
  84. if !cli.IsRunning() {
  85. return
  86. }
  87. if cli.err == nil {
  88. cli.err = err
  89. }
  90. cli.mtx.Unlock()
  91. log.Warn(Fmt("Stopping abci.socketClient for error: %v", err.Error()))
  92. cli.Stop()
  93. }
  94. func (cli *socketClient) Error() error {
  95. cli.mtx.Lock()
  96. defer cli.mtx.Unlock()
  97. return cli.err
  98. }
  99. // Set listener for all responses
  100. // NOTE: callback may get internally generated flush responses.
  101. func (cli *socketClient) SetResponseCallback(resCb Callback) {
  102. cli.mtx.Lock()
  103. defer cli.mtx.Unlock()
  104. cli.resCb = resCb
  105. }
  106. //----------------------------------------
  107. func (cli *socketClient) sendRequestsRoutine(conn net.Conn) {
  108. w := bufio.NewWriter(conn)
  109. for {
  110. select {
  111. case <-cli.flushTimer.Ch:
  112. select {
  113. case cli.reqQueue <- NewReqRes(types.ToRequestFlush()):
  114. default:
  115. // Probably will fill the buffer, or retry later.
  116. }
  117. case <-cli.BaseService.Quit:
  118. return
  119. case reqres := <-cli.reqQueue:
  120. cli.willSendReq(reqres)
  121. err := types.WriteMessage(reqres.Request, w)
  122. if err != nil {
  123. cli.StopForError(fmt.Errorf("Error writing msg: %v", err))
  124. return
  125. }
  126. // log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
  127. if _, ok := reqres.Request.Value.(*types.Request_Flush); ok {
  128. err = w.Flush()
  129. if err != nil {
  130. cli.StopForError(fmt.Errorf("Error flushing writer: %v", err))
  131. return
  132. }
  133. }
  134. }
  135. }
  136. }
  137. func (cli *socketClient) recvResponseRoutine(conn net.Conn) {
  138. r := bufio.NewReader(conn) // Buffer reads
  139. for {
  140. var res = &types.Response{}
  141. err := types.ReadMessage(r, res)
  142. if err != nil {
  143. cli.StopForError(err)
  144. return
  145. }
  146. switch r := res.Value.(type) {
  147. case *types.Response_Exception:
  148. // XXX After setting cli.err, release waiters (e.g. reqres.Done())
  149. cli.StopForError(errors.New(r.Exception.Error))
  150. return
  151. default:
  152. // log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
  153. err := cli.didRecvResponse(res)
  154. if err != nil {
  155. cli.StopForError(err)
  156. return
  157. }
  158. }
  159. }
  160. }
  161. func (cli *socketClient) willSendReq(reqres *ReqRes) {
  162. cli.mtx.Lock()
  163. defer cli.mtx.Unlock()
  164. cli.reqSent.PushBack(reqres)
  165. }
  166. func (cli *socketClient) didRecvResponse(res *types.Response) error {
  167. cli.mtx.Lock()
  168. defer cli.mtx.Unlock()
  169. // Get the first ReqRes
  170. next := cli.reqSent.Front()
  171. if next == nil {
  172. return fmt.Errorf("Unexpected result type %v when nothing expected", reflect.TypeOf(res.Value))
  173. }
  174. reqres := next.Value.(*ReqRes)
  175. if !resMatchesReq(reqres.Request, res) {
  176. return fmt.Errorf("Unexpected result type %v when response to %v expected",
  177. reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value))
  178. }
  179. reqres.Response = res // Set response
  180. reqres.Done() // Release waiters
  181. cli.reqSent.Remove(next) // Pop first item from linked list
  182. // Notify reqRes listener if set
  183. if cb := reqres.GetCallback(); cb != nil {
  184. cb(res)
  185. }
  186. // Notify client listener if set
  187. if cli.resCb != nil {
  188. cli.resCb(reqres.Request, res)
  189. }
  190. return nil
  191. }
  192. //----------------------------------------
  193. func (cli *socketClient) EchoAsync(msg string) *ReqRes {
  194. return cli.queueRequest(types.ToRequestEcho(msg))
  195. }
  196. func (cli *socketClient) FlushAsync() *ReqRes {
  197. return cli.queueRequest(types.ToRequestFlush())
  198. }
  199. func (cli *socketClient) InfoAsync() *ReqRes {
  200. return cli.queueRequest(types.ToRequestInfo())
  201. }
  202. func (cli *socketClient) SetOptionAsync(key string, value string) *ReqRes {
  203. return cli.queueRequest(types.ToRequestSetOption(key, value))
  204. }
  205. func (cli *socketClient) DeliverTxAsync(tx []byte) *ReqRes {
  206. return cli.queueRequest(types.ToRequestDeliverTx(tx))
  207. }
  208. func (cli *socketClient) CheckTxAsync(tx []byte) *ReqRes {
  209. return cli.queueRequest(types.ToRequestCheckTx(tx))
  210. }
  211. func (cli *socketClient) QueryAsync(query []byte) *ReqRes {
  212. return cli.queueRequest(types.ToRequestQuery(query))
  213. }
  214. func (cli *socketClient) ProofAsync(key []byte, blockHeight int64) *ReqRes {
  215. return cli.queueRequest(types.ToRequestProof(key, blockHeight))
  216. }
  217. func (cli *socketClient) CommitAsync() *ReqRes {
  218. return cli.queueRequest(types.ToRequestCommit())
  219. }
  220. func (cli *socketClient) InitChainAsync(validators []*types.Validator) *ReqRes {
  221. return cli.queueRequest(types.ToRequestInitChain(validators))
  222. }
  223. func (cli *socketClient) BeginBlockAsync(hash []byte, header *types.Header) *ReqRes {
  224. return cli.queueRequest(types.ToRequestBeginBlock(hash, header))
  225. }
  226. func (cli *socketClient) EndBlockAsync(height uint64) *ReqRes {
  227. return cli.queueRequest(types.ToRequestEndBlock(height))
  228. }
  229. //----------------------------------------
  230. func (cli *socketClient) EchoSync(msg string) (res types.Result) {
  231. reqres := cli.queueRequest(types.ToRequestEcho(msg))
  232. cli.FlushSync()
  233. if err := cli.Error(); err != nil {
  234. return types.ErrInternalError.SetLog(err.Error())
  235. }
  236. resp := reqres.Response.GetEcho()
  237. return types.Result{Code: OK, Data: []byte(resp.Message), Log: LOG}
  238. }
  239. func (cli *socketClient) FlushSync() error {
  240. reqRes := cli.queueRequest(types.ToRequestFlush())
  241. if err := cli.Error(); err != nil {
  242. return types.ErrInternalError.SetLog(err.Error())
  243. }
  244. reqRes.Wait() // NOTE: if we don't flush the queue, its possible to get stuck here
  245. return cli.Error()
  246. }
  247. func (cli *socketClient) InfoSync() (resInfo types.ResponseInfo, err error) {
  248. reqres := cli.queueRequest(types.ToRequestInfo())
  249. cli.FlushSync()
  250. if err := cli.Error(); err != nil {
  251. return resInfo, err
  252. }
  253. if resInfo_ := reqres.Response.GetInfo(); resInfo_ != nil {
  254. return *resInfo_, nil
  255. } else {
  256. return resInfo, nil
  257. }
  258. }
  259. func (cli *socketClient) SetOptionSync(key string, value string) (res types.Result) {
  260. reqres := cli.queueRequest(types.ToRequestSetOption(key, value))
  261. cli.FlushSync()
  262. if err := cli.Error(); err != nil {
  263. return types.ErrInternalError.SetLog(err.Error())
  264. }
  265. resp := reqres.Response.GetSetOption()
  266. return types.Result{Code: OK, Data: nil, Log: resp.Log}
  267. }
  268. func (cli *socketClient) DeliverTxSync(tx []byte) (res types.Result) {
  269. reqres := cli.queueRequest(types.ToRequestDeliverTx(tx))
  270. cli.FlushSync()
  271. if err := cli.Error(); err != nil {
  272. return types.ErrInternalError.SetLog(err.Error())
  273. }
  274. resp := reqres.Response.GetDeliverTx()
  275. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  276. }
  277. func (cli *socketClient) CheckTxSync(tx []byte) (res types.Result) {
  278. reqres := cli.queueRequest(types.ToRequestCheckTx(tx))
  279. cli.FlushSync()
  280. if err := cli.Error(); err != nil {
  281. return types.ErrInternalError.SetLog(err.Error())
  282. }
  283. resp := reqres.Response.GetCheckTx()
  284. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  285. }
  286. func (cli *socketClient) QuerySync(query []byte) (res types.Result) {
  287. reqres := cli.queueRequest(types.ToRequestQuery(query))
  288. cli.FlushSync()
  289. if err := cli.Error(); err != nil {
  290. return types.ErrInternalError.SetLog(err.Error())
  291. }
  292. resp := reqres.Response.GetQuery()
  293. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  294. }
  295. func (cli *socketClient) ProofSync(key []byte, blockHeight int64) (res types.Result) {
  296. reqres := cli.queueRequest(types.ToRequestProof(key, blockHeight))
  297. cli.FlushSync()
  298. if err := cli.Error(); err != nil {
  299. return types.ErrInternalError.SetLog(err.Error())
  300. }
  301. resp := reqres.Response.GetProof()
  302. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  303. }
  304. func (cli *socketClient) CommitSync() (res types.Result) {
  305. reqres := cli.queueRequest(types.ToRequestCommit())
  306. cli.FlushSync()
  307. if err := cli.Error(); err != nil {
  308. return types.ErrInternalError.SetLog(err.Error())
  309. }
  310. resp := reqres.Response.GetCommit()
  311. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  312. }
  313. func (cli *socketClient) InitChainSync(validators []*types.Validator) (err error) {
  314. cli.queueRequest(types.ToRequestInitChain(validators))
  315. cli.FlushSync()
  316. if err := cli.Error(); err != nil {
  317. return err
  318. }
  319. return nil
  320. }
  321. func (cli *socketClient) BeginBlockSync(hash []byte, header *types.Header) (err error) {
  322. cli.queueRequest(types.ToRequestBeginBlock(hash, header))
  323. cli.FlushSync()
  324. if err := cli.Error(); err != nil {
  325. return err
  326. }
  327. return nil
  328. }
  329. func (cli *socketClient) EndBlockSync(height uint64) (resEndBlock types.ResponseEndBlock, err error) {
  330. reqres := cli.queueRequest(types.ToRequestEndBlock(height))
  331. cli.FlushSync()
  332. if err := cli.Error(); err != nil {
  333. return resEndBlock, err
  334. }
  335. if resEndBlock_ := reqres.Response.GetEndBlock(); resEndBlock_ != nil {
  336. return *resEndBlock_, nil
  337. } else {
  338. return resEndBlock, nil
  339. }
  340. }
  341. //----------------------------------------
  342. func (cli *socketClient) queueRequest(req *types.Request) *ReqRes {
  343. reqres := NewReqRes(req)
  344. // TODO: set cli.err if reqQueue times out
  345. cli.reqQueue <- reqres
  346. // Maybe auto-flush, or unset auto-flush
  347. switch req.Value.(type) {
  348. case *types.Request_Flush:
  349. cli.flushTimer.Unset()
  350. default:
  351. cli.flushTimer.Set()
  352. }
  353. return reqres
  354. }
  355. func (cli *socketClient) flushQueue() {
  356. LOOP:
  357. for {
  358. select {
  359. case reqres := <-cli.reqQueue:
  360. reqres.Done()
  361. default:
  362. break LOOP
  363. }
  364. }
  365. }
  366. //----------------------------------------
  367. func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
  368. switch req.Value.(type) {
  369. case *types.Request_Echo:
  370. _, ok = res.Value.(*types.Response_Echo)
  371. case *types.Request_Flush:
  372. _, ok = res.Value.(*types.Response_Flush)
  373. case *types.Request_Info:
  374. _, ok = res.Value.(*types.Response_Info)
  375. case *types.Request_SetOption:
  376. _, ok = res.Value.(*types.Response_SetOption)
  377. case *types.Request_DeliverTx:
  378. _, ok = res.Value.(*types.Response_DeliverTx)
  379. case *types.Request_CheckTx:
  380. _, ok = res.Value.(*types.Response_CheckTx)
  381. case *types.Request_Commit:
  382. _, ok = res.Value.(*types.Response_Commit)
  383. case *types.Request_Query:
  384. _, ok = res.Value.(*types.Response_Query)
  385. case *types.Request_Proof:
  386. _, ok = res.Value.(*types.Response_Proof)
  387. case *types.Request_InitChain:
  388. _, ok = res.Value.(*types.Response_InitChain)
  389. case *types.Request_BeginBlock:
  390. _, ok = res.Value.(*types.Response_BeginBlock)
  391. case *types.Request_EndBlock:
  392. _, ok = res.Value.(*types.Response_EndBlock)
  393. }
  394. return ok
  395. }