diff --git a/common/repeat_timer.go b/common/repeat_timer.go index d7d9154d4..23faf74ae 100644 --- a/common/repeat_timer.go +++ b/common/repeat_timer.go @@ -1,7 +1,6 @@ package common import ( - "sync" "time" ) @@ -11,76 +10,93 @@ It's good for keeping connections alive. A RepeatTimer must be Stop()'d or it will keep a goroutine alive. */ type RepeatTimer struct { - Ch chan time.Time + Name string + Ch <-chan time.Time + output chan<- time.Time + input chan repeatCommand - mtx sync.Mutex - name string - ticker *time.Ticker - quit chan struct{} - wg *sync.WaitGroup - dur time.Duration + dur time.Duration + ticker *time.Ticker + stopped bool } +type repeatCommand int8 + +const ( + Reset repeatCommand = iota + RQuit +) + func NewRepeatTimer(name string, dur time.Duration) *RepeatTimer { + c := make(chan time.Time) var t = &RepeatTimer{ - Ch: make(chan time.Time), - ticker: time.NewTicker(dur), - quit: make(chan struct{}), - wg: new(sync.WaitGroup), - name: name, + Name: name, + Ch: c, + output: c, + input: make(chan repeatCommand), + dur: dur, + ticker: time.NewTicker(dur), } - t.wg.Add(1) - go t.fireRoutine(t.ticker) + go t.run() return t } -func (t *RepeatTimer) fireRoutine(ticker *time.Ticker) { - for { - select { - case t_ := <-ticker.C: - t.Ch <- t_ - case <-t.quit: - // needed so we know when we can reset t.quit - t.wg.Done() - return - } - } -} - // Wait the duration again before firing. func (t *RepeatTimer) Reset() { - t.Stop() - - t.mtx.Lock() // Lock - defer t.mtx.Unlock() - - t.ticker = time.NewTicker(t.dur) - t.quit = make(chan struct{}) - t.wg.Add(1) - go t.fireRoutine(t.ticker) + t.input <- Reset } // For ease of .Stop()'ing services before .Start()'ing them, // we ignore .Stop()'s on nil RepeatTimers. func (t *RepeatTimer) Stop() bool { - if t == nil { + // use t.stopped to gracefully handle many Stop() without blocking + if t == nil || t.stopped { return false } - t.mtx.Lock() // Lock - defer t.mtx.Unlock() + t.input <- RQuit + t.stopped = true + return true +} - exists := t.ticker != nil - if exists { - t.ticker.Stop() // does not close the channel +func (t *RepeatTimer) run() { + done := false + for !done { select { - case <-t.Ch: - // read off channel if there's anything there - default: + case cmd := <-t.input: + // stop goroutine if the input says so + // don't close channels, as closed channels mess up select reads + done = t.processInput(cmd) + case tick := <-t.ticker.C: + t.send(tick) } - close(t.quit) - t.wg.Wait() // must wait for quit to close else we race Reset - t.ticker = nil } - return exists +} + +// send performs blocking send on t.Ch +func (t *RepeatTimer) send(tick time.Time) { + // XXX: possibly it is better to not block: + // https://golang.org/src/time/sleep.go#L132 + // select { + // case t.output <- tick: + // default: + // } + t.output <- tick +} + +// all modifications of the internal state of ThrottleTimer +// happen in this method. It is only called from the run goroutine +// so we avoid any race conditions +func (t *RepeatTimer) processInput(cmd repeatCommand) (shutdown bool) { + switch cmd { + case Reset: + t.ticker.Stop() + t.ticker = time.NewTicker(t.dur) + case RQuit: + t.ticker.Stop() + shutdown = true + default: + panic("unknown command!") + } + return shutdown } diff --git a/common/repeat_timer_test.go b/common/repeat_timer_test.go index 87f34b950..db53aa614 100644 --- a/common/repeat_timer_test.go +++ b/common/repeat_timer_test.go @@ -10,7 +10,7 @@ import ( ) type rCounter struct { - input chan time.Time + input <-chan time.Time mtx sync.Mutex count int } @@ -39,11 +39,11 @@ func (c *rCounter) Read() { func TestRepeat(test *testing.T) { assert := asrt.New(test) - dur := time.Duration(50) * time.Millisecond + dur := time.Duration(100) * time.Millisecond short := time.Duration(20) * time.Millisecond // delay waits for cnt durations, an a little extra delay := func(cnt int) time.Duration { - return time.Duration(cnt)*dur + time.Duration(5)*time.Millisecond + return time.Duration(cnt)*dur + time.Duration(10)*time.Millisecond } t := NewRepeatTimer("bar", dur) @@ -70,9 +70,9 @@ func TestRepeat(test *testing.T) { // after a stop, nothing more is sent stopped := t.Stop() assert.True(stopped) - time.Sleep(delay(7)) + time.Sleep(delay(2)) assert.Equal(6, c.Count()) - // close channel to stop counter - close(t.Ch) + // extra calls to stop don't block + t.Stop() } diff --git a/common/throttle_timer.go b/common/throttle_timer.go index ab2ad2e62..a5bd6ded8 100644 --- a/common/throttle_timer.go +++ b/common/throttle_timer.go @@ -13,20 +13,21 @@ at most once every "dur". type ThrottleTimer struct { Name string Ch <-chan struct{} - input chan command + input chan throttleCommand output chan<- struct{} dur time.Duration - timer *time.Timer - isSet bool + timer *time.Timer + isSet bool + stopped bool } -type command int32 +type throttleCommand int8 const ( - Set command = iota + Set throttleCommand = iota Unset - Quit + TQuit ) // NewThrottleTimer creates a new ThrottleTimer. @@ -36,7 +37,7 @@ func NewThrottleTimer(name string, dur time.Duration) *ThrottleTimer { Name: name, Ch: c, dur: dur, - input: make(chan command), + input: make(chan throttleCommand), output: c, timer: time.NewTimer(dur), } @@ -74,14 +75,14 @@ func (t *ThrottleTimer) trySend() { // all modifications of the internal state of ThrottleTimer // happen in this method. It is only called from the run goroutine // so we avoid any race conditions -func (t *ThrottleTimer) processInput(cmd command) (shutdown bool) { +func (t *ThrottleTimer) processInput(cmd throttleCommand) (shutdown bool) { switch cmd { case Set: if !t.isSet { t.isSet = true t.timer.Reset(t.dur) } - case Quit: + case TQuit: shutdown = true fallthrough case Unset: @@ -119,9 +120,10 @@ func (t *ThrottleTimer) Unset() { // For ease of stopping services before starting them, we ignore Stop on nil // ThrottleTimers. func (t *ThrottleTimer) Stop() bool { - if t == nil { + if t == nil || t.stopped { return false } - t.input <- Quit + t.input <- TQuit + t.stopped = true return true } diff --git a/common/throttle_timer_test.go b/common/throttle_timer_test.go index a1b6606f5..94ec1b43c 100644 --- a/common/throttle_timer_test.go +++ b/common/throttle_timer_test.go @@ -95,4 +95,6 @@ func TestThrottle(test *testing.T) { stopped := t.Stop() assert.True(stopped) + // extra calls to stop don't block + t.Stop() }