From fe5b312337628b8af986a469d0dba832fde16a93 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 26 Jan 2021 16:58:33 +0100 Subject: [PATCH] p2p: resolve PEX addresses in PEX reactor (#5980) This changes the new prototype PEX reactor to resolve peer address URLs into IP/port PEX addresses itself. Branched off of #5974. I've spent some time thinking about address handling in the P2P stack. We currently use `PeerAddress` URLs everywhere, except for two places: when dialing a peer, and when exchanging addresses via PEX. We had two options: 1. Resolve addresses to endpoints inside `PeerManager`. This would introduce a lot of added complexity: we would have to track connection statistics per endpoint, have goroutines that asynchronously resolve and refresh these endpoints, deal with resolve scheduling before dialing (which is trickier than it sounds since it involves multiple goroutines in the peer manager and router and messes with peer rating order), handle IP address visibility issues, and so on. 2. Resolve addresses to endpoints (IP/port) only where they're used: when dialing, and in PEX. Everywhere else we use URLs. I went with 2, because this significantly simplifies the handling of hostname resolution, and because I really think the PEX reactor should migrate to exchanging URLs instead of IP/port numbers anyway -- this allows operators to use DNS names for validators (and can easily migrate them to new IPs and/or load balance requests), and also allows different protocols (e.g. QUIC and `MemoryTransport`). Happy to discuss this. --- p2p/netaddress.go | 4 +++ p2p/peer.go | 34 +++++++----------------- p2p/pex/reactor.go | 65 ++++++++++++++++++++++++++++++++++++---------- 3 files changed, 64 insertions(+), 39 deletions(-) diff --git a/p2p/netaddress.go b/p2p/netaddress.go index ec549cf61..a9bd72315 100644 --- a/p2p/netaddress.go +++ b/p2p/netaddress.go @@ -140,6 +140,7 @@ func NewNetAddressIPPort(ip net.IP, port uint16) *NetAddress { } // NetAddressFromProto converts a Protobuf PexAddress into a native struct. +// FIXME: Remove this when legacy PEX reactor is removed. func NetAddressFromProto(pb tmp2p.PexAddress) (*NetAddress, error) { ip := net.ParseIP(pb.IP) if ip == nil { @@ -156,6 +157,7 @@ func NetAddressFromProto(pb tmp2p.PexAddress) (*NetAddress, error) { } // NetAddressesFromProto converts a slice of Protobuf PexAddresses into a native slice. +// FIXME: Remove this when legacy PEX reactor is removed. func NetAddressesFromProto(pbs []tmp2p.PexAddress) ([]*NetAddress, error) { nas := make([]*NetAddress, 0, len(pbs)) for _, pb := range pbs { @@ -169,6 +171,7 @@ func NetAddressesFromProto(pbs []tmp2p.PexAddress) ([]*NetAddress, error) { } // NetAddressesToProto converts a slice of NetAddresses into a Protobuf PexAddress slice. +// FIXME: Remove this when legacy PEX reactor is removed. func NetAddressesToProto(nas []*NetAddress) []tmp2p.PexAddress { pbs := make([]tmp2p.PexAddress, 0, len(nas)) for _, na := range nas { @@ -180,6 +183,7 @@ func NetAddressesToProto(nas []*NetAddress) []tmp2p.PexAddress { } // ToProto converts a NetAddress to a Protobuf PexAddress. +// FIXME: Remove this when legacy PEX reactor is removed. func (na *NetAddress) ToProto() tmp2p.PexAddress { return tmp2p.PexAddress{ ID: string(na.ID), diff --git a/p2p/peer.go b/p2p/peer.go index 13ddc3cae..d0b46e9aa 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -493,43 +493,27 @@ func (m *PeerManager) Add(address PeerAddress) error { return nil } -// Advertise returns a list of peer endpoints to advertise to a peer. +// Advertise returns a list of peer addresses to advertise to a peer. // -// FIXME: The current PEX protocol only supports IP/port endpoints, so we -// returns endpoints here. The PEX protocol should exchange addresses (URLs) -// instead, so it can support multiple protocols and allow operators to -// change their IP addresses. -// -// FIXME: We currently just resolve and pass all addresses we have, which is -// very naïve. We should e.g. only send addresses that the peer can actually -// reach, by making sure any private IP addresses are on the same network as the -// remote peer endpoint. We should also resolve endpoints when addresses are -// added to the peer manager, and periodically as appropriate. -func (m *PeerManager) Advertise(peerID NodeID, limit uint16) []Endpoint { +// FIXME: This is fairly naïve and only returns the addresses of the +// highest-ranked peers. +func (m *PeerManager) Advertise(peerID NodeID, limit uint16) []PeerAddress { m.mtx.Lock() defer m.mtx.Unlock() - endpoints := make([]Endpoint, 0, limit) + addresses := make([]PeerAddress, 0, limit) for _, peer := range m.store.Ranked() { if peer.ID == peerID { continue } for _, addressInfo := range peer.AddressInfo { - addressEndpoints, err := addressInfo.Address.Resolve(context.Background()) - if err != nil { - continue - } - for _, endpoint := range addressEndpoints { - if len(endpoints) >= int(limit) { - return endpoints - } - if endpoint.IP != nil { - endpoints = append(endpoints, endpoint) - } + if len(addresses) >= int(limit) { + return addresses } + addresses = append(addresses, addressInfo.Address) } } - return endpoints + return addresses } // makePeerInfo creates a peerInfo for a new peer. diff --git a/p2p/pex/reactor.go b/p2p/pex/reactor.go index c0d8dee10..c9c5b6779 100644 --- a/p2p/pex/reactor.go +++ b/p2p/pex/reactor.go @@ -1,7 +1,9 @@ package pex import ( + "context" "fmt" + "time" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" @@ -15,7 +17,8 @@ var ( ) const ( - maxAddresses uint16 = 100 + maxAddresses uint16 = 100 + resolveTimeout = 3 * time.Second ) // ReactorV2 is a PEX reactor for the new P2P stack. The legacy reactor @@ -81,25 +84,22 @@ func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error { // only processing addresses we actually requested. switch msg := envelope.Message.(type) { case *protop2p.PexRequest: - endpoints := r.peerManager.Advertise(envelope.From, maxAddresses) - resp := &protop2p.PexResponse{Addresses: make([]protop2p.PexAddress, 0, len(endpoints))} - for _, endpoint := range endpoints { - // FIXME: This shouldn't rely on NetAddress. - resp.Addresses = append(resp.Addresses, endpoint.NetAddress().ToProto()) + pexAddresses := r.resolve(r.peerManager.Advertise(envelope.From, maxAddresses), maxAddresses) + r.pexCh.Out() <- p2p.Envelope{ + To: envelope.From, + Message: &protop2p.PexResponse{Addresses: pexAddresses}, } - r.pexCh.Out() <- p2p.Envelope{To: envelope.From, Message: resp} case *protop2p.PexResponse: - for _, pbAddr := range msg.Addresses { - // FIXME: This shouldn't rely on NetAddress. - netaddr, err := p2p.NetAddressFromProto(pbAddr) + for _, pexAddress := range msg.Addresses { + peerAddress, err := p2p.ParsePeerAddress( + fmt.Sprintf("%s@%s:%d", pexAddress.ID, pexAddress.IP, pexAddress.Port)) if err != nil { - logger.Debug("received invalid PEX address", "addr", netaddr, "err", err) + logger.Debug("invalid PEX address", "address", pexAddress, "err", err) continue } - if err = r.peerManager.Add(netaddr.Endpoint().PeerAddress()); err != nil { - logger.Debug("received invalid PEX address", "addr", netaddr, "err", err) - continue + if err = r.peerManager.Add(peerAddress); err != nil { + logger.Debug("failed to register PEX address", "address", peerAddress, "err", err) } } @@ -110,6 +110,43 @@ func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error { return nil } +// resolve resolves a set of peer addresses into PEX addresses. +// +// FIXME: This is necessary because the current PEX protocol only supports +// IP/port pairs, while the P2P stack uses PeerAddress URLs. The PEX protocol +// should really use URLs too, to exchange DNS names instead of IPs and allow +// different transport protocols (e.g. QUIC and MemoryTransport). +// +// FIXME: We may want to cache and parallelize this, but for now we'll just rely +// on the operating system to cache it for us. +func (r *ReactorV2) resolve(addresses []p2p.PeerAddress, limit uint16) []protop2p.PexAddress { + pexAddresses := make([]protop2p.PexAddress, 0, len(addresses)) + for _, address := range addresses { + ctx, cancel := context.WithTimeout(context.Background(), resolveTimeout) + endpoints, err := address.Resolve(ctx) + cancel() + if err != nil { + r.Logger.Debug("failed to resolve address", "address", address, "err", err) + continue + } + for _, endpoint := range endpoints { + if len(pexAddresses) >= int(limit) { + return pexAddresses + + } else if endpoint.IP != nil { + // PEX currently only supports IP-networked transports (as + // opposed to e.g. p2p.MemoryTransport). + pexAddresses = append(pexAddresses, protop2p.PexAddress{ + ID: string(endpoint.PeerID), + IP: endpoint.IP.String(), + Port: uint32(endpoint.Port), + }) + } + } + } + return pexAddresses +} + // handleMessage handles an Envelope sent from a peer on a specific p2p Channel. // It will handle errors and any possible panics gracefully. A caller can handle // any error returned by sending a PeerError on the respective channel.