diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 395b13f47..a5521561e 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -4,6 +4,12 @@ Special thanks to external contributors on this release: +@p4u from vocdoni.io reported that the mempool might behave incorrectly under +high load. The consequences can range from pauses between blocks to the peers +disconnecting from this node. As a temporary remedy (until the mempool package +is refactored), the `max_batch_bytes` option was disabled. Transactions will be sent +one by one without batching. + Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermint). ### BREAKING CHANGES @@ -29,3 +35,4 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi ### BUG FIXES - [crypto] \#5707 Fix infinite recursion in string formatting of Secp256k1 keys (@erikgrinaker) +- [mempool] \#5800 Disable `max_batch_bytes` (@melekes) diff --git a/config/config.go b/config/config.go index 882e59fb4..a356cf05c 100644 --- a/config/config.go +++ b/config/config.go @@ -665,6 +665,7 @@ type MempoolConfig struct { MaxTxBytes int `mapstructure:"max_tx_bytes"` // Maximum size of a batch of transactions to send to a peer // Including space needed by encoding (one varint per transaction). 
+ // XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796 MaxBatchBytes int `mapstructure:"max_batch_bytes"` } @@ -676,11 +677,10 @@ func DefaultMempoolConfig() *MempoolConfig { WalPath: "", // Each signature verification takes .5ms, Size reduced until we implement // ABCI Recheck - Size: 5000, - MaxTxsBytes: 1024 * 1024 * 1024, // 1GB - CacheSize: 10000, - MaxTxBytes: 1024 * 1024, // 1MB - MaxBatchBytes: 10 * 1024 * 1024, // 10MB + Size: 5000, + MaxTxsBytes: 1024 * 1024 * 1024, // 1GB + CacheSize: 10000, + MaxTxBytes: 1024 * 1024, // 1MB } } @@ -716,12 +716,6 @@ func (cfg *MempoolConfig) ValidateBasic() error { if cfg.MaxTxBytes < 0 { return errors.New("max_tx_bytes can't be negative") } - if cfg.MaxBatchBytes < 0 { - return errors.New("max_batch_bytes can't be negative") - } - if cfg.MaxBatchBytes <= cfg.MaxTxBytes { - return errors.New("max_batch_bytes can't be less or equal to max_tx_bytes") - } return nil } diff --git a/config/toml.go b/config/toml.go index 62a17db9a..f4bc4640b 100644 --- a/config/toml.go +++ b/config/toml.go @@ -340,6 +340,7 @@ max_tx_bytes = {{ .Mempool.MaxTxBytes }} # Maximum size of a batch of transactions to send to a peer # Including space needed by encoding (one varint per transaction). +# XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796 max_batch_bytes = {{ .Mempool.MaxBatchBytes }} ####################################################### diff --git a/docs/tendermint-core/configuration.md b/docs/tendermint-core/configuration.md index 16a600cc0..17aabcaef 100644 --- a/docs/tendermint-core/configuration.md +++ b/docs/tendermint-core/configuration.md @@ -289,6 +289,7 @@ max_tx_bytes = 1048576 # Maximum size of a batch of transactions to send to a peer # Including space needed by encoding (one varint per transaction). 
+# XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796 max_batch_bytes = 10485760 ####################################################### diff --git a/mempool/reactor.go b/mempool/reactor.go index d37aaaba4..0babb4b6c 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -134,12 +134,18 @@ func (memR *Reactor) OnStart() error { // GetChannels implements Reactor by returning the list of channels for this // reactor. func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { - maxMsgSize := memR.config.MaxBatchBytes + largestTx := make([]byte, memR.config.MaxTxBytes) + batchMsg := protomem.Message{ + Sum: &protomem.Message_Txs{ + Txs: &protomem.Txs{Txs: [][]byte{largestTx}}, + }, + } + return []*p2p.ChannelDescriptor{ { ID: MempoolChannel, Priority: 5, - RecvMessageCapacity: maxMsgSize, + RecvMessageCapacity: batchMsg.Size(), }, } } @@ -232,20 +238,19 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { continue } - txs := memR.txs(next, peerID, peerState.GetHeight()) // WARNING: mutates next! + // NOTE: Transaction batching was disabled due to + // https://github.com/tendermint/tendermint/issues/5796 - // send txs - if len(txs) > 0 { + if _, ok := memTx.senders.Load(peerID); !ok { msg := protomem.Message{ Sum: &protomem.Message_Txs{ - Txs: &protomem.Txs{Txs: txs}, + Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}}, }, } bz, err := msg.Marshal() if err != nil { panic(err) } - memR.Logger.Debug("Sending N txs to peer", "N", len(txs), "peer", peer) success := peer.Send(MempoolChannel, bz) if !success { time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) @@ -265,37 +270,6 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { } } -// txs iterates over the transaction list and builds a batch of txs. next is -// included. -// WARNING: mutates next! 
-func (memR *Reactor) txs(next *clist.CElement, peerID uint16, peerHeight int64) [][]byte { - batch := make([][]byte, 0) - - for { - memTx := next.Value.(*mempoolTx) - - if _, ok := memTx.senders.Load(peerID); !ok { - // If current batch + this tx size is greater than max => return. - batchMsg := protomem.Message{ - Sum: &protomem.Message_Txs{ - Txs: &protomem.Txs{Txs: append(batch, memTx.tx)}, - }, - } - if batchMsg.Size() > memR.config.MaxBatchBytes { - return batch - } - - batch = append(batch, memTx.tx) - } - - n := next.Next() - if n == nil { - return batch - } - next = n - } -} - //----------------------------------------------------------------------------- // Messages diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index d9e67d166..bc51bfd9b 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -149,9 +149,8 @@ func TestReactorNoBroadcastToSender(t *testing.T) { ensureNoTxs(t, reactors[peerID], 100*time.Millisecond) } -func TestReactor_MaxBatchBytes(t *testing.T) { +func TestReactor_MaxTxBytes(t *testing.T) { config := cfg.TestConfig() - config.Mempool.MaxBatchBytes = 1024 const N = 2 reactors := makeAndConnectReactors(config, N) @@ -168,9 +167,9 @@ func TestReactor_MaxBatchBytes(t *testing.T) { } } - // Broadcast a tx, which has the max size (minus proto overhead) + // Broadcast a tx, which has the max size // => ensure it's received by the second reactor. 
- tx1 := tmrand.Bytes(1018) + tx1 := tmrand.Bytes(config.Mempool.MaxTxBytes) err := reactors[0].mempool.CheckTx(tx1, nil, TxInfo{SenderID: UnknownPeerID}) require.NoError(t, err) waitForTxsOnReactors(t, []types.Tx{tx1}, reactors) @@ -180,13 +179,9 @@ func TestReactor_MaxBatchBytes(t *testing.T) { // Broadcast a tx, which is beyond the max size // => ensure it's not sent - tx2 := tmrand.Bytes(1020) + tx2 := tmrand.Bytes(config.Mempool.MaxTxBytes + 1) err = reactors[0].mempool.CheckTx(tx2, nil, TxInfo{SenderID: UnknownPeerID}) - require.NoError(t, err) - ensureNoTxs(t, reactors[1], 100*time.Millisecond) - // => ensure the second reactor did not disconnect from us - out, in, _ := reactors[1].Switch.NumPeers() - assert.Equal(t, 1, out+in) + require.Error(t, err) } func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) {