diff --git a/daemon/daemon.go b/daemon/daemon.go index e13ff036c..f548275e1 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -18,7 +18,6 @@ import ( ) type Node struct { - lz []p2p.Listener sw *p2p.Switch book *p2p.AddrBook blockStore *bc.BlockStore @@ -92,11 +91,7 @@ func NewNode() *Node { func (n *Node) Start() { log.Info("Starting Node") - for _, l := range n.lz { - go n.inboundConnectionRoutine(l) - } n.book.Start() - //n.sw.StartReactors()... n.sw.Reactor("PEX").Start(n.sw) n.sw.Reactor("MEMPOOL").Start(n.sw) n.sw.Reactor("BLOCKCHAIN").Start(n.sw) @@ -115,7 +110,7 @@ func (n *Node) Stop() { // Add a Listener to accept inbound peer connections. func (n *Node) AddListener(l p2p.Listener) { log.Info(Fmt("Added %v", l)) - n.lz = append(n.lz, l) + n.sw.AddListener(l) n.book.AddOurAddress(l.ExternalAddress()) } @@ -173,6 +168,8 @@ func (n *Node) MempoolReactor() *mempl.MempoolReactor { return n.mempoolReactor } +//------------------------------------------------------------------------------ + // debora variables var ( AppName = "tendermint" diff --git a/p2p/switch.go b/p2p/switch.go index 1cb4edbf8..8a36fe652 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -28,12 +28,12 @@ incoming messages are received on the reactor. */ type Switch struct { network string + listeners []Listener reactors map[string]Reactor chDescs []*ChannelDescriptor reactorsByCh map[byte]Reactor peers *PeerSet dialing *CMap - listeners *CMap // listenerName -> chan interface{} } var ( @@ -53,7 +53,6 @@ func NewSwitch() *Switch { reactorsByCh: make(map[byte]Reactor), peers: NewPeerSet(), dialing: NewCMap(), - listeners: NewCMap(), } return sw @@ -109,10 +108,20 @@ func (sw *Switch) StopPeers() { sw.peers = NewPeerSet() } +// Convenience function +func (sw *Switch) StopListeners() { + // Stop each listener. + for _, listener := range sw.listeners { + listener.Stop() + } + sw.listeners = nil +} + // Convenience function func (sw *Switch) Stop() { sw.StopPeers() sw.StopReactors() + sw.StopListeners() } // Not goroutine safe to modify. @@ -120,6 +129,15 @@ func (sw *Switch) Reactors() map[string]Reactor { return sw.reactors } +func (sw *Switch) AddListener(l Listener) { + sw.listeners = append(sw.listeners, l) + go sw.listenerRoutine(l) +} + +func (sw *Switch) IsListening() bool { + return len(sw.listeners) > 0 +} + func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { peer := newPeer(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError) @@ -134,7 +152,7 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er // Start the peer peer.start() - // Notify listeners. + // Notify reactors sw.doAddPeer(peer) // Send handshake @@ -207,7 +225,7 @@ func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { sw.peers.Remove(peer) peer.stop() - // Notify listeners + // Notify reactors sw.doRemovePeer(peer, reason) } @@ -218,14 +236,10 @@ func (sw *Switch) StopPeerGracefully(peer *Peer) { sw.peers.Remove(peer) peer.stop() - // Notify listeners + // Notify reactors sw.doRemovePeer(peer, nil) } -func (sw *Switch) IsListening() bool { - return sw.listeners.Size() > 0 -} - func (sw *Switch) doAddPeer(peer *Peer) { for _, reactor := range sw.reactors { reactor.AddPeer(peer) @@ -238,6 +252,27 @@ func (sw *Switch) doRemovePeer(peer *Peer, reason interface{}) { } } +func (sw *Switch) listenerRoutine(l Listener) { + for { + inConn, ok := <-l.Connections() + if !ok { + break + } + // New inbound connection! + peer, err := sw.AddPeerWithConnection(inConn, false) + if err != nil { + log.Info("Ignoring error from inbound connection: %v\n%v", + peer, err) + continue + } + // NOTE: We don't yet have the external address of the + // remote (if they have a listener at all). + // PEXReactor's pexRoutine will handle that. + } + + // cleanup +} + //----------------------------------------------------------------------------- type SwitchEventNewPeer struct {