Browse Source

include check/append responses in broadcast_tx_commit

pull/255/head
Ethan Buchman 8 years ago
parent
commit
2ef695da97
8 changed files with 73 additions and 249 deletions
  1. +15
    -27
      rpc/core/mempool.go
  2. +9
    -2
      rpc/core/types/responses.go
  3. +2
    -2
      rpc/grpc/api.go
  4. +3
    -0
      rpc/grpc/compile.sh
  5. +27
    -197
      rpc/grpc/types.pb.go
  6. +4
    -16
      rpc/grpc/types.proto
  7. +8
    -3
      rpc/test/client_test.go
  8. +5
    -2
      rpc/test/grpc_test.go

+ 15
- 27
rpc/core/mempool.go View File

@ -39,16 +39,9 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
}, nil
}
// CONTRACT: returns error==nil iff the tx is included in a block.
//
// If CheckTx fails, return with the response from CheckTx AND an error.
// Else, block until the tx is included in a block,
// and return the result of AppendTx (with no error).
// Even if AppendTx fails, so long as the tx is included in a block this function
// will not return an error - it is the caller's responsibility to check res.Code.
// The function times out after five minutes and returns the result of CheckTx and an error.
// TODO: smarter timeout logic or someway to cancel (tx not getting committed is a sign of a larger problem!)
func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
// CONTRACT: only returns error if mempool.BroadcastTx errs (ie. problem with the app)
// or if we timeout waiting for tx to commit
func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// subscribe to tx being committed in block
appendTxResCh := make(chan types.EventDataTx, 1)
@ -66,13 +59,12 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
}
checkTxRes := <-checkTxResCh
checkTxR := checkTxRes.GetCheckTx()
if r := checkTxR; r.Code != tmsp.CodeType_OK {
if checkTxR.Code != tmsp.CodeType_OK {
// CheckTx failed!
return &ctypes.ResultBroadcastTx{
Code: r.Code,
Data: r.Data,
Log: r.Log,
}, fmt.Errorf("Check tx failed with non-zero code: %s. Data: %X; Log: %s", r.Code.String(), r.Data, r.Log)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR,
AppendTx: nil,
}, nil
}
// Wait for the tx to be included in a block,
@ -81,19 +73,15 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
select {
case appendTxRes := <-appendTxResCh:
// The tx was included in a block.
// NOTE we don't return an error regardless of the AppendTx code;
// clients must check this to see if they need to send a new tx!
return &ctypes.ResultBroadcastTx{
Code: appendTxRes.Code,
Data: appendTxRes.Result,
Log: appendTxRes.Log,
appendTxR := appendTxRes.GetAppendTx()
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR,
AppendTx: appendTxR,
}, nil
case <-timer.C:
r := checkTxR
return &ctypes.ResultBroadcastTx{
Code: r.Code,
Data: r.Data,
Log: r.Log,
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR,
AppendTx: nil,
}, fmt.Errorf("Timed out waiting for transaction to be included in a block")
}


+ 9
- 2
rpc/core/types/responses.go View File

@ -63,6 +63,11 @@ type ResultBroadcastTx struct {
Log string `json:"log"`
}
type ResultBroadcastTxCommit struct {
CheckTx *tmsp.ResponseCheckTx `json:"check_tx"`
AppendTx *tmsp.ResponseAppendTx `json:"append_tx"`
}
type ResultUnconfirmedTxs struct {
N int `json:"n_txs"`
Txs []types.Tx `json:"txs"`
@ -115,8 +120,9 @@ const (
ResultTypeDumpConsensusState = byte(0x41)
// 0x6 bytes are for txs / the application
ResultTypeBroadcastTx = byte(0x60)
ResultTypeUnconfirmedTxs = byte(0x61)
ResultTypeBroadcastTx = byte(0x60)
ResultTypeUnconfirmedTxs = byte(0x61)
ResultTypeBroadcastTxCommit = byte(0x62)
// 0x7 bytes are for querying the application
ResultTypeTMSPQuery = byte(0x70)
@ -151,6 +157,7 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{&ResultValidators{}, ResultTypeValidators},
wire.ConcreteType{&ResultDumpConsensusState{}, ResultTypeDumpConsensusState},
wire.ConcreteType{&ResultBroadcastTx{}, ResultTypeBroadcastTx},
wire.ConcreteType{&ResultBroadcastTxCommit{}, ResultTypeBroadcastTxCommit},
wire.ConcreteType{&ResultUnconfirmedTxs{}, ResultTypeUnconfirmedTxs},
wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe},
wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe},


+ 2
- 2
rpc/grpc/api.go View File

@ -11,8 +11,8 @@ type broadcastAPI struct {
func (bapi *broadcastAPI) BroadcastTx(ctx context.Context, req *RequestBroadcastTx) (*ResponseBroadcastTx, error) {
res, err := core.BroadcastTxCommit(req.Tx)
if res == nil {
if err != nil {
return nil, err
}
return &ResponseBroadcastTx{uint64(res.Code), res.Data, res.Log}, err
return &ResponseBroadcastTx{res.CheckTx, res.AppendTx}, nil
}

+ 3
- 0
rpc/grpc/compile.sh View File

@ -0,0 +1,3 @@
#! /bin/bash
protoc --go_out=plugins=grpc:. -I $GOPATH/src/ -I . types.proto

+ 27
- 197
rpc/grpc/types.pb.go View File

@ -9,9 +9,7 @@ It is generated from these files:
types.proto
It has these top-level messages:
Request
RequestBroadcastTx
Response
ResponseBroadcastTx
*/
package core_grpc
@ -19,6 +17,7 @@ package core_grpc
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import types "github.com/tendermint/tmsp/types"
import (
context "golang.org/x/net/context"
@ -36,96 +35,6 @@ var _ = math.Inf
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Request struct {
// Types that are valid to be assigned to Value:
// *Request_BroadcastTx
Value isRequest_Value `protobuf_oneof:"value"`
}
func (m *Request) Reset() { *m = Request{} }
func (m *Request) String() string { return proto.CompactTextString(m) }
func (*Request) ProtoMessage() {}
func (*Request) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
type isRequest_Value interface {
isRequest_Value()
}
type Request_BroadcastTx struct {
BroadcastTx *RequestBroadcastTx `protobuf:"bytes,1,opt,name=broadcast_tx,json=broadcastTx,oneof"`
}
func (*Request_BroadcastTx) isRequest_Value() {}
func (m *Request) GetValue() isRequest_Value {
if m != nil {
return m.Value
}
return nil
}
func (m *Request) GetBroadcastTx() *RequestBroadcastTx {
if x, ok := m.GetValue().(*Request_BroadcastTx); ok {
return x.BroadcastTx
}
return nil
}
// XXX_OneofFuncs is for the internal use of the proto package.
func (*Request) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
return _Request_OneofMarshaler, _Request_OneofUnmarshaler, _Request_OneofSizer, []interface{}{
(*Request_BroadcastTx)(nil),
}
}
func _Request_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
m := msg.(*Request)
// value
switch x := m.Value.(type) {
case *Request_BroadcastTx:
b.EncodeVarint(1<<3 | proto.WireBytes)
if err := b.EncodeMessage(x.BroadcastTx); err != nil {
return err
}
case nil:
default:
return fmt.Errorf("Request.Value has unexpected type %T", x)
}
return nil
}
func _Request_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
m := msg.(*Request)
switch tag {
case 1: // value.broadcast_tx
if wire != proto.WireBytes {
return true, proto.ErrInternalBadWireType
}
msg := new(RequestBroadcastTx)
err := b.DecodeMessage(msg)
m.Value = &Request_BroadcastTx{msg}
return true, err
default:
return false, nil
}
}
func _Request_OneofSizer(msg proto.Message) (n int) {
m := msg.(*Request)
// value
switch x := m.Value.(type) {
case *Request_BroadcastTx:
s := proto.Size(x.BroadcastTx)
n += proto.SizeVarint(1<<3 | proto.WireBytes)
n += proto.SizeVarint(uint64(s))
n += s
case nil:
default:
panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
}
return n
}
type RequestBroadcastTx struct {
Tx []byte `protobuf:"bytes,1,opt,name=tx,proto3" json:"tx,omitempty"`
}
@ -133,113 +42,34 @@ type RequestBroadcastTx struct {
func (m *RequestBroadcastTx) Reset() { *m = RequestBroadcastTx{} }
func (m *RequestBroadcastTx) String() string { return proto.CompactTextString(m) }
func (*RequestBroadcastTx) ProtoMessage() {}
func (*RequestBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (*RequestBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
type Response struct {
// Types that are valid to be assigned to Value:
// *Response_BroadcastTx
Value isResponse_Value `protobuf_oneof:"value"`
}
func (m *Response) Reset() { *m = Response{} }
func (m *Response) String() string { return proto.CompactTextString(m) }
func (*Response) ProtoMessage() {}
func (*Response) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
type isResponse_Value interface {
isResponse_Value()
}
type Response_BroadcastTx struct {
BroadcastTx *ResponseBroadcastTx `protobuf:"bytes,1,opt,name=broadcast_tx,json=broadcastTx,oneof"`
type ResponseBroadcastTx struct {
CheckTx *types.ResponseCheckTx `protobuf:"bytes,1,opt,name=check_tx,json=checkTx" json:"check_tx,omitempty"`
AppendTx *types.ResponseAppendTx `protobuf:"bytes,2,opt,name=append_tx,json=appendTx" json:"append_tx,omitempty"`
}
func (*Response_BroadcastTx) isResponse_Value() {}
func (m *ResponseBroadcastTx) Reset() { *m = ResponseBroadcastTx{} }
func (m *ResponseBroadcastTx) String() string { return proto.CompactTextString(m) }
func (*ResponseBroadcastTx) ProtoMessage() {}
func (*ResponseBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *Response) GetValue() isResponse_Value {
func (m *ResponseBroadcastTx) GetCheckTx() *types.ResponseCheckTx {
if m != nil {
return m.Value
return m.CheckTx
}
return nil
}
func (m *Response) GetBroadcastTx() *ResponseBroadcastTx {
if x, ok := m.GetValue().(*Response_BroadcastTx); ok {
return x.BroadcastTx
}
return nil
}
// XXX_OneofFuncs is for the internal use of the proto package.
func (*Response) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
return _Response_OneofMarshaler, _Response_OneofUnmarshaler, _Response_OneofSizer, []interface{}{
(*Response_BroadcastTx)(nil),
}
}
func _Response_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
m := msg.(*Response)
// value
switch x := m.Value.(type) {
case *Response_BroadcastTx:
b.EncodeVarint(1<<3 | proto.WireBytes)
if err := b.EncodeMessage(x.BroadcastTx); err != nil {
return err
}
case nil:
default:
return fmt.Errorf("Response.Value has unexpected type %T", x)
func (m *ResponseBroadcastTx) GetAppendTx() *types.ResponseAppendTx {
if m != nil {
return m.AppendTx
}
return nil
}
func _Response_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
m := msg.(*Response)
switch tag {
case 1: // value.broadcast_tx
if wire != proto.WireBytes {
return true, proto.ErrInternalBadWireType
}
msg := new(ResponseBroadcastTx)
err := b.DecodeMessage(msg)
m.Value = &Response_BroadcastTx{msg}
return true, err
default:
return false, nil
}
}
func _Response_OneofSizer(msg proto.Message) (n int) {
m := msg.(*Response)
// value
switch x := m.Value.(type) {
case *Response_BroadcastTx:
s := proto.Size(x.BroadcastTx)
n += proto.SizeVarint(1<<3 | proto.WireBytes)
n += proto.SizeVarint(uint64(s))
n += s
case nil:
default:
panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
}
return n
}
type ResponseBroadcastTx struct {
Code uint64 `protobuf:"varint,1,opt,name=code" json:"code,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
Log string `protobuf:"bytes,3,opt,name=log" json:"log,omitempty"`
}
func (m *ResponseBroadcastTx) Reset() { *m = ResponseBroadcastTx{} }
func (m *ResponseBroadcastTx) String() string { return proto.CompactTextString(m) }
func (*ResponseBroadcastTx) ProtoMessage() {}
func (*ResponseBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func init() {
proto.RegisterType((*Request)(nil), "core_grpc.Request")
proto.RegisterType((*RequestBroadcastTx)(nil), "core_grpc.RequestBroadcastTx")
proto.RegisterType((*Response)(nil), "core_grpc.Response")
proto.RegisterType((*ResponseBroadcastTx)(nil), "core_grpc.ResponseBroadcastTx")
}
@ -318,19 +148,19 @@ var _BroadcastAPI_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("types.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 222 bytes of a gzipped FileDescriptorProto
// 223 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2e, 0xa9, 0x2c, 0x48,
0x2d, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x4c, 0xce, 0x2f, 0x4a, 0x8d, 0x4f, 0x2f,
0x2a, 0x48, 0x56, 0x0a, 0xe3, 0x62, 0x0f, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x11, 0x72, 0xe2,
0xe2, 0x49, 0x2a, 0xca, 0x4f, 0x4c, 0x49, 0x4e, 0x2c, 0x2e, 0x89, 0x2f, 0xa9, 0x90, 0x60, 0x54,
0x60, 0xd4, 0xe0, 0x36, 0x92, 0xd5, 0x83, 0x2b, 0xd6, 0x83, 0xaa, 0x74, 0x82, 0xa9, 0x0a, 0xa9,
0xf0, 0x60, 0x08, 0xe2, 0x4e, 0x42, 0x70, 0x9d, 0xd8, 0xb9, 0x58, 0xcb, 0x12, 0x73, 0x4a, 0x53,
0x95, 0x54, 0xb8, 0x84, 0x30, 0x55, 0x0b, 0xf1, 0x71, 0x31, 0x41, 0x0d, 0xe6, 0x09, 0x02, 0xb2,
0x94, 0x22, 0xb8, 0x38, 0x82, 0x52, 0x8b, 0x0b, 0xf2, 0xf3, 0x8a, 0x53, 0x85, 0x9c, 0xb1, 0x5a,
0x2f, 0x87, 0x62, 0x3d, 0x44, 0x29, 0x31, 0xf6, 0xfb, 0x73, 0x09, 0x63, 0x51, 0x2e, 0x24, 0xc4,
0xc5, 0x92, 0x9c, 0x9f, 0x92, 0x0a, 0x36, 0x9c, 0x25, 0x08, 0xcc, 0x06, 0x89, 0xa5, 0x24, 0x96,
0x24, 0x4a, 0x30, 0x81, 0x9d, 0x05, 0x66, 0x0b, 0x09, 0x70, 0x31, 0xe7, 0xe4, 0xa7, 0x4b, 0x30,
0x03, 0x85, 0x38, 0x83, 0x40, 0x4c, 0xa3, 0x18, 0x2e, 0x1e, 0xb8, 0x41, 0x8e, 0x01, 0x9e, 0x42,
0x3e, 0x5c, 0xdc, 0xc8, 0x06, 0xe3, 0x0f, 0x26, 0x29, 0x02, 0xde, 0x48, 0x62, 0x03, 0x47, 0x8c,
0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0xb2, 0x77, 0xda, 0x85, 0xa7, 0x01, 0x00, 0x00,
0x2a, 0x48, 0x96, 0xd2, 0x49, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x2f,
0x49, 0xcd, 0x4b, 0x49, 0x2d, 0xca, 0xcd, 0xcc, 0x2b, 0xd1, 0x2f, 0xc9, 0x2d, 0x2e, 0xd0, 0x07,
0x6b, 0xd1, 0x47, 0xd2, 0xa8, 0xa4, 0xc2, 0x25, 0x14, 0x94, 0x5a, 0x58, 0x9a, 0x5a, 0x5c, 0xe2,
0x54, 0x94, 0x9f, 0x98, 0x92, 0x9c, 0x58, 0x5c, 0x12, 0x52, 0x21, 0xc4, 0xc7, 0xc5, 0x54, 0x52,
0x21, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x13, 0x04, 0x64, 0x29, 0xd5, 0x71, 0x09, 0x07, 0xa5, 0x16,
0x17, 0xe4, 0xe7, 0x15, 0xa7, 0x22, 0x2b, 0x33, 0xe4, 0xe2, 0x48, 0xce, 0x48, 0x4d, 0xce, 0x8e,
0x87, 0x2a, 0xe6, 0x36, 0x12, 0xd3, 0x83, 0x18, 0x0e, 0x53, 0xed, 0x0c, 0x92, 0x0e, 0xa9, 0x08,
0x62, 0x4f, 0x86, 0x30, 0x84, 0x4c, 0xb8, 0x38, 0x13, 0x0b, 0x0a, 0x80, 0xce, 0x02, 0xe9, 0x61,
0x02, 0xeb, 0x11, 0x47, 0xd3, 0xe3, 0x08, 0x96, 0x07, 0x6a, 0xe2, 0x48, 0x84, 0xb2, 0x8c, 0x62,
0xb8, 0x78, 0xe0, 0xf6, 0x3a, 0x06, 0x78, 0x0a, 0xf9, 0x70, 0x71, 0x23, 0xbb, 0x43, 0x56, 0x0f,
0xee, 0x7d, 0x3d, 0x4c, 0xdf, 0x48, 0xc9, 0xa1, 0x48, 0x63, 0x78, 0x23, 0x89, 0x0d, 0x1c, 0x14,
0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x68, 0x73, 0x87, 0xb0, 0x52, 0x01, 0x00, 0x00,
}

+ 4
- 16
rpc/grpc/types.proto View File

@ -1,18 +1,14 @@
syntax = "proto3";
package core_grpc;
import "github.com/tendermint/tmsp/types/types.proto";
//----------------------------------------
// Message types
//----------------------------------------
// Request types
message Request {
oneof value{
RequestBroadcastTx broadcast_tx = 1;
}
}
message RequestBroadcastTx {
bytes tx = 1;
}
@ -20,17 +16,9 @@ message RequestBroadcastTx {
//----------------------------------------
// Response types
message Response {
oneof value{
ResponseBroadcastTx broadcast_tx = 1;
}
}
message ResponseBroadcastTx{
uint64 code = 1; // TODO: import tmsp ...
bytes data = 2;
string log = 3;
types.ResponseCheckTx check_tx = 1;
types.ResponseAppendTx append_tx = 2;
}
//----------------------------------------


+ 8
- 3
rpc/test/client_test.go View File

@ -193,9 +193,14 @@ func TestJSONBroadcastTxCommit(t *testing.T) {
func testBroadcastTxCommit(t *testing.T, resI interface{}, tx []byte) {
tmRes := resI.(*ctypes.TMResult)
res := (*tmRes).(*ctypes.ResultBroadcastTx)
if res.Code != tmsp.CodeType_OK {
panic(Fmt("BroadcastTxCommit got non-zero exit code: %v. %X; %s", res.Code, res.Data, res.Log))
res := (*tmRes).(*ctypes.ResultBroadcastTxCommit)
checkTx := res.CheckTx
if checkTx.Code != tmsp.CodeType_OK {
panic(Fmt("BroadcastTxCommit got non-zero exit code from CheckTx: %v. %X; %s", checkTx.Code, checkTx.Data, checkTx.Log))
}
appendTx := res.AppendTx
if appendTx.Code != tmsp.CodeType_OK {
panic(Fmt("BroadcastTxCommit got non-zero exit code from CheckTx: %v. %X; %s", appendTx.Code, appendTx.Data, appendTx.Log))
}
mem := node.MempoolReactor().Mempool
if mem.Size() != 0 {


+ 5
- 2
rpc/test/grpc_test.go View File

@ -15,7 +15,10 @@ func TestBroadcastTx(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if res.Code != 0 {
t.Fatalf("Non-zero code: %d", res.Code)
if res.CheckTx.Code != 0 {
t.Fatalf("Non-zero check tx code: %d", res.CheckTx.Code)
}
if res.AppendTx.Code != 0 {
t.Fatalf("Non-zero append tx code: %d", res.AppendTx.Code)
}
}

Loading…
Cancel
Save