You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

134 lines
3.3 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package v2
  2. import (
  3. "fmt"
  4. "sync/atomic"
  5. "github.com/Workiva/go-datastructures/queue"
  6. "github.com/tendermint/tendermint/libs/log"
  7. )
  8. type handleFunc = func(event Event) (Event, error)
  9. // Routines are a structure which model a finite state machine as serialized
  10. // stream of events processed by a handle function. This Routine structure
  11. // handles the concurrency and messaging guarantees. Events are sent via
  12. // `send` are handled by the `handle` function to produce an iterator
  13. // `next()`. Calling `close()` on a routine will conclude processing of all
  14. // sent events and produce `final()` event representing the terminal state.
  15. type Routine struct {
  16. name string
  17. handle handleFunc
  18. queue *queue.PriorityQueue
  19. out chan Event
  20. fin chan error
  21. rdy chan struct{}
  22. running *uint32
  23. logger log.Logger
  24. metrics *Metrics
  25. }
  26. func newRoutine(name string, handleFunc handleFunc, bufferSize int) *Routine {
  27. return &Routine{
  28. name: name,
  29. handle: handleFunc,
  30. queue: queue.NewPriorityQueue(bufferSize, true),
  31. out: make(chan Event, bufferSize),
  32. rdy: make(chan struct{}, 1),
  33. fin: make(chan error, 1),
  34. running: new(uint32),
  35. logger: log.NewNopLogger(),
  36. metrics: NopMetrics(),
  37. }
  38. }
  39. // nolint: unused
  40. func (rt *Routine) setLogger(logger log.Logger) {
  41. rt.logger = logger
  42. }
  43. // nolint:unused
  44. func (rt *Routine) setMetrics(metrics *Metrics) {
  45. rt.metrics = metrics
  46. }
  47. func (rt *Routine) start() {
  48. rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name))
  49. running := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1))
  50. if !running {
  51. panic(fmt.Sprintf("%s is already running", rt.name))
  52. }
  53. close(rt.rdy)
  54. defer func() {
  55. stopped := atomic.CompareAndSwapUint32(rt.running, uint32(1), uint32(0))
  56. if !stopped {
  57. panic(fmt.Sprintf("%s is failed to stop", rt.name))
  58. }
  59. }()
  60. for {
  61. events, err := rt.queue.Get(1)
  62. if err != nil {
  63. rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name))
  64. rt.terminate(fmt.Errorf("stopped"))
  65. return
  66. }
  67. oEvent, err := rt.handle(events[0].(Event))
  68. rt.metrics.EventsHandled.With("routine", rt.name).Add(1)
  69. if err != nil {
  70. rt.terminate(err)
  71. return
  72. }
  73. rt.metrics.EventsOut.With("routine", rt.name).Add(1)
  74. rt.logger.Debug(fmt.Sprintf("%s produced %T %+v\n", rt.name, oEvent, oEvent))
  75. rt.out <- oEvent
  76. }
  77. }
  78. // XXX: look into returning OpError in the net package
  79. func (rt *Routine) send(event Event) bool {
  80. rt.logger.Debug(fmt.Sprintf("%s: received %T %+v", rt.name, event, event))
  81. if !rt.isRunning() {
  82. return false
  83. }
  84. err := rt.queue.Put(event)
  85. if err != nil {
  86. rt.metrics.EventsShed.With("routine", rt.name).Add(1)
  87. rt.logger.Info(fmt.Sprintf("%s: send failed, queue was full/stopped \n", rt.name))
  88. return false
  89. }
  90. rt.metrics.EventsSent.With("routine", rt.name).Add(1)
  91. return true
  92. }
  93. func (rt *Routine) isRunning() bool {
  94. return atomic.LoadUint32(rt.running) == 1
  95. }
  96. func (rt *Routine) next() chan Event {
  97. return rt.out
  98. }
  99. func (rt *Routine) ready() chan struct{} {
  100. return rt.rdy
  101. }
  102. func (rt *Routine) stop() {
  103. if !rt.isRunning() {
  104. return
  105. }
  106. rt.logger.Info(fmt.Sprintf("%s: stop\n", rt.name))
  107. rt.queue.Dispose() // this should block until all queue items are free?
  108. }
  109. func (rt *Routine) final() chan error {
  110. return rt.fin
  111. }
  112. // XXX: Maybe get rid of this
  113. func (rt *Routine) terminate(reason error) {
  114. close(rt.out)
  115. rt.fin <- reason
  116. }