From 1b59caf950364e12cc9f49d6588198c2d96732a5 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Wed, 9 Jul 2014 18:33:44 -0700 Subject: [PATCH] created "Node" --- common/repeat_timer.go | 6 ++- common/throttler.go | 6 ++- log.go | 2 +- main.go | 107 +++++++++++++++++++++++++++++++++++------ p2p/netaddress.go | 8 +++ p2p/peer.go | 4 ++ p2p/switch.go | 3 ++ rpc/rpc.go | 3 ++ 8 files changed, 121 insertions(+), 18 deletions(-) create mode 100644 rpc/rpc.go diff --git a/common/repeat_timer.go b/common/repeat_timer.go index 4ef1cbee4..49d26beef 100644 --- a/common/repeat_timer.go +++ b/common/repeat_timer.go @@ -2,7 +2,10 @@ package common import "time" -/* RepeatTimer */ +/* +RepeatTimer repeatedly sends a struct{}{} to .Ch after each "dur" period. +It's good for keeping connections alive. +*/ type RepeatTimer struct { Ch chan struct{} quit chan struct{} @@ -26,6 +29,7 @@ func (t *RepeatTimer) fireHandler() { } } +// Wait the duration again before firing. func (t *RepeatTimer) Reset() { t.timer.Reset(t.dur) } diff --git a/common/throttler.go b/common/throttler.go index 609a97455..5a378ec4a 100644 --- a/common/throttler.go +++ b/common/throttler.go @@ -5,7 +5,10 @@ import ( "time" ) -/* Throttler */ +/* +Throttler sends a struct{}{} to .Ch "dur" after the last .Set(). +It's good for ensuring that something happens last after a burst of events. +*/ type Throttler struct { Ch chan struct{} quit chan struct{} @@ -19,6 +22,7 @@ func NewThrottler(dur time.Duration) *Throttler { var quit = make(chan struct{}) var t = &Throttler{Ch: ch, dur: dur, quit: quit} t.timer = time.AfterFunc(dur, t.fireHandler) + t.timer.Stop() return t } diff --git a/log.go b/log.go index 560d009f3..9b31067df 100644 --- a/log.go +++ b/log.go @@ -16,7 +16,7 @@ func init() { - + ` diff --git a/main.go b/main.go index c8ec09a17..9136e61a3 100644 --- a/main.go +++ b/main.go @@ -8,8 +8,21 @@ import ( "github.com/tendermint/tendermint/p2p" ) -func main() { +const ( + minNumPeers = 10 + maxNumPeers = 20 + + ensurePeersPeriodSeconds = 30 + peerDialTimeoutSeconds = 30 +) + +type Node struct { + sw *p2p.Switch + book *p2p.AddressBook + quit chan struct{} +} +func NewNode() *Node { // Define channels for our app chDescs := []p2p.ChannelDescriptor{ p2p.ChannelDescriptor{ @@ -33,43 +46,107 @@ func main() { RecvBufferSize: 1000, }, } - - // Create the switch sw := p2p.NewSwitch(chDescs) + book := p2p.NewAddrBook(config.AppDir + "/addrbook.json") - // Create a listener for incoming connections - l := p2p.NewDefaultListener("tcp", ":8001") + return &New{ + sw: sw, + book: book, + } +} + +func (n *Node) Start() { + n.sw.Start() + n.book.Start() + go p2p.PexHandler(sw, book) + go n.ensurePeersHandler(sw, book) +} + +func (n *Node) initPeer(peer *Peer) { + if peer.IsOutgoing() { + // TODO: initiate PEX + } +} + +// Add a Listener to accept incoming peer connections. +func (n *Node) AddListener(l Listener) { go func() { for { inConn, ok := <-l.Connections() if !ok { break } - peer, err := sw.AddPeerWithConnection(inConn, false) + peer, err := n.sw.AddPeerWithConnection(inConn, false) if err != nil { log.Infof("Ignoring error from incoming connection: %v\n%v", peer, err) continue } - initPeer(peer) + n.initPeer(peer) } }() +} - // Open our address book - book := p2p.NewAddrBook(config.AppDir + "/addrbook.json") +// Ensures that sufficient peers are connected. +func (n *Node) ensurePeers() { + numPeers := len(n.sw.Peers()) + if numPeers < minNumPeers { + // XXX + } +} - // Start PEX - go p2p.PexHandler(sw, book) +func (n *Node) ensurePeersHandler() { + timer := NewRepeatTimer(ensurePeersPeriodSeconds * time.Second) +FOR_LOOP: + for { + select { + case <-timer.Ch: + n.ensurePeers() + case <-n.quit: + break FOR_LOOP + } + } + + // cleanup + timer.Stop() +} + +func (n *Node) Stop() { + // TODO: gracefully disconnect from peers. + n.sw.Stop() + n.book.Stop() +} + +//----------------------------------------------------------------------------- + +func main() { + + n := NewNode() + l := p2p.NewDefaultListener("tcp", ":8001") + n.AddListener() + + if false { + // TODO remove + // let's connect to 66.175.218.199 + conn, err := p2p.NewNetAddressString("66.175.218.199:8001").Dial() + if err != nil { + log.Infof("Error connecting to it: %v", err) + return + } + peer, err := sw.AddPeerWithConnection(conn, true) + if err != nil { + log.Infof("Error adding peer with connection: %v", err) + return + } + log.Infof("Connected to peer: %v", peer) + // TODO remove + } // Sleep forever trapSignal() select {} } -func initPeer(peer *p2p.Peer) { - // TODO: ask for more peers if we need them. -} - func trapSignal() { c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) diff --git a/p2p/netaddress.go b/p2p/netaddress.go index 9f47df59b..3c76ef610 100644 --- a/p2p/netaddress.go +++ b/p2p/netaddress.go @@ -95,6 +95,14 @@ func (na *NetAddress) Dial() (*Connection, error) { return NewConnection(conn), nil } +func (na *NetAddress) DialTimeout(timeout time.Duration) (*Connection, error) { + conn, err := net.DialTimeout("tcp", na.String(), timeout) + if err != nil { + return nil, err + } + return NewConnection(conn), nil +} + func (na *NetAddress) Routable() bool { // TODO(oga) bitcoind doesn't include RFC3849 here, but should we? return na.Valid() && !(na.RFC1918() || na.RFC3927() || na.RFC4862() || diff --git a/p2p/peer.go b/p2p/peer.go index 65008c962..039662f38 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -55,6 +55,10 @@ func (p *Peer) stop() { } } +func (p *Peer) IsOutgoing() bool { + return p.outgoing +} + func (p *Peer) LocalAddress() *NetAddress { return p.conn.LocalAddress() } diff --git a/p2p/switch.go b/p2p/switch.go index 43d9e0556..e0bd58378 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -49,6 +49,9 @@ func NewSwitch(channels []ChannelDescriptor) *Switch { return s } +func (s *Switch) Start() { +} + func (s *Switch) Stop() { log.Infof("Stopping switch") if atomic.CompareAndSwapUint32(&s.stopped, 0, 1) { diff --git a/rpc/rpc.go b/rpc/rpc.go new file mode 100644 index 000000000..41f125650 --- /dev/null +++ b/rpc/rpc.go @@ -0,0 +1,3 @@ +package rpc + +// TODO