Browse Source

Subsume the demuxer into the reactor

+ Simplify the design by demuxing events directly in the reactor
pull/3878/head
Sean Braithwaite 5 years ago
parent
commit
e7ee314c99
4 changed files with 57 additions and 196 deletions
  1. +0
    -164
      blockchain/v2/demuxer.go
  2. +48
    -27
      blockchain/v2/reactor.go
  3. +8
    -4
      blockchain/v2/reactor_test.go
  4. +1
    -1
      blockchain/v2/routine.go

+ 0
- 164
blockchain/v2/demuxer.go View File

@ -1,164 +0,0 @@
// nolint:unused
package v2
import (
"fmt"
"sync/atomic"
"github.com/tendermint/tendermint/libs/log"
)
type scFull struct {
priorityHigh
}
type pcFull struct {
priorityHigh
}
const demuxerBufferSize = 10
type demuxer struct {
input chan Event
scheduler *Routine
processor *Routine
fin chan error
stopped chan struct{}
rdy chan struct{}
running *uint32
stopping *uint32
logger log.Logger
}
func newDemuxer(scheduler *Routine, processor *Routine) *demuxer {
return &demuxer{
input: make(chan Event, demuxerBufferSize),
scheduler: scheduler,
processor: processor,
stopped: make(chan struct{}, 1),
fin: make(chan error, 1),
rdy: make(chan struct{}, 1),
running: new(uint32),
stopping: new(uint32),
logger: log.NewNopLogger(),
}
}
func (dm *demuxer) setLogger(logger log.Logger) {
dm.logger = logger
}
func (dm *demuxer) start() {
starting := atomic.CompareAndSwapUint32(dm.running, uint32(0), uint32(1))
if !starting {
panic("Routine has already started")
}
dm.logger.Info("demuxer: run")
dm.rdy <- struct{}{}
for {
if !dm.isRunning() {
break
}
select {
case event, ok := <-dm.input:
if !ok {
dm.logger.Info("demuxer: stopping")
dm.terminate(fmt.Errorf("stopped"))
dm.stopped <- struct{}{}
return
}
oEvent, err := dm.handle(event)
if err != nil {
dm.terminate(err)
return
}
dm.input <- oEvent
case event, ok := <-dm.scheduler.next():
if !ok {
dm.logger.Info("demuxer: scheduler output closed")
continue
}
oEvent, err := dm.handle(event)
if err != nil {
dm.terminate(err)
return
}
dm.input <- oEvent
case event, ok := <-dm.processor.next():
if !ok {
dm.logger.Info("demuxer: processor output closed")
continue
}
oEvent, err := dm.handle(event)
if err != nil {
dm.terminate(err)
return
}
dm.input <- oEvent
}
}
}
func (dm *demuxer) handle(event Event) (Event, error) {
received := dm.scheduler.trySend(event)
if !received {
return scFull{}, nil // backpressure
}
received = dm.processor.trySend(event)
if !received {
return pcFull{}, nil // backpressure
}
return noOp, nil
}
func (dm *demuxer) trySend(event Event) bool {
if !dm.isRunning() || dm.isStopping() {
dm.logger.Info("dummuxer isn't running")
return false
}
select {
case dm.input <- event:
return true
default:
dm.logger.Info("demuxer channel was full")
return false
}
}
func (dm *demuxer) isRunning() bool {
return atomic.LoadUint32(dm.running) == 1
}
func (dm *demuxer) isStopping() bool {
return atomic.LoadUint32(dm.stopping) == 1
}
func (dm *demuxer) ready() chan struct{} {
return dm.rdy
}
func (dm *demuxer) stop() {
if !dm.isRunning() {
return
}
stopping := atomic.CompareAndSwapUint32(dm.stopping, uint32(0), uint32(1))
if !stopping {
panic("Demuxer has already stopped")
}
dm.logger.Info("demuxer stop")
close(dm.input)
<-dm.stopped
}
func (dm *demuxer) terminate(reason error) {
stopped := atomic.CompareAndSwapUint32(dm.running, uint32(1), uint32(0))
if !stopped {
panic("called terminate but already terminated")
}
dm.fin <- reason
}
func (dm *demuxer) final() chan error {
return dm.fin
}

+ 48
- 27
blockchain/v2/reactor.go View File

@ -16,8 +16,6 @@ func schedulerHandle(event Event) (Event, error) {
switch event.(type) {
case timeCheck:
fmt.Println("scheduler handle timeCheck")
case Event:
fmt.Println("scheduler handle testEvent")
}
return noOp, nil
}
@ -26,71 +24,94 @@ func processorHandle(event Event) (Event, error) {
switch event.(type) {
case timeCheck:
fmt.Println("processor handle timeCheck")
case Event:
fmt.Println("processor handle event")
}
return noOp, nil
}
type Reactor struct {
demuxer *demuxer
events chan Event
stopDemux chan struct{}
scheduler *Routine
processor *Routine
ticker *time.Ticker
logger log.Logger
}
var bufferSize int = 10
func NewReactor() *Reactor {
return &Reactor{
events: make(chan Event, bufferSize),
stopDemux: make(chan struct{}),
scheduler: newRoutine("scheduler", schedulerHandle),
processor: newRoutine("processor", processorHandle),
ticker: time.NewTicker(1 * time.Second),
logger: log.NewNopLogger(),
}
}
// nolint:unused
func (r *Reactor) setLogger(logger log.Logger) {
r.logger = logger
r.scheduler.setLogger(logger)
r.processor.setLogger(logger)
r.demuxer.setLogger(logger)
}
func (r *Reactor) Start() {
r.scheduler = newRoutine("scheduler", schedulerHandle)
r.processor = newRoutine("processor", processorHandle)
r.demuxer = newDemuxer(r.scheduler, r.processor)
r.ticker = time.NewTicker(1 * time.Second)
go r.scheduler.start()
go r.processor.start()
go r.demuxer.start()
go r.demux()
<-r.scheduler.ready()
<-r.processor.ready()
<-r.demuxer.ready()
go func() {
for t := range r.ticker.C {
r.demuxer.trySend(timeCheck{time: t})
r.events <- timeCheck{time: t}
}
}()
}
func (r *Reactor) Wait() {
fmt.Println("completed routines")
r.Stop()
func (r *Reactor) demux() {
for {
select {
case event := <-r.events:
// XXX: check for backpressure
r.scheduler.trySend(event)
r.processor.trySend(event)
case _ = <-r.stopDemux:
r.logger.Info("demuxing stopped")
return
case event := <-r.scheduler.next():
r.events <- event
case event := <-r.processor.next():
r.events <- event
case err := <-r.scheduler.final():
r.logger.Info(fmt.Sprintf("scheduler final %s", err))
case err := <-r.processor.final():
r.logger.Info(fmt.Sprintf("processor final %s", err))
// XXX: switch to consensus
}
}
}
func (r *Reactor) Stop() {
fmt.Println("reactor stopping")
r.logger.Info("reactor stopping")
r.ticker.Stop()
r.demuxer.stop()
r.scheduler.stop()
r.processor.stop()
// todo: accumulator
// todo: io
close(r.stopDemux)
close(r.events)
fmt.Println("reactor stopped")
r.logger.Info("reactor stopped")
}
func (r *Reactor) Receive(event Event) {
fmt.Println("receive event")
sent := r.demuxer.trySend(event)
if !sent {
fmt.Println("demuxer is full")
}
// XXX: decode and serialize write events
r.events <- event
}
func (r *Reactor) AddPeer() {


+ 8
- 4
blockchain/v2/reactor_test.go View File

@ -1,11 +1,15 @@
package v2
import "testing"
import (
"testing"
"github.com/tendermint/tendermint/libs/log"
)
// XXX: This makes assumptions about the message routing
func TestReactor(t *testing.T) {
reactor := Reactor{}
reactor := NewReactor()
reactor.Start()
reactor.setLogger(log.TestingLogger())
script := []Event{
// TODO
}
@ -13,5 +17,5 @@ func TestReactor(t *testing.T) {
for _, event := range script {
reactor.Receive(event)
}
reactor.Wait()
reactor.Stop()
}

+ 1
- 1
blockchain/v2/routine.go View File

@ -20,7 +20,7 @@ type handleFunc = func(event Event) (Event, error)
// handles the concurrency and messaging guarantees. Events are sent via
// `trySend` are handled by the `handle` function to produce an iterator
// `next()`. Calling `close()` on a routine will conclude processing of all
// sent events and produce `last()` event representing the terminal state.
// sent events and produce `final()` event representing the terminal state.
type Routine struct {
name string
handle handleFunc


Loading…
Cancel
Save