diff --git a/common/repeat_timer.go b/common/repeat_timer.go index 0f6501131..734c2d32a 100644 --- a/common/repeat_timer.go +++ b/common/repeat_timer.go @@ -16,8 +16,9 @@ type RepeatTimer struct { output chan<- time.Time input chan repeatCommand - dur time.Duration - timer *time.Timer + dur time.Duration + timer *time.Timer + stopped bool } type repeatCommand int32 @@ -50,43 +51,42 @@ func (t *RepeatTimer) Reset() { // For ease of .Stop()'ing services before .Start()'ing them, // we ignore .Stop()'s on nil RepeatTimers. func (t *RepeatTimer) Stop() bool { - if t == nil { + if t == nil || t.stopped { return false } t.input <- RQuit + t.stopped = true return true } func (t *RepeatTimer) run() { - for { - fmt.Println("for") + done := false + for !done { select { case cmd := <-t.input: // stop goroutine if the input says so // don't close channels, as closed channels mess up select reads - if t.processInput(cmd) { - t.timer.Stop() - return - } + done = t.processInput(cmd) case <-t.timer.C: - fmt.Println("tick") // send if not blocked, then start the next tick - // for blocking send, just - // t.output <- time.Now() t.trySend() t.timer.Reset(t.dur) } } + fmt.Println("end run") } // trySend performs non-blocking send on t.Ch func (t *RepeatTimer) trySend() { // TODO: this was blocking in previous version (t.Ch <- t_) // should I use that behavior unstead of unblocking as per throttle? - select { - case t.output <- time.Now(): - default: - } + + // select { + // case t.output <- time.Now(): + // default: + // } + + t.output <- time.Now() } // all modifications of the internal state of ThrottleTimer @@ -98,6 +98,7 @@ func (t *RepeatTimer) processInput(cmd repeatCommand) (shutdown bool) { case Reset: t.timer.Reset(t.dur) case RQuit: + fmt.Println("got quit") t.timer.Stop() shutdown = true default: diff --git a/common/repeat_timer_test.go b/common/repeat_timer_test.go index d66cd3152..15ca32c31 100644 --- a/common/repeat_timer_test.go +++ b/common/repeat_timer_test.go @@ -73,6 +73,6 @@ func TestRepeat(test *testing.T) { time.Sleep(delay(7)) assert.Equal(6, c.Count()) - // close channel to stop counter + // extra calls to stop don't block t.Stop() } diff --git a/common/throttle_timer.go b/common/throttle_timer.go index c148d9904..0e54f1027 100644 --- a/common/throttle_timer.go +++ b/common/throttle_timer.go @@ -17,8 +17,9 @@ type ThrottleTimer struct { output chan<- struct{} dur time.Duration - timer *time.Timer - isSet bool + timer *time.Timer + isSet bool + stopped bool } type throttleCommand int32 @@ -82,6 +83,7 @@ func (t *ThrottleTimer) processInput(cmd throttleCommand) (shutdown bool) { t.timer.Reset(t.dur) } case TQuit: + t.stopped = true shutdown = true fallthrough case Unset: @@ -119,7 +121,7 @@ func (t *ThrottleTimer) Unset() { // For ease of stopping services before starting them, we ignore Stop on nil // ThrottleTimers. func (t *ThrottleTimer) Stop() bool { - if t == nil { + if t == nil || t.stopped { return false } t.input <- TQuit diff --git a/common/throttle_timer_test.go b/common/throttle_timer_test.go index a1b6606f5..2a81bb02e 100644 --- a/common/throttle_timer_test.go +++ b/common/throttle_timer_test.go @@ -95,4 +95,9 @@ func TestThrottle(test *testing.T) { stopped := t.Stop() assert.True(stopped) + time.Sleep(longwait) + assert.Equal(5, c.Count()) + + // extra calls to stop don't block + t.Stop() }