You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

514 lines
14 KiB

8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
7 years ago
7 years ago
7 years ago
8 years ago
  1. package abcicli
  2. import (
  3. "bufio"
  4. "container/list"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net"
  9. "reflect"
  10. "time"
  11. "github.com/tendermint/tendermint/abci/types"
  12. tmnet "github.com/tendermint/tendermint/libs/net"
  13. "github.com/tendermint/tendermint/libs/service"
  14. tmsync "github.com/tendermint/tendermint/libs/sync"
  15. "github.com/tendermint/tendermint/libs/timer"
  16. )
  17. const (
  18. reqQueueSize = 256 // TODO make configurable
  19. flushThrottleMS = 20 // Don't wait longer than...
  20. )
  21. // This is goroutine-safe, but users should beware that the application in
  22. // general is not meant to be interfaced with concurrent callers.
  23. type socketClient struct {
  24. service.BaseService
  25. addr string
  26. mustConnect bool
  27. conn net.Conn
  28. reqQueue chan *ReqRes
  29. flushTimer *timer.ThrottleTimer
  30. mtx tmsync.RWMutex
  31. err error
  32. reqSent *list.List // list of requests sent, waiting for response
  33. resCb func(*types.Request, *types.Response) // called on all requests, if set.
  34. }
  35. var _ Client = (*socketClient)(nil)
  36. // NewSocketClient creates a new socket client, which connects to a given
  37. // address. If mustConnect is true, the client will return an error upon start
  38. // if it fails to connect.
  39. func NewSocketClient(addr string, mustConnect bool) Client {
  40. cli := &socketClient{
  41. reqQueue: make(chan *ReqRes, reqQueueSize),
  42. flushTimer: timer.NewThrottleTimer("socketClient", flushThrottleMS),
  43. mustConnect: mustConnect,
  44. addr: addr,
  45. reqSent: list.New(),
  46. resCb: nil,
  47. }
  48. cli.BaseService = *service.NewBaseService(nil, "socketClient", cli)
  49. return cli
  50. }
  51. // OnStart implements Service by connecting to the server and spawning reading
  52. // and writing goroutines.
  53. func (cli *socketClient) OnStart() error {
  54. var (
  55. err error
  56. conn net.Conn
  57. )
  58. for {
  59. conn, err = tmnet.Connect(cli.addr)
  60. if err != nil {
  61. if cli.mustConnect {
  62. return err
  63. }
  64. cli.Logger.Error(fmt.Sprintf("abci.socketClient failed to connect to %v. Retrying after %vs...",
  65. cli.addr, dialRetryIntervalSeconds), "err", err)
  66. time.Sleep(time.Second * dialRetryIntervalSeconds)
  67. continue
  68. }
  69. cli.conn = conn
  70. go cli.sendRequestsRoutine(conn)
  71. go cli.recvResponseRoutine(conn)
  72. return nil
  73. }
  74. }
  75. // OnStop implements Service by closing connection and flushing all queues.
  76. func (cli *socketClient) OnStop() {
  77. if cli.conn != nil {
  78. cli.conn.Close()
  79. }
  80. cli.flushQueue()
  81. cli.flushTimer.Stop()
  82. }
  83. // Error returns an error if the client was stopped abruptly.
  84. func (cli *socketClient) Error() error {
  85. cli.mtx.RLock()
  86. defer cli.mtx.RUnlock()
  87. return cli.err
  88. }
  89. // SetResponseCallback sets a callback, which will be executed for each
  90. // non-error & non-empty response from the server.
  91. //
  92. // NOTE: callback may get internally generated flush responses.
  93. func (cli *socketClient) SetResponseCallback(resCb Callback) {
  94. cli.mtx.Lock()
  95. defer cli.mtx.Unlock()
  96. cli.resCb = resCb
  97. }
  98. //----------------------------------------
  99. func (cli *socketClient) sendRequestsRoutine(conn io.Writer) {
  100. w := bufio.NewWriter(conn)
  101. for {
  102. select {
  103. case reqres := <-cli.reqQueue:
  104. // cli.Logger.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
  105. cli.willSendReq(reqres)
  106. err := types.WriteMessage(reqres.Request, w)
  107. if err != nil {
  108. cli.stopForError(fmt.Errorf("write to buffer: %w", err))
  109. return
  110. }
  111. // If it's a flush request, flush the current buffer.
  112. if _, ok := reqres.Request.Value.(*types.Request_Flush); ok {
  113. err = w.Flush()
  114. if err != nil {
  115. cli.stopForError(fmt.Errorf("flush buffer: %w", err))
  116. return
  117. }
  118. }
  119. case <-cli.flushTimer.Ch: // flush queue
  120. select {
  121. case cli.reqQueue <- NewReqRes(types.ToRequestFlush()):
  122. default:
  123. // Probably will fill the buffer, or retry later.
  124. }
  125. case <-cli.Quit():
  126. return
  127. }
  128. }
  129. }
  130. func (cli *socketClient) recvResponseRoutine(conn io.Reader) {
  131. r := bufio.NewReader(conn)
  132. for {
  133. var res = &types.Response{}
  134. err := types.ReadMessage(r, res)
  135. if err != nil {
  136. cli.stopForError(fmt.Errorf("read message: %w", err))
  137. return
  138. }
  139. // cli.Logger.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
  140. switch r := res.Value.(type) {
  141. case *types.Response_Exception: // app responded with error
  142. // XXX After setting cli.err, release waiters (e.g. reqres.Done())
  143. cli.stopForError(errors.New(r.Exception.Error))
  144. return
  145. default:
  146. err := cli.didRecvResponse(res)
  147. if err != nil {
  148. cli.stopForError(err)
  149. return
  150. }
  151. }
  152. }
  153. }
  154. func (cli *socketClient) willSendReq(reqres *ReqRes) {
  155. cli.mtx.Lock()
  156. defer cli.mtx.Unlock()
  157. cli.reqSent.PushBack(reqres)
  158. }
  159. func (cli *socketClient) didRecvResponse(res *types.Response) error {
  160. cli.mtx.Lock()
  161. defer cli.mtx.Unlock()
  162. // Get the first ReqRes.
  163. next := cli.reqSent.Front()
  164. if next == nil {
  165. return fmt.Errorf("unexpected %v when nothing expected", reflect.TypeOf(res.Value))
  166. }
  167. reqres := next.Value.(*ReqRes)
  168. if !resMatchesReq(reqres.Request, res) {
  169. return fmt.Errorf("unexpected %v when response to %v expected",
  170. reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value))
  171. }
  172. reqres.Response = res
  173. reqres.Done() // release waiters
  174. cli.reqSent.Remove(next) // pop first item from linked list
  175. // Notify client listener if set (global callback).
  176. if cli.resCb != nil {
  177. cli.resCb(reqres.Request, res)
  178. }
  179. // Notify reqRes listener if set (request specific callback).
  180. //
  181. // NOTE: It is possible this callback isn't set on the reqres object. At this
  182. // point, in which case it will be called after, when it is set.
  183. reqres.InvokeCallback()
  184. return nil
  185. }
  186. //----------------------------------------
  187. func (cli *socketClient) EchoAsync(msg string) *ReqRes {
  188. return cli.queueRequest(types.ToRequestEcho(msg))
  189. }
  190. func (cli *socketClient) FlushAsync() *ReqRes {
  191. return cli.queueRequest(types.ToRequestFlush())
  192. }
  193. func (cli *socketClient) InfoAsync(req types.RequestInfo) *ReqRes {
  194. return cli.queueRequest(types.ToRequestInfo(req))
  195. }
  196. func (cli *socketClient) SetOptionAsync(req types.RequestSetOption) *ReqRes {
  197. return cli.queueRequest(types.ToRequestSetOption(req))
  198. }
  199. func (cli *socketClient) DeliverTxAsync(req types.RequestDeliverTx) *ReqRes {
  200. return cli.queueRequest(types.ToRequestDeliverTx(req))
  201. }
  202. func (cli *socketClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes {
  203. return cli.queueRequest(types.ToRequestCheckTx(req))
  204. }
  205. func (cli *socketClient) QueryAsync(req types.RequestQuery) *ReqRes {
  206. return cli.queueRequest(types.ToRequestQuery(req))
  207. }
  208. func (cli *socketClient) CommitAsync() *ReqRes {
  209. return cli.queueRequest(types.ToRequestCommit())
  210. }
  211. func (cli *socketClient) InitChainAsync(req types.RequestInitChain) *ReqRes {
  212. return cli.queueRequest(types.ToRequestInitChain(req))
  213. }
  214. func (cli *socketClient) BeginBlockAsync(req types.RequestBeginBlock) *ReqRes {
  215. return cli.queueRequest(types.ToRequestBeginBlock(req))
  216. }
  217. func (cli *socketClient) EndBlockAsync(req types.RequestEndBlock) *ReqRes {
  218. return cli.queueRequest(types.ToRequestEndBlock(req))
  219. }
  220. func (cli *socketClient) ListSnapshotsAsync(req types.RequestListSnapshots) *ReqRes {
  221. return cli.queueRequest(types.ToRequestListSnapshots(req))
  222. }
  223. func (cli *socketClient) OfferSnapshotAsync(req types.RequestOfferSnapshot) *ReqRes {
  224. return cli.queueRequest(types.ToRequestOfferSnapshot(req))
  225. }
  226. func (cli *socketClient) LoadSnapshotChunkAsync(req types.RequestLoadSnapshotChunk) *ReqRes {
  227. return cli.queueRequest(types.ToRequestLoadSnapshotChunk(req))
  228. }
  229. func (cli *socketClient) ApplySnapshotChunkAsync(req types.RequestApplySnapshotChunk) *ReqRes {
  230. return cli.queueRequest(types.ToRequestApplySnapshotChunk(req))
  231. }
  232. //----------------------------------------
  233. func (cli *socketClient) FlushSync() error {
  234. reqRes := cli.queueRequest(types.ToRequestFlush())
  235. if err := cli.Error(); err != nil {
  236. return err
  237. }
  238. reqRes.Wait() // NOTE: if we don't flush the queue, its possible to get stuck here
  239. return cli.Error()
  240. }
  241. func (cli *socketClient) EchoSync(msg string) (*types.ResponseEcho, error) {
  242. reqres := cli.queueRequest(types.ToRequestEcho(msg))
  243. if err := cli.FlushSync(); err != nil {
  244. return nil, err
  245. }
  246. return reqres.Response.GetEcho(), cli.Error()
  247. }
  248. func (cli *socketClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) {
  249. reqres := cli.queueRequest(types.ToRequestInfo(req))
  250. if err := cli.FlushSync(); err != nil {
  251. return nil, err
  252. }
  253. return reqres.Response.GetInfo(), cli.Error()
  254. }
  255. func (cli *socketClient) SetOptionSync(req types.RequestSetOption) (*types.ResponseSetOption, error) {
  256. reqres := cli.queueRequest(types.ToRequestSetOption(req))
  257. if err := cli.FlushSync(); err != nil {
  258. return nil, err
  259. }
  260. return reqres.Response.GetSetOption(), cli.Error()
  261. }
  262. func (cli *socketClient) DeliverTxSync(req types.RequestDeliverTx) (*types.ResponseDeliverTx, error) {
  263. reqres := cli.queueRequest(types.ToRequestDeliverTx(req))
  264. if err := cli.FlushSync(); err != nil {
  265. return nil, err
  266. }
  267. return reqres.Response.GetDeliverTx(), cli.Error()
  268. }
  269. func (cli *socketClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
  270. reqres := cli.queueRequest(types.ToRequestCheckTx(req))
  271. if err := cli.FlushSync(); err != nil {
  272. return nil, err
  273. }
  274. return reqres.Response.GetCheckTx(), cli.Error()
  275. }
  276. func (cli *socketClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) {
  277. reqres := cli.queueRequest(types.ToRequestQuery(req))
  278. if err := cli.FlushSync(); err != nil {
  279. return nil, err
  280. }
  281. return reqres.Response.GetQuery(), cli.Error()
  282. }
  283. func (cli *socketClient) CommitSync() (*types.ResponseCommit, error) {
  284. reqres := cli.queueRequest(types.ToRequestCommit())
  285. if err := cli.FlushSync(); err != nil {
  286. return nil, err
  287. }
  288. return reqres.Response.GetCommit(), cli.Error()
  289. }
  290. func (cli *socketClient) InitChainSync(req types.RequestInitChain) (*types.ResponseInitChain, error) {
  291. reqres := cli.queueRequest(types.ToRequestInitChain(req))
  292. if err := cli.FlushSync(); err != nil {
  293. return nil, err
  294. }
  295. return reqres.Response.GetInitChain(), cli.Error()
  296. }
  297. func (cli *socketClient) BeginBlockSync(req types.RequestBeginBlock) (*types.ResponseBeginBlock, error) {
  298. reqres := cli.queueRequest(types.ToRequestBeginBlock(req))
  299. if err := cli.FlushSync(); err != nil {
  300. return nil, err
  301. }
  302. return reqres.Response.GetBeginBlock(), cli.Error()
  303. }
  304. func (cli *socketClient) EndBlockSync(req types.RequestEndBlock) (*types.ResponseEndBlock, error) {
  305. reqres := cli.queueRequest(types.ToRequestEndBlock(req))
  306. if err := cli.FlushSync(); err != nil {
  307. return nil, err
  308. }
  309. return reqres.Response.GetEndBlock(), cli.Error()
  310. }
  311. func (cli *socketClient) ListSnapshotsSync(req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
  312. reqres := cli.queueRequest(types.ToRequestListSnapshots(req))
  313. if err := cli.FlushSync(); err != nil {
  314. return nil, err
  315. }
  316. return reqres.Response.GetListSnapshots(), cli.Error()
  317. }
  318. func (cli *socketClient) OfferSnapshotSync(req types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
  319. reqres := cli.queueRequest(types.ToRequestOfferSnapshot(req))
  320. if err := cli.FlushSync(); err != nil {
  321. return nil, err
  322. }
  323. return reqres.Response.GetOfferSnapshot(), cli.Error()
  324. }
  325. func (cli *socketClient) LoadSnapshotChunkSync(
  326. req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
  327. reqres := cli.queueRequest(types.ToRequestLoadSnapshotChunk(req))
  328. if err := cli.FlushSync(); err != nil {
  329. return nil, err
  330. }
  331. return reqres.Response.GetLoadSnapshotChunk(), cli.Error()
  332. }
  333. func (cli *socketClient) ApplySnapshotChunkSync(
  334. req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
  335. reqres := cli.queueRequest(types.ToRequestApplySnapshotChunk(req))
  336. if err := cli.FlushSync(); err != nil {
  337. return nil, err
  338. }
  339. return reqres.Response.GetApplySnapshotChunk(), cli.Error()
  340. }
  341. //----------------------------------------
  342. func (cli *socketClient) queueRequest(req *types.Request) *ReqRes {
  343. reqres := NewReqRes(req)
  344. // TODO: set cli.err if reqQueue times out
  345. cli.reqQueue <- reqres
  346. // Maybe auto-flush, or unset auto-flush
  347. switch req.Value.(type) {
  348. case *types.Request_Flush:
  349. cli.flushTimer.Unset()
  350. default:
  351. cli.flushTimer.Set()
  352. }
  353. return reqres
  354. }
  355. func (cli *socketClient) flushQueue() {
  356. cli.mtx.Lock()
  357. defer cli.mtx.Unlock()
  358. // mark all in-flight messages as resolved (they will get cli.Error())
  359. for req := cli.reqSent.Front(); req != nil; req = req.Next() {
  360. reqres := req.Value.(*ReqRes)
  361. reqres.Done()
  362. }
  363. // mark all queued messages as resolved
  364. LOOP:
  365. for {
  366. select {
  367. case reqres := <-cli.reqQueue:
  368. reqres.Done()
  369. default:
  370. break LOOP
  371. }
  372. }
  373. }
  374. //----------------------------------------
  375. func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
  376. switch req.Value.(type) {
  377. case *types.Request_Echo:
  378. _, ok = res.Value.(*types.Response_Echo)
  379. case *types.Request_Flush:
  380. _, ok = res.Value.(*types.Response_Flush)
  381. case *types.Request_Info:
  382. _, ok = res.Value.(*types.Response_Info)
  383. case *types.Request_SetOption:
  384. _, ok = res.Value.(*types.Response_SetOption)
  385. case *types.Request_DeliverTx:
  386. _, ok = res.Value.(*types.Response_DeliverTx)
  387. case *types.Request_CheckTx:
  388. _, ok = res.Value.(*types.Response_CheckTx)
  389. case *types.Request_Commit:
  390. _, ok = res.Value.(*types.Response_Commit)
  391. case *types.Request_Query:
  392. _, ok = res.Value.(*types.Response_Query)
  393. case *types.Request_InitChain:
  394. _, ok = res.Value.(*types.Response_InitChain)
  395. case *types.Request_BeginBlock:
  396. _, ok = res.Value.(*types.Response_BeginBlock)
  397. case *types.Request_EndBlock:
  398. _, ok = res.Value.(*types.Response_EndBlock)
  399. case *types.Request_ApplySnapshotChunk:
  400. _, ok = res.Value.(*types.Response_ApplySnapshotChunk)
  401. case *types.Request_LoadSnapshotChunk:
  402. _, ok = res.Value.(*types.Response_LoadSnapshotChunk)
  403. case *types.Request_ListSnapshots:
  404. _, ok = res.Value.(*types.Response_ListSnapshots)
  405. case *types.Request_OfferSnapshot:
  406. _, ok = res.Value.(*types.Response_OfferSnapshot)
  407. }
  408. return ok
  409. }
  410. func (cli *socketClient) stopForError(err error) {
  411. if !cli.IsRunning() {
  412. return
  413. }
  414. cli.mtx.Lock()
  415. if cli.err == nil {
  416. cli.err = err
  417. }
  418. cli.mtx.Unlock()
  419. cli.Logger.Error(fmt.Sprintf("Stopping abci.socketClient for error: %v", err.Error()))
  420. if err := cli.Stop(); err != nil {
  421. cli.Logger.Error("Error stopping abci.socketClient", "err", err)
  422. }
  423. }