Browse Source

mempool: use checktx sync calls (#7868)

wb/state-serialize
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
cc18f87000
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 88 additions and 274 deletions
  1. +0
    -4
      abci/client/client.go
  2. +28
    -143
      abci/client/grpc_client.go
  3. +0
    -24
      abci/client/local_client.go
  4. +0
    -19
      abci/client/socket_client.go
  5. +2
    -2
      internal/consensus/mempool_test.go
  6. +1
    -1
      internal/consensus/replay_stubs.go
  7. +45
    -60
      internal/mempool/mempool.go
  8. +2
    -4
      internal/mempool/mempool_test.go
  9. +1
    -1
      internal/mempool/mock/mempool.go
  10. +1
    -1
      internal/mempool/types.go
  11. +0
    -6
      internal/proxy/app_conn.go
  12. +6
    -7
      internal/rpc/core/mempool.go
  13. +2
    -2
      rpc/client/rpc_test.go

+ 0
- 4
abci/client/client.go View File

@ -30,10 +30,6 @@ type Client interface {
Error() error
// Asynchronous requests
CheckTxAsync(context.Context, types.RequestCheckTx) (*ReqRes, error)
// Synchronous requests
Flush(context.Context) error
Echo(ctx context.Context, msg string) (*types.ResponseEcho, error)
Info(context.Context, types.RequestInfo) (*types.ResponseInfo, error)


+ 28
- 143
abci/client/grpc_client.go View File

@ -164,179 +164,64 @@ func (cli *grpcClient) Error() error {
//----------------------------------------
// NOTE: call is synchronous, use ctx to break early if needed
// NOTE: call is synchronous, use ctx to break early if needed
func (cli *grpcClient) CheckTxAsync(ctx context.Context, params types.RequestCheckTx) (*ReqRes, error) {
req := types.ToRequestCheckTx(params)
res, err := cli.client.CheckTx(ctx, req.GetCheckTx(), grpc.WaitForReady(true))
if err != nil {
return nil, err
}
return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_CheckTx{CheckTx: res}})
}
// finishAsyncCall creates a ReqRes for an async call, and immediately populates it
// with the response. We don't complete it until it's been ordered via the channel.
func (cli *grpcClient) finishAsyncCall(ctx context.Context, req *types.Request, res *types.Response) (*ReqRes, error) {
reqres := NewReqRes(req)
reqres.Response = res
select {
case cli.chReqRes <- reqres: // use channel for async responses, since they must be ordered
return reqres, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
// finishSyncCall waits for an async call to complete. It is necessary to call all
// sync calls asynchronously as well, to maintain call and response ordering via
// the channel, and this method will wait until the async call completes.
func (cli *grpcClient) finishSyncCall(reqres *ReqRes) *types.Response {
// It's possible that the callback is called twice, since the callback can
// be called immediately on SetCallback() in addition to after it has been
// set. This is because completing the ReqRes happens in a separate critical
// section from the one where the callback is called: there is a race where
// SetCallback() is called between completing the ReqRes and dispatching the
// callback.
//
// We also buffer the channel with 1 response, since SetCallback() will be
// called synchronously if the reqres is already completed, in which case
// it will block on sending to the channel since it hasn't gotten around to
// receiving from it yet.
//
// ReqRes should really handle callback dispatch internally, to guarantee
// that it's only called once and avoid the above race conditions.
var once sync.Once
ch := make(chan *types.Response, 1)
reqres.SetCallback(func(res *types.Response) {
once.Do(func() {
ch <- res
})
})
return <-ch
}
//----------------------------------------
func (cli *grpcClient) Flush(ctx context.Context) error { return nil }
func (cli *grpcClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) {
req := types.ToRequestEcho(msg)
return cli.client.Echo(ctx, req.GetEcho(), grpc.WaitForReady(true))
return cli.client.Echo(ctx, types.ToRequestEcho(msg).GetEcho(), grpc.WaitForReady(true))
}
func (cli *grpcClient) Info(
ctx context.Context,
params types.RequestInfo,
) (*types.ResponseInfo, error) {
req := types.ToRequestInfo(params)
return cli.client.Info(ctx, req.GetInfo(), grpc.WaitForReady(true))
func (cli *grpcClient) Info(ctx context.Context, params types.RequestInfo) (*types.ResponseInfo, error) {
return cli.client.Info(ctx, types.ToRequestInfo(params).GetInfo(), grpc.WaitForReady(true))
}
func (cli *grpcClient) CheckTx(
ctx context.Context,
params types.RequestCheckTx,
) (*types.ResponseCheckTx, error) {
reqres, err := cli.CheckTxAsync(ctx, params)
if err != nil {
return nil, err
}
return cli.finishSyncCall(reqres).GetCheckTx(), cli.Error()
func (cli *grpcClient) CheckTx(ctx context.Context, params types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return cli.client.CheckTx(ctx, types.ToRequestCheckTx(params).GetCheckTx(), grpc.WaitForReady(true))
}
func (cli *grpcClient) Query(
ctx context.Context,
params types.RequestQuery,
) (*types.ResponseQuery, error) {
req := types.ToRequestQuery(params)
return cli.client.Query(ctx, req.GetQuery(), grpc.WaitForReady(true))
func (cli *grpcClient) Query(ctx context.Context, params types.RequestQuery) (*types.ResponseQuery, error) {
return cli.client.Query(ctx, types.ToRequestQuery(params).GetQuery(), grpc.WaitForReady(true))
}
func (cli *grpcClient) Commit(ctx context.Context) (*types.ResponseCommit, error) {
req := types.ToRequestCommit()
return cli.client.Commit(ctx, req.GetCommit(), grpc.WaitForReady(true))
return cli.client.Commit(ctx, types.ToRequestCommit().GetCommit(), grpc.WaitForReady(true))
}
func (cli *grpcClient) InitChain(
ctx context.Context,
params types.RequestInitChain,
) (*types.ResponseInitChain, error) {
req := types.ToRequestInitChain(params)
return cli.client.InitChain(ctx, req.GetInitChain(), grpc.WaitForReady(true))
func (cli *grpcClient) InitChain(ctx context.Context, params types.RequestInitChain) (*types.ResponseInitChain, error) {
return cli.client.InitChain(ctx, types.ToRequestInitChain(params).GetInitChain(), grpc.WaitForReady(true))
}
func (cli *grpcClient) ListSnapshots(
ctx context.Context,
params types.RequestListSnapshots,
) (*types.ResponseListSnapshots, error) {
req := types.ToRequestListSnapshots(params)
return cli.client.ListSnapshots(ctx, req.GetListSnapshots(), grpc.WaitForReady(true))
func (cli *grpcClient) ListSnapshots(ctx context.Context, params types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
return cli.client.ListSnapshots(ctx, types.ToRequestListSnapshots(params).GetListSnapshots(), grpc.WaitForReady(true))
}
func (cli *grpcClient) OfferSnapshot(
ctx context.Context,
params types.RequestOfferSnapshot,
) (*types.ResponseOfferSnapshot, error) {
req := types.ToRequestOfferSnapshot(params)
return cli.client.OfferSnapshot(ctx, req.GetOfferSnapshot(), grpc.WaitForReady(true))
func (cli *grpcClient) OfferSnapshot(ctx context.Context, params types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
return cli.client.OfferSnapshot(ctx, types.ToRequestOfferSnapshot(params).GetOfferSnapshot(), grpc.WaitForReady(true))
}
func (cli *grpcClient) LoadSnapshotChunk(
ctx context.Context,
params types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
req := types.ToRequestLoadSnapshotChunk(params)
return cli.client.LoadSnapshotChunk(ctx, req.GetLoadSnapshotChunk(), grpc.WaitForReady(true))
func (cli *grpcClient) LoadSnapshotChunk(ctx context.Context, params types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
return cli.client.LoadSnapshotChunk(ctx, types.ToRequestLoadSnapshotChunk(params).GetLoadSnapshotChunk(), grpc.WaitForReady(true))
}
func (cli *grpcClient) ApplySnapshotChunk(
ctx context.Context,
params types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
req := types.ToRequestApplySnapshotChunk(params)
return cli.client.ApplySnapshotChunk(ctx, req.GetApplySnapshotChunk(), grpc.WaitForReady(true))
func (cli *grpcClient) ApplySnapshotChunk(ctx context.Context, params types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
return cli.client.ApplySnapshotChunk(ctx, types.ToRequestApplySnapshotChunk(params).GetApplySnapshotChunk(), grpc.WaitForReady(true))
}
func (cli *grpcClient) PrepareProposal(
ctx context.Context,
params types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
req := types.ToRequestPrepareProposal(params)
return cli.client.PrepareProposal(ctx, req.GetPrepareProposal(), grpc.WaitForReady(true))
func (cli *grpcClient) PrepareProposal(ctx context.Context, params types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
return cli.client.PrepareProposal(ctx, types.ToRequestPrepareProposal(params).GetPrepareProposal(), grpc.WaitForReady(true))
}
func (cli *grpcClient) ProcessProposal(
ctx context.Context,
params types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
req := types.ToRequestProcessProposal(params)
return cli.client.ProcessProposal(ctx, req.GetProcessProposal(), grpc.WaitForReady(true))
func (cli *grpcClient) ProcessProposal(ctx context.Context, params types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
return cli.client.ProcessProposal(ctx, types.ToRequestProcessProposal(params).GetProcessProposal(), grpc.WaitForReady(true))
}
func (cli *grpcClient) ExtendVote(
ctx context.Context,
params types.RequestExtendVote) (*types.ResponseExtendVote, error) {
req := types.ToRequestExtendVote(params)
return cli.client.ExtendVote(ctx, req.GetExtendVote(), grpc.WaitForReady(true))
func (cli *grpcClient) ExtendVote(ctx context.Context, params types.RequestExtendVote) (*types.ResponseExtendVote, error) {
return cli.client.ExtendVote(ctx, types.ToRequestExtendVote(params).GetExtendVote(), grpc.WaitForReady(true))
}
func (cli *grpcClient) VerifyVoteExtension(
ctx context.Context,
params types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) {
req := types.ToRequestVerifyVoteExtension(params)
return cli.client.VerifyVoteExtension(ctx, req.GetVerifyVoteExtension(), grpc.WaitForReady(true))
func (cli *grpcClient) VerifyVoteExtension(ctx context.Context, params types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) {
return cli.client.VerifyVoteExtension(ctx, types.ToRequestVerifyVoteExtension(params).GetVerifyVoteExtension(), grpc.WaitForReady(true))
}
func (cli *grpcClient) FinalizeBlock(
ctx context.Context,
params types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
req := types.ToRequestFinalizeBlock(params)
return cli.client.FinalizeBlock(ctx, req.GetFinalizeBlock(), grpc.WaitForReady(true))
func (cli *grpcClient) FinalizeBlock(ctx context.Context, params types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
return cli.client.FinalizeBlock(ctx, types.ToRequestFinalizeBlock(params).GetFinalizeBlock(), grpc.WaitForReady(true))
}

+ 0
- 24
abci/client/local_client.go View File

@ -42,17 +42,6 @@ func (app *localClient) Error() error {
return nil
}
func (app *localClient) CheckTxAsync(ctx context.Context, req types.RequestCheckTx) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.CheckTx(req)
return app.callback(
types.ToRequestCheckTx(req),
types.ToResponseCheckTx(res),
), nil
}
//-------------------------------------------------------
func (app *localClient) Flush(ctx context.Context) error {
@ -213,16 +202,3 @@ func (app *localClient) FinalizeBlock(
res := app.Application.FinalizeBlock(req)
return &res, nil
}
//-------------------------------------------------------
func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes {
return newLocalReqRes(req, res)
}
func newLocalReqRes(req *types.Request, res *types.Response) *ReqRes {
reqRes := NewReqRes(req)
reqRes.Response = res
reqRes.SetDone()
return reqRes
}

+ 0
- 19
abci/client/socket_client.go View File

@ -206,12 +206,6 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error {
//----------------------------------------
func (cli *socketClient) CheckTxAsync(ctx context.Context, req types.RequestCheckTx) (*ReqRes, error) {
return cli.queueRequestAsync(ctx, types.ToRequestCheckTx(req))
}
//----------------------------------------
func (cli *socketClient) Flush(ctx context.Context) error {
reqRes, err := cli.queueRequest(ctx, types.ToRequestFlush(), true)
if err != nil {
@ -423,19 +417,6 @@ func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request, s
return reqres, nil
}
func (cli *socketClient) queueRequestAsync(
ctx context.Context,
req *types.Request,
) (*ReqRes, error) {
reqres, err := cli.queueRequest(ctx, req, false)
if err != nil {
return nil, queueErr(err)
}
return reqres, cli.Error()
}
func (cli *socketClient) queueRequestAndFlush(
ctx context.Context,
req *types.Request,


+ 2
- 2
internal/consensus/mempool_test.go View File

@ -204,8 +204,8 @@ func TestMempoolRmBadTx(t *testing.T) {
// Try to send the tx through the mempool.
// CheckTx should not err, but the app should return a bad abci code
// and the tx should get removed from the pool
err := assertMempool(t, cs.txNotifier).CheckTx(ctx, txBytes, func(r *abci.Response) {
if r.GetCheckTx().Code != code.CodeTypeBadNonce {
err := assertMempool(t, cs.txNotifier).CheckTx(ctx, txBytes, func(r *abci.ResponseCheckTx) {
if r.Code != code.CodeTypeBadNonce {
t.Errorf("expected checktx to return bad nonce, got %v", r)
return
}


+ 1
- 1
internal/consensus/replay_stubs.go View File

@ -22,7 +22,7 @@ var _ mempool.Mempool = emptyMempool{}
func (emptyMempool) Lock() {}
func (emptyMempool) Unlock() {}
func (emptyMempool) Size() int { return 0 }
func (emptyMempool) CheckTx(_ context.Context, _ types.Tx, _ func(*abci.Response), _ mempool.TxInfo) error {
func (emptyMempool) CheckTx(context.Context, types.Tx, func(*abci.ResponseCheckTx), mempool.TxInfo) error {
return nil
}
func (emptyMempool) RemoveTxByKey(txKey types.TxKey) error { return nil }


+ 45
- 60
internal/mempool/mempool.go View File

@ -5,7 +5,6 @@ import (
"context"
"errors"
"fmt"
"reflect"
"sync"
"sync/atomic"
"time"
@ -205,11 +204,12 @@ func (txmp *TxMempool) TxsAvailable() <-chan struct{} {
return txmp.txsAvailable
}
// CheckTx executes the ABCI CheckTx method for a given transaction. It acquires
// a read-lock attempts to execute the application's CheckTx ABCI method via
// CheckTxAsync. We return an error if any of the following happen:
// CheckTx executes the ABCI CheckTx method for a given transaction.
// It acquires a read-lock and attempts to execute the application's
// CheckTx ABCI method synchronously. We return an error if any of
// the following happen:
//
// - The CheckTxAsync execution fails.
// - The CheckTx execution fails.
// - The transaction already exists in the cache and we've already received the
// transaction from the peer. Otherwise, if it solely exists in the cache, we
// return nil.
@ -228,7 +228,7 @@ func (txmp *TxMempool) TxsAvailable() <-chan struct{} {
func (txmp *TxMempool) CheckTx(
ctx context.Context,
tx types.Tx,
cb func(*abci.Response),
cb func(*abci.ResponseCheckTx),
txInfo TxInfo,
) error {
txmp.mtx.RLock()
@ -261,31 +261,29 @@ func (txmp *TxMempool) CheckTx(
return types.ErrTxInCache
}
reqRes, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{Tx: tx})
res, err := txmp.proxyAppConn.CheckTx(ctx, abci.RequestCheckTx{Tx: tx})
if err != nil {
txmp.cache.Remove(tx)
return err
}
reqRes.SetCallback(func(res *abci.Response) {
if txmp.recheckCursor != nil {
panic("recheck cursor is non-nil in CheckTx callback")
}
if txmp.recheckCursor != nil {
return errors.New("recheck cursor is non-nil")
}
wtx := &WrappedTx{
tx: tx,
hash: txHash,
timestamp: time.Now().UTC(),
height: txmp.height,
}
wtx := &WrappedTx{
tx: tx,
hash: txHash,
timestamp: time.Now().UTC(),
height: txmp.height,
}
txmp.defaultTxCallback(reqRes.Request, res)
txmp.initTxCallback(wtx, res, txInfo)
txmp.defaultTxCallback(tx, res)
txmp.initTxCallback(wtx, res, txInfo)
if cb != nil {
cb(res)
}
})
if cb != nil {
cb(res)
}
return nil
}
@ -491,25 +489,20 @@ func (txmp *TxMempool) Update(
//
// NOTE:
// - An explicit lock is NOT required.
func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo TxInfo) {
checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
if !ok {
return
}
func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.ResponseCheckTx, txInfo TxInfo) {
var err error
if txmp.postCheck != nil {
err = txmp.postCheck(wtx.tx, checkTxRes.CheckTx)
err = txmp.postCheck(wtx.tx, res)
}
if err != nil || checkTxRes.CheckTx.Code != abci.CodeTypeOK {
if err != nil || res.Code != abci.CodeTypeOK {
// ignore bad transactions
txmp.logger.Info(
"rejected bad transaction",
"priority", wtx.priority,
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
"peer_id", txInfo.SenderNodeID,
"code", checkTxRes.CheckTx.Code,
"code", res.Code,
"post_check_err", err,
)
@ -519,13 +512,13 @@ func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo
txmp.cache.Remove(wtx.tx)
}
if err != nil {
checkTxRes.CheckTx.MempoolError = err.Error()
res.MempoolError = err.Error()
}
return
}
sender := checkTxRes.CheckTx.Sender
priority := checkTxRes.CheckTx.Priority
sender := res.Sender
priority := res.Priority
if len(sender) > 0 {
if wtx := txmp.txStore.GetTxBySender(sender); wtx != nil {
@ -577,7 +570,7 @@ func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo
}
}
wtx.gasWanted = checkTxRes.CheckTx.GasWanted
wtx.gasWanted = res.GasWanted
wtx.priority = priority
wtx.sender = sender
wtx.peers = map[uint16]struct{}{
@ -598,28 +591,20 @@ func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo
txmp.notifyTxsAvailable()
}
// defaultTxCallback is the CheckTx application callback used when a transaction
// is being re-checked (if re-checking is enabled). The caller must hold a mempool
// write-lock (via Lock()) and when executing Update(), if the mempool is non-empty
// and Recheck is enabled, then all remaining transactions will be rechecked via
// CheckTxAsync. The order transactions are rechecked must be the same as the
// order in which this callback is called.
func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response) {
// defaultTxCallback is the CheckTx application callback used when a
// transaction is being re-checked (if re-checking is enabled). The
// caller must hold a mempool write-lock (via Lock()) and when
// executing Update(), if the mempool is non-empty and Recheck is
// enabled, then all remaining transactions will be rechecked via
// CheckTx. The order transactions are rechecked must be the same as
// the order in which this callback is called.
func (txmp *TxMempool) defaultTxCallback(tx types.Tx, res *abci.ResponseCheckTx) {
if txmp.recheckCursor == nil {
return
}
txmp.metrics.RecheckTimes.Add(1)
checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
if !ok {
txmp.logger.Error("received incorrect type in mempool callback",
"expected", reflect.TypeOf(&abci.Response_CheckTx{}).Name(),
"got", reflect.TypeOf(res.Value).Name(),
)
return
}
tx := req.GetCheckTx().Tx
wtx := txmp.recheckCursor.Value.(*WrappedTx)
// Search through the remaining list of tx to recheck for a transaction that matches
@ -635,7 +620,7 @@ func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response)
txmp.logger.Error(
"re-CheckTx transaction mismatch",
"got", wtx.tx.Hash(),
"expected", types.Tx(tx).Key(),
"expected", tx.Key(),
)
if txmp.recheckCursor == txmp.recheckEnd {
@ -656,18 +641,18 @@ func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response)
if !txmp.txStore.IsTxRemoved(wtx.hash) {
var err error
if txmp.postCheck != nil {
err = txmp.postCheck(tx, checkTxRes.CheckTx)
err = txmp.postCheck(tx, res)
}
if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil {
wtx.priority = checkTxRes.CheckTx.Priority
if res.Code == abci.CodeTypeOK && err == nil {
wtx.priority = res.Priority
} else {
txmp.logger.Debug(
"existing transaction no longer valid; failed re-CheckTx callback",
"priority", wtx.priority,
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
"err", err,
"code", checkTxRes.CheckTx.Code,
"code", res.Code,
)
if wtx.gossipEl != txmp.recheckCursor {
@ -697,7 +682,7 @@ func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response)
}
// updateReCheckTxs updates the recheck cursors using the gossipIndex. For
// each transaction, it executes CheckTxAsync. The global callback defined on
// each transaction, it executes CheckTx. The global callback defined on
// the proxyAppConn will be executed for each transaction after CheckTx is
// executed.
//
@ -717,7 +702,7 @@ func (txmp *TxMempool) updateReCheckTxs(ctx context.Context) {
// Only execute CheckTx if the transaction is not marked as removed which
// could happen if the transaction was evicted.
if !txmp.txStore.IsTxRemoved(wtx.hash) {
res, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{
res, err := txmp.proxyAppConn.CheckTx(ctx, abci.RequestCheckTx{
Tx: wtx.tx,
Type: abci.CheckTxType_Recheck,
})
@ -726,7 +711,7 @@ func (txmp *TxMempool) updateReCheckTxs(ctx context.Context) {
txmp.logger.Error("failed to execute CheckTx during rechecking", "err", err)
continue
}
txmp.defaultTxCallback(res.Request, res.Response)
txmp.defaultTxCallback(wtx.tx, res)
}
}


+ 2
- 4
internal/mempool/mempool_test.go View File

@ -565,14 +565,12 @@ func TestTxMempool_CheckTxPostCheckError(t *testing.T) {
_, err := rng.Read(tx)
require.NoError(t, err)
callback := func(res *abci.Response) {
checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
require.True(t, ok)
callback := func(res *abci.ResponseCheckTx) {
expectedErrString := ""
if testCase.err != nil {
expectedErrString = testCase.err.Error()
}
require.Equal(t, expectedErrString, checkTxRes.CheckTx.MempoolError)
require.Equal(t, expectedErrString, res.MempoolError)
}
require.NoError(t, txmp.CheckTx(ctx, tx, callback, TxInfo{SenderID: 0}))
})


+ 1
- 1
internal/mempool/mock/mempool.go View File

@ -17,7 +17,7 @@ var _ Mempool = Mempool{}
func (Mempool) Lock() {}
func (Mempool) Unlock() {}
func (Mempool) Size() int { return 0 }
func (Mempool) CheckTx(_ context.Context, _ types.Tx, _ func(*abci.Response), _ mempool.TxInfo) error {
func (Mempool) CheckTx(context.Context, types.Tx, func(*abci.ResponseCheckTx), mempool.TxInfo) error {
return nil
}
func (Mempool) RemoveTxByKey(txKey types.TxKey) error { return nil }


+ 1
- 1
internal/mempool/types.go View File

@ -30,7 +30,7 @@ const (
type Mempool interface {
// CheckTx executes a new transaction against the application to determine
// its validity and whether it should be added to the mempool.
CheckTx(ctx context.Context, tx types.Tx, callback func(*abci.Response), txInfo TxInfo) error
CheckTx(ctx context.Context, tx types.Tx, callback func(*abci.ResponseCheckTx), txInfo TxInfo) error
// RemoveTxByKey removes a transaction, identified by its key,
// from the mempool.


+ 0
- 6
internal/proxy/app_conn.go View File

@ -31,7 +31,6 @@ type AppConnConsensus interface {
type AppConnMempool interface {
Error() error
CheckTxAsync(context.Context, types.RequestCheckTx) (*abciclient.ReqRes, error)
CheckTx(context.Context, types.RequestCheckTx) (*types.ResponseCheckTx, error)
Flush(context.Context) error
@ -152,11 +151,6 @@ func (app *appConnMempool) Flush(ctx context.Context) error {
return app.appConn.Flush(ctx)
}
func (app *appConnMempool) CheckTxAsync(ctx context.Context, req types.RequestCheckTx) (*abciclient.ReqRes, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "async"))()
return app.appConn.CheckTxAsync(ctx, req)
}
func (app *appConnMempool) CheckTx(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))()
return app.appConn.CheckTx(ctx, req)


+ 6
- 7
internal/rpc/core/mempool.go View File

@ -34,19 +34,18 @@ func (env *Environment) BroadcastTxAsync(ctx context.Context, tx types.Tx) (*cor
// DeliverTx result.
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync
func (env *Environment) BroadcastTxSync(ctx context.Context, tx types.Tx) (*coretypes.ResultBroadcastTx, error) {
resCh := make(chan *abci.Response, 1)
resCh := make(chan *abci.ResponseCheckTx, 1)
err := env.Mempool.CheckTx(
ctx,
tx,
func(res *abci.Response) { resCh <- res },
func(res *abci.ResponseCheckTx) { resCh <- res },
mempool.TxInfo{},
)
if err != nil {
return nil, err
}
res := <-resCh
r := res.GetCheckTx()
r := <-resCh
return &coretypes.ResultBroadcastTx{
Code: r.Code,
@ -61,18 +60,18 @@ func (env *Environment) BroadcastTxSync(ctx context.Context, tx types.Tx) (*core
// BroadcastTxCommit returns with the responses from CheckTx and DeliverTx.
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_commit
func (env *Environment) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*coretypes.ResultBroadcastTxCommit, error) {
resCh := make(chan *abci.Response, 1)
resCh := make(chan *abci.ResponseCheckTx, 1)
err := env.Mempool.CheckTx(
ctx,
tx,
func(res *abci.Response) { resCh <- res },
func(res *abci.ResponseCheckTx) { resCh <- res },
mempool.TxInfo{},
)
if err != nil {
return nil, err
}
r := (<-resCh).GetCheckTx()
r := <-resCh
if r.Code != abci.CodeTypeOK {
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,


+ 2
- 2
rpc/client/rpc_test.go View File

@ -594,7 +594,7 @@ func TestClientMethodCallsAdvanced(t *testing.T) {
_, _, tx := MakeTxKV()
txs[i] = tx
err := pool.CheckTx(ctx, tx, func(_ *abci.Response) { ch <- nil }, mempool.TxInfo{})
err := pool.CheckTx(ctx, tx, func(_ *abci.ResponseCheckTx) { ch <- nil }, mempool.TxInfo{})
require.NoError(t, err)
}
@ -636,7 +636,7 @@ func TestClientMethodCallsAdvanced(t *testing.T) {
_, _, tx := MakeTxKV()
err := pool.CheckTx(ctx, tx, func(_ *abci.Response) { close(ch) }, mempool.TxInfo{})
err := pool.CheckTx(ctx, tx, func(_ *abci.ResponseCheckTx) { close(ch) }, mempool.TxInfo{})
require.NoError(t, err)
// wait for tx to arrive in mempoool.


Loading…
Cancel
Save