diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index e2e8b25d1..30b253fbe 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -86,6 +86,8 @@ program](https://hackerone.com/tendermint). ### FEATURES: +- [p2p] \#4053 Add `unconditional_peer_ids` and `persistent_peers_max_dial_period` config variables (see ADR-050) (@dongsam) + ### IMPROVEMENTS: - [rpc] \#3188 Added `block_size` to `BlockMeta` this is reflected in `/blockchain` diff --git a/cmd/tendermint/commands/run_node.go b/cmd/tendermint/commands/run_node.go index 278c60197..680ecf11a 100644 --- a/cmd/tendermint/commands/run_node.go +++ b/cmd/tendermint/commands/run_node.go @@ -49,6 +49,8 @@ func AddNodeFlags(cmd *cobra.Command) { "Node listen address. (0.0.0.0:0 means any interface, any port)") cmd.Flags().String("p2p.seeds", config.P2P.Seeds, "Comma-delimited ID@host:port seed nodes") cmd.Flags().String("p2p.persistent_peers", config.P2P.PersistentPeers, "Comma-delimited ID@host:port persistent peers") + cmd.Flags().String("p2p.unconditional_peer_ids", + config.P2P.UnconditionalPeerIDs, "Comma-delimited IDs of unconditional peers") cmd.Flags().Bool("p2p.upnp", config.P2P.UPNP, "Enable/disable UPNP port forwarding") cmd.Flags().Bool("p2p.pex", config.P2P.PexReactor, "Enable/disable Peer-Exchange") cmd.Flags().Bool("p2p.seed_mode", config.P2P.SeedMode, "Enable/disable seed mode") diff --git a/config/config.go b/config/config.go index c5d42c7e8..8efc41b98 100644 --- a/config/config.go +++ b/config/config.go @@ -505,6 +505,12 @@ type P2PConfig struct { //nolint: maligned // Maximum number of outbound peers to connect to, excluding persistent peers MaxNumOutboundPeers int `mapstructure:"max_num_outbound_peers"` + // List of node IDs, to which a connection will be (re)established ignoring any existing limits + UnconditionalPeerIDs string `mapstructure:"unconditional_peer_ids"` + + // Maximum pause when redialing a persistent peer (if zero, exponential backoff is used) + PersistentPeersMaxDialPeriod time.Duration `mapstructure:"persistent_peers_max_dial_period"` + // Time to wait before flushing messages out on the connection FlushThrottleTimeout time.Duration `mapstructure:"flush_throttle_timeout"` @@ -548,25 +554,26 @@ type P2PConfig struct { //nolint: maligned // DefaultP2PConfig returns a default configuration for the peer-to-peer layer func DefaultP2PConfig() *P2PConfig { return &P2PConfig{ - ListenAddress: "tcp://0.0.0.0:26656", - ExternalAddress: "", - UPNP: false, - AddrBook: defaultAddrBookPath, - AddrBookStrict: true, - MaxNumInboundPeers: 40, - MaxNumOutboundPeers: 10, - FlushThrottleTimeout: 100 * time.Millisecond, - MaxPacketMsgPayloadSize: 1024, // 1 kB - SendRate: 5120000, // 5 mB/s - RecvRate: 5120000, // 5 mB/s - PexReactor: true, - SeedMode: false, - AllowDuplicateIP: false, - HandshakeTimeout: 20 * time.Second, - DialTimeout: 3 * time.Second, - TestDialFail: false, - TestFuzz: false, - TestFuzzConfig: DefaultFuzzConnConfig(), + ListenAddress: "tcp://0.0.0.0:26656", + ExternalAddress: "", + UPNP: false, + AddrBook: defaultAddrBookPath, + AddrBookStrict: true, + MaxNumInboundPeers: 40, + MaxNumOutboundPeers: 10, + PersistentPeersMaxDialPeriod: 0 * time.Second, + FlushThrottleTimeout: 100 * time.Millisecond, + MaxPacketMsgPayloadSize: 1024, // 1 kB + SendRate: 5120000, // 5 mB/s + RecvRate: 5120000, // 5 mB/s + PexReactor: true, + SeedMode: false, + AllowDuplicateIP: false, + HandshakeTimeout: 20 * time.Second, + DialTimeout: 3 * time.Second, + TestDialFail: false, + TestFuzz: false, + TestFuzzConfig: DefaultFuzzConnConfig(), } } @@ -596,6 +603,9 @@ func (cfg *P2PConfig) ValidateBasic() error { if cfg.FlushThrottleTimeout < 0 { return errors.New("flush_throttle_timeout can't be negative") } + if cfg.PersistentPeersMaxDialPeriod < 0 { + return errors.New("persistent_peers_max_dial_period can't be negative") + } if cfg.MaxPacketMsgPayloadSize < 0 { return errors.New("max_packet_msg_payload_size can't be negative") } diff --git a/config/toml.go b/config/toml.go index b9d4b6a8b..a6599ff57 100644 --- a/config/toml.go +++ b/config/toml.go @@ -252,6 +252,12 @@ max_num_inbound_peers = {{ .P2P.MaxNumInboundPeers }} # Maximum number of outbound peers to connect to, excluding persistent peers max_num_outbound_peers = {{ .P2P.MaxNumOutboundPeers }} +# List of node IDs, to which a connection will be (re)established ignoring any existing limits +unconditional_peer_ids = "{{ .P2P.UnconditionalPeerIDs }}" + +# Maximum pause when redialing a persistent peer (if zero, exponential backoff is used) +persistent_peers_max_dial_period = "{{ .P2P.PersistentPeersMaxDialPeriod }}" + # Time to wait before flushing messages out on the connection flush_throttle_timeout = "{{ .P2P.FlushThrottleTimeout }}" diff --git a/node/node.go b/node/node.go index 452ec8eeb..eb81d5b0a 100644 --- a/node/node.go +++ b/node/node.go @@ -541,7 +541,8 @@ func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config, // TODO (melekes): make it dynamic based on the actual block latencies // from the live network. // https://github.com/tendermint/tendermint/issues/3523 - SeedDisconnectWaitPeriod: 28 * time.Hour, + SeedDisconnectWaitPeriod: 28 * time.Hour, + PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod, }) pexReactor.SetLogger(logger.With("module", "pex")) sw.AddReactor("PEX", pexReactor) @@ -677,6 +678,11 @@ func NewNode(config *cfg.Config, return nil, errors.Wrap(err, "could not add peers from persistent_peers field") } + err = sw.AddUnconditionalPeerIDs(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) + if err != nil { + return nil, errors.Wrap(err, "could not add peer ids from unconditional_peer_ids field") + } + addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey) if err != nil { return nil, errors.Wrap(err, "could not create addrbook") diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 3235e4e4b..c60ac5e83 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -110,6 +110,9 @@ type PEXReactorConfig struct { // disconnecting. SeedDisconnectWaitPeriod time.Duration + // Maximum pause when redialing a persistent peer (if zero, exponential backoff is used) + PersistentPeersMaxDialPeriod time.Duration + // Seeds is a list of addresses reactor may use // if it can't connect to peers in the addrbook. Seeds []string @@ -517,8 +520,7 @@ func (r *PEXReactor) dialAttemptsInfo(addr *p2p.NetAddress) (attempts int, lastD func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) error { attempts, lastDialed := r.dialAttemptsInfo(addr) - - if attempts > maxAttemptsToDial { + if !r.Switch.IsPeerPersistent(addr) && attempts > maxAttemptsToDial { // TODO(melekes): have a blacklist in the addrbook with peers whom we've // failed to connect to. Then we can clean up attemptsToDial, which acts as // a blacklist currently. @@ -531,6 +533,10 @@ func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) error { if attempts > 0 { jitterSeconds := time.Duration(cmn.RandFloat64() * float64(time.Second)) // 1s == (1e9 ns) backoffDuration := jitterSeconds + ((1 << uint(attempts)) * time.Second) + if r.Switch.IsPeerPersistent(addr) && r.config.PersistentPeersMaxDialPeriod > 0 && + backoffDuration > r.config.PersistentPeersMaxDialPeriod { + backoffDuration = r.config.PersistentPeersMaxDialPeriod + } sinceLastDialed := time.Since(lastDialed) if sinceLastDialed < backoffDuration { return errTooEarlyToDial{backoffDuration, lastDialed} diff --git a/p2p/switch.go b/p2p/switch.go index 4898b80c9..f93ed73f9 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -79,6 +79,7 @@ type Switch struct { addrBook AddrBook // peers addresses with whom we'll maintain constant connection persistentPeersAddrs []*NetAddress + unconditionalPeerIDs map[ID]struct{} transport Transport @@ -117,6 +118,7 @@ func NewSwitch( transport: transport, filterTimeout: defaultFilterTimeout, persistentPeersAddrs: make([]*NetAddress, 0), + unconditionalPeerIDs: make(map[ID]struct{}), } // Ensure we have a completely undeterministic PRNG. @@ -280,19 +282,29 @@ func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool { } // NumPeers returns the count of outbound/inbound and outbound-dialing peers. +// unconditional peers are not counted here. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) { peers := sw.peers.List() for _, peer := range peers { if peer.IsOutbound() { - outbound++ + if !sw.IsPeerUnconditional(peer.ID()) { + outbound++ + } } else { - inbound++ + if !sw.IsPeerUnconditional(peer.ID()) { + inbound++ + } } } dialing = sw.dialing.Size() return } +func (sw *Switch) IsPeerUnconditional(id ID) bool { + _, ok := sw.unconditionalPeerIDs[id] + return ok +} + // MaxNumOutboundPeers returns a maximum number of outbound peers. func (sw *Switch) MaxNumOutboundPeers() int { return sw.config.MaxNumOutboundPeers @@ -558,15 +570,31 @@ func (sw *Switch) AddPersistentPeers(addrs []string) error { return nil } +func (sw *Switch) AddUnconditionalPeerIDs(ids []string) error { + sw.Logger.Info("Adding unconditional peer ids", "ids", ids) + for i, id := range ids { + err := validateID(ID(id)) + if err != nil { + return errors.Wrapf(err, "wrong ID #%d", i) + } + sw.unconditionalPeerIDs[ID(id)] = struct{}{} + } + return nil +} + func (sw *Switch) isPeerPersistentFn() func(*NetAddress) bool { return func(na *NetAddress) bool { - for _, pa := range sw.persistentPeersAddrs { - if pa.Equals(na) { - return true - } + return sw.IsPeerPersistent(na) + } +} + +func (sw *Switch) IsPeerPersistent(na *NetAddress) bool { + for _, pa := range sw.persistentPeersAddrs { + if pa.Equals(na) { + return true } - return false } + return false } func (sw *Switch) acceptRoutine() { @@ -625,19 +653,22 @@ func (sw *Switch) acceptRoutine() { break } - // Ignore connection if we already have enough peers. - _, in, _ := sw.NumPeers() - if in >= sw.config.MaxNumInboundPeers { - sw.Logger.Info( - "Ignoring inbound connection: already have enough inbound peers", - "address", p.SocketAddr(), - "have", in, - "max", sw.config.MaxNumInboundPeers, - ) + if !sw.IsPeerUnconditional(p.NodeInfo().ID()) { + // Ignore connection if we already have enough peers. + _, in, _ := sw.NumPeers() + if in >= sw.config.MaxNumInboundPeers { + sw.Logger.Info( + "Ignoring inbound connection: already have enough inbound peers", + "address", p.SocketAddr(), + "have", in, + "max", sw.config.MaxNumInboundPeers, + ) - sw.transport.Cleanup(p) + sw.transport.Cleanup(p) + + continue + } - continue } if err := sw.addPeer(p); err != nil { diff --git a/p2p/switch_test.go b/p2p/switch_test.go index ef6fa6f34..8fc0d4767 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -530,9 +530,20 @@ func TestSwitchFullConnectivity(t *testing.T) { func TestSwitchAcceptRoutine(t *testing.T) { cfg.MaxNumInboundPeers = 5 + unconditionalNodeCnt := 2 + remoteUnconditionalPeers := make([]*remotePeer, 0) + var unconditionalNodeIds []string + + for i := 0; i < unconditionalNodeCnt; i++ { + rup := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} + remoteUnconditionalPeers = append(remoteUnconditionalPeers, rup) + rup.Start() + unconditionalNodeIds = append(unconditionalNodeIds, string(rup.ID())) + } // make switch sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) + sw.AddUnconditionalPeerIDs(unconditionalNodeIds) err := sw.Start() require.NoError(t, err) defer sw.Stop() @@ -574,10 +585,117 @@ func TestSwitchAcceptRoutine(t *testing.T) { assert.Equal(t, cfg.MaxNumInboundPeers, sw.Peers().Size()) rp.Stop() + for _, rup := range remoteUnconditionalPeers { + c, err := rup.Dial(sw.NetAddress()) + require.NoError(t, err) + // spawn a reading routine to prevent connection from closing + go func(c net.Conn) { + for { + one := make([]byte, 1) + _, err := c.Read(one) + if err != nil { + return + } + } + }(c) + } + time.Sleep(10 * time.Millisecond) + assert.Equal(t, cfg.MaxNumInboundPeers+unconditionalNodeCnt, sw.Peers().Size()) + + // stop remote peers + for _, rp := range remotePeers { + rp.Stop() + } + // stop remote unconditional peers + for _, rup := range remoteUnconditionalPeers { + rup.Stop() + } +} + +func TestSwitchAcceptRoutineUnconditionalPeersFirst(t *testing.T) { + cfg.MaxNumInboundPeers = 5 + unconditionalNodeCnt := 7 + remoteUnconditionalPeers := make([]*remotePeer, 0) + var unconditionalNodeIds []string + + for i := 0; i < unconditionalNodeCnt; i++ { + rup := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} + remoteUnconditionalPeers = append(remoteUnconditionalPeers, rup) + rup.Start() + unconditionalNodeIds = append(unconditionalNodeIds, string(rup.ID())) + } + + // make switch + sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) + sw.AddUnconditionalPeerIDs(unconditionalNodeIds) + err := sw.Start() + require.NoError(t, err) + defer sw.Stop() + + remotePeers := make([]*remotePeer, 0) + assert.Equal(t, 0, sw.Peers().Size()) + + for _, rup := range remoteUnconditionalPeers { + c, err := rup.Dial(sw.NetAddress()) + require.NoError(t, err) + // spawn a reading routine to prevent connection from closing + go func(c net.Conn) { + for { + one := make([]byte, 1) + _, err := c.Read(one) + if err != nil { + return + } + } + }(c) + } + + time.Sleep(10 * time.Millisecond) + assert.Equal(t, unconditionalNodeCnt, sw.Peers().Size()) + assert.True(t, sw.Peers().Size() > cfg.MaxNumInboundPeers) + + // check we connect up to MaxNumInboundPeers + for i := 0; i < cfg.MaxNumInboundPeers; i++ { + rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} + remotePeers = append(remotePeers, rp) + rp.Start() + c, err := rp.Dial(sw.NetAddress()) + require.NoError(t, err) + // spawn a reading routine to prevent connection from closing + go func(c net.Conn) { + for { + one := make([]byte, 1) + _, err := c.Read(one) + if err != nil { + return + } + } + }(c) + } + time.Sleep(10 * time.Millisecond) + assert.Equal(t, cfg.MaxNumInboundPeers+unconditionalNodeCnt, sw.Peers().Size()) + + // check we close new connections if we already have MaxNumInboundPeers peers + rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} + rp.Start() + conn, err := rp.Dial(sw.NetAddress()) + require.NoError(t, err) + // check conn is closed + one := make([]byte, 1) + conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond)) + _, err = conn.Read(one) + assert.Equal(t, io.EOF, err) + assert.Equal(t, cfg.MaxNumInboundPeers+unconditionalNodeCnt, sw.Peers().Size()) + rp.Stop() + // stop remote peers for _, rp := range remotePeers { rp.Stop() } + // stop remote unconditional peers + for _, rup := range remoteUnconditionalPeers { + rup.Stop() + } } type errorTransport struct { diff --git a/rpc/core/net.go b/rpc/core/net.go index ddab08a4d..6f3e4bb7f 100644 --- a/rpc/core/net.go +++ b/rpc/core/net.go @@ -13,9 +13,9 @@ import ( // NetInfo returns network info. // More: https://tendermint.com/rpc/#/Info/net_info func NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) { - out, in, _ := p2pPeers.NumPeers() - peers := make([]ctypes.Peer, 0, out+in) - for _, peer := range p2pPeers.Peers().List() { + peersList := p2pPeers.Peers().List() + peers := make([]ctypes.Peer, 0, len(peersList)) + for _, peer := range peersList { nodeInfo, ok := peer.NodeInfo().(p2p.DefaultNodeInfo) if !ok { return nil, fmt.Errorf("peer.NodeInfo() is not DefaultNodeInfo") diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index a3fff7b10..272192594 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -47,7 +47,6 @@ type transport interface { type peers interface { AddPersistentPeers([]string) error DialPeersAsync([]string) error - NumPeers() (outbound, inbound, dialig int) Peers() p2p.IPeerSet }