From d3bdb49aae984a3866a72e5980c932f13de63b75 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 10 Aug 2016 17:49:15 -0400 Subject: [PATCH 1/4] remote_client -> socket_client; use logger --- client/grpc_client.go | 5 +- client/log.go | 7 ++ client/{remote_client.go => socket_client.go} | 82 ++++++++++--------- 3 files changed, 51 insertions(+), 43 deletions(-) create mode 100644 client/log.go rename client/{remote_client.go => socket_client.go} (77%) diff --git a/client/grpc_client.go b/client/grpc_client.go index 813be658a..745de2f82 100644 --- a/client/grpc_client.go +++ b/client/grpc_client.go @@ -1,7 +1,6 @@ package tmspcli import ( - "fmt" "net" "sync" "time" @@ -50,7 +49,7 @@ RETRY_LOOP: if cli.mustConnect { return err } else { - fmt.Printf("tmsp.grpcClient failed to connect to %v. Retrying...\n", cli.addr) + log.Warn(Fmt("tmsp.grpcClient failed to connect to %v. Retrying...\n", cli.addr)) time.Sleep(time.Second * 3) continue RETRY_LOOP } @@ -75,7 +74,7 @@ func (cli *grpcClient) SetResponseCallback(resCb Callback) { func (cli *grpcClient) StopForError(err error) { cli.mtx.Lock() - fmt.Printf("Stopping tmsp.grpcClient for error: %v\n", err.Error()) + log.Warn(Fmt("Stopping tmsp.grpcClient for error: %v\n", err.Error())) if cli.err == nil { cli.err = err } diff --git a/client/log.go b/client/log.go new file mode 100644 index 000000000..deadf6c34 --- /dev/null +++ b/client/log.go @@ -0,0 +1,7 @@ +package tmspcli + +import ( + "github.com/tendermint/go-logger" +) + +var log = logger.New("module", "tmspcli") diff --git a/client/remote_client.go b/client/socket_client.go similarity index 77% rename from client/remote_client.go rename to client/socket_client.go index 3c6803b7e..2b89b294f 100644 --- a/client/remote_client.go +++ b/client/socket_client.go @@ -26,7 +26,7 @@ const flushThrottleMS = 20 // Don't wait longer than... // This is goroutine-safe, but users should beware that // the application in general is not meant to be interfaced // with concurrent callers. -type remoteClient struct { +type socketClient struct { QuitService sync.Mutex // [EB]: is this even used? @@ -42,22 +42,22 @@ type remoteClient struct { resCb func(*types.Request, *types.Response) // listens to all callbacks } -func NewSocketClient(addr string, mustConnect bool) (*remoteClient, error) { - cli := &remoteClient{ +func NewSocketClient(addr string, mustConnect bool) (*socketClient, error) { + cli := &socketClient{ reqQueue: make(chan *ReqRes, reqQueueSize), - flushTimer: NewThrottleTimer("remoteClient", flushThrottleMS), + flushTimer: NewThrottleTimer("socketClient", flushThrottleMS), mustConnect: mustConnect, addr: addr, reqSent: list.New(), resCb: nil, } - cli.QuitService = *NewQuitService(nil, "remoteClient", cli) + cli.QuitService = *NewQuitService(nil, "socketClient", cli) _, err := cli.Start() // Just start it, it's confusing for callers to remember to start. return cli, err } -func (cli *remoteClient) OnStart() error { +func (cli *socketClient) OnStart() error { cli.QuitService.OnStart() RETRY_LOOP: for { @@ -66,7 +66,7 @@ RETRY_LOOP: if cli.mustConnect { return err } else { - fmt.Printf("tmsp.remoteClient failed to connect to %v. Retrying...\n", cli.addr) + log.Warn(Fmt("tmsp.socketClient failed to connect to %v. Retrying...\n", cli.addr)) time.Sleep(time.Second * 3) continue RETRY_LOOP } @@ -78,7 +78,7 @@ RETRY_LOOP: return nil // never happens } -func (cli *remoteClient) OnStop() { +func (cli *socketClient) OnStop() { cli.QuitService.OnStop() if cli.conn != nil { cli.conn.Close() @@ -87,15 +87,15 @@ func (cli *remoteClient) OnStop() { // Set listener for all responses // NOTE: callback may get internally generated flush responses. -func (cli *remoteClient) SetResponseCallback(resCb Callback) { +func (cli *socketClient) SetResponseCallback(resCb Callback) { cli.mtx.Lock() defer cli.mtx.Unlock() cli.resCb = resCb } -func (cli *remoteClient) StopForError(err error) { +func (cli *socketClient) StopForError(err error) { cli.mtx.Lock() - fmt.Printf("Stopping tmsp.remoteClient for error: %v\n", err.Error()) + log.Warn(Fmt("Stopping tmsp.socketClient for error: %v\n", err.Error())) if cli.err == nil { cli.err = err } @@ -103,7 +103,7 @@ func (cli *remoteClient) StopForError(err error) { cli.Stop() } -func (cli *remoteClient) Error() error { +func (cli *socketClient) Error() error { cli.mtx.Lock() defer cli.mtx.Unlock() return cli.err @@ -111,13 +111,13 @@ func (cli *remoteClient) Error() error { //---------------------------------------- -func (cli *remoteClient) sendRequestsRoutine(conn net.Conn) { +func (cli *socketClient) sendRequestsRoutine(conn net.Conn) { w := bufio.NewWriter(conn) for { select { case <-cli.flushTimer.Ch: select { - case cli.reqQueue <- NewReqRes(types.ToRequestFlush()): // cant this block ? + case cli.reqQueue <- NewReqRes(types.ToRequestFlush()): default: // Probably will fill the buffer, or retry later. } @@ -142,7 +142,7 @@ func (cli *remoteClient) sendRequestsRoutine(conn net.Conn) { } } -func (cli *remoteClient) recvResponseRoutine(conn net.Conn) { +func (cli *socketClient) recvResponseRoutine(conn net.Conn) { r := bufio.NewReader(conn) // Buffer reads for { var res = &types.Response{} @@ -165,13 +165,13 @@ func (cli *remoteClient) recvResponseRoutine(conn net.Conn) { } } -func (cli *remoteClient) willSendReq(reqres *ReqRes) { +func (cli *socketClient) willSendReq(reqres *ReqRes) { cli.mtx.Lock() defer cli.mtx.Unlock() cli.reqSent.PushBack(reqres) } -func (cli *remoteClient) didRecvResponse(res *types.Response) error { +func (cli *socketClient) didRecvResponse(res *types.Response) error { cli.mtx.Lock() defer cli.mtx.Unlock() @@ -205,53 +205,53 @@ func (cli *remoteClient) didRecvResponse(res *types.Response) error { //---------------------------------------- -func (cli *remoteClient) EchoAsync(msg string) *ReqRes { +func (cli *socketClient) EchoAsync(msg string) *ReqRes { return cli.queueRequest(types.ToRequestEcho(msg)) } -func (cli *remoteClient) FlushAsync() *ReqRes { +func (cli *socketClient) FlushAsync() *ReqRes { return cli.queueRequest(types.ToRequestFlush()) } -func (cli *remoteClient) InfoAsync() *ReqRes { +func (cli *socketClient) InfoAsync() *ReqRes { return cli.queueRequest(types.ToRequestInfo()) } -func (cli *remoteClient) SetOptionAsync(key string, value string) *ReqRes { +func (cli *socketClient) SetOptionAsync(key string, value string) *ReqRes { return cli.queueRequest(types.ToRequestSetOption(key, value)) } -func (cli *remoteClient) AppendTxAsync(tx []byte) *ReqRes { +func (cli *socketClient) AppendTxAsync(tx []byte) *ReqRes { return cli.queueRequest(types.ToRequestAppendTx(tx)) } -func (cli *remoteClient) CheckTxAsync(tx []byte) *ReqRes { +func (cli *socketClient) CheckTxAsync(tx []byte) *ReqRes { return cli.queueRequest(types.ToRequestCheckTx(tx)) } -func (cli *remoteClient) QueryAsync(query []byte) *ReqRes { +func (cli *socketClient) QueryAsync(query []byte) *ReqRes { return cli.queueRequest(types.ToRequestQuery(query)) } -func (cli *remoteClient) CommitAsync() *ReqRes { +func (cli *socketClient) CommitAsync() *ReqRes { return cli.queueRequest(types.ToRequestCommit()) } -func (cli *remoteClient) InitChainAsync(validators []*types.Validator) *ReqRes { +func (cli *socketClient) InitChainAsync(validators []*types.Validator) *ReqRes { return cli.queueRequest(types.ToRequestInitChain(validators)) } -func (cli *remoteClient) BeginBlockAsync(height uint64) *ReqRes { +func (cli *socketClient) BeginBlockAsync(height uint64) *ReqRes { return cli.queueRequest(types.ToRequestBeginBlock(height)) } -func (cli *remoteClient) EndBlockAsync(height uint64) *ReqRes { +func (cli *socketClient) EndBlockAsync(height uint64) *ReqRes { return cli.queueRequest(types.ToRequestEndBlock(height)) } //---------------------------------------- -func (cli *remoteClient) EchoSync(msg string) (res types.Result) { +func (cli *socketClient) EchoSync(msg string) (res types.Result) { reqres := cli.queueRequest(types.ToRequestEcho(msg)) cli.FlushSync() if cli.err != nil { @@ -261,12 +261,14 @@ func (cli *remoteClient) EchoSync(msg string) (res types.Result) { return types.Result{Code: OK, Data: []byte(resp.Message), Log: LOG} } -func (cli *remoteClient) FlushSync() error { +func (cli *socketClient) FlushSync() error { + log.Warn("FlushSync") cli.queueRequest(types.ToRequestFlush()).Wait() + log.Warn("Done FlushSync") return cli.err } -func (cli *remoteClient) InfoSync() (res types.Result) { +func (cli *socketClient) InfoSync() (res types.Result) { reqres := cli.queueRequest(types.ToRequestInfo()) cli.FlushSync() if cli.err != nil { @@ -276,7 +278,7 @@ func (cli *remoteClient) InfoSync() (res types.Result) { return types.Result{Code: OK, Data: []byte(resp.Info), Log: LOG} } -func (cli *remoteClient) SetOptionSync(key string, value string) (res types.Result) { +func (cli *socketClient) SetOptionSync(key string, value string) (res types.Result) { reqres := cli.queueRequest(types.ToRequestSetOption(key, value)) cli.FlushSync() if cli.err != nil { @@ -286,7 +288,7 @@ func (cli *remoteClient) SetOptionSync(key string, value string) (res types.Resu return types.Result{Code: OK, Data: nil, Log: resp.Log} } -func (cli *remoteClient) AppendTxSync(tx []byte) (res types.Result) { +func (cli *socketClient) AppendTxSync(tx []byte) (res types.Result) { reqres := cli.queueRequest(types.ToRequestAppendTx(tx)) cli.FlushSync() if cli.err != nil { @@ -296,7 +298,7 @@ func (cli *remoteClient) AppendTxSync(tx []byte) (res types.Result) { return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} } -func (cli *remoteClient) CheckTxSync(tx []byte) (res types.Result) { +func (cli *socketClient) CheckTxSync(tx []byte) (res types.Result) { reqres := cli.queueRequest(types.ToRequestCheckTx(tx)) cli.FlushSync() if cli.err != nil { @@ -306,7 +308,7 @@ func (cli *remoteClient) CheckTxSync(tx []byte) (res types.Result) { return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} } -func (cli *remoteClient) QuerySync(query []byte) (res types.Result) { +func (cli *socketClient) QuerySync(query []byte) (res types.Result) { reqres := cli.queueRequest(types.ToRequestQuery(query)) cli.FlushSync() if cli.err != nil { @@ -316,7 +318,7 @@ func (cli *remoteClient) QuerySync(query []byte) (res types.Result) { return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} } -func (cli *remoteClient) CommitSync() (res types.Result) { +func (cli *socketClient) CommitSync() (res types.Result) { reqres := cli.queueRequest(types.ToRequestCommit()) cli.FlushSync() if cli.err != nil { @@ -326,7 +328,7 @@ func (cli *remoteClient) CommitSync() (res types.Result) { return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} } -func (cli *remoteClient) InitChainSync(validators []*types.Validator) (err error) { +func (cli *socketClient) InitChainSync(validators []*types.Validator) (err error) { cli.queueRequest(types.ToRequestInitChain(validators)) cli.FlushSync() if cli.err != nil { @@ -335,7 +337,7 @@ func (cli *remoteClient) InitChainSync(validators []*types.Validator) (err error return nil } -func (cli *remoteClient) BeginBlockSync(height uint64) (err error) { +func (cli *socketClient) BeginBlockSync(height uint64) (err error) { cli.queueRequest(types.ToRequestBeginBlock(height)) cli.FlushSync() if cli.err != nil { @@ -344,7 +346,7 @@ func (cli *remoteClient) BeginBlockSync(height uint64) (err error) { return nil } -func (cli *remoteClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) { +func (cli *socketClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) { reqres := cli.queueRequest(types.ToRequestEndBlock(height)) cli.FlushSync() if cli.err != nil { @@ -355,7 +357,7 @@ func (cli *remoteClient) EndBlockSync(height uint64) (validators []*types.Valida //---------------------------------------- -func (cli *remoteClient) queueRequest(req *types.Request) *ReqRes { +func (cli *socketClient) queueRequest(req *types.Request) *ReqRes { reqres := NewReqRes(req) // TODO: set cli.err if reqQueue times out From a8066f9c8200203e64913ccd5c7bc67f3e9c683d Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 10 Aug 2016 17:58:11 -0400 Subject: [PATCH 2/4] return error if not running --- client/socket_client.go | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/client/socket_client.go b/client/socket_client.go index 2b89b294f..ea1e7d6ec 100644 --- a/client/socket_client.go +++ b/client/socket_client.go @@ -83,6 +83,19 @@ func (cli *socketClient) OnStop() { if cli.conn != nil { cli.conn.Close() } + cli.flushQueue() +} + +func (cli *socketClient) flushQueue() { +LOOP: + for { + select { + case reqres := <-cli.reqQueue: + reqres.Done() + default: + break LOOP + } + } } // Set listener for all responses @@ -94,6 +107,10 @@ func (cli *socketClient) SetResponseCallback(resCb Callback) { } func (cli *socketClient) StopForError(err error) { + if !cli.IsRunning() { + return + } + cli.mtx.Lock() log.Warn(Fmt("Stopping tmsp.socketClient for error: %v\n", err.Error())) if cli.err == nil { @@ -262,9 +279,12 @@ func (cli *socketClient) EchoSync(msg string) (res types.Result) { } func (cli *socketClient) FlushSync() error { - log.Warn("FlushSync") - cli.queueRequest(types.ToRequestFlush()).Wait() - log.Warn("Done FlushSync") + reqRes := cli.queueRequest(types.ToRequestFlush()) + if reqRes == nil { + return fmt.Errorf("Remote app is not running") + + } + reqRes.Wait() // NOTE: if we don't flush the queue, its possible to get stuck here return cli.err } @@ -358,6 +378,10 @@ func (cli *socketClient) EndBlockSync(height uint64) (validators []*types.Valida //---------------------------------------- func (cli *socketClient) queueRequest(req *types.Request) *ReqRes { + if !cli.IsRunning() { + return nil + } + reqres := NewReqRes(req) // TODO: set cli.err if reqQueue times out From 1b13f14e08e0058989b5b602e0c7e52777ad4455 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 10 Aug 2016 18:29:46 -0400 Subject: [PATCH 3/4] consolidate example tests; grpc fail fast --- client/client.go | 4 +- client/grpc_client.go | 132 +++++++++++++++++++++---------- client/local_client.go | 13 ++-- client/socket_client.go | 82 +++++++++++--------- example/dummy/dummy_test.go | 93 ---------------------- example/example_test.go | 151 ++++++++++++++++++++++++++++++++++++ example/nil/nil_test.go | 148 ----------------------------------- server/grpc_server.go | 9 ++- server/log.go | 7 ++ server/socket_server.go | 60 +++++++++++--- tests/test_cli/test.sh | 2 +- 11 files changed, 362 insertions(+), 339 deletions(-) delete mode 100644 example/dummy/dummy_test.go create mode 100644 example/example_test.go delete mode 100644 example/nil/nil_test.go create mode 100644 server/log.go diff --git a/client/client.go b/client/client.go index 3afb838f8..291c4863a 100644 --- a/client/client.go +++ b/client/client.go @@ -4,13 +4,15 @@ import ( "fmt" "sync" + . "github.com/tendermint/go-common" "github.com/tendermint/tmsp/types" ) type Client interface { + Service + SetResponseCallback(Callback) Error() error - Stop() bool FlushAsync() *ReqRes EchoAsync(msg string) *ReqRes diff --git a/client/grpc_client.go b/client/grpc_client.go index 745de2f82..3e44dcc11 100644 --- a/client/grpc_client.go +++ b/client/grpc_client.go @@ -1,6 +1,7 @@ package tmspcli import ( + "errors" "net" "sync" "time" @@ -43,6 +44,7 @@ func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) { func (cli *grpcClient) OnStart() error { cli.QuitService.OnStart() RETRY_LOOP: + for { conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc)) if err != nil { @@ -54,31 +56,45 @@ RETRY_LOOP: continue RETRY_LOOP } } - cli.client = types.NewTMSPApplicationClient(conn) + + client := types.NewTMSPApplicationClient(conn) + + ENSURE_CONNECTED: + for { + _, err := client.Echo(context.Background(), &types.RequestEcho{"hello"}, grpc.FailFast(true)) + if err == nil { + break ENSURE_CONNECTED + } + time.Sleep(time.Second) + } + + cli.client = client return nil } } func (cli *grpcClient) OnStop() { cli.QuitService.OnStop() - // TODO: how to close when TMSPApplicationClient interface doesn't expose Close ? -} - -// Set listener for all responses -// NOTE: callback may get internally generated flush responses. -func (cli *grpcClient) SetResponseCallback(resCb Callback) { cli.mtx.Lock() defer cli.mtx.Unlock() - cli.resCb = resCb + // TODO: how to close conn? its not a net.Conn and grpc doesn't expose a Close() + /*if cli.conn != nil { + cli.conn.Close() + }*/ } func (cli *grpcClient) StopForError(err error) { cli.mtx.Lock() - log.Warn(Fmt("Stopping tmsp.grpcClient for error: %v\n", err.Error())) + if !cli.IsRunning() { + return + } + if cli.err == nil { cli.err = err } cli.mtx.Unlock() + + log.Warn(Fmt("Stopping tmsp.grpcClient for error: %v", err.Error())) cli.Stop() } @@ -88,6 +104,14 @@ func (cli *grpcClient) Error() error { return cli.err } +// Set listener for all responses +// NOTE: callback may get internally generated flush responses. +func (cli *grpcClient) SetResponseCallback(resCb Callback) { + cli.mtx.Lock() + defer cli.mtx.Unlock() + cli.resCb = resCb +} + //---------------------------------------- // GRPC calls are synchronous, but some callbacks expect to be called asynchronously // (eg. the mempool expects to be able to lock to remove bad txs from cache). @@ -98,7 +122,7 @@ func (cli *grpcClient) Error() error { func (cli *grpcClient) EchoAsync(msg string) *ReqRes { req := types.ToRequestEcho(msg) - res, err := cli.client.Echo(context.Background(), req.GetEcho()) + res, err := cli.client.Echo(context.Background(), req.GetEcho(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -107,7 +131,7 @@ func (cli *grpcClient) EchoAsync(msg string) *ReqRes { func (cli *grpcClient) FlushAsync() *ReqRes { req := types.ToRequestFlush() - res, err := cli.client.Flush(context.Background(), req.GetFlush()) + res, err := cli.client.Flush(context.Background(), req.GetFlush(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -116,7 +140,7 @@ func (cli *grpcClient) FlushAsync() *ReqRes { func (cli *grpcClient) InfoAsync() *ReqRes { req := types.ToRequestInfo() - res, err := cli.client.Info(context.Background(), req.GetInfo()) + res, err := cli.client.Info(context.Background(), req.GetInfo(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -125,7 +149,7 @@ func (cli *grpcClient) InfoAsync() *ReqRes { func (cli *grpcClient) SetOptionAsync(key string, value string) *ReqRes { req := types.ToRequestSetOption(key, value) - res, err := cli.client.SetOption(context.Background(), req.GetSetOption()) + res, err := cli.client.SetOption(context.Background(), req.GetSetOption(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -134,7 +158,7 @@ func (cli *grpcClient) SetOptionAsync(key string, value string) *ReqRes { func (cli *grpcClient) AppendTxAsync(tx []byte) *ReqRes { req := types.ToRequestAppendTx(tx) - res, err := cli.client.AppendTx(context.Background(), req.GetAppendTx()) + res, err := cli.client.AppendTx(context.Background(), req.GetAppendTx(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -143,7 +167,7 @@ func (cli *grpcClient) AppendTxAsync(tx []byte) *ReqRes { func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes { req := types.ToRequestCheckTx(tx) - res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx()) + res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -152,7 +176,7 @@ func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes { func (cli *grpcClient) QueryAsync(query []byte) *ReqRes { req := types.ToRequestQuery(query) - res, err := cli.client.Query(context.Background(), req.GetQuery()) + res, err := cli.client.Query(context.Background(), req.GetQuery(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -161,7 +185,7 @@ func (cli *grpcClient) QueryAsync(query []byte) *ReqRes { func (cli *grpcClient) CommitAsync() *ReqRes { req := types.ToRequestCommit() - res, err := cli.client.Commit(context.Background(), req.GetCommit()) + res, err := cli.client.Commit(context.Background(), req.GetCommit(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -170,7 +194,7 @@ func (cli *grpcClient) CommitAsync() *ReqRes { func (cli *grpcClient) InitChainAsync(validators []*types.Validator) *ReqRes { req := types.ToRequestInitChain(validators) - res, err := cli.client.InitChain(context.Background(), req.GetInitChain()) + res, err := cli.client.InitChain(context.Background(), req.GetInitChain(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -179,7 +203,7 @@ func (cli *grpcClient) InitChainAsync(validators []*types.Validator) *ReqRes { func (cli *grpcClient) BeginBlockAsync(height uint64) *ReqRes { req := types.ToRequestBeginBlock(height) - res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock()) + res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -188,7 +212,7 @@ func (cli *grpcClient) BeginBlockAsync(height uint64) *ReqRes { func (cli *grpcClient) EndBlockAsync(height uint64) *ReqRes { req := types.ToRequestEndBlock(height) - res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock()) + res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -216,11 +240,35 @@ func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) return reqres } +func (cli *grpcClient) checkErrGetResult() *types.Result { + if cli.err != nil { + errorLog := cli.err.Error() + cli.StopForError(cli.err) + result := types.ErrInternalError + result.SetLog(errorLog) + return &result + } + return nil +} + +func (cli *grpcClient) checkGetErr() error { + if cli.err != nil { + err := errors.New(cli.err.Error()) + cli.StopForError(cli.err) + return err + } + return nil +} + //---------------------------------------- func (cli *grpcClient) EchoSync(msg string) (res types.Result) { - r := cli.EchoAsync(msg).Response.GetEcho() - return types.NewResultOK([]byte(r.Message), LOG) + reqres := cli.EchoAsync(msg) + if res := cli.checkErrGetResult(); res != nil { + return *res + } + resp := reqres.Response.GetEcho() + return types.NewResultOK([]byte(resp.Message), LOG) } func (cli *grpcClient) FlushSync() error { @@ -228,14 +276,18 @@ func (cli *grpcClient) FlushSync() error { } func (cli *grpcClient) InfoSync() (res types.Result) { - r := cli.InfoAsync().Response.GetInfo() - return types.NewResultOK([]byte(r.Info), LOG) + reqres := cli.InfoAsync() + if res := cli.checkErrGetResult(); res != nil { + return *res + } + resp := reqres.Response.GetInfo() + return types.NewResultOK([]byte(resp.Info), LOG) } func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result) { reqres := cli.SetOptionAsync(key, value) - if cli.err != nil { - return types.ErrInternalError.SetLog(cli.err.Error()) + if res := cli.checkErrGetResult(); res != nil { + return *res } resp := reqres.Response.GetSetOption() return types.Result{Code: OK, Data: nil, Log: resp.Log} @@ -243,8 +295,8 @@ func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result func (cli *grpcClient) AppendTxSync(tx []byte) (res types.Result) { reqres := cli.AppendTxAsync(tx) - if cli.err != nil { - return types.ErrInternalError.SetLog(cli.err.Error()) + if res := cli.checkErrGetResult(); res != nil { + return *res } resp := reqres.Response.GetAppendTx() return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} @@ -252,8 +304,8 @@ func (cli *grpcClient) AppendTxSync(tx []byte) (res types.Result) { func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) { reqres := cli.CheckTxAsync(tx) - if cli.err != nil { - return types.ErrInternalError.SetLog(cli.err.Error()) + if res := cli.checkErrGetResult(); res != nil { + return *res } resp := reqres.Response.GetCheckTx() return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} @@ -261,8 +313,8 @@ func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) { func (cli *grpcClient) QuerySync(query []byte) (res types.Result) { reqres := cli.QueryAsync(query) - if cli.err != nil { - return types.ErrInternalError.SetLog(cli.err.Error()) + if res := cli.checkErrGetResult(); res != nil { + return *res } resp := reqres.Response.GetQuery() return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} @@ -270,8 +322,8 @@ func (cli *grpcClient) QuerySync(query []byte) (res types.Result) { func (cli *grpcClient) CommitSync() (res types.Result) { reqres := cli.CommitAsync() - if cli.err != nil { - return types.ErrInternalError.SetLog(cli.err.Error()) + if res := cli.checkErrGetResult(); res != nil { + return *res } resp := reqres.Response.GetCommit() return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} @@ -279,24 +331,24 @@ func (cli *grpcClient) CommitSync() (res types.Result) { func (cli *grpcClient) InitChainSync(validators []*types.Validator) (err error) { cli.InitChainAsync(validators) - if cli.err != nil { - return cli.err + if err := cli.checkGetErr(); err != nil { + return err } return nil } func (cli *grpcClient) BeginBlockSync(height uint64) (err error) { cli.BeginBlockAsync(height) - if cli.err != nil { - return cli.err + if err := cli.checkGetErr(); err != nil { + return err } return nil } func (cli *grpcClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) { reqres := cli.EndBlockAsync(height) - if cli.err != nil { - return nil, cli.err + if err := cli.checkGetErr(); err != nil { + return nil, err } return reqres.Response.GetEndBlock().Diffs, nil } diff --git a/client/local_client.go b/client/local_client.go index 06db859b4..eac5661b8 100644 --- a/client/local_client.go +++ b/client/local_client.go @@ -1,11 +1,14 @@ package tmspcli import ( - types "github.com/tendermint/tmsp/types" "sync" + + . "github.com/tendermint/go-common" + types "github.com/tendermint/tmsp/types" ) type localClient struct { + *BaseService mtx *sync.Mutex types.Application Callback @@ -15,10 +18,12 @@ func NewLocalClient(mtx *sync.Mutex, app types.Application) *localClient { if mtx == nil { mtx = new(sync.Mutex) } - return &localClient{ + cli := &localClient{ mtx: mtx, Application: app, } + cli.BaseService = NewBaseService(log, "localClient", cli) + return cli } func (app *localClient) SetResponseCallback(cb Callback) { @@ -32,10 +37,6 @@ func (app *localClient) Error() error { return nil } -func (app *localClient) Stop() bool { - return true -} - func (app *localClient) FlushAsync() *ReqRes { // Do nothing return newLocalReqRes(types.ToRequestFlush(), nil) diff --git a/client/socket_client.go b/client/socket_client.go index ea1e7d6ec..10f780bc2 100644 --- a/client/socket_client.go +++ b/client/socket_client.go @@ -28,7 +28,6 @@ const flushThrottleMS = 20 // Don't wait longer than... // with concurrent callers. type socketClient struct { QuitService - sync.Mutex // [EB]: is this even used? reqQueue chan *ReqRes flushTimer *ThrottleTimer @@ -40,6 +39,7 @@ type socketClient struct { err error reqSent *list.List resCb func(*types.Request, *types.Response) // listens to all callbacks + } func NewSocketClient(addr string, mustConnect bool) (*socketClient, error) { @@ -53,70 +53,63 @@ func NewSocketClient(addr string, mustConnect bool) (*socketClient, error) { resCb: nil, } cli.QuitService = *NewQuitService(nil, "socketClient", cli) + _, err := cli.Start() // Just start it, it's confusing for callers to remember to start. return cli, err } func (cli *socketClient) OnStart() error { cli.QuitService.OnStart() + + var err error + var conn net.Conn RETRY_LOOP: for { - conn, err := Connect(cli.addr) + conn, err = Connect(cli.addr) if err != nil { if cli.mustConnect { return err } else { - log.Warn(Fmt("tmsp.socketClient failed to connect to %v. Retrying...\n", cli.addr)) + log.Warn(Fmt("tmsp.socketClient failed to connect to %v. Retrying...", cli.addr)) time.Sleep(time.Second * 3) continue RETRY_LOOP } } + cli.conn = conn + go cli.sendRequestsRoutine(conn) go cli.recvResponseRoutine(conn) - return err + + return nil } return nil // never happens } func (cli *socketClient) OnStop() { cli.QuitService.OnStop() + + cli.mtx.Lock() + defer cli.mtx.Unlock() if cli.conn != nil { cli.conn.Close() } - cli.flushQueue() -} -func (cli *socketClient) flushQueue() { -LOOP: - for { - select { - case reqres := <-cli.reqQueue: - reqres.Done() - default: - break LOOP - } - } -} - -// Set listener for all responses -// NOTE: callback may get internally generated flush responses. -func (cli *socketClient) SetResponseCallback(resCb Callback) { - cli.mtx.Lock() - defer cli.mtx.Unlock() - cli.resCb = resCb + cli.flushQueue() } +// Stop the client and set the error func (cli *socketClient) StopForError(err error) { + cli.mtx.Lock() if !cli.IsRunning() { return } - cli.mtx.Lock() - log.Warn(Fmt("Stopping tmsp.socketClient for error: %v\n", err.Error())) if cli.err == nil { cli.err = err } cli.mtx.Unlock() + + log.Warn(Fmt("Stopping tmsp.socketClient for error: %v", err.Error())) cli.Stop() } @@ -126,9 +119,18 @@ func (cli *socketClient) Error() error { return cli.err } +// Set listener for all responses +// NOTE: callback may get internally generated flush responses. +func (cli *socketClient) SetResponseCallback(resCb Callback) { + cli.mtx.Lock() + defer cli.mtx.Unlock() + cli.resCb = resCb +} + //---------------------------------------- func (cli *socketClient) sendRequestsRoutine(conn net.Conn) { + w := bufio.NewWriter(conn) for { select { @@ -144,14 +146,14 @@ func (cli *socketClient) sendRequestsRoutine(conn net.Conn) { cli.willSendReq(reqres) err := types.WriteMessage(reqres.Request, w) if err != nil { - cli.StopForError(err) + cli.StopForError(fmt.Errorf("Error writing msg: %v", err)) return } // log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request) if _, ok := reqres.Request.Value.(*types.Request_Flush); ok { err = w.Flush() if err != nil { - cli.StopForError(err) + cli.StopForError(fmt.Errorf("Error flushing writer: %v", err)) return } } @@ -160,6 +162,7 @@ func (cli *socketClient) sendRequestsRoutine(conn net.Conn) { } func (cli *socketClient) recvResponseRoutine(conn net.Conn) { + r := bufio.NewReader(conn) // Buffer reads for { var res = &types.Response{} @@ -172,11 +175,13 @@ func (cli *socketClient) recvResponseRoutine(conn net.Conn) { case *types.Response_Exception: // XXX After setting cli.err, release waiters (e.g. reqres.Done()) cli.StopForError(errors.New(r.Exception.Error)) + return default: // log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res) err := cli.didRecvResponse(res) if err != nil { cli.StopForError(err) + return } } } @@ -280,9 +285,8 @@ func (cli *socketClient) EchoSync(msg string) (res types.Result) { func (cli *socketClient) FlushSync() error { reqRes := cli.queueRequest(types.ToRequestFlush()) - if reqRes == nil { - return fmt.Errorf("Remote app is not running") - + if cli.err != nil { + return types.ErrInternalError.SetLog(cli.err.Error()) } reqRes.Wait() // NOTE: if we don't flush the queue, its possible to get stuck here return cli.err @@ -378,10 +382,6 @@ func (cli *socketClient) EndBlockSync(height uint64) (validators []*types.Valida //---------------------------------------- func (cli *socketClient) queueRequest(req *types.Request) *ReqRes { - if !cli.IsRunning() { - return nil - } - reqres := NewReqRes(req) // TODO: set cli.err if reqQueue times out @@ -398,6 +398,18 @@ func (cli *socketClient) queueRequest(req *types.Request) *ReqRes { return reqres } +func (cli *socketClient) flushQueue() { +LOOP: + for { + select { + case reqres := <-cli.reqQueue: + reqres.Done() + default: + break LOOP + } + } +} + //---------------------------------------- func resMatchesReq(req *types.Request, res *types.Response) (ok bool) { diff --git a/example/dummy/dummy_test.go b/example/dummy/dummy_test.go deleted file mode 100644 index c5ac2d271..000000000 --- a/example/dummy/dummy_test.go +++ /dev/null @@ -1,93 +0,0 @@ -package dummy - -import ( - "reflect" - "testing" - "time" - - . "github.com/tendermint/go-common" - "github.com/tendermint/tmsp/server" - "github.com/tendermint/tmsp/types" -) - -func TestStream(t *testing.T) { - - numAppendTxs := 200000 - - // Start the listener - server, err := server.NewSocketServer("unix://test.sock", NewDummyApplication()) - if err != nil { - Exit(err.Error()) - } - defer server.Stop() - - // Connect to the socket - conn, err := Connect("unix://test.sock") - if err != nil { - Exit(err.Error()) - } - - // Read response data - done := make(chan struct{}) - go func() { - counter := 0 - for { - - var res = &types.Response{} - err := types.ReadMessage(conn, res) - if err != nil { - Exit(err.Error()) - } - - // Process response - switch r := res.Value.(type) { - case *types.Response_AppendTx: - counter += 1 - if r.AppendTx.Code != types.CodeType_OK { - t.Error("AppendTx failed with ret_code", r.AppendTx.Code) - } - if counter > numAppendTxs { - t.Fatal("Too many AppendTx responses") - } - t.Log("response", counter) - if counter == numAppendTxs { - go func() { - time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow - close(done) - }() - } - case *types.Response_Flush: - // ignore - default: - t.Error("Unexpected response type", reflect.TypeOf(res.Value)) - } - } - }() - - // Write requests - for counter := 0; counter < numAppendTxs; counter++ { - // Send request - var req = types.ToRequestAppendTx([]byte("test")) - err := types.WriteMessage(req, conn) - if err != nil { - t.Fatal(err.Error()) - } - - // Sometimes send flush messages - if counter%123 == 0 { - t.Log("flush") - err := types.WriteMessage(types.ToRequestFlush(), conn) - if err != nil { - t.Fatal(err.Error()) - } - } - } - - // Send final flush message - err = types.WriteMessage(types.ToRequestFlush(), conn) - if err != nil { - t.Fatal(err.Error()) - } - - <-done -} diff --git a/example/example_test.go b/example/example_test.go new file mode 100644 index 000000000..fc9066d8c --- /dev/null +++ b/example/example_test.go @@ -0,0 +1,151 @@ +package nilapp + +import ( + "fmt" + "net" + "reflect" + "testing" + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc" + + . "github.com/tendermint/go-common" + "github.com/tendermint/tmsp/client" + "github.com/tendermint/tmsp/example/dummy" + nilapp "github.com/tendermint/tmsp/example/nil" + "github.com/tendermint/tmsp/server" + "github.com/tendermint/tmsp/types" +) + +func TestDummy(t *testing.T) { + fmt.Println("### Testing Dummy") + testStream(t, dummy.NewDummyApplication()) +} + +func TestNilApp(t *testing.T) { + fmt.Println("### Testing NilApp") + testStream(t, nilapp.NewNilApplication()) +} + +func TestGRPC(t *testing.T) { + fmt.Println("### Testing GRPC") + testGRPCSync(t, types.NewGRPCApplication(nilapp.NewNilApplication())) +} + +func testStream(t *testing.T, app types.Application) { + + numAppendTxs := 200000 + + // Start the listener + server, err := server.NewSocketServer("unix://test.sock", app) + if err != nil { + Exit(Fmt("Error starting socket server: %v", err.Error())) + } + defer server.Stop() + + // Connect to the socket + client, err := tmspcli.NewSocketClient("unix://test.sock", false) + if err != nil { + Exit(Fmt("Error starting socket client: %v", err.Error())) + } + client.Start() + defer client.Stop() + + done := make(chan struct{}) + counter := 0 + client.SetResponseCallback(func(req *types.Request, res *types.Response) { + // Process response + switch r := res.Value.(type) { + case *types.Response_AppendTx: + counter += 1 + if r.AppendTx.Code != types.CodeType_OK { + t.Error("AppendTx failed with ret_code", r.AppendTx.Code) + } + if counter > numAppendTxs { + t.Fatalf("Too many AppendTx responses. Got %d, expected %d", counter, numAppendTxs) + } + if counter == numAppendTxs { + go func() { + time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow + close(done) + }() + return + } + case *types.Response_Flush: + // ignore + default: + t.Error("Unexpected response type", reflect.TypeOf(res.Value)) + } + }) + + // Write requests + for counter := 0; counter < numAppendTxs; counter++ { + // Send request + reqRes := client.AppendTxAsync([]byte("test")) + _ = reqRes + // check err ? + + // Sometimes send flush messages + if counter%123 == 0 { + client.FlushAsync() + // check err ? + } + } + + // Send final flush message + client.FlushAsync() + + <-done +} + +//------------------------- +// test grpc + +func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) { + return Connect(addr) +} + +func testGRPCSync(t *testing.T, app *types.GRPCApplication) { + + numAppendTxs := 2000 + + // Start the listener + server, err := server.NewGRPCServer("unix://test.sock", app) + if err != nil { + Exit(Fmt("Error starting GRPC server: %v", err.Error())) + } + defer server.Stop() + + // Connect to the socket + conn, err := grpc.Dial("unix://test.sock", grpc.WithInsecure(), grpc.WithDialer(dialerFunc)) + if err != nil { + Exit(Fmt("Error dialing GRPC server: %v", err.Error())) + } + defer conn.Close() + + client := types.NewTMSPApplicationClient(conn) + + // Write requests + for counter := 0; counter < numAppendTxs; counter++ { + // Send request + response, err := client.AppendTx(context.Background(), &types.RequestAppendTx{[]byte("test")}) + if err != nil { + t.Fatalf("Error in GRPC AppendTx: %v", err.Error()) + } + counter += 1 + if response.Code != types.CodeType_OK { + t.Error("AppendTx failed with ret_code", response.Code) + } + if counter > numAppendTxs { + t.Fatal("Too many AppendTx responses") + } + t.Log("response", counter) + if counter == numAppendTxs { + go func() { + time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow + }() + } + + } +} diff --git a/example/nil/nil_test.go b/example/nil/nil_test.go deleted file mode 100644 index ed7a36ddb..000000000 --- a/example/nil/nil_test.go +++ /dev/null @@ -1,148 +0,0 @@ -package nilapp - -import ( - "net" - "reflect" - "testing" - "time" - - "golang.org/x/net/context" - "google.golang.org/grpc" - - . "github.com/tendermint/go-common" - "github.com/tendermint/tmsp/server" - "github.com/tendermint/tmsp/types" -) - -func TestStream(t *testing.T) { - - numAppendTxs := 200000 - - // Start the listener - server, err := server.NewSocketServer("unix://test.sock", NewNilApplication()) - if err != nil { - Exit(err.Error()) - } - defer server.Stop() - - // Connect to the socket - conn, err := Connect("unix://test.sock") - if err != nil { - Exit(err.Error()) - } - - // Read response data - done := make(chan struct{}) - go func() { - counter := 0 - for { - - var res = &types.Response{} - err := types.ReadMessage(conn, res) - if err != nil { - Exit(err.Error()) - } - - // Process response - switch r := res.Value.(type) { - case *types.Response_AppendTx: - counter += 1 - if r.AppendTx.Code != types.CodeType_OK { - t.Error("AppendTx failed with ret_code", r.AppendTx.Code) - } - if counter > numAppendTxs { - t.Fatal("Too many AppendTx responses") - } - t.Log("response", counter) - if counter == numAppendTxs { - go func() { - time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow - close(done) - }() - } - case *types.Response_Flush: - // ignore - default: - t.Error("Unexpected response type", reflect.TypeOf(res.Value)) - } - } - }() - - // Write requests - for counter := 0; counter < numAppendTxs; counter++ { - // Send request - var req = types.ToRequestAppendTx([]byte("test")) - err := types.WriteMessage(req, conn) - if err != nil { - t.Fatal(err.Error()) - } - - // Sometimes send flush messages - if counter%123 == 0 { - t.Log("flush") - err := types.WriteMessage(types.ToRequestFlush(), conn) - if err != nil { - t.Fatal(err.Error()) - } - } - } - - // Send final flush message - err = types.WriteMessage(types.ToRequestFlush(), conn) - if err != nil { - t.Fatal(err.Error()) - } - - <-done -} - -//------------------------- -// test grpc - -func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) { - return Connect(addr) -} - -func TestGRPCSync(t *testing.T) { - - numAppendTxs := 2000 - - // Start the listener - server, err := server.NewGRPCServer("unix://test.sock", types.NewGRPCApplication(NewNilApplication())) - if err != nil { - Exit(err.Error()) - } - defer server.Stop() - - // Connect to the socket - conn, err := grpc.Dial("unix://test.sock", grpc.WithInsecure(), grpc.WithDialer(dialerFunc)) - if err != nil { - Exit(err.Error()) - } - defer conn.Close() - - client := types.NewTMSPApplicationClient(conn) - - // Write requests - for counter := 0; counter < numAppendTxs; counter++ { - // Send request - response, err := client.AppendTx(context.Background(), &types.RequestAppendTx{[]byte("test")}) - if err != nil { - t.Fatal(err.Error()) - } - counter += 1 - if response.Code != types.CodeType_OK { - t.Error("AppendTx failed with ret_code", response.Code) - } - if counter > numAppendTxs { - t.Fatal("Too many AppendTx responses") - } - t.Log("response", counter) - if counter == numAppendTxs { - go func() { - time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow - }() - } - - } -} diff --git a/server/grpc_server.go b/server/grpc_server.go index 3a21e3fb9..02f6fa3f8 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -18,6 +18,7 @@ type GRPCServer struct { proto string addr string listener net.Listener + server *grpc.Server app types.TMSPApplicationServer } @@ -43,13 +44,13 @@ func (s *GRPCServer) OnStart() error { return err } s.listener = ln - grpcServer := grpc.NewServer() - types.RegisterTMSPApplicationServer(grpcServer, s.app) - go grpcServer.Serve(ln) + s.server = grpc.NewServer() + types.RegisterTMSPApplicationServer(s.server, s.app) + go s.server.Serve(s.listener) return nil } func (s *GRPCServer) OnStop() { s.QuitService.OnStop() - s.listener.Close() + s.server.Stop() } diff --git a/server/log.go b/server/log.go new file mode 100644 index 000000000..8ac4092e7 --- /dev/null +++ b/server/log.go @@ -0,0 +1,7 @@ +package server + +import ( + "github.com/tendermint/go-logger" +) + +var log = logger.New("module", "tmsp-server") diff --git a/server/socket_server.go b/server/socket_server.go index 7969974e4..8e837d8fb 100644 --- a/server/socket_server.go +++ b/server/socket_server.go @@ -21,6 +21,10 @@ type SocketServer struct { addr string listener net.Listener + connsMtx sync.Mutex + conns map[int]net.Conn + nextConnID int + appMtx sync.Mutex app types.Application } @@ -33,6 +37,7 @@ func NewSocketServer(protoAddr string, app types.Application) (Service, error) { addr: addr, listener: nil, app: app, + conns: make(map[int]net.Conn), } s.QuitService = *NewQuitService(nil, "TMSPServer", s) _, err := s.Start() // Just start it @@ -53,6 +58,33 @@ func (s *SocketServer) OnStart() error { func (s *SocketServer) OnStop() { s.QuitService.OnStop() s.listener.Close() + + s.connsMtx.Lock() + for id, conn := range s.conns { + delete(s.conns, id) + conn.Close() + } + s.connsMtx.Unlock() +} + +func (s *SocketServer) addConn(conn net.Conn) int { + s.connsMtx.Lock() + defer s.connsMtx.Unlock() + + connID := s.nextConnID + s.nextConnID += 1 + s.conns[connID] = conn + + return connID +} + +// deletes conn even if close errs +func (s *SocketServer) rmConn(connID int, conn net.Conn) error { + s.connsMtx.Lock() + defer s.connsMtx.Unlock() + + delete(s.conns, connID) + return conn.Close() } func (s *SocketServer) acceptConnectionsRoutine() { @@ -62,7 +94,7 @@ func (s *SocketServer) acceptConnectionsRoutine() { // semaphore <- struct{}{} // Accept a connection - fmt.Println("Waiting for new connection...") + log.Notice("Waiting for new connection...") conn, err := s.listener.Accept() if err != nil { if !s.IsRunning() { @@ -70,9 +102,11 @@ func (s *SocketServer) acceptConnectionsRoutine() { } Exit("Failed to accept connection: " + err.Error()) } else { - fmt.Println("Accepted a new connection") + log.Notice("Accepted a new connection") } + connID := s.addConn(conn) + closeConn := make(chan error, 2) // Push to signal connection closed responses := make(chan *types.Response, 1000) // A channel to buffer responses @@ -84,16 +118,20 @@ func (s *SocketServer) acceptConnectionsRoutine() { go func() { // Wait until signal to close connection errClose := <-closeConn - if errClose != nil { - fmt.Printf("Connection error: %v\n", errClose) + if err == io.EOF { + log.Warn("Connection was closed by client") + return // is this correct? the conn is closed? + } else if errClose != nil { + log.Warn("Connection error", "error", errClose) } else { - fmt.Println("Connection was closed.") + // never happens + log.Warn("Connection was closed.") } // Close the connection - err := conn.Close() + err := s.rmConn(connID, conn) if err != nil { - fmt.Printf("Error in closing connection: %v\n", err) + log.Warn("Error in closing connection", "error", err) } // <-semaphore @@ -111,9 +149,9 @@ func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, respo err := types.ReadMessage(bufReader, req) if err != nil { if err == io.EOF { - closeConn <- fmt.Errorf("Connection closed by client") + closeConn <- err } else { - closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error()) + closeConn <- fmt.Errorf("Error reading message: %v", err.Error()) } return } @@ -176,13 +214,13 @@ func (s *SocketServer) handleResponses(closeConn chan error, responses <-chan *t var res = <-responses err := types.WriteMessage(res, bufWriter) if err != nil { - closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error()) + closeConn <- fmt.Errorf("Error writing message: %v", err.Error()) return } if _, ok := res.Value.(*types.Response_Flush); ok { err = bufWriter.Flush() if err != nil { - closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error()) + closeConn <- fmt.Errorf("Error flushing write buffer: %v", err.Error()) return } } diff --git a/tests/test_cli/test.sh b/tests/test_cli/test.sh index f8920958a..adb3e4bd2 100644 --- a/tests/test_cli/test.sh +++ b/tests/test_cli/test.sh @@ -9,7 +9,7 @@ function testExample() { $APP &> /dev/null & sleep 2 tmsp-cli --verbose batch < $INPUT > "${INPUT}.out.new" - killall "$APP" > /dev/null + killall "$APP" &> /dev/null pre=`shasum < "${INPUT}.out"` post=`shasum < "${INPUT}.out.new"` From 83920a1c373d4e9c91b4f2d0f8ba3a40b8cdf5b0 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 17 Aug 2016 23:27:49 -0400 Subject: [PATCH 4/4] fixes from review --- client/grpc_client.go | 38 +++++++++++++++++--------------------- client/local_client.go | 4 ++-- server/socket_server.go | 1 - 3 files changed, 19 insertions(+), 24 deletions(-) diff --git a/client/grpc_client.go b/client/grpc_client.go index 3e44dcc11..916189b5b 100644 --- a/client/grpc_client.go +++ b/client/grpc_client.go @@ -240,22 +240,18 @@ func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) return reqres } -func (cli *grpcClient) checkErrGetResult() *types.Result { +func (cli *grpcClient) checkErrGetResult() types.Result { if cli.err != nil { - errorLog := cli.err.Error() cli.StopForError(cli.err) - result := types.ErrInternalError - result.SetLog(errorLog) - return &result + return types.ErrInternalError.SetLog(cli.err.Error()) } return nil } func (cli *grpcClient) checkGetErr() error { if cli.err != nil { - err := errors.New(cli.err.Error()) cli.StopForError(cli.err) - return err + return cli.err } return nil } @@ -264,8 +260,8 @@ func (cli *grpcClient) checkGetErr() error { func (cli *grpcClient) EchoSync(msg string) (res types.Result) { reqres := cli.EchoAsync(msg) - if res := cli.checkErrGetResult(); res != nil { - return *res + if res := cli.checkErrGetResult(); res.IsErr() { + return res } resp := reqres.Response.GetEcho() return types.NewResultOK([]byte(resp.Message), LOG) @@ -277,8 +273,8 @@ func (cli *grpcClient) FlushSync() error { func (cli *grpcClient) InfoSync() (res types.Result) { reqres := cli.InfoAsync() - if res := cli.checkErrGetResult(); res != nil { - return *res + if res := cli.checkErrGetResult(); res.IsErr() { + return res } resp := reqres.Response.GetInfo() return types.NewResultOK([]byte(resp.Info), LOG) @@ -286,8 +282,8 @@ func (cli *grpcClient) InfoSync() (res types.Result) { func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result) { reqres := cli.SetOptionAsync(key, value) - if res := cli.checkErrGetResult(); res != nil { - return *res + if res := cli.checkErrGetResult(); res.IsErr() { + return res } resp := reqres.Response.GetSetOption() return types.Result{Code: OK, Data: nil, Log: resp.Log} @@ -295,8 +291,8 @@ func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result func (cli *grpcClient) AppendTxSync(tx []byte) (res types.Result) { reqres := cli.AppendTxAsync(tx) - if res := cli.checkErrGetResult(); res != nil { - return *res + if res := cli.checkErrGetResult(); res.IsErr() { + return res } resp := reqres.Response.GetAppendTx() return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} @@ -304,8 +300,8 @@ func (cli *grpcClient) AppendTxSync(tx []byte) (res types.Result) { func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) { reqres := cli.CheckTxAsync(tx) - if res := cli.checkErrGetResult(); res != nil { - return *res + if res := cli.checkErrGetResult(); res.IsErr() { + return res } resp := reqres.Response.GetCheckTx() return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} @@ -313,8 +309,8 @@ func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) { func (cli *grpcClient) QuerySync(query []byte) (res types.Result) { reqres := cli.QueryAsync(query) - if res := cli.checkErrGetResult(); res != nil { - return *res + if res := cli.checkErrGetResult(); res.IsErr() { + return res } resp := reqres.Response.GetQuery() return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} @@ -322,8 +318,8 @@ func (cli *grpcClient) QuerySync(query []byte) (res types.Result) { func (cli *grpcClient) CommitSync() (res types.Result) { reqres := cli.CommitAsync() - if res := cli.checkErrGetResult(); res != nil { - return *res + if res := cli.checkErrGetResult(); res.IsErr() { + return res } resp := reqres.Response.GetCommit() return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} diff --git a/client/local_client.go b/client/local_client.go index eac5661b8..ce17f6c07 100644 --- a/client/local_client.go +++ b/client/local_client.go @@ -8,7 +8,7 @@ import ( ) type localClient struct { - *BaseService + BaseService mtx *sync.Mutex types.Application Callback @@ -22,7 +22,7 @@ func NewLocalClient(mtx *sync.Mutex, app types.Application) *localClient { mtx: mtx, Application: app, } - cli.BaseService = NewBaseService(log, "localClient", cli) + cli.BaseService = *NewBaseService(log, "localClient", cli) return cli } diff --git a/server/socket_server.go b/server/socket_server.go index 8e837d8fb..ff01a9082 100644 --- a/server/socket_server.go +++ b/server/socket_server.go @@ -120,7 +120,6 @@ func (s *SocketServer) acceptConnectionsRoutine() { errClose := <-closeConn if err == io.EOF { log.Warn("Connection was closed by client") - return // is this correct? the conn is closed? } else if errClose != nil { log.Warn("Connection error", "error", errClose) } else {