You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

341 lines
9.5 KiB

8 years ago
8 years ago
8 years ago
9 years ago
8 years ago
8 years ago
8 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
  1. package abcicli
  2. import (
  3. "fmt"
  4. "net"
  5. "sync"
  6. "time"
  7. context "golang.org/x/net/context"
  8. grpc "google.golang.org/grpc"
  9. "github.com/tendermint/abci/types"
  10. cmn "github.com/tendermint/tmlibs/common"
  11. )
  12. // A stripped copy of the remoteClient that makes
  13. // synchronous calls using grpc
  14. type grpcClient struct {
  15. cmn.BaseService
  16. mustConnect bool
  17. client types.ABCIApplicationClient
  18. mtx sync.Mutex
  19. addr string
  20. err error
  21. resCb func(*types.Request, *types.Response) // listens to all callbacks
  22. }
  23. func NewGRPCClient(addr string, mustConnect bool) *grpcClient {
  24. cli := &grpcClient{
  25. addr: addr,
  26. mustConnect: mustConnect,
  27. }
  28. cli.BaseService = *cmn.NewBaseService(nil, "grpcClient", cli)
  29. return cli
  30. }
  31. func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) {
  32. return cmn.Connect(addr)
  33. }
  34. func (cli *grpcClient) OnStart() error {
  35. cli.BaseService.OnStart()
  36. RETRY_LOOP:
  37. for {
  38. conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc))
  39. if err != nil {
  40. if cli.mustConnect {
  41. return err
  42. }
  43. cli.Logger.Error(fmt.Sprintf("abci.grpcClient failed to connect to %v. Retrying...\n", cli.addr))
  44. time.Sleep(time.Second * 3)
  45. continue RETRY_LOOP
  46. }
  47. client := types.NewABCIApplicationClient(conn)
  48. ENSURE_CONNECTED:
  49. for {
  50. _, err := client.Echo(context.Background(), &types.RequestEcho{"hello"}, grpc.FailFast(true))
  51. if err == nil {
  52. break ENSURE_CONNECTED
  53. }
  54. time.Sleep(time.Second)
  55. }
  56. cli.client = client
  57. return nil
  58. }
  59. }
  60. func (cli *grpcClient) OnStop() {
  61. cli.BaseService.OnStop()
  62. cli.mtx.Lock()
  63. defer cli.mtx.Unlock()
  64. // TODO: how to close conn? its not a net.Conn and grpc doesn't expose a Close()
  65. /*if cli.conn != nil {
  66. cli.conn.Close()
  67. }*/
  68. }
  69. func (cli *grpcClient) StopForError(err error) {
  70. cli.mtx.Lock()
  71. if !cli.IsRunning() {
  72. return
  73. }
  74. if cli.err == nil {
  75. cli.err = err
  76. }
  77. cli.mtx.Unlock()
  78. cli.Logger.Error(fmt.Sprintf("Stopping abci.grpcClient for error: %v", err.Error()))
  79. cli.Stop()
  80. }
  81. func (cli *grpcClient) Error() error {
  82. cli.mtx.Lock()
  83. defer cli.mtx.Unlock()
  84. return cli.err
  85. }
  86. // Set listener for all responses
  87. // NOTE: callback may get internally generated flush responses.
  88. func (cli *grpcClient) SetResponseCallback(resCb Callback) {
  89. cli.mtx.Lock()
  90. defer cli.mtx.Unlock()
  91. cli.resCb = resCb
  92. }
  93. //----------------------------------------
  94. // GRPC calls are synchronous, but some callbacks expect to be called asynchronously
  95. // (eg. the mempool expects to be able to lock to remove bad txs from cache).
  96. // To accomodate, we finish each call in its own go-routine,
  97. // which is expensive, but easy - if you want something better, use the socket protocol!
  98. // maybe one day, if people really want it, we use grpc streams,
  99. // but hopefully not :D
  100. func (cli *grpcClient) EchoAsync(msg string) *ReqRes {
  101. req := types.ToRequestEcho(msg)
  102. res, err := cli.client.Echo(context.Background(), req.GetEcho(), grpc.FailFast(true))
  103. if err != nil {
  104. cli.StopForError(err)
  105. }
  106. return cli.finishAsyncCall(req, &types.Response{&types.Response_Echo{res}})
  107. }
  108. func (cli *grpcClient) FlushAsync() *ReqRes {
  109. req := types.ToRequestFlush()
  110. res, err := cli.client.Flush(context.Background(), req.GetFlush(), grpc.FailFast(true))
  111. if err != nil {
  112. cli.StopForError(err)
  113. }
  114. return cli.finishAsyncCall(req, &types.Response{&types.Response_Flush{res}})
  115. }
  116. func (cli *grpcClient) InfoAsync() *ReqRes {
  117. req := types.ToRequestInfo()
  118. res, err := cli.client.Info(context.Background(), req.GetInfo(), grpc.FailFast(true))
  119. if err != nil {
  120. cli.StopForError(err)
  121. }
  122. return cli.finishAsyncCall(req, &types.Response{&types.Response_Info{res}})
  123. }
  124. func (cli *grpcClient) SetOptionAsync(key string, value string) *ReqRes {
  125. req := types.ToRequestSetOption(key, value)
  126. res, err := cli.client.SetOption(context.Background(), req.GetSetOption(), grpc.FailFast(true))
  127. if err != nil {
  128. cli.StopForError(err)
  129. }
  130. return cli.finishAsyncCall(req, &types.Response{&types.Response_SetOption{res}})
  131. }
  132. func (cli *grpcClient) DeliverTxAsync(tx []byte) *ReqRes {
  133. req := types.ToRequestDeliverTx(tx)
  134. res, err := cli.client.DeliverTx(context.Background(), req.GetDeliverTx(), grpc.FailFast(true))
  135. if err != nil {
  136. cli.StopForError(err)
  137. }
  138. return cli.finishAsyncCall(req, &types.Response{&types.Response_DeliverTx{res}})
  139. }
  140. func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes {
  141. req := types.ToRequestCheckTx(tx)
  142. res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx(), grpc.FailFast(true))
  143. if err != nil {
  144. cli.StopForError(err)
  145. }
  146. return cli.finishAsyncCall(req, &types.Response{&types.Response_CheckTx{res}})
  147. }
  148. func (cli *grpcClient) QueryAsync(reqQuery types.RequestQuery) *ReqRes {
  149. req := types.ToRequestQuery(reqQuery)
  150. res, err := cli.client.Query(context.Background(), req.GetQuery(), grpc.FailFast(true))
  151. if err != nil {
  152. cli.StopForError(err)
  153. }
  154. return cli.finishAsyncCall(req, &types.Response{&types.Response_Query{res}})
  155. }
  156. func (cli *grpcClient) CommitAsync() *ReqRes {
  157. req := types.ToRequestCommit()
  158. res, err := cli.client.Commit(context.Background(), req.GetCommit(), grpc.FailFast(true))
  159. if err != nil {
  160. cli.StopForError(err)
  161. }
  162. return cli.finishAsyncCall(req, &types.Response{&types.Response_Commit{res}})
  163. }
  164. func (cli *grpcClient) InitChainAsync(validators []*types.Validator) *ReqRes {
  165. req := types.ToRequestInitChain(validators)
  166. res, err := cli.client.InitChain(context.Background(), req.GetInitChain(), grpc.FailFast(true))
  167. if err != nil {
  168. cli.StopForError(err)
  169. }
  170. return cli.finishAsyncCall(req, &types.Response{&types.Response_InitChain{res}})
  171. }
  172. func (cli *grpcClient) BeginBlockAsync(hash []byte, header *types.Header) *ReqRes {
  173. req := types.ToRequestBeginBlock(hash, header)
  174. res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock(), grpc.FailFast(true))
  175. if err != nil {
  176. cli.StopForError(err)
  177. }
  178. return cli.finishAsyncCall(req, &types.Response{&types.Response_BeginBlock{res}})
  179. }
  180. func (cli *grpcClient) EndBlockAsync(height uint64) *ReqRes {
  181. req := types.ToRequestEndBlock(height)
  182. res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock(), grpc.FailFast(true))
  183. if err != nil {
  184. cli.StopForError(err)
  185. }
  186. return cli.finishAsyncCall(req, &types.Response{&types.Response_EndBlock{res}})
  187. }
  188. func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
  189. reqres := NewReqRes(req)
  190. reqres.Response = res // Set response
  191. reqres.Done() // Release waiters
  192. reqres.SetDone() // so reqRes.SetCallback will run the callback
  193. // go routine for callbacks
  194. go func() {
  195. // Notify reqRes listener if set
  196. if cb := reqres.GetCallback(); cb != nil {
  197. cb(res)
  198. }
  199. // Notify client listener if set
  200. if cli.resCb != nil {
  201. cli.resCb(reqres.Request, res)
  202. }
  203. }()
  204. return reqres
  205. }
  206. func (cli *grpcClient) checkErrGetResult() types.Result {
  207. if err := cli.Error(); err != nil {
  208. // StopForError should already have been called if error is set
  209. return types.ErrInternalError.SetLog(err.Error())
  210. }
  211. return types.Result{}
  212. }
  213. //----------------------------------------
  214. func (cli *grpcClient) EchoSync(msg string) (res types.Result) {
  215. reqres := cli.EchoAsync(msg)
  216. if res := cli.checkErrGetResult(); res.IsErr() {
  217. return res
  218. }
  219. resp := reqres.Response.GetEcho()
  220. return types.NewResultOK([]byte(resp.Message), "")
  221. }
  222. func (cli *grpcClient) FlushSync() error {
  223. return nil
  224. }
  225. func (cli *grpcClient) InfoSync() (resInfo types.ResponseInfo, err error) {
  226. reqres := cli.InfoAsync()
  227. if err = cli.Error(); err != nil {
  228. return resInfo, err
  229. }
  230. if info := reqres.Response.GetInfo(); info != nil {
  231. return *info, nil
  232. }
  233. return resInfo, nil
  234. }
  235. func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result) {
  236. reqres := cli.SetOptionAsync(key, value)
  237. if res := cli.checkErrGetResult(); res.IsErr() {
  238. return res
  239. }
  240. resp := reqres.Response.GetSetOption()
  241. return types.Result{Code: OK, Data: nil, Log: resp.Log}
  242. }
  243. func (cli *grpcClient) DeliverTxSync(tx []byte) (res types.Result) {
  244. reqres := cli.DeliverTxAsync(tx)
  245. if res := cli.checkErrGetResult(); res.IsErr() {
  246. return res
  247. }
  248. resp := reqres.Response.GetDeliverTx()
  249. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  250. }
  251. func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) {
  252. reqres := cli.CheckTxAsync(tx)
  253. if res := cli.checkErrGetResult(); res.IsErr() {
  254. return res
  255. }
  256. resp := reqres.Response.GetCheckTx()
  257. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  258. }
  259. func (cli *grpcClient) QuerySync(reqQuery types.RequestQuery) (resQuery types.ResponseQuery, err error) {
  260. reqres := cli.QueryAsync(reqQuery)
  261. if err = cli.Error(); err != nil {
  262. return resQuery, err
  263. }
  264. if resQuery_ := reqres.Response.GetQuery(); resQuery_ != nil {
  265. return *resQuery_, nil
  266. }
  267. return resQuery, nil
  268. }
  269. func (cli *grpcClient) CommitSync() (res types.Result) {
  270. reqres := cli.CommitAsync()
  271. if res := cli.checkErrGetResult(); res.IsErr() {
  272. return res
  273. }
  274. resp := reqres.Response.GetCommit()
  275. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  276. }
  277. func (cli *grpcClient) InitChainSync(validators []*types.Validator) (err error) {
  278. cli.InitChainAsync(validators)
  279. return cli.Error()
  280. }
  281. func (cli *grpcClient) BeginBlockSync(hash []byte, header *types.Header) (err error) {
  282. cli.BeginBlockAsync(hash, header)
  283. return cli.Error()
  284. }
  285. func (cli *grpcClient) EndBlockSync(height uint64) (resEndBlock types.ResponseEndBlock, err error) {
  286. reqres := cli.EndBlockAsync(height)
  287. if err := cli.Error(); err != nil {
  288. return resEndBlock, err
  289. }
  290. if blk := reqres.Response.GetEndBlock(); blk != nil {
  291. return *blk, nil
  292. }
  293. return resEndBlock, nil
  294. }