Browse Source

consensus: use delivertxsync (#7616)

pull/7646/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
4e5c2b5e8f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 39 additions and 56 deletions
  1. +12
    -18
      internal/consensus/replay_test.go
  2. +5
    -5
      internal/proxy/app_conn.go
  3. +5
    -5
      internal/proxy/mocks/app_conn_consensus.go
  4. +10
    -21
      internal/state/execution.go
  5. +7
    -7
      rpc/client/mocks/client.go

+ 12
- 18
internal/consensus/replay_test.go View File

@ -684,26 +684,20 @@ func TestMockProxyApp(t *testing.T) {
abciRes := new(tmstate.ABCIResponses) abciRes := new(tmstate.ABCIResponses)
abciRes.DeliverTxs = make([]*abci.ResponseDeliverTx, len(loadedAbciRes.DeliverTxs)) abciRes.DeliverTxs = make([]*abci.ResponseDeliverTx, len(loadedAbciRes.DeliverTxs))
// Execute transactions and get hash.
proxyCb := func(req *abci.Request, res *abci.Response) {
if r, ok := res.Value.(*abci.Response_DeliverTx); ok {
// TODO: make use of res.Log
// TODO: make use of this info
// Blocks may include invalid txs.
txRes := r.DeliverTx
if txRes.Code == abci.CodeTypeOK {
validTxs++
} else {
invalidTxs++
}
abciRes.DeliverTxs[txIndex] = txRes
txIndex++
}
}
mock.SetResponseCallback(proxyCb)
someTx := []byte("tx") someTx := []byte("tx")
_, err = mock.DeliverTxAsync(ctx, abci.RequestDeliverTx{Tx: someTx})
resp, err := mock.DeliverTx(ctx, abci.RequestDeliverTx{Tx: someTx})
// TODO: make use of res.Log
// TODO: make use of this info
// Blocks may include invalid txs.
if resp.Code == abci.CodeTypeOK {
validTxs++
} else {
invalidTxs++
}
abciRes.DeliverTxs[txIndex] = resp
txIndex++
assert.NoError(t, err) assert.NoError(t, err)
}) })
assert.True(t, validTxs == 1) assert.True(t, validTxs == 1)


+ 5
- 5
internal/proxy/app_conn.go View File

@ -21,7 +21,7 @@ type AppConnConsensus interface {
InitChain(context.Context, types.RequestInitChain) (*types.ResponseInitChain, error) InitChain(context.Context, types.RequestInitChain) (*types.ResponseInitChain, error)
BeginBlock(context.Context, types.RequestBeginBlock) (*types.ResponseBeginBlock, error) BeginBlock(context.Context, types.RequestBeginBlock) (*types.ResponseBeginBlock, error)
DeliverTxAsync(context.Context, types.RequestDeliverTx) (*abciclient.ReqRes, error)
DeliverTx(context.Context, types.RequestDeliverTx) (*types.ResponseDeliverTx, error)
EndBlock(context.Context, types.RequestEndBlock) (*types.ResponseEndBlock, error) EndBlock(context.Context, types.RequestEndBlock) (*types.ResponseEndBlock, error)
Commit(context.Context) (*types.ResponseCommit, error) Commit(context.Context) (*types.ResponseCommit, error)
} }
@ -93,12 +93,12 @@ func (app *appConnConsensus) BeginBlock(
return app.appConn.BeginBlock(ctx, req) return app.appConn.BeginBlock(ctx, req)
} }
func (app *appConnConsensus) DeliverTxAsync(
func (app *appConnConsensus) DeliverTx(
ctx context.Context, ctx context.Context,
req types.RequestDeliverTx, req types.RequestDeliverTx,
) (*abciclient.ReqRes, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "deliver_tx", "type", "async"))()
return app.appConn.DeliverTxAsync(ctx, req)
) (*types.ResponseDeliverTx, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "deliver_tx", "type", "sync"))()
return app.appConn.DeliverTx(ctx, req)
} }
func (app *appConnConsensus) EndBlock( func (app *appConnConsensus) EndBlock(


+ 5
- 5
internal/proxy/mocks/app_conn_consensus.go View File

@ -63,16 +63,16 @@ func (_m *AppConnConsensus) Commit(_a0 context.Context) (*types.ResponseCommit,
return r0, r1 return r0, r1
} }
// DeliverTxAsync provides a mock function with given fields: _a0, _a1
func (_m *AppConnConsensus) DeliverTxAsync(_a0 context.Context, _a1 types.RequestDeliverTx) (*abciclient.ReqRes, error) {
// DeliverTx provides a mock function with given fields: _a0, _a1
func (_m *AppConnConsensus) DeliverTx(_a0 context.Context, _a1 types.RequestDeliverTx) (*types.ResponseDeliverTx, error) {
ret := _m.Called(_a0, _a1) ret := _m.Called(_a0, _a1)
var r0 *abciclient.ReqRes
if rf, ok := ret.Get(0).(func(context.Context, types.RequestDeliverTx) *abciclient.ReqRes); ok {
var r0 *types.ResponseDeliverTx
if rf, ok := ret.Get(0).(func(context.Context, types.RequestDeliverTx) *types.ResponseDeliverTx); ok {
r0 = rf(_a0, _a1) r0 = rf(_a0, _a1)
} else { } else {
if ret.Get(0) != nil { if ret.Get(0) != nil {
r0 = ret.Get(0).(*abciclient.ReqRes)
r0 = ret.Get(0).(*types.ResponseDeliverTx)
} }
} }


+ 10
- 21
internal/state/execution.go View File

@ -302,26 +302,6 @@ func execBlockOnProxyApp(
dtxs := make([]*abci.ResponseDeliverTx, len(block.Txs)) dtxs := make([]*abci.ResponseDeliverTx, len(block.Txs))
abciResponses.DeliverTxs = dtxs abciResponses.DeliverTxs = dtxs
// Execute transactions and get hash.
proxyCb := func(req *abci.Request, res *abci.Response) {
if r, ok := res.Value.(*abci.Response_DeliverTx); ok {
// TODO: make use of res.Log
// TODO: make use of this info
// Blocks may include invalid txs.
txRes := r.DeliverTx
if txRes.Code == abci.CodeTypeOK {
validTxs++
} else {
logger.Debug("invalid tx", "code", txRes.Code, "log", txRes.Log)
invalidTxs++
}
abciResponses.DeliverTxs[txIndex] = txRes
txIndex++
}
}
proxyAppConn.SetResponseCallback(proxyCb)
commitInfo := getBeginBlockValidatorInfo(block, store, initialHeight) commitInfo := getBeginBlockValidatorInfo(block, store, initialHeight)
byzVals := make([]abci.Evidence, 0) byzVals := make([]abci.Evidence, 0)
@ -352,10 +332,19 @@ func execBlockOnProxyApp(
// run txs of block // run txs of block
for _, tx := range block.Txs { for _, tx := range block.Txs {
_, err = proxyAppConn.DeliverTxAsync(ctx, abci.RequestDeliverTx{Tx: tx})
resp, err := proxyAppConn.DeliverTx(ctx, abci.RequestDeliverTx{Tx: tx})
if err != nil { if err != nil {
return nil, err return nil, err
} }
if resp.Code == abci.CodeTypeOK {
validTxs++
} else {
logger.Debug("invalid tx", "code", resp.Code, "log", resp.Log)
invalidTxs++
}
abciResponses.DeliverTxs[txIndex] = resp
txIndex++
} }
abciResponses.EndBlock, err = proxyAppConn.EndBlock(ctx, abci.RequestEndBlock{Height: block.Height}) abciResponses.EndBlock, err = proxyAppConn.EndBlock(ctx, abci.RequestEndBlock{Height: block.Height})


+ 7
- 7
rpc/client/mocks/client.go View File

@ -713,13 +713,13 @@ func (_m *Client) TxSearch(ctx context.Context, query string, prove bool, page *
return r0, r1 return r0, r1
} }
// UnconfirmedTxs provides a mock function with given fields: ctx, limit
func (_m *Client) UnconfirmedTxs(ctx context.Context, limit *int) (*coretypes.ResultUnconfirmedTxs, error) {
ret := _m.Called(ctx, limit)
// UnconfirmedTxs provides a mock function with given fields: ctx, page, perPage
func (_m *Client) UnconfirmedTxs(ctx context.Context, page *int, perPage *int) (*coretypes.ResultUnconfirmedTxs, error) {
ret := _m.Called(ctx, page, perPage)
var r0 *coretypes.ResultUnconfirmedTxs var r0 *coretypes.ResultUnconfirmedTxs
if rf, ok := ret.Get(0).(func(context.Context, *int) *coretypes.ResultUnconfirmedTxs); ok {
r0 = rf(ctx, limit)
if rf, ok := ret.Get(0).(func(context.Context, *int, *int) *coretypes.ResultUnconfirmedTxs); ok {
r0 = rf(ctx, page, perPage)
} else { } else {
if ret.Get(0) != nil { if ret.Get(0) != nil {
r0 = ret.Get(0).(*coretypes.ResultUnconfirmedTxs) r0 = ret.Get(0).(*coretypes.ResultUnconfirmedTxs)
@ -727,8 +727,8 @@ func (_m *Client) UnconfirmedTxs(ctx context.Context, limit *int) (*coretypes.Re
} }
var r1 error var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *int) error); ok {
r1 = rf(ctx, limit)
if rf, ok := ret.Get(1).(func(context.Context, *int, *int) error); ok {
r1 = rf(ctx, page, perPage)
} else { } else {
r1 = ret.Error(1) r1 = ret.Error(1)
} }


Loading…
Cancel
Save