diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index ea9b5c487..62915423d 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -35,4 +35,6 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi ### BUG FIXES: +- [blockchain/v2] [\#4761](https://github.com/tendermint/tendermint/pull/4761) Fix excessive CPU usage caused by spinning on closed channels (@erikgrinaker) +- [blockchain/v2] Respect `fast_sync` option (@erikgrinaker) - [light] [\#4741](https://github.com/tendermint/tendermint/pull/4741) Correctly return `ErrSignedHeaderNotFound` and `ErrValidatorSetNotFound` on corresponding RPC errors (@erikgrinaker) diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index ff89ee94c..ccf636f4a 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -129,8 +129,8 @@ type blockStore interface { type BlockchainReactor struct { p2p.BaseReactor + fastSync bool // if true, enable fast sync on start events chan Event // XXX: Rename eventsFromPeers - stopDemux chan struct{} scheduler *Routine processor *Routine logger log.Logger @@ -157,7 +157,7 @@ type blockApplier interface { // XXX: unify naming in this package around tmState // XXX: V1 stores a copy of state as initialState, which is never mutated. Is that nessesary? func newReactor(state state.State, store blockStore, reporter behaviour.Reporter, - blockApplier blockApplier, bufferSize int) *BlockchainReactor { + blockApplier blockApplier, bufferSize int, fastSync bool) *BlockchainReactor { scheduler := newScheduler(state.LastBlockHeight, time.Now()) pContext := newProcessorContext(store, blockApplier, state) // TODO: Fix naming to just newProcesssor @@ -166,12 +166,12 @@ func newReactor(state state.State, store blockStore, reporter behaviour.Reporter return &BlockchainReactor{ events: make(chan Event, bufferSize), - stopDemux: make(chan struct{}), scheduler: newRoutine("scheduler", scheduler.handle, bufferSize), processor: newRoutine("processor", processor.handle, bufferSize), store: store, reporter: reporter, logger: log.NewNopLogger(), + fastSync: fastSync, } } @@ -182,7 +182,7 @@ func NewBlockchainReactor( store blockStore, fastSync bool) *BlockchainReactor { reporter := behaviour.NewMockReporter() - return newReactor(state, store, reporter, blockApplier, 1000) + return newReactor(state, store, reporter, blockApplier, 1000, fastSync) } // SetSwitch implements Reactor interface. @@ -226,9 +226,11 @@ func (r *BlockchainReactor) SetLogger(logger log.Logger) { // Start implements cmn.Service interface func (r *BlockchainReactor) Start() error { r.reporter = behaviour.NewSwitchReporter(r.BaseReactor.Switch) - go r.scheduler.start() - go r.processor.start() - go r.demux() + if r.fastSync { + go r.scheduler.start() + go r.processor.start() + go r.demux() + } return nil } @@ -306,19 +308,29 @@ func (r *BlockchainReactor) demux() { processBlockFreq = 20 * time.Millisecond doProcessBlockCh = make(chan struct{}, 1) doProcessBlockTk = time.NewTicker(processBlockFreq) + ) + defer doProcessBlockTk.Stop() + var ( prunePeerFreq = 1 * time.Second doPrunePeerCh = make(chan struct{}, 1) doPrunePeerTk = time.NewTicker(prunePeerFreq) + ) + defer doPrunePeerTk.Stop() + var ( scheduleFreq = 20 * time.Millisecond doScheduleCh = make(chan struct{}, 1) doScheduleTk = time.NewTicker(scheduleFreq) + ) + defer doScheduleTk.Stop() + var ( statusFreq = 10 * time.Second doStatusCh = make(chan struct{}, 1) doStatusTk = time.NewTicker(statusFreq) ) + defer doStatusTk.Stop() // XXX: Extract timers to make testing atemporal for { @@ -355,14 +367,20 @@ func (r *BlockchainReactor) demux() { case <-doStatusCh: r.io.broadcastStatusRequest(r.store.Base(), r.SyncHeight()) - // Events from peers - case event := <-r.events: + // Events from peers. Closing the channel signals event loop termination. + case event, ok := <-r.events: + if !ok { + r.logger.Info("Stopping event processing") + return + } switch event := event.(type) { case bcStatusResponse: r.setMaxPeerHeight(event.height) r.scheduler.send(event) case bcAddNewPeer, bcRemovePeer, bcBlockResponse, bcNoBlockResponse: r.scheduler.send(event) + default: + r.logger.Error("Received unknown event", "event", fmt.Sprintf("%T", event)) } // Incremental events form scheduler @@ -378,6 +396,9 @@ func (r *BlockchainReactor) demux() { case scFinishedEv: r.processor.send(event) r.scheduler.stop() + case noOpEvent: + default: + r.logger.Error("Received unknown scheduler event", "event", fmt.Sprintf("%T", event)) } // Incremental events from processor @@ -397,20 +418,28 @@ func (r *BlockchainReactor) demux() { case pcFinished: r.io.trySwitchToConsensus(event.tmState, event.blocksSynced) r.processor.stop() + case noOpEvent: + default: + r.logger.Error("Received unknown processor event", "event", fmt.Sprintf("%T", event)) } - // Terminal events from scheduler + // Terminal event from scheduler case err := <-r.scheduler.final(): - r.logger.Info(fmt.Sprintf("scheduler final %s", err)) - // send the processor stop? + switch err { + case nil: + r.logger.Info("Scheduler stopped") + default: + r.logger.Error("Scheduler aborted with error", "err", err) + } // Terminal event from processor - case event := <-r.processor.final(): - r.logger.Info(fmt.Sprintf("processor final %s", event)) - - case <-r.stopDemux: - r.logger.Info("demuxing stopped") - return + case err := <-r.processor.final(): + switch err { + case nil: + r.logger.Info("Processor stopped") + default: + r.logger.Error("Processor aborted with error", "err", err) + } } } } @@ -421,7 +450,6 @@ func (r *BlockchainReactor) Stop() error { r.scheduler.stop() r.processor.stop() - close(r.stopDemux) close(r.events) r.logger.Info("reactor stopped") diff --git a/blockchain/v2/reactor_test.go b/blockchain/v2/reactor_test.go index 10b1d23df..a42e96b23 100644 --- a/blockchain/v2/reactor_test.go +++ b/blockchain/v2/reactor_test.go @@ -156,7 +156,7 @@ func newTestReactor(p testReactorParams) *BlockchainReactor { sm.SaveState(db, state) } - r := newReactor(state, store, reporter, appl, p.bufferSize) + r := newReactor(state, store, reporter, appl, p.bufferSize, true) logger := log.TestingLogger() r.SetLogger(logger.With("module", "blockchain")) diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index ff12bfebc..ad32e3e82 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -69,9 +69,11 @@ func (rt *Routine) start() { for { events, err := rt.queue.Get(1) - if err != nil { - rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name)) - rt.terminate(fmt.Errorf("stopped")) + if err == queue.ErrDisposed { + rt.terminate(nil) + return + } else if err != nil { + rt.terminate(err) return } oEvent, err := rt.handle(events[0].(Event)) @@ -131,6 +133,7 @@ func (rt *Routine) final() chan error { // XXX: Maybe get rid of this func (rt *Routine) terminate(reason error) { - close(rt.out) + // We don't close the rt.out channel here, to avoid spinning on the closed channel + // in the event loop. rt.fin <- reason }