diff --git a/client/client.go b/client/client.go index b4adfbcda..3afb838f8 100644 --- a/client/client.go +++ b/client/client.go @@ -1,8 +1,10 @@ package tmspcli import ( - "github.com/tendermint/tmsp/types" + "fmt" "sync" + + "github.com/tendermint/tmsp/types" ) type Client interface { @@ -39,6 +41,21 @@ type Client interface { //---------------------------------------- +func NewClient(addr, transport string, mustConnect bool) (client Client, err error) { + switch transport { + case "socket": + client, err = NewSocketClient(addr, mustConnect) + case "grpc": + client, err = NewGRPCClient(addr, mustConnect) + default: + err = fmt.Errorf("Unknown tmsp transport %s", transport) + + } + return +} + +//---------------------------------------- + type Callback func(*types.Request, *types.Response) //---------------------------------------- diff --git a/client/grpc_client.go b/client/grpc_client.go new file mode 100644 index 000000000..0d28eed55 --- /dev/null +++ b/client/grpc_client.go @@ -0,0 +1,298 @@ +package tmspcli + +import ( + "fmt" + "net" + "sync" + "time" + + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" + + . "github.com/tendermint/go-common" + "github.com/tendermint/tmsp/types" +) + +// This is goroutine-safe, but users should beware that +// the application in general is not meant to be interfaced +// with concurrent callers. +type grpcClient struct { + QuitService + mustConnect bool + + client types.TMSPApplicationClient + + mtx sync.Mutex + addr string + err error + resCb func(*types.Request, *types.Response) // listens to all callbacks +} + +func NewGRPCClient(addr string, mustConnect bool) (*grpcClient, error) { + cli := &grpcClient{ + addr: addr, + mustConnect: mustConnect, + } + cli.QuitService = *NewQuitService(nil, "grpcClient", cli) + _, err := cli.Start() // Just start it, it's confusing for callers to remember to start. + return cli, err +} + +func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) { + return Connect(addr) +} + +func (cli *grpcClient) OnStart() (err error) { + cli.QuitService.OnStart() +RETRY_LOOP: + for { + conn, err_ := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc)) + if err_ != nil { + if cli.mustConnect { + err = err_ // OnStart() will return this. + return + } else { + fmt.Printf("tmsp.grpcClient failed to connect to %v. Retrying...\n", cli.addr) + time.Sleep(time.Second * 3) + continue RETRY_LOOP + } + } + cli.client = types.NewTMSPApplicationClient(conn) + return + } +} + +func (cli *grpcClient) OnStop() { + cli.QuitService.OnStop() + // TODO: close client (?) +} + +// Set listener for all responses +// NOTE: callback may get internally generated flush responses. +func (cli *grpcClient) SetResponseCallback(resCb Callback) { + cli.mtx.Lock() + defer cli.mtx.Unlock() + cli.resCb = resCb +} + +func (cli *grpcClient) StopForError(err error) { + cli.mtx.Lock() + fmt.Printf("Stopping tmsp.grpcClient for error: %v\n", err.Error()) + if cli.err == nil { + cli.err = err + } + cli.mtx.Unlock() + cli.Stop() +} + +func (cli *grpcClient) Error() error { + cli.mtx.Lock() + defer cli.mtx.Unlock() + return cli.err +} + +//---------------------------------------- +// async calls are really sync. +// maybe one day, if people really want it, we use grpc streams, +// but hopefully not :D + +func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes { + reqres := NewReqRes(req) + reqres.Response = res // Set response + reqres.Done() // Release waiters + + // Notify reqRes listener if set + if cb := reqres.GetCallback(); cb != nil { + cb(res) + } + + // Notify client listener if set + if cli.resCb != nil { + cli.resCb(reqres.Request, res) + } + return reqres +} + +func (cli *grpcClient) EchoAsync(msg string) *ReqRes { + req := types.ToRequestEcho(msg) + res, err := cli.client.Echo(context.Background(), req.GetEcho()) + if err != nil { + cli.err = err + } + return cli.finishAsyncCall(req, &types.Response{&types.Response_Echo{res}}) +} + +func (cli *grpcClient) FlushAsync() *ReqRes { + req := types.ToRequestFlush() + res, err := cli.client.Flush(context.Background(), req.GetFlush()) + if err != nil { + cli.err = err + } + return cli.finishAsyncCall(req, &types.Response{&types.Response_Flush{res}}) +} + +func (cli *grpcClient) InfoAsync() *ReqRes { + req := types.ToRequestInfo() + res, err := cli.client.Info(context.Background(), req.GetInfo()) + if err != nil { + cli.err = err + } + return cli.finishAsyncCall(req, &types.Response{&types.Response_Info{res}}) +} + +func (cli *grpcClient) SetOptionAsync(key string, value string) *ReqRes { + req := types.ToRequestSetOption(key, value) + res, err := cli.client.SetOption(context.Background(), req.GetSetOption()) + if err != nil { + cli.err = err + } + return cli.finishAsyncCall(req, &types.Response{&types.Response_SetOption{res}}) +} + +func (cli *grpcClient) AppendTxAsync(tx []byte) *ReqRes { + req := types.ToRequestAppendTx(tx) + res, err := cli.client.AppendTx(context.Background(), req.GetAppendTx()) + if err != nil { + cli.err = err + } + return cli.finishAsyncCall(req, &types.Response{&types.Response_AppendTx{res}}) +} + +func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes { + req := types.ToRequestCheckTx(tx) + res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx()) + if err != nil { + cli.err = err + } + return cli.finishAsyncCall(req, &types.Response{&types.Response_CheckTx{res}}) +} + +func (cli *grpcClient) QueryAsync(query []byte) *ReqRes { + req := types.ToRequestQuery(query) + res, err := cli.client.Query(context.Background(), req.GetQuery()) + if err != nil { + cli.err = err + } + return cli.finishAsyncCall(req, &types.Response{&types.Response_Query{res}}) +} + +func (cli *grpcClient) CommitAsync() *ReqRes { + req := types.ToRequestCommit() + res, err := cli.client.Commit(context.Background(), req.GetCommit()) + if err != nil { + cli.err = err + } + return cli.finishAsyncCall(req, &types.Response{&types.Response_Commit{res}}) +} + +func (cli *grpcClient) InitChainAsync(validators []*types.Validator) *ReqRes { + req := types.ToRequestInitChain(validators) + res, err := cli.client.InitChain(context.Background(), req.GetInitChain()) + if err != nil { + cli.err = err + } + return cli.finishAsyncCall(req, &types.Response{&types.Response_InitChain{res}}) +} + +func (cli *grpcClient) BeginBlockAsync(height uint64) *ReqRes { + req := types.ToRequestBeginBlock(height) + res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock()) + if err != nil { + cli.err = err + } + return cli.finishAsyncCall(req, &types.Response{&types.Response_BeginBlock{res}}) +} + +func (cli *grpcClient) EndBlockAsync(height uint64) *ReqRes { + req := types.ToRequestEndBlock(height) + res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock()) + if err != nil { + cli.err = err + } + return cli.finishAsyncCall(req, &types.Response{&types.Response_EndBlock{res}}) +} + +//---------------------------------------- + +func (cli *grpcClient) EchoSync(msg string) (res types.Result) { + r := cli.EchoAsync(msg).Response.GetEcho() + return types.NewResultOK([]byte(r.Message), LOG) +} + +func (cli *grpcClient) FlushSync() error { + return nil +} + +func (cli *grpcClient) InfoSync() (res types.Result) { + r := cli.InfoAsync().Response.GetInfo() + return types.NewResultOK([]byte(r.Info), LOG) +} + +func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result) { + reqres := cli.SetOptionAsync(key, value) + if cli.err != nil { + return types.ErrInternalError.SetLog(cli.err.Error()) + } + resp := reqres.Response.GetSetOption() + return types.Result{Code: OK, Data: nil, Log: resp.Log} +} + +func (cli *grpcClient) AppendTxSync(tx []byte) (res types.Result) { + reqres := cli.AppendTxAsync(tx) + if cli.err != nil { + return types.ErrInternalError.SetLog(cli.err.Error()) + } + resp := reqres.Response.GetAppendTx() + return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} +} + +func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) { + reqres := cli.CheckTxAsync(tx) + if cli.err != nil { + return types.ErrInternalError.SetLog(cli.err.Error()) + } + resp := reqres.Response.GetCheckTx() + return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} +} + +func (cli *grpcClient) QuerySync(query []byte) (res types.Result) { + reqres := cli.QueryAsync(query) + if cli.err != nil { + return types.ErrInternalError.SetLog(cli.err.Error()) + } + resp := reqres.Response.GetQuery() + return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} +} + +func (cli *grpcClient) CommitSync() (res types.Result) { + reqres := cli.CommitAsync() + if cli.err != nil { + return types.ErrInternalError.SetLog(cli.err.Error()) + } + resp := reqres.Response.GetCommit() + return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} +} + +func (cli *grpcClient) InitChainSync(validators []*types.Validator) (err error) { + cli.InitChainAsync(validators) + if cli.err != nil { + return cli.err + } + return nil +} + +func (cli *grpcClient) BeginBlockSync(height uint64) (err error) { + cli.BeginBlockAsync(height) + if cli.err != nil { + return cli.err + } + return nil +} + +func (cli *grpcClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) { + reqres := cli.EndBlockAsync(height) + if cli.err != nil { + return nil, cli.err + } + return reqres.Response.GetEndBlock().Diffs, nil +} diff --git a/client/remote_client.go b/client/remote_client.go index aa8a4aea5..ae832716c 100644 --- a/client/remote_client.go +++ b/client/remote_client.go @@ -42,7 +42,7 @@ type remoteClient struct { resCb func(*types.Request, *types.Response) // listens to all callbacks } -func NewClient(addr string, mustConnect bool) (*remoteClient, error) { +func NewSocketClient(addr string, mustConnect bool) (*remoteClient, error) { cli := &remoteClient{ reqQueue: make(chan *ReqRes, reqQueueSize), flushTimer: NewThrottleTimer("remoteClient", flushThrottleMS), @@ -129,7 +129,7 @@ func (cli *remoteClient) sendValueRoutine(conn net.Conn) { select { case <-cli.flushTimer.Ch: select { - case cli.reqQueue <- NewReqRes(types.ToRequestFlush()): + case cli.reqQueue <- NewReqRes(types.ToRequestFlush()): // cant this block ? default: // Probably will fill the buffer, or retry later. } @@ -369,6 +369,7 @@ func (cli *remoteClient) EndBlockSync(height uint64) (validators []*types.Valida func (cli *remoteClient) queueRequest(req *types.Request) *ReqRes { reqres := NewReqRes(req) + // TODO: set cli.err if reqQueue times out cli.reqQueue <- reqres diff --git a/cmd/counter/main.go b/cmd/counter/main.go index 07bf4403c..c04a40e0f 100644 --- a/cmd/counter/main.go +++ b/cmd/counter/main.go @@ -6,31 +6,20 @@ import ( . "github.com/tendermint/go-common" "github.com/tendermint/tmsp/example/counter" "github.com/tendermint/tmsp/server" - "github.com/tendermint/tmsp/types" ) func main() { addrPtr := flag.String("addr", "tcp://0.0.0.0:46658", "Listen address") - grpcPtr := flag.String("tmsp", "socket", "TMSP server: socket | grpc") + tmspPtr := flag.String("tmsp", "socket", "TMSP server: socket | grpc") serialPtr := flag.Bool("serial", false, "Enforce incrementing (serial) txs") flag.Parse() app := counter.NewCounterApplication(*serialPtr) // Start the listener - switch *grpcPtr { - case "socket": - _, err := server.NewServer(*addrPtr, app) - if err != nil { - Exit(err.Error()) - } - case "grpc": - _, err := server.NewGRPCServer(*addrPtr, types.NewGRPCApplication(app)) - if err != nil { - Exit(err.Error()) - } - default: - Exit(Fmt("Unknown server type %s", *grpcPtr)) + _, err := server.NewServer(*addrPtr, *tmspPtr, app) + if err != nil { + Exit(err.Error()) } // Wait forever diff --git a/cmd/dummy/main.go b/cmd/dummy/main.go index 25486e44f..8efa69e6b 100644 --- a/cmd/dummy/main.go +++ b/cmd/dummy/main.go @@ -11,10 +11,11 @@ import ( func main() { addrPtr := flag.String("addr", "tcp://0.0.0.0:46658", "Listen address") + tmspPtr := flag.String("tmsp", "socket", "socket | grpc") flag.Parse() // Start the listener - _, err := server.NewServer(*addrPtr, dummy.NewDummyApplication()) + _, err := server.NewServer(*addrPtr, *tmspPtr, dummy.NewDummyApplication()) if err != nil { Exit(err.Error()) } diff --git a/cmd/tmsp-cli/tmsp-cli.go b/cmd/tmsp-cli/tmsp-cli.go index 9e746b572..6e02166a5 100644 --- a/cmd/tmsp-cli/tmsp-cli.go +++ b/cmd/tmsp-cli/tmsp-cli.go @@ -29,6 +29,13 @@ func main() { Usage: "address of application socket", }, } + app.Flags = []cli.Flag{ + cli.StringFlag{ + Name: "tmsp", + Value: "socket", + Usage: "socket or grpc", + }, + } app.Commands = []cli.Command{ { Name: "batch", @@ -105,7 +112,7 @@ func main() { func before(c *cli.Context) error { if client == nil { var err error - client, err = tmspcli.NewClient(c.GlobalString("address"), false) + client, err = tmspcli.NewClient(c.GlobalString("address"), c.GlobalString("tmsp"), false) if err != nil { Exit(err.Error()) } diff --git a/example/dummy/dummy_test.go b/example/dummy/dummy_test.go index 7fcffd847..c5ac2d271 100644 --- a/example/dummy/dummy_test.go +++ b/example/dummy/dummy_test.go @@ -15,7 +15,7 @@ func TestStream(t *testing.T) { numAppendTxs := 200000 // Start the listener - server, err := server.NewServer("unix://test.sock", NewDummyApplication()) + server, err := server.NewSocketServer("unix://test.sock", NewDummyApplication()) if err != nil { Exit(err.Error()) } diff --git a/example/nil/nil_test.go b/example/nil/nil_test.go index ba038b87d..ed7a36ddb 100644 --- a/example/nil/nil_test.go +++ b/example/nil/nil_test.go @@ -19,7 +19,7 @@ func TestStream(t *testing.T) { numAppendTxs := 200000 // Start the listener - server, err := server.NewServer("unix://test.sock", NewNilApplication()) + server, err := server.NewSocketServer("unix://test.sock", NewNilApplication()) if err != nil { Exit(err.Error()) } diff --git a/server/server.go b/server/server.go index ed8325957..f2b9d0af4 100644 --- a/server/server.go +++ b/server/server.go @@ -1,191 +1,22 @@ package server import ( - "bufio" "fmt" - "io" - "net" - "strings" - "sync" . "github.com/tendermint/go-common" "github.com/tendermint/tmsp/types" ) -// var maxNumberConnections = 2 - -type Server struct { - QuitService - - proto string - addr string - listener net.Listener - - appMtx sync.Mutex - app types.Application -} - -func NewServer(protoAddr string, app types.Application) (Service, error) { - parts := strings.SplitN(protoAddr, "://", 2) - proto, addr := parts[0], parts[1] - s := &Server{ - proto: proto, - addr: addr, - listener: nil, - app: app, - } - s.QuitService = *NewQuitService(nil, "TMSPServer", s) - _, err := s.Start() // Just start it - return s, err -} - -func (s *Server) OnStart() error { - s.QuitService.OnStart() - ln, err := net.Listen(s.proto, s.addr) - if err != nil { - return err - } - s.listener = ln - go s.acceptConnectionsRoutine() - return nil -} - -func (s *Server) OnStop() { - s.QuitService.OnStop() - s.listener.Close() -} - -func (s *Server) acceptConnectionsRoutine() { - // semaphore := make(chan struct{}, maxNumberConnections) - - for { - // semaphore <- struct{}{} - - // Accept a connection - fmt.Println("Waiting for new connection...") - conn, err := s.listener.Accept() - if err != nil { - if !s.IsRunning() { - return // Ignore error from listener closing. - } - Exit("Failed to accept connection: " + err.Error()) - } else { - fmt.Println("Accepted a new connection") - } - - closeConn := make(chan error, 2) // Push to signal connection closed - responses := make(chan *types.Response, 1000) // A channel to buffer responses - - // Read requests from conn and deal with them - go s.handleRequests(closeConn, conn, responses) - // Pull responses from 'responses' and write them to conn. - go s.handleResponses(closeConn, responses, conn) - - go func() { - // Wait until signal to close connection - errClose := <-closeConn - if errClose != nil { - fmt.Printf("Connection error: %v\n", errClose) - } else { - fmt.Println("Connection was closed.") - } - - // Close the connection - err := conn.Close() - if err != nil { - fmt.Printf("Error in closing connection: %v\n", err) - } - - // <-semaphore - }() - } -} - -// Read requests from conn and deal with them -func (s *Server) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) { - var count int - var bufReader = bufio.NewReader(conn) - for { - - var req = &types.Request{} - err := types.ReadMessage(bufReader, req) - if err != nil { - if err == io.EOF { - closeConn <- fmt.Errorf("Connection closed by client") - } else { - closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error()) - } - return - } - s.appMtx.Lock() - count++ - s.handleRequest(req, responses) - s.appMtx.Unlock() - } -} - -func (s *Server) handleRequest(req *types.Request, responses chan<- *types.Response) { - switch r := req.Value.(type) { - case *types.Request_Echo: - responses <- types.ToResponseEcho(r.Echo.Message) - case *types.Request_Flush: - responses <- types.ToResponseFlush() - case *types.Request_Info: - data := s.app.Info() - responses <- types.ToResponseInfo(data) - case *types.Request_SetOption: - so := r.SetOption - logStr := s.app.SetOption(so.Key, so.Value) - responses <- types.ToResponseSetOption(logStr) - case *types.Request_AppendTx: - res := s.app.AppendTx(r.AppendTx.Tx) - responses <- types.ToResponseAppendTx(res.Code, res.Data, res.Log) - case *types.Request_CheckTx: - res := s.app.CheckTx(r.CheckTx.Tx) - responses <- types.ToResponseCheckTx(res.Code, res.Data, res.Log) - case *types.Request_Commit: - res := s.app.Commit() - responses <- types.ToResponseCommit(res.Code, res.Data, res.Log) - case *types.Request_Query: - res := s.app.Query(r.Query.Query) - responses <- types.ToResponseQuery(res.Code, res.Data, res.Log) - case *types.Request_InitChain: - if app, ok := s.app.(types.BlockchainAware); ok { - app.InitChain(r.InitChain.Validators) - responses <- types.ToResponseInitChain() - } else { - responses <- types.ToResponseInitChain() - } - case *types.Request_EndBlock: - if app, ok := s.app.(types.BlockchainAware); ok { - validators := app.EndBlock(r.EndBlock.Height) - responses <- types.ToResponseEndBlock(validators) - } else { - responses <- types.ToResponseEndBlock(nil) - } +func NewServer(protoAddr, transport string, app types.Application) (Service, error) { + var s Service + var err error + switch transport { + case "socket": + s, err = NewSocketServer(protoAddr, app) + case "grpc": + s, err = NewGRPCServer(protoAddr, types.NewGRPCApplication(app)) default: - responses <- types.ToResponseException("Unknown request") - } -} - -// Pull responses from 'responses' and write them to conn. -func (s *Server) handleResponses(closeConn chan error, responses <-chan *types.Response, conn net.Conn) { - var count int - var bufWriter = bufio.NewWriter(conn) - for { - var res = <-responses - err := types.WriteMessage(res, bufWriter) - if err != nil { - closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error()) - return - } - if _, ok := res.Value.(*types.Response_Flush); ok { - err = bufWriter.Flush() - if err != nil { - closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error()) - return - } - } - count++ + err = fmt.Errorf("Unknown server type %s", transport) } + return s, err } diff --git a/server/socket_server.go b/server/socket_server.go new file mode 100644 index 000000000..7969974e4 --- /dev/null +++ b/server/socket_server.go @@ -0,0 +1,191 @@ +package server + +import ( + "bufio" + "fmt" + "io" + "net" + "strings" + "sync" + + . "github.com/tendermint/go-common" + "github.com/tendermint/tmsp/types" +) + +// var maxNumberConnections = 2 + +type SocketServer struct { + QuitService + + proto string + addr string + listener net.Listener + + appMtx sync.Mutex + app types.Application +} + +func NewSocketServer(protoAddr string, app types.Application) (Service, error) { + parts := strings.SplitN(protoAddr, "://", 2) + proto, addr := parts[0], parts[1] + s := &SocketServer{ + proto: proto, + addr: addr, + listener: nil, + app: app, + } + s.QuitService = *NewQuitService(nil, "TMSPServer", s) + _, err := s.Start() // Just start it + return s, err +} + +func (s *SocketServer) OnStart() error { + s.QuitService.OnStart() + ln, err := net.Listen(s.proto, s.addr) + if err != nil { + return err + } + s.listener = ln + go s.acceptConnectionsRoutine() + return nil +} + +func (s *SocketServer) OnStop() { + s.QuitService.OnStop() + s.listener.Close() +} + +func (s *SocketServer) acceptConnectionsRoutine() { + // semaphore := make(chan struct{}, maxNumberConnections) + + for { + // semaphore <- struct{}{} + + // Accept a connection + fmt.Println("Waiting for new connection...") + conn, err := s.listener.Accept() + if err != nil { + if !s.IsRunning() { + return // Ignore error from listener closing. + } + Exit("Failed to accept connection: " + err.Error()) + } else { + fmt.Println("Accepted a new connection") + } + + closeConn := make(chan error, 2) // Push to signal connection closed + responses := make(chan *types.Response, 1000) // A channel to buffer responses + + // Read requests from conn and deal with them + go s.handleRequests(closeConn, conn, responses) + // Pull responses from 'responses' and write them to conn. + go s.handleResponses(closeConn, responses, conn) + + go func() { + // Wait until signal to close connection + errClose := <-closeConn + if errClose != nil { + fmt.Printf("Connection error: %v\n", errClose) + } else { + fmt.Println("Connection was closed.") + } + + // Close the connection + err := conn.Close() + if err != nil { + fmt.Printf("Error in closing connection: %v\n", err) + } + + // <-semaphore + }() + } +} + +// Read requests from conn and deal with them +func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) { + var count int + var bufReader = bufio.NewReader(conn) + for { + + var req = &types.Request{} + err := types.ReadMessage(bufReader, req) + if err != nil { + if err == io.EOF { + closeConn <- fmt.Errorf("Connection closed by client") + } else { + closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error()) + } + return + } + s.appMtx.Lock() + count++ + s.handleRequest(req, responses) + s.appMtx.Unlock() + } +} + +func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types.Response) { + switch r := req.Value.(type) { + case *types.Request_Echo: + responses <- types.ToResponseEcho(r.Echo.Message) + case *types.Request_Flush: + responses <- types.ToResponseFlush() + case *types.Request_Info: + data := s.app.Info() + responses <- types.ToResponseInfo(data) + case *types.Request_SetOption: + so := r.SetOption + logStr := s.app.SetOption(so.Key, so.Value) + responses <- types.ToResponseSetOption(logStr) + case *types.Request_AppendTx: + res := s.app.AppendTx(r.AppendTx.Tx) + responses <- types.ToResponseAppendTx(res.Code, res.Data, res.Log) + case *types.Request_CheckTx: + res := s.app.CheckTx(r.CheckTx.Tx) + responses <- types.ToResponseCheckTx(res.Code, res.Data, res.Log) + case *types.Request_Commit: + res := s.app.Commit() + responses <- types.ToResponseCommit(res.Code, res.Data, res.Log) + case *types.Request_Query: + res := s.app.Query(r.Query.Query) + responses <- types.ToResponseQuery(res.Code, res.Data, res.Log) + case *types.Request_InitChain: + if app, ok := s.app.(types.BlockchainAware); ok { + app.InitChain(r.InitChain.Validators) + responses <- types.ToResponseInitChain() + } else { + responses <- types.ToResponseInitChain() + } + case *types.Request_EndBlock: + if app, ok := s.app.(types.BlockchainAware); ok { + validators := app.EndBlock(r.EndBlock.Height) + responses <- types.ToResponseEndBlock(validators) + } else { + responses <- types.ToResponseEndBlock(nil) + } + default: + responses <- types.ToResponseException("Unknown request") + } +} + +// Pull responses from 'responses' and write them to conn. +func (s *SocketServer) handleResponses(closeConn chan error, responses <-chan *types.Response, conn net.Conn) { + var count int + var bufWriter = bufio.NewWriter(conn) + for { + var res = <-responses + err := types.WriteMessage(res, bufWriter) + if err != nil { + closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error()) + return + } + if _, ok := res.Value.(*types.Response_Flush); ok { + err = bufWriter.Flush() + if err != nil { + closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error()) + return + } + } + count++ + } +} diff --git a/tests/test.sh b/tests/test.sh index d375eafae..ebdab0c80 100755 --- a/tests/test.sh +++ b/tests/test.sh @@ -5,5 +5,8 @@ cd $ROOT # test golang counter COUNTER_APP="counter" go run $ROOT/tests/test_counter.go +# test golang counter via grpc +COUNTER_APP="counter -tmsp=grpc" go run $ROOT/tests/test_counter.go -tmsp=grpc + # test nodejs counter COUNTER_APP="node ../js-tmsp/example/app.js" go run $ROOT/tests/test_counter.go diff --git a/tests/test_counter.go b/tests/test_counter.go index 2d60543e1..2f78b6964 100644 --- a/tests/test_counter.go +++ b/tests/test_counter.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "flag" "fmt" "os" "time" @@ -12,7 +13,10 @@ import ( "github.com/tendermint/tmsp/types" ) +var tmspPtr = flag.String("tmsp", "socket", "socket or grpc") + func main() { + flag.Parse() // Run tests testBasic() @@ -70,7 +74,7 @@ func startApp() *process.Process { func startClient() tmspcli.Client { // Start client - client, err := tmspcli.NewClient("tcp://127.0.0.1:46658", true) + client, err := tmspcli.NewClient("tcp://127.0.0.1:46658", *tmspPtr, true) if err != nil { panic("connecting to counter_app: " + err.Error()) }