From 21e2c41c6b728efe715179d3c4bdd8c4fc25dde0 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 8 Mar 2018 13:59:30 +0400 Subject: [PATCH 1/7] exponential backoff for addrs in the address book Refs #1125 --- p2p/pex/pex_reactor.go | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 746fb3b39..b358bd486 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -6,6 +6,7 @@ import ( "math/rand" "reflect" "sort" + "sync" "time" "github.com/pkg/errors" @@ -59,6 +60,8 @@ type PEXReactor struct { // maps to prevent abuse requestsSent *cmn.CMap // ID->struct{}: unanswered send requests lastReceivedRequests *cmn.CMap // ID->time.Time: last time peer requested from us + + attemptsToDial sync.Map // dial addr -> number of attempts to dial (for exponential backoff) } // PEXReactorConfig holds reactor specific configuration data. @@ -338,9 +341,22 @@ func (r *PEXReactor) ensurePeers() { toDial[try.ID] = try } + // 1s == (1e9 ns) == (1 Billion ns) + billionNs := float64(time.Second.Nanoseconds()) + // Dial picked addresses - for _, item := range toDial { + for _, addr := range toDial { go func(picked *p2p.NetAddress) { + // exponential backoff if it's not our first attempt to dial picked address + var attempt int + if lAttempt, notFirstAttempt := r.attemptsToDial.Load(picked.DialString()); notFirstAttempt { + attempt = lAttempt.(int) + jitterSeconds := time.Duration(rand.Float64() * billionNs) + backoffDuration := jitterSeconds + ((1 << uint(attempt)) * time.Second) + r.Logger.Debug(fmt.Sprintf("Dialing %v", picked), "attempt", attempt, "backoff_duration", backoffDuration) + time.Sleep(backoffDuration) + } + err := r.Switch.DialPeerWithAddress(picked, false) if err != nil { r.Logger.Error("Dialing failed", "err", err) @@ -350,8 +366,13 @@ func (r *PEXReactor) ensurePeers() { } else { r.book.MarkAttempt(picked) } + // record attempt + r.attemptsToDial.Store(picked.DialString(), attempt+1) + } else { + // cleanup any history + r.attemptsToDial.Delete(picked.DialString()) } - }(item) + }(addr) } // If we need more addresses, pick a random peer and ask for more. From 1941b5c769f0fde2ca68ae087705163dd6fc42e5 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 8 Mar 2018 15:55:23 +0400 Subject: [PATCH 2/7] fixes from @xla's review --- p2p/pex/pex_reactor.go | 62 ++++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index b358bd486..4d85259fc 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -341,38 +341,9 @@ func (r *PEXReactor) ensurePeers() { toDial[try.ID] = try } - // 1s == (1e9 ns) == (1 Billion ns) - billionNs := float64(time.Second.Nanoseconds()) - // Dial picked addresses for _, addr := range toDial { - go func(picked *p2p.NetAddress) { - // exponential backoff if it's not our first attempt to dial picked address - var attempt int - if lAttempt, notFirstAttempt := r.attemptsToDial.Load(picked.DialString()); notFirstAttempt { - attempt = lAttempt.(int) - jitterSeconds := time.Duration(rand.Float64() * billionNs) - backoffDuration := jitterSeconds + ((1 << uint(attempt)) * time.Second) - r.Logger.Debug(fmt.Sprintf("Dialing %v", picked), "attempt", attempt, "backoff_duration", backoffDuration) - time.Sleep(backoffDuration) - } - - err := r.Switch.DialPeerWithAddress(picked, false) - if err != nil { - r.Logger.Error("Dialing failed", "err", err) - // TODO: detect more "bad peer" scenarios - if _, ok := err.(p2p.ErrSwitchAuthenticationFailure); ok { - r.book.MarkBad(picked) - } else { - r.book.MarkAttempt(picked) - } - // record attempt - r.attemptsToDial.Store(picked.DialString(), attempt+1) - } else { - // cleanup any history - r.attemptsToDial.Delete(picked.DialString()) - } - }(addr) + go r.dialPeer(addr) } // If we need more addresses, pick a random peer and ask for more. @@ -393,6 +364,37 @@ func (r *PEXReactor) ensurePeers() { } } +func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) { + // 1s == (1e9 ns) == (1 Billion ns) + billionNs := float64(time.Second.Nanoseconds()) + + // exponential backoff if it's not our first attempt to dial given address + var attempts int + if lAttempts, attempted := r.attemptsToDial.Load(addr.DialString()); attempted { + attempts = lAttempts.(int) + jitterSeconds := time.Duration(rand.Float64() * billionNs) + backoffDuration := jitterSeconds + ((1 << uint(attempts)) * time.Second) + r.Logger.Debug(fmt.Sprintf("Dialing %v", addr), "attempts", attempts, "backoff_duration", backoffDuration) + time.Sleep(backoffDuration) + } + + err := r.Switch.DialPeerWithAddress(addr, false) + if err != nil { + r.Logger.Error("Dialing failed", "err", err) + // TODO: detect more "bad peer" scenarios + if _, ok := err.(p2p.ErrSwitchAuthenticationFailure); ok { + r.book.MarkBad(addr) + } else { + r.book.MarkAttempt(addr) + } + // record attempt + r.attemptsToDial.Store(addr.DialString(), attempts+1) + } else { + // cleanup any history + r.attemptsToDial.Delete(addr.DialString()) + } +} + // check seed addresses are well formed func (r *PEXReactor) checkSeeds() error { lSeeds := len(r.config.Seeds) From f0d4f56327caff786b1835649b2df4b7664f70b1 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 9 Mar 2018 16:02:24 +0400 Subject: [PATCH 3/7] refactor pex_reactor tests --- p2p/pex/addrbook.go | 4 + p2p/pex/addrbook_test.go | 35 +++++--- p2p/pex/pex_reactor_test.go | 163 +++++++++++++++--------------------- 3 files changed, 97 insertions(+), 105 deletions(-) diff --git a/p2p/pex/addrbook.go b/p2p/pex/addrbook.go index 95ad70fe7..3a3920486 100644 --- a/p2p/pex/addrbook.go +++ b/p2p/pex/addrbook.go @@ -139,6 +139,10 @@ func (a *addrBook) Wait() { a.wg.Wait() } +func (a *addrBook) FilePath() string { + return a.filePath +} + //------------------------------------------------------- // AddOurAddress one of our addresses. diff --git a/p2p/pex/addrbook_test.go b/p2p/pex/addrbook_test.go index 166d31847..4a8df7166 100644 --- a/p2p/pex/addrbook_test.go +++ b/p2p/pex/addrbook_test.go @@ -5,6 +5,7 @@ import ( "fmt" "io/ioutil" "math/rand" + "os" "testing" "github.com/stretchr/testify/assert" @@ -26,17 +27,24 @@ func createTempFileName(prefix string) string { return fname } +func deleteTempFile(fname string) { + err := os.Remove(fname) + if err != nil { + panic(err) + } +} + func TestAddrBookPickAddress(t *testing.T) { - assert := assert.New(t) fname := createTempFileName("addrbook_test") + defer deleteTempFile(fname) // 0 addresses book := NewAddrBook(fname, true) book.SetLogger(log.TestingLogger()) - assert.Zero(book.Size()) + assert.Zero(t, book.Size()) addr := book.PickAddress(50) - assert.Nil(addr, "expected no address") + assert.Nil(t, addr, "expected no address") randAddrs := randNetAddressPairs(t, 1) addrSrc := randAddrs[0] @@ -44,26 +52,27 @@ func TestAddrBookPickAddress(t *testing.T) { // pick an address when we only have new address addr = book.PickAddress(0) - assert.NotNil(addr, "expected an address") + assert.NotNil(t, addr, "expected an address") addr = book.PickAddress(50) - assert.NotNil(addr, "expected an address") + assert.NotNil(t, addr, "expected an address") addr = book.PickAddress(100) - assert.NotNil(addr, "expected an address") + assert.NotNil(t, addr, "expected an address") // pick an address when we only have old address book.MarkGood(addrSrc.addr) addr = book.PickAddress(0) - assert.NotNil(addr, "expected an address") + assert.NotNil(t, addr, "expected an address") addr = book.PickAddress(50) - assert.NotNil(addr, "expected an address") + assert.NotNil(t, addr, "expected an address") // in this case, nNew==0 but we biased 100% to new, so we return nil addr = book.PickAddress(100) - assert.Nil(addr, "did not expected an address") + assert.Nil(t, addr, "did not expected an address") } func TestAddrBookSaveLoad(t *testing.T) { fname := createTempFileName("addrbook_test") + defer deleteTempFile(fname) // 0 addresses book := NewAddrBook(fname, true) @@ -95,6 +104,7 @@ func TestAddrBookSaveLoad(t *testing.T) { func TestAddrBookLookup(t *testing.T) { fname := createTempFileName("addrbook_test") + defer deleteTempFile(fname) randAddrs := randNetAddressPairs(t, 100) @@ -115,8 +125,8 @@ func TestAddrBookLookup(t *testing.T) { } func TestAddrBookPromoteToOld(t *testing.T) { - assert := assert.New(t) fname := createTempFileName("addrbook_test") + defer deleteTempFile(fname) randAddrs := randNetAddressPairs(t, 100) @@ -147,11 +157,12 @@ func TestAddrBookPromoteToOld(t *testing.T) { t.Errorf("selection could not be bigger than the book") } - assert.Equal(book.Size(), 100, "expecting book size to be 100") + assert.Equal(t, book.Size(), 100, "expecting book size to be 100") } func TestAddrBookHandlesDuplicates(t *testing.T) { fname := createTempFileName("addrbook_test") + defer deleteTempFile(fname) book := NewAddrBook(fname, true) book.SetLogger(log.TestingLogger()) @@ -202,6 +213,8 @@ func randIPv4Address(t *testing.T) *p2p.NetAddress { func TestAddrBookRemoveAddress(t *testing.T) { fname := createTempFileName("addrbook_test") + defer deleteTempFile(fname) + book := NewAddrBook(fname, true) book.SetLogger(log.TestingLogger()) diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 41da867ad..6b610f009 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -4,6 +4,7 @@ import ( "fmt" "io/ioutil" "os" + "path/filepath" "testing" "time" @@ -30,49 +31,33 @@ func init() { } func TestPEXReactorBasic(t *testing.T) { - assert, require := assert.New(t), require.New(t) + r, book := createReactor(&PEXReactorConfig{}) + defer teardownReactor(book) - dir, err := ioutil.TempDir("", "pex_reactor") - require.Nil(err) - defer os.RemoveAll(dir) // nolint: errcheck - book := NewAddrBook(dir+"addrbook.json", true) - book.SetLogger(log.TestingLogger()) - - r := NewPEXReactor(book, &PEXReactorConfig{}) - r.SetLogger(log.TestingLogger()) - - assert.NotNil(r) - assert.NotEmpty(r.GetChannels()) + assert.NotNil(t, r) + assert.NotEmpty(t, r.GetChannels()) } func TestPEXReactorAddRemovePeer(t *testing.T) { - assert, require := assert.New(t), require.New(t) - - dir, err := ioutil.TempDir("", "pex_reactor") - require.Nil(err) - defer os.RemoveAll(dir) // nolint: errcheck - book := NewAddrBook(dir+"addrbook.json", true) - book.SetLogger(log.TestingLogger()) - - r := NewPEXReactor(book, &PEXReactorConfig{}) - r.SetLogger(log.TestingLogger()) + r, book := createReactor(&PEXReactorConfig{}) + defer teardownReactor(book) size := book.Size() peer := p2p.CreateRandomPeer(false) r.AddPeer(peer) - assert.Equal(size+1, book.Size()) + assert.Equal(t, size+1, book.Size()) r.RemovePeer(peer, "peer not available") - assert.Equal(size+1, book.Size()) + assert.Equal(t, size+1, book.Size()) outboundPeer := p2p.CreateRandomPeer(true) r.AddPeer(outboundPeer) - assert.Equal(size+1, book.Size(), "outbound peers should not be added to the address book") + assert.Equal(t, size+1, book.Size(), "outbound peers should not be added to the address book") r.RemovePeer(outboundPeer, "peer not available") - assert.Equal(size+1, book.Size()) + assert.Equal(t, size+1, book.Size()) } func TestPEXReactorRunning(t *testing.T) { @@ -82,7 +67,7 @@ func TestPEXReactorRunning(t *testing.T) { dir, err := ioutil.TempDir("", "pex_reactor") require.Nil(t, err) defer os.RemoveAll(dir) // nolint: errcheck - book := NewAddrBook(dir+"addrbook.json", false) + book := NewAddrBook(filepath.Join(dir, "addrbook.json"), false) book.SetLogger(log.TestingLogger()) // create switches @@ -120,16 +105,8 @@ func TestPEXReactorRunning(t *testing.T) { } func TestPEXReactorReceive(t *testing.T) { - assert, require := assert.New(t), require.New(t) - - dir, err := ioutil.TempDir("", "pex_reactor") - require.Nil(err) - defer os.RemoveAll(dir) // nolint: errcheck - book := NewAddrBook(dir+"addrbook.json", false) - book.SetLogger(log.TestingLogger()) - - r := NewPEXReactor(book, &PEXReactorConfig{}) - r.SetLogger(log.TestingLogger()) + r, book := createReactor(&PEXReactorConfig{}) + defer teardownReactor(book) peer := p2p.CreateRandomPeer(false) @@ -140,98 +117,77 @@ func TestPEXReactorReceive(t *testing.T) { addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()} msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) r.Receive(PexChannel, peer, msg) - assert.Equal(size+1, book.Size()) + assert.Equal(t, size+1, book.Size()) msg = wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}}) r.Receive(PexChannel, peer, msg) } func TestPEXReactorRequestMessageAbuse(t *testing.T) { - assert, require := assert.New(t), require.New(t) + r, book := createReactor(&PEXReactorConfig{}) + defer teardownReactor(book) - dir, err := ioutil.TempDir("", "pex_reactor") - require.Nil(err) - defer os.RemoveAll(dir) // nolint: errcheck - book := NewAddrBook(dir+"addrbook.json", true) - book.SetLogger(log.TestingLogger()) - - r := NewPEXReactor(book, &PEXReactorConfig{}) - sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) - sw.SetLogger(log.TestingLogger()) - sw.AddReactor("PEX", r) - r.SetSwitch(sw) - r.SetLogger(log.TestingLogger()) + sw := createSwitchAndAddReactors(r) peer := newMockPeer() p2p.AddPeerToSwitch(sw, peer) - assert.True(sw.Peers().Has(peer.ID())) + assert.True(t, sw.Peers().Has(peer.ID())) id := string(peer.ID()) msg := wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}}) // first time creates the entry r.Receive(PexChannel, peer, msg) - assert.True(r.lastReceivedRequests.Has(id)) - assert.True(sw.Peers().Has(peer.ID())) + assert.True(t, r.lastReceivedRequests.Has(id)) + assert.True(t, sw.Peers().Has(peer.ID())) // next time sets the last time value r.Receive(PexChannel, peer, msg) - assert.True(r.lastReceivedRequests.Has(id)) - assert.True(sw.Peers().Has(peer.ID())) + assert.True(t, r.lastReceivedRequests.Has(id)) + assert.True(t, sw.Peers().Has(peer.ID())) // third time is too many too soon - peer is removed r.Receive(PexChannel, peer, msg) - assert.False(r.lastReceivedRequests.Has(id)) - assert.False(sw.Peers().Has(peer.ID())) + assert.False(t, r.lastReceivedRequests.Has(id)) + assert.False(t, sw.Peers().Has(peer.ID())) } func TestPEXReactorAddrsMessageAbuse(t *testing.T) { - assert, require := assert.New(t), require.New(t) - - dir, err := ioutil.TempDir("", "pex_reactor") - require.Nil(err) - defer os.RemoveAll(dir) // nolint: errcheck - book := NewAddrBook(dir+"addrbook.json", true) - book.SetLogger(log.TestingLogger()) + r, book := createReactor(&PEXReactorConfig{}) + defer teardownReactor(book) - r := NewPEXReactor(book, &PEXReactorConfig{}) - sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) - sw.SetLogger(log.TestingLogger()) - sw.AddReactor("PEX", r) - r.SetSwitch(sw) - r.SetLogger(log.TestingLogger()) + sw := createSwitchAndAddReactors(r) peer := newMockPeer() p2p.AddPeerToSwitch(sw, peer) - assert.True(sw.Peers().Has(peer.ID())) + assert.True(t, sw.Peers().Has(peer.ID())) id := string(peer.ID()) // request addrs from the peer r.RequestAddrs(peer) - assert.True(r.requestsSent.Has(id)) - assert.True(sw.Peers().Has(peer.ID())) + assert.True(t, r.requestsSent.Has(id)) + assert.True(t, sw.Peers().Has(peer.ID())) addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()} msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) // receive some addrs. should clear the request r.Receive(PexChannel, peer, msg) - assert.False(r.requestsSent.Has(id)) - assert.True(sw.Peers().Has(peer.ID())) + assert.False(t, r.requestsSent.Has(id)) + assert.True(t, sw.Peers().Has(peer.ID())) // receiving more addrs causes a disconnect r.Receive(PexChannel, peer, msg) - assert.False(sw.Peers().Has(peer.ID())) + assert.False(t, sw.Peers().Has(peer.ID())) } func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { - dir, err := ioutil.TempDir("", "pex_reactor") require.Nil(t, err) defer os.RemoveAll(dir) // nolint: errcheck - book := NewAddrBook(dir+"addrbook.json", false) + book := NewAddrBook(filepath.Join(dir, "addrbook.json"), false) book.SetLogger(log.TestingLogger()) // 1. create seed @@ -288,22 +244,11 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { } func TestPEXReactorCrawlStatus(t *testing.T) { - assert, require := assert.New(t), require.New(t) + pexR, book := createReactor(&PEXReactorConfig{SeedMode: true}) + defer teardownReactor(book) - dir, err := ioutil.TempDir("", "pex_reactor") - require.Nil(err) - defer os.RemoveAll(dir) // nolint: errcheck - book := NewAddrBook(dir+"addrbook.json", false) - book.SetLogger(log.TestingLogger()) - - pexR := NewPEXReactor(book, &PEXReactorConfig{SeedMode: true}) // Seed/Crawler mode uses data from the Switch - p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { - pexR.SetLogger(log.TestingLogger()) - sw.SetLogger(log.TestingLogger().With("switch", i)) - sw.AddReactor("pex", pexR) - return sw - }) + _ = createSwitchAndAddReactors(pexR) // Create a peer, add it to the peer set and the addrbook. peer := p2p.CreateRandomPeer(false) @@ -319,7 +264,7 @@ func TestPEXReactorCrawlStatus(t *testing.T) { peerInfos := pexR.getPeersToCrawl() // Make sure it has the proper number of elements - assert.Equal(2, len(peerInfos)) + assert.Equal(t, 2, len(peerInfos)) // TODO: test } @@ -400,3 +345,33 @@ func assertPeersWithTimeout( } } } + +func createReactor(config *PEXReactorConfig) (r *PEXReactor, book *addrBook) { + dir, err := ioutil.TempDir("", "pex_reactor") + if err != nil { + panic(err) + } + book = NewAddrBook(filepath.Join(dir, "addrbook.json"), true) + book.SetLogger(log.TestingLogger()) + + r = NewPEXReactor(book, &PEXReactorConfig{}) + r.SetLogger(log.TestingLogger()) + return +} + +func teardownReactor(book *addrBook) { + err := os.RemoveAll(filepath.Dir(book.FilePath())) + if err != nil { + panic(err) + } +} + +func createSwitchAndAddReactors(reactors ...p2p.Reactor) *p2p.Switch { + sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) + sw.SetLogger(log.TestingLogger()) + for _, r := range reactors { + sw.AddReactor(r.String(), r) + r.SetSwitch(sw) + } + return sw +} From f85c8896d9877f034e6c40d11887637150940841 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 9 Mar 2018 16:23:52 +0400 Subject: [PATCH 4/7] test pex_reactor's dialPeer --- p2p/pex/pex_reactor.go | 11 +++++++++++ p2p/pex/pex_reactor_test.go | 25 ++++++++++++++++++++++--- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 4d85259fc..73df67dca 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -432,6 +432,17 @@ func (r *PEXReactor) dialSeeds() { r.Switch.Logger.Error("Couldn't connect to any seeds") } +// AttemptsToDial returns the number of attempts to dial specific address. It +// returns 0 if never attempted or successfully connected. +func (r *PEXReactor) AttemptsToDial(addr *p2p.NetAddress) int { + attempts, attempted := r.attemptsToDial.Load(addr.DialString()) + if attempted { + return attempts.(int) + } else { + return 0 + } +} + //---------------------------------------------------------- // Explores the network searching for more peers. (continuous) diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 6b610f009..41431d34c 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -13,12 +13,11 @@ import ( crypto "github.com/tendermint/go-crypto" wire "github.com/tendermint/go-wire" - cmn "github.com/tendermint/tmlibs/common" - "github.com/tendermint/tmlibs/log" - cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p/conn" + cmn "github.com/tendermint/tmlibs/common" + "github.com/tendermint/tmlibs/log" ) var ( @@ -269,6 +268,26 @@ func TestPEXReactorCrawlStatus(t *testing.T) { // TODO: test } +func TestPEXReactorDialPeer(t *testing.T) { + pexR, book := createReactor(&PEXReactorConfig{}) + defer teardownReactor(book) + + _ = createSwitchAndAddReactors(pexR) + + peer := newMockPeer() + addr := peer.NodeInfo().NetAddress() + + assert.Equal(t, 0, pexR.AttemptsToDial(addr)) + + // 1st unsuccessful attempt + pexR.dialPeer(addr) + + // 2nd unsuccessful attempt + pexR.dialPeer(addr) + + assert.Equal(t, 2, pexR.AttemptsToDial(addr)) +} + type mockPeer struct { *cmn.BaseService pubKey crypto.PubKey From 0f41570c80858e0081392c03977e0f77c0d21299 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Sun, 11 Mar 2018 13:20:51 +0400 Subject: [PATCH 5/7] fixes from bucky's review --- p2p/pex/pex_reactor.go | 23 ++++++++++++++--------- rpc/lib/client/ws_client.go | 4 +--- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 73df67dca..aff1ab714 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -39,6 +39,8 @@ const ( defaultSeedDisconnectWaitPeriod = 2 * time.Minute // disconnect after this defaultCrawlPeerInterval = 2 * time.Minute // dont redial for this. TODO: back-off defaultCrawlPeersPeriod = 30 * time.Second // check some peers every this + + maxAttemptsToDial = 16 // ~ 35h in total (last attempt - 18h sleep) ) // PEXReactor handles PEX (peer exchange) and ensures that an @@ -61,7 +63,7 @@ type PEXReactor struct { requestsSent *cmn.CMap // ID->struct{}: unanswered send requests lastReceivedRequests *cmn.CMap // ID->time.Time: last time peer requested from us - attemptsToDial sync.Map // dial addr -> number of attempts to dial (for exponential backoff) + attemptsToDial sync.Map // dial address (string) -> number of attempts (int) to dial (for exponential backoff) } // PEXReactorConfig holds reactor specific configuration data. @@ -365,22 +367,25 @@ func (r *PEXReactor) ensurePeers() { } func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) { - // 1s == (1e9 ns) == (1 Billion ns) - billionNs := float64(time.Second.Nanoseconds()) + attempts := r.AttemptsToDial(addr) + + if attempts > maxAttemptsToDial { + r.Logger.Error("Reached max attempts to dial", "addr", addr, "attempts", attempts) + r.book.MarkBad(addr) + return + } // exponential backoff if it's not our first attempt to dial given address - var attempts int - if lAttempts, attempted := r.attemptsToDial.Load(addr.DialString()); attempted { - attempts = lAttempts.(int) - jitterSeconds := time.Duration(rand.Float64() * billionNs) + if attempts != 0 { + jitterSeconds := time.Duration(rand.Float64() * float64(time.Second)) // 1s == (1e9 ns) backoffDuration := jitterSeconds + ((1 << uint(attempts)) * time.Second) - r.Logger.Debug(fmt.Sprintf("Dialing %v", addr), "attempts", attempts, "backoff_duration", backoffDuration) + r.Logger.Debug("Sleeping before dialing", "addr", addr, "dur", backoffDuration) time.Sleep(backoffDuration) } err := r.Switch.DialPeerWithAddress(addr, false) if err != nil { - r.Logger.Error("Dialing failed", "err", err) + r.Logger.Error("Dialing failed", "addr", addr, "err", err, "attempts", attempts) // TODO: detect more "bad peer" scenarios if _, ok := err.(p2p.ErrSwitchAuthenticationFailure); ok { r.book.MarkBad(addr) diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index ca75ad561..fe15cda21 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -254,10 +254,8 @@ func (c *WSClient) reconnect() error { c.mtx.Unlock() }() - // 1s == (1e9 ns) == (1 Billion ns) - billionNs := float64(time.Second.Nanoseconds()) for { - jitterSeconds := time.Duration(rand.Float64() * billionNs) + jitterSeconds := time.Duration(rand.Float64() * float64(time.Second)) // 1s == (1e9 ns) backoffDuration := jitterSeconds + ((1 << uint(attempt)) * time.Second) c.Logger.Info("reconnecting", "attempt", attempt+1, "backoff_duration", backoffDuration) From 264bce4ddd6c71808a145e02f28b4ea41f8b96a9 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Sun, 11 Mar 2018 14:00:49 +0400 Subject: [PATCH 6/7] skip dialing based on last time dialed --- p2p/pex/pex_reactor.go | 29 +++++++++++++++++++++-------- p2p/pex/pex_reactor_test.go | 5 ++++- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index aff1ab714..4a6eafee5 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -63,7 +63,7 @@ type PEXReactor struct { requestsSent *cmn.CMap // ID->struct{}: unanswered send requests lastReceivedRequests *cmn.CMap // ID->time.Time: last time peer requested from us - attemptsToDial sync.Map // dial address (string) -> number of attempts (int) to dial (for exponential backoff) + attemptsToDial sync.Map // address (string) -> {number of attempts (int), last time dialed (time.Time)} } // PEXReactorConfig holds reactor specific configuration data. @@ -76,6 +76,11 @@ type PEXReactorConfig struct { Seeds []string } +type _attemptsToDial struct { + number int + lastDialed time.Time +} + // NewPEXReactor creates new PEX reactor. func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor { r := &PEXReactor{ @@ -367,7 +372,12 @@ func (r *PEXReactor) ensurePeers() { } func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) { - attempts := r.AttemptsToDial(addr) + var attempts int + var lastDialed time.Time + if lAttempts, attempted := r.attemptsToDial.Load(addr.DialString()); attempted { + attempts = lAttempts.(_attemptsToDial).number + lastDialed = lAttempts.(_attemptsToDial).lastDialed + } if attempts > maxAttemptsToDial { r.Logger.Error("Reached max attempts to dial", "addr", addr, "attempts", attempts) @@ -376,11 +386,14 @@ func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) { } // exponential backoff if it's not our first attempt to dial given address - if attempts != 0 { + if attempts > 0 { jitterSeconds := time.Duration(rand.Float64() * float64(time.Second)) // 1s == (1e9 ns) backoffDuration := jitterSeconds + ((1 << uint(attempts)) * time.Second) - r.Logger.Debug("Sleeping before dialing", "addr", addr, "dur", backoffDuration) - time.Sleep(backoffDuration) + sinceLastDialed := time.Now().Sub(lastDialed) + if sinceLastDialed < backoffDuration { + r.Logger.Debug("Too early to dial", "addr", addr, "backoff_duration", backoffDuration, "last_dialed", lastDialed, "time_since", sinceLastDialed) + return + } } err := r.Switch.DialPeerWithAddress(addr, false) @@ -393,7 +406,7 @@ func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) { r.book.MarkAttempt(addr) } // record attempt - r.attemptsToDial.Store(addr.DialString(), attempts+1) + r.attemptsToDial.Store(addr.DialString(), _attemptsToDial{attempts + 1, time.Now()}) } else { // cleanup any history r.attemptsToDial.Delete(addr.DialString()) @@ -440,9 +453,9 @@ func (r *PEXReactor) dialSeeds() { // AttemptsToDial returns the number of attempts to dial specific address. It // returns 0 if never attempted or successfully connected. func (r *PEXReactor) AttemptsToDial(addr *p2p.NetAddress) int { - attempts, attempted := r.attemptsToDial.Load(addr.DialString()) + lAttempts, attempted := r.attemptsToDial.Load(addr.DialString()) if attempted { - return attempts.(int) + return lAttempts.(_attemptsToDial).number } else { return 0 } diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 41431d34c..f5d815037 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -282,10 +282,13 @@ func TestPEXReactorDialPeer(t *testing.T) { // 1st unsuccessful attempt pexR.dialPeer(addr) + assert.Equal(t, 1, pexR.AttemptsToDial(addr)) + // 2nd unsuccessful attempt pexR.dialPeer(addr) - assert.Equal(t, 2, pexR.AttemptsToDial(addr)) + // must be skipped because it is too early + assert.Equal(t, 1, pexR.AttemptsToDial(addr)) } type mockPeer struct { From fc5b0471d930d742fed138622462b6eb144f2663 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Sun, 11 Mar 2018 14:13:34 +0400 Subject: [PATCH 7/7] use time.Since --- p2p/pex/pex_reactor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 4a6eafee5..193efc88d 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -40,7 +40,7 @@ const ( defaultCrawlPeerInterval = 2 * time.Minute // dont redial for this. TODO: back-off defaultCrawlPeersPeriod = 30 * time.Second // check some peers every this - maxAttemptsToDial = 16 // ~ 35h in total (last attempt - 18h sleep) + maxAttemptsToDial = 16 // ~ 35h in total (last attempt - 18h) ) // PEXReactor handles PEX (peer exchange) and ensures that an @@ -389,7 +389,7 @@ func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) { if attempts > 0 { jitterSeconds := time.Duration(rand.Float64() * float64(time.Second)) // 1s == (1e9 ns) backoffDuration := jitterSeconds + ((1 << uint(attempts)) * time.Second) - sinceLastDialed := time.Now().Sub(lastDialed) + sinceLastDialed := time.Since(lastDialed) if sinceLastDialed < backoffDuration { r.Logger.Debug("Too early to dial", "addr", addr, "backoff_duration", backoffDuration, "last_dialed", lastDialed, "time_since", sinceLastDialed) return