You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

611 lines
16 KiB

8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
  1. package abcicli
  2. import (
  3. "bufio"
  4. "container/list"
  5. "context"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net"
  10. "reflect"
  11. "time"
  12. "github.com/tendermint/tendermint/abci/types"
  13. tmsync "github.com/tendermint/tendermint/internal/libs/sync"
  14. "github.com/tendermint/tendermint/internal/libs/timer"
  15. tmnet "github.com/tendermint/tendermint/libs/net"
  16. "github.com/tendermint/tendermint/libs/service"
  17. )
  // reqQueueSize is the capacity of the async request queue, i.e. the max
  // number of queued async requests.
  // (memory: 256MB max assuming 1MB transactions)
  const reqQueueSize = 256

  // flushThrottleMS is the value handed to the flush throttle timer: don't wait
  // longer than this before flushing (milliseconds, judging by the name).
  const flushThrottleMS = 20
  25. type reqResWithContext struct {
  26. R *ReqRes
  27. C context.Context // if context.Err is not nil, reqRes will be thrown away (ignored)
  28. }
  29. // This is goroutine-safe, but users should beware that the application in
  30. // general is not meant to be interfaced with concurrent callers.
  31. type socketClient struct {
  32. service.BaseService
  33. addr string
  34. mustConnect bool
  35. conn net.Conn
  36. reqQueue chan *reqResWithContext
  37. flushTimer *timer.ThrottleTimer
  38. mtx tmsync.RWMutex
  39. err error
  40. reqSent *list.List // list of requests sent, waiting for response
  41. resCb func(*types.Request, *types.Response) // called on all requests, if set.
  42. }
  43. var _ Client = (*socketClient)(nil)
  44. // NewSocketClient creates a new socket client, which connects to a given
  45. // address. If mustConnect is true, the client will return an error upon start
  46. // if it fails to connect.
  47. func NewSocketClient(addr string, mustConnect bool) Client {
  48. cli := &socketClient{
  49. reqQueue: make(chan *reqResWithContext, reqQueueSize),
  50. flushTimer: timer.NewThrottleTimer("socketClient", flushThrottleMS),
  51. mustConnect: mustConnect,
  52. addr: addr,
  53. reqSent: list.New(),
  54. resCb: nil,
  55. }
  56. cli.BaseService = *service.NewBaseService(nil, "socketClient", cli)
  57. return cli
  58. }
  59. // OnStart implements Service by connecting to the server and spawning reading
  60. // and writing goroutines.
  61. func (cli *socketClient) OnStart() error {
  62. var (
  63. err error
  64. conn net.Conn
  65. )
  66. for {
  67. conn, err = tmnet.Connect(cli.addr)
  68. if err != nil {
  69. if cli.mustConnect {
  70. return err
  71. }
  72. cli.Logger.Error(fmt.Sprintf("abci.socketClient failed to connect to %v. Retrying after %vs...",
  73. cli.addr, dialRetryIntervalSeconds), "err", err)
  74. time.Sleep(time.Second * dialRetryIntervalSeconds)
  75. continue
  76. }
  77. cli.conn = conn
  78. go cli.sendRequestsRoutine(conn)
  79. go cli.recvResponseRoutine(conn)
  80. return nil
  81. }
  82. }
  83. // OnStop implements Service by closing connection and flushing all queues.
  84. func (cli *socketClient) OnStop() {
  85. if cli.conn != nil {
  86. cli.conn.Close()
  87. }
  88. cli.flushQueue()
  89. cli.flushTimer.Stop()
  90. }
  91. // Error returns an error if the client was stopped abruptly.
  92. func (cli *socketClient) Error() error {
  93. cli.mtx.RLock()
  94. defer cli.mtx.RUnlock()
  95. return cli.err
  96. }
  97. // SetResponseCallback sets a callback, which will be executed for each
  98. // non-error & non-empty response from the server.
  99. //
  100. // NOTE: callback may get internally generated flush responses.
  101. func (cli *socketClient) SetResponseCallback(resCb Callback) {
  102. cli.mtx.Lock()
  103. defer cli.mtx.Unlock()
  104. cli.resCb = resCb
  105. }
  106. //----------------------------------------
  107. func (cli *socketClient) sendRequestsRoutine(conn io.Writer) {
  108. w := bufio.NewWriter(conn)
  109. for {
  110. select {
  111. case reqres := <-cli.reqQueue:
  112. // cli.Logger.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
  113. if reqres.C.Err() != nil {
  114. cli.Logger.Debug("Request's context is done", "req", reqres.R, "err", reqres.C.Err())
  115. continue
  116. }
  117. cli.willSendReq(reqres.R)
  118. err := types.WriteMessage(reqres.R.Request, w)
  119. if err != nil {
  120. cli.stopForError(fmt.Errorf("write to buffer: %w", err))
  121. return
  122. }
  123. // If it's a flush request, flush the current buffer.
  124. if _, ok := reqres.R.Request.Value.(*types.Request_Flush); ok {
  125. err = w.Flush()
  126. if err != nil {
  127. cli.stopForError(fmt.Errorf("flush buffer: %w", err))
  128. return
  129. }
  130. }
  131. case <-cli.flushTimer.Ch: // flush queue
  132. select {
  133. case cli.reqQueue <- &reqResWithContext{R: NewReqRes(types.ToRequestFlush()), C: context.Background()}:
  134. default:
  135. // Probably will fill the buffer, or retry later.
  136. }
  137. case <-cli.Quit():
  138. return
  139. }
  140. }
  141. }
  142. func (cli *socketClient) recvResponseRoutine(conn io.Reader) {
  143. r := bufio.NewReader(conn)
  144. for {
  145. var res = &types.Response{}
  146. err := types.ReadMessage(r, res)
  147. if err != nil {
  148. cli.stopForError(fmt.Errorf("read message: %w", err))
  149. return
  150. }
  151. // cli.Logger.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
  152. switch r := res.Value.(type) {
  153. case *types.Response_Exception: // app responded with error
  154. // XXX After setting cli.err, release waiters (e.g. reqres.Done())
  155. cli.stopForError(errors.New(r.Exception.Error))
  156. return
  157. default:
  158. err := cli.didRecvResponse(res)
  159. if err != nil {
  160. cli.stopForError(err)
  161. return
  162. }
  163. }
  164. }
  165. }
  166. func (cli *socketClient) willSendReq(reqres *ReqRes) {
  167. cli.mtx.Lock()
  168. defer cli.mtx.Unlock()
  169. cli.reqSent.PushBack(reqres)
  170. }
  171. func (cli *socketClient) didRecvResponse(res *types.Response) error {
  172. cli.mtx.Lock()
  173. defer cli.mtx.Unlock()
  174. // Get the first ReqRes.
  175. next := cli.reqSent.Front()
  176. if next == nil {
  177. return fmt.Errorf("unexpected %v when nothing expected", reflect.TypeOf(res.Value))
  178. }
  179. reqres := next.Value.(*ReqRes)
  180. if !resMatchesReq(reqres.Request, res) {
  181. return fmt.Errorf("unexpected %v when response to %v expected",
  182. reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value))
  183. }
  184. reqres.Response = res
  185. reqres.Done() // release waiters
  186. cli.reqSent.Remove(next) // pop first item from linked list
  187. // Notify client listener if set (global callback).
  188. if cli.resCb != nil {
  189. cli.resCb(reqres.Request, res)
  190. }
  191. // Notify reqRes listener if set (request specific callback).
  192. //
  193. // NOTE: It is possible this callback isn't set on the reqres object. At this
  194. // point, in which case it will be called after, when it is set.
  195. reqres.InvokeCallback()
  196. return nil
  197. }
  198. //----------------------------------------
  199. func (cli *socketClient) EchoAsync(ctx context.Context, msg string) (*ReqRes, error) {
  200. return cli.queueRequestAsync(ctx, types.ToRequestEcho(msg))
  201. }
  202. func (cli *socketClient) FlushAsync(ctx context.Context) (*ReqRes, error) {
  203. return cli.queueRequestAsync(ctx, types.ToRequestFlush())
  204. }
  205. func (cli *socketClient) InfoAsync(ctx context.Context, req types.RequestInfo) (*ReqRes, error) {
  206. return cli.queueRequestAsync(ctx, types.ToRequestInfo(req))
  207. }
  208. func (cli *socketClient) DeliverTxAsync(ctx context.Context, req types.RequestDeliverTx) (*ReqRes, error) {
  209. return cli.queueRequestAsync(ctx, types.ToRequestDeliverTx(req))
  210. }
  211. func (cli *socketClient) CheckTxAsync(ctx context.Context, req types.RequestCheckTx) (*ReqRes, error) {
  212. return cli.queueRequestAsync(ctx, types.ToRequestCheckTx(req))
  213. }
  214. func (cli *socketClient) QueryAsync(ctx context.Context, req types.RequestQuery) (*ReqRes, error) {
  215. return cli.queueRequestAsync(ctx, types.ToRequestQuery(req))
  216. }
  217. func (cli *socketClient) CommitAsync(ctx context.Context) (*ReqRes, error) {
  218. return cli.queueRequestAsync(ctx, types.ToRequestCommit())
  219. }
  220. func (cli *socketClient) InitChainAsync(ctx context.Context, req types.RequestInitChain) (*ReqRes, error) {
  221. return cli.queueRequestAsync(ctx, types.ToRequestInitChain(req))
  222. }
  223. func (cli *socketClient) BeginBlockAsync(ctx context.Context, req types.RequestBeginBlock) (*ReqRes, error) {
  224. return cli.queueRequestAsync(ctx, types.ToRequestBeginBlock(req))
  225. }
  226. func (cli *socketClient) EndBlockAsync(ctx context.Context, req types.RequestEndBlock) (*ReqRes, error) {
  227. return cli.queueRequestAsync(ctx, types.ToRequestEndBlock(req))
  228. }
  229. func (cli *socketClient) ListSnapshotsAsync(ctx context.Context, req types.RequestListSnapshots) (*ReqRes, error) {
  230. return cli.queueRequestAsync(ctx, types.ToRequestListSnapshots(req))
  231. }
  232. func (cli *socketClient) OfferSnapshotAsync(ctx context.Context, req types.RequestOfferSnapshot) (*ReqRes, error) {
  233. return cli.queueRequestAsync(ctx, types.ToRequestOfferSnapshot(req))
  234. }
  235. func (cli *socketClient) LoadSnapshotChunkAsync(
  236. ctx context.Context,
  237. req types.RequestLoadSnapshotChunk,
  238. ) (*ReqRes, error) {
  239. return cli.queueRequestAsync(ctx, types.ToRequestLoadSnapshotChunk(req))
  240. }
  241. func (cli *socketClient) ApplySnapshotChunkAsync(
  242. ctx context.Context,
  243. req types.RequestApplySnapshotChunk,
  244. ) (*ReqRes, error) {
  245. return cli.queueRequestAsync(ctx, types.ToRequestApplySnapshotChunk(req))
  246. }
  247. //----------------------------------------
  248. func (cli *socketClient) FlushSync(ctx context.Context) error {
  249. reqRes, err := cli.queueRequest(ctx, types.ToRequestFlush(), true)
  250. if err != nil {
  251. return queueErr(err)
  252. }
  253. if err := cli.Error(); err != nil {
  254. return err
  255. }
  256. gotResp := make(chan struct{})
  257. go func() {
  258. // NOTE: if we don't flush the queue, its possible to get stuck here
  259. reqRes.Wait()
  260. close(gotResp)
  261. }()
  262. select {
  263. case <-gotResp:
  264. return cli.Error()
  265. case <-ctx.Done():
  266. return ctx.Err()
  267. }
  268. }
  269. func (cli *socketClient) EchoSync(ctx context.Context, msg string) (*types.ResponseEcho, error) {
  270. reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestEcho(msg))
  271. if err != nil {
  272. return nil, err
  273. }
  274. return reqres.Response.GetEcho(), nil
  275. }
  276. func (cli *socketClient) InfoSync(
  277. ctx context.Context,
  278. req types.RequestInfo,
  279. ) (*types.ResponseInfo, error) {
  280. reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestInfo(req))
  281. if err != nil {
  282. return nil, err
  283. }
  284. return reqres.Response.GetInfo(), nil
  285. }
  286. func (cli *socketClient) DeliverTxSync(
  287. ctx context.Context,
  288. req types.RequestDeliverTx,
  289. ) (*types.ResponseDeliverTx, error) {
  290. reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestDeliverTx(req))
  291. if err != nil {
  292. return nil, err
  293. }
  294. return reqres.Response.GetDeliverTx(), nil
  295. }
  296. func (cli *socketClient) CheckTxSync(
  297. ctx context.Context,
  298. req types.RequestCheckTx,
  299. ) (*types.ResponseCheckTx, error) {
  300. reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestCheckTx(req))
  301. if err != nil {
  302. return nil, err
  303. }
  304. return reqres.Response.GetCheckTx(), nil
  305. }
  306. func (cli *socketClient) QuerySync(
  307. ctx context.Context,
  308. req types.RequestQuery,
  309. ) (*types.ResponseQuery, error) {
  310. reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestQuery(req))
  311. if err != nil {
  312. return nil, err
  313. }
  314. return reqres.Response.GetQuery(), nil
  315. }
  316. func (cli *socketClient) CommitSync(ctx context.Context) (*types.ResponseCommit, error) {
  317. reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestCommit())
  318. if err != nil {
  319. return nil, err
  320. }
  321. return reqres.Response.GetCommit(), nil
  322. }
  323. func (cli *socketClient) InitChainSync(
  324. ctx context.Context,
  325. req types.RequestInitChain,
  326. ) (*types.ResponseInitChain, error) {
  327. reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestInitChain(req))
  328. if err != nil {
  329. return nil, err
  330. }
  331. return reqres.Response.GetInitChain(), nil
  332. }
  333. func (cli *socketClient) BeginBlockSync(
  334. ctx context.Context,
  335. req types.RequestBeginBlock,
  336. ) (*types.ResponseBeginBlock, error) {
  337. reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestBeginBlock(req))
  338. if err != nil {
  339. return nil, err
  340. }
  341. return reqres.Response.GetBeginBlock(), nil
  342. }
  343. func (cli *socketClient) EndBlockSync(
  344. ctx context.Context,
  345. req types.RequestEndBlock,
  346. ) (*types.ResponseEndBlock, error) {
  347. reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestEndBlock(req))
  348. if err != nil {
  349. return nil, err
  350. }
  351. return reqres.Response.GetEndBlock(), nil
  352. }
  353. func (cli *socketClient) ListSnapshotsSync(
  354. ctx context.Context,
  355. req types.RequestListSnapshots,
  356. ) (*types.ResponseListSnapshots, error) {
  357. reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestListSnapshots(req))
  358. if err != nil {
  359. return nil, err
  360. }
  361. return reqres.Response.GetListSnapshots(), nil
  362. }
  363. func (cli *socketClient) OfferSnapshotSync(
  364. ctx context.Context,
  365. req types.RequestOfferSnapshot,
  366. ) (*types.ResponseOfferSnapshot, error) {
  367. reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestOfferSnapshot(req))
  368. if err != nil {
  369. return nil, err
  370. }
  371. return reqres.Response.GetOfferSnapshot(), nil
  372. }
  373. func (cli *socketClient) LoadSnapshotChunkSync(
  374. ctx context.Context,
  375. req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
  376. reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestLoadSnapshotChunk(req))
  377. if err != nil {
  378. return nil, err
  379. }
  380. return reqres.Response.GetLoadSnapshotChunk(), nil
  381. }
  382. func (cli *socketClient) ApplySnapshotChunkSync(
  383. ctx context.Context,
  384. req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
  385. reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestApplySnapshotChunk(req))
  386. if err != nil {
  387. return nil, err
  388. }
  389. return reqres.Response.GetApplySnapshotChunk(), nil
  390. }
  391. //----------------------------------------
  392. // queueRequest enqueues req onto the queue. If the queue is full, it ether
  393. // returns an error (sync=false) or blocks (sync=true).
  394. //
  395. // When sync=true, ctx can be used to break early. When sync=false, ctx will be
  396. // used later to determine if request should be dropped (if ctx.Err is
  397. // non-nil).
  398. //
  399. // The caller is responsible for checking cli.Error.
  400. func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request, sync bool) (*ReqRes, error) {
  401. reqres := NewReqRes(req)
  402. if sync {
  403. select {
  404. case cli.reqQueue <- &reqResWithContext{R: reqres, C: context.Background()}:
  405. case <-ctx.Done():
  406. return nil, ctx.Err()
  407. }
  408. } else {
  409. select {
  410. case cli.reqQueue <- &reqResWithContext{R: reqres, C: ctx}:
  411. default:
  412. return nil, errors.New("buffer is full")
  413. }
  414. }
  415. // Maybe auto-flush, or unset auto-flush
  416. switch req.Value.(type) {
  417. case *types.Request_Flush:
  418. cli.flushTimer.Unset()
  419. default:
  420. cli.flushTimer.Set()
  421. }
  422. return reqres, nil
  423. }
  424. func (cli *socketClient) queueRequestAsync(
  425. ctx context.Context,
  426. req *types.Request,
  427. ) (*ReqRes, error) {
  428. reqres, err := cli.queueRequest(ctx, req, false)
  429. if err != nil {
  430. return nil, queueErr(err)
  431. }
  432. return reqres, cli.Error()
  433. }
  434. func (cli *socketClient) queueRequestAndFlushSync(
  435. ctx context.Context,
  436. req *types.Request,
  437. ) (*ReqRes, error) {
  438. reqres, err := cli.queueRequest(ctx, req, true)
  439. if err != nil {
  440. return nil, queueErr(err)
  441. }
  442. if err := cli.FlushSync(ctx); err != nil {
  443. return nil, err
  444. }
  445. return reqres, cli.Error()
  446. }
  // queueErr wraps e with a "can't queue req" prefix. The result unwraps to e.
  func queueErr(e error) error {
  	wrapped := fmt.Errorf("can't queue req: %w", e)
  	return wrapped
  }
  450. func (cli *socketClient) flushQueue() {
  451. cli.mtx.Lock()
  452. defer cli.mtx.Unlock()
  453. // mark all in-flight messages as resolved (they will get cli.Error())
  454. for req := cli.reqSent.Front(); req != nil; req = req.Next() {
  455. reqres := req.Value.(*ReqRes)
  456. reqres.Done()
  457. }
  458. // mark all queued messages as resolved
  459. LOOP:
  460. for {
  461. select {
  462. case reqres := <-cli.reqQueue:
  463. reqres.R.Done()
  464. default:
  465. break LOOP
  466. }
  467. }
  468. }
  469. //----------------------------------------
  470. func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
  471. switch req.Value.(type) {
  472. case *types.Request_Echo:
  473. _, ok = res.Value.(*types.Response_Echo)
  474. case *types.Request_Flush:
  475. _, ok = res.Value.(*types.Response_Flush)
  476. case *types.Request_Info:
  477. _, ok = res.Value.(*types.Response_Info)
  478. case *types.Request_DeliverTx:
  479. _, ok = res.Value.(*types.Response_DeliverTx)
  480. case *types.Request_CheckTx:
  481. _, ok = res.Value.(*types.Response_CheckTx)
  482. case *types.Request_Commit:
  483. _, ok = res.Value.(*types.Response_Commit)
  484. case *types.Request_Query:
  485. _, ok = res.Value.(*types.Response_Query)
  486. case *types.Request_InitChain:
  487. _, ok = res.Value.(*types.Response_InitChain)
  488. case *types.Request_BeginBlock:
  489. _, ok = res.Value.(*types.Response_BeginBlock)
  490. case *types.Request_EndBlock:
  491. _, ok = res.Value.(*types.Response_EndBlock)
  492. case *types.Request_ApplySnapshotChunk:
  493. _, ok = res.Value.(*types.Response_ApplySnapshotChunk)
  494. case *types.Request_LoadSnapshotChunk:
  495. _, ok = res.Value.(*types.Response_LoadSnapshotChunk)
  496. case *types.Request_ListSnapshots:
  497. _, ok = res.Value.(*types.Response_ListSnapshots)
  498. case *types.Request_OfferSnapshot:
  499. _, ok = res.Value.(*types.Response_OfferSnapshot)
  500. }
  501. return ok
  502. }
  503. func (cli *socketClient) stopForError(err error) {
  504. if !cli.IsRunning() {
  505. return
  506. }
  507. cli.mtx.Lock()
  508. cli.err = err
  509. cli.mtx.Unlock()
  510. cli.Logger.Info("Stopping abci.socketClient", "reason", err)
  511. if err := cli.Stop(); err != nil {
  512. cli.Logger.Error("Error stopping abci.socketClient", "err", err)
  513. }
  514. }