Browse Source

Moved PEX logic to PeerManager where it belongs.

pull/9/head
Jae Kwon 11 years ago
parent
commit
c5732f9374
4 changed files with 59 additions and 58 deletions
  1. +4
    -2
      common/throttler.go
  2. +7
    -45
      main.go
  3. +11
    -3
      p2p/addrbook.go
  4. +37
    -8
      p2p/peer_manager.go

+ 4
- 2
common/throttler.go View File

@ -6,8 +6,10 @@ import (
)
/*
Throttler sends a struct{}{} to .Ch "dur" after the last .Set().
It's good for ensuring that something happens last after a burst of events.
Throttler fires an event at most "dur" after each .Set() call.
If a short burst of .Set() calls happens, Throttler fires once.
If a long continuous burst of .Set() calls happens, Throttler fires
at most once every "dur".
*/
type Throttler struct {
Ch chan struct{}


+ 7
- 45
main.go View File

@ -9,11 +9,10 @@ import (
)
type Node struct {
lz []p2p.Listener
sw *p2p.Switch
swEvents chan interface{}
book *p2p.AddrBook
pmgr *p2p.PeerManager
lz []p2p.Listener
sw *p2p.Switch
book *p2p.AddrBook
pmgr *p2p.PeerManager
}
func NewNode() *Node {
@ -41,16 +40,13 @@ func NewNode() *Node {
},
}
sw := p2p.NewSwitch(chDescs)
swEvents := make(chan interface{})
sw.AddEventListener("Node.swEvents", swEvents)
book := p2p.NewAddrBook(config.RootDir + "/addrbook.json")
pmgr := p2p.NewPeerManager(sw, book)
return &Node{
sw: sw,
swEvents: swEvents,
book: book,
pmgr: pmgr,
sw: sw,
book: book,
pmgr: pmgr,
}
}
@ -59,7 +55,6 @@ func (n *Node) Start() {
for _, l := range n.lz {
go n.inboundConnectionHandler(l)
}
go n.switchEventsHandler()
n.sw.Start()
n.book.Start()
n.pmgr.Start()
@ -69,7 +64,6 @@ func (n *Node) Stop() {
log.Info("Stopping node")
// TODO: gracefully disconnect from peers.
n.sw.Stop()
close(n.swEvents)
n.book.Stop()
n.pmgr.Stop()
}
@ -102,38 +96,6 @@ func (n *Node) inboundConnectionHandler(l p2p.Listener) {
// cleanup
}
func (n *Node) switchEventsHandler() {
for {
swEvent, ok := <-n.swEvents
if !ok {
break
}
switch swEvent.(type) {
case p2p.SwitchEventNewPeer:
event := swEvent.(p2p.SwitchEventNewPeer)
if event.Peer.IsOutbound() {
n.sendOurExternalAddrs(event.Peer)
if n.book.NeedMoreAddrs() {
n.pmgr.RequestPEX(event.Peer)
}
}
case p2p.SwitchEventDonePeer:
// TODO
}
}
}
func (n *Node) sendOurExternalAddrs(peer *p2p.Peer) {
// Send listener our external address(es)
addrs := []*p2p.NetAddress{}
for _, l := range n.lz {
addrs = append(addrs, l.ExternalAddress())
}
n.pmgr.SendAddrs(peer, addrs)
// On the remote end, the pexHandler may choose
// to add these to its book.
}
//-----------------------------------------------------------------------------
func main() {


+ 11
- 3
p2p/addrbook.go View File

@ -80,7 +80,7 @@ type AddrBook struct {
mtx sync.Mutex
rand *rand.Rand
key string
ourAddrs map[string]struct{}
ourAddrs map[string]*NetAddress
addrLookup map[string]*knownAddress // new & old
addrNew []map[string]*knownAddress
addrOld []map[string]*knownAddress
@ -101,7 +101,7 @@ const (
func NewAddrBook(filePath string) *AddrBook {
am := AddrBook{
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
ourAddrs: make(map[string]struct{}),
ourAddrs: make(map[string]*NetAddress),
addrLookup: make(map[string]*knownAddress),
quit: make(chan struct{}),
filePath: filePath,
@ -145,7 +145,15 @@ func (a *AddrBook) Stop() {
func (a *AddrBook) AddOurAddress(addr *NetAddress) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.ourAddrs[addr.String()] = struct{}{}
a.ourAddrs[addr.String()] = addr
}
func (a *AddrBook) OurAddresses() []*NetAddress {
addrs := []*NetAddress{}
for _, addr := range a.ourAddrs {
addrs = append(addrs, addr)
}
return addrs
}
func (a *AddrBook) AddAddress(addr *NetAddress, src *NetAddress) {


+ 37
- 8
p2p/peer_manager.go View File

@ -26,18 +26,22 @@ PeerManager handles PEX (peer exchange) and ensures that an
adequate number of peers are connected to the switch.
*/
type PeerManager struct {
sw *Switch
book *AddrBook
quit chan struct{}
started uint32
stopped uint32
sw *Switch
swEvents chan interface{}
book *AddrBook
quit chan struct{}
started uint32
stopped uint32
}
func NewPeerManager(sw *Switch, book *AddrBook) *PeerManager {
swEvents := make(chan interface{})
sw.AddEventListener("PeerManager.swEvents", swEvents)
pm := &PeerManager{
sw: sw,
book: book,
quit: make(chan struct{}),
sw: sw,
swEvents: swEvents,
book: book,
quit: make(chan struct{}),
}
return pm
}
@ -45,6 +49,7 @@ func NewPeerManager(sw *Switch, book *AddrBook) *PeerManager {
func (pm *PeerManager) Start() {
if atomic.CompareAndSwapUint32(&pm.started, 0, 1) {
log.Info("Starting PeerManager")
go pm.switchEventsHandler()
go pm.ensurePeersHandler()
go pm.pexHandler()
}
@ -54,6 +59,7 @@ func (pm *PeerManager) Stop() {
if atomic.CompareAndSwapUint32(&pm.stopped, 0, 1) {
log.Info("Stopping PeerManager")
close(pm.quit)
close(pm.swEvents)
}
}
@ -70,6 +76,29 @@ func (pm *PeerManager) SendAddrs(peer *Peer, addrs []*NetAddress) {
peer.Send(NewPacket(pexCh, tm))
}
// For new outbound peers, announce our listener addresses if any,
// and if .book needs more addresses, ask for them.
func (pm *PeerManager) switchEventsHandler() {
for {
swEvent, ok := <-pm.swEvents
if !ok {
break
}
switch swEvent.(type) {
case SwitchEventNewPeer:
event := swEvent.(SwitchEventNewPeer)
if event.Peer.IsOutbound() {
pm.SendAddrs(event.Peer, pm.book.OurAddresses())
if pm.book.NeedMoreAddrs() {
pm.RequestPEX(event.Peer)
}
}
case SwitchEventDonePeer:
// TODO
}
}
}
// Ensures that sufficient peers are connected. (continuous)
func (pm *PeerManager) ensurePeersHandler() {
// fire once immediately.


Loading…
Cancel
Save