@ -5,6 +5,7 @@ import (
"context"
"context"
"errors"
"errors"
"fmt"
"fmt"
"reflect"
"sync/atomic"
"sync/atomic"
"time"
"time"
@ -639,58 +640,88 @@ func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response)
txmp . metrics . RecheckTimes . Add ( 1 )
txmp . metrics . RecheckTimes . Add ( 1 )
checkTxRes , ok := res . Value . ( * abci . Response_CheckTx )
checkTxRes , ok := res . Value . ( * abci . Response_CheckTx )
if ok {
tx := req . GetCheckTx ( ) . Tx
wtx := txmp . recheckCursor . Value . ( * WrappedTx )
if ! bytes . Equal ( tx , wtx . tx ) {
panic ( fmt . Sprintf ( "re-CheckTx transaction mismatch; got: %X, expected: %X" , wtx . tx . Hash ( ) , types . Tx ( tx ) . Key ( ) ) )
if ! ok {
txmp . logger . Error ( "received incorrect type in mempool callback" ,
"expected" , reflect . TypeOf ( & abci . Response_CheckTx { } ) . Name ( ) ,
"got" , reflect . TypeOf ( res . Value ) . Name ( ) ,
)
return
}
tx := req . GetCheckTx ( ) . Tx
wtx := txmp . recheckCursor . Value . ( * WrappedTx )
// Search through the remaining list of tx to recheck for a transaction that matches
// the one we received from the ABCI application.
for {
if bytes . Equal ( tx , wtx . tx ) {
// We've found a tx in the recheck list that matches the tx that we
// received from the ABCI application.
// Break, and use this transaction for further checks.
break
}
}
// Only evaluate transactions that have not been removed. This can happen
// if an existing transaction is evicted during CheckTx and while this
// callback is being executed for the same evicted transaction.
if ! txmp . txStore . IsTxRemoved ( wtx . hash ) {
var err error
if txmp . postCheck != nil {
err = txmp . postCheck ( tx , checkTxRes . CheckTx )
}
if checkTxRes . CheckTx . Code == abci . CodeTypeOK && err == nil {
wtx . priority = checkTxRes . CheckTx . Priority
} else {
txmp . logger . Debug (
"existing transaction no longer valid; failed re-CheckTx callback" ,
"priority" , wtx . priority ,
"tx" , fmt . Sprintf ( "%X" , wtx . tx . Hash ( ) ) ,
"err" , err ,
"code" , checkTxRes . CheckTx . Code ,
)
if wtx . gossipEl != txmp . recheckCursor {
panic ( "corrupted reCheckTx cursor" )
}
txmp . removeTx ( wtx , ! txmp . config . KeepInvalidTxsInCache )
}
}
txmp . logger . Error (
"re-CheckTx transaction mismatch" ,
"got" , wtx . tx . Hash ( ) ,
"expected" , types . Tx ( tx ) . Key ( ) ,
)
// move reCheckTx cursor to next element
if txmp . recheckCursor == txmp . recheckEnd {
if txmp . recheckCursor == txmp . recheckEnd {
// we reached the end of the recheckTx list without finding a tx
// matching the one we received from the ABCI application.
// Return without processing any tx.
txmp . recheckCursor = nil
txmp . recheckCursor = nil
} else {
txmp . recheckCursor = txmp . recheckCursor . Next ( )
return
}
}
if txmp . recheckCursor == nil {
txmp . logger . Debug ( "finished rechecking transactions" )
txmp . recheckCursor = txmp . recheckCursor . Next ( )
wtx = txmp . recheckCursor . Value . ( * WrappedTx )
}
// Only evaluate transactions that have not been removed. This can happen
// if an existing transaction is evicted during CheckTx and while this
// callback is being executed for the same evicted transaction.
if ! txmp . txStore . IsTxRemoved ( wtx . hash ) {
var err error
if txmp . postCheck != nil {
err = txmp . postCheck ( tx , checkTxRes . CheckTx )
}
if txmp . Size ( ) > 0 {
txmp . notifyTxsAvailable ( )
if checkTxRes . CheckTx . Code == abci . CodeTypeOK && err == nil {
wtx . priority = checkTxRes . CheckTx . Priority
} else {
txmp . logger . Debug (
"existing transaction no longer valid; failed re-CheckTx callback" ,
"priority" , wtx . priority ,
"tx" , fmt . Sprintf ( "%X" , wtx . tx . Hash ( ) ) ,
"err" , err ,
"code" , checkTxRes . CheckTx . Code ,
)
if wtx . gossipEl != txmp . recheckCursor {
panic ( "corrupted reCheckTx cursor" )
}
}
txmp . removeTx ( wtx , ! txmp . config . KeepInvalidTxsInCache )
}
}
}
txmp . metrics . Size . Set ( float64 ( txmp . Size ( ) ) )
// move reCheckTx cursor to next element
if txmp . recheckCursor == txmp . recheckEnd {
txmp . recheckCursor = nil
} else {
txmp . recheckCursor = txmp . recheckCursor . Next ( )
}
}
if txmp . recheckCursor == nil {
txmp . logger . Debug ( "finished rechecking transactions" )
if txmp . Size ( ) > 0 {
txmp . notifyTxsAvailable ( )
}
}
txmp . metrics . Size . Set ( float64 ( txmp . Size ( ) ) )
}
}
// updateReCheckTxs updates the recheck cursors by using the gossipIndex. For
// updateReCheckTxs updates the recheck cursors by using the gossipIndex. For