Browse Source

p2p: add tests and fix bugs for `NodeAddress` and `NodeID` (#6021)

This renames `PeerAddress` to `NodeAddress`, moves it and `NodeID` into a separate file `address.go`, adds tests for them, and fixes a bunch of bugs and inconsistencies.
pull/6022/head
Erik Grinaker 3 years ago
committed by GitHub
parent
commit
fc71882f74
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 661 additions and 258 deletions
  1. +227
    -0
      p2p/address.go
  2. +369
    -0
      p2p/address_test.go
  3. +0
    -55
      p2p/key.go
  4. +8
    -7
      p2p/key_test.go
  5. +13
    -167
      p2p/peer.go
  6. +3
    -3
      p2p/pex/reactor.go
  7. +2
    -2
      p2p/router.go
  8. +2
    -2
      p2p/router_test.go
  9. +13
    -5
      p2p/transport.go
  10. +1
    -1
      p2p/transport_mconn.go
  11. +23
    -16
      p2p/transport_test.go

+ 227
- 0
p2p/address.go View File

@ -0,0 +1,227 @@
package p2p
import (
"context"
"encoding/hex"
"errors"
"fmt"
"net"
"net/url"
"regexp"
"strconv"
"strings"
"github.com/tendermint/tendermint/crypto"
)
const (
// NodeIDByteLength is the length of a crypto.Address. Currently only 20.
// FIXME: support other length addresses?
NodeIDByteLength = crypto.AddressSize
)
var (
// reNodeID is a regexp for valid node IDs.
reNodeID = regexp.MustCompile(`^[0-9a-f]{40}$`)
// reHasScheme tries to detect URLs with schemes. It looks for a : before a / (if any).
reHasScheme = regexp.MustCompile(`^[^/]+:`)
// reSchemeIsHost tries to detect URLs where the scheme part is instead a
// hostname, i.e. of the form "host:80/path" where host: is a hostname.
reSchemeIsHost = regexp.MustCompile(`^[^/:]+:\d+(/|$)`)
)
// NodeID is a hex-encoded crypto.Address. It must be lowercased
// (for uniqueness) and of length 2*NodeIDByteLength.
type NodeID string
// NewNodeID returns a lowercased (normalized) NodeID, or errors if the
// node ID is invalid.
func NewNodeID(nodeID string) (NodeID, error) {
n := NodeID(strings.ToLower(nodeID))
return n, n.Validate()
}
// NodeIDFromPubKey creates a node ID from a given PubKey address.
func NodeIDFromPubKey(pubKey crypto.PubKey) NodeID {
return NodeID(hex.EncodeToString(pubKey.Address()))
}
// Bytes converts the node ID to its binary byte representation.
func (id NodeID) Bytes() ([]byte, error) {
bz, err := hex.DecodeString(string(id))
if err != nil {
return nil, fmt.Errorf("invalid node ID encoding: %w", err)
}
return bz, nil
}
// Validate validates the NodeID.
func (id NodeID) Validate() error {
switch {
case len(id) == 0:
return errors.New("empty node ID")
case len(id) != 2*NodeIDByteLength:
return fmt.Errorf("invalid node ID length %d, expected %d", len(id), 2*NodeIDByteLength)
case !reNodeID.MatchString(string(id)):
return fmt.Errorf("node ID can only contain lowercased hex digits")
default:
return nil
}
}
// NodeAddress is a node address URL. It differs from a transport Endpoint in
// that it contains the node's ID, and that the address hostname may be resolved
// into multiple IP addresses (and thus multiple endpoints).
//
// If the URL is opaque, i.e. of the form "scheme:opaque", then the opaque part
// is expected to contain a node ID.
type NodeAddress struct {
NodeID NodeID
Protocol Protocol
Hostname string
Port uint16
Path string
}
// ParseNodeAddress parses a node address URL into a NodeAddress, normalizing
// and validating it.
func ParseNodeAddress(urlString string) (NodeAddress, error) {
// url.Parse requires a scheme, so if it fails to parse a scheme-less URL
// we try to apply a default scheme.
url, err := url.Parse(urlString)
if (err != nil || url.Scheme == "") &&
(!reHasScheme.MatchString(urlString) || reSchemeIsHost.MatchString(urlString)) {
url, err = url.Parse(string(defaultProtocol) + "://" + urlString)
}
if err != nil {
return NodeAddress{}, fmt.Errorf("invalid node address %q: %w", urlString, err)
}
address := NodeAddress{
Protocol: Protocol(strings.ToLower(url.Scheme)),
}
// Opaque URLs are expected to contain only a node ID, also used as path.
if url.Opaque != "" {
address.NodeID = NodeID(url.Opaque)
address.Path = url.Opaque
return address, address.Validate()
}
// Otherwise, just parse a normal networked URL.
if url.User != nil {
address.NodeID = NodeID(strings.ToLower(url.User.Username()))
}
address.Hostname = strings.ToLower(url.Hostname())
if portString := url.Port(); portString != "" {
port64, err := strconv.ParseUint(portString, 10, 16)
if err != nil {
return NodeAddress{}, fmt.Errorf("invalid port %q: %w", portString, err)
}
address.Port = uint16(port64)
}
address.Path = url.Path
if url.RawQuery != "" {
address.Path += "?" + url.RawQuery
}
if url.Fragment != "" {
address.Path += "#" + url.Fragment
}
if address.Path != "" {
switch address.Path[0] {
case '/', '#', '?':
default:
address.Path = "/" + address.Path
}
}
return address, address.Validate()
}
// Resolve resolves a NodeAddress into a set of Endpoints, by expanding
// out a DNS hostname to IP addresses.
func (a NodeAddress) Resolve(ctx context.Context) ([]Endpoint, error) {
if a.Protocol == "" {
return nil, errors.New("address has no protocol")
}
// If there is no hostname, this is an opaque URL in the form
// "scheme:opaque", and the opaque part is assumed to be node ID used as
// Path.
if a.Hostname == "" {
if a.NodeID == "" {
return nil, errors.New("local address has no node ID")
}
return []Endpoint{{
Protocol: a.Protocol,
Path: string(a.NodeID),
}}, nil
}
ips, err := net.DefaultResolver.LookupIP(ctx, "ip", a.Hostname)
if err != nil {
return nil, err
}
endpoints := make([]Endpoint, len(ips))
for i, ip := range ips {
endpoints[i] = Endpoint{
Protocol: a.Protocol,
IP: ip,
Port: a.Port,
Path: a.Path,
}
}
return endpoints, nil
}
// String formats the address as a URL string.
func (a NodeAddress) String() string {
u := url.URL{Scheme: string(a.Protocol)}
if a.NodeID != "" {
u.User = url.User(string(a.NodeID))
}
switch {
case a.Hostname != "":
if a.Port > 0 {
u.Host = net.JoinHostPort(a.Hostname, strconv.Itoa(int(a.Port)))
} else {
u.Host = a.Hostname
}
u.Path = a.Path
case a.Protocol != "" && (a.Path == "" || a.Path == string(a.NodeID)):
u.User = nil
u.Opaque = string(a.NodeID) // e.g. memory:id
case a.Path != "" && a.Path[0] != '/':
u.Path = "/" + a.Path // e.g. some/path
default:
u.Path = a.Path // e.g. /some/path
}
return strings.TrimPrefix(u.String(), "//")
}
// Validate validates a NodeAddress.
func (a NodeAddress) Validate() error {
if a.Protocol == "" {
return errors.New("no protocol")
}
if a.NodeID == "" {
return errors.New("no peer ID")
} else if err := a.NodeID.Validate(); err != nil {
return fmt.Errorf("invalid peer ID: %w", err)
}
if a.Port > 0 && a.Hostname == "" {
return errors.New("cannot specify port without hostname")
}
return nil
}

+ 369
- 0
p2p/address_test.go View File

@ -0,0 +1,369 @@
package p2p_test
import (
"net"
"strings"
"testing"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/p2p"
)
func TestNewNodeID(t *testing.T) {
// Most tests are in TestNodeID_Validate, this just checks that it's validated.
testcases := []struct {
input string
expect p2p.NodeID
ok bool
}{
{"", "", false},
{"foo", "", false},
{"00112233445566778899aabbccddeeff00112233", "00112233445566778899aabbccddeeff00112233", true},
{"00112233445566778899AABBCCDDEEFF00112233", "00112233445566778899aabbccddeeff00112233", true},
{"00112233445566778899aabbccddeeff0011223", "", false},
{"00112233445566778899aabbccddeeff0011223g", "", false},
}
for _, tc := range testcases {
tc := tc
t.Run(tc.input, func(t *testing.T) {
id, err := p2p.NewNodeID(tc.input)
if !tc.ok {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, id, tc.expect)
}
})
}
}
func TestNewNodeIDFromPubKey(t *testing.T) {
privKey := ed25519.GenPrivKeyFromSecret([]byte("foo"))
nodeID := p2p.NodeIDFromPubKey(privKey.PubKey())
require.Equal(t, p2p.NodeID("045f5600654182cfeaccfe6cb19f0642e8a59898"), nodeID)
}
func TestNodeID_Bytes(t *testing.T) {
testcases := []struct {
nodeID p2p.NodeID
expect []byte
ok bool
}{
{"", []byte{}, true},
{"01f0", []byte{0x01, 0xf0}, true},
{"01F0", []byte{0x01, 0xf0}, true},
{"01F", nil, false},
{"01g0", nil, false},
}
for _, tc := range testcases {
tc := tc
t.Run(string(tc.nodeID), func(t *testing.T) {
bz, err := tc.nodeID.Bytes()
if tc.ok {
require.NoError(t, err)
require.Equal(t, tc.expect, bz)
} else {
require.Error(t, err)
}
})
}
}
func TestNodeID_Validate(t *testing.T) {
testcases := []struct {
nodeID p2p.NodeID
ok bool
}{
{"", false},
{"00", false},
{"00112233445566778899aabbccddeeff00112233", true},
{"00112233445566778899aabbccddeeff001122334", false},
{"00112233445566778899aabbccddeeffgg001122", false},
{"00112233445566778899AABBCCDDEEFF00112233", false},
}
for _, tc := range testcases {
tc := tc
t.Run(string(tc.nodeID), func(t *testing.T) {
err := tc.nodeID.Validate()
if tc.ok {
require.NoError(t, err)
} else {
require.Error(t, err)
}
})
}
}
func TestParseNodeAddress(t *testing.T) {
user := "00112233445566778899aabbccddeeff00112233"
id := p2p.NodeID(user)
testcases := []struct {
url string
expect p2p.NodeAddress
ok bool
}{
// Valid addresses.
{
"mconn://" + user + "@127.0.0.1:26657/some/path?foo=bar",
p2p.NodeAddress{Protocol: "mconn", NodeID: id, Hostname: "127.0.0.1", Port: 26657, Path: "/some/path?foo=bar"},
true,
},
{
"TCP://" + strings.ToUpper(user) + "@hostname.DOMAIN:8080/Path/%f0%9f%91%8B#Anchor",
p2p.NodeAddress{Protocol: "tcp", NodeID: id, Hostname: "hostname.domain", Port: 8080, Path: "/Path/👋#Anchor"},
true,
},
{
user + "@127.0.0.1",
p2p.NodeAddress{Protocol: "mconn", NodeID: id, Hostname: "127.0.0.1"},
true,
},
{
user + "@hostname.domain",
p2p.NodeAddress{Protocol: "mconn", NodeID: id, Hostname: "hostname.domain"},
true,
},
{
user + "@hostname.domain:80",
p2p.NodeAddress{Protocol: "mconn", NodeID: id, Hostname: "hostname.domain", Port: 80},
true,
},
{
user + "@%F0%9F%91%8B",
p2p.NodeAddress{Protocol: "mconn", NodeID: id, Hostname: "👋"},
true,
},
{
user + "@%F0%9F%91%8B:80/path",
p2p.NodeAddress{Protocol: "mconn", NodeID: id, Hostname: "👋", Port: 80, Path: "/path"},
true,
},
{
user + "@127.0.0.1:26657",
p2p.NodeAddress{Protocol: "mconn", NodeID: id, Hostname: "127.0.0.1", Port: 26657},
true,
},
{
user + "@127.0.0.1:26657/path",
p2p.NodeAddress{Protocol: "mconn", NodeID: id, Hostname: "127.0.0.1", Port: 26657, Path: "/path"},
true,
},
{
user + "@0.0.0.0:0",
p2p.NodeAddress{Protocol: "mconn", NodeID: id, Hostname: "0.0.0.0", Port: 0},
true,
},
{
"memory:" + user,
p2p.NodeAddress{Protocol: "memory", NodeID: id, Path: user},
true,
},
// Invalid addresses.
{"", p2p.NodeAddress{}, false},
{"127.0.0.1", p2p.NodeAddress{}, false},
{"hostname", p2p.NodeAddress{}, false},
{"scheme:", p2p.NodeAddress{}, false},
{"memory:foo", p2p.NodeAddress{}, false},
{user + "@%F%F0", p2p.NodeAddress{}, false},
{"//" + user + "@127.0.0.1", p2p.NodeAddress{}, false},
{"://" + user + "@127.0.0.1", p2p.NodeAddress{}, false},
{"mconn://foo@127.0.0.1", p2p.NodeAddress{}, false},
{"mconn://" + user + "@127.0.0.1:65536", p2p.NodeAddress{}, false},
{"mconn://" + user + "@:80", p2p.NodeAddress{}, false},
}
for _, tc := range testcases {
tc := tc
t.Run(tc.url, func(t *testing.T) {
address, err := p2p.ParseNodeAddress(tc.url)
if !tc.ok {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, tc.expect, address)
}
})
}
}
func TestNodeAddress_Resolve(t *testing.T) {
id := p2p.NodeID("00112233445566778899aabbccddeeff00112233")
testcases := []struct {
address p2p.NodeAddress
expect p2p.Endpoint
ok bool
}{
// Valid networked addresses (with hostname).
{
p2p.NodeAddress{Protocol: "tcp", Hostname: "127.0.0.1", Port: 80, Path: "/path"},
p2p.Endpoint{Protocol: "tcp", IP: net.IPv4(127, 0, 0, 1), Port: 80, Path: "/path"},
true,
},
{
p2p.NodeAddress{Protocol: "tcp", Hostname: "localhost", Port: 80, Path: "/path"},
p2p.Endpoint{Protocol: "tcp", IP: net.IPv4(127, 0, 0, 1), Port: 80, Path: "/path"},
true,
},
{
p2p.NodeAddress{Protocol: "tcp", Hostname: "localhost", Port: 80, Path: "/path"},
p2p.Endpoint{Protocol: "tcp", IP: net.IPv6loopback, Port: 80, Path: "/path"},
true,
},
{
p2p.NodeAddress{Protocol: "tcp", Hostname: "127.0.0.1"},
p2p.Endpoint{Protocol: "tcp", IP: net.IPv4(127, 0, 0, 1)},
true,
},
{
p2p.NodeAddress{Protocol: "tcp", Hostname: "::1"},
p2p.Endpoint{Protocol: "tcp", IP: net.IPv6loopback},
true,
},
{
p2p.NodeAddress{Protocol: "tcp", Hostname: "8.8.8.8"},
p2p.Endpoint{Protocol: "tcp", IP: net.IPv4(8, 8, 8, 8)},
true,
},
{
p2p.NodeAddress{Protocol: "tcp", Hostname: "2001:0db8::ff00:0042:8329"},
p2p.Endpoint{Protocol: "tcp", IP: []byte{
0x20, 0x01, 0x0d, 0xb8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x00, 0x00, 0x42, 0x83, 0x29}},
true,
},
{
p2p.NodeAddress{Protocol: "tcp", Hostname: "some.missing.host.tendermint.com"},
p2p.Endpoint{},
false,
},
// Valid non-networked addresses.
{
p2p.NodeAddress{Protocol: "memory", NodeID: id},
p2p.Endpoint{Protocol: "memory", Path: string(id)},
true,
},
{
p2p.NodeAddress{Protocol: "memory", NodeID: id, Path: string(id)},
p2p.Endpoint{Protocol: "memory", Path: string(id)},
true,
},
// Invalid addresses.
{p2p.NodeAddress{}, p2p.Endpoint{}, false},
{p2p.NodeAddress{Hostname: "127.0.0.1"}, p2p.Endpoint{}, false},
{p2p.NodeAddress{Protocol: "tcp", Hostname: "127.0.0.1:80"}, p2p.Endpoint{}, false},
{p2p.NodeAddress{Protocol: "memory"}, p2p.Endpoint{}, false},
{p2p.NodeAddress{Protocol: "memory", Path: string(id)}, p2p.Endpoint{}, false},
{p2p.NodeAddress{Protocol: "tcp", Hostname: "💥"}, p2p.Endpoint{}, false},
}
for _, tc := range testcases {
tc := tc
t.Run(tc.address.String(), func(t *testing.T) {
endpoints, err := tc.address.Resolve(ctx)
if !tc.ok {
require.Error(t, err)
return
}
require.Contains(t, endpoints, tc.expect)
})
}
}
func TestNodeAddress_String(t *testing.T) {
id := p2p.NodeID("00112233445566778899aabbccddeeff00112233")
user := string(id)
testcases := []struct {
address p2p.NodeAddress
expect string
}{
// Valid networked addresses (with hostname).
{
p2p.NodeAddress{Protocol: "tcp", NodeID: id, Hostname: "host", Port: 80, Path: "/path/sub?foo=bar&x=y#anchor"},
"tcp://" + user + "@host:80/path/sub%3Ffoo=bar&x=y%23anchor",
},
{
p2p.NodeAddress{Protocol: "tcp", NodeID: id, Hostname: "host.domain"},
"tcp://" + user + "@host.domain",
},
{
p2p.NodeAddress{NodeID: id, Hostname: "host", Port: 80, Path: "foo/bar"},
user + "@host:80/foo/bar",
},
// Valid non-networked addresses (without hostname).
{
p2p.NodeAddress{Protocol: "memory", NodeID: id, Path: string(id)},
"memory:" + user,
},
{
p2p.NodeAddress{Protocol: "memory", NodeID: id},
"memory:" + user,
},
// Addresses with weird contents, which are technically fine (not harmful).
{
p2p.NodeAddress{Protocol: "💬", NodeID: "👨", Hostname: "💻", Port: 80, Path: "🛣"},
"💬://%F0%9F%91%A8@%F0%9F%92%BB:80/%F0%9F%9B%A3",
},
// Partial (invalid) addresses.
{p2p.NodeAddress{}, ""},
{p2p.NodeAddress{NodeID: id}, user + "@"},
{p2p.NodeAddress{Protocol: "tcp"}, "tcp:"},
{p2p.NodeAddress{Hostname: "host"}, "host"},
{p2p.NodeAddress{Port: 80}, ""},
{p2p.NodeAddress{Path: "path"}, "/path"},
{p2p.NodeAddress{NodeID: id, Port: 80}, user + "@"},
{p2p.NodeAddress{Protocol: "tcp", Hostname: "host"}, "tcp://host"},
{
p2p.NodeAddress{Protocol: "memory", NodeID: id, Path: "path"},
"memory://00112233445566778899aabbccddeeff00112233@/path",
},
{
p2p.NodeAddress{Protocol: "memory", NodeID: id, Port: 80},
"memory:00112233445566778899aabbccddeeff00112233",
},
}
for _, tc := range testcases {
tc := tc
t.Run(tc.address.String(), func(t *testing.T) {
require.Equal(t, tc.expect, tc.address.String())
})
}
}
func TestNodeAddress_Validate(t *testing.T) {
id := p2p.NodeID("00112233445566778899aabbccddeeff00112233")
testcases := []struct {
address p2p.NodeAddress
ok bool
}{
// Valid addresses.
{p2p.NodeAddress{Protocol: "mconn", NodeID: id, Hostname: "host", Port: 80, Path: "/path"}, true},
{p2p.NodeAddress{Protocol: "mconn", NodeID: id, Hostname: "host"}, true},
{p2p.NodeAddress{Protocol: "mconn", NodeID: id, Path: "path"}, true},
{p2p.NodeAddress{Protocol: "mconn", NodeID: id, Hostname: "👋", Path: "👋"}, true},
// Invalid addresses.
{p2p.NodeAddress{}, false},
{p2p.NodeAddress{NodeID: "foo", Hostname: "host"}, false},
{p2p.NodeAddress{Protocol: "mconn", NodeID: id}, true},
{p2p.NodeAddress{Protocol: "mconn", NodeID: "foo", Hostname: "host"}, false},
{p2p.NodeAddress{Protocol: "mconn", NodeID: id, Port: 80, Path: "path"}, false},
}
for _, tc := range testcases {
tc := tc
t.Run(tc.address.String(), func(t *testing.T) {
err := tc.address.Validate()
if tc.ok {
require.NoError(t, err)
} else {
require.Error(t, err)
}
})
}
}

+ 0
- 55
p2p/key.go View File

@ -1,11 +1,7 @@
package p2p
import (
"encoding/hex"
"errors"
"fmt"
"io/ioutil"
"strings"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/ed25519"
@ -13,57 +9,6 @@ import (
tmos "github.com/tendermint/tendermint/libs/os"
)
// NodeIDByteLength is the length of a crypto.Address. Currently only 20.
// FIXME: support other length addresses?
const NodeIDByteLength = crypto.AddressSize
// NodeID is a hex-encoded crypto.Address.
type NodeID string
// NewNodeID returns a lowercased (normalized) NodeID.
func NewNodeID(nodeID string) (NodeID, error) {
n := NodeID(strings.ToLower(nodeID))
return n, n.Validate()
}
// NodeIDFromPubKey returns the noe ID corresponding to the given PubKey. It's
// the hex-encoding of the pubKey.Address().
func NodeIDFromPubKey(pubKey crypto.PubKey) NodeID {
return NodeID(hex.EncodeToString(pubKey.Address()))
}
// Bytes converts the node ID to it's binary byte representation.
func (id NodeID) Bytes() ([]byte, error) {
bz, err := hex.DecodeString(string(id))
if err != nil {
return nil, fmt.Errorf("invalid node ID encoding: %w", err)
}
return bz, nil
}
// Validate validates the NodeID.
func (id NodeID) Validate() error {
if len(id) == 0 {
return errors.New("empty node ID")
}
bz, err := id.Bytes()
if err != nil {
return err
}
if len(bz) != NodeIDByteLength {
return fmt.Errorf("invalid node ID length; got %d, expected %d", len(bz), NodeIDByteLength)
}
idStr := string(id)
if strings.ToLower(idStr) != idStr {
return fmt.Errorf("invalid node ID; must be lowercased")
}
return nil
}
//------------------------------------------------------------------------------
// Persistent peer ID
// TODO: encrypt on disk


+ 8
- 7
p2p/key_test.go View File

@ -1,4 +1,4 @@
package p2p
package p2p_test
import (
"os"
@ -8,15 +8,16 @@ import (
"github.com/stretchr/testify/require"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/p2p"
)
func TestLoadOrGenNodeKey(t *testing.T) {
filePath := filepath.Join(os.TempDir(), tmrand.Str(12)+"_peer_id.json")
nodeKey, err := LoadOrGenNodeKey(filePath)
nodeKey, err := p2p.LoadOrGenNodeKey(filePath)
require.Nil(t, err)
nodeKey2, err := LoadOrGenNodeKey(filePath)
nodeKey2, err := p2p.LoadOrGenNodeKey(filePath)
require.Nil(t, err)
require.Equal(t, nodeKey, nodeKey2)
}
@ -24,13 +25,13 @@ func TestLoadOrGenNodeKey(t *testing.T) {
func TestLoadNodeKey(t *testing.T) {
filePath := filepath.Join(os.TempDir(), tmrand.Str(12)+"_peer_id.json")
_, err := LoadNodeKey(filePath)
_, err := p2p.LoadNodeKey(filePath)
require.True(t, os.IsNotExist(err))
_, err = LoadOrGenNodeKey(filePath)
_, err = p2p.LoadOrGenNodeKey(filePath)
require.NoError(t, err)
nodeKey, err := LoadNodeKey(filePath)
nodeKey, err := p2p.LoadNodeKey(filePath)
require.NoError(t, err)
require.NotNil(t, nodeKey)
}
@ -39,7 +40,7 @@ func TestNodeKeySaveAs(t *testing.T) {
filePath := filepath.Join(os.TempDir(), tmrand.Str(12)+"_peer_id.json")
require.NoFileExists(t, filePath)
nodeKey := GenNodeKey()
nodeKey := p2p.GenNodeKey()
require.NoError(t, nodeKey.SaveAs(filePath))
require.FileExists(t, filePath)
}

+ 13
- 167
p2p/peer.go View File

@ -8,11 +8,8 @@ import (
"math"
"math/rand"
"net"
"net/url"
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"
"time"
@ -27,157 +24,6 @@ import (
p2pproto "github.com/tendermint/tendermint/proto/tendermint/p2p"
)
// PeerAddress is a peer address URL. It differs from Endpoint in that the
// address hostname may be expanded into multiple IP addresses (thus multiple
// endpoints), and that it knows the node's ID.
//
// If the URL is opaque, i.e. of the form "scheme:<opaque>", then the opaque
// part has to contain either the node ID or a node ID and path in the form
// "scheme:<nodeid>@<path>".
type PeerAddress struct {
NodeID NodeID
Protocol Protocol
Hostname string
Port uint16
Path string
}
// ParsePeerAddress parses a peer address URL into a PeerAddress,
// normalizing and validating it.
func ParsePeerAddress(urlString string) (PeerAddress, error) {
url, err := url.Parse(urlString)
if err != nil || url == nil {
return PeerAddress{}, fmt.Errorf("invalid peer address %q: %w", urlString, err)
}
address := PeerAddress{}
// If the URL is opaque, i.e. in the form "scheme:<opaque>", we specify the
// opaque bit to be either a node ID or a node ID and path in the form
// "scheme:<nodeid>@<path>".
if url.Opaque != "" {
parts := strings.Split(url.Opaque, "@")
if len(parts) > 2 {
return PeerAddress{}, fmt.Errorf("invalid address format %q, unexpected @", urlString)
}
address.NodeID, err = NewNodeID(parts[0])
if err != nil {
return PeerAddress{}, fmt.Errorf("invalid peer ID %q: %w", parts[0], err)
}
if len(parts) == 2 {
address.Path = parts[1]
}
return address, nil
}
// Otherwise, just parse a normal networked URL.
address.NodeID, err = NewNodeID(url.User.Username())
if err != nil {
return PeerAddress{}, fmt.Errorf("invalid peer ID %q: %w", url.User.Username(), err)
}
if url.Scheme != "" {
address.Protocol = Protocol(strings.ToLower(url.Scheme))
} else {
address.Protocol = defaultProtocol
}
address.Hostname = strings.ToLower(url.Hostname())
if portString := url.Port(); portString != "" {
port64, err := strconv.ParseUint(portString, 10, 16)
if err != nil {
return PeerAddress{}, fmt.Errorf("invalid port %q: %w", portString, err)
}
address.Port = uint16(port64)
}
// NOTE: URL paths are case-sensitive, so we don't lowercase them.
address.Path = url.Path
if url.RawPath != "" {
address.Path = url.RawPath
}
if url.RawQuery != "" {
address.Path += "?" + url.RawQuery
}
if url.RawFragment != "" {
address.Path += "#" + url.RawFragment
}
if address.Path != "" && address.Path[0] != '/' && address.Path[0] != '#' {
address.Path = "/" + address.Path
}
return address, address.Validate()
}
// Resolve resolves a PeerAddress into a set of Endpoints, by expanding
// out a DNS hostname to IP addresses.
func (a PeerAddress) Resolve(ctx context.Context) ([]Endpoint, error) {
// If there is no hostname, this is an opaque URL in the form
// "scheme:<opaque>".
if a.Hostname == "" {
return []Endpoint{{
Protocol: a.Protocol,
Path: a.Path,
}}, nil
}
ips, err := net.DefaultResolver.LookupIP(ctx, "ip", a.Hostname)
if err != nil {
return nil, err
}
endpoints := make([]Endpoint, len(ips))
for i, ip := range ips {
endpoints[i] = Endpoint{
Protocol: a.Protocol,
IP: ip,
Port: a.Port,
Path: a.Path,
}
}
return endpoints, nil
}
// Validates validates a PeerAddress.
func (a PeerAddress) Validate() error {
if a.Protocol == "" {
return errors.New("no protocol")
}
if a.NodeID == "" {
return errors.New("no peer ID")
} else if err := a.NodeID.Validate(); err != nil {
return fmt.Errorf("invalid peer ID: %w", err)
}
if a.Port > 0 && a.Hostname == "" {
return errors.New("cannot specify port without hostname")
}
return nil
}
// String formats the address as a URL string.
func (a PeerAddress) String() string {
u := url.URL{Scheme: string(a.Protocol)}
if a.NodeID != "" {
u.User = url.User(string(a.NodeID))
}
switch {
case a.Hostname != "":
if a.Port > 0 {
u.Host = net.JoinHostPort(a.Hostname, strconv.Itoa(int(a.Port)))
} else {
u.Host = a.Hostname
}
u.Path = a.Path
case a.Protocol != "":
u.Opaque = a.Path // e.g. memory:foo
case a.Path != "" && a.Path[0] != '/':
u.Path = "/" + a.Path // e.g. some/path
default:
u.Path = a.Path // e.g. /some/path
}
return strings.TrimPrefix(u.String(), "//")
}
// PeerStatus specifies peer statuses.
type PeerStatus string
@ -477,7 +323,7 @@ func (m *PeerManager) Close() {
// Add adds a peer to the manager, given as an address. If the peer already
// exists, the address is added to it.
func (m *PeerManager) Add(address PeerAddress) error {
func (m *PeerManager) Add(address NodeAddress) error {
if err := address.Validate(); err != nil {
return err
}
@ -505,11 +351,11 @@ func (m *PeerManager) Add(address PeerAddress) error {
//
// FIXME: This is fairly naïve and only returns the addresses of the
// highest-ranked peers.
func (m *PeerManager) Advertise(peerID NodeID, limit uint16) []PeerAddress {
func (m *PeerManager) Advertise(peerID NodeID, limit uint16) []NodeAddress {
m.mtx.Lock()
defer m.mtx.Unlock()
addresses := make([]PeerAddress, 0, limit)
addresses := make([]NodeAddress, 0, limit)
for _, peer := range m.store.Ranked() {
if peer.ID == peerID {
continue
@ -586,7 +432,7 @@ func (m *PeerManager) broadcast(peerUpdate PeerUpdate) {
// If no peer is found, or all connection slots are full, it blocks until one
// becomes available. The caller must call Dialed() or DialFailed() for the
// returned peer. The context can be used to cancel the call.
func (m *PeerManager) DialNext(ctx context.Context) (NodeID, PeerAddress, error) {
func (m *PeerManager) DialNext(ctx context.Context) (NodeID, NodeAddress, error) {
for {
id, address, err := m.TryDialNext()
if err != nil || id != "" {
@ -595,14 +441,14 @@ func (m *PeerManager) DialNext(ctx context.Context) (NodeID, PeerAddress, error)
select {
case <-m.wakeDialCh:
case <-ctx.Done():
return "", PeerAddress{}, ctx.Err()
return "", NodeAddress{}, ctx.Err()
}
}
}
// TryDialNext is equivalent to DialNext(), but immediately returns an empty
// peer ID if no peers or connection slots are available.
func (m *PeerManager) TryDialNext() (NodeID, PeerAddress, error) {
func (m *PeerManager) TryDialNext() (NodeID, NodeAddress, error) {
m.mtx.Lock()
defer m.mtx.Unlock()
@ -611,7 +457,7 @@ func (m *PeerManager) TryDialNext() (NodeID, PeerAddress, error) {
// higher score than any other peers, and if successful evict it.
if m.options.MaxConnected > 0 &&
len(m.connected)+len(m.dialing) >= int(m.options.MaxConnected)+int(m.options.MaxConnectedUpgrade) {
return "", PeerAddress{}, nil
return "", NodeAddress{}, nil
}
for _, peer := range m.store.Ranked() {
@ -634,7 +480,7 @@ func (m *PeerManager) TryDialNext() (NodeID, PeerAddress, error) {
if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) {
upgradeFromPeer := m.findUpgradeCandidate(peer.ID, peer.Score())
if upgradeFromPeer == "" {
return "", PeerAddress{}, nil
return "", NodeAddress{}, nil
}
m.upgrading[upgradeFromPeer] = peer.ID
}
@ -643,7 +489,7 @@ func (m *PeerManager) TryDialNext() (NodeID, PeerAddress, error) {
return peer.ID, addressInfo.Address, nil
}
}
return "", PeerAddress{}, nil
return "", NodeAddress{}, nil
}
// wakeDial is used to notify DialNext about changes that *may* cause new
@ -697,7 +543,7 @@ func (m *PeerManager) retryDelay(failures uint32, persistent bool) time.Duration
// for dialing again when appropriate.
//
// FIXME: This should probably delete or mark bad addresses/peers after some time.
func (m *PeerManager) DialFailed(peerID NodeID, address PeerAddress) error {
func (m *PeerManager) DialFailed(peerID NodeID, address NodeAddress) error {
m.mtx.Lock()
defer m.mtx.Unlock()
@ -746,7 +592,7 @@ func (m *PeerManager) DialFailed(peerID NodeID, address PeerAddress) error {
// Dialed marks a peer as successfully dialed. Any further incoming connections
// will be rejected, and once disconnected the peer may be dialed again.
func (m *PeerManager) Dialed(peerID NodeID, address PeerAddress) error {
func (m *PeerManager) Dialed(peerID NodeID, address NodeAddress) error {
m.mtx.Lock()
defer m.mtx.Unlock()
@ -1237,7 +1083,7 @@ func (p *peerInfo) Validate() error {
// peerAddressInfo contains information and statistics about a peer address.
type peerAddressInfo struct {
Address PeerAddress
Address NodeAddress
LastDialSuccess time.Time
LastDialFailure time.Time
DialFailures uint32 // since last successful dial
@ -1246,7 +1092,7 @@ type peerAddressInfo struct {
// peerAddressInfoFromProto converts a Protobuf PeerAddressInfo message
// to a peerAddressInfo.
func peerAddressInfoFromProto(msg *p2pproto.PeerAddressInfo) (*peerAddressInfo, error) {
address, err := ParsePeerAddress(msg.Address)
address, err := ParseNodeAddress(msg.Address)
if err != nil {
return nil, fmt.Errorf("invalid address %q: %w", address, err)
}


+ 3
- 3
p2p/pex/reactor.go View File

@ -92,7 +92,7 @@ func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error {
case *protop2p.PexResponse:
for _, pexAddress := range msg.Addresses {
peerAddress, err := p2p.ParsePeerAddress(
peerAddress, err := p2p.ParseNodeAddress(
fmt.Sprintf("%s@%s:%d", pexAddress.ID, pexAddress.IP, pexAddress.Port))
if err != nil {
logger.Debug("invalid PEX address", "address", pexAddress, "err", err)
@ -113,13 +113,13 @@ func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error {
// resolve resolves a set of peer addresses into PEX addresses.
//
// FIXME: This is necessary because the current PEX protocol only supports
// IP/port pairs, while the P2P stack uses PeerAddress URLs. The PEX protocol
// IP/port pairs, while the P2P stack uses NodeAddress URLs. The PEX protocol
// should really use URLs too, to exchange DNS names instead of IPs and allow
// different transport protocols (e.g. QUIC and MemoryTransport).
//
// FIXME: We may want to cache and parallelize this, but for now we'll just rely
// on the operating system to cache it for us.
func (r *ReactorV2) resolve(addresses []p2p.PeerAddress, limit uint16) []protop2p.PexAddress {
func (r *ReactorV2) resolve(addresses []p2p.NodeAddress, limit uint16) []protop2p.PexAddress {
pexAddresses := make([]protop2p.PexAddress, 0, len(addresses))
for _, address := range addresses {
ctx, cancel := context.WithTimeout(context.Background(), resolveTimeout)


+ 2
- 2
p2p/router.go View File

@ -16,7 +16,7 @@ import (
// RouterOptions specifies options for a Router.
type RouterOptions struct {
// ResolveTimeout is the timeout for resolving a PeerAddress URLs.
// ResolveTimeout is the timeout for resolving NodeAddress URLs.
// 0 means no timeout.
ResolveTimeout time.Duration
@ -421,7 +421,7 @@ func (r *Router) dialPeers() {
}
// dialPeer connects to a peer by dialing it.
func (r *Router) dialPeer(ctx context.Context, address PeerAddress) (Connection, error) {
func (r *Router) dialPeer(ctx context.Context, address NodeAddress) (Connection, error) {
r.logger.Info("resolving peer address", "address", address)
resolveCtx := ctx
if r.options.ResolveTimeout > 0 {


+ 2
- 2
p2p/router_test.go View File

@ -56,7 +56,7 @@ func TestRouter(t *testing.T) {
// Start some other in-memory network nodes to communicate with, running
// a simple echo reactor that returns received messages.
peers := []p2p.PeerAddress{}
peers := []p2p.NodeAddress{}
for i := 0; i < 3; i++ {
peerManager, err := p2p.NewPeerManager(dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err)
@ -72,7 +72,7 @@ func TestRouter(t *testing.T) {
p2p.RouterOptions{},
)
require.NoError(t, err)
peers = append(peers, peerTransport.Endpoints()[0].PeerAddress(peerInfo.NodeID))
peers = append(peers, peerTransport.Endpoints()[0].NodeAddress(peerInfo.NodeID))
channel, err := peerRouter.OpenChannel(chID, &TestMessage{})
require.NoError(t, err)


+ 13
- 5
p2p/transport.go View File

@ -11,7 +11,7 @@ import (
)
const (
// defaultProtocol is the default protocol used for PeerAddress when
// defaultProtocol is the default protocol used for NodeAddress when
// a protocol isn't explicitly given as a URL scheme.
defaultProtocol Protocol = MConnProtocol
)
@ -143,9 +143,9 @@ type Endpoint struct {
Path string
}
// PeerAddress converts the endpoint into a PeerAddress for the given node ID.
func (e Endpoint) PeerAddress(nodeID NodeID) PeerAddress {
address := PeerAddress{
// NodeAddress converts the endpoint into a NodeAddress for the given node ID.
func (e Endpoint) NodeAddress(nodeID NodeID) NodeAddress {
address := NodeAddress{
NodeID: nodeID,
Protocol: e.Protocol,
Path: e.Path,
@ -159,7 +159,15 @@ func (e Endpoint) PeerAddress(nodeID NodeID) PeerAddress {
// String formats the endpoint as a URL string.
func (e Endpoint) String() string {
return e.PeerAddress("").String()
// If this is a non-networked endpoint with a valid node ID as a path,
// assume that path is a node ID (to handle opaque URLs of the form
// scheme:id).
if e.IP == nil {
if nodeID, err := NewNodeID(e.Path); err == nil {
return e.NodeAddress(nodeID).String()
}
}
return e.NodeAddress("").String()
}
// Validate validates the endpoint.


+ 1
- 1
p2p/transport_mconn.go View File

@ -330,7 +330,7 @@ func (c *mConnConnection) handshake(
c.onError,
c.mConnConfig,
)
mconn.SetLogger(c.logger.With("peer", c.RemoteEndpoint().PeerAddress(peerInfo.NodeID)))
mconn.SetLogger(c.logger.With("peer", c.RemoteEndpoint().NodeAddress(peerInfo.NodeID)))
return mconn, peerInfo, secretConn.RemotePubKey(), nil
}


+ 23
- 16
p2p/transport_test.go View File

@ -435,52 +435,57 @@ func TestConnection_String(t *testing.T) {
})
}
func TestEndpoint_PeerAddress(t *testing.T) {
func TestEndpoint_NodeAddress(t *testing.T) {
var (
ip4 = []byte{1, 2, 3, 4}
ip4in6 = net.IPv4(1, 2, 3, 4)
ip6 = []byte{0xb1, 0x0c, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x01}
id = p2p.NodeID("00112233445566778899aabbccddeeff00112233")
)
testcases := []struct {
endpoint p2p.Endpoint
expect p2p.PeerAddress
expect p2p.NodeAddress
}{
// Valid endpoints.
{
p2p.Endpoint{Protocol: "tcp", IP: ip4, Port: 8080, Path: "path"},
p2p.PeerAddress{Protocol: "tcp", Hostname: "1.2.3.4", Port: 8080, Path: "path"},
p2p.NodeAddress{Protocol: "tcp", Hostname: "1.2.3.4", Port: 8080, Path: "path"},
},
{
p2p.Endpoint{Protocol: "tcp", IP: ip4in6, Port: 8080, Path: "path"},
p2p.PeerAddress{Protocol: "tcp", Hostname: "1.2.3.4", Port: 8080, Path: "path"},
p2p.NodeAddress{Protocol: "tcp", Hostname: "1.2.3.4", Port: 8080, Path: "path"},
},
{
p2p.Endpoint{Protocol: "tcp", IP: ip6, Port: 8080, Path: "path"},
p2p.PeerAddress{Protocol: "tcp", Hostname: "b10c::1", Port: 8080, Path: "path"},
p2p.NodeAddress{Protocol: "tcp", Hostname: "b10c::1", Port: 8080, Path: "path"},
},
{
p2p.Endpoint{Protocol: "memory", Path: "foo"},
p2p.PeerAddress{Protocol: "memory", Path: "foo"},
p2p.NodeAddress{Protocol: "memory", Path: "foo"},
},
{
p2p.Endpoint{Protocol: "memory", Path: string(id)},
p2p.NodeAddress{Protocol: "memory", Path: string(id)},
},
// Partial (invalid) endpoints.
{p2p.Endpoint{}, p2p.PeerAddress{}},
{p2p.Endpoint{Protocol: "tcp"}, p2p.PeerAddress{Protocol: "tcp"}},
{p2p.Endpoint{IP: net.IPv4(1, 2, 3, 4)}, p2p.PeerAddress{Hostname: "1.2.3.4"}},
{p2p.Endpoint{Port: 8080}, p2p.PeerAddress{}},
{p2p.Endpoint{Path: "path"}, p2p.PeerAddress{Path: "path"}},
{p2p.Endpoint{}, p2p.NodeAddress{}},
{p2p.Endpoint{Protocol: "tcp"}, p2p.NodeAddress{Protocol: "tcp"}},
{p2p.Endpoint{IP: net.IPv4(1, 2, 3, 4)}, p2p.NodeAddress{Hostname: "1.2.3.4"}},
{p2p.Endpoint{Port: 8080}, p2p.NodeAddress{}},
{p2p.Endpoint{Path: "path"}, p2p.NodeAddress{Path: "path"}},
}
for _, tc := range testcases {
tc := tc
t.Run(tc.endpoint.String(), func(t *testing.T) {
// Without NodeID.
expect := tc.expect
require.Equal(t, expect, tc.endpoint.PeerAddress(""))
require.Equal(t, expect, tc.endpoint.NodeAddress(""))
// With NodeID.
expect.NodeID = p2p.NodeID("b10c")
require.Equal(t, expect, tc.endpoint.PeerAddress(expect.NodeID))
expect.NodeID = id
require.Equal(t, expect, tc.endpoint.NodeAddress(expect.NodeID))
})
}
}
@ -490,6 +495,7 @@ func TestEndpoint_String(t *testing.T) {
ip4 = []byte{1, 2, 3, 4}
ip4in6 = net.IPv4(1, 2, 3, 4)
ip6 = []byte{0xb1, 0x0c, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x01}
nodeID = p2p.NodeID("00112233445566778899aabbccddeeff00112233")
)
testcases := []struct {
@ -497,8 +503,9 @@ func TestEndpoint_String(t *testing.T) {
expect string
}{
// Non-networked endpoints.
{p2p.Endpoint{Protocol: "memory", Path: "foo"}, "memory:foo"},
{p2p.Endpoint{Protocol: "memory", Path: "👋"}, "memory:👋"},
{p2p.Endpoint{Protocol: "memory", Path: string(nodeID)}, "memory:" + string(nodeID)},
{p2p.Endpoint{Protocol: "file", Path: "foo"}, "file:///foo"},
{p2p.Endpoint{Protocol: "file", Path: "👋"}, "file:///%F0%9F%91%8B"},
// IPv4 endpoints.
{p2p.Endpoint{Protocol: "tcp", IP: ip4}, "tcp://1.2.3.4"},


Loading…
Cancel
Save