From d1671d6175dc00960381eeb8a5a4281e11203e70 Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Sat, 3 Aug 2019 09:19:32 +0200 Subject: [PATCH 01/22] blockchain v2: routines + Include an implementaiton of the routines specified in ADR-43 along with a demuxer and some dummy reactor code --- blockchain/v2/demuxer.go | 115 ++++++++++++++++++++++++ blockchain/v2/metrics.go | 124 +++++++++++++++++++++++++ blockchain/v2/reactor.go | 98 ++++++++++++++++++++ blockchain/v2/reactor_test.go | 17 ++++ blockchain/v2/routine.go | 165 ++++++++++++++++++++++++++++++++++ blockchain/v2/routine_test.go | 89 ++++++++++++++++++ blockchain/v2/types.go | 32 +++++++ 7 files changed, 640 insertions(+) create mode 100644 blockchain/v2/demuxer.go create mode 100644 blockchain/v2/metrics.go create mode 100644 blockchain/v2/reactor.go create mode 100644 blockchain/v2/reactor_test.go create mode 100644 blockchain/v2/routine.go create mode 100644 blockchain/v2/routine_test.go create mode 100644 blockchain/v2/types.go diff --git a/blockchain/v2/demuxer.go b/blockchain/v2/demuxer.go new file mode 100644 index 000000000..511bf7f8b --- /dev/null +++ b/blockchain/v2/demuxer.go @@ -0,0 +1,115 @@ +package v2 + +import "fmt" + +type demuxer struct { + eventbus chan Event + scheduler *Routine + processor *Routine + finished chan error + stopped chan struct{} +} + +func newDemuxer(scheduler *Routine, processor *Routine) *demuxer { + return &demuxer{ + eventbus: make(chan Event, 10), + scheduler: scheduler, + processor: processor, + stopped: make(chan struct{}, 1), + finished: make(chan error, 1), + } +} + +// What should the termination clause be? +// Is any of the subroutines finishe, the demuxer finishes +func (dm *demuxer) run() { + fmt.Printf("demuxer: run\n") + for { + select { + case event, ok := <-dm.eventbus: + if !ok { + fmt.Printf("demuxer: stopping\n") + dm.stopped <- struct{}{} + return + } + oEvents, err := dm.handle(event) + if err != nil { + // TODO Termination time + return + } + for _, event := range oEvents { + dm.eventbus <- event + } + case event, ok := <-dm.scheduler.output: + if !ok { + fmt.Printf("demuxer: scheduler output closed\n") + continue + } + oEvents, err := dm.handle(event) + if err != nil { + // TODO tTermination time + return + } + for _, event := range oEvents { + dm.eventbus <- event + } + case event, ok := <-dm.processor.output: + if !ok { + fmt.Printf("demuxer: processor output closed\n") + continue + } + oEvents, err := dm.handle(event) + if err != nil { + // TODO tTermination time + return + } + for _, event := range oEvents { + dm.eventbus <- event + } + case err := <-dm.scheduler.finished: + dm.finished <- err + case err := <-dm.processor.finished: + dm.finished <- err + } + } +} + +func (dm *demuxer) handle(event Event) (Events, error) { + received := dm.scheduler.send(event) + if !received { + return Events{scFull{}}, nil // backpressure + } + + received = dm.processor.send(event) + if !received { + return Events{pcFull{}}, nil // backpressure + } + + return Events{}, nil +} + +func (dm *demuxer) send(event Event) bool { + fmt.Printf("demuxer send\n") + select { + case dm.eventbus <- event: + return true + default: + fmt.Printf("demuxer channel was full\n") + return false + } +} + +func (dm *demuxer) stop() { + fmt.Printf("demuxer stop\n") + close(dm.eventbus) + <-dm.stopped + dm.terminate(fmt.Errorf("stopped")) +} + +func (dm *demuxer) terminate(reason error) { + dm.finished <- reason +} + +func (dm *demuxer) wait() error { + return <-dm.finished +} diff --git a/blockchain/v2/metrics.go b/blockchain/v2/metrics.go new file mode 100644 index 000000000..d865e7360 --- /dev/null +++ b/blockchain/v2/metrics.go @@ -0,0 +1,124 @@ +package v2 + +import ( + "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/metrics/discard" + "github.com/go-kit/kit/metrics/prometheus" + stdprometheus "github.com/prometheus/client_golang/prometheus" +) + +const ( + // MetricsSubsystem is a subsystem shared by all metrics exposed by this + // package. + MetricsSubsystem = "blockchain" +) + +// Metrics contains metrics exposed by this package. +type Metrics struct { + // events_in + EventsIn metrics.Counter + // events_in + EventsHandled metrics.Counter + // events_out + EventsOut metrics.Counter + // errors_in + ErrorsIn metrics.Counter + // errors_handled + ErrorsHandled metrics.Counter + // errors_out + ErrorsOut metrics.Counter + // events_shed + EventsShed metrics.Counter + // events_sent + EventsSent metrics.Counter + // errors_sent + ErrorsSent metrics.Counter + // errors_shed + ErrorsShed metrics.Counter +} + +// Can we burn in the routine name here? +func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { + labels := []string{} + for i := 0; i < len(labelsAndValues); i += 2 { + labels = append(labels, labelsAndValues[i]) + } + return &Metrics{ + EventsIn: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "events_in", + Help: "Events read from the channel.", + }, labels).With(labelsAndValues...), + EventsHandled: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "events_handled", + Help: "Events handled", + }, labels).With(labelsAndValues...), + EventsOut: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "events_out", + Help: "Events output from routine.", + }, labels).With(labelsAndValues...), + ErrorsIn: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "errors_in", + Help: "Errors read from the channel.", + }, labels).With(labelsAndValues...), + ErrorsHandled: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "errors_handled", + Help: "Errors handled.", + }, labels).With(labelsAndValues...), + ErrorsOut: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "errors_out", + Help: "Errors output from routine.", + }, labels).With(labelsAndValues...), + ErrorsSent: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "errors_sent", + Help: "Errors sent to routine.", + }, labels).With(labelsAndValues...), + ErrorsShed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "errors_shed", + Help: "Errors dropped from sending.", + }, labels).With(labelsAndValues...), + EventsSent: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "events_sent", + Help: "Events sent to routine.", + }, labels).With(labelsAndValues...), + EventsShed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "events_shed", + Help: "Events dropped from sending.", + }, labels).With(labelsAndValues...), + } +} + +// NopMetrics returns no-op Metrics. +func NopMetrics() *Metrics { + return &Metrics{ + EventsIn: discard.NewCounter(), + EventsHandled: discard.NewCounter(), + EventsOut: discard.NewCounter(), + ErrorsIn: discard.NewCounter(), + ErrorsHandled: discard.NewCounter(), + ErrorsOut: discard.NewCounter(), + EventsShed: discard.NewCounter(), + EventsSent: discard.NewCounter(), + ErrorsSent: discard.NewCounter(), + ErrorsShed: discard.NewCounter(), + } +} diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go new file mode 100644 index 000000000..ffdf92911 --- /dev/null +++ b/blockchain/v2/reactor.go @@ -0,0 +1,98 @@ +package v2 + +import ( + "fmt" + "time" +) + +func schedulerHandle(event Event) (Events, error) { + switch event.(type) { + case timeCheck: + fmt.Println("scheduler handle timeCheck") + case testEvent: + fmt.Println("scheduler handle testEvent") + return Events{scTestEvent{}}, nil + } + return Events{}, nil +} + +func processorHandle(event Event) (Events, error) { + switch event.(type) { + case timeCheck: + fmt.Println("processor handle timeCheck") + case testEvent: + fmt.Println("processor handle testEvent") + case scTestEvent: + fmt.Println("processor handle scTestEvent") + return Events{}, fmt.Errorf("processor done") + } + return Events{}, nil +} + +// reactor +type Reactor struct { + events chan Event + demuxer *demuxer + scheduler *Routine + processor *Routine + ticker *time.Ticker + tickerStopped chan struct{} +} + +func (r *Reactor) Start() { + bufferSize := 10 + events := make(chan Event, bufferSize) + + r.scheduler = newRoutine("scheduler", events, schedulerHandle) + r.processor = newRoutine("processor", events, processorHandle) + r.demuxer = newDemuxer(r.scheduler, r.processor) + r.tickerStopped = make(chan struct{}) + + go r.scheduler.run() + go r.processor.run() + go r.demuxer.run() + + go func() { + ticker := time.NewTicker(1 * time.Second) + for { + select { + case <-ticker.C: + // xxx: what if !sent? + r.demuxer.send(timeCheck{}) + case <-r.tickerStopped: + fmt.Println("ticker stopped") + return + } + } + }() +} + +func (r *Reactor) Wait() { + fmt.Println("completed routines") + r.Stop() +} + +func (r *Reactor) Stop() { + fmt.Println("reactor stopping") + + r.tickerStopped <- struct{}{} + r.demuxer.stop() + r.scheduler.stop() + r.processor.stop() + // todo: accumulator + // todo: io + + fmt.Println("reactor stopped") +} + +func (r *Reactor) Receive(event Event) { + fmt.Println("receive event") + sent := r.demuxer.send(event) + if !sent { + panic("demuxer is full") + } +} + +func (r *Reactor) AddPeer() { + // TODO: add peer event and send to demuxer +} diff --git a/blockchain/v2/reactor_test.go b/blockchain/v2/reactor_test.go new file mode 100644 index 000000000..b3430074f --- /dev/null +++ b/blockchain/v2/reactor_test.go @@ -0,0 +1,17 @@ +package v2 + +import "testing" + +// XXX: This makes assumptions about the message routing +func TestReactor(t *testing.T) { + reactor := Reactor{} + reactor.Start() + script := Events{ + testEvent{}, + } + + for _, event := range script { + reactor.Receive(event) + } + reactor.Wait() +} diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go new file mode 100644 index 000000000..31b918925 --- /dev/null +++ b/blockchain/v2/routine.go @@ -0,0 +1,165 @@ +package v2 + +import ( + "fmt" + "sync/atomic" + + "github.com/tendermint/tendermint/libs/log" +) + +// TODO +// * revisit panic conditions +// * audit log levels +// * maybe routine should be an interface and the concret tpe should be handlerRoutine + +// Adding Metrics +// we need a metrics definition +type handleFunc = func(event Event) (Events, error) + +type Routine struct { + name string + input chan Event + errors chan error + output chan Event + stopped chan struct{} + finished chan error + running *uint32 + handle handleFunc + logger log.Logger + metrics *Metrics +} + +func newRoutine(name string, output chan Event, handleFunc handleFunc) *Routine { + return &Routine{ + name: name, + input: make(chan Event, 1), + handle: handleFunc, + errors: make(chan error, 1), + output: output, + stopped: make(chan struct{}, 1), + finished: make(chan error, 1), + running: new(uint32), + logger: log.NewNopLogger(), + metrics: NopMetrics(), + } +} + +func (rt *Routine) setLogger(logger log.Logger) { + rt.logger = logger +} + +func (rt *Routine) setMetrics(metrics *Metrics) { + rt.metrics = metrics +} + +func (rt *Routine) run() { + rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name)) + starting := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1)) + if !starting { + panic("Routine has already started") + } + errorsDrained := false + for { + if !rt.isRunning() { + break + } + select { + case iEvent, ok := <-rt.input: + rt.metrics.EventsIn.With("routine", rt.name).Add(1) + if !ok { + if !errorsDrained { + continue // wait for errors to be drainned + } + rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name)) + rt.stopped <- struct{}{} + return + } + oEvents, err := rt.handle(iEvent) + rt.metrics.EventsHandled.With("routine", rt.name).Add(1) + if err != nil { + rt.terminate(err) + return + } + rt.metrics.EventsOut.With("routine", rt.name).Add(float64(len(oEvents))) + rt.logger.Info(fmt.Sprintf("%s handled %d events\n", rt.name, len(oEvents))) + for _, event := range oEvents { + rt.logger.Info(fmt.Sprintln("writting back to output")) + rt.output <- event + } + case iEvent, ok := <-rt.errors: + rt.metrics.ErrorsIn.With("routine", rt.name).Add(1) + if !ok { + rt.logger.Info(fmt.Sprintf("%s: errors closed\n", rt.name)) + errorsDrained = true + continue + } + oEvents, err := rt.handle(iEvent) + rt.metrics.ErrorsHandled.With("routine", rt.name).Add(1) + if err != nil { + rt.terminate(err) + return + } + rt.metrics.ErrorsOut.With("routine", rt.name).Add(float64(len(oEvents))) + for _, event := range oEvents { + rt.output <- event + } + } + } +} +func (rt *Routine) feedback() { + for event := range rt.output { + rt.send(event) + } +} + +func (rt *Routine) send(event Event) bool { + if err, ok := event.(error); ok { + select { + case rt.errors <- err: + rt.metrics.ErrorsSent.With("routine", rt.name).Add(1) + return true + default: + rt.metrics.ErrorsShed.With("routine", rt.name).Add(1) + rt.logger.Info(fmt.Sprintf("%s: errors channel was full\n", rt.name)) + return false + } + } else { + select { + case rt.input <- event: + rt.metrics.EventsSent.With("routine", rt.name).Add(1) + return true + default: + rt.metrics.EventsShed.With("routine", rt.name).Add(1) + rt.logger.Info(fmt.Sprintf("%s: channel was full\n", rt.name)) + return false + } + } +} + +func (rt *Routine) isRunning() bool { + return atomic.LoadUint32(rt.running) == 1 +} + +// rename flush? +func (rt *Routine) stop() { + // XXX: what if already stopped? + rt.logger.Info(fmt.Sprintf("%s: stop\n", rt.name)) + close(rt.input) + close(rt.errors) + <-rt.stopped + rt.terminate(fmt.Errorf("routine stopped")) +} + +func (rt *Routine) terminate(reason error) { + stopped := atomic.CompareAndSwapUint32(rt.running, uint32(1), uint32(0)) + if !stopped { + panic("called stop but already stopped") + } + rt.finished <- reason +} + +// XXX: this should probably produced the finished +// channel and let the caller deicde how long to wait +func (rt *Routine) wait() error { + return <-rt.finished +} diff --git a/blockchain/v2/routine_test.go b/blockchain/v2/routine_test.go new file mode 100644 index 000000000..98fe716f0 --- /dev/null +++ b/blockchain/v2/routine_test.go @@ -0,0 +1,89 @@ +package v2 + +import ( + "fmt" + "testing" + "time" +) + +type eventA struct{} +type eventB struct{} + +var done = fmt.Errorf("done") + +func simpleHandler(event Event) (Events, error) { + switch event.(type) { + case eventA: + return Events{eventB{}}, nil + case eventB: + return Events{routineFinished{}}, done + } + return Events{}, nil +} + +func TestRoutine(t *testing.T) { + events := make(chan Event, 10) + routine := newRoutine("simpleRoutine", events, simpleHandler) + + go routine.run() + go routine.feedback() + + routine.send(eventA{}) + + routine.wait() +} + +func genStatefulHandler(maxCount int) handleFunc { + counter := 0 + return func(event Event) (Events, error) { + switch event.(type) { + case eventA: + counter += 1 + if counter >= maxCount { + return Events{}, done + } + + return Events{eventA{}}, nil + } + return Events{}, nil + } +} + +func TestStatefulRoutine(t *testing.T) { + events := make(chan Event, 10) + handler := genStatefulHandler(10) + routine := newRoutine("statefulRoutine", events, handler) + + go routine.run() + go routine.feedback() + + go routine.send(eventA{}) + + routine.wait() +} + +func handleWithErrors(event Event) (Events, error) { + switch event.(type) { + case eventA: + return Events{}, nil + case errEvent: + return Events{}, done + } + return Events{}, nil +} + +func TestErrorSaturation(t *testing.T) { + events := make(chan Event, 10) + routine := newRoutine("errorRoutine", events, handleWithErrors) + + go routine.run() + go func() { + for { + routine.send(eventA{}) + time.Sleep(10 * time.Millisecond) + } + }() + routine.send(errEvent{}) + + routine.wait() +} diff --git a/blockchain/v2/types.go b/blockchain/v2/types.go new file mode 100644 index 000000000..c9e31b209 --- /dev/null +++ b/blockchain/v2/types.go @@ -0,0 +1,32 @@ +package v2 + +import "time" + +type testEvent struct { + msg string + time time.Time +} + +type testEventTwo struct { + msg string +} + +type stopEvent struct{} +type timeCheck struct { + time time.Time +} + +type errEvent struct{} + +type scTestEvent struct{} + +type pcFinished struct{} + +type routineFinished struct{} + +func (rf *routineFinished) Error() string { + return "routine finished" +} + +type scFull struct{} +type pcFull struct{} From 0cf9f8629200584a3747a8bcdff6ee9db7de6094 Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Tue, 6 Aug 2019 13:27:15 +0200 Subject: [PATCH 02/22] Modification based on feedback + `routine.send` returns false when routine is not running + this will prevent panics sending to channels which have been closed + Make output channels routine specific removing the risk of someone writting to a channel which was closed by another touine. + consistency changes between the routines and the demuxer --- blockchain/v2/demuxer.go | 70 ++++++++++++++++++++++++----------- blockchain/v2/reactor.go | 26 +++++++++---- blockchain/v2/routine.go | 33 +++++++++++------ blockchain/v2/routine_test.go | 65 +++++++++++++++++++++++++++----- blockchain/v2/types.go | 3 ++ 5 files changed, 147 insertions(+), 50 deletions(-) diff --git a/blockchain/v2/demuxer.go b/blockchain/v2/demuxer.go index 511bf7f8b..32165735e 100644 --- a/blockchain/v2/demuxer.go +++ b/blockchain/v2/demuxer.go @@ -1,75 +1,88 @@ package v2 -import "fmt" +import ( + "fmt" + "sync/atomic" +) type demuxer struct { - eventbus chan Event + input chan Event scheduler *Routine processor *Routine finished chan error stopped chan struct{} + running *uint32 } +// TODO +// demuxer_test +// Termination process +// Logger +// Metrics +// Adhere to interface func newDemuxer(scheduler *Routine, processor *Routine) *demuxer { return &demuxer{ - eventbus: make(chan Event, 10), + input: make(chan Event, 10), scheduler: scheduler, processor: processor, stopped: make(chan struct{}, 1), finished: make(chan error, 1), + running: new(uint32), } } -// What should the termination clause be? -// Is any of the subroutines finishe, the demuxer finishes func (dm *demuxer) run() { + starting := atomic.CompareAndSwapUint32(dm.running, uint32(0), uint32(1)) + if !starting { + panic("Routine has already started") + } fmt.Printf("demuxer: run\n") for { + if !dm.isRunning() { + break + } select { - case event, ok := <-dm.eventbus: + case event, ok := <-dm.input: if !ok { fmt.Printf("demuxer: stopping\n") + dm.terminate(fmt.Errorf("stopped")) dm.stopped <- struct{}{} return } oEvents, err := dm.handle(event) if err != nil { - // TODO Termination time + dm.terminate(err) return } for _, event := range oEvents { - dm.eventbus <- event + dm.input <- event } - case event, ok := <-dm.scheduler.output: + case event, ok := <-dm.scheduler.output(): if !ok { fmt.Printf("demuxer: scheduler output closed\n") continue } oEvents, err := dm.handle(event) if err != nil { - // TODO tTermination time + dm.terminate(err) return } for _, event := range oEvents { - dm.eventbus <- event + dm.input <- event } - case event, ok := <-dm.processor.output: + case event, ok := <-dm.processor.output(): if !ok { fmt.Printf("demuxer: processor output closed\n") continue } oEvents, err := dm.handle(event) if err != nil { - // TODO tTermination time + dm.terminate(err) return } for _, event := range oEvents { - dm.eventbus <- event + dm.input <- event } - case err := <-dm.scheduler.finished: - dm.finished <- err - case err := <-dm.processor.finished: - dm.finished <- err } } } @@ -89,9 +102,12 @@ func (dm *demuxer) handle(event Event) (Events, error) { } func (dm *demuxer) send(event Event) bool { - fmt.Printf("demuxer send\n") + if !dm.isRunning() { + fmt.Println("dummuxer isn't running") + return false + } select { - case dm.eventbus <- event: + case dm.input <- event: return true default: fmt.Printf("demuxer channel was full\n") @@ -99,14 +115,24 @@ func (dm *demuxer) send(event Event) bool { } } +func (dm *demuxer) isRunning() bool { + return atomic.LoadUint32(dm.running) == 1 +} + func (dm *demuxer) stop() { + if !dm.isRunning() { + return + } fmt.Printf("demuxer stop\n") - close(dm.eventbus) + close(dm.input) <-dm.stopped - dm.terminate(fmt.Errorf("stopped")) } func (dm *demuxer) terminate(reason error) { + stopped := atomic.CompareAndSwapUint32(dm.running, uint32(1), uint32(0)) + if !stopped { + panic("called terminate but already terminated") + } dm.finished <- reason } diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index ffdf92911..4f07224ca 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -3,6 +3,8 @@ package v2 import ( "fmt" "time" + + "github.com/tendermint/tendermint/libs/log" ) func schedulerHandle(event Event) (Events, error) { @@ -31,7 +33,6 @@ func processorHandle(event Event) (Events, error) { // reactor type Reactor struct { - events chan Event demuxer *demuxer scheduler *Routine processor *Routine @@ -40,11 +41,14 @@ type Reactor struct { } func (r *Reactor) Start() { - bufferSize := 10 - events := make(chan Event, bufferSize) + logger := log.TestingLogger() - r.scheduler = newRoutine("scheduler", events, schedulerHandle) - r.processor = newRoutine("processor", events, processorHandle) + // what is the best way to get the events out of the routine + r.scheduler = newRoutine("scheduler", schedulerHandle) + r.scheduler.setLogger(logger) + r.processor = newRoutine("processor", processorHandle) + r.processor.setLogger(logger) + // so actually the demuxer only needs to read from events r.demuxer = newDemuxer(r.scheduler, r.processor) r.tickerStopped = make(chan struct{}) @@ -52,12 +56,20 @@ func (r *Reactor) Start() { go r.processor.run() go r.demuxer.run() + for { + if r.scheduler.isRunning() && r.processor.isRunning() && r.demuxer.isRunning() { + fmt.Println("routines running") + break + } + fmt.Println("waiting") + time.Sleep(1 * time.Second) + } + go func() { ticker := time.NewTicker(1 * time.Second) for { select { case <-ticker.C: - // xxx: what if !sent? r.demuxer.send(timeCheck{}) case <-r.tickerStopped: fmt.Println("ticker stopped") @@ -89,7 +101,7 @@ func (r *Reactor) Receive(event Event) { fmt.Println("receive event") sent := r.demuxer.send(event) if !sent { - panic("demuxer is full") + fmt.Println("demuxer is full") } } diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index 31b918925..fd923966e 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -10,17 +10,16 @@ import ( // TODO // * revisit panic conditions // * audit log levels -// * maybe routine should be an interface and the concret tpe should be handlerRoutine +// * Convert routine to an interface with concrete implmentation +// * determine the public interface -// Adding Metrics -// we need a metrics definition type handleFunc = func(event Event) (Events, error) type Routine struct { name string input chan Event errors chan error - output chan Event + out chan Event stopped chan struct{} finished chan error running *uint32 @@ -29,13 +28,13 @@ type Routine struct { metrics *Metrics } -func newRoutine(name string, output chan Event, handleFunc handleFunc) *Routine { +func newRoutine(name string, handleFunc handleFunc) *Routine { return &Routine{ name: name, input: make(chan Event, 1), handle: handleFunc, errors: make(chan error, 1), - output: output, + out: make(chan Event, 1), stopped: make(chan struct{}, 1), finished: make(chan error, 1), running: new(uint32), @@ -72,6 +71,7 @@ func (rt *Routine) run() { } rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name)) rt.stopped <- struct{}{} + rt.terminate(fmt.Errorf("stopped")) return } oEvents, err := rt.handle(iEvent) @@ -84,7 +84,7 @@ func (rt *Routine) run() { rt.logger.Info(fmt.Sprintf("%s handled %d events\n", rt.name, len(oEvents))) for _, event := range oEvents { rt.logger.Info(fmt.Sprintln("writting back to output")) - rt.output <- event + rt.out <- event } case iEvent, ok := <-rt.errors: rt.metrics.ErrorsIn.With("routine", rt.name).Add(1) @@ -101,18 +101,23 @@ func (rt *Routine) run() { } rt.metrics.ErrorsOut.With("routine", rt.name).Add(float64(len(oEvents))) for _, event := range oEvents { - rt.output <- event + rt.out <- event } } } } func (rt *Routine) feedback() { - for event := range rt.output { + for event := range rt.out { rt.send(event) } } +// XXX: this should be called trySend for consistency func (rt *Routine) send(event Event) bool { + if !rt.isRunning() { + return false + } + rt.logger.Info(fmt.Sprintf("%s: sending %+v", rt.name, event)) if err, ok := event.(error); ok { select { case rt.errors <- err: @@ -140,14 +145,18 @@ func (rt *Routine) isRunning() bool { return atomic.LoadUint32(rt.running) == 1 } -// rename flush? +func (rt *Routine) output() chan Event { + return rt.out +} + func (rt *Routine) stop() { - // XXX: what if already stopped? + if !rt.isRunning() { + return + } rt.logger.Info(fmt.Sprintf("%s: stop\n", rt.name)) close(rt.input) close(rt.errors) <-rt.stopped - rt.terminate(fmt.Errorf("routine stopped")) } func (rt *Routine) terminate(reason error) { diff --git a/blockchain/v2/routine_test.go b/blockchain/v2/routine_test.go index 98fe716f0..8a84dc4de 100644 --- a/blockchain/v2/routine_test.go +++ b/blockchain/v2/routine_test.go @@ -4,6 +4,8 @@ import ( "fmt" "testing" "time" + + "github.com/stretchr/testify/assert" ) type eventA struct{} @@ -22,15 +24,47 @@ func simpleHandler(event Event) (Events, error) { } func TestRoutine(t *testing.T) { - events := make(chan Event, 10) - routine := newRoutine("simpleRoutine", events, simpleHandler) + routine := newRoutine("simpleRoutine", simpleHandler) + assert.False(t, routine.isRunning(), + "expected an initialized routine to not be running") go routine.run() go routine.feedback() + for { + if routine.isRunning() { + break + } + time.Sleep(10 * time.Millisecond) + } routine.send(eventA{}) - routine.wait() + routine.stop() +} + +func TesRoutineSend(t *testing.T) { + routine := newRoutine("simpleRoutine", simpleHandler) + + assert.False(t, routine.send(eventA{}), + "expected sending to an unstarted routine to fail") + + go routine.run() + + go routine.feedback() + for { + if routine.isRunning() { + break + } + time.Sleep(10 * time.Millisecond) + } + + assert.True(t, routine.send(eventA{}), + "expected sending to a running routine to succeed") + + routine.stop() + + assert.False(t, routine.send(eventA{}), + "expected sending to a stopped routine to fail") } func genStatefulHandler(maxCount int) handleFunc { @@ -50,16 +84,22 @@ func genStatefulHandler(maxCount int) handleFunc { } func TestStatefulRoutine(t *testing.T) { - events := make(chan Event, 10) handler := genStatefulHandler(10) - routine := newRoutine("statefulRoutine", events, handler) + routine := newRoutine("statefulRoutine", handler) go routine.run() go routine.feedback() + for { + if routine.isRunning() { + break + } + time.Sleep(10 * time.Millisecond) + } + go routine.send(eventA{}) - routine.wait() + routine.stop() } func handleWithErrors(event Event) (Events, error) { @@ -73,8 +113,7 @@ func handleWithErrors(event Event) (Events, error) { } func TestErrorSaturation(t *testing.T) { - events := make(chan Event, 10) - routine := newRoutine("errorRoutine", events, handleWithErrors) + routine := newRoutine("errorRoutine", handleWithErrors) go routine.run() go func() { @@ -83,7 +122,15 @@ func TestErrorSaturation(t *testing.T) { time.Sleep(10 * time.Millisecond) } }() - routine.send(errEvent{}) + + for { + if routine.isRunning() { + break + } + time.Sleep(10 * time.Millisecond) + } + assert.True(t, routine.send(errEvent{}), + "expected send to succeed even when saturated") routine.wait() } diff --git a/blockchain/v2/types.go b/blockchain/v2/types.go index c9e31b209..e6f491460 100644 --- a/blockchain/v2/types.go +++ b/blockchain/v2/types.go @@ -2,6 +2,9 @@ package v2 import "time" +type Event interface{} + +type Events []Event type testEvent struct { msg string time time.Time From e4913f533a390117086879bbaa09f48115a0d925 Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Tue, 6 Aug 2019 18:00:14 +0200 Subject: [PATCH 03/22] Fix race condition in shutdown: + ensure that we stop accepting messages once `stop` has been called to avoid the case in which we attempt to write to a channel which has already been closed --- blockchain/v2/reactor.go | 7 +------ blockchain/v2/routine.go | 24 ++++++++++++++++++++++-- blockchain/v2/routine_test.go | 5 +++-- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 4f07224ca..e8e10ae2d 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -3,8 +3,6 @@ package v2 import ( "fmt" "time" - - "github.com/tendermint/tendermint/libs/log" ) func schedulerHandle(event Event) (Events, error) { @@ -40,14 +38,11 @@ type Reactor struct { tickerStopped chan struct{} } +// TODO: setLogger should set loggers of the routines func (r *Reactor) Start() { - logger := log.TestingLogger() - // what is the best way to get the events out of the routine r.scheduler = newRoutine("scheduler", schedulerHandle) - r.scheduler.setLogger(logger) r.processor = newRoutine("processor", processorHandle) - r.processor.setLogger(logger) // so actually the demuxer only needs to read from events r.demuxer = newDemuxer(r.scheduler, r.processor) r.tickerStopped = make(chan struct{}) diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index fd923966e..e70c76679 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -26,6 +26,7 @@ type Routine struct { handle handleFunc logger log.Logger metrics *Metrics + stopping *uint32 } func newRoutine(name string, handleFunc handleFunc) *Routine { @@ -38,6 +39,7 @@ func newRoutine(name string, handleFunc handleFunc) *Routine { stopped: make(chan struct{}, 1), finished: make(chan error, 1), running: new(uint32), + stopping: new(uint32), logger: log.NewNopLogger(), metrics: NopMetrics(), } @@ -60,6 +62,7 @@ func (rt *Routine) run() { errorsDrained := false for { if !rt.isRunning() { + rt.logger.Info(fmt.Sprintf("%s: breaking because not running\n", rt.name)) break } select { @@ -67,6 +70,8 @@ func (rt *Routine) run() { rt.metrics.EventsIn.With("routine", rt.name).Add(1) if !ok { if !errorsDrained { + rt.logger.Info(fmt.Sprintf("%s: waiting for errors to drain\n", rt.name)) + continue // wait for errors to be drainned } rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name)) @@ -112,11 +117,11 @@ func (rt *Routine) feedback() { } } -// XXX: this should be called trySend for consistency func (rt *Routine) send(event Event) bool { - if !rt.isRunning() { + if !rt.isRunning() || rt.isStopping() { return false } + rt.logger.Info(fmt.Sprintf("%s: sending %+v", rt.name, event)) if err, ok := event.(error); ok { select { @@ -145,6 +150,10 @@ func (rt *Routine) isRunning() bool { return atomic.LoadUint32(rt.running) == 1 } +func (rt *Routine) isStopping() bool { + return atomic.LoadUint32(rt.stopping) == 1 +} + func (rt *Routine) output() chan Event { return rt.out } @@ -153,7 +162,13 @@ func (rt *Routine) stop() { if !rt.isRunning() { return } + rt.logger.Info(fmt.Sprintf("%s: stop\n", rt.name)) + stopping := atomic.CompareAndSwapUint32(rt.stopping, uint32(0), uint32(1)) + if !stopping { + panic("Routine has already stopped") + } + close(rt.input) close(rt.errors) <-rt.stopped @@ -172,3 +187,8 @@ func (rt *Routine) terminate(reason error) { func (rt *Routine) wait() error { return <-rt.finished } + +/* + Problem: + We can't write to channels from one thread and close channels from another thread +*/ diff --git a/blockchain/v2/routine_test.go b/blockchain/v2/routine_test.go index 8a84dc4de..7ef8da4d7 100644 --- a/blockchain/v2/routine_test.go +++ b/blockchain/v2/routine_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/tendermint/tendermint/libs/log" ) type eventA struct{} @@ -86,6 +87,7 @@ func genStatefulHandler(maxCount int) handleFunc { func TestStatefulRoutine(t *testing.T) { handler := genStatefulHandler(10) routine := newRoutine("statefulRoutine", handler) + routine.setLogger(log.TestingLogger()) go routine.run() go routine.feedback() @@ -97,7 +99,7 @@ func TestStatefulRoutine(t *testing.T) { time.Sleep(10 * time.Millisecond) } - go routine.send(eventA{}) + routine.send(eventA{}) routine.stop() } @@ -114,7 +116,6 @@ func handleWithErrors(event Event) (Events, error) { func TestErrorSaturation(t *testing.T) { routine := newRoutine("errorRoutine", handleWithErrors) - go routine.run() go func() { for { From c081b60ef6f07e1b3b6d9b4a41429481ed95ffdf Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Thu, 8 Aug 2019 15:12:11 +0200 Subject: [PATCH 04/22] Solidify API: + use `trySend` the replicate peer sending + expose `next()` as a chan of events as output + expose `final()` as a chan of error, for the final error + add `ready()` as chan struct when routine is ready --- blockchain/v2/demuxer.go | 22 +++++----- blockchain/v2/reactor.go | 12 ++--- blockchain/v2/routine.go | 37 ++++++++-------- blockchain/v2/routine_test.go | 82 +++++++++++++++++------------------ 4 files changed, 76 insertions(+), 77 deletions(-) diff --git a/blockchain/v2/demuxer.go b/blockchain/v2/demuxer.go index 32165735e..9ae4c3f38 100644 --- a/blockchain/v2/demuxer.go +++ b/blockchain/v2/demuxer.go @@ -9,7 +9,7 @@ type demuxer struct { input chan Event scheduler *Routine processor *Routine - finished chan error + fin chan error stopped chan struct{} running *uint32 } @@ -26,12 +26,12 @@ func newDemuxer(scheduler *Routine, processor *Routine) *demuxer { scheduler: scheduler, processor: processor, stopped: make(chan struct{}, 1), - finished: make(chan error, 1), + fin: make(chan error, 1), running: new(uint32), } } -func (dm *demuxer) run() { +func (dm *demuxer) start() { starting := atomic.CompareAndSwapUint32(dm.running, uint32(0), uint32(1)) if !starting { panic("Routine has already started") @@ -57,7 +57,7 @@ func (dm *demuxer) run() { for _, event := range oEvents { dm.input <- event } - case event, ok := <-dm.scheduler.output(): + case event, ok := <-dm.scheduler.next(): if !ok { fmt.Printf("demuxer: scheduler output closed\n") continue @@ -70,7 +70,7 @@ func (dm *demuxer) run() { for _, event := range oEvents { dm.input <- event } - case event, ok := <-dm.processor.output(): + case event, ok := <-dm.processor.next(): if !ok { fmt.Printf("demuxer: processor output closed\n") continue @@ -88,12 +88,12 @@ func (dm *demuxer) run() { } func (dm *demuxer) handle(event Event) (Events, error) { - received := dm.scheduler.send(event) + received := dm.scheduler.trySend(event) if !received { return Events{scFull{}}, nil // backpressure } - received = dm.processor.send(event) + received = dm.processor.trySend(event) if !received { return Events{pcFull{}}, nil // backpressure } @@ -101,7 +101,7 @@ func (dm *demuxer) handle(event Event) (Events, error) { return Events{}, nil } -func (dm *demuxer) send(event Event) bool { +func (dm *demuxer) trySend(event Event) bool { if !dm.isRunning() { fmt.Println("dummuxer isn't running") return false @@ -133,9 +133,9 @@ func (dm *demuxer) terminate(reason error) { if !stopped { panic("called terminate but already terminated") } - dm.finished <- reason + dm.fin <- reason } -func (dm *demuxer) wait() error { - return <-dm.finished +func (dm *demuxer) final() chan error { + return dm.fin } diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index e8e10ae2d..85f669739 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -47,9 +47,9 @@ func (r *Reactor) Start() { r.demuxer = newDemuxer(r.scheduler, r.processor) r.tickerStopped = make(chan struct{}) - go r.scheduler.run() - go r.processor.run() - go r.demuxer.run() + go r.scheduler.start() + go r.processor.start() + go r.demuxer.start() for { if r.scheduler.isRunning() && r.processor.isRunning() && r.demuxer.isRunning() { @@ -57,7 +57,7 @@ func (r *Reactor) Start() { break } fmt.Println("waiting") - time.Sleep(1 * time.Second) + time.Sleep(10 * time.Millisecond) } go func() { @@ -65,7 +65,7 @@ func (r *Reactor) Start() { for { select { case <-ticker.C: - r.demuxer.send(timeCheck{}) + r.demuxer.trySend(timeCheck{}) case <-r.tickerStopped: fmt.Println("ticker stopped") return @@ -94,7 +94,7 @@ func (r *Reactor) Stop() { func (r *Reactor) Receive(event Event) { fmt.Println("receive event") - sent := r.demuxer.send(event) + sent := r.demuxer.trySend(event) if !sent { fmt.Println("demuxer is full") } diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index e70c76679..1addff468 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -11,7 +11,6 @@ import ( // * revisit panic conditions // * audit log levels // * Convert routine to an interface with concrete implmentation -// * determine the public interface type handleFunc = func(event Event) (Events, error) @@ -21,7 +20,8 @@ type Routine struct { errors chan error out chan Event stopped chan struct{} - finished chan error + rdy chan struct{} + fin chan error running *uint32 handle handleFunc logger log.Logger @@ -37,7 +37,8 @@ func newRoutine(name string, handleFunc handleFunc) *Routine { errors: make(chan error, 1), out: make(chan Event, 1), stopped: make(chan struct{}, 1), - finished: make(chan error, 1), + rdy: make(chan struct{}, 1), + fin: make(chan error, 1), running: new(uint32), stopping: new(uint32), logger: log.NewNopLogger(), @@ -53,12 +54,13 @@ func (rt *Routine) setMetrics(metrics *Metrics) { rt.metrics = metrics } -func (rt *Routine) run() { +func (rt *Routine) start() { rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name)) starting := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1)) if !starting { panic("Routine has already started") } + rt.rdy <- struct{}{} errorsDrained := false for { if !rt.isRunning() { @@ -113,11 +115,11 @@ func (rt *Routine) run() { } func (rt *Routine) feedback() { for event := range rt.out { - rt.send(event) + rt.trySend(event) } } -func (rt *Routine) send(event Event) bool { +func (rt *Routine) trySend(event Event) bool { if !rt.isRunning() || rt.isStopping() { return false } @@ -154,7 +156,11 @@ func (rt *Routine) isStopping() bool { return atomic.LoadUint32(rt.stopping) == 1 } -func (rt *Routine) output() chan Event { +func (rt *Routine) ready() chan struct{} { + return rt.rdy +} + +func (rt *Routine) next() chan Event { return rt.out } @@ -174,21 +180,14 @@ func (rt *Routine) stop() { <-rt.stopped } +func (rt *Routine) final() chan error { + return rt.fin +} + func (rt *Routine) terminate(reason error) { stopped := atomic.CompareAndSwapUint32(rt.running, uint32(1), uint32(0)) if !stopped { panic("called stop but already stopped") } - rt.finished <- reason + rt.fin <- reason } - -// XXX: this should probably produced the finished -// channel and let the caller deicde how long to wait -func (rt *Routine) wait() error { - return <-rt.finished -} - -/* - Problem: - We can't write to channels from one thread and close channels from another thread -*/ diff --git a/blockchain/v2/routine_test.go b/blockchain/v2/routine_test.go index 7ef8da4d7..cbf4768a8 100644 --- a/blockchain/v2/routine_test.go +++ b/blockchain/v2/routine_test.go @@ -19,7 +19,7 @@ func simpleHandler(event Event) (Events, error) { case eventA: return Events{eventB{}}, nil case eventB: - return Events{routineFinished{}}, done + return Events{}, done } return Events{}, nil } @@ -29,53 +29,54 @@ func TestRoutine(t *testing.T) { assert.False(t, routine.isRunning(), "expected an initialized routine to not be running") - go routine.run() + go routine.start() go routine.feedback() - for { - if routine.isRunning() { - break - } - time.Sleep(10 * time.Millisecond) - } + <-routine.ready() - routine.send(eventA{}) + assert.True(t, routine.trySend(eventA{}), + "expected sending to a ready routine to succeed") - routine.stop() + assert.Equal(t, done, <-routine.final(), + "expected the final event to be done") } func TesRoutineSend(t *testing.T) { routine := newRoutine("simpleRoutine", simpleHandler) - assert.False(t, routine.send(eventA{}), + assert.False(t, routine.trySend(eventA{}), "expected sending to an unstarted routine to fail") - go routine.run() + go routine.start() go routine.feedback() - for { - if routine.isRunning() { - break - } - time.Sleep(10 * time.Millisecond) - } + <-routine.ready() - assert.True(t, routine.send(eventA{}), + assert.True(t, routine.trySend(eventA{}), "expected sending to a running routine to succeed") routine.stop() - assert.False(t, routine.send(eventA{}), + assert.False(t, routine.trySend(eventA{}), "expected sending to a stopped routine to fail") } +type finalCount struct { + count int +} + +func (f finalCount) Error() string { + return "end" +} + func genStatefulHandler(maxCount int) handleFunc { counter := 0 return func(event Event) (Events, error) { + // golint fixme switch event.(type) { case eventA: counter += 1 if counter >= maxCount { - return Events{}, done + return Events{}, finalCount{counter} } return Events{eventA{}}, nil @@ -85,23 +86,27 @@ func genStatefulHandler(maxCount int) handleFunc { } func TestStatefulRoutine(t *testing.T) { - handler := genStatefulHandler(10) + count := 10 + handler := genStatefulHandler(count) routine := newRoutine("statefulRoutine", handler) routine.setLogger(log.TestingLogger()) - go routine.run() + go routine.start() go routine.feedback() - for { - if routine.isRunning() { - break - } - time.Sleep(10 * time.Millisecond) - } + <-routine.ready() - routine.send(eventA{}) + assert.True(t, routine.trySend(eventA{}), + "expected sending to a started routine to succeed") - routine.stop() + final := <-routine.final() + fnl, ok := final.(finalCount) + if ok { + assert.Equal(t, count, fnl.count, + "expected the routine to count to 10") + } else { + t.Fail() + } } func handleWithErrors(event Event) (Events, error) { @@ -116,22 +121,17 @@ func handleWithErrors(event Event) (Events, error) { func TestErrorSaturation(t *testing.T) { routine := newRoutine("errorRoutine", handleWithErrors) - go routine.run() + go routine.start() + <-routine.ready() go func() { for { - routine.send(eventA{}) + routine.trySend(eventA{}) time.Sleep(10 * time.Millisecond) } }() - for { - if routine.isRunning() { - break - } - time.Sleep(10 * time.Millisecond) - } - assert.True(t, routine.send(errEvent{}), + assert.True(t, routine.trySend(errEvent{}), "expected send to succeed even when saturated") - routine.wait() + assert.Equal(t, done, <-routine.final()) } From 5b880fbcff0439099932a133e9566de19494134b Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Thu, 8 Aug 2019 15:53:02 +0200 Subject: [PATCH 05/22] cleanup events --- blockchain/v2/demuxer.go | 3 +++ blockchain/v2/reactor.go | 16 +++++++--------- blockchain/v2/reactor_test.go | 2 +- blockchain/v2/routine_test.go | 1 + blockchain/v2/types.go | 31 ------------------------------- 5 files changed, 12 insertions(+), 41 deletions(-) diff --git a/blockchain/v2/demuxer.go b/blockchain/v2/demuxer.go index 9ae4c3f38..6b9df0f50 100644 --- a/blockchain/v2/demuxer.go +++ b/blockchain/v2/demuxer.go @@ -5,6 +5,9 @@ import ( "sync/atomic" ) +type scFull struct{} +type pcFull struct{} + type demuxer struct { input chan Event scheduler *Routine diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 85f669739..0a12f385c 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -5,13 +5,16 @@ import ( "time" ) +type timeCheck struct { + time time.Time +} + func schedulerHandle(event Event) (Events, error) { switch event.(type) { case timeCheck: fmt.Println("scheduler handle timeCheck") - case testEvent: + case Event: fmt.Println("scheduler handle testEvent") - return Events{scTestEvent{}}, nil } return Events{}, nil } @@ -20,11 +23,8 @@ func processorHandle(event Event) (Events, error) { switch event.(type) { case timeCheck: fmt.Println("processor handle timeCheck") - case testEvent: - fmt.Println("processor handle testEvent") - case scTestEvent: - fmt.Println("processor handle scTestEvent") - return Events{}, fmt.Errorf("processor done") + case Event: + fmt.Println("processor handle event") } return Events{}, nil } @@ -40,10 +40,8 @@ type Reactor struct { // TODO: setLogger should set loggers of the routines func (r *Reactor) Start() { - // what is the best way to get the events out of the routine r.scheduler = newRoutine("scheduler", schedulerHandle) r.processor = newRoutine("processor", processorHandle) - // so actually the demuxer only needs to read from events r.demuxer = newDemuxer(r.scheduler, r.processor) r.tickerStopped = make(chan struct{}) diff --git a/blockchain/v2/reactor_test.go b/blockchain/v2/reactor_test.go index b3430074f..d075641f7 100644 --- a/blockchain/v2/reactor_test.go +++ b/blockchain/v2/reactor_test.go @@ -7,7 +7,7 @@ func TestReactor(t *testing.T) { reactor := Reactor{} reactor.Start() script := Events{ - testEvent{}, + struct{}{}, } for _, event := range script { diff --git a/blockchain/v2/routine_test.go b/blockchain/v2/routine_test.go index cbf4768a8..cb7944ba3 100644 --- a/blockchain/v2/routine_test.go +++ b/blockchain/v2/routine_test.go @@ -11,6 +11,7 @@ import ( type eventA struct{} type eventB struct{} +type errEvent struct{} var done = fmt.Errorf("done") diff --git a/blockchain/v2/types.go b/blockchain/v2/types.go index e6f491460..dbde352a3 100644 --- a/blockchain/v2/types.go +++ b/blockchain/v2/types.go @@ -1,35 +1,4 @@ package v2 -import "time" - type Event interface{} - type Events []Event -type testEvent struct { - msg string - time time.Time -} - -type testEventTwo struct { - msg string -} - -type stopEvent struct{} -type timeCheck struct { - time time.Time -} - -type errEvent struct{} - -type scTestEvent struct{} - -type pcFinished struct{} - -type routineFinished struct{} - -func (rf *routineFinished) Error() string { - return "routine finished" -} - -type scFull struct{} -type pcFull struct{} From e826ca3c494904d63142766a5bf6165eab4a08cc Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Thu, 8 Aug 2019 16:48:07 +0200 Subject: [PATCH 06/22] demuxer cleanup --- blockchain/v2/demuxer.go | 46 ++++++++++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/blockchain/v2/demuxer.go b/blockchain/v2/demuxer.go index 6b9df0f50..70b670b2f 100644 --- a/blockchain/v2/demuxer.go +++ b/blockchain/v2/demuxer.go @@ -3,6 +3,8 @@ package v2 import ( "fmt" "sync/atomic" + + "github.com/tendermint/tendermint/libs/log" ) type scFull struct{} @@ -14,15 +16,12 @@ type demuxer struct { processor *Routine fin chan error stopped chan struct{} + rdy chan struct{} running *uint32 + stopping *uint32 + logger log.Logger } -// TODO -// demuxer_test -// Termination process -// Logger -// Metrics -// Adhere to interface func newDemuxer(scheduler *Routine, processor *Routine) *demuxer { return &demuxer{ input: make(chan Event, 10), @@ -30,16 +29,23 @@ func newDemuxer(scheduler *Routine, processor *Routine) *demuxer { processor: processor, stopped: make(chan struct{}, 1), fin: make(chan error, 1), + rdy: make(chan struct{}, 1), running: new(uint32), + stopping: new(uint32), + logger: log.NewNopLogger(), } } +func (dm *demuxer) setLogger(logger log.Logger) { + dm.logger = logger +} + func (dm *demuxer) start() { starting := atomic.CompareAndSwapUint32(dm.running, uint32(0), uint32(1)) if !starting { panic("Routine has already started") } - fmt.Printf("demuxer: run\n") + dm.logger.Info("demuxer: run") for { if !dm.isRunning() { break @@ -47,7 +53,7 @@ func (dm *demuxer) start() { select { case event, ok := <-dm.input: if !ok { - fmt.Printf("demuxer: stopping\n") + dm.logger.Info("demuxer: stopping") dm.terminate(fmt.Errorf("stopped")) dm.stopped <- struct{}{} return @@ -62,7 +68,7 @@ func (dm *demuxer) start() { } case event, ok := <-dm.scheduler.next(): if !ok { - fmt.Printf("demuxer: scheduler output closed\n") + dm.logger.Info("demuxer: scheduler output closed") continue } oEvents, err := dm.handle(event) @@ -75,7 +81,7 @@ func (dm *demuxer) start() { } case event, ok := <-dm.processor.next(): if !ok { - fmt.Printf("demuxer: processor output closed\n") + dm.logger.Info("demuxer: processor output closed") continue } oEvents, err := dm.handle(event) @@ -105,15 +111,15 @@ func (dm *demuxer) handle(event Event) (Events, error) { } func (dm *demuxer) trySend(event Event) bool { - if !dm.isRunning() { - fmt.Println("dummuxer isn't running") + if !dm.isRunning() || dm.isStopping() { + dm.logger.Info("dummuxer isn't running") return false } select { case dm.input <- event: return true default: - fmt.Printf("demuxer channel was full\n") + dm.logger.Info("demuxer channel was full") return false } } @@ -122,11 +128,23 @@ func (dm *demuxer) isRunning() bool { return atomic.LoadUint32(dm.running) == 1 } +func (dm *demuxer) isStopping() bool { + return atomic.LoadUint32(dm.stopping) == 1 +} + +func (dm *demuxer) ready() chan struct{} { + return dm.rdy +} + func (dm *demuxer) stop() { if !dm.isRunning() { return } - fmt.Printf("demuxer stop\n") + stopping := atomic.CompareAndSwapUint32(dm.stopping, uint32(0), uint32(1)) + if !stopping { + panic("Demuxer has already stopped") + } + dm.logger.Info("demuxer stop") close(dm.input) <-dm.stopped } From aeac4743ccf292b57f0d02587a33b3fc7300a180 Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Thu, 8 Aug 2019 16:54:25 +0200 Subject: [PATCH 07/22] typo fix --- blockchain/v2/routine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index 1addff468..715be6b4a 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -90,7 +90,7 @@ func (rt *Routine) start() { rt.metrics.EventsOut.With("routine", rt.name).Add(float64(len(oEvents))) rt.logger.Info(fmt.Sprintf("%s handled %d events\n", rt.name, len(oEvents))) for _, event := range oEvents { - rt.logger.Info(fmt.Sprintln("writting back to output")) + rt.logger.Info(fmt.Sprintln("writing back to output")) rt.out <- event } case iEvent, ok := <-rt.errors: From acbfe67fb84175444132b4017a0c0d6d8579841c Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Thu, 8 Aug 2019 16:56:39 +0200 Subject: [PATCH 08/22] set logger --- blockchain/v2/reactor.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 0a12f385c..e76cb4e7e 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -3,6 +3,8 @@ package v2 import ( "fmt" "time" + + "github.com/tendermint/tendermint/libs/log" ) type timeCheck struct { @@ -38,7 +40,12 @@ type Reactor struct { tickerStopped chan struct{} } -// TODO: setLogger should set loggers of the routines +func (r *Reactor) setLogger(logger log.Logger) { + r.scheduler.setLogger(logger) + r.processor.setLogger(logger) + r.demuxer.setLogger(logger) +} + func (r *Reactor) Start() { r.scheduler = newRoutine("scheduler", schedulerHandle) r.processor = newRoutine("processor", processorHandle) From 2c8cbfc26ad5790e03acb62770fd386e63a457c9 Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Thu, 8 Aug 2019 17:42:46 +0200 Subject: [PATCH 09/22] linter fixes --- blockchain/v2/demuxer.go | 2 ++ blockchain/v2/reactor.go | 15 ++++----------- blockchain/v2/routine.go | 1 + blockchain/v2/routine_test.go | 4 +++- 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/blockchain/v2/demuxer.go b/blockchain/v2/demuxer.go index 70b670b2f..7996ecfa0 100644 --- a/blockchain/v2/demuxer.go +++ b/blockchain/v2/demuxer.go @@ -1,3 +1,4 @@ +// nolint:unused package v2 import ( @@ -46,6 +47,7 @@ func (dm *demuxer) start() { panic("Routine has already started") } dm.logger.Info("demuxer: run") + dm.rdy <- struct{}{} for { if !dm.isRunning() { break diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index e76cb4e7e..97226a5d9 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -8,7 +8,6 @@ import ( ) type timeCheck struct { - time time.Time } func schedulerHandle(event Event) (Events, error) { @@ -31,15 +30,14 @@ func processorHandle(event Event) (Events, error) { return Events{}, nil } -// reactor type Reactor struct { demuxer *demuxer scheduler *Routine processor *Routine - ticker *time.Ticker tickerStopped chan struct{} } +// nolint:unused func (r *Reactor) setLogger(logger log.Logger) { r.scheduler.setLogger(logger) r.processor.setLogger(logger) @@ -56,14 +54,9 @@ func (r *Reactor) Start() { go r.processor.start() go r.demuxer.start() - for { - if r.scheduler.isRunning() && r.processor.isRunning() && r.demuxer.isRunning() { - fmt.Println("routines running") - break - } - fmt.Println("waiting") - time.Sleep(10 * time.Millisecond) - } + <-r.scheduler.ready() + <-r.processor.ready() + <-r.demuxer.ready() go func() { ticker := time.NewTicker(1 * time.Second) diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index 715be6b4a..977d2cd16 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -50,6 +50,7 @@ func (rt *Routine) setLogger(logger log.Logger) { rt.logger = logger } +// nolint:unused func (rt *Routine) setMetrics(metrics *Metrics) { rt.metrics = metrics } diff --git a/blockchain/v2/routine_test.go b/blockchain/v2/routine_test.go index cb7944ba3..8de36e275 100644 --- a/blockchain/v2/routine_test.go +++ b/blockchain/v2/routine_test.go @@ -41,7 +41,7 @@ func TestRoutine(t *testing.T) { "expected the final event to be done") } -func TesRoutineSend(t *testing.T) { +func TestRoutineSend(t *testing.T) { routine := newRoutine("simpleRoutine", simpleHandler) assert.False(t, routine.trySend(eventA{}), @@ -81,6 +81,8 @@ func genStatefulHandler(maxCount int) handleFunc { } return Events{eventA{}}, nil + case eventB: + return Events{}, nil } return Events{}, nil } From 78d4c3b88a098249e00b347cabadae1523d2a42d Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Tue, 13 Aug 2019 17:57:17 +0200 Subject: [PATCH 10/22] fixes based on feedback --- blockchain/v2/demuxer.go | 4 +++- blockchain/v2/reactor.go | 24 +++++++++--------------- blockchain/v2/routine.go | 3 ++- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/blockchain/v2/demuxer.go b/blockchain/v2/demuxer.go index 7996ecfa0..e19816a2d 100644 --- a/blockchain/v2/demuxer.go +++ b/blockchain/v2/demuxer.go @@ -11,6 +11,8 @@ import ( type scFull struct{} type pcFull struct{} +const demuxerBufferSize = 10 + type demuxer struct { input chan Event scheduler *Routine @@ -25,7 +27,7 @@ type demuxer struct { func newDemuxer(scheduler *Routine, processor *Routine) *demuxer { return &demuxer{ - input: make(chan Event, 10), + input: make(chan Event, demuxerBufferSize), scheduler: scheduler, processor: processor, stopped: make(chan struct{}, 1), diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 97226a5d9..7c7bf4e27 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -8,6 +8,7 @@ import ( ) type timeCheck struct { + time time.Time } func schedulerHandle(event Event) (Events, error) { @@ -31,10 +32,10 @@ func processorHandle(event Event) (Events, error) { } type Reactor struct { - demuxer *demuxer - scheduler *Routine - processor *Routine - tickerStopped chan struct{} + demuxer *demuxer + scheduler *Routine + processor *Routine + ticker *time.Ticker } // nolint:unused @@ -48,7 +49,7 @@ func (r *Reactor) Start() { r.scheduler = newRoutine("scheduler", schedulerHandle) r.processor = newRoutine("processor", processorHandle) r.demuxer = newDemuxer(r.scheduler, r.processor) - r.tickerStopped = make(chan struct{}) + r.ticker = time.NewTicker(1 * time.Second) go r.scheduler.start() go r.processor.start() @@ -59,15 +60,8 @@ func (r *Reactor) Start() { <-r.demuxer.ready() go func() { - ticker := time.NewTicker(1 * time.Second) - for { - select { - case <-ticker.C: - r.demuxer.trySend(timeCheck{}) - case <-r.tickerStopped: - fmt.Println("ticker stopped") - return - } + for t := range r.ticker.C { + r.demuxer.trySend(timeCheck{t}) } }() } @@ -80,7 +74,7 @@ func (r *Reactor) Wait() { func (r *Reactor) Stop() { fmt.Println("reactor stopping") - r.tickerStopped <- struct{}{} + r.ticker.Stop() r.demuxer.stop() r.scheduler.stop() r.processor.stop() diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index 977d2cd16..ecd12c82c 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -56,6 +56,7 @@ func (rt *Routine) setMetrics(metrics *Metrics) { } func (rt *Routine) start() { + // what if we call baseService.start rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name)) starting := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1)) if !starting { @@ -78,7 +79,7 @@ func (rt *Routine) start() { continue // wait for errors to be drainned } rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name)) - rt.stopped <- struct{}{} + close(rt.stopped) rt.terminate(fmt.Errorf("stopped")) return } From f81c319ecef8b20c7f77b817177f8305b3232bb3 Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Tue, 13 Aug 2019 19:29:28 +0200 Subject: [PATCH 11/22] Add some docs --- blockchain/v2/routine.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index ecd12c82c..089414708 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -14,6 +14,12 @@ import ( type handleFunc = func(event Event) (Events, error) +// Routines are a structure which model a finite state machine as serialized +// stream of events processed by a handle function. This Routine structure +// handles the concurrency and messaging guarantees. Events are sent via +// `trySend` are handled by the `handle` function to produce an iterator +// `next()`. Calling `close()` on a routine will conclude processing of all +// sent events and produce `last()` event representing the terminal state. type Routine struct { name string input chan Event @@ -56,7 +62,6 @@ func (rt *Routine) setMetrics(metrics *Metrics) { } func (rt *Routine) start() { - // what if we call baseService.start rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name)) starting := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1)) if !starting { From 9d41770a99b7331abcfa258f564e5acfe03bcea1 Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Wed, 21 Aug 2019 21:37:53 +0200 Subject: [PATCH 12/22] Close rdy channel + close `rdy` channel to ensure that calls to `<-ready()` will always return if the routine is ready --- blockchain/v2/routine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index 089414708..b9b09e3df 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -67,7 +67,7 @@ func (rt *Routine) start() { if !starting { panic("Routine has already started") } - rt.rdy <- struct{}{} + close(rt.rdy) errorsDrained := false for { if !rt.isRunning() { From 5474528db14bfb4b0b245788d16e125a1ddbeee1 Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Thu, 12 Sep 2019 12:06:26 -0400 Subject: [PATCH 13/22] Switch to a priority queue: * Routines will now use a priority queue instead of channels to iterate over events --- blockchain/v2/demuxer.go | 34 +++---- blockchain/v2/reactor.go | 11 ++- blockchain/v2/reactor_test.go | 4 +- blockchain/v2/routine.go | 181 +++++++++++----------------------- blockchain/v2/routine_test.go | 84 +++++++++------- blockchain/v2/types.go | 64 +++++++++++- go.mod | 1 + go.sum | 2 + 8 files changed, 196 insertions(+), 185 deletions(-) diff --git a/blockchain/v2/demuxer.go b/blockchain/v2/demuxer.go index e19816a2d..5ebb54633 100644 --- a/blockchain/v2/demuxer.go +++ b/blockchain/v2/demuxer.go @@ -8,8 +8,12 @@ import ( "github.com/tendermint/tendermint/libs/log" ) -type scFull struct{} -type pcFull struct{} +type scFull struct { + priorityHigh +} +type pcFull struct { + priorityHigh +} const demuxerBufferSize = 10 @@ -62,56 +66,50 @@ func (dm *demuxer) start() { dm.stopped <- struct{}{} return } - oEvents, err := dm.handle(event) + oEvent, err := dm.handle(event) if err != nil { dm.terminate(err) return } - for _, event := range oEvents { - dm.input <- event - } + dm.input <- oEvent case event, ok := <-dm.scheduler.next(): if !ok { dm.logger.Info("demuxer: scheduler output closed") continue } - oEvents, err := dm.handle(event) + oEvent, err := dm.handle(event) if err != nil { dm.terminate(err) return } - for _, event := range oEvents { - dm.input <- event - } + dm.input <- oEvent case event, ok := <-dm.processor.next(): if !ok { dm.logger.Info("demuxer: processor output closed") continue } - oEvents, err := dm.handle(event) + oEvent, err := dm.handle(event) if err != nil { dm.terminate(err) return } - for _, event := range oEvents { - dm.input <- event - } + dm.input <- oEvent } } } -func (dm *demuxer) handle(event Event) (Events, error) { +func (dm *demuxer) handle(event Event) (Event, error) { received := dm.scheduler.trySend(event) if !received { - return Events{scFull{}}, nil // backpressure + return scFull{}, nil // backpressure } received = dm.processor.trySend(event) if !received { - return Events{pcFull{}}, nil // backpressure + return pcFull{}, nil // backpressure } - return Events{}, nil + return noOp, nil } func (dm *demuxer) trySend(event Event) bool { diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 7c7bf4e27..64deb5ce8 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -8,27 +8,28 @@ import ( ) type timeCheck struct { + priorityHigh time time.Time } -func schedulerHandle(event Event) (Events, error) { +func schedulerHandle(event Event) (Event, error) { switch event.(type) { case timeCheck: fmt.Println("scheduler handle timeCheck") case Event: fmt.Println("scheduler handle testEvent") } - return Events{}, nil + return noOp, nil } -func processorHandle(event Event) (Events, error) { +func processorHandle(event Event) (Event, error) { switch event.(type) { case timeCheck: fmt.Println("processor handle timeCheck") case Event: fmt.Println("processor handle event") } - return Events{}, nil + return noOp, nil } type Reactor struct { @@ -61,7 +62,7 @@ func (r *Reactor) Start() { go func() { for t := range r.ticker.C { - r.demuxer.trySend(timeCheck{t}) + r.demuxer.trySend(timeCheck{time: t}) } }() } diff --git a/blockchain/v2/reactor_test.go b/blockchain/v2/reactor_test.go index d075641f7..2b3e2be1e 100644 --- a/blockchain/v2/reactor_test.go +++ b/blockchain/v2/reactor_test.go @@ -6,8 +6,8 @@ import "testing" func TestReactor(t *testing.T) { reactor := Reactor{} reactor.Start() - script := Events{ - struct{}{}, + script := []Event{ + // TODO } for _, event := range script { diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index b9b09e3df..eaaa68cf0 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -4,6 +4,7 @@ import ( "fmt" "sync/atomic" + "github.com/Workiva/go-datastructures/queue" "github.com/tendermint/tendermint/libs/log" ) @@ -12,7 +13,7 @@ import ( // * audit log levels // * Convert routine to an interface with concrete implmentation -type handleFunc = func(event Event) (Events, error) +type handleFunc = func(event Event) (Event, error) // Routines are a structure which model a finite state machine as serialized // stream of events processed by a handle function. This Routine structure @@ -21,34 +22,30 @@ type handleFunc = func(event Event) (Events, error) // `next()`. Calling `close()` on a routine will conclude processing of all // sent events and produce `last()` event representing the terminal state. type Routine struct { - name string - input chan Event - errors chan error - out chan Event - stopped chan struct{} - rdy chan struct{} - fin chan error - running *uint32 - handle handleFunc - logger log.Logger - metrics *Metrics - stopping *uint32 + name string + queue *queue.PriorityQueue + out chan Event // XXX: actually item + fin chan error + rdy chan struct{} + running *uint32 + handle handleFunc + logger log.Logger + metrics *Metrics } +var queueSize int = 10 + func newRoutine(name string, handleFunc handleFunc) *Routine { return &Routine{ - name: name, - input: make(chan Event, 1), - handle: handleFunc, - errors: make(chan error, 1), - out: make(chan Event, 1), - stopped: make(chan struct{}, 1), - rdy: make(chan struct{}, 1), - fin: make(chan error, 1), - running: new(uint32), - stopping: new(uint32), - logger: log.NewNopLogger(), - metrics: NopMetrics(), + name: name, + queue: queue.NewPriorityQueue(queueSize, true), + handle: handleFunc, + out: make(chan Event, queueSize), + rdy: make(chan struct{}, 1), + fin: make(chan error, 1), + running: new(uint32), + logger: log.NewNopLogger(), + metrics: NopMetrics(), } } @@ -63,138 +60,80 @@ func (rt *Routine) setMetrics(metrics *Metrics) { func (rt *Routine) start() { rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name)) - starting := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1)) - if !starting { - panic("Routine has already started") + running := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1)) + if !running { + panic(fmt.Sprintf("%s is already running", rt.name)) } close(rt.rdy) - errorsDrained := false + defer func() { + stopped := atomic.CompareAndSwapUint32(rt.running, uint32(1), uint32(0)) + if !stopped { + panic(fmt.Sprintf("%s is failed to stop", rt.name)) + } + }() + for { - if !rt.isRunning() { - rt.logger.Info(fmt.Sprintf("%s: breaking because not running\n", rt.name)) - break + events, err := rt.queue.Get(1) + if err != nil { + rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name)) + rt.terminate(fmt.Errorf("stopped")) + return } - select { - case iEvent, ok := <-rt.input: - rt.metrics.EventsIn.With("routine", rt.name).Add(1) - if !ok { - if !errorsDrained { - rt.logger.Info(fmt.Sprintf("%s: waiting for errors to drain\n", rt.name)) - - continue // wait for errors to be drainned - } - rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name)) - close(rt.stopped) - rt.terminate(fmt.Errorf("stopped")) - return - } - oEvents, err := rt.handle(iEvent) - rt.metrics.EventsHandled.With("routine", rt.name).Add(1) - if err != nil { - rt.terminate(err) - return - } - rt.metrics.EventsOut.With("routine", rt.name).Add(float64(len(oEvents))) - rt.logger.Info(fmt.Sprintf("%s handled %d events\n", rt.name, len(oEvents))) - for _, event := range oEvents { - rt.logger.Info(fmt.Sprintln("writing back to output")) - rt.out <- event - } - case iEvent, ok := <-rt.errors: - rt.metrics.ErrorsIn.With("routine", rt.name).Add(1) - if !ok { - rt.logger.Info(fmt.Sprintf("%s: errors closed\n", rt.name)) - errorsDrained = true - continue - } - oEvents, err := rt.handle(iEvent) - rt.metrics.ErrorsHandled.With("routine", rt.name).Add(1) - if err != nil { - rt.terminate(err) - return - } - rt.metrics.ErrorsOut.With("routine", rt.name).Add(float64(len(oEvents))) - for _, event := range oEvents { - rt.out <- event - } + oEvent, err := rt.handle(events[0]) + rt.metrics.EventsHandled.With("routine", rt.name).Add(1) + if err != nil { + rt.terminate(err) + return } - } -} -func (rt *Routine) feedback() { - for event := range rt.out { - rt.trySend(event) + rt.metrics.EventsOut.With("routine", rt.name).Add(1) + rt.logger.Debug(fmt.Sprintf("%s produced %+v event\n", rt.name, oEvent)) + + rt.out <- oEvent } } func (rt *Routine) trySend(event Event) bool { - if !rt.isRunning() || rt.isStopping() { + rt.logger.Info(fmt.Sprintf("%s: sending %+v", rt.name, event)) + if !rt.isRunning() { return false } - - rt.logger.Info(fmt.Sprintf("%s: sending %+v", rt.name, event)) - if err, ok := event.(error); ok { - select { - case rt.errors <- err: - rt.metrics.ErrorsSent.With("routine", rt.name).Add(1) - return true - default: - rt.metrics.ErrorsShed.With("routine", rt.name).Add(1) - rt.logger.Info(fmt.Sprintf("%s: errors channel was full\n", rt.name)) - return false - } - } else { - select { - case rt.input <- event: - rt.metrics.EventsSent.With("routine", rt.name).Add(1) - return true - default: - rt.metrics.EventsShed.With("routine", rt.name).Add(1) - rt.logger.Info(fmt.Sprintf("%s: channel was full\n", rt.name)) - return false - } + err := rt.queue.Put(event) + if err != nil { + rt.metrics.EventsShed.With("routine", rt.name).Add(1) + rt.logger.Info(fmt.Sprintf("%s: trySend fail, queue was full/stopped \n", rt.name)) + return false } + rt.metrics.EventsSent.With("routine", rt.name).Add(1) + return true } func (rt *Routine) isRunning() bool { return atomic.LoadUint32(rt.running) == 1 } -func (rt *Routine) isStopping() bool { - return atomic.LoadUint32(rt.stopping) == 1 +func (rt *Routine) next() chan Event { + return rt.out } func (rt *Routine) ready() chan struct{} { return rt.rdy } -func (rt *Routine) next() chan Event { - return rt.out -} - func (rt *Routine) stop() { if !rt.isRunning() { return } rt.logger.Info(fmt.Sprintf("%s: stop\n", rt.name)) - stopping := atomic.CompareAndSwapUint32(rt.stopping, uint32(0), uint32(1)) - if !stopping { - panic("Routine has already stopped") - } - - close(rt.input) - close(rt.errors) - <-rt.stopped + rt.queue.Dispose() // this should block until all queue items are free? } func (rt *Routine) final() chan error { return rt.fin } +// XXX: Maybe get rid of this func (rt *Routine) terminate(reason error) { - stopped := atomic.CompareAndSwapUint32(rt.running, uint32(1), uint32(0)) - if !stopped { - panic("called stop but already stopped") - } + close(rt.out) rt.fin <- reason } diff --git a/blockchain/v2/routine_test.go b/blockchain/v2/routine_test.go index 8de36e275..38bff1fcf 100644 --- a/blockchain/v2/routine_test.go +++ b/blockchain/v2/routine_test.go @@ -9,47 +9,47 @@ import ( "github.com/tendermint/tendermint/libs/log" ) -type eventA struct{} -type eventB struct{} -type errEvent struct{} +type eventA struct { + priorityNormal +} var done = fmt.Errorf("done") -func simpleHandler(event Event) (Events, error) { +func simpleHandler(event Event) (Event, error) { switch event.(type) { case eventA: - return Events{eventB{}}, nil - case eventB: - return Events{}, done + return noOp, done } - return Events{}, nil + return noOp, nil } -func TestRoutine(t *testing.T) { +func TestRoutineFinal(t *testing.T) { routine := newRoutine("simpleRoutine", simpleHandler) assert.False(t, routine.isRunning(), "expected an initialized routine to not be running") go routine.start() - go routine.feedback() <-routine.ready() + assert.True(t, routine.isRunning(), + "expected an started routine") assert.True(t, routine.trySend(eventA{}), "expected sending to a ready routine to succeed") assert.Equal(t, done, <-routine.final(), "expected the final event to be done") + + assert.False(t, routine.isRunning(), + "expected an completed routine to no longer be running") } -func TestRoutineSend(t *testing.T) { +func TestRoutineStop(t *testing.T) { routine := newRoutine("simpleRoutine", simpleHandler) assert.False(t, routine.trySend(eventA{}), "expected sending to an unstarted routine to fail") go routine.start() - - go routine.feedback() <-routine.ready() assert.True(t, routine.trySend(eventA{}), @@ -71,20 +71,22 @@ func (f finalCount) Error() string { func genStatefulHandler(maxCount int) handleFunc { counter := 0 - return func(event Event) (Events, error) { - // golint fixme - switch event.(type) { - case eventA: + return func(event Event) (Event, error) { + if _, ok := event.(eventA); ok { counter += 1 if counter >= maxCount { - return Events{}, finalCount{counter} + return noOp, finalCount{counter} } - return Events{eventA{}}, nil - case eventB: - return Events{}, nil + return eventA{}, nil } - return Events{}, nil + return noOp, nil + } +} + +func feedback(r *Routine) { + for event := range r.next() { + r.trySend(event) } } @@ -95,16 +97,14 @@ func TestStatefulRoutine(t *testing.T) { routine.setLogger(log.TestingLogger()) go routine.start() - go routine.feedback() - + go feedback(routine) <-routine.ready() assert.True(t, routine.trySend(eventA{}), "expected sending to a started routine to succeed") final := <-routine.final() - fnl, ok := final.(finalCount) - if ok { + if fnl, ok := final.(finalCount); ok { assert.Equal(t, count, fnl.count, "expected the routine to count to 10") } else { @@ -112,28 +112,38 @@ func TestStatefulRoutine(t *testing.T) { } } -func handleWithErrors(event Event) (Events, error) { +type lowPriorityEvent struct { + priorityLow +} + +type highPriorityEvent struct { + priorityHigh +} + +func handleWithPriority(event Event) (Event, error) { switch event.(type) { - case eventA: - return Events{}, nil - case errEvent: - return Events{}, done + case lowPriorityEvent: + return noOp, nil + case highPriorityEvent: + return noOp, done } - return Events{}, nil + return noOp, nil } -func TestErrorSaturation(t *testing.T) { - routine := newRoutine("errorRoutine", handleWithErrors) +func TestPriority(t *testing.T) { + // XXX: align with buffer size + routine := newRoutine("priorityRoutine", handleWithPriority) go routine.start() <-routine.ready() go func() { for { - routine.trySend(eventA{}) - time.Sleep(10 * time.Millisecond) + routine.trySend(lowPriorityEvent{}) + time.Sleep(1 * time.Millisecond) } }() + time.Sleep(10 * time.Millisecond) - assert.True(t, routine.trySend(errEvent{}), + assert.True(t, routine.trySend(highPriorityEvent{}), "expected send to succeed even when saturated") assert.Equal(t, done, <-routine.final()) diff --git a/blockchain/v2/types.go b/blockchain/v2/types.go index dbde352a3..836e87fd8 100644 --- a/blockchain/v2/types.go +++ b/blockchain/v2/types.go @@ -1,4 +1,64 @@ package v2 -type Event interface{} -type Events []Event +import ( + "github.com/Workiva/go-datastructures/queue" +) + +type Event queue.Item + +type priority interface { + Compare(other queue.Item) int + Priority() int +} + +type priorityLow struct{} +type priorityNormal struct{} +type priorityHigh struct{} + +func (p priorityLow) Priority() int { + return 1 +} + +func (p priorityNormal) Priority() int { + return 2 +} + +func (p priorityHigh) Priority() int { + return 3 +} + +func (p priorityLow) Compare(other queue.Item) int { + op := other.(priority) + if p.Priority() > op.Priority() { + return 1 + } else if p.Priority() == op.Priority() { + return 0 + } + return -1 +} + +func (p priorityNormal) Compare(other queue.Item) int { + op := other.(priority) + if p.Priority() > op.Priority() { + return 1 + } else if p.Priority() == op.Priority() { + return 0 + } + return -1 +} + +func (p priorityHigh) Compare(other queue.Item) int { + op := other.(priority) + if p.Priority() > op.Priority() { + return 1 + } else if p.Priority() == op.Priority() { + return 0 + } + return -1 +} + +type noOpEvent struct { + priorityLow +} + +var noOp = noOpEvent{} diff --git a/go.mod b/go.mod index ffa305e38..1bf4f0277 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.12 require ( github.com/VividCortex/gohistogram v1.0.0 // indirect + github.com/Workiva/go-datastructures v1.0.50 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect github.com/btcsuite/btcd v0.0.0-20190115013929-ed77733ec07d github.com/btcsuite/btcutil v0.0.0-20180706230648-ab6388e0c60a diff --git a/go.sum b/go.sum index c6d506f0b..a8a72a82d 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= +github.com/Workiva/go-datastructures v1.0.50 h1:slDmfW6KCHcC7U+LP3DDBbm4fqTwZGn1beOFPfGaLvo= +github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= From c62b7fbd7e97cdafd343e6949903e8b4eecae4a1 Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Thu, 12 Sep 2019 12:50:25 -0400 Subject: [PATCH 14/22] feedback tweaks --- blockchain/v2/routine.go | 6 +++--- blockchain/v2/routine_test.go | 4 ++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index eaaa68cf0..9c5b97b18 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -23,12 +23,12 @@ type handleFunc = func(event Event) (Event, error) // sent events and produce `last()` event representing the terminal state. type Routine struct { name string + handle handleFunc queue *queue.PriorityQueue - out chan Event // XXX: actually item + out chan Event fin chan error rdy chan struct{} running *uint32 - handle handleFunc logger log.Logger metrics *Metrics } @@ -38,8 +38,8 @@ var queueSize int = 10 func newRoutine(name string, handleFunc handleFunc) *Routine { return &Routine{ name: name, - queue: queue.NewPriorityQueue(queueSize, true), handle: handleFunc, + queue: queue.NewPriorityQueue(queueSize, true), out: make(chan Event, queueSize), rdy: make(chan struct{}, 1), fin: make(chan error, 1), diff --git a/blockchain/v2/routine_test.go b/blockchain/v2/routine_test.go index 38bff1fcf..6fa8bde32 100644 --- a/blockchain/v2/routine_test.go +++ b/blockchain/v2/routine_test.go @@ -143,8 +143,12 @@ func TestPriority(t *testing.T) { }() time.Sleep(10 * time.Millisecond) + assert.True(t, routine.isRunning(), + "expected an started routine") assert.True(t, routine.trySend(highPriorityEvent{}), "expected send to succeed even when saturated") assert.Equal(t, done, <-routine.final()) + assert.False(t, routine.isRunning(), + "expected an started routine") } From e7ee314c99830f0cc0cebe6d2415d05fe32ceb2e Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Fri, 13 Sep 2019 11:33:38 -0400 Subject: [PATCH 15/22] Subsume the demuxer into the reactor + Simplify the design by demuxing events directly in the reactor --- blockchain/v2/demuxer.go | 164 ---------------------------------- blockchain/v2/reactor.go | 75 ++++++++++------ blockchain/v2/reactor_test.go | 12 ++- blockchain/v2/routine.go | 2 +- 4 files changed, 57 insertions(+), 196 deletions(-) delete mode 100644 blockchain/v2/demuxer.go diff --git a/blockchain/v2/demuxer.go b/blockchain/v2/demuxer.go deleted file mode 100644 index 5ebb54633..000000000 --- a/blockchain/v2/demuxer.go +++ /dev/null @@ -1,164 +0,0 @@ -// nolint:unused -package v2 - -import ( - "fmt" - "sync/atomic" - - "github.com/tendermint/tendermint/libs/log" -) - -type scFull struct { - priorityHigh -} -type pcFull struct { - priorityHigh -} - -const demuxerBufferSize = 10 - -type demuxer struct { - input chan Event - scheduler *Routine - processor *Routine - fin chan error - stopped chan struct{} - rdy chan struct{} - running *uint32 - stopping *uint32 - logger log.Logger -} - -func newDemuxer(scheduler *Routine, processor *Routine) *demuxer { - return &demuxer{ - input: make(chan Event, demuxerBufferSize), - scheduler: scheduler, - processor: processor, - stopped: make(chan struct{}, 1), - fin: make(chan error, 1), - rdy: make(chan struct{}, 1), - running: new(uint32), - stopping: new(uint32), - logger: log.NewNopLogger(), - } -} - -func (dm *demuxer) setLogger(logger log.Logger) { - dm.logger = logger -} - -func (dm *demuxer) start() { - starting := atomic.CompareAndSwapUint32(dm.running, uint32(0), uint32(1)) - if !starting { - panic("Routine has already started") - } - dm.logger.Info("demuxer: run") - dm.rdy <- struct{}{} - for { - if !dm.isRunning() { - break - } - select { - case event, ok := <-dm.input: - if !ok { - dm.logger.Info("demuxer: stopping") - dm.terminate(fmt.Errorf("stopped")) - dm.stopped <- struct{}{} - return - } - oEvent, err := dm.handle(event) - if err != nil { - dm.terminate(err) - return - } - dm.input <- oEvent - case event, ok := <-dm.scheduler.next(): - if !ok { - dm.logger.Info("demuxer: scheduler output closed") - continue - } - oEvent, err := dm.handle(event) - if err != nil { - dm.terminate(err) - return - } - dm.input <- oEvent - case event, ok := <-dm.processor.next(): - if !ok { - dm.logger.Info("demuxer: processor output closed") - continue - } - oEvent, err := dm.handle(event) - if err != nil { - dm.terminate(err) - return - } - dm.input <- oEvent - } - } -} - -func (dm *demuxer) handle(event Event) (Event, error) { - received := dm.scheduler.trySend(event) - if !received { - return scFull{}, nil // backpressure - } - - received = dm.processor.trySend(event) - if !received { - return pcFull{}, nil // backpressure - } - - return noOp, nil -} - -func (dm *demuxer) trySend(event Event) bool { - if !dm.isRunning() || dm.isStopping() { - dm.logger.Info("dummuxer isn't running") - return false - } - select { - case dm.input <- event: - return true - default: - dm.logger.Info("demuxer channel was full") - return false - } -} - -func (dm *demuxer) isRunning() bool { - return atomic.LoadUint32(dm.running) == 1 -} - -func (dm *demuxer) isStopping() bool { - return atomic.LoadUint32(dm.stopping) == 1 -} - -func (dm *demuxer) ready() chan struct{} { - return dm.rdy -} - -func (dm *demuxer) stop() { - if !dm.isRunning() { - return - } - stopping := atomic.CompareAndSwapUint32(dm.stopping, uint32(0), uint32(1)) - if !stopping { - panic("Demuxer has already stopped") - } - dm.logger.Info("demuxer stop") - close(dm.input) - <-dm.stopped -} - -func (dm *demuxer) terminate(reason error) { - stopped := atomic.CompareAndSwapUint32(dm.running, uint32(1), uint32(0)) - if !stopped { - panic("called terminate but already terminated") - } - dm.fin <- reason -} - -func (dm *demuxer) final() chan error { - return dm.fin -} diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 64deb5ce8..cba3f5857 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -16,8 +16,6 @@ func schedulerHandle(event Event) (Event, error) { switch event.(type) { case timeCheck: fmt.Println("scheduler handle timeCheck") - case Event: - fmt.Println("scheduler handle testEvent") } return noOp, nil } @@ -26,71 +24,94 @@ func processorHandle(event Event) (Event, error) { switch event.(type) { case timeCheck: fmt.Println("processor handle timeCheck") - case Event: - fmt.Println("processor handle event") } return noOp, nil + } type Reactor struct { - demuxer *demuxer + events chan Event + stopDemux chan struct{} scheduler *Routine processor *Routine ticker *time.Ticker + logger log.Logger +} + +var bufferSize int = 10 + +func NewReactor() *Reactor { + return &Reactor{ + events: make(chan Event, bufferSize), + stopDemux: make(chan struct{}), + scheduler: newRoutine("scheduler", schedulerHandle), + processor: newRoutine("processor", processorHandle), + ticker: time.NewTicker(1 * time.Second), + logger: log.NewNopLogger(), + } } // nolint:unused func (r *Reactor) setLogger(logger log.Logger) { + r.logger = logger r.scheduler.setLogger(logger) r.processor.setLogger(logger) - r.demuxer.setLogger(logger) } func (r *Reactor) Start() { - r.scheduler = newRoutine("scheduler", schedulerHandle) - r.processor = newRoutine("processor", processorHandle) - r.demuxer = newDemuxer(r.scheduler, r.processor) - r.ticker = time.NewTicker(1 * time.Second) - go r.scheduler.start() go r.processor.start() - go r.demuxer.start() + go r.demux() <-r.scheduler.ready() <-r.processor.ready() - <-r.demuxer.ready() go func() { for t := range r.ticker.C { - r.demuxer.trySend(timeCheck{time: t}) + r.events <- timeCheck{time: t} } }() } -func (r *Reactor) Wait() { - fmt.Println("completed routines") - r.Stop() +func (r *Reactor) demux() { + for { + select { + case event := <-r.events: + + // XXX: check for backpressure + r.scheduler.trySend(event) + r.processor.trySend(event) + case _ = <-r.stopDemux: + r.logger.Info("demuxing stopped") + return + case event := <-r.scheduler.next(): + r.events <- event + case event := <-r.processor.next(): + r.events <- event + case err := <-r.scheduler.final(): + r.logger.Info(fmt.Sprintf("scheduler final %s", err)) + case err := <-r.processor.final(): + r.logger.Info(fmt.Sprintf("processor final %s", err)) + // XXX: switch to consensus + } + } } func (r *Reactor) Stop() { - fmt.Println("reactor stopping") + r.logger.Info("reactor stopping") r.ticker.Stop() - r.demuxer.stop() r.scheduler.stop() r.processor.stop() - // todo: accumulator - // todo: io + close(r.stopDemux) + close(r.events) - fmt.Println("reactor stopped") + r.logger.Info("reactor stopped") } func (r *Reactor) Receive(event Event) { - fmt.Println("receive event") - sent := r.demuxer.trySend(event) - if !sent { - fmt.Println("demuxer is full") - } + // XXX: decode and serialize write events + r.events <- event } func (r *Reactor) AddPeer() { diff --git a/blockchain/v2/reactor_test.go b/blockchain/v2/reactor_test.go index 2b3e2be1e..e14e618de 100644 --- a/blockchain/v2/reactor_test.go +++ b/blockchain/v2/reactor_test.go @@ -1,11 +1,15 @@ package v2 -import "testing" +import ( + "testing" + + "github.com/tendermint/tendermint/libs/log" +) -// XXX: This makes assumptions about the message routing func TestReactor(t *testing.T) { - reactor := Reactor{} + reactor := NewReactor() reactor.Start() + reactor.setLogger(log.TestingLogger()) script := []Event{ // TODO } @@ -13,5 +17,5 @@ func TestReactor(t *testing.T) { for _, event := range script { reactor.Receive(event) } - reactor.Wait() + reactor.Stop() } diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index 9c5b97b18..04cd43c63 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -20,7 +20,7 @@ type handleFunc = func(event Event) (Event, error) // handles the concurrency and messaging guarantees. Events are sent via // `trySend` are handled by the `handle` function to produce an iterator // `next()`. Calling `close()` on a routine will conclude processing of all -// sent events and produce `last()` event representing the terminal state. +// sent events and produce `final()` event representing the terminal state. type Routine struct { name string handle handleFunc From fbede85e20c424a00f0c6bac3660a9b1e8f6b5c2 Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Fri, 13 Sep 2019 18:36:02 -0400 Subject: [PATCH 16/22] changes based on feedback --- blockchain/v2/reactor.go | 7 ++++--- blockchain/v2/routine.go | 2 ++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index cba3f5857..16f473276 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -73,11 +73,12 @@ func (r *Reactor) Start() { }() } +// Would it be possible here to provide some kind of type safety for the types +// of events that each routine can produce and consume? func (r *Reactor) demux() { for { select { case event := <-r.events: - // XXX: check for backpressure r.scheduler.trySend(event) r.processor.trySend(event) @@ -85,9 +86,9 @@ func (r *Reactor) demux() { r.logger.Info("demuxing stopped") return case event := <-r.scheduler.next(): - r.events <- event + r.processor.trySend(event) case event := <-r.processor.next(): - r.events <- event + r.scheduler.trySend(event) case err := <-r.scheduler.final(): r.logger.Info(fmt.Sprintf("scheduler final %s", err)) case err := <-r.processor.final(): diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index 04cd43c63..82015c618 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -92,6 +92,8 @@ func (rt *Routine) start() { } } +// XXX: rename send +// XXX: look into returning OpError in the net package func (rt *Routine) trySend(event Event) bool { rt.logger.Info(fmt.Sprintf("%s: sending %+v", rt.name, event)) if !rt.isRunning() { From 9bd2c0389f5182d4a30ac06523d229639ff51f62 Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Fri, 13 Sep 2019 18:54:25 -0400 Subject: [PATCH 17/22] rename trySend to end --- blockchain/v2/reactor.go | 8 ++++---- blockchain/v2/routine.go | 7 +++---- blockchain/v2/routine_test.go | 16 ++++++++-------- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 16f473276..26b2cb979 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -80,15 +80,15 @@ func (r *Reactor) demux() { select { case event := <-r.events: // XXX: check for backpressure - r.scheduler.trySend(event) - r.processor.trySend(event) + r.scheduler.send(event) + r.processor.send(event) case _ = <-r.stopDemux: r.logger.Info("demuxing stopped") return case event := <-r.scheduler.next(): - r.processor.trySend(event) + r.processor.send(event) case event := <-r.processor.next(): - r.scheduler.trySend(event) + r.scheduler.send(event) case err := <-r.scheduler.final(): r.logger.Info(fmt.Sprintf("scheduler final %s", err)) case err := <-r.processor.final(): diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index 82015c618..231da703a 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -18,7 +18,7 @@ type handleFunc = func(event Event) (Event, error) // Routines are a structure which model a finite state machine as serialized // stream of events processed by a handle function. This Routine structure // handles the concurrency and messaging guarantees. Events are sent via -// `trySend` are handled by the `handle` function to produce an iterator +// `send` are handled by the `handle` function to produce an iterator // `next()`. Calling `close()` on a routine will conclude processing of all // sent events and produce `final()` event representing the terminal state. type Routine struct { @@ -92,9 +92,8 @@ func (rt *Routine) start() { } } -// XXX: rename send // XXX: look into returning OpError in the net package -func (rt *Routine) trySend(event Event) bool { +func (rt *Routine) send(event Event) bool { rt.logger.Info(fmt.Sprintf("%s: sending %+v", rt.name, event)) if !rt.isRunning() { return false @@ -102,7 +101,7 @@ func (rt *Routine) trySend(event Event) bool { err := rt.queue.Put(event) if err != nil { rt.metrics.EventsShed.With("routine", rt.name).Add(1) - rt.logger.Info(fmt.Sprintf("%s: trySend fail, queue was full/stopped \n", rt.name)) + rt.logger.Info(fmt.Sprintf("%s: send failed, queue was full/stopped \n", rt.name)) return false } rt.metrics.EventsSent.With("routine", rt.name).Add(1) diff --git a/blockchain/v2/routine_test.go b/blockchain/v2/routine_test.go index 6fa8bde32..e32394ee4 100644 --- a/blockchain/v2/routine_test.go +++ b/blockchain/v2/routine_test.go @@ -33,7 +33,7 @@ func TestRoutineFinal(t *testing.T) { assert.True(t, routine.isRunning(), "expected an started routine") - assert.True(t, routine.trySend(eventA{}), + assert.True(t, routine.send(eventA{}), "expected sending to a ready routine to succeed") assert.Equal(t, done, <-routine.final(), @@ -46,18 +46,18 @@ func TestRoutineFinal(t *testing.T) { func TestRoutineStop(t *testing.T) { routine := newRoutine("simpleRoutine", simpleHandler) - assert.False(t, routine.trySend(eventA{}), + assert.False(t, routine.send(eventA{}), "expected sending to an unstarted routine to fail") go routine.start() <-routine.ready() - assert.True(t, routine.trySend(eventA{}), + assert.True(t, routine.send(eventA{}), "expected sending to a running routine to succeed") routine.stop() - assert.False(t, routine.trySend(eventA{}), + assert.False(t, routine.send(eventA{}), "expected sending to a stopped routine to fail") } @@ -86,7 +86,7 @@ func genStatefulHandler(maxCount int) handleFunc { func feedback(r *Routine) { for event := range r.next() { - r.trySend(event) + r.send(event) } } @@ -100,7 +100,7 @@ func TestStatefulRoutine(t *testing.T) { go feedback(routine) <-routine.ready() - assert.True(t, routine.trySend(eventA{}), + assert.True(t, routine.send(eventA{}), "expected sending to a started routine to succeed") final := <-routine.final() @@ -137,7 +137,7 @@ func TestPriority(t *testing.T) { <-routine.ready() go func() { for { - routine.trySend(lowPriorityEvent{}) + routine.send(lowPriorityEvent{}) time.Sleep(1 * time.Millisecond) } }() @@ -145,7 +145,7 @@ func TestPriority(t *testing.T) { assert.True(t, routine.isRunning(), "expected an started routine") - assert.True(t, routine.trySend(highPriorityEvent{}), + assert.True(t, routine.send(highPriorityEvent{}), "expected send to succeed even when saturated") assert.Equal(t, done, <-routine.final()) From 822942a2e4ef5868a97b25ca12d0c60f6f45f3e5 Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Sat, 14 Sep 2019 12:49:10 -0400 Subject: [PATCH 18/22] better debugging logging --- blockchain/v2/routine.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index 231da703a..823485bf2 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -8,11 +8,6 @@ import ( "github.com/tendermint/tendermint/libs/log" ) -// TODO -// * revisit panic conditions -// * audit log levels -// * Convert routine to an interface with concrete implmentation - type handleFunc = func(event Event) (Event, error) // Routines are a structure which model a finite state machine as serialized @@ -79,14 +74,14 @@ func (rt *Routine) start() { rt.terminate(fmt.Errorf("stopped")) return } - oEvent, err := rt.handle(events[0]) + oEvent, err := rt.handle(events[0].(Event)) rt.metrics.EventsHandled.With("routine", rt.name).Add(1) if err != nil { rt.terminate(err) return } rt.metrics.EventsOut.With("routine", rt.name).Add(1) - rt.logger.Debug(fmt.Sprintf("%s produced %+v event\n", rt.name, oEvent)) + rt.logger.Debug(fmt.Sprintf("%s produced %T %+v\n", rt.name, oEvent, oEvent)) rt.out <- oEvent } @@ -94,7 +89,7 @@ func (rt *Routine) start() { // XXX: look into returning OpError in the net package func (rt *Routine) send(event Event) bool { - rt.logger.Info(fmt.Sprintf("%s: sending %+v", rt.name, event)) + rt.logger.Debug(fmt.Sprintf("%s: received %T %+v", rt.name, event, event)) if !rt.isRunning() { return false } From 99b7a33f90f23ba1ae2b3375a205af555fc7410e Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Sat, 14 Sep 2019 13:01:19 -0400 Subject: [PATCH 19/22] align buffer sizes --- blockchain/v2/reactor.go | 6 +++--- blockchain/v2/reactor_test.go | 3 --- blockchain/v2/routine.go | 8 +++----- blockchain/v2/routine_test.go | 28 +++++++++++++++++++--------- 4 files changed, 25 insertions(+), 20 deletions(-) diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 26b2cb979..8cc8ac218 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -42,10 +42,10 @@ var bufferSize int = 10 func NewReactor() *Reactor { return &Reactor{ - events: make(chan Event, bufferSize), + events: make(chan Event, bufferSize*2), stopDemux: make(chan struct{}), - scheduler: newRoutine("scheduler", schedulerHandle), - processor: newRoutine("processor", processorHandle), + scheduler: newRoutine("scheduler", schedulerHandle, bufferSize), + processor: newRoutine("processor", processorHandle, bufferSize), ticker: time.NewTicker(1 * time.Second), logger: log.NewNopLogger(), } diff --git a/blockchain/v2/reactor_test.go b/blockchain/v2/reactor_test.go index e14e618de..86ac728a9 100644 --- a/blockchain/v2/reactor_test.go +++ b/blockchain/v2/reactor_test.go @@ -2,14 +2,11 @@ package v2 import ( "testing" - - "github.com/tendermint/tendermint/libs/log" ) func TestReactor(t *testing.T) { reactor := NewReactor() reactor.Start() - reactor.setLogger(log.TestingLogger()) script := []Event{ // TODO } diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index 823485bf2..cc7e7ea0f 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -28,14 +28,12 @@ type Routine struct { metrics *Metrics } -var queueSize int = 10 - -func newRoutine(name string, handleFunc handleFunc) *Routine { +func newRoutine(name string, handleFunc handleFunc, bufferSize int) *Routine { return &Routine{ name: name, handle: handleFunc, - queue: queue.NewPriorityQueue(queueSize, true), - out: make(chan Event, queueSize), + queue: queue.NewPriorityQueue(bufferSize, true), + out: make(chan Event, bufferSize), rdy: make(chan struct{}, 1), fin: make(chan error, 1), running: new(uint32), diff --git a/blockchain/v2/routine_test.go b/blockchain/v2/routine_test.go index e32394ee4..2bd5a1a30 100644 --- a/blockchain/v2/routine_test.go +++ b/blockchain/v2/routine_test.go @@ -6,7 +6,6 @@ import ( "time" "github.com/stretchr/testify/assert" - "github.com/tendermint/tendermint/libs/log" ) type eventA struct { @@ -24,7 +23,10 @@ func simpleHandler(event Event) (Event, error) { } func TestRoutineFinal(t *testing.T) { - routine := newRoutine("simpleRoutine", simpleHandler) + var ( + bufferSize = 10 + routine = newRoutine("simpleRoutine", simpleHandler, bufferSize) + ) assert.False(t, routine.isRunning(), "expected an initialized routine to not be running") @@ -44,7 +46,10 @@ func TestRoutineFinal(t *testing.T) { } func TestRoutineStop(t *testing.T) { - routine := newRoutine("simpleRoutine", simpleHandler) + var ( + bufferSize = 10 + routine = newRoutine("simpleRoutine", simpleHandler, bufferSize) + ) assert.False(t, routine.send(eventA{}), "expected sending to an unstarted routine to fail") @@ -91,10 +96,12 @@ func feedback(r *Routine) { } func TestStatefulRoutine(t *testing.T) { - count := 10 - handler := genStatefulHandler(count) - routine := newRoutine("statefulRoutine", handler) - routine.setLogger(log.TestingLogger()) + var ( + count = 10 + handler = genStatefulHandler(count) + bufferSize = 20 + routine = newRoutine("statefulRoutine", handler, bufferSize) + ) go routine.start() go feedback(routine) @@ -131,8 +138,11 @@ func handleWithPriority(event Event) (Event, error) { } func TestPriority(t *testing.T) { - // XXX: align with buffer size - routine := newRoutine("priorityRoutine", handleWithPriority) + var ( + bufferSize = 20 + routine = newRoutine("priorityRoutine", handleWithPriority, bufferSize) + ) + go routine.start() <-routine.ready() go func() { From d3d034e57226aa9afb7df70041d81722b43eda34 Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Tue, 17 Sep 2019 15:18:15 -0400 Subject: [PATCH 20/22] tidying --- blockchain/v2/reactor.go | 11 +++++------ blockchain/v2/reactor_test.go | 6 +++++- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 8cc8ac218..f96b325b0 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -38,11 +38,9 @@ type Reactor struct { logger log.Logger } -var bufferSize int = 10 - -func NewReactor() *Reactor { +func NewReactor(bufferSize int) *Reactor { return &Reactor{ - events: make(chan Event, bufferSize*2), + events: make(chan Event, bufferSize), stopDemux: make(chan struct{}), scheduler: newRoutine("scheduler", schedulerHandle, bufferSize), processor: newRoutine("processor", processorHandle, bufferSize), @@ -73,7 +71,7 @@ func (r *Reactor) Start() { }() } -// Would it be possible here to provide some kind of type safety for the types +// XXX: Would it be possible here to provide some kind of type safety for the types // of events that each routine can produce and consume? func (r *Reactor) demux() { for { @@ -82,7 +80,7 @@ func (r *Reactor) demux() { // XXX: check for backpressure r.scheduler.send(event) r.processor.send(event) - case _ = <-r.stopDemux: + case <-r.stopDemux: r.logger.Info("demuxing stopped") return case event := <-r.scheduler.next(): @@ -112,6 +110,7 @@ func (r *Reactor) Stop() { func (r *Reactor) Receive(event Event) { // XXX: decode and serialize write events + // TODO: backpressure r.events <- event } diff --git a/blockchain/v2/reactor_test.go b/blockchain/v2/reactor_test.go index 86ac728a9..46a2e60c6 100644 --- a/blockchain/v2/reactor_test.go +++ b/blockchain/v2/reactor_test.go @@ -5,7 +5,11 @@ import ( ) func TestReactor(t *testing.T) { - reactor := NewReactor() + var ( + bufferSize = 10 + reactor = NewReactor(bufferSize) + ) + reactor.Start() script := []Event{ // TODO From 0cbf32de975f725aa97cca384a5c8a577cf9cace Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Wed, 18 Sep 2019 15:22:24 -0400 Subject: [PATCH 21/22] merge fix --- blockchain/v2/schedule.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/blockchain/v2/schedule.go b/blockchain/v2/schedule.go index 329557492..db3819b81 100644 --- a/blockchain/v2/schedule.go +++ b/blockchain/v2/schedule.go @@ -10,8 +10,6 @@ import ( "github.com/tendermint/tendermint/p2p" ) -type Event interface{} - type blockState int const ( From 2ae7a300b76e173ed530d534bc4936cac00fb9be Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Wed, 18 Sep 2019 15:59:51 -0400 Subject: [PATCH 22/22] merge artifact go build file --- go.mod | 1 - 1 file changed, 1 deletion(-) diff --git a/go.mod b/go.mod index ddb067c29..01a122fe7 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.12 require ( github.com/VividCortex/gohistogram v1.0.0 // indirect github.com/Workiva/go-datastructures v1.0.50 - github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect github.com/btcsuite/btcd v0.0.0-20190115013929-ed77733ec07d github.com/btcsuite/btcutil v0.0.0-20180706230648-ab6388e0c60a github.com/fortytw2/leaktest v1.3.0