You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

343 lines
9.5 KiB

8 years ago
8 years ago
8 years ago
9 years ago
8 years ago
8 years ago
8 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
  1. package abcicli
  2. import (
  3. "fmt"
  4. "net"
  5. "sync"
  6. "time"
  7. context "golang.org/x/net/context"
  8. grpc "google.golang.org/grpc"
  9. "github.com/tendermint/abci/types"
  10. cmn "github.com/tendermint/tmlibs/common"
  11. )
  12. // A stripped copy of the remoteClient that makes
  13. // synchronous calls using grpc
  14. type grpcClient struct {
  15. cmn.BaseService
  16. mustConnect bool
  17. client types.ABCIApplicationClient
  18. mtx sync.Mutex
  19. addr string
  20. err error
  21. resCb func(*types.Request, *types.Response) // listens to all callbacks
  22. }
  23. func NewGRPCClient(addr string, mustConnect bool) *grpcClient {
  24. cli := &grpcClient{
  25. addr: addr,
  26. mustConnect: mustConnect,
  27. }
  28. cli.BaseService = *cmn.NewBaseService(nil, "grpcClient", cli)
  29. return cli
  30. }
  31. func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) {
  32. return cmn.Connect(addr)
  33. }
  34. func (cli *grpcClient) OnStart() error {
  35. if err := cli.BaseService.OnStart(); err != nil {
  36. return err
  37. }
  38. RETRY_LOOP:
  39. for {
  40. conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc))
  41. if err != nil {
  42. if cli.mustConnect {
  43. return err
  44. }
  45. cli.Logger.Error(fmt.Sprintf("abci.grpcClient failed to connect to %v. Retrying...\n", cli.addr))
  46. time.Sleep(time.Second * 3)
  47. continue RETRY_LOOP
  48. }
  49. client := types.NewABCIApplicationClient(conn)
  50. ENSURE_CONNECTED:
  51. for {
  52. _, err := client.Echo(context.Background(), &types.RequestEcho{"hello"}, grpc.FailFast(true))
  53. if err == nil {
  54. break ENSURE_CONNECTED
  55. }
  56. time.Sleep(time.Second)
  57. }
  58. cli.client = client
  59. return nil
  60. }
  61. }
  62. func (cli *grpcClient) OnStop() {
  63. cli.BaseService.OnStop()
  64. cli.mtx.Lock()
  65. defer cli.mtx.Unlock()
  66. // TODO: how to close conn? its not a net.Conn and grpc doesn't expose a Close()
  67. /*if cli.conn != nil {
  68. cli.conn.Close()
  69. }*/
  70. }
  71. func (cli *grpcClient) StopForError(err error) {
  72. cli.mtx.Lock()
  73. if !cli.IsRunning() {
  74. return
  75. }
  76. if cli.err == nil {
  77. cli.err = err
  78. }
  79. cli.mtx.Unlock()
  80. cli.Logger.Error(fmt.Sprintf("Stopping abci.grpcClient for error: %v", err.Error()))
  81. cli.Stop()
  82. }
  83. func (cli *grpcClient) Error() error {
  84. cli.mtx.Lock()
  85. defer cli.mtx.Unlock()
  86. return cli.err
  87. }
  88. // Set listener for all responses
  89. // NOTE: callback may get internally generated flush responses.
  90. func (cli *grpcClient) SetResponseCallback(resCb Callback) {
  91. cli.mtx.Lock()
  92. defer cli.mtx.Unlock()
  93. cli.resCb = resCb
  94. }
  95. //----------------------------------------
// gRPC calls are synchronous, but some callbacks expect to be called asynchronously
// (e.g. the mempool expects to be able to lock to remove bad txs from its cache).
// To accommodate this, we finish each call in its own goroutine,
// which is expensive but easy - if you want something better, use the socket protocol!
// Maybe one day, if people really want it, we will use gRPC streams,
// but hopefully not :D
  102. func (cli *grpcClient) EchoAsync(msg string) *ReqRes {
  103. req := types.ToRequestEcho(msg)
  104. res, err := cli.client.Echo(context.Background(), req.GetEcho(), grpc.FailFast(true))
  105. if err != nil {
  106. cli.StopForError(err)
  107. }
  108. return cli.finishAsyncCall(req, &types.Response{&types.Response_Echo{res}})
  109. }
  110. func (cli *grpcClient) FlushAsync() *ReqRes {
  111. req := types.ToRequestFlush()
  112. res, err := cli.client.Flush(context.Background(), req.GetFlush(), grpc.FailFast(true))
  113. if err != nil {
  114. cli.StopForError(err)
  115. }
  116. return cli.finishAsyncCall(req, &types.Response{&types.Response_Flush{res}})
  117. }
  118. func (cli *grpcClient) InfoAsync(params types.RequestInfo) *ReqRes {
  119. req := types.ToRequestInfo(params)
  120. res, err := cli.client.Info(context.Background(), req.GetInfo(), grpc.FailFast(true))
  121. if err != nil {
  122. cli.StopForError(err)
  123. }
  124. return cli.finishAsyncCall(req, &types.Response{&types.Response_Info{res}})
  125. }
  126. func (cli *grpcClient) SetOptionAsync(key string, value string) *ReqRes {
  127. req := types.ToRequestSetOption(key, value)
  128. res, err := cli.client.SetOption(context.Background(), req.GetSetOption(), grpc.FailFast(true))
  129. if err != nil {
  130. cli.StopForError(err)
  131. }
  132. return cli.finishAsyncCall(req, &types.Response{&types.Response_SetOption{res}})
  133. }
  134. func (cli *grpcClient) DeliverTxAsync(tx []byte) *ReqRes {
  135. req := types.ToRequestDeliverTx(tx)
  136. res, err := cli.client.DeliverTx(context.Background(), req.GetDeliverTx(), grpc.FailFast(true))
  137. if err != nil {
  138. cli.StopForError(err)
  139. }
  140. return cli.finishAsyncCall(req, &types.Response{&types.Response_DeliverTx{res}})
  141. }
  142. func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes {
  143. req := types.ToRequestCheckTx(tx)
  144. res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx(), grpc.FailFast(true))
  145. if err != nil {
  146. cli.StopForError(err)
  147. }
  148. return cli.finishAsyncCall(req, &types.Response{&types.Response_CheckTx{res}})
  149. }
  150. func (cli *grpcClient) QueryAsync(params types.RequestQuery) *ReqRes {
  151. req := types.ToRequestQuery(params)
  152. res, err := cli.client.Query(context.Background(), req.GetQuery(), grpc.FailFast(true))
  153. if err != nil {
  154. cli.StopForError(err)
  155. }
  156. return cli.finishAsyncCall(req, &types.Response{&types.Response_Query{res}})
  157. }
  158. func (cli *grpcClient) CommitAsync() *ReqRes {
  159. req := types.ToRequestCommit()
  160. res, err := cli.client.Commit(context.Background(), req.GetCommit(), grpc.FailFast(true))
  161. if err != nil {
  162. cli.StopForError(err)
  163. }
  164. return cli.finishAsyncCall(req, &types.Response{&types.Response_Commit{res}})
  165. }
  166. func (cli *grpcClient) InitChainAsync(params types.RequestInitChain) *ReqRes {
  167. req := types.ToRequestInitChain(params)
  168. res, err := cli.client.InitChain(context.Background(), req.GetInitChain(), grpc.FailFast(true))
  169. if err != nil {
  170. cli.StopForError(err)
  171. }
  172. return cli.finishAsyncCall(req, &types.Response{&types.Response_InitChain{res}})
  173. }
  174. func (cli *grpcClient) BeginBlockAsync(params types.RequestBeginBlock) *ReqRes {
  175. req := types.ToRequestBeginBlock(params)
  176. res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock(), grpc.FailFast(true))
  177. if err != nil {
  178. cli.StopForError(err)
  179. }
  180. return cli.finishAsyncCall(req, &types.Response{&types.Response_BeginBlock{res}})
  181. }
  182. func (cli *grpcClient) EndBlockAsync(height uint64) *ReqRes {
  183. req := types.ToRequestEndBlock(height)
  184. res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock(), grpc.FailFast(true))
  185. if err != nil {
  186. cli.StopForError(err)
  187. }
  188. return cli.finishAsyncCall(req, &types.Response{&types.Response_EndBlock{res}})
  189. }
  190. func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
  191. reqres := NewReqRes(req)
  192. reqres.Response = res // Set response
  193. reqres.Done() // Release waiters
  194. reqres.SetDone() // so reqRes.SetCallback will run the callback
  195. // go routine for callbacks
  196. go func() {
  197. // Notify reqRes listener if set
  198. if cb := reqres.GetCallback(); cb != nil {
  199. cb(res)
  200. }
  201. // Notify client listener if set
  202. if cli.resCb != nil {
  203. cli.resCb(reqres.Request, res)
  204. }
  205. }()
  206. return reqres
  207. }
  208. func (cli *grpcClient) checkErrGetResult() types.Result {
  209. if err := cli.Error(); err != nil {
  210. // StopForError should already have been called if error is set
  211. return types.ErrInternalError.SetLog(err.Error())
  212. }
  213. return types.Result{}
  214. }
  215. //----------------------------------------
  216. func (cli *grpcClient) EchoSync(msg string) (res types.Result) {
  217. reqres := cli.EchoAsync(msg)
  218. if res := cli.checkErrGetResult(); res.IsErr() {
  219. return res
  220. }
  221. resp := reqres.Response.GetEcho()
  222. return types.NewResultOK([]byte(resp.Message), "")
  223. }
  224. func (cli *grpcClient) FlushSync() error {
  225. return nil
  226. }
  227. func (cli *grpcClient) InfoSync(req types.RequestInfo) (resInfo types.ResponseInfo, err error) {
  228. reqres := cli.InfoAsync(req)
  229. if err = cli.Error(); err != nil {
  230. return resInfo, err
  231. }
  232. if info := reqres.Response.GetInfo(); info != nil {
  233. return *info, nil
  234. }
  235. return resInfo, nil
  236. }
  237. func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result) {
  238. reqres := cli.SetOptionAsync(key, value)
  239. if res := cli.checkErrGetResult(); res.IsErr() {
  240. return res
  241. }
  242. resp := reqres.Response.GetSetOption()
  243. return types.Result{Code: OK, Data: nil, Log: resp.Log}
  244. }
  245. func (cli *grpcClient) DeliverTxSync(tx []byte) (res types.Result) {
  246. reqres := cli.DeliverTxAsync(tx)
  247. if res := cli.checkErrGetResult(); res.IsErr() {
  248. return res
  249. }
  250. resp := reqres.Response.GetDeliverTx()
  251. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  252. }
  253. func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) {
  254. reqres := cli.CheckTxAsync(tx)
  255. if res := cli.checkErrGetResult(); res.IsErr() {
  256. return res
  257. }
  258. resp := reqres.Response.GetCheckTx()
  259. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  260. }
  261. func (cli *grpcClient) QuerySync(reqQuery types.RequestQuery) (resQuery types.ResponseQuery, err error) {
  262. reqres := cli.QueryAsync(reqQuery)
  263. if err = cli.Error(); err != nil {
  264. return resQuery, err
  265. }
  266. if resQuery_ := reqres.Response.GetQuery(); resQuery_ != nil {
  267. return *resQuery_, nil
  268. }
  269. return resQuery, nil
  270. }
  271. func (cli *grpcClient) CommitSync() (res types.Result) {
  272. reqres := cli.CommitAsync()
  273. if res := cli.checkErrGetResult(); res.IsErr() {
  274. return res
  275. }
  276. resp := reqres.Response.GetCommit()
  277. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  278. }
  279. func (cli *grpcClient) InitChainSync(params types.RequestInitChain) (err error) {
  280. cli.InitChainAsync(params)
  281. return cli.Error()
  282. }
  283. func (cli *grpcClient) BeginBlockSync(params types.RequestBeginBlock) (err error) {
  284. cli.BeginBlockAsync(params)
  285. return cli.Error()
  286. }
  287. func (cli *grpcClient) EndBlockSync(height uint64) (resEndBlock types.ResponseEndBlock, err error) {
  288. reqres := cli.EndBlockAsync(height)
  289. if err := cli.Error(); err != nil {
  290. return resEndBlock, err
  291. }
  292. if blk := reqres.Response.GetEndBlock(); blk != nil {
  293. return *blk, nil
  294. }
  295. return resEndBlock, nil
  296. }