diff --git a/p2p/connection.go b/p2p/connection.go index 306eaf7eb..dcb660967 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -88,6 +88,8 @@ type MConnection struct { flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled. pingTimer *cmn.RepeatTimer // send pings periodically chStatsTimer *cmn.RepeatTimer // update channel stats periodically + + created time.Time // time of creation } // MConnConfig is a MConnection configuration. @@ -502,6 +504,7 @@ FOR_LOOP: } type ConnectionStatus struct { + Duration time.Duration SendMonitor flow.Status RecvMonitor flow.Status Channels []ChannelStatus @@ -517,6 +520,7 @@ type ChannelStatus struct { func (c *MConnection) Status() ConnectionStatus { var status ConnectionStatus + status.Duration = time.Since(c.created) status.SendMonitor = c.sendMonitor.Status() status.RecvMonitor = c.recvMonitor.Status() status.Channels = make([]ChannelStatus, len(c.channels)) diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 0c3567a35..5d9194213 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -17,15 +17,22 @@ const ( // PexChannel is a channel for PEX messages PexChannel = byte(0x00) - // period to ensure peers connected - defaultEnsurePeersPeriod = 30 * time.Second - minNumOutboundPeers = 10 - maxPexMessageSize = 1048576 // 1MB + maxPexMessageSize = 1048576 // 1MB + + // ensure we have enough peers + defaultEnsurePeersPeriod = 30 * time.Second + defaultMinNumOutboundPeers = 10 // Seed/Crawler constants - defaultSeedDisconnectWaitPeriod = 2 * time.Minute - defaultCrawlPeerInterval = 2 * time.Minute - defaultCrawlPeersPeriod = 30 * time.Second + // TODO: + // We want seeds to only advertise good peers. + // Peers are marked by external mechanisms. + // We need a config value that can be set to be + // on the order of how long it would take before a good + // peer is marked good. + defaultSeedDisconnectWaitPeriod = 2 * time.Minute // disconnect after this + defaultCrawlPeerInterval = 2 * time.Minute // dont redial for this. TODO: back-off + defaultCrawlPeersPeriod = 30 * time.Second // check some peers every this ) // PEXReactor handles PEX (peer exchange) and ensures that an @@ -51,8 +58,11 @@ type PEXReactor struct { // PEXReactorConfig holds reactor specific configuration data. type PEXReactorConfig struct { - // Seeds is a list of addresses reactor may use if it can't connect to peers - // in the addrbook. + // Seed/Crawler mode + SeedMode bool + + // Seeds is a list of addresses reactor may use + // if it can't connect to peers in the addrbook. Seeds []string } @@ -259,19 +269,12 @@ func (r *PEXReactor) ensurePeersRoutine() { // ensurePeers ensures that sufficient peers are connected. (once) // -// Old bucket / New bucket are arbitrary categories to denote whether an -// address is vetted or not, and this needs to be determined over time via a // heuristic that we haven't perfected yet, or, perhaps is manually edited by // the node operator. It should not be used to compute what addresses are // already connected or not. -// -// TODO Basically, we need to work harder on our good-peer/bad-peer marking. -// What we're currently doing in terms of marking good/bad peers is just a -// placeholder. It should not be the case that an address becomes old/vetted -// upon a single successful connection. func (r *PEXReactor) ensurePeers() { numOutPeers, numInPeers, numDialing := r.Switch.NumPeers() - numToDial := minNumOutboundPeers - (numOutPeers + numDialing) + numToDial := defaultMinNumOutboundPeers - (numOutPeers + numDialing) r.Logger.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial) if numToDial <= 0 { return @@ -327,7 +330,7 @@ func (r *PEXReactor) ensurePeers() { // If we are not connected to nor dialing anybody, fallback to dialing a seed. if numOutPeers+numInPeers+numDialing+len(toDial) == 0 { r.Logger.Info("No addresses to dial nor connected peers. Falling back to seeds") - r.dialSeed() + r.dialSeeds() } } @@ -346,6 +349,31 @@ func (r *PEXReactor) checkSeeds() error { return nil } +// randomly dial seeds until we connect to one or exhaust them +func (r *PEXReactor) dialSeeds() { + lSeeds := len(r.config.Seeds) + if lSeeds == 0 { + return + } + seedAddrs, _ := NewNetAddressStrings(r.config.Seeds) + + perm := r.Switch.rng.Perm(lSeeds) + for _, i := range perm { + // dial a random seed + seedAddr := seedAddrs[i] + peer, err := r.Switch.DialPeerWithAddress(seedAddr, false) + if err != nil { + r.Switch.Logger.Error("Error dialing seed", "err", err, "seed", seedAddr) + } else { + r.Switch.Logger.Info("Connected to seed", "peer", peer) + return + } + } + r.Switch.Logger.Error("Couldn't connect to any seeds") +} + +//---------------------------------------------------------- + // Explores the network searching for more peers. (continuous) // Seed/Crawler Mode causes this node to quickly disconnect // from peers, except other seed nodes. @@ -354,7 +382,7 @@ func (r *PEXReactor) crawlPeersRoutine() { r.crawlPeers() // Fire periodically - ticker := time.NewTicker(defaultSeedModePeriod) + ticker := time.NewTicker(defaultCrawlPeersPeriod) for { select { @@ -367,15 +395,12 @@ func (r *PEXReactor) crawlPeersRoutine() { } } -// crawlStatus handles temporary data needed for the +// crawlPeerInfo handles temporary data needed for the // network crawling performed during seed/crawler mode. -type crawlStatus struct { - // The remote address of a potential peer we learned about +type crawlPeerInfo struct { + // The listening address of a potential peer we learned about Addr *NetAddress - // Not empty if we are connected to the address - PeerID string - // The last time we attempt to reach this address LastAttempt time.Time @@ -383,32 +408,28 @@ type crawlStatus struct { LastSuccess time.Time } -// oldestFirst implements sort.Interface for []crawlStatus +// oldestFirst implements sort.Interface for []crawlPeerInfo // based on the LastAttempt field. -type oldestFirst []crawlStatus +type oldestFirst []crawlPeerInfo func (of oldestFirst) Len() int { return len(of) } func (of oldestFirst) Swap(i, j int) { of[i], of[j] = of[j], of[i] } func (of oldestFirst) Less(i, j int) bool { return of[i].LastAttempt.Before(of[j].LastAttempt) } -// getCrawlStatus returns addresses of potential peers that we wish to validate. +// getPeersToCrawl returns addresses of potential peers that we wish to validate. // NOTE: The status information is ordered as described above. -func (r *PEXReactor) getCrawlStatus() []crawlStatus { +func (r *PEXReactor) getPeersToCrawl() []crawlPeerInfo { var of oldestFirst + // TODO: not this. be more selective addrs := r.book.ListOfKnownAddresses() - // Go through all the addresses in the AddressBook for _, addr := range addrs { - var peerID string - - // Check if a peer is already connected from this addr - if p := r.Switch.peers.GetByRemoteAddr(addr.Addr); p != nil { - peerID = p.Key() + if len(addr.ID()) == 0 { + continue // dont use peers without id } - of = append(of, crawlStatus{ + of = append(of, crawlPeerInfo{ Addr: addr.Addr, - PeerID: peerID, LastAttempt: addr.LastAttempt, LastSuccess: addr.LastSuccess, }) @@ -418,222 +439,47 @@ func (r *PEXReactor) getCrawlStatus() []crawlStatus { } // crawlPeers will crawl the network looking for new peer addresses. (once) -// -// TODO Basically, we need to work harder on our good-peer/bad-peer marking. -// What we're currently doing in terms of marking good/bad peers is just a -// placeholder. It should not be the case that an address becomes old/vetted -// upon a single successful connection. -func (r *PEXReactor) crawlPeers() { - crawlerStatus := r.getCrawlStatus() - - now := time.Now() - // Use addresses we know of to reach additional peers - for _, cs := range crawlerStatus { - // Do not dial peers that are already connected - if cs.PeerID != "" { - continue - } - // Do not attempt to connect with peers we recently dialed - if now.Sub(cs.LastAttempt) < defaultCrawlPeerInterval { - continue - } - // Otherwise, attempt to connect with the known address - p, err := r.Switch.DialPeerWithAddress(cs.Addr, false) - if err != nil { - r.book.MarkAttempt(cs.Addr) - continue - } - // Enter the peer ID into our crawl status information - cs.PeerID = p.Key() - r.book.MarkGood(cs.Addr) - } - // Crawl the connected peers asking for more addresses - for _, cs := range crawlerStatus { - if cs.PeerID == "" { - continue - } - // We will wait a minimum period of time before crawling peers again - if now.Sub(cs.LastAttempt) >= defaultCrawlPeerInterval { - p := r.Switch.Peers().Get(cs.PeerID) - if p != nil { - r.RequestPEX(p) - r.book.MarkAttempt(cs.Addr) - } - } - } -} - -// attemptDisconnects checks the crawlStatus info for Peers to disconnect from. (once) -func (r *PEXReactor) attemptDisconnects() { - crawlerStatus := r.getCrawlStatus() - - now := time.Now() - // Go through each peer we have connected with - // looking for opportunities to disconnect - for _, cs := range crawlerStatus { - if cs.PeerID == "" { - continue - } - // Remain connected to each peer for a minimum period of time - if now.Sub(cs.LastSuccess) < defaultSeedDisconnectWaitPeriod { - continue - } - // Fetch the Peer using the saved ID - p := r.Switch.Peers().Get(cs.PeerID) - if p == nil { - continue - } - // Do not disconnect from persistent peers. - // Specifically, we need to remain connected to other seeds - if p.IsPersistent() { - continue - } - // Otherwise, disconnect from the peer - r.Switch.StopPeerGracefully(p) - } -} - -// crawlStatus handles temporary data needed for the -// network crawling performed during seed/crawler mode. -type crawlStatus struct { - // The remote address of a potential peer we learned about - Addr *NetAddress - - // Not empty if we are connected to the address - PeerID string - - // The last time we attempt to reach this address - LastAttempt time.Time - - // The last time we successfully reached this address - LastSuccess time.Time -} - -// oldestAttempt implements sort.Interface for []crawlStatus -// based on the LastAttempt field. -type oldestAttempt []crawlStatus - -func (oa oldestAttempt) Len() int { return len(oa) } -func (oa oldestAttempt) Swap(i, j int) { oa[i], oa[j] = oa[j], oa[i] } -func (oa oldestAttempt) Less(i, j int) bool { return oa[i].LastAttempt.Before(oa[j].LastAttempt) } - -// getCrawlStatus returns addresses of potential peers that we wish to validate. -// NOTE: The status information is ordered as described above. -func (r *PEXReactor) getCrawlStatus() []crawlStatus { - var oa oldestAttempt - - addrs := r.book.ListOfKnownAddresses() - // Go through all the addresses in the AddressBook - for _, addr := range addrs { - p := r.Switch.peers.GetByRemoteAddr(addr.Addr) - - oa = append(oa, crawlStatus{ - Addr: addr.Addr, - PeerID: p.Key(), - LastAttempt: addr.LastAttempt, - LastSuccess: addr.LastSuccess, - }) - } - sort.Sort(oa) - return oa -} - -// crawlPeers will crawl the network looking for new peer addresses. (once) -// -// TODO Basically, we need to work harder on our good-peer/bad-peer marking. -// What we're currently doing in terms of marking good/bad peers is just a -// placeholder. It should not be the case that an address becomes old/vetted -// upon a single successful connection. func (r *PEXReactor) crawlPeers() { - crawlerStatus := r.getCrawlStatus() + peerInfos := r.getPeersToCrawl() now := time.Now() // Use addresses we know of to reach additional peers - for _, cs := range crawlerStatus { - // Do not dial peers that are already connected - if cs.PeerID != "" { - continue - } + for _, pi := range peerInfos { // Do not attempt to connect with peers we recently dialed - if now.Sub(cs.LastAttempt) < defaultCrawlPeerInterval { + if now.Sub(pi.LastAttempt) < defaultCrawlPeerInterval { continue } // Otherwise, attempt to connect with the known address - p, err := r.Switch.DialPeerWithAddress(cs.Addr, false) + _, err := r.Switch.DialPeerWithAddress(pi.Addr, false) if err != nil { - r.book.MarkAttempt(cs.Addr) + r.book.MarkAttempt(pi.Addr) continue } - // Enter the peer ID into our crawl status information - cs.PeerID = p.Key() - r.book.MarkGood(cs.Addr) } // Crawl the connected peers asking for more addresses - for _, cs := range crawlerStatus { - if cs.PeerID == "" { - continue - } + for _, pi := range peerInfos { // We will wait a minimum period of time before crawling peers again - if now.Sub(cs.LastAttempt) >= defaultCrawlPeerInterval { - p := r.Switch.peers.Get(cs.PeerID) - if p != nil { - r.RequestPEX(p) + if now.Sub(pi.LastAttempt) >= defaultCrawlPeerInterval { + peer := r.Switch.Peers().Get(pi.Addr.ID) + if peer != nil { + r.RequestPEX(peer) } } } } -// attemptDisconnects checks the crawlStatus info for Peers to disconnect from. (once) +// attemptDisconnects checks if we've been with each peer long enough to disconnect func (r *PEXReactor) attemptDisconnects() { - crawlerStatus := r.getCrawlStatus() - - now := time.Now() - // Go through each peer we have connected with - // looking for opportunities to disconnect - for _, cs := range crawlerStatus { - if cs.PeerID == "" { - continue - } - // Remain connected to each peer for a minimum period of time - if now.Sub(cs.LastSuccess) < defaultSeedDisconnectWaitPeriod { - continue - } - // Fetch the Peer using the saved ID - p := r.Switch.peers.Get(cs.PeerID) - if p == nil { + for _, peer := range r.Switch.Peers().List() { + status := peer.Status() + if status.Duration < defaultSeedDisconnectWaitPeriod { continue } - // Do not disconnect from persistent peers. - // Specifically, we need to remain connected to other seeds - if p.IsPersistent() { + if peer.IsPersistent() { continue } - // Otherwise, disconnect from the peer - r.Switch.StopPeerGracefully(p) - } -} - -// randomly dial seeds until we connect to one or exhaust them -func (r *PEXReactor) dialSeed() { - lSeeds := len(r.config.Seeds) - if lSeeds == 0 { - return + r.Switch.StopPeerGracefully(peer) } - seedAddrs, _ := NewNetAddressStrings(r.config.Seeds) - - perm := r.Switch.rng.Perm(lSeeds) - for _, i := range perm { - // dial a random seed - seedAddr := seedAddrs[i] - peer, err := r.Switch.DialPeerWithAddress(seedAddr, false) - if err != nil { - r.Switch.Logger.Error("Error dialing seed", "err", err, "seed", seedAddr) - } else { - r.Switch.Logger.Info("Connected to seed", "peer", peer) - return - } - } - r.Switch.Logger.Error("Couldn't connect to any seeds") } //----------------------------------------------------------------------------- diff --git a/p2p/pex_reactor_test.go b/p2p/pex_reactor_test.go index b8ee89b32..91e30fea2 100644 --- a/p2p/pex_reactor_test.go +++ b/p2p/pex_reactor_test.go @@ -295,46 +295,42 @@ func TestPEXReactorCrawlStatus(t *testing.T) { book := NewAddrBook(dir+"addrbook.json", false) book.SetLogger(log.TestingLogger()) - var r *PEXReactor + pexR := NewPEXReactor(book, &PEXReactorConfig{SeedMode: true}) // Seed/Crawler mode uses data from the Switch makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { - r = NewPEXReactor(book, true) - r.SetLogger(log.TestingLogger()) + pexR.SetLogger(log.TestingLogger()) sw.SetLogger(log.TestingLogger().With("switch", i)) - sw.AddReactor("pex", r) + sw.AddReactor("pex", pexR) return sw }) - // Create a peer, and add it to the peer set + // Create a peer, add it to the peer set and the addrbook. peer := createRandomPeer(false) - r.Switch.peers.Add(peer) - // Add the peer address to the address book - addr1, _ := NewNetAddressString(peer.NodeInfo().ListenAddr) - r.book.AddAddress(addr1, addr1) - // Add an address to the book that does not have a peer + pexR.Switch.peers.Add(peer) + addr1 := peer.NodeInfo().NetAddress() + pexR.book.AddAddress(addr1, addr1) + + // Add a non-connected address to the book. _, addr2 := createRoutableAddr() - r.book.AddAddress(addr2, addr1) + pexR.book.AddAddress(addr2, addr1) - // Get the crawl status data - status := r.getCrawlStatus() + // Get some peerInfos to crawl + peerInfos := pexR.getPeersToCrawl() // Make sure it has the proper number of elements - assert.Equal(2, len(status)) + assert.Equal(2, len(peerInfos)) - var num int - for _, cs := range status { - if cs.PeerID != "" { - num++ - } - } - // Check that only one has been identified as a connected peer - assert.Equal(1, num) + // TODO: test } func createRoutableAddr() (addr string, netAddr *NetAddress) { for { - addr = cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256) - netAddr, _ = NewNetAddressString(addr) + var err error + addr = cmn.Fmt("%X@%v.%v.%v.%v:46656", cmn.RandBytes(20), rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256) + netAddr, err = NewNetAddressString(addr) + if err != nil { + panic(err) + } if netAddr.Routable() { break } @@ -346,7 +342,7 @@ func createRandomPeer(outbound bool) *peer { addr, netAddr := createRoutableAddr() p := &peer{ nodeInfo: NodeInfo{ - ListenAddr: netAddr.String(), + ListenAddr: netAddr.DialString(), PubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(), }, outbound: outbound,