From 37bc1d74dfaf061e4b9247e9dedf1dae8bdd6b9a Mon Sep 17 00:00:00 2001 From: Cuong Manh Le Date: Thu, 8 Jul 2021 16:42:54 +0700 Subject: [PATCH] internal/blockchain/v0: prevent all possible race for blockchainCh.Out (#6637) This commit extends the fix in #6518, so all other goroutine which run concurrently with processBlockchainCh can safely send data to blockchain out channel via a bridge channel. This helps eliminating all possible data race with sending and closing blockchainCh.Out channel at the same time. Fixes #6516 --- internal/blockchain/v0/reactor.go | 57 +++++++++++++++++-------------- 1 file changed, 32 insertions(+), 25 deletions(-) diff --git a/internal/blockchain/v0/reactor.go b/internal/blockchain/v0/reactor.go index 6d0c76968..52a17b693 100644 --- a/internal/blockchain/v0/reactor.go +++ b/internal/blockchain/v0/reactor.go @@ -74,7 +74,7 @@ func (e peerError) Error() string { return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error()) } -// BlockchainReactor handles long-term catchup syncing. +// Reactor handles long-term catchup syncing. type Reactor struct { service.BaseService @@ -87,10 +87,17 @@ type Reactor struct { consReactor consensusReactor fastSync *tmSync.AtomicBool - blockchainCh *p2p.Channel - peerUpdates *p2p.PeerUpdates - peerUpdatesCh chan p2p.Envelope - closeCh chan struct{} + blockchainCh *p2p.Channel + // blockchainOutBridgeCh defines a channel that acts as a bridge between sending Envelope + // messages that the reactor will consume in processBlockchainCh and receiving messages + // from the peer updates channel and other goroutines. We do this instead of directly + // sending on blockchainCh.Out to avoid race conditions in the case where other goroutines + // send Envelopes directly to the to blockchainCh.Out channel, since processBlockchainCh + // may close the blockchainCh.Out channel at the same time that other goroutines send to + // blockchainCh.Out. + blockchainOutBridgeCh chan p2p.Envelope + peerUpdates *p2p.PeerUpdates + closeCh chan struct{} requestsCh <-chan BlockRequest errorsCh <-chan peerError @@ -130,20 +137,20 @@ func NewReactor( errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count. r := &Reactor{ - initialState: state, - blockExec: blockExec, - store: store, - pool: NewBlockPool(startHeight, requestsCh, errorsCh), - consReactor: consReactor, - fastSync: tmSync.NewBool(fastSync), - requestsCh: requestsCh, - errorsCh: errorsCh, - blockchainCh: blockchainCh, - peerUpdates: peerUpdates, - peerUpdatesCh: make(chan p2p.Envelope), - closeCh: make(chan struct{}), - metrics: metrics, - syncStartTime: time.Time{}, + initialState: state, + blockExec: blockExec, + store: store, + pool: NewBlockPool(startHeight, requestsCh, errorsCh), + consReactor: consReactor, + fastSync: tmSync.NewBool(fastSync), + requestsCh: requestsCh, + errorsCh: errorsCh, + blockchainCh: blockchainCh, + blockchainOutBridgeCh: make(chan p2p.Envelope), + peerUpdates: peerUpdates, + closeCh: make(chan struct{}), + metrics: metrics, + syncStartTime: time.Time{}, } r.BaseService = *service.NewBaseService(logger, "Blockchain", r) @@ -292,7 +299,7 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err } // processBlockchainCh initiates a blocking process where we listen for and handle -// envelopes on the BlockchainChannel and peerUpdatesCh. Any error encountered during +// envelopes on the BlockchainChannel and blockchainOutBridgeCh. Any error encountered during // message execution will result in a PeerError being sent on the BlockchainChannel. // When the reactor is stopped, we will catch the signal and close the p2p Channel // gracefully. @@ -310,8 +317,8 @@ func (r *Reactor) processBlockchainCh() { } } - case envelop := <-r.peerUpdatesCh: - r.blockchainCh.Out <- envelop + case envelope := <-r.blockchainOutBridgeCh: + r.blockchainCh.Out <- envelope case <-r.closeCh: r.Logger.Debug("stopped listening on blockchain channel; closing...") @@ -333,7 +340,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { switch peerUpdate.Status { case p2p.PeerStatusUp: // send a status update the newly added peer - r.peerUpdatesCh <- p2p.Envelope{ + r.blockchainOutBridgeCh <- p2p.Envelope{ To: peerUpdate.NodeID, Message: &bcproto.StatusResponse{ Base: r.store.Base(), @@ -399,7 +406,7 @@ func (r *Reactor) requestRoutine() { return case request := <-r.requestsCh: - r.blockchainCh.Out <- p2p.Envelope{ + r.blockchainOutBridgeCh <- p2p.Envelope{ To: request.PeerID, Message: &bcproto.BlockRequest{Height: request.Height}, } @@ -416,7 +423,7 @@ func (r *Reactor) requestRoutine() { go func() { defer r.poolWG.Done() - r.blockchainCh.Out <- p2p.Envelope{ + r.blockchainOutBridgeCh <- p2p.Envelope{ Broadcast: true, Message: &bcproto.StatusRequest{}, }