@ -74,7 +74,7 @@ func (e peerError) Error() string {
return fmt . Sprintf ( "error with peer %v: %s" , e . peerID , e . err . Error ( ) )
}
// Blockchain Reactor handles long-term catchup syncing.
// Reactor handles long-term catchup syncing.
type Reactor struct {
service . BaseService
@ -87,10 +87,17 @@ type Reactor struct {
consReactor consensusReactor
fastSync * tmSync . AtomicBool
blockchainCh * p2p . Channel
peerUpdates * p2p . PeerUpdates
peerUpdatesCh chan p2p . Envelope
closeCh chan struct { }
blockchainCh * p2p . Channel
// blockchainOutBridgeCh defines a channel that acts as a bridge between sending Envelope
// messages that the reactor will consume in processBlockchainCh and receiving messages
// from the peer updates channel and other goroutines. We do this instead of directly
// sending on blockchainCh.Out to avoid race conditions in the case where other goroutines
// send Envelopes directly to the to blockchainCh.Out channel, since processBlockchainCh
// may close the blockchainCh.Out channel at the same time that other goroutines send to
// blockchainCh.Out.
blockchainOutBridgeCh chan p2p . Envelope
peerUpdates * p2p . PeerUpdates
closeCh chan struct { }
requestsCh <- chan BlockRequest
errorsCh <- chan peerError
@ -130,20 +137,20 @@ func NewReactor(
errorsCh := make ( chan peerError , maxPeerErrBuffer ) // NOTE: The capacity should be larger than the peer count.
r := & Reactor {
initialState : state ,
blockExec : blockExec ,
store : store ,
pool : NewBlockPool ( startHeight , requestsCh , errorsCh ) ,
consReactor : consReactor ,
fastSync : tmSync . NewBool ( fastSync ) ,
requestsCh : requestsCh ,
errorsCh : errorsCh ,
blockchainCh : blockchainCh ,
peerUpdates : peerUpdates ,
peerUpdatesCh : make ( chan p2p . Envelope ) ,
closeCh : make ( chan struct { } ) ,
metrics : metrics ,
syncStartTime : time . Time { } ,
initialState : state ,
blockExec : blockExec ,
store : store ,
pool : NewBlockPool ( startHeight , requestsCh , errorsCh ) ,
consReactor : consReactor ,
fastSync : tmSync . NewBool ( fastSync ) ,
requestsCh : requestsCh ,
errorsCh : errorsCh ,
blockchainCh : blockchainCh ,
blockchainOutBridgeCh : make ( chan p2p . Envelope ) ,
peerUpdates : peerUpdates ,
closeCh : make ( chan struct { } ) ,
metrics : metrics ,
syncStartTime : time . Time { } ,
}
r . BaseService = * service . NewBaseService ( logger , "Blockchain" , r )
@ -292,7 +299,7 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
}
// processBlockchainCh initiates a blocking process where we listen for and handle
// envelopes on the BlockchainChannel and peerUpdates Ch. Any error encountered during
// envelopes on the BlockchainChannel and blockchainOutBridge Ch. Any error encountered during
// message execution will result in a PeerError being sent on the BlockchainChannel.
// When the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
@ -310,8 +317,8 @@ func (r *Reactor) processBlockchainCh() {
}
}
case envelop := <- r . peerUpdates Ch:
r . blockchainCh . Out <- envelop
case envelope := <- r . blockchainOutBridge Ch:
r . blockchainCh . Out <- envelope
case <- r . closeCh :
r . Logger . Debug ( "stopped listening on blockchain channel; closing..." )
@ -333,7 +340,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
switch peerUpdate . Status {
case p2p . PeerStatusUp :
// send a status update the newly added peer
r . peerUpdates Ch <- p2p . Envelope {
r . blockchainOutBridge Ch <- p2p . Envelope {
To : peerUpdate . NodeID ,
Message : & bcproto . StatusResponse {
Base : r . store . Base ( ) ,
@ -399,7 +406,7 @@ func (r *Reactor) requestRoutine() {
return
case request := <- r . requestsCh :
r . blockchainCh . Out <- p2p . Envelope {
r . blockchainOutBridgeCh <- p2p . Envelope {
To : request . PeerID ,
Message : & bcproto . BlockRequest { Height : request . Height } ,
}
@ -416,7 +423,7 @@ func (r *Reactor) requestRoutine() {
go func ( ) {
defer r . poolWG . Done ( )
r . blockchainCh . Out <- p2p . Envelope {
r . blockchainOutBridgeCh <- p2p . Envelope {
Broadcast : true ,
Message : & bcproto . StatusRequest { } ,
}