Browse Source

some cleanup

pull/1780/head
Ethan Buchman 8 years ago
parent
commit
516a929e5f
4 changed files with 43 additions and 61 deletions
  1. +25
    -27
      client/grpc_client.go
  2. +17
    -29
      client/remote_client.go
  3. +1
    -3
      cmd/tmsp-cli/tmsp-cli.go
  4. +0
    -2
      types/types.proto

+ 25
- 27
client/grpc_client.go View File

@ -13,9 +13,8 @@ import (
"github.com/tendermint/tmsp/types"
)
// This is goroutine-safe, but users should beware that
// the application in general is not meant to be interfaced
// with concurrent callers.
// A stripped copy of the remoteClient that makes
// synchronous calls using grpc
type grpcClient struct {
QuitService
mustConnect bool
@ -42,15 +41,14 @@ func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) {
return Connect(addr)
}
func (cli *grpcClient) OnStart() (err error) {
func (cli *grpcClient) OnStart() error {
cli.QuitService.OnStart()
RETRY_LOOP:
for {
conn, err_ := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc))
if err_ != nil {
conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc))
if err != nil {
if cli.mustConnect {
err = err_ // OnStart() will return this.
return
return err
} else {
fmt.Printf("tmsp.grpcClient failed to connect to %v. Retrying...\n", cli.addr)
time.Sleep(time.Second * 3)
@ -58,13 +56,13 @@ RETRY_LOOP:
}
}
cli.client = types.NewTMSPApplicationClient(conn)
return
return nil
}
}
func (cli *grpcClient) OnStop() {
cli.QuitService.OnStop()
// TODO: close client (?)
// TODO: how to close when TMSPApplicationClient interface doesn't expose Close ?
}
// Set listener for all responses
@ -96,23 +94,6 @@ func (cli *grpcClient) Error() error {
// maybe one day, if people really want it, we use grpc streams,
// but hopefully not :D
func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
reqres := NewReqRes(req)
reqres.Response = res // Set response
reqres.Done() // Release waiters
// Notify reqRes listener if set
if cb := reqres.GetCallback(); cb != nil {
cb(res)
}
// Notify client listener if set
if cli.resCb != nil {
cli.resCb(reqres.Request, res)
}
return reqres
}
func (cli *grpcClient) EchoAsync(msg string) *ReqRes {
req := types.ToRequestEcho(msg)
res, err := cli.client.Echo(context.Background(), req.GetEcho())
@ -212,6 +193,23 @@ func (cli *grpcClient) EndBlockAsync(height uint64) *ReqRes {
return cli.finishAsyncCall(req, &types.Response{&types.Response_EndBlock{res}})
}
func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
reqres := NewReqRes(req)
reqres.Response = res // Set response
reqres.Done() // Release waiters
// Notify reqRes listener if set
if cb := reqres.GetCallback(); cb != nil {
cb(res)
}
// Notify client listener if set
if cli.resCb != nil {
cli.resCb(reqres.Request, res)
}
return reqres
}
//----------------------------------------
func (cli *grpcClient) EchoSync(msg string) (res types.Result) {


+ 17
- 29
client/remote_client.go View File

@ -55,39 +55,27 @@ func NewSocketClient(addr string, mustConnect bool) (*remoteClient, error) {
cli.QuitService = *NewQuitService(nil, "remoteClient", cli)
_, err := cli.Start() // Just start it, it's confusing for callers to remember to start.
return cli, err
if mustConnect {
return nil, err
} else {
return cli, nil
}
}
func (cli *remoteClient) OnStart() (err error) {
func (cli *remoteClient) OnStart() error {
cli.QuitService.OnStart()
doneCh := make(chan struct{})
go func() {
RETRY_LOOP:
for {
conn, err_ := Connect(cli.addr)
if err_ != nil {
if cli.mustConnect {
err = err_ // OnStart() will return this.
close(doneCh)
return
} else {
fmt.Printf("tmsp.remoteClient failed to connect to %v. Retrying...\n", cli.addr)
time.Sleep(time.Second * 3)
continue RETRY_LOOP
}
RETRY_LOOP:
for {
conn, err := Connect(cli.addr)
if err != nil {
if cli.mustConnect {
return err
} else {
fmt.Printf("tmsp.remoteClient failed to connect to %v. Retrying...\n", cli.addr)
time.Sleep(time.Second * 3)
continue RETRY_LOOP
}
go cli.sendValueRoutine(conn)
go cli.recvResponseRoutine(conn)
close(doneCh) // OnStart() will return no error.
return
}
}()
<-doneCh
return // err
go cli.sendRequestsRoutine(conn)
go cli.recvResponseRoutine(conn)
return err
}
return nil // never happens
}
func (cli *remoteClient) OnStop() {
@ -123,7 +111,7 @@ func (cli *remoteClient) Error() error {
//----------------------------------------
func (cli *remoteClient) sendValueRoutine(conn net.Conn) {
func (cli *remoteClient) sendRequestsRoutine(conn net.Conn) {
w := bufio.NewWriter(conn)
for {
select {


+ 1
- 3
cmd/tmsp-cli/tmsp-cli.go View File

@ -15,7 +15,7 @@ import (
"github.com/tendermint/tmsp/types"
)
// clientection is a global variable so it can be reused by the console
// client is a global variable so it can be reused by the console
var client tmspcli.Client
func main() {
@ -28,8 +28,6 @@ func main() {
Value: "tcp://127.0.0.1:46658",
Usage: "address of application socket",
},
}
app.Flags = []cli.Flag{
cli.StringFlag{
Name: "tmsp",
Value: "socket",


+ 0
- 2
types/types.proto View File

@ -217,8 +217,6 @@ message Validator {
//----------------------------------------
// Service Definition
// NOTE: we may want to make CheckTx and AppendTx two way streams.
// we should be able to drop Flush for sync calls
service TMSPApplication {
rpc Echo(RequestEcho) returns (ResponseEcho) ;
rpc Flush(RequestFlush) returns (ResponseFlush);


Loading…
Cancel
Save