Browse Source

No more blocking on multiple Stop()

pull/1842/head
Ethan Frey 7 years ago
parent
commit
8797197cdf
4 changed files with 28 additions and 20 deletions
  1. +17
    -16
      common/repeat_timer.go
  2. +1
    -1
      common/repeat_timer_test.go
  3. +5
    -3
      common/throttle_timer.go
  4. +5
    -0
      common/throttle_timer_test.go

+ 17
- 16
common/repeat_timer.go View File

@ -16,8 +16,9 @@ type RepeatTimer struct {
output chan<- time.Time
input chan repeatCommand
dur time.Duration
timer *time.Timer
dur time.Duration
timer *time.Timer
stopped bool
}
type repeatCommand int32
@ -50,43 +51,42 @@ func (t *RepeatTimer) Reset() {
// For ease of .Stop()'ing services before .Start()'ing them,
// we ignore .Stop()'s on nil RepeatTimers.
func (t *RepeatTimer) Stop() bool {
if t == nil {
if t == nil || t.stopped {
return false
}
t.input <- RQuit
t.stopped = true
return true
}
func (t *RepeatTimer) run() {
for {
fmt.Println("for")
done := false
for !done {
select {
case cmd := <-t.input:
// stop goroutine if the input says so
// don't close channels, as closed channels mess up select reads
if t.processInput(cmd) {
t.timer.Stop()
return
}
done = t.processInput(cmd)
case <-t.timer.C:
fmt.Println("tick")
// send if not blocked, then start the next tick
// for blocking send, just
// t.output <- time.Now()
t.trySend()
t.timer.Reset(t.dur)
}
}
fmt.Println("end run")
}
// trySend performs non-blocking send on t.Ch
func (t *RepeatTimer) trySend() {
// TODO: this was blocking in previous version (t.Ch <- t_)
// should I use that behavior unstead of unblocking as per throttle?
select {
case t.output <- time.Now():
default:
}
// select {
// case t.output <- time.Now():
// default:
// }
t.output <- time.Now()
}
// all modifications of the internal state of ThrottleTimer
@ -98,6 +98,7 @@ func (t *RepeatTimer) processInput(cmd repeatCommand) (shutdown bool) {
case Reset:
t.timer.Reset(t.dur)
case RQuit:
fmt.Println("got quit")
t.timer.Stop()
shutdown = true
default:


+ 1
- 1
common/repeat_timer_test.go View File

@ -73,6 +73,6 @@ func TestRepeat(test *testing.T) {
time.Sleep(delay(7))
assert.Equal(6, c.Count())
// close channel to stop counter
// extra calls to stop don't block
t.Stop()
}

+ 5
- 3
common/throttle_timer.go View File

@ -17,8 +17,9 @@ type ThrottleTimer struct {
output chan<- struct{}
dur time.Duration
timer *time.Timer
isSet bool
timer *time.Timer
isSet bool
stopped bool
}
type throttleCommand int32
@ -82,6 +83,7 @@ func (t *ThrottleTimer) processInput(cmd throttleCommand) (shutdown bool) {
t.timer.Reset(t.dur)
}
case TQuit:
t.stopped = true
shutdown = true
fallthrough
case Unset:
@ -119,7 +121,7 @@ func (t *ThrottleTimer) Unset() {
// For ease of stopping services before starting them, we ignore Stop on nil
// ThrottleTimers.
func (t *ThrottleTimer) Stop() bool {
if t == nil {
if t == nil || t.stopped {
return false
}
t.input <- TQuit


+ 5
- 0
common/throttle_timer_test.go View File

@ -95,4 +95,9 @@ func TestThrottle(test *testing.T) {
stopped := t.Stop()
assert.True(stopped)
time.Sleep(longwait)
assert.Equal(5, c.Count())
// extra calls to stop don't block
t.Stop()
}

Loading…
Cancel
Save