diff --git a/pex_reactor.go b/pex_reactor.go index 2cffb529c..01d0cbd18 100644 --- a/pex_reactor.go +++ b/pex_reactor.go @@ -18,10 +18,8 @@ const ( maxPexMessageSize = 1048576 // 1MB ) -/* -PEXReactor handles PEX (peer exchange) and ensures that an -adequate number of peers are connected to the switch. -*/ +// PEXReactor handles PEX (peer exchange) and ensures that an +// adequate number of peers are connected to the switch. type PEXReactor struct { BaseReactor @@ -29,28 +27,28 @@ type PEXReactor struct { book *AddrBook } -func NewPEXReactor(book *AddrBook) *PEXReactor { - pexR := &PEXReactor{ - book: book, +func NewPEXReactor(b *AddrBook) *PEXReactor { + r := &PEXReactor{ + book: b, } - pexR.BaseReactor = *NewBaseReactor(log, "PEXReactor", pexR) - return pexR + r.BaseReactor = *NewBaseReactor(log, "PEXReactor", r) + return r } -func (pexR *PEXReactor) OnStart() error { - pexR.BaseReactor.OnStart() - pexR.book.Start() - go pexR.ensurePeersRoutine() +func (r *PEXReactor) OnStart() error { + r.BaseReactor.OnStart() + r.book.Start() + go r.ensurePeersRoutine() return nil } -func (pexR *PEXReactor) OnStop() { - pexR.BaseReactor.OnStop() - pexR.book.Stop() +func (r *PEXReactor) OnStop() { + r.BaseReactor.OnStop() + r.book.Stop() } -// Implements Reactor -func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor { +// GetChannels implements Reactor +func (r *PEXReactor) GetChannels() []*ChannelDescriptor { return []*ChannelDescriptor{ &ChannelDescriptor{ ID: PexChannel, @@ -60,49 +58,45 @@ func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor { } } -// Implements Reactor -func (pexR *PEXReactor) AddPeer(peer *Peer) { - // Add the peer to the address book - netAddr, err := NewNetAddressString(peer.ListenAddr) +// AddPeer implements Reactor by adding peer to the address book (if inbound) +// or by requesting more addresses (if outbound). +func (r *PEXReactor) AddPeer(p *Peer) { + netAddr, err := NewNetAddressString(p.ListenAddr) if err != nil { // this should never happen - log.Error("Error in AddPeer: invalid peer address", "addr", peer.ListenAddr, "error", err) + log.Error("Error in AddPeer: invalid peer address", "addr", p.ListenAddr, "error", err) return } - if peer.IsOutbound() { - if pexR.book.NeedMoreAddrs() { - pexR.RequestPEX(peer) + if p.IsOutbound() { // For outbound peers, the address is already in the books + if r.book.NeedMoreAddrs() { + r.RequestPEX(p) } - } else { - // For inbound connections, the peer is its own source - // (For outbound peers, the address is already in the books) - pexR.book.AddAddress(netAddr, netAddr) + } else { // For inbound connections, the peer is its own source + r.book.AddAddress(netAddr, netAddr) } } -// Implements Reactor -func (pexR *PEXReactor) RemovePeer(peer *Peer, reason interface{}) { +// RemovePeer implements Reactor +func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) { // TODO } -// Implements Reactor -// Handles incoming PEX messages. -func (pexR *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) { - - // decode message +// Receive implements Reactor by handling incoming PEX messages. +func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) { _, msg, err := DecodeMessage(msgBytes) if err != nil { log.Warn("Error decoding message", "error", err) return } + log.Notice("Received message", "msg", msg) switch msg := msg.(type) { case *pexRequestMessage: // src requested some peers. // TODO: prevent abuse. - pexR.SendAddrs(src, pexR.book.GetSelection()) + r.SendAddrs(src, r.book.GetSelection()) case *pexAddrsMessage: // We received some peer addresses from src. // TODO: prevent abuse. @@ -110,7 +104,7 @@ func (pexR *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) { srcAddr := src.Connection().RemoteAddress for _, addr := range msg.Addrs { if addr != nil { - pexR.book.AddAddress(addr, srcAddr) + r.book.AddAddress(addr, srcAddr) } } default: @@ -118,30 +112,32 @@ func (pexR *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) { } } -// Asks peer for more addresses. -func (pexR *PEXReactor) RequestPEX(peer *Peer) { - peer.Send(PexChannel, struct{ PexMessage }{&pexRequestMessage{}}) +// RequestPEX asks peer for more addresses. +func (r *PEXReactor) RequestPEX(p *Peer) { + p.Send(PexChannel, struct{ PexMessage }{&pexRequestMessage{}}) } -func (pexR *PEXReactor) SendAddrs(peer *Peer, addrs []*NetAddress) { - peer.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) +// SendAddrs sends addrs to the peer. +func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) { + p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) } // Ensures that sufficient peers are connected. (continuous) -func (pexR *PEXReactor) ensurePeersRoutine() { +func (r *PEXReactor) ensurePeersRoutine() { // Randomize when routine starts time.Sleep(time.Duration(rand.Int63n(500*ensurePeersPeriodSeconds)) * time.Millisecond) // fire once immediately. - pexR.ensurePeers() + r.ensurePeers() + // fire periodically timer := NewRepeatTimer("pex", ensurePeersPeriodSeconds*time.Second) FOR_LOOP: for { select { case <-timer.Ch: - pexR.ensurePeers() - case <-pexR.Quit: + r.ensurePeers() + case <-r.Quit: break FOR_LOOP } } @@ -151,8 +147,8 @@ FOR_LOOP: } // Ensures that sufficient peers are connected. (once) -func (pexR *PEXReactor) ensurePeers() { - numOutPeers, _, numDialing := pexR.Switch.NumPeers() +func (r *PEXReactor) ensurePeers() { + numOutPeers, _, numDialing := r.Switch.NumPeers() numToDial := minNumOutboundPeers - (numOutPeers + numDialing) log.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial) if numToDial <= 0 { @@ -168,13 +164,13 @@ func (pexR *PEXReactor) ensurePeers() { // Try to fetch a new peer 3 times. // This caps the maximum number of tries to 3 * numToDial. for j := 0; j < 3; j++ { - try := pexR.book.PickAddress(newBias) + try := r.book.PickAddress(newBias) if try == nil { break } alreadySelected := toDial.Has(try.IP.String()) - alreadyDialing := pexR.Switch.IsDialing(try) - alreadyConnected := pexR.Switch.Peers().Has(try.IP.String()) + alreadyDialing := r.Switch.IsDialing(try) + alreadyConnected := r.Switch.Peers().Has(try.IP.String()) if alreadySelected || alreadyDialing || alreadyConnected { /* log.Info("Cannot dial address", "addr", try, @@ -198,20 +194,20 @@ func (pexR *PEXReactor) ensurePeers() { // Dial picked addresses for _, item := range toDial.Values() { go func(picked *NetAddress) { - _, err := pexR.Switch.DialPeerWithAddress(picked, false) + _, err := r.Switch.DialPeerWithAddress(picked, false) if err != nil { - pexR.book.MarkAttempt(picked) + r.book.MarkAttempt(picked) } }(item.(*NetAddress)) } // If we need more addresses, pick a random peer and ask for more. - if pexR.book.NeedMoreAddrs() { - if peers := pexR.Switch.Peers().List(); len(peers) > 0 { + if r.book.NeedMoreAddrs() { + if peers := r.Switch.Peers().List(); len(peers) > 0 { i := rand.Int() % len(peers) peer := peers[i] log.Info("No addresses to dial. Sending pexRequest to random peer", "peer", peer) - pexR.RequestPEX(peer) + r.RequestPEX(peer) } } }