@ -17,6 +17,7 @@ import (
"github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/clist"
cmn "github.com/tendermint/tendermint/libs/common"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/types"
)
)
@ -281,7 +282,7 @@ func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), t
}
}
reqRes := mem . proxyAppConn . CheckTxAsync ( abci . RequestCheckTx { Tx : tx } )
reqRes := mem . proxyAppConn . CheckTxAsync ( abci . RequestCheckTx { Tx : tx } )
reqRes . SetCallback ( mem . reqResCb ( tx , txInfo . SenderID , cb ) )
reqRes . SetCallback ( mem . reqResCb ( tx , txInfo . SenderID , txInfo . SenderP2PID , cb ) )
return nil
return nil
}
}
@ -314,14 +315,19 @@ func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) {
// when all other response processing is complete.
// when all other response processing is complete.
//
//
// Used in CheckTxWithInfo to record PeerID who sent us the tx.
// Used in CheckTxWithInfo to record PeerID who sent us the tx.
func ( mem * CListMempool ) reqResCb ( tx [ ] byte , peerID uint16 , externalCb func ( * abci . Response ) ) func ( res * abci . Response ) {
func ( mem * CListMempool ) reqResCb (
tx [ ] byte ,
peerID uint16 ,
peerP2PID p2p . ID ,
externalCb func ( * abci . Response ) ,
) func ( res * abci . Response ) {
return func ( res * abci . Response ) {
return func ( res * abci . Response ) {
if mem . recheckCursor != nil {
if mem . recheckCursor != nil {
// this should never happen
// this should never happen
panic ( "recheck cursor is not nil in reqResCb" )
panic ( "recheck cursor is not nil in reqResCb" )
}
}
mem . resCbFirstTime ( tx , peerID , res )
mem . resCbFirstTime ( tx , peerID , peerP2PID , res )
// update metrics
// update metrics
mem . metrics . Size . Set ( float64 ( mem . Size ( ) ) )
mem . metrics . Size . Set ( float64 ( mem . Size ( ) ) )
@ -360,7 +366,12 @@ func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromC
//
//
// The case where the app checks the tx for the second and subsequent times is
// The case where the app checks the tx for the second and subsequent times is
// handled by the resCbRecheck callback.
// handled by the resCbRecheck callback.
func ( mem * CListMempool ) resCbFirstTime ( tx [ ] byte , peerID uint16 , res * abci . Response ) {
func ( mem * CListMempool ) resCbFirstTime (
tx [ ] byte ,
peerID uint16 ,
peerP2PID p2p . ID ,
res * abci . Response ,
) {
switch r := res . Value . ( type ) {
switch r := res . Value . ( type ) {
case * abci . Response_CheckTx :
case * abci . Response_CheckTx :
var postCheckErr error
var postCheckErr error
@ -384,7 +395,8 @@ func (mem *CListMempool) resCbFirstTime(tx []byte, peerID uint16, res *abci.Resp
mem . notifyTxsAvailable ( )
mem . notifyTxsAvailable ( )
} else {
} else {
// ignore bad transaction
// ignore bad transaction
mem . logger . Info ( "Rejected bad transaction" , "tx" , txID ( tx ) , "res" , r , "err" , postCheckErr )
mem . logger . Info ( "Rejected bad transaction" ,
"tx" , txID ( tx ) , "peerID" , peerP2PID , "res" , r , "err" , postCheckErr )
mem . metrics . FailedTxs . Add ( 1 )
mem . metrics . FailedTxs . Add ( 1 )
// remove from cache (it might be good later)
// remove from cache (it might be good later)
mem . cache . Remove ( tx )
mem . cache . Remove ( tx )