From b74a97a4f63f1627c907528ff00b7e1d6175f31f Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 30 Nov 2016 17:28:41 -0500 Subject: [PATCH] update grpc broadcast tx --- consensus/state.go | 5 ++- rpc/core/mempool.go | 16 +++++++-- rpc/grpc/types.pb.go | 30 ++++++++++------ state/execution.go | 10 +++--- test/app/counter_test.sh | 76 +++++++++++++++++++++++++++------------- test/app/test.sh | 1 + types/events.go | 10 +++--- 7 files changed, 98 insertions(+), 50 deletions(-) diff --git a/consensus/state.go b/consensus/state.go index bef286e16..2bf94b7e8 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1274,7 +1274,10 @@ func (cs *ConsensusState) finalizeCommit(height int) { // Execute and commit the block, and update the mempool. // All calls to the proxyAppConn should come here. // NOTE: the block.AppHash wont reflect these txs until the next block - stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool) + err := stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool) + if err != nil { + // TODO! + } fail.Fail() // XXX diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 0a57167ec..3ce9d32c6 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -40,7 +40,9 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { } // CONTRACT: only returns error if mempool.BroadcastTx errs (ie. problem with the app) -// or if we timeout waiting for tx to commit +// or if we timeout waiting for tx to commit. +// If CheckTx or AppendTx fail, no error will be returned, but the returned result +// will contain a non-OK TMSP code. func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { // subscribe to tx being committed in block @@ -55,6 +57,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { checkTxResCh <- res }) if err != nil { + log.Error("err", "err", err) return nil, fmt.Errorf("Error broadcasting transaction: %v", err) } checkTxRes := <-checkTxResCh @@ -69,16 +72,23 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { // Wait for the tx to be included in a block, // timeout after something reasonable. - timer := time.NewTimer(60 * 5 * time.Second) + // TODO: configureable? + timer := time.NewTimer(60 * 2 * time.Second) select { case appendTxRes := <-appendTxResCh: // The tx was included in a block. - appendTxR := appendTxRes.GetAppendTx() + appendTxR := &tmsp.ResponseAppendTx{ + Code: appendTxRes.Code, + Data: appendTxRes.Data, + Log: appendTxRes.Log, + } + log.Error("appendtx passed ", "r", appendTxR) return &ctypes.ResultBroadcastTxCommit{ CheckTx: checkTxR, AppendTx: appendTxR, }, nil case <-timer.C: + log.Error("failed to include tx") return &ctypes.ResultBroadcastTxCommit{ CheckTx: checkTxR, AppendTx: nil, diff --git a/rpc/grpc/types.pb.go b/rpc/grpc/types.pb.go index 06ff3f879..225110c23 100644 --- a/rpc/grpc/types.pb.go +++ b/rpc/grpc/types.pb.go @@ -44,6 +44,13 @@ func (m *RequestBroadcastTx) String() string { return proto.CompactTe func (*RequestBroadcastTx) ProtoMessage() {} func (*RequestBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } +func (m *RequestBroadcastTx) GetTx() []byte { + if m != nil { + return m.Tx + } + return nil +} + type ResponseBroadcastTx struct { CheckTx *types.ResponseCheckTx `protobuf:"bytes,1,opt,name=check_tx,json=checkTx" json:"check_tx,omitempty"` AppendTx *types.ResponseAppendTx `protobuf:"bytes,2,opt,name=append_tx,json=appendTx" json:"append_tx,omitempty"` @@ -79,7 +86,7 @@ var _ grpc.ClientConn // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion3 +const _ = grpc.SupportPackageIsVersion4 // Client API for BroadcastAPI service @@ -142,25 +149,26 @@ var _BroadcastAPI_serviceDesc = grpc.ServiceDesc{ }, }, Streams: []grpc.StreamDesc{}, - Metadata: fileDescriptor0, + Metadata: "types.proto", } func init() { proto.RegisterFile("types.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 223 bytes of a gzipped FileDescriptorProto + // 226 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2e, 0xa9, 0x2c, 0x48, 0x2d, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x4c, 0xce, 0x2f, 0x4a, 0x8d, 0x4f, 0x2f, 0x2a, 0x48, 0x96, 0xd2, 0x49, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x2f, 0x49, 0xcd, 0x4b, 0x49, 0x2d, 0xca, 0xcd, 0xcc, 0x2b, 0xd1, 0x2f, 0xc9, 0x2d, 0x2e, 0xd0, 0x07, 0x6b, 0xd1, 0x47, 0xd2, 0xa8, 0xa4, 0xc2, 0x25, 0x14, 0x94, 0x5a, 0x58, 0x9a, 0x5a, 0x5c, 0xe2, 0x54, 0x94, 0x9f, 0x98, 0x92, 0x9c, 0x58, 0x5c, 0x12, 0x52, 0x21, 0xc4, 0xc7, 0xc5, 0x54, 0x52, - 0x21, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x13, 0x04, 0x64, 0x29, 0xd5, 0x71, 0x09, 0x07, 0xa5, 0x16, - 0x17, 0xe4, 0xe7, 0x15, 0xa7, 0x22, 0x2b, 0x33, 0xe4, 0xe2, 0x48, 0xce, 0x48, 0x4d, 0xce, 0x8e, - 0x87, 0x2a, 0xe6, 0x36, 0x12, 0xd3, 0x83, 0x18, 0x0e, 0x53, 0xed, 0x0c, 0x92, 0x0e, 0xa9, 0x08, - 0x62, 0x4f, 0x86, 0x30, 0x84, 0x4c, 0xb8, 0x38, 0x13, 0x0b, 0x0a, 0x80, 0xce, 0x02, 0xe9, 0x61, - 0x02, 0xeb, 0x11, 0x47, 0xd3, 0xe3, 0x08, 0x96, 0x07, 0x6a, 0xe2, 0x48, 0x84, 0xb2, 0x8c, 0x62, - 0xb8, 0x78, 0xe0, 0xf6, 0x3a, 0x06, 0x78, 0x0a, 0xf9, 0x70, 0x71, 0x23, 0xbb, 0x43, 0x56, 0x0f, - 0xee, 0x7d, 0x3d, 0x4c, 0xdf, 0x48, 0xc9, 0xa1, 0x48, 0x63, 0x78, 0x23, 0x89, 0x0d, 0x1c, 0x14, - 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x68, 0x73, 0x87, 0xb0, 0x52, 0x01, 0x00, 0x00, + 0x21, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x13, 0xc4, 0x54, 0x52, 0xa1, 0x54, 0xc7, 0x25, 0x1c, 0x94, + 0x5a, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x8a, 0xac, 0xcc, 0x90, 0x8b, 0x23, 0x39, 0x23, 0x35, 0x39, + 0x3b, 0x1e, 0xaa, 0x98, 0xdb, 0x48, 0x4c, 0x0f, 0x62, 0x38, 0x4c, 0xb5, 0x33, 0x48, 0x3a, 0xa4, + 0x22, 0x88, 0x3d, 0x19, 0xc2, 0x10, 0x32, 0xe1, 0xe2, 0x4c, 0x2c, 0x28, 0x48, 0xcd, 0x4b, 0x01, + 0xe9, 0x61, 0x02, 0xeb, 0x11, 0x47, 0xd3, 0xe3, 0x08, 0x96, 0x0f, 0xa9, 0x08, 0xe2, 0x48, 0x84, + 0xb2, 0x8c, 0x62, 0xb8, 0x78, 0xe0, 0xf6, 0x3a, 0x06, 0x78, 0x0a, 0xf9, 0x70, 0x71, 0x23, 0xbb, + 0x43, 0x56, 0x0f, 0xee, 0x7d, 0x3d, 0x4c, 0xdf, 0x48, 0xc9, 0xa1, 0x48, 0x63, 0x78, 0x23, 0x89, + 0x0d, 0x1c, 0x14, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x68, 0x73, 0x87, 0xb0, 0x52, 0x01, + 0x00, 0x00, } diff --git a/state/execution.go b/state/execution.go index 07466c777..0eb4adcb9 100644 --- a/state/execution.go +++ b/state/execution.go @@ -88,11 +88,11 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo // NOTE: if we count we can access the tx from the block instead of // pulling it from the req event := types.EventDataTx{ - Tx: req.GetAppendTx().Tx, - Result: apTx.Data, - Code: apTx.Code, - Log: apTx.Log, - Error: txError, + Tx: req.GetAppendTx().Tx, + Data: apTx.Data, + Code: apTx.Code, + Log: apTx.Log, + Error: txError, } types.FireEventTx(eventCache, event) } diff --git a/test/app/counter_test.sh b/test/app/counter_test.sh index b8d670dc2..80b9b27b4 100644 --- a/test/app/counter_test.sh +++ b/test/app/counter_test.sh @@ -1,4 +1,5 @@ #! /bin/bash +set -u ##################### # counter over socket @@ -7,6 +8,19 @@ TESTNAME=$1 # Send some txs +function getCode() { + R=$1 + if [[ "$R" == "{}" ]]; then + # protobuf auto adds `omitempty` to everything so code OK and empty data/log + # will not even show when marshalled into json + # apparently we can use github.com/golang/protobuf/jsonpb to do the marshalling ... + echo 0 + else + # this wont actually work if theres an error ... + echo "$R" | jq .code + fi +} + function sendTx() { TX=$1 if [[ "$GRPC_BROADCAST_TX" == "" ]]; then @@ -15,7 +29,10 @@ function sendTx() { ERROR=`echo $RESPONSE | jq .error` ERROR=$(echo "$ERROR" | tr -d '"') # remove surrounding quotes else - RESPONSE=`go run grpc_client.go $TX` + if [ ! -f grpc_client ]; then + go build -o grpc_client grpc_client.go + fi + RESPONSE=`./grpc_client $TX` echo $RESPONSE | jq . &> /dev/null IS_JSON=$? if [[ "$IS_JSON" != "0" ]]; then @@ -23,72 +40,81 @@ function sendTx() { else ERROR="" # reset fi + APPEND_TX_RESPONSE=`echo $RESPONSE | jq .append_tx` + APPEND_TX_CODE=`getCode "$APPEND_TX_RESPONSE"` + CHECK_TX_RESPONSE=`echo $RESPONSE | jq .check_tx` + CHECK_TX_CODE=`getCode "$CHECK_TX_RESPONSE"` - if [[ "$RESPONSE" == "{}" ]]; then - # protobuf auto adds `omitempty` to everything so code OK and empty data/log - # will not even show when marshalled into json - # apparently we can use github.com/golang/protobuf/jsonpb to do the marshalling ... - CODE=0 - else - # this wont actually work if theres an error ... - CODE=`echo $RESPONSE | jq .code` - fi - #echo "-------" - #echo "TX $TX" - #echo "RESPONSE $RESPONSE" - #echo "CODE $CODE" - #echo "ERROR $ERROR" - #echo "----" + echo "-------" + echo "TX $TX" + echo "RESPONSE $RESPONSE" + echo "CHECK_TX_RESPONSE $CHECK_TX_RESPONSE" + echo "APPEND_TX_RESPONSE $APPEND_TX_RESPONSE" + echo "CHECK_TX_CODE $CHECK_TX_CODE" + echo "APPEND_TX_CODE $APPEND_TX_CODE" + echo "----" fi } +echo "... sending tx. expect no error" + # 0 should pass once and get in block, with no error TX=00 sendTx $TX -if [[ $CODE != 0 ]]; then +if [[ $APPEND_TX_CODE != 0 ]]; then echo "Got non-zero exit code for $TX. $RESPONSE" exit 1 fi -if [[ "$ERROR" != "" ]]; then + +if [[ "$GRPC_BROADCAST_TX" == "" && "$ERROR" != "" ]]; then echo "Unexpected error. Tx $TX should have been included in a block. $ERROR" exit 1 fi - +echo "... sending tx. expect error" # second time should get rejected by the mempool (return error and non-zero code) sendTx $TX -if [[ $CODE == 0 ]]; then +echo "CHECKTX CODE: $CHECK_TX_CODE" +if [[ "$CHECK_TX_CODE" == 0 ]]; then echo "Got zero exit code for $TX. Expected tx to be rejected by mempool. $RESPONSE" exit 1 fi -if [[ "$ERROR" == "" ]]; then +if [[ "$GRPC_BROADCAST_TX" == "" && "$ERROR" == "" ]]; then echo "Expected to get an error - tx $TX should have been rejected from mempool" echo "$RESPONSE" exit 1 fi +echo "... sending tx. expect no error" + # now, TX=01 should pass, with no error TX=01 sendTx $TX -if [[ $CODE != 0 ]]; then +if [[ $APPEND_TX_CODE != 0 ]]; then echo "Got non-zero exit code for $TX. $RESPONSE" exit 1 fi -if [[ "$ERROR" != "" ]]; then +if [[ "$GRPC_BROADCAST_TX" == "" && "$ERROR" != "" ]]; then echo "Unexpected error. Tx $TX should have been accepted in block. $ERROR" exit 1 fi +echo "... sending tx. expect no error, but invalid" + # now, TX=03 should get in a block (passes CheckTx, no error), but is invalid TX=03 sendTx $TX -if [[ $CODE == 0 ]]; then +if [[ "$CHECK_TX_CODE" != 0 ]]; then + echo "Got non-zero exit code for checktx on $TX. $RESPONSE" + exit 1 +fi +if [[ $APPEND_TX_CODE == 0 ]]; then echo "Got zero exit code for $TX. Should have been bad nonce. $RESPONSE" exit 1 fi -if [[ "$ERROR" != "" ]]; then +if [[ "$GRPC_BROADCAST_TX" == "" && "$ERROR" != "" ]]; then echo "Unexpected error. Tx $TX should have been included in a block. $ERROR" exit 1 fi diff --git a/test/app/test.sh b/test/app/test.sh index f73ab6960..4830c2b15 100644 --- a/test/app/test.sh +++ b/test/app/test.sh @@ -83,6 +83,7 @@ function counter_over_grpc_grpc() { echo "Starting counter and tendermint" counter --serial --tmsp grpc > /dev/null & pid_counter=$! + sleep 1 GRPC_PORT=36656 tendermint node --tmsp grpc --grpc_laddr tcp://localhost:$GRPC_PORT > tendermint.log & pid_tendermint=$! diff --git a/types/events.go b/types/events.go index c6eb7611a..8f7a5bbf0 100644 --- a/types/events.go +++ b/types/events.go @@ -73,11 +73,11 @@ type EventDataNewBlockHeader struct { // All txs fire EventDataTx type EventDataTx struct { - Tx Tx `json:"tx"` - Result []byte `json:"result"` - Log string `json:"log"` - Code tmsp.CodeType `json:"code"` - Error string `json:"error"` + Tx Tx `json:"tx"` + Data []byte `json:"data"` + Log string `json:"log"` + Code tmsp.CodeType `json:"code"` + Error string `json:"error"` // this is redundant information for now } // NOTE: This goes into the replay WAL