You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

507 lines
16 KiB

9 years ago
9 years ago
8 years ago
7 years ago
9 years ago
8 years ago
9 years ago
9 years ago
  1. package abciclient
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "sync"
  7. "time"
  8. "google.golang.org/grpc"
  9. "github.com/tendermint/tendermint/abci/types"
  10. tmsync "github.com/tendermint/tendermint/internal/libs/sync"
  11. "github.com/tendermint/tendermint/libs/log"
  12. tmnet "github.com/tendermint/tendermint/libs/net"
  13. "github.com/tendermint/tendermint/libs/service"
  14. )
  15. // A gRPC client.
  16. type grpcClient struct {
  17. service.BaseService
  18. mustConnect bool
  19. client types.ABCIApplicationClient
  20. conn *grpc.ClientConn
  21. chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool
  22. mtx tmsync.Mutex
  23. addr string
  24. err error
  25. resCb func(*types.Request, *types.Response) // listens to all callbacks
  26. }
  27. var _ Client = (*grpcClient)(nil)
  28. // NewGRPCClient creates a gRPC client, which will connect to addr upon the
  29. // start. Note Client#Start returns an error if connection is unsuccessful and
  30. // mustConnect is true.
  31. //
  32. // GRPC calls are synchronous, but some callbacks expect to be called
  33. // asynchronously (eg. the mempool expects to be able to lock to remove bad txs
  34. // from cache). To accommodate, we finish each call in its own go-routine,
  35. // which is expensive, but easy - if you want something better, use the socket
  36. // protocol! maybe one day, if people really want it, we use grpc streams, but
  37. // hopefully not :D
  38. func NewGRPCClient(logger log.Logger, addr string, mustConnect bool) Client {
  39. cli := &grpcClient{
  40. addr: addr,
  41. mustConnect: mustConnect,
  42. // Buffering the channel is needed to make calls appear asynchronous,
  43. // which is required when the caller makes multiple async calls before
  44. // processing callbacks (e.g. due to holding locks). 64 means that a
  45. // caller can make up to 64 async calls before a callback must be
  46. // processed (otherwise it deadlocks). It also means that we can make 64
  47. // gRPC calls while processing a slow callback at the channel head.
  48. chReqRes: make(chan *ReqRes, 64),
  49. }
  50. cli.BaseService = *service.NewBaseService(logger, "grpcClient", cli)
  51. return cli
  52. }
  53. func dialerFunc(ctx context.Context, addr string) (net.Conn, error) {
  54. return tmnet.Connect(addr)
  55. }
  56. func (cli *grpcClient) OnStart(ctx context.Context) error {
  57. // This processes asynchronous request/response messages and dispatches
  58. // them to callbacks.
  59. go func() {
  60. // Use a separate function to use defer for mutex unlocks (this handles panics)
  61. callCb := func(reqres *ReqRes) {
  62. cli.mtx.Lock()
  63. defer cli.mtx.Unlock()
  64. reqres.SetDone()
  65. reqres.Done()
  66. // Notify client listener if set
  67. if cli.resCb != nil {
  68. cli.resCb(reqres.Request, reqres.Response)
  69. }
  70. // Notify reqRes listener if set
  71. if cb := reqres.GetCallback(); cb != nil {
  72. cb(reqres.Response)
  73. }
  74. }
  75. for reqres := range cli.chReqRes {
  76. if reqres != nil {
  77. callCb(reqres)
  78. } else {
  79. cli.Logger.Error("Received nil reqres")
  80. }
  81. }
  82. }()
  83. RETRY_LOOP:
  84. for {
  85. conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithContextDialer(dialerFunc))
  86. if err != nil {
  87. if cli.mustConnect {
  88. return err
  89. }
  90. cli.Logger.Error(fmt.Sprintf("abci.grpcClient failed to connect to %v. Retrying...\n", cli.addr), "err", err)
  91. time.Sleep(time.Second * dialRetryIntervalSeconds)
  92. continue RETRY_LOOP
  93. }
  94. cli.Logger.Info("Dialed server. Waiting for echo.", "addr", cli.addr)
  95. client := types.NewABCIApplicationClient(conn)
  96. cli.conn = conn
  97. ENSURE_CONNECTED:
  98. for {
  99. _, err := client.Echo(context.Background(), &types.RequestEcho{Message: "hello"}, grpc.WaitForReady(true))
  100. if err == nil {
  101. break ENSURE_CONNECTED
  102. }
  103. cli.Logger.Error("Echo failed", "err", err)
  104. time.Sleep(time.Second * echoRetryIntervalSeconds)
  105. }
  106. cli.client = client
  107. return nil
  108. }
  109. }
  110. func (cli *grpcClient) OnStop() {
  111. if cli.conn != nil {
  112. cli.conn.Close()
  113. }
  114. close(cli.chReqRes)
  115. }
  116. func (cli *grpcClient) StopForError(err error) {
  117. if !cli.IsRunning() {
  118. return
  119. }
  120. cli.mtx.Lock()
  121. if cli.err == nil {
  122. cli.err = err
  123. }
  124. cli.mtx.Unlock()
  125. cli.Logger.Error(fmt.Sprintf("Stopping abci.grpcClient for error: %v", err.Error()))
  126. if err := cli.Stop(); err != nil {
  127. cli.Logger.Error("Error stopping abci.grpcClient", "err", err)
  128. }
  129. }
  130. func (cli *grpcClient) Error() error {
  131. cli.mtx.Lock()
  132. defer cli.mtx.Unlock()
  133. return cli.err
  134. }
  135. // Set listener for all responses
  136. // NOTE: callback may get internally generated flush responses.
  137. func (cli *grpcClient) SetResponseCallback(resCb Callback) {
  138. cli.mtx.Lock()
  139. cli.resCb = resCb
  140. cli.mtx.Unlock()
  141. }
  142. //----------------------------------------
  143. // NOTE: call is synchronous, use ctx to break early if needed
  144. func (cli *grpcClient) EchoAsync(ctx context.Context, msg string) (*ReqRes, error) {
  145. req := types.ToRequestEcho(msg)
  146. res, err := cli.client.Echo(ctx, req.GetEcho(), grpc.WaitForReady(true))
  147. if err != nil {
  148. return nil, err
  149. }
  150. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_Echo{Echo: res}})
  151. }
  152. // NOTE: call is synchronous, use ctx to break early if needed
  153. func (cli *grpcClient) FlushAsync(ctx context.Context) (*ReqRes, error) {
  154. req := types.ToRequestFlush()
  155. res, err := cli.client.Flush(ctx, req.GetFlush(), grpc.WaitForReady(true))
  156. if err != nil {
  157. return nil, err
  158. }
  159. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_Flush{Flush: res}})
  160. }
  161. // NOTE: call is synchronous, use ctx to break early if needed
  162. func (cli *grpcClient) InfoAsync(ctx context.Context, params types.RequestInfo) (*ReqRes, error) {
  163. req := types.ToRequestInfo(params)
  164. res, err := cli.client.Info(ctx, req.GetInfo(), grpc.WaitForReady(true))
  165. if err != nil {
  166. return nil, err
  167. }
  168. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_Info{Info: res}})
  169. }
  170. // NOTE: call is synchronous, use ctx to break early if needed
  171. func (cli *grpcClient) DeliverTxAsync(ctx context.Context, params types.RequestDeliverTx) (*ReqRes, error) {
  172. req := types.ToRequestDeliverTx(params)
  173. res, err := cli.client.DeliverTx(ctx, req.GetDeliverTx(), grpc.WaitForReady(true))
  174. if err != nil {
  175. return nil, err
  176. }
  177. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_DeliverTx{DeliverTx: res}})
  178. }
  179. // NOTE: call is synchronous, use ctx to break early if needed
  180. func (cli *grpcClient) CheckTxAsync(ctx context.Context, params types.RequestCheckTx) (*ReqRes, error) {
  181. req := types.ToRequestCheckTx(params)
  182. res, err := cli.client.CheckTx(ctx, req.GetCheckTx(), grpc.WaitForReady(true))
  183. if err != nil {
  184. return nil, err
  185. }
  186. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_CheckTx{CheckTx: res}})
  187. }
  188. // NOTE: call is synchronous, use ctx to break early if needed
  189. func (cli *grpcClient) QueryAsync(ctx context.Context, params types.RequestQuery) (*ReqRes, error) {
  190. req := types.ToRequestQuery(params)
  191. res, err := cli.client.Query(ctx, req.GetQuery(), grpc.WaitForReady(true))
  192. if err != nil {
  193. return nil, err
  194. }
  195. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_Query{Query: res}})
  196. }
  197. // NOTE: call is synchronous, use ctx to break early if needed
  198. func (cli *grpcClient) CommitAsync(ctx context.Context) (*ReqRes, error) {
  199. req := types.ToRequestCommit()
  200. res, err := cli.client.Commit(ctx, req.GetCommit(), grpc.WaitForReady(true))
  201. if err != nil {
  202. return nil, err
  203. }
  204. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_Commit{Commit: res}})
  205. }
  206. // NOTE: call is synchronous, use ctx to break early if needed
  207. func (cli *grpcClient) InitChainAsync(ctx context.Context, params types.RequestInitChain) (*ReqRes, error) {
  208. req := types.ToRequestInitChain(params)
  209. res, err := cli.client.InitChain(ctx, req.GetInitChain(), grpc.WaitForReady(true))
  210. if err != nil {
  211. return nil, err
  212. }
  213. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_InitChain{InitChain: res}})
  214. }
  215. // NOTE: call is synchronous, use ctx to break early if needed
  216. func (cli *grpcClient) BeginBlockAsync(ctx context.Context, params types.RequestBeginBlock) (*ReqRes, error) {
  217. req := types.ToRequestBeginBlock(params)
  218. res, err := cli.client.BeginBlock(ctx, req.GetBeginBlock(), grpc.WaitForReady(true))
  219. if err != nil {
  220. return nil, err
  221. }
  222. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_BeginBlock{BeginBlock: res}})
  223. }
  224. // NOTE: call is synchronous, use ctx to break early if needed
  225. func (cli *grpcClient) EndBlockAsync(ctx context.Context, params types.RequestEndBlock) (*ReqRes, error) {
  226. req := types.ToRequestEndBlock(params)
  227. res, err := cli.client.EndBlock(ctx, req.GetEndBlock(), grpc.WaitForReady(true))
  228. if err != nil {
  229. return nil, err
  230. }
  231. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_EndBlock{EndBlock: res}})
  232. }
  233. // NOTE: call is synchronous, use ctx to break early if needed
  234. func (cli *grpcClient) ListSnapshotsAsync(ctx context.Context, params types.RequestListSnapshots) (*ReqRes, error) {
  235. req := types.ToRequestListSnapshots(params)
  236. res, err := cli.client.ListSnapshots(ctx, req.GetListSnapshots(), grpc.WaitForReady(true))
  237. if err != nil {
  238. return nil, err
  239. }
  240. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_ListSnapshots{ListSnapshots: res}})
  241. }
  242. // NOTE: call is synchronous, use ctx to break early if needed
  243. func (cli *grpcClient) OfferSnapshotAsync(ctx context.Context, params types.RequestOfferSnapshot) (*ReqRes, error) {
  244. req := types.ToRequestOfferSnapshot(params)
  245. res, err := cli.client.OfferSnapshot(ctx, req.GetOfferSnapshot(), grpc.WaitForReady(true))
  246. if err != nil {
  247. return nil, err
  248. }
  249. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_OfferSnapshot{OfferSnapshot: res}})
  250. }
  251. // NOTE: call is synchronous, use ctx to break early if needed
  252. func (cli *grpcClient) LoadSnapshotChunkAsync(
  253. ctx context.Context,
  254. params types.RequestLoadSnapshotChunk,
  255. ) (*ReqRes, error) {
  256. req := types.ToRequestLoadSnapshotChunk(params)
  257. res, err := cli.client.LoadSnapshotChunk(ctx, req.GetLoadSnapshotChunk(), grpc.WaitForReady(true))
  258. if err != nil {
  259. return nil, err
  260. }
  261. return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_LoadSnapshotChunk{LoadSnapshotChunk: res}})
  262. }
  263. // NOTE: call is synchronous, use ctx to break early if needed
  264. func (cli *grpcClient) ApplySnapshotChunkAsync(
  265. ctx context.Context,
  266. params types.RequestApplySnapshotChunk,
  267. ) (*ReqRes, error) {
  268. req := types.ToRequestApplySnapshotChunk(params)
  269. res, err := cli.client.ApplySnapshotChunk(ctx, req.GetApplySnapshotChunk(), grpc.WaitForReady(true))
  270. if err != nil {
  271. return nil, err
  272. }
  273. return cli.finishAsyncCall(
  274. ctx,
  275. req,
  276. &types.Response{Value: &types.Response_ApplySnapshotChunk{ApplySnapshotChunk: res}},
  277. )
  278. }
  279. // finishAsyncCall creates a ReqRes for an async call, and immediately populates it
  280. // with the response. We don't complete it until it's been ordered via the channel.
  281. func (cli *grpcClient) finishAsyncCall(ctx context.Context, req *types.Request, res *types.Response) (*ReqRes, error) {
  282. reqres := NewReqRes(req)
  283. reqres.Response = res
  284. select {
  285. case cli.chReqRes <- reqres: // use channel for async responses, since they must be ordered
  286. return reqres, nil
  287. case <-ctx.Done():
  288. return nil, ctx.Err()
  289. }
  290. }
  291. // finishSyncCall waits for an async call to complete. It is necessary to call all
  292. // sync calls asynchronously as well, to maintain call and response ordering via
  293. // the channel, and this method will wait until the async call completes.
  294. func (cli *grpcClient) finishSyncCall(reqres *ReqRes) *types.Response {
  295. // It's possible that the callback is called twice, since the callback can
  296. // be called immediately on SetCallback() in addition to after it has been
  297. // set. This is because completing the ReqRes happens in a separate critical
  298. // section from the one where the callback is called: there is a race where
  299. // SetCallback() is called between completing the ReqRes and dispatching the
  300. // callback.
  301. //
  302. // We also buffer the channel with 1 response, since SetCallback() will be
  303. // called synchronously if the reqres is already completed, in which case
  304. // it will block on sending to the channel since it hasn't gotten around to
  305. // receiving from it yet.
  306. //
  307. // ReqRes should really handle callback dispatch internally, to guarantee
  308. // that it's only called once and avoid the above race conditions.
  309. var once sync.Once
  310. ch := make(chan *types.Response, 1)
  311. reqres.SetCallback(func(res *types.Response) {
  312. once.Do(func() {
  313. ch <- res
  314. })
  315. })
  316. return <-ch
  317. }
  318. //----------------------------------------
  319. func (cli *grpcClient) FlushSync(ctx context.Context) error {
  320. return nil
  321. }
  322. func (cli *grpcClient) EchoSync(ctx context.Context, msg string) (*types.ResponseEcho, error) {
  323. reqres, err := cli.EchoAsync(ctx, msg)
  324. if err != nil {
  325. return nil, err
  326. }
  327. return cli.finishSyncCall(reqres).GetEcho(), cli.Error()
  328. }
  329. func (cli *grpcClient) InfoSync(
  330. ctx context.Context,
  331. req types.RequestInfo,
  332. ) (*types.ResponseInfo, error) {
  333. reqres, err := cli.InfoAsync(ctx, req)
  334. if err != nil {
  335. return nil, err
  336. }
  337. return cli.finishSyncCall(reqres).GetInfo(), cli.Error()
  338. }
  339. func (cli *grpcClient) DeliverTxSync(
  340. ctx context.Context,
  341. params types.RequestDeliverTx,
  342. ) (*types.ResponseDeliverTx, error) {
  343. reqres, err := cli.DeliverTxAsync(ctx, params)
  344. if err != nil {
  345. return nil, err
  346. }
  347. return cli.finishSyncCall(reqres).GetDeliverTx(), cli.Error()
  348. }
  349. func (cli *grpcClient) CheckTxSync(
  350. ctx context.Context,
  351. params types.RequestCheckTx,
  352. ) (*types.ResponseCheckTx, error) {
  353. reqres, err := cli.CheckTxAsync(ctx, params)
  354. if err != nil {
  355. return nil, err
  356. }
  357. return cli.finishSyncCall(reqres).GetCheckTx(), cli.Error()
  358. }
  359. func (cli *grpcClient) QuerySync(
  360. ctx context.Context,
  361. req types.RequestQuery,
  362. ) (*types.ResponseQuery, error) {
  363. reqres, err := cli.QueryAsync(ctx, req)
  364. if err != nil {
  365. return nil, err
  366. }
  367. return cli.finishSyncCall(reqres).GetQuery(), cli.Error()
  368. }
  369. func (cli *grpcClient) CommitSync(ctx context.Context) (*types.ResponseCommit, error) {
  370. reqres, err := cli.CommitAsync(ctx)
  371. if err != nil {
  372. return nil, err
  373. }
  374. return cli.finishSyncCall(reqres).GetCommit(), cli.Error()
  375. }
  376. func (cli *grpcClient) InitChainSync(
  377. ctx context.Context,
  378. params types.RequestInitChain,
  379. ) (*types.ResponseInitChain, error) {
  380. reqres, err := cli.InitChainAsync(ctx, params)
  381. if err != nil {
  382. return nil, err
  383. }
  384. return cli.finishSyncCall(reqres).GetInitChain(), cli.Error()
  385. }
  386. func (cli *grpcClient) BeginBlockSync(
  387. ctx context.Context,
  388. params types.RequestBeginBlock,
  389. ) (*types.ResponseBeginBlock, error) {
  390. reqres, err := cli.BeginBlockAsync(ctx, params)
  391. if err != nil {
  392. return nil, err
  393. }
  394. return cli.finishSyncCall(reqres).GetBeginBlock(), cli.Error()
  395. }
  396. func (cli *grpcClient) EndBlockSync(
  397. ctx context.Context,
  398. params types.RequestEndBlock,
  399. ) (*types.ResponseEndBlock, error) {
  400. reqres, err := cli.EndBlockAsync(ctx, params)
  401. if err != nil {
  402. return nil, err
  403. }
  404. return cli.finishSyncCall(reqres).GetEndBlock(), cli.Error()
  405. }
  406. func (cli *grpcClient) ListSnapshotsSync(
  407. ctx context.Context,
  408. params types.RequestListSnapshots,
  409. ) (*types.ResponseListSnapshots, error) {
  410. reqres, err := cli.ListSnapshotsAsync(ctx, params)
  411. if err != nil {
  412. return nil, err
  413. }
  414. return cli.finishSyncCall(reqres).GetListSnapshots(), cli.Error()
  415. }
  416. func (cli *grpcClient) OfferSnapshotSync(
  417. ctx context.Context,
  418. params types.RequestOfferSnapshot,
  419. ) (*types.ResponseOfferSnapshot, error) {
  420. reqres, err := cli.OfferSnapshotAsync(ctx, params)
  421. if err != nil {
  422. return nil, err
  423. }
  424. return cli.finishSyncCall(reqres).GetOfferSnapshot(), cli.Error()
  425. }
  426. func (cli *grpcClient) LoadSnapshotChunkSync(
  427. ctx context.Context,
  428. params types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
  429. reqres, err := cli.LoadSnapshotChunkAsync(ctx, params)
  430. if err != nil {
  431. return nil, err
  432. }
  433. return cli.finishSyncCall(reqres).GetLoadSnapshotChunk(), cli.Error()
  434. }
  435. func (cli *grpcClient) ApplySnapshotChunkSync(
  436. ctx context.Context,
  437. params types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
  438. reqres, err := cli.ApplySnapshotChunkAsync(ctx, params)
  439. if err != nil {
  440. return nil, err
  441. }
  442. return cli.finishSyncCall(reqres).GetApplySnapshotChunk(), cli.Error()
  443. }