Browse Source

blockchain/v0: fix data race in blockchain channel (#6518)

There is a possible data race/panic between processBlockchainCh and
processPeerUpdates, since when we send to blockchainCh.Out in one
goroutine and close the channel in the other. The race is seen in some
Github Action runs.

This commit fix the race, by adding a peerUpdatesCh as a bridge between
processPeerUpdates and processBlockchainCh, so the former will send to
this channel, the later will listen and forward the message to
blockchainCh.Out channel.

Updates #6516
pull/6568/head
Cuong Manh Le 4 years ago
committed by GitHub
parent
commit
4e59575dc0
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 24 additions and 18 deletions
  1. +24
    -18
      internal/blockchain/v0/reactor.go

+ 24
- 18
internal/blockchain/v0/reactor.go View File

@ -85,9 +85,10 @@ type Reactor struct {
consReactor consensusReactor consReactor consensusReactor
fastSync bool fastSync bool
blockchainCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
closeCh chan struct{}
blockchainCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
peerUpdatesCh chan p2p.Envelope
closeCh chan struct{}
requestsCh <-chan BlockRequest requestsCh <-chan BlockRequest
errorsCh <-chan peerError errorsCh <-chan peerError
@ -122,17 +123,18 @@ func NewReactor(
errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count. errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
r := &Reactor{ r := &Reactor{
initialState: state,
blockExec: blockExec,
store: store,
pool: NewBlockPool(startHeight, requestsCh, errorsCh),
consReactor: consReactor,
fastSync: fastSync,
requestsCh: requestsCh,
errorsCh: errorsCh,
blockchainCh: blockchainCh,
peerUpdates: peerUpdates,
closeCh: make(chan struct{}),
initialState: state,
blockExec: blockExec,
store: store,
pool: NewBlockPool(startHeight, requestsCh, errorsCh),
consReactor: consReactor,
fastSync: fastSync,
requestsCh: requestsCh,
errorsCh: errorsCh,
blockchainCh: blockchainCh,
peerUpdates: peerUpdates,
peerUpdatesCh: make(chan p2p.Envelope),
closeCh: make(chan struct{}),
} }
r.BaseService = *service.NewBaseService(logger, "Blockchain", r) r.BaseService = *service.NewBaseService(logger, "Blockchain", r)
@ -277,9 +279,9 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
} }
// processBlockchainCh initiates a blocking process where we listen for and handle // processBlockchainCh initiates a blocking process where we listen for and handle
// envelopes on the BlockchainChannel. Any error encountered during message
// execution will result in a PeerError being sent on the BlockchainChannel. When
// the reactor is stopped, we will catch the signal and close the p2p Channel
// envelopes on the BlockchainChannel and peerUpdatesCh. Any error encountered during
// message execution will result in a PeerError being sent on the BlockchainChannel.
// When the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully. // gracefully.
func (r *Reactor) processBlockchainCh() { func (r *Reactor) processBlockchainCh() {
defer r.blockchainCh.Close() defer r.blockchainCh.Close()
@ -295,9 +297,13 @@ func (r *Reactor) processBlockchainCh() {
} }
} }
case envelop := <-r.peerUpdatesCh:
r.blockchainCh.Out <- envelop
case <-r.closeCh: case <-r.closeCh:
r.Logger.Debug("stopped listening on blockchain channel; closing...") r.Logger.Debug("stopped listening on blockchain channel; closing...")
return return
} }
} }
} }
@ -314,7 +320,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
switch peerUpdate.Status { switch peerUpdate.Status {
case p2p.PeerStatusUp: case p2p.PeerStatusUp:
// send a status update the newly added peer // send a status update the newly added peer
r.blockchainCh.Out <- p2p.Envelope{
r.peerUpdatesCh <- p2p.Envelope{
To: peerUpdate.NodeID, To: peerUpdate.NodeID,
Message: &bcproto.StatusResponse{ Message: &bcproto.StatusResponse{
Base: r.store.Base(), Base: r.store.Base(),


Loading…
Cancel
Save