You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

174 lines
4.1 KiB

  1. package v2
  2. import (
  3. "fmt"
  4. "sync/atomic"
  5. "github.com/tendermint/tendermint/libs/log"
  6. )
  7. // TODO
  8. // * revisit panic conditions
  9. // * audit log levels
  10. // * Convert routine to an interface with concrete implementation
  11. // * determine the public interface
  12. type handleFunc = func(event Event) (Events, error)
  13. type Routine struct {
  14. name string
  15. input chan Event
  16. errors chan error
  17. out chan Event
  18. stopped chan struct{}
  19. finished chan error
  20. running *uint32
  21. handle handleFunc
  22. logger log.Logger
  23. metrics *Metrics
  24. }
  25. func newRoutine(name string, handleFunc handleFunc) *Routine {
  26. return &Routine{
  27. name: name,
  28. input: make(chan Event, 1),
  29. handle: handleFunc,
  30. errors: make(chan error, 1),
  31. out: make(chan Event, 1),
  32. stopped: make(chan struct{}, 1),
  33. finished: make(chan error, 1),
  34. running: new(uint32),
  35. logger: log.NewNopLogger(),
  36. metrics: NopMetrics(),
  37. }
  38. }
  39. func (rt *Routine) setLogger(logger log.Logger) {
  40. rt.logger = logger
  41. }
  42. func (rt *Routine) setMetrics(metrics *Metrics) {
  43. rt.metrics = metrics
  44. }
  45. func (rt *Routine) run() {
  46. rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name))
  47. starting := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1))
  48. if !starting {
  49. panic("Routine has already started")
  50. }
  51. errorsDrained := false
  52. for {
  53. if !rt.isRunning() {
  54. break
  55. }
  56. select {
  57. case iEvent, ok := <-rt.input:
  58. rt.metrics.EventsIn.With("routine", rt.name).Add(1)
  59. if !ok {
  60. if !errorsDrained {
  61. continue // wait for errors to be drainned
  62. }
  63. rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name))
  64. rt.stopped <- struct{}{}
  65. rt.terminate(fmt.Errorf("stopped"))
  66. return
  67. }
  68. oEvents, err := rt.handle(iEvent)
  69. rt.metrics.EventsHandled.With("routine", rt.name).Add(1)
  70. if err != nil {
  71. rt.terminate(err)
  72. return
  73. }
  74. rt.metrics.EventsOut.With("routine", rt.name).Add(float64(len(oEvents)))
  75. rt.logger.Info(fmt.Sprintf("%s handled %d events\n", rt.name, len(oEvents)))
  76. for _, event := range oEvents {
  77. rt.logger.Info(fmt.Sprintln("writting back to output"))
  78. rt.out <- event
  79. }
  80. case iEvent, ok := <-rt.errors:
  81. rt.metrics.ErrorsIn.With("routine", rt.name).Add(1)
  82. if !ok {
  83. rt.logger.Info(fmt.Sprintf("%s: errors closed\n", rt.name))
  84. errorsDrained = true
  85. continue
  86. }
  87. oEvents, err := rt.handle(iEvent)
  88. rt.metrics.ErrorsHandled.With("routine", rt.name).Add(1)
  89. if err != nil {
  90. rt.terminate(err)
  91. return
  92. }
  93. rt.metrics.ErrorsOut.With("routine", rt.name).Add(float64(len(oEvents)))
  94. for _, event := range oEvents {
  95. rt.out <- event
  96. }
  97. }
  98. }
  99. }
  100. func (rt *Routine) feedback() {
  101. for event := range rt.out {
  102. rt.send(event)
  103. }
  104. }
  105. // XXX: this should be called trySend for consistency
  106. func (rt *Routine) send(event Event) bool {
  107. if !rt.isRunning() {
  108. return false
  109. }
  110. rt.logger.Info(fmt.Sprintf("%s: sending %+v", rt.name, event))
  111. if err, ok := event.(error); ok {
  112. select {
  113. case rt.errors <- err:
  114. rt.metrics.ErrorsSent.With("routine", rt.name).Add(1)
  115. return true
  116. default:
  117. rt.metrics.ErrorsShed.With("routine", rt.name).Add(1)
  118. rt.logger.Info(fmt.Sprintf("%s: errors channel was full\n", rt.name))
  119. return false
  120. }
  121. } else {
  122. select {
  123. case rt.input <- event:
  124. rt.metrics.EventsSent.With("routine", rt.name).Add(1)
  125. return true
  126. default:
  127. rt.metrics.EventsShed.With("routine", rt.name).Add(1)
  128. rt.logger.Info(fmt.Sprintf("%s: channel was full\n", rt.name))
  129. return false
  130. }
  131. }
  132. }
  133. func (rt *Routine) isRunning() bool {
  134. return atomic.LoadUint32(rt.running) == 1
  135. }
  136. func (rt *Routine) output() chan Event {
  137. return rt.out
  138. }
  139. func (rt *Routine) stop() {
  140. if !rt.isRunning() {
  141. return
  142. }
  143. rt.logger.Info(fmt.Sprintf("%s: stop\n", rt.name))
  144. close(rt.input)
  145. close(rt.errors)
  146. <-rt.stopped
  147. }
  148. func (rt *Routine) terminate(reason error) {
  149. stopped := atomic.CompareAndSwapUint32(rt.running, uint32(1), uint32(0))
  150. if !stopped {
  151. panic("called stop but already stopped")
  152. }
  153. rt.finished <- reason
  154. }
  155. // XXX: this should probably produced the finished
  156. // channel and let the caller deicde how long to wait
  157. func (rt *Routine) wait() error {
  158. return <-rt.finished
  159. }