diff --git a/abci/client/client.go b/abci/client/client.go index dd19425c9..5940a7900 100644 --- a/abci/client/client.go +++ b/abci/client/client.go @@ -30,10 +30,6 @@ type Client interface { Error() error - // Asynchronous requests - CheckTxAsync(context.Context, types.RequestCheckTx) (*ReqRes, error) - - // Synchronous requests Flush(context.Context) error Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) Info(context.Context, types.RequestInfo) (*types.ResponseInfo, error) diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index ec9471fd8..0f2eb5ade 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -164,179 +164,64 @@ func (cli *grpcClient) Error() error { //---------------------------------------- -// NOTE: call is synchronous, use ctx to break early if needed -// NOTE: call is synchronous, use ctx to break early if needed -func (cli *grpcClient) CheckTxAsync(ctx context.Context, params types.RequestCheckTx) (*ReqRes, error) { - req := types.ToRequestCheckTx(params) - res, err := cli.client.CheckTx(ctx, req.GetCheckTx(), grpc.WaitForReady(true)) - if err != nil { - return nil, err - } - return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_CheckTx{CheckTx: res}}) -} - -// finishAsyncCall creates a ReqRes for an async call, and immediately populates it -// with the response. We don't complete it until it's been ordered via the channel. -func (cli *grpcClient) finishAsyncCall(ctx context.Context, req *types.Request, res *types.Response) (*ReqRes, error) { - reqres := NewReqRes(req) - reqres.Response = res - select { - case cli.chReqRes <- reqres: // use channel for async responses, since they must be ordered - return reqres, nil - case <-ctx.Done(): - return nil, ctx.Err() - } -} - -// finishSyncCall waits for an async call to complete. It is necessary to call all -// sync calls asynchronously as well, to maintain call and response ordering via -// the channel, and this method will wait until the async call completes. -func (cli *grpcClient) finishSyncCall(reqres *ReqRes) *types.Response { - // It's possible that the callback is called twice, since the callback can - // be called immediately on SetCallback() in addition to after it has been - // set. This is because completing the ReqRes happens in a separate critical - // section from the one where the callback is called: there is a race where - // SetCallback() is called between completing the ReqRes and dispatching the - // callback. - // - // We also buffer the channel with 1 response, since SetCallback() will be - // called synchronously if the reqres is already completed, in which case - // it will block on sending to the channel since it hasn't gotten around to - // receiving from it yet. - // - // ReqRes should really handle callback dispatch internally, to guarantee - // that it's only called once and avoid the above race conditions. - var once sync.Once - ch := make(chan *types.Response, 1) - reqres.SetCallback(func(res *types.Response) { - once.Do(func() { - ch <- res - }) - }) - return <-ch -} - -//---------------------------------------- - func (cli *grpcClient) Flush(ctx context.Context) error { return nil } func (cli *grpcClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) { - req := types.ToRequestEcho(msg) - return cli.client.Echo(ctx, req.GetEcho(), grpc.WaitForReady(true)) + return cli.client.Echo(ctx, types.ToRequestEcho(msg).GetEcho(), grpc.WaitForReady(true)) } -func (cli *grpcClient) Info( - ctx context.Context, - params types.RequestInfo, -) (*types.ResponseInfo, error) { - req := types.ToRequestInfo(params) - return cli.client.Info(ctx, req.GetInfo(), grpc.WaitForReady(true)) +func (cli *grpcClient) Info(ctx context.Context, params types.RequestInfo) (*types.ResponseInfo, error) { + return cli.client.Info(ctx, types.ToRequestInfo(params).GetInfo(), grpc.WaitForReady(true)) } -func (cli *grpcClient) CheckTx( - ctx context.Context, - params types.RequestCheckTx, -) (*types.ResponseCheckTx, error) { - - reqres, err := cli.CheckTxAsync(ctx, params) - if err != nil { - return nil, err - } - return cli.finishSyncCall(reqres).GetCheckTx(), cli.Error() +func (cli *grpcClient) CheckTx(ctx context.Context, params types.RequestCheckTx) (*types.ResponseCheckTx, error) { + return cli.client.CheckTx(ctx, types.ToRequestCheckTx(params).GetCheckTx(), grpc.WaitForReady(true)) } -func (cli *grpcClient) Query( - ctx context.Context, - params types.RequestQuery, -) (*types.ResponseQuery, error) { - req := types.ToRequestQuery(params) - return cli.client.Query(ctx, req.GetQuery(), grpc.WaitForReady(true)) +func (cli *grpcClient) Query(ctx context.Context, params types.RequestQuery) (*types.ResponseQuery, error) { + return cli.client.Query(ctx, types.ToRequestQuery(params).GetQuery(), grpc.WaitForReady(true)) } func (cli *grpcClient) Commit(ctx context.Context) (*types.ResponseCommit, error) { - req := types.ToRequestCommit() - return cli.client.Commit(ctx, req.GetCommit(), grpc.WaitForReady(true)) + return cli.client.Commit(ctx, types.ToRequestCommit().GetCommit(), grpc.WaitForReady(true)) } -func (cli *grpcClient) InitChain( - ctx context.Context, - params types.RequestInitChain, -) (*types.ResponseInitChain, error) { - - req := types.ToRequestInitChain(params) - return cli.client.InitChain(ctx, req.GetInitChain(), grpc.WaitForReady(true)) +func (cli *grpcClient) InitChain(ctx context.Context, params types.RequestInitChain) (*types.ResponseInitChain, error) { + return cli.client.InitChain(ctx, types.ToRequestInitChain(params).GetInitChain(), grpc.WaitForReady(true)) } -func (cli *grpcClient) ListSnapshots( - ctx context.Context, - params types.RequestListSnapshots, -) (*types.ResponseListSnapshots, error) { - - req := types.ToRequestListSnapshots(params) - return cli.client.ListSnapshots(ctx, req.GetListSnapshots(), grpc.WaitForReady(true)) +func (cli *grpcClient) ListSnapshots(ctx context.Context, params types.RequestListSnapshots) (*types.ResponseListSnapshots, error) { + return cli.client.ListSnapshots(ctx, types.ToRequestListSnapshots(params).GetListSnapshots(), grpc.WaitForReady(true)) } -func (cli *grpcClient) OfferSnapshot( - ctx context.Context, - params types.RequestOfferSnapshot, -) (*types.ResponseOfferSnapshot, error) { - - req := types.ToRequestOfferSnapshot(params) - return cli.client.OfferSnapshot(ctx, req.GetOfferSnapshot(), grpc.WaitForReady(true)) +func (cli *grpcClient) OfferSnapshot(ctx context.Context, params types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) { + return cli.client.OfferSnapshot(ctx, types.ToRequestOfferSnapshot(params).GetOfferSnapshot(), grpc.WaitForReady(true)) } -func (cli *grpcClient) LoadSnapshotChunk( - ctx context.Context, - params types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) { - - req := types.ToRequestLoadSnapshotChunk(params) - return cli.client.LoadSnapshotChunk(ctx, req.GetLoadSnapshotChunk(), grpc.WaitForReady(true)) +func (cli *grpcClient) LoadSnapshotChunk(ctx context.Context, params types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) { + return cli.client.LoadSnapshotChunk(ctx, types.ToRequestLoadSnapshotChunk(params).GetLoadSnapshotChunk(), grpc.WaitForReady(true)) } -func (cli *grpcClient) ApplySnapshotChunk( - ctx context.Context, - params types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) { - - req := types.ToRequestApplySnapshotChunk(params) - return cli.client.ApplySnapshotChunk(ctx, req.GetApplySnapshotChunk(), grpc.WaitForReady(true)) +func (cli *grpcClient) ApplySnapshotChunk(ctx context.Context, params types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) { + return cli.client.ApplySnapshotChunk(ctx, types.ToRequestApplySnapshotChunk(params).GetApplySnapshotChunk(), grpc.WaitForReady(true)) } -func (cli *grpcClient) PrepareProposal( - ctx context.Context, - params types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) { - - req := types.ToRequestPrepareProposal(params) - return cli.client.PrepareProposal(ctx, req.GetPrepareProposal(), grpc.WaitForReady(true)) +func (cli *grpcClient) PrepareProposal(ctx context.Context, params types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) { + return cli.client.PrepareProposal(ctx, types.ToRequestPrepareProposal(params).GetPrepareProposal(), grpc.WaitForReady(true)) } -func (cli *grpcClient) ProcessProposal( - ctx context.Context, - params types.RequestProcessProposal) (*types.ResponseProcessProposal, error) { - - req := types.ToRequestProcessProposal(params) - return cli.client.ProcessProposal(ctx, req.GetProcessProposal(), grpc.WaitForReady(true)) +func (cli *grpcClient) ProcessProposal(ctx context.Context, params types.RequestProcessProposal) (*types.ResponseProcessProposal, error) { + return cli.client.ProcessProposal(ctx, types.ToRequestProcessProposal(params).GetProcessProposal(), grpc.WaitForReady(true)) } -func (cli *grpcClient) ExtendVote( - ctx context.Context, - params types.RequestExtendVote) (*types.ResponseExtendVote, error) { - - req := types.ToRequestExtendVote(params) - return cli.client.ExtendVote(ctx, req.GetExtendVote(), grpc.WaitForReady(true)) +func (cli *grpcClient) ExtendVote(ctx context.Context, params types.RequestExtendVote) (*types.ResponseExtendVote, error) { + return cli.client.ExtendVote(ctx, types.ToRequestExtendVote(params).GetExtendVote(), grpc.WaitForReady(true)) } -func (cli *grpcClient) VerifyVoteExtension( - ctx context.Context, - params types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) { - - req := types.ToRequestVerifyVoteExtension(params) - return cli.client.VerifyVoteExtension(ctx, req.GetVerifyVoteExtension(), grpc.WaitForReady(true)) +func (cli *grpcClient) VerifyVoteExtension(ctx context.Context, params types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) { + return cli.client.VerifyVoteExtension(ctx, types.ToRequestVerifyVoteExtension(params).GetVerifyVoteExtension(), grpc.WaitForReady(true)) } -func (cli *grpcClient) FinalizeBlock( - ctx context.Context, - params types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) { - - req := types.ToRequestFinalizeBlock(params) - return cli.client.FinalizeBlock(ctx, req.GetFinalizeBlock(), grpc.WaitForReady(true)) +func (cli *grpcClient) FinalizeBlock(ctx context.Context, params types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) { + return cli.client.FinalizeBlock(ctx, types.ToRequestFinalizeBlock(params).GetFinalizeBlock(), grpc.WaitForReady(true)) } diff --git a/abci/client/local_client.go b/abci/client/local_client.go index d4e8aecd7..a589fa02f 100644 --- a/abci/client/local_client.go +++ b/abci/client/local_client.go @@ -42,17 +42,6 @@ func (app *localClient) Error() error { return nil } -func (app *localClient) CheckTxAsync(ctx context.Context, req types.RequestCheckTx) (*ReqRes, error) { - app.mtx.Lock() - defer app.mtx.Unlock() - - res := app.Application.CheckTx(req) - return app.callback( - types.ToRequestCheckTx(req), - types.ToResponseCheckTx(res), - ), nil -} - //------------------------------------------------------- func (app *localClient) Flush(ctx context.Context) error { @@ -213,16 +202,3 @@ func (app *localClient) FinalizeBlock( res := app.Application.FinalizeBlock(req) return &res, nil } - -//------------------------------------------------------- - -func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes { - return newLocalReqRes(req, res) -} - -func newLocalReqRes(req *types.Request, res *types.Response) *ReqRes { - reqRes := NewReqRes(req) - reqRes.Response = res - reqRes.SetDone() - return reqRes -} diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index 8b199e4d9..ed1e24111 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -206,12 +206,6 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error { //---------------------------------------- -func (cli *socketClient) CheckTxAsync(ctx context.Context, req types.RequestCheckTx) (*ReqRes, error) { - return cli.queueRequestAsync(ctx, types.ToRequestCheckTx(req)) -} - -//---------------------------------------- - func (cli *socketClient) Flush(ctx context.Context) error { reqRes, err := cli.queueRequest(ctx, types.ToRequestFlush(), true) if err != nil { @@ -423,19 +417,6 @@ func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request, s return reqres, nil } -func (cli *socketClient) queueRequestAsync( - ctx context.Context, - req *types.Request, -) (*ReqRes, error) { - - reqres, err := cli.queueRequest(ctx, req, false) - if err != nil { - return nil, queueErr(err) - } - - return reqres, cli.Error() -} - func (cli *socketClient) queueRequestAndFlush( ctx context.Context, req *types.Request, diff --git a/internal/consensus/mempool_test.go b/internal/consensus/mempool_test.go index 6d0d0d81b..7f846161e 100644 --- a/internal/consensus/mempool_test.go +++ b/internal/consensus/mempool_test.go @@ -204,8 +204,8 @@ func TestMempoolRmBadTx(t *testing.T) { // Try to send the tx through the mempool. // CheckTx should not err, but the app should return a bad abci code // and the tx should get removed from the pool - err := assertMempool(t, cs.txNotifier).CheckTx(ctx, txBytes, func(r *abci.Response) { - if r.GetCheckTx().Code != code.CodeTypeBadNonce { + err := assertMempool(t, cs.txNotifier).CheckTx(ctx, txBytes, func(r *abci.ResponseCheckTx) { + if r.Code != code.CodeTypeBadNonce { t.Errorf("expected checktx to return bad nonce, got %v", r) return } diff --git a/internal/consensus/replay_stubs.go b/internal/consensus/replay_stubs.go index fc0ec312f..08eed5d69 100644 --- a/internal/consensus/replay_stubs.go +++ b/internal/consensus/replay_stubs.go @@ -22,7 +22,7 @@ var _ mempool.Mempool = emptyMempool{} func (emptyMempool) Lock() {} func (emptyMempool) Unlock() {} func (emptyMempool) Size() int { return 0 } -func (emptyMempool) CheckTx(_ context.Context, _ types.Tx, _ func(*abci.Response), _ mempool.TxInfo) error { +func (emptyMempool) CheckTx(context.Context, types.Tx, func(*abci.ResponseCheckTx), mempool.TxInfo) error { return nil } func (emptyMempool) RemoveTxByKey(txKey types.TxKey) error { return nil } diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index f60ead160..21429721d 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -5,7 +5,6 @@ import ( "context" "errors" "fmt" - "reflect" "sync" "sync/atomic" "time" @@ -205,11 +204,12 @@ func (txmp *TxMempool) TxsAvailable() <-chan struct{} { return txmp.txsAvailable } -// CheckTx executes the ABCI CheckTx method for a given transaction. It acquires -// a read-lock attempts to execute the application's CheckTx ABCI method via -// CheckTxAsync. We return an error if any of the following happen: +// CheckTx executes the ABCI CheckTx method for a given transaction. +// It acquires a read-lock and attempts to execute the application's +// CheckTx ABCI method synchronously. We return an error if any of +// the following happen: // -// - The CheckTxAsync execution fails. +// - The CheckTx execution fails. // - The transaction already exists in the cache and we've already received the // transaction from the peer. Otherwise, if it solely exists in the cache, we // return nil. @@ -228,7 +228,7 @@ func (txmp *TxMempool) TxsAvailable() <-chan struct{} { func (txmp *TxMempool) CheckTx( ctx context.Context, tx types.Tx, - cb func(*abci.Response), + cb func(*abci.ResponseCheckTx), txInfo TxInfo, ) error { txmp.mtx.RLock() @@ -261,31 +261,29 @@ func (txmp *TxMempool) CheckTx( return types.ErrTxInCache } - reqRes, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{Tx: tx}) + res, err := txmp.proxyAppConn.CheckTx(ctx, abci.RequestCheckTx{Tx: tx}) if err != nil { txmp.cache.Remove(tx) return err } - reqRes.SetCallback(func(res *abci.Response) { - if txmp.recheckCursor != nil { - panic("recheck cursor is non-nil in CheckTx callback") - } + if txmp.recheckCursor != nil { + return errors.New("recheck cursor is non-nil") + } - wtx := &WrappedTx{ - tx: tx, - hash: txHash, - timestamp: time.Now().UTC(), - height: txmp.height, - } + wtx := &WrappedTx{ + tx: tx, + hash: txHash, + timestamp: time.Now().UTC(), + height: txmp.height, + } - txmp.defaultTxCallback(reqRes.Request, res) - txmp.initTxCallback(wtx, res, txInfo) + txmp.defaultTxCallback(tx, res) + txmp.initTxCallback(wtx, res, txInfo) - if cb != nil { - cb(res) - } - }) + if cb != nil { + cb(res) + } return nil } @@ -491,25 +489,20 @@ func (txmp *TxMempool) Update( // // NOTE: // - An explicit lock is NOT required. -func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo TxInfo) { - checkTxRes, ok := res.Value.(*abci.Response_CheckTx) - if !ok { - return - } - +func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.ResponseCheckTx, txInfo TxInfo) { var err error if txmp.postCheck != nil { - err = txmp.postCheck(wtx.tx, checkTxRes.CheckTx) + err = txmp.postCheck(wtx.tx, res) } - if err != nil || checkTxRes.CheckTx.Code != abci.CodeTypeOK { + if err != nil || res.Code != abci.CodeTypeOK { // ignore bad transactions txmp.logger.Info( "rejected bad transaction", "priority", wtx.priority, "tx", fmt.Sprintf("%X", wtx.tx.Hash()), "peer_id", txInfo.SenderNodeID, - "code", checkTxRes.CheckTx.Code, + "code", res.Code, "post_check_err", err, ) @@ -519,13 +512,13 @@ func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo txmp.cache.Remove(wtx.tx) } if err != nil { - checkTxRes.CheckTx.MempoolError = err.Error() + res.MempoolError = err.Error() } return } - sender := checkTxRes.CheckTx.Sender - priority := checkTxRes.CheckTx.Priority + sender := res.Sender + priority := res.Priority if len(sender) > 0 { if wtx := txmp.txStore.GetTxBySender(sender); wtx != nil { @@ -577,7 +570,7 @@ func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo } } - wtx.gasWanted = checkTxRes.CheckTx.GasWanted + wtx.gasWanted = res.GasWanted wtx.priority = priority wtx.sender = sender wtx.peers = map[uint16]struct{}{ @@ -598,28 +591,20 @@ func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo txmp.notifyTxsAvailable() } -// defaultTxCallback is the CheckTx application callback used when a transaction -// is being re-checked (if re-checking is enabled). The caller must hold a mempool -// write-lock (via Lock()) and when executing Update(), if the mempool is non-empty -// and Recheck is enabled, then all remaining transactions will be rechecked via -// CheckTxAsync. The order transactions are rechecked must be the same as the -// order in which this callback is called. -func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response) { +// defaultTxCallback is the CheckTx application callback used when a +// transaction is being re-checked (if re-checking is enabled). The +// caller must hold a mempool write-lock (via Lock()) and when +// executing Update(), if the mempool is non-empty and Recheck is +// enabled, then all remaining transactions will be rechecked via +// CheckTx. The order transactions are rechecked must be the same as +// the order in which this callback is called. +func (txmp *TxMempool) defaultTxCallback(tx types.Tx, res *abci.ResponseCheckTx) { if txmp.recheckCursor == nil { return } txmp.metrics.RecheckTimes.Add(1) - checkTxRes, ok := res.Value.(*abci.Response_CheckTx) - if !ok { - txmp.logger.Error("received incorrect type in mempool callback", - "expected", reflect.TypeOf(&abci.Response_CheckTx{}).Name(), - "got", reflect.TypeOf(res.Value).Name(), - ) - return - } - tx := req.GetCheckTx().Tx wtx := txmp.recheckCursor.Value.(*WrappedTx) // Search through the remaining list of tx to recheck for a transaction that matches @@ -635,7 +620,7 @@ func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response) txmp.logger.Error( "re-CheckTx transaction mismatch", "got", wtx.tx.Hash(), - "expected", types.Tx(tx).Key(), + "expected", tx.Key(), ) if txmp.recheckCursor == txmp.recheckEnd { @@ -656,18 +641,18 @@ func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response) if !txmp.txStore.IsTxRemoved(wtx.hash) { var err error if txmp.postCheck != nil { - err = txmp.postCheck(tx, checkTxRes.CheckTx) + err = txmp.postCheck(tx, res) } - if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil { - wtx.priority = checkTxRes.CheckTx.Priority + if res.Code == abci.CodeTypeOK && err == nil { + wtx.priority = res.Priority } else { txmp.logger.Debug( "existing transaction no longer valid; failed re-CheckTx callback", "priority", wtx.priority, "tx", fmt.Sprintf("%X", wtx.tx.Hash()), "err", err, - "code", checkTxRes.CheckTx.Code, + "code", res.Code, ) if wtx.gossipEl != txmp.recheckCursor { @@ -697,7 +682,7 @@ func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response) } // updateReCheckTxs updates the recheck cursors using the gossipIndex. For -// each transaction, it executes CheckTxAsync. The global callback defined on +// each transaction, it executes CheckTx. The global callback defined on // the proxyAppConn will be executed for each transaction after CheckTx is // executed. // @@ -717,7 +702,7 @@ func (txmp *TxMempool) updateReCheckTxs(ctx context.Context) { // Only execute CheckTx if the transaction is not marked as removed which // could happen if the transaction was evicted. if !txmp.txStore.IsTxRemoved(wtx.hash) { - res, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{ + res, err := txmp.proxyAppConn.CheckTx(ctx, abci.RequestCheckTx{ Tx: wtx.tx, Type: abci.CheckTxType_Recheck, }) @@ -726,7 +711,7 @@ func (txmp *TxMempool) updateReCheckTxs(ctx context.Context) { txmp.logger.Error("failed to execute CheckTx during rechecking", "err", err) continue } - txmp.defaultTxCallback(res.Request, res.Response) + txmp.defaultTxCallback(wtx.tx, res) } } diff --git a/internal/mempool/mempool_test.go b/internal/mempool/mempool_test.go index 21e3743ed..e2cf12e07 100644 --- a/internal/mempool/mempool_test.go +++ b/internal/mempool/mempool_test.go @@ -565,14 +565,12 @@ func TestTxMempool_CheckTxPostCheckError(t *testing.T) { _, err := rng.Read(tx) require.NoError(t, err) - callback := func(res *abci.Response) { - checkTxRes, ok := res.Value.(*abci.Response_CheckTx) - require.True(t, ok) + callback := func(res *abci.ResponseCheckTx) { expectedErrString := "" if testCase.err != nil { expectedErrString = testCase.err.Error() } - require.Equal(t, expectedErrString, checkTxRes.CheckTx.MempoolError) + require.Equal(t, expectedErrString, res.MempoolError) } require.NoError(t, txmp.CheckTx(ctx, tx, callback, TxInfo{SenderID: 0})) }) diff --git a/internal/mempool/mock/mempool.go b/internal/mempool/mock/mempool.go index 2b32a7ce6..e8782c914 100644 --- a/internal/mempool/mock/mempool.go +++ b/internal/mempool/mock/mempool.go @@ -17,7 +17,7 @@ var _ Mempool = Mempool{} func (Mempool) Lock() {} func (Mempool) Unlock() {} func (Mempool) Size() int { return 0 } -func (Mempool) CheckTx(_ context.Context, _ types.Tx, _ func(*abci.Response), _ mempool.TxInfo) error { +func (Mempool) CheckTx(context.Context, types.Tx, func(*abci.ResponseCheckTx), mempool.TxInfo) error { return nil } func (Mempool) RemoveTxByKey(txKey types.TxKey) error { return nil } diff --git a/internal/mempool/types.go b/internal/mempool/types.go index 05d4ba3e3..d78517372 100644 --- a/internal/mempool/types.go +++ b/internal/mempool/types.go @@ -30,7 +30,7 @@ const ( type Mempool interface { // CheckTx executes a new transaction against the application to determine // its validity and whether it should be added to the mempool. - CheckTx(ctx context.Context, tx types.Tx, callback func(*abci.Response), txInfo TxInfo) error + CheckTx(ctx context.Context, tx types.Tx, callback func(*abci.ResponseCheckTx), txInfo TxInfo) error // RemoveTxByKey removes a transaction, identified by its key, // from the mempool. diff --git a/internal/proxy/app_conn.go b/internal/proxy/app_conn.go index bc67f554a..f30757f45 100644 --- a/internal/proxy/app_conn.go +++ b/internal/proxy/app_conn.go @@ -31,7 +31,6 @@ type AppConnConsensus interface { type AppConnMempool interface { Error() error - CheckTxAsync(context.Context, types.RequestCheckTx) (*abciclient.ReqRes, error) CheckTx(context.Context, types.RequestCheckTx) (*types.ResponseCheckTx, error) Flush(context.Context) error @@ -152,11 +151,6 @@ func (app *appConnMempool) Flush(ctx context.Context) error { return app.appConn.Flush(ctx) } -func (app *appConnMempool) CheckTxAsync(ctx context.Context, req types.RequestCheckTx) (*abciclient.ReqRes, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "async"))() - return app.appConn.CheckTxAsync(ctx, req) -} - func (app *appConnMempool) CheckTx(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))() return app.appConn.CheckTx(ctx, req) diff --git a/internal/rpc/core/mempool.go b/internal/rpc/core/mempool.go index 325d293cb..61d36e93a 100644 --- a/internal/rpc/core/mempool.go +++ b/internal/rpc/core/mempool.go @@ -34,19 +34,18 @@ func (env *Environment) BroadcastTxAsync(ctx context.Context, tx types.Tx) (*cor // DeliverTx result. // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync func (env *Environment) BroadcastTxSync(ctx context.Context, tx types.Tx) (*coretypes.ResultBroadcastTx, error) { - resCh := make(chan *abci.Response, 1) + resCh := make(chan *abci.ResponseCheckTx, 1) err := env.Mempool.CheckTx( ctx, tx, - func(res *abci.Response) { resCh <- res }, + func(res *abci.ResponseCheckTx) { resCh <- res }, mempool.TxInfo{}, ) if err != nil { return nil, err } - res := <-resCh - r := res.GetCheckTx() + r := <-resCh return &coretypes.ResultBroadcastTx{ Code: r.Code, @@ -61,18 +60,18 @@ func (env *Environment) BroadcastTxSync(ctx context.Context, tx types.Tx) (*core // BroadcastTxCommit returns with the responses from CheckTx and DeliverTx. // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_commit func (env *Environment) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*coretypes.ResultBroadcastTxCommit, error) { - resCh := make(chan *abci.Response, 1) + resCh := make(chan *abci.ResponseCheckTx, 1) err := env.Mempool.CheckTx( ctx, tx, - func(res *abci.Response) { resCh <- res }, + func(res *abci.ResponseCheckTx) { resCh <- res }, mempool.TxInfo{}, ) if err != nil { return nil, err } - r := (<-resCh).GetCheckTx() + r := <-resCh if r.Code != abci.CodeTypeOK { return &coretypes.ResultBroadcastTxCommit{ CheckTx: *r, diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index 883716c19..fbdf257ed 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -594,7 +594,7 @@ func TestClientMethodCallsAdvanced(t *testing.T) { _, _, tx := MakeTxKV() txs[i] = tx - err := pool.CheckTx(ctx, tx, func(_ *abci.Response) { ch <- nil }, mempool.TxInfo{}) + err := pool.CheckTx(ctx, tx, func(_ *abci.ResponseCheckTx) { ch <- nil }, mempool.TxInfo{}) require.NoError(t, err) } @@ -636,7 +636,7 @@ func TestClientMethodCallsAdvanced(t *testing.T) { _, _, tx := MakeTxKV() - err := pool.CheckTx(ctx, tx, func(_ *abci.Response) { close(ch) }, mempool.TxInfo{}) + err := pool.CheckTx(ctx, tx, func(_ *abci.ResponseCheckTx) { close(ch) }, mempool.TxInfo{}) require.NoError(t, err) // wait for tx to arrive in mempoool.