Browse Source

Dial seeds directly without potential peers

In order to improve the operator experience we want the node to dial
seeds immediately if there are no peers to connect to. Until now the
routine responsible for ensuring peers are connected to would wait
a random amount of time up to 30s (if not configured otherwise).
pull/1233/head
Alexander Simmerl 7 years ago
parent
commit
8f2703e8b2
No known key found for this signature in database GPG Key ID: 4694E95C9CC61BDA
2 changed files with 125 additions and 67 deletions
  1. +34
    -8
      p2p/pex/pex_reactor.go
  2. +91
    -59
      p2p/pex/pex_reactor_test.go

+ 34
- 8
p2p/pex/pex_reactor.go View File

@ -260,9 +260,17 @@ func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
// Ensures that sufficient peers are connected. (continuous)
func (r *PEXReactor) ensurePeersRoutine() {
// Randomize when routine starts
ensurePeersPeriodMs := r.ensurePeersPeriod.Nanoseconds() / 1e6
time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
var (
seed = rand.New(rand.NewSource(time.Now().UnixNano()))
jitter = seed.Int63n(r.ensurePeersPeriod.Nanoseconds())
)
// Randomize first round of communication to avoid thundering herd.
// If no potential peers are present directly start connecting so we guarantee
// swift setup with the help of configured seeds.
if r.hasPotentialPeers() {
time.Sleep(time.Duration(jitter))
}
// fire once immediately.
// ensures we dial the seeds right away if the book is empty
@ -287,9 +295,18 @@ func (r *PEXReactor) ensurePeersRoutine() {
// the node operator. It should not be used to compute what addresses are
// already connected or not.
func (r *PEXReactor) ensurePeers() {
numOutPeers, numInPeers, numDialing := r.Switch.NumPeers()
numToDial := defaultMinNumOutboundPeers - (numOutPeers + numDialing)
r.Logger.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial)
var (
out, in, dial = r.Switch.NumPeers()
numToDial = defaultMinNumOutboundPeers - (out + dial)
)
r.Logger.Info(
"Ensure peers",
"numOutPeers", out,
"numInPeers", in,
"numDialing", dial,
"numToDial", numToDial,
)
if numToDial <= 0 {
return
}
@ -297,11 +314,12 @@ func (r *PEXReactor) ensurePeers() {
// bias to prefer more vetted peers when we have fewer connections.
// not perfect, but somewhate ensures that we prioritize connecting to more-vetted
// NOTE: range here is [10, 90]. Too high ?
newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
newBias := cmn.MinInt(out, 8)*10 + 10
toDial := make(map[p2p.ID]*p2p.NetAddress)
// Try maxAttempts times to pick numToDial addresses to dial
maxAttempts := numToDial * 3
for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
try := r.book.PickAddress(newBias)
if try == nil {
@ -348,7 +366,7 @@ func (r *PEXReactor) ensurePeers() {
}
// If we are not connected to nor dialing anybody, fallback to dialing a seed.
if numOutPeers+numInPeers+numDialing+len(toDial) == 0 {
if out+in+dial+len(toDial) == 0 {
r.Logger.Info("No addresses to dial nor connected peers. Falling back to seeds")
r.dialSeeds()
}
@ -414,6 +432,14 @@ func (r *PEXReactor) crawlPeersRoutine() {
}
}
// hasPotentialPeers indicates if there is a potential peer to connect to, by
// consulting the Switch as well as the AddrBook.
func (r *PEXReactor) hasPotentialPeers() bool {
out, in, dial := r.Switch.NumPeers()
return out+in+dial > 0 && len(r.book.ListOfKnownAddresses()) > 0
}
// crawlPeerInfo handles temporary data needed for the
// network crawling performed during seed/crawler mode.
type crawlPeerInfo struct {


+ 91
- 59
p2p/pex/pex_reactor_test.go View File

@ -119,39 +119,6 @@ func TestPEXReactorRunning(t *testing.T) {
}
}
func assertPeersWithTimeout(t *testing.T, switches []*p2p.Switch, checkPeriod, timeout time.Duration, nPeers int) {
ticker := time.NewTicker(checkPeriod)
remaining := timeout
for {
select {
case <-ticker.C:
// check peers are connected
allGood := true
for _, s := range switches {
outbound, inbound, _ := s.NumPeers()
if outbound+inbound < nPeers {
allGood = false
}
}
remaining -= checkPeriod
if remaining < 0 {
remaining = 0
}
if allGood {
return
}
case <-time.After(remaining):
numPeersStr := ""
for i, s := range switches {
outbound, inbound, _ := s.NumPeers()
numPeersStr += fmt.Sprintf("%d => {outbound: %d, inbound: %d}, ", i, outbound, inbound)
}
t.Errorf("expected all switches to be connected to at least one peer (switches: %s)", numPeersStr)
return
}
}
}
func TestPEXReactorReceive(t *testing.T) {
assert, require := assert.New(t), require.New(t)
@ -259,6 +226,7 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
}
func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
dir, err := ioutil.TempDir("", "pex_reactor")
require.Nil(t, err)
defer os.RemoveAll(dir) // nolint: errcheck
@ -267,36 +235,56 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
book.SetLogger(log.TestingLogger())
// 1. create seed
seed := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
sw.SetLogger(log.TestingLogger())
seed := p2p.MakeSwitch(
config,
0,
"127.0.0.1",
"123.123.123",
func(i int, sw *p2p.Switch) *p2p.Switch {
sw.SetLogger(log.TestingLogger())
r := NewPEXReactor(book, &PEXReactorConfig{})
r.SetLogger(log.TestingLogger())
r.SetEnsurePeersPeriod(250 * time.Millisecond)
sw.AddReactor("pex", r)
return sw
})
seed.AddListener(p2p.NewDefaultListener("tcp", seed.NodeInfo().ListenAddr, true, log.TestingLogger()))
err = seed.Start()
require.Nil(t, err)
r := NewPEXReactor(book, &PEXReactorConfig{})
r.SetLogger(log.TestingLogger())
sw.AddReactor("pex", r)
return sw
},
)
seed.AddListener(
p2p.NewDefaultListener(
"tcp",
seed.NodeInfo().ListenAddr,
true,
log.TestingLogger(),
),
)
require.Nil(t, seed.Start())
defer seed.Stop()
// 2. create usual peer
sw := p2p.MakeSwitch(config, 1, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
sw.SetLogger(log.TestingLogger())
r := NewPEXReactor(book, &PEXReactorConfig{Seeds: []string{seed.NodeInfo().NetAddress().String()}})
r.SetLogger(log.TestingLogger())
r.SetEnsurePeersPeriod(250 * time.Millisecond)
sw.AddReactor("pex", r)
return sw
})
err = sw.Start()
require.Nil(t, err)
defer sw.Stop()
// 2. create usual peer with only seed configured.
peer := p2p.MakeSwitch(
config,
1,
"127.0.0.1",
"123.123.123",
func(i int, sw *p2p.Switch) *p2p.Switch {
sw.SetLogger(log.TestingLogger())
r := NewPEXReactor(
book,
&PEXReactorConfig{
Seeds: []string{seed.NodeInfo().NetAddress().String()},
},
)
r.SetLogger(log.TestingLogger())
sw.AddReactor("pex", r)
return sw
},
)
require.Nil(t, peer.Start())
defer peer.Stop()
// 3. check that peer at least connects to seed
assertPeersWithTimeout(t, []*p2p.Switch{sw}, 10*time.Millisecond, 10*time.Second, 1)
// 3. check that the peer connects to seed immediately
assertPeersWithTimeout(t, []*p2p.Switch{peer}, 10*time.Millisecond, 1*time.Second, 1)
}
func TestPEXReactorCrawlStatus(t *testing.T) {
@ -368,3 +356,47 @@ func (mp mockPeer) Send(byte, interface{}) bool { return false }
func (mp mockPeer) TrySend(byte, interface{}) bool { return false }
func (mp mockPeer) Set(string, interface{}) {}
func (mp mockPeer) Get(string) interface{} { return nil }
func assertPeersWithTimeout(
t *testing.T,
switches []*p2p.Switch,
checkPeriod, timeout time.Duration,
nPeers int,
) {
var (
ticker = time.NewTicker(checkPeriod)
remaining = timeout
)
for {
select {
case <-ticker.C:
// check peers are connected
allGood := true
for _, s := range switches {
outbound, inbound, _ := s.NumPeers()
if outbound+inbound < nPeers {
allGood = false
}
}
remaining -= checkPeriod
if remaining < 0 {
remaining = 0
}
if allGood {
return
}
case <-time.After(remaining):
numPeersStr := ""
for i, s := range switches {
outbound, inbound, _ := s.NumPeers()
numPeersStr += fmt.Sprintf("%d => {outbound: %d, inbound: %d}, ", i, outbound, inbound)
}
t.Errorf(
"expected all switches to be connected to at least one peer (switches: %s)",
numPeersStr,
)
return
}
}
}

Loading…
Cancel
Save