Browse Source

p2p: Connect to peers from a seed node immediately (#2115)

This is to reduce wait times when initially connecting. This still runs checks
such as whether you still want additional peers.

A test case has been created, which fails if this change is not included.
pull/2132/head
Dev Ojha 6 years ago
committed by Alexander Simmerl
parent
commit
6fb2f44cc3
3 changed files with 121 additions and 37 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +19
    -9
      p2p/pex/pex_reactor.go
  3. +101
    -28
      p2p/pex/pex_reactor_test.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -20,6 +20,7 @@ IMPROVEMENTS:
- only process one block at a time to avoid starving
- [crypto] Switch hkdfchachapoly1305 to xchachapoly1305
- [common] bit array functions which take in another parameter are now thread safe
- [p2p] begin connecting to peers as soon a seed node provides them to you ([#2093](https://github.com/tendermint/tendermint/issues/2093))
BUG FIXES:
- [common] Safely handle cases where atomic write files already exist [#2109](https://github.com/tendermint/tendermint/issues/2109)


+ 19
- 9
p2p/pex/pex_reactor.go View File

@ -74,6 +74,8 @@ type PEXReactor struct {
requestsSent *cmn.CMap // ID->struct{}: unanswered send requests
lastReceivedRequests *cmn.CMap // ID->time.Time: last time peer requested from us
seedAddrs []*p2p.NetAddress
attemptsToDial sync.Map // address (string) -> {number of attempts (int), last time dialed (time.Time)}
}
@ -120,10 +122,13 @@ func (r *PEXReactor) OnStart() error {
// return err if user provided a bad seed address
// or a host name that we cant resolve
if err := r.checkSeeds(); err != nil {
seedAddrs, err := r.checkSeeds()
if err != nil {
return err
}
r.seedAddrs = seedAddrs
// Check if this node should run
// in seed/crawler mode
if r.config.SeedMode {
@ -281,7 +286,6 @@ func (r *PEXReactor) RequestAddrs(p Peer) {
// request for this peer and deletes the open request.
// If there's no open request for the src peer, it returns an error.
func (r *PEXReactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error {
id := string(src.ID())
if !r.requestsSent.Has(id) {
return cmn.NewError("Received unsolicited pexAddrsMessage")
@ -297,6 +301,13 @@ func (r *PEXReactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error {
err := r.book.AddAddress(netAddr, srcAddr)
r.logErrAddrBook(err)
// If this address came from a seed node, try to connect to it without waiting.
for _, seedAddr := range r.seedAddrs {
if seedAddr.Equals(srcAddr) {
r.ensurePeers()
}
}
}
return nil
}
@ -468,18 +479,18 @@ func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) {
}
// check seed addresses are well formed
func (r *PEXReactor) checkSeeds() error {
func (r *PEXReactor) checkSeeds() ([]*p2p.NetAddress, error) {
lSeeds := len(r.config.Seeds)
if lSeeds == 0 {
return nil
return nil, nil
}
_, errs := p2p.NewNetAddressStrings(r.config.Seeds)
netAddrs, errs := p2p.NewNetAddressStrings(r.config.Seeds)
for _, err := range errs {
if err != nil {
return err
return nil, err
}
}
return nil
return netAddrs, nil
}
// randomly dial seeds until we connect to one or exhaust them
@ -488,13 +499,12 @@ func (r *PEXReactor) dialSeeds() {
if lSeeds == 0 {
return
}
seedAddrs, _ := p2p.NewNetAddressStrings(r.config.Seeds)
perm := cmn.RandPerm(lSeeds)
// perm := r.Switch.rng.Perm(lSeeds)
for _, i := range perm {
// dial a random seed
seedAddr := seedAddrs[i]
seedAddr := r.seedAddrs[i]
err := r.Switch.DialPeerWithAddress(seedAddr, false)
if err == nil {
return


+ 101
- 28
p2p/pex/pex_reactor_test.go View File

@ -211,31 +211,26 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
defer os.RemoveAll(dir) // nolint: errcheck
// 1. create seed
seed := p2p.MakeSwitch(
cfg,
0,
"127.0.0.1",
"123.123.123",
func(i int, sw *p2p.Switch) *p2p.Switch {
book := NewAddrBook(filepath.Join(dir, "addrbook0.json"), false)
book.SetLogger(log.TestingLogger())
sw.SetAddrBook(book)
sw.SetLogger(log.TestingLogger())
r := NewPEXReactor(book, &PEXReactorConfig{})
r.SetLogger(log.TestingLogger())
sw.AddReactor("pex", r)
return sw
},
)
seed.AddListener(
p2p.NewDefaultListener("tcp://"+seed.NodeInfo().ListenAddr, "", false, log.TestingLogger()),
)
seed := testCreateSeed(dir, 0, []*p2p.NetAddress{}, []*p2p.NetAddress{})
require.Nil(t, seed.Start())
defer seed.Stop()
// 2. create usual peer with only seed configured.
peer := testCreatePeerWithSeed(dir, 1, seed)
require.Nil(t, peer.Start())
defer peer.Stop()
// 3. check that the peer connects to seed immediately
assertPeersWithTimeout(t, []*p2p.Switch{peer}, 10*time.Millisecond, 3*time.Second, 1)
}
func TestConnectionSpeedForPeerReceivedFromSeed(t *testing.T) {
// directory to store address books
dir, err := ioutil.TempDir("", "pex_reactor")
require.Nil(t, err)
defer os.RemoveAll(dir) // nolint: errcheck
// 1. create peer
peer := p2p.MakeSwitch(
cfg,
1,
@ -250,20 +245,34 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
r := NewPEXReactor(
book,
&PEXReactorConfig{
Seeds: []string{seed.NodeInfo().NetAddress().String()},
},
&PEXReactorConfig{},
)
r.SetLogger(log.TestingLogger())
sw.AddReactor("pex", r)
return sw
},
)
peer.AddListener(
p2p.NewDefaultListener("tcp://"+peer.NodeInfo().ListenAddr, "", false, log.TestingLogger()),
)
require.Nil(t, peer.Start())
defer peer.Stop()
// 3. check that the peer connects to seed immediately
assertPeersWithTimeout(t, []*p2p.Switch{peer}, 10*time.Millisecond, 3*time.Second, 1)
// 2. Create seed which knows about the peer
seed := testCreateSeed(dir, 2, []*p2p.NetAddress{peer.NodeInfo().NetAddress()}, []*p2p.NetAddress{peer.NodeInfo().NetAddress()})
require.Nil(t, seed.Start())
defer seed.Stop()
// 3. create another peer with only seed configured.
secondPeer := testCreatePeerWithSeed(dir, 3, seed)
require.Nil(t, secondPeer.Start())
defer secondPeer.Stop()
// 4. check that the second peer connects to seed immediately
assertPeersWithTimeout(t, []*p2p.Switch{secondPeer}, 10*time.Millisecond, 3*time.Second, 1)
// 5. check that the second peer connects to the first peer immediately
assertPeersWithTimeout(t, []*p2p.Switch{secondPeer}, 10*time.Millisecond, 1*time.Second, 2)
}
func TestPEXReactorCrawlStatus(t *testing.T) {
@ -401,6 +410,7 @@ func assertPeersWithTimeout(
outbound, inbound, _ := s.NumPeers()
if outbound+inbound < nPeers {
allGood = false
break
}
}
remaining -= checkPeriod
@ -417,14 +427,77 @@ func assertPeersWithTimeout(
numPeersStr += fmt.Sprintf("%d => {outbound: %d, inbound: %d}, ", i, outbound, inbound)
}
t.Errorf(
"expected all switches to be connected to at least one peer (switches: %s)",
numPeersStr,
"expected all switches to be connected to at least %d peer(s) (switches: %s)",
nPeers, numPeersStr,
)
return
}
}
}
// Creates a seed which knows about the provided addresses / source address pairs.
// Starting and stopping the seed is left to the caller
func testCreateSeed(dir string, id int, knownAddrs, srcAddrs []*p2p.NetAddress) *p2p.Switch {
seed := p2p.MakeSwitch(
cfg,
id,
"127.0.0.1",
"123.123.123",
func(i int, sw *p2p.Switch) *p2p.Switch {
book := NewAddrBook(filepath.Join(dir, "addrbookSeed.json"), false)
book.SetLogger(log.TestingLogger())
for j := 0; j < len(knownAddrs); j++ {
book.AddAddress(knownAddrs[j], srcAddrs[j])
book.MarkGood(knownAddrs[j])
}
sw.SetAddrBook(book)
sw.SetLogger(log.TestingLogger())
r := NewPEXReactor(book, &PEXReactorConfig{})
r.SetLogger(log.TestingLogger())
sw.AddReactor("pex", r)
return sw
},
)
seed.AddListener(
p2p.NewDefaultListener("tcp://"+seed.NodeInfo().ListenAddr, "", false, log.TestingLogger()),
)
return seed
}
// Creates a peer which knows about the provided seed.
// Starting and stopping the peer is left to the caller
func testCreatePeerWithSeed(dir string, id int, seed *p2p.Switch) *p2p.Switch {
peer := p2p.MakeSwitch(
cfg,
id,
"127.0.0.1",
"123.123.123",
func(i int, sw *p2p.Switch) *p2p.Switch {
book := NewAddrBook(filepath.Join(dir, fmt.Sprintf("addrbook%d.json", id)), false)
book.SetLogger(log.TestingLogger())
sw.SetAddrBook(book)
sw.SetLogger(log.TestingLogger())
r := NewPEXReactor(
book,
&PEXReactorConfig{
Seeds: []string{seed.NodeInfo().NetAddress().String()},
},
)
r.SetLogger(log.TestingLogger())
sw.AddReactor("pex", r)
return sw
},
)
peer.AddListener(
p2p.NewDefaultListener("tcp://"+peer.NodeInfo().ListenAddr, "", false, log.TestingLogger()),
)
return peer
}
func createReactor(conf *PEXReactorConfig) (r *PEXReactor, book *addrBook) {
// directory to store address book
dir, err := ioutil.TempDir("", "pex_reactor")


Loading…
Cancel
Save