From 4e59575dc083bc449b029f91a4f538a0d7547db0 Mon Sep 17 00:00:00 2001 From: Cuong Manh Le Date: Thu, 10 Jun 2021 13:58:37 +0700 Subject: [PATCH] blockchain/v0: fix data race in blockchain channel (#6518) There is a possible data race/panic between processBlockchainCh and processPeerUpdates, since when we send to blockchainCh.Out in one goroutine and close the channel in the other. The race is seen in some Github Action runs. This commit fix the race, by adding a peerUpdatesCh as a bridge between processPeerUpdates and processBlockchainCh, so the former will send to this channel, the later will listen and forward the message to blockchainCh.Out channel. Updates #6516 --- internal/blockchain/v0/reactor.go | 42 ++++++++++++++++++------------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/internal/blockchain/v0/reactor.go b/internal/blockchain/v0/reactor.go index 8dec338c3..014f39f83 100644 --- a/internal/blockchain/v0/reactor.go +++ b/internal/blockchain/v0/reactor.go @@ -85,9 +85,10 @@ type Reactor struct { consReactor consensusReactor fastSync bool - blockchainCh *p2p.Channel - peerUpdates *p2p.PeerUpdates - closeCh chan struct{} + blockchainCh *p2p.Channel + peerUpdates *p2p.PeerUpdates + peerUpdatesCh chan p2p.Envelope + closeCh chan struct{} requestsCh <-chan BlockRequest errorsCh <-chan peerError @@ -122,17 +123,18 @@ func NewReactor( errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count. r := &Reactor{ - initialState: state, - blockExec: blockExec, - store: store, - pool: NewBlockPool(startHeight, requestsCh, errorsCh), - consReactor: consReactor, - fastSync: fastSync, - requestsCh: requestsCh, - errorsCh: errorsCh, - blockchainCh: blockchainCh, - peerUpdates: peerUpdates, - closeCh: make(chan struct{}), + initialState: state, + blockExec: blockExec, + store: store, + pool: NewBlockPool(startHeight, requestsCh, errorsCh), + consReactor: consReactor, + fastSync: fastSync, + requestsCh: requestsCh, + errorsCh: errorsCh, + blockchainCh: blockchainCh, + peerUpdates: peerUpdates, + peerUpdatesCh: make(chan p2p.Envelope), + closeCh: make(chan struct{}), } r.BaseService = *service.NewBaseService(logger, "Blockchain", r) @@ -277,9 +279,9 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err } // processBlockchainCh initiates a blocking process where we listen for and handle -// envelopes on the BlockchainChannel. Any error encountered during message -// execution will result in a PeerError being sent on the BlockchainChannel. When -// the reactor is stopped, we will catch the signal and close the p2p Channel +// envelopes on the BlockchainChannel and peerUpdatesCh. Any error encountered during +// message execution will result in a PeerError being sent on the BlockchainChannel. +// When the reactor is stopped, we will catch the signal and close the p2p Channel // gracefully. func (r *Reactor) processBlockchainCh() { defer r.blockchainCh.Close() @@ -295,9 +297,13 @@ func (r *Reactor) processBlockchainCh() { } } + case envelop := <-r.peerUpdatesCh: + r.blockchainCh.Out <- envelop + case <-r.closeCh: r.Logger.Debug("stopped listening on blockchain channel; closing...") return + } } } @@ -314,7 +320,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { switch peerUpdate.Status { case p2p.PeerStatusUp: // send a status update the newly added peer - r.blockchainCh.Out <- p2p.Envelope{ + r.peerUpdatesCh <- p2p.Envelope{ To: peerUpdate.NodeID, Message: &bcproto.StatusResponse{ Base: r.store.Base(),