You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

399 lines
10 KiB

  1. package abcicli
  2. import (
  3. "bufio"
  4. "container/list"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "reflect"
  9. "sync"
  10. "time"
  11. "github.com/tendermint/tendermint/abci/types"
  12. cmn "github.com/tendermint/tendermint/libs/common"
  13. )
  14. const reqQueueSize = 256 // TODO make configurable
  15. // const maxResponseSize = 1048576 // 1MB TODO make configurable
  16. const flushThrottleMS = 20 // Don't wait longer than...
  17. var _ Client = (*socketClient)(nil)
  18. // This is goroutine-safe, but users should beware that
  19. // the application in general is not meant to be interfaced
  20. // with concurrent callers.
  21. type socketClient struct {
  22. cmn.BaseService
  23. reqQueue chan *ReqRes
  24. flushTimer *cmn.ThrottleTimer
  25. mustConnect bool
  26. mtx sync.Mutex
  27. addr string
  28. conn net.Conn
  29. err error
  30. reqSent *list.List
  31. resCb func(*types.Request, *types.Response) // listens to all callbacks
  32. }
  33. func NewSocketClient(addr string, mustConnect bool) *socketClient {
  34. cli := &socketClient{
  35. reqQueue: make(chan *ReqRes, reqQueueSize),
  36. flushTimer: cmn.NewThrottleTimer("socketClient", flushThrottleMS),
  37. mustConnect: mustConnect,
  38. addr: addr,
  39. reqSent: list.New(),
  40. resCb: nil,
  41. }
  42. cli.BaseService = *cmn.NewBaseService(nil, "socketClient", cli)
  43. return cli
  44. }
  45. func (cli *socketClient) OnStart() error {
  46. if err := cli.BaseService.OnStart(); err != nil {
  47. return err
  48. }
  49. var err error
  50. var conn net.Conn
  51. RETRY_LOOP:
  52. for {
  53. conn, err = cmn.Connect(cli.addr)
  54. if err != nil {
  55. if cli.mustConnect {
  56. return err
  57. }
  58. cli.Logger.Error(fmt.Sprintf("abci.socketClient failed to connect to %v. Retrying...", cli.addr))
  59. time.Sleep(time.Second * dialRetryIntervalSeconds)
  60. continue RETRY_LOOP
  61. }
  62. cli.conn = conn
  63. go cli.sendRequestsRoutine(conn)
  64. go cli.recvResponseRoutine(conn)
  65. return nil
  66. }
  67. }
  68. func (cli *socketClient) OnStop() {
  69. cli.BaseService.OnStop()
  70. cli.mtx.Lock()
  71. defer cli.mtx.Unlock()
  72. if cli.conn != nil {
  73. cli.conn.Close()
  74. }
  75. cli.flushQueue()
  76. }
  77. // Stop the client and set the error
  78. func (cli *socketClient) StopForError(err error) {
  79. if !cli.IsRunning() {
  80. return
  81. }
  82. cli.mtx.Lock()
  83. if cli.err == nil {
  84. cli.err = err
  85. }
  86. cli.mtx.Unlock()
  87. cli.Logger.Error(fmt.Sprintf("Stopping abci.socketClient for error: %v", err.Error()))
  88. cli.Stop()
  89. }
  90. func (cli *socketClient) Error() error {
  91. cli.mtx.Lock()
  92. defer cli.mtx.Unlock()
  93. return cli.err
  94. }
  95. // Set listener for all responses
  96. // NOTE: callback may get internally generated flush responses.
  97. func (cli *socketClient) SetResponseCallback(resCb Callback) {
  98. cli.mtx.Lock()
  99. defer cli.mtx.Unlock()
  100. cli.resCb = resCb
  101. }
  102. //----------------------------------------
  103. func (cli *socketClient) sendRequestsRoutine(conn net.Conn) {
  104. w := bufio.NewWriter(conn)
  105. for {
  106. select {
  107. case <-cli.flushTimer.Ch:
  108. select {
  109. case cli.reqQueue <- NewReqRes(types.ToRequestFlush()):
  110. default:
  111. // Probably will fill the buffer, or retry later.
  112. }
  113. case <-cli.Quit():
  114. return
  115. case reqres := <-cli.reqQueue:
  116. cli.willSendReq(reqres)
  117. err := types.WriteMessage(reqres.Request, w)
  118. if err != nil {
  119. cli.StopForError(fmt.Errorf("Error writing msg: %v", err))
  120. return
  121. }
  122. // cli.Logger.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
  123. if _, ok := reqres.Request.Value.(*types.Request_Flush); ok {
  124. err = w.Flush()
  125. if err != nil {
  126. cli.StopForError(fmt.Errorf("Error flushing writer: %v", err))
  127. return
  128. }
  129. }
  130. }
  131. }
  132. }
  133. func (cli *socketClient) recvResponseRoutine(conn net.Conn) {
  134. r := bufio.NewReader(conn) // Buffer reads
  135. for {
  136. var res = &types.Response{}
  137. err := types.ReadMessage(r, res)
  138. if err != nil {
  139. cli.StopForError(err)
  140. return
  141. }
  142. switch r := res.Value.(type) {
  143. case *types.Response_Exception:
  144. // XXX After setting cli.err, release waiters (e.g. reqres.Done())
  145. cli.StopForError(errors.New(r.Exception.Error))
  146. return
  147. default:
  148. // cli.Logger.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
  149. err := cli.didRecvResponse(res)
  150. if err != nil {
  151. cli.StopForError(err)
  152. return
  153. }
  154. }
  155. }
  156. }
  157. func (cli *socketClient) willSendReq(reqres *ReqRes) {
  158. cli.mtx.Lock()
  159. defer cli.mtx.Unlock()
  160. cli.reqSent.PushBack(reqres)
  161. }
  162. func (cli *socketClient) didRecvResponse(res *types.Response) error {
  163. cli.mtx.Lock()
  164. defer cli.mtx.Unlock()
  165. // Get the first ReqRes
  166. next := cli.reqSent.Front()
  167. if next == nil {
  168. return fmt.Errorf("Unexpected result type %v when nothing expected", reflect.TypeOf(res.Value))
  169. }
  170. reqres := next.Value.(*ReqRes)
  171. if !resMatchesReq(reqres.Request, res) {
  172. return fmt.Errorf("Unexpected result type %v when response to %v expected",
  173. reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value))
  174. }
  175. reqres.Response = res // Set response
  176. reqres.Done() // Release waiters
  177. cli.reqSent.Remove(next) // Pop first item from linked list
  178. // Notify reqRes listener if set
  179. if cb := reqres.GetCallback(); cb != nil {
  180. cb(res)
  181. }
  182. // Notify client listener if set
  183. if cli.resCb != nil {
  184. cli.resCb(reqres.Request, res)
  185. }
  186. return nil
  187. }
  188. //----------------------------------------
  189. func (cli *socketClient) EchoAsync(msg string) *ReqRes {
  190. return cli.queueRequest(types.ToRequestEcho(msg))
  191. }
  192. func (cli *socketClient) FlushAsync() *ReqRes {
  193. return cli.queueRequest(types.ToRequestFlush())
  194. }
  195. func (cli *socketClient) InfoAsync(req types.RequestInfo) *ReqRes {
  196. return cli.queueRequest(types.ToRequestInfo(req))
  197. }
  198. func (cli *socketClient) SetOptionAsync(req types.RequestSetOption) *ReqRes {
  199. return cli.queueRequest(types.ToRequestSetOption(req))
  200. }
  201. func (cli *socketClient) DeliverTxAsync(tx []byte) *ReqRes {
  202. return cli.queueRequest(types.ToRequestDeliverTx(tx))
  203. }
  204. func (cli *socketClient) CheckTxAsync(tx []byte) *ReqRes {
  205. return cli.queueRequest(types.ToRequestCheckTx(tx))
  206. }
  207. func (cli *socketClient) QueryAsync(req types.RequestQuery) *ReqRes {
  208. return cli.queueRequest(types.ToRequestQuery(req))
  209. }
  210. func (cli *socketClient) CommitAsync() *ReqRes {
  211. return cli.queueRequest(types.ToRequestCommit())
  212. }
  213. func (cli *socketClient) InitChainAsync(req types.RequestInitChain) *ReqRes {
  214. return cli.queueRequest(types.ToRequestInitChain(req))
  215. }
  216. func (cli *socketClient) BeginBlockAsync(req types.RequestBeginBlock) *ReqRes {
  217. return cli.queueRequest(types.ToRequestBeginBlock(req))
  218. }
  219. func (cli *socketClient) EndBlockAsync(req types.RequestEndBlock) *ReqRes {
  220. return cli.queueRequest(types.ToRequestEndBlock(req))
  221. }
  222. //----------------------------------------
  223. func (cli *socketClient) FlushSync() error {
  224. reqRes := cli.queueRequest(types.ToRequestFlush())
  225. if err := cli.Error(); err != nil {
  226. return err
  227. }
  228. reqRes.Wait() // NOTE: if we don't flush the queue, its possible to get stuck here
  229. return cli.Error()
  230. }
  231. func (cli *socketClient) EchoSync(msg string) (*types.ResponseEcho, error) {
  232. reqres := cli.queueRequest(types.ToRequestEcho(msg))
  233. cli.FlushSync()
  234. return reqres.Response.GetEcho(), cli.Error()
  235. }
  236. func (cli *socketClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) {
  237. reqres := cli.queueRequest(types.ToRequestInfo(req))
  238. cli.FlushSync()
  239. return reqres.Response.GetInfo(), cli.Error()
  240. }
  241. func (cli *socketClient) SetOptionSync(req types.RequestSetOption) (*types.ResponseSetOption, error) {
  242. reqres := cli.queueRequest(types.ToRequestSetOption(req))
  243. cli.FlushSync()
  244. return reqres.Response.GetSetOption(), cli.Error()
  245. }
  246. func (cli *socketClient) DeliverTxSync(tx []byte) (*types.ResponseDeliverTx, error) {
  247. reqres := cli.queueRequest(types.ToRequestDeliverTx(tx))
  248. cli.FlushSync()
  249. return reqres.Response.GetDeliverTx(), cli.Error()
  250. }
  251. func (cli *socketClient) CheckTxSync(tx []byte) (*types.ResponseCheckTx, error) {
  252. reqres := cli.queueRequest(types.ToRequestCheckTx(tx))
  253. cli.FlushSync()
  254. return reqres.Response.GetCheckTx(), cli.Error()
  255. }
  256. func (cli *socketClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) {
  257. reqres := cli.queueRequest(types.ToRequestQuery(req))
  258. cli.FlushSync()
  259. return reqres.Response.GetQuery(), cli.Error()
  260. }
  261. func (cli *socketClient) CommitSync() (*types.ResponseCommit, error) {
  262. reqres := cli.queueRequest(types.ToRequestCommit())
  263. cli.FlushSync()
  264. return reqres.Response.GetCommit(), cli.Error()
  265. }
  266. func (cli *socketClient) InitChainSync(req types.RequestInitChain) (*types.ResponseInitChain, error) {
  267. reqres := cli.queueRequest(types.ToRequestInitChain(req))
  268. cli.FlushSync()
  269. return reqres.Response.GetInitChain(), cli.Error()
  270. }
  271. func (cli *socketClient) BeginBlockSync(req types.RequestBeginBlock) (*types.ResponseBeginBlock, error) {
  272. reqres := cli.queueRequest(types.ToRequestBeginBlock(req))
  273. cli.FlushSync()
  274. return reqres.Response.GetBeginBlock(), cli.Error()
  275. }
  276. func (cli *socketClient) EndBlockSync(req types.RequestEndBlock) (*types.ResponseEndBlock, error) {
  277. reqres := cli.queueRequest(types.ToRequestEndBlock(req))
  278. cli.FlushSync()
  279. return reqres.Response.GetEndBlock(), cli.Error()
  280. }
  281. //----------------------------------------
  282. func (cli *socketClient) queueRequest(req *types.Request) *ReqRes {
  283. reqres := NewReqRes(req)
  284. // TODO: set cli.err if reqQueue times out
  285. cli.reqQueue <- reqres
  286. // Maybe auto-flush, or unset auto-flush
  287. switch req.Value.(type) {
  288. case *types.Request_Flush:
  289. cli.flushTimer.Unset()
  290. default:
  291. cli.flushTimer.Set()
  292. }
  293. return reqres
  294. }
  295. func (cli *socketClient) flushQueue() {
  296. LOOP:
  297. for {
  298. select {
  299. case reqres := <-cli.reqQueue:
  300. reqres.Done()
  301. default:
  302. break LOOP
  303. }
  304. }
  305. }
  306. //----------------------------------------
  307. func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
  308. switch req.Value.(type) {
  309. case *types.Request_Echo:
  310. _, ok = res.Value.(*types.Response_Echo)
  311. case *types.Request_Flush:
  312. _, ok = res.Value.(*types.Response_Flush)
  313. case *types.Request_Info:
  314. _, ok = res.Value.(*types.Response_Info)
  315. case *types.Request_SetOption:
  316. _, ok = res.Value.(*types.Response_SetOption)
  317. case *types.Request_DeliverTx:
  318. _, ok = res.Value.(*types.Response_DeliverTx)
  319. case *types.Request_CheckTx:
  320. _, ok = res.Value.(*types.Response_CheckTx)
  321. case *types.Request_Commit:
  322. _, ok = res.Value.(*types.Response_Commit)
  323. case *types.Request_Query:
  324. _, ok = res.Value.(*types.Response_Query)
  325. case *types.Request_InitChain:
  326. _, ok = res.Value.(*types.Response_InitChain)
  327. case *types.Request_BeginBlock:
  328. _, ok = res.Value.(*types.Response_BeginBlock)
  329. case *types.Request_EndBlock:
  330. _, ok = res.Value.(*types.Response_EndBlock)
  331. }
  332. return ok
  333. }