diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index e33081dd4..51b512f28 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -21,6 +21,7 @@ program](https://hackerone.com/tendermint). ### IMPROVEMENTS: +- [mempool] [\#4083](https://github.com/tendermint/tendermint/pull/4083) Added TxInfo parameter to CheckTx(), and removed CheckTxWithInfo() (@erikgrinaker) - [mempool] [\#4057](https://github.com/tendermint/tendermint/issues/4057) Include peer ID when logging rejected txns (@erikgrinaker) - [tools] [\#4023](https://github.com/tendermint/tendermint/issues/4023) Improved `tm-monitor` formatting of start time and avg tx throughput (@erikgrinaker) - [libs/pubsub] [\#4070](https://github.com/tendermint/tendermint/pull/4070) No longer panic in `Query#(Matches|Conditions)` preferring to return an error instead. diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index c1d4f69a7..30c522a75 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -99,7 +99,7 @@ func deliverTxsRange(cs *ConsensusState, start, end int) { for i := start; i < end; i++ { txBytes := make([]byte, 8) binary.BigEndian.PutUint64(txBytes, uint64(i)) - err := assertMempool(cs.txNotifier).CheckTx(txBytes, nil) + err := assertMempool(cs.txNotifier).CheckTx(txBytes, nil, mempl.TxInfo{}) if err != nil { panic(fmt.Sprintf("Error after CheckTx: %v", err)) } @@ -159,7 +159,7 @@ func TestMempoolRmBadTx(t *testing.T) { return } checkTxRespCh <- struct{}{} - }) + }, mempl.TxInfo{}) if err != nil { t.Errorf("Error after CheckTx: %v", err) return diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 94190837d..b9ff6ea78 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -237,7 +237,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) { defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) // send a tx - if err := assertMempool(css[3].txNotifier).CheckTx([]byte{1, 2, 3}, nil); err != nil { + if err := assertMempool(css[3].txNotifier).CheckTx([]byte{1, 2, 3}, nil, mempl.TxInfo{}); err != nil { t.Error(err) } @@ -532,7 +532,7 @@ func waitForAndValidateBlock( err := validateBlock(newBlock, activeVals) assert.Nil(t, err) for _, tx := range txs { - err := assertMempool(css[j].txNotifier).CheckTx(tx, nil) + err := assertMempool(css[j].txNotifier).CheckTx(tx, nil, mempl.TxInfo{}) assert.Nil(t, err) } }, css) diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 36aba9e72..b3ebf7340 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -23,6 +23,7 @@ import ( "github.com/tendermint/tendermint/crypto" cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" + mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/mock" "github.com/tendermint/tendermint/privval" "github.com/tendermint/tendermint/proxy" @@ -104,7 +105,7 @@ func sendTxs(ctx context.Context, cs *ConsensusState) { return default: tx := []byte{byte(i)} - assertMempool(cs.txNotifier).CheckTx(tx, nil) + assertMempool(cs.txNotifier).CheckTx(tx, nil, mempl.TxInfo{}) i++ } } @@ -344,7 +345,7 @@ func TestSimulateValidatorsChange(t *testing.T) { newValidatorPubKey1 := css[nVals].privValidator.GetPubKey() valPubKey1ABCI := types.TM2PB.PubKey(newValidatorPubKey1) newValidatorTx1 := kvstore.MakeValSetChangeTx(valPubKey1ABCI, testMinPower) - err := assertMempool(css[0].txNotifier).CheckTx(newValidatorTx1, nil) + err := assertMempool(css[0].txNotifier).CheckTx(newValidatorTx1, nil, mempl.TxInfo{}) assert.Nil(t, err) propBlock, _ := css[0].createProposalBlock() //changeProposer(t, cs1, vs2) propBlockParts := propBlock.MakePartSet(partSize) @@ -369,7 +370,7 @@ func TestSimulateValidatorsChange(t *testing.T) { updateValidatorPubKey1 := css[nVals].privValidator.GetPubKey() updatePubKey1ABCI := types.TM2PB.PubKey(updateValidatorPubKey1) updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25) - err = assertMempool(css[0].txNotifier).CheckTx(updateValidatorTx1, nil) + err = assertMempool(css[0].txNotifier).CheckTx(updateValidatorTx1, nil, mempl.TxInfo{}) assert.Nil(t, err) propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2) propBlockParts = propBlock.MakePartSet(partSize) @@ -394,12 +395,12 @@ func TestSimulateValidatorsChange(t *testing.T) { newValidatorPubKey2 := css[nVals+1].privValidator.GetPubKey() newVal2ABCI := types.TM2PB.PubKey(newValidatorPubKey2) newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower) - err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx2, nil) + err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx2, nil, mempl.TxInfo{}) assert.Nil(t, err) newValidatorPubKey3 := css[nVals+2].privValidator.GetPubKey() newVal3ABCI := types.TM2PB.PubKey(newValidatorPubKey3) newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower) - err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx3, nil) + err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx3, nil, mempl.TxInfo{}) assert.Nil(t, err) propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2) propBlockParts = propBlock.MakePartSet(partSize) @@ -427,7 +428,7 @@ func TestSimulateValidatorsChange(t *testing.T) { ensureNewProposal(proposalCh, height, round) removeValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, 0) - err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx2, nil) + err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx2, nil, mempl.TxInfo{}) assert.Nil(t, err) rs = css[0].GetRoundState() @@ -457,7 +458,7 @@ func TestSimulateValidatorsChange(t *testing.T) { height++ incrementHeight(vss...) removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0) - err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx3, nil) + err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx3, nil, mempl.TxInfo{}) assert.Nil(t, err) propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2) propBlockParts = propBlock.MakePartSet(partSize) diff --git a/mempool/bench_test.go b/mempool/bench_test.go index 0cd394cd6..d6d9f0d14 100644 --- a/mempool/bench_test.go +++ b/mempool/bench_test.go @@ -18,7 +18,7 @@ func BenchmarkReap(b *testing.B) { for i := 0; i < size; i++ { tx := make([]byte, 8) binary.BigEndian.PutUint64(tx, uint64(i)) - mempool.CheckTx(tx, nil) + mempool.CheckTx(tx, nil, TxInfo{}) } b.ResetTimer() for i := 0; i < b.N; i++ { @@ -35,7 +35,7 @@ func BenchmarkCheckTx(b *testing.B) { for i := 0; i < b.N; i++ { tx := make([]byte, 8) binary.BigEndian.PutUint64(tx, uint64(i)) - mempool.CheckTx(tx, nil) + mempool.CheckTx(tx, nil, TxInfo{}) } } diff --git a/mempool/cache_test.go b/mempool/cache_test.go index 539bf1197..3eec6322e 100644 --- a/mempool/cache_test.go +++ b/mempool/cache_test.go @@ -58,7 +58,7 @@ func TestCacheAfterUpdate(t *testing.T) { for tcIndex, tc := range tests { for i := 0; i < tc.numTxsToCreate; i++ { tx := types.Tx{byte(i)} - err := mempool.CheckTx(tx, nil) + err := mempool.CheckTx(tx, nil, TxInfo{}) require.NoError(t, err) } @@ -71,7 +71,7 @@ func TestCacheAfterUpdate(t *testing.T) { for _, v := range tc.reAddIndices { tx := types.Tx{byte(v)} - _ = mempool.CheckTx(tx, nil) + _ = mempool.CheckTx(tx, nil, TxInfo{}) } cache := mempool.cache.(*mapTxCache) diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 50a37e5aa..8c7804992 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -209,11 +209,7 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} { // cb: A callback from the CheckTx command. // It gets called from another goroutine. // CONTRACT: Either cb will get called, or err returned. -func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) { - return mem.CheckTxWithInfo(tx, cb, TxInfo{SenderID: UnknownPeerID}) -} - -func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) (err error) { +func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) (err error) { mem.proxyMtx.Lock() // use defer to unlock mutex because application (*local client*) might panic defer mem.proxyMtx.Unlock() @@ -314,7 +310,7 @@ func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) { // External callers of CheckTx, like the RPC, can also pass an externalCb through here that is called // when all other response processing is complete. // -// Used in CheckTxWithInfo to record PeerID who sent us the tx. +// Used in CheckTx to record PeerID who sent us the tx. func (mem *CListMempool) reqResCb( tx []byte, peerID uint16, diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go index 63139bbed..3e52c0319 100644 --- a/mempool/clist_mempool_test.go +++ b/mempool/clist_mempool_test.go @@ -76,7 +76,7 @@ func checkTxs(t *testing.T, mempool Mempool, count int, peerID uint16) types.Txs if err != nil { t.Error(err) } - if err := mempool.CheckTxWithInfo(txBytes, nil, txInfo); err != nil { + if err := mempool.CheckTx(txBytes, nil, txInfo); err != nil { // Skip invalid txs. // TestMempoolFilters will fail otherwise. It asserts a number of txs // returned. @@ -186,7 +186,7 @@ func TestMempoolUpdate(t *testing.T) { // 1. Adds valid txs to the cache { mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil) - err := mempool.CheckTx([]byte{0x01}, nil) + err := mempool.CheckTx([]byte{0x01}, nil, TxInfo{}) if assert.Error(t, err) { assert.Equal(t, ErrTxInCache, err) } @@ -194,7 +194,7 @@ func TestMempoolUpdate(t *testing.T) { // 2. Removes valid txs from the mempool { - err := mempool.CheckTx([]byte{0x02}, nil) + err := mempool.CheckTx([]byte{0x02}, nil, TxInfo{}) require.NoError(t, err) mempool.Update(1, []types.Tx{[]byte{0x02}}, abciResponses(1, abci.CodeTypeOK), nil, nil) assert.Zero(t, mempool.Size()) @@ -202,12 +202,12 @@ func TestMempoolUpdate(t *testing.T) { // 3. Removes invalid transactions from the cache and the mempool (if present) { - err := mempool.CheckTx([]byte{0x03}, nil) + err := mempool.CheckTx([]byte{0x03}, nil, TxInfo{}) require.NoError(t, err) mempool.Update(1, []types.Tx{[]byte{0x03}}, abciResponses(1, 1), nil, nil) assert.Zero(t, mempool.Size()) - err = mempool.CheckTx([]byte{0x03}, nil) + err = mempool.CheckTx([]byte{0x03}, nil, TxInfo{}) assert.NoError(t, err) } } @@ -277,7 +277,7 @@ func TestSerialReap(t *testing.T) { // This will succeed txBytes := make([]byte, 8) binary.BigEndian.PutUint64(txBytes, uint64(i)) - err := mempool.CheckTx(txBytes, nil) + err := mempool.CheckTx(txBytes, nil, TxInfo{}) _, cached := cacheMap[string(txBytes)] if cached { require.NotNil(t, err, "expected error for cached tx") @@ -287,7 +287,7 @@ func TestSerialReap(t *testing.T) { cacheMap[string(txBytes)] = struct{}{} // Duplicates are cached and should return error - err = mempool.CheckTx(txBytes, nil) + err = mempool.CheckTx(txBytes, nil, TxInfo{}) require.NotNil(t, err, "Expected error after CheckTx on duplicated tx") } } @@ -393,7 +393,7 @@ func TestMempoolCloseWAL(t *testing.T) { require.Equal(t, 1, len(m2), "expecting the wal match in") // 5. Write some contents to the WAL - mempool.CheckTx(types.Tx([]byte("foo")), nil) + mempool.CheckTx(types.Tx([]byte("foo")), nil, TxInfo{}) walFilepath := mempool.wal.Path sum1 := checksumFile(walFilepath, t) @@ -403,7 +403,7 @@ func TestMempoolCloseWAL(t *testing.T) { // 7. Invoke CloseWAL() and ensure it discards the // WAL thus any other write won't go through. mempool.CloseWAL() - mempool.CheckTx(types.Tx([]byte("bar")), nil) + mempool.CheckTx(types.Tx([]byte("bar")), nil, TxInfo{}) sum2 := checksumFile(walFilepath, t) require.Equal(t, sum1, sum2, "expected no change to the WAL after invoking CloseWAL() since it was discarded") @@ -456,7 +456,7 @@ func TestMempoolMaxMsgSize(t *testing.T) { caseString := fmt.Sprintf("case %d, len %d", i, testCase.len) tx := cmn.RandBytes(testCase.len) - err := mempl.CheckTx(tx, nil) + err := mempl.CheckTx(tx, nil, TxInfo{}) msg := &TxMessage{tx} encoded := cdc.MustMarshalBinaryBare(msg) require.Equal(t, len(encoded), txMessageSize(tx), caseString) @@ -483,7 +483,7 @@ func TestMempoolTxsBytes(t *testing.T) { assert.EqualValues(t, 0, mempool.TxsBytes()) // 2. len(tx) after CheckTx - err := mempool.CheckTx([]byte{0x01}, nil) + err := mempool.CheckTx([]byte{0x01}, nil, TxInfo{}) require.NoError(t, err) assert.EqualValues(t, 1, mempool.TxsBytes()) @@ -492,7 +492,7 @@ func TestMempoolTxsBytes(t *testing.T) { assert.EqualValues(t, 0, mempool.TxsBytes()) // 4. zero after Flush - err = mempool.CheckTx([]byte{0x02, 0x03}, nil) + err = mempool.CheckTx([]byte{0x02, 0x03}, nil, TxInfo{}) require.NoError(t, err) assert.EqualValues(t, 2, mempool.TxsBytes()) @@ -500,9 +500,9 @@ func TestMempoolTxsBytes(t *testing.T) { assert.EqualValues(t, 0, mempool.TxsBytes()) // 5. ErrMempoolIsFull is returned when/if MaxTxsBytes limit is reached. - err = mempool.CheckTx([]byte{0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04}, nil) + err = mempool.CheckTx([]byte{0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04}, nil, TxInfo{}) require.NoError(t, err) - err = mempool.CheckTx([]byte{0x05}, nil) + err = mempool.CheckTx([]byte{0x05}, nil, TxInfo{}) if assert.Error(t, err) { assert.IsType(t, ErrMempoolIsFull{}, err) } @@ -516,7 +516,7 @@ func TestMempoolTxsBytes(t *testing.T) { txBytes := make([]byte, 8) binary.BigEndian.PutUint64(txBytes, uint64(0)) - err = mempool.CheckTx(txBytes, nil) + err = mempool.CheckTx(txBytes, nil, TxInfo{}) require.NoError(t, err) assert.EqualValues(t, 8, mempool.TxsBytes()) @@ -567,7 +567,7 @@ func TestMempoolRemoteAppConcurrency(t *testing.T) { tx := txs[txNum] // this will err with ErrTxInCache many times ... - mempool.CheckTxWithInfo(tx, nil, TxInfo{SenderID: uint16(peerID)}) + mempool.CheckTx(tx, nil, TxInfo{SenderID: uint16(peerID)}) } err := mempool.FlushAppConn() require.NoError(t, err) diff --git a/mempool/mempool.go b/mempool/mempool.go index e129c642b..004c1c8d1 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -15,13 +15,7 @@ import ( type Mempool interface { // CheckTx executes a new transaction against the application to determine // its validity and whether it should be added to the mempool. - CheckTx(tx types.Tx, callback func(*abci.Response)) error - - // CheckTxWithInfo performs the same operation as CheckTx, but with extra - // meta data about the tx. - // Currently this metadata is the peer who sent it, used to prevent the tx - // from being gossiped back to them. - CheckTxWithInfo(tx types.Tx, callback func(*abci.Response), txInfo TxInfo) error + CheckTx(tx types.Tx, callback func(*abci.Response), txInfo TxInfo) error // ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes // bytes total with the condition that the total gasWanted must be less than diff --git a/mempool/reactor.go b/mempool/reactor.go index 35797b6c5..3388d4e4e 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -169,7 +169,7 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { if src != nil { txInfo.SenderP2PID = src.ID() } - err := memR.mempool.CheckTxWithInfo(msg.Tx, nil, txInfo) + err := memR.mempool.CheckTx(msg.Tx, nil, txInfo) if err != nil { memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err) } diff --git a/mock/mempool.go b/mock/mempool.go index cebe156ba..8c5b6e38f 100644 --- a/mock/mempool.go +++ b/mock/mempool.go @@ -15,11 +15,7 @@ var _ mempl.Mempool = Mempool{} func (Mempool) Lock() {} func (Mempool) Unlock() {} func (Mempool) Size() int { return 0 } -func (Mempool) CheckTx(_ types.Tx, _ func(*abci.Response)) error { - return nil -} -func (Mempool) CheckTxWithInfo(_ types.Tx, _ func(*abci.Response), - _ mempl.TxInfo) error { +func (Mempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error { return nil } func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } diff --git a/node/node_test.go b/node/node_test.go index 6cdaceffb..f93fcd2b0 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -267,7 +267,7 @@ func TestCreateProposalBlock(t *testing.T) { txLength := 1000 for i := 0; i < maxBytes/txLength; i++ { tx := cmn.RandBytes(txLength) - err := mempool.CheckTx(tx, nil) + err := mempool.CheckTx(tx, nil, mempl.TxInfo{}) assert.NoError(t, err) } diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index f35669ecf..d325384dd 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -17,6 +17,7 @@ import ( "github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/crypto/tmhash" cmn "github.com/tendermint/tendermint/libs/common" + mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/privval" "github.com/tendermint/tendermint/rpc/client" ctypes "github.com/tendermint/tendermint/rpc/core/types" @@ -312,7 +313,7 @@ func TestUnconfirmedTxs(t *testing.T) { _, _, tx := MakeTxKV() mempool := node.Mempool() - _ = mempool.CheckTx(tx, nil) + _ = mempool.CheckTx(tx, nil, mempl.TxInfo{}) for i, c := range GetClients() { mc, ok := c.(client.MempoolClient) @@ -333,7 +334,7 @@ func TestNumUnconfirmedTxs(t *testing.T) { _, _, tx := MakeTxKV() mempool := node.Mempool() - _ = mempool.CheckTx(tx, nil) + _ = mempool.CheckTx(tx, nil, mempl.TxInfo{}) mempoolSize := mempool.Size() for i, c := range GetClients() { diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index ba1ed291d..86d180b18 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -8,6 +8,7 @@ import ( "github.com/pkg/errors" abci "github.com/tendermint/tendermint/abci/types" + mempl "github.com/tendermint/tendermint/mempool" ctypes "github.com/tendermint/tendermint/rpc/core/types" rpctypes "github.com/tendermint/tendermint/rpc/lib/types" "github.com/tendermint/tendermint/types" @@ -73,7 +74,8 @@ import ( // |-----------+------+---------+----------+-----------------| // | tx | Tx | nil | true | The transaction | func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - err := mempool.CheckTx(tx, nil) + err := mempool.CheckTx(tx, nil, mempl.TxInfo{}) + if err != nil { return nil, err } @@ -137,7 +139,7 @@ func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcas resCh := make(chan *abci.Response, 1) err := mempool.CheckTx(tx, func(res *abci.Response) { resCh <- res - }) + }, mempl.TxInfo{}) if err != nil { return nil, err } @@ -237,7 +239,7 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc checkTxResCh := make(chan *abci.Response, 1) err = mempool.CheckTx(tx, func(res *abci.Response) { checkTxResCh <- res - }) + }, mempl.TxInfo{}) if err != nil { logger.Error("Error on broadcastTxCommit", "err", err) return nil, fmt.Errorf("Error on broadcastTxCommit: %v", err)