You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

516 lines
16 KiB

9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
  1. package abciclient
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "sync"
  7. "time"
  8. "google.golang.org/grpc"
  9. "github.com/tendermint/tendermint/abci/types"
  10. "github.com/tendermint/tendermint/libs/log"
  11. tmnet "github.com/tendermint/tendermint/libs/net"
  12. "github.com/tendermint/tendermint/libs/service"
  13. )
  14. // A gRPC client.
  15. type grpcClient struct {
  16. service.BaseService
  17. logger log.Logger
  18. mustConnect bool
  19. client types.ABCIApplicationClient
  20. conn *grpc.ClientConn
  21. chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool
  22. mtx sync.Mutex
  23. addr string
  24. err error
  25. resCb func(*types.Request, *types.Response) // listens to all callbacks
  26. }
  27. var _ Client = (*grpcClient)(nil)
  28. // NewGRPCClient creates a gRPC client, which will connect to addr upon the
  29. // start. Note Client#Start returns an error if connection is unsuccessful and
  30. // mustConnect is true.
  31. //
  32. // GRPC calls are synchronous, but some callbacks expect to be called
  33. // asynchronously (eg. the mempool expects to be able to lock to remove bad txs
  34. // from cache). To accommodate, we finish each call in its own go-routine,
  35. // which is expensive, but easy - if you want something better, use the socket
  36. // protocol! maybe one day, if people really want it, we use grpc streams, but
  37. // hopefully not :D
  38. func NewGRPCClient(logger log.Logger, addr string, mustConnect bool) Client {
  39. cli := &grpcClient{
  40. logger: logger,
  41. addr: addr,
  42. mustConnect: mustConnect,
  43. // Buffering the channel is needed to make calls appear asynchronous,
  44. // which is required when the caller makes multiple async calls before
  45. // processing callbacks (e.g. due to holding locks). 64 means that a
  46. // caller can make up to 64 async calls before a callback must be
  47. // processed (otherwise it deadlocks). It also means that we can make 64
  48. // gRPC calls while processing a slow callback at the channel head.
  49. chReqRes: make(chan *ReqRes, 64),
  50. }
  51. cli.BaseService = *service.NewBaseService(logger, "grpcClient", cli)
  52. return cli
  53. }
  54. func dialerFunc(ctx context.Context, addr string) (net.Conn, error) {
  55. return tmnet.Connect(addr)
  56. }
  57. func (cli *grpcClient) OnStart(ctx context.Context) error {
  58. // This processes asynchronous request/response messages and dispatches
  59. // them to callbacks.
  60. go func() {
  61. // Use a separate function to use defer for mutex unlocks (this handles panics)
  62. callCb := func(reqres *ReqRes) {
  63. cli.mtx.Lock()
  64. defer cli.mtx.Unlock()
  65. reqres.SetDone()
  66. reqres.Done()
  67. // Notify client listener if set
  68. if cli.resCb != nil {
  69. cli.resCb(reqres.Request, reqres.Response)
  70. }
  71. // Notify reqRes listener if set
  72. if cb := reqres.GetCallback(); cb != nil {
  73. cb(reqres.Response)
  74. }
  75. }
  76. for {
  77. select {
  78. case reqres := <-cli.chReqRes:
  79. if reqres != nil {
  80. callCb(reqres)
  81. } else {
  82. cli.logger.Error("Received nil reqres")
  83. }
  84. case <-ctx.Done():
  85. return
  86. }
  87. }
  88. }()
  89. RETRY_LOOP:
  90. for {
  91. conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithContextDialer(dialerFunc))
  92. if err != nil {
  93. if cli.mustConnect {
  94. return err
  95. }
  96. cli.logger.Error(fmt.Sprintf("abci.grpcClient failed to connect to %v. Retrying...\n", cli.addr), "err", err)
  97. time.Sleep(time.Second * dialRetryIntervalSeconds)
  98. continue RETRY_LOOP
  99. }
  100. cli.logger.Info("Dialed server. Waiting for echo.", "addr", cli.addr)
  101. client := types.NewABCIApplicationClient(conn)
  102. cli.conn = conn
  103. ENSURE_CONNECTED:
  104. for {
  105. _, err := client.Echo(context.Background(), &types.RequestEcho{Message: "hello"}, grpc.WaitForReady(true))
  106. if err == nil {
  107. break ENSURE_CONNECTED
  108. }
  109. cli.logger.Error("Echo failed", "err", err)
  110. time.Sleep(time.Second * echoRetryIntervalSeconds)
  111. }
  112. cli.client = client
  113. return nil
  114. }
  115. }
  116. func (cli *grpcClient) OnStop() {
  117. if cli.conn != nil {
  118. cli.conn.Close()
  119. }
  120. close(cli.chReqRes)
  121. }
  122. func (cli *grpcClient) StopForError(err error) {
  123. if !cli.IsRunning() {
  124. return
  125. }
  126. cli.mtx.Lock()
  127. if cli.err == nil {
  128. cli.err = err
  129. }
  130. cli.mtx.Unlock()
  131. cli.logger.Error(fmt.Sprintf("Stopping abci.grpcClient for error: %v", err.Error()))
  132. if err := cli.Stop(); err != nil {
  133. cli.logger.Error("Error stopping abci.grpcClient", "err", err)
  134. }
  135. }
  136. func (cli *grpcClient) Error() error {
  137. cli.mtx.Lock()
  138. defer cli.mtx.Unlock()
  139. return cli.err
  140. }
  141. // Set listener for all responses
  142. // NOTE: callback may get internally generated flush responses.
  143. func (cli *grpcClient) SetResponseCallback(resCb Callback) {
  144. cli.mtx.Lock()
  145. cli.resCb = resCb
  146. cli.mtx.Unlock()
  147. }
  148. //----------------------------------------
  149. // NOTE: call is synchronous, use ctx to break early if needed
  150. func (cli *grpcClient) EchoAsync(ctx context.Context, msg string) (*ReqRes, error) {
  151. req := types.ToRequestEcho(msg)
  152. res, err := cli.client.Echo(ctx, req.GetEcho(), grpc.WaitForReady(true))
  153. if err != nil {
  154. return nil, err
  155. }
  156. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_Echo{Echo: res}})
  157. }
  158. // NOTE: call is synchronous, use ctx to break early if needed
  159. func (cli *grpcClient) FlushAsync(ctx context.Context) (*ReqRes, error) {
  160. req := types.ToRequestFlush()
  161. res, err := cli.client.Flush(ctx, req.GetFlush(), grpc.WaitForReady(true))
  162. if err != nil {
  163. return nil, err
  164. }
  165. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_Flush{Flush: res}})
  166. }
  167. // NOTE: call is synchronous, use ctx to break early if needed
  168. func (cli *grpcClient) InfoAsync(ctx context.Context, params types.RequestInfo) (*ReqRes, error) {
  169. req := types.ToRequestInfo(params)
  170. res, err := cli.client.Info(ctx, req.GetInfo(), grpc.WaitForReady(true))
  171. if err != nil {
  172. return nil, err
  173. }
  174. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_Info{Info: res}})
  175. }
  176. // NOTE: call is synchronous, use ctx to break early if needed
  177. func (cli *grpcClient) DeliverTxAsync(ctx context.Context, params types.RequestDeliverTx) (*ReqRes, error) {
  178. req := types.ToRequestDeliverTx(params)
  179. res, err := cli.client.DeliverTx(ctx, req.GetDeliverTx(), grpc.WaitForReady(true))
  180. if err != nil {
  181. return nil, err
  182. }
  183. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_DeliverTx{DeliverTx: res}})
  184. }
  185. // NOTE: call is synchronous, use ctx to break early if needed
  186. func (cli *grpcClient) CheckTxAsync(ctx context.Context, params types.RequestCheckTx) (*ReqRes, error) {
  187. req := types.ToRequestCheckTx(params)
  188. res, err := cli.client.CheckTx(ctx, req.GetCheckTx(), grpc.WaitForReady(true))
  189. if err != nil {
  190. return nil, err
  191. }
  192. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_CheckTx{CheckTx: res}})
  193. }
  194. // NOTE: call is synchronous, use ctx to break early if needed
  195. func (cli *grpcClient) QueryAsync(ctx context.Context, params types.RequestQuery) (*ReqRes, error) {
  196. req := types.ToRequestQuery(params)
  197. res, err := cli.client.Query(ctx, req.GetQuery(), grpc.WaitForReady(true))
  198. if err != nil {
  199. return nil, err
  200. }
  201. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_Query{Query: res}})
  202. }
  203. // NOTE: call is synchronous, use ctx to break early if needed
  204. func (cli *grpcClient) CommitAsync(ctx context.Context) (*ReqRes, error) {
  205. req := types.ToRequestCommit()
  206. res, err := cli.client.Commit(ctx, req.GetCommit(), grpc.WaitForReady(true))
  207. if err != nil {
  208. return nil, err
  209. }
  210. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_Commit{Commit: res}})
  211. }
  212. // NOTE: call is synchronous, use ctx to break early if needed
  213. func (cli *grpcClient) InitChainAsync(ctx context.Context, params types.RequestInitChain) (*ReqRes, error) {
  214. req := types.ToRequestInitChain(params)
  215. res, err := cli.client.InitChain(ctx, req.GetInitChain(), grpc.WaitForReady(true))
  216. if err != nil {
  217. return nil, err
  218. }
  219. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_InitChain{InitChain: res}})
  220. }
  221. // NOTE: call is synchronous, use ctx to break early if needed
  222. func (cli *grpcClient) BeginBlockAsync(ctx context.Context, params types.RequestBeginBlock) (*ReqRes, error) {
  223. req := types.ToRequestBeginBlock(params)
  224. res, err := cli.client.BeginBlock(ctx, req.GetBeginBlock(), grpc.WaitForReady(true))
  225. if err != nil {
  226. return nil, err
  227. }
  228. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_BeginBlock{BeginBlock: res}})
  229. }
  230. // NOTE: call is synchronous, use ctx to break early if needed
  231. func (cli *grpcClient) EndBlockAsync(ctx context.Context, params types.RequestEndBlock) (*ReqRes, error) {
  232. req := types.ToRequestEndBlock(params)
  233. res, err := cli.client.EndBlock(ctx, req.GetEndBlock(), grpc.WaitForReady(true))
  234. if err != nil {
  235. return nil, err
  236. }
  237. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_EndBlock{EndBlock: res}})
  238. }
  239. // NOTE: call is synchronous, use ctx to break early if needed
  240. func (cli *grpcClient) ListSnapshotsAsync(ctx context.Context, params types.RequestListSnapshots) (*ReqRes, error) {
  241. req := types.ToRequestListSnapshots(params)
  242. res, err := cli.client.ListSnapshots(ctx, req.GetListSnapshots(), grpc.WaitForReady(true))
  243. if err != nil {
  244. return nil, err
  245. }
  246. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_ListSnapshots{ListSnapshots: res}})
  247. }
  248. // NOTE: call is synchronous, use ctx to break early if needed
  249. func (cli *grpcClient) OfferSnapshotAsync(ctx context.Context, params types.RequestOfferSnapshot) (*ReqRes, error) {
  250. req := types.ToRequestOfferSnapshot(params)
  251. res, err := cli.client.OfferSnapshot(ctx, req.GetOfferSnapshot(), grpc.WaitForReady(true))
  252. if err != nil {
  253. return nil, err
  254. }
  255. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_OfferSnapshot{OfferSnapshot: res}})
  256. }
  257. // NOTE: call is synchronous, use ctx to break early if needed
  258. func (cli *grpcClient) LoadSnapshotChunkAsync(
  259. ctx context.Context,
  260. params types.RequestLoadSnapshotChunk,
  261. ) (*ReqRes, error) {
  262. req := types.ToRequestLoadSnapshotChunk(params)
  263. res, err := cli.client.LoadSnapshotChunk(ctx, req.GetLoadSnapshotChunk(), grpc.WaitForReady(true))
  264. if err != nil {
  265. return nil, err
  266. }
  267. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_LoadSnapshotChunk{LoadSnapshotChunk: res}})
  268. }
  269. // NOTE: call is synchronous, use ctx to break early if needed
  270. func (cli *grpcClient) ApplySnapshotChunkAsync(
  271. ctx context.Context,
  272. params types.RequestApplySnapshotChunk,
  273. ) (*ReqRes, error) {
  274. req := types.ToRequestApplySnapshotChunk(params)
  275. res, err := cli.client.ApplySnapshotChunk(ctx, req.GetApplySnapshotChunk(), grpc.WaitForReady(true))
  276. if err != nil {
  277. return nil, err
  278. }
  279. return cli.finishAsyncCall(
  280. ctx,
  281. req,
  282. &types.Response{Value: &types.Response_ApplySnapshotChunk{ApplySnapshotChunk: res}},
  283. )
  284. }
  285. // finishAsyncCall creates a ReqRes for an async call, and immediately populates it
  286. // with the response. We don't complete it until it's been ordered via the channel.
  287. func (cli *grpcClient) finishAsyncCall(ctx context.Context, req *types.Request, res *types.Response) (*ReqRes, error) {
  288. reqres := NewReqRes(req)
  289. reqres.Response = res
  290. select {
  291. case cli.chReqRes <- reqres: // use channel for async responses, since they must be ordered
  292. return reqres, nil
  293. case <-ctx.Done():
  294. return nil, ctx.Err()
  295. }
  296. }
  297. // finishSyncCall waits for an async call to complete. It is necessary to call all
  298. // sync calls asynchronously as well, to maintain call and response ordering via
  299. // the channel, and this method will wait until the async call completes.
  300. func (cli *grpcClient) finishSyncCall(reqres *ReqRes) *types.Response {
  301. // It's possible that the callback is called twice, since the callback can
  302. // be called immediately on SetCallback() in addition to after it has been
  303. // set. This is because completing the ReqRes happens in a separate critical
  304. // section from the one where the callback is called: there is a race where
  305. // SetCallback() is called between completing the ReqRes and dispatching the
  306. // callback.
  307. //
  308. // We also buffer the channel with 1 response, since SetCallback() will be
  309. // called synchronously if the reqres is already completed, in which case
  310. // it will block on sending to the channel since it hasn't gotten around to
  311. // receiving from it yet.
  312. //
  313. // ReqRes should really handle callback dispatch internally, to guarantee
  314. // that it's only called once and avoid the above race conditions.
  315. var once sync.Once
  316. ch := make(chan *types.Response, 1)
  317. reqres.SetCallback(func(res *types.Response) {
  318. once.Do(func() {
  319. ch <- res
  320. })
  321. })
  322. return <-ch
  323. }
  324. //----------------------------------------
  325. func (cli *grpcClient) FlushSync(ctx context.Context) error {
  326. return nil
  327. }
  328. func (cli *grpcClient) EchoSync(ctx context.Context, msg string) (*types.ResponseEcho, error) {
  329. reqres, err := cli.EchoAsync(ctx, msg)
  330. if err != nil {
  331. return nil, err
  332. }
  333. return cli.finishSyncCall(reqres).GetEcho(), cli.Error()
  334. }
  335. func (cli *grpcClient) InfoSync(
  336. ctx context.Context,
  337. req types.RequestInfo,
  338. ) (*types.ResponseInfo, error) {
  339. reqres, err := cli.InfoAsync(ctx, req)
  340. if err != nil {
  341. return nil, err
  342. }
  343. return cli.finishSyncCall(reqres).GetInfo(), cli.Error()
  344. }
  345. func (cli *grpcClient) DeliverTxSync(
  346. ctx context.Context,
  347. params types.RequestDeliverTx,
  348. ) (*types.ResponseDeliverTx, error) {
  349. reqres, err := cli.DeliverTxAsync(ctx, params)
  350. if err != nil {
  351. return nil, err
  352. }
  353. return cli.finishSyncCall(reqres).GetDeliverTx(), cli.Error()
  354. }
  355. func (cli *grpcClient) CheckTxSync(
  356. ctx context.Context,
  357. params types.RequestCheckTx,
  358. ) (*types.ResponseCheckTx, error) {
  359. reqres, err := cli.CheckTxAsync(ctx, params)
  360. if err != nil {
  361. return nil, err
  362. }
  363. return cli.finishSyncCall(reqres).GetCheckTx(), cli.Error()
  364. }
  365. func (cli *grpcClient) QuerySync(
  366. ctx context.Context,
  367. req types.RequestQuery,
  368. ) (*types.ResponseQuery, error) {
  369. reqres, err := cli.QueryAsync(ctx, req)
  370. if err != nil {
  371. return nil, err
  372. }
  373. return cli.finishSyncCall(reqres).GetQuery(), cli.Error()
  374. }
  375. func (cli *grpcClient) CommitSync(ctx context.Context) (*types.ResponseCommit, error) {
  376. reqres, err := cli.CommitAsync(ctx)
  377. if err != nil {
  378. return nil, err
  379. }
  380. return cli.finishSyncCall(reqres).GetCommit(), cli.Error()
  381. }
  382. func (cli *grpcClient) InitChainSync(
  383. ctx context.Context,
  384. params types.RequestInitChain,
  385. ) (*types.ResponseInitChain, error) {
  386. reqres, err := cli.InitChainAsync(ctx, params)
  387. if err != nil {
  388. return nil, err
  389. }
  390. return cli.finishSyncCall(reqres).GetInitChain(), cli.Error()
  391. }
  392. func (cli *grpcClient) BeginBlockSync(
  393. ctx context.Context,
  394. params types.RequestBeginBlock,
  395. ) (*types.ResponseBeginBlock, error) {
  396. reqres, err := cli.BeginBlockAsync(ctx, params)
  397. if err != nil {
  398. return nil, err
  399. }
  400. return cli.finishSyncCall(reqres).GetBeginBlock(), cli.Error()
  401. }
  402. func (cli *grpcClient) EndBlockSync(
  403. ctx context.Context,
  404. params types.RequestEndBlock,
  405. ) (*types.ResponseEndBlock, error) {
  406. reqres, err := cli.EndBlockAsync(ctx, params)
  407. if err != nil {
  408. return nil, err
  409. }
  410. return cli.finishSyncCall(reqres).GetEndBlock(), cli.Error()
  411. }
  412. func (cli *grpcClient) ListSnapshotsSync(
  413. ctx context.Context,
  414. params types.RequestListSnapshots,
  415. ) (*types.ResponseListSnapshots, error) {
  416. reqres, err := cli.ListSnapshotsAsync(ctx, params)
  417. if err != nil {
  418. return nil, err
  419. }
  420. return cli.finishSyncCall(reqres).GetListSnapshots(), cli.Error()
  421. }
  422. func (cli *grpcClient) OfferSnapshotSync(
  423. ctx context.Context,
  424. params types.RequestOfferSnapshot,
  425. ) (*types.ResponseOfferSnapshot, error) {
  426. reqres, err := cli.OfferSnapshotAsync(ctx, params)
  427. if err != nil {
  428. return nil, err
  429. }
  430. return cli.finishSyncCall(reqres).GetOfferSnapshot(), cli.Error()
  431. }
  432. func (cli *grpcClient) LoadSnapshotChunkSync(
  433. ctx context.Context,
  434. params types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
  435. reqres, err := cli.LoadSnapshotChunkAsync(ctx, params)
  436. if err != nil {
  437. return nil, err
  438. }
  439. return cli.finishSyncCall(reqres).GetLoadSnapshotChunk(), cli.Error()
  440. }
  441. func (cli *grpcClient) ApplySnapshotChunkSync(
  442. ctx context.Context,
  443. params types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
  444. reqres, err := cli.ApplySnapshotChunkAsync(ctx, params)
  445. if err != nil {
  446. return nil, err
  447. }
  448. return cli.finishSyncCall(reqres).GetApplySnapshotChunk(), cli.Error()
  449. }