diff --git a/peer/client_test.go b/peer/client_test.go index 424268ee1..746bbda70 100644 --- a/peer/client_test.go +++ b/peer/client_test.go @@ -6,23 +6,27 @@ import ( "time" ) -func TestConnection(t *testing.T) { +// convenience method for creating two clients connected to each other. +func makeClientPair(t *testing.T, bufferSize int, channels []string) (*Client, *Client) { peerMaker := func(conn *Connection) *Peer { - bufferSize := 10 p := NewPeer(conn) p.channels = map[String]*Channel{} - p.channels["ch1"] = NewChannel("ch1", bufferSize) - p.channels["ch2"] = NewChannel("ch2", bufferSize) + for chName := range channels { + p.channels[String(chName)] = NewChannel(String(chName), bufferSize) + } return p } + // Create two clients that will be interconnected. c1 := NewClient(peerMaker) c2 := NewClient(peerMaker) + // Create a server for the listening client. s1 := NewServer("tcp", ":8001", c1) - s1laddr := s1.LocalAddress() + // Dial the server & add the connection to c2. + s1laddr := s1.LocalAddress() conn, err := s1laddr.Dial() if err != nil { t.Fatalf("Could not connect to server address %v", s1laddr) @@ -35,7 +39,14 @@ func TestConnection(t *testing.T) { // Wait for things to happen, peers to get added... time.Sleep(100 * time.Millisecond) - // lets send a message from c1 to c2. + return c1, c2 +} + +func TestClients(t *testing.T) { + + c1, c2 := makeClientPair(t, 10, []string{"ch1", "ch2", "ch3"}) + + // Lets send a message from c1 to c2. if c1.Peers().Size() != 1 { t.Errorf("Expected exactly 1 peer in c1, got %v", c1.Peers().Size()) } @@ -43,13 +54,53 @@ func TestConnection(t *testing.T) { t.Errorf("Expected exactly 1 peer in c2, got %v", c2.Peers().Size()) } - // TODO: test the transmission of information on channels. - c1.Broadcast(NewPacket("ch1", ByteSlice("test data"))) + // Broadcast a message on ch1 + c1.Broadcast(NewPacket("ch1", ByteSlice("channel one"))) + // Broadcast a message on ch2 + c1.Broadcast(NewPacket("ch2", ByteSlice("channel two"))) + // Broadcast a message on ch3 + c1.Broadcast(NewPacket("ch3", ByteSlice("channel three"))) + + // Wait for things to settle... time.Sleep(100 * time.Millisecond) - inMsg := c2.Receive("ch1") - t.Logf("c2 popped message: %v", inMsg) + // Receive message from channel 2 and check + inMsg := c2.Receive("ch2") + if string(inMsg.Bytes) != "channel two" { + t.Errorf("Unexpected received message bytes: %v", string(inMsg.Bytes)) + } + + // Receive message from channel 1 and check + inMsg = c2.Receive("ch1") + if string(inMsg.Bytes) != "channel one" { + t.Errorf("Unexpected received message bytes: %v", string(inMsg.Bytes)) + } s1.Stop() c2.Stop() } + + +func BenchmarkClients(b *testing.B) { + + b.StopTimer() + + // TODO: benchmark the random functions, which is faster? + + c1, c2 := makeClientPair(t, 10, []string{"ch1", "ch2", "ch3"}) + + // Create a sink on either channel to just pop off messages. + // TODO: ensure that when clients stop, this goroutine stops. + func recvHandler(c *Client) { + } + + go recvHandler(c1) + go recvHandler(c2) + + b.StartTimer() + + // Send random message from one channel to another + for i := 0; i < b.N; i++ { + } + +} diff --git a/peer/connection.go b/peer/connection.go index 756e77ddb..91ed5a743 100644 --- a/peer/connection.go +++ b/peer/connection.go @@ -92,7 +92,7 @@ func (c *Connection) flush() { } func (c *Connection) sendHandler() { - log.Tracef("Connection %v sendHandler", c) + log.Tracef("%v sendHandler", c) // TODO: catch panics & stop connection. @@ -114,7 +114,7 @@ func (c *Connection) sendHandler() { } if err != nil { - log.Infof("Connection %v failed @ sendHandler:\n%v", c, err) + log.Infof("%v failed @ sendHandler:\n%v", c, err) c.Stop() break FOR_LOOP } @@ -122,12 +122,12 @@ func (c *Connection) sendHandler() { c.flush() } - log.Tracef("Connection %v sendHandler done", c) + log.Tracef("%v sendHandler done", c) // cleanup } func (c *Connection) recvHandler(channels map[String]*Channel) { - log.Tracef("Connection %v recvHandler with %v channels", c, len(channels)) + log.Tracef("%v recvHandler with %v channels", c, len(channels)) // TODO: catch panics & stop connection. @@ -136,7 +136,7 @@ func (c *Connection) recvHandler(channels map[String]*Channel) { pktType, err := ReadUInt8Safe(c.conn) if err != nil { if atomic.LoadUint32(&c.stopped) != 1 { - log.Infof("Connection %v failed @ recvHandler", c) + log.Infof("%v failed @ recvHandler", c) c.Stop() } break FOR_LOOP @@ -153,7 +153,7 @@ func (c *Connection) recvHandler(channels map[String]*Channel) { pkt, err := ReadPacketSafe(c.conn) if err != nil { if atomic.LoadUint32(&c.stopped) != 1 { - log.Infof("Connection %v failed @ recvHandler", c) + log.Infof("%v failed @ recvHandler", c) c.Stop() } break FOR_LOOP @@ -170,7 +170,7 @@ func (c *Connection) recvHandler(channels map[String]*Channel) { c.pingDebouncer.Reset() } - log.Tracef("Connection %v recvHandler done", c) + log.Tracef("%v recvHandler done", c) // cleanup close(c.pong) for _ = range c.pong { diff --git a/peer/peer.go b/peer/peer.go index cbd7016fe..c9e9a16da 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -89,7 +89,7 @@ func (p *Peer) String() string { } func (p *Peer) recvHandler(chName String, inboundPacketQueue chan<- *InboundPacket) { - log.Tracef("Peer %v recvHandler [%v]", p, chName) + log.Tracef("%v recvHandler [%v]", p, chName) channel := p.channels[chName] recvQueue := channel.RecvQueue() @@ -115,13 +115,13 @@ func (p *Peer) recvHandler(chName String, inboundPacketQueue chan<- *InboundPack } } - log.Tracef("Peer %v recvHandler [%v] closed", p, chName) + log.Tracef("%v recvHandler [%v] closed", p, chName) // cleanup // (none) } func (p *Peer) sendHandler(chName String) { - log.Tracef("Peer %v sendHandler [%v]", p, chName) + log.Tracef("%v sendHandler [%v]", p, chName) chSendQueue := p.channels[chName].sendQueue FOR_LOOP: for { @@ -136,7 +136,7 @@ func (p *Peer) sendHandler(chName String) { } } - log.Tracef("Peer %v sendHandler [%v] closed", p, chName) + log.Tracef("%v sendHandler [%v] closed", p, chName) // cleanup // (none) }