From 4e0010a4e91da3d77494e7425cba411e46273e26 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sun, 8 Jun 2014 17:17:17 -0700 Subject: [PATCH] . --- peer/addrmanager.go | 120 ++++++++++++++++++++++++++++++++++++++----- peer/knownaddress.go | 24 +++++++++ 2 files changed, 130 insertions(+), 14 deletions(-) diff --git a/peer/addrmanager.go b/peer/addrmanager.go index e837ebd89..c60a70d0d 100644 --- a/peer/addrmanager.go +++ b/peer/addrmanager.go @@ -8,6 +8,7 @@ import ( . "github.com/tendermint/tendermint/binary" crand "crypto/rand" // for seeding "encoding/binary" + "encoding/json" "io" "math" "math/rand" @@ -15,10 +16,13 @@ import ( "sync" "sync/atomic" "time" + "os" + "fmt" ) /* AddrManager - concurrency safe peer address manager */ type AddrManager struct { + mtx sync.Mutex rand *rand.Rand key [32]byte addrIndex map[string]*KnownAddress // addr.String() -> KnownAddress @@ -27,10 +31,11 @@ type AddrManager struct { started int32 shutdown int32 wg sync.WaitGroup - quit chan bool + quit chan struct{} nOld int nNew int localAddresses map[string]*localAddress + filePath string } const ( @@ -86,16 +91,18 @@ const ( ) // Use Start to begin processing asynchronous address updates. -func NewAddrManager() *AddrManager { +func NewAddrManager(filePath string) *AddrManager { am := AddrManager{ rand: rand.New(rand.NewSource(time.Now().UnixNano())), - quit: make(chan bool), + quit: make(chan struct{}), localAddresses: make(map[string]*localAddress), + filePath: filePath, } am.init() return &am } +// When modifying this, don't forget to update loadFromFile() func (a *AddrManager) init() { a.addrIndex = make(map[string]*KnownAddress) io.ReadFull(crand.Reader, a.key[:]) @@ -110,8 +117,8 @@ func (a *AddrManager) init() { func (a *AddrManager) Start() { if atomic.AddInt32(&a.started, 1) != 1 { return } amgrLog.Trace("Starting address manager") + a.loadFromFile(a.filePath) a.wg.Add(1) - a.loadPeers() go a.addressHandler() } @@ -123,7 +130,7 @@ func (a *AddrManager) Stop() { } func (a *AddrManager) AddAddress(addr *NetAddress, src *NetAddress) { - // XXX use a channel for concurrency + a.mtx.Lock(); defer a.mtx.Unlock() a.addAddress(addr, src) } @@ -132,12 +139,15 @@ func (a *AddrManager) NeedMoreAddresses() bool { } func (a *AddrManager) NumAddresses() int { + a.mtx.Lock(); defer a.mtx.Unlock() return a.nOld + a.nNew } // Pick a new address to connect to. func (a *AddrManager) PickAddress(class string, newBias int) *KnownAddress { - if a.NumAddresses() == 0 { return nil } + a.mtx.Lock(); defer a.mtx.Unlock() + + if a.nOld == 0 && a.nNew == 0 { return nil } if newBias > 100 { newBias = 100 } if newBias < 0 { newBias = 0 } @@ -146,28 +156,110 @@ func (a *AddrManager) PickAddress(class string, newBias int) *KnownAddress { newCorrelation := math.Sqrt(float64(a.nNew)) * float64(newBias) if (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation { - // Old entry. - // XXX + // pick random Old bucket. + var bucket []*KnownAddress = nil + for len(bucket) == 0 { + bucket = a.addrOld[a.rand.Intn(len(a.addrOld))] + } + // pick a random ka from bucket. + return bucket[a.rand.Intn(len(bucket))] } else { - // New entry. - // XXX + // pick random New bucket. + var bucket map[string]*KnownAddress = nil + for len(bucket) == 0 { + bucket = a.addrNew[a.rand.Intn(len(a.addrNew))] + } + // pick a random ka from bucket. + randIndex := a.rand.Intn(len(bucket)) + for _, ka := range bucket { + randIndex-- + if randIndex == 0 { + return ka + } + } + panic("Should not happen") } return nil } func (a *AddrManager) MarkGood(ka *KnownAddress) { + a.mtx.Lock(); defer a.mtx.Unlock() ka.MarkAttempt(true) a.moveToOld(ka) } +func (a *AddrManager) MarkBad(ka *KnownAddress) { + a.mtx.Lock(); defer a.mtx.Unlock() + ka.MarkAttempt(false) +} + /* Loading & Saving */ -func (a *AddrManager) loadPeers() { +type addrManagerJSON struct { + Key [32]byte + AddrNew [newBucketCount]map[string]*KnownAddress + AddrOld [oldBucketCount][]*KnownAddress + NOld int + NNew int } -func (a *AddrManager) savePeers() { +func (a *AddrManager) saveToFile(filePath string) { + aJSON := &addrManagerJSON{ + Key: a.key, + AddrNew: a.addrNew, + AddrOld: a.addrOld, + NOld: a.nOld, + NNew: a.nNew, + } + + w, err := os.Create(filePath) + if err != nil { + amgrLog.Error("Error opening file: ", filePath, err) + return + } + enc := json.NewEncoder(w) + defer w.Close() + err = enc.Encode(&aJSON) + if err != nil { panic(err) } } +func (a *AddrManager) loadFromFile(filePath string) { + // If doesn't exist, do nothing. + _, err := os.Stat(filePath) + if os.IsNotExist(err) { return } + + r, err := os.Open(filePath) + if err != nil { + panic(fmt.Errorf("%s error opening file: %v", filePath, err)) + } + defer r.Close() + + aJSON := &addrManagerJSON{} + dec := json.NewDecoder(r) + err = dec.Decode(aJSON) + if err != nil { + panic(fmt.Errorf("error reading %s: %v", filePath, err)) + } + + // Now we need to initialize 'a'. + + copy(a.key[:], aJSON.Key[:]) + a.addrNew = aJSON.AddrNew + for i, oldBucket := range aJSON.AddrOld { + copy(a.addrOld[i], oldBucket) + } + a.nNew = aJSON.NNew + a.nOld = aJSON.NOld + + a.addrIndex = make(map[string]*KnownAddress) + for _, newBucket := range a.addrNew { + for key, ka := range newBucket { + a.addrIndex[key] = ka + } + } +} + + /* Private methods */ func (a *AddrManager) addressHandler() { @@ -176,13 +268,13 @@ out: for { select { case <-dumpAddressTicker.C: - a.savePeers() + a.saveToFile(a.filePath) case <-a.quit: break out } } dumpAddressTicker.Stop() - a.savePeers() + a.saveToFile(a.filePath) a.wg.Done() amgrLog.Trace("Address handler done") } diff --git a/peer/knownaddress.go b/peer/knownaddress.go index 32685587b..73a1fc2fa 100644 --- a/peer/knownaddress.go +++ b/peer/knownaddress.go @@ -3,6 +3,7 @@ package peer import ( . "github.com/tendermint/tendermint/binary" "time" + "io" ) /* @@ -31,6 +32,29 @@ func NewKnownAddress(addr *NetAddress, src *NetAddress) *KnownAddress { } } +func ReadKnownAddress(r io.Reader) *KnownAddress { + return &KnownAddress{ + Addr: ReadNetAddress(r), + Src: ReadNetAddress(r), + Attempts: ReadUInt32(r), + LastAttempt: ReadUInt64(r), + LastSuccess: ReadUInt64(r), + NewRefs: ReadUInt16(r), + OldBucket: ReadInt16(r), + } +} + +func (ka *KnownAddress) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteOnto(ka.Addr, w, n, err) + n, err = WriteOnto(ka.Src, w, n, err) + n, err = WriteOnto(ka.Attempts, w, n, err) + n, err = WriteOnto(ka.LastAttempt, w, n, err) + n, err = WriteOnto(ka.LastSuccess, w, n, err) + n, err = WriteOnto(ka.NewRefs, w, n, err) + n, err = WriteOnto(ka.OldBucket, w, n, err) + return +} + func (ka *KnownAddress) MarkAttempt(success bool) { now := UInt64(time.Now().Unix()) ka.LastAttempt = now