Browse Source

demuxer cleanup

pull/3907/head
Sean Braithwaite 5 years ago
parent
commit
e826ca3c49
1 changed files with 32 additions and 14 deletions
  1. +32
    -14
      blockchain/v2/demuxer.go

+ 32
- 14
blockchain/v2/demuxer.go View File

@ -3,6 +3,8 @@ package v2
import ( import (
"fmt" "fmt"
"sync/atomic" "sync/atomic"
"github.com/tendermint/tendermint/libs/log"
) )
type scFull struct{} type scFull struct{}
@ -14,15 +16,12 @@ type demuxer struct {
processor *Routine processor *Routine
fin chan error fin chan error
stopped chan struct{} stopped chan struct{}
rdy chan struct{}
running *uint32 running *uint32
stopping *uint32
logger log.Logger
} }
// TODO
// demuxer_test
// Termination process
// Logger
// Metrics
// Adhere to interface
func newDemuxer(scheduler *Routine, processor *Routine) *demuxer { func newDemuxer(scheduler *Routine, processor *Routine) *demuxer {
return &demuxer{ return &demuxer{
input: make(chan Event, 10), input: make(chan Event, 10),
@ -30,16 +29,23 @@ func newDemuxer(scheduler *Routine, processor *Routine) *demuxer {
processor: processor, processor: processor,
stopped: make(chan struct{}, 1), stopped: make(chan struct{}, 1),
fin: make(chan error, 1), fin: make(chan error, 1),
rdy: make(chan struct{}, 1),
running: new(uint32), running: new(uint32),
stopping: new(uint32),
logger: log.NewNopLogger(),
} }
} }
func (dm *demuxer) setLogger(logger log.Logger) {
dm.logger = logger
}
func (dm *demuxer) start() { func (dm *demuxer) start() {
starting := atomic.CompareAndSwapUint32(dm.running, uint32(0), uint32(1)) starting := atomic.CompareAndSwapUint32(dm.running, uint32(0), uint32(1))
if !starting { if !starting {
panic("Routine has already started") panic("Routine has already started")
} }
fmt.Printf("demuxer: run\n")
dm.logger.Info("demuxer: run")
for { for {
if !dm.isRunning() { if !dm.isRunning() {
break break
@ -47,7 +53,7 @@ func (dm *demuxer) start() {
select { select {
case event, ok := <-dm.input: case event, ok := <-dm.input:
if !ok { if !ok {
fmt.Printf("demuxer: stopping\n")
dm.logger.Info("demuxer: stopping")
dm.terminate(fmt.Errorf("stopped")) dm.terminate(fmt.Errorf("stopped"))
dm.stopped <- struct{}{} dm.stopped <- struct{}{}
return return
@ -62,7 +68,7 @@ func (dm *demuxer) start() {
} }
case event, ok := <-dm.scheduler.next(): case event, ok := <-dm.scheduler.next():
if !ok { if !ok {
fmt.Printf("demuxer: scheduler output closed\n")
dm.logger.Info("demuxer: scheduler output closed")
continue continue
} }
oEvents, err := dm.handle(event) oEvents, err := dm.handle(event)
@ -75,7 +81,7 @@ func (dm *demuxer) start() {
} }
case event, ok := <-dm.processor.next(): case event, ok := <-dm.processor.next():
if !ok { if !ok {
fmt.Printf("demuxer: processor output closed\n")
dm.logger.Info("demuxer: processor output closed")
continue continue
} }
oEvents, err := dm.handle(event) oEvents, err := dm.handle(event)
@ -105,15 +111,15 @@ func (dm *demuxer) handle(event Event) (Events, error) {
} }
func (dm *demuxer) trySend(event Event) bool { func (dm *demuxer) trySend(event Event) bool {
if !dm.isRunning() {
fmt.Println("dummuxer isn't running")
if !dm.isRunning() || dm.isStopping() {
dm.logger.Info("dummuxer isn't running")
return false return false
} }
select { select {
case dm.input <- event: case dm.input <- event:
return true return true
default: default:
fmt.Printf("demuxer channel was full\n")
dm.logger.Info("demuxer channel was full")
return false return false
} }
} }
@ -122,11 +128,23 @@ func (dm *demuxer) isRunning() bool {
return atomic.LoadUint32(dm.running) == 1 return atomic.LoadUint32(dm.running) == 1
} }
func (dm *demuxer) isStopping() bool {
return atomic.LoadUint32(dm.stopping) == 1
}
func (dm *demuxer) ready() chan struct{} {
return dm.rdy
}
func (dm *demuxer) stop() { func (dm *demuxer) stop() {
if !dm.isRunning() { if !dm.isRunning() {
return return
} }
fmt.Printf("demuxer stop\n")
stopping := atomic.CompareAndSwapUint32(dm.stopping, uint32(0), uint32(1))
if !stopping {
panic("Demuxer has already stopped")
}
dm.logger.Info("demuxer stop")
close(dm.input) close(dm.input)
<-dm.stopped <-dm.stopped
} }


Loading…
Cancel
Save