Browse Source

p2p: seed mode fixes from rebase and review

pull/1128/head
Ethan Buchman 7 years ago
parent
commit
c2f97e6454
3 changed files with 99 additions and 253 deletions
  1. +4
    -0
      p2p/connection.go
  2. +74
    -228
      p2p/pex_reactor.go
  3. +21
    -25
      p2p/pex_reactor_test.go

+ 4
- 0
p2p/connection.go View File

@ -88,6 +88,8 @@ type MConnection struct {
flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
pingTimer *cmn.RepeatTimer // send pings periodically
chStatsTimer *cmn.RepeatTimer // update channel stats periodically
created time.Time // time of creation
}
// MConnConfig is a MConnection configuration.
@ -502,6 +504,7 @@ FOR_LOOP:
}
type ConnectionStatus struct {
Duration time.Duration
SendMonitor flow.Status
RecvMonitor flow.Status
Channels []ChannelStatus
@ -517,6 +520,7 @@ type ChannelStatus struct {
func (c *MConnection) Status() ConnectionStatus {
var status ConnectionStatus
status.Duration = time.Since(c.created)
status.SendMonitor = c.sendMonitor.Status()
status.RecvMonitor = c.recvMonitor.Status()
status.Channels = make([]ChannelStatus, len(c.channels))


+ 74
- 228
p2p/pex_reactor.go View File

@ -17,15 +17,22 @@ const (
// PexChannel is a channel for PEX messages
PexChannel = byte(0x00)
// period to ensure peers connected
defaultEnsurePeersPeriod = 30 * time.Second
minNumOutboundPeers = 10
maxPexMessageSize = 1048576 // 1MB
maxPexMessageSize = 1048576 // 1MB
// ensure we have enough peers
defaultEnsurePeersPeriod = 30 * time.Second
defaultMinNumOutboundPeers = 10
// Seed/Crawler constants
defaultSeedDisconnectWaitPeriod = 2 * time.Minute
defaultCrawlPeerInterval = 2 * time.Minute
defaultCrawlPeersPeriod = 30 * time.Second
// TODO:
// We want seeds to only advertise good peers.
// Peers are marked by external mechanisms.
// We need a config value that can be set to be
// on the order of how long it would take before a good
// peer is marked good.
defaultSeedDisconnectWaitPeriod = 2 * time.Minute // disconnect after this
defaultCrawlPeerInterval = 2 * time.Minute // don't redial a peer within this period. TODO: exponential back-off
defaultCrawlPeersPeriod = 30 * time.Second // crawl a batch of peers this often
)
// PEXReactor handles PEX (peer exchange) and ensures that an
@ -51,8 +58,11 @@ type PEXReactor struct {
// PEXReactorConfig holds reactor specific configuration data.
type PEXReactorConfig struct {
// Seeds is a list of addresses reactor may use if it can't connect to peers
// in the addrbook.
// Seed/Crawler mode
SeedMode bool
// Seeds is a list of addresses reactor may use
// if it can't connect to peers in the addrbook.
Seeds []string
}
@ -259,19 +269,12 @@ func (r *PEXReactor) ensurePeersRoutine() {
// ensurePeers ensures that sufficient peers are connected. (once)
//
// Old bucket / New bucket are arbitrary categories to denote whether an
// address is vetted or not, and this needs to be determined over time via a
// heuristic that we haven't perfected yet, or, perhaps is manually edited by
// the node operator. It should not be used to compute what addresses are
// already connected or not.
//
// TODO Basically, we need to work harder on our good-peer/bad-peer marking.
// What we're currently doing in terms of marking good/bad peers is just a
// placeholder. It should not be the case that an address becomes old/vetted
// upon a single successful connection.
func (r *PEXReactor) ensurePeers() {
numOutPeers, numInPeers, numDialing := r.Switch.NumPeers()
numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
numToDial := defaultMinNumOutboundPeers - (numOutPeers + numDialing)
r.Logger.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial)
if numToDial <= 0 {
return
@ -327,7 +330,7 @@ func (r *PEXReactor) ensurePeers() {
// If we are not connected to nor dialing anybody, fallback to dialing a seed.
if numOutPeers+numInPeers+numDialing+len(toDial) == 0 {
r.Logger.Info("No addresses to dial nor connected peers. Falling back to seeds")
r.dialSeed()
r.dialSeeds()
}
}
@ -346,6 +349,31 @@ func (r *PEXReactor) checkSeeds() error {
return nil
}
// dialSeeds dials the configured seed addresses in random order until one
// connection succeeds or all seeds are exhausted. It is the fallback used
// when no known peers can be reached.
func (r *PEXReactor) dialSeeds() {
	lSeeds := len(r.config.Seeds)
	if lSeeds == 0 {
		return // no seeds configured; nothing to do
	}
	// NOTE(review): the parse error from NewNetAddressStrings is discarded —
	// a malformed seed string would be silently skipped; confirm it is
	// validated elsewhere (e.g. at config load).
	seedAddrs, _ := NewNetAddressStrings(r.config.Seeds)
	// Random permutation so every node does not hammer the same seed first.
	perm := r.Switch.rng.Perm(lSeeds)
	for _, i := range perm {
		// dial a random seed
		seedAddr := seedAddrs[i]
		peer, err := r.Switch.DialPeerWithAddress(seedAddr, false)
		if err != nil {
			// Log and try the next seed; a single failure is not fatal.
			r.Switch.Logger.Error("Error dialing seed", "err", err, "seed", seedAddr)
		} else {
			// First successful connection is enough; stop dialing.
			r.Switch.Logger.Info("Connected to seed", "peer", peer)
			return
		}
	}
	r.Switch.Logger.Error("Couldn't connect to any seeds")
}
//----------------------------------------------------------
// Explores the network searching for more peers. (continuous)
// Seed/Crawler Mode causes this node to quickly disconnect
// from peers, except other seed nodes.
@ -354,7 +382,7 @@ func (r *PEXReactor) crawlPeersRoutine() {
r.crawlPeers()
// Fire periodically
ticker := time.NewTicker(defaultSeedModePeriod)
ticker := time.NewTicker(defaultCrawlPeersPeriod)
for {
select {
@ -367,15 +395,12 @@ func (r *PEXReactor) crawlPeersRoutine() {
}
}
// crawlStatus handles temporary data needed for the
// crawlPeerInfo handles temporary data needed for the
// network crawling performed during seed/crawler mode.
type crawlStatus struct {
// The remote address of a potential peer we learned about
type crawlPeerInfo struct {
// The listening address of a potential peer we learned about
Addr *NetAddress
// Not empty if we are connected to the address
PeerID string
// The last time we attempt to reach this address
LastAttempt time.Time
@ -383,32 +408,28 @@ type crawlStatus struct {
LastSuccess time.Time
}
// oldestFirst implements sort.Interface for []crawlStatus
// oldestFirst implements sort.Interface for []crawlPeerInfo
// based on the LastAttempt field.
type oldestFirst []crawlStatus
type oldestFirst []crawlPeerInfo
func (of oldestFirst) Len() int { return len(of) }
func (of oldestFirst) Swap(i, j int) { of[i], of[j] = of[j], of[i] }
func (of oldestFirst) Less(i, j int) bool { return of[i].LastAttempt.Before(of[j].LastAttempt) }
// getCrawlStatus returns addresses of potential peers that we wish to validate.
// getPeersToCrawl returns addresses of potential peers that we wish to validate.
// NOTE: The returned addresses are sorted oldest-attempt-first (see oldestFirst).
func (r *PEXReactor) getCrawlStatus() []crawlStatus {
func (r *PEXReactor) getPeersToCrawl() []crawlPeerInfo {
var of oldestFirst
// TODO: not this. be more selective
addrs := r.book.ListOfKnownAddresses()
// Go through all the addresses in the AddressBook
for _, addr := range addrs {
var peerID string
// Check if a peer is already connected from this addr
if p := r.Switch.peers.GetByRemoteAddr(addr.Addr); p != nil {
peerID = p.Key()
if len(addr.ID()) == 0 {
continue // don't use peers without an ID
}
of = append(of, crawlStatus{
of = append(of, crawlPeerInfo{
Addr: addr.Addr,
PeerID: peerID,
LastAttempt: addr.LastAttempt,
LastSuccess: addr.LastSuccess,
})
@ -418,222 +439,47 @@ func (r *PEXReactor) getCrawlStatus() []crawlStatus {
}
// crawlPeers will crawl the network looking for new peer addresses. (once)
//
// TODO Basically, we need to work harder on our good-peer/bad-peer marking.
// What we're currently doing in terms of marking good/bad peers is just a
// placeholder. It should not be the case that an address becomes old/vetted
// upon a single successful connection.
func (r *PEXReactor) crawlPeers() {
crawlerStatus := r.getCrawlStatus()
now := time.Now()
// Use addresses we know of to reach additional peers
for _, cs := range crawlerStatus {
// Do not dial peers that are already connected
if cs.PeerID != "" {
continue
}
// Do not attempt to connect with peers we recently dialed
if now.Sub(cs.LastAttempt) < defaultCrawlPeerInterval {
continue
}
// Otherwise, attempt to connect with the known address
p, err := r.Switch.DialPeerWithAddress(cs.Addr, false)
if err != nil {
r.book.MarkAttempt(cs.Addr)
continue
}
// Enter the peer ID into our crawl status information
cs.PeerID = p.Key()
r.book.MarkGood(cs.Addr)
}
// Crawl the connected peers asking for more addresses
for _, cs := range crawlerStatus {
if cs.PeerID == "" {
continue
}
// We will wait a minimum period of time before crawling peers again
if now.Sub(cs.LastAttempt) >= defaultCrawlPeerInterval {
p := r.Switch.Peers().Get(cs.PeerID)
if p != nil {
r.RequestPEX(p)
r.book.MarkAttempt(cs.Addr)
}
}
}
}
// attemptDisconnects checks the crawlStatus info for Peers to disconnect from. (once)
func (r *PEXReactor) attemptDisconnects() {
crawlerStatus := r.getCrawlStatus()
now := time.Now()
// Go through each peer we have connected with
// looking for opportunities to disconnect
for _, cs := range crawlerStatus {
if cs.PeerID == "" {
continue
}
// Remain connected to each peer for a minimum period of time
if now.Sub(cs.LastSuccess) < defaultSeedDisconnectWaitPeriod {
continue
}
// Fetch the Peer using the saved ID
p := r.Switch.Peers().Get(cs.PeerID)
if p == nil {
continue
}
// Do not disconnect from persistent peers.
// Specifically, we need to remain connected to other seeds
if p.IsPersistent() {
continue
}
// Otherwise, disconnect from the peer
r.Switch.StopPeerGracefully(p)
}
}
// crawlStatus handles temporary data needed for the
// network crawling performed during seed/crawler mode.
type crawlStatus struct {
// The remote address of a potential peer we learned about
Addr *NetAddress
// Not empty if we are connected to the address
PeerID string
// The last time we attempt to reach this address
LastAttempt time.Time
// The last time we successfully reached this address
LastSuccess time.Time
}
// oldestAttempt implements sort.Interface for []crawlStatus
// based on the LastAttempt field.
type oldestAttempt []crawlStatus
func (oa oldestAttempt) Len() int { return len(oa) }
func (oa oldestAttempt) Swap(i, j int) { oa[i], oa[j] = oa[j], oa[i] }
func (oa oldestAttempt) Less(i, j int) bool { return oa[i].LastAttempt.Before(oa[j].LastAttempt) }
// getCrawlStatus returns addresses of potential peers that we wish to validate.
// NOTE: The status information is ordered as described above.
func (r *PEXReactor) getCrawlStatus() []crawlStatus {
var oa oldestAttempt
addrs := r.book.ListOfKnownAddresses()
// Go through all the addresses in the AddressBook
for _, addr := range addrs {
p := r.Switch.peers.GetByRemoteAddr(addr.Addr)
oa = append(oa, crawlStatus{
Addr: addr.Addr,
PeerID: p.Key(),
LastAttempt: addr.LastAttempt,
LastSuccess: addr.LastSuccess,
})
}
sort.Sort(oa)
return oa
}
// crawlPeers will crawl the network looking for new peer addresses. (once)
//
// TODO Basically, we need to work harder on our good-peer/bad-peer marking.
// What we're currently doing in terms of marking good/bad peers is just a
// placeholder. It should not be the case that an address becomes old/vetted
// upon a single successful connection.
func (r *PEXReactor) crawlPeers() {
crawlerStatus := r.getCrawlStatus()
peerInfos := r.getPeersToCrawl()
now := time.Now()
// Use addresses we know of to reach additional peers
for _, cs := range crawlerStatus {
// Do not dial peers that are already connected
if cs.PeerID != "" {
continue
}
for _, pi := range peerInfos {
// Do not attempt to connect with peers we recently dialed
if now.Sub(cs.LastAttempt) < defaultCrawlPeerInterval {
if now.Sub(pi.LastAttempt) < defaultCrawlPeerInterval {
continue
}
// Otherwise, attempt to connect with the known address
p, err := r.Switch.DialPeerWithAddress(cs.Addr, false)
_, err := r.Switch.DialPeerWithAddress(pi.Addr, false)
if err != nil {
r.book.MarkAttempt(cs.Addr)
r.book.MarkAttempt(pi.Addr)
continue
}
// Enter the peer ID into our crawl status information
cs.PeerID = p.Key()
r.book.MarkGood(cs.Addr)
}
// Crawl the connected peers asking for more addresses
for _, cs := range crawlerStatus {
if cs.PeerID == "" {
continue
}
for _, pi := range peerInfos {
// We will wait a minimum period of time before crawling peers again
if now.Sub(cs.LastAttempt) >= defaultCrawlPeerInterval {
p := r.Switch.peers.Get(cs.PeerID)
if p != nil {
r.RequestPEX(p)
if now.Sub(pi.LastAttempt) >= defaultCrawlPeerInterval {
peer := r.Switch.Peers().Get(pi.Addr.ID)
if peer != nil {
r.RequestPEX(peer)
}
}
}
}
// attemptDisconnects checks the crawlStatus info for Peers to disconnect from. (once)
// attemptDisconnects checks if we've been with each peer long enough to disconnect
func (r *PEXReactor) attemptDisconnects() {
crawlerStatus := r.getCrawlStatus()
now := time.Now()
// Go through each peer we have connected with
// looking for opportunities to disconnect
for _, cs := range crawlerStatus {
if cs.PeerID == "" {
continue
}
// Remain connected to each peer for a minimum period of time
if now.Sub(cs.LastSuccess) < defaultSeedDisconnectWaitPeriod {
continue
}
// Fetch the Peer using the saved ID
p := r.Switch.peers.Get(cs.PeerID)
if p == nil {
for _, peer := range r.Switch.Peers().List() {
status := peer.Status()
if status.Duration < defaultSeedDisconnectWaitPeriod {
continue
}
// Do not disconnect from persistent peers.
// Specifically, we need to remain connected to other seeds
if p.IsPersistent() {
if peer.IsPersistent() {
continue
}
// Otherwise, disconnect from the peer
r.Switch.StopPeerGracefully(p)
}
}
// randomly dial seeds until we connect to one or exhaust them
func (r *PEXReactor) dialSeed() {
lSeeds := len(r.config.Seeds)
if lSeeds == 0 {
return
r.Switch.StopPeerGracefully(peer)
}
seedAddrs, _ := NewNetAddressStrings(r.config.Seeds)
perm := r.Switch.rng.Perm(lSeeds)
for _, i := range perm {
// dial a random seed
seedAddr := seedAddrs[i]
peer, err := r.Switch.DialPeerWithAddress(seedAddr, false)
if err != nil {
r.Switch.Logger.Error("Error dialing seed", "err", err, "seed", seedAddr)
} else {
r.Switch.Logger.Info("Connected to seed", "peer", peer)
return
}
}
r.Switch.Logger.Error("Couldn't connect to any seeds")
}
//-----------------------------------------------------------------------------


+ 21
- 25
p2p/pex_reactor_test.go View File

@ -295,46 +295,42 @@ func TestPEXReactorCrawlStatus(t *testing.T) {
book := NewAddrBook(dir+"addrbook.json", false)
book.SetLogger(log.TestingLogger())
var r *PEXReactor
pexR := NewPEXReactor(book, &PEXReactorConfig{SeedMode: true})
// Seed/Crawler mode uses data from the Switch
makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch {
r = NewPEXReactor(book, true)
r.SetLogger(log.TestingLogger())
pexR.SetLogger(log.TestingLogger())
sw.SetLogger(log.TestingLogger().With("switch", i))
sw.AddReactor("pex", r)
sw.AddReactor("pex", pexR)
return sw
})
// Create a peer, and add it to the peer set
// Create a peer, add it to the peer set and the addrbook.
peer := createRandomPeer(false)
r.Switch.peers.Add(peer)
// Add the peer address to the address book
addr1, _ := NewNetAddressString(peer.NodeInfo().ListenAddr)
r.book.AddAddress(addr1, addr1)
// Add an address to the book that does not have a peer
pexR.Switch.peers.Add(peer)
addr1 := peer.NodeInfo().NetAddress()
pexR.book.AddAddress(addr1, addr1)
// Add a non-connected address to the book.
_, addr2 := createRoutableAddr()
r.book.AddAddress(addr2, addr1)
pexR.book.AddAddress(addr2, addr1)
// Get the crawl status data
status := r.getCrawlStatus()
// Get some peerInfos to crawl
peerInfos := pexR.getPeersToCrawl()
// Make sure it has the proper number of elements
assert.Equal(2, len(status))
assert.Equal(2, len(peerInfos))
var num int
for _, cs := range status {
if cs.PeerID != "" {
num++
}
}
// Check that only one has been identified as a connected peer
assert.Equal(1, num)
// TODO: test
}
func createRoutableAddr() (addr string, netAddr *NetAddress) {
for {
addr = cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256)
netAddr, _ = NewNetAddressString(addr)
var err error
addr = cmn.Fmt("%X@%v.%v.%v.%v:46656", cmn.RandBytes(20), rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256)
netAddr, err = NewNetAddressString(addr)
if err != nil {
panic(err)
}
if netAddr.Routable() {
break
}
@ -346,7 +342,7 @@ func createRandomPeer(outbound bool) *peer {
addr, netAddr := createRoutableAddr()
p := &peer{
nodeInfo: NodeInfo{
ListenAddr: netAddr.String(),
ListenAddr: netAddr.DialString(),
PubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(),
},
outbound: outbound,


Loading…
Cancel
Save