
mempool: disable MaxBatchBytes (#5800)

@p4u from vocdoni.io reported that the mempool might behave incorrectly under
high load. The consequences can range from pauses between blocks to peers
disconnecting from this node.

My current theory is that the flowrate library we use for flow control
(multiplexing over a single TCP connection) was not designed with large blobs
(a 1MB batch of txs) in mind.

I tried decreasing the mempool reactor priority, but that had no visible
effect. What actually worked was adding a time.Sleep to
mempool.Reactor#broadcastTxRoutine after each successful send, i.e. a crude
form of manual flow control.
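For reference, that experiment amounted to roughly the following shape of send loop. This is a minimal, self-contained sketch: `sendToPeer`, the payload size, and the 50ms pause are illustrative assumptions, not code from this commit.

```go
package main

import (
	"fmt"
	"time"
)

// sendToPeer stands in for p2p.Peer.Send(MempoolChannel, payload).
func sendToPeer(payload []byte) bool {
	return len(payload) > 0
}

func main() {
	for i := 0; i < 5; i++ {
		payload := make([]byte, 64) // illustrative payload
		if !sendToPeer(payload) {
			// Peer is busy or gone; back off before retrying.
			time.Sleep(50 * time.Millisecond)
			continue
		}
		fmt.Println("sent tx", i)
		// The experimental throttle: pause after every successful send so
		// the multiplexed TCP connection has time to drain before the next
		// (potentially large) message is pushed.
		time.Sleep(50 * time.Millisecond)
	}
}
```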

As a temporary remedy (until the mempool package is refactored),
max-batch-bytes has been disabled. Transactions are now sent one by one,
without batching (see the sketch of the resulting wire message below).
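With batching off, every transaction travels in its own `protomem.Txs` message, as the reactor diff below shows. A minimal sketch of what that wire message looks like; the import path is assumed from the repository layout at the time of this commit, and the alias matches the one used in mempool/reactor.go.

```go
package main

import (
	"fmt"

	// Import path assumed from the repository layout.
	protomem "github.com/tendermint/tendermint/proto/tendermint/mempool"
)

func main() {
	tx := []byte("a single transaction")

	// One transaction per message: the Txs slice always has exactly one entry.
	msg := protomem.Message{
		Sum: &protomem.Message_Txs{
			Txs: &protomem.Txs{Txs: [][]byte{tx}},
		},
	}

	bz, err := msg.Marshal()
	if err != nil {
		panic(err)
	}
	fmt.Printf("tx: %d bytes, encoded message: %d bytes\n", len(tx), len(bz))
}
```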

Closes #5796
Anton Kaliaev 4 years ago
committed by GitHub
commit 77deb710fb
6 changed files with 33 additions and 61 deletions:

  1. CHANGELOG_PENDING.md (+7 / -0)
  2. config/config.go (+5 / -11)
  3. config/toml.go (+1 / -0)
  4. docs/nodes/configuration.md (+3 / -2)
  5. mempool/reactor.go (+12 / -38)
  6. mempool/reactor_test.go (+5 / -10)

CHANGELOG_PENDING.md (+7 / -0)

@@ -4,6 +4,12 @@
Special thanks to external contributors on this release:
@p4u from vocdoni.io reported that the mempool might behave incorrectly under a
high load. The consequences can range from pauses between blocks to the peers
disconnecting from this node. As a temporary remedy (until the mempool package
is refactored), the `max-batch-bytes` was disabled. Transactions will be sent
one by one without batching.
Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermint).
### BREAKING CHANGES
@@ -53,3 +59,4 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [blockchain/v1] [\#5701](https://github.com/tendermint/tendermint/pull/5701) Handle peers without blocks (@melekes)
- [crypto] \#5707 Fix infinite recursion in string formatting of Secp256k1 keys (@erikgrinaker)
- [blockchain/v1] \#5711 Fix deadlock (@melekes)
- [mempool] \#5800 Disable `max-batch-bytes` (@melekes)

config/config.go (+5 / -11)

@@ -645,6 +645,7 @@ type MempoolConfig struct {
MaxTxBytes int `mapstructure:"max-tx-bytes"`
// Maximum size of a batch of transactions to send to a peer
// Including space needed by encoding (one varint per transaction).
// XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796
MaxBatchBytes int `mapstructure:"max-batch-bytes"`
}
@@ -656,11 +657,10 @@ func DefaultMempoolConfig() *MempoolConfig {
WalPath: "",
// Each signature verification takes .5ms, Size reduced until we implement
// ABCI Recheck
Size: 5000,
MaxTxsBytes: 1024 * 1024 * 1024, // 1GB
CacheSize: 10000,
MaxTxBytes: 1024 * 1024, // 1MB
MaxBatchBytes: 10 * 1024 * 1024, // 10MB
Size: 5000,
MaxTxsBytes: 1024 * 1024 * 1024, // 1GB
CacheSize: 10000,
MaxTxBytes: 1024 * 1024, // 1MB
}
}
@@ -696,12 +696,6 @@ func (cfg *MempoolConfig) ValidateBasic() error {
if cfg.MaxTxBytes < 0 {
return errors.New("max-tx-bytes can't be negative")
}
if cfg.MaxBatchBytes < 0 {
return errors.New("max-batch-bytes can't be negative")
}
if cfg.MaxBatchBytes <= cfg.MaxTxBytes {
return errors.New("max-batch-bytes can't be less or equal to max-tx-bytes")
}
return nil
}
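After this change `DefaultMempoolConfig` leaves `MaxBatchBytes` at its zero value and `ValidateBasic` no longer inspects it, so configs that still carry the key simply have it ignored. A minimal sketch of the resulting behaviour; the import alias is an assumption, the functions are the ones shown in the diff above.

```go
package main

import (
	"fmt"

	cfg "github.com/tendermint/tendermint/config"
)

func main() {
	mc := cfg.DefaultMempoolConfig()

	// MaxBatchBytes is no longer set by the default config and is ignored
	// by the mempool reactor.
	fmt.Println("max-batch-bytes:", mc.MaxBatchBytes) // 0

	// The remaining checks (size, max-txs-bytes, cache-size, max-tx-bytes)
	// still run; nothing is validated against the unused batch setting.
	if err := mc.ValidateBasic(); err != nil {
		panic(err)
	}
	fmt.Println("default mempool config is valid")
}
```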


config/toml.go (+1 / -0)

@@ -341,6 +341,7 @@ max-tx-bytes = {{ .Mempool.MaxTxBytes }}
# Maximum size of a batch of transactions to send to a peer
# Including space needed by encoding (one varint per transaction).
# XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796
max-batch-bytes = {{ .Mempool.MaxBatchBytes }}
#######################################################


docs/nodes/configuration.md (+3 / -2)

@@ -289,7 +289,8 @@ max-tx-bytes = 1048576
# Maximum size of a batch of transactions to send to a peer
# Including space needed by encoding (one varint per transaction).
max-batch-bytes = 10485760
# XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796
max-batch-bytes = 0
#######################################################
### State Sync Configuration Options ###
@@ -493,4 +494,4 @@ This section will cover settings within the p2p section of the `config.toml`.
- `unconditional-peer-ids` = is similar to `persistent-peers` except that these peers will be connected to even if you are already connected to the maximum number of peers. This can be a validator node ID on your sentry node.
- `pex` = turns the peer exchange reactor on or off. Validator node will want the `pex` turned off so it would not begin gossiping to unknown peers on the network. PeX can also be turned off for statically configured networks with fixed network connectivity. For full nodes on open, dynamic networks, it should be turned on.
- `seed-mode` = is used for when node operators want to run their node as a seed node. Seed node's run a variation of the PeX protocol that disconnects from peers after sending them a list of peers to connect to. To minimize the servers usage, it is recommended to set the mempool's size to 0.
- `private-peer-ids` = is a comma separated list of node ids that you would not like exposed to other peers (ie. you will not tell other peers about the private-peer-ids). This can be filled with a validators node id.

mempool/reactor.go (+12 / -38)

@@ -134,12 +134,18 @@ func (memR *Reactor) OnStart() error {
// GetChannels implements Reactor by returning the list of channels for this
// reactor.
func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
maxMsgSize := memR.config.MaxBatchBytes
largestTx := make([]byte, memR.config.MaxTxBytes)
batchMsg := protomem.Message{
Sum: &protomem.Message_Txs{
Txs: &protomem.Txs{Txs: [][]byte{largestTx}},
},
}
return []*p2p.ChannelDescriptor{
{
ID: MempoolChannel,
Priority: 5,
RecvMessageCapacity: maxMsgSize,
RecvMessageCapacity: batchMsg.Size(),
},
}
}
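The new `RecvMessageCapacity` is the encoded size of a `Message` carrying one transaction of `MaxTxBytes`, i.e. the transaction itself plus a few bytes of protobuf framing. A back-of-the-envelope check of that overhead, assuming both the `Txs` bytes field and the oneof wrapper use field number 1 as in the mempool proto definitions:

```go
package main

import "fmt"

// sov mirrors gogoproto's varint-size helper: the number of bytes needed to
// encode x as a protobuf varint.
func sov(x uint64) int {
	n := 1
	for x >= 0x80 {
		x >>= 7
		n++
	}
	return n
}

func main() {
	const maxTxBytes = 1024 * 1024 // config default: 1MB

	// Inner Txs message: one bytes element of maxTxBytes (tag + length + data).
	txsSize := 1 + sov(uint64(maxTxBytes)) + maxTxBytes
	// Outer Message: the Txs message embedded as a length-delimited field.
	msgSize := 1 + sov(uint64(txsSize)) + txsSize

	fmt.Println("RecvMessageCapacity ≈", msgSize) // 1MB + 8 bytes of framing
}
```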
@@ -234,20 +240,19 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
continue
}
txs := memR.txs(next, peerID, peerState.GetHeight()) // WARNING: mutates next!
// NOTE: Transaction batching was disabled due to
// https://github.com/tendermint/tendermint/issues/5796
// send txs
if len(txs) > 0 {
if _, ok := memTx.senders.Load(peerID); !ok {
msg := protomem.Message{
Sum: &protomem.Message_Txs{
Txs: &protomem.Txs{Txs: txs},
Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}},
},
}
bz, err := msg.Marshal()
if err != nil {
panic(err)
}
memR.Logger.Debug("Sending N txs to peer", "N", len(txs), "peer", peer)
success := peer.Send(MempoolChannel, bz)
if !success {
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
@@ -267,37 +272,6 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
}
}
// txs iterates over the transaction list and builds a batch of txs. next is
// included.
// WARNING: mutates next!
func (memR *Reactor) txs(next *clist.CElement, peerID uint16, peerHeight int64) [][]byte {
batch := make([][]byte, 0)
for {
memTx := next.Value.(*mempoolTx)
if _, ok := memTx.senders.Load(peerID); !ok {
// If current batch + this tx size is greater than max => return.
batchMsg := protomem.Message{
Sum: &protomem.Message_Txs{
Txs: &protomem.Txs{Txs: append(batch, memTx.tx)},
},
}
if batchMsg.Size() > memR.config.MaxBatchBytes {
return batch
}
batch = append(batch, memTx.tx)
}
n := next.Next()
if n == nil {
return batch
}
next = n
}
}
//-----------------------------------------------------------------------------
// Messages


mempool/reactor_test.go (+5 / -10)

@@ -149,9 +149,8 @@ func TestReactorNoBroadcastToSender(t *testing.T) {
ensureNoTxs(t, reactors[peerID], 100*time.Millisecond)
}
func TestReactor_MaxBatchBytes(t *testing.T) {
func TestReactor_MaxTxBytes(t *testing.T) {
config := cfg.TestConfig()
config.Mempool.MaxBatchBytes = 1024
const N = 2
reactors := makeAndConnectReactors(config, N)
@@ -168,9 +167,9 @@ func TestReactor_MaxBatchBytes(t *testing.T) {
}
}
// Broadcast a tx, which has the max size (minus proto overhead)
// Broadcast a tx, which has the max size
// => ensure it's received by the second reactor.
tx1 := tmrand.Bytes(1018)
tx1 := tmrand.Bytes(config.Mempool.MaxTxBytes)
err := reactors[0].mempool.CheckTx(tx1, nil, TxInfo{SenderID: UnknownPeerID})
require.NoError(t, err)
waitForTxsOnReactors(t, []types.Tx{tx1}, reactors)
@@ -180,13 +179,9 @@ func TestReactor_MaxBatchBytes(t *testing.T) {
// Broadcast a tx, which is beyond the max size
// => ensure it's not sent
tx2 := tmrand.Bytes(1020)
tx2 := tmrand.Bytes(config.Mempool.MaxTxBytes + 1)
err = reactors[0].mempool.CheckTx(tx2, nil, TxInfo{SenderID: UnknownPeerID})
require.NoError(t, err)
ensureNoTxs(t, reactors[1], 100*time.Millisecond)
// => ensure the second reactor did not disconnect from us
out, in, _ := reactors[1].Switch.NumPeers()
assert.Equal(t, 1, out+in)
require.Error(t, err)
}
func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) {

