Browse Source

AppendTx -> DeliverTx

pull/365/head
Ethan Buchman 7 years ago
parent
commit
94b6dd65ee
12 changed files with 48 additions and 48 deletions
  1. +5
    -5
      consensus/mempool_test.go
  2. +10
    -10
      mempool/mempool_test.go
  3. +3
    -3
      proxy/app_conn.go
  4. +12
    -12
      rpc/core/mempool.go
  5. +1
    -1
      rpc/core/types/responses.go
  6. +1
    -1
      rpc/grpc/api.go
  7. +3
    -3
      rpc/grpc/types.pb.go
  8. +1
    -1
      rpc/grpc/types.proto
  9. +3
    -3
      rpc/test/client_test.go
  10. +2
    -2
      rpc/test/grpc_test.go
  11. +6
    -6
      state/execution.go
  12. +1
    -1
      test/app/counter_test.sh

+ 5
- 5
consensus/mempool_test.go View File

@ -23,8 +23,8 @@ func TestTxConcurrentWithCommit(t *testing.T) {
height, round := cs.Height, cs.Round
newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1)
appendTxsRange := func(start, end int) {
// Append some txs.
deliverTxsRange := func(start, end int) {
// Deliver some txs.
for i := start; i < end; i++ {
txBytes := make([]byte, 8)
binary.BigEndian.PutUint64(txBytes, uint64(i))
@ -37,7 +37,7 @@ func TestTxConcurrentWithCommit(t *testing.T) {
}
NTxs := 10000
go appendTxsRange(0, NTxs)
go deliverTxsRange(0, NTxs)
startTestRound(cs, height, round)
ticker := time.NewTicker(time.Second * 20)
@ -59,7 +59,7 @@ func TestRmBadTx(t *testing.T) {
// increment the counter by 1
txBytes := make([]byte, 8)
binary.BigEndian.PutUint64(txBytes, uint64(0))
app.AppendTx(txBytes)
app.DeliverTx(txBytes)
app.Commit()
ch := make(chan struct{})
@ -130,7 +130,7 @@ func (app *CounterApplication) SetOption(key string, value string) (log string)
return ""
}
func (app *CounterApplication) AppendTx(tx []byte) abci.Result {
func (app *CounterApplication) DeliverTx(tx []byte) abci.Result {
return runTx(tx, &app.txCount)
}


+ 10
- 10
mempool/mempool_test.go View File

@ -20,8 +20,8 @@ func TestSerialReap(t *testing.T) {
appConnCon, _ := cc.NewABCIClient()
mempool := NewMempool(config, appConnMem)
appendTxsRange := func(start, end int) {
// Append some txs.
deliverTxsRange := func(start, end int) {
// Deliver some txs.
for i := start; i < end; i++ {
// This will succeed
@ -61,11 +61,11 @@ func TestSerialReap(t *testing.T) {
}
commitRange := func(start, end int) {
// Append some txs.
// Deliver some txs.
for i := start; i < end; i++ {
txBytes := make([]byte, 8)
binary.BigEndian.PutUint64(txBytes, uint64(i))
res := appConnCon.AppendTxSync(txBytes)
res := appConnCon.DeliverTxSync(txBytes)
if !res.IsOK() {
t.Errorf("Error committing tx. Code:%v result:%X log:%v",
res.Code, res.Data, res.Log)
@ -79,8 +79,8 @@ func TestSerialReap(t *testing.T) {
//----------------------------------------
// Append some txs.
appendTxsRange(0, 100)
// Deliver some txs.
deliverTxsRange(0, 100)
// Reap the txs.
reapCheck(100)
@ -88,9 +88,9 @@ func TestSerialReap(t *testing.T) {
// Reap again. We should get the same amount
reapCheck(100)
// Append 0 to 999, we should reap 900 new txs
// Deliver 0 to 999, we should reap 900 new txs
// because 100 were already counted.
appendTxsRange(0, 1000)
deliverTxsRange(0, 1000)
// Reap the txs.
reapCheck(1000)
@ -105,8 +105,8 @@ func TestSerialReap(t *testing.T) {
// We should have 500 left.
reapCheck(500)
// Append 100 invalid txs and 100 valid txs
appendTxsRange(900, 1100)
// Deliver 100 invalid txs and 100 valid txs
deliverTxsRange(900, 1100)
// We should have 600 now.
reapCheck(600)


+ 3
- 3
proxy/app_conn.go View File

@ -15,7 +15,7 @@ type AppConnConsensus interface {
InitChainSync(validators []*types.Validator) (err error)
BeginBlockSync(hash []byte, header *types.Header) (err error)
AppendTxAsync(tx []byte) *abcicli.ReqRes
DeliverTxAsync(tx []byte) *abcicli.ReqRes
EndBlockSync(height uint64) (types.ResponseEndBlock, error)
CommitSync() (res types.Result)
}
@ -69,8 +69,8 @@ func (app *appConnConsensus) BeginBlockSync(hash []byte, header *types.Header) (
return app.appConn.BeginBlockSync(hash, header)
}
func (app *appConnConsensus) AppendTxAsync(tx []byte) *abcicli.ReqRes {
return app.appConn.AppendTxAsync(tx)
func (app *appConnConsensus) DeliverTxAsync(tx []byte) *abcicli.ReqRes {
return app.appConn.DeliverTxAsync(tx)
}
func (app *appConnConsensus) EndBlockSync(height uint64) (types.ResponseEndBlock, error) {


+ 12
- 12
rpc/core/mempool.go View File

@ -41,14 +41,14 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
// CONTRACT: only returns error if mempool.BroadcastTx errs (ie. problem with the app)
// or if we timeout waiting for tx to commit.
// If CheckTx or AppendTx fail, no error will be returned, but the returned result
// If CheckTx or DeliverTx fail, no error will be returned, but the returned result
// will contain a non-OK ABCI code.
func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// subscribe to tx being committed in block
appendTxResCh := make(chan types.EventDataTx, 1)
deliverTxResCh := make(chan types.EventDataTx, 1)
types.AddListenerForEvent(eventSwitch, "rpc", types.EventStringTx(tx), func(data types.TMEventData) {
appendTxResCh <- data.(types.EventDataTx)
deliverTxResCh <- data.(types.EventDataTx)
})
// broadcast the tx and register checktx callback
@ -66,7 +66,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// CheckTx failed!
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR,
AppendTx: nil,
DeliverTx: nil,
}, nil
}
@ -75,23 +75,23 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// TODO: configureable?
timer := time.NewTimer(60 * 2 * time.Second)
select {
case appendTxRes := <-appendTxResCh:
case deliverTxRes := <-deliverTxResCh:
// The tx was included in a block.
appendTxR := &abci.ResponseAppendTx{
Code: appendTxRes.Code,
Data: appendTxRes.Data,
Log: appendTxRes.Log,
deliverTxR := &abci.ResponseDeliverTx{
Code: deliverTxRes.Code,
Data: deliverTxRes.Data,
Log: deliverTxRes.Log,
}
log.Notice("AppendTx passed ", "tx", []byte(tx), "response", appendTxR)
log.Notice("DeliverTx passed ", "tx", []byte(tx), "response", deliverTxR)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR,
AppendTx: appendTxR,
DeliverTx: deliverTxR,
}, nil
case <-timer.C:
log.Error("failed to include tx")
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR,
AppendTx: nil,
DeliverTx: nil,
}, fmt.Errorf("Timed out waiting for transaction to be included in a block")
}


+ 1
- 1
rpc/core/types/responses.go View File

@ -65,7 +65,7 @@ type ResultBroadcastTx struct {
type ResultBroadcastTxCommit struct {
CheckTx *abci.ResponseCheckTx `json:"check_tx"`
AppendTx *abci.ResponseAppendTx `json:"append_tx"`
DeliverTx *abci.ResponseDeliverTx `json:"deliver_tx"`
}
type ResultUnconfirmedTxs struct {


+ 1
- 1
rpc/grpc/api.go View File

@ -14,5 +14,5 @@ func (bapi *broadcastAPI) BroadcastTx(ctx context.Context, req *RequestBroadcast
if err != nil {
return nil, err
}
return &ResponseBroadcastTx{res.CheckTx, res.AppendTx}, nil
return &ResponseBroadcastTx{res.CheckTx, res.DeliverTx}, nil
}

+ 3
- 3
rpc/grpc/types.pb.go View File

@ -53,7 +53,7 @@ func (m *RequestBroadcastTx) GetTx() []byte {
type ResponseBroadcastTx struct {
CheckTx *types.ResponseCheckTx `protobuf:"bytes,1,opt,name=check_tx,json=checkTx" json:"check_tx,omitempty"`
AppendTx *types.ResponseAppendTx `protobuf:"bytes,2,opt,name=append_tx,json=appendTx" json:"append_tx,omitempty"`
DeliverTx *types.ResponseDeliverTx `protobuf:"bytes,2,opt,name=deliver_tx,json=deliverTx" json:"deliver_tx,omitempty"`
}
func (m *ResponseBroadcastTx) Reset() { *m = ResponseBroadcastTx{} }
@ -68,9 +68,9 @@ func (m *ResponseBroadcastTx) GetCheckTx() *types.ResponseCheckTx {
return nil
}
func (m *ResponseBroadcastTx) GetAppendTx() *types.ResponseAppendTx {
func (m *ResponseBroadcastTx) GetDeliverTx() *types.ResponseDeliverTx {
if m != nil {
return m.AppendTx
return m.DeliverTx
}
return nil
}


+ 1
- 1
rpc/grpc/types.proto View File

@ -18,7 +18,7 @@ message RequestBroadcastTx {
message ResponseBroadcastTx{
types.ResponseCheckTx check_tx = 1;
types.ResponseAppendTx append_tx = 2;
types.ResponseDeliverTx deliver_tx = 2;
}
//----------------------------------------


+ 3
- 3
rpc/test/client_test.go View File

@ -198,9 +198,9 @@ func testBroadcastTxCommit(t *testing.T, resI interface{}, tx []byte) {
if checkTx.Code != abci.CodeType_OK {
panic(Fmt("BroadcastTxCommit got non-zero exit code from CheckTx: %v. %X; %s", checkTx.Code, checkTx.Data, checkTx.Log))
}
appendTx := res.AppendTx
if appendTx.Code != abci.CodeType_OK {
panic(Fmt("BroadcastTxCommit got non-zero exit code from CheckTx: %v. %X; %s", appendTx.Code, appendTx.Data, appendTx.Log))
deliverTx := res.DeliverTx
if deliverTx.Code != abci.CodeType_OK {
panic(Fmt("BroadcastTxCommit got non-zero exit code from CheckTx: %v. %X; %s", deliverTx.Code, deliverTx.Data, deliverTx.Log))
}
mem := node.MempoolReactor().Mempool
if mem.Size() != 0 {


+ 2
- 2
rpc/test/grpc_test.go View File

@ -18,7 +18,7 @@ func TestBroadcastTx(t *testing.T) {
if res.CheckTx.Code != 0 {
t.Fatalf("Non-zero check tx code: %d", res.CheckTx.Code)
}
if res.AppendTx.Code != 0 {
t.Fatalf("Non-zero append tx code: %d", res.AppendTx.Code)
if res.DeliverTx.Code != 0 {
t.Fatalf("Non-zero append tx code: %d", res.DeliverTx.Code)
}
}

+ 6
- 6
state/execution.go View File

@ -73,24 +73,24 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo
// Execute transactions and get hash
proxyCb := func(req *abci.Request, res *abci.Response) {
switch r := res.Value.(type) {
case *abci.Response_AppendTx:
case *abci.Response_DeliverTx:
// TODO: make use of res.Log
// TODO: make use of this info
// Blocks may include invalid txs.
// reqAppendTx := req.(abci.RequestAppendTx)
// reqDeliverTx := req.(abci.RequestDeliverTx)
txError := ""
apTx := r.AppendTx
apTx := r.DeliverTx
if apTx.Code == abci.CodeType_OK {
validTxs += 1
} else {
log.Debug("Invalid tx", "code", r.AppendTx.Code, "log", r.AppendTx.Log)
log.Debug("Invalid tx", "code", r.DeliverTx.Code, "log", r.DeliverTx.Log)
invalidTxs += 1
txError = apTx.Code.String()
}
// NOTE: if we count we can access the tx from the block instead of
// pulling it from the req
event := types.EventDataTx{
Tx: req.GetAppendTx().Tx,
Tx: req.GetDeliverTx().Tx,
Data: apTx.Data,
Code: apTx.Code,
Log: apTx.Log,
@ -113,7 +113,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo
// Run txs of block
for _, tx := range block.Txs {
fail.FailRand(len(block.Txs)) // XXX
proxyAppConn.AppendTxAsync(tx)
proxyAppConn.DeliverTxAsync(tx)
if err := proxyAppConn.Error(); err != nil {
return nil, err
}


+ 1
- 1
test/app/counter_test.sh View File

@ -50,7 +50,7 @@ function sendTx() {
if [[ "$IS_JSON" != "0" ]]; then
ERROR="$RESPONSE"
fi
APPEND_TX_RESPONSE=`echo $RESPONSE | jq .append_tx`
APPEND_TX_RESPONSE=`echo $RESPONSE | jq .deliver_tx`
APPEND_TX_CODE=`getCode "$APPEND_TX_RESPONSE"`
CHECK_TX_RESPONSE=`echo $RESPONSE | jq .check_tx`
CHECK_TX_CODE=`getCode "$CHECK_TX_RESPONSE"`


Loading…
Cancel
Save