diff --git a/blockchain/v2/demuxer.go b/blockchain/v2/demuxer.go deleted file mode 100644 index 5ebb54633..000000000 --- a/blockchain/v2/demuxer.go +++ /dev/null @@ -1,164 +0,0 @@ -// nolint:unused -package v2 - -import ( - "fmt" - "sync/atomic" - - "github.com/tendermint/tendermint/libs/log" -) - -type scFull struct { - priorityHigh -} -type pcFull struct { - priorityHigh -} - -const demuxerBufferSize = 10 - -type demuxer struct { - input chan Event - scheduler *Routine - processor *Routine - fin chan error - stopped chan struct{} - rdy chan struct{} - running *uint32 - stopping *uint32 - logger log.Logger -} - -func newDemuxer(scheduler *Routine, processor *Routine) *demuxer { - return &demuxer{ - input: make(chan Event, demuxerBufferSize), - scheduler: scheduler, - processor: processor, - stopped: make(chan struct{}, 1), - fin: make(chan error, 1), - rdy: make(chan struct{}, 1), - running: new(uint32), - stopping: new(uint32), - logger: log.NewNopLogger(), - } -} - -func (dm *demuxer) setLogger(logger log.Logger) { - dm.logger = logger -} - -func (dm *demuxer) start() { - starting := atomic.CompareAndSwapUint32(dm.running, uint32(0), uint32(1)) - if !starting { - panic("Routine has already started") - } - dm.logger.Info("demuxer: run") - dm.rdy <- struct{}{} - for { - if !dm.isRunning() { - break - } - select { - case event, ok := <-dm.input: - if !ok { - dm.logger.Info("demuxer: stopping") - dm.terminate(fmt.Errorf("stopped")) - dm.stopped <- struct{}{} - return - } - oEvent, err := dm.handle(event) - if err != nil { - dm.terminate(err) - return - } - dm.input <- oEvent - case event, ok := <-dm.scheduler.next(): - if !ok { - dm.logger.Info("demuxer: scheduler output closed") - continue - } - oEvent, err := dm.handle(event) - if err != nil { - dm.terminate(err) - return - } - dm.input <- oEvent - case event, ok := <-dm.processor.next(): - if !ok { - dm.logger.Info("demuxer: processor output closed") - continue - } - oEvent, err := dm.handle(event) - if err != nil { - dm.terminate(err) - return - } - dm.input <- oEvent - } - } -} - -func (dm *demuxer) handle(event Event) (Event, error) { - received := dm.scheduler.trySend(event) - if !received { - return scFull{}, nil // backpressure - } - - received = dm.processor.trySend(event) - if !received { - return pcFull{}, nil // backpressure - } - - return noOp, nil -} - -func (dm *demuxer) trySend(event Event) bool { - if !dm.isRunning() || dm.isStopping() { - dm.logger.Info("dummuxer isn't running") - return false - } - select { - case dm.input <- event: - return true - default: - dm.logger.Info("demuxer channel was full") - return false - } -} - -func (dm *demuxer) isRunning() bool { - return atomic.LoadUint32(dm.running) == 1 -} - -func (dm *demuxer) isStopping() bool { - return atomic.LoadUint32(dm.stopping) == 1 -} - -func (dm *demuxer) ready() chan struct{} { - return dm.rdy -} - -func (dm *demuxer) stop() { - if !dm.isRunning() { - return - } - stopping := atomic.CompareAndSwapUint32(dm.stopping, uint32(0), uint32(1)) - if !stopping { - panic("Demuxer has already stopped") - } - dm.logger.Info("demuxer stop") - close(dm.input) - <-dm.stopped -} - -func (dm *demuxer) terminate(reason error) { - stopped := atomic.CompareAndSwapUint32(dm.running, uint32(1), uint32(0)) - if !stopped { - panic("called terminate but already terminated") - } - dm.fin <- reason -} - -func (dm *demuxer) final() chan error { - return dm.fin -} diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 64deb5ce8..cba3f5857 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -16,8 +16,6 @@ func schedulerHandle(event Event) (Event, error) { switch event.(type) { case timeCheck: fmt.Println("scheduler handle timeCheck") - case Event: - fmt.Println("scheduler handle testEvent") } return noOp, nil } @@ -26,71 +24,94 @@ func processorHandle(event Event) (Event, error) { switch event.(type) { case timeCheck: fmt.Println("processor handle timeCheck") - case Event: - fmt.Println("processor handle event") } return noOp, nil + } type Reactor struct { - demuxer *demuxer + events chan Event + stopDemux chan struct{} scheduler *Routine processor *Routine ticker *time.Ticker + logger log.Logger +} + +var bufferSize int = 10 + +func NewReactor() *Reactor { + return &Reactor{ + events: make(chan Event, bufferSize), + stopDemux: make(chan struct{}), + scheduler: newRoutine("scheduler", schedulerHandle), + processor: newRoutine("processor", processorHandle), + ticker: time.NewTicker(1 * time.Second), + logger: log.NewNopLogger(), + } } // nolint:unused func (r *Reactor) setLogger(logger log.Logger) { + r.logger = logger r.scheduler.setLogger(logger) r.processor.setLogger(logger) - r.demuxer.setLogger(logger) } func (r *Reactor) Start() { - r.scheduler = newRoutine("scheduler", schedulerHandle) - r.processor = newRoutine("processor", processorHandle) - r.demuxer = newDemuxer(r.scheduler, r.processor) - r.ticker = time.NewTicker(1 * time.Second) - go r.scheduler.start() go r.processor.start() - go r.demuxer.start() + go r.demux() <-r.scheduler.ready() <-r.processor.ready() - <-r.demuxer.ready() go func() { for t := range r.ticker.C { - r.demuxer.trySend(timeCheck{time: t}) + r.events <- timeCheck{time: t} } }() } -func (r *Reactor) Wait() { - fmt.Println("completed routines") - r.Stop() +func (r *Reactor) demux() { + for { + select { + case event := <-r.events: + + // XXX: check for backpressure + r.scheduler.trySend(event) + r.processor.trySend(event) + case _ = <-r.stopDemux: + r.logger.Info("demuxing stopped") + return + case event := <-r.scheduler.next(): + r.events <- event + case event := <-r.processor.next(): + r.events <- event + case err := <-r.scheduler.final(): + r.logger.Info(fmt.Sprintf("scheduler final %s", err)) + case err := <-r.processor.final(): + r.logger.Info(fmt.Sprintf("processor final %s", err)) + // XXX: switch to consensus + } + } } func (r *Reactor) Stop() { - fmt.Println("reactor stopping") + r.logger.Info("reactor stopping") r.ticker.Stop() - r.demuxer.stop() r.scheduler.stop() r.processor.stop() - // todo: accumulator - // todo: io + close(r.stopDemux) + close(r.events) - fmt.Println("reactor stopped") + r.logger.Info("reactor stopped") } func (r *Reactor) Receive(event Event) { - fmt.Println("receive event") - sent := r.demuxer.trySend(event) - if !sent { - fmt.Println("demuxer is full") - } + // XXX: decode and serialize write events + r.events <- event } func (r *Reactor) AddPeer() { diff --git a/blockchain/v2/reactor_test.go b/blockchain/v2/reactor_test.go index 2b3e2be1e..e14e618de 100644 --- a/blockchain/v2/reactor_test.go +++ b/blockchain/v2/reactor_test.go @@ -1,11 +1,15 @@ package v2 -import "testing" +import ( + "testing" + + "github.com/tendermint/tendermint/libs/log" +) -// XXX: This makes assumptions about the message routing func TestReactor(t *testing.T) { - reactor := Reactor{} + reactor := NewReactor() reactor.Start() + reactor.setLogger(log.TestingLogger()) script := []Event{ // TODO } @@ -13,5 +17,5 @@ func TestReactor(t *testing.T) { for _, event := range script { reactor.Receive(event) } - reactor.Wait() + reactor.Stop() } diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index 9c5b97b18..04cd43c63 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -20,7 +20,7 @@ type handleFunc = func(event Event) (Event, error) // handles the concurrency and messaging guarantees. Events are sent via // `trySend` are handled by the `handle` function to produce an iterator // `next()`. Calling `close()` on a routine will conclude processing of all -// sent events and produce `last()` event representing the terminal state. +// sent events and produce `final()` event representing the terminal state. type Routine struct { name string handle handleFunc