You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

621 lines
18 KiB

mempool no gossip back (#2778) Closes #1798 This is done by making every mempool tx maintain a list of peers who its received the tx from. Instead of using the 20byte peer ID, it instead uses a local map from peerID to uint16 counter, so every peer adds 2 bytes. (Word aligned to probably make it 8 bytes) This also required resetting the callback function on every CheckTx. This likely has performance ramifications for instruction caching. The actual setting operation isn't costly with the removal of defers in this PR. * Make the mempool not gossip txs back to peers its received it from * Fix adversarial memleak * Don't break interface * Update changelog * Forgot to add a mtx * forgot a mutex * Update mempool/reactor.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Update mempool/mempool.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Use unknown peer ID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * fix compilation * use next wait chan logic when skipping * Minor fixes * Add TxInfo * Add reverse map * Make activeID's auto-reserve 0 * 0 -> UnknownPeerID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Switch to making the normal case set a callback on the reqres object The recheck case is still done via the global callback, and stats are also set via global callback * fix merge conflict * Addres comments * Add cache tests * add cache tests * minor fixes * update metrics in reqResCb and reformat code * goimport -w mempool/reactor.go * mempool: update memTx senders I had to introduce txsMap for quick mempoolTx lookups. * change senders type from []uint16 to sync.Map Fixes DATA RACE: ``` Read at 0x00c0013fcd3a by goroutine 183: github.com/tendermint/tendermint/mempool.(*MempoolReactor).broadcastTxRoutine() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:195 +0x3c7 Previous write at 0x00c0013fcd3a by D[2019-02-27|10:10:49.058] Read PacketMsg switch=3 peer=35bc1e3558c182927b31987eeff3feb3d58a0fc5@127.0.0.1 :46552 conn=MConn{pipe} packet="PacketMsg{30:2B06579D0A143EB78F3D3299DE8213A51D4E11FB05ACE4D6A14F T:1}" goroutine 190: github.com/tendermint/tendermint/mempool.(*Mempool).CheckTxWithInfo() /go/src/github.com/tendermint/tendermint/mempool/mempool.go:387 +0xdc1 github.com/tendermint/tendermint/mempool.(*MempoolReactor).Receive() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:134 +0xb04 github.com/tendermint/tendermint/p2p.createMConnection.func1() /go/src/github.com/tendermint/tendermint/p2p/peer.go:374 +0x25b github.com/tendermint/tendermint/p2p/conn.(*MConnection).recvRoutine() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:599 +0xcce Goroutine 183 (running) created at: D[2019-02-27|10:10:49.058] Send switch=2 peer=1efafad5443abeea4b7a8155218e4369525d987e@127.0.0.1:46193 channel=48 conn=MConn{pipe} m sgBytes=2B06579D0A146194480ADAE00C2836ED7125FEE65C1D9DD51049 github.com/tendermint/tendermint/mempool.(*MempoolReactor).AddPeer() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:105 +0x1b1 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:683 +0x13b github.com/tendermint/tendermint/p2p.(*Switch).addPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:650 +0x585 github.com/tendermint/tendermint/p2p.(*Switch).addPeerWithConnection() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:145 +0x939 github.com/tendermint/tendermint/p2p.Connect2Switches.func2() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:109 +0x50 I[2019-02-27|10:10:49.058] Added good transaction validator=0 tx=43B4D1F0F03460BD262835C4AA560DB860CFBBE85BD02386D83DAC38C67B3AD7 res="&{CheckTx:gas_w anted:1 }" height=0 total=375 Goroutine 190 (running) created at: github.com/tendermint/tendermint/p2p/conn.(*MConnection).OnStart() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:210 +0x313 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).OnStart() /go/src/github.com/tendermint/tendermint/p2p/peer.go:179 +0x56 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).Start() <autogenerated>:1 +0x43 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() ``` * explain the choice of a map DS for senders * extract ids pool/mapper to a separate struct * fix literal copies lock value from senders: sync.Map contains sync.Mutex * use sync.Map#LoadOrStore instead of Load * fixes after Ismail's review * rename resCbNormal to resCbFirstTime
6 years ago
mempool: move interface into mempool package (#3524) ## Description Refs #2659 Breaking changes in the mempool package: [mempool] #2659 Mempool now an interface old Mempool renamed to CListMempool NewMempool renamed to NewCListMempool Option renamed to CListOption MempoolReactor renamed to Reactor NewMempoolReactor renamed to NewReactor unexpose TxID method TxInfo.PeerID renamed to SenderID unexpose MempoolReactor.Mempool Breaking changes in the state package: [state] #2659 Mempool interface moved to mempool package MockMempool moved to top-level mock package and renamed to Mempool Non Breaking changes in the node package: [node] #2659 Add Mempool method, which allows you to access mempool ## Commits * move Mempool interface into mempool package Refs #2659 Breaking changes in the mempool package: - Mempool now an interface - old Mempool renamed to CListMempool Breaking changes to state package: - MockMempool moved to mempool/mock package and renamed to Mempool - Mempool interface moved to mempool package * assert CListMempool impl Mempool * gofmt code * rename MempoolReactor to Reactor - combine everything into one interface - rename TxInfo.PeerID to TxInfo.SenderID - unexpose MempoolReactor.Mempool * move mempool mock into top-level mock package * add a fixme TxsFront should not be a part of the Mempool interface because it leaks implementation details. Instead, we need to come up with general interface for querying the mempool so the MempoolReactor can fetch and broadcast txs to peers. * change node#Mempool to return interface * save commit = new reactor arch * Revert "save commit = new reactor arch" This reverts commit 1bfceacd9d65a720574683a7f22771e69af9af4d. * require CListMempool in mempool.Reactor * add two changelog entries * fixes after my own review * quote interfaces, structs and functions * fixes after Ismail's review * make node's mempool an interface * make InitWAL/CloseWAL methods a part of Mempool interface * fix merge conflicts * make node's mempool an interface
6 years ago
mempool: move interface into mempool package (#3524) ## Description Refs #2659 Breaking changes in the mempool package: [mempool] #2659 Mempool now an interface old Mempool renamed to CListMempool NewMempool renamed to NewCListMempool Option renamed to CListOption MempoolReactor renamed to Reactor NewMempoolReactor renamed to NewReactor unexpose TxID method TxInfo.PeerID renamed to SenderID unexpose MempoolReactor.Mempool Breaking changes in the state package: [state] #2659 Mempool interface moved to mempool package MockMempool moved to top-level mock package and renamed to Mempool Non Breaking changes in the node package: [node] #2659 Add Mempool method, which allows you to access mempool ## Commits * move Mempool interface into mempool package Refs #2659 Breaking changes in the mempool package: - Mempool now an interface - old Mempool renamed to CListMempool Breaking changes to state package: - MockMempool moved to mempool/mock package and renamed to Mempool - Mempool interface moved to mempool package * assert CListMempool impl Mempool * gofmt code * rename MempoolReactor to Reactor - combine everything into one interface - rename TxInfo.PeerID to TxInfo.SenderID - unexpose MempoolReactor.Mempool * move mempool mock into top-level mock package * add a fixme TxsFront should not be a part of the Mempool interface because it leaks implementation details. Instead, we need to come up with general interface for querying the mempool so the MempoolReactor can fetch and broadcast txs to peers. * change node#Mempool to return interface * save commit = new reactor arch * Revert "save commit = new reactor arch" This reverts commit 1bfceacd9d65a720574683a7f22771e69af9af4d. * require CListMempool in mempool.Reactor * add two changelog entries * fixes after my own review * quote interfaces, structs and functions * fixes after Ismail's review * make node's mempool an interface * make InitWAL/CloseWAL methods a part of Mempool interface * fix merge conflicts * make node's mempool an interface
6 years ago
8 years ago
lint: Enable Golint (#4212) * Fix many golint errors * Fix golint errors in the 'lite' package * Don't export Pool.store * Fix typo * Revert unwanted changes * Fix errors in counter package * Fix linter errors in kvstore package * Fix linter error in example package * Fix error in tests package * Fix linter errors in v2 package * Fix linter errors in consensus package * Fix linter errors in evidence package * Fix linter error in fail package * Fix linter errors in query package * Fix linter errors in core package * Fix linter errors in node package * Fix linter errors in mempool package * Fix linter error in conn package * Fix linter errors in pex package * Rename PEXReactor export to Reactor * Fix linter errors in trust package * Fix linter errors in upnp package * Fix linter errors in p2p package * Fix linter errors in proxy package * Fix linter errors in mock_test package * Fix linter error in client_test package * Fix linter errors in coretypes package * Fix linter errors in coregrpc package * Fix linter errors in rpcserver package * Fix linter errors in rpctypes package * Fix linter errors in rpctest package * Fix linter error in json2wal script * Fix linter error in wal2json script * Fix linter errors in kv package * Fix linter error in state package * Fix linter error in grpc_client * Fix linter errors in types package * Fix linter error in version package * Fix remaining errors * Address review comments * Fix broken tests * Reconcile package coregrpc * Fix golangci bot error * Fix new golint errors * Fix broken reference * Enable golint linter * minor changes to bring golint into line * fix failing test * fix pex reactor naming * address PR comments
5 years ago
lint: Enable Golint (#4212) * Fix many golint errors * Fix golint errors in the 'lite' package * Don't export Pool.store * Fix typo * Revert unwanted changes * Fix errors in counter package * Fix linter errors in kvstore package * Fix linter error in example package * Fix error in tests package * Fix linter errors in v2 package * Fix linter errors in consensus package * Fix linter errors in evidence package * Fix linter error in fail package * Fix linter errors in query package * Fix linter errors in core package * Fix linter errors in node package * Fix linter errors in mempool package * Fix linter error in conn package * Fix linter errors in pex package * Rename PEXReactor export to Reactor * Fix linter errors in trust package * Fix linter errors in upnp package * Fix linter errors in p2p package * Fix linter errors in proxy package * Fix linter errors in mock_test package * Fix linter error in client_test package * Fix linter errors in coretypes package * Fix linter errors in coregrpc package * Fix linter errors in rpcserver package * Fix linter errors in rpctypes package * Fix linter errors in rpctest package * Fix linter error in json2wal script * Fix linter error in wal2json script * Fix linter errors in kv package * Fix linter error in state package * Fix linter error in grpc_client * Fix linter errors in types package * Fix linter error in version package * Fix remaining errors * Address review comments * Fix broken tests * Reconcile package coregrpc * Fix golangci bot error * Fix new golint errors * Fix broken reference * Enable golint linter * minor changes to bring golint into line * fix failing test * fix pex reactor naming * address PR comments
5 years ago
lint: Enable Golint (#4212) * Fix many golint errors * Fix golint errors in the 'lite' package * Don't export Pool.store * Fix typo * Revert unwanted changes * Fix errors in counter package * Fix linter errors in kvstore package * Fix linter error in example package * Fix error in tests package * Fix linter errors in v2 package * Fix linter errors in consensus package * Fix linter errors in evidence package * Fix linter error in fail package * Fix linter errors in query package * Fix linter errors in core package * Fix linter errors in node package * Fix linter errors in mempool package * Fix linter error in conn package * Fix linter errors in pex package * Rename PEXReactor export to Reactor * Fix linter errors in trust package * Fix linter errors in upnp package * Fix linter errors in p2p package * Fix linter errors in proxy package * Fix linter errors in mock_test package * Fix linter error in client_test package * Fix linter errors in coretypes package * Fix linter errors in coregrpc package * Fix linter errors in rpcserver package * Fix linter errors in rpctypes package * Fix linter errors in rpctest package * Fix linter error in json2wal script * Fix linter error in wal2json script * Fix linter errors in kv package * Fix linter error in state package * Fix linter error in grpc_client * Fix linter errors in types package * Fix linter error in version package * Fix remaining errors * Address review comments * Fix broken tests * Reconcile package coregrpc * Fix golangci bot error * Fix new golint errors * Fix broken reference * Enable golint linter * minor changes to bring golint into line * fix failing test * fix pex reactor naming * address PR comments
5 years ago
lint: Enable Golint (#4212) * Fix many golint errors * Fix golint errors in the 'lite' package * Don't export Pool.store * Fix typo * Revert unwanted changes * Fix errors in counter package * Fix linter errors in kvstore package * Fix linter error in example package * Fix error in tests package * Fix linter errors in v2 package * Fix linter errors in consensus package * Fix linter errors in evidence package * Fix linter error in fail package * Fix linter errors in query package * Fix linter errors in core package * Fix linter errors in node package * Fix linter errors in mempool package * Fix linter error in conn package * Fix linter errors in pex package * Rename PEXReactor export to Reactor * Fix linter errors in trust package * Fix linter errors in upnp package * Fix linter errors in p2p package * Fix linter errors in proxy package * Fix linter errors in mock_test package * Fix linter error in client_test package * Fix linter errors in coretypes package * Fix linter errors in coregrpc package * Fix linter errors in rpcserver package * Fix linter errors in rpctypes package * Fix linter errors in rpctest package * Fix linter error in json2wal script * Fix linter error in wal2json script * Fix linter errors in kv package * Fix linter error in state package * Fix linter error in grpc_client * Fix linter errors in types package * Fix linter error in version package * Fix remaining errors * Address review comments * Fix broken tests * Reconcile package coregrpc * Fix golangci bot error * Fix new golint errors * Fix broken reference * Enable golint linter * minor changes to bring golint into line * fix failing test * fix pex reactor naming * address PR comments
5 years ago
lint: Enable Golint (#4212) * Fix many golint errors * Fix golint errors in the 'lite' package * Don't export Pool.store * Fix typo * Revert unwanted changes * Fix errors in counter package * Fix linter errors in kvstore package * Fix linter error in example package * Fix error in tests package * Fix linter errors in v2 package * Fix linter errors in consensus package * Fix linter errors in evidence package * Fix linter error in fail package * Fix linter errors in query package * Fix linter errors in core package * Fix linter errors in node package * Fix linter errors in mempool package * Fix linter error in conn package * Fix linter errors in pex package * Rename PEXReactor export to Reactor * Fix linter errors in trust package * Fix linter errors in upnp package * Fix linter errors in p2p package * Fix linter errors in proxy package * Fix linter errors in mock_test package * Fix linter error in client_test package * Fix linter errors in coretypes package * Fix linter errors in coregrpc package * Fix linter errors in rpcserver package * Fix linter errors in rpctypes package * Fix linter errors in rpctest package * Fix linter error in json2wal script * Fix linter error in wal2json script * Fix linter errors in kv package * Fix linter error in state package * Fix linter error in grpc_client * Fix linter errors in types package * Fix linter error in version package * Fix remaining errors * Address review comments * Fix broken tests * Reconcile package coregrpc * Fix golangci bot error * Fix new golint errors * Fix broken reference * Enable golint linter * minor changes to bring golint into line * fix failing test * fix pex reactor naming * address PR comments
5 years ago
8 years ago
lint: Enable Golint (#4212) * Fix many golint errors * Fix golint errors in the 'lite' package * Don't export Pool.store * Fix typo * Revert unwanted changes * Fix errors in counter package * Fix linter errors in kvstore package * Fix linter error in example package * Fix error in tests package * Fix linter errors in v2 package * Fix linter errors in consensus package * Fix linter errors in evidence package * Fix linter error in fail package * Fix linter errors in query package * Fix linter errors in core package * Fix linter errors in node package * Fix linter errors in mempool package * Fix linter error in conn package * Fix linter errors in pex package * Rename PEXReactor export to Reactor * Fix linter errors in trust package * Fix linter errors in upnp package * Fix linter errors in p2p package * Fix linter errors in proxy package * Fix linter errors in mock_test package * Fix linter error in client_test package * Fix linter errors in coretypes package * Fix linter errors in coregrpc package * Fix linter errors in rpcserver package * Fix linter errors in rpctypes package * Fix linter errors in rpctest package * Fix linter error in json2wal script * Fix linter error in wal2json script * Fix linter errors in kv package * Fix linter error in state package * Fix linter error in grpc_client * Fix linter errors in types package * Fix linter error in version package * Fix remaining errors * Address review comments * Fix broken tests * Reconcile package coregrpc * Fix golangci bot error * Fix new golint errors * Fix broken reference * Enable golint linter * minor changes to bring golint into line * fix failing test * fix pex reactor naming * address PR comments
5 years ago
lint: Enable Golint (#4212) * Fix many golint errors * Fix golint errors in the 'lite' package * Don't export Pool.store * Fix typo * Revert unwanted changes * Fix errors in counter package * Fix linter errors in kvstore package * Fix linter error in example package * Fix error in tests package * Fix linter errors in v2 package * Fix linter errors in consensus package * Fix linter errors in evidence package * Fix linter error in fail package * Fix linter errors in query package * Fix linter errors in core package * Fix linter errors in node package * Fix linter errors in mempool package * Fix linter error in conn package * Fix linter errors in pex package * Rename PEXReactor export to Reactor * Fix linter errors in trust package * Fix linter errors in upnp package * Fix linter errors in p2p package * Fix linter errors in proxy package * Fix linter errors in mock_test package * Fix linter error in client_test package * Fix linter errors in coretypes package * Fix linter errors in coregrpc package * Fix linter errors in rpcserver package * Fix linter errors in rpctypes package * Fix linter errors in rpctest package * Fix linter error in json2wal script * Fix linter error in wal2json script * Fix linter errors in kv package * Fix linter error in state package * Fix linter error in grpc_client * Fix linter errors in types package * Fix linter error in version package * Fix remaining errors * Address review comments * Fix broken tests * Reconcile package coregrpc * Fix golangci bot error * Fix new golint errors * Fix broken reference * Enable golint linter * minor changes to bring golint into line * fix failing test * fix pex reactor naming * address PR comments
5 years ago
lint: Enable Golint (#4212) * Fix many golint errors * Fix golint errors in the 'lite' package * Don't export Pool.store * Fix typo * Revert unwanted changes * Fix errors in counter package * Fix linter errors in kvstore package * Fix linter error in example package * Fix error in tests package * Fix linter errors in v2 package * Fix linter errors in consensus package * Fix linter errors in evidence package * Fix linter error in fail package * Fix linter errors in query package * Fix linter errors in core package * Fix linter errors in node package * Fix linter errors in mempool package * Fix linter error in conn package * Fix linter errors in pex package * Rename PEXReactor export to Reactor * Fix linter errors in trust package * Fix linter errors in upnp package * Fix linter errors in p2p package * Fix linter errors in proxy package * Fix linter errors in mock_test package * Fix linter error in client_test package * Fix linter errors in coretypes package * Fix linter errors in coregrpc package * Fix linter errors in rpcserver package * Fix linter errors in rpctypes package * Fix linter errors in rpctest package * Fix linter error in json2wal script * Fix linter error in wal2json script * Fix linter errors in kv package * Fix linter error in state package * Fix linter error in grpc_client * Fix linter errors in types package * Fix linter error in version package * Fix remaining errors * Address review comments * Fix broken tests * Reconcile package coregrpc * Fix golangci bot error * Fix new golint errors * Fix broken reference * Enable golint linter * minor changes to bring golint into line * fix failing test * fix pex reactor naming * address PR comments
5 years ago
lint: Enable Golint (#4212) * Fix many golint errors * Fix golint errors in the 'lite' package * Don't export Pool.store * Fix typo * Revert unwanted changes * Fix errors in counter package * Fix linter errors in kvstore package * Fix linter error in example package * Fix error in tests package * Fix linter errors in v2 package * Fix linter errors in consensus package * Fix linter errors in evidence package * Fix linter error in fail package * Fix linter errors in query package * Fix linter errors in core package * Fix linter errors in node package * Fix linter errors in mempool package * Fix linter error in conn package * Fix linter errors in pex package * Rename PEXReactor export to Reactor * Fix linter errors in trust package * Fix linter errors in upnp package * Fix linter errors in p2p package * Fix linter errors in proxy package * Fix linter errors in mock_test package * Fix linter error in client_test package * Fix linter errors in coretypes package * Fix linter errors in coregrpc package * Fix linter errors in rpcserver package * Fix linter errors in rpctypes package * Fix linter errors in rpctest package * Fix linter error in json2wal script * Fix linter error in wal2json script * Fix linter errors in kv package * Fix linter error in state package * Fix linter error in grpc_client * Fix linter errors in types package * Fix linter error in version package * Fix remaining errors * Address review comments * Fix broken tests * Reconcile package coregrpc * Fix golangci bot error * Fix new golint errors * Fix broken reference * Enable golint linter * minor changes to bring golint into line * fix failing test * fix pex reactor naming * address PR comments
5 years ago
  1. package v0
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "encoding/binary"
  6. "fmt"
  7. mrand "math/rand"
  8. "os"
  9. "testing"
  10. "time"
  11. "github.com/gogo/protobuf/proto"
  12. gogotypes "github.com/gogo/protobuf/types"
  13. "github.com/stretchr/testify/assert"
  14. "github.com/stretchr/testify/require"
  15. "github.com/tendermint/tendermint/abci/example/counter"
  16. "github.com/tendermint/tendermint/abci/example/kvstore"
  17. abciserver "github.com/tendermint/tendermint/abci/server"
  18. abci "github.com/tendermint/tendermint/abci/types"
  19. cfg "github.com/tendermint/tendermint/config"
  20. "github.com/tendermint/tendermint/internal/mempool"
  21. "github.com/tendermint/tendermint/libs/log"
  22. tmrand "github.com/tendermint/tendermint/libs/rand"
  23. "github.com/tendermint/tendermint/libs/service"
  24. "github.com/tendermint/tendermint/proxy"
  25. "github.com/tendermint/tendermint/types"
  26. )
  27. // A cleanupFunc cleans up any config / test files created for a particular
  28. // test.
  29. type cleanupFunc func()
  30. func newMempoolWithApp(cc proxy.ClientCreator) (*CListMempool, cleanupFunc) {
  31. return newMempoolWithAppAndConfig(cc, cfg.ResetTestRoot("mempool_test"))
  32. }
  33. func newMempoolWithAppAndConfig(cc proxy.ClientCreator, config *cfg.Config) (*CListMempool, cleanupFunc) {
  34. appConnMem, _ := cc.NewABCIClient()
  35. appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
  36. err := appConnMem.Start()
  37. if err != nil {
  38. panic(err)
  39. }
  40. mp := NewCListMempool(config.Mempool, appConnMem, 0)
  41. mp.SetLogger(log.TestingLogger())
  42. return mp, func() { os.RemoveAll(config.RootDir) }
  43. }
  44. func ensureNoFire(t *testing.T, ch <-chan struct{}, timeoutMS int) {
  45. timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
  46. select {
  47. case <-ch:
  48. t.Fatal("Expected not to fire")
  49. case <-timer.C:
  50. }
  51. }
  52. func ensureFire(t *testing.T, ch <-chan struct{}, timeoutMS int) {
  53. timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
  54. select {
  55. case <-ch:
  56. case <-timer.C:
  57. t.Fatal("Expected to fire")
  58. }
  59. }
  60. func checkTxs(t *testing.T, mp mempool.Mempool, count int, peerID uint16) types.Txs {
  61. txs := make(types.Txs, count)
  62. txInfo := mempool.TxInfo{SenderID: peerID}
  63. for i := 0; i < count; i++ {
  64. txBytes := make([]byte, 20)
  65. txs[i] = txBytes
  66. _, err := rand.Read(txBytes)
  67. if err != nil {
  68. t.Error(err)
  69. }
  70. if err := mp.CheckTx(context.Background(), txBytes, nil, txInfo); err != nil {
  71. // Skip invalid txs.
  72. // TestMempoolFilters will fail otherwise. It asserts a number of txs
  73. // returned.
  74. if mempool.IsPreCheckError(err) {
  75. continue
  76. }
  77. t.Fatalf("CheckTx failed: %v while checking #%d tx", err, i)
  78. }
  79. }
  80. return txs
  81. }
  82. func TestReapMaxBytesMaxGas(t *testing.T) {
  83. app := kvstore.NewApplication()
  84. cc := proxy.NewLocalClientCreator(app)
  85. mp, cleanup := newMempoolWithApp(cc)
  86. defer cleanup()
  87. // Ensure gas calculation behaves as expected
  88. checkTxs(t, mp, 1, mempool.UnknownPeerID)
  89. tx0 := mp.TxsFront().Value.(*mempoolTx)
  90. // assert that kv store has gas wanted = 1.
  91. require.Equal(t, app.CheckTx(abci.RequestCheckTx{Tx: tx0.tx}).GasWanted, int64(1), "KVStore had a gas value neq to 1")
  92. require.Equal(t, tx0.gasWanted, int64(1), "transactions gas was set incorrectly")
  93. // ensure each tx is 20 bytes long
  94. require.Equal(t, len(tx0.tx), 20, "Tx is longer than 20 bytes")
  95. mp.Flush()
  96. // each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs.
  97. // each tx has 20 bytes
  98. tests := []struct {
  99. numTxsToCreate int
  100. maxBytes int64
  101. maxGas int64
  102. expectedNumTxs int
  103. }{
  104. {20, -1, -1, 20},
  105. {20, -1, 0, 0},
  106. {20, -1, 10, 10},
  107. {20, -1, 30, 20},
  108. {20, 0, -1, 0},
  109. {20, 0, 10, 0},
  110. {20, 10, 10, 0},
  111. {20, 24, 10, 1},
  112. {20, 240, 5, 5},
  113. {20, 240, -1, 10},
  114. {20, 240, 10, 10},
  115. {20, 240, 15, 10},
  116. {20, 20000, -1, 20},
  117. {20, 20000, 5, 5},
  118. {20, 20000, 30, 20},
  119. }
  120. for tcIndex, tt := range tests {
  121. checkTxs(t, mp, tt.numTxsToCreate, mempool.UnknownPeerID)
  122. got := mp.ReapMaxBytesMaxGas(tt.maxBytes, tt.maxGas)
  123. assert.Equal(t, tt.expectedNumTxs, len(got), "Got %d txs, expected %d, tc #%d",
  124. len(got), tt.expectedNumTxs, tcIndex)
  125. mp.Flush()
  126. }
  127. }
  128. func TestMempoolFilters(t *testing.T) {
  129. app := kvstore.NewApplication()
  130. cc := proxy.NewLocalClientCreator(app)
  131. mp, cleanup := newMempoolWithApp(cc)
  132. defer cleanup()
  133. emptyTxArr := []types.Tx{[]byte{}}
  134. nopPreFilter := func(tx types.Tx) error { return nil }
  135. nopPostFilter := func(tx types.Tx, res *abci.ResponseCheckTx) error { return nil }
  136. // each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs.
  137. // each tx has 20 bytes
  138. tests := []struct {
  139. numTxsToCreate int
  140. preFilter mempool.PreCheckFunc
  141. postFilter mempool.PostCheckFunc
  142. expectedNumTxs int
  143. }{
  144. {10, nopPreFilter, nopPostFilter, 10},
  145. {10, mempool.PreCheckMaxBytes(10), nopPostFilter, 0},
  146. {10, mempool.PreCheckMaxBytes(22), nopPostFilter, 10},
  147. {10, nopPreFilter, mempool.PostCheckMaxGas(-1), 10},
  148. {10, nopPreFilter, mempool.PostCheckMaxGas(0), 0},
  149. {10, nopPreFilter, mempool.PostCheckMaxGas(1), 10},
  150. {10, nopPreFilter, mempool.PostCheckMaxGas(3000), 10},
  151. {10, mempool.PreCheckMaxBytes(10), mempool.PostCheckMaxGas(20), 0},
  152. {10, mempool.PreCheckMaxBytes(30), mempool.PostCheckMaxGas(20), 10},
  153. {10, mempool.PreCheckMaxBytes(22), mempool.PostCheckMaxGas(1), 10},
  154. {10, mempool.PreCheckMaxBytes(22), mempool.PostCheckMaxGas(0), 0},
  155. }
  156. for tcIndex, tt := range tests {
  157. err := mp.Update(1, emptyTxArr, abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter, tt.postFilter)
  158. require.NoError(t, err)
  159. checkTxs(t, mp, tt.numTxsToCreate, mempool.UnknownPeerID)
  160. require.Equal(t, tt.expectedNumTxs, mp.Size(), "mempool had the incorrect size, on test case %d", tcIndex)
  161. mp.Flush()
  162. }
  163. }
  164. func TestMempoolUpdate(t *testing.T) {
  165. app := kvstore.NewApplication()
  166. cc := proxy.NewLocalClientCreator(app)
  167. mp, cleanup := newMempoolWithApp(cc)
  168. defer cleanup()
  169. // 1. Adds valid txs to the cache
  170. {
  171. err := mp.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
  172. require.NoError(t, err)
  173. err = mp.CheckTx(context.Background(), []byte{0x01}, nil, mempool.TxInfo{})
  174. require.NoError(t, err)
  175. }
  176. // 2. Removes valid txs from the mempool
  177. {
  178. err := mp.CheckTx(context.Background(), []byte{0x02}, nil, mempool.TxInfo{})
  179. require.NoError(t, err)
  180. err = mp.Update(1, []types.Tx{[]byte{0x02}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
  181. require.NoError(t, err)
  182. assert.Zero(t, mp.Size())
  183. }
  184. // 3. Removes invalid transactions from the cache and the mempool (if present)
  185. {
  186. err := mp.CheckTx(context.Background(), []byte{0x03}, nil, mempool.TxInfo{})
  187. require.NoError(t, err)
  188. err = mp.Update(1, []types.Tx{[]byte{0x03}}, abciResponses(1, 1), nil, nil)
  189. require.NoError(t, err)
  190. assert.Zero(t, mp.Size())
  191. err = mp.CheckTx(context.Background(), []byte{0x03}, nil, mempool.TxInfo{})
  192. require.NoError(t, err)
  193. }
  194. }
  195. func TestMempool_KeepInvalidTxsInCache(t *testing.T) {
  196. app := counter.NewApplication(true)
  197. cc := proxy.NewLocalClientCreator(app)
  198. wcfg := cfg.DefaultConfig()
  199. wcfg.Mempool.KeepInvalidTxsInCache = true
  200. mp, cleanup := newMempoolWithAppAndConfig(cc, wcfg)
  201. defer cleanup()
  202. // 1. An invalid transaction must remain in the cache after Update
  203. {
  204. a := make([]byte, 8)
  205. binary.BigEndian.PutUint64(a, 0)
  206. b := make([]byte, 8)
  207. binary.BigEndian.PutUint64(b, 1)
  208. err := mp.CheckTx(context.Background(), b, nil, mempool.TxInfo{})
  209. require.NoError(t, err)
  210. // simulate new block
  211. _ = app.DeliverTx(abci.RequestDeliverTx{Tx: a})
  212. _ = app.DeliverTx(abci.RequestDeliverTx{Tx: b})
  213. err = mp.Update(1, []types.Tx{a, b},
  214. []*abci.ResponseDeliverTx{{Code: abci.CodeTypeOK}, {Code: 2}}, nil, nil)
  215. require.NoError(t, err)
  216. // a must be added to the cache
  217. err = mp.CheckTx(context.Background(), a, nil, mempool.TxInfo{})
  218. require.NoError(t, err)
  219. // b must remain in the cache
  220. err = mp.CheckTx(context.Background(), b, nil, mempool.TxInfo{})
  221. require.NoError(t, err)
  222. }
  223. // 2. An invalid transaction must remain in the cache
  224. {
  225. a := make([]byte, 8)
  226. binary.BigEndian.PutUint64(a, 0)
  227. // remove a from the cache to test (2)
  228. mp.cache.Remove(a)
  229. err := mp.CheckTx(context.Background(), a, nil, mempool.TxInfo{})
  230. require.NoError(t, err)
  231. }
  232. }
  233. func TestTxsAvailable(t *testing.T) {
  234. app := kvstore.NewApplication()
  235. cc := proxy.NewLocalClientCreator(app)
  236. mp, cleanup := newMempoolWithApp(cc)
  237. defer cleanup()
  238. mp.EnableTxsAvailable()
  239. timeoutMS := 500
  240. // with no txs, it shouldnt fire
  241. ensureNoFire(t, mp.TxsAvailable(), timeoutMS)
  242. // send a bunch of txs, it should only fire once
  243. txs := checkTxs(t, mp, 100, mempool.UnknownPeerID)
  244. ensureFire(t, mp.TxsAvailable(), timeoutMS)
  245. ensureNoFire(t, mp.TxsAvailable(), timeoutMS)
  246. // call update with half the txs.
  247. // it should fire once now for the new height
  248. // since there are still txs left
  249. committedTxs, txs := txs[:50], txs[50:]
  250. if err := mp.Update(1, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil {
  251. t.Error(err)
  252. }
  253. ensureFire(t, mp.TxsAvailable(), timeoutMS)
  254. ensureNoFire(t, mp.TxsAvailable(), timeoutMS)
  255. // send a bunch more txs. we already fired for this height so it shouldnt fire again
  256. moreTxs := checkTxs(t, mp, 50, mempool.UnknownPeerID)
  257. ensureNoFire(t, mp.TxsAvailable(), timeoutMS)
  258. // now call update with all the txs. it should not fire as there are no txs left
  259. committedTxs = append(txs, moreTxs...) //nolint: gocritic
  260. if err := mp.Update(2, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil {
  261. t.Error(err)
  262. }
  263. ensureNoFire(t, mp.TxsAvailable(), timeoutMS)
  264. // send a bunch more txs, it should only fire once
  265. checkTxs(t, mp, 100, mempool.UnknownPeerID)
  266. ensureFire(t, mp.TxsAvailable(), timeoutMS)
  267. ensureNoFire(t, mp.TxsAvailable(), timeoutMS)
  268. }
  269. func TestSerialReap(t *testing.T) {
  270. app := counter.NewApplication(true)
  271. cc := proxy.NewLocalClientCreator(app)
  272. mp, cleanup := newMempoolWithApp(cc)
  273. defer cleanup()
  274. appConnCon, _ := cc.NewABCIClient()
  275. appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
  276. err := appConnCon.Start()
  277. require.Nil(t, err)
  278. cacheMap := make(map[string]struct{})
  279. deliverTxsRange := func(start, end int) {
  280. // Deliver some txs.
  281. for i := start; i < end; i++ {
  282. // This will succeed
  283. txBytes := make([]byte, 8)
  284. binary.BigEndian.PutUint64(txBytes, uint64(i))
  285. err := mp.CheckTx(context.Background(), txBytes, nil, mempool.TxInfo{})
  286. _, cached := cacheMap[string(txBytes)]
  287. if cached {
  288. require.NotNil(t, err, "expected error for cached tx")
  289. } else {
  290. require.Nil(t, err, "expected no err for uncached tx")
  291. }
  292. cacheMap[string(txBytes)] = struct{}{}
  293. // Duplicates are cached and should return error
  294. err = mp.CheckTx(context.Background(), txBytes, nil, mempool.TxInfo{})
  295. require.NotNil(t, err, "Expected error after CheckTx on duplicated tx")
  296. }
  297. }
  298. reapCheck := func(exp int) {
  299. txs := mp.ReapMaxBytesMaxGas(-1, -1)
  300. require.Equal(t, len(txs), exp, fmt.Sprintf("Expected to reap %v txs but got %v", exp, len(txs)))
  301. }
  302. updateRange := func(start, end int) {
  303. txs := make([]types.Tx, 0)
  304. for i := start; i < end; i++ {
  305. txBytes := make([]byte, 8)
  306. binary.BigEndian.PutUint64(txBytes, uint64(i))
  307. txs = append(txs, txBytes)
  308. }
  309. if err := mp.Update(0, txs, abciResponses(len(txs), abci.CodeTypeOK), nil, nil); err != nil {
  310. t.Error(err)
  311. }
  312. }
  313. commitRange := func(start, end int) {
  314. ctx := context.Background()
  315. // Deliver some txs.
  316. for i := start; i < end; i++ {
  317. txBytes := make([]byte, 8)
  318. binary.BigEndian.PutUint64(txBytes, uint64(i))
  319. res, err := appConnCon.DeliverTxSync(ctx, abci.RequestDeliverTx{Tx: txBytes})
  320. if err != nil {
  321. t.Errorf("client error committing tx: %v", err)
  322. }
  323. if res.IsErr() {
  324. t.Errorf("error committing tx. Code:%v result:%X log:%v",
  325. res.Code, res.Data, res.Log)
  326. }
  327. }
  328. res, err := appConnCon.CommitSync(ctx)
  329. if err != nil {
  330. t.Errorf("client error committing: %v", err)
  331. }
  332. if len(res.Data) != 8 {
  333. t.Errorf("error committing. Hash:%X", res.Data)
  334. }
  335. }
  336. //----------------------------------------
  337. // Deliver some txs.
  338. deliverTxsRange(0, 100)
  339. // Reap the txs.
  340. reapCheck(100)
  341. // Reap again. We should get the same amount
  342. reapCheck(100)
  343. // Deliver 0 to 999, we should reap 900 new txs
  344. // because 100 were already counted.
  345. deliverTxsRange(0, 1000)
  346. // Reap the txs.
  347. reapCheck(1000)
  348. // Reap again. We should get the same amount
  349. reapCheck(1000)
  350. // Commit from the conensus AppConn
  351. commitRange(0, 500)
  352. updateRange(0, 500)
  353. // We should have 500 left.
  354. reapCheck(500)
  355. // Deliver 100 invalid txs and 100 valid txs
  356. deliverTxsRange(900, 1100)
  357. // We should have 600 now.
  358. reapCheck(600)
  359. }
  360. func TestMempool_CheckTxChecksTxSize(t *testing.T) {
  361. app := kvstore.NewApplication()
  362. cc := proxy.NewLocalClientCreator(app)
  363. mempl, cleanup := newMempoolWithApp(cc)
  364. defer cleanup()
  365. maxTxSize := mempl.config.MaxTxBytes
  366. testCases := []struct {
  367. len int
  368. err bool
  369. }{
  370. // check small txs. no error
  371. 0: {10, false},
  372. 1: {1000, false},
  373. 2: {1000000, false},
  374. // check around maxTxSize
  375. 3: {maxTxSize - 1, false},
  376. 4: {maxTxSize, false},
  377. 5: {maxTxSize + 1, true},
  378. }
  379. for i, testCase := range testCases {
  380. caseString := fmt.Sprintf("case %d, len %d", i, testCase.len)
  381. tx := tmrand.Bytes(testCase.len)
  382. err := mempl.CheckTx(context.Background(), tx, nil, mempool.TxInfo{})
  383. bv := gogotypes.BytesValue{Value: tx}
  384. bz, err2 := bv.Marshal()
  385. require.NoError(t, err2)
  386. require.Equal(t, len(bz), proto.Size(&bv), caseString)
  387. if !testCase.err {
  388. require.NoError(t, err, caseString)
  389. } else {
  390. require.Equal(t, err, mempool.ErrTxTooLarge{
  391. Max: maxTxSize,
  392. Actual: testCase.len,
  393. }, caseString)
  394. }
  395. }
  396. }
  397. func TestMempoolTxsBytes(t *testing.T) {
  398. app := kvstore.NewApplication()
  399. cc := proxy.NewLocalClientCreator(app)
  400. config := cfg.ResetTestRoot("mempool_test")
  401. config.Mempool.MaxTxsBytes = 10
  402. mp, cleanup := newMempoolWithAppAndConfig(cc, config)
  403. defer cleanup()
  404. // 1. zero by default
  405. assert.EqualValues(t, 0, mp.SizeBytes())
  406. // 2. len(tx) after CheckTx
  407. err := mp.CheckTx(context.Background(), []byte{0x01}, nil, mempool.TxInfo{})
  408. require.NoError(t, err)
  409. assert.EqualValues(t, 1, mp.SizeBytes())
  410. // 3. zero again after tx is removed by Update
  411. err = mp.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
  412. require.NoError(t, err)
  413. assert.EqualValues(t, 0, mp.SizeBytes())
  414. // 4. zero after Flush
  415. err = mp.CheckTx(context.Background(), []byte{0x02, 0x03}, nil, mempool.TxInfo{})
  416. require.NoError(t, err)
  417. assert.EqualValues(t, 2, mp.SizeBytes())
  418. mp.Flush()
  419. assert.EqualValues(t, 0, mp.SizeBytes())
  420. // 5. ErrMempoolIsFull is returned when/if MaxTxsBytes limit is reached.
  421. err = mp.CheckTx(
  422. context.Background(),
  423. []byte{0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04},
  424. nil,
  425. mempool.TxInfo{},
  426. )
  427. require.NoError(t, err)
  428. err = mp.CheckTx(context.Background(), []byte{0x05}, nil, mempool.TxInfo{})
  429. if assert.Error(t, err) {
  430. assert.IsType(t, mempool.ErrMempoolIsFull{}, err)
  431. }
  432. // 6. zero after tx is rechecked and removed due to not being valid anymore
  433. app2 := counter.NewApplication(true)
  434. cc = proxy.NewLocalClientCreator(app2)
  435. mp, cleanup = newMempoolWithApp(cc)
  436. defer cleanup()
  437. txBytes := make([]byte, 8)
  438. binary.BigEndian.PutUint64(txBytes, uint64(0))
  439. err = mp.CheckTx(context.Background(), txBytes, nil, mempool.TxInfo{})
  440. require.NoError(t, err)
  441. assert.EqualValues(t, 8, mp.SizeBytes())
  442. appConnCon, _ := cc.NewABCIClient()
  443. appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
  444. err = appConnCon.Start()
  445. require.Nil(t, err)
  446. t.Cleanup(func() {
  447. if err := appConnCon.Stop(); err != nil {
  448. t.Error(err)
  449. }
  450. })
  451. ctx := context.Background()
  452. res, err := appConnCon.DeliverTxSync(ctx, abci.RequestDeliverTx{Tx: txBytes})
  453. require.NoError(t, err)
  454. require.EqualValues(t, 0, res.Code)
  455. res2, err := appConnCon.CommitSync(ctx)
  456. require.NoError(t, err)
  457. require.NotEmpty(t, res2.Data)
  458. // Pretend like we committed nothing so txBytes gets rechecked and removed.
  459. err = mp.Update(1, []types.Tx{}, abciResponses(0, abci.CodeTypeOK), nil, nil)
  460. require.NoError(t, err)
  461. assert.EqualValues(t, 0, mp.SizeBytes())
  462. // 7. Test RemoveTxByKey function
  463. err = mp.CheckTx(context.Background(), []byte{0x06}, nil, mempool.TxInfo{})
  464. require.NoError(t, err)
  465. assert.EqualValues(t, 1, mp.SizeBytes())
  466. mp.RemoveTxByKey(mempool.TxKey([]byte{0x07}), true)
  467. assert.EqualValues(t, 1, mp.SizeBytes())
  468. mp.RemoveTxByKey(mempool.TxKey([]byte{0x06}), true)
  469. assert.EqualValues(t, 0, mp.SizeBytes())
  470. }
  471. // This will non-deterministically catch some concurrency failures like
  472. // https://github.com/tendermint/tendermint/issues/3509
  473. // TODO: all of the tests should probably also run using the remote proxy app
  474. // since otherwise we're not actually testing the concurrency of the mempool here!
  475. func TestMempoolRemoteAppConcurrency(t *testing.T) {
  476. sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6))
  477. app := kvstore.NewApplication()
  478. cc, server := newRemoteApp(t, sockPath, app)
  479. t.Cleanup(func() {
  480. if err := server.Stop(); err != nil {
  481. t.Error(err)
  482. }
  483. })
  484. config := cfg.ResetTestRoot("mempool_test")
  485. mp, cleanup := newMempoolWithAppAndConfig(cc, config)
  486. defer cleanup()
  487. // generate small number of txs
  488. nTxs := 10
  489. txLen := 200
  490. txs := make([]types.Tx, nTxs)
  491. for i := 0; i < nTxs; i++ {
  492. txs[i] = tmrand.Bytes(txLen)
  493. }
  494. // simulate a group of peers sending them over and over
  495. N := config.Mempool.Size
  496. maxPeers := 5
  497. for i := 0; i < N; i++ {
  498. peerID := mrand.Intn(maxPeers)
  499. txNum := mrand.Intn(nTxs)
  500. tx := txs[txNum]
  501. // this will err with ErrTxInCache many times ...
  502. mp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{SenderID: uint16(peerID)}) //nolint: errcheck // will error
  503. }
  504. err := mp.FlushAppConn()
  505. require.NoError(t, err)
  506. }
  507. // caller must close server
  508. func newRemoteApp(
  509. t *testing.T,
  510. addr string,
  511. app abci.Application,
  512. ) (
  513. clientCreator proxy.ClientCreator,
  514. server service.Service,
  515. ) {
  516. clientCreator = proxy.NewRemoteClientCreator(addr, "socket", true)
  517. // Start server
  518. server = abciserver.NewSocketServer(addr, app)
  519. server.SetLogger(log.TestingLogger().With("module", "abci-server"))
  520. if err := server.Start(); err != nil {
  521. t.Fatalf("Error starting socket server: %v", err.Error())
  522. }
  523. return clientCreator, server
  524. }
  525. func abciResponses(n int, code uint32) []*abci.ResponseDeliverTx {
  526. responses := make([]*abci.ResponseDeliverTx, 0, n)
  527. for i := 0; i < n; i++ {
  528. responses = append(responses, &abci.ResponseDeliverTx{Code: code})
  529. }
  530. return responses
  531. }