Browse Source

prefer short names

pull/456/head
Anton Kalyaev 8 years ago
committed by Anton Kaliaev
parent
commit
26f661a5dd
No known key found for this signature in database GPG Key ID: 7B6881D965918214
1 changed files with 54 additions and 58 deletions
  1. +54
    -58
      pex_reactor.go

+ 54
- 58
pex_reactor.go View File

@ -18,10 +18,8 @@ const (
maxPexMessageSize = 1048576 // 1MB maxPexMessageSize = 1048576 // 1MB
) )
/*
PEXReactor handles PEX (peer exchange) and ensures that an
adequate number of peers are connected to the switch.
*/
// PEXReactor handles PEX (peer exchange) and ensures that an
// adequate number of peers are connected to the switch.
type PEXReactor struct { type PEXReactor struct {
BaseReactor BaseReactor
@ -29,28 +27,28 @@ type PEXReactor struct {
book *AddrBook book *AddrBook
} }
func NewPEXReactor(book *AddrBook) *PEXReactor {
pexR := &PEXReactor{
book: book,
func NewPEXReactor(b *AddrBook) *PEXReactor {
r := &PEXReactor{
book: b,
} }
pexR.BaseReactor = *NewBaseReactor(log, "PEXReactor", pexR)
return pexR
r.BaseReactor = *NewBaseReactor(log, "PEXReactor", r)
return r
} }
func (pexR *PEXReactor) OnStart() error {
pexR.BaseReactor.OnStart()
pexR.book.Start()
go pexR.ensurePeersRoutine()
func (r *PEXReactor) OnStart() error {
r.BaseReactor.OnStart()
r.book.Start()
go r.ensurePeersRoutine()
return nil return nil
} }
func (pexR *PEXReactor) OnStop() {
pexR.BaseReactor.OnStop()
pexR.book.Stop()
func (r *PEXReactor) OnStop() {
r.BaseReactor.OnStop()
r.book.Stop()
} }
// Implements Reactor
func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor {
// GetChannels implements Reactor
func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
return []*ChannelDescriptor{ return []*ChannelDescriptor{
&ChannelDescriptor{ &ChannelDescriptor{
ID: PexChannel, ID: PexChannel,
@ -60,49 +58,45 @@ func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor {
} }
} }
// Implements Reactor
func (pexR *PEXReactor) AddPeer(peer *Peer) {
// Add the peer to the address book
netAddr, err := NewNetAddressString(peer.ListenAddr)
// AddPeer implements Reactor by adding peer to the address book (if inbound)
// or by requesting more addresses (if outbound).
func (r *PEXReactor) AddPeer(p *Peer) {
netAddr, err := NewNetAddressString(p.ListenAddr)
if err != nil { if err != nil {
// this should never happen // this should never happen
log.Error("Error in AddPeer: invalid peer address", "addr", peer.ListenAddr, "error", err)
log.Error("Error in AddPeer: invalid peer address", "addr", p.ListenAddr, "error", err)
return return
} }
if peer.IsOutbound() {
if pexR.book.NeedMoreAddrs() {
pexR.RequestPEX(peer)
if p.IsOutbound() { // For outbound peers, the address is already in the books
if r.book.NeedMoreAddrs() {
r.RequestPEX(p)
} }
} else {
// For inbound connections, the peer is its own source
// (For outbound peers, the address is already in the books)
pexR.book.AddAddress(netAddr, netAddr)
} else { // For inbound connections, the peer is its own source
r.book.AddAddress(netAddr, netAddr)
} }
} }
// Implements Reactor
func (pexR *PEXReactor) RemovePeer(peer *Peer, reason interface{}) {
// RemovePeer implements Reactor
func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {
// TODO // TODO
} }
// Implements Reactor
// Handles incoming PEX messages.
func (pexR *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
// decode message
// Receive implements Reactor by handling incoming PEX messages.
func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
_, msg, err := DecodeMessage(msgBytes) _, msg, err := DecodeMessage(msgBytes)
if err != nil { if err != nil {
log.Warn("Error decoding message", "error", err) log.Warn("Error decoding message", "error", err)
return return
} }
log.Notice("Received message", "msg", msg) log.Notice("Received message", "msg", msg)
switch msg := msg.(type) { switch msg := msg.(type) {
case *pexRequestMessage: case *pexRequestMessage:
// src requested some peers. // src requested some peers.
// TODO: prevent abuse. // TODO: prevent abuse.
pexR.SendAddrs(src, pexR.book.GetSelection())
r.SendAddrs(src, r.book.GetSelection())
case *pexAddrsMessage: case *pexAddrsMessage:
// We received some peer addresses from src. // We received some peer addresses from src.
// TODO: prevent abuse. // TODO: prevent abuse.
@ -110,7 +104,7 @@ func (pexR *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
srcAddr := src.Connection().RemoteAddress srcAddr := src.Connection().RemoteAddress
for _, addr := range msg.Addrs { for _, addr := range msg.Addrs {
if addr != nil { if addr != nil {
pexR.book.AddAddress(addr, srcAddr)
r.book.AddAddress(addr, srcAddr)
} }
} }
default: default:
@ -118,30 +112,32 @@ func (pexR *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
} }
} }
// Asks peer for more addresses.
func (pexR *PEXReactor) RequestPEX(peer *Peer) {
peer.Send(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
// RequestPEX asks peer for more addresses.
func (r *PEXReactor) RequestPEX(p *Peer) {
p.Send(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
} }
func (pexR *PEXReactor) SendAddrs(peer *Peer, addrs []*NetAddress) {
peer.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
// SendAddrs sends addrs to the peer.
func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) {
p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
} }
// Ensures that sufficient peers are connected. (continuous) // Ensures that sufficient peers are connected. (continuous)
func (pexR *PEXReactor) ensurePeersRoutine() {
func (r *PEXReactor) ensurePeersRoutine() {
// Randomize when routine starts // Randomize when routine starts
time.Sleep(time.Duration(rand.Int63n(500*ensurePeersPeriodSeconds)) * time.Millisecond) time.Sleep(time.Duration(rand.Int63n(500*ensurePeersPeriodSeconds)) * time.Millisecond)
// fire once immediately. // fire once immediately.
pexR.ensurePeers()
r.ensurePeers()
// fire periodically // fire periodically
timer := NewRepeatTimer("pex", ensurePeersPeriodSeconds*time.Second) timer := NewRepeatTimer("pex", ensurePeersPeriodSeconds*time.Second)
FOR_LOOP: FOR_LOOP:
for { for {
select { select {
case <-timer.Ch: case <-timer.Ch:
pexR.ensurePeers()
case <-pexR.Quit:
r.ensurePeers()
case <-r.Quit:
break FOR_LOOP break FOR_LOOP
} }
} }
@ -151,8 +147,8 @@ FOR_LOOP:
} }
// Ensures that sufficient peers are connected. (once) // Ensures that sufficient peers are connected. (once)
func (pexR *PEXReactor) ensurePeers() {
numOutPeers, _, numDialing := pexR.Switch.NumPeers()
func (r *PEXReactor) ensurePeers() {
numOutPeers, _, numDialing := r.Switch.NumPeers()
numToDial := minNumOutboundPeers - (numOutPeers + numDialing) numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
log.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial) log.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial)
if numToDial <= 0 { if numToDial <= 0 {
@ -168,13 +164,13 @@ func (pexR *PEXReactor) ensurePeers() {
// Try to fetch a new peer 3 times. // Try to fetch a new peer 3 times.
// This caps the maximum number of tries to 3 * numToDial. // This caps the maximum number of tries to 3 * numToDial.
for j := 0; j < 3; j++ { for j := 0; j < 3; j++ {
try := pexR.book.PickAddress(newBias)
try := r.book.PickAddress(newBias)
if try == nil { if try == nil {
break break
} }
alreadySelected := toDial.Has(try.IP.String()) alreadySelected := toDial.Has(try.IP.String())
alreadyDialing := pexR.Switch.IsDialing(try)
alreadyConnected := pexR.Switch.Peers().Has(try.IP.String())
alreadyDialing := r.Switch.IsDialing(try)
alreadyConnected := r.Switch.Peers().Has(try.IP.String())
if alreadySelected || alreadyDialing || alreadyConnected { if alreadySelected || alreadyDialing || alreadyConnected {
/* /*
log.Info("Cannot dial address", "addr", try, log.Info("Cannot dial address", "addr", try,
@ -198,20 +194,20 @@ func (pexR *PEXReactor) ensurePeers() {
// Dial picked addresses // Dial picked addresses
for _, item := range toDial.Values() { for _, item := range toDial.Values() {
go func(picked *NetAddress) { go func(picked *NetAddress) {
_, err := pexR.Switch.DialPeerWithAddress(picked, false)
_, err := r.Switch.DialPeerWithAddress(picked, false)
if err != nil { if err != nil {
pexR.book.MarkAttempt(picked)
r.book.MarkAttempt(picked)
} }
}(item.(*NetAddress)) }(item.(*NetAddress))
} }
// If we need more addresses, pick a random peer and ask for more. // If we need more addresses, pick a random peer and ask for more.
if pexR.book.NeedMoreAddrs() {
if peers := pexR.Switch.Peers().List(); len(peers) > 0 {
if r.book.NeedMoreAddrs() {
if peers := r.Switch.Peers().List(); len(peers) > 0 {
i := rand.Int() % len(peers) i := rand.Int() % len(peers)
peer := peers[i] peer := peers[i]
log.Info("No addresses to dial. Sending pexRequest to random peer", "peer", peer) log.Info("No addresses to dial. Sending pexRequest to random peer", "peer", peer)
pexR.RequestPEX(peer)
r.RequestPEX(peer)
} }
} }
} }


Loading…
Cancel
Save