From 3d9113c16e08fe53f31a2403a5280202c8c9cc14 Mon Sep 17 00:00:00 2001 From: Ethan Frey Date: Wed, 6 Dec 2017 09:18:04 +0100 Subject: [PATCH 1/8] Add a bit more padding to tests so they pass on osx with -race --- common/repeat_timer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/repeat_timer_test.go b/common/repeat_timer_test.go index 9f03f41df..87f34b950 100644 --- a/common/repeat_timer_test.go +++ b/common/repeat_timer_test.go @@ -43,7 +43,7 @@ func TestRepeat(test *testing.T) { short := time.Duration(20) * time.Millisecond // delay waits for cnt durations, an a little extra delay := func(cnt int) time.Duration { - return time.Duration(cnt)*dur + time.Millisecond + return time.Duration(cnt)*dur + time.Duration(5)*time.Millisecond } t := NewRepeatTimer("bar", dur) From dcb43956048f0d38495f39e43fd4438ec6d47de7 Mon Sep 17 00:00:00 2001 From: Ethan Frey Date: Wed, 6 Dec 2017 11:17:50 +0100 Subject: [PATCH 2/8] Refactor throttle timer --- common/throttle_timer.go | 102 ++++++++++++++++++++++------------ common/throttle_timer_test.go | 19 ++++++- 2 files changed, 85 insertions(+), 36 deletions(-) diff --git a/common/throttle_timer.go b/common/throttle_timer.go index 38ef4e9a3..e260e01bd 100644 --- a/common/throttle_timer.go +++ b/common/throttle_timer.go @@ -1,7 +1,7 @@ package common import ( - "sync" + "fmt" "time" ) @@ -12,54 +12,88 @@ If a long continuous burst of .Set() calls happens, ThrottleTimer fires at most once every "dur". */ type ThrottleTimer struct { - Name string - Ch chan struct{} - quit chan struct{} - dur time.Duration + Name string + Ch chan struct{} + input chan command + dur time.Duration - mtx sync.Mutex timer *time.Timer isSet bool } +type command int32 + +const ( + Set command = iota + Unset + Quit +) + func NewThrottleTimer(name string, dur time.Duration) *ThrottleTimer { - var ch = make(chan struct{}) - var quit = make(chan struct{}) - var t = &ThrottleTimer{Name: name, Ch: ch, dur: dur, quit: quit} - t.mtx.Lock() - t.timer = time.AfterFunc(dur, t.fireRoutine) - t.mtx.Unlock() + var t = &ThrottleTimer{ + Name: name, + Ch: make(chan struct{}, 1), + dur: dur, + input: make(chan command), + timer: time.NewTimer(dur), + } t.timer.Stop() + go t.run() return t } -func (t *ThrottleTimer) fireRoutine() { - t.mtx.Lock() - defer t.mtx.Unlock() - select { - case t.Ch <- struct{}{}: - t.isSet = false - case <-t.quit: - // do nothing +func (t *ThrottleTimer) run() { + for { + select { + case cmd := <-t.input: + // stop goroutine if the input says so + if t.processInput(cmd) { + // TODO: do we want to close the channels??? + // close(t.Ch) + // close(t.input) + return + } + case <-t.timer.C: + t.isSet = false + t.Ch <- struct{}{} + } + } +} + +// all modifications of the internal state of ThrottleTimer +// happen in this method. It is only called from the run goroutine +// so we avoid any race conditions +func (t *ThrottleTimer) processInput(cmd command) (shutdown bool) { + fmt.Printf("processInput: %d\n", cmd) + switch cmd { + case Set: + if !t.isSet { + t.isSet = true + t.timer.Reset(t.dur) + } + case Quit: + shutdown = true + fallthrough + case Unset: + if t.isSet { + t.isSet = false + if !t.timer.Stop() { + <-t.timer.C + } + } default: - t.timer.Reset(t.dur) + panic("unknown command!") } + // return true + return shutdown } func (t *ThrottleTimer) Set() { - t.mtx.Lock() - defer t.mtx.Unlock() - if !t.isSet { - t.isSet = true - t.timer.Reset(t.dur) - } + t.input <- Set } func (t *ThrottleTimer) Unset() { - t.mtx.Lock() - defer t.mtx.Unlock() - t.isSet = false - t.timer.Stop() + t.input <- Unset } // For ease of .Stop()'ing services before .Start()'ing them, @@ -68,8 +102,6 @@ func (t *ThrottleTimer) Stop() bool { if t == nil { return false } - close(t.quit) - t.mtx.Lock() - defer t.mtx.Unlock() - return t.timer.Stop() + t.input <- Quit + return true } diff --git a/common/throttle_timer_test.go b/common/throttle_timer_test.go index 00f5abdec..014f9dcdc 100644 --- a/common/throttle_timer_test.go +++ b/common/throttle_timer_test.go @@ -41,6 +41,7 @@ func TestThrottle(test *testing.T) { ms := 50 delay := time.Duration(ms) * time.Millisecond + shortwait := time.Duration(ms/2) * time.Millisecond longwait := time.Duration(2) * delay t := NewThrottleTimer("foo", delay) @@ -65,6 +66,21 @@ func TestThrottle(test *testing.T) { time.Sleep(longwait) assert.Equal(2, c.Count()) + // keep cancelling before it is ready + for i := 0; i < 10; i++ { + t.Set() + time.Sleep(shortwait) + t.Unset() + } + time.Sleep(longwait) + assert.Equal(2, c.Count()) + + // a few unsets do nothing... + for i := 0; i < 5; i++ { + t.Unset() + } + assert.Equal(2, c.Count()) + // send 12, over 2 delay sections, adds 3 short := time.Duration(ms/5) * time.Millisecond for i := 0; i < 13; i++ { @@ -74,5 +90,6 @@ func TestThrottle(test *testing.T) { time.Sleep(longwait) assert.Equal(5, c.Count()) - close(t.Ch) + stopped := t.Stop() + assert.True(stopped) } From 4ec7883891fa9700ce4b122252b8fc697df0bfca Mon Sep 17 00:00:00 2001 From: Ethan Frey Date: Wed, 6 Dec 2017 11:21:01 +0100 Subject: [PATCH 3/8] Cleanup --- common/throttle_timer.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/common/throttle_timer.go b/common/throttle_timer.go index e260e01bd..705a12a12 100644 --- a/common/throttle_timer.go +++ b/common/throttle_timer.go @@ -1,7 +1,6 @@ package common import ( - "fmt" "time" ) @@ -64,7 +63,6 @@ func (t *ThrottleTimer) run() { // happen in this method. It is only called from the run goroutine // so we avoid any race conditions func (t *ThrottleTimer) processInput(cmd command) (shutdown bool) { - fmt.Printf("processInput: %d\n", cmd) switch cmd { case Set: if !t.isSet { @@ -77,9 +75,7 @@ func (t *ThrottleTimer) processInput(cmd command) (shutdown bool) { case Unset: if t.isSet { t.isSet = false - if !t.timer.Stop() { - <-t.timer.C - } + t.timer.Stop() } default: panic("unknown command!") From 0a8721113a67b3c05f58e12328a0fe0216811b0c Mon Sep 17 00:00:00 2001 From: Ethan Frey Date: Wed, 6 Dec 2017 21:08:55 +0100 Subject: [PATCH 4/8] First pass of PR updates --- common/throttle_timer.go | 28 ++++++++++++++-------------- common/throttle_timer_test.go | 2 +- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/common/throttle_timer.go b/common/throttle_timer.go index 705a12a12..f2ce60b2a 100644 --- a/common/throttle_timer.go +++ b/common/throttle_timer.go @@ -11,10 +11,11 @@ If a long continuous burst of .Set() calls happens, ThrottleTimer fires at most once every "dur". */ type ThrottleTimer struct { - Name string - Ch chan struct{} - input chan command - dur time.Duration + Name string + Ch <-chan struct{} + output chan<- struct{} + input chan command + dur time.Duration timer *time.Timer isSet bool @@ -29,12 +30,14 @@ const ( ) func NewThrottleTimer(name string, dur time.Duration) *ThrottleTimer { + c := make(chan struct{}, 1) var t = &ThrottleTimer{ - Name: name, - Ch: make(chan struct{}, 1), - dur: dur, - input: make(chan command), - timer: time.NewTimer(dur), + Name: name, + Ch: c, + dur: dur, + output: c, + input: make(chan command), + timer: time.NewTimer(dur), } t.timer.Stop() go t.run() @@ -47,14 +50,12 @@ func (t *ThrottleTimer) run() { case cmd := <-t.input: // stop goroutine if the input says so if t.processInput(cmd) { - // TODO: do we want to close the channels??? - // close(t.Ch) - // close(t.input) + close(t.output) return } case <-t.timer.C: t.isSet = false - t.Ch <- struct{}{} + t.output <- struct{}{} } } } @@ -80,7 +81,6 @@ func (t *ThrottleTimer) processInput(cmd command) (shutdown bool) { default: panic("unknown command!") } - // return true return shutdown } diff --git a/common/throttle_timer_test.go b/common/throttle_timer_test.go index 014f9dcdc..81b817038 100644 --- a/common/throttle_timer_test.go +++ b/common/throttle_timer_test.go @@ -10,7 +10,7 @@ import ( ) type thCounter struct { - input chan struct{} + input <-chan struct{} mtx sync.Mutex count int } From 1ac4c5dd6d007a708337e1ad2636e456e1e4b8db Mon Sep 17 00:00:00 2001 From: Ethan Frey Date: Wed, 6 Dec 2017 21:20:30 +0100 Subject: [PATCH 5/8] Made throttle output non-blocking --- common/throttle_timer.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/common/throttle_timer.go b/common/throttle_timer.go index f2ce60b2a..069b6d84b 100644 --- a/common/throttle_timer.go +++ b/common/throttle_timer.go @@ -30,7 +30,7 @@ const ( ) func NewThrottleTimer(name string, dur time.Duration) *ThrottleTimer { - c := make(chan struct{}, 1) + c := make(chan struct{}) var t = &ThrottleTimer{ Name: name, Ch: c, @@ -54,12 +54,22 @@ func (t *ThrottleTimer) run() { return } case <-t.timer.C: - t.isSet = false - t.output <- struct{}{} + t.trySend() } } } +// trySend performs non-blocking send on t.output (t.Ch) +func (t *ThrottleTimer) trySend() { + select { + case t.output <- struct{}{}: + t.isSet = false + default: + // if we just want to drop, replace this with t.isSet = false + t.timer.Reset(t.dur) + } +} + // all modifications of the internal state of ThrottleTimer // happen in this method. It is only called from the run goroutine // so we avoid any race conditions From e430d3f8447d23b739840d5137ae75c37ff33a1d Mon Sep 17 00:00:00 2001 From: Ethan Frey Date: Wed, 6 Dec 2017 21:51:23 +0100 Subject: [PATCH 6/8] One more attempt with a read-only channel --- common/throttle_timer.go | 33 ++++++++++++++++++--------------- common/throttle_timer_test.go | 2 +- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/common/throttle_timer.go b/common/throttle_timer.go index 069b6d84b..4a4b30033 100644 --- a/common/throttle_timer.go +++ b/common/throttle_timer.go @@ -11,11 +11,10 @@ If a long continuous burst of .Set() calls happens, ThrottleTimer fires at most once every "dur". */ type ThrottleTimer struct { - Name string - Ch <-chan struct{} - output chan<- struct{} - input chan command - dur time.Duration + Name string + Ch chan struct{} + input chan command + dur time.Duration timer *time.Timer isSet bool @@ -30,27 +29,31 @@ const ( ) func NewThrottleTimer(name string, dur time.Duration) *ThrottleTimer { - c := make(chan struct{}) var t = &ThrottleTimer{ - Name: name, - Ch: c, - dur: dur, - output: c, - input: make(chan command), - timer: time.NewTimer(dur), + Name: name, + Ch: make(chan struct{}), + dur: dur, + input: make(chan command), + timer: time.NewTimer(dur), } t.timer.Stop() go t.run() return t } +// C is the proper way to listen to the timer output. +// t.Ch will be made private in the (near?) future +func (t *ThrottleTimer) C() <-chan struct{} { + return t.Ch +} + func (t *ThrottleTimer) run() { for { select { case cmd := <-t.input: // stop goroutine if the input says so if t.processInput(cmd) { - close(t.output) + close(t.Ch) return } case <-t.timer.C: @@ -59,10 +62,10 @@ func (t *ThrottleTimer) run() { } } -// trySend performs non-blocking send on t.output (t.Ch) +// trySend performs non-blocking send on t.Ch func (t *ThrottleTimer) trySend() { select { - case t.output <- struct{}{}: + case t.Ch <- struct{}{}: t.isSet = false default: // if we just want to drop, replace this with t.isSet = false diff --git a/common/throttle_timer_test.go b/common/throttle_timer_test.go index 81b817038..f6b5d1df5 100644 --- a/common/throttle_timer_test.go +++ b/common/throttle_timer_test.go @@ -46,7 +46,7 @@ func TestThrottle(test *testing.T) { t := NewThrottleTimer("foo", delay) // start at 0 - c := &thCounter{input: t.Ch} + c := &thCounter{input: t.C()} assert.Equal(0, c.Count()) go c.Read() From 8b518fadb2f3eb928ce5d5a014b4087c5b31309a Mon Sep 17 00:00:00 2001 From: Ethan Frey Date: Wed, 6 Dec 2017 22:28:18 +0100 Subject: [PATCH 7/8] Don't close throttle channel, explain why --- common/throttle_timer.go | 2 +- common/throttle_timer_test.go | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/common/throttle_timer.go b/common/throttle_timer.go index 4a4b30033..051d44376 100644 --- a/common/throttle_timer.go +++ b/common/throttle_timer.go @@ -52,8 +52,8 @@ func (t *ThrottleTimer) run() { select { case cmd := <-t.input: // stop goroutine if the input says so + // don't close channels, as closed channels mess up select reads if t.processInput(cmd) { - close(t.Ch) return } case <-t.timer.C: diff --git a/common/throttle_timer_test.go b/common/throttle_timer_test.go index f6b5d1df5..7d96ac7c5 100644 --- a/common/throttle_timer_test.go +++ b/common/throttle_timer_test.go @@ -31,6 +31,9 @@ func (c *thCounter) Count() int { // Read should run in a go-routine and // updates count by one every time a packet comes in func (c *thCounter) Read() { + // note, since this channel never closes, this will never end + // if thCounter was used in anything beyond trivial test cases. + // it would have to be smarter. for range c.input { c.Increment() } From 3779310c72c93173b9e87561281e697e7cdf9437 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 6 Dec 2017 18:48:39 -0600 Subject: [PATCH 8/8] return back output internal channel (way go does with Timer) --- common/throttle_timer.go | 47 +++++++++++++++++++++-------------- common/throttle_timer_test.go | 2 +- 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/common/throttle_timer.go b/common/throttle_timer.go index 051d44376..ab2ad2e62 100644 --- a/common/throttle_timer.go +++ b/common/throttle_timer.go @@ -11,10 +11,11 @@ If a long continuous burst of .Set() calls happens, ThrottleTimer fires at most once every "dur". */ type ThrottleTimer struct { - Name string - Ch chan struct{} - input chan command - dur time.Duration + Name string + Ch <-chan struct{} + input chan command + output chan<- struct{} + dur time.Duration timer *time.Timer isSet bool @@ -28,25 +29,22 @@ const ( Quit ) +// NewThrottleTimer creates a new ThrottleTimer. func NewThrottleTimer(name string, dur time.Duration) *ThrottleTimer { + c := make(chan struct{}) var t = &ThrottleTimer{ - Name: name, - Ch: make(chan struct{}), - dur: dur, - input: make(chan command), - timer: time.NewTimer(dur), + Name: name, + Ch: c, + dur: dur, + input: make(chan command), + output: c, + timer: time.NewTimer(dur), } t.timer.Stop() go t.run() return t } -// C is the proper way to listen to the timer output. -// t.Ch will be made private in the (near?) future -func (t *ThrottleTimer) C() <-chan struct{} { - return t.Ch -} - func (t *ThrottleTimer) run() { for { select { @@ -65,7 +63,7 @@ func (t *ThrottleTimer) run() { // trySend performs non-blocking send on t.Ch func (t *ThrottleTimer) trySend() { select { - case t.Ch <- struct{}{}: + case t.output <- struct{}{}: t.isSet = false default: // if we just want to drop, replace this with t.isSet = false @@ -105,8 +103,21 @@ func (t *ThrottleTimer) Unset() { t.input <- Unset } -// For ease of .Stop()'ing services before .Start()'ing them, -// we ignore .Stop()'s on nil ThrottleTimers +// Stop prevents the ThrottleTimer from firing. It always returns true. Stop does not +// close the channel, to prevent a read from the channel succeeding +// incorrectly. +// +// To prevent a timer created with NewThrottleTimer from firing after a call to +// Stop, check the return value and drain the channel. +// +// For example, assuming the program has not received from t.C already: +// +// if !t.Stop() { +// <-t.C +// } +// +// For ease of stopping services before starting them, we ignore Stop on nil +// ThrottleTimers. func (t *ThrottleTimer) Stop() bool { if t == nil { return false diff --git a/common/throttle_timer_test.go b/common/throttle_timer_test.go index 7d96ac7c5..a1b6606f5 100644 --- a/common/throttle_timer_test.go +++ b/common/throttle_timer_test.go @@ -49,7 +49,7 @@ func TestThrottle(test *testing.T) { t := NewThrottleTimer("foo", delay) // start at 0 - c := &thCounter{input: t.C()} + c := &thCounter{input: t.Ch} assert.Equal(0, c.Count()) go c.Read()