diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 93fddc111..7313f7d56 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -5,6 +5,7 @@ import ( "fmt" "math/rand" "reflect" + "sort" "time" "github.com/pkg/errors" @@ -334,6 +335,273 @@ func (r *PEXReactor) checkSeeds() error { return nil } +// Explores the network searching for more peers. (continuous) +// Seed/Crawler Mode causes this node to quickly disconnect +// from peers, except other seed nodes. +func (r *PEXReactor) seedCrawlerMode() { + // Do an initial crawl + r.crawlPeers() + + // Fire periodically + ticker := time.NewTicker(defaultSeedModePeriod) + + for { + select { + case <-ticker.C: + r.attemptDisconnects() + r.crawlPeers() + case <-r.Quit: + return + } + } +} + +// crawlStatus handles temporary data needed for the +// network crawling performed during seed/crawler mode. +type crawlStatus struct { + // The remote address of a potential peer we learned about + Addr *NetAddress + + // Not empty if we are connected to the address + PeerID string + + // The last time we attempt to reach this address + LastAttempt time.Time + + // The last time we successfully reached this address + LastSuccess time.Time +} + +// oldestFirst implements sort.Interface for []crawlStatus +// based on the LastAttempt field. +type oldestFirst []crawlStatus + +func (of oldestFirst) Len() int { return len(of) } +func (of oldestFirst) Swap(i, j int) { of[i], of[j] = of[j], of[i] } +func (of oldestFirst) Less(i, j int) bool { return of[i].LastAttempt.Before(of[j].LastAttempt) } + +// getCrawlStatus returns addresses of potential peers that we wish to validate. +// NOTE: The status information is ordered as described above. +func (r *PEXReactor) getCrawlStatus() []crawlStatus { + var of oldestFirst + + addrs := r.book.ListOfKnownAddresses() + // Go through all the addresses in the AddressBook + for _, addr := range addrs { + var peerID string + + // Check if a peer is already connected from this addr + if p := r.Switch.peers.GetByRemoteAddr(addr.Addr); p != nil { + peerID = p.Key() + } + + of = append(of, crawlStatus{ + Addr: addr.Addr, + PeerID: peerID, + LastAttempt: addr.LastAttempt, + LastSuccess: addr.LastSuccess, + }) + } + sort.Sort(of) + return of +} + +// crawlPeers will crawl the network looking for new peer addresses. (once) +// +// TODO Basically, we need to work harder on our good-peer/bad-peer marking. +// What we're currently doing in terms of marking good/bad peers is just a +// placeholder. It should not be the case that an address becomes old/vetted +// upon a single successful connection. +func (r *PEXReactor) crawlPeers() { + crawlerStatus := r.getCrawlStatus() + + now := time.Now() + // Use addresses we know of to reach additional peers + for _, cs := range crawlerStatus { + // Do not dial peers that are already connected + if cs.PeerID != "" { + continue + } + // Do not attempt to connect with peers we recently dialed + if now.Sub(cs.LastAttempt) < defaultCrawlPeerInterval { + continue + } + // Otherwise, attempt to connect with the known address + p, err := r.Switch.DialPeerWithAddress(cs.Addr, false) + if err != nil { + r.book.MarkAttempt(cs.Addr) + continue + } + // Enter the peer ID into our crawl status information + cs.PeerID = p.Key() + r.book.MarkGood(cs.Addr) + } + // Crawl the connected peers asking for more addresses + for _, cs := range crawlerStatus { + if cs.PeerID == "" { + continue + } + // We will wait a minimum period of time before crawling peers again + if now.Sub(cs.LastAttempt) >= defaultCrawlPeerInterval { + p := r.Switch.Peers().Get(cs.PeerID) + if p != nil { + r.RequestPEX(p) + r.book.MarkAttempt(cs.Addr) + } + } + } +} + +// attemptDisconnects checks the crawlStatus info for Peers to disconnect from. (once) +func (r *PEXReactor) attemptDisconnects() { + crawlerStatus := r.getCrawlStatus() + + now := time.Now() + // Go through each peer we have connected with + // looking for opportunities to disconnect + for _, cs := range crawlerStatus { + if cs.PeerID == "" { + continue + } + // Remain connected to each peer for a minimum period of time + if now.Sub(cs.LastSuccess) < defaultSeedDisconnectWaitPeriod { + continue + } + // Fetch the Peer using the saved ID + p := r.Switch.Peers().Get(cs.PeerID) + if p == nil { + continue + } + // Do not disconnect from persistent peers. + // Specifically, we need to remain connected to other seeds + if p.IsPersistent() { + continue + } + // Otherwise, disconnect from the peer + r.Switch.StopPeerGracefully(p) + } +} + +// crawlStatus handles temporary data needed for the +// network crawling performed during seed/crawler mode. +type crawlStatus struct { + // The remote address of a potential peer we learned about + Addr *NetAddress + + // Not empty if we are connected to the address + PeerID string + + // The last time we attempt to reach this address + LastAttempt time.Time + + // The last time we successfully reached this address + LastSuccess time.Time +} + +// oldestAttempt implements sort.Interface for []crawlStatus +// based on the LastAttempt field. +type oldestAttempt []crawlStatus + +func (oa oldestAttempt) Len() int { return len(oa) } +func (oa oldestAttempt) Swap(i, j int) { oa[i], oa[j] = oa[j], oa[i] } +func (oa oldestAttempt) Less(i, j int) bool { return oa[i].LastAttempt.Before(oa[j].LastAttempt) } + +// getCrawlStatus returns addresses of potential peers that we wish to validate. +// NOTE: The status information is ordered as described above. +func (r *PEXReactor) getCrawlStatus() []crawlStatus { + var oa oldestAttempt + + addrs := r.book.ListOfKnownAddresses() + // Go through all the addresses in the AddressBook + for _, addr := range addrs { + p := r.Switch.peers.GetByRemoteAddr(addr.Addr) + + oa = append(oa, crawlStatus{ + Addr: addr.Addr, + PeerID: p.Key(), + LastAttempt: addr.LastAttempt, + LastSuccess: addr.LastSuccess, + }) + } + sort.Sort(oa) + return oa +} + +// crawlPeers will crawl the network looking for new peer addresses. (once) +// +// TODO Basically, we need to work harder on our good-peer/bad-peer marking. +// What we're currently doing in terms of marking good/bad peers is just a +// placeholder. It should not be the case that an address becomes old/vetted +// upon a single successful connection. +func (r *PEXReactor) crawlPeers() { + crawlerStatus := r.getCrawlStatus() + + now := time.Now() + // Use addresses we know of to reach additional peers + for _, cs := range crawlerStatus { + // Do not dial peers that are already connected + if cs.PeerID != "" { + continue + } + // Do not attempt to connect with peers we recently dialed + if now.Sub(cs.LastAttempt) < defaultCrawlPeerInterval { + continue + } + // Otherwise, attempt to connect with the known address + p, err := r.Switch.DialPeerWithAddress(cs.Addr, false) + if err != nil { + r.book.MarkAttempt(cs.Addr) + continue + } + // Enter the peer ID into our crawl status information + cs.PeerID = p.Key() + r.book.MarkGood(cs.Addr) + } + // Crawl the connected peers asking for more addresses + for _, cs := range crawlerStatus { + if cs.PeerID == "" { + continue + } + // We will wait a minimum period of time before crawling peers again + if now.Sub(cs.LastAttempt) >= defaultCrawlPeerInterval { + p := r.Switch.peers.Get(cs.PeerID) + if p != nil { + r.RequestPEX(p) + } + } + } +} + +// attemptDisconnects checks the crawlStatus info for Peers to disconnect from. (once) +func (r *PEXReactor) attemptDisconnects() { + crawlerStatus := r.getCrawlStatus() + + now := time.Now() + // Go through each peer we have connected with + // looking for opportunities to disconnect + for _, cs := range crawlerStatus { + if cs.PeerID == "" { + continue + } + // Remain connected to each peer for a minimum period of time + if now.Sub(cs.LastSuccess) < defaultSeedDisconnectWaitPeriod { + continue + } + // Fetch the Peer using the saved ID + p := r.Switch.peers.Get(cs.PeerID) + if p == nil { + continue + } + // Do not disconnect from persistent peers. + // Specifically, we need to remain connected to other seeds + if p.IsPersistent() { + continue + } + // Otherwise, disconnect from the peer + r.Switch.StopPeerGracefully(p) + } +} + // randomly dial seeds until we connect to one or exhaust them func (r *PEXReactor) dialSeed() { lSeeds := len(r.config.Seeds) diff --git a/p2p/pex_reactor_test.go b/p2p/pex_reactor_test.go index c0681586a..b8ee89b32 100644 --- a/p2p/pex_reactor_test.go +++ b/p2p/pex_reactor_test.go @@ -286,6 +286,51 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { assertSomePeersWithTimeout(t, []*Switch{sw}, 10*time.Millisecond, 10*time.Second) } +func TestPEXReactorCrawlStatus(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + dir, err := ioutil.TempDir("", "pex_reactor") + require.Nil(err) + defer os.RemoveAll(dir) // nolint: errcheck + book := NewAddrBook(dir+"addrbook.json", false) + book.SetLogger(log.TestingLogger()) + + var r *PEXReactor + // Seed/Crawler mode uses data from the Switch + makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { + r = NewPEXReactor(book, true) + r.SetLogger(log.TestingLogger()) + sw.SetLogger(log.TestingLogger().With("switch", i)) + sw.AddReactor("pex", r) + return sw + }) + + // Create a peer, and add it to the peer set + peer := createRandomPeer(false) + r.Switch.peers.Add(peer) + // Add the peer address to the address book + addr1, _ := NewNetAddressString(peer.NodeInfo().ListenAddr) + r.book.AddAddress(addr1, addr1) + // Add an address to the book that does not have a peer + _, addr2 := createRoutableAddr() + r.book.AddAddress(addr2, addr1) + + // Get the crawl status data + status := r.getCrawlStatus() + + // Make sure it has the proper number of elements + assert.Equal(2, len(status)) + + var num int + for _, cs := range status { + if cs.PeerID != "" { + num++ + } + } + // Check that only one has been identified as a connected peer + assert.Equal(1, num) +} + func createRoutableAddr() (addr string, netAddr *NetAddress) { for { addr = cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256)