@ -7,6 +7,7 @@ package p2p
import (
"encoding/binary"
"encoding/json"
"fmt"
"math"
"math/rand"
"net"
@ -40,7 +41,7 @@ const (
// old buckets over which an address group will be spread.
oldBucketsPerGroup = 4
// new buckets over which an source address group will be spread.
// new buckets over which a source address group will be spread.
newBucketsPerGroup = 32
// buckets a frequently seen new address may end up in.
@ -79,18 +80,22 @@ const (
type AddrBook struct {
cmn . BaseService
mtx sync . Mutex
// immutable after creation
filePath string
routabilityStrict bool
rand * rand . Rand
key string
ourAddrs map [ string ] * NetAddress
addrLookup map [ string ] * knownAddress // new & old
addrNew [ ] map [ string ] * knownAddress
addrOld [ ] map [ string ] * knownAddress
wg sync . WaitGroup
nOld int
nNew int
// accessed concurrently
mtx sync . Mutex
rand * rand . Rand
ourAddrs map [ string ] * NetAddress
addrLookup map [ string ] * knownAddress // new & old
bucketsOld [ ] map [ string ] * knownAddress
bucketsNew [ ] map [ string ] * knownAddress
nOld int
nNew int
wg sync . WaitGroup
}
// NewAddrBook creates a new address book.
@ -112,14 +117,14 @@ func NewAddrBook(filePath string, routabilityStrict bool) *AddrBook {
func ( a * AddrBook ) init ( ) {
a . key = crypto . CRandHex ( 24 ) // 24/2 * 8 = 96 bits
// New addr buckets
a . addr New = make ( [ ] map [ string ] * knownAddress , newBucketCount )
for i := range a . addr New {
a . addr New[ i ] = make ( map [ string ] * knownAddress )
a . buckets New = make ( [ ] map [ string ] * knownAddress , newBucketCount )
for i := range a . buckets New {
a . buckets New[ i ] = make ( map [ string ] * knownAddress )
}
// Old addr buckets
a . addr Old = make ( [ ] map [ string ] * knownAddress , oldBucketCount )
for i := range a . addr Old {
a . addr Old[ i ] = make ( map [ string ] * knownAddress )
a . buckets Old = make ( [ ] map [ string ] * knownAddress , oldBucketCount )
for i := range a . buckets Old {
a . buckets Old[ i ] = make ( map [ string ] * knownAddress )
}
}
@ -145,6 +150,7 @@ func (a *AddrBook) Wait() {
a . wg . Wait ( )
}
// AddOurAddress adds another one of our addresses.
func ( a * AddrBook ) AddOurAddress ( addr * NetAddress ) {
a . mtx . Lock ( )
defer a . mtx . Unlock ( )
@ -152,6 +158,7 @@ func (a *AddrBook) AddOurAddress(addr *NetAddress) {
a . ourAddrs [ addr . String ( ) ] = addr
}
// OurAddresses returns a list of our addresses.
func ( a * AddrBook ) OurAddresses ( ) [ ] * NetAddress {
addrs := [ ] * NetAddress { }
for _ , addr := range a . ourAddrs {
@ -160,18 +167,20 @@ func (a *AddrBook) OurAddresses() []*NetAddress {
return addrs
}
// AddAddress adds the given address as received from the given source.
// NOTE: addr must not be nil
func ( a * AddrBook ) AddAddress ( addr * NetAddress , src * NetAddress ) {
func ( a * AddrBook ) AddAddress ( addr * NetAddress , src * NetAddress ) error {
a . mtx . Lock ( )
defer a . mtx . Unlock ( )
a . Logger . Info ( "Add address to book" , "addr" , addr , "src" , src )
a . addAddress ( addr , src )
return a . addAddress ( addr , src )
}
// NeedMoreAddrs returns true if there are not have enough addresses in the book.
func ( a * AddrBook ) NeedMoreAddrs ( ) bool {
return a . Size ( ) < needAddressThreshold
}
// Size returns the number of addresses in the book.
func ( a * AddrBook ) Size ( ) int {
a . mtx . Lock ( )
defer a . mtx . Unlock ( )
@ -182,7 +191,12 @@ func (a *AddrBook) size() int {
return a . nNew + a . nOld
}
// Pick an address to connect to with new/old bias.
// PickAddress picks an address to connect to.
// The address is picked randomly from an old or new bucket according
// to the newBias argument, which must be between [0, 100] (or else is truncated to that range)
// and determines how biased we are to pick an address from a new bucket.
// PickAddress returns nil if the AddrBook is empty or if we try to pick
// from an empty bucket.
func ( a * AddrBook ) PickAddress ( newBias int ) * NetAddress {
a . mtx . Lock ( )
defer a . mtx . Unlock ( )
@ -201,40 +215,34 @@ func (a *AddrBook) PickAddress(newBias int) *NetAddress {
oldCorrelation := math . Sqrt ( float64 ( a . nOld ) ) * ( 100.0 - float64 ( newBias ) )
newCorrelation := math . Sqrt ( float64 ( a . nNew ) ) * float64 ( newBias )
if ( newCorrelation + oldCorrelation ) * a . rand . Float64 ( ) < oldCorrelation {
// pick random Old bucket.
var bucket map [ string ] * knownAddress = nil
for len ( bucket ) == 0 {
bucket = a . addrOld [ a . rand . Intn ( len ( a . addrOld ) ) ]
}
// pick a random ka from bucket.
randIndex := a . rand . Intn ( len ( bucket ) )
for _ , ka := range bucket {
if randIndex == 0 {
return ka . Addr
}
randIndex --
}
cmn . PanicSanity ( "Should not happen" )
} else {
// pick random New bucket.
var bucket map [ string ] * knownAddress = nil
for len ( bucket ) == 0 {
bucket = a . addrNew [ a . rand . Intn ( len ( a . addrNew ) ) ]
// pick a random peer from a random bucket
var bucket map [ string ] * knownAddress
pickFromOldBucket := ( newCorrelation + oldCorrelation ) * a . rand . Float64 ( ) < oldCorrelation
if ( pickFromOldBucket && a . nOld == 0 ) ||
( ! pickFromOldBucket && a . nNew == 0 ) {
return nil
}
// loop until we pick a random non-empty bucket
for len ( bucket ) == 0 {
if pickFromOldBucket {
bucket = a . bucketsOld [ a . rand . Intn ( len ( a . bucketsOld ) ) ]
} else {
bucket = a . bucketsNew [ a . rand . Intn ( len ( a . bucketsNew ) ) ]
}
// pick a random ka from bucket.
randIndex := a . rand . Intn ( len ( bucket ) )
for _ , ka := range bucket {
if randIndex == 0 {
return ka . Addr
}
randIndex --
}
// pick a random index and loop over the map to return that index
randIndex := a . rand . Intn ( len ( bucket ) )
for _ , ka := range bucket {
if randIndex == 0 {
return ka . Addr
}
cmn . PanicSanity ( "Should not happen" )
randIndex --
}
return nil
}
// MarkGood marks the peer as good and moves it into an "old" bucket.
// XXX: we never call this!
func ( a * AddrBook ) MarkGood ( addr * NetAddress ) {
a . mtx . Lock ( )
defer a . mtx . Unlock ( )
@ -248,6 +256,7 @@ func (a *AddrBook) MarkGood(addr *NetAddress) {
}
}
// MarkAttempt marks that an attempt was made to connect to the address.
func ( a * AddrBook ) MarkAttempt ( addr * NetAddress ) {
a . mtx . Lock ( )
defer a . mtx . Unlock ( )
@ -301,6 +310,7 @@ func (a *AddrBook) GetSelection() []*NetAddress {
// Fisher-Yates shuffle the array. We only need to do the first
// `numAddresses' since we are throwing the rest.
// XXX: What's the point of this if we already loop randomly through addrLookup ?
for i := 0 ; i < numAddresses ; i ++ {
// pick a number between current index and the end
j := rand . Intn ( len ( allAddr ) - i ) + i
@ -370,7 +380,7 @@ func (a *AddrBook) loadFromFile(filePath string) bool {
// Restore all the fields...
// Restore the key
a . key = aJSON . Key
// Restore .addrNew & .addr Old
// Restore .bucketsNew & .buckets Old
for _ , ka := range aJSON . Addrs {
for _ , bucketIndex := range ka . Buckets {
bucket := a . getBucket ( ka . BucketType , bucketIndex )
@ -397,17 +407,17 @@ func (a *AddrBook) Save() {
func ( a * AddrBook ) saveRoutine ( ) {
defer a . wg . Done ( )
dumpAddress Ticker := time . NewTicker ( dumpAddressInterval )
saveFile Ticker := time . NewTicker ( dumpAddressInterval )
out :
for {
select {
case <- dumpAddress Ticker. C :
case <- saveFile Ticker. C :
a . saveToFile ( a . filePath )
case <- a . Quit :
break out
}
}
dumpAddress Ticker. Stop ( )
saveFile Ticker. Stop ( )
a . saveToFile ( a . filePath )
a . Logger . Info ( "Address handler done" )
}
@ -415,9 +425,9 @@ out:
func ( a * AddrBook ) getBucket ( bucketType byte , bucketIdx int ) map [ string ] * knownAddress {
switch bucketType {
case bucketTypeNew :
return a . addr New[ bucketIdx ]
return a . buckets New[ bucketIdx ]
case bucketTypeOld :
return a . addr Old[ bucketIdx ]
return a . buckets Old[ bucketIdx ]
default :
cmn . PanicSanity ( "Should not happen" )
return nil
@ -472,7 +482,7 @@ func (a *AddrBook) addToOldBucket(ka *knownAddress, bucketIdx int) bool {
}
addrStr := ka . Addr . String ( )
bucket := a . getBucket ( bucketTypeNew , bucketIdx )
bucket := a . getBucket ( bucketTypeOld , bucketIdx )
// Already exists?
if _ , ok := bucket [ addrStr ] ; ok {
@ -538,14 +548,13 @@ func (a *AddrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress {
return oldest
}
func ( a * AddrBook ) addAddress ( addr , src * NetAddress ) {
func ( a * AddrBook ) addAddress ( addr , src * NetAddress ) error {
if a . routabilityStrict && ! addr . Routable ( ) {
a . Logger . Error ( cmn . Fmt ( "Cannot add non-routable address %v" , addr ) )
return
return fmt . Errorf ( "Cannot add non-routable address %v" , addr )
}
if _ , ok := a . ourAddrs [ addr . String ( ) ] ; ok {
// Ignore our own listener address.
return
return fmt . Errorf ( "Cannot add ourselves with address %v" , addr )
}
ka := a . addrLookup [ addr . String ( ) ]
@ -553,16 +562,16 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) {
if ka != nil {
// Already old.
if ka . isOld ( ) {
return
return nil
}
// Already in max new buckets.
if len ( ka . Buckets ) == maxNewBucketsPerAddress {
return
return nil
}
// The more entries we have, the less likely we are to add more.
factor := int32 ( 2 * len ( ka . Buckets ) )
if a . rand . Int31n ( factor ) != 0 {
return
return nil
}
} else {
ka = newKnownAddress ( addr , src )
@ -572,12 +581,13 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) {
a . addToNewBucket ( ka , bucket )
a . Logger . Info ( "Added new address" , "address" , addr , "total" , a . size ( ) )
return nil
}
// Make space in the new buckets by expiring the really bad entries.
// If no bad entries are available we remove the oldest.
func ( a * AddrBook ) expireNew ( bucketIdx int ) {
for addrStr , ka := range a . addr New[ bucketIdx ] {
for addrStr , ka := range a . buckets New[ bucketIdx ] {
// If an entry is bad, throw it away
if ka . isBad ( ) {
a . Logger . Info ( cmn . Fmt ( "expiring bad address %v" , addrStr ) )
@ -679,8 +689,8 @@ func (a *AddrBook) calcOldBucket(addr *NetAddress) int {
}
// Return a string representing the network group of this address.
// This is the /16 for IPv6 , the /32 (/36 for he.net) for IPv6, the string
// "local" for a local address and the string "unroutable for an unroutable
// This is the /16 for IPv4 , the /32 (/36 for he.net) for IPv6, the string
// "local" for a local address and the string "unroutable" for an unroutable
// address.
func ( a * AddrBook ) groupKey ( na * NetAddress ) string {
if a . routabilityStrict && na . Local ( ) {
@ -806,8 +816,8 @@ func (ka *knownAddress) removeBucketRef(bucketIdx int) int {
}
/ *
An address is bad if the address in question has not been tried in the last
minute and meets one of the following criteria :
An address is bad if the address in question is a New address , has not been tried in the last
minute , and meets one of the following criteria :
1 ) It claims to be from the future
2 ) It hasn ' t been seen in over a month
@ -816,14 +826,23 @@ func (ka *knownAddress) removeBucketRef(bucketIdx int) int {
All addresses that meet these criteria are assumed to be worthless and not
worth keeping hold of .
XXX : so a good peer needs us to call MarkGood before the conditions above are reached !
* /
func ( ka * knownAddress ) isBad ( ) bool {
// Is Old --> good
if ka . BucketType == bucketTypeOld {
return false
}
// Has been attempted in the last minute --> good
if ka . LastAttempt . Before ( time . Now ( ) . Add ( - 1 * time . Minute ) ) {
return false
}
// Over a month old?
// Too old?
// XXX: does this mean if we've kept a connection up for this long we'll disconnect?!
// and shouldn't it be .Before ?
if ka . LastAttempt . After ( time . Now ( ) . Add ( - 1 * numMissingDays * time . Hour * 24 ) ) {
return true
}
@ -834,6 +853,7 @@ func (ka *knownAddress) isBad() bool {
}
// Hasn't succeeded in too long?
// XXX: does this mean if we've kept a connection up for this long we'll disconnect?!
if ka . LastSuccess . Before ( time . Now ( ) . Add ( - 1 * minBadDays * time . Hour * 24 ) ) &&
ka . Attempts >= maxFailures {
return true