Browse Source

fixied IsListening()

pull/43/merge
Jae Kwon 10 years ago
parent
commit
c74b5522a1
2 changed files with 47 additions and 15 deletions
  1. +3
    -6
      daemon/daemon.go
  2. +44
    -9
      p2p/switch.go

+ 3
- 6
daemon/daemon.go View File

@ -18,7 +18,6 @@ import (
) )
type Node struct { type Node struct {
lz []p2p.Listener
sw *p2p.Switch sw *p2p.Switch
book *p2p.AddrBook book *p2p.AddrBook
blockStore *bc.BlockStore blockStore *bc.BlockStore
@ -92,11 +91,7 @@ func NewNode() *Node {
func (n *Node) Start() { func (n *Node) Start() {
log.Info("Starting Node") log.Info("Starting Node")
for _, l := range n.lz {
go n.inboundConnectionRoutine(l)
}
n.book.Start() n.book.Start()
//n.sw.StartReactors()...
n.sw.Reactor("PEX").Start(n.sw) n.sw.Reactor("PEX").Start(n.sw)
n.sw.Reactor("MEMPOOL").Start(n.sw) n.sw.Reactor("MEMPOOL").Start(n.sw)
n.sw.Reactor("BLOCKCHAIN").Start(n.sw) n.sw.Reactor("BLOCKCHAIN").Start(n.sw)
@ -115,7 +110,7 @@ func (n *Node) Stop() {
// Add a Listener to accept inbound peer connections. // Add a Listener to accept inbound peer connections.
func (n *Node) AddListener(l p2p.Listener) { func (n *Node) AddListener(l p2p.Listener) {
log.Info(Fmt("Added %v", l)) log.Info(Fmt("Added %v", l))
n.lz = append(n.lz, l)
n.sw.AddListener(l)
n.book.AddOurAddress(l.ExternalAddress()) n.book.AddOurAddress(l.ExternalAddress())
} }
@ -173,6 +168,8 @@ func (n *Node) MempoolReactor() *mempl.MempoolReactor {
return n.mempoolReactor return n.mempoolReactor
} }
//------------------------------------------------------------------------------
// debora variables // debora variables
var ( var (
AppName = "tendermint" AppName = "tendermint"


+ 44
- 9
p2p/switch.go View File

@ -28,12 +28,12 @@ incoming messages are received on the reactor.
*/ */
type Switch struct { type Switch struct {
network string network string
listeners []Listener
reactors map[string]Reactor reactors map[string]Reactor
chDescs []*ChannelDescriptor chDescs []*ChannelDescriptor
reactorsByCh map[byte]Reactor reactorsByCh map[byte]Reactor
peers *PeerSet peers *PeerSet
dialing *CMap dialing *CMap
listeners *CMap // listenerName -> chan interface{}
} }
var ( var (
@ -53,7 +53,6 @@ func NewSwitch() *Switch {
reactorsByCh: make(map[byte]Reactor), reactorsByCh: make(map[byte]Reactor),
peers: NewPeerSet(), peers: NewPeerSet(),
dialing: NewCMap(), dialing: NewCMap(),
listeners: NewCMap(),
} }
return sw return sw
@ -109,10 +108,20 @@ func (sw *Switch) StopPeers() {
sw.peers = NewPeerSet() sw.peers = NewPeerSet()
} }
// Convenience function
func (sw *Switch) StopListeners() {
// Stop each listener.
for _, listener := range sw.listeners {
listener.Stop()
}
sw.listeners = nil
}
// Convenience function // Convenience function
func (sw *Switch) Stop() { func (sw *Switch) Stop() {
sw.StopPeers() sw.StopPeers()
sw.StopReactors() sw.StopReactors()
sw.StopListeners()
} }
// Not goroutine safe to modify. // Not goroutine safe to modify.
@ -120,6 +129,15 @@ func (sw *Switch) Reactors() map[string]Reactor {
return sw.reactors return sw.reactors
} }
func (sw *Switch) AddListener(l Listener) {
sw.listeners = append(sw.listeners, l)
go sw.listenerRoutine(l)
}
func (sw *Switch) IsListening() bool {
return len(sw.listeners) > 0
}
func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
peer := newPeer(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError) peer := newPeer(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
@ -134,7 +152,7 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er
// Start the peer // Start the peer
peer.start() peer.start()
// Notify listeners.
// Notify reactors
sw.doAddPeer(peer) sw.doAddPeer(peer)
// Send handshake // Send handshake
@ -207,7 +225,7 @@ func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
sw.peers.Remove(peer) sw.peers.Remove(peer)
peer.stop() peer.stop()
// Notify listeners
// Notify reactors
sw.doRemovePeer(peer, reason) sw.doRemovePeer(peer, reason)
} }
@ -218,14 +236,10 @@ func (sw *Switch) StopPeerGracefully(peer *Peer) {
sw.peers.Remove(peer) sw.peers.Remove(peer)
peer.stop() peer.stop()
// Notify listeners
// Notify reactors
sw.doRemovePeer(peer, nil) sw.doRemovePeer(peer, nil)
} }
func (sw *Switch) IsListening() bool {
return sw.listeners.Size() > 0
}
func (sw *Switch) doAddPeer(peer *Peer) { func (sw *Switch) doAddPeer(peer *Peer) {
for _, reactor := range sw.reactors { for _, reactor := range sw.reactors {
reactor.AddPeer(peer) reactor.AddPeer(peer)
@ -238,6 +252,27 @@ func (sw *Switch) doRemovePeer(peer *Peer, reason interface{}) {
} }
} }
func (sw *Switch) listenerRoutine(l Listener) {
for {
inConn, ok := <-l.Connections()
if !ok {
break
}
// New inbound connection!
peer, err := sw.AddPeerWithConnection(inConn, false)
if err != nil {
log.Info("Ignoring error from inbound connection: %v\n%v",
peer, err)
continue
}
// NOTE: We don't yet have the external address of the
// remote (if they have a listener at all).
// PEXReactor's pexRoutine will handle that.
}
// cleanup
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
type SwitchEventNewPeer struct { type SwitchEventNewPeer struct {


Loading…
Cancel
Save