You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

166 lines
3.3 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. // nolint:unused
  2. package v2
  3. import (
  4. "fmt"
  5. "sync/atomic"
  6. "github.com/tendermint/tendermint/libs/log"
  7. )
  8. type scFull struct{}
  9. type pcFull struct{}
  10. const demuxerBufferSize = 10
  11. type demuxer struct {
  12. input chan Event
  13. scheduler *Routine
  14. processor *Routine
  15. fin chan error
  16. stopped chan struct{}
  17. rdy chan struct{}
  18. running *uint32
  19. stopping *uint32
  20. logger log.Logger
  21. }
  22. func newDemuxer(scheduler *Routine, processor *Routine) *demuxer {
  23. return &demuxer{
  24. input: make(chan Event, demuxerBufferSize),
  25. scheduler: scheduler,
  26. processor: processor,
  27. stopped: make(chan struct{}, 1),
  28. fin: make(chan error, 1),
  29. rdy: make(chan struct{}, 1),
  30. running: new(uint32),
  31. stopping: new(uint32),
  32. logger: log.NewNopLogger(),
  33. }
  34. }
  35. func (dm *demuxer) setLogger(logger log.Logger) {
  36. dm.logger = logger
  37. }
  38. func (dm *demuxer) start() {
  39. starting := atomic.CompareAndSwapUint32(dm.running, uint32(0), uint32(1))
  40. if !starting {
  41. panic("Routine has already started")
  42. }
  43. dm.logger.Info("demuxer: run")
  44. dm.rdy <- struct{}{}
  45. for {
  46. if !dm.isRunning() {
  47. break
  48. }
  49. select {
  50. case event, ok := <-dm.input:
  51. if !ok {
  52. dm.logger.Info("demuxer: stopping")
  53. dm.terminate(fmt.Errorf("stopped"))
  54. dm.stopped <- struct{}{}
  55. return
  56. }
  57. oEvents, err := dm.handle(event)
  58. if err != nil {
  59. dm.terminate(err)
  60. return
  61. }
  62. for _, event := range oEvents {
  63. dm.input <- event
  64. }
  65. case event, ok := <-dm.scheduler.next():
  66. if !ok {
  67. dm.logger.Info("demuxer: scheduler output closed")
  68. continue
  69. }
  70. oEvents, err := dm.handle(event)
  71. if err != nil {
  72. dm.terminate(err)
  73. return
  74. }
  75. for _, event := range oEvents {
  76. dm.input <- event
  77. }
  78. case event, ok := <-dm.processor.next():
  79. if !ok {
  80. dm.logger.Info("demuxer: processor output closed")
  81. continue
  82. }
  83. oEvents, err := dm.handle(event)
  84. if err != nil {
  85. dm.terminate(err)
  86. return
  87. }
  88. for _, event := range oEvents {
  89. dm.input <- event
  90. }
  91. }
  92. }
  93. }
  94. func (dm *demuxer) handle(event Event) (Events, error) {
  95. received := dm.scheduler.trySend(event)
  96. if !received {
  97. return Events{scFull{}}, nil // backpressure
  98. }
  99. received = dm.processor.trySend(event)
  100. if !received {
  101. return Events{pcFull{}}, nil // backpressure
  102. }
  103. return Events{}, nil
  104. }
  105. func (dm *demuxer) trySend(event Event) bool {
  106. if !dm.isRunning() || dm.isStopping() {
  107. dm.logger.Info("dummuxer isn't running")
  108. return false
  109. }
  110. select {
  111. case dm.input <- event:
  112. return true
  113. default:
  114. dm.logger.Info("demuxer channel was full")
  115. return false
  116. }
  117. }
  118. func (dm *demuxer) isRunning() bool {
  119. return atomic.LoadUint32(dm.running) == 1
  120. }
  121. func (dm *demuxer) isStopping() bool {
  122. return atomic.LoadUint32(dm.stopping) == 1
  123. }
  124. func (dm *demuxer) ready() chan struct{} {
  125. return dm.rdy
  126. }
  127. func (dm *demuxer) stop() {
  128. if !dm.isRunning() {
  129. return
  130. }
  131. stopping := atomic.CompareAndSwapUint32(dm.stopping, uint32(0), uint32(1))
  132. if !stopping {
  133. panic("Demuxer has already stopped")
  134. }
  135. dm.logger.Info("demuxer stop")
  136. close(dm.input)
  137. <-dm.stopped
  138. }
  139. func (dm *demuxer) terminate(reason error) {
  140. stopped := atomic.CompareAndSwapUint32(dm.running, uint32(1), uint32(0))
  141. if !stopped {
  142. panic("called terminate but already terminated")
  143. }
  144. dm.fin <- reason
  145. }
  146. func (dm *demuxer) final() chan error {
  147. return dm.fin
  148. }