You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

303 lines
8.9 KiB

8 years ago
8 years ago
8 years ago
9 years ago
8 years ago
8 years ago
8 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
8 years ago
9 years ago
8 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
7 years ago
7 years ago
  1. package abcicli
  2. import (
  3. "fmt"
  4. "net"
  5. "sync"
  6. "time"
  7. "github.com/pkg/errors"
  8. context "golang.org/x/net/context"
  9. grpc "google.golang.org/grpc"
  10. "github.com/tendermint/abci/types"
  11. cmn "github.com/tendermint/tmlibs/common"
  12. )
  13. var _ Client = (*grpcClient)(nil)
  14. // A stripped copy of the remoteClient that makes
  15. // synchronous calls using grpc
  16. type grpcClient struct {
  17. cmn.BaseService
  18. mustConnect bool
  19. client types.ABCIApplicationClient
  20. mtx sync.Mutex
  21. addr string
  22. err error
  23. resCb func(*types.Request, *types.Response) // listens to all callbacks
  24. }
  25. func NewGRPCClient(addr string, mustConnect bool) *grpcClient {
  26. cli := &grpcClient{
  27. addr: addr,
  28. mustConnect: mustConnect,
  29. }
  30. cli.BaseService = *cmn.NewBaseService(nil, "grpcClient", cli)
  31. return cli
  32. }
  33. func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) {
  34. return cmn.Connect(addr)
  35. }
  36. func (cli *grpcClient) OnStart() error {
  37. if err := cli.BaseService.OnStart(); err != nil {
  38. return err
  39. }
  40. RETRY_LOOP:
  41. for {
  42. conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc))
  43. if err != nil {
  44. if cli.mustConnect {
  45. return err
  46. }
  47. cli.Logger.Error(fmt.Sprintf("abci.grpcClient failed to connect to %v. Retrying...\n", cli.addr))
  48. time.Sleep(time.Second * dialRetryIntervalSeconds)
  49. continue RETRY_LOOP
  50. }
  51. cli.Logger.Info("Dialed server. Waiting for echo.", "addr", cli.addr)
  52. client := types.NewABCIApplicationClient(conn)
  53. ENSURE_CONNECTED:
  54. for {
  55. _, err := client.Echo(context.Background(), &types.RequestEcho{"hello"}, grpc.FailFast(true))
  56. if err == nil {
  57. break ENSURE_CONNECTED
  58. }
  59. cli.Logger.Info("Echo failed", "err", err)
  60. time.Sleep(time.Second * echoRetryIntervalSeconds)
  61. }
  62. cli.client = client
  63. return nil
  64. }
  65. }
  66. func (cli *grpcClient) OnStop() {
  67. cli.BaseService.OnStop()
  68. cli.mtx.Lock()
  69. defer cli.mtx.Unlock()
  70. // TODO: how to close conn? its not a net.Conn and grpc doesn't expose a Close()
  71. /*if cli.conn != nil {
  72. cli.conn.Close()
  73. }*/
  74. }
  75. func (cli *grpcClient) StopForError(err error) {
  76. cli.mtx.Lock()
  77. if !cli.IsRunning() {
  78. return
  79. }
  80. if cli.err == nil {
  81. cli.err = err
  82. }
  83. cli.mtx.Unlock()
  84. cli.Logger.Error(fmt.Sprintf("Stopping abci.grpcClient for error: %v", err.Error()))
  85. cli.Stop()
  86. }
  87. func (cli *grpcClient) Error() error {
  88. cli.mtx.Lock()
  89. defer cli.mtx.Unlock()
  90. return errors.Wrap(cli.err, "grpc client error")
  91. }
  92. // Set listener for all responses
  93. // NOTE: callback may get internally generated flush responses.
  94. func (cli *grpcClient) SetResponseCallback(resCb Callback) {
  95. cli.mtx.Lock()
  96. defer cli.mtx.Unlock()
  97. cli.resCb = resCb
  98. }
  99. //----------------------------------------
  100. // GRPC calls are synchronous, but some callbacks expect to be called asynchronously
  101. // (eg. the mempool expects to be able to lock to remove bad txs from cache).
  102. // To accommodate, we finish each call in its own go-routine,
  103. // which is expensive, but easy - if you want something better, use the socket protocol!
  104. // maybe one day, if people really want it, we use grpc streams,
  105. // but hopefully not :D
  106. func (cli *grpcClient) EchoAsync(msg string) *ReqRes {
  107. req := types.ToRequestEcho(msg)
  108. res, err := cli.client.Echo(context.Background(), req.GetEcho(), grpc.FailFast(true))
  109. if err != nil {
  110. cli.StopForError(err)
  111. }
  112. return cli.finishAsyncCall(req, &types.Response{&types.Response_Echo{res}})
  113. }
  114. func (cli *grpcClient) FlushAsync() *ReqRes {
  115. req := types.ToRequestFlush()
  116. res, err := cli.client.Flush(context.Background(), req.GetFlush(), grpc.FailFast(true))
  117. if err != nil {
  118. cli.StopForError(err)
  119. }
  120. return cli.finishAsyncCall(req, &types.Response{&types.Response_Flush{res}})
  121. }
  122. func (cli *grpcClient) InfoAsync(params types.RequestInfo) *ReqRes {
  123. req := types.ToRequestInfo(params)
  124. res, err := cli.client.Info(context.Background(), req.GetInfo(), grpc.FailFast(true))
  125. if err != nil {
  126. cli.StopForError(err)
  127. }
  128. return cli.finishAsyncCall(req, &types.Response{&types.Response_Info{res}})
  129. }
  130. func (cli *grpcClient) SetOptionAsync(params types.RequestSetOption) *ReqRes {
  131. req := types.ToRequestSetOption(params)
  132. res, err := cli.client.SetOption(context.Background(), req.GetSetOption(), grpc.FailFast(true))
  133. if err != nil {
  134. cli.StopForError(err)
  135. }
  136. return cli.finishAsyncCall(req, &types.Response{&types.Response_SetOption{res}})
  137. }
  138. func (cli *grpcClient) DeliverTxAsync(tx []byte) *ReqRes {
  139. req := types.ToRequestDeliverTx(tx)
  140. res, err := cli.client.DeliverTx(context.Background(), req.GetDeliverTx(), grpc.FailFast(true))
  141. if err != nil {
  142. cli.StopForError(err)
  143. }
  144. return cli.finishAsyncCall(req, &types.Response{&types.Response_DeliverTx{res}})
  145. }
  146. func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes {
  147. req := types.ToRequestCheckTx(tx)
  148. res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx(), grpc.FailFast(true))
  149. if err != nil {
  150. cli.StopForError(err)
  151. }
  152. return cli.finishAsyncCall(req, &types.Response{&types.Response_CheckTx{res}})
  153. }
  154. func (cli *grpcClient) QueryAsync(params types.RequestQuery) *ReqRes {
  155. req := types.ToRequestQuery(params)
  156. res, err := cli.client.Query(context.Background(), req.GetQuery(), grpc.FailFast(true))
  157. if err != nil {
  158. cli.StopForError(err)
  159. }
  160. return cli.finishAsyncCall(req, &types.Response{&types.Response_Query{res}})
  161. }
  162. func (cli *grpcClient) CommitAsync() *ReqRes {
  163. req := types.ToRequestCommit()
  164. res, err := cli.client.Commit(context.Background(), req.GetCommit(), grpc.FailFast(true))
  165. if err != nil {
  166. cli.StopForError(err)
  167. }
  168. return cli.finishAsyncCall(req, &types.Response{&types.Response_Commit{res}})
  169. }
  170. func (cli *grpcClient) InitChainAsync(params types.RequestInitChain) *ReqRes {
  171. req := types.ToRequestInitChain(params)
  172. res, err := cli.client.InitChain(context.Background(), req.GetInitChain(), grpc.FailFast(true))
  173. if err != nil {
  174. cli.StopForError(err)
  175. }
  176. return cli.finishAsyncCall(req, &types.Response{&types.Response_InitChain{res}})
  177. }
  178. func (cli *grpcClient) BeginBlockAsync(params types.RequestBeginBlock) *ReqRes {
  179. req := types.ToRequestBeginBlock(params)
  180. res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock(), grpc.FailFast(true))
  181. if err != nil {
  182. cli.StopForError(err)
  183. }
  184. return cli.finishAsyncCall(req, &types.Response{&types.Response_BeginBlock{res}})
  185. }
  186. func (cli *grpcClient) EndBlockAsync(params types.RequestEndBlock) *ReqRes {
  187. req := types.ToRequestEndBlock(params)
  188. res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock(), grpc.FailFast(true))
  189. if err != nil {
  190. cli.StopForError(err)
  191. }
  192. return cli.finishAsyncCall(req, &types.Response{&types.Response_EndBlock{res}})
  193. }
  194. func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
  195. reqres := NewReqRes(req)
  196. reqres.Response = res // Set response
  197. reqres.Done() // Release waiters
  198. reqres.SetDone() // so reqRes.SetCallback will run the callback
  199. // go routine for callbacks
  200. go func() {
  201. // Notify reqRes listener if set
  202. if cb := reqres.GetCallback(); cb != nil {
  203. cb(res)
  204. }
  205. // Notify client listener if set
  206. if cli.resCb != nil {
  207. cli.resCb(reqres.Request, res)
  208. }
  209. }()
  210. return reqres
  211. }
  212. //----------------------------------------
  213. func (cli *grpcClient) FlushSync() error {
  214. return nil
  215. }
  216. func (cli *grpcClient) EchoSync(msg string) (*types.ResponseEcho, error) {
  217. reqres := cli.EchoAsync(msg)
  218. // StopForError should already have been called if error is set
  219. return reqres.Response.GetEcho(), cli.Error()
  220. }
  221. func (cli *grpcClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) {
  222. reqres := cli.InfoAsync(req)
  223. return reqres.Response.GetInfo(), cli.Error()
  224. }
  225. func (cli *grpcClient) SetOptionSync(req types.RequestSetOption) (*types.ResponseSetOption, error) {
  226. reqres := cli.SetOptionAsync(req)
  227. return reqres.Response.GetSetOption(), cli.Error()
  228. }
  229. func (cli *grpcClient) DeliverTxSync(tx []byte) (*types.ResponseDeliverTx, error) {
  230. reqres := cli.DeliverTxAsync(tx)
  231. return reqres.Response.GetDeliverTx(), cli.Error()
  232. }
  233. func (cli *grpcClient) CheckTxSync(tx []byte) (*types.ResponseCheckTx, error) {
  234. reqres := cli.CheckTxAsync(tx)
  235. return reqres.Response.GetCheckTx(), cli.Error()
  236. }
  237. func (cli *grpcClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) {
  238. reqres := cli.QueryAsync(req)
  239. return reqres.Response.GetQuery(), cli.Error()
  240. }
  241. func (cli *grpcClient) CommitSync() (*types.ResponseCommit, error) {
  242. reqres := cli.CommitAsync()
  243. return reqres.Response.GetCommit(), cli.Error()
  244. }
  245. func (cli *grpcClient) InitChainSync(params types.RequestInitChain) (*types.ResponseInitChain, error) {
  246. reqres := cli.InitChainAsync(params)
  247. return reqres.Response.GetInitChain(), cli.Error()
  248. }
  249. func (cli *grpcClient) BeginBlockSync(params types.RequestBeginBlock) (*types.ResponseBeginBlock, error) {
  250. reqres := cli.BeginBlockAsync(params)
  251. return reqres.Response.GetBeginBlock(), cli.Error()
  252. }
  253. func (cli *grpcClient) EndBlockSync(params types.RequestEndBlock) (*types.ResponseEndBlock, error) {
  254. reqres := cli.EndBlockAsync(params)
  255. return reqres.Response.GetEndBlock(), cli.Error()
  256. }