diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 2cee9e221..746fb3b39 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -260,9 +260,17 @@ func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) { // Ensures that sufficient peers are connected. (continuous) func (r *PEXReactor) ensurePeersRoutine() { - // Randomize when routine starts - ensurePeersPeriodMs := r.ensurePeersPeriod.Nanoseconds() / 1e6 - time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond) + var ( + seed = rand.New(rand.NewSource(time.Now().UnixNano())) + jitter = seed.Int63n(r.ensurePeersPeriod.Nanoseconds()) + ) + + // Randomize first round of communication to avoid thundering herd. + // If no potential peers are present directly start connecting so we guarantee + // swift setup with the help of configured seeds. + if r.hasPotentialPeers() { + time.Sleep(time.Duration(jitter)) + } // fire once immediately. // ensures we dial the seeds right away if the book is empty @@ -287,9 +295,18 @@ func (r *PEXReactor) ensurePeersRoutine() { // the node operator. It should not be used to compute what addresses are // already connected or not. func (r *PEXReactor) ensurePeers() { - numOutPeers, numInPeers, numDialing := r.Switch.NumPeers() - numToDial := defaultMinNumOutboundPeers - (numOutPeers + numDialing) - r.Logger.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial) + var ( + out, in, dial = r.Switch.NumPeers() + numToDial = defaultMinNumOutboundPeers - (out + dial) + ) + r.Logger.Info( + "Ensure peers", + "numOutPeers", out, + "numInPeers", in, + "numDialing", dial, + "numToDial", numToDial, + ) + if numToDial <= 0 { return } @@ -297,11 +314,12 @@ func (r *PEXReactor) ensurePeers() { // bias to prefer more vetted peers when we have fewer connections. // not perfect, but somewhate ensures that we prioritize connecting to more-vetted // NOTE: range here is [10, 90]. Too high ? - newBias := cmn.MinInt(numOutPeers, 8)*10 + 10 + newBias := cmn.MinInt(out, 8)*10 + 10 toDial := make(map[p2p.ID]*p2p.NetAddress) // Try maxAttempts times to pick numToDial addresses to dial maxAttempts := numToDial * 3 + for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ { try := r.book.PickAddress(newBias) if try == nil { @@ -348,7 +366,7 @@ func (r *PEXReactor) ensurePeers() { } // If we are not connected to nor dialing anybody, fallback to dialing a seed. - if numOutPeers+numInPeers+numDialing+len(toDial) == 0 { + if out+in+dial+len(toDial) == 0 { r.Logger.Info("No addresses to dial nor connected peers. Falling back to seeds") r.dialSeeds() } @@ -414,6 +432,14 @@ func (r *PEXReactor) crawlPeersRoutine() { } } +// hasPotentialPeers indicates if there is a potential peer to connect to, by +// consulting the Switch as well as the AddrBook. +func (r *PEXReactor) hasPotentialPeers() bool { + out, in, dial := r.Switch.NumPeers() + + return out+in+dial > 0 && len(r.book.ListOfKnownAddresses()) > 0 +} + // crawlPeerInfo handles temporary data needed for the // network crawling performed during seed/crawler mode. type crawlPeerInfo struct { diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 57f02423f..41da867ad 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -119,39 +119,6 @@ func TestPEXReactorRunning(t *testing.T) { } } -func assertPeersWithTimeout(t *testing.T, switches []*p2p.Switch, checkPeriod, timeout time.Duration, nPeers int) { - ticker := time.NewTicker(checkPeriod) - remaining := timeout - for { - select { - case <-ticker.C: - // check peers are connected - allGood := true - for _, s := range switches { - outbound, inbound, _ := s.NumPeers() - if outbound+inbound < nPeers { - allGood = false - } - } - remaining -= checkPeriod - if remaining < 0 { - remaining = 0 - } - if allGood { - return - } - case <-time.After(remaining): - numPeersStr := "" - for i, s := range switches { - outbound, inbound, _ := s.NumPeers() - numPeersStr += fmt.Sprintf("%d => {outbound: %d, inbound: %d}, ", i, outbound, inbound) - } - t.Errorf("expected all switches to be connected to at least one peer (switches: %s)", numPeersStr) - return - } - } -} - func TestPEXReactorReceive(t *testing.T) { assert, require := assert.New(t), require.New(t) @@ -259,6 +226,7 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) { } func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { + dir, err := ioutil.TempDir("", "pex_reactor") require.Nil(t, err) defer os.RemoveAll(dir) // nolint: errcheck @@ -267,36 +235,56 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { book.SetLogger(log.TestingLogger()) // 1. create seed - seed := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { - sw.SetLogger(log.TestingLogger()) + seed := p2p.MakeSwitch( + config, + 0, + "127.0.0.1", + "123.123.123", + func(i int, sw *p2p.Switch) *p2p.Switch { + sw.SetLogger(log.TestingLogger()) - r := NewPEXReactor(book, &PEXReactorConfig{}) - r.SetLogger(log.TestingLogger()) - r.SetEnsurePeersPeriod(250 * time.Millisecond) - sw.AddReactor("pex", r) - return sw - }) - seed.AddListener(p2p.NewDefaultListener("tcp", seed.NodeInfo().ListenAddr, true, log.TestingLogger())) - err = seed.Start() - require.Nil(t, err) + r := NewPEXReactor(book, &PEXReactorConfig{}) + r.SetLogger(log.TestingLogger()) + sw.AddReactor("pex", r) + return sw + }, + ) + seed.AddListener( + p2p.NewDefaultListener( + "tcp", + seed.NodeInfo().ListenAddr, + true, + log.TestingLogger(), + ), + ) + require.Nil(t, seed.Start()) defer seed.Stop() - // 2. create usual peer - sw := p2p.MakeSwitch(config, 1, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { - sw.SetLogger(log.TestingLogger()) - - r := NewPEXReactor(book, &PEXReactorConfig{Seeds: []string{seed.NodeInfo().NetAddress().String()}}) - r.SetLogger(log.TestingLogger()) - r.SetEnsurePeersPeriod(250 * time.Millisecond) - sw.AddReactor("pex", r) - return sw - }) - err = sw.Start() - require.Nil(t, err) - defer sw.Stop() + // 2. create usual peer with only seed configured. + peer := p2p.MakeSwitch( + config, + 1, + "127.0.0.1", + "123.123.123", + func(i int, sw *p2p.Switch) *p2p.Switch { + sw.SetLogger(log.TestingLogger()) + + r := NewPEXReactor( + book, + &PEXReactorConfig{ + Seeds: []string{seed.NodeInfo().NetAddress().String()}, + }, + ) + r.SetLogger(log.TestingLogger()) + sw.AddReactor("pex", r) + return sw + }, + ) + require.Nil(t, peer.Start()) + defer peer.Stop() - // 3. check that peer at least connects to seed - assertPeersWithTimeout(t, []*p2p.Switch{sw}, 10*time.Millisecond, 10*time.Second, 1) + // 3. check that the peer connects to seed immediately + assertPeersWithTimeout(t, []*p2p.Switch{peer}, 10*time.Millisecond, 1*time.Second, 1) } func TestPEXReactorCrawlStatus(t *testing.T) { @@ -368,3 +356,47 @@ func (mp mockPeer) Send(byte, interface{}) bool { return false } func (mp mockPeer) TrySend(byte, interface{}) bool { return false } func (mp mockPeer) Set(string, interface{}) {} func (mp mockPeer) Get(string) interface{} { return nil } + +func assertPeersWithTimeout( + t *testing.T, + switches []*p2p.Switch, + checkPeriod, timeout time.Duration, + nPeers int, +) { + var ( + ticker = time.NewTicker(checkPeriod) + remaining = timeout + ) + + for { + select { + case <-ticker.C: + // check peers are connected + allGood := true + for _, s := range switches { + outbound, inbound, _ := s.NumPeers() + if outbound+inbound < nPeers { + allGood = false + } + } + remaining -= checkPeriod + if remaining < 0 { + remaining = 0 + } + if allGood { + return + } + case <-time.After(remaining): + numPeersStr := "" + for i, s := range switches { + outbound, inbound, _ := s.NumPeers() + numPeersStr += fmt.Sprintf("%d => {outbound: %d, inbound: %d}, ", i, outbound, inbound) + } + t.Errorf( + "expected all switches to be connected to at least one peer (switches: %s)", + numPeersStr, + ) + return + } + } +}