
blocksync: remove intermediate channel (#8140)

Based on local testing, I'm now convinced that this is OK; the new p2p layer also does more caching and queuing, which makes the intermediate bridge channel unnecessary.
Sam Kleinman, committed by GitHub
commit 13f7501950
1 changed file with 36 additions and 54 deletions

internal/blocksync/reactor.go

@@ -80,15 +80,7 @@ type Reactor struct {
 	blockSync   *atomicBool

 	blockSyncCh *p2p.Channel
-	// blockSyncOutBridgeCh defines a channel that acts as a bridge between sending Envelope
-	// messages that the reactor will consume in processBlockSyncCh and receiving messages
-	// from the peer updates channel and other goroutines. We do this instead of directly
-	// sending on blockSyncCh.Out to avoid race conditions in the case where other goroutines
-	// send Envelopes directly to the to blockSyncCh.Out channel, since processBlockSyncCh
-	// may close the blockSyncCh.Out channel at the same time that other goroutines send to
-	// blockSyncCh.Out.
-	blockSyncOutBridgeCh chan p2p.Envelope
-	peerUpdates          *p2p.PeerUpdates
+	peerUpdates *p2p.PeerUpdates

 	requestsCh <-chan BlockRequest
 	errorsCh   <-chan peerError
@@ -119,17 +111,16 @@ func NewReactor(
 	}

 	r := &Reactor{
-		logger:               logger,
-		stateStore:           stateStore,
-		blockExec:            blockExec,
-		store:                store,
-		consReactor:          consReactor,
-		blockSync:            newAtomicBool(blockSync),
-		blockSyncCh:          blockSyncCh,
-		blockSyncOutBridgeCh: make(chan p2p.Envelope),
-		peerUpdates:          peerUpdates,
-		metrics:              metrics,
-		eventBus:             eventBus,
+		logger:      logger,
+		stateStore:  stateStore,
+		blockExec:   blockExec,
+		store:       store,
+		consReactor: consReactor,
+		blockSync:   newAtomicBool(blockSync),
+		blockSyncCh: blockSyncCh,
+		peerUpdates: peerUpdates,
+		metrics:     metrics,
+		eventBus:    eventBus,
 	}

 	r.BaseService = *service.NewBaseService(logger, "BlockSync", r)
@@ -175,7 +166,6 @@ func (r *Reactor) OnStart(ctx context.Context) error {
 	}

 	go r.processBlockSyncCh(ctx)
-	go r.processBlockSyncBridge(ctx)
 	go r.processPeerUpdates(ctx)

 	return nil
@@ -306,21 +296,8 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context) {
 	}
 }

-func (r *Reactor) processBlockSyncBridge(ctx context.Context) {
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case envelope := <-r.blockSyncOutBridgeCh:
-			if err := r.blockSyncCh.Send(ctx, envelope); err != nil {
-				return
-			}
-		}
-	}
-}
-
 // processPeerUpdate processes a PeerUpdate.
-func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
+func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) {
 	r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)

 	// XXX: Pool#RedoRequest can sometimes give us an empty peer.
@@ -331,12 +308,20 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
 	switch peerUpdate.Status {
 	case p2p.PeerStatusUp:
 		// send a status update the newly added peer
-		r.blockSyncOutBridgeCh <- p2p.Envelope{
+		if err := r.blockSyncCh.Send(ctx, p2p.Envelope{
 			To: peerUpdate.NodeID,
 			Message: &bcproto.StatusResponse{
 				Base:   r.store.Base(),
 				Height: r.store.Height(),
 			},
+		}); err != nil {
+			r.pool.RemovePeer(peerUpdate.NodeID)
+			if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{
+				NodeID: peerUpdate.NodeID,
+				Err:    err,
+			}); err != nil {
+				return
+			}
 		}

 	case p2p.PeerStatusDown:
@@ -353,7 +338,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case peerUpdate := <-r.peerUpdates.Updates():
-			r.processPeerUpdate(peerUpdate)
+			r.processPeerUpdate(ctx, peerUpdate)
 		}
 	}
 }
@@ -372,7 +357,6 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error {
 	r.syncStartTime = time.Now()

 	go r.requestRoutine(ctx)
-
 	go r.poolRoutine(ctx, true)

 	return nil
@@ -387,15 +371,17 @@ func (r *Reactor) requestRoutine(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case request := <-r.requestsCh:
-			select {
-			case <-ctx.Done():
-				return
-			case r.blockSyncOutBridgeCh <- p2p.Envelope{
+			if err := r.blockSyncCh.Send(ctx, p2p.Envelope{
 				To:      request.PeerID,
 				Message: &bcproto.BlockRequest{Height: request.Height},
-			}:
+			}); err != nil {
+				if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{
+					NodeID: request.PeerID,
+					Err:    err,
+				}); err != nil {
+					return
+				}
 			}

 		case pErr := <-r.errorsCh:
 			if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{
 				NodeID: pErr.peerID,
@@ -404,16 +390,12 @@ func (r *Reactor) requestRoutine(ctx context.Context) {
 				return
 			}
 		case <-statusUpdateTicker.C:
-			go func() {
-				select {
-				case <-ctx.Done():
-					return
-				case r.blockSyncOutBridgeCh <- p2p.Envelope{
-					Broadcast: true,
-					Message:   &bcproto.StatusRequest{},
-				}:
-				}
-			}()
+			if err := r.blockSyncCh.Send(ctx, p2p.Envelope{
+				Broadcast: true,
+				Message:   &bcproto.StatusRequest{},
+			}); err != nil {
+				return
+			}
 		}
 	}
 }
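
The struct comment deleted in the first hunk records why the bridge existed: with a raw chan p2p.Envelope, a goroutine sending on the channel can race with the goroutine that closes it, and a send on a closed channel panics, so every send was funneled through one bridge goroutine. The replacement relies on p2p.Channel.Send(ctx, envelope) blocking until the envelope is accepted or the context is cancelled, as seen throughout the diff. The sketch below illustrates that pattern; Envelope, Channel, and their fields are stand-in types invented for illustration, not the real tendermint p2p package.

// A minimal, runnable sketch of a context-aware Send wrapping a raw channel.
package main

import (
	"context"
	"fmt"
)

// Envelope stands in for p2p.Envelope (illustrative assumption).
type Envelope struct{ Msg string }

// Channel stands in for p2p.Channel (illustrative assumption). Send blocks
// until the envelope is accepted or the context is cancelled, so callers
// never touch the raw channel and cannot race with its closure.
type Channel struct{ out chan Envelope }

func (c *Channel) Send(ctx context.Context, e Envelope) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case c.out <- e:
		return nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	ch := &Channel{out: make(chan Envelope)}

	// A consumer goroutine plays the role of the p2p router draining the channel.
	done := make(chan struct{})
	go func() {
		fmt.Println("delivered:", (<-ch.out).Msg)
		close(done)
	}()

	// Succeeds once the consumer is ready to receive.
	if err := ch.Send(ctx, Envelope{Msg: "status request"}); err != nil {
		fmt.Println("send failed:", err)
	}
	<-done

	// With no consumer and a cancelled context, Send returns an error instead
	// of blocking forever or panicking on a closed channel.
	cancel()
	if err := ch.Send(ctx, Envelope{Msg: "too late"}); err != nil {
		fmt.Println("send failed:", err)
	}
}

Because Send blocks and fails cleanly on cancellation, each caller can handle delivery failure inline, which is what the diff does with SendError and pool.RemovePeer, instead of routing every send through a dedicated bridge goroutine.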

