
p2p: improve PEX reactor (#6305)

Callum Waters 4 years ago
committed by GitHub
commit 9efc20c963
15 changed files with 2109 additions and 205 deletions
  1. CHANGELOG_PENDING.md (+3 -0)
  2. blockchain/v0/reactor_test.go (+4 -1)
  3. node/node.go (+1 -1)
  4. p2p/p2ptest/network.go (+16 -4)
  5. p2p/peermanager.go (+25 -9)
  6. p2p/peermanager_scoring_test.go (+3 -1)
  7. p2p/peermanager_test.go (+273 -94)
  8. p2p/pex/reactor.go (+342 -53)
  9. p2p/pex/reactor_test.go (+722 -0)
  10. p2p/router_test.go (+14 -4)
  11. p2p/transport_memory.go (+9 -0)
  12. p2p/transport_test.go (+2 -2)
  13. proto/tendermint/p2p/pex.go (+10 -2)
  14. proto/tendermint/p2p/pex.pb.go (+670 -32)
  15. proto/tendermint/p2p/pex.proto (+15 -2)

CHANGELOG_PENDING.md (+3 -0)

@@ -57,6 +57,9 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
- [config] Add `--mode` flag and config variable. See [ADR-52](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-052-tendermint-mode.md) @dongsam
- [rpc] \#6329 Don't cap page size in unsafe mode (@gotjoshua, @cmwaters)
- [pex] \#6305 v2 pex reactor with backwards compatibility. Introduces two new pex messages to
accommodate the new p2p stack. Removes the notion of seeds and crawling. All peer
exchange reactors behave the same. (@cmwaters)
- [crypto] \#6376 Enable sr25519 as a validator key
### IMPROVEMENTS


blockchain/v0/reactor_test.go (+4 -1)

@@ -300,7 +300,10 @@ func TestReactor_BadBlockStopsPeer(t *testing.T) {
// XXX: This causes a potential race condition.
// See: https://github.com/tendermint/tendermint/issues/6005
otherGenDoc, otherPrivVals := randGenesisDoc(config, 1, false, 30)
newNode := rts.network.MakeNode(t)
newNode := rts.network.MakeNode(t, p2ptest.NodeOptions{
MaxPeers: uint16(len(rts.nodes) + 1),
MaxConnected: uint16(len(rts.nodes) + 1),
})
rts.addNode(t, newNode.NodeID, otherGenDoc, otherPrivVals[0], maxBlockHeight)
// add a fake peer just so we do not wait for the consensus ticker to timeout


node/node.go (+1 -1)

@@ -646,7 +646,7 @@ func createPeerManager(
}
for _, peer := range peers {
if err := peerManager.Add(peer); err != nil {
if _, err := peerManager.Add(peer); err != nil {
return nil, fmt.Errorf("failed to add peer %q: %w", peer, err)
}
}


p2p/p2ptest/network.go (+16 -4)

@@ -30,6 +30,12 @@ type Network struct {
type NetworkOptions struct {
NumNodes int
BufferSize int
NodeOpts NodeOptions
}
type NodeOptions struct {
MaxPeers uint16
MaxConnected uint16
}
func (opts *NetworkOptions) setDefaults() {
@@ -50,7 +56,7 @@ func MakeNetwork(t *testing.T, opts NetworkOptions) *Network {
}
for i := 0; i < opts.NumNodes; i++ {
node := network.MakeNode(t)
node := network.MakeNode(t, opts.NodeOpts)
network.Nodes[node.NodeID] = node
}
@@ -81,7 +87,9 @@ func (n *Network) Start(t *testing.T) {
for _, targetAddress := range dialQueue[i+1:] { // nodes <i already connected
targetNode := n.Nodes[targetAddress.NodeID]
targetSub := subs[targetAddress.NodeID]
require.NoError(t, sourceNode.PeerManager.Add(targetAddress))
added, err := sourceNode.PeerManager.Add(targetAddress)
require.NoError(t, err)
require.True(t, added)
select {
case peerUpdate := <-sourceSub.Updates():
@@ -107,7 +115,9 @@
// Add the address to the target as well, so it's able to dial the
// source back if that's even necessary.
require.NoError(t, targetNode.PeerManager.Add(sourceAddress))
added, err = targetNode.PeerManager.Add(sourceAddress)
require.NoError(t, err)
require.True(t, added)
}
}
}
@@ -214,7 +224,7 @@ type Node struct {
// MakeNode creates a new Node configured for the network with a
// running peer manager, but does not add it to the existing
// network. Callers are responsible for updating peering relationships.
func (n *Network) MakeNode(t *testing.T) *Node {
func (n *Network) MakeNode(t *testing.T, opts NodeOptions) *Node {
privKey := ed25519.GenPrivKey()
nodeID := p2p.NodeIDFromPubKey(privKey.PubKey())
nodeInfo := p2p.NodeInfo{
@@ -230,6 +240,8 @@ func (n *Network) MakeNode(t *testing.T) *Node {
MinRetryTime: 10 * time.Millisecond,
MaxRetryTime: 100 * time.Millisecond,
RetryTimeJitter: time.Millisecond,
MaxPeers: opts.MaxPeers,
MaxConnected: opts.MaxConnected,
})
require.NoError(t, err)
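Taken together, the NodeOptions plumbing above lets a test bound each node's peer store and connection count. A minimal sketch of how a caller might use it, assuming the p2ptest package as modified in this diff (the test name and limits are hypothetical):

package p2ptest_test

import (
    "testing"

    "github.com/tendermint/tendermint/p2p/p2ptest"
)

func TestTinyBoundedNetwork(t *testing.T) {
    // Build a three-node network where every peer store holds at most
    // four peers and four simultaneous connections.
    network := p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{
        NumNodes: 3,
        NodeOpts: p2ptest.NodeOptions{
            MaxPeers:     4,
            MaxConnected: 4,
        },
    })
    network.Start(t) // dials every pair and waits for peer-up updates

    // Nodes created later must size their limits to fit the existing
    // network, as in the blockchain/v0 test change above.
    newNode := network.MakeNode(t, p2ptest.NodeOptions{
        MaxPeers:     uint16(len(network.Nodes) + 1),
        MaxConnected: uint16(len(network.Nodes) + 1),
    })
    _ = newNode
}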


p2p/peermanager.go (+25 -9)

@@ -384,13 +384,14 @@ func (m *PeerManager) prunePeers() error {
}
// Add adds a peer to the manager, given as an address. If the peer already
// exists, the address is added to it if not already present.
func (m *PeerManager) Add(address NodeAddress) error {
// exists, the address is added to it if it isn't already present. This will push
// low scoring peers out of the address book if it exceeds the maximum size.
func (m *PeerManager) Add(address NodeAddress) (bool, error) {
if err := address.Validate(); err != nil {
return err
return false, err
}
if address.NodeID == m.selfID {
return fmt.Errorf("can't add self (%v) to peer store", m.selfID)
return false, fmt.Errorf("can't add self (%v) to peer store", m.selfID)
}
m.mtx.Lock()
@@ -400,17 +401,32 @@ func (m *PeerManager) Add(address NodeAddress) error {
if !ok {
peer = m.newPeerInfo(address.NodeID)
}
if _, ok := peer.AddressInfo[address]; !ok {
peer.AddressInfo[address] = &peerAddressInfo{Address: address}
_, ok = peer.AddressInfo[address]
// if we already have the peer address, there's no need to continue
if ok {
return false, nil
}
// else add the new address
peer.AddressInfo[address] = &peerAddressInfo{Address: address}
if err := m.store.Set(peer); err != nil {
return err
return false, err
}
if err := m.prunePeers(); err != nil {
return err
return true, err
}
m.dialWaker.Wake()
return nil
return true, nil
}
// PeerRatio returns the ratio of peer addresses stored to the maximum size.
func (m *PeerManager) PeerRatio() float64 {
if m.options.MaxPeers == 0 {
return 0
}
m.mtx.Lock()
defer m.mtx.Unlock()
return float64(m.store.Size()) / float64(m.options.MaxPeers)
}
// DialNext finds an appropriate peer address to dial, and marks it as dialing.
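For downstream callers (node/node.go above, and the tests below), the new signature separates "stored a new address" from "already known", and PeerRatio exposes how full the store is. A minimal sketch against the APIs in this diff; the IDs and limits are made up:

package main

import (
    "fmt"
    "strings"

    dbm "github.com/tendermint/tm-db"

    "github.com/tendermint/tendermint/p2p"
)

func main() {
    selfID := p2p.NodeID(strings.Repeat("f", 40))
    peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
        MaxPeers:     2,
        MaxConnected: 2,
    })
    if err != nil {
        panic(err)
    }
    defer peerManager.Close()

    addr := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}

    added, err := peerManager.Add(addr)
    // added == true: a new address was stored (possibly pushing a
    // low-scoring peer out of a full store).
    // added == false with err == nil: the address was already known.
    fmt.Println(added, err)

    // One peer stored out of a maximum of two.
    fmt.Println(peerManager.PeerRatio()) // 0.5
}

PeerRatio gives the PEX reactor below a cheap signal for how aggressively to keep requesting addresses.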


p2p/peermanager_scoring_test.go (+3 -1)

@@ -23,7 +23,9 @@ func TestPeerScoring(t *testing.T) {
// create a fake node
id := NodeID(strings.Repeat("a1", 20))
require.NoError(t, peerManager.Add(NodeAddress{NodeID: id, Protocol: "memory"}))
added, err := peerManager.Add(NodeAddress{NodeID: id, Protocol: "memory"})
require.NoError(t, err)
require.True(t, added)
t.Run("Synchronous", func(t *testing.T) {
// update the manager and make sure it's correct


p2p/peermanager_test.go (+273 -94)

@@ -156,7 +156,9 @@ func TestNewPeerManager_Persistence(t *testing.T) {
defer peerManager.Close()
for _, addr := range append(append(aAddresses, bAddresses...), cAddresses...) {
require.NoError(t, peerManager.Add(addr))
added, err := peerManager.Add(addr)
require.NoError(t, err)
require.True(t, added)
}
require.ElementsMatch(t, aAddresses, peerManager.Addresses(aID))
@@ -198,8 +200,12 @@ func TestNewPeerManager_SelfIDChange(t *testing.T) {
peerManager, err := p2p.NewPeerManager(selfID, db, p2p.PeerManagerOptions{})
require.NoError(t, err)
require.NoError(t, peerManager.Add(a))
require.NoError(t, peerManager.Add(b))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
require.ElementsMatch(t, []p2p.NodeID{a.NodeID, b.NodeID}, peerManager.Peers())
peerManager.Close()
@@ -228,32 +234,41 @@ func TestPeerManager_Add(t *testing.T) {
{Protocol: "memory", NodeID: aID},
}
for _, addr := range aAddresses {
err = peerManager.Add(addr)
added, err := peerManager.Add(addr)
require.NoError(t, err)
require.True(t, added)
}
require.ElementsMatch(t, aAddresses, peerManager.Addresses(aID))
// Adding a different peer should be fine.
bAddress := p2p.NodeAddress{Protocol: "tcp", NodeID: bID, Hostname: "localhost"}
require.NoError(t, peerManager.Add(bAddress))
added, err := peerManager.Add(bAddress)
require.NoError(t, err)
require.True(t, added)
require.Equal(t, []p2p.NodeAddress{bAddress}, peerManager.Addresses(bID))
require.ElementsMatch(t, aAddresses, peerManager.Addresses(aID))
// Adding an existing address again should be a noop.
require.NoError(t, peerManager.Add(aAddresses[0]))
added, err = peerManager.Add(aAddresses[0])
require.NoError(t, err)
require.False(t, added)
require.ElementsMatch(t, aAddresses, peerManager.Addresses(aID))
// Adding a third peer with MaxPeers=2 should cause bID, which is
// the lowest-scored peer (not in PersistentPeers), to be removed.
require.NoError(t, peerManager.Add(p2p.NodeAddress{
Protocol: "tcp", NodeID: cID, Hostname: "localhost"}))
added, err = peerManager.Add(p2p.NodeAddress{
Protocol: "tcp", NodeID: cID, Hostname: "localhost"})
require.NoError(t, err)
require.True(t, added)
require.ElementsMatch(t, []p2p.NodeID{aID, cID}, peerManager.Peers())
// Adding an invalid address should error.
require.Error(t, peerManager.Add(p2p.NodeAddress{Path: "foo"}))
_, err = peerManager.Add(p2p.NodeAddress{Path: "foo"})
require.Error(t, err)
// Adding self should error
require.Error(t, peerManager.Add(p2p.NodeAddress{Protocol: "memory", NodeID: selfID}))
_, err = peerManager.Add(p2p.NodeAddress{Protocol: "memory", NodeID: selfID})
require.Error(t, err)
}
func TestPeerManager_DialNext(t *testing.T) {
@@ -263,7 +278,9 @@ func TestPeerManager_DialNext(t *testing.T) {
require.NoError(t, err)
// Add an address. DialNext should return it.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
address, err := peerManager.DialNext(ctx)
require.NoError(t, err)
require.Equal(t, a, address)
@@ -287,7 +304,9 @@ func TestPeerManager_DialNext_Retry(t *testing.T) {
peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), options)
require.NoError(t, err)
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
// Do five dial retries (six dials total). The retry time should double for
// each failure. At the forth retry, MaxRetryTime should kick in.
@@ -330,7 +349,9 @@ func TestPeerManager_DialNext_WakeOnAdd(t *testing.T) {
// Spawn a goroutine to add a peer after a delay.
go func() {
time.Sleep(200 * time.Millisecond)
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
}()
// This will block until peer is added above.
@@ -351,13 +372,17 @@ func TestPeerManager_DialNext_WakeOnDialFailed(t *testing.T) {
b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
// Add and dial a.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
// Add b. We shouldn't be able to dial it, due to MaxConnected.
require.NoError(t, peerManager.Add(b))
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
@@ -384,7 +409,9 @@ func TestPeerManager_DialNext_WakeOnDialFailedRetry(t *testing.T) {
a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
// Add a, dial it, and mark it a failure. This will start a retry timer.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
@@ -407,8 +434,9 @@ func TestPeerManager_DialNext_WakeOnDisconnected(t *testing.T) {
peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err)
err = peerManager.Add(a)
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
err = peerManager.Accepted(a.NodeID)
require.NoError(t, err)
@@ -439,20 +467,26 @@ func TestPeerManager_TryDialNext_MaxConnected(t *testing.T) {
require.NoError(t, err)
// Add a and connect to it.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.NoError(t, peerManager.Dialed(a))
// Add b and start dialing it.
require.NoError(t, peerManager.Add(b))
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
// At this point, adding c will not allow dialing it.
require.NoError(t, peerManager.Add(c))
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
@@ -480,28 +514,36 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
require.NoError(t, err)
// Add a and connect to it.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.NoError(t, peerManager.Dialed(a))
// Add b and start dialing it.
require.NoError(t, peerManager.Add(b))
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
// Even though we are at capacity, we should be allowed to dial c for an
// upgrade of a, since it's higher-scored.
require.NoError(t, peerManager.Add(c))
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, c, dial)
// However, since we're using all upgrade slots now, we can't add and dial
// d, even though it's also higher-scored.
require.NoError(t, peerManager.Add(d))
added, err = peerManager.Add(d)
require.NoError(t, err)
require.True(t, added)
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
@@ -526,7 +568,9 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
// should not be allowed to dial e even though there are upgrade slots,
// because there are no lower-scored nodes that can be upgraded.
require.NoError(t, peerManager.Disconnected(b.NodeID))
require.NoError(t, peerManager.Add(e))
added, err = peerManager.Add(e)
require.NoError(t, err)
require.True(t, added)
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
@@ -545,21 +589,27 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) {
require.NoError(t, err)
// Add a and connect to it.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.NoError(t, peerManager.Dialed(a))
// Add b and start dialing it. This will claim a for upgrading.
require.NoError(t, peerManager.Add(b))
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
// Adding c and dialing it will fail, because a is the only connected
// peer that can be upgraded, and b is already trying to upgrade it.
require.NoError(t, peerManager.Add(c))
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Empty(t, dial)
@@ -579,13 +629,17 @@ func TestPeerManager_TryDialNext_DialingConnected(t *testing.T) {
require.NoError(t, err)
// Add a and dial it.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
// Adding a's TCP address will not dispense a, since it's already dialing.
require.NoError(t, peerManager.Add(aTCP))
added, err = peerManager.Add(aTCP)
require.NoError(t, err)
require.True(t, added)
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
@@ -597,7 +651,9 @@ func TestPeerManager_TryDialNext_DialingConnected(t *testing.T) {
require.Zero(t, dial)
// Adding b and accepting a connection from it will not dispense it either.
require.NoError(t, peerManager.Add(b))
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(bID))
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
@@ -618,7 +674,9 @@ func TestPeerManager_TryDialNext_Multiple(t *testing.T) {
require.NoError(t, err)
for _, address := range addresses {
require.NoError(t, peerManager.Add(address))
added, err := peerManager.Add(address)
require.NoError(t, err)
require.True(t, added)
}
// All addresses should be dispensed as long as dialing them has failed.
@@ -648,7 +706,9 @@ func TestPeerManager_DialFailed(t *testing.T) {
peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err)
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
// Dialing and then calling DialFailed with a different address (same
// NodeID) should unmark as dialing and allow us to dial the other address
@@ -686,14 +746,18 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) {
require.NoError(t, err)
// Add a and connect to it.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.NoError(t, peerManager.Dialed(a))
// Add b and start dialing it. This will claim a for upgrading.
require.NoError(t, peerManager.Add(b))
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
@@ -701,7 +765,9 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) {
// Adding c and dialing it will fail, even though it could upgrade a and we
// have free upgrade slots, because a is the only connected peer that can be
// upgraded and b is already trying to upgrade it.
require.NoError(t, peerManager.Add(c))
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Empty(t, dial)
@@ -721,7 +787,9 @@ func TestPeerManager_Dialed_Connected(t *testing.T) {
require.NoError(t, err)
// Marking a as dialed twice should error.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
@@ -730,7 +798,9 @@ func TestPeerManager_Dialed_Connected(t *testing.T) {
require.Error(t, peerManager.Dialed(a))
// Accepting a connection from b and then trying to mark it as dialed should fail.
require.NoError(t, peerManager.Add(b))
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
@@ -744,7 +814,8 @@ func TestPeerManager_Dialed_Self(t *testing.T) {
require.NoError(t, err)
// Dialing self should error.
require.Error(t, peerManager.Add(p2p.NodeAddress{Protocol: "memory", NodeID: selfID}))
_, err = peerManager.Add(p2p.NodeAddress{Protocol: "memory", NodeID: selfID})
require.Error(t, err)
}
func TestPeerManager_Dialed_MaxConnected(t *testing.T) {
@@ -757,14 +828,18 @@ func TestPeerManager_Dialed_MaxConnected(t *testing.T) {
require.NoError(t, err)
// Start to dial a.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
// Marking b as dialed in the meanwhile (even without TryDialNext)
// should be fine.
require.NoError(t, peerManager.Add(b))
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Dialed(b))
// Completing the dial for a should now error.
@@ -785,14 +860,20 @@ func TestPeerManager_Dialed_MaxConnectedUpgrade(t *testing.T) {
require.NoError(t, err)
// Dialing a and b is fine.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Dialed(a))
require.NoError(t, peerManager.Add(b))
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Dialed(b))
// Starting an upgrade of c should be fine.
require.NoError(t, peerManager.Add(c))
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, c, dial)
@@ -800,7 +881,9 @@ func TestPeerManager_Dialed_MaxConnectedUpgrade(t *testing.T) {
// Trying to mark d dialed should fail, since there are no more upgrade
// slots and a/b haven't been evicted yet.
require.NoError(t, peerManager.Add(d))
added, err = peerManager.Add(d)
require.NoError(t, err)
require.True(t, added)
require.Error(t, peerManager.Dialed(d))
}
@@ -827,11 +910,15 @@ func TestPeerManager_Dialed_Upgrade(t *testing.T) {
require.NoError(t, err)
// Dialing a is fine.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Dialed(a))
// Upgrading it with b should work, since b has a higher score.
require.NoError(t, peerManager.Add(b))
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
@@ -839,7 +926,9 @@ func TestPeerManager_Dialed_Upgrade(t *testing.T) {
// a hasn't been evicted yet, but c shouldn't be allowed to upgrade anyway
// since it's about to be evicted.
require.NoError(t, peerManager.Add(c))
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Empty(t, dial)
@@ -869,15 +958,21 @@ func TestPeerManager_Dialed_UpgradeEvenLower(t *testing.T) {
require.NoError(t, err)
// Connect to a and b.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Dialed(a))
require.NoError(t, peerManager.Add(b))
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Dialed(b))
// Start an upgrade with c, which should pick b to upgrade (since it
// has score 2).
require.NoError(t, peerManager.Add(c))
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, c, dial)
@@ -885,7 +980,9 @@ func TestPeerManager_Dialed_UpgradeEvenLower(t *testing.T) {
// In the meanwhile, a disconnects and d connects. d is even lower-scored
// than b (1 vs 2), which is currently being upgraded.
require.NoError(t, peerManager.Disconnected(a.NodeID))
require.NoError(t, peerManager.Add(d))
added, err = peerManager.Add(d)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(d.NodeID))
// Once c completes the upgrade of b, it should instead evict d,
@@ -913,14 +1010,20 @@ func TestPeerManager_Dialed_UpgradeNoEvict(t *testing.T) {
require.NoError(t, err)
// Connect to a and b.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Dialed(a))
require.NoError(t, peerManager.Add(b))
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Dialed(b))
// Start an upgrade with c, which should pick a to upgrade.
require.NoError(t, peerManager.Add(c))
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, c, dial)
@@ -950,7 +1053,9 @@ func TestPeerManager_Accepted(t *testing.T) {
require.Error(t, peerManager.Accepted(selfID))
// Accepting a connection from a known peer should work.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
// Accepting a connection from an already accepted peer should error.
@@ -962,7 +1067,9 @@ func TestPeerManager_Accepted(t *testing.T) {
// Accepting a connection from a peer that's being dialed should work, and
// should cause the dial to fail.
require.NoError(t, peerManager.Add(c))
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, c, dial)
@@ -970,7 +1077,9 @@ func TestPeerManager_Accepted(t *testing.T) {
require.Error(t, peerManager.Dialed(c))
// Accepting a connection from a peer that's been dialed should fail.
require.NoError(t, peerManager.Add(d))
added, err = peerManager.Add(d)
require.NoError(t, err)
require.True(t, added)
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, d, dial)
@@ -989,14 +1098,20 @@ func TestPeerManager_Accepted_MaxConnected(t *testing.T) {
require.NoError(t, err)
// Connect to a and b.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Dialed(a))
require.NoError(t, peerManager.Add(b))
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(b.NodeID))
// Accepting c should now fail.
require.NoError(t, peerManager.Add(c))
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
require.Error(t, peerManager.Accepted(c.NodeID))
}
@@ -1017,23 +1132,32 @@ func TestPeerManager_Accepted_MaxConnectedUpgrade(t *testing.T) {
require.NoError(t, err)
// Dial a.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Dialed(a))
// Accepting b should fail, since it's not an upgrade over a.
require.NoError(t, peerManager.Add(b))
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
require.Error(t, peerManager.Accepted(b.NodeID))
// Accepting c should work, since it upgrades a.
require.NoError(t, peerManager.Add(c))
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(c.NodeID))
// a still hasn't been evicted, so accepting b should still fail.
require.NoError(t, peerManager.Add(b))
_, err = peerManager.Add(b)
require.NoError(t, err)
require.Error(t, peerManager.Accepted(b.NodeID))
// Also, accepting d should fail, since all upgrade slots are full.
require.NoError(t, peerManager.Add(d))
added, err = peerManager.Add(d)
require.NoError(t, err)
require.True(t, added)
require.Error(t, peerManager.Accepted(d.NodeID))
}
@@ -1053,15 +1177,21 @@ func TestPeerManager_Accepted_Upgrade(t *testing.T) {
require.NoError(t, err)
// Accept a.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
// Accepting b should work, since it upgrades a.
require.NoError(t, peerManager.Add(b))
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(b.NodeID))
// c cannot get accepted, since a has been upgraded by b.
require.NoError(t, peerManager.Add(c))
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
require.Error(t, peerManager.Accepted(c.NodeID))
// This should cause a to get evicted.
@@ -1090,18 +1220,24 @@ func TestPeerManager_Accepted_UpgradeDialing(t *testing.T) {
require.NoError(t, err)
// Accept a.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
// Start dial upgrade from a to b.
require.NoError(t, peerManager.Add(b))
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
// a has already been claimed for the upgrade to b, so accepting
// c should fail since there's no one else to upgrade.
require.NoError(t, peerManager.Add(c))
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
require.Error(t, peerManager.Accepted(c.NodeID))
// However, if b connects to us while we're also trying to upgrade to it via
@@ -1126,7 +1262,9 @@ func TestPeerManager_Ready(t *testing.T) {
defer sub.Close()
// Connecting to a should still have it as status down.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
require.Equal(t, p2p.PeerStatusDown, peerManager.Status(a.NodeID))
@@ -1139,7 +1277,9 @@ func TestPeerManager_Ready(t *testing.T) {
}, <-sub.Updates())
// Marking an unconnected peer as ready should do nothing.
require.NoError(t, peerManager.Add(b))
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID))
require.NoError(t, peerManager.Ready(b.NodeID))
require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID))
@@ -1153,7 +1293,9 @@ func TestPeerManager_EvictNext(t *testing.T) {
peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err)
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
require.NoError(t, peerManager.Ready(a.NodeID))
@@ -1184,7 +1326,9 @@ func TestPeerManager_EvictNext_WakeOnError(t *testing.T) {
peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err)
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
require.NoError(t, peerManager.Ready(a.NodeID))
@@ -1214,14 +1358,18 @@ func TestPeerManager_EvictNext_WakeOnUpgradeDialed(t *testing.T) {
require.NoError(t, err)
// Connect a.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
require.NoError(t, peerManager.Ready(a.NodeID))
// Spawn a goroutine to upgrade to b with a delay.
go func() {
time.Sleep(200 * time.Millisecond)
require.NoError(t, peerManager.Add(b))
added, err := peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
@@ -1248,7 +1396,9 @@ func TestPeerManager_EvictNext_WakeOnUpgradeAccepted(t *testing.T) {
require.NoError(t, err)
// Connect a.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
require.NoError(t, peerManager.Ready(a.NodeID))
@@ -1271,7 +1421,9 @@ func TestPeerManager_TryEvictNext(t *testing.T) {
peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err)
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
// Nothing is evicted with no peers connected.
evict, err := peerManager.TryEvictNext()
@@ -1314,13 +1466,16 @@ func TestPeerManager_Disconnected(t *testing.T) {
require.Empty(t, sub.Updates())
// Disconnecting an accepted non-ready peer does not send a status update.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
require.NoError(t, peerManager.Disconnected(a.NodeID))
require.Empty(t, sub.Updates())
// Disconnecting a ready peer sends a status update.
require.NoError(t, peerManager.Add(a))
_, err = peerManager.Add(a)
require.NoError(t, err)
require.NoError(t, peerManager.Accepted(a.NodeID))
require.NoError(t, peerManager.Ready(a.NodeID))
require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID))
@@ -1365,7 +1520,9 @@ func TestPeerManager_Errored(t *testing.T) {
// Erroring a known peer does nothing, and won't evict it later,
// even when it connects.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Errored(a.NodeID, errors.New("foo")))
evict, err = peerManager.TryEvictNext()
require.NoError(t, err)
@@ -1394,7 +1551,9 @@ func TestPeerManager_Subscribe(t *testing.T) {
sub := peerManager.Subscribe()
defer sub.Close()
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.Empty(t, sub.Updates())
// Inbound connection.
@@ -1452,7 +1611,9 @@ func TestPeerManager_Subscribe_Close(t *testing.T) {
sub := peerManager.Subscribe()
defer sub.Close()
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
require.Empty(t, sub.Updates())
@@ -1482,7 +1643,9 @@ func TestPeerManager_Subscribe_Broadcast(t *testing.T) {
defer s3.Close()
// Connecting to a peer should send updates on all subscriptions.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
require.NoError(t, peerManager.Ready(a.NodeID))
@@ -1524,7 +1687,9 @@ func TestPeerManager_Close(t *testing.T) {
// This dial failure will start a retry timer for 10 seconds, which
// should be reaped.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
@@ -1556,12 +1721,24 @@ func TestPeerManager_Advertise(t *testing.T) {
require.NoError(t, err)
defer peerManager.Close()
require.NoError(t, peerManager.Add(aTCP))
require.NoError(t, peerManager.Add(aMem))
require.NoError(t, peerManager.Add(bTCP))
require.NoError(t, peerManager.Add(bMem))
require.NoError(t, peerManager.Add(cTCP))
require.NoError(t, peerManager.Add(cMem))
added, err := peerManager.Add(aTCP)
require.NoError(t, err)
require.True(t, added)
added, err = peerManager.Add(aMem)
require.NoError(t, err)
require.True(t, added)
added, err = peerManager.Add(bTCP)
require.NoError(t, err)
require.True(t, added)
added, err = peerManager.Add(bMem)
require.NoError(t, err)
require.True(t, added)
added, err = peerManager.Add(cTCP)
require.NoError(t, err)
require.True(t, added)
added, err = peerManager.Add(cMem)
require.NoError(t, err)
require.True(t, added)
// d should get all addresses.
require.ElementsMatch(t, []p2p.NodeAddress{
@@ -1592,7 +1769,9 @@ func TestPeerManager_SetHeight_GetHeight(t *testing.T) {
// Getting a height should default to 0, for unknown peers and
// for known peers without height.
require.NoError(t, peerManager.Add(a))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.EqualValues(t, 0, peerManager.GetHeight(a.NodeID))
require.EqualValues(t, 0, peerManager.GetHeight(b.NodeID))


p2p/pex/reactor.go (+342 -53)

@@ -3,9 +3,12 @@ package pex
import (
"context"
"fmt"
"sync"
"time"
"github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log"
tmmath "github.com/tendermint/tendermint/libs/math"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/p2p"
protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
@@ -16,15 +19,40 @@ var (
_ p2p.Wrapper = (*protop2p.PexMessage)(nil)
)
// TODO: Consolidate with params file.
// See https://github.com/tendermint/tendermint/issues/6371
const (
maxAddresses uint16 = 100
resolveTimeout = 3 * time.Second
// the minimum time that must elapse before a peer can send another request
// to the same peer
minReceiveRequestInterval = 300 * time.Millisecond
// the maximum number of addresses that can be included in a response
maxAddresses uint16 = 100
// allocated time to resolve a node address into a set of endpoints
resolveTimeout = 3 * time.Second
// how long to wait when there are no peers available before trying again
noAvailablePeersWaitPeriod = 1 * time.Second
// the request interval of the pex reactor when the peer store is full.
// The reactor should still look to add new peers in order to flush out
// low-scoring peers that are still in the peer store
fullCapacityInterval = 10 * time.Minute
)
// ReactorV2 is a PEX reactor for the new P2P stack. The legacy reactor
// is Reactor.
//
// FIXME: Rename this when Reactor is removed, and consider moving to p2p/.
//
// The peer exchange or PEX reactor supports the peer manager by sending
// requests to other peers for addresses that can be given to the peer manager
// and at the same time advertises addresses to peers that need more.
//
// The reactor is able to tweak the intensity of its search by decreasing or
// increasing the interval between each request. It tracks connected peers via
// a linked list, sending a request to the node at the front of the list and
// adding it to the back of the list once a response is received.
type ReactorV2 struct {
service.BaseService
@@ -32,6 +60,33 @@ type ReactorV2 struct {
pexCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
closeCh chan struct{}
// list of available peers to loop through and send peer requests to
availablePeers *clist.CList
mtx sync.RWMutex
// requestsSent keeps track of which peers the PEX reactor has sent requests
// to. This prevents the sending of spurious responses.
// NOTE: If a node never responds, it will remain in this map until a
// peer down status update is sent
requestsSent map[p2p.NodeID]struct{}
// lastReceivedRequests keeps track of when peers send a request to prevent
// peers from sending requests too often (as defined by
// minReceiveRequestInterval).
lastReceivedRequests map[p2p.NodeID]time.Time
// the time when another request will be sent
nextRequestTime time.Time
// keep track of how many new peers to existing peers we have received to
// extrapolate the size of the network
newPeers uint32
totalPeers uint32
// discoveryRatio is the squared ratio of total peers to newly discovered
// peers. It is multiplied by the base interval to calculate how long to
// wait between each request.
discoveryRatio float32
}
// NewReactor returns a reference to a new reactor.
@@ -42,10 +97,13 @@ func NewReactorV2(
peerUpdates *p2p.PeerUpdates,
) *ReactorV2 {
r := &ReactorV2{
peerManager: peerManager,
pexCh: pexCh,
peerUpdates: peerUpdates,
closeCh: make(chan struct{}),
peerManager: peerManager,
pexCh: pexCh,
peerUpdates: peerUpdates,
closeCh: make(chan struct{}),
availablePeers: clist.New(),
requestsSent: make(map[p2p.NodeID]struct{}),
lastReceivedRequests: make(map[p2p.NodeID]time.Time),
}
r.BaseService = *service.NewBaseService(logger, "PEX", r)
@@ -76,32 +134,156 @@ func (r *ReactorV2) OnStop() {
<-r.peerUpdates.Done()
}
// processPexCh implements a blocking event loop where we listen for p2p
// Envelope messages from the pexCh.
func (r *ReactorV2) processPexCh() {
defer r.pexCh.Close()
for {
select {
case <-r.closeCh:
r.Logger.Debug("stopped listening on PEX channel; closing...")
return
// outbound requests for new peers
case <-r.waitUntilNextRequest():
r.sendRequestForPeers()
// inbound requests for new peers or responses to requests sent by this
// reactor
case envelope := <-r.pexCh.In:
if err := r.handleMessage(r.pexCh.ID, envelope); err != nil {
r.Logger.Error("failed to process message", "ch_id", r.pexCh.ID, "envelope", envelope, "err", err)
r.pexCh.Error <- p2p.PeerError{
NodeID: envelope.From,
Err: err,
}
}
}
}
}
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *ReactorV2) processPeerUpdates() {
defer r.peerUpdates.Close()
for {
select {
case peerUpdate := <-r.peerUpdates.Updates():
r.processPeerUpdate(peerUpdate)
case <-r.closeCh:
r.Logger.Debug("stopped listening on peer updates channel; closing...")
return
}
}
}
// handlePexMessage handles envelopes sent from peers on the PexChannel.
func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error {
logger := r.Logger.With("peer", envelope.From)
// FIXME: We may want to add DoS protection here, by rate limiting peers and
// only processing addresses we actually requested.
switch msg := envelope.Message.(type) {
case *protop2p.PexRequest:
pexAddresses := r.resolve(r.peerManager.Advertise(envelope.From, maxAddresses), maxAddresses)
// check if the peer hasn't sent a prior request too close to this one
// in time
if err := r.markPeerRequest(envelope.From); err != nil {
return err
}
// parse and send the legacy PEX addresses
pexAddresses := r.resolve(r.peerManager.Advertise(envelope.From, maxAddresses))
r.pexCh.Out <- p2p.Envelope{
To: envelope.From,
Message: &protop2p.PexResponse{Addresses: pexAddresses},
}
case *protop2p.PexResponse:
// check if the response matches a request that was made to that peer
if err := r.markPeerResponse(envelope.From); err != nil {
return err
}
// check the size of the response
if len(msg.Addresses) > int(maxAddresses) {
return fmt.Errorf("peer sent too many addresses (max: %d, got: %d)",
maxAddresses,
len(msg.Addresses),
)
}
for _, pexAddress := range msg.Addresses {
// no protocol is prefixed so we assume the default (mconn)
peerAddress, err := p2p.ParseNodeAddress(
fmt.Sprintf("%s@%s:%d", pexAddress.ID, pexAddress.IP, pexAddress.Port))
if err != nil {
logger.Debug("invalid PEX address", "address", pexAddress, "err", err)
continue
}
if err = r.peerManager.Add(peerAddress); err != nil {
logger.Debug("failed to register PEX address", "address", peerAddress, "err", err)
added, err := r.peerManager.Add(peerAddress)
if err != nil {
logger.Error("failed to add PEX address", "address", peerAddress, "err", err)
}
if added {
r.newPeers++
logger.Debug("added PEX address", "address", peerAddress)
}
r.totalPeers++
}
// V2 PEX MESSAGES
case *protop2p.PexRequestV2:
// check if the peer hasn't sent a prior request too close to this one
// in time
if err := r.markPeerRequest(envelope.From); err != nil {
return err
}
// request peers from the peer manager and parse the NodeAddresses into
// URL strings
nodeAddresses := r.peerManager.Advertise(envelope.From, maxAddresses)
pexAddressesV2 := make([]protop2p.PexAddressV2, len(nodeAddresses))
for idx, addr := range nodeAddresses {
pexAddressesV2[idx] = protop2p.PexAddressV2{
URL: addr.String(),
}
}
r.pexCh.Out <- p2p.Envelope{
To: envelope.From,
Message: &protop2p.PexResponseV2{Addresses: pexAddressesV2},
}
case *protop2p.PexResponseV2:
// check if the response matches a request that was made to that peer
if err := r.markPeerResponse(envelope.From); err != nil {
return err
}
// check the size of the response
if len(msg.Addresses) > int(maxAddresses) {
return fmt.Errorf("peer sent too many addresses (max: %d, got: %d)",
maxAddresses,
len(msg.Addresses),
)
}
for _, pexAddress := range msg.Addresses {
peerAddress, err := p2p.ParseNodeAddress(pexAddress.URL)
if err != nil {
continue
}
added, err := r.peerManager.Add(peerAddress)
if err != nil {
logger.Error("failed to add V2 PEX address", "address", peerAddress, "err", err)
}
if added {
r.newPeers++
logger.Debug("added V2 PEX address", "address", peerAddress)
}
r.totalPeers++
}
default:
return fmt.Errorf("received unknown message: %T", msg)
@@ -119,23 +301,31 @@ func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error {
//
// FIXME: We may want to cache and parallelize this, but for now we'll just rely
// on the operating system to cache it for us.
func (r *ReactorV2) resolve(addresses []p2p.NodeAddress, limit uint16) []protop2p.PexAddress {
pexAddresses := make([]protop2p.PexAddress, 0, len(addresses))
func (r *ReactorV2) resolve(addresses []p2p.NodeAddress) []protop2p.PexAddress {
limit := len(addresses)
pexAddresses := make([]protop2p.PexAddress, 0, limit)
for _, address := range addresses {
ctx, cancel := context.WithTimeout(context.Background(), resolveTimeout)
endpoints, err := address.Resolve(ctx)
r.Logger.Debug("resolved node address", "endpoints", endpoints)
cancel()
if err != nil {
r.Logger.Debug("failed to resolve address", "address", address, "err", err)
continue
}
for _, endpoint := range endpoints {
if len(pexAddresses) >= int(limit) {
r.Logger.Debug("checking endpint", "IP", endpoint.IP, "Port", endpoint.Port)
if len(pexAddresses) >= limit {
return pexAddresses
} else if endpoint.IP != nil {
r.Logger.Debug("appending pex address")
// PEX currently only supports IP-networked transports (as
// opposed to e.g. p2p.MemoryTransport).
//
// FIXME: as the PEX address contains no information about the
// protocol, we jam this into the ID. We won't need to do this once
// we support URLs
pexAddresses = append(pexAddresses, protop2p.PexAddress{
ID: string(address.NodeID),
IP: endpoint.IP.String(),
@@ -157,7 +347,7 @@ func (r *ReactorV2) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (er
}
}()
r.Logger.Debug("received message", "peer", envelope.From)
r.Logger.Debug("received PEX message", "peer", envelope.From)
switch chID {
case p2p.ChannelID(PexChannel):
@@ -170,56 +360,155 @@ func (r *ReactorV2) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (er
return err
}
// processPexCh implements a blocking event loop where we listen for p2p
// Envelope messages from the pexCh.
func (r *ReactorV2) processPexCh() {
defer r.pexCh.Close()
// processPeerUpdate processes a PeerUpdate. For new peers, PeerStatusUp, we
// add the peer to the list of available peers; for PeerStatusDown we remove it.
func (r *ReactorV2) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
r.Logger.Debug("received PEX peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
switch peerUpdate.Status {
case p2p.PeerStatusUp:
r.availablePeers.PushBack(peerUpdate.NodeID)
case p2p.PeerStatusDown:
r.removePeer(peerUpdate.NodeID)
default:
}
}
for {
select {
case envelope := <-r.pexCh.In:
if err := r.handleMessage(r.pexCh.ID, envelope); err != nil {
r.Logger.Error("failed to process message", "ch_id", r.pexCh.ID, "envelope", envelope, "err", err)
r.pexCh.Error <- p2p.PeerError{
NodeID: envelope.From,
Err: err,
}
}
func (r *ReactorV2) waitUntilNextRequest() <-chan time.Time {
return time.After(time.Until(r.nextRequestTime))
}
case <-r.closeCh:
r.Logger.Debug("stopped listening on PEX channel; closing...")
return
// sendRequestForPeers pops the first peerID off the list and sends the
// peer a request for more peer addresses. The function then moves the
// peer into the requestsSent bucket and calculates when the next request
// time should be
func (r *ReactorV2) sendRequestForPeers() {
peer := r.availablePeers.Front()
if peer == nil {
// no peers are available
r.Logger.Debug("no available peers to send request to, waiting...")
r.nextRequestTime = time.Now().Add(noAvailablePeersWaitPeriod)
return
}
peerID := peer.Value.(p2p.NodeID)
// The node accommodates both pex systems
if r.isLegacyPeer(peerID) {
r.pexCh.Out <- p2p.Envelope{
To: peerID,
Message: &protop2p.PexRequest{},
}
} else {
r.pexCh.Out <- p2p.Envelope{
To: peerID,
Message: &protop2p.PexRequestV2{},
}
}
// remove the peer from the available peers list and mark it in the requestsSent map
r.availablePeers.Remove(peer)
peer.DetachPrev()
r.mtx.Lock()
r.requestsSent[peerID] = struct{}{}
r.mtx.Unlock()
r.calculateNextRequestTime()
r.Logger.Debug("peer request sent", "next_request_time", r.nextRequestTime)
}
// processPeerUpdate processes a PeerUpdate. For added peers, PeerStatusUp, we
// send a request for addresses.
func (r *ReactorV2) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
r.Logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
// calculateNextRequestTime implements something of a proportional controller
// to estimate how often the reactor should be requesting new peer addresses.
// The dependent variable in this calculation is the ratio of new peers to
// all peers that the reactor receives. The interval is calculated as the
// inverse of this ratio, squared. In the beginning, all peers should be new peers.
// We expect this ratio to be near 1 and thus the interval to be as short
// as possible. As the node becomes more familiar with the network the ratio of
// new nodes will plummet to a very small number, meaning the interval expands
// to its upper bound.
// CONTRACT: Must use a write lock as nextRequestTime is updated
func (r *ReactorV2) calculateNextRequestTime() {
// check if the peer store is full. If so then there is no need
// to send peer requests too often
if ratio := r.peerManager.PeerRatio(); ratio >= 0.95 {
r.Logger.Debug("peer manager near full ratio, sleeping...",
"sleep_period", fullCapacityInterval, "ratio", ratio)
r.nextRequestTime = time.Now().Add(fullCapacityInterval)
return
}
if peerUpdate.Status == p2p.PeerStatusUp {
r.pexCh.Out <- p2p.Envelope{
To: peerUpdate.NodeID,
Message: &protop2p.PexRequest{},
// baseTime represents the shortest interval that we can send peer requests
// in. For example, if we have 10 peers and we can't send a message to the
// same peer more than once every 300ms, then we can send a request every
// 30ms. In practice we use a safety margin of 2, ergo 60ms
peers := tmmath.MinInt(r.availablePeers.Len(), 50)
baseTime := minReceiveRequestInterval
if peers > 0 {
baseTime = minReceiveRequestInterval * 2 / time.Duration(peers)
}
if r.totalPeers > 0 || r.discoveryRatio == 0 {
// find the inverse ratio of new peers to total peers. NOTE: we add 1 to
// both the numerator and denominator to avoid dividing by zero
ratio := float32(r.totalPeers+1) / float32(r.newPeers+1)
// square the ratio in order to get non linear time intervals
// NOTE: The longest possible interval for a network with 100 or more peers
// where a node is connected to 50 of them is 2 minutes.
r.discoveryRatio = ratio * ratio
r.newPeers = 0
r.totalPeers = 0
}
// NOTE: As ratio is always >= 1, discovery ratio is >= 1. Therefore we don't need to worry
// about the next request time being less than the minimum time
r.nextRequestTime = time.Now().Add(baseTime * time.Duration(r.discoveryRatio))
}
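To make the controller concrete, here is the same arithmetic as a standalone sketch, using hypothetical numbers (10 available peers; 20 addresses received since the last calculation, 4 of them new):

package main

import (
    "fmt"
    "time"
)

func main() {
    const minReceiveRequestInterval = 300 * time.Millisecond

    peers := 10
    baseTime := minReceiveRequestInterval * 2 / time.Duration(peers) // 60ms

    ratio := float32(20+1) / float32(4+1) // 4.2
    discoveryRatio := ratio * ratio       // ~17.6

    // The conversion to time.Duration truncates, so the multiplier is 17.
    fmt.Println(baseTime * time.Duration(discoveryRatio)) // 1.02s
}

As the node learns the network, the share of new addresses falls, discoveryRatio grows, and the interval stretches; once the store is nearly full, the separate PeerRatio check above takes over with fullCapacityInterval.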
func (r *ReactorV2) removePeer(id p2p.NodeID) {
for e := r.availablePeers.Front(); e != nil; e = e.Next() {
if e.Value == id {
r.availablePeers.Remove(e)
e.DetachPrev()
break
}
}
r.mtx.Lock()
defer r.mtx.Unlock()
delete(r.requestsSent, id)
delete(r.lastReceivedRequests, id)
}
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *ReactorV2) processPeerUpdates() {
defer r.peerUpdates.Close()
func (r *ReactorV2) markPeerRequest(peer p2p.NodeID) error {
r.mtx.Lock()
defer r.mtx.Unlock()
if lastRequestTime, ok := r.lastReceivedRequests[peer]; ok {
if time.Now().Before(lastRequestTime.Add(minReceiveRequestInterval)) {
return fmt.Errorf("peer sent a request too close after a prior one. Minimum interval: %v",
minReceiveRequestInterval)
}
}
r.lastReceivedRequests[peer] = time.Now()
return nil
}
for {
select {
case peerUpdate := <-r.peerUpdates.Updates():
r.processPeerUpdate(peerUpdate)
func (r *ReactorV2) markPeerResponse(peer p2p.NodeID) error {
r.mtx.Lock()
defer r.mtx.Unlock()
// check if a request to this peer was sent
if _, ok := r.requestsSent[peer]; !ok {
return fmt.Errorf("peer sent a PEX response when none was requested (%v)", peer)
}
delete(r.requestsSent, peer)
// attach to the back of the list so that the peer can be used again for
// future requests
r.availablePeers.PushBack(peer)
return nil
}
case <-r.closeCh:
r.Logger.Debug("stopped listening on peer updates channel; closing...")
return
// all of a peer's addresses must use the MConn protocol for the peer to be
// considered part of the legacy p2p pex system
func (r *ReactorV2) isLegacyPeer(peer p2p.NodeID) bool {
for _, addr := range r.peerManager.Addresses(peer) {
if addr.Protocol != p2p.MConnProtocol {
return false
}
}
return true
}

p2p/pex/reactor_test.go (+722 -0)

@@ -0,0 +1,722 @@
package pex_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/p2ptest"
"github.com/tendermint/tendermint/p2p/pex"
proto "github.com/tendermint/tendermint/proto/tendermint/p2p"
)
const (
checkFrequency = 500 * time.Millisecond
defaultBufferSize = 2
shortWait = 10 * time.Second
longWait = 60 * time.Second
firstNode = 0
secondNode = 1
thirdNode = 2
fourthNode = 3
)
func TestReactorBasic(t *testing.T) {
// start a network with one mock reactor and one "real" reactor
testNet := setup(t, testOptions{
MockNodes: 1,
TotalNodes: 2,
})
testNet.connectAll(t)
testNet.start(t)
// assert that the mock node receives a request from the real node
testNet.listenForRequest(t, secondNode, firstNode, shortWait)
// assert that when a mock node sends a request it receives a response (and
// the correct one)
testNet.sendRequest(t, firstNode, secondNode, true)
testNet.listenForResponse(t, secondNode, firstNode, shortWait, []proto.PexAddressV2(nil))
}
func TestReactorConnectFullNetwork(t *testing.T) {
testNet := setup(t, testOptions{
TotalNodes: 8,
})
// connect each node to only one other node (each actually ends up with two
// connections because connections are bidirectional)
testNet.connectN(t, 1)
testNet.start(t)
// assert that all nodes add each other in the network
for idx := 0; idx < len(testNet.nodes); idx++ {
testNet.requireNumberOfPeers(t, idx, len(testNet.nodes)-1, longWait)
}
}
func TestReactorSendsRequestsTooOften(t *testing.T) {
testNet := setup(t, testOptions{
MockNodes: 1,
TotalNodes: 3,
})
testNet.connectAll(t)
testNet.start(t)
// firstNode sends two requests to the secondNode
testNet.sendRequest(t, firstNode, secondNode, true)
testNet.sendRequest(t, firstNode, secondNode, true)
// assert that the secondNode evicts the first node (although they reconnect
// straight away again)
testNet.listenForPeerUpdate(t, secondNode, firstNode, p2p.PeerStatusDown, shortWait)
// firstNode should still receive the address of the thirdNode by the secondNode
expectedAddrs := testNet.getV2AddressesFor([]int{thirdNode})
testNet.listenForResponse(t, secondNode, firstNode, shortWait, expectedAddrs)
}
func TestReactorSendsResponseWithoutRequest(t *testing.T) {
testNet := setup(t, testOptions{
MockNodes: 1,
TotalNodes: 3,
})
testNet.connectAll(t)
testNet.start(t)
// firstNode sends the secondNode an unrequested response
// NOTE: secondNode will send a request by default during startup so we send
// two responses to counter that.
testNet.sendResponse(t, firstNode, secondNode, []int{thirdNode}, true)
testNet.sendResponse(t, firstNode, secondNode, []int{thirdNode}, true)
// secondNode should evict the firstNode
testNet.listenForPeerUpdate(t, secondNode, firstNode, p2p.PeerStatusDown, shortWait)
}
func TestReactorNeverSendsTooManyPeers(t *testing.T) {
testNet := setup(t, testOptions{
MockNodes: 1,
TotalNodes: 2,
})
testNet.connectAll(t)
testNet.start(t)
testNet.addNodes(t, 110)
nodes := make([]int, 110)
for i := 0; i < len(nodes); i++ {
nodes[i] = i + 2
}
testNet.addAddresses(t, secondNode, nodes)
// first we check that even though we have 110 peers, honest pex reactors
// only send 100 (test if secondNode sends firstNode 100 addresses)
testNet.pingAndlistenForNAddresses(t, secondNode, firstNode, shortWait, 100)
}
func TestReactorErrorsOnReceivingTooManyPeers(t *testing.T) {
testNet := setup(t, testOptions{
MockNodes: 1,
TotalNodes: 2,
})
testNet.connectAll(t)
testNet.start(t)
testNet.addNodes(t, 110)
nodes := make([]int, 110)
for i := 0; i < len(nodes); i++ {
nodes[i] = i + 2
}
// now we send a response with more than 100 peers
testNet.sendResponse(t, firstNode, secondNode, nodes, true)
// secondNode should evict the firstNode
testNet.listenForPeerUpdate(t, secondNode, firstNode, p2p.PeerStatusDown, shortWait)
}
func TestReactorSmallPeerStoreInALargeNetwork(t *testing.T) {
testNet := setup(t, testOptions{
TotalNodes: 16,
MaxPeers: 8,
MaxConnected: 6,
BufferSize: 8,
})
testNet.connectN(t, 1)
testNet.start(t)
// test that all nodes reach full capacity
for _, nodeID := range testNet.nodes {
require.Eventually(t, func() bool {
// nolint:scopelint
return testNet.network.Nodes[nodeID].PeerManager.PeerRatio() >= 0.9
}, longWait, checkFrequency)
}
}
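// Note: assuming PeerRatio is occupied slots over capacity, the 0.9
// threshold with MaxPeers = 8 effectively requires a full store, since
// 7/8 = 0.875 falls short and all 8 slots must hold an address.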
func TestReactorLargePeerStoreInASmallNetwork(t *testing.T) {
testNet := setup(t, testOptions{
TotalNodes: 10,
MaxPeers: 100,
MaxConnected: 100,
BufferSize: 10,
})
testNet.connectN(t, 1)
testNet.start(t)
// assert that all nodes add each other in the network
for idx := 0; idx < len(testNet.nodes); idx++ {
testNet.requireNumberOfPeers(t, idx, len(testNet.nodes)-1, longWait)
}
}
func TestReactorWithNetworkGrowth(t *testing.T) {
testNet := setup(t, testOptions{
TotalNodes: 5,
BufferSize: 5,
})
testNet.connectAll(t)
testNet.start(t)
// assert that all nodes add each other in the network
for idx := 0; idx < len(testNet.nodes); idx++ {
testNet.requireNumberOfPeers(t, idx, len(testNet.nodes)-1, shortWait)
}
// now we inject 10 more nodes
testNet.addNodes(t, 10)
for i := 5; i < testNet.total; i++ {
node := testNet.nodes[i]
require.NoError(t, testNet.reactors[node].Start())
require.True(t, testNet.reactors[node].IsRunning())
// we connect all new nodes to a single entry point and check that the
// node can distribute the addresses to all the others
testNet.connectPeers(t, 0, i)
}
require.Len(t, testNet.reactors, 15)
// assert that all nodes add each other in the network
for idx := 0; idx < len(testNet.nodes); idx++ {
testNet.requireNumberOfPeers(t, idx, len(testNet.nodes)-1, longWait)
}
}
func TestReactorIntegrationWithLegacyHandleRequest(t *testing.T) {
testNet := setup(t, testOptions{
MockNodes: 1,
TotalNodes: 3,
})
testNet.connectAll(t)
testNet.start(t)
t.Log(testNet.nodes)
// mock node sends a V1 Pex message to the second node
testNet.sendRequest(t, firstNode, secondNode, false)
addrs := testNet.getAddressesFor(t, []int{thirdNode})
testNet.listenForLegacyResponse(t, secondNode, firstNode, shortWait, addrs)
}
func TestReactorIntegrationWithLegacyHandleResponse(t *testing.T) {
testNet := setup(t, testOptions{
MockNodes: 1,
TotalNodes: 4,
BufferSize: 4,
})
testNet.connectPeers(t, firstNode, secondNode)
testNet.connectPeers(t, firstNode, thirdNode)
testNet.connectPeers(t, firstNode, fourthNode)
testNet.start(t)
testNet.listenForRequest(t, secondNode, firstNode, shortWait)
// send a v1 response instead
testNet.sendResponse(t, firstNode, secondNode, []int{thirdNode, fourthNode}, false)
testNet.requireNumberOfPeers(t, secondNode, len(testNet.nodes)-1, shortWait)
}
type reactorTestSuite struct {
network *p2ptest.Network
logger log.Logger
reactors map[p2p.NodeID]*pex.ReactorV2
pexChannels map[p2p.NodeID]*p2p.Channel
peerChans map[p2p.NodeID]chan p2p.PeerUpdate
peerUpdates map[p2p.NodeID]*p2p.PeerUpdates
nodes []p2p.NodeID
mocks []p2p.NodeID
total int
opts testOptions
}
type testOptions struct {
MockNodes int
TotalNodes int
BufferSize int
MaxPeers uint16
MaxConnected uint16
}
// setup creates a test suite with a network of nodes. Mock nodes are the
// hollow nodes that the test can listen and send on.
func setup(t *testing.T, opts testOptions) *reactorTestSuite {
t.Helper()
require.Greater(t, opts.TotalNodes, opts.MockNodes)
if opts.BufferSize == 0 {
opts.BufferSize = defaultBufferSize
}
networkOpts := p2ptest.NetworkOptions{
NumNodes: opts.TotalNodes,
BufferSize: opts.BufferSize,
NodeOpts: p2ptest.NodeOptions{
MaxPeers: opts.MaxPeers,
MaxConnected: opts.MaxConnected,
},
}
chBuf := opts.BufferSize
realNodes := opts.TotalNodes - opts.MockNodes
rts := &reactorTestSuite{
logger: log.TestingLogger().With("testCase", t.Name()),
network: p2ptest.MakeNetwork(t, networkOpts),
reactors: make(map[p2p.NodeID]*pex.ReactorV2, realNodes),
pexChannels: make(map[p2p.NodeID]*p2p.Channel, opts.TotalNodes),
peerChans: make(map[p2p.NodeID]chan p2p.PeerUpdate, opts.TotalNodes),
peerUpdates: make(map[p2p.NodeID]*p2p.PeerUpdates, opts.TotalNodes),
total: opts.TotalNodes,
opts: opts,
}
// NOTE: we don't assert that the channels get drained after stopping the
// reactor
rts.pexChannels = rts.network.MakeChannelsNoCleanup(
t, p2p.ChannelID(pex.PexChannel), new(proto.PexMessage), chBuf,
)
idx := 0
for nodeID := range rts.network.Nodes {
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate, chBuf)
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], chBuf)
rts.network.Nodes[nodeID].PeerManager.Register(rts.peerUpdates[nodeID])
// the first nodes in the array are always mock nodes
if idx < opts.MockNodes {
rts.mocks = append(rts.mocks, nodeID)
} else {
rts.reactors[nodeID] = pex.NewReactorV2(
rts.logger.With("nodeID", nodeID),
rts.network.Nodes[nodeID].PeerManager,
rts.pexChannels[nodeID],
rts.peerUpdates[nodeID],
)
}
rts.nodes = append(rts.nodes, nodeID)
idx++
}
require.Len(t, rts.reactors, realNodes)
t.Cleanup(func() {
for nodeID, reactor := range rts.reactors {
if reactor.IsRunning() {
require.NoError(t, reactor.Stop())
require.False(t, reactor.IsRunning())
}
rts.pexChannels[nodeID].Close()
rts.peerUpdates[nodeID].Close()
}
for _, nodeID := range rts.mocks {
rts.pexChannels[nodeID].Close()
rts.peerUpdates[nodeID].Close()
}
})
return rts
}
// starts up the pex reactors for each node
func (r *reactorTestSuite) start(t *testing.T) {
t.Helper()
for _, reactor := range r.reactors {
require.NoError(t, reactor.Start())
require.True(t, reactor.IsRunning())
}
}
func (r *reactorTestSuite) addNodes(t *testing.T, nodes int) {
t.Helper()
for i := 0; i < nodes; i++ {
node := r.network.MakeNode(t, p2ptest.NodeOptions{
MaxPeers: r.opts.MaxPeers,
MaxConnected: r.opts.MaxConnected,
})
r.network.Nodes[node.NodeID] = node
nodeID := node.NodeID
r.pexChannels[nodeID] = node.MakeChannelNoCleanup(
t, p2p.ChannelID(pex.PexChannel), new(proto.PexMessage), r.opts.BufferSize,
)
r.peerChans[nodeID] = make(chan p2p.PeerUpdate, r.opts.BufferSize)
r.peerUpdates[nodeID] = p2p.NewPeerUpdates(r.peerChans[nodeID], r.opts.BufferSize)
r.network.Nodes[nodeID].PeerManager.Register(r.peerUpdates[nodeID])
r.reactors[nodeID] = pex.NewReactorV2(
r.logger.With("nodeID", nodeID),
r.network.Nodes[nodeID].PeerManager,
r.pexChannels[nodeID],
r.peerUpdates[nodeID],
)
r.nodes = append(r.nodes, nodeID)
r.total++
}
}
func (r *reactorTestSuite) listenFor(
t *testing.T,
node p2p.NodeID,
conditional func(msg p2p.Envelope) bool,
assertion func(t *testing.T, msg p2p.Envelope) bool,
waitPeriod time.Duration,
) {
timesUp := time.After(waitPeriod)
for {
select {
case envelope := <-r.pexChannels[node].In:
if conditional(envelope) && assertion(t, envelope) {
return
}
case <-timesUp:
require.Fail(t, "timed out waiting for message",
"node=%v, waitPeriod=%s", node, waitPeriod)
}
}
}
func (r *reactorTestSuite) listenForRequest(t *testing.T, fromNode, toNode int, waitPeriod time.Duration) {
r.logger.Info("Listening for request", "from", fromNode, "to", toNode)
to, from := r.checkNodePair(t, toNode, fromNode)
conditional := func(msg p2p.Envelope) bool {
_, ok := msg.Message.(*proto.PexRequestV2)
return ok && msg.From == from
}
assertion := func(t *testing.T, msg p2p.Envelope) bool {
require.Equal(t, &proto.PexRequestV2{}, msg.Message)
return true
}
r.listenFor(t, to, conditional, assertion, waitPeriod)
}
func (r *reactorTestSuite) pingAndListenForNAddresses(
t *testing.T,
fromNode, toNode int,
waitPeriod time.Duration,
addresses int,
) {
r.logger.Info("Listening for addresses", "from", fromNode, "to", toNode)
to, from := r.checkNodePair(t, toNode, fromNode)
conditional := func(msg p2p.Envelope) bool {
_, ok := msg.Message.(*proto.PexResponseV2)
return ok && msg.From == from
}
assertion := func(t *testing.T, msg p2p.Envelope) bool {
m, ok := msg.Message.(*proto.PexResponseV2)
if !ok {
require.Fail(t, "expected pex response v2")
return true
}
// assert that we received the expected number of addresses
if len(m.Addresses) == addresses {
return true
}
// if we didn't get the right length, we wait and send the
// request again
time.Sleep(300 * time.Millisecond)
r.sendRequest(t, toNode, fromNode, true)
return false
}
r.sendRequest(t, toNode, fromNode, true)
r.listenFor(t, to, conditional, assertion, waitPeriod)
}
func (r *reactorTestSuite) listenForResponse(
t *testing.T,
fromNode, toNode int,
waitPeriod time.Duration,
addresses []proto.PexAddressV2,
) {
r.logger.Info("Listening for response", "from", fromNode, "to", toNode)
to, from := r.checkNodePair(t, toNode, fromNode)
conditional := func(msg p2p.Envelope) bool {
_, ok := msg.Message.(*proto.PexResponseV2)
r.logger.Info("message", msg, "ok", ok)
return ok && msg.From == from
}
assertion := func(t *testing.T, msg p2p.Envelope) bool {
require.Equal(t, &proto.PexResponseV2{Addresses: addresses}, msg.Message)
return true
}
r.listenFor(t, to, conditional, assertion, waitPeriod)
}
func (r *reactorTestSuite) listenForLegacyResponse(
t *testing.T,
fromNode, toNode int,
waitPeriod time.Duration,
addresses []proto.PexAddress,
) {
r.logger.Info("Listening for response", "from", fromNode, "to", toNode)
to, from := r.checkNodePair(t, toNode, fromNode)
conditional := func(msg p2p.Envelope) bool {
_, ok := msg.Message.(*proto.PexResponse)
return ok && msg.From == from
}
assertion := func(t *testing.T, msg p2p.Envelope) bool {
require.Equal(t, &proto.PexResponse{Addresses: addresses}, msg.Message)
return true
}
r.listenFor(t, to, conditional, assertion, waitPeriod)
}
func (r *reactorTestSuite) listenForPeerUpdate(
t *testing.T,
onNode, withNode int,
status p2p.PeerStatus,
waitPeriod time.Duration,
) {
on, with := r.checkNodePair(t, onNode, withNode)
sub := r.network.Nodes[on].PeerManager.Subscribe()
defer sub.Close()
timesUp := time.After(waitPeriod)
for {
select {
case peerUpdate := <-sub.Updates():
if peerUpdate.NodeID == with {
require.Equal(t, status, peerUpdate.Status)
return
}
case <-timesUp:
require.Fail(t, "timed out waiting for peer status", "%v with status %v",
with, status)
return
}
}
}
func (r *reactorTestSuite) getV2AddressesFor(nodes []int) []proto.PexAddressV2 {
addresses := make([]proto.PexAddressV2, len(nodes))
for idx, node := range nodes {
nodeID := r.nodes[node]
addresses[idx] = proto.PexAddressV2{
URL: r.network.Nodes[nodeID].NodeAddress.String(),
}
}
return addresses
}
func (r *reactorTestSuite) getAddressesFor(t *testing.T, nodes []int) []proto.PexAddress {
addresses := make([]proto.PexAddress, len(nodes))
for idx, node := range nodes {
nodeID := r.nodes[node]
nodeAddrs := r.network.Nodes[nodeID].NodeAddress
endpoints, err := nodeAddrs.Resolve(context.Background())
require.NoError(t, err)
require.Len(t, endpoints, 1)
addresses[idx] = proto.PexAddress{
ID: string(nodeAddrs.NodeID),
IP: endpoints[0].IP.String(),
Port: uint32(endpoints[0].Port),
}
}
return addresses
}
func (r *reactorTestSuite) sendRequest(t *testing.T, fromNode, toNode int, v2 bool) {
to, from := r.checkNodePair(t, toNode, fromNode)
if v2 {
r.pexChannels[from].Out <- p2p.Envelope{
To: to,
Message: &proto.PexRequestV2{},
}
} else {
r.pexChannels[from].Out <- p2p.Envelope{
To: to,
Message: &proto.PexRequest{},
}
}
}
func (r *reactorTestSuite) sendResponse(
t *testing.T,
fromNode, toNode int,
withNodes []int,
v2 bool,
) {
from, to := r.checkNodePair(t, fromNode, toNode)
if v2 {
addrs := r.getV2AddressesFor(withNodes)
r.pexChannels[from].Out <- p2p.Envelope{
To: to,
Message: &proto.PexResponseV2{
Addresses: addrs,
},
}
} else {
addrs := r.getAddressesFor(t, withNodes)
r.pexChannels[from].Out <- p2p.Envelope{
To: to,
Message: &proto.PexResponse{
Addresses: addrs,
},
}
}
}
func (r *reactorTestSuite) requireNumberOfPeers(
t *testing.T,
nodeIndex, numPeers int,
waitPeriod time.Duration,
) {
require.Eventuallyf(t, func() bool {
actualNumPeers := len(r.network.Nodes[r.nodes[nodeIndex]].PeerManager.Peers())
return actualNumPeers >= numPeers
}, waitPeriod, checkFrequency, "node failed to connect to the expected number of peers "+
"index=%d, node=%q, waitPeriod=%s expected=%d actual=%d",
nodeIndex, r.nodes[nodeIndex], waitPeriod, numPeers,
len(r.network.Nodes[r.nodes[nodeIndex]].PeerManager.Peers()),
)
}
func (r *reactorTestSuite) connectAll(t *testing.T) {
r.connectN(t, r.total-1)
}
// connects each node to n other nodes
func (r *reactorTestSuite) connectN(t *testing.T, n int) {
if n >= r.total {
require.Fail(t, "connectN: n must be less than the total number of nodes")
}
for i := 0; i < r.total; i++ {
for j := 0; j < n; j++ {
r.connectPeers(t, i, (i+j+1)%r.total)
}
}
}
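// For example, connectN(t, 1) on an 8-node network dials node 0 to
// node 1, node 1 to node 2, ..., node 7 back to node 0, forming a
// single ring; connections are bidirectional, so each node starts
// with two peers.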
// connects node1 to node2
func (r *reactorTestSuite) connectPeers(t *testing.T, sourceNode, targetNode int) {
t.Helper()
node1, node2 := r.checkNodePair(t, sourceNode, targetNode)
r.logger.Info("connecting peers", "sourceNode", sourceNode, "targetNode", targetNode)
n1 := r.network.Nodes[node1]
if n1 == nil {
require.Fail(t, "connectPeers: source node %v is not part of the testnet", node1)
return
}
n2 := r.network.Nodes[node2]
if n2 == nil {
require.Fail(t, "connectPeers: target node %v is not part of the testnet", node2)
return
}
sourceSub := n1.PeerManager.Subscribe()
defer sourceSub.Close()
targetSub := n2.PeerManager.Subscribe()
defer targetSub.Close()
sourceAddress := n1.NodeAddress
r.logger.Debug("source address", "address", sourceAddress)
targetAddress := n2.NodeAddress
r.logger.Debug("target address", "address", targetAddress)
added, err := n1.PeerManager.Add(targetAddress)
require.NoError(t, err)
if !added {
r.logger.Debug("nodes already know about one another",
"sourceNode", sourceNode, "targetNode", targetNode)
return
}
select {
case peerUpdate := <-targetSub.Updates():
require.Equal(t, p2p.PeerUpdate{
NodeID: node1,
Status: p2p.PeerStatusUp,
}, peerUpdate)
r.logger.Debug("target connected with source")
case <-time.After(time.Second):
require.Fail(t, "timed out waiting for peer", "%v accepting %v",
targetNode, sourceNode)
}
select {
case peerUpdate := <-sourceSub.Updates():
require.Equal(t, p2p.PeerUpdate{
NodeID: node2,
Status: p2p.PeerStatusUp,
}, peerUpdate)
r.logger.Debug("source connected with target")
case <-time.After(time.Second):
require.Fail(t, "timed out waiting for peer", "%v dialing %v",
sourceNode, targetNode)
}
added, err = n2.PeerManager.Add(sourceAddress)
require.NoError(t, err)
require.True(t, added)
}
// nolint: unused
func (r *reactorTestSuite) pexAddresses(t *testing.T, nodeIndices []int) []proto.PexAddress {
var addresses []proto.PexAddress
for _, i := range nodeIndices {
if i >= len(r.nodes) {
require.Fail(t, "index for pex address is out of range")
}
nodeAddrs := r.network.Nodes[r.nodes[i]].NodeAddress
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
endpoints, err := nodeAddrs.Resolve(ctx)
cancel()
require.NoError(t, err)
for _, endpoint := range endpoints {
if endpoint.IP != nil {
addresses = append(addresses, proto.PexAddress{
ID: string(nodeAddrs.NodeID),
IP: endpoint.IP.String(),
Port: uint32(endpoint.Port),
})
}
}
}
return addresses
}
func (r *reactorTestSuite) checkNodePair(t *testing.T, first, second int) (p2p.NodeID, p2p.NodeID) {
require.NotEqual(t, first, second)
require.Less(t, first, r.total)
require.Less(t, second, r.total)
return r.nodes[first], r.nodes[second]
}
func (r *reactorTestSuite) addAddresses(t *testing.T, node int, addrs []int) {
peerManager := r.network.Nodes[r.nodes[node]].PeerManager
for _, addr := range addrs {
require.Less(t, addr, r.total)
address := r.network.Nodes[r.nodes[addr]].NodeAddress
added, err := peerManager.Add(address)
require.NoError(t, err)
require.True(t, added)
}
}

+ 14
- 4
p2p/router_test.go View File

@ -573,7 +573,9 @@ func TestRouter_DialPeers(t *testing.T) {
require.NoError(t, err)
defer peerManager.Close()
require.NoError(t, peerManager.Add(address))
added, err := peerManager.Add(address)
require.NoError(t, err)
require.True(t, added)
sub := peerManager.Subscribe()
defer sub.Close()
@ -648,9 +650,17 @@ func TestRouter_DialPeers_Parallel(t *testing.T) {
require.NoError(t, err)
defer peerManager.Close()
require.NoError(t, peerManager.Add(a))
require.NoError(t, peerManager.Add(b))
require.NoError(t, peerManager.Add(c))
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
router, err := p2p.NewRouter(
log.TestingLogger(),


+ 9
- 0
p2p/transport_memory.go View File

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"net"
"sync"
"github.com/tendermint/tendermint/crypto"
@ -130,6 +131,10 @@ func (t *MemoryTransport) Endpoints() []Endpoint {
return []Endpoint{{
Protocol: MemoryProtocol,
Path: string(t.nodeID),
// An arbitrary IP and port is used so that the pex reactor is
// able to exchange peer addresses between nodes.
IP: net.IPv4zero,
Port: 0,
}}
}
}
@ -153,6 +158,10 @@ func (t *MemoryTransport) Dial(ctx context.Context, endpoint Endpoint) (Connecti
if endpoint.Path == "" {
return nil, errors.New("no path")
}
if err := endpoint.Validate(); err != nil {
return nil, err
}
nodeID, err := NewNodeID(endpoint.Path)
if err != nil {
return nil, err


+ 2
- 2
p2p/transport_test.go View File

@ -113,7 +113,7 @@ func TestTransport_DialEndpoints(t *testing.T) {
require.Error(t, err)
// Tests for networked endpoints (with IP).
if len(endpoint.IP) > 0 {
if len(endpoint.IP) > 0 && endpoint.Protocol != p2p.MemoryProtocol {
for _, tc := range ipTestCases {
tc := tc
t.Run(tc.ip.String(), func(t *testing.T) {
@ -124,7 +124,7 @@ func TestTransport_DialEndpoints(t *testing.T) {
require.NoError(t, conn.Close())
require.NoError(t, err)
} else {
require.Error(t, err)
require.Error(t, err, "endpoint=%s", e)
}
})
}


+ 10
- 2
proto/tendermint/p2p/pex.go View File

@ -13,8 +13,12 @@ func (m *PexMessage) Wrap(pb proto.Message) error {
m.Sum = &PexMessage_PexRequest{PexRequest: msg}
case *PexResponse:
m.Sum = &PexMessage_PexResponse{PexResponse: msg}
case *PexRequestV2:
m.Sum = &PexMessage_PexRequestV2{PexRequestV2: msg}
case *PexResponseV2:
m.Sum = &PexMessage_PexResponseV2{PexResponseV2: msg}
default:
return fmt.Errorf("unknown message: %T", msg)
return fmt.Errorf("unknown pex message: %T", msg)
}
return nil
}
@ -27,7 +31,11 @@ func (m *PexMessage) Unwrap() (proto.Message, error) {
return msg.PexRequest, nil
case *PexMessage_PexResponse:
return msg.PexResponse, nil
case *PexMessage_PexRequestV2:
return msg.PexRequestV2, nil
case *PexMessage_PexResponseV2:
return msg.PexResponseV2, nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
return nil, fmt.Errorf("unknown pex message: %T", msg)
}
}
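For reference, a minimal standalone sketch of round-tripping the new V2 types through this oneof envelope (the import alias mirrors the test suite above):

package main

import (
	"fmt"

	proto "github.com/tendermint/tendermint/proto/tendermint/p2p"
)

func main() {
	// Wrap a V2 request into the oneof envelope for sending.
	var msg proto.PexMessage
	if err := msg.Wrap(&proto.PexRequestV2{}); err != nil {
		panic(err)
	}

	// Unwrap recovers the concrete message for dispatch on receipt.
	inner, err := msg.Unwrap()
	if err != nil {
		panic(err)
	}
	switch inner.(type) {
	case *proto.PexRequestV2:
		fmt.Println("got a v2 pex request")
	case *proto.PexResponseV2:
		fmt.Println("got a v2 pex response")
	}
}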

+ 670
- 32
proto/tendermint/p2p/pex.pb.go View File

@ -163,10 +163,136 @@ func (m *PexResponse) GetAddresses() []PexAddress {
return nil
}
type PexAddressV2 struct {
URL string `protobuf:"bytes,1,opt,name=url,proto3" json:"url,omitempty"`
}
func (m *PexAddressV2) Reset() { *m = PexAddressV2{} }
func (m *PexAddressV2) String() string { return proto.CompactTextString(m) }
func (*PexAddressV2) ProtoMessage() {}
func (*PexAddressV2) Descriptor() ([]byte, []int) {
return fileDescriptor_81c2f011fd13be57, []int{3}
}
func (m *PexAddressV2) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *PexAddressV2) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_PexAddressV2.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *PexAddressV2) XXX_Merge(src proto.Message) {
xxx_messageInfo_PexAddressV2.Merge(m, src)
}
func (m *PexAddressV2) XXX_Size() int {
return m.Size()
}
func (m *PexAddressV2) XXX_DiscardUnknown() {
xxx_messageInfo_PexAddressV2.DiscardUnknown(m)
}
var xxx_messageInfo_PexAddressV2 proto.InternalMessageInfo
func (m *PexAddressV2) GetURL() string {
if m != nil {
return m.URL
}
return ""
}
type PexRequestV2 struct {
}
func (m *PexRequestV2) Reset() { *m = PexRequestV2{} }
func (m *PexRequestV2) String() string { return proto.CompactTextString(m) }
func (*PexRequestV2) ProtoMessage() {}
func (*PexRequestV2) Descriptor() ([]byte, []int) {
return fileDescriptor_81c2f011fd13be57, []int{4}
}
func (m *PexRequestV2) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *PexRequestV2) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_PexRequestV2.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *PexRequestV2) XXX_Merge(src proto.Message) {
xxx_messageInfo_PexRequestV2.Merge(m, src)
}
func (m *PexRequestV2) XXX_Size() int {
return m.Size()
}
func (m *PexRequestV2) XXX_DiscardUnknown() {
xxx_messageInfo_PexRequestV2.DiscardUnknown(m)
}
var xxx_messageInfo_PexRequestV2 proto.InternalMessageInfo
type PexResponseV2 struct {
Addresses []PexAddressV2 `protobuf:"bytes,1,rep,name=addresses,proto3" json:"addresses"`
}
func (m *PexResponseV2) Reset() { *m = PexResponseV2{} }
func (m *PexResponseV2) String() string { return proto.CompactTextString(m) }
func (*PexResponseV2) ProtoMessage() {}
func (*PexResponseV2) Descriptor() ([]byte, []int) {
return fileDescriptor_81c2f011fd13be57, []int{5}
}
func (m *PexResponseV2) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *PexResponseV2) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_PexResponseV2.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *PexResponseV2) XXX_Merge(src proto.Message) {
xxx_messageInfo_PexResponseV2.Merge(m, src)
}
func (m *PexResponseV2) XXX_Size() int {
return m.Size()
}
func (m *PexResponseV2) XXX_DiscardUnknown() {
xxx_messageInfo_PexResponseV2.DiscardUnknown(m)
}
var xxx_messageInfo_PexResponseV2 proto.InternalMessageInfo
func (m *PexResponseV2) GetAddresses() []PexAddressV2 {
if m != nil {
return m.Addresses
}
return nil
}
type PexMessage struct {
// Types that are valid to be assigned to Sum:
// *PexMessage_PexRequest
// *PexMessage_PexResponse
// *PexMessage_PexRequestV2
// *PexMessage_PexResponseV2
Sum isPexMessage_Sum `protobuf_oneof:"sum"`
}
@ -174,7 +300,7 @@ func (m *PexMessage) Reset() { *m = PexMessage{} }
func (m *PexMessage) String() string { return proto.CompactTextString(m) }
func (*PexMessage) ProtoMessage() {}
func (*PexMessage) Descriptor() ([]byte, []int) {
return fileDescriptor_81c2f011fd13be57, []int{3}
return fileDescriptor_81c2f011fd13be57, []int{6}
}
func (m *PexMessage) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -215,9 +341,17 @@ type PexMessage_PexRequest struct {
type PexMessage_PexResponse struct {
PexResponse *PexResponse `protobuf:"bytes,2,opt,name=pex_response,json=pexResponse,proto3,oneof" json:"pex_response,omitempty"`
}
type PexMessage_PexRequestV2 struct {
PexRequestV2 *PexRequestV2 `protobuf:"bytes,3,opt,name=pex_request_v2,json=pexRequestV2,proto3,oneof" json:"pex_request_v2,omitempty"`
}
type PexMessage_PexResponseV2 struct {
PexResponseV2 *PexResponseV2 `protobuf:"bytes,4,opt,name=pex_response_v2,json=pexResponseV2,proto3,oneof" json:"pex_response_v2,omitempty"`
}
func (*PexMessage_PexRequest) isPexMessage_Sum() {}
func (*PexMessage_PexResponse) isPexMessage_Sum() {}
func (*PexMessage_PexRequest) isPexMessage_Sum() {}
func (*PexMessage_PexResponse) isPexMessage_Sum() {}
func (*PexMessage_PexRequestV2) isPexMessage_Sum() {}
func (*PexMessage_PexResponseV2) isPexMessage_Sum() {}
func (m *PexMessage) GetSum() isPexMessage_Sum {
if m != nil {
@ -240,11 +374,27 @@ func (m *PexMessage) GetPexResponse() *PexResponse {
return nil
}
func (m *PexMessage) GetPexRequestV2() *PexRequestV2 {
if x, ok := m.GetSum().(*PexMessage_PexRequestV2); ok {
return x.PexRequestV2
}
return nil
}
func (m *PexMessage) GetPexResponseV2() *PexResponseV2 {
if x, ok := m.GetSum().(*PexMessage_PexResponseV2); ok {
return x.PexResponseV2
}
return nil
}
// XXX_OneofWrappers is for the internal use of the proto package.
func (*PexMessage) XXX_OneofWrappers() []interface{} {
return []interface{}{
(*PexMessage_PexRequest)(nil),
(*PexMessage_PexResponse)(nil),
(*PexMessage_PexRequestV2)(nil),
(*PexMessage_PexResponseV2)(nil),
}
}
@ -252,33 +402,42 @@ func init() {
proto.RegisterType((*PexAddress)(nil), "tendermint.p2p.PexAddress")
proto.RegisterType((*PexRequest)(nil), "tendermint.p2p.PexRequest")
proto.RegisterType((*PexResponse)(nil), "tendermint.p2p.PexResponse")
proto.RegisterType((*PexAddressV2)(nil), "tendermint.p2p.PexAddressV2")
proto.RegisterType((*PexRequestV2)(nil), "tendermint.p2p.PexRequestV2")
proto.RegisterType((*PexResponseV2)(nil), "tendermint.p2p.PexResponseV2")
proto.RegisterType((*PexMessage)(nil), "tendermint.p2p.PexMessage")
}
func init() { proto.RegisterFile("tendermint/p2p/pex.proto", fileDescriptor_81c2f011fd13be57) }
var fileDescriptor_81c2f011fd13be57 = []byte{
// 310 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x51, 0x31, 0x4b, 0xc3, 0x40,
0x18, 0xbd, 0x4b, 0x6b, 0xa1, 0x97, 0xea, 0x70, 0x88, 0x84, 0x0a, 0xd7, 0x92, 0xa9, 0x53, 0x02,
0x11, 0x47, 0x45, 0x83, 0x43, 0x1d, 0x8a, 0xe5, 0x46, 0x17, 0x69, 0xcd, 0x47, 0xcc, 0xd0, 0xde,
0x67, 0xee, 0x0a, 0xfd, 0x19, 0x0e, 0xfe, 0xa8, 0x8e, 0x1d, 0x9d, 0x8a, 0xa4, 0x7f, 0x44, 0xbc,
0x13, 0x93, 0x42, 0xb7, 0x7b, 0xef, 0xfb, 0xde, 0xfb, 0xde, 0xf1, 0x58, 0x60, 0x60, 0x99, 0x41,
0xb9, 0x28, 0x96, 0x26, 0xc6, 0x04, 0x63, 0x84, 0x75, 0x84, 0xa5, 0x32, 0x8a, 0x9f, 0xd5, 0x93,
0x08, 0x13, 0xec, 0x9f, 0xe7, 0x2a, 0x57, 0x76, 0x14, 0xff, 0xbe, 0xdc, 0x56, 0x38, 0x65, 0x6c,
0x0a, 0xeb, 0xfb, 0x2c, 0x2b, 0x41, 0x6b, 0x7e, 0xc1, 0xbc, 0x22, 0x0b, 0xe8, 0x90, 0x8e, 0xba,
0x69, 0xa7, 0xda, 0x0d, 0xbc, 0xc7, 0x07, 0xe9, 0x15, 0x99, 0xe5, 0x31, 0xf0, 0x1a, 0xfc, 0x54,
0x7a, 0x05, 0x72, 0xce, 0xda, 0xa8, 0x4a, 0x13, 0xb4, 0x86, 0x74, 0x74, 0x2a, 0xed, 0x3b, 0xec,
0x59, 0x47, 0x09, 0xef, 0x2b, 0xd0, 0x26, 0x9c, 0x30, 0xdf, 0x22, 0x8d, 0x6a, 0xa9, 0x81, 0xdf,
0xb2, 0xee, 0xcc, 0xdd, 0x02, 0x1d, 0xd0, 0x61, 0x6b, 0xe4, 0x27, 0xfd, 0xe8, 0x30, 0x68, 0x54,
0xe7, 0x49, 0xdb, 0x9b, 0xdd, 0x80, 0xc8, 0x5a, 0x12, 0x7e, 0x52, 0xeb, 0x3e, 0x01, 0xad, 0x67,
0x39, 0xf0, 0x1b, 0xe6, 0x23, 0xac, 0x5f, 0x4a, 0x77, 0xcc, 0x06, 0x3f, 0x6e, 0xf8, 0x17, 0x67,
0x4c, 0x24, 0xc3, 0x7f, 0xc4, 0xef, 0x58, 0xcf, 0xc9, 0x5d, 0x3a, 0xfb, 0x41, 0x3f, 0xb9, 0x3c,
0xaa, 0x77, 0x2b, 0x63, 0x22, 0x7d, 0xac, 0x61, 0x7a, 0xc2, 0x5a, 0x7a, 0xb5, 0x48, 0x9f, 0x36,
0x95, 0xa0, 0xdb, 0x4a, 0xd0, 0xef, 0x4a, 0xd0, 0x8f, 0xbd, 0x20, 0xdb, 0xbd, 0x20, 0x5f, 0x7b,
0x41, 0x9e, 0xaf, 0xf3, 0xc2, 0xbc, 0xad, 0xe6, 0xd1, 0xab, 0x5a, 0xc4, 0x8d, 0xaa, 0x9a, 0xad,
0xd9, 0x4a, 0x0e, 0x6b, 0x9c, 0x77, 0x2c, 0x7b, 0xf5, 0x13, 0x00, 0x00, 0xff, 0xff, 0xa6, 0xa1,
0x59, 0x3c, 0xdf, 0x01, 0x00, 0x00,
// 407 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x92, 0xdd, 0x8a, 0xda, 0x40,
0x14, 0xc7, 0xf3, 0x61, 0x2d, 0x9e, 0x44, 0x0b, 0x43, 0x29, 0xa9, 0x6d, 0xa3, 0xe4, 0xca, 0xde,
0x24, 0x30, 0xa5, 0x97, 0x2d, 0x36, 0x08, 0xb5, 0x50, 0xa9, 0x1d, 0xd8, 0x5c, 0xec, 0x8d, 0xe8,
0x66, 0xc8, 0x06, 0x56, 0x33, 0x9b, 0x49, 0x16, 0x1f, 0x63, 0xdf, 0x61, 0x5f, 0xc6, 0x4b, 0x2f,
0xf7, 0x4a, 0x96, 0xf8, 0x22, 0x8b, 0x13, 0x31, 0x23, 0xba, 0x7b, 0x37, 0xe7, 0x7f, 0xbe, 0x7e,
0xe7, 0xcc, 0x01, 0x2b, 0xa3, 0x8b, 0x90, 0xa6, 0xf3, 0x78, 0x91, 0x79, 0x0c, 0x33, 0x8f, 0xd1,
0xa5, 0xcb, 0xd2, 0x24, 0x4b, 0x50, 0xab, 0xf2, 0xb8, 0x0c, 0xb3, 0xf6, 0xfb, 0x28, 0x89, 0x12,
0xe1, 0xf2, 0x76, 0xaf, 0x32, 0xca, 0x19, 0x03, 0x8c, 0xe9, 0xf2, 0x57, 0x18, 0xa6, 0x94, 0x73,
0xf4, 0x01, 0xb4, 0x38, 0xb4, 0xd4, 0xae, 0xda, 0x6b, 0xf8, 0xf5, 0x62, 0xd3, 0xd1, 0xfe, 0x0c,
0x88, 0x16, 0x87, 0x42, 0x67, 0x96, 0x26, 0xe9, 0x63, 0xa2, 0xc5, 0x0c, 0x21, 0xa8, 0xb1, 0x24,
0xcd, 0x2c, 0xbd, 0xab, 0xf6, 0x9a, 0x44, 0xbc, 0x1d, 0x53, 0x54, 0x24, 0xf4, 0x36, 0xa7, 0x3c,
0x73, 0x46, 0x60, 0x08, 0x8b, 0xb3, 0x64, 0xc1, 0x29, 0xfa, 0x09, 0x8d, 0x69, 0xd9, 0x8b, 0x72,
0x4b, 0xed, 0xea, 0x3d, 0x03, 0xb7, 0xdd, 0x63, 0x50, 0xb7, 0xe2, 0xf1, 0x6b, 0xab, 0x4d, 0x47,
0x21, 0x55, 0x8a, 0xf3, 0x15, 0xcc, 0xca, 0x1d, 0x60, 0xf4, 0x11, 0xf4, 0x3c, 0xbd, 0xd9, 0x13,
0xbf, 0x2d, 0x36, 0x1d, 0xfd, 0x82, 0xfc, 0x25, 0x3b, 0xcd, 0x69, 0x89, 0xd0, 0x3d, 0x47, 0x80,
0x9d, 0xff, 0xd0, 0x94, 0x48, 0x02, 0x8c, 0xfa, 0xa7, 0x2c, 0x9f, 0x5f, 0x66, 0x09, 0xf0, 0x29,
0xcd, 0x83, 0x26, 0x66, 0x1d, 0x51, 0xce, 0xa7, 0x11, 0x45, 0x3f, 0xc0, 0x60, 0x74, 0x39, 0x49,
0xcb, 0x96, 0x02, 0xea, 0xfc, 0x78, 0x7b, 0xa8, 0xa1, 0x42, 0x80, 0x1d, 0x2c, 0xd4, 0x07, 0xb3,
0x4c, 0x2f, 0x09, 0xc5, 0xba, 0x0d, 0xfc, 0xe9, 0x6c, 0x7e, 0x19, 0x32, 0x54, 0x88, 0xc1, 0xa4,
0xed, 0x0e, 0xa0, 0x25, 0x01, 0x4c, 0xee, 0xb0, 0xf8, 0x98, 0xf3, 0x63, 0x1d, 0x16, 0x33, 0x54,
0x88, 0xc9, 0x24, 0x1b, 0xfd, 0x86, 0x77, 0x32, 0xc7, 0xae, 0x4c, 0x4d, 0x94, 0xf9, 0xf2, 0x0a,
0x8a, 0xa8, 0xd3, 0x64, 0xb2, 0xe0, 0xbf, 0x01, 0x9d, 0xe7, 0x73, 0xff, 0xdf, 0xaa, 0xb0, 0xd5,
0x75, 0x61, 0xab, 0x4f, 0x85, 0xad, 0xde, 0x6f, 0x6d, 0x65, 0xbd, 0xb5, 0x95, 0xc7, 0xad, 0xad,
0x5c, 0x7e, 0x8f, 0xe2, 0xec, 0x3a, 0x9f, 0xb9, 0x57, 0xc9, 0xdc, 0x93, 0xee, 0x58, 0x3e, 0x69,
0x71, 0xaf, 0xc7, 0x37, 0x3e, 0xab, 0x0b, 0xf5, 0xdb, 0x73, 0x00, 0x00, 0x00, 0xff, 0xff, 0x9f,
0x9b, 0xfd, 0x75, 0xfc, 0x02, 0x00, 0x00,
}
func (m *PexAddress) Marshal() (dAtA []byte, err error) {
@ -383,6 +542,96 @@ func (m *PexResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
return len(dAtA) - i, nil
}
func (m *PexAddressV2) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *PexAddressV2) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *PexAddressV2) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.URL) > 0 {
i -= len(m.URL)
copy(dAtA[i:], m.URL)
i = encodeVarintPex(dAtA, i, uint64(len(m.URL)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *PexRequestV2) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *PexRequestV2) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *PexRequestV2) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
return len(dAtA) - i, nil
}
func (m *PexResponseV2) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *PexResponseV2) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *PexResponseV2) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Addresses) > 0 {
for iNdEx := len(m.Addresses) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Addresses[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintPex(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func (m *PexMessage) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -457,6 +706,48 @@ func (m *PexMessage_PexResponse) MarshalToSizedBuffer(dAtA []byte) (int, error)
}
return len(dAtA) - i, nil
}
func (m *PexMessage_PexRequestV2) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *PexMessage_PexRequestV2) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
if m.PexRequestV2 != nil {
{
size, err := m.PexRequestV2.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintPex(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x1a
}
return len(dAtA) - i, nil
}
func (m *PexMessage_PexResponseV2) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *PexMessage_PexResponseV2) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
if m.PexResponseV2 != nil {
{
size, err := m.PexResponseV2.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintPex(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x22
}
return len(dAtA) - i, nil
}
func encodeVarintPex(dAtA []byte, offset int, v uint64) int {
offset -= sovPex(v)
base := offset
@ -512,6 +803,43 @@ func (m *PexResponse) Size() (n int) {
return n
}
func (m *PexAddressV2) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.URL)
if l > 0 {
n += 1 + l + sovPex(uint64(l))
}
return n
}
func (m *PexRequestV2) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
return n
}
func (m *PexResponseV2) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Addresses) > 0 {
for _, e := range m.Addresses {
l = e.Size()
n += 1 + l + sovPex(uint64(l))
}
}
return n
}
func (m *PexMessage) Size() (n int) {
if m == nil {
return 0
@ -548,6 +876,30 @@ func (m *PexMessage_PexResponse) Size() (n int) {
}
return n
}
func (m *PexMessage_PexRequestV2) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.PexRequestV2 != nil {
l = m.PexRequestV2.Size()
n += 1 + l + sovPex(uint64(l))
}
return n
}
func (m *PexMessage_PexResponseV2) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.PexResponseV2 != nil {
l = m.PexResponseV2.Size()
n += 1 + l + sovPex(uint64(l))
}
return n
}
func sovPex(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
@ -822,7 +1174,7 @@ func (m *PexResponse) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *PexMessage) Unmarshal(dAtA []byte) error {
func (m *PexAddressV2) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
@ -845,17 +1197,17 @@ func (m *PexMessage) Unmarshal(dAtA []byte) error {
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PexMessage: wiretype end group for non-group")
return fmt.Errorf("proto: PexAddressV2: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: PexMessage: illegal tag %d (wire type %d)", fieldNum, wire)
return fmt.Errorf("proto: PexAddressV2: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field PexRequest", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field URL", wireType)
}
var msglen int
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPex
@ -865,13 +1217,229 @@ func (m *PexMessage) Unmarshal(dAtA []byte) error {
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthPex
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthPex
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthPex
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.URL = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipPex(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthPex
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *PexRequestV2) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPex
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PexRequestV2: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: PexRequestV2: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
default:
iNdEx = preIndex
skippy, err := skipPex(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthPex
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *PexResponseV2) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPex
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PexResponseV2: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: PexResponseV2: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Addresses", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPex
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthPex
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthPex
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Addresses = append(m.Addresses, PexAddressV2{})
if err := m.Addresses[len(m.Addresses)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipPex(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthPex
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *PexMessage) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPex
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PexMessage: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: PexMessage: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field PexRequest", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPex
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthPex
}
postIndex := iNdEx + msglen
if postIndex < 0 {
@ -921,6 +1489,76 @@ func (m *PexMessage) Unmarshal(dAtA []byte) error {
}
m.Sum = &PexMessage_PexResponse{v}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field PexRequestV2", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPex
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthPex
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthPex
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
v := &PexRequestV2{}
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
m.Sum = &PexMessage_PexRequestV2{v}
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field PexResponseV2", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPex
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthPex
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthPex
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
v := &PexResponseV2{}
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
m.Sum = &PexMessage_PexResponseV2{v}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipPex(dAtA[iNdEx:])


+ 15
- 2
proto/tendermint/p2p/pex.proto View File

@ -17,9 +17,22 @@ message PexResponse {
repeated PexAddress addresses = 1 [(gogoproto.nullable) = false];
}
message PexAddressV2 {
string url = 1 [(gogoproto.customname) = "URL"];
}
message PexRequestV2 {}
message PexResponseV2 {
repeated PexAddressV2 addresses = 1 [(gogoproto.nullable) = false];
}
message PexMessage {
oneof sum {
PexRequest pex_request = 1;
PexResponse pex_response = 2;
PexRequest pex_request = 1;
PexResponse pex_response = 2;
PexRequestV2 pex_request_v2 = 3;
PexResponseV2 pex_response_v2 = 4;
}
}
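To make the new encoding concrete, here is a hypothetical pair of equivalent addresses in the two formats; the URL scheme shown is an assumption about how NodeAddress.String() renders a TCP peer, not taken from this diff:

package sketch // hypothetical illustration

import proto "github.com/tendermint/tendermint/proto/tendermint/p2p"

// The legacy V1 address splits a peer into its parts...
var v1 = proto.PexAddress{
	ID:   "f9baeaa15fedf5e1ef7448dd60f46c01f1a9e9c4",
	IP:   "10.0.0.1",
	Port: 26656,
}

// ...while V2 carries the whole node URL in a single string.
var v2 = proto.PexAddressV2{
	URL: "mconn://f9baeaa15fedf5e1ef7448dd60f46c01f1a9e9c4@10.0.0.1:26656",
}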
