diff --git a/blockchain/v2/metrics.go b/blockchain/v2/metrics.go new file mode 100644 index 000000000..d865e7360 --- /dev/null +++ b/blockchain/v2/metrics.go @@ -0,0 +1,124 @@ +package v2 + +import ( + "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/metrics/discard" + "github.com/go-kit/kit/metrics/prometheus" + stdprometheus "github.com/prometheus/client_golang/prometheus" +) + +const ( + // MetricsSubsystem is a subsystem shared by all metrics exposed by this + // package. + MetricsSubsystem = "blockchain" +) + +// Metrics contains metrics exposed by this package. +type Metrics struct { + // events_in + EventsIn metrics.Counter + // events_in + EventsHandled metrics.Counter + // events_out + EventsOut metrics.Counter + // errors_in + ErrorsIn metrics.Counter + // errors_handled + ErrorsHandled metrics.Counter + // errors_out + ErrorsOut metrics.Counter + // events_shed + EventsShed metrics.Counter + // events_sent + EventsSent metrics.Counter + // errors_sent + ErrorsSent metrics.Counter + // errors_shed + ErrorsShed metrics.Counter +} + +// Can we burn in the routine name here? +func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { + labels := []string{} + for i := 0; i < len(labelsAndValues); i += 2 { + labels = append(labels, labelsAndValues[i]) + } + return &Metrics{ + EventsIn: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "events_in", + Help: "Events read from the channel.", + }, labels).With(labelsAndValues...), + EventsHandled: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "events_handled", + Help: "Events handled", + }, labels).With(labelsAndValues...), + EventsOut: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "events_out", + Help: "Events output from routine.", + }, labels).With(labelsAndValues...), + ErrorsIn: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "errors_in", + Help: "Errors read from the channel.", + }, labels).With(labelsAndValues...), + ErrorsHandled: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "errors_handled", + Help: "Errors handled.", + }, labels).With(labelsAndValues...), + ErrorsOut: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "errors_out", + Help: "Errors output from routine.", + }, labels).With(labelsAndValues...), + ErrorsSent: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "errors_sent", + Help: "Errors sent to routine.", + }, labels).With(labelsAndValues...), + ErrorsShed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "errors_shed", + Help: "Errors dropped from sending.", + }, labels).With(labelsAndValues...), + EventsSent: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "events_sent", + Help: "Events sent to routine.", + }, labels).With(labelsAndValues...), + EventsShed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "events_shed", + Help: "Events dropped from sending.", + }, labels).With(labelsAndValues...), + } +} + +// NopMetrics returns no-op Metrics. +func NopMetrics() *Metrics { + return &Metrics{ + EventsIn: discard.NewCounter(), + EventsHandled: discard.NewCounter(), + EventsOut: discard.NewCounter(), + ErrorsIn: discard.NewCounter(), + ErrorsHandled: discard.NewCounter(), + ErrorsOut: discard.NewCounter(), + EventsShed: discard.NewCounter(), + EventsSent: discard.NewCounter(), + ErrorsSent: discard.NewCounter(), + ErrorsShed: discard.NewCounter(), + } +} diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go new file mode 100644 index 000000000..f96b325b0 --- /dev/null +++ b/blockchain/v2/reactor.go @@ -0,0 +1,119 @@ +package v2 + +import ( + "fmt" + "time" + + "github.com/tendermint/tendermint/libs/log" +) + +type timeCheck struct { + priorityHigh + time time.Time +} + +func schedulerHandle(event Event) (Event, error) { + switch event.(type) { + case timeCheck: + fmt.Println("scheduler handle timeCheck") + } + return noOp, nil +} + +func processorHandle(event Event) (Event, error) { + switch event.(type) { + case timeCheck: + fmt.Println("processor handle timeCheck") + } + return noOp, nil + +} + +type Reactor struct { + events chan Event + stopDemux chan struct{} + scheduler *Routine + processor *Routine + ticker *time.Ticker + logger log.Logger +} + +func NewReactor(bufferSize int) *Reactor { + return &Reactor{ + events: make(chan Event, bufferSize), + stopDemux: make(chan struct{}), + scheduler: newRoutine("scheduler", schedulerHandle, bufferSize), + processor: newRoutine("processor", processorHandle, bufferSize), + ticker: time.NewTicker(1 * time.Second), + logger: log.NewNopLogger(), + } +} + +// nolint:unused +func (r *Reactor) setLogger(logger log.Logger) { + r.logger = logger + r.scheduler.setLogger(logger) + r.processor.setLogger(logger) +} + +func (r *Reactor) Start() { + go r.scheduler.start() + go r.processor.start() + go r.demux() + + <-r.scheduler.ready() + <-r.processor.ready() + + go func() { + for t := range r.ticker.C { + r.events <- timeCheck{time: t} + } + }() +} + +// XXX: Would it be possible here to provide some kind of type safety for the types +// of events that each routine can produce and consume? +func (r *Reactor) demux() { + for { + select { + case event := <-r.events: + // XXX: check for backpressure + r.scheduler.send(event) + r.processor.send(event) + case <-r.stopDemux: + r.logger.Info("demuxing stopped") + return + case event := <-r.scheduler.next(): + r.processor.send(event) + case event := <-r.processor.next(): + r.scheduler.send(event) + case err := <-r.scheduler.final(): + r.logger.Info(fmt.Sprintf("scheduler final %s", err)) + case err := <-r.processor.final(): + r.logger.Info(fmt.Sprintf("processor final %s", err)) + // XXX: switch to consensus + } + } +} + +func (r *Reactor) Stop() { + r.logger.Info("reactor stopping") + + r.ticker.Stop() + r.scheduler.stop() + r.processor.stop() + close(r.stopDemux) + close(r.events) + + r.logger.Info("reactor stopped") +} + +func (r *Reactor) Receive(event Event) { + // XXX: decode and serialize write events + // TODO: backpressure + r.events <- event +} + +func (r *Reactor) AddPeer() { + // TODO: add peer event and send to demuxer +} diff --git a/blockchain/v2/reactor_test.go b/blockchain/v2/reactor_test.go new file mode 100644 index 000000000..46a2e60c6 --- /dev/null +++ b/blockchain/v2/reactor_test.go @@ -0,0 +1,22 @@ +package v2 + +import ( + "testing" +) + +func TestReactor(t *testing.T) { + var ( + bufferSize = 10 + reactor = NewReactor(bufferSize) + ) + + reactor.Start() + script := []Event{ + // TODO + } + + for _, event := range script { + reactor.Receive(event) + } + reactor.Stop() +} diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go new file mode 100644 index 000000000..cc7e7ea0f --- /dev/null +++ b/blockchain/v2/routine.go @@ -0,0 +1,133 @@ +package v2 + +import ( + "fmt" + "sync/atomic" + + "github.com/Workiva/go-datastructures/queue" + "github.com/tendermint/tendermint/libs/log" +) + +type handleFunc = func(event Event) (Event, error) + +// Routines are a structure which model a finite state machine as serialized +// stream of events processed by a handle function. This Routine structure +// handles the concurrency and messaging guarantees. Events are sent via +// `send` are handled by the `handle` function to produce an iterator +// `next()`. Calling `close()` on a routine will conclude processing of all +// sent events and produce `final()` event representing the terminal state. +type Routine struct { + name string + handle handleFunc + queue *queue.PriorityQueue + out chan Event + fin chan error + rdy chan struct{} + running *uint32 + logger log.Logger + metrics *Metrics +} + +func newRoutine(name string, handleFunc handleFunc, bufferSize int) *Routine { + return &Routine{ + name: name, + handle: handleFunc, + queue: queue.NewPriorityQueue(bufferSize, true), + out: make(chan Event, bufferSize), + rdy: make(chan struct{}, 1), + fin: make(chan error, 1), + running: new(uint32), + logger: log.NewNopLogger(), + metrics: NopMetrics(), + } +} + +func (rt *Routine) setLogger(logger log.Logger) { + rt.logger = logger +} + +// nolint:unused +func (rt *Routine) setMetrics(metrics *Metrics) { + rt.metrics = metrics +} + +func (rt *Routine) start() { + rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name)) + running := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1)) + if !running { + panic(fmt.Sprintf("%s is already running", rt.name)) + } + close(rt.rdy) + defer func() { + stopped := atomic.CompareAndSwapUint32(rt.running, uint32(1), uint32(0)) + if !stopped { + panic(fmt.Sprintf("%s is failed to stop", rt.name)) + } + }() + + for { + events, err := rt.queue.Get(1) + if err != nil { + rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name)) + rt.terminate(fmt.Errorf("stopped")) + return + } + oEvent, err := rt.handle(events[0].(Event)) + rt.metrics.EventsHandled.With("routine", rt.name).Add(1) + if err != nil { + rt.terminate(err) + return + } + rt.metrics.EventsOut.With("routine", rt.name).Add(1) + rt.logger.Debug(fmt.Sprintf("%s produced %T %+v\n", rt.name, oEvent, oEvent)) + + rt.out <- oEvent + } +} + +// XXX: look into returning OpError in the net package +func (rt *Routine) send(event Event) bool { + rt.logger.Debug(fmt.Sprintf("%s: received %T %+v", rt.name, event, event)) + if !rt.isRunning() { + return false + } + err := rt.queue.Put(event) + if err != nil { + rt.metrics.EventsShed.With("routine", rt.name).Add(1) + rt.logger.Info(fmt.Sprintf("%s: send failed, queue was full/stopped \n", rt.name)) + return false + } + rt.metrics.EventsSent.With("routine", rt.name).Add(1) + return true +} + +func (rt *Routine) isRunning() bool { + return atomic.LoadUint32(rt.running) == 1 +} + +func (rt *Routine) next() chan Event { + return rt.out +} + +func (rt *Routine) ready() chan struct{} { + return rt.rdy +} + +func (rt *Routine) stop() { + if !rt.isRunning() { + return + } + + rt.logger.Info(fmt.Sprintf("%s: stop\n", rt.name)) + rt.queue.Dispose() // this should block until all queue items are free? +} + +func (rt *Routine) final() chan error { + return rt.fin +} + +// XXX: Maybe get rid of this +func (rt *Routine) terminate(reason error) { + close(rt.out) + rt.fin <- reason +} diff --git a/blockchain/v2/routine_test.go b/blockchain/v2/routine_test.go new file mode 100644 index 000000000..2bd5a1a30 --- /dev/null +++ b/blockchain/v2/routine_test.go @@ -0,0 +1,164 @@ +package v2 + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +type eventA struct { + priorityNormal +} + +var done = fmt.Errorf("done") + +func simpleHandler(event Event) (Event, error) { + switch event.(type) { + case eventA: + return noOp, done + } + return noOp, nil +} + +func TestRoutineFinal(t *testing.T) { + var ( + bufferSize = 10 + routine = newRoutine("simpleRoutine", simpleHandler, bufferSize) + ) + + assert.False(t, routine.isRunning(), + "expected an initialized routine to not be running") + go routine.start() + <-routine.ready() + assert.True(t, routine.isRunning(), + "expected an started routine") + + assert.True(t, routine.send(eventA{}), + "expected sending to a ready routine to succeed") + + assert.Equal(t, done, <-routine.final(), + "expected the final event to be done") + + assert.False(t, routine.isRunning(), + "expected an completed routine to no longer be running") +} + +func TestRoutineStop(t *testing.T) { + var ( + bufferSize = 10 + routine = newRoutine("simpleRoutine", simpleHandler, bufferSize) + ) + + assert.False(t, routine.send(eventA{}), + "expected sending to an unstarted routine to fail") + + go routine.start() + <-routine.ready() + + assert.True(t, routine.send(eventA{}), + "expected sending to a running routine to succeed") + + routine.stop() + + assert.False(t, routine.send(eventA{}), + "expected sending to a stopped routine to fail") +} + +type finalCount struct { + count int +} + +func (f finalCount) Error() string { + return "end" +} + +func genStatefulHandler(maxCount int) handleFunc { + counter := 0 + return func(event Event) (Event, error) { + if _, ok := event.(eventA); ok { + counter += 1 + if counter >= maxCount { + return noOp, finalCount{counter} + } + + return eventA{}, nil + } + return noOp, nil + } +} + +func feedback(r *Routine) { + for event := range r.next() { + r.send(event) + } +} + +func TestStatefulRoutine(t *testing.T) { + var ( + count = 10 + handler = genStatefulHandler(count) + bufferSize = 20 + routine = newRoutine("statefulRoutine", handler, bufferSize) + ) + + go routine.start() + go feedback(routine) + <-routine.ready() + + assert.True(t, routine.send(eventA{}), + "expected sending to a started routine to succeed") + + final := <-routine.final() + if fnl, ok := final.(finalCount); ok { + assert.Equal(t, count, fnl.count, + "expected the routine to count to 10") + } else { + t.Fail() + } +} + +type lowPriorityEvent struct { + priorityLow +} + +type highPriorityEvent struct { + priorityHigh +} + +func handleWithPriority(event Event) (Event, error) { + switch event.(type) { + case lowPriorityEvent: + return noOp, nil + case highPriorityEvent: + return noOp, done + } + return noOp, nil +} + +func TestPriority(t *testing.T) { + var ( + bufferSize = 20 + routine = newRoutine("priorityRoutine", handleWithPriority, bufferSize) + ) + + go routine.start() + <-routine.ready() + go func() { + for { + routine.send(lowPriorityEvent{}) + time.Sleep(1 * time.Millisecond) + } + }() + time.Sleep(10 * time.Millisecond) + + assert.True(t, routine.isRunning(), + "expected an started routine") + assert.True(t, routine.send(highPriorityEvent{}), + "expected send to succeed even when saturated") + + assert.Equal(t, done, <-routine.final()) + assert.False(t, routine.isRunning(), + "expected an started routine") +} diff --git a/blockchain/v2/schedule.go b/blockchain/v2/schedule.go index 329557492..db3819b81 100644 --- a/blockchain/v2/schedule.go +++ b/blockchain/v2/schedule.go @@ -10,8 +10,6 @@ import ( "github.com/tendermint/tendermint/p2p" ) -type Event interface{} - type blockState int const ( diff --git a/blockchain/v2/types.go b/blockchain/v2/types.go new file mode 100644 index 000000000..836e87fd8 --- /dev/null +++ b/blockchain/v2/types.go @@ -0,0 +1,64 @@ +package v2 + +import ( + "github.com/Workiva/go-datastructures/queue" +) + +type Event queue.Item + +type priority interface { + Compare(other queue.Item) int + Priority() int +} + +type priorityLow struct{} +type priorityNormal struct{} +type priorityHigh struct{} + +func (p priorityLow) Priority() int { + return 1 +} + +func (p priorityNormal) Priority() int { + return 2 +} + +func (p priorityHigh) Priority() int { + return 3 +} + +func (p priorityLow) Compare(other queue.Item) int { + op := other.(priority) + if p.Priority() > op.Priority() { + return 1 + } else if p.Priority() == op.Priority() { + return 0 + } + return -1 +} + +func (p priorityNormal) Compare(other queue.Item) int { + op := other.(priority) + if p.Priority() > op.Priority() { + return 1 + } else if p.Priority() == op.Priority() { + return 0 + } + return -1 +} + +func (p priorityHigh) Compare(other queue.Item) int { + op := other.(priority) + if p.Priority() > op.Priority() { + return 1 + } else if p.Priority() == op.Priority() { + return 0 + } + return -1 +} + +type noOpEvent struct { + priorityLow +} + +var noOp = noOpEvent{} diff --git a/go.mod b/go.mod index 12393ae5d..01a122fe7 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.12 require ( github.com/VividCortex/gohistogram v1.0.0 // indirect + github.com/Workiva/go-datastructures v1.0.50 github.com/btcsuite/btcd v0.0.0-20190115013929-ed77733ec07d github.com/btcsuite/btcutil v0.0.0-20180706230648-ab6388e0c60a github.com/fortytw2/leaktest v1.3.0 diff --git a/go.sum b/go.sum index 766d1a64e..79370588d 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= +github.com/Workiva/go-datastructures v1.0.50 h1:slDmfW6KCHcC7U+LP3DDBbm4fqTwZGn1beOFPfGaLvo= +github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=