diff --git a/common/debounce.go b/common/debounce.go deleted file mode 100644 index 125aaa98f..000000000 --- a/common/debounce.go +++ /dev/null @@ -1,48 +0,0 @@ -package common - -import ( - "sync" - "time" -) - -/* Debouncer */ -type Debouncer struct { - Ch chan struct{} - quit chan struct{} - dur time.Duration - mtx sync.Mutex - timer *time.Timer -} - -func NewDebouncer(dur time.Duration) *Debouncer { - var timer *time.Timer - var ch = make(chan struct{}) - var quit = make(chan struct{}) - var mtx sync.Mutex - fire := func() { - go func() { - select { - case ch <- struct{}{}: - case <-quit: - } - }() - mtx.Lock() - defer mtx.Unlock() - timer.Reset(dur) - } - timer = time.AfterFunc(dur, fire) - return &Debouncer{Ch: ch, dur: dur, quit: quit, mtx: mtx, timer: timer} -} - -func (d *Debouncer) Reset() { - d.mtx.Lock() - defer d.mtx.Unlock() - d.timer.Reset(d.dur) -} - -func (d *Debouncer) Stop() bool { - d.mtx.Lock() - defer d.mtx.Unlock() - close(d.quit) - return d.timer.Stop() -} diff --git a/common/repeat_timer.go b/common/repeat_timer.go new file mode 100644 index 000000000..4ef1cbee4 --- /dev/null +++ b/common/repeat_timer.go @@ -0,0 +1,36 @@ +package common + +import "time" + +/* RepeatTimer */ +type RepeatTimer struct { + Ch chan struct{} + quit chan struct{} + dur time.Duration + timer *time.Timer +} + +func NewRepeatTimer(dur time.Duration) *RepeatTimer { + var ch = make(chan struct{}) + var quit = make(chan struct{}) + var t = &RepeatTimer{Ch: ch, dur: dur, quit: quit} + t.timer = time.AfterFunc(dur, t.fireHandler) + return t +} + +func (t *RepeatTimer) fireHandler() { + select { + case t.Ch <- struct{}{}: + t.timer.Reset(t.dur) + case <-t.quit: + } +} + +func (t *RepeatTimer) Reset() { + t.timer.Reset(t.dur) +} + +func (t *RepeatTimer) Stop() bool { + close(t.quit) + return t.timer.Stop() +} diff --git a/common/throttler.go b/common/throttler.go new file mode 100644 index 000000000..609a97455 --- /dev/null +++ b/common/throttler.go @@ -0,0 +1,42 @@ +package common + +import ( + "sync/atomic" + "time" +) + +/* Throttler */ +type Throttler struct { + Ch chan struct{} + quit chan struct{} + dur time.Duration + timer *time.Timer + isSet uint32 +} + +func NewThrottler(dur time.Duration) *Throttler { + var ch = make(chan struct{}) + var quit = make(chan struct{}) + var t = &Throttler{Ch: ch, dur: dur, quit: quit} + t.timer = time.AfterFunc(dur, t.fireHandler) + return t +} + +func (t *Throttler) fireHandler() { + select { + case t.Ch <- struct{}{}: + atomic.StoreUint32(&t.isSet, 0) + case <-t.quit: + } +} + +func (t *Throttler) Set() { + if atomic.CompareAndSwapUint32(&t.isSet, 0, 1) { + t.timer.Reset(t.dur) + } +} + +func (t *Throttler) Stop() bool { + close(t.quit) + return t.timer.Stop() +} diff --git a/peer/connection.go b/peer/connection.go index d143dd7ec..6b4e86152 100644 --- a/peer/connection.go +++ b/peer/connection.go @@ -12,23 +12,27 @@ import ( ) const ( - OUT_QUEUE_SIZE = 50 - IDLE_TIMEOUT_MINUTES = 5 - PING_TIMEOUT_MINUTES = 2 + READ_BUFFER_MIN_SIZE = 1024 + WRITE_BUFFER_MIN_SIZE = 1024 + FLUSH_THROTTLE_MS = 50 + OUT_QUEUE_SIZE = 50 + IDLE_TIMEOUT_MINUTES = 5 + PING_TIMEOUT_MINUTES = 2 ) /* Connnection */ type Connection struct { ioStats IOStats - sendQueue chan Packet // never closes - conn net.Conn - bufWriter *bufio.Writer - bufReader *bufio.Reader - quit chan struct{} - stopped uint32 - pingDebouncer *Debouncer - pong chan struct{} + sendQueue chan Packet // never closes + conn net.Conn + bufReader *bufio.Reader + bufWriter *bufio.Writer + flushThrottler *Throttler + quit chan struct{} + stopped uint32 + pingRepeatTimer *RepeatTimer + pong chan struct{} } var ( @@ -39,13 +43,14 @@ var ( func NewConnection(conn net.Conn) *Connection { return &Connection{ - sendQueue: make(chan Packet, OUT_QUEUE_SIZE), - conn: conn, - bufWriter: bufio.NewWriterSize(conn, 1024), - bufReader: bufio.NewReaderSize(conn, 1024), - quit: make(chan struct{}), - pingDebouncer: NewDebouncer(PING_TIMEOUT_MINUTES * time.Minute), - pong: make(chan struct{}), + sendQueue: make(chan Packet, OUT_QUEUE_SIZE), + conn: conn, + bufReader: bufio.NewReaderSize(conn, READ_BUFFER_MIN_SIZE), + bufWriter: bufio.NewWriterSize(conn, WRITE_BUFFER_MIN_SIZE), + flushThrottler: NewThrottler(FLUSH_THROTTLE_MS * time.Millisecond), + quit: make(chan struct{}), + pingRepeatTimer: NewRepeatTimer(PING_TIMEOUT_MINUTES * time.Minute), + pong: make(chan struct{}), } } @@ -72,7 +77,8 @@ func (c *Connection) Stop() { log.Debugf("Stopping %v", c) close(c.quit) c.conn.Close() - c.pingDebouncer.Stop() + c.flushThrottler.Stop() + c.pingRepeatTimer.Stop() // We can't close pong safely here because // recvHandler may write to it after we've stopped. // Though it doesn't need to get closed at all, @@ -94,7 +100,15 @@ func (c *Connection) String() string { } func (c *Connection) flush() { - // TODO flush? (turn off nagel, turn back on, etc) + // TODO: this is pretty naive. + // We end up flushing when we don't have to (yet). + // A better solution might require us implementing our own buffered writer. + err := c.bufWriter.Flush() + if err != nil { + if atomic.LoadUint32(&c.stopped) != 1 { + log.Warnf("Connection flush failed: %v", err) + } + } } func (c *Connection) sendHandler() { @@ -106,8 +120,6 @@ FOR_LOOP: for { var err error select { - case <-c.pingDebouncer.Ch: - _, err = PACKET_TYPE_PING.WriteTo(c.bufWriter) case sendPkt := <-c.sendQueue: log.Tracef("Found pkt from sendQueue. Writing pkt to underlying connection") _, err = PACKET_TYPE_MSG.WriteTo(c.bufWriter) @@ -115,8 +127,15 @@ FOR_LOOP: break } _, err = sendPkt.WriteTo(c.bufWriter) + c.flushThrottler.Set() + case <-c.flushThrottler.Ch: + c.flush() + case <-c.pingRepeatTimer.Ch: + _, err = PACKET_TYPE_PING.WriteTo(c.bufWriter) + c.flush() case <-c.pong: _, err = PACKET_TYPE_PONG.WriteTo(c.bufWriter) + c.flush() case <-c.quit: break FOR_LOOP } @@ -129,7 +148,6 @@ FOR_LOOP: c.Stop() break FOR_LOOP } - c.flush() } log.Tracef("%v sendHandler done", c) @@ -156,6 +174,8 @@ FOR_LOOP: switch pktType { case PACKET_TYPE_PING: + // TODO: keep track of these, make sure it isn't abused + // as they cause flush()'s in the send buffer. c.pong <- struct{}{} case PACKET_TYPE_PONG: // do nothing @@ -177,7 +197,7 @@ FOR_LOOP: Panicf("Unknown message type %v", pktType) } - c.pingDebouncer.Reset() + c.pingRepeatTimer.Reset() } log.Tracef("%v recvHandler done", c)