From 6b5d08f7daf180036d338d7d7d729861bb58eae5 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sat, 23 Dec 2017 04:18:50 -0800 Subject: [PATCH] RepeatTimer fix --- common/repeat_timer.go | 228 ++++++++++++++++++++++++------------ common/repeat_timer_test.go | 86 ++++++++------ 2 files changed, 203 insertions(+), 111 deletions(-) diff --git a/common/repeat_timer.go b/common/repeat_timer.go index 0bc4d87b4..2947a9166 100644 --- a/common/repeat_timer.go +++ b/common/repeat_timer.go @@ -5,152 +5,224 @@ import ( "time" ) +// Used by RepeatTimer the first time, +// and every time it's Reset() after Stop(). +type TickerMaker func(dur time.Duration) Ticker + // Ticker is a basic ticker interface. type Ticker interface { + + // Never changes, never closes. Chan() <-chan time.Time + + // Stopping a stopped Ticker will panic. Stop() - Reset() } -// DefaultTicker wraps the stdlibs Ticker implementation. -type DefaultTicker struct { - t *time.Ticker - dur time.Duration -} +//---------------------------------------- +// defaultTickerMaker -// NewDefaultTicker returns a new DefaultTicker -func NewDefaultTicker(dur time.Duration) *DefaultTicker { - return &DefaultTicker{ - time.NewTicker(dur), - dur, - } +func defaultTickerMaker(dur time.Duration) Ticker { + ticker := time.NewTicker(dur) + return (*defaultTicker)(ticker) } +type defaultTicker time.Ticker + // Implements Ticker -func (t *DefaultTicker) Chan() <-chan time.Time { - return t.t.C +func (t *defaultTicker) Chan() <-chan time.Time { + return t.C } // Implements Ticker -func (t *DefaultTicker) Stop() { - t.t.Stop() - t.t = nil +func (t *defaultTicker) Stop() { + t.Stop() } -// Implements Ticker -func (t *DefaultTicker) Reset() { - t.t = time.NewTicker(t.dur) +//---------------------------------------- +// LogicalTickerMaker + +// Construct a TickerMaker that always uses `ch`. +// It's useful for simulating a deterministic clock. +func NewLogicalTickerMaker(ch chan time.Time) TickerMaker { + return func(dur time.Duration) Ticker { + return newLogicalTicker(ch, dur) + } } -// ManualTicker wraps a channel that can be manually sent on -type ManualTicker struct { - ch chan time.Time +type logicalTicker struct { + source <-chan time.Time + ch chan time.Time + quit chan struct{} } -// NewManualTicker returns a new ManualTicker -func NewManualTicker(ch chan time.Time) *ManualTicker { - return &ManualTicker{ - ch: ch, +func newLogicalTicker(source <-chan time.Time, interval time.Duration) Ticker { + lt := &logicalTicker{ + source: source, + ch: make(chan time.Time), + quit: make(chan struct{}), } + go lt.fireRoutine(interval) + return lt } -// Implements Ticker -func (t *ManualTicker) Chan() <-chan time.Time { - return t.ch +// We clearly need a new goroutine, for logicalTicker may have been created +// from a goroutine separate from the source. +func (t *logicalTicker) fireRoutine(interval time.Duration) { + source := t.source + + // Init `lasttime` + lasttime := time.Time{} + select { + case lasttime = <-source: + case <-t.quit: + return + } + // Init `lasttime` end + + timeleft := interval + for { + select { + case newtime := <-source: + elapsed := newtime.Sub(lasttime) + timeleft -= elapsed + if timeleft <= 0 { + // Block for determinism until the ticker is stopped. + select { + case t.ch <- newtime: + case <-t.quit: + return + } + // Reset timeleft. + // Don't try to "catch up" by sending more. + // "Ticker adjusts the intervals or drops ticks to make up for + // slow receivers" - https://golang.org/pkg/time/#Ticker + timeleft = interval + } + case <-t.quit: + return // done + } + } } // Implements Ticker -func (t *ManualTicker) Stop() { - // noop +func (t *logicalTicker) Chan() <-chan time.Time { + return t.ch // immutable } // Implements Ticker -func (t *ManualTicker) Reset() { - // noop +func (t *logicalTicker) Stop() { + close(t.quit) // it *should* panic when stopped twice. } //--------------------------------------------------------------------- /* -RepeatTimer repeatedly sends a struct{}{} to .Ch after each "dur" period. -It's good for keeping connections alive. -A RepeatTimer must be Stop()'d or it will keep a goroutine alive. + RepeatTimer repeatedly sends a struct{}{} to `.Chan()` after each `dur` + period. (It's good for keeping connections alive.) + A RepeatTimer must be stopped, or it will keep a goroutine alive. */ type RepeatTimer struct { - Ch chan time.Time + name string + ch chan time.Time + tm TickerMaker mtx sync.Mutex - name string + dur time.Duration ticker Ticker quit chan struct{} - wg *sync.WaitGroup } -// NewRepeatTimer returns a RepeatTimer with the DefaultTicker. +// NewRepeatTimer returns a RepeatTimer with a defaultTicker. func NewRepeatTimer(name string, dur time.Duration) *RepeatTimer { - ticker := NewDefaultTicker(dur) - return NewRepeatTimerWithTicker(name, ticker) + return NewRepeatTimerWithTickerMaker(name, dur, defaultTickerMaker) } -// NewRepeatTimerWithTicker returns a RepeatTimer with the given ticker. -func NewRepeatTimerWithTicker(name string, ticker Ticker) *RepeatTimer { +// NewRepeatTimerWithTicker returns a RepeatTimer with the given ticker +// maker. +func NewRepeatTimerWithTickerMaker(name string, dur time.Duration, tm TickerMaker) *RepeatTimer { var t = &RepeatTimer{ - Ch: make(chan time.Time), - ticker: ticker, - quit: make(chan struct{}), - wg: new(sync.WaitGroup), name: name, + ch: make(chan time.Time), + tm: tm, + dur: dur, + ticker: nil, + quit: nil, } - t.wg.Add(1) - go t.fireRoutine(t.ticker.Chan()) + t.reset() return t } -func (t *RepeatTimer) fireRoutine(ch <-chan time.Time) { +func (t *RepeatTimer) fireRoutine(ch <-chan time.Time, quit <-chan struct{}) { for { select { case t_ := <-ch: - t.Ch <- t_ - case <-t.quit: - // needed so we know when we can reset t.quit - t.wg.Done() + t.ch <- t_ + case <-quit: // NOTE: `t.quit` races. return } } } +func (t *RepeatTimer) Chan() <-chan time.Time { + return t.ch +} + +func (t *RepeatTimer) Stop() { + t.mtx.Lock() + defer t.mtx.Unlock() + + t.stop() +} + // Wait the duration again before firing. func (t *RepeatTimer) Reset() { - t.Stop() - - t.mtx.Lock() // Lock + t.mtx.Lock() defer t.mtx.Unlock() - t.ticker.Reset() - t.quit = make(chan struct{}) - t.wg.Add(1) - go t.fireRoutine(t.ticker.Chan()) + t.reset() } -// For ease of .Stop()'ing services before .Start()'ing them, -// we ignore .Stop()'s on nil RepeatTimers. -func (t *RepeatTimer) Stop() bool { - if t == nil { - return false +//---------------------------------------- +// Misc. + +// CONTRACT: (non-constructor) caller should hold t.mtx. +func (t *RepeatTimer) reset() { + if t.ticker != nil { + t.stop() } - t.mtx.Lock() // Lock - defer t.mtx.Unlock() + t.ticker = t.tm(t.dur) + t.quit = make(chan struct{}) + go t.fireRoutine(t.ticker.Chan(), t.quit) +} + +// CONTRACT: caller should hold t.mtx. +func (t *RepeatTimer) stop() { + if t.ticker == nil { + /* + Similar to the case of closing channels twice: + https://groups.google.com/forum/#!topic/golang-nuts/rhxMiNmRAPk + Stopping a RepeatTimer twice implies that you do + not know whether you are done or not. + If you're calling stop on a stopped RepeatTimer, + you probably have race conditions. + */ + panic("Tried to stop a stopped RepeatTimer") + } + t.ticker.Stop() + t.ticker = nil + /* + XXX + From https://golang.org/pkg/time/#Ticker: + "Stop the ticker to release associated resources" + "After Stop, no more ticks will be sent" + So we shouldn't have to do the below. - exists := t.ticker != nil - if exists { - t.ticker.Stop() // does not close the channel select { - case <-t.Ch: + case <-t.ch: // read off channel if there's anything there default: } - close(t.quit) - t.wg.Wait() // must wait for quit to close else we race Reset - } - return exists + */ + close(t.quit) } diff --git a/common/repeat_timer_test.go b/common/repeat_timer_test.go index 98d991e9c..f43cc7514 100644 --- a/common/repeat_timer_test.go +++ b/common/repeat_timer_test.go @@ -4,66 +4,86 @@ import ( "testing" "time" - // make govet noshadow happy... - asrt "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/assert" ) -// NOTE: this only tests with the ManualTicker. +// NOTE: this only tests with the LogicalTicker. // How do you test a real-clock ticker properly? -func TestRepeat(test *testing.T) { - assert := asrt.New(test) +func TestRepeat(t *testing.T) { ch := make(chan time.Time, 100) - // tick fires cnt times on ch + lt := time.Time{} // zero time is year 1 + + // tick fires `cnt` times for each second. tick := func(cnt int) { for i := 0; i < cnt; i++ { - ch <- time.Now() + lt = lt.Add(time.Second) + ch <- lt } } - tock := func(test *testing.T, t *RepeatTimer, cnt int) { + + // tock consumes Ticker.Chan() events `cnt` times. + tock := func(t *testing.T, rt *RepeatTimer, cnt int) { for i := 0; i < cnt; i++ { - after := time.After(time.Second * 2) + timeout := time.After(time.Second * 2) select { - case <-t.Ch: - case <-after: - test.Fatal("expected ticker to fire") + case _ = <-rt.Chan(): + case <-timeout: + panic("QWE") + t.Fatal("expected RepeatTimer to fire") } } done := true select { - case <-t.Ch: + case <-rt.Chan(): done = false default: } - assert.True(done) + assert.True(t, done) } - ticker := NewManualTicker(ch) - t := NewRepeatTimerWithTicker("bar", ticker) + tm := NewLogicalTickerMaker(ch) + dur := time.Duration(0) // dontcare + rt := NewRepeatTimerWithTickerMaker("bar", dur, tm) - // start at 0 - tock(test, t, 0) + // Start at 0. + tock(t, rt, 0) + tick(1) // init time - // wait for 4 periods - tick(4) - tock(test, t, 4) + tock(t, rt, 0) + tick(1) // wait 1 periods + tock(t, rt, 1) + tick(2) // wait 2 periods + tock(t, rt, 2) + tick(3) // wait 3 periods + tock(t, rt, 3) + tick(4) // wait 4 periods + tock(t, rt, 4) - // keep reseting leads to no firing + // Multiple resets leads to no firing. for i := 0; i < 20; i++ { time.Sleep(time.Millisecond) - t.Reset() + rt.Reset() } - tock(test, t, 0) - // after this, it still works normal - tick(2) - tock(test, t, 2) + // After this, it works as new. + tock(t, rt, 0) + tick(1) // init time + + tock(t, rt, 0) + tick(1) // wait 1 periods + tock(t, rt, 1) + tick(2) // wait 2 periods + tock(t, rt, 2) + tick(3) // wait 3 periods + tock(t, rt, 3) + tick(4) // wait 4 periods + tock(t, rt, 4) - // after a stop, nothing more is sent - stopped := t.Stop() - assert.True(stopped) - tock(test, t, 0) + // After a stop, nothing more is sent. + rt.Stop() + tock(t, rt, 0) - // close channel to stop counter - close(t.Ch) + // Another stop panics. + assert.Panics(t, func() { rt.Stop() }) }