You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

129 lines
2.6 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package common
  2. import (
  3. "time"
  4. )
  // ThrottleTimer fires an event on Ch at most "dur" after each .Set() call.
  // A short burst of .Set() calls collapses into a single event; a long,
  // continuous burst of .Set() calls produces at most one event every "dur".
  type ThrottleTimer struct {
  	Name string          // label given by the creator; the timer logic never reads it
  	Ch   <-chan struct{} // receive side on which events are delivered

  	input  chan throttleCommand // commands handed over to the run goroutine
  	output chan<- struct{}      // send side used by trySend to deliver events
  	dur    time.Duration        // delay applied whenever the timer is (re)armed

  	timer   *time.Timer // underlying timer, re-armed with dur
  	isSet   bool        // true while an event is pending delivery
  	stopped bool        // set by Stop once the quit command has been sent
  }

  // throttleCommand is an instruction passed to the run goroutine through the
  // input channel.
  type throttleCommand int8

  const (
  	// Set arms the timer unless an event is already pending.
  	Set throttleCommand = iota
  	// Unset cancels a pending event.
  	Unset
  	// TQuit cancels a pending event and makes the run goroutine return.
  	TQuit
  )
  27. // NewThrottleTimer creates a new ThrottleTimer.
  28. func NewThrottleTimer(name string, dur time.Duration) *ThrottleTimer {
  29. c := make(chan struct{})
  30. var t = &ThrottleTimer{
  31. Name: name,
  32. Ch: c,
  33. dur: dur,
  34. input: make(chan throttleCommand),
  35. output: c,
  36. timer: time.NewTimer(dur),
  37. }
  38. t.timer.Stop()
  39. go t.run()
  40. return t
  41. }
  42. func (t *ThrottleTimer) run() {
  43. for {
  44. select {
  45. case cmd := <-t.input:
  46. // stop goroutine if the input says so
  47. // don't close channels, as closed channels mess up select reads
  48. if t.processInput(cmd) {
  49. return
  50. }
  51. case <-t.timer.C:
  52. t.trySend()
  53. }
  54. }
  55. }
  56. // trySend performs non-blocking send on t.Ch
  57. func (t *ThrottleTimer) trySend() {
  58. select {
  59. case t.output <- struct{}{}:
  60. t.isSet = false
  61. default:
  62. // if we just want to drop, replace this with t.isSet = false
  63. t.timer.Reset(t.dur)
  64. }
  65. }
  66. // all modifications of the internal state of ThrottleTimer
  67. // happen in this method. It is only called from the run goroutine
  68. // so we avoid any race conditions
  69. func (t *ThrottleTimer) processInput(cmd throttleCommand) (shutdown bool) {
  70. switch cmd {
  71. case Set:
  72. if !t.isSet {
  73. t.isSet = true
  74. t.timer.Reset(t.dur)
  75. }
  76. case TQuit:
  77. shutdown = true
  78. fallthrough
  79. case Unset:
  80. if t.isSet {
  81. t.isSet = false
  82. t.timer.Stop()
  83. }
  84. default:
  85. panic("unknown command!")
  86. }
  87. return shutdown
  88. }
  89. func (t *ThrottleTimer) Set() {
  90. t.input <- Set
  91. }
  92. func (t *ThrottleTimer) Unset() {
  93. t.input <- Unset
  94. }
  95. // Stop prevents the ThrottleTimer from firing. It always returns true. Stop does not
  96. // close the channel, to prevent a read from the channel succeeding
  97. // incorrectly.
  98. //
  99. // To prevent a timer created with NewThrottleTimer from firing after a call to
  100. // Stop, check the return value and drain the channel.
  101. //
  102. // For example, assuming the program has not received from t.C already:
  103. //
  104. // if !t.Stop() {
  105. // <-t.C
  106. // }
  107. //
  108. // For ease of stopping services before starting them, we ignore Stop on nil
  109. // ThrottleTimers.
  110. func (t *ThrottleTimer) Stop() bool {
  111. if t == nil || t.stopped {
  112. return false
  113. }
  114. t.input <- TQuit
  115. t.stopped = true
  116. return true
  117. }