From a4d2dc78972ca262b3784283556f93f224e9416e Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Mon, 7 Jul 2014 19:25:19 -0700 Subject: [PATCH] Client -> Switch. Everything is clear now :) --- peer/client.go | 209 -------------------------------------------- peer/client_test.go | 131 --------------------------- peer/connection.go | 6 +- peer/peer.go | 34 ++++--- peer/pex.go | 10 +-- peer/server.go | 36 -------- peer/switch.go | 197 +++++++++++++++++++++++++++++++++++++++++ peer/switch_test.go | 137 +++++++++++++++++++++++++++++ 8 files changed, 363 insertions(+), 397 deletions(-) delete mode 100644 peer/client.go delete mode 100644 peer/client_test.go delete mode 100644 peer/server.go create mode 100644 peer/switch.go create mode 100644 peer/switch_test.go diff --git a/peer/client.go b/peer/client.go deleted file mode 100644 index 029bd703e..000000000 --- a/peer/client.go +++ /dev/null @@ -1,209 +0,0 @@ -package peer - -import ( - "errors" - "sync" - "sync/atomic" - - . "github.com/tendermint/tendermint/binary" - . "github.com/tendermint/tendermint/common" - "github.com/tendermint/tendermint/merkle" -) - -/* -A client is half of a p2p system. -It can reach out to the network and establish connections with other peers. -A client doesn't listen for incoming connections -- that's done by the server. - -All communication amongst peers are multiplexed by "channels". -(Not the same as Go "channels") - -To send a message, encapsulate it into a "Packet" and send it to each peer. -You can find all connected and active peers by iterating over ".Peers()". -".Broadcast()" is provided for convenience, but by iterating over -the peers manually the caller can decide which subset receives a message. - -Incoming messages are received by calling ".Receive()". -*/ -type Client struct { - addrBook *AddrBook - targetNumPeers int - makePeerFn func(*Connection) *Peer - self *Peer - pktRecvQueues map[String]chan *InboundPacket - peersMtx sync.Mutex - peers merkle.Tree // addr -> *Peer - quit chan struct{} - stopped uint32 -} - -var ( - ErrClientStopped = errors.New("Client already stopped") - ErrClientDuplicatePeer = errors.New("Duplicate peer") -) - -// "makePeerFn" is a factory method for generating new peers from new *Connections. -// "makePeerFn(nil)" must return a prototypical peer that represents the self "peer". -func NewClient(makePeerFn func(*Connection) *Peer) *Client { - self := makePeerFn(nil) - if self == nil { - Panicf("makePeerFn(nil) must return a prototypical peer for self") - } - - pktRecvQueues := make(map[String]chan *InboundPacket) - for chName, _ := range self.channels { - pktRecvQueues[chName] = make(chan *InboundPacket) - } - - c := &Client{ - addrBook: nil, // TODO - targetNumPeers: 0, // TODO - makePeerFn: makePeerFn, - self: self, - pktRecvQueues: pktRecvQueues, - peers: merkle.NewIAVLTree(nil), - quit: make(chan struct{}), - stopped: 0, - } - - // automatically start - c.start() - - return c -} - -func (c *Client) start() { - // Handle PEX messages - // TODO: hmm - // go peerExchangeHandler(c) -} - -func (c *Client) Stop() { - log.Infof("Stopping client") - // lock - c.peersMtx.Lock() - if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) { - close(c.quit) - // stop each peer. - for peerValue := range c.peers.Values() { - peer := peerValue.(*Peer) - peer.stop() - } - // empty tree. - c.peers = merkle.NewIAVLTree(nil) - } - c.peersMtx.Unlock() - // unlock -} - -func (c *Client) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, error) { - if atomic.LoadUint32(&c.stopped) == 1 { - return nil, ErrClientStopped - } - - log.Infof("Adding peer with connection: %v, outgoing: %v", conn, outgoing) - peer := c.makePeerFn(conn) - peer.outgoing = outgoing - err := c.addPeer(peer) - if err != nil { - return nil, err - } - - go peer.start(c.pktRecvQueues, c.StopPeerForError) - - return peer, nil -} - -func (c *Client) Broadcast(pkt Packet) (numSuccess, numFailure int) { - if atomic.LoadUint32(&c.stopped) == 1 { - return - } - - log.Tracef("Broadcast on [%v] len: %v", pkt.Channel, len(pkt.Bytes)) - for v := range c.peers.Values() { - peer := v.(*Peer) - success := peer.TryQueue(pkt) - log.Tracef("Broadcast for peer %v success: %v", peer, success) - if success { - numSuccess += 1 - } else { - numFailure += 1 - } - } - return - -} - -/* -Receive blocks on a channel until a message is found. -*/ -func (c *Client) Receive(chName String) *InboundPacket { - if atomic.LoadUint32(&c.stopped) == 1 { - return nil - } - - log.Tracef("Receive on [%v]", chName) - q := c.pktRecvQueues[chName] - if q == nil { - Panicf("Expected pktRecvQueues[%f], found none", chName) - } - - select { - case <-c.quit: - return nil - case inPacket := <-q: - return inPacket - } -} - -func (c *Client) Peers() merkle.Tree { - // lock & defer - c.peersMtx.Lock() - defer c.peersMtx.Unlock() - return c.peers.Copy() - // unlock deferred -} - -// Disconnect from a peer due to external error. -// TODO: make record depending on reason. -func (c *Client) StopPeerForError(peer *Peer, reason interface{}) { - log.Infof("%v errored: %v", peer, reason) - c.StopPeer(peer, false) -} - -// Disconnect from a peer. -// If graceful is true, last message sent is a disconnect message. -// TODO: handle graceful disconnects. -func (c *Client) StopPeer(peer *Peer, graceful bool) { - // lock - c.peersMtx.Lock() - peerValue, _ := c.peers.Remove(peer.RemoteAddress()) - c.peersMtx.Unlock() - // unlock - - peer_ := peerValue.(*Peer) - if peer_ != nil { - peer_.stop() - } -} - -func (c *Client) addPeer(peer *Peer) error { - addr := peer.RemoteAddress() - - // lock & defer - c.peersMtx.Lock() - defer c.peersMtx.Unlock() - if c.stopped == 1 { - return ErrClientStopped - } - if !c.peers.Has(addr) { - log.Tracef("Actually putting addr: %v, peer: %v", addr, peer) - c.peers.Put(addr, peer) - return nil - } else { - // ignore duplicate peer for addr. - log.Infof("Ignoring duplicate peer for addr %v", addr) - return ErrClientDuplicatePeer - } - // unlock deferred -} diff --git a/peer/client_test.go b/peer/client_test.go deleted file mode 100644 index 69b0ba053..000000000 --- a/peer/client_test.go +++ /dev/null @@ -1,131 +0,0 @@ -package peer - -import ( - "testing" - "time" - - . "github.com/tendermint/tendermint/binary" -) - -// convenience method for creating two clients connected to each other. -func makeClientPair(t testing.TB, bufferSize int, chNames []String) (*Client, *Client) { - - peerMaker := func(conn *Connection) *Peer { - channels := map[String]*Channel{} - for _, chName := range chNames { - channels[chName] = NewChannel(chName, bufferSize) - } - return NewPeer(conn, channels) - } - - // Create two clients that will be interconnected. - c1 := NewClient(peerMaker) - c2 := NewClient(peerMaker) - - // Create a server for the listening client. - s1 := NewServer("tcp", ":8001", c1) - - // Dial the server & add the connection to c2. - s1laddr := s1.ExternalAddress() - conn, err := s1laddr.Dial() - if err != nil { - t.Fatalf("Could not connect to server address %v", s1laddr) - } else { - t.Logf("Created a connection to local server address %v", s1laddr) - } - - c2.AddPeerWithConnection(conn, true) - - // Wait for things to happen, peers to get added... - time.Sleep(100 * time.Millisecond) - - // Close the server, no longer needed. - s1.Stop() - - return c1, c2 -} - -func TestClients(t *testing.T) { - - channels := []String{"ch1", "ch2", "ch3", "ch4", "ch5", "ch6", "ch7", "ch8", "ch9", "ch0"} - c1, c2 := makeClientPair(t, 10, channels) - defer c1.Stop() - defer c2.Stop() - - // Lets send a message from c1 to c2. - if c1.Peers().Size() != 1 { - t.Errorf("Expected exactly 1 peer in c1, got %v", c1.Peers().Size()) - } - if c2.Peers().Size() != 1 { - t.Errorf("Expected exactly 1 peer in c2, got %v", c2.Peers().Size()) - } - - // Broadcast a message on ch1 - c1.Broadcast(NewPacket("ch1", ByteSlice("channel one"))) - // Broadcast a message on ch2 - c1.Broadcast(NewPacket("ch2", ByteSlice("channel two"))) - // Broadcast a message on ch3 - c1.Broadcast(NewPacket("ch3", ByteSlice("channel three"))) - - // Wait for things to settle... - time.Sleep(100 * time.Millisecond) - - // Receive message from channel 2 and check - inMsg := c2.Receive("ch2") - if string(inMsg.Bytes) != "channel two" { - t.Errorf("Unexpected received message bytes: %v", string(inMsg.Bytes)) - } - - // Receive message from channel 1 and check - inMsg = c2.Receive("ch1") - if string(inMsg.Bytes) != "channel one" { - t.Errorf("Unexpected received message bytes: %v", string(inMsg.Bytes)) - } -} - -func BenchmarkClients(b *testing.B) { - - b.StopTimer() - - channels := []String{"ch1", "ch2", "ch3", "ch4", "ch5", "ch6", "ch7", "ch8", "ch9", "ch0"} - c1, c2 := makeClientPair(b, 10, channels) - defer c1.Stop() - defer c2.Stop() - - // Create a sink on either channel to just pop off messages. - recvHandler := func(c *Client, chName String) { - for { - it := c.Receive(chName) - if it == nil { - break - } - } - } - - for _, chName := range channels { - go recvHandler(c1, chName) - go recvHandler(c2, chName) - } - - // Allow time for goroutines to boot up - time.Sleep(1000 * time.Millisecond) - b.StartTimer() - - numSuccess, numFailure := 0, 0 - - // Send random message from one channel to another - for i := 0; i < b.N; i++ { - chName := channels[i%len(channels)] - pkt := NewPacket(chName, ByteSlice("test data")) - nS, nF := c1.Broadcast(pkt) - numSuccess += nS - numFailure += nF - } - - log.Warnf("success: %v, failure: %v", numSuccess, numFailure) - - // Allow everything to flush before stopping clients & closing connections. - b.StopTimer() - time.Sleep(1000 * time.Millisecond) - -} diff --git a/peer/connection.go b/peer/connection.go index cd218ac2e..e743f757d 100644 --- a/peer/connection.go +++ b/peer/connection.go @@ -36,7 +36,7 @@ type Connection struct { quit chan struct{} pingRepeatTimer *RepeatTimer pong chan struct{} - channels map[String]*Channel + channels map[string]*Channel onError func(interface{}) started uint32 stopped uint32 @@ -64,7 +64,7 @@ func NewConnection(conn net.Conn) *Connection { // .Start() begins multiplexing packets to and from "channels". // If an error occurs, the recovered reason is passed to "onError". -func (c *Connection) Start(channels map[String]*Channel, onError func(interface{})) { +func (c *Connection) Start(channels map[string]*Channel, onError func(interface{})) { log.Debugf("Starting %v", c) if atomic.CompareAndSwapUint32(&c.started, 0, 1) { c.channels = channels @@ -215,7 +215,7 @@ FOR_LOOP: } break FOR_LOOP } - channel := c.channels[pkt.Channel] + channel := c.channels[string(pkt.Channel)] if channel == nil { Panicf("Unknown channel %v", pkt.Channel) } diff --git a/peer/peer.go b/peer/peer.go index f5c71626f..2dfed5962 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -14,13 +14,13 @@ import ( type Peer struct { outgoing bool conn *Connection - channels map[String]*Channel + channels map[string]*Channel quit chan struct{} started uint32 stopped uint32 } -func NewPeer(conn *Connection, channels map[String]*Channel) *Peer { +func newPeer(conn *Connection, channels map[string]*Channel) *Peer { return &Peer{ conn: conn, channels: channels, @@ -29,7 +29,7 @@ func NewPeer(conn *Connection, channels map[String]*Channel) *Peer { } } -func (p *Peer) start(pktRecvQueues map[String]chan *InboundPacket, onPeerError func(*Peer, interface{})) { +func (p *Peer) start(pktRecvQueues map[string]chan *InboundPacket, onPeerError func(*Peer, interface{})) { log.Debugf("Starting %v", p) if atomic.CompareAndSwapUint32(&p.started, 0, 1) { @@ -63,14 +63,14 @@ func (p *Peer) RemoteAddress() *NetAddress { return p.conn.RemoteAddress() } -func (p *Peer) Channel(chName String) *Channel { +func (p *Peer) Channel(chName string) *Channel { return p.channels[chName] } // TryQueue returns true if the packet was successfully queued. // Returning true does not imply that the packet will be sent. func (p *Peer) TryQueue(pkt Packet) bool { - channel := p.Channel(pkt.Channel) + channel := p.Channel(string(pkt.Channel)) sendQueue := channel.sendQueue if atomic.LoadUint32(&p.stopped) == 1 { @@ -96,7 +96,7 @@ func (p *Peer) String() string { // sendHandler pulls from a channel and pushes to the connection. // Each channel gets its own sendHandler goroutine; // Golang's channel implementation handles the scheduling. -func (p *Peer) sendHandler(chName String) { +func (p *Peer) sendHandler(chName string) { log.Tracef("%v sendHandler [%v]", p, chName) channel := p.channels[chName] sendQueue := channel.sendQueue @@ -122,7 +122,7 @@ FOR_LOOP: // Each channel gets its own recvHandler goroutine. // Many peers have goroutines that push to the same pktRecvQueue. // Golang's channel implementation handles the scheduling. -func (p *Peer) recvHandler(chName String, pktRecvQueue chan<- *InboundPacket) { +func (p *Peer) recvHandler(chName string, pktRecvQueue chan<- *InboundPacket) { log.Tracef("%v recvHandler [%v]", p, chName) channel := p.channels[chName] recvQueue := channel.recvQueue @@ -153,24 +153,32 @@ FOR_LOOP: // (none) } +/* ChannelDescriptor */ + +type ChannelDescriptor struct { + Name string + SendBufferSize int + RecvBufferSize int +} + /* Channel */ type Channel struct { - name String + name string recvQueue chan Packet sendQueue chan Packet //stats Stats } -func NewChannel(name String, bufferSize int) *Channel { +func newChannel(desc ChannelDescriptor) *Channel { return &Channel{ - name: name, - recvQueue: make(chan Packet, bufferSize), - sendQueue: make(chan Packet, bufferSize), + name: desc.Name, + recvQueue: make(chan Packet, desc.RecvBufferSize), + sendQueue: make(chan Packet, desc.SendBufferSize), } } -func (c *Channel) Name() String { +func (c *Channel) Name() string { return c.name } diff --git a/peer/pex.go b/peer/pex.go index a391f470d..b7507a0bd 100644 --- a/peer/pex.go +++ b/peer/pex.go @@ -12,10 +12,10 @@ var pexErrInvalidMessage = errors.New("Invalid PEX message") const pexCh = "PEX" -func peerExchangeHandler(c *Client) { +func peerExchangeHandler(s *Switch, addrBook *AddrBook) { for { - inPkt := c.Receive(pexCh) // {Peer, Time, Packet} + inPkt := s.Receive(pexCh) // {Peer, Time, Packet} if inPkt == nil { // Client has stopped break @@ -28,7 +28,7 @@ func peerExchangeHandler(c *Client) { case *pexRequestMessage: // inPkt.Peer requested some peers. // TODO: prevent abuse. - addrs := c.addrBook.GetSelection() + addrs := addrBook.GetSelection() response := &pexResponseMessage{Addrs: addrs} pkt := NewPacket(pexCh, BinaryBytes(response)) queued := inPkt.Peer.TryQueue(pkt) @@ -41,11 +41,11 @@ func peerExchangeHandler(c *Client) { // (We don't want to get spammed with bad peers) srcAddr := inPkt.Peer.RemoteAddress() for _, addr := range msg.(*pexResponseMessage).Addrs { - c.addrBook.AddAddress(addr, srcAddr) + addrBook.AddAddress(addr, srcAddr) } default: // Bad peer. - c.StopPeerForError(inPkt.Peer, pexErrInvalidMessage) + s.StopPeerForError(inPkt.Peer, pexErrInvalidMessage) } } diff --git a/peer/server.go b/peer/server.go deleted file mode 100644 index f6fd032f0..000000000 --- a/peer/server.go +++ /dev/null @@ -1,36 +0,0 @@ -package peer - -/* Server */ - -type Server struct { - listener Listener - client *Client -} - -func NewServer(protocol string, laddr string, c *Client) *Server { - l := NewDefaultListener(protocol, laddr) - s := &Server{ - listener: l, - client: c, - } - go s.IncomingConnectionHandler() - return s -} - -func (s *Server) ExternalAddress() *NetAddress { - return s.listener.ExternalAddress() -} - -// meant to run in a goroutine -func (s *Server) IncomingConnectionHandler() { - for conn := range s.listener.Connections() { - log.Infof("New connection found: %v", conn) - s.client.AddPeerWithConnection(conn, false) - } -} - -// stops the server, not the client. -func (s *Server) Stop() { - log.Infof("Stopping server") - s.listener.Stop() -} diff --git a/peer/switch.go b/peer/switch.go new file mode 100644 index 000000000..d07b756fa --- /dev/null +++ b/peer/switch.go @@ -0,0 +1,197 @@ +package peer + +import ( + "errors" + "sync" + "sync/atomic" + + . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/merkle" +) + +/* +All communication amongst peers are multiplexed by "channels". +(Not the same as Go "channels") + +To send a message, encapsulate it into a "Packet" and send it to each peer. +You can find all connected and active peers by iterating over ".Peers()". +".Broadcast()" is provided for convenience, but by iterating over +the peers manually the caller can decide which subset receives a message. + +Incoming messages are received by calling ".Receive()". +*/ +type Switch struct { + channels []ChannelDescriptor + pktRecvQueues map[string]chan *InboundPacket + peersMtx sync.Mutex + peers merkle.Tree // addr -> *Peer + quit chan struct{} + stopped uint32 +} + +var ( + ErrSwitchStopped = errors.New("Switch already stopped") + ErrSwitchDuplicatePeer = errors.New("Duplicate peer") +) + +func NewSwitch(channels []ChannelDescriptor) *Switch { + // make pktRecvQueues... + pktRecvQueues := make(map[string]chan *InboundPacket) + for _, chDesc := range channels { + pktRecvQueues[chDesc.Name] = make(chan *InboundPacket) + } + + s := &Switch{ + channels: channels, + pktRecvQueues: pktRecvQueues, + peers: merkle.NewIAVLTree(nil), + quit: make(chan struct{}), + stopped: 0, + } + + // automatically start + s.start() + + return s +} + +func (s *Switch) start() { + // Handle PEX messages + // TODO: hmm + // go peerExchangeHandler(c) +} + +func (s *Switch) Stop() { + log.Infof("Stopping switch") + // lock + s.peersMtx.Lock() + if atomic.CompareAndSwapUint32(&s.stopped, 0, 1) { + close(s.quit) + // stop each peer. + for peerValue := range s.peers.Values() { + peer := peerValue.(*Peer) + peer.stop() + } + // empty tree. + s.peers = merkle.NewIAVLTree(nil) + } + s.peersMtx.Unlock() + // unlock +} + +func (s *Switch) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, error) { + if atomic.LoadUint32(&s.stopped) == 1 { + return nil, ErrSwitchStopped + } + + log.Infof("Adding peer with connection: %v, outgoing: %v", conn, outgoing) + // Create channels for peer + channels := map[string]*Channel{} + for _, chDesc := range s.channels { + channels[chDesc.Name] = newChannel(chDesc) + } + peer := newPeer(conn, channels) + peer.outgoing = outgoing + err := s.addPeer(peer) + if err != nil { + return nil, err + } + + go peer.start(s.pktRecvQueues, s.StopPeerForError) + + return peer, nil +} + +func (s *Switch) Broadcast(pkt Packet) (numSuccess, numFailure int) { + if atomic.LoadUint32(&s.stopped) == 1 { + return + } + + log.Tracef("Broadcast on [%v] len: %v", pkt.Channel, len(pkt.Bytes)) + for v := range s.peers.Values() { + peer := v.(*Peer) + success := peer.TryQueue(pkt) + log.Tracef("Broadcast for peer %v success: %v", peer, success) + if success { + numSuccess += 1 + } else { + numFailure += 1 + } + } + return + +} + +/* +Receive blocks on a channel until a message is found. +*/ +func (s *Switch) Receive(chName string) *InboundPacket { + if atomic.LoadUint32(&s.stopped) == 1 { + return nil + } + + log.Tracef("Receive on [%v]", chName) + q := s.pktRecvQueues[chName] + if q == nil { + Panicf("Expected pktRecvQueues[%f], found none", chName) + } + + select { + case <-s.quit: + return nil + case inPacket := <-q: + return inPacket + } +} + +func (s *Switch) Peers() merkle.Tree { + // lock & defer + s.peersMtx.Lock() + defer s.peersMtx.Unlock() + return s.peers.Copy() + // unlock deferred +} + +// Disconnect from a peer due to external error. +// TODO: make record depending on reason. +func (s *Switch) StopPeerForError(peer *Peer, reason interface{}) { + log.Infof("%v errored: %v", peer, reason) + s.StopPeer(peer, false) +} + +// Disconnect from a peer. +// If graceful is true, last message sent is a disconnect message. +// TODO: handle graceful disconnects. +func (s *Switch) StopPeer(peer *Peer, graceful bool) { + // lock + s.peersMtx.Lock() + peerValue, _ := s.peers.Remove(peer.RemoteAddress()) + s.peersMtx.Unlock() + // unlock + + peer_ := peerValue.(*Peer) + if peer_ != nil { + peer_.stop() + } +} + +func (s *Switch) addPeer(peer *Peer) error { + addr := peer.RemoteAddress() + + // lock & defer + s.peersMtx.Lock() + defer s.peersMtx.Unlock() + if s.stopped == 1 { + return ErrSwitchStopped + } + if !s.peers.Has(addr) { + log.Tracef("Actually putting addr: %v, peer: %v", addr, peer) + s.peers.Put(addr, peer) + return nil + } else { + // ignore duplicate peer for addr. + log.Infof("Ignoring duplicate peer for addr %v", addr) + return ErrSwitchDuplicatePeer + } + // unlock deferred +} diff --git a/peer/switch_test.go b/peer/switch_test.go new file mode 100644 index 000000000..f2faa6bfe --- /dev/null +++ b/peer/switch_test.go @@ -0,0 +1,137 @@ +package peer + +import ( + "testing" + "time" + + . "github.com/tendermint/tendermint/binary" +) + +// convenience method for creating two switches connected to each other. +func makeSwitchPair(t testing.TB, bufferSize int, chNames []string) (*Switch, *Switch) { + + chDescs := []ChannelDescriptor{} + for _, chName := range chNames { + chDescs = append(chDescs, ChannelDescriptor{ + Name: chName, + SendBufferSize: bufferSize, + RecvBufferSize: bufferSize, + }) + } + + // Create two switches that will be interconnected. + s1 := NewSwitch(chDescs) + s2 := NewSwitch(chDescs) + + // Create a listener for s1 + l := NewDefaultListener("tcp", ":8001") + + // Dial the listener & add the connection to s2. + lAddr := l.ExternalAddress() + connOut, err := lAddr.Dial() + if err != nil { + t.Fatalf("Could not connect to listener address %v", lAddr) + } else { + t.Logf("Created a connection to listener address %v", lAddr) + } + connIn, ok := <-l.Connections() + if !ok { + t.Fatalf("Could not get incoming connection from listener") + } + + s1.AddPeerWithConnection(connIn, false) + s2.AddPeerWithConnection(connOut, true) + + // Wait for things to happen, peers to get added... + time.Sleep(100 * time.Millisecond) + + // Close the server, no longer needed. + l.Stop() + + return s1, s2 +} + +func TestSwitches(t *testing.T) { + + channels := []string{"ch1", "ch2", "ch3", "ch4", "ch5", "ch6", "ch7", "ch8", "ch9", "ch0"} + s1, s2 := makeSwitchPair(t, 10, channels) + defer s1.Stop() + defer s2.Stop() + + // Lets send a message from s1 to s2. + if s1.Peers().Size() != 1 { + t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size()) + } + if s2.Peers().Size() != 1 { + t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size()) + } + + // Broadcast a message on ch1 + s1.Broadcast(NewPacket("ch1", ByteSlice("channel one"))) + // Broadcast a message on ch2 + s1.Broadcast(NewPacket("ch2", ByteSlice("channel two"))) + // Broadcast a message on ch3 + s1.Broadcast(NewPacket("ch3", ByteSlice("channel three"))) + + // Wait for things to settle... + time.Sleep(100 * time.Millisecond) + + // Receive message from channel 2 and check + inMsg := s2.Receive("ch2") + if string(inMsg.Bytes) != "channel two" { + t.Errorf("Unexpected received message bytes: %v", string(inMsg.Bytes)) + } + + // Receive message from channel 1 and check + inMsg = s2.Receive("ch1") + if string(inMsg.Bytes) != "channel one" { + t.Errorf("Unexpected received message bytes: %v", string(inMsg.Bytes)) + } +} + +func BenchmarkSwitches(b *testing.B) { + + b.StopTimer() + + channels := []string{"ch1", "ch2", "ch3", "ch4", "ch5", "ch6", "ch7", "ch8", "ch9", "ch0"} + s1, s2 := makeSwitchPair(b, 10, channels) + defer s1.Stop() + defer s2.Stop() + + // Create a sink on either channel to just pop off messages. + recvHandler := func(c *Switch, chName string) { + for { + it := c.Receive(chName) + if it == nil { + break + } + } + } + + for _, chName := range channels { + go recvHandler(s1, chName) + go recvHandler(s2, chName) + } + + // Allow time for goroutines to boot up + time.Sleep(1000 * time.Millisecond) + b.StartTimer() + + numSuccess, numFailure := 0, 0 + + // Send random message from one channel to another + for i := 0; i < b.N; i++ { + chName := channels[i%len(channels)] + pkt := NewPacket(String(chName), ByteSlice("test data")) + nS, nF := s1.Broadcast(pkt) + numSuccess += nS + numFailure += nF + } + + log.Warnf("success: %v, failure: %v", numSuccess, numFailure) + + // Allow everything to flush before stopping switches & closing connections. + b.StopTimer() + time.Sleep(1000 * time.Millisecond) + +}