|
|
@ -1,7 +1,7 @@ |
|
|
|
package tmspcli |
|
|
|
|
|
|
|
import ( |
|
|
|
"fmt" |
|
|
|
"errors" |
|
|
|
"net" |
|
|
|
"sync" |
|
|
|
"time" |
|
|
@ -44,42 +44,57 @@ func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) { |
|
|
|
func (cli *grpcClient) OnStart() error { |
|
|
|
cli.QuitService.OnStart() |
|
|
|
RETRY_LOOP: |
|
|
|
|
|
|
|
for { |
|
|
|
conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc)) |
|
|
|
if err != nil { |
|
|
|
if cli.mustConnect { |
|
|
|
return err |
|
|
|
} else { |
|
|
|
fmt.Printf("tmsp.grpcClient failed to connect to %v. Retrying...\n", cli.addr) |
|
|
|
log.Warn(Fmt("tmsp.grpcClient failed to connect to %v. Retrying...\n", cli.addr)) |
|
|
|
time.Sleep(time.Second * 3) |
|
|
|
continue RETRY_LOOP |
|
|
|
} |
|
|
|
} |
|
|
|
cli.client = types.NewTMSPApplicationClient(conn) |
|
|
|
|
|
|
|
client := types.NewTMSPApplicationClient(conn) |
|
|
|
|
|
|
|
ENSURE_CONNECTED: |
|
|
|
for { |
|
|
|
_, err := client.Echo(context.Background(), &types.RequestEcho{"hello"}, grpc.FailFast(true)) |
|
|
|
if err == nil { |
|
|
|
break ENSURE_CONNECTED |
|
|
|
} |
|
|
|
time.Sleep(time.Second) |
|
|
|
} |
|
|
|
|
|
|
|
cli.client = client |
|
|
|
return nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (cli *grpcClient) OnStop() { |
|
|
|
cli.QuitService.OnStop() |
|
|
|
// TODO: how to close when TMSPApplicationClient interface doesn't expose Close ?
|
|
|
|
} |
|
|
|
|
|
|
|
// Set listener for all responses
|
|
|
|
// NOTE: callback may get internally generated flush responses.
|
|
|
|
func (cli *grpcClient) SetResponseCallback(resCb Callback) { |
|
|
|
cli.mtx.Lock() |
|
|
|
defer cli.mtx.Unlock() |
|
|
|
cli.resCb = resCb |
|
|
|
// TODO: how to close conn? its not a net.Conn and grpc doesn't expose a Close()
|
|
|
|
/*if cli.conn != nil { |
|
|
|
cli.conn.Close() |
|
|
|
}*/ |
|
|
|
} |
|
|
|
|
|
|
|
func (cli *grpcClient) StopForError(err error) { |
|
|
|
cli.mtx.Lock() |
|
|
|
fmt.Printf("Stopping tmsp.grpcClient for error: %v\n", err.Error()) |
|
|
|
if !cli.IsRunning() { |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
if cli.err == nil { |
|
|
|
cli.err = err |
|
|
|
} |
|
|
|
cli.mtx.Unlock() |
|
|
|
|
|
|
|
log.Warn(Fmt("Stopping tmsp.grpcClient for error: %v", err.Error())) |
|
|
|
cli.Stop() |
|
|
|
} |
|
|
|
|
|
|
@ -89,6 +104,14 @@ func (cli *grpcClient) Error() error { |
|
|
|
return cli.err |
|
|
|
} |
|
|
|
|
|
|
|
// Set listener for all responses
|
|
|
|
// NOTE: callback may get internally generated flush responses.
|
|
|
|
func (cli *grpcClient) SetResponseCallback(resCb Callback) { |
|
|
|
cli.mtx.Lock() |
|
|
|
defer cli.mtx.Unlock() |
|
|
|
cli.resCb = resCb |
|
|
|
} |
|
|
|
|
|
|
|
//----------------------------------------
|
|
|
|
// GRPC calls are synchronous, but some callbacks expect to be called asynchronously
|
|
|
|
// (eg. the mempool expects to be able to lock to remove bad txs from cache).
|
|
|
@ -99,7 +122,7 @@ func (cli *grpcClient) Error() error { |
|
|
|
|
|
|
|
func (cli *grpcClient) EchoAsync(msg string) *ReqRes { |
|
|
|
req := types.ToRequestEcho(msg) |
|
|
|
res, err := cli.client.Echo(context.Background(), req.GetEcho()) |
|
|
|
res, err := cli.client.Echo(context.Background(), req.GetEcho(), grpc.FailFast(true)) |
|
|
|
if err != nil { |
|
|
|
cli.err = err |
|
|
|
} |
|
|
@ -108,7 +131,7 @@ func (cli *grpcClient) EchoAsync(msg string) *ReqRes { |
|
|
|
|
|
|
|
func (cli *grpcClient) FlushAsync() *ReqRes { |
|
|
|
req := types.ToRequestFlush() |
|
|
|
res, err := cli.client.Flush(context.Background(), req.GetFlush()) |
|
|
|
res, err := cli.client.Flush(context.Background(), req.GetFlush(), grpc.FailFast(true)) |
|
|
|
if err != nil { |
|
|
|
cli.err = err |
|
|
|
} |
|
|
@ -117,7 +140,7 @@ func (cli *grpcClient) FlushAsync() *ReqRes { |
|
|
|
|
|
|
|
func (cli *grpcClient) InfoAsync() *ReqRes { |
|
|
|
req := types.ToRequestInfo() |
|
|
|
res, err := cli.client.Info(context.Background(), req.GetInfo()) |
|
|
|
res, err := cli.client.Info(context.Background(), req.GetInfo(), grpc.FailFast(true)) |
|
|
|
if err != nil { |
|
|
|
cli.err = err |
|
|
|
} |
|
|
@ -126,7 +149,7 @@ func (cli *grpcClient) InfoAsync() *ReqRes { |
|
|
|
|
|
|
|
func (cli *grpcClient) SetOptionAsync(key string, value string) *ReqRes { |
|
|
|
req := types.ToRequestSetOption(key, value) |
|
|
|
res, err := cli.client.SetOption(context.Background(), req.GetSetOption()) |
|
|
|
res, err := cli.client.SetOption(context.Background(), req.GetSetOption(), grpc.FailFast(true)) |
|
|
|
if err != nil { |
|
|
|
cli.err = err |
|
|
|
} |
|
|
@ -135,7 +158,7 @@ func (cli *grpcClient) SetOptionAsync(key string, value string) *ReqRes { |
|
|
|
|
|
|
|
func (cli *grpcClient) AppendTxAsync(tx []byte) *ReqRes { |
|
|
|
req := types.ToRequestAppendTx(tx) |
|
|
|
res, err := cli.client.AppendTx(context.Background(), req.GetAppendTx()) |
|
|
|
res, err := cli.client.AppendTx(context.Background(), req.GetAppendTx(), grpc.FailFast(true)) |
|
|
|
if err != nil { |
|
|
|
cli.err = err |
|
|
|
} |
|
|
@ -144,7 +167,7 @@ func (cli *grpcClient) AppendTxAsync(tx []byte) *ReqRes { |
|
|
|
|
|
|
|
func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes { |
|
|
|
req := types.ToRequestCheckTx(tx) |
|
|
|
res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx()) |
|
|
|
res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx(), grpc.FailFast(true)) |
|
|
|
if err != nil { |
|
|
|
cli.err = err |
|
|
|
} |
|
|
@ -153,7 +176,7 @@ func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes { |
|
|
|
|
|
|
|
func (cli *grpcClient) QueryAsync(query []byte) *ReqRes { |
|
|
|
req := types.ToRequestQuery(query) |
|
|
|
res, err := cli.client.Query(context.Background(), req.GetQuery()) |
|
|
|
res, err := cli.client.Query(context.Background(), req.GetQuery(), grpc.FailFast(true)) |
|
|
|
if err != nil { |
|
|
|
cli.err = err |
|
|
|
} |
|
|
@ -162,7 +185,7 @@ func (cli *grpcClient) QueryAsync(query []byte) *ReqRes { |
|
|
|
|
|
|
|
func (cli *grpcClient) CommitAsync() *ReqRes { |
|
|
|
req := types.ToRequestCommit() |
|
|
|
res, err := cli.client.Commit(context.Background(), req.GetCommit()) |
|
|
|
res, err := cli.client.Commit(context.Background(), req.GetCommit(), grpc.FailFast(true)) |
|
|
|
if err != nil { |
|
|
|
cli.err = err |
|
|
|
} |
|
|
@ -171,7 +194,7 @@ func (cli *grpcClient) CommitAsync() *ReqRes { |
|
|
|
|
|
|
|
func (cli *grpcClient) InitChainAsync(validators []*types.Validator) *ReqRes { |
|
|
|
req := types.ToRequestInitChain(validators) |
|
|
|
res, err := cli.client.InitChain(context.Background(), req.GetInitChain()) |
|
|
|
res, err := cli.client.InitChain(context.Background(), req.GetInitChain(), grpc.FailFast(true)) |
|
|
|
if err != nil { |
|
|
|
cli.err = err |
|
|
|
} |
|
|
@ -180,7 +203,7 @@ func (cli *grpcClient) InitChainAsync(validators []*types.Validator) *ReqRes { |
|
|
|
|
|
|
|
func (cli *grpcClient) BeginBlockAsync(height uint64) *ReqRes { |
|
|
|
req := types.ToRequestBeginBlock(height) |
|
|
|
res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock()) |
|
|
|
res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock(), grpc.FailFast(true)) |
|
|
|
if err != nil { |
|
|
|
cli.err = err |
|
|
|
} |
|
|
@ -189,7 +212,7 @@ func (cli *grpcClient) BeginBlockAsync(height uint64) *ReqRes { |
|
|
|
|
|
|
|
func (cli *grpcClient) EndBlockAsync(height uint64) *ReqRes { |
|
|
|
req := types.ToRequestEndBlock(height) |
|
|
|
res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock()) |
|
|
|
res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock(), grpc.FailFast(true)) |
|
|
|
if err != nil { |
|
|
|
cli.err = err |
|
|
|
} |
|
|
@ -217,11 +240,31 @@ func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) |
|
|
|
return reqres |
|
|
|
} |
|
|
|
|
|
|
|
func (cli *grpcClient) checkErrGetResult() types.Result { |
|
|
|
if cli.err != nil { |
|
|
|
cli.StopForError(cli.err) |
|
|
|
return types.ErrInternalError.SetLog(cli.err.Error()) |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func (cli *grpcClient) checkGetErr() error { |
|
|
|
if cli.err != nil { |
|
|
|
cli.StopForError(cli.err) |
|
|
|
return cli.err |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
//----------------------------------------
|
|
|
|
|
|
|
|
func (cli *grpcClient) EchoSync(msg string) (res types.Result) { |
|
|
|
r := cli.EchoAsync(msg).Response.GetEcho() |
|
|
|
return types.NewResultOK([]byte(r.Message), LOG) |
|
|
|
reqres := cli.EchoAsync(msg) |
|
|
|
if res := cli.checkErrGetResult(); res.IsErr() { |
|
|
|
return res |
|
|
|
} |
|
|
|
resp := reqres.Response.GetEcho() |
|
|
|
return types.NewResultOK([]byte(resp.Message), LOG) |
|
|
|
} |
|
|
|
|
|
|
|
func (cli *grpcClient) FlushSync() error { |
|
|
@ -229,14 +272,18 @@ func (cli *grpcClient) FlushSync() error { |
|
|
|
} |
|
|
|
|
|
|
|
func (cli *grpcClient) InfoSync() (res types.Result) { |
|
|
|
r := cli.InfoAsync().Response.GetInfo() |
|
|
|
return types.NewResultOK([]byte(r.Info), LOG) |
|
|
|
reqres := cli.InfoAsync() |
|
|
|
if res := cli.checkErrGetResult(); res.IsErr() { |
|
|
|
return res |
|
|
|
} |
|
|
|
resp := reqres.Response.GetInfo() |
|
|
|
return types.NewResultOK([]byte(resp.Info), LOG) |
|
|
|
} |
|
|
|
|
|
|
|
func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result) { |
|
|
|
reqres := cli.SetOptionAsync(key, value) |
|
|
|
if cli.err != nil { |
|
|
|
return types.ErrInternalError.SetLog(cli.err.Error()) |
|
|
|
if res := cli.checkErrGetResult(); res.IsErr() { |
|
|
|
return res |
|
|
|
} |
|
|
|
resp := reqres.Response.GetSetOption() |
|
|
|
return types.Result{Code: OK, Data: nil, Log: resp.Log} |
|
|
@ -244,8 +291,8 @@ func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result |
|
|
|
|
|
|
|
func (cli *grpcClient) AppendTxSync(tx []byte) (res types.Result) { |
|
|
|
reqres := cli.AppendTxAsync(tx) |
|
|
|
if cli.err != nil { |
|
|
|
return types.ErrInternalError.SetLog(cli.err.Error()) |
|
|
|
if res := cli.checkErrGetResult(); res.IsErr() { |
|
|
|
return res |
|
|
|
} |
|
|
|
resp := reqres.Response.GetAppendTx() |
|
|
|
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} |
|
|
@ -253,8 +300,8 @@ func (cli *grpcClient) AppendTxSync(tx []byte) (res types.Result) { |
|
|
|
|
|
|
|
func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) { |
|
|
|
reqres := cli.CheckTxAsync(tx) |
|
|
|
if cli.err != nil { |
|
|
|
return types.ErrInternalError.SetLog(cli.err.Error()) |
|
|
|
if res := cli.checkErrGetResult(); res.IsErr() { |
|
|
|
return res |
|
|
|
} |
|
|
|
resp := reqres.Response.GetCheckTx() |
|
|
|
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} |
|
|
@ -262,8 +309,8 @@ func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) { |
|
|
|
|
|
|
|
func (cli *grpcClient) QuerySync(query []byte) (res types.Result) { |
|
|
|
reqres := cli.QueryAsync(query) |
|
|
|
if cli.err != nil { |
|
|
|
return types.ErrInternalError.SetLog(cli.err.Error()) |
|
|
|
if res := cli.checkErrGetResult(); res.IsErr() { |
|
|
|
return res |
|
|
|
} |
|
|
|
resp := reqres.Response.GetQuery() |
|
|
|
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} |
|
|
@ -271,8 +318,8 @@ func (cli *grpcClient) QuerySync(query []byte) (res types.Result) { |
|
|
|
|
|
|
|
func (cli *grpcClient) CommitSync() (res types.Result) { |
|
|
|
reqres := cli.CommitAsync() |
|
|
|
if cli.err != nil { |
|
|
|
return types.ErrInternalError.SetLog(cli.err.Error()) |
|
|
|
if res := cli.checkErrGetResult(); res.IsErr() { |
|
|
|
return res |
|
|
|
} |
|
|
|
resp := reqres.Response.GetCommit() |
|
|
|
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} |
|
|
@ -280,24 +327,24 @@ func (cli *grpcClient) CommitSync() (res types.Result) { |
|
|
|
|
|
|
|
func (cli *grpcClient) InitChainSync(validators []*types.Validator) (err error) { |
|
|
|
cli.InitChainAsync(validators) |
|
|
|
if cli.err != nil { |
|
|
|
return cli.err |
|
|
|
if err := cli.checkGetErr(); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func (cli *grpcClient) BeginBlockSync(height uint64) (err error) { |
|
|
|
cli.BeginBlockAsync(height) |
|
|
|
if cli.err != nil { |
|
|
|
return cli.err |
|
|
|
if err := cli.checkGetErr(); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func (cli *grpcClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) { |
|
|
|
reqres := cli.EndBlockAsync(height) |
|
|
|
if cli.err != nil { |
|
|
|
return nil, cli.err |
|
|
|
if err := cli.checkGetErr(); err != nil { |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
return reqres.Response.GetEndBlock().Diffs, nil |
|
|
|
} |