You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

229 lines
4.9 KiB

9 years ago
9 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
7 years ago
7 years ago
7 years ago
9 years ago
7 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
7 years ago
9 years ago
  1. package common
  2. import (
  3. "sync"
  4. "time"
  5. )
// TickerMaker builds a Ticker for the given duration.
// RepeatTimer calls it when it is constructed and again on every reset
// (including a Reset() after Stop()), so each (re)start gets a freshly
// made Ticker.
type TickerMaker func(dur time.Duration) Ticker
// Ticker is a basic ticker interface: a channel of tick timestamps plus a
// way to shut the ticker down.
type Ticker interface {
	// Chan returns the channel on which ticks are delivered.
	// Never changes, never closes.
	Chan() <-chan time.Time
	// Stop shuts the ticker down. It must be called at most once:
	// stopping a stopped Ticker will panic (logicalTicker does so because
	// a second Stop closes an already-closed channel).
	Stop()
}
  16. //----------------------------------------
  17. // defaultTickerMaker
  18. func defaultTickerMaker(dur time.Duration) Ticker {
  19. ticker := time.NewTicker(dur)
  20. return (*defaultTicker)(ticker)
  21. }
// defaultTicker adapts the standard library's time.Ticker to the Ticker
// interface. It has time.Ticker as its underlying type, so pointers are
// converted back and forth between *time.Ticker and *defaultTicker without
// copying the ticker.
type defaultTicker time.Ticker
  23. // Implements Ticker
  24. func (t *defaultTicker) Chan() <-chan time.Time {
  25. return t.C
  26. }
  27. // Implements Ticker
  28. func (t *defaultTicker) Stop() {
  29. ((*time.Ticker)(t)).Stop()
  30. }
  31. //----------------------------------------
  32. // LogicalTickerMaker
  33. // Construct a TickerMaker that always uses `source`.
  34. // It's useful for simulating a deterministic clock.
  35. func NewLogicalTickerMaker(source chan time.Time) TickerMaker {
  36. return func(dur time.Duration) Ticker {
  37. return newLogicalTicker(source, dur)
  38. }
  39. }
// logicalTicker is a Ticker driven by an externally supplied stream of
// timestamps rather than by the wall clock.
type logicalTicker struct {
	source <-chan time.Time // timestamps consumed by fireRoutine
	ch     chan time.Time   // ticks for the consumer; returned by Chan()
	quit   chan struct{}    // closed by Stop() to terminate fireRoutine
}
  45. func newLogicalTicker(source <-chan time.Time, interval time.Duration) Ticker {
  46. lt := &logicalTicker{
  47. source: source,
  48. ch: make(chan time.Time),
  49. quit: make(chan struct{}),
  50. }
  51. go lt.fireRoutine(interval)
  52. return lt
  53. }
  54. // We need a goroutine to read times from t.source
  55. // and fire on t.Chan() when `interval` has passed.
  56. func (t *logicalTicker) fireRoutine(interval time.Duration) {
  57. source := t.source
  58. // Init `lasttime`
  59. lasttime := time.Time{}
  60. select {
  61. case lasttime = <-source:
  62. case <-t.quit:
  63. return
  64. }
  65. // Init `lasttime` end
  66. for {
  67. select {
  68. case newtime := <-source:
  69. elapsed := newtime.Sub(lasttime)
  70. if interval <= elapsed {
  71. // Block for determinism until the ticker is stopped.
  72. select {
  73. case t.ch <- newtime:
  74. case <-t.quit:
  75. return
  76. }
  77. // Reset timeleft.
  78. // Don't try to "catch up" by sending more.
  79. // "Ticker adjusts the intervals or drops ticks to make up for
  80. // slow receivers" - https://golang.org/pkg/time/#Ticker
  81. lasttime = newtime
  82. }
  83. case <-t.quit:
  84. return // done
  85. }
  86. }
  87. }
// Chan implements Ticker. It returns the receive side of the channel that
// fireRoutine sends ticks on.
func (t *logicalTicker) Chan() <-chan time.Time {
	return t.ch // immutable: assigned once in newLogicalTicker
}
// Stop implements Ticker. Closing quit makes fireRoutine return.
// Calling Stop a second time panics, because it closes an already-closed
// channel; that is intentional.
func (t *logicalTicker) Stop() {
	close(t.quit) // it *should* panic when stopped twice.
}
  96. //---------------------------------------------------------------------
/*
RepeatTimer repeatedly delivers a time.Time on `.Chan()`, forwarding the ticks
of an underlying Ticker that its TickerMaker creates for `dur`.
(It's good for keeping connections alive.)
A RepeatTimer must be stopped, or it will keep a goroutine alive.
*/
type RepeatTimer struct {
	name string         // label given at construction; not read by the code in this section
	ch   chan time.Time // output channel returned by Chan(); created once by the constructor
	tm   TickerMaker    // builds a new Ticker on every reset

	mtx    sync.Mutex    // held by Stop() and Reset() around stop()/reset()
	dur    time.Duration // duration handed to tm on every reset
	ticker Ticker        // current ticker; nil while the timer is stopped
	quit   chan struct{} // closed by stop() to end the current fireRoutine
}
// NewRepeatTimer returns a RepeatTimer with a defaultTicker, i.e. one whose
// tickers are made by defaultTickerMaker for `dur`.
// It delegates to NewRepeatTimerWithTickerMaker, which starts the timer
// before returning it.
func NewRepeatTimer(name string, dur time.Duration) *RepeatTimer {
	return NewRepeatTimerWithTickerMaker(name, dur, defaultTickerMaker)
}
  115. // NewRepeatTimerWithTicker returns a RepeatTimer with the given ticker
  116. // maker.
  117. func NewRepeatTimerWithTickerMaker(name string, dur time.Duration, tm TickerMaker) *RepeatTimer {
  118. var t = &RepeatTimer{
  119. name: name,
  120. ch: make(chan time.Time),
  121. tm: tm,
  122. dur: dur,
  123. ticker: nil,
  124. quit: nil,
  125. }
  126. t.reset()
  127. return t
  128. }
  129. func (t *RepeatTimer) fireRoutine(ch <-chan time.Time, quit <-chan struct{}) {
  130. for {
  131. select {
  132. case t_ := <-ch:
  133. select {
  134. case t.ch <- t_:
  135. case <-quit:
  136. return
  137. }
  138. case <-quit: // NOTE: `t.quit` races.
  139. return
  140. }
  141. }
  142. }
// Chan returns the channel on which the timer delivers its ticks.
// The same channel is returned for the lifetime of the RepeatTimer; it is
// created once in the constructor and is not replaced by Stop() or Reset().
func (t *RepeatTimer) Chan() <-chan time.Time {
	return t.ch
}
// Stop stops the timer: the current ticker is stopped and the forwarding
// goroutine is told to exit. The timer can be started again with Reset().
// Stop panics if the timer is already stopped.
func (t *RepeatTimer) Stop() {
	t.mtx.Lock()
	// Deferred so the mutex is released even when stop() panics.
	defer t.mtx.Unlock()
	t.stop()
}
// Reset restarts the timer so that it waits the duration again before
// firing. If the timer is running, its current ticker is stopped first;
// if it was stopped, Reset simply starts it again.
func (t *RepeatTimer) Reset() {
	t.mtx.Lock()
	// Deferred so the mutex is released even if reset() panics.
	defer t.mtx.Unlock()
	t.reset()
}
  157. //----------------------------------------
  158. // Misc.
  159. // CONTRACT: (non-constructor) caller should hold t.mtx.
  160. func (t *RepeatTimer) reset() {
  161. if t.ticker != nil {
  162. t.stop()
  163. }
  164. t.ticker = t.tm(t.dur)
  165. t.quit = make(chan struct{})
  166. go t.fireRoutine(t.ticker.Chan(), t.quit)
  167. }
  168. // CONTRACT: caller should hold t.mtx.
  169. func (t *RepeatTimer) stop() {
  170. if t.ticker == nil {
  171. /*
  172. Similar to the case of closing channels twice:
  173. https://groups.google.com/forum/#!topic/golang-nuts/rhxMiNmRAPk
  174. Stopping a RepeatTimer twice implies that you do
  175. not know whether you are done or not.
  176. If you're calling stop on a stopped RepeatTimer,
  177. you probably have race conditions.
  178. */
  179. panic("Tried to stop a stopped RepeatTimer")
  180. }
  181. t.ticker.Stop()
  182. t.ticker = nil
  183. /*
  184. From https://golang.org/pkg/time/#Ticker:
  185. "Stop the ticker to release associated resources"
  186. "After Stop, no more ticks will be sent"
  187. So we shouldn't have to do the below.
  188. select {
  189. case <-t.ch:
  190. // read off channel if there's anything there
  191. default:
  192. }
  193. */
  194. close(t.quit)
  195. }