From dc101f2eff5640e90c62ad5eb3fdf013ac107db6 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 21 Dec 2020 19:17:45 +0400 Subject: [PATCH] mempool: disable MaxBatchBytes (#5800) @p4u from vocdoni.io reported that the mempool might behave incorrectly under a high load. The consequences can range from pauses between blocks to the peers disconnecting from this node. My current theory is that the flowrate lib we're using to control flow (multiplex over a single TCP connection) was not designed w/ large blobs (1MB batch of txs) in mind. I've tried decreasing the Mempool reactor priority, but that did not have any visible effect. What actually worked is adding a time.Sleep into mempool.Reactor#broadcastTxRoutine after an each successful send == manual control flow of sort. As a temporary remedy (until the mempool package is refactored), the max-batch-bytes was disabled. Transactions will be sent one by one without batching Closes #5796 --- CHANGELOG_PENDING.md | 7 ++++ config/config.go | 16 +++------ config/toml.go | 1 + docs/tendermint-core/configuration.md | 1 + mempool/reactor.go | 50 +++++++-------------------- mempool/reactor_test.go | 15 +++----- 6 files changed, 31 insertions(+), 59 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 395b13f47..a5521561e 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -4,6 +4,12 @@ Special thanks to external contributors on this release: +@p4u from vocdoni.io reported that the mempool might behave incorrectly under a +high load. The consequences can range from pauses between blocks to the peers +disconnecting from this node. As a temporary remedy (until the mempool package +is refactored), the `max-batch-bytes` was disabled. Transactions will be sent +one by one without batching. + Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermint). ### BREAKING CHANGES @@ -29,3 +35,4 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi ### BUG FIXES - [crypto] \#5707 Fix infinite recursion in string formatting of Secp256k1 keys (@erikgrinaker) +- [mempool] \#5800 Disable `max-batch-bytes` (@melekes) diff --git a/config/config.go b/config/config.go index 882e59fb4..a356cf05c 100644 --- a/config/config.go +++ b/config/config.go @@ -665,6 +665,7 @@ type MempoolConfig struct { MaxTxBytes int `mapstructure:"max_tx_bytes"` // Maximum size of a batch of transactions to send to a peer // Including space needed by encoding (one varint per transaction). + // XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796 MaxBatchBytes int `mapstructure:"max_batch_bytes"` } @@ -676,11 +677,10 @@ func DefaultMempoolConfig() *MempoolConfig { WalPath: "", // Each signature verification takes .5ms, Size reduced until we implement // ABCI Recheck - Size: 5000, - MaxTxsBytes: 1024 * 1024 * 1024, // 1GB - CacheSize: 10000, - MaxTxBytes: 1024 * 1024, // 1MB - MaxBatchBytes: 10 * 1024 * 1024, // 10MB + Size: 5000, + MaxTxsBytes: 1024 * 1024 * 1024, // 1GB + CacheSize: 10000, + MaxTxBytes: 1024 * 1024, // 1MB } } @@ -716,12 +716,6 @@ func (cfg *MempoolConfig) ValidateBasic() error { if cfg.MaxTxBytes < 0 { return errors.New("max_tx_bytes can't be negative") } - if cfg.MaxBatchBytes < 0 { - return errors.New("max_batch_bytes can't be negative") - } - if cfg.MaxBatchBytes <= cfg.MaxTxBytes { - return errors.New("max_batch_bytes can't be less or equal to max_tx_bytes") - } return nil } diff --git a/config/toml.go b/config/toml.go index 62a17db9a..f4bc4640b 100644 --- a/config/toml.go +++ b/config/toml.go @@ -340,6 +340,7 @@ max_tx_bytes = {{ .Mempool.MaxTxBytes }} # Maximum size of a batch of transactions to send to a peer # Including space needed by encoding (one varint per transaction). +# XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796 max_batch_bytes = {{ .Mempool.MaxBatchBytes }} ####################################################### diff --git a/docs/tendermint-core/configuration.md b/docs/tendermint-core/configuration.md index 16a600cc0..17aabcaef 100644 --- a/docs/tendermint-core/configuration.md +++ b/docs/tendermint-core/configuration.md @@ -289,6 +289,7 @@ max_tx_bytes = 1048576 # Maximum size of a batch of transactions to send to a peer # Including space needed by encoding (one varint per transaction). +# XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796 max_batch_bytes = 10485760 ####################################################### diff --git a/mempool/reactor.go b/mempool/reactor.go index d37aaaba4..0babb4b6c 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -134,12 +134,18 @@ func (memR *Reactor) OnStart() error { // GetChannels implements Reactor by returning the list of channels for this // reactor. func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { - maxMsgSize := memR.config.MaxBatchBytes + largestTx := make([]byte, memR.config.MaxTxBytes) + batchMsg := protomem.Message{ + Sum: &protomem.Message_Txs{ + Txs: &protomem.Txs{Txs: [][]byte{largestTx}}, + }, + } + return []*p2p.ChannelDescriptor{ { ID: MempoolChannel, Priority: 5, - RecvMessageCapacity: maxMsgSize, + RecvMessageCapacity: batchMsg.Size(), }, } } @@ -232,20 +238,19 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { continue } - txs := memR.txs(next, peerID, peerState.GetHeight()) // WARNING: mutates next! + // NOTE: Transaction batching was disabled due to + // https://github.com/tendermint/tendermint/issues/5796 - // send txs - if len(txs) > 0 { + if _, ok := memTx.senders.Load(peerID); !ok { msg := protomem.Message{ Sum: &protomem.Message_Txs{ - Txs: &protomem.Txs{Txs: txs}, + Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}}, }, } bz, err := msg.Marshal() if err != nil { panic(err) } - memR.Logger.Debug("Sending N txs to peer", "N", len(txs), "peer", peer) success := peer.Send(MempoolChannel, bz) if !success { time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) @@ -265,37 +270,6 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { } } -// txs iterates over the transaction list and builds a batch of txs. next is -// included. -// WARNING: mutates next! -func (memR *Reactor) txs(next *clist.CElement, peerID uint16, peerHeight int64) [][]byte { - batch := make([][]byte, 0) - - for { - memTx := next.Value.(*mempoolTx) - - if _, ok := memTx.senders.Load(peerID); !ok { - // If current batch + this tx size is greater than max => return. - batchMsg := protomem.Message{ - Sum: &protomem.Message_Txs{ - Txs: &protomem.Txs{Txs: append(batch, memTx.tx)}, - }, - } - if batchMsg.Size() > memR.config.MaxBatchBytes { - return batch - } - - batch = append(batch, memTx.tx) - } - - n := next.Next() - if n == nil { - return batch - } - next = n - } -} - //----------------------------------------------------------------------------- // Messages diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index d9e67d166..bc51bfd9b 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -149,9 +149,8 @@ func TestReactorNoBroadcastToSender(t *testing.T) { ensureNoTxs(t, reactors[peerID], 100*time.Millisecond) } -func TestReactor_MaxBatchBytes(t *testing.T) { +func TestReactor_MaxTxBytes(t *testing.T) { config := cfg.TestConfig() - config.Mempool.MaxBatchBytes = 1024 const N = 2 reactors := makeAndConnectReactors(config, N) @@ -168,9 +167,9 @@ func TestReactor_MaxBatchBytes(t *testing.T) { } } - // Broadcast a tx, which has the max size (minus proto overhead) + // Broadcast a tx, which has the max size // => ensure it's received by the second reactor. - tx1 := tmrand.Bytes(1018) + tx1 := tmrand.Bytes(config.Mempool.MaxTxBytes) err := reactors[0].mempool.CheckTx(tx1, nil, TxInfo{SenderID: UnknownPeerID}) require.NoError(t, err) waitForTxsOnReactors(t, []types.Tx{tx1}, reactors) @@ -180,13 +179,9 @@ func TestReactor_MaxBatchBytes(t *testing.T) { // Broadcast a tx, which is beyond the max size // => ensure it's not sent - tx2 := tmrand.Bytes(1020) + tx2 := tmrand.Bytes(config.Mempool.MaxTxBytes + 1) err = reactors[0].mempool.CheckTx(tx2, nil, TxInfo{SenderID: UnknownPeerID}) - require.NoError(t, err) - ensureNoTxs(t, reactors[1], 100*time.Millisecond) - // => ensure the second reactor did not disconnect from us - out, in, _ := reactors[1].Switch.NumPeers() - assert.Equal(t, 1, out+in) + require.Error(t, err) } func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) {