@@ -80,15 +80,7 @@ type Reactor struct {
 	blockSync *atomicBool
 
 	blockSyncCh *p2p.Channel
-	// blockSyncOutBridgeCh defines a channel that acts as a bridge between sending Envelope
-	// messages that the reactor will consume in processBlockSyncCh and receiving messages
-	// from the peer updates channel and other goroutines. We do this instead of directly
-	// sending on blockSyncCh.Out to avoid race conditions in the case where other goroutines
-	// send Envelopes directly to the to blockSyncCh.Out channel, since processBlockSyncCh
-	// may close the blockSyncCh.Out channel at the same time that other goroutines send to
-	// blockSyncCh.Out.
-	blockSyncOutBridgeCh chan p2p.Envelope
 	peerUpdates *p2p.PeerUpdates
 
 	requestsCh <-chan BlockRequest
 	errorsCh   <-chan peerError
@@ -119,17 +111,16 @@ func NewReactor(
 	}
 
 	r := &Reactor{
-		logger:               logger,
-		stateStore:           stateStore,
-		blockExec:            blockExec,
-		store:                store,
-		consReactor:          consReactor,
-		blockSync:            newAtomicBool(blockSync),
-		blockSyncCh:          blockSyncCh,
-		blockSyncOutBridgeCh: make(chan p2p.Envelope),
-		peerUpdates:          peerUpdates,
-		metrics:              metrics,
-		eventBus:             eventBus,
+		logger:      logger,
+		stateStore:  stateStore,
+		blockExec:   blockExec,
+		store:       store,
+		consReactor: consReactor,
+		blockSync:   newAtomicBool(blockSync),
+		blockSyncCh: blockSyncCh,
+		peerUpdates: peerUpdates,
+		metrics:     metrics,
+		eventBus:    eventBus,
 	}
 
 	r.BaseService = *service.NewBaseService(logger, "BlockSync", r)
@@ -175,7 +166,6 @@ func (r *Reactor) OnStart(ctx context.Context) error {
 	}
 
 	go r.processBlockSyncCh(ctx)
-	go r.processBlockSyncBridge(ctx)
 	go r.processPeerUpdates(ctx)
 
 	return nil
@@ -306,21 +296,8 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context) {
 	}
 }
 
-func (r *Reactor) processBlockSyncBridge(ctx context.Context) {
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case envelope := <-r.blockSyncOutBridgeCh:
-			if err := r.blockSyncCh.Send(ctx, envelope); err != nil {
-				return
-			}
-		}
-	}
-}
-
 // processPeerUpdate processes a PeerUpdate.
-func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
+func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) {
 	r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
 
 	// XXX: Pool#RedoRequest can sometimes give us an empty peer.
@@ -331,12 +308,20 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
 	switch peerUpdate.Status {
 	case p2p.PeerStatusUp:
 		// send a status update the newly added peer
-		r.blockSyncOutBridgeCh <- p2p.Envelope{
+		if err := r.blockSyncCh.Send(ctx, p2p.Envelope{
 			To: peerUpdate.NodeID,
 			Message: &bcproto.StatusResponse{
 				Base:   r.store.Base(),
 				Height: r.store.Height(),
 			},
-		}
+		}); err != nil {
+			r.pool.RemovePeer(peerUpdate.NodeID)
+			if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{
+				NodeID: peerUpdate.NodeID,
+				Err:    err,
+			}); err != nil {
+				return
+			}
+		}
 
 	case p2p.PeerStatusDown:
@@ -353,7 +338,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case peerUpdate := <-r.peerUpdates.Updates():
-			r.processPeerUpdate(peerUpdate)
+			r.processPeerUpdate(ctx, peerUpdate)
 		}
 	}
 }
@@ -372,7 +357,6 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error {
 	r.syncStartTime = time.Now()
 
 	go r.requestRoutine(ctx)
-
 	go r.poolRoutine(ctx, true)
 
 	return nil
@@ -387,15 +371,17 @@ func (r *Reactor) requestRoutine(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case request := <-r.requestsCh:
-			select {
-			case <-ctx.Done():
-				return
-			case r.blockSyncOutBridgeCh <- p2p.Envelope{
+			if err := r.blockSyncCh.Send(ctx, p2p.Envelope{
 				To:      request.PeerID,
 				Message: &bcproto.BlockRequest{Height: request.Height},
-			}:
-			}
-
+			}); err != nil {
+				if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{
+					NodeID: request.PeerID,
+					Err:    err,
+				}); err != nil {
+					return
+				}
+			}
 		case pErr := <-r.errorsCh:
 			if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{
 				NodeID: pErr.peerID,
@@ -404,16 +390,12 @@ func (r *Reactor) requestRoutine(ctx context.Context) {
 				return
 			}
 		case <-statusUpdateTicker.C:
-			go func() {
-				select {
-				case <-ctx.Done():
-					return
-				case r.blockSyncOutBridgeCh <- p2p.Envelope{
-					Broadcast: true,
-					Message:   &bcproto.StatusRequest{},
-				}:
-				}
-			}()
+			if err := r.blockSyncCh.Send(ctx, p2p.Envelope{
+				Broadcast: true,
+				Message:   &bcproto.StatusRequest{},
+			}); err != nil {
+				return
+			}
 		}
 	}
 }