You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

301 lines
8.8 KiB

8 years ago
8 years ago
9 years ago
8 years ago
8 years ago
8 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
8 years ago
7 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
7 years ago
7 years ago
  1. package abcicli
  2. import (
  3. "fmt"
  4. "net"
  5. "sync"
  6. "time"
  7. context "golang.org/x/net/context"
  8. grpc "google.golang.org/grpc"
  9. "github.com/tendermint/tendermint/abci/types"
  10. cmn "github.com/tendermint/tmlibs/common"
  11. )
  12. var _ Client = (*grpcClient)(nil)
  13. // A stripped copy of the remoteClient that makes
  14. // synchronous calls using grpc
  15. type grpcClient struct {
  16. cmn.BaseService
  17. mustConnect bool
  18. client types.ABCIApplicationClient
  19. mtx sync.Mutex
  20. addr string
  21. err error
  22. resCb func(*types.Request, *types.Response) // listens to all callbacks
  23. }
  24. func NewGRPCClient(addr string, mustConnect bool) *grpcClient {
  25. cli := &grpcClient{
  26. addr: addr,
  27. mustConnect: mustConnect,
  28. }
  29. cli.BaseService = *cmn.NewBaseService(nil, "grpcClient", cli)
  30. return cli
  31. }
  32. func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) {
  33. return cmn.Connect(addr)
  34. }
  35. func (cli *grpcClient) OnStart() error {
  36. if err := cli.BaseService.OnStart(); err != nil {
  37. return err
  38. }
  39. RETRY_LOOP:
  40. for {
  41. conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc))
  42. if err != nil {
  43. if cli.mustConnect {
  44. return err
  45. }
  46. cli.Logger.Error(fmt.Sprintf("abci.grpcClient failed to connect to %v. Retrying...\n", cli.addr))
  47. time.Sleep(time.Second * dialRetryIntervalSeconds)
  48. continue RETRY_LOOP
  49. }
  50. cli.Logger.Info("Dialed server. Waiting for echo.", "addr", cli.addr)
  51. client := types.NewABCIApplicationClient(conn)
  52. ENSURE_CONNECTED:
  53. for {
  54. _, err := client.Echo(context.Background(), &types.RequestEcho{"hello"}, grpc.FailFast(true))
  55. if err == nil {
  56. break ENSURE_CONNECTED
  57. }
  58. cli.Logger.Error("Echo failed", "err", err)
  59. time.Sleep(time.Second * echoRetryIntervalSeconds)
  60. }
  61. cli.client = client
  62. return nil
  63. }
  64. }
  65. func (cli *grpcClient) OnStop() {
  66. cli.BaseService.OnStop()
  67. cli.mtx.Lock()
  68. defer cli.mtx.Unlock()
  69. // TODO: how to close conn? its not a net.Conn and grpc doesn't expose a Close()
  70. /*if cli.client.conn != nil {
  71. cli.client.conn.Close()
  72. }*/
  73. }
  74. func (cli *grpcClient) StopForError(err error) {
  75. cli.mtx.Lock()
  76. if !cli.IsRunning() {
  77. return
  78. }
  79. if cli.err == nil {
  80. cli.err = err
  81. }
  82. cli.mtx.Unlock()
  83. cli.Logger.Error(fmt.Sprintf("Stopping abci.grpcClient for error: %v", err.Error()))
  84. cli.Stop()
  85. }
  86. func (cli *grpcClient) Error() error {
  87. cli.mtx.Lock()
  88. defer cli.mtx.Unlock()
  89. return cli.err
  90. }
  91. // Set listener for all responses
  92. // NOTE: callback may get internally generated flush responses.
  93. func (cli *grpcClient) SetResponseCallback(resCb Callback) {
  94. cli.mtx.Lock()
  95. defer cli.mtx.Unlock()
  96. cli.resCb = resCb
  97. }
  98. //----------------------------------------
  99. // GRPC calls are synchronous, but some callbacks expect to be called asynchronously
  100. // (eg. the mempool expects to be able to lock to remove bad txs from cache).
  101. // To accommodate, we finish each call in its own go-routine,
  102. // which is expensive, but easy - if you want something better, use the socket protocol!
  103. // maybe one day, if people really want it, we use grpc streams,
  104. // but hopefully not :D
  105. func (cli *grpcClient) EchoAsync(msg string) *ReqRes {
  106. req := types.ToRequestEcho(msg)
  107. res, err := cli.client.Echo(context.Background(), req.GetEcho(), grpc.FailFast(true))
  108. if err != nil {
  109. cli.StopForError(err)
  110. }
  111. return cli.finishAsyncCall(req, &types.Response{&types.Response_Echo{res}})
  112. }
  113. func (cli *grpcClient) FlushAsync() *ReqRes {
  114. req := types.ToRequestFlush()
  115. res, err := cli.client.Flush(context.Background(), req.GetFlush(), grpc.FailFast(true))
  116. if err != nil {
  117. cli.StopForError(err)
  118. }
  119. return cli.finishAsyncCall(req, &types.Response{&types.Response_Flush{res}})
  120. }
  121. func (cli *grpcClient) InfoAsync(params types.RequestInfo) *ReqRes {
  122. req := types.ToRequestInfo(params)
  123. res, err := cli.client.Info(context.Background(), req.GetInfo(), grpc.FailFast(true))
  124. if err != nil {
  125. cli.StopForError(err)
  126. }
  127. return cli.finishAsyncCall(req, &types.Response{&types.Response_Info{res}})
  128. }
  129. func (cli *grpcClient) SetOptionAsync(params types.RequestSetOption) *ReqRes {
  130. req := types.ToRequestSetOption(params)
  131. res, err := cli.client.SetOption(context.Background(), req.GetSetOption(), grpc.FailFast(true))
  132. if err != nil {
  133. cli.StopForError(err)
  134. }
  135. return cli.finishAsyncCall(req, &types.Response{&types.Response_SetOption{res}})
  136. }
  137. func (cli *grpcClient) DeliverTxAsync(tx []byte) *ReqRes {
  138. req := types.ToRequestDeliverTx(tx)
  139. res, err := cli.client.DeliverTx(context.Background(), req.GetDeliverTx(), grpc.FailFast(true))
  140. if err != nil {
  141. cli.StopForError(err)
  142. }
  143. return cli.finishAsyncCall(req, &types.Response{&types.Response_DeliverTx{res}})
  144. }
  145. func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes {
  146. req := types.ToRequestCheckTx(tx)
  147. res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx(), grpc.FailFast(true))
  148. if err != nil {
  149. cli.StopForError(err)
  150. }
  151. return cli.finishAsyncCall(req, &types.Response{&types.Response_CheckTx{res}})
  152. }
  153. func (cli *grpcClient) QueryAsync(params types.RequestQuery) *ReqRes {
  154. req := types.ToRequestQuery(params)
  155. res, err := cli.client.Query(context.Background(), req.GetQuery(), grpc.FailFast(true))
  156. if err != nil {
  157. cli.StopForError(err)
  158. }
  159. return cli.finishAsyncCall(req, &types.Response{&types.Response_Query{res}})
  160. }
  161. func (cli *grpcClient) CommitAsync() *ReqRes {
  162. req := types.ToRequestCommit()
  163. res, err := cli.client.Commit(context.Background(), req.GetCommit(), grpc.FailFast(true))
  164. if err != nil {
  165. cli.StopForError(err)
  166. }
  167. return cli.finishAsyncCall(req, &types.Response{&types.Response_Commit{res}})
  168. }
  169. func (cli *grpcClient) InitChainAsync(params types.RequestInitChain) *ReqRes {
  170. req := types.ToRequestInitChain(params)
  171. res, err := cli.client.InitChain(context.Background(), req.GetInitChain(), grpc.FailFast(true))
  172. if err != nil {
  173. cli.StopForError(err)
  174. }
  175. return cli.finishAsyncCall(req, &types.Response{&types.Response_InitChain{res}})
  176. }
  177. func (cli *grpcClient) BeginBlockAsync(params types.RequestBeginBlock) *ReqRes {
  178. req := types.ToRequestBeginBlock(params)
  179. res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock(), grpc.FailFast(true))
  180. if err != nil {
  181. cli.StopForError(err)
  182. }
  183. return cli.finishAsyncCall(req, &types.Response{&types.Response_BeginBlock{res}})
  184. }
  185. func (cli *grpcClient) EndBlockAsync(params types.RequestEndBlock) *ReqRes {
  186. req := types.ToRequestEndBlock(params)
  187. res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock(), grpc.FailFast(true))
  188. if err != nil {
  189. cli.StopForError(err)
  190. }
  191. return cli.finishAsyncCall(req, &types.Response{&types.Response_EndBlock{res}})
  192. }
  193. func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
  194. reqres := NewReqRes(req)
  195. reqres.Response = res // Set response
  196. reqres.Done() // Release waiters
  197. reqres.SetDone() // so reqRes.SetCallback will run the callback
  198. // go routine for callbacks
  199. go func() {
  200. // Notify reqRes listener if set
  201. if cb := reqres.GetCallback(); cb != nil {
  202. cb(res)
  203. }
  204. // Notify client listener if set
  205. if cli.resCb != nil {
  206. cli.resCb(reqres.Request, res)
  207. }
  208. }()
  209. return reqres
  210. }
  211. //----------------------------------------
  212. func (cli *grpcClient) FlushSync() error {
  213. return nil
  214. }
  215. func (cli *grpcClient) EchoSync(msg string) (*types.ResponseEcho, error) {
  216. reqres := cli.EchoAsync(msg)
  217. // StopForError should already have been called if error is set
  218. return reqres.Response.GetEcho(), cli.Error()
  219. }
  220. func (cli *grpcClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) {
  221. reqres := cli.InfoAsync(req)
  222. return reqres.Response.GetInfo(), cli.Error()
  223. }
  224. func (cli *grpcClient) SetOptionSync(req types.RequestSetOption) (*types.ResponseSetOption, error) {
  225. reqres := cli.SetOptionAsync(req)
  226. return reqres.Response.GetSetOption(), cli.Error()
  227. }
  228. func (cli *grpcClient) DeliverTxSync(tx []byte) (*types.ResponseDeliverTx, error) {
  229. reqres := cli.DeliverTxAsync(tx)
  230. return reqres.Response.GetDeliverTx(), cli.Error()
  231. }
  232. func (cli *grpcClient) CheckTxSync(tx []byte) (*types.ResponseCheckTx, error) {
  233. reqres := cli.CheckTxAsync(tx)
  234. return reqres.Response.GetCheckTx(), cli.Error()
  235. }
  236. func (cli *grpcClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) {
  237. reqres := cli.QueryAsync(req)
  238. return reqres.Response.GetQuery(), cli.Error()
  239. }
  240. func (cli *grpcClient) CommitSync() (*types.ResponseCommit, error) {
  241. reqres := cli.CommitAsync()
  242. return reqres.Response.GetCommit(), cli.Error()
  243. }
  244. func (cli *grpcClient) InitChainSync(params types.RequestInitChain) (*types.ResponseInitChain, error) {
  245. reqres := cli.InitChainAsync(params)
  246. return reqres.Response.GetInitChain(), cli.Error()
  247. }
  248. func (cli *grpcClient) BeginBlockSync(params types.RequestBeginBlock) (*types.ResponseBeginBlock, error) {
  249. reqres := cli.BeginBlockAsync(params)
  250. return reqres.Response.GetBeginBlock(), cli.Error()
  251. }
  252. func (cli *grpcClient) EndBlockSync(params types.RequestEndBlock) (*types.ResponseEndBlock, error) {
  253. reqres := cli.EndBlockAsync(params)
  254. return reqres.Response.GetEndBlock(), cli.Error()
  255. }