From f965a4db15796dfca4e504d93eaa83760ced4af2 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 3 Apr 2019 11:22:52 +0200 Subject: [PATCH] p2p: seed mode refactoring (#3011) ListOfKnownAddresses is removed panic if addrbook size is less than zero CrawlPeers does not attempt to connect to existing or peers we're currently dialing various perf. fixes improved tests (though not complete) move IsDialingOrExistingAddress check into DialPeerWithAddress (Fixes #2716) * addrbook: preallocate memory when saving addrbook to file * addrbook: remove oldestFirst struct and check for ID * oldestFirst replaced with sort.Slice * ID is now mandatory, so no need to check * addrbook: remove ListOfKnownAddresses GetSelection is used instead in seed mode. * addrbook: panic if size is less than 0 * rewrite addrbook#saveToFile to not use a counter * test AttemptDisconnects func * move IsDialingOrExistingAddress check into DialPeerWithAddress * save and cleanup crawl peer data * get rid of DefaultSeedDisconnectWaitPeriod * make linter happy * fix TestPEXReactorSeedMode * fix comment * add a changelog entry * Apply suggestions from code review Co-Authored-By: melekes * rename ErrDialingOrExistingAddress to ErrCurrentlyDialingOrExistingAddress * lowercase errors * do not persist seed data pros: - no extra files - less IO cons: - if the node crashes, seed might crawl a peer too soon * fixes after Ethan's review * add a changelog entry * we should only consult Switch about peers checking addrbook size does not make sense since only PEX reactor uses it for dialing peers! https://github.com/tendermint/tendermint/pull/3011#discussion_r270948875 --- CHANGELOG_PENDING.md | 2 +- node/node.go | 6 ++ p2p/errors.go | 24 ++++-- p2p/pex/addrbook.go | 24 ++---- p2p/pex/file.go | 9 +- p2p/pex/known_address.go | 12 --- p2p/pex/pex_reactor.go | 163 +++++++++++++++++++----------------- p2p/pex/pex_reactor_test.go | 66 ++++++++------- p2p/switch.go | 30 ++++--- 9 files changed, 173 insertions(+), 163 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 045247937..21e52fe60 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -19,4 +19,4 @@ ### IMPROVEMENTS: ### BUG FIXES: - +- [p2p] \#2716 Check if we're already connected to peer right before dialing it (@melekes) diff --git a/node/node.go b/node/node.go index e91d36357..c0a4d736e 100644 --- a/node/node.go +++ b/node/node.go @@ -498,6 +498,12 @@ func NewNode(config *cfg.Config, &pex.PEXReactorConfig{ Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "), SeedMode: config.P2P.SeedMode, + // See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000 + // blocks assuming 10s blocks ~ 28 hours. + // TODO (melekes): make it dynamic based on the actual block latencies + // from the live network. + // https://github.com/tendermint/tendermint/issues/3523 + SeedDisconnectWaitPeriod: 28 * time.Hour, }) pexReactor.SetLogger(logger.With("module", "pex")) sw.AddReactor("PEX", pexReactor) diff --git a/p2p/errors.go b/p2p/errors.go index 706150945..3650a7a0a 100644 --- a/p2p/errors.go +++ b/p2p/errors.go @@ -103,7 +103,7 @@ type ErrSwitchDuplicatePeerID struct { } func (e ErrSwitchDuplicatePeerID) Error() string { - return fmt.Sprintf("Duplicate peer ID %v", e.ID) + return fmt.Sprintf("duplicate peer ID %v", e.ID) } // ErrSwitchDuplicatePeerIP to be raised whena a peer is connecting with a known @@ -113,7 +113,7 @@ type ErrSwitchDuplicatePeerIP struct { } func (e ErrSwitchDuplicatePeerIP) Error() string { - return fmt.Sprintf("Duplicate peer IP %v", e.IP.String()) + return fmt.Sprintf("duplicate peer IP %v", e.IP.String()) } // ErrSwitchConnectToSelf to be raised when trying to connect to itself. @@ -122,7 +122,7 @@ type ErrSwitchConnectToSelf struct { } func (e ErrSwitchConnectToSelf) Error() string { - return fmt.Sprintf("Connect to self: %v", e.Addr) + return fmt.Sprintf("connect to self: %v", e.Addr) } type ErrSwitchAuthenticationFailure struct { @@ -132,7 +132,7 @@ type ErrSwitchAuthenticationFailure struct { func (e ErrSwitchAuthenticationFailure) Error() string { return fmt.Sprintf( - "Failed to authenticate peer. Dialed %v, but got peer with ID %s", + "failed to authenticate peer. Dialed %v, but got peer with ID %s", e.Dialed, e.Got, ) @@ -152,7 +152,7 @@ type ErrNetAddressNoID struct { } func (e ErrNetAddressNoID) Error() string { - return fmt.Sprintf("Address (%s) does not contain ID", e.Addr) + return fmt.Sprintf("address (%s) does not contain ID", e.Addr) } type ErrNetAddressInvalid struct { @@ -161,7 +161,7 @@ type ErrNetAddressInvalid struct { } func (e ErrNetAddressInvalid) Error() string { - return fmt.Sprintf("Invalid address (%s): %v", e.Addr, e.Err) + return fmt.Sprintf("invalid address (%s): %v", e.Addr, e.Err) } type ErrNetAddressLookup struct { @@ -170,5 +170,15 @@ type ErrNetAddressLookup struct { } func (e ErrNetAddressLookup) Error() string { - return fmt.Sprintf("Error looking up host (%s): %v", e.Addr, e.Err) + return fmt.Sprintf("error looking up host (%s): %v", e.Addr, e.Err) +} + +// ErrCurrentlyDialingOrExistingAddress indicates that we're currently +// dialing this address or it belongs to an existing peer. +type ErrCurrentlyDialingOrExistingAddress struct { + Addr string +} + +func (e ErrCurrentlyDialingOrExistingAddress) Error() string { + return fmt.Sprintf("connection with %s has been established or dialed", e.Addr) } diff --git a/p2p/pex/addrbook.go b/p2p/pex/addrbook.go index 3cb91c380..6be03d75b 100644 --- a/p2p/pex/addrbook.go +++ b/p2p/pex/addrbook.go @@ -66,8 +66,7 @@ type AddrBook interface { // Send a selection of addresses with bias GetSelectionWithBias(biasTowardsNewAddrs int) []*p2p.NetAddress - // TODO: remove - ListOfKnownAddresses() []*knownAddress + Size() int // Persist to disk Save() @@ -254,7 +253,7 @@ func (a *addrBook) PickAddress(biasTowardsNewAddrs int) *p2p.NetAddress { bookSize := a.size() if bookSize <= 0 { if bookSize < 0 { - a.Logger.Error("Addrbook size less than 0", "nNew", a.nNew, "nOld", a.nOld) + panic(fmt.Sprintf("Addrbook size %d (new: %d + old: %d) is less than 0", a.nNew+a.nOld, a.nNew, a.nOld)) } return nil } @@ -339,7 +338,7 @@ func (a *addrBook) GetSelection() []*p2p.NetAddress { bookSize := a.size() if bookSize <= 0 { if bookSize < 0 { - a.Logger.Error("Addrbook size less than 0", "nNew", a.nNew, "nOld", a.nOld) + panic(fmt.Sprintf("Addrbook size %d (new: %d + old: %d) is less than 0", a.nNew+a.nOld, a.nNew, a.nOld)) } return nil } @@ -389,7 +388,7 @@ func (a *addrBook) GetSelectionWithBias(biasTowardsNewAddrs int) []*p2p.NetAddre bookSize := a.size() if bookSize <= 0 { if bookSize < 0 { - a.Logger.Error("Addrbook size less than 0", "nNew", a.nNew, "nOld", a.nOld) + panic(fmt.Sprintf("Addrbook size %d (new: %d + old: %d) is less than 0", a.nNew+a.nOld, a.nNew, a.nOld)) } return nil } @@ -414,18 +413,6 @@ func (a *addrBook) GetSelectionWithBias(biasTowardsNewAddrs int) []*p2p.NetAddre return selection } -// ListOfKnownAddresses returns the new and old addresses. -func (a *addrBook) ListOfKnownAddresses() []*knownAddress { - a.mtx.Lock() - defer a.mtx.Unlock() - - addrs := []*knownAddress{} - for _, addr := range a.addrLookup { - addrs = append(addrs, addr.copy()) - } - return addrs -} - //------------------------------------------------ // Size returns the number of addresses in the book. @@ -473,8 +460,7 @@ func (a *addrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAd case bucketTypeOld: return a.bucketsOld[bucketIdx] default: - cmn.PanicSanity("Should not happen") - return nil + panic("Invalid bucket type") } } diff --git a/p2p/pex/file.go b/p2p/pex/file.go index 33fec0336..d4a516850 100644 --- a/p2p/pex/file.go +++ b/p2p/pex/file.go @@ -16,16 +16,15 @@ type addrBookJSON struct { } func (a *addrBook) saveToFile(filePath string) { - a.Logger.Info("Saving AddrBook to file", "size", a.Size()) - a.mtx.Lock() defer a.mtx.Unlock() - // Compile Addrs - addrs := []*knownAddress{} + + a.Logger.Info("Saving AddrBook to file", "size", a.size()) + + addrs := make([]*knownAddress, 0, len(a.addrLookup)) for _, ka := range a.addrLookup { addrs = append(addrs, ka) } - aJSON := &addrBookJSON{ Key: a.key, Addrs: addrs, diff --git a/p2p/pex/known_address.go b/p2p/pex/known_address.go index 5673dec11..acde385bc 100644 --- a/p2p/pex/known_address.go +++ b/p2p/pex/known_address.go @@ -33,18 +33,6 @@ func (ka *knownAddress) ID() p2p.ID { return ka.Addr.ID } -func (ka *knownAddress) copy() *knownAddress { - return &knownAddress{ - Addr: ka.Addr, - Src: ka.Src, - Attempts: ka.Attempts, - LastAttempt: ka.LastAttempt, - LastSuccess: ka.LastSuccess, - BucketType: ka.BucketType, - Buckets: ka.Buckets, - } -} - func (ka *knownAddress) isOld() bool { return ka.BucketType == bucketTypeOld } diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 0ce116326..cf8cfe6f5 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -3,7 +3,6 @@ package pex import ( "fmt" "reflect" - "sort" "sync" "time" @@ -35,16 +34,11 @@ const ( // Seed/Crawler constants - // We want seeds to only advertise good peers. Therefore they should wait at - // least as long as we expect it to take for a peer to become good before - // disconnecting. - // see consensus/reactor.go: blocksToContributeToBecomeGoodPeer - // 10000 blocks assuming 1s blocks ~ 2.7 hours. - defaultSeedDisconnectWaitPeriod = 3 * time.Hour + // minTimeBetweenCrawls is a minimum time between attempts to crawl a peer. + minTimeBetweenCrawls = 2 * time.Minute - defaultCrawlPeerInterval = 2 * time.Minute // don't redial for this. TODO: back-off. what for? - - defaultCrawlPeersPeriod = 30 * time.Second // check some peers every this + // check some peers every this + crawlPeerPeriod = 30 * time.Second maxAttemptsToDial = 16 // ~ 35h in total (last attempt - 18h) @@ -77,6 +71,9 @@ type PEXReactor struct { seedAddrs []*p2p.NetAddress attemptsToDial sync.Map // address (string) -> {number of attempts (int), last time dialed (time.Time)} + + // seed/crawled mode fields + crawlPeerInfos map[p2p.ID]crawlPeerInfo } func (r *PEXReactor) minReceiveRequestInterval() time.Duration { @@ -90,6 +87,11 @@ type PEXReactorConfig struct { // Seed/Crawler mode SeedMode bool + // We want seeds to only advertise good peers. Therefore they should wait at + // least as long as we expect it to take for a peer to become good before + // disconnecting. + SeedDisconnectWaitPeriod time.Duration + // Seeds is a list of addresses reactor may use // if it can't connect to peers in the addrbook. Seeds []string @@ -108,6 +110,7 @@ func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor { ensurePeersPeriod: defaultEnsurePeersPeriod, requestsSent: cmn.NewCMap(), lastReceivedRequests: cmn.NewCMap(), + crawlPeerInfos: make(map[p2p.ID]crawlPeerInfo), } r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r) return r @@ -363,9 +366,9 @@ func (r *PEXReactor) ensurePeersRoutine() { ) // Randomize first round of communication to avoid thundering herd. - // If no potential peers are present directly start connecting so we guarantee - // swift setup with the help of configured seeds. - if r.hasPotentialPeers() { + // If no peers are present directly start connecting so we guarantee swift + // setup with the help of configured seeds. + if r.nodeHasSomePeersOrDialingAny() { time.Sleep(time.Duration(jitter)) } @@ -493,23 +496,26 @@ func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) { err := r.Switch.DialPeerWithAddress(addr, false) if err != nil { + if _, ok := err.(p2p.ErrCurrentlyDialingOrExistingAddress); ok { + return + } + r.Logger.Error("Dialing failed", "addr", addr, "err", err, "attempts", attempts) - // TODO: detect more "bad peer" scenarios + markAddrInBookBasedOnErr(addr, r.book, err) if _, ok := err.(p2p.ErrSwitchAuthenticationFailure); ok { - r.book.MarkBad(addr) r.attemptsToDial.Delete(addr.DialString()) } else { - r.book.MarkAttempt(addr) // FIXME: if the addr is going to be removed from the addrbook (hard to // tell at this point), we need to Delete it from attemptsToDial, not // record another attempt. // record attempt r.attemptsToDial.Store(addr.DialString(), _attemptsToDial{attempts + 1, time.Now()}) } - } else { - // cleanup any history - r.attemptsToDial.Delete(addr.DialString()) + return } + + // cleanup any history + r.attemptsToDial.Delete(addr.DialString()) } // checkSeeds checks that addresses are well formed. @@ -568,101 +574,92 @@ func (r *PEXReactor) AttemptsToDial(addr *p2p.NetAddress) int { // from peers, except other seed nodes. func (r *PEXReactor) crawlPeersRoutine() { // Do an initial crawl - r.crawlPeers() + r.crawlPeers(r.book.GetSelection()) // Fire periodically - ticker := time.NewTicker(defaultCrawlPeersPeriod) + ticker := time.NewTicker(crawlPeerPeriod) for { select { case <-ticker.C: r.attemptDisconnects() - r.crawlPeers() + r.crawlPeers(r.book.GetSelection()) + r.cleanupCrawlPeerInfos() case <-r.Quit(): return } } } -// hasPotentialPeers indicates if there is a potential peer to connect to, by -// consulting the Switch as well as the AddrBook. -func (r *PEXReactor) hasPotentialPeers() bool { +// nodeHasSomePeersOrDialingAny returns true if the node is connected to some +// peers or dialing them currently. +func (r *PEXReactor) nodeHasSomePeersOrDialingAny() bool { out, in, dial := r.Switch.NumPeers() - - return out+in+dial > 0 && len(r.book.ListOfKnownAddresses()) > 0 + return out+in+dial > 0 } -// crawlPeerInfo handles temporary data needed for the -// network crawling performed during seed/crawler mode. +// crawlPeerInfo handles temporary data needed for the network crawling +// performed during seed/crawler mode. type crawlPeerInfo struct { - // The listening address of a potential peer we learned about - Addr *p2p.NetAddress - - // The last time we attempt to reach this address - LastAttempt time.Time - - // The last time we successfully reached this address - LastSuccess time.Time + Addr *p2p.NetAddress `json:"addr"` + // The last time we crawled the peer or attempted to do so. + LastCrawled time.Time `json:"last_crawled"` } -// oldestFirst implements sort.Interface for []crawlPeerInfo -// based on the LastAttempt field. -type oldestFirst []crawlPeerInfo - -func (of oldestFirst) Len() int { return len(of) } -func (of oldestFirst) Swap(i, j int) { of[i], of[j] = of[j], of[i] } -func (of oldestFirst) Less(i, j int) bool { return of[i].LastAttempt.Before(of[j].LastAttempt) } +// crawlPeers will crawl the network looking for new peer addresses. +func (r *PEXReactor) crawlPeers(addrs []*p2p.NetAddress) { + now := time.Now() -// getPeersToCrawl returns addresses of potential peers that we wish to validate. -// NOTE: The status information is ordered as described above. -func (r *PEXReactor) getPeersToCrawl() []crawlPeerInfo { - // TODO: be more selective - addrs := r.book.ListOfKnownAddresses() - of := make(oldestFirst, 0, len(addrs)) for _, addr := range addrs { - if len(addr.ID()) == 0 { - continue // dont use peers without id - } - - of = append(of, crawlPeerInfo{ - Addr: addr.Addr, - LastAttempt: addr.LastAttempt, - LastSuccess: addr.LastSuccess, - }) - } - sort.Sort(of) - return of -} - -// crawlPeers will crawl the network looking for new peer addresses. (once) -func (r *PEXReactor) crawlPeers() { - peerInfos := r.getPeersToCrawl() + peerInfo, ok := r.crawlPeerInfos[addr.ID] - now := time.Now() - // Use addresses we know of to reach additional peers - for _, pi := range peerInfos { - // Do not attempt to connect with peers we recently dialed - if now.Sub(pi.LastAttempt) < defaultCrawlPeerInterval { + // Do not attempt to connect with peers we recently crawled. + if ok && now.Sub(peerInfo.LastCrawled) < minTimeBetweenCrawls { continue } - // Otherwise, attempt to connect with the known address - err := r.Switch.DialPeerWithAddress(pi.Addr, false) + + // Record crawling attempt. + r.crawlPeerInfos[addr.ID] = crawlPeerInfo{ + Addr: addr, + LastCrawled: now, + } + + err := r.Switch.DialPeerWithAddress(addr, false) if err != nil { - r.book.MarkAttempt(pi.Addr) + if _, ok := err.(p2p.ErrCurrentlyDialingOrExistingAddress); ok { + continue + } + + r.Logger.Error("Dialing failed", "addr", addr, "err", err) + markAddrInBookBasedOnErr(addr, r.book, err) continue } - // Ask for more addresses - peer := r.Switch.Peers().Get(pi.Addr.ID) + + peer := r.Switch.Peers().Get(addr.ID) if peer != nil { r.RequestAddrs(peer) } } } +func (r *PEXReactor) cleanupCrawlPeerInfos() { + for id, info := range r.crawlPeerInfos { + // If we did not crawl a peer for 24 hours, it means the peer was removed + // from the addrbook => remove + // + // 10000 addresses / maxGetSelection = 40 cycles to get all addresses in + // the ideal case, + // 40 * crawlPeerPeriod ~ 20 minutes + if time.Since(info.LastCrawled) > 24*time.Hour { + delete(r.crawlPeerInfos, id) + } + } +} + // attemptDisconnects checks if we've been with each peer long enough to disconnect func (r *PEXReactor) attemptDisconnects() { for _, peer := range r.Switch.Peers().List() { - if peer.Status().Duration < defaultSeedDisconnectWaitPeriod { + if peer.Status().Duration < r.config.SeedDisconnectWaitPeriod { continue } if peer.IsPersistent() { @@ -672,6 +669,16 @@ func (r *PEXReactor) attemptDisconnects() { } } +func markAddrInBookBasedOnErr(addr *p2p.NetAddress, book AddrBook, err error) { + // TODO: detect more "bad peer" scenarios + switch err.(type) { + case p2p.ErrSwitchAuthenticationFailure: + book.MarkBad(addr) + default: + book.MarkAttempt(addr) + } +} + //----------------------------------------------------------------------------- // Messages diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 4a6118c63..dda60daaa 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -204,26 +204,26 @@ func TestCheckSeeds(t *testing.T) { defer os.RemoveAll(dir) // nolint: errcheck // 1. test creating peer with no seeds works - peer := testCreateDefaultPeer(dir, 0) - require.Nil(t, peer.Start()) - peer.Stop() + peerSwitch := testCreateDefaultPeer(dir, 0) + require.Nil(t, peerSwitch.Start()) + peerSwitch.Stop() // 2. create seed seed := testCreateSeed(dir, 1, []*p2p.NetAddress{}, []*p2p.NetAddress{}) // 3. test create peer with online seed works - peer = testCreatePeerWithSeed(dir, 2, seed) - require.Nil(t, peer.Start()) - peer.Stop() + peerSwitch = testCreatePeerWithSeed(dir, 2, seed) + require.Nil(t, peerSwitch.Start()) + peerSwitch.Stop() // 4. test create peer with all seeds having unresolvable DNS fails badPeerConfig := &PEXReactorConfig{ Seeds: []string{"ed3dfd27bfc4af18f67a49862f04cc100696e84d@bad.network.addr:26657", "d824b13cb5d40fa1d8a614e089357c7eff31b670@anotherbad.network.addr:26657"}, } - peer = testCreatePeerWithConfig(dir, 2, badPeerConfig) - require.Error(t, peer.Start()) - peer.Stop() + peerSwitch = testCreatePeerWithConfig(dir, 2, badPeerConfig) + require.Error(t, peerSwitch.Start()) + peerSwitch.Stop() // 5. test create peer with one good seed address succeeds badPeerConfig = &PEXReactorConfig{ @@ -231,9 +231,9 @@ func TestCheckSeeds(t *testing.T) { "d824b13cb5d40fa1d8a614e089357c7eff31b670@anotherbad.network.addr:26657", seed.NetAddress().String()}, } - peer = testCreatePeerWithConfig(dir, 2, badPeerConfig) - require.Nil(t, peer.Start()) - peer.Stop() + peerSwitch = testCreatePeerWithConfig(dir, 2, badPeerConfig) + require.Nil(t, peerSwitch.Start()) + peerSwitch.Stop() } func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { @@ -285,31 +285,41 @@ func TestConnectionSpeedForPeerReceivedFromSeed(t *testing.T) { assertPeersWithTimeout(t, []*p2p.Switch{secondPeer}, 10*time.Millisecond, 1*time.Second, 2) } -func TestPEXReactorCrawlStatus(t *testing.T) { - pexR, book := createReactor(&PEXReactorConfig{SeedMode: true}) +func TestPEXReactorSeedMode(t *testing.T) { + // directory to store address books + dir, err := ioutil.TempDir("", "pex_reactor") + require.Nil(t, err) + defer os.RemoveAll(dir) // nolint: errcheck + + pexR, book := createReactor(&PEXReactorConfig{SeedMode: true, SeedDisconnectWaitPeriod: 10 * time.Millisecond}) defer teardownReactor(book) - // Seed/Crawler mode uses data from the Switch sw := createSwitchAndAddReactors(pexR) sw.SetAddrBook(book) + err = sw.Start() + require.NoError(t, err) + defer sw.Stop() - // Create a peer, add it to the peer set and the addrbook. - peer := p2p.CreateRandomPeer(false) - p2p.AddPeerToSwitch(pexR.Switch, peer) - addr1 := peer.SocketAddr() - pexR.book.AddAddress(addr1, addr1) + assert.Zero(t, sw.Peers().Size()) + + peerSwitch := testCreateDefaultPeer(dir, 1) + require.NoError(t, peerSwitch.Start()) + defer peerSwitch.Stop() - // Add a non-connected address to the book. - _, addr2 := p2p.CreateRoutableAddr() - pexR.book.AddAddress(addr2, addr1) + // 1. Test crawlPeers dials the peer + pexR.crawlPeers([]*p2p.NetAddress{peerSwitch.NetAddress()}) + assert.Equal(t, 1, sw.Peers().Size()) + assert.True(t, sw.Peers().Has(peerSwitch.NodeInfo().ID())) - // Get some peerInfos to crawl - peerInfos := pexR.getPeersToCrawl() + // 2. attemptDisconnects should not disconnect because of wait period + pexR.attemptDisconnects() + assert.Equal(t, 1, sw.Peers().Size()) - // Make sure it has the proper number of elements - assert.Equal(t, 2, len(peerInfos)) + time.Sleep(100 * time.Millisecond) - // TODO: test + // 3. attemptDisconnects should disconnect after wait period + pexR.attemptDisconnects() + assert.Equal(t, 0, sw.Peers().Size()) } // connect a peer to a seed, wait a bit, then stop it. diff --git a/p2p/switch.go b/p2p/switch.go index 76da9ad0c..5b1da1ead 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -339,14 +339,11 @@ func (sw *Switch) reconnectToPeer(addr *NetAddress) { return } - if sw.IsDialingOrExistingAddress(addr) { - sw.Logger.Debug("Peer connection has been established or dialed while we waiting next try", "addr", addr) - return - } - err := sw.DialPeerWithAddress(addr, true) if err == nil { return // success + } else if _, ok := err.(ErrCurrentlyDialingOrExistingAddress); ok { + return } sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr) @@ -365,9 +362,12 @@ func (sw *Switch) reconnectToPeer(addr *NetAddress) { // sleep an exponentially increasing amount sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i)) sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second) + err := sw.DialPeerWithAddress(addr, true) if err == nil { return // success + } else if _, ok := err.(ErrCurrentlyDialingOrExistingAddress); ok { + return } sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr) } @@ -435,15 +435,10 @@ func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent b sw.randomSleep(0) - if sw.IsDialingOrExistingAddress(addr) { - sw.Logger.Debug("Ignore attempt to connect to an existing peer", "addr", addr) - return - } - err := sw.DialPeerWithAddress(addr, persistent) if err != nil { switch err.(type) { - case ErrSwitchConnectToSelf, ErrSwitchDuplicatePeerID: + case ErrSwitchConnectToSelf, ErrSwitchDuplicatePeerID, ErrCurrentlyDialingOrExistingAddress: sw.Logger.Debug("Error dialing peer", "err", err) default: sw.Logger.Error("Error dialing peer", "err", err) @@ -454,11 +449,20 @@ func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent b return nil } -// DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects and authenticates successfully. -// If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails. +// DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects +// and authenticates successfully. +// If `persistent == true`, the switch will always try to reconnect to this +// peer if the connection ever fails. +// If we're currently dialing this address or it belongs to an existing peer, +// ErrCurrentlyDialingOrExistingAddress is returned. func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) error { + if sw.IsDialingOrExistingAddress(addr) { + return ErrCurrentlyDialingOrExistingAddress{addr.String()} + } + sw.dialing.Set(string(addr.ID), addr) defer sw.dialing.Delete(string(addr.ID)) + return sw.addOutboundPeerWithConfig(addr, sw.config, persistent) }