From 6fb2f44cc33027e3e70963b5029dc64314975bcf Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Tue, 31 Jul 2018 13:09:01 -0700 Subject: [PATCH] p2p: Connect to peers from a seed node immediately (#2115) This is to reduce wait times when initially connecting. This still runs checks such as whether you still want additional peers. A test case has been created, which fails if this change is not included. --- CHANGELOG_PENDING.md | 1 + p2p/pex/pex_reactor.go | 28 +++++--- p2p/pex/pex_reactor_test.go | 129 ++++++++++++++++++++++++++++-------- 3 files changed, 121 insertions(+), 37 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index a65c6e45c..e85a6ed64 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -20,6 +20,7 @@ IMPROVEMENTS: - only process one block at a time to avoid starving - [crypto] Switch hkdfchachapoly1305 to xchachapoly1305 - [common] bit array functions which take in another parameter are now thread safe +- [p2p] begin connecting to peers as soon a seed node provides them to you ([#2093](https://github.com/tendermint/tendermint/issues/2093)) BUG FIXES: - [common] Safely handle cases where atomic write files already exist [#2109](https://github.com/tendermint/tendermint/issues/2109) diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 2929d9336..c59b3797d 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -74,6 +74,8 @@ type PEXReactor struct { requestsSent *cmn.CMap // ID->struct{}: unanswered send requests lastReceivedRequests *cmn.CMap // ID->time.Time: last time peer requested from us + seedAddrs []*p2p.NetAddress + attemptsToDial sync.Map // address (string) -> {number of attempts (int), last time dialed (time.Time)} } @@ -120,10 +122,13 @@ func (r *PEXReactor) OnStart() error { // return err if user provided a bad seed address // or a host name that we cant resolve - if err := r.checkSeeds(); err != nil { + seedAddrs, err := r.checkSeeds() + if err != nil { return err } + r.seedAddrs = seedAddrs + // Check if this node should run // in seed/crawler mode if r.config.SeedMode { @@ -281,7 +286,6 @@ func (r *PEXReactor) RequestAddrs(p Peer) { // request for this peer and deletes the open request. // If there's no open request for the src peer, it returns an error. func (r *PEXReactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error { - id := string(src.ID()) if !r.requestsSent.Has(id) { return cmn.NewError("Received unsolicited pexAddrsMessage") @@ -297,6 +301,13 @@ func (r *PEXReactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error { err := r.book.AddAddress(netAddr, srcAddr) r.logErrAddrBook(err) + + // If this address came from a seed node, try to connect to it without waiting. + for _, seedAddr := range r.seedAddrs { + if seedAddr.Equals(srcAddr) { + r.ensurePeers() + } + } } return nil } @@ -468,18 +479,18 @@ func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) { } // check seed addresses are well formed -func (r *PEXReactor) checkSeeds() error { +func (r *PEXReactor) checkSeeds() ([]*p2p.NetAddress, error) { lSeeds := len(r.config.Seeds) if lSeeds == 0 { - return nil + return nil, nil } - _, errs := p2p.NewNetAddressStrings(r.config.Seeds) + netAddrs, errs := p2p.NewNetAddressStrings(r.config.Seeds) for _, err := range errs { if err != nil { - return err + return nil, err } } - return nil + return netAddrs, nil } // randomly dial seeds until we connect to one or exhaust them @@ -488,13 +499,12 @@ func (r *PEXReactor) dialSeeds() { if lSeeds == 0 { return } - seedAddrs, _ := p2p.NewNetAddressStrings(r.config.Seeds) perm := cmn.RandPerm(lSeeds) // perm := r.Switch.rng.Perm(lSeeds) for _, i := range perm { // dial a random seed - seedAddr := seedAddrs[i] + seedAddr := r.seedAddrs[i] err := r.Switch.DialPeerWithAddress(seedAddr, false) if err == nil { return diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 8d54693fe..36f38c570 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -211,31 +211,26 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { defer os.RemoveAll(dir) // nolint: errcheck // 1. create seed - seed := p2p.MakeSwitch( - cfg, - 0, - "127.0.0.1", - "123.123.123", - func(i int, sw *p2p.Switch) *p2p.Switch { - book := NewAddrBook(filepath.Join(dir, "addrbook0.json"), false) - book.SetLogger(log.TestingLogger()) - sw.SetAddrBook(book) - - sw.SetLogger(log.TestingLogger()) - - r := NewPEXReactor(book, &PEXReactorConfig{}) - r.SetLogger(log.TestingLogger()) - sw.AddReactor("pex", r) - return sw - }, - ) - seed.AddListener( - p2p.NewDefaultListener("tcp://"+seed.NodeInfo().ListenAddr, "", false, log.TestingLogger()), - ) + seed := testCreateSeed(dir, 0, []*p2p.NetAddress{}, []*p2p.NetAddress{}) require.Nil(t, seed.Start()) defer seed.Stop() // 2. create usual peer with only seed configured. + peer := testCreatePeerWithSeed(dir, 1, seed) + require.Nil(t, peer.Start()) + defer peer.Stop() + + // 3. check that the peer connects to seed immediately + assertPeersWithTimeout(t, []*p2p.Switch{peer}, 10*time.Millisecond, 3*time.Second, 1) +} + +func TestConnectionSpeedForPeerReceivedFromSeed(t *testing.T) { + // directory to store address books + dir, err := ioutil.TempDir("", "pex_reactor") + require.Nil(t, err) + defer os.RemoveAll(dir) // nolint: errcheck + + // 1. create peer peer := p2p.MakeSwitch( cfg, 1, @@ -250,20 +245,34 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { r := NewPEXReactor( book, - &PEXReactorConfig{ - Seeds: []string{seed.NodeInfo().NetAddress().String()}, - }, + &PEXReactorConfig{}, ) r.SetLogger(log.TestingLogger()) sw.AddReactor("pex", r) return sw }, ) + peer.AddListener( + p2p.NewDefaultListener("tcp://"+peer.NodeInfo().ListenAddr, "", false, log.TestingLogger()), + ) require.Nil(t, peer.Start()) defer peer.Stop() - // 3. check that the peer connects to seed immediately - assertPeersWithTimeout(t, []*p2p.Switch{peer}, 10*time.Millisecond, 3*time.Second, 1) + // 2. Create seed which knows about the peer + seed := testCreateSeed(dir, 2, []*p2p.NetAddress{peer.NodeInfo().NetAddress()}, []*p2p.NetAddress{peer.NodeInfo().NetAddress()}) + require.Nil(t, seed.Start()) + defer seed.Stop() + + // 3. create another peer with only seed configured. + secondPeer := testCreatePeerWithSeed(dir, 3, seed) + require.Nil(t, secondPeer.Start()) + defer secondPeer.Stop() + + // 4. check that the second peer connects to seed immediately + assertPeersWithTimeout(t, []*p2p.Switch{secondPeer}, 10*time.Millisecond, 3*time.Second, 1) + + // 5. check that the second peer connects to the first peer immediately + assertPeersWithTimeout(t, []*p2p.Switch{secondPeer}, 10*time.Millisecond, 1*time.Second, 2) } func TestPEXReactorCrawlStatus(t *testing.T) { @@ -401,6 +410,7 @@ func assertPeersWithTimeout( outbound, inbound, _ := s.NumPeers() if outbound+inbound < nPeers { allGood = false + break } } remaining -= checkPeriod @@ -417,14 +427,77 @@ func assertPeersWithTimeout( numPeersStr += fmt.Sprintf("%d => {outbound: %d, inbound: %d}, ", i, outbound, inbound) } t.Errorf( - "expected all switches to be connected to at least one peer (switches: %s)", - numPeersStr, + "expected all switches to be connected to at least %d peer(s) (switches: %s)", + nPeers, numPeersStr, ) return } } } +// Creates a seed which knows about the provided addresses / source address pairs. +// Starting and stopping the seed is left to the caller +func testCreateSeed(dir string, id int, knownAddrs, srcAddrs []*p2p.NetAddress) *p2p.Switch { + seed := p2p.MakeSwitch( + cfg, + id, + "127.0.0.1", + "123.123.123", + func(i int, sw *p2p.Switch) *p2p.Switch { + book := NewAddrBook(filepath.Join(dir, "addrbookSeed.json"), false) + book.SetLogger(log.TestingLogger()) + for j := 0; j < len(knownAddrs); j++ { + book.AddAddress(knownAddrs[j], srcAddrs[j]) + book.MarkGood(knownAddrs[j]) + } + sw.SetAddrBook(book) + + sw.SetLogger(log.TestingLogger()) + + r := NewPEXReactor(book, &PEXReactorConfig{}) + r.SetLogger(log.TestingLogger()) + sw.AddReactor("pex", r) + return sw + }, + ) + seed.AddListener( + p2p.NewDefaultListener("tcp://"+seed.NodeInfo().ListenAddr, "", false, log.TestingLogger()), + ) + return seed +} + +// Creates a peer which knows about the provided seed. +// Starting and stopping the peer is left to the caller +func testCreatePeerWithSeed(dir string, id int, seed *p2p.Switch) *p2p.Switch { + peer := p2p.MakeSwitch( + cfg, + id, + "127.0.0.1", + "123.123.123", + func(i int, sw *p2p.Switch) *p2p.Switch { + book := NewAddrBook(filepath.Join(dir, fmt.Sprintf("addrbook%d.json", id)), false) + book.SetLogger(log.TestingLogger()) + sw.SetAddrBook(book) + + sw.SetLogger(log.TestingLogger()) + + r := NewPEXReactor( + book, + &PEXReactorConfig{ + Seeds: []string{seed.NodeInfo().NetAddress().String()}, + }, + ) + r.SetLogger(log.TestingLogger()) + sw.AddReactor("pex", r) + return sw + }, + ) + peer.AddListener( + p2p.NewDefaultListener("tcp://"+peer.NodeInfo().ListenAddr, "", false, log.TestingLogger()), + ) + return peer +} + func createReactor(conf *PEXReactorConfig) (r *PEXReactor, book *addrBook) { // directory to store address book dir, err := ioutil.TempDir("", "pex_reactor")