From dcc19272f995438dc4aa6b4cd3cd17efde61ab29 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 29 Apr 2020 11:03:04 +0200 Subject: [PATCH] blockchain/v2: fix excessive CPU usage due to spinning on closed channels (#4761) The event loop uses a `select` on multiple channels. However, receiving from a closed channel in Go never blocks: it returns immediately with the zero value of the channel's element type. The processor and scheduler close their channels when done, and since closed channels are always ready to receive, the event loop keeps spinning on them. This changes `routine.terminate()` to not close the channel, and also removes `stopDemux` and instead uses `events` channel closure to signal event loop termination. Fixes #4687. --- CHANGELOG_PENDING.md | 1 + blockchain/v2/reactor.go | 52 +++++++++++++++++++++++++++++----------- blockchain/v2/routine.go | 11 +++++---- 3 files changed, 46 insertions(+), 18 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index bb86caff4..41c694dd2 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -44,4 +44,5 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi ### BUG FIXES: +- [blockchain/v2] [\#4761](https://github.com/tendermint/tendermint/pull/4761) Fix excessive CPU usage caused by spinning on closed channels (@erikgrinaker) - [light] [\#4741](https://github.com/tendermint/tendermint/pull/4741) Correctly return `ErrSignedHeaderNotFound` and `ErrValidatorSetNotFound` on corresponding RPC errors (@erikgrinaker) diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 1fb8ada1a..0bf345237 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -130,7 +130,6 @@ type BlockchainReactor struct { p2p.BaseReactor events chan Event // XXX: Rename eventsFromPeers - stopDemux chan struct{} scheduler *Routine processor *Routine logger log.Logger @@ -166,7 +165,6 @@ func newReactor(state state.State, store blockStore, reporter behaviour.Reporter return &BlockchainReactor{ events: 
make(chan Event, bufferSize), - stopDemux: make(chan struct{}), scheduler: newRoutine("scheduler", scheduler.handle, bufferSize), processor: newRoutine("processor", processor.handle, bufferSize), store: store, @@ -306,19 +304,29 @@ func (r *BlockchainReactor) demux() { processBlockFreq = 20 * time.Millisecond doProcessBlockCh = make(chan struct{}, 1) doProcessBlockTk = time.NewTicker(processBlockFreq) + ) + defer doProcessBlockTk.Stop() + var ( prunePeerFreq = 1 * time.Second doPrunePeerCh = make(chan struct{}, 1) doPrunePeerTk = time.NewTicker(prunePeerFreq) + ) + defer doPrunePeerTk.Stop() + var ( scheduleFreq = 20 * time.Millisecond doScheduleCh = make(chan struct{}, 1) doScheduleTk = time.NewTicker(scheduleFreq) + ) + defer doScheduleTk.Stop() + var ( statusFreq = 10 * time.Second doStatusCh = make(chan struct{}, 1) doStatusTk = time.NewTicker(statusFreq) ) + defer doStatusTk.Stop() // XXX: Extract timers to make testing atemporal for { @@ -355,14 +363,20 @@ func (r *BlockchainReactor) demux() { case <-doStatusCh: r.io.broadcastStatusRequest(r.store.Base(), r.SyncHeight()) - // Events from peers - case event := <-r.events: + // Events from peers. Closing the channel signals event loop termination. 
+ case event, ok := <-r.events: + if !ok { + r.logger.Info("Stopping event processing") + return + } switch event := event.(type) { case bcStatusResponse: r.setMaxPeerHeight(event.height) r.scheduler.send(event) case bcAddNewPeer, bcRemovePeer, bcBlockResponse, bcNoBlockResponse: r.scheduler.send(event) + default: + r.logger.Error("Received unknown event", "event", fmt.Sprintf("%T", event)) } // Incremental events form scheduler @@ -378,6 +392,9 @@ func (r *BlockchainReactor) demux() { case scFinishedEv: r.processor.send(event) r.scheduler.stop() + case noOpEvent: + default: + r.logger.Error("Received unknown scheduler event", "event", fmt.Sprintf("%T", event)) } // Incremental events from processor @@ -397,20 +414,28 @@ func (r *BlockchainReactor) demux() { case pcFinished: r.io.trySwitchToConsensus(event.tmState, event.blocksSynced > 0) r.processor.stop() + case noOpEvent: + default: + r.logger.Error("Received unknown processor event", "event", fmt.Sprintf("%T", event)) } - // Terminal events from scheduler + // Terminal event from scheduler case err := <-r.scheduler.final(): - r.logger.Info(fmt.Sprintf("scheduler final %s", err)) - // send the processor stop? 
+ switch err { + case nil: + r.logger.Info("Scheduler stopped") + default: + r.logger.Error("Scheduler aborted with error", "err", err) + } // Terminal event from processor - case event := <-r.processor.final(): - r.logger.Info(fmt.Sprintf("processor final %s", event)) - - case <-r.stopDemux: - r.logger.Info("demuxing stopped") - return + case err := <-r.processor.final(): + switch err { + case nil: + r.logger.Info("Processor stopped") + default: + r.logger.Error("Processor aborted with error", "err", err) + } } } } @@ -421,7 +446,6 @@ func (r *BlockchainReactor) Stop() error { r.scheduler.stop() r.processor.stop() - close(r.stopDemux) close(r.events) r.logger.Info("reactor stopped") diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index ff12bfebc..ad32e3e82 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -69,9 +69,11 @@ func (rt *Routine) start() { for { events, err := rt.queue.Get(1) - if err != nil { - rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name)) - rt.terminate(fmt.Errorf("stopped")) + if err == queue.ErrDisposed { + rt.terminate(nil) + return + } else if err != nil { + rt.terminate(err) return } oEvent, err := rt.handle(events[0].(Event)) @@ -131,6 +133,7 @@ func (rt *Routine) final() chan error { // XXX: Maybe get rid of this func (rt *Routine) terminate(reason error) { - close(rt.out) + // We don't close the rt.out channel here, to avoid spinning on the closed channel + // in the event loop. rt.fin <- reason }