You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

119 lines
2.4 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package v2
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/tendermint/tendermint/libs/log"
  6. )
  7. type timeCheck struct {
  8. priorityHigh
  9. time time.Time
  10. }
  11. func schedulerHandle(event Event) (Event, error) {
  12. switch event.(type) {
  13. case timeCheck:
  14. fmt.Println("scheduler handle timeCheck")
  15. }
  16. return noOp, nil
  17. }
  18. func processorHandle(event Event) (Event, error) {
  19. switch event.(type) {
  20. case timeCheck:
  21. fmt.Println("processor handle timeCheck")
  22. }
  23. return noOp, nil
  24. }
  25. type Reactor struct {
  26. events chan Event
  27. stopDemux chan struct{}
  28. scheduler *Routine
  29. processor *Routine
  30. ticker *time.Ticker
  31. logger log.Logger
  32. }
  33. func NewReactor(bufferSize int) *Reactor {
  34. return &Reactor{
  35. events: make(chan Event, bufferSize),
  36. stopDemux: make(chan struct{}),
  37. scheduler: newRoutine("scheduler", schedulerHandle, bufferSize),
  38. processor: newRoutine("processor", processorHandle, bufferSize),
  39. ticker: time.NewTicker(1 * time.Second),
  40. logger: log.NewNopLogger(),
  41. }
  42. }
  43. // nolint:unused
  44. func (r *Reactor) setLogger(logger log.Logger) {
  45. r.logger = logger
  46. r.scheduler.setLogger(logger)
  47. r.processor.setLogger(logger)
  48. }
  49. func (r *Reactor) Start() {
  50. go r.scheduler.start()
  51. go r.processor.start()
  52. go r.demux()
  53. <-r.scheduler.ready()
  54. <-r.processor.ready()
  55. go func() {
  56. for t := range r.ticker.C {
  57. r.events <- timeCheck{time: t}
  58. }
  59. }()
  60. }
  61. // XXX: Would it be possible here to provide some kind of type safety for the types
  62. // of events that each routine can produce and consume?
  63. func (r *Reactor) demux() {
  64. for {
  65. select {
  66. case event := <-r.events:
  67. // XXX: check for backpressure
  68. r.scheduler.send(event)
  69. r.processor.send(event)
  70. case <-r.stopDemux:
  71. r.logger.Info("demuxing stopped")
  72. return
  73. case event := <-r.scheduler.next():
  74. r.processor.send(event)
  75. case event := <-r.processor.next():
  76. r.scheduler.send(event)
  77. case err := <-r.scheduler.final():
  78. r.logger.Info(fmt.Sprintf("scheduler final %s", err))
  79. case err := <-r.processor.final():
  80. r.logger.Info(fmt.Sprintf("processor final %s", err))
  81. // XXX: switch to consensus
  82. }
  83. }
  84. }
  85. func (r *Reactor) Stop() {
  86. r.logger.Info("reactor stopping")
  87. r.ticker.Stop()
  88. r.scheduler.stop()
  89. r.processor.stop()
  90. close(r.stopDemux)
  91. close(r.events)
  92. r.logger.Info("reactor stopped")
  93. }
  94. func (r *Reactor) Receive(event Event) {
  95. // XXX: decode and serialize write events
  96. // TODO: backpressure
  97. r.events <- event
  98. }
  99. func (r *Reactor) AddPeer() {
  100. // TODO: add peer event and send to demuxer
  101. }