You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

420 lines
14 KiB

8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
7 years ago
9 years ago
8 years ago
9 years ago
9 years ago
  1. package abcicli
  2. import (
  3. "fmt"
  4. "net"
  5. "sync"
  6. "time"
  7. "golang.org/x/net/context"
  8. "google.golang.org/grpc"
  9. "github.com/tendermint/tendermint/abci/types"
  10. tmnet "github.com/tendermint/tendermint/libs/net"
  11. "github.com/tendermint/tendermint/libs/service"
  12. tmsync "github.com/tendermint/tendermint/libs/sync"
  13. )
  // Compile-time assertion that *grpcClient satisfies the Client interface.
  var _ Client = (*grpcClient)(nil)
  // A stripped copy of the remoteClient that makes
  // synchronous calls using grpc
  //
  // The RPC itself is performed synchronously by each *Async method; only the
  // delivery of the response to callbacks is deferred, via chReqRes, so that
  // callbacks run in call order.
  type grpcClient struct {
  	service.BaseService
  	// mustConnect makes OnStart return the dial error instead of retrying.
  	mustConnect bool

  	// client is the generated ABCI stub; it is assigned at the end of OnStart,
  	// once the Echo handshake has succeeded.
  	client types.ABCIApplicationClient
  	// conn is the underlying gRPC connection, closed in OnStop.
  	conn *grpc.ClientConn
  	chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool

  	// mtx is held while err and resCb are read or written, and while
  	// callbacks are being invoked by the dispatcher goroutine.
  	mtx tmsync.Mutex
  	// addr is the address passed to grpc.Dial.
  	addr string
  	// err is the first error passed to StopForError; it is returned by Error.
  	err error
  	resCb func(*types.Request, *types.Response) // listens to all callbacks
  }
  28. func NewGRPCClient(addr string, mustConnect bool) Client {
  29. cli := &grpcClient{
  30. addr: addr,
  31. mustConnect: mustConnect,
  32. // Buffering the channel is needed to make calls appear asynchronous,
  33. // which is required when the caller makes multiple async calls before
  34. // processing callbacks (e.g. due to holding locks). 64 means that a
  35. // caller can make up to 64 async calls before a callback must be
  36. // processed (otherwise it deadlocks). It also means that we can make 64
  37. // gRPC calls while processing a slow callback at the channel head.
  38. chReqRes: make(chan *ReqRes, 64),
  39. }
  40. cli.BaseService = *service.NewBaseService(nil, "grpcClient", cli)
  41. return cli
  42. }
  43. func dialerFunc(ctx context.Context, addr string) (net.Conn, error) {
  44. return tmnet.Connect(addr)
  45. }
  46. func (cli *grpcClient) OnStart() error {
  47. if err := cli.BaseService.OnStart(); err != nil {
  48. return err
  49. }
  50. // This processes asynchronous request/response messages and dispatches
  51. // them to callbacks.
  52. go func() {
  53. // Use a separate function to use defer for mutex unlocks (this handles panics)
  54. callCb := func(reqres *ReqRes) {
  55. cli.mtx.Lock()
  56. defer cli.mtx.Unlock()
  57. reqres.SetDone()
  58. reqres.Done()
  59. // Notify client listener if set
  60. if cli.resCb != nil {
  61. cli.resCb(reqres.Request, reqres.Response)
  62. }
  63. // Notify reqRes listener if set
  64. if cb := reqres.GetCallback(); cb != nil {
  65. cb(reqres.Response)
  66. }
  67. }
  68. for reqres := range cli.chReqRes {
  69. if reqres != nil {
  70. callCb(reqres)
  71. } else {
  72. cli.Logger.Error("Received nil reqres")
  73. }
  74. }
  75. }()
  76. RETRY_LOOP:
  77. for {
  78. //nolint:staticcheck // SA1019 Existing use of deprecated but supported dial option.
  79. conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithContextDialer(dialerFunc))
  80. if err != nil {
  81. if cli.mustConnect {
  82. return err
  83. }
  84. cli.Logger.Error(fmt.Sprintf("abci.grpcClient failed to connect to %v. Retrying...\n", cli.addr), "err", err)
  85. time.Sleep(time.Second * dialRetryIntervalSeconds)
  86. continue RETRY_LOOP
  87. }
  88. cli.Logger.Info("Dialed server. Waiting for echo.", "addr", cli.addr)
  89. client := types.NewABCIApplicationClient(conn)
  90. cli.conn = conn
  91. ENSURE_CONNECTED:
  92. for {
  93. _, err := client.Echo(context.Background(), &types.RequestEcho{Message: "hello"}, grpc.WaitForReady(true))
  94. if err == nil {
  95. break ENSURE_CONNECTED
  96. }
  97. cli.Logger.Error("Echo failed", "err", err)
  98. time.Sleep(time.Second * echoRetryIntervalSeconds)
  99. }
  100. cli.client = client
  101. return nil
  102. }
  103. }
  104. func (cli *grpcClient) OnStop() {
  105. cli.BaseService.OnStop()
  106. if cli.conn != nil {
  107. cli.conn.Close()
  108. }
  109. close(cli.chReqRes)
  110. }
  111. func (cli *grpcClient) StopForError(err error) {
  112. if !cli.IsRunning() {
  113. return
  114. }
  115. cli.mtx.Lock()
  116. if cli.err == nil {
  117. cli.err = err
  118. }
  119. cli.mtx.Unlock()
  120. cli.Logger.Error(fmt.Sprintf("Stopping abci.grpcClient for error: %v", err.Error()))
  121. if err := cli.Stop(); err != nil {
  122. cli.Logger.Error("Error stopping abci.grpcClient", "err", err)
  123. }
  124. }
  125. func (cli *grpcClient) Error() error {
  126. cli.mtx.Lock()
  127. defer cli.mtx.Unlock()
  128. return cli.err
  129. }
  130. // Set listener for all responses
  131. // NOTE: callback may get internally generated flush responses.
  132. func (cli *grpcClient) SetResponseCallback(resCb Callback) {
  133. cli.mtx.Lock()
  134. cli.resCb = resCb
  135. cli.mtx.Unlock()
  136. }
  137. //----------------------------------------
  // gRPC calls are synchronous, but some callbacks expect to be called asynchronously
  // (e.g. the mempool expects to be able to lock to remove bad txs from cache).
  // To accommodate, each call is made synchronously and its response is then queued on
  // chReqRes, from which a single goroutine dispatches callbacks in order. This is
  // expensive, but easy - if you want something better, use the socket protocol!
  // Maybe one day, if people really want it, we use grpc streams,
  // but hopefully not :D
  144. func (cli *grpcClient) EchoAsync(msg string) *ReqRes {
  145. req := types.ToRequestEcho(msg)
  146. res, err := cli.client.Echo(context.Background(), req.GetEcho(), grpc.WaitForReady(true))
  147. if err != nil {
  148. cli.StopForError(err)
  149. }
  150. return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_Echo{Echo: res}})
  151. }
  152. func (cli *grpcClient) FlushAsync() *ReqRes {
  153. req := types.ToRequestFlush()
  154. res, err := cli.client.Flush(context.Background(), req.GetFlush(), grpc.WaitForReady(true))
  155. if err != nil {
  156. cli.StopForError(err)
  157. }
  158. return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_Flush{Flush: res}})
  159. }
  160. func (cli *grpcClient) InfoAsync(params types.RequestInfo) *ReqRes {
  161. req := types.ToRequestInfo(params)
  162. res, err := cli.client.Info(context.Background(), req.GetInfo(), grpc.WaitForReady(true))
  163. if err != nil {
  164. cli.StopForError(err)
  165. }
  166. return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_Info{Info: res}})
  167. }
  168. func (cli *grpcClient) SetOptionAsync(params types.RequestSetOption) *ReqRes {
  169. req := types.ToRequestSetOption(params)
  170. res, err := cli.client.SetOption(context.Background(), req.GetSetOption(), grpc.WaitForReady(true))
  171. if err != nil {
  172. cli.StopForError(err)
  173. }
  174. return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_SetOption{SetOption: res}})
  175. }
  176. func (cli *grpcClient) DeliverTxAsync(params types.RequestDeliverTx) *ReqRes {
  177. req := types.ToRequestDeliverTx(params)
  178. res, err := cli.client.DeliverTx(context.Background(), req.GetDeliverTx(), grpc.WaitForReady(true))
  179. if err != nil {
  180. cli.StopForError(err)
  181. }
  182. return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_DeliverTx{DeliverTx: res}})
  183. }
  184. func (cli *grpcClient) CheckTxAsync(params types.RequestCheckTx) *ReqRes {
  185. req := types.ToRequestCheckTx(params)
  186. res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx(), grpc.WaitForReady(true))
  187. if err != nil {
  188. cli.StopForError(err)
  189. }
  190. return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_CheckTx{CheckTx: res}})
  191. }
  192. func (cli *grpcClient) QueryAsync(params types.RequestQuery) *ReqRes {
  193. req := types.ToRequestQuery(params)
  194. res, err := cli.client.Query(context.Background(), req.GetQuery(), grpc.WaitForReady(true))
  195. if err != nil {
  196. cli.StopForError(err)
  197. }
  198. return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_Query{Query: res}})
  199. }
  200. func (cli *grpcClient) CommitAsync() *ReqRes {
  201. req := types.ToRequestCommit()
  202. res, err := cli.client.Commit(context.Background(), req.GetCommit(), grpc.WaitForReady(true))
  203. if err != nil {
  204. cli.StopForError(err)
  205. }
  206. return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_Commit{Commit: res}})
  207. }
  208. func (cli *grpcClient) InitChainAsync(params types.RequestInitChain) *ReqRes {
  209. req := types.ToRequestInitChain(params)
  210. res, err := cli.client.InitChain(context.Background(), req.GetInitChain(), grpc.WaitForReady(true))
  211. if err != nil {
  212. cli.StopForError(err)
  213. }
  214. return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_InitChain{InitChain: res}})
  215. }
  216. func (cli *grpcClient) BeginBlockAsync(params types.RequestBeginBlock) *ReqRes {
  217. req := types.ToRequestBeginBlock(params)
  218. res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock(), grpc.WaitForReady(true))
  219. if err != nil {
  220. cli.StopForError(err)
  221. }
  222. return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_BeginBlock{BeginBlock: res}})
  223. }
  224. func (cli *grpcClient) EndBlockAsync(params types.RequestEndBlock) *ReqRes {
  225. req := types.ToRequestEndBlock(params)
  226. res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock(), grpc.WaitForReady(true))
  227. if err != nil {
  228. cli.StopForError(err)
  229. }
  230. return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_EndBlock{EndBlock: res}})
  231. }
  232. func (cli *grpcClient) ListSnapshotsAsync(params types.RequestListSnapshots) *ReqRes {
  233. req := types.ToRequestListSnapshots(params)
  234. res, err := cli.client.ListSnapshots(context.Background(), req.GetListSnapshots(), grpc.WaitForReady(true))
  235. if err != nil {
  236. cli.StopForError(err)
  237. }
  238. return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_ListSnapshots{ListSnapshots: res}})
  239. }
  240. func (cli *grpcClient) OfferSnapshotAsync(params types.RequestOfferSnapshot) *ReqRes {
  241. req := types.ToRequestOfferSnapshot(params)
  242. res, err := cli.client.OfferSnapshot(context.Background(), req.GetOfferSnapshot(), grpc.WaitForReady(true))
  243. if err != nil {
  244. cli.StopForError(err)
  245. }
  246. return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_OfferSnapshot{OfferSnapshot: res}})
  247. }
  248. func (cli *grpcClient) LoadSnapshotChunkAsync(params types.RequestLoadSnapshotChunk) *ReqRes {
  249. req := types.ToRequestLoadSnapshotChunk(params)
  250. res, err := cli.client.LoadSnapshotChunk(context.Background(), req.GetLoadSnapshotChunk(), grpc.WaitForReady(true))
  251. if err != nil {
  252. cli.StopForError(err)
  253. }
  254. return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_LoadSnapshotChunk{LoadSnapshotChunk: res}})
  255. }
  256. func (cli *grpcClient) ApplySnapshotChunkAsync(params types.RequestApplySnapshotChunk) *ReqRes {
  257. req := types.ToRequestApplySnapshotChunk(params)
  258. res, err := cli.client.ApplySnapshotChunk(context.Background(), req.GetApplySnapshotChunk(), grpc.WaitForReady(true))
  259. if err != nil {
  260. cli.StopForError(err)
  261. }
  262. return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_ApplySnapshotChunk{ApplySnapshotChunk: res}})
  263. }
  264. // finishAsyncCall creates a ReqRes for an async call, and immediately populates it
  265. // with the response. We don't complete it until it's been ordered via the channel.
  266. func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
  267. reqres := NewReqRes(req)
  268. reqres.Response = res
  269. cli.chReqRes <- reqres // use channel for async responses, since they must be ordered
  270. return reqres
  271. }
  272. // finishSyncCall waits for an async call to complete. It is necessary to call all
  273. // sync calls asynchronously as well, to maintain call and response ordering via
  274. // the channel, and this method will wait until the async call completes.
  275. func (cli *grpcClient) finishSyncCall(reqres *ReqRes) *types.Response {
  276. // It's possible that the callback is called twice, since the callback can
  277. // be called immediately on SetCallback() in addition to after it has been
  278. // set. This is because completing the ReqRes happens in a separate critical
  279. // section from the one where the callback is called: there is a race where
  280. // SetCallback() is called between completing the ReqRes and dispatching the
  281. // callback.
  282. //
  283. // We also buffer the channel with 1 response, since SetCallback() will be
  284. // called synchronously if the reqres is already completed, in which case
  285. // it will block on sending to the channel since it hasn't gotten around to
  286. // receiving from it yet.
  287. //
  288. // ReqRes should really handle callback dispatch internally, to guarantee
  289. // that it's only called once and avoid the above race conditions.
  290. var once sync.Once
  291. ch := make(chan *types.Response, 1)
  292. reqres.SetCallback(func(res *types.Response) {
  293. once.Do(func() {
  294. ch <- res
  295. })
  296. })
  297. return <-ch
  298. }
  299. //----------------------------------------
  // FlushSync is a no-op for the gRPC client: it performs no RPC and always
  // returns nil.
  func (cli *grpcClient) FlushSync() error {
  	return nil
  }
  303. func (cli *grpcClient) EchoSync(msg string) (*types.ResponseEcho, error) {
  304. reqres := cli.EchoAsync(msg)
  305. // StopForError should already have been called if error is set
  306. return cli.finishSyncCall(reqres).GetEcho(), cli.Error()
  307. }
  308. func (cli *grpcClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) {
  309. reqres := cli.InfoAsync(req)
  310. return cli.finishSyncCall(reqres).GetInfo(), cli.Error()
  311. }
  312. func (cli *grpcClient) SetOptionSync(req types.RequestSetOption) (*types.ResponseSetOption, error) {
  313. reqres := cli.SetOptionAsync(req)
  314. return reqres.Response.GetSetOption(), cli.Error()
  315. }
  316. func (cli *grpcClient) DeliverTxSync(params types.RequestDeliverTx) (*types.ResponseDeliverTx, error) {
  317. reqres := cli.DeliverTxAsync(params)
  318. return cli.finishSyncCall(reqres).GetDeliverTx(), cli.Error()
  319. }
  320. func (cli *grpcClient) CheckTxSync(params types.RequestCheckTx) (*types.ResponseCheckTx, error) {
  321. reqres := cli.CheckTxAsync(params)
  322. return cli.finishSyncCall(reqres).GetCheckTx(), cli.Error()
  323. }
  324. func (cli *grpcClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) {
  325. reqres := cli.QueryAsync(req)
  326. return cli.finishSyncCall(reqres).GetQuery(), cli.Error()
  327. }
  328. func (cli *grpcClient) CommitSync() (*types.ResponseCommit, error) {
  329. reqres := cli.CommitAsync()
  330. return cli.finishSyncCall(reqres).GetCommit(), cli.Error()
  331. }
  332. func (cli *grpcClient) InitChainSync(params types.RequestInitChain) (*types.ResponseInitChain, error) {
  333. reqres := cli.InitChainAsync(params)
  334. return cli.finishSyncCall(reqres).GetInitChain(), cli.Error()
  335. }
  336. func (cli *grpcClient) BeginBlockSync(params types.RequestBeginBlock) (*types.ResponseBeginBlock, error) {
  337. reqres := cli.BeginBlockAsync(params)
  338. return cli.finishSyncCall(reqres).GetBeginBlock(), cli.Error()
  339. }
  340. func (cli *grpcClient) EndBlockSync(params types.RequestEndBlock) (*types.ResponseEndBlock, error) {
  341. reqres := cli.EndBlockAsync(params)
  342. return cli.finishSyncCall(reqres).GetEndBlock(), cli.Error()
  343. }
  344. func (cli *grpcClient) ListSnapshotsSync(params types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
  345. reqres := cli.ListSnapshotsAsync(params)
  346. return cli.finishSyncCall(reqres).GetListSnapshots(), cli.Error()
  347. }
  348. func (cli *grpcClient) OfferSnapshotSync(params types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
  349. reqres := cli.OfferSnapshotAsync(params)
  350. return cli.finishSyncCall(reqres).GetOfferSnapshot(), cli.Error()
  351. }
  352. func (cli *grpcClient) LoadSnapshotChunkSync(
  353. params types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
  354. reqres := cli.LoadSnapshotChunkAsync(params)
  355. return cli.finishSyncCall(reqres).GetLoadSnapshotChunk(), cli.Error()
  356. }
  357. func (cli *grpcClient) ApplySnapshotChunkSync(
  358. params types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
  359. reqres := cli.ApplySnapshotChunkAsync(params)
  360. return cli.finishSyncCall(reqres).GetApplySnapshotChunk(), cli.Error()
  361. }