You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

127 lines
2.5 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package common
  2. import (
  3. "time"
  4. )
  // ThrottleTimer fires an event at most "dur" after each .Set() call.
  // If a short burst of .Set() calls happens, ThrottleTimer fires once.
  // If a long continuous burst of .Set() calls happens, ThrottleTimer fires
  // at most once every "dur".
  //
  // Events are delivered on Ch. The timer and the isSet flag are touched only
  // by the goroutine started in NewThrottleTimer; Set, Unset and Stop talk to
  // that goroutine through the input channel.
  type ThrottleTimer struct {
  	Name string
  	Ch   <-chan struct{} // receive side of output; one value per throttled event

  	input  chan command    // commands consumed by the run goroutine
  	output chan<- struct{} // send side of Ch
  	dur    time.Duration   // delay between a Set and the resulting event

  	timer *time.Timer
  	isSet bool // true while an event is pending

  	done chan struct{} // closed when the run goroutine exits
  }

  // command is a request handled by the run goroutine.
  type command int32

  const (
  	// Set schedules an event if none is pending.
  	Set command = iota
  	// Unset cancels a pending event.
  	Unset
  	// Quit cancels a pending event and terminates the run goroutine.
  	Quit
  )

  // NewThrottleTimer creates a new ThrottleTimer that delivers its events on
  // Ch no sooner than dur after a Set call, and starts its background
  // goroutine. Call Stop to release that goroutine.
  func NewThrottleTimer(name string, dur time.Duration) *ThrottleTimer {
  	c := make(chan struct{})
  	t := &ThrottleTimer{
  		Name:   name,
  		Ch:     c,
  		dur:    dur,
  		input:  make(chan command),
  		output: c,
  		timer:  time.NewTimer(dur),
  		done:   make(chan struct{}),
  	}
  	// The timer must start out idle. With a tiny or non-positive dur it may
  	// already have expired, so the pending value has to be discarded too.
  	t.stopTimer()
  	go t.run()
  	return t
  }

  // run is the body of the background goroutine. It returns, closing done,
  // once a Quit command has been processed.
  func (t *ThrottleTimer) run() {
  	// Ch and input are deliberately never closed: a closed channel is always
  	// ready in a select and would look like an event to readers. done is a
  	// separate channel whose only purpose is to be closed.
  	defer close(t.done)
  	for {
  		select {
  		case cmd := <-t.input:
  			if t.processInput(cmd) {
  				return
  			}
  		case <-t.timer.C:
  			// Ignore an expiry that slipped through after the event was
  			// cancelled.
  			if t.isSet {
  				t.trySend()
  			}
  		}
  	}
  }

  // trySend performs a non-blocking send on t.Ch. If nobody is receiving, the
  // event stays pending and delivery is retried after another dur.
  func (t *ThrottleTimer) trySend() {
  	select {
  	case t.output <- struct{}{}:
  		t.isSet = false
  	default:
  		// if we just want to drop, replace this with t.isSet = false
  		t.timer.Reset(t.dur)
  	}
  }

  // stopTimer stops the timer and discards an expiry that is already waiting
  // in timer.C, so that a cancelled event can never be delivered later. It
  // must not run concurrently with the run goroutine's receive on timer.C: it
  // is called before that goroutine starts and from within it.
  func (t *ThrottleTimer) stopTimer() {
  	if !t.timer.Stop() {
  		select {
  		case <-t.timer.C:
  		default:
  		}
  	}
  }

  // processInput applies one command to the internal state and reports
  // whether the run goroutine should shut down. All modifications of the
  // internal state of ThrottleTimer happen here or in trySend, and both are
  // only called from the run goroutine, so there are no data races.
  // It panics on a command value other than Set, Unset or Quit.
  func (t *ThrottleTimer) processInput(cmd command) (shutdown bool) {
  	switch cmd {
  	case Set:
  		if !t.isSet {
  			t.isSet = true
  			t.timer.Reset(t.dur)
  		}
  	case Unset, Quit:
  		// Quit cancels the pending event exactly like Unset does.
  		if t.isSet {
  			t.isSet = false
  			t.stopTimer()
  		}
  		shutdown = cmd == Quit
  	default:
  		panic("unknown command!")
  	}
  	return shutdown
  }

  // send hands cmd to the run goroutine, or returns immediately if that
  // goroutine has already exited, so callers never block on a stopped timer.
  func (t *ThrottleTimer) send(cmd command) {
  	select {
  	case t.input <- cmd:
  	case <-t.done:
  	}
  }

  // Set schedules an event to be delivered on Ch after dur, unless one is
  // already pending. It is a no-op once the timer has been stopped.
  func (t *ThrottleTimer) Set() {
  	t.send(Set)
  }

  // Unset cancels the pending event, if any. It is a no-op once the timer has
  // been stopped.
  func (t *ThrottleTimer) Unset() {
  	t.send(Unset)
  }

  // Stop cancels any pending event and terminates the background goroutine.
  // Ch is not closed, to prevent a read from the channel succeeding
  // incorrectly; once Stop has returned, nothing is delivered on Ch anymore,
  // so there is nothing to drain.
  //
  // Stop may be called more than once; calls after the first return
  // immediately.
  //
  // For ease of stopping services before starting them, we ignore Stop on nil
  // ThrottleTimers: Stop returns false for a nil receiver and true otherwise.
  func (t *ThrottleTimer) Stop() bool {
  	if t == nil {
  		return false
  	}
  	t.send(Quit)
  	return true
  }
  115. }