diff --git a/p2p/netaddress.go b/p2p/netaddress.go index ec549cf61..a9bd72315 100644 --- a/p2p/netaddress.go +++ b/p2p/netaddress.go @@ -140,6 +140,7 @@ func NewNetAddressIPPort(ip net.IP, port uint16) *NetAddress { } // NetAddressFromProto converts a Protobuf PexAddress into a native struct. +// FIXME: Remove this when legacy PEX reactor is removed. func NetAddressFromProto(pb tmp2p.PexAddress) (*NetAddress, error) { ip := net.ParseIP(pb.IP) if ip == nil { @@ -156,6 +157,7 @@ func NetAddressFromProto(pb tmp2p.PexAddress) (*NetAddress, error) { } // NetAddressesFromProto converts a slice of Protobuf PexAddresses into a native slice. +// FIXME: Remove this when legacy PEX reactor is removed. func NetAddressesFromProto(pbs []tmp2p.PexAddress) ([]*NetAddress, error) { nas := make([]*NetAddress, 0, len(pbs)) for _, pb := range pbs { @@ -169,6 +171,7 @@ func NetAddressesFromProto(pbs []tmp2p.PexAddress) ([]*NetAddress, error) { } // NetAddressesToProto converts a slice of NetAddresses into a Protobuf PexAddress slice. +// FIXME: Remove this when legacy PEX reactor is removed. func NetAddressesToProto(nas []*NetAddress) []tmp2p.PexAddress { pbs := make([]tmp2p.PexAddress, 0, len(nas)) for _, na := range nas { @@ -180,6 +183,7 @@ func NetAddressesToProto(nas []*NetAddress) []tmp2p.PexAddress { } // ToProto converts a NetAddress to a Protobuf PexAddress. +// FIXME: Remove this when legacy PEX reactor is removed. func (na *NetAddress) ToProto() tmp2p.PexAddress { return tmp2p.PexAddress{ ID: string(na.ID), diff --git a/p2p/peer.go b/p2p/peer.go index 13ddc3cae..d0b46e9aa 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -493,43 +493,27 @@ func (m *PeerManager) Add(address PeerAddress) error { return nil } -// Advertise returns a list of peer endpoints to advertise to a peer. +// Advertise returns a list of peer addresses to advertise to a peer. // -// FIXME: The current PEX protocol only supports IP/port endpoints, so we -// returns endpoints here. The PEX protocol should exchange addresses (URLs) -// instead, so it can support multiple protocols and allow operators to -// change their IP addresses. -// -// FIXME: We currently just resolve and pass all addresses we have, which is -// very naïve. We should e.g. only send addresses that the peer can actually -// reach, by making sure any private IP addresses are on the same network as the -// remote peer endpoint. We should also resolve endpoints when addresses are -// added to the peer manager, and periodically as appropriate. -func (m *PeerManager) Advertise(peerID NodeID, limit uint16) []Endpoint { +// FIXME: This is fairly naïve and only returns the addresses of the +// highest-ranked peers. +func (m *PeerManager) Advertise(peerID NodeID, limit uint16) []PeerAddress { m.mtx.Lock() defer m.mtx.Unlock() - endpoints := make([]Endpoint, 0, limit) + addresses := make([]PeerAddress, 0, limit) for _, peer := range m.store.Ranked() { if peer.ID == peerID { continue } for _, addressInfo := range peer.AddressInfo { - addressEndpoints, err := addressInfo.Address.Resolve(context.Background()) - if err != nil { - continue - } - for _, endpoint := range addressEndpoints { - if len(endpoints) >= int(limit) { - return endpoints - } - if endpoint.IP != nil { - endpoints = append(endpoints, endpoint) - } + if len(addresses) >= int(limit) { + return addresses } + addresses = append(addresses, addressInfo.Address) } } - return endpoints + return addresses } // makePeerInfo creates a peerInfo for a new peer. diff --git a/p2p/pex/reactor.go b/p2p/pex/reactor.go index c0d8dee10..c9c5b6779 100644 --- a/p2p/pex/reactor.go +++ b/p2p/pex/reactor.go @@ -1,7 +1,9 @@ package pex import ( + "context" "fmt" + "time" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" @@ -15,7 +17,8 @@ var ( ) const ( - maxAddresses uint16 = 100 + maxAddresses uint16 = 100 + resolveTimeout = 3 * time.Second ) // ReactorV2 is a PEX reactor for the new P2P stack. The legacy reactor @@ -81,25 +84,22 @@ func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error { // only processing addresses we actually requested. switch msg := envelope.Message.(type) { case *protop2p.PexRequest: - endpoints := r.peerManager.Advertise(envelope.From, maxAddresses) - resp := &protop2p.PexResponse{Addresses: make([]protop2p.PexAddress, 0, len(endpoints))} - for _, endpoint := range endpoints { - // FIXME: This shouldn't rely on NetAddress. - resp.Addresses = append(resp.Addresses, endpoint.NetAddress().ToProto()) + pexAddresses := r.resolve(r.peerManager.Advertise(envelope.From, maxAddresses), maxAddresses) + r.pexCh.Out() <- p2p.Envelope{ + To: envelope.From, + Message: &protop2p.PexResponse{Addresses: pexAddresses}, } - r.pexCh.Out() <- p2p.Envelope{To: envelope.From, Message: resp} case *protop2p.PexResponse: - for _, pbAddr := range msg.Addresses { - // FIXME: This shouldn't rely on NetAddress. - netaddr, err := p2p.NetAddressFromProto(pbAddr) + for _, pexAddress := range msg.Addresses { + peerAddress, err := p2p.ParsePeerAddress( + fmt.Sprintf("%s@%s:%d", pexAddress.ID, pexAddress.IP, pexAddress.Port)) if err != nil { - logger.Debug("received invalid PEX address", "addr", netaddr, "err", err) + logger.Debug("invalid PEX address", "address", pexAddress, "err", err) continue } - if err = r.peerManager.Add(netaddr.Endpoint().PeerAddress()); err != nil { - logger.Debug("received invalid PEX address", "addr", netaddr, "err", err) - continue + if err = r.peerManager.Add(peerAddress); err != nil { + logger.Debug("failed to register PEX address", "address", peerAddress, "err", err) } } @@ -110,6 +110,43 @@ func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error { return nil } +// resolve resolves a set of peer addresses into PEX addresses. +// +// FIXME: This is necessary because the current PEX protocol only supports +// IP/port pairs, while the P2P stack uses PeerAddress URLs. The PEX protocol +// should really use URLs too, to exchange DNS names instead of IPs and allow +// different transport protocols (e.g. QUIC and MemoryTransport). +// +// FIXME: We may want to cache and parallelize this, but for now we'll just rely +// on the operating system to cache it for us. +func (r *ReactorV2) resolve(addresses []p2p.PeerAddress, limit uint16) []protop2p.PexAddress { + pexAddresses := make([]protop2p.PexAddress, 0, len(addresses)) + for _, address := range addresses { + ctx, cancel := context.WithTimeout(context.Background(), resolveTimeout) + endpoints, err := address.Resolve(ctx) + cancel() + if err != nil { + r.Logger.Debug("failed to resolve address", "address", address, "err", err) + continue + } + for _, endpoint := range endpoints { + if len(pexAddresses) >= int(limit) { + return pexAddresses + + } else if endpoint.IP != nil { + // PEX currently only supports IP-networked transports (as + // opposed to e.g. p2p.MemoryTransport). + pexAddresses = append(pexAddresses, protop2p.PexAddress{ + ID: string(endpoint.PeerID), + IP: endpoint.IP.String(), + Port: uint32(endpoint.Port), + }) + } + } + } + return pexAddresses +} + // handleMessage handles an Envelope sent from a peer on a specific p2p Channel. // It will handle errors and any possible panics gracefully. A caller can handle // any error returned by sending a PeerError on the respective channel.