Browse Source

Switch to a priority queue:

* Routines will now use a priority queue instead of channels to
    iterate over events
pull/3878/head
Sean Braithwaite 5 years ago
parent
commit
5474528db1
8 changed files with 196 additions and 185 deletions
  1. +16
    -18
      blockchain/v2/demuxer.go
  2. +6
    -5
      blockchain/v2/reactor.go
  3. +2
    -2
      blockchain/v2/reactor_test.go
  4. +60
    -121
      blockchain/v2/routine.go
  5. +47
    -37
      blockchain/v2/routine_test.go
  6. +62
    -2
      blockchain/v2/types.go
  7. +1
    -0
      go.mod
  8. +2
    -0
      go.sum

+ 16
- 18
blockchain/v2/demuxer.go View File

@ -8,8 +8,12 @@ import (
"github.com/tendermint/tendermint/libs/log"
)
type scFull struct{}
type pcFull struct{}
// scFull signals backpressure: the scheduler's queue rejected an event
// (see demuxer.handle). High priority so it is processed ahead of routine work.
type scFull struct {
priorityHigh
}
// pcFull signals backpressure: the processor's queue rejected an event
// (see demuxer.handle). High priority so it is processed ahead of routine work.
type pcFull struct {
priorityHigh
}
const demuxerBufferSize = 10
@ -62,56 +66,50 @@ func (dm *demuxer) start() {
dm.stopped <- struct{}{}
return
}
oEvents, err := dm.handle(event)
oEvent, err := dm.handle(event)
if err != nil {
dm.terminate(err)
return
}
for _, event := range oEvents {
dm.input <- event
}
dm.input <- oEvent
case event, ok := <-dm.scheduler.next():
if !ok {
dm.logger.Info("demuxer: scheduler output closed")
continue
}
oEvents, err := dm.handle(event)
oEvent, err := dm.handle(event)
if err != nil {
dm.terminate(err)
return
}
for _, event := range oEvents {
dm.input <- event
}
dm.input <- oEvent
case event, ok := <-dm.processor.next():
if !ok {
dm.logger.Info("demuxer: processor output closed")
continue
}
oEvents, err := dm.handle(event)
oEvent, err := dm.handle(event)
if err != nil {
dm.terminate(err)
return
}
for _, event := range oEvents {
dm.input <- event
}
dm.input <- oEvent
}
}
}
func (dm *demuxer) handle(event Event) (Events, error) {
func (dm *demuxer) handle(event Event) (Event, error) {
received := dm.scheduler.trySend(event)
if !received {
return Events{scFull{}}, nil // backpressure
return scFull{}, nil // backpressure
}
received = dm.processor.trySend(event)
if !received {
return Events{pcFull{}}, nil // backpressure
return pcFull{}, nil // backpressure
}
return Events{}, nil
return noOp, nil
}
func (dm *demuxer) trySend(event Event) bool {


+ 6
- 5
blockchain/v2/reactor.go View File

@ -8,27 +8,28 @@ import (
)
// timeCheck is emitted on each reactor ticker tick (see Reactor.Start) and
// carries the tick time; high priority so it preempts queued normal events.
type timeCheck struct {
priorityHigh
time time.Time
}
func schedulerHandle(event Event) (Events, error) {
func schedulerHandle(event Event) (Event, error) {
switch event.(type) {
case timeCheck:
fmt.Println("scheduler handle timeCheck")
case Event:
fmt.Println("scheduler handle testEvent")
}
return Events{}, nil
return noOp, nil
}
func processorHandle(event Event) (Events, error) {
func processorHandle(event Event) (Event, error) {
switch event.(type) {
case timeCheck:
fmt.Println("processor handle timeCheck")
case Event:
fmt.Println("processor handle event")
}
return Events{}, nil
return noOp, nil
}
type Reactor struct {
@ -61,7 +62,7 @@ func (r *Reactor) Start() {
go func() {
for t := range r.ticker.C {
r.demuxer.trySend(timeCheck{t})
r.demuxer.trySend(timeCheck{time: t})
}
}()
}


+ 2
- 2
blockchain/v2/reactor_test.go View File

@ -6,8 +6,8 @@ import "testing"
func TestReactor(t *testing.T) {
reactor := Reactor{}
reactor.Start()
script := Events{
struct{}{},
script := []Event{
// TODO
}
for _, event := range script {


+ 60
- 121
blockchain/v2/routine.go View File

@ -4,6 +4,7 @@ import (
"fmt"
"sync/atomic"
"github.com/Workiva/go-datastructures/queue"
"github.com/tendermint/tendermint/libs/log"
)
@ -12,7 +13,7 @@ import (
// * audit log levels
// * Convert routine to an interface with concrete implementation
type handleFunc = func(event Event) (Events, error)
type handleFunc = func(event Event) (Event, error)
// Routines are a structure which model a finite state machine as serialized
// stream of events processed by a handle function. This Routine structure
@ -21,34 +22,30 @@ type handleFunc = func(event Event) (Events, error)
// `next()`. Calling `close()` on a routine will conclude processing of all
// sent events and produce `last()` event representing the terminal state.
type Routine struct {
name string
input chan Event
errors chan error
out chan Event
stopped chan struct{}
rdy chan struct{}
fin chan error
running *uint32
handle handleFunc
logger log.Logger
metrics *Metrics
stopping *uint32
name string
queue *queue.PriorityQueue
out chan Event // XXX: actually item
fin chan error
rdy chan struct{}
running *uint32
handle handleFunc
logger log.Logger
metrics *Metrics
}
var queueSize int = 10
func newRoutine(name string, handleFunc handleFunc) *Routine {
return &Routine{
name: name,
input: make(chan Event, 1),
handle: handleFunc,
errors: make(chan error, 1),
out: make(chan Event, 1),
stopped: make(chan struct{}, 1),
rdy: make(chan struct{}, 1),
fin: make(chan error, 1),
running: new(uint32),
stopping: new(uint32),
logger: log.NewNopLogger(),
metrics: NopMetrics(),
name: name,
queue: queue.NewPriorityQueue(queueSize, true),
handle: handleFunc,
out: make(chan Event, queueSize),
rdy: make(chan struct{}, 1),
fin: make(chan error, 1),
running: new(uint32),
logger: log.NewNopLogger(),
metrics: NopMetrics(),
}
}
@ -63,138 +60,80 @@ func (rt *Routine) setMetrics(metrics *Metrics) {
func (rt *Routine) start() {
rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name))
starting := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1))
if !starting {
panic("Routine has already started")
running := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1))
if !running {
panic(fmt.Sprintf("%s is already running", rt.name))
}
close(rt.rdy)
errorsDrained := false
defer func() {
stopped := atomic.CompareAndSwapUint32(rt.running, uint32(1), uint32(0))
if !stopped {
panic(fmt.Sprintf("%s is failed to stop", rt.name))
}
}()
for {
if !rt.isRunning() {
rt.logger.Info(fmt.Sprintf("%s: breaking because not running\n", rt.name))
break
events, err := rt.queue.Get(1)
if err != nil {
rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name))
rt.terminate(fmt.Errorf("stopped"))
return
}
select {
case iEvent, ok := <-rt.input:
rt.metrics.EventsIn.With("routine", rt.name).Add(1)
if !ok {
if !errorsDrained {
rt.logger.Info(fmt.Sprintf("%s: waiting for errors to drain\n", rt.name))
continue // wait for errors to be drained
}
rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name))
close(rt.stopped)
rt.terminate(fmt.Errorf("stopped"))
return
}
oEvents, err := rt.handle(iEvent)
rt.metrics.EventsHandled.With("routine", rt.name).Add(1)
if err != nil {
rt.terminate(err)
return
}
rt.metrics.EventsOut.With("routine", rt.name).Add(float64(len(oEvents)))
rt.logger.Info(fmt.Sprintf("%s handled %d events\n", rt.name, len(oEvents)))
for _, event := range oEvents {
rt.logger.Info(fmt.Sprintln("writing back to output"))
rt.out <- event
}
case iEvent, ok := <-rt.errors:
rt.metrics.ErrorsIn.With("routine", rt.name).Add(1)
if !ok {
rt.logger.Info(fmt.Sprintf("%s: errors closed\n", rt.name))
errorsDrained = true
continue
}
oEvents, err := rt.handle(iEvent)
rt.metrics.ErrorsHandled.With("routine", rt.name).Add(1)
if err != nil {
rt.terminate(err)
return
}
rt.metrics.ErrorsOut.With("routine", rt.name).Add(float64(len(oEvents)))
for _, event := range oEvents {
rt.out <- event
}
oEvent, err := rt.handle(events[0])
rt.metrics.EventsHandled.With("routine", rt.name).Add(1)
if err != nil {
rt.terminate(err)
return
}
}
}
func (rt *Routine) feedback() {
for event := range rt.out {
rt.trySend(event)
rt.metrics.EventsOut.With("routine", rt.name).Add(1)
rt.logger.Debug(fmt.Sprintf("%s produced %+v event\n", rt.name, oEvent))
rt.out <- oEvent
}
}
func (rt *Routine) trySend(event Event) bool {
if !rt.isRunning() || rt.isStopping() {
rt.logger.Info(fmt.Sprintf("%s: sending %+v", rt.name, event))
if !rt.isRunning() {
return false
}
rt.logger.Info(fmt.Sprintf("%s: sending %+v", rt.name, event))
if err, ok := event.(error); ok {
select {
case rt.errors <- err:
rt.metrics.ErrorsSent.With("routine", rt.name).Add(1)
return true
default:
rt.metrics.ErrorsShed.With("routine", rt.name).Add(1)
rt.logger.Info(fmt.Sprintf("%s: errors channel was full\n", rt.name))
return false
}
} else {
select {
case rt.input <- event:
rt.metrics.EventsSent.With("routine", rt.name).Add(1)
return true
default:
rt.metrics.EventsShed.With("routine", rt.name).Add(1)
rt.logger.Info(fmt.Sprintf("%s: channel was full\n", rt.name))
return false
}
err := rt.queue.Put(event)
if err != nil {
rt.metrics.EventsShed.With("routine", rt.name).Add(1)
rt.logger.Info(fmt.Sprintf("%s: trySend fail, queue was full/stopped \n", rt.name))
return false
}
rt.metrics.EventsSent.With("routine", rt.name).Add(1)
return true
}
// isRunning reports whether start() has set the running flag and terminate()
// has not yet cleared it; read atomically so trySend can call it concurrently.
func (rt *Routine) isRunning() bool {
return atomic.LoadUint32(rt.running) == 1
}
func (rt *Routine) isStopping() bool {
return atomic.LoadUint32(rt.stopping) == 1
// next returns the channel on which the routine publishes events produced by
// its handle function; closed by terminate().
func (rt *Routine) next() chan Event {
return rt.out
}
// ready returns a channel that is closed once start() has begun processing,
// letting callers block until the routine accepts sends.
func (rt *Routine) ready() chan struct{} {
return rt.rdy
}
func (rt *Routine) next() chan Event {
return rt.out
}
func (rt *Routine) stop() {
if !rt.isRunning() {
return
}
rt.logger.Info(fmt.Sprintf("%s: stop\n", rt.name))
stopping := atomic.CompareAndSwapUint32(rt.stopping, uint32(0), uint32(1))
if !stopping {
panic("Routine has already stopped")
}
close(rt.input)
close(rt.errors)
<-rt.stopped
rt.queue.Dispose() // this should block until all queue items are free?
}
// final returns the channel carrying the routine's terminal error, sent
// exactly once by terminate().
func (rt *Routine) final() chan error {
return rt.fin
}
// XXX: Maybe get rid of this
// terminate atomically clears the running flag, closes the output channel so
// readers of next() unblock, and publishes the terminal error on fin.
// Panics if the routine already stopped (the CAS fails), which would indicate
// a double-terminate bug.
func (rt *Routine) terminate(reason error) {
stopped := atomic.CompareAndSwapUint32(rt.running, uint32(1), uint32(0))
if !stopped {
panic("called stop but already stopped")
}
close(rt.out)
rt.fin <- reason
}

+ 47
- 37
blockchain/v2/routine_test.go View File

@ -9,47 +9,47 @@ import (
"github.com/tendermint/tendermint/libs/log"
)
type eventA struct{}
type eventB struct{}
type errEvent struct{}
// eventA is a normal-priority test event consumed by simpleHandler.
type eventA struct {
priorityNormal
}
var done = fmt.Errorf("done")
func simpleHandler(event Event) (Events, error) {
func simpleHandler(event Event) (Event, error) {
switch event.(type) {
case eventA:
return Events{eventB{}}, nil
case eventB:
return Events{}, done
return noOp, done
}
return Events{}, nil
return noOp, nil
}
func TestRoutine(t *testing.T) {
func TestRoutineFinal(t *testing.T) {
routine := newRoutine("simpleRoutine", simpleHandler)
assert.False(t, routine.isRunning(),
"expected an initialized routine to not be running")
go routine.start()
go routine.feedback()
<-routine.ready()
assert.True(t, routine.isRunning(),
"expected an started routine")
assert.True(t, routine.trySend(eventA{}),
"expected sending to a ready routine to succeed")
assert.Equal(t, done, <-routine.final(),
"expected the final event to be done")
assert.False(t, routine.isRunning(),
"expected an completed routine to no longer be running")
}
func TestRoutineSend(t *testing.T) {
func TestRoutineStop(t *testing.T) {
routine := newRoutine("simpleRoutine", simpleHandler)
assert.False(t, routine.trySend(eventA{}),
"expected sending to an unstarted routine to fail")
go routine.start()
go routine.feedback()
<-routine.ready()
assert.True(t, routine.trySend(eventA{}),
@ -71,20 +71,22 @@ func (f finalCount) Error() string {
func genStatefulHandler(maxCount int) handleFunc {
counter := 0
return func(event Event) (Events, error) {
// golint fixme
switch event.(type) {
case eventA:
return func(event Event) (Event, error) {
if _, ok := event.(eventA); ok {
counter += 1
if counter >= maxCount {
return Events{}, finalCount{counter}
return noOp, finalCount{counter}
}
return Events{eventA{}}, nil
case eventB:
return Events{}, nil
return eventA{}, nil
}
return Events{}, nil
return noOp, nil
}
}
// feedback pumps every event the routine produces back into its own queue,
// so handler outputs drive further processing; loop ends when next() closes.
func feedback(r *Routine) {
for event := range r.next() {
r.trySend(event)
}
}
@ -95,16 +97,14 @@ func TestStatefulRoutine(t *testing.T) {
routine.setLogger(log.TestingLogger())
go routine.start()
go routine.feedback()
go feedback(routine)
<-routine.ready()
assert.True(t, routine.trySend(eventA{}),
"expected sending to a started routine to succeed")
final := <-routine.final()
fnl, ok := final.(finalCount)
if ok {
if fnl, ok := final.(finalCount); ok {
assert.Equal(t, count, fnl.count,
"expected the routine to count to 10")
} else {
@ -112,28 +112,38 @@ func TestStatefulRoutine(t *testing.T) {
}
}
func handleWithErrors(event Event) (Events, error) {
// lowPriorityEvent and highPriorityEvent exercise queue ordering in
// TestPriority: the handler terminates only on the high-priority event.
type lowPriorityEvent struct {
priorityLow
}
type highPriorityEvent struct {
priorityHigh
}
func handleWithPriority(event Event) (Event, error) {
switch event.(type) {
case eventA:
return Events{}, nil
case errEvent:
return Events{}, done
case lowPriorityEvent:
return noOp, nil
case highPriorityEvent:
return noOp, done
}
return Events{}, nil
return noOp, nil
}
func TestErrorSaturation(t *testing.T) {
routine := newRoutine("errorRoutine", handleWithErrors)
func TestPriority(t *testing.T) {
// XXX: align with buffer size
routine := newRoutine("priorityRoutine", handleWithPriority)
go routine.start()
<-routine.ready()
go func() {
for {
routine.trySend(eventA{})
time.Sleep(10 * time.Millisecond)
routine.trySend(lowPriorityEvent{})
time.Sleep(1 * time.Millisecond)
}
}()
time.Sleep(10 * time.Millisecond)
assert.True(t, routine.trySend(errEvent{}),
assert.True(t, routine.trySend(highPriorityEvent{}),
"expected send to succeed even when saturated")
assert.Equal(t, done, <-routine.final())


+ 62
- 2
blockchain/v2/types.go View File

@ -1,4 +1,64 @@
package v2
type Event interface{}
type Events []Event
import (
"github.com/Workiva/go-datastructures/queue"
)
type Event queue.Item
// priority is satisfied by the embeddable priorityLow/Normal/High markers:
// Compare orders queue items, Priority exposes the numeric rank it uses.
type priority interface {
Compare(other queue.Item) int
Priority() int
}
// Zero-size marker types embedded in events to assign a queue priority.
type priorityLow struct{}
type priorityNormal struct{}
type priorityHigh struct{}
// Priority ranks low-priority events below normal (2) and high (3).
func (p priorityLow) Priority() int {
return 1
}
// Priority ranks normal-priority events between low (1) and high (3).
func (p priorityNormal) Priority() int {
return 2
}
// Priority ranks high-priority events above normal (2) and low (1).
func (p priorityHigh) Priority() int {
return 3
}
// Compare orders this item against another queue item by numeric priority:
// 1 when this item ranks higher, -1 when lower, 0 on equal rank.
// Panics if other does not implement the priority interface.
func (p priorityLow) Compare(other queue.Item) int {
	rhs := other.(priority)
	switch {
	case p.Priority() > rhs.Priority():
		return 1
	case p.Priority() < rhs.Priority():
		return -1
	default:
		return 0
	}
}
// Compare orders this item against another queue item by numeric priority:
// 1 when this item ranks higher, -1 when lower, 0 on equal rank.
// Panics if other does not implement the priority interface.
func (p priorityNormal) Compare(other queue.Item) int {
	rhs := other.(priority)
	switch {
	case p.Priority() > rhs.Priority():
		return 1
	case p.Priority() < rhs.Priority():
		return -1
	default:
		return 0
	}
}
// Compare orders this item against another queue item by numeric priority:
// 1 when this item ranks higher, -1 when lower, 0 on equal rank.
// Panics if other does not implement the priority interface.
func (p priorityHigh) Compare(other queue.Item) int {
	rhs := other.(priority)
	switch {
	case p.Priority() > rhs.Priority():
		return 1
	case p.Priority() < rhs.Priority():
		return -1
	default:
		return 0
	}
}
// noOpEvent is returned by handle funcs that produce no follow-up work
// (see demuxer.handle); low priority so real events are served first.
type noOpEvent struct {
priorityLow
}

// noOp is the shared no-op sentinel instance.
var noOp = noOpEvent{}

+ 1
- 0
go.mod View File

@ -4,6 +4,7 @@ go 1.12
require (
github.com/VividCortex/gohistogram v1.0.0 // indirect
github.com/Workiva/go-datastructures v1.0.50
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
github.com/btcsuite/btcd v0.0.0-20190115013929-ed77733ec07d
github.com/btcsuite/btcutil v0.0.0-20180706230648-ab6388e0c60a


+ 2
- 0
go.sum View File

@ -3,6 +3,8 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/Workiva/go-datastructures v1.0.50 h1:slDmfW6KCHcC7U+LP3DDBbm4fqTwZGn1beOFPfGaLvo=
github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=


Loading…
Cancel
Save