diff --git a/blockchain/v2/demuxer.go b/blockchain/v2/demuxer.go new file mode 100644 index 000000000..511bf7f8b --- /dev/null +++ b/blockchain/v2/demuxer.go @@ -0,0 +1,115 @@ +package v2 + +import "fmt" + +type demuxer struct { + eventbus chan Event + scheduler *Routine + processor *Routine + finished chan error + stopped chan struct{} +} + +func newDemuxer(scheduler *Routine, processor *Routine) *demuxer { + return &demuxer{ + eventbus: make(chan Event, 10), + scheduler: scheduler, + processor: processor, + stopped: make(chan struct{}, 1), + finished: make(chan error, 1), + } +} + +// What should the termination clause be? +// Is any of the subroutines finishe, the demuxer finishes +func (dm *demuxer) run() { + fmt.Printf("demuxer: run\n") + for { + select { + case event, ok := <-dm.eventbus: + if !ok { + fmt.Printf("demuxer: stopping\n") + dm.stopped <- struct{}{} + return + } + oEvents, err := dm.handle(event) + if err != nil { + // TODO Termination time + return + } + for _, event := range oEvents { + dm.eventbus <- event + } + case event, ok := <-dm.scheduler.output: + if !ok { + fmt.Printf("demuxer: scheduler output closed\n") + continue + } + oEvents, err := dm.handle(event) + if err != nil { + // TODO tTermination time + return + } + for _, event := range oEvents { + dm.eventbus <- event + } + case event, ok := <-dm.processor.output: + if !ok { + fmt.Printf("demuxer: processor output closed\n") + continue + } + oEvents, err := dm.handle(event) + if err != nil { + // TODO tTermination time + return + } + for _, event := range oEvents { + dm.eventbus <- event + } + case err := <-dm.scheduler.finished: + dm.finished <- err + case err := <-dm.processor.finished: + dm.finished <- err + } + } +} + +func (dm *demuxer) handle(event Event) (Events, error) { + received := dm.scheduler.send(event) + if !received { + return Events{scFull{}}, nil // backpressure + } + + received = dm.processor.send(event) + if !received { + return Events{pcFull{}}, nil // backpressure + } + + return Events{}, nil +} + +func (dm *demuxer) send(event Event) bool { + fmt.Printf("demuxer send\n") + select { + case dm.eventbus <- event: + return true + default: + fmt.Printf("demuxer channel was full\n") + return false + } +} + +func (dm *demuxer) stop() { + fmt.Printf("demuxer stop\n") + close(dm.eventbus) + <-dm.stopped + dm.terminate(fmt.Errorf("stopped")) +} + +func (dm *demuxer) terminate(reason error) { + dm.finished <- reason +} + +func (dm *demuxer) wait() error { + return <-dm.finished +} diff --git a/blockchain/v2/metrics.go b/blockchain/v2/metrics.go new file mode 100644 index 000000000..d865e7360 --- /dev/null +++ b/blockchain/v2/metrics.go @@ -0,0 +1,124 @@ +package v2 + +import ( + "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/metrics/discard" + "github.com/go-kit/kit/metrics/prometheus" + stdprometheus "github.com/prometheus/client_golang/prometheus" +) + +const ( + // MetricsSubsystem is a subsystem shared by all metrics exposed by this + // package. + MetricsSubsystem = "blockchain" +) + +// Metrics contains metrics exposed by this package. +type Metrics struct { + // events_in + EventsIn metrics.Counter + // events_in + EventsHandled metrics.Counter + // events_out + EventsOut metrics.Counter + // errors_in + ErrorsIn metrics.Counter + // errors_handled + ErrorsHandled metrics.Counter + // errors_out + ErrorsOut metrics.Counter + // events_shed + EventsShed metrics.Counter + // events_sent + EventsSent metrics.Counter + // errors_sent + ErrorsSent metrics.Counter + // errors_shed + ErrorsShed metrics.Counter +} + +// Can we burn in the routine name here? +func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { + labels := []string{} + for i := 0; i < len(labelsAndValues); i += 2 { + labels = append(labels, labelsAndValues[i]) + } + return &Metrics{ + EventsIn: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "events_in", + Help: "Events read from the channel.", + }, labels).With(labelsAndValues...), + EventsHandled: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "events_handled", + Help: "Events handled", + }, labels).With(labelsAndValues...), + EventsOut: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "events_out", + Help: "Events output from routine.", + }, labels).With(labelsAndValues...), + ErrorsIn: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "errors_in", + Help: "Errors read from the channel.", + }, labels).With(labelsAndValues...), + ErrorsHandled: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "errors_handled", + Help: "Errors handled.", + }, labels).With(labelsAndValues...), + ErrorsOut: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "errors_out", + Help: "Errors output from routine.", + }, labels).With(labelsAndValues...), + ErrorsSent: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "errors_sent", + Help: "Errors sent to routine.", + }, labels).With(labelsAndValues...), + ErrorsShed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "errors_shed", + Help: "Errors dropped from sending.", + }, labels).With(labelsAndValues...), + EventsSent: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "events_sent", + Help: "Events sent to routine.", + }, labels).With(labelsAndValues...), + EventsShed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "events_shed", + Help: "Events dropped from sending.", + }, labels).With(labelsAndValues...), + } +} + +// NopMetrics returns no-op Metrics. +func NopMetrics() *Metrics { + return &Metrics{ + EventsIn: discard.NewCounter(), + EventsHandled: discard.NewCounter(), + EventsOut: discard.NewCounter(), + ErrorsIn: discard.NewCounter(), + ErrorsHandled: discard.NewCounter(), + ErrorsOut: discard.NewCounter(), + EventsShed: discard.NewCounter(), + EventsSent: discard.NewCounter(), + ErrorsSent: discard.NewCounter(), + ErrorsShed: discard.NewCounter(), + } +} diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go new file mode 100644 index 000000000..ffdf92911 --- /dev/null +++ b/blockchain/v2/reactor.go @@ -0,0 +1,98 @@ +package v2 + +import ( + "fmt" + "time" +) + +func schedulerHandle(event Event) (Events, error) { + switch event.(type) { + case timeCheck: + fmt.Println("scheduler handle timeCheck") + case testEvent: + fmt.Println("scheduler handle testEvent") + return Events{scTestEvent{}}, nil + } + return Events{}, nil +} + +func processorHandle(event Event) (Events, error) { + switch event.(type) { + case timeCheck: + fmt.Println("processor handle timeCheck") + case testEvent: + fmt.Println("processor handle testEvent") + case scTestEvent: + fmt.Println("processor handle scTestEvent") + return Events{}, fmt.Errorf("processor done") + } + return Events{}, nil +} + +// reactor +type Reactor struct { + events chan Event + demuxer *demuxer + scheduler *Routine + processor *Routine + ticker *time.Ticker + tickerStopped chan struct{} +} + +func (r *Reactor) Start() { + bufferSize := 10 + events := make(chan Event, bufferSize) + + r.scheduler = newRoutine("scheduler", events, schedulerHandle) + r.processor = newRoutine("processor", events, processorHandle) + r.demuxer = newDemuxer(r.scheduler, r.processor) + r.tickerStopped = make(chan struct{}) + + go r.scheduler.run() + go r.processor.run() + go r.demuxer.run() + + go func() { + ticker := time.NewTicker(1 * time.Second) + for { + select { + case <-ticker.C: + // xxx: what if !sent? + r.demuxer.send(timeCheck{}) + case <-r.tickerStopped: + fmt.Println("ticker stopped") + return + } + } + }() +} + +func (r *Reactor) Wait() { + fmt.Println("completed routines") + r.Stop() +} + +func (r *Reactor) Stop() { + fmt.Println("reactor stopping") + + r.tickerStopped <- struct{}{} + r.demuxer.stop() + r.scheduler.stop() + r.processor.stop() + // todo: accumulator + // todo: io + + fmt.Println("reactor stopped") +} + +func (r *Reactor) Receive(event Event) { + fmt.Println("receive event") + sent := r.demuxer.send(event) + if !sent { + panic("demuxer is full") + } +} + +func (r *Reactor) AddPeer() { + // TODO: add peer event and send to demuxer +} diff --git a/blockchain/v2/reactor_test.go b/blockchain/v2/reactor_test.go new file mode 100644 index 000000000..b3430074f --- /dev/null +++ b/blockchain/v2/reactor_test.go @@ -0,0 +1,17 @@ +package v2 + +import "testing" + +// XXX: This makes assumptions about the message routing +func TestReactor(t *testing.T) { + reactor := Reactor{} + reactor.Start() + script := Events{ + testEvent{}, + } + + for _, event := range script { + reactor.Receive(event) + } + reactor.Wait() +} diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go new file mode 100644 index 000000000..31b918925 --- /dev/null +++ b/blockchain/v2/routine.go @@ -0,0 +1,165 @@ +package v2 + +import ( + "fmt" + "sync/atomic" + + "github.com/tendermint/tendermint/libs/log" +) + +// TODO +// * revisit panic conditions +// * audit log levels +// * maybe routine should be an interface and the concret tpe should be handlerRoutine + +// Adding Metrics +// we need a metrics definition +type handleFunc = func(event Event) (Events, error) + +type Routine struct { + name string + input chan Event + errors chan error + output chan Event + stopped chan struct{} + finished chan error + running *uint32 + handle handleFunc + logger log.Logger + metrics *Metrics +} + +func newRoutine(name string, output chan Event, handleFunc handleFunc) *Routine { + return &Routine{ + name: name, + input: make(chan Event, 1), + handle: handleFunc, + errors: make(chan error, 1), + output: output, + stopped: make(chan struct{}, 1), + finished: make(chan error, 1), + running: new(uint32), + logger: log.NewNopLogger(), + metrics: NopMetrics(), + } +} + +func (rt *Routine) setLogger(logger log.Logger) { + rt.logger = logger +} + +func (rt *Routine) setMetrics(metrics *Metrics) { + rt.metrics = metrics +} + +func (rt *Routine) run() { + rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name)) + starting := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1)) + if !starting { + panic("Routine has already started") + } + errorsDrained := false + for { + if !rt.isRunning() { + break + } + select { + case iEvent, ok := <-rt.input: + rt.metrics.EventsIn.With("routine", rt.name).Add(1) + if !ok { + if !errorsDrained { + continue // wait for errors to be drainned + } + rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name)) + rt.stopped <- struct{}{} + return + } + oEvents, err := rt.handle(iEvent) + rt.metrics.EventsHandled.With("routine", rt.name).Add(1) + if err != nil { + rt.terminate(err) + return + } + rt.metrics.EventsOut.With("routine", rt.name).Add(float64(len(oEvents))) + rt.logger.Info(fmt.Sprintf("%s handled %d events\n", rt.name, len(oEvents))) + for _, event := range oEvents { + rt.logger.Info(fmt.Sprintln("writting back to output")) + rt.output <- event + } + case iEvent, ok := <-rt.errors: + rt.metrics.ErrorsIn.With("routine", rt.name).Add(1) + if !ok { + rt.logger.Info(fmt.Sprintf("%s: errors closed\n", rt.name)) + errorsDrained = true + continue + } + oEvents, err := rt.handle(iEvent) + rt.metrics.ErrorsHandled.With("routine", rt.name).Add(1) + if err != nil { + rt.terminate(err) + return + } + rt.metrics.ErrorsOut.With("routine", rt.name).Add(float64(len(oEvents))) + for _, event := range oEvents { + rt.output <- event + } + } + } +} +func (rt *Routine) feedback() { + for event := range rt.output { + rt.send(event) + } +} + +func (rt *Routine) send(event Event) bool { + if err, ok := event.(error); ok { + select { + case rt.errors <- err: + rt.metrics.ErrorsSent.With("routine", rt.name).Add(1) + return true + default: + rt.metrics.ErrorsShed.With("routine", rt.name).Add(1) + rt.logger.Info(fmt.Sprintf("%s: errors channel was full\n", rt.name)) + return false + } + } else { + select { + case rt.input <- event: + rt.metrics.EventsSent.With("routine", rt.name).Add(1) + return true + default: + rt.metrics.EventsShed.With("routine", rt.name).Add(1) + rt.logger.Info(fmt.Sprintf("%s: channel was full\n", rt.name)) + return false + } + } +} + +func (rt *Routine) isRunning() bool { + return atomic.LoadUint32(rt.running) == 1 +} + +// rename flush? +func (rt *Routine) stop() { + // XXX: what if already stopped? + rt.logger.Info(fmt.Sprintf("%s: stop\n", rt.name)) + close(rt.input) + close(rt.errors) + <-rt.stopped + rt.terminate(fmt.Errorf("routine stopped")) +} + +func (rt *Routine) terminate(reason error) { + stopped := atomic.CompareAndSwapUint32(rt.running, uint32(1), uint32(0)) + if !stopped { + panic("called stop but already stopped") + } + rt.finished <- reason +} + +// XXX: this should probably produced the finished +// channel and let the caller deicde how long to wait +func (rt *Routine) wait() error { + return <-rt.finished +} diff --git a/blockchain/v2/routine_test.go b/blockchain/v2/routine_test.go new file mode 100644 index 000000000..98fe716f0 --- /dev/null +++ b/blockchain/v2/routine_test.go @@ -0,0 +1,89 @@ +package v2 + +import ( + "fmt" + "testing" + "time" +) + +type eventA struct{} +type eventB struct{} + +var done = fmt.Errorf("done") + +func simpleHandler(event Event) (Events, error) { + switch event.(type) { + case eventA: + return Events{eventB{}}, nil + case eventB: + return Events{routineFinished{}}, done + } + return Events{}, nil +} + +func TestRoutine(t *testing.T) { + events := make(chan Event, 10) + routine := newRoutine("simpleRoutine", events, simpleHandler) + + go routine.run() + go routine.feedback() + + routine.send(eventA{}) + + routine.wait() +} + +func genStatefulHandler(maxCount int) handleFunc { + counter := 0 + return func(event Event) (Events, error) { + switch event.(type) { + case eventA: + counter += 1 + if counter >= maxCount { + return Events{}, done + } + + return Events{eventA{}}, nil + } + return Events{}, nil + } +} + +func TestStatefulRoutine(t *testing.T) { + events := make(chan Event, 10) + handler := genStatefulHandler(10) + routine := newRoutine("statefulRoutine", events, handler) + + go routine.run() + go routine.feedback() + + go routine.send(eventA{}) + + routine.wait() +} + +func handleWithErrors(event Event) (Events, error) { + switch event.(type) { + case eventA: + return Events{}, nil + case errEvent: + return Events{}, done + } + return Events{}, nil +} + +func TestErrorSaturation(t *testing.T) { + events := make(chan Event, 10) + routine := newRoutine("errorRoutine", events, handleWithErrors) + + go routine.run() + go func() { + for { + routine.send(eventA{}) + time.Sleep(10 * time.Millisecond) + } + }() + routine.send(errEvent{}) + + routine.wait() +} diff --git a/blockchain/v2/types.go b/blockchain/v2/types.go new file mode 100644 index 000000000..c9e31b209 --- /dev/null +++ b/blockchain/v2/types.go @@ -0,0 +1,32 @@ +package v2 + +import "time" + +type testEvent struct { + msg string + time time.Time +} + +type testEventTwo struct { + msg string +} + +type stopEvent struct{} +type timeCheck struct { + time time.Time +} + +type errEvent struct{} + +type scTestEvent struct{} + +type pcFinished struct{} + +type routineFinished struct{} + +func (rf *routineFinished) Error() string { + return "routine finished" +} + +type scFull struct{} +type pcFull struct{}