From db437e7a456bef2edff655bb870aa4968be1b0ac Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 21 Jun 2016 13:19:49 -0400 Subject: [PATCH 1/3] broadcast_tx via grpc --- cmd/tendermint/flags.go | 3 + config/tendermint/config.go | 1 + config/tendermint_test/config.go | 1 + node/node.go | 12 ++ rpc/grpc/api.go | 18 ++ rpc/grpc/client_server.go | 44 ++++ rpc/grpc/types.pb.go | 336 +++++++++++++++++++++++++++++++ rpc/grpc/types.proto | 41 ++++ rpc/test/grpc_test.go | 21 ++ rpc/test/helpers.go | 7 + test/app/counter_test.sh | 35 +++- test/app/grpc_client.go | 36 ++++ test/app/test.sh | 22 ++ 13 files changed, 573 insertions(+), 4 deletions(-) create mode 100644 rpc/grpc/api.go create mode 100644 rpc/grpc/client_server.go create mode 100644 rpc/grpc/types.pb.go create mode 100644 rpc/grpc/types.proto create mode 100644 rpc/test/grpc_test.go create mode 100644 test/app/grpc_client.go diff --git a/cmd/tendermint/flags.go b/cmd/tendermint/flags.go index 1765cc545..1cc41c4c9 100644 --- a/cmd/tendermint/flags.go +++ b/cmd/tendermint/flags.go @@ -16,6 +16,7 @@ func parseFlags(config cfg.Config, args []string) { fastSync bool skipUPNP bool rpcLaddr string + grpcLaddr string logLevel string proxyApp string tmspTransport string @@ -30,6 +31,7 @@ func parseFlags(config cfg.Config, args []string) { flags.BoolVar(&fastSync, "fast_sync", config.GetBool("fast_sync"), "Fast blockchain syncing") flags.BoolVar(&skipUPNP, "skip_upnp", config.GetBool("skip_upnp"), "Skip UPNP configuration") flags.StringVar(&rpcLaddr, "rpc_laddr", config.GetString("rpc_laddr"), "RPC listen address. Port required") + flags.StringVar(&grpcLaddr, "grpc_laddr", config.GetString("grpc_laddr"), "GRPC listen address (BroadcastTx only). Port required") flags.StringVar(&logLevel, "log_level", config.GetString("log_level"), "Log level") flags.StringVar(&proxyApp, "proxy_app", config.GetString("proxy_app"), "Proxy app address, or 'nilapp' or 'dummy' for local testing.") @@ -47,6 +49,7 @@ func parseFlags(config cfg.Config, args []string) { config.Set("fast_sync", fastSync) config.Set("skip_upnp", skipUPNP) config.Set("rpc_laddr", rpcLaddr) + config.Set("grpc_laddr", grpcLaddr) config.Set("log_level", logLevel) config.Set("proxy_app", proxyApp) config.Set("tmsp", tmspTransport) diff --git a/config/tendermint/config.go b/config/tendermint/config.go index dfdf41709..a48f801e9 100644 --- a/config/tendermint/config.go +++ b/config/tendermint/config.go @@ -67,6 +67,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("db_dir", rootDir+"/data") mapConfig.SetDefault("log_level", "info") mapConfig.SetDefault("rpc_laddr", "tcp://0.0.0.0:46657") + mapConfig.SetDefault("grpc_laddr", "") mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("revision_file", rootDir+"/revision") mapConfig.SetDefault("cs_wal_dir", rootDir+"/data/cs.wal") diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index 1751af00d..30da33cce 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -80,6 +80,7 @@ func ResetConfig(localPath string) cfg.Config { mapConfig.SetDefault("db_dir", rootDir+"/data") mapConfig.SetDefault("log_level", "debug") mapConfig.SetDefault("rpc_laddr", "tcp://0.0.0.0:36657") + mapConfig.SetDefault("grpc_laddr", "tcp://0.0.0.0:36658") mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("revision_file", rootDir+"/revision") mapConfig.SetDefault("cs_wal_dir", rootDir+"/data/cs.wal") diff --git a/node/node.go b/node/node.go index dcde7faca..d2bb46f6e 100644 --- a/node/node.go +++ b/node/node.go @@ -21,6 +21,7 @@ import ( mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" rpccore "github.com/tendermint/tendermint/rpc/core" + grpccore "github.com/tendermint/tendermint/rpc/grpc" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/version" @@ -218,6 +219,17 @@ func (n *Node) StartRPC() ([]net.Listener, error) { } listeners[i] = listener } + + // we expose a simplified api over grpc for convenience to app devs + grpcListenAddr := n.config.GetString("grpc_laddr") + if grpcListenAddr != "" { + listener, err := grpccore.StartGRPCServer(grpcListenAddr) + if err != nil { + return nil, err + } + listeners = append(listeners, listener) + } + return listeners, nil } diff --git a/rpc/grpc/api.go b/rpc/grpc/api.go new file mode 100644 index 000000000..190500547 --- /dev/null +++ b/rpc/grpc/api.go @@ -0,0 +1,18 @@ +package core_grpc + +import ( + core "github.com/tendermint/tendermint/rpc/core" + + context "golang.org/x/net/context" +) + +type broadcastAPI struct { +} + +func (bapi *broadcastAPI) BroadcastTx(ctx context.Context, req *RequestBroadcastTx) (*ResponseBroadcastTx, error) { + res, err := core.BroadcastTxCommit(req.Tx) + if res == nil { + return nil, err + } + return &ResponseBroadcastTx{uint64(res.Code), res.Data, res.Log}, err +} diff --git a/rpc/grpc/client_server.go b/rpc/grpc/client_server.go new file mode 100644 index 000000000..d760bf254 --- /dev/null +++ b/rpc/grpc/client_server.go @@ -0,0 +1,44 @@ +package core_grpc + +import ( + "fmt" + "net" + "strings" + "time" + + "google.golang.org/grpc" + + . "github.com/tendermint/go-common" +) + +// Start the grpcServer in a go routine +func StartGRPCServer(protoAddr string) (net.Listener, error) { + parts := strings.SplitN(protoAddr, "://", 2) + if len(parts) != 2 { + return nil, fmt.Errorf("Invalid listen address for grpc server (did you forget a tcp:// prefix?) : %s", protoAddr) + } + proto, addr := parts[0], parts[1] + ln, err := net.Listen(proto, addr) + if err != nil { + return nil, err + } + + grpcServer := grpc.NewServer() + RegisterBroadcastAPIServer(grpcServer, &broadcastAPI{}) + go grpcServer.Serve(ln) + + return ln, nil +} + +// Start the client by dialing the server +func StartGRPCClient(protoAddr string) BroadcastAPIClient { + conn, err := grpc.Dial(protoAddr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc)) + if err != nil { + panic(err) + } + return NewBroadcastAPIClient(conn) +} + +func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) { + return Connect(addr) +} diff --git a/rpc/grpc/types.pb.go b/rpc/grpc/types.pb.go new file mode 100644 index 000000000..51dfb6974 --- /dev/null +++ b/rpc/grpc/types.pb.go @@ -0,0 +1,336 @@ +// Code generated by protoc-gen-go. +// source: types.proto +// DO NOT EDIT! + +/* +Package core_grpc is a generated protocol buffer package. + +It is generated from these files: + types.proto + +It has these top-level messages: + Request + RequestBroadcastTx + Response + ResponseBroadcastTx +*/ +package core_grpc + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Request struct { + // Types that are valid to be assigned to Value: + // *Request_BroadcastTx + Value isRequest_Value `protobuf_oneof:"value"` +} + +func (m *Request) Reset() { *m = Request{} } +func (m *Request) String() string { return proto.CompactTextString(m) } +func (*Request) ProtoMessage() {} +func (*Request) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +type isRequest_Value interface { + isRequest_Value() +} + +type Request_BroadcastTx struct { + BroadcastTx *RequestBroadcastTx `protobuf:"bytes,1,opt,name=broadcast_tx,json=broadcastTx,oneof"` +} + +func (*Request_BroadcastTx) isRequest_Value() {} + +func (m *Request) GetValue() isRequest_Value { + if m != nil { + return m.Value + } + return nil +} + +func (m *Request) GetBroadcastTx() *RequestBroadcastTx { + if x, ok := m.GetValue().(*Request_BroadcastTx); ok { + return x.BroadcastTx + } + return nil +} + +// XXX_OneofFuncs is for the internal use of the proto package. +func (*Request) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { + return _Request_OneofMarshaler, _Request_OneofUnmarshaler, _Request_OneofSizer, []interface{}{ + (*Request_BroadcastTx)(nil), + } +} + +func _Request_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { + m := msg.(*Request) + // value + switch x := m.Value.(type) { + case *Request_BroadcastTx: + b.EncodeVarint(1<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.BroadcastTx); err != nil { + return err + } + case nil: + default: + return fmt.Errorf("Request.Value has unexpected type %T", x) + } + return nil +} + +func _Request_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { + m := msg.(*Request) + switch tag { + case 1: // value.broadcast_tx + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(RequestBroadcastTx) + err := b.DecodeMessage(msg) + m.Value = &Request_BroadcastTx{msg} + return true, err + default: + return false, nil + } +} + +func _Request_OneofSizer(msg proto.Message) (n int) { + m := msg.(*Request) + // value + switch x := m.Value.(type) { + case *Request_BroadcastTx: + s := proto.Size(x.BroadcastTx) + n += proto.SizeVarint(1<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case nil: + default: + panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) + } + return n +} + +type RequestBroadcastTx struct { + Tx []byte `protobuf:"bytes,1,opt,name=tx,proto3" json:"tx,omitempty"` +} + +func (m *RequestBroadcastTx) Reset() { *m = RequestBroadcastTx{} } +func (m *RequestBroadcastTx) String() string { return proto.CompactTextString(m) } +func (*RequestBroadcastTx) ProtoMessage() {} +func (*RequestBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +type Response struct { + // Types that are valid to be assigned to Value: + // *Response_BroadcastTx + Value isResponse_Value `protobuf_oneof:"value"` +} + +func (m *Response) Reset() { *m = Response{} } +func (m *Response) String() string { return proto.CompactTextString(m) } +func (*Response) ProtoMessage() {} +func (*Response) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +type isResponse_Value interface { + isResponse_Value() +} + +type Response_BroadcastTx struct { + BroadcastTx *ResponseBroadcastTx `protobuf:"bytes,1,opt,name=broadcast_tx,json=broadcastTx,oneof"` +} + +func (*Response_BroadcastTx) isResponse_Value() {} + +func (m *Response) GetValue() isResponse_Value { + if m != nil { + return m.Value + } + return nil +} + +func (m *Response) GetBroadcastTx() *ResponseBroadcastTx { + if x, ok := m.GetValue().(*Response_BroadcastTx); ok { + return x.BroadcastTx + } + return nil +} + +// XXX_OneofFuncs is for the internal use of the proto package. +func (*Response) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { + return _Response_OneofMarshaler, _Response_OneofUnmarshaler, _Response_OneofSizer, []interface{}{ + (*Response_BroadcastTx)(nil), + } +} + +func _Response_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { + m := msg.(*Response) + // value + switch x := m.Value.(type) { + case *Response_BroadcastTx: + b.EncodeVarint(1<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.BroadcastTx); err != nil { + return err + } + case nil: + default: + return fmt.Errorf("Response.Value has unexpected type %T", x) + } + return nil +} + +func _Response_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { + m := msg.(*Response) + switch tag { + case 1: // value.broadcast_tx + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(ResponseBroadcastTx) + err := b.DecodeMessage(msg) + m.Value = &Response_BroadcastTx{msg} + return true, err + default: + return false, nil + } +} + +func _Response_OneofSizer(msg proto.Message) (n int) { + m := msg.(*Response) + // value + switch x := m.Value.(type) { + case *Response_BroadcastTx: + s := proto.Size(x.BroadcastTx) + n += proto.SizeVarint(1<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case nil: + default: + panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) + } + return n +} + +type ResponseBroadcastTx struct { + Code uint64 `protobuf:"varint,1,opt,name=code" json:"code,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` + Log string `protobuf:"bytes,3,opt,name=log" json:"log,omitempty"` +} + +func (m *ResponseBroadcastTx) Reset() { *m = ResponseBroadcastTx{} } +func (m *ResponseBroadcastTx) String() string { return proto.CompactTextString(m) } +func (*ResponseBroadcastTx) ProtoMessage() {} +func (*ResponseBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +func init() { + proto.RegisterType((*Request)(nil), "core_grpc.Request") + proto.RegisterType((*RequestBroadcastTx)(nil), "core_grpc.RequestBroadcastTx") + proto.RegisterType((*Response)(nil), "core_grpc.Response") + proto.RegisterType((*ResponseBroadcastTx)(nil), "core_grpc.ResponseBroadcastTx") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion3 + +// Client API for BroadcastAPI service + +type BroadcastAPIClient interface { + BroadcastTx(ctx context.Context, in *RequestBroadcastTx, opts ...grpc.CallOption) (*ResponseBroadcastTx, error) +} + +type broadcastAPIClient struct { + cc *grpc.ClientConn +} + +func NewBroadcastAPIClient(cc *grpc.ClientConn) BroadcastAPIClient { + return &broadcastAPIClient{cc} +} + +func (c *broadcastAPIClient) BroadcastTx(ctx context.Context, in *RequestBroadcastTx, opts ...grpc.CallOption) (*ResponseBroadcastTx, error) { + out := new(ResponseBroadcastTx) + err := grpc.Invoke(ctx, "/core_grpc.BroadcastAPI/BroadcastTx", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for BroadcastAPI service + +type BroadcastAPIServer interface { + BroadcastTx(context.Context, *RequestBroadcastTx) (*ResponseBroadcastTx, error) +} + +func RegisterBroadcastAPIServer(s *grpc.Server, srv BroadcastAPIServer) { + s.RegisterService(&_BroadcastAPI_serviceDesc, srv) +} + +func _BroadcastAPI_BroadcastTx_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RequestBroadcastTx) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BroadcastAPIServer).BroadcastTx(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/core_grpc.BroadcastAPI/BroadcastTx", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BroadcastAPIServer).BroadcastTx(ctx, req.(*RequestBroadcastTx)) + } + return interceptor(ctx, in, info, handler) +} + +var _BroadcastAPI_serviceDesc = grpc.ServiceDesc{ + ServiceName: "core_grpc.BroadcastAPI", + HandlerType: (*BroadcastAPIServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "BroadcastTx", + Handler: _BroadcastAPI_BroadcastTx_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: fileDescriptor0, +} + +func init() { proto.RegisterFile("types.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 222 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2e, 0xa9, 0x2c, 0x48, + 0x2d, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x4c, 0xce, 0x2f, 0x4a, 0x8d, 0x4f, 0x2f, + 0x2a, 0x48, 0x56, 0x0a, 0xe3, 0x62, 0x0f, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x11, 0x72, 0xe2, + 0xe2, 0x49, 0x2a, 0xca, 0x4f, 0x4c, 0x49, 0x4e, 0x2c, 0x2e, 0x89, 0x2f, 0xa9, 0x90, 0x60, 0x54, + 0x60, 0xd4, 0xe0, 0x36, 0x92, 0xd5, 0x83, 0x2b, 0xd6, 0x83, 0xaa, 0x74, 0x82, 0xa9, 0x0a, 0xa9, + 0xf0, 0x60, 0x08, 0xe2, 0x4e, 0x42, 0x70, 0x9d, 0xd8, 0xb9, 0x58, 0xcb, 0x12, 0x73, 0x4a, 0x53, + 0x95, 0x54, 0xb8, 0x84, 0x30, 0x55, 0x0b, 0xf1, 0x71, 0x31, 0x41, 0x0d, 0xe6, 0x09, 0x02, 0xb2, + 0x94, 0x22, 0xb8, 0x38, 0x82, 0x52, 0x8b, 0x0b, 0xf2, 0xf3, 0x8a, 0x53, 0x85, 0x9c, 0xb1, 0x5a, + 0x2f, 0x87, 0x62, 0x3d, 0x44, 0x29, 0x31, 0xf6, 0xfb, 0x73, 0x09, 0x63, 0x51, 0x2e, 0x24, 0xc4, + 0xc5, 0x92, 0x9c, 0x9f, 0x92, 0x0a, 0x36, 0x9c, 0x25, 0x08, 0xcc, 0x06, 0x89, 0xa5, 0x24, 0x96, + 0x24, 0x4a, 0x30, 0x81, 0x9d, 0x05, 0x66, 0x0b, 0x09, 0x70, 0x31, 0xe7, 0xe4, 0xa7, 0x4b, 0x30, + 0x03, 0x85, 0x38, 0x83, 0x40, 0x4c, 0xa3, 0x18, 0x2e, 0x1e, 0xb8, 0x41, 0x8e, 0x01, 0x9e, 0x42, + 0x3e, 0x5c, 0xdc, 0xc8, 0x06, 0xe3, 0x0f, 0x26, 0x29, 0x02, 0xde, 0x48, 0x62, 0x03, 0x47, 0x8c, + 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0xb2, 0x77, 0xda, 0x85, 0xa7, 0x01, 0x00, 0x00, +} diff --git a/rpc/grpc/types.proto b/rpc/grpc/types.proto new file mode 100644 index 000000000..2f2b96de3 --- /dev/null +++ b/rpc/grpc/types.proto @@ -0,0 +1,41 @@ +syntax = "proto3"; +package core_grpc; + +//---------------------------------------- +// Message types + +//---------------------------------------- +// Request types + +message Request { + oneof value{ + RequestBroadcastTx broadcast_tx = 1; + } +} + +message RequestBroadcastTx { + bytes tx = 1; +} + +//---------------------------------------- +// Response types + + +message Response { + oneof value{ + ResponseBroadcastTx broadcast_tx = 1; + } +} + +message ResponseBroadcastTx{ + uint64 code = 1; // TODO: import tmsp ... + bytes data = 2; + string log = 3; +} + +//---------------------------------------- +// Service Definition + +service BroadcastAPI { + rpc BroadcastTx(RequestBroadcastTx) returns (ResponseBroadcastTx) ; +} diff --git a/rpc/test/grpc_test.go b/rpc/test/grpc_test.go new file mode 100644 index 000000000..8fad465be --- /dev/null +++ b/rpc/test/grpc_test.go @@ -0,0 +1,21 @@ +package rpctest + +import ( + "testing" + + "golang.org/x/net/context" + + "github.com/tendermint/tendermint/rpc/grpc" +) + +//------------------------------------------- + +func TestBroadcastTx(t *testing.T) { + res, err := clientGRPC.BroadcastTx(context.Background(), &core_grpc.RequestBroadcastTx{[]byte("this is a tx")}) + if err != nil { + t.Fatal(err) + } + if res.Code != 0 { + t.Fatalf("Non-zero code: %d", res.Code) + } +} diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index 17acaf9be..da6482483 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -13,6 +13,7 @@ import ( "github.com/tendermint/tendermint/config/tendermint_test" nm "github.com/tendermint/tendermint/node" ctypes "github.com/tendermint/tendermint/rpc/core/types" + "github.com/tendermint/tendermint/rpc/grpc" ) // global variables for use across all tests @@ -24,8 +25,10 @@ var ( requestAddr string websocketAddr string websocketEndpoint string + grpcAddr string clientURI *client.ClientURI clientJSON *client.ClientJSONRPC + clientGRPC core_grpc.BroadcastAPIClient ) // initialize config and create new node @@ -33,12 +36,14 @@ func init() { config = tendermint_test.ResetConfig("rpc_test_client_test") chainID = config.GetString("chain_id") rpcAddr = config.GetString("rpc_laddr") + grpcAddr = config.GetString("grpc_laddr") requestAddr = rpcAddr websocketAddr = rpcAddr websocketEndpoint = "/websocket" clientURI = client.NewClientURI(requestAddr) clientJSON = client.NewClientJSONRPC(requestAddr) + clientGRPC = core_grpc.StartGRPCClient(grpcAddr) // TODO: change consensus/state.go timeouts to be shorter @@ -59,6 +64,8 @@ func newNode(ready chan struct{}) { // Run the RPC server. node.StartRPC() + time.Sleep(time.Second) + ready <- struct{}{} // Sleep forever diff --git a/test/app/counter_test.sh b/test/app/counter_test.sh index 809a22128..b8d670dc2 100644 --- a/test/app/counter_test.sh +++ b/test/app/counter_test.sh @@ -9,10 +9,37 @@ TESTNAME=$1 function sendTx() { TX=$1 - RESPONSE=`curl -s localhost:46657/broadcast_tx_commit?tx=\"$TX\"` - CODE=`echo $RESPONSE | jq .result[1].code` - ERROR=`echo $RESPONSE | jq .error` - ERROR=$(echo "$ERROR" | tr -d '"') # remove surrounding quotes + if [[ "$GRPC_BROADCAST_TX" == "" ]]; then + RESPONSE=`curl -s localhost:46657/broadcast_tx_commit?tx=\"$TX\"` + CODE=`echo $RESPONSE | jq .result[1].code` + ERROR=`echo $RESPONSE | jq .error` + ERROR=$(echo "$ERROR" | tr -d '"') # remove surrounding quotes + else + RESPONSE=`go run grpc_client.go $TX` + echo $RESPONSE | jq . &> /dev/null + IS_JSON=$? + if [[ "$IS_JSON" != "0" ]]; then + ERROR="$RESPONSE" + else + ERROR="" # reset + fi + + if [[ "$RESPONSE" == "{}" ]]; then + # protobuf auto adds `omitempty` to everything so code OK and empty data/log + # will not even show when marshalled into json + # apparently we can use github.com/golang/protobuf/jsonpb to do the marshalling ... + CODE=0 + else + # this wont actually work if theres an error ... + CODE=`echo $RESPONSE | jq .code` + fi + #echo "-------" + #echo "TX $TX" + #echo "RESPONSE $RESPONSE" + #echo "CODE $CODE" + #echo "ERROR $ERROR" + #echo "----" + fi } # 0 should pass once and get in block, with no error diff --git a/test/app/grpc_client.go b/test/app/grpc_client.go new file mode 100644 index 000000000..e43b8ae39 --- /dev/null +++ b/test/app/grpc_client.go @@ -0,0 +1,36 @@ +package main + +import ( + "encoding/hex" + "fmt" + "os" + + "golang.org/x/net/context" + + "github.com/tendermint/go-wire" + "github.com/tendermint/tendermint/rpc/grpc" +) + +var grpcAddr = "tcp://localhost:36656" + +func main() { + args := os.Args + if len(args) == 1 { + fmt.Println("Must enter a transaction to send (hex)") + os.Exit(1) + } + tx := args[1] + txBytes, err := hex.DecodeString(tx) + if err != nil { + fmt.Println("Invalid hex", err) + os.Exit(1) + } + + clientGRPC := core_grpc.StartGRPCClient(grpcAddr) + res, err := clientGRPC.BroadcastTx(context.Background(), &core_grpc.RequestBroadcastTx{txBytes}) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + fmt.Println(string(wire.JSONBytes(res))) +} diff --git a/test/app/test.sh b/test/app/test.sh index d44ad2f25..f73ab6960 100644 --- a/test/app/test.sh +++ b/test/app/test.sh @@ -77,6 +77,23 @@ function counter_over_grpc() { kill -9 $pid_counter $pid_tendermint } +function counter_over_grpc_grpc() { + rm -rf $TMROOT + tendermint init + echo "Starting counter and tendermint" + counter --serial --tmsp grpc > /dev/null & + pid_counter=$! + GRPC_PORT=36656 + tendermint node --tmsp grpc --grpc_laddr tcp://localhost:$GRPC_PORT > tendermint.log & + pid_tendermint=$! + sleep 5 + + echo "running test" + GRPC_BROADCAST_TX=true bash counter_test.sh "Counter over GRPC via GRPC BroadcastTx" + + kill -9 $pid_counter $pid_tendermint +} + cd $GOPATH/src/github.com/tendermint/tendermint/test/app case "$1" in @@ -92,6 +109,9 @@ case "$1" in "counter_over_grpc") counter_over_grpc ;; + "counter_over_grpc_grpc") + counter_over_grpc_grpc + ;; *) echo "Running all" dummy_over_socket @@ -101,5 +121,7 @@ case "$1" in counter_over_socket echo "" counter_over_grpc + echo "" + counter_over_grpc_grpc esac From 2ef695da9794af914a9ae7906021d0eaff332803 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 27 Aug 2016 16:37:52 -0400 Subject: [PATCH 2/3] include check/append responses in broadcast_tx_commit --- rpc/core/mempool.go | 42 +++---- rpc/core/types/responses.go | 11 +- rpc/grpc/api.go | 4 +- rpc/grpc/compile.sh | 3 + rpc/grpc/types.pb.go | 224 +++++------------------------------- rpc/grpc/types.proto | 20 +--- rpc/test/client_test.go | 11 +- rpc/test/grpc_test.go | 7 +- 8 files changed, 73 insertions(+), 249 deletions(-) create mode 100644 rpc/grpc/compile.sh diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index ad599d228..0a57167ec 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -39,16 +39,9 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { }, nil } -// CONTRACT: returns error==nil iff the tx is included in a block. -// -// If CheckTx fails, return with the response from CheckTx AND an error. -// Else, block until the tx is included in a block, -// and return the result of AppendTx (with no error). -// Even if AppendTx fails, so long as the tx is included in a block this function -// will not return an error - it is the caller's responsibility to check res.Code. -// The function times out after five minutes and returns the result of CheckTx and an error. -// TODO: smarter timeout logic or someway to cancel (tx not getting committed is a sign of a larger problem!) -func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { +// CONTRACT: only returns error if mempool.BroadcastTx errs (ie. problem with the app) +// or if we timeout waiting for tx to commit +func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { // subscribe to tx being committed in block appendTxResCh := make(chan types.EventDataTx, 1) @@ -66,13 +59,12 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { } checkTxRes := <-checkTxResCh checkTxR := checkTxRes.GetCheckTx() - if r := checkTxR; r.Code != tmsp.CodeType_OK { + if checkTxR.Code != tmsp.CodeType_OK { // CheckTx failed! - return &ctypes.ResultBroadcastTx{ - Code: r.Code, - Data: r.Data, - Log: r.Log, - }, fmt.Errorf("Check tx failed with non-zero code: %s. Data: %X; Log: %s", r.Code.String(), r.Data, r.Log) + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: checkTxR, + AppendTx: nil, + }, nil } // Wait for the tx to be included in a block, @@ -81,19 +73,15 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { select { case appendTxRes := <-appendTxResCh: // The tx was included in a block. - // NOTE we don't return an error regardless of the AppendTx code; - // clients must check this to see if they need to send a new tx! - return &ctypes.ResultBroadcastTx{ - Code: appendTxRes.Code, - Data: appendTxRes.Result, - Log: appendTxRes.Log, + appendTxR := appendTxRes.GetAppendTx() + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: checkTxR, + AppendTx: appendTxR, }, nil case <-timer.C: - r := checkTxR - return &ctypes.ResultBroadcastTx{ - Code: r.Code, - Data: r.Data, - Log: r.Log, + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: checkTxR, + AppendTx: nil, }, fmt.Errorf("Timed out waiting for transaction to be included in a block") } diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index f5f6bae02..0befac673 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -63,6 +63,11 @@ type ResultBroadcastTx struct { Log string `json:"log"` } +type ResultBroadcastTxCommit struct { + CheckTx *tmsp.ResponseCheckTx `json:"check_tx"` + AppendTx *tmsp.ResponseAppendTx `json:"append_tx"` +} + type ResultUnconfirmedTxs struct { N int `json:"n_txs"` Txs []types.Tx `json:"txs"` @@ -115,8 +120,9 @@ const ( ResultTypeDumpConsensusState = byte(0x41) // 0x6 bytes are for txs / the application - ResultTypeBroadcastTx = byte(0x60) - ResultTypeUnconfirmedTxs = byte(0x61) + ResultTypeBroadcastTx = byte(0x60) + ResultTypeUnconfirmedTxs = byte(0x61) + ResultTypeBroadcastTxCommit = byte(0x62) // 0x7 bytes are for querying the application ResultTypeTMSPQuery = byte(0x70) @@ -151,6 +157,7 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&ResultValidators{}, ResultTypeValidators}, wire.ConcreteType{&ResultDumpConsensusState{}, ResultTypeDumpConsensusState}, wire.ConcreteType{&ResultBroadcastTx{}, ResultTypeBroadcastTx}, + wire.ConcreteType{&ResultBroadcastTxCommit{}, ResultTypeBroadcastTxCommit}, wire.ConcreteType{&ResultUnconfirmedTxs{}, ResultTypeUnconfirmedTxs}, wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe}, wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe}, diff --git a/rpc/grpc/api.go b/rpc/grpc/api.go index 190500547..c8b8dce75 100644 --- a/rpc/grpc/api.go +++ b/rpc/grpc/api.go @@ -11,8 +11,8 @@ type broadcastAPI struct { func (bapi *broadcastAPI) BroadcastTx(ctx context.Context, req *RequestBroadcastTx) (*ResponseBroadcastTx, error) { res, err := core.BroadcastTxCommit(req.Tx) - if res == nil { + if err != nil { return nil, err } - return &ResponseBroadcastTx{uint64(res.Code), res.Data, res.Log}, err + return &ResponseBroadcastTx{res.CheckTx, res.AppendTx}, nil } diff --git a/rpc/grpc/compile.sh b/rpc/grpc/compile.sh new file mode 100644 index 000000000..2c4629c8e --- /dev/null +++ b/rpc/grpc/compile.sh @@ -0,0 +1,3 @@ +#! /bin/bash + +protoc --go_out=plugins=grpc:. -I $GOPATH/src/ -I . types.proto diff --git a/rpc/grpc/types.pb.go b/rpc/grpc/types.pb.go index 51dfb6974..06ff3f879 100644 --- a/rpc/grpc/types.pb.go +++ b/rpc/grpc/types.pb.go @@ -9,9 +9,7 @@ It is generated from these files: types.proto It has these top-level messages: - Request RequestBroadcastTx - Response ResponseBroadcastTx */ package core_grpc @@ -19,6 +17,7 @@ package core_grpc import proto "github.com/golang/protobuf/proto" import fmt "fmt" import math "math" +import types "github.com/tendermint/tmsp/types" import ( context "golang.org/x/net/context" @@ -36,96 +35,6 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package -type Request struct { - // Types that are valid to be assigned to Value: - // *Request_BroadcastTx - Value isRequest_Value `protobuf_oneof:"value"` -} - -func (m *Request) Reset() { *m = Request{} } -func (m *Request) String() string { return proto.CompactTextString(m) } -func (*Request) ProtoMessage() {} -func (*Request) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } - -type isRequest_Value interface { - isRequest_Value() -} - -type Request_BroadcastTx struct { - BroadcastTx *RequestBroadcastTx `protobuf:"bytes,1,opt,name=broadcast_tx,json=broadcastTx,oneof"` -} - -func (*Request_BroadcastTx) isRequest_Value() {} - -func (m *Request) GetValue() isRequest_Value { - if m != nil { - return m.Value - } - return nil -} - -func (m *Request) GetBroadcastTx() *RequestBroadcastTx { - if x, ok := m.GetValue().(*Request_BroadcastTx); ok { - return x.BroadcastTx - } - return nil -} - -// XXX_OneofFuncs is for the internal use of the proto package. -func (*Request) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { - return _Request_OneofMarshaler, _Request_OneofUnmarshaler, _Request_OneofSizer, []interface{}{ - (*Request_BroadcastTx)(nil), - } -} - -func _Request_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { - m := msg.(*Request) - // value - switch x := m.Value.(type) { - case *Request_BroadcastTx: - b.EncodeVarint(1<<3 | proto.WireBytes) - if err := b.EncodeMessage(x.BroadcastTx); err != nil { - return err - } - case nil: - default: - return fmt.Errorf("Request.Value has unexpected type %T", x) - } - return nil -} - -func _Request_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { - m := msg.(*Request) - switch tag { - case 1: // value.broadcast_tx - if wire != proto.WireBytes { - return true, proto.ErrInternalBadWireType - } - msg := new(RequestBroadcastTx) - err := b.DecodeMessage(msg) - m.Value = &Request_BroadcastTx{msg} - return true, err - default: - return false, nil - } -} - -func _Request_OneofSizer(msg proto.Message) (n int) { - m := msg.(*Request) - // value - switch x := m.Value.(type) { - case *Request_BroadcastTx: - s := proto.Size(x.BroadcastTx) - n += proto.SizeVarint(1<<3 | proto.WireBytes) - n += proto.SizeVarint(uint64(s)) - n += s - case nil: - default: - panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) - } - return n -} - type RequestBroadcastTx struct { Tx []byte `protobuf:"bytes,1,opt,name=tx,proto3" json:"tx,omitempty"` } @@ -133,113 +42,34 @@ type RequestBroadcastTx struct { func (m *RequestBroadcastTx) Reset() { *m = RequestBroadcastTx{} } func (m *RequestBroadcastTx) String() string { return proto.CompactTextString(m) } func (*RequestBroadcastTx) ProtoMessage() {} -func (*RequestBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } +func (*RequestBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } -type Response struct { - // Types that are valid to be assigned to Value: - // *Response_BroadcastTx - Value isResponse_Value `protobuf_oneof:"value"` -} - -func (m *Response) Reset() { *m = Response{} } -func (m *Response) String() string { return proto.CompactTextString(m) } -func (*Response) ProtoMessage() {} -func (*Response) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } - -type isResponse_Value interface { - isResponse_Value() -} - -type Response_BroadcastTx struct { - BroadcastTx *ResponseBroadcastTx `protobuf:"bytes,1,opt,name=broadcast_tx,json=broadcastTx,oneof"` +type ResponseBroadcastTx struct { + CheckTx *types.ResponseCheckTx `protobuf:"bytes,1,opt,name=check_tx,json=checkTx" json:"check_tx,omitempty"` + AppendTx *types.ResponseAppendTx `protobuf:"bytes,2,opt,name=append_tx,json=appendTx" json:"append_tx,omitempty"` } -func (*Response_BroadcastTx) isResponse_Value() {} +func (m *ResponseBroadcastTx) Reset() { *m = ResponseBroadcastTx{} } +func (m *ResponseBroadcastTx) String() string { return proto.CompactTextString(m) } +func (*ResponseBroadcastTx) ProtoMessage() {} +func (*ResponseBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } -func (m *Response) GetValue() isResponse_Value { +func (m *ResponseBroadcastTx) GetCheckTx() *types.ResponseCheckTx { if m != nil { - return m.Value + return m.CheckTx } return nil } -func (m *Response) GetBroadcastTx() *ResponseBroadcastTx { - if x, ok := m.GetValue().(*Response_BroadcastTx); ok { - return x.BroadcastTx - } - return nil -} - -// XXX_OneofFuncs is for the internal use of the proto package. -func (*Response) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { - return _Response_OneofMarshaler, _Response_OneofUnmarshaler, _Response_OneofSizer, []interface{}{ - (*Response_BroadcastTx)(nil), - } -} - -func _Response_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { - m := msg.(*Response) - // value - switch x := m.Value.(type) { - case *Response_BroadcastTx: - b.EncodeVarint(1<<3 | proto.WireBytes) - if err := b.EncodeMessage(x.BroadcastTx); err != nil { - return err - } - case nil: - default: - return fmt.Errorf("Response.Value has unexpected type %T", x) +func (m *ResponseBroadcastTx) GetAppendTx() *types.ResponseAppendTx { + if m != nil { + return m.AppendTx } return nil } -func _Response_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { - m := msg.(*Response) - switch tag { - case 1: // value.broadcast_tx - if wire != proto.WireBytes { - return true, proto.ErrInternalBadWireType - } - msg := new(ResponseBroadcastTx) - err := b.DecodeMessage(msg) - m.Value = &Response_BroadcastTx{msg} - return true, err - default: - return false, nil - } -} - -func _Response_OneofSizer(msg proto.Message) (n int) { - m := msg.(*Response) - // value - switch x := m.Value.(type) { - case *Response_BroadcastTx: - s := proto.Size(x.BroadcastTx) - n += proto.SizeVarint(1<<3 | proto.WireBytes) - n += proto.SizeVarint(uint64(s)) - n += s - case nil: - default: - panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) - } - return n -} - -type ResponseBroadcastTx struct { - Code uint64 `protobuf:"varint,1,opt,name=code" json:"code,omitempty"` - Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` - Log string `protobuf:"bytes,3,opt,name=log" json:"log,omitempty"` -} - -func (m *ResponseBroadcastTx) Reset() { *m = ResponseBroadcastTx{} } -func (m *ResponseBroadcastTx) String() string { return proto.CompactTextString(m) } -func (*ResponseBroadcastTx) ProtoMessage() {} -func (*ResponseBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } - func init() { - proto.RegisterType((*Request)(nil), "core_grpc.Request") proto.RegisterType((*RequestBroadcastTx)(nil), "core_grpc.RequestBroadcastTx") - proto.RegisterType((*Response)(nil), "core_grpc.Response") proto.RegisterType((*ResponseBroadcastTx)(nil), "core_grpc.ResponseBroadcastTx") } @@ -318,19 +148,19 @@ var _BroadcastAPI_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("types.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 222 bytes of a gzipped FileDescriptorProto + // 223 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2e, 0xa9, 0x2c, 0x48, 0x2d, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x4c, 0xce, 0x2f, 0x4a, 0x8d, 0x4f, 0x2f, - 0x2a, 0x48, 0x56, 0x0a, 0xe3, 0x62, 0x0f, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x11, 0x72, 0xe2, - 0xe2, 0x49, 0x2a, 0xca, 0x4f, 0x4c, 0x49, 0x4e, 0x2c, 0x2e, 0x89, 0x2f, 0xa9, 0x90, 0x60, 0x54, - 0x60, 0xd4, 0xe0, 0x36, 0x92, 0xd5, 0x83, 0x2b, 0xd6, 0x83, 0xaa, 0x74, 0x82, 0xa9, 0x0a, 0xa9, - 0xf0, 0x60, 0x08, 0xe2, 0x4e, 0x42, 0x70, 0x9d, 0xd8, 0xb9, 0x58, 0xcb, 0x12, 0x73, 0x4a, 0x53, - 0x95, 0x54, 0xb8, 0x84, 0x30, 0x55, 0x0b, 0xf1, 0x71, 0x31, 0x41, 0x0d, 0xe6, 0x09, 0x02, 0xb2, - 0x94, 0x22, 0xb8, 0x38, 0x82, 0x52, 0x8b, 0x0b, 0xf2, 0xf3, 0x8a, 0x53, 0x85, 0x9c, 0xb1, 0x5a, - 0x2f, 0x87, 0x62, 0x3d, 0x44, 0x29, 0x31, 0xf6, 0xfb, 0x73, 0x09, 0x63, 0x51, 0x2e, 0x24, 0xc4, - 0xc5, 0x92, 0x9c, 0x9f, 0x92, 0x0a, 0x36, 0x9c, 0x25, 0x08, 0xcc, 0x06, 0x89, 0xa5, 0x24, 0x96, - 0x24, 0x4a, 0x30, 0x81, 0x9d, 0x05, 0x66, 0x0b, 0x09, 0x70, 0x31, 0xe7, 0xe4, 0xa7, 0x4b, 0x30, - 0x03, 0x85, 0x38, 0x83, 0x40, 0x4c, 0xa3, 0x18, 0x2e, 0x1e, 0xb8, 0x41, 0x8e, 0x01, 0x9e, 0x42, - 0x3e, 0x5c, 0xdc, 0xc8, 0x06, 0xe3, 0x0f, 0x26, 0x29, 0x02, 0xde, 0x48, 0x62, 0x03, 0x47, 0x8c, - 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0xb2, 0x77, 0xda, 0x85, 0xa7, 0x01, 0x00, 0x00, + 0x2a, 0x48, 0x96, 0xd2, 0x49, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x2f, + 0x49, 0xcd, 0x4b, 0x49, 0x2d, 0xca, 0xcd, 0xcc, 0x2b, 0xd1, 0x2f, 0xc9, 0x2d, 0x2e, 0xd0, 0x07, + 0x6b, 0xd1, 0x47, 0xd2, 0xa8, 0xa4, 0xc2, 0x25, 0x14, 0x94, 0x5a, 0x58, 0x9a, 0x5a, 0x5c, 0xe2, + 0x54, 0x94, 0x9f, 0x98, 0x92, 0x9c, 0x58, 0x5c, 0x12, 0x52, 0x21, 0xc4, 0xc7, 0xc5, 0x54, 0x52, + 0x21, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x13, 0x04, 0x64, 0x29, 0xd5, 0x71, 0x09, 0x07, 0xa5, 0x16, + 0x17, 0xe4, 0xe7, 0x15, 0xa7, 0x22, 0x2b, 0x33, 0xe4, 0xe2, 0x48, 0xce, 0x48, 0x4d, 0xce, 0x8e, + 0x87, 0x2a, 0xe6, 0x36, 0x12, 0xd3, 0x83, 0x18, 0x0e, 0x53, 0xed, 0x0c, 0x92, 0x0e, 0xa9, 0x08, + 0x62, 0x4f, 0x86, 0x30, 0x84, 0x4c, 0xb8, 0x38, 0x13, 0x0b, 0x0a, 0x80, 0xce, 0x02, 0xe9, 0x61, + 0x02, 0xeb, 0x11, 0x47, 0xd3, 0xe3, 0x08, 0x96, 0x07, 0x6a, 0xe2, 0x48, 0x84, 0xb2, 0x8c, 0x62, + 0xb8, 0x78, 0xe0, 0xf6, 0x3a, 0x06, 0x78, 0x0a, 0xf9, 0x70, 0x71, 0x23, 0xbb, 0x43, 0x56, 0x0f, + 0xee, 0x7d, 0x3d, 0x4c, 0xdf, 0x48, 0xc9, 0xa1, 0x48, 0x63, 0x78, 0x23, 0x89, 0x0d, 0x1c, 0x14, + 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x68, 0x73, 0x87, 0xb0, 0x52, 0x01, 0x00, 0x00, } diff --git a/rpc/grpc/types.proto b/rpc/grpc/types.proto index 2f2b96de3..ec7f0d1e6 100644 --- a/rpc/grpc/types.proto +++ b/rpc/grpc/types.proto @@ -1,18 +1,14 @@ syntax = "proto3"; package core_grpc; +import "github.com/tendermint/tmsp/types/types.proto"; + //---------------------------------------- // Message types //---------------------------------------- // Request types -message Request { - oneof value{ - RequestBroadcastTx broadcast_tx = 1; - } -} - message RequestBroadcastTx { bytes tx = 1; } @@ -20,17 +16,9 @@ message RequestBroadcastTx { //---------------------------------------- // Response types - -message Response { - oneof value{ - ResponseBroadcastTx broadcast_tx = 1; - } -} - message ResponseBroadcastTx{ - uint64 code = 1; // TODO: import tmsp ... - bytes data = 2; - string log = 3; + types.ResponseCheckTx check_tx = 1; + types.ResponseAppendTx append_tx = 2; } //---------------------------------------- diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go index 728e87bd3..73729b4f6 100644 --- a/rpc/test/client_test.go +++ b/rpc/test/client_test.go @@ -193,9 +193,14 @@ func TestJSONBroadcastTxCommit(t *testing.T) { func testBroadcastTxCommit(t *testing.T, resI interface{}, tx []byte) { tmRes := resI.(*ctypes.TMResult) - res := (*tmRes).(*ctypes.ResultBroadcastTx) - if res.Code != tmsp.CodeType_OK { - panic(Fmt("BroadcastTxCommit got non-zero exit code: %v. %X; %s", res.Code, res.Data, res.Log)) + res := (*tmRes).(*ctypes.ResultBroadcastTxCommit) + checkTx := res.CheckTx + if checkTx.Code != tmsp.CodeType_OK { + panic(Fmt("BroadcastTxCommit got non-zero exit code from CheckTx: %v. %X; %s", checkTx.Code, checkTx.Data, checkTx.Log)) + } + appendTx := res.AppendTx + if appendTx.Code != tmsp.CodeType_OK { + panic(Fmt("BroadcastTxCommit got non-zero exit code from CheckTx: %v. %X; %s", appendTx.Code, appendTx.Data, appendTx.Log)) } mem := node.MempoolReactor().Mempool if mem.Size() != 0 { diff --git a/rpc/test/grpc_test.go b/rpc/test/grpc_test.go index 8fad465be..13672773c 100644 --- a/rpc/test/grpc_test.go +++ b/rpc/test/grpc_test.go @@ -15,7 +15,10 @@ func TestBroadcastTx(t *testing.T) { if err != nil { t.Fatal(err) } - if res.Code != 0 { - t.Fatalf("Non-zero code: %d", res.Code) + if res.CheckTx.Code != 0 { + t.Fatalf("Non-zero check tx code: %d", res.CheckTx.Code) + } + if res.AppendTx.Code != 0 { + t.Fatalf("Non-zero append tx code: %d", res.AppendTx.Code) } } From b74a97a4f63f1627c907528ff00b7e1d6175f31f Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 30 Nov 2016 17:28:41 -0500 Subject: [PATCH 3/3] update grpc broadcast tx --- consensus/state.go | 5 ++- rpc/core/mempool.go | 16 +++++++-- rpc/grpc/types.pb.go | 30 ++++++++++------ state/execution.go | 10 +++--- test/app/counter_test.sh | 76 +++++++++++++++++++++++++++------------- test/app/test.sh | 1 + types/events.go | 10 +++--- 7 files changed, 98 insertions(+), 50 deletions(-) diff --git a/consensus/state.go b/consensus/state.go index bef286e16..2bf94b7e8 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1274,7 +1274,10 @@ func (cs *ConsensusState) finalizeCommit(height int) { // Execute and commit the block, and update the mempool. // All calls to the proxyAppConn should come here. // NOTE: the block.AppHash wont reflect these txs until the next block - stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool) + err := stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool) + if err != nil { + // TODO! + } fail.Fail() // XXX diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 0a57167ec..3ce9d32c6 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -40,7 +40,9 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { } // CONTRACT: only returns error if mempool.BroadcastTx errs (ie. problem with the app) -// or if we timeout waiting for tx to commit +// or if we timeout waiting for tx to commit. +// If CheckTx or AppendTx fail, no error will be returned, but the returned result +// will contain a non-OK TMSP code. func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { // subscribe to tx being committed in block @@ -55,6 +57,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { checkTxResCh <- res }) if err != nil { + log.Error("err", "err", err) return nil, fmt.Errorf("Error broadcasting transaction: %v", err) } checkTxRes := <-checkTxResCh @@ -69,16 +72,23 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { // Wait for the tx to be included in a block, // timeout after something reasonable. - timer := time.NewTimer(60 * 5 * time.Second) + // TODO: configureable? + timer := time.NewTimer(60 * 2 * time.Second) select { case appendTxRes := <-appendTxResCh: // The tx was included in a block. - appendTxR := appendTxRes.GetAppendTx() + appendTxR := &tmsp.ResponseAppendTx{ + Code: appendTxRes.Code, + Data: appendTxRes.Data, + Log: appendTxRes.Log, + } + log.Error("appendtx passed ", "r", appendTxR) return &ctypes.ResultBroadcastTxCommit{ CheckTx: checkTxR, AppendTx: appendTxR, }, nil case <-timer.C: + log.Error("failed to include tx") return &ctypes.ResultBroadcastTxCommit{ CheckTx: checkTxR, AppendTx: nil, diff --git a/rpc/grpc/types.pb.go b/rpc/grpc/types.pb.go index 06ff3f879..225110c23 100644 --- a/rpc/grpc/types.pb.go +++ b/rpc/grpc/types.pb.go @@ -44,6 +44,13 @@ func (m *RequestBroadcastTx) String() string { return proto.CompactTe func (*RequestBroadcastTx) ProtoMessage() {} func (*RequestBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } +func (m *RequestBroadcastTx) GetTx() []byte { + if m != nil { + return m.Tx + } + return nil +} + type ResponseBroadcastTx struct { CheckTx *types.ResponseCheckTx `protobuf:"bytes,1,opt,name=check_tx,json=checkTx" json:"check_tx,omitempty"` AppendTx *types.ResponseAppendTx `protobuf:"bytes,2,opt,name=append_tx,json=appendTx" json:"append_tx,omitempty"` @@ -79,7 +86,7 @@ var _ grpc.ClientConn // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion3 +const _ = grpc.SupportPackageIsVersion4 // Client API for BroadcastAPI service @@ -142,25 +149,26 @@ var _BroadcastAPI_serviceDesc = grpc.ServiceDesc{ }, }, Streams: []grpc.StreamDesc{}, - Metadata: fileDescriptor0, + Metadata: "types.proto", } func init() { proto.RegisterFile("types.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 223 bytes of a gzipped FileDescriptorProto + // 226 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2e, 0xa9, 0x2c, 0x48, 0x2d, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x4c, 0xce, 0x2f, 0x4a, 0x8d, 0x4f, 0x2f, 0x2a, 0x48, 0x96, 0xd2, 0x49, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x2f, 0x49, 0xcd, 0x4b, 0x49, 0x2d, 0xca, 0xcd, 0xcc, 0x2b, 0xd1, 0x2f, 0xc9, 0x2d, 0x2e, 0xd0, 0x07, 0x6b, 0xd1, 0x47, 0xd2, 0xa8, 0xa4, 0xc2, 0x25, 0x14, 0x94, 0x5a, 0x58, 0x9a, 0x5a, 0x5c, 0xe2, 0x54, 0x94, 0x9f, 0x98, 0x92, 0x9c, 0x58, 0x5c, 0x12, 0x52, 0x21, 0xc4, 0xc7, 0xc5, 0x54, 0x52, - 0x21, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x13, 0x04, 0x64, 0x29, 0xd5, 0x71, 0x09, 0x07, 0xa5, 0x16, - 0x17, 0xe4, 0xe7, 0x15, 0xa7, 0x22, 0x2b, 0x33, 0xe4, 0xe2, 0x48, 0xce, 0x48, 0x4d, 0xce, 0x8e, - 0x87, 0x2a, 0xe6, 0x36, 0x12, 0xd3, 0x83, 0x18, 0x0e, 0x53, 0xed, 0x0c, 0x92, 0x0e, 0xa9, 0x08, - 0x62, 0x4f, 0x86, 0x30, 0x84, 0x4c, 0xb8, 0x38, 0x13, 0x0b, 0x0a, 0x80, 0xce, 0x02, 0xe9, 0x61, - 0x02, 0xeb, 0x11, 0x47, 0xd3, 0xe3, 0x08, 0x96, 0x07, 0x6a, 0xe2, 0x48, 0x84, 0xb2, 0x8c, 0x62, - 0xb8, 0x78, 0xe0, 0xf6, 0x3a, 0x06, 0x78, 0x0a, 0xf9, 0x70, 0x71, 0x23, 0xbb, 0x43, 0x56, 0x0f, - 0xee, 0x7d, 0x3d, 0x4c, 0xdf, 0x48, 0xc9, 0xa1, 0x48, 0x63, 0x78, 0x23, 0x89, 0x0d, 0x1c, 0x14, - 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x68, 0x73, 0x87, 0xb0, 0x52, 0x01, 0x00, 0x00, + 0x21, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x13, 0xc4, 0x54, 0x52, 0xa1, 0x54, 0xc7, 0x25, 0x1c, 0x94, + 0x5a, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x8a, 0xac, 0xcc, 0x90, 0x8b, 0x23, 0x39, 0x23, 0x35, 0x39, + 0x3b, 0x1e, 0xaa, 0x98, 0xdb, 0x48, 0x4c, 0x0f, 0x62, 0x38, 0x4c, 0xb5, 0x33, 0x48, 0x3a, 0xa4, + 0x22, 0x88, 0x3d, 0x19, 0xc2, 0x10, 0x32, 0xe1, 0xe2, 0x4c, 0x2c, 0x28, 0x48, 0xcd, 0x4b, 0x01, + 0xe9, 0x61, 0x02, 0xeb, 0x11, 0x47, 0xd3, 0xe3, 0x08, 0x96, 0x0f, 0xa9, 0x08, 0xe2, 0x48, 0x84, + 0xb2, 0x8c, 0x62, 0xb8, 0x78, 0xe0, 0xf6, 0x3a, 0x06, 0x78, 0x0a, 0xf9, 0x70, 0x71, 0x23, 0xbb, + 0x43, 0x56, 0x0f, 0xee, 0x7d, 0x3d, 0x4c, 0xdf, 0x48, 0xc9, 0xa1, 0x48, 0x63, 0x78, 0x23, 0x89, + 0x0d, 0x1c, 0x14, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x68, 0x73, 0x87, 0xb0, 0x52, 0x01, + 0x00, 0x00, } diff --git a/state/execution.go b/state/execution.go index 07466c777..0eb4adcb9 100644 --- a/state/execution.go +++ b/state/execution.go @@ -88,11 +88,11 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo // NOTE: if we count we can access the tx from the block instead of // pulling it from the req event := types.EventDataTx{ - Tx: req.GetAppendTx().Tx, - Result: apTx.Data, - Code: apTx.Code, - Log: apTx.Log, - Error: txError, + Tx: req.GetAppendTx().Tx, + Data: apTx.Data, + Code: apTx.Code, + Log: apTx.Log, + Error: txError, } types.FireEventTx(eventCache, event) } diff --git a/test/app/counter_test.sh b/test/app/counter_test.sh index b8d670dc2..80b9b27b4 100644 --- a/test/app/counter_test.sh +++ b/test/app/counter_test.sh @@ -1,4 +1,5 @@ #! /bin/bash +set -u ##################### # counter over socket @@ -7,6 +8,19 @@ TESTNAME=$1 # Send some txs +function getCode() { + R=$1 + if [[ "$R" == "{}" ]]; then + # protobuf auto adds `omitempty` to everything so code OK and empty data/log + # will not even show when marshalled into json + # apparently we can use github.com/golang/protobuf/jsonpb to do the marshalling ... + echo 0 + else + # this wont actually work if theres an error ... + echo "$R" | jq .code + fi +} + function sendTx() { TX=$1 if [[ "$GRPC_BROADCAST_TX" == "" ]]; then @@ -15,7 +29,10 @@ function sendTx() { ERROR=`echo $RESPONSE | jq .error` ERROR=$(echo "$ERROR" | tr -d '"') # remove surrounding quotes else - RESPONSE=`go run grpc_client.go $TX` + if [ ! -f grpc_client ]; then + go build -o grpc_client grpc_client.go + fi + RESPONSE=`./grpc_client $TX` echo $RESPONSE | jq . &> /dev/null IS_JSON=$? if [[ "$IS_JSON" != "0" ]]; then @@ -23,72 +40,81 @@ function sendTx() { else ERROR="" # reset fi + APPEND_TX_RESPONSE=`echo $RESPONSE | jq .append_tx` + APPEND_TX_CODE=`getCode "$APPEND_TX_RESPONSE"` + CHECK_TX_RESPONSE=`echo $RESPONSE | jq .check_tx` + CHECK_TX_CODE=`getCode "$CHECK_TX_RESPONSE"` - if [[ "$RESPONSE" == "{}" ]]; then - # protobuf auto adds `omitempty` to everything so code OK and empty data/log - # will not even show when marshalled into json - # apparently we can use github.com/golang/protobuf/jsonpb to do the marshalling ... - CODE=0 - else - # this wont actually work if theres an error ... - CODE=`echo $RESPONSE | jq .code` - fi - #echo "-------" - #echo "TX $TX" - #echo "RESPONSE $RESPONSE" - #echo "CODE $CODE" - #echo "ERROR $ERROR" - #echo "----" + echo "-------" + echo "TX $TX" + echo "RESPONSE $RESPONSE" + echo "CHECK_TX_RESPONSE $CHECK_TX_RESPONSE" + echo "APPEND_TX_RESPONSE $APPEND_TX_RESPONSE" + echo "CHECK_TX_CODE $CHECK_TX_CODE" + echo "APPEND_TX_CODE $APPEND_TX_CODE" + echo "----" fi } +echo "... sending tx. expect no error" + # 0 should pass once and get in block, with no error TX=00 sendTx $TX -if [[ $CODE != 0 ]]; then +if [[ $APPEND_TX_CODE != 0 ]]; then echo "Got non-zero exit code for $TX. $RESPONSE" exit 1 fi -if [[ "$ERROR" != "" ]]; then + +if [[ "$GRPC_BROADCAST_TX" == "" && "$ERROR" != "" ]]; then echo "Unexpected error. Tx $TX should have been included in a block. $ERROR" exit 1 fi - +echo "... sending tx. expect error" # second time should get rejected by the mempool (return error and non-zero code) sendTx $TX -if [[ $CODE == 0 ]]; then +echo "CHECKTX CODE: $CHECK_TX_CODE" +if [[ "$CHECK_TX_CODE" == 0 ]]; then echo "Got zero exit code for $TX. Expected tx to be rejected by mempool. $RESPONSE" exit 1 fi -if [[ "$ERROR" == "" ]]; then +if [[ "$GRPC_BROADCAST_TX" == "" && "$ERROR" == "" ]]; then echo "Expected to get an error - tx $TX should have been rejected from mempool" echo "$RESPONSE" exit 1 fi +echo "... sending tx. expect no error" + # now, TX=01 should pass, with no error TX=01 sendTx $TX -if [[ $CODE != 0 ]]; then +if [[ $APPEND_TX_CODE != 0 ]]; then echo "Got non-zero exit code for $TX. $RESPONSE" exit 1 fi -if [[ "$ERROR" != "" ]]; then +if [[ "$GRPC_BROADCAST_TX" == "" && "$ERROR" != "" ]]; then echo "Unexpected error. Tx $TX should have been accepted in block. $ERROR" exit 1 fi +echo "... sending tx. expect no error, but invalid" + # now, TX=03 should get in a block (passes CheckTx, no error), but is invalid TX=03 sendTx $TX -if [[ $CODE == 0 ]]; then +if [[ "$CHECK_TX_CODE" != 0 ]]; then + echo "Got non-zero exit code for checktx on $TX. $RESPONSE" + exit 1 +fi +if [[ $APPEND_TX_CODE == 0 ]]; then echo "Got zero exit code for $TX. Should have been bad nonce. $RESPONSE" exit 1 fi -if [[ "$ERROR" != "" ]]; then +if [[ "$GRPC_BROADCAST_TX" == "" && "$ERROR" != "" ]]; then echo "Unexpected error. Tx $TX should have been included in a block. $ERROR" exit 1 fi diff --git a/test/app/test.sh b/test/app/test.sh index f73ab6960..4830c2b15 100644 --- a/test/app/test.sh +++ b/test/app/test.sh @@ -83,6 +83,7 @@ function counter_over_grpc_grpc() { echo "Starting counter and tendermint" counter --serial --tmsp grpc > /dev/null & pid_counter=$! + sleep 1 GRPC_PORT=36656 tendermint node --tmsp grpc --grpc_laddr tcp://localhost:$GRPC_PORT > tendermint.log & pid_tendermint=$! diff --git a/types/events.go b/types/events.go index c6eb7611a..8f7a5bbf0 100644 --- a/types/events.go +++ b/types/events.go @@ -73,11 +73,11 @@ type EventDataNewBlockHeader struct { // All txs fire EventDataTx type EventDataTx struct { - Tx Tx `json:"tx"` - Result []byte `json:"result"` - Log string `json:"log"` - Code tmsp.CodeType `json:"code"` - Error string `json:"error"` + Tx Tx `json:"tx"` + Data []byte `json:"data"` + Log string `json:"log"` + Code tmsp.CodeType `json:"code"` + Error string `json:"error"` // this is redundant information for now } // NOTE: This goes into the replay WAL