From 8f2703e8b22682bab69e567ed59c087cc487e0b5 Mon Sep 17 00:00:00 2001 From: Alexander Simmerl Date: Thu, 1 Mar 2018 18:41:30 +0100 Subject: [PATCH] Dial seeds directly without potential peers In order to improve the operator experience we want the node to dial seeds immediately if there are no peers to connect to. Until now the routine responsible for ensuring peers are connected to would wait a random amount of time up to 30s (if not configured otherwise). --- p2p/pex/pex_reactor.go | 42 ++++++++-- p2p/pex/pex_reactor_test.go | 150 ++++++++++++++++++++++-------------- 2 files changed, 125 insertions(+), 67 deletions(-) diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 2cee9e221..746fb3b39 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -260,9 +260,17 @@ func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) { // Ensures that sufficient peers are connected. (continuous) func (r *PEXReactor) ensurePeersRoutine() { - // Randomize when routine starts - ensurePeersPeriodMs := r.ensurePeersPeriod.Nanoseconds() / 1e6 - time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond) + var ( + seed = rand.New(rand.NewSource(time.Now().UnixNano())) + jitter = seed.Int63n(r.ensurePeersPeriod.Nanoseconds()) + ) + + // Randomize first round of communication to avoid thundering herd. + // If no potential peers are present directly start connecting so we guarantee + // swift setup with the help of configured seeds. + if r.hasPotentialPeers() { + time.Sleep(time.Duration(jitter)) + } // fire once immediately. // ensures we dial the seeds right away if the book is empty @@ -287,9 +295,18 @@ func (r *PEXReactor) ensurePeersRoutine() { // the node operator. It should not be used to compute what addresses are // already connected or not. func (r *PEXReactor) ensurePeers() { - numOutPeers, numInPeers, numDialing := r.Switch.NumPeers() - numToDial := defaultMinNumOutboundPeers - (numOutPeers + numDialing) - r.Logger.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial) + var ( + out, in, dial = r.Switch.NumPeers() + numToDial = defaultMinNumOutboundPeers - (out + dial) + ) + r.Logger.Info( + "Ensure peers", + "numOutPeers", out, + "numInPeers", in, + "numDialing", dial, + "numToDial", numToDial, + ) + if numToDial <= 0 { return } @@ -297,11 +314,12 @@ func (r *PEXReactor) ensurePeers() { // bias to prefer more vetted peers when we have fewer connections. // not perfect, but somewhate ensures that we prioritize connecting to more-vetted // NOTE: range here is [10, 90]. Too high ? - newBias := cmn.MinInt(numOutPeers, 8)*10 + 10 + newBias := cmn.MinInt(out, 8)*10 + 10 toDial := make(map[p2p.ID]*p2p.NetAddress) // Try maxAttempts times to pick numToDial addresses to dial maxAttempts := numToDial * 3 + for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ { try := r.book.PickAddress(newBias) if try == nil { @@ -348,7 +366,7 @@ func (r *PEXReactor) ensurePeers() { } // If we are not connected to nor dialing anybody, fallback to dialing a seed. - if numOutPeers+numInPeers+numDialing+len(toDial) == 0 { + if out+in+dial+len(toDial) == 0 { r.Logger.Info("No addresses to dial nor connected peers. Falling back to seeds") r.dialSeeds() } @@ -414,6 +432,14 @@ func (r *PEXReactor) crawlPeersRoutine() { } } +// hasPotentialPeers indicates if there is a potential peer to connect to, by +// consulting the Switch as well as the AddrBook. +func (r *PEXReactor) hasPotentialPeers() bool { + out, in, dial := r.Switch.NumPeers() + + return out+in+dial > 0 && len(r.book.ListOfKnownAddresses()) > 0 +} + // crawlPeerInfo handles temporary data needed for the // network crawling performed during seed/crawler mode. type crawlPeerInfo struct { diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 57f02423f..41da867ad 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -119,39 +119,6 @@ func TestPEXReactorRunning(t *testing.T) { } } -func assertPeersWithTimeout(t *testing.T, switches []*p2p.Switch, checkPeriod, timeout time.Duration, nPeers int) { - ticker := time.NewTicker(checkPeriod) - remaining := timeout - for { - select { - case <-ticker.C: - // check peers are connected - allGood := true - for _, s := range switches { - outbound, inbound, _ := s.NumPeers() - if outbound+inbound < nPeers { - allGood = false - } - } - remaining -= checkPeriod - if remaining < 0 { - remaining = 0 - } - if allGood { - return - } - case <-time.After(remaining): - numPeersStr := "" - for i, s := range switches { - outbound, inbound, _ := s.NumPeers() - numPeersStr += fmt.Sprintf("%d => {outbound: %d, inbound: %d}, ", i, outbound, inbound) - } - t.Errorf("expected all switches to be connected to at least one peer (switches: %s)", numPeersStr) - return - } - } -} - func TestPEXReactorReceive(t *testing.T) { assert, require := assert.New(t), require.New(t) @@ -259,6 +226,7 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) { } func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { + dir, err := ioutil.TempDir("", "pex_reactor") require.Nil(t, err) defer os.RemoveAll(dir) // nolint: errcheck @@ -267,36 +235,56 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { book.SetLogger(log.TestingLogger()) // 1. create seed - seed := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { - sw.SetLogger(log.TestingLogger()) + seed := p2p.MakeSwitch( + config, + 0, + "127.0.0.1", + "123.123.123", + func(i int, sw *p2p.Switch) *p2p.Switch { + sw.SetLogger(log.TestingLogger()) - r := NewPEXReactor(book, &PEXReactorConfig{}) - r.SetLogger(log.TestingLogger()) - r.SetEnsurePeersPeriod(250 * time.Millisecond) - sw.AddReactor("pex", r) - return sw - }) - seed.AddListener(p2p.NewDefaultListener("tcp", seed.NodeInfo().ListenAddr, true, log.TestingLogger())) - err = seed.Start() - require.Nil(t, err) + r := NewPEXReactor(book, &PEXReactorConfig{}) + r.SetLogger(log.TestingLogger()) + sw.AddReactor("pex", r) + return sw + }, + ) + seed.AddListener( + p2p.NewDefaultListener( + "tcp", + seed.NodeInfo().ListenAddr, + true, + log.TestingLogger(), + ), + ) + require.Nil(t, seed.Start()) defer seed.Stop() - // 2. create usual peer - sw := p2p.MakeSwitch(config, 1, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { - sw.SetLogger(log.TestingLogger()) - - r := NewPEXReactor(book, &PEXReactorConfig{Seeds: []string{seed.NodeInfo().NetAddress().String()}}) - r.SetLogger(log.TestingLogger()) - r.SetEnsurePeersPeriod(250 * time.Millisecond) - sw.AddReactor("pex", r) - return sw - }) - err = sw.Start() - require.Nil(t, err) - defer sw.Stop() + // 2. create usual peer with only seed configured. + peer := p2p.MakeSwitch( + config, + 1, + "127.0.0.1", + "123.123.123", + func(i int, sw *p2p.Switch) *p2p.Switch { + sw.SetLogger(log.TestingLogger()) + + r := NewPEXReactor( + book, + &PEXReactorConfig{ + Seeds: []string{seed.NodeInfo().NetAddress().String()}, + }, + ) + r.SetLogger(log.TestingLogger()) + sw.AddReactor("pex", r) + return sw + }, + ) + require.Nil(t, peer.Start()) + defer peer.Stop() - // 3. check that peer at least connects to seed - assertPeersWithTimeout(t, []*p2p.Switch{sw}, 10*time.Millisecond, 10*time.Second, 1) + // 3. check that the peer connects to seed immediately + assertPeersWithTimeout(t, []*p2p.Switch{peer}, 10*time.Millisecond, 1*time.Second, 1) } func TestPEXReactorCrawlStatus(t *testing.T) { @@ -368,3 +356,47 @@ func (mp mockPeer) Send(byte, interface{}) bool { return false } func (mp mockPeer) TrySend(byte, interface{}) bool { return false } func (mp mockPeer) Set(string, interface{}) {} func (mp mockPeer) Get(string) interface{} { return nil } + +func assertPeersWithTimeout( + t *testing.T, + switches []*p2p.Switch, + checkPeriod, timeout time.Duration, + nPeers int, +) { + var ( + ticker = time.NewTicker(checkPeriod) + remaining = timeout + ) + + for { + select { + case <-ticker.C: + // check peers are connected + allGood := true + for _, s := range switches { + outbound, inbound, _ := s.NumPeers() + if outbound+inbound < nPeers { + allGood = false + } + } + remaining -= checkPeriod + if remaining < 0 { + remaining = 0 + } + if allGood { + return + } + case <-time.After(remaining): + numPeersStr := "" + for i, s := range switches { + outbound, inbound, _ := s.NumPeers() + numPeersStr += fmt.Sprintf("%d => {outbound: %d, inbound: %d}, ", i, outbound, inbound) + } + t.Errorf( + "expected all switches to be connected to at least one peer (switches: %s)", + numPeersStr, + ) + return + } + } +}