Browse Source

p2p: add `unconditional_peer_ids` and `persistent_peers_max_dial_period` (#4176)

implementation spec of Improved Trusted Peering ADR-050 by B-Harvest

- add unconditional_peer_ids and persistent_peers_max_dial_period to config
- add unconditionalPeerIDs map to Switch struct

default config value of persistent_peers_max_dial_period is 0s(disabled)

Refs #4072, #4053
pull/4218/head
dongsamb 5 years ago
committed by Anton Kaliaev
parent
commit
701e9cac4d
10 changed files with 224 additions and 44 deletions
  1. +2
    -0
      CHANGELOG_PENDING.md
  2. +2
    -0
      cmd/tendermint/commands/run_node.go
  3. +29
    -19
      config/config.go
  4. +6
    -0
      config/toml.go
  5. +7
    -1
      node/node.go
  6. +8
    -2
      p2p/pex/pex_reactor.go
  7. +49
    -18
      p2p/switch.go
  8. +118
    -0
      p2p/switch_test.go
  9. +3
    -3
      rpc/core/net.go
  10. +0
    -1
      rpc/core/pipe.go

+ 2
- 0
CHANGELOG_PENDING.md View File

@ -86,6 +86,8 @@ program](https://hackerone.com/tendermint).
### FEATURES:
- [p2p] \#4053 Add `unconditional_peer_ids` and `persistent_peers_max_dial_period` config variables (see ADR-050) (@dongsam)
### IMPROVEMENTS:
- [rpc] \#3188 Added `block_size` to `BlockMeta` this is reflected in `/blockchain`


+ 2
- 0
cmd/tendermint/commands/run_node.go View File

@ -49,6 +49,8 @@ func AddNodeFlags(cmd *cobra.Command) {
"Node listen address. (0.0.0.0:0 means any interface, any port)")
cmd.Flags().String("p2p.seeds", config.P2P.Seeds, "Comma-delimited ID@host:port seed nodes")
cmd.Flags().String("p2p.persistent_peers", config.P2P.PersistentPeers, "Comma-delimited ID@host:port persistent peers")
cmd.Flags().String("p2p.unconditional_peer_ids",
config.P2P.UnconditionalPeerIDs, "Comma-delimited IDs of unconditional peers")
cmd.Flags().Bool("p2p.upnp", config.P2P.UPNP, "Enable/disable UPNP port forwarding")
cmd.Flags().Bool("p2p.pex", config.P2P.PexReactor, "Enable/disable Peer-Exchange")
cmd.Flags().Bool("p2p.seed_mode", config.P2P.SeedMode, "Enable/disable seed mode")


+ 29
- 19
config/config.go View File

@ -505,6 +505,12 @@ type P2PConfig struct { //nolint: maligned
// Maximum number of outbound peers to connect to, excluding persistent peers
MaxNumOutboundPeers int `mapstructure:"max_num_outbound_peers"`
// List of node IDs, to which a connection will be (re)established ignoring any existing limits
UnconditionalPeerIDs string `mapstructure:"unconditional_peer_ids"`
// Maximum pause when redialing a persistent peer (if zero, exponential backoff is used)
PersistentPeersMaxDialPeriod time.Duration `mapstructure:"persistent_peers_max_dial_period"`
// Time to wait before flushing messages out on the connection
FlushThrottleTimeout time.Duration `mapstructure:"flush_throttle_timeout"`
@ -548,25 +554,26 @@ type P2PConfig struct { //nolint: maligned
// DefaultP2PConfig returns a default configuration for the peer-to-peer layer
func DefaultP2PConfig() *P2PConfig {
return &P2PConfig{
ListenAddress: "tcp://0.0.0.0:26656",
ExternalAddress: "",
UPNP: false,
AddrBook: defaultAddrBookPath,
AddrBookStrict: true,
MaxNumInboundPeers: 40,
MaxNumOutboundPeers: 10,
FlushThrottleTimeout: 100 * time.Millisecond,
MaxPacketMsgPayloadSize: 1024, // 1 kB
SendRate: 5120000, // 5 mB/s
RecvRate: 5120000, // 5 mB/s
PexReactor: true,
SeedMode: false,
AllowDuplicateIP: false,
HandshakeTimeout: 20 * time.Second,
DialTimeout: 3 * time.Second,
TestDialFail: false,
TestFuzz: false,
TestFuzzConfig: DefaultFuzzConnConfig(),
ListenAddress: "tcp://0.0.0.0:26656",
ExternalAddress: "",
UPNP: false,
AddrBook: defaultAddrBookPath,
AddrBookStrict: true,
MaxNumInboundPeers: 40,
MaxNumOutboundPeers: 10,
PersistentPeersMaxDialPeriod: 0 * time.Second,
FlushThrottleTimeout: 100 * time.Millisecond,
MaxPacketMsgPayloadSize: 1024, // 1 kB
SendRate: 5120000, // 5 mB/s
RecvRate: 5120000, // 5 mB/s
PexReactor: true,
SeedMode: false,
AllowDuplicateIP: false,
HandshakeTimeout: 20 * time.Second,
DialTimeout: 3 * time.Second,
TestDialFail: false,
TestFuzz: false,
TestFuzzConfig: DefaultFuzzConnConfig(),
}
}
@ -596,6 +603,9 @@ func (cfg *P2PConfig) ValidateBasic() error {
if cfg.FlushThrottleTimeout < 0 {
return errors.New("flush_throttle_timeout can't be negative")
}
if cfg.PersistentPeersMaxDialPeriod < 0 {
return errors.New("persistent_peers_max_dial_period can't be negative")
}
if cfg.MaxPacketMsgPayloadSize < 0 {
return errors.New("max_packet_msg_payload_size can't be negative")
}


+ 6
- 0
config/toml.go View File

@ -252,6 +252,12 @@ max_num_inbound_peers = {{ .P2P.MaxNumInboundPeers }}
# Maximum number of outbound peers to connect to, excluding persistent peers
max_num_outbound_peers = {{ .P2P.MaxNumOutboundPeers }}
# List of node IDs, to which a connection will be (re)established ignoring any existing limits
unconditional_peer_ids = "{{ .P2P.UnconditionalPeerIDs }}"
# Maximum pause when redialing a persistent peer (if zero, exponential backoff is used)
persistent_peers_max_dial_period = "{{ .P2P.PersistentPeersMaxDialPeriod }}"
# Time to wait before flushing messages out on the connection
flush_throttle_timeout = "{{ .P2P.FlushThrottleTimeout }}"


+ 7
- 1
node/node.go View File

@ -541,7 +541,8 @@ func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
// TODO (melekes): make it dynamic based on the actual block latencies
// from the live network.
// https://github.com/tendermint/tendermint/issues/3523
SeedDisconnectWaitPeriod: 28 * time.Hour,
SeedDisconnectWaitPeriod: 28 * time.Hour,
PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod,
})
pexReactor.SetLogger(logger.With("module", "pex"))
sw.AddReactor("PEX", pexReactor)
@ -677,6 +678,11 @@ func NewNode(config *cfg.Config,
return nil, errors.Wrap(err, "could not add peers from persistent_peers field")
}
err = sw.AddUnconditionalPeerIDs(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
return nil, errors.Wrap(err, "could not add peer ids from unconditional_peer_ids field")
}
addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
if err != nil {
return nil, errors.Wrap(err, "could not create addrbook")


+ 8
- 2
p2p/pex/pex_reactor.go View File

@ -110,6 +110,9 @@ type PEXReactorConfig struct {
// disconnecting.
SeedDisconnectWaitPeriod time.Duration
// Maximum pause when redialing a persistent peer (if zero, exponential backoff is used)
PersistentPeersMaxDialPeriod time.Duration
// Seeds is a list of addresses reactor may use
// if it can't connect to peers in the addrbook.
Seeds []string
@ -517,8 +520,7 @@ func (r *PEXReactor) dialAttemptsInfo(addr *p2p.NetAddress) (attempts int, lastD
func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) error {
attempts, lastDialed := r.dialAttemptsInfo(addr)
if attempts > maxAttemptsToDial {
if !r.Switch.IsPeerPersistent(addr) && attempts > maxAttemptsToDial {
// TODO(melekes): have a blacklist in the addrbook with peers whom we've
// failed to connect to. Then we can clean up attemptsToDial, which acts as
// a blacklist currently.
@ -531,6 +533,10 @@ func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) error {
if attempts > 0 {
jitterSeconds := time.Duration(cmn.RandFloat64() * float64(time.Second)) // 1s == (1e9 ns)
backoffDuration := jitterSeconds + ((1 << uint(attempts)) * time.Second)
if r.Switch.IsPeerPersistent(addr) && r.config.PersistentPeersMaxDialPeriod > 0 &&
backoffDuration > r.config.PersistentPeersMaxDialPeriod {
backoffDuration = r.config.PersistentPeersMaxDialPeriod
}
sinceLastDialed := time.Since(lastDialed)
if sinceLastDialed < backoffDuration {
return errTooEarlyToDial{backoffDuration, lastDialed}


+ 49
- 18
p2p/switch.go View File

@ -79,6 +79,7 @@ type Switch struct {
addrBook AddrBook
// peers addresses with whom we'll maintain constant connection
persistentPeersAddrs []*NetAddress
unconditionalPeerIDs map[ID]struct{}
transport Transport
@ -117,6 +118,7 @@ func NewSwitch(
transport: transport,
filterTimeout: defaultFilterTimeout,
persistentPeersAddrs: make([]*NetAddress, 0),
unconditionalPeerIDs: make(map[ID]struct{}),
}
// Ensure we have a completely undeterministic PRNG.
@ -280,19 +282,29 @@ func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
}
// NumPeers returns the count of outbound/inbound and outbound-dialing peers.
// unconditional peers are not counted here.
func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
peers := sw.peers.List()
for _, peer := range peers {
if peer.IsOutbound() {
outbound++
if !sw.IsPeerUnconditional(peer.ID()) {
outbound++
}
} else {
inbound++
if !sw.IsPeerUnconditional(peer.ID()) {
inbound++
}
}
}
dialing = sw.dialing.Size()
return
}
func (sw *Switch) IsPeerUnconditional(id ID) bool {
_, ok := sw.unconditionalPeerIDs[id]
return ok
}
// MaxNumOutboundPeers returns a maximum number of outbound peers.
func (sw *Switch) MaxNumOutboundPeers() int {
return sw.config.MaxNumOutboundPeers
@ -558,15 +570,31 @@ func (sw *Switch) AddPersistentPeers(addrs []string) error {
return nil
}
func (sw *Switch) AddUnconditionalPeerIDs(ids []string) error {
sw.Logger.Info("Adding unconditional peer ids", "ids", ids)
for i, id := range ids {
err := validateID(ID(id))
if err != nil {
return errors.Wrapf(err, "wrong ID #%d", i)
}
sw.unconditionalPeerIDs[ID(id)] = struct{}{}
}
return nil
}
func (sw *Switch) isPeerPersistentFn() func(*NetAddress) bool {
return func(na *NetAddress) bool {
for _, pa := range sw.persistentPeersAddrs {
if pa.Equals(na) {
return true
}
return sw.IsPeerPersistent(na)
}
}
func (sw *Switch) IsPeerPersistent(na *NetAddress) bool {
for _, pa := range sw.persistentPeersAddrs {
if pa.Equals(na) {
return true
}
return false
}
return false
}
func (sw *Switch) acceptRoutine() {
@ -625,19 +653,22 @@ func (sw *Switch) acceptRoutine() {
break
}
// Ignore connection if we already have enough peers.
_, in, _ := sw.NumPeers()
if in >= sw.config.MaxNumInboundPeers {
sw.Logger.Info(
"Ignoring inbound connection: already have enough inbound peers",
"address", p.SocketAddr(),
"have", in,
"max", sw.config.MaxNumInboundPeers,
)
if !sw.IsPeerUnconditional(p.NodeInfo().ID()) {
// Ignore connection if we already have enough peers.
_, in, _ := sw.NumPeers()
if in >= sw.config.MaxNumInboundPeers {
sw.Logger.Info(
"Ignoring inbound connection: already have enough inbound peers",
"address", p.SocketAddr(),
"have", in,
"max", sw.config.MaxNumInboundPeers,
)
sw.transport.Cleanup(p)
sw.transport.Cleanup(p)
continue
}
continue
}
if err := sw.addPeer(p); err != nil {


+ 118
- 0
p2p/switch_test.go View File

@ -530,9 +530,20 @@ func TestSwitchFullConnectivity(t *testing.T) {
func TestSwitchAcceptRoutine(t *testing.T) {
cfg.MaxNumInboundPeers = 5
unconditionalNodeCnt := 2
remoteUnconditionalPeers := make([]*remotePeer, 0)
var unconditionalNodeIds []string
for i := 0; i < unconditionalNodeCnt; i++ {
rup := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
remoteUnconditionalPeers = append(remoteUnconditionalPeers, rup)
rup.Start()
unconditionalNodeIds = append(unconditionalNodeIds, string(rup.ID()))
}
// make switch
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
sw.AddUnconditionalPeerIDs(unconditionalNodeIds)
err := sw.Start()
require.NoError(t, err)
defer sw.Stop()
@ -574,10 +585,117 @@ func TestSwitchAcceptRoutine(t *testing.T) {
assert.Equal(t, cfg.MaxNumInboundPeers, sw.Peers().Size())
rp.Stop()
for _, rup := range remoteUnconditionalPeers {
c, err := rup.Dial(sw.NetAddress())
require.NoError(t, err)
// spawn a reading routine to prevent connection from closing
go func(c net.Conn) {
for {
one := make([]byte, 1)
_, err := c.Read(one)
if err != nil {
return
}
}
}(c)
}
time.Sleep(10 * time.Millisecond)
assert.Equal(t, cfg.MaxNumInboundPeers+unconditionalNodeCnt, sw.Peers().Size())
// stop remote peers
for _, rp := range remotePeers {
rp.Stop()
}
// stop remote unconditional peers
for _, rup := range remoteUnconditionalPeers {
rup.Stop()
}
}
func TestSwitchAcceptRoutineUnconditionalPeersFirst(t *testing.T) {
cfg.MaxNumInboundPeers = 5
unconditionalNodeCnt := 7
remoteUnconditionalPeers := make([]*remotePeer, 0)
var unconditionalNodeIds []string
for i := 0; i < unconditionalNodeCnt; i++ {
rup := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
remoteUnconditionalPeers = append(remoteUnconditionalPeers, rup)
rup.Start()
unconditionalNodeIds = append(unconditionalNodeIds, string(rup.ID()))
}
// make switch
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
sw.AddUnconditionalPeerIDs(unconditionalNodeIds)
err := sw.Start()
require.NoError(t, err)
defer sw.Stop()
remotePeers := make([]*remotePeer, 0)
assert.Equal(t, 0, sw.Peers().Size())
for _, rup := range remoteUnconditionalPeers {
c, err := rup.Dial(sw.NetAddress())
require.NoError(t, err)
// spawn a reading routine to prevent connection from closing
go func(c net.Conn) {
for {
one := make([]byte, 1)
_, err := c.Read(one)
if err != nil {
return
}
}
}(c)
}
time.Sleep(10 * time.Millisecond)
assert.Equal(t, unconditionalNodeCnt, sw.Peers().Size())
assert.True(t, sw.Peers().Size() > cfg.MaxNumInboundPeers)
// check we connect up to MaxNumInboundPeers
for i := 0; i < cfg.MaxNumInboundPeers; i++ {
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
remotePeers = append(remotePeers, rp)
rp.Start()
c, err := rp.Dial(sw.NetAddress())
require.NoError(t, err)
// spawn a reading routine to prevent connection from closing
go func(c net.Conn) {
for {
one := make([]byte, 1)
_, err := c.Read(one)
if err != nil {
return
}
}
}(c)
}
time.Sleep(10 * time.Millisecond)
assert.Equal(t, cfg.MaxNumInboundPeers+unconditionalNodeCnt, sw.Peers().Size())
// check we close new connections if we already have MaxNumInboundPeers peers
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
rp.Start()
conn, err := rp.Dial(sw.NetAddress())
require.NoError(t, err)
// check conn is closed
one := make([]byte, 1)
conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
_, err = conn.Read(one)
assert.Equal(t, io.EOF, err)
assert.Equal(t, cfg.MaxNumInboundPeers+unconditionalNodeCnt, sw.Peers().Size())
rp.Stop()
// stop remote peers
for _, rp := range remotePeers {
rp.Stop()
}
// stop remote unconditional peers
for _, rup := range remoteUnconditionalPeers {
rup.Stop()
}
}
type errorTransport struct {


+ 3
- 3
rpc/core/net.go View File

@ -13,9 +13,9 @@ import (
// NetInfo returns network info.
// More: https://tendermint.com/rpc/#/Info/net_info
func NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) {
out, in, _ := p2pPeers.NumPeers()
peers := make([]ctypes.Peer, 0, out+in)
for _, peer := range p2pPeers.Peers().List() {
peersList := p2pPeers.Peers().List()
peers := make([]ctypes.Peer, 0, len(peersList))
for _, peer := range peersList {
nodeInfo, ok := peer.NodeInfo().(p2p.DefaultNodeInfo)
if !ok {
return nil, fmt.Errorf("peer.NodeInfo() is not DefaultNodeInfo")


+ 0
- 1
rpc/core/pipe.go View File

@ -47,7 +47,6 @@ type transport interface {
type peers interface {
AddPersistentPeers([]string) error
DialPeersAsync([]string) error
NumPeers() (outbound, inbound, dialig int)
Peers() p2p.IPeerSet
}


Loading…
Cancel
Save