From d3bdb49aae984a3866a72e5980c932f13de63b75 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 10 Aug 2016 17:49:15 -0400 Subject: [PATCH] remote_client -> socket_client; use logger --- client/grpc_client.go | 5 +- client/log.go | 7 ++ client/{remote_client.go => socket_client.go} | 82 ++++++++++--------- 3 files changed, 51 insertions(+), 43 deletions(-) create mode 100644 client/log.go rename client/{remote_client.go => socket_client.go} (77%) diff --git a/client/grpc_client.go b/client/grpc_client.go index 813be658a..745de2f82 100644 --- a/client/grpc_client.go +++ b/client/grpc_client.go @@ -1,7 +1,6 @@ package tmspcli import ( - "fmt" "net" "sync" "time" @@ -50,7 +49,7 @@ RETRY_LOOP: if cli.mustConnect { return err } else { - fmt.Printf("tmsp.grpcClient failed to connect to %v. Retrying...\n", cli.addr) + log.Warn(Fmt("tmsp.grpcClient failed to connect to %v. Retrying...\n", cli.addr)) time.Sleep(time.Second * 3) continue RETRY_LOOP } @@ -75,7 +74,7 @@ func (cli *grpcClient) SetResponseCallback(resCb Callback) { func (cli *grpcClient) StopForError(err error) { cli.mtx.Lock() - fmt.Printf("Stopping tmsp.grpcClient for error: %v\n", err.Error()) + log.Warn(Fmt("Stopping tmsp.grpcClient for error: %v\n", err.Error())) if cli.err == nil { cli.err = err } diff --git a/client/log.go b/client/log.go new file mode 100644 index 000000000..deadf6c34 --- /dev/null +++ b/client/log.go @@ -0,0 +1,7 @@ +package tmspcli + +import ( + "github.com/tendermint/go-logger" +) + +var log = logger.New("module", "tmspcli") diff --git a/client/remote_client.go b/client/socket_client.go similarity index 77% rename from client/remote_client.go rename to client/socket_client.go index 3c6803b7e..2b89b294f 100644 --- a/client/remote_client.go +++ b/client/socket_client.go @@ -26,7 +26,7 @@ const flushThrottleMS = 20 // Don't wait longer than... // This is goroutine-safe, but users should beware that // the application in general is not meant to be interfaced // with concurrent callers. -type remoteClient struct { +type socketClient struct { QuitService sync.Mutex // [EB]: is this even used? @@ -42,22 +42,22 @@ type remoteClient struct { resCb func(*types.Request, *types.Response) // listens to all callbacks } -func NewSocketClient(addr string, mustConnect bool) (*remoteClient, error) { - cli := &remoteClient{ +func NewSocketClient(addr string, mustConnect bool) (*socketClient, error) { + cli := &socketClient{ reqQueue: make(chan *ReqRes, reqQueueSize), - flushTimer: NewThrottleTimer("remoteClient", flushThrottleMS), + flushTimer: NewThrottleTimer("socketClient", flushThrottleMS), mustConnect: mustConnect, addr: addr, reqSent: list.New(), resCb: nil, } - cli.QuitService = *NewQuitService(nil, "remoteClient", cli) + cli.QuitService = *NewQuitService(nil, "socketClient", cli) _, err := cli.Start() // Just start it, it's confusing for callers to remember to start. return cli, err } -func (cli *remoteClient) OnStart() error { +func (cli *socketClient) OnStart() error { cli.QuitService.OnStart() RETRY_LOOP: for { @@ -66,7 +66,7 @@ RETRY_LOOP: if cli.mustConnect { return err } else { - fmt.Printf("tmsp.remoteClient failed to connect to %v. Retrying...\n", cli.addr) + log.Warn(Fmt("tmsp.socketClient failed to connect to %v. Retrying...\n", cli.addr)) time.Sleep(time.Second * 3) continue RETRY_LOOP } @@ -78,7 +78,7 @@ RETRY_LOOP: return nil // never happens } -func (cli *remoteClient) OnStop() { +func (cli *socketClient) OnStop() { cli.QuitService.OnStop() if cli.conn != nil { cli.conn.Close() @@ -87,15 +87,15 @@ func (cli *remoteClient) OnStop() { // Set listener for all responses // NOTE: callback may get internally generated flush responses. -func (cli *remoteClient) SetResponseCallback(resCb Callback) { +func (cli *socketClient) SetResponseCallback(resCb Callback) { cli.mtx.Lock() defer cli.mtx.Unlock() cli.resCb = resCb } -func (cli *remoteClient) StopForError(err error) { +func (cli *socketClient) StopForError(err error) { cli.mtx.Lock() - fmt.Printf("Stopping tmsp.remoteClient for error: %v\n", err.Error()) + log.Warn(Fmt("Stopping tmsp.socketClient for error: %v\n", err.Error())) if cli.err == nil { cli.err = err } @@ -103,7 +103,7 @@ func (cli *remoteClient) StopForError(err error) { cli.Stop() } -func (cli *remoteClient) Error() error { +func (cli *socketClient) Error() error { cli.mtx.Lock() defer cli.mtx.Unlock() return cli.err @@ -111,13 +111,13 @@ func (cli *remoteClient) Error() error { //---------------------------------------- -func (cli *remoteClient) sendRequestsRoutine(conn net.Conn) { +func (cli *socketClient) sendRequestsRoutine(conn net.Conn) { w := bufio.NewWriter(conn) for { select { case <-cli.flushTimer.Ch: select { - case cli.reqQueue <- NewReqRes(types.ToRequestFlush()): // cant this block ? + case cli.reqQueue <- NewReqRes(types.ToRequestFlush()): default: // Probably will fill the buffer, or retry later. } @@ -142,7 +142,7 @@ func (cli *remoteClient) sendRequestsRoutine(conn net.Conn) { } } -func (cli *remoteClient) recvResponseRoutine(conn net.Conn) { +func (cli *socketClient) recvResponseRoutine(conn net.Conn) { r := bufio.NewReader(conn) // Buffer reads for { var res = &types.Response{} @@ -165,13 +165,13 @@ func (cli *remoteClient) recvResponseRoutine(conn net.Conn) { } } -func (cli *remoteClient) willSendReq(reqres *ReqRes) { +func (cli *socketClient) willSendReq(reqres *ReqRes) { cli.mtx.Lock() defer cli.mtx.Unlock() cli.reqSent.PushBack(reqres) } -func (cli *remoteClient) didRecvResponse(res *types.Response) error { +func (cli *socketClient) didRecvResponse(res *types.Response) error { cli.mtx.Lock() defer cli.mtx.Unlock() @@ -205,53 +205,53 @@ func (cli *remoteClient) didRecvResponse(res *types.Response) error { //---------------------------------------- -func (cli *remoteClient) EchoAsync(msg string) *ReqRes { +func (cli *socketClient) EchoAsync(msg string) *ReqRes { return cli.queueRequest(types.ToRequestEcho(msg)) } -func (cli *remoteClient) FlushAsync() *ReqRes { +func (cli *socketClient) FlushAsync() *ReqRes { return cli.queueRequest(types.ToRequestFlush()) } -func (cli *remoteClient) InfoAsync() *ReqRes { +func (cli *socketClient) InfoAsync() *ReqRes { return cli.queueRequest(types.ToRequestInfo()) } -func (cli *remoteClient) SetOptionAsync(key string, value string) *ReqRes { +func (cli *socketClient) SetOptionAsync(key string, value string) *ReqRes { return cli.queueRequest(types.ToRequestSetOption(key, value)) } -func (cli *remoteClient) AppendTxAsync(tx []byte) *ReqRes { +func (cli *socketClient) AppendTxAsync(tx []byte) *ReqRes { return cli.queueRequest(types.ToRequestAppendTx(tx)) } -func (cli *remoteClient) CheckTxAsync(tx []byte) *ReqRes { +func (cli *socketClient) CheckTxAsync(tx []byte) *ReqRes { return cli.queueRequest(types.ToRequestCheckTx(tx)) } -func (cli *remoteClient) QueryAsync(query []byte) *ReqRes { +func (cli *socketClient) QueryAsync(query []byte) *ReqRes { return cli.queueRequest(types.ToRequestQuery(query)) } -func (cli *remoteClient) CommitAsync() *ReqRes { +func (cli *socketClient) CommitAsync() *ReqRes { return cli.queueRequest(types.ToRequestCommit()) } -func (cli *remoteClient) InitChainAsync(validators []*types.Validator) *ReqRes { +func (cli *socketClient) InitChainAsync(validators []*types.Validator) *ReqRes { return cli.queueRequest(types.ToRequestInitChain(validators)) } -func (cli *remoteClient) BeginBlockAsync(height uint64) *ReqRes { +func (cli *socketClient) BeginBlockAsync(height uint64) *ReqRes { return cli.queueRequest(types.ToRequestBeginBlock(height)) } -func (cli *remoteClient) EndBlockAsync(height uint64) *ReqRes { +func (cli *socketClient) EndBlockAsync(height uint64) *ReqRes { return cli.queueRequest(types.ToRequestEndBlock(height)) } //---------------------------------------- -func (cli *remoteClient) EchoSync(msg string) (res types.Result) { +func (cli *socketClient) EchoSync(msg string) (res types.Result) { reqres := cli.queueRequest(types.ToRequestEcho(msg)) cli.FlushSync() if cli.err != nil { @@ -261,12 +261,14 @@ func (cli *remoteClient) EchoSync(msg string) (res types.Result) { return types.Result{Code: OK, Data: []byte(resp.Message), Log: LOG} } -func (cli *remoteClient) FlushSync() error { +func (cli *socketClient) FlushSync() error { + log.Warn("FlushSync") cli.queueRequest(types.ToRequestFlush()).Wait() + log.Warn("Done FlushSync") return cli.err } -func (cli *remoteClient) InfoSync() (res types.Result) { +func (cli *socketClient) InfoSync() (res types.Result) { reqres := cli.queueRequest(types.ToRequestInfo()) cli.FlushSync() if cli.err != nil { @@ -276,7 +278,7 @@ func (cli *remoteClient) InfoSync() (res types.Result) { return types.Result{Code: OK, Data: []byte(resp.Info), Log: LOG} } -func (cli *remoteClient) SetOptionSync(key string, value string) (res types.Result) { +func (cli *socketClient) SetOptionSync(key string, value string) (res types.Result) { reqres := cli.queueRequest(types.ToRequestSetOption(key, value)) cli.FlushSync() if cli.err != nil { @@ -286,7 +288,7 @@ func (cli *remoteClient) SetOptionSync(key string, value string) (res types.Resu return types.Result{Code: OK, Data: nil, Log: resp.Log} } -func (cli *remoteClient) AppendTxSync(tx []byte) (res types.Result) { +func (cli *socketClient) AppendTxSync(tx []byte) (res types.Result) { reqres := cli.queueRequest(types.ToRequestAppendTx(tx)) cli.FlushSync() if cli.err != nil { @@ -296,7 +298,7 @@ func (cli *remoteClient) AppendTxSync(tx []byte) (res types.Result) { return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} } -func (cli *remoteClient) CheckTxSync(tx []byte) (res types.Result) { +func (cli *socketClient) CheckTxSync(tx []byte) (res types.Result) { reqres := cli.queueRequest(types.ToRequestCheckTx(tx)) cli.FlushSync() if cli.err != nil { @@ -306,7 +308,7 @@ func (cli *remoteClient) CheckTxSync(tx []byte) (res types.Result) { return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} } -func (cli *remoteClient) QuerySync(query []byte) (res types.Result) { +func (cli *socketClient) QuerySync(query []byte) (res types.Result) { reqres := cli.queueRequest(types.ToRequestQuery(query)) cli.FlushSync() if cli.err != nil { @@ -316,7 +318,7 @@ func (cli *remoteClient) QuerySync(query []byte) (res types.Result) { return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} } -func (cli *remoteClient) CommitSync() (res types.Result) { +func (cli *socketClient) CommitSync() (res types.Result) { reqres := cli.queueRequest(types.ToRequestCommit()) cli.FlushSync() if cli.err != nil { @@ -326,7 +328,7 @@ func (cli *remoteClient) CommitSync() (res types.Result) { return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} } -func (cli *remoteClient) InitChainSync(validators []*types.Validator) (err error) { +func (cli *socketClient) InitChainSync(validators []*types.Validator) (err error) { cli.queueRequest(types.ToRequestInitChain(validators)) cli.FlushSync() if cli.err != nil { @@ -335,7 +337,7 @@ func (cli *remoteClient) InitChainSync(validators []*types.Validator) (err error return nil } -func (cli *remoteClient) BeginBlockSync(height uint64) (err error) { +func (cli *socketClient) BeginBlockSync(height uint64) (err error) { cli.queueRequest(types.ToRequestBeginBlock(height)) cli.FlushSync() if cli.err != nil { @@ -344,7 +346,7 @@ func (cli *remoteClient) BeginBlockSync(height uint64) (err error) { return nil } -func (cli *remoteClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) { +func (cli *socketClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) { reqres := cli.queueRequest(types.ToRequestEndBlock(height)) cli.FlushSync() if cli.err != nil { @@ -355,7 +357,7 @@ func (cli *remoteClient) EndBlockSync(height uint64) (validators []*types.Valida //---------------------------------------- -func (cli *remoteClient) queueRequest(req *types.Request) *ReqRes { +func (cli *socketClient) queueRequest(req *types.Request) *ReqRes { reqres := NewReqRes(req) // TODO: set cli.err if reqQueue times out