From 3cc385e8819ad2f635ef9c30cf40b7d2bb2a75f8 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sun, 29 Jun 2014 00:35:16 -0700 Subject: [PATCH] . --- peer/client.go | 35 +++++++++++++++++++---------------- peer/client_test.go | 21 ++++++++++++++++----- peer/connection.go | 6 +++++- peer/listener.go | 6 ++++-- peer/log.go | 22 ++++++++++++++++++++-- peer/netaddress.go | 3 ++- peer/peer.go | 5 +++++ peer/server.go | 2 ++ 8 files changed, 73 insertions(+), 27 deletions(-) diff --git a/peer/client.go b/peer/client.go index b1dd50099..9b08a48d6 100644 --- a/peer/client.go +++ b/peer/client.go @@ -15,15 +15,15 @@ import ( It can reach out to the network and establish connections with servers. A client doesn't listen for incoming connections -- that's done by the server. - peerMaker is a factory method for generating new peers from new *Connections. - peerMaker(nil) must return a prototypical peer that represents the self "peer". + makePeerFn is a factory method for generating new peers from new *Connections. + makePeerFn(nil) must return a prototypical peer that represents the self "peer". XXX what about peer disconnects? */ type Client struct { addrBook *AddrBook targetNumPeers int - peerMaker func(*Connection) *Peer + makePeerFn func(*Connection) *Peer self *Peer inQueues map[String]chan *InboundMsg @@ -38,10 +38,10 @@ var ( CLIENT_DUPLICATE_PEER_ERROR = errors.New("Duplicate peer") ) -func NewClient(peerMaker func(*Connection) *Peer) *Client { - self := peerMaker(nil) +func NewClient(makePeerFn func(*Connection) *Peer) *Client { + self := makePeerFn(nil) if self == nil { - Panicf("peerMaker(nil) must return a prototypical peer for self") + Panicf("makePeerFn(nil) must return a prototypical peer for self") } inQueues := make(map[String]chan *InboundMsg) @@ -52,7 +52,7 @@ func NewClient(peerMaker func(*Connection) *Peer) *Client { c := &Client{ addrBook: nil, // TODO targetNumPeers: 0, // TODO - peerMaker: peerMaker, + makePeerFn: makePeerFn, self: self, inQueues: inQueues, @@ -64,6 +64,7 @@ func NewClient(peerMaker func(*Connection) *Peer) *Client { } func (c *Client) Stop() { + log.Infof("Stopping client") // lock c.mtx.Lock() if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) { @@ -83,7 +84,8 @@ func (c *Client) Stop() { func (c *Client) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, error) { if atomic.LoadUint32(&c.stopped) == 1 { return nil, CLIENT_STOPPED_ERROR } - peer := c.peerMaker(conn) + log.Infof("Adding peer with connection: %v, outgoing: %v", conn, outgoing) + peer := c.makePeerFn(conn) peer.outgoing = outgoing err := c.addPeer(peer) if err != nil { return nil, err } @@ -96,7 +98,7 @@ func (c *Client) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, func (c *Client) Broadcast(chName String, msg Msg) { if atomic.LoadUint32(&c.stopped) == 1 { return } - for v := range c.peersCopy().Values() { + for v := range c.Peers().Values() { peer := v.(*Peer) success := peer.TryQueueOut(chName , msg) if !success { @@ -121,6 +123,13 @@ func (c *Client) PopMessage(chName String) *InboundMsg { } } +func (c *Client) Peers() merkle.Tree { + // lock & defer + c.mtx.Lock(); defer c.mtx.Unlock() + return c.peers.Copy() + // unlock deferred +} + func (c *Client) StopPeer(peer *Peer) { // lock c.mtx.Lock() @@ -141,6 +150,7 @@ func (c *Client) addPeer(peer *Peer) error { c.mtx.Lock(); defer c.mtx.Unlock() if c.stopped == 1 { return CLIENT_STOPPED_ERROR } if !c.peers.Has(addr) { + log.Tracef("Actually putting addr: %v, peer: %v", addr, peer) c.peers.Put(addr, peer) return nil } else { @@ -150,10 +160,3 @@ func (c *Client) addPeer(peer *Peer) error { } // unlock deferred } - -func (c *Client) peersCopy() merkle.Tree { - // lock & defer - c.mtx.Lock(); defer c.mtx.Unlock() - return c.peers.Copy() - // unlock deferred -} diff --git a/peer/client_test.go b/peer/client_test.go index 29391940c..e971b0022 100644 --- a/peer/client_test.go +++ b/peer/client_test.go @@ -21,19 +21,30 @@ func TestConnection(t *testing.T) { c2 := NewClient(peerMaker) s1 := NewServer("tcp", ":8001", c1) + s1laddr := s1.LocalAddress() - conn, err := s1.LocalAddress().Dial() + conn, err := s1laddr.Dial() if err != nil { - t.Fatalf("Could not connect to server address %v", s1.LocalAddress()) + t.Fatalf("Could not connect to server address %v", s1laddr) + } else { + t.Logf("Created a connection to local server address %v", s1laddr) } c2.AddPeerWithConnection(conn, true) + // Wait for things to happen, peers to get added... + time.Sleep(100 * time.Millisecond) + // lets send a message from c1 to c2. - // XXX do we even want a broadcast function? - //c1.Broadcast(String(""), String("message")) - time.Sleep(500 * time.Millisecond) + if c1.Peers().Size() != 1 { + t.Errorf("Expected exactly 1 peer in c1, got %v", c1.Peers().Size()) + } + if c2.Peers().Size() != 1 { + t.Errorf("Expected exactly 1 peer in c2, got %v", c2.Peers().Size()) + } + // TODO: test the transmission of information on channels. + time.Sleep(500 * time.Millisecond) //inMsg := c2.PopMessage(String("")) s1.Stop() diff --git a/peer/connection.go b/peer/connection.go index b03311f43..348b7e91e 100644 --- a/peer/connection.go +++ b/peer/connection.go @@ -6,6 +6,7 @@ import ( "sync/atomic" "net" "time" + "fmt" ) const ( @@ -80,6 +81,10 @@ func (c *Connection) RemoteAddress() *NetAddress { return NewNetAddress(c.conn.RemoteAddr()) } +func (c *Connection) String() string { + return fmt.Sprintf("Connection{%v}", c.conn.RemoteAddr()) +} + func (c *Connection) flush() { // TODO flush? (turn off nagel, turn back on, etc) } @@ -158,7 +163,6 @@ func (c *Connection) inHandler() { } - /* IOStats */ type IOStats struct { TimeConnected Time diff --git a/peer/listener.go b/peer/listener.go index 3911cc268..3e4f7ab5c 100644 --- a/peer/listener.go +++ b/peer/listener.go @@ -1,6 +1,7 @@ package peer import ( + . "github.com/tendermint/tendermint/common" "sync/atomic" "net" ) @@ -111,13 +112,14 @@ func GetUPNPLocalAddress() *NetAddress { // TODO: use syscalls to get actual ourIP. http://pastebin.com/9exZG4rh func GetDefaultLocalAddress() *NetAddress { addrs, err := net.InterfaceAddrs() - if err != nil { panic("Wtf") } + if err != nil { Panicf("Unexpected error fetching interface addresses: %v", err) } + for _, a := range addrs { ipnet, ok := a.(*net.IPNet) if !ok { continue } v4 := ipnet.IP.To4() if v4 == nil || v4[0] == 127 { continue } // loopback - return NewNetAddress(a) + return NewNetAddressIPPort(ipnet.IP, DEFAULT_PORT) } return nil } diff --git a/peer/log.go b/peer/log.go index 1ff56693e..4fbb1e8e9 100644 --- a/peer/log.go +++ b/peer/log.go @@ -1,7 +1,25 @@ package peer import ( - "github.com/tendermint/btclog" + "github.com/cihub/seelog" ) -var log = btclog.Disabled +var log seelog.LoggerInterface + +func init() { + // TODO: replace with configuration file in the ~/.tendermint directory. + config := ` + + + + + + + + +` + + var err error + log, err = seelog.LoggerFromConfigAsBytes([]byte(config)) + if err != nil { panic(err) } +} diff --git a/peer/netaddress.go b/peer/netaddress.go index 0011f7deb..a7d6bc140 100644 --- a/peer/netaddress.go +++ b/peer/netaddress.go @@ -5,6 +5,7 @@ package peer import ( + . "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/binary" "io" "net" @@ -21,7 +22,7 @@ type NetAddress struct { // TODO: socks proxies? func NewNetAddress(addr net.Addr) *NetAddress { tcpAddr, ok := addr.(*net.TCPAddr) - if !ok { panic("Only TCPAddrs are supported") } + if !ok { Panicf("Only TCPAddrs are supported. Got: %v", addr) } ip := tcpAddr.IP port := UInt16(tcpAddr.Port) return NewNetAddressIPPort(ip, port) diff --git a/peer/peer.go b/peer/peer.go index d4632619f..a1197628d 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -6,6 +6,7 @@ import ( "sync" "io" "time" + "fmt" ) /* Peer */ @@ -80,6 +81,10 @@ func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { return p.RemoteAddress().WriteTo(w) } +func (p *Peer) String() string { + return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outgoing) +} + func (p *Peer) inHandler(chName String, inboundMsgQueue chan<- *InboundMsg) { channel := p.channels[chName] inQueue := channel.InQueue() diff --git a/peer/server.go b/peer/server.go index f88bc09b8..da21be904 100644 --- a/peer/server.go +++ b/peer/server.go @@ -27,11 +27,13 @@ func (s *Server) LocalAddress() *NetAddress { // meant to run in a goroutine func (s *Server) IncomingConnectionHandler() { for conn := range s.listener.Connections() { + log.Infof("New connection found: %v", conn) s.client.AddPeerWithConnection(conn, false) } } func (s *Server) Stop() { + log.Infof("Stopping server") s.listener.Stop() s.client.Stop() }