From 2936c68339f9b2926aa4fa6f50226ea10380658f Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sat, 30 Jan 2016 19:36:33 -0800 Subject: [PATCH] Convert TMSP to use Protobuf --- Makefile | 5 +- client/client.go | 120 +++++-------- cmd/tmsp-cli/tmsp-cli.go | 41 ++--- example/golang/dummy_test.go | 30 ++-- server/server.go | 66 ++++--- tests/benchmarks/parallel/parallel.go | 15 +- tests/benchmarks/simple/simple.go | 25 ++- tests/benchmarks/wire_test.go | 44 ----- types/messages.go | 249 ++++++++++++++------------ types/retcode.go | 2 +- types/types.pb.go | 69 +++++++ types/types.proto | 47 +++++ 12 files changed, 382 insertions(+), 331 deletions(-) delete mode 100644 tests/benchmarks/wire_test.go create mode 100644 types/types.pb.go create mode 100644 types/types.proto diff --git a/Makefile b/Makefile index bb7505989..20fb8a236 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,9 @@ .PHONY: all test get_deps -all: test install +all: protoc test install + +protoc: + protoc --go_out=. types/*.proto install: get_deps go install github.com/tendermint/tmsp/cmd/... diff --git a/client/client.go b/client/client.go index d25348d11..e95639473 100644 --- a/client/client.go +++ b/client/client.go @@ -6,18 +6,16 @@ import ( "errors" "fmt" "net" - "reflect" "sync" . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" - tmsp "github.com/tendermint/tmsp/types" + "github.com/tendermint/tmsp/types" ) const maxResponseSize = 1048576 // 1MB TODO make configurable const flushThrottleMS = 20 // Don't wait longer than... -type Callback func(tmsp.Request, tmsp.Response) +type Callback func(*types.Request, *types.Response) // This is goroutine-safe, but users should beware that // the application in general is not meant to be interfaced @@ -34,7 +32,7 @@ type TMSPClient struct { bufWriter *bufio.Writer err error reqSent *list.List - resCb func(tmsp.Request, tmsp.Response) + resCb func(*types.Request, *types.Response) } func NewTMSPClient(conn net.Conn, bufferSize int) *TMSPClient { @@ -91,12 +89,10 @@ func (cli *TMSPClient) Error() error { func (cli *TMSPClient) sendRequestsRoutine() { for { - var n int - var err error select { case <-cli.flushTimer.Ch: select { - case cli.reqQueue <- newReqRes(tmsp.RequestFlush{}): + case cli.reqQueue <- newReqRes(types.RequestFlush()): default: // Probably will fill the buffer, or retry later. } @@ -104,13 +100,13 @@ func (cli *TMSPClient) sendRequestsRoutine() { return case reqres := <-cli.reqQueue: cli.willSendReq(reqres) - wire.WriteBinaryLengthPrefixed(struct{ tmsp.Request }{reqres.Request}, cli.bufWriter, &n, &err) // Length prefix + err := types.WriteMessage(reqres.Request, cli.bufWriter) if err != nil { cli.StopForError(err) return } // log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request) - if _, ok := reqres.Request.(tmsp.RequestFlush); ok { + if reqres.Request.Type == types.RequestTypeFlush { err = cli.bufWriter.Flush() if err != nil { cli.StopForError(err) @@ -124,16 +120,14 @@ func (cli *TMSPClient) sendRequestsRoutine() { func (cli *TMSPClient) recvResponseRoutine() { r := bufio.NewReader(cli.conn) // Buffer reads for { - var res tmsp.Response - var n int - var err error - wire.ReadBinaryPtrLengthPrefixed(&res, r, maxResponseSize, &n, &err) + var res = &types.Response{} + err := types.ReadMessage(r, res) if err != nil { cli.StopForError(err) return } - switch res := res.(type) { - case tmsp.ResponseException: + switch res.Type { + case types.ResponseTypeException: // XXX After setting cli.err, release waiters (e.g. reqres.Done()) cli.StopForError(errors.New(res.Error)) default: @@ -152,19 +146,19 @@ func (cli *TMSPClient) willSendReq(reqres *reqRes) { cli.reqSent.PushBack(reqres) } -func (cli *TMSPClient) didRecvResponse(res tmsp.Response) error { +func (cli *TMSPClient) didRecvResponse(res *types.Response) error { cli.mtx.Lock() defer cli.mtx.Unlock() // Get the first reqRes next := cli.reqSent.Front() if next == nil { - return fmt.Errorf("Unexpected result type %v when nothing expected", reflect.TypeOf(res)) + return fmt.Errorf("Unexpected result type %v when nothing expected", res.Type) } reqres := next.Value.(*reqRes) if !resMatchesReq(reqres.Request, res) { return fmt.Errorf("Unexpected result type %v when response to %v expected", - reflect.TypeOf(res), reflect.TypeOf(reqres.Request)) + res.Type, reqres.Request.Type) } reqres.Response = res // Set response @@ -182,99 +176,99 @@ func (cli *TMSPClient) didRecvResponse(res tmsp.Response) error { //---------------------------------------- func (cli *TMSPClient) EchoAsync(msg string) { - cli.queueRequest(tmsp.RequestEcho{msg}) + cli.queueRequest(types.RequestEcho(msg)) } func (cli *TMSPClient) FlushAsync() { - cli.queueRequest(tmsp.RequestFlush{}) + cli.queueRequest(types.RequestFlush()) } func (cli *TMSPClient) SetOptionAsync(key string, value string) { - cli.queueRequest(tmsp.RequestSetOption{key, value}) + cli.queueRequest(types.RequestSetOption(key, value)) } func (cli *TMSPClient) AppendTxAsync(tx []byte) { - cli.queueRequest(tmsp.RequestAppendTx{tx}) + cli.queueRequest(types.RequestAppendTx(tx)) } func (cli *TMSPClient) CheckTxAsync(tx []byte) { - cli.queueRequest(tmsp.RequestCheckTx{tx}) + cli.queueRequest(types.RequestCheckTx(tx)) } func (cli *TMSPClient) GetHashAsync() { - cli.queueRequest(tmsp.RequestGetHash{}) + cli.queueRequest(types.RequestGetHash()) } func (cli *TMSPClient) QueryAsync(query []byte) { - cli.queueRequest(tmsp.RequestQuery{query}) + cli.queueRequest(types.RequestQuery(query)) } //---------------------------------------- func (cli *TMSPClient) InfoSync() (info string, err error) { - reqres := cli.queueRequest(tmsp.RequestInfo{}) + reqres := cli.queueRequest(types.RequestInfo()) cli.FlushSync() if cli.err != nil { return "", cli.err } - return reqres.Response.(tmsp.ResponseInfo).Info, nil + return string(reqres.Response.Data), nil } func (cli *TMSPClient) FlushSync() error { - cli.queueRequest(tmsp.RequestFlush{}).Wait() + cli.queueRequest(types.RequestFlush()).Wait() return cli.err } -func (cli *TMSPClient) AppendTxSync(tx []byte) (code tmsp.RetCode, result []byte, log string, err error) { - reqres := cli.queueRequest(tmsp.RequestAppendTx{tx}) +func (cli *TMSPClient) AppendTxSync(tx []byte) (code types.RetCode, result []byte, log string, err error) { + reqres := cli.queueRequest(types.RequestAppendTx(tx)) cli.FlushSync() if cli.err != nil { - return tmsp.RetCodeInternalError, nil, "", cli.err + return types.RetCodeInternalError, nil, "", cli.err } - res := reqres.Response.(tmsp.ResponseAppendTx) - return res.Code, res.Result, res.Log, nil + res := reqres.Response + return types.RetCode(res.Code), res.Data, res.Log, nil } -func (cli *TMSPClient) CheckTxSync(tx []byte) (code tmsp.RetCode, result []byte, log string, err error) { - reqres := cli.queueRequest(tmsp.RequestCheckTx{tx}) +func (cli *TMSPClient) CheckTxSync(tx []byte) (code types.RetCode, result []byte, log string, err error) { + reqres := cli.queueRequest(types.RequestCheckTx(tx)) cli.FlushSync() if cli.err != nil { - return tmsp.RetCodeInternalError, nil, "", cli.err + return types.RetCodeInternalError, nil, "", cli.err } - res := reqres.Response.(tmsp.ResponseCheckTx) - return res.Code, res.Result, res.Log, nil + res := reqres.Response + return types.RetCode(res.Code), res.Data, res.Log, nil } func (cli *TMSPClient) GetHashSync() (hash []byte, log string, err error) { - reqres := cli.queueRequest(tmsp.RequestGetHash{}) + reqres := cli.queueRequest(types.RequestGetHash()) cli.FlushSync() if cli.err != nil { return nil, "", cli.err } - res := reqres.Response.(tmsp.ResponseGetHash) - return res.Hash, res.Log, nil + res := reqres.Response + return res.Data, res.Log, nil } func (cli *TMSPClient) QuerySync(query []byte) (result []byte, log string, err error) { - reqres := cli.queueRequest(tmsp.RequestQuery{query}) + reqres := cli.queueRequest(types.RequestQuery(query)) cli.FlushSync() if cli.err != nil { return nil, "", cli.err } - res := reqres.Response.(tmsp.ResponseQuery) - return res.Result, res.Log, nil + res := reqres.Response + return res.Data, res.Log, nil } //---------------------------------------- -func (cli *TMSPClient) queueRequest(req tmsp.Request) *reqRes { +func (cli *TMSPClient) queueRequest(req *types.Request) *reqRes { reqres := newReqRes(req) // TODO: set cli.err if reqQueue times out cli.reqQueue <- reqres // Maybe auto-flush, or unset auto-flush - switch req.(type) { - case tmsp.RequestFlush: + switch req.Type { + case types.RequestTypeFlush: cli.flushTimer.Unset() default: cli.flushTimer.Set() @@ -285,37 +279,17 @@ func (cli *TMSPClient) queueRequest(req tmsp.Request) *reqRes { //---------------------------------------- -func resMatchesReq(req tmsp.Request, res tmsp.Response) (ok bool) { - switch req.(type) { - case tmsp.RequestEcho: - _, ok = res.(tmsp.ResponseEcho) - case tmsp.RequestFlush: - _, ok = res.(tmsp.ResponseFlush) - case tmsp.RequestInfo: - _, ok = res.(tmsp.ResponseInfo) - case tmsp.RequestSetOption: - _, ok = res.(tmsp.ResponseSetOption) - case tmsp.RequestAppendTx: - _, ok = res.(tmsp.ResponseAppendTx) - case tmsp.RequestCheckTx: - _, ok = res.(tmsp.ResponseCheckTx) - case tmsp.RequestGetHash: - _, ok = res.(tmsp.ResponseGetHash) - case tmsp.RequestQuery: - _, ok = res.(tmsp.ResponseQuery) - default: - return false - } - return +func resMatchesReq(req *types.Request, res *types.Response) (ok bool) { + return req.Type+0x10 == res.Type } type reqRes struct { - tmsp.Request + *types.Request *sync.WaitGroup - tmsp.Response // Not set atomically, so be sure to use WaitGroup. + *types.Response // Not set atomically, so be sure to use WaitGroup. } -func newReqRes(req tmsp.Request) *reqRes { +func newReqRes(req *types.Request) *reqRes { return &reqRes{ Request: req, WaitGroup: waitGroup1(), diff --git a/cmd/tmsp-cli/tmsp-cli.go b/cmd/tmsp-cli/tmsp-cli.go index 4aed1f1ec..9526fab6d 100644 --- a/cmd/tmsp-cli/tmsp-cli.go +++ b/cmd/tmsp-cli/tmsp-cli.go @@ -8,14 +8,11 @@ import ( "io" "net" "os" - "reflect" "strings" + "github.com/codegangsta/cli" . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" "github.com/tendermint/tmsp/types" - - "github.com/codegangsta/cli" ) // connection is a global variable so it can be reused by the console @@ -162,7 +159,7 @@ func cmdEcho(c *cli.Context) { fmt.Println("echo takes 1 argument") return } - res, err := makeRequest(conn, types.RequestEcho{args[0]}) + res, err := makeRequest(conn, types.RequestEcho(args[0])) if err != nil { fmt.Println(err.Error()) return @@ -172,7 +169,7 @@ func cmdEcho(c *cli.Context) { // Get some info from the application func cmdInfo(c *cli.Context) { - res, err := makeRequest(conn, types.RequestInfo{}) + res, err := makeRequest(conn, types.RequestInfo()) if err != nil { fmt.Println(err.Error()) return @@ -187,7 +184,7 @@ func cmdSetOption(c *cli.Context) { fmt.Println("set_option takes 2 arguments (key, value)") return } - _, err := makeRequest(conn, types.RequestSetOption{args[0], args[1]}) + _, err := makeRequest(conn, types.RequestSetOption(args[0], args[1])) if err != nil { fmt.Println(err.Error()) return @@ -213,7 +210,7 @@ func cmdAppendTx(c *cli.Context) { } } - res, err := makeRequest(conn, types.RequestAppendTx{tx}) + res, err := makeRequest(conn, types.RequestAppendTx(tx)) if err != nil { fmt.Println(err.Error()) return @@ -239,7 +236,7 @@ func cmdCheckTx(c *cli.Context) { } } - res, err := makeRequest(conn, types.RequestCheckTx{tx}) + res, err := makeRequest(conn, types.RequestCheckTx(tx)) if err != nil { fmt.Println(err.Error()) return @@ -249,12 +246,12 @@ func cmdCheckTx(c *cli.Context) { // Get application Merkle root hash func cmdGetHash(c *cli.Context) { - res, err := makeRequest(conn, types.RequestGetHash{}) + res, err := makeRequest(conn, types.RequestGetHash()) if err != nil { fmt.Println(err.Error()) return } - fmt.Printf("%X\n", res.(types.ResponseGetHash).Hash) + fmt.Printf("%X\n", res.Data) } // Query application state @@ -275,7 +272,7 @@ func cmdQuery(c *cli.Context) { } } - res, err := makeRequest(conn, types.RequestQuery{query}) + res, err := makeRequest(conn, types.RequestQuery(query)) if err != nil { fmt.Println(err.Error()) return @@ -285,37 +282,35 @@ func cmdQuery(c *cli.Context) { //-------------------------------------------------------------------------------- -func makeRequest(conn net.Conn, req types.Request) (types.Response, error) { - var n int - var err error +func makeRequest(conn net.Conn, req *types.Request) (*types.Response, error) { // Write desired request - wire.WriteBinaryLengthPrefixed(struct{ types.Request }{req}, conn, &n, &err) + err := types.WriteMessage(req, conn) if err != nil { return nil, err } // Write flush request - wire.WriteBinaryLengthPrefixed(struct{ types.Request }{types.RequestFlush{}}, conn, &n, &err) + err = types.WriteMessage(types.RequestFlush(), conn) if err != nil { return nil, err } // Read desired response - var res types.Response - wire.ReadBinaryPtrLengthPrefixed(&res, conn, 0, &n, &err) + var res = &types.Response{} + err = types.ReadMessage(conn, res) if err != nil { return nil, err } // Read flush response - var resFlush types.Response - wire.ReadBinaryPtrLengthPrefixed(&resFlush, conn, 0, &n, &err) + var resFlush = &types.Response{} + err = types.ReadMessage(conn, resFlush) if err != nil { return nil, err } - if _, ok := resFlush.(types.ResponseFlush); !ok { - return nil, errors.New(Fmt("Expected types.ResponseFlush but got %v instead", reflect.TypeOf(resFlush))) + if resFlush.Type != types.ResponseTypeFlush { + return nil, errors.New(Fmt("Expected types.ResponseTypesFlush but got %v instead", resFlush.Type)) } return res, nil diff --git a/example/golang/dummy_test.go b/example/golang/dummy_test.go index ccf34f3f5..7926a8032 100644 --- a/example/golang/dummy_test.go +++ b/example/golang/dummy_test.go @@ -1,12 +1,10 @@ package example import ( - "reflect" "testing" "time" . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" "github.com/tendermint/tmsp/server" "github.com/tendermint/tmsp/types" ) @@ -32,19 +30,18 @@ func TestStream(t *testing.T) { go func() { counter := 0 for { - var n int - var err error - var res types.Response - wire.ReadBinaryPtrLengthPrefixed(&res, conn, 0, &n, &err) + + var res = &types.Response{} + err := types.ReadMessage(conn, res) if err != nil { Exit(err.Error()) } // Process response - switch res := res.(type) { - case types.ResponseAppendTx: + switch res.Type { + case types.ResponseTypeAppendTx: counter += 1 - if res.Code != types.RetCodeOK { + if types.RetCode(res.Code) != types.RetCodeOK { t.Error("AppendTx failed with ret_code", res.Code) } if counter > numAppendTxs { @@ -57,10 +54,10 @@ func TestStream(t *testing.T) { close(done) }() } - case types.ResponseFlush: + case types.ResponseTypeFlush: // ignore default: - t.Error("Unexpected response type", reflect.TypeOf(res)) + t.Error("Unexpected response type", res.Type) } } }() @@ -68,10 +65,8 @@ func TestStream(t *testing.T) { // Write requests for counter := 0; counter < numAppendTxs; counter++ { // Send request - var n int - var err error - var req types.Request = types.RequestAppendTx{TxBytes: []byte("test")} - wire.WriteBinaryLengthPrefixed(struct{ types.Request }{req}, conn, &n, &err) + var req = types.RequestAppendTx([]byte("test")) + err := types.WriteMessage(req, conn) if err != nil { t.Fatal(err.Error()) } @@ -79,7 +74,7 @@ func TestStream(t *testing.T) { // Sometimes send flush messages if counter%123 == 0 { t.Log("flush") - wire.WriteBinaryLengthPrefixed(struct{ types.Request }{types.RequestFlush{}}, conn, &n, &err) + err := types.WriteMessage(types.RequestFlush(), conn) if err != nil { t.Fatal(err.Error()) } @@ -87,8 +82,7 @@ func TestStream(t *testing.T) { } // Send final flush message - var n int - wire.WriteBinaryLengthPrefixed(struct{ types.Request }{types.RequestFlush{}}, conn, &n, &err) + err = types.WriteMessage(types.RequestFlush(), conn) if err != nil { t.Fatal(err.Error()) } diff --git a/server/server.go b/server/server.go index 7c2a7c4cd..74b4ce74c 100644 --- a/server/server.go +++ b/server/server.go @@ -9,7 +9,6 @@ import ( "sync" . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" "github.com/tendermint/tmsp/types" ) @@ -40,8 +39,8 @@ func StartListener(protoAddr string, app types.Application) (net.Listener, error fmt.Println("Accepted a new connection") } - closeConn := make(chan error, 2) // Push to signal connection closed - responses := make(chan types.Response, 1000) // A channel to buffer responses + closeConn := make(chan error, 2) // Push to signal connection closed + responses := make(chan *types.Response, 1000) // A channel to buffer responses // Read requests from conn and deal with them go handleRequests(&mtx, app, closeConn, conn, responses) @@ -73,14 +72,13 @@ func StartListener(protoAddr string, app types.Application) (net.Listener, error } // Read requests from conn and deal with them -func handleRequests(mtx *sync.Mutex, app types.Application, closeConn chan error, conn net.Conn, responses chan<- types.Response) { +func handleRequests(mtx *sync.Mutex, app types.Application, closeConn chan error, conn net.Conn, responses chan<- *types.Response) { var count int var bufReader = bufio.NewReader(conn) for { - var n int - var err error - var req types.Request - wire.ReadBinaryPtrLengthPrefixed(&req, bufReader, 0, &n, &err) + + var req = &types.Request{} + err := types.ReadMessage(bufReader, req) if err != nil { if err == io.EOF { closeConn <- fmt.Errorf("Connection closed by client") @@ -96,49 +94,47 @@ func handleRequests(mtx *sync.Mutex, app types.Application, closeConn chan error } } -func handleRequest(app types.Application, req types.Request, responses chan<- types.Response) { - switch req := req.(type) { - case types.RequestEcho: - responses <- types.ResponseEcho{req.Message} - case types.RequestFlush: - responses <- types.ResponseFlush{} - case types.RequestInfo: +func handleRequest(app types.Application, req *types.Request, responses chan<- *types.Response) { + switch req.Type { + case types.RequestTypeEcho: + responses <- types.ResponseEcho(string(req.Data)) + case types.RequestTypeFlush: + responses <- types.ResponseFlush() + case types.RequestTypeInfo: data := app.Info() - responses <- types.ResponseInfo{data} - case types.RequestSetOption: + responses <- types.ResponseInfo(data) + case types.RequestTypeSetOption: logStr := app.SetOption(req.Key, req.Value) - responses <- types.ResponseSetOption{logStr} - case types.RequestAppendTx: - code, result, logStr := app.AppendTx(req.TxBytes) - responses <- types.ResponseAppendTx{code, result, logStr} - case types.RequestCheckTx: - code, result, logStr := app.CheckTx(req.TxBytes) - responses <- types.ResponseCheckTx{code, result, logStr} - case types.RequestGetHash: + responses <- types.ResponseSetOption(logStr) + case types.RequestTypeAppendTx: + code, result, logStr := app.AppendTx(req.Data) + responses <- types.ResponseAppendTx(code, result, logStr) + case types.RequestTypeCheckTx: + code, result, logStr := app.CheckTx(req.Data) + responses <- types.ResponseCheckTx(code, result, logStr) + case types.RequestTypeGetHash: hash, logStr := app.GetHash() - responses <- types.ResponseGetHash{hash, logStr} - case types.RequestQuery: - result, logStr := app.Query(req.QueryBytes) - responses <- types.ResponseQuery{result, logStr} + responses <- types.ResponseGetHash(hash, logStr) + case types.RequestTypeQuery: + result, logStr := app.Query(req.Data) + responses <- types.ResponseQuery(result, logStr) default: - responses <- types.ResponseException{"Unknown request"} + responses <- types.ResponseException("Unknown request") } } // Pull responses from 'responses' and write them to conn. -func handleResponses(closeConn chan error, responses <-chan types.Response, conn net.Conn) { +func handleResponses(closeConn chan error, responses <-chan *types.Response, conn net.Conn) { var count int var bufWriter = bufio.NewWriter(conn) for { var res = <-responses - var n int - var err error - wire.WriteBinaryLengthPrefixed(struct{ types.Response }{res}, bufWriter, &n, &err) + err := types.WriteMessage(res, bufWriter) if err != nil { closeConn <- fmt.Errorf("Error in handleResponses: %v", err.Error()) return } - if _, ok := res.(types.ResponseFlush); ok { + if res.Type == types.ResponseTypeFlush { err = bufWriter.Flush() if err != nil { closeConn <- fmt.Errorf("Error in handleResponses: %v", err.Error()) diff --git a/tests/benchmarks/parallel/parallel.go b/tests/benchmarks/parallel/parallel.go index 481c14326..7cab2eb26 100644 --- a/tests/benchmarks/parallel/parallel.go +++ b/tests/benchmarks/parallel/parallel.go @@ -6,7 +6,6 @@ import ( //"encoding/hex" . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" "github.com/tendermint/tmsp/types" ) @@ -21,10 +20,8 @@ func main() { go func() { counter := 0 for { - var res types.Response - var n int - var err error - wire.ReadBinaryPtrLengthPrefixed(&res, conn, 0, &n, &err) + var res = &types.Response{} + err := types.ReadMessage(conn, res) if err != nil { Exit(err.Error()) } @@ -39,10 +36,9 @@ func main() { counter := 0 for i := 0; ; i++ { var bufWriter = bufio.NewWriter(conn) - var req types.Request = types.RequestEcho{"foobar"} - var n int - var err error - wire.WriteBinaryLengthPrefixed(struct{ types.Request }{req}, bufWriter, &n, &err) + var req = types.RequestEcho("foobar") + + err := types.WriteMessage(req, bufWriter) if err != nil { Exit(err.Error()) } @@ -50,6 +46,7 @@ func main() { if err != nil { Exit(err.Error()) } + counter += 1 if counter%1000 == 0 { fmt.Println("Write", counter) diff --git a/tests/benchmarks/simple/simple.go b/tests/benchmarks/simple/simple.go index 4009f8489..b40dbef5e 100644 --- a/tests/benchmarks/simple/simple.go +++ b/tests/benchmarks/simple/simple.go @@ -5,11 +5,9 @@ import ( "errors" "fmt" "net" - "reflect" //"encoding/hex" . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" "github.com/tendermint/tmsp/types" ) @@ -23,7 +21,7 @@ func main() { // Make a bunch of requests counter := 0 for i := 0; ; i++ { - req := types.RequestEcho{"foobar"} + req := types.RequestEcho("foobar") _, err := makeRequest(conn, req) if err != nil { Exit(err.Error()) @@ -35,35 +33,32 @@ func main() { } } -func makeRequest(conn net.Conn, req types.Request) (types.Response, error) { +func makeRequest(conn net.Conn, req *types.Request) (*types.Response, error) { var bufWriter = bufio.NewWriter(conn) - var n int - var err error // Write desired request - wire.WriteBinaryLengthPrefixed(struct{ types.Request }{req}, bufWriter, &n, &err) + err := types.WriteMessage(req, bufWriter) if err != nil { return nil, err } - bufWriter.Write([]byte{0x01, 0x01, types.RequestTypeFlush}) // Write flush msg - err = bufWriter.Flush() + err = types.WriteMessage(types.RequestFlush(), bufWriter) if err != nil { return nil, err } // Read desired response - var res types.Response - wire.ReadBinaryPtrLengthPrefixed(&res, conn, 0, &n, &err) + var res = &types.Response{} + err = types.ReadMessage(conn, res) if err != nil { return nil, err } - var resFlush types.Response // Read flush msg - wire.ReadBinaryPtrLengthPrefixed(&resFlush, conn, 0, &n, &err) + var resFlush = &types.Response{} + err = types.ReadMessage(conn, resFlush) if err != nil { return nil, err } - if _, ok := resFlush.(types.ResponseFlush); !ok { - return nil, errors.New(Fmt("Expected flush response but got something else", reflect.TypeOf(resFlush))) + if resFlush.Type != types.ResponseTypeFlush { + return nil, errors.New(Fmt("Expected flush response but got something else: %v", resFlush.Type)) } return res, nil diff --git a/tests/benchmarks/wire_test.go b/tests/benchmarks/wire_test.go deleted file mode 100644 index c730089aa..000000000 --- a/tests/benchmarks/wire_test.go +++ /dev/null @@ -1,44 +0,0 @@ -package benchmarks - -import ( - "bytes" - "testing" - - . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" - "github.com/tendermint/tmsp/types" -) - -func BenchmarkRequestWire(b *testing.B) { - b.StopTimer() - var bz = make([]byte, 1024) - copy(bz, []byte{1, 9, 0x01, 1, 6, 34, 34, 34, 34, 34, 34}) - var buf = bytes.NewBuffer(bz) - var req types.Request - b.StartTimer() - - for i := 0; i < b.N; i++ { - { - buf = bytes.NewBuffer(bz) - var n int - var err error - wire.ReadBinaryPtrLengthPrefixed(&req, buf, 0, &n, &err) - if err != nil { - Exit(err.Error()) - return - } - } - { - buf = bytes.NewBuffer(bz) - var n int - var err error - wire.WriteBinaryLengthPrefixed(struct{ types.Request }{req}, buf, &n, &err) - if err != nil { - Exit(err.Error()) - return - } - - } - } - -} diff --git a/types/messages.go b/types/messages.go index 0e05706d2..8c21f327c 100644 --- a/types/messages.go +++ b/types/messages.go @@ -1,155 +1,180 @@ package types -import "github.com/tendermint/go-wire" +import ( + "io" -const ( - RequestTypeEcho = byte(0x01) - RequestTypeFlush = byte(0x02) - RequestTypeInfo = byte(0x03) - RequestTypeSetOption = byte(0x04) - // reserved for GetOption = byte(0x05) - - ResponseTypeException = byte(0x10) - ResponseTypeEcho = byte(0x11) - ResponseTypeFlush = byte(0x12) - ResponseTypeInfo = byte(0x13) - ResponseTypeSetOption = byte(0x14) - // reserved for GetOption = byte(0x15) - - RequestTypeAppendTx = byte(0x21) - RequestTypeCheckTx = byte(0x22) - RequestTypeGetHash = byte(0x23) - RequestTypeQuery = byte(0x24) - - ResponseTypeAppendTx = byte(0x31) - ResponseTypeCheckTx = byte(0x32) - ResponseTypeGetHash = byte(0x33) - ResponseTypeQuery = byte(0x34) + "github.com/golang/protobuf/proto" + "github.com/tendermint/go-wire" ) -//---------------------------------------- - -type RequestEcho struct { - Message string -} +const ( + RequestTypeEcho = uint32(0x01) + RequestTypeFlush = uint32(0x02) + RequestTypeInfo = uint32(0x03) + RequestTypeSetOption = uint32(0x04) + // reserved for GetOption = uint32(0x05) + + ResponseTypeException = uint32(0x10) + ResponseTypeEcho = uint32(0x11) + ResponseTypeFlush = uint32(0x12) + ResponseTypeInfo = uint32(0x13) + ResponseTypeSetOption = uint32(0x14) + // reserved for GetOption = uint32(0x15) + + RequestTypeAppendTx = uint32(0x21) + RequestTypeCheckTx = uint32(0x22) + RequestTypeGetHash = uint32(0x23) + RequestTypeQuery = uint32(0x24) + + ResponseTypeAppendTx = uint32(0x31) + ResponseTypeCheckTx = uint32(0x32) + ResponseTypeGetHash = uint32(0x33) + ResponseTypeQuery = uint32(0x34) +) -type RequestFlush struct { +func RequestEcho(message string) *Request { + return &Request{ + Type: RequestTypeEcho, + Data: []byte(message), + } } -type RequestInfo struct { +func RequestFlush() *Request { + return &Request{ + Type: RequestTypeFlush, + } } -type RequestSetOption struct { - Key string - Value string +func RequestInfo() *Request { + return &Request{ + Type: RequestTypeInfo, + } } -type RequestAppendTx struct { - TxBytes []byte +func RequestSetOption(key string, value string) *Request { + return &Request{ + Type: RequestTypeSetOption, + Key: key, + Value: value, + } } -type RequestCheckTx struct { - TxBytes []byte +func RequestAppendTx(txBytes []byte) *Request { + return &Request{ + Type: RequestTypeAppendTx, + Data: txBytes, + } } -type RequestGetHash struct { +func RequestCheckTx(txBytes []byte) *Request { + return &Request{ + Type: RequestTypeCheckTx, + Data: txBytes, + } } -type RequestQuery struct { - QueryBytes []byte +func RequestGetHash() *Request { + return &Request{ + Type: RequestTypeGetHash, + } } -type Request interface { - AssertRequestType() +func RequestQuery(queryBytes []byte) *Request { + return &Request{ + Type: RequestTypeQuery, + Data: queryBytes, + } } -func (_ RequestEcho) AssertRequestType() {} -func (_ RequestFlush) AssertRequestType() {} -func (_ RequestInfo) AssertRequestType() {} -func (_ RequestSetOption) AssertRequestType() {} -func (_ RequestAppendTx) AssertRequestType() {} -func (_ RequestCheckTx) AssertRequestType() {} -func (_ RequestGetHash) AssertRequestType() {} -func (_ RequestQuery) AssertRequestType() {} - -var _ = wire.RegisterInterface( - struct{ Request }{}, - wire.ConcreteType{RequestEcho{}, RequestTypeEcho}, - wire.ConcreteType{RequestFlush{}, RequestTypeFlush}, - wire.ConcreteType{RequestInfo{}, RequestTypeInfo}, - wire.ConcreteType{RequestSetOption{}, RequestTypeSetOption}, - wire.ConcreteType{RequestAppendTx{}, RequestTypeAppendTx}, - wire.ConcreteType{RequestCheckTx{}, RequestTypeCheckTx}, - wire.ConcreteType{RequestGetHash{}, RequestTypeGetHash}, - wire.ConcreteType{RequestQuery{}, RequestTypeQuery}, -) - //---------------------------------------- -type ResponseException struct { - Error string -} - -type ResponseEcho struct { - Message string +func ResponseException(errStr string) *Response { + return &Response{ + Type: ResponseTypeException, + Error: errStr, + } } -type ResponseFlush struct { +func ResponseEcho(message string) *Response { + return &Response{ + Type: ResponseTypeEcho, + Data: []byte(message), + } } -type ResponseInfo struct { - Info string +func ResponseFlush() *Response { + return &Response{ + Type: ResponseTypeFlush, + } } -type ResponseSetOption struct { - Log string +func ResponseInfo(info string) *Response { + return &Response{ + Type: ResponseTypeInfo, + Data: []byte(info), + } } -type ResponseAppendTx struct { - Code RetCode - Result []byte - Log string +func ResponseSetOption(log string) *Response { + return &Response{ + Type: ResponseTypeSetOption, + Log: log, + } } -type ResponseCheckTx struct { - Code RetCode - Result []byte - Log string +func ResponseAppendTx(code RetCode, result []byte, log string) *Response { + return &Response{ + Type: ResponseTypeAppendTx, + Data: result, + Log: log, + } } -type ResponseGetHash struct { - Hash []byte - Log string +func ResponseCheckTx(code RetCode, result []byte, log string) *Response { + return &Response{ + Type: ResponseTypeCheckTx, + Data: result, + Log: log, + } } -type ResponseQuery struct { - Result []byte - Log string +func ResponseGetHash(hash []byte, log string) *Response { + return &Response{ + Type: ResponseTypeGetHash, + Data: hash, + Log: log, + } } -type Response interface { - AssertResponseType() +func ResponseQuery(result []byte, log string) *Response { + return &Response{ + Type: ResponseTypeQuery, + Data: result, + Log: log, + } } -func (_ ResponseEcho) AssertResponseType() {} -func (_ ResponseFlush) AssertResponseType() {} -func (_ ResponseInfo) AssertResponseType() {} -func (_ ResponseSetOption) AssertResponseType() {} -func (_ ResponseAppendTx) AssertResponseType() {} -func (_ ResponseCheckTx) AssertResponseType() {} -func (_ ResponseGetHash) AssertResponseType() {} -func (_ ResponseException) AssertResponseType() {} -func (_ ResponseQuery) AssertResponseType() {} +//---------------------------------------- -var _ = wire.RegisterInterface( - struct{ Response }{}, - wire.ConcreteType{ResponseEcho{}, ResponseTypeEcho}, - wire.ConcreteType{ResponseFlush{}, ResponseTypeFlush}, - wire.ConcreteType{ResponseInfo{}, ResponseTypeInfo}, - wire.ConcreteType{ResponseSetOption{}, ResponseTypeSetOption}, - wire.ConcreteType{ResponseAppendTx{}, ResponseTypeAppendTx}, - wire.ConcreteType{ResponseCheckTx{}, ResponseTypeCheckTx}, - wire.ConcreteType{ResponseGetHash{}, ResponseTypeGetHash}, - wire.ConcreteType{ResponseException{}, ResponseTypeException}, - wire.ConcreteType{ResponseQuery{}, ResponseTypeQuery}, -) +// Write proto message, length delimited +func WriteMessage(msg proto.Message, w io.Writer) error { + bz, err := proto.Marshal(msg) + if err != nil { + return err + } + var n int + wire.WriteByteSlice(bz, w, &n, &err) + return err +} + +// Read proto message, length delimited +func ReadMessage(r io.Reader, msg proto.Message) error { + var n int + var err error + bz := wire.ReadByteSlice(r, 0, &n, &err) + if err != nil { + return err + } + err = proto.Unmarshal(bz, msg) + return err +} diff --git a/types/retcode.go b/types/retcode.go index 05e49a2ec..c6bf3e209 100644 --- a/types/retcode.go +++ b/types/retcode.go @@ -1,6 +1,6 @@ package types -type RetCode int +type RetCode uint // Reserved return codes const ( diff --git a/types/types.pb.go b/types/types.pb.go new file mode 100644 index 000000000..877c72cae --- /dev/null +++ b/types/types.pb.go @@ -0,0 +1,69 @@ +// Code generated by protoc-gen-go. +// source: types/types.proto +// DO NOT EDIT! + +/* +Package types is a generated protocol buffer package. + +It is generated from these files: + types/types.proto + +It has these top-level messages: + Request + Response +*/ +package types + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +type Request struct { + Type uint32 `protobuf:"varint,1,opt,name=type" json:"type,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` + Key string `protobuf:"bytes,3,opt,name=key" json:"key,omitempty"` + Value string `protobuf:"bytes,4,opt,name=value" json:"value,omitempty"` +} + +func (m *Request) Reset() { *m = Request{} } +func (m *Request) String() string { return proto.CompactTextString(m) } +func (*Request) ProtoMessage() {} +func (*Request) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +type Response struct { + Type uint32 `protobuf:"varint,1,opt,name=type" json:"type,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` + Code uint32 `protobuf:"varint,3,opt,name=code" json:"code,omitempty"` + Error string `protobuf:"bytes,4,opt,name=error" json:"error,omitempty"` + Log string `protobuf:"bytes,5,opt,name=log" json:"log,omitempty"` +} + +func (m *Response) Reset() { *m = Response{} } +func (m *Response) String() string { return proto.CompactTextString(m) } +func (*Response) ProtoMessage() {} +func (*Response) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func init() { + proto.RegisterType((*Request)(nil), "types.Request") + proto.RegisterType((*Response)(nil), "types.Response") +} + +var fileDescriptor0 = []byte{ + // 165 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x8f, 0xb1, 0xae, 0xc2, 0x30, + 0x0c, 0x45, 0xd5, 0xd7, 0xe6, 0x01, 0x16, 0x48, 0x10, 0x31, 0x64, 0x44, 0x9d, 0x98, 0x60, 0xe0, + 0x4f, 0xb2, 0x31, 0x06, 0x6a, 0x31, 0x50, 0xd5, 0x21, 0x49, 0x91, 0xfa, 0xf7, 0xd8, 0xae, 0xd8, + 0x59, 0xa2, 0x73, 0x8f, 0xa2, 0xab, 0x6b, 0xd8, 0x95, 0x29, 0x62, 0x3e, 0xeb, 0x7b, 0x8a, 0x89, + 0x0a, 0x59, 0xa3, 0xa1, 0xbd, 0xc2, 0xc2, 0xe3, 0x6b, 0xc4, 0x5c, 0xac, 0x85, 0x46, 0x9c, 0xab, + 0x0e, 0xd5, 0x71, 0xe3, 0x95, 0xc5, 0x75, 0xa1, 0x04, 0xf7, 0xc7, 0x6e, 0xed, 0x95, 0xed, 0x16, + 0xea, 0x27, 0x4e, 0xae, 0x66, 0xb5, 0xf2, 0x82, 0x76, 0x0f, 0xe6, 0x1d, 0xfa, 0x11, 0x5d, 0xa3, + 0x6e, 0x0e, 0xed, 0x00, 0x4b, 0x8f, 0x39, 0xd2, 0x90, 0xf1, 0xe7, 0x6e, 0x76, 0x77, 0xea, 0x50, + 0xcb, 0xf9, 0x9f, 0xb0, 0xb4, 0x63, 0x4a, 0x94, 0xbe, 0xed, 0x1a, 0x64, 0x45, 0x4f, 0x0f, 0x67, + 0xe6, 0x15, 0x8c, 0xb7, 0x7f, 0x3d, 0xec, 0xf2, 0x09, 0x00, 0x00, 0xff, 0xff, 0xce, 0x9d, 0x3d, + 0x4f, 0xed, 0x00, 0x00, 0x00, +} diff --git a/types/types.proto b/types/types.proto new file mode 100644 index 000000000..8a00c3ca6 --- /dev/null +++ b/types/types.proto @@ -0,0 +1,47 @@ +syntax = "proto3"; + +package types; + +//---------------------------------------- +// Message types + +/* + RequestTypeEcho = 0x01; + RequestTypeFlush = 0x02; + RequestTypeInfo = 0x03; + RequestTypeSetOption = 0x04; + RequestTypeAppendTx = 0x21; + RequestTypeCheckTx = 0x22; + RequestTypeGetHash = 0x23; + RequestTypeQuery = 0x24; + + ResponseTypeEcho = 0x11; + ResponseTypeFlush = 0x12; + ResponseTypeInfo = 0x13; + ResponseTypeSetOption = 0x14; + ResponseTypeAppendTx = 0x31; + ResponseTypeCheckTx = 0x32; + ResponseTypeGetHash = 0x33; + ResponseTypeQuery = 0x34; +*/ + +//---------------------------------------- +// Request types + +message Request { + uint32 type = 1; + bytes data = 2; + string key = 3; + string value = 4; +} + +//---------------------------------------- +// Response types + +message Response { + uint32 type = 1; + bytes data = 2; + uint32 code = 3; + string error = 4; + string log = 5; +}