Browse Source

skip dialing based on last time dialed

pull/1292/head
Anton Kaliaev 7 years ago
parent
commit
264bce4ddd
No known key found for this signature in database GPG Key ID: 7B6881D965918214
2 changed files with 25 additions and 9 deletions
  1. +21
    -8
      p2p/pex/pex_reactor.go
  2. +4
    -1
      p2p/pex/pex_reactor_test.go

+ 21
- 8
p2p/pex/pex_reactor.go View File

@ -63,7 +63,7 @@ type PEXReactor struct {
requestsSent *cmn.CMap // ID->struct{}: unanswered send requests requestsSent *cmn.CMap // ID->struct{}: unanswered send requests
lastReceivedRequests *cmn.CMap // ID->time.Time: last time peer requested from us lastReceivedRequests *cmn.CMap // ID->time.Time: last time peer requested from us
attemptsToDial sync.Map // dial address (string) -> number of attempts (int) to dial (for exponential backoff)
attemptsToDial sync.Map // address (string) -> {number of attempts (int), last time dialed (time.Time)}
} }
// PEXReactorConfig holds reactor specific configuration data. // PEXReactorConfig holds reactor specific configuration data.
@ -76,6 +76,11 @@ type PEXReactorConfig struct {
Seeds []string Seeds []string
} }
type _attemptsToDial struct {
number int
lastDialed time.Time
}
// NewPEXReactor creates new PEX reactor. // NewPEXReactor creates new PEX reactor.
func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor { func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor {
r := &PEXReactor{ r := &PEXReactor{
@ -367,7 +372,12 @@ func (r *PEXReactor) ensurePeers() {
} }
func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) { func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) {
attempts := r.AttemptsToDial(addr)
var attempts int
var lastDialed time.Time
if lAttempts, attempted := r.attemptsToDial.Load(addr.DialString()); attempted {
attempts = lAttempts.(_attemptsToDial).number
lastDialed = lAttempts.(_attemptsToDial).lastDialed
}
if attempts > maxAttemptsToDial { if attempts > maxAttemptsToDial {
r.Logger.Error("Reached max attempts to dial", "addr", addr, "attempts", attempts) r.Logger.Error("Reached max attempts to dial", "addr", addr, "attempts", attempts)
@ -376,11 +386,14 @@ func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) {
} }
// exponential backoff if it's not our first attempt to dial given address // exponential backoff if it's not our first attempt to dial given address
if attempts != 0 {
if attempts > 0 {
jitterSeconds := time.Duration(rand.Float64() * float64(time.Second)) // 1s == (1e9 ns) jitterSeconds := time.Duration(rand.Float64() * float64(time.Second)) // 1s == (1e9 ns)
backoffDuration := jitterSeconds + ((1 << uint(attempts)) * time.Second) backoffDuration := jitterSeconds + ((1 << uint(attempts)) * time.Second)
r.Logger.Debug("Sleeping before dialing", "addr", addr, "dur", backoffDuration)
time.Sleep(backoffDuration)
sinceLastDialed := time.Now().Sub(lastDialed)
if sinceLastDialed < backoffDuration {
r.Logger.Debug("Too early to dial", "addr", addr, "backoff_duration", backoffDuration, "last_dialed", lastDialed, "time_since", sinceLastDialed)
return
}
} }
err := r.Switch.DialPeerWithAddress(addr, false) err := r.Switch.DialPeerWithAddress(addr, false)
@ -393,7 +406,7 @@ func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) {
r.book.MarkAttempt(addr) r.book.MarkAttempt(addr)
} }
// record attempt // record attempt
r.attemptsToDial.Store(addr.DialString(), attempts+1)
r.attemptsToDial.Store(addr.DialString(), _attemptsToDial{attempts + 1, time.Now()})
} else { } else {
// cleanup any history // cleanup any history
r.attemptsToDial.Delete(addr.DialString()) r.attemptsToDial.Delete(addr.DialString())
@ -440,9 +453,9 @@ func (r *PEXReactor) dialSeeds() {
// AttemptsToDial returns the number of attempts to dial specific address. It // AttemptsToDial returns the number of attempts to dial specific address. It
// returns 0 if never attempted or successfully connected. // returns 0 if never attempted or successfully connected.
func (r *PEXReactor) AttemptsToDial(addr *p2p.NetAddress) int { func (r *PEXReactor) AttemptsToDial(addr *p2p.NetAddress) int {
attempts, attempted := r.attemptsToDial.Load(addr.DialString())
lAttempts, attempted := r.attemptsToDial.Load(addr.DialString())
if attempted { if attempted {
return attempts.(int)
return lAttempts.(_attemptsToDial).number
} else { } else {
return 0 return 0
} }


+ 4
- 1
p2p/pex/pex_reactor_test.go View File

@ -282,10 +282,13 @@ func TestPEXReactorDialPeer(t *testing.T) {
// 1st unsuccessful attempt // 1st unsuccessful attempt
pexR.dialPeer(addr) pexR.dialPeer(addr)
assert.Equal(t, 1, pexR.AttemptsToDial(addr))
// 2nd unsuccessful attempt // 2nd unsuccessful attempt
pexR.dialPeer(addr) pexR.dialPeer(addr)
assert.Equal(t, 2, pexR.AttemptsToDial(addr))
// must be skipped because it is too early
assert.Equal(t, 1, pexR.AttemptsToDial(addr))
} }
type mockPeer struct { type mockPeer struct {


Loading…
Cancel
Save