|
@ -31,7 +31,7 @@ func newPeer(conn *Connection, channels map[string]*Channel) *Peer { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (p *Peer) start(pktRecvQueues map[string]chan *InboundPacket, onPeerError func(*Peer, interface{})) { |
|
|
func (p *Peer) start(pktRecvQueues map[string]chan *InboundPacket, onPeerError func(*Peer, interface{})) { |
|
|
log.Debugf("Starting %v", p) |
|
|
|
|
|
|
|
|
log.Debug("Starting %v", p) |
|
|
|
|
|
|
|
|
if atomic.CompareAndSwapUint32(&p.started, 0, 1) { |
|
|
if atomic.CompareAndSwapUint32(&p.started, 0, 1) { |
|
|
// on connection error
|
|
|
// on connection error
|
|
@ -50,7 +50,7 @@ func (p *Peer) start(pktRecvQueues map[string]chan *InboundPacket, onPeerError f |
|
|
|
|
|
|
|
|
func (p *Peer) stop() { |
|
|
func (p *Peer) stop() { |
|
|
if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) { |
|
|
if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) { |
|
|
log.Debugf("Stopping %v", p) |
|
|
|
|
|
|
|
|
log.Debug("Stopping %v", p) |
|
|
close(p.quit) |
|
|
close(p.quit) |
|
|
p.conn.Stop() |
|
|
p.conn.Stop() |
|
|
} |
|
|
} |
|
@ -75,7 +75,7 @@ func (p *Peer) Channel(chName string) *Channel { |
|
|
// TrySend returns true if the packet was successfully queued.
|
|
|
// TrySend returns true if the packet was successfully queued.
|
|
|
// Returning true does not imply that the packet will be sent.
|
|
|
// Returning true does not imply that the packet will be sent.
|
|
|
func (p *Peer) TrySend(pkt Packet) bool { |
|
|
func (p *Peer) TrySend(pkt Packet) bool { |
|
|
log.Debugf("TrySend [%v] -> %v", pkt, p) |
|
|
|
|
|
|
|
|
log.Debug("TrySend [%v] -> %v", pkt, p) |
|
|
channel := p.Channel(string(pkt.Channel)) |
|
|
channel := p.Channel(string(pkt.Channel)) |
|
|
sendQueue := channel.sendQueue |
|
|
sendQueue := channel.sendQueue |
|
|
|
|
|
|
|
@ -92,7 +92,7 @@ func (p *Peer) TrySend(pkt Packet) bool { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (p *Peer) Send(pkt Packet) bool { |
|
|
func (p *Peer) Send(pkt Packet) bool { |
|
|
log.Debugf("Send [%v] -> %v", pkt, p) |
|
|
|
|
|
|
|
|
log.Debug("Send [%v] -> %v", pkt, p) |
|
|
channel := p.Channel(string(pkt.Channel)) |
|
|
channel := p.Channel(string(pkt.Channel)) |
|
|
sendQueue := channel.sendQueue |
|
|
sendQueue := channel.sendQueue |
|
|
|
|
|
|
|
@ -116,7 +116,7 @@ func (p *Peer) String() string { |
|
|
// Each channel gets its own sendHandler goroutine;
|
|
|
// Each channel gets its own sendHandler goroutine;
|
|
|
// Golang's channel implementation handles the scheduling.
|
|
|
// Golang's channel implementation handles the scheduling.
|
|
|
func (p *Peer) sendHandler(chName string) { |
|
|
func (p *Peer) sendHandler(chName string) { |
|
|
log.Tracef("%v sendHandler [%v]", p, chName) |
|
|
|
|
|
|
|
|
log.Debug("%v sendHandler [%v]", p, chName) |
|
|
channel := p.channels[chName] |
|
|
channel := p.channels[chName] |
|
|
sendQueue := channel.sendQueue |
|
|
sendQueue := channel.sendQueue |
|
|
FOR_LOOP: |
|
|
FOR_LOOP: |
|
@ -125,14 +125,14 @@ FOR_LOOP: |
|
|
case <-p.quit: |
|
|
case <-p.quit: |
|
|
break FOR_LOOP |
|
|
break FOR_LOOP |
|
|
case pkt := <-sendQueue: |
|
|
case pkt := <-sendQueue: |
|
|
log.Tracef("Sending packet to peer sendQueue") |
|
|
|
|
|
|
|
|
log.Debug("Sending packet to peer sendQueue") |
|
|
// blocks until the connection is Stop'd,
|
|
|
// blocks until the connection is Stop'd,
|
|
|
// which happens when this peer is Stop'd.
|
|
|
// which happens when this peer is Stop'd.
|
|
|
p.conn.Send(pkt) |
|
|
p.conn.Send(pkt) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
log.Tracef("%v sendHandler [%v] closed", p, chName) |
|
|
|
|
|
|
|
|
log.Debug("%v sendHandler [%v] closed", p, chName) |
|
|
// cleanup
|
|
|
// cleanup
|
|
|
// (none)
|
|
|
// (none)
|
|
|
} |
|
|
} |
|
@ -142,7 +142,7 @@ FOR_LOOP: |
|
|
// Many peers have goroutines that push to the same pktRecvQueue.
|
|
|
// Many peers have goroutines that push to the same pktRecvQueue.
|
|
|
// Golang's channel implementation handles the scheduling.
|
|
|
// Golang's channel implementation handles the scheduling.
|
|
|
func (p *Peer) recvHandler(chName string, pktRecvQueue chan<- *InboundPacket) { |
|
|
func (p *Peer) recvHandler(chName string, pktRecvQueue chan<- *InboundPacket) { |
|
|
log.Tracef("%v recvHandler [%v]", p, chName) |
|
|
|
|
|
|
|
|
log.Debug("%v recvHandler [%v]", p, chName) |
|
|
channel := p.channels[chName] |
|
|
channel := p.channels[chName] |
|
|
recvQueue := channel.recvQueue |
|
|
recvQueue := channel.recvQueue |
|
|
|
|
|
|
|
@ -167,7 +167,7 @@ FOR_LOOP: |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
log.Tracef("%v recvHandler [%v] closed", p, chName) |
|
|
|
|
|
|
|
|
log.Debug("%v recvHandler [%v] closed", p, chName) |
|
|
// cleanup
|
|
|
// cleanup
|
|
|
// (none)
|
|
|
// (none)
|
|
|
} |
|
|
} |
|
@ -224,7 +224,7 @@ type Packet struct { |
|
|
|
|
|
|
|
|
func NewPacket(chName String, msg Binary) Packet { |
|
|
func NewPacket(chName String, msg Binary) Packet { |
|
|
msgBytes := BinaryBytes(msg) |
|
|
msgBytes := BinaryBytes(msg) |
|
|
log.Tracef("NewPacket msg bytes: %X", msgBytes) |
|
|
|
|
|
|
|
|
log.Debug("NewPacket msg bytes: %X", msgBytes) |
|
|
return Packet{ |
|
|
return Packet{ |
|
|
Channel: chName, |
|
|
Channel: chName, |
|
|
Bytes: msgBytes, |
|
|
Bytes: msgBytes, |
|
@ -255,7 +255,7 @@ func ReadPacketSafe(r io.Reader) (pkt Packet, err error) { |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
log.Tracef("ReadPacket* msg bytes: %X", bytes) |
|
|
|
|
|
|
|
|
log.Debug("ReadPacket* msg bytes: %X", bytes) |
|
|
return Packet{Channel: chName, Bytes: bytes}, nil |
|
|
return Packet{Channel: chName, Bytes: bytes}, nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|