Browse Source

.

pull/9/head
Jae Kwon 11 years ago
parent
commit
237b3dc7ff
2 changed files with 22 additions and 22 deletions
  1. +20
    -20
      p2p/connection.go
  2. +2
    -2
      p2p/listener.go

+ 20
- 20
p2p/connection.go View File

@ -12,12 +12,12 @@ import (
) )
const ( const (
MinReadBufferSize = 1024
MinWriteBufferSize = 1024
FlushThrottleMS = 50
OutQueueSize = 50
IdleTimeoutMinutes = 5
PingTimeoutMinutes = 2
minReadBufferSize = 1024
minWriteBufferSize = 1024
flushThrottleMS = 50
outQueueSize = 50
idleTimeoutMinutes = 5
pingTimeoutMinutes = 2
) )
/* /*
@ -44,20 +44,20 @@ type Connection struct {
} }
const ( const (
PacketTypePing = UInt8(0x00)
PacketTypePong = UInt8(0x01)
PacketTypeMessage = UInt8(0x10)
packetTypePing = UInt8(0x00)
packetTypePong = UInt8(0x01)
packetTypeMessage = UInt8(0x10)
) )
func NewConnection(conn net.Conn) *Connection { func NewConnection(conn net.Conn) *Connection {
return &Connection{ return &Connection{
sendQueue: make(chan Packet, OutQueueSize),
sendQueue: make(chan Packet, outQueueSize),
conn: conn, conn: conn,
bufReader: bufio.NewReaderSize(conn, MinReadBufferSize),
bufWriter: bufio.NewWriterSize(conn, MinWriteBufferSize),
flushThrottler: NewThrottler(FlushThrottleMS * time.Millisecond),
bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
flushThrottler: NewThrottler(flushThrottleMS * time.Millisecond),
quit: make(chan struct{}), quit: make(chan struct{}),
pingRepeatTimer: NewRepeatTimer(PingTimeoutMinutes * time.Minute),
pingRepeatTimer: NewRepeatTimer(pingTimeoutMinutes * time.Minute),
pong: make(chan struct{}), pong: make(chan struct{}),
} }
} }
@ -148,7 +148,7 @@ FOR_LOOP:
select { select {
case sendPkt := <-c.sendQueue: case sendPkt := <-c.sendQueue:
log.Tracef("Found pkt from sendQueue. Writing pkt to underlying connection") log.Tracef("Found pkt from sendQueue. Writing pkt to underlying connection")
_, err = PacketTypeMessage.WriteTo(c.bufWriter)
_, err = packetTypeMessage.WriteTo(c.bufWriter)
if err != nil { if err != nil {
break break
} }
@ -157,11 +157,11 @@ FOR_LOOP:
case <-c.flushThrottler.Ch: case <-c.flushThrottler.Ch:
c.flush() c.flush()
case <-c.pingRepeatTimer.Ch: case <-c.pingRepeatTimer.Ch:
_, err = PacketTypePing.WriteTo(c.bufWriter)
_, err = packetTypePing.WriteTo(c.bufWriter)
log.Debugf("[%v] Sending Ping", c) log.Debugf("[%v] Sending Ping", c)
c.flush() c.flush()
case <-c.pong: case <-c.pong:
_, err = PacketTypePong.WriteTo(c.bufWriter)
_, err = packetTypePong.WriteTo(c.bufWriter)
log.Debugf("[%v] Sending Pong", c) log.Debugf("[%v] Sending Pong", c)
c.flush() c.flush()
case <-c.quit: case <-c.quit:
@ -202,14 +202,14 @@ FOR_LOOP:
} }
switch pktType { switch pktType {
case PacketTypePing:
case packetTypePing:
// TODO: keep track of these, make sure it isn't abused // TODO: keep track of these, make sure it isn't abused
// as they cause flush()'s in the send buffer. // as they cause flush()'s in the send buffer.
c.pong <- struct{}{} c.pong <- struct{}{}
case PacketTypePong:
case packetTypePong:
// do nothing // do nothing
log.Debugf("[%v] Received Pong", c) log.Debugf("[%v] Received Pong", c)
case PacketTypeMessage:
case packetTypeMessage:
pkt, err := ReadPacketSafe(c.bufReader) pkt, err := ReadPacketSafe(c.bufReader)
if err != nil { if err != nil {
if atomic.LoadUint32(&c.stopped) != 1 { if atomic.LoadUint32(&c.stopped) != 1 {


+ 2
- 2
p2p/listener.go View File

@ -30,7 +30,7 @@ type DefaultListener struct {
} }
const ( const (
NumBufferedConnections = 10
numBufferedConnections = 10
) )
func NewDefaultListener(protocol string, listenAddr string) Listener { func NewDefaultListener(protocol string, listenAddr string) Listener {
@ -66,7 +66,7 @@ func NewDefaultListener(protocol string, listenAddr string) Listener {
dl := &DefaultListener{ dl := &DefaultListener{
listener: listener, listener: listener,
extAddr: extAddr, extAddr: extAddr,
connections: make(chan *Connection, NumBufferedConnections),
connections: make(chan *Connection, numBufferedConnections),
} }
go dl.listenHandler() go dl.listenHandler()


Loading…
Cancel
Save