@ -5,7 +5,7 @@
package pex
import (
"crypto/sha256 "
crand "crypto/rand "
"encoding/binary"
"fmt"
"math"
@ -14,6 +14,7 @@ import (
"sync"
"time"
"github.com/minio/highwayhash"
"github.com/tendermint/tendermint/crypto"
tmmath "github.com/tendermint/tendermint/libs/math"
tmrand "github.com/tendermint/tendermint/libs/rand"
@ -100,10 +101,17 @@ type addrBook struct {
filePath string
key string // random prefix for bucket placement
routabilityStrict bool
hashKey [ ] byte
wg sync . WaitGroup
}
func newHashKey ( ) [ ] byte {
result := make ( [ ] byte , highwayhash . Size )
crand . Read ( result )
return result
}
// NewAddrBook creates a new address book.
// Use Start to begin processing asynchronous address updates.
func NewAddrBook ( filePath string , routabilityStrict bool ) AddrBook {
@ -115,6 +123,7 @@ func NewAddrBook(filePath string, routabilityStrict bool) AddrBook {
badPeers : make ( map [ p2p . ID ] * knownAddress ) ,
filePath : filePath ,
routabilityStrict : routabilityStrict ,
hashKey : newHashKey ( ) ,
}
am . init ( )
am . BaseService = * service . NewBaseService ( nil , "AddrBook" , am )
@ -344,16 +353,28 @@ func (a *addrBook) MarkBad(addr *p2p.NetAddress, banTime time.Duration) {
}
}
// ReinstateBadPeers removes bad peers from ban list and places them into a new
// bucket.
func ( a * addrBook ) ReinstateBadPeers ( ) {
a . mtx . Lock ( )
defer a . mtx . Unlock ( )
for _ , ka := range a . badPeers {
if ! ka . isBanned ( ) {
bucket := a . calcNewBucket ( ka . Addr , ka . Src )
a . addToNewBucket ( ka , bucket )
delete ( a . badPeers , ka . ID ( ) )
a . Logger . Info ( "Reinstated address" , "addr" , ka . Addr )
if ka . isBanned ( ) {
continue
}
bucket , err := a . calcNewBucket ( ka . Addr , ka . Src )
if err != nil {
a . Logger . Error ( "Failed to calculate new bucket (bad peer won't be reinstantiated)" ,
"addr" , ka . Addr , "err" , err )
continue
}
a . addToNewBucket ( ka , bucket )
delete ( a . badPeers , ka . ID ( ) )
a . Logger . Info ( "Reinstated address" , "addr" , ka . Addr )
}
}
@ -659,7 +680,10 @@ func (a *addrBook) addAddress(addr, src *p2p.NetAddress) error {
ka = newKnownAddress ( addr , src )
}
bucket := a . calcNewBucket ( addr , src )
bucket , err := a . calcNewBucket ( addr , src )
if err != nil {
return err
}
a . addToNewBucket ( ka , bucket )
return nil
}
@ -722,15 +746,15 @@ func (a *addrBook) expireNew(bucketIdx int) {
// Promotes an address from new to old. If the destination bucket is full,
// demote the oldest one to a "new" bucket.
// TODO: Demote more probabilistically?
func ( a * addrBook ) moveToOld ( ka * knownAddress ) {
func ( a * addrBook ) moveToOld ( ka * knownAddress ) error {
// Sanity check
if ka . isOld ( ) {
a . Logger . Error ( fmt . Sprintf ( "Cannot promote address that is already old %v" , ka ) )
return
return nil
}
if len ( ka . Buckets ) == 0 {
a . Logger . Error ( fmt . Sprintf ( "Cannot promote address that isn't in any new buckets %v" , ka ) )
return
return nil
}
// Remove from all (new) buckets.
@ -739,13 +763,19 @@ func (a *addrBook) moveToOld(ka *knownAddress) {
ka . BucketType = bucketTypeOld
// Try to add it to its oldBucket destination.
oldBucketIdx := a . calcOldBucket ( ka . Addr )
oldBucketIdx , err := a . calcOldBucket ( ka . Addr )
if err != nil {
return err
}
added := a . addToOldBucket ( ka , oldBucketIdx )
if ! added {
// No room; move the oldest to a new bucket
oldest := a . pickOldest ( bucketTypeOld , oldBucketIdx )
a . removeFromBucket ( oldest , bucketTypeOld , oldBucketIdx )
newBucketIdx := a . calcNewBucket ( oldest . Addr , oldest . Src )
newBucketIdx , err := a . calcNewBucket ( oldest . Addr , oldest . Src )
if err != nil {
return err
}
a . addToNewBucket ( oldest , newBucketIdx )
// Finally, add our ka to old bucket again.
@ -754,6 +784,7 @@ func (a *addrBook) moveToOld(ka *knownAddress) {
a . Logger . Error ( fmt . Sprintf ( "Could not re-add ka %v to oldBucketIdx %v" , ka , oldBucketIdx ) )
}
}
return nil
}
func ( a * addrBook ) removeAddress ( addr * p2p . NetAddress ) {
@ -785,14 +816,16 @@ func (a *addrBook) addBadPeer(addr *p2p.NetAddress, banTime time.Duration) bool
//---------------------------------------------------------------------
// calculate bucket placements
// doublesha256( key + sourcegroup +
// int64(doublesha256(key + group + sourcegroup))%bucket_per_group ) % num_new_buckets
func ( a * addrBook ) calcNewBucket ( addr , src * p2p . NetAddress ) int {
// hash(key + sourcegroup + int64(hash(key + group + sourcegroup)) % bucket_per_group) % num_new_buckets
func ( a * addrBook ) calcNewBucket ( addr , src * p2p . NetAddress ) ( int , error ) {
data1 := [ ] byte { }
data1 = append ( data1 , [ ] byte ( a . key ) ... )
data1 = append ( data1 , [ ] byte ( a . groupKey ( addr ) ) ... )
data1 = append ( data1 , [ ] byte ( a . groupKey ( src ) ) ... )
hash1 := doubleSha256 ( data1 )
hash1 , err := a . hash ( data1 )
if err != nil {
return 0 , err
}
hash64 := binary . BigEndian . Uint64 ( hash1 )
hash64 %= newBucketsPerGroup
var hashbuf [ 8 ] byte
@ -802,17 +835,23 @@ func (a *addrBook) calcNewBucket(addr, src *p2p.NetAddress) int {
data2 = append ( data2 , a . groupKey ( src ) ... )
data2 = append ( data2 , hashbuf [ : ] ... )
hash2 := doubleSha256 ( data2 )
return int ( binary . BigEndian . Uint64 ( hash2 ) % newBucketCount )
hash2 , err := a . hash ( data2 )
if err != nil {
return 0 , err
}
result := int ( binary . BigEndian . Uint64 ( hash2 ) % newBucketCount )
return result , nil
}
// doublesha256( key + group +
// int64(doublesha256(key + addr))%buckets_per_group ) % num_old_buckets
func ( a * addrBook ) calcOldBucket ( addr * p2p . NetAddress ) int {
// hash(key + group + int64(hash(key + addr)) % buckets_per_group) % num_old_buckets
func ( a * addrBook ) calcOldBucket ( addr * p2p . NetAddress ) ( int , error ) {
data1 := [ ] byte { }
data1 = append ( data1 , [ ] byte ( a . key ) ... )
data1 = append ( data1 , [ ] byte ( addr . String ( ) ) ... )
hash1 := doubleSha256 ( data1 )
hash1 , err := a . hash ( data1 )
if err != nil {
return 0 , err
}
hash64 := binary . BigEndian . Uint64 ( hash1 )
hash64 %= oldBucketsPerGroup
var hashbuf [ 8 ] byte
@ -822,8 +861,12 @@ func (a *addrBook) calcOldBucket(addr *p2p.NetAddress) int {
data2 = append ( data2 , a . groupKey ( addr ) ... )
data2 = append ( data2 , hashbuf [ : ] ... )
hash2 := doubleSha256 ( data2 )
return int ( binary . BigEndian . Uint64 ( hash2 ) % oldBucketCount )
hash2 , err := a . hash ( data2 )
if err != nil {
return 0 , err
}
result := int ( binary . BigEndian . Uint64 ( hash2 ) % oldBucketCount )
return result , nil
}
// Return a string representing the network group of this address.
@ -875,12 +918,11 @@ func (a *addrBook) groupKey(na *p2p.NetAddress) string {
return ( & net . IPNet { IP : na . IP , Mask : net . CIDRMask ( bits , 128 ) } ) . String ( )
}
// doubleSha256 calculates sha256(sha256(b)) and returns the resulting bytes.
func doubleSha256 ( b [ ] byte ) [ ] byte {
hasher := sha256 . New ( )
hasher . Write ( b ) // nolint:errcheck
sum := hasher . Sum ( nil )
hasher . Reset ( )
hasher . Write ( sum ) // nolint:errcheck
return hasher . Sum ( nil )
func ( a * addrBook ) hash ( b [ ] byte ) ( [ ] byte , error ) {
hasher , err := highwayhash . New64 ( a . hashKey )
if err != nil {
return nil , err
}
hasher . Write ( b )
return hasher . Sum ( nil ) , nil
}