From 33dbff61d33dc7dc5e679d5fbc1746bcda6fd300 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 1 Dec 2020 17:08:33 +0400 Subject: [PATCH] blockchain/v1: fix deadlock (#5711) I introduced a new variable, syncEnded, which is now used to prevent sending new events to channels (which would otherwise block) once the reactor has finished syncing. Closes #4591 --- CHANGELOG_PENDING.md | 1 + blockchain/v1/reactor.go | 65 ++++++++++++++++++++++++----------- blockchain/v1/reactor_test.go | 8 ++--- 3 files changed, 48 insertions(+), 26 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 10b297be8..c4720e494 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -40,3 +40,4 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - [privval] \#5638 Increase read/write timeout to 5s and calculate ping interval based on it (@JoeKash) - [blockchain/v1] [\#5701](https://github.com/tendermint/tendermint/pull/5701) Handle peers without blocks (@melekes) - [crypto] \#5707 Fix infinite recursion in string formatting of Secp256k1 keys (@erikgrinaker) +- [blockchain/v1] \#5711 Fix deadlock (@melekes) diff --git a/blockchain/v1/reactor.go b/blockchain/v1/reactor.go index 78ce71e18..ddfcb3650 100644 --- a/blockchain/v1/reactor.go +++ b/blockchain/v1/reactor.go @@ -3,6 +3,7 @@ package v1 import ( "fmt" "reflect" + "sync/atomic" "time" "github.com/tendermint/tendermint/behaviour" @@ -67,6 +68,9 @@ type BlockchainReactor struct { eventsFromFSMCh chan bcFsmMessage swReporter *behaviour.SwitchReporter + + // Atomic integer (0 - sync in progress, 1 - finished syncing) + syncEnded int32 } // NewBlockchainReactor returns new reactor instance. 
@@ -141,13 +145,22 @@ func (bcR *BlockchainReactor) OnStart() error { bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch) if bcR.fastSync { go bcR.poolRoutine() + } else { // if we're not fast syncing, mark it as finished + bcR.setSyncEnded() } return nil } // OnStop implements service.Service. func (bcR *BlockchainReactor) OnStop() { - _ = bcR.Stop() +} + +func (bcR *BlockchainReactor) isSyncEnded() bool { + return atomic.LoadInt32(&(bcR.syncEnded)) != 0 +} + +func (bcR *BlockchainReactor) setSyncEnded() { + atomic.StoreInt32(&(bcR.syncEnded), 1) } // SwitchToFastSync is called by the state sync reactor when switching to fast sync. @@ -239,6 +252,10 @@ func (bcR *BlockchainReactor) sendStatusResponseToPeer(msg *bcproto.StatusReques // RemovePeer implements Reactor by removing peer from the pool. func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { + if bcR.isSyncEnded() { + return + } + msgData := bcReactorMessage{ event: peerRemoveEv, data: bReactorEventData{ @@ -284,6 +301,10 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) } case *bcproto.BlockResponse: + if bcR.isSyncEnded() { + return + } + bi, err := types.BlockFromProto(msg.Block) if err != nil { bcR.Logger.Error("error transition block from protobuf", "err", err) @@ -302,6 +323,10 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) bcR.Logger.Info("Received", "src", src, "height", bi.Height) bcR.messagesForFSMCh <- msgForFSM case *bcproto.NoBlockResponse: + if bcR.isSyncEnded() { + return + } + msgForFSM := bcReactorMessage{ event: noBlockResponseEv, data: bReactorEventData{ @@ -311,8 +336,11 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) } bcR.Logger.Debug("Peer does not have requested block", "peer", src, "height", msg.Height) bcR.messagesForFSMCh <- msgForFSM - case *bcproto.StatusResponse: + if bcR.isSyncEnded() { + return + } + // Got a peer status. Unverified. 
msgForFSM := bcReactorMessage{ event: statusResponseEv, @@ -323,7 +351,6 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) }, } bcR.messagesForFSMCh <- msgForFSM - default: bcR.Logger.Error(fmt.Sprintf("unknown message type %v", reflect.TypeOf(msg))) } @@ -331,16 +358,20 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) // processBlocksRoutine processes blocks until signlaed to stop over the stopProcessing channel func (bcR *BlockchainReactor) processBlocksRoutine(stopProcessing chan struct{}) { - processReceivedBlockTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) - doProcessBlockCh := make(chan struct{}, 1) + defer processReceivedBlockTicker.Stop() - lastHundred := time.Now() - lastRate := 0.0 + var ( + doProcessBlockCh = make(chan struct{}, 1) + lastHundred = time.Now() + lastRate = 0.0 + ) ForLoop: for { select { + case <-bcR.Quit(): + break ForLoop case <-stopProcessing: bcR.Logger.Info("finishing block execution") break ForLoop @@ -383,12 +414,14 @@ ForLoop: // poolRoutine receives and handles messages from the Receive() routine and from the FSM. func (bcR *BlockchainReactor) poolRoutine() { - bcR.fsm.Start() sendBlockRequestTicker := time.NewTicker(trySendIntervalMS * time.Millisecond) statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) + defer sendBlockRequestTicker.Stop() + // NOTE: statusUpdateTicker can continue to run + stopProcessing := make(chan struct{}, 1) go bcR.processBlocksRoutine(stopProcessing) @@ -421,12 +454,10 @@ ForLoop: case msg := <-bcR.eventsFromFSMCh: switch msg.event { - case syncFinishedEv: + case syncFinishedEv: // Sent from the FSM when it enters finished state. stopProcessing <- struct{}{} - // Sent from the FSM when it enters finished state. 
- break ForLoop - case peerErrorEv: - // Sent from the FSM when it detects peer error + bcR.setSyncEnded() + case peerErrorEv: // Sent from the FSM when it detects peer error bcR.reportPeerErrorToSwitch(msg.data.err, msg.data.peerID) if msg.data.err == errNoPeerResponse { // Sent from the peer timeout handler routine @@ -493,7 +524,6 @@ func (bcR *BlockchainReactor) processBlock() error { return nil } -// Implements bcRNotifier // sendStatusRequest broadcasts `BlockStore` height. func (bcR *BlockchainReactor) sendStatusRequest() { msgBytes, err := bc.EncodeMsg(&bcproto.StatusRequest{}) @@ -503,7 +533,6 @@ func (bcR *BlockchainReactor) sendStatusRequest() { bcR.Switch.Broadcast(BlockchainChannel, msgBytes) } -// Implements bcRNotifier // BlockRequest sends `BlockRequest` height. func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) error { peer := bcR.Switch.Peers().Get(peerID) @@ -522,19 +551,14 @@ func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) erro return nil } -// Implements bcRNotifier func (bcR *BlockchainReactor) switchToConsensus() { conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor) if ok { conR.SwitchToConsensus(bcR.state, bcR.blocksSynced > 0 || bcR.stateSynced) bcR.eventsFromFSMCh <- bcFsmMessage{event: syncFinishedEv} } - // else { - // Should only happen during testing. 
- // } } -// Implements bcRNotifier // Called by FSM and pool: // - pool calls when it detects slow peer or when peer times out // - FSM calls when: @@ -552,7 +576,6 @@ func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) { bcR.eventsFromFSMCh <- msgData } -// Implements bcRNotifier func (bcR *BlockchainReactor) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) { if timer == nil { panic("nil timer pointer parameter") diff --git a/blockchain/v1/reactor_test.go b/blockchain/v1/reactor_test.go index c0f371905..3aaf7e913 100644 --- a/blockchain/v1/reactor_test.go +++ b/blockchain/v1/reactor_test.go @@ -242,11 +242,9 @@ func TestFastSyncNoBlockResponse(t *testing.T) { } } -// NOTE: This is too hard to test without -// an easy way to add test peer to switch -// or without significant refactoring of the module. -// Alternatively we could actually dial a TCP conn but -// that seems extreme. +// NOTE: This is too hard to test without an easy way to add test peer to +// switch or without significant refactoring of the module. Alternatively we +// could actually dial a TCP conn but that seems extreme. func TestFastSyncBadBlockStopsPeer(t *testing.T) { numNodes := 4 maxBlockHeight := int64(148)