You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

517 lines
16 KiB

9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
  1. package abciclient
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "sync"
  7. "time"
  8. "google.golang.org/grpc"
  9. "github.com/tendermint/tendermint/abci/types"
  10. tmsync "github.com/tendermint/tendermint/internal/libs/sync"
  11. "github.com/tendermint/tendermint/libs/log"
  12. tmnet "github.com/tendermint/tendermint/libs/net"
  13. "github.com/tendermint/tendermint/libs/service"
  14. )
  15. // A gRPC client.
  16. type grpcClient struct {
  17. service.BaseService
  18. logger log.Logger
  19. mustConnect bool
  20. client types.ABCIApplicationClient
  21. conn *grpc.ClientConn
  22. chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool
  23. mtx tmsync.Mutex
  24. addr string
  25. err error
  26. resCb func(*types.Request, *types.Response) // listens to all callbacks
  27. }
  28. var _ Client = (*grpcClient)(nil)
  29. // NewGRPCClient creates a gRPC client, which will connect to addr upon the
  30. // start. Note Client#Start returns an error if connection is unsuccessful and
  31. // mustConnect is true.
  32. //
  33. // GRPC calls are synchronous, but some callbacks expect to be called
  34. // asynchronously (eg. the mempool expects to be able to lock to remove bad txs
  35. // from cache). To accommodate, we finish each call in its own go-routine,
  36. // which is expensive, but easy - if you want something better, use the socket
  37. // protocol! maybe one day, if people really want it, we use grpc streams, but
  38. // hopefully not :D
  39. func NewGRPCClient(logger log.Logger, addr string, mustConnect bool) Client {
  40. cli := &grpcClient{
  41. logger: logger,
  42. addr: addr,
  43. mustConnect: mustConnect,
  44. // Buffering the channel is needed to make calls appear asynchronous,
  45. // which is required when the caller makes multiple async calls before
  46. // processing callbacks (e.g. due to holding locks). 64 means that a
  47. // caller can make up to 64 async calls before a callback must be
  48. // processed (otherwise it deadlocks). It also means that we can make 64
  49. // gRPC calls while processing a slow callback at the channel head.
  50. chReqRes: make(chan *ReqRes, 64),
  51. }
  52. cli.BaseService = *service.NewBaseService(logger, "grpcClient", cli)
  53. return cli
  54. }
  55. func dialerFunc(ctx context.Context, addr string) (net.Conn, error) {
  56. return tmnet.Connect(addr)
  57. }
  58. func (cli *grpcClient) OnStart(ctx context.Context) error {
  59. // This processes asynchronous request/response messages and dispatches
  60. // them to callbacks.
  61. go func() {
  62. // Use a separate function to use defer for mutex unlocks (this handles panics)
  63. callCb := func(reqres *ReqRes) {
  64. cli.mtx.Lock()
  65. defer cli.mtx.Unlock()
  66. reqres.SetDone()
  67. reqres.Done()
  68. // Notify client listener if set
  69. if cli.resCb != nil {
  70. cli.resCb(reqres.Request, reqres.Response)
  71. }
  72. // Notify reqRes listener if set
  73. if cb := reqres.GetCallback(); cb != nil {
  74. cb(reqres.Response)
  75. }
  76. }
  77. for {
  78. select {
  79. case reqres := <-cli.chReqRes:
  80. if reqres != nil {
  81. callCb(reqres)
  82. } else {
  83. cli.logger.Error("Received nil reqres")
  84. }
  85. case <-ctx.Done():
  86. return
  87. }
  88. }
  89. }()
  90. RETRY_LOOP:
  91. for {
  92. conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithContextDialer(dialerFunc))
  93. if err != nil {
  94. if cli.mustConnect {
  95. return err
  96. }
  97. cli.logger.Error(fmt.Sprintf("abci.grpcClient failed to connect to %v. Retrying...\n", cli.addr), "err", err)
  98. time.Sleep(time.Second * dialRetryIntervalSeconds)
  99. continue RETRY_LOOP
  100. }
  101. cli.logger.Info("Dialed server. Waiting for echo.", "addr", cli.addr)
  102. client := types.NewABCIApplicationClient(conn)
  103. cli.conn = conn
  104. ENSURE_CONNECTED:
  105. for {
  106. _, err := client.Echo(context.Background(), &types.RequestEcho{Message: "hello"}, grpc.WaitForReady(true))
  107. if err == nil {
  108. break ENSURE_CONNECTED
  109. }
  110. cli.logger.Error("Echo failed", "err", err)
  111. time.Sleep(time.Second * echoRetryIntervalSeconds)
  112. }
  113. cli.client = client
  114. return nil
  115. }
  116. }
  117. func (cli *grpcClient) OnStop() {
  118. if cli.conn != nil {
  119. cli.conn.Close()
  120. }
  121. close(cli.chReqRes)
  122. }
  123. func (cli *grpcClient) StopForError(err error) {
  124. if !cli.IsRunning() {
  125. return
  126. }
  127. cli.mtx.Lock()
  128. if cli.err == nil {
  129. cli.err = err
  130. }
  131. cli.mtx.Unlock()
  132. cli.logger.Error(fmt.Sprintf("Stopping abci.grpcClient for error: %v", err.Error()))
  133. if err := cli.Stop(); err != nil {
  134. cli.logger.Error("Error stopping abci.grpcClient", "err", err)
  135. }
  136. }
  137. func (cli *grpcClient) Error() error {
  138. cli.mtx.Lock()
  139. defer cli.mtx.Unlock()
  140. return cli.err
  141. }
  142. // Set listener for all responses
  143. // NOTE: callback may get internally generated flush responses.
  144. func (cli *grpcClient) SetResponseCallback(resCb Callback) {
  145. cli.mtx.Lock()
  146. cli.resCb = resCb
  147. cli.mtx.Unlock()
  148. }
  149. //----------------------------------------
  150. // NOTE: call is synchronous, use ctx to break early if needed
  151. func (cli *grpcClient) EchoAsync(ctx context.Context, msg string) (*ReqRes, error) {
  152. req := types.ToRequestEcho(msg)
  153. res, err := cli.client.Echo(ctx, req.GetEcho(), grpc.WaitForReady(true))
  154. if err != nil {
  155. return nil, err
  156. }
  157. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_Echo{Echo: res}})
  158. }
  159. // NOTE: call is synchronous, use ctx to break early if needed
  160. func (cli *grpcClient) FlushAsync(ctx context.Context) (*ReqRes, error) {
  161. req := types.ToRequestFlush()
  162. res, err := cli.client.Flush(ctx, req.GetFlush(), grpc.WaitForReady(true))
  163. if err != nil {
  164. return nil, err
  165. }
  166. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_Flush{Flush: res}})
  167. }
  168. // NOTE: call is synchronous, use ctx to break early if needed
  169. func (cli *grpcClient) InfoAsync(ctx context.Context, params types.RequestInfo) (*ReqRes, error) {
  170. req := types.ToRequestInfo(params)
  171. res, err := cli.client.Info(ctx, req.GetInfo(), grpc.WaitForReady(true))
  172. if err != nil {
  173. return nil, err
  174. }
  175. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_Info{Info: res}})
  176. }
  177. // NOTE: call is synchronous, use ctx to break early if needed
  178. func (cli *grpcClient) DeliverTxAsync(ctx context.Context, params types.RequestDeliverTx) (*ReqRes, error) {
  179. req := types.ToRequestDeliverTx(params)
  180. res, err := cli.client.DeliverTx(ctx, req.GetDeliverTx(), grpc.WaitForReady(true))
  181. if err != nil {
  182. return nil, err
  183. }
  184. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_DeliverTx{DeliverTx: res}})
  185. }
  186. // NOTE: call is synchronous, use ctx to break early if needed
  187. func (cli *grpcClient) CheckTxAsync(ctx context.Context, params types.RequestCheckTx) (*ReqRes, error) {
  188. req := types.ToRequestCheckTx(params)
  189. res, err := cli.client.CheckTx(ctx, req.GetCheckTx(), grpc.WaitForReady(true))
  190. if err != nil {
  191. return nil, err
  192. }
  193. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_CheckTx{CheckTx: res}})
  194. }
  195. // NOTE: call is synchronous, use ctx to break early if needed
  196. func (cli *grpcClient) QueryAsync(ctx context.Context, params types.RequestQuery) (*ReqRes, error) {
  197. req := types.ToRequestQuery(params)
  198. res, err := cli.client.Query(ctx, req.GetQuery(), grpc.WaitForReady(true))
  199. if err != nil {
  200. return nil, err
  201. }
  202. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_Query{Query: res}})
  203. }
  204. // NOTE: call is synchronous, use ctx to break early if needed
  205. func (cli *grpcClient) CommitAsync(ctx context.Context) (*ReqRes, error) {
  206. req := types.ToRequestCommit()
  207. res, err := cli.client.Commit(ctx, req.GetCommit(), grpc.WaitForReady(true))
  208. if err != nil {
  209. return nil, err
  210. }
  211. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_Commit{Commit: res}})
  212. }
  213. // NOTE: call is synchronous, use ctx to break early if needed
  214. func (cli *grpcClient) InitChainAsync(ctx context.Context, params types.RequestInitChain) (*ReqRes, error) {
  215. req := types.ToRequestInitChain(params)
  216. res, err := cli.client.InitChain(ctx, req.GetInitChain(), grpc.WaitForReady(true))
  217. if err != nil {
  218. return nil, err
  219. }
  220. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_InitChain{InitChain: res}})
  221. }
  222. // NOTE: call is synchronous, use ctx to break early if needed
  223. func (cli *grpcClient) BeginBlockAsync(ctx context.Context, params types.RequestBeginBlock) (*ReqRes, error) {
  224. req := types.ToRequestBeginBlock(params)
  225. res, err := cli.client.BeginBlock(ctx, req.GetBeginBlock(), grpc.WaitForReady(true))
  226. if err != nil {
  227. return nil, err
  228. }
  229. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_BeginBlock{BeginBlock: res}})
  230. }
  231. // NOTE: call is synchronous, use ctx to break early if needed
  232. func (cli *grpcClient) EndBlockAsync(ctx context.Context, params types.RequestEndBlock) (*ReqRes, error) {
  233. req := types.ToRequestEndBlock(params)
  234. res, err := cli.client.EndBlock(ctx, req.GetEndBlock(), grpc.WaitForReady(true))
  235. if err != nil {
  236. return nil, err
  237. }
  238. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_EndBlock{EndBlock: res}})
  239. }
  240. // NOTE: call is synchronous, use ctx to break early if needed
  241. func (cli *grpcClient) ListSnapshotsAsync(ctx context.Context, params types.RequestListSnapshots) (*ReqRes, error) {
  242. req := types.ToRequestListSnapshots(params)
  243. res, err := cli.client.ListSnapshots(ctx, req.GetListSnapshots(), grpc.WaitForReady(true))
  244. if err != nil {
  245. return nil, err
  246. }
  247. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_ListSnapshots{ListSnapshots: res}})
  248. }
  249. // NOTE: call is synchronous, use ctx to break early if needed
  250. func (cli *grpcClient) OfferSnapshotAsync(ctx context.Context, params types.RequestOfferSnapshot) (*ReqRes, error) {
  251. req := types.ToRequestOfferSnapshot(params)
  252. res, err := cli.client.OfferSnapshot(ctx, req.GetOfferSnapshot(), grpc.WaitForReady(true))
  253. if err != nil {
  254. return nil, err
  255. }
  256. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_OfferSnapshot{OfferSnapshot: res}})
  257. }
  258. // NOTE: call is synchronous, use ctx to break early if needed
  259. func (cli *grpcClient) LoadSnapshotChunkAsync(
  260. ctx context.Context,
  261. params types.RequestLoadSnapshotChunk,
  262. ) (*ReqRes, error) {
  263. req := types.ToRequestLoadSnapshotChunk(params)
  264. res, err := cli.client.LoadSnapshotChunk(ctx, req.GetLoadSnapshotChunk(), grpc.WaitForReady(true))
  265. if err != nil {
  266. return nil, err
  267. }
  268. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_LoadSnapshotChunk{LoadSnapshotChunk: res}})
  269. }
  270. // NOTE: call is synchronous, use ctx to break early if needed
  271. func (cli *grpcClient) ApplySnapshotChunkAsync(
  272. ctx context.Context,
  273. params types.RequestApplySnapshotChunk,
  274. ) (*ReqRes, error) {
  275. req := types.ToRequestApplySnapshotChunk(params)
  276. res, err := cli.client.ApplySnapshotChunk(ctx, req.GetApplySnapshotChunk(), grpc.WaitForReady(true))
  277. if err != nil {
  278. return nil, err
  279. }
  280. return cli.finishAsyncCall(
  281. ctx,
  282. req,
  283. &types.Response{Value: &types.Response_ApplySnapshotChunk{ApplySnapshotChunk: res}},
  284. )
  285. }
  286. // finishAsyncCall creates a ReqRes for an async call, and immediately populates it
  287. // with the response. We don't complete it until it's been ordered via the channel.
  288. func (cli *grpcClient) finishAsyncCall(ctx context.Context, req *types.Request, res *types.Response) (*ReqRes, error) {
  289. reqres := NewReqRes(req)
  290. reqres.Response = res
  291. select {
  292. case cli.chReqRes <- reqres: // use channel for async responses, since they must be ordered
  293. return reqres, nil
  294. case <-ctx.Done():
  295. return nil, ctx.Err()
  296. }
  297. }
  298. // finishSyncCall waits for an async call to complete. It is necessary to call all
  299. // sync calls asynchronously as well, to maintain call and response ordering via
  300. // the channel, and this method will wait until the async call completes.
  301. func (cli *grpcClient) finishSyncCall(reqres *ReqRes) *types.Response {
  302. // It's possible that the callback is called twice, since the callback can
  303. // be called immediately on SetCallback() in addition to after it has been
  304. // set. This is because completing the ReqRes happens in a separate critical
  305. // section from the one where the callback is called: there is a race where
  306. // SetCallback() is called between completing the ReqRes and dispatching the
  307. // callback.
  308. //
  309. // We also buffer the channel with 1 response, since SetCallback() will be
  310. // called synchronously if the reqres is already completed, in which case
  311. // it will block on sending to the channel since it hasn't gotten around to
  312. // receiving from it yet.
  313. //
  314. // ReqRes should really handle callback dispatch internally, to guarantee
  315. // that it's only called once and avoid the above race conditions.
  316. var once sync.Once
  317. ch := make(chan *types.Response, 1)
  318. reqres.SetCallback(func(res *types.Response) {
  319. once.Do(func() {
  320. ch <- res
  321. })
  322. })
  323. return <-ch
  324. }
  325. //----------------------------------------
  326. func (cli *grpcClient) FlushSync(ctx context.Context) error {
  327. return nil
  328. }
  329. func (cli *grpcClient) EchoSync(ctx context.Context, msg string) (*types.ResponseEcho, error) {
  330. reqres, err := cli.EchoAsync(ctx, msg)
  331. if err != nil {
  332. return nil, err
  333. }
  334. return cli.finishSyncCall(reqres).GetEcho(), cli.Error()
  335. }
  336. func (cli *grpcClient) InfoSync(
  337. ctx context.Context,
  338. req types.RequestInfo,
  339. ) (*types.ResponseInfo, error) {
  340. reqres, err := cli.InfoAsync(ctx, req)
  341. if err != nil {
  342. return nil, err
  343. }
  344. return cli.finishSyncCall(reqres).GetInfo(), cli.Error()
  345. }
  346. func (cli *grpcClient) DeliverTxSync(
  347. ctx context.Context,
  348. params types.RequestDeliverTx,
  349. ) (*types.ResponseDeliverTx, error) {
  350. reqres, err := cli.DeliverTxAsync(ctx, params)
  351. if err != nil {
  352. return nil, err
  353. }
  354. return cli.finishSyncCall(reqres).GetDeliverTx(), cli.Error()
  355. }
  356. func (cli *grpcClient) CheckTxSync(
  357. ctx context.Context,
  358. params types.RequestCheckTx,
  359. ) (*types.ResponseCheckTx, error) {
  360. reqres, err := cli.CheckTxAsync(ctx, params)
  361. if err != nil {
  362. return nil, err
  363. }
  364. return cli.finishSyncCall(reqres).GetCheckTx(), cli.Error()
  365. }
  366. func (cli *grpcClient) QuerySync(
  367. ctx context.Context,
  368. req types.RequestQuery,
  369. ) (*types.ResponseQuery, error) {
  370. reqres, err := cli.QueryAsync(ctx, req)
  371. if err != nil {
  372. return nil, err
  373. }
  374. return cli.finishSyncCall(reqres).GetQuery(), cli.Error()
  375. }
  376. func (cli *grpcClient) CommitSync(ctx context.Context) (*types.ResponseCommit, error) {
  377. reqres, err := cli.CommitAsync(ctx)
  378. if err != nil {
  379. return nil, err
  380. }
  381. return cli.finishSyncCall(reqres).GetCommit(), cli.Error()
  382. }
  383. func (cli *grpcClient) InitChainSync(
  384. ctx context.Context,
  385. params types.RequestInitChain,
  386. ) (*types.ResponseInitChain, error) {
  387. reqres, err := cli.InitChainAsync(ctx, params)
  388. if err != nil {
  389. return nil, err
  390. }
  391. return cli.finishSyncCall(reqres).GetInitChain(), cli.Error()
  392. }
  393. func (cli *grpcClient) BeginBlockSync(
  394. ctx context.Context,
  395. params types.RequestBeginBlock,
  396. ) (*types.ResponseBeginBlock, error) {
  397. reqres, err := cli.BeginBlockAsync(ctx, params)
  398. if err != nil {
  399. return nil, err
  400. }
  401. return cli.finishSyncCall(reqres).GetBeginBlock(), cli.Error()
  402. }
  403. func (cli *grpcClient) EndBlockSync(
  404. ctx context.Context,
  405. params types.RequestEndBlock,
  406. ) (*types.ResponseEndBlock, error) {
  407. reqres, err := cli.EndBlockAsync(ctx, params)
  408. if err != nil {
  409. return nil, err
  410. }
  411. return cli.finishSyncCall(reqres).GetEndBlock(), cli.Error()
  412. }
  413. func (cli *grpcClient) ListSnapshotsSync(
  414. ctx context.Context,
  415. params types.RequestListSnapshots,
  416. ) (*types.ResponseListSnapshots, error) {
  417. reqres, err := cli.ListSnapshotsAsync(ctx, params)
  418. if err != nil {
  419. return nil, err
  420. }
  421. return cli.finishSyncCall(reqres).GetListSnapshots(), cli.Error()
  422. }
  423. func (cli *grpcClient) OfferSnapshotSync(
  424. ctx context.Context,
  425. params types.RequestOfferSnapshot,
  426. ) (*types.ResponseOfferSnapshot, error) {
  427. reqres, err := cli.OfferSnapshotAsync(ctx, params)
  428. if err != nil {
  429. return nil, err
  430. }
  431. return cli.finishSyncCall(reqres).GetOfferSnapshot(), cli.Error()
  432. }
  433. func (cli *grpcClient) LoadSnapshotChunkSync(
  434. ctx context.Context,
  435. params types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
  436. reqres, err := cli.LoadSnapshotChunkAsync(ctx, params)
  437. if err != nil {
  438. return nil, err
  439. }
  440. return cli.finishSyncCall(reqres).GetLoadSnapshotChunk(), cli.Error()
  441. }
  442. func (cli *grpcClient) ApplySnapshotChunkSync(
  443. ctx context.Context,
  444. params types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
  445. reqres, err := cli.ApplySnapshotChunkAsync(ctx, params)
  446. if err != nil {
  447. return nil, err
  448. }
  449. return cli.finishSyncCall(reqres).GetApplySnapshotChunk(), cli.Error()
  450. }