From 0cf9f8629200584a3747a8bcdff6ee9db7de6094 Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Tue, 6 Aug 2019 13:27:15 +0200 Subject: [PATCH] Modification based on feedback + `routine.send` returns false when routine is not running + this will prevent panics sending to channels which have been closed + Make output channels routine specific removing the risk of someone writting to a channel which was closed by another touine. + consistency changes between the routines and the demuxer --- blockchain/v2/demuxer.go | 70 ++++++++++++++++++++++++----------- blockchain/v2/reactor.go | 26 +++++++++---- blockchain/v2/routine.go | 33 +++++++++++------ blockchain/v2/routine_test.go | 65 +++++++++++++++++++++++++++----- blockchain/v2/types.go | 3 ++ 5 files changed, 147 insertions(+), 50 deletions(-) diff --git a/blockchain/v2/demuxer.go b/blockchain/v2/demuxer.go index 511bf7f8b..32165735e 100644 --- a/blockchain/v2/demuxer.go +++ b/blockchain/v2/demuxer.go @@ -1,75 +1,88 @@ package v2 -import "fmt" +import ( + "fmt" + "sync/atomic" +) type demuxer struct { - eventbus chan Event + input chan Event scheduler *Routine processor *Routine finished chan error stopped chan struct{} + running *uint32 } +// TODO +// demuxer_test +// Termination process +// Logger +// Metrics +// Adhere to interface func newDemuxer(scheduler *Routine, processor *Routine) *demuxer { return &demuxer{ - eventbus: make(chan Event, 10), + input: make(chan Event, 10), scheduler: scheduler, processor: processor, stopped: make(chan struct{}, 1), finished: make(chan error, 1), + running: new(uint32), } } -// What should the termination clause be? -// Is any of the subroutines finishe, the demuxer finishes func (dm *demuxer) run() { + starting := atomic.CompareAndSwapUint32(dm.running, uint32(0), uint32(1)) + if !starting { + panic("Routine has already started") + } fmt.Printf("demuxer: run\n") for { + if !dm.isRunning() { + break + } select { - case event, ok := <-dm.eventbus: + case event, ok := <-dm.input: if !ok { fmt.Printf("demuxer: stopping\n") + dm.terminate(fmt.Errorf("stopped")) dm.stopped <- struct{}{} return } oEvents, err := dm.handle(event) if err != nil { - // TODO Termination time + dm.terminate(err) return } for _, event := range oEvents { - dm.eventbus <- event + dm.input <- event } - case event, ok := <-dm.scheduler.output: + case event, ok := <-dm.scheduler.output(): if !ok { fmt.Printf("demuxer: scheduler output closed\n") continue } oEvents, err := dm.handle(event) if err != nil { - // TODO tTermination time + dm.terminate(err) return } for _, event := range oEvents { - dm.eventbus <- event + dm.input <- event } - case event, ok := <-dm.processor.output: + case event, ok := <-dm.processor.output(): if !ok { fmt.Printf("demuxer: processor output closed\n") continue } oEvents, err := dm.handle(event) if err != nil { - // TODO tTermination time + dm.terminate(err) return } for _, event := range oEvents { - dm.eventbus <- event + dm.input <- event } - case err := <-dm.scheduler.finished: - dm.finished <- err - case err := <-dm.processor.finished: - dm.finished <- err } } } @@ -89,9 +102,12 @@ func (dm *demuxer) handle(event Event) (Events, error) { } func (dm *demuxer) send(event Event) bool { - fmt.Printf("demuxer send\n") + if !dm.isRunning() { + fmt.Println("dummuxer isn't running") + return false + } select { - case dm.eventbus <- event: + case dm.input <- event: return true default: fmt.Printf("demuxer channel was full\n") @@ -99,14 +115,24 @@ func (dm *demuxer) send(event Event) bool { } } +func (dm *demuxer) isRunning() bool { + return atomic.LoadUint32(dm.running) == 1 +} + func (dm *demuxer) stop() { + if !dm.isRunning() { + return + } fmt.Printf("demuxer stop\n") - close(dm.eventbus) + close(dm.input) <-dm.stopped - dm.terminate(fmt.Errorf("stopped")) } func (dm *demuxer) terminate(reason error) { + stopped := atomic.CompareAndSwapUint32(dm.running, uint32(1), uint32(0)) + if !stopped { + panic("called terminate but already terminated") + } dm.finished <- reason } diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index ffdf92911..4f07224ca 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -3,6 +3,8 @@ package v2 import ( "fmt" "time" + + "github.com/tendermint/tendermint/libs/log" ) func schedulerHandle(event Event) (Events, error) { @@ -31,7 +33,6 @@ func processorHandle(event Event) (Events, error) { // reactor type Reactor struct { - events chan Event demuxer *demuxer scheduler *Routine processor *Routine @@ -40,11 +41,14 @@ type Reactor struct { } func (r *Reactor) Start() { - bufferSize := 10 - events := make(chan Event, bufferSize) + logger := log.TestingLogger() - r.scheduler = newRoutine("scheduler", events, schedulerHandle) - r.processor = newRoutine("processor", events, processorHandle) + // what is the best way to get the events out of the routine + r.scheduler = newRoutine("scheduler", schedulerHandle) + r.scheduler.setLogger(logger) + r.processor = newRoutine("processor", processorHandle) + r.processor.setLogger(logger) + // so actually the demuxer only needs to read from events r.demuxer = newDemuxer(r.scheduler, r.processor) r.tickerStopped = make(chan struct{}) @@ -52,12 +56,20 @@ func (r *Reactor) Start() { go r.processor.run() go r.demuxer.run() + for { + if r.scheduler.isRunning() && r.processor.isRunning() && r.demuxer.isRunning() { + fmt.Println("routines running") + break + } + fmt.Println("waiting") + time.Sleep(1 * time.Second) + } + go func() { ticker := time.NewTicker(1 * time.Second) for { select { case <-ticker.C: - // xxx: what if !sent? r.demuxer.send(timeCheck{}) case <-r.tickerStopped: fmt.Println("ticker stopped") @@ -89,7 +101,7 @@ func (r *Reactor) Receive(event Event) { fmt.Println("receive event") sent := r.demuxer.send(event) if !sent { - panic("demuxer is full") + fmt.Println("demuxer is full") } } diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index 31b918925..fd923966e 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -10,17 +10,16 @@ import ( // TODO // * revisit panic conditions // * audit log levels -// * maybe routine should be an interface and the concret tpe should be handlerRoutine +// * Convert routine to an interface with concrete implmentation +// * determine the public interface -// Adding Metrics -// we need a metrics definition type handleFunc = func(event Event) (Events, error) type Routine struct { name string input chan Event errors chan error - output chan Event + out chan Event stopped chan struct{} finished chan error running *uint32 @@ -29,13 +28,13 @@ type Routine struct { metrics *Metrics } -func newRoutine(name string, output chan Event, handleFunc handleFunc) *Routine { +func newRoutine(name string, handleFunc handleFunc) *Routine { return &Routine{ name: name, input: make(chan Event, 1), handle: handleFunc, errors: make(chan error, 1), - output: output, + out: make(chan Event, 1), stopped: make(chan struct{}, 1), finished: make(chan error, 1), running: new(uint32), @@ -72,6 +71,7 @@ func (rt *Routine) run() { } rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name)) rt.stopped <- struct{}{} + rt.terminate(fmt.Errorf("stopped")) return } oEvents, err := rt.handle(iEvent) @@ -84,7 +84,7 @@ func (rt *Routine) run() { rt.logger.Info(fmt.Sprintf("%s handled %d events\n", rt.name, len(oEvents))) for _, event := range oEvents { rt.logger.Info(fmt.Sprintln("writting back to output")) - rt.output <- event + rt.out <- event } case iEvent, ok := <-rt.errors: rt.metrics.ErrorsIn.With("routine", rt.name).Add(1) @@ -101,18 +101,23 @@ func (rt *Routine) run() { } rt.metrics.ErrorsOut.With("routine", rt.name).Add(float64(len(oEvents))) for _, event := range oEvents { - rt.output <- event + rt.out <- event } } } } func (rt *Routine) feedback() { - for event := range rt.output { + for event := range rt.out { rt.send(event) } } +// XXX: this should be called trySend for consistency func (rt *Routine) send(event Event) bool { + if !rt.isRunning() { + return false + } + rt.logger.Info(fmt.Sprintf("%s: sending %+v", rt.name, event)) if err, ok := event.(error); ok { select { case rt.errors <- err: @@ -140,14 +145,18 @@ func (rt *Routine) isRunning() bool { return atomic.LoadUint32(rt.running) == 1 } -// rename flush? +func (rt *Routine) output() chan Event { + return rt.out +} + func (rt *Routine) stop() { - // XXX: what if already stopped? + if !rt.isRunning() { + return + } rt.logger.Info(fmt.Sprintf("%s: stop\n", rt.name)) close(rt.input) close(rt.errors) <-rt.stopped - rt.terminate(fmt.Errorf("routine stopped")) } func (rt *Routine) terminate(reason error) { diff --git a/blockchain/v2/routine_test.go b/blockchain/v2/routine_test.go index 98fe716f0..8a84dc4de 100644 --- a/blockchain/v2/routine_test.go +++ b/blockchain/v2/routine_test.go @@ -4,6 +4,8 @@ import ( "fmt" "testing" "time" + + "github.com/stretchr/testify/assert" ) type eventA struct{} @@ -22,15 +24,47 @@ func simpleHandler(event Event) (Events, error) { } func TestRoutine(t *testing.T) { - events := make(chan Event, 10) - routine := newRoutine("simpleRoutine", events, simpleHandler) + routine := newRoutine("simpleRoutine", simpleHandler) + assert.False(t, routine.isRunning(), + "expected an initialized routine to not be running") go routine.run() go routine.feedback() + for { + if routine.isRunning() { + break + } + time.Sleep(10 * time.Millisecond) + } routine.send(eventA{}) - routine.wait() + routine.stop() +} + +func TesRoutineSend(t *testing.T) { + routine := newRoutine("simpleRoutine", simpleHandler) + + assert.False(t, routine.send(eventA{}), + "expected sending to an unstarted routine to fail") + + go routine.run() + + go routine.feedback() + for { + if routine.isRunning() { + break + } + time.Sleep(10 * time.Millisecond) + } + + assert.True(t, routine.send(eventA{}), + "expected sending to a running routine to succeed") + + routine.stop() + + assert.False(t, routine.send(eventA{}), + "expected sending to a stopped routine to fail") } func genStatefulHandler(maxCount int) handleFunc { @@ -50,16 +84,22 @@ func genStatefulHandler(maxCount int) handleFunc { } func TestStatefulRoutine(t *testing.T) { - events := make(chan Event, 10) handler := genStatefulHandler(10) - routine := newRoutine("statefulRoutine", events, handler) + routine := newRoutine("statefulRoutine", handler) go routine.run() go routine.feedback() + for { + if routine.isRunning() { + break + } + time.Sleep(10 * time.Millisecond) + } + go routine.send(eventA{}) - routine.wait() + routine.stop() } func handleWithErrors(event Event) (Events, error) { @@ -73,8 +113,7 @@ func handleWithErrors(event Event) (Events, error) { } func TestErrorSaturation(t *testing.T) { - events := make(chan Event, 10) - routine := newRoutine("errorRoutine", events, handleWithErrors) + routine := newRoutine("errorRoutine", handleWithErrors) go routine.run() go func() { @@ -83,7 +122,15 @@ func TestErrorSaturation(t *testing.T) { time.Sleep(10 * time.Millisecond) } }() - routine.send(errEvent{}) + + for { + if routine.isRunning() { + break + } + time.Sleep(10 * time.Millisecond) + } + assert.True(t, routine.send(errEvent{}), + "expected send to succeed even when saturated") routine.wait() } diff --git a/blockchain/v2/types.go b/blockchain/v2/types.go index c9e31b209..e6f491460 100644 --- a/blockchain/v2/types.go +++ b/blockchain/v2/types.go @@ -2,6 +2,9 @@ package v2 import "time" +type Event interface{} + +type Events []Event type testEvent struct { msg string time time.Time