You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

296 lines
8.0 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package tmspcli
  2. import (
  3. "fmt"
  4. "net"
  5. "sync"
  6. "time"
  7. context "golang.org/x/net/context"
  8. grpc "google.golang.org/grpc"
  9. . "github.com/tendermint/go-common"
  10. "github.com/tendermint/tmsp/types"
  11. )
  12. // A stripped copy of the remoteClient that makes
  13. // synchronous calls using grpc
  14. type grpcClient struct {
  15. QuitService
  16. mustConnect bool
  17. client types.TMSPApplicationClient
  18. mtx sync.Mutex
  19. addr string
  20. err error
  21. resCb func(*types.Request, *types.Response) // listens to all callbacks
  22. }
  23. func NewGRPCClient(addr string, mustConnect bool) (*grpcClient, error) {
  24. cli := &grpcClient{
  25. addr: addr,
  26. mustConnect: mustConnect,
  27. }
  28. cli.QuitService = *NewQuitService(nil, "grpcClient", cli)
  29. _, err := cli.Start() // Just start it, it's confusing for callers to remember to start.
  30. return cli, err
  31. }
  32. func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) {
  33. return Connect(addr)
  34. }
  35. func (cli *grpcClient) OnStart() error {
  36. cli.QuitService.OnStart()
  37. RETRY_LOOP:
  38. for {
  39. conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc))
  40. if err != nil {
  41. if cli.mustConnect {
  42. return err
  43. } else {
  44. fmt.Printf("tmsp.grpcClient failed to connect to %v. Retrying...\n", cli.addr)
  45. time.Sleep(time.Second * 3)
  46. continue RETRY_LOOP
  47. }
  48. }
  49. cli.client = types.NewTMSPApplicationClient(conn)
  50. return nil
  51. }
  52. }
  53. func (cli *grpcClient) OnStop() {
  54. cli.QuitService.OnStop()
  55. // TODO: how to close when TMSPApplicationClient interface doesn't expose Close ?
  56. }
  57. // Set listener for all responses
  58. // NOTE: callback may get internally generated flush responses.
  59. func (cli *grpcClient) SetResponseCallback(resCb Callback) {
  60. cli.mtx.Lock()
  61. defer cli.mtx.Unlock()
  62. cli.resCb = resCb
  63. }
  64. func (cli *grpcClient) StopForError(err error) {
  65. cli.mtx.Lock()
  66. fmt.Printf("Stopping tmsp.grpcClient for error: %v\n", err.Error())
  67. if cli.err == nil {
  68. cli.err = err
  69. }
  70. cli.mtx.Unlock()
  71. cli.Stop()
  72. }
  73. func (cli *grpcClient) Error() error {
  74. cli.mtx.Lock()
  75. defer cli.mtx.Unlock()
  76. return cli.err
  77. }
  78. //----------------------------------------
  79. // async calls are really sync.
  80. // maybe one day, if people really want it, we use grpc streams,
  81. // but hopefully not :D
  82. func (cli *grpcClient) EchoAsync(msg string) *ReqRes {
  83. req := types.ToRequestEcho(msg)
  84. res, err := cli.client.Echo(context.Background(), req.GetEcho())
  85. if err != nil {
  86. cli.err = err
  87. }
  88. return cli.finishAsyncCall(req, &types.Response{&types.Response_Echo{res}})
  89. }
  90. func (cli *grpcClient) FlushAsync() *ReqRes {
  91. req := types.ToRequestFlush()
  92. res, err := cli.client.Flush(context.Background(), req.GetFlush())
  93. if err != nil {
  94. cli.err = err
  95. }
  96. return cli.finishAsyncCall(req, &types.Response{&types.Response_Flush{res}})
  97. }
  98. func (cli *grpcClient) InfoAsync() *ReqRes {
  99. req := types.ToRequestInfo()
  100. res, err := cli.client.Info(context.Background(), req.GetInfo())
  101. if err != nil {
  102. cli.err = err
  103. }
  104. return cli.finishAsyncCall(req, &types.Response{&types.Response_Info{res}})
  105. }
  106. func (cli *grpcClient) SetOptionAsync(key string, value string) *ReqRes {
  107. req := types.ToRequestSetOption(key, value)
  108. res, err := cli.client.SetOption(context.Background(), req.GetSetOption())
  109. if err != nil {
  110. cli.err = err
  111. }
  112. return cli.finishAsyncCall(req, &types.Response{&types.Response_SetOption{res}})
  113. }
  114. func (cli *grpcClient) AppendTxAsync(tx []byte) *ReqRes {
  115. req := types.ToRequestAppendTx(tx)
  116. res, err := cli.client.AppendTx(context.Background(), req.GetAppendTx())
  117. if err != nil {
  118. cli.err = err
  119. }
  120. return cli.finishAsyncCall(req, &types.Response{&types.Response_AppendTx{res}})
  121. }
  122. func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes {
  123. req := types.ToRequestCheckTx(tx)
  124. res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx())
  125. if err != nil {
  126. cli.err = err
  127. }
  128. return cli.finishAsyncCall(req, &types.Response{&types.Response_CheckTx{res}})
  129. }
  130. func (cli *grpcClient) QueryAsync(query []byte) *ReqRes {
  131. req := types.ToRequestQuery(query)
  132. res, err := cli.client.Query(context.Background(), req.GetQuery())
  133. if err != nil {
  134. cli.err = err
  135. }
  136. return cli.finishAsyncCall(req, &types.Response{&types.Response_Query{res}})
  137. }
  138. func (cli *grpcClient) CommitAsync() *ReqRes {
  139. req := types.ToRequestCommit()
  140. res, err := cli.client.Commit(context.Background(), req.GetCommit())
  141. if err != nil {
  142. cli.err = err
  143. }
  144. return cli.finishAsyncCall(req, &types.Response{&types.Response_Commit{res}})
  145. }
  146. func (cli *grpcClient) InitChainAsync(validators []*types.Validator) *ReqRes {
  147. req := types.ToRequestInitChain(validators)
  148. res, err := cli.client.InitChain(context.Background(), req.GetInitChain())
  149. if err != nil {
  150. cli.err = err
  151. }
  152. return cli.finishAsyncCall(req, &types.Response{&types.Response_InitChain{res}})
  153. }
  154. func (cli *grpcClient) BeginBlockAsync(height uint64) *ReqRes {
  155. req := types.ToRequestBeginBlock(height)
  156. res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock())
  157. if err != nil {
  158. cli.err = err
  159. }
  160. return cli.finishAsyncCall(req, &types.Response{&types.Response_BeginBlock{res}})
  161. }
  162. func (cli *grpcClient) EndBlockAsync(height uint64) *ReqRes {
  163. req := types.ToRequestEndBlock(height)
  164. res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock())
  165. if err != nil {
  166. cli.err = err
  167. }
  168. return cli.finishAsyncCall(req, &types.Response{&types.Response_EndBlock{res}})
  169. }
  170. func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
  171. reqres := NewReqRes(req)
  172. reqres.Response = res // Set response
  173. reqres.Done() // Release waiters
  174. // Notify reqRes listener if set
  175. if cb := reqres.GetCallback(); cb != nil {
  176. cb(res)
  177. }
  178. // Notify client listener if set
  179. if cli.resCb != nil {
  180. cli.resCb(reqres.Request, res)
  181. }
  182. return reqres
  183. }
  184. //----------------------------------------
  185. func (cli *grpcClient) EchoSync(msg string) (res types.Result) {
  186. r := cli.EchoAsync(msg).Response.GetEcho()
  187. return types.NewResultOK([]byte(r.Message), LOG)
  188. }
  189. func (cli *grpcClient) FlushSync() error {
  190. return nil
  191. }
  192. func (cli *grpcClient) InfoSync() (res types.Result) {
  193. r := cli.InfoAsync().Response.GetInfo()
  194. return types.NewResultOK([]byte(r.Info), LOG)
  195. }
  196. func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result) {
  197. reqres := cli.SetOptionAsync(key, value)
  198. if cli.err != nil {
  199. return types.ErrInternalError.SetLog(cli.err.Error())
  200. }
  201. resp := reqres.Response.GetSetOption()
  202. return types.Result{Code: OK, Data: nil, Log: resp.Log}
  203. }
  204. func (cli *grpcClient) AppendTxSync(tx []byte) (res types.Result) {
  205. reqres := cli.AppendTxAsync(tx)
  206. if cli.err != nil {
  207. return types.ErrInternalError.SetLog(cli.err.Error())
  208. }
  209. resp := reqres.Response.GetAppendTx()
  210. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  211. }
  212. func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) {
  213. reqres := cli.CheckTxAsync(tx)
  214. if cli.err != nil {
  215. return types.ErrInternalError.SetLog(cli.err.Error())
  216. }
  217. resp := reqres.Response.GetCheckTx()
  218. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  219. }
  220. func (cli *grpcClient) QuerySync(query []byte) (res types.Result) {
  221. reqres := cli.QueryAsync(query)
  222. if cli.err != nil {
  223. return types.ErrInternalError.SetLog(cli.err.Error())
  224. }
  225. resp := reqres.Response.GetQuery()
  226. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  227. }
  228. func (cli *grpcClient) CommitSync() (res types.Result) {
  229. reqres := cli.CommitAsync()
  230. if cli.err != nil {
  231. return types.ErrInternalError.SetLog(cli.err.Error())
  232. }
  233. resp := reqres.Response.GetCommit()
  234. return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
  235. }
  236. func (cli *grpcClient) InitChainSync(validators []*types.Validator) (err error) {
  237. cli.InitChainAsync(validators)
  238. if cli.err != nil {
  239. return cli.err
  240. }
  241. return nil
  242. }
  243. func (cli *grpcClient) BeginBlockSync(height uint64) (err error) {
  244. cli.BeginBlockAsync(height)
  245. if cli.err != nil {
  246. return cli.err
  247. }
  248. return nil
  249. }
  250. func (cli *grpcClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) {
  251. reqres := cli.EndBlockAsync(height)
  252. if cli.err != nil {
  253. return nil, cli.err
  254. }
  255. return reqres.Response.GetEndBlock().Diffs, nil
  256. }