[blockchain] v2 Routinespull/3998/head
@ -0,0 +1,124 @@ | |||
package v2 | |||
import ( | |||
"github.com/go-kit/kit/metrics" | |||
"github.com/go-kit/kit/metrics/discard" | |||
"github.com/go-kit/kit/metrics/prometheus" | |||
stdprometheus "github.com/prometheus/client_golang/prometheus" | |||
) | |||
const ( | |||
// MetricsSubsystem is a subsystem shared by all metrics exposed by this | |||
// package. | |||
MetricsSubsystem = "blockchain" | |||
) | |||
// Metrics contains metrics exposed by this package. | |||
type Metrics struct { | |||
// events_in | |||
EventsIn metrics.Counter | |||
// events_in | |||
EventsHandled metrics.Counter | |||
// events_out | |||
EventsOut metrics.Counter | |||
// errors_in | |||
ErrorsIn metrics.Counter | |||
// errors_handled | |||
ErrorsHandled metrics.Counter | |||
// errors_out | |||
ErrorsOut metrics.Counter | |||
// events_shed | |||
EventsShed metrics.Counter | |||
// events_sent | |||
EventsSent metrics.Counter | |||
// errors_sent | |||
ErrorsSent metrics.Counter | |||
// errors_shed | |||
ErrorsShed metrics.Counter | |||
} | |||
// Can we burn in the routine name here? | |||
func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { | |||
labels := []string{} | |||
for i := 0; i < len(labelsAndValues); i += 2 { | |||
labels = append(labels, labelsAndValues[i]) | |||
} | |||
return &Metrics{ | |||
EventsIn: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||
Namespace: namespace, | |||
Subsystem: MetricsSubsystem, | |||
Name: "events_in", | |||
Help: "Events read from the channel.", | |||
}, labels).With(labelsAndValues...), | |||
EventsHandled: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||
Namespace: namespace, | |||
Subsystem: MetricsSubsystem, | |||
Name: "events_handled", | |||
Help: "Events handled", | |||
}, labels).With(labelsAndValues...), | |||
EventsOut: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||
Namespace: namespace, | |||
Subsystem: MetricsSubsystem, | |||
Name: "events_out", | |||
Help: "Events output from routine.", | |||
}, labels).With(labelsAndValues...), | |||
ErrorsIn: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||
Namespace: namespace, | |||
Subsystem: MetricsSubsystem, | |||
Name: "errors_in", | |||
Help: "Errors read from the channel.", | |||
}, labels).With(labelsAndValues...), | |||
ErrorsHandled: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||
Namespace: namespace, | |||
Subsystem: MetricsSubsystem, | |||
Name: "errors_handled", | |||
Help: "Errors handled.", | |||
}, labels).With(labelsAndValues...), | |||
ErrorsOut: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||
Namespace: namespace, | |||
Subsystem: MetricsSubsystem, | |||
Name: "errors_out", | |||
Help: "Errors output from routine.", | |||
}, labels).With(labelsAndValues...), | |||
ErrorsSent: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||
Namespace: namespace, | |||
Subsystem: MetricsSubsystem, | |||
Name: "errors_sent", | |||
Help: "Errors sent to routine.", | |||
}, labels).With(labelsAndValues...), | |||
ErrorsShed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||
Namespace: namespace, | |||
Subsystem: MetricsSubsystem, | |||
Name: "errors_shed", | |||
Help: "Errors dropped from sending.", | |||
}, labels).With(labelsAndValues...), | |||
EventsSent: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||
Namespace: namespace, | |||
Subsystem: MetricsSubsystem, | |||
Name: "events_sent", | |||
Help: "Events sent to routine.", | |||
}, labels).With(labelsAndValues...), | |||
EventsShed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||
Namespace: namespace, | |||
Subsystem: MetricsSubsystem, | |||
Name: "events_shed", | |||
Help: "Events dropped from sending.", | |||
}, labels).With(labelsAndValues...), | |||
} | |||
} | |||
// NopMetrics returns no-op Metrics. | |||
func NopMetrics() *Metrics { | |||
return &Metrics{ | |||
EventsIn: discard.NewCounter(), | |||
EventsHandled: discard.NewCounter(), | |||
EventsOut: discard.NewCounter(), | |||
ErrorsIn: discard.NewCounter(), | |||
ErrorsHandled: discard.NewCounter(), | |||
ErrorsOut: discard.NewCounter(), | |||
EventsShed: discard.NewCounter(), | |||
EventsSent: discard.NewCounter(), | |||
ErrorsSent: discard.NewCounter(), | |||
ErrorsShed: discard.NewCounter(), | |||
} | |||
} |
@ -0,0 +1,119 @@ | |||
package v2 | |||
import ( | |||
"fmt" | |||
"time" | |||
"github.com/tendermint/tendermint/libs/log" | |||
) | |||
type timeCheck struct { | |||
priorityHigh | |||
time time.Time | |||
} | |||
func schedulerHandle(event Event) (Event, error) { | |||
switch event.(type) { | |||
case timeCheck: | |||
fmt.Println("scheduler handle timeCheck") | |||
} | |||
return noOp, nil | |||
} | |||
func processorHandle(event Event) (Event, error) { | |||
switch event.(type) { | |||
case timeCheck: | |||
fmt.Println("processor handle timeCheck") | |||
} | |||
return noOp, nil | |||
} | |||
type Reactor struct { | |||
events chan Event | |||
stopDemux chan struct{} | |||
scheduler *Routine | |||
processor *Routine | |||
ticker *time.Ticker | |||
logger log.Logger | |||
} | |||
func NewReactor(bufferSize int) *Reactor { | |||
return &Reactor{ | |||
events: make(chan Event, bufferSize), | |||
stopDemux: make(chan struct{}), | |||
scheduler: newRoutine("scheduler", schedulerHandle, bufferSize), | |||
processor: newRoutine("processor", processorHandle, bufferSize), | |||
ticker: time.NewTicker(1 * time.Second), | |||
logger: log.NewNopLogger(), | |||
} | |||
} | |||
// nolint:unused | |||
func (r *Reactor) setLogger(logger log.Logger) { | |||
r.logger = logger | |||
r.scheduler.setLogger(logger) | |||
r.processor.setLogger(logger) | |||
} | |||
func (r *Reactor) Start() { | |||
go r.scheduler.start() | |||
go r.processor.start() | |||
go r.demux() | |||
<-r.scheduler.ready() | |||
<-r.processor.ready() | |||
go func() { | |||
for t := range r.ticker.C { | |||
r.events <- timeCheck{time: t} | |||
} | |||
}() | |||
} | |||
// XXX: Would it be possible here to provide some kind of type safety for the types | |||
// of events that each routine can produce and consume? | |||
func (r *Reactor) demux() { | |||
for { | |||
select { | |||
case event := <-r.events: | |||
// XXX: check for backpressure | |||
r.scheduler.send(event) | |||
r.processor.send(event) | |||
case <-r.stopDemux: | |||
r.logger.Info("demuxing stopped") | |||
return | |||
case event := <-r.scheduler.next(): | |||
r.processor.send(event) | |||
case event := <-r.processor.next(): | |||
r.scheduler.send(event) | |||
case err := <-r.scheduler.final(): | |||
r.logger.Info(fmt.Sprintf("scheduler final %s", err)) | |||
case err := <-r.processor.final(): | |||
r.logger.Info(fmt.Sprintf("processor final %s", err)) | |||
// XXX: switch to consensus | |||
} | |||
} | |||
} | |||
func (r *Reactor) Stop() { | |||
r.logger.Info("reactor stopping") | |||
r.ticker.Stop() | |||
r.scheduler.stop() | |||
r.processor.stop() | |||
close(r.stopDemux) | |||
close(r.events) | |||
r.logger.Info("reactor stopped") | |||
} | |||
func (r *Reactor) Receive(event Event) { | |||
// XXX: decode and serialize write events | |||
// TODO: backpressure | |||
r.events <- event | |||
} | |||
func (r *Reactor) AddPeer() { | |||
// TODO: add peer event and send to demuxer | |||
} |
@ -0,0 +1,22 @@ | |||
package v2 | |||
import ( | |||
"testing" | |||
) | |||
func TestReactor(t *testing.T) { | |||
var ( | |||
bufferSize = 10 | |||
reactor = NewReactor(bufferSize) | |||
) | |||
reactor.Start() | |||
script := []Event{ | |||
// TODO | |||
} | |||
for _, event := range script { | |||
reactor.Receive(event) | |||
} | |||
reactor.Stop() | |||
} |
@ -0,0 +1,133 @@ | |||
package v2 | |||
import ( | |||
"fmt" | |||
"sync/atomic" | |||
"github.com/Workiva/go-datastructures/queue" | |||
"github.com/tendermint/tendermint/libs/log" | |||
) | |||
type handleFunc = func(event Event) (Event, error) | |||
// Routines are a structure which model a finite state machine as serialized | |||
// stream of events processed by a handle function. This Routine structure | |||
// handles the concurrency and messaging guarantees. Events are sent via | |||
// `send` are handled by the `handle` function to produce an iterator | |||
// `next()`. Calling `close()` on a routine will conclude processing of all | |||
// sent events and produce `final()` event representing the terminal state. | |||
type Routine struct { | |||
name string | |||
handle handleFunc | |||
queue *queue.PriorityQueue | |||
out chan Event | |||
fin chan error | |||
rdy chan struct{} | |||
running *uint32 | |||
logger log.Logger | |||
metrics *Metrics | |||
} | |||
func newRoutine(name string, handleFunc handleFunc, bufferSize int) *Routine { | |||
return &Routine{ | |||
name: name, | |||
handle: handleFunc, | |||
queue: queue.NewPriorityQueue(bufferSize, true), | |||
out: make(chan Event, bufferSize), | |||
rdy: make(chan struct{}, 1), | |||
fin: make(chan error, 1), | |||
running: new(uint32), | |||
logger: log.NewNopLogger(), | |||
metrics: NopMetrics(), | |||
} | |||
} | |||
func (rt *Routine) setLogger(logger log.Logger) { | |||
rt.logger = logger | |||
} | |||
// nolint:unused | |||
func (rt *Routine) setMetrics(metrics *Metrics) { | |||
rt.metrics = metrics | |||
} | |||
func (rt *Routine) start() { | |||
rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name)) | |||
running := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1)) | |||
if !running { | |||
panic(fmt.Sprintf("%s is already running", rt.name)) | |||
} | |||
close(rt.rdy) | |||
defer func() { | |||
stopped := atomic.CompareAndSwapUint32(rt.running, uint32(1), uint32(0)) | |||
if !stopped { | |||
panic(fmt.Sprintf("%s is failed to stop", rt.name)) | |||
} | |||
}() | |||
for { | |||
events, err := rt.queue.Get(1) | |||
if err != nil { | |||
rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name)) | |||
rt.terminate(fmt.Errorf("stopped")) | |||
return | |||
} | |||
oEvent, err := rt.handle(events[0].(Event)) | |||
rt.metrics.EventsHandled.With("routine", rt.name).Add(1) | |||
if err != nil { | |||
rt.terminate(err) | |||
return | |||
} | |||
rt.metrics.EventsOut.With("routine", rt.name).Add(1) | |||
rt.logger.Debug(fmt.Sprintf("%s produced %T %+v\n", rt.name, oEvent, oEvent)) | |||
rt.out <- oEvent | |||
} | |||
} | |||
// XXX: look into returning OpError in the net package | |||
func (rt *Routine) send(event Event) bool { | |||
rt.logger.Debug(fmt.Sprintf("%s: received %T %+v", rt.name, event, event)) | |||
if !rt.isRunning() { | |||
return false | |||
} | |||
err := rt.queue.Put(event) | |||
if err != nil { | |||
rt.metrics.EventsShed.With("routine", rt.name).Add(1) | |||
rt.logger.Info(fmt.Sprintf("%s: send failed, queue was full/stopped \n", rt.name)) | |||
return false | |||
} | |||
rt.metrics.EventsSent.With("routine", rt.name).Add(1) | |||
return true | |||
} | |||
func (rt *Routine) isRunning() bool { | |||
return atomic.LoadUint32(rt.running) == 1 | |||
} | |||
func (rt *Routine) next() chan Event { | |||
return rt.out | |||
} | |||
func (rt *Routine) ready() chan struct{} { | |||
return rt.rdy | |||
} | |||
func (rt *Routine) stop() { | |||
if !rt.isRunning() { | |||
return | |||
} | |||
rt.logger.Info(fmt.Sprintf("%s: stop\n", rt.name)) | |||
rt.queue.Dispose() // this should block until all queue items are free? | |||
} | |||
func (rt *Routine) final() chan error { | |||
return rt.fin | |||
} | |||
// XXX: Maybe get rid of this | |||
func (rt *Routine) terminate(reason error) { | |||
close(rt.out) | |||
rt.fin <- reason | |||
} |
@ -0,0 +1,164 @@ | |||
package v2 | |||
import ( | |||
"fmt" | |||
"testing" | |||
"time" | |||
"github.com/stretchr/testify/assert" | |||
) | |||
type eventA struct { | |||
priorityNormal | |||
} | |||
var done = fmt.Errorf("done") | |||
func simpleHandler(event Event) (Event, error) { | |||
switch event.(type) { | |||
case eventA: | |||
return noOp, done | |||
} | |||
return noOp, nil | |||
} | |||
func TestRoutineFinal(t *testing.T) { | |||
var ( | |||
bufferSize = 10 | |||
routine = newRoutine("simpleRoutine", simpleHandler, bufferSize) | |||
) | |||
assert.False(t, routine.isRunning(), | |||
"expected an initialized routine to not be running") | |||
go routine.start() | |||
<-routine.ready() | |||
assert.True(t, routine.isRunning(), | |||
"expected an started routine") | |||
assert.True(t, routine.send(eventA{}), | |||
"expected sending to a ready routine to succeed") | |||
assert.Equal(t, done, <-routine.final(), | |||
"expected the final event to be done") | |||
assert.False(t, routine.isRunning(), | |||
"expected an completed routine to no longer be running") | |||
} | |||
func TestRoutineStop(t *testing.T) { | |||
var ( | |||
bufferSize = 10 | |||
routine = newRoutine("simpleRoutine", simpleHandler, bufferSize) | |||
) | |||
assert.False(t, routine.send(eventA{}), | |||
"expected sending to an unstarted routine to fail") | |||
go routine.start() | |||
<-routine.ready() | |||
assert.True(t, routine.send(eventA{}), | |||
"expected sending to a running routine to succeed") | |||
routine.stop() | |||
assert.False(t, routine.send(eventA{}), | |||
"expected sending to a stopped routine to fail") | |||
} | |||
type finalCount struct { | |||
count int | |||
} | |||
func (f finalCount) Error() string { | |||
return "end" | |||
} | |||
func genStatefulHandler(maxCount int) handleFunc { | |||
counter := 0 | |||
return func(event Event) (Event, error) { | |||
if _, ok := event.(eventA); ok { | |||
counter += 1 | |||
if counter >= maxCount { | |||
return noOp, finalCount{counter} | |||
} | |||
return eventA{}, nil | |||
} | |||
return noOp, nil | |||
} | |||
} | |||
func feedback(r *Routine) { | |||
for event := range r.next() { | |||
r.send(event) | |||
} | |||
} | |||
func TestStatefulRoutine(t *testing.T) { | |||
var ( | |||
count = 10 | |||
handler = genStatefulHandler(count) | |||
bufferSize = 20 | |||
routine = newRoutine("statefulRoutine", handler, bufferSize) | |||
) | |||
go routine.start() | |||
go feedback(routine) | |||
<-routine.ready() | |||
assert.True(t, routine.send(eventA{}), | |||
"expected sending to a started routine to succeed") | |||
final := <-routine.final() | |||
if fnl, ok := final.(finalCount); ok { | |||
assert.Equal(t, count, fnl.count, | |||
"expected the routine to count to 10") | |||
} else { | |||
t.Fail() | |||
} | |||
} | |||
type lowPriorityEvent struct { | |||
priorityLow | |||
} | |||
type highPriorityEvent struct { | |||
priorityHigh | |||
} | |||
func handleWithPriority(event Event) (Event, error) { | |||
switch event.(type) { | |||
case lowPriorityEvent: | |||
return noOp, nil | |||
case highPriorityEvent: | |||
return noOp, done | |||
} | |||
return noOp, nil | |||
} | |||
func TestPriority(t *testing.T) { | |||
var ( | |||
bufferSize = 20 | |||
routine = newRoutine("priorityRoutine", handleWithPriority, bufferSize) | |||
) | |||
go routine.start() | |||
<-routine.ready() | |||
go func() { | |||
for { | |||
routine.send(lowPriorityEvent{}) | |||
time.Sleep(1 * time.Millisecond) | |||
} | |||
}() | |||
time.Sleep(10 * time.Millisecond) | |||
assert.True(t, routine.isRunning(), | |||
"expected an started routine") | |||
assert.True(t, routine.send(highPriorityEvent{}), | |||
"expected send to succeed even when saturated") | |||
assert.Equal(t, done, <-routine.final()) | |||
assert.False(t, routine.isRunning(), | |||
"expected an started routine") | |||
} |
@ -0,0 +1,64 @@ | |||
package v2 | |||
import ( | |||
"github.com/Workiva/go-datastructures/queue" | |||
) | |||
type Event queue.Item | |||
type priority interface { | |||
Compare(other queue.Item) int | |||
Priority() int | |||
} | |||
type priorityLow struct{} | |||
type priorityNormal struct{} | |||
type priorityHigh struct{} | |||
func (p priorityLow) Priority() int { | |||
return 1 | |||
} | |||
func (p priorityNormal) Priority() int { | |||
return 2 | |||
} | |||
func (p priorityHigh) Priority() int { | |||
return 3 | |||
} | |||
func (p priorityLow) Compare(other queue.Item) int { | |||
op := other.(priority) | |||
if p.Priority() > op.Priority() { | |||
return 1 | |||
} else if p.Priority() == op.Priority() { | |||
return 0 | |||
} | |||
return -1 | |||
} | |||
func (p priorityNormal) Compare(other queue.Item) int { | |||
op := other.(priority) | |||
if p.Priority() > op.Priority() { | |||
return 1 | |||
} else if p.Priority() == op.Priority() { | |||
return 0 | |||
} | |||
return -1 | |||
} | |||
func (p priorityHigh) Compare(other queue.Item) int { | |||
op := other.(priority) | |||
if p.Priority() > op.Priority() { | |||
return 1 | |||
} else if p.Priority() == op.Priority() { | |||
return 0 | |||
} | |||
return -1 | |||
} | |||
type noOpEvent struct { | |||
priorityLow | |||
} | |||
var noOp = noOpEvent{} |