Browse Source

mempool: moved TxInfo parameter into Mempool.CheckTx() (#4083)

* mempool: moved TxInfo parameter into Mempool.CheckTx().

* Updated CHANGELOG_PENDING.md

* Added PR issue to CHANGELOG_PENDING

Fixes #3590
pull/4107/head
Erik Grinaker 5 years ago
committed by Anton Kaliaev
parent
commit
745846bd96
14 changed files with 47 additions and 56 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +2
    -2
      consensus/mempool_test.go
  3. +2
    -2
      consensus/reactor_test.go
  4. +8
    -7
      consensus/replay_test.go
  5. +2
    -2
      mempool/bench_test.go
  6. +2
    -2
      mempool/cache_test.go
  7. +2
    -6
      mempool/clist_mempool.go
  8. +16
    -16
      mempool/clist_mempool_test.go
  9. +1
    -7
      mempool/mempool.go
  10. +1
    -1
      mempool/reactor.go
  11. +1
    -5
      mock/mempool.go
  12. +1
    -1
      node/node_test.go
  13. +3
    -2
      rpc/client/rpc_test.go
  14. +5
    -3
      rpc/core/mempool.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -21,6 +21,7 @@ program](https://hackerone.com/tendermint).
### IMPROVEMENTS:
- [mempool] [\#4083](https://github.com/tendermint/tendermint/pull/4083) Added TxInfo parameter to CheckTx(), and removed CheckTxWithInfo() (@erikgrinaker)
- [mempool] [\#4057](https://github.com/tendermint/tendermint/issues/4057) Include peer ID when logging rejected txns (@erikgrinaker)
- [tools] [\#4023](https://github.com/tendermint/tendermint/issues/4023) Improved `tm-monitor` formatting of start time and avg tx throughput (@erikgrinaker)
- [libs/pubsub] [\#4070](https://github.com/tendermint/tendermint/pull/4070) No longer panic in `Query#(Matches|Conditions)` preferring to return an error instead.


+ 2
- 2
consensus/mempool_test.go View File

@ -99,7 +99,7 @@ func deliverTxsRange(cs *ConsensusState, start, end int) {
for i := start; i < end; i++ {
txBytes := make([]byte, 8)
binary.BigEndian.PutUint64(txBytes, uint64(i))
err := assertMempool(cs.txNotifier).CheckTx(txBytes, nil)
err := assertMempool(cs.txNotifier).CheckTx(txBytes, nil, mempl.TxInfo{})
if err != nil {
panic(fmt.Sprintf("Error after CheckTx: %v", err))
}
@ -159,7 +159,7 @@ func TestMempoolRmBadTx(t *testing.T) {
return
}
checkTxRespCh <- struct{}{}
})
}, mempl.TxInfo{})
if err != nil {
t.Errorf("Error after CheckTx: %v", err)
return


+ 2
- 2
consensus/reactor_test.go View File

@ -237,7 +237,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
// send a tx
if err := assertMempool(css[3].txNotifier).CheckTx([]byte{1, 2, 3}, nil); err != nil {
if err := assertMempool(css[3].txNotifier).CheckTx([]byte{1, 2, 3}, nil, mempl.TxInfo{}); err != nil {
t.Error(err)
}
@ -532,7 +532,7 @@ func waitForAndValidateBlock(
err := validateBlock(newBlock, activeVals)
assert.Nil(t, err)
for _, tx := range txs {
err := assertMempool(css[j].txNotifier).CheckTx(tx, nil)
err := assertMempool(css[j].txNotifier).CheckTx(tx, nil, mempl.TxInfo{})
assert.Nil(t, err)
}
}, css)


+ 8
- 7
consensus/replay_test.go View File

@ -23,6 +23,7 @@ import (
"github.com/tendermint/tendermint/crypto"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/mock"
"github.com/tendermint/tendermint/privval"
"github.com/tendermint/tendermint/proxy"
@ -104,7 +105,7 @@ func sendTxs(ctx context.Context, cs *ConsensusState) {
return
default:
tx := []byte{byte(i)}
assertMempool(cs.txNotifier).CheckTx(tx, nil)
assertMempool(cs.txNotifier).CheckTx(tx, nil, mempl.TxInfo{})
i++
}
}
@ -344,7 +345,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
newValidatorPubKey1 := css[nVals].privValidator.GetPubKey()
valPubKey1ABCI := types.TM2PB.PubKey(newValidatorPubKey1)
newValidatorTx1 := kvstore.MakeValSetChangeTx(valPubKey1ABCI, testMinPower)
err := assertMempool(css[0].txNotifier).CheckTx(newValidatorTx1, nil)
err := assertMempool(css[0].txNotifier).CheckTx(newValidatorTx1, nil, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ := css[0].createProposalBlock() //changeProposer(t, cs1, vs2)
propBlockParts := propBlock.MakePartSet(partSize)
@ -369,7 +370,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
updateValidatorPubKey1 := css[nVals].privValidator.GetPubKey()
updatePubKey1ABCI := types.TM2PB.PubKey(updateValidatorPubKey1)
updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25)
err = assertMempool(css[0].txNotifier).CheckTx(updateValidatorTx1, nil)
err = assertMempool(css[0].txNotifier).CheckTx(updateValidatorTx1, nil, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
@ -394,12 +395,12 @@ func TestSimulateValidatorsChange(t *testing.T) {
newValidatorPubKey2 := css[nVals+1].privValidator.GetPubKey()
newVal2ABCI := types.TM2PB.PubKey(newValidatorPubKey2)
newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx2, nil)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx2, nil, mempl.TxInfo{})
assert.Nil(t, err)
newValidatorPubKey3 := css[nVals+2].privValidator.GetPubKey()
newVal3ABCI := types.TM2PB.PubKey(newValidatorPubKey3)
newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx3, nil)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx3, nil, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
@ -427,7 +428,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
ensureNewProposal(proposalCh, height, round)
removeValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, 0)
err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx2, nil)
err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx2, nil, mempl.TxInfo{})
assert.Nil(t, err)
rs = css[0].GetRoundState()
@ -457,7 +458,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
height++
incrementHeight(vss...)
removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0)
err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx3, nil)
err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx3, nil, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)


+ 2
- 2
mempool/bench_test.go View File

@ -18,7 +18,7 @@ func BenchmarkReap(b *testing.B) {
for i := 0; i < size; i++ {
tx := make([]byte, 8)
binary.BigEndian.PutUint64(tx, uint64(i))
mempool.CheckTx(tx, nil)
mempool.CheckTx(tx, nil, TxInfo{})
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
@ -35,7 +35,7 @@ func BenchmarkCheckTx(b *testing.B) {
for i := 0; i < b.N; i++ {
tx := make([]byte, 8)
binary.BigEndian.PutUint64(tx, uint64(i))
mempool.CheckTx(tx, nil)
mempool.CheckTx(tx, nil, TxInfo{})
}
}


+ 2
- 2
mempool/cache_test.go View File

@ -58,7 +58,7 @@ func TestCacheAfterUpdate(t *testing.T) {
for tcIndex, tc := range tests {
for i := 0; i < tc.numTxsToCreate; i++ {
tx := types.Tx{byte(i)}
err := mempool.CheckTx(tx, nil)
err := mempool.CheckTx(tx, nil, TxInfo{})
require.NoError(t, err)
}
@ -71,7 +71,7 @@ func TestCacheAfterUpdate(t *testing.T) {
for _, v := range tc.reAddIndices {
tx := types.Tx{byte(v)}
_ = mempool.CheckTx(tx, nil)
_ = mempool.CheckTx(tx, nil, TxInfo{})
}
cache := mempool.cache.(*mapTxCache)


+ 2
- 6
mempool/clist_mempool.go View File

@ -209,11 +209,7 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} {
// cb: A callback from the CheckTx command.
// It gets called from another goroutine.
// CONTRACT: Either cb will get called, or err returned.
func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
return mem.CheckTxWithInfo(tx, cb, TxInfo{SenderID: UnknownPeerID})
}
func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) (err error) {
func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) (err error) {
mem.proxyMtx.Lock()
// use defer to unlock mutex because application (*local client*) might panic
defer mem.proxyMtx.Unlock()
@ -314,7 +310,7 @@ func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) {
// External callers of CheckTx, like the RPC, can also pass an externalCb through here that is called
// when all other response processing is complete.
//
// Used in CheckTxWithInfo to record PeerID who sent us the tx.
// Used in CheckTx to record PeerID who sent us the tx.
func (mem *CListMempool) reqResCb(
tx []byte,
peerID uint16,


+ 16
- 16
mempool/clist_mempool_test.go View File

@ -76,7 +76,7 @@ func checkTxs(t *testing.T, mempool Mempool, count int, peerID uint16) types.Txs
if err != nil {
t.Error(err)
}
if err := mempool.CheckTxWithInfo(txBytes, nil, txInfo); err != nil {
if err := mempool.CheckTx(txBytes, nil, txInfo); err != nil {
// Skip invalid txs.
// TestMempoolFilters will fail otherwise. It asserts a number of txs
// returned.
@ -186,7 +186,7 @@ func TestMempoolUpdate(t *testing.T) {
// 1. Adds valid txs to the cache
{
mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
err := mempool.CheckTx([]byte{0x01}, nil)
err := mempool.CheckTx([]byte{0x01}, nil, TxInfo{})
if assert.Error(t, err) {
assert.Equal(t, ErrTxInCache, err)
}
@ -194,7 +194,7 @@ func TestMempoolUpdate(t *testing.T) {
// 2. Removes valid txs from the mempool
{
err := mempool.CheckTx([]byte{0x02}, nil)
err := mempool.CheckTx([]byte{0x02}, nil, TxInfo{})
require.NoError(t, err)
mempool.Update(1, []types.Tx{[]byte{0x02}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
assert.Zero(t, mempool.Size())
@ -202,12 +202,12 @@ func TestMempoolUpdate(t *testing.T) {
// 3. Removes invalid transactions from the cache and the mempool (if present)
{
err := mempool.CheckTx([]byte{0x03}, nil)
err := mempool.CheckTx([]byte{0x03}, nil, TxInfo{})
require.NoError(t, err)
mempool.Update(1, []types.Tx{[]byte{0x03}}, abciResponses(1, 1), nil, nil)
assert.Zero(t, mempool.Size())
err = mempool.CheckTx([]byte{0x03}, nil)
err = mempool.CheckTx([]byte{0x03}, nil, TxInfo{})
assert.NoError(t, err)
}
}
@ -277,7 +277,7 @@ func TestSerialReap(t *testing.T) {
// This will succeed
txBytes := make([]byte, 8)
binary.BigEndian.PutUint64(txBytes, uint64(i))
err := mempool.CheckTx(txBytes, nil)
err := mempool.CheckTx(txBytes, nil, TxInfo{})
_, cached := cacheMap[string(txBytes)]
if cached {
require.NotNil(t, err, "expected error for cached tx")
@ -287,7 +287,7 @@ func TestSerialReap(t *testing.T) {
cacheMap[string(txBytes)] = struct{}{}
// Duplicates are cached and should return error
err = mempool.CheckTx(txBytes, nil)
err = mempool.CheckTx(txBytes, nil, TxInfo{})
require.NotNil(t, err, "Expected error after CheckTx on duplicated tx")
}
}
@ -393,7 +393,7 @@ func TestMempoolCloseWAL(t *testing.T) {
require.Equal(t, 1, len(m2), "expecting the wal match in")
// 5. Write some contents to the WAL
mempool.CheckTx(types.Tx([]byte("foo")), nil)
mempool.CheckTx(types.Tx([]byte("foo")), nil, TxInfo{})
walFilepath := mempool.wal.Path
sum1 := checksumFile(walFilepath, t)
@ -403,7 +403,7 @@ func TestMempoolCloseWAL(t *testing.T) {
// 7. Invoke CloseWAL() and ensure it discards the
// WAL thus any other write won't go through.
mempool.CloseWAL()
mempool.CheckTx(types.Tx([]byte("bar")), nil)
mempool.CheckTx(types.Tx([]byte("bar")), nil, TxInfo{})
sum2 := checksumFile(walFilepath, t)
require.Equal(t, sum1, sum2, "expected no change to the WAL after invoking CloseWAL() since it was discarded")
@ -456,7 +456,7 @@ func TestMempoolMaxMsgSize(t *testing.T) {
caseString := fmt.Sprintf("case %d, len %d", i, testCase.len)
tx := cmn.RandBytes(testCase.len)
err := mempl.CheckTx(tx, nil)
err := mempl.CheckTx(tx, nil, TxInfo{})
msg := &TxMessage{tx}
encoded := cdc.MustMarshalBinaryBare(msg)
require.Equal(t, len(encoded), txMessageSize(tx), caseString)
@ -483,7 +483,7 @@ func TestMempoolTxsBytes(t *testing.T) {
assert.EqualValues(t, 0, mempool.TxsBytes())
// 2. len(tx) after CheckTx
err := mempool.CheckTx([]byte{0x01}, nil)
err := mempool.CheckTx([]byte{0x01}, nil, TxInfo{})
require.NoError(t, err)
assert.EqualValues(t, 1, mempool.TxsBytes())
@ -492,7 +492,7 @@ func TestMempoolTxsBytes(t *testing.T) {
assert.EqualValues(t, 0, mempool.TxsBytes())
// 4. zero after Flush
err = mempool.CheckTx([]byte{0x02, 0x03}, nil)
err = mempool.CheckTx([]byte{0x02, 0x03}, nil, TxInfo{})
require.NoError(t, err)
assert.EqualValues(t, 2, mempool.TxsBytes())
@ -500,9 +500,9 @@ func TestMempoolTxsBytes(t *testing.T) {
assert.EqualValues(t, 0, mempool.TxsBytes())
// 5. ErrMempoolIsFull is returned when/if MaxTxsBytes limit is reached.
err = mempool.CheckTx([]byte{0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04}, nil)
err = mempool.CheckTx([]byte{0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04}, nil, TxInfo{})
require.NoError(t, err)
err = mempool.CheckTx([]byte{0x05}, nil)
err = mempool.CheckTx([]byte{0x05}, nil, TxInfo{})
if assert.Error(t, err) {
assert.IsType(t, ErrMempoolIsFull{}, err)
}
@ -516,7 +516,7 @@ func TestMempoolTxsBytes(t *testing.T) {
txBytes := make([]byte, 8)
binary.BigEndian.PutUint64(txBytes, uint64(0))
err = mempool.CheckTx(txBytes, nil)
err = mempool.CheckTx(txBytes, nil, TxInfo{})
require.NoError(t, err)
assert.EqualValues(t, 8, mempool.TxsBytes())
@ -567,7 +567,7 @@ func TestMempoolRemoteAppConcurrency(t *testing.T) {
tx := txs[txNum]
// this will err with ErrTxInCache many times ...
mempool.CheckTxWithInfo(tx, nil, TxInfo{SenderID: uint16(peerID)})
mempool.CheckTx(tx, nil, TxInfo{SenderID: uint16(peerID)})
}
err := mempool.FlushAppConn()
require.NoError(t, err)


+ 1
- 7
mempool/mempool.go View File

@ -15,13 +15,7 @@ import (
type Mempool interface {
// CheckTx executes a new transaction against the application to determine
// its validity and whether it should be added to the mempool.
CheckTx(tx types.Tx, callback func(*abci.Response)) error
// CheckTxWithInfo performs the same operation as CheckTx, but with extra
// meta data about the tx.
// Currently this metadata is the peer who sent it, used to prevent the tx
// from being gossiped back to them.
CheckTxWithInfo(tx types.Tx, callback func(*abci.Response), txInfo TxInfo) error
CheckTx(tx types.Tx, callback func(*abci.Response), txInfo TxInfo) error
// ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes
// bytes total with the condition that the total gasWanted must be less than


+ 1
- 1
mempool/reactor.go View File

@ -169,7 +169,7 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
if src != nil {
txInfo.SenderP2PID = src.ID()
}
err := memR.mempool.CheckTxWithInfo(msg.Tx, nil, txInfo)
err := memR.mempool.CheckTx(msg.Tx, nil, txInfo)
if err != nil {
memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err)
}


+ 1
- 5
mock/mempool.go View File

@ -15,11 +15,7 @@ var _ mempl.Mempool = Mempool{}
func (Mempool) Lock() {}
func (Mempool) Unlock() {}
func (Mempool) Size() int { return 0 }
func (Mempool) CheckTx(_ types.Tx, _ func(*abci.Response)) error {
return nil
}
func (Mempool) CheckTxWithInfo(_ types.Tx, _ func(*abci.Response),
_ mempl.TxInfo) error {
func (Mempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error {
return nil
}
func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }


+ 1
- 1
node/node_test.go View File

@ -267,7 +267,7 @@ func TestCreateProposalBlock(t *testing.T) {
txLength := 1000
for i := 0; i < maxBytes/txLength; i++ {
tx := cmn.RandBytes(txLength)
err := mempool.CheckTx(tx, nil)
err := mempool.CheckTx(tx, nil, mempl.TxInfo{})
assert.NoError(t, err)
}


+ 3
- 2
rpc/client/rpc_test.go View File

@ -17,6 +17,7 @@ import (
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/crypto/tmhash"
cmn "github.com/tendermint/tendermint/libs/common"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/privval"
"github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
@ -312,7 +313,7 @@ func TestUnconfirmedTxs(t *testing.T) {
_, _, tx := MakeTxKV()
mempool := node.Mempool()
_ = mempool.CheckTx(tx, nil)
_ = mempool.CheckTx(tx, nil, mempl.TxInfo{})
for i, c := range GetClients() {
mc, ok := c.(client.MempoolClient)
@ -333,7 +334,7 @@ func TestNumUnconfirmedTxs(t *testing.T) {
_, _, tx := MakeTxKV()
mempool := node.Mempool()
_ = mempool.CheckTx(tx, nil)
_ = mempool.CheckTx(tx, nil, mempl.TxInfo{})
mempoolSize := mempool.Size()
for i, c := range GetClients() {


+ 5
- 3
rpc/core/mempool.go View File

@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"
abci "github.com/tendermint/tendermint/abci/types"
mempl "github.com/tendermint/tendermint/mempool"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
"github.com/tendermint/tendermint/types"
@ -73,7 +74,8 @@ import (
// |-----------+------+---------+----------+-----------------|
// | tx | Tx | nil | true | The transaction |
func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
err := mempool.CheckTx(tx, nil)
err := mempool.CheckTx(tx, nil, mempl.TxInfo{})
if err != nil {
return nil, err
}
@ -137,7 +139,7 @@ func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcas
resCh := make(chan *abci.Response, 1)
err := mempool.CheckTx(tx, func(res *abci.Response) {
resCh <- res
})
}, mempl.TxInfo{})
if err != nil {
return nil, err
}
@ -237,7 +239,7 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc
checkTxResCh := make(chan *abci.Response, 1)
err = mempool.CheckTx(tx, func(res *abci.Response) {
checkTxResCh <- res
})
}, mempl.TxInfo{})
if err != nil {
logger.Error("Error on broadcastTxCommit", "err", err)
return nil, fmt.Errorf("Error on broadcastTxCommit: %v", err)


Loading…
Cancel
Save