diff --git a/blockchain/v2/demuxer.go b/blockchain/v2/demuxer.go index 6b9df0f50..70b670b2f 100644 --- a/blockchain/v2/demuxer.go +++ b/blockchain/v2/demuxer.go @@ -3,6 +3,8 @@ package v2 import ( "fmt" "sync/atomic" + + "github.com/tendermint/tendermint/libs/log" ) type scFull struct{} @@ -14,15 +16,12 @@ type demuxer struct { processor *Routine fin chan error stopped chan struct{} + rdy chan struct{} running *uint32 + stopping *uint32 + logger log.Logger } -// TODO -// demuxer_test -// Termination process -// Logger -// Metrics -// Adhere to interface func newDemuxer(scheduler *Routine, processor *Routine) *demuxer { return &demuxer{ input: make(chan Event, 10), @@ -30,16 +29,23 @@ func newDemuxer(scheduler *Routine, processor *Routine) *demuxer { processor: processor, stopped: make(chan struct{}, 1), fin: make(chan error, 1), + rdy: make(chan struct{}, 1), running: new(uint32), + stopping: new(uint32), + logger: log.NewNopLogger(), } } +func (dm *demuxer) setLogger(logger log.Logger) { + dm.logger = logger +} + func (dm *demuxer) start() { starting := atomic.CompareAndSwapUint32(dm.running, uint32(0), uint32(1)) if !starting { panic("Routine has already started") } - fmt.Printf("demuxer: run\n") + dm.logger.Info("demuxer: run") for { if !dm.isRunning() { break @@ -47,7 +53,7 @@ func (dm *demuxer) start() { select { case event, ok := <-dm.input: if !ok { - fmt.Printf("demuxer: stopping\n") + dm.logger.Info("demuxer: stopping") dm.terminate(fmt.Errorf("stopped")) dm.stopped <- struct{}{} return @@ -62,7 +68,7 @@ func (dm *demuxer) start() { } case event, ok := <-dm.scheduler.next(): if !ok { - fmt.Printf("demuxer: scheduler output closed\n") + dm.logger.Info("demuxer: scheduler output closed") continue } oEvents, err := dm.handle(event) @@ -75,7 +81,7 @@ func (dm *demuxer) start() { } case event, ok := <-dm.processor.next(): if !ok { - fmt.Printf("demuxer: processor output closed\n") + dm.logger.Info("demuxer: processor output closed") continue } oEvents, err := dm.handle(event) @@ -105,15 +111,15 @@ func (dm *demuxer) handle(event Event) (Events, error) { } func (dm *demuxer) trySend(event Event) bool { - if !dm.isRunning() { - fmt.Println("dummuxer isn't running") + if !dm.isRunning() || dm.isStopping() { + dm.logger.Info("dummuxer isn't running") return false } select { case dm.input <- event: return true default: - fmt.Printf("demuxer channel was full\n") + dm.logger.Info("demuxer channel was full") return false } } @@ -122,11 +128,23 @@ func (dm *demuxer) isRunning() bool { return atomic.LoadUint32(dm.running) == 1 } +func (dm *demuxer) isStopping() bool { + return atomic.LoadUint32(dm.stopping) == 1 +} + +func (dm *demuxer) ready() chan struct{} { + return dm.rdy +} + func (dm *demuxer) stop() { if !dm.isRunning() { return } - fmt.Printf("demuxer stop\n") + stopping := atomic.CompareAndSwapUint32(dm.stopping, uint32(0), uint32(1)) + if !stopping { + panic("Demuxer has already stopped") + } + dm.logger.Info("demuxer stop") close(dm.input) <-dm.stopped }