diff --git a/config/config.go b/config/config.go index a9b2576fd..da1f56c34 100644 --- a/config/config.go +++ b/config/config.go @@ -639,6 +639,18 @@ type P2PConfig struct { //nolint: maligned // Toggle to disable guard against peers connecting from the same ip. AllowDuplicateIP bool `mapstructure:"allow-duplicate-ip"` + // Time to wait before flushing messages out on the connection + FlushThrottleTimeout time.Duration `mapstructure:"flush-throttle-timeout"` + + // Maximum size of a message packet payload, in bytes + MaxPacketMsgPayloadSize int `mapstructure:"max-packet-msg-payload-size"` + + // Rate at which packets can be sent, in bytes/second + SendRate int64 `mapstructure:"send-rate"` + + // Rate at which packets can be received, in bytes/second + RecvRate int64 `mapstructure:"recv-rate"` + // Peer connection configuration. HandshakeTimeout time.Duration `mapstructure:"handshake-timeout"` DialTimeout time.Duration `mapstructure:"dial-timeout"` @@ -661,13 +673,40 @@ func DefaultP2PConfig() *P2PConfig { UPNP: false, MaxConnections: 64, MaxIncomingConnectionAttempts: 100, - PexReactor: true, - AllowDuplicateIP: false, - HandshakeTimeout: 20 * time.Second, - DialTimeout: 3 * time.Second, - TestDialFail: false, - QueueType: "priority", + FlushThrottleTimeout: 100 * time.Millisecond, + // The MTU (Maximum Transmission Unit) for Ethernet is 1500 bytes. + // The IP header and the TCP header take up 20 bytes each at least (unless + // optional header fields are used) and thus the max for (non-Jumbo frame) + // Ethernet is 1500 - 20 -20 = 1460 + // Source: https://stackoverflow.com/a/3074427/820520 + MaxPacketMsgPayloadSize: 1400, + SendRate: 5120000, // 5 mB/s + RecvRate: 5120000, // 5 mB/s + PexReactor: true, + AllowDuplicateIP: false, + HandshakeTimeout: 20 * time.Second, + DialTimeout: 3 * time.Second, + TestDialFail: false, + QueueType: "priority", + } +} + +// ValidateBasic performs basic validation (checking param bounds, etc.) and +// returns an error if any check fails. +func (cfg *P2PConfig) ValidateBasic() error { + if cfg.FlushThrottleTimeout < 0 { + return errors.New("flush-throttle-timeout can't be negative") + } + if cfg.MaxPacketMsgPayloadSize < 0 { + return errors.New("max-packet-msg-payload-size can't be negative") + } + if cfg.SendRate < 0 { + return errors.New("send-rate can't be negative") } + if cfg.RecvRate < 0 { + return errors.New("recv-rate can't be negative") + } + return nil } // TestP2PConfig returns a configuration for testing the peer-to-peer layer @@ -675,6 +714,8 @@ func TestP2PConfig() *P2PConfig { cfg := DefaultP2PConfig() cfg.ListenAddress = "tcp://127.0.0.1:36656" cfg.AllowDuplicateIP = true + cfg.FlushThrottleTimeout = 10 * time.Millisecond + return cfg } diff --git a/config/config_test.go b/config/config_test.go index 181314492..08a77d032 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -159,3 +159,21 @@ func TestInstrumentationConfigValidateBasic(t *testing.T) { cfg.MaxOpenConnections = -1 assert.Error(t, cfg.ValidateBasic()) } + +func TestP2PConfigValidateBasic(t *testing.T) { + cfg := TestP2PConfig() + assert.NoError(t, cfg.ValidateBasic()) + + fieldsToTest := []string{ + "FlushThrottleTimeout", + "MaxPacketMsgPayloadSize", + "SendRate", + "RecvRate", + } + + for _, fieldName := range fieldsToTest { + reflect.ValueOf(cfg).Elem().FieldByName(fieldName).SetInt(-1) + assert.Error(t, cfg.ValidateBasic()) + reflect.ValueOf(cfg).Elem().FieldByName(fieldName).SetInt(0) + } +} diff --git a/config/toml.go b/config/toml.go index 3be385060..9c7d012f0 100644 --- a/config/toml.go +++ b/config/toml.go @@ -300,6 +300,23 @@ allow-duplicate-ip = {{ .P2P.AllowDuplicateIP }} handshake-timeout = "{{ .P2P.HandshakeTimeout }}" dial-timeout = "{{ .P2P.DialTimeout }}" +# Time to wait before flushing messages out on the connection +# TODO: Remove once MConnConnection is removed. +flush-throttle-timeout = "{{ .P2P.FlushThrottleTimeout }}" + +# Maximum size of a message packet payload, in bytes +# TODO: Remove once MConnConnection is removed. +max-packet-msg-payload-size = {{ .P2P.MaxPacketMsgPayloadSize }} + +# Rate at which packets can be sent, in bytes/second +# TODO: Remove once MConnConnection is removed. +send-rate = {{ .P2P.SendRate }} + +# Rate at which packets can be received, in bytes/second +# TODO: Remove once MConnConnection is removed. +recv-rate = {{ .P2P.RecvRate }} + + ####################################################### ### Mempool Configuration Option ### ####################################################### diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 6c4694624..402e2f0ed 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -21,8 +21,6 @@ import ( const queueBufferDefault = 32 -const dialRandomizerIntervalMillisecond = 3000 - // Envelope contains a message with sender/receiver routing info. type Envelope struct { From types.NodeID // sender (empty if outbound) @@ -536,8 +534,15 @@ func (r *Router) filterPeersID(ctx context.Context, id types.NodeID) error { func (r *Router) dialSleep(ctx context.Context) { if r.options.DialSleep == nil { + const ( + maxDialerInterval = 3000 + minDialerInterval = 250 + ) + // nolint:gosec // G404: Use of weak random number generator - timer := time.NewTimer(time.Duration(rand.Int63n(dialRandomizerIntervalMillisecond)) * time.Millisecond) + dur := time.Duration(rand.Int63n(maxDialerInterval-minDialerInterval+1) + minDialerInterval) + + timer := time.NewTimer(dur * time.Millisecond) defer timer.Stop() select { diff --git a/node/setup.go b/node/setup.go index 40ced410c..2fddceac1 100644 --- a/node/setup.go +++ b/node/setup.go @@ -402,8 +402,14 @@ func createConsensusReactor( } func createTransport(logger log.Logger, cfg *config.Config) *p2p.MConnTransport { + conf := conn.DefaultMConnConfig() + conf.FlushThrottle = cfg.P2P.FlushThrottleTimeout + conf.SendRate = cfg.P2P.SendRate + conf.RecvRate = cfg.P2P.RecvRate + conf.MaxPacketMsgPayloadSize = cfg.P2P.MaxPacketMsgPayloadSize + return p2p.NewMConnTransport( - logger, conn.DefaultMConnConfig(), []*p2p.ChannelDescriptor{}, + logger, conf, []*p2p.ChannelDescriptor{}, p2p.MConnTransportOptions{ MaxAcceptedConnections: uint32(cfg.P2P.MaxConnections), }, diff --git a/test/e2e/runner/rpc.go b/test/e2e/runner/rpc.go index ad5fa7a64..f6a32b114 100644 --- a/test/e2e/runner/rpc.go +++ b/test/e2e/runner/rpc.go @@ -70,9 +70,7 @@ func waitForHeight(ctx context.Context, testnet *e2e.Testnet, height int64) (*ty clients[node.Name] = client } - wctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - result, err := client.Status(wctx) + result, err := client.Status(ctx) if err != nil { continue }