diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index aff1ab714..4a6eafee5 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -63,7 +63,7 @@ type PEXReactor struct { requestsSent *cmn.CMap // ID->struct{}: unanswered send requests lastReceivedRequests *cmn.CMap // ID->time.Time: last time peer requested from us - attemptsToDial sync.Map // dial address (string) -> number of attempts (int) to dial (for exponential backoff) + attemptsToDial sync.Map // address (string) -> {number of attempts (int), last time dialed (time.Time)} } // PEXReactorConfig holds reactor specific configuration data. @@ -76,6 +76,11 @@ type PEXReactorConfig struct { Seeds []string } +type _attemptsToDial struct { + number int + lastDialed time.Time +} + // NewPEXReactor creates new PEX reactor. func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor { r := &PEXReactor{ @@ -367,7 +372,12 @@ func (r *PEXReactor) ensurePeers() { } func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) { - attempts := r.AttemptsToDial(addr) + var attempts int + var lastDialed time.Time + if lAttempts, attempted := r.attemptsToDial.Load(addr.DialString()); attempted { + attempts = lAttempts.(_attemptsToDial).number + lastDialed = lAttempts.(_attemptsToDial).lastDialed + } if attempts > maxAttemptsToDial { r.Logger.Error("Reached max attempts to dial", "addr", addr, "attempts", attempts) @@ -376,11 +386,14 @@ func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) { } // exponential backoff if it's not our first attempt to dial given address - if attempts != 0 { + if attempts > 0 { jitterSeconds := time.Duration(rand.Float64() * float64(time.Second)) // 1s == (1e9 ns) backoffDuration := jitterSeconds + ((1 << uint(attempts)) * time.Second) - r.Logger.Debug("Sleeping before dialing", "addr", addr, "dur", backoffDuration) - time.Sleep(backoffDuration) + sinceLastDialed := time.Now().Sub(lastDialed) + if sinceLastDialed < backoffDuration { + r.Logger.Debug("Too early to dial", "addr", addr, "backoff_duration", backoffDuration, "last_dialed", lastDialed, "time_since", sinceLastDialed) + return + } } err := r.Switch.DialPeerWithAddress(addr, false) @@ -393,7 +406,7 @@ func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) { r.book.MarkAttempt(addr) } // record attempt - r.attemptsToDial.Store(addr.DialString(), attempts+1) + r.attemptsToDial.Store(addr.DialString(), _attemptsToDial{attempts + 1, time.Now()}) } else { // cleanup any history r.attemptsToDial.Delete(addr.DialString()) @@ -440,9 +453,9 @@ func (r *PEXReactor) dialSeeds() { // AttemptsToDial returns the number of attempts to dial specific address. It // returns 0 if never attempted or successfully connected. func (r *PEXReactor) AttemptsToDial(addr *p2p.NetAddress) int { - attempts, attempted := r.attemptsToDial.Load(addr.DialString()) + lAttempts, attempted := r.attemptsToDial.Load(addr.DialString()) if attempted { - return attempts.(int) + return lAttempts.(_attemptsToDial).number } else { return 0 } diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 41431d34c..f5d815037 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -282,10 +282,13 @@ func TestPEXReactorDialPeer(t *testing.T) { // 1st unsuccessful attempt pexR.dialPeer(addr) + assert.Equal(t, 1, pexR.AttemptsToDial(addr)) + // 2nd unsuccessful attempt pexR.dialPeer(addr) - assert.Equal(t, 2, pexR.AttemptsToDial(addr)) + // must be skipped because it is too early + assert.Equal(t, 1, pexR.AttemptsToDial(addr)) } type mockPeer struct {