From a25ed5ba1b0124f82f77b722cf3225cf4b3f18f5 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 21 Dec 2017 10:02:25 -0500 Subject: [PATCH 1/3] cmn: fix race condition in prng --- common/random.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/random.go b/common/random.go index 9df55ff81..ca71b6143 100644 --- a/common/random.go +++ b/common/random.go @@ -40,7 +40,7 @@ func RandStr(length int) string { chars := []byte{} MAIN_LOOP: for { - val := prng.Int63() + val := RandInt63() for i := 0; i < 10; i++ { v := int(val & 0x3f) // rightmost 6 bits if v >= 62 { // only 62 characters in strChars From b0b740210c60b7fc789382ff3a709426eb71903d Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 21 Dec 2017 11:15:17 -0500 Subject: [PATCH 2/3] cmn: fix repeate timer test with manual ticker --- common/repeat_timer.go | 86 +++++++++++++++++++++++++++++++++---- common/repeat_timer_test.go | 81 ++++++++++++++++------------------ 2 files changed, 114 insertions(+), 53 deletions(-) diff --git a/common/repeat_timer.go b/common/repeat_timer.go index d7d9154d4..1500e95d1 100644 --- a/common/repeat_timer.go +++ b/common/repeat_timer.go @@ -5,6 +5,72 @@ import ( "time" ) +// Ticker is a basic ticker interface. +type Ticker interface { + Chan() <-chan time.Time + Stop() + Reset() +} + +// DefaultTicker wraps the stdlibs Ticker implementation. +type DefaultTicker struct { + t *time.Ticker + dur time.Duration +} + +// NewDefaultTicker returns a new DefaultTicker +func NewDefaultTicker(dur time.Duration) *DefaultTicker { + return &DefaultTicker{ + time.NewTicker(dur), + dur, + } +} + +// Implements Ticker +func (t *DefaultTicker) Chan() <-chan time.Time { + return t.t.C +} + +// Implements Ticker +func (t *DefaultTicker) Stop() { + t.t.Stop() + t.t = nil +} + +// Implements Ticker +func (t *DefaultTicker) Reset() { + t.t = time.NewTicker(t.dur) +} + +// ManualTicker wraps a channel that can be manually sent on +type ManualTicker struct { + ch chan time.Time +} + +// NewManualTicker returns a new ManualTicker +func NewManualTicker(ch chan time.Time) *ManualTicker { + return &ManualTicker{ + ch: ch, + } +} + +// Implements Ticker +func (t *ManualTicker) Chan() <-chan time.Time { + return t.ch +} + +// Implements Ticker +func (t *ManualTicker) Stop() { + // noop +} + +// Implements Ticker +func (t *ManualTicker) Reset() { + // noop +} + +//--------------------------------------------------------------------- + /* RepeatTimer repeatedly sends a struct{}{} to .Ch after each "dur" period. It's good for keeping connections alive. @@ -15,30 +81,35 @@ type RepeatTimer struct { mtx sync.Mutex name string - ticker *time.Ticker + ticker Ticker quit chan struct{} wg *sync.WaitGroup - dur time.Duration } +// NewRepeatTimer returns a RepeatTimer with the DefaultTicker. func NewRepeatTimer(name string, dur time.Duration) *RepeatTimer { + ticker := NewDefaultTicker(dur) + return NewRepeatTimerWithTicker(name, ticker) +} + +// NewRepeatTimerWithTicker returns a RepeatTimer with the given ticker. +func NewRepeatTimerWithTicker(name string, ticker Ticker) *RepeatTimer { var t = &RepeatTimer{ Ch: make(chan time.Time), - ticker: time.NewTicker(dur), + ticker: ticker, quit: make(chan struct{}), wg: new(sync.WaitGroup), name: name, - dur: dur, } t.wg.Add(1) go t.fireRoutine(t.ticker) return t } -func (t *RepeatTimer) fireRoutine(ticker *time.Ticker) { +func (t *RepeatTimer) fireRoutine(ticker Ticker) { for { select { - case t_ := <-ticker.C: + case t_ := <-ticker.Chan(): t.Ch <- t_ case <-t.quit: // needed so we know when we can reset t.quit @@ -55,7 +126,7 @@ func (t *RepeatTimer) Reset() { t.mtx.Lock() // Lock defer t.mtx.Unlock() - t.ticker = time.NewTicker(t.dur) + t.ticker.Reset() t.quit = make(chan struct{}) t.wg.Add(1) go t.fireRoutine(t.ticker) @@ -80,7 +151,6 @@ func (t *RepeatTimer) Stop() bool { } close(t.quit) t.wg.Wait() // must wait for quit to close else we race Reset - t.ticker = nil } return exists } diff --git a/common/repeat_timer_test.go b/common/repeat_timer_test.go index 9f03f41df..98d991e9c 100644 --- a/common/repeat_timer_test.go +++ b/common/repeat_timer_test.go @@ -1,7 +1,6 @@ package common import ( - "sync" "testing" "time" @@ -9,69 +8,61 @@ import ( asrt "github.com/stretchr/testify/assert" ) -type rCounter struct { - input chan time.Time - mtx sync.Mutex - count int -} - -func (c *rCounter) Increment() { - c.mtx.Lock() - c.count++ - c.mtx.Unlock() -} - -func (c *rCounter) Count() int { - c.mtx.Lock() - val := c.count - c.mtx.Unlock() - return val -} - -// Read should run in a go-routine and -// updates count by one every time a packet comes in -func (c *rCounter) Read() { - for range c.input { - c.Increment() - } -} - +// NOTE: this only tests with the ManualTicker. +// How do you test a real-clock ticker properly? func TestRepeat(test *testing.T) { assert := asrt.New(test) - dur := time.Duration(50) * time.Millisecond - short := time.Duration(20) * time.Millisecond - // delay waits for cnt durations, an a little extra - delay := func(cnt int) time.Duration { - return time.Duration(cnt)*dur + time.Millisecond + ch := make(chan time.Time, 100) + // tick fires cnt times on ch + tick := func(cnt int) { + for i := 0; i < cnt; i++ { + ch <- time.Now() + } } - t := NewRepeatTimer("bar", dur) + tock := func(test *testing.T, t *RepeatTimer, cnt int) { + for i := 0; i < cnt; i++ { + after := time.After(time.Second * 2) + select { + case <-t.Ch: + case <-after: + test.Fatal("expected ticker to fire") + } + } + done := true + select { + case <-t.Ch: + done = false + default: + } + assert.True(done) + } + + ticker := NewManualTicker(ch) + t := NewRepeatTimerWithTicker("bar", ticker) // start at 0 - c := &rCounter{input: t.Ch} - go c.Read() - assert.Equal(0, c.Count()) + tock(test, t, 0) // wait for 4 periods - time.Sleep(delay(4)) - assert.Equal(4, c.Count()) + tick(4) + tock(test, t, 4) // keep reseting leads to no firing for i := 0; i < 20; i++ { - time.Sleep(short) + time.Sleep(time.Millisecond) t.Reset() } - assert.Equal(4, c.Count()) + tock(test, t, 0) // after this, it still works normal - time.Sleep(delay(2)) - assert.Equal(6, c.Count()) + tick(2) + tock(test, t, 2) // after a stop, nothing more is sent stopped := t.Stop() assert.True(stopped) - time.Sleep(delay(7)) - assert.Equal(6, c.Count()) + tock(test, t, 0) // close channel to stop counter close(t.Ch) From e2d7f1aa41dde5f29057dd08e64371a574b84c86 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 21 Dec 2017 14:21:15 -0500 Subject: [PATCH 3/3] cmn: fix race --- common/repeat_timer.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/common/repeat_timer.go b/common/repeat_timer.go index 1500e95d1..0bc4d87b4 100644 --- a/common/repeat_timer.go +++ b/common/repeat_timer.go @@ -102,14 +102,14 @@ func NewRepeatTimerWithTicker(name string, ticker Ticker) *RepeatTimer { name: name, } t.wg.Add(1) - go t.fireRoutine(t.ticker) + go t.fireRoutine(t.ticker.Chan()) return t } -func (t *RepeatTimer) fireRoutine(ticker Ticker) { +func (t *RepeatTimer) fireRoutine(ch <-chan time.Time) { for { select { - case t_ := <-ticker.Chan(): + case t_ := <-ch: t.Ch <- t_ case <-t.quit: // needed so we know when we can reset t.quit @@ -129,7 +129,7 @@ func (t *RepeatTimer) Reset() { t.ticker.Reset() t.quit = make(chan struct{}) t.wg.Add(1) - go t.fireRoutine(t.ticker) + go t.fireRoutine(t.ticker.Chan()) } // For ease of .Stop()'ing services before .Start()'ing them,