diff --git a/client/grpc_client.go b/client/grpc_client.go index 0d28eed55..3fbd000b0 100644 --- a/client/grpc_client.go +++ b/client/grpc_client.go @@ -13,9 +13,8 @@ import ( "github.com/tendermint/tmsp/types" ) -// This is goroutine-safe, but users should beware that -// the application in general is not meant to be interfaced -// with concurrent callers. +// A stripped copy of the remoteClient that makes +// synchronous calls using grpc type grpcClient struct { QuitService mustConnect bool @@ -42,15 +41,14 @@ func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) { return Connect(addr) } -func (cli *grpcClient) OnStart() (err error) { +func (cli *grpcClient) OnStart() error { cli.QuitService.OnStart() RETRY_LOOP: for { - conn, err_ := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc)) - if err_ != nil { + conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc)) + if err != nil { if cli.mustConnect { - err = err_ // OnStart() will return this. - return + return err } else { fmt.Printf("tmsp.grpcClient failed to connect to %v. Retrying...\n", cli.addr) time.Sleep(time.Second * 3) @@ -58,13 +56,13 @@ RETRY_LOOP: } } cli.client = types.NewTMSPApplicationClient(conn) - return + return nil } } func (cli *grpcClient) OnStop() { cli.QuitService.OnStop() - // TODO: close client (?) + // TODO: how to close when TMSPApplicationClient interface doesn't expose Close ? } // Set listener for all responses @@ -96,23 +94,6 @@ func (cli *grpcClient) Error() error { // maybe one day, if people really want it, we use grpc streams, // but hopefully not :D -func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes { - reqres := NewReqRes(req) - reqres.Response = res // Set response - reqres.Done() // Release waiters - - // Notify reqRes listener if set - if cb := reqres.GetCallback(); cb != nil { - cb(res) - } - - // Notify client listener if set - if cli.resCb != nil { - cli.resCb(reqres.Request, res) - } - return reqres -} - func (cli *grpcClient) EchoAsync(msg string) *ReqRes { req := types.ToRequestEcho(msg) res, err := cli.client.Echo(context.Background(), req.GetEcho()) @@ -212,6 +193,23 @@ func (cli *grpcClient) EndBlockAsync(height uint64) *ReqRes { return cli.finishAsyncCall(req, &types.Response{&types.Response_EndBlock{res}}) } +func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes { + reqres := NewReqRes(req) + reqres.Response = res // Set response + reqres.Done() // Release waiters + + // Notify reqRes listener if set + if cb := reqres.GetCallback(); cb != nil { + cb(res) + } + + // Notify client listener if set + if cli.resCb != nil { + cli.resCb(reqres.Request, res) + } + return reqres +} + //---------------------------------------- func (cli *grpcClient) EchoSync(msg string) (res types.Result) { diff --git a/client/remote_client.go b/client/remote_client.go index ae832716c..3c6803b7e 100644 --- a/client/remote_client.go +++ b/client/remote_client.go @@ -55,39 +55,27 @@ func NewSocketClient(addr string, mustConnect bool) (*remoteClient, error) { cli.QuitService = *NewQuitService(nil, "remoteClient", cli) _, err := cli.Start() // Just start it, it's confusing for callers to remember to start. return cli, err - if mustConnect { - return nil, err - } else { - return cli, nil - } } -func (cli *remoteClient) OnStart() (err error) { +func (cli *remoteClient) OnStart() error { cli.QuitService.OnStart() - doneCh := make(chan struct{}) - go func() { - RETRY_LOOP: - for { - conn, err_ := Connect(cli.addr) - if err_ != nil { - if cli.mustConnect { - err = err_ // OnStart() will return this. - close(doneCh) - return - } else { - fmt.Printf("tmsp.remoteClient failed to connect to %v. Retrying...\n", cli.addr) - time.Sleep(time.Second * 3) - continue RETRY_LOOP - } +RETRY_LOOP: + for { + conn, err := Connect(cli.addr) + if err != nil { + if cli.mustConnect { + return err + } else { + fmt.Printf("tmsp.remoteClient failed to connect to %v. Retrying...\n", cli.addr) + time.Sleep(time.Second * 3) + continue RETRY_LOOP } - go cli.sendValueRoutine(conn) - go cli.recvResponseRoutine(conn) - close(doneCh) // OnStart() will return no error. - return } - }() - <-doneCh - return // err + go cli.sendRequestsRoutine(conn) + go cli.recvResponseRoutine(conn) + return err + } + return nil // never happens } func (cli *remoteClient) OnStop() { @@ -123,7 +111,7 @@ func (cli *remoteClient) Error() error { //---------------------------------------- -func (cli *remoteClient) sendValueRoutine(conn net.Conn) { +func (cli *remoteClient) sendRequestsRoutine(conn net.Conn) { w := bufio.NewWriter(conn) for { select { diff --git a/cmd/tmsp-cli/tmsp-cli.go b/cmd/tmsp-cli/tmsp-cli.go index 6e02166a5..373266efc 100644 --- a/cmd/tmsp-cli/tmsp-cli.go +++ b/cmd/tmsp-cli/tmsp-cli.go @@ -15,7 +15,7 @@ import ( "github.com/tendermint/tmsp/types" ) -// clientection is a global variable so it can be reused by the console +// client is a global variable so it can be reused by the console var client tmspcli.Client func main() { @@ -28,8 +28,6 @@ func main() { Value: "tcp://127.0.0.1:46658", Usage: "address of application socket", }, - } - app.Flags = []cli.Flag{ cli.StringFlag{ Name: "tmsp", Value: "socket", diff --git a/types/types.proto b/types/types.proto index 22a2da4fe..cb3d82c1a 100644 --- a/types/types.proto +++ b/types/types.proto @@ -217,8 +217,6 @@ message Validator { //---------------------------------------- // Service Definition -// NOTE: we may want to make CheckTx and AppendTx two way streams. -// we should be able to drop Flush for sync calls service TMSPApplication { rpc Echo(RequestEcho) returns (ResponseEcho) ; rpc Flush(RequestFlush) returns (ResponseFlush);