+ Include an implementaiton of the routines specified in ADR-43 along with a demuxer and some dummy reactor codepull/3907/head
@ -0,0 +1,115 @@ | |||||
package v2 | |||||
import "fmt" | |||||
type demuxer struct { | |||||
eventbus chan Event | |||||
scheduler *Routine | |||||
processor *Routine | |||||
finished chan error | |||||
stopped chan struct{} | |||||
} | |||||
func newDemuxer(scheduler *Routine, processor *Routine) *demuxer { | |||||
return &demuxer{ | |||||
eventbus: make(chan Event, 10), | |||||
scheduler: scheduler, | |||||
processor: processor, | |||||
stopped: make(chan struct{}, 1), | |||||
finished: make(chan error, 1), | |||||
} | |||||
} | |||||
// What should the termination clause be? | |||||
// Is any of the subroutines finishe, the demuxer finishes | |||||
func (dm *demuxer) run() { | |||||
fmt.Printf("demuxer: run\n") | |||||
for { | |||||
select { | |||||
case event, ok := <-dm.eventbus: | |||||
if !ok { | |||||
fmt.Printf("demuxer: stopping\n") | |||||
dm.stopped <- struct{}{} | |||||
return | |||||
} | |||||
oEvents, err := dm.handle(event) | |||||
if err != nil { | |||||
// TODO Termination time | |||||
return | |||||
} | |||||
for _, event := range oEvents { | |||||
dm.eventbus <- event | |||||
} | |||||
case event, ok := <-dm.scheduler.output: | |||||
if !ok { | |||||
fmt.Printf("demuxer: scheduler output closed\n") | |||||
continue | |||||
} | |||||
oEvents, err := dm.handle(event) | |||||
if err != nil { | |||||
// TODO tTermination time | |||||
return | |||||
} | |||||
for _, event := range oEvents { | |||||
dm.eventbus <- event | |||||
} | |||||
case event, ok := <-dm.processor.output: | |||||
if !ok { | |||||
fmt.Printf("demuxer: processor output closed\n") | |||||
continue | |||||
} | |||||
oEvents, err := dm.handle(event) | |||||
if err != nil { | |||||
// TODO tTermination time | |||||
return | |||||
} | |||||
for _, event := range oEvents { | |||||
dm.eventbus <- event | |||||
} | |||||
case err := <-dm.scheduler.finished: | |||||
dm.finished <- err | |||||
case err := <-dm.processor.finished: | |||||
dm.finished <- err | |||||
} | |||||
} | |||||
} | |||||
func (dm *demuxer) handle(event Event) (Events, error) { | |||||
received := dm.scheduler.send(event) | |||||
if !received { | |||||
return Events{scFull{}}, nil // backpressure | |||||
} | |||||
received = dm.processor.send(event) | |||||
if !received { | |||||
return Events{pcFull{}}, nil // backpressure | |||||
} | |||||
return Events{}, nil | |||||
} | |||||
func (dm *demuxer) send(event Event) bool { | |||||
fmt.Printf("demuxer send\n") | |||||
select { | |||||
case dm.eventbus <- event: | |||||
return true | |||||
default: | |||||
fmt.Printf("demuxer channel was full\n") | |||||
return false | |||||
} | |||||
} | |||||
func (dm *demuxer) stop() { | |||||
fmt.Printf("demuxer stop\n") | |||||
close(dm.eventbus) | |||||
<-dm.stopped | |||||
dm.terminate(fmt.Errorf("stopped")) | |||||
} | |||||
func (dm *demuxer) terminate(reason error) { | |||||
dm.finished <- reason | |||||
} | |||||
func (dm *demuxer) wait() error { | |||||
return <-dm.finished | |||||
} |
@ -0,0 +1,124 @@ | |||||
package v2 | |||||
import ( | |||||
"github.com/go-kit/kit/metrics" | |||||
"github.com/go-kit/kit/metrics/discard" | |||||
"github.com/go-kit/kit/metrics/prometheus" | |||||
stdprometheus "github.com/prometheus/client_golang/prometheus" | |||||
) | |||||
const ( | |||||
// MetricsSubsystem is a subsystem shared by all metrics exposed by this | |||||
// package. | |||||
MetricsSubsystem = "blockchain" | |||||
) | |||||
// Metrics contains metrics exposed by this package. | |||||
type Metrics struct { | |||||
// events_in | |||||
EventsIn metrics.Counter | |||||
// events_in | |||||
EventsHandled metrics.Counter | |||||
// events_out | |||||
EventsOut metrics.Counter | |||||
// errors_in | |||||
ErrorsIn metrics.Counter | |||||
// errors_handled | |||||
ErrorsHandled metrics.Counter | |||||
// errors_out | |||||
ErrorsOut metrics.Counter | |||||
// events_shed | |||||
EventsShed metrics.Counter | |||||
// events_sent | |||||
EventsSent metrics.Counter | |||||
// errors_sent | |||||
ErrorsSent metrics.Counter | |||||
// errors_shed | |||||
ErrorsShed metrics.Counter | |||||
} | |||||
// Can we burn in the routine name here? | |||||
func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { | |||||
labels := []string{} | |||||
for i := 0; i < len(labelsAndValues); i += 2 { | |||||
labels = append(labels, labelsAndValues[i]) | |||||
} | |||||
return &Metrics{ | |||||
EventsIn: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||||
Namespace: namespace, | |||||
Subsystem: MetricsSubsystem, | |||||
Name: "events_in", | |||||
Help: "Events read from the channel.", | |||||
}, labels).With(labelsAndValues...), | |||||
EventsHandled: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||||
Namespace: namespace, | |||||
Subsystem: MetricsSubsystem, | |||||
Name: "events_handled", | |||||
Help: "Events handled", | |||||
}, labels).With(labelsAndValues...), | |||||
EventsOut: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||||
Namespace: namespace, | |||||
Subsystem: MetricsSubsystem, | |||||
Name: "events_out", | |||||
Help: "Events output from routine.", | |||||
}, labels).With(labelsAndValues...), | |||||
ErrorsIn: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||||
Namespace: namespace, | |||||
Subsystem: MetricsSubsystem, | |||||
Name: "errors_in", | |||||
Help: "Errors read from the channel.", | |||||
}, labels).With(labelsAndValues...), | |||||
ErrorsHandled: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||||
Namespace: namespace, | |||||
Subsystem: MetricsSubsystem, | |||||
Name: "errors_handled", | |||||
Help: "Errors handled.", | |||||
}, labels).With(labelsAndValues...), | |||||
ErrorsOut: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||||
Namespace: namespace, | |||||
Subsystem: MetricsSubsystem, | |||||
Name: "errors_out", | |||||
Help: "Errors output from routine.", | |||||
}, labels).With(labelsAndValues...), | |||||
ErrorsSent: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||||
Namespace: namespace, | |||||
Subsystem: MetricsSubsystem, | |||||
Name: "errors_sent", | |||||
Help: "Errors sent to routine.", | |||||
}, labels).With(labelsAndValues...), | |||||
ErrorsShed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||||
Namespace: namespace, | |||||
Subsystem: MetricsSubsystem, | |||||
Name: "errors_shed", | |||||
Help: "Errors dropped from sending.", | |||||
}, labels).With(labelsAndValues...), | |||||
EventsSent: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||||
Namespace: namespace, | |||||
Subsystem: MetricsSubsystem, | |||||
Name: "events_sent", | |||||
Help: "Events sent to routine.", | |||||
}, labels).With(labelsAndValues...), | |||||
EventsShed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||||
Namespace: namespace, | |||||
Subsystem: MetricsSubsystem, | |||||
Name: "events_shed", | |||||
Help: "Events dropped from sending.", | |||||
}, labels).With(labelsAndValues...), | |||||
} | |||||
} | |||||
// NopMetrics returns no-op Metrics. | |||||
func NopMetrics() *Metrics { | |||||
return &Metrics{ | |||||
EventsIn: discard.NewCounter(), | |||||
EventsHandled: discard.NewCounter(), | |||||
EventsOut: discard.NewCounter(), | |||||
ErrorsIn: discard.NewCounter(), | |||||
ErrorsHandled: discard.NewCounter(), | |||||
ErrorsOut: discard.NewCounter(), | |||||
EventsShed: discard.NewCounter(), | |||||
EventsSent: discard.NewCounter(), | |||||
ErrorsSent: discard.NewCounter(), | |||||
ErrorsShed: discard.NewCounter(), | |||||
} | |||||
} |
@ -0,0 +1,98 @@ | |||||
package v2 | |||||
import ( | |||||
"fmt" | |||||
"time" | |||||
) | |||||
func schedulerHandle(event Event) (Events, error) { | |||||
switch event.(type) { | |||||
case timeCheck: | |||||
fmt.Println("scheduler handle timeCheck") | |||||
case testEvent: | |||||
fmt.Println("scheduler handle testEvent") | |||||
return Events{scTestEvent{}}, nil | |||||
} | |||||
return Events{}, nil | |||||
} | |||||
func processorHandle(event Event) (Events, error) { | |||||
switch event.(type) { | |||||
case timeCheck: | |||||
fmt.Println("processor handle timeCheck") | |||||
case testEvent: | |||||
fmt.Println("processor handle testEvent") | |||||
case scTestEvent: | |||||
fmt.Println("processor handle scTestEvent") | |||||
return Events{}, fmt.Errorf("processor done") | |||||
} | |||||
return Events{}, nil | |||||
} | |||||
// reactor | |||||
type Reactor struct { | |||||
events chan Event | |||||
demuxer *demuxer | |||||
scheduler *Routine | |||||
processor *Routine | |||||
ticker *time.Ticker | |||||
tickerStopped chan struct{} | |||||
} | |||||
func (r *Reactor) Start() { | |||||
bufferSize := 10 | |||||
events := make(chan Event, bufferSize) | |||||
r.scheduler = newRoutine("scheduler", events, schedulerHandle) | |||||
r.processor = newRoutine("processor", events, processorHandle) | |||||
r.demuxer = newDemuxer(r.scheduler, r.processor) | |||||
r.tickerStopped = make(chan struct{}) | |||||
go r.scheduler.run() | |||||
go r.processor.run() | |||||
go r.demuxer.run() | |||||
go func() { | |||||
ticker := time.NewTicker(1 * time.Second) | |||||
for { | |||||
select { | |||||
case <-ticker.C: | |||||
// xxx: what if !sent? | |||||
r.demuxer.send(timeCheck{}) | |||||
case <-r.tickerStopped: | |||||
fmt.Println("ticker stopped") | |||||
return | |||||
} | |||||
} | |||||
}() | |||||
} | |||||
func (r *Reactor) Wait() { | |||||
fmt.Println("completed routines") | |||||
r.Stop() | |||||
} | |||||
func (r *Reactor) Stop() { | |||||
fmt.Println("reactor stopping") | |||||
r.tickerStopped <- struct{}{} | |||||
r.demuxer.stop() | |||||
r.scheduler.stop() | |||||
r.processor.stop() | |||||
// todo: accumulator | |||||
// todo: io | |||||
fmt.Println("reactor stopped") | |||||
} | |||||
func (r *Reactor) Receive(event Event) { | |||||
fmt.Println("receive event") | |||||
sent := r.demuxer.send(event) | |||||
if !sent { | |||||
panic("demuxer is full") | |||||
} | |||||
} | |||||
func (r *Reactor) AddPeer() { | |||||
// TODO: add peer event and send to demuxer | |||||
} |
@ -0,0 +1,17 @@ | |||||
package v2 | |||||
import "testing" | |||||
// XXX: This makes assumptions about the message routing | |||||
func TestReactor(t *testing.T) { | |||||
reactor := Reactor{} | |||||
reactor.Start() | |||||
script := Events{ | |||||
testEvent{}, | |||||
} | |||||
for _, event := range script { | |||||
reactor.Receive(event) | |||||
} | |||||
reactor.Wait() | |||||
} |
@ -0,0 +1,165 @@ | |||||
package v2 | |||||
import ( | |||||
"fmt" | |||||
"sync/atomic" | |||||
"github.com/tendermint/tendermint/libs/log" | |||||
) | |||||
// TODO | |||||
// * revisit panic conditions | |||||
// * audit log levels | |||||
// * maybe routine should be an interface and the concret tpe should be handlerRoutine | |||||
// Adding Metrics | |||||
// we need a metrics definition | |||||
type handleFunc = func(event Event) (Events, error) | |||||
type Routine struct { | |||||
name string | |||||
input chan Event | |||||
errors chan error | |||||
output chan Event | |||||
stopped chan struct{} | |||||
finished chan error | |||||
running *uint32 | |||||
handle handleFunc | |||||
logger log.Logger | |||||
metrics *Metrics | |||||
} | |||||
func newRoutine(name string, output chan Event, handleFunc handleFunc) *Routine { | |||||
return &Routine{ | |||||
name: name, | |||||
input: make(chan Event, 1), | |||||
handle: handleFunc, | |||||
errors: make(chan error, 1), | |||||
output: output, | |||||
stopped: make(chan struct{}, 1), | |||||
finished: make(chan error, 1), | |||||
running: new(uint32), | |||||
logger: log.NewNopLogger(), | |||||
metrics: NopMetrics(), | |||||
} | |||||
} | |||||
func (rt *Routine) setLogger(logger log.Logger) { | |||||
rt.logger = logger | |||||
} | |||||
func (rt *Routine) setMetrics(metrics *Metrics) { | |||||
rt.metrics = metrics | |||||
} | |||||
func (rt *Routine) run() { | |||||
rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name)) | |||||
starting := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1)) | |||||
if !starting { | |||||
panic("Routine has already started") | |||||
} | |||||
errorsDrained := false | |||||
for { | |||||
if !rt.isRunning() { | |||||
break | |||||
} | |||||
select { | |||||
case iEvent, ok := <-rt.input: | |||||
rt.metrics.EventsIn.With("routine", rt.name).Add(1) | |||||
if !ok { | |||||
if !errorsDrained { | |||||
continue // wait for errors to be drainned | |||||
} | |||||
rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name)) | |||||
rt.stopped <- struct{}{} | |||||
return | |||||
} | |||||
oEvents, err := rt.handle(iEvent) | |||||
rt.metrics.EventsHandled.With("routine", rt.name).Add(1) | |||||
if err != nil { | |||||
rt.terminate(err) | |||||
return | |||||
} | |||||
rt.metrics.EventsOut.With("routine", rt.name).Add(float64(len(oEvents))) | |||||
rt.logger.Info(fmt.Sprintf("%s handled %d events\n", rt.name, len(oEvents))) | |||||
for _, event := range oEvents { | |||||
rt.logger.Info(fmt.Sprintln("writting back to output")) | |||||
rt.output <- event | |||||
} | |||||
case iEvent, ok := <-rt.errors: | |||||
rt.metrics.ErrorsIn.With("routine", rt.name).Add(1) | |||||
if !ok { | |||||
rt.logger.Info(fmt.Sprintf("%s: errors closed\n", rt.name)) | |||||
errorsDrained = true | |||||
continue | |||||
} | |||||
oEvents, err := rt.handle(iEvent) | |||||
rt.metrics.ErrorsHandled.With("routine", rt.name).Add(1) | |||||
if err != nil { | |||||
rt.terminate(err) | |||||
return | |||||
} | |||||
rt.metrics.ErrorsOut.With("routine", rt.name).Add(float64(len(oEvents))) | |||||
for _, event := range oEvents { | |||||
rt.output <- event | |||||
} | |||||
} | |||||
} | |||||
} | |||||
func (rt *Routine) feedback() { | |||||
for event := range rt.output { | |||||
rt.send(event) | |||||
} | |||||
} | |||||
func (rt *Routine) send(event Event) bool { | |||||
if err, ok := event.(error); ok { | |||||
select { | |||||
case rt.errors <- err: | |||||
rt.metrics.ErrorsSent.With("routine", rt.name).Add(1) | |||||
return true | |||||
default: | |||||
rt.metrics.ErrorsShed.With("routine", rt.name).Add(1) | |||||
rt.logger.Info(fmt.Sprintf("%s: errors channel was full\n", rt.name)) | |||||
return false | |||||
} | |||||
} else { | |||||
select { | |||||
case rt.input <- event: | |||||
rt.metrics.EventsSent.With("routine", rt.name).Add(1) | |||||
return true | |||||
default: | |||||
rt.metrics.EventsShed.With("routine", rt.name).Add(1) | |||||
rt.logger.Info(fmt.Sprintf("%s: channel was full\n", rt.name)) | |||||
return false | |||||
} | |||||
} | |||||
} | |||||
func (rt *Routine) isRunning() bool { | |||||
return atomic.LoadUint32(rt.running) == 1 | |||||
} | |||||
// rename flush? | |||||
func (rt *Routine) stop() { | |||||
// XXX: what if already stopped? | |||||
rt.logger.Info(fmt.Sprintf("%s: stop\n", rt.name)) | |||||
close(rt.input) | |||||
close(rt.errors) | |||||
<-rt.stopped | |||||
rt.terminate(fmt.Errorf("routine stopped")) | |||||
} | |||||
func (rt *Routine) terminate(reason error) { | |||||
stopped := atomic.CompareAndSwapUint32(rt.running, uint32(1), uint32(0)) | |||||
if !stopped { | |||||
panic("called stop but already stopped") | |||||
} | |||||
rt.finished <- reason | |||||
} | |||||
// XXX: this should probably produced the finished | |||||
// channel and let the caller deicde how long to wait | |||||
func (rt *Routine) wait() error { | |||||
return <-rt.finished | |||||
} |
@ -0,0 +1,89 @@ | |||||
package v2 | |||||
import ( | |||||
"fmt" | |||||
"testing" | |||||
"time" | |||||
) | |||||
type eventA struct{} | |||||
type eventB struct{} | |||||
var done = fmt.Errorf("done") | |||||
func simpleHandler(event Event) (Events, error) { | |||||
switch event.(type) { | |||||
case eventA: | |||||
return Events{eventB{}}, nil | |||||
case eventB: | |||||
return Events{routineFinished{}}, done | |||||
} | |||||
return Events{}, nil | |||||
} | |||||
func TestRoutine(t *testing.T) { | |||||
events := make(chan Event, 10) | |||||
routine := newRoutine("simpleRoutine", events, simpleHandler) | |||||
go routine.run() | |||||
go routine.feedback() | |||||
routine.send(eventA{}) | |||||
routine.wait() | |||||
} | |||||
func genStatefulHandler(maxCount int) handleFunc { | |||||
counter := 0 | |||||
return func(event Event) (Events, error) { | |||||
switch event.(type) { | |||||
case eventA: | |||||
counter += 1 | |||||
if counter >= maxCount { | |||||
return Events{}, done | |||||
} | |||||
return Events{eventA{}}, nil | |||||
} | |||||
return Events{}, nil | |||||
} | |||||
} | |||||
func TestStatefulRoutine(t *testing.T) { | |||||
events := make(chan Event, 10) | |||||
handler := genStatefulHandler(10) | |||||
routine := newRoutine("statefulRoutine", events, handler) | |||||
go routine.run() | |||||
go routine.feedback() | |||||
go routine.send(eventA{}) | |||||
routine.wait() | |||||
} | |||||
func handleWithErrors(event Event) (Events, error) { | |||||
switch event.(type) { | |||||
case eventA: | |||||
return Events{}, nil | |||||
case errEvent: | |||||
return Events{}, done | |||||
} | |||||
return Events{}, nil | |||||
} | |||||
func TestErrorSaturation(t *testing.T) { | |||||
events := make(chan Event, 10) | |||||
routine := newRoutine("errorRoutine", events, handleWithErrors) | |||||
go routine.run() | |||||
go func() { | |||||
for { | |||||
routine.send(eventA{}) | |||||
time.Sleep(10 * time.Millisecond) | |||||
} | |||||
}() | |||||
routine.send(errEvent{}) | |||||
routine.wait() | |||||
} |
@ -0,0 +1,32 @@ | |||||
package v2 | |||||
import "time" | |||||
type testEvent struct { | |||||
msg string | |||||
time time.Time | |||||
} | |||||
type testEventTwo struct { | |||||
msg string | |||||
} | |||||
type stopEvent struct{} | |||||
type timeCheck struct { | |||||
time time.Time | |||||
} | |||||
type errEvent struct{} | |||||
type scTestEvent struct{} | |||||
type pcFinished struct{} | |||||
type routineFinished struct{} | |||||
func (rf *routineFinished) Error() string { | |||||
return "routine finished" | |||||
} | |||||
type scFull struct{} | |||||
type pcFull struct{} |