Browse Source

Merge pull request #859 from tendermint/fix/addrbook

Fix/addrbook
pull/761/merge
Ethan Buchman 7 years ago
committed by GitHub
parent
commit
e997db7a23
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 146 additions and 101 deletions
  1. +88
    -68
      p2p/addrbook.go
  2. +39
    -0
      p2p/addrbook_test.go
  3. +19
    -33
      p2p/pex_reactor.go

+ 88
- 68
p2p/addrbook.go View File

@ -7,6 +7,7 @@ package p2p
import (
"encoding/binary"
"encoding/json"
"fmt"
"math"
"math/rand"
"net"
@ -40,7 +41,7 @@ const (
// old buckets over which an address group will be spread.
oldBucketsPerGroup = 4
// new buckets over which an source address group will be spread.
// new buckets over which a source address group will be spread.
newBucketsPerGroup = 32
// buckets a frequently seen new address may end up in.
@ -79,18 +80,22 @@ const (
type AddrBook struct {
cmn.BaseService
mtx sync.Mutex
// immutable after creation
filePath string
routabilityStrict bool
rand *rand.Rand
key string
ourAddrs map[string]*NetAddress
addrLookup map[string]*knownAddress // new & old
addrNew []map[string]*knownAddress
addrOld []map[string]*knownAddress
wg sync.WaitGroup
nOld int
nNew int
// accessed concurrently
mtx sync.Mutex
rand *rand.Rand
ourAddrs map[string]*NetAddress
addrLookup map[string]*knownAddress // new & old
bucketsOld []map[string]*knownAddress
bucketsNew []map[string]*knownAddress
nOld int
nNew int
wg sync.WaitGroup
}
// NewAddrBook creates a new address book.
@ -112,14 +117,14 @@ func NewAddrBook(filePath string, routabilityStrict bool) *AddrBook {
func (a *AddrBook) init() {
a.key = crypto.CRandHex(24) // 24/2 * 8 = 96 bits
// New addr buckets
a.addrNew = make([]map[string]*knownAddress, newBucketCount)
for i := range a.addrNew {
a.addrNew[i] = make(map[string]*knownAddress)
a.bucketsNew = make([]map[string]*knownAddress, newBucketCount)
for i := range a.bucketsNew {
a.bucketsNew[i] = make(map[string]*knownAddress)
}
// Old addr buckets
a.addrOld = make([]map[string]*knownAddress, oldBucketCount)
for i := range a.addrOld {
a.addrOld[i] = make(map[string]*knownAddress)
a.bucketsOld = make([]map[string]*knownAddress, oldBucketCount)
for i := range a.bucketsOld {
a.bucketsOld[i] = make(map[string]*knownAddress)
}
}
@ -145,6 +150,7 @@ func (a *AddrBook) Wait() {
a.wg.Wait()
}
// AddOurAddress adds another one of our addresses.
func (a *AddrBook) AddOurAddress(addr *NetAddress) {
a.mtx.Lock()
defer a.mtx.Unlock()
@ -152,6 +158,7 @@ func (a *AddrBook) AddOurAddress(addr *NetAddress) {
a.ourAddrs[addr.String()] = addr
}
// OurAddresses returns a list of our addresses.
func (a *AddrBook) OurAddresses() []*NetAddress {
addrs := []*NetAddress{}
for _, addr := range a.ourAddrs {
@ -160,18 +167,20 @@ func (a *AddrBook) OurAddresses() []*NetAddress {
return addrs
}
// AddAddress adds the given address as received from the given source.
// NOTE: addr must not be nil
func (a *AddrBook) AddAddress(addr *NetAddress, src *NetAddress) {
func (a *AddrBook) AddAddress(addr *NetAddress, src *NetAddress) error {
a.mtx.Lock()
defer a.mtx.Unlock()
a.Logger.Info("Add address to book", "addr", addr, "src", src)
a.addAddress(addr, src)
return a.addAddress(addr, src)
}
// NeedMoreAddrs returns true if there are not have enough addresses in the book.
func (a *AddrBook) NeedMoreAddrs() bool {
return a.Size() < needAddressThreshold
}
// Size returns the number of addresses in the book.
func (a *AddrBook) Size() int {
a.mtx.Lock()
defer a.mtx.Unlock()
@ -182,7 +191,12 @@ func (a *AddrBook) size() int {
return a.nNew + a.nOld
}
// Pick an address to connect to with new/old bias.
// PickAddress picks an address to connect to.
// The address is picked randomly from an old or new bucket according
// to the newBias argument, which must be between [0, 100] (or else is truncated to that range)
// and determines how biased we are to pick an address from a new bucket.
// PickAddress returns nil if the AddrBook is empty or if we try to pick
// from an empty bucket.
func (a *AddrBook) PickAddress(newBias int) *NetAddress {
a.mtx.Lock()
defer a.mtx.Unlock()
@ -201,40 +215,34 @@ func (a *AddrBook) PickAddress(newBias int) *NetAddress {
oldCorrelation := math.Sqrt(float64(a.nOld)) * (100.0 - float64(newBias))
newCorrelation := math.Sqrt(float64(a.nNew)) * float64(newBias)
if (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation {
// pick random Old bucket.
var bucket map[string]*knownAddress = nil
for len(bucket) == 0 {
bucket = a.addrOld[a.rand.Intn(len(a.addrOld))]
}
// pick a random ka from bucket.
randIndex := a.rand.Intn(len(bucket))
for _, ka := range bucket {
if randIndex == 0 {
return ka.Addr
}
randIndex--
}
cmn.PanicSanity("Should not happen")
} else {
// pick random New bucket.
var bucket map[string]*knownAddress = nil
for len(bucket) == 0 {
bucket = a.addrNew[a.rand.Intn(len(a.addrNew))]
// pick a random peer from a random bucket
var bucket map[string]*knownAddress
pickFromOldBucket := (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation
if (pickFromOldBucket && a.nOld == 0) ||
(!pickFromOldBucket && a.nNew == 0) {
return nil
}
// loop until we pick a random non-empty bucket
for len(bucket) == 0 {
if pickFromOldBucket {
bucket = a.bucketsOld[a.rand.Intn(len(a.bucketsOld))]
} else {
bucket = a.bucketsNew[a.rand.Intn(len(a.bucketsNew))]
}
// pick a random ka from bucket.
randIndex := a.rand.Intn(len(bucket))
for _, ka := range bucket {
if randIndex == 0 {
return ka.Addr
}
randIndex--
}
// pick a random index and loop over the map to return that index
randIndex := a.rand.Intn(len(bucket))
for _, ka := range bucket {
if randIndex == 0 {
return ka.Addr
}
cmn.PanicSanity("Should not happen")
randIndex--
}
return nil
}
// MarkGood marks the peer as good and moves it into an "old" bucket.
// XXX: we never call this!
func (a *AddrBook) MarkGood(addr *NetAddress) {
a.mtx.Lock()
defer a.mtx.Unlock()
@ -248,6 +256,7 @@ func (a *AddrBook) MarkGood(addr *NetAddress) {
}
}
// MarkAttempt marks that an attempt was made to connect to the address.
func (a *AddrBook) MarkAttempt(addr *NetAddress) {
a.mtx.Lock()
defer a.mtx.Unlock()
@ -301,6 +310,7 @@ func (a *AddrBook) GetSelection() []*NetAddress {
// Fisher-Yates shuffle the array. We only need to do the first
// `numAddresses' since we are throwing the rest.
// XXX: What's the point of this if we already loop randomly through addrLookup ?
for i := 0; i < numAddresses; i++ {
// pick a number between current index and the end
j := rand.Intn(len(allAddr)-i) + i
@ -370,7 +380,7 @@ func (a *AddrBook) loadFromFile(filePath string) bool {
// Restore all the fields...
// Restore the key
a.key = aJSON.Key
// Restore .addrNew & .addrOld
// Restore .bucketsNew & .bucketsOld
for _, ka := range aJSON.Addrs {
for _, bucketIndex := range ka.Buckets {
bucket := a.getBucket(ka.BucketType, bucketIndex)
@ -397,17 +407,17 @@ func (a *AddrBook) Save() {
func (a *AddrBook) saveRoutine() {
defer a.wg.Done()
dumpAddressTicker := time.NewTicker(dumpAddressInterval)
saveFileTicker := time.NewTicker(dumpAddressInterval)
out:
for {
select {
case <-dumpAddressTicker.C:
case <-saveFileTicker.C:
a.saveToFile(a.filePath)
case <-a.Quit:
break out
}
}
dumpAddressTicker.Stop()
saveFileTicker.Stop()
a.saveToFile(a.filePath)
a.Logger.Info("Address handler done")
}
@ -415,9 +425,9 @@ out:
func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress {
switch bucketType {
case bucketTypeNew:
return a.addrNew[bucketIdx]
return a.bucketsNew[bucketIdx]
case bucketTypeOld:
return a.addrOld[bucketIdx]
return a.bucketsOld[bucketIdx]
default:
cmn.PanicSanity("Should not happen")
return nil
@ -472,7 +482,7 @@ func (a *AddrBook) addToOldBucket(ka *knownAddress, bucketIdx int) bool {
}
addrStr := ka.Addr.String()
bucket := a.getBucket(bucketTypeNew, bucketIdx)
bucket := a.getBucket(bucketTypeOld, bucketIdx)
// Already exists?
if _, ok := bucket[addrStr]; ok {
@ -538,14 +548,13 @@ func (a *AddrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress {
return oldest
}
func (a *AddrBook) addAddress(addr, src *NetAddress) {
func (a *AddrBook) addAddress(addr, src *NetAddress) error {
if a.routabilityStrict && !addr.Routable() {
a.Logger.Error(cmn.Fmt("Cannot add non-routable address %v", addr))
return
return fmt.Errorf("Cannot add non-routable address %v", addr)
}
if _, ok := a.ourAddrs[addr.String()]; ok {
// Ignore our own listener address.
return
return fmt.Errorf("Cannot add ourselves with address %v", addr)
}
ka := a.addrLookup[addr.String()]
@ -553,16 +562,16 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) {
if ka != nil {
// Already old.
if ka.isOld() {
return
return nil
}
// Already in max new buckets.
if len(ka.Buckets) == maxNewBucketsPerAddress {
return
return nil
}
// The more entries we have, the less likely we are to add more.
factor := int32(2 * len(ka.Buckets))
if a.rand.Int31n(factor) != 0 {
return
return nil
}
} else {
ka = newKnownAddress(addr, src)
@ -572,12 +581,13 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) {
a.addToNewBucket(ka, bucket)
a.Logger.Info("Added new address", "address", addr, "total", a.size())
return nil
}
// Make space in the new buckets by expiring the really bad entries.
// If no bad entries are available we remove the oldest.
func (a *AddrBook) expireNew(bucketIdx int) {
for addrStr, ka := range a.addrNew[bucketIdx] {
for addrStr, ka := range a.bucketsNew[bucketIdx] {
// If an entry is bad, throw it away
if ka.isBad() {
a.Logger.Info(cmn.Fmt("expiring bad address %v", addrStr))
@ -679,8 +689,8 @@ func (a *AddrBook) calcOldBucket(addr *NetAddress) int {
}
// Return a string representing the network group of this address.
// This is the /16 for IPv6, the /32 (/36 for he.net) for IPv6, the string
// "local" for a local address and the string "unroutable for an unroutable
// This is the /16 for IPv4, the /32 (/36 for he.net) for IPv6, the string
// "local" for a local address and the string "unroutable" for an unroutable
// address.
func (a *AddrBook) groupKey(na *NetAddress) string {
if a.routabilityStrict && na.Local() {
@ -806,8 +816,8 @@ func (ka *knownAddress) removeBucketRef(bucketIdx int) int {
}
/*
An address is bad if the address in question has not been tried in the last
minute and meets one of the following criteria:
An address is bad if the address in question is a New address, has not been tried in the last
minute, and meets one of the following criteria:
1) It claims to be from the future
2) It hasn't been seen in over a month
@ -816,14 +826,23 @@ func (ka *knownAddress) removeBucketRef(bucketIdx int) int {
All addresses that meet these criteria are assumed to be worthless and not
worth keeping hold of.
XXX: so a good peer needs us to call MarkGood before the conditions above are reached!
*/
func (ka *knownAddress) isBad() bool {
// Is Old --> good
if ka.BucketType == bucketTypeOld {
return false
}
// Has been attempted in the last minute --> good
if ka.LastAttempt.Before(time.Now().Add(-1 * time.Minute)) {
return false
}
// Over a month old?
// Too old?
// XXX: does this mean if we've kept a connection up for this long we'll disconnect?!
// and shouldn't it be .Before ?
if ka.LastAttempt.After(time.Now().Add(-1 * numMissingDays * time.Hour * 24)) {
return true
}
@ -834,6 +853,7 @@ func (ka *knownAddress) isBad() bool {
}
// Hasn't succeeded in too long?
// XXX: does this mean if we've kept a connection up for this long we'll disconnect?!
if ka.LastSuccess.Before(time.Now().Add(-1*minBadDays*time.Hour*24)) &&
ka.Attempts >= maxFailures {
return true


+ 39
- 0
p2p/addrbook_test.go View File

@ -23,6 +23,42 @@ func createTempFileName(prefix string) string {
return fname
}
func TestAddrBookPickAddress(t *testing.T) {
assert := assert.New(t)
fname := createTempFileName("addrbook_test")
// 0 addresses
book := NewAddrBook(fname, true)
book.SetLogger(log.TestingLogger())
assert.Zero(book.Size())
addr := book.PickAddress(50)
assert.Nil(addr, "expected no address")
randAddrs := randNetAddressPairs(t, 1)
addrSrc := randAddrs[0]
book.AddAddress(addrSrc.addr, addrSrc.src)
// pick an address when we only have new address
addr = book.PickAddress(0)
assert.NotNil(addr, "expected an address")
addr = book.PickAddress(50)
assert.NotNil(addr, "expected an address")
addr = book.PickAddress(100)
assert.NotNil(addr, "expected an address")
// pick an address when we only have old address
book.MarkGood(addrSrc.addr)
addr = book.PickAddress(0)
assert.NotNil(addr, "expected an address")
addr = book.PickAddress(50)
assert.NotNil(addr, "expected an address")
// in this case, nNew==0 but we biased 100% to new, so we return nil
addr = book.PickAddress(100)
assert.Nil(addr, "did not expected an address")
}
func TestAddrBookSaveLoad(t *testing.T) {
fname := createTempFileName("addrbook_test")
@ -76,6 +112,7 @@ func TestAddrBookLookup(t *testing.T) {
}
func TestAddrBookPromoteToOld(t *testing.T) {
assert := assert.New(t)
fname := createTempFileName("addrbook_test")
randAddrs := randNetAddressPairs(t, 100)
@ -106,6 +143,8 @@ func TestAddrBookPromoteToOld(t *testing.T) {
if len(selection) > book.Size() {
t.Errorf("selection could not be bigger than the book")
}
assert.Equal(book.Size(), 100, "expecting book size to be 100")
}
func TestAddrBookHandlesDuplicates(t *testing.T) {


+ 19
- 33
p2p/pex_reactor.go View File

@ -240,43 +240,29 @@ func (r *PEXReactor) ensurePeers() {
return
}
toDial := make(map[string]*NetAddress)
// bias to prefer more vetted peers when we have fewer connections.
// not perfect, but somewhate ensures that we prioritize connecting to more-vetted
newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
// Try to pick numToDial addresses to dial.
for i := 0; i < numToDial; i++ {
// The purpose of newBias is to first prioritize old (more vetted) peers
// when we have few connections, but to allow for new (less vetted) peers
// if we already have many connections. This algorithm isn't perfect, but
// it somewhat ensures that we prioritize connecting to more-vetted
// peers.
newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
var picked *NetAddress
// Try to fetch a new peer 3 times.
// This caps the maximum number of tries to 3 * numToDial.
for j := 0; j < 3; j++ {
try := r.book.PickAddress(newBias)
if try == nil {
break
}
_, alreadySelected := toDial[try.IP.String()]
alreadyDialing := r.Switch.IsDialing(try)
alreadyConnected := r.Switch.Peers().Has(try.IP.String())
if alreadySelected || alreadyDialing || alreadyConnected {
// r.Logger.Info("Cannot dial address", "addr", try,
// "alreadySelected", alreadySelected,
// "alreadyDialing", alreadyDialing,
// "alreadyConnected", alreadyConnected)
continue
} else {
r.Logger.Info("Will dial address", "addr", try)
picked = try
break
}
toDial := make(map[string]*NetAddress)
// Try maxAttempts times to pick numToDial addresses to dial
maxAttempts := numToDial * 3
for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
try := r.book.PickAddress(newBias)
if try == nil {
continue
}
if _, selected := toDial[try.IP.String()]; selected {
continue
}
if dialling := r.Switch.IsDialing(try); dialling {
continue
}
if picked == nil {
if connected := r.Switch.Peers().Has(try.IP.String()); connected {
continue
}
toDial[picked.IP.String()] = picked
r.Logger.Info("Will dial address", "addr", try)
toDial[try.IP.String()] = try
}
// Dial picked addresses


Loading…
Cancel
Save