Browse Source

p2p: resolve PEX addresses in PEX reactor (#5980)

This changes the new prototype PEX reactor to resolve peer address URLs into IP/port PEX addresses itself. Branched off of #5974.

I've spent some time thinking about address handling in the P2P stack. We currently use `PeerAddress` URLs everywhere, except for two places: when dialing a peer, and when exchanging addresses via PEX. We had two options:

1. Resolve addresses to endpoints inside `PeerManager`. This would introduce a lot of added complexity: we would have to track connection statistics per endpoint, have goroutines that asynchronously resolve and refresh these endpoints, deal with resolve scheduling before dialing (which is trickier than it sounds since it involves multiple goroutines in the peer manager and router and messes with peer rating order), handle IP address visibility issues, and so on.

2. Resolve addresses to endpoints (IP/port) only where they're used: when dialing, and in PEX. Everywhere else we use URLs.

I went with 2, because this significantly simplifies the handling of hostname resolution, and because I really think the PEX reactor should migrate to exchanging URLs instead of IP/port numbers anyway -- this allows operators to use DNS names for validators (and can easily migrate them to new IPs and/or load balance requests), and also allows different protocols (e.g. QUIC and `MemoryTransport`). Happy to discuss this.
pull/5989/head
Erik Grinaker 3 years ago
committed by GitHub
parent
commit
fe5b312337
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 64 additions and 39 deletions
  1. +4
    -0
      p2p/netaddress.go
  2. +9
    -25
      p2p/peer.go
  3. +51
    -14
      p2p/pex/reactor.go

+ 4
- 0
p2p/netaddress.go View File

@ -140,6 +140,7 @@ func NewNetAddressIPPort(ip net.IP, port uint16) *NetAddress {
}
// NetAddressFromProto converts a Protobuf PexAddress into a native struct.
// FIXME: Remove this when legacy PEX reactor is removed.
func NetAddressFromProto(pb tmp2p.PexAddress) (*NetAddress, error) {
ip := net.ParseIP(pb.IP)
if ip == nil {
@ -156,6 +157,7 @@ func NetAddressFromProto(pb tmp2p.PexAddress) (*NetAddress, error) {
}
// NetAddressesFromProto converts a slice of Protobuf PexAddresses into a native slice.
// FIXME: Remove this when legacy PEX reactor is removed.
func NetAddressesFromProto(pbs []tmp2p.PexAddress) ([]*NetAddress, error) {
nas := make([]*NetAddress, 0, len(pbs))
for _, pb := range pbs {
@ -169,6 +171,7 @@ func NetAddressesFromProto(pbs []tmp2p.PexAddress) ([]*NetAddress, error) {
}
// NetAddressesToProto converts a slice of NetAddresses into a Protobuf PexAddress slice.
// FIXME: Remove this when legacy PEX reactor is removed.
func NetAddressesToProto(nas []*NetAddress) []tmp2p.PexAddress {
pbs := make([]tmp2p.PexAddress, 0, len(nas))
for _, na := range nas {
@ -180,6 +183,7 @@ func NetAddressesToProto(nas []*NetAddress) []tmp2p.PexAddress {
}
// ToProto converts a NetAddress to a Protobuf PexAddress.
// FIXME: Remove this when legacy PEX reactor is removed.
func (na *NetAddress) ToProto() tmp2p.PexAddress {
return tmp2p.PexAddress{
ID: string(na.ID),


+ 9
- 25
p2p/peer.go View File

@ -493,43 +493,27 @@ func (m *PeerManager) Add(address PeerAddress) error {
return nil
}
// Advertise returns a list of peer endpoints to advertise to a peer.
// Advertise returns a list of peer addresses to advertise to a peer.
//
// FIXME: The current PEX protocol only supports IP/port endpoints, so we
// returns endpoints here. The PEX protocol should exchange addresses (URLs)
// instead, so it can support multiple protocols and allow operators to
// change their IP addresses.
//
// FIXME: We currently just resolve and pass all addresses we have, which is
// very naïve. We should e.g. only send addresses that the peer can actually
// reach, by making sure any private IP addresses are on the same network as the
// remote peer endpoint. We should also resolve endpoints when addresses are
// added to the peer manager, and periodically as appropriate.
func (m *PeerManager) Advertise(peerID NodeID, limit uint16) []Endpoint {
// FIXME: This is fairly naïve and only returns the addresses of the
// highest-ranked peers.
func (m *PeerManager) Advertise(peerID NodeID, limit uint16) []PeerAddress {
m.mtx.Lock()
defer m.mtx.Unlock()
endpoints := make([]Endpoint, 0, limit)
addresses := make([]PeerAddress, 0, limit)
for _, peer := range m.store.Ranked() {
if peer.ID == peerID {
continue
}
for _, addressInfo := range peer.AddressInfo {
addressEndpoints, err := addressInfo.Address.Resolve(context.Background())
if err != nil {
continue
}
for _, endpoint := range addressEndpoints {
if len(endpoints) >= int(limit) {
return endpoints
}
if endpoint.IP != nil {
endpoints = append(endpoints, endpoint)
}
if len(addresses) >= int(limit) {
return addresses
}
addresses = append(addresses, addressInfo.Address)
}
}
return endpoints
return addresses
}
// makePeerInfo creates a peerInfo for a new peer.


+ 51
- 14
p2p/pex/reactor.go View File

@ -1,7 +1,9 @@
package pex
import (
"context"
"fmt"
"time"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
@ -15,7 +17,8 @@ var (
)
const (
maxAddresses uint16 = 100
maxAddresses uint16 = 100
resolveTimeout = 3 * time.Second
)
// ReactorV2 is a PEX reactor for the new P2P stack. The legacy reactor
@ -81,25 +84,22 @@ func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error {
// only processing addresses we actually requested.
switch msg := envelope.Message.(type) {
case *protop2p.PexRequest:
endpoints := r.peerManager.Advertise(envelope.From, maxAddresses)
resp := &protop2p.PexResponse{Addresses: make([]protop2p.PexAddress, 0, len(endpoints))}
for _, endpoint := range endpoints {
// FIXME: This shouldn't rely on NetAddress.
resp.Addresses = append(resp.Addresses, endpoint.NetAddress().ToProto())
pexAddresses := r.resolve(r.peerManager.Advertise(envelope.From, maxAddresses), maxAddresses)
r.pexCh.Out() <- p2p.Envelope{
To: envelope.From,
Message: &protop2p.PexResponse{Addresses: pexAddresses},
}
r.pexCh.Out() <- p2p.Envelope{To: envelope.From, Message: resp}
case *protop2p.PexResponse:
for _, pbAddr := range msg.Addresses {
// FIXME: This shouldn't rely on NetAddress.
netaddr, err := p2p.NetAddressFromProto(pbAddr)
for _, pexAddress := range msg.Addresses {
peerAddress, err := p2p.ParsePeerAddress(
fmt.Sprintf("%s@%s:%d", pexAddress.ID, pexAddress.IP, pexAddress.Port))
if err != nil {
logger.Debug("received invalid PEX address", "addr", netaddr, "err", err)
logger.Debug("invalid PEX address", "address", pexAddress, "err", err)
continue
}
if err = r.peerManager.Add(netaddr.Endpoint().PeerAddress()); err != nil {
logger.Debug("received invalid PEX address", "addr", netaddr, "err", err)
continue
if err = r.peerManager.Add(peerAddress); err != nil {
logger.Debug("failed to register PEX address", "address", peerAddress, "err", err)
}
}
@ -110,6 +110,43 @@ func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error {
return nil
}
// resolve resolves a set of peer addresses into PEX addresses.
//
// FIXME: This is necessary because the current PEX protocol only supports
// IP/port pairs, while the P2P stack uses PeerAddress URLs. The PEX protocol
// should really use URLs too, to exchange DNS names instead of IPs and allow
// different transport protocols (e.g. QUIC and MemoryTransport).
//
// FIXME: We may want to cache and parallelize this, but for now we'll just rely
// on the operating system to cache it for us.
func (r *ReactorV2) resolve(addresses []p2p.PeerAddress, limit uint16) []protop2p.PexAddress {
pexAddresses := make([]protop2p.PexAddress, 0, len(addresses))
for _, address := range addresses {
ctx, cancel := context.WithTimeout(context.Background(), resolveTimeout)
endpoints, err := address.Resolve(ctx)
cancel()
if err != nil {
r.Logger.Debug("failed to resolve address", "address", address, "err", err)
continue
}
for _, endpoint := range endpoints {
if len(pexAddresses) >= int(limit) {
return pexAddresses
} else if endpoint.IP != nil {
// PEX currently only supports IP-networked transports (as
// opposed to e.g. p2p.MemoryTransport).
pexAddresses = append(pexAddresses, protop2p.PexAddress{
ID: string(endpoint.PeerID),
IP: endpoint.IP.String(),
Port: uint32(endpoint.Port),
})
}
}
}
return pexAddresses
}
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.


Loading…
Cancel
Save