diff --git a/main.go b/main.go index 70b77cc09..240b16a46 100644 --- a/main.go +++ b/main.go @@ -11,10 +11,11 @@ import ( ) type Node struct { - lz []p2p.Listener - sw *p2p.Switch - book *p2p.AddrBook - pmgr *p2p.PeerManager + lz []p2p.Listener + sw *p2p.Switch + swEvents chan interface{} + book *p2p.AddrBook + pmgr *p2p.PeerManager } func NewNode() *Node { @@ -42,13 +43,16 @@ func NewNode() *Node { }, } sw := p2p.NewSwitch(chDescs) + swEvents := make(chan interface{}) + sw.AddEventListener("Node.swEvents", swEvents) book := p2p.NewAddrBook(config.RootDir + "/addrbook.json") pmgr := p2p.NewPeerManager(sw, book) return &Node{ - sw: sw, - book: book, - pmgr: pmgr, + sw: sw, + swEvents: swEvents, + book: book, + pmgr: pmgr, } } @@ -57,11 +61,21 @@ func (n *Node) Start() { for _, l := range n.lz { go n.inboundConnectionHandler(l) } + go n.switchEventsHandler() n.sw.Start() n.book.Start() n.pmgr.Start() } +func (n *Node) Stop() { + log.Info("Stopping node") + // TODO: gracefully disconnect from peers. + n.sw.Stop() + close(n.swEvents) + n.book.Stop() + n.pmgr.Stop() +} + // Add a Listener to accept inbound peer connections. func (n *Node) AddListener(l p2p.Listener) { log.Info("Added %v", l) @@ -89,7 +103,27 @@ func (n *Node) inboundConnectionHandler(l p2p.Listener) { // cleanup } -func (n *Node) SendOurExternalAddrs(peer *p2p.Peer) { +func (n *Node) switchEventsHandler() { + for { + swEvent, ok := <-n.swEvents + if !ok { + break + } + switch swEvent.(type) { + case p2p.SwitchEventNewPeer: + event := swEvent.(p2p.SwitchEventNewPeer) + n.sendOurExternalAddrs(event.Peer) + if n.book.NeedMoreAddrs() { + pkt := p2p.NewPacket(p2p.PexCh, p2p.NewPexRequestMessage()) + event.Peer.TrySend(pkt) + } + case p2p.SwitchEventDonePeer: + // TODO + } + } +} + +func (n *Node) sendOurExternalAddrs(peer *p2p.Peer) { // Send listener our external address(es) addrs := []*p2p.NetAddress{} for _, l := range n.lz { @@ -101,25 +135,6 @@ func (n *Node) SendOurExternalAddrs(peer *p2p.Peer) { // to add these to its book. } -func (n *Node) newPeersHandler() { - for { - peer, ok := <-n.pmgr.NewPeers() - if !ok { - break - } - // New outbound peer! - n.SendOurExternalAddrs(peer) - } -} - -func (n *Node) Stop() { - log.Info("Stopping node") - // TODO: gracefully disconnect from peers. - n.sw.Stop() - n.book.Stop() - n.pmgr.Stop() -} - //----------------------------------------------------------------------------- func main() { @@ -139,7 +154,6 @@ func main() { return } else { log.Info("Connected to seed: %v", peer) - n.SendOurExternalAddrs(peer) } } diff --git a/p2p/addrbook.go b/p2p/addrbook.go index 584da1173..cdd09bd41 100644 --- a/p2p/addrbook.go +++ b/p2p/addrbook.go @@ -144,7 +144,7 @@ func (a *AddrBook) AddAddress(addr *NetAddress, src *NetAddress) { a.addAddress(addr, src) } -func (a *AddrBook) NeedMoreAddresses() bool { +func (a *AddrBook) NeedMoreAddrs() bool { return a.Size() < needAddressThreshold } @@ -359,6 +359,7 @@ out: for { select { case <-dumpAddressTicker.C: + log.Debug("Saving book to file (%v)", a.Size()) a.saveToFile(a.filePath) case <-a.quit: break out diff --git a/p2p/connection.go b/p2p/connection.go index 20d8f382a..327c3e376 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -180,7 +180,7 @@ FOR_LOOP: } log.Debug("%v sendHandler done", c) - // cleanup + // Cleanup } // recvHandler reads from .bufReader and pushes to the appropriate @@ -240,10 +240,10 @@ FOR_LOOP: } log.Debug("%v recvHandler done", c) - // cleanup + // Cleanup close(c.pong) for _ = range c.pong { - // drain + // Drain } } diff --git a/p2p/listener.go b/p2p/listener.go index a9b3d1b89..9b505a33e 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -91,7 +91,7 @@ func (l *DefaultListener) listenHandler() { conn, err := l.listener.Accept() if atomic.LoadUint32(&l.stopped) == 1 { - break // go to cleanup + break // Go to cleanup } // listener wasn't stopped, @@ -104,10 +104,10 @@ func (l *DefaultListener) listenHandler() { l.connections <- c } - // cleanup + // Cleanup close(l.connections) for _ = range l.connections { - // drain + // Drain } } diff --git a/p2p/peer.go b/p2p/peer.go index bb1126dff..701c56d0d 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -133,8 +133,7 @@ FOR_LOOP: } log.Debug("%v sendHandler [%v] closed", p, chName) - // cleanup - // (none) + // Cleanup } // recvHandler pulls from a channel and pushes to the given pktRecvQueue. @@ -168,8 +167,7 @@ FOR_LOOP: } log.Debug("%v recvHandler [%v] closed", p, chName) - // cleanup - // (none) + // Cleanup } //----------------------------------------------------------------------------- diff --git a/p2p/peer_manager.go b/p2p/peer_manager.go index ad65e6999..9f71fadb8 100644 --- a/p2p/peer_manager.go +++ b/p2p/peer_manager.go @@ -16,30 +16,27 @@ var pexErrInvalidMessage = errors.New("Invalid PEX message") const ( PexCh = "PEX" ensurePeersPeriodSeconds = 30 - minNumPeers = 10 - maxNumPeers = 20 + minNumOutboundPeers = 10 + maxNumPeers = 50 ) /* PeerManager handles PEX (peer exchange) and ensures that an adequate number of peers are connected to the switch. -User must pull from the .NewPeers() channel. */ type PeerManager struct { - sw *Switch - book *AddrBook - quit chan struct{} - newPeers chan *Peer - started uint32 - stopped uint32 + sw *Switch + book *AddrBook + quit chan struct{} + started uint32 + stopped uint32 } func NewPeerManager(sw *Switch, book *AddrBook) *PeerManager { pm := &PeerManager{ - sw: sw, - book: book, - quit: make(chan struct{}), - newPeers: make(chan *Peer), + sw: sw, + book: book, + quit: make(chan struct{}), } return pm } @@ -55,16 +52,11 @@ func (pm *PeerManager) Start() { func (pm *PeerManager) Stop() { if atomic.CompareAndSwapUint32(&pm.stopped, 0, 1) { log.Info("Stopping peerManager") - close(pm.newPeers) close(pm.quit) } } -// Closes when PeerManager closes. -func (pm *PeerManager) NewPeers() <-chan *Peer { - return pm.newPeers -} - +// Ensures that sufficient peers are connected. (continuous) func (pm *PeerManager) ensurePeersHandler() { // fire once immediately. pm.ensurePeers() @@ -80,14 +72,14 @@ FOR_LOOP: } } - // cleanup + // Cleanup timer.Stop() } -// Ensures that sufficient peers are connected. +// Ensures that sufficient peers are connected. (once) func (pm *PeerManager) ensurePeers() { numOutPeers, _, numDialing := pm.sw.NumPeers() - numToDial := minNumPeers - (numOutPeers + numDialing) + numToDial := minNumOutboundPeers - (numOutPeers + numDialing) if numToDial <= 0 { return } @@ -124,16 +116,15 @@ func (pm *PeerManager) ensurePeers() { for _, item := range toDial.Values() { picked := item.(*NetAddress) go func() { - peer, err := pm.sw.DialPeerWithAddress(picked) + _, err := pm.sw.DialPeerWithAddress(picked) if err != nil { pm.book.MarkAttempt(picked) } - // Connection established. - pm.newPeers <- peer }() } } +// Handles incoming Pex messages. func (pm *PeerManager) pexHandler() { for { @@ -172,7 +163,7 @@ func (pm *PeerManager) pexHandler() { } } - // cleanup + // Cleanup } @@ -205,6 +196,12 @@ A PexRequestMessage requests additional peer addresses. type PexRequestMessage struct { } +// TODO: define NewPexRequestPacket instead? + +func NewPexRequestMessage() *PexRequestMessage { + return &PexRequestMessage{} +} + func (m *PexRequestMessage) WriteTo(w io.Writer) (n int64, err error) { n, err = WriteOnto(pexTypeRequest, w, n, err) return diff --git a/p2p/switch.go b/p2p/switch.go index 463a84453..d7549ffac 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -24,6 +24,7 @@ type Switch struct { pktRecvQueues map[string]chan *InboundPacket peers *PeerSet dialing *CMap + listeners *CMap // name -> chan interface{} quit chan struct{} started uint32 stopped uint32 @@ -50,6 +51,7 @@ func NewSwitch(channels []ChannelDescriptor) *Switch { pktRecvQueues: pktRecvQueues, peers: NewPeerSet(), dialing: NewCMap(), + listeners: NewCMap(), quit: make(chan struct{}), stopped: 0, } @@ -89,13 +91,21 @@ func (s *Switch) AddPeerWithConnection(conn *Connection, outbound bool) (*Peer, } peer := newPeer(conn, channels) peer.outbound = outbound - err := s.addPeer(peer) - if err != nil { - return nil, err + + // Add the peer to .peers + if s.peers.Add(peer) { + log.Debug("Adding: %v", peer) + } else { + log.Info("Ignoring duplicate: %v", peer) + return nil, ErrSwitchDuplicatePeer } + // Start the peer go peer.start(s.pktRecvQueues, s.StopPeerForError) + // Notify listeners. + s.emit(SwitchEventNewPeer{Peer: peer}) + return peer, nil } @@ -141,6 +151,16 @@ func (s *Switch) Broadcast(pkt Packet) (numSuccess, numFailure int) { } +// The events are of type SwitchEvent* defined below. +// Switch does not close these listeners. +func (s *Switch) AddEventListener(name string, listener chan<- interface{}) { + s.listeners.Set(name, listener) +} + +func (s *Switch) RemoveEventListener(name string) { + s.listeners.Delete(name) +} + /* Receive blocks on a channel until a message is found. */ @@ -185,27 +205,37 @@ func (s *Switch) Peers() IPeerSet { // TODO: make record depending on reason. func (s *Switch) StopPeerForError(peer *Peer, reason interface{}) { log.Info("%v errored: %v", peer, reason) - s.StopPeer(peer, false) + s.peers.Remove(peer) + peer.stop() + + // Notify listeners + s.emit(SwitchEventDonePeer{Peer: peer, Error: reason}) } -// Disconnect from a peer. -// If graceful is true, last message sent is a disconnect message. +// Disconnect from a peer gracefully. // TODO: handle graceful disconnects. -func (s *Switch) StopPeer(peer *Peer, graceful bool) { +func (s *Switch) StopPeerGracefully(peer *Peer) { s.peers.Remove(peer) peer.stop() + + // Notify listeners + s.emit(SwitchEventDonePeer{Peer: peer}) } -func (s *Switch) addPeer(peer *Peer) error { - if s.stopped == 1 { - return ErrSwitchStopped - } - if s.peers.Add(peer) { - log.Debug("Adding: %v", peer) - return nil - } else { - // ignore duplicate peer - log.Info("Ignoring duplicate: %v", peer) - return ErrSwitchDuplicatePeer +func (s *Switch) emit(event interface{}) { + for _, ch_i := range s.listeners.Values() { + ch := ch_i.(chan<- interface{}) + ch <- event } } + +//----------------------------------------------------------------------------- + +type SwitchEventNewPeer struct { + Peer *Peer +} + +type SwitchEventDonePeer struct { + Peer *Peer + Error interface{} +}