diff --git a/peer/client.go b/peer/client.go index bb3d91c1d..b1dd50099 100644 --- a/peer/client.go +++ b/peer/client.go @@ -15,15 +15,15 @@ import ( It can reach out to the network and establish connections with servers. A client doesn't listen for incoming connections -- that's done by the server. - newPeerCb is a factory method for generating new peers from new *Connections. - newPeerCb(nil) must return a prototypical peer that represents the self "peer". + peerMaker is a factory method for generating new peers from new *Connections. + peerMaker(nil) must return a prototypical peer that represents the self "peer". XXX what about peer disconnects? */ type Client struct { - addrBook AddrBook + addrBook *AddrBook targetNumPeers int - newPeerCb func(*Connection) *Peer + peerMaker func(*Connection) *Peer self *Peer inQueues map[String]chan *InboundMsg @@ -38,22 +38,27 @@ var ( CLIENT_DUPLICATE_PEER_ERROR = errors.New("Duplicate peer") ) -func NewClient(newPeerCb func(*Connection) *Peer) *Client { - self := newPeerCb(nil) +func NewClient(peerMaker func(*Connection) *Peer) *Client { + self := peerMaker(nil) if self == nil { - Panicf("newPeerCb(nil) must return a prototypical peer for self") + Panicf("peerMaker(nil) must return a prototypical peer for self") } inQueues := make(map[String]chan *InboundMsg) - for chName, channel := range self.channels { + for chName, _ := range self.channels { inQueues[chName] = make(chan *InboundMsg) } c := &Client{ - newPeerCb: newPeerCb, - peers: merkle.NewIAVLTree(nil), - self: self, - inQueues: inQueues, + addrBook: nil, // TODO + targetNumPeers: 0, // TODO + peerMaker: peerMaker, + self: self, + inQueues: inQueues, + + peers: merkle.NewIAVLTree(nil), + quit: make(chan struct{}), + stopped: 0, } return c } @@ -78,7 +83,7 @@ func (c *Client) Stop() { func (c *Client) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, error) { if atomic.LoadUint32(&c.stopped) == 1 { return nil, CLIENT_STOPPED_ERROR } - peer := c.newPeerCb(conn) + peer := c.peerMaker(conn) peer.outgoing = outgoing err := c.addPeer(peer) if err != nil { return nil, err } @@ -103,7 +108,6 @@ func (c *Client) Broadcast(chName String, msg Msg) { func (c *Client) PopMessage(chName String) *InboundMsg { if atomic.LoadUint32(&c.stopped) == 1 { return nil } - channel := c.self.Channel(chName) q := c.inQueues[chName] if q == nil { Panicf("Expected inQueues[%f], found none", chName) } @@ -112,10 +116,6 @@ func (c *Client) PopMessage(chName String) *InboundMsg { case <-c.quit: return nil case inMsg := <-q: - // skip if known. - if channel.Has(inMsg.Msg) { - continue - } return inMsg } } diff --git a/peer/client_test.go b/peer/client_test.go new file mode 100644 index 000000000..29391940c --- /dev/null +++ b/peer/client_test.go @@ -0,0 +1,41 @@ +package peer + +import ( + . "github.com/tendermint/tendermint/binary" + "testing" + "time" +) + +func TestConnection(t *testing.T) { + + peerMaker := func(conn *Connection) *Peer { + bufferSize := 10 + p := NewPeer(conn) + p.channels = map[String]*Channel{} + p.channels["ch1"] = NewChannel("ch1", bufferSize) + p.channels["ch2"] = NewChannel("ch2", bufferSize) + return p + } + + c1 := NewClient(peerMaker) + c2 := NewClient(peerMaker) + + s1 := NewServer("tcp", ":8001", c1) + + conn, err := s1.LocalAddress().Dial() + if err != nil { + t.Fatalf("Could not connect to server address %v", s1.LocalAddress()) + } + + c2.AddPeerWithConnection(conn, true) + + // lets send a message from c1 to c2. + // XXX do we even want a broadcast function? + //c1.Broadcast(String(""), String("message")) + time.Sleep(500 * time.Millisecond) + + //inMsg := c2.PopMessage(String("")) + + s1.Stop() + c2.Stop() +} diff --git a/peer/connection_test.go b/peer/connection_test.go deleted file mode 100644 index b124fe9db..000000000 --- a/peer/connection_test.go +++ /dev/null @@ -1,36 +0,0 @@ -package peer - -import ( - . "github.com/tendermint/tendermint/binary" - "testing" - "time" -) - -func TestLocalConnection(t *testing.T) { - - makePeer := func(conn *Connection) *Peer { - bufferSize := 10 - p := &Peer{conn: conn} - p.channels := map[String]*Channel{} - p.channels["ch1"] = NewChannel("ch1", bufferSize) - p.channels["ch2"] = NewChannel("ch2", bufferSize) - return p - } - - c1 := NewClient(makePeer) - c2 := NewClient(makePeer) - - s1 := NewServer("tcp", "127.0.0.1:8001", c1) - - c2.ConnectTo(c1.LocalAddress()) - - // lets send a message from c1 to c2. - // XXX do we even want a broadcast function? - c1.Broadcast(String(""), String("message")) - time.Sleep(500 * time.Millisecond) - - inMsg := c2.PopMessage(String("")) - - c1.Stop() - c2.Stop() -} diff --git a/peer/listener.go b/peer/listener.go index a314c599f..3911cc268 100644 --- a/peer/listener.go +++ b/peer/listener.go @@ -101,7 +101,7 @@ func GetUPNPLocalAddress() *NetAddress { ext, err := nat.GetExternalAddress() if err != nil { return nil } - _, err := nat.AddPortMapping("tcp", DEFAULT_PORT, DEFAULT_PORT, "tendermint", 0) + _, err = nat.AddPortMapping("tcp", DEFAULT_PORT, DEFAULT_PORT, "tendermint", 0) if err != nil { return nil } return NewNetAddressIPPort(ext, DEFAULT_PORT) diff --git a/peer/msg.go b/peer/msg.go index bf6642e8f..fed5c5c3f 100644 --- a/peer/msg.go +++ b/peer/msg.go @@ -27,7 +27,7 @@ type InboundMsg struct { type NewFilterMsg struct { ChName String - Filter Filter + Filter interface{} // todo } func (m *NewFilterMsg) WriteTo(w io.Writer) (int64, error) { diff --git a/peer/peer.go b/peer/peer.go index 8c84c0350..d4632619f 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -20,6 +20,14 @@ type Peer struct { stopped uint32 } +func NewPeer(conn *Connection) *Peer { + return &Peer{ + conn: conn, + quit: make(chan struct{}), + stopped: 0, + } +} + func (p *Peer) Start(peerInQueues map[String]chan *InboundMsg ) { for chName, _ := range p.channels { go p.inHandler(chName, peerInQueues[chName]) @@ -134,7 +142,7 @@ func NewChannel(name string, bufferSize int) *Channel { return &Channel{ name: String(name), inQueue: make(chan Msg, bufferSize), - outQueue: make(chan Msg, buffersize), + outQueue: make(chan Msg, bufferSize), } } diff --git a/peer/server.go b/peer/server.go index 65ea30f72..f88bc09b8 100644 --- a/peer/server.go +++ b/peer/server.go @@ -1,8 +1,6 @@ package peer import ( - "sync/atomic" - "net" ) /* Server */ @@ -13,7 +11,7 @@ type Server struct { } func NewServer(protocol string, laddr string, c *Client) *Server { - l := NewListener(protocol, laddr) + l := NewDefaultListener(protocol, laddr) s := &Server{ listener: l, client: c,