diff --git a/internal/blockchain/v0/reactor.go b/internal/blockchain/v0/reactor.go index 8dec338c3..014f39f83 100644 --- a/internal/blockchain/v0/reactor.go +++ b/internal/blockchain/v0/reactor.go @@ -85,9 +85,10 @@ type Reactor struct { consReactor consensusReactor fastSync bool - blockchainCh *p2p.Channel - peerUpdates *p2p.PeerUpdates - closeCh chan struct{} + blockchainCh *p2p.Channel + peerUpdates *p2p.PeerUpdates + peerUpdatesCh chan p2p.Envelope + closeCh chan struct{} requestsCh <-chan BlockRequest errorsCh <-chan peerError @@ -122,17 +123,18 @@ func NewReactor( errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count. r := &Reactor{ - initialState: state, - blockExec: blockExec, - store: store, - pool: NewBlockPool(startHeight, requestsCh, errorsCh), - consReactor: consReactor, - fastSync: fastSync, - requestsCh: requestsCh, - errorsCh: errorsCh, - blockchainCh: blockchainCh, - peerUpdates: peerUpdates, - closeCh: make(chan struct{}), + initialState: state, + blockExec: blockExec, + store: store, + pool: NewBlockPool(startHeight, requestsCh, errorsCh), + consReactor: consReactor, + fastSync: fastSync, + requestsCh: requestsCh, + errorsCh: errorsCh, + blockchainCh: blockchainCh, + peerUpdates: peerUpdates, + peerUpdatesCh: make(chan p2p.Envelope), + closeCh: make(chan struct{}), } r.BaseService = *service.NewBaseService(logger, "Blockchain", r) @@ -277,9 +279,9 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err } // processBlockchainCh initiates a blocking process where we listen for and handle -// envelopes on the BlockchainChannel. Any error encountered during message -// execution will result in a PeerError being sent on the BlockchainChannel. When -// the reactor is stopped, we will catch the signal and close the p2p Channel +// envelopes on the BlockchainChannel and peerUpdatesCh. Any error encountered during +// message execution will result in a PeerError being sent on the BlockchainChannel. +// When the reactor is stopped, we will catch the signal and close the p2p Channel // gracefully. func (r *Reactor) processBlockchainCh() { defer r.blockchainCh.Close() @@ -295,9 +297,13 @@ func (r *Reactor) processBlockchainCh() { } } + case envelop := <-r.peerUpdatesCh: + r.blockchainCh.Out <- envelop + case <-r.closeCh: r.Logger.Debug("stopped listening on blockchain channel; closing...") return + } } } @@ -314,7 +320,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { switch peerUpdate.Status { case p2p.PeerStatusUp: // send a status update the newly added peer - r.blockchainCh.Out <- p2p.Envelope{ + r.peerUpdatesCh <- p2p.Envelope{ To: peerUpdate.NodeID, Message: &bcproto.StatusResponse{ Base: r.store.Base(),