Browse Source

p2p: use correct transport configuration (#7152)

pull/7158/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
23be048294
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 98 additions and 13 deletions
  1. +47
    -6
      config/config.go
  2. +18
    -0
      config/config_test.go
  3. +17
    -0
      config/toml.go
  4. +8
    -3
      internal/p2p/router.go
  5. +7
    -1
      node/setup.go
  6. +1
    -3
      test/e2e/runner/rpc.go

+ 47
- 6
config/config.go View File

@ -639,6 +639,18 @@ type P2PConfig struct { //nolint: maligned
// Toggle to disable guard against peers connecting from the same ip.
AllowDuplicateIP bool `mapstructure:"allow-duplicate-ip"`
// Time to wait before flushing messages out on the connection
FlushThrottleTimeout time.Duration `mapstructure:"flush-throttle-timeout"`
// Maximum size of a message packet payload, in bytes
MaxPacketMsgPayloadSize int `mapstructure:"max-packet-msg-payload-size"`
// Rate at which packets can be sent, in bytes/second
SendRate int64 `mapstructure:"send-rate"`
// Rate at which packets can be received, in bytes/second
RecvRate int64 `mapstructure:"recv-rate"`
// Peer connection configuration.
HandshakeTimeout time.Duration `mapstructure:"handshake-timeout"`
DialTimeout time.Duration `mapstructure:"dial-timeout"`
@ -661,13 +673,40 @@ func DefaultP2PConfig() *P2PConfig {
UPNP: false,
MaxConnections: 64,
MaxIncomingConnectionAttempts: 100,
PexReactor: true,
AllowDuplicateIP: false,
HandshakeTimeout: 20 * time.Second,
DialTimeout: 3 * time.Second,
TestDialFail: false,
QueueType: "priority",
FlushThrottleTimeout: 100 * time.Millisecond,
// The MTU (Maximum Transmission Unit) for Ethernet is 1500 bytes.
// The IP header and the TCP header take up 20 bytes each at least (unless
// optional header fields are used) and thus the max for (non-Jumbo frame)
// Ethernet is 1500 - 20 -20 = 1460
// Source: https://stackoverflow.com/a/3074427/820520
MaxPacketMsgPayloadSize: 1400,
SendRate: 5120000, // 5 mB/s
RecvRate: 5120000, // 5 mB/s
PexReactor: true,
AllowDuplicateIP: false,
HandshakeTimeout: 20 * time.Second,
DialTimeout: 3 * time.Second,
TestDialFail: false,
QueueType: "priority",
}
}
// ValidateBasic performs basic validation (checking param bounds, etc.) and
// returns an error if any check fails.
func (cfg *P2PConfig) ValidateBasic() error {
if cfg.FlushThrottleTimeout < 0 {
return errors.New("flush-throttle-timeout can't be negative")
}
if cfg.MaxPacketMsgPayloadSize < 0 {
return errors.New("max-packet-msg-payload-size can't be negative")
}
if cfg.SendRate < 0 {
return errors.New("send-rate can't be negative")
}
if cfg.RecvRate < 0 {
return errors.New("recv-rate can't be negative")
}
return nil
}
// TestP2PConfig returns a configuration for testing the peer-to-peer layer
@ -675,6 +714,8 @@ func TestP2PConfig() *P2PConfig {
cfg := DefaultP2PConfig()
cfg.ListenAddress = "tcp://127.0.0.1:36656"
cfg.AllowDuplicateIP = true
cfg.FlushThrottleTimeout = 10 * time.Millisecond
return cfg
}


+ 18
- 0
config/config_test.go View File

@ -159,3 +159,21 @@ func TestInstrumentationConfigValidateBasic(t *testing.T) {
cfg.MaxOpenConnections = -1
assert.Error(t, cfg.ValidateBasic())
}
func TestP2PConfigValidateBasic(t *testing.T) {
cfg := TestP2PConfig()
assert.NoError(t, cfg.ValidateBasic())
fieldsToTest := []string{
"FlushThrottleTimeout",
"MaxPacketMsgPayloadSize",
"SendRate",
"RecvRate",
}
for _, fieldName := range fieldsToTest {
reflect.ValueOf(cfg).Elem().FieldByName(fieldName).SetInt(-1)
assert.Error(t, cfg.ValidateBasic())
reflect.ValueOf(cfg).Elem().FieldByName(fieldName).SetInt(0)
}
}

+ 17
- 0
config/toml.go View File

@ -300,6 +300,23 @@ allow-duplicate-ip = {{ .P2P.AllowDuplicateIP }}
handshake-timeout = "{{ .P2P.HandshakeTimeout }}"
dial-timeout = "{{ .P2P.DialTimeout }}"
# Time to wait before flushing messages out on the connection
# TODO: Remove once MConnConnection is removed.
flush-throttle-timeout = "{{ .P2P.FlushThrottleTimeout }}"
# Maximum size of a message packet payload, in bytes
# TODO: Remove once MConnConnection is removed.
max-packet-msg-payload-size = {{ .P2P.MaxPacketMsgPayloadSize }}
# Rate at which packets can be sent, in bytes/second
# TODO: Remove once MConnConnection is removed.
send-rate = {{ .P2P.SendRate }}
# Rate at which packets can be received, in bytes/second
# TODO: Remove once MConnConnection is removed.
recv-rate = {{ .P2P.RecvRate }}
#######################################################
### Mempool Configuration Option ###
#######################################################


+ 8
- 3
internal/p2p/router.go View File

@ -21,8 +21,6 @@ import (
const queueBufferDefault = 32
const dialRandomizerIntervalMillisecond = 3000
// Envelope contains a message with sender/receiver routing info.
type Envelope struct {
From types.NodeID // sender (empty if outbound)
@ -536,8 +534,15 @@ func (r *Router) filterPeersID(ctx context.Context, id types.NodeID) error {
func (r *Router) dialSleep(ctx context.Context) {
if r.options.DialSleep == nil {
const (
maxDialerInterval = 3000
minDialerInterval = 250
)
// nolint:gosec // G404: Use of weak random number generator
timer := time.NewTimer(time.Duration(rand.Int63n(dialRandomizerIntervalMillisecond)) * time.Millisecond)
dur := time.Duration(rand.Int63n(maxDialerInterval-minDialerInterval+1) + minDialerInterval)
timer := time.NewTimer(dur * time.Millisecond)
defer timer.Stop()
select {


+ 7
- 1
node/setup.go View File

@ -402,8 +402,14 @@ func createConsensusReactor(
}
func createTransport(logger log.Logger, cfg *config.Config) *p2p.MConnTransport {
conf := conn.DefaultMConnConfig()
conf.FlushThrottle = cfg.P2P.FlushThrottleTimeout
conf.SendRate = cfg.P2P.SendRate
conf.RecvRate = cfg.P2P.RecvRate
conf.MaxPacketMsgPayloadSize = cfg.P2P.MaxPacketMsgPayloadSize
return p2p.NewMConnTransport(
logger, conn.DefaultMConnConfig(), []*p2p.ChannelDescriptor{},
logger, conf, []*p2p.ChannelDescriptor{},
p2p.MConnTransportOptions{
MaxAcceptedConnections: uint32(cfg.P2P.MaxConnections),
},


+ 1
- 3
test/e2e/runner/rpc.go View File

@ -70,9 +70,7 @@ func waitForHeight(ctx context.Context, testnet *e2e.Testnet, height int64) (*ty
clients[node.Name] = client
}
wctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
result, err := client.Status(wctx)
result, err := client.Status(ctx)
if err != nil {
continue
}


Loading…
Cancel
Save