Browse Source

p2p/pex: minor cleanup and comments

pull/1519/head
Ethan Buchman 7 years ago
parent
commit
c23909eecf
3 changed files with 65 additions and 37 deletions
  1. +28
    -19
      p2p/pex/addrbook.go
  2. +35
    -18
      p2p/pex/pex_reactor.go
  3. +2
    -0
      p2p/switch.go

+ 28
- 19
p2p/pex/addrbook.go View File

@ -7,7 +7,6 @@ package pex
import ( import (
"crypto/sha256" "crypto/sha256"
"encoding/binary" "encoding/binary"
"fmt"
"math" "math"
"net" "net"
"sync" "sync"
@ -169,7 +168,9 @@ func (a *addrBook) OurAddress(addr *p2p.NetAddress) bool {
return ok return ok
} }
// AddAddress implements AddrBook - adds the given address as received from the given source.
// AddAddress implements AddrBook
// Add address to a "new" bucket. If it's already in one, only add it probabilistically.
// Returns error if the addr is non-routable. Does not add self.
// NOTE: addr must not be nil // NOTE: addr must not be nil
func (a *addrBook) AddAddress(addr *p2p.NetAddress, src *p2p.NetAddress) error { func (a *addrBook) AddAddress(addr *p2p.NetAddress, src *p2p.NetAddress) error {
a.mtx.Lock() a.mtx.Lock()
@ -298,25 +299,27 @@ func (a *addrBook) GetSelection() []*p2p.NetAddress {
a.mtx.Lock() a.mtx.Lock()
defer a.mtx.Unlock() defer a.mtx.Unlock()
if a.size() == 0 {
bookSize := a.size()
if bookSize == 0 {
return nil return nil
} }
allAddr := make([]*p2p.NetAddress, a.size())
numAddresses := cmn.MaxInt(
cmn.MinInt(minGetSelection, bookSize),
bookSize*getSelectionPercent/100)
numAddresses = cmn.MinInt(maxGetSelection, numAddresses)
// XXX: instead of making a list of all addresses, shuffling, and slicing a random chunk,
// could we just select a random numAddresses of indexes?
allAddr := make([]*p2p.NetAddress, bookSize)
i := 0 i := 0
for _, ka := range a.addrLookup { for _, ka := range a.addrLookup {
allAddr[i] = ka.Addr allAddr[i] = ka.Addr
i++ i++
} }
numAddresses := cmn.MaxInt(
cmn.MinInt(minGetSelection, len(allAddr)),
len(allAddr)*getSelectionPercent/100)
numAddresses = cmn.MinInt(maxGetSelection, numAddresses)
// Fisher-Yates shuffle the array. We only need to do the first // Fisher-Yates shuffle the array. We only need to do the first
// `numAddresses' since we are throwing the rest. // `numAddresses' since we are throwing the rest.
// XXX: What's the point of this if we already loop randomly through addrLookup ?
for i := 0; i < numAddresses; i++ { for i := 0; i < numAddresses; i++ {
// pick a number between current index and the end // pick a number between current index and the end
j := cmn.RandIntn(len(allAddr)-i) + i j := cmn.RandIntn(len(allAddr)-i) + i
@ -338,7 +341,8 @@ func (a *addrBook) GetSelectionWithBias(biasTowardsNewAddrs int) []*p2p.NetAddre
a.mtx.Lock() a.mtx.Lock()
defer a.mtx.Unlock() defer a.mtx.Unlock()
if a.size() == 0 {
bookSize := a.size()
if bookSize == 0 {
return nil return nil
} }
@ -350,8 +354,8 @@ func (a *addrBook) GetSelectionWithBias(biasTowardsNewAddrs int) []*p2p.NetAddre
} }
numAddresses := cmn.MaxInt( numAddresses := cmn.MaxInt(
cmn.MinInt(minGetSelection, a.size()),
a.size()*getSelectionPercent/100)
cmn.MinInt(minGetSelection, bookSize),
bookSize*getSelectionPercent/100)
numAddresses = cmn.MinInt(maxGetSelection, numAddresses) numAddresses = cmn.MinInt(maxGetSelection, numAddresses)
selection := make([]*p2p.NetAddress, numAddresses) selection := make([]*p2p.NetAddress, numAddresses)
@ -605,12 +609,19 @@ func (a *addrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress {
// adds the address to a "new" bucket. if its already in one, // adds the address to a "new" bucket. if its already in one,
// it only adds it probabilistically // it only adds it probabilistically
func (a *addrBook) addAddress(addr, src *p2p.NetAddress) error { func (a *addrBook) addAddress(addr, src *p2p.NetAddress) error {
if addr == nil {
return ErrAddrBookNilAddr{addr, src}
}
if src == nil {
return ErrAddrBookNilAddr{addr, src}
}
if a.routabilityStrict && !addr.Routable() { if a.routabilityStrict && !addr.Routable() {
return fmt.Errorf("Cannot add non-routable address %v", addr)
return ErrAddrBookNonRoutable{addr}
} }
// TODO: we should track ourAddrs by ID and by IP:PORT and refuse both.
if _, ok := a.ourAddrs[addr.String()]; ok { if _, ok := a.ourAddrs[addr.String()]; ok {
// Ignore our own listener address.
return fmt.Errorf("Cannot add ourselves with address %v", addr)
return ErrAddrBookSelf
} }
ka := a.addrLookup[addr.ID] ka := a.addrLookup[addr.ID]
@ -636,10 +647,8 @@ func (a *addrBook) addAddress(addr, src *p2p.NetAddress) error {
bucket := a.calcNewBucket(addr, src) bucket := a.calcNewBucket(addr, src)
added := a.addToNewBucket(ka, bucket) added := a.addToNewBucket(ka, bucket)
if !added { if !added {
a.Logger.Info("Can't add new address, addr book is full", "address", addr, "total", a.size())
return ErrAddrBookFull{addr, a.size()}
} }
a.Logger.Info("Added new address", "address", addr, "total", a.size())
return nil return nil
} }


+ 35
- 18
p2p/pex/pex_reactor.go View File

@ -167,17 +167,28 @@ func (r *PEXReactor) AddPeer(p Peer) {
r.RequestAddrs(p) r.RequestAddrs(p)
} }
} else { } else {
// For inbound peers, the peer is its own source,
// and its NodeInfo has already been validated.
// Let the ensurePeersRoutine handle asking for more
// peers when we need - we don't trust inbound peers as much.
// inbound peer is its own source
addr := p.NodeInfo().NetAddress() addr := p.NodeInfo().NetAddress()
if !isAddrPrivate(addr, r.config.PrivatePeerIDs) {
err := r.book.AddAddress(addr, addr)
if err != nil {
src := addr
// ignore private addrs
if isAddrPrivate(addr, r.config.PrivatePeerIDs) {
return
}
// add to book. dont RequestAddrs right away because
// we don't trust inbound as much - let ensurePeersRoutine handle it.
err := r.book.AddAddress(addr, src)
if err != nil {
switch err.(type) {
case ErrAddrBookNilAddr:
r.Logger.Error("Failed to add new address", "err", err) r.Logger.Error("Failed to add new address", "err", err)
default:
// non-routable, self, full book, etc.
r.Logger.Debug("Failed to add new address", "err", err)
} }
} }
} }
} }
@ -277,25 +288,31 @@ func (r *PEXReactor) RequestAddrs(p Peer) {
// request for this peer and deletes the open request. // request for this peer and deletes the open request.
// If there's no open request for the src peer, it returns an error. // If there's no open request for the src peer, it returns an error.
func (r *PEXReactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error { func (r *PEXReactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error {
id := string(src.ID())
id := string(src.ID())
if !r.requestsSent.Has(id) { if !r.requestsSent.Has(id) {
return cmn.NewError("Received unsolicited pexAddrsMessage") return cmn.NewError("Received unsolicited pexAddrsMessage")
} }
r.requestsSent.Delete(id) r.requestsSent.Delete(id)
srcAddr := src.NodeInfo().NetAddress() srcAddr := src.NodeInfo().NetAddress()
for _, netAddr := range addrs { for _, netAddr := range addrs {
// TODO: make sure correct nodes never send nil and return error // TODO: make sure correct nodes never send nil and return error
// if a netAddr == nil
if netAddr != nil && !isAddrPrivate(netAddr, r.config.PrivatePeerIDs) {
// TODO: Should we moe the list of private peers into the AddrBook so AddAddress
// can do the check for us, and we don't have to worry about checking before calling ?
err := r.book.AddAddress(netAddr, srcAddr)
if err != nil {
r.Logger.Error("Failed to add new address", "err", err)
}
if netAddr == nil {
return cmn.NewError("received nil addr")
}
// ignore private peers
// TODO: give private peers to AddrBook so it can enforce this on AddAddress.
// We'd then have to check for ErrPrivatePeer on AddAddress here, which is
// an error we just ignore (maybe peer is probing us for our private peers :P)
if isAddrPrivate(netAddr, r.config.PrivatePeerIDs) {
continue
}
err := r.book.AddAddress(netAddr, srcAddr)
if err != nil {
r.Logger.Error("Failed to add new address", "err", err)
} }
} }
return nil return nil
@ -627,7 +644,7 @@ func (r *PEXReactor) attemptDisconnects() {
} }
} }
// isAddrPrivate returns true if addr is private.
// isAddrPrivate returns true if addr.ID is a private ID.
func isAddrPrivate(addr *p2p.NetAddress, privatePeerIDs []string) bool { func isAddrPrivate(addr *p2p.NetAddress, privatePeerIDs []string) bool {
for _, id := range privatePeerIDs { for _, id := range privatePeerIDs {
if string(addr.ID) == id { if string(addr.ID) == id {


+ 2
- 0
p2p/switch.go View File

@ -260,6 +260,7 @@ func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) {
sw.stopAndRemovePeer(peer, reason) sw.stopAndRemovePeer(peer, reason)
if peer.IsPersistent() { if peer.IsPersistent() {
// NOTE: this is the self-reported addr, not the original we dialed
go sw.reconnectToPeer(peer.NodeInfo().NetAddress()) go sw.reconnectToPeer(peer.NodeInfo().NetAddress())
} }
} }
@ -351,6 +352,7 @@ func (sw *Switch) IsDialing(id ID) bool {
} }
// DialPeersAsync dials a list of peers asynchronously in random order (optionally, making them persistent). // DialPeersAsync dials a list of peers asynchronously in random order (optionally, making them persistent).
// Used to dial peers from config on startup or from unsafe-RPC (trusted sources).
// TODO: remove addrBook arg since it's now set on the switch // TODO: remove addrBook arg since it's now set on the switch
func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent bool) error { func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent bool) error {
netAddrs, errs := NewNetAddressStrings(peers) netAddrs, errs := NewNetAddressStrings(peers)


Loading…
Cancel
Save