diff --git a/client/client.go b/client/client.go index 3afb838f8..291c4863a 100644 --- a/client/client.go +++ b/client/client.go @@ -4,13 +4,15 @@ import ( "fmt" "sync" + . "github.com/tendermint/go-common" "github.com/tendermint/tmsp/types" ) type Client interface { + Service + SetResponseCallback(Callback) Error() error - Stop() bool FlushAsync() *ReqRes EchoAsync(msg string) *ReqRes diff --git a/client/grpc_client.go b/client/grpc_client.go index 813be658a..916189b5b 100644 --- a/client/grpc_client.go +++ b/client/grpc_client.go @@ -1,7 +1,7 @@ package tmspcli import ( - "fmt" + "errors" "net" "sync" "time" @@ -44,42 +44,57 @@ func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) { func (cli *grpcClient) OnStart() error { cli.QuitService.OnStart() RETRY_LOOP: + for { conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc)) if err != nil { if cli.mustConnect { return err } else { - fmt.Printf("tmsp.grpcClient failed to connect to %v. Retrying...\n", cli.addr) + log.Warn(Fmt("tmsp.grpcClient failed to connect to %v. Retrying...\n", cli.addr)) time.Sleep(time.Second * 3) continue RETRY_LOOP } } - cli.client = types.NewTMSPApplicationClient(conn) + + client := types.NewTMSPApplicationClient(conn) + + ENSURE_CONNECTED: + for { + _, err := client.Echo(context.Background(), &types.RequestEcho{"hello"}, grpc.FailFast(true)) + if err == nil { + break ENSURE_CONNECTED + } + time.Sleep(time.Second) + } + + cli.client = client return nil } } func (cli *grpcClient) OnStop() { cli.QuitService.OnStop() - // TODO: how to close when TMSPApplicationClient interface doesn't expose Close ? -} - -// Set listener for all responses -// NOTE: callback may get internally generated flush responses. -func (cli *grpcClient) SetResponseCallback(resCb Callback) { cli.mtx.Lock() defer cli.mtx.Unlock() - cli.resCb = resCb + // TODO: how to close conn? its not a net.Conn and grpc doesn't expose a Close() + /*if cli.conn != nil { + cli.conn.Close() + }*/ } func (cli *grpcClient) StopForError(err error) { cli.mtx.Lock() - fmt.Printf("Stopping tmsp.grpcClient for error: %v\n", err.Error()) + if !cli.IsRunning() { + return + } + if cli.err == nil { cli.err = err } cli.mtx.Unlock() + + log.Warn(Fmt("Stopping tmsp.grpcClient for error: %v", err.Error())) cli.Stop() } @@ -89,6 +104,14 @@ func (cli *grpcClient) Error() error { return cli.err } +// Set listener for all responses +// NOTE: callback may get internally generated flush responses. +func (cli *grpcClient) SetResponseCallback(resCb Callback) { + cli.mtx.Lock() + defer cli.mtx.Unlock() + cli.resCb = resCb +} + //---------------------------------------- // GRPC calls are synchronous, but some callbacks expect to be called asynchronously // (eg. the mempool expects to be able to lock to remove bad txs from cache). @@ -99,7 +122,7 @@ func (cli *grpcClient) Error() error { func (cli *grpcClient) EchoAsync(msg string) *ReqRes { req := types.ToRequestEcho(msg) - res, err := cli.client.Echo(context.Background(), req.GetEcho()) + res, err := cli.client.Echo(context.Background(), req.GetEcho(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -108,7 +131,7 @@ func (cli *grpcClient) EchoAsync(msg string) *ReqRes { func (cli *grpcClient) FlushAsync() *ReqRes { req := types.ToRequestFlush() - res, err := cli.client.Flush(context.Background(), req.GetFlush()) + res, err := cli.client.Flush(context.Background(), req.GetFlush(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -117,7 +140,7 @@ func (cli *grpcClient) FlushAsync() *ReqRes { func (cli *grpcClient) InfoAsync() *ReqRes { req := types.ToRequestInfo() - res, err := cli.client.Info(context.Background(), req.GetInfo()) + res, err := cli.client.Info(context.Background(), req.GetInfo(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -126,7 +149,7 @@ func (cli *grpcClient) InfoAsync() *ReqRes { func (cli *grpcClient) SetOptionAsync(key string, value string) *ReqRes { req := types.ToRequestSetOption(key, value) - res, err := cli.client.SetOption(context.Background(), req.GetSetOption()) + res, err := cli.client.SetOption(context.Background(), req.GetSetOption(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -135,7 +158,7 @@ func (cli *grpcClient) SetOptionAsync(key string, value string) *ReqRes { func (cli *grpcClient) AppendTxAsync(tx []byte) *ReqRes { req := types.ToRequestAppendTx(tx) - res, err := cli.client.AppendTx(context.Background(), req.GetAppendTx()) + res, err := cli.client.AppendTx(context.Background(), req.GetAppendTx(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -144,7 +167,7 @@ func (cli *grpcClient) AppendTxAsync(tx []byte) *ReqRes { func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes { req := types.ToRequestCheckTx(tx) - res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx()) + res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -153,7 +176,7 @@ func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes { func (cli *grpcClient) QueryAsync(query []byte) *ReqRes { req := types.ToRequestQuery(query) - res, err := cli.client.Query(context.Background(), req.GetQuery()) + res, err := cli.client.Query(context.Background(), req.GetQuery(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -162,7 +185,7 @@ func (cli *grpcClient) QueryAsync(query []byte) *ReqRes { func (cli *grpcClient) CommitAsync() *ReqRes { req := types.ToRequestCommit() - res, err := cli.client.Commit(context.Background(), req.GetCommit()) + res, err := cli.client.Commit(context.Background(), req.GetCommit(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -171,7 +194,7 @@ func (cli *grpcClient) CommitAsync() *ReqRes { func (cli *grpcClient) InitChainAsync(validators []*types.Validator) *ReqRes { req := types.ToRequestInitChain(validators) - res, err := cli.client.InitChain(context.Background(), req.GetInitChain()) + res, err := cli.client.InitChain(context.Background(), req.GetInitChain(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -180,7 +203,7 @@ func (cli *grpcClient) InitChainAsync(validators []*types.Validator) *ReqRes { func (cli *grpcClient) BeginBlockAsync(height uint64) *ReqRes { req := types.ToRequestBeginBlock(height) - res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock()) + res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -189,7 +212,7 @@ func (cli *grpcClient) BeginBlockAsync(height uint64) *ReqRes { func (cli *grpcClient) EndBlockAsync(height uint64) *ReqRes { req := types.ToRequestEndBlock(height) - res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock()) + res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -217,11 +240,31 @@ func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) return reqres } +func (cli *grpcClient) checkErrGetResult() types.Result { + if cli.err != nil { + cli.StopForError(cli.err) + return types.ErrInternalError.SetLog(cli.err.Error()) + } + return nil +} + +func (cli *grpcClient) checkGetErr() error { + if cli.err != nil { + cli.StopForError(cli.err) + return cli.err + } + return nil +} + //---------------------------------------- func (cli *grpcClient) EchoSync(msg string) (res types.Result) { - r := cli.EchoAsync(msg).Response.GetEcho() - return types.NewResultOK([]byte(r.Message), LOG) + reqres := cli.EchoAsync(msg) + if res := cli.checkErrGetResult(); res.IsErr() { + return res + } + resp := reqres.Response.GetEcho() + return types.NewResultOK([]byte(resp.Message), LOG) } func (cli *grpcClient) FlushSync() error { @@ -229,14 +272,18 @@ func (cli *grpcClient) FlushSync() error { } func (cli *grpcClient) InfoSync() (res types.Result) { - r := cli.InfoAsync().Response.GetInfo() - return types.NewResultOK([]byte(r.Info), LOG) + reqres := cli.InfoAsync() + if res := cli.checkErrGetResult(); res.IsErr() { + return res + } + resp := reqres.Response.GetInfo() + return types.NewResultOK([]byte(resp.Info), LOG) } func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result) { reqres := cli.SetOptionAsync(key, value) - if cli.err != nil { - return types.ErrInternalError.SetLog(cli.err.Error()) + if res := cli.checkErrGetResult(); res.IsErr() { + return res } resp := reqres.Response.GetSetOption() return types.Result{Code: OK, Data: nil, Log: resp.Log} @@ -244,8 +291,8 @@ func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result func (cli *grpcClient) AppendTxSync(tx []byte) (res types.Result) { reqres := cli.AppendTxAsync(tx) - if cli.err != nil { - return types.ErrInternalError.SetLog(cli.err.Error()) + if res := cli.checkErrGetResult(); res.IsErr() { + return res } resp := reqres.Response.GetAppendTx() return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} @@ -253,8 +300,8 @@ func (cli *grpcClient) AppendTxSync(tx []byte) (res types.Result) { func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) { reqres := cli.CheckTxAsync(tx) - if cli.err != nil { - return types.ErrInternalError.SetLog(cli.err.Error()) + if res := cli.checkErrGetResult(); res.IsErr() { + return res } resp := reqres.Response.GetCheckTx() return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} @@ -262,8 +309,8 @@ func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) { func (cli *grpcClient) QuerySync(query []byte) (res types.Result) { reqres := cli.QueryAsync(query) - if cli.err != nil { - return types.ErrInternalError.SetLog(cli.err.Error()) + if res := cli.checkErrGetResult(); res.IsErr() { + return res } resp := reqres.Response.GetQuery() return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} @@ -271,8 +318,8 @@ func (cli *grpcClient) QuerySync(query []byte) (res types.Result) { func (cli *grpcClient) CommitSync() (res types.Result) { reqres := cli.CommitAsync() - if cli.err != nil { - return types.ErrInternalError.SetLog(cli.err.Error()) + if res := cli.checkErrGetResult(); res.IsErr() { + return res } resp := reqres.Response.GetCommit() return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} @@ -280,24 +327,24 @@ func (cli *grpcClient) CommitSync() (res types.Result) { func (cli *grpcClient) InitChainSync(validators []*types.Validator) (err error) { cli.InitChainAsync(validators) - if cli.err != nil { - return cli.err + if err := cli.checkGetErr(); err != nil { + return err } return nil } func (cli *grpcClient) BeginBlockSync(height uint64) (err error) { cli.BeginBlockAsync(height) - if cli.err != nil { - return cli.err + if err := cli.checkGetErr(); err != nil { + return err } return nil } func (cli *grpcClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) { reqres := cli.EndBlockAsync(height) - if cli.err != nil { - return nil, cli.err + if err := cli.checkGetErr(); err != nil { + return nil, err } return reqres.Response.GetEndBlock().Diffs, nil } diff --git a/client/local_client.go b/client/local_client.go index 06db859b4..ce17f6c07 100644 --- a/client/local_client.go +++ b/client/local_client.go @@ -1,11 +1,14 @@ package tmspcli import ( - types "github.com/tendermint/tmsp/types" "sync" + + . "github.com/tendermint/go-common" + types "github.com/tendermint/tmsp/types" ) type localClient struct { + BaseService mtx *sync.Mutex types.Application Callback @@ -15,10 +18,12 @@ func NewLocalClient(mtx *sync.Mutex, app types.Application) *localClient { if mtx == nil { mtx = new(sync.Mutex) } - return &localClient{ + cli := &localClient{ mtx: mtx, Application: app, } + cli.BaseService = *NewBaseService(log, "localClient", cli) + return cli } func (app *localClient) SetResponseCallback(cb Callback) { @@ -32,10 +37,6 @@ func (app *localClient) Error() error { return nil } -func (app *localClient) Stop() bool { - return true -} - func (app *localClient) FlushAsync() *ReqRes { // Do nothing return newLocalReqRes(types.ToRequestFlush(), nil) diff --git a/client/log.go b/client/log.go new file mode 100644 index 000000000..deadf6c34 --- /dev/null +++ b/client/log.go @@ -0,0 +1,7 @@ +package tmspcli + +import ( + "github.com/tendermint/go-logger" +) + +var log = logger.New("module", "tmspcli") diff --git a/client/remote_client.go b/client/socket_client.go similarity index 72% rename from client/remote_client.go rename to client/socket_client.go index 3c6803b7e..10f780bc2 100644 --- a/client/remote_client.go +++ b/client/socket_client.go @@ -26,9 +26,8 @@ const flushThrottleMS = 20 // Don't wait longer than... // This is goroutine-safe, but users should beware that // the application in general is not meant to be interfaced // with concurrent callers. -type remoteClient struct { +type socketClient struct { QuitService - sync.Mutex // [EB]: is this even used? reqQueue chan *ReqRes flushTimer *ThrottleTimer @@ -40,84 +39,104 @@ type remoteClient struct { err error reqSent *list.List resCb func(*types.Request, *types.Response) // listens to all callbacks + } -func NewSocketClient(addr string, mustConnect bool) (*remoteClient, error) { - cli := &remoteClient{ +func NewSocketClient(addr string, mustConnect bool) (*socketClient, error) { + cli := &socketClient{ reqQueue: make(chan *ReqRes, reqQueueSize), - flushTimer: NewThrottleTimer("remoteClient", flushThrottleMS), + flushTimer: NewThrottleTimer("socketClient", flushThrottleMS), mustConnect: mustConnect, addr: addr, reqSent: list.New(), resCb: nil, } - cli.QuitService = *NewQuitService(nil, "remoteClient", cli) + cli.QuitService = *NewQuitService(nil, "socketClient", cli) + _, err := cli.Start() // Just start it, it's confusing for callers to remember to start. return cli, err } -func (cli *remoteClient) OnStart() error { +func (cli *socketClient) OnStart() error { cli.QuitService.OnStart() + + var err error + var conn net.Conn RETRY_LOOP: for { - conn, err := Connect(cli.addr) + conn, err = Connect(cli.addr) if err != nil { if cli.mustConnect { return err } else { - fmt.Printf("tmsp.remoteClient failed to connect to %v. Retrying...\n", cli.addr) + log.Warn(Fmt("tmsp.socketClient failed to connect to %v. Retrying...", cli.addr)) time.Sleep(time.Second * 3) continue RETRY_LOOP } } + cli.conn = conn + go cli.sendRequestsRoutine(conn) go cli.recvResponseRoutine(conn) - return err + + return nil } return nil // never happens } -func (cli *remoteClient) OnStop() { +func (cli *socketClient) OnStop() { cli.QuitService.OnStop() + + cli.mtx.Lock() + defer cli.mtx.Unlock() if cli.conn != nil { cli.conn.Close() } -} -// Set listener for all responses -// NOTE: callback may get internally generated flush responses. -func (cli *remoteClient) SetResponseCallback(resCb Callback) { - cli.mtx.Lock() - defer cli.mtx.Unlock() - cli.resCb = resCb + cli.flushQueue() } -func (cli *remoteClient) StopForError(err error) { +// Stop the client and set the error +func (cli *socketClient) StopForError(err error) { cli.mtx.Lock() - fmt.Printf("Stopping tmsp.remoteClient for error: %v\n", err.Error()) + if !cli.IsRunning() { + return + } + if cli.err == nil { cli.err = err } cli.mtx.Unlock() + + log.Warn(Fmt("Stopping tmsp.socketClient for error: %v", err.Error())) cli.Stop() } -func (cli *remoteClient) Error() error { +func (cli *socketClient) Error() error { cli.mtx.Lock() defer cli.mtx.Unlock() return cli.err } +// Set listener for all responses +// NOTE: callback may get internally generated flush responses. +func (cli *socketClient) SetResponseCallback(resCb Callback) { + cli.mtx.Lock() + defer cli.mtx.Unlock() + cli.resCb = resCb +} + //---------------------------------------- -func (cli *remoteClient) sendRequestsRoutine(conn net.Conn) { +func (cli *socketClient) sendRequestsRoutine(conn net.Conn) { + w := bufio.NewWriter(conn) for { select { case <-cli.flushTimer.Ch: select { - case cli.reqQueue <- NewReqRes(types.ToRequestFlush()): // cant this block ? + case cli.reqQueue <- NewReqRes(types.ToRequestFlush()): default: // Probably will fill the buffer, or retry later. } @@ -127,14 +146,14 @@ func (cli *remoteClient) sendRequestsRoutine(conn net.Conn) { cli.willSendReq(reqres) err := types.WriteMessage(reqres.Request, w) if err != nil { - cli.StopForError(err) + cli.StopForError(fmt.Errorf("Error writing msg: %v", err)) return } // log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request) if _, ok := reqres.Request.Value.(*types.Request_Flush); ok { err = w.Flush() if err != nil { - cli.StopForError(err) + cli.StopForError(fmt.Errorf("Error flushing writer: %v", err)) return } } @@ -142,7 +161,8 @@ func (cli *remoteClient) sendRequestsRoutine(conn net.Conn) { } } -func (cli *remoteClient) recvResponseRoutine(conn net.Conn) { +func (cli *socketClient) recvResponseRoutine(conn net.Conn) { + r := bufio.NewReader(conn) // Buffer reads for { var res = &types.Response{} @@ -155,23 +175,25 @@ func (cli *remoteClient) recvResponseRoutine(conn net.Conn) { case *types.Response_Exception: // XXX After setting cli.err, release waiters (e.g. reqres.Done()) cli.StopForError(errors.New(r.Exception.Error)) + return default: // log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res) err := cli.didRecvResponse(res) if err != nil { cli.StopForError(err) + return } } } } -func (cli *remoteClient) willSendReq(reqres *ReqRes) { +func (cli *socketClient) willSendReq(reqres *ReqRes) { cli.mtx.Lock() defer cli.mtx.Unlock() cli.reqSent.PushBack(reqres) } -func (cli *remoteClient) didRecvResponse(res *types.Response) error { +func (cli *socketClient) didRecvResponse(res *types.Response) error { cli.mtx.Lock() defer cli.mtx.Unlock() @@ -205,53 +227,53 @@ func (cli *remoteClient) didRecvResponse(res *types.Response) error { //---------------------------------------- -func (cli *remoteClient) EchoAsync(msg string) *ReqRes { +func (cli *socketClient) EchoAsync(msg string) *ReqRes { return cli.queueRequest(types.ToRequestEcho(msg)) } -func (cli *remoteClient) FlushAsync() *ReqRes { +func (cli *socketClient) FlushAsync() *ReqRes { return cli.queueRequest(types.ToRequestFlush()) } -func (cli *remoteClient) InfoAsync() *ReqRes { +func (cli *socketClient) InfoAsync() *ReqRes { return cli.queueRequest(types.ToRequestInfo()) } -func (cli *remoteClient) SetOptionAsync(key string, value string) *ReqRes { +func (cli *socketClient) SetOptionAsync(key string, value string) *ReqRes { return cli.queueRequest(types.ToRequestSetOption(key, value)) } -func (cli *remoteClient) AppendTxAsync(tx []byte) *ReqRes { +func (cli *socketClient) AppendTxAsync(tx []byte) *ReqRes { return cli.queueRequest(types.ToRequestAppendTx(tx)) } -func (cli *remoteClient) CheckTxAsync(tx []byte) *ReqRes { +func (cli *socketClient) CheckTxAsync(tx []byte) *ReqRes { return cli.queueRequest(types.ToRequestCheckTx(tx)) } -func (cli *remoteClient) QueryAsync(query []byte) *ReqRes { +func (cli *socketClient) QueryAsync(query []byte) *ReqRes { return cli.queueRequest(types.ToRequestQuery(query)) } -func (cli *remoteClient) CommitAsync() *ReqRes { +func (cli *socketClient) CommitAsync() *ReqRes { return cli.queueRequest(types.ToRequestCommit()) } -func (cli *remoteClient) InitChainAsync(validators []*types.Validator) *ReqRes { +func (cli *socketClient) InitChainAsync(validators []*types.Validator) *ReqRes { return cli.queueRequest(types.ToRequestInitChain(validators)) } -func (cli *remoteClient) BeginBlockAsync(height uint64) *ReqRes { +func (cli *socketClient) BeginBlockAsync(height uint64) *ReqRes { return cli.queueRequest(types.ToRequestBeginBlock(height)) } -func (cli *remoteClient) EndBlockAsync(height uint64) *ReqRes { +func (cli *socketClient) EndBlockAsync(height uint64) *ReqRes { return cli.queueRequest(types.ToRequestEndBlock(height)) } //---------------------------------------- -func (cli *remoteClient) EchoSync(msg string) (res types.Result) { +func (cli *socketClient) EchoSync(msg string) (res types.Result) { reqres := cli.queueRequest(types.ToRequestEcho(msg)) cli.FlushSync() if cli.err != nil { @@ -261,12 +283,16 @@ func (cli *remoteClient) EchoSync(msg string) (res types.Result) { return types.Result{Code: OK, Data: []byte(resp.Message), Log: LOG} } -func (cli *remoteClient) FlushSync() error { - cli.queueRequest(types.ToRequestFlush()).Wait() +func (cli *socketClient) FlushSync() error { + reqRes := cli.queueRequest(types.ToRequestFlush()) + if cli.err != nil { + return types.ErrInternalError.SetLog(cli.err.Error()) + } + reqRes.Wait() // NOTE: if we don't flush the queue, its possible to get stuck here return cli.err } -func (cli *remoteClient) InfoSync() (res types.Result) { +func (cli *socketClient) InfoSync() (res types.Result) { reqres := cli.queueRequest(types.ToRequestInfo()) cli.FlushSync() if cli.err != nil { @@ -276,7 +302,7 @@ func (cli *remoteClient) InfoSync() (res types.Result) { return types.Result{Code: OK, Data: []byte(resp.Info), Log: LOG} } -func (cli *remoteClient) SetOptionSync(key string, value string) (res types.Result) { +func (cli *socketClient) SetOptionSync(key string, value string) (res types.Result) { reqres := cli.queueRequest(types.ToRequestSetOption(key, value)) cli.FlushSync() if cli.err != nil { @@ -286,7 +312,7 @@ func (cli *remoteClient) SetOptionSync(key string, value string) (res types.Resu return types.Result{Code: OK, Data: nil, Log: resp.Log} } -func (cli *remoteClient) AppendTxSync(tx []byte) (res types.Result) { +func (cli *socketClient) AppendTxSync(tx []byte) (res types.Result) { reqres := cli.queueRequest(types.ToRequestAppendTx(tx)) cli.FlushSync() if cli.err != nil { @@ -296,7 +322,7 @@ func (cli *remoteClient) AppendTxSync(tx []byte) (res types.Result) { return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} } -func (cli *remoteClient) CheckTxSync(tx []byte) (res types.Result) { +func (cli *socketClient) CheckTxSync(tx []byte) (res types.Result) { reqres := cli.queueRequest(types.ToRequestCheckTx(tx)) cli.FlushSync() if cli.err != nil { @@ -306,7 +332,7 @@ func (cli *remoteClient) CheckTxSync(tx []byte) (res types.Result) { return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} } -func (cli *remoteClient) QuerySync(query []byte) (res types.Result) { +func (cli *socketClient) QuerySync(query []byte) (res types.Result) { reqres := cli.queueRequest(types.ToRequestQuery(query)) cli.FlushSync() if cli.err != nil { @@ -316,7 +342,7 @@ func (cli *remoteClient) QuerySync(query []byte) (res types.Result) { return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} } -func (cli *remoteClient) CommitSync() (res types.Result) { +func (cli *socketClient) CommitSync() (res types.Result) { reqres := cli.queueRequest(types.ToRequestCommit()) cli.FlushSync() if cli.err != nil { @@ -326,7 +352,7 @@ func (cli *remoteClient) CommitSync() (res types.Result) { return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} } -func (cli *remoteClient) InitChainSync(validators []*types.Validator) (err error) { +func (cli *socketClient) InitChainSync(validators []*types.Validator) (err error) { cli.queueRequest(types.ToRequestInitChain(validators)) cli.FlushSync() if cli.err != nil { @@ -335,7 +361,7 @@ func (cli *remoteClient) InitChainSync(validators []*types.Validator) (err error return nil } -func (cli *remoteClient) BeginBlockSync(height uint64) (err error) { +func (cli *socketClient) BeginBlockSync(height uint64) (err error) { cli.queueRequest(types.ToRequestBeginBlock(height)) cli.FlushSync() if cli.err != nil { @@ -344,7 +370,7 @@ func (cli *remoteClient) BeginBlockSync(height uint64) (err error) { return nil } -func (cli *remoteClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) { +func (cli *socketClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) { reqres := cli.queueRequest(types.ToRequestEndBlock(height)) cli.FlushSync() if cli.err != nil { @@ -355,7 +381,7 @@ func (cli *remoteClient) EndBlockSync(height uint64) (validators []*types.Valida //---------------------------------------- -func (cli *remoteClient) queueRequest(req *types.Request) *ReqRes { +func (cli *socketClient) queueRequest(req *types.Request) *ReqRes { reqres := NewReqRes(req) // TODO: set cli.err if reqQueue times out @@ -372,6 +398,18 @@ func (cli *remoteClient) queueRequest(req *types.Request) *ReqRes { return reqres } +func (cli *socketClient) flushQueue() { +LOOP: + for { + select { + case reqres := <-cli.reqQueue: + reqres.Done() + default: + break LOOP + } + } +} + //---------------------------------------- func resMatchesReq(req *types.Request, res *types.Response) (ok bool) { diff --git a/example/dummy/dummy_test.go b/example/dummy/dummy_test.go deleted file mode 100644 index c5ac2d271..000000000 --- a/example/dummy/dummy_test.go +++ /dev/null @@ -1,93 +0,0 @@ -package dummy - -import ( - "reflect" - "testing" - "time" - - . "github.com/tendermint/go-common" - "github.com/tendermint/tmsp/server" - "github.com/tendermint/tmsp/types" -) - -func TestStream(t *testing.T) { - - numAppendTxs := 200000 - - // Start the listener - server, err := server.NewSocketServer("unix://test.sock", NewDummyApplication()) - if err != nil { - Exit(err.Error()) - } - defer server.Stop() - - // Connect to the socket - conn, err := Connect("unix://test.sock") - if err != nil { - Exit(err.Error()) - } - - // Read response data - done := make(chan struct{}) - go func() { - counter := 0 - for { - - var res = &types.Response{} - err := types.ReadMessage(conn, res) - if err != nil { - Exit(err.Error()) - } - - // Process response - switch r := res.Value.(type) { - case *types.Response_AppendTx: - counter += 1 - if r.AppendTx.Code != types.CodeType_OK { - t.Error("AppendTx failed with ret_code", r.AppendTx.Code) - } - if counter > numAppendTxs { - t.Fatal("Too many AppendTx responses") - } - t.Log("response", counter) - if counter == numAppendTxs { - go func() { - time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow - close(done) - }() - } - case *types.Response_Flush: - // ignore - default: - t.Error("Unexpected response type", reflect.TypeOf(res.Value)) - } - } - }() - - // Write requests - for counter := 0; counter < numAppendTxs; counter++ { - // Send request - var req = types.ToRequestAppendTx([]byte("test")) - err := types.WriteMessage(req, conn) - if err != nil { - t.Fatal(err.Error()) - } - - // Sometimes send flush messages - if counter%123 == 0 { - t.Log("flush") - err := types.WriteMessage(types.ToRequestFlush(), conn) - if err != nil { - t.Fatal(err.Error()) - } - } - } - - // Send final flush message - err = types.WriteMessage(types.ToRequestFlush(), conn) - if err != nil { - t.Fatal(err.Error()) - } - - <-done -} diff --git a/example/example_test.go b/example/example_test.go new file mode 100644 index 000000000..fc9066d8c --- /dev/null +++ b/example/example_test.go @@ -0,0 +1,151 @@ +package nilapp + +import ( + "fmt" + "net" + "reflect" + "testing" + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc" + + . "github.com/tendermint/go-common" + "github.com/tendermint/tmsp/client" + "github.com/tendermint/tmsp/example/dummy" + nilapp "github.com/tendermint/tmsp/example/nil" + "github.com/tendermint/tmsp/server" + "github.com/tendermint/tmsp/types" +) + +func TestDummy(t *testing.T) { + fmt.Println("### Testing Dummy") + testStream(t, dummy.NewDummyApplication()) +} + +func TestNilApp(t *testing.T) { + fmt.Println("### Testing NilApp") + testStream(t, nilapp.NewNilApplication()) +} + +func TestGRPC(t *testing.T) { + fmt.Println("### Testing GRPC") + testGRPCSync(t, types.NewGRPCApplication(nilapp.NewNilApplication())) +} + +func testStream(t *testing.T, app types.Application) { + + numAppendTxs := 200000 + + // Start the listener + server, err := server.NewSocketServer("unix://test.sock", app) + if err != nil { + Exit(Fmt("Error starting socket server: %v", err.Error())) + } + defer server.Stop() + + // Connect to the socket + client, err := tmspcli.NewSocketClient("unix://test.sock", false) + if err != nil { + Exit(Fmt("Error starting socket client: %v", err.Error())) + } + client.Start() + defer client.Stop() + + done := make(chan struct{}) + counter := 0 + client.SetResponseCallback(func(req *types.Request, res *types.Response) { + // Process response + switch r := res.Value.(type) { + case *types.Response_AppendTx: + counter += 1 + if r.AppendTx.Code != types.CodeType_OK { + t.Error("AppendTx failed with ret_code", r.AppendTx.Code) + } + if counter > numAppendTxs { + t.Fatalf("Too many AppendTx responses. Got %d, expected %d", counter, numAppendTxs) + } + if counter == numAppendTxs { + go func() { + time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow + close(done) + }() + return + } + case *types.Response_Flush: + // ignore + default: + t.Error("Unexpected response type", reflect.TypeOf(res.Value)) + } + }) + + // Write requests + for counter := 0; counter < numAppendTxs; counter++ { + // Send request + reqRes := client.AppendTxAsync([]byte("test")) + _ = reqRes + // check err ? + + // Sometimes send flush messages + if counter%123 == 0 { + client.FlushAsync() + // check err ? + } + } + + // Send final flush message + client.FlushAsync() + + <-done +} + +//------------------------- +// test grpc + +func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) { + return Connect(addr) +} + +func testGRPCSync(t *testing.T, app *types.GRPCApplication) { + + numAppendTxs := 2000 + + // Start the listener + server, err := server.NewGRPCServer("unix://test.sock", app) + if err != nil { + Exit(Fmt("Error starting GRPC server: %v", err.Error())) + } + defer server.Stop() + + // Connect to the socket + conn, err := grpc.Dial("unix://test.sock", grpc.WithInsecure(), grpc.WithDialer(dialerFunc)) + if err != nil { + Exit(Fmt("Error dialing GRPC server: %v", err.Error())) + } + defer conn.Close() + + client := types.NewTMSPApplicationClient(conn) + + // Write requests + for counter := 0; counter < numAppendTxs; counter++ { + // Send request + response, err := client.AppendTx(context.Background(), &types.RequestAppendTx{[]byte("test")}) + if err != nil { + t.Fatalf("Error in GRPC AppendTx: %v", err.Error()) + } + counter += 1 + if response.Code != types.CodeType_OK { + t.Error("AppendTx failed with ret_code", response.Code) + } + if counter > numAppendTxs { + t.Fatal("Too many AppendTx responses") + } + t.Log("response", counter) + if counter == numAppendTxs { + go func() { + time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow + }() + } + + } +} diff --git a/example/nil/nil_test.go b/example/nil/nil_test.go deleted file mode 100644 index ed7a36ddb..000000000 --- a/example/nil/nil_test.go +++ /dev/null @@ -1,148 +0,0 @@ -package nilapp - -import ( - "net" - "reflect" - "testing" - "time" - - "golang.org/x/net/context" - "google.golang.org/grpc" - - . "github.com/tendermint/go-common" - "github.com/tendermint/tmsp/server" - "github.com/tendermint/tmsp/types" -) - -func TestStream(t *testing.T) { - - numAppendTxs := 200000 - - // Start the listener - server, err := server.NewSocketServer("unix://test.sock", NewNilApplication()) - if err != nil { - Exit(err.Error()) - } - defer server.Stop() - - // Connect to the socket - conn, err := Connect("unix://test.sock") - if err != nil { - Exit(err.Error()) - } - - // Read response data - done := make(chan struct{}) - go func() { - counter := 0 - for { - - var res = &types.Response{} - err := types.ReadMessage(conn, res) - if err != nil { - Exit(err.Error()) - } - - // Process response - switch r := res.Value.(type) { - case *types.Response_AppendTx: - counter += 1 - if r.AppendTx.Code != types.CodeType_OK { - t.Error("AppendTx failed with ret_code", r.AppendTx.Code) - } - if counter > numAppendTxs { - t.Fatal("Too many AppendTx responses") - } - t.Log("response", counter) - if counter == numAppendTxs { - go func() { - time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow - close(done) - }() - } - case *types.Response_Flush: - // ignore - default: - t.Error("Unexpected response type", reflect.TypeOf(res.Value)) - } - } - }() - - // Write requests - for counter := 0; counter < numAppendTxs; counter++ { - // Send request - var req = types.ToRequestAppendTx([]byte("test")) - err := types.WriteMessage(req, conn) - if err != nil { - t.Fatal(err.Error()) - } - - // Sometimes send flush messages - if counter%123 == 0 { - t.Log("flush") - err := types.WriteMessage(types.ToRequestFlush(), conn) - if err != nil { - t.Fatal(err.Error()) - } - } - } - - // Send final flush message - err = types.WriteMessage(types.ToRequestFlush(), conn) - if err != nil { - t.Fatal(err.Error()) - } - - <-done -} - -//------------------------- -// test grpc - -func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) { - return Connect(addr) -} - -func TestGRPCSync(t *testing.T) { - - numAppendTxs := 2000 - - // Start the listener - server, err := server.NewGRPCServer("unix://test.sock", types.NewGRPCApplication(NewNilApplication())) - if err != nil { - Exit(err.Error()) - } - defer server.Stop() - - // Connect to the socket - conn, err := grpc.Dial("unix://test.sock", grpc.WithInsecure(), grpc.WithDialer(dialerFunc)) - if err != nil { - Exit(err.Error()) - } - defer conn.Close() - - client := types.NewTMSPApplicationClient(conn) - - // Write requests - for counter := 0; counter < numAppendTxs; counter++ { - // Send request - response, err := client.AppendTx(context.Background(), &types.RequestAppendTx{[]byte("test")}) - if err != nil { - t.Fatal(err.Error()) - } - counter += 1 - if response.Code != types.CodeType_OK { - t.Error("AppendTx failed with ret_code", response.Code) - } - if counter > numAppendTxs { - t.Fatal("Too many AppendTx responses") - } - t.Log("response", counter) - if counter == numAppendTxs { - go func() { - time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow - }() - } - - } -} diff --git a/server/grpc_server.go b/server/grpc_server.go index 3a21e3fb9..02f6fa3f8 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -18,6 +18,7 @@ type GRPCServer struct { proto string addr string listener net.Listener + server *grpc.Server app types.TMSPApplicationServer } @@ -43,13 +44,13 @@ func (s *GRPCServer) OnStart() error { return err } s.listener = ln - grpcServer := grpc.NewServer() - types.RegisterTMSPApplicationServer(grpcServer, s.app) - go grpcServer.Serve(ln) + s.server = grpc.NewServer() + types.RegisterTMSPApplicationServer(s.server, s.app) + go s.server.Serve(s.listener) return nil } func (s *GRPCServer) OnStop() { s.QuitService.OnStop() - s.listener.Close() + s.server.Stop() } diff --git a/server/log.go b/server/log.go new file mode 100644 index 000000000..8ac4092e7 --- /dev/null +++ b/server/log.go @@ -0,0 +1,7 @@ +package server + +import ( + "github.com/tendermint/go-logger" +) + +var log = logger.New("module", "tmsp-server") diff --git a/server/socket_server.go b/server/socket_server.go index 7969974e4..ff01a9082 100644 --- a/server/socket_server.go +++ b/server/socket_server.go @@ -21,6 +21,10 @@ type SocketServer struct { addr string listener net.Listener + connsMtx sync.Mutex + conns map[int]net.Conn + nextConnID int + appMtx sync.Mutex app types.Application } @@ -33,6 +37,7 @@ func NewSocketServer(protoAddr string, app types.Application) (Service, error) { addr: addr, listener: nil, app: app, + conns: make(map[int]net.Conn), } s.QuitService = *NewQuitService(nil, "TMSPServer", s) _, err := s.Start() // Just start it @@ -53,6 +58,33 @@ func (s *SocketServer) OnStart() error { func (s *SocketServer) OnStop() { s.QuitService.OnStop() s.listener.Close() + + s.connsMtx.Lock() + for id, conn := range s.conns { + delete(s.conns, id) + conn.Close() + } + s.connsMtx.Unlock() +} + +func (s *SocketServer) addConn(conn net.Conn) int { + s.connsMtx.Lock() + defer s.connsMtx.Unlock() + + connID := s.nextConnID + s.nextConnID += 1 + s.conns[connID] = conn + + return connID +} + +// deletes conn even if close errs +func (s *SocketServer) rmConn(connID int, conn net.Conn) error { + s.connsMtx.Lock() + defer s.connsMtx.Unlock() + + delete(s.conns, connID) + return conn.Close() } func (s *SocketServer) acceptConnectionsRoutine() { @@ -62,7 +94,7 @@ func (s *SocketServer) acceptConnectionsRoutine() { // semaphore <- struct{}{} // Accept a connection - fmt.Println("Waiting for new connection...") + log.Notice("Waiting for new connection...") conn, err := s.listener.Accept() if err != nil { if !s.IsRunning() { @@ -70,9 +102,11 @@ func (s *SocketServer) acceptConnectionsRoutine() { } Exit("Failed to accept connection: " + err.Error()) } else { - fmt.Println("Accepted a new connection") + log.Notice("Accepted a new connection") } + connID := s.addConn(conn) + closeConn := make(chan error, 2) // Push to signal connection closed responses := make(chan *types.Response, 1000) // A channel to buffer responses @@ -84,16 +118,19 @@ func (s *SocketServer) acceptConnectionsRoutine() { go func() { // Wait until signal to close connection errClose := <-closeConn - if errClose != nil { - fmt.Printf("Connection error: %v\n", errClose) + if err == io.EOF { + log.Warn("Connection was closed by client") + } else if errClose != nil { + log.Warn("Connection error", "error", errClose) } else { - fmt.Println("Connection was closed.") + // never happens + log.Warn("Connection was closed.") } // Close the connection - err := conn.Close() + err := s.rmConn(connID, conn) if err != nil { - fmt.Printf("Error in closing connection: %v\n", err) + log.Warn("Error in closing connection", "error", err) } // <-semaphore @@ -111,9 +148,9 @@ func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, respo err := types.ReadMessage(bufReader, req) if err != nil { if err == io.EOF { - closeConn <- fmt.Errorf("Connection closed by client") + closeConn <- err } else { - closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error()) + closeConn <- fmt.Errorf("Error reading message: %v", err.Error()) } return } @@ -176,13 +213,13 @@ func (s *SocketServer) handleResponses(closeConn chan error, responses <-chan *t var res = <-responses err := types.WriteMessage(res, bufWriter) if err != nil { - closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error()) + closeConn <- fmt.Errorf("Error writing message: %v", err.Error()) return } if _, ok := res.Value.(*types.Response_Flush); ok { err = bufWriter.Flush() if err != nil { - closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error()) + closeConn <- fmt.Errorf("Error flushing write buffer: %v", err.Error()) return } } diff --git a/tests/test_cli/test.sh b/tests/test_cli/test.sh index f8920958a..adb3e4bd2 100644 --- a/tests/test_cli/test.sh +++ b/tests/test_cli/test.sh @@ -9,7 +9,7 @@ function testExample() { $APP &> /dev/null & sleep 2 tmsp-cli --verbose batch < $INPUT > "${INPUT}.out.new" - killall "$APP" > /dev/null + killall "$APP" &> /dev/null pre=`shasum < "${INPUT}.out"` post=`shasum < "${INPUT}.out.new"`