From 2ef695da9794af914a9ae7906021d0eaff332803 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 27 Aug 2016 16:37:52 -0400 Subject: [PATCH] include check/append responses in broadcast_tx_commit --- rpc/core/mempool.go | 42 +++---- rpc/core/types/responses.go | 11 +- rpc/grpc/api.go | 4 +- rpc/grpc/compile.sh | 3 + rpc/grpc/types.pb.go | 224 +++++------------------------------- rpc/grpc/types.proto | 20 +--- rpc/test/client_test.go | 11 +- rpc/test/grpc_test.go | 7 +- 8 files changed, 73 insertions(+), 249 deletions(-) create mode 100644 rpc/grpc/compile.sh diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index ad599d228..0a57167ec 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -39,16 +39,9 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { }, nil } -// CONTRACT: returns error==nil iff the tx is included in a block. -// -// If CheckTx fails, return with the response from CheckTx AND an error. -// Else, block until the tx is included in a block, -// and return the result of AppendTx (with no error). -// Even if AppendTx fails, so long as the tx is included in a block this function -// will not return an error - it is the caller's responsibility to check res.Code. -// The function times out after five minutes and returns the result of CheckTx and an error. -// TODO: smarter timeout logic or someway to cancel (tx not getting committed is a sign of a larger problem!) -func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { +// CONTRACT: only returns error if mempool.BroadcastTx errs (ie. problem with the app) +// or if we timeout waiting for tx to commit +func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { // subscribe to tx being committed in block appendTxResCh := make(chan types.EventDataTx, 1) @@ -66,13 +59,12 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { } checkTxRes := <-checkTxResCh checkTxR := checkTxRes.GetCheckTx() - if r := checkTxR; r.Code != tmsp.CodeType_OK { + if checkTxR.Code != tmsp.CodeType_OK { // CheckTx failed! - return &ctypes.ResultBroadcastTx{ - Code: r.Code, - Data: r.Data, - Log: r.Log, - }, fmt.Errorf("Check tx failed with non-zero code: %s. Data: %X; Log: %s", r.Code.String(), r.Data, r.Log) + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: checkTxR, + AppendTx: nil, + }, nil } // Wait for the tx to be included in a block, @@ -81,19 +73,15 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { select { case appendTxRes := <-appendTxResCh: // The tx was included in a block. - // NOTE we don't return an error regardless of the AppendTx code; - // clients must check this to see if they need to send a new tx! - return &ctypes.ResultBroadcastTx{ - Code: appendTxRes.Code, - Data: appendTxRes.Result, - Log: appendTxRes.Log, + appendTxR := appendTxRes.GetAppendTx() + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: checkTxR, + AppendTx: appendTxR, }, nil case <-timer.C: - r := checkTxR - return &ctypes.ResultBroadcastTx{ - Code: r.Code, - Data: r.Data, - Log: r.Log, + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: checkTxR, + AppendTx: nil, }, fmt.Errorf("Timed out waiting for transaction to be included in a block") } diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index f5f6bae02..0befac673 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -63,6 +63,11 @@ type ResultBroadcastTx struct { Log string `json:"log"` } +type ResultBroadcastTxCommit struct { + CheckTx *tmsp.ResponseCheckTx `json:"check_tx"` + AppendTx *tmsp.ResponseAppendTx `json:"append_tx"` +} + type ResultUnconfirmedTxs struct { N int `json:"n_txs"` Txs []types.Tx `json:"txs"` @@ -115,8 +120,9 @@ const ( ResultTypeDumpConsensusState = byte(0x41) // 0x6 bytes are for txs / the application - ResultTypeBroadcastTx = byte(0x60) - ResultTypeUnconfirmedTxs = byte(0x61) + ResultTypeBroadcastTx = byte(0x60) + ResultTypeUnconfirmedTxs = byte(0x61) + ResultTypeBroadcastTxCommit = byte(0x62) // 0x7 bytes are for querying the application ResultTypeTMSPQuery = byte(0x70) @@ -151,6 +157,7 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&ResultValidators{}, ResultTypeValidators}, wire.ConcreteType{&ResultDumpConsensusState{}, ResultTypeDumpConsensusState}, wire.ConcreteType{&ResultBroadcastTx{}, ResultTypeBroadcastTx}, + wire.ConcreteType{&ResultBroadcastTxCommit{}, ResultTypeBroadcastTxCommit}, wire.ConcreteType{&ResultUnconfirmedTxs{}, ResultTypeUnconfirmedTxs}, wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe}, wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe}, diff --git a/rpc/grpc/api.go b/rpc/grpc/api.go index 190500547..c8b8dce75 100644 --- a/rpc/grpc/api.go +++ b/rpc/grpc/api.go @@ -11,8 +11,8 @@ type broadcastAPI struct { func (bapi *broadcastAPI) BroadcastTx(ctx context.Context, req *RequestBroadcastTx) (*ResponseBroadcastTx, error) { res, err := core.BroadcastTxCommit(req.Tx) - if res == nil { + if err != nil { return nil, err } - return &ResponseBroadcastTx{uint64(res.Code), res.Data, res.Log}, err + return &ResponseBroadcastTx{res.CheckTx, res.AppendTx}, nil } diff --git a/rpc/grpc/compile.sh b/rpc/grpc/compile.sh new file mode 100644 index 000000000..2c4629c8e --- /dev/null +++ b/rpc/grpc/compile.sh @@ -0,0 +1,3 @@ +#! /bin/bash + +protoc --go_out=plugins=grpc:. -I $GOPATH/src/ -I . types.proto diff --git a/rpc/grpc/types.pb.go b/rpc/grpc/types.pb.go index 51dfb6974..06ff3f879 100644 --- a/rpc/grpc/types.pb.go +++ b/rpc/grpc/types.pb.go @@ -9,9 +9,7 @@ It is generated from these files: types.proto It has these top-level messages: - Request RequestBroadcastTx - Response ResponseBroadcastTx */ package core_grpc @@ -19,6 +17,7 @@ package core_grpc import proto "github.com/golang/protobuf/proto" import fmt "fmt" import math "math" +import types "github.com/tendermint/tmsp/types" import ( context "golang.org/x/net/context" @@ -36,96 +35,6 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package -type Request struct { - // Types that are valid to be assigned to Value: - // *Request_BroadcastTx - Value isRequest_Value `protobuf_oneof:"value"` -} - -func (m *Request) Reset() { *m = Request{} } -func (m *Request) String() string { return proto.CompactTextString(m) } -func (*Request) ProtoMessage() {} -func (*Request) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } - -type isRequest_Value interface { - isRequest_Value() -} - -type Request_BroadcastTx struct { - BroadcastTx *RequestBroadcastTx `protobuf:"bytes,1,opt,name=broadcast_tx,json=broadcastTx,oneof"` -} - -func (*Request_BroadcastTx) isRequest_Value() {} - -func (m *Request) GetValue() isRequest_Value { - if m != nil { - return m.Value - } - return nil -} - -func (m *Request) GetBroadcastTx() *RequestBroadcastTx { - if x, ok := m.GetValue().(*Request_BroadcastTx); ok { - return x.BroadcastTx - } - return nil -} - -// XXX_OneofFuncs is for the internal use of the proto package. -func (*Request) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { - return _Request_OneofMarshaler, _Request_OneofUnmarshaler, _Request_OneofSizer, []interface{}{ - (*Request_BroadcastTx)(nil), - } -} - -func _Request_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { - m := msg.(*Request) - // value - switch x := m.Value.(type) { - case *Request_BroadcastTx: - b.EncodeVarint(1<<3 | proto.WireBytes) - if err := b.EncodeMessage(x.BroadcastTx); err != nil { - return err - } - case nil: - default: - return fmt.Errorf("Request.Value has unexpected type %T", x) - } - return nil -} - -func _Request_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { - m := msg.(*Request) - switch tag { - case 1: // value.broadcast_tx - if wire != proto.WireBytes { - return true, proto.ErrInternalBadWireType - } - msg := new(RequestBroadcastTx) - err := b.DecodeMessage(msg) - m.Value = &Request_BroadcastTx{msg} - return true, err - default: - return false, nil - } -} - -func _Request_OneofSizer(msg proto.Message) (n int) { - m := msg.(*Request) - // value - switch x := m.Value.(type) { - case *Request_BroadcastTx: - s := proto.Size(x.BroadcastTx) - n += proto.SizeVarint(1<<3 | proto.WireBytes) - n += proto.SizeVarint(uint64(s)) - n += s - case nil: - default: - panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) - } - return n -} - type RequestBroadcastTx struct { Tx []byte `protobuf:"bytes,1,opt,name=tx,proto3" json:"tx,omitempty"` } @@ -133,113 +42,34 @@ type RequestBroadcastTx struct { func (m *RequestBroadcastTx) Reset() { *m = RequestBroadcastTx{} } func (m *RequestBroadcastTx) String() string { return proto.CompactTextString(m) } func (*RequestBroadcastTx) ProtoMessage() {} -func (*RequestBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } +func (*RequestBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } -type Response struct { - // Types that are valid to be assigned to Value: - // *Response_BroadcastTx - Value isResponse_Value `protobuf_oneof:"value"` -} - -func (m *Response) Reset() { *m = Response{} } -func (m *Response) String() string { return proto.CompactTextString(m) } -func (*Response) ProtoMessage() {} -func (*Response) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } - -type isResponse_Value interface { - isResponse_Value() -} - -type Response_BroadcastTx struct { - BroadcastTx *ResponseBroadcastTx `protobuf:"bytes,1,opt,name=broadcast_tx,json=broadcastTx,oneof"` +type ResponseBroadcastTx struct { + CheckTx *types.ResponseCheckTx `protobuf:"bytes,1,opt,name=check_tx,json=checkTx" json:"check_tx,omitempty"` + AppendTx *types.ResponseAppendTx `protobuf:"bytes,2,opt,name=append_tx,json=appendTx" json:"append_tx,omitempty"` } -func (*Response_BroadcastTx) isResponse_Value() {} +func (m *ResponseBroadcastTx) Reset() { *m = ResponseBroadcastTx{} } +func (m *ResponseBroadcastTx) String() string { return proto.CompactTextString(m) } +func (*ResponseBroadcastTx) ProtoMessage() {} +func (*ResponseBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } -func (m *Response) GetValue() isResponse_Value { +func (m *ResponseBroadcastTx) GetCheckTx() *types.ResponseCheckTx { if m != nil { - return m.Value + return m.CheckTx } return nil } -func (m *Response) GetBroadcastTx() *ResponseBroadcastTx { - if x, ok := m.GetValue().(*Response_BroadcastTx); ok { - return x.BroadcastTx - } - return nil -} - -// XXX_OneofFuncs is for the internal use of the proto package. -func (*Response) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { - return _Response_OneofMarshaler, _Response_OneofUnmarshaler, _Response_OneofSizer, []interface{}{ - (*Response_BroadcastTx)(nil), - } -} - -func _Response_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { - m := msg.(*Response) - // value - switch x := m.Value.(type) { - case *Response_BroadcastTx: - b.EncodeVarint(1<<3 | proto.WireBytes) - if err := b.EncodeMessage(x.BroadcastTx); err != nil { - return err - } - case nil: - default: - return fmt.Errorf("Response.Value has unexpected type %T", x) +func (m *ResponseBroadcastTx) GetAppendTx() *types.ResponseAppendTx { + if m != nil { + return m.AppendTx } return nil } -func _Response_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { - m := msg.(*Response) - switch tag { - case 1: // value.broadcast_tx - if wire != proto.WireBytes { - return true, proto.ErrInternalBadWireType - } - msg := new(ResponseBroadcastTx) - err := b.DecodeMessage(msg) - m.Value = &Response_BroadcastTx{msg} - return true, err - default: - return false, nil - } -} - -func _Response_OneofSizer(msg proto.Message) (n int) { - m := msg.(*Response) - // value - switch x := m.Value.(type) { - case *Response_BroadcastTx: - s := proto.Size(x.BroadcastTx) - n += proto.SizeVarint(1<<3 | proto.WireBytes) - n += proto.SizeVarint(uint64(s)) - n += s - case nil: - default: - panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) - } - return n -} - -type ResponseBroadcastTx struct { - Code uint64 `protobuf:"varint,1,opt,name=code" json:"code,omitempty"` - Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` - Log string `protobuf:"bytes,3,opt,name=log" json:"log,omitempty"` -} - -func (m *ResponseBroadcastTx) Reset() { *m = ResponseBroadcastTx{} } -func (m *ResponseBroadcastTx) String() string { return proto.CompactTextString(m) } -func (*ResponseBroadcastTx) ProtoMessage() {} -func (*ResponseBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } - func init() { - proto.RegisterType((*Request)(nil), "core_grpc.Request") proto.RegisterType((*RequestBroadcastTx)(nil), "core_grpc.RequestBroadcastTx") - proto.RegisterType((*Response)(nil), "core_grpc.Response") proto.RegisterType((*ResponseBroadcastTx)(nil), "core_grpc.ResponseBroadcastTx") } @@ -318,19 +148,19 @@ var _BroadcastAPI_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("types.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 222 bytes of a gzipped FileDescriptorProto + // 223 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2e, 0xa9, 0x2c, 0x48, 0x2d, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x4c, 0xce, 0x2f, 0x4a, 0x8d, 0x4f, 0x2f, - 0x2a, 0x48, 0x56, 0x0a, 0xe3, 0x62, 0x0f, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x11, 0x72, 0xe2, - 0xe2, 0x49, 0x2a, 0xca, 0x4f, 0x4c, 0x49, 0x4e, 0x2c, 0x2e, 0x89, 0x2f, 0xa9, 0x90, 0x60, 0x54, - 0x60, 0xd4, 0xe0, 0x36, 0x92, 0xd5, 0x83, 0x2b, 0xd6, 0x83, 0xaa, 0x74, 0x82, 0xa9, 0x0a, 0xa9, - 0xf0, 0x60, 0x08, 0xe2, 0x4e, 0x42, 0x70, 0x9d, 0xd8, 0xb9, 0x58, 0xcb, 0x12, 0x73, 0x4a, 0x53, - 0x95, 0x54, 0xb8, 0x84, 0x30, 0x55, 0x0b, 0xf1, 0x71, 0x31, 0x41, 0x0d, 0xe6, 0x09, 0x02, 0xb2, - 0x94, 0x22, 0xb8, 0x38, 0x82, 0x52, 0x8b, 0x0b, 0xf2, 0xf3, 0x8a, 0x53, 0x85, 0x9c, 0xb1, 0x5a, - 0x2f, 0x87, 0x62, 0x3d, 0x44, 0x29, 0x31, 0xf6, 0xfb, 0x73, 0x09, 0x63, 0x51, 0x2e, 0x24, 0xc4, - 0xc5, 0x92, 0x9c, 0x9f, 0x92, 0x0a, 0x36, 0x9c, 0x25, 0x08, 0xcc, 0x06, 0x89, 0xa5, 0x24, 0x96, - 0x24, 0x4a, 0x30, 0x81, 0x9d, 0x05, 0x66, 0x0b, 0x09, 0x70, 0x31, 0xe7, 0xe4, 0xa7, 0x4b, 0x30, - 0x03, 0x85, 0x38, 0x83, 0x40, 0x4c, 0xa3, 0x18, 0x2e, 0x1e, 0xb8, 0x41, 0x8e, 0x01, 0x9e, 0x42, - 0x3e, 0x5c, 0xdc, 0xc8, 0x06, 0xe3, 0x0f, 0x26, 0x29, 0x02, 0xde, 0x48, 0x62, 0x03, 0x47, 0x8c, - 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0xb2, 0x77, 0xda, 0x85, 0xa7, 0x01, 0x00, 0x00, + 0x2a, 0x48, 0x96, 0xd2, 0x49, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x2f, + 0x49, 0xcd, 0x4b, 0x49, 0x2d, 0xca, 0xcd, 0xcc, 0x2b, 0xd1, 0x2f, 0xc9, 0x2d, 0x2e, 0xd0, 0x07, + 0x6b, 0xd1, 0x47, 0xd2, 0xa8, 0xa4, 0xc2, 0x25, 0x14, 0x94, 0x5a, 0x58, 0x9a, 0x5a, 0x5c, 0xe2, + 0x54, 0x94, 0x9f, 0x98, 0x92, 0x9c, 0x58, 0x5c, 0x12, 0x52, 0x21, 0xc4, 0xc7, 0xc5, 0x54, 0x52, + 0x21, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x13, 0x04, 0x64, 0x29, 0xd5, 0x71, 0x09, 0x07, 0xa5, 0x16, + 0x17, 0xe4, 0xe7, 0x15, 0xa7, 0x22, 0x2b, 0x33, 0xe4, 0xe2, 0x48, 0xce, 0x48, 0x4d, 0xce, 0x8e, + 0x87, 0x2a, 0xe6, 0x36, 0x12, 0xd3, 0x83, 0x18, 0x0e, 0x53, 0xed, 0x0c, 0x92, 0x0e, 0xa9, 0x08, + 0x62, 0x4f, 0x86, 0x30, 0x84, 0x4c, 0xb8, 0x38, 0x13, 0x0b, 0x0a, 0x80, 0xce, 0x02, 0xe9, 0x61, + 0x02, 0xeb, 0x11, 0x47, 0xd3, 0xe3, 0x08, 0x96, 0x07, 0x6a, 0xe2, 0x48, 0x84, 0xb2, 0x8c, 0x62, + 0xb8, 0x78, 0xe0, 0xf6, 0x3a, 0x06, 0x78, 0x0a, 0xf9, 0x70, 0x71, 0x23, 0xbb, 0x43, 0x56, 0x0f, + 0xee, 0x7d, 0x3d, 0x4c, 0xdf, 0x48, 0xc9, 0xa1, 0x48, 0x63, 0x78, 0x23, 0x89, 0x0d, 0x1c, 0x14, + 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x68, 0x73, 0x87, 0xb0, 0x52, 0x01, 0x00, 0x00, } diff --git a/rpc/grpc/types.proto b/rpc/grpc/types.proto index 2f2b96de3..ec7f0d1e6 100644 --- a/rpc/grpc/types.proto +++ b/rpc/grpc/types.proto @@ -1,18 +1,14 @@ syntax = "proto3"; package core_grpc; +import "github.com/tendermint/tmsp/types/types.proto"; + //---------------------------------------- // Message types //---------------------------------------- // Request types -message Request { - oneof value{ - RequestBroadcastTx broadcast_tx = 1; - } -} - message RequestBroadcastTx { bytes tx = 1; } @@ -20,17 +16,9 @@ message RequestBroadcastTx { //---------------------------------------- // Response types - -message Response { - oneof value{ - ResponseBroadcastTx broadcast_tx = 1; - } -} - message ResponseBroadcastTx{ - uint64 code = 1; // TODO: import tmsp ... - bytes data = 2; - string log = 3; + types.ResponseCheckTx check_tx = 1; + types.ResponseAppendTx append_tx = 2; } //---------------------------------------- diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go index 728e87bd3..73729b4f6 100644 --- a/rpc/test/client_test.go +++ b/rpc/test/client_test.go @@ -193,9 +193,14 @@ func TestJSONBroadcastTxCommit(t *testing.T) { func testBroadcastTxCommit(t *testing.T, resI interface{}, tx []byte) { tmRes := resI.(*ctypes.TMResult) - res := (*tmRes).(*ctypes.ResultBroadcastTx) - if res.Code != tmsp.CodeType_OK { - panic(Fmt("BroadcastTxCommit got non-zero exit code: %v. %X; %s", res.Code, res.Data, res.Log)) + res := (*tmRes).(*ctypes.ResultBroadcastTxCommit) + checkTx := res.CheckTx + if checkTx.Code != tmsp.CodeType_OK { + panic(Fmt("BroadcastTxCommit got non-zero exit code from CheckTx: %v. %X; %s", checkTx.Code, checkTx.Data, checkTx.Log)) + } + appendTx := res.AppendTx + if appendTx.Code != tmsp.CodeType_OK { + panic(Fmt("BroadcastTxCommit got non-zero exit code from CheckTx: %v. %X; %s", appendTx.Code, appendTx.Data, appendTx.Log)) } mem := node.MempoolReactor().Mempool if mem.Size() != 0 { diff --git a/rpc/test/grpc_test.go b/rpc/test/grpc_test.go index 8fad465be..13672773c 100644 --- a/rpc/test/grpc_test.go +++ b/rpc/test/grpc_test.go @@ -15,7 +15,10 @@ func TestBroadcastTx(t *testing.T) { if err != nil { t.Fatal(err) } - if res.Code != 0 { - t.Fatalf("Non-zero code: %d", res.Code) + if res.CheckTx.Code != 0 { + t.Fatalf("Non-zero check tx code: %d", res.CheckTx.Code) + } + if res.AppendTx.Code != 0 { + t.Fatalf("Non-zero append tx code: %d", res.AppendTx.Code) } }