You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

523 lines
16 KiB

mempool no gossip back (#2778) Closes #1798 This is done by making every mempool tx maintain a list of peers who its received the tx from. Instead of using the 20byte peer ID, it instead uses a local map from peerID to uint16 counter, so every peer adds 2 bytes. (Word aligned to probably make it 8 bytes) This also required resetting the callback function on every CheckTx. This likely has performance ramifications for instruction caching. The actual setting operation isn't costly with the removal of defers in this PR. * Make the mempool not gossip txs back to peers its received it from * Fix adversarial memleak * Don't break interface * Update changelog * Forgot to add a mtx * forgot a mutex * Update mempool/reactor.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Update mempool/mempool.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Use unknown peer ID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * fix compilation * use next wait chan logic when skipping * Minor fixes * Add TxInfo * Add reverse map * Make activeID's auto-reserve 0 * 0 -> UnknownPeerID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Switch to making the normal case set a callback on the reqres object The recheck case is still done via the global callback, and stats are also set via global callback * fix merge conflict * Addres comments * Add cache tests * add cache tests * minor fixes * update metrics in reqResCb and reformat code * goimport -w mempool/reactor.go * mempool: update memTx senders I had to introduce txsMap for quick mempoolTx lookups. * change senders type from []uint16 to sync.Map Fixes DATA RACE: ``` Read at 0x00c0013fcd3a by goroutine 183: github.com/tendermint/tendermint/mempool.(*MempoolReactor).broadcastTxRoutine() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:195 +0x3c7 Previous write at 0x00c0013fcd3a by D[2019-02-27|10:10:49.058] Read PacketMsg switch=3 peer=35bc1e3558c182927b31987eeff3feb3d58a0fc5@127.0.0.1 :46552 conn=MConn{pipe} packet="PacketMsg{30:2B06579D0A143EB78F3D3299DE8213A51D4E11FB05ACE4D6A14F T:1}" goroutine 190: github.com/tendermint/tendermint/mempool.(*Mempool).CheckTxWithInfo() /go/src/github.com/tendermint/tendermint/mempool/mempool.go:387 +0xdc1 github.com/tendermint/tendermint/mempool.(*MempoolReactor).Receive() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:134 +0xb04 github.com/tendermint/tendermint/p2p.createMConnection.func1() /go/src/github.com/tendermint/tendermint/p2p/peer.go:374 +0x25b github.com/tendermint/tendermint/p2p/conn.(*MConnection).recvRoutine() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:599 +0xcce Goroutine 183 (running) created at: D[2019-02-27|10:10:49.058] Send switch=2 peer=1efafad5443abeea4b7a8155218e4369525d987e@127.0.0.1:46193 channel=48 conn=MConn{pipe} m sgBytes=2B06579D0A146194480ADAE00C2836ED7125FEE65C1D9DD51049 github.com/tendermint/tendermint/mempool.(*MempoolReactor).AddPeer() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:105 +0x1b1 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:683 +0x13b github.com/tendermint/tendermint/p2p.(*Switch).addPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:650 +0x585 github.com/tendermint/tendermint/p2p.(*Switch).addPeerWithConnection() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:145 +0x939 github.com/tendermint/tendermint/p2p.Connect2Switches.func2() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:109 +0x50 I[2019-02-27|10:10:49.058] Added good transaction validator=0 tx=43B4D1F0F03460BD262835C4AA560DB860CFBBE85BD02386D83DAC38C67B3AD7 res="&{CheckTx:gas_w anted:1 }" height=0 total=375 Goroutine 190 (running) created at: github.com/tendermint/tendermint/p2p/conn.(*MConnection).OnStart() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:210 +0x313 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).OnStart() /go/src/github.com/tendermint/tendermint/p2p/peer.go:179 +0x56 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).Start() <autogenerated>:1 +0x43 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() ``` * explain the choice of a map DS for senders * extract ids pool/mapper to a separate struct * fix literal copies lock value from senders: sync.Map contains sync.Mutex * use sync.Map#LoadOrStore instead of Load * fixes after Ismail's review * rename resCbNormal to resCbFirstTime
6 years ago
mempool no gossip back (#2778) Closes #1798 This is done by making every mempool tx maintain a list of peers who its received the tx from. Instead of using the 20byte peer ID, it instead uses a local map from peerID to uint16 counter, so every peer adds 2 bytes. (Word aligned to probably make it 8 bytes) This also required resetting the callback function on every CheckTx. This likely has performance ramifications for instruction caching. The actual setting operation isn't costly with the removal of defers in this PR. * Make the mempool not gossip txs back to peers its received it from * Fix adversarial memleak * Don't break interface * Update changelog * Forgot to add a mtx * forgot a mutex * Update mempool/reactor.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Update mempool/mempool.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Use unknown peer ID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * fix compilation * use next wait chan logic when skipping * Minor fixes * Add TxInfo * Add reverse map * Make activeID's auto-reserve 0 * 0 -> UnknownPeerID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Switch to making the normal case set a callback on the reqres object The recheck case is still done via the global callback, and stats are also set via global callback * fix merge conflict * Addres comments * Add cache tests * add cache tests * minor fixes * update metrics in reqResCb and reformat code * goimport -w mempool/reactor.go * mempool: update memTx senders I had to introduce txsMap for quick mempoolTx lookups. * change senders type from []uint16 to sync.Map Fixes DATA RACE: ``` Read at 0x00c0013fcd3a by goroutine 183: github.com/tendermint/tendermint/mempool.(*MempoolReactor).broadcastTxRoutine() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:195 +0x3c7 Previous write at 0x00c0013fcd3a by D[2019-02-27|10:10:49.058] Read PacketMsg switch=3 peer=35bc1e3558c182927b31987eeff3feb3d58a0fc5@127.0.0.1 :46552 conn=MConn{pipe} packet="PacketMsg{30:2B06579D0A143EB78F3D3299DE8213A51D4E11FB05ACE4D6A14F T:1}" goroutine 190: github.com/tendermint/tendermint/mempool.(*Mempool).CheckTxWithInfo() /go/src/github.com/tendermint/tendermint/mempool/mempool.go:387 +0xdc1 github.com/tendermint/tendermint/mempool.(*MempoolReactor).Receive() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:134 +0xb04 github.com/tendermint/tendermint/p2p.createMConnection.func1() /go/src/github.com/tendermint/tendermint/p2p/peer.go:374 +0x25b github.com/tendermint/tendermint/p2p/conn.(*MConnection).recvRoutine() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:599 +0xcce Goroutine 183 (running) created at: D[2019-02-27|10:10:49.058] Send switch=2 peer=1efafad5443abeea4b7a8155218e4369525d987e@127.0.0.1:46193 channel=48 conn=MConn{pipe} m sgBytes=2B06579D0A146194480ADAE00C2836ED7125FEE65C1D9DD51049 github.com/tendermint/tendermint/mempool.(*MempoolReactor).AddPeer() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:105 +0x1b1 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:683 +0x13b github.com/tendermint/tendermint/p2p.(*Switch).addPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:650 +0x585 github.com/tendermint/tendermint/p2p.(*Switch).addPeerWithConnection() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:145 +0x939 github.com/tendermint/tendermint/p2p.Connect2Switches.func2() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:109 +0x50 I[2019-02-27|10:10:49.058] Added good transaction validator=0 tx=43B4D1F0F03460BD262835C4AA560DB860CFBBE85BD02386D83DAC38C67B3AD7 res="&{CheckTx:gas_w anted:1 }" height=0 total=375 Goroutine 190 (running) created at: github.com/tendermint/tendermint/p2p/conn.(*MConnection).OnStart() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:210 +0x313 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).OnStart() /go/src/github.com/tendermint/tendermint/p2p/peer.go:179 +0x56 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).Start() <autogenerated>:1 +0x43 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() ``` * explain the choice of a map DS for senders * extract ids pool/mapper to a separate struct * fix literal copies lock value from senders: sync.Map contains sync.Mutex * use sync.Map#LoadOrStore instead of Load * fixes after Ismail's review * rename resCbNormal to resCbFirstTime
6 years ago
8 years ago
mempool no gossip back (#2778) Closes #1798 This is done by making every mempool tx maintain a list of peers who its received the tx from. Instead of using the 20byte peer ID, it instead uses a local map from peerID to uint16 counter, so every peer adds 2 bytes. (Word aligned to probably make it 8 bytes) This also required resetting the callback function on every CheckTx. This likely has performance ramifications for instruction caching. The actual setting operation isn't costly with the removal of defers in this PR. * Make the mempool not gossip txs back to peers its received it from * Fix adversarial memleak * Don't break interface * Update changelog * Forgot to add a mtx * forgot a mutex * Update mempool/reactor.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Update mempool/mempool.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Use unknown peer ID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * fix compilation * use next wait chan logic when skipping * Minor fixes * Add TxInfo * Add reverse map * Make activeID's auto-reserve 0 * 0 -> UnknownPeerID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Switch to making the normal case set a callback on the reqres object The recheck case is still done via the global callback, and stats are also set via global callback * fix merge conflict * Addres comments * Add cache tests * add cache tests * minor fixes * update metrics in reqResCb and reformat code * goimport -w mempool/reactor.go * mempool: update memTx senders I had to introduce txsMap for quick mempoolTx lookups. * change senders type from []uint16 to sync.Map Fixes DATA RACE: ``` Read at 0x00c0013fcd3a by goroutine 183: github.com/tendermint/tendermint/mempool.(*MempoolReactor).broadcastTxRoutine() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:195 +0x3c7 Previous write at 0x00c0013fcd3a by D[2019-02-27|10:10:49.058] Read PacketMsg switch=3 peer=35bc1e3558c182927b31987eeff3feb3d58a0fc5@127.0.0.1 :46552 conn=MConn{pipe} packet="PacketMsg{30:2B06579D0A143EB78F3D3299DE8213A51D4E11FB05ACE4D6A14F T:1}" goroutine 190: github.com/tendermint/tendermint/mempool.(*Mempool).CheckTxWithInfo() /go/src/github.com/tendermint/tendermint/mempool/mempool.go:387 +0xdc1 github.com/tendermint/tendermint/mempool.(*MempoolReactor).Receive() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:134 +0xb04 github.com/tendermint/tendermint/p2p.createMConnection.func1() /go/src/github.com/tendermint/tendermint/p2p/peer.go:374 +0x25b github.com/tendermint/tendermint/p2p/conn.(*MConnection).recvRoutine() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:599 +0xcce Goroutine 183 (running) created at: D[2019-02-27|10:10:49.058] Send switch=2 peer=1efafad5443abeea4b7a8155218e4369525d987e@127.0.0.1:46193 channel=48 conn=MConn{pipe} m sgBytes=2B06579D0A146194480ADAE00C2836ED7125FEE65C1D9DD51049 github.com/tendermint/tendermint/mempool.(*MempoolReactor).AddPeer() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:105 +0x1b1 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:683 +0x13b github.com/tendermint/tendermint/p2p.(*Switch).addPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:650 +0x585 github.com/tendermint/tendermint/p2p.(*Switch).addPeerWithConnection() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:145 +0x939 github.com/tendermint/tendermint/p2p.Connect2Switches.func2() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:109 +0x50 I[2019-02-27|10:10:49.058] Added good transaction validator=0 tx=43B4D1F0F03460BD262835C4AA560DB860CFBBE85BD02386D83DAC38C67B3AD7 res="&{CheckTx:gas_w anted:1 }" height=0 total=375 Goroutine 190 (running) created at: github.com/tendermint/tendermint/p2p/conn.(*MConnection).OnStart() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:210 +0x313 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).OnStart() /go/src/github.com/tendermint/tendermint/p2p/peer.go:179 +0x56 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).Start() <autogenerated>:1 +0x43 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() ``` * explain the choice of a map DS for senders * extract ids pool/mapper to a separate struct * fix literal copies lock value from senders: sync.Map contains sync.Mutex * use sync.Map#LoadOrStore instead of Load * fixes after Ismail's review * rename resCbNormal to resCbFirstTime
6 years ago
mempool no gossip back (#2778) Closes #1798 This is done by making every mempool tx maintain a list of peers who its received the tx from. Instead of using the 20byte peer ID, it instead uses a local map from peerID to uint16 counter, so every peer adds 2 bytes. (Word aligned to probably make it 8 bytes) This also required resetting the callback function on every CheckTx. This likely has performance ramifications for instruction caching. The actual setting operation isn't costly with the removal of defers in this PR. * Make the mempool not gossip txs back to peers its received it from * Fix adversarial memleak * Don't break interface * Update changelog * Forgot to add a mtx * forgot a mutex * Update mempool/reactor.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Update mempool/mempool.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Use unknown peer ID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * fix compilation * use next wait chan logic when skipping * Minor fixes * Add TxInfo * Add reverse map * Make activeID's auto-reserve 0 * 0 -> UnknownPeerID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Switch to making the normal case set a callback on the reqres object The recheck case is still done via the global callback, and stats are also set via global callback * fix merge conflict * Addres comments * Add cache tests * add cache tests * minor fixes * update metrics in reqResCb and reformat code * goimport -w mempool/reactor.go * mempool: update memTx senders I had to introduce txsMap for quick mempoolTx lookups. * change senders type from []uint16 to sync.Map Fixes DATA RACE: ``` Read at 0x00c0013fcd3a by goroutine 183: github.com/tendermint/tendermint/mempool.(*MempoolReactor).broadcastTxRoutine() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:195 +0x3c7 Previous write at 0x00c0013fcd3a by D[2019-02-27|10:10:49.058] Read PacketMsg switch=3 peer=35bc1e3558c182927b31987eeff3feb3d58a0fc5@127.0.0.1 :46552 conn=MConn{pipe} packet="PacketMsg{30:2B06579D0A143EB78F3D3299DE8213A51D4E11FB05ACE4D6A14F T:1}" goroutine 190: github.com/tendermint/tendermint/mempool.(*Mempool).CheckTxWithInfo() /go/src/github.com/tendermint/tendermint/mempool/mempool.go:387 +0xdc1 github.com/tendermint/tendermint/mempool.(*MempoolReactor).Receive() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:134 +0xb04 github.com/tendermint/tendermint/p2p.createMConnection.func1() /go/src/github.com/tendermint/tendermint/p2p/peer.go:374 +0x25b github.com/tendermint/tendermint/p2p/conn.(*MConnection).recvRoutine() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:599 +0xcce Goroutine 183 (running) created at: D[2019-02-27|10:10:49.058] Send switch=2 peer=1efafad5443abeea4b7a8155218e4369525d987e@127.0.0.1:46193 channel=48 conn=MConn{pipe} m sgBytes=2B06579D0A146194480ADAE00C2836ED7125FEE65C1D9DD51049 github.com/tendermint/tendermint/mempool.(*MempoolReactor).AddPeer() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:105 +0x1b1 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:683 +0x13b github.com/tendermint/tendermint/p2p.(*Switch).addPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:650 +0x585 github.com/tendermint/tendermint/p2p.(*Switch).addPeerWithConnection() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:145 +0x939 github.com/tendermint/tendermint/p2p.Connect2Switches.func2() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:109 +0x50 I[2019-02-27|10:10:49.058] Added good transaction validator=0 tx=43B4D1F0F03460BD262835C4AA560DB860CFBBE85BD02386D83DAC38C67B3AD7 res="&{CheckTx:gas_w anted:1 }" height=0 total=375 Goroutine 190 (running) created at: github.com/tendermint/tendermint/p2p/conn.(*MConnection).OnStart() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:210 +0x313 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).OnStart() /go/src/github.com/tendermint/tendermint/p2p/peer.go:179 +0x56 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).Start() <autogenerated>:1 +0x43 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() ``` * explain the choice of a map DS for senders * extract ids pool/mapper to a separate struct * fix literal copies lock value from senders: sync.Map contains sync.Mutex * use sync.Map#LoadOrStore instead of Load * fixes after Ismail's review * rename resCbNormal to resCbFirstTime
6 years ago
mempool no gossip back (#2778) Closes #1798 This is done by making every mempool tx maintain a list of peers who its received the tx from. Instead of using the 20byte peer ID, it instead uses a local map from peerID to uint16 counter, so every peer adds 2 bytes. (Word aligned to probably make it 8 bytes) This also required resetting the callback function on every CheckTx. This likely has performance ramifications for instruction caching. The actual setting operation isn't costly with the removal of defers in this PR. * Make the mempool not gossip txs back to peers its received it from * Fix adversarial memleak * Don't break interface * Update changelog * Forgot to add a mtx * forgot a mutex * Update mempool/reactor.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Update mempool/mempool.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Use unknown peer ID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * fix compilation * use next wait chan logic when skipping * Minor fixes * Add TxInfo * Add reverse map * Make activeID's auto-reserve 0 * 0 -> UnknownPeerID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Switch to making the normal case set a callback on the reqres object The recheck case is still done via the global callback, and stats are also set via global callback * fix merge conflict * Addres comments * Add cache tests * add cache tests * minor fixes * update metrics in reqResCb and reformat code * goimport -w mempool/reactor.go * mempool: update memTx senders I had to introduce txsMap for quick mempoolTx lookups. * change senders type from []uint16 to sync.Map Fixes DATA RACE: ``` Read at 0x00c0013fcd3a by goroutine 183: github.com/tendermint/tendermint/mempool.(*MempoolReactor).broadcastTxRoutine() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:195 +0x3c7 Previous write at 0x00c0013fcd3a by D[2019-02-27|10:10:49.058] Read PacketMsg switch=3 peer=35bc1e3558c182927b31987eeff3feb3d58a0fc5@127.0.0.1 :46552 conn=MConn{pipe} packet="PacketMsg{30:2B06579D0A143EB78F3D3299DE8213A51D4E11FB05ACE4D6A14F T:1}" goroutine 190: github.com/tendermint/tendermint/mempool.(*Mempool).CheckTxWithInfo() /go/src/github.com/tendermint/tendermint/mempool/mempool.go:387 +0xdc1 github.com/tendermint/tendermint/mempool.(*MempoolReactor).Receive() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:134 +0xb04 github.com/tendermint/tendermint/p2p.createMConnection.func1() /go/src/github.com/tendermint/tendermint/p2p/peer.go:374 +0x25b github.com/tendermint/tendermint/p2p/conn.(*MConnection).recvRoutine() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:599 +0xcce Goroutine 183 (running) created at: D[2019-02-27|10:10:49.058] Send switch=2 peer=1efafad5443abeea4b7a8155218e4369525d987e@127.0.0.1:46193 channel=48 conn=MConn{pipe} m sgBytes=2B06579D0A146194480ADAE00C2836ED7125FEE65C1D9DD51049 github.com/tendermint/tendermint/mempool.(*MempoolReactor).AddPeer() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:105 +0x1b1 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:683 +0x13b github.com/tendermint/tendermint/p2p.(*Switch).addPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:650 +0x585 github.com/tendermint/tendermint/p2p.(*Switch).addPeerWithConnection() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:145 +0x939 github.com/tendermint/tendermint/p2p.Connect2Switches.func2() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:109 +0x50 I[2019-02-27|10:10:49.058] Added good transaction validator=0 tx=43B4D1F0F03460BD262835C4AA560DB860CFBBE85BD02386D83DAC38C67B3AD7 res="&{CheckTx:gas_w anted:1 }" height=0 total=375 Goroutine 190 (running) created at: github.com/tendermint/tendermint/p2p/conn.(*MConnection).OnStart() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:210 +0x313 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).OnStart() /go/src/github.com/tendermint/tendermint/p2p/peer.go:179 +0x56 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).Start() <autogenerated>:1 +0x43 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() ``` * explain the choice of a map DS for senders * extract ids pool/mapper to a separate struct * fix literal copies lock value from senders: sync.Map contains sync.Mutex * use sync.Map#LoadOrStore instead of Load * fixes after Ismail's review * rename resCbNormal to resCbFirstTime
6 years ago
mempool no gossip back (#2778) Closes #1798 This is done by making every mempool tx maintain a list of peers who its received the tx from. Instead of using the 20byte peer ID, it instead uses a local map from peerID to uint16 counter, so every peer adds 2 bytes. (Word aligned to probably make it 8 bytes) This also required resetting the callback function on every CheckTx. This likely has performance ramifications for instruction caching. The actual setting operation isn't costly with the removal of defers in this PR. * Make the mempool not gossip txs back to peers its received it from * Fix adversarial memleak * Don't break interface * Update changelog * Forgot to add a mtx * forgot a mutex * Update mempool/reactor.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Update mempool/mempool.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Use unknown peer ID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * fix compilation * use next wait chan logic when skipping * Minor fixes * Add TxInfo * Add reverse map * Make activeID's auto-reserve 0 * 0 -> UnknownPeerID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Switch to making the normal case set a callback on the reqres object The recheck case is still done via the global callback, and stats are also set via global callback * fix merge conflict * Addres comments * Add cache tests * add cache tests * minor fixes * update metrics in reqResCb and reformat code * goimport -w mempool/reactor.go * mempool: update memTx senders I had to introduce txsMap for quick mempoolTx lookups. * change senders type from []uint16 to sync.Map Fixes DATA RACE: ``` Read at 0x00c0013fcd3a by goroutine 183: github.com/tendermint/tendermint/mempool.(*MempoolReactor).broadcastTxRoutine() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:195 +0x3c7 Previous write at 0x00c0013fcd3a by D[2019-02-27|10:10:49.058] Read PacketMsg switch=3 peer=35bc1e3558c182927b31987eeff3feb3d58a0fc5@127.0.0.1 :46552 conn=MConn{pipe} packet="PacketMsg{30:2B06579D0A143EB78F3D3299DE8213A51D4E11FB05ACE4D6A14F T:1}" goroutine 190: github.com/tendermint/tendermint/mempool.(*Mempool).CheckTxWithInfo() /go/src/github.com/tendermint/tendermint/mempool/mempool.go:387 +0xdc1 github.com/tendermint/tendermint/mempool.(*MempoolReactor).Receive() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:134 +0xb04 github.com/tendermint/tendermint/p2p.createMConnection.func1() /go/src/github.com/tendermint/tendermint/p2p/peer.go:374 +0x25b github.com/tendermint/tendermint/p2p/conn.(*MConnection).recvRoutine() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:599 +0xcce Goroutine 183 (running) created at: D[2019-02-27|10:10:49.058] Send switch=2 peer=1efafad5443abeea4b7a8155218e4369525d987e@127.0.0.1:46193 channel=48 conn=MConn{pipe} m sgBytes=2B06579D0A146194480ADAE00C2836ED7125FEE65C1D9DD51049 github.com/tendermint/tendermint/mempool.(*MempoolReactor).AddPeer() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:105 +0x1b1 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:683 +0x13b github.com/tendermint/tendermint/p2p.(*Switch).addPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:650 +0x585 github.com/tendermint/tendermint/p2p.(*Switch).addPeerWithConnection() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:145 +0x939 github.com/tendermint/tendermint/p2p.Connect2Switches.func2() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:109 +0x50 I[2019-02-27|10:10:49.058] Added good transaction validator=0 tx=43B4D1F0F03460BD262835C4AA560DB860CFBBE85BD02386D83DAC38C67B3AD7 res="&{CheckTx:gas_w anted:1 }" height=0 total=375 Goroutine 190 (running) created at: github.com/tendermint/tendermint/p2p/conn.(*MConnection).OnStart() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:210 +0x313 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).OnStart() /go/src/github.com/tendermint/tendermint/p2p/peer.go:179 +0x56 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).Start() <autogenerated>:1 +0x43 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() ``` * explain the choice of a map DS for senders * extract ids pool/mapper to a separate struct * fix literal copies lock value from senders: sync.Map contains sync.Mutex * use sync.Map#LoadOrStore instead of Load * fixes after Ismail's review * rename resCbNormal to resCbFirstTime
6 years ago
mempool no gossip back (#2778) Closes #1798 This is done by making every mempool tx maintain a list of peers who its received the tx from. Instead of using the 20byte peer ID, it instead uses a local map from peerID to uint16 counter, so every peer adds 2 bytes. (Word aligned to probably make it 8 bytes) This also required resetting the callback function on every CheckTx. This likely has performance ramifications for instruction caching. The actual setting operation isn't costly with the removal of defers in this PR. * Make the mempool not gossip txs back to peers its received it from * Fix adversarial memleak * Don't break interface * Update changelog * Forgot to add a mtx * forgot a mutex * Update mempool/reactor.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Update mempool/mempool.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Use unknown peer ID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * fix compilation * use next wait chan logic when skipping * Minor fixes * Add TxInfo * Add reverse map * Make activeID's auto-reserve 0 * 0 -> UnknownPeerID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Switch to making the normal case set a callback on the reqres object The recheck case is still done via the global callback, and stats are also set via global callback * fix merge conflict * Addres comments * Add cache tests * add cache tests * minor fixes * update metrics in reqResCb and reformat code * goimport -w mempool/reactor.go * mempool: update memTx senders I had to introduce txsMap for quick mempoolTx lookups. * change senders type from []uint16 to sync.Map Fixes DATA RACE: ``` Read at 0x00c0013fcd3a by goroutine 183: github.com/tendermint/tendermint/mempool.(*MempoolReactor).broadcastTxRoutine() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:195 +0x3c7 Previous write at 0x00c0013fcd3a by D[2019-02-27|10:10:49.058] Read PacketMsg switch=3 peer=35bc1e3558c182927b31987eeff3feb3d58a0fc5@127.0.0.1 :46552 conn=MConn{pipe} packet="PacketMsg{30:2B06579D0A143EB78F3D3299DE8213A51D4E11FB05ACE4D6A14F T:1}" goroutine 190: github.com/tendermint/tendermint/mempool.(*Mempool).CheckTxWithInfo() /go/src/github.com/tendermint/tendermint/mempool/mempool.go:387 +0xdc1 github.com/tendermint/tendermint/mempool.(*MempoolReactor).Receive() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:134 +0xb04 github.com/tendermint/tendermint/p2p.createMConnection.func1() /go/src/github.com/tendermint/tendermint/p2p/peer.go:374 +0x25b github.com/tendermint/tendermint/p2p/conn.(*MConnection).recvRoutine() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:599 +0xcce Goroutine 183 (running) created at: D[2019-02-27|10:10:49.058] Send switch=2 peer=1efafad5443abeea4b7a8155218e4369525d987e@127.0.0.1:46193 channel=48 conn=MConn{pipe} m sgBytes=2B06579D0A146194480ADAE00C2836ED7125FEE65C1D9DD51049 github.com/tendermint/tendermint/mempool.(*MempoolReactor).AddPeer() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:105 +0x1b1 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:683 +0x13b github.com/tendermint/tendermint/p2p.(*Switch).addPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:650 +0x585 github.com/tendermint/tendermint/p2p.(*Switch).addPeerWithConnection() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:145 +0x939 github.com/tendermint/tendermint/p2p.Connect2Switches.func2() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:109 +0x50 I[2019-02-27|10:10:49.058] Added good transaction validator=0 tx=43B4D1F0F03460BD262835C4AA560DB860CFBBE85BD02386D83DAC38C67B3AD7 res="&{CheckTx:gas_w anted:1 }" height=0 total=375 Goroutine 190 (running) created at: github.com/tendermint/tendermint/p2p/conn.(*MConnection).OnStart() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:210 +0x313 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).OnStart() /go/src/github.com/tendermint/tendermint/p2p/peer.go:179 +0x56 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).Start() <autogenerated>:1 +0x43 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() ``` * explain the choice of a map DS for senders * extract ids pool/mapper to a separate struct * fix literal copies lock value from senders: sync.Map contains sync.Mutex * use sync.Map#LoadOrStore instead of Load * fixes after Ismail's review * rename resCbNormal to resCbFirstTime
6 years ago
mempool no gossip back (#2778) Closes #1798 This is done by making every mempool tx maintain a list of peers who its received the tx from. Instead of using the 20byte peer ID, it instead uses a local map from peerID to uint16 counter, so every peer adds 2 bytes. (Word aligned to probably make it 8 bytes) This also required resetting the callback function on every CheckTx. This likely has performance ramifications for instruction caching. The actual setting operation isn't costly with the removal of defers in this PR. * Make the mempool not gossip txs back to peers its received it from * Fix adversarial memleak * Don't break interface * Update changelog * Forgot to add a mtx * forgot a mutex * Update mempool/reactor.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Update mempool/mempool.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Use unknown peer ID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * fix compilation * use next wait chan logic when skipping * Minor fixes * Add TxInfo * Add reverse map * Make activeID's auto-reserve 0 * 0 -> UnknownPeerID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Switch to making the normal case set a callback on the reqres object The recheck case is still done via the global callback, and stats are also set via global callback * fix merge conflict * Addres comments * Add cache tests * add cache tests * minor fixes * update metrics in reqResCb and reformat code * goimport -w mempool/reactor.go * mempool: update memTx senders I had to introduce txsMap for quick mempoolTx lookups. * change senders type from []uint16 to sync.Map Fixes DATA RACE: ``` Read at 0x00c0013fcd3a by goroutine 183: github.com/tendermint/tendermint/mempool.(*MempoolReactor).broadcastTxRoutine() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:195 +0x3c7 Previous write at 0x00c0013fcd3a by D[2019-02-27|10:10:49.058] Read PacketMsg switch=3 peer=35bc1e3558c182927b31987eeff3feb3d58a0fc5@127.0.0.1 :46552 conn=MConn{pipe} packet="PacketMsg{30:2B06579D0A143EB78F3D3299DE8213A51D4E11FB05ACE4D6A14F T:1}" goroutine 190: github.com/tendermint/tendermint/mempool.(*Mempool).CheckTxWithInfo() /go/src/github.com/tendermint/tendermint/mempool/mempool.go:387 +0xdc1 github.com/tendermint/tendermint/mempool.(*MempoolReactor).Receive() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:134 +0xb04 github.com/tendermint/tendermint/p2p.createMConnection.func1() /go/src/github.com/tendermint/tendermint/p2p/peer.go:374 +0x25b github.com/tendermint/tendermint/p2p/conn.(*MConnection).recvRoutine() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:599 +0xcce Goroutine 183 (running) created at: D[2019-02-27|10:10:49.058] Send switch=2 peer=1efafad5443abeea4b7a8155218e4369525d987e@127.0.0.1:46193 channel=48 conn=MConn{pipe} m sgBytes=2B06579D0A146194480ADAE00C2836ED7125FEE65C1D9DD51049 github.com/tendermint/tendermint/mempool.(*MempoolReactor).AddPeer() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:105 +0x1b1 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:683 +0x13b github.com/tendermint/tendermint/p2p.(*Switch).addPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:650 +0x585 github.com/tendermint/tendermint/p2p.(*Switch).addPeerWithConnection() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:145 +0x939 github.com/tendermint/tendermint/p2p.Connect2Switches.func2() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:109 +0x50 I[2019-02-27|10:10:49.058] Added good transaction validator=0 tx=43B4D1F0F03460BD262835C4AA560DB860CFBBE85BD02386D83DAC38C67B3AD7 res="&{CheckTx:gas_w anted:1 }" height=0 total=375 Goroutine 190 (running) created at: github.com/tendermint/tendermint/p2p/conn.(*MConnection).OnStart() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:210 +0x313 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).OnStart() /go/src/github.com/tendermint/tendermint/p2p/peer.go:179 +0x56 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).Start() <autogenerated>:1 +0x43 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() ``` * explain the choice of a map DS for senders * extract ids pool/mapper to a separate struct * fix literal copies lock value from senders: sync.Map contains sync.Mutex * use sync.Map#LoadOrStore instead of Load * fixes after Ismail's review * rename resCbNormal to resCbFirstTime
6 years ago
mempool no gossip back (#2778) Closes #1798 This is done by making every mempool tx maintain a list of peers who its received the tx from. Instead of using the 20byte peer ID, it instead uses a local map from peerID to uint16 counter, so every peer adds 2 bytes. (Word aligned to probably make it 8 bytes) This also required resetting the callback function on every CheckTx. This likely has performance ramifications for instruction caching. The actual setting operation isn't costly with the removal of defers in this PR. * Make the mempool not gossip txs back to peers its received it from * Fix adversarial memleak * Don't break interface * Update changelog * Forgot to add a mtx * forgot a mutex * Update mempool/reactor.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Update mempool/mempool.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Use unknown peer ID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * fix compilation * use next wait chan logic when skipping * Minor fixes * Add TxInfo * Add reverse map * Make activeID's auto-reserve 0 * 0 -> UnknownPeerID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Switch to making the normal case set a callback on the reqres object The recheck case is still done via the global callback, and stats are also set via global callback * fix merge conflict * Addres comments * Add cache tests * add cache tests * minor fixes * update metrics in reqResCb and reformat code * goimport -w mempool/reactor.go * mempool: update memTx senders I had to introduce txsMap for quick mempoolTx lookups. * change senders type from []uint16 to sync.Map Fixes DATA RACE: ``` Read at 0x00c0013fcd3a by goroutine 183: github.com/tendermint/tendermint/mempool.(*MempoolReactor).broadcastTxRoutine() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:195 +0x3c7 Previous write at 0x00c0013fcd3a by D[2019-02-27|10:10:49.058] Read PacketMsg switch=3 peer=35bc1e3558c182927b31987eeff3feb3d58a0fc5@127.0.0.1 :46552 conn=MConn{pipe} packet="PacketMsg{30:2B06579D0A143EB78F3D3299DE8213A51D4E11FB05ACE4D6A14F T:1}" goroutine 190: github.com/tendermint/tendermint/mempool.(*Mempool).CheckTxWithInfo() /go/src/github.com/tendermint/tendermint/mempool/mempool.go:387 +0xdc1 github.com/tendermint/tendermint/mempool.(*MempoolReactor).Receive() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:134 +0xb04 github.com/tendermint/tendermint/p2p.createMConnection.func1() /go/src/github.com/tendermint/tendermint/p2p/peer.go:374 +0x25b github.com/tendermint/tendermint/p2p/conn.(*MConnection).recvRoutine() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:599 +0xcce Goroutine 183 (running) created at: D[2019-02-27|10:10:49.058] Send switch=2 peer=1efafad5443abeea4b7a8155218e4369525d987e@127.0.0.1:46193 channel=48 conn=MConn{pipe} m sgBytes=2B06579D0A146194480ADAE00C2836ED7125FEE65C1D9DD51049 github.com/tendermint/tendermint/mempool.(*MempoolReactor).AddPeer() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:105 +0x1b1 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:683 +0x13b github.com/tendermint/tendermint/p2p.(*Switch).addPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:650 +0x585 github.com/tendermint/tendermint/p2p.(*Switch).addPeerWithConnection() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:145 +0x939 github.com/tendermint/tendermint/p2p.Connect2Switches.func2() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:109 +0x50 I[2019-02-27|10:10:49.058] Added good transaction validator=0 tx=43B4D1F0F03460BD262835C4AA560DB860CFBBE85BD02386D83DAC38C67B3AD7 res="&{CheckTx:gas_w anted:1 }" height=0 total=375 Goroutine 190 (running) created at: github.com/tendermint/tendermint/p2p/conn.(*MConnection).OnStart() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:210 +0x313 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).OnStart() /go/src/github.com/tendermint/tendermint/p2p/peer.go:179 +0x56 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).Start() <autogenerated>:1 +0x43 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() ``` * explain the choice of a map DS for senders * extract ids pool/mapper to a separate struct * fix literal copies lock value from senders: sync.Map contains sync.Mutex * use sync.Map#LoadOrStore instead of Load * fixes after Ismail's review * rename resCbNormal to resCbFirstTime
6 years ago
mempool no gossip back (#2778) Closes #1798 This is done by making every mempool tx maintain a list of peers who its received the tx from. Instead of using the 20byte peer ID, it instead uses a local map from peerID to uint16 counter, so every peer adds 2 bytes. (Word aligned to probably make it 8 bytes) This also required resetting the callback function on every CheckTx. This likely has performance ramifications for instruction caching. The actual setting operation isn't costly with the removal of defers in this PR. * Make the mempool not gossip txs back to peers its received it from * Fix adversarial memleak * Don't break interface * Update changelog * Forgot to add a mtx * forgot a mutex * Update mempool/reactor.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Update mempool/mempool.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Use unknown peer ID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * fix compilation * use next wait chan logic when skipping * Minor fixes * Add TxInfo * Add reverse map * Make activeID's auto-reserve 0 * 0 -> UnknownPeerID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Switch to making the normal case set a callback on the reqres object The recheck case is still done via the global callback, and stats are also set via global callback * fix merge conflict * Addres comments * Add cache tests * add cache tests * minor fixes * update metrics in reqResCb and reformat code * goimport -w mempool/reactor.go * mempool: update memTx senders I had to introduce txsMap for quick mempoolTx lookups. * change senders type from []uint16 to sync.Map Fixes DATA RACE: ``` Read at 0x00c0013fcd3a by goroutine 183: github.com/tendermint/tendermint/mempool.(*MempoolReactor).broadcastTxRoutine() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:195 +0x3c7 Previous write at 0x00c0013fcd3a by D[2019-02-27|10:10:49.058] Read PacketMsg switch=3 peer=35bc1e3558c182927b31987eeff3feb3d58a0fc5@127.0.0.1 :46552 conn=MConn{pipe} packet="PacketMsg{30:2B06579D0A143EB78F3D3299DE8213A51D4E11FB05ACE4D6A14F T:1}" goroutine 190: github.com/tendermint/tendermint/mempool.(*Mempool).CheckTxWithInfo() /go/src/github.com/tendermint/tendermint/mempool/mempool.go:387 +0xdc1 github.com/tendermint/tendermint/mempool.(*MempoolReactor).Receive() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:134 +0xb04 github.com/tendermint/tendermint/p2p.createMConnection.func1() /go/src/github.com/tendermint/tendermint/p2p/peer.go:374 +0x25b github.com/tendermint/tendermint/p2p/conn.(*MConnection).recvRoutine() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:599 +0xcce Goroutine 183 (running) created at: D[2019-02-27|10:10:49.058] Send switch=2 peer=1efafad5443abeea4b7a8155218e4369525d987e@127.0.0.1:46193 channel=48 conn=MConn{pipe} m sgBytes=2B06579D0A146194480ADAE00C2836ED7125FEE65C1D9DD51049 github.com/tendermint/tendermint/mempool.(*MempoolReactor).AddPeer() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:105 +0x1b1 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:683 +0x13b github.com/tendermint/tendermint/p2p.(*Switch).addPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:650 +0x585 github.com/tendermint/tendermint/p2p.(*Switch).addPeerWithConnection() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:145 +0x939 github.com/tendermint/tendermint/p2p.Connect2Switches.func2() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:109 +0x50 I[2019-02-27|10:10:49.058] Added good transaction validator=0 tx=43B4D1F0F03460BD262835C4AA560DB860CFBBE85BD02386D83DAC38C67B3AD7 res="&{CheckTx:gas_w anted:1 }" height=0 total=375 Goroutine 190 (running) created at: github.com/tendermint/tendermint/p2p/conn.(*MConnection).OnStart() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:210 +0x313 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).OnStart() /go/src/github.com/tendermint/tendermint/p2p/peer.go:179 +0x56 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).Start() <autogenerated>:1 +0x43 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() ``` * explain the choice of a map DS for senders * extract ids pool/mapper to a separate struct * fix literal copies lock value from senders: sync.Map contains sync.Mutex * use sync.Map#LoadOrStore instead of Load * fixes after Ismail's review * rename resCbNormal to resCbFirstTime
6 years ago
mempool no gossip back (#2778) Closes #1798 This is done by making every mempool tx maintain a list of peers who its received the tx from. Instead of using the 20byte peer ID, it instead uses a local map from peerID to uint16 counter, so every peer adds 2 bytes. (Word aligned to probably make it 8 bytes) This also required resetting the callback function on every CheckTx. This likely has performance ramifications for instruction caching. The actual setting operation isn't costly with the removal of defers in this PR. * Make the mempool not gossip txs back to peers its received it from * Fix adversarial memleak * Don't break interface * Update changelog * Forgot to add a mtx * forgot a mutex * Update mempool/reactor.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Update mempool/mempool.go Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Use unknown peer ID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * fix compilation * use next wait chan logic when skipping * Minor fixes * Add TxInfo * Add reverse map * Make activeID's auto-reserve 0 * 0 -> UnknownPeerID Co-Authored-By: ValarDragon <ValarDragon@users.noreply.github.com> * Switch to making the normal case set a callback on the reqres object The recheck case is still done via the global callback, and stats are also set via global callback * fix merge conflict * Addres comments * Add cache tests * add cache tests * minor fixes * update metrics in reqResCb and reformat code * goimport -w mempool/reactor.go * mempool: update memTx senders I had to introduce txsMap for quick mempoolTx lookups. * change senders type from []uint16 to sync.Map Fixes DATA RACE: ``` Read at 0x00c0013fcd3a by goroutine 183: github.com/tendermint/tendermint/mempool.(*MempoolReactor).broadcastTxRoutine() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:195 +0x3c7 Previous write at 0x00c0013fcd3a by D[2019-02-27|10:10:49.058] Read PacketMsg switch=3 peer=35bc1e3558c182927b31987eeff3feb3d58a0fc5@127.0.0.1 :46552 conn=MConn{pipe} packet="PacketMsg{30:2B06579D0A143EB78F3D3299DE8213A51D4E11FB05ACE4D6A14F T:1}" goroutine 190: github.com/tendermint/tendermint/mempool.(*Mempool).CheckTxWithInfo() /go/src/github.com/tendermint/tendermint/mempool/mempool.go:387 +0xdc1 github.com/tendermint/tendermint/mempool.(*MempoolReactor).Receive() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:134 +0xb04 github.com/tendermint/tendermint/p2p.createMConnection.func1() /go/src/github.com/tendermint/tendermint/p2p/peer.go:374 +0x25b github.com/tendermint/tendermint/p2p/conn.(*MConnection).recvRoutine() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:599 +0xcce Goroutine 183 (running) created at: D[2019-02-27|10:10:49.058] Send switch=2 peer=1efafad5443abeea4b7a8155218e4369525d987e@127.0.0.1:46193 channel=48 conn=MConn{pipe} m sgBytes=2B06579D0A146194480ADAE00C2836ED7125FEE65C1D9DD51049 github.com/tendermint/tendermint/mempool.(*MempoolReactor).AddPeer() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:105 +0x1b1 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:683 +0x13b github.com/tendermint/tendermint/p2p.(*Switch).addPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:650 +0x585 github.com/tendermint/tendermint/p2p.(*Switch).addPeerWithConnection() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:145 +0x939 github.com/tendermint/tendermint/p2p.Connect2Switches.func2() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:109 +0x50 I[2019-02-27|10:10:49.058] Added good transaction validator=0 tx=43B4D1F0F03460BD262835C4AA560DB860CFBBE85BD02386D83DAC38C67B3AD7 res="&{CheckTx:gas_w anted:1 }" height=0 total=375 Goroutine 190 (running) created at: github.com/tendermint/tendermint/p2p/conn.(*MConnection).OnStart() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:210 +0x313 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).OnStart() /go/src/github.com/tendermint/tendermint/p2p/peer.go:179 +0x56 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).Start() <autogenerated>:1 +0x43 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() ``` * explain the choice of a map DS for senders * extract ids pool/mapper to a separate struct * fix literal copies lock value from senders: sync.Map contains sync.Mutex * use sync.Map#LoadOrStore instead of Load * fixes after Ismail's review * rename resCbNormal to resCbFirstTime
6 years ago
8 years ago
  1. package mempool
  2. import (
  3. "crypto/rand"
  4. "crypto/sha256"
  5. "encoding/binary"
  6. "fmt"
  7. "io/ioutil"
  8. "os"
  9. "path/filepath"
  10. "testing"
  11. "time"
  12. "github.com/stretchr/testify/assert"
  13. "github.com/stretchr/testify/require"
  14. amino "github.com/tendermint/go-amino"
  15. "github.com/tendermint/tendermint/abci/example/counter"
  16. "github.com/tendermint/tendermint/abci/example/kvstore"
  17. abci "github.com/tendermint/tendermint/abci/types"
  18. cfg "github.com/tendermint/tendermint/config"
  19. cmn "github.com/tendermint/tendermint/libs/common"
  20. "github.com/tendermint/tendermint/libs/log"
  21. "github.com/tendermint/tendermint/proxy"
  22. "github.com/tendermint/tendermint/types"
  23. )
  24. // A cleanupFunc cleans up any config / test files created for a particular
  25. // test.
  26. type cleanupFunc func()
  27. func newMempoolWithApp(cc proxy.ClientCreator) (*Mempool, cleanupFunc) {
  28. return newMempoolWithAppAndConfig(cc, cfg.ResetTestRoot("mempool_test"))
  29. }
  30. func newMempoolWithAppAndConfig(cc proxy.ClientCreator, config *cfg.Config) (*Mempool, cleanupFunc) {
  31. appConnMem, _ := cc.NewABCIClient()
  32. appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
  33. err := appConnMem.Start()
  34. if err != nil {
  35. panic(err)
  36. }
  37. mempool := NewMempool(config.Mempool, appConnMem, 0)
  38. mempool.SetLogger(log.TestingLogger())
  39. return mempool, func() { os.RemoveAll(config.RootDir) }
  40. }
  41. func ensureNoFire(t *testing.T, ch <-chan struct{}, timeoutMS int) {
  42. timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
  43. select {
  44. case <-ch:
  45. t.Fatal("Expected not to fire")
  46. case <-timer.C:
  47. }
  48. }
  49. func ensureFire(t *testing.T, ch <-chan struct{}, timeoutMS int) {
  50. timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
  51. select {
  52. case <-ch:
  53. case <-timer.C:
  54. t.Fatal("Expected to fire")
  55. }
  56. }
  57. func checkTxs(t *testing.T, mempool *Mempool, count int, peerID uint16) types.Txs {
  58. txs := make(types.Txs, count)
  59. txInfo := TxInfo{PeerID: peerID}
  60. for i := 0; i < count; i++ {
  61. txBytes := make([]byte, 20)
  62. txs[i] = txBytes
  63. _, err := rand.Read(txBytes)
  64. if err != nil {
  65. t.Error(err)
  66. }
  67. if err := mempool.CheckTxWithInfo(txBytes, nil, txInfo); err != nil {
  68. // Skip invalid txs.
  69. // TestMempoolFilters will fail otherwise. It asserts a number of txs
  70. // returned.
  71. if IsPreCheckError(err) {
  72. continue
  73. }
  74. t.Fatalf("CheckTx failed: %v while checking #%d tx", err, i)
  75. }
  76. }
  77. return txs
  78. }
  79. func TestReapMaxBytesMaxGas(t *testing.T) {
  80. app := kvstore.NewKVStoreApplication()
  81. cc := proxy.NewLocalClientCreator(app)
  82. mempool, cleanup := newMempoolWithApp(cc)
  83. defer cleanup()
  84. // Ensure gas calculation behaves as expected
  85. checkTxs(t, mempool, 1, UnknownPeerID)
  86. tx0 := mempool.TxsFront().Value.(*mempoolTx)
  87. // assert that kv store has gas wanted = 1.
  88. require.Equal(t, app.CheckTx(tx0.tx).GasWanted, int64(1), "KVStore had a gas value neq to 1")
  89. require.Equal(t, tx0.gasWanted, int64(1), "transactions gas was set incorrectly")
  90. // ensure each tx is 20 bytes long
  91. require.Equal(t, len(tx0.tx), 20, "Tx is longer than 20 bytes")
  92. mempool.Flush()
  93. // each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs.
  94. // each tx has 20 bytes + amino overhead = 21 bytes, 1 gas
  95. tests := []struct {
  96. numTxsToCreate int
  97. maxBytes int64
  98. maxGas int64
  99. expectedNumTxs int
  100. }{
  101. {20, -1, -1, 20},
  102. {20, -1, 0, 0},
  103. {20, -1, 10, 10},
  104. {20, -1, 30, 20},
  105. {20, 0, -1, 0},
  106. {20, 0, 10, 0},
  107. {20, 10, 10, 0},
  108. {20, 22, 10, 1},
  109. {20, 220, -1, 10},
  110. {20, 220, 5, 5},
  111. {20, 220, 10, 10},
  112. {20, 220, 15, 10},
  113. {20, 20000, -1, 20},
  114. {20, 20000, 5, 5},
  115. {20, 20000, 30, 20},
  116. }
  117. for tcIndex, tt := range tests {
  118. checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID)
  119. got := mempool.ReapMaxBytesMaxGas(tt.maxBytes, tt.maxGas)
  120. assert.Equal(t, tt.expectedNumTxs, len(got), "Got %d txs, expected %d, tc #%d",
  121. len(got), tt.expectedNumTxs, tcIndex)
  122. mempool.Flush()
  123. }
  124. }
  125. func TestMempoolFilters(t *testing.T) {
  126. app := kvstore.NewKVStoreApplication()
  127. cc := proxy.NewLocalClientCreator(app)
  128. mempool, cleanup := newMempoolWithApp(cc)
  129. defer cleanup()
  130. emptyTxArr := []types.Tx{[]byte{}}
  131. nopPreFilter := func(tx types.Tx) error { return nil }
  132. nopPostFilter := func(tx types.Tx, res *abci.ResponseCheckTx) error { return nil }
  133. // each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs.
  134. // each tx has 20 bytes + amino overhead = 21 bytes, 1 gas
  135. tests := []struct {
  136. numTxsToCreate int
  137. preFilter PreCheckFunc
  138. postFilter PostCheckFunc
  139. expectedNumTxs int
  140. }{
  141. {10, nopPreFilter, nopPostFilter, 10},
  142. {10, PreCheckAminoMaxBytes(10), nopPostFilter, 0},
  143. {10, PreCheckAminoMaxBytes(20), nopPostFilter, 0},
  144. {10, PreCheckAminoMaxBytes(22), nopPostFilter, 10},
  145. {10, nopPreFilter, PostCheckMaxGas(-1), 10},
  146. {10, nopPreFilter, PostCheckMaxGas(0), 0},
  147. {10, nopPreFilter, PostCheckMaxGas(1), 10},
  148. {10, nopPreFilter, PostCheckMaxGas(3000), 10},
  149. {10, PreCheckAminoMaxBytes(10), PostCheckMaxGas(20), 0},
  150. {10, PreCheckAminoMaxBytes(30), PostCheckMaxGas(20), 10},
  151. {10, PreCheckAminoMaxBytes(22), PostCheckMaxGas(1), 10},
  152. {10, PreCheckAminoMaxBytes(22), PostCheckMaxGas(0), 0},
  153. }
  154. for tcIndex, tt := range tests {
  155. mempool.Update(1, emptyTxArr, tt.preFilter, tt.postFilter)
  156. checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID)
  157. require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex)
  158. mempool.Flush()
  159. }
  160. }
  161. func TestMempoolUpdateAddsTxsToCache(t *testing.T) {
  162. app := kvstore.NewKVStoreApplication()
  163. cc := proxy.NewLocalClientCreator(app)
  164. mempool, cleanup := newMempoolWithApp(cc)
  165. defer cleanup()
  166. mempool.Update(1, []types.Tx{[]byte{0x01}}, nil, nil)
  167. err := mempool.CheckTx([]byte{0x01}, nil)
  168. if assert.Error(t, err) {
  169. assert.Equal(t, ErrTxInCache, err)
  170. }
  171. }
  172. func TestTxsAvailable(t *testing.T) {
  173. app := kvstore.NewKVStoreApplication()
  174. cc := proxy.NewLocalClientCreator(app)
  175. mempool, cleanup := newMempoolWithApp(cc)
  176. defer cleanup()
  177. mempool.EnableTxsAvailable()
  178. timeoutMS := 500
  179. // with no txs, it shouldnt fire
  180. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  181. // send a bunch of txs, it should only fire once
  182. txs := checkTxs(t, mempool, 100, UnknownPeerID)
  183. ensureFire(t, mempool.TxsAvailable(), timeoutMS)
  184. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  185. // call update with half the txs.
  186. // it should fire once now for the new height
  187. // since there are still txs left
  188. committedTxs, txs := txs[:50], txs[50:]
  189. if err := mempool.Update(1, committedTxs, nil, nil); err != nil {
  190. t.Error(err)
  191. }
  192. ensureFire(t, mempool.TxsAvailable(), timeoutMS)
  193. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  194. // send a bunch more txs. we already fired for this height so it shouldnt fire again
  195. moreTxs := checkTxs(t, mempool, 50, UnknownPeerID)
  196. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  197. // now call update with all the txs. it should not fire as there are no txs left
  198. committedTxs = append(txs, moreTxs...)
  199. if err := mempool.Update(2, committedTxs, nil, nil); err != nil {
  200. t.Error(err)
  201. }
  202. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  203. // send a bunch more txs, it should only fire once
  204. checkTxs(t, mempool, 100, UnknownPeerID)
  205. ensureFire(t, mempool.TxsAvailable(), timeoutMS)
  206. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  207. }
  208. func TestSerialReap(t *testing.T) {
  209. app := counter.NewCounterApplication(true)
  210. app.SetOption(abci.RequestSetOption{Key: "serial", Value: "on"})
  211. cc := proxy.NewLocalClientCreator(app)
  212. mempool, cleanup := newMempoolWithApp(cc)
  213. defer cleanup()
  214. appConnCon, _ := cc.NewABCIClient()
  215. appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
  216. err := appConnCon.Start()
  217. require.Nil(t, err)
  218. cacheMap := make(map[string]struct{})
  219. deliverTxsRange := func(start, end int) {
  220. // Deliver some txs.
  221. for i := start; i < end; i++ {
  222. // This will succeed
  223. txBytes := make([]byte, 8)
  224. binary.BigEndian.PutUint64(txBytes, uint64(i))
  225. err := mempool.CheckTx(txBytes, nil)
  226. _, cached := cacheMap[string(txBytes)]
  227. if cached {
  228. require.NotNil(t, err, "expected error for cached tx")
  229. } else {
  230. require.Nil(t, err, "expected no err for uncached tx")
  231. }
  232. cacheMap[string(txBytes)] = struct{}{}
  233. // Duplicates are cached and should return error
  234. err = mempool.CheckTx(txBytes, nil)
  235. require.NotNil(t, err, "Expected error after CheckTx on duplicated tx")
  236. }
  237. }
  238. reapCheck := func(exp int) {
  239. txs := mempool.ReapMaxBytesMaxGas(-1, -1)
  240. require.Equal(t, len(txs), exp, fmt.Sprintf("Expected to reap %v txs but got %v", exp, len(txs)))
  241. }
  242. updateRange := func(start, end int) {
  243. txs := make([]types.Tx, 0)
  244. for i := start; i < end; i++ {
  245. txBytes := make([]byte, 8)
  246. binary.BigEndian.PutUint64(txBytes, uint64(i))
  247. txs = append(txs, txBytes)
  248. }
  249. if err := mempool.Update(0, txs, nil, nil); err != nil {
  250. t.Error(err)
  251. }
  252. }
  253. commitRange := func(start, end int) {
  254. // Deliver some txs.
  255. for i := start; i < end; i++ {
  256. txBytes := make([]byte, 8)
  257. binary.BigEndian.PutUint64(txBytes, uint64(i))
  258. res, err := appConnCon.DeliverTxSync(txBytes)
  259. if err != nil {
  260. t.Errorf("Client error committing tx: %v", err)
  261. }
  262. if res.IsErr() {
  263. t.Errorf("Error committing tx. Code:%v result:%X log:%v",
  264. res.Code, res.Data, res.Log)
  265. }
  266. }
  267. res, err := appConnCon.CommitSync()
  268. if err != nil {
  269. t.Errorf("Client error committing: %v", err)
  270. }
  271. if len(res.Data) != 8 {
  272. t.Errorf("Error committing. Hash:%X", res.Data)
  273. }
  274. }
  275. //----------------------------------------
  276. // Deliver some txs.
  277. deliverTxsRange(0, 100)
  278. // Reap the txs.
  279. reapCheck(100)
  280. // Reap again. We should get the same amount
  281. reapCheck(100)
  282. // Deliver 0 to 999, we should reap 900 new txs
  283. // because 100 were already counted.
  284. deliverTxsRange(0, 1000)
  285. // Reap the txs.
  286. reapCheck(1000)
  287. // Reap again. We should get the same amount
  288. reapCheck(1000)
  289. // Commit from the conensus AppConn
  290. commitRange(0, 500)
  291. updateRange(0, 500)
  292. // We should have 500 left.
  293. reapCheck(500)
  294. // Deliver 100 invalid txs and 100 valid txs
  295. deliverTxsRange(900, 1100)
  296. // We should have 600 now.
  297. reapCheck(600)
  298. }
  299. func TestMempoolCloseWAL(t *testing.T) {
  300. // 1. Create the temporary directory for mempool and WAL testing.
  301. rootDir, err := ioutil.TempDir("", "mempool-test")
  302. require.Nil(t, err, "expecting successful tmpdir creation")
  303. defer os.RemoveAll(rootDir)
  304. // 2. Ensure that it doesn't contain any elements -- Sanity check
  305. m1, err := filepath.Glob(filepath.Join(rootDir, "*"))
  306. require.Nil(t, err, "successful globbing expected")
  307. require.Equal(t, 0, len(m1), "no matches yet")
  308. // 3. Create the mempool
  309. wcfg := cfg.DefaultMempoolConfig()
  310. wcfg.RootDir = rootDir
  311. defer os.RemoveAll(wcfg.RootDir)
  312. app := kvstore.NewKVStoreApplication()
  313. cc := proxy.NewLocalClientCreator(app)
  314. appConnMem, _ := cc.NewABCIClient()
  315. mempool := NewMempool(wcfg, appConnMem, 10)
  316. mempool.InitWAL()
  317. // 4. Ensure that the directory contains the WAL file
  318. m2, err := filepath.Glob(filepath.Join(rootDir, "*"))
  319. require.Nil(t, err, "successful globbing expected")
  320. require.Equal(t, 1, len(m2), "expecting the wal match in")
  321. // 5. Write some contents to the WAL
  322. mempool.CheckTx(types.Tx([]byte("foo")), nil)
  323. walFilepath := mempool.wal.Path
  324. sum1 := checksumFile(walFilepath, t)
  325. // 6. Sanity check to ensure that the written TX matches the expectation.
  326. require.Equal(t, sum1, checksumIt([]byte("foo\n")), "foo with a newline should be written")
  327. // 7. Invoke CloseWAL() and ensure it discards the
  328. // WAL thus any other write won't go through.
  329. mempool.CloseWAL()
  330. mempool.CheckTx(types.Tx([]byte("bar")), nil)
  331. sum2 := checksumFile(walFilepath, t)
  332. require.Equal(t, sum1, sum2, "expected no change to the WAL after invoking CloseWAL() since it was discarded")
  333. // 8. Sanity check to ensure that the WAL file still exists
  334. m3, err := filepath.Glob(filepath.Join(rootDir, "*"))
  335. require.Nil(t, err, "successful globbing expected")
  336. require.Equal(t, 1, len(m3), "expecting the wal match in")
  337. }
  338. // Size of the amino encoded TxMessage is the length of the
  339. // encoded byte array, plus 1 for the struct field, plus 4
  340. // for the amino prefix.
  341. func txMessageSize(tx types.Tx) int {
  342. return amino.ByteSliceSize(tx) + 1 + 4
  343. }
  344. func TestMempoolMaxMsgSize(t *testing.T) {
  345. app := kvstore.NewKVStoreApplication()
  346. cc := proxy.NewLocalClientCreator(app)
  347. mempl, cleanup := newMempoolWithApp(cc)
  348. defer cleanup()
  349. testCases := []struct {
  350. len int
  351. err bool
  352. }{
  353. // check small txs. no error
  354. {10, false},
  355. {1000, false},
  356. {1000000, false},
  357. // check around maxTxSize
  358. // changes from no error to error
  359. {maxTxSize - 2, false},
  360. {maxTxSize - 1, false},
  361. {maxTxSize, false},
  362. {maxTxSize + 1, true},
  363. {maxTxSize + 2, true},
  364. // check around maxMsgSize. all error
  365. {maxMsgSize - 1, true},
  366. {maxMsgSize, true},
  367. {maxMsgSize + 1, true},
  368. }
  369. for i, testCase := range testCases {
  370. caseString := fmt.Sprintf("case %d, len %d", i, testCase.len)
  371. tx := cmn.RandBytes(testCase.len)
  372. err := mempl.CheckTx(tx, nil)
  373. msg := &TxMessage{tx}
  374. encoded := cdc.MustMarshalBinaryBare(msg)
  375. require.Equal(t, len(encoded), txMessageSize(tx), caseString)
  376. if !testCase.err {
  377. require.True(t, len(encoded) <= maxMsgSize, caseString)
  378. require.NoError(t, err, caseString)
  379. } else {
  380. require.True(t, len(encoded) > maxMsgSize, caseString)
  381. require.Equal(t, err, ErrTxTooLarge, caseString)
  382. }
  383. }
  384. }
  385. func TestMempoolTxsBytes(t *testing.T) {
  386. app := kvstore.NewKVStoreApplication()
  387. cc := proxy.NewLocalClientCreator(app)
  388. config := cfg.ResetTestRoot("mempool_test")
  389. config.Mempool.MaxTxsBytes = 10
  390. mempool, cleanup := newMempoolWithAppAndConfig(cc, config)
  391. defer cleanup()
  392. // 1. zero by default
  393. assert.EqualValues(t, 0, mempool.TxsBytes())
  394. // 2. len(tx) after CheckTx
  395. err := mempool.CheckTx([]byte{0x01}, nil)
  396. require.NoError(t, err)
  397. assert.EqualValues(t, 1, mempool.TxsBytes())
  398. // 3. zero again after tx is removed by Update
  399. mempool.Update(1, []types.Tx{[]byte{0x01}}, nil, nil)
  400. assert.EqualValues(t, 0, mempool.TxsBytes())
  401. // 4. zero after Flush
  402. err = mempool.CheckTx([]byte{0x02, 0x03}, nil)
  403. require.NoError(t, err)
  404. assert.EqualValues(t, 2, mempool.TxsBytes())
  405. mempool.Flush()
  406. assert.EqualValues(t, 0, mempool.TxsBytes())
  407. // 5. ErrMempoolIsFull is returned when/if MaxTxsBytes limit is reached.
  408. err = mempool.CheckTx([]byte{0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04}, nil)
  409. require.NoError(t, err)
  410. err = mempool.CheckTx([]byte{0x05}, nil)
  411. if assert.Error(t, err) {
  412. assert.IsType(t, ErrMempoolIsFull{}, err)
  413. }
  414. // 6. zero after tx is rechecked and removed due to not being valid anymore
  415. app2 := counter.NewCounterApplication(true)
  416. cc = proxy.NewLocalClientCreator(app2)
  417. mempool, cleanup = newMempoolWithApp(cc)
  418. defer cleanup()
  419. txBytes := make([]byte, 8)
  420. binary.BigEndian.PutUint64(txBytes, uint64(0))
  421. err = mempool.CheckTx(txBytes, nil)
  422. require.NoError(t, err)
  423. assert.EqualValues(t, 8, mempool.TxsBytes())
  424. appConnCon, _ := cc.NewABCIClient()
  425. appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
  426. err = appConnCon.Start()
  427. require.Nil(t, err)
  428. defer appConnCon.Stop()
  429. res, err := appConnCon.DeliverTxSync(txBytes)
  430. require.NoError(t, err)
  431. require.EqualValues(t, 0, res.Code)
  432. res2, err := appConnCon.CommitSync()
  433. require.NoError(t, err)
  434. require.NotEmpty(t, res2.Data)
  435. // Pretend like we committed nothing so txBytes gets rechecked and removed.
  436. mempool.Update(1, []types.Tx{}, nil, nil)
  437. assert.EqualValues(t, 0, mempool.TxsBytes())
  438. }
  439. func checksumIt(data []byte) string {
  440. h := sha256.New()
  441. h.Write(data)
  442. return fmt.Sprintf("%x", h.Sum(nil))
  443. }
  444. func checksumFile(p string, t *testing.T) string {
  445. data, err := ioutil.ReadFile(p)
  446. require.Nil(t, err, "expecting successful read of %q", p)
  447. return checksumIt(data)
  448. }