From 887d766c86f1f217653915a2042374972c8f38ae Mon Sep 17 00:00:00 2001 From: Ethan Frey Date: Thu, 7 Dec 2017 10:15:38 +0100 Subject: [PATCH 1/5] Refactored RepeatTimer, tests hang --- common/repeat_timer.go | 121 +++++++++++++++++++++--------------- common/repeat_timer_test.go | 4 +- common/throttle_timer.go | 16 ++--- 3 files changed, 81 insertions(+), 60 deletions(-) diff --git a/common/repeat_timer.go b/common/repeat_timer.go index d7d9154d4..0f6501131 100644 --- a/common/repeat_timer.go +++ b/common/repeat_timer.go @@ -1,7 +1,7 @@ package common import ( - "sync" + "fmt" "time" ) @@ -11,54 +11,40 @@ It's good for keeping connections alive. A RepeatTimer must be Stop()'d or it will keep a goroutine alive. */ type RepeatTimer struct { - Ch chan time.Time + Name string + Ch <-chan time.Time + output chan<- time.Time + input chan repeatCommand - mtx sync.Mutex - name string - ticker *time.Ticker - quit chan struct{} - wg *sync.WaitGroup - dur time.Duration + dur time.Duration + timer *time.Timer } +type repeatCommand int32 + +const ( + Reset repeatCommand = iota + RQuit +) + func NewRepeatTimer(name string, dur time.Duration) *RepeatTimer { + c := make(chan time.Time) var t = &RepeatTimer{ - Ch: make(chan time.Time), - ticker: time.NewTicker(dur), - quit: make(chan struct{}), - wg: new(sync.WaitGroup), - name: name, - dur: dur, - } - t.wg.Add(1) - go t.fireRoutine(t.ticker) - return t -} + Name: name, + Ch: c, + output: c, + input: make(chan repeatCommand), -func (t *RepeatTimer) fireRoutine(ticker *time.Ticker) { - for { - select { - case t_ := <-ticker.C: - t.Ch <- t_ - case <-t.quit: - // needed so we know when we can reset t.quit - t.wg.Done() - return - } + timer: time.NewTimer(dur), + dur: dur, } + go t.run() + return t } // Wait the duration again before firing. func (t *RepeatTimer) Reset() { - t.Stop() - - t.mtx.Lock() // Lock - defer t.mtx.Unlock() - - t.ticker = time.NewTicker(t.dur) - t.quit = make(chan struct{}) - t.wg.Add(1) - go t.fireRoutine(t.ticker) + t.input <- Reset } // For ease of .Stop()'ing services before .Start()'ing them, @@ -67,20 +53,55 @@ func (t *RepeatTimer) Stop() bool { if t == nil { return false } - t.mtx.Lock() // Lock - defer t.mtx.Unlock() + t.input <- RQuit + return true +} - exists := t.ticker != nil - if exists { - t.ticker.Stop() // does not close the channel +func (t *RepeatTimer) run() { + for { + fmt.Println("for") select { - case <-t.Ch: - // read off channel if there's anything there - default: + case cmd := <-t.input: + // stop goroutine if the input says so + // don't close channels, as closed channels mess up select reads + if t.processInput(cmd) { + t.timer.Stop() + return + } + case <-t.timer.C: + fmt.Println("tick") + // send if not blocked, then start the next tick + // for blocking send, just + // t.output <- time.Now() + t.trySend() + t.timer.Reset(t.dur) } - close(t.quit) - t.wg.Wait() // must wait for quit to close else we race Reset - t.ticker = nil } - return exists +} + +// trySend performs non-blocking send on t.Ch +func (t *RepeatTimer) trySend() { + // TODO: this was blocking in previous version (t.Ch <- t_) + // should I use that behavior unstead of unblocking as per throttle? + select { + case t.output <- time.Now(): + default: + } +} + +// all modifications of the internal state of ThrottleTimer +// happen in this method. It is only called from the run goroutine +// so we avoid any race conditions +func (t *RepeatTimer) processInput(cmd repeatCommand) (shutdown bool) { + fmt.Printf("process: %d\n", cmd) + switch cmd { + case Reset: + t.timer.Reset(t.dur) + case RQuit: + t.timer.Stop() + shutdown = true + default: + panic("unknown command!") + } + return shutdown } diff --git a/common/repeat_timer_test.go b/common/repeat_timer_test.go index 87f34b950..d66cd3152 100644 --- a/common/repeat_timer_test.go +++ b/common/repeat_timer_test.go @@ -10,7 +10,7 @@ import ( ) type rCounter struct { - input chan time.Time + input <-chan time.Time mtx sync.Mutex count int } @@ -74,5 +74,5 @@ func TestRepeat(test *testing.T) { assert.Equal(6, c.Count()) // close channel to stop counter - close(t.Ch) + t.Stop() } diff --git a/common/throttle_timer.go b/common/throttle_timer.go index ab2ad2e62..c148d9904 100644 --- a/common/throttle_timer.go +++ b/common/throttle_timer.go @@ -13,7 +13,7 @@ at most once every "dur". type ThrottleTimer struct { Name string Ch <-chan struct{} - input chan command + input chan throttleCommand output chan<- struct{} dur time.Duration @@ -21,12 +21,12 @@ type ThrottleTimer struct { isSet bool } -type command int32 +type throttleCommand int32 const ( - Set command = iota + Set throttleCommand = iota Unset - Quit + TQuit ) // NewThrottleTimer creates a new ThrottleTimer. @@ -36,7 +36,7 @@ func NewThrottleTimer(name string, dur time.Duration) *ThrottleTimer { Name: name, Ch: c, dur: dur, - input: make(chan command), + input: make(chan throttleCommand), output: c, timer: time.NewTimer(dur), } @@ -74,14 +74,14 @@ func (t *ThrottleTimer) trySend() { // all modifications of the internal state of ThrottleTimer // happen in this method. It is only called from the run goroutine // so we avoid any race conditions -func (t *ThrottleTimer) processInput(cmd command) (shutdown bool) { +func (t *ThrottleTimer) processInput(cmd throttleCommand) (shutdown bool) { switch cmd { case Set: if !t.isSet { t.isSet = true t.timer.Reset(t.dur) } - case Quit: + case TQuit: shutdown = true fallthrough case Unset: @@ -122,6 +122,6 @@ func (t *ThrottleTimer) Stop() bool { if t == nil { return false } - t.input <- Quit + t.input <- TQuit return true } From 8797197cdfc9920e2dbce274c8aba8c09b15f86f Mon Sep 17 00:00:00 2001 From: Ethan Frey Date: Thu, 7 Dec 2017 10:36:03 +0100 Subject: [PATCH 2/5] No more blocking on multiple Stop() --- common/repeat_timer.go | 33 +++++++++++++++++---------------- common/repeat_timer_test.go | 2 +- common/throttle_timer.go | 8 +++++--- common/throttle_timer_test.go | 5 +++++ 4 files changed, 28 insertions(+), 20 deletions(-) diff --git a/common/repeat_timer.go b/common/repeat_timer.go index 0f6501131..734c2d32a 100644 --- a/common/repeat_timer.go +++ b/common/repeat_timer.go @@ -16,8 +16,9 @@ type RepeatTimer struct { output chan<- time.Time input chan repeatCommand - dur time.Duration - timer *time.Timer + dur time.Duration + timer *time.Timer + stopped bool } type repeatCommand int32 @@ -50,43 +51,42 @@ func (t *RepeatTimer) Reset() { // For ease of .Stop()'ing services before .Start()'ing them, // we ignore .Stop()'s on nil RepeatTimers. func (t *RepeatTimer) Stop() bool { - if t == nil { + if t == nil || t.stopped { return false } t.input <- RQuit + t.stopped = true return true } func (t *RepeatTimer) run() { - for { - fmt.Println("for") + done := false + for !done { select { case cmd := <-t.input: // stop goroutine if the input says so // don't close channels, as closed channels mess up select reads - if t.processInput(cmd) { - t.timer.Stop() - return - } + done = t.processInput(cmd) case <-t.timer.C: - fmt.Println("tick") // send if not blocked, then start the next tick - // for blocking send, just - // t.output <- time.Now() t.trySend() t.timer.Reset(t.dur) } } + fmt.Println("end run") } // trySend performs non-blocking send on t.Ch func (t *RepeatTimer) trySend() { // TODO: this was blocking in previous version (t.Ch <- t_) // should I use that behavior unstead of unblocking as per throttle? - select { - case t.output <- time.Now(): - default: - } + + // select { + // case t.output <- time.Now(): + // default: + // } + + t.output <- time.Now() } // all modifications of the internal state of ThrottleTimer @@ -98,6 +98,7 @@ func (t *RepeatTimer) processInput(cmd repeatCommand) (shutdown bool) { case Reset: t.timer.Reset(t.dur) case RQuit: + fmt.Println("got quit") t.timer.Stop() shutdown = true default: diff --git a/common/repeat_timer_test.go b/common/repeat_timer_test.go index d66cd3152..15ca32c31 100644 --- a/common/repeat_timer_test.go +++ b/common/repeat_timer_test.go @@ -73,6 +73,6 @@ func TestRepeat(test *testing.T) { time.Sleep(delay(7)) assert.Equal(6, c.Count()) - // close channel to stop counter + // extra calls to stop don't block t.Stop() } diff --git a/common/throttle_timer.go b/common/throttle_timer.go index c148d9904..0e54f1027 100644 --- a/common/throttle_timer.go +++ b/common/throttle_timer.go @@ -17,8 +17,9 @@ type ThrottleTimer struct { output chan<- struct{} dur time.Duration - timer *time.Timer - isSet bool + timer *time.Timer + isSet bool + stopped bool } type throttleCommand int32 @@ -82,6 +83,7 @@ func (t *ThrottleTimer) processInput(cmd throttleCommand) (shutdown bool) { t.timer.Reset(t.dur) } case TQuit: + t.stopped = true shutdown = true fallthrough case Unset: @@ -119,7 +121,7 @@ func (t *ThrottleTimer) Unset() { // For ease of stopping services before starting them, we ignore Stop on nil // ThrottleTimers. func (t *ThrottleTimer) Stop() bool { - if t == nil { + if t == nil || t.stopped { return false } t.input <- TQuit diff --git a/common/throttle_timer_test.go b/common/throttle_timer_test.go index a1b6606f5..2a81bb02e 100644 --- a/common/throttle_timer_test.go +++ b/common/throttle_timer_test.go @@ -95,4 +95,9 @@ func TestThrottle(test *testing.T) { stopped := t.Stop() assert.True(stopped) + time.Sleep(longwait) + assert.Equal(5, c.Count()) + + // extra calls to stop don't block + t.Stop() } From cc7a87e27caa55ca84e984d1d081b09eeb16ffe6 Mon Sep 17 00:00:00 2001 From: Ethan Frey Date: Thu, 7 Dec 2017 11:22:54 +0100 Subject: [PATCH 3/5] Use Ticker in Repeat again to avoid drift --- common/repeat_timer.go | 34 ++++++++++++++-------------------- common/repeat_timer_test.go | 6 +++--- 2 files changed, 17 insertions(+), 23 deletions(-) diff --git a/common/repeat_timer.go b/common/repeat_timer.go index 734c2d32a..b3eb107d2 100644 --- a/common/repeat_timer.go +++ b/common/repeat_timer.go @@ -1,7 +1,6 @@ package common import ( - "fmt" "time" ) @@ -17,7 +16,7 @@ type RepeatTimer struct { input chan repeatCommand dur time.Duration - timer *time.Timer + ticker *time.Ticker stopped bool } @@ -36,8 +35,8 @@ func NewRepeatTimer(name string, dur time.Duration) *RepeatTimer { output: c, input: make(chan repeatCommand), - timer: time.NewTimer(dur), - dur: dur, + dur: dur, + ticker: time.NewTicker(dur), } go t.run() return t @@ -51,6 +50,7 @@ func (t *RepeatTimer) Reset() { // For ease of .Stop()'ing services before .Start()'ing them, // we ignore .Stop()'s on nil RepeatTimers. func (t *RepeatTimer) Stop() bool { + // use t.stopped to gracefully handle many Stop() without blocking if t == nil || t.stopped { return false } @@ -67,39 +67,33 @@ func (t *RepeatTimer) run() { // stop goroutine if the input says so // don't close channels, as closed channels mess up select reads done = t.processInput(cmd) - case <-t.timer.C: - // send if not blocked, then start the next tick + case <-t.ticker.C: t.trySend() - t.timer.Reset(t.dur) } } - fmt.Println("end run") } // trySend performs non-blocking send on t.Ch func (t *RepeatTimer) trySend() { - // TODO: this was blocking in previous version (t.Ch <- t_) + // NOTE: this was blocking in previous version (t.Ch <- t_) // should I use that behavior unstead of unblocking as per throttle? - - // select { - // case t.output <- time.Now(): - // default: - // } - - t.output <- time.Now() + // probably not: https://golang.org/src/time/sleep.go#L132 + select { + case t.output <- time.Now(): + default: + } } // all modifications of the internal state of ThrottleTimer // happen in this method. It is only called from the run goroutine // so we avoid any race conditions func (t *RepeatTimer) processInput(cmd repeatCommand) (shutdown bool) { - fmt.Printf("process: %d\n", cmd) switch cmd { case Reset: - t.timer.Reset(t.dur) + t.ticker.Stop() + t.ticker = time.NewTicker(t.dur) case RQuit: - fmt.Println("got quit") - t.timer.Stop() + t.ticker.Stop() shutdown = true default: panic("unknown command!") diff --git a/common/repeat_timer_test.go b/common/repeat_timer_test.go index 15ca32c31..db53aa614 100644 --- a/common/repeat_timer_test.go +++ b/common/repeat_timer_test.go @@ -39,11 +39,11 @@ func (c *rCounter) Read() { func TestRepeat(test *testing.T) { assert := asrt.New(test) - dur := time.Duration(50) * time.Millisecond + dur := time.Duration(100) * time.Millisecond short := time.Duration(20) * time.Millisecond // delay waits for cnt durations, an a little extra delay := func(cnt int) time.Duration { - return time.Duration(cnt)*dur + time.Duration(5)*time.Millisecond + return time.Duration(cnt)*dur + time.Duration(10)*time.Millisecond } t := NewRepeatTimer("bar", dur) @@ -70,7 +70,7 @@ func TestRepeat(test *testing.T) { // after a stop, nothing more is sent stopped := t.Stop() assert.True(stopped) - time.Sleep(delay(7)) + time.Sleep(delay(2)) assert.Equal(6, c.Count()) // extra calls to stop don't block From ec4adf21e0451f3fb7da33932d6cac168ddeaa93 Mon Sep 17 00:00:00 2001 From: Ethan Frey Date: Fri, 8 Dec 2017 10:07:04 +0100 Subject: [PATCH 4/5] Cleanup from PR comments --- common/repeat_timer.go | 20 ++++++++++---------- common/throttle_timer.go | 4 ++-- common/throttle_timer_test.go | 3 --- 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/common/repeat_timer.go b/common/repeat_timer.go index b3eb107d2..77f736034 100644 --- a/common/repeat_timer.go +++ b/common/repeat_timer.go @@ -20,7 +20,7 @@ type RepeatTimer struct { stopped bool } -type repeatCommand int32 +type repeatCommand int8 const ( Reset repeatCommand = iota @@ -67,21 +67,21 @@ func (t *RepeatTimer) run() { // stop goroutine if the input says so // don't close channels, as closed channels mess up select reads done = t.processInput(cmd) - case <-t.ticker.C: - t.trySend() + case tick := <-t.ticker.C: + t.trySend(tick) } } } // trySend performs non-blocking send on t.Ch -func (t *RepeatTimer) trySend() { +func (t *RepeatTimer) trySend(tick time.Time) { // NOTE: this was blocking in previous version (t.Ch <- t_) - // should I use that behavior unstead of unblocking as per throttle? - // probably not: https://golang.org/src/time/sleep.go#L132 - select { - case t.output <- time.Now(): - default: - } + // probably better not: https://golang.org/src/time/sleep.go#L132 + t.output <- tick + // select { + // case t.output <- tick: + // default: + // } } // all modifications of the internal state of ThrottleTimer diff --git a/common/throttle_timer.go b/common/throttle_timer.go index 0e54f1027..a5bd6ded8 100644 --- a/common/throttle_timer.go +++ b/common/throttle_timer.go @@ -22,7 +22,7 @@ type ThrottleTimer struct { stopped bool } -type throttleCommand int32 +type throttleCommand int8 const ( Set throttleCommand = iota @@ -83,7 +83,6 @@ func (t *ThrottleTimer) processInput(cmd throttleCommand) (shutdown bool) { t.timer.Reset(t.dur) } case TQuit: - t.stopped = true shutdown = true fallthrough case Unset: @@ -125,5 +124,6 @@ func (t *ThrottleTimer) Stop() bool { return false } t.input <- TQuit + t.stopped = true return true } diff --git a/common/throttle_timer_test.go b/common/throttle_timer_test.go index 2a81bb02e..94ec1b43c 100644 --- a/common/throttle_timer_test.go +++ b/common/throttle_timer_test.go @@ -95,9 +95,6 @@ func TestThrottle(test *testing.T) { stopped := t.Stop() assert.True(stopped) - time.Sleep(longwait) - assert.Equal(5, c.Count()) - // extra calls to stop don't block t.Stop() } From ff2fd63bf7db6373e5fb0c1d311c6a139b99dfe0 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 8 Dec 2017 11:17:07 -0600 Subject: [PATCH 5/5] rename trySend to send --- common/repeat_timer.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/common/repeat_timer.go b/common/repeat_timer.go index 77f736034..23faf74ae 100644 --- a/common/repeat_timer.go +++ b/common/repeat_timer.go @@ -68,20 +68,20 @@ func (t *RepeatTimer) run() { // don't close channels, as closed channels mess up select reads done = t.processInput(cmd) case tick := <-t.ticker.C: - t.trySend(tick) + t.send(tick) } } } -// trySend performs non-blocking send on t.Ch -func (t *RepeatTimer) trySend(tick time.Time) { - // NOTE: this was blocking in previous version (t.Ch <- t_) - // probably better not: https://golang.org/src/time/sleep.go#L132 - t.output <- tick +// send performs blocking send on t.Ch +func (t *RepeatTimer) send(tick time.Time) { + // XXX: possibly it is better to not block: + // https://golang.org/src/time/sleep.go#L132 // select { // case t.output <- tick: // default: // } + t.output <- tick } // all modifications of the internal state of ThrottleTimer