Browse Source

rename trySend to end

pull/3878/head
Sean Braithwaite 5 years ago
parent
commit
9bd2c0389f
3 changed files with 15 additions and 16 deletions
  1. +4
    -4
      blockchain/v2/reactor.go
  2. +3
    -4
      blockchain/v2/routine.go
  3. +8
    -8
      blockchain/v2/routine_test.go

+ 4
- 4
blockchain/v2/reactor.go View File

@ -80,15 +80,15 @@ func (r *Reactor) demux() {
select { select {
case event := <-r.events: case event := <-r.events:
// XXX: check for backpressure // XXX: check for backpressure
r.scheduler.trySend(event)
r.processor.trySend(event)
r.scheduler.send(event)
r.processor.send(event)
case _ = <-r.stopDemux: case _ = <-r.stopDemux:
r.logger.Info("demuxing stopped") r.logger.Info("demuxing stopped")
return return
case event := <-r.scheduler.next(): case event := <-r.scheduler.next():
r.processor.trySend(event)
r.processor.send(event)
case event := <-r.processor.next(): case event := <-r.processor.next():
r.scheduler.trySend(event)
r.scheduler.send(event)
case err := <-r.scheduler.final(): case err := <-r.scheduler.final():
r.logger.Info(fmt.Sprintf("scheduler final %s", err)) r.logger.Info(fmt.Sprintf("scheduler final %s", err))
case err := <-r.processor.final(): case err := <-r.processor.final():


+ 3
- 4
blockchain/v2/routine.go View File

@ -18,7 +18,7 @@ type handleFunc = func(event Event) (Event, error)
// Routines are a structure which model a finite state machine as serialized // Routines are a structure which model a finite state machine as serialized
// stream of events processed by a handle function. This Routine structure // stream of events processed by a handle function. This Routine structure
// handles the concurrency and messaging guarantees. Events are sent via // handles the concurrency and messaging guarantees. Events are sent via
// `trySend` are handled by the `handle` function to produce an iterator
// `send` are handled by the `handle` function to produce an iterator
// `next()`. Calling `close()` on a routine will conclude processing of all // `next()`. Calling `close()` on a routine will conclude processing of all
// sent events and produce `final()` event representing the terminal state. // sent events and produce `final()` event representing the terminal state.
type Routine struct { type Routine struct {
@ -92,9 +92,8 @@ func (rt *Routine) start() {
} }
} }
// XXX: rename send
// XXX: look into returning OpError in the net package // XXX: look into returning OpError in the net package
func (rt *Routine) trySend(event Event) bool {
func (rt *Routine) send(event Event) bool {
rt.logger.Info(fmt.Sprintf("%s: sending %+v", rt.name, event)) rt.logger.Info(fmt.Sprintf("%s: sending %+v", rt.name, event))
if !rt.isRunning() { if !rt.isRunning() {
return false return false
@ -102,7 +101,7 @@ func (rt *Routine) trySend(event Event) bool {
err := rt.queue.Put(event) err := rt.queue.Put(event)
if err != nil { if err != nil {
rt.metrics.EventsShed.With("routine", rt.name).Add(1) rt.metrics.EventsShed.With("routine", rt.name).Add(1)
rt.logger.Info(fmt.Sprintf("%s: trySend fail, queue was full/stopped \n", rt.name))
rt.logger.Info(fmt.Sprintf("%s: send failed, queue was full/stopped \n", rt.name))
return false return false
} }
rt.metrics.EventsSent.With("routine", rt.name).Add(1) rt.metrics.EventsSent.With("routine", rt.name).Add(1)


+ 8
- 8
blockchain/v2/routine_test.go View File

@ -33,7 +33,7 @@ func TestRoutineFinal(t *testing.T) {
assert.True(t, routine.isRunning(), assert.True(t, routine.isRunning(),
"expected an started routine") "expected an started routine")
assert.True(t, routine.trySend(eventA{}),
assert.True(t, routine.send(eventA{}),
"expected sending to a ready routine to succeed") "expected sending to a ready routine to succeed")
assert.Equal(t, done, <-routine.final(), assert.Equal(t, done, <-routine.final(),
@ -46,18 +46,18 @@ func TestRoutineFinal(t *testing.T) {
func TestRoutineStop(t *testing.T) { func TestRoutineStop(t *testing.T) {
routine := newRoutine("simpleRoutine", simpleHandler) routine := newRoutine("simpleRoutine", simpleHandler)
assert.False(t, routine.trySend(eventA{}),
assert.False(t, routine.send(eventA{}),
"expected sending to an unstarted routine to fail") "expected sending to an unstarted routine to fail")
go routine.start() go routine.start()
<-routine.ready() <-routine.ready()
assert.True(t, routine.trySend(eventA{}),
assert.True(t, routine.send(eventA{}),
"expected sending to a running routine to succeed") "expected sending to a running routine to succeed")
routine.stop() routine.stop()
assert.False(t, routine.trySend(eventA{}),
assert.False(t, routine.send(eventA{}),
"expected sending to a stopped routine to fail") "expected sending to a stopped routine to fail")
} }
@ -86,7 +86,7 @@ func genStatefulHandler(maxCount int) handleFunc {
func feedback(r *Routine) { func feedback(r *Routine) {
for event := range r.next() { for event := range r.next() {
r.trySend(event)
r.send(event)
} }
} }
@ -100,7 +100,7 @@ func TestStatefulRoutine(t *testing.T) {
go feedback(routine) go feedback(routine)
<-routine.ready() <-routine.ready()
assert.True(t, routine.trySend(eventA{}),
assert.True(t, routine.send(eventA{}),
"expected sending to a started routine to succeed") "expected sending to a started routine to succeed")
final := <-routine.final() final := <-routine.final()
@ -137,7 +137,7 @@ func TestPriority(t *testing.T) {
<-routine.ready() <-routine.ready()
go func() { go func() {
for { for {
routine.trySend(lowPriorityEvent{})
routine.send(lowPriorityEvent{})
time.Sleep(1 * time.Millisecond) time.Sleep(1 * time.Millisecond)
} }
}() }()
@ -145,7 +145,7 @@ func TestPriority(t *testing.T) {
assert.True(t, routine.isRunning(), assert.True(t, routine.isRunning(),
"expected an started routine") "expected an started routine")
assert.True(t, routine.trySend(highPriorityEvent{}),
assert.True(t, routine.send(highPriorityEvent{}),
"expected send to succeed even when saturated") "expected send to succeed even when saturated")
assert.Equal(t, done, <-routine.final()) assert.Equal(t, done, <-routine.final())


Loading…
Cancel
Save