Browse Source

writing client_test for round-trip messages.

pull/9/head
Jae Kwon 11 years ago
parent
commit
e52166017d
4 changed files with 47 additions and 20 deletions
  1. +9
    -5
      peer/client.go
  2. +6
    -3
      peer/client_test.go
  3. +15
    -3
      peer/connection.go
  4. +17
    -9
      peer/peer.go

+ 9
- 5
peer/client.go View File

@ -2,7 +2,6 @@ package peer
import (
. "github.com/tendermint/tendermint/common"
. "github.com/tendermint/tendermint/binary"
"github.com/tendermint/tendermint/merkle"
"sync/atomic"
"sync"
@ -25,7 +24,7 @@ type Client struct {
targetNumPeers int
makePeerFn func(*Connection) *Peer
self *Peer
inQueues map[String]chan *InboundMsg
inQueues map[string]chan *InboundMsg
mtx sync.Mutex
peers merkle.Tree // addr -> *Peer
@ -44,7 +43,7 @@ func NewClient(makePeerFn func(*Connection) *Peer) *Client {
Panicf("makePeerFn(nil) must return a prototypical peer for self")
}
inQueues := make(map[String]chan *InboundMsg)
inQueues := make(map[string]chan *InboundMsg)
for chName, _ := range self.channels {
inQueues[chName] = make(chan *InboundMsg)
}
@ -95,21 +94,26 @@ func (c *Client) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer,
return peer, nil
}
func (c *Client) Broadcast(chName String, msg Msg) {
func (c *Client) Broadcast(chName string, msg Msg) {
if atomic.LoadUint32(&c.stopped) == 1 { return }
log.Tracef("Broadcast on [%v] msg: %v", chName, msg)
for v := range c.Peers().Values() {
peer := v.(*Peer)
success := peer.TryQueueOut(chName , msg)
log.Tracef("Broadcast for peer %v success: %v", peer, success)
if !success {
// TODO: notify the peer
}
}
}
func (c *Client) PopMessage(chName String) *InboundMsg {
// blocks until a message is popped.
func (c *Client) PopMessage(chName string) *InboundMsg {
if atomic.LoadUint32(&c.stopped) == 1 { return nil }
log.Tracef("PopMessage on [%v]", chName)
q := c.inQueues[chName]
if q == nil { Panicf("Expected inQueues[%f], found none", chName) }


+ 6
- 3
peer/client_test.go View File

@ -11,7 +11,7 @@ func TestConnection(t *testing.T) {
peerMaker := func(conn *Connection) *Peer {
bufferSize := 10
p := NewPeer(conn)
p.channels = map[String]*Channel{}
p.channels = map[string]*Channel{}
p.channels["ch1"] = NewChannel("ch1", bufferSize)
p.channels["ch2"] = NewChannel("ch2", bufferSize)
return p
@ -44,8 +44,11 @@ func TestConnection(t *testing.T) {
}
// TODO: test the transmission of information on channels.
time.Sleep(500 * time.Millisecond)
//inMsg := c2.PopMessage(String(""))
c1.Broadcast("ch1", Msg{Bytes:ByteSlice("test data")})
time.Sleep(100 * time.Millisecond)
inMsg := c2.PopMessage("ch1")
t.Logf("c2 popped message: %v", inMsg)
s1.Stop()
c2.Stop()


+ 15
- 3
peer/connection.go View File

@ -56,12 +56,14 @@ func (c *Connection) QueueOut(msg ByteSlice) bool {
}
func (c *Connection) Start() {
log.Debugf("Starting %v", c)
go c.outHandler()
go c.inHandler()
}
func (c *Connection) Stop() {
if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
log.Debugf("Stopping %v", c)
close(c.quit)
c.conn.Close()
c.pingDebouncer.Stop()
@ -90,6 +92,7 @@ func (c *Connection) flush() {
}
func (c *Connection) outHandler() {
log.Tracef("Connection %v outHandler", c)
FOR_LOOP:
for {
@ -98,6 +101,9 @@ func (c *Connection) outHandler() {
case <-c.pingDebouncer.Ch:
_, err = PACKET_TYPE_PING.WriteTo(c.conn)
case outMsg := <-c.outQueue:
log.Tracef("Found msg from outQueue. Writing msg to underlying connection")
_, err = PACKET_TYPE_MSG.WriteTo(c.conn)
if err != nil { break }
_, err = outMsg.WriteTo(c.conn)
case <-c.pong:
_, err = PACKET_TYPE_PONG.WriteTo(c.conn)
@ -114,20 +120,24 @@ func (c *Connection) outHandler() {
c.flush()
}
log.Tracef("Connection %v outHandler done", c)
// cleanup
}
func (c *Connection) inHandler() {
log.Tracef("Connection %v inHandler", c)
FOR_LOOP:
for {
msgType, err := ReadUInt8Safe(c.conn)
if err != nil {
if atomic.LoadUint32(&c.stopped) != 1 {
log.Infof("Connection %v failed @ inHandler", c)
c.Stop()
}
break FOR_LOOP
} else {
log.Tracef("Found msgType %v", msgType)
}
switch msgType {
@ -145,9 +155,10 @@ func (c *Connection) inHandler() {
break FOR_LOOP
}
// What to do?
// TODO
// XXX
XXX well, we need to push it into the channel or something.
or at least provide an inQueue.
log.Tracef("%v", msg)
default:
Panicf("Unknown message type %v", msgType)
}
@ -155,6 +166,7 @@ func (c *Connection) inHandler() {
c.pingDebouncer.Reset()
}
log.Tracef("Connection %v inHandler done", c)
// cleanup
close(c.pong)
for _ = range c.pong {


+ 17
- 9
peer/peer.go View File

@ -14,7 +14,7 @@ import (
type Peer struct {
outgoing bool
conn *Connection
channels map[String]*Channel
channels map[string]*Channel
mtx sync.Mutex
quit chan struct{}
@ -29,7 +29,9 @@ func NewPeer(conn *Connection) *Peer {
}
}
func (p *Peer) Start(peerInQueues map[String]chan *InboundMsg ) {
func (p *Peer) Start(peerInQueues map[string]chan *InboundMsg ) {
log.Debugf("Starting %v", p)
p.conn.Start()
for chName, _ := range p.channels {
go p.inHandler(chName, peerInQueues[chName])
go p.outHandler(chName)
@ -40,6 +42,7 @@ func (p *Peer) Stop() {
// lock
p.mtx.Lock()
if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) {
log.Debugf("Stopping %v", p)
close(p.quit)
p.conn.Stop()
}
@ -55,13 +58,13 @@ func (p *Peer) RemoteAddress() *NetAddress {
return p.conn.RemoteAddress()
}
func (p *Peer) Channel(chName String) *Channel {
func (p *Peer) Channel(chName string) *Channel {
return p.channels[chName]
}
// Queue the msg for output.
// If the queue is full, just return false.
func (p *Peer) TryQueueOut(chName String, msg Msg) bool {
func (p *Peer) TryQueueOut(chName string, msg Msg) bool {
channel := p.Channel(chName)
outQueue := channel.OutQueue()
@ -85,7 +88,8 @@ func (p *Peer) String() string {
return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outgoing)
}
func (p *Peer) inHandler(chName String, inboundMsgQueue chan<- *InboundMsg) {
func (p *Peer) inHandler(chName string, inboundMsgQueue chan<- *InboundMsg) {
log.Tracef("Peer %v inHandler [%v]", p, chName)
channel := p.channels[chName]
inQueue := channel.InQueue()
@ -111,11 +115,13 @@ func (p *Peer) inHandler(chName String, inboundMsgQueue chan<- *InboundMsg) {
}
}
log.Tracef("Peer %v inHandler [%v] closed", p, chName)
// cleanup
// (none)
}
func (p *Peer) outHandler(chName String) {
func (p *Peer) outHandler(chName string) {
log.Tracef("Peer %v outHandler [%v]", p, chName)
outQueue := p.channels[chName].outQueue
FOR_LOOP:
for {
@ -123,12 +129,14 @@ func (p *Peer) outHandler(chName String) {
case <-p.quit:
break FOR_LOOP
case msg := <-outQueue:
log.Tracef("Sending msg to peer outQueue")
// blocks until the connection is Stop'd,
// which happens when this peer is Stop'd.
p.conn.QueueOut(msg.Bytes)
}
}
log.Tracef("Peer %v outHandler [%v] closed", p, chName)
// cleanup
// (none)
}
@ -137,7 +145,7 @@ func (p *Peer) outHandler(chName String) {
/* Channel */
type Channel struct {
name String
name string
inQueue chan Msg
outQueue chan Msg
//stats Stats
@ -145,13 +153,13 @@ type Channel struct {
func NewChannel(name string, bufferSize int) *Channel {
return &Channel{
name: String(name),
name: name,
inQueue: make(chan Msg, bufferSize),
outQueue: make(chan Msg, bufferSize),
}
}
func (c *Channel) Name() String {
func (c *Channel) Name() string {
return c.name
}


Loading…
Cancel
Save